Files
seaweedfs/weed/util/singleflight_test.go
Chris Lu 886d50a6a5 feat(mount): singleflight dedup for concurrent chunk reads (#9100)
* feat(mount): add singleflight deduplication for concurrent chunk reads

When multiple FUSE readers request the same uncached chunk concurrently,
only one network fetch is performed. Other readers wait and share the
downloaded data, reducing redundant volume server traffic under parallel
read workloads.

* fix(util): make singleflight panic-safe with defer cleanup

If the provided function panics, the WaitGroup and map entry are now
cleaned up via defer, preventing other waiters from hanging forever.

* fix(filer): remove singleflight from reader_cache to fix buffer ownership

The singleflight wrapper around chunk fetches returned the same []byte
buffer to concurrent callers. Since each SingleChunkCacher owns and
frees its data buffer in destroy(), sharing the same slice would cause
a use-after-free or double-free with the mem allocator.

The downloaders map already deduplicates in-flight downloads for the
same fileId, so the singleflight was redundant at this layer. The
SingleFlightGroup utility is retained for use elsewhere.
2026-04-16 10:18:05 -07:00

160 lines
3.8 KiB
Go

package util
import (
"errors"
"sync"
"sync/atomic"
"testing"
)
func TestSingleFlightGroupDedup(t *testing.T) {
// Verify that two concurrent Do calls for the same key execute fn
// only once. The main goroutine signals right before calling Do,
// and a helper goroutine waits for that signal before closing
// the gate. Because the signal-to-Do gap is a single function
// call on the main goroutine and the key cannot be removed until
// the gate is closed, the helper's close(gate) cannot race ahead.
var g SingleFlightGroup
expected := []byte("result")
var primaryCalls, secondaryCalls int32
fnRunning := make(chan struct{}) // closed by fn
aboutToDo := make(chan struct{}) // closed by main before calling Do
gate := make(chan struct{}) // closed by helper to release fn
// Primary goroutine: executes fn, which blocks on gate.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
g.Do("key1", func() ([]byte, error) {
atomic.AddInt32(&primaryCalls, 1)
close(fnRunning)
<-gate
return expected, nil
})
}()
// Wait for fn to be running (key is now in the map).
<-fnRunning
// Helper goroutine: waits for the main goroutine to signal it is
// about to call Do, then closes gate. By the time close(gate)
// executes, the main goroutine has already entered Do (or is about
// to) -- and since fn is still blocked on gate until this very
// close, the key is still in the map.
go func() {
<-aboutToDo
close(gate)
}()
// Signal the helper then immediately call Do. The key is in the
// map (fn is blocked on gate which has not been closed yet because
// the helper is waiting on aboutToDo which we are about to close).
close(aboutToDo)
v, err := g.Do("key1", func() ([]byte, error) {
atomic.AddInt32(&secondaryCalls, 1)
return nil, nil
})
wg.Wait()
if atomic.LoadInt32(&primaryCalls) != 1 {
t.Errorf("expected primary fn called once, got %d", primaryCalls)
}
if atomic.LoadInt32(&secondaryCalls) != 0 {
t.Errorf("expected secondary fn never called, got %d", secondaryCalls)
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if string(v) != string(expected) {
t.Errorf("got %q, want %q", v, expected)
}
}
func TestSingleFlightGroupDifferentKeys(t *testing.T) {
var g SingleFlightGroup
var calls int32
fn := func() ([]byte, error) {
atomic.AddInt32(&calls, 1)
return []byte("ok"), nil
}
var wg sync.WaitGroup
wg.Add(3)
for _, key := range []string{"a", "b", "c"} {
go func(k string) {
defer wg.Done()
g.Do(k, fn)
}(key)
}
wg.Wait()
if c := atomic.LoadInt32(&calls); c != 3 {
t.Errorf("expected 3 independent calls, got %d", c)
}
}
func TestSingleFlightGroupErrorPropagation(t *testing.T) {
var g SingleFlightGroup
gate := make(chan struct{})
testErr := errors.New("download failed")
fn := func() ([]byte, error) {
<-gate
return nil, testErr
}
const n = 5
errs := make([]error, n)
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func(idx int) {
defer wg.Done()
_, errs[idx] = g.Do("errkey", fn)
}(i)
}
close(gate)
wg.Wait()
for i := 0; i < n; i++ {
if !errors.Is(errs[i], testErr) {
t.Errorf("caller %d: expected %v, got %v", i, testErr, errs[i])
}
}
}
func TestSingleFlightGroupFreshCallAfterCompletion(t *testing.T) {
var g SingleFlightGroup
var calls int32
fn := func() ([]byte, error) {
c := atomic.AddInt32(&calls, 1)
return []byte{byte(c)}, nil
}
// First call.
v1, err1 := g.Do("key", fn)
if err1 != nil {
t.Fatalf("first call error: %v", err1)
}
// Second call should trigger a fresh execution.
v2, err2 := g.Do("key", fn)
if err2 != nil {
t.Fatalf("second call error: %v", err2)
}
if atomic.LoadInt32(&calls) != 2 {
t.Errorf("expected fn called twice for sequential calls, got %d", calls)
}
if v1[0] == v2[0] {
t.Errorf("expected different results for sequential calls, got %v and %v", v1, v2)
}
}