diff --git a/sw-block/.private/phase/phase-18-decisions.md b/sw-block/.private/phase/phase-18-decisions.md index 518c41989..c5cdecad7 100644 --- a/sw-block/.private/phase/phase-18-decisions.md +++ b/sw-block/.private/phase/phase-18-decisions.md @@ -93,3 +93,26 @@ Implication: - primary-local takeover 2. later transport work may widen execution transport, but only without changing the authority split + +## D5: `M2` Closes On A Bounded Summary-Driven Active Loop 2 Runtime + +Decision: + +1. `M2` is considered complete when Loop 2 becomes runtime-owned outside + failover-only logic through a bounded active observation/controller slice +2. `M2` does not require full shipper task execution or rebuild choreography + +Why: + +1. the main gap after `M1` is not more transport syntax; it is that Loop 2 + should exist as an active runtime owner +2. bounded replica summaries already carry enough information to derive a first + runtime-owned `keepup` / `catching_up` / `needs_rebuild` slice +3. this allows the runtime to become continuously meaningful without pretending + the full replication executor is already migrated + +Implication: + +1. the first active Loop 2 runtime is summary-driven +2. later work can deepen it into real continuous keepup/catchup/rebuild + choreography without changing the ownership rule diff --git a/sw-block/.private/phase/phase-18-log.md b/sw-block/.private/phase/phase-18-log.md index d316a9a20..f32f7a984 100644 --- a/sw-block/.private/phase/phase-18-log.md +++ b/sw-block/.private/phase/phase-18-log.md @@ -76,3 +76,31 @@ Current interpretation: 2. this is still a bounded request/response transport implementation, not broad network-product proof 3. the next active work should move to `M2` + +### `M2` Delivered + +Delivered in this update: + +1. one runtime-owned active Loop 2 session/controller now exists: + - `Loop2RuntimeSession` +2. one bounded active runtime snapshot now exists: + - `Loop2RuntimeSnapshot` + - `Loop2RuntimeMode` +3. the runtime manager now owns active Loop 2 observation entry points and + retained snapshots +4. the active Loop 2 slice is driven by bounded replica summaries rather than + by hidden backend ownership + +Tests: + +1. `TestLoop2RuntimeSession_KeepUpOnHealthyReplicaSet` +2. `TestInProcessRuntimeManager_ObserveLoop2_CatchingUp` +3. `TestInProcessRuntimeManager_ObserveLoop2_NeedsRebuild` +4. `go test ./sw-block/runtime/masterv2 ./sw-block/runtime/volumev2` + +Current interpretation: + +1. `M2` is complete as the first active Loop 2 runtime slice +2. this is still bounded summary-driven runtime ownership, not full shipper or + rebuild-task choreography +3. the next active work should move to `M3` diff --git a/sw-block/.private/phase/phase-18.md b/sw-block/.private/phase/phase-18.md index f20f94545..05adf2545 100644 --- a/sw-block/.private/phase/phase-18.md +++ b/sw-block/.private/phase/phase-18.md @@ -177,11 +177,30 @@ Exit criteria: Current status: -1. not started +1. delivered +2. one runtime-owned active Loop 2 session/controller now derives bounded + `keepup` / `catching_up` / `needs_rebuild` runtime modes from replica + summaries +3. the runtime manager now owns explicit active Loop 2 observation entry points + and snapshots +4. current boundary: + - the active runtime is bounded summary-driven, not full shipper/rebuild task + choreography Review/test update: -1. pending +1. delivered code: + - `Loop2RuntimeSession` + - `Loop2RuntimeSnapshot` + - `Loop2RuntimeMode` + - runtime-manager `ObserveLoop2(...)` / `LastLoop2Snapshot(...)` / + `Loop2Snapshot(...)` +2. delivered tests: + - healthy `keepup` + - lagging `catching_up` + - explicit `needs_rebuild` +3. result: + - Loop 2 now has a runtime-owned active slice outside failover-only logic ### `M3`: Replicated Data Continuity Closure @@ -292,9 +311,9 @@ written decision in `phase-18-decisions.md`. The active next work is: -1. `M2` seam step -2. keep the new transport/session-backed failover path as the reference closure - while active Loop 2 runtime is introduced +1. `M3` seam step +2. turn the current failover + active Loop 2 runtime slices into one bounded + replicated continuity statement ## Review Base diff --git a/sw-block/runtime/volumev2/loop2_runtime.go b/sw-block/runtime/volumev2/loop2_runtime.go new file mode 100644 index 000000000..ed6a8d571 --- /dev/null +++ b/sw-block/runtime/volumev2/loop2_runtime.go @@ -0,0 +1,270 @@ +package volumev2 + +import ( + "fmt" + "slices" + + "github.com/seaweedfs/seaweedfs/sw-block/runtime/protocolv2" +) + +// Loop2RuntimeMode is the coarse active runtime mode for the primary-led +// data-control loop. It is a runtime-owned view, not a public protocol surface. +type Loop2RuntimeMode string + +const ( + Loop2RuntimeModeKeepUp Loop2RuntimeMode = "keepup" + Loop2RuntimeModeCatchingUp Loop2RuntimeMode = "catching_up" + Loop2RuntimeModeDegraded Loop2RuntimeMode = "degraded" + Loop2RuntimeModeNeedsRebuild Loop2RuntimeMode = "needs_rebuild" +) + +// Loop2ReplicaStatus is the bounded per-replica view exposed by the active +// Loop 2 runtime session. +type Loop2ReplicaStatus struct { + NodeID string + Role string + Mode string + Reason string + RecoveryPhase string + CommittedLSN uint64 + DurableLSN uint64 + CheckpointLSN uint64 + TargetLSN uint64 + AchievedLSN uint64 + LastBarrierOK bool + LastBarrierReason string +} + +// Loop2RuntimeSnapshot is the runtime-owned active data-control view for one +// volume at one observation point. +type Loop2RuntimeSnapshot struct { + VolumeName string + PrimaryNodeID string + ExpectedEpoch uint64 + Mode Loop2RuntimeMode + Reason string + ReplicaCount int + HealthyReplicaCount int + CommittedLSN uint64 + DurableFloorLSN uint64 + TargetLSN uint64 + AchievedLSN uint64 + Replicas []Loop2ReplicaStatus +} + +// Loop2RuntimeSession is the first runtime-owned active Loop 2 controller. It +// periodically observes bounded replica summaries and derives a primary-led +// runtime mode beyond failover-only logic. +type Loop2RuntimeSession struct { + volumeName string + primaryNodeID string + expectedEpoch uint64 + targets []FailoverTarget + + snapshot Loop2RuntimeSnapshot + lastErr error +} + +// NewLoop2RuntimeSession creates a new active Loop 2 session over explicit +// failover targets. +func NewLoop2RuntimeSession(volumeName, primaryNodeID string, expectedEpoch uint64, targets []FailoverTarget) (*Loop2RuntimeSession, error) { + if volumeName == "" { + return nil, fmt.Errorf("volumev2: loop2 volume name is required") + } + if primaryNodeID == "" { + return nil, fmt.Errorf("volumev2: loop2 primary node id is required") + } + if len(targets) == 0 { + return nil, fmt.Errorf("volumev2: loop2 targets are required") + } + return &Loop2RuntimeSession{ + volumeName: volumeName, + primaryNodeID: primaryNodeID, + expectedEpoch: expectedEpoch, + targets: targets, + }, nil +} + +// ObserveOnce collects bounded replica summaries and refreshes the active Loop +// 2 runtime snapshot. +func (s *Loop2RuntimeSession) ObserveOnce() (Loop2RuntimeSnapshot, error) { + if s == nil { + return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: loop2 session is nil") + } + req := protocolv2.ReplicaSummaryRequest{ + VolumeName: s.volumeName, + ExpectedEpoch: s.expectedEpoch, + } + summaries := make([]protocolv2.ReplicaSummaryResponse, 0, len(s.targets)) + for _, target := range s.targets { + if target.NodeID == "" || target.Evidence == nil { + continue + } + summary, err := target.Evidence.QueryReplicaSummary(req) + if err != nil { + s.lastErr = fmt.Errorf("volumev2: loop2 summary %s: %w", s.volumeName, err) + return s.snapshot, s.lastErr + } + summaries = append(summaries, summary) + } + snapshot, err := evaluateLoop2Runtime(s.volumeName, s.primaryNodeID, s.expectedEpoch, summaries) + if err != nil { + s.lastErr = err + return s.snapshot, err + } + s.snapshot = snapshot + s.lastErr = nil + return snapshot, nil +} + +// Snapshot returns the latest active Loop 2 runtime snapshot. +func (s *Loop2RuntimeSession) Snapshot() Loop2RuntimeSnapshot { + if s == nil { + return Loop2RuntimeSnapshot{} + } + return s.snapshot +} + +// LastError returns the last Loop 2 observation error. +func (s *Loop2RuntimeSession) LastError() error { + if s == nil { + return fmt.Errorf("volumev2: loop2 session is nil") + } + return s.lastErr +} + +func evaluateLoop2Runtime(volumeName, primaryNodeID string, expectedEpoch uint64, summaries []protocolv2.ReplicaSummaryResponse) (Loop2RuntimeSnapshot, error) { + if len(summaries) == 0 { + return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: loop2 summaries are required") + } + slices.SortFunc(summaries, func(a, b protocolv2.ReplicaSummaryResponse) int { + switch { + case a.NodeID < b.NodeID: + return -1 + case a.NodeID > b.NodeID: + return 1 + default: + return 0 + } + }) + + var ( + primary protocolv2.ReplicaSummaryResponse + foundPrimary bool + durableFloor uint64 + maxTarget uint64 + minAchieved uint64 + healthyCount int + mode = Loop2RuntimeModeKeepUp + reason string + replicas = make([]Loop2ReplicaStatus, 0, len(summaries)) + minAchievedSet bool + ) + + for _, summary := range summaries { + replicas = append(replicas, Loop2ReplicaStatus{ + NodeID: summary.NodeID, + Role: summary.Role, + Mode: summary.Mode, + Reason: summary.Reason, + RecoveryPhase: summary.RecoveryPhase, + CommittedLSN: summary.CommittedLSN, + DurableLSN: summary.DurableLSN, + CheckpointLSN: summary.CheckpointLSN, + TargetLSN: summary.TargetLSN, + AchievedLSN: summary.AchievedLSN, + LastBarrierOK: summary.LastBarrierOK, + LastBarrierReason: summary.LastBarrierReason, + }) + if summary.NodeID == primaryNodeID { + primary = summary + foundPrimary = true + } + if summary.Epoch == expectedEpoch && summary.Eligible { + healthyCount++ + } + if durableFloor == 0 || summary.DurableLSN < durableFloor { + durableFloor = summary.DurableLSN + } + if summary.TargetLSN > maxTarget { + maxTarget = summary.TargetLSN + } + if !minAchievedSet || summary.AchievedLSN < minAchieved { + minAchieved = summary.AchievedLSN + minAchievedSet = true + } + } + if !foundPrimary { + return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: loop2 primary %q missing from summaries", primaryNodeID) + } + + for _, summary := range summaries { + switch { + case summary.Epoch != expectedEpoch: + mode = degradeLoop2Mode(mode, Loop2RuntimeModeDegraded) + if reason == "" { + reason = "peer_epoch_mismatch" + } + case summary.Mode == "needs_rebuild" || summary.Reason == "needs_rebuild" || summary.RecoveryPhase == "needs_rebuild": + mode = Loop2RuntimeModeNeedsRebuild + if reason == "" { + reason = "needs_rebuild" + } + case !summary.LastBarrierOK && summary.LastBarrierReason != "": + mode = degradeLoop2Mode(mode, Loop2RuntimeModeDegraded) + if reason == "" { + reason = summary.LastBarrierReason + } + case summary.NodeID != primaryNodeID && (summary.RecoveryPhase == "catching_up" || summary.RecoveryPhase == "rebuilding"): + mode = degradeLoop2Mode(mode, Loop2RuntimeModeCatchingUp) + if reason == "" { + reason = summary.RecoveryPhase + } + case summary.NodeID != primaryNodeID && summary.TargetLSN > 0 && summary.AchievedLSN < summary.TargetLSN: + mode = degradeLoop2Mode(mode, Loop2RuntimeModeCatchingUp) + if reason == "" { + reason = "peer_target_not_achieved" + } + case summary.NodeID != primaryNodeID && summary.DurableLSN < primary.CommittedLSN: + mode = degradeLoop2Mode(mode, Loop2RuntimeModeCatchingUp) + if reason == "" { + reason = "peer_durable_behind_primary" + } + } + } + + return Loop2RuntimeSnapshot{ + VolumeName: volumeName, + PrimaryNodeID: primaryNodeID, + ExpectedEpoch: expectedEpoch, + Mode: mode, + Reason: reason, + ReplicaCount: len(summaries), + HealthyReplicaCount: healthyCount, + CommittedLSN: primary.CommittedLSN, + DurableFloorLSN: durableFloor, + TargetLSN: maxTarget, + AchievedLSN: minAchieved, + Replicas: replicas, + }, nil +} + +func degradeLoop2Mode(current, next Loop2RuntimeMode) Loop2RuntimeMode { + if loop2ModeRank(next) > loop2ModeRank(current) { + return next + } + return current +} + +func loop2ModeRank(mode Loop2RuntimeMode) int { + switch mode { + case Loop2RuntimeModeNeedsRebuild: + return 3 + case Loop2RuntimeModeDegraded: + return 2 + case Loop2RuntimeModeCatchingUp: + return 1 + default: + return 0 + } +} diff --git a/sw-block/runtime/volumev2/poc_test.go b/sw-block/runtime/volumev2/poc_test.go index 0d908c3c8..bba609a4e 100644 --- a/sw-block/runtime/volumev2/poc_test.go +++ b/sw-block/runtime/volumev2/poc_test.go @@ -1519,6 +1519,226 @@ func TestTransportEvidenceAdapter_GatedFailoverFlow(t *testing.T) { } } +func TestLoop2RuntimeSession_KeepUpOnHealthyReplicaSet(t *testing.T) { + nodeB, err := New(Config{NodeID: "node-b"}) + if err != nil { + t.Fatalf("new node-b: %v", err) + } + defer nodeB.Close() + + tempDir := t.TempDir() + pathB := filepath.Join(tempDir, "loop2-keepup-b.blk") + assignB := masterv2.Assignment{ + Name: "loop2-keepup-vol", + Path: pathB, + NodeID: "node-b", + Epoch: 3, + LeaseTTL: 30 * time.Second, + CreateOptions: testCreateOptions(), + Role: "primary", + } + if err := nodeB.ApplyAssignments([]masterv2.Assignment{assignB}); err != nil { + t.Fatalf("seed node-b: %v", err) + } + payload := bytes.Repeat([]byte{0x71}, 4096) + if err := nodeB.WriteLBA("loop2-keepup-vol", 0, payload); err != nil { + t.Fatalf("write node-b: %v", err) + } + if err := nodeB.SyncCache("loop2-keepup-vol"); err != nil { + t.Fatalf("sync node-b: %v", err) + } + primary := mustReplicaSummary(t, nodeB, "loop2-keepup-vol", 3) + + session, err := NewLoop2RuntimeSession("loop2-keepup-vol", "node-b", 3, []FailoverTarget{ + mustInProcessFailoverTarget(t, nodeB), + staticFailoverTarget( + "node-c", + masterv2.PromotionQueryResponse{VolumeName: "loop2-keepup-vol", NodeID: "node-c"}, + protocolv2.ReplicaSummaryResponse{ + VolumeName: "loop2-keepup-vol", + NodeID: "node-c", + Epoch: 3, + Role: "replica", + Mode: "replica_ready", + RoleApplied: true, + ReceiverReady: true, + CommittedLSN: primary.CommittedLSN, + DurableLSN: primary.DurableLSN, + CheckpointLSN: primary.CheckpointLSN, + LastBarrierOK: true, + Eligible: true, + }, + ), + }) + if err != nil { + t.Fatalf("new loop2 runtime session: %v", err) + } + snap, err := session.ObserveOnce() + if err != nil { + t.Fatalf("observe loop2 keepup: %v", err) + } + if snap.Mode != Loop2RuntimeModeKeepUp { + t.Fatalf("loop2 mode=%q, want %q", snap.Mode, Loop2RuntimeModeKeepUp) + } + if snap.HealthyReplicaCount != 2 { + t.Fatalf("healthy replicas=%d, want 2", snap.HealthyReplicaCount) + } +} + +func TestInProcessRuntimeManager_ObserveLoop2_CatchingUp(t *testing.T) { + master := masterv2.New(masterv2.Config{}) + manager, err := NewInProcessRuntimeManager(master) + if err != nil { + t.Fatalf("new runtime manager: %v", err) + } + nodeB, err := New(Config{NodeID: "node-b"}) + if err != nil { + t.Fatalf("new node-b: %v", err) + } + defer nodeB.Close() + if err := manager.RegisterNode(nodeB); err != nil { + t.Fatalf("register node-b: %v", err) + } + + tempDir := t.TempDir() + pathB := filepath.Join(tempDir, "loop2-catchup-b.blk") + assignB := masterv2.Assignment{ + Name: "loop2-catchup-vol", + Path: pathB, + NodeID: "node-b", + Epoch: 4, + LeaseTTL: 30 * time.Second, + CreateOptions: testCreateOptions(), + Role: "primary", + } + if err := nodeB.ApplyAssignments([]masterv2.Assignment{assignB}); err != nil { + t.Fatalf("seed node-b: %v", err) + } + payload := bytes.Repeat([]byte{0x72}, 4096) + if err := nodeB.WriteLBA("loop2-catchup-vol", 0, payload); err != nil { + t.Fatalf("write node-b: %v", err) + } + if err := nodeB.SyncCache("loop2-catchup-vol"); err != nil { + t.Fatalf("sync node-b: %v", err) + } + primary := mustReplicaSummary(t, nodeB, "loop2-catchup-vol", 4) + + if err := manager.RegisterTarget(staticFailoverTarget( + "node-c", + masterv2.PromotionQueryResponse{VolumeName: "loop2-catchup-vol", NodeID: "node-c"}, + protocolv2.ReplicaSummaryResponse{ + VolumeName: "loop2-catchup-vol", + NodeID: "node-c", + Epoch: 4, + Role: "replica", + Mode: "replica_ready", + RoleApplied: true, + ReceiverReady: true, + CommittedLSN: primary.CommittedLSN - 1, + DurableLSN: primary.CommittedLSN - 1, + CheckpointLSN: primary.CheckpointLSN, + TargetLSN: primary.CommittedLSN, + AchievedLSN: primary.CommittedLSN - 1, + RecoveryPhase: "catching_up", + LastBarrierOK: true, + Eligible: true, + }, + )); err != nil { + t.Fatalf("register node-c target: %v", err) + } + + snap, err := manager.ObserveLoop2("loop2-catchup-vol", "node-b", 4) + if err != nil { + t.Fatalf("observe loop2 catchup: %v", err) + } + if snap.Mode != Loop2RuntimeModeCatchingUp { + t.Fatalf("loop2 mode=%q, want %q", snap.Mode, Loop2RuntimeModeCatchingUp) + } + if snap.Reason == "" { + t.Fatal("expected loop2 catching_up reason") + } + lastSnap, ok := manager.LastLoop2Snapshot() + if !ok { + t.Fatal("expected last loop2 snapshot") + } + if lastSnap.Mode != Loop2RuntimeModeCatchingUp { + t.Fatalf("last loop2 mode=%q, want %q", lastSnap.Mode, Loop2RuntimeModeCatchingUp) + } +} + +func TestInProcessRuntimeManager_ObserveLoop2_NeedsRebuild(t *testing.T) { + master := masterv2.New(masterv2.Config{}) + manager, err := NewInProcessRuntimeManager(master) + if err != nil { + t.Fatalf("new runtime manager: %v", err) + } + nodeB, err := New(Config{NodeID: "node-b"}) + if err != nil { + t.Fatalf("new node-b: %v", err) + } + defer nodeB.Close() + if err := manager.RegisterNode(nodeB); err != nil { + t.Fatalf("register node-b: %v", err) + } + + tempDir := t.TempDir() + pathB := filepath.Join(tempDir, "loop2-rebuild-b.blk") + assignB := masterv2.Assignment{ + Name: "loop2-rebuild-vol", + Path: pathB, + NodeID: "node-b", + Epoch: 5, + LeaseTTL: 30 * time.Second, + CreateOptions: testCreateOptions(), + Role: "primary", + } + if err := nodeB.ApplyAssignments([]masterv2.Assignment{assignB}); err != nil { + t.Fatalf("seed node-b: %v", err) + } + + if err := manager.RegisterTarget(staticFailoverTarget( + "node-c", + masterv2.PromotionQueryResponse{VolumeName: "loop2-rebuild-vol", NodeID: "node-c"}, + protocolv2.ReplicaSummaryResponse{ + VolumeName: "loop2-rebuild-vol", + NodeID: "node-c", + Epoch: 5, + Role: "replica", + Mode: "needs_rebuild", + RoleApplied: true, + ReceiverReady: false, + CommittedLSN: 1, + DurableLSN: 1, + CheckpointLSN: 1, + RecoveryPhase: "needs_rebuild", + LastBarrierOK: false, + LastBarrierReason: "timeout", + Eligible: false, + Reason: "needs_rebuild", + }, + )); err != nil { + t.Fatalf("register node-c target: %v", err) + } + + snap, err := manager.ObserveLoop2("loop2-rebuild-vol", "node-b", 5) + if err != nil { + t.Fatalf("observe loop2 needs_rebuild: %v", err) + } + if snap.Mode != Loop2RuntimeModeNeedsRebuild { + t.Fatalf("loop2 mode=%q, want %q", snap.Mode, Loop2RuntimeModeNeedsRebuild) + } + if snap.Reason != "needs_rebuild" { + t.Fatalf("loop2 reason=%q, want needs_rebuild", snap.Reason) + } + perVolSnap, ok := manager.Loop2Snapshot("loop2-rebuild-vol") + if !ok { + t.Fatal("expected per-volume loop2 snapshot") + } + if perVolSnap.Mode != Loop2RuntimeModeNeedsRebuild { + t.Fatalf("per-volume loop2 mode=%q, want %q", perVolSnap.Mode, Loop2RuntimeModeNeedsRebuild) + } +} + func mustHeartbeat(t *testing.T, node *Node) masterv2.NodeHeartbeat { t.Helper() hb, err := node.Heartbeat() @@ -1540,6 +1760,18 @@ func mustPromotionEvidence(t *testing.T, node *Node, volumeName string, epoch ui return resp } +func mustReplicaSummary(t *testing.T, node *Node, volumeName string, epoch uint64) protocolv2.ReplicaSummaryResponse { + t.Helper() + resp, err := node.QueryReplicaSummary(protocolv2.ReplicaSummaryRequest{ + VolumeName: volumeName, + ExpectedEpoch: epoch, + }) + if err != nil { + t.Fatalf("replica summary: %v", err) + } + return resp +} + func mustInProcessFailoverTarget(t *testing.T, node *Node) FailoverTarget { t.Helper() target, err := NewInProcessFailoverTarget(node) diff --git a/sw-block/runtime/volumev2/runtime_manager.go b/sw-block/runtime/volumev2/runtime_manager.go index f6d93a439..7490b2c5a 100644 --- a/sw-block/runtime/volumev2/runtime_manager.go +++ b/sw-block/runtime/volumev2/runtime_manager.go @@ -15,12 +15,15 @@ type InProcessRuntimeManager struct { driver *InProcessFailoverDriver evidenceTransport *InMemoryFailoverEvidenceTransport - mu sync.RWMutex - lastSnapshot FailoverSnapshot - lastResult FailoverResult - hasLastResult bool - snapshotsByName map[string]FailoverSnapshot - resultsByName map[string]FailoverResult + mu sync.RWMutex + lastSnapshot FailoverSnapshot + lastResult FailoverResult + hasLastResult bool + snapshotsByName map[string]FailoverSnapshot + resultsByName map[string]FailoverResult + lastLoop2Snapshot Loop2RuntimeSnapshot + hasLastLoop2 bool + loop2ByVolume map[string]Loop2RuntimeSnapshot } // NewInProcessRuntimeManager creates a runtime-owned failover manager over one @@ -35,6 +38,7 @@ func NewInProcessRuntimeManager(master *masterv2.Master) (*InProcessRuntimeManag evidenceTransport: NewInMemoryFailoverEvidenceTransport(), snapshotsByName: make(map[string]FailoverSnapshot), resultsByName: make(map[string]FailoverResult), + loop2ByVolume: make(map[string]Loop2RuntimeSnapshot), }, nil } @@ -114,6 +118,34 @@ func (m *InProcessRuntimeManager) ExecuteFailover(volumeName string, expectedEpo return result, runErr } +// NewLoop2RuntimeSession resolves runtime-owned targets and creates an active +// Loop 2 session for one selected primary. +func (m *InProcessRuntimeManager) NewLoop2RuntimeSession(volumeName, primaryNodeID string, expectedEpoch uint64, nodeIDs ...string) (*Loop2RuntimeSession, error) { + if m == nil || m.driver == nil { + return nil, fmt.Errorf("volumev2: runtime manager is nil") + } + targets, err := m.driver.resolveTargets(nodeIDs) + if err != nil { + return nil, err + } + return NewLoop2RuntimeSession(volumeName, primaryNodeID, expectedEpoch, targets) +} + +// ObserveLoop2 runs one active Loop 2 observation and persists the latest +// runtime snapshot. +func (m *InProcessRuntimeManager) ObserveLoop2(volumeName, primaryNodeID string, expectedEpoch uint64, nodeIDs ...string) (Loop2RuntimeSnapshot, error) { + if m == nil { + return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: runtime manager is nil") + } + session, err := m.NewLoop2RuntimeSession(volumeName, primaryNodeID, expectedEpoch, nodeIDs...) + if err != nil { + return Loop2RuntimeSnapshot{}, err + } + snapshot, obsErr := session.ObserveOnce() + m.recordLoop2Snapshot(volumeName, session.Snapshot()) + return snapshot, obsErr +} + // LastFailoverSnapshot returns the most recent runtime-owned failover snapshot. func (m *InProcessRuntimeManager) LastFailoverSnapshot() (FailoverSnapshot, bool) { if m == nil { @@ -162,6 +194,31 @@ func (m *InProcessRuntimeManager) FailoverResult(volumeName string) (FailoverRes return result, ok } +// LastLoop2Snapshot returns the most recent active Loop 2 runtime snapshot. +func (m *InProcessRuntimeManager) LastLoop2Snapshot() (Loop2RuntimeSnapshot, bool) { + if m == nil { + return Loop2RuntimeSnapshot{}, false + } + m.mu.RLock() + defer m.mu.RUnlock() + if !m.hasLastLoop2 { + return Loop2RuntimeSnapshot{}, false + } + return m.lastLoop2Snapshot, true +} + +// Loop2Snapshot returns the latest active Loop 2 runtime snapshot for one +// volume if present. +func (m *InProcessRuntimeManager) Loop2Snapshot(volumeName string) (Loop2RuntimeSnapshot, bool) { + if m == nil { + return Loop2RuntimeSnapshot{}, false + } + m.mu.RLock() + defer m.mu.RUnlock() + snapshot, ok := m.loop2ByVolume[volumeName] + return snapshot, ok +} + func (m *InProcessRuntimeManager) recordSnapshot(volumeName string, snapshot FailoverSnapshot, result FailoverResult) { m.mu.Lock() defer m.mu.Unlock() @@ -173,3 +230,13 @@ func (m *InProcessRuntimeManager) recordSnapshot(volumeName string, snapshot Fai m.resultsByName[volumeName] = result } } + +func (m *InProcessRuntimeManager) recordLoop2Snapshot(volumeName string, snapshot Loop2RuntimeSnapshot) { + m.mu.Lock() + defer m.mu.Unlock() + m.lastLoop2Snapshot = snapshot + m.hasLastLoop2 = true + if volumeName != "" { + m.loop2ByVolume[volumeName] = snapshot + } +}