From bc767eb9d2d13fe190bed71f03a67781d6c11e36 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Thu, 9 Apr 2026 15:25:26 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20rebuild=20correctness=20=E2=80=94=20sing?= =?UTF-8?q?le=20completion,=20fail-closed=20acks,=20diagnostic=20logging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three correctness fixes for the remote rebuild path: 1. No double completion: for remote rebuilds, OnRebuildCompleted skips RebuildCommitted since ObserveReplicaRebuildSessionAck already emitted SessionCompleted on the accepted ack. One rebuild = one completion event. 2. SessionAckFailed with rejected observation: if OnAck rejects the failed ack (stale session), don't use the sentinel errRebuildAckFailed. Return a regular error so ExecutePendingRebuild emits the fallback SessionFailed. No path leaves the engine session hanging. 3. Diagnostic logging in ExecutePendingRebuild: log the replicaID and targetLSN on both nil-return (TakeRebuild mismatch) and successful take paths. Also log the pending store in runRebuild with replicaID, targetLSN, and IO type. This makes the TakeRebuild seam diagnosable on hardware without rebuilding the engine package. Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/block_rebuild_remote.go | 11 ++++++++--- weed/server/block_recovery.go | 30 ++++++++++++++++------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/weed/server/block_rebuild_remote.go b/weed/server/block_rebuild_remote.go index db5e80541..5792634eb 100644 --- a/weed/server/block_rebuild_remote.go +++ b/weed/server/block_rebuild_remote.go @@ -144,10 +144,15 @@ func (r *RemoteRebuildIO) TransferFullBase(committedLSN uint64) (uint64, error) return achieved, nil case blockvol.SessionAckFailed: - // Forward the ack for observation cleanup (pins, watchdog, engine - // SessionFailed). Then return the sentinel error so ExecutePendingRebuild - // knows NOT to emit a second SessionFailed. r.transitionOnFailure() + if ackErr != nil { + // Observation rejected the ack (stale session, etc.) — don't use + // sentinel. Return a regular error so ExecutePendingRebuild emits + // the fallback SessionFailed since observation didn't handle it. + return 0, fmt.Errorf("remote rebuild: session %d failed (observation rejected: %w)", r.SessionID, ackErr) + } + // Observation accepted the ack and already emitted SessionFailed. + // Use sentinel so ExecutePendingRebuild doesn't double-emit. return 0, fmt.Errorf("remote rebuild: session %d: %w", r.SessionID, errRebuildAckFailed) } } diff --git a/weed/server/block_recovery.go b/weed/server/block_recovery.go index 349342200..af30c865c 100644 --- a/weed/server/block_recovery.go +++ b/weed/server/block_recovery.go @@ -582,6 +582,8 @@ func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID string, ass Plan: plan, RebuildIO: rebuildIO, } + glog.V(0).Infof("recovery: storing pending rebuild replicaID=%s targetLSN=%d IO=%T", + replicaID, plan.RebuildTargetLSN, rebuildIO) rm.coord.Store(replicaID, pe) if rm.OnPendingExecution != nil { rm.OnPendingExecution(rctx.volPath, pe) @@ -605,8 +607,12 @@ func (rm *RecoveryManager) ExecutePendingCatchUp(replicaID string, targetLSN uin func (rm *RecoveryManager) ExecutePendingRebuild(replicaID string, targetLSN uint64) error { pe := rm.coord.TakeRebuild(replicaID, targetLSN) if pe == nil || pe.Driver == nil || pe.Plan == nil { + glog.V(0).Infof("recovery: ExecutePendingRebuild(%s, target=%d) — TakeRebuild returned nil (replicaID or targetLSN mismatch)", + replicaID, targetLSN) return nil } + glog.V(0).Infof("recovery: ExecutePendingRebuild(%s, target=%d) — executing with IO=%T", + replicaID, targetLSN, pe.RebuildIO) err := rt.ExecuteRebuildPlan(pe.Driver, pe.Plan, pe.RebuildIO, pe.VolumeID, pe.ReplicaID, rm) if err != nil { glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, err) @@ -664,23 +670,21 @@ func (rm *RecoveryManager) OnRebuildCompleted(volumeID, replicaID string, plan * if rm.bs == nil || rm.bs.v2Core == nil { return } - // For remote rebuilds, use the replica's achieved LSN (stored by onAck - // callback on SessionAckCompleted) instead of reading the primary's local - // vol — the primary's vol is the source, not the rebuilt destination. + // For remote rebuilds, the ack observation (ObserveReplicaRebuildSessionAck) + // already emitted SessionCompleted on SessionAckCompleted. Emitting + // RebuildCommitted here would cause a double completion in the engine. + // Consume the remote marker and skip. rm.mu.Lock() - remoteAchieved, isRemote := rm.remoteRebuildAchieved[replicaID] - delete(rm.remoteRebuildAchieved, replicaID) // consumed + _, isRemote := rm.remoteRebuildAchieved[replicaID] + delete(rm.remoteRebuildAchieved, replicaID) rm.mu.Unlock() - - var status rt.RebuildCompletionStatus if isRemote { - // Use replica's completion proof. CommittedLSN = CheckpointLSN = achievedLSN. - status.CommittedLSN = remoteAchieved - status.CheckpointLSN = remoteAchieved - } else { - // Legacy/local path: read from primary vol (backwards compat). - status = rm.readRebuildStatus(volumeID) + glog.V(0).Infof("recovery: remote rebuild %s — skipping RebuildCommitted (ack observation already emitted SessionCompleted)", replicaID) + return } + + // Legacy/local path: read from primary vol and emit RebuildCommitted. + status := rm.readRebuildStatus(volumeID) ev := rt.DeriveRebuildCommitted(volumeID, replicaID, status, plan) rm.bs.applyCoreEvent(ev) }