From e91d43ef08efe812eba1d64a5113ecd6deb38ee0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 6 Mar 2026 00:46:10 -0800 Subject: [PATCH] mount: use BeginRefresh/CommitRefresh in doEnsureVisited Replace the incremental batch-insert approach in doEnsureVisited with the new refresh lifecycle: 1. BeginRefresh - start buffering subscription events 2. ReadDirAllEntries - fetch full listing (no lock held) 3. CommitRefresh - atomically replace cache + replay buffered events This ensures that any creates, deletes, or updates that arrive via the subscription handler during the filer listing are not lost. The snapshot replaces all stale entries, and buffered events are replayed on top to bring the cache up to date. --- weed/mount/meta_cache/meta_cache.go | 6 ---- weed/mount/meta_cache/meta_cache_init.go | 41 +++++++++--------------- 2 files changed, 15 insertions(+), 32 deletions(-) diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index 21bc9a591..3abc2fbea 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -97,12 +97,6 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro return mc.localStore.InsertEntry(ctx, entry) } -// doBatchInsertEntries inserts multiple entries using LevelDB's batch write. -// This is more efficient than inserting entries one by one. -func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error { - return mc.leveldbStore.BatchInsertEntries(ctx, entries) -} - func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { mc.Lock() defer mc.Unlock() diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go index 10ec9dad7..34f26d57d 100644 --- a/weed/mount/meta_cache/meta_cache_init.go +++ b/weed/mount/meta_cache/meta_cache_init.go @@ -49,11 +49,6 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full return g.Wait() } -// batchInsertSize is the number of entries to accumulate before flushing to LevelDB. -// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency -// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency. -const batchInsertSize = 100 - func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error { // Use singleflight to deduplicate concurrent requests for the same path _, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) { @@ -69,42 +64,36 @@ func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerCl glog.V(4).Infof("ReadDirAllEntries %s ...", path) - // Collect entries in batches for efficient LevelDB writes - var batch []*filer.Entry + // Start buffering subscription events for this directory. + // Any events that arrive during the fetch will be replayed + // after the snapshot is committed, preventing lost mutations. + mc.BeginRefresh(path) + + // Collect all entries from the filer. No lock is held during + // network I/O so subscription events can still be buffered. + var allEntries []*filer.Entry fetchErr := util.Retry("ReadDirAllEntries", func() error { - batch = nil // Reset batch on retry, allow GC of previous entries + allEntries = nil // Reset on retry return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { entry := filer.FromPbEntry(string(path), pbEntry) if IsHiddenSystemEntry(string(path), entry.Name()) { return nil } - - batch = append(batch, entry) - - // Flush batch when it reaches the threshold - // Don't rely on isLast here - hidden entries may cause early return - if len(batch) >= batchInsertSize { - // No lock needed - LevelDB Write() is thread-safe - if err := mc.doBatchInsertEntries(ctx, batch); err != nil { - return fmt.Errorf("batch insert for %s: %w", path, err) - } - // Create new slice to allow GC of flushed entries - batch = make([]*filer.Entry, 0, batchInsertSize) - } + allEntries = append(allEntries, entry) return nil }) }) if fetchErr != nil { + mc.CancelRefresh(path) return nil, fmt.Errorf("list %s: %w", path, fetchErr) } - // Flush any remaining entries in the batch - if len(batch) > 0 { - if err := mc.doBatchInsertEntries(ctx, batch); err != nil { - return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err) - } + // Atomically replace cached entries with the snapshot and replay + // any buffered events that arrived during the fetch. + if err := mc.CommitRefresh(ctx, path, allEntries); err != nil { + return nil, fmt.Errorf("commit refresh for %s: %w", path, err) } mc.markCachedFn(path) return nil, nil