diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 91ec1cfb0..1833ad165 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -256,10 +256,11 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
             Fsync:       *b.fsync,
         }
         ar := &operation.VolumeAssignRequest{
-            Count:       1,
-            Collection:  *b.collection,
-            Replication: *b.replication,
-            DiskType:    *b.diskType,
+            Count:            1,
+            Collection:       *b.collection,
+            Replication:      *b.replication,
+            DiskType:         *b.diskType,
+            ExpectedDataSize: uint64(fileSize),
         }
         if assignResult, err := operation.Assign(context.Background(), b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
             fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 56c3a7f07..e849c8e07 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -553,7 +553,7 @@ func detectMimeType(f *os.File) string {
     return mimeType
 }
 
-func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64, _ uint64) (chunk *filer_pb.FileChunk, err error) {
     uploader, uploaderErr := operation.NewUploader()
     if uploaderErr != nil {
         return nil, fmt.Errorf("upload data: %w", uploaderErr)
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index b82cdf52a..978b0675e 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -270,7 +270,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
         }
     }
 
-    manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0)
+    manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0, uint64(len(data)))
     if err != nil {
         return nil, err
     }
@@ -281,4 +281,4 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
     return
 }
 
-type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)
+type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64, expectedDataSize uint64) (chunk *filer_pb.FileChunk, err error)
diff --git a/weed/filer/filechunk_manifest_test.go b/weed/filer/filechunk_manifest_test.go
index 0cf3ef298..4411b01fb 100644
--- a/weed/filer/filechunk_manifest_test.go
+++ b/weed/filer/filechunk_manifest_test.go
@@ -132,7 +132,7 @@ func newTestManifestStore() *testManifestStore {
 }
 
 func (s *testManifestStore) saveFunc() SaveDataAsChunkFunctionType {
-    return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
+    return func(reader io.Reader, name string, offset int64, tsNs int64, _ uint64) (*filer_pb.FileChunk, error) {
         data, err := io.ReadAll(reader)
         if err != nil {
             return nil, err
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index 3efcc4fc9..036c825cf 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -57,6 +57,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
         Replication:         util.Nvl(f.metaLogReplication, rule.Replication),
         DiskType:            rule.DiskType,
         WritableVolumeCount: rule.VolumeGrowthCount,
+        ExpectedDataSize:    uint64(len(data)),
     }
 
     assignResult, err := operation.Assign(context.Background(), f.GetMaster, f.GrpcDialOption, assignRequest)
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index a7b7d5926..4cd21d1a5 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -72,7 +72,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reade
     fileFullPath := pages.fh.FullPath()
     fileName := fileFullPath.Name()
 
-    chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs)
+    chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs, uint64(size))
     if err != nil {
         glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
         pages.lastErr = err
diff --git a/weed/mount/fileids_pool.go b/weed/mount/fileids_pool.go
deleted file mode 100644
index 391b665a9..000000000
--- a/weed/mount/fileids_pool.go
+++ /dev/null
@@ -1,181 +0,0 @@
-package mount
-
-import (
-    "context"
-    "fmt"
-    "sync"
-    "time"
-
-    "github.com/seaweedfs/seaweedfs/weed/glog"
-    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-    "github.com/seaweedfs/seaweedfs/weed/security"
-)
-
-// FileIdEntry holds a pre-allocated file ID from the filer/master, ready for
-// immediate use by an upload worker without an AssignVolume round-trip.
-type FileIdEntry struct {
-    FileId string
-    Host   string // volume server address (already adjusted for access mode)
-    Auth   security.EncodedJwt
-    Time   time.Time
-}
-
-// FileIdPool pre-allocates file IDs in batches so that chunk uploads can grab
-// one instantly instead of blocking on an AssignVolume RPC per chunk.
-//
-// The pool is refilled in the background when it drops below a low-water mark.
-// All IDs are allocated with the mount's global (replication, collection, ttl,
-// diskType, dataCenter) parameters. Path-based storage rules (filer.conf) are
-// NOT applied to pooled IDs since the pool allocates ahead of any specific file
-// path. This is an intentional tradeoff for writeback cache performance.
-type FileIdPool struct {
-    wfs *WFS
-
-    mu      sync.Mutex
-    cond    *sync.Cond
-    entries []FileIdEntry // available pre-allocated IDs
-    filling bool          // true when a background refill is in progress
-
-    poolSize  int // target pool capacity
-    batchSize int // how many IDs to request per Assign RPC
-    lowWater  int // refill trigger threshold
-    maxAge    time.Duration
-}
-
-func NewFileIdPool(wfs *WFS) *FileIdPool {
-    concurrency := wfs.option.ConcurrentWriters
-    if concurrency <= 0 {
-        concurrency = 128 // match default async flush worker count
-    }
-    pool := &FileIdPool{
-        wfs:       wfs,
-        poolSize:  concurrency * 2,
-        batchSize: concurrency,
-        lowWater:  concurrency,
-        maxAge:    25 * time.Second, // conservative; JWT TTL is typically 30s+
-    }
-    pool.cond = sync.NewCond(&pool.mu)
-    return pool
-}
-
-// Get returns a pre-allocated file ID entry. If the pool is empty and a refill
-// is in progress, callers wait for it to complete rather than failing. Returns
-// an error only if the Assign RPC fails.
-func (p *FileIdPool) Get() (FileIdEntry, error) {
-    p.mu.Lock()
-    defer p.mu.Unlock()
-
-    for {
-        p.evictExpired()
-
-        if len(p.entries) > 0 {
-            entry := p.entries[0]
-            p.entries = p.entries[1:]
-            if len(p.entries) < p.lowWater && !p.filling {
-                p.filling = true
-                go p.doRefill()
-            }
-            return entry, nil
-        }
-
-        // Pool empty.
-        if p.filling {
-            // Wait for the in-flight refill to complete.
-            p.cond.Wait()
-            continue
-        }
-
-        // No refill in progress — start one synchronously.
-        p.filling = true
-        p.mu.Unlock()
-        entries, err := p.assignBatch(p.batchSize)
-        p.mu.Lock()
-        p.filling = false
-        p.cond.Broadcast()
-
-        if err != nil {
-            return FileIdEntry{}, fmt.Errorf("fileIdPool: %w", err)
-        }
-        p.entries = append(p.entries, entries...)
-        // Loop back to pop from entries.
-    }
-}
-
-func (p *FileIdPool) evictExpired() {
-    cutoff := time.Now().Add(-p.maxAge)
-    i := 0
-    for i < len(p.entries) && p.entries[i].Time.Before(cutoff) {
-        i++
-    }
-    if i > 0 {
-        p.entries = p.entries[i:]
-    }
-}
-
-// doRefill runs in a background goroutine to refill the pool.
-func (p *FileIdPool) doRefill() {
-    entries, err := p.assignBatch(p.batchSize)
-    if err != nil {
-        glog.V(1).Infof("fileIdPool refill: %v", err)
-    }
-
-    p.mu.Lock()
-    if err == nil {
-        p.entries = append(p.entries, entries...)
-    }
-    p.filling = false
-    p.cond.Broadcast()
-    p.mu.Unlock()
-}
-
-// assignBatch requests `count` file IDs from the filer using individual
-// Count=1 RPCs over a single gRPC connection. Each response includes a
-// per-fid JWT, so uploads work correctly when JWT security is enabled.
-//
-// We use individual requests instead of Count=N because the master generates
-// one JWT for the base file ID only (master_grpc_server_assign.go:158), and
-// the volume server validates that the JWT's Fid matches the upload's file ID
-// exactly (volume_server_handlers.go:367). Sequential IDs derived from a
-// Count=N response would fail this check.
-//
-// Note: the AssignVolumeRequest intentionally omits the Path field. Pooled IDs
-// use the mount's global storage parameters, not per-path rules from filer.conf
-// (detectStorageOption / MatchStorageRule). This is a writeback cache tradeoff.
-func (p *FileIdPool) assignBatch(count int) ([]FileIdEntry, error) {
-    var entries []FileIdEntry
-    err := p.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-        now := time.Now()
-        req := &filer_pb.AssignVolumeRequest{
-            Count:            1,
-            Replication:      p.wfs.option.Replication,
-            Collection:       p.wfs.option.Collection,
-            TtlSec:           p.wfs.option.TtlSec,
-            DiskType:         string(p.wfs.option.DiskType),
-            DataCenter:       p.wfs.option.DataCenter,
-            ExpectedDataSize: uint64(p.wfs.option.ChunkSizeLimit),
-        }
-        for i := 0; i < count; i++ {
-            resp, assignErr := client.AssignVolume(context.Background(), req)
-            if assignErr != nil {
-                if len(entries) > 0 {
-                    break // partial batch is fine
-                }
-                return assignErr
-            }
-            if resp.Error != "" {
-                if len(entries) > 0 {
-                    break
-                }
-                return fmt.Errorf("assign: %s", resp.Error)
-            }
-            entries = append(entries, FileIdEntry{
-                FileId: resp.FileId,
-                Host:   p.wfs.AdjustedUrl(resp.Location),
-                Auth:   security.EncodedJwt(resp.Auth),
-                Time:   now,
-            })
-        }
-        return nil
-    })
-    return entries, err
-}
diff --git a/weed/mount/fileids_pool_test.go b/weed/mount/fileids_pool_test.go
deleted file mode 100644
index 400c4889a..000000000
--- a/weed/mount/fileids_pool_test.go
+++ /dev/null
@@ -1,280 +0,0 @@
-package mount
-
-import (
-    "context"
-    "fmt"
-    "sync"
-    "sync/atomic"
-    "testing"
-    "time"
-
-    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-    "github.com/seaweedfs/seaweedfs/weed/security"
-    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
-    "google.golang.org/grpc"
-)
-
-// mockFilerClient simulates AssignVolume RPCs with configurable latency.
-type mockFilerClient struct {
-    latency   time.Duration
-    nextVolId uint32
-    nextKey   uint64
-    mu        sync.Mutex
-}
-
-func (m *mockFilerClient) AssignVolume(_ context.Context, req *filer_pb.AssignVolumeRequest, _ ...grpc.CallOption) (*filer_pb.AssignVolumeResponse, error) {
-    if m.latency > 0 {
-        time.Sleep(m.latency)
-    }
-    m.mu.Lock()
-    defer m.mu.Unlock()
-
-    count := int(req.Count)
-    if count <= 0 {
-        count = 1
-    }
-    vid := needle.VolumeId(m.nextVolId + 1)
-    key := m.nextKey + 1000 // start at a non-zero key for valid hex encoding
-    m.nextKey += uint64(count)
-    m.nextVolId++
-
-    fid := needle.NewFileId(vid, key, 0x12345678)
-    return &filer_pb.AssignVolumeResponse{
-        FileId: fid.String(),
-        Count:  int32(count),
-        Auth:   "test-jwt-token",
-        Location: &filer_pb.Location{
-            Url:       "127.0.0.1:8080",
-            PublicUrl: "127.0.0.1:8080",
-            GrpcPort:  18080,
-        },
-    }, nil
-}
-
-// TestFileIdPoolSequentialIds verifies that batch assignment generates
-// correct sequential file IDs from a single AssignVolume response.
-func TestFileIdPoolSequentialIds(t *testing.T) {
-    mock := &mockFilerClient{}
-    resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 5})
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    baseFid, parseErr := needle.ParseFileIdFromString(resp.FileId)
-    if parseErr != nil {
-        t.Fatal(parseErr)
-    }
-
-    baseKey := uint64(baseFid.Key)
-    for i := 0; i < int(resp.Count); i++ {
-        fid := needle.NewFileId(baseFid.VolumeId, baseKey+uint64(i), uint32(baseFid.Cookie))
-        t.Logf("ID %d: %s", i, fid.String())
-        parsed, err := needle.ParseFileIdFromString(fid.String())
-        if err != nil {
-            t.Fatalf("Failed to parse sequential ID %d: %v", i, err)
-        }
-        if uint64(parsed.Key) != baseKey+uint64(i) {
-            t.Fatalf("ID %d: expected key %d, got %d", i, baseKey+uint64(i), uint64(parsed.Key))
-        }
-    }
-}
-
-// TestFileIdPoolExpiry verifies that expired entries are evicted.
-func TestFileIdPoolExpiry(t *testing.T) {
-    pool := &FileIdPool{
-        maxAge: time.Second,
-    }
-    pool.cond = sync.NewCond(&pool.mu)
-    now := time.Now()
-    pool.entries = []FileIdEntry{
-        {FileId: "1,old", Time: now.Add(-2 * time.Second)},
-        {FileId: "2,old", Time: now.Add(-2 * time.Second)},
-        {FileId: "3,fresh", Time: now},
-    }
-    pool.evictExpired()
-    if len(pool.entries) != 1 {
-        t.Fatalf("expected 1 entry after eviction, got %d", len(pool.entries))
-    }
-    if pool.entries[0].FileId != "3,fresh" {
-        t.Fatalf("expected fresh entry, got %s", pool.entries[0].FileId)
-    }
-}
-
-// TestFileIdPoolGetWaitsForRefill verifies that concurrent Get() calls wait
-// for an in-flight refill instead of returning an error.
-func TestFileIdPoolGetWaitsForRefill(t *testing.T) {
-    pool := &FileIdPool{
-        poolSize:  10,
-        batchSize: 5,
-        lowWater:  3,
-        maxAge:    30 * time.Second,
-    }
-    pool.cond = sync.NewCond(&pool.mu)
-
-    // Simulate a slow refill in progress.
-    pool.filling = true
-    done := make(chan struct{})
-    go func() {
-        // After a short delay, deliver entries and signal.
-        time.Sleep(10 * time.Millisecond)
-        pool.mu.Lock()
-        now := time.Now()
-        for i := 0; i < 5; i++ {
-            pool.entries = append(pool.entries, FileIdEntry{
-                FileId: fmt.Sprintf("1,%x12345678", 1000+i),
-                Host:   "127.0.0.1:8080",
-                Auth:   "jwt",
-                Time:   now,
-            })
-        }
-        pool.filling = false
-        pool.cond.Broadcast()
-        pool.mu.Unlock()
-        close(done)
-    }()
-
-    // Get() should wait for the refill, not return an error.
-    entry, err := pool.Get()
-    if err != nil {
-        t.Fatalf("Get() returned error while refill in progress: %v", err)
-    }
-    if entry.FileId == "" {
-        t.Fatal("Get() returned empty entry")
-    }
-    <-done
-}
-
-// BenchmarkPoolGetVsDirectAssign measures the latency difference between
-// getting a file ID from the pool vs a direct (simulated) AssignVolume RPC.
-func BenchmarkPoolGetVsDirectAssign(b *testing.B) {
-    assignLatencies := []time.Duration{
-        0,
-        100 * time.Microsecond,
-        1 * time.Millisecond,
-        5 * time.Millisecond,
-    }
-
-    for _, latency := range assignLatencies {
-        name := "latency=" + latency.String()
-
-        b.Run("DirectAssign/"+name, func(b *testing.B) {
-            mock := &mockFilerClient{latency: latency}
-            b.ResetTimer()
-            for i := 0; i < b.N; i++ {
-                _, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 1})
-                if err != nil {
-                    b.Fatal(err)
-                }
-            }
-        })
-
-        b.Run("PoolGet/"+name, func(b *testing.B) {
-            pool := &FileIdPool{
-                maxAge:   30 * time.Second,
-                lowWater: 1000000, // disable background refill
-            }
-            pool.cond = sync.NewCond(&pool.mu)
-            // Pre-fill with a fixed-size pool; refill under StopTimer when depleted.
-            const preload = 4096
-            refill := func() {
-                now := time.Now()
-                for i := 0; i < preload; i++ {
-                    pool.entries = append(pool.entries, FileIdEntry{
-                        FileId: fmt.Sprintf("1,%x12345678", i),
-                        Host:   "127.0.0.1:8080",
-                        Auth:   security.EncodedJwt("test-jwt"),
-                        Time:   now,
-                    })
-                }
-            }
-            refill()
-            b.ResetTimer()
-            for i := 0; i < b.N; i++ {
-                pool.mu.Lock()
-                if len(pool.entries) == 0 {
-                    pool.mu.Unlock()
-                    b.StopTimer()
-                    refill()
-                    b.StartTimer()
-                    pool.mu.Lock()
-                }
-                _ = pool.entries[0]
-                pool.entries = pool.entries[1:]
-                pool.mu.Unlock()
-            }
-        })
-    }
-}
-
-// BenchmarkConcurrentPoolGet measures pool throughput under concurrent access.
-func BenchmarkConcurrentPoolGet(b *testing.B) {
-    for _, workers := range []int{1, 4, 16, 64} {
-        b.Run(fmt.Sprintf("workers=%d", workers), func(b *testing.B) {
-            pool := &FileIdPool{
-                maxAge:   30 * time.Second,
-                lowWater: 1000000,
-            }
-            pool.cond = sync.NewCond(&pool.mu)
-            // Pre-fill
-            now := time.Now()
-            total := b.N*workers + 1000
-            pool.entries = make([]FileIdEntry, total)
-            for i := range pool.entries {
-                pool.entries[i] = FileIdEntry{
-                    FileId: fmt.Sprintf("1,%x12345678", i+1000),
-                    Host:   "127.0.0.1:8080",
-                    Auth:   security.EncodedJwt("test-jwt"),
-                    Time:   now,
-                }
-            }
-
-            var ops atomic.Int64
-            b.ResetTimer()
-
-            var wg sync.WaitGroup
-            perWorker := b.N
-            for w := 0; w < workers; w++ {
-                wg.Add(1)
-                go func() {
-                    defer wg.Done()
-                    for i := 0; i < perWorker; i++ {
-                        pool.mu.Lock()
-                        if len(pool.entries) > 0 {
-                            _ = pool.entries[0]
-                            pool.entries = pool.entries[1:]
-                            ops.Add(1)
-                        }
-                        pool.mu.Unlock()
-                    }
-                }()
-            }
-            wg.Wait()
-            b.ReportMetric(float64(ops.Load())/b.Elapsed().Seconds(), "ids/sec")
-        })
-    }
-}
-
-// BenchmarkBatchAssign measures the cost of batch vs individual assign RPCs.
-func BenchmarkBatchAssign(b *testing.B) {
-    for _, batchSize := range []int{1, 8, 16, 32} {
-        b.Run(fmt.Sprintf("batch=%d", batchSize), func(b *testing.B) {
-            mock := &mockFilerClient{latency: 1 * time.Millisecond}
-            totalIds := 0
-            b.ResetTimer()
-            for i := 0; i < b.N; i++ {
-                resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: int32(batchSize)})
-                if err != nil {
-                    b.Fatal(err)
-                }
-
-                baseFid, _ := needle.ParseFileIdFromString(resp.FileId)
-                baseKey := uint64(baseFid.Key)
-                for j := 0; j < int(resp.Count); j++ {
-                    _ = needle.NewFileId(baseFid.VolumeId, baseKey+uint64(j), uint32(baseFid.Cookie)).String()
-                    totalIds++
-                }
-            }
-            b.ReportMetric(float64(totalIds)/b.Elapsed().Seconds(), "ids/sec")
-        })
-    }
-}
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 2c3f588d5..969dfbba2 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -142,7 +142,6 @@ type WFS struct {
     dirHotWindow    time.Duration
     dirHotThreshold int
     dirIdleEvict    time.Duration
-    fileIdPool      *FileIdPool
 
     // openMtimeCache maps inode -> [mtime_sec, mtime_ns] from the last Open.
     // Used to decide whether to set FOPEN_KEEP_CACHE on subsequent opens.
@@ -387,11 +386,6 @@ func (wfs *WFS) StartBackgroundTasks() error {
     go wfs.loopEvictIdleDirCache()
     go wfs.loopProactiveFlush()
 
-    if wfs.option.WritebackCache {
-        wfs.fileIdPool = NewFileIdPool(wfs)
-        glog.V(0).Infof("file ID pool enabled for writeback cache (batch=%d)", wfs.fileIdPool.batchSize)
-    }
-
     return nil
 }
 
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 9a4b205d6..bdcb05d90 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -315,7 +315,7 @@ func (wfs *WFS) maybeMergeChunks(fileFullPath util.FullPath, compactedChunks []*
     for {
         n, readErr := io.ReadFull(reader, buf)
         if n > 0 {
-            chunk, uploadErr := saveFunc(bytes.NewReader(buf[:n]), "", offset, 0)
+            chunk, uploadErr := saveFunc(bytes.NewReader(buf[:n]), "", offset, 0, uint64(n))
             if uploadErr != nil {
                 return nil, uploadErr
             }
diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go
index aa411b04b..42d8c891d 100644
--- a/weed/mount/weedfs_write.go
+++ b/weed/mount/weedfs_write.go
@@ -8,13 +8,12 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/operation"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-    "github.com/seaweedfs/seaweedfs/weed/security"
     "github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
 
-    return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+    return func(reader io.Reader, filename string, offset int64, tsNs int64, _ uint64) (chunk *filer_pb.FileChunk, err error) {
         uploader, err := operation.NewUploader()
         if err != nil {
             return
@@ -35,37 +34,19 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
             return fileUrl
         }
 
-        var fileId string
-        var uploadResult *operation.UploadResult
-        var data []byte
-
-        if wfs.fileIdPool != nil {
-            // Use pre-allocated file ID from pool — avoids AssignVolume RPC.
-            fileId, uploadResult, err, data = uploader.UploadWithAssignFunc(
-                func() (string, string, security.EncodedJwt, error) {
-                    entry, getErr := wfs.fileIdPool.Get()
-                    if getErr != nil {
-                        return "", "", "", getErr
-                    }
-                    return entry.FileId, entry.Host, entry.Auth, nil
-                },
-                uploadOption, genFileUrlFn, reader,
-            )
-        } else {
-            fileId, uploadResult, err, data = uploader.UploadWithRetry(
-                wfs,
-                &filer_pb.AssignVolumeRequest{
-                    Count:       1,
-                    Replication: wfs.option.Replication,
-                    Collection:  wfs.option.Collection,
-                    TtlSec:      wfs.option.TtlSec,
-                    DiskType:    string(wfs.option.DiskType),
-                    DataCenter:  wfs.option.DataCenter,
-                    Path:        string(fullPath),
-                },
-                uploadOption, genFileUrlFn, reader,
-            )
-        }
+        fileId, uploadResult, err, data := uploader.UploadWithRetry(
+            wfs,
+            &filer_pb.AssignVolumeRequest{
+                Count:       1,
+                Replication: wfs.option.Replication,
+                Collection:  wfs.option.Collection,
+                TtlSec:      wfs.option.TtlSec,
+                DiskType:    string(wfs.option.DiskType),
+                DataCenter:  wfs.option.DataCenter,
+                Path:        string(fullPath),
+            },
+            uploadOption, genFileUrlFn, reader,
+        )
 
         if err != nil {
             glog.V(0).Infof("upload data %v: %v", filename, err)
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 4526a2c3a..1d94f5bab 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -52,16 +52,23 @@ type GetMasterFn func(ctx context.Context) pb.ServerAddress
 
 func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*FilePart, pref StoragePreference, usePublicUrl bool) ([]SubmitResult, error) {
     results := make([]SubmitResult, len(files))
+    var totalBytes int64
     for index, file := range files {
         results[index].FileName = file.FileName
+        totalBytes += file.FileSize
+    }
+    var avgBytes uint64
+    if n := len(files); n > 0 {
+        avgBytes = uint64((totalBytes + int64(n) - 1) / int64(n))
     }
     ar := &VolumeAssignRequest{
-        Count:       uint64(len(files)),
-        Replication: pref.Replication,
-        Collection:  pref.Collection,
-        DataCenter:  pref.DataCenter,
-        Ttl:         pref.Ttl,
-        DiskType:    pref.DiskType,
+        Count:            uint64(len(files)),
+        Replication:      pref.Replication,
+        Collection:       pref.Collection,
+        DataCenter:       pref.DataCenter,
+        Ttl:              pref.Ttl,
+        DiskType:         pref.DiskType,
+        ExpectedDataSize: avgBytes,
     }
     ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar)
     if err != nil {
@@ -150,11 +157,12 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
     var id string
     if fi.Pref.DataCenter != "" {
         ar := &VolumeAssignRequest{
-            Count:       uint64(chunks),
-            Replication: fi.Pref.Replication,
-            Collection:  fi.Pref.Collection,
-            Ttl:         fi.Pref.Ttl,
-            DiskType:    fi.Pref.DiskType,
+            Count:            uint64(chunks),
+            Replication:      fi.Pref.Replication,
+            Collection:       fi.Pref.Collection,
+            Ttl:              fi.Pref.Ttl,
+            DiskType:         fi.Pref.DiskType,
+            ExpectedDataSize: uint64(chunkSize),
         }
         ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
         if err != nil {
@@ -163,12 +171,18 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
     }
     for i := int64(0); i < chunks; i++ {
         if fi.Pref.DataCenter == "" {
+            remaining := fi.FileSize - i*chunkSize
+            thisChunk := chunkSize
+            if remaining < thisChunk {
+                thisChunk = remaining
+            }
             ar := &VolumeAssignRequest{
-                Count:       1,
-                Replication: fi.Pref.Replication,
-                Collection:  fi.Pref.Collection,
-                Ttl:         fi.Pref.Ttl,
-                DiskType:    fi.Pref.DiskType,
+                Count:            1,
+                Replication:      fi.Pref.Replication,
+                Collection:       fi.Pref.Collection,
+                Ttl:              fi.Pref.Ttl,
+                DiskType:         fi.Pref.DiskType,
+                ExpectedDataSize: uint64(thisChunk),
             }
             ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
             if err != nil {
diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go
index b0e27d7d6..57109b149 100644
--- a/weed/operation/upload_chunked.go
+++ b/weed/operation/upload_chunked.go
@@ -36,7 +36,7 @@ type ChunkedUploadOption struct {
     Jwt        security.EncodedJwt
     MimeType   string
     Cipher     bool // encrypt data on volume servers
-    AssignFunc func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
+    AssignFunc func(ctx context.Context, count int, expectedDataSize uint64) (*VolumeAssignRequest, *AssignResult, error)
     UploadFunc func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing
 }
 
@@ -146,7 +146,7 @@ uploadLoop:
 
         // Upload chunk in parallel goroutine
         wg.Add(1)
-        go func(offset int64, buf *bytes.Buffer) {
+        go func(offset int64, buf *bytes.Buffer, size int64) {
             defer func() {
                 chunkBufferPool.Put(buf)
                 <-bytesBufferLimitChan
@@ -154,7 +154,7 @@ uploadLoop:
             }()
 
             // Assign volume for this chunk
-            _, assignResult, assignErr := opt.AssignFunc(ctx, 1)
+            _, assignResult, assignErr := opt.AssignFunc(ctx, 1, uint64(size))
             if assignErr != nil {
                 uploadErrLock.Lock()
                 if uploadErr == nil {
@@ -235,7 +235,7 @@ uploadLoop:
             fileChunks = append(fileChunks, chunk)
             fileChunksLock.Unlock()
 
-        }(chunkOffset, bytesBuffer)
+        }(chunkOffset, bytesBuffer, dataSize)
 
         // Update offset for next chunk
         chunkOffset += dataSize
diff --git a/weed/operation/upload_chunked_test.go b/weed/operation/upload_chunked_test.go
index ec7ffbba2..4ea2965bc 100644
--- a/weed/operation/upload_chunked_test.go
+++ b/weed/operation/upload_chunked_test.go
@@ -20,7 +20,7 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
     uploadAttempts := 0
 
     // Create a mock assign function that succeeds for first chunk, then fails
-    assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+    assignFunc := func(ctx context.Context, count int, expectedDataSize uint64) (*VolumeAssignRequest, *AssignResult, error) {
         uploadAttempts++
 
         if uploadAttempts == 1 {
@@ -97,7 +97,7 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) {
     reader := bytes.NewReader(testData)
 
     // Mock assign function that always succeeds
-    assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+    assignFunc := func(ctx context.Context, count int, expectedDataSize uint64) (*VolumeAssignRequest, *AssignResult, error) {
         return nil, &AssignResult{
             Fid: "test-fid,1234",
             Url: "http://test-volume:8080",
@@ -187,7 +187,7 @@ func TestUploadReaderInChunksContextCancellation(t *testing.T) {
     // Cancel immediately to trigger cancellation handling
     cancel()
 
-    assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+    assignFunc := func(ctx context.Context, count int, expectedDataSize uint64) (*VolumeAssignRequest, *AssignResult, error) {
         return nil, &AssignResult{
             Fid: "test-fid,1234",
             Url: "http://test-volume:8080",
@@ -268,7 +268,7 @@
         failAfter: 10000, // Fail after 10KB
     }
 
-    assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+    assignFunc := func(ctx context.Context, count int, expectedDataSize uint64) (*VolumeAssignRequest, *AssignResult, error) {
         return nil, &AssignResult{
             Fid: "test-fid,1234",
             Url: "http://test-volume:8080",
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index e115a11d0..722bd352a 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -172,29 +172,6 @@ func (uploader *Uploader) uploadWithRetryData(assignFn func() (fileId string, ho
     return
 }
 
-// AssignFunc returns a file ID, host, and auth token for uploading.
-type AssignFunc func() (fileId string, host string, auth security.EncodedJwt, err error)
-
-// UploadWithAssignFunc uploads data using a caller-provided assign function.
-// This allows callers to use pre-allocated file IDs from a pool instead of
-// making an AssignVolume RPC per chunk.
-func (uploader *Uploader) UploadWithAssignFunc(assignFn AssignFunc, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
-    bytesReader, ok := reader.(*util.BytesReader)
-    if ok {
-        data = bytesReader.Bytes
-    } else {
-        data, err = io.ReadAll(reader)
-        if err != nil {
-            glog.V(0).Infof("upload read input %s: %v", uploadOption.SourceUrl, err)
-            err = fmt.Errorf("read input: %w", err)
-            return
-        }
-        glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl)
-    }
-    fileId, uploadResult, err = uploader.uploadWithRetryData(assignFn, uploadOption, genFileUrlFn, data)
-    return
-}
-
 // UploadWithRetry will retry both assigning volume request and uploading content
 // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
 func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
@@ -211,6 +188,12 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
         glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl)
     }
 
+    // Tell the master the real chunk size so its effectiveSize accounting
+    // doesn't fall back to the 1 MB DefaultNeedleSizeEstimate per fid.
+    if assignRequest.ExpectedDataSize == 0 {
+        assignRequest.ExpectedDataSize = uint64(len(data))
+    }
+
     fileId, uploadResult, err = uploader.uploadWithRetryData(func() (fileId string, host string, auth security.EncodedJwt, err error) {
         // grpc assign volume
         if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index cac24c946..cb4c6a5e8 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -956,7 +956,7 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin
 
     // Prepare chunk copy (assign new volume and get source URL)
     fileId := chunk.GetFileIdString()
-    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath)
+    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath, chunk.Size)
     if err != nil {
         return nil, err
     }
@@ -986,7 +986,7 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer
 
     // Prepare chunk copy (assign new volume and get source URL)
     fileId := originalChunk.GetFileIdString()
-    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath)
+    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath, rangeChunk.Size)
     if err != nil {
         return nil, err
     }
@@ -1015,16 +1015,17 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer
 }
 
 // assignNewVolume assigns a new volume for the chunk
-func (s3a *S3ApiServer) assignNewVolume(dstPath string) (*filer_pb.AssignVolumeResponse, error) {
+func (s3a *S3ApiServer) assignNewVolume(dstPath string, expectedDataSize uint64) (*filer_pb.AssignVolumeResponse, error) {
     var assignResult *filer_pb.AssignVolumeResponse
     err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
         resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
-            Count:       1,
-            Replication: "",
-            Collection:  "",
-            DiskType:    "",
-            DataCenter:  s3a.option.DataCenter,
-            Path:        dstPath,
+            Count:            1,
+            Replication:      "",
+            Collection:       "",
+            DiskType:         "",
+            DataCenter:       s3a.option.DataCenter,
+            Path:             dstPath,
+            ExpectedDataSize: expectedDataSize,
         })
         if err != nil {
             return fmt.Errorf("assign volume: %w", err)
@@ -1251,9 +1252,9 @@ func (s3a *S3ApiServer) setChunkFileId(chunk *filer_pb.FileChunk, assignResult *
 }
 
 // prepareChunkCopy prepares a chunk for copying by assigning a new volume and looking up the source URL
-func (s3a *S3ApiServer) prepareChunkCopy(sourceFileId, dstPath string) (*filer_pb.AssignVolumeResponse, string, error) {
+func (s3a *S3ApiServer) prepareChunkCopy(sourceFileId, dstPath string, expectedDataSize uint64) (*filer_pb.AssignVolumeResponse, string, error) {
     // Assign new volume
-    assignResult, err := s3a.assignNewVolume(dstPath)
+    assignResult, err := s3a.assignNewVolume(dstPath, expectedDataSize)
     if err != nil {
         return nil, "", fmt.Errorf("assign volume: %w", err)
     }
@@ -1447,7 +1448,7 @@ func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, dest
 
     // Prepare chunk copy (assign new volume and get source URL)
     fileId := chunk.GetFileIdString()
-    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath)
+    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath, chunk.Size)
     if err != nil {
         return nil, err
     }
@@ -1546,7 +1547,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
 
     // Prepare chunk copy (assign new volume and get source URL)
     fileId := chunk.GetFileIdString()
-    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath)
+    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath, chunk.Size)
     if err != nil {
         return nil, nil, err
     }
@@ -1827,7 +1828,7 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
 
     // Prepare chunk copy (assign new volume and get source URL)
     fileId := chunk.GetFileIdString()
-    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath)
+    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath, chunk.Size)
     if err != nil {
         return nil, err
     }
@@ -2169,7 +2170,7 @@ func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, cop
 
     // Prepare chunk copy (assign new volume and get source URL)
     fileId := chunk.GetFileIdString()
-    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath)
+    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath, chunk.Size)
     if err != nil {
         return nil, err
     }
@@ -2388,7 +2389,7 @@ func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChun
 
     // Prepare chunk copy (assign new volume and get source URL)
     fileId := chunk.GetFileIdString()
-    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath)
+    assignResult, srcUrl, err := s3a.prepareChunkCopy(fileId, dstPath, chunk.Size)
     if err != nil {
         return nil, err
     }
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index e8af0776a..381f6b4bb 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -454,16 +454,17 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
     }
 
     // Create assign function for chunked upload
-    assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
+    assignFunc := func(ctx context.Context, count int, expectedDataSize uint64) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
         var assignResult *filer_pb.AssignVolumeResponse
         err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
             resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
-                Count:       int32(count),
-                Replication: "",
-                Collection:  collection,
-                DiskType:    "",
-                DataCenter:  s3a.option.DataCenter,
-                Path:        filePath,
+                Count:            int32(count),
+                Replication:      "",
+                Collection:       collection,
+                DiskType:         "",
+                DataCenter:       s3a.option.DataCenter,
+                Path:             filePath,
+                ExpectedDataSize: expectedDataSize,
             })
             if err != nil {
                 return fmt.Errorf("assign volume: %w", err)
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 915b530d1..eaaf0018a 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -133,6 +133,14 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re
         chunkSize = (entry.Remote.RemoteSize + 999) / 1000
     }
 
+    // Now that chunkSize is known, hint it to the master so per-chunk
+    // assigns don't fall back to the 1 MB default estimate. Slightly over-
+    // estimates for the final partial chunk (< chunkSize) by design.
+    assignRequest.ExpectedDataSize = uint64(chunkSize)
+    if altRequest != nil {
+        altRequest.ExpectedDataSize = uint64(chunkSize)
+    }
+
     dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
 
     var chunks []*filer_pb.FileChunk
diff --git a/weed/server/filer_server_handlers_copy.go b/weed/server/filer_server_handlers_copy.go
index 50de72c48..2577c41b2 100644
--- a/weed/server/filer_server_handlers_copy.go
+++ b/weed/server/filer_server_handlers_copy.go
@@ -629,7 +629,7 @@ func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*fi
     // Save the manifest data as a new chunk
     saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
         // Assign a new file ID
-        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
+        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so, uint64(len(data)))
         if assignErr != nil {
             return nil, fmt.Errorf("failed to assign file ID for manifest: %w", assignErr)
         }
@@ -730,7 +730,7 @@ func (fs *FilerServer) batchLookupVolumeLocations(ctx context.Context, chunks []
 
 // streamCopyChunk copies a chunk using streaming to minimize memory usage
 func (fs *FilerServer) streamCopyChunk(ctx context.Context, srcChunk *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client, locations []operation.Location) (*filer_pb.FileChunk, error) {
     // Assign a new file ID for destination
-    fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
+    fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so, srcChunk.Size)
     if err != nil {
         return nil, fmt.Errorf("failed to assign new file ID: %w", err)
     }
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index ae9c46fb2..971fef973 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -35,7 +35,7 @@ type FilerPostResult struct {
     Error string `json:"error,omitempty"`
 }
 
-func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption, expectedDataSize uint64) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
     stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc()
     start := time.Now()
 
@@ -44,6 +44,10 @@ func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.Stor
     }()
 
     ar, altRequest := so.ToAssignRequests(1)
+    ar.ExpectedDataSize = expectedDataSize
+    if altRequest != nil {
+        altRequest.ExpectedDataSize = expectedDataSize
+    }
 
     // Use a context that ignores cancellation from the request context
     assignCtx := context.WithoutCancel(ctx)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 5e3a5f0e4..94af39b8d 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -350,13 +350,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 
 func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
 
-    return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
+    return func(reader io.Reader, name string, offset int64, tsNs int64, expectedDataSize uint64) (*filer_pb.FileChunk, error) {
         var fileId string
         var uploadResult *operation.UploadResult
 
         err := util.Retry("saveAsChunk", func() error {
             // assign one file id for one chunk
-            assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
+            assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so, expectedDataSize)
             if assignErr != nil {
                 return assignErr
             }
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 40a5ca4f5..256adf82c 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -208,7 +208,7 @@ func (fs *FilerServer) dataToChunkWithSSE(ctx context.Context, r *http.Request,
 
     err := util.Retry("filerDataToChunk", func() error {
         // assign one file id for one chunk
-        fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so)
+        fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so, uint64(len(data)))
         if uploadErr != nil {
             glog.V(4).InfofCtx(ctx, "retry later due to assign error: %v", uploadErr)
             stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
diff --git a/weed/server/filer_server_tus_handlers.go b/weed/server/filer_server_tus_handlers.go
index 3afa1b6af..224eb72e9 100644
--- a/weed/server/filer_server_tus_handlers.go
+++ b/weed/server/filer_server_tus_handlers.go
@@ -383,7 +383,7 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of
         chunkData := chunkBuf[:n]
 
         // Assign file ID from master for this sub-chunk
-        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
+        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so, uint64(n))
         if assignErr != nil {
             uploadErr = fmt.Errorf("assign volume: %w", assignErr)
             break
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 700441883..36f23cb44 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -402,7 +402,7 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
     return fs.stat(ctx, name)
 }
 
-func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64, _ uint64) (chunk *filer_pb.FileChunk, err error) {
     // Delegate to the shared filer-gateway helper so WebDAV, NFS, and
     // any future filer-backed protocols go through one implementation of
     // AssignVolume + volume-server upload.
@@ -449,7 +449,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
 
     f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
         var chunk *filer_pb.FileChunk
-        chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
+        chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano(), uint64(len(data)))
         if flushErr != nil {
             if f.entry.Attributes.Mtime == 0 {