fix(ec): blanket-clean every destination over the full shard range (#9512)

* fix(ec): blanket-clean every destination over the full shard range

The previous cleanup pass walked t.sources only, with the shard ids the
topology had reported at detection time. In the wild, a destination can
end up with EC shards mounted that the topology snapshot didn't list —
shards on a sibling disk that hadn't heartbeated, or shards left over
from a concurrent attempt's mount step. FindEcVolume still returns
true, so the next ReceiveFile trips the mounted-volume guard.

Cleanup now unions t.sources (with ShardIds) and t.targets and issues
unmount + delete over [0..totalShards-1] on each. Both RPCs are
idempotent on missing shards, so the wider sweep is free.

Two new tests cover the gap: shards mounted beyond what t.sources
lists, and a target-only destination with no source row.

* log(ec): include disk_id in EC unmount/delete/refusal log lines

The current logs identify the volume and shard but leave disk_id off,
which makes the cross-server cleanup story hard to follow when
multiple disks of one server hold pieces of the same volume:

  UnmountEcShards 4121.1                              -> add disk_id
  ec volume video-recordings_4121 shard delete [1 5]  -> add per-loc disk_id
  volume server X:Y deletes ec shards from 4121 [...] -> add disk_id
  ReceiveFile: ec volume 4121 is mounted; refusing... -> add disk_ids

ReceiveFile's refusal now names the disk_ids actually holding the
mount so operators can see whether the next cleanup pass needs to
target a sibling disk. Added Store.FindEcVolumeDiskIds /
Store::find_ec_volume_disk_ids as the supporting primitive.

Mirrored in seaweed-volume/src/ (unmount log in Store::unmount_ec_shard,
heartbeat delete log in diff_ec_shard_delta_messages, refusal in the
ReceiveFile handler).

* test(ec): stub VolumeEcShardsUnmount/Delete on the fake volume server

The plugin-worker EC tests boot a fake volume server that embeds
UnimplementedVolumeServerServer. After the worker started calling
VolumeEcShardsUnmount + VolumeEcShardsDelete pre-distribute, the
default Unimplemented response surfaced as fourteen "method not
implemented" errors and TestErasureCodingExecutionEncodesShards
failed. Both RPCs are no-ops here — nothing on the fake server has
mounted state or persisted shard files to remove.
This commit is contained in:
Chris Lu
2026-05-17 11:31:37 -07:00
committed by GitHub
parent bf9110ebd3
commit 2a41e76101
10 changed files with 255 additions and 42 deletions

View File

@@ -1511,9 +1511,11 @@ impl VolumeServer for VolumeGrpcService {
// EcVolume holds fds on the same inodes, so overwriting
// corrupts live readers.
if store.has_ec_volume(VolumeId(info.volume_id)) {
let mounted_disks =
store.find_ec_volume_disk_ids(VolumeId(info.volume_id));
resp_error = Some(format!(
"ec volume {} is mounted; unmount before ReceiveFile",
info.volume_id
"ec volume {} is mounted on disk_ids:{:?}; unmount before ReceiveFile",
info.volume_id, mounted_disks
));
break;
}

View File

@@ -344,6 +344,12 @@ fn diff_ec_shard_delta_messages(
if !current.contains_key(key) {
let mut deleted = message.clone();
deleted.shard_sizes = vec![0];
tracing::info!(
volume_id = deleted.id,
disk_id = deleted.disk_id,
ec_index_bits = deleted.ec_index_bits,
"deletes ec shards"
);
deleted_ec_shards.push(deleted);
}
}

View File

@@ -699,10 +699,20 @@ impl Store {
// Walk all locations rather than stopping at the first with the
// vid — split-disk reconciled volumes can have the same vid on
// multiple disks, with the target shard on any of them.
for loc in &mut self.locations {
if loc.has_ec_volume(vid) {
loc.unmount_ec_shards(vid, &[shard_id]);
for disk_id in 0..self.locations.len() {
let has_shard = self.locations[disk_id]
.find_ec_volume(vid)
.is_some_and(|ec_vol| ec_vol.has_shard(shard_id as u8));
if !has_shard {
continue;
}
tracing::info!(
volume_id = vid.0,
shard_id,
disk_id,
"UnmountEcShards"
);
self.locations[disk_id].unmount_ec_shards(vid, &[shard_id]);
}
// Go returns nil if shard not found (no error)
Ok(())
@@ -733,6 +743,21 @@ impl Store {
self.locations.iter().any(|loc| loc.has_ec_volume(vid))
}
/// Returns every disk_id on this store that has an EcVolume entry
/// for `vid`. Useful for diagnostic logging when a single
/// `has_ec_volume` hit hides which disk is actually holding the
/// mount (e.g., the ReceiveFile mounted-volume guard).
/// Mirrors Go's `Store.FindEcVolumeDiskIds`.
pub fn find_ec_volume_disk_ids(&self, vid: VolumeId) -> Vec<u32> {
let mut ids = Vec::new();
for (idx, loc) in self.locations.iter().enumerate() {
if loc.has_ec_volume(vid) {
ids.push(idx as u32);
}
}
ids
}
/// Returns the index of the disk location that has `(vid, shard_id)`
/// mounted, if any. Mirrors Go's `Store.findEcShard` and is the
/// right primitive for read/unmount/delete operations on a single

View File

@@ -292,6 +292,20 @@ func (v *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_serv
return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
}
// VolumeEcShardsUnmount is a no-op stub: the worker's pre-distribute
// cleanup calls it against every destination, and the fake server has no
// mounted state to clear.
func (v *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
}
// VolumeEcShardsDelete is a no-op stub paired with VolumeEcShardsUnmount
// above; the fake server doesn't persist shard files beyond what
// ReceiveFile wrote, so there's nothing to remove.
func (v *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}
func (v *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) {
if req == nil {
return nil, fmt.Errorf("VolumeEcShardsInfo request is nil")

View File

@@ -300,7 +300,7 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp
},
}
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(&ecShardMessage)
glog.V(0).Infof("volume server %s:%d deletes ec shards from %d [%s]", vs.store.Ip, vs.store.Port, ecShardMessage.Id, si.String())
glog.V(0).Infof("volume server %s:%d deletes ec shards from %d disk_id:%d [%s]", vs.store.Ip, vs.store.Port, ecShardMessage.Id, ecShardMessage.DiskId, si.String())
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err

View File

@@ -567,9 +567,10 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive
// holds fds on the same inodes, so overwriting corrupts
// live readers.
if _, mounted := vs.store.FindEcVolume(needle.VolumeId(fileInfo.VolumeId)); mounted {
glog.Errorf("ReceiveFile: ec volume %d is mounted; refusing overwrite for %s", fileInfo.VolumeId, fileInfo.Ext)
mountedDisks := vs.store.FindEcVolumeDiskIds(needle.VolumeId(fileInfo.VolumeId))
glog.Errorf("ReceiveFile: ec volume %d is mounted on disk_ids:%v; refusing overwrite for %s", fileInfo.VolumeId, mountedDisks, fileInfo.Ext)
return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{
Error: fmt.Sprintf("ec volume %d is mounted; unmount before ReceiveFile", fileInfo.VolumeId),
Error: fmt.Sprintf("ec volume %d is mounted on disk_ids:%v; unmount before ReceiveFile", fileInfo.VolumeId, mountedDisks),
})
}

View File

@@ -327,9 +327,9 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)
for _, location := range vs.store.Locations {
for diskId, location := range vs.store.Locations {
if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
glog.Errorf("deleteEcShards from %s %s.%v: %v", location.Directory, bName, req.ShardIds, err)
glog.Errorf("deleteEcShards from disk_id:%d %s %s.%v: %v", diskId, location.Directory, bName, req.ShardIds, err)
return nil, err
}
}

View File

@@ -237,12 +237,12 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
location := s.Locations[diskId]
if deleted := location.UnloadEcShard(vid, shardId); deleted {
glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
glog.V(0).Infof("UnmountEcShards %d.%d disk_id:%d", vid, shardId, diskId)
s.DeletedEcShardsChan <- message
return nil
}
return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
return fmt.Errorf("UnmountEcShards %d.%d not found on disk %d", vid, shardId, diskId)
}
func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (diskId uint32, shard *erasure_coding.EcVolumeShard, found bool) {
@@ -269,6 +269,20 @@ func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, boo
return nil, false
}
// FindEcVolumeDiskIds returns every disk_id on this store that has an
// EcVolume entry for the given volume. Useful for diagnostic logging
// when a single FindEcVolume hit hides which disk is actually holding
// the mount (e.g., the ReceiveFile mounted-volume guard).
func (s *Store) FindEcVolumeDiskIds(vid needle.VolumeId) []uint32 {
var ids []uint32
for diskId, location := range s.Locations {
if _, found := location.FindEcVolume(vid); found {
ids = append(ids, uint32(diskId))
}
}
return ids
}
// shardFiles is a list of shard files, which is used to return the shard locations
func (s *Store) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
for _, location := range s.Locations {

View File

@@ -691,54 +691,49 @@ func (t *ErasureCodingTask) getReplicas() []string {
return replicas
}
// cleanupStaleEcShards unmounts and deletes partial EC shards still mounted
// on destinations from a previous failed encode. Safe by ordering: runs
// after the source .dat is in the worker's workdir and a full local shard
// set is generated. Per-destination errors are aggregated, not short-circuited.
// cleanupStaleEcShards unmounts and deletes any EC shards still mounted on
// destinations from a previous failed encode of this volume. Targets every
// node we plan to write to (t.targets) plus every node detection saw EC
// shards on (t.sources with ShardIds set), and issues the cleanup over the
// full shard range so a stale topology snapshot — or shards landed by a
// prior attempt that haven't heartbeated yet — cannot leave the
// mounted-volume guard tripped during distributeEcShards. Safe by ordering:
// runs after the source .dat is in the worker's workdir and a full local
// shard set is generated. Per-destination errors are aggregated, not
// short-circuited.
func (t *ErasureCodingTask) cleanupStaleEcShards(ctx context.Context) error {
if len(t.sources) == 0 {
return nil
}
// Union shard ids per destination node — volume-server cleanup walks
// every DiskLocation, so per-disk source rows collapse to one RPC.
perNode := make(map[string]map[uint32]struct{})
nodes := make(map[string]struct{})
for _, source := range t.sources {
if source == nil || len(source.ShardIds) == 0 {
if source == nil || source.Node == "" || len(source.ShardIds) == 0 {
continue
}
shardSet, ok := perNode[source.Node]
if !ok {
shardSet = make(map[uint32]struct{})
perNode[source.Node] = shardSet
}
for _, shardID := range source.ShardIds {
shardSet[shardID] = struct{}{}
}
nodes[source.Node] = struct{}{}
}
if len(perNode) == 0 {
for _, target := range t.targets {
if target == nil || target.Node == "" {
continue
}
nodes[target.Node] = struct{}{}
}
if len(nodes) == 0 {
return nil
}
var cleanupErrors []string
for node, shardSet := range perNode {
shardIds := make([]uint32, 0, len(shardSet))
for id := range shardSet {
shardIds = append(shardIds, id)
}
allShards := fullShardIdRange(t.dataShards, t.parityShards)
var cleanupErrors []string
for node := range nodes {
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"destination": node,
"shard_ids": shardIds,
"shard_ids": allShards,
}).Info("Clearing stale EC shards on destination before re-distribute")
if err := unmountAndDeleteEcShards(ctx, t.grpcDialOption, node, t.volumeID, t.collection, shardIds); err != nil {
if err := unmountAndDeleteEcShards(ctx, t.grpcDialOption, node, t.volumeID, t.collection, allShards); err != nil {
cleanupErrors = append(cleanupErrors, fmt.Sprintf("%s: %v", node, err))
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"destination": node,
"shard_ids": shardIds,
"error": err.Error(),
}).Error("Failed to clear stale EC shards on destination")
}
@@ -751,6 +746,24 @@ func (t *ErasureCodingTask) cleanupStaleEcShards(ctx context.Context) error {
return nil
}
// fullShardIdRange builds [0..total-1] for unmount/delete RPCs. Falls back
// to erasure_coding.TotalShardsCount when the task's ratio is unset (early
// callers, tests); the helper never returns an empty slice.
func fullShardIdRange(dataShards, parityShards int32) []uint32 {
total := int(dataShards + parityShards)
if total <= 0 {
total = erasure_coding.TotalShardsCount
}
if total > erasure_coding.MaxShardCount {
total = erasure_coding.MaxShardCount
}
ids := make([]uint32, total)
for i := range ids {
ids[i] = uint32(i)
}
return ids
}
// unmountAndDeleteEcShards unmounts then deletes the named shards on one
// destination. Unmount must precede delete (delete requires the shard be
// unmounted); both RPCs are idempotent against missing shards.

View File

@@ -106,6 +106,144 @@ func TestCleanupStaleEcShardsBeforeDistribute(t *testing.T) {
"EC-shard sources must not appear in replica delete list")
}
// The destination has more shards mounted than t.sources lists — simulates
// detection-time topology missing shards (e.g., a prior attempt's mount
// hadn't heartbeated yet, or different shards live on a sibling disk). The
// cleanup must clear FindEcVolume regardless, by issuing unmount/delete
// over the full shard range, so the next ReceiveFile lands.
func TestCleanupStaleEcShardsClearsShardsBeyondSources(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
const (
volumeID = uint32(94780)
collection = "ec-9478-beyond"
)
framework.AllocateVolume(t, grpcClient, volumeID, collection)
httpClient := framework.NewHTTPClient()
fid := framework.NewFileID(volumeID, 9478001, 0x9478FACE)
upResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid,
[]byte("payload-for-shards-beyond-sources"))
_ = framework.ReadAllAndClose(t, upResp)
require.Equal(t, http.StatusCreated, upResp.StatusCode)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
_, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: volumeID, Collection: collection,
})
require.NoError(t, err)
// Five shards mounted on the destination.
mountedShards := []uint32{0, 1, 2, 3, 4}
_, err = grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: volumeID, Collection: collection,
ShardIds: mountedShards,
})
require.NoError(t, err)
// Detection only "saw" two of them — the rest must still get cleared.
knownToDetection := []uint32{0, 1}
task := NewErasureCodingTask(
"stale-ec-beyond-sources",
clusterHarness.VolumeServerAddress(),
volumeID,
collection,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
task.dataShards = erasure_coding.DataShardsCount
task.parityShards = erasure_coding.ParityShardsCount
task.sources = []*worker_pb.TaskSource{
{
Node: clusterHarness.VolumeServerAddress(),
VolumeId: volumeID,
ShardIds: knownToDetection,
},
}
require.NoError(t, task.cleanupStaleEcShards(ctx))
_, infoErr := grpcClient.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{VolumeId: volumeID})
require.Error(t, infoErr,
"all mounted shards must be cleared even though detection only listed a subset")
shardPath := makeTinyEcShardFile(t)
require.NoError(t,
sendShardViaReceiveFile(ctx, grpcClient, volumeID, collection, 4, shardPath),
"ReceiveFile for a shard outside detection's snapshot must succeed after cleanup")
}
// Cleanup also targets fresh destinations from t.targets even when no
// EC-shard source row exists for them. This catches concurrent-attempt
// fallout where a previous worker mounted shards on a node we're now
// writing to but the topology snapshot is stale.
func TestCleanupStaleEcShardsCoversTargetsWithoutSources(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
const (
volumeID = uint32(94781)
collection = "ec-9478-targets-only"
)
framework.AllocateVolume(t, grpcClient, volumeID, collection)
httpClient := framework.NewHTTPClient()
fid := framework.NewFileID(volumeID, 9478101, 0x947811CE)
upResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid,
[]byte("payload-for-targets-only-cleanup"))
_ = framework.ReadAllAndClose(t, upResp)
require.Equal(t, http.StatusCreated, upResp.StatusCode)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
_, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: volumeID, Collection: collection,
})
require.NoError(t, err)
_, err = grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: volumeID, Collection: collection,
ShardIds: []uint32{0, 1},
})
require.NoError(t, err)
task := NewErasureCodingTask(
"stale-ec-targets-only",
clusterHarness.VolumeServerAddress(),
volumeID,
collection,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
task.dataShards = erasure_coding.DataShardsCount
task.parityShards = erasure_coding.ParityShardsCount
// No sources. The destination is named only as a target — the cleanup
// must still reach it.
task.targets = []*worker_pb.TaskTarget{
{Node: clusterHarness.VolumeServerAddress(), VolumeId: volumeID, ShardIds: []uint32{0}},
}
require.NoError(t, task.cleanupStaleEcShards(ctx))
_, infoErr := grpcClient.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{VolumeId: volumeID})
require.Error(t, infoErr,
"target-only destinations must also be cleaned even without a source row")
}
// Cleanup is a no-op when sources carry only the regular .dat replica.
func TestCleanupStaleEcShardsSkipsRegularReplicas(t *testing.T) {
if testing.Short() {