From 68cae26c0b5d62fe50dee8a36e37b0a140c705f9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 May 2026 23:49:41 -0700 Subject: [PATCH] mount: fix SetAttr/GetAttr crash from concurrent chunk append under writebackCache (#9667) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * mount: hold the entry lock while reading chunk size in GetAttr/SetAttr Async upload workers append chunks to an open handle's shared entry under the LockedEntry lock (FileHandle.AddChunks), but GetAttr and SetAttr computed FileSize by iterating entry.Chunks without taking it. A concurrent append that reallocated the backing array tore the slice read and crashed in filer.TotalSize. Surfaces with -writebackCache, where handles stay open and flush asynchronously while metadata ops keep arriving. Take the LockedEntry lock for those reads (and SetAttr's truncate rewrite). * mount: re-read entry under the lock in GetAttr/SetAttr If SetEntry swapped the handle's entry pointer between maybeReadEntry and the lock acquisition, the old pointer is orphaned. Re-read fh.entry.Entry under the lock so SetAttr mutates the live entry instead of losing the update, and GetAttr reports the current one. * mount: cover the truncate path in TestAttrChunkRace Alternate SetAttr between mtime-only and a shrinking size so the test also exercises the entry.Chunks rewrite under fh.entry.Lock, not just the read-side size walk. * mount: snapshot chunks under the entry lock on the read path readFromChunks holds fh.entryLock (excludes SetAttr) but not the LockedEntry lock the async uploader appends under, so IsInRemoteOnly, the FileSize fallback, and the RDMA/peer chunk walks read entry.Chunks while AddChunks reallocated it — the same torn-slice crash as GetAttr/SetAttr. Snapshot size, inline content, and the chunk list under a brief LockedEntry RLock, then hand the snapshot to the RDMA/peer helpers instead of holding the lock across network I/O. 
The captured slice stays valid: append never modifies elements within the captured length (it either writes past len into spare capacity or copies to a new backing array), and truncate is excluded by the fh.entryLock. --- weed/mount/filehandle_read.go | 36 +++++-- weed/mount/peer_fetcher.go | 5 +- weed/mount/weedfs_attr.go | 26 ++++- weed/mount/weedfs_attr_race_test.go | 152 ++++++++++++++++++++++++++++ 4 files changed, 205 insertions(+), 14 deletions(-) create mode 100644 weed/mount/weedfs_attr_race_test.go diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index df458524d..86b637a4a 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -35,7 +35,12 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte entry := fh.GetEntry() - if entry.IsInRemoteOnly() { + // IsInRemoteOnly inspects entry.Chunks, so take the LockedEntry lock the + // async uploader appends under. + entry.RLock() + remoteOnly := entry.Entry.IsInRemoteOnly() + entry.RUnlock() + if remoteOnly { glog.V(4).Infof("download remote entry %s", fileFullPath) err := fh.downloadRemoteEntry(entry) if err != nil { @@ -44,10 +49,21 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte } } - fileSize := int64(entry.Attributes.FileSize) + // Snapshot size, inline content, and the chunk list under the LockedEntry + // lock. Async upload workers append chunks under this lock (AddChunks), so + // reading entry.Chunks / FileSize without it races with the slice + // reallocation and can crash in filer.TotalSize. The captured slice headers + // stay valid afterwards: append never mutates the old backing array, and + // truncate is excluded by the fh.entryLock held for this whole read. 
+ entry.RLock() + pbEntry := entry.Entry + fileSize := int64(pbEntry.Attributes.FileSize) if fileSize == 0 { - fileSize = int64(filer.FileSize(entry.GetEntry())) + fileSize = int64(filer.FileSize(pbEntry)) } + content := pbEntry.Content + chunks := pbEntry.Chunks + entry.RUnlock() if fileSize == 0 { glog.V(1).Infof("empty fh %v", fileFullPath) @@ -59,15 +75,15 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte return 0, 0, io.EOF } - if offset < int64(len(entry.Content)) { - totalRead := copy(buff, entry.Content[offset:]) + if offset < int64(len(content)) { + totalRead := copy(buff, content[offset:]) glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), 0, nil } // Try RDMA acceleration first if available if fh.wfs.rdmaClient != nil && fh.wfs.option.RdmaEnabled { - totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, entry) + totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, chunks) if err == nil { glog.V(4).Infof("RDMA read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), ts, nil @@ -79,7 +95,7 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte // Any failure falls through transparently. See design-weed-mount- // peer-chunk-sharing.md §4.3. 
if fh.wfs.option.PeerEnabled && fh.wfs.peerGrpcServer != nil { - totalRead, ts, err := fh.tryPeerRead(ctx, fileSize, buff, offset, entry) + totalRead, ts, err := fh.tryPeerRead(ctx, fileSize, buff, offset, chunks) if err == nil { glog.V(4).Infof("peer read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), ts, nil @@ -104,13 +120,13 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte return int64(totalRead), ts, err } -// tryRDMARead attempts to read file data using RDMA acceleration -func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) { +// tryRDMARead attempts to read file data using RDMA acceleration. chunks is a +// snapshot captured under the LockedEntry lock by the caller. +func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, chunks []*filer_pb.FileChunk) (int64, int64, error) { // For now, we'll try to read the chunks directly using RDMA // This is a simplified approach - in a full implementation, we'd need to // handle chunk boundaries, multiple chunks, etc. - chunks := entry.GetEntry().Chunks if len(chunks) == 0 { return 0, 0, fmt.Errorf("no chunks available for RDMA read") } diff --git a/weed/mount/peer_fetcher.go b/weed/mount/peer_fetcher.go index 6354283bc..c04b69410 100644 --- a/weed/mount/peer_fetcher.go +++ b/weed/mount/peer_fetcher.go @@ -53,7 +53,8 @@ const maxPeerFetchChunkBytes = 64 * 1024 * 1024 // end-to-end against FileChunk.ETag. // 4. On success, populate chunk_cache and enqueue an announce so // other mounts can discover us as a new holder. -func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) { +// chunks is a snapshot captured under the LockedEntry lock by the caller. 
+func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []byte, offset int64, chunks []*filer_pb.FileChunk) (int64, int64, error) { if fh.wfs.peerRegistrar == nil || fh.wfs.peerConnPool == nil { return 0, 0, fmt.Errorf("peer sharing not configured") } @@ -63,7 +64,7 @@ func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []by if readStop > fileSize { readStop = fileSize } - dataChunks, _, err := filer.ResolveChunkManifest(ctx, fh.wfs.LookupFn(), entry.GetEntry().Chunks, offset, readStop) + dataChunks, _, err := filer.ResolveChunkManifest(ctx, fh.wfs.LookupFn(), chunks, offset, readStop) if err != nil { return 0, 0, fmt.Errorf("resolve manifest: %w", err) } diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index fc0961fa1..9882b256c 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -23,10 +23,21 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse } inode := input.NodeId - path, _, entry, status := wfs.maybeReadEntry(inode) + path, fh, entry, status := wfs.maybeReadEntry(inode) if status == fuse.OK { out.AttrValid = wfs.attrValidSec + // When an open handle owns the entry, async upload workers append + // chunks under the LockedEntry lock; take it for reading so FileSize + // does not iterate the chunk slice mid-reallocation. Re-read under the + // lock in case SetEntry swapped the pointer since maybeReadEntry. 
+ if fh != nil { + fh.entry.RLock() + entry = fh.entry.Entry + } wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) + if fh != nil { + fh.entry.RUnlock() + } wfs.applyInMemoryAtime(&out.Attr, inode) if entry.IsDirectory { wfs.applyInMemoryDirMtime(&out.Attr, inode) @@ -40,7 +51,9 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse out.AttrValid = wfs.attrValidSec // Use shared lock to prevent race with Write operations fhActiveLock := wfs.fhLockTable.AcquireLock("GetAttr", fh.fh, util.SharedLock) - wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true) + fh.entry.RLock() + wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.Entry, true) + fh.entry.RUnlock() wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) wfs.applyInMemoryAtime(&out.Attr, inode) out.Nlink = 0 @@ -65,6 +78,15 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse if fh != nil { fh.entryLock.Lock() defer fh.entryLock.Unlock() + // entry is the handle's shared LockedEntry.Entry. Async upload workers + // mutate its Chunks slice under the LockedEntry lock (AddChunks); hold + // that same lock so the truncate and FileSize reads below don't tear + // against a concurrent append. Re-read under the lock in case SetEntry + // swapped the pointer since maybeReadEntry, so we don't mutate an + // orphaned entry and lose the update. 
+ fh.entry.Lock() + defer fh.entry.Unlock() + entry = fh.entry.Entry } wormEnforced, wormEnabled := wfs.wormEnforcedForEntry(path, entry) diff --git a/weed/mount/weedfs_attr_race_test.go b/weed/mount/weedfs_attr_race_test.go new file mode 100644 index 000000000..0fd386121 --- /dev/null +++ b/weed/mount/weedfs_attr_race_test.go @@ -0,0 +1,152 @@ +package mount + +import ( + "sync" + "testing" + + "github.com/seaweedfs/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestAttrChunkRace guards the locking around an open handle's chunk slice. +// +// With writebackCache, async upload workers append chunks to an open file +// handle's shared entry under the LockedEntry lock (FileHandle.AddChunks), +// while metadata ops compute the file size by iterating entry.Chunks. SetAttr +// and GetAttr used to read that slice without the LockedEntry lock, so a +// concurrent append that reallocated the backing array produced a torn slice +// read and a nil pointer dereference in filer.TotalSize. Run under -race. 
+func TestAttrChunkRace(t *testing.T) { + wfs := &WFS{ + option: &Option{}, + inodeToPath: NewInodeToPath(util.FullPath("/"), 0), + fhMap: NewFileHandleToInode(), + openMtimeCache: make(map[uint64][2]int64, 8), + } + + const inode = uint64(42) + fullPath := util.FullPath("/dir/sample.txt") + wfs.inodeToPath.Lookup(fullPath, 1, false, false, inode, true) + + entry := &filer_pb.Entry{ + Name: "sample.txt", + Attributes: &filer_pb.FuseAttributes{FileMode: 0644}, + } + chunkGroup, err := filer.NewChunkGroup(nil, nil, nil, 1) + if err != nil { + t.Fatalf("NewChunkGroup: %v", err) + } + fh := &FileHandle{ + fh: FileHandleId(1), + inode: inode, + wfs: wfs, + entry: &LockedEntry{Entry: entry}, + entryChunkGroup: chunkGroup, + } + wfs.fhMap.inode2fh[inode] = fh + wfs.fhMap.fh2inode[fh.fh] = inode + + const iterations = 2000 + var wg sync.WaitGroup + wg.Add(3) + + // Async uploader: append chunks, reallocating the backing array. + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + fh.AddChunks([]*filer_pb.FileChunk{{FileId: "x", Offset: int64(i), Size: 1}}) + } + }() + + // SetAttr: mtime-only recomputes FileSize by iterating chunks; a shrinking + // size takes the truncate path that rewrites entry.Chunks under the lock. + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + in := &fuse.SetAttrIn{} + in.NodeId = inode + if i%2 == 0 { + in.Valid = fuse.FATTR_MTIME + in.Mtime = uint64(i) + } else { + in.Valid = fuse.FATTR_SIZE + in.Size = uint64(i % 8) + } + var out fuse.AttrOut + wfs.SetAttr(nil, in, &out) + } + }() + + // GetAttr also computes FileSize by iterating chunks. + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + in := &fuse.GetAttrIn{} + in.NodeId = inode + var out fuse.AttrOut + wfs.GetAttr(nil, in, &out) + } + }() + + wg.Wait() +} + +// TestReadFromChunksRace guards the read path's chunk-slice access. 
The read +// path holds fh.entryLock (which excludes SetAttr) but not the LockedEntry lock +// the async uploader appends under, so readFromChunks used to compute FileSize +// and walk entry.Chunks while AddChunks reallocated the slice. Run under -race. +func TestReadFromChunksRace(t *testing.T) { + wfs := &WFS{ + option: &Option{}, + inodeToPath: NewInodeToPath(util.FullPath("/"), 0), + fhMap: NewFileHandleToInode(), + } + + const inode = uint64(42) + fullPath := util.FullPath("/dir/sample.txt") + wfs.inodeToPath.Lookup(fullPath, 1, false, false, inode, true) + + // FileSize 0 forces readFromChunks down the filer.FileSize(chunks) branch. + entry := &filer_pb.Entry{ + Name: "sample.txt", + Attributes: &filer_pb.FuseAttributes{FileMode: 0644}, + } + chunkGroup, err := filer.NewChunkGroup(nil, nil, nil, 1) + if err != nil { + t.Fatalf("NewChunkGroup: %v", err) + } + fh := &FileHandle{ + fh: FileHandleId(1), + inode: inode, + wfs: wfs, + entry: &LockedEntry{Entry: entry}, + entryChunkGroup: chunkGroup, + } + wfs.fhMap.inode2fh[inode] = fh + wfs.fhMap.fh2inode[fh.fh] = inode + + const iterations = 2000 + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + fh.AddChunks([]*filer_pb.FileChunk{{FileId: "x", Offset: int64(i), Size: 1}}) + } + }() + + // A read past EOF returns before touching the volume tier, but only after + // the racy size/chunk snapshot has run. + go func() { + defer wg.Done() + buff := make([]byte, 16) + for i := 0; i < iterations; i++ { + fh.readFromChunks(buff, 1<<62) + } + }() + + wg.Wait() +}