diff --git a/weed/server/master_block_cluster_mode_test.go b/weed/server/master_block_cluster_mode_test.go
new file mode 100644
index 000000000..78a69d2b2
--- /dev/null
+++ b/weed/server/master_block_cluster_mode_test.go
@@ -0,0 +1,237 @@
+package weed_server
+
+import (
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
+)
+
+func TestT5_AllReplicasHealthy_Keepup(t *testing.T) { // replica at the primary's LSN with a fresh heartbeat must report "keepup"
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-keepup", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-keepup.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
+		WALHeadLSN: 100,
+		Replicas: []ReplicaInfo{{
+			Server: "replica:8080", Path: "/data/vol-crm-keepup.blk",
+			HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
+			Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
+		}},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-keepup") // NOTE(review): second result is discarded; presumably a found flag — confirm entry cannot be nil here
+	if entry.ClusterReplicationMode != "keepup" {
+		t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "keepup")
+	}
+}
+
+func TestT5_ReplicaBehind_CatchingUp(t *testing.T) { // replica LSN 90 against primary LSN 100 must report "catching_up"
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-catch", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-catch.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
+		WALHeadLSN: 100,
+		Replicas: []ReplicaInfo{{
+			Server: "replica:8080", Path: "/data/vol-crm-catch.blk",
+			HealthScore: 1.0, WALHeadLSN: 90, Ready: true,
+			Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
+		}},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-catch")
+	if entry.ClusterReplicationMode != "catching_up" {
+		t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "catching_up")
+	}
+}
+
+func TestT5_StaleHeartbeat_Degraded(t *testing.T) { // replica whose last heartbeat is 120s old must report "degraded"
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-degrade", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-degrade.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
+		WALHeadLSN: 100,
+		Replicas: []ReplicaInfo{{
+			Server: "replica:8080", Path: "/data/vol-crm-degrade.blk",
+			HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
+			Role: blockvol.RoleToWire(blockvol.RoleReplica),
+			LastHeartbeat: time.Now().Add(-120 * time.Second), // stale
+		}},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-degrade")
+	if entry.ClusterReplicationMode != "degraded" {
+		t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "degraded")
+	}
+}
+
+func TestT5_UnrecoverableGap_NeedsRebuild(t *testing.T) { // replica LSN 100 against primary LSN 5000 must report "needs_rebuild"
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-rebuild", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-rebuild.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
+		WALHeadLSN: 5000,
+		Replicas: []ReplicaInfo{{
+			Server: "replica:8080", Path: "/data/vol-crm-rebuild.blk",
+			HealthScore: 1.0, WALHeadLSN: 100, Ready: true, // lag > 1000
+			Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
+		}},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-rebuild")
+	if entry.ClusterReplicationMode != "needs_rebuild" {
+		t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "needs_rebuild")
+	}
+}
+
+func TestT5_RebuildingRole_NeedsRebuild(t *testing.T) { // replica in RoleRebuilding must report "needs_rebuild" even with matching LSN
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-rebuilding", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-rebuilding.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
+		WALHeadLSN: 100,
+		Replicas: []ReplicaInfo{{
+			Server: "replica:8080", Path: "/data/vol-crm-rebuilding.blk",
+			HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
+			Role: blockvol.RoleToWire(blockvol.RoleRebuilding), LastHeartbeat: time.Now(),
+		}},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-rebuilding")
+	if entry.ClusterReplicationMode != "needs_rebuild" {
+		t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "needs_rebuild")
+	}
+}
+
+func TestT5_NoReplicas_NoReplicasMode(t *testing.T) { // entry registered without any Replicas must report "no_replicas"
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-none", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-none.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 1,
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-none")
+	if entry.ClusterReplicationMode != "no_replicas" {
+		t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "no_replicas")
+	}
+}
+
+func TestT5_ClusterReplicationModeDistinctFromEngineProjectionMode(t *testing.T) { // EngineProjectionMode set at registration must survive while ClusterReplicationMode is computed separately
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-distinct", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-distinct.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
+		WALHeadLSN: 100,
+		// EngineProjectionMode says "publish_healthy" (VS-local).
+		EngineProjectionMode: "publish_healthy",
+		HasEngineProjectionMode: true,
+		Replicas: []ReplicaInfo{{
+			Server: "replica:8080", Path: "/data/vol-crm-distinct.blk",
+			HealthScore: 1.0, WALHeadLSN: 50, Ready: true, // behind → catching_up
+			Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
+		}},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-distinct")
+	// EngineProjectionMode is VS-local: "publish_healthy"
+	if entry.EngineProjectionMode != "publish_healthy" {
+		t.Fatalf("EngineProjectionMode=%q, want %q", entry.EngineProjectionMode, "publish_healthy")
+	}
+	// ClusterReplicationMode is master-computed: "catching_up" because replica is behind.
+	if entry.ClusterReplicationMode != "catching_up" {
+		t.Fatalf("ClusterReplicationMode=%q, want %q", entry.ClusterReplicationMode, "catching_up")
+	}
+	// They must be different — proving the two are independent.
+	if entry.EngineProjectionMode == entry.ClusterReplicationMode {
+		t.Fatal("EngineProjectionMode and ClusterReplicationMode should differ in this scenario")
+	}
+}
+
+func TestT5_HeartbeatUpdatesClusterReplicationMode(t *testing.T) { // a primary heartbeat that raises WalHeadLsn to 200 must move the mode from "keepup" to "catching_up"
+	r := NewBlockVolumeRegistry()
+	r.MarkBlockCapable("primary:8080")
+	r.MarkBlockCapable("replica:8080")
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-hb", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-hb.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 2,
+		WALHeadLSN: 100,
+		Replicas: []ReplicaInfo{{
+			Server: "replica:8080", Path: "/data/vol-crm-hb-replica.blk",
+			HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
+			Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
+		}},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	// Initial state: keepup.
+	entry, _ := r.Lookup("vol-crm-hb")
+	if entry.ClusterReplicationMode != "keepup" {
+		t.Fatalf("initial ClusterReplicationMode=%q, want keepup", entry.ClusterReplicationMode)
+	}
+
+	// Primary heartbeat with higher WALHeadLSN → replica now behind.
+	r.UpdateFullHeartbeat("primary:8080", []*master_pb.BlockVolumeInfoMessage{{
+		Path: "/data/vol-crm-hb.blk",
+		Role: blockvol.RoleToWire(blockvol.RolePrimary),
+		WalHeadLsn: 200,
+	}}, "")
+
+	entry, _ = r.Lookup("vol-crm-hb")
+	if entry.ClusterReplicationMode != "catching_up" {
+		t.Fatalf("after primary advance: ClusterReplicationMode=%q, want catching_up", entry.ClusterReplicationMode)
+	}
+}
+
+func TestT5_WorstReplicaDominates(t *testing.T) { // one healthy replica plus one RoleRebuilding replica must report "needs_rebuild"
+	r := NewBlockVolumeRegistry()
+	if err := r.Register(&BlockVolumeEntry{
+		Name: "vol-crm-worst", VolumeServer: "primary:8080",
+		Path: "/data/vol-crm-worst.blk", Status: StatusActive,
+		Role: blockvol.RoleToWire(blockvol.RolePrimary), ReplicaFactor: 3,
+		WALHeadLSN: 100,
+		Replicas: []ReplicaInfo{
+			{
+				Server: "replica1:8080", Path: "/data/vol-crm-worst.blk",
+				HealthScore: 1.0, WALHeadLSN: 100, Ready: true, // keepup
+				Role: blockvol.RoleToWire(blockvol.RoleReplica), LastHeartbeat: time.Now(),
+			},
+			{
+				Server: "replica2:8080", Path: "/data/vol-crm-worst.blk",
+				HealthScore: 1.0, WALHeadLSN: 100, Ready: true,
+				Role: blockvol.RoleToWire(blockvol.RoleRebuilding), LastHeartbeat: time.Now(), // needs_rebuild
+			},
+		},
+	}); err != nil {
+		t.Fatalf("register: %v", err)
+	}
+
+	entry, _ := r.Lookup("vol-crm-worst")
+	// One replica is keepup, one is needs_rebuild → worst dominates.
+	if entry.ClusterReplicationMode != "needs_rebuild" {
+		t.Fatalf("ClusterReplicationMode=%q, want needs_rebuild (worst dominates)", entry.ClusterReplicationMode)
+	}
+}
diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go
index a98b63d2f..e7806c269 100644
--- a/weed/server/master_block_registry.go
+++ b/weed/server/master_block_registry.go
@@ -95,6 +95,12 @@ type BlockVolumeEntry struct {
 	VolumeMode string // "allocated_only", "bootstrap_pending", "publish_healthy", "degraded", "needs_rebuild"
 	WALHeadLSN uint64 // primary WAL head LSN from heartbeat
 
+	// T5: Master-owned cluster-level replication health judgment.
+	// Distinct from EngineProjectionMode (VS-local) and VolumeMode (legacy).
+	// Computed from multi-replica facts: replica LSN lag, heartbeat freshness,
+	// role state, and recovery phase. Monotonic: worst replica state dominates.
+	ClusterReplicationMode string // "keepup", "catching_up", "degraded", "needs_rebuild", "no_replicas"
+
 	// CP8-3-1: Durability mode.
 	DurabilityMode string // "best_effort", "sync_all", "sync_quorum"
 
@@ -164,6 +170,96 @@ func (e *BlockVolumeEntry) recomputeReplicaState() {
 
 	// CP13-9: compute normalized VolumeMode for external surfaces.
 	e.VolumeMode = e.computeVolumeMode()
+
+	// T5: compute cluster-level replication mode from multi-replica facts.
+	e.ClusterReplicationMode = e.computeClusterReplicationMode()
+}
+
+// computeClusterReplicationMode returns the master-owned cluster-level
+// replication health judgment for the RF2 set. This is distinct from
+// EngineProjectionMode (VS-local engine truth) and VolumeMode (legacy).
+// +// Mode meanings: +// - "no_replicas": RF=1 or no replicas configured (not an RF2 judgment) +// - "keepup": all replicas healthy, LSN within tolerance, heartbeat fresh +// - "catching_up": at least one replica is behind but recoverable +// - "degraded": at least one replica has barrier failure or impaired state +// - "needs_rebuild": at least one replica has unrecoverable gap +// +// Monotonic: worst replica state dominates the cluster mode. +func (e *BlockVolumeEntry) computeClusterReplicationMode() string { + if len(e.Replicas) == 0 { + return "no_replicas" + } + + worst := "keepup" + for _, ri := range e.Replicas { + replicaMode := evaluateReplicaHealth(ri, e.WALHeadLSN) + worst = worseClusterMode(worst, replicaMode) + } + return worst +} + +// evaluateReplicaHealth returns the cluster-level health classification for +// one replica. Does NOT use EngineProjectionMode — computes from registry- +// observed facts only (heartbeat freshness, LSN lag, role). +func evaluateReplicaHealth(ri ReplicaInfo, primaryWALHeadLSN uint64) string { + // Rebuilding role is needs_rebuild. + if blockvol.RoleFromWire(ri.Role) == blockvol.RoleRebuilding { + return "needs_rebuild" + } + + // Stale heartbeat (>60s) is degraded. + if !ri.LastHeartbeat.IsZero() && time.Since(ri.LastHeartbeat) > 60*time.Second { + return "degraded" + } + + // No heartbeat ever received — catching up. + if ri.LastHeartbeat.IsZero() { + return "catching_up" + } + + // LSN lag check: if primary has progress and replica is behind. + if primaryWALHeadLSN > 0 && ri.WALHeadLSN < primaryWALHeadLSN { + lag := primaryWALHeadLSN - ri.WALHeadLSN + if lag > 1000 { + return "needs_rebuild" + } + if lag > 0 { + return "catching_up" + } + } + + // Not ready is catching_up. + if !ri.Ready { + return "catching_up" + } + + return "keepup" +} + +// worseClusterMode returns the more severe of two cluster modes. +// Severity order: needs_rebuild > degraded > catching_up > keepup. 
func worseClusterMode(a, b string) string {
	// Strict comparison: b replaces a only when it is strictly more severe,
	// so equal ranks keep a.
	if clusterModeRank(b) > clusterModeRank(a) {
		return b
	}
	return a
}

// clusterModeRank maps a cluster replication mode to its severity, where a
// larger number is worse: "keepup" = 0, "catching_up" = 1, "degraded" = 2,
// "needs_rebuild" = 3. Any other string, including "no_replicas" and the
// empty string, ranks 0, i.e. the same as "keepup".
func clusterModeRank(mode string) int {
	switch mode {
	case "needs_rebuild":
		return 3
	case "degraded":
		return 2
	case "catching_up":
		return 1
	case "keepup":
		return 0
	default:
		// Unrecognised modes never outrank a known one.
		return 0
	}
}

// computeVolumeMode returns the normalized mode string for CP13-9.