fix: rebuild correctness — single completion, fail-closed acks, diagnostic logging

Three correctness fixes for the remote rebuild path:

1. No double completion: for remote rebuilds, OnRebuildCompleted skips
   RebuildCommitted since ObserveReplicaRebuildSessionAck already emitted
   SessionCompleted on the accepted ack. One rebuild = one completion event.

2. SessionAckFailed with rejected observation: if OnAck rejects the failed
   ack (stale session), don't use the sentinel errRebuildAckFailed. Return
   a regular error so ExecutePendingRebuild emits the fallback SessionFailed.
   No path leaves the engine session hanging.

3. Diagnostic logging in ExecutePendingRebuild: log the replicaID and
   targetLSN on both nil-return (TakeRebuild mismatch) and successful take
   paths. Also log the pending store in runRebuild with replicaID, targetLSN,
   and IO type. This makes the TakeRebuild seam diagnosable on hardware
   without rebuilding the engine package.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
pingqiu
2026-04-09 15:25:26 -07:00
parent df69c83f41
commit bc767eb9d2
2 changed files with 25 additions and 16 deletions

View File

@@ -144,10 +144,15 @@ func (r *RemoteRebuildIO) TransferFullBase(committedLSN uint64) (uint64, error)
return achieved, nil
case blockvol.SessionAckFailed:
// Forward the ack for observation cleanup (pins, watchdog, engine
// SessionFailed). Then return the sentinel error so ExecutePendingRebuild
// knows NOT to emit a second SessionFailed.
r.transitionOnFailure()
if ackErr != nil {
// Observation rejected the ack (stale session, etc.) — don't use
// sentinel. Return a regular error so ExecutePendingRebuild emits
// the fallback SessionFailed since observation didn't handle it.
return 0, fmt.Errorf("remote rebuild: session %d failed (observation rejected: %w)", r.SessionID, ackErr)
}
// Observation accepted the ack and already emitted SessionFailed.
// Use sentinel so ExecutePendingRebuild doesn't double-emit.
return 0, fmt.Errorf("remote rebuild: session %d: %w", r.SessionID, errRebuildAckFailed)
}
}

View File

@@ -582,6 +582,8 @@ func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID string, ass
Plan: plan,
RebuildIO: rebuildIO,
}
glog.V(0).Infof("recovery: storing pending rebuild replicaID=%s targetLSN=%d IO=%T",
replicaID, plan.RebuildTargetLSN, rebuildIO)
rm.coord.Store(replicaID, pe)
if rm.OnPendingExecution != nil {
rm.OnPendingExecution(rctx.volPath, pe)
@@ -605,8 +607,12 @@ func (rm *RecoveryManager) ExecutePendingCatchUp(replicaID string, targetLSN uin
func (rm *RecoveryManager) ExecutePendingRebuild(replicaID string, targetLSN uint64) error {
pe := rm.coord.TakeRebuild(replicaID, targetLSN)
if pe == nil || pe.Driver == nil || pe.Plan == nil {
glog.V(0).Infof("recovery: ExecutePendingRebuild(%s, target=%d) — TakeRebuild returned nil (replicaID or targetLSN mismatch)",
replicaID, targetLSN)
return nil
}
glog.V(0).Infof("recovery: ExecutePendingRebuild(%s, target=%d) — executing with IO=%T",
replicaID, targetLSN, pe.RebuildIO)
err := rt.ExecuteRebuildPlan(pe.Driver, pe.Plan, pe.RebuildIO, pe.VolumeID, pe.ReplicaID, rm)
if err != nil {
glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, err)
@@ -664,23 +670,21 @@ func (rm *RecoveryManager) OnRebuildCompleted(volumeID, replicaID string, plan *
if rm.bs == nil || rm.bs.v2Core == nil {
return
}
// For remote rebuilds, use the replica's achieved LSN (stored by onAck
// callback on SessionAckCompleted) instead of reading the primary's local
// vol — the primary's vol is the source, not the rebuilt destination.
// For remote rebuilds, the ack observation (ObserveReplicaRebuildSessionAck)
// already emitted SessionCompleted on SessionAckCompleted. Emitting
// RebuildCommitted here would cause a double completion in the engine.
// Consume the remote marker and skip.
rm.mu.Lock()
remoteAchieved, isRemote := rm.remoteRebuildAchieved[replicaID]
delete(rm.remoteRebuildAchieved, replicaID) // consumed
_, isRemote := rm.remoteRebuildAchieved[replicaID]
delete(rm.remoteRebuildAchieved, replicaID)
rm.mu.Unlock()
var status rt.RebuildCompletionStatus
if isRemote {
// Use replica's completion proof. CommittedLSN = CheckpointLSN = achievedLSN.
status.CommittedLSN = remoteAchieved
status.CheckpointLSN = remoteAchieved
} else {
// Legacy/local path: read from primary vol (backwards compat).
status = rm.readRebuildStatus(volumeID)
glog.V(0).Infof("recovery: remote rebuild %s — skipping RebuildCommitted (ack observation already emitted SessionCompleted)", replicaID)
return
}
// Legacy/local path: read from primary vol and emit RebuildCommitted.
status := rm.readRebuildStatus(volumeID)
ev := rt.DeriveRebuildCommitted(volumeID, replicaID, status, plan)
rm.bs.applyCoreEvent(ev)
}