mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* feat(mount): add singleflight deduplication for concurrent chunk reads When multiple FUSE readers request the same uncached chunk concurrently, only one network fetch is performed. Other readers wait and share the downloaded data, reducing redundant volume server traffic under parallel read workloads. * fix(util): make singleflight panic-safe with defer cleanup If the provided function panics, the WaitGroup and map entry are now cleaned up via defer, preventing other waiters from hanging forever. * fix(filer): remove singleflight from reader_cache to fix buffer ownership The singleflight wrapper around chunk fetches returned the same []byte buffer to concurrent callers. Since each SingleChunkCacher owns and frees its data buffer in destroy(), sharing the same slice would cause a use-after-free or double-free with the mem allocator. The downloaders map already deduplicates in-flight downloads for the same fileId, so the singleflight was redundant at this layer. The SingleFlightGroup utility is retained for use elsewhere.
56 lines
1.3 KiB
Go
56 lines
1.3 KiB
Go
package util
|
|
|
|
import (
	"errors"
	"sync"
)
|
|
|
|
// errSingleFlightAborted is the error handed to waiting callers when the
// goroutine executing fn left Do without fn returning normally (fn panicked
// or called runtime.Goexit). Without it, waiters would observe the zero
// result (nil, nil) and mistake the aborted call for a success.
var errSingleFlightAborted = errors.New("singleflight: function did not return normally")

// call represents an in-flight or completed function call.
type call struct {
	wg  sync.WaitGroup // released once, when the executing goroutine leaves Do
	val []byte         // result of fn; waiters read it only after wg.Wait returns
	err error          // error of fn, or errSingleFlightAborted if fn never returned
}

// SingleFlightGroup provides deduplication of concurrent function calls
// keyed by a string. If multiple goroutines call Do with the same key
// concurrently, only one executes the function; the others wait and
// receive the same result.
//
// After a call completes, the key is removed so that subsequent calls
// trigger a fresh execution.
//
// The zero value is ready to use. Because it contains a sync.Mutex, a
// SingleFlightGroup must not be copied after first use.
type SingleFlightGroup struct {
	mu sync.Mutex       // guards m
	m  map[string]*call // in-flight calls by key; allocated lazily by Do
}

// Do executes fn once for a given key, even if called concurrently.
// All callers for the same key block until fn returns and then receive
// the same result.
//
// The returned slice is the very same slice for every caller that shared
// one execution, so callers must treat it as read-only (or copy it) and
// must not hand it to an allocator that assumes exclusive ownership.
//
// If fn panics, the panic propagates unchanged to the caller that was
// executing fn. Callers that were waiting on that execution are released
// and receive a nil slice together with a non-nil error instead of a
// misleading (nil, nil) result. The same applies if fn ends its goroutine
// with runtime.Goexit.
func (g *SingleFlightGroup) Do(key string, fn func() ([]byte, error)) ([]byte, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		// Another goroutine is already running fn for this key: wait for
		// it outside the lock and share its outcome.
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := &call{}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	// returned flips to true only after fn hands back its results, which
	// lets the deferred cleanup tell a normal return from a panic/Goexit.
	returned := false

	// Use defer to ensure cleanup even if fn panics. This prevents
	// waiters from hanging indefinitely and removes the stale key.
	defer func() {
		if !returned {
			// fn never produced a result; publish an explicit failure
			// before releasing the waiters. The write happens before
			// wg.Done, so waiters see it after wg.Wait.
			c.val, c.err = nil, errSingleFlightAborted
		}
		g.mu.Lock()
		delete(g.m, key)
		c.wg.Done()
		g.mu.Unlock()
	}()

	c.val, c.err = fn()
	returned = true
	return c.val, c.err
}
|