From 16ba70f856c9ef3bf5053af70a2e4b2a1beffcc7 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Sat, 4 Apr 2026 09:18:07 -0700 Subject: [PATCH] refactor: make bounded recovery observation events replica-scoped Carry replica-scoped addressing through bounded recovery planning and completion events so the core no longer depends on a volume-only observation seam. This preserves the current single-replica catch-up and rebuilding behavior while aligning the observation side with the replica-scoped command path. Made-with: Cursor --- sw-block/.private/phase/phase-16-log.md | 60 +++++++++++++++++++ sw-block/.private/phase/phase-16.md | 47 ++++++++++++++- sw-block/engine/replication/engine.go | 16 ++++- sw-block/engine/replication/event.go | 10 +++- .../replication/phase14_command_test.go | 4 +- .../engine/replication/runtime/executor.go | 10 ++-- .../replication/runtime/executor_test.go | 38 +++++++----- .../replication/runtime/rebuild_status.go | 3 +- weed/server/block_recovery.go | 26 ++++---- 9 files changed, 174 insertions(+), 40 deletions(-) diff --git a/sw-block/.private/phase/phase-16-log.md b/sw-block/.private/phase/phase-16-log.md index 668566d22..26df4b0c2 100644 --- a/sw-block/.private/phase/phase-16-log.md +++ b/sw-block/.private/phase/phase-16-log.md @@ -852,3 +852,63 @@ Conclusion: widening is reduced 3. this slice still does not claim broad multi-replica startup ownership or full recovery-loop closure + +--- + +#### `16G` Start Note Rev 1 + +Date: 2026-04-04 +Scope: replica-scoped recovery observation events on the bounded core-present +paths + +Why this slice exists: + +1. `16F` made recovery commands and pending matching replica-scoped +2. but recovery planning / completion events still only identify the volume +3. that leaves the bounded recovery loop with one remaining volume-scoped seam + before any later multi-replica widening can be evaluated cleanly + +Chosen implementation rule: + +1. make bounded recovery observation events carry `replicaID` +2. make `start_catchup` / `start_rebuild` command emission consume the + event-scoped `replicaID` +3. do not yet claim broad multi-replica recovery ownership or a per-replica + outward projection model + +--- + +#### `16G` Delivery Note Rev 1 + +Date: 2026-04-04 +Scope: replica-scoped recovery observation events on the bounded core-present +paths + +What changed: + +1. `sw-block/engine/replication/event.go` + - bounded recovery planning / completion events now carry `replicaID` +2. `sw-block/engine/replication/engine.go` + - bounded `start_catchup` / `start_rebuild` command emission now consumes + the event-scoped `replicaID` +3. `sw-block/engine/replication/runtime` + - runtime callbacks and rebuild-commit shaping preserve replica-scoped + addressing +4. `weed/server/block_recovery.go` + - recovery planning / completion events emitted back into the core now carry + the source `replicaID` + +Proof / evidence: + +1. `go test ./...` from `sw-block/engine/replication` +2. `go test ./weed/server -count=1 -timeout 120s -run "Test(P16B_|BlockService_(ApplyAssignments_(PrimaryRole_UsesCoreStartRecoveryTaskForCatchUp|RebuildingRole_UsesCoreRecoveryPathWithoutLegacyDirectStart|RebuildingRole_PreservesLegacyFallbackWithoutCore)|DebugInfoForVolume|CollectBlockVolumeHeartbeat|ReadinessSnapshot|HeartbeatReplicaDegraded))"` +3. result: `PASS` + +Conclusion: + +1. the bounded recovery loop no longer depends on a volume-only recovery event + seam +2. both recovery command addressing and recovery observation addressing are now + replica-scoped on the bounded path +3. this slice still does not claim broad multi-replica startup ownership or + full recovery-loop closure diff --git a/sw-block/.private/phase/phase-16.md b/sw-block/.private/phase/phase-16.md index efe498b5c..e83f57f8b 100644 --- a/sw-block/.private/phase/phase-16.md +++ b/sw-block/.private/phase/phase-16.md @@ -242,6 +242,47 @@ Evidence: 1. focused working-tree change after `145327498` +### `16G`: Replica-Scoped Recovery Observation Events + +Goal: + +1. remove the remaining volume-scoped recovery observation addressing on the + bounded core-present path +2. make recovery planning / completion events identify the intended `replicaID` + explicitly + +Acceptance object: + +1. bounded recovery observation events carry `replicaID` +2. bounded `start_catchup` / `start_rebuild` command emission consumes the + event-scoped `replicaID` +3. current single-replica catch-up and rebuilding proofs remain green +4. this slice still does not yet claim broad multi-replica recovery ownership + +Current chosen path: + +1. `CatchUpPlanned` carries `replicaID` +2. `CatchUpCompleted` carries `replicaID` +3. `NeedsRebuildObserved` / `RebuildStarted` / `RebuildCommitted` carry + `replicaID` +4. bounded runtime helpers and host callbacks preserve that addressing + +Status: + +1. delivered + +Delivered result: + +1. bounded recovery observation events now carry `replicaID` +2. bounded `start_catchup` / `start_rebuild` command emission now consumes the + event-scoped `replicaID` +3. current single-replica catch-up and rebuilding paths still behave the same, + but no longer depend on a volume-only recovery event seam + +Evidence: + +1. focused working-tree change after `b304b8e21` + ## Current Checkpoint Review Target The current review target is the current widened bounded runtime checkpoint @@ -295,12 +336,14 @@ boundary: 7. `16F` delivered: - recovery execution commands / pending matching are replica-scoped on the same bounded paths +8. `16G` delivered: + - recovery observation events are replica-scoped on those same bounded paths After this checkpoint: 1. keep `legacy P4` only as a compatibility guard 2. the next bounded semantic/runtime decision is whether to widen startup - ownership beyond the single-replica catch-up path now that recovery - addressing is replica-scoped + ownership beyond the single-replica catch-up path now that both command and + observation recovery seams are replica-scoped 3. do not yet claim full recovery-loop closure 4. do not broaden into launch claims diff --git a/sw-block/engine/replication/engine.go b/sw-block/engine/replication/engine.go index 9e6487398..3ba806849 100644 --- a/sw-block/engine/replication/engine.go +++ b/sw-block/engine/replication/engine.go @@ -91,7 +91,7 @@ func (e *CoreEngine) ApplyEvent(ev Event) ApplyResult { st.Boundary.TargetLSN = v.TargetLSN } st.Recovery.Reason = "" - if replicaID, ok := st.recoveryCommandReplicaID(); ok && st.shouldStartCatchUp(replicaID, v.TargetLSN) { + if replicaID, ok := st.recoveryCommandReplicaIDFromEvent(v.ReplicaID); ok && st.shouldStartCatchUp(replicaID, v.TargetLSN) { cmds = append(cmds, StartCatchUpCommand{ VolumeID: st.VolumeID, ReplicaID: replicaID, @@ -152,7 +152,7 @@ func (e *CoreEngine) ApplyEvent(ev Event) ApplyResult { if v.TargetLSN > st.Boundary.TargetLSN { st.Boundary.TargetLSN = v.TargetLSN } - if replicaID, ok := st.recoveryCommandReplicaID(); ok && st.shouldStartRebuild(replicaID, v.TargetLSN) { + if replicaID, ok := st.recoveryCommandReplicaIDFromEvent(v.ReplicaID); ok && st.shouldStartRebuild(replicaID, v.TargetLSN) { cmds = append(cmds, StartRebuildCommand{ VolumeID: st.VolumeID, ReplicaID: replicaID, @@ -458,6 +458,18 @@ func (st *VolumeState) recoveryCommandReplicaID() (string, bool) { return st.DesiredReplicas[0].ReplicaID, true } +func (st *VolumeState) recoveryCommandReplicaIDFromEvent(replicaID string) (string, bool) { + if replicaID != "" { + for _, replica := range st.DesiredReplicas { + if replica.ReplicaID == replicaID { + return replicaID, true + } + } + return "", false + } + return st.recoveryCommandReplicaID() +} + func (st *VolumeState) bootstrapReason() string { switch { case !st.Readiness.RoleApplied: diff --git a/sw-block/engine/replication/event.go b/sw-block/engine/replication/event.go index e9b35fecf..4b0d6aba8 100644 --- a/sw-block/engine/replication/event.go +++ b/sw-block/engine/replication/event.go @@ -88,6 +88,7 @@ func (e CheckpointAdvanced) VolumeID() string { return e.ID } // CatchUpPlanned freezes the current replay target as bounded recovery truth. type CatchUpPlanned struct { + ReplicaID string ID string TargetLSN uint64 } @@ -97,6 +98,7 @@ func (e CatchUpPlanned) VolumeID() string { return e.ID } // RecoveryProgressObserved updates achieved replay/rebuild progress without // implying publication or durability closure by itself. type RecoveryProgressObserved struct { + ReplicaID string ID string AchievedLSN uint64 } @@ -106,6 +108,7 @@ func (e RecoveryProgressObserved) VolumeID() string { return e.ID } // CatchUpCompleted closes a previously planned catch-up path at an explicit // achieved boundary. type CatchUpCompleted struct { + ReplicaID string ID string AchievedLSN uint64 } @@ -114,8 +117,9 @@ func (e CatchUpCompleted) VolumeID() string { return e.ID } // NeedsRebuildObserved is a fail-closed rebuild escalation. type NeedsRebuildObserved struct { - ID string - Reason string + ReplicaID string + ID string + Reason string } func (e NeedsRebuildObserved) VolumeID() string { return e.ID } @@ -123,6 +127,7 @@ func (e NeedsRebuildObserved) VolumeID() string { return e.ID } // RebuildStarted moves the recovery truth from blocked-needs-rebuild into an // explicit rebuilding path with a frozen target. type RebuildStarted struct { + ReplicaID string ID string TargetLSN uint64 } @@ -131,6 +136,7 @@ func (e RebuildStarted) VolumeID() string { return e.ID } // RebuildCommitted clears the rebuild condition with bounded durable truth. type RebuildCommitted struct { + ReplicaID string ID string AchievedLSN uint64 FlushedLSN uint64 diff --git a/sw-block/engine/replication/phase14_command_test.go b/sw-block/engine/replication/phase14_command_test.go index 953bc3016..3bcd669b5 100644 --- a/sw-block/engine/replication/phase14_command_test.go +++ b/sw-block/engine/replication/phase14_command_test.go @@ -145,7 +145,7 @@ func TestPhase14_CommandSequence_CatchUpStartIsBounded(t *testing.T) { core.ApplyEvent(ShipperConfiguredObserved{ID: "vol-cmd-catchup"}) core.ApplyEvent(ShipperConnectedObserved{ID: "vol-cmd-catchup"}) - result := core.ApplyEvent(CatchUpPlanned{ID: "vol-cmd-catchup", TargetLSN: 55}) + result := core.ApplyEvent(CatchUpPlanned{ID: "vol-cmd-catchup", ReplicaID: "replica-1", TargetLSN: 55}) assertCommandNames(t, result.Commands, []string{ "start_catchup", "publish_projection", @@ -176,7 +176,7 @@ func TestPhase14_CommandSequence_RebuildStartIsBounded(t *testing.T) { }) core.ApplyEvent(NeedsRebuildObserved{ID: "vol-cmd-rebuild", Reason: "gap_too_large"}) - result := core.ApplyEvent(RebuildStarted{ID: "vol-cmd-rebuild", TargetLSN: 80}) + result := core.ApplyEvent(RebuildStarted{ID: "vol-cmd-rebuild", ReplicaID: "replica-1", TargetLSN: 80}) assertCommandNames(t, result.Commands, []string{ "start_rebuild", "publish_projection", diff --git a/sw-block/engine/replication/runtime/executor.go b/sw-block/engine/replication/runtime/executor.go index ed1fbdd0e..6cd172059 100644 --- a/sw-block/engine/replication/runtime/executor.go +++ b/sw-block/engine/replication/runtime/executor.go @@ -7,12 +7,12 @@ import engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" // IO bindings and receives completion notifications. type RecoveryCallbacks interface { // OnCatchUpCompleted is called after successful catch-up execution. - OnCatchUpCompleted(volumeID string, achievedLSN uint64) + OnCatchUpCompleted(volumeID, replicaID string, achievedLSN uint64) // OnRebuildCompleted is called after successful rebuild execution. // The host should read the post-rebuild snapshot and emit the // appropriate core event. - OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan) + OnRebuildCompleted(volumeID, replicaID string, plan *engine.RecoveryPlan) } // ExecuteCatchUpPlan runs a catch-up plan using the supplied IO binding @@ -22,6 +22,7 @@ func ExecuteCatchUpPlan( plan *engine.RecoveryPlan, io engine.CatchUpIO, volumeID string, + replicaID string, callbacks RecoveryCallbacks, ) error { exec := engine.NewCatchUpExecutor(driver, plan) @@ -34,7 +35,7 @@ func ExecuteCatchUpPlan( if achievedLSN == 0 { achievedLSN = plan.CatchUpStartLSN } - callbacks.OnCatchUpCompleted(volumeID, achievedLSN) + callbacks.OnCatchUpCompleted(volumeID, replicaID, achievedLSN) } return nil } @@ -46,6 +47,7 @@ func ExecuteRebuildPlan( plan *engine.RecoveryPlan, io engine.RebuildIO, volumeID string, + replicaID string, callbacks RecoveryCallbacks, ) error { exec := engine.NewRebuildExecutor(driver, plan) @@ -54,7 +56,7 @@ func ExecuteRebuildPlan( return err } if callbacks != nil { - callbacks.OnRebuildCompleted(volumeID, plan) + callbacks.OnRebuildCompleted(volumeID, replicaID, plan) } return nil } diff --git a/sw-block/engine/replication/runtime/executor_test.go b/sw-block/engine/replication/runtime/executor_test.go index 5f6146ec7..c483c55bd 100644 --- a/sw-block/engine/replication/runtime/executor_test.go +++ b/sw-block/engine/replication/runtime/executor_test.go @@ -7,24 +7,28 @@ import ( ) type fakeCallbacks struct { - catchUpCalled bool - catchUpVol string - catchUpLSN uint64 + catchUpCalled bool + catchUpVol string + catchUpReplica string + catchUpLSN uint64 - rebuildCalled bool - rebuildVol string - rebuildPlan *engine.RecoveryPlan + rebuildCalled bool + rebuildVol string + rebuildReplica string + rebuildPlan *engine.RecoveryPlan } -func (f *fakeCallbacks) OnCatchUpCompleted(volumeID string, achievedLSN uint64) { +func (f *fakeCallbacks) OnCatchUpCompleted(volumeID, replicaID string, achievedLSN uint64) { f.catchUpCalled = true f.catchUpVol = volumeID + f.catchUpReplica = replicaID f.catchUpLSN = achievedLSN } -func (f *fakeCallbacks) OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan) { +func (f *fakeCallbacks) OnRebuildCompleted(volumeID, replicaID string, plan *engine.RecoveryPlan) { f.rebuildCalled = true f.rebuildVol = volumeID + f.rebuildReplica = replicaID f.rebuildPlan = plan } @@ -50,7 +54,7 @@ func TestExecuteCatchUpPlan_CallsbackOnSuccess(t *testing.T) { t.Fatal(err) } - err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", cb) + err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", "vol1/vs2", cb) if err != nil { t.Fatal(err) } @@ -60,6 +64,9 @@ func TestExecuteCatchUpPlan_CallsbackOnSuccess(t *testing.T) { if cb.catchUpVol != "vol1" { t.Fatalf("vol=%s", cb.catchUpVol) } + if cb.catchUpReplica != "vol1/vs2" { + t.Fatalf("replica=%s", cb.catchUpReplica) + } if cb.catchUpLSN != 100 { t.Fatalf("achievedLSN=%d", cb.catchUpLSN) } @@ -74,7 +81,7 @@ func TestExecuteCatchUpPlan_AchievedLSNMatchesTarget(t *testing.T) { } // The plan's CatchUpTarget is derived from storage state. // The callback should receive that same target as achievedLSN. - err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", cb) + err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", "vol1/vs2", cb) if err != nil { t.Fatal(err) } @@ -100,7 +107,7 @@ func TestExecuteRebuildPlan_CallsbackOnSuccess(t *testing.T) { t.Fatal(err) } - err = ExecuteRebuildPlan(driver, plan, &noopRebuildIO{}, "vol2", cb) + err = ExecuteRebuildPlan(driver, plan, &noopRebuildIO{}, "vol2", "vol2/vs2", cb) if err != nil { t.Fatal(err) } @@ -110,6 +117,9 @@ func TestExecuteRebuildPlan_CallsbackOnSuccess(t *testing.T) { if cb.rebuildVol != "vol2" { t.Fatalf("vol=%s", cb.rebuildVol) } + if cb.rebuildReplica != "vol2/vs2" { + t.Fatalf("replica=%s", cb.rebuildReplica) + } if cb.rebuildPlan == nil { t.Fatal("rebuild plan not passed to callback") } @@ -122,7 +132,7 @@ func TestExecuteCatchUpPlan_NilCallbacksSafe(t *testing.T) { t.Fatal(err) } // nil callbacks should not panic. - if err := ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", nil); err != nil { + if err := ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", "vol1/vs2", nil); err != nil { t.Fatal(err) } } @@ -132,12 +142,12 @@ func TestExecuteCatchUpPlan_NilCallbacksSafe(t *testing.T) { type noopCatchUpIO struct{} func (noopCatchUpIO) StreamWALEntries(start, end uint64) (uint64, error) { return end, nil } -func (noopCatchUpIO) TruncateWAL(lsn uint64) error { return nil } +func (noopCatchUpIO) TruncateWAL(lsn uint64) error { return nil } type noopRebuildIO struct{} func (noopRebuildIO) StreamWALEntries(start, end uint64) (uint64, error) { return end, nil } -func (noopRebuildIO) TruncateWAL(lsn uint64) error { return nil } +func (noopRebuildIO) TruncateWAL(lsn uint64) error { return nil } func (noopRebuildIO) TransferSnapshot(lsn uint64) error { return nil } func (noopRebuildIO) TransferFullBase(lsn uint64) (uint64, error) { return lsn, nil } diff --git a/sw-block/engine/replication/runtime/rebuild_status.go b/sw-block/engine/replication/runtime/rebuild_status.go index a0d2e2bd3..3d761a3be 100644 --- a/sw-block/engine/replication/runtime/rebuild_status.go +++ b/sw-block/engine/replication/runtime/rebuild_status.go @@ -13,7 +13,7 @@ type RebuildCompletionStatus struct { // DeriveRebuildCommitted computes the RebuildCommitted event from // post-rebuild status and the original plan. This is the reusable // shaping logic — the host only needs to supply the raw snapshot values. -func DeriveRebuildCommitted(volumeID string, status RebuildCompletionStatus, plan *engine.RecoveryPlan) engine.RebuildCommitted { +func DeriveRebuildCommitted(volumeID, replicaID string, status RebuildCompletionStatus, plan *engine.RecoveryPlan) engine.RebuildCommitted { flushedLSN := status.CommittedLSN if flushedLSN == 0 { flushedLSN = plan.RebuildTargetLSN @@ -27,6 +27,7 @@ func DeriveRebuildCommitted(volumeID string, status RebuildCompletionStatus, pla achievedLSN = checkpointLSN } return engine.RebuildCommitted{ + ReplicaID: replicaID, ID: volumeID, AchievedLSN: achievedLSN, FlushedLSN: flushedLSN, diff --git a/weed/server/block_recovery.go b/weed/server/block_recovery.go index 23e5cbb12..9338c2933 100644 --- a/weed/server/block_recovery.go +++ b/weed/server/block_recovery.go @@ -311,7 +311,7 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID string, ass Plan: plan, CatchUpIO: rctx.executor, }) - bs.applyCoreEvent(engine.CatchUpPlanned{ID: rctx.volPath, TargetLSN: plan.CatchUpTarget}) + bs.applyCoreEvent(engine.CatchUpPlanned{ID: rctx.volPath, ReplicaID: replicaID, TargetLSN: plan.CatchUpTarget}) if rm.coord.Has(replicaID) { rm.coord.Cancel(replicaID, "start_catchup_not_emitted") return @@ -321,7 +321,7 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID string, ass if plan.Proof != nil && plan.Proof.Reason != "" { reason = plan.Proof.Reason } - bs.applyCoreEvent(engine.NeedsRebuildObserved{ID: rctx.volPath, Reason: reason}) + bs.applyCoreEvent(engine.NeedsRebuildObserved{ID: rctx.volPath, ReplicaID: replicaID, Reason: reason}) return } @@ -365,7 +365,7 @@ func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID string, ass if rm.OnPendingExecution != nil { rm.OnPendingExecution(rctx.volPath, pe) } - bs.applyCoreEvent(engine.RebuildStarted{ID: rctx.volPath, TargetLSN: plan.RebuildTargetLSN}) + bs.applyCoreEvent(engine.RebuildStarted{ID: rctx.volPath, ReplicaID: replicaID, TargetLSN: plan.RebuildTargetLSN}) if rm.coord.Has(replicaID) { rm.coord.Cancel(replicaID, "start_rebuild_not_emitted") } @@ -378,7 +378,7 @@ func (rm *RecoveryManager) ExecutePendingCatchUp(replicaID string, targetLSN uin if pe == nil || pe.Driver == nil || pe.Plan == nil { return nil } - return rt.ExecuteCatchUpPlan(pe.Driver, pe.Plan, pe.CatchUpIO, pe.VolumeID, rm) + return rt.ExecuteCatchUpPlan(pe.Driver, pe.Plan, pe.CatchUpIO, pe.VolumeID, pe.ReplicaID, rm) } func (rm *RecoveryManager) ExecutePendingRebuild(replicaID string, targetLSN uint64) error { @@ -386,25 +386,25 @@ func (rm *RecoveryManager) ExecutePendingRebuild(replicaID string, targetLSN uin if pe == nil || pe.Driver == nil || pe.Plan == nil { return nil } - return rt.ExecuteRebuildPlan(pe.Driver, pe.Plan, pe.RebuildIO, pe.VolumeID, rm) + return rt.ExecuteRebuildPlan(pe.Driver, pe.Plan, pe.RebuildIO, pe.VolumeID, pe.ReplicaID, rm) } // RecoveryCallbacks implementation — host-side completion notifications. -func (rm *RecoveryManager) OnCatchUpCompleted(volumeID string, achievedLSN uint64) { - glog.V(0).Infof("recovery: catch-up completed for %s (achievedLSN=%d)", volumeID, achievedLSN) +func (rm *RecoveryManager) OnCatchUpCompleted(volumeID, replicaID string, achievedLSN uint64) { + glog.V(0).Infof("recovery: catch-up completed for %s via %s (achievedLSN=%d)", volumeID, replicaID, achievedLSN) if rm.bs != nil && rm.bs.v2Core != nil { - rm.bs.applyCoreEvent(engine.CatchUpCompleted{ID: volumeID, AchievedLSN: achievedLSN}) + rm.bs.applyCoreEvent(engine.CatchUpCompleted{ID: volumeID, ReplicaID: replicaID, AchievedLSN: achievedLSN}) } } -func (rm *RecoveryManager) OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan) { - glog.V(0).Infof("recovery: rebuild completed for %s", volumeID) +func (rm *RecoveryManager) OnRebuildCompleted(volumeID, replicaID string, plan *engine.RecoveryPlan) { + glog.V(0).Infof("recovery: rebuild completed for %s via %s", volumeID, replicaID) if rm.bs == nil || rm.bs.v2Core == nil { return } status := rm.readRebuildStatus(volumeID) - ev := rt.DeriveRebuildCommitted(volumeID, status, plan) + ev := rt.DeriveRebuildCommitted(volumeID, replicaID, status, plan) rm.bs.applyCoreEvent(ev) } @@ -430,7 +430,7 @@ func (rm *RecoveryManager) readRebuildStatus(volumeID string) rt.RebuildCompleti // Core-present paths use ExecutePendingCatchUp/ExecutePendingRebuild instead. func (rm *RecoveryManager) executeLegacyCatchUp(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.CatchUpIO) { - err := rt.ExecuteCatchUpPlan(driver, plan, io, volumeID, rm) + err := rt.ExecuteCatchUpPlan(driver, plan, io, volumeID, replicaID, rm) if err != nil { if ctx.Err() != nil { glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, err) @@ -441,7 +441,7 @@ func (rm *RecoveryManager) executeLegacyCatchUp(ctx context.Context, volumeID, r } func (rm *RecoveryManager) executeLegacyRebuild(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.RebuildIO) { - err := rt.ExecuteRebuildPlan(driver, plan, io, volumeID, rm) + err := rt.ExecuteRebuildPlan(driver, plan, io, volumeID, replicaID, rm) if err != nil { if ctx.Err() != nil { glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, err)