From 8e4028758ff693fb30ffd854053eab1ecbda5d50 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Sun, 29 Mar 2026 15:21:39 -0700 Subject: [PATCH] fix: make rebuild path exclusive, enforce phase discipline, require tick for stall budget Rebuild exclusivity: - BeginCatchUp rejects SessionRebuild ("must use rebuild APIs") - RecordCatchUpProgress rejects SessionRebuild - Rebuild sessions can only be completed via CompleteRebuild - All legacy rebuild-through-catch-up paths in tests converted Phase discipline: - SelectRebuildSource requires session.Phase == PhaseHandshake - Cannot skip BeginConnect + RecordHandshake Stall budget: - RecordCatchUpProgress requires tick parameter when ProgressDeadlineTicks > 0 (no silent stall budget bypass) 3 new tests: rebuild exclusivity (catch-up APIs rejected), rebuild source requires handshake phase, stall budget requires tick. Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/prototype/enginev2/p2_test.go | 13 ++-- sw-block/prototype/enginev2/phase45_test.go | 63 ++++++++++++++++++++ sw-block/prototype/enginev2/scenario_test.go | 7 ++- sw-block/prototype/enginev2/sender.go | 15 ++++- 4 files changed, 88 insertions(+), 10 deletions(-) diff --git a/sw-block/prototype/enginev2/p2_test.go b/sw-block/prototype/enginev2/p2_test.go index fc7e81cec..c6f5e0af6 100644 --- a/sw-block/prototype/enginev2/p2_test.go +++ b/sw-block/prototype/enginev2/p2_test.go @@ -379,14 +379,15 @@ func TestE2E_NeedsRebuild_Escalation(t *testing.T) { t.Fatal("should have rebuild session") } - // Step 5: Execute rebuild recovery (simulated). + // Step 5: Execute rebuild recovery via rebuild APIs. 
r1.BeginConnect(rebuildSess.ID) - r1.RecordHandshake(rebuildSess.ID, 0, 100) // full rebuild range - r1.BeginCatchUp(rebuildSess.ID) - r1.RecordCatchUpProgress(rebuildSess.ID, 100) + r1.RecordHandshake(rebuildSess.ID, 0, 100) + r1.SelectRebuildSource(rebuildSess.ID, 0, false, 100) // full base + r1.BeginRebuildTransfer(rebuildSess.ID) + r1.RecordRebuildTransferProgress(rebuildSess.ID, 100) - if !r1.CompleteSessionByID(rebuildSess.ID) { - t.Fatal("rebuild completion should succeed") + if err := r1.CompleteRebuild(rebuildSess.ID); err != nil { + t.Fatalf("rebuild completion: %v", err) } if r1.State != StateInSync { t.Fatalf("after rebuild: state=%s, want in_sync", r1.State) diff --git a/sw-block/prototype/enginev2/phase45_test.go b/sw-block/prototype/enginev2/phase45_test.go index 9bb145ed7..3e0e96430 100644 --- a/sw-block/prototype/enginev2/phase45_test.go +++ b/sw-block/prototype/enginev2/phase45_test.go @@ -347,6 +347,69 @@ func TestSender_RebuildAPIs_RejectStaleID(t *testing.T) { } } +// --- Rebuild exclusivity --- + +func TestSender_RebuildExclusive_CatchUpAPIsRejected(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionRebuild) + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + + // BeginCatchUp rejects rebuild session. + if err := s.BeginCatchUp(sess.ID); err == nil { + t.Fatal("BeginCatchUp should reject rebuild session") + } + + // RecordCatchUpProgress rejects rebuild session (even if we could somehow reach catchup phase). + if err := s.RecordCatchUpProgress(sess.ID, 50); err == nil { + t.Fatal("RecordCatchUpProgress should reject rebuild session") + } + + // CompleteSessionByID also rejects a rebuild session (no catch-up convergence is possible for rebuild). 
+ if s.CompleteSessionByID(sess.ID) { + t.Fatal("catch-up completion should not work for rebuild session") + } +} + +func TestSender_RebuildSourceSelect_RequiresHandshakePhase(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionRebuild) + + // Skip BeginConnect + RecordHandshake → should fail at SelectRebuildSource. + if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err == nil { + t.Fatal("source select should require PhaseHandshake") + } + + // After proper phase entry. + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + + // Now it works. + if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err != nil { + t.Fatalf("source select after handshake: %v", err) + } +} + +func TestSender_StallBudget_RequiresTick(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + sess.Budget = &CatchUpBudget{ProgressDeadlineTicks: 5} + + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + s.BeginCatchUp(sess.ID, 0) + + // Without tick, progress is rejected when stall budget is configured. + if err := s.RecordCatchUpProgress(sess.ID, 10); err == nil { + t.Fatal("progress without tick should be rejected when ProgressDeadlineTicks > 0") + } + + // With tick, progress works. 
+ if err := s.RecordCatchUpProgress(sess.ID, 10, 1); err != nil { + t.Fatalf("progress with tick: %v", err) + } +} + // --- E2E: Bounded catch-up → budget exceeded → rebuild --- func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) { diff --git a/sw-block/prototype/enginev2/scenario_test.go b/sw-block/prototype/enginev2/scenario_test.go index 2be285ff9..5fd9afd75 100644 --- a/sw-block/prototype/enginev2/scenario_test.go +++ b/sw-block/prototype/enginev2/scenario_test.go @@ -363,9 +363,10 @@ func TestBoundary_NeedsRebuild_PersistsAcrossUpdate(t *testing.T) { rebuildSess := r1.Session() r1.BeginConnect(rebuildSess.ID) r1.RecordHandshake(rebuildSess.ID, 0, 100) - r1.BeginCatchUp(rebuildSess.ID) - r1.RecordCatchUpProgress(rebuildSess.ID, 100) - r1.CompleteSessionByID(rebuildSess.ID) + r1.SelectRebuildSource(rebuildSess.ID, 0, false, 100) // full base + r1.BeginRebuildTransfer(rebuildSess.ID) + r1.RecordRebuildTransferProgress(rebuildSess.ID, 100) + r1.CompleteRebuild(rebuildSess.ID) if r1.State != StateInSync { t.Fatalf("after rebuild: state=%s", r1.State) diff --git a/sw-block/prototype/enginev2/sender.go b/sw-block/prototype/enginev2/sender.go index 3b1d92da7..a5b636569 100644 --- a/sw-block/prototype/enginev2/sender.go +++ b/sw-block/prototype/enginev2/sender.go @@ -296,13 +296,16 @@ func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error // Freezes TargetLSNAtStart from the session's TargetLSN — catch-up will not // chase a moving head beyond this boundary. // Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp. -// Rejects: wrong sessionID, wrong phase. +// Rejects: wrong sessionID, wrong phase, SessionRebuild (must use rebuild APIs). 
func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error { s.mu.Lock() defer s.mu.Unlock() if err := s.checkSessionAuthority(sessionID); err != nil { return err } + if s.session.Kind == SessionRebuild { + return fmt.Errorf("rebuild sessions must use rebuild APIs, not catch-up") + } if !s.session.Advance(PhaseCatchUp) { return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase) } @@ -330,6 +333,9 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64, tic if err := s.checkSessionAuthority(sessionID); err != nil { return err } + if s.session.Kind == SessionRebuild { + return fmt.Errorf("rebuild sessions must use rebuild APIs, not catch-up progress") + } if s.session.Phase != PhaseCatchUp { return fmt.Errorf("cannot record progress: session phase=%s, want catchup", s.session.Phase) } @@ -342,6 +348,10 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64, tic return fmt.Errorf("progress %d exceeds frozen target %d", recoveredTo, s.session.Budget.TargetLSNAtStart) } + // Tick is mandatory when ProgressDeadlineTicks is configured. + if s.session.Budget != nil && s.session.Budget.ProgressDeadlineTicks > 0 && len(tick) == 0 { + return fmt.Errorf("tick is required when ProgressDeadlineTicks is configured") + } // Entry counting by LSN delta, not call count. delta := recoveredTo - s.session.RecoveredTo s.session.Tracker.EntriesReplayed += delta @@ -387,6 +397,9 @@ func (s *Sender) SelectRebuildSource(sessionID uint64, snapshotLSN uint64, snaps if s.session.Kind != SessionRebuild { return fmt.Errorf("not a rebuild session (kind=%s)", s.session.Kind) } + if s.session.Phase != PhaseHandshake { + return fmt.Errorf("rebuild source select requires PhaseHandshake, got %s", s.session.Phase) + } if s.session.Rebuild == nil { return fmt.Errorf("rebuild state not initialized") }