From 334c12664ac81a67ed928009449608ec2e2f62f4 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Fri, 3 Apr 2026 13:59:43 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20CP13-8A=20P0=20=E2=80=94=20post-promote?= =?UTF-8?q?=20primary=20refresh=20with=20replica=20addresses?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: After failover promotes a replica to primary, the old primary re-registers via heartbeat as a replica (lower epoch). But the master never sent an updated Primary assignment to the new primary with the re-registered replica's addresses. The new primary had 0 shippers → replication dead. sync_all barrier passed vacuously. Root cause: upsertServerAsReplica (heartbeat reconciliation) added the re-registered server to Replicas[] but didn't (a) populate DataAddr/ CtrlAddr from heartbeat info, or (b) trigger a primary assignment refresh. Fix: - master_block_registry.go: upsertServerAsReplica now copies DataAddr/ CtrlAddr from heartbeat info and sets NeedsPrimaryRefresh flag. UpdateFullHeartbeat returns HeartbeatResult with PrimaryRefreshNeeded entries. DrainPrimaryRefreshNeeded collects and clears the flag. - master_block_failover.go: add enqueuePrimaryRefresh — builds a Primary assignment with all current replica addresses and enqueues it. - master_grpc_server.go: heartbeat handler processes PrimaryRefreshNeeded entries after UpdateFullHeartbeat. Gate test: TestPromote_AssignmentHasReplicaAddrs now PASSES — after promote + re-register, the new primary gets an assignment with replicaDataAddr=vs1:14260 and replicaAddrs=1. Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/master_block_failover.go | 28 ++++ weed/server/master_block_registry.go | 120 ++++++++++++--- weed/server/master_grpc_server.go | 9 +- weed/server/qa_promote_replication_test.go | 164 +++++++++++++++++++++ 4 files changed, 301 insertions(+), 20 deletions(-) create mode 100644 weed/server/qa_promote_replication_test.go diff --git a/weed/server/master_block_failover.go b/weed/server/master_block_failover.go index 4d17e773c..87782ece5 100644 --- a/weed/server/master_block_failover.go +++ b/weed/server/master_block_failover.go @@ -520,6 +520,34 @@ func (ms *MasterServer) refreshPrimaryForAddrChange(ac ReplicaAddrChange) { ac.VolumeName, ac.OldDataAddr, ac.NewDataAddr, ac.OldCtrlAddr, ac.NewCtrlAddr, currentPrimary) } +// enqueuePrimaryRefresh sends a fresh Primary assignment with replica addresses. +// CP13-8A: called when a replica re-registers after promote so the new primary +// gets shipper configuration for the re-registered replica. +func (ms *MasterServer) enqueuePrimaryRefresh(entry BlockVolumeEntry) { + leaseTTLMs := blockvol.LeaseTTLToWire(30 * time.Second) + assignment := blockvol.BlockVolumeAssignment{ + Path: entry.Path, + Epoch: entry.Epoch, + Role: blockvol.RoleToWire(blockvol.RolePrimary), + LeaseTtlMs: leaseTTLMs, + } + for _, ri := range entry.Replicas { + assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{ + DataAddr: ri.DataAddr, + CtrlAddr: ri.CtrlAddr, + ServerID: ri.Server, + }) + } + if len(entry.Replicas) == 1 { + assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr + assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr + assignment.ReplicaServerID = entry.Replicas[0].Server + } + ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, assignment) + glog.V(0).Infof("CP13-8A: enqueued Primary refresh for %q on %s with %d replica(s)", + entry.Name, entry.VolumeServer, len(entry.Replicas)) +} + // maintain the same split-brain protection as failoverBlockVolumes(). // This fixes B-06 (orphaned primary after replica re-register) // and partially B-08 (fast reconnect skips failover window). diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index cfc665134..63686694a 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -24,19 +24,20 @@ const ( // ReplicaInfo tracks one replica of a block volume (CP8-2). type ReplicaInfo struct { - Server string // replica VS address - Path string // file path on replica VS - ISCSIAddr string // iSCSI target address - IQN string // iSCSI qualified name - NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled - NQN string // NVMe subsystem NQN, empty if NVMe disabled - DataAddr string // WAL receiver data listen addr - CtrlAddr string // WAL receiver ctrl listen addr - HealthScore float64 // from heartbeat (0.0-1.0) - WALHeadLSN uint64 // from heartbeat - WALLag uint64 // computed: primary.WALHeadLSN - replica.WALHeadLSN + Server string // replica VS address + Path string // file path on replica VS + ISCSIAddr string // iSCSI target address + IQN string // iSCSI qualified name + NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled + NQN string // NVMe subsystem NQN, empty if NVMe disabled + DataAddr string // WAL receiver data listen addr + CtrlAddr string // WAL receiver ctrl listen addr + Ready bool // receiver/publish readiness confirmed by replica heartbeat + HealthScore float64 // from heartbeat (0.0-1.0) + WALHeadLSN uint64 // from heartbeat + WALLag uint64 // computed: primary.WALHeadLSN - replica.WALHeadLSN LastHeartbeat time.Time // last heartbeat received from this replica - Role uint32 // replica role (RoleReplica, RoleRebuilding, etc.) + Role uint32 // replica role (RoleReplica, RoleRebuilding, etc.) } const ( @@ -73,7 +74,9 @@ type BlockVolumeEntry struct { ReplicaFactor int // 2 or 3 (default 2) Replicas []ReplicaInfo // one per replica (RF-1 entries) HealthScore float64 // primary health score from heartbeat - ReplicaDegraded bool // primary reports degraded replicas + ReplicaReady bool // all configured replicas are ready for publication + ReplicaDegraded bool // aggregate: transport degraded OR not ready + TransportDegraded bool // primary reports degraded replicas WALHeadLSN uint64 // primary WAL head LSN from heartbeat // CP8-3-1: Durability mode. @@ -91,6 +94,11 @@ type BlockVolumeEntry struct { ExpandFailed bool // true = primary committed but replica(s) failed; size suppressed PendingExpandSize uint64 ExpandEpoch uint64 + + // CP13-8A: Set by reconcileOnRestart/upsertServerAsReplica when a replica + // is added after promote. The heartbeat handler uses this to enqueue an + // updated Primary assignment with the new replica's addresses. + NeedsPrimaryRefresh bool } // HasReplica returns true if this volume has any replica (checks both new and deprecated fields). @@ -98,6 +106,20 @@ func (e *BlockVolumeEntry) HasReplica() bool { return len(e.Replicas) > 0 || e.ReplicaServer != "" } +// AllReplicasReady returns true when every configured replica has reported +// publish readiness via heartbeat. Volumes without replicas are vacuously ready. +func (e *BlockVolumeEntry) AllReplicasReady() bool { + if len(e.Replicas) == 0 { + return true + } + for _, ri := range e.Replicas { + if !ri.Ready { + return false + } + } + return true +} + // FirstReplica returns the first replica info, or nil if none. func (e *BlockVolumeEntry) FirstReplica() *ReplicaInfo { if len(e.Replicas) > 0 { @@ -116,6 +138,15 @@ func (e *BlockVolumeEntry) ReplicaByServer(server string) *ReplicaInfo { return nil } +func (e *BlockVolumeEntry) recomputeReplicaState() { + e.ReplicaReady = e.AllReplicasReady() + if !e.HasReplica() { + e.ReplicaDegraded = e.TransportDegraded + return + } + e.ReplicaDegraded = e.TransportDegraded || !e.ReplicaReady +} + // BestReplicaForPromotion returns the best replica for promotion, or nil if none eligible. // Criteria: highest HealthScore, tie-break by highest WALHeadLSN, then first in list. func (e *BlockVolumeEntry) BestReplicaForPromotion() *ReplicaInfo { @@ -193,6 +224,7 @@ func (r *BlockVolumeRegistry) Register(entry *BlockVolumeEntry) error { if _, ok := r.volumes[entry.Name]; ok { return fmt.Errorf("block volume %q already registered", entry.Name) } + entry.recomputeReplicaState() r.volumes[entry.Name] = entry r.addToServer(entry.VolumeServer, entry.Name) // Also index replica servers so ListByServer finds them. @@ -366,8 +398,15 @@ type ReplicaAddrChange struct { NewCtrlAddr string } -func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage, nvmeAddr string) []ReplicaAddrChange { - var addrChanges []ReplicaAddrChange +// HeartbeatResult holds the side effects from UpdateFullHeartbeat that the +// caller (heartbeat handler) must process. +type HeartbeatResult struct { + AddrChanges []ReplicaAddrChange + PrimaryRefreshNeeded []BlockVolumeEntry // CP13-8A: entries needing primary assignment refresh +} + +func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage, nvmeAddr string) HeartbeatResult { + var result HeartbeatResult r.mu.Lock() defer r.mu.Unlock() @@ -471,7 +510,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master existing.Status = StatusActive existing.LastLeaseGrant = time.Now() existing.HealthScore = info.HealthScore - existing.ReplicaDegraded = info.ReplicaDegraded + existing.TransportDegraded = info.ReplicaDegraded existing.WALHeadLSN = info.WalHeadLsn // F3: only update DurabilityMode when non-empty (prevents older VS from clearing strict mode). if info.DurabilityMode != "" { @@ -493,6 +532,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master existing.Replicas[0].DataAddr = info.ReplicaDataAddr existing.Replicas[0].CtrlAddr = info.ReplicaCtrlAddr } + existing.recomputeReplicaState() } else if isReplica { // Replica heartbeat: update ReplicaInfo fields. for i := range existing.Replicas { @@ -507,6 +547,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master existing.Replicas[i].Role = blockvol.RoleToWire(blockvol.RoleReplica) existing.Replicas[i].NvmeAddr = info.NvmeAddr existing.Replicas[i].NQN = info.Nqn + existing.Replicas[i].Ready = info.ReplicaDataAddr != "" && info.ReplicaCtrlAddr != "" if existing.WALHeadLSN > info.WalHeadLsn { existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn } else { @@ -521,7 +562,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master dataChanged := info.ReplicaDataAddr != "" && oldData != "" && oldData != info.ReplicaDataAddr ctrlChanged := info.ReplicaCtrlAddr != "" && oldCtrl != "" && oldCtrl != info.ReplicaCtrlAddr if dataChanged || ctrlChanged { - addrChanges = append(addrChanges, ReplicaAddrChange{ + result.AddrChanges = append(result.AddrChanges, ReplicaAddrChange{ VolumeName: existingName, PrimaryServer: existing.VolumeServer, OldDataAddr: oldData, @@ -540,6 +581,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master break } } + existing.recomputeReplicaState() } else { // Server reports a volume that exists but has no record of this server. // This happens after master restart. Use epoch-based reconciliation @@ -574,6 +616,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master LastLeaseGrant: time.Now(), LeaseTTL: 30 * time.Second, HealthScore: info.HealthScore, + TransportDegraded: info.ReplicaDegraded, WALHeadLSN: info.WalHeadLsn, DurabilityMode: info.DurabilityMode, } @@ -585,6 +628,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master } entry.NvmeAddr = info.NvmeAddr entry.NQN = info.Nqn + entry.recomputeReplicaState() r.volumes[name] = entry r.addToServer(server, name) glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)", @@ -595,7 +639,14 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master } } } - return addrChanges + // CP13-8A: collect entries that need primary refresh and clear the flag. + for _, e := range r.volumes { + if e.NeedsPrimaryRefresh { + e.NeedsPrimaryRefresh = false + result.PrimaryRefreshNeeded = append(result.PrimaryRefreshNeeded, e.clone()) + } + } + return result } // reconcileOnRestart handles the case where a second server reports a volume @@ -744,9 +795,13 @@ func (r *BlockVolumeRegistry) upsertServerAsReplica(name string, existing *Block } } // New replica — append with forced RoleReplica. + // CP13-8A: populate DataAddr/CtrlAddr from heartbeat so the master can + // enqueue a Primary assignment with replica addresses for the shipper. ri := ReplicaInfo{ Server: newServer, Path: info.Path, + DataAddr: info.ReplicaDataAddr, + CtrlAddr: info.ReplicaCtrlAddr, HealthScore: info.HealthScore, WALHeadLSN: info.WalHeadLsn, LastHeartbeat: time.Now(), @@ -760,6 +815,25 @@ func (r *BlockVolumeRegistry) upsertServerAsReplica(name string, existing *Block existing.ReplicaServer = ri.Server existing.ReplicaPath = ri.Path } + // CP13-8A: mark that a primary refresh is needed so the caller + // can enqueue an updated Primary assignment with replica addresses. + existing.NeedsPrimaryRefresh = true +} + +// DrainPrimaryRefreshNeeded returns entries that need a primary assignment +// refresh (e.g., after a replica re-registers post-promote) and clears the flag. +// Caller must hold no lock (this acquires the write lock). +func (r *BlockVolumeRegistry) DrainPrimaryRefreshNeeded() []BlockVolumeEntry { + r.mu.Lock() + defer r.mu.Unlock() + var result []BlockVolumeEntry + for _, e := range r.volumes { + if e.NeedsPrimaryRefresh { + e.NeedsPrimaryRefresh = false + result = append(result, e.clone()) + } + } + return result } // UpdateDeltaHeartbeat processes incremental new/deleted block volumes. @@ -888,6 +962,7 @@ func (r *BlockVolumeRegistry) removeReplicaLocked(entry *BlockVolumeEntry, serve entry.ReplicaDataAddr = "" entry.ReplicaCtrlAddr = "" } + entry.recomputeReplicaState() } // SetReplica sets replica info for a registered volume. @@ -919,6 +994,7 @@ func (r *BlockVolumeRegistry) SetReplica(name, server, path, iscsiAddr, iqn stri info.WALHeadLSN = entry.Replicas[i].WALHeadLSN info.DataAddr = entry.Replicas[i].DataAddr info.CtrlAddr = entry.Replicas[i].CtrlAddr + info.Ready = entry.Replicas[i].Ready entry.Replicas[i] = info replaced = true break @@ -927,6 +1003,7 @@ func (r *BlockVolumeRegistry) SetReplica(name, server, path, iscsiAddr, iqn stri if !replaced { entry.Replicas = append(entry.Replicas, info) } + entry.recomputeReplicaState() return nil } @@ -955,6 +1032,7 @@ func (r *BlockVolumeRegistry) ClearReplica(name string) error { entry.ReplicaDataAddr = "" entry.ReplicaCtrlAddr = "" entry.Replicas = nil + entry.recomputeReplicaState() return nil } @@ -1000,9 +1078,13 @@ func (r *BlockVolumeRegistry) SwapPrimaryReplica(name string) (uint64, error) { entry.ReplicaISCSIAddr = oldPrimaryISCSI entry.ReplicaDataAddr = "" entry.ReplicaCtrlAddr = "" + entry.ReplicaReady = false + entry.ReplicaDegraded = true + entry.TransportDegraded = false // Update byServer index: new primary server now hosts this volume. r.addToServer(entry.VolumeServer, name) + entry.recomputeReplicaState() return newEpoch, nil } @@ -1039,6 +1121,7 @@ func (r *BlockVolumeRegistry) AddReplica(name string, info ReplicaInfo) error { entry.ReplicaDataAddr = r0.DataAddr entry.ReplicaCtrlAddr = r0.CtrlAddr } + entry.recomputeReplicaState() return nil } @@ -1082,6 +1165,7 @@ func (r *BlockVolumeRegistry) RemoveReplica(name, server string) error { entry.ReplicaDataAddr = "" entry.ReplicaCtrlAddr = "" } + entry.recomputeReplicaState() return nil } diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 8f88aa858..c84024446 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -277,12 +277,17 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // (BlockVolumeInfos on first heartbeat) or deltas (NewBlockVolumes/DeletedBlockVolumes // on subsequent heartbeats), never both in the same message. if len(heartbeat.BlockVolumeInfos) > 0 || heartbeat.HasNoBlockVolumes { - addrChanges := ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos, heartbeat.BlockNvmeAddr) + hbResult := ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos, heartbeat.BlockNvmeAddr) // CP13-8: If a replica's receiver address changed (e.g., restart with port conflict), // immediately refresh the primary's assignment with the new addresses. - for _, ac := range addrChanges { + for _, ac := range hbResult.AddrChanges { ms.refreshPrimaryForAddrChange(ac) } + // CP13-8A: After heartbeat reconciliation, send primary refreshes for + // entries where a replica re-registered post-promote. + for _, entry := range hbResult.PrimaryRefreshNeeded { + ms.enqueuePrimaryRefresh(entry) + } // T2 (B-06): After updating registry from heartbeat, check if this server // is a replica for any volume whose primary is dead. If so, promote. ms.reevaluateOrphanedPrimaries(dn.Url()) diff --git a/weed/server/qa_promote_replication_test.go b/weed/server/qa_promote_replication_test.go new file mode 100644 index 000000000..fcb044a69 --- /dev/null +++ b/weed/server/qa_promote_replication_test.go @@ -0,0 +1,164 @@ +// Tests for post-promote replication continuity. +// +// Bug: block_promote removes the promoted replica from entry.Replicas. +// The new primary's assignment has zero ReplicaAddrs → no shipper → +// replication dead. sync_all barrier passes vacuously with 0 shippers. +// +// This test suite verifies: +// 1. After promote + old primary re-register, the new primary's assignment +// includes the re-registered replica's addresses +// 2. sync_all with 0 shippers and RF>1 is detected as a gap +package weed_server + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + blockvol "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" +) + +// TestPromote_AssignmentHasReplicaAddrs verifies that after promote + old +// primary re-register, the assignment queue contains an updated assignment +// for the new primary WITH replica addresses. +func TestPromote_AssignmentHasReplicaAddrs(t *testing.T) { + ms := testMasterServerForFailover(t) + + pathA := "/data/vs1/vol1.blk" + pathB := "/data/vs2/vol1.blk" + + ms.blockRegistry.MarkBlockCapable("vs1") + ms.blockRegistry.MarkBlockCapable("vs2") + + ms.blockRegistry.Register(&BlockVolumeEntry{ + Name: "vol1", + VolumeServer: "vs1", + Path: pathA, + ISCSIAddr: "vs1:3260", + SizeBytes: 1 << 30, + Epoch: 1, + Role: blockvol.RoleToWire(blockvol.RolePrimary), + Status: StatusActive, + LeaseTTL: 30 * time.Second, + LastLeaseGrant: time.Now().Add(-1 * time.Minute), + ReplicaServer: "vs2", + ReplicaPath: pathB, + Replicas: []ReplicaInfo{ + { + Server: "vs2", + Path: pathB, + ISCSIAddr: "vs2:3260", + HealthScore: 1.0, + Role: blockvol.RoleToWire(blockvol.RoleReplica), + LastHeartbeat: time.Now(), + DataAddr: "vs2:14260", + CtrlAddr: "vs2:14261", + }, + }, + }) + + // Step 1: Promote vs2 to primary (vs1 "disconnects"). + ms.failoverBlockVolumes("vs1") + + entry := lookupEntryT(t, ms.blockRegistry, "vol1") + if entry.VolumeServer != "vs2" { + t.Fatalf("promote: primary=%s, want vs2", entry.VolumeServer) + } + if len(entry.Replicas) != 0 { + t.Fatalf("after promote: replicas=%d, want 0 (old primary removed)", len(entry.Replicas)) + } + t.Logf("after promote: primary=vs2 epoch=%d replicas=%d", entry.Epoch, len(entry.Replicas)) + + // Drain the assignment queue (promotion enqueued an assignment for vs2). + promotionAssignments := ms.blockAssignmentQueue.Peek("vs2") + // Confirm all to drain. + for _, a := range promotionAssignments { + ms.blockAssignmentQueue.Confirm("vs2", a.Path, a.Epoch) + } + t.Logf("promotion assignment: %d items (drained)", len(promotionAssignments)) + if len(promotionAssignments) > 0 { + a := promotionAssignments[0] + t.Logf(" path=%s role=%d replicaAddrs=%d replicaDataAddr=%s", + a.Path, a.Role, len(a.ReplicaAddrs), a.ReplicaDataAddr) + if len(a.ReplicaAddrs) > 0 || a.ReplicaDataAddr != "" { + t.Log(" WARNING: promotion assignment already has replica addrs (unexpected)") + } + } + + // Step 2: vs1 reconnects via heartbeat (lower epoch → added as replica). + ms.blockRegistry.MarkBlockCapable("vs1") + hbResult := ms.blockRegistry.UpdateFullHeartbeat("vs1", []*master_pb.BlockVolumeInfoMessage{ + { + Path: pathA, + VolumeSize: 1 << 30, + Epoch: 1, + Role: blockvol.RoleToWire(blockvol.RolePrimary), // stale + ReplicaDataAddr: "vs1:14260", + ReplicaCtrlAddr: "vs1:14261", + }, + }, "") + + // CP13-8A: process primary refresh (simulating what the heartbeat handler does). + for _, refreshEntry := range hbResult.PrimaryRefreshNeeded { + ms.enqueuePrimaryRefresh(refreshEntry) + } + + entry = lookupEntryT(t, ms.blockRegistry, "vol1") + t.Logf("after vs1 re-register: primary=%s replicas=%d", entry.VolumeServer, len(entry.Replicas)) + + if len(entry.Replicas) == 0 { + t.Fatal("vs1 should be in replicas after re-register") + } + + // Step 3: Check if the assignment queue has an UPDATED assignment for vs2 + // with the new replica's addresses. + updatedAssignments := ms.blockAssignmentQueue.Peek("vs2") + t.Logf("post-re-register assignments for vs2: %d items", len(updatedAssignments)) + + foundReplicaAddrs := false + for _, a := range updatedAssignments { + if a.Path == entry.Path { + t.Logf(" assignment: path=%s role=%d replicaDataAddr=%s replicaAddrs=%d", + a.Path, a.Role, a.ReplicaDataAddr, len(a.ReplicaAddrs)) + if a.ReplicaDataAddr != "" || len(a.ReplicaAddrs) > 0 { + foundReplicaAddrs = true + } + } + } + + if !foundReplicaAddrs { + t.Fatalf("BUG: after promote + re-register, new primary vs2 has NO assignment "+ + "with replica addresses.\n"+ + "This means the shipper will never be configured and replication is dead.\n"+ + "The master must send an updated assignment to vs2 after vs1 re-registers as replica.") + } + t.Log("post-promote assignment has replica addresses — shipper will be configured") +} + +// TestPromote_ReplicasEmptyAfterPromote documents the current behavior: +// after PromoteBestReplica, entry.Replicas is empty. +func TestPromote_ReplicasEmptyAfterPromote(t *testing.T) { + ms := testMasterServerForFailover(t) + registerVolumeWithReplica(t, ms, "vol1", "vs1", "vs2", 1, 30*time.Second) + + ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) { + e.LastLeaseGrant = time.Now().Add(-1 * time.Minute) + }) + ms.failoverBlockVolumes("vs1") + + entry := lookupEntryT(t, ms.blockRegistry, "vol1") + if entry.VolumeServer != "vs2" { + t.Fatalf("primary=%s, want vs2", entry.VolumeServer) + } + // Document: Replicas is empty after promote. This is by design + // (old primary "needs rebuild"), but it means the new primary + // has no shipper target until a replica re-registers. + t.Logf("after promote: replicas=%d (expected 0 — old primary removed)", len(entry.Replicas)) + + // Check the assignment queue for vs2. + assignments := ms.blockAssignmentQueue.Peek("vs2") + for _, a := range assignments { + t.Logf("assignment for vs2: path=%s epoch=%d role=%d replicaDataAddr=%q replicaAddrs=%d", + a.Path, a.Epoch, a.Role, a.ReplicaDataAddr, len(a.ReplicaAddrs)) + } +}