diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index df458524d..86b637a4a 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -35,7 +35,12 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte entry := fh.GetEntry() - if entry.IsInRemoteOnly() { + // IsInRemoteOnly inspects entry.Chunks, so take the LockedEntry lock the + // async uploader appends under. + entry.RLock() + remoteOnly := entry.Entry.IsInRemoteOnly() + entry.RUnlock() + if remoteOnly { glog.V(4).Infof("download remote entry %s", fileFullPath) err := fh.downloadRemoteEntry(entry) if err != nil { @@ -44,10 +49,21 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte } } - fileSize := int64(entry.Attributes.FileSize) + // Snapshot size, inline content, and the chunk list under the LockedEntry + // lock. Async upload workers append chunks under this lock (AddChunks), so + // reading entry.Chunks / FileSize without it races with the slice + // reallocation and can crash in filer.TotalSize. The captured slice headers + // stay valid afterwards: append never mutates the old backing array, and + // truncate is excluded by the fh.entryLock held for this whole read. + entry.RLock() + pbEntry := entry.Entry + fileSize := int64(pbEntry.Attributes.FileSize) if fileSize == 0 { - fileSize = int64(filer.FileSize(entry.GetEntry())) + fileSize = int64(filer.FileSize(pbEntry)) } + content := pbEntry.Content + chunks := pbEntry.Chunks + entry.RUnlock() if fileSize == 0 { glog.V(1).Infof("empty fh %v", fileFullPath) @@ -59,15 +75,15 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte return 0, 0, io.EOF } - if offset < int64(len(entry.Content)) { - totalRead := copy(buff, entry.Content[offset:]) + if offset < int64(len(content)) { + totalRead := copy(buff, content[offset:]) glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), 0, nil } // Try RDMA acceleration first if available if fh.wfs.rdmaClient != nil && fh.wfs.option.RdmaEnabled { - totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, entry) + totalRead, ts, err := fh.tryRDMARead(ctx, fileSize, buff, offset, chunks) if err == nil { glog.V(4).Infof("RDMA read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), ts, nil @@ -79,7 +95,7 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte // Any failure falls through transparently. See design-weed-mount- // peer-chunk-sharing.md ยง4.3. if fh.wfs.option.PeerEnabled && fh.wfs.peerGrpcServer != nil { - totalRead, ts, err := fh.tryPeerRead(ctx, fileSize, buff, offset, entry) + totalRead, ts, err := fh.tryPeerRead(ctx, fileSize, buff, offset, chunks) if err == nil { glog.V(4).Infof("peer read successful for %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), ts, nil @@ -104,13 +120,13 @@ func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte return int64(totalRead), ts, err } -// tryRDMARead attempts to read file data using RDMA acceleration -func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) { +// tryRDMARead attempts to read file data using RDMA acceleration. chunks is a +// snapshot captured under the LockedEntry lock by the caller. +func (fh *FileHandle) tryRDMARead(ctx context.Context, fileSize int64, buff []byte, offset int64, chunks []*filer_pb.FileChunk) (int64, int64, error) { // For now, we'll try to read the chunks directly using RDMA // This is a simplified approach - in a full implementation, we'd need to // handle chunk boundaries, multiple chunks, etc. - chunks := entry.GetEntry().Chunks if len(chunks) == 0 { return 0, 0, fmt.Errorf("no chunks available for RDMA read") } diff --git a/weed/mount/peer_fetcher.go b/weed/mount/peer_fetcher.go index 6354283bc..c04b69410 100644 --- a/weed/mount/peer_fetcher.go +++ b/weed/mount/peer_fetcher.go @@ -53,7 +53,8 @@ const maxPeerFetchChunkBytes = 64 * 1024 * 1024 // end-to-end against FileChunk.ETag. // 4. On success, populate chunk_cache and enqueue an announce so // other mounts can discover us as a new holder. -func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []byte, offset int64, entry *LockedEntry) (int64, int64, error) { +// chunks is a snapshot captured under the LockedEntry lock by the caller. +func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []byte, offset int64, chunks []*filer_pb.FileChunk) (int64, int64, error) { if fh.wfs.peerRegistrar == nil || fh.wfs.peerConnPool == nil { return 0, 0, fmt.Errorf("peer sharing not configured") } @@ -63,7 +64,7 @@ func (fh *FileHandle) tryPeerRead(ctx context.Context, fileSize int64, buff []by if readStop > fileSize { readStop = fileSize } - dataChunks, _, err := filer.ResolveChunkManifest(ctx, fh.wfs.LookupFn(), entry.GetEntry().Chunks, offset, readStop) + dataChunks, _, err := filer.ResolveChunkManifest(ctx, fh.wfs.LookupFn(), chunks, offset, readStop) if err != nil { return 0, 0, fmt.Errorf("resolve manifest: %w", err) } diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index fc0961fa1..9882b256c 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -23,10 +23,21 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse } inode := input.NodeId - path, _, entry, status := wfs.maybeReadEntry(inode) + path, fh, entry, status := wfs.maybeReadEntry(inode) if status == fuse.OK { out.AttrValid = wfs.attrValidSec + // When an open handle owns the entry, async upload workers append + // chunks under the LockedEntry lock; take it for reading so FileSize + // does not iterate the chunk slice mid-reallocation. Re-read under the + // lock in case SetEntry swapped the pointer since maybeReadEntry. + if fh != nil { + fh.entry.RLock() + entry = fh.entry.Entry + } wfs.setAttrByPbEntry(&out.Attr, inode, entry, true) + if fh != nil { + fh.entry.RUnlock() + } wfs.applyInMemoryAtime(&out.Attr, inode) if entry.IsDirectory { wfs.applyInMemoryDirMtime(&out.Attr, inode) @@ -40,7 +51,9 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse out.AttrValid = wfs.attrValidSec // Use shared lock to prevent race with Write operations fhActiveLock := wfs.fhLockTable.AcquireLock("GetAttr", fh.fh, util.SharedLock) - wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.GetEntry(), true) + fh.entry.RLock() + wfs.setAttrByPbEntry(&out.Attr, inode, fh.entry.Entry, true) + fh.entry.RUnlock() wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock) wfs.applyInMemoryAtime(&out.Attr, inode) out.Nlink = 0 @@ -65,6 +78,15 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse if fh != nil { fh.entryLock.Lock() defer fh.entryLock.Unlock() + // entry is the handle's shared LockedEntry.Entry. Async upload workers + // mutate its Chunks slice under the LockedEntry lock (AddChunks); hold + // that same lock so the truncate and FileSize reads below don't tear + // against a concurrent append. Re-read under the lock in case SetEntry + // swapped the pointer since maybeReadEntry, so we don't mutate an + // orphaned entry and lose the update. + fh.entry.Lock() + defer fh.entry.Unlock() + entry = fh.entry.Entry } wormEnforced, wormEnabled := wfs.wormEnforcedForEntry(path, entry) diff --git a/weed/mount/weedfs_attr_race_test.go b/weed/mount/weedfs_attr_race_test.go new file mode 100644 index 000000000..0fd386121 --- /dev/null +++ b/weed/mount/weedfs_attr_race_test.go @@ -0,0 +1,152 @@ +package mount + +import ( + "sync" + "testing" + + "github.com/seaweedfs/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestAttrChunkRace guards the locking around an open handle's chunk slice. +// +// With writebackCache, async upload workers append chunks to an open file +// handle's shared entry under the LockedEntry lock (FileHandle.AddChunks), +// while metadata ops compute the file size by iterating entry.Chunks. SetAttr +// and GetAttr used to read that slice without the LockedEntry lock, so a +// concurrent append that reallocated the backing array produced a torn slice +// read and a nil pointer dereference in filer.TotalSize. Run under -race. +func TestAttrChunkRace(t *testing.T) { + wfs := &WFS{ + option: &Option{}, + inodeToPath: NewInodeToPath(util.FullPath("/"), 0), + fhMap: NewFileHandleToInode(), + openMtimeCache: make(map[uint64][2]int64, 8), + } + + const inode = uint64(42) + fullPath := util.FullPath("/dir/sample.txt") + wfs.inodeToPath.Lookup(fullPath, 1, false, false, inode, true) + + entry := &filer_pb.Entry{ + Name: "sample.txt", + Attributes: &filer_pb.FuseAttributes{FileMode: 0644}, + } + chunkGroup, err := filer.NewChunkGroup(nil, nil, nil, 1) + if err != nil { + t.Fatalf("NewChunkGroup: %v", err) + } + fh := &FileHandle{ + fh: FileHandleId(1), + inode: inode, + wfs: wfs, + entry: &LockedEntry{Entry: entry}, + entryChunkGroup: chunkGroup, + } + wfs.fhMap.inode2fh[inode] = fh + wfs.fhMap.fh2inode[fh.fh] = inode + + const iterations = 2000 + var wg sync.WaitGroup + wg.Add(3) + + // Async uploader: append chunks, reallocating the backing array. + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + fh.AddChunks([]*filer_pb.FileChunk{{FileId: "x", Offset: int64(i), Size: 1}}) + } + }() + + // SetAttr: mtime-only recomputes FileSize by iterating chunks; a shrinking + // size takes the truncate path that rewrites entry.Chunks under the lock. + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + in := &fuse.SetAttrIn{} + in.NodeId = inode + if i%2 == 0 { + in.Valid = fuse.FATTR_MTIME + in.Mtime = uint64(i) + } else { + in.Valid = fuse.FATTR_SIZE + in.Size = uint64(i % 8) + } + var out fuse.AttrOut + wfs.SetAttr(nil, in, &out) + } + }() + + // GetAttr also computes FileSize by iterating chunks. + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + in := &fuse.GetAttrIn{} + in.NodeId = inode + var out fuse.AttrOut + wfs.GetAttr(nil, in, &out) + } + }() + + wg.Wait() +} + +// TestReadFromChunksRace guards the read path's chunk-slice access. The read +// path holds fh.entryLock (which excludes SetAttr) but not the LockedEntry lock +// the async uploader appends under, so readFromChunks used to compute FileSize +// and walk entry.Chunks while AddChunks reallocated the slice. Run under -race. +func TestReadFromChunksRace(t *testing.T) { + wfs := &WFS{ + option: &Option{}, + inodeToPath: NewInodeToPath(util.FullPath("/"), 0), + fhMap: NewFileHandleToInode(), + } + + const inode = uint64(42) + fullPath := util.FullPath("/dir/sample.txt") + wfs.inodeToPath.Lookup(fullPath, 1, false, false, inode, true) + + // FileSize 0 forces readFromChunks down the filer.FileSize(chunks) branch. + entry := &filer_pb.Entry{ + Name: "sample.txt", + Attributes: &filer_pb.FuseAttributes{FileMode: 0644}, + } + chunkGroup, err := filer.NewChunkGroup(nil, nil, nil, 1) + if err != nil { + t.Fatalf("NewChunkGroup: %v", err) + } + fh := &FileHandle{ + fh: FileHandleId(1), + inode: inode, + wfs: wfs, + entry: &LockedEntry{Entry: entry}, + entryChunkGroup: chunkGroup, + } + wfs.fhMap.inode2fh[inode] = fh + wfs.fhMap.fh2inode[fh.fh] = inode + + const iterations = 2000 + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + fh.AddChunks([]*filer_pb.FileChunk{{FileId: "x", Offset: int64(i), Size: 1}}) + } + }() + + // A read past EOF returns before touching the volume tier, but only after + // the racy size/chunk snapshot has run. + go func() { + defer wg.Done() + buff := make([]byte, 16) + for i := 0; i < iterations; i++ { + fh.readFromChunks(buff, 1<<62) + } + }() + + wg.Wait() +}