mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-02 23:16:27 +00:00
filer: reuse the caller's fetched entry in CreateEntry (#9645)
CreateEntry starts with a FindEntry to load the current entry. A conditional CreateEntry already fetched that entry to evaluate the precondition under the per-path lock, so the create repeated the lookup. Add an existing *Entry parameter: when non-nil it is used as the current entry and the internal lookup is skipped; nil keeps the lookup. The gRPC CreateEntry handler passes the entry it fetched for the precondition, removing the redundant read while the lock is held. All other callers pass nil.
This commit is contained in:
@@ -209,7 +209,11 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
|
||||
return f.Store.RollbackTransaction(ctx)
|
||||
}
|
||||
|
||||
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool, maxFilenameLength uint32) error {
|
||||
// CreateEntry creates or replaces an entry. When existing is non-nil the caller
|
||||
// has already fetched the current entry at this path under a path lock, and it
|
||||
// is reused instead of looking the store up again; pass nil to have CreateEntry
|
||||
// look it up itself.
|
||||
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, existing *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool, maxFilenameLength uint32) error {
|
||||
|
||||
if string(entry.FullPath) == "/" {
|
||||
return nil
|
||||
@@ -227,7 +231,10 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
|
||||
entry.Attr.Atime = entryInitialAtime(entry.Attr)
|
||||
}
|
||||
|
||||
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
|
||||
oldEntry := existing
|
||||
if oldEntry == nil {
|
||||
oldEntry, _ = f.FindEntry(ctx, entry.FullPath)
|
||||
}
|
||||
|
||||
/*
|
||||
if !hasWritePermission(lastDirectoryEntry, entry) {
|
||||
|
||||
@@ -28,7 +28,7 @@ func TestCreateEntryAssignsInodeWhenMissing(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err := f.CreateEntry(context.Background(), entry, false, false, nil, false, f.MaxFilenameLength)
|
||||
err := f.CreateEntry(context.Background(), entry, nil, false, false, nil, false, f.MaxFilenameLength)
|
||||
require.NoError(t, err)
|
||||
|
||||
stored, findErr := store.FindEntry(context.Background(), entry.FullPath)
|
||||
@@ -48,7 +48,7 @@ func TestCreateEntryAssignsInodesToAutoCreatedParents(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
err := f.CreateEntry(context.Background(), entry, false, false, nil, false, f.MaxFilenameLength)
|
||||
err := f.CreateEntry(context.Background(), entry, nil, false, false, nil, false, f.MaxFilenameLength)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, path := range []string{"/a", "/a/b", "/a/b/c.txt"} {
|
||||
|
||||
@@ -96,7 +96,7 @@ func (f *Filer) maybeLazyFetchFromRemote(ctx context.Context, p util.FullPath) (
|
||||
persistBaseCtx, cancelPersist := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancelPersist()
|
||||
persistCtx := context.WithValue(persistBaseCtx, lazyFetchContextKey{}, true)
|
||||
saveErr := f.CreateEntry(persistCtx, entry, false, false, nil, true, f.MaxFilenameLength)
|
||||
saveErr := f.CreateEntry(persistCtx, entry, nil, false, false, nil, true, f.MaxFilenameLength)
|
||||
if saveErr != nil {
|
||||
glog.Warningf("maybeLazyFetchFromRemote: failed to persist filer entry for %s: %v", p, saveErr)
|
||||
f.lazyFetchGroup.Forget(key)
|
||||
|
||||
@@ -152,7 +152,7 @@ func (f *Filer) maybeLazyListFromRemote(ctx context.Context, p util.FullPath) {
|
||||
entry.Attr.FileSize = uint64(remoteEntry.RemoteSize)
|
||||
}
|
||||
}
|
||||
if saveErr := f.CreateEntry(persistCtx, entry, false, false, nil, true, f.MaxFilenameLength); saveErr != nil {
|
||||
if saveErr := f.CreateEntry(persistCtx, entry, nil, false, false, nil, true, f.MaxFilenameLength); saveErr != nil {
|
||||
glog.Warningf("maybeLazyListFromRemote: persist %s: %v", childPath, saveErr)
|
||||
}
|
||||
}
|
||||
@@ -193,7 +193,7 @@ func (f *Filer) updateDirectoryListingSyncedAt(ctx context.Context, p util.FullP
|
||||
dirEntry.Extended = make(map[string][]byte)
|
||||
}
|
||||
dirEntry.Extended[xattrRemoteListingSyncedAt] = []byte(fmt.Sprintf("%d", syncTime.Unix()))
|
||||
if saveErr := f.CreateEntry(ctx, dirEntry, false, false, nil, true, f.MaxFilenameLength); saveErr != nil {
|
||||
if saveErr := f.CreateEntry(ctx, dirEntry, nil, false, false, nil, true, f.MaxFilenameLength); saveErr != nil {
|
||||
glog.Warningf("maybeLazyListFromRemote: create dir synced_at for %s: %v", p, saveErr)
|
||||
}
|
||||
return
|
||||
|
||||
@@ -43,7 +43,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
|
||||
entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano()))
|
||||
|
||||
// update the entry
|
||||
err = f.CreateEntry(context.Background(), entry, false, false, nil, false, f.MaxFilenameLength)
|
||||
err = f.CreateEntry(context.Background(), entry, nil, false, false, nil, false, f.MaxFilenameLength)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
if err := testFiler.CreateEntry(ctx, entry1, nil, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
if err := testFiler.CreateEntry(ctx, entry1, nil, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
if err := testFiler.CreateEntry(ctx, entry1, nil, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func TestCreateAndFind(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
if err := testFiler.CreateEntry(ctx, entry1, nil, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
t.Errorf("create entry %v: %v", entry1.FullPath, err)
|
||||
return
|
||||
}
|
||||
@@ -149,7 +149,7 @@ func TestListDirectoryWithPrefix(t *testing.T) {
|
||||
Gid: 1,
|
||||
},
|
||||
}
|
||||
if err := testFiler.CreateEntry(ctx, entry, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
if err := testFiler.CreateEntry(ctx, entry, nil, false, false, nil, false, testFiler.MaxFilenameLength); err != nil {
|
||||
t.Fatalf("Failed to create entry %s: %v", fullpath, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,6 +195,9 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
|
||||
|
||||
// Evaluate the optional precondition against the current entry while the
|
||||
// path lock is held, so the check and the write are atomic on this filer.
|
||||
// The fetched entry is then handed to CreateEntry below so it does not look
|
||||
// the same path up again under the lock.
|
||||
var existing *filer.Entry
|
||||
if conditionIsSet(req.Condition) {
|
||||
current, findErr := fs.filer.FindEntry(ctx, fullpath)
|
||||
if findErr != nil && findErr != filer_pb.ErrNotFound {
|
||||
@@ -210,10 +213,11 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
|
||||
ErrorCode: filer_pb.FilerError_PRECONDITION_FAILED,
|
||||
}, nil
|
||||
}
|
||||
existing = current
|
||||
}
|
||||
|
||||
ctx, eventSink := filer.WithMetadataEventSink(ctx)
|
||||
createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)
|
||||
createErr := fs.filer.CreateEntry(ctx, newEntry, existing, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory, so.MaxFileNameLength)
|
||||
|
||||
if createErr == nil {
|
||||
fs.filer.DeleteChunksNotRecursive(garbage)
|
||||
@@ -327,7 +331,7 @@ func (fs *FilerServer) applyObjectMutation(ctx context.Context, m *filer_pb.Obje
|
||||
return fmt.Errorf("PUT requires an entry")
|
||||
}
|
||||
newEntry := filer.FromPbEntry(m.Directory, m.Entry)
|
||||
return fs.filer.CreateEntry(ctx, newEntry, false, fromOtherCluster, signatures, false, fs.filer.MaxFilenameLength)
|
||||
return fs.filer.CreateEntry(ctx, newEntry, nil, false, fromOtherCluster, signatures, false, fs.filer.MaxFilenameLength)
|
||||
|
||||
case filer_pb.ObjectMutation_DELETE:
|
||||
fullpath := util.NewFullPath(m.Directory, m.Name)
|
||||
@@ -592,7 +596,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
|
||||
glog.V(0).InfofCtx(ctx, "MaybeManifestize: %v", err)
|
||||
}
|
||||
|
||||
err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false, fs.filer.MaxFilenameLength)
|
||||
err = fs.filer.CreateEntry(context.Background(), entry, nil, false, false, nil, false, fs.filer.MaxFilenameLength)
|
||||
|
||||
return &filer_pb.AppendToEntryResponse{}, err
|
||||
}
|
||||
|
||||
@@ -234,3 +234,42 @@ func TestCreateEntryConditionEnforced(t *testing.T) {
|
||||
t.Fatalf("matching etag should overwrite, got error %q", resp.Error)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateEntry reuses a provided existing entry instead of reading the store
|
||||
// again; passing nil makes it look the path up itself.
|
||||
func TestCreateEntryReusesProvidedExisting(t *testing.T) {
|
||||
existing := &filer.Entry{
|
||||
FullPath: "/test/obj",
|
||||
Attr: filer.Attr{Inode: 1, Mtime: time.Unix(1700000000, 0), Crtime: time.Unix(1700000000, 0)},
|
||||
Extended: map[string][]byte{s3_constants.ExtETagKey: []byte("abc")},
|
||||
}
|
||||
newEntry := func() *filer.Entry {
|
||||
return &filer.Entry{
|
||||
FullPath: "/test/obj",
|
||||
Attr: filer.Attr{Inode: 1, Mtime: time.Unix(1700000001, 0)},
|
||||
}
|
||||
}
|
||||
|
||||
// Provided existing vs nil: the only difference is CreateEntry's own lookup,
|
||||
// so providing it must save exactly one path read (other store-layer reads
|
||||
// during the overwrite are the same in both runs).
|
||||
store := newRenameTestStore()
|
||||
store.entries["/test/obj"] = existing.ShallowClone()
|
||||
f := newRenameTestFiler(store)
|
||||
if err := f.CreateEntry(context.Background(), newEntry(), existing, false, false, nil, true, f.MaxFilenameLength); err != nil {
|
||||
t.Fatalf("create with existing: %v", err)
|
||||
}
|
||||
withExisting := store.findEntryCallCount("/test/obj")
|
||||
|
||||
store2 := newRenameTestStore()
|
||||
store2.entries["/test/obj"] = existing.ShallowClone()
|
||||
f2 := newRenameTestFiler(store2)
|
||||
if err := f2.CreateEntry(context.Background(), newEntry(), nil, false, false, nil, true, f2.MaxFilenameLength); err != nil {
|
||||
t.Fatalf("create with nil: %v", err)
|
||||
}
|
||||
withNil := store2.findEntryCallCount("/test/obj")
|
||||
|
||||
if withNil != withExisting+1 {
|
||||
t.Fatalf("providing existing should save one path lookup: existing=%d nil=%d", withExisting, withNil)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -245,7 +245,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee
|
||||
return fmt.Errorf("insert entry %s: %v", newEntry.FullPath, createErr)
|
||||
}
|
||||
} else {
|
||||
if createErr := fs.filer.CreateEntry(filer.WithSuppressedMetadataEvents(ctx), newEntry, false, false, signatures, false, fs.filer.MaxFilenameLength); createErr != nil {
|
||||
if createErr := fs.filer.CreateEntry(filer.WithSuppressedMetadataEvents(ctx), newEntry, nil, false, false, signatures, false, fs.filer.MaxFilenameLength); createErr != nil {
|
||||
return createErr
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,7 +169,7 @@ func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.
|
||||
|
||||
// Pass o_excl = !overwrite so the default copy refuses to replace an
|
||||
// existing destination, while overwrite=true updates the pre-created target.
|
||||
if createErr := fs.filer.CreateEntry(ctx, newEntry, !overwrite, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
|
||||
if createErr := fs.filer.CreateEntry(ctx, newEntry, nil, !overwrite, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
|
||||
err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
|
||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||
return
|
||||
|
||||
@@ -42,7 +42,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
}
|
||||
|
||||
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false, fs.filer.MaxFilenameLength); dbErr != nil {
|
||||
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, nil, false, false, nil, false, fs.filer.MaxFilenameLength); dbErr != nil {
|
||||
glog.V(0).InfofCtx(ctx, "failing to update %s tagging : %v", path, dbErr)
|
||||
writeJsonError(w, r, http.StatusInternalServerError, dbErr)
|
||||
return
|
||||
@@ -108,7 +108,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
|
||||
return
|
||||
}
|
||||
|
||||
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false, fs.filer.MaxFilenameLength); dbErr != nil {
|
||||
if dbErr := fs.filer.CreateEntry(ctx, existingEntry, nil, false, false, nil, false, fs.filer.MaxFilenameLength); dbErr != nil {
|
||||
glog.V(0).InfofCtx(ctx, "failing to delete %s tagging : %v", path, dbErr)
|
||||
writeJsonError(w, r, http.StatusInternalServerError, dbErr)
|
||||
return
|
||||
|
||||
@@ -339,7 +339,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
|
||||
}
|
||||
}
|
||||
|
||||
dbErr := fs.filer.CreateEntry(context.WithoutCancel(ctx), entry, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength)
|
||||
dbErr := fs.filer.CreateEntry(context.WithoutCancel(ctx), entry, nil, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength)
|
||||
if dbErr != nil {
|
||||
replyerr = dbErr
|
||||
filerResult.Error = dbErr.Error()
|
||||
@@ -437,7 +437,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
|
||||
Name: util.FullPath(path).Name(),
|
||||
}
|
||||
|
||||
if dbErr := fs.filer.CreateEntry(context.WithoutCancel(ctx), entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
|
||||
if dbErr := fs.filer.CreateEntry(context.WithoutCancel(ctx), entry, nil, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
|
||||
replyerr = dbErr
|
||||
filerResult.Error = dbErr.Error()
|
||||
glog.V(0).InfofCtx(ctx, "failing to create dir %s on filer server : %v", path, dbErr)
|
||||
|
||||
@@ -132,7 +132,7 @@ func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPat
|
||||
Uid: OS_UID,
|
||||
Gid: OS_GID,
|
||||
},
|
||||
}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
}, nil, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
return nil, fmt.Errorf("create session directory: %w", err)
|
||||
}
|
||||
|
||||
@@ -167,7 +167,7 @@ func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession)
|
||||
Content: sessionData,
|
||||
}
|
||||
|
||||
if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
if err := fs.filer.CreateEntry(ctx, entry, nil, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
return fmt.Errorf("save session info entry: %w", err)
|
||||
}
|
||||
|
||||
@@ -265,7 +265,7 @@ func (fs *FilerServer) saveTusChunk(ctx context.Context, uploadID string, chunk
|
||||
Uid: OS_UID,
|
||||
Gid: OS_GID,
|
||||
},
|
||||
}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
}, nil, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
return fmt.Errorf("save chunk info: %w", err)
|
||||
}
|
||||
|
||||
@@ -371,7 +371,7 @@ func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSessio
|
||||
}
|
||||
|
||||
// Ensure parent directory exists
|
||||
if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
if err := fs.filer.CreateEntry(ctx, entry, nil, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
|
||||
return fmt.Errorf("create final file entry: %w", err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user