refactor: make bounded recovery observation events replica-scoped

Carry replica-scoped addressing through bounded recovery planning and completion events so the core no longer depends on a volume-only observation seam. This preserves the current single-replica catch-up and rebuilding behavior while aligning the observation side with the replica-scoped command path.

Made-with: Cursor
This commit is contained in:
pingqiu
2026-04-04 09:18:07 -07:00
parent b304b8e212
commit 16ba70f856
9 changed files with 174 additions and 40 deletions

View File

@@ -852,3 +852,63 @@ Conclusion:
widening is reduced
3. this slice still does not claim broad multi-replica startup ownership or
full recovery-loop closure
---
#### `16G` Start Note Rev 1
Date: 2026-04-04
Scope: replica-scoped recovery observation events on the bounded core-present
paths
Why this slice exists:
1. `16F` made recovery commands and pending matching replica-scoped
2. but recovery planning / completion events still only identify the volume
3. that leaves the bounded recovery loop with one remaining volume-scoped seam
before any later multi-replica widening can be evaluated cleanly
Chosen implementation rule:
1. make bounded recovery observation events carry `replicaID`
2. make `start_catchup` / `start_rebuild` command emission consume the
event-scoped `replicaID`
3. do not yet claim broad multi-replica recovery ownership or a per-replica
outward projection model
---
#### `16G` Delivery Note Rev 1
Date: 2026-04-04
Scope: replica-scoped recovery observation events on the bounded core-present
paths
What changed:
1. `sw-block/engine/replication/event.go`
- bounded recovery planning / completion events now carry `replicaID`
2. `sw-block/engine/replication/engine.go`
- bounded `start_catchup` / `start_rebuild` command emission now consumes
the event-scoped `replicaID`
3. `sw-block/engine/replication/runtime`
- runtime callbacks and rebuild-commit shaping preserve replica-scoped
addressing
4. `weed/server/block_recovery.go`
- recovery planning / completion events emitted back into the core now carry
the source `replicaID`
Proof / evidence:
1. `go test ./...` from `sw-block/engine/replication`
2. `go test ./weed/server -count=1 -timeout 120s -run "Test(P16B_|BlockService_(ApplyAssignments_(PrimaryRole_UsesCoreStartRecoveryTaskForCatchUp|RebuildingRole_UsesCoreRecoveryPathWithoutLegacyDirectStart|RebuildingRole_PreservesLegacyFallbackWithoutCore)|DebugInfoForVolume|CollectBlockVolumeHeartbeat|ReadinessSnapshot|HeartbeatReplicaDegraded))"`
3. result: `PASS`
Conclusion:
1. the bounded recovery loop no longer depends on a volume-only recovery event
seam
2. both recovery command addressing and recovery observation addressing are now
replica-scoped on the bounded path
3. this slice still does not claim broad multi-replica startup ownership or
full recovery-loop closure

View File

@@ -242,6 +242,47 @@ Evidence:
1. focused working-tree change after `145327498`
### `16G`: Replica-Scoped Recovery Observation Events
Goal:
1. remove the remaining volume-scoped recovery observation addressing on the
bounded core-present path
2. make recovery planning / completion events identify the intended `replicaID`
explicitly
Acceptance object:
1. bounded recovery observation events carry `replicaID`
2. bounded `start_catchup` / `start_rebuild` command emission consumes the
event-scoped `replicaID`
3. current single-replica catch-up and rebuilding proofs remain green
4. this slice still does not claim broad multi-replica recovery ownership
Current chosen path:
1. `CatchUpPlanned` carries `replicaID`
2. `CatchUpCompleted` carries `replicaID`
3. `NeedsRebuildObserved` / `RebuildStarted` / `RebuildCommitted` carry
`replicaID`
4. bounded runtime helpers and host callbacks preserve that addressing
Status:
1. delivered
Delivered result:
1. bounded recovery observation events now carry `replicaID`
2. bounded `start_catchup` / `start_rebuild` command emission now consumes the
event-scoped `replicaID`
3. current single-replica catch-up and rebuilding paths still behave the same,
but no longer depend on a volume-only recovery event seam
Evidence:
1. focused working-tree change after `b304b8e21`
## Current Checkpoint Review Target
The current review target is the current widened bounded runtime checkpoint
@@ -295,12 +336,14 @@ boundary:
7. `16F` delivered:
- recovery execution commands / pending matching are replica-scoped on the
same bounded paths
8. `16G` delivered:
- recovery observation events are replica-scoped on those same bounded paths
After this checkpoint:
1. keep `legacy P4` only as a compatibility guard
2. the next bounded semantic/runtime decision is whether to widen startup
ownership beyond the single-replica catch-up path now that recovery
addressing is replica-scoped
ownership beyond the single-replica catch-up path now that both command and
observation recovery seams are replica-scoped
3. do not yet claim full recovery-loop closure
4. do not broaden into launch claims

View File

@@ -91,7 +91,7 @@ func (e *CoreEngine) ApplyEvent(ev Event) ApplyResult {
st.Boundary.TargetLSN = v.TargetLSN
}
st.Recovery.Reason = ""
if replicaID, ok := st.recoveryCommandReplicaID(); ok && st.shouldStartCatchUp(replicaID, v.TargetLSN) {
if replicaID, ok := st.recoveryCommandReplicaIDFromEvent(v.ReplicaID); ok && st.shouldStartCatchUp(replicaID, v.TargetLSN) {
cmds = append(cmds, StartCatchUpCommand{
VolumeID: st.VolumeID,
ReplicaID: replicaID,
@@ -152,7 +152,7 @@ func (e *CoreEngine) ApplyEvent(ev Event) ApplyResult {
if v.TargetLSN > st.Boundary.TargetLSN {
st.Boundary.TargetLSN = v.TargetLSN
}
if replicaID, ok := st.recoveryCommandReplicaID(); ok && st.shouldStartRebuild(replicaID, v.TargetLSN) {
if replicaID, ok := st.recoveryCommandReplicaIDFromEvent(v.ReplicaID); ok && st.shouldStartRebuild(replicaID, v.TargetLSN) {
cmds = append(cmds, StartRebuildCommand{
VolumeID: st.VolumeID,
ReplicaID: replicaID,
@@ -458,6 +458,18 @@ func (st *VolumeState) recoveryCommandReplicaID() (string, bool) {
return st.DesiredReplicas[0].ReplicaID, true
}
// recoveryCommandReplicaIDFromEvent resolves which replica a recovery command
// should address, given the replica ID carried by the triggering event.
//
// An empty event replica ID defers to recoveryCommandReplicaID. A non-empty
// ID is accepted only when it matches an entry in st.DesiredReplicas;
// otherwise the result is ("", false) and no command target is produced.
func (st *VolumeState) recoveryCommandReplicaIDFromEvent(replicaID string) (string, bool) {
	// Events that do not name a replica use the state-derived default.
	if replicaID == "" {
		return st.recoveryCommandReplicaID()
	}
	for i := range st.DesiredReplicas {
		if st.DesiredReplicas[i].ReplicaID == replicaID {
			return replicaID, true
		}
	}
	// The event names a replica that is not desired for this volume.
	return "", false
}
func (st *VolumeState) bootstrapReason() string {
switch {
case !st.Readiness.RoleApplied:

View File

@@ -88,6 +88,7 @@ func (e CheckpointAdvanced) VolumeID() string { return e.ID }
// CatchUpPlanned freezes the current replay target as bounded recovery truth.
// The event is addressed by volume (ID) and by replica (ReplicaID).
type CatchUpPlanned struct {
// ReplicaID names the replica the catch-up plan is for; the engine passes it
// to recoveryCommandReplicaIDFromEvent when deciding on start_catchup.
ReplicaID string
// ID is the volume identifier returned by VolumeID().
ID string
// TargetLSN is the frozen replay target for this catch-up.
TargetLSN uint64
}
@@ -97,6 +98,7 @@ func (e CatchUpPlanned) VolumeID() string { return e.ID }
// RecoveryProgressObserved updates achieved replay/rebuild progress without
// implying publication or durability closure by itself.
type RecoveryProgressObserved struct {
// ReplicaID names the replica the progress was observed for.
// NOTE(review): no consumer of this field is visible here — confirm usage.
ReplicaID string
// ID is the volume identifier returned by VolumeID().
ID string
// AchievedLSN is the replay/rebuild progress reached so far.
AchievedLSN uint64
}
@@ -106,6 +108,7 @@ func (e RecoveryProgressObserved) VolumeID() string { return e.ID }
// CatchUpCompleted closes a previously planned catch-up path at an explicit
// achieved boundary.
type CatchUpCompleted struct {
// ReplicaID names the replica whose catch-up finished.
ReplicaID string
// ID is the volume identifier returned by VolumeID().
ID string
// AchievedLSN is the explicit boundary reached by the completed catch-up.
AchievedLSN uint64
}
@@ -114,8 +117,9 @@ func (e CatchUpCompleted) VolumeID() string { return e.ID }
// NeedsRebuildObserved is a fail-closed rebuild escalation.
type NeedsRebuildObserved struct {
ID string
Reason string
ReplicaID string
ID string
Reason string
}
func (e NeedsRebuildObserved) VolumeID() string { return e.ID }
@@ -123,6 +127,7 @@ func (e NeedsRebuildObserved) VolumeID() string { return e.ID }
// RebuildStarted moves the recovery truth from blocked-needs-rebuild into an
// explicit rebuilding path with a frozen target.
type RebuildStarted struct {
// ReplicaID names the replica being rebuilt; the engine passes it to
// recoveryCommandReplicaIDFromEvent when deciding on start_rebuild.
ReplicaID string
// ID is the volume identifier returned by VolumeID().
ID string
// TargetLSN is the frozen target of the rebuild.
TargetLSN uint64
}
@@ -131,6 +136,7 @@ func (e RebuildStarted) VolumeID() string { return e.ID }
// RebuildCommitted clears the rebuild condition with bounded durable truth.
type RebuildCommitted struct {
ReplicaID string
ID string
AchievedLSN uint64
FlushedLSN uint64

View File

@@ -145,7 +145,7 @@ func TestPhase14_CommandSequence_CatchUpStartIsBounded(t *testing.T) {
core.ApplyEvent(ShipperConfiguredObserved{ID: "vol-cmd-catchup"})
core.ApplyEvent(ShipperConnectedObserved{ID: "vol-cmd-catchup"})
result := core.ApplyEvent(CatchUpPlanned{ID: "vol-cmd-catchup", TargetLSN: 55})
result := core.ApplyEvent(CatchUpPlanned{ID: "vol-cmd-catchup", ReplicaID: "replica-1", TargetLSN: 55})
assertCommandNames(t, result.Commands, []string{
"start_catchup",
"publish_projection",
@@ -176,7 +176,7 @@ func TestPhase14_CommandSequence_RebuildStartIsBounded(t *testing.T) {
})
core.ApplyEvent(NeedsRebuildObserved{ID: "vol-cmd-rebuild", Reason: "gap_too_large"})
result := core.ApplyEvent(RebuildStarted{ID: "vol-cmd-rebuild", TargetLSN: 80})
result := core.ApplyEvent(RebuildStarted{ID: "vol-cmd-rebuild", ReplicaID: "replica-1", TargetLSN: 80})
assertCommandNames(t, result.Commands, []string{
"start_rebuild",
"publish_projection",

View File

@@ -7,12 +7,12 @@ import engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
// IO bindings and receives completion notifications.
type RecoveryCallbacks interface {
// OnCatchUpCompleted is called after successful catch-up execution.
OnCatchUpCompleted(volumeID string, achievedLSN uint64)
OnCatchUpCompleted(volumeID, replicaID string, achievedLSN uint64)
// OnRebuildCompleted is called after successful rebuild execution.
// The host should read the post-rebuild snapshot and emit the
// appropriate core event.
OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan)
OnRebuildCompleted(volumeID, replicaID string, plan *engine.RecoveryPlan)
}
// ExecuteCatchUpPlan runs a catch-up plan using the supplied IO binding
@@ -22,6 +22,7 @@ func ExecuteCatchUpPlan(
plan *engine.RecoveryPlan,
io engine.CatchUpIO,
volumeID string,
replicaID string,
callbacks RecoveryCallbacks,
) error {
exec := engine.NewCatchUpExecutor(driver, plan)
@@ -34,7 +35,7 @@ func ExecuteCatchUpPlan(
if achievedLSN == 0 {
achievedLSN = plan.CatchUpStartLSN
}
callbacks.OnCatchUpCompleted(volumeID, achievedLSN)
callbacks.OnCatchUpCompleted(volumeID, replicaID, achievedLSN)
}
return nil
}
@@ -46,6 +47,7 @@ func ExecuteRebuildPlan(
plan *engine.RecoveryPlan,
io engine.RebuildIO,
volumeID string,
replicaID string,
callbacks RecoveryCallbacks,
) error {
exec := engine.NewRebuildExecutor(driver, plan)
@@ -54,7 +56,7 @@ func ExecuteRebuildPlan(
return err
}
if callbacks != nil {
callbacks.OnRebuildCompleted(volumeID, plan)
callbacks.OnRebuildCompleted(volumeID, replicaID, plan)
}
return nil
}

View File

@@ -7,24 +7,28 @@ import (
)
type fakeCallbacks struct {
catchUpCalled bool
catchUpVol string
catchUpLSN uint64
catchUpCalled bool
catchUpVol string
catchUpReplica string
catchUpLSN uint64
rebuildCalled bool
rebuildVol string
rebuildPlan *engine.RecoveryPlan
rebuildCalled bool
rebuildVol string
rebuildReplica string
rebuildPlan *engine.RecoveryPlan
}
func (f *fakeCallbacks) OnCatchUpCompleted(volumeID string, achievedLSN uint64) {
func (f *fakeCallbacks) OnCatchUpCompleted(volumeID, replicaID string, achievedLSN uint64) {
f.catchUpCalled = true
f.catchUpVol = volumeID
f.catchUpReplica = replicaID
f.catchUpLSN = achievedLSN
}
func (f *fakeCallbacks) OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan) {
func (f *fakeCallbacks) OnRebuildCompleted(volumeID, replicaID string, plan *engine.RecoveryPlan) {
f.rebuildCalled = true
f.rebuildVol = volumeID
f.rebuildReplica = replicaID
f.rebuildPlan = plan
}
@@ -50,7 +54,7 @@ func TestExecuteCatchUpPlan_CallsbackOnSuccess(t *testing.T) {
t.Fatal(err)
}
err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", cb)
err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", "vol1/vs2", cb)
if err != nil {
t.Fatal(err)
}
@@ -60,6 +64,9 @@ func TestExecuteCatchUpPlan_CallsbackOnSuccess(t *testing.T) {
if cb.catchUpVol != "vol1" {
t.Fatalf("vol=%s", cb.catchUpVol)
}
if cb.catchUpReplica != "vol1/vs2" {
t.Fatalf("replica=%s", cb.catchUpReplica)
}
if cb.catchUpLSN != 100 {
t.Fatalf("achievedLSN=%d", cb.catchUpLSN)
}
@@ -74,7 +81,7 @@ func TestExecuteCatchUpPlan_AchievedLSNMatchesTarget(t *testing.T) {
}
// The plan's CatchUpTarget is derived from storage state.
// The callback should receive that same target as achievedLSN.
err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", cb)
err = ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", "vol1/vs2", cb)
if err != nil {
t.Fatal(err)
}
@@ -100,7 +107,7 @@ func TestExecuteRebuildPlan_CallsbackOnSuccess(t *testing.T) {
t.Fatal(err)
}
err = ExecuteRebuildPlan(driver, plan, &noopRebuildIO{}, "vol2", cb)
err = ExecuteRebuildPlan(driver, plan, &noopRebuildIO{}, "vol2", "vol2/vs2", cb)
if err != nil {
t.Fatal(err)
}
@@ -110,6 +117,9 @@ func TestExecuteRebuildPlan_CallsbackOnSuccess(t *testing.T) {
if cb.rebuildVol != "vol2" {
t.Fatalf("vol=%s", cb.rebuildVol)
}
if cb.rebuildReplica != "vol2/vs2" {
t.Fatalf("replica=%s", cb.rebuildReplica)
}
if cb.rebuildPlan == nil {
t.Fatal("rebuild plan not passed to callback")
}
@@ -122,7 +132,7 @@ func TestExecuteCatchUpPlan_NilCallbacksSafe(t *testing.T) {
t.Fatal(err)
}
// nil callbacks should not panic.
if err := ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", nil); err != nil {
if err := ExecuteCatchUpPlan(driver, plan, &noopCatchUpIO{}, "vol1", "vol1/vs2", nil); err != nil {
t.Fatal(err)
}
}
@@ -132,12 +142,12 @@ func TestExecuteCatchUpPlan_NilCallbacksSafe(t *testing.T) {
type noopCatchUpIO struct{}
func (noopCatchUpIO) StreamWALEntries(start, end uint64) (uint64, error) { return end, nil }
func (noopCatchUpIO) TruncateWAL(lsn uint64) error { return nil }
func (noopCatchUpIO) TruncateWAL(lsn uint64) error { return nil }
type noopRebuildIO struct{}
func (noopRebuildIO) StreamWALEntries(start, end uint64) (uint64, error) { return end, nil }
func (noopRebuildIO) TruncateWAL(lsn uint64) error { return nil }
func (noopRebuildIO) TruncateWAL(lsn uint64) error { return nil }
func (noopRebuildIO) TransferSnapshot(lsn uint64) error { return nil }
func (noopRebuildIO) TransferFullBase(lsn uint64) (uint64, error) { return lsn, nil }

View File

@@ -13,7 +13,7 @@ type RebuildCompletionStatus struct {
// DeriveRebuildCommitted computes the RebuildCommitted event from
// post-rebuild status and the original plan. This is the reusable
// shaping logic — the host only needs to supply the raw snapshot values.
func DeriveRebuildCommitted(volumeID string, status RebuildCompletionStatus, plan *engine.RecoveryPlan) engine.RebuildCommitted {
func DeriveRebuildCommitted(volumeID, replicaID string, status RebuildCompletionStatus, plan *engine.RecoveryPlan) engine.RebuildCommitted {
flushedLSN := status.CommittedLSN
if flushedLSN == 0 {
flushedLSN = plan.RebuildTargetLSN
@@ -27,6 +27,7 @@ func DeriveRebuildCommitted(volumeID string, status RebuildCompletionStatus, pla
achievedLSN = checkpointLSN
}
return engine.RebuildCommitted{
ReplicaID: replicaID,
ID: volumeID,
AchievedLSN: achievedLSN,
FlushedLSN: flushedLSN,

View File

@@ -311,7 +311,7 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID string, ass
Plan: plan,
CatchUpIO: rctx.executor,
})
bs.applyCoreEvent(engine.CatchUpPlanned{ID: rctx.volPath, TargetLSN: plan.CatchUpTarget})
bs.applyCoreEvent(engine.CatchUpPlanned{ID: rctx.volPath, ReplicaID: replicaID, TargetLSN: plan.CatchUpTarget})
if rm.coord.Has(replicaID) {
rm.coord.Cancel(replicaID, "start_catchup_not_emitted")
return
@@ -321,7 +321,7 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID string, ass
if plan.Proof != nil && plan.Proof.Reason != "" {
reason = plan.Proof.Reason
}
bs.applyCoreEvent(engine.NeedsRebuildObserved{ID: rctx.volPath, Reason: reason})
bs.applyCoreEvent(engine.NeedsRebuildObserved{ID: rctx.volPath, ReplicaID: replicaID, Reason: reason})
return
}
@@ -365,7 +365,7 @@ func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID string, ass
if rm.OnPendingExecution != nil {
rm.OnPendingExecution(rctx.volPath, pe)
}
bs.applyCoreEvent(engine.RebuildStarted{ID: rctx.volPath, TargetLSN: plan.RebuildTargetLSN})
bs.applyCoreEvent(engine.RebuildStarted{ID: rctx.volPath, ReplicaID: replicaID, TargetLSN: plan.RebuildTargetLSN})
if rm.coord.Has(replicaID) {
rm.coord.Cancel(replicaID, "start_rebuild_not_emitted")
}
@@ -378,7 +378,7 @@ func (rm *RecoveryManager) ExecutePendingCatchUp(replicaID string, targetLSN uin
if pe == nil || pe.Driver == nil || pe.Plan == nil {
return nil
}
return rt.ExecuteCatchUpPlan(pe.Driver, pe.Plan, pe.CatchUpIO, pe.VolumeID, rm)
return rt.ExecuteCatchUpPlan(pe.Driver, pe.Plan, pe.CatchUpIO, pe.VolumeID, pe.ReplicaID, rm)
}
func (rm *RecoveryManager) ExecutePendingRebuild(replicaID string, targetLSN uint64) error {
@@ -386,25 +386,25 @@ func (rm *RecoveryManager) ExecutePendingRebuild(replicaID string, targetLSN uin
if pe == nil || pe.Driver == nil || pe.Plan == nil {
return nil
}
return rt.ExecuteRebuildPlan(pe.Driver, pe.Plan, pe.RebuildIO, pe.VolumeID, rm)
return rt.ExecuteRebuildPlan(pe.Driver, pe.Plan, pe.RebuildIO, pe.VolumeID, pe.ReplicaID, rm)
}
// RecoveryCallbacks implementation — host-side completion notifications.
func (rm *RecoveryManager) OnCatchUpCompleted(volumeID string, achievedLSN uint64) {
glog.V(0).Infof("recovery: catch-up completed for %s (achievedLSN=%d)", volumeID, achievedLSN)
func (rm *RecoveryManager) OnCatchUpCompleted(volumeID, replicaID string, achievedLSN uint64) {
glog.V(0).Infof("recovery: catch-up completed for %s via %s (achievedLSN=%d)", volumeID, replicaID, achievedLSN)
if rm.bs != nil && rm.bs.v2Core != nil {
rm.bs.applyCoreEvent(engine.CatchUpCompleted{ID: volumeID, AchievedLSN: achievedLSN})
rm.bs.applyCoreEvent(engine.CatchUpCompleted{ID: volumeID, ReplicaID: replicaID, AchievedLSN: achievedLSN})
}
}
func (rm *RecoveryManager) OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan) {
glog.V(0).Infof("recovery: rebuild completed for %s", volumeID)
func (rm *RecoveryManager) OnRebuildCompleted(volumeID, replicaID string, plan *engine.RecoveryPlan) {
glog.V(0).Infof("recovery: rebuild completed for %s via %s", volumeID, replicaID)
if rm.bs == nil || rm.bs.v2Core == nil {
return
}
status := rm.readRebuildStatus(volumeID)
ev := rt.DeriveRebuildCommitted(volumeID, status, plan)
ev := rt.DeriveRebuildCommitted(volumeID, replicaID, status, plan)
rm.bs.applyCoreEvent(ev)
}
@@ -430,7 +430,7 @@ func (rm *RecoveryManager) readRebuildStatus(volumeID string) rt.RebuildCompleti
// Core-present paths use ExecutePendingCatchUp/ExecutePendingRebuild instead.
func (rm *RecoveryManager) executeLegacyCatchUp(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.CatchUpIO) {
err := rt.ExecuteCatchUpPlan(driver, plan, io, volumeID, rm)
err := rt.ExecuteCatchUpPlan(driver, plan, io, volumeID, replicaID, rm)
if err != nil {
if ctx.Err() != nil {
glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, err)
@@ -441,7 +441,7 @@ func (rm *RecoveryManager) executeLegacyCatchUp(ctx context.Context, volumeID, r
}
func (rm *RecoveryManager) executeLegacyRebuild(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.RebuildIO) {
err := rt.ExecuteRebuildPlan(driver, plan, io, volumeID, rm)
err := rt.ExecuteRebuildPlan(driver, plan, io, volumeID, replicaID, rm)
if err != nil {
if ctx.Err() != nil {
glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, err)