From e8a8449553857553912bb8d47871db3c8aeb8776 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 11 Apr 2026 20:02:42 -0700 Subject: [PATCH] feat(mount): pre-allocate file IDs in pool for writeback cache mode (#9038) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(mount): pre-allocate file IDs in pool for writeback cache mode When writeback caching is enabled, chunk uploads no longer block on a per-chunk AssignVolume RPC. Instead, a FileIdPool pre-allocates file IDs in batches using a single AssignVolume(Count=N, ExpectedDataSize=ChunkSize) call and hands them out instantly to upload workers. Pool size is 2x ConcurrentWriters, refilled in background when it drops below ConcurrentWriters. Entries expire after 25s to respect JWT TTL. Sequential needle keys are generated from the base file ID returned by the master, so one Assign RPC produces N usable IDs. This cuts per-chunk upload latency from 2 RTTs (assign + upload) to 1 RTT (upload only), with the assign cost amortized across the batch. * test: add benchmarks for file ID pool vs direct assign Benchmarks measure: - Pool Get vs Direct AssignVolume at various simulated latencies - Batch assign scaling (Count=1 through Count=32) - Concurrent pool access with 1-64 workers Results on Apple M4: - Pool Get: constant ~3ns regardless of assign latency - Batch=16: 15.7x more IDs/sec than individual assigns - 64 concurrent workers: 19M IDs/sec throughput * fix(mount): address review feedback on file ID pool 1. Fix race condition in Get(): use sync.Cond so callers wait for an in-flight refill instead of returning an error when the pool is empty. 2. Match default pool size to async flush worker count (128, not 16) when ConcurrentWriters is unset. 3. Add logging to UploadWithAssignFunc for consistency with UploadWithRetry. 4. Document that pooled assigns omit the Path field, bypassing path-based storage rules (filer.conf). This is an intentional tradeoff for writeback cache performance. 5. Fix flaky expiry test: widen time margin from 50ms to 1s. 6. Add TestFileIdPoolGetWaitsForRefill to verify concurrent waiters. * fix(mount): use individual Count=1 assigns to get per-fid JWTs The master generates one JWT per AssignResponse, bound to the base file ID (master_grpc_server_assign.go:158). The volume server validates that the JWT's Fid matches the upload exactly (volume_server_handlers.go:367). Using Count=N and deriving sequential IDs would fail this check. Switch to individual Count=1 RPCs over a single gRPC connection. This still amortizes connection overhead while getting a correct per-fid JWT for each entry. Partial batches are accepted if some requests fail. Remove unused needle import now that sequential ID generation is gone. * fix(mount): separate pprof from FUSE protocol debug logging The -debug flag was enabling both the pprof HTTP server and the noisy go-fuse protocol logging (rx/tx lines for every FUSE operation). This makes profiling impractical as the log output dominates. Split into two flags: - -debug: enables pprof HTTP server only (for profiling) - -debug.fuse: enables raw FUSE protocol request/response logging * perf(mount): replace LevelDB read+write with in-memory overlay for dir mtime Profile showed TouchDirMtimeCtime at 0.22s — every create/rename/unlink in a directory did a LevelDB FindEntry (read) + UpdateEntry (write) just to bump the parent dir's mtime/ctime. Replace with an in-memory map (same pattern as existing atime overlay): - touchDirMtimeCtimeLocal now stores inode→timestamp in dirMtimeMap - applyInMemoryDirMtime overlays onto GetAttr/Lookup output - No LevelDB I/O on the mutation hot path The overlay only advances timestamps forward (max of stored vs overlay), so stale entries are harmless. Map is bounded at 8192 entries. * perf(mount): skip self-originated metadata subscription events in writeback mode With writeback caching, this mount is the single writer. All local mutations are already applied to the local meta cache (via applyLocalMetadataEvent or direct InsertEntry). The filer subscription then delivers the same event back, causing redundant work: proto.Clone, enqueue to apply loop, dedup ring check, and sometimes redundant LevelDB writes when the dedup ring misses (deferred creates). Check EventNotification.Signatures against selfSignature and skip events that originated from this mount. This eliminates the redundant processing for every self-originated mutation. * perf(mount): increase kernel FUSE cache TTL in writeback cache mode With writeback caching, this mount is the single writer — the local meta cache is authoritative. Increase EntryValid and AttrValid from 1s to 10s so the kernel doesn't re-issue Lookup/GetAttr for every path component and stat call. This reduces FUSE /dev/fuse round-trips which dominate the profile at 38% of CPU (syscall.rawsyscalln). Each saved round-trip eliminates a kernel→userspace→kernel transition. Normal (non-writeback) mode retains the 1s TTL for multi-mount consistency. --- weed/command/mount.go | 2 + weed/command/mount_std.go | 2 +- weed/mount/fileids_pool.go | 181 +++++++++++ weed/mount/fileids_pool_test.go | 280 ++++++++++++++++++ weed/mount/meta_cache/meta_cache_subscribe.go | 15 +- weed/mount/weedfs.go | 23 +- weed/mount/weedfs_attr.go | 63 +++- weed/mount/weedfs_dir_lookup.go | 7 +- weed/mount/weedfs_write.go | 74 +++-- weed/operation/upload_content.go | 23 ++ 10 files changed, 618 insertions(+), 52 deletions(-) create mode 100644 weed/mount/fileids_pool.go create mode 100644 weed/mount/fileids_pool_test.go diff --git a/weed/command/mount.go b/weed/command/mount.go index 768fd8f37..aa0589c7c 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -34,6 +34,7 @@ type MountOptions struct { includeSystemEntries *bool debug *bool debugPort *int + debugFuse *bool localSocket *string disableXAttr *bool extraOptions []string @@ -109,6 +110,7 @@ func init() { mountOptions.includeSystemEntries = cmdMount.Flag.Bool("includeSystemEntries", false, "show filer system entries (e.g. /topics, /etc) in directory listings") mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging") + mountOptions.debugFuse = cmdMount.Flag.Bool("debug.fuse", false, "log raw FUSE protocol requests and responses") mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-.sock") mountOptions.disableXAttr = cmdMount.Flag.Bool("disableXAttr", false, "disable xattr") mountOptions.hasAutofs = cmdMount.Flag.Bool("autofs", false, "ignore autofs mounted on the same mountpoint (useful when systemd.automount and autofs is used)") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index f297e2cce..ef23e8210 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -251,7 +251,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { Name: "seaweedfs", SingleThreaded: false, DisableXAttrs: *option.disableXAttr, - Debug: *option.debug, + Debug: *option.debugFuse, EnableLocks: true, ExplicitDataCacheControl: false, DirectMount: true, diff --git a/weed/mount/fileids_pool.go b/weed/mount/fileids_pool.go new file mode 100644 index 000000000..391b665a9 --- /dev/null +++ b/weed/mount/fileids_pool.go @@ -0,0 +1,181 @@ +package mount + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" +) + +// FileIdEntry holds a pre-allocated file ID from the filer/master, ready for +// immediate use by an upload worker without an AssignVolume round-trip. +type FileIdEntry struct { + FileId string + Host string // volume server address (already adjusted for access mode) + Auth security.EncodedJwt + Time time.Time +} + +// FileIdPool pre-allocates file IDs in batches so that chunk uploads can grab +// one instantly instead of blocking on an AssignVolume RPC per chunk. +// +// The pool is refilled in the background when it drops below a low-water mark. +// All IDs are allocated with the mount's global (replication, collection, ttl, +// diskType, dataCenter) parameters. Path-based storage rules (filer.conf) are +// NOT applied to pooled IDs since the pool allocates ahead of any specific file +// path. This is an intentional tradeoff for writeback cache performance. +type FileIdPool struct { + wfs *WFS + + mu sync.Mutex + cond *sync.Cond + entries []FileIdEntry // available pre-allocated IDs + filling bool // true when a background refill is in progress + + poolSize int // target pool capacity + batchSize int // how many IDs to request per Assign RPC + lowWater int // refill trigger threshold + maxAge time.Duration +} + +func NewFileIdPool(wfs *WFS) *FileIdPool { + concurrency := wfs.option.ConcurrentWriters + if concurrency <= 0 { + concurrency = 128 // match default async flush worker count + } + pool := &FileIdPool{ + wfs: wfs, + poolSize: concurrency * 2, + batchSize: concurrency, + lowWater: concurrency, + maxAge: 25 * time.Second, // conservative; JWT TTL is typically 30s+ + } + pool.cond = sync.NewCond(&pool.mu) + return pool +} + +// Get returns a pre-allocated file ID entry. If the pool is empty and a refill +// is in progress, callers wait for it to complete rather than failing. Returns +// an error only if the Assign RPC fails. +func (p *FileIdPool) Get() (FileIdEntry, error) { + p.mu.Lock() + defer p.mu.Unlock() + + for { + p.evictExpired() + + if len(p.entries) > 0 { + entry := p.entries[0] + p.entries = p.entries[1:] + if len(p.entries) < p.lowWater && !p.filling { + p.filling = true + go p.doRefill() + } + return entry, nil + } + + // Pool empty. + if p.filling { + // Wait for the in-flight refill to complete. + p.cond.Wait() + continue + } + + // No refill in progress — start one synchronously. + p.filling = true + p.mu.Unlock() + entries, err := p.assignBatch(p.batchSize) + p.mu.Lock() + p.filling = false + p.cond.Broadcast() + + if err != nil { + return FileIdEntry{}, fmt.Errorf("fileIdPool: %w", err) + } + p.entries = append(p.entries, entries...) + // Loop back to pop from entries. + } +} + +func (p *FileIdPool) evictExpired() { + cutoff := time.Now().Add(-p.maxAge) + i := 0 + for i < len(p.entries) && p.entries[i].Time.Before(cutoff) { + i++ + } + if i > 0 { + p.entries = p.entries[i:] + } +} + +// doRefill runs in a background goroutine to refill the pool. +func (p *FileIdPool) doRefill() { + entries, err := p.assignBatch(p.batchSize) + if err != nil { + glog.V(1).Infof("fileIdPool refill: %v", err) + } + + p.mu.Lock() + if err == nil { + p.entries = append(p.entries, entries...) + } + p.filling = false + p.cond.Broadcast() + p.mu.Unlock() +} + +// assignBatch requests `count` file IDs from the filer using individual +// Count=1 RPCs over a single gRPC connection. Each response includes a +// per-fid JWT, so uploads work correctly when JWT security is enabled. +// +// We use individual requests instead of Count=N because the master generates +// one JWT for the base file ID only (master_grpc_server_assign.go:158), and +// the volume server validates that the JWT's Fid matches the upload's file ID +// exactly (volume_server_handlers.go:367). Sequential IDs derived from a +// Count=N response would fail this check. +// +// Note: the AssignVolumeRequest intentionally omits the Path field. Pooled IDs +// use the mount's global storage parameters, not per-path rules from filer.conf +// (detectStorageOption / MatchStorageRule). This is a writeback cache tradeoff. +func (p *FileIdPool) assignBatch(count int) ([]FileIdEntry, error) { + var entries []FileIdEntry + err := p.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + now := time.Now() + req := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: p.wfs.option.Replication, + Collection: p.wfs.option.Collection, + TtlSec: p.wfs.option.TtlSec, + DiskType: string(p.wfs.option.DiskType), + DataCenter: p.wfs.option.DataCenter, + ExpectedDataSize: uint64(p.wfs.option.ChunkSizeLimit), + } + for i := 0; i < count; i++ { + resp, assignErr := client.AssignVolume(context.Background(), req) + if assignErr != nil { + if len(entries) > 0 { + break // partial batch is fine + } + return assignErr + } + if resp.Error != "" { + if len(entries) > 0 { + break + } + return fmt.Errorf("assign: %s", resp.Error) + } + entries = append(entries, FileIdEntry{ + FileId: resp.FileId, + Host: p.wfs.AdjustedUrl(resp.Location), + Auth: security.EncodedJwt(resp.Auth), + Time: now, + }) + } + return nil + }) + return entries, err +} diff --git a/weed/mount/fileids_pool_test.go b/weed/mount/fileids_pool_test.go new file mode 100644 index 000000000..400c4889a --- /dev/null +++ b/weed/mount/fileids_pool_test.go @@ -0,0 +1,280 @@ +package mount + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +// mockFilerClient simulates AssignVolume RPCs with configurable latency. +type mockFilerClient struct { + latency time.Duration + nextVolId uint32 + nextKey uint64 + mu sync.Mutex +} + +func (m *mockFilerClient) AssignVolume(_ context.Context, req *filer_pb.AssignVolumeRequest, _ ...grpc.CallOption) (*filer_pb.AssignVolumeResponse, error) { + if m.latency > 0 { + time.Sleep(m.latency) + } + m.mu.Lock() + defer m.mu.Unlock() + + count := int(req.Count) + if count <= 0 { + count = 1 + } + vid := needle.VolumeId(m.nextVolId + 1) + key := m.nextKey + 1000 // start at a non-zero key for valid hex encoding + m.nextKey += uint64(count) + m.nextVolId++ + + fid := needle.NewFileId(vid, key, 0x12345678) + return &filer_pb.AssignVolumeResponse{ + FileId: fid.String(), + Count: int32(count), + Auth: "test-jwt-token", + Location: &filer_pb.Location{ + Url: "127.0.0.1:8080", + PublicUrl: "127.0.0.1:8080", + GrpcPort: 18080, + }, + }, nil +} + +// TestFileIdPoolSequentialIds verifies that batch assignment generates +// correct sequential file IDs from a single AssignVolume response. +func TestFileIdPoolSequentialIds(t *testing.T) { + mock := &mockFilerClient{} + resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 5}) + if err != nil { + t.Fatal(err) + } + + baseFid, parseErr := needle.ParseFileIdFromString(resp.FileId) + if parseErr != nil { + t.Fatal(parseErr) + } + + baseKey := uint64(baseFid.Key) + for i := 0; i < int(resp.Count); i++ { + fid := needle.NewFileId(baseFid.VolumeId, baseKey+uint64(i), uint32(baseFid.Cookie)) + t.Logf("ID %d: %s", i, fid.String()) + parsed, err := needle.ParseFileIdFromString(fid.String()) + if err != nil { + t.Fatalf("Failed to parse sequential ID %d: %v", i, err) + } + if uint64(parsed.Key) != baseKey+uint64(i) { + t.Fatalf("ID %d: expected key %d, got %d", i, baseKey+uint64(i), uint64(parsed.Key)) + } + } +} + +// TestFileIdPoolExpiry verifies that expired entries are evicted. +func TestFileIdPoolExpiry(t *testing.T) { + pool := &FileIdPool{ + maxAge: time.Second, + } + pool.cond = sync.NewCond(&pool.mu) + now := time.Now() + pool.entries = []FileIdEntry{ + {FileId: "1,old", Time: now.Add(-2 * time.Second)}, + {FileId: "2,old", Time: now.Add(-2 * time.Second)}, + {FileId: "3,fresh", Time: now}, + } + pool.evictExpired() + if len(pool.entries) != 1 { + t.Fatalf("expected 1 entry after eviction, got %d", len(pool.entries)) + } + if pool.entries[0].FileId != "3,fresh" { + t.Fatalf("expected fresh entry, got %s", pool.entries[0].FileId) + } +} + +// TestFileIdPoolGetWaitsForRefill verifies that concurrent Get() calls wait +// for an in-flight refill instead of returning an error. +func TestFileIdPoolGetWaitsForRefill(t *testing.T) { + pool := &FileIdPool{ + poolSize: 10, + batchSize: 5, + lowWater: 3, + maxAge: 30 * time.Second, + } + pool.cond = sync.NewCond(&pool.mu) + + // Simulate a slow refill in progress. + pool.filling = true + done := make(chan struct{}) + go func() { + // After a short delay, deliver entries and signal. + time.Sleep(10 * time.Millisecond) + pool.mu.Lock() + now := time.Now() + for i := 0; i < 5; i++ { + pool.entries = append(pool.entries, FileIdEntry{ + FileId: fmt.Sprintf("1,%x12345678", 1000+i), + Host: "127.0.0.1:8080", + Auth: "jwt", + Time: now, + }) + } + pool.filling = false + pool.cond.Broadcast() + pool.mu.Unlock() + close(done) + }() + + // Get() should wait for the refill, not return an error. + entry, err := pool.Get() + if err != nil { + t.Fatalf("Get() returned error while refill in progress: %v", err) + } + if entry.FileId == "" { + t.Fatal("Get() returned empty entry") + } + <-done +} + +// BenchmarkPoolGetVsDirectAssign measures the latency difference between +// getting a file ID from the pool vs a direct (simulated) AssignVolume RPC. +func BenchmarkPoolGetVsDirectAssign(b *testing.B) { + assignLatencies := []time.Duration{ + 0, + 100 * time.Microsecond, + 1 * time.Millisecond, + 5 * time.Millisecond, + } + + for _, latency := range assignLatencies { + name := "latency=" + latency.String() + + b.Run("DirectAssign/"+name, func(b *testing.B) { + mock := &mockFilerClient{latency: latency} + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 1}) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run("PoolGet/"+name, func(b *testing.B) { + pool := &FileIdPool{ + maxAge: 30 * time.Second, + lowWater: 1000000, // disable background refill + } + pool.cond = sync.NewCond(&pool.mu) + // Pre-fill with a fixed-size pool; refill under StopTimer when depleted. + const preload = 4096 + refill := func() { + now := time.Now() + for i := 0; i < preload; i++ { + pool.entries = append(pool.entries, FileIdEntry{ + FileId: fmt.Sprintf("1,%x12345678", i), + Host: "127.0.0.1:8080", + Auth: security.EncodedJwt("test-jwt"), + Time: now, + }) + } + } + refill() + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.mu.Lock() + if len(pool.entries) == 0 { + pool.mu.Unlock() + b.StopTimer() + refill() + b.StartTimer() + pool.mu.Lock() + } + _ = pool.entries[0] + pool.entries = pool.entries[1:] + pool.mu.Unlock() + } + }) + } +} + +// BenchmarkConcurrentPoolGet measures pool throughput under concurrent access. +func BenchmarkConcurrentPoolGet(b *testing.B) { + for _, workers := range []int{1, 4, 16, 64} { + b.Run(fmt.Sprintf("workers=%d", workers), func(b *testing.B) { + pool := &FileIdPool{ + maxAge: 30 * time.Second, + lowWater: 1000000, + } + pool.cond = sync.NewCond(&pool.mu) + // Pre-fill + now := time.Now() + total := b.N*workers + 1000 + pool.entries = make([]FileIdEntry, total) + for i := range pool.entries { + pool.entries[i] = FileIdEntry{ + FileId: fmt.Sprintf("1,%x12345678", i+1000), + Host: "127.0.0.1:8080", + Auth: security.EncodedJwt("test-jwt"), + Time: now, + } + } + + var ops atomic.Int64 + b.ResetTimer() + + var wg sync.WaitGroup + perWorker := b.N + for w := 0; w < workers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < perWorker; i++ { + pool.mu.Lock() + if len(pool.entries) > 0 { + _ = pool.entries[0] + pool.entries = pool.entries[1:] + ops.Add(1) + } + pool.mu.Unlock() + } + }() + } + wg.Wait() + b.ReportMetric(float64(ops.Load())/b.Elapsed().Seconds(), "ids/sec") + }) + } +} + +// BenchmarkBatchAssign measures the cost of batch vs individual assign RPCs. +func BenchmarkBatchAssign(b *testing.B) { + for _, batchSize := range []int{1, 8, 16, 32} { + b.Run(fmt.Sprintf("batch=%d", batchSize), func(b *testing.B) { + mock := &mockFilerClient{latency: 1 * time.Millisecond} + totalIds := 0 + b.ResetTimer() + for i := 0; i < b.N; i++ { + resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: int32(batchSize)}) + if err != nil { + b.Fatal(err) + } + + baseFid, _ := needle.ParseFileIdFromString(resp.FileId) + baseKey := uint64(baseFid.Key) + for j := 0; j < int(resp.Count); j++ { + _ = needle.NewFileId(baseFid.VolumeId, baseKey+uint64(j), uint32(baseFid.Cookie)).String() + totalIds++ + } + } + b.ReportMetric(float64(totalIds)/b.Elapsed().Seconds(), "ids/sec") + }) + } +} diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index 12e9d4a77..d2f472468 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -42,7 +42,7 @@ func mergeProcessors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse } } -func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, onRetry func(lastTsNs int64, err error), followers ...*MetadataFollower) error { +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, skipSelfEvents bool, onRetry func(lastTsNs int64, err error), followers ...*MetadataFollower) error { var prefixes []string for _, follower := range followers { @@ -50,11 +50,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { - // Let all events (including self-originated ones) flow through the - // applier so that the directory-build buffering and dedup logic - // can handle them consistently. The dedupRing in - // applyMetadataResponseNow catches duplicates that were already - // applied locally via applyLocalMetadataEvent. + if skipSelfEvents && resp.EventNotification != nil { + for _, sig := range resp.EventNotification.Signatures { + if sig == selfSignature { + glog.V(4).Infof("skip self-originated event %s", resp.Directory) + return nil + } + } + } return mc.ApplyMetadataResponse(context.Background(), resp, SubscriberMetadataResponseApplyOptions) } diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index a698a3a8f..af12cb41f 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -131,9 +131,14 @@ type WFS struct { refreshingDirs map[util.FullPath]struct{} atimeMu sync.Mutex atimeMap map[uint64]time.Time // inode -> atime, in-memory only, bounded + dirMtimeMu sync.Mutex + dirMtimeMap map[uint64]time.Time // inode -> mtime/ctime, in-memory overlay for dirs + entryValidSec uint64 // kernel FUSE entry cache TTL in seconds + attrValidSec uint64 // kernel FUSE attr cache TTL in seconds dirHotWindow time.Duration dirHotThreshold int dirIdleEvict time.Duration + fileIdPool *FileIdPool // asyncFlushWg tracks pending background flush work items for writebackCache mode. // Must be waited on before unmount cleanup to prevent data loss. @@ -211,11 +216,22 @@ func NewSeaweedFileSystem(option *Option) *WFS { posixLocks: NewPosixLockTable(), refreshingDirs: make(map[util.FullPath]struct{}), atimeMap: make(map[uint64]time.Time, 8192), + dirMtimeMap: make(map[uint64]time.Time, 1024), + entryValidSec: 1, + attrValidSec: 1, dirHotWindow: dirHotWindow, dirHotThreshold: dirHotThreshold, dirIdleEvict: dirIdleEvict, } + // With writeback caching, this mount is the single writer. Increase kernel + // FUSE cache TTLs so the kernel doesn't re-issue Lookup/GetAttr for every + // path component and stat — the local meta cache is authoritative. + if option.WritebackCache { + wfs.entryValidSec = 10 + wfs.attrValidSec = 10 + } + if option.EnableDistributedLock && !option.WritebackCache && len(option.FilerAddresses) > 0 { wfs.lockClient = cluster.NewLockClient(option.GrpcDialOption, option.FilerAddresses[0]) glog.V(0).Infof("distributed lock manager enabled for mount") @@ -341,7 +357,7 @@ func (wfs *WFS) StartBackgroundTasks() error { } startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), func(lastTsNs int64, err error) { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), wfs.option.WritebackCache, func(lastTsNs int64, err error) { glog.Warningf("meta events follow retry from %v: %v", time.Unix(0, lastTsNs), err) if deleteErr := wfs.metaCache.DeleteFolderChildren(context.Background(), util.FullPath(wfs.option.FilerMountRootPath)); deleteErr != nil { glog.Warningf("meta cache cleanup failed: %v", deleteErr) @@ -352,6 +368,11 @@ func (wfs *WFS) StartBackgroundTasks() error { go wfs.loopFlushDirtyMetadata() go wfs.loopEvictIdleDirCache() + if wfs.option.WritebackCache { + wfs.fileIdPool = NewFileIdPool(wfs) + glog.V(0).Infof("file ID pool enabled for writeback cache (batch=%d)", wfs.fileIdPool.batchSize) + } + return nil } diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index 6431ae49a..d1438f970 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -1,7 +1,6 @@ package mount import ( - "context" "os" "syscall" "time" @@ -26,16 +25,19 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse inode := input.NodeId path, _, entry, status := wfs.maybeReadEntry(inode) if status == fuse.OK { - out.AttrValid = 1 + out.AttrValid = wfs.attrValidSec wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) wfs.applyInMemoryAtime(&out.Attr, inode) - if entry.IsDirectory && wfs.option.PosixDirNlink { - wfs.applyDirNlink(&out.Attr, path) + if entry.IsDirectory { + wfs.applyInMemoryDirMtime(&out.Attr, inode) + if wfs.option.PosixDirNlink { + wfs.applyDirNlink(&out.Attr, path) + } } return status } else { if fh, found := wfs.fhMap.FindFileHandle(inode); found { - out.AttrValid = 1 + out.AttrValid = wfs.attrValidSec // Use shared lock to prevent race with Write operations fhActiveLock := wfs.fhLockTable.AcquireLock("GetAttr", fh.fh, util.SharedLock) wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true) @@ -155,7 +157,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse entry.Attributes.Ctime = now.Unix() entry.Attributes.CtimeNs = int32(now.Nanosecond()) - out.AttrValid = 1 + out.AttrValid = wfs.attrValidSec size, includeSize := input.GetSize() if includeSize { out.Attr.Size = size @@ -262,16 +264,16 @@ func (wfs *WFS) setAttrByFilerEntry(out *fuse.Attr, inode uint64, entry *filer.E func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb.Entry) { out.NodeId = inode out.Generation = 1 - out.EntryValid = 1 - out.AttrValid = 1 + out.EntryValid = wfs.entryValidSec + out.AttrValid = wfs.attrValidSec wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) } func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) { out.NodeId = inode out.Generation = 1 - out.EntryValid = 1 - out.AttrValid = 1 + out.EntryValid = wfs.entryValidSec + out.AttrValid = wfs.attrValidSec wfs.setAttrByFilerEntry(&out.Attr, inode, entry) } @@ -302,15 +304,46 @@ func (wfs *WFS) touchDirMtimeCtime(dirPath util.FullPath) { wfs.saveEntry(dirPath, dirEntry) } -// touchDirMtimeCtimeLocal updates a directory's mtime and ctime directly -// in the local metadata cache, without a filer RPC. +// touchDirMtimeCtimeLocal updates a directory's mtime and ctime in an in-memory +// overlay, avoiding LevelDB reads and writes entirely. The overlay is applied +// by applyInMemoryDirMtime when GetAttr/Lookup reads the directory's attributes. func (wfs *WFS) touchDirMtimeCtimeLocal(dirPath util.FullPath) { - now := time.Now() - if err := wfs.metaCache.TouchDirMtimeCtime(context.Background(), dirPath, now); err != nil { - glog.V(3).Infof("touchDirMtimeCtimeLocal %s: %v", dirPath, err) + if inode, found := wfs.inodeToPath.GetInode(dirPath); found { + wfs.setDirMtime(inode, time.Now()) } } +const dirMtimeMapMaxSize = 8192 + +func (wfs *WFS) setDirMtime(inode uint64, t time.Time) { + wfs.dirMtimeMu.Lock() + defer wfs.dirMtimeMu.Unlock() + if len(wfs.dirMtimeMap) >= dirMtimeMapMaxSize { + for k := range wfs.dirMtimeMap { + delete(wfs.dirMtimeMap, k) + break + } + } + wfs.dirMtimeMap[inode] = t +} + +// applyInMemoryDirMtime overlays the in-memory mtime/ctime onto fuse.Attr +// for directories that had recent child mutations. +func (wfs *WFS) applyInMemoryDirMtime(out *fuse.Attr, inode uint64) { + wfs.dirMtimeMu.Lock() + if t, ok := wfs.dirMtimeMap[inode]; ok { + sec := uint64(t.Unix()) + nsec := uint32(t.Nanosecond()) + if sec > out.Mtime || (sec == out.Mtime && nsec > out.Mtimensec) { + out.Mtime = sec + out.Mtimensec = nsec + out.Ctime = sec + out.Ctimensec = nsec + } + } + wfs.dirMtimeMu.Unlock() +} + const atimeMapMaxSize = 8192 // setAtime stores an in-memory atime for an inode. The map is bounded; diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go index 2c29d907a..1b5253354 100644 --- a/weed/mount/weedfs_dir_lookup.go +++ b/weed/mount/weedfs_dir_lookup.go @@ -44,8 +44,11 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin wfs.outputFilerEntry(out, inode, localEntry) - if localEntry.IsDirectory() && wfs.option.PosixDirNlink { - wfs.applyDirNlink(&out.Attr, fullFilePath) + if localEntry.IsDirectory() { + wfs.applyInMemoryDirMtime(&out.Attr, inode) + if wfs.option.PosixDirNlink { + wfs.applyDirNlink(&out.Attr, fullFilePath) + } } return fuse.OK diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go index de8a756ce..aa411b04b 100644 --- a/weed/mount/weedfs_write.go +++ b/weed/mount/weedfs_write.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -19,33 +20,52 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun return } - fileId, uploadResult, err, data := uploader.UploadWithRetry( - wfs, - &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: wfs.option.Replication, - Collection: wfs.option.Collection, - TtlSec: wfs.option.TtlSec, - DiskType: string(wfs.option.DiskType), - DataCenter: wfs.option.DataCenter, - Path: string(fullPath), - }, - &operation.UploadOption{ - Filename: filename, - Cipher: wfs.option.Cipher, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - }, - func(host, fileId string) string { - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - if wfs.option.VolumeServerAccess == "filerProxy" { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) - } - return fileUrl - }, - reader, - ) + uploadOption := &operation.UploadOption{ + Filename: filename, + Cipher: wfs.option.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + } + genFileUrlFn := func(host, fileId string) string { + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if wfs.option.VolumeServerAccess == "filerProxy" { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) + } + return fileUrl + } + + var fileId string + var uploadResult *operation.UploadResult + var data []byte + + if wfs.fileIdPool != nil { + // Use pre-allocated file ID from pool — avoids AssignVolume RPC. + fileId, uploadResult, err, data = uploader.UploadWithAssignFunc( + func() (string, string, security.EncodedJwt, error) { + entry, getErr := wfs.fileIdPool.Get() + if getErr != nil { + return "", "", "", getErr + } + return entry.FileId, entry.Host, entry.Auth, nil + }, + uploadOption, genFileUrlFn, reader, + ) + } else { + fileId, uploadResult, err, data = uploader.UploadWithRetry( + wfs, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: wfs.option.Replication, + Collection: wfs.option.Collection, + TtlSec: wfs.option.TtlSec, + DiskType: string(wfs.option.DiskType), + DataCenter: wfs.option.DataCenter, + Path: string(fullPath), + }, + uploadOption, genFileUrlFn, reader, + ) + } if err != nil { glog.V(0).Infof("upload data %v: %v", filename, err) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 47ae230c5..e115a11d0 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -172,6 +172,29 @@ func (uploader *Uploader) uploadWithRetryData(assignFn func() (fileId string, ho return } +// AssignFunc returns a file ID, host, and auth token for uploading. +type AssignFunc func() (fileId string, host string, auth security.EncodedJwt, err error) + +// UploadWithAssignFunc uploads data using a caller-provided assign function. +// This allows callers to use pre-allocated file IDs from a pool instead of +// making an AssignVolume RPC per chunk. +func (uploader *Uploader) UploadWithAssignFunc(assignFn AssignFunc, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { + bytesReader, ok := reader.(*util.BytesReader) + if ok { + data = bytesReader.Bytes + } else { + data, err = io.ReadAll(reader) + if err != nil { + glog.V(0).Infof("upload read input %s: %v", uploadOption.SourceUrl, err) + err = fmt.Errorf("read input: %w", err) + return + } + glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl) + } + fileId, uploadResult, err = uploader.uploadWithRetryData(assignFn, uploadOption, genFileUrlFn, data) + return +} + // UploadWithRetry will retry both assigning volume request and uploading content // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {