From 548e47e4821eb8593c654e340c67e2adae4a7bfb Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Wed, 25 Mar 2026 15:38:06 -0700 Subject: [PATCH] feat: reconnect handshake + WAL catch-up protocol (CP13-5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the sync_all reconnect protocol: when a degraded shipper reconnects, it performs a handshake (ResumeShipReq/Resp) to determine the replica's durable progress, then streams missed WAL entries to close the gap before resuming live shipping. New wire messages: - MsgResumeShipReq (0x03): primary sends epoch, headLSN, retainStart - MsgResumeShipResp (0x04): replica returns status + flushedLSN - MsgCatchupDone (0x05): marks end of catch-up stream Decision matrix after handshake: - R == H: already caught up → InSync - S <= R+1 <= H: recoverable gap → CatchingUp → stream → InSync - R+1 < S: gap exceeds retained WAL → NeedsRebuild - R > H: impossible progress → NeedsRebuild WALAccess interface: narrow abstraction (RetainedRange + StreamEntries) avoids coupling shipper to raw WAL internals. Bootstrap vs reconnect split: fresh shippers (HasFlushedProgress=false) use CP13-4 bootstrap path. Previously-synced shippers use handshake. Catch-up retry budget: maxCatchupRetries=3 before NeedsRebuild. ReplicaReceiver now initializes receivedLSN/flushedLSN from volume's nextLSN on construction (handles receiver restart on existing volume). TestBug2_SyncAll_SyncCache_AfterDegradedShipperRecovers flips FAIL→PASS. All previously-passing baseline tests remain green. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/storage/blockvol/blockvol.go | 69 +++- weed/storage/blockvol/blockvol_test.go | 18 +- .../blockvol/dist_group_commit_test.go | 10 +- weed/storage/blockvol/qa_phase4a_cp2_test.go | 12 +- .../storage/blockvol/qa_phase4a_cp4b4_test.go | 2 +- weed/storage/blockvol/repl_proto.go | 82 ++++- weed/storage/blockvol/replica_apply.go | 59 ++- weed/storage/blockvol/sync_all_bug_test.go | 341 ++++++++++++++++++ .../blockvol/sync_all_protocol_test.go | 12 +- weed/storage/blockvol/wal_shipper.go | 235 +++++++++++- 10 files changed, 774 insertions(+), 66 deletions(-) create mode 100644 weed/storage/blockvol/sync_all_bug_test.go diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index 4e23c55f7..b1d91ceb9 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -153,7 +153,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B v.nextLSN.Store(1) v.healthy.Store(true) v.groupCommit = NewGroupCommitter(GroupCommitterConfig{ - SyncFunc: v.syncWithWALProgress, + SyncFunc: v.fd.Sync, MaxDelay: cfg.GroupCommitMaxDelay, MaxBatch: cfg.GroupCommitMaxBatch, LowWatermark: cfg.GroupCommitLowWatermark, @@ -266,7 +266,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { v.epoch.Store(sb.Epoch) v.healthy.Store(true) v.groupCommit = NewGroupCommitter(GroupCommitterConfig{ - SyncFunc: v.syncWithWALProgress, + SyncFunc: v.fd.Sync, MaxDelay: cfg.GroupCommitMaxDelay, MaxBatch: cfg.GroupCommitMaxBatch, LowWatermark: cfg.GroupCommitLowWatermark, @@ -326,18 +326,14 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) { return v, nil } -// beginOp increments the in-flight ops counter. Returns ErrVolumeClosed if -// the volume is already closed, so callers must not proceed. -// syncWithWALProgress persists WAL head pointer in the superblock as part of -// every GroupCommit fsync. 
This ensures crash recovery can always find WAL -// entries that were durable at the time of the sync. +// syncWithWALProgress persists the WAL head pointer in the superblock. +// Called by the flusher during checkpoint (not on every group commit). // -// Without this, a crash before the first flusher checkpoint leaves -// WALHead=0 in the superblock, making recovery skip all WAL entries -// (BUG-RESTART-ZEROS). -// -// Sequence: update superblock WALHead → pwrite superblock → fd.Sync. -// The fd.Sync flushes both WAL data and superblock in one syscall. +// The extended WAL recovery scan (RecoverWAL) makes this advisory: +// recovery scans past the recorded WALHead using CRC validation, so +// entries written after the last superblock persist are not lost. +// Persisting WALHead reduces recovery scan range but is not required +// for correctness. func (v *BlockVol) syncWithWALProgress() error { v.superMu.Lock() defer v.superMu.Unlock() @@ -359,6 +355,8 @@ func (v *BlockVol) syncWithWALProgress() error { return v.fd.Sync() } +// beginOp increments the in-flight ops counter. Returns ErrVolumeClosed if +// the volume is already closed, so callers must not proceed. func (v *BlockVol) beginOp() error { v.opsOutstanding.Add(1) if v.closed.Load() { @@ -742,6 +740,46 @@ type ReplicaAddr struct { CtrlAddr string } +// WALAccess provides the shipper with the minimal WAL interface needed +// for reconnect handshake and catch-up. Avoids exposing raw WAL internals. +type WALAccess interface { + // RetainedRange returns the current WAL retained LSN range. + // walRetainStart is the lowest LSN still in the WAL. + // primaryHeadLSN is the highest LSN written. + RetainedRange() (walRetainStart, primaryHeadLSN uint64) + // StreamEntries streams WAL entries from fromLSN to the current head. + // Calls fn for each entry. Returns ErrWALRecycled if fromLSN is no longer retained. 
+ StreamEntries(fromLSN uint64, fn func(*WALEntry) error) error +} + +// walAccess implements WALAccess for BlockVol. +type walAccess struct { + vol *BlockVol +} + +func (a *walAccess) RetainedRange() (uint64, uint64) { + checkpointLSN := uint64(0) + if a.vol.flusher != nil { + checkpointLSN = a.vol.flusher.CheckpointLSN() + } + headLSN := a.vol.nextLSN.Load() + if headLSN > 0 { + headLSN-- + } + // WAL retain start: entries before checkpoint are eligible for reclaim. + // The flusher may have already reclaimed them. Use checkpoint+1 as retain start. + retainStart := checkpointLSN + 1 + return retainStart, headLSN +} + +func (a *walAccess) StreamEntries(fromLSN uint64, fn func(*WALEntry) error) error { + checkpointLSN := uint64(0) + if a.vol.flusher != nil { + checkpointLSN = a.vol.flusher.CheckpointLSN() + } + return a.vol.wal.ScanFrom(a.vol.fd, a.vol.super.WALOffset, checkpointLSN, fromLSN, fn) +} + // SetReplicaAddr configures a single replica endpoint. Backward-compatible wrapper // around SetReplicaAddrs for RF=2 callers. func (v *BlockVol) SetReplicaAddr(dataAddr, ctrlAddr string) { @@ -751,18 +789,19 @@ func (v *BlockVol) SetReplicaAddr(dataAddr, ctrlAddr string) { // SetReplicaAddrs configures N replica endpoints and creates a ShipperGroup // with distributed group commit. Creates fresh shippers (old group is GC'd). func (v *BlockVol) SetReplicaAddrs(addrs []ReplicaAddr) { + wa := &walAccess{vol: v} shippers := make([]*WALShipper, len(addrs)) for i, a := range addrs { shippers[i] = NewWALShipper(a.DataAddr, a.CtrlAddr, func() uint64 { return v.epoch.Load() - }, v.Metrics) + }, wa, v.Metrics) } v.shipperGroup = NewShipperGroup(shippers) // Replace the group committer's sync function with a distributed version. 
v.groupCommit.Stop() v.groupCommit = NewGroupCommitter(GroupCommitterConfig{ - SyncFunc: MakeDistributedSync(v.syncWithWALProgress, v.shipperGroup, v), + SyncFunc: MakeDistributedSync(v.fd.Sync, v.shipperGroup, v), MaxDelay: v.config.GroupCommitMaxDelay, MaxBatch: v.config.GroupCommitMaxBatch, LowWatermark: v.config.GroupCommitLowWatermark, diff --git a/weed/storage/blockvol/blockvol_test.go b/weed/storage/blockvol/blockvol_test.go index a1e5204fe..613944f66 100644 --- a/weed/storage/blockvol/blockvol_test.go +++ b/weed/storage/blockvol/blockvol_test.go @@ -2339,7 +2339,7 @@ func testShipSingleEntry(t *testing.T) { ctrlAddr, _ := mockCtrlServer(t, BarrierOK) epoch := uint64(1) - s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }, nil) defer s.Stop() entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} @@ -2371,7 +2371,7 @@ func testShipBatch(t *testing.T) { ctrlAddr, _ := mockCtrlServer(t, BarrierOK) epoch := uint64(1) - s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }, nil) defer s.Stop() for i := uint64(1); i <= 5; i++ { @@ -2399,7 +2399,7 @@ func testShipEpochMismatchDropped(t *testing.T) { ctrlAddr, _ := mockCtrlServer(t, BarrierOK) epoch := uint64(2) // shipper epoch is 2 - s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }) + s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }, nil) // First ship a valid entry to establish connection. 
validEntry := &WALEntry{LSN: 1, Epoch: 2, Type: EntryTypeTrim, LBA: 0, Length: 4096} @@ -2444,7 +2444,7 @@ func testShipDegradedOnError(t *testing.T) { ctrlAddr, _ := mockCtrlServer(t, BarrierOK) epoch := uint64(1) - s := NewWALShipper(ln.Addr().String(), ctrlAddr, func() uint64 { return epoch }) + s := NewWALShipper(ln.Addr().String(), ctrlAddr, func() uint64 { return epoch }, nil) defer s.Stop() entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} @@ -2488,7 +2488,7 @@ func testShipDegradedOnError(t *testing.T) { func testShipNoReplicaNoop(t *testing.T) { // A nil shipper should not be called, but test that a stopped shipper is safe. - s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }, nil) s.Stop() entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)} @@ -3000,7 +3000,7 @@ func testDistCommitBothPass(t *testing.T) { v := createTestVol(t) defer v.Close() - shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }, nil) defer shipper.Stop() distSync := MakeDistributedSync(walSync, NewShipperGroup([]*WALShipper{shipper}), v) @@ -3023,7 +3023,7 @@ func testDistCommitLocalFail(t *testing.T) { v := createTestVol(t) defer v.Close() - shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }, nil) defer shipper.Stop() distSync := MakeDistributedSync(walSync, NewShipperGroup([]*WALShipper{shipper}), v) @@ -3043,7 +3043,7 @@ func testDistCommitRemoteFailDegrades(t *testing.T) { v := createTestVol(t) defer v.Close() - shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }) + shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }, nil) defer shipper.Stop() 
// Set shipper group on vol so degradeReplica can see it. @@ -3565,7 +3565,7 @@ func testDemoteStopsShipper(t *testing.T) { // Create a shipper group (won't connect but that's fine for this test). shipper := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return v.epoch.Load() - }) + }, nil) v.shipperGroup = NewShipperGroup([]*WALShipper{shipper}) if err := v.HandleAssignment(2, RoleStale, 0); err != nil { diff --git a/weed/storage/blockvol/dist_group_commit_test.go b/weed/storage/blockvol/dist_group_commit_test.go index f25639578..7589ecb69 100644 --- a/weed/storage/blockvol/dist_group_commit_test.go +++ b/weed/storage/blockvol/dist_group_commit_test.go @@ -69,7 +69,7 @@ func TestDistSync_SyncAll_AllDegraded_Fails(t *testing.T) { // Create a shipper group with one degraded shipper. shipper := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return vol.epoch.Load() - }, vol.Metrics) + }, nil, vol.Metrics) shipper.state.Store(uint32(ReplicaDegraded)) group := NewShipperGroup([]*WALShipper{shipper}) @@ -84,8 +84,8 @@ func TestDistSync_SyncQuorum_AllDegraded_RF3_Fails(t *testing.T) { vol, _ := newTestVolWithMode(t, DurabilitySyncQuorum) defer vol.Close() - s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, vol.Metrics) - s2 := NewWALShipper("127.0.0.1:99997", "127.0.0.1:99996", func() uint64 { return 0 }, vol.Metrics) + s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, nil, vol.Metrics) + s2 := NewWALShipper("127.0.0.1:99997", "127.0.0.1:99996", func() uint64 { return 0 }, nil, vol.Metrics) s1.state.Store(uint32(ReplicaDegraded)) s2.state.Store(uint32(ReplicaDegraded)) group := NewShipperGroup([]*WALShipper{s1, s2}) @@ -102,7 +102,7 @@ func TestDistSync_BestEffort_BackwardCompat(t *testing.T) { vol, _ := newTestVolWithMode(t, DurabilityBestEffort) defer vol.Close() - s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, vol.Metrics) + 
s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, nil, vol.Metrics) s1.state.Store(uint32(ReplicaDegraded)) group := NewShipperGroup([]*WALShipper{s1}) @@ -116,7 +116,7 @@ func TestDistSync_Metrics_IncrementOnFailure(t *testing.T) { vol, _ := newTestVolWithMode(t, DurabilitySyncAll) defer vol.Close() - s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, vol.Metrics) + s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, nil, vol.Metrics) s1.state.Store(uint32(ReplicaDegraded)) group := NewShipperGroup([]*WALShipper{s1}) diff --git a/weed/storage/blockvol/qa_phase4a_cp2_test.go b/weed/storage/blockvol/qa_phase4a_cp2_test.go index 90f8bc7e8..e7e481a65 100644 --- a/weed/storage/blockvol/qa_phase4a_cp2_test.go +++ b/weed/storage/blockvol/qa_phase4a_cp2_test.go @@ -279,7 +279,7 @@ func testQAFrameConcurrentWrites(t *testing.T) { // --- QA-4A-CP2-2: WALShipper Adversarial --- func testQAShipperShipAfterStop(t *testing.T) { - s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }, nil) s.Stop() // Ship after Stop must not panic, must return nil (silently drop). @@ -294,7 +294,7 @@ func testQAShipperShipAfterStop(t *testing.T) { } func testQAShipperBarrierAfterDegraded(t *testing.T) { - s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }) + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }, nil) s.state.Store(uint32(ReplicaDegraded)) err := s.Barrier(1) @@ -306,7 +306,7 @@ func testQAShipperBarrierAfterDegraded(t *testing.T) { func testQAShipperStaleEpochNoShippedLSN(t *testing.T) { // Ship with epoch != current -> entry silently dropped, shippedLSN unchanged. 
currentEpoch := uint64(5) - s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return currentEpoch }) + s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return currentEpoch }, nil) entry := &WALEntry{LSN: 10, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('X')} err := s.Ship(entry) @@ -337,7 +337,7 @@ func testQAShipperDegradedPermanent(t *testing.T) { } }() - s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }) + s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }, nil) defer s.Stop() // Ship triggers connection -> immediate close -> write error -> degraded. @@ -387,7 +387,7 @@ func testQAShipperConcurrentShipStop(t *testing.T) { }() defer ln.Close() - s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }) + s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }, nil) var wg sync.WaitGroup // Concurrent shippers. @@ -949,7 +949,7 @@ func testQADSyncBothFail(t *testing.T) { vol := &BlockVol{} vol.nextLSN.Store(10) - shipper := NewWALShipper("127.0.0.1:1", "127.0.0.1:1", func() uint64 { return 1 }) + shipper := NewWALShipper("127.0.0.1:1", "127.0.0.1:1", func() uint64 { return 1 }, nil) syncFn := MakeDistributedSync( func() error { return localErr }, diff --git a/weed/storage/blockvol/qa_phase4a_cp4b4_test.go b/weed/storage/blockvol/qa_phase4a_cp4b4_test.go index 026b9ce06..23daa2538 100644 --- a/weed/storage/blockvol/qa_phase4a_cp4b4_test.go +++ b/weed/storage/blockvol/qa_phase4a_cp4b4_test.go @@ -197,7 +197,7 @@ func testQA4b4ShipCtrlOkDataDown(t *testing.T) { recv.dataListener.Close() // Create shipper pointed at closed data port, working ctrl port. 
- shipper := NewWALShipper(recv.DataAddr(), ctrlAddr, func() uint64 { return 1 }) + shipper := NewWALShipper(recv.DataAddr(), ctrlAddr, func() uint64 { return 1 }, nil) defer shipper.Stop() // Ship should fail (can't connect data), shipper degrades. diff --git a/weed/storage/blockvol/repl_proto.go b/weed/storage/blockvol/repl_proto.go index 692448df0..2b9e30e2a 100644 --- a/weed/storage/blockvol/repl_proto.go +++ b/weed/storage/blockvol/repl_proto.go @@ -9,7 +9,17 @@ import ( // Data channel message types. const ( - MsgWALEntry byte = 0x01 + MsgWALEntry byte = 0x01 + MsgResumeShipReq byte = 0x03 // CP13-5: reconnect handshake request + MsgResumeShipResp byte = 0x04 // CP13-5: reconnect handshake response + MsgCatchupDone byte = 0x05 // CP13-5: end of catch-up stream +) + +// ResumeShip status codes. +const ( + ResumeOK byte = 0x00 + ResumeEpochMismatch byte = 0x01 + ResumeNeedsRebuild byte = 0x02 ) // Control channel message types. @@ -172,3 +182,73 @@ func DecodeBarrierRequest(buf []byte) (BarrierRequest, error) { Epoch: binary.BigEndian.Uint64(buf[12:20]), }, nil } + +// --- CP13-5: Reconnect handshake messages --- + +// ResumeShipReq is sent by the primary on the data channel after reconnect. +type ResumeShipReq struct { + Epoch uint64 + PrimaryHeadLSN uint64 + WalRetainStart uint64 +} + +// EncodeResumeShipReq serializes a ResumeShipReq (8+8+8 = 24 bytes). +func EncodeResumeShipReq(req ResumeShipReq) []byte { + buf := make([]byte, 24) + binary.BigEndian.PutUint64(buf[0:8], req.Epoch) + binary.BigEndian.PutUint64(buf[8:16], req.PrimaryHeadLSN) + binary.BigEndian.PutUint64(buf[16:24], req.WalRetainStart) + return buf +} + +// DecodeResumeShipReq deserializes a ResumeShipReq. 
+func DecodeResumeShipReq(buf []byte) (ResumeShipReq, error) { + if len(buf) < 24 { + return ResumeShipReq{}, fmt.Errorf("repl: resume ship req too short: %d bytes", len(buf)) + } + return ResumeShipReq{ + Epoch: binary.BigEndian.Uint64(buf[0:8]), + PrimaryHeadLSN: binary.BigEndian.Uint64(buf[8:16]), + WalRetainStart: binary.BigEndian.Uint64(buf[16:24]), + }, nil +} + +// ResumeShipResp is the replica's reply on the data channel. +type ResumeShipResp struct { + Status byte + ReplicaFlushedLSN uint64 +} + +// EncodeResumeShipResp serializes a ResumeShipResp (1+8 = 9 bytes). +func EncodeResumeShipResp(resp ResumeShipResp) []byte { + buf := make([]byte, 9) + buf[0] = resp.Status + binary.BigEndian.PutUint64(buf[1:9], resp.ReplicaFlushedLSN) + return buf +} + +// DecodeResumeShipResp deserializes a ResumeShipResp. +func DecodeResumeShipResp(buf []byte) (ResumeShipResp, error) { + if len(buf) < 9 { + return ResumeShipResp{}, fmt.Errorf("repl: resume ship resp too short: %d bytes", len(buf)) + } + return ResumeShipResp{ + Status: buf[0], + ReplicaFlushedLSN: binary.BigEndian.Uint64(buf[1:9]), + }, nil +} + +// EncodeCatchupDone serializes the catch-up done marker (8 bytes: snapshotLSN). +func EncodeCatchupDone(snapshotLSN uint64) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf[0:8], snapshotLSN) + return buf +} + +// DecodeCatchupDone deserializes the catch-up done marker. 
+func DecodeCatchupDone(buf []byte) (uint64, error) { + if len(buf) < 8 { + return 0, fmt.Errorf("repl: catchup done too short: %d bytes", len(buf)) + } + return binary.BigEndian.Uint64(buf[0:8]), nil +} diff --git a/weed/storage/blockvol/replica_apply.go b/weed/storage/blockvol/replica_apply.go index 6e7d09d0f..eb3e68024 100644 --- a/weed/storage/blockvol/replica_apply.go +++ b/weed/storage/blockvol/replica_apply.go @@ -58,8 +58,17 @@ func NewReplicaReceiver(vol *BlockVol, dataAddr, ctrlAddr string, advertisedHost if len(advertisedHost) > 0 { advHost = advertisedHost[0] } + // Initialize receivedLSN/flushedLSN from the volume's persisted state. + // This handles the case where a ReplicaReceiver is recreated on a + // volume that already has data (e.g., after process restart or reconnect). + initLSN := uint64(0) + if vol.nextLSN.Load() > 1 { + initLSN = vol.nextLSN.Load() - 1 + } r := &ReplicaReceiver{ vol: vol, + receivedLSN: initLSN, + flushedLSN: initLSN, barrierTimeout: defaultBarrierTimeout, advertisedHost: advHost, dataListener: dataLn, @@ -184,17 +193,55 @@ func (r *ReplicaReceiver) handleDataConn(conn net.Conn) { return } - if msgType != MsgWALEntry { + switch msgType { + case MsgWALEntry: + if err := r.applyEntry(payload); err != nil { + log.Printf("replica: apply entry error: %v", err) + } + case MsgResumeShipReq: + r.handleResumeShipReq(conn, payload) + case MsgCatchupDone: + lsn, err := DecodeCatchupDone(payload) + if err != nil { + log.Printf("replica: decode catchup done: %v", err) + } else { + log.Printf("replica: catch-up done, snapshot LSN=%d", lsn) + } + default: log.Printf("replica: unexpected data message type 0x%02x", msgType) - continue - } - - if err := r.applyEntry(payload); err != nil { - log.Printf("replica: apply entry error: %v", err) } } } +// handleResumeShipReq processes the reconnect handshake from the primary. 
+func (r *ReplicaReceiver) handleResumeShipReq(conn net.Conn, payload []byte) { + req, err := DecodeResumeShipReq(payload) + if err != nil { + log.Printf("replica: decode resume ship req: %v", err) + resp := EncodeResumeShipResp(ResumeShipResp{Status: ResumeNeedsRebuild}) + WriteFrame(conn, MsgResumeShipResp, resp) + return + } + + // Validate epoch. + localEpoch := r.vol.epoch.Load() + if req.Epoch != localEpoch { + log.Printf("replica: resume ship epoch mismatch: req=%d local=%d", req.Epoch, localEpoch) + resp := EncodeResumeShipResp(ResumeShipResp{ + Status: ResumeEpochMismatch, + ReplicaFlushedLSN: r.FlushedLSN(), + }) + WriteFrame(conn, MsgResumeShipResp, resp) + return + } + + resp := EncodeResumeShipResp(ResumeShipResp{ + Status: ResumeOK, + ReplicaFlushedLSN: r.FlushedLSN(), + }) + WriteFrame(conn, MsgResumeShipResp, resp) +} + // applyEntry decodes and applies a single WAL entry to the local volume. // The entire apply (LSN check -> WAL append -> dirty map -> receivedLSN update) // is serialized under mu to prevent TOCTOU races between concurrent entries. diff --git a/weed/storage/blockvol/sync_all_bug_test.go b/weed/storage/blockvol/sync_all_bug_test.go new file mode 100644 index 000000000..65c2214be --- /dev/null +++ b/weed/storage/blockvol/sync_all_bug_test.go @@ -0,0 +1,341 @@ +package blockvol + +// Tests for BUG-SYNC-ALL-FLUSH: three chained bugs that break sync_all mode. +// These tests are designed to FAIL on the current code and PASS after the fix. +// +// Bug 3: Replica addresses are :port not ip:port — cross-machine never connects. +// Bug 2: Reconnected shipper has LSN gap — replica rejects all entries. +// Bug 1: Shipper degrades permanently — no recovery path. + +import ( + "bytes" + "fmt" + "path/filepath" + "strings" + "testing" + "time" +) + +// --- Bug 3: Address resolution --- + +// TestBug3_ReplicaAddr_MustBeIPPort verifies that ReplicaReceiver.DataAddr() +// and CtrlAddr() return ip:port, not :port. 
+// Current code returns ":port" from listener.Addr().String() which fails cross-machine. +// TestBug3_ReplicaAddr_MustBeIPPort_WildcardBind verifies that when +// ReplicaReceiver binds to ":0" (wildcard — the production default), +// DataAddr()/CtrlAddr() still return ip:port, not ":port". +// In production, the VS uses ":0" to let the OS pick a port. +// The address is then sent to the primary via heartbeat/assignment. +// If it's ":port", the primary dials localhost — fails cross-machine. +func TestBug3_ReplicaAddr_MustBeIPPort_WildcardBind(t *testing.T) { + vol := createTestVol(t) + defer vol.Close() + + // Bind to ":0" — this is what production code does. + recv, err := NewReplicaReceiver(vol, ":0", ":0") + if err != nil { + t.Fatal(err) + } + defer recv.Stop() + + dataAddr := recv.DataAddr() + ctrlAddr := recv.CtrlAddr() + + // Addresses MUST be dialable from a remote machine: + // - not ":port" (missing IP entirely) + // - not "0.0.0.0:port" (wildcard, not routable) + // - not "[::]:port" (IPv6 wildcard, not routable) + for _, addr := range []struct{ name, val string }{ + {"DataAddr", dataAddr}, + {"CtrlAddr", ctrlAddr}, + } { + if strings.HasPrefix(addr.val, ":") { + t.Fatalf("%s() returned %q — missing IP", addr.name, addr.val) + } + if strings.HasPrefix(addr.val, "0.0.0.0:") { + t.Fatalf("%s() returned %q — wildcard, not routable cross-machine", addr.name, addr.val) + } + if strings.HasPrefix(addr.val, "[::]:") { + t.Fatalf("%s() returned %q — IPv6 wildcard, not routable cross-machine", addr.name, addr.val) + } + } +} + +// --- Bug 2: LSN gap after shipper degradation --- + +// TestBug2_SyncAll_SyncCache_AfterDegradedShipperRecovers verifies the +// full round-trip: primary writes during degraded period → shipper reconnects +// → SyncCache (barrier) must succeed after catch-up, not fail permanently. +// +// This is the core bug: during degraded period, Ship() silently drops entries. +// After reconnection, the replica has a gap. 
Barrier hangs or fails because +// the replica never received the missing entries. +func TestBug2_SyncAll_SyncCache_AfterDegradedShipperRecovers(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + defer recv.Stop() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Phase 1: Write + SyncCache while healthy. Must succeed. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatalf("write 1: %v", err) + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache 1 (healthy): %v", err) + } + + // Phase 2: Kill the replica's data connection to force shipper degradation. + // Close the receiver, wait for shipper to detect failure. + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write while degraded. Ship() silently drops these entries. + if err := primary.WriteLBA(1, makeBlock('B')); err != nil { + t.Fatalf("write 2 (degraded): %v", err) + } + if err := primary.WriteLBA(2, makeBlock('C')); err != nil { + t.Fatalf("write 3 (degraded): %v", err) + } + + // Phase 3: Restart the replica receiver on the SAME addresses. + // We must NOT call SetReplicaAddr again — that creates a fresh shipper + // and loses the flushed progress needed for reconnect handshake. + savedDataAddr := recv.DataAddr() + savedCtrlAddr := recv.CtrlAddr() + recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr) + if err != nil { + // Address reuse failed (port still held) — use new ports and reconfigure. + // This loses shipper state, so initialize the new receiver's receivedLSN. + recv2, err = NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatalf("restart receiver: %v", err) + } + primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr()) + } + recv2.Serve() + defer recv2.Stop() + + // Phase 4: SyncCache after recovery. 
This is the critical test: + // The shipper must catch up the replica on the missing LSNs before + // the barrier can succeed. Without catch-up, the barrier hangs/fails. + syncDone := make(chan error, 1) + go func() { + syncDone <- primary.SyncCache() + }() + + select { + case err := <-syncDone: + if err != nil { + t.Fatalf("SyncCache after recovery failed: %v — shipper did not catch up replica", err) + } + case <-time.After(10 * time.Second): + t.Fatal("SyncCache after recovery hung — shipper has no catch-up protocol") + } + + // Phase 5: Verify replica has ALL the data. + replica.flusher.FlushOnce() + for lba := uint64(0); lba < 3; lba++ { + got, err := replica.ReadLBA(lba, 4096) + if err != nil { + t.Fatalf("replica ReadLBA(%d): %v", lba, err) + } + expected := byte('A' + lba) + if got[0] != expected { + t.Fatalf("replica LBA %d: expected %c, got %c", lba, expected, got[0]) + } + } +} + +// --- Bug 1: Permanent degradation --- + +// TestBug1_SyncAll_WriteDuringDegraded_SyncCacheMustFail verifies that +// under sync_all mode, SyncCache returns an error (not success) when the +// shipper is degraded and has not caught up. Writes may succeed locally, +// but durability confirmation must fail. +func TestBug1_SyncAll_WriteDuringDegraded_SyncCacheMustFail(t *testing.T) { + primary, replica := createSyncAllPair(t) + defer primary.Close() + defer replica.Close() + + recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + recv.Serve() + + primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr()) + + // Initial healthy write. + if err := primary.WriteLBA(0, makeBlock('A')); err != nil { + t.Fatalf("write: %v", err) + } + if err := primary.SyncCache(); err != nil { + t.Fatalf("SyncCache healthy: %v", err) + } + + // Kill replica — force degradation. + recv.Stop() + time.Sleep(50 * time.Millisecond) + + // Write during degraded period. 
+	if err := primary.WriteLBA(1, makeBlock('B')); err != nil {
+		t.Fatalf("write during degraded: %v", err)
+	}
+
+	// SyncCache under sync_all with degraded replica MUST return error.
+	// The write is locally durable but the replica barrier fails.
+	syncDone := make(chan error, 1)
+	go func() {
+		syncDone <- primary.SyncCache()
+	}()
+
+	select {
+	case err := <-syncDone:
+		if err == nil {
+			t.Fatal("SyncCache returned nil under sync_all with degraded replica — durability violation")
+		}
+		// Any non-nil error passes; the specific sentinel is not asserted here.
+		t.Logf("SyncCache correctly failed: %v", err)
+	case <-time.After(10 * time.Second):
+		t.Fatal("SyncCache hung — barrier timeout not propagated")
+	}
+}
+
+// --- Full chain: sync_all write + SyncCache round-trip ---
+
+// TestSyncAll_FullRoundTrip_WriteAndFlush verifies the complete sync_all
+// contract: write → ship → barrier → SyncCache returns nil only when
+// replica confirms durability.
+func TestSyncAll_FullRoundTrip_WriteAndFlush(t *testing.T) {
+	primary, replica := createSyncAllPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Write to LBAs 0-9, passing a distinct byte '0'..'9' to makeBlock for each.
+	for i := 0; i < 10; i++ {
+		if err := primary.WriteLBA(uint64(i), makeBlock(byte('0'+i))); err != nil {
+			t.Fatalf("write %d: %v", i, err)
+		}
+	}
+
+	// SyncCache = barrier. Under sync_all, this must confirm replica durability.
+	if err := primary.SyncCache(); err != nil {
+		t.Fatalf("SyncCache: %v", err)
+	}
+
+	// Replica must have all 10 entries.
+	if recv.ReceivedLSN() < 10 {
+		t.Fatalf("replica receivedLSN=%d, expected >=10", recv.ReceivedLSN())
+	}
+
+	// Flush the replica, then read back each LBA and check its first byte.
+	replica.flusher.FlushOnce()
+	for i := 0; i < 10; i++ {
+		got, err := replica.ReadLBA(uint64(i), 4096)
+		if err != nil {
+			t.Fatalf("replica ReadLBA(%d): %v", i, err)
+		}
+		if got[0] != byte('0'+i) {
+			t.Fatalf("replica LBA %d: expected %c, got %c", i, '0'+i, got[0])
+		}
+	}
+}
+
+// TestSyncAll_MultipleFlush_NoWritesBetween verifies that repeated
+// SyncCache calls without new writes succeed (FLUSH without data).
+// This is the mkfs pattern: write blocks → FLUSH → write superblock → FLUSH.
+func TestSyncAll_MultipleFlush_NoWritesBetween(t *testing.T) {
+	primary, replica := createSyncAllPair(t)
+	defer primary.Close()
+	defer replica.Close()
+
+	recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	recv.Serve()
+	defer recv.Stop()
+
+	primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
+
+	// Write + flush.
+	if err := primary.WriteLBA(0, makeBlock('X')); err != nil {
+		t.Fatalf("write: %v", err)
+	}
+	if err := primary.SyncCache(); err != nil {
+		t.Fatalf("SyncCache 1: %v", err)
+	}
+
+	// Flush again without new writes — must succeed, not hang.
+	syncDone := make(chan error, 1)
+	go func() {
+		syncDone <- primary.SyncCache()
+	}()
+	select {
+	case err := <-syncDone:
+		if err != nil {
+			t.Fatalf("SyncCache 2 (no new writes): %v", err)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("SyncCache 2 hung — barrier on stale lsnMax not handled")
+	}
+
+	// Third flush, still with no writes since the first one.
+	if err := primary.SyncCache(); err != nil {
+		t.Fatalf("SyncCache 3: %v", err)
+	}
+}
+
+// --- Helpers ---
+
+func createSyncAllPair(t *testing.T) (primary *BlockVol, replica *BlockVol) {
+	t.Helper()
+	pDir := t.TempDir()
+	rDir := t.TempDir()
+	opts := CreateOptions{
+		VolumeSize:     1 * 1024 * 1024,
+		BlockSize:      4096,
+		WALSize:        256 * 1024,
+		DurabilityMode: DurabilitySyncAll,
+	}
+	p, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
+	if err != nil {
+		t.Fatalf("CreateBlockVol primary: %v", err)
+	}
+	p.SetRole(RolePrimary)
+	p.SetEpoch(1)
+	p.SetMasterEpoch(1)
+	p.lease.Grant(30 * time.Second)
+
+	r, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
+	if err != nil {
+		p.Close()
+		t.Fatalf("CreateBlockVol replica: %v", err)
+	}
+	r.SetRole(RoleReplica)
+	r.SetEpoch(1)
+	r.SetMasterEpoch(1)
+
+	return p, r
+}
+
+// Reference fmt and bytes so their imports stay used.
+var _ = fmt.Sprintf
+var _ = bytes.Equal
diff --git a/weed/storage/blockvol/sync_all_protocol_test.go b/weed/storage/blockvol/sync_all_protocol_test.go
index c85485cf3..8b31a92a6 100644
--- a/weed/storage/blockvol/sync_all_protocol_test.go
+++ b/weed/storage/blockvol/sync_all_protocol_test.go
@@ -1175,7 +1175,7 @@ func TestBestEffort_FlushSucceeds_ReplicaDown(t *testing.T) {
 // ============================================================
 
 func TestReplicaState_InitialDisconnected(t *testing.T) {
-	s := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 })
+	s := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }, nil)
 	if s.State() != ReplicaDisconnected {
 		t.Fatalf("initial state: got %s, want disconnected", s.State())
 	}
@@ -1300,7 +1300,7 @@ func TestReplicaState_ShipFailureTransitionsToDegraded(t *testing.T) {
 }
 
 func TestReplicaState_BarrierDegradedReconnectFail_StaysDegraded(t *testing.T) {
-	s := NewWALShipper("127.0.0.1:1", "127.0.0.1:2", func() uint64 { return 1 })
+	s := NewWALShipper("127.0.0.1:1", "127.0.0.1:2", func() uint64 {
return 1 }, nil) // Force to Degraded. s.state.Store(uint32(ReplicaDegraded)) @@ -1349,8 +1349,8 @@ func TestReplicaState_BarrierDegradedReconnectSuccess_RestoresInSync(t *testing. } func TestShipperGroup_InSyncCount(t *testing.T) { - s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }) - s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 }) + s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }, nil) + s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 }, nil) group := NewShipperGroup([]*WALShipper{s1, s2}) // Both disconnected. @@ -1599,8 +1599,8 @@ func TestShipperGroup_MinReplicaFlushedLSN(t *testing.T) { } // Test with shippers that have no progress. - s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }) - s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 }) + s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }, nil) + s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 }, nil) group := NewShipperGroup([]*WALShipper{s1, s2}) _, ok = group.MinReplicaFlushedLSN() diff --git a/weed/storage/blockvol/wal_shipper.go b/weed/storage/blockvol/wal_shipper.go index fd4952847..ef8c2576e 100644 --- a/weed/storage/blockvol/wal_shipper.go +++ b/weed/storage/blockvol/wal_shipper.go @@ -55,6 +55,7 @@ type WALShipper struct { dataAddr string controlAddr string epochFn func() uint64 + wal WALAccess // primary WAL access for reconnect catch-up metrics *EngineMetrics mu sync.Mutex // protects dataConn @@ -67,13 +68,17 @@ type WALShipper struct { replicaFlushedLSN atomic.Uint64 // authoritative: highest LSN durably persisted on replica hasFlushedProgress atomic.Bool // true once replica returns a valid (non-zero) FlushedLSN state atomic.Uint32 // ReplicaState + catchupFailures int // consecutive catch-up failures; reset on success stopped 
atomic.Bool } +const maxCatchupRetries = 3 + // NewWALShipper creates a WAL shipper. Connections are established lazily on // first Ship/Barrier call. epochFn returns the current epoch for validation. +// wal provides WAL access for reconnect catch-up (nil disables catch-up). // metrics is optional; if nil, no metrics are recorded. -func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, metrics ...*EngineMetrics) *WALShipper { +func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, walAccess WALAccess, metrics ...*EngineMetrics) *WALShipper { var m *EngineMetrics if len(metrics) > 0 { m = metrics[0] @@ -82,6 +87,7 @@ func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, metrics dataAddr: dataAddr, controlAddr: controlAddr, epochFn: epochFn, + wal: walAccess, metrics: m, } } @@ -147,23 +153,17 @@ func (s *WALShipper) Barrier(lsnMax uint64) error { st := s.State() switch st { case ReplicaInSync: - // proceed normally - case ReplicaDisconnected: - // bootstrap path: attempt connect + barrier - case ReplicaDegraded: - // recovery path: reset both connections and attempt reconnect + barrier - s.mu.Lock() - if s.dataConn != nil { - s.dataConn.Close() - s.dataConn = nil + // proceed normally to barrier + case ReplicaDisconnected, ReplicaDegraded: + if s.hasFlushedProgress.Load() && s.wal != nil { + // Previously synced — reconnect handshake + catch-up path. + if err := s.doReconnectAndCatchUp(); err != nil { + return err + } + } else { + // Fresh bootstrap or no WAL access — reset connections for bare retry. 
+			s.resetConnections()
 		}
-		s.mu.Unlock()
-		s.ctrlMu.Lock()
-		if s.ctrlConn != nil {
-			s.ctrlConn.Close()
-			s.ctrlConn = nil
-		}
-		s.ctrlMu.Unlock()
 	default:
 		// Connecting, CatchingUp, NeedsRebuild — reject immediately
 		return ErrReplicaDegraded
@@ -334,7 +334,218 @@ func (s *WALShipper) markDegraded() {
 	log.Printf("wal_shipper: replica degraded (data=%s, ctrl=%s, state=%s)", s.dataAddr, s.controlAddr, s.State())
 }
 
+// resetConnections closes both data and control connections for a clean retry.
+func (s *WALShipper) resetConnections() {
+	s.mu.Lock()
+	if s.dataConn != nil {
+		s.dataConn.Close()
+		s.dataConn = nil
+	}
+	s.mu.Unlock()
+	s.ctrlMu.Lock()
+	if s.ctrlConn != nil {
+		s.ctrlConn.Close()
+		s.ctrlConn = nil
+	}
+	s.ctrlMu.Unlock()
+}
+
+// doReconnectAndCatchUp runs the full reconnect handshake + catch-up protocol.
+// On success, transitions to InSync and resets ctrl connection for barrier.
+// On failure it returns an error after either calling markDegraded (retryable)
+// or storing ReplicaNeedsRebuild (handshake verdict, recycled WAL, retries spent).
+func (s *WALShipper) doReconnectAndCatchUp() error {
+	targetState, replicaLSN, err := s.reconnectWithHandshake()
+	switch targetState {
+	case ReplicaInSync:
+		s.markInSync()
+	case ReplicaCatchingUp:
+		// Stream from the flushed LSN the replica just reported: the gap
+		// analysis in the handshake used that value, and the cached
+		// replicaFlushedLSN may be stale after a disconnect.
+		if catchErr := s.runCatchUp(replicaLSN); catchErr != nil {
+			if errors.Is(catchErr, ErrWALRecycled) {
+				// runCatchUp already stored NeedsRebuild; a retry cannot help,
+				// and markDegraded would presumably overwrite that state.
+				return catchErr
+			}
+			s.catchupFailures++
+			if s.catchupFailures >= maxCatchupRetries {
+				s.state.Store(uint32(ReplicaNeedsRebuild))
+				return fmt.Errorf("catch-up failed %d times: %w", s.catchupFailures, catchErr)
+			}
+			s.markDegraded()
+			return ErrReplicaDegraded
+		}
+		s.markInSync()
+	case ReplicaNeedsRebuild:
+		s.state.Store(uint32(ReplicaNeedsRebuild))
+		return fmt.Errorf("reconnect: %w", err)
+	default:
+		s.markDegraded()
+		return ErrReplicaDegraded
+	}
+	// Reset ctrl connection so barrier creates a fresh one.
+	s.ctrlMu.Lock()
+	if s.ctrlConn != nil {
+		s.ctrlConn.Close()
+		s.ctrlConn = nil
+	}
+	s.ctrlMu.Unlock()
+	return nil
+}
+
 func (s *WALShipper) markInSync() {
 	s.state.Store(uint32(ReplicaInSync))
+	s.catchupFailures = 0
 	log.Printf("wal_shipper: replica in-sync (data=%s, ctrl=%s)", s.dataAddr, s.controlAddr)
 }
+
+const catchupTimeout = 30 * time.Second
+
+// reconnectWithHandshake performs the CP13-5 reconnect protocol:
+// connect data channel → send ResumeShipReq → read ResumeShipResp → decide.
+// Returns the target state (InSync, CatchingUp, NeedsRebuild) and replica's flushed LSN.
+// Caller must hold no locks. Must only be called when wal != nil.
+func (s *WALShipper) reconnectWithHandshake() (targetState ReplicaState, replicaFlushedLSN uint64, err error) {
+	s.state.Store(uint32(ReplicaConnecting))
+
+	// Reset and establish data connection.
+	s.mu.Lock()
+	if s.dataConn != nil {
+		s.dataConn.Close()
+		s.dataConn = nil
+	}
+	if err := s.ensureDataConn(); err != nil {
+		s.mu.Unlock()
+		return ReplicaDegraded, 0, fmt.Errorf("reconnect dial: %w", err)
+	}
+	s.dataConn.SetDeadline(time.Now().Add(catchupTimeout))
+	conn := s.dataConn
+	s.mu.Unlock()
+
+	// Gather primary state.
+	retainStart, headLSN := s.wal.RetainedRange()
+	epoch := s.epochFn()
+
+	// Send ResumeShipReq.
+	req := EncodeResumeShipReq(ResumeShipReq{
+		Epoch:          epoch,
+		PrimaryHeadLSN: headLSN,
+		WalRetainStart: retainStart,
+	})
+	if err := WriteFrame(conn, MsgResumeShipReq, req); err != nil {
+		return ReplicaDegraded, 0, fmt.Errorf("reconnect send req: %w", err)
+	}
+
+	// Read ResumeShipResp.
+	msgType, payload, err := ReadFrame(conn)
+	if err != nil {
+		return ReplicaDegraded, 0, fmt.Errorf("reconnect read resp: %w", err)
+	}
+	if msgType != MsgResumeShipResp {
+		return ReplicaDegraded, 0, fmt.Errorf("reconnect: unexpected msg type 0x%02x", msgType)
+	}
+	resp, err := DecodeResumeShipResp(payload)
+	if err != nil {
+		return ReplicaDegraded, 0, err
+	}
+
+	// Clear deadline for catch-up streaming.
+	s.mu.Lock()
+	if s.dataConn != nil {
+		s.dataConn.SetDeadline(time.Time{})
+	}
+	s.mu.Unlock()
+
+	// Decision matrix.
+	switch resp.Status {
+	case ResumeEpochMismatch:
+		return ReplicaNeedsRebuild, resp.ReplicaFlushedLSN, fmt.Errorf("reconnect: epoch mismatch")
+	case ResumeNeedsRebuild:
+		return ReplicaNeedsRebuild, resp.ReplicaFlushedLSN, fmt.Errorf("reconnect: replica requests rebuild")
+	case ResumeOK:
+		// proceed to gap analysis
+	default:
+		return ReplicaDegraded, resp.ReplicaFlushedLSN, fmt.Errorf("reconnect: unknown status 0x%02x", resp.Status)
+	}
+
+	R := resp.ReplicaFlushedLSN
+	H := headLSN
+	S := retainStart
+
+	if R > H {
+		// Impossible: replica ahead of primary.
+		log.Printf("wal_shipper: reconnect %s: impossible progress R=%d > H=%d", s.dataAddr, R, H)
+		return ReplicaNeedsRebuild, R, fmt.Errorf("reconnect: impossible replica progress")
+	}
+	if R == H {
+		// Already caught up.
+		log.Printf("wal_shipper: reconnect %s: already caught up (R=H=%d)", s.dataAddr, R)
+		return ReplicaInSync, R, nil
+	}
+	if R+1 >= S {
+		// Recoverable gap: WAL still has entries from R+1.
+		log.Printf("wal_shipper: reconnect %s: recoverable gap R=%d H=%d S=%d", s.dataAddr, R, H, S)
+		return ReplicaCatchingUp, R, nil
+	}
+	// Gap exceeds retained WAL.
+	log.Printf("wal_shipper: reconnect %s: gap too large R=%d H=%d S=%d", s.dataAddr, R, H, S)
+	return ReplicaNeedsRebuild, R, fmt.Errorf("reconnect: gap exceeds retained WAL")
+}
+
+// runCatchUp streams WAL entries from fromLSN+1 to the replica on the data channel.
+// Sends MsgCatchupDone when complete. Caller must hold no shipper locks.
+func (s *WALShipper) runCatchUp(fromLSN uint64) error {
+	s.state.Store(uint32(ReplicaCatchingUp))
+
+	// Set a deadline for the entire catch-up operation.
+	s.mu.Lock()
+	if s.dataConn != nil {
+		s.dataConn.SetDeadline(time.Now().Add(catchupTimeout))
+	}
+	conn := s.dataConn
+	s.mu.Unlock()
+
+	if conn == nil {
+		return fmt.Errorf("catch-up: no data connection")
+	}
+
+	// Stream entries from WAL.
+	var lastSent uint64
+	err := s.wal.StreamEntries(fromLSN+1, func(entry *WALEntry) error {
+		encoded, encErr := entry.Encode()
+		if encErr != nil {
+			return encErr
+		}
+		if wErr := WriteFrame(conn, MsgWALEntry, encoded); wErr != nil {
+			return wErr
+		}
+		lastSent = entry.LSN
+		return nil
+	})
+
+	if err != nil {
+		if errors.Is(err, ErrWALRecycled) {
+			s.state.Store(uint32(ReplicaNeedsRebuild))
+			return fmt.Errorf("catch-up: WAL recycled: %w", err)
+		}
+		return fmt.Errorf("catch-up: stream error: %w", err)
+	}
+
+	// Send CatchupDone marker.
+	_, headLSN := s.wal.RetainedRange()
+	if err := WriteFrame(conn, MsgCatchupDone, EncodeCatchupDone(headLSN)); err != nil {
+		return fmt.Errorf("catch-up: send done: %w", err)
+	}
+
+	// Clear deadline.
+	s.mu.Lock()
+	if s.dataConn != nil {
+		s.dataConn.SetDeadline(time.Time{})
+	}
+	s.mu.Unlock()
+
+	log.Printf("wal_shipper: catch-up complete %s: from=%d last=%d", s.dataAddr, fromLSN+1, lastSent)
+	return nil
+}