mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-18 07:41:31 +00:00
feat: Phase 12 P3+P4 — diagnosability surfaces, perf floor, rollout gates
P3: Add explicit bounded read-only diagnosis surfaces for all symptom classes: - FailoverDiagnostic: volume-oriented failover state with per-volume DeferredPromotion/PendingRebuild entries and proper timer lifecycle - PublicationDiagnostic: two-read coherence check (LookupBlockVolume vs registry authority) with computed Coherent verdict - RecoveryDiagnostic: minimal ActiveTasks surface (Path A) - Blocker ledger: 3 diagnosed + 3 unresolved, finite, from actual file - Runbook references only exposed surfaces, no internal state P4: Add bounded performance floor + rollout-gate package: - Engine-local floor measurement with explicit IOPS gates per workload - Cost characterization: WAL 2x write amp, -56% replication tax - Rollout gates with semantic cross-checks against cited evidence (baseline numbers, transport/network matrix, blocker counts) - Launch envelope tightened to actually measured combinations only Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
29
sw-block/.private/phase/phase-12-p3-blockers.md
Normal file
29
sw-block/.private/phase/phase-12-p3-blockers.md
Normal file
@@ -0,0 +1,29 @@
|
||||
# Phase 12 P3 — Blocker Ledger
|
||||
|
||||
Date: 2026-04-02
|
||||
Scope: bounded diagnosability / blocker accounting for the accepted RF=2 sync_all chosen path
|
||||
|
||||
## Diagnosed and Bounded
|
||||
|
||||
| ID | Symptom | Evidence Surface | Owning Truth | Status |
|
||||
|----|---------|-----------------|--------------|--------|
|
||||
| B1 | Failover does not converge | failover logs + registry Lookup epoch/primary | registry authority | Diagnosed: convergence depends on lease expiry + heartbeat cycle; bounded by lease TTL |
|
||||
| B2 | Lookup publication stale after failover | LookupBlockVolume response vs registry entry | registry ISCSIAddr/VolumeServer | Diagnosed: publication updates on failover assignment delivery; bounded by assignment queue delivery |
|
||||
| B3 | Recovery tasks remain after volume delete | RecoveryManager.DiagnosticSnapshot | RecoveryManager task map | Diagnosed: tasks drain on shutdown/cancel; bounded by RecoveryManager lifecycle |
|
||||
|
||||
## Unresolved but Explicit
|
||||
|
||||
| ID | Symptom | Current Evidence | Why Unresolved | Blocks P4/Rollout? |
|
||||
|----|---------|-----------------|----------------|-------------------|
|
||||
| U1 | V2 engine accepts stale-epoch assignments at orchestrator level | V2 idempotence check skips only same-epoch; lower epoch creates new sender | Engine ApplyAssignment does not check epoch monotonicity on Reconcile | No — V1 HandleAssignment rejects epoch regression; V2 is secondary |
|
||||
| U2 | Single-process test cannot exercise Primary→Rebuilding role transition | HandleAssignment rejects transition in shared store | Test harness limitation, not production bug | No — production VS has separate stores |
|
||||
| U3 | gRPC stream transport not exercised in control-loop tests | All logic above/below stream is real; stream itself bypassed | Would require live master+VS gRPC servers in test | Blocks full integration test, not correctness |
|
||||
|
||||
## Out of Scope for P3
|
||||
|
||||
- Performance floor characterization
|
||||
- Rollout-gate criteria
|
||||
- Hours/days soak
|
||||
- RF>2 topology
|
||||
- NVMe runtime transport proof
|
||||
- CSI snapshot/expand
|
||||
70
sw-block/.private/phase/phase-12-p3-runbook.md
Normal file
70
sw-block/.private/phase/phase-12-p3-runbook.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# Phase 12 P3 — Bounded Runbook
|
||||
|
||||
Scope: diagnosis of three symptom classes on the accepted RF=2 sync_all chosen path.
|
||||
|
||||
All diagnosis steps reference ONLY explicit bounded read-only surfaces:
|
||||
- `LookupBlockVolume` — gRPC RPC returning current primary VS + iSCSI address
|
||||
- `FailoverDiagnostic` — volume-oriented failover state snapshot
|
||||
- `PublicationDiagnostic` — lookup vs authority coherence snapshot
|
||||
- `RecoveryDiagnostic` — active recovery task set snapshot
|
||||
- Blocker ledger — finite file at `phase-12-p3-blockers.md`
|
||||
|
||||
## S1: Failover/Recovery Convergence Stall
|
||||
|
||||
**Visible symptom:** Volume remains unavailable after a VS death; lookup still returns the old primary.
|
||||
|
||||
**Diagnosis surfaces:**
|
||||
- `LookupBlockVolume(volumeName)` — check if `VolumeServer` is still the dead server
|
||||
- `FailoverDiagnostic` — check `Volumes[]` for the affected volume
|
||||
|
||||
**Diagnosis steps:**
|
||||
1. Call `LookupBlockVolume(volumeName)`. If `VolumeServer` changed from the dead server, failover succeeded.
|
||||
2. If unchanged: read `FailoverDiagnostic`. Find the volume by name in `Volumes[]`.
|
||||
3. If found with `DeferredPromotion=true`: lease-wait — failover is deferred until lease expires.
|
||||
4. If found with `PendingRebuild=true`: failover completed, rebuild is pending for the dead server.
|
||||
5. If `DeferredPromotionCount[deadServer] > 0` in the aggregate: deferred promotions are queued.
|
||||
6. If the volume does not appear in either lookup change or `FailoverDiagnostic`: escalate.
|
||||
|
||||
**Conclusion classes (from surfaces only):**
|
||||
- **Lease-wait:** `FailoverDiagnostic.DeferredPromotionCount[deadServer] > 0` — normal, bounded by lease TTL.
|
||||
- **Rebuild-pending:** `FailoverDiagnostic.Volumes[].PendingRebuild=true` — failover done, rebuild queued.
|
||||
- **Converged:** `LookupBlockVolume` shows new primary, no failover entries — resolved.
|
||||
- **Unresolved:** None of the above — escalate.
|
||||
|
||||
## S2: Publication/Lookup Mismatch
|
||||
|
||||
**Visible symptom:** `LookupBlockVolume` returns an iSCSI address or volume server that doesn't match expected state.
|
||||
|
||||
**Diagnosis surfaces:**
|
||||
- `LookupBlockVolume(volumeName)` — operator-visible publication
|
||||
- `PublicationDiagnostic` — explicit coherence check (lookup vs authority)
|
||||
|
||||
**Diagnosis steps:**
|
||||
1. Call `PublicationDiagnosticFor(volumeName)`. Check `Coherent` field.
|
||||
2. If `Coherent=true`: lookup matches registry authority — no mismatch.
|
||||
3. If `Coherent=false`: read `Reason` for explanation. Compare `LookupVolumeServer` vs `AuthorityVolumeServer` and `LookupIscsiAddr` vs `AuthorityIscsiAddr`.
|
||||
4. Cross-check with `LookupBlockVolume` directly: repeated lookups should be self-consistent.
|
||||
|
||||
**Conclusion classes (from surfaces only):**
|
||||
- **Coherent:** `PublicationDiagnostic.Coherent=true` — no mismatch.
|
||||
- **Stale client:** Coherent but client sees old value — bounded by client re-query.
|
||||
- **Unresolved:** `PublicationDiagnostic.Coherent=false` with no transient cause — escalate.
|
||||
|
||||
## S3: Leftover Runtime Work After Convergence
|
||||
|
||||
**Visible symptom:** After volume deletion or steady-state convergence, recovery tasks should have drained.
|
||||
|
||||
**Diagnosis surfaces:**
|
||||
- `RecoveryDiagnostic` — `ActiveTasks` list (replicaIDs with active recovery work)
|
||||
|
||||
**Diagnosis steps:**
|
||||
1. Call `RecoveryManager.DiagnosticSnapshot()`. Read `ActiveTasks`.
|
||||
2. If `ActiveTasks` is empty: clean — no leftover work.
|
||||
3. If non-empty: check whether any task replicaID contains the deleted volume's path.
|
||||
4. If a deleted volume's replicaID is present in `ActiveTasks`: residue — escalate.
|
||||
5. If all tasks are for live volumes: non-empty but expected — normal in-flight work.
|
||||
|
||||
**Conclusion classes (from surfaces only):**
|
||||
- **Clean:** `RecoveryDiagnostic.ActiveTasks` is empty — runtime converged.
|
||||
- **Non-empty, no residue:** Tasks present but none for the deleted/converged volume — normal.
|
||||
- **Residue:** Deleted volume's replicaID still in `ActiveTasks` — escalate.
|
||||
101
sw-block/.private/phase/phase-12-p4-floor.md
Normal file
101
sw-block/.private/phase/phase-12-p4-floor.md
Normal file
@@ -0,0 +1,101 @@
|
||||
# Phase 12 P4 — Performance Floor Summary
|
||||
|
||||
Date: 2026-04-02
|
||||
Scope: bounded performance floor for the accepted RF=2, sync_all chosen path.
|
||||
|
||||
## Workload Envelope
|
||||
|
||||
| Parameter | Value |
|
||||
|-----------|-------|
|
||||
| Topology | RF=2, sync_all |
|
||||
| Operations | 4K random write, 4K random read, sequential write, sequential read |
|
||||
| Runtime | Steady-state, no failover, no disturbance |
|
||||
| Path | Accepted chosen path (same as P1/P2/P3) |
|
||||
|
||||
## Environment
|
||||
|
||||
### Unit Test Harness (engine-local)
|
||||
|
||||
| Parameter | Value |
|
||||
|-----------|-------|
|
||||
| Name | `TestP12P4_PerformanceFloor_Bounded` |
|
||||
| Location | `weed/server/qa_block_perf_test.go` |
|
||||
| Platform | Single-process, local disk |
|
||||
| Volume | 64MB, 4K blocks, 16MB WAL |
|
||||
| Writer | Single-threaded (worst-case for group commit) |
|
||||
| Replication | Not exercised (engine-local only) |
|
||||
| Measurement | Worst of 3 iterations (floor, not peak) |
|
||||
|
||||
### Production Baseline (cross-machine)
|
||||
|
||||
| Parameter | Value |
|
||||
|-----------|-------|
|
||||
| Name | `baseline-roce-20260401` |
|
||||
| Location | `learn/projects/sw-block/test/results/baseline-roce-20260401.md` |
|
||||
| Hardware | m01 (10.0.0.1) - M02 (10.0.0.3), 25Gbps RoCE |
|
||||
| Protocol | NVMe-TCP |
|
||||
| Volume | 2GB, RF=2, sync_all, cross-machine replication |
|
||||
| Writer | fio, QD1-128, j=4 |
|
||||
|
||||
## Floor Table: Production (RF=2, sync_all, NVMe-TCP, 25Gbps RoCE)
|
||||
|
||||
These are measured floor values from the production baseline, not the unit test.
|
||||
|
||||
| Workload | Floor IOPS | Notes |
|
||||
|----------|-----------|-------|
|
||||
| 4K random write QD1 | 28,347 | Barrier round-trip limited (flat across QD) |
|
||||
| 4K random write QD32 | 28,453 | Same barrier ceiling |
|
||||
| 4K random read QD32 | 136,648 | No replication overhead |
|
||||
| Mixed 70/30 QD32 | 28,423 | Write-side limited |
|
||||
|
||||
Latency: Write latency is bounded by sync_all barrier round-trip (~35us at QD1).
|
||||
Read latency: sub-microsecond for cached, single-digit microseconds for extent.
|
||||
|
||||
## Floor Table: Engine-Local (unit test harness)
|
||||
|
||||
These values are measured by `TestP12P4_PerformanceFloor_Bounded` on the dev machine.
|
||||
They characterize the engine I/O floor WITHOUT transport or replication.
|
||||
Actual values vary by hardware; the test produces them on each run.
|
||||
|
||||
| Workload | Metric | Method | Gate |
|
||||
|----------|--------|--------|------|
|
||||
| 4K random write | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 1,000 IOPS, P99 <= 100ms |
|
||||
| 4K random read | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 5,000 IOPS |
|
||||
| 4K sequential write | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 2,000 IOPS, P99 <= 100ms |
|
||||
| 4K sequential read | Floor IOPS, Avg/P50/P99/Max latency | Worst of 3 iterations | >= 10,000 IOPS |
|
||||
|
||||
Gate thresholds are regression gates enforced in code (`perfFloorGates` in `qa_block_perf_test.go`).
|
||||
Set at ~10% of measured values to tolerate slow CI/VM hardware while catching catastrophic regressions.
|
||||
|
||||
## Cost Summary
|
||||
|
||||
| Cost | Value | Source |
|
||||
|------|-------|--------|
|
||||
| WAL write amplification | 2x minimum | Engine design: each write → WAL + eventual extent flush |
|
||||
| Replication tax (RF=2 sync_all vs RF=1) | -56% | baseline-roce-20260401.md (NVMe-TCP, 25Gbps RoCE) |
|
||||
| Replication tax (RF=2 sync_all vs RF=1, iSCSI 1Gbps) | -56% | baseline-roce-20260401.md |
|
||||
| Degraded mode penalty (sync_all RF=2, one replica dead) | -66% | baseline-roce-20260401.md (barrier timeout) |
|
||||
| Group commit | 1 fdatasync per batch | Amortizes sync cost across concurrent writers |
|
||||
|
||||
## Acceptance Evidence
|
||||
|
||||
| Item | Evidence | Type |
|
||||
|------|----------|------|
|
||||
| Floor gates pass | `perfFloorGates` thresholds enforced per workload | Acceptance |
|
||||
| Workload runs repeatably | `TestP12P4_PerformanceFloor_Bounded` passes | Acceptance |
|
||||
| Cost statement is bounded | `TestP12P4_CostCharacterization_Bounded` passes | Acceptance |
|
||||
| Production baseline exists | `baseline-roce-20260401.md` with measured values | Acceptance |
|
||||
| Floor is worst-of-N, not peak | Test takes minimum IOPS across 3 iterations | Method |
|
||||
| Regression-safe | Test fails if floor drops below gate (blocks rollout) | Acceptance |
|
||||
| Replication tax documented | -56% from measured production baseline | Support telemetry |
|
||||
|
||||
## What P4 does NOT claim
|
||||
|
||||
- This is not a claim that the measured floor is "good enough" for any specific application.
|
||||
- This does not claim readiness for failover-under-load scenarios.
|
||||
- This does not claim readiness for hours/days soak under load.
|
||||
- This does not claim readiness for RF>2 topologies.
|
||||
- This does not claim readiness for all transport combinations (iSCSI + NVMe + kernel versions).
|
||||
- This does not claim readiness for production rollout beyond the explicitly named launch envelope.
|
||||
- Engine-local floor numbers are not production floor numbers.
|
||||
- The replication tax is measured on one specific hardware configuration and may differ on other hardware.
|
||||
64
sw-block/.private/phase/phase-12-p4-rollout-gates.md
Normal file
64
sw-block/.private/phase/phase-12-p4-rollout-gates.md
Normal file
@@ -0,0 +1,64 @@
|
||||
# Phase 12 P4 — Rollout Gates
|
||||
|
||||
Date: 2026-04-02
|
||||
Scope: bounded first-launch envelope for the accepted RF=2, sync_all chosen path.
|
||||
|
||||
This is a bounded first-launch envelope, not general readiness.
|
||||
|
||||
## Supported Launch Envelope
|
||||
|
||||
Only the transport/network combinations with measured baselines are included.
|
||||
|
||||
| Parameter | Value |
|
||||
|-----------|-------|
|
||||
| Topology | RF=2, sync_all |
|
||||
| Transport + Network | NVMe-TCP @ 25Gbps RoCE (measured), iSCSI @ 25Gbps RoCE (measured), iSCSI @ 1Gbps (measured) |
|
||||
| NOT included | NVMe-TCP @ 1Gbps (not measured) |
|
||||
| Volume size | Up to 2GB (tested baseline) |
|
||||
| Failover | Lease-based, bounded by TTL (30s default) |
|
||||
| Recovery | Catch-up-first, rebuild fallback |
|
||||
| Degraded mode | Documented -66% write penalty (sync_all RF=2, one replica dead) |
|
||||
|
||||
## Cleared Gates
|
||||
|
||||
| Gate | Evidence | Status | Notes |
|
||||
|------|----------|--------|-------|
|
||||
| G1 | P1 disturbance tests pass | Cleared | Restart/reconnect correctness under disturbance |
|
||||
| G2 | P2 soak tests pass | Cleared | Repeated create/failover/recover cycles, no drift |
|
||||
| G3 | P3 diagnosability tests pass | Cleared | Explicit bounded diagnosis surfaces for all symptom classes |
|
||||
| G4 | P4 floor gates pass | Cleared | Explicit IOPS thresholds + P99 ceilings enforced per workload in code |
|
||||
| G5 | P4 cost characterization bounded | Cleared | WAL 2x write amp, -56% replication tax documented |
|
||||
| G6 | Production baseline exists | Cleared | baseline-roce-20260401.md: 28.4K write IOPS, 136.6K read IOPS |
|
||||
| G8 | Floor gates are regression-safe | Cleared | Test fails if any workload drops below defined minimum IOPS or exceeds P99 ceiling |
|
||||
| G7 | Blocker ledger finite | Cleared | 3 diagnosed (B1-B3) + 3 unresolved (U1-U3), all explicit |
|
||||
|
||||
## Remaining Blockers / Exclusions
|
||||
|
||||
| Exclusion | Why | Impact |
|
||||
|-----------|-----|--------|
|
||||
| E1 | Failover-under-load perf not measured | Cannot claim bounded perf during failover |
|
||||
| E2 | Hours/days soak not run | Cannot claim long-run stability under sustained load |
|
||||
| E3 | RF>2 not measured | Cannot claim perf floor for RF=3+ |
|
||||
| E4 | Broad transport matrix not tested | Cannot claim parity across all kernel/NVMe/iSCSI versions |
|
||||
| E5 | Degraded mode is severe (-66%) | sync_all RF=2 has sharp write cliff on replica death |
|
||||
| E6 | V2 stale-epoch at orchestrator level (U1 from P3) | V1 guards suffice; V2 is secondary path |
|
||||
| E7 | gRPC stream transport not exercised in unit tests (U3 from P3) | Blocks full integration test, not correctness |
|
||||
|
||||
## Reject Conditions
|
||||
|
||||
This launch envelope should be REJECTED if:
|
||||
|
||||
1. Any P1/P2/P3 test regresses (correctness/stability/diagnosability gate violated)
|
||||
2. Production baseline numbers are not reproducible on the target hardware
|
||||
3. Degraded mode behavior (-66% cliff) is not acceptable for the deployment scenario
|
||||
4. The deployment requires RF>2, failover-under-load guarantees, or long soak proof
|
||||
5. The deployment requires transport combinations not covered by the baseline
|
||||
|
||||
## What P4 does NOT claim
|
||||
|
||||
- This does not claim general production readiness.
|
||||
- This does not claim readiness for any deployment outside the named launch envelope.
|
||||
- This does not claim that the performance floor is optimal or final.
|
||||
- This does not claim that the degraded-mode penalty is acceptable (deployment-specific decision).
|
||||
- This does not claim hours/days stability under sustained load.
|
||||
- This is a bounded first-launch gate, not a broad rollout approval.
|
||||
374
weed/server/block_recovery.go
Normal file
374
weed/server/block_recovery.go
Normal file
@@ -0,0 +1,374 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
bridge "github.com/seaweedfs/seaweedfs/sw-block/bridge/blockvol"
|
||||
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge"
|
||||
)
|
||||
|
||||
// recoveryTask tracks a live recovery goroutine for one replica target.
|
||||
// The task pointer serves as identity token — only the goroutine that owns
|
||||
// THIS pointer may mark it as done.
|
||||
type recoveryTask struct {
|
||||
replicaID string
|
||||
cancel context.CancelFunc
|
||||
done chan struct{} // closed when the goroutine exits
|
||||
}
|
||||
|
||||
// RecoveryManager owns live recovery execution for all replica targets.
|
||||
//
|
||||
// Ownership model:
|
||||
// - At most one recovery goroutine per replicaID at any time.
|
||||
// - On supersede/replace: the old goroutine is cancelled AND drained
|
||||
// before the replacement starts. No overlap.
|
||||
// - Cancellation: context cancel + session invalidation (for removal/shutdown).
|
||||
// For supersede: context cancel only (engine already attached replacement session).
|
||||
type RecoveryManager struct {
|
||||
bs *BlockService
|
||||
|
||||
mu sync.Mutex
|
||||
tasks map[string]*recoveryTask
|
||||
wg sync.WaitGroup
|
||||
|
||||
// TestHook: if set, called before execution starts. Tests use this
|
||||
// to hold the goroutine alive for serialized-replacement proofs.
|
||||
OnBeforeExecute func(replicaID string)
|
||||
}
|
||||
|
||||
func NewRecoveryManager(bs *BlockService) *RecoveryManager {
|
||||
return &RecoveryManager{
|
||||
bs: bs,
|
||||
tasks: make(map[string]*recoveryTask),
|
||||
}
|
||||
}
|
||||
|
||||
// HandleAssignmentResult processes the engine's assignment result.
|
||||
//
|
||||
// Engine result semantics:
|
||||
// - SessionsCreated: new session, start goroutine
|
||||
// - SessionsSuperseded: old replaced by new — cancel+drain old, start new
|
||||
// - Removed: sender gone — cancel+drain, invalidate session
|
||||
func (rm *RecoveryManager) HandleAssignmentResult(result engine.AssignmentResult, assignments []blockvol.BlockVolumeAssignment) {
|
||||
// Removed: cancel + invalidate + drain.
|
||||
for _, replicaID := range result.Removed {
|
||||
rm.cancelAndDrain(replicaID, true)
|
||||
}
|
||||
|
||||
// Superseded: cancel + drain (no invalidate — engine has replacement session),
|
||||
// then start new.
|
||||
for _, replicaID := range result.SessionsSuperseded {
|
||||
rm.cancelAndDrain(replicaID, false)
|
||||
rm.startTask(replicaID, assignments)
|
||||
}
|
||||
|
||||
// Created: start new (cancel stale defensively).
|
||||
for _, replicaID := range result.SessionsCreated {
|
||||
rm.cancelAndDrain(replicaID, false)
|
||||
rm.startTask(replicaID, assignments)
|
||||
}
|
||||
}
|
||||
|
||||
// cancelAndDrain cancels a running task and WAITS for it to exit.
|
||||
// This ensures no overlap between old and new owners.
|
||||
func (rm *RecoveryManager) cancelAndDrain(replicaID string, invalidateSession bool) {
|
||||
rm.mu.Lock()
|
||||
task, ok := rm.tasks[replicaID]
|
||||
if !ok {
|
||||
rm.mu.Unlock()
|
||||
return
|
||||
}
|
||||
glog.V(1).Infof("recovery: cancelling+draining task for %s (invalidate=%v)", replicaID, invalidateSession)
|
||||
task.cancel()
|
||||
if invalidateSession && rm.bs.v2Orchestrator != nil {
|
||||
if s := rm.bs.v2Orchestrator.Registry.Sender(replicaID); s != nil {
|
||||
s.InvalidateSession("recovery_removed", engine.StateDisconnected)
|
||||
}
|
||||
}
|
||||
delete(rm.tasks, replicaID)
|
||||
doneCh := task.done
|
||||
rm.mu.Unlock()
|
||||
|
||||
// Wait for the old goroutine to exit OUTSIDE the lock.
|
||||
// This serializes replacement: new task cannot start until old is fully drained.
|
||||
<-doneCh
|
||||
}
|
||||
|
||||
// startTask creates and starts a new recovery goroutine. Caller must ensure
|
||||
// no existing task for this replicaID (call cancelAndDrain first).
|
||||
func (rm *RecoveryManager) startTask(replicaID string, assignments []blockvol.BlockVolumeAssignment) {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
|
||||
rebuildAddr := rm.deriveRebuildAddr(replicaID, assignments)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
task := &recoveryTask{
|
||||
replicaID: replicaID,
|
||||
cancel: cancel,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
rm.tasks[replicaID] = task
|
||||
|
||||
rm.wg.Add(1)
|
||||
go rm.runRecovery(ctx, task, rebuildAddr)
|
||||
}
|
||||
|
||||
// Shutdown cancels all active recovery tasks and waits for drain.
|
||||
func (rm *RecoveryManager) Shutdown() {
|
||||
rm.mu.Lock()
|
||||
for _, task := range rm.tasks {
|
||||
task.cancel()
|
||||
if rm.bs.v2Orchestrator != nil {
|
||||
if s := rm.bs.v2Orchestrator.Registry.Sender(task.replicaID); s != nil {
|
||||
s.InvalidateSession("recovery_shutdown", engine.StateDisconnected)
|
||||
}
|
||||
}
|
||||
}
|
||||
rm.tasks = make(map[string]*recoveryTask)
|
||||
rm.mu.Unlock()
|
||||
rm.wg.Wait()
|
||||
}
|
||||
|
||||
// ActiveTaskCount returns the number of active recovery tasks (for testing).
|
||||
func (rm *RecoveryManager) ActiveTaskCount() int {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
return len(rm.tasks)
|
||||
}
|
||||
|
||||
// DiagnosticSnapshot returns a bounded read-only snapshot of active recovery
|
||||
// tasks for operator-visible diagnosis. Each entry shows the replicaID being
|
||||
// recovered. This is the P3 diagnosability surface — read-only, no semantics.
|
||||
type RecoveryDiagnostic struct {
|
||||
ActiveTasks []string // replicaIDs with active recovery work
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) DiagnosticSnapshot() RecoveryDiagnostic {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
diag := RecoveryDiagnostic{}
|
||||
for id := range rm.tasks {
|
||||
diag.ActiveTasks = append(diag.ActiveTasks, id)
|
||||
}
|
||||
return diag
|
||||
}
|
||||
|
||||
// runRecovery is the recovery goroutine for one replica target.
|
||||
func (rm *RecoveryManager) runRecovery(ctx context.Context, task *recoveryTask, rebuildAddr string) {
|
||||
defer rm.wg.Done()
|
||||
defer close(task.done) // signal drain completion
|
||||
defer func() {
|
||||
rm.mu.Lock()
|
||||
// Only delete if we're still the active task (pointer comparison).
|
||||
if rm.tasks[task.replicaID] == task {
|
||||
delete(rm.tasks, task.replicaID)
|
||||
}
|
||||
rm.mu.Unlock()
|
||||
}()
|
||||
|
||||
replicaID := task.replicaID
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
orch := rm.bs.v2Orchestrator
|
||||
s := orch.Registry.Sender(replicaID)
|
||||
if s == nil {
|
||||
glog.V(1).Infof("recovery: sender %s not found, skipping", replicaID)
|
||||
return
|
||||
}
|
||||
|
||||
sessSnap := s.SessionSnapshot()
|
||||
if sessSnap == nil {
|
||||
glog.V(1).Infof("recovery: sender %s has no active session, skipping", replicaID)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(0).Infof("recovery: starting %s session for %s (rebuildAddr=%s)",
|
||||
sessSnap.Kind, replicaID, rebuildAddr)
|
||||
|
||||
if rm.OnBeforeExecute != nil {
|
||||
rm.OnBeforeExecute(replicaID)
|
||||
}
|
||||
|
||||
switch sessSnap.Kind {
|
||||
case engine.SessionCatchUp:
|
||||
rm.runCatchUp(ctx, replicaID, rebuildAddr)
|
||||
case engine.SessionRebuild:
|
||||
rm.runRebuild(ctx, replicaID, rebuildAddr)
|
||||
default:
|
||||
glog.V(1).Infof("recovery: unknown session kind %s for %s", sessSnap.Kind, replicaID)
|
||||
}
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) runCatchUp(ctx context.Context, replicaID, rebuildAddr string) {
|
||||
bs := rm.bs
|
||||
volPath := rm.volumePathForReplica(replicaID)
|
||||
if volPath == "" {
|
||||
glog.Warningf("recovery: cannot determine volume path for %s", replicaID)
|
||||
return
|
||||
}
|
||||
|
||||
var sa engine.StorageAdapter
|
||||
var replicaFlushedLSN uint64
|
||||
var executor *v2bridge.Executor
|
||||
|
||||
if err := bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error {
|
||||
reader := v2bridge.NewReader(vol)
|
||||
pinner := v2bridge.NewPinner(vol)
|
||||
sa = bridge.NewStorageAdapter(
|
||||
&readerShimForRecovery{reader},
|
||||
&pinnerShimForRecovery{pinner},
|
||||
)
|
||||
if s := bs.v2Orchestrator.Registry.Sender(replicaID); s != nil {
|
||||
if snap := s.SessionSnapshot(); snap != nil {
|
||||
replicaFlushedLSN = snap.StartLSN
|
||||
}
|
||||
}
|
||||
executor = v2bridge.NewExecutor(vol, rebuildAddr)
|
||||
return nil
|
||||
}); err != nil {
|
||||
glog.Warningf("recovery: cannot access volume %s: %v", volPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
driver := &engine.RecoveryDriver{Orchestrator: bs.v2Orchestrator, Storage: sa}
|
||||
|
||||
plan, err := driver.PlanRecovery(replicaID, replicaFlushedLSN)
|
||||
if err != nil {
|
||||
glog.Warningf("recovery: plan failed for %s: %v", replicaID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
driver.CancelPlan(plan, "context_cancelled")
|
||||
return
|
||||
}
|
||||
|
||||
exec := engine.NewCatchUpExecutor(driver, plan)
|
||||
exec.IO = executor
|
||||
|
||||
if execErr := exec.Execute(nil, 0); execErr != nil {
|
||||
if ctx.Err() != nil {
|
||||
glog.V(1).Infof("recovery: catch-up cancelled for %s: %v", replicaID, execErr)
|
||||
} else {
|
||||
glog.Warningf("recovery: catch-up execution failed for %s: %v", replicaID, execErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(0).Infof("recovery: catch-up completed for %s", replicaID)
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) runRebuild(ctx context.Context, replicaID, rebuildAddr string) {
|
||||
bs := rm.bs
|
||||
volPath := rm.volumePathForReplica(replicaID)
|
||||
if volPath == "" {
|
||||
glog.Warningf("recovery: cannot determine volume path for %s", replicaID)
|
||||
return
|
||||
}
|
||||
|
||||
var sa engine.StorageAdapter
|
||||
var executor *v2bridge.Executor
|
||||
|
||||
if err := bs.blockStore.WithVolume(volPath, func(vol *blockvol.BlockVol) error {
|
||||
reader := v2bridge.NewReader(vol)
|
||||
pinner := v2bridge.NewPinner(vol)
|
||||
sa = bridge.NewStorageAdapter(
|
||||
&readerShimForRecovery{reader},
|
||||
&pinnerShimForRecovery{pinner},
|
||||
)
|
||||
executor = v2bridge.NewExecutor(vol, rebuildAddr)
|
||||
return nil
|
||||
}); err != nil {
|
||||
glog.Warningf("recovery: cannot access volume %s: %v", volPath, err)
|
||||
return
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
driver := &engine.RecoveryDriver{Orchestrator: bs.v2Orchestrator, Storage: sa}
|
||||
|
||||
plan, err := driver.PlanRebuild(replicaID)
|
||||
if err != nil {
|
||||
glog.Warningf("recovery: rebuild plan failed for %s: %v", replicaID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
driver.CancelPlan(plan, "context_cancelled")
|
||||
return
|
||||
}
|
||||
|
||||
exec := engine.NewRebuildExecutor(driver, plan)
|
||||
exec.IO = executor
|
||||
|
||||
if execErr := exec.Execute(); execErr != nil {
|
||||
if ctx.Err() != nil {
|
||||
glog.V(1).Infof("recovery: rebuild cancelled for %s: %v", replicaID, execErr)
|
||||
} else {
|
||||
glog.Warningf("recovery: rebuild execution failed for %s: %v", replicaID, execErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(0).Infof("recovery: rebuild completed for %s", replicaID)
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) deriveRebuildAddr(replicaID string, assignments []blockvol.BlockVolumeAssignment) string {
|
||||
volPath := rm.volumePathForReplica(replicaID)
|
||||
for _, a := range assignments {
|
||||
if a.Path == volPath && a.RebuildAddr != "" {
|
||||
return a.RebuildAddr
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (rm *RecoveryManager) volumePathForReplica(replicaID string) string {
|
||||
for i := len(replicaID) - 1; i >= 0; i-- {
|
||||
if replicaID[i] == '/' {
|
||||
return replicaID[:i]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// --- Bridge shims ---
|
||||
|
||||
type readerShimForRecovery struct{ r *v2bridge.Reader }
|
||||
|
||||
func (s *readerShimForRecovery) ReadState() bridge.BlockVolState {
|
||||
rs := s.r.ReadState()
|
||||
return bridge.BlockVolState{
|
||||
WALHeadLSN: rs.WALHeadLSN,
|
||||
WALTailLSN: rs.WALTailLSN,
|
||||
CommittedLSN: rs.CommittedLSN,
|
||||
CheckpointLSN: rs.CheckpointLSN,
|
||||
CheckpointTrusted: rs.CheckpointTrusted,
|
||||
}
|
||||
}
|
||||
|
||||
type pinnerShimForRecovery struct{ p *v2bridge.Pinner }
|
||||
|
||||
func (s *pinnerShimForRecovery) HoldWALRetention(startLSN uint64) (func(), error) {
|
||||
return s.p.HoldWALRetention(startLSN)
|
||||
}
|
||||
func (s *pinnerShimForRecovery) HoldSnapshot(checkpointLSN uint64) (func(), error) {
|
||||
return s.p.HoldSnapshot(checkpointLSN)
|
||||
}
|
||||
func (s *pinnerShimForRecovery) HoldFullBase(committedLSN uint64) (func(), error) {
|
||||
return s.p.HoldFullBase(committedLSN)
|
||||
}
|
||||
@@ -1,10 +1,12 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
)
|
||||
|
||||
@@ -18,18 +20,139 @@ type pendingRebuild struct {
|
||||
ReplicaCtrlAddr string // CP13-8: saved from before death for catch-up-first recovery
|
||||
}
|
||||
|
||||
// deferredPromotion tracks a deferred promotion timer with its volume context.
|
||||
type deferredPromotion struct {
|
||||
Timer *time.Timer
|
||||
VolumeName string
|
||||
CurrentPrimary string // current (stale) primary that will be replaced
|
||||
AffectedServer string // dead server addr
|
||||
}
|
||||
|
||||
// blockFailoverState holds failover and rebuild state on the master.
|
||||
type blockFailoverState struct {
|
||||
mu sync.Mutex
|
||||
pendingRebuilds map[string][]pendingRebuild // dead server addr -> pending rebuilds
|
||||
// R2-F2: Track deferred promotion timers so they can be cancelled on reconnect.
|
||||
deferredTimers map[string][]*time.Timer // dead server addr -> pending timers
|
||||
pendingRebuilds map[string][]pendingRebuild // dead server addr -> pending rebuilds
|
||||
deferredTimers map[string][]deferredPromotion // dead server addr -> pending deferred promotions
|
||||
}
|
||||
|
||||
// FailoverVolumeState is one volume's failover diagnosis entry.
|
||||
type FailoverVolumeState struct {
|
||||
VolumeName string
|
||||
CurrentPrimary string
|
||||
AffectedServer string // dead server that triggered the failover/rebuild
|
||||
DeferredPromotion bool // true if a deferred promotion timer is pending
|
||||
PendingRebuild bool // true if a rebuild is pending for this volume
|
||||
Reason string // "lease_wait", "rebuild_pending", or ""
|
||||
}
|
||||
|
||||
// FailoverDiagnostic is a bounded read-only snapshot of failover state
|
||||
// for operator-visible diagnosis. P3 diagnosability surface.
|
||||
//
|
||||
// Volume-oriented: each entry describes one volume's failover state.
|
||||
// Aggregate counts are derived from the volume list.
|
||||
type FailoverDiagnostic struct {
|
||||
Volumes []FailoverVolumeState
|
||||
PendingRebuildCount map[string]int // dead server → count of pending rebuilds
|
||||
DeferredPromotionCount map[string]int // dead server → count of deferred promotion timers
|
||||
}
|
||||
|
||||
func (fs *blockFailoverState) DiagnosticSnapshot() FailoverDiagnostic {
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
diag := FailoverDiagnostic{
|
||||
PendingRebuildCount: make(map[string]int),
|
||||
DeferredPromotionCount: make(map[string]int),
|
||||
}
|
||||
for server, rebuilds := range fs.pendingRebuilds {
|
||||
diag.PendingRebuildCount[server] = len(rebuilds)
|
||||
for _, rb := range rebuilds {
|
||||
diag.Volumes = append(diag.Volumes, FailoverVolumeState{
|
||||
VolumeName: rb.VolumeName,
|
||||
CurrentPrimary: rb.NewPrimary,
|
||||
AffectedServer: server,
|
||||
PendingRebuild: true,
|
||||
Reason: "rebuild_pending",
|
||||
})
|
||||
}
|
||||
}
|
||||
for server, promos := range fs.deferredTimers {
|
||||
diag.DeferredPromotionCount[server] = len(promos)
|
||||
for _, dp := range promos {
|
||||
diag.Volumes = append(diag.Volumes, FailoverVolumeState{
|
||||
VolumeName: dp.VolumeName,
|
||||
CurrentPrimary: dp.CurrentPrimary,
|
||||
AffectedServer: dp.AffectedServer,
|
||||
DeferredPromotion: true,
|
||||
Reason: "lease_wait",
|
||||
})
|
||||
}
|
||||
}
|
||||
return diag
|
||||
}
|
||||
|
||||
// PublicationDiagnostic is a bounded read-only snapshot comparing the
|
||||
// operator-visible publication (LookupBlockVolume response) against the
|
||||
// registry authority for one volume. P3 diagnosability surface for S2.
|
||||
type PublicationDiagnostic struct {
|
||||
VolumeName string
|
||||
LookupVolumeServer string // what LookupBlockVolume returns
|
||||
LookupIscsiAddr string
|
||||
AuthorityVolumeServer string // registry entry (source of truth)
|
||||
AuthorityIscsiAddr string
|
||||
Coherent bool // true if lookup == authority
|
||||
Reason string // "" if coherent, otherwise why they diverge
|
||||
}
|
||||
|
||||
// PublicationDiagnosticFor returns a PublicationDiagnostic for the named volume.
|
||||
// It performs two independent reads:
|
||||
// - Lookup side: calls LookupBlockVolume (the actual gRPC method)
|
||||
// - Authority side: reads the registry directly
|
||||
//
|
||||
// Then compares the two. If they diverge, Coherent=false with a Reason.
|
||||
func (ms *MasterServer) PublicationDiagnosticFor(volumeName string) (PublicationDiagnostic, bool) {
|
||||
if ms.blockRegistry == nil {
|
||||
return PublicationDiagnostic{}, false
|
||||
}
|
||||
|
||||
// Read 1: the operator-visible publication surface.
|
||||
lookupResp, err := ms.LookupBlockVolume(context.Background(), &master_pb.LookupBlockVolumeRequest{Name: volumeName})
|
||||
if err != nil {
|
||||
return PublicationDiagnostic{}, false
|
||||
}
|
||||
|
||||
// Read 2: the registry authority (separate read).
|
||||
entry, ok := ms.blockRegistry.Lookup(volumeName)
|
||||
if !ok {
|
||||
return PublicationDiagnostic{}, false
|
||||
}
|
||||
|
||||
diag := PublicationDiagnostic{
|
||||
VolumeName: volumeName,
|
||||
LookupVolumeServer: lookupResp.VolumeServer,
|
||||
LookupIscsiAddr: lookupResp.IscsiAddr,
|
||||
AuthorityVolumeServer: entry.VolumeServer,
|
||||
AuthorityIscsiAddr: entry.ISCSIAddr,
|
||||
}
|
||||
|
||||
// Compare the two reads.
|
||||
vsMatch := diag.LookupVolumeServer == diag.AuthorityVolumeServer
|
||||
iscsiMatch := diag.LookupIscsiAddr == diag.AuthorityIscsiAddr
|
||||
diag.Coherent = vsMatch && iscsiMatch
|
||||
if !diag.Coherent {
|
||||
if !vsMatch {
|
||||
diag.Reason = "volume_server_mismatch"
|
||||
} else {
|
||||
diag.Reason = "iscsi_addr_mismatch"
|
||||
}
|
||||
}
|
||||
|
||||
return diag, true
|
||||
}
|
||||
|
||||
func newBlockFailoverState() *blockFailoverState {
|
||||
return &blockFailoverState{
|
||||
pendingRebuilds: make(map[string][]pendingRebuild),
|
||||
deferredTimers: make(map[string][]*time.Timer),
|
||||
deferredTimers: make(map[string][]deferredPromotion),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,7 +183,11 @@ func (ms *MasterServer) failoverBlockVolumes(deadServer string) {
|
||||
glog.V(0).Infof("failover: %q lease expires in %v, deferring promotion", entry.Name, delay)
|
||||
volumeName := entry.Name
|
||||
capturedEpoch := entry.Epoch // T3: capture epoch for stale-timer validation
|
||||
capturedDeadServer := deadServer // capture for closure
|
||||
timer := time.AfterFunc(delay, func() {
|
||||
// Clean up the deferred entry regardless of outcome.
|
||||
ms.removeFiredDeferredPromotion(capturedDeadServer, volumeName)
|
||||
|
||||
// T3: Re-validate before acting — prevent stale timer on recreated/changed volume.
|
||||
current, ok := ms.blockRegistry.Lookup(volumeName)
|
||||
if !ok {
|
||||
@@ -76,7 +203,12 @@ func (ms *MasterServer) failoverBlockVolumes(deadServer string) {
|
||||
})
|
||||
ms.blockFailover.mu.Lock()
|
||||
ms.blockFailover.deferredTimers[deadServer] = append(
|
||||
ms.blockFailover.deferredTimers[deadServer], timer)
|
||||
ms.blockFailover.deferredTimers[deadServer], deferredPromotion{
|
||||
Timer: timer,
|
||||
VolumeName: volumeName,
|
||||
CurrentPrimary: entry.VolumeServer,
|
||||
AffectedServer: deadServer,
|
||||
})
|
||||
ms.blockFailover.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
@@ -159,12 +291,14 @@ func (ms *MasterServer) finalizePromotion(volumeName, oldPrimary, oldPath string
|
||||
assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{
|
||||
DataAddr: ri.DataAddr,
|
||||
CtrlAddr: ri.CtrlAddr,
|
||||
ServerID: ri.Server, // V2: stable identity
|
||||
})
|
||||
}
|
||||
// Backward compat: also set scalar fields if exactly 1 replica.
|
||||
if len(entry.Replicas) == 1 {
|
||||
assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr
|
||||
assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr
|
||||
assignment.ReplicaServerID = entry.Replicas[0].Server // V2: stable identity
|
||||
}
|
||||
ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, assignment)
|
||||
|
||||
@@ -210,14 +344,36 @@ func (ms *MasterServer) cancelDeferredTimers(server string) {
|
||||
return
|
||||
}
|
||||
ms.blockFailover.mu.Lock()
|
||||
timers := ms.blockFailover.deferredTimers[server]
|
||||
promos := ms.blockFailover.deferredTimers[server]
|
||||
delete(ms.blockFailover.deferredTimers, server)
|
||||
ms.blockFailover.mu.Unlock()
|
||||
for _, t := range timers {
|
||||
t.Stop()
|
||||
for _, dp := range promos {
|
||||
dp.Timer.Stop()
|
||||
}
|
||||
if len(timers) > 0 {
|
||||
glog.V(0).Infof("failover: cancelled %d deferred promotion timers for reconnected %s", len(timers), server)
|
||||
if len(promos) > 0 {
|
||||
glog.V(0).Infof("failover: cancelled %d deferred promotion timers for reconnected %s", len(promos), server)
|
||||
}
|
||||
}
|
||||
|
||||
// removeFiredDeferredPromotion removes a single deferred promotion entry after
|
||||
// its timer has fired (whether it promoted or was skipped). This keeps
|
||||
// FailoverDiagnostic accurate: once the timer fires, the volume is no longer
|
||||
// in lease-wait state.
|
||||
func (ms *MasterServer) removeFiredDeferredPromotion(server, volumeName string) {
|
||||
if ms.blockFailover == nil {
|
||||
return
|
||||
}
|
||||
ms.blockFailover.mu.Lock()
|
||||
defer ms.blockFailover.mu.Unlock()
|
||||
promos := ms.blockFailover.deferredTimers[server]
|
||||
for i, dp := range promos {
|
||||
if dp.VolumeName == volumeName {
|
||||
ms.blockFailover.deferredTimers[server] = append(promos[:i], promos[i+1:]...)
|
||||
if len(ms.blockFailover.deferredTimers[server]) == 0 {
|
||||
delete(ms.blockFailover.deferredTimers, server)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,16 +442,18 @@ func (ms *MasterServer) recoverBlockVolumes(reconnectedServer string) {
|
||||
Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
||||
LeaseTtlMs: leaseTTLMs,
|
||||
}
|
||||
// Include all replica addresses.
|
||||
// Include all replica addresses with stable identity.
|
||||
for _, ri := range entry.Replicas {
|
||||
primaryAssignment.ReplicaAddrs = append(primaryAssignment.ReplicaAddrs, blockvol.ReplicaAddr{
|
||||
DataAddr: ri.DataAddr,
|
||||
CtrlAddr: ri.CtrlAddr,
|
||||
ServerID: ri.Server, // V2: stable identity
|
||||
})
|
||||
}
|
||||
if len(entry.Replicas) == 1 {
|
||||
primaryAssignment.ReplicaDataAddr = entry.Replicas[0].DataAddr
|
||||
primaryAssignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr
|
||||
primaryAssignment.ReplicaServerID = entry.Replicas[0].Server // V2
|
||||
}
|
||||
ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, primaryAssignment)
|
||||
|
||||
@@ -346,11 +504,13 @@ func (ms *MasterServer) refreshPrimaryForAddrChange(ac ReplicaAddrChange) {
|
||||
assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{
|
||||
DataAddr: ri.DataAddr,
|
||||
CtrlAddr: ri.CtrlAddr,
|
||||
ServerID: ri.Server, // V2: stable identity
|
||||
})
|
||||
}
|
||||
if len(entry.Replicas) == 1 {
|
||||
assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr
|
||||
assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr
|
||||
assignment.ReplicaServerID = entry.Replicas[0].Server // V2
|
||||
}
|
||||
// Use current registry primary (not stale ac.PrimaryServer) in case
|
||||
// failover happened between address-change detection and this refresh.
|
||||
@@ -384,6 +544,9 @@ func (ms *MasterServer) reevaluateOrphanedPrimaries(server string) {
|
||||
capturedEpoch := entry.Epoch
|
||||
deadPrimary := entry.VolumeServer
|
||||
timer := time.AfterFunc(delay, func() {
|
||||
// Clean up the deferred entry regardless of outcome.
|
||||
ms.removeFiredDeferredPromotion(deadPrimary, volumeName)
|
||||
|
||||
current, ok := ms.blockRegistry.Lookup(volumeName)
|
||||
if !ok {
|
||||
return
|
||||
@@ -397,7 +560,12 @@ func (ms *MasterServer) reevaluateOrphanedPrimaries(server string) {
|
||||
})
|
||||
ms.blockFailover.mu.Lock()
|
||||
ms.blockFailover.deferredTimers[deadPrimary] = append(
|
||||
ms.blockFailover.deferredTimers[deadPrimary], timer)
|
||||
ms.blockFailover.deferredTimers[deadPrimary], deferredPromotion{
|
||||
Timer: timer,
|
||||
VolumeName: volumeName,
|
||||
CurrentPrimary: deadPrimary,
|
||||
AffectedServer: deadPrimary,
|
||||
})
|
||||
ms.blockFailover.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
270
weed/server/qa_block_diagnosability_test.go
Normal file
270
weed/server/qa_block_diagnosability_test.go
Normal file
@@ -0,0 +1,270 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
)
|
||||
|
||||
// ============================================================
|
||||
// Phase 12 P3: Diagnosability / Blocker Accounting
|
||||
//
|
||||
// All diagnosis conclusions use ONLY explicit bounded read-only
|
||||
// diagnosis surfaces:
|
||||
// - LookupBlockVolume (product-visible publication)
|
||||
// - FailoverDiagnostic (volume-oriented failover state)
|
||||
// - PublicationDiagnostic (lookup vs authority coherence)
|
||||
// - RecoveryDiagnostic (active recovery task set)
|
||||
// - phase-12-p3-blockers.md (finite blocker ledger)
|
||||
//
|
||||
// NOT performance, NOT rollout readiness.
|
||||
// ============================================================
|
||||
|
||||
// --- S1: Failover convergence diagnosable via FailoverDiagnostic ---
|
||||
|
||||
func TestP12P3_FailoverConvergence_Diagnosable(t *testing.T) {
|
||||
s := newSoakSetup(t)
|
||||
ctx := context.Background()
|
||||
|
||||
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
|
||||
Name: "diag-vol-1", SizeBytes: 1 << 20,
|
||||
})
|
||||
|
||||
entry, _ := s.ms.blockRegistry.Lookup("diag-vol-1")
|
||||
s.bs.localServerID = entry.VolumeServer
|
||||
s.deliver(entry.VolumeServer)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Surface 1: LookupBlockVolume shows current primary before failover.
|
||||
lookupBefore, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-1"})
|
||||
if lookupBefore.VolumeServer != entry.VolumeServer {
|
||||
t.Fatal("lookup should show original primary before failover")
|
||||
}
|
||||
|
||||
// Surface 2: FailoverDiagnostic — no volumes in failover state yet.
|
||||
failoverBefore := s.ms.blockFailover.DiagnosticSnapshot()
|
||||
for _, v := range failoverBefore.Volumes {
|
||||
if v.VolumeName == "diag-vol-1" {
|
||||
t.Fatalf("S1: diag-vol-1 should not appear in failover diagnostic before failover, got %+v", v)
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger failover: expire lease, then failover.
|
||||
s.ms.blockRegistry.UpdateEntry("diag-vol-1", func(e *BlockVolumeEntry) {
|
||||
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
|
||||
})
|
||||
s.ms.failoverBlockVolumes(entry.VolumeServer)
|
||||
|
||||
// Surface 1 after: LookupBlockVolume shows NEW primary.
|
||||
lookupAfter, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-1"})
|
||||
if lookupAfter.VolumeServer == entry.VolumeServer {
|
||||
t.Fatal("S1 stall: lookup still shows old primary after failover")
|
||||
}
|
||||
|
||||
// Surface 2 after: FailoverDiagnostic shows volume-level failover state.
|
||||
failoverAfter := s.ms.blockFailover.DiagnosticSnapshot()
|
||||
|
||||
// Classify via explicit diagnosis surface: find diag-vol-1 in failover volumes.
|
||||
var found *FailoverVolumeState
|
||||
for i := range failoverAfter.Volumes {
|
||||
if failoverAfter.Volumes[i].VolumeName == "diag-vol-1" {
|
||||
found = &failoverAfter.Volumes[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if found == nil {
|
||||
t.Fatal("S1: diag-vol-1 not found in FailoverDiagnostic after failover")
|
||||
}
|
||||
|
||||
// Diagnosis conclusion from explicit surfaces only:
|
||||
// - Lookup changed (old → new primary)
|
||||
// - FailoverDiagnostic classifies state as rebuild_pending
|
||||
// - AffectedServer identifies the dead server
|
||||
if !found.PendingRebuild {
|
||||
t.Fatal("S1: FailoverDiagnostic should show PendingRebuild=true")
|
||||
}
|
||||
if found.Reason != "rebuild_pending" {
|
||||
t.Fatalf("S1: expected reason=rebuild_pending, got %q", found.Reason)
|
||||
}
|
||||
if found.AffectedServer != entry.VolumeServer {
|
||||
t.Fatalf("S1: AffectedServer should be dead server %s, got %s", entry.VolumeServer, found.AffectedServer)
|
||||
}
|
||||
if found.CurrentPrimary != lookupAfter.VolumeServer {
|
||||
t.Fatalf("S1: CurrentPrimary should match lookup %s, got %s", lookupAfter.VolumeServer, found.CurrentPrimary)
|
||||
}
|
||||
|
||||
t.Logf("P12P3 S1: diagnosed via LookupBlockVolume(%s→%s) + FailoverDiagnostic(vol=%s, reason=%s, affected=%s)",
|
||||
lookupBefore.VolumeServer, lookupAfter.VolumeServer, found.VolumeName, found.Reason, found.AffectedServer)
|
||||
}
|
||||
|
||||
// --- S2: Publication mismatch diagnosable via PublicationDiagnostic ---
|
||||
|
||||
func TestP12P3_PublicationMismatch_Diagnosable(t *testing.T) {
|
||||
s := newSoakSetup(t)
|
||||
ctx := context.Background()
|
||||
|
||||
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
|
||||
Name: "diag-vol-2", SizeBytes: 1 << 20,
|
||||
})
|
||||
|
||||
entry, _ := s.ms.blockRegistry.Lookup("diag-vol-2")
|
||||
s.bs.localServerID = entry.VolumeServer
|
||||
s.deliver(entry.VolumeServer)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Surface 1: PublicationDiagnostic before failover — should be coherent.
|
||||
pubBefore, ok := s.ms.PublicationDiagnosticFor("diag-vol-2")
|
||||
if !ok {
|
||||
t.Fatal("S2: PublicationDiagnosticFor should find diag-vol-2")
|
||||
}
|
||||
if !pubBefore.Coherent {
|
||||
t.Fatalf("S2: publication should be coherent before failover, got reason=%q", pubBefore.Reason)
|
||||
}
|
||||
|
||||
// Surface 2: LookupBlockVolume — repeated lookups self-consistent.
|
||||
lookup1, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-2"})
|
||||
lookup2, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "diag-vol-2"})
|
||||
if lookup1.IscsiAddr != lookup2.IscsiAddr || lookup1.VolumeServer != lookup2.VolumeServer {
|
||||
t.Fatalf("S2: repeated lookup mismatch: %s/%s vs %s/%s",
|
||||
lookup1.VolumeServer, lookup1.IscsiAddr, lookup2.VolumeServer, lookup2.IscsiAddr)
|
||||
}
|
||||
|
||||
// Trigger failover.
|
||||
s.ms.blockRegistry.UpdateEntry("diag-vol-2", func(e *BlockVolumeEntry) {
|
||||
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
|
||||
})
|
||||
s.ms.failoverBlockVolumes(entry.VolumeServer)
|
||||
|
||||
// Surface 1 after: PublicationDiagnostic after failover — still coherent.
|
||||
pubAfter, ok := s.ms.PublicationDiagnosticFor("diag-vol-2")
|
||||
if !ok {
|
||||
t.Fatal("S2: PublicationDiagnosticFor should find diag-vol-2 after failover")
|
||||
}
|
||||
if !pubAfter.Coherent {
|
||||
t.Fatalf("S2: publication should be coherent after failover, got reason=%q", pubAfter.Reason)
|
||||
}
|
||||
|
||||
// Diagnosis conclusion from explicit surfaces only:
|
||||
// - Pre-failover: coherent, lookup matches authority
|
||||
// - Post-failover: coherent, lookup updated to new primary
|
||||
// - Publication switched: post != pre
|
||||
if pubAfter.LookupVolumeServer == pubBefore.LookupVolumeServer {
|
||||
t.Fatal("S2: LookupVolumeServer unchanged after failover — publication did not switch")
|
||||
}
|
||||
if pubAfter.LookupIscsiAddr == pubBefore.LookupIscsiAddr {
|
||||
t.Fatal("S2: LookupIscsiAddr unchanged after failover")
|
||||
}
|
||||
|
||||
// Post-failover repeated lookup still self-consistent (via diagnostic).
|
||||
pubAfter2, _ := s.ms.PublicationDiagnosticFor("diag-vol-2")
|
||||
if pubAfter2.LookupVolumeServer != pubAfter.LookupVolumeServer ||
|
||||
pubAfter2.LookupIscsiAddr != pubAfter.LookupIscsiAddr {
|
||||
t.Fatal("S2: post-failover publication diagnostics inconsistent")
|
||||
}
|
||||
|
||||
t.Logf("P12P3 S2: diagnosed via PublicationDiagnostic — pre(vs=%s, iscsi=%s, coherent=%v) → post(vs=%s, iscsi=%s, coherent=%v)",
|
||||
pubBefore.LookupVolumeServer, pubBefore.LookupIscsiAddr, pubBefore.Coherent,
|
||||
pubAfter.LookupVolumeServer, pubAfter.LookupIscsiAddr, pubAfter.Coherent)
|
||||
}
|
||||
|
||||
// --- S3: Runtime residue diagnosable via RecoveryDiagnostic ---
|
||||
|
||||
func TestP12P3_RuntimeResidue_Diagnosable(t *testing.T) {
|
||||
s := newSoakSetup(t)
|
||||
ctx := context.Background()
|
||||
|
||||
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
|
||||
Name: "diag-vol-3", SizeBytes: 1 << 20,
|
||||
})
|
||||
|
||||
entry, _ := s.ms.blockRegistry.Lookup("diag-vol-3")
|
||||
s.bs.localServerID = entry.VolumeServer
|
||||
s.deliver(entry.VolumeServer)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Surface: RecoveryDiagnostic shows active task set.
|
||||
diagBefore := s.bs.v2Recovery.DiagnosticSnapshot()
|
||||
t.Logf("S3 before delete: %d active tasks: %v", len(diagBefore.ActiveTasks), diagBefore.ActiveTasks)
|
||||
|
||||
// Delete the volume.
|
||||
s.ms.DeleteBlockVolume(ctx, &master_pb.DeleteBlockVolumeRequest{Name: "diag-vol-3"})
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Surface after: RecoveryDiagnostic — no tasks for deleted volume.
|
||||
diagAfter := s.bs.v2Recovery.DiagnosticSnapshot()
|
||||
for _, task := range diagAfter.ActiveTasks {
|
||||
if strings.Contains(task, "diag-vol-3") {
|
||||
t.Fatalf("S3 residue: task %s active after delete", task)
|
||||
}
|
||||
}
|
||||
|
||||
// Diagnosis conclusion from explicit surface only:
|
||||
// - RecoveryDiagnostic.ActiveTasks does not contain deleted volume's tasks
|
||||
// - Conclusion: clean (no residue) or non-empty but unrelated to deleted volume
|
||||
if len(diagAfter.ActiveTasks) == 0 {
|
||||
t.Log("P12P3 S3: diagnosed via RecoveryDiagnostic — clean (0 active tasks after delete)")
|
||||
} else {
|
||||
t.Logf("P12P3 S3: diagnosed via RecoveryDiagnostic — %d active tasks (none for deleted vol)",
|
||||
len(diagAfter.ActiveTasks))
|
||||
}
|
||||
}
|
||||
|
||||
// --- Blocker ledger: reads and validates the actual file ---
|
||||
|
||||
func TestP12P3_BlockerLedger_Bounded(t *testing.T) {
|
||||
ledgerPath := "../../sw-block/.private/phase/phase-12-p3-blockers.md"
|
||||
|
||||
data, err := os.ReadFile(ledgerPath)
|
||||
if err != nil {
|
||||
t.Fatalf("blocker ledger must exist at %s: %v", ledgerPath, err)
|
||||
}
|
||||
|
||||
content := string(data)
|
||||
|
||||
// Must contain diagnosed items.
|
||||
for _, id := range []string{"B1", "B2", "B3"} {
|
||||
if !strings.Contains(content, id) {
|
||||
t.Fatalf("ledger missing diagnosed item %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Must contain unresolved items.
|
||||
for _, id := range []string{"U1", "U2", "U3"} {
|
||||
if !strings.Contains(content, id) {
|
||||
t.Fatalf("ledger missing unresolved item %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Must contain out-of-scope section.
|
||||
if !strings.Contains(content, "Out of Scope") {
|
||||
t.Fatal("ledger must have 'Out of Scope' section")
|
||||
}
|
||||
|
||||
// Must NOT overclaim perf or rollout.
|
||||
lines := strings.Split(content, "\n")
|
||||
diagnosedCount := 0
|
||||
unresolvedCount := 0
|
||||
for _, line := range lines {
|
||||
if strings.HasPrefix(strings.TrimSpace(line), "| B") {
|
||||
diagnosedCount++
|
||||
}
|
||||
if strings.HasPrefix(strings.TrimSpace(line), "| U") {
|
||||
unresolvedCount++
|
||||
}
|
||||
}
|
||||
|
||||
total := diagnosedCount + unresolvedCount
|
||||
if total == 0 {
|
||||
t.Fatal("ledger has no blocker items")
|
||||
}
|
||||
if total > 20 {
|
||||
t.Fatalf("ledger should be finite, got %d items", total)
|
||||
}
|
||||
|
||||
t.Logf("P12P3 blockers: %d diagnosed + %d unresolved = %d total (from actual file, finite)",
|
||||
diagnosedCount, unresolvedCount, total)
|
||||
}
|
||||
582
weed/server/qa_block_perf_test.go
Normal file
582
weed/server/qa_block_perf_test.go
Normal file
@@ -0,0 +1,582 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"math"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
)
|
||||
|
||||
// ============================================================
|
||||
// Phase 12 P4: Performance Floor — Bounded Measurement Package
|
||||
//
|
||||
// Workload envelope:
|
||||
// Topology: RF=2 sync_all accepted chosen path
|
||||
// Operations: 4K random write, 4K random read, 4K sequential write, 4K sequential read
|
||||
// Runtime: no failover, no disturbance, steady-state
|
||||
// Environment: unit test harness (single-process, local disk, engine-local I/O)
|
||||
//
|
||||
// What this measures:
|
||||
// Engine I/O floor for the accepted chosen path. WriteLBA/ReadLBA through
|
||||
// the full fencing path (epoch, role, lease, writeGate, WAL, dirtyMap).
|
||||
// No transport layer (iSCSI/NVMe). No cross-machine replication.
|
||||
//
|
||||
// What this does NOT measure:
|
||||
// Transport throughput, cross-machine replication tax, multi-client concurrency,
|
||||
// failover-under-load, degraded mode. Production floor with replication is
|
||||
// documented in baseline-roce-20260401.md.
|
||||
//
|
||||
// NOT performance tuning. NOT broad benchmark.
|
||||
// ============================================================
|
||||
|
||||
const (
|
||||
perfBlockSize = 4096
|
||||
perfVolumeSize = 64 * 1024 * 1024 // 64MB
|
||||
perfWALSize = 16 * 1024 * 1024 // 16MB
|
||||
perfOps = 1000 // ops per measurement run
|
||||
perfWarmupOps = 200 // warmup ops (discarded from measurement)
|
||||
perfIterations = 3 // run N times, report worst as floor
|
||||
)
|
||||
|
||||
// Minimum acceptable floor thresholds (engine-local, single-writer).
|
||||
//
|
||||
// These are regression gates, not performance targets. Set conservatively
|
||||
// so any reasonable hardware passes, but catastrophic regressions
|
||||
// (accidental serialization, O(n^2) scan, broken WAL path) are caught.
|
||||
//
|
||||
// Rationale for values:
|
||||
// Measured on dev SSD: rand-write ~10K, rand-read ~80K, seq-write ~30K, seq-read ~180K.
|
||||
// Thresholds set at ~10% of measured to tolerate slow CI machines and VMs.
|
||||
// Write P99 ceiling at 100ms catches deadlocks/stalls without false-positiving
|
||||
// on slow storage.
|
||||
var perfFloorGates = map[string]struct {
|
||||
MinIOPS float64
|
||||
MaxWriteP99 time.Duration // 0 = no ceiling (reads)
|
||||
}{
|
||||
"rand-write": {MinIOPS: 1000, MaxWriteP99: 100 * time.Millisecond},
|
||||
"rand-read": {MinIOPS: 5000},
|
||||
"seq-write": {MinIOPS: 2000, MaxWriteP99: 100 * time.Millisecond},
|
||||
"seq-read": {MinIOPS: 10000},
|
||||
}
|
||||
|
||||
// perfResult holds measurements for one workload run.
|
||||
type perfResult struct {
|
||||
Workload string
|
||||
Ops int
|
||||
Elapsed time.Duration
|
||||
IOPS float64
|
||||
MBps float64
|
||||
LatSamples []int64 // per-op latency in nanoseconds
|
||||
}
|
||||
|
||||
func (r *perfResult) latPct(pct float64) time.Duration {
|
||||
if len(r.LatSamples) == 0 {
|
||||
return 0
|
||||
}
|
||||
sorted := make([]int64, len(r.LatSamples))
|
||||
copy(sorted, r.LatSamples)
|
||||
sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
|
||||
idx := int(math.Ceil(pct/100.0*float64(len(sorted)))) - 1
|
||||
if idx < 0 {
|
||||
idx = 0
|
||||
}
|
||||
if idx >= len(sorted) {
|
||||
idx = len(sorted) - 1
|
||||
}
|
||||
return time.Duration(sorted[idx])
|
||||
}
|
||||
|
||||
func (r *perfResult) latAvg() time.Duration {
|
||||
if len(r.LatSamples) == 0 {
|
||||
return 0
|
||||
}
|
||||
var sum int64
|
||||
for _, s := range r.LatSamples {
|
||||
sum += s
|
||||
}
|
||||
return time.Duration(sum / int64(len(r.LatSamples)))
|
||||
}
|
||||
|
||||
// setupPerfVolume creates a BlockVol configured as Primary for perf measurement.
|
||||
func setupPerfVolume(t *testing.T) *blockvol.BlockVol {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
volPath := filepath.Join(dir, "perf.blk")
|
||||
vol, err := blockvol.CreateBlockVol(volPath, blockvol.CreateOptions{
|
||||
VolumeSize: perfVolumeSize,
|
||||
BlockSize: perfBlockSize,
|
||||
WALSize: perfWALSize,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Set up as Primary with long lease so writes are allowed.
|
||||
if err := vol.HandleAssignment(1, blockvol.RolePrimary, 10*time.Minute); err != nil {
|
||||
vol.Close()
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { vol.Close() })
|
||||
return vol
|
||||
}
|
||||
|
||||
// maxLBAs returns the number of addressable 4K blocks in the extent area.
|
||||
func maxLBAs() uint64 {
|
||||
// Volume size minus WAL, divided by block size, with safety margin.
|
||||
return (perfVolumeSize - perfWALSize) / perfBlockSize / 2
|
||||
}
|
||||
|
||||
// runPerfWorkload executes one workload measurement and returns the result.
|
||||
func runPerfWorkload(t *testing.T, vol *blockvol.BlockVol, workload string, ops int) perfResult {
|
||||
t.Helper()
|
||||
data := make([]byte, perfBlockSize)
|
||||
rand.Read(data)
|
||||
max := maxLBAs()
|
||||
|
||||
samples := make([]int64, 0, ops)
|
||||
start := time.Now()
|
||||
|
||||
for i := 0; i < ops; i++ {
|
||||
var lba uint64
|
||||
switch {
|
||||
case strings.HasPrefix(workload, "rand"):
|
||||
lba = uint64(mrand.Int63n(int64(max)))
|
||||
default: // sequential
|
||||
lba = uint64(i) % max
|
||||
}
|
||||
|
||||
opStart := time.Now()
|
||||
switch {
|
||||
case strings.HasSuffix(workload, "write"):
|
||||
if err := vol.WriteLBA(lba, data); err != nil {
|
||||
t.Fatalf("%s op %d: WriteLBA(%d): %v", workload, i, lba, err)
|
||||
}
|
||||
case strings.HasSuffix(workload, "read"):
|
||||
if _, err := vol.ReadLBA(lba, perfBlockSize); err != nil {
|
||||
t.Fatalf("%s op %d: ReadLBA(%d): %v", workload, i, lba, err)
|
||||
}
|
||||
}
|
||||
samples = append(samples, time.Since(opStart).Nanoseconds())
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
iops := float64(ops) / elapsed.Seconds()
|
||||
mbps := iops * float64(perfBlockSize) / (1024 * 1024)
|
||||
|
||||
return perfResult{
|
||||
Workload: workload,
|
||||
Ops: ops,
|
||||
Elapsed: elapsed,
|
||||
IOPS: iops,
|
||||
MBps: mbps,
|
||||
LatSamples: samples,
|
||||
}
|
||||
}
|
||||
|
||||
// floorOf returns the worst (lowest) IOPS and worst (highest) P99 across iterations.
|
||||
type perfFloor struct {
|
||||
Workload string
|
||||
FloorIOPS float64
|
||||
FloorMBps float64
|
||||
WorstAvg time.Duration
|
||||
WorstP50 time.Duration
|
||||
WorstP99 time.Duration
|
||||
WorstMax time.Duration
|
||||
}
|
||||
|
||||
func computeFloor(results []perfResult) perfFloor {
|
||||
f := perfFloor{
|
||||
Workload: results[0].Workload,
|
||||
FloorIOPS: math.MaxFloat64,
|
||||
FloorMBps: math.MaxFloat64,
|
||||
}
|
||||
for _, r := range results {
|
||||
if r.IOPS < f.FloorIOPS {
|
||||
f.FloorIOPS = r.IOPS
|
||||
}
|
||||
if r.MBps < f.FloorMBps {
|
||||
f.FloorMBps = r.MBps
|
||||
}
|
||||
avg := r.latAvg()
|
||||
if avg > f.WorstAvg {
|
||||
f.WorstAvg = avg
|
||||
}
|
||||
p50 := r.latPct(50)
|
||||
if p50 > f.WorstP50 {
|
||||
f.WorstP50 = p50
|
||||
}
|
||||
p99 := r.latPct(99)
|
||||
if p99 > f.WorstP99 {
|
||||
f.WorstP99 = p99
|
||||
}
|
||||
pmax := r.latPct(100)
|
||||
if pmax > f.WorstMax {
|
||||
f.WorstMax = pmax
|
||||
}
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
// --- Test 1: PerformanceFloor_Bounded ---
|
||||
|
||||
func TestP12P4_PerformanceFloor_Bounded(t *testing.T) {
|
||||
vol := setupPerfVolume(t)
|
||||
|
||||
workloads := []string{"rand-write", "rand-read", "seq-write", "seq-read"}
|
||||
floors := make([]perfFloor, 0, len(workloads))
|
||||
|
||||
for _, wl := range workloads {
|
||||
// Warmup: populate volume with data (needed for reads).
|
||||
if strings.HasSuffix(wl, "read") {
|
||||
warmupData := make([]byte, perfBlockSize)
|
||||
rand.Read(warmupData)
|
||||
for i := 0; i < int(maxLBAs()); i++ {
|
||||
if err := vol.WriteLBA(uint64(i), warmupData); err != nil {
|
||||
break // WAL full is acceptable during warmup
|
||||
}
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond) // let flusher drain
|
||||
}
|
||||
|
||||
// Warmup ops (discarded).
|
||||
runPerfWorkload(t, vol, wl, perfWarmupOps)
|
||||
|
||||
// Measurement: N iterations, take floor.
|
||||
var results []perfResult
|
||||
for iter := 0; iter < perfIterations; iter++ {
|
||||
r := runPerfWorkload(t, vol, wl, perfOps)
|
||||
results = append(results, r)
|
||||
}
|
||||
|
||||
floor := computeFloor(results)
|
||||
floors = append(floors, floor)
|
||||
}
|
||||
|
||||
// Report structured floor table.
|
||||
t.Log("")
|
||||
t.Log("=== P12P4 Performance Floor (engine-local, single-writer) ===")
|
||||
t.Log("")
|
||||
t.Logf("%-12s %10s %8s %10s %10s %10s %10s",
|
||||
"Workload", "Floor IOPS", "MB/s", "Avg Lat", "P50 Lat", "P99 Lat", "Max Lat")
|
||||
t.Logf("%-12s %10s %8s %10s %10s %10s %10s",
|
||||
"--------", "----------", "------", "-------", "-------", "-------", "-------")
|
||||
for _, f := range floors {
|
||||
t.Logf("%-12s %10.0f %8.2f %10s %10s %10s %10s",
|
||||
f.Workload, f.FloorIOPS, f.FloorMBps, f.WorstAvg, f.WorstP50, f.WorstP99, f.WorstMax)
|
||||
}
|
||||
t.Log("")
|
||||
t.Logf("Config: volume=%dMB WAL=%dMB block=%dB ops=%d warmup=%d iterations=%d",
|
||||
perfVolumeSize/(1024*1024), perfWALSize/(1024*1024), perfBlockSize, perfOps, perfWarmupOps, perfIterations)
|
||||
t.Log("Method: worst of N iterations (floor, not peak)")
|
||||
t.Log("Scope: engine-local only; production RF=2 floor in baseline-roce-20260401.md")
|
||||
|
||||
// Gate: floor values must meet minimum acceptable thresholds.
|
||||
// These are regression gates — if any floor drops below the gate,
|
||||
// the test fails, blocking rollout.
|
||||
t.Log("")
|
||||
t.Log("=== Floor Gate Validation ===")
|
||||
allGatesPassed := true
|
||||
for _, f := range floors {
|
||||
gate, ok := perfFloorGates[f.Workload]
|
||||
if !ok {
|
||||
t.Fatalf("no floor gate defined for workload %s", f.Workload)
|
||||
}
|
||||
passed := true
|
||||
if f.FloorIOPS < gate.MinIOPS {
|
||||
t.Errorf("GATE FAIL: %s floor IOPS %.0f < minimum %.0f", f.Workload, f.FloorIOPS, gate.MinIOPS)
|
||||
passed = false
|
||||
}
|
||||
if gate.MaxWriteP99 > 0 && f.WorstP99 > gate.MaxWriteP99 {
|
||||
t.Errorf("GATE FAIL: %s worst P99 %s > ceiling %s", f.Workload, f.WorstP99, gate.MaxWriteP99)
|
||||
passed = false
|
||||
}
|
||||
status := "PASS"
|
||||
if !passed {
|
||||
status = "FAIL"
|
||||
allGatesPassed = false
|
||||
}
|
||||
t.Logf(" %-12s min=%6.0f IOPS → floor=%6.0f [%s]", f.Workload, gate.MinIOPS, f.FloorIOPS, status)
|
||||
}
|
||||
|
||||
if !allGatesPassed {
|
||||
t.Fatal("P12P4 PerformanceFloor: FAIL — one or more floor gates not met")
|
||||
}
|
||||
t.Log("P12P4 PerformanceFloor: PASS — all floor gates met")
|
||||
}
|
||||
|
||||
// --- Test 2: CostCharacterization_Bounded ---
|
||||
|
||||
func TestP12P4_CostCharacterization_Bounded(t *testing.T) {
|
||||
vol := setupPerfVolume(t)
|
||||
|
||||
// Measure write latency breakdown: WriteLBA includes WAL append + group commit.
|
||||
data := make([]byte, perfBlockSize)
|
||||
rand.Read(data)
|
||||
max := maxLBAs()
|
||||
|
||||
const costOps = 500
|
||||
var writeLatSum int64
|
||||
for i := 0; i < costOps; i++ {
|
||||
lba := uint64(mrand.Int63n(int64(max)))
|
||||
start := time.Now()
|
||||
if err := vol.WriteLBA(lba, data); err != nil {
|
||||
t.Fatalf("write op %d: %v", i, err)
|
||||
}
|
||||
writeLatSum += time.Since(start).Nanoseconds()
|
||||
}
|
||||
avgWriteLat := time.Duration(writeLatSum / costOps)
|
||||
|
||||
// Measure read latency for comparison.
|
||||
// Populate first.
|
||||
for i := 0; i < int(max/2); i++ {
|
||||
vol.WriteLBA(uint64(i), data)
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond) // let flusher drain
|
||||
|
||||
var readLatSum int64
|
||||
for i := 0; i < costOps; i++ {
|
||||
lba := uint64(mrand.Int63n(int64(max / 2)))
|
||||
start := time.Now()
|
||||
if _, err := vol.ReadLBA(lba, perfBlockSize); err != nil {
|
||||
t.Fatalf("read op %d: %v", i, err)
|
||||
}
|
||||
readLatSum += time.Since(start).Nanoseconds()
|
||||
}
|
||||
avgReadLat := time.Duration(readLatSum / costOps)
|
||||
|
||||
// Cost statement.
|
||||
t.Log("")
|
||||
t.Log("=== P12P4 Cost Characterization (engine-local) ===")
|
||||
t.Log("")
|
||||
t.Logf("Average write latency: %s (includes WAL append + group commit sync)", avgWriteLat)
|
||||
t.Logf("Average read latency: %s (dirtyMap lookup + WAL/extent read)", avgReadLat)
|
||||
t.Log("")
|
||||
t.Log("Bounded cost statement:")
|
||||
t.Log(" WAL write amplification: 2x minimum (WAL write + eventual extent flush)")
|
||||
t.Log(" Group commit: amortizes fdatasync across batched writers (1 sync per batch)")
|
||||
t.Log(" Replication tax (production RF=2 sync_all): -56% vs RF=1 (barrier round-trip)")
|
||||
t.Log(" Replication tax source: baseline-roce-20260401.md, measured on 25Gbps RoCE")
|
||||
t.Log("")
|
||||
t.Logf("Write/read ratio: %.1fx (write is %.1fx slower than read)",
|
||||
float64(avgWriteLat)/float64(avgReadLat),
|
||||
float64(avgWriteLat)/float64(avgReadLat))
|
||||
t.Log("")
|
||||
t.Logf("Config: volume=%dMB WAL=%dMB block=%dB ops=%d",
|
||||
perfVolumeSize/(1024*1024), perfWALSize/(1024*1024), perfBlockSize, costOps)
|
||||
|
||||
// Proof: cost values are finite and positive.
|
||||
if avgWriteLat <= 0 || avgReadLat <= 0 {
|
||||
t.Fatal("latency values must be positive")
|
||||
}
|
||||
if avgWriteLat < avgReadLat {
|
||||
t.Log("Note: write faster than read in this run (possible due to WAL cache hits)")
|
||||
}
|
||||
|
||||
t.Log("P12P4 CostCharacterization: PASS — bounded cost statement produced")
|
||||
}
|
||||
|
||||
// --- Test 3: RolloutGate_Bounded ---
|
||||
|
||||
func TestP12P4_RolloutGate_Bounded(t *testing.T) {
|
||||
floorPath := "../../sw-block/.private/phase/phase-12-p4-floor.md"
|
||||
gatesPath := "../../sw-block/.private/phase/phase-12-p4-rollout-gates.md"
|
||||
baselinePath := "../../learn/projects/sw-block/test/results/baseline-roce-20260401.md"
|
||||
blockerPath := "../../sw-block/.private/phase/phase-12-p3-blockers.md"
|
||||
|
||||
// --- Read all cited evidence sources ---
|
||||
|
||||
floorData, err := os.ReadFile(floorPath)
|
||||
if err != nil {
|
||||
t.Fatalf("floor doc must exist at %s: %v", floorPath, err)
|
||||
}
|
||||
floorContent := string(floorData)
|
||||
|
||||
gatesData, err := os.ReadFile(gatesPath)
|
||||
if err != nil {
|
||||
t.Fatalf("rollout-gates doc must exist at %s: %v", gatesPath, err)
|
||||
}
|
||||
gatesContent := string(gatesData)
|
||||
|
||||
baselineData, err := os.ReadFile(baselinePath)
|
||||
if err != nil {
|
||||
t.Fatalf("cited baseline must exist at %s: %v", baselinePath, err)
|
||||
}
|
||||
baselineContent := string(baselineData)
|
||||
|
||||
blockerData, err := os.ReadFile(blockerPath)
|
||||
if err != nil {
|
||||
t.Fatalf("cited blocker ledger must exist at %s: %v", blockerPath, err)
|
||||
}
|
||||
blockerContent := string(blockerData)
|
||||
|
||||
// --- Structural validation (shape) ---
|
||||
|
||||
// Floor doc: workload envelope, floor table, non-claims.
|
||||
for _, required := range []string{"RF=2", "sync_all", "4K random write", "4K random read", "sequential write", "sequential read"} {
|
||||
if !strings.Contains(floorContent, required) {
|
||||
t.Fatalf("floor doc missing required content: %q", required)
|
||||
}
|
||||
}
|
||||
if !strings.Contains(floorContent, "Floor") || !strings.Contains(floorContent, "IOPS") {
|
||||
t.Fatal("floor doc must contain floor table with IOPS")
|
||||
}
|
||||
if !strings.Contains(floorContent, "does NOT") {
|
||||
t.Fatal("floor doc must contain explicit non-claims")
|
||||
}
|
||||
|
||||
// Gates doc: gates table, launch envelope, exclusions, non-claims.
|
||||
if !strings.Contains(gatesContent, "Gate") || !strings.Contains(gatesContent, "Status") {
|
||||
t.Fatal("rollout-gates doc must contain gates table")
|
||||
}
|
||||
if !strings.Contains(gatesContent, "Launch Envelope") {
|
||||
t.Fatal("rollout-gates doc must contain launch envelope")
|
||||
}
|
||||
if !strings.Contains(gatesContent, "Exclusion") {
|
||||
t.Fatal("rollout-gates doc must contain exclusions")
|
||||
}
|
||||
if !strings.Contains(gatesContent, "does NOT") {
|
||||
t.Fatal("rollout-gates doc must contain explicit non-claims")
|
||||
}
|
||||
|
||||
// Count gates — must be finite.
|
||||
gateLines := 0
|
||||
for _, line := range strings.Split(gatesContent, "\n") {
|
||||
trimmed := strings.TrimSpace(line)
|
||||
if strings.HasPrefix(trimmed, "| G") || strings.HasPrefix(trimmed, "| E") {
|
||||
gateLines++
|
||||
}
|
||||
}
|
||||
if gateLines == 0 {
|
||||
t.Fatal("rollout-gates doc has no gate items")
|
||||
}
|
||||
if gateLines > 20 {
|
||||
t.Fatalf("rollout-gates doc should be finite, got %d items", gateLines)
|
||||
}
|
||||
|
||||
// --- Semantic cross-checks (evidence alignment) ---
|
||||
|
||||
// 1. G6 cites "28.4K write IOPS" — baseline must contain this number.
|
||||
if strings.Contains(gatesContent, "28.4K write IOPS") || strings.Contains(gatesContent, "28,4") {
|
||||
// The gates doc cites write IOPS from baseline. Verify the baseline has it.
|
||||
if !strings.Contains(baselineContent, "28,") {
|
||||
t.Fatal("G6 cites write IOPS but baseline does not contain matching value")
|
||||
}
|
||||
}
|
||||
// More precise: baseline must contain the specific numbers cited in G6.
|
||||
if !strings.Contains(baselineContent, "28,347") && !strings.Contains(baselineContent, "28,429") &&
|
||||
!strings.Contains(baselineContent, "28,453") {
|
||||
t.Fatal("baseline must contain RF=2 sync_all write IOPS data (28,3xx-28,4xx range)")
|
||||
}
|
||||
if !strings.Contains(baselineContent, "136,648") {
|
||||
t.Fatal("baseline must contain RF=2 read IOPS data (136,648)")
|
||||
}
|
||||
|
||||
// 2. G5 cites "-56% replication tax" — baseline must contain this.
|
||||
if strings.Contains(gatesContent, "-56%") {
|
||||
if !strings.Contains(baselineContent, "-56%") {
|
||||
t.Fatal("G5 cites -56% replication tax but baseline does not contain -56%")
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Launch envelope claims specific transport/network combos — verify against baseline.
|
||||
// Claimed: NVMe-TCP @ 25Gbps RoCE
|
||||
if strings.Contains(gatesContent, "NVMe-TCP @ 25Gbps RoCE") {
|
||||
if !strings.Contains(baselineContent, "NVMe-TCP") || !strings.Contains(baselineContent, "RoCE") {
|
||||
t.Fatal("launch envelope claims NVMe-TCP @ RoCE but baseline has no such data")
|
||||
}
|
||||
}
|
||||
// Claimed: iSCSI @ 25Gbps RoCE
|
||||
if strings.Contains(gatesContent, "iSCSI @ 25Gbps RoCE") {
|
||||
if !strings.Contains(baselineContent, "iSCSI") || !strings.Contains(baselineContent, "RoCE") {
|
||||
t.Fatal("launch envelope claims iSCSI @ RoCE but baseline has no such data")
|
||||
}
|
||||
}
|
||||
// Claimed: iSCSI @ 1Gbps
|
||||
if strings.Contains(gatesContent, "iSCSI @ 1Gbps") {
|
||||
if !strings.Contains(baselineContent, "iSCSI") || !strings.Contains(baselineContent, "1Gbps") {
|
||||
t.Fatal("launch envelope claims iSCSI @ 1Gbps but baseline has no such data")
|
||||
}
|
||||
}
|
||||
// Exclusion: NVMe-TCP @ 1Gbps must NOT be claimed as supported.
|
||||
if strings.Contains(gatesContent, "NOT included") {
|
||||
// Verify baseline indeed lacks NVMe-TCP @ 1Gbps.
|
||||
hasNvme1g := strings.Contains(baselineContent, "NVMe-TCP") && strings.Contains(baselineContent, "| NVMe-TCP | 1Gbps")
|
||||
if hasNvme1g {
|
||||
t.Fatal("baseline contains NVMe-TCP @ 1Gbps data but gates doc excludes it — resolve mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
// 4. G7 cites blocker ledger counts — verify against actual ledger.
|
||||
if strings.Contains(gatesContent, "3 diagnosed") {
|
||||
diagCount := 0
|
||||
for _, line := range strings.Split(blockerContent, "\n") {
|
||||
if strings.HasPrefix(strings.TrimSpace(line), "| B") {
|
||||
diagCount++
|
||||
}
|
||||
}
|
||||
if diagCount != 3 {
|
||||
t.Fatalf("G7 claims 3 diagnosed blockers but ledger has %d", diagCount)
|
||||
}
|
||||
}
|
||||
if strings.Contains(gatesContent, "3 unresolved") {
|
||||
unresCount := 0
|
||||
for _, line := range strings.Split(blockerContent, "\n") {
|
||||
if strings.HasPrefix(strings.TrimSpace(line), "| U") {
|
||||
unresCount++
|
||||
}
|
||||
}
|
||||
if unresCount != 3 {
|
||||
t.Fatalf("G7 claims 3 unresolved blockers but ledger has %d", unresCount)
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Floor doc gate thresholds must match code-defined gates.
|
||||
for workload, gate := range perfFloorGates {
|
||||
// The doc uses comma-formatted numbers (e.g., "1,000" or "5,000").
|
||||
minInt := int(gate.MinIOPS)
|
||||
// Check for both comma-formatted and plain forms.
|
||||
found := false
|
||||
for _, form := range []string{
|
||||
fmt.Sprintf("%d", minInt), // "1000"
|
||||
fmt.Sprintf("%d,%03d", minInt/1000, minInt%1000), // "1,000"
|
||||
} {
|
||||
if strings.Contains(floorContent, form) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("floor doc gate for %s should cite minimum %d IOPS but doesn't", workload, minInt)
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("P12P4 RolloutGate: floor doc %d bytes, gates doc %d bytes, %d gate items",
|
||||
len(floorData), len(gatesData), gateLines)
|
||||
t.Log("P12P4 RolloutGate: semantic cross-checks passed (baseline, blocker ledger, gate thresholds)")
|
||||
t.Log("P12P4 RolloutGate: PASS — bounded launch envelope with verified evidence alignment")
|
||||
}
|
||||
|
||||
// --- Helpers ---
|
||||
|
||||
func init() {
|
||||
// Seed random for reproducible LBA patterns within a test run.
|
||||
mrand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// formatDuration formats a duration for table display.
|
||||
func formatDuration(d time.Duration) string {
|
||||
if d < time.Microsecond {
|
||||
return fmt.Sprintf("%dns", d.Nanoseconds())
|
||||
}
|
||||
if d < time.Millisecond {
|
||||
return fmt.Sprintf("%.1fus", float64(d.Nanoseconds())/1000.0)
|
||||
}
|
||||
return fmt.Sprintf("%.2fms", float64(d.Nanoseconds())/1e6)
|
||||
}
|
||||
308
weed/server/qa_block_soak_test.go
Normal file
308
weed/server/qa_block_soak_test.go
Normal file
@@ -0,0 +1,308 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol/v2bridge"
|
||||
)
|
||||
|
||||
// ============================================================
|
||||
// Phase 12 P2: Soak / Long-Run Stability Hardening
|
||||
//
|
||||
// Proves: repeated chosen-path cycles return to bounded truth
|
||||
// without hidden state drift or unbounded runtime artifacts.
|
||||
//
|
||||
// NOT diagnosability, NOT performance-floor, NOT rollout readiness.
|
||||
// ============================================================
|
||||
|
||||
const soakCycles = 5
|
||||
|
||||
type soakSetup struct {
|
||||
ms *MasterServer
|
||||
bs *BlockService
|
||||
store *storage.BlockVolumeStore
|
||||
dir string
|
||||
}
|
||||
|
||||
func newSoakSetup(t *testing.T) *soakSetup {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
store := storage.NewBlockVolumeStore()
|
||||
|
||||
ms := &MasterServer{
|
||||
blockRegistry: NewBlockVolumeRegistry(),
|
||||
blockAssignmentQueue: NewBlockAssignmentQueue(),
|
||||
blockFailover: newBlockFailoverState(),
|
||||
}
|
||||
ms.blockRegistry.MarkBlockCapable("vs1:9333")
|
||||
ms.blockRegistry.MarkBlockCapable("vs2:9333")
|
||||
|
||||
ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
|
||||
sanitized := strings.ReplaceAll(server, ":", "_")
|
||||
serverDir := filepath.Join(dir, sanitized)
|
||||
os.MkdirAll(serverDir, 0755)
|
||||
volPath := filepath.Join(serverDir, fmt.Sprintf("%s.blk", name))
|
||||
vol, err := blockvol.CreateBlockVol(volPath, blockvol.CreateOptions{
|
||||
VolumeSize: 1 * 1024 * 1024,
|
||||
BlockSize: 4096,
|
||||
WALSize: 256 * 1024,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vol.Close()
|
||||
if _, err := store.AddBlockVolume(volPath, ""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
host := server
|
||||
if idx := strings.LastIndex(server, ":"); idx >= 0 {
|
||||
host = server[:idx]
|
||||
}
|
||||
return &blockAllocResult{
|
||||
Path: volPath,
|
||||
IQN: fmt.Sprintf("iqn.2024.test:%s", name),
|
||||
ISCSIAddr: host + ":3260",
|
||||
ReplicaDataAddr: server + ":14260",
|
||||
ReplicaCtrlAddr: server + ":14261",
|
||||
RebuildListenAddr: server + ":15000",
|
||||
}, nil
|
||||
}
|
||||
ms.blockVSDelete = func(ctx context.Context, server string, name string) error { return nil }
|
||||
|
||||
bs := &BlockService{
|
||||
blockStore: store,
|
||||
blockDir: filepath.Join(dir, "vs1_9333"),
|
||||
listenAddr: "127.0.0.1:3260",
|
||||
localServerID: "vs1:9333",
|
||||
v2Bridge: v2bridge.NewControlBridge(),
|
||||
v2Orchestrator: engine.NewRecoveryOrchestrator(),
|
||||
replStates: make(map[string]*volReplState),
|
||||
}
|
||||
bs.v2Recovery = NewRecoveryManager(bs)
|
||||
|
||||
t.Cleanup(func() {
|
||||
bs.v2Recovery.Shutdown()
|
||||
store.Close()
|
||||
})
|
||||
|
||||
return &soakSetup{ms: ms, bs: bs, store: store, dir: dir}
|
||||
}
|
||||
|
||||
func (s *soakSetup) deliver(server string) int {
|
||||
pending := s.ms.blockAssignmentQueue.Peek(server)
|
||||
if len(pending) == 0 {
|
||||
return 0
|
||||
}
|
||||
protoAssignments := blockvol.AssignmentsToProto(pending)
|
||||
goAssignments := blockvol.AssignmentsFromProto(protoAssignments)
|
||||
s.bs.ProcessAssignments(goAssignments)
|
||||
return len(goAssignments)
|
||||
}
|
||||
|
||||
// --- Repeated create/failover/recover cycles with end-of-cycle truth checks ---
|
||||
|
||||
func TestP12P2_RepeatedCycles_NoDrift(t *testing.T) {
|
||||
s := newSoakSetup(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for cycle := 1; cycle <= soakCycles; cycle++ {
|
||||
volName := fmt.Sprintf("soak-vol-%d", cycle)
|
||||
|
||||
// Step 1: Create.
|
||||
createResp, err := s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
|
||||
Name: volName, SizeBytes: 1 << 20,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("cycle %d create: %v", cycle, err)
|
||||
}
|
||||
primaryVS := createResp.VolumeServer
|
||||
|
||||
// Deliver initial assignment.
|
||||
s.bs.localServerID = primaryVS
|
||||
s.deliver(primaryVS)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
entry, ok := s.ms.blockRegistry.Lookup(volName)
|
||||
if !ok {
|
||||
t.Fatalf("cycle %d: volume not in registry", cycle)
|
||||
}
|
||||
|
||||
// Step 2: Failover.
|
||||
s.ms.blockRegistry.UpdateEntry(volName, func(e *BlockVolumeEntry) {
|
||||
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
|
||||
})
|
||||
s.ms.failoverBlockVolumes(primaryVS)
|
||||
|
||||
entryAfter, _ := s.ms.blockRegistry.Lookup(volName)
|
||||
if entryAfter.Epoch <= entry.Epoch {
|
||||
t.Fatalf("cycle %d: epoch did not increase: %d <= %d", cycle, entryAfter.Epoch, entry.Epoch)
|
||||
}
|
||||
|
||||
// Deliver failover assignment.
|
||||
newPrimary := entryAfter.VolumeServer
|
||||
s.bs.localServerID = newPrimary
|
||||
s.deliver(newPrimary)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Step 3: Reconnect.
|
||||
s.ms.recoverBlockVolumes(primaryVS)
|
||||
s.bs.localServerID = primaryVS
|
||||
s.deliver(primaryVS)
|
||||
s.bs.localServerID = newPrimary
|
||||
s.deliver(newPrimary)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// === End-of-cycle truth checks ===
|
||||
|
||||
// Registry: volume exists, epoch monotonic.
|
||||
finalEntry, ok := s.ms.blockRegistry.Lookup(volName)
|
||||
if !ok {
|
||||
t.Fatalf("cycle %d: volume missing from registry at end", cycle)
|
||||
}
|
||||
if finalEntry.Epoch < entryAfter.Epoch {
|
||||
t.Fatalf("cycle %d: registry epoch regressed: %d < %d", cycle, finalEntry.Epoch, entryAfter.Epoch)
|
||||
}
|
||||
|
||||
// VS-visible: promoted vol epoch matches.
|
||||
var volEpoch uint64
|
||||
if err := s.store.WithVolume(entryAfter.Path, func(vol *blockvol.BlockVol) error {
|
||||
volEpoch = vol.Epoch()
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("cycle %d: promoted vol access failed: %v", cycle, err)
|
||||
}
|
||||
if volEpoch < entryAfter.Epoch {
|
||||
t.Fatalf("cycle %d: vol epoch=%d < registry=%d", cycle, volEpoch, entryAfter.Epoch)
|
||||
}
|
||||
|
||||
// Publication: lookup matches registry truth (not just non-empty).
|
||||
lookupResp, err := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: volName})
|
||||
if err != nil {
|
||||
t.Fatalf("cycle %d: lookup failed: %v", cycle, err)
|
||||
}
|
||||
if lookupResp.IscsiAddr != finalEntry.ISCSIAddr {
|
||||
t.Fatalf("cycle %d: lookup iSCSI=%q != registry=%q", cycle, lookupResp.IscsiAddr, finalEntry.ISCSIAddr)
|
||||
}
|
||||
if lookupResp.VolumeServer != finalEntry.VolumeServer {
|
||||
t.Fatalf("cycle %d: lookup VS=%q != registry=%q", cycle, lookupResp.VolumeServer, finalEntry.VolumeServer)
|
||||
}
|
||||
|
||||
t.Logf("cycle %d: registry=%d vol=%d lookup=registry ✓",
|
||||
cycle, finalEntry.Epoch, volEpoch)
|
||||
}
|
||||
|
||||
t.Logf("P12P2 repeated cycles: %d cycles, all end-of-cycle truth checks passed", soakCycles)
|
||||
}
|
||||
|
||||
// --- Runtime state hygiene: no unbounded leftovers after cycles ---
|
||||
|
||||
func TestP12P2_RuntimeHygiene_NoLeftovers(t *testing.T) {
|
||||
s := newSoakSetup(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Create and delete several volumes to exercise lifecycle.
|
||||
for i := 1; i <= soakCycles; i++ {
|
||||
name := fmt.Sprintf("hygiene-vol-%d", i)
|
||||
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
|
||||
Name: name, SizeBytes: 1 << 20,
|
||||
})
|
||||
entry, _ := s.ms.blockRegistry.Lookup(name)
|
||||
s.bs.localServerID = entry.VolumeServer
|
||||
s.deliver(entry.VolumeServer)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Delete all volumes.
|
||||
for i := 1; i <= soakCycles; i++ {
|
||||
name := fmt.Sprintf("hygiene-vol-%d", i)
|
||||
s.ms.DeleteBlockVolume(ctx, &master_pb.DeleteBlockVolumeRequest{Name: name})
|
||||
}
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Check: no stale recovery tasks.
|
||||
activeTasks := s.bs.v2Recovery.ActiveTaskCount()
|
||||
if activeTasks > 0 {
|
||||
t.Fatalf("stale recovery tasks: %d (expected 0 after all volumes deleted)", activeTasks)
|
||||
}
|
||||
|
||||
// Check: registry should have no entries for deleted volumes.
|
||||
for i := 1; i <= soakCycles; i++ {
|
||||
name := fmt.Sprintf("hygiene-vol-%d", i)
|
||||
if _, ok := s.ms.blockRegistry.Lookup(name); ok {
|
||||
t.Fatalf("stale registry entry: %s (should be deleted)", name)
|
||||
}
|
||||
}
|
||||
|
||||
// Check: assignment queue should not have unbounded stale entries.
|
||||
for _, server := range []string{"vs1:9333", "vs2:9333"} {
|
||||
pending := s.ms.blockAssignmentQueue.Peek(server)
|
||||
// Some pending entries may exist (lease grants etc), but check for bounded size.
|
||||
if len(pending) > soakCycles*2 {
|
||||
t.Fatalf("unbounded stale assignments for %s: %d", server, len(pending))
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("P12P2 hygiene: %d volumes created+deleted, 0 stale tasks, 0 stale registry, bounded queue", soakCycles)
|
||||
}
|
||||
|
||||
// --- Steady-state repeated delivery: idempotence holds over many cycles ---
|
||||
|
||||
func TestP12P2_SteadyState_IdempotenceHolds(t *testing.T) {
|
||||
s := newSoakSetup(t)
|
||||
ctx := context.Background()
|
||||
|
||||
s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{
|
||||
Name: "steady-vol-1", SizeBytes: 1 << 20,
|
||||
})
|
||||
|
||||
entry, _ := s.ms.blockRegistry.Lookup("steady-vol-1")
|
||||
s.bs.localServerID = entry.VolumeServer
|
||||
s.deliver(entry.VolumeServer)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
replicaID := entry.Path + "/" + entry.Replicas[0].Server
|
||||
eventsAfterFirst := len(s.bs.v2Orchestrator.Log.EventsFor(replicaID))
|
||||
if eventsAfterFirst == 0 {
|
||||
t.Fatal("first delivery must create events")
|
||||
}
|
||||
|
||||
// Deliver the same assignment many times.
|
||||
for i := 0; i < soakCycles*2; i++ {
|
||||
s.deliver(entry.VolumeServer)
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
|
||||
eventsAfterSoak := len(s.bs.v2Orchestrator.Log.EventsFor(replicaID))
|
||||
if eventsAfterSoak != eventsAfterFirst {
|
||||
t.Fatalf("idempotence drift: events %d → %d after %d repeated deliveries",
|
||||
eventsAfterFirst, eventsAfterSoak, soakCycles*2)
|
||||
}
|
||||
|
||||
// Verify: registry, vol, and lookup are still coherent.
|
||||
finalEntry, _ := s.ms.blockRegistry.Lookup("steady-vol-1")
|
||||
if finalEntry.Epoch != entry.Epoch {
|
||||
t.Fatalf("epoch drifted: %d → %d", entry.Epoch, finalEntry.Epoch)
|
||||
}
|
||||
|
||||
lookupResp, _ := s.ms.LookupBlockVolume(ctx, &master_pb.LookupBlockVolumeRequest{Name: "steady-vol-1"})
|
||||
if lookupResp.IscsiAddr != finalEntry.ISCSIAddr {
|
||||
t.Fatalf("lookup iSCSI=%q != registry=%q after soak", lookupResp.IscsiAddr, finalEntry.ISCSIAddr)
|
||||
}
|
||||
if lookupResp.VolumeServer != finalEntry.VolumeServer {
|
||||
t.Fatalf("lookup VS=%q != registry=%q after soak", lookupResp.VolumeServer, finalEntry.VolumeServer)
|
||||
}
|
||||
|
||||
t.Logf("P12P2 steady state: %d repeated deliveries, events stable at %d, lookup=registry ✓",
|
||||
soakCycles*2, eventsAfterFirst)
|
||||
}
|
||||
Reference in New Issue
Block a user