EC vacuum distribution updated for generation-aware mount/copy RPCs

This commit is contained in:
chrislu
2025-08-10 15:04:55 -07:00
parent de9399761b
commit 62d89fa60b

View File

@@ -156,7 +156,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(sourceNode),
Generation: t.targetGeneration, // copy new EC shards as G+1
Generation: t.sourceGeneration, // collect existing shards from source generation G
})
if copyErr != nil {
return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr)
@@ -167,6 +167,7 @@ func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) {
VolumeId: t.volumeID,
Collection: t.collection,
ShardIds: needToCopyBits.ToUint32Slice(),
Generation: t.sourceGeneration, // mount collected shards from source generation G
})
if mountErr != nil {
return fmt.Errorf("failed to mount shards %v on %s: %w", needToCopyBits.ShardIds(), targetNode, mountErr)
@@ -221,8 +222,9 @@ func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error
// distributeNewEcShards distributes the new EC shards across the cluster
func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error {
t.LogInfo("Distributing new EC shards", map[string]interface{}{
"volume_id": t.volumeID,
"source": sourceNode,
"volume_id": t.volumeID,
"source": sourceNode,
"target_generation": t.targetGeneration,
})
// For simplicity, we'll distribute to the same nodes as before
@@ -272,6 +274,7 @@ func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error
VolumeId: t.volumeID,
Collection: t.collection,
ShardIds: needToDistributeBits.ToUint32Slice(),
Generation: t.targetGeneration, // mount new EC shards as G+1
})
if mountErr != nil {
return fmt.Errorf("failed to mount new shards %v on %s: %w", needToDistributeBits.ShardIds(), targetNode, mountErr)