feat(mount): pre-allocate file IDs in pool for writeback cache mode (#9038)

* feat(mount): pre-allocate file IDs in pool for writeback cache mode

When writeback caching is enabled, chunk uploads no longer block on a
per-chunk AssignVolume RPC. Instead, a FileIdPool pre-allocates file IDs
in batches using a single AssignVolume(Count=N, ExpectedDataSize=ChunkSize)
call and hands them out instantly to upload workers.

Pool size is 2x ConcurrentWriters, refilled in background when it drops
below ConcurrentWriters. Entries expire after 25s to respect JWT TTL.
Sequential needle keys are generated from the base file ID returned by
the master, so one Assign RPC produces N usable IDs.

This cuts per-chunk upload latency from 2 RTTs (assign + upload) to
1 RTT (upload only), with the assign cost amortized across the batch.
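
The sizing and expiry rules above can be sketched in isolation. This is a
minimal illustration, not the code in this commit: the entry type and the
poolParams, evictExpired, needsRefill and demo helpers are hypothetical names
chosen for the example, and only the 2x / 1x / 25s numbers come from the
description above.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical stand-in for one pre-allocated file ID.
type entry struct {
	fileID   string
	assigned time.Time
}

// poolParams derives the target size and low-water mark from the writer count.
func poolParams(concurrentWriters int) (poolSize, lowWater int) {
	return 2 * concurrentWriters, concurrentWriters
}

// evictExpired drops leading entries older than maxAge. Entries are appended
// in assignment order, so the oldest ones are always at the front.
func evictExpired(entries []entry, now time.Time, maxAge time.Duration) []entry {
	cutoff := now.Add(-maxAge)
	i := 0
	for i < len(entries) && entries[i].assigned.Before(cutoff) {
		i++
	}
	return entries[i:]
}

// needsRefill reports whether a background refill should be started.
func needsRefill(available, lowWater int, filling bool) bool {
	return available < lowWater && !filling
}

// demo ages three entries by 30s, 26s and 1s and returns how many survive
// a 25s maximum age.
func demo() int {
	now := time.Now()
	entries := []entry{
		{"3,01", now.Add(-30 * time.Second)},
		{"3,02", now.Add(-26 * time.Second)},
		{"3,03", now.Add(-1 * time.Second)},
	}
	return len(evictExpired(entries, now, 25*time.Second))
}

func main() {
	size, low := poolParams(16)
	fmt.Println(size, low, demo(), needsRefill(10, low, false))
}
```

Evicting only from the front keeps the check cheap, at the cost of assuming
that entries are always appended in assignment order.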

* test: add benchmarks for file ID pool vs direct assign

Benchmarks measure:
- Pool Get vs Direct AssignVolume at various simulated latencies
- Batch assign scaling (Count=1 through Count=32)
- Concurrent pool access with 1-64 workers

Results on Apple M4:
- Pool Get: constant ~3ns regardless of assign latency
- Batch=16: 15.7x more IDs/sec than individual assigns
- 64 concurrent workers: 19M IDs/sec throughput
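
As a rough sanity check on the batch number: if the assign round-trip
dominates and everything else is free, a batch of N should yield N times the
IDs per second. The sketch below encodes that idealized model; idsPerSecond is
a hypothetical helper, and the 1ms latency is assumed to match the simulated
latency used by BenchmarkBatchAssign. The measured 15.7x for Batch=16 sits just
under the ideal 16x.

```go
package main

import "fmt"

// idsPerSecond is an idealized model: each assign RPC costs latencyMicros
// microseconds and yields `batch` usable IDs. All other costs are ignored.
func idsPerSecond(batch int, latencyMicros int) float64 {
	return float64(batch) * 1e6 / float64(latencyMicros)
}

func main() {
	single := idsPerSecond(1, 1000)
	for _, batch := range []int{1, 8, 16, 32} {
		rate := idsPerSecond(batch, 1000)
		fmt.Printf("batch=%d: %.0f ids/sec (%.1fx)\n", batch, rate, rate/single)
	}
}
```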

* fix(mount): address review feedback on file ID pool

1. Fix race condition in Get(): use sync.Cond so callers wait for an
   in-flight refill instead of returning an error when the pool is empty.

2. Match default pool size to async flush worker count (128, not 16)
   when ConcurrentWriters is unset.

3. Add logging to UploadWithAssignFunc for consistency with UploadWithRetry.

4. Document that pooled assigns omit the Path field, bypassing path-based
   storage rules (filer.conf). This is an intentional tradeoff for
   writeback cache performance.

5. Fix flaky expiry test: widen time margin from 50ms to 1s.

6. Add TestFileIdPoolGetWaitsForRefill to verify concurrent waiters.
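
The waiting behaviour from item 1 follows the usual sync.Cond pattern: hold
the mutex, re-check the condition in a loop, and let the refiller Broadcast
once new entries are published. A minimal sketch with hypothetical names
(pool, get, finishRefill, runWaiters); unlike the real Get(), it simply
reports false when the pool is empty and no refill is in flight, instead of
starting a synchronous refill.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	mu      sync.Mutex
	cond    *sync.Cond
	ids     []string
	filling bool
}

func newPool() *pool {
	p := &pool{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// get blocks while the pool is empty and a refill is in flight.
func (p *pool) get() (string, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for len(p.ids) == 0 {
		if !p.filling {
			return "", false // empty and nobody is refilling
		}
		p.cond.Wait() // releases mu while blocked, reacquires before returning
	}
	id := p.ids[0]
	p.ids = p.ids[1:]
	return id, true
}

// finishRefill publishes new IDs and wakes every waiter.
func (p *pool) finishRefill(ids []string) {
	p.mu.Lock()
	p.ids = append(p.ids, ids...)
	p.filling = false
	p.cond.Broadcast()
	p.mu.Unlock()
}

// runWaiters starts n waiters against an in-flight refill that delivers
// n IDs, and returns how many waiters received one.
func runWaiters(n int) int {
	p := newPool()
	p.filling = true
	var wg sync.WaitGroup
	var mu sync.Mutex
	got := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := p.get(); ok {
				mu.Lock()
				got++
				mu.Unlock()
			}
		}()
	}
	time.Sleep(10 * time.Millisecond) // simulated assign latency
	ids := make([]string, n)
	for i := range ids {
		ids[i] = fmt.Sprintf("1,%x", 1000+i)
	}
	p.finishRefill(ids)
	wg.Wait()
	return got
}

func main() {
	fmt.Println(runWaiters(4))
}
```

Broadcast rather than Signal is used because one refill can satisfy many
waiters at once.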

* fix(mount): use individual Count=1 assigns to get per-fid JWTs

The master generates one JWT per AssignResponse, bound to the base file
ID (master_grpc_server_assign.go:158). The volume server validates that
the JWT's Fid matches the upload exactly (volume_server_handlers.go:367).
Using Count=N and deriving sequential IDs would fail this check.

Switch to individual Count=1 RPCs over a single gRPC connection. This
still amortizes connection overhead while getting a correct per-fid JWT
for each entry. Partial batches are accepted if some requests fail.

Remove unused needle import now that sequential ID generation is gone.
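
To make the failure mode concrete, here is a toy model of the check. It is
not the master or volume server code: token, assign, derive and authorize are
hypothetical stand-ins, and the token is a plain struct rather than a real
JWT. The only property modelled is the one described above, namely that the
token names exactly one file ID and the upload must match it.

```go
package main

import "fmt"

// token is a hypothetical stand-in for a JWT whose claims carry one file ID.
type token struct{ fid string }

// assign mimics an assign response: one base file ID plus a token bound to it.
func assign(volume int, key uint64, cookie uint32) (string, token) {
	fid := fmt.Sprintf("%d,%x%08x", volume, key, cookie)
	return fid, token{fid: fid}
}

// derive builds the i-th sequential file ID with the same volume and cookie.
func derive(volume int, key uint64, cookie uint32, i uint64) string {
	return fmt.Sprintf("%d,%x%08x", volume, key+i, cookie)
}

// authorize mimics an exact-match check between token and upload target.
func authorize(t token, uploadFid string) bool {
	return t.fid == uploadFid
}

func main() {
	base, tok := assign(3, 0x3e8, 0x12345678)
	next := derive(3, 0x3e8, 0x12345678, 1)
	fmt.Println(base, authorize(tok, base)) // 3,3e812345678 true
	fmt.Println(next, authorize(tok, next)) // 3,3e912345678 false
}
```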

* fix(mount): separate pprof from FUSE protocol debug logging

The -debug flag enabled both the pprof HTTP server and the noisy
go-fuse protocol logging (rx/tx lines for every FUSE operation). That
made profiling impractical because the log output dominated.

Split into two flags:
- -debug: enables pprof HTTP server only (for profiling)
- -debug.fuse: enables raw FUSE protocol request/response logging
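
The split can be reproduced with the standard flag package. This is a
standalone sketch, not the mount command's own wiring: debugOptions and
parseDebugFlags are hypothetical names, and only the two flag names are taken
from this change.

```go
package main

import (
	"flag"
	"fmt"
)

type debugOptions struct {
	pprof bool // serve runtime profiling data
	fuse  bool // log raw FUSE requests and responses
}

// parseDebugFlags parses the two independent debug switches.
func parseDebugFlags(args []string) (debugOptions, error) {
	fs := flag.NewFlagSet("mount", flag.ContinueOnError)
	var opts debugOptions
	fs.BoolVar(&opts.pprof, "debug", false, "serve runtime profiling data")
	fs.BoolVar(&opts.fuse, "debug.fuse", false, "log raw FUSE protocol requests and responses")
	err := fs.Parse(args)
	return opts, err
}

func main() {
	opts, err := parseDebugFlags([]string{"-debug"})
	fmt.Println(opts.pprof, opts.fuse, err) // true false <nil>
}
```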

* perf(mount): replace LevelDB read+write with in-memory overlay for dir mtime

Profile showed TouchDirMtimeCtime at 0.22s — every create/rename/unlink
in a directory did a LevelDB FindEntry (read) + UpdateEntry (write) just
to bump the parent dir's mtime/ctime.

Replace with an in-memory map (same pattern as existing atime overlay):
- touchDirMtimeCtimeLocal now stores inode→timestamp in dirMtimeMap
- applyInMemoryDirMtime overlays onto GetAttr/Lookup output
- No LevelDB I/O on the mutation hot path

The overlay only advances timestamps forward (max of stored vs overlay),
so stale entries are harmless. Map is bounded at 8192 entries.
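
A minimal sketch of the overlay idea, assuming timestamps are plain Unix
nanoseconds. The overlay type and its touch, apply and size methods are
hypothetical; the forward-only rule and the evict-one-arbitrary-entry bound
mirror the description above, with the limit shrunk to 4 for the example.

```go
package main

import (
	"fmt"
	"sync"
)

const maxEntries = 4 // the commit uses 8192; kept tiny for the example

type overlay struct {
	mu    sync.Mutex
	mtime map[uint64]int64 // inode -> unix nanoseconds
}

func newOverlay() *overlay {
	return &overlay{mtime: make(map[uint64]int64)}
}

// touch records a new mtime, evicting one arbitrary entry when the map is full.
func (o *overlay) touch(inode uint64, ts int64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if len(o.mtime) >= maxEntries {
		for k := range o.mtime {
			delete(o.mtime, k)
			break
		}
	}
	o.mtime[inode] = ts
}

// apply returns the later of the stored timestamp and the overlay timestamp,
// so a stale overlay entry can never move a directory's mtime backwards.
func (o *overlay) apply(inode uint64, stored int64) int64 {
	o.mu.Lock()
	defer o.mu.Unlock()
	if ts, ok := o.mtime[inode]; ok && ts > stored {
		return ts
	}
	return stored
}

// size reports the number of overlay entries.
func (o *overlay) size() int {
	o.mu.Lock()
	defer o.mu.Unlock()
	return len(o.mtime)
}

func main() {
	o := newOverlay()
	o.touch(7, 200)
	fmt.Println(o.apply(7, 100), o.apply(7, 300)) // 200 300
}
```

Evicting an arbitrary entry is safe here only because losing an overlay entry
falls back to the stored timestamp, which is older but still valid.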

* perf(mount): skip self-originated metadata subscription events in writeback mode

With writeback caching, this mount is the single writer. All local
mutations are already applied to the local meta cache (via
applyLocalMetadataEvent or direct InsertEntry). The filer subscription
then delivers the same event back, causing redundant work:
proto.Clone, enqueue to apply loop, dedup ring check, and sometimes
redundant LevelDB writes when the dedup ring misses (deferred creates).

Check EventNotification.Signatures against selfSignature and skip
events that originated from this mount. This eliminates the redundant
processing for every self-originated mutation.
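
The signature check amounts to a small filter in front of the apply path. A
sketch with hypothetical types (event, isSelfOriginated, filter); the real
code inspects EventNotification.Signatures on the subscription response.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a metadata notification.
type event struct {
	directory  string
	signatures []int32
}

// isSelfOriginated reports whether the event carries this mount's signature.
func isSelfOriginated(e event, self int32) bool {
	for _, sig := range e.signatures {
		if sig == self {
			return true
		}
	}
	return false
}

// filter returns the events that still need to be applied locally.
func filter(events []event, self int32, skipSelf bool) []event {
	var out []event
	for _, e := range events {
		if skipSelf && isSelfOriginated(e, self) {
			continue
		}
		out = append(out, e)
	}
	return out
}

func main() {
	events := []event{
		{directory: "/a", signatures: []int32{42}},
		{directory: "/b", signatures: []int32{7}},
		{directory: "/c", signatures: []int32{7, 42}},
	}
	fmt.Println(len(filter(events, 42, true)), len(filter(events, 42, false))) // 1 3
}
```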

* perf(mount): increase kernel FUSE cache TTL in writeback cache mode

With writeback caching, this mount is the single writer — the local
meta cache is authoritative. Increase EntryValid and AttrValid from 1s
to 10s so the kernel doesn't re-issue Lookup/GetAttr for every path
component and stat call.

This reduces /dev/fuse round-trips, which dominate the profile at
38% of CPU (syscall.rawsyscalln). Each saved round-trip eliminates a
kernel→userspace→kernel transition.

Normal (non-writeback) mode retains the 1s TTL for multi-mount
consistency.

Author: Chris Lu
Date: 2026-04-11 20:02:42 -07:00
Committer: GitHub
Parent: b37bbf541a
Commit: e8a8449553
10 changed files with 618 additions and 52 deletions


@@ -34,6 +34,7 @@ type MountOptions struct {
includeSystemEntries *bool
debug *bool
debugPort *int
debugFuse *bool
localSocket *string
disableXAttr *bool
extraOptions []string
@@ -109,6 +110,7 @@ func init() {
mountOptions.includeSystemEntries = cmdMount.Flag.Bool("includeSystemEntries", false, "show filer system entries (e.g. /topics, /etc) in directory listings")
mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging")
mountOptions.debugFuse = cmdMount.Flag.Bool("debug.fuse", false, "log raw FUSE protocol requests and responses")
mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-<mount_dir_hash>.sock")
mountOptions.disableXAttr = cmdMount.Flag.Bool("disableXAttr", false, "disable xattr")
mountOptions.hasAutofs = cmdMount.Flag.Bool("autofs", false, "ignore autofs mounted on the same mountpoint (useful when systemd.automount and autofs is used)")


@@ -251,7 +251,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Name: "seaweedfs",
SingleThreaded: false,
DisableXAttrs: *option.disableXAttr,
- Debug: *option.debug,
+ Debug: *option.debugFuse,
EnableLocks: true,
ExplicitDataCacheControl: false,
DirectMount: true,

weed/mount/fileids_pool.go Normal file

@@ -0,0 +1,181 @@
package mount
import (
"context"
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
)
// FileIdEntry holds a pre-allocated file ID from the filer/master, ready for
// immediate use by an upload worker without an AssignVolume round-trip.
type FileIdEntry struct {
FileId string
Host string // volume server address (already adjusted for access mode)
Auth security.EncodedJwt
Time time.Time
}
// FileIdPool pre-allocates file IDs in batches so that chunk uploads can grab
// one instantly instead of blocking on an AssignVolume RPC per chunk.
//
// The pool is refilled in the background when it drops below a low-water mark.
// All IDs are allocated with the mount's global (replication, collection, ttl,
// diskType, dataCenter) parameters. Path-based storage rules (filer.conf) are
// NOT applied to pooled IDs since the pool allocates ahead of any specific file
// path. This is an intentional tradeoff for writeback cache performance.
type FileIdPool struct {
wfs *WFS
mu sync.Mutex
cond *sync.Cond
entries []FileIdEntry // available pre-allocated IDs
filling bool // true when a background refill is in progress
poolSize int // target pool capacity
batchSize int // how many IDs to request per Assign RPC
lowWater int // refill trigger threshold
maxAge time.Duration
}
func NewFileIdPool(wfs *WFS) *FileIdPool {
concurrency := wfs.option.ConcurrentWriters
if concurrency <= 0 {
concurrency = 128 // match default async flush worker count
}
pool := &FileIdPool{
wfs: wfs,
poolSize: concurrency * 2,
batchSize: concurrency,
lowWater: concurrency,
maxAge: 25 * time.Second, // conservative; JWT TTL is typically 30s+
}
pool.cond = sync.NewCond(&pool.mu)
return pool
}
// Get returns a pre-allocated file ID entry. If the pool is empty and a refill
// is in progress, callers wait for it to complete rather than failing. Returns
// an error only if the Assign RPC fails.
func (p *FileIdPool) Get() (FileIdEntry, error) {
p.mu.Lock()
defer p.mu.Unlock()
for {
p.evictExpired()
if len(p.entries) > 0 {
entry := p.entries[0]
p.entries = p.entries[1:]
if len(p.entries) < p.lowWater && !p.filling {
p.filling = true
go p.doRefill()
}
return entry, nil
}
// Pool empty.
if p.filling {
// Wait for the in-flight refill to complete.
p.cond.Wait()
continue
}
// No refill in progress — start one synchronously.
p.filling = true
p.mu.Unlock()
entries, err := p.assignBatch(p.batchSize)
p.mu.Lock()
p.filling = false
p.cond.Broadcast()
if err != nil {
return FileIdEntry{}, fmt.Errorf("fileIdPool: %w", err)
}
p.entries = append(p.entries, entries...)
// Loop back to pop from entries.
}
}
func (p *FileIdPool) evictExpired() {
cutoff := time.Now().Add(-p.maxAge)
i := 0
for i < len(p.entries) && p.entries[i].Time.Before(cutoff) {
i++
}
if i > 0 {
p.entries = p.entries[i:]
}
}
// doRefill runs in a background goroutine to refill the pool.
func (p *FileIdPool) doRefill() {
entries, err := p.assignBatch(p.batchSize)
if err != nil {
glog.V(1).Infof("fileIdPool refill: %v", err)
}
p.mu.Lock()
if err == nil {
p.entries = append(p.entries, entries...)
}
p.filling = false
p.cond.Broadcast()
p.mu.Unlock()
}
// assignBatch requests `count` file IDs from the filer using individual
// Count=1 RPCs over a single gRPC connection. Each response includes a
// per-fid JWT, so uploads work correctly when JWT security is enabled.
//
// We use individual requests instead of Count=N because the master generates
// one JWT for the base file ID only (master_grpc_server_assign.go:158), and
// the volume server validates that the JWT's Fid matches the upload's file ID
// exactly (volume_server_handlers.go:367). Sequential IDs derived from a
// Count=N response would fail this check.
//
// Note: the AssignVolumeRequest intentionally omits the Path field. Pooled IDs
// use the mount's global storage parameters, not per-path rules from filer.conf
// (detectStorageOption / MatchStorageRule). This is a writeback cache tradeoff.
func (p *FileIdPool) assignBatch(count int) ([]FileIdEntry, error) {
var entries []FileIdEntry
err := p.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
now := time.Now()
req := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: p.wfs.option.Replication,
Collection: p.wfs.option.Collection,
TtlSec: p.wfs.option.TtlSec,
DiskType: string(p.wfs.option.DiskType),
DataCenter: p.wfs.option.DataCenter,
ExpectedDataSize: uint64(p.wfs.option.ChunkSizeLimit),
}
for i := 0; i < count; i++ {
resp, assignErr := client.AssignVolume(context.Background(), req)
if assignErr != nil {
if len(entries) > 0 {
break // partial batch is fine
}
return assignErr
}
if resp.Error != "" {
if len(entries) > 0 {
break
}
return fmt.Errorf("assign: %s", resp.Error)
}
entries = append(entries, FileIdEntry{
FileId: resp.FileId,
Host: p.wfs.AdjustedUrl(resp.Location),
Auth: security.EncodedJwt(resp.Auth),
Time: now,
})
}
return nil
})
return entries, err
}


@@ -0,0 +1,280 @@
package mount
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
)
// mockFilerClient simulates AssignVolume RPCs with configurable latency.
type mockFilerClient struct {
latency time.Duration
nextVolId uint32
nextKey uint64
mu sync.Mutex
}
func (m *mockFilerClient) AssignVolume(_ context.Context, req *filer_pb.AssignVolumeRequest, _ ...grpc.CallOption) (*filer_pb.AssignVolumeResponse, error) {
if m.latency > 0 {
time.Sleep(m.latency)
}
m.mu.Lock()
defer m.mu.Unlock()
count := int(req.Count)
if count <= 0 {
count = 1
}
vid := needle.VolumeId(m.nextVolId + 1)
key := m.nextKey + 1000 // start at a non-zero key for valid hex encoding
m.nextKey += uint64(count)
m.nextVolId++
fid := needle.NewFileId(vid, key, 0x12345678)
return &filer_pb.AssignVolumeResponse{
FileId: fid.String(),
Count: int32(count),
Auth: "test-jwt-token",
Location: &filer_pb.Location{
Url: "127.0.0.1:8080",
PublicUrl: "127.0.0.1:8080",
GrpcPort: 18080,
},
}, nil
}
// TestFileIdPoolSequentialIds verifies that batch assignment generates
// correct sequential file IDs from a single AssignVolume response.
func TestFileIdPoolSequentialIds(t *testing.T) {
mock := &mockFilerClient{}
resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 5})
if err != nil {
t.Fatal(err)
}
baseFid, parseErr := needle.ParseFileIdFromString(resp.FileId)
if parseErr != nil {
t.Fatal(parseErr)
}
baseKey := uint64(baseFid.Key)
for i := 0; i < int(resp.Count); i++ {
fid := needle.NewFileId(baseFid.VolumeId, baseKey+uint64(i), uint32(baseFid.Cookie))
t.Logf("ID %d: %s", i, fid.String())
parsed, err := needle.ParseFileIdFromString(fid.String())
if err != nil {
t.Fatalf("Failed to parse sequential ID %d: %v", i, err)
}
if uint64(parsed.Key) != baseKey+uint64(i) {
t.Fatalf("ID %d: expected key %d, got %d", i, baseKey+uint64(i), uint64(parsed.Key))
}
}
}
// TestFileIdPoolExpiry verifies that expired entries are evicted.
func TestFileIdPoolExpiry(t *testing.T) {
pool := &FileIdPool{
maxAge: time.Second,
}
pool.cond = sync.NewCond(&pool.mu)
now := time.Now()
pool.entries = []FileIdEntry{
{FileId: "1,old", Time: now.Add(-2 * time.Second)},
{FileId: "2,old", Time: now.Add(-2 * time.Second)},
{FileId: "3,fresh", Time: now},
}
pool.evictExpired()
if len(pool.entries) != 1 {
t.Fatalf("expected 1 entry after eviction, got %d", len(pool.entries))
}
if pool.entries[0].FileId != "3,fresh" {
t.Fatalf("expected fresh entry, got %s", pool.entries[0].FileId)
}
}
// TestFileIdPoolGetWaitsForRefill verifies that concurrent Get() calls wait
// for an in-flight refill instead of returning an error.
func TestFileIdPoolGetWaitsForRefill(t *testing.T) {
pool := &FileIdPool{
poolSize: 10,
batchSize: 5,
lowWater: 3,
maxAge: 30 * time.Second,
}
pool.cond = sync.NewCond(&pool.mu)
// Simulate a slow refill in progress.
pool.filling = true
done := make(chan struct{})
go func() {
// After a short delay, deliver entries and signal.
time.Sleep(10 * time.Millisecond)
pool.mu.Lock()
now := time.Now()
for i := 0; i < 5; i++ {
pool.entries = append(pool.entries, FileIdEntry{
FileId: fmt.Sprintf("1,%x12345678", 1000+i),
Host: "127.0.0.1:8080",
Auth: "jwt",
Time: now,
})
}
pool.filling = false
pool.cond.Broadcast()
pool.mu.Unlock()
close(done)
}()
// Get() should wait for the refill, not return an error.
entry, err := pool.Get()
if err != nil {
t.Fatalf("Get() returned error while refill in progress: %v", err)
}
if entry.FileId == "" {
t.Fatal("Get() returned empty entry")
}
<-done
}
// BenchmarkPoolGetVsDirectAssign measures the latency difference between
// getting a file ID from the pool vs a direct (simulated) AssignVolume RPC.
func BenchmarkPoolGetVsDirectAssign(b *testing.B) {
assignLatencies := []time.Duration{
0,
100 * time.Microsecond,
1 * time.Millisecond,
5 * time.Millisecond,
}
for _, latency := range assignLatencies {
name := "latency=" + latency.String()
b.Run("DirectAssign/"+name, func(b *testing.B) {
mock := &mockFilerClient{latency: latency}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: 1})
if err != nil {
b.Fatal(err)
}
}
})
b.Run("PoolGet/"+name, func(b *testing.B) {
pool := &FileIdPool{
maxAge: 30 * time.Second,
lowWater: 1000000, // disable background refill
}
pool.cond = sync.NewCond(&pool.mu)
// Pre-fill with a fixed-size pool; refill under StopTimer when depleted.
const preload = 4096
refill := func() {
now := time.Now()
for i := 0; i < preload; i++ {
pool.entries = append(pool.entries, FileIdEntry{
FileId: fmt.Sprintf("1,%x12345678", i),
Host: "127.0.0.1:8080",
Auth: security.EncodedJwt("test-jwt"),
Time: now,
})
}
}
refill()
b.ResetTimer()
for i := 0; i < b.N; i++ {
pool.mu.Lock()
if len(pool.entries) == 0 {
pool.mu.Unlock()
b.StopTimer()
refill()
b.StartTimer()
pool.mu.Lock()
}
_ = pool.entries[0]
pool.entries = pool.entries[1:]
pool.mu.Unlock()
}
})
}
}
// BenchmarkConcurrentPoolGet measures pool throughput under concurrent access.
func BenchmarkConcurrentPoolGet(b *testing.B) {
for _, workers := range []int{1, 4, 16, 64} {
b.Run(fmt.Sprintf("workers=%d", workers), func(b *testing.B) {
pool := &FileIdPool{
maxAge: 30 * time.Second,
lowWater: 1000000,
}
pool.cond = sync.NewCond(&pool.mu)
// Pre-fill
now := time.Now()
total := b.N*workers + 1000
pool.entries = make([]FileIdEntry, total)
for i := range pool.entries {
pool.entries[i] = FileIdEntry{
FileId: fmt.Sprintf("1,%x12345678", i+1000),
Host: "127.0.0.1:8080",
Auth: security.EncodedJwt("test-jwt"),
Time: now,
}
}
var ops atomic.Int64
b.ResetTimer()
var wg sync.WaitGroup
perWorker := b.N
for w := 0; w < workers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < perWorker; i++ {
pool.mu.Lock()
if len(pool.entries) > 0 {
_ = pool.entries[0]
pool.entries = pool.entries[1:]
ops.Add(1)
}
pool.mu.Unlock()
}
}()
}
wg.Wait()
b.ReportMetric(float64(ops.Load())/b.Elapsed().Seconds(), "ids/sec")
})
}
}
// BenchmarkBatchAssign measures the cost of batch vs individual assign RPCs.
func BenchmarkBatchAssign(b *testing.B) {
for _, batchSize := range []int{1, 8, 16, 32} {
b.Run(fmt.Sprintf("batch=%d", batchSize), func(b *testing.B) {
mock := &mockFilerClient{latency: 1 * time.Millisecond}
totalIds := 0
b.ResetTimer()
for i := 0; i < b.N; i++ {
resp, err := mock.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{Count: int32(batchSize)})
if err != nil {
b.Fatal(err)
}
baseFid, _ := needle.ParseFileIdFromString(resp.FileId)
baseKey := uint64(baseFid.Key)
for j := 0; j < int(resp.Count); j++ {
_ = needle.NewFileId(baseFid.VolumeId, baseKey+uint64(j), uint32(baseFid.Cookie)).String()
totalIds++
}
}
b.ReportMetric(float64(totalIds)/b.Elapsed().Seconds(), "ids/sec")
})
}
}


@@ -42,7 +42,7 @@ func mergeProcessors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse
}
}
- func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, onRetry func(lastTsNs int64, err error), followers ...*MetadataFollower) error {
+ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, skipSelfEvents bool, onRetry func(lastTsNs int64, err error), followers ...*MetadataFollower) error {
var prefixes []string
for _, follower := range followers {
@@ -50,11 +50,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
- // Let all events (including self-originated ones) flow through the
- // applier so that the directory-build buffering and dedup logic
- // can handle them consistently. The dedupRing in
- // applyMetadataResponseNow catches duplicates that were already
- // applied locally via applyLocalMetadataEvent.
+ if skipSelfEvents && resp.EventNotification != nil {
+ for _, sig := range resp.EventNotification.Signatures {
+ if sig == selfSignature {
+ glog.V(4).Infof("skip self-originated event %s", resp.Directory)
+ return nil
+ }
+ }
+ }
return mc.ApplyMetadataResponse(context.Background(), resp, SubscriberMetadataResponseApplyOptions)
}


@@ -131,9 +131,14 @@ type WFS struct {
refreshingDirs map[util.FullPath]struct{}
atimeMu sync.Mutex
atimeMap map[uint64]time.Time // inode -> atime, in-memory only, bounded
dirMtimeMu sync.Mutex
dirMtimeMap map[uint64]time.Time // inode -> mtime/ctime, in-memory overlay for dirs
entryValidSec uint64 // kernel FUSE entry cache TTL in seconds
attrValidSec uint64 // kernel FUSE attr cache TTL in seconds
dirHotWindow time.Duration
dirHotThreshold int
dirIdleEvict time.Duration
fileIdPool *FileIdPool
// asyncFlushWg tracks pending background flush work items for writebackCache mode.
// Must be waited on before unmount cleanup to prevent data loss.
@@ -211,11 +216,22 @@ func NewSeaweedFileSystem(option *Option) *WFS {
posixLocks: NewPosixLockTable(),
refreshingDirs: make(map[util.FullPath]struct{}),
atimeMap: make(map[uint64]time.Time, 8192),
dirMtimeMap: make(map[uint64]time.Time, 1024),
entryValidSec: 1,
attrValidSec: 1,
dirHotWindow: dirHotWindow,
dirHotThreshold: dirHotThreshold,
dirIdleEvict: dirIdleEvict,
}
// With writeback caching, this mount is the single writer. Increase kernel
// FUSE cache TTLs so the kernel doesn't re-issue Lookup/GetAttr for every
// path component and stat — the local meta cache is authoritative.
if option.WritebackCache {
wfs.entryValidSec = 10
wfs.attrValidSec = 10
}
if option.EnableDistributedLock && !option.WritebackCache && len(option.FilerAddresses) > 0 {
wfs.lockClient = cluster.NewLockClient(option.GrpcDialOption, option.FilerAddresses[0])
glog.V(0).Infof("distributed lock manager enabled for mount")
@@ -341,7 +357,7 @@ func (wfs *WFS) StartBackgroundTasks() error {
}
startTime := time.Now()
- go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), func(lastTsNs int64, err error) {
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), wfs.option.WritebackCache, func(lastTsNs int64, err error) {
glog.Warningf("meta events follow retry from %v: %v", time.Unix(0, lastTsNs), err)
if deleteErr := wfs.metaCache.DeleteFolderChildren(context.Background(), util.FullPath(wfs.option.FilerMountRootPath)); deleteErr != nil {
glog.Warningf("meta cache cleanup failed: %v", deleteErr)
@@ -352,6 +368,11 @@ func (wfs *WFS) StartBackgroundTasks() error {
go wfs.loopFlushDirtyMetadata()
go wfs.loopEvictIdleDirCache()
if wfs.option.WritebackCache {
wfs.fileIdPool = NewFileIdPool(wfs)
glog.V(0).Infof("file ID pool enabled for writeback cache (batch=%d)", wfs.fileIdPool.batchSize)
}
return nil
}


@@ -1,7 +1,6 @@
package mount
import (
"context"
"os"
"syscall"
"time"
@@ -26,16 +25,19 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
inode := input.NodeId
path, _, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK {
- out.AttrValid = 1
+ out.AttrValid = wfs.attrValidSec
wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
wfs.applyInMemoryAtime(&out.Attr, inode)
- if entry.IsDirectory && wfs.option.PosixDirNlink {
- wfs.applyDirNlink(&out.Attr, path)
+ if entry.IsDirectory {
+ wfs.applyInMemoryDirMtime(&out.Attr, inode)
+ if wfs.option.PosixDirNlink {
+ wfs.applyDirNlink(&out.Attr, path)
+ }
}
return status
} else {
if fh, found := wfs.fhMap.FindFileHandle(inode); found {
- out.AttrValid = 1
+ out.AttrValid = wfs.attrValidSec
// Use shared lock to prevent race with Write operations
fhActiveLock := wfs.fhLockTable.AcquireLock("GetAttr", fh.fh, util.SharedLock)
wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true)
@@ -155,7 +157,7 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
entry.Attributes.Ctime = now.Unix()
entry.Attributes.CtimeNs = int32(now.Nanosecond())
- out.AttrValid = 1
+ out.AttrValid = wfs.attrValidSec
size, includeSize := input.GetSize()
if includeSize {
out.Attr.Size = size
@@ -262,16 +264,16 @@ func (wfs *WFS) setAttrByFilerEntry(out *fuse.Attr, inode uint64, entry *filer.E
func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb.Entry) {
out.NodeId = inode
out.Generation = 1
- out.EntryValid = 1
- out.AttrValid = 1
+ out.EntryValid = wfs.entryValidSec
+ out.AttrValid = wfs.attrValidSec
wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
}
func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) {
out.NodeId = inode
out.Generation = 1
- out.EntryValid = 1
- out.AttrValid = 1
+ out.EntryValid = wfs.entryValidSec
+ out.AttrValid = wfs.attrValidSec
wfs.setAttrByFilerEntry(&out.Attr, inode, entry)
}
@@ -302,15 +304,46 @@ func (wfs *WFS) touchDirMtimeCtime(dirPath util.FullPath) {
wfs.saveEntry(dirPath, dirEntry)
}
- // touchDirMtimeCtimeLocal updates a directory's mtime and ctime directly
- // in the local metadata cache, without a filer RPC.
+ // touchDirMtimeCtimeLocal updates a directory's mtime and ctime in an in-memory
+ // overlay, avoiding LevelDB reads and writes entirely. The overlay is applied
+ // by applyInMemoryDirMtime when GetAttr/Lookup reads the directory's attributes.
  func (wfs *WFS) touchDirMtimeCtimeLocal(dirPath util.FullPath) {
- now := time.Now()
- if err := wfs.metaCache.TouchDirMtimeCtime(context.Background(), dirPath, now); err != nil {
- glog.V(3).Infof("touchDirMtimeCtimeLocal %s: %v", dirPath, err)
+ if inode, found := wfs.inodeToPath.GetInode(dirPath); found {
+ wfs.setDirMtime(inode, time.Now())
}
}
const dirMtimeMapMaxSize = 8192
func (wfs *WFS) setDirMtime(inode uint64, t time.Time) {
wfs.dirMtimeMu.Lock()
defer wfs.dirMtimeMu.Unlock()
if len(wfs.dirMtimeMap) >= dirMtimeMapMaxSize {
for k := range wfs.dirMtimeMap {
delete(wfs.dirMtimeMap, k)
break
}
}
wfs.dirMtimeMap[inode] = t
}
// applyInMemoryDirMtime overlays the in-memory mtime/ctime onto fuse.Attr
// for directories that had recent child mutations.
func (wfs *WFS) applyInMemoryDirMtime(out *fuse.Attr, inode uint64) {
wfs.dirMtimeMu.Lock()
if t, ok := wfs.dirMtimeMap[inode]; ok {
sec := uint64(t.Unix())
nsec := uint32(t.Nanosecond())
if sec > out.Mtime || (sec == out.Mtime && nsec > out.Mtimensec) {
out.Mtime = sec
out.Mtimensec = nsec
out.Ctime = sec
out.Ctimensec = nsec
}
}
wfs.dirMtimeMu.Unlock()
}
const atimeMapMaxSize = 8192
// setAtime stores an in-memory atime for an inode. The map is bounded;


@@ -44,8 +44,11 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
wfs.outputFilerEntry(out, inode, localEntry)
- if localEntry.IsDirectory() && wfs.option.PosixDirNlink {
- wfs.applyDirNlink(&out.Attr, fullFilePath)
+ if localEntry.IsDirectory() {
+ wfs.applyInMemoryDirMtime(&out.Attr, inode)
+ if wfs.option.PosixDirNlink {
+ wfs.applyDirNlink(&out.Attr, fullFilePath)
+ }
}
return fuse.OK


@@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@@ -19,33 +20,52 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
return
}
- fileId, uploadResult, err, data := uploader.UploadWithRetry(
- wfs,
- &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: wfs.option.Replication,
- Collection: wfs.option.Collection,
- TtlSec: wfs.option.TtlSec,
- DiskType: string(wfs.option.DiskType),
- DataCenter: wfs.option.DataCenter,
- Path: string(fullPath),
- },
- &operation.UploadOption{
- Filename: filename,
- Cipher: wfs.option.Cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- },
- func(host, fileId string) string {
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- if wfs.option.VolumeServerAccess == "filerProxy" {
- fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
- }
- return fileUrl
- },
- reader,
- )
+ uploadOption := &operation.UploadOption{
+ Filename: filename,
+ Cipher: wfs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ }
+ genFileUrlFn := func(host, fileId string) string {
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
+ }
+ return fileUrl
+ }
+ var fileId string
+ var uploadResult *operation.UploadResult
+ var data []byte
+ if wfs.fileIdPool != nil {
+ // Use pre-allocated file ID from pool — avoids AssignVolume RPC.
+ fileId, uploadResult, err, data = uploader.UploadWithAssignFunc(
+ func() (string, string, security.EncodedJwt, error) {
+ entry, getErr := wfs.fileIdPool.Get()
+ if getErr != nil {
+ return "", "", "", getErr
+ }
+ return entry.FileId, entry.Host, entry.Auth, nil
+ },
+ uploadOption, genFileUrlFn, reader,
+ )
+ } else {
+ fileId, uploadResult, err, data = uploader.UploadWithRetry(
+ wfs,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DiskType: string(wfs.option.DiskType),
+ DataCenter: wfs.option.DataCenter,
+ Path: string(fullPath),
+ },
+ uploadOption, genFileUrlFn, reader,
+ )
+ }
if err != nil {
glog.V(0).Infof("upload data %v: %v", filename, err)


@@ -172,6 +172,29 @@ func (uploader *Uploader) uploadWithRetryData(assignFn func() (fileId string, ho
return
}
// AssignFunc returns a file ID, host, and auth token for uploading.
type AssignFunc func() (fileId string, host string, auth security.EncodedJwt, err error)
// UploadWithAssignFunc uploads data using a caller-provided assign function.
// This allows callers to use pre-allocated file IDs from a pool instead of
// making an AssignVolume RPC per chunk.
func (uploader *Uploader) UploadWithAssignFunc(assignFn AssignFunc, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
} else {
data, err = io.ReadAll(reader)
if err != nil {
glog.V(0).Infof("upload read input %s: %v", uploadOption.SourceUrl, err)
err = fmt.Errorf("read input: %w", err)
return
}
glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl)
}
fileId, uploadResult, err = uploader.uploadWithRetryData(assignFn, uploadOption, genFileUrlFn, data)
return
}
// UploadWithRetry will retry both assigning volume request and uploading content
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {