mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-22 17:51:30 +00:00
feat: Phase 18 M2 — active Loop 2 replication runtime
M2 milestone: bounded summary-driven active Loop 2 runtime.

Loop 2 runtime session (loop2_runtime.go):
- Loop2RuntimeSession: primary-led active observation of replica set
- ObserveOnce: collects replica summaries via transport seam, evaluates
  runtime mode (keepup / catching_up / degraded / needs_rebuild)
- Fail-closed severity escalation: mode only degrades, never reverts
- Detection: epoch mismatch, barrier failure, peer behind primary,
  recovery in progress, needs_rebuild sticky

Runtime manager integration:
- NewLoop2RuntimeSession, ObserveLoop2, LastLoop2Snapshot, Loop2Snapshot
- Runtime manager now retains active Loop 2 snapshots alongside failover

Tests prove three paths:
- healthy replica set → keepup
- peer behind → catching_up
- peer needs_rebuild → needs_rebuild (fail-closed)

Phase 18 docs updated: M2 delivered, M3 next.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@@ -93,3 +93,26 @@ Implication:
    - primary-local takeover
 2. later transport work may widen execution transport, but only without changing
    the authority split
+
+## D5: `M2` Closes On A Bounded Summary-Driven Active Loop 2 Runtime
+
+Decision:
+
+1. `M2` is considered complete when Loop 2 becomes runtime-owned outside
+   failover-only logic through a bounded active observation/controller slice
+2. `M2` does not require full shipper task execution or rebuild choreography
+
+Why:
+
+1. the main gap after `M1` is not more transport syntax; it is that Loop 2
+   should exist as an active runtime owner
+2. bounded replica summaries already carry enough information to derive a first
+   runtime-owned `keepup` / `catching_up` / `needs_rebuild` slice
+3. this allows the runtime to become continuously meaningful without pretending
+   the full replication executor is already migrated
+
+Implication:
+
+1. the first active Loop 2 runtime is summary-driven
+2. later work can deepen it into real continuous keepup/catchup/rebuild
+   choreography without changing the ownership rule
@@ -76,3 +76,31 @@ Current interpretation:
 2. this is still a bounded request/response transport implementation, not broad
    network-product proof
 3. the next active work should move to `M2`
+
+### `M2` Delivered
+
+Delivered in this update:
+
+1. one runtime-owned active Loop 2 session/controller now exists:
+   - `Loop2RuntimeSession`
+2. one bounded active runtime snapshot now exists:
+   - `Loop2RuntimeSnapshot`
+   - `Loop2RuntimeMode`
+3. the runtime manager now owns active Loop 2 observation entry points and
+   retained snapshots
+4. the active Loop 2 slice is driven by bounded replica summaries rather than
+   by hidden backend ownership
+
+Tests:
+
+1. `TestLoop2RuntimeSession_KeepUpOnHealthyReplicaSet`
+2. `TestInProcessRuntimeManager_ObserveLoop2_CatchingUp`
+3. `TestInProcessRuntimeManager_ObserveLoop2_NeedsRebuild`
+4. `go test ./sw-block/runtime/masterv2 ./sw-block/runtime/volumev2`
+
+Current interpretation:
+
+1. `M2` is complete as the first active Loop 2 runtime slice
+2. this is still bounded summary-driven runtime ownership, not full shipper or
+   rebuild-task choreography
+3. the next active work should move to `M3`
@@ -177,11 +177,30 @@ Exit criteria:
 
 Current status:
 
-1. not started
+1. delivered
+2. one runtime-owned active Loop 2 session/controller now derives bounded
+   `keepup` / `catching_up` / `needs_rebuild` runtime modes from replica
+   summaries
+3. the runtime manager now owns explicit active Loop 2 observation entry points
+   and snapshots
+4. current boundary:
+   - the active runtime is bounded summary-driven, not full shipper/rebuild task
+     choreography
 
 Review/test update:
 
-1. pending
+1. delivered code:
+   - `Loop2RuntimeSession`
+   - `Loop2RuntimeSnapshot`
+   - `Loop2RuntimeMode`
+   - runtime-manager `ObserveLoop2(...)` / `LastLoop2Snapshot(...)` /
+     `Loop2Snapshot(...)`
+2. delivered tests:
+   - healthy `keepup`
+   - lagging `catching_up`
+   - explicit `needs_rebuild`
+3. result:
+   - Loop 2 now has a runtime-owned active slice outside failover-only logic
 
 ### `M3`: Replicated Data Continuity Closure
 
@@ -292,9 +311,9 @@ written decision in `phase-18-decisions.md`.
 
 The active next work is:
 
-1. `M2` seam step
-2. keep the new transport/session-backed failover path as the reference closure
-   while active Loop 2 runtime is introduced
+1. `M3` seam step
+2. turn the current failover + active Loop 2 runtime slices into one bounded
+   replicated continuity statement
 
 ## Review Base
 
270
sw-block/runtime/volumev2/loop2_runtime.go
Normal file
@@ -0,0 +1,270 @@
+package volumev2
+
+import (
+	"fmt"
+	"slices"
+
+	"github.com/seaweedfs/seaweedfs/sw-block/runtime/protocolv2"
+)
+
+// Loop2RuntimeMode is the coarse active runtime mode for the primary-led
+// data-control loop. It is a runtime-owned view, not a public protocol surface.
+type Loop2RuntimeMode string
+
+const (
+	Loop2RuntimeModeKeepUp       Loop2RuntimeMode = "keepup"
+	Loop2RuntimeModeCatchingUp   Loop2RuntimeMode = "catching_up"
+	Loop2RuntimeModeDegraded     Loop2RuntimeMode = "degraded"
+	Loop2RuntimeModeNeedsRebuild Loop2RuntimeMode = "needs_rebuild"
+)
+
+// Loop2ReplicaStatus is the bounded per-replica view exposed by the active
+// Loop 2 runtime session.
+type Loop2ReplicaStatus struct {
+	NodeID            string
+	Role              string
+	Mode              string
+	Reason            string
+	RecoveryPhase     string
+	CommittedLSN      uint64
+	DurableLSN        uint64
+	CheckpointLSN     uint64
+	TargetLSN         uint64
+	AchievedLSN       uint64
+	LastBarrierOK     bool
+	LastBarrierReason string
+}
+
+// Loop2RuntimeSnapshot is the runtime-owned active data-control view for one
+// volume at one observation point.
+type Loop2RuntimeSnapshot struct {
+	VolumeName          string
+	PrimaryNodeID       string
+	ExpectedEpoch       uint64
+	Mode                Loop2RuntimeMode
+	Reason              string
+	ReplicaCount        int
+	HealthyReplicaCount int
+	CommittedLSN        uint64
+	DurableFloorLSN     uint64
+	TargetLSN           uint64
+	AchievedLSN         uint64
+	Replicas            []Loop2ReplicaStatus
+}
+
+// Loop2RuntimeSession is the first runtime-owned active Loop 2 controller. It
+// periodically observes bounded replica summaries and derives a primary-led
+// runtime mode beyond failover-only logic.
+type Loop2RuntimeSession struct {
+	volumeName    string
+	primaryNodeID string
+	expectedEpoch uint64
+	targets       []FailoverTarget
+
+	snapshot Loop2RuntimeSnapshot
+	lastErr  error
+}
+
+// NewLoop2RuntimeSession creates a new active Loop 2 session over explicit
+// failover targets.
+func NewLoop2RuntimeSession(volumeName, primaryNodeID string, expectedEpoch uint64, targets []FailoverTarget) (*Loop2RuntimeSession, error) {
+	if volumeName == "" {
+		return nil, fmt.Errorf("volumev2: loop2 volume name is required")
+	}
+	if primaryNodeID == "" {
+		return nil, fmt.Errorf("volumev2: loop2 primary node id is required")
+	}
+	if len(targets) == 0 {
+		return nil, fmt.Errorf("volumev2: loop2 targets are required")
+	}
+	return &Loop2RuntimeSession{
+		volumeName:    volumeName,
+		primaryNodeID: primaryNodeID,
+		expectedEpoch: expectedEpoch,
+		targets:       targets,
+	}, nil
+}
+
+// ObserveOnce collects bounded replica summaries and refreshes the active Loop
+// 2 runtime snapshot.
+func (s *Loop2RuntimeSession) ObserveOnce() (Loop2RuntimeSnapshot, error) {
+	if s == nil {
+		return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: loop2 session is nil")
+	}
+	req := protocolv2.ReplicaSummaryRequest{
+		VolumeName:    s.volumeName,
+		ExpectedEpoch: s.expectedEpoch,
+	}
+	summaries := make([]protocolv2.ReplicaSummaryResponse, 0, len(s.targets))
+	for _, target := range s.targets {
+		if target.NodeID == "" || target.Evidence == nil {
+			continue
+		}
+		summary, err := target.Evidence.QueryReplicaSummary(req)
+		if err != nil {
+			s.lastErr = fmt.Errorf("volumev2: loop2 summary %s: %w", s.volumeName, err)
+			return s.snapshot, s.lastErr
+		}
+		summaries = append(summaries, summary)
+	}
+	snapshot, err := evaluateLoop2Runtime(s.volumeName, s.primaryNodeID, s.expectedEpoch, summaries)
+	if err != nil {
+		s.lastErr = err
+		return s.snapshot, err
+	}
+	s.snapshot = snapshot
+	s.lastErr = nil
+	return snapshot, nil
+}
+
+// Snapshot returns the latest active Loop 2 runtime snapshot.
+func (s *Loop2RuntimeSession) Snapshot() Loop2RuntimeSnapshot {
+	if s == nil {
+		return Loop2RuntimeSnapshot{}
+	}
+	return s.snapshot
+}
+
+// LastError returns the last Loop 2 observation error.
+func (s *Loop2RuntimeSession) LastError() error {
+	if s == nil {
+		return fmt.Errorf("volumev2: loop2 session is nil")
+	}
+	return s.lastErr
+}
+
+func evaluateLoop2Runtime(volumeName, primaryNodeID string, expectedEpoch uint64, summaries []protocolv2.ReplicaSummaryResponse) (Loop2RuntimeSnapshot, error) {
+	if len(summaries) == 0 {
+		return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: loop2 summaries are required")
+	}
+	slices.SortFunc(summaries, func(a, b protocolv2.ReplicaSummaryResponse) int {
+		switch {
+		case a.NodeID < b.NodeID:
+			return -1
+		case a.NodeID > b.NodeID:
+			return 1
+		default:
+			return 0
+		}
+	})
+
+	var (
+		primary        protocolv2.ReplicaSummaryResponse
+		foundPrimary   bool
+		durableFloor   uint64
+		maxTarget      uint64
+		minAchieved    uint64
+		healthyCount   int
+		mode           = Loop2RuntimeModeKeepUp
+		reason         string
+		replicas       = make([]Loop2ReplicaStatus, 0, len(summaries))
+		minAchievedSet bool
+	)
+
+	for _, summary := range summaries {
+		replicas = append(replicas, Loop2ReplicaStatus{
+			NodeID:            summary.NodeID,
+			Role:              summary.Role,
+			Mode:              summary.Mode,
+			Reason:            summary.Reason,
+			RecoveryPhase:     summary.RecoveryPhase,
+			CommittedLSN:      summary.CommittedLSN,
+			DurableLSN:        summary.DurableLSN,
+			CheckpointLSN:     summary.CheckpointLSN,
+			TargetLSN:         summary.TargetLSN,
+			AchievedLSN:       summary.AchievedLSN,
+			LastBarrierOK:     summary.LastBarrierOK,
+			LastBarrierReason: summary.LastBarrierReason,
+		})
+		if summary.NodeID == primaryNodeID {
+			primary = summary
+			foundPrimary = true
+		}
+		if summary.Epoch == expectedEpoch && summary.Eligible {
+			healthyCount++
+		}
+		if durableFloor == 0 || summary.DurableLSN < durableFloor {
+			durableFloor = summary.DurableLSN
+		}
+		if summary.TargetLSN > maxTarget {
+			maxTarget = summary.TargetLSN
+		}
+		if !minAchievedSet || summary.AchievedLSN < minAchieved {
+			minAchieved = summary.AchievedLSN
+			minAchievedSet = true
+		}
+	}
+	if !foundPrimary {
+		return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: loop2 primary %q missing from summaries", primaryNodeID)
+	}
+
+	for _, summary := range summaries {
+		switch {
+		case summary.Epoch != expectedEpoch:
+			mode = degradeLoop2Mode(mode, Loop2RuntimeModeDegraded)
+			if reason == "" {
+				reason = "peer_epoch_mismatch"
+			}
+		case summary.Mode == "needs_rebuild" || summary.Reason == "needs_rebuild" || summary.RecoveryPhase == "needs_rebuild":
+			mode = Loop2RuntimeModeNeedsRebuild
+			if reason == "" {
+				reason = "needs_rebuild"
+			}
+		case !summary.LastBarrierOK && summary.LastBarrierReason != "":
+			mode = degradeLoop2Mode(mode, Loop2RuntimeModeDegraded)
+			if reason == "" {
+				reason = summary.LastBarrierReason
+			}
+		case summary.NodeID != primaryNodeID && (summary.RecoveryPhase == "catching_up" || summary.RecoveryPhase == "rebuilding"):
+			mode = degradeLoop2Mode(mode, Loop2RuntimeModeCatchingUp)
+			if reason == "" {
+				reason = summary.RecoveryPhase
+			}
+		case summary.NodeID != primaryNodeID && summary.TargetLSN > 0 && summary.AchievedLSN < summary.TargetLSN:
+			mode = degradeLoop2Mode(mode, Loop2RuntimeModeCatchingUp)
+			if reason == "" {
+				reason = "peer_target_not_achieved"
+			}
+		case summary.NodeID != primaryNodeID && summary.DurableLSN < primary.CommittedLSN:
+			mode = degradeLoop2Mode(mode, Loop2RuntimeModeCatchingUp)
+			if reason == "" {
+				reason = "peer_durable_behind_primary"
+			}
+		}
+	}
+
+	return Loop2RuntimeSnapshot{
+		VolumeName:          volumeName,
+		PrimaryNodeID:       primaryNodeID,
+		ExpectedEpoch:       expectedEpoch,
+		Mode:                mode,
+		Reason:              reason,
+		ReplicaCount:        len(summaries),
+		HealthyReplicaCount: healthyCount,
+		CommittedLSN:        primary.CommittedLSN,
+		DurableFloorLSN:     durableFloor,
+		TargetLSN:           maxTarget,
+		AchievedLSN:         minAchieved,
+		Replicas:            replicas,
+	}, nil
+}
+
+func degradeLoop2Mode(current, next Loop2RuntimeMode) Loop2RuntimeMode {
+	if loop2ModeRank(next) > loop2ModeRank(current) {
+		return next
+	}
+	return current
+}
+
+func loop2ModeRank(mode Loop2RuntimeMode) int {
+	switch mode {
+	case Loop2RuntimeModeNeedsRebuild:
+		return 3
+	case Loop2RuntimeModeDegraded:
+		return 2
+	case Loop2RuntimeModeCatchingUp:
+		return 1
+	default:
+		return 0
+	}
+}
@@ -1519,6 +1519,226 @@ func TestTransportEvidenceAdapter_GatedFailoverFlow(t *testing.T) {
 	}
 }
 
+func TestLoop2RuntimeSession_KeepUpOnHealthyReplicaSet(t *testing.T) {
+	nodeB, err := New(Config{NodeID: "node-b"})
+	if err != nil {
+		t.Fatalf("new node-b: %v", err)
+	}
+	defer nodeB.Close()
+
+	tempDir := t.TempDir()
+	pathB := filepath.Join(tempDir, "loop2-keepup-b.blk")
+	assignB := masterv2.Assignment{
+		Name:          "loop2-keepup-vol",
+		Path:          pathB,
+		NodeID:        "node-b",
+		Epoch:         3,
+		LeaseTTL:      30 * time.Second,
+		CreateOptions: testCreateOptions(),
+		Role:          "primary",
+	}
+	if err := nodeB.ApplyAssignments([]masterv2.Assignment{assignB}); err != nil {
+		t.Fatalf("seed node-b: %v", err)
+	}
+	payload := bytes.Repeat([]byte{0x71}, 4096)
+	if err := nodeB.WriteLBA("loop2-keepup-vol", 0, payload); err != nil {
+		t.Fatalf("write node-b: %v", err)
+	}
+	if err := nodeB.SyncCache("loop2-keepup-vol"); err != nil {
+		t.Fatalf("sync node-b: %v", err)
+	}
+	primary := mustReplicaSummary(t, nodeB, "loop2-keepup-vol", 3)
+
+	session, err := NewLoop2RuntimeSession("loop2-keepup-vol", "node-b", 3, []FailoverTarget{
+		mustInProcessFailoverTarget(t, nodeB),
+		staticFailoverTarget(
+			"node-c",
+			masterv2.PromotionQueryResponse{VolumeName: "loop2-keepup-vol", NodeID: "node-c"},
+			protocolv2.ReplicaSummaryResponse{
+				VolumeName:    "loop2-keepup-vol",
+				NodeID:        "node-c",
+				Epoch:         3,
+				Role:          "replica",
+				Mode:          "replica_ready",
+				RoleApplied:   true,
+				ReceiverReady: true,
+				CommittedLSN:  primary.CommittedLSN,
+				DurableLSN:    primary.DurableLSN,
+				CheckpointLSN: primary.CheckpointLSN,
+				LastBarrierOK: true,
+				Eligible:      true,
+			},
+		),
+	})
+	if err != nil {
+		t.Fatalf("new loop2 runtime session: %v", err)
+	}
+	snap, err := session.ObserveOnce()
+	if err != nil {
+		t.Fatalf("observe loop2 keepup: %v", err)
+	}
+	if snap.Mode != Loop2RuntimeModeKeepUp {
+		t.Fatalf("loop2 mode=%q, want %q", snap.Mode, Loop2RuntimeModeKeepUp)
+	}
+	if snap.HealthyReplicaCount != 2 {
+		t.Fatalf("healthy replicas=%d, want 2", snap.HealthyReplicaCount)
+	}
+}
+
+func TestInProcessRuntimeManager_ObserveLoop2_CatchingUp(t *testing.T) {
+	master := masterv2.New(masterv2.Config{})
+	manager, err := NewInProcessRuntimeManager(master)
+	if err != nil {
+		t.Fatalf("new runtime manager: %v", err)
+	}
+	nodeB, err := New(Config{NodeID: "node-b"})
+	if err != nil {
+		t.Fatalf("new node-b: %v", err)
+	}
+	defer nodeB.Close()
+	if err := manager.RegisterNode(nodeB); err != nil {
+		t.Fatalf("register node-b: %v", err)
+	}
+
+	tempDir := t.TempDir()
+	pathB := filepath.Join(tempDir, "loop2-catchup-b.blk")
+	assignB := masterv2.Assignment{
+		Name:          "loop2-catchup-vol",
+		Path:          pathB,
+		NodeID:        "node-b",
+		Epoch:         4,
+		LeaseTTL:      30 * time.Second,
+		CreateOptions: testCreateOptions(),
+		Role:          "primary",
+	}
+	if err := nodeB.ApplyAssignments([]masterv2.Assignment{assignB}); err != nil {
+		t.Fatalf("seed node-b: %v", err)
+	}
+	payload := bytes.Repeat([]byte{0x72}, 4096)
+	if err := nodeB.WriteLBA("loop2-catchup-vol", 0, payload); err != nil {
+		t.Fatalf("write node-b: %v", err)
+	}
+	if err := nodeB.SyncCache("loop2-catchup-vol"); err != nil {
+		t.Fatalf("sync node-b: %v", err)
+	}
+	primary := mustReplicaSummary(t, nodeB, "loop2-catchup-vol", 4)
+
+	if err := manager.RegisterTarget(staticFailoverTarget(
+		"node-c",
+		masterv2.PromotionQueryResponse{VolumeName: "loop2-catchup-vol", NodeID: "node-c"},
+		protocolv2.ReplicaSummaryResponse{
+			VolumeName:    "loop2-catchup-vol",
+			NodeID:        "node-c",
+			Epoch:         4,
+			Role:          "replica",
+			Mode:          "replica_ready",
+			RoleApplied:   true,
+			ReceiverReady: true,
+			CommittedLSN:  primary.CommittedLSN - 1,
+			DurableLSN:    primary.CommittedLSN - 1,
+			CheckpointLSN: primary.CheckpointLSN,
+			TargetLSN:     primary.CommittedLSN,
+			AchievedLSN:   primary.CommittedLSN - 1,
+			RecoveryPhase: "catching_up",
+			LastBarrierOK: true,
+			Eligible:      true,
+		},
+	)); err != nil {
+		t.Fatalf("register node-c target: %v", err)
+	}
+
+	snap, err := manager.ObserveLoop2("loop2-catchup-vol", "node-b", 4)
+	if err != nil {
+		t.Fatalf("observe loop2 catchup: %v", err)
+	}
+	if snap.Mode != Loop2RuntimeModeCatchingUp {
+		t.Fatalf("loop2 mode=%q, want %q", snap.Mode, Loop2RuntimeModeCatchingUp)
+	}
+	if snap.Reason == "" {
+		t.Fatal("expected loop2 catching_up reason")
+	}
+	lastSnap, ok := manager.LastLoop2Snapshot()
+	if !ok {
+		t.Fatal("expected last loop2 snapshot")
+	}
+	if lastSnap.Mode != Loop2RuntimeModeCatchingUp {
+		t.Fatalf("last loop2 mode=%q, want %q", lastSnap.Mode, Loop2RuntimeModeCatchingUp)
+	}
+}
+
+func TestInProcessRuntimeManager_ObserveLoop2_NeedsRebuild(t *testing.T) {
+	master := masterv2.New(masterv2.Config{})
+	manager, err := NewInProcessRuntimeManager(master)
+	if err != nil {
+		t.Fatalf("new runtime manager: %v", err)
+	}
+	nodeB, err := New(Config{NodeID: "node-b"})
+	if err != nil {
+		t.Fatalf("new node-b: %v", err)
+	}
+	defer nodeB.Close()
+	if err := manager.RegisterNode(nodeB); err != nil {
+		t.Fatalf("register node-b: %v", err)
+	}
+
+	tempDir := t.TempDir()
+	pathB := filepath.Join(tempDir, "loop2-rebuild-b.blk")
+	assignB := masterv2.Assignment{
+		Name:          "loop2-rebuild-vol",
+		Path:          pathB,
+		NodeID:        "node-b",
+		Epoch:         5,
+		LeaseTTL:      30 * time.Second,
+		CreateOptions: testCreateOptions(),
+		Role:          "primary",
+	}
+	if err := nodeB.ApplyAssignments([]masterv2.Assignment{assignB}); err != nil {
+		t.Fatalf("seed node-b: %v", err)
+	}
+
+	if err := manager.RegisterTarget(staticFailoverTarget(
+		"node-c",
+		masterv2.PromotionQueryResponse{VolumeName: "loop2-rebuild-vol", NodeID: "node-c"},
+		protocolv2.ReplicaSummaryResponse{
+			VolumeName:        "loop2-rebuild-vol",
+			NodeID:            "node-c",
+			Epoch:             5,
+			Role:              "replica",
+			Mode:              "needs_rebuild",
+			RoleApplied:       true,
+			ReceiverReady:     false,
+			CommittedLSN:      1,
+			DurableLSN:        1,
+			CheckpointLSN:     1,
+			RecoveryPhase:     "needs_rebuild",
+			LastBarrierOK:     false,
+			LastBarrierReason: "timeout",
+			Eligible:          false,
+			Reason:            "needs_rebuild",
+		},
+	)); err != nil {
+		t.Fatalf("register node-c target: %v", err)
+	}
+
+	snap, err := manager.ObserveLoop2("loop2-rebuild-vol", "node-b", 5)
+	if err != nil {
+		t.Fatalf("observe loop2 needs_rebuild: %v", err)
+	}
+	if snap.Mode != Loop2RuntimeModeNeedsRebuild {
+		t.Fatalf("loop2 mode=%q, want %q", snap.Mode, Loop2RuntimeModeNeedsRebuild)
+	}
+	if snap.Reason != "needs_rebuild" {
+		t.Fatalf("loop2 reason=%q, want needs_rebuild", snap.Reason)
+	}
+	perVolSnap, ok := manager.Loop2Snapshot("loop2-rebuild-vol")
+	if !ok {
+		t.Fatal("expected per-volume loop2 snapshot")
+	}
+	if perVolSnap.Mode != Loop2RuntimeModeNeedsRebuild {
+		t.Fatalf("per-volume loop2 mode=%q, want %q", perVolSnap.Mode, Loop2RuntimeModeNeedsRebuild)
+	}
+}
+
 func mustHeartbeat(t *testing.T, node *Node) masterv2.NodeHeartbeat {
 	t.Helper()
 	hb, err := node.Heartbeat()
@@ -1540,6 +1760,18 @@ func mustPromotionEvidence(t *testing.T, node *Node, volumeName string, epoch ui
 	return resp
 }
 
+func mustReplicaSummary(t *testing.T, node *Node, volumeName string, epoch uint64) protocolv2.ReplicaSummaryResponse {
+	t.Helper()
+	resp, err := node.QueryReplicaSummary(protocolv2.ReplicaSummaryRequest{
+		VolumeName:    volumeName,
+		ExpectedEpoch: epoch,
+	})
+	if err != nil {
+		t.Fatalf("replica summary: %v", err)
+	}
+	return resp
+}
+
 func mustInProcessFailoverTarget(t *testing.T, node *Node) FailoverTarget {
 	t.Helper()
 	target, err := NewInProcessFailoverTarget(node)
@@ -15,12 +15,15 @@ type InProcessRuntimeManager struct {
 	driver            *InProcessFailoverDriver
 	evidenceTransport *InMemoryFailoverEvidenceTransport
 
-	mu              sync.RWMutex
-	lastSnapshot    FailoverSnapshot
-	lastResult      FailoverResult
-	hasLastResult   bool
-	snapshotsByName map[string]FailoverSnapshot
-	resultsByName   map[string]FailoverResult
+	mu                sync.RWMutex
+	lastSnapshot      FailoverSnapshot
+	lastResult        FailoverResult
+	hasLastResult     bool
+	snapshotsByName   map[string]FailoverSnapshot
+	resultsByName     map[string]FailoverResult
+	lastLoop2Snapshot Loop2RuntimeSnapshot
+	hasLastLoop2      bool
+	loop2ByVolume     map[string]Loop2RuntimeSnapshot
 }
 
 // NewInProcessRuntimeManager creates a runtime-owned failover manager over one
@@ -35,6 +38,7 @@ func NewInProcessRuntimeManager(master *masterv2.Master) (*InProcessRuntimeManag
 		evidenceTransport: NewInMemoryFailoverEvidenceTransport(),
 		snapshotsByName:   make(map[string]FailoverSnapshot),
 		resultsByName:     make(map[string]FailoverResult),
+		loop2ByVolume:     make(map[string]Loop2RuntimeSnapshot),
 	}, nil
 }
 
@@ -114,6 +118,34 @@ func (m *InProcessRuntimeManager) ExecuteFailover(volumeName string, expectedEpo
 	return result, runErr
 }
 
+// NewLoop2RuntimeSession resolves runtime-owned targets and creates an active
+// Loop 2 session for one selected primary.
+func (m *InProcessRuntimeManager) NewLoop2RuntimeSession(volumeName, primaryNodeID string, expectedEpoch uint64, nodeIDs ...string) (*Loop2RuntimeSession, error) {
+	if m == nil || m.driver == nil {
+		return nil, fmt.Errorf("volumev2: runtime manager is nil")
+	}
+	targets, err := m.driver.resolveTargets(nodeIDs)
+	if err != nil {
+		return nil, err
+	}
+	return NewLoop2RuntimeSession(volumeName, primaryNodeID, expectedEpoch, targets)
+}
+
+// ObserveLoop2 runs one active Loop 2 observation and persists the latest
+// runtime snapshot.
+func (m *InProcessRuntimeManager) ObserveLoop2(volumeName, primaryNodeID string, expectedEpoch uint64, nodeIDs ...string) (Loop2RuntimeSnapshot, error) {
+	if m == nil {
+		return Loop2RuntimeSnapshot{}, fmt.Errorf("volumev2: runtime manager is nil")
+	}
+	session, err := m.NewLoop2RuntimeSession(volumeName, primaryNodeID, expectedEpoch, nodeIDs...)
+	if err != nil {
+		return Loop2RuntimeSnapshot{}, err
+	}
+	snapshot, obsErr := session.ObserveOnce()
+	m.recordLoop2Snapshot(volumeName, session.Snapshot())
+	return snapshot, obsErr
+}
+
 // LastFailoverSnapshot returns the most recent runtime-owned failover snapshot.
 func (m *InProcessRuntimeManager) LastFailoverSnapshot() (FailoverSnapshot, bool) {
 	if m == nil {
@@ -162,6 +194,31 @@ func (m *InProcessRuntimeManager) FailoverResult(volumeName string) (FailoverRes
 	return result, ok
 }
 
+// LastLoop2Snapshot returns the most recent active Loop 2 runtime snapshot.
+func (m *InProcessRuntimeManager) LastLoop2Snapshot() (Loop2RuntimeSnapshot, bool) {
+	if m == nil {
+		return Loop2RuntimeSnapshot{}, false
+	}
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	if !m.hasLastLoop2 {
+		return Loop2RuntimeSnapshot{}, false
+	}
+	return m.lastLoop2Snapshot, true
+}
+
+// Loop2Snapshot returns the latest active Loop 2 runtime snapshot for one
+// volume if present.
+func (m *InProcessRuntimeManager) Loop2Snapshot(volumeName string) (Loop2RuntimeSnapshot, bool) {
+	if m == nil {
+		return Loop2RuntimeSnapshot{}, false
+	}
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	snapshot, ok := m.loop2ByVolume[volumeName]
+	return snapshot, ok
+}
+
 func (m *InProcessRuntimeManager) recordSnapshot(volumeName string, snapshot FailoverSnapshot, result FailoverResult) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
@@ -173,3 +230,13 @@ func (m *InProcessRuntimeManager) recordSnapshot(volumeName string, snapshot Fai
 		m.resultsByName[volumeName] = result
 	}
 }
+
+func (m *InProcessRuntimeManager) recordLoop2Snapshot(volumeName string, snapshot Loop2RuntimeSnapshot) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.lastLoop2Snapshot = snapshot
+	m.hasLastLoop2 = true
+	if volumeName != "" {
+		m.loop2ByVolume[volumeName] = snapshot
+	}
+}