diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index 21bc9a591..3abc2fbea 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -97,12 +97,6 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro return mc.localStore.InsertEntry(ctx, entry) } -// doBatchInsertEntries inserts multiple entries using LevelDB's batch write. -// This is more efficient than inserting entries one by one. -func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error { - return mc.leveldbStore.BatchInsertEntries(ctx, entries) -} - func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { mc.Lock() defer mc.Unlock() diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go index 10ec9dad7..34f26d57d 100644 --- a/weed/mount/meta_cache/meta_cache_init.go +++ b/weed/mount/meta_cache/meta_cache_init.go @@ -49,11 +49,6 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full return g.Wait() } -// batchInsertSize is the number of entries to accumulate before flushing to LevelDB. -// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency -// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency. -const batchInsertSize = 100 - func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error { // Use singleflight to deduplicate concurrent requests for the same path _, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) { @@ -69,42 +64,36 @@ func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerCl glog.V(4).Infof("ReadDirAllEntries %s ...", path) - // Collect entries in batches for efficient LevelDB writes - var batch []*filer.Entry + // Start buffering subscription events for this directory. + // Any events that arrive during the fetch will be replayed + // after the snapshot is committed, preventing lost mutations. + mc.BeginRefresh(path) + + // Collect all entries from the filer. No lock is held during + // network I/O so subscription events can still be buffered. + var allEntries []*filer.Entry fetchErr := util.Retry("ReadDirAllEntries", func() error { - batch = nil // Reset batch on retry, allow GC of previous entries + allEntries = nil // Reset on retry return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { entry := filer.FromPbEntry(string(path), pbEntry) if IsHiddenSystemEntry(string(path), entry.Name()) { return nil } - - batch = append(batch, entry) - - // Flush batch when it reaches the threshold - // Don't rely on isLast here - hidden entries may cause early return - if len(batch) >= batchInsertSize { - // No lock needed - LevelDB Write() is thread-safe - if err := mc.doBatchInsertEntries(ctx, batch); err != nil { - return fmt.Errorf("batch insert for %s: %w", path, err) - } - // Create new slice to allow GC of flushed entries - batch = make([]*filer.Entry, 0, batchInsertSize) - } + allEntries = append(allEntries, entry) return nil }) }) if fetchErr != nil { + mc.CancelRefresh(path) return nil, fmt.Errorf("list %s: %w", path, fetchErr) } - // Flush any remaining entries in the batch - if len(batch) > 0 { - if err := mc.doBatchInsertEntries(ctx, batch); err != nil { - return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err) - } + // Atomically replace cached entries with the snapshot and replay + // any buffered events that arrived during the fetch. + if err := mc.CommitRefresh(ctx, path, allEntries); err != nil { + return nil, fmt.Errorf("commit refresh for %s: %w", path, err) } mc.markCachedFn(path) return nil, nil