diff --git a/weed/server/block_rebuild_remote.go b/weed/server/block_rebuild_remote.go index db5e80541..5792634eb 100644 --- a/weed/server/block_rebuild_remote.go +++ b/weed/server/block_rebuild_remote.go @@ -144,10 +144,15 @@ func (r *RemoteRebuildIO) TransferFullBase(committedLSN uint64) (uint64, error) return achieved, nil case blockvol.SessionAckFailed: - // Forward the ack for observation cleanup (pins, watchdog, engine - // SessionFailed). Then return the sentinel error so ExecutePendingRebuild - // knows NOT to emit a second SessionFailed. r.transitionOnFailure() + if ackErr != nil { + // Observation rejected the ack (stale session, etc.) — don't use + // sentinel. Return a regular error so ExecutePendingRebuild emits + // the fallback SessionFailed since observation didn't handle it. + return 0, fmt.Errorf("remote rebuild: session %d failed (observation rejected: %w)", r.SessionID, ackErr) + } + // Observation accepted the ack and already emitted SessionFailed. + // Use sentinel so ExecutePendingRebuild doesn't double-emit. return 0, fmt.Errorf("remote rebuild: session %d: %w", r.SessionID, errRebuildAckFailed) } } diff --git a/weed/server/block_recovery.go b/weed/server/block_recovery.go index 349342200..af30c865c 100644 --- a/weed/server/block_recovery.go +++ b/weed/server/block_recovery.go @@ -582,6 +582,8 @@ func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID string, ass Plan: plan, RebuildIO: rebuildIO, } + glog.V(0).Infof("recovery: storing pending rebuild replicaID=%s targetLSN=%d IO=%T", + replicaID, plan.RebuildTargetLSN, rebuildIO) rm.coord.Store(replicaID, pe) if rm.OnPendingExecution != nil { rm.OnPendingExecution(rctx.volPath, pe) @@ -605,8 +607,12 @@ func (rm *RecoveryManager) ExecutePendingCatchUp(replicaID string, targetLSN uin func (rm *RecoveryManager) ExecutePendingRebuild(replicaID string, targetLSN uint64) error { pe := rm.coord.TakeRebuild(replicaID, targetLSN) if pe == nil || pe.Driver == nil || pe.Plan == nil { + glog.V(0).Infof("recovery: ExecutePendingRebuild(%s, target=%d) — TakeRebuild returned nil (replicaID or targetLSN mismatch)", + replicaID, targetLSN) return nil } + glog.V(0).Infof("recovery: ExecutePendingRebuild(%s, target=%d) — executing with IO=%T", + replicaID, targetLSN, pe.RebuildIO) err := rt.ExecuteRebuildPlan(pe.Driver, pe.Plan, pe.RebuildIO, pe.VolumeID, pe.ReplicaID, rm) if err != nil { glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, err) @@ -664,23 +670,21 @@ func (rm *RecoveryManager) OnRebuildCompleted(volumeID, replicaID string, plan * if rm.bs == nil || rm.bs.v2Core == nil { return } - // For remote rebuilds, use the replica's achieved LSN (stored by onAck - // callback on SessionAckCompleted) instead of reading the primary's local - // vol — the primary's vol is the source, not the rebuilt destination. + // For remote rebuilds, the ack observation (ObserveReplicaRebuildSessionAck) + // already emitted SessionCompleted on SessionAckCompleted. Emitting + // RebuildCommitted here would cause a double completion in the engine. + // Consume the remote marker and skip. rm.mu.Lock() - remoteAchieved, isRemote := rm.remoteRebuildAchieved[replicaID] - delete(rm.remoteRebuildAchieved, replicaID) // consumed + _, isRemote := rm.remoteRebuildAchieved[replicaID] + delete(rm.remoteRebuildAchieved, replicaID) rm.mu.Unlock() - - var status rt.RebuildCompletionStatus if isRemote { - // Use replica's completion proof. CommittedLSN = CheckpointLSN = achievedLSN. - status.CommittedLSN = remoteAchieved - status.CheckpointLSN = remoteAchieved - } else { - // Legacy/local path: read from primary vol (backwards compat). - status = rm.readRebuildStatus(volumeID) + glog.V(0).Infof("recovery: remote rebuild %s — skipping RebuildCommitted (ack observation already emitted SessionCompleted)", replicaID) + return } + + // Legacy/local path: read from primary vol and emit RebuildCommitted. + status := rm.readRebuildStatus(volumeID) ev := rt.DeriveRebuildCommitted(volumeID, replicaID, status, plan) rm.bs.applyCoreEvent(ev) }