From 013f3e7ccbc9f2bef6498967cc1e85846e82e2c0 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Sun, 5 Apr 2026 18:41:44 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=2020=20T5=20=E2=80=94=20ClusterRe?= =?UTF-8?q?plicationMode=20on=20master?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ClusterReplicationMode as a distinct master-owned cluster-level replication health judgment, computed from multi-replica facts: replica LSN lag, heartbeat freshness, role state. Monotonic: worst replica state dominates. Modes: "no_replicas" (RF=1), "keepup" (all healthy), "catching_up" (replica behind but recoverable), "degraded" (stale heartbeat or barrier failure), "needs_rebuild" (unrecoverable gap or rebuilding role). Distinct from EngineProjectionMode (VS-local engine truth) and VolumeMode (legacy). They answer different questions, live in different fields, have different names. Tests explicitly prove the two can differ without conflict. Computed in recomputeReplicaState() alongside existing VolumeMode. Updated on every heartbeat that touches the entry. 9 tests: keepup, catching_up, stale degraded, LSN gap needs_rebuild, rebuilding role, no_replicas, distinctness from EngineProjectionMode, heartbeat-driven update, worst-replica-dominates (RF3). Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/master_block_cluster_mode_test.go | 237 ++++++++++++++++++ weed/server/master_block_registry.go | 96 +++++++ 2 files changed, 333 insertions(+) create mode 100644 weed/server/master_block_cluster_mode_test.go diff --git a/weed/server/master_block_cluster_mode_test.go b/weed/server/master_block_cluster_mode_test.go new file mode 100644 index 000000000..78a69d2b2 --- /dev/null +++ b/weed/server/master_block_cluster_mode_test.go @@ -0,0 +1,237 @@ +package weed_server + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" +) + +func TestT5_AllReplicasHealthy_Keepup(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-keepup", VolumeServer: "primary:8080", + Path: "/data/vol-crm-keepup.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2, + WALHeadLSN: 100, + Replicas: []ReplicaInfo{{ + Server: "replica:8080", Path: "/data/vol-crm-keepup.blk", + HealthScore: 1.0, WALHeadLSN: 100, Ready: true, + Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(), + }}, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-keepup") + if entry.ClusterReplicationMode != "keepup" { + t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "keepup") + } +} + +func TestT5_ReplicaBehind_CatchingUp(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-catch", VolumeServer: "primary:8080", + Path: "/data/vol-crm-catch.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2, + WALHeadLSN: 100, + Replicas: []ReplicaInfo{{ + Server: "replica:8080", Path: "/data/vol-crm-catch.blk", + HealthScore: 1.0, WALHeadLSN: 90, Ready: true, + Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(), + }}, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-catch") + if entry.ClusterReplicationMode != "catching_up" { + t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "catching_up") + } +} + +func TestT5_StaleHeartbeat_Degraded(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-degrade", VolumeServer: "primary:8080", + Path: "/data/vol-crm-degrade.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2, + WALHeadLSN: 100, + Replicas: []ReplicaInfo{{ + Server: "replica:8080", Path: "/data/vol-crm-degrade.blk", + HealthScore: 1.0, WALHeadLSN: 100, Ready: true, + Role: blockvol.RoleToWire(blockvol.RoleReplica), + LastHeartbeat: time.Now().Add(-120 * time.Second), // stale + }}, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-degrade") + if entry.ClusterReplicationMode != "degraded" { + t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "degraded") + } +} + +func TestT5_UnrecoverableGap_NeedsRebuild(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-rebuild", VolumeServer: "primary:8080", + Path: "/data/vol-crm-rebuild.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2, + WALHeadLSN: 5000, + Replicas: []ReplicaInfo{{ + Server: "replica:8080", Path: "/data/vol-crm-rebuild.blk", + HealthScore: 1.0, WALHeadLSN: 100, Ready: true, // lag > 1000 + Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(), + }}, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-rebuild") + if entry.ClusterReplicationMode != "needs_rebuild" { + t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "needs_rebuild") + } +} + +func TestT5_RebuildingRole_NeedsRebuild(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-rebuilding", VolumeServer: "primary:8080", + Path: "/data/vol-crm-rebuilding.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2, + WALHeadLSN: 100, + Replicas: []ReplicaInfo{{ + Server: "replica:8080", Path: "/data/vol-crm-rebuilding.blk", + HealthScore: 1.0, WALHeadLSN: 100, Ready: true, + Role: blockvol.RoleToWire(blockvol.RoleRebuilding), LastHeartbeat: time.Now(), + }}, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-rebuilding") + if entry.ClusterReplicationMode != "needs_rebuild" { + t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "needs_rebuild") + } +} + +func TestT5_NoReplicas_NoReplicasMode(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-none", VolumeServer: "primary:8080", + Path: "/data/vol-crm-none.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 1, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-none") + if entry.ClusterReplicationMode != "no_replicas" { + t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "no_replicas") + } +} + +func TestT5_ClusterReplicationModeDistinctFromEngineProjectionMode(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-distinct", VolumeServer: "primary:8080", + Path: "/data/vol-crm-distinct.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2, + WALHeadLSN: 100, + // EngineProjectionMode says "publish_healthy" (VS-local). + EngineProjectionMode: "publish_healthy", + HasEngineProjectionMode: true, + Replicas: []ReplicaInfo{{ + Server: "replica:8080", Path: "/data/vol-crm-distinct.blk", + HealthScore: 1.0, WALHeadLSN: 50, Ready: true, // behind → catching_up + Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(), + }}, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-distinct") + // EngineProjectionMode is VS-local: "publish_healthy" + if entry.EngineProjectionMode != "publish_healthy" { + t.Fatalf("EngineProjectionMode=%q, want %q", entry.EngineProjectionMode, "publish_healthy") + } + // ClusterReplicationMode is master-computed: "catching_up" because replica is behind. + if entry.ClusterReplicationMode != "catching_up" { + t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "catching_up") + } + // They must be different — proving the two are independent. + if entry.EngineProjectionMode == entry.ClusterReplicationMode { + t.Fatal("EngineProjectionMode and ClusterReplicationMode should differ in this scenario") + } +} + +func TestT5_HeartbeatUpdatesClusterReplicationMode(t *testing.T) { + r := NewBlockVolumeRegistry() + r.MarkBlockCapable("primary:8080") + r.MarkBlockCapable("replica:8080") + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-hb", VolumeServer: "primary:8080", + Path: "/data/vol-crm-hb.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2, + WALHeadLSN: 100, + Replicas: []ReplicaInfo{{ + Server: "replica:8080", Path: "/data/vol-crm-hb-replica.blk", + HealthScore: 1.0, WALHeadLSN: 100, Ready: true, + Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(), + }}, + }); err != nil { + t.Fatalf("register: %v", err) + } + + // Initial state: keepup. + entry, _ := r.Lookup("vol-crm-hb") + if entry.ClusterReplicationMode != "keepup" { + t.Fatalf("initial ClusterReplicationMode=%q, want keepup", entry.ClusterReplicationMode) + } + + // Primary heartbeat with higher WALHeadLSN → replica now behind. + r.UpdateFullHeartbeat("primary:8080", []*master_pb.BlockVolumeInfoMessage{{ + Path: "/data/vol-crm-hb.blk", + Role: blockvol.RoleToWire(blockvol.RolePrimary), + WalHeadLsn: 200, + }}, "") + + entry, _ = r.Lookup("vol-crm-hb") + if entry.ClusterReplicationMode != "catching_up" { + t.Fatalf("after primary advance: ClusterReplicationMode=%q, want catching_up", entry.ClusterReplicationMode) + } +} + +func TestT5_WorstReplicaDominates(t *testing.T) { + r := NewBlockVolumeRegistry() + if err := r.Register(&BlockVolumeEntry{ + Name: "vol-crm-worst", VolumeServer: "primary:8080", + Path: "/data/vol-crm-worst.blk", Status: StatusActive, + Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 3, + WALHeadLSN: 100, + Replicas: []ReplicaInfo{ + { + Server: "replica1:8080", Path: "/data/vol-crm-worst.blk", + HealthScore: 1.0, WALHeadLSN: 100, Ready: true, // keepup + Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(), + }, + { + Server: "replica2:8080", Path: "/data/vol-crm-worst.blk", + HealthScore: 1.0, WALHeadLSN: 100, Ready: true, + Role: blockvol.RoleToWire(blockvol.RoleRebuilding), LastHeartbeat: time.Now(), // needs_rebuild + }, + }, + }); err != nil { + t.Fatalf("register: %v", err) + } + + entry, _ := r.Lookup("vol-crm-worst") + // One replica is keepup, one is needs_rebuild → worst dominates. + if entry.ClusterReplicationMode != "needs_rebuild" { + t.Fatalf("ClusterReplicationMode=%q, want needs_rebuild (worst dominates)", entry.ClusterReplicationMode) + } +} diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index a98b63d2f..e7806c269 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -95,6 +95,12 @@ type BlockVolumeEntry struct { VolumeMode string // "allocated_only", "bootstrap_pending", "publish_healthy", "degraded", "needs_rebuild" WALHeadLSN uint64 // primary WAL head LSN from heartbeat + // T5: Master-owned cluster-level replication health judgment. + // Distinct from EngineProjectionMode (VS-local) and VolumeMode (legacy). + // Computed from multi-replica facts: replica LSN lag, heartbeat freshness, + // role state, and recovery phase. Monotonic: worst replica state dominates. + ClusterReplicationMode string // "keepup", "catching_up", "degraded", "needs_rebuild", "no_replicas" + // CP8-3-1: Durability mode. DurabilityMode string // "best_effort", "sync_all", "sync_quorum" @@ -164,6 +170,96 @@ func (e *BlockVolumeEntry) recomputeReplicaState() { // CP13-9: compute normalized VolumeMode for external surfaces. e.VolumeMode = e.computeVolumeMode() + + // T5: compute cluster-level replication mode from multi-replica facts. + e.ClusterReplicationMode = e.computeClusterReplicationMode() +} + +// computeClusterReplicationMode returns the master-owned cluster-level +// replication health judgment for the RF2 set. This is distinct from +// EngineProjectionMode (VS-local engine truth) and VolumeMode (legacy). +// +// Mode meanings: +// - "no_replicas": RF=1 or no replicas configured (not an RF2 judgment) +// - "keepup": all replicas healthy, LSN within tolerance, heartbeat fresh +// - "catching_up": at least one replica is behind but recoverable +// - "degraded": at least one replica has barrier failure or impaired state +// - "needs_rebuild": at least one replica has unrecoverable gap +// +// Monotonic: worst replica state dominates the cluster mode. +func (e *BlockVolumeEntry) computeClusterReplicationMode() string { + if len(e.Replicas) == 0 { + return "no_replicas" + } + + worst := "keepup" + for _, ri := range e.Replicas { + replicaMode := evaluateReplicaHealth(ri, e.WALHeadLSN) + worst = worseClusterMode(worst, replicaMode) + } + return worst +} + +// evaluateReplicaHealth returns the cluster-level health classification for +// one replica. Does NOT use EngineProjectionMode — computes from registry- +// observed facts only (heartbeat freshness, LSN lag, role). +func evaluateReplicaHealth(ri ReplicaInfo, primaryWALHeadLSN uint64) string { + // Rebuilding role is needs_rebuild. + if blockvol.RoleFromWire(ri.Role) == blockvol.RoleRebuilding { + return "needs_rebuild" + } + + // Stale heartbeat (>60s) is degraded. + if !ri.LastHeartbeat.IsZero() && time.Since(ri.LastHeartbeat) > 60*time.Second { + return "degraded" + } + + // No heartbeat ever received — catching up. + if ri.LastHeartbeat.IsZero() { + return "catching_up" + } + + // LSN lag check: if primary has progress and replica is behind. + if primaryWALHeadLSN > 0 && ri.WALHeadLSN < primaryWALHeadLSN { + lag := primaryWALHeadLSN - ri.WALHeadLSN + if lag > 1000 { + return "needs_rebuild" + } + if lag > 0 { + return "catching_up" + } + } + + // Not ready is catching_up. + if !ri.Ready { + return "catching_up" + } + + return "keepup" +} + +// worseClusterMode returns the more severe of two cluster modes. +// Severity order: needs_rebuild > degraded > catching_up > keepup. +func worseClusterMode(a, b string) string { + if clusterModeRank(b) > clusterModeRank(a) { + return b + } + return a +} + +func clusterModeRank(mode string) int { + switch mode { + case "keepup": + return 0 + case "catching_up": + return 1 + case "degraded": + return 2 + case "needs_rebuild": + return 3 + default: + return 0 + } } // computeVolumeMode returns the normalized mode string for CP13-9.