seaweedfs/weed/util/singleflight.go
Chris Lu 886d50a6a5 feat(mount): singleflight dedup for concurrent chunk reads (#9100)
* feat(mount): add singleflight deduplication for concurrent chunk reads

When multiple FUSE readers request the same uncached chunk concurrently,
only one network fetch is performed. Other readers wait and share the
downloaded data, reducing redundant volume server traffic under parallel
read workloads.

* fix(util): make singleflight panic-safe with defer cleanup

If the provided function panics, the WaitGroup and map entry are now
cleaned up via defer, preventing other waiters from hanging forever.

* fix(filer): remove singleflight from reader_cache to fix buffer ownership

The singleflight wrapper around chunk fetches returned the same []byte
buffer to concurrent callers. Since each SingleChunkCacher owns and
frees its data buffer in destroy(), sharing the same slice would cause
a use-after-free or double-free with the mem allocator.

The downloaders map already deduplicates in-flight downloads for the
same fileId, so the singleflight was redundant at this layer. The
SingleFlightGroup utility is retained for use elsewhere.
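
The buffer-ownership hazard described above can be illustrated with a minimal sketch. It does not use the real mem allocator or SingleChunkCacher: release is a hypothetical stand-in that zeroes a buffer to mimic it being returned to a pool and reused, and privateCopy is a hypothetical helper. Copying per caller is shown only as one possible alternative; the commit itself resolved the problem by removing singleflight from reader_cache.

```go
package main

import (
	"bytes"
	"fmt"
)

// release is a hypothetical stand-in for handing a buffer back to a
// pooled allocator; it zeroes the bytes so that reuse becomes visible.
func release(buf []byte) {
	for i := range buf {
		buf[i] = 0
	}
}

// privateCopy (hypothetical helper) returns a copy backed by its own array.
func privateCopy(buf []byte) []byte {
	return append([]byte(nil), buf...)
}

// sharedSurvives reports whether reader b still sees the payload after
// reader a releases its view of the very same slice.
func sharedSurvives() bool {
	fetched := []byte("chunk-data")
	a, b := fetched, fetched // both readers alias one backing array
	release(a)
	return bytes.Equal(b, []byte("chunk-data"))
}

// copiedSurvives repeats the experiment with one private copy per reader.
func copiedSurvives() bool {
	fetched := []byte("chunk-data")
	a, b := privateCopy(fetched), privateCopy(fetched)
	release(a)
	return bytes.Equal(b, []byte("chunk-data"))
}

func main() {
	fmt.Println("shared slice intact after release:", sharedSurvives()) // false
	fmt.Println("private copy intact after release:", copiedSurvives()) // true
}
```

With a shared slice, the first owner to free the buffer invalidates it for everyone else, which is the use-after-free scenario the commit avoids.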
2026-04-16 10:18:05 -07:00

package util

import (
	"sync"
)

// call represents an in-flight or completed function call.
type call struct {
	wg  sync.WaitGroup // released once fn has returned or panicked
	val []byte
	err error
}

// SingleFlightGroup provides deduplication of concurrent function calls
// keyed by a string. If multiple goroutines call Do with the same key
// concurrently, only one executes the function; the others wait and
// receive the same result.
//
// After a call completes, the key is removed so that subsequent calls
// trigger a fresh execution.
//
// The zero value is ready to use.
type SingleFlightGroup struct {
	mu sync.Mutex
	m  map[string]*call // in-flight calls by key; lazily initialized
}

// Do executes fn once for a given key, even if called concurrently.
// All callers for the same key block until fn returns and then receive
// the same result.
//
// The returned slice is shared by every caller that joined the same
// in-flight call, so callers must not modify or free it without copying
// it first. If fn panics, the panic propagates to the caller that ran
// fn, and the waiting callers receive (nil, nil).
func (g *SingleFlightGroup) Do(key string, fn func() ([]byte, error)) ([]byte, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		// Another goroutine is already running fn for this key:
		// wait for it to finish and share its result.
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := &call{}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	// Use defer to ensure cleanup even if fn panics. This prevents
	// waiters from hanging indefinitely and removes the stale key.
	defer func() {
		g.mu.Lock()
		delete(g.m, key)
		c.wg.Done()
		g.mu.Unlock()
	}()

	c.val, c.err = fn()
	return c.val, c.err
}
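
A minimal usage sketch follows, showing how concurrent callers with the same key share one execution. The group type is repeated in condensed form only so the example compiles on its own. fetchAll, the key "chunk-1", the payload, and the simulated latency are all hypothetical and are not part of SeaweedFS.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call and SingleFlightGroup are condensed copies of the types in this
// file, repeated only so the sketch is self-contained.
type call struct {
	wg  sync.WaitGroup
	val []byte
	err error
}

type SingleFlightGroup struct {
	mu sync.Mutex
	m  map[string]*call
}

func (g *SingleFlightGroup) Do(key string, fn func() ([]byte, error)) ([]byte, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := &call{}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	defer func() {
		g.mu.Lock()
		delete(g.m, key)
		c.wg.Done()
		g.mu.Unlock()
	}()
	c.val, c.err = fn()
	return c.val, c.err
}

// fetchAll (hypothetical helper) starts n goroutines that all request the
// same key. It returns how many times fn actually ran and how many
// callers received the expected payload.
func fetchAll(n int) (fetches int64, served int64) {
	var g SingleFlightGroup
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			data, err := g.Do("chunk-1", func() ([]byte, error) {
				atomic.AddInt64(&fetches, 1)
				time.Sleep(20 * time.Millisecond) // simulated network latency
				return []byte("chunk-data"), nil
			})
			if err == nil && string(data) == "chunk-data" {
				atomic.AddInt64(&served, 1)
			}
		}()
	}
	wg.Wait()
	return fetches, served
}

func main() {
	fetches, served := fetchAll(8)
	fmt.Printf("callers served: %d, fn executions: %d\n", served, fetches)
}
```

Every caller is served, while the number of fn executions is usually 1. It is not guaranteed to be 1: a goroutine that calls Do after the first call has completed finds no entry for the key and triggers a fresh execution, as the type's documentation states.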