mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-18 15:51:29 +00:00
mount: use BeginRefresh/CommitRefresh in doEnsureVisited
Replace the incremental batch-insert approach in doEnsureVisited with the new refresh lifecycle: 1. BeginRefresh - start buffering subscription events 2. ReadDirAllEntries - fetch full listing (no lock held) 3. CommitRefresh - atomically replace cache + replay buffered events This ensures that any creates, deletes, or updates that arrive via the subscription handler during the filer listing are not lost. The snapshot replaces all stale entries, and buffered events are replayed on top to bring the cache up to date.
This commit is contained in:
@@ -97,12 +97,6 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
|
||||
return mc.localStore.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
// doBatchInsertEntries inserts multiple entries using LevelDB's batch write.
|
||||
// This is more efficient than inserting entries one by one.
|
||||
func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error {
|
||||
return mc.leveldbStore.BatchInsertEntries(ctx, entries)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
|
||||
mc.Lock()
|
||||
defer mc.Unlock()
|
||||
|
||||
@@ -49,11 +49,6 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
// batchInsertSize is the number of entries to accumulate before flushing to LevelDB.
|
||||
// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency
|
||||
// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency.
|
||||
const batchInsertSize = 100
|
||||
|
||||
func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
|
||||
// Use singleflight to deduplicate concurrent requests for the same path
|
||||
_, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) {
|
||||
@@ -69,42 +64,36 @@ func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerCl
|
||||
|
||||
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
|
||||
|
||||
// Collect entries in batches for efficient LevelDB writes
|
||||
var batch []*filer.Entry
|
||||
// Start buffering subscription events for this directory.
|
||||
// Any events that arrive during the fetch will be replayed
|
||||
// after the snapshot is committed, preventing lost mutations.
|
||||
mc.BeginRefresh(path)
|
||||
|
||||
// Collect all entries from the filer. No lock is held during
|
||||
// network I/O so subscription events can still be buffered.
|
||||
var allEntries []*filer.Entry
|
||||
|
||||
fetchErr := util.Retry("ReadDirAllEntries", func() error {
|
||||
batch = nil // Reset batch on retry, allow GC of previous entries
|
||||
allEntries = nil // Reset on retry
|
||||
return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
|
||||
entry := filer.FromPbEntry(string(path), pbEntry)
|
||||
if IsHiddenSystemEntry(string(path), entry.Name()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
batch = append(batch, entry)
|
||||
|
||||
// Flush batch when it reaches the threshold
|
||||
// Don't rely on isLast here - hidden entries may cause early return
|
||||
if len(batch) >= batchInsertSize {
|
||||
// No lock needed - LevelDB Write() is thread-safe
|
||||
if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
|
||||
return fmt.Errorf("batch insert for %s: %w", path, err)
|
||||
}
|
||||
// Create new slice to allow GC of flushed entries
|
||||
batch = make([]*filer.Entry, 0, batchInsertSize)
|
||||
}
|
||||
allEntries = append(allEntries, entry)
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
if fetchErr != nil {
|
||||
mc.CancelRefresh(path)
|
||||
return nil, fmt.Errorf("list %s: %w", path, fetchErr)
|
||||
}
|
||||
|
||||
// Flush any remaining entries in the batch
|
||||
if len(batch) > 0 {
|
||||
if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
|
||||
return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err)
|
||||
}
|
||||
// Atomically replace cached entries with the snapshot and replay
|
||||
// any buffered events that arrived during the fetch.
|
||||
if err := mc.CommitRefresh(ctx, path, allEntries); err != nil {
|
||||
return nil, fmt.Errorf("commit refresh for %s: %w", path, err)
|
||||
}
|
||||
mc.markCachedFn(path)
|
||||
return nil, nil
|
||||
|
||||
Reference in New Issue
Block a user