mount: fix SetAttr/GetAttr crash from concurrent chunk append under writebackCache (#9667)

* mount: hold the entry lock while reading chunk size in GetAttr/SetAttr

Async upload workers append chunks to an open handle's shared entry under
the LockedEntry lock (FileHandle.AddChunks), but GetAttr and SetAttr
computed FileSize by iterating entry.Chunks without taking it. A concurrent
append that reallocated the backing array tore the slice read and crashed in
filer.TotalSize. Surfaces with -writebackCache, where handles stay open and
flush asynchronously while metadata ops keep arriving.

Take the LockedEntry lock for those reads (and SetAttr's truncate rewrite).

* mount: re-read entry under the lock in GetAttr/SetAttr

If SetEntry swapped the handle's entry pointer between maybeReadEntry and the
lock acquisition, the old pointer is orphaned. Re-read fh.entry.Entry under
the lock so SetAttr mutates the live entry instead of losing the update, and
GetAttr reports the current one.

* mount: cover the truncate path in TestAttrChunkRace

Alternate SetAttr between mtime-only and a shrinking size so the test also
exercises the entry.Chunks rewrite under fh.entry.Lock, not just the read-side
size walk.

* mount: snapshot chunks under the entry lock on the read path

readFromChunks holds fh.entryLock (excludes SetAttr) but not the LockedEntry
lock the async uploader appends under, so IsInRemoteOnly, the FileSize
fallback, and the RDMA/peer chunk walks read entry.Chunks while AddChunks
reallocated it — the same torn-slice crash as GetAttr/SetAttr.

Snapshot size, inline content, and the chunk list under a brief LockedEntry
RLock, then hand the snapshot to the RDMA/peer helpers instead of holding the
lock across network I/O. The captured slice stays valid: append never mutates
the old backing array, and truncate is excluded by the fh.entryLock.
This commit is contained in:
Chris Lu
2026-05-24 23:49:41 -07:00
committed by GitHub
parent fef49c2d75
commit 68cae26c0b
4 changed files with 205 additions and 14 deletions

View File

@@ -35,7 +35,12 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
entry := fh.GetEntry()
if entry.IsInRemoteOnly() {
// IsInRemoteOnly inspects entry.Chunks, so take the LockedEntry lock the
// async uploader appends under.
entry.RLock()
remoteOnly := entry.Entry.IsInRemoteOnly()
entry.RUnlock()
if remoteOnly {
glog.V(4).Infof("download remote entry %s", fileFullPath)
err := fh.downloadRemoteEntry(entry)
if err != nil {
@@ -44,10 +49,21 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
}
}
fileSize := int64(entry.Attributes.FileSize)
// Snapshot size, inline content, and the chunk list under the LockedEntry
// lock. Async upload workers append chunks under this lock (AddChunks), so
// reading entry.Chunks / FileSize without it races with the slice
// reallocation and can crash in filer.TotalSize. The captured slice headers
// stay valid afterwards: append never mutates the old backing array, and
// truncate is excluded by the fh.entryLock held for this whole read.
entry.RLock()
pbEntry := entry.Entry
fileSize := int64(pbEntry.Attributes.FileSize)
if fileSize == 0 {
fileSize = int64(filer.FileSize(entry.GetEntry()))
fileSize = int64(filer.FileSize(pbEntry))
}
content := pbEntry.Content
chunks := pbEntry.Chunks
entry.RUnlock()
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath)
@@ -59,15 +75,15 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
return 0, 0, io.EOF
}
if offset < int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
if offset < int64(len(content)) {
totalRead := copy(buff, content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), 0, nil
}
// Try RDMA acceleration first if available
if fh.wfs.rdmaClient != nil && fh.wfs.option.RdmaEnabled {
totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, entry)
totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, chunks)
if err == nil {
glog.V(4).Infof("RDMA read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), ts, nil
@@ -79,7 +95,7 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
// Any failure falls through transparently. See design-weed-mount-
// peer-chunk-sharing.md §4.3.
if fh.wfs.option.PeerEnabled && fh.wfs.peerGrpcServer != nil {
totalRead, ts, err := fh.tryPeerRead(ctx, fileSize, buff, offset, entry)
totalRead, ts, err := fh.tryPeerRead(ctx, fileSize, buff, offset, chunks)
if err == nil {
glog.V(4).Infof("peer read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), ts, nil
@@ -104,13 +120,13 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte
return int64(totalRead), ts, err
}
// tryRDMARead attempts to read file data using RDMA acceleration
func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) {
// tryRDMARead attempts to read file data using RDMA acceleration. chunks is a
// snapshot captured under the LockedEntry lock by the caller.
func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, chunks []*filer_pb.FileChunk) (int64, int64, error) {
// For now, we'll try to read the chunks directly using RDMA
// This is a simplified approach - in a full implementation, we'd need to
// handle chunk boundaries, multiple chunks, etc.
chunks := entry.GetEntry().Chunks
if len(chunks) == 0 {
return 0, 0, fmt.Errorf("no chunks available for RDMA read")
}

View File

@@ -53,7 +53,8 @@ const maxPeerFetchChunkBytes = 64 * 1024 * 1024
// end-to-end against FileChunk.ETag.
// 4. On success, populate chunk_cache and enqueue an announce so
// other mounts can discover us as a new holder.
func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) {
// chunks is a snapshot captured under the LockedEntry lock by the caller.
func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []byte, offset int64, chunks []*filer_pb.FileChunk) (int64, int64, error) {
if fh.wfs.peerRegistrar == nil || fh.wfs.peerConnPool == nil {
return 0, 0, fmt.Errorf("peer sharing not configured")
}
@@ -63,7 +64,7 @@ func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []by
if readStop > fileSize {
readStop = fileSize
}
dataChunks, _, err := filer.ResolveChunkManifest(ctx, fh.wfs.LookupFn(), entry.GetEntry().Chunks, offset, readStop)
dataChunks, _, err := filer.ResolveChunkManifest(ctx, fh.wfs.LookupFn(), chunks, offset, readStop)
if err != nil {
return 0, 0, fmt.Errorf("resolve manifest: %w", err)
}

View File

@@ -23,10 +23,21 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
}
inode := input.NodeId
path, _, entry, status := wfs.maybeReadEntry(inode)
path, fh, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK {
out.AttrValid = wfs.attrValidSec
// When an open handle owns the entry, async upload workers append
// chunks under the LockedEntry lock; take it for reading so FileSize
// does not iterate the chunk slice mid-reallocation. Re-read under the
// lock in case SetEntry swapped the pointer since maybeReadEntry.
if fh != nil {
fh.entry.RLock()
entry = fh.entry.Entry
}
wfs.setAttrByPbEntry(&out.Attr, inode, entry, true)
if fh != nil {
fh.entry.RUnlock()
}
wfs.applyInMemoryAtime(&out.Attr, inode)
if entry.IsDirectory {
wfs.applyInMemoryDirMtime(&out.Attr, inode)
@@ -40,7 +51,9 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
out.AttrValid = wfs.attrValidSec
// Use shared lock to prevent race with Write operations
fhActiveLock := wfs.fhLockTable.AcquireLock("GetAttr", fh.fh, util.SharedLock)
wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true)
fh.entry.RLock()
wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.Entry, true)
fh.entry.RUnlock()
wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
wfs.applyInMemoryAtime(&out.Attr, inode)
out.Nlink = 0
@@ -65,6 +78,15 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
if fh != nil {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
// entry is the handle's shared LockedEntry.Entry. Async upload workers
// mutate its Chunks slice under the LockedEntry lock (AddChunks); hold
// that same lock so the truncate and FileSize reads below don't tear
// against a concurrent append. Re-read under the lock in case SetEntry
// swapped the pointer since maybeReadEntry, so we don't mutate an
// orphaned entry and lose the update.
fh.entry.Lock()
defer fh.entry.Unlock()
entry = fh.entry.Entry
}
wormEnforced, wormEnabled := wfs.wormEnforcedForEntry(path, entry)

View File

@@ -0,0 +1,152 @@
package mount
import (
"sync"
"testing"
"github.com/seaweedfs/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestAttrChunkRace guards the locking around an open handle's chunk slice.
//
// With writebackCache, async upload workers append chunks to an open file
// handle's shared entry under the LockedEntry lock (FileHandle.AddChunks),
// while metadata ops compute the file size by iterating entry.Chunks. SetAttr
// and GetAttr used to read that slice without the LockedEntry lock, so a
// concurrent append that reallocated the backing array produced a torn slice
// read and a nil pointer dereference in filer.TotalSize. Run under -race.
func TestAttrChunkRace(t *testing.T) {
wfs := &WFS{
option: &Option{},
inodeToPath: NewInodeToPath(util.FullPath("/"), 0),
fhMap: NewFileHandleToInode(),
openMtimeCache: make(map[uint64][2]int64, 8),
}
const inode = uint64(42)
fullPath := util.FullPath("/dir/sample.txt")
wfs.inodeToPath.Lookup(fullPath, 1, false, false, inode, true)
entry := &filer_pb.Entry{
Name: "sample.txt",
Attributes: &filer_pb.FuseAttributes{FileMode: 0644},
}
chunkGroup, err := filer.NewChunkGroup(nil, nil, nil, 1)
if err != nil {
t.Fatalf("NewChunkGroup: %v", err)
}
fh := &FileHandle{
fh: FileHandleId(1),
inode: inode,
wfs: wfs,
entry: &LockedEntry{Entry: entry},
entryChunkGroup: chunkGroup,
}
wfs.fhMap.inode2fh[inode] = fh
wfs.fhMap.fh2inode[fh.fh] = inode
const iterations = 2000
var wg sync.WaitGroup
wg.Add(3)
// Async uploader: append chunks, reallocating the backing array.
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
fh.AddChunks([]*filer_pb.FileChunk{{FileId: "x", Offset: int64(i), Size: 1}})
}
}()
// SetAttr: mtime-only recomputes FileSize by iterating chunks; a shrinking
// size takes the truncate path that rewrites entry.Chunks under the lock.
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
in := &fuse.SetAttrIn{}
in.NodeId = inode
if i%2 == 0 {
in.Valid = fuse.FATTR_MTIME
in.Mtime = uint64(i)
} else {
in.Valid = fuse.FATTR_SIZE
in.Size = uint64(i % 8)
}
var out fuse.AttrOut
wfs.SetAttr(nil, in, &out)
}
}()
// GetAttr also computes FileSize by iterating chunks.
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
in := &fuse.GetAttrIn{}
in.NodeId = inode
var out fuse.AttrOut
wfs.GetAttr(nil, in, &out)
}
}()
wg.Wait()
}
// TestReadFromChunksRace guards the read path's chunk-slice access. The read
// path holds fh.entryLock (which excludes SetAttr) but not the LockedEntry lock
// the async uploader appends under, so readFromChunks used to compute FileSize
// and walk entry.Chunks while AddChunks reallocated the slice. Run under -race.
func TestReadFromChunksRace(t *testing.T) {
wfs := &WFS{
option: &Option{},
inodeToPath: NewInodeToPath(util.FullPath("/"), 0),
fhMap: NewFileHandleToInode(),
}
const inode = uint64(42)
fullPath := util.FullPath("/dir/sample.txt")
wfs.inodeToPath.Lookup(fullPath, 1, false, false, inode, true)
// FileSize 0 forces readFromChunks down the filer.FileSize(chunks) branch.
entry := &filer_pb.Entry{
Name: "sample.txt",
Attributes: &filer_pb.FuseAttributes{FileMode: 0644},
}
chunkGroup, err := filer.NewChunkGroup(nil, nil, nil, 1)
if err != nil {
t.Fatalf("NewChunkGroup: %v", err)
}
fh := &FileHandle{
fh: FileHandleId(1),
inode: inode,
wfs: wfs,
entry: &LockedEntry{Entry: entry},
entryChunkGroup: chunkGroup,
}
wfs.fhMap.inode2fh[inode] = fh
wfs.fhMap.fh2inode[fh.fh] = inode
const iterations = 2000
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
fh.AddChunks([]*filer_pb.FileChunk{{FileId: "x", Offset: int64(i), Size: 1}})
}
}()
// A read past EOF returns before touching the volume tier, but only after
// the racy size/chunk snapshot has run.
go func() {
defer wg.Done()
buff := make([]byte, 16)
for i := 0; i < iterations; i++ {
fh.readFromChunks(buff, 1<<62)
}
}()
wg.Wait()
}