fix(volume_server): refuse ReceiveFile overwrite of mounted EC shard (#9184) (#9186)

* test(volume_server): reproduce #9184 ReceiveFile truncating a mounted shard

ReceiveFile for an EC shard calls os.Create(filePath) which opens the
path with O_TRUNC. When the shard is already mounted, the in-memory
EcVolume holds a file descriptor against the same inode, so a second
ReceiveFile call for the same (volume, shard) truncates the live shard
file beneath the reader.

Reproducer: generate and mount shard 0 for a populated volume, capture
the on-disk size, then send a smaller payload for the same shard via
ReceiveFile. The current handler accepts the overwrite and leaves the
shard truncated in place; this test pins that behavior. When the fix
lands the server should reject (or rename-then-swap) and this test
must be inverted.

* fix(volume_server): refuse ReceiveFile overwrite of mounted EC shard

ReceiveFile used os.Create on EC shard paths, which opens with
O_TRUNC and truncates in place. When an EC shard is already
mounted, the in-memory EcVolume holds file descriptors against the
same inodes, so the truncation corrupts the live shard beneath any
ongoing read. On retries of an EC task this produced the "missing
parts" class of errors in #9184.

The fix rejects any ReceiveFile for an EC volume that currently
has mounted shards. The caller must unmount before retrying —
silent truncation is never an acceptable outcome. Non-EC writes and
ReceiveFile for volumes that have never been mounted on this server
continue to work as before.

Tests:
- TestReceiveFileRejectsOverwriteOfMountedEcShard: mounts a shard,
  attempts an overwrite, asserts the error response and that the
  on-disk file and live reads are undisturbed.
- TestReceiveFileAllowsEcShardWhenNoMount: pins the common-case
  contract that a first write to a target still succeeds.

* fix(volume-rust): refuse ReceiveFile overwrite of mounted EC shard

Mirror the Go-side change: reject receive_file for any EC volume that
currently has mounted shards on this server. std::fs::File::create
truncates in place and the in-memory EcVolume holds fds on the same
inodes, so an overwrite would corrupt live readers.
This commit is contained in:
Chris Lu
2026-04-22 16:47:01 -07:00
committed by GitHub
parent 628363c4a6
commit f438cc3544
3 changed files with 206 additions and 0 deletions

View File

@@ -1425,6 +1425,16 @@ impl VolumeServer for VolumeGrpcService {
// Determine file path
let path = if info.is_ec_volume {
let store = self.state.store.read().unwrap();
// std::fs::File::create truncates in place; a mounted
// EcVolume holds fds on the same inodes, so overwriting
// corrupts live readers.
if store.has_ec_volume(VolumeId(info.volume_id)) {
resp_error = Some(format!(
"ec volume {} is mounted; unmount before ReceiveFile",
info.volume_id
));
break;
}
// disk_id=0 means "unset" (protobuf default), so auto-select
// mirrors VolumeEcShardsCopy: prefer a disk already holding
// this volume's shards, then any HDD, then any disk.

View File

@@ -0,0 +1,186 @@
package volume_server_grpc_test
import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
func TestReceiveFileRejectsOverwriteOfMountedEcShard(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
const volumeID = uint32(91841)
framework.AllocateVolume(t, grpcClient, volumeID, "")
httpClient := framework.NewHTTPClient()
fid := framework.NewFileID(volumeID, 918401, 0x9184CAFE)
uploadResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid,
[]byte("ec-receive-overwrite-content-for-issue-9184-repro"))
_ = framework.ReadAllAndClose(t, uploadResp)
if uploadResp.StatusCode != http.StatusCreated {
t.Fatalf("upload expected 201, got %d", uploadResp.StatusCode)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if _, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: volumeID,
Collection: "",
}); err != nil {
t.Fatalf("VolumeEcShardsGenerate: %v", err)
}
if _, err := grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: volumeID,
Collection: "",
ShardIds: []uint32{0},
}); err != nil {
t.Fatalf("VolumeEcShardsMount: %v", err)
}
dataDir := filepath.Join(clusterHarness.BaseDir(), "volume")
shardPath := filepath.Join(dataDir, fmt.Sprintf("%d.ec00", volumeID))
origInfo, err := os.Stat(shardPath)
if err != nil {
t.Fatalf("stat mounted shard %s: %v", shardPath, err)
}
origSize := origInfo.Size()
if origSize == 0 {
t.Fatalf("mounted shard %s unexpectedly empty", shardPath)
}
readStream, err := grpcClient.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
VolumeId: volumeID,
ShardId: 0,
Offset: 0,
Size: 1,
})
if err != nil {
t.Fatalf("VolumeEcShardRead (pre): %v", err)
}
if _, err := readStream.Recv(); err != nil {
t.Fatalf("VolumeEcShardRead Recv (pre): %v", err)
}
overwritePayload := []byte("bug-9184-overwrite")
receiveStream, err := grpcClient.ReceiveFile(ctx)
if err != nil {
t.Fatalf("ReceiveFile stream create: %v", err)
}
if err = receiveStream.Send(&volume_server_pb.ReceiveFileRequest{
Data: &volume_server_pb.ReceiveFileRequest_Info{
Info: &volume_server_pb.ReceiveFileInfo{
VolumeId: volumeID,
Ext: ".ec00",
Collection: "",
IsEcVolume: true,
ShardId: 0,
FileSize: uint64(len(overwritePayload)),
},
},
}); err != nil {
t.Fatalf("ReceiveFile send info: %v", err)
}
resp, err := receiveStream.CloseAndRecv()
if err != nil {
t.Logf("ReceiveFile rejected at stream level: %v", err)
} else {
if resp.GetError() == "" {
t.Fatalf("expected ReceiveFile to reject overwrite of mounted shard, got success: %+v", resp)
}
if !strings.Contains(resp.GetError(), "mounted") {
t.Fatalf("expected error to mention mounted; got: %s", resp.GetError())
}
}
afterInfo, err := os.Stat(shardPath)
if err != nil {
t.Fatalf("stat shard after rejected overwrite: %v", err)
}
if afterInfo.Size() != origSize {
t.Fatalf("shard %s was modified despite rejection: size was %d, now %d",
shardPath, origSize, afterInfo.Size())
}
postStream, err := grpcClient.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
VolumeId: volumeID,
ShardId: 0,
Offset: 0,
Size: 1,
})
if err != nil {
t.Fatalf("VolumeEcShardRead (post): %v", err)
}
if _, err := postStream.Recv(); err != nil {
t.Fatalf("VolumeEcShardRead Recv (post): %v", err)
}
t.Logf("ReceiveFile correctly refused overwrite; mounted shard intact at %d bytes", afterInfo.Size())
}
func TestReceiveFileAllowsEcShardWhenNoMount(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
const volumeID = uint32(91843)
const collection = "ec-receive-no-mount"
payload := []byte("ok-to-receive-not-mounted")
stream, err := grpcClient.ReceiveFile(ctx)
if err != nil {
t.Fatalf("ReceiveFile stream create: %v", err)
}
if err = stream.Send(&volume_server_pb.ReceiveFileRequest{
Data: &volume_server_pb.ReceiveFileRequest_Info{
Info: &volume_server_pb.ReceiveFileInfo{
VolumeId: volumeID,
Ext: ".ec00",
Collection: collection,
IsEcVolume: true,
ShardId: 0,
FileSize: uint64(len(payload)),
},
},
}); err != nil {
t.Fatalf("ReceiveFile send info: %v", err)
}
if err = stream.Send(&volume_server_pb.ReceiveFileRequest{
Data: &volume_server_pb.ReceiveFileRequest_FileContent{FileContent: payload},
}); err != nil {
t.Fatalf("ReceiveFile send content: %v", err)
}
resp, err := stream.CloseAndRecv()
if err != nil {
t.Fatalf("ReceiveFile close: %v", err)
}
if resp.GetError() != "" {
t.Fatalf("expected success on unmounted volume, got error: %s", resp.GetError())
}
if resp.GetBytesWritten() != uint64(len(payload)) {
t.Fatalf("bytes_written mismatch: got %d want %d", resp.GetBytesWritten(), len(payload))
}
}

View File

@@ -561,6 +561,16 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive
fileInfo.VolumeId, fileInfo.Ext, fileInfo.Collection, fileInfo.ShardId, fileInfo.FileSize)
if fileInfo.IsEcVolume {
// os.Create below truncates in place; a mounted EcVolume
// holds fds on the same inodes, so overwriting corrupts
// live readers.
if _, mounted := vs.store.FindEcVolume(needle.VolumeId(fileInfo.VolumeId)); mounted {
glog.Errorf("ReceiveFile: ec volume %d is mounted; refusing overwrite for %s", fileInfo.VolumeId, fileInfo.Ext)
return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{
Error: fmt.Sprintf("ec volume %d is mounted; unmount before ReceiveFile", fileInfo.VolumeId),
})
}
// disk_id=0 means "unset" (protobuf default), so auto-select
// mirrors VolumeEcShardsCopy: prefer a disk already holding
// this volume's shards, then any HDD, then any disk.