* feat(mount): add singleflight deduplication for concurrent chunk reads
  When multiple FUSE readers request the same uncached chunk concurrently, only one network fetch is performed. Other readers wait and share the downloaded data, reducing redundant volume server traffic under parallel read workloads.

* fix(util): make singleflight panic-safe with defer cleanup
  If the provided function panics, the WaitGroup and map entry are now cleaned up via defer, preventing other waiters from hanging forever.

* fix(filer): remove singleflight from reader_cache to fix buffer ownership
  The singleflight wrapper around chunk fetches returned the same []byte buffer to concurrent callers. Since each SingleChunkCacher owns and frees its data buffer in destroy(), sharing the same slice would cause a use-after-free or double-free with the mem allocator. The downloaders map already deduplicates in-flight downloads for the same fileId, so the singleflight was redundant at this layer. The SingleFlightGroup utility is retained for use elsewhere.
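For context, a minimal sketch of the panic-safe pattern the second commit describes. This is illustrative only: the type name SingleFlightGroup matches the commit message, but the fields and method shape here are assumptions, not the actual weed/util code.

package util

import "sync"

// call tracks one in-flight fetch shared by all callers of the same key.
type call struct {
	wg  sync.WaitGroup
	val []byte
	err error
}

// SingleFlightGroup deduplicates concurrent calls by key (illustrative sketch).
type SingleFlightGroup struct {
	mu    sync.Mutex
	calls map[string]*call
}

func (g *SingleFlightGroup) Do(key string, fn func() ([]byte, error)) ([]byte, error) {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[string]*call)
	}
	if c, ok := g.calls[key]; ok {
		// Duplicate caller: wait for the in-flight fetch and share its result.
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.calls[key] = c
	g.mu.Unlock()

	// The deferred cleanup runs even if fn panics, so waiters are always
	// released and the stale map entry is removed instead of hanging forever.
	// (On a panic, waiters observe a zero-value result; the panic itself
	// still propagates to the caller that ran fn.)
	defer func() {
		c.wg.Done()
		g.mu.Lock()
		delete(g.calls, key)
		g.mu.Unlock()
	}()

	c.val, c.err = fn()
	return c.val, c.err
}

With this shape, a panic in fn cannot strand the other waiters, which is the behavior the commit message describes.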
package filer

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// mockChunkCacheForReaderCache implements chunk cache for testing
type mockChunkCacheForReaderCache struct {
	data     map[string][]byte
	hitCount int32
	mu       sync.Mutex
}

func newMockChunkCacheForReaderCache() *mockChunkCacheForReaderCache {
	return &mockChunkCacheForReaderCache{
		data: make(map[string][]byte),
	}
}

func (m *mockChunkCacheForReaderCache) GetChunk(fileId string, minSize uint64) []byte {
	m.mu.Lock()
	defer m.mu.Unlock()
	if d, ok := m.data[fileId]; ok {
		atomic.AddInt32(&m.hitCount, 1)
		return d
	}
	return nil
}

func (m *mockChunkCacheForReaderCache) ReadChunkAt(data []byte, fileId string, offset uint64) (int, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if d, ok := m.data[fileId]; ok && int(offset) < len(d) {
		atomic.AddInt32(&m.hitCount, 1)
		n := copy(data, d[offset:])
		return n, nil
	}
	return 0, nil
}

func (m *mockChunkCacheForReaderCache) SetChunk(fileId string, data []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[fileId] = data
}

func (m *mockChunkCacheForReaderCache) GetMaxFilePartSizeInCache() uint64 {
	return 1024 * 1024 // 1MB
}

func (m *mockChunkCacheForReaderCache) IsInCache(fileId string, lockNeeded bool) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.data[fileId]
	return ok
}
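
// Note on the mock's semantics: a missing fileId reads as a cache miss
// (GetChunk returns nil, ReadChunkAt returns 0 bytes), which sends
// ReaderCache down its downloader path; hits increment hitCount so tests
// can assert the cache was actually consulted.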

// TestReaderCacheContextCancellation tests that a reader can cancel its wait
// while the download continues for other readers
func TestReaderCacheContextCancellation(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()

	// Create a ReaderCache - we can't easily test the full flow without mocking HTTP,
	// but we can test the context cancellation in readChunkAt
	rc := NewReaderCache(10, cache, nil)
	defer rc.destroy()

	// Pre-populate cache to avoid HTTP calls
	testData := []byte("test data for context cancellation")
	cache.SetChunk("test-file-1", testData)

	// Test that context cancellation works
	ctx, cancel := context.WithCancel(context.Background())

	buffer := make([]byte, len(testData))
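	// Call shape used throughout these tests: (ctx, dest, fileId, cipherKey,
	// isGzipped, offset, size, shouldCache); nil cipherKey and false mean a
	// plain, uncompressed chunk (parameter names inferred from usage here).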
	n, err := rc.ReadChunkAt(ctx, buffer, "test-file-1", nil, false, 0, len(testData), true)
	if err != nil {
		t.Errorf("Expected no error, got: %v", err)
	}
	if n != len(testData) {
		t.Errorf("Expected %d bytes, got %d", len(testData), n)
	}

	// Cancel context and verify it doesn't affect already completed reads
	cancel()

	// A subsequent read with the cancelled context may succeed from cache or
	// fail fast with context.Canceled; either outcome is acceptable here.
	buffer2 := make([]byte, len(testData))
	n2, err2 := rc.ReadChunkAt(ctx, buffer2, "test-file-1", nil, false, 0, len(testData), true)
	_ = n2
	_ = err2
}

// TestReaderCacheFallbackToChunkCache tests that when a cacher returns n=0, err=nil,
// we fall back to the chunkCache
func TestReaderCacheFallbackToChunkCache(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()

	// Pre-populate the chunk cache with data
	testData := []byte("fallback test data that should be found in chunk cache")
	cache.SetChunk("fallback-file", testData)

	rc := NewReaderCache(10, cache, nil)
	defer rc.destroy()

	// Read should hit the chunk cache
	buffer := make([]byte, len(testData))
	n, err := rc.ReadChunkAt(context.Background(), buffer, "fallback-file", nil, false, 0, len(testData), true)

	if err != nil {
		t.Errorf("Expected no error, got: %v", err)
	}
	if n != len(testData) {
		t.Errorf("Expected %d bytes, got %d", len(testData), n)
	}

	// Verify cache was hit (read atomically to stay race-detector clean)
	if atomic.LoadInt32(&cache.hitCount) == 0 {
		t.Error("Expected chunk cache to be hit")
	}
}

// TestReaderCacheMultipleReadersWaitForSameChunk tests that multiple readers
// can wait for the same chunk download to complete
func TestReaderCacheMultipleReadersWaitForSameChunk(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()

	// Pre-populate cache so we don't need HTTP
	testData := make([]byte, 1024)
	for i := range testData {
		testData[i] = byte(i % 256)
	}
	cache.SetChunk("shared-chunk", testData)

	rc := NewReaderCache(10, cache, nil)
	defer rc.destroy()

	// Launch multiple concurrent readers for the same chunk
	numReaders := 10
	var wg sync.WaitGroup
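	// Buffered to numReaders so goroutine sends never block, even once the
	// main goroutine is parked in wg.Wait.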
	errors := make(chan error, numReaders)
	bytesRead := make(chan int, numReaders)

	for i := 0; i < numReaders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buffer := make([]byte, len(testData))
			n, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, len(testData), true)
			if err != nil {
				errors <- err
			}
			bytesRead <- n
		}()
	}

	wg.Wait()
	close(errors)
	close(bytesRead)

	// Check for errors
	for err := range errors {
		t.Errorf("Reader got error: %v", err)
	}

	// Verify all readers got the expected data
	for n := range bytesRead {
		if n != len(testData) {
			t.Errorf("Expected %d bytes, got %d", len(testData), n)
		}
	}
}

// TestReaderCachePartialRead tests reading at different offsets
func TestReaderCachePartialRead(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()

	testData := []byte("0123456789ABCDEFGHIJ")
	cache.SetChunk("partial-read-file", testData)

	rc := NewReaderCache(10, cache, nil)
	defer rc.destroy()

	tests := []struct {
		name     string
		offset   int64
		size     int
		expected []byte
	}{
		{"read from start", 0, 5, []byte("01234")},
		{"read from middle", 5, 5, []byte("56789")},
		{"read to end", 15, 5, []byte("FGHIJ")},
		{"read single byte", 10, 1, []byte("A")},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			buffer := make([]byte, tt.size)
			n, err := rc.ReadChunkAt(context.Background(), buffer, "partial-read-file", nil, false, tt.offset, len(testData), true)

			if err != nil {
				t.Errorf("Expected no error, got: %v", err)
			}
			if n != tt.size {
				t.Errorf("Expected %d bytes, got %d", tt.size, n)
			}
			if string(buffer[:n]) != string(tt.expected) {
				t.Errorf("Expected %q, got %q", tt.expected, buffer[:n])
			}
		})
	}
}

// TestReaderCacheCleanup tests that old downloaders are cleaned up
func TestReaderCacheCleanup(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()

	// Create a ReaderCache limited to 3 concurrent downloaders
	rc := NewReaderCache(3, cache, nil)
	defer rc.destroy()

	// Add data for multiple files
	for i := 0; i < 5; i++ {
		fileId := string(rune('A' + i))
		data := []byte("data for file " + fileId)
		cache.SetChunk(fileId, data)
	}

	// Read from multiple files - should trigger cleanup when exceeding the limit
	for i := 0; i < 5; i++ {
		fileId := string(rune('A' + i))
		buffer := make([]byte, 20)
		_, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
		if err != nil {
			t.Errorf("Read error for file %s: %v", fileId, err)
		}
	}

	// Cache should still work - reads should succeed
	for i := 0; i < 5; i++ {
		fileId := string(rune('A' + i))
		buffer := make([]byte, 20)
		n, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
		if err != nil {
			t.Errorf("Second read error for file %s: %v", fileId, err)
		}
		if n == 0 {
			t.Errorf("Expected data for file %s, got 0 bytes", fileId)
		}
	}
}

// TestSingleChunkCacherDoneSignal tests that done channel is always closed
func TestSingleChunkCacherDoneSignal(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()
	rc := NewReaderCache(10, cache, nil)
	defer rc.destroy()

	// Test that we can read even when data is in cache (done channel should work)
	testData := []byte("done signal test")
	cache.SetChunk("done-signal-test", testData)

	// Multiple goroutines reading same chunk
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buffer := make([]byte, len(testData))
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			n, err := rc.ReadChunkAt(ctx, buffer, "done-signal-test", nil, false, 0, len(testData), true)
			if err != nil && err != context.DeadlineExceeded {
				t.Errorf("Unexpected error: %v", err)
			}
			if n == 0 && err == nil {
				t.Error("Got 0 bytes with no error")
			}
		}()
	}

	// Should complete without hanging
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		// Success
	case <-time.After(10 * time.Second):
		t.Fatal("Test timed out - done channel may not be signaled correctly")
	}
}

// ============================================================================
// Tests that exercise SingleChunkCacher concurrency logic
// ============================================================================
//
// These tests use a blocking lookupFileIdFn to exercise the wait/cancellation
// logic in SingleChunkCacher without requiring HTTP calls.
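
// A note on the pattern used below (illustrative): the lookup function closes
// a "started" channel to signal the test, then blocks on a "gate" channel
// until the test decides the lookup may finish:
//
//	started := make(chan struct{})
//	gate := make(chan struct{})
//	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
//		close(started) // the download has reached the lookup
//		<-gate         // hold the download until the test releases it
//		return nil, fmt.Errorf("controlled result")
//	}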

// TestSingleChunkCacherLookupError tests handling of lookup errors
func TestSingleChunkCacherLookupError(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()

	// Lookup function that returns an error
	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
		return nil, fmt.Errorf("lookup failed for %s", fileId)
	}

	rc := NewReaderCache(10, cache, lookupFn)
	defer rc.destroy()

	buffer := make([]byte, 100)
	_, err := rc.ReadChunkAt(context.Background(), buffer, "error-test", nil, false, 0, 100, true)

	if err == nil {
		t.Error("Expected an error, got nil")
	}
}

// TestSingleChunkCacherContextCancellationDuringLookup tests that a reader can
// cancel its wait while the lookup is in progress. This exercises the actual
// SingleChunkCacher wait/cancel logic.
func TestSingleChunkCacherContextCancellationDuringLookup(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()
	lookupStarted := make(chan struct{})
	lookupCanFinish := make(chan struct{})

	// Lookup function that blocks to simulate a slow operation
	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
		close(lookupStarted)
		<-lookupCanFinish // Block until the test allows completion
		return nil, fmt.Errorf("lookup completed but reader should have cancelled")
	}

	rc := NewReaderCache(10, cache, lookupFn)
	defer rc.destroy()
	defer close(lookupCanFinish) // Ensure cleanup

	ctx, cancel := context.WithCancel(context.Background())
	readResult := make(chan error, 1)

	go func() {
		buffer := make([]byte, 100)
		_, err := rc.ReadChunkAt(ctx, buffer, "cancel-during-lookup", nil, false, 0, 100, true)
		readResult <- err
	}()

	// Wait for the lookup to start, then cancel the reader's context
	select {
	case <-lookupStarted:
		cancel() // Cancel the reader while the lookup is blocked
	case <-time.After(5 * time.Second):
		t.Fatal("Lookup never started")
	}

	// The read should return with context.Canceled
	select {
	case err := <-readResult:
		if err != context.Canceled {
			t.Errorf("Expected context.Canceled, got: %v", err)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("Read did not complete after context cancellation")
	}
}

// TestSingleChunkCacherMultipleReadersWaitForDownload tests that multiple readers
// can wait for the same SingleChunkCacher download to complete. When lookup fails,
// all readers should receive the same error.
func TestSingleChunkCacherMultipleReadersWaitForDownload(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()
	lookupStarted := make(chan struct{})
	lookupCanFinish := make(chan struct{})
	var lookupStartedOnce sync.Once

	// Lookup function that blocks to simulate a slow operation
	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
		lookupStartedOnce.Do(func() { close(lookupStarted) })
		<-lookupCanFinish
		return nil, fmt.Errorf("simulated lookup error")
	}

	rc := NewReaderCache(10, cache, lookupFn)
	defer rc.destroy()

	numReaders := 5
	var wg sync.WaitGroup
	errors := make(chan error, numReaders)

	// Start multiple readers for the same chunk
	for i := 0; i < numReaders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buffer := make([]byte, 100)
			_, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, 100, true)
			errors <- err
		}()
	}

	// Wait for the lookup to start, then allow completion
	select {
	case <-lookupStarted:
		close(lookupCanFinish)
	case <-time.After(5 * time.Second):
		close(lookupCanFinish)
		t.Fatal("Lookup never started")
	}

	wg.Wait()
	close(errors)

	// All readers should receive an error
	errorCount := 0
	for err := range errors {
		if err != nil {
			errorCount++
		}
	}
	if errorCount != numReaders {
		t.Errorf("Expected %d errors, got %d", numReaders, errorCount)
	}
}

// TestReaderCacheDownloaderDedup tests that concurrent ReadChunkAt calls for
// the same fileId result in only one network fetch (lookup call), because
// the downloaders map deduplicates in-flight downloads.
func TestReaderCacheDownloaderDedup(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()
	var lookupCount int32
	lookupGate := make(chan struct{})

	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
		atomic.AddInt32(&lookupCount, 1)
		<-lookupGate
		// Return an error so we don't need to mock the HTTP fetch.
		return nil, fmt.Errorf("simulated lookup for %s", fileId)
	}

	rc := NewReaderCache(10, cache, lookupFn)
	defer rc.destroy()

	const numReaders = 10
	var wg sync.WaitGroup
	wg.Add(numReaders)

	for i := 0; i < numReaders; i++ {
		go func() {
			defer wg.Done()
			buffer := make([]byte, 100)
			rc.ReadChunkAt(context.Background(), buffer, "dedup-file", nil, false, 0, 100, false)
		}()
	}

	// Allow downloads to proceed.
	close(lookupGate)
	wg.Wait()

	count := atomic.LoadInt32(&lookupCount)
	if count != 1 {
		t.Errorf("expected exactly 1 lookup call, got %d", count)
	}
}

// TestSingleChunkCacherOneReaderCancelsOthersContinue tests that when one reader
// cancels, other readers waiting on the same chunk continue to wait.
func TestSingleChunkCacherOneReaderCancelsOthersContinue(t *testing.T) {
	cache := newMockChunkCacheForReaderCache()
	lookupStarted := make(chan struct{})
	lookupCanFinish := make(chan struct{})
	var lookupStartedOnce sync.Once

	lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
		lookupStartedOnce.Do(func() { close(lookupStarted) })
		<-lookupCanFinish
		return nil, fmt.Errorf("simulated error after delay")
	}

	rc := NewReaderCache(10, cache, lookupFn)
	defer rc.destroy()

	cancelledReaderDone := make(chan error, 1)
	otherReaderDone := make(chan error, 1)

	ctx, cancel := context.WithCancel(context.Background())

	// Start the reader that will be cancelled
	go func() {
		buffer := make([]byte, 100)
		_, err := rc.ReadChunkAt(ctx, buffer, "shared-chunk-2", nil, false, 0, 100, true)
		cancelledReaderDone <- err
	}()

	// Start the reader that will NOT be cancelled
	go func() {
		buffer := make([]byte, 100)
		_, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk-2", nil, false, 0, 100, true)
		otherReaderDone <- err
	}()

	// Wait for the lookup to start
	select {
	case <-lookupStarted:
	case <-time.After(5 * time.Second):
		t.Fatal("Lookup never started")
	}

	// Cancel the first reader
	cancel()

	// The first reader should complete with context.Canceled quickly
	select {
	case err := <-cancelledReaderDone:
		if err != context.Canceled {
			t.Errorf("Cancelled reader: expected context.Canceled, got: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Error("Cancelled reader did not complete quickly")
	}

	// Allow the download to complete
	close(lookupCanFinish)

	// The other reader should eventually complete, with the lookup error
	select {
	case err := <-otherReaderDone:
		if err == nil || err == context.Canceled {
			t.Errorf("Other reader: expected a non-nil, non-cancelled error, got: %v", err)
		}
		// Expected: "simulated error after delay"
	case <-time.After(5 * time.Second):
		t.Error("Other reader did not complete")
	}
}