feat: Phase 20 T5 — ClusterReplicationMode on master

Add ClusterReplicationMode as a distinct master-owned cluster-level
replication health judgment, computed from multi-replica facts:
replica LSN lag, heartbeat freshness, role state. Monotonic: worst
replica state dominates.

Modes: "no_replicas" (RF=1), "keepup" (all healthy), "catching_up"
(replica behind but recoverable), "degraded" (stale heartbeat or
barrier failure), "needs_rebuild" (unrecoverable gap or rebuilding
role).

Distinct from EngineProjectionMode (VS-local engine truth) and
VolumeMode (legacy). They answer different questions, live in
different fields, have different names. Tests explicitly prove the
two can differ without conflict.

Computed in recomputeReplicaState() alongside existing VolumeMode.
Updated on every heartbeat that touches the entry.

9 tests: keepup, catching_up, stale degraded, LSN gap needs_rebuild,
rebuilding role, no_replicas, distinctness from EngineProjectionMode,
heartbeat-driven update, worst-replica-dominates (RF3).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
pingqiu
2026-04-05 18:41:44 -07:00
parent 9cead1b502
commit 013f3e7ccb
2 changed files with 333 additions and 0 deletions

View File

@@ -0,0 +1,237 @@
package weed_server
import (
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
func TestT5_AllReplicasHealthy_Keepup(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-keepup", VolumeServer: "primary:8080",
Path: "/data/vol-crm-keepup.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
WALHeadLSN: 100,
Replicas: []ReplicaInfo{{
Server: "replica:8080", Path: "/data/vol-crm-keepup.blk",
HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
}},
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-keepup")
if entry.ClusterReplicationMode != "keepup" {
t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "keepup")
}
}
func TestT5_ReplicaBehind_CatchingUp(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-catch", VolumeServer: "primary:8080",
Path: "/data/vol-crm-catch.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
WALHeadLSN: 100,
Replicas: []ReplicaInfo{{
Server: "replica:8080", Path: "/data/vol-crm-catch.blk",
HealthScore: 1.0, WALHeadLSN: 90, Ready: true,
Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
}},
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-catch")
if entry.ClusterReplicationMode != "catching_up" {
t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "catching_up")
}
}
func TestT5_StaleHeartbeat_Degraded(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-degrade", VolumeServer: "primary:8080",
Path: "/data/vol-crm-degrade.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
WALHeadLSN: 100,
Replicas: []ReplicaInfo{{
Server: "replica:8080", Path: "/data/vol-crm-degrade.blk",
HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
Role: blockvol.RoleToWire(blockvol.RoleReplica),
LastHeartbeat: time.Now().Add(-120 * time.Second), // stale
}},
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-degrade")
if entry.ClusterReplicationMode != "degraded" {
t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "degraded")
}
}
func TestT5_UnrecoverableGap_NeedsRebuild(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-rebuild", VolumeServer: "primary:8080",
Path: "/data/vol-crm-rebuild.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
WALHeadLSN: 5000,
Replicas: []ReplicaInfo{{
Server: "replica:8080", Path: "/data/vol-crm-rebuild.blk",
HealthScore: 1.0, WALHeadLSN: 100, Ready: true, // lag > 1000
Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
}},
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-rebuild")
if entry.ClusterReplicationMode != "needs_rebuild" {
t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "needs_rebuild")
}
}
func TestT5_RebuildingRole_NeedsRebuild(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-rebuilding", VolumeServer: "primary:8080",
Path: "/data/vol-crm-rebuilding.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
WALHeadLSN: 100,
Replicas: []ReplicaInfo{{
Server: "replica:8080", Path: "/data/vol-crm-rebuilding.blk",
HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
Role: blockvol.RoleToWire(blockvol.RoleRebuilding), LastHeartbeat: time.Now(),
}},
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-rebuilding")
if entry.ClusterReplicationMode != "needs_rebuild" {
t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "needs_rebuild")
}
}
func TestT5_NoReplicas_NoReplicasMode(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-none", VolumeServer: "primary:8080",
Path: "/data/vol-crm-none.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 1,
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-none")
if entry.ClusterReplicationMode != "no_replicas" {
t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "no_replicas")
}
}
func TestT5_ClusterReplicationModeDistinctFromEngineProjectionMode(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-distinct", VolumeServer: "primary:8080",
Path: "/data/vol-crm-distinct.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
WALHeadLSN: 100,
// EngineProjectionMode says "publish_healthy" (VS-local).
EngineProjectionMode: "publish_healthy",
HasEngineProjectionMode: true,
Replicas: []ReplicaInfo{{
Server: "replica:8080", Path: "/data/vol-crm-distinct.blk",
HealthScore: 1.0, WALHeadLSN: 50, Ready: true, // behind → catching_up
Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
}},
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-distinct")
// EngineProjectionMode is VS-local: "publish_healthy"
if entry.EngineProjectionMode != "publish_healthy" {
t.Fatalf("EngineProjectionMode=%q, want %q", entry.EngineProjectionMode, "publish_healthy")
}
// ClusterReplicationMode is master-computed: "catching_up" because replica is behind.
if entry.ClusterReplicationMode != "catching_up" {
t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "catching_up")
}
// They must be different — proving the two are independent.
if entry.EngineProjectionMode == entry.ClusterReplicationMode {
t.Fatal("EngineProjectionMode and ClusterReplicationMode should differ in this scenario")
}
}
func TestT5_HeartbeatUpdatesClusterReplicationMode(t *testing.T) {
r := NewBlockVolumeRegistry()
r.MarkBlockCapable("primary:8080")
r.MarkBlockCapable("replica:8080")
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-hb", VolumeServer: "primary:8080",
Path: "/data/vol-crm-hb.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
WALHeadLSN: 100,
Replicas: []ReplicaInfo{{
Server: "replica:8080", Path: "/data/vol-crm-hb-replica.blk",
HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
}},
}); err != nil {
t.Fatalf("register: %v", err)
}
// Initial state: keepup.
entry, _ := r.Lookup("vol-crm-hb")
if entry.ClusterReplicationMode != "keepup" {
t.Fatalf("initial ClusterReplicationMode=%q, want keepup", entry.ClusterReplicationMode)
}
// Primary heartbeat with higher WALHeadLSN → replica now behind.
r.UpdateFullHeartbeat("primary:8080", []*master_pb.BlockVolumeInfoMessage{{
Path: "/data/vol-crm-hb.blk",
Role: blockvol.RoleToWire(blockvol.RolePrimary),
WalHeadLsn: 200,
}}, "")
entry, _ = r.Lookup("vol-crm-hb")
if entry.ClusterReplicationMode != "catching_up" {
t.Fatalf("after primary advance: ClusterReplicationMode=%q, want catching_up", entry.ClusterReplicationMode)
}
}
func TestT5_WorstReplicaDominates(t *testing.T) {
r := NewBlockVolumeRegistry()
if err := r.Register(&BlockVolumeEntry{
Name: "vol-crm-worst", VolumeServer: "primary:8080",
Path: "/data/vol-crm-worst.blk", Status: StatusActive,
Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 3,
WALHeadLSN: 100,
Replicas: []ReplicaInfo{
{
Server: "replica1:8080", Path: "/data/vol-crm-worst.blk",
HealthScore: 1.0, WALHeadLSN: 100, Ready: true, // keepup
Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
},
{
Server: "replica2:8080", Path: "/data/vol-crm-worst.blk",
HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
Role: blockvol.RoleToWire(blockvol.RoleRebuilding), LastHeartbeat: time.Now(), // needs_rebuild
},
},
}); err != nil {
t.Fatalf("register: %v", err)
}
entry, _ := r.Lookup("vol-crm-worst")
// One replica is keepup, one is needs_rebuild → worst dominates.
if entry.ClusterReplicationMode != "needs_rebuild" {
t.Fatalf("ClusterReplicationMode=%q, want needs_rebuild (worst dominates)", entry.ClusterReplicationMode)
}
}

View File

@@ -95,6 +95,12 @@ type BlockVolumeEntry struct {
VolumeMode string // "allocated_only", "bootstrap_pending", "publish_healthy", "degraded", "needs_rebuild"
WALHeadLSN uint64 // primary WAL head LSN from heartbeat
// T5: Master-owned cluster-level replication health judgment.
// Distinct from EngineProjectionMode (VS-local) and VolumeMode (legacy).
// Computed from multi-replica facts: replica LSN lag, heartbeat freshness,
// role state, and recovery phase. Monotonic: worst replica state dominates.
ClusterReplicationMode string // "keepup", "catching_up", "degraded", "needs_rebuild", "no_replicas"
// CP8-3-1: Durability mode.
DurabilityMode string // "best_effort", "sync_all", "sync_quorum"
@@ -164,6 +170,96 @@ func (e *BlockVolumeEntry) recomputeReplicaState() {
// CP13-9: compute normalized VolumeMode for external surfaces.
e.VolumeMode = e.computeVolumeMode()
// T5: compute cluster-level replication mode from multi-replica facts.
e.ClusterReplicationMode = e.computeClusterReplicationMode()
}
// computeClusterReplicationMode returns the master-owned cluster-level
// replication health judgment for the RF2 set. This is distinct from
// EngineProjectionMode (VS-local engine truth) and VolumeMode (legacy).
//
// Mode meanings:
// - "no_replicas": RF=1 or no replicas configured (not an RF2 judgment)
// - "keepup": all replicas healthy, LSN within tolerance, heartbeat fresh
// - "catching_up": at least one replica is behind but recoverable
// - "degraded": at least one replica has barrier failure or impaired state
// - "needs_rebuild": at least one replica has unrecoverable gap
//
// Monotonic: worst replica state dominates the cluster mode.
func (e *BlockVolumeEntry) computeClusterReplicationMode() string {
if len(e.Replicas) == 0 {
return "no_replicas"
}
worst := "keepup"
for _, ri := range e.Replicas {
replicaMode := evaluateReplicaHealth(ri, e.WALHeadLSN)
worst = worseClusterMode(worst, replicaMode)
}
return worst
}
// evaluateReplicaHealth returns the cluster-level health classification for
// one replica. Does NOT use EngineProjectionMode — computes from registry-
// observed facts only (heartbeat freshness, LSN lag, role).
func evaluateReplicaHealth(ri ReplicaInfo, primaryWALHeadLSN uint64) string {
// Rebuilding role is needs_rebuild.
if blockvol.RoleFromWire(ri.Role) == blockvol.RoleRebuilding {
return "needs_rebuild"
}
// Stale heartbeat (>60s) is degraded.
if !ri.LastHeartbeat.IsZero() && time.Since(ri.LastHeartbeat) > 60*time.Second {
return "degraded"
}
// No heartbeat ever received — catching up.
if ri.LastHeartbeat.IsZero() {
return "catching_up"
}
// LSN lag check: if primary has progress and replica is behind.
if primaryWALHeadLSN > 0 && ri.WALHeadLSN < primaryWALHeadLSN {
lag := primaryWALHeadLSN - ri.WALHeadLSN
if lag > 1000 {
return "needs_rebuild"
}
if lag > 0 {
return "catching_up"
}
}
// Not ready is catching_up.
if !ri.Ready {
return "catching_up"
}
return "keepup"
}
// worseClusterMode returns the more severe of two cluster modes.
// Severity order: needs_rebuild > degraded > catching_up > keepup.
func worseClusterMode(a, b string) string {
if clusterModeRank(b) > clusterModeRank(a) {
return b
}
return a
}
func clusterModeRank(mode string) int {
switch mode {
case "keepup":
return 0
case "catching_up":
return 1
case "degraded":
return 2
case "needs_rebuild":
return 3
default:
return 0
}
}
// computeVolumeMode returns the normalized mode string for CP13-9.