diff --git a/sw-block/engine/replication/runtime/pending.go b/sw-block/engine/replication/runtime/pending.go index 000545043..fa19f671b 100644 --- a/sw-block/engine/replication/runtime/pending.go +++ b/sw-block/engine/replication/runtime/pending.go @@ -137,3 +137,20 @@ func (pc *PendingCoordinator) Peek(volumeID string) *PendingExecution { defer pc.mu.Unlock() return pc.pending[volumeID] } + +// CancelAll cancels and removes all pending executions. +func (pc *PendingCoordinator) CancelAll(reason string) { + pc.mu.Lock() + all := make(map[string]*PendingExecution, len(pc.pending)) + for k, v := range pc.pending { + all[k] = v + } + pc.pending = make(map[string]*PendingExecution) + pc.mu.Unlock() + + if pc.cancelFn != nil { + for _, pe := range all { + pc.cancelFn(pe, reason) + } + } +} diff --git a/weed/server/block_recovery.go b/weed/server/block_recovery.go index 6c9fe36e0..4cbab4d60 100644 --- a/weed/server/block_recovery.go +++ b/weed/server/block_recovery.go @@ -6,6 +6,7 @@ import ( bridge "github.com/seaweedfs/seaweedfs/sw-block/bridge/blockvol" engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" + rt "github.com/seaweedfs/seaweedfs/sw-block/engine/replication/runtime" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge" @@ -31,10 +32,10 @@ type recoveryTask struct { type RecoveryManager struct { bs *BlockService - mu sync.Mutex - tasks map[string]*recoveryTask - pending map[string]*pendingRecoveryExecution - wg sync.WaitGroup + mu sync.Mutex + tasks map[string]*recoveryTask + coord *rt.PendingCoordinator + wg sync.WaitGroup // TestHook: if set, called before execution starts. Tests use this // to hold the goroutine alive for serialized-replacement proofs. @@ -42,24 +43,24 @@ type RecoveryManager struct { // TestHook: if set, may adjust a freshly cached pending execution before // the core event is emitted. Used only by focused ownership tests. - OnPendingExecution func(volumeID string, pending *pendingRecoveryExecution) -} - -type pendingRecoveryExecution struct { - volumeID string - replicaID string - driver *engine.RecoveryDriver - plan *engine.RecoveryPlan - catchUpIO engine.CatchUpIO - rebuildIO engine.RebuildIO + OnPendingExecution func(volumeID string, pending *rt.PendingExecution) } func NewRecoveryManager(bs *BlockService) *RecoveryManager { - return &RecoveryManager{ - bs: bs, - tasks: make(map[string]*recoveryTask), - pending: make(map[string]*pendingRecoveryExecution), + rm := &RecoveryManager{ + bs: bs, + tasks: make(map[string]*recoveryTask), } + rm.coord = rt.NewPendingCoordinator(func(pe *rt.PendingExecution, reason string) { + if pe != nil && pe.Driver != nil && pe.Plan != nil { + if drv, ok := pe.Driver.(*engine.RecoveryDriver); ok { + if plan, ok := pe.Plan.(*engine.RecoveryPlan); ok { + drv.CancelPlan(plan, reason) + } + } + } + }) + return rm } // === LEGACY NO-CORE COMPATIBILITY === @@ -160,13 +161,8 @@ func (rm *RecoveryManager) Shutdown() { } } rm.tasks = make(map[string]*recoveryTask) - for volumeID, pending := range rm.pending { - if pending != nil && pending.driver != nil && pending.plan != nil { - pending.driver.CancelPlan(pending.plan, "recovery_shutdown") - } - delete(rm.pending, volumeID) - } rm.mu.Unlock() + rm.coord.CancelAll("recovery_shutdown") rm.wg.Wait() } @@ -288,25 +284,20 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAdd switch plan.Outcome { case engine.OutcomeCatchUp: if bs.v2Core == nil { - if err := rm.executeCatchUpPlan(volPath, replicaID, driver, plan, executor); err != nil { - if ctx.Err() != nil { - glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, err) - } else { - glog.Warningf("recovery: catch-up execution failed for %s: %v", replicaID, err) - } - } + rm.executeLegacyCatchUp(ctx, volPath, replicaID, driver, plan, executor) return } - rm.storePendingExecution(volPath, &pendingRecoveryExecution{ - volumeID: volPath, - replicaID: replicaID, - driver: driver, - plan: plan, - catchUpIO: executor, + rm.coord.Store(volPath, &rt.PendingExecution{ + VolumeID: volPath, + ReplicaID: replicaID, + CatchUpTarget: plan.CatchUpTarget, + Driver: driver, + Plan: plan, + CatchUpIO: executor, }) bs.applyCoreEvent(engine.CatchUpPlanned{ID: volPath, TargetLSN: plan.CatchUpTarget}) - if rm.hasPendingExecution(volPath) { - rm.cancelPendingExecution(volPath, "start_catchup_not_emitted") + if rm.coord.Has(volPath) { + rm.coord.Cancel(volPath, "start_catchup_not_emitted") return } case engine.OutcomeNeedsRebuild: @@ -361,124 +352,70 @@ func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID, rebuildAdd return } if bs.v2Core == nil { - if err := rm.executeRebuildPlan(volPath, replicaID, driver, plan, executor); err != nil { - if ctx.Err() != nil { - glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, err) - } else { - glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, err) - } - } + rm.executeLegacyRebuild(ctx, volPath, replicaID, driver, plan, executor) return } - rm.storePendingExecution(volPath, &pendingRecoveryExecution{ - volumeID: volPath, - replicaID: replicaID, - driver: driver, - plan: plan, - rebuildIO: executor, - }) + pe := &rt.PendingExecution{ + VolumeID: volPath, + ReplicaID: replicaID, + RebuildTargetLSN: plan.RebuildTargetLSN, + Driver: driver, + Plan: plan, + RebuildIO: executor, + } + rm.coord.Store(volPath, pe) if rm.OnPendingExecution != nil { - if pending, ok := rm.peekPendingExecution(volPath); ok { - rm.OnPendingExecution(volPath, pending) - } + rm.OnPendingExecution(volPath, pe) } bs.applyCoreEvent(engine.RebuildStarted{ID: volPath, TargetLSN: plan.RebuildTargetLSN}) - if rm.hasPendingExecution(volPath) { - rm.cancelPendingExecution(volPath, "start_rebuild_not_emitted") + if rm.coord.Has(volPath) { + rm.coord.Cancel(volPath, "start_rebuild_not_emitted") } } -func (rm *RecoveryManager) storePendingExecution(volumeID string, pending *pendingRecoveryExecution) { - rm.mu.Lock() - defer rm.mu.Unlock() - if rm.pending == nil { - rm.pending = make(map[string]*pendingRecoveryExecution) - } - rm.pending[volumeID] = pending -} - -func (rm *RecoveryManager) takePendingExecution(volumeID string) (*pendingRecoveryExecution, bool) { - rm.mu.Lock() - defer rm.mu.Unlock() - pending, ok := rm.pending[volumeID] - if ok { - delete(rm.pending, volumeID) - } - return pending, ok -} - -func (rm *RecoveryManager) peekPendingExecution(volumeID string) (*pendingRecoveryExecution, bool) { - rm.mu.Lock() - defer rm.mu.Unlock() - pending, ok := rm.pending[volumeID] - return pending, ok -} - -func (rm *RecoveryManager) hasPendingExecution(volumeID string) bool { - rm.mu.Lock() - defer rm.mu.Unlock() - _, ok := rm.pending[volumeID] - return ok -} - -func (rm *RecoveryManager) cancelPendingExecution(volumeID, reason string) { - pending, ok := rm.takePendingExecution(volumeID) - if !ok || pending == nil || pending.driver == nil || pending.plan == nil { - return - } - pending.driver.CancelPlan(pending.plan, reason) -} +// === Core-present pending execution (delegates to runtime.PendingCoordinator) === func (rm *RecoveryManager) ExecutePendingCatchUp(volumeID string, targetLSN uint64) error { - pending, ok := rm.takePendingExecution(volumeID) - if !ok || pending == nil || pending.plan == nil || pending.driver == nil { + pe := rm.coord.TakeCatchUp(volumeID, targetLSN) + if pe == nil { return nil } - if pending.plan.CatchUpTarget != targetLSN { - pending.driver.CancelPlan(pending.plan, "start_catchup_target_mismatch") + drv, _ := pe.Driver.(*engine.RecoveryDriver) + plan, _ := pe.Plan.(*engine.RecoveryPlan) + io, _ := pe.CatchUpIO.(engine.CatchUpIO) + if drv == nil || plan == nil { return nil } - return rm.executeCatchUpPlan(volumeID, pending.replicaID, pending.driver, pending.plan, pending.catchUpIO) + return rt.ExecuteCatchUpPlan(drv, plan, io, volumeID, rm) } func (rm *RecoveryManager) ExecutePendingRebuild(volumeID string, targetLSN uint64) error { - pending, ok := rm.takePendingExecution(volumeID) - if !ok || pending == nil || pending.plan == nil || pending.driver == nil { + pe := rm.coord.TakeRebuild(volumeID, targetLSN) + if pe == nil { return nil } - if pending.plan.RebuildTargetLSN != targetLSN { - pending.driver.CancelPlan(pending.plan, "start_rebuild_target_mismatch") + drv, _ := pe.Driver.(*engine.RecoveryDriver) + plan, _ := pe.Plan.(*engine.RecoveryPlan) + io, _ := pe.RebuildIO.(engine.RebuildIO) + if drv == nil || plan == nil { return nil } - return rm.executeRebuildPlan(volumeID, pending.replicaID, pending.driver, pending.plan, pending.rebuildIO) + return rt.ExecuteRebuildPlan(drv, plan, io, volumeID, rm) } -func (rm *RecoveryManager) executeCatchUpPlan(volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.CatchUpIO) error { - exec := engine.NewCatchUpExecutor(driver, plan) - exec.IO = io - if err := exec.Execute(nil, 0); err != nil { - return err - } - glog.V(0).Infof("recovery: catch-up completed for %s", replicaID) +// RecoveryCallbacks implementation — host-side completion notifications. + +func (rm *RecoveryManager) OnCatchUpCompleted(volumeID string, achievedLSN uint64) { + glog.V(0).Infof("recovery: catch-up completed for %s (achievedLSN=%d)", volumeID, achievedLSN) if rm.bs != nil && rm.bs.v2Core != nil { - achievedLSN := plan.CatchUpTarget - if achievedLSN == 0 { - achievedLSN = plan.CatchUpStartLSN - } rm.bs.applyCoreEvent(engine.CatchUpCompleted{ID: volumeID, AchievedLSN: achievedLSN}) } - return nil } -func (rm *RecoveryManager) executeRebuildPlan(volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.RebuildIO) error { - exec := engine.NewRebuildExecutor(driver, plan) - exec.IO = io - if err := exec.Execute(); err != nil { - return err - } - glog.V(0).Infof("recovery: rebuild completed for %s", replicaID) +func (rm *RecoveryManager) OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan) { + glog.V(0).Infof("recovery: rebuild completed for %s", volumeID) if rm.bs == nil || rm.bs.v2Core == nil { - return nil + return } var snap blockvol.V2StatusSnapshot if err := rm.bs.blockStore.WithVolume(volumeID, func(vol *blockvol.BlockVol) error { @@ -505,7 +442,34 @@ func (rm *RecoveryManager) executeRebuildPlan(volumeID, replicaID string, driver FlushedLSN: flushedLSN, CheckpointLSN: checkpointLSN, }) - return nil +} + +// === LEGACY NO-CORE COMPATIBILITY === +// +// These methods execute recovery plans directly without going through the +// core command path. They exist only for no-core compatibility and older tests. +// Core-present paths use ExecutePendingCatchUp/ExecutePendingRebuild instead. + +func (rm *RecoveryManager) executeLegacyCatchUp(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.CatchUpIO) { + err := rt.ExecuteCatchUpPlan(driver, plan, io, volumeID, rm) + if err != nil { + if ctx.Err() != nil { + glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, err) + } else { + glog.Warningf("recovery: catch-up execution failed for %s: %v", replicaID, err) + } + } +} + +func (rm *RecoveryManager) executeLegacyRebuild(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.RebuildIO) { + err := rt.ExecuteRebuildPlan(driver, plan, io, volumeID, rm) + if err != nil { + if ctx.Err() != nil { + glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, err) + } else { + glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, err) + } + } } func (rm *RecoveryManager) deriveRebuildAddr(replicaID string, assignments []blockvol.BlockVolumeAssignment) string { diff --git a/weed/server/block_recovery_test.go b/weed/server/block_recovery_test.go index 6d5d71ea4..9ea166133 100644 --- a/weed/server/block_recovery_test.go +++ b/weed/server/block_recovery_test.go @@ -9,6 +9,7 @@ import ( "time" engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" + rt "github.com/seaweedfs/seaweedfs/sw-block/engine/replication/runtime" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge" @@ -341,11 +342,13 @@ func TestP16B_RunRebuild_UsesCoreStartRebuildCommandOnLivePath(t *testing.T) { rm := NewRecoveryManager(bs) bs.v2Recovery = rm - rm.OnPendingExecution = func(volumeID string, pending *pendingRecoveryExecution) { - if volumeID != volPath || pending == nil || pending.plan == nil { + rm.OnPendingExecution = func(volumeID string, pending *rt.PendingExecution) { + if volumeID != volPath || pending == nil || pending.Plan == nil { return } - pending.rebuildIO = fakeRebuildIO{achievedLSN: pending.plan.RebuildTargetLSN} + if plan, ok := pending.Plan.(*engine.RecoveryPlan); ok { + pending.RebuildIO = fakeRebuildIO{achievedLSN: plan.RebuildTargetLSN} + } } _, _, rebuildPort := bs.ReplicationPorts(volPath) rebuildAddr := fmt.Sprintf("127.0.0.1:%d", rebuildPort) diff --git a/weed/server/volume_server_block_test.go b/weed/server/volume_server_block_test.go index 6fca44fde..56a16117d 100644 --- a/weed/server/volume_server_block_test.go +++ b/weed/server/volume_server_block_test.go @@ -4,8 +4,10 @@ import ( "path/filepath" "reflect" "testing" + "time" engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" + rt "github.com/seaweedfs/seaweedfs/sw-block/engine/replication/runtime" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" ) @@ -93,6 +95,21 @@ func createTestVolDirect(t *testing.T, bs *BlockService, name string) string { return path } +type fakeCatchUpIO struct { + transferredTo uint64 +} + +func (f fakeCatchUpIO) StreamWALEntries(startExclusive, endInclusive uint64) (uint64, error) { + if f.transferredTo > 0 { + return f.transferredTo, nil + } + return endInclusive, nil +} + +func (f fakeCatchUpIO) TruncateWAL(truncateLSN uint64) error { + return nil +} + func TestBlockService_ProcessAssignment_Primary(t *testing.T) { bs := newTestBlockServiceDirect(t) path := createTestVolDirect(t, bs, "vol1") @@ -405,6 +422,210 @@ func TestBlockService_ApplyAssignments_ExecutesCoreCommands_ReplicaRoleAndReceiv } } +func TestBlockService_ApplyAssignments_PrimaryRole_UsesCoreStartRecoveryTaskForCatchUp(t *testing.T) { + bs := newTestBlockServiceDirect(t) + bs.v2Bridge = newTestControlBridge() + bs.v2Orchestrator = newTestOrchestrator() + bs.v2Recovery = NewRecoveryManager(bs) + defer bs.v2Recovery.Shutdown() + + path := createTestVolDirect(t, bs, "vol-core-cmd-catchup-start") + if err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + for i := 0; i < 5; i++ { + if err := vol.WriteLBA(uint64(i), make([]byte, 4096)); err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatalf("write: %v", err) + } + + bs.v2Recovery.OnPendingExecution = func(volumeID string, pending *rt.PendingExecution) { + if volumeID == path && pending != nil && pending.Plan != nil { + if plan, ok := pending.Plan.(*engine.RecoveryPlan); ok { + pending.CatchUpIO = fakeCatchUpIO{transferredTo: plan.CatchUpTarget} + } + } + } + + errs := bs.ApplyAssignments([]blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 1, + Role: blockvol.RoleToWire(blockvol.RolePrimary), + LeaseTtlMs: 30000, + ReplicaServerID: "vs-2", + ReplicaDataAddr: "10.0.0.2:4260", + ReplicaCtrlAddr: "10.0.0.2:4261", + }}) + if len(errs) != 1 || errs[0] != nil { + t.Fatalf("apply errs=%v", errs) + } + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + proj, ok := bs.CoreProjection(path) + sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-2") + if ok && proj.Recovery.Phase == engine.RecoveryIdle && sender != nil && sender.State() == engine.StateInSync { + break + } + time.Sleep(10 * time.Millisecond) + } + + cmds := bs.ExecutedCoreCommands(path) + if !reflect.DeepEqual(cmds, []string{"apply_role", "configure_shipper", "start_recovery_task", "start_catchup"}) { + t.Fatalf("expected primary assignment to execute apply_role + configure_shipper + start_recovery_task + start_catchup, got %v", cmds) + } + proj, ok := bs.CoreProjection(path) + if !ok { + t.Fatal("expected core projection") + } + if proj.Recovery.Phase != engine.RecoveryIdle { + t.Fatalf("recovery_phase=%s", proj.Recovery.Phase) + } + if proj.Boundary.DurableLSN == 0 { + t.Fatalf("durable_lsn=%d", proj.Boundary.DurableLSN) + } + sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-2") + if sender == nil { + t.Fatal("sender not found") + } + if sender.State() != engine.StateInSync { + t.Fatalf("sender state=%s", sender.State()) + } +} + +func TestBlockService_ApplyAssignments_RebuildingRole_UsesCoreRecoveryPathWithoutLegacyDirectStart(t *testing.T) { + bs := newTestBlockServiceDirect(t) + bs.v2Bridge = newTestControlBridge() + bs.v2Orchestrator = newTestOrchestrator() + bs.v2Recovery = NewRecoveryManager(bs) + defer bs.v2Recovery.Shutdown() + + path := createTestVolDirect(t, bs, "vol-core-cmd-rebuild-assignment") + if err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + if err := vol.WriteLBA(0, make([]byte, 4096)); err != nil { + return err + } + return vol.ForceFlush() + }); err != nil { + t.Fatalf("write+flush: %v", err) + } + + legacyCalls := 0 + bs.onLegacyStartRebuild = func(path, rebuildAddr string, epoch uint64) { + legacyCalls++ + } + bs.v2Recovery.OnPendingExecution = func(volumeID string, pending *rt.PendingExecution) { + if volumeID == path && pending != nil && pending.Plan != nil { + if plan, ok := pending.Plan.(*engine.RecoveryPlan); ok { + pending.RebuildIO = fakeRebuildIO{achievedLSN: plan.RebuildTargetLSN} + } + } + } + + errs := bs.ApplyAssignments([]blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 2, + Role: blockvol.RoleToWire(blockvol.RoleRebuilding), + LeaseTtlMs: 30000, + ReplicaDataAddr: "127.0.0.1:0", + ReplicaCtrlAddr: "127.0.0.1:0", + RebuildAddr: "127.0.0.1:15000", + }}) + if len(errs) != 1 || errs[0] != nil { + t.Fatalf("apply errs=%v", errs) + } + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + proj, ok := bs.CoreProjection(path) + sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-test") + if ok && proj.Recovery.Phase == engine.RecoveryIdle && sender != nil && sender.State() == engine.StateInSync { + break + } + time.Sleep(10 * time.Millisecond) + } + + if legacyCalls != 0 { + t.Fatalf("legacy direct rebuild should not run when core is present, calls=%d", legacyCalls) + } + vol, ok := bs.blockStore.GetBlockVolume(path) + if !ok { + t.Fatal("volume not found") + } + status := vol.Status() + if status.Role != blockvol.RoleRebuilding || status.Epoch != 2 { + t.Fatalf("status=%+v", status) + } + cmds := bs.ExecutedCoreCommands(path) + if !reflect.DeepEqual(cmds, []string{"apply_role", "start_recovery_task", "start_rebuild"}) { + t.Fatalf("expected rebuilding assignment to execute apply_role + start_recovery_task + start_rebuild, got %v", cmds) + } + proj, ok := bs.CoreProjection(path) + if !ok { + t.Fatal("expected core projection") + } + if proj.Recovery.Phase != engine.RecoveryIdle { + t.Fatalf("recovery_phase=%s", proj.Recovery.Phase) + } + if proj.Mode.Reason == "awaiting_receiver_ready" { + t.Fatalf("rebuilding assignment should not be reported as awaiting receiver readiness: %+v", proj.Mode) + } + sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-test") + if sender == nil { + t.Fatal("sender not found") + } + if sender.State() != engine.StateInSync { + t.Fatalf("sender state=%s", sender.State()) + } +} + +func TestBlockService_ApplyAssignments_RebuildingRole_PreservesLegacyFallbackWithoutCore(t *testing.T) { + dir := t.TempDir() + store := storage.NewBlockVolumeStore() + t.Cleanup(func() { store.Close() }) + bs := &BlockService{ + blockStore: store, + blockDir: dir, + listenAddr: "127.0.0.1:3260", + iqnPrefix: "iqn.2024-01.com.seaweedfs:vol.", + replStates: make(map[string]*volReplState), + localServerID: "vs-test", + } + + path := createTestVolDirect(t, bs, "vol-legacy-rebuild") + legacyCalls := 0 + legacyCalled := make(chan struct{}, 1) + bs.onLegacyStartRebuild = func(path, rebuildAddr string, epoch uint64) { + legacyCalls++ + select { + case legacyCalled <- struct{}{}: + default: + } + } + + errs := bs.ApplyAssignments([]blockvol.BlockVolumeAssignment{{ + Path: path, + Epoch: 3, + Role: blockvol.RoleToWire(blockvol.RoleRebuilding), + LeaseTtlMs: 30000, + RebuildAddr: "127.0.0.1:15000", + }}) + if len(errs) != 1 || errs[0] != nil { + t.Fatalf("apply errs=%v", errs) + } + + select { + case <-legacyCalled: + case <-time.After(1 * time.Second): + t.Fatal("expected legacy direct rebuild to be used without core") + } + if legacyCalls != 1 { + t.Fatalf("legacy direct rebuild calls=%d", legacyCalls) + } +} + func TestBlockService_BarrierRejected_ExecutesCoreInvalidateSession(t *testing.T) { bs := newTestBlockServiceDirect(t) bs.v2Bridge = newTestControlBridge()