From 5aedada53aabb20fddfaabc1310111a76cd2677b Mon Sep 17 00:00:00 2001 From: pingqiu Date: Sun, 5 Apr 2026 14:10:05 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=2018=20M3=20=E2=80=94=20replicate?= =?UTF-8?q?d=20data=20continuity=20closure?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M3 milestone: write → Loop2 observe → failover → readback verify. Continuity runtime (continuity_runtime.go): - ExecuteReplicatedContinuity: composes mirror write + sync + Loop2 observation + failover + readback verify into one bounded path - ReplicatedContinuityResult: captures pre-failover Loop2 snapshot, failover result, selected primary, readback length, data match Runtime manager extensions: - Local node registry for write/readback during continuity verification - RegisterNode now stores node reference for local I/O access Tests prove two paths: - Happy: write on source → failover → promoted node reads correct data - Gated: degraded peer → failover gate stops → continuity reports failure Phase 18 docs: M3 delivered, M4 next. Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/.private/phase/phase-18-decisions.md | 26 +++ sw-block/.private/phase/phase-18-log.md | 28 +++ sw-block/.private/phase/phase-18.md | 23 ++- .../runtime/volumev2/continuity_runtime.go | 120 ++++++++++++ sw-block/runtime/volumev2/poc_test.go | 178 ++++++++++++++++++ sw-block/runtime/volumev2/runtime_manager.go | 24 +++ 6 files changed, 394 insertions(+), 5 deletions(-) create mode 100644 sw-block/runtime/volumev2/continuity_runtime.go diff --git a/sw-block/.private/phase/phase-18-decisions.md b/sw-block/.private/phase/phase-18-decisions.md index c5cdecad7..3c8e147ef 100644 --- a/sw-block/.private/phase/phase-18-decisions.md +++ b/sw-block/.private/phase/phase-18-decisions.md @@ -116,3 +116,29 @@ Implication: 1. the first active Loop 2 runtime is summary-driven 2. later work can deepen it into real continuous keepup/catchup/rebuild choreography without changing the ownership rule + +## D6: `M3` Closes On One Bounded Continuity Statement, Not Broad RF2 Proof + +Decision: + +1. `M3` is considered complete when one runtime-owned continuity path exists: + write -> active Loop 2 observation -> failover -> readback verification +2. `M3` requires both: + - a healthy path + - a gated fail-closed path +3. `M3` does not imply broad RF2 product continuity proof + +Why: + +1. after `M1` and `M2`, the next meaningful closure is to compose failover and + active Loop 2 into one bounded continuity statement +2. this proves the runtime is not only structurally correct, but already able to + carry one real end-to-end continuity story +3. keeping the claim bounded avoids overreading the current in-process runtime as + a complete RF2 product path + +Implication: + +1. later work can attach RF2-facing product/runtime surfaces on top of a real + continuity-bearing runtime slice +2. `M4` should attach one bounded surface without widening the continuity claim diff --git a/sw-block/.private/phase/phase-18-log.md b/sw-block/.private/phase/phase-18-log.md index f32f7a984..fdd9332dd 100644 --- a/sw-block/.private/phase/phase-18-log.md +++ b/sw-block/.private/phase/phase-18-log.md @@ -104,3 +104,31 @@ Current interpretation: 2. this is still bounded summary-driven runtime ownership, not full shipper or rebuild-task choreography 3. the next active work should move to `M3` + +### `M3` Delivered + +Delivered in this update: + +1. 
one runtime-owned replicated continuity entry point now exists: + - `ExecuteReplicatedContinuity(...)` +2. failover and active Loop 2 are now composed into one bounded continuity path +3. the continuity result captures: + - pre-failover Loop 2 snapshot + - failover result + - selected primary + - readback length + - data match + +Tests: + +1. `TestInProcessRuntimeManager_ExecuteReplicatedContinuity_HappyPath` +2. `TestInProcessRuntimeManager_ExecuteReplicatedContinuity_GatedPath` +3. `go test ./sw-block/runtime/masterv2 ./sw-block/runtime/volumev2` + +Current interpretation: + +1. `M3` is complete as one bounded replicated continuity closure on the current + runtime path +2. this is still a bounded continuity claim on the in-process/runtime-owned + path, not broad RF2 product continuity proof +3. the next active work should move to `M4` diff --git a/sw-block/.private/phase/phase-18.md b/sw-block/.private/phase/phase-18.md index 05adf2545..926c4513b 100644 --- a/sw-block/.private/phase/phase-18.md +++ b/sw-block/.private/phase/phase-18.md @@ -227,11 +227,25 @@ Exit criteria: Current status: -1. not started +1. delivered +2. one runtime-owned replicated continuity entry point now exists +3. failover and active Loop 2 are now combined into one bounded continuity + statement +4. current boundary: + - continuity is closed on the bounded in-process runtime path + - this is not yet a broad RF2 product continuity claim Review/test update: -1. pending +1. delivered code: + - `ExecuteReplicatedContinuity(...)` + - `ReplicatedContinuityResult` +2. delivered tests: + - healthy replicated continuity through failover + - gated replicated continuity fail-closed path +3. result: + - the runtime now owns one bounded write -> observe -> failover -> readback + continuity statement ### `M4`: RF2 Product Runtime Surfaces @@ -311,9 +325,8 @@ written decision in `phase-18-decisions.md`. The active next work is: -1. `M3` seam step -2. turn the current failover + active Loop 2 runtime slices into one bounded - replicated continuity statement +1. `M4` seam step +2. attach one bounded RF2-facing runtime/product surface to the new runtime ## Review Base diff --git a/sw-block/runtime/volumev2/continuity_runtime.go b/sw-block/runtime/volumev2/continuity_runtime.go new file mode 100644 index 000000000..ff4ee5b9d --- /dev/null +++ b/sw-block/runtime/volumev2/continuity_runtime.go @@ -0,0 +1,120 @@ +package volumev2 + +import ( + "bytes" + "fmt" + "slices" +) + +// ReplicatedContinuityResult captures one bounded replicated continuity run +// through the current runtime-owned path. +type ReplicatedContinuityResult struct { + VolumeName string + SourcePrimaryNodeID string + SelectedPrimaryNodeID string + ExpectedEpoch uint64 + Loop2BeforeFailover Loop2RuntimeSnapshot + Failover FailoverResult + ReadBackLength uint32 + DataMatch bool +} + +// ExecuteReplicatedContinuity runs one bounded continuity statement through the +// current runtime: +// mirror writes to the bounded participant set -> observe active Loop 2 -> +// fail over to the survivor set -> read back data from the newly selected +// primary. This is a bounded continuity closure, not a full replication product +// claim. 
+func (m *InProcessRuntimeManager) ExecuteReplicatedContinuity(volumeName, sourcePrimaryNodeID string, expectedEpoch uint64, survivorNodeIDs []string, lba uint64, payload []byte) (ReplicatedContinuityResult, error) { + if m == nil { + return ReplicatedContinuityResult{}, fmt.Errorf("volumev2: runtime manager is nil") + } + if volumeName == "" { + return ReplicatedContinuityResult{}, fmt.Errorf("volumev2: continuity volume name is required") + } + if sourcePrimaryNodeID == "" { + return ReplicatedContinuityResult{}, fmt.Errorf("volumev2: continuity source primary node id is required") + } + if len(payload) == 0 { + return ReplicatedContinuityResult{}, fmt.Errorf("volumev2: continuity payload is required") + } + if len(survivorNodeIDs) == 0 { + return ReplicatedContinuityResult{}, fmt.Errorf("volumev2: continuity survivor node ids are required") + } + + nodeIDs := append([]string{sourcePrimaryNodeID}, survivorNodeIDs...) + nodeIDs = uniqueSorted(nodeIDs) + nodes := make([]*Node, 0, len(nodeIDs)) + for _, nodeID := range nodeIDs { + node, err := m.localNode(nodeID) + if err != nil { + return ReplicatedContinuityResult{}, err + } + nodes = append(nodes, node) + } + + for _, node := range nodes { + if err := node.WriteLBA(volumeName, lba, payload); err != nil { + return ReplicatedContinuityResult{}, fmt.Errorf("volumev2: continuity write %s on %s: %w", volumeName, node.NodeID(), err) + } + } + for _, node := range nodes { + if err := node.SyncCache(volumeName); err != nil { + return ReplicatedContinuityResult{}, fmt.Errorf("volumev2: continuity sync %s on %s: %w", volumeName, node.NodeID(), err) + } + } + + result := ReplicatedContinuityResult{ + VolumeName: volumeName, + SourcePrimaryNodeID: sourcePrimaryNodeID, + ExpectedEpoch: expectedEpoch, + } + + loop2Snap, err := m.ObserveLoop2(volumeName, sourcePrimaryNodeID, expectedEpoch, nodeIDs...) + if err != nil { + return result, err + } + result.Loop2BeforeFailover = loop2Snap + + failover, err := m.ExecuteFailover(volumeName, expectedEpoch, survivorNodeIDs...) 
+ result.Failover = failover + if err != nil { + return result, err + } + result.SelectedPrimaryNodeID = failover.Assignment.NodeID + + selectedNode, err := m.localNode(failover.Assignment.NodeID) + if err != nil { + return result, err + } + readBack, err := selectedNode.ReadLBA(volumeName, lba, uint32(len(payload))) + if err != nil { + return result, fmt.Errorf("volumev2: continuity readback %s on %s: %w", volumeName, selectedNode.NodeID(), err) + } + result.ReadBackLength = uint32(len(readBack)) + result.DataMatch = bytes.Equal(readBack, payload) + if !result.DataMatch { + return result, fmt.Errorf("volumev2: continuity payload mismatch after failover") + } + return result, nil +} + +func uniqueSorted(values []string) []string { + if len(values) == 0 { + return nil + } + seen := make(map[string]struct{}, len(values)) + out := make([]string, 0, len(values)) + for _, value := range values { + if value == "" { + continue + } + if _, ok := seen[value]; ok { + continue + } + seen[value] = struct{}{} + out = append(out, value) + } + slices.Sort(out) + return out +} diff --git a/sw-block/runtime/volumev2/poc_test.go b/sw-block/runtime/volumev2/poc_test.go index bba609a4e..75c55cd04 100644 --- a/sw-block/runtime/volumev2/poc_test.go +++ b/sw-block/runtime/volumev2/poc_test.go @@ -1739,6 +1739,184 @@ func TestInProcessRuntimeManager_ObserveLoop2_NeedsRebuild(t *testing.T) { } } +func TestInProcessRuntimeManager_ExecuteReplicatedContinuity_HappyPath(t *testing.T) { + master := masterv2.New(masterv2.Config{}) + manager, err := NewInProcessRuntimeManager(master) + if err != nil { + t.Fatalf("new runtime manager: %v", err) + } + nodeB, err := New(Config{NodeID: "node-b"}) + if err != nil { + t.Fatalf("new node-b: %v", err) + } + defer nodeB.Close() + nodeC, err := New(Config{NodeID: "node-c"}) + if err != nil { + t.Fatalf("new node-c: %v", err) + } + defer nodeC.Close() + if err := manager.RegisterNode(nodeB); err != nil { + t.Fatalf("register node-b: %v", err) + } + if err := manager.RegisterNode(nodeC); err != nil { + t.Fatalf("register node-c: %v", err) + } + + tempDir := t.TempDir() + pathB := filepath.Join(tempDir, "continuity-b.blk") + pathC := filepath.Join(tempDir, "continuity-c.blk") + if err := master.DeclarePrimary(masterv2.VolumeSpec{ + Name: "continuity-vol", + Path: pathB, + PrimaryNodeID: "node-a", + CreateOptions: testCreateOptions(), + }); err != nil { + t.Fatalf("declare primary: %v", err) + } + if err := nodeB.ApplyAssignments([]masterv2.Assignment{{ + Name: "continuity-vol", + Path: pathB, + NodeID: "node-b", + Epoch: 2, + LeaseTTL: 30 * time.Second, + CreateOptions: testCreateOptions(), + Role: "primary", + }}); err != nil { + t.Fatalf("seed node-b: %v", err) + } + if err := nodeC.ApplyAssignments([]masterv2.Assignment{{ + Name: "continuity-vol", + Path: pathC, + NodeID: "node-c", + Epoch: 2, + LeaseTTL: 30 * time.Second, + CreateOptions: testCreateOptions(), + Role: "primary", + }}); err != nil { + t.Fatalf("seed node-c: %v", err) + } + + payload := bytes.Repeat([]byte{0x7A}, 4096) + result, err := manager.ExecuteReplicatedContinuity("continuity-vol", "node-b", 2, []string{"node-c"}, 0, payload) + if err != nil { + t.Fatalf("execute replicated continuity: %v", err) + } + if result.Loop2BeforeFailover.Mode != Loop2RuntimeModeKeepUp { + t.Fatalf("loop2 before failover=%q, want %q", result.Loop2BeforeFailover.Mode, Loop2RuntimeModeKeepUp) + } + if result.SelectedPrimaryNodeID != "node-c" { + t.Fatalf("selected primary=%q, want node-c", result.SelectedPrimaryNodeID) + } + if 
!result.DataMatch { + t.Fatalf("expected continuity data match: %+v", result) + } + if result.ReadBackLength != uint32(len(payload)) { + t.Fatalf("readback length=%d, want %d", result.ReadBackLength, len(payload)) + } +} + +func TestInProcessRuntimeManager_ExecuteReplicatedContinuity_GatedPath(t *testing.T) { + master := masterv2.New(masterv2.Config{}) + manager, err := NewInProcessRuntimeManager(master) + if err != nil { + t.Fatalf("new runtime manager: %v", err) + } + nodeB, err := New(Config{NodeID: "node-b"}) + if err != nil { + t.Fatalf("new node-b: %v", err) + } + defer nodeB.Close() + nodeC, err := New(Config{NodeID: "node-c"}) + if err != nil { + t.Fatalf("new node-c: %v", err) + } + defer nodeC.Close() + if err := manager.RegisterNode(nodeB); err != nil { + t.Fatalf("register node-b: %v", err) + } + if err := manager.RegisterNode(nodeC); err != nil { + t.Fatalf("register node-c: %v", err) + } + + tempDir := t.TempDir() + pathB := filepath.Join(tempDir, "continuity-gated-b.blk") + pathC := filepath.Join(tempDir, "continuity-gated-c.blk") + if err := master.DeclarePrimary(masterv2.VolumeSpec{ + Name: "continuity-gated-vol", + Path: pathB, + PrimaryNodeID: "node-a", + CreateOptions: testCreateOptions(), + }); err != nil { + t.Fatalf("declare primary: %v", err) + } + if err := nodeB.ApplyAssignments([]masterv2.Assignment{{ + Name: "continuity-gated-vol", + Path: pathB, + NodeID: "node-b", + Epoch: 2, + LeaseTTL: 30 * time.Second, + CreateOptions: testCreateOptions(), + Role: "primary", + }}); err != nil { + t.Fatalf("seed node-b: %v", err) + } + if err := nodeC.ApplyAssignments([]masterv2.Assignment{{ + Name: "continuity-gated-vol", + Path: pathC, + NodeID: "node-c", + Epoch: 2, + LeaseTTL: 30 * time.Second, + CreateOptions: testCreateOptions(), + Role: "primary", + }}); err != nil { + t.Fatalf("seed node-c: %v", err) + } + if err := manager.RegisterTarget(staticFailoverTarget( + "node-c", + masterv2.PromotionQueryResponse{ + VolumeName: "continuity-gated-vol", + NodeID: "node-c", + Epoch: 2, + CommittedLSN: 1, + WALHeadLSN: 1, + Eligible: false, + Reason: "needs_rebuild", + }, + protocolv2.ReplicaSummaryResponse{ + VolumeName: "continuity-gated-vol", + NodeID: "node-c", + Epoch: 2, + Role: "replica", + Mode: "needs_rebuild", + CommittedLSN: 1, + DurableLSN: 1, + CheckpointLSN: 1, + RecoveryPhase: "needs_rebuild", + LastBarrierOK: false, + LastBarrierReason: "timeout", + Eligible: false, + Reason: "needs_rebuild", + }, + )); err != nil { + t.Fatalf("override node-c target: %v", err) + } + + payload := bytes.Repeat([]byte{0x7B}, 4096) + result, err := manager.ExecuteReplicatedContinuity("continuity-gated-vol", "node-b", 2, []string{"node-c"}, 0, payload) + if err == nil { + t.Fatal("expected gated replicated continuity error") + } + if result.Loop2BeforeFailover.Mode != Loop2RuntimeModeNeedsRebuild { + t.Fatalf("loop2 before failover=%q, want %q", result.Loop2BeforeFailover.Mode, Loop2RuntimeModeNeedsRebuild) + } + if result.SelectedPrimaryNodeID != "" { + t.Fatalf("selected primary=%q, want empty on gated continuity", result.SelectedPrimaryNodeID) + } + if result.DataMatch { + t.Fatalf("unexpected continuity data match on gated path: %+v", result) + } +} + func mustHeartbeat(t *testing.T, node *Node) masterv2.NodeHeartbeat { t.Helper() hb, err := node.Heartbeat() diff --git a/sw-block/runtime/volumev2/runtime_manager.go b/sw-block/runtime/volumev2/runtime_manager.go index 7490b2c5a..e434d4eaa 100644 --- a/sw-block/runtime/volumev2/runtime_manager.go +++ 
b/sw-block/runtime/volumev2/runtime_manager.go @@ -16,6 +16,7 @@ type InProcessRuntimeManager struct { evidenceTransport *InMemoryFailoverEvidenceTransport mu sync.RWMutex + localNodes map[string]*Node lastSnapshot FailoverSnapshot lastResult FailoverResult hasLastResult bool @@ -36,6 +37,7 @@ func NewInProcessRuntimeManager(master *masterv2.Master) (*InProcessRuntimeManag return &InProcessRuntimeManager{ driver: driver, evidenceTransport: NewInMemoryFailoverEvidenceTransport(), + localNodes: make(map[string]*Node), snapshotsByName: make(map[string]FailoverSnapshot), resultsByName: make(map[string]FailoverResult), loop2ByVolume: make(map[string]Loop2RuntimeSnapshot), @@ -53,6 +55,9 @@ func (m *InProcessRuntimeManager) RegisterNode(node *Node) error { if err := m.evidenceTransport.RegisterHandler(node.NodeID(), node); err != nil { return err } + m.mu.Lock() + m.localNodes[node.NodeID()] = node + m.mu.Unlock() target, err := NewHybridInProcessFailoverTarget(node, m.evidenceTransport) if err != nil { return err @@ -84,6 +89,9 @@ func (m *InProcessRuntimeManager) UnregisterParticipant(nodeID string) { if m.evidenceTransport != nil { m.evidenceTransport.UnregisterHandler(nodeID) } + m.mu.Lock() + delete(m.localNodes, nodeID) + m.mu.Unlock() m.driver.UnregisterParticipant(nodeID) } @@ -240,3 +248,19 @@ func (m *InProcessRuntimeManager) recordLoop2Snapshot(volumeName string, snapsho m.loop2ByVolume[volumeName] = snapshot } } + +func (m *InProcessRuntimeManager) localNode(nodeID string) (*Node, error) { + if m == nil { + return nil, fmt.Errorf("volumev2: runtime manager is nil") + } + if nodeID == "" { + return nil, fmt.Errorf("volumev2: local node id is required") + } + m.mu.RLock() + defer m.mu.RUnlock() + node, ok := m.localNodes[nodeID] + if !ok || node == nil { + return nil, fmt.Errorf("volumev2: local node %q is not registered", nodeID) + } + return node, nil +}
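
Usage sketch (not part of the patch itself): a minimal, hedged example of how a caller could drive the new `ExecuteReplicatedContinuity(...)` entry point against the in-process runtime. It mirrors the happy-path test added above; the `package main` wrapper and import paths are assumptions about the repo layout, and the volume seeding step (DeclarePrimary plus ApplyAssignments with paths and create options) is elided here because it is shown in full in the test.

package main

// Sketch only: assumes the two registered volumev2 nodes already carry the
// volume at the expected epoch, exactly as the happy-path test seeds them.

import (
	"bytes"
	"fmt"
	"log"

	// Import paths assumed from the repo layout; adjust to the real module path.
	"sw-block/runtime/masterv2"
	"sw-block/runtime/volumev2"
)

func main() {
	master := masterv2.New(masterv2.Config{})
	manager, err := volumev2.NewInProcessRuntimeManager(master)
	if err != nil {
		log.Fatalf("new runtime manager: %v", err)
	}

	// Source primary and survivor node IDs reuse the test's illustrative values.
	nodeB, err := volumev2.New(volumev2.Config{NodeID: "node-b"})
	if err != nil {
		log.Fatalf("new node-b: %v", err)
	}
	defer nodeB.Close()
	nodeC, err := volumev2.New(volumev2.Config{NodeID: "node-c"})
	if err != nil {
		log.Fatalf("new node-c: %v", err)
	}
	defer nodeC.Close()
	if err := manager.RegisterNode(nodeB); err != nil {
		log.Fatalf("register node-b: %v", err)
	}
	if err := manager.RegisterNode(nodeC); err != nil {
		log.Fatalf("register node-c: %v", err)
	}

	// Seed "continuity-vol" on the master and on both nodes at epoch 2 here,
	// as the happy-path test does with DeclarePrimary and ApplyAssignments
	// (file paths and create options elided in this sketch).

	// One bounded continuity run: mirror write + sync, observe Loop 2,
	// fail over to node-c, then read back and compare the payload.
	payload := bytes.Repeat([]byte{0x7A}, 4096)
	result, err := manager.ExecuteReplicatedContinuity("continuity-vol", "node-b", 2, []string{"node-c"}, 0, payload)
	if err != nil {
		log.Fatalf("replicated continuity: %v", err)
	}
	fmt.Printf("promoted=%s loop2=%s readback=%d match=%t\n",
		result.SelectedPrimaryNodeID, result.Loop2BeforeFailover.Mode,
		result.ReadBackLength, result.DataMatch)
}

On the gated path, the same call returns a non-nil error before any primary is selected, and the returned ReplicatedContinuityResult still carries the pre-failover Loop 2 snapshot, which is what the gated test asserts.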