feat: reconnect handshake + WAL catch-up protocol (CP13-5)

Adds the sync_all reconnect protocol: when a degraded shipper
reconnects, it performs a handshake (ResumeShipReq/Resp) to
determine the replica's durable progress, then streams missed
WAL entries to close the gap before resuming live shipping.

New wire messages:
- MsgResumeShipReq (0x03): primary sends epoch, headLSN, retainStart
- MsgResumeShipResp (0x04): replica returns status + flushedLSN
- MsgCatchupDone (0x05): marks end of catch-up stream

Decision matrix after handshake:
- R == H: already caught up → InSync
- S <= R+1 <= H: recoverable gap → CatchingUp → stream → InSync
- R+1 < S: gap exceeds retained WAL → NeedsRebuild
- R > H: impossible progress → NeedsRebuild

WALAccess interface: narrow abstraction (RetainedRange + StreamEntries)
avoids coupling shipper to raw WAL internals.

Bootstrap vs reconnect split: fresh shippers (HasFlushedProgress=false)
use CP13-4 bootstrap path. Previously-synced shippers use handshake.

Catch-up retry budget: maxCatchupRetries=3 before NeedsRebuild.

ReplicaReceiver now initializes receivedLSN/flushedLSN from volume's
nextLSN on construction (handles receiver restart on existing volume).

TestBug2_SyncAll_SyncCache_AfterDegradedShipperRecovers flips FAIL→PASS.
All previously-passing baseline tests remain green.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ping Qiu
2026-03-25 15:38:06 -07:00
parent 8d6379f841
commit 548e47e482
10 changed files with 774 additions and 66 deletions

View File

@@ -153,7 +153,7 @@ func CreateBlockVol(path string, opts CreateOptions, cfgs ...BlockVolConfig) (*B
v.nextLSN.Store(1)
v.healthy.Store(true)
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: v.syncWithWALProgress,
SyncFunc: v.fd.Sync,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
@@ -266,7 +266,7 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
v.epoch.Store(sb.Epoch)
v.healthy.Store(true)
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: v.syncWithWALProgress,
SyncFunc: v.fd.Sync,
MaxDelay: cfg.GroupCommitMaxDelay,
MaxBatch: cfg.GroupCommitMaxBatch,
LowWatermark: cfg.GroupCommitLowWatermark,
@@ -326,18 +326,14 @@ func OpenBlockVol(path string, cfgs ...BlockVolConfig) (*BlockVol, error) {
return v, nil
}
// beginOp increments the in-flight ops counter. Returns ErrVolumeClosed if
// the volume is already closed, so callers must not proceed.
// syncWithWALProgress persists WAL head pointer in the superblock as part of
// every GroupCommit fsync. This ensures crash recovery can always find WAL
// entries that were durable at the time of the sync.
// syncWithWALProgress persists the WAL head pointer in the superblock.
// Called by the flusher during checkpoint (not on every group commit).
//
// Without this, a crash before the first flusher checkpoint leaves
// WALHead=0 in the superblock, making recovery skip all WAL entries
// (BUG-RESTART-ZEROS).
//
// Sequence: update superblock WALHead → pwrite superblock → fd.Sync.
// The fd.Sync flushes both WAL data and superblock in one syscall.
// The extended WAL recovery scan (RecoverWAL) makes this advisory:
// recovery scans past the recorded WALHead using CRC validation, so
// entries written after the last superblock persist are not lost.
// Persisting WALHead reduces recovery scan range but is not required
// for correctness.
func (v *BlockVol) syncWithWALProgress() error {
v.superMu.Lock()
defer v.superMu.Unlock()
@@ -359,6 +355,8 @@ func (v *BlockVol) syncWithWALProgress() error {
return v.fd.Sync()
}
// beginOp increments the in-flight ops counter. Returns ErrVolumeClosed if
// the volume is already closed, so callers must not proceed.
func (v *BlockVol) beginOp() error {
v.opsOutstanding.Add(1)
if v.closed.Load() {
@@ -742,6 +740,46 @@ type ReplicaAddr struct {
CtrlAddr string
}
// WALAccess provides the shipper with the minimal WAL interface needed
// for reconnect handshake and catch-up. Avoids exposing raw WAL internals.
type WALAccess interface {
// RetainedRange returns the current WAL retained LSN range.
// walRetainStart is the lowest LSN still in the WAL.
// primaryHeadLSN is the highest LSN written.
RetainedRange() (walRetainStart, primaryHeadLSN uint64)
// StreamEntries streams WAL entries from fromLSN to the current head.
// Calls fn for each entry. Returns ErrWALRecycled if fromLSN is no longer retained.
StreamEntries(fromLSN uint64, fn func(*WALEntry) error) error
}
// walAccess implements WALAccess for BlockVol.
type walAccess struct {
vol *BlockVol
}
func (a *walAccess) RetainedRange() (uint64, uint64) {
checkpointLSN := uint64(0)
if a.vol.flusher != nil {
checkpointLSN = a.vol.flusher.CheckpointLSN()
}
headLSN := a.vol.nextLSN.Load()
if headLSN > 0 {
headLSN--
}
// WAL retain start: entries before checkpoint are eligible for reclaim.
// The flusher may have already reclaimed them. Use checkpoint+1 as retain start.
retainStart := checkpointLSN + 1
return retainStart, headLSN
}
func (a *walAccess) StreamEntries(fromLSN uint64, fn func(*WALEntry) error) error {
checkpointLSN := uint64(0)
if a.vol.flusher != nil {
checkpointLSN = a.vol.flusher.CheckpointLSN()
}
return a.vol.wal.ScanFrom(a.vol.fd, a.vol.super.WALOffset, checkpointLSN, fromLSN, fn)
}
// SetReplicaAddr configures a single replica endpoint. Backward-compatible wrapper
// around SetReplicaAddrs for RF=2 callers.
func (v *BlockVol) SetReplicaAddr(dataAddr, ctrlAddr string) {
@@ -751,18 +789,19 @@ func (v *BlockVol) SetReplicaAddr(dataAddr, ctrlAddr string) {
// SetReplicaAddrs configures N replica endpoints and creates a ShipperGroup
// with distributed group commit. Creates fresh shippers (old group is GC'd).
func (v *BlockVol) SetReplicaAddrs(addrs []ReplicaAddr) {
wa := &walAccess{vol: v}
shippers := make([]*WALShipper, len(addrs))
for i, a := range addrs {
shippers[i] = NewWALShipper(a.DataAddr, a.CtrlAddr, func() uint64 {
return v.epoch.Load()
}, v.Metrics)
}, wa, v.Metrics)
}
v.shipperGroup = NewShipperGroup(shippers)
// Replace the group committer's sync function with a distributed version.
v.groupCommit.Stop()
v.groupCommit = NewGroupCommitter(GroupCommitterConfig{
SyncFunc: MakeDistributedSync(v.syncWithWALProgress, v.shipperGroup, v),
SyncFunc: MakeDistributedSync(v.fd.Sync, v.shipperGroup, v),
MaxDelay: v.config.GroupCommitMaxDelay,
MaxBatch: v.config.GroupCommitMaxBatch,
LowWatermark: v.config.GroupCommitLowWatermark,

View File

@@ -2339,7 +2339,7 @@ func testShipSingleEntry(t *testing.T) {
ctrlAddr, _ := mockCtrlServer(t, BarrierOK)
epoch := uint64(1)
s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch })
s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }, nil)
defer s.Stop()
entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)}
@@ -2371,7 +2371,7 @@ func testShipBatch(t *testing.T) {
ctrlAddr, _ := mockCtrlServer(t, BarrierOK)
epoch := uint64(1)
s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch })
s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }, nil)
defer s.Stop()
for i := uint64(1); i <= 5; i++ {
@@ -2399,7 +2399,7 @@ func testShipEpochMismatchDropped(t *testing.T) {
ctrlAddr, _ := mockCtrlServer(t, BarrierOK)
epoch := uint64(2) // shipper epoch is 2
s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch })
s := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return epoch }, nil)
// First ship a valid entry to establish connection.
validEntry := &WALEntry{LSN: 1, Epoch: 2, Type: EntryTypeTrim, LBA: 0, Length: 4096}
@@ -2444,7 +2444,7 @@ func testShipDegradedOnError(t *testing.T) {
ctrlAddr, _ := mockCtrlServer(t, BarrierOK)
epoch := uint64(1)
s := NewWALShipper(ln.Addr().String(), ctrlAddr, func() uint64 { return epoch })
s := NewWALShipper(ln.Addr().String(), ctrlAddr, func() uint64 { return epoch }, nil)
defer s.Stop()
entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)}
@@ -2488,7 +2488,7 @@ func testShipDegradedOnError(t *testing.T) {
func testShipNoReplicaNoop(t *testing.T) {
// A nil shipper should not be called, but test that a stopped shipper is safe.
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 })
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }, nil)
s.Stop()
entry := &WALEntry{LSN: 1, Epoch: 1, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: make([]byte, 4096)}
@@ -3000,7 +3000,7 @@ func testDistCommitBothPass(t *testing.T) {
v := createTestVol(t)
defer v.Close()
shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 })
shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }, nil)
defer shipper.Stop()
distSync := MakeDistributedSync(walSync, NewShipperGroup([]*WALShipper{shipper}), v)
@@ -3023,7 +3023,7 @@ func testDistCommitLocalFail(t *testing.T) {
v := createTestVol(t)
defer v.Close()
shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 })
shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }, nil)
defer shipper.Stop()
distSync := MakeDistributedSync(walSync, NewShipperGroup([]*WALShipper{shipper}), v)
@@ -3043,7 +3043,7 @@ func testDistCommitRemoteFailDegrades(t *testing.T) {
v := createTestVol(t)
defer v.Close()
shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 })
shipper := NewWALShipper(dataAddr, ctrlAddr, func() uint64 { return 0 }, nil)
defer shipper.Stop()
// Set shipper group on vol so degradeReplica can see it.
@@ -3565,7 +3565,7 @@ func testDemoteStopsShipper(t *testing.T) {
// Create a shipper group (won't connect but that's fine for this test).
shipper := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 {
return v.epoch.Load()
})
}, nil)
v.shipperGroup = NewShipperGroup([]*WALShipper{shipper})
if err := v.HandleAssignment(2, RoleStale, 0); err != nil {

View File

@@ -69,7 +69,7 @@ func TestDistSync_SyncAll_AllDegraded_Fails(t *testing.T) {
// Create a shipper group with one degraded shipper.
shipper := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 {
return vol.epoch.Load()
}, vol.Metrics)
}, nil, vol.Metrics)
shipper.state.Store(uint32(ReplicaDegraded))
group := NewShipperGroup([]*WALShipper{shipper})
@@ -84,8 +84,8 @@ func TestDistSync_SyncQuorum_AllDegraded_RF3_Fails(t *testing.T) {
vol, _ := newTestVolWithMode(t, DurabilitySyncQuorum)
defer vol.Close()
s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, vol.Metrics)
s2 := NewWALShipper("127.0.0.1:99997", "127.0.0.1:99996", func() uint64 { return 0 }, vol.Metrics)
s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, nil, vol.Metrics)
s2 := NewWALShipper("127.0.0.1:99997", "127.0.0.1:99996", func() uint64 { return 0 }, nil, vol.Metrics)
s1.state.Store(uint32(ReplicaDegraded))
s2.state.Store(uint32(ReplicaDegraded))
group := NewShipperGroup([]*WALShipper{s1, s2})
@@ -102,7 +102,7 @@ func TestDistSync_BestEffort_BackwardCompat(t *testing.T) {
vol, _ := newTestVolWithMode(t, DurabilityBestEffort)
defer vol.Close()
s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, vol.Metrics)
s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, nil, vol.Metrics)
s1.state.Store(uint32(ReplicaDegraded))
group := NewShipperGroup([]*WALShipper{s1})
@@ -116,7 +116,7 @@ func TestDistSync_Metrics_IncrementOnFailure(t *testing.T) {
vol, _ := newTestVolWithMode(t, DurabilitySyncAll)
defer vol.Close()
s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, vol.Metrics)
s1 := NewWALShipper("127.0.0.1:99999", "127.0.0.1:99998", func() uint64 { return 0 }, nil, vol.Metrics)
s1.state.Store(uint32(ReplicaDegraded))
group := NewShipperGroup([]*WALShipper{s1})

View File

@@ -279,7 +279,7 @@ func testQAFrameConcurrentWrites(t *testing.T) {
// --- QA-4A-CP2-2: WALShipper Adversarial ---
func testQAShipperShipAfterStop(t *testing.T) {
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 })
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }, nil)
s.Stop()
// Ship after Stop must not panic, must return nil (silently drop).
@@ -294,7 +294,7 @@ func testQAShipperShipAfterStop(t *testing.T) {
}
func testQAShipperBarrierAfterDegraded(t *testing.T) {
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 })
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return 1 }, nil)
s.state.Store(uint32(ReplicaDegraded))
err := s.Barrier(1)
@@ -306,7 +306,7 @@ func testQAShipperBarrierAfterDegraded(t *testing.T) {
func testQAShipperStaleEpochNoShippedLSN(t *testing.T) {
// Ship with epoch != current -> entry silently dropped, shippedLSN unchanged.
currentEpoch := uint64(5)
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return currentEpoch })
s := NewWALShipper("127.0.0.1:0", "127.0.0.1:0", func() uint64 { return currentEpoch }, nil)
entry := &WALEntry{LSN: 10, Epoch: 3, Type: EntryTypeWrite, LBA: 0, Length: 4096, Data: makeBlock('X')}
err := s.Ship(entry)
@@ -337,7 +337,7 @@ func testQAShipperDegradedPermanent(t *testing.T) {
}
}()
s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 })
s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }, nil)
defer s.Stop()
// Ship triggers connection -> immediate close -> write error -> degraded.
@@ -387,7 +387,7 @@ func testQAShipperConcurrentShipStop(t *testing.T) {
}()
defer ln.Close()
s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 })
s := NewWALShipper(ln.Addr().String(), ln.Addr().String(), func() uint64 { return 1 }, nil)
var wg sync.WaitGroup
// Concurrent shippers.
@@ -949,7 +949,7 @@ func testQADSyncBothFail(t *testing.T) {
vol := &BlockVol{}
vol.nextLSN.Store(10)
shipper := NewWALShipper("127.0.0.1:1", "127.0.0.1:1", func() uint64 { return 1 })
shipper := NewWALShipper("127.0.0.1:1", "127.0.0.1:1", func() uint64 { return 1 }, nil)
syncFn := MakeDistributedSync(
func() error { return localErr },

View File

@@ -197,7 +197,7 @@ func testQA4b4ShipCtrlOkDataDown(t *testing.T) {
recv.dataListener.Close()
// Create shipper pointed at closed data port, working ctrl port.
shipper := NewWALShipper(recv.DataAddr(), ctrlAddr, func() uint64 { return 1 })
shipper := NewWALShipper(recv.DataAddr(), ctrlAddr, func() uint64 { return 1 }, nil)
defer shipper.Stop()
// Ship should fail (can't connect data), shipper degrades.

View File

@@ -9,7 +9,17 @@ import (
// Data channel message types.
const (
MsgWALEntry byte = 0x01
MsgWALEntry byte = 0x01
MsgResumeShipReq byte = 0x03 // CP13-5: reconnect handshake request
MsgResumeShipResp byte = 0x04 // CP13-5: reconnect handshake response
MsgCatchupDone byte = 0x05 // CP13-5: end of catch-up stream
)
// ResumeShip status codes.
const (
ResumeOK byte = 0x00
ResumeEpochMismatch byte = 0x01
ResumeNeedsRebuild byte = 0x02
)
// Control channel message types.
@@ -172,3 +182,73 @@ func DecodeBarrierRequest(buf []byte) (BarrierRequest, error) {
Epoch: binary.BigEndian.Uint64(buf[12:20]),
}, nil
}
// --- CP13-5: Reconnect handshake messages ---
// ResumeShipReq is sent by the primary on the data channel after reconnect.
type ResumeShipReq struct {
Epoch uint64
PrimaryHeadLSN uint64
WalRetainStart uint64
}
// EncodeResumeShipReq serializes a ResumeShipReq (8+8+8 = 24 bytes).
func EncodeResumeShipReq(req ResumeShipReq) []byte {
buf := make([]byte, 24)
binary.BigEndian.PutUint64(buf[0:8], req.Epoch)
binary.BigEndian.PutUint64(buf[8:16], req.PrimaryHeadLSN)
binary.BigEndian.PutUint64(buf[16:24], req.WalRetainStart)
return buf
}
// DecodeResumeShipReq deserializes a ResumeShipReq.
func DecodeResumeShipReq(buf []byte) (ResumeShipReq, error) {
if len(buf) < 24 {
return ResumeShipReq{}, fmt.Errorf("repl: resume ship req too short: %d bytes", len(buf))
}
return ResumeShipReq{
Epoch: binary.BigEndian.Uint64(buf[0:8]),
PrimaryHeadLSN: binary.BigEndian.Uint64(buf[8:16]),
WalRetainStart: binary.BigEndian.Uint64(buf[16:24]),
}, nil
}
// ResumeShipResp is the replica's reply on the data channel.
type ResumeShipResp struct {
Status byte
ReplicaFlushedLSN uint64
}
// EncodeResumeShipResp serializes a ResumeShipResp (1+8 = 9 bytes).
func EncodeResumeShipResp(resp ResumeShipResp) []byte {
buf := make([]byte, 9)
buf[0] = resp.Status
binary.BigEndian.PutUint64(buf[1:9], resp.ReplicaFlushedLSN)
return buf
}
// DecodeResumeShipResp deserializes a ResumeShipResp.
func DecodeResumeShipResp(buf []byte) (ResumeShipResp, error) {
if len(buf) < 9 {
return ResumeShipResp{}, fmt.Errorf("repl: resume ship resp too short: %d bytes", len(buf))
}
return ResumeShipResp{
Status: buf[0],
ReplicaFlushedLSN: binary.BigEndian.Uint64(buf[1:9]),
}, nil
}
// EncodeCatchupDone serializes the catch-up done marker (8 bytes: snapshotLSN).
func EncodeCatchupDone(snapshotLSN uint64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf[0:8], snapshotLSN)
return buf
}
// DecodeCatchupDone deserializes the catch-up done marker.
func DecodeCatchupDone(buf []byte) (uint64, error) {
if len(buf) < 8 {
return 0, fmt.Errorf("repl: catchup done too short: %d bytes", len(buf))
}
return binary.BigEndian.Uint64(buf[0:8]), nil
}

View File

@@ -58,8 +58,17 @@ func NewReplicaReceiver(vol *BlockVol, dataAddr, ctrlAddr string, advertisedHost
if len(advertisedHost) > 0 {
advHost = advertisedHost[0]
}
// Initialize receivedLSN/flushedLSN from the volume's persisted state.
// This handles the case where a ReplicaReceiver is recreated on a
// volume that already has data (e.g., after process restart or reconnect).
initLSN := uint64(0)
if vol.nextLSN.Load() > 1 {
initLSN = vol.nextLSN.Load() - 1
}
r := &ReplicaReceiver{
vol: vol,
receivedLSN: initLSN,
flushedLSN: initLSN,
barrierTimeout: defaultBarrierTimeout,
advertisedHost: advHost,
dataListener: dataLn,
@@ -184,17 +193,55 @@ func (r *ReplicaReceiver) handleDataConn(conn net.Conn) {
return
}
if msgType != MsgWALEntry {
switch msgType {
case MsgWALEntry:
if err := r.applyEntry(payload); err != nil {
log.Printf("replica: apply entry error: %v", err)
}
case MsgResumeShipReq:
r.handleResumeShipReq(conn, payload)
case MsgCatchupDone:
lsn, err := DecodeCatchupDone(payload)
if err != nil {
log.Printf("replica: decode catchup done: %v", err)
} else {
log.Printf("replica: catch-up done, snapshot LSN=%d", lsn)
}
default:
log.Printf("replica: unexpected data message type 0x%02x", msgType)
continue
}
if err := r.applyEntry(payload); err != nil {
log.Printf("replica: apply entry error: %v", err)
}
}
}
// handleResumeShipReq processes the reconnect handshake from the primary.
func (r *ReplicaReceiver) handleResumeShipReq(conn net.Conn, payload []byte) {
req, err := DecodeResumeShipReq(payload)
if err != nil {
log.Printf("replica: decode resume ship req: %v", err)
resp := EncodeResumeShipResp(ResumeShipResp{Status: ResumeNeedsRebuild})
WriteFrame(conn, MsgResumeShipResp, resp)
return
}
// Validate epoch.
localEpoch := r.vol.epoch.Load()
if req.Epoch != localEpoch {
log.Printf("replica: resume ship epoch mismatch: req=%d local=%d", req.Epoch, localEpoch)
resp := EncodeResumeShipResp(ResumeShipResp{
Status: ResumeEpochMismatch,
ReplicaFlushedLSN: r.FlushedLSN(),
})
WriteFrame(conn, MsgResumeShipResp, resp)
return
}
resp := EncodeResumeShipResp(ResumeShipResp{
Status: ResumeOK,
ReplicaFlushedLSN: r.FlushedLSN(),
})
WriteFrame(conn, MsgResumeShipResp, resp)
}
// applyEntry decodes and applies a single WAL entry to the local volume.
// The entire apply (LSN check -> WAL append -> dirty map -> receivedLSN update)
// is serialized under mu to prevent TOCTOU races between concurrent entries.

View File

@@ -0,0 +1,341 @@
package blockvol
// Tests for BUG-SYNC-ALL-FLUSH: three chained bugs that break sync_all mode.
// These tests are designed to FAIL on the current code and PASS after the fix.
//
// Bug 3: Replica addresses are :port not ip:port — cross-machine never connects.
// Bug 2: Reconnected shipper has LSN gap — replica rejects all entries.
// Bug 1: Shipper degrades permanently — no recovery path.
import (
"bytes"
"fmt"
"path/filepath"
"strings"
"testing"
"time"
)
// --- Bug 3: Address resolution ---
// TestBug3_ReplicaAddr_MustBeIPPort verifies that ReplicaReceiver.DataAddr()
// and CtrlAddr() return ip:port, not :port.
// Current code returns ":port" from listener.Addr().String() which fails cross-machine.
// TestBug3_ReplicaAddr_MustBeIPPort_WildcardBind verifies that when
// ReplicaReceiver binds to ":0" (wildcard — the production default),
// DataAddr()/CtrlAddr() still return ip:port, not ":port".
// In production, the VS uses ":0" to let the OS pick a port.
// The address is then sent to the primary via heartbeat/assignment.
// If it's ":port", the primary dials localhost — fails cross-machine.
func TestBug3_ReplicaAddr_MustBeIPPort_WildcardBind(t *testing.T) {
vol := createTestVol(t)
defer vol.Close()
// Bind to ":0" — this is what production code does.
recv, err := NewReplicaReceiver(vol, ":0", ":0")
if err != nil {
t.Fatal(err)
}
defer recv.Stop()
dataAddr := recv.DataAddr()
ctrlAddr := recv.CtrlAddr()
// Addresses MUST be dialable from a remote machine:
// - not ":port" (missing IP entirely)
// - not "0.0.0.0:port" (wildcard, not routable)
// - not "[::]:port" (IPv6 wildcard, not routable)
for _, addr := range []struct{ name, val string }{
{"DataAddr", dataAddr},
{"CtrlAddr", ctrlAddr},
} {
if strings.HasPrefix(addr.val, ":") {
t.Fatalf("%s() returned %q — missing IP", addr.name, addr.val)
}
if strings.HasPrefix(addr.val, "0.0.0.0:") {
t.Fatalf("%s() returned %q — wildcard, not routable cross-machine", addr.name, addr.val)
}
if strings.HasPrefix(addr.val, "[::]:") {
t.Fatalf("%s() returned %q — IPv6 wildcard, not routable cross-machine", addr.name, addr.val)
}
}
}
// --- Bug 2: LSN gap after shipper degradation ---
// TestBug2_SyncAll_SyncCache_AfterDegradedShipperRecovers verifies the
// full round-trip: primary writes during degraded period → shipper reconnects
// → SyncCache (barrier) must succeed after catch-up, not fail permanently.
//
// This is the core bug: during degraded period, Ship() silently drops entries.
// After reconnection, the replica has a gap. Barrier hangs or fails because
// the replica never received the missing entries.
func TestBug2_SyncAll_SyncCache_AfterDegradedShipperRecovers(t *testing.T) {
primary, replica := createSyncAllPair(t)
defer primary.Close()
defer replica.Close()
recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
recv.Serve()
defer recv.Stop()
primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
// Phase 1: Write + SyncCache while healthy. Must succeed.
if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
t.Fatalf("write 1: %v", err)
}
if err := primary.SyncCache(); err != nil {
t.Fatalf("SyncCache 1 (healthy): %v", err)
}
// Phase 2: Kill the replica's data connection to force shipper degradation.
// Close the receiver, wait for shipper to detect failure.
recv.Stop()
time.Sleep(50 * time.Millisecond)
// Write while degraded. Ship() silently drops these entries.
if err := primary.WriteLBA(1, makeBlock('B')); err != nil {
t.Fatalf("write 2 (degraded): %v", err)
}
if err := primary.WriteLBA(2, makeBlock('C')); err != nil {
t.Fatalf("write 3 (degraded): %v", err)
}
// Phase 3: Restart the replica receiver on the SAME addresses.
// We must NOT call SetReplicaAddr again — that creates a fresh shipper
// and loses the flushed progress needed for reconnect handshake.
savedDataAddr := recv.DataAddr()
savedCtrlAddr := recv.CtrlAddr()
recv2, err := NewReplicaReceiver(replica, savedDataAddr, savedCtrlAddr)
if err != nil {
// Address reuse failed (port still held) — use new ports and reconfigure.
// This loses shipper state, so initialize the new receiver's receivedLSN.
recv2, err = NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
if err != nil {
t.Fatalf("restart receiver: %v", err)
}
primary.SetReplicaAddr(recv2.DataAddr(), recv2.CtrlAddr())
}
recv2.Serve()
defer recv2.Stop()
// Phase 4: SyncCache after recovery. This is the critical test:
// The shipper must catch up the replica on the missing LSNs before
// the barrier can succeed. Without catch-up, the barrier hangs/fails.
syncDone := make(chan error, 1)
go func() {
syncDone <- primary.SyncCache()
}()
select {
case err := <-syncDone:
if err != nil {
t.Fatalf("SyncCache after recovery failed: %v — shipper did not catch up replica", err)
}
case <-time.After(10 * time.Second):
t.Fatal("SyncCache after recovery hung — shipper has no catch-up protocol")
}
// Phase 5: Verify replica has ALL the data.
replica.flusher.FlushOnce()
for lba := uint64(0); lba < 3; lba++ {
got, err := replica.ReadLBA(lba, 4096)
if err != nil {
t.Fatalf("replica ReadLBA(%d): %v", lba, err)
}
expected := byte('A' + lba)
if got[0] != expected {
t.Fatalf("replica LBA %d: expected %c, got %c", lba, expected, got[0])
}
}
}
// --- Bug 1: Permanent degradation ---
// TestBug1_SyncAll_WriteDuringDegraded_SyncCacheMustFail verifies that
// under sync_all mode, SyncCache returns an error (not success) when the
// shipper is degraded and has not caught up. Writes may succeed locally,
// but durability confirmation must fail.
func TestBug1_SyncAll_WriteDuringDegraded_SyncCacheMustFail(t *testing.T) {
primary, replica := createSyncAllPair(t)
defer primary.Close()
defer replica.Close()
recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
recv.Serve()
primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
// Initial healthy write.
if err := primary.WriteLBA(0, makeBlock('A')); err != nil {
t.Fatalf("write: %v", err)
}
if err := primary.SyncCache(); err != nil {
t.Fatalf("SyncCache healthy: %v", err)
}
// Kill replica — force degradation.
recv.Stop()
time.Sleep(50 * time.Millisecond)
// Write during degraded period.
if err := primary.WriteLBA(1, makeBlock('B')); err != nil {
t.Fatalf("write during degraded: %v", err)
}
// SyncCache under sync_all with degraded replica MUST return error.
// The write is locally durable but the replica barrier fails.
syncDone := make(chan error, 1)
go func() {
syncDone <- primary.SyncCache()
}()
select {
case err := <-syncDone:
if err == nil {
t.Fatal("SyncCache returned nil under sync_all with degraded replica — durability violation")
}
// Expected: ErrDurabilityBarrierFailed or similar.
t.Logf("SyncCache correctly failed: %v", err)
case <-time.After(10 * time.Second):
t.Fatal("SyncCache hung — barrier timeout not propagated")
}
}
// --- Full chain: sync_all write + SyncCache round-trip ---
// TestSyncAll_FullRoundTrip_WriteAndFlush verifies the complete sync_all
// contract: write → ship → barrier → SyncCache returns nil only when
// replica confirms durability.
func TestSyncAll_FullRoundTrip_WriteAndFlush(t *testing.T) {
primary, replica := createSyncAllPair(t)
defer primary.Close()
defer replica.Close()
recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
recv.Serve()
defer recv.Stop()
primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
// Write 10 blocks.
for i := 0; i < 10; i++ {
if err := primary.WriteLBA(uint64(i), makeBlock(byte('0'+i))); err != nil {
t.Fatalf("write %d: %v", i, err)
}
}
// SyncCache = barrier. Under sync_all, this must confirm replica durability.
if err := primary.SyncCache(); err != nil {
t.Fatalf("SyncCache: %v", err)
}
// Replica must have all 10 entries.
if recv.ReceivedLSN() < 10 {
t.Fatalf("replica receivedLSN=%d, expected >=10", recv.ReceivedLSN())
}
// Read back from replica to verify data integrity.
replica.flusher.FlushOnce()
for i := 0; i < 10; i++ {
got, err := replica.ReadLBA(uint64(i), 4096)
if err != nil {
t.Fatalf("replica ReadLBA(%d): %v", i, err)
}
if got[0] != byte('0'+i) {
t.Fatalf("replica LBA %d: expected %c, got %c", i, '0'+i, got[0])
}
}
}
// TestSyncAll_MultipleFlush_NoWritesBetween verifies that repeated
// SyncCache calls without new writes succeed (FLUSH without data).
// This is the mkfs pattern: write blocks → FLUSH → write superblock → FLUSH.
func TestSyncAll_MultipleFlush_NoWritesBetween(t *testing.T) {
primary, replica := createSyncAllPair(t)
defer primary.Close()
defer replica.Close()
recv, err := NewReplicaReceiver(replica, "127.0.0.1:0", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
recv.Serve()
defer recv.Stop()
primary.SetReplicaAddr(recv.DataAddr(), recv.CtrlAddr())
// Write + flush.
if err := primary.WriteLBA(0, makeBlock('X')); err != nil {
t.Fatalf("write: %v", err)
}
if err := primary.SyncCache(); err != nil {
t.Fatalf("SyncCache 1: %v", err)
}
// Flush again without new writes — must succeed, not hang.
syncDone := make(chan error, 1)
go func() {
syncDone <- primary.SyncCache()
}()
select {
case err := <-syncDone:
if err != nil {
t.Fatalf("SyncCache 2 (no new writes): %v", err)
}
case <-time.After(5 * time.Second):
t.Fatal("SyncCache 2 hung — barrier on stale lsnMax not handled")
}
// Third flush.
if err := primary.SyncCache(); err != nil {
t.Fatalf("SyncCache 3: %v", err)
}
}
// --- Helpers ---
func createSyncAllPair(t *testing.T) (primary *BlockVol, replica *BlockVol) {
t.Helper()
pDir := t.TempDir()
rDir := t.TempDir()
opts := CreateOptions{
VolumeSize: 1 * 1024 * 1024,
BlockSize: 4096,
WALSize: 256 * 1024,
DurabilityMode: DurabilitySyncAll,
}
p, err := CreateBlockVol(filepath.Join(pDir, "primary.blockvol"), opts)
if err != nil {
t.Fatalf("CreateBlockVol primary: %v", err)
}
p.SetRole(RolePrimary)
p.SetEpoch(1)
p.SetMasterEpoch(1)
p.lease.Grant(30 * time.Second)
r, err := CreateBlockVol(filepath.Join(rDir, "replica.blockvol"), opts)
if err != nil {
p.Close()
t.Fatalf("CreateBlockVol replica: %v", err)
}
r.SetRole(RoleReplica)
r.SetEpoch(1)
r.SetMasterEpoch(1)
return p, r
}
// Suppress unused import.
var _ = fmt.Sprintf
var _ = bytes.Equal

View File

@@ -1175,7 +1175,7 @@ func TestBestEffort_FlushSucceeds_ReplicaDown(t *testing.T) {
// ============================================================
func TestReplicaState_InitialDisconnected(t *testing.T) {
s := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 })
s := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }, nil)
if s.State() != ReplicaDisconnected {
t.Fatalf("initial state: got %s, want disconnected", s.State())
}
@@ -1300,7 +1300,7 @@ func TestReplicaState_ShipFailureTransitionsToDegraded(t *testing.T) {
}
func TestReplicaState_BarrierDegradedReconnectFail_StaysDegraded(t *testing.T) {
s := NewWALShipper("127.0.0.1:1", "127.0.0.1:2", func() uint64 { return 1 })
s := NewWALShipper("127.0.0.1:1", "127.0.0.1:2", func() uint64 { return 1 }, nil)
// Force to Degraded.
s.state.Store(uint32(ReplicaDegraded))
@@ -1349,8 +1349,8 @@ func TestReplicaState_BarrierDegradedReconnectSuccess_RestoresInSync(t *testing.
}
func TestShipperGroup_InSyncCount(t *testing.T) {
s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 })
s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 })
s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }, nil)
s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 }, nil)
group := NewShipperGroup([]*WALShipper{s1, s2})
// Both disconnected.
@@ -1599,8 +1599,8 @@ func TestShipperGroup_MinReplicaFlushedLSN(t *testing.T) {
}
// Test with shippers that have no progress.
s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 })
s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 })
s1 := NewWALShipper("127.0.0.1:9001", "127.0.0.1:9002", func() uint64 { return 1 }, nil)
s2 := NewWALShipper("127.0.0.1:9003", "127.0.0.1:9004", func() uint64 { return 1 }, nil)
group := NewShipperGroup([]*WALShipper{s1, s2})
_, ok = group.MinReplicaFlushedLSN()

View File

@@ -55,6 +55,7 @@ type WALShipper struct {
dataAddr string
controlAddr string
epochFn func() uint64
wal WALAccess // primary WAL access for reconnect catch-up
metrics *EngineMetrics
mu sync.Mutex // protects dataConn
@@ -67,13 +68,17 @@ type WALShipper struct {
replicaFlushedLSN atomic.Uint64 // authoritative: highest LSN durably persisted on replica
hasFlushedProgress atomic.Bool // true once replica returns a valid (non-zero) FlushedLSN
state atomic.Uint32 // ReplicaState
catchupFailures int // consecutive catch-up failures; reset on success
stopped atomic.Bool
}
const maxCatchupRetries = 3
// NewWALShipper creates a WAL shipper. Connections are established lazily on
// first Ship/Barrier call. epochFn returns the current epoch for validation.
// wal provides WAL access for reconnect catch-up (nil disables catch-up).
// metrics is optional; if nil, no metrics are recorded.
func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, metrics ...*EngineMetrics) *WALShipper {
func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, walAccess WALAccess, metrics ...*EngineMetrics) *WALShipper {
var m *EngineMetrics
if len(metrics) > 0 {
m = metrics[0]
@@ -82,6 +87,7 @@ func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, metrics
dataAddr: dataAddr,
controlAddr: controlAddr,
epochFn: epochFn,
wal: walAccess,
metrics: m,
}
}
@@ -147,23 +153,17 @@ func (s *WALShipper) Barrier(lsnMax uint64) error {
st := s.State()
switch st {
case ReplicaInSync:
// proceed normally
case ReplicaDisconnected:
// bootstrap path: attempt connect + barrier
case ReplicaDegraded:
// recovery path: reset both connections and attempt reconnect + barrier
s.mu.Lock()
if s.dataConn != nil {
s.dataConn.Close()
s.dataConn = nil
// proceed normally to barrier
case ReplicaDisconnected, ReplicaDegraded:
if s.hasFlushedProgress.Load() && s.wal != nil {
// Previously synced — reconnect handshake + catch-up path.
if err := s.doReconnectAndCatchUp(); err != nil {
return err
}
} else {
// Fresh bootstrap or no WAL access — reset connections for bare retry.
s.resetConnections()
}
s.mu.Unlock()
s.ctrlMu.Lock()
if s.ctrlConn != nil {
s.ctrlConn.Close()
s.ctrlConn = nil
}
s.ctrlMu.Unlock()
default:
// Connecting, CatchingUp, NeedsRebuild — reject immediately
return ErrReplicaDegraded
@@ -334,7 +334,208 @@ func (s *WALShipper) markDegraded() {
log.Printf("wal_shipper: replica degraded (data=%s, ctrl=%s, state=%s)", s.dataAddr, s.controlAddr, s.State())
}
// resetConnections closes both data and control connections for a clean retry.
func (s *WALShipper) resetConnections() {
s.mu.Lock()
if s.dataConn != nil {
s.dataConn.Close()
s.dataConn = nil
}
s.mu.Unlock()
s.ctrlMu.Lock()
if s.ctrlConn != nil {
s.ctrlConn.Close()
s.ctrlConn = nil
}
s.ctrlMu.Unlock()
}
// doReconnectAndCatchUp runs the full reconnect handshake + catch-up protocol.
// On success, transitions to InSync and resets ctrl connection for barrier.
func (s *WALShipper) doReconnectAndCatchUp() error {
targetState, _, err := s.reconnectWithHandshake()
switch targetState {
case ReplicaInSync:
s.markInSync()
case ReplicaCatchingUp:
if catchErr := s.runCatchUp(s.replicaFlushedLSN.Load()); catchErr != nil {
s.catchupFailures++
if s.catchupFailures >= maxCatchupRetries {
s.state.Store(uint32(ReplicaNeedsRebuild))
return fmt.Errorf("catch-up failed %d times: %w", s.catchupFailures, catchErr)
}
s.markDegraded()
return ErrReplicaDegraded
}
s.markInSync()
case ReplicaNeedsRebuild:
s.state.Store(uint32(ReplicaNeedsRebuild))
return fmt.Errorf("reconnect: %w", err)
default:
s.markDegraded()
return ErrReplicaDegraded
}
// Reset ctrl connection so barrier creates a fresh one.
s.ctrlMu.Lock()
if s.ctrlConn != nil {
s.ctrlConn.Close()
s.ctrlConn = nil
}
s.ctrlMu.Unlock()
return nil
}
func (s *WALShipper) markInSync() {
s.state.Store(uint32(ReplicaInSync))
s.catchupFailures = 0
log.Printf("wal_shipper: replica in-sync (data=%s, ctrl=%s)", s.dataAddr, s.controlAddr)
}
const catchupTimeout = 30 * time.Second
// reconnectWithHandshake performs the CP13-5 reconnect protocol:
// connect data channel → send ResumeShipReq → read ResumeShipResp → decide.
// Returns the target state (InSync, CatchingUp, NeedsRebuild) and replica's flushed LSN.
// Caller must hold no locks. Must only be called when wal != nil.
func (s *WALShipper) reconnectWithHandshake() (targetState ReplicaState, replicaFlushedLSN uint64, err error) {
s.state.Store(uint32(ReplicaConnecting))
// Reset and establish data connection.
s.mu.Lock()
if s.dataConn != nil {
s.dataConn.Close()
s.dataConn = nil
}
if err := s.ensureDataConn(); err != nil {
s.mu.Unlock()
return ReplicaDegraded, 0, fmt.Errorf("reconnect dial: %w", err)
}
s.dataConn.SetDeadline(time.Now().Add(catchupTimeout))
conn := s.dataConn
s.mu.Unlock()
// Gather primary state.
retainStart, headLSN := s.wal.RetainedRange()
epoch := s.epochFn()
// Send ResumeShipReq.
req := EncodeResumeShipReq(ResumeShipReq{
Epoch: epoch,
PrimaryHeadLSN: headLSN,
WalRetainStart: retainStart,
})
if err := WriteFrame(conn, MsgResumeShipReq, req); err != nil {
return ReplicaDegraded, 0, fmt.Errorf("reconnect send req: %w", err)
}
// Read ResumeShipResp.
msgType, payload, err := ReadFrame(conn)
if err != nil {
return ReplicaDegraded, 0, fmt.Errorf("reconnect read resp: %w", err)
}
if msgType != MsgResumeShipResp {
return ReplicaDegraded, 0, fmt.Errorf("reconnect: unexpected msg type 0x%02x", msgType)
}
resp, err := DecodeResumeShipResp(payload)
if err != nil {
return ReplicaDegraded, 0, err
}
// Clear deadline for catch-up streaming.
s.mu.Lock()
if s.dataConn != nil {
s.dataConn.SetDeadline(time.Time{})
}
s.mu.Unlock()
// Decision matrix.
switch resp.Status {
case ResumeEpochMismatch:
return ReplicaNeedsRebuild, resp.ReplicaFlushedLSN, fmt.Errorf("reconnect: epoch mismatch")
case ResumeNeedsRebuild:
return ReplicaNeedsRebuild, resp.ReplicaFlushedLSN, fmt.Errorf("reconnect: replica requests rebuild")
case ResumeOK:
// proceed to gap analysis
default:
return ReplicaDegraded, resp.ReplicaFlushedLSN, fmt.Errorf("reconnect: unknown status 0x%02x", resp.Status)
}
R := resp.ReplicaFlushedLSN
H := headLSN
S := retainStart
if R > H {
// Impossible: replica ahead of primary.
log.Printf("wal_shipper: reconnect %s: impossible progress R=%d > H=%d", s.dataAddr, R, H)
return ReplicaNeedsRebuild, R, fmt.Errorf("reconnect: impossible replica progress")
}
if R == H {
// Already caught up.
log.Printf("wal_shipper: reconnect %s: already caught up (R=H=%d)", s.dataAddr, R)
return ReplicaInSync, R, nil
}
if R+1 >= S {
// Recoverable gap: WAL still has entries from R+1.
log.Printf("wal_shipper: reconnect %s: recoverable gap R=%d H=%d S=%d", s.dataAddr, R, H, S)
return ReplicaCatchingUp, R, nil
}
// Gap exceeds retained WAL.
log.Printf("wal_shipper: reconnect %s: gap too large R=%d H=%d S=%d", s.dataAddr, R, H, S)
return ReplicaNeedsRebuild, R, fmt.Errorf("reconnect: gap exceeds retained WAL")
}
// runCatchUp streams WAL entries from fromLSN+1 to the replica on the data channel.
// Sends MsgCatchupDone when complete. Caller must hold no shipper locks.
func (s *WALShipper) runCatchUp(fromLSN uint64) error {
s.state.Store(uint32(ReplicaCatchingUp))
// Set a deadline for the entire catch-up operation.
s.mu.Lock()
if s.dataConn != nil {
s.dataConn.SetDeadline(time.Now().Add(catchupTimeout))
}
conn := s.dataConn
s.mu.Unlock()
if conn == nil {
return fmt.Errorf("catch-up: no data connection")
}
// Stream entries from WAL.
var lastSent uint64
err := s.wal.StreamEntries(fromLSN+1, func(entry *WALEntry) error {
encoded, encErr := entry.Encode()
if encErr != nil {
return encErr
}
if wErr := WriteFrame(conn, MsgWALEntry, encoded); wErr != nil {
return wErr
}
lastSent = entry.LSN
return nil
})
if err != nil {
if errors.Is(err, ErrWALRecycled) {
s.state.Store(uint32(ReplicaNeedsRebuild))
return fmt.Errorf("catch-up: WAL recycled: %w", err)
}
return fmt.Errorf("catch-up: stream error: %w", err)
}
// Send CatchupDone marker.
_, headLSN := s.wal.RetainedRange()
if err := WriteFrame(conn, MsgCatchupDone, EncodeCatchupDone(headLSN)); err != nil {
return fmt.Errorf("catch-up: send done: %w", err)
}
// Clear deadline.
s.mu.Lock()
if s.dataConn != nil {
s.dataConn.SetDeadline(time.Time{})
}
s.mu.Unlock()
log.Printf("wal_shipper: catch-up complete %s: from=%d last=%d", s.dataAddr, fromLSN+1, lastSent)
return nil
}