diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs index 10f84af13..4e84ddec8 100644 --- a/seaweed-volume/src/server/grpc_server.rs +++ b/seaweed-volume/src/server/grpc_server.rs @@ -1511,9 +1511,11 @@ impl VolumeServer for VolumeGrpcService { // EcVolume holds fds on the same inodes, so overwriting // corrupts live readers. if store.has_ec_volume(VolumeId(info.volume_id)) { + let mounted_disks = + store.find_ec_volume_disk_ids(VolumeId(info.volume_id)); resp_error = Some(format!( - "ec volume {} is mounted; unmount before ReceiveFile", - info.volume_id + "ec volume {} is mounted on disk_ids:{:?}; unmount before ReceiveFile", + info.volume_id, mounted_disks )); break; } diff --git a/seaweed-volume/src/server/heartbeat.rs b/seaweed-volume/src/server/heartbeat.rs index 4809393cf..2f5272c5d 100644 --- a/seaweed-volume/src/server/heartbeat.rs +++ b/seaweed-volume/src/server/heartbeat.rs @@ -344,6 +344,12 @@ fn diff_ec_shard_delta_messages( if !current.contains_key(key) { let mut deleted = message.clone(); deleted.shard_sizes = vec![0]; + tracing::info!( + volume_id = deleted.id, + disk_id = deleted.disk_id, + ec_index_bits = deleted.ec_index_bits, + "deletes ec shards" + ); deleted_ec_shards.push(deleted); } } diff --git a/seaweed-volume/src/storage/store.rs b/seaweed-volume/src/storage/store.rs index baf5a42bd..52daf00f6 100644 --- a/seaweed-volume/src/storage/store.rs +++ b/seaweed-volume/src/storage/store.rs @@ -699,10 +699,20 @@ impl Store { // Walk all locations rather than stopping at the first with the // vid — split-disk reconciled volumes can have the same vid on // multiple disks, with the target shard on any of them. - for loc in &mut self.locations { - if loc.has_ec_volume(vid) { - loc.unmount_ec_shards(vid, &[shard_id]); + for disk_id in 0..self.locations.len() { + let has_shard = self.locations[disk_id] + .find_ec_volume(vid) + .is_some_and(|ec_vol| ec_vol.has_shard(shard_id as u8)); + if !has_shard { + continue; } + tracing::info!( + volume_id = vid.0, + shard_id, + disk_id, + "UnmountEcShards" + ); + self.locations[disk_id].unmount_ec_shards(vid, &[shard_id]); } // Go returns nil if shard not found (no error) Ok(()) @@ -733,6 +743,21 @@ impl Store { self.locations.iter().any(|loc| loc.has_ec_volume(vid)) } + /// Returns every disk_id on this store that has an EcVolume entry + /// for `vid`. Useful for diagnostic logging when a single + /// `has_ec_volume` hit hides which disk is actually holding the + /// mount (e.g., the ReceiveFile mounted-volume guard). + /// Mirrors Go's `Store.FindEcVolumeDiskIds`. + pub fn find_ec_volume_disk_ids(&self, vid: VolumeId) -> Vec { + let mut ids = Vec::new(); + for (idx, loc) in self.locations.iter().enumerate() { + if loc.has_ec_volume(vid) { + ids.push(idx as u32); + } + } + ids + } + /// Returns the index of the disk location that has `(vid, shard_id)` /// mounted, if any. Mirrors Go's `Store.findEcShard` and is the /// right primitive for read/unmount/delete operations on a single diff --git a/test/plugin_workers/fake_volume_server.go b/test/plugin_workers/fake_volume_server.go index e38aa77a1..6d3ce469f 100644 --- a/test/plugin_workers/fake_volume_server.go +++ b/test/plugin_workers/fake_volume_server.go @@ -292,6 +292,20 @@ func (v *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_serv return &volume_server_pb.VolumeEcShardsMountResponse{}, nil } +// VolumeEcShardsUnmount is a no-op stub: the worker's pre-distribute +// cleanup calls it against every destination, and the fake server has no +// mounted state to clear. +func (v *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) { + return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil +} + +// VolumeEcShardsDelete is a no-op stub paired with VolumeEcShardsUnmount +// above; the fake server doesn't persist shard files beyond what +// ReceiveFile wrote, so there's nothing to remove. +func (v *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { + return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil +} + func (v *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) { if req == nil { return nil, fmt.Errorf("VolumeEcShardsInfo request is nil") diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index f48fc51f7..c363f823c 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -300,7 +300,7 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp }, } si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(&ecShardMessage) - glog.V(0).Infof("volume server %s:%d deletes ec shards from %d [%s]", vs.store.Ip, vs.store.Port, ecShardMessage.Id, si.String()) + glog.V(0).Infof("volume server %s:%d deletes ec shards from %d disk_id:%d [%s]", vs.store.Ip, vs.store.Port, ecShardMessage.Id, ecShardMessage.DiskId, si.String()) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 4385516ea..f8cd66e24 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -567,9 +567,10 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive // holds fds on the same inodes, so overwriting corrupts // live readers. if _, mounted := vs.store.FindEcVolume(needle.VolumeId(fileInfo.VolumeId)); mounted { - glog.Errorf("ReceiveFile: ec volume %d is mounted; refusing overwrite for %s", fileInfo.VolumeId, fileInfo.Ext) + mountedDisks := vs.store.FindEcVolumeDiskIds(needle.VolumeId(fileInfo.VolumeId)) + glog.Errorf("ReceiveFile: ec volume %d is mounted on disk_ids:%v; refusing overwrite for %s", fileInfo.VolumeId, mountedDisks, fileInfo.Ext) return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{ - Error: fmt.Sprintf("ec volume %d is mounted; unmount before ReceiveFile", fileInfo.VolumeId), + Error: fmt.Sprintf("ec volume %d is mounted on disk_ids:%v; unmount before ReceiveFile", fileInfo.VolumeId, mountedDisks), }) } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 389e33207..4186fb5e1 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -327,9 +327,9 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds) - for _, location := range vs.store.Locations { + for diskId, location := range vs.store.Locations { if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil { - glog.Errorf("deleteEcShards from %s %s.%v: %v", location.Directory, bName, req.ShardIds, err) + glog.Errorf("deleteEcShards from disk_id:%d %s %s.%v: %v", diskId, location.Directory, bName, req.ShardIds, err) return nil, err } } diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 199a8b7d6..cef80faf9 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -237,12 +237,12 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar location := s.Locations[diskId] if deleted := location.UnloadEcShard(vid, shardId); deleted { - glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId) + glog.V(0).Infof("UnmountEcShards %d.%d disk_id:%d", vid, shardId, diskId) s.DeletedEcShardsChan <- message return nil } - return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId) + return fmt.Errorf("UnmountEcShards %d.%d not found on disk %d", vid, shardId, diskId) } func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (diskId uint32, shard *erasure_coding.EcVolumeShard, found bool) { @@ -269,6 +269,20 @@ func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, boo return nil, false } +// FindEcVolumeDiskIds returns every disk_id on this store that has an +// EcVolume entry for the given volume. Useful for diagnostic logging +// when a single FindEcVolume hit hides which disk is actually holding +// the mount (e.g., the ReceiveFile mounted-volume guard). +func (s *Store) FindEcVolumeDiskIds(vid needle.VolumeId) []uint32 { + var ids []uint32 + for diskId, location := range s.Locations { + if _, found := location.FindEcVolume(vid); found { + ids = append(ids, uint32(diskId)) + } + } + return ids +} + // shardFiles is a list of shard files, which is used to return the shard locations func (s *Store) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) { for _, location := range s.Locations { diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index fc735d6a3..5c80e6712 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -691,54 +691,49 @@ func (t *ErasureCodingTask) getReplicas() []string { return replicas } -// cleanupStaleEcShards unmounts and deletes partial EC shards still mounted -// on destinations from a previous failed encode. Safe by ordering: runs -// after the source .dat is in the worker's workdir and a full local shard -// set is generated. Per-destination errors are aggregated, not short-circuited. +// cleanupStaleEcShards unmounts and deletes any EC shards still mounted on +// destinations from a previous failed encode of this volume. Targets every +// node we plan to write to (t.targets) plus every node detection saw EC +// shards on (t.sources with ShardIds set), and issues the cleanup over the +// full shard range so a stale topology snapshot — or shards landed by a +// prior attempt that haven't heartbeated yet — cannot leave the +// mounted-volume guard tripped during distributeEcShards. Safe by ordering: +// runs after the source .dat is in the worker's workdir and a full local +// shard set is generated. Per-destination errors are aggregated, not +// short-circuited. func (t *ErasureCodingTask) cleanupStaleEcShards(ctx context.Context) error { - if len(t.sources) == 0 { - return nil - } - - // Union shard ids per destination node — volume-server cleanup walks - // every DiskLocation, so per-disk source rows collapse to one RPC. - perNode := make(map[string]map[uint32]struct{}) + nodes := make(map[string]struct{}) for _, source := range t.sources { - if source == nil || len(source.ShardIds) == 0 { + if source == nil || source.Node == "" || len(source.ShardIds) == 0 { continue } - shardSet, ok := perNode[source.Node] - if !ok { - shardSet = make(map[uint32]struct{}) - perNode[source.Node] = shardSet - } - for _, shardID := range source.ShardIds { - shardSet[shardID] = struct{}{} - } + nodes[source.Node] = struct{}{} } - if len(perNode) == 0 { + for _, target := range t.targets { + if target == nil || target.Node == "" { + continue + } + nodes[target.Node] = struct{}{} + } + if len(nodes) == 0 { return nil } - var cleanupErrors []string - for node, shardSet := range perNode { - shardIds := make([]uint32, 0, len(shardSet)) - for id := range shardSet { - shardIds = append(shardIds, id) - } + allShards := fullShardIdRange(t.dataShards, t.parityShards) + var cleanupErrors []string + for node := range nodes { t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, "destination": node, - "shard_ids": shardIds, + "shard_ids": allShards, }).Info("Clearing stale EC shards on destination before re-distribute") - if err := unmountAndDeleteEcShards(ctx, t.grpcDialOption, node, t.volumeID, t.collection, shardIds); err != nil { + if err := unmountAndDeleteEcShards(ctx, t.grpcDialOption, node, t.volumeID, t.collection, allShards); err != nil { cleanupErrors = append(cleanupErrors, fmt.Sprintf("%s: %v", node, err)) t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, "destination": node, - "shard_ids": shardIds, "error": err.Error(), }).Error("Failed to clear stale EC shards on destination") } @@ -751,6 +746,24 @@ func (t *ErasureCodingTask) cleanupStaleEcShards(ctx context.Context) error { return nil } +// fullShardIdRange builds [0..total-1] for unmount/delete RPCs. Falls back +// to erasure_coding.TotalShardsCount when the task's ratio is unset (early +// callers, tests); the helper never returns an empty slice. +func fullShardIdRange(dataShards, parityShards int32) []uint32 { + total := int(dataShards + parityShards) + if total <= 0 { + total = erasure_coding.TotalShardsCount + } + if total > erasure_coding.MaxShardCount { + total = erasure_coding.MaxShardCount + } + ids := make([]uint32, total) + for i := range ids { + ids[i] = uint32(i) + } + return ids +} + // unmountAndDeleteEcShards unmounts then deletes the named shards on one // destination. Unmount must precede delete (delete requires the shard be // unmounted); both RPCs are idempotent against missing shards. diff --git a/weed/worker/tasks/erasure_coding/ec_task_stale_shard_cleanup_test.go b/weed/worker/tasks/erasure_coding/ec_task_stale_shard_cleanup_test.go index 1e7f1c683..96abbbfe8 100644 --- a/weed/worker/tasks/erasure_coding/ec_task_stale_shard_cleanup_test.go +++ b/weed/worker/tasks/erasure_coding/ec_task_stale_shard_cleanup_test.go @@ -106,6 +106,144 @@ func TestCleanupStaleEcShardsBeforeDistribute(t *testing.T) { "EC-shard sources must not appear in replica delete list") } +// The destination has more shards mounted than t.sources lists — simulates +// detection-time topology missing shards (e.g., a prior attempt's mount +// hadn't heartbeated yet, or different shards live on a sibling disk). The +// cleanup must clear FindEcVolume regardless, by issuing unmount/delete +// over the full shard range, so the next ReceiveFile lands. +func TestCleanupStaleEcShardsClearsShardsBeyondSources(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const ( + volumeID = uint32(94780) + collection = "ec-9478-beyond" + ) + + framework.AllocateVolume(t, grpcClient, volumeID, collection) + + httpClient := framework.NewHTTPClient() + fid := framework.NewFileID(volumeID, 9478001, 0x9478FACE) + upResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, + []byte("payload-for-shards-beyond-sources")) + _ = framework.ReadAllAndClose(t, upResp) + require.Equal(t, http.StatusCreated, upResp.StatusCode) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + _, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, Collection: collection, + }) + require.NoError(t, err) + + // Five shards mounted on the destination. + mountedShards := []uint32{0, 1, 2, 3, 4} + _, err = grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: collection, + ShardIds: mountedShards, + }) + require.NoError(t, err) + + // Detection only "saw" two of them — the rest must still get cleared. + knownToDetection := []uint32{0, 1} + + task := NewErasureCodingTask( + "stale-ec-beyond-sources", + clusterHarness.VolumeServerAddress(), + volumeID, + collection, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + task.dataShards = erasure_coding.DataShardsCount + task.parityShards = erasure_coding.ParityShardsCount + task.sources = []*worker_pb.TaskSource{ + { + Node: clusterHarness.VolumeServerAddress(), + VolumeId: volumeID, + ShardIds: knownToDetection, + }, + } + + require.NoError(t, task.cleanupStaleEcShards(ctx)) + + _, infoErr := grpcClient.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{VolumeId: volumeID}) + require.Error(t, infoErr, + "all mounted shards must be cleared even though detection only listed a subset") + + shardPath := makeTinyEcShardFile(t) + require.NoError(t, + sendShardViaReceiveFile(ctx, grpcClient, volumeID, collection, 4, shardPath), + "ReceiveFile for a shard outside detection's snapshot must succeed after cleanup") +} + +// Cleanup also targets fresh destinations from t.targets even when no +// EC-shard source row exists for them. This catches concurrent-attempt +// fallout where a previous worker mounted shards on a node we're now +// writing to but the topology snapshot is stale. +func TestCleanupStaleEcShardsCoversTargetsWithoutSources(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + const ( + volumeID = uint32(94781) + collection = "ec-9478-targets-only" + ) + + framework.AllocateVolume(t, grpcClient, volumeID, collection) + + httpClient := framework.NewHTTPClient() + fid := framework.NewFileID(volumeID, 9478101, 0x947811CE) + upResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, + []byte("payload-for-targets-only-cleanup")) + _ = framework.ReadAllAndClose(t, upResp) + require.Equal(t, http.StatusCreated, upResp.StatusCode) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + _, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: volumeID, Collection: collection, + }) + require.NoError(t, err) + _, err = grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: volumeID, Collection: collection, + ShardIds: []uint32{0, 1}, + }) + require.NoError(t, err) + + task := NewErasureCodingTask( + "stale-ec-targets-only", + clusterHarness.VolumeServerAddress(), + volumeID, + collection, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + task.dataShards = erasure_coding.DataShardsCount + task.parityShards = erasure_coding.ParityShardsCount + // No sources. The destination is named only as a target — the cleanup + // must still reach it. + task.targets = []*worker_pb.TaskTarget{ + {Node: clusterHarness.VolumeServerAddress(), VolumeId: volumeID, ShardIds: []uint32{0}}, + } + + require.NoError(t, task.cleanupStaleEcShards(ctx)) + + _, infoErr := grpcClient.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{VolumeId: volumeID}) + require.Error(t, infoErr, + "target-only destinations must also be cleaned even without a source row") +} + // Cleanup is a no-op when sources carry only the regular .dat replica. func TestCleanupStaleEcShardsSkipsRegularReplicas(t *testing.T) { if testing.Short() {