mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-19 08:11:29 +00:00
fix: Batch 3 wiring — production path uses runtime helpers, legacy isolated
H wiring: block_recovery.go now uses runtime.PendingCoordinator - Removed local pendingRecoveryExecution type + store/take/peek/has/cancel - ExecutePendingCatchUp/Rebuild delegate to coord.TakeCatchUp/TakeRebuild - Shutdown uses coord.CancelAll - Added CancelAll to PendingCoordinator I wiring: executeCatchUpPlan/executeRebuildPlan replaced - ExecutePendingCatchUp now calls rt.ExecuteCatchUpPlan with RecoveryManager as RecoveryCallbacks (OnCatchUpCompleted/OnRebuildCompleted) - ExecutePendingRebuild follows same pattern - Local executeCatchUpPlan/executeRebuildPlan methods removed J structural: legacy no-core branches extracted - executeLegacyCatchUp: wraps rt.ExecuteCatchUpPlan for v2Core==nil path - executeLegacyRebuild: wraps rt.ExecuteRebuildPlan for v2Core==nil path - Clear "LEGACY NO-CORE COMPATIBILITY" section with structural separation - runCatchUp/runRebuild now branch cleanly: legacy helper vs core coordinator Test updates: pendingRecoveryExecution → rt.PendingExecution, field casing, Plan type assertions. Validation: all P4, P16B, and ApplyAssignments tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -137,3 +137,20 @@ func (pc *PendingCoordinator) Peek(volumeID string) *PendingExecution {
|
||||
defer pc.mu.Unlock()
|
||||
return pc.pending[volumeID]
|
||||
}
|
||||
|
||||
// CancelAll cancels and removes all pending executions.
|
||||
func (pc *PendingCoordinator) CancelAll(reason string) {
|
||||
pc.mu.Lock()
|
||||
all := make(map[string]*PendingExecution, len(pc.pending))
|
||||
for k, v := range pc.pending {
|
||||
all[k] = v
|
||||
}
|
||||
pc.pending = make(map[string]*PendingExecution)
|
||||
pc.mu.Unlock()
|
||||
|
||||
if pc.cancelFn != nil {
|
||||
for _, pe := range all {
|
||||
pc.cancelFn(pe, reason)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
bridge "github.com/seaweedfs/seaweedfs/sw-block/bridge/blockvol"
|
||||
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
|
||||
rt "github.com/seaweedfs/seaweedfs/sw-block/engine/replication/runtime"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge"
|
||||
@@ -31,10 +32,10 @@ type recoveryTask struct {
|
||||
type RecoveryManager struct {
|
||||
bs *BlockService
|
||||
|
||||
mu sync.Mutex
|
||||
tasks map[string]*recoveryTask
|
||||
pending map[string]*pendingRecoveryExecution
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
tasks map[string]*recoveryTask
|
||||
coord *rt.PendingCoordinator
|
||||
wg sync.WaitGroup
|
||||
|
||||
// TestHook: if set, called before execution starts. Tests use this
|
||||
// to hold the goroutine alive for serialized-replacement proofs.
|
||||
@@ -42,24 +43,24 @@ type RecoveryManager struct {
|
||||
|
||||
// TestHook: if set, may adjust a freshly cached pending execution before
|
||||
// the core event is emitted. Used only by focused ownership tests.
|
||||
OnPendingExecution func(volumeID string, pending *pendingRecoveryExecution)
|
||||
}
|
||||
|
||||
type pendingRecoveryExecution struct {
|
||||
volumeID string
|
||||
replicaID string
|
||||
driver *engine.RecoveryDriver
|
||||
plan *engine.RecoveryPlan
|
||||
catchUpIO engine.CatchUpIO
|
||||
rebuildIO engine.RebuildIO
|
||||
OnPendingExecution func(volumeID string, pending *rt.PendingExecution)
|
||||
}
|
||||
|
||||
func NewRecoveryManager(bs *BlockService) *RecoveryManager {
|
||||
return &RecoveryManager{
|
||||
bs: bs,
|
||||
tasks: make(map[string]*recoveryTask),
|
||||
pending: make(map[string]*pendingRecoveryExecution),
|
||||
rm := &RecoveryManager{
|
||||
bs: bs,
|
||||
tasks: make(map[string]*recoveryTask),
|
||||
}
|
||||
rm.coord = rt.NewPendingCoordinator(func(pe *rt.PendingExecution, reason string) {
|
||||
if pe != nil && pe.Driver != nil && pe.Plan != nil {
|
||||
if drv, ok := pe.Driver.(*engine.RecoveryDriver); ok {
|
||||
if plan, ok := pe.Plan.(*engine.RecoveryPlan); ok {
|
||||
drv.CancelPlan(plan, reason)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
return rm
|
||||
}
|
||||
|
||||
// === LEGACY NO-CORE COMPATIBILITY ===
|
||||
@@ -160,13 +161,8 @@ func (rm *RecoveryManager) Shutdown() {
|
||||
}
|
||||
}
|
||||
rm.tasks = make(map[string]*recoveryTask)
|
||||
for volumeID, pending := range rm.pending {
|
||||
if pending != nil && pending.driver != nil && pending.plan != nil {
|
||||
pending.driver.CancelPlan(pending.plan, "recovery_shutdown")
|
||||
}
|
||||
delete(rm.pending, volumeID)
|
||||
}
|
||||
rm.mu.Unlock()
|
||||
rm.coord.CancelAll("recovery_shutdown")
|
||||
rm.wg.Wait()
|
||||
}
|
||||
|
||||
@@ -288,25 +284,20 @@ func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAdd
|
||||
switch plan.Outcome {
|
||||
case engine.OutcomeCatchUp:
|
||||
if bs.v2Core == nil {
|
||||
if err := rm.executeCatchUpPlan(volPath, replicaID, driver, plan, executor); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, err)
|
||||
} else {
|
||||
glog.Warningf("recovery: catch-up execution failed for %s: %v", replicaID, err)
|
||||
}
|
||||
}
|
||||
rm.executeLegacyCatchUp(ctx, volPath, replicaID, driver, plan, executor)
|
||||
return
|
||||
}
|
||||
rm.storePendingExecution(volPath, &pendingRecoveryExecution{
|
||||
volumeID: volPath,
|
||||
replicaID: replicaID,
|
||||
driver: driver,
|
||||
plan: plan,
|
||||
catchUpIO: executor,
|
||||
rm.coord.Store(volPath, &rt.PendingExecution{
|
||||
VolumeID: volPath,
|
||||
ReplicaID: replicaID,
|
||||
CatchUpTarget: plan.CatchUpTarget,
|
||||
Driver: driver,
|
||||
Plan: plan,
|
||||
CatchUpIO: executor,
|
||||
})
|
||||
bs.applyCoreEvent(engine.CatchUpPlanned{ID: volPath, TargetLSN: plan.CatchUpTarget})
|
||||
if rm.hasPendingExecution(volPath) {
|
||||
rm.cancelPendingExecution(volPath, "start_catchup_not_emitted")
|
||||
if rm.coord.Has(volPath) {
|
||||
rm.coord.Cancel(volPath, "start_catchup_not_emitted")
|
||||
return
|
||||
}
|
||||
case engine.OutcomeNeedsRebuild:
|
||||
@@ -361,124 +352,70 @@ func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID, rebuildAdd
|
||||
return
|
||||
}
|
||||
if bs.v2Core == nil {
|
||||
if err := rm.executeRebuildPlan(volPath, replicaID, driver, plan, executor); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, err)
|
||||
} else {
|
||||
glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, err)
|
||||
}
|
||||
}
|
||||
rm.executeLegacyRebuild(ctx, volPath, replicaID, driver, plan, executor)
|
||||
return
|
||||
}
|
||||
rm.storePendingExecution(volPath, &pendingRecoveryExecution{
|
||||
volumeID: volPath,
|
||||
replicaID: replicaID,
|
||||
driver: driver,
|
||||
plan: plan,
|
||||
rebuildIO: executor,
|
||||
})
|
||||
pe := &rt.PendingExecution{
|
||||
VolumeID: volPath,
|
||||
ReplicaID: replicaID,
|
||||
RebuildTargetLSN: plan.RebuildTargetLSN,
|
||||
Driver: driver,
|
||||
Plan: plan,
|
||||
RebuildIO: executor,
|
||||
}
|
||||
rm.coord.Store(volPath, pe)
|
||||
if rm.OnPendingExecution != nil {
|
||||
if pending, ok := rm.peekPendingExecution(volPath); ok {
|
||||
rm.OnPendingExecution(volPath, pending)
|
||||
}
|
||||
rm.OnPendingExecution(volPath, pe)
|
||||
}
|
||||
bs.applyCoreEvent(engine.RebuildStarted{ID: volPath, TargetLSN: plan.RebuildTargetLSN})
|
||||
if rm.hasPendingExecution(volPath) {
|
||||
rm.cancelPendingExecution(volPath, "start_rebuild_not_emitted")
|
||||
if rm.coord.Has(volPath) {
|
||||
rm.coord.Cancel(volPath, "start_rebuild_not_emitted")
|
||||
}
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) storePendingExecution(volumeID string, pending *pendingRecoveryExecution) {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
if rm.pending == nil {
|
||||
rm.pending = make(map[string]*pendingRecoveryExecution)
|
||||
}
|
||||
rm.pending[volumeID] = pending
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) takePendingExecution(volumeID string) (*pendingRecoveryExecution, bool) {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
pending, ok := rm.pending[volumeID]
|
||||
if ok {
|
||||
delete(rm.pending, volumeID)
|
||||
}
|
||||
return pending, ok
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) peekPendingExecution(volumeID string) (*pendingRecoveryExecution, bool) {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
pending, ok := rm.pending[volumeID]
|
||||
return pending, ok
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) hasPendingExecution(volumeID string) bool {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
_, ok := rm.pending[volumeID]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) cancelPendingExecution(volumeID, reason string) {
|
||||
pending, ok := rm.takePendingExecution(volumeID)
|
||||
if !ok || pending == nil || pending.driver == nil || pending.plan == nil {
|
||||
return
|
||||
}
|
||||
pending.driver.CancelPlan(pending.plan, reason)
|
||||
}
|
||||
// === Core-present pending execution (delegates to runtime.PendingCoordinator) ===
|
||||
|
||||
func (rm *RecoveryManager) ExecutePendingCatchUp(volumeID string, targetLSN uint64) error {
|
||||
pending, ok := rm.takePendingExecution(volumeID)
|
||||
if !ok || pending == nil || pending.plan == nil || pending.driver == nil {
|
||||
pe := rm.coord.TakeCatchUp(volumeID, targetLSN)
|
||||
if pe == nil {
|
||||
return nil
|
||||
}
|
||||
if pending.plan.CatchUpTarget != targetLSN {
|
||||
pending.driver.CancelPlan(pending.plan, "start_catchup_target_mismatch")
|
||||
drv, _ := pe.Driver.(*engine.RecoveryDriver)
|
||||
plan, _ := pe.Plan.(*engine.RecoveryPlan)
|
||||
io, _ := pe.CatchUpIO.(engine.CatchUpIO)
|
||||
if drv == nil || plan == nil {
|
||||
return nil
|
||||
}
|
||||
return rm.executeCatchUpPlan(volumeID, pending.replicaID, pending.driver, pending.plan, pending.catchUpIO)
|
||||
return rt.ExecuteCatchUpPlan(drv, plan, io, volumeID, rm)
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) ExecutePendingRebuild(volumeID string, targetLSN uint64) error {
|
||||
pending, ok := rm.takePendingExecution(volumeID)
|
||||
if !ok || pending == nil || pending.plan == nil || pending.driver == nil {
|
||||
pe := rm.coord.TakeRebuild(volumeID, targetLSN)
|
||||
if pe == nil {
|
||||
return nil
|
||||
}
|
||||
if pending.plan.RebuildTargetLSN != targetLSN {
|
||||
pending.driver.CancelPlan(pending.plan, "start_rebuild_target_mismatch")
|
||||
drv, _ := pe.Driver.(*engine.RecoveryDriver)
|
||||
plan, _ := pe.Plan.(*engine.RecoveryPlan)
|
||||
io, _ := pe.RebuildIO.(engine.RebuildIO)
|
||||
if drv == nil || plan == nil {
|
||||
return nil
|
||||
}
|
||||
return rm.executeRebuildPlan(volumeID, pending.replicaID, pending.driver, pending.plan, pending.rebuildIO)
|
||||
return rt.ExecuteRebuildPlan(drv, plan, io, volumeID, rm)
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) executeCatchUpPlan(volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.CatchUpIO) error {
|
||||
exec := engine.NewCatchUpExecutor(driver, plan)
|
||||
exec.IO = io
|
||||
if err := exec.Execute(nil, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
glog.V(0).Infof("recovery: catch-up completed for %s", replicaID)
|
||||
// RecoveryCallbacks implementation — host-side completion notifications.
|
||||
|
||||
func (rm *RecoveryManager) OnCatchUpCompleted(volumeID string, achievedLSN uint64) {
|
||||
glog.V(0).Infof("recovery: catch-up completed for %s (achievedLSN=%d)", volumeID, achievedLSN)
|
||||
if rm.bs != nil && rm.bs.v2Core != nil {
|
||||
achievedLSN := plan.CatchUpTarget
|
||||
if achievedLSN == 0 {
|
||||
achievedLSN = plan.CatchUpStartLSN
|
||||
}
|
||||
rm.bs.applyCoreEvent(engine.CatchUpCompleted{ID: volumeID, AchievedLSN: achievedLSN})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) executeRebuildPlan(volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.RebuildIO) error {
|
||||
exec := engine.NewRebuildExecutor(driver, plan)
|
||||
exec.IO = io
|
||||
if err := exec.Execute(); err != nil {
|
||||
return err
|
||||
}
|
||||
glog.V(0).Infof("recovery: rebuild completed for %s", replicaID)
|
||||
func (rm *RecoveryManager) OnRebuildCompleted(volumeID string, plan *engine.RecoveryPlan) {
|
||||
glog.V(0).Infof("recovery: rebuild completed for %s", volumeID)
|
||||
if rm.bs == nil || rm.bs.v2Core == nil {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
var snap blockvol.V2StatusSnapshot
|
||||
if err := rm.bs.blockStore.WithVolume(volumeID, func(vol *blockvol.BlockVol) error {
|
||||
@@ -505,7 +442,34 @@ func (rm *RecoveryManager) executeRebuildPlan(volumeID, replicaID string, driver
|
||||
FlushedLSN: flushedLSN,
|
||||
CheckpointLSN: checkpointLSN,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// === LEGACY NO-CORE COMPATIBILITY ===
|
||||
//
|
||||
// These methods execute recovery plans directly without going through the
|
||||
// core command path. They exist only for no-core compatibility and older tests.
|
||||
// Core-present paths use ExecutePendingCatchUp/ExecutePendingRebuild instead.
|
||||
|
||||
func (rm *RecoveryManager) executeLegacyCatchUp(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.CatchUpIO) {
|
||||
err := rt.ExecuteCatchUpPlan(driver, plan, io, volumeID, rm)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, err)
|
||||
} else {
|
||||
glog.Warningf("recovery: catch-up execution failed for %s: %v", replicaID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) executeLegacyRebuild(ctx context.Context, volumeID, replicaID string, driver *engine.RecoveryDriver, plan *engine.RecoveryPlan, io engine.RebuildIO) {
|
||||
err := rt.ExecuteRebuildPlan(driver, plan, io, volumeID, rm)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, err)
|
||||
} else {
|
||||
glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) deriveRebuildAddr(replicaID string, assignments []blockvol.BlockVolumeAssignment) string {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
|
||||
rt "github.com/seaweedfs/seaweedfs/sw-block/engine/replication/runtime"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge"
|
||||
@@ -341,11 +342,13 @@ func TestP16B_RunRebuild_UsesCoreStartRebuildCommandOnLivePath(t *testing.T) {
|
||||
|
||||
rm := NewRecoveryManager(bs)
|
||||
bs.v2Recovery = rm
|
||||
rm.OnPendingExecution = func(volumeID string, pending *pendingRecoveryExecution) {
|
||||
if volumeID != volPath || pending == nil || pending.plan == nil {
|
||||
rm.OnPendingExecution = func(volumeID string, pending *rt.PendingExecution) {
|
||||
if volumeID != volPath || pending == nil || pending.Plan == nil {
|
||||
return
|
||||
}
|
||||
pending.rebuildIO = fakeRebuildIO{achievedLSN: pending.plan.RebuildTargetLSN}
|
||||
if plan, ok := pending.Plan.(*engine.RecoveryPlan); ok {
|
||||
pending.RebuildIO = fakeRebuildIO{achievedLSN: plan.RebuildTargetLSN}
|
||||
}
|
||||
}
|
||||
_, _, rebuildPort := bs.ReplicationPorts(volPath)
|
||||
rebuildAddr := fmt.Sprintf("127.0.0.1:%d", rebuildPort)
|
||||
|
||||
@@ -4,8 +4,10 @@ import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
|
||||
rt "github.com/seaweedfs/seaweedfs/sw-block/engine/replication/runtime"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
)
|
||||
@@ -93,6 +95,21 @@ func createTestVolDirect(t *testing.T, bs *BlockService, name string) string {
|
||||
return path
|
||||
}
|
||||
|
||||
type fakeCatchUpIO struct {
|
||||
transferredTo uint64
|
||||
}
|
||||
|
||||
func (f fakeCatchUpIO) StreamWALEntries(startExclusive, endInclusive uint64) (uint64, error) {
|
||||
if f.transferredTo > 0 {
|
||||
return f.transferredTo, nil
|
||||
}
|
||||
return endInclusive, nil
|
||||
}
|
||||
|
||||
func (f fakeCatchUpIO) TruncateWAL(truncateLSN uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestBlockService_ProcessAssignment_Primary(t *testing.T) {
|
||||
bs := newTestBlockServiceDirect(t)
|
||||
path := createTestVolDirect(t, bs, "vol1")
|
||||
@@ -405,6 +422,210 @@ func TestBlockService_ApplyAssignments_ExecutesCoreCommands_ReplicaRoleAndReceiv
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockService_ApplyAssignments_PrimaryRole_UsesCoreStartRecoveryTaskForCatchUp(t *testing.T) {
|
||||
bs := newTestBlockServiceDirect(t)
|
||||
bs.v2Bridge = newTestControlBridge()
|
||||
bs.v2Orchestrator = newTestOrchestrator()
|
||||
bs.v2Recovery = NewRecoveryManager(bs)
|
||||
defer bs.v2Recovery.Shutdown()
|
||||
|
||||
path := createTestVolDirect(t, bs, "vol-core-cmd-catchup-start")
|
||||
if err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
|
||||
for i := 0; i < 5; i++ {
|
||||
if err := vol.WriteLBA(uint64(i), make([]byte, 4096)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("write: %v", err)
|
||||
}
|
||||
|
||||
bs.v2Recovery.OnPendingExecution = func(volumeID string, pending *rt.PendingExecution) {
|
||||
if volumeID == path && pending != nil && pending.Plan != nil {
|
||||
if plan, ok := pending.Plan.(*engine.RecoveryPlan); ok {
|
||||
pending.CatchUpIO = fakeCatchUpIO{transferredTo: plan.CatchUpTarget}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
errs := bs.ApplyAssignments([]blockvol.BlockVolumeAssignment{{
|
||||
Path: path,
|
||||
Epoch: 1,
|
||||
Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
||||
LeaseTtlMs: 30000,
|
||||
ReplicaServerID: "vs-2",
|
||||
ReplicaDataAddr: "10.0.0.2:4260",
|
||||
ReplicaCtrlAddr: "10.0.0.2:4261",
|
||||
}})
|
||||
if len(errs) != 1 || errs[0] != nil {
|
||||
t.Fatalf("apply errs=%v", errs)
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
proj, ok := bs.CoreProjection(path)
|
||||
sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-2")
|
||||
if ok && proj.Recovery.Phase == engine.RecoveryIdle && sender != nil && sender.State() == engine.StateInSync {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
cmds := bs.ExecutedCoreCommands(path)
|
||||
if !reflect.DeepEqual(cmds, []string{"apply_role", "configure_shipper", "start_recovery_task", "start_catchup"}) {
|
||||
t.Fatalf("expected primary assignment to execute apply_role + configure_shipper + start_recovery_task + start_catchup, got %v", cmds)
|
||||
}
|
||||
proj, ok := bs.CoreProjection(path)
|
||||
if !ok {
|
||||
t.Fatal("expected core projection")
|
||||
}
|
||||
if proj.Recovery.Phase != engine.RecoveryIdle {
|
||||
t.Fatalf("recovery_phase=%s", proj.Recovery.Phase)
|
||||
}
|
||||
if proj.Boundary.DurableLSN == 0 {
|
||||
t.Fatalf("durable_lsn=%d", proj.Boundary.DurableLSN)
|
||||
}
|
||||
sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-2")
|
||||
if sender == nil {
|
||||
t.Fatal("sender not found")
|
||||
}
|
||||
if sender.State() != engine.StateInSync {
|
||||
t.Fatalf("sender state=%s", sender.State())
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockService_ApplyAssignments_RebuildingRole_UsesCoreRecoveryPathWithoutLegacyDirectStart(t *testing.T) {
|
||||
bs := newTestBlockServiceDirect(t)
|
||||
bs.v2Bridge = newTestControlBridge()
|
||||
bs.v2Orchestrator = newTestOrchestrator()
|
||||
bs.v2Recovery = NewRecoveryManager(bs)
|
||||
defer bs.v2Recovery.Shutdown()
|
||||
|
||||
path := createTestVolDirect(t, bs, "vol-core-cmd-rebuild-assignment")
|
||||
if err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
|
||||
if err := vol.WriteLBA(0, make([]byte, 4096)); err != nil {
|
||||
return err
|
||||
}
|
||||
return vol.ForceFlush()
|
||||
}); err != nil {
|
||||
t.Fatalf("write+flush: %v", err)
|
||||
}
|
||||
|
||||
legacyCalls := 0
|
||||
bs.onLegacyStartRebuild = func(path, rebuildAddr string, epoch uint64) {
|
||||
legacyCalls++
|
||||
}
|
||||
bs.v2Recovery.OnPendingExecution = func(volumeID string, pending *rt.PendingExecution) {
|
||||
if volumeID == path && pending != nil && pending.Plan != nil {
|
||||
if plan, ok := pending.Plan.(*engine.RecoveryPlan); ok {
|
||||
pending.RebuildIO = fakeRebuildIO{achievedLSN: plan.RebuildTargetLSN}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
errs := bs.ApplyAssignments([]blockvol.BlockVolumeAssignment{{
|
||||
Path: path,
|
||||
Epoch: 2,
|
||||
Role: blockvol.RoleToWire(blockvol.RoleRebuilding),
|
||||
LeaseTtlMs: 30000,
|
||||
ReplicaDataAddr: "127.0.0.1:0",
|
||||
ReplicaCtrlAddr: "127.0.0.1:0",
|
||||
RebuildAddr: "127.0.0.1:15000",
|
||||
}})
|
||||
if len(errs) != 1 || errs[0] != nil {
|
||||
t.Fatalf("apply errs=%v", errs)
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
proj, ok := bs.CoreProjection(path)
|
||||
sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-test")
|
||||
if ok && proj.Recovery.Phase == engine.RecoveryIdle && sender != nil && sender.State() == engine.StateInSync {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if legacyCalls != 0 {
|
||||
t.Fatalf("legacy direct rebuild should not run when core is present, calls=%d", legacyCalls)
|
||||
}
|
||||
vol, ok := bs.blockStore.GetBlockVolume(path)
|
||||
if !ok {
|
||||
t.Fatal("volume not found")
|
||||
}
|
||||
status := vol.Status()
|
||||
if status.Role != blockvol.RoleRebuilding || status.Epoch != 2 {
|
||||
t.Fatalf("status=%+v", status)
|
||||
}
|
||||
cmds := bs.ExecutedCoreCommands(path)
|
||||
if !reflect.DeepEqual(cmds, []string{"apply_role", "start_recovery_task", "start_rebuild"}) {
|
||||
t.Fatalf("expected rebuilding assignment to execute apply_role + start_recovery_task + start_rebuild, got %v", cmds)
|
||||
}
|
||||
proj, ok := bs.CoreProjection(path)
|
||||
if !ok {
|
||||
t.Fatal("expected core projection")
|
||||
}
|
||||
if proj.Recovery.Phase != engine.RecoveryIdle {
|
||||
t.Fatalf("recovery_phase=%s", proj.Recovery.Phase)
|
||||
}
|
||||
if proj.Mode.Reason == "awaiting_receiver_ready" {
|
||||
t.Fatalf("rebuilding assignment should not be reported as awaiting receiver readiness: %+v", proj.Mode)
|
||||
}
|
||||
sender := bs.v2Orchestrator.Registry.Sender(path + "/vs-test")
|
||||
if sender == nil {
|
||||
t.Fatal("sender not found")
|
||||
}
|
||||
if sender.State() != engine.StateInSync {
|
||||
t.Fatalf("sender state=%s", sender.State())
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockService_ApplyAssignments_RebuildingRole_PreservesLegacyFallbackWithoutCore(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
store := storage.NewBlockVolumeStore()
|
||||
t.Cleanup(func() { store.Close() })
|
||||
bs := &BlockService{
|
||||
blockStore: store,
|
||||
blockDir: dir,
|
||||
listenAddr: "127.0.0.1:3260",
|
||||
iqnPrefix: "iqn.2024-01.com.seaweedfs:vol.",
|
||||
replStates: make(map[string]*volReplState),
|
||||
localServerID: "vs-test",
|
||||
}
|
||||
|
||||
path := createTestVolDirect(t, bs, "vol-legacy-rebuild")
|
||||
legacyCalls := 0
|
||||
legacyCalled := make(chan struct{}, 1)
|
||||
bs.onLegacyStartRebuild = func(path, rebuildAddr string, epoch uint64) {
|
||||
legacyCalls++
|
||||
select {
|
||||
case legacyCalled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
errs := bs.ApplyAssignments([]blockvol.BlockVolumeAssignment{{
|
||||
Path: path,
|
||||
Epoch: 3,
|
||||
Role: blockvol.RoleToWire(blockvol.RoleRebuilding),
|
||||
LeaseTtlMs: 30000,
|
||||
RebuildAddr: "127.0.0.1:15000",
|
||||
}})
|
||||
if len(errs) != 1 || errs[0] != nil {
|
||||
t.Fatalf("apply errs=%v", errs)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-legacyCalled:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("expected legacy direct rebuild to be used without core")
|
||||
}
|
||||
if legacyCalls != 1 {
|
||||
t.Fatalf("legacy direct rebuild calls=%d", legacyCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockService_BarrierRejected_ExecutesCoreInvalidateSession(t *testing.T) {
|
||||
bs := newTestBlockServiceDirect(t)
|
||||
bs.v2Bridge = newTestControlBridge()
|
||||
|
||||
Reference in New Issue
Block a user