mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-22 01:31:34 +00:00
fix: make rebuild path exclusive, enforce phase discipline, require tick for stall budget
Rebuild exclusivity:
- BeginCatchUp rejects SessionRebuild ("must use rebuild APIs")
- RecordCatchUpProgress rejects SessionRebuild
- Rebuild sessions can only be completed via CompleteRebuild
- All legacy rebuild-through-catch-up paths in tests converted
Phase discipline:
- SelectRebuildSource requires session.Phase == PhaseHandshake
- Cannot skip BeginConnect + RecordHandshake
Stall budget:
- RecordCatchUpProgress requires tick parameter when
ProgressDeadlineTicks > 0 (no silent stall budget bypass)
3 new tests: rebuild exclusivity (catch-up APIs rejected),
rebuild source requires handshake phase, stall budget requires tick.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -379,14 +379,15 @@ func TestE2E_NeedsRebuild_Escalation(t *testing.T) {
|
||||
t.Fatal("should have rebuild session")
|
||||
}
|
||||
|
||||
// Step 5: Execute rebuild recovery (simulated).
|
||||
// Step 5: Execute rebuild recovery via rebuild APIs.
|
||||
r1.BeginConnect(rebuildSess.ID)
|
||||
r1.RecordHandshake(rebuildSess.ID, 0, 100) // full rebuild range
|
||||
r1.BeginCatchUp(rebuildSess.ID)
|
||||
r1.RecordCatchUpProgress(rebuildSess.ID, 100)
|
||||
r1.RecordHandshake(rebuildSess.ID, 0, 100)
|
||||
r1.SelectRebuildSource(rebuildSess.ID, 0, false, 100) // full base
|
||||
r1.BeginRebuildTransfer(rebuildSess.ID)
|
||||
r1.RecordRebuildTransferProgress(rebuildSess.ID, 100)
|
||||
|
||||
if !r1.CompleteSessionByID(rebuildSess.ID) {
|
||||
t.Fatal("rebuild completion should succeed")
|
||||
if err := r1.CompleteRebuild(rebuildSess.ID); err != nil {
|
||||
t.Fatalf("rebuild completion: %v", err)
|
||||
}
|
||||
if r1.State != StateInSync {
|
||||
t.Fatalf("after rebuild: state=%s, want in_sync", r1.State)
|
||||
|
||||
@@ -347,6 +347,69 @@ func TestSender_RebuildAPIs_RejectStaleID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// --- Rebuild exclusivity ---
|
||||
|
||||
func TestSender_RebuildExclusive_CatchUpAPIsRejected(t *testing.T) {
|
||||
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
|
||||
sess, _ := s.AttachSession(1, SessionRebuild)
|
||||
s.BeginConnect(sess.ID)
|
||||
s.RecordHandshake(sess.ID, 0, 100)
|
||||
|
||||
// BeginCatchUp rejects rebuild session.
|
||||
if err := s.BeginCatchUp(sess.ID); err == nil {
|
||||
t.Fatal("BeginCatchUp should reject rebuild session")
|
||||
}
|
||||
|
||||
// RecordCatchUpProgress rejects rebuild session (even if we could somehow reach catchup phase).
|
||||
if err := s.RecordCatchUpProgress(sess.ID, 50); err == nil {
|
||||
t.Fatal("RecordCatchUpProgress should reject rebuild session")
|
||||
}
|
||||
|
||||
// CompleteSessionByID rejects (no catch-up convergence possible for rebuild).
|
||||
if s.CompleteSessionByID(sess.ID) {
|
||||
t.Fatal("catch-up completion should not work for rebuild session")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSender_RebuildSourceSelect_RequiresHandshakePhase(t *testing.T) {
|
||||
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
|
||||
sess, _ := s.AttachSession(1, SessionRebuild)
|
||||
|
||||
// Skip BeginConnect + RecordHandshake → should fail at SelectRebuildSource.
|
||||
if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err == nil {
|
||||
t.Fatal("source select should require PhaseHandshake")
|
||||
}
|
||||
|
||||
// After proper phase entry.
|
||||
s.BeginConnect(sess.ID)
|
||||
s.RecordHandshake(sess.ID, 0, 100)
|
||||
|
||||
// Now it works.
|
||||
if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err != nil {
|
||||
t.Fatalf("source select after handshake: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSender_StallBudget_RequiresTick(t *testing.T) {
|
||||
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
|
||||
sess, _ := s.AttachSession(1, SessionCatchUp)
|
||||
sess.Budget = &CatchUpBudget{ProgressDeadlineTicks: 5}
|
||||
|
||||
s.BeginConnect(sess.ID)
|
||||
s.RecordHandshake(sess.ID, 0, 100)
|
||||
s.BeginCatchUp(sess.ID, 0)
|
||||
|
||||
// Without tick, progress is rejected when stall budget is configured.
|
||||
if err := s.RecordCatchUpProgress(sess.ID, 10); err == nil {
|
||||
t.Fatal("progress without tick should be rejected when ProgressDeadlineTicks > 0")
|
||||
}
|
||||
|
||||
// With tick, progress works.
|
||||
if err := s.RecordCatchUpProgress(sess.ID, 10, 1); err != nil {
|
||||
t.Fatalf("progress with tick: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- E2E: Bounded catch-up → budget exceeded → rebuild ---
|
||||
|
||||
func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) {
|
||||
|
||||
@@ -363,9 +363,10 @@ func TestBoundary_NeedsRebuild_PersistsAcrossUpdate(t *testing.T) {
|
||||
rebuildSess := r1.Session()
|
||||
r1.BeginConnect(rebuildSess.ID)
|
||||
r1.RecordHandshake(rebuildSess.ID, 0, 100)
|
||||
r1.BeginCatchUp(rebuildSess.ID)
|
||||
r1.RecordCatchUpProgress(rebuildSess.ID, 100)
|
||||
r1.CompleteSessionByID(rebuildSess.ID)
|
||||
r1.SelectRebuildSource(rebuildSess.ID, 0, false, 100) // full base
|
||||
r1.BeginRebuildTransfer(rebuildSess.ID)
|
||||
r1.RecordRebuildTransferProgress(rebuildSess.ID, 100)
|
||||
r1.CompleteRebuild(rebuildSess.ID)
|
||||
|
||||
if r1.State != StateInSync {
|
||||
t.Fatalf("after rebuild: state=%s", r1.State)
|
||||
|
||||
@@ -296,13 +296,16 @@ func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error
|
||||
// Freezes TargetLSNAtStart from the session's TargetLSN — catch-up will not
|
||||
// chase a moving head beyond this boundary.
|
||||
// Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp.
|
||||
// Rejects: wrong sessionID, wrong phase.
|
||||
// Rejects: wrong sessionID, wrong phase, SessionRebuild (must use rebuild APIs).
|
||||
func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if err := s.checkSessionAuthority(sessionID); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.session.Kind == SessionRebuild {
|
||||
return fmt.Errorf("rebuild sessions must use rebuild APIs, not catch-up")
|
||||
}
|
||||
if !s.session.Advance(PhaseCatchUp) {
|
||||
return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase)
|
||||
}
|
||||
@@ -330,6 +333,9 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64, tic
|
||||
if err := s.checkSessionAuthority(sessionID); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.session.Kind == SessionRebuild {
|
||||
return fmt.Errorf("rebuild sessions must use rebuild APIs, not catch-up progress")
|
||||
}
|
||||
if s.session.Phase != PhaseCatchUp {
|
||||
return fmt.Errorf("cannot record progress: session phase=%s, want catchup", s.session.Phase)
|
||||
}
|
||||
@@ -342,6 +348,10 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64, tic
|
||||
return fmt.Errorf("progress %d exceeds frozen target %d",
|
||||
recoveredTo, s.session.Budget.TargetLSNAtStart)
|
||||
}
|
||||
// Tick is mandatory when ProgressDeadlineTicks is configured.
|
||||
if s.session.Budget != nil && s.session.Budget.ProgressDeadlineTicks > 0 && len(tick) == 0 {
|
||||
return fmt.Errorf("tick is required when ProgressDeadlineTicks is configured")
|
||||
}
|
||||
// Entry counting by LSN delta, not call count.
|
||||
delta := recoveredTo - s.session.RecoveredTo
|
||||
s.session.Tracker.EntriesReplayed += delta
|
||||
@@ -387,6 +397,9 @@ func (s *Sender) SelectRebuildSource(sessionID uint64, snapshotLSN uint64, snaps
|
||||
if s.session.Kind != SessionRebuild {
|
||||
return fmt.Errorf("not a rebuild session (kind=%s)", s.session.Kind)
|
||||
}
|
||||
if s.session.Phase != PhaseHandshake {
|
||||
return fmt.Errorf("rebuild source select requires PhaseHandshake, got %s", s.session.Phase)
|
||||
}
|
||||
if s.session.Rebuild == nil {
|
||||
return fmt.Errorf("rebuild state not initialized")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user