diff --git a/weed/server/block_recovery.go b/weed/server/block_recovery.go index 39115287d..6672e241f 100644 --- a/weed/server/block_recovery.go +++ b/weed/server/block_recovery.go @@ -131,8 +131,6 @@ func (rm *RecoveryManager) startTask(replicaID string, assignments []blockvol.Bl rm.mu.Lock() defer rm.mu.Unlock() - rebuildAddr := rm.deriveRebuildAddr(replicaID, assignments) - ctx, cancel := context.WithCancel(context.Background()) task := &recoveryTask{ replicaID: replicaID, @@ -142,7 +140,7 @@ func (rm *RecoveryManager) startTask(replicaID string, assignments []blockvol.Bl rm.tasks[replicaID] = task rm.wg.Add(1) - go rm.runRecovery(ctx, task, rebuildAddr) + go rm.runRecovery(ctx, task, assignments) } // Shutdown cancels all active recovery tasks and waits for drain. @@ -187,7 +185,7 @@ func (rm *RecoveryManager) DiagnosticSnapshot() RecoveryDiagnostic { } // runRecovery is the recovery goroutine for one replica target. -func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, rebuildAddr string) { +func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, assignments []blockvol.BlockVolumeAssignment) { defer rm.wg.Done() defer close(task.done) // signal drain completion defer func() { @@ -218,8 +216,7 @@ func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, return } - glog.V(0).Infof("recovery: starting %s session for %s (rebuildAddr=%s)", - sessSnap.Kind, replicaID, rebuildAddr) + glog.V(0).Infof("recovery: starting %s session for %s", sessSnap.Kind, replicaID) if rm.OnBeforeExecute != nil { rm.OnBeforeExecute(replicaID) @@ -227,32 +224,35 @@ func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, switch sessSnap.Kind { case engine.SessionCatchUp: - rm.runCatchUp(ctx, replicaID, rebuildAddr) + rm.runCatchUp(ctx, replicaID, assignments) case engine.SessionRebuild: - rm.runRebuild(ctx, replicaID, rebuildAddr) + rm.runRebuild(ctx, replicaID, assignments) default: glog.V(1).Infof("recovery: unknown session kind %s for %s", sessSnap.Kind, replicaID) } } -// recoveryContext holds the resolved context for one recovery execution. -// Built by resolveRecoveryContext from replicaID + rebuildAddr. +// recoveryContext holds the fully resolved context for one recovery execution. +// Built by resolveRecoveryContext from replicaID + assignments. type recoveryContext struct { volPath string + rebuildAddr string driver *engine.RecoveryDriver executor *v2bridge.Executor replicaFlushedLSN uint64 // catch-up start point (0 if no session) } -// resolveRecoveryContext resolves volume path, builds recovery bindings, -// and looks up the replica's flushed progress. This is the shared host-side -// context resolution for both catch-up and rebuild paths. -func (rm *RecoveryManager) resolveRecoveryContext(replicaID, rebuildAddr string) (*recoveryContext, error) { +// resolveRecoveryContext resolves everything needed for recovery execution: +// volume path, rebuild address, recovery bindings, and replica flushed progress. +// This is the single host-side context resolution for both catch-up and rebuild. +func (rm *RecoveryManager) resolveRecoveryContext(replicaID string, assignments []blockvol.BlockVolumeAssignment) (*recoveryContext, error) { volPath := rm.volumePathForReplica(replicaID) if volPath == "" { return nil, fmt.Errorf("cannot determine volume path for %s", replicaID) } + rebuildAddr := rm.deriveRebuildAddr(replicaID, assignments) + var bundle *v2bridge.RecoveryBundle if err := rm.bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error { bundle = v2bridge.BuildRecoveryBundle(vol, rebuildAddr) @@ -272,16 +272,17 @@ func (rm *RecoveryManager) resolveRecoveryContext(replicaID, rebuildAddr string) return &recoveryContext{ volPath: volPath, + rebuildAddr: rebuildAddr, driver: driver, executor: bundle.Executor, replicaFlushedLSN: replicaFlushedLSN, }, nil } -func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAddr string) { +func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID string, assignments []blockvol.BlockVolumeAssignment) { bs := rm.bs - rctx, err := rm.resolveRecoveryContext(replicaID, rebuildAddr) + rctx, err := rm.resolveRecoveryContext(replicaID, assignments) if err != nil { glog.Warningf("recovery: %v", err) return @@ -330,10 +331,10 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAdd } } -func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID, rebuildAddr string) { +func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID string, assignments []blockvol.BlockVolumeAssignment) { bs := rm.bs - rctx, err := rm.resolveRecoveryContext(replicaID, rebuildAddr) + rctx, err := rm.resolveRecoveryContext(replicaID, assignments) if err != nil { glog.Warningf("recovery: %v", err) return diff --git a/weed/server/block_recovery_test.go b/weed/server/block_recovery_test.go index a9fc844cb..76e7bed14 100644 --- a/weed/server/block_recovery_test.go +++ b/weed/server/block_recovery_test.go @@ -220,7 +220,7 @@ func TestP16B_RunCatchUp_UpdatesCoreProjectionFromLiveRecovery(t *testing.T) { rm := NewRecoveryManager(bs) bs.v2Recovery = rm - rm.runCatchUp(context.Background(), replicaID, "") + rm.runCatchUp(context.Background(), replicaID, nil) proj, ok := bs.CoreProjection(volPath) if !ok { @@ -276,7 +276,7 @@ func TestP16B_RunCatchUp_EscalatesNeedsRebuildIntoCoreProjection(t *testing.T) { rm := NewRecoveryManager(bs) bs.v2Recovery = rm - rm.runCatchUp(context.Background(), replicaID, "") + rm.runCatchUp(context.Background(), replicaID, nil) proj, ok := bs.CoreProjection(volPath) if !ok { @@ -350,7 +350,7 @@ func TestP16B_RunRebuild_UsesCoreStartRebuildCommandOnLivePath(t *testing.T) { } _, _, rebuildPort := bs.ReplicationPorts(volPath) rebuildAddr := fmt.Sprintf("127.0.0.1:%d", rebuildPort) - rm.runRebuild(context.Background(), replicaID, rebuildAddr) + rm.runRebuild(context.Background(), replicaID, []blockvol.BlockVolumeAssignment{{Path: volPath, RebuildAddr: rebuildAddr}}) proj, ok := bs.CoreProjection(volPath) if !ok { @@ -416,7 +416,7 @@ func TestP16B_RunRebuild_FailClosedWithoutFreshStartRebuildCommand(t *testing.T) bs.v2Recovery = rm _, _, rebuildPort := bs.ReplicationPorts(volPath) rebuildAddr := fmt.Sprintf("127.0.0.1:%d", rebuildPort) - rm.runRebuild(context.Background(), replicaID, rebuildAddr) + rm.runRebuild(context.Background(), replicaID, []blockvol.BlockVolumeAssignment{{Path: volPath, RebuildAddr: rebuildAddr}}) after := bs.ExecutedCoreCommands(volPath) if !reflect.DeepEqual(after, before) {