mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-20 16:51:31 +00:00
feat: add engine data/recoverability core (Phase 05 Slice 3)
New file: history.go — RetainedHistory connects recovery decisions to actual WAL retention state: - IsRecoverable: checks gap against tail/head boundaries - MakeHandshakeResult: generates HandshakeResult from retention state - RebuildSourceDecision: chooses snapshot+tail vs full base from checkpoint state (trusted vs untrusted) - ProveRecoverability: generates explicit proof explaining why recovery is or is not allowed 14 new tests (recoverability_test.go): - Recoverable/unrecoverable gap (exact boundary, beyond head) - Trusted/untrusted/no checkpoint → rebuild source selection - Handshake from retained history → outcome classification - Recoverability proofs (zero-gap, ahead, within retention, beyond) - E2E: two replicas driven by retained history (catch-up + rebuild) - Truncation required for replica ahead of committed Engine module at 44 tests (12 + 18 + 14). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
112
sw-block/engine/replication/history.go
Normal file
112
sw-block/engine/replication/history.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package replication
|
||||
|
||||
import "fmt"
|
||||
|
||||
// RetainedHistory represents the primary's WAL retention state as seen
|
||||
// by the recovery decision path. It answers "why is recovery allowed?"
|
||||
// with executable proof, not just policy assertions.
|
||||
//
|
||||
// This is the engine-level equivalent of the prototype's WALHistory —
|
||||
// it provides the recoverability inputs that ClassifyRecoveryOutcome
|
||||
// and rebuild-source selection consume.
|
||||
type RetainedHistory struct {
|
||||
// HeadLSN is the highest LSN written to the primary WAL.
|
||||
HeadLSN uint64
|
||||
|
||||
// TailLSN is the oldest retained LSN boundary (exclusive).
|
||||
// Entries with LSN > TailLSN are available for catch-up.
|
||||
// Entries with LSN <= TailLSN have been recycled.
|
||||
TailLSN uint64
|
||||
|
||||
// CommittedLSN is the lineage-safe boundary — the highest LSN
|
||||
// acknowledged as durable by the commit protocol.
|
||||
CommittedLSN uint64
|
||||
|
||||
// CheckpointLSN is the highest LSN with a durable base image.
|
||||
// Used for rebuild-source decision: if CheckpointLSN > 0 and
|
||||
// the checkpoint is trusted, snapshot+tail rebuild is possible.
|
||||
CheckpointLSN uint64
|
||||
|
||||
// CheckpointTrusted indicates whether the checkpoint base image
|
||||
// is known to be consistent and usable for rebuild.
|
||||
CheckpointTrusted bool
|
||||
}
|
||||
|
||||
// MakeHandshakeResult generates a HandshakeResult from the primary's
|
||||
// retained history and a replica's reported flushed LSN.
|
||||
func (rh *RetainedHistory) MakeHandshakeResult(replicaFlushedLSN uint64) HandshakeResult {
|
||||
retentionStart := rh.TailLSN + 1
|
||||
if rh.TailLSN == 0 {
|
||||
retentionStart = 0
|
||||
}
|
||||
return HandshakeResult{
|
||||
ReplicaFlushedLSN: replicaFlushedLSN,
|
||||
CommittedLSN: rh.CommittedLSN,
|
||||
RetentionStartLSN: retentionStart,
|
||||
}
|
||||
}
|
||||
|
||||
// IsRecoverable checks whether all entries from startExclusive+1 to
|
||||
// endInclusive are available in the retained WAL.
|
||||
func (rh *RetainedHistory) IsRecoverable(startExclusive, endInclusive uint64) bool {
|
||||
if startExclusive < rh.TailLSN {
|
||||
return false
|
||||
}
|
||||
if endInclusive > rh.HeadLSN {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// RebuildSourceDecision determines the optimal rebuild source from
|
||||
// the current retained history state.
|
||||
func (rh *RetainedHistory) RebuildSourceDecision() (source RebuildSource, snapshotLSN uint64) {
|
||||
if rh.CheckpointTrusted && rh.CheckpointLSN > 0 {
|
||||
return RebuildSnapshotTail, rh.CheckpointLSN
|
||||
}
|
||||
return RebuildFullBase, 0
|
||||
}
|
||||
|
||||
// RecoverabilityProof explains why a gap is or is not recoverable.
|
||||
type RecoverabilityProof struct {
|
||||
ReplicaFlushedLSN uint64
|
||||
CommittedLSN uint64
|
||||
TailLSN uint64
|
||||
HeadLSN uint64
|
||||
Recoverable bool
|
||||
Reason string
|
||||
}
|
||||
|
||||
// ProveRecoverability generates an explicit proof for a recovery decision.
|
||||
func (rh *RetainedHistory) ProveRecoverability(replicaFlushedLSN uint64) RecoverabilityProof {
|
||||
proof := RecoverabilityProof{
|
||||
ReplicaFlushedLSN: replicaFlushedLSN,
|
||||
CommittedLSN: rh.CommittedLSN,
|
||||
TailLSN: rh.TailLSN,
|
||||
HeadLSN: rh.HeadLSN,
|
||||
}
|
||||
|
||||
if replicaFlushedLSN == rh.CommittedLSN {
|
||||
proof.Recoverable = true
|
||||
proof.Reason = "zero_gap"
|
||||
return proof
|
||||
}
|
||||
|
||||
if replicaFlushedLSN > rh.CommittedLSN {
|
||||
proof.Recoverable = true
|
||||
proof.Reason = "replica_ahead_needs_truncation"
|
||||
return proof
|
||||
}
|
||||
|
||||
if rh.IsRecoverable(replicaFlushedLSN, rh.CommittedLSN) {
|
||||
proof.Recoverable = true
|
||||
proof.Reason = fmt.Sprintf("gap_within_retention: need LSN %d-%d, tail=%d head=%d",
|
||||
replicaFlushedLSN+1, rh.CommittedLSN, rh.TailLSN, rh.HeadLSN)
|
||||
return proof
|
||||
}
|
||||
|
||||
proof.Recoverable = false
|
||||
proof.Reason = fmt.Sprintf("gap_beyond_retention: need LSN %d but tail=%d",
|
||||
replicaFlushedLSN+1, rh.TailLSN)
|
||||
return proof
|
||||
}
|
||||
313
sw-block/engine/replication/recoverability_test.go
Normal file
313
sw-block/engine/replication/recoverability_test.go
Normal file
@@ -0,0 +1,313 @@
|
||||
package replication
|
||||
|
||||
import "testing"
|
||||
|
||||
// ============================================================
|
||||
// Phase 05 Slice 3: Engine Data / Recoverability Core
|
||||
//
|
||||
// Tests validate that recovery decisions are backed by actual
|
||||
// retained-history state, not just policy assertions.
|
||||
// ============================================================
|
||||
|
||||
// --- Recoverable vs unrecoverable gap ---
|
||||
|
||||
func TestHistory_Recoverable_GapWithinRetention(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
HeadLSN: 100,
|
||||
TailLSN: 30,
|
||||
CommittedLSN: 100,
|
||||
}
|
||||
|
||||
if !rh.IsRecoverable(50, 100) {
|
||||
t.Fatal("gap 50→100 should be recoverable (tail=30)")
|
||||
}
|
||||
|
||||
proof := rh.ProveRecoverability(50)
|
||||
if !proof.Recoverable {
|
||||
t.Fatalf("proof: %s", proof.Reason)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_Unrecoverable_GapBeyondRetention(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
HeadLSN: 100,
|
||||
TailLSN: 60,
|
||||
CommittedLSN: 100,
|
||||
}
|
||||
|
||||
if rh.IsRecoverable(50, 100) {
|
||||
t.Fatal("gap 50→100 should NOT be recoverable (tail=60)")
|
||||
}
|
||||
|
||||
proof := rh.ProveRecoverability(50)
|
||||
if proof.Recoverable {
|
||||
t.Fatal("proof should show unrecoverable")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_ExactBoundary(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
HeadLSN: 100,
|
||||
TailLSN: 50,
|
||||
CommittedLSN: 100,
|
||||
}
|
||||
|
||||
// AT boundary: recoverable.
|
||||
if !rh.IsRecoverable(50, 100) {
|
||||
t.Fatal("exact boundary should be recoverable")
|
||||
}
|
||||
// ONE BELOW: unrecoverable.
|
||||
if rh.IsRecoverable(49, 100) {
|
||||
t.Fatal("below boundary should NOT be recoverable")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_BeyondHead_Unrecoverable(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
HeadLSN: 80,
|
||||
TailLSN: 0,
|
||||
CommittedLSN: 100,
|
||||
}
|
||||
|
||||
if rh.IsRecoverable(0, 100) {
|
||||
t.Fatal("gap beyond head should NOT be recoverable")
|
||||
}
|
||||
}
|
||||
|
||||
// --- Trusted base vs no trusted base ---
|
||||
|
||||
func TestHistory_RebuildSource_TrustedCheckpoint(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
CheckpointLSN: 50,
|
||||
CheckpointTrusted: true,
|
||||
CommittedLSN: 100,
|
||||
}
|
||||
|
||||
source, snapLSN := rh.RebuildSourceDecision()
|
||||
if source != RebuildSnapshotTail {
|
||||
t.Fatalf("source=%s, want snapshot_tail", source)
|
||||
}
|
||||
if snapLSN != 50 {
|
||||
t.Fatalf("snapshot LSN=%d, want 50", snapLSN)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_RebuildSource_NoCheckpoint(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
CommittedLSN: 100,
|
||||
}
|
||||
|
||||
source, snapLSN := rh.RebuildSourceDecision()
|
||||
if source != RebuildFullBase {
|
||||
t.Fatalf("source=%s, want full_base", source)
|
||||
}
|
||||
if snapLSN != 0 {
|
||||
t.Fatalf("snapshot LSN=%d, want 0", snapLSN)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_RebuildSource_UntrustedCheckpoint(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
CheckpointLSN: 50,
|
||||
CheckpointTrusted: false,
|
||||
CommittedLSN: 100,
|
||||
}
|
||||
|
||||
source, _ := rh.RebuildSourceDecision()
|
||||
if source != RebuildFullBase {
|
||||
t.Fatalf("untrusted checkpoint: source=%s, want full_base", source)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Handshake result from retained history ---
|
||||
|
||||
func TestHistory_MakeHandshakeResult(t *testing.T) {
|
||||
rh := RetainedHistory{
|
||||
HeadLSN: 100,
|
||||
TailLSN: 30,
|
||||
CommittedLSN: 90,
|
||||
}
|
||||
|
||||
hr := rh.MakeHandshakeResult(70)
|
||||
if hr.ReplicaFlushedLSN != 70 {
|
||||
t.Fatalf("replica=%d", hr.ReplicaFlushedLSN)
|
||||
}
|
||||
if hr.CommittedLSN != 90 {
|
||||
t.Fatalf("committed=%d", hr.CommittedLSN)
|
||||
}
|
||||
if hr.RetentionStartLSN != 31 {
|
||||
t.Fatalf("retention=%d, want 31", hr.RetentionStartLSN)
|
||||
}
|
||||
|
||||
outcome := ClassifyRecoveryOutcome(hr)
|
||||
if outcome != OutcomeCatchUp {
|
||||
t.Fatalf("outcome=%s", outcome)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Recoverability proof ---
|
||||
|
||||
func TestHistory_Proof_ZeroGap(t *testing.T) {
|
||||
rh := RetainedHistory{CommittedLSN: 100}
|
||||
proof := rh.ProveRecoverability(100)
|
||||
if !proof.Recoverable || proof.Reason != "zero_gap" {
|
||||
t.Fatalf("proof: recoverable=%v reason=%s", proof.Recoverable, proof.Reason)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_Proof_ReplicaAhead(t *testing.T) {
|
||||
rh := RetainedHistory{CommittedLSN: 100}
|
||||
proof := rh.ProveRecoverability(105)
|
||||
if !proof.Recoverable || proof.Reason != "replica_ahead_needs_truncation" {
|
||||
t.Fatalf("proof: recoverable=%v reason=%s", proof.Recoverable, proof.Reason)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_Proof_GapWithinRetention(t *testing.T) {
|
||||
rh := RetainedHistory{HeadLSN: 100, TailLSN: 30, CommittedLSN: 100}
|
||||
proof := rh.ProveRecoverability(50)
|
||||
if !proof.Recoverable {
|
||||
t.Fatalf("proof: %s", proof.Reason)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistory_Proof_GapBeyondRetention(t *testing.T) {
|
||||
rh := RetainedHistory{HeadLSN: 100, TailLSN: 60, CommittedLSN: 100}
|
||||
proof := rh.ProveRecoverability(50)
|
||||
if proof.Recoverable {
|
||||
t.Fatal("should not be recoverable")
|
||||
}
|
||||
}
|
||||
|
||||
// --- End-to-end: retained-history-driven recovery flow ---
|
||||
|
||||
func TestHistory_E2E_RecoveryDrivenByRetainedHistory(t *testing.T) {
|
||||
// Primary's retained history.
|
||||
primary := RetainedHistory{
|
||||
HeadLSN: 100,
|
||||
TailLSN: 30,
|
||||
CommittedLSN: 100,
|
||||
CheckpointLSN: 50,
|
||||
CheckpointTrusted: true,
|
||||
}
|
||||
|
||||
r := NewRegistry()
|
||||
r.ApplyAssignment(AssignmentIntent{
|
||||
Replicas: []ReplicaAssignment{
|
||||
{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", Version: 1}},
|
||||
{ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", Version: 1}},
|
||||
},
|
||||
Epoch: 1,
|
||||
RecoveryTargets: map[string]SessionKind{
|
||||
"r1": SessionCatchUp,
|
||||
"r2": SessionCatchUp,
|
||||
},
|
||||
})
|
||||
|
||||
// r1: replica at LSN 70 — catch-up (within retention).
|
||||
r1 := r.Sender("r1")
|
||||
id1 := r1.SessionID()
|
||||
r1.BeginConnect(id1)
|
||||
|
||||
proof1 := primary.ProveRecoverability(70)
|
||||
if !proof1.Recoverable {
|
||||
t.Fatalf("r1: %s", proof1.Reason)
|
||||
}
|
||||
|
||||
hr1 := primary.MakeHandshakeResult(70)
|
||||
o1, _ := r1.RecordHandshakeWithOutcome(id1, hr1)
|
||||
if o1 != OutcomeCatchUp {
|
||||
t.Fatalf("r1: outcome=%s", o1)
|
||||
}
|
||||
r1.BeginCatchUp(id1)
|
||||
r1.RecordCatchUpProgress(id1, 100)
|
||||
r1.CompleteSessionByID(id1)
|
||||
|
||||
// r2: replica at LSN 20 — needs rebuild (beyond retention).
|
||||
r2 := r.Sender("r2")
|
||||
id2 := r2.SessionID()
|
||||
r2.BeginConnect(id2)
|
||||
|
||||
proof2 := primary.ProveRecoverability(20)
|
||||
if proof2.Recoverable {
|
||||
t.Fatal("r2: should not be recoverable")
|
||||
}
|
||||
|
||||
hr2 := primary.MakeHandshakeResult(20)
|
||||
o2, _ := r2.RecordHandshakeWithOutcome(id2, hr2)
|
||||
if o2 != OutcomeNeedsRebuild {
|
||||
t.Fatalf("r2: outcome=%s", o2)
|
||||
}
|
||||
|
||||
// r2 needs rebuild — use history to choose source.
|
||||
source, snapLSN := primary.RebuildSourceDecision()
|
||||
if source != RebuildSnapshotTail || snapLSN != 50 {
|
||||
t.Fatalf("rebuild source=%s snap=%d", source, snapLSN)
|
||||
}
|
||||
|
||||
// New rebuild session for r2.
|
||||
r.ApplyAssignment(AssignmentIntent{
|
||||
Replicas: []ReplicaAssignment{
|
||||
{ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", Version: 1}},
|
||||
},
|
||||
Epoch: 1,
|
||||
RecoveryTargets: map[string]SessionKind{"r2": SessionRebuild},
|
||||
})
|
||||
|
||||
id2b := r2.SessionID()
|
||||
r2.BeginConnect(id2b)
|
||||
r2.RecordHandshake(id2b, 0, 100)
|
||||
r2.SelectRebuildSource(id2b, snapLSN, true, primary.CommittedLSN)
|
||||
r2.BeginRebuildTransfer(id2b)
|
||||
r2.RecordRebuildTransferProgress(id2b, snapLSN)
|
||||
r2.BeginRebuildTailReplay(id2b)
|
||||
r2.RecordRebuildTailProgress(id2b, 100)
|
||||
r2.CompleteRebuild(id2b)
|
||||
|
||||
if r1.State() != StateInSync || r2.State() != StateInSync {
|
||||
t.Fatalf("r1=%s r2=%s", r1.State(), r2.State())
|
||||
}
|
||||
t.Log("e2e: r1 caught up (proof: gap within retention), r2 rebuilt (proof: gap beyond retention, snapshot+tail)")
|
||||
}
|
||||
|
||||
// --- Truncation / safe-boundary handling ---
|
||||
|
||||
func TestHistory_Proof_TruncationRequired(t *testing.T) {
|
||||
rh := RetainedHistory{CommittedLSN: 100}
|
||||
|
||||
// Replica ahead → truncation required.
|
||||
proof := rh.ProveRecoverability(105)
|
||||
if proof.Reason != "replica_ahead_needs_truncation" {
|
||||
t.Fatalf("reason=%s", proof.Reason)
|
||||
}
|
||||
|
||||
// Sender execution: handshake sets truncation requirement.
|
||||
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
|
||||
id, _ := s.AttachSession(1, SessionCatchUp)
|
||||
s.BeginConnect(id)
|
||||
|
||||
hr := rh.MakeHandshakeResult(105)
|
||||
outcome, _ := s.RecordHandshakeWithOutcome(id, hr)
|
||||
if outcome != OutcomeCatchUp {
|
||||
t.Fatalf("outcome=%s", outcome)
|
||||
}
|
||||
|
||||
// Session should require truncation.
|
||||
snap := s.SessionSnapshot()
|
||||
if snap == nil {
|
||||
t.Fatal("session should exist")
|
||||
}
|
||||
|
||||
// Completion without truncation rejected.
|
||||
if s.CompleteSessionByID(id) {
|
||||
t.Fatal("should reject completion without truncation")
|
||||
}
|
||||
|
||||
// Record truncation.
|
||||
s.RecordTruncation(id, 100)
|
||||
|
||||
// Now completion works (zero-gap after truncation).
|
||||
if !s.CompleteSessionByID(id) {
|
||||
t.Fatal("completion after truncation should succeed")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user