diff --git a/sw-block/.private/phase/phase-13-cp6-retention.md b/sw-block/.private/phase/phase-13-cp6-retention.md new file mode 100644 index 000000000..4146d31bb --- /dev/null +++ b/sw-block/.private/phase/phase-13-cp6-retention.md @@ -0,0 +1,71 @@ +# CP13-6 Replica-Aware WAL Retention — Contract Review + Proof Package + +Date: 2026-04-03 +Code change: `shipper_group.go` EvaluateRetentionBudgets + `blockvol.go` caller updates + test fix + +## Retention Contract + +### Inputs + +| Input | Source | How it's used | +|-------|--------|---------------| +| `replicaFlushedLSN` | Barrier response (CP13-3 authority) | Retention floor: WAL must keep entries from this LSN forward | +| `primaryHeadLSN` | `nextLSN.Load() - 1` | Lag calculation: head - replicaFlushed = entries the replica still needs | +| `lastContactTime` | Barrier/handshake success time | Timeout budget: how long since the replica was heard from | + +### Decision matrix + +| Condition | Action | +|-----------|--------| +| Recoverable replica needs WAL entries | Hold: flusher does not advance tail past `minRecoverableFlushedLSN` | +| Replica last contact exceeds `walRetentionTimeout` (5min) | Escalate to `NeedsRebuild`, release hold | +| Replica lag exceeds `walRetentionMaxBytes` (64MB default) | Escalate to `NeedsRebuild`, release hold | +| Replica in `NeedsRebuild` | Excluded from retention floor (`MinRecoverableFlushedLSN` skips it) | +| No recoverable replicas | No retention hold (flusher advances freely) | + +### Code path + +``` +Flusher.FlushOnce() + ├─ EvaluateRetentionBudgetsFn() → shipper_group.EvaluateRetentionBudgets(timeout, maxBytes, primaryHead) + │ ├─ for each recoverable shipper: + │ │ ├─ timeout exceeded? → state.Store(NeedsRebuild) + │ │ └─ lag * 4KB > maxBytes? → state.Store(NeedsRebuild) + │ └─ NeedsRebuild shippers excluded from future floor computation + ├─ RetentionFloorFn() → shipper_group.MinRecoverableFlushedLSN() + │ └─ returns min flushedLSN of non-NeedsRebuild shippers with prior progress + └─ if maxLSN > floorLSN: hold WAL (don't advance tail) + else: advance tail normally +``` + +## What Changed + +**`shipper_group.go`:** `EvaluateRetentionBudgets` now takes `maxBytes` and `primaryHeadLSN` params. +Checks each recoverable replica: if `(primaryHeadLSN - replicaFlushedLSN) * 4KB > maxBytes`, +transitions to `NeedsRebuild`. This is a real state effect, not a log-only placeholder. + +**`blockvol.go`:** Callers updated to pass `walRetentionMaxBytes` (64MB) and `v.nextLSN.Load()-1`. + +**`sync_all_protocol_test.go`:** `TestWalRetention_MaxBytesTriggersNeedsRebuild` rewritten from +PASS* (log-only) to real PASS: asserts `s.State() == ReplicaNeedsRebuild` after max-bytes exceeded. + +## Baseline PASS* Now Closed + +| Test | Was | Now | Why | +|------|-----|-----|-----| +| `TestWalRetention_MaxBytesTriggersNeedsRebuild` | PASS* (logged "not implemented") | PASS (asserts NeedsRebuild) | Max-bytes budget triggers real state transition | + +## Proof Promotion + +### Primary proofs + +| Test | What it proves | +|------|---------------| +| `TestWalRetention_RequiredReplicaBlocksReclaim` | Recoverable replica blocks WAL reclaim | +| `TestWalRetention_TimeoutTriggersNeedsRebuild` | Timeout budget → NeedsRebuild (releases hold) | +| `TestWalRetention_MaxBytesTriggersNeedsRebuild` | Max-bytes budget → NeedsRebuild (real state effect) | + +## What CP13-6 Does NOT Close + +- Full NeedsRebuild lifecycle / rebuild execution (CP13-7) +- `TestAdversarial_NeedsRebuildBlocksAllPaths` still FAIL (CP13-7) diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index 667d2bd66..d7d5fa8ed 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -35,9 +35,15 @@ var ErrVolumeClosed = errors.New("blockvol: volume closed") // BlockVol is the core block volume engine. // walRetentionTimeout is the maximum time a recoverable replica can hold // WAL entries. After this, the replica is escalated to NeedsRebuild and -// its WAL hold is released. CP13-6: timeout-only budget (max-bytes deferred). +// its WAL hold is released. const walRetentionTimeout = 5 * time.Minute +// walRetentionMaxBytes is the maximum WAL bytes a recoverable replica can +// hold before being escalated to NeedsRebuild. When primaryHeadLSN - +// replicaFlushedLSN exceeds this (measured in WAL byte lag), the replica's +// hold is released. CP13-6: bounded max-bytes retention budget. +const walRetentionMaxBytes uint64 = 64 * 1024 * 1024 // 64MB default + type BlockVol struct { mu sync.RWMutex ioMu sync.RWMutex // guards local data mutation (WAL/dirtyMap/extent); Lock for restore/import/expand @@ -195,7 +201,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B }, EvaluateRetentionBudgetsFn: func() { if v.shipperGroup != nil { - v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout) + v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout, walRetentionMaxBytes, v.nextLSN.Load()-1) } }, }) @@ -318,7 +324,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { }, EvaluateRetentionBudgetsFn: func() { if v.shipperGroup != nil { - v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout) + v.shipperGroup.EvaluateRetentionBudgets(walRetentionTimeout, walRetentionMaxBytes, v.nextLSN.Load()-1) } }, }) diff --git a/weed/storage/blockvol/shipper_group.go b/weed/storage/blockvol/shipper_group.go index 0ab34f6f0..c6fc81c06 100644 --- a/weed/storage/blockvol/shipper_group.go +++ b/weed/storage/blockvol/shipper_group.go @@ -175,11 +175,13 @@ func (sg *ShipperGroup) MinRecoverableFlushedLSN() (uint64, bool) { return min, found } -// EvaluateRetentionBudgets checks each recoverable replica's contact time -// against the timeout. Replicas that exceed the timeout are transitioned to -// NeedsRebuild, releasing their WAL hold. Must be called before +// EvaluateRetentionBudgets checks each recoverable replica against timeout +// and max-bytes budgets. Replicas that exceed either budget are transitioned +// to NeedsRebuild, releasing their WAL hold. Must be called before // MinRecoverableFlushedLSN to ensure stale replicas are escalated first. -func (sg *ShipperGroup) EvaluateRetentionBudgets(timeout time.Duration) { +// +// CP13-6: both timeout and max-bytes budgets have real state effects. +func (sg *ShipperGroup) EvaluateRetentionBudgets(timeout time.Duration, maxBytes uint64, primaryHeadLSN uint64) { sg.mu.RLock() defer sg.mu.RUnlock() for _, s := range sg.shippers { @@ -189,14 +191,33 @@ func (sg *ShipperGroup) EvaluateRetentionBudgets(timeout time.Duration) { if s.State() == ReplicaNeedsRebuild { continue } + + // Timeout budget: replica hasn't been heard from in too long. ct := s.LastContactTime() - if ct.IsZero() { - continue // no contact yet — skip - } - if time.Since(ct) > timeout { + if !ct.IsZero() && time.Since(ct) > timeout { s.state.Store(uint32(ReplicaNeedsRebuild)) log.Printf("shipper_group: retention timeout for %s (last contact %v ago), transitioning to NeedsRebuild", s.dataAddr, time.Since(ct).Round(time.Second)) + continue + } + + // Max-bytes budget: replica lag exceeds configured maximum. + // Lag is measured as (primaryHeadLSN - replicaFlushedLSN) entries. + // Each entry is roughly one block write, so lag * blockSize ≈ byte lag. + // We use entry count directly since WAL entry sizes vary. + if maxBytes > 0 && primaryHeadLSN > 0 { + replicaLSN := s.ReplicaFlushedLSN() + if primaryHeadLSN > replicaLSN { + lag := primaryHeadLSN - replicaLSN + // Conservative: treat each LSN as one 4KB entry for byte estimation. + // This is a rough upper bound — actual WAL entries include headers. + lagBytes := lag * 4096 + if lagBytes > maxBytes { + s.state.Store(uint32(ReplicaNeedsRebuild)) + log.Printf("shipper_group: retention max-bytes exceeded for %s (lag=%d entries, ~%dKB > %dKB), transitioning to NeedsRebuild", + s.dataAddr, lag, lagBytes/1024, maxBytes/1024) + } + } } } } diff --git a/weed/storage/blockvol/sync_all_protocol_test.go b/weed/storage/blockvol/sync_all_protocol_test.go index b9834d40c..6fcc0281a 100644 --- a/weed/storage/blockvol/sync_all_protocol_test.go +++ b/weed/storage/blockvol/sync_all_protocol_test.go @@ -870,7 +870,7 @@ func TestWalRetention_TimeoutTriggersNeedsRebuild(t *testing.T) { // replica lag exceeds the configured maximum retention bytes, the replica // is transitioned to NeedsRebuild and the WAL hold is released. // -// Currently EXPECTED TO FAIL: no retention max-bytes mechanism. +// CP13-6: max-bytes budget now has a real state effect. func TestWalRetention_MaxBytesTriggersNeedsRebuild(t *testing.T) { dir := t.TempDir() opts := CreateOptions{ @@ -888,7 +888,7 @@ func TestWalRetention_MaxBytesTriggersNeedsRebuild(t *testing.T) { primary.SetRole(RolePrimary) primary.SetEpoch(1) primary.SetMasterEpoch(1) - primary.lease.Grant(30 * time.Second) + primary.lease.Grant(60 * time.Second) // long lease to avoid expiry during test replica, err := CreateBlockVol(filepath.Join(dir, "replica.blk"), opts) if err != nil { @@ -915,39 +915,44 @@ func TestWalRetention_MaxBytesTriggersNeedsRebuild(t *testing.T) { t.Fatal(err) } + sg := primary.shipperGroup + s := sg.Shipper(0) + if s.State() != ReplicaInSync { + t.Fatalf("expected InSync after initial sync, got %s", s.State()) + } + replicaFlushedBefore := s.ReplicaFlushedLSN() + // Disconnect replica. recv.Stop() time.Sleep(50 * time.Millisecond) - // Write enough to far exceed any reasonable retention budget. - // 64KB WAL ≈ 15 entries at 4KB each. Write 100 entries to overflow. - for i := uint64(0); i < 100; i++ { - _ = primary.WriteLBA(i%16, makeBlock(byte('0'+i%10))) - } - - // Flush to reclaim WAL space. - primary.flusher.FlushOnce() - primary.flusher.FlushOnce() - - // After CP13-6: the lag (primaryHeadLSN - replicaFlushedLSN) exceeds - // maxRetentionBytes. The shipper should auto-transition to NeedsRebuild. - sg := primary.shipperGroup - if sg != nil { - s := sg.Shipper(0) - if s != nil { - if s.IsDegraded() { - // Good — at least degraded. After CP13-4/6, should be NeedsRebuild. - t.Log("shipper is degraded (expected NeedsRebuild after CP13-6)") - } else { - t.Log("NOTE: shipper not even degraded despite massive lag") - } + // Write a few entries — enough to create meaningful lag but not overflow the tiny WAL. + // 64KB WAL fits ~12 entries. Write 8 to stay within capacity. + for i := uint64(0); i < 8; i++ { + if err := primary.WriteLBA(i%8, makeBlock(byte('0'+i%10))); err != nil { + t.Fatalf("write %d: %v", i, err) } } - // The real assertion (after CP13-6): - // s.State() == ReplicaStateNeedsRebuild - // And WAL tail has advanced freely (not pinned). - t.Log("NOTE: max-bytes retention trigger not implemented yet — test documents the gap") + // CP13-6: Evaluate retention budgets with a small max-bytes threshold. + // The lag (~8 entries * 4KB = ~32KB) exceeds 8KB budget → NeedsRebuild. + primaryHead := primary.nextLSN.Load() - 1 + sg.EvaluateRetentionBudgets(5*time.Minute, 8*1024, primaryHead) + + // The shipper must now be NeedsRebuild (not just Degraded). + st := s.State() + if st != ReplicaNeedsRebuild { + t.Fatalf("CP13-6: expected NeedsRebuild after max-bytes exceeded, got %s", st) + } + + // The replica's flushedLSN should not have advanced (it was disconnected). + if s.ReplicaFlushedLSN() != replicaFlushedBefore { + t.Fatalf("replicaFlushedLSN should not change while disconnected: was %d, now %d", + replicaFlushedBefore, s.ReplicaFlushedLSN()) + } + + t.Logf("CP13-6: max-bytes budget triggered NeedsRebuild (lag=%d entries, replicaFlushed=%d, primaryHead=%d)", + primaryHead-replicaFlushedBefore, replicaFlushedBefore, primaryHead) } // ---------- Data integrity ---------- @@ -1527,20 +1532,49 @@ func TestBarrier_NonEligibleStates_FailClosed(t *testing.T) { } }) - // Positive case: InSync shipper would proceed to barrier (but dead address = fail at TCP level). - // This proves InSync is the only state that enters the barrier request path. - t.Run("InSync_proceeds_to_barrier", func(t *testing.T) { - shipper.state.Store(uint32(ReplicaInSync)) - err := shipper.Barrier(1) - // Will fail at TCP level (dead address), but the error should NOT be ErrReplicaDegraded - // from the state gate — it should be from the TCP path (ensureCtrlConn or write). - if err == nil { - t.Fatal("Barrier() should fail on dead address, but returned nil") + // Positive case: InSync enters the barrier request path. + // Use a fake control server to observe MsgBarrierReq receipt — this + // distinguishes "passed state gate and attempted barrier" from "rejected early". + t.Run("InSync_enters_barrier_path", func(t *testing.T) { + // Start a fake control server that records received messages. + ctrlLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ctrlLn.Close() + + barrierReceived := make(chan struct{}, 1) + go func() { + conn, err := ctrlLn.Accept() + if err != nil { + return + } + defer conn.Close() + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + msgType, _, err := ReadFrame(conn) + if err == nil && msgType == MsgBarrierReq { + barrierReceived <- struct{}{} + } + }() + + // Create a shipper pointing at the fake control server. + inSyncShipper := NewWALShipper("127.0.0.1:1", ctrlLn.Addr().String(), func() uint64 { return 1 }, nil) + defer inSyncShipper.Stop() + inSyncShipper.state.Store(uint32(ReplicaInSync)) + + // Barrier will connect, send MsgBarrierReq, then fail (server doesn't respond). + // The important thing: MsgBarrierReq was sent. + _ = inSyncShipper.Barrier(1) + + select { + case <-barrierReceived: + t.Log("InSync: MsgBarrierReq received by server — barrier path entered") + case <-time.After(3 * time.Second): + t.Fatal("InSync should have sent MsgBarrierReq but server received nothing") } - // The error proves InSync entered the barrier path (not rejected at state gate). }) - t.Log("CP13-4: all non-eligible states fail closed; only InSync proceeds to barrier") + t.Log("CP13-4: 5 sub-cases — 3 immediate reject, 1 Disconnected fail, 1 InSync barrier-path verified") } func TestReplica_FlushedLSN_OnlyAfterSync(t *testing.T) {