diff --git a/weed/command/mount.go b/weed/command/mount.go index 768fd8f37..aa0589c7c 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -34,6 +34,7 @@ type MountOptions struct { includeSystemEntries *bool debug *bool debugPort *int + debugFuse *bool localSocket *string disableXAttr *bool extraOptions []string @@ -109,6 +110,7 @@ func init() { mountOptions.includeSystemEntries = cmdMount.Flag.Bool("includeSystemEntries", false, "show filer system entries (e.g. /topics, /etc) in directory listings") mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging") + mountOptions.debugFuse = cmdMount.Flag.Bool("debug.fuse", false, "log raw FUSE protocol requests and responses") mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-.sock") mountOptions.disableXAttr = cmdMount.Flag.Bool("disableXAttr", false, "disable xattr") mountOptions.hasAutofs = cmdMount.Flag.Bool("autofs", false, "ignore autofs mounted on the same mountpoint (useful when systemd.automount and autofs is used)") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index f297e2cce..ef23e8210 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -251,7 +251,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { Name: "seaweedfs", SingleThreaded: false, DisableXAttrs: *option.disableXAttr, - Debug: *option.debug, + Debug: *option.debugFuse, EnableLocks: true, ExplicitDataCacheControl: false, DirectMount: true, diff --git a/weed/mount/fileids_pool.go b/weed/mount/fileids_pool.go new file mode 100644 index 000000000..391b665a9 --- /dev/null +++ b/weed/mount/fileids_pool.go @@ -0,0 +1,181 @@ +package mount + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" +) + +// FileIdEntry holds a pre-allocated file ID from the filer/master, ready for +// immediate use by an upload worker without an AssignVolume round-trip. +type FileIdEntry struct { + FileId string + Host string // volume server address (already adjusted for access mode) + Auth security.EncodedJwt + Time time.Time +} + +// FileIdPool pre-allocates file IDs in batches so that chunk uploads can grab +// one instantly instead of blocking on an AssignVolume RPC per chunk. +// +// The pool is refilled in the background when it drops below a low-water mark. +// All IDs are allocated with the mount's global (replication, collection, ttl, +// diskType, dataCenter) parameters. Path-based storage rules (filer.conf) are +// NOT applied to pooled IDs since the pool allocates ahead of any specific file +// path. This is an intentional tradeoff for writeback cache performance. +type FileIdPool struct { + wfs *WFS + + mu sync.Mutex + cond *sync.Cond + entries []FileIdEntry // available pre-allocated IDs + filling bool // true when a background refill is in progress + + poolSize int // target pool capacity + batchSize int // how many IDs to request per Assign RPC + lowWater int // refill trigger threshold + maxAge time.Duration +} + +func NewFileIdPool(wfs *WFS) *FileIdPool { + concurrency := wfs.option.ConcurrentWriters + if concurrency <= 0 { + concurrency = 128 // match default async flush worker count + } + pool := &FileIdPool{ + wfs: wfs, + poolSize: concurrency * 2, + batchSize: concurrency, + lowWater: concurrency, + maxAge: 25 * time.Second, // conservative; JWT TTL is typically 30s+ + } + pool.cond = sync.NewCond(&pool.mu) + return pool +} + +// Get returns a pre-allocated file ID entry. If the pool is empty and a refill +// is in progress, callers wait for it to complete rather than failing. Returns +// an error only if the Assign RPC fails. +func (p *FileIdPool) Get() (FileIdEntry, error) { + p.mu.Lock() + defer p.mu.Unlock() + + for { + p.evictExpired() + + if len(p.entries) > 0 { + entry := p.entries[0] + p.entries = p.entries[1:] + if len(p.entries) < p.lowWater && !p.filling { + p.filling = true + go p.doRefill() + } + return entry, nil + } + + // Pool empty. + if p.filling { + // Wait for the in-flight refill to complete. + p.cond.Wait() + continue + } + + // No refill in progress — start one synchronously. + p.filling = true + p.mu.Unlock() + entries, err := p.assignBatch(p.batchSize) + p.mu.Lock() + p.filling = false + p.cond.Broadcast() + + if err != nil { + return FileIdEntry{}, fmt.Errorf("fileIdPool: %w", err) + } + p.entries = append(p.entries, entries...) + // Loop back to pop from entries. + } +} + +func (p *FileIdPool) evictExpired() { + cutoff := time.Now().Add(-p.maxAge) + i := 0 + for i < len(p.entries) && p.entries[i].Time.Before(cutoff) { + i++ + } + if i > 0 { + p.entries = p.entries[i:] + } +} + +// doRefill runs in a background goroutine to refill the pool. +func (p *FileIdPool) doRefill() { + entries, err := p.assignBatch(p.batchSize) + if err != nil { + glog.V(1).Infof("fileIdPool refill: %v", err) + } + + p.mu.Lock() + if err == nil { + p.entries = append(p.entries, entries...) + } + p.filling = false + p.cond.Broadcast() + p.mu.Unlock() +} + +// assignBatch requests `count` file IDs from the filer using individual +// Count=1 RPCs over a single gRPC connection. Each response includes a +// per-fid JWT, so uploads work correctly when JWT security is enabled. +// +// We use individual requests instead of Count=N because the master generates +// one JWT for the base file ID only (master_grpc_server_assign.go:158), and +// the volume server validates that the JWT's Fid matches the upload's file ID +// exactly (volume_server_handlers.go:367). Sequential IDs derived from a +// Count=N response would fail this check. +// +// Note: the AssignVolumeRequest intentionally omits the Path field. Pooled IDs +// use the mount's global storage parameters, not per-path rules from filer.conf +// (detectStorageOption / MatchStorageRule). This is a writeback cache tradeoff. +func (p *FileIdPool) assignBatch(count int) ([]FileIdEntry, error) { + var entries []FileIdEntry + err := p.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + now := time.Now() + req := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: p.wfs.option.Replication, + Collection: p.wfs.option.Collection, + TtlSec: p.wfs.option.TtlSec, + DiskType: string(p.wfs.option.DiskType), + DataCenter: p.wfs.option.DataCenter, + ExpectedDataSize: uint64(p.wfs.option.ChunkSizeLimit), + } + for i := 0; i < count; i++ { + resp, assignErr := client.AssignVolume(context.Background(), req) + if assignErr != nil { + if len(entries) > 0 { + break // partial batch is fine + } + return assignErr + } + if resp.Error != "" { + if len(entries) > 0 { + break + } + return fmt.Errorf("assign: %s", resp.Error) + } + entries = append(entries, FileIdEntry{ + FileId: resp.FileId, + Host: p.wfs.AdjustedUrl(resp.Location), + Auth: security.EncodedJwt(resp.Auth), + Time: now, + }) + } + return nil + }) + return entries, err +} diff --git a/weed/mount/fileids_pool_test.go b/weed/mount/fileids_pool_test.go new file mode 100644 index 000000000..400c4889a --- /dev/null +++ b/weed/mount/fileids_pool_test.go @@ -0,0 +1,280 @@ +package mount + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +// mockFilerClient simulates AssignVolume RPCs with configurable latency. +type mockFilerClient struct { + latency time.Duration + nextVolId uint32 + nextKey uint64 + mu sync.Mutex +} + +func (m *mockFilerClient) AssignVolume(_ context.Context, req *filer_pb.AssignVolumeRequest, _ ...grpc.CallOption) (*filer_pb.AssignVolumeResponse, error) { + if m.latency > 0 { + time.Sleep(m.latency) + } + m.mu.Lock() + defer m.mu.Unlock() + + count := int(req.Count) + if count <= 0 { + count = 1 + } + vid := needle.VolumeId(m.nextVolId + 1) + key := m.nextKey + 1000 // start at a non-zero key for valid hex encoding + m.nextKey += uint64(count) + m.nextVolId++ + + fid := needle.NewFileId(vid, key, 0x12345678) + return &filer_pb.AssignVolumeResponse{ + FileId: fid.String(), + Count: int32(count), + Auth: "test-jwt-token", + Location: &filer_pb.Location{ + Url: "127.0.0.1:8080", + PublicUrl: "127.0.0.1:8080", + GrpcPort: 18080, + }, + }, nil +} + +// TestFileIdPoolSequentialIds verifies that batch assignment generates +// correct sequential file IDs from a single AssignVolume response. +func TestFileIdPoolSequentialIds(t *testing.T) { + mock := &mockFilerClient{} + resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 5}) + if err != nil { + t.Fatal(err) + } + + baseFid, parseErr := needle.ParseFileIdFromString(resp.FileId) + if parseErr != nil { + t.Fatal(parseErr) + } + + baseKey := uint64(baseFid.Key) + for i := 0; i < int(resp.Count); i++ { + fid := needle.NewFileId(baseFid.VolumeId, baseKey+uint64(i), uint32(baseFid.Cookie)) + t.Logf("ID %d: %s", i, fid.String()) + parsed, err := needle.ParseFileIdFromString(fid.String()) + if err != nil { + t.Fatalf("Failed to parse sequential ID %d: %v", i, err) + } + if uint64(parsed.Key) != baseKey+uint64(i) { + t.Fatalf("ID %d: expected key %d, got %d", i, baseKey+uint64(i), uint64(parsed.Key)) + } + } +} + +// TestFileIdPoolExpiry verifies that expired entries are evicted. +func TestFileIdPoolExpiry(t *testing.T) { + pool := &FileIdPool{ + maxAge: time.Second, + } + pool.cond = sync.NewCond(&pool.mu) + now := time.Now() + pool.entries = []FileIdEntry{ + {FileId: "1,old", Time: now.Add(-2 * time.Second)}, + {FileId: "2,old", Time: now.Add(-2 * time.Second)}, + {FileId: "3,fresh", Time: now}, + } + pool.evictExpired() + if len(pool.entries) != 1 { + t.Fatalf("expected 1 entry after eviction, got %d", len(pool.entries)) + } + if pool.entries[0].FileId != "3,fresh" { + t.Fatalf("expected fresh entry, got %s", pool.entries[0].FileId) + } +} + +// TestFileIdPoolGetWaitsForRefill verifies that concurrent Get() calls wait +// for an in-flight refill instead of returning an error. +func TestFileIdPoolGetWaitsForRefill(t *testing.T) { + pool := &FileIdPool{ + poolSize: 10, + batchSize: 5, + lowWater: 3, + maxAge: 30 * time.Second, + } + pool.cond = sync.NewCond(&pool.mu) + + // Simulate a slow refill in progress. + pool.filling = true + done := make(chan struct{}) + go func() { + // After a short delay, deliver entries and signal. + time.Sleep(10 * time.Millisecond) + pool.mu.Lock() + now := time.Now() + for i := 0; i < 5; i++ { + pool.entries = append(pool.entries, FileIdEntry{ + FileId: fmt.Sprintf("1,%x12345678", 1000+i), + Host: "127.0.0.1:8080", + Auth: "jwt", + Time: now, + }) + } + pool.filling = false + pool.cond.Broadcast() + pool.mu.Unlock() + close(done) + }() + + // Get() should wait for the refill, not return an error. + entry, err := pool.Get() + if err != nil { + t.Fatalf("Get() returned error while refill in progress: %v", err) + } + if entry.FileId == "" { + t.Fatal("Get() returned empty entry") + } + <-done +} + +// BenchmarkPoolGetVsDirectAssign measures the latency difference between +// getting a file ID from the pool vs a direct (simulated) AssignVolume RPC. +func BenchmarkPoolGetVsDirectAssign(b *testing.B) { + assignLatencies := []time.Duration{ + 0, + 100 * time.Microsecond, + 1 * time.Millisecond, + 5 * time.Millisecond, + } + + for _, latency := range assignLatencies { + name := "latency=" + latency.String() + + b.Run("DirectAssign/"+name, func(b *testing.B) { + mock := &mockFilerClient{latency: latency} + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 1}) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run("PoolGet/"+name, func(b *testing.B) { + pool := &FileIdPool{ + maxAge: 30 * time.Second, + lowWater: 1000000, // disable background refill + } + pool.cond = sync.NewCond(&pool.mu) + // Pre-fill with a fixed-size pool; refill under StopTimer when depleted. + const preload = 4096 + refill := func() { + now := time.Now() + for i := 0; i < preload; i++ { + pool.entries = append(pool.entries, FileIdEntry{ + FileId: fmt.Sprintf("1,%x12345678", i), + Host: "127.0.0.1:8080", + Auth: security.EncodedJwt("test-jwt"), + Time: now, + }) + } + } + refill() + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.mu.Lock() + if len(pool.entries) == 0 { + pool.mu.Unlock() + b.StopTimer() + refill() + b.StartTimer() + pool.mu.Lock() + } + _ = pool.entries[0] + pool.entries = pool.entries[1:] + pool.mu.Unlock() + } + }) + } +} + +// BenchmarkConcurrentPoolGet measures pool throughput under concurrent access. +func BenchmarkConcurrentPoolGet(b *testing.B) { + for _, workers := range []int{1, 4, 16, 64} { + b.Run(fmt.Sprintf("workers=%d", workers), func(b *testing.B) { + pool := &FileIdPool{ + maxAge: 30 * time.Second, + lowWater: 1000000, + } + pool.cond = sync.NewCond(&pool.mu) + // Pre-fill + now := time.Now() + total := b.N*workers + 1000 + pool.entries = make([]FileIdEntry, total) + for i := range pool.entries { + pool.entries[i] = FileIdEntry{ + FileId: fmt.Sprintf("1,%x12345678", i+1000), + Host: "127.0.0.1:8080", + Auth: security.EncodedJwt("test-jwt"), + Time: now, + } + } + + var ops atomic.Int64 + b.ResetTimer() + + var wg sync.WaitGroup + perWorker := b.N + for w := 0; w < workers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < perWorker; i++ { + pool.mu.Lock() + if len(pool.entries) > 0 { + _ = pool.entries[0] + pool.entries = pool.entries[1:] + ops.Add(1) + } + pool.mu.Unlock() + } + }() + } + wg.Wait() + b.ReportMetric(float64(ops.Load())/b.Elapsed().Seconds(), "ids/sec") + }) + } +} + +// BenchmarkBatchAssign measures the cost of batch vs individual assign RPCs. +func BenchmarkBatchAssign(b *testing.B) { + for _, batchSize := range []int{1, 8, 16, 32} { + b.Run(fmt.Sprintf("batch=%d", batchSize), func(b *testing.B) { + mock := &mockFilerClient{latency: 1 * time.Millisecond} + totalIds := 0 + b.ResetTimer() + for i := 0; i < b.N; i++ { + resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: int32(batchSize)}) + if err != nil { + b.Fatal(err) + } + + baseFid, _ := needle.ParseFileIdFromString(resp.FileId) + baseKey := uint64(baseFid.Key) + for j := 0; j < int(resp.Count); j++ { + _ = needle.NewFileId(baseFid.VolumeId, baseKey+uint64(j), uint32(baseFid.Cookie)).String() + totalIds++ + } + } + b.ReportMetric(float64(totalIds)/b.Elapsed().Seconds(), "ids/sec") + }) + } +} diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index 12e9d4a77..d2f472468 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -42,7 +42,7 @@ func mergeProcessors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse } } -func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, onRetry func(lastTsNs int64, err error), followers ...*MetadataFollower) error { +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, skipSelfEvents bool, onRetry func(lastTsNs int64, err error), followers ...*MetadataFollower) error { var prefixes []string for _, follower := range followers { @@ -50,11 +50,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { - // Let all events (including self-originated ones) flow through the - // applier so that the directory-build buffering and dedup logic - // can handle them consistently. The dedupRing in - // applyMetadataResponseNow catches duplicates that were already - // applied locally via applyLocalMetadataEvent. + if skipSelfEvents && resp.EventNotification != nil { + for _, sig := range resp.EventNotification.Signatures { + if sig == selfSignature { + glog.V(4).Infof("skip self-originated event %s", resp.Directory) + return nil + } + } + } return mc.ApplyMetadataResponse(context.Background(), resp, SubscriberMetadataResponseApplyOptions) } diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index a698a3a8f..af12cb41f 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -131,9 +131,14 @@ type WFS struct { refreshingDirs map[util.FullPath]struct{} atimeMu sync.Mutex atimeMap map[uint64]time.Time // inode -> atime, in-memory only, bounded + dirMtimeMu sync.Mutex + dirMtimeMap map[uint64]time.Time // inode -> mtime/ctime, in-memory overlay for dirs + entryValidSec uint64 // kernel FUSE entry cache TTL in seconds + attrValidSec uint64 // kernel FUSE attr cache TTL in seconds dirHotWindow time.Duration dirHotThreshold int dirIdleEvict time.Duration + fileIdPool *FileIdPool // asyncFlushWg tracks pending background flush work items for writebackCache mode. // Must be waited on before unmount cleanup to prevent data loss. @@ -211,11 +216,22 @@ func NewSeaweedFileSystem(option *Option) *WFS { posixLocks: NewPosixLockTable(), refreshingDirs: make(map[util.FullPath]struct{}), atimeMap: make(map[uint64]time.Time, 8192), + dirMtimeMap: make(map[uint64]time.Time, 1024), + entryValidSec: 1, + attrValidSec: 1, dirHotWindow: dirHotWindow, dirHotThreshold: dirHotThreshold, dirIdleEvict: dirIdleEvict, } + // With writeback caching, this mount is the single writer. Increase kernel + // FUSE cache TTLs so the kernel doesn't re-issue Lookup/GetAttr for every + // path component and stat — the local meta cache is authoritative. + if option.WritebackCache { + wfs.entryValidSec = 10 + wfs.attrValidSec = 10 + } + if option.EnableDistributedLock && !option.WritebackCache && len(option.FilerAddresses) > 0 { wfs.lockClient = cluster.NewLockClient(option.GrpcDialOption, option.FilerAddresses[0]) glog.V(0).Infof("distributed lock manager enabled for mount") @@ -341,7 +357,7 @@ func (wfs *WFS) StartBackgroundTasks() error { } startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), func(lastTsNs int64, err error) { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), wfs.option.WritebackCache, func(lastTsNs int64, err error) { glog.Warningf("meta events follow retry from %v: %v", time.Unix(0, lastTsNs), err) if deleteErr := wfs.metaCache.DeleteFolderChildren(context.Background(), util.FullPath(wfs.option.FilerMountRootPath)); deleteErr != nil { glog.Warningf("meta cache cleanup failed: %v", deleteErr) @@ -352,6 +368,11 @@ func (wfs *WFS) StartBackgroundTasks() error { go wfs.loopFlushDirtyMetadata() go wfs.loopEvictIdleDirCache() + if wfs.option.WritebackCache { + wfs.fileIdPool = NewFileIdPool(wfs) + glog.V(0).Infof("file ID pool enabled for writeback cache (batch=%d)", wfs.fileIdPool.batchSize) + } + return nil } diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index 6431ae49a..d1438f970 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -1,7 +1,6 @@ package mount import ( - "context" "os" "syscall" "time" @@ -26,16 +25,19 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse inode := input.NodeId path, _, entry, status := wfs.maybeReadEntry(inode) if status == fuse.OK { - out.AttrValid = 1 + out.AttrValid = wfs.attrValidSec wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) wfs.applyInMemoryAtime(&out.Attr, inode) - if entry.IsDirectory && wfs.option.PosixDirNlink { - wfs.applyDirNlink(&out.Attr, path) + if entry.IsDirectory { + wfs.applyInMemoryDirMtime(&out.Attr, inode) + if wfs.option.PosixDirNlink { + wfs.applyDirNlink(&out.Attr, path) + } } return status } else { if fh, found := wfs.fhMap.FindFileHandle(inode); found { - out.AttrValid = 1 + out.AttrValid = wfs.attrValidSec // Use shared lock to prevent race with Write operations fhActiveLock := wfs.fhLockTable.AcquireLock("GetAttr", fh.fh, util.SharedLock) wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true) @@ -155,7 +157,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse entry.Attributes.Ctime = now.Unix() entry.Attributes.CtimeNs = int32(now.Nanosecond()) - out.AttrValid = 1 + out.AttrValid = wfs.attrValidSec size, includeSize := input.GetSize() if includeSize { out.Attr.Size = size @@ -262,16 +264,16 @@ func (wfs *WFS) setAttrByFilerEntry(out *fuse.Attr, inode uint64, entry *filer.E func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb.Entry) { out.NodeId = inode out.Generation = 1 - out.EntryValid = 1 - out.AttrValid = 1 + out.EntryValid = wfs.entryValidSec + out.AttrValid = wfs.attrValidSec wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) } func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) { out.NodeId = inode out.Generation = 1 - out.EntryValid = 1 - out.AttrValid = 1 + out.EntryValid = wfs.entryValidSec + out.AttrValid = wfs.attrValidSec wfs.setAttrByFilerEntry(&out.Attr, inode, entry) } @@ -302,15 +304,46 @@ func (wfs *WFS) touchDirMtimeCtime(dirPath util.FullPath) { wfs.saveEntry(dirPath, dirEntry) } -// touchDirMtimeCtimeLocal updates a directory's mtime and ctime directly -// in the local metadata cache, without a filer RPC. +// touchDirMtimeCtimeLocal updates a directory's mtime and ctime in an in-memory +// overlay, avoiding LevelDB reads and writes entirely. The overlay is applied +// by applyInMemoryDirMtime when GetAttr/Lookup reads the directory's attributes. func (wfs *WFS) touchDirMtimeCtimeLocal(dirPath util.FullPath) { - now := time.Now() - if err := wfs.metaCache.TouchDirMtimeCtime(context.Background(), dirPath, now); err != nil { - glog.V(3).Infof("touchDirMtimeCtimeLocal %s: %v", dirPath, err) + if inode, found := wfs.inodeToPath.GetInode(dirPath); found { + wfs.setDirMtime(inode, time.Now()) } } +const dirMtimeMapMaxSize = 8192 + +func (wfs *WFS) setDirMtime(inode uint64, t time.Time) { + wfs.dirMtimeMu.Lock() + defer wfs.dirMtimeMu.Unlock() + if len(wfs.dirMtimeMap) >= dirMtimeMapMaxSize { + for k := range wfs.dirMtimeMap { + delete(wfs.dirMtimeMap, k) + break + } + } + wfs.dirMtimeMap[inode] = t +} + +// applyInMemoryDirMtime overlays the in-memory mtime/ctime onto fuse.Attr +// for directories that had recent child mutations. +func (wfs *WFS) applyInMemoryDirMtime(out *fuse.Attr, inode uint64) { + wfs.dirMtimeMu.Lock() + if t, ok := wfs.dirMtimeMap[inode]; ok { + sec := uint64(t.Unix()) + nsec := uint32(t.Nanosecond()) + if sec > out.Mtime || (sec == out.Mtime && nsec > out.Mtimensec) { + out.Mtime = sec + out.Mtimensec = nsec + out.Ctime = sec + out.Ctimensec = nsec + } + } + wfs.dirMtimeMu.Unlock() +} + const atimeMapMaxSize = 8192 // setAtime stores an in-memory atime for an inode. The map is bounded; diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go index 2c29d907a..1b5253354 100644 --- a/weed/mount/weedfs_dir_lookup.go +++ b/weed/mount/weedfs_dir_lookup.go @@ -44,8 +44,11 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin wfs.outputFilerEntry(out, inode, localEntry) - if localEntry.IsDirectory() && wfs.option.PosixDirNlink { - wfs.applyDirNlink(&out.Attr, fullFilePath) + if localEntry.IsDirectory() { + wfs.applyInMemoryDirMtime(&out.Attr, inode) + if wfs.option.PosixDirNlink { + wfs.applyDirNlink(&out.Attr, fullFilePath) + } } return fuse.OK diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go index de8a756ce..aa411b04b 100644 --- a/weed/mount/weedfs_write.go +++ b/weed/mount/weedfs_write.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -19,33 +20,52 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun return } - fileId, uploadResult, err, data := uploader.UploadWithRetry( - wfs, - &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: wfs.option.Replication, - Collection: wfs.option.Collection, - TtlSec: wfs.option.TtlSec, - DiskType: string(wfs.option.DiskType), - DataCenter: wfs.option.DataCenter, - Path: string(fullPath), - }, - &operation.UploadOption{ - Filename: filename, - Cipher: wfs.option.Cipher, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - }, - func(host, fileId string) string { - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - if wfs.option.VolumeServerAccess == "filerProxy" { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) - } - return fileUrl - }, - reader, - ) + uploadOption := &operation.UploadOption{ + Filename: filename, + Cipher: wfs.option.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + } + genFileUrlFn := func(host, fileId string) string { + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if wfs.option.VolumeServerAccess == "filerProxy" { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) + } + return fileUrl + } + + var fileId string + var uploadResult *operation.UploadResult + var data []byte + + if wfs.fileIdPool != nil { + // Use pre-allocated file ID from pool — avoids AssignVolume RPC. + fileId, uploadResult, err, data = uploader.UploadWithAssignFunc( + func() (string, string, security.EncodedJwt, error) { + entry, getErr := wfs.fileIdPool.Get() + if getErr != nil { + return "", "", "", getErr + } + return entry.FileId, entry.Host, entry.Auth, nil + }, + uploadOption, genFileUrlFn, reader, + ) + } else { + fileId, uploadResult, err, data = uploader.UploadWithRetry( + wfs, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: wfs.option.Replication, + Collection: wfs.option.Collection, + TtlSec: wfs.option.TtlSec, + DiskType: string(wfs.option.DiskType), + DataCenter: wfs.option.DataCenter, + Path: string(fullPath), + }, + uploadOption, genFileUrlFn, reader, + ) + } if err != nil { glog.V(0).Infof("upload data %v: %v", filename, err) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 47ae230c5..e115a11d0 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -172,6 +172,29 @@ func (uploader *Uploader) uploadWithRetryData(assignFn func() (fileId string, ho return } +// AssignFunc returns a file ID, host, and auth token for uploading. +type AssignFunc func() (fileId string, host string, auth security.EncodedJwt, err error) + +// UploadWithAssignFunc uploads data using a caller-provided assign function. +// This allows callers to use pre-allocated file IDs from a pool instead of +// making an AssignVolume RPC per chunk. +func (uploader *Uploader) UploadWithAssignFunc(assignFn AssignFunc, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { + bytesReader, ok := reader.(*util.BytesReader) + if ok { + data = bytesReader.Bytes + } else { + data, err = io.ReadAll(reader) + if err != nil { + glog.V(0).Infof("upload read input %s: %v", uploadOption.SourceUrl, err) + err = fmt.Errorf("read input: %w", err) + return + } + glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl) + } + fileId, uploadResult, err = uploader.uploadWithRetryData(assignFn, uploadOption, genFileUrlFn, data) + return +} + // UploadWithRetry will retry both assigning volume request and uploading content // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {