fix: Batch 6 completion — rebuildAddr folded into resolveRecoveryContext

resolveRecoveryContext now also derives rebuildAddr from assignments,
so the full host-side recovery context is resolved in one call:
- volPath (from replicaID)
- rebuildAddr (from assignments via deriveRebuildAddr)
- recovery bindings (driver + executor via BuildRecoveryBundle)
- replicaFlushedLSN (from sender session)

startTask/runRecovery/runCatchUp/runRebuild now pass assignments
instead of rebuildAddr. No separate rebuildAddr resolution remains
outside the resolver.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
pingqiu
2026-04-04 01:52:35 -07:00
parent a48da0f674
commit 41082bf92c
2 changed files with 23 additions and 22 deletions

View File

@@ -131,8 +131,6 @@ func (rm *RecoveryManager) startTask(replicaID string, assignments []blockvol.Bl
rm.mu.Lock()
defer rm.mu.Unlock()
rebuildAddr := rm.deriveRebuildAddr(replicaID, assignments)
ctx, cancel := context.WithCancel(context.Background())
task := &recoveryTask{
replicaID: replicaID,
@@ -142,7 +140,7 @@ func (rm *RecoveryManager) startTask(replicaID string, assignments []blockvol.Bl
rm.tasks[replicaID] = task
rm.wg.Add(1)
go rm.runRecovery(ctx, task, rebuildAddr)
go rm.runRecovery(ctx, task, assignments)
}
// Shutdown cancels all active recovery tasks and waits for drain.
@@ -187,7 +185,7 @@ func (rm *RecoveryManager) DiagnosticSnapshot() RecoveryDiagnostic {
}
// runRecovery is the recovery goroutine for one replica target.
func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, rebuildAddr string) {
func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, assignments []blockvol.BlockVolumeAssignment) {
defer rm.wg.Done()
defer close(task.done) // signal drain completion
defer func() {
@@ -218,8 +216,7 @@ func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask,
return
}
glog.V(0).Infof("recovery: starting %s session for %s (rebuildAddr=%s)",
sessSnap.Kind, replicaID, rebuildAddr)
glog.V(0).Infof("recovery: starting %s session for %s", sessSnap.Kind, replicaID)
if rm.OnBeforeExecute != nil {
rm.OnBeforeExecute(replicaID)
@@ -227,32 +224,35 @@ func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask,
switch sessSnap.Kind {
case engine.SessionCatchUp:
rm.runCatchUp(ctx, replicaID, rebuildAddr)
rm.runCatchUp(ctx, replicaID, assignments)
case engine.SessionRebuild:
rm.runRebuild(ctx, replicaID, rebuildAddr)
rm.runRebuild(ctx, replicaID, assignments)
default:
glog.V(1).Infof("recovery: unknown session kind %s for %s", sessSnap.Kind, replicaID)
}
}
// recoveryContext holds the resolved context for one recovery execution.
// Built by resolveRecoveryContext from replicaID + rebuildAddr.
// recoveryContext holds the fully resolved context for one recovery execution.
// Built by resolveRecoveryContext from replicaID + assignments.
type recoveryContext struct {
volPath string
rebuildAddr string
driver *engine.RecoveryDriver
executor *v2bridge.Executor
replicaFlushedLSN uint64 // catch-up start point (0 if no session)
}
// resolveRecoveryContext resolves volume path, builds recovery bindings,
// and looks up the replica's flushed progress. This is the shared host-side
// context resolution for both catch-up and rebuild paths.
func (rm *RecoveryManager) resolveRecoveryContext(replicaID, rebuildAddr string) (*recoveryContext, error) {
// resolveRecoveryContext resolves everything needed for recovery execution:
// volume path, rebuild address, recovery bindings, and replica flushed progress.
// This is the single host-side context resolution for both catch-up and rebuild.
func (rm *RecoveryManager) resolveRecoveryContext(replicaID string, assignments []blockvol.BlockVolumeAssignment) (*recoveryContext, error) {
volPath := rm.volumePathForReplica(replicaID)
if volPath == "" {
return nil, fmt.Errorf("cannot determine volume path for %s", replicaID)
}
rebuildAddr := rm.deriveRebuildAddr(replicaID, assignments)
var bundle *v2bridge.RecoveryBundle
if err := rm.bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error {
bundle = v2bridge.BuildRecoveryBundle(vol, rebuildAddr)
@@ -272,16 +272,17 @@ func (rm *RecoveryManager) resolveRecoveryContext(replicaID, rebuildAddr string)
return &recoveryContext{
volPath: volPath,
rebuildAddr: rebuildAddr,
driver: driver,
executor: bundle.Executor,
replicaFlushedLSN: replicaFlushedLSN,
}, nil
}
func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAddr string) {
func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID string, assignments []blockvol.BlockVolumeAssignment) {
bs := rm.bs
rctx, err := rm.resolveRecoveryContext(replicaID, rebuildAddr)
rctx, err := rm.resolveRecoveryContext(replicaID, assignments)
if err != nil {
glog.Warningf("recovery: %v", err)
return
@@ -330,10 +331,10 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAdd
}
}
func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID, rebuildAddr string) {
func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID string, assignments []blockvol.BlockVolumeAssignment) {
bs := rm.bs
rctx, err := rm.resolveRecoveryContext(replicaID, rebuildAddr)
rctx, err := rm.resolveRecoveryContext(replicaID, assignments)
if err != nil {
glog.Warningf("recovery: %v", err)
return

View File

@@ -220,7 +220,7 @@ func TestP16B_RunCatchUp_UpdatesCoreProjectionFromLiveRecovery(t *testing.T) {
rm := NewRecoveryManager(bs)
bs.v2Recovery = rm
rm.runCatchUp(context.Background(), replicaID, "")
rm.runCatchUp(context.Background(), replicaID, nil)
proj, ok := bs.CoreProjection(volPath)
if !ok {
@@ -276,7 +276,7 @@ func TestP16B_RunCatchUp_EscalatesNeedsRebuildIntoCoreProjection(t *testing.T) {
rm := NewRecoveryManager(bs)
bs.v2Recovery = rm
rm.runCatchUp(context.Background(), replicaID, "")
rm.runCatchUp(context.Background(), replicaID, nil)
proj, ok := bs.CoreProjection(volPath)
if !ok {
@@ -350,7 +350,7 @@ func TestP16B_RunRebuild_UsesCoreStartRebuildCommandOnLivePath(t *testing.T) {
}
_, _, rebuildPort := bs.ReplicationPorts(volPath)
rebuildAddr := fmt.Sprintf("127.0.0.1:%d", rebuildPort)
rm.runRebuild(context.Background(), replicaID, rebuildAddr)
rm.runRebuild(context.Background(), replicaID, []blockvol.BlockVolumeAssignment{{Path: volPath, RebuildAddr: rebuildAddr}})
proj, ok := bs.CoreProjection(volPath)
if !ok {
@@ -416,7 +416,7 @@ func TestP16B_RunRebuild_FailClosedWithoutFreshStartRebuildCommand(t *testing.T)
bs.v2Recovery = rm
_, _, rebuildPort := bs.ReplicationPorts(volPath)
rebuildAddr := fmt.Sprintf("127.0.0.1:%d", rebuildPort)
rm.runRebuild(context.Background(), replicaID, rebuildAddr)
rm.runRebuild(context.Background(), replicaID, []blockvol.BlockVolumeAssignment{{Path: volPath, RebuildAddr: rebuildAddr}})
after := bs.ExecutedCoreCommands(volPath)
if !reflect.DeepEqual(after, before) {