From f438cc3544a96f87a11ef820b683a703ea5431f8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 22 Apr 2026 16:47:01 -0700 Subject: [PATCH] fix(volume_server): refuse ReceiveFile overwrite of mounted EC shard (#9184) (#9186) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * test(volume_server): reproduce #9184 ReceiveFile truncating a mounted shard ReceiveFile for an EC shard calls os.Create(filePath) which opens the path with O_TRUNC. When the shard is already mounted, the in-memory EcVolume holds a file descriptor against the same inode, so a second ReceiveFile call for the same (volume, shard) truncates the live shard file beneath the reader. Reproducer: generate and mount shard 0 for a populated volume, capture the on-disk size, then send a smaller payload for the same shard via ReceiveFile. The current handler accepts the overwrite and leaves the shard truncated in place; this test pins that behavior. When the fix lands the server should reject (or rename-then-swap) and this test must be inverted. * fix(volume_server): refuse ReceiveFile overwrite of mounted EC shard ReceiveFile used os.Create on EC shard paths, which opens with O_TRUNC and truncates in place. When an EC shard is already mounted, the in-memory EcVolume holds file descriptors against the same inodes, so the truncation corrupts the live shard beneath any ongoing read. On retries of an EC task this produced the "missing parts" class of errors in #9184. The fix rejects any ReceiveFile for an EC volume that currently has mounted shards. The caller must unmount before retrying — silent truncation is never an acceptable outcome. Non-EC writes and ReceiveFile for EC volumes that are not currently mounted on this server continue to work as before. Tests: - TestReceiveFileRejectsOverwriteOfMountedEcShard: mounts a shard, attempts an overwrite, asserts the error response and that the on-disk file and live reads are undisturbed. 
- TestReceiveFileAllowsEcShardWhenNoMount: pins the common-case contract that a first write to a target still succeeds. * fix(volume-rust): refuse ReceiveFile overwrite of mounted EC shard Mirror the Go-side change: reject receive_file for any EC volume that currently has mounted shards on this server. std::fs::File::create truncates in place and the in-memory EcVolume holds fds on the same inodes, so an overwrite would corrupt live readers. --- seaweed-volume/src/server/grpc_server.rs | 10 + .../grpc/ec_receive_truncates_mounted_test.go | 186 ++++++++++++++++++ weed/server/volume_grpc_copy.go | 10 + 3 files changed, 206 insertions(+) create mode 100644 test/volume_server/grpc/ec_receive_truncates_mounted_test.go diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index c06ec5f6e..d152c9cb0 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1425,6 +1425,16 @@ impl VolumeServer for VolumeGrpcService { // Determine file path let path = if info.is_ec_volume { let store = self.state.store.read().unwrap(); + // std::fs::File::create truncates in place; a mounted + // EcVolume holds fds on the same inodes, so overwriting + // corrupts live readers. + if store.has_ec_volume(VolumeId(info.volume_id)) { + resp_error = Some(format!( + "ec volume {} is mounted; unmount before ReceiveFile", + info.volume_id + )); + break; + } // disk_id=0 means "unset" (protobuf default), so auto-select // mirrors VolumeEcShardsCopy: prefer a disk already holding // this volume's shards, then any HDD, then any disk. 
diff --git a/test/volume_server/grpc/ec_receive_truncates_mounted_test.go b/test/volume_server/grpc/ec_receive_truncates_mounted_test.go
new file mode 100644
index 000000000..d207ae0c2
--- /dev/null
+++ b/test/volume_server/grpc/ec_receive_truncates_mounted_test.go
@@ -0,0 +1,186 @@
+package volume_server_grpc_test
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
+	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+)
+// TestReceiveFileRejectsOverwriteOfMountedEcShard checks that ReceiveFile for an already-mounted EC shard is refused, and that the shard file size and shard reads are unaffected afterwards (issue #9184).
+func TestReceiveFileRejectsOverwriteOfMountedEcShard(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test in short mode")
+	}
+	// Start a volume cluster (matrix.P1 configuration) and dial its gRPC address.
+	clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
+	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
+	defer conn.Close()
+	// Allocate the volume that is EC-encoded below.
+	const volumeID = uint32(91841)
+	framework.AllocateVolume(t, grpcClient, volumeID, "")
+	// Upload a small blob over HTTP before encoding; the upload must answer 201 Created.
+	httpClient := framework.NewHTTPClient()
+	fid := framework.NewFileID(volumeID, 918401, 0x9184CAFE)
+	uploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid,
+		[]byte("ec-receive-overwrite-content-for-issue-9184-repro"))
+	_ = framework.ReadAllAndClose(t, uploadResp)
+	if uploadResp.StatusCode != http.StatusCreated {
+		t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
+	}
+	// All gRPC calls below share a single 30-second deadline.
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	// Generate the EC shards for the volume, then mount shard 0.
+	if _, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
+		VolumeId:   volumeID,
+		Collection: "",
+	}); err != nil {
+		t.Fatalf("VolumeEcShardsGenerate: %v", err)
+	}
+
+	if _, err := grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+		VolumeId:   volumeID,
+		Collection: "",
+		ShardIds:   []uint32{0},
+	}); err != nil {
+		t.Fatalf("VolumeEcShardsMount: %v", err)
+	}
+	// Baseline: on-disk size of the mounted shard, looked up at <BaseDir>/volume/<volumeID>.ec00; it must be non-empty.
+	dataDir := filepath.Join(clusterHarness.BaseDir(), "volume")
+	shardPath := filepath.Join(dataDir, fmt.Sprintf("%d.ec00", volumeID))
+	origInfo, err := os.Stat(shardPath)
+	if err != nil {
+		t.Fatalf("stat mounted shard %s: %v", shardPath, err)
+	}
+	origSize := origInfo.Size()
+	if origSize == 0 {
+		t.Fatalf("mounted shard %s unexpectedly empty", shardPath)
+	}
+	// Read one byte from the mounted shard to confirm it is readable before the overwrite attempt.
+	readStream, err := grpcClient.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
+		VolumeId: volumeID,
+		ShardId:  0,
+		Offset:   0,
+		Size:     1,
+	})
+	if err != nil {
+		t.Fatalf("VolumeEcShardRead (pre): %v", err)
+	}
+	if _, err := readStream.Recv(); err != nil {
+		t.Fatalf("VolumeEcShardRead Recv (pre): %v", err)
+	}
+	// Overwrite attempt: send only the ReceiveFile info header for the same volume and shard (FileSize = len(overwritePayload)); no file content is sent before closing.
+	overwritePayload := []byte("bug-9184-overwrite")
+	receiveStream, err := grpcClient.ReceiveFile(ctx)
+	if err != nil {
+		t.Fatalf("ReceiveFile stream create: %v", err)
+	}
+	if err = receiveStream.Send(&volume_server_pb.ReceiveFileRequest{
+		Data: &volume_server_pb.ReceiveFileRequest_Info{
+			Info: &volume_server_pb.ReceiveFileInfo{
+				VolumeId:   volumeID,
+				Ext:        ".ec00",
+				Collection: "",
+				IsEcVolume: true,
+				ShardId:    0,
+				FileSize:   uint64(len(overwritePayload)),
+			},
+		},
+	}); err != nil {
+		t.Fatalf("ReceiveFile send info: %v", err)
+	}
+	resp, err := receiveStream.CloseAndRecv()
+	if err != nil { // a stream-level error is also accepted as a rejection; it is only logged
+		t.Logf("ReceiveFile rejected at stream level: %v", err)
+	} else {
+		if resp.GetError() == "" {
+			t.Fatalf("expected ReceiveFile to reject overwrite of mounted shard, got success: %+v", resp)
+		}
+		if !strings.Contains(resp.GetError(), "mounted") {
+			t.Fatalf("expected error to mention mounted; got: %s", resp.GetError())
+		}
+	}
+	// The shard file must still have its original size.
+	afterInfo, err := os.Stat(shardPath)
+	if err != nil {
+		t.Fatalf("stat shard after rejected overwrite: %v", err)
+	}
+	if afterInfo.Size() != origSize {
+		t.Fatalf("shard %s was modified despite rejection: size was %d, now %d",
+			shardPath, origSize, afterInfo.Size())
+	}
+	// The mounted shard must still serve reads after the rejected overwrite.
+	postStream, err := grpcClient.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
+		VolumeId: volumeID,
+		ShardId:  0,
+		Offset:   0,
+		Size:     1,
+	})
+	if err != nil {
+		t.Fatalf("VolumeEcShardRead (post): %v", err)
+	}
+	if _, err := postStream.Recv(); err != nil {
+		t.Fatalf("VolumeEcShardRead Recv (post): %v", err)
+	}
+
+	t.Logf("ReceiveFile correctly refused overwrite; mounted shard intact at %d bytes", afterInfo.Size())
+}
+// TestReceiveFileAllowsEcShardWhenNoMount checks that ReceiveFile still accepts an EC shard for a volume with no mounted shards and reports the full payload length as bytes written.
+func TestReceiveFileAllowsEcShardWhenNoMount(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test in short mode")
+	}
+	// Start a volume cluster (matrix.P1 configuration) and dial its gRPC address.
+	clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
+	conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
+	defer conn.Close()
+	// The whole ReceiveFile exchange runs under a 10-second deadline.
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	// This volume ID is never allocated, EC-generated, or mounted anywhere in this test.
+	const volumeID = uint32(91843)
+	const collection = "ec-receive-no-mount"
+	payload := []byte("ok-to-receive-not-mounted")
+	// Send the info header, then the payload in a single content message, then close and read the response.
+	stream, err := grpcClient.ReceiveFile(ctx)
+	if err != nil {
+		t.Fatalf("ReceiveFile stream create: %v", err)
+	}
+	if err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+		Data: &volume_server_pb.ReceiveFileRequest_Info{
+			Info: &volume_server_pb.ReceiveFileInfo{
+				VolumeId:   volumeID,
+				Ext:        ".ec00",
+				Collection: collection,
+				IsEcVolume: true,
+				ShardId:    0,
+				FileSize:   uint64(len(payload)),
+			},
+		},
+	}); err != nil {
+		t.Fatalf("ReceiveFile send info: %v", err)
+	}
+	if err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+		Data: &volume_server_pb.ReceiveFileRequest_FileContent{FileContent: payload},
+	}); err != nil {
+		t.Fatalf("ReceiveFile send content: %v", err)
+	}
+	resp, err := stream.CloseAndRecv()
+	if err != nil {
+		t.Fatalf("ReceiveFile close: %v", err)
+	}
+	if resp.GetError() != "" {
+		t.Fatalf("expected success on unmounted volume, got error: %s", resp.GetError())
+	}
+	if resp.GetBytesWritten() != uint64(len(payload)) {
+		t.Fatalf("bytes_written mismatch: got %d want %d", resp.GetBytesWritten(), len(payload))
+	}
+}
diff
--git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 508bf26f3..e1aa7812c 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -561,6 +561,16 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive fileInfo.VolumeId, fileInfo.Ext, fileInfo.Collection, fileInfo.ShardId, fileInfo.FileSize) if fileInfo.IsEcVolume { + // os.Create below truncates in place; a mounted EcVolume + // holds fds on the same inodes, so overwriting corrupts + // live readers. + if _, mounted := vs.store.FindEcVolume(needle.VolumeId(fileInfo.VolumeId)); mounted { + glog.Errorf("ReceiveFile: ec volume %d is mounted; refusing overwrite for %s", fileInfo.VolumeId, fileInfo.Ext) + return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ + Error: fmt.Sprintf("ec volume %d is mounted; unmount before ReceiveFile", fileInfo.VolumeId), + }) + } + // disk_id=0 means "unset" (protobuf default), so auto-select // mirrors VolumeEcShardsCopy: prefer a disk already holding // this volume's shards, then any HDD, then any disk.