mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-30 05:30:23 +00:00
fix: CP13-8A P0 — post-promote primary refresh with replica addresses
Bug: After failover promotes a replica to primary, the old primary re-registers via heartbeat as a replica (lower epoch). But the master never sent an updated Primary assignment to the new primary with the re-registered replica's addresses. The new primary had 0 shippers → replication dead. sync_all barrier passed vacuously. Root cause: upsertServerAsReplica (heartbeat reconciliation) added the re-registered server to Replicas[] but didn't (a) populate DataAddr/ CtrlAddr from heartbeat info, or (b) trigger a primary assignment refresh. Fix: - master_block_registry.go: upsertServerAsReplica now copies DataAddr/ CtrlAddr from heartbeat info and sets NeedsPrimaryRefresh flag. UpdateFullHeartbeat returns HeartbeatResult with PrimaryRefreshNeeded entries. DrainPrimaryRefreshNeeded collects and clears the flag. - master_block_failover.go: add enqueuePrimaryRefresh — builds a Primary assignment with all current replica addresses and enqueues it. - master_grpc_server.go: heartbeat handler processes PrimaryRefreshNeeded entries after UpdateFullHeartbeat. Gate test: TestPromote_AssignmentHasReplicaAddrs now PASSES — after promote + re-register, the new primary gets an assignment with replicaDataAddr=vs1:14260 and replicaAddrs=1. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -520,6 +520,34 @@ func (ms *MasterServer) refreshPrimaryForAddrChange(ac ReplicaAddrChange) {
|
||||
ac.VolumeName, ac.OldDataAddr, ac.NewDataAddr, ac.OldCtrlAddr, ac.NewCtrlAddr, currentPrimary)
|
||||
}
|
||||
|
||||
// enqueuePrimaryRefresh sends a fresh Primary assignment with replica addresses.
|
||||
// CP13-8A: called when a replica re-registers after promote so the new primary
|
||||
// gets shipper configuration for the re-registered replica.
|
||||
func (ms *MasterServer) enqueuePrimaryRefresh(entry BlockVolumeEntry) {
|
||||
leaseTTLMs := blockvol.LeaseTTLToWire(30 * time.Second)
|
||||
assignment := blockvol.BlockVolumeAssignment{
|
||||
Path: entry.Path,
|
||||
Epoch: entry.Epoch,
|
||||
Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
||||
LeaseTtlMs: leaseTTLMs,
|
||||
}
|
||||
for _, ri := range entry.Replicas {
|
||||
assignment.ReplicaAddrs = append(assignment.ReplicaAddrs, blockvol.ReplicaAddr{
|
||||
DataAddr: ri.DataAddr,
|
||||
CtrlAddr: ri.CtrlAddr,
|
||||
ServerID: ri.Server,
|
||||
})
|
||||
}
|
||||
if len(entry.Replicas) == 1 {
|
||||
assignment.ReplicaDataAddr = entry.Replicas[0].DataAddr
|
||||
assignment.ReplicaCtrlAddr = entry.Replicas[0].CtrlAddr
|
||||
assignment.ReplicaServerID = entry.Replicas[0].Server
|
||||
}
|
||||
ms.blockAssignmentQueue.Enqueue(entry.VolumeServer, assignment)
|
||||
glog.V(0).Infof("CP13-8A: enqueued Primary refresh for %q on %s with %d replica(s)",
|
||||
entry.Name, entry.VolumeServer, len(entry.Replicas))
|
||||
}
|
||||
|
||||
// maintain the same split-brain protection as failoverBlockVolumes().
|
||||
// This fixes B-06 (orphaned primary after replica re-register)
|
||||
// and partially B-08 (fast reconnect skips failover window).
|
||||
|
||||
@@ -24,19 +24,20 @@ const (
|
||||
|
||||
// ReplicaInfo tracks one replica of a block volume (CP8-2).
|
||||
type ReplicaInfo struct {
|
||||
Server string // replica VS address
|
||||
Path string // file path on replica VS
|
||||
ISCSIAddr string // iSCSI target address
|
||||
IQN string // iSCSI qualified name
|
||||
NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled
|
||||
NQN string // NVMe subsystem NQN, empty if NVMe disabled
|
||||
DataAddr string // WAL receiver data listen addr
|
||||
CtrlAddr string // WAL receiver ctrl listen addr
|
||||
HealthScore float64 // from heartbeat (0.0-1.0)
|
||||
WALHeadLSN uint64 // from heartbeat
|
||||
WALLag uint64 // computed: primary.WALHeadLSN - replica.WALHeadLSN
|
||||
Server string // replica VS address
|
||||
Path string // file path on replica VS
|
||||
ISCSIAddr string // iSCSI target address
|
||||
IQN string // iSCSI qualified name
|
||||
NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled
|
||||
NQN string // NVMe subsystem NQN, empty if NVMe disabled
|
||||
DataAddr string // WAL receiver data listen addr
|
||||
CtrlAddr string // WAL receiver ctrl listen addr
|
||||
Ready bool // receiver/publish readiness confirmed by replica heartbeat
|
||||
HealthScore float64 // from heartbeat (0.0-1.0)
|
||||
WALHeadLSN uint64 // from heartbeat
|
||||
WALLag uint64 // computed: primary.WALHeadLSN - replica.WALHeadLSN
|
||||
LastHeartbeat time.Time // last heartbeat received from this replica
|
||||
Role uint32 // replica role (RoleReplica, RoleRebuilding, etc.)
|
||||
Role uint32 // replica role (RoleReplica, RoleRebuilding, etc.)
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -73,7 +74,9 @@ type BlockVolumeEntry struct {
|
||||
ReplicaFactor int // 2 or 3 (default 2)
|
||||
Replicas []ReplicaInfo // one per replica (RF-1 entries)
|
||||
HealthScore float64 // primary health score from heartbeat
|
||||
ReplicaDegraded bool // primary reports degraded replicas
|
||||
ReplicaReady bool // all configured replicas are ready for publication
|
||||
ReplicaDegraded bool // aggregate: transport degraded OR not ready
|
||||
TransportDegraded bool // primary reports degraded replicas
|
||||
WALHeadLSN uint64 // primary WAL head LSN from heartbeat
|
||||
|
||||
// CP8-3-1: Durability mode.
|
||||
@@ -91,6 +94,11 @@ type BlockVolumeEntry struct {
|
||||
ExpandFailed bool // true = primary committed but replica(s) failed; size suppressed
|
||||
PendingExpandSize uint64
|
||||
ExpandEpoch uint64
|
||||
|
||||
// CP13-8A: Set by reconcileOnRestart/upsertServerAsReplica when a replica
|
||||
// is added after promote. The heartbeat handler uses this to enqueue an
|
||||
// updated Primary assignment with the new replica's addresses.
|
||||
NeedsPrimaryRefresh bool
|
||||
}
|
||||
|
||||
// HasReplica returns true if this volume has any replica (checks both new and deprecated fields).
|
||||
@@ -98,6 +106,20 @@ func (e *BlockVolumeEntry) HasReplica() bool {
|
||||
return len(e.Replicas) > 0 || e.ReplicaServer != ""
|
||||
}
|
||||
|
||||
// AllReplicasReady returns true when every configured replica has reported
|
||||
// publish readiness via heartbeat. Volumes without replicas are vacuously ready.
|
||||
func (e *BlockVolumeEntry) AllReplicasReady() bool {
|
||||
if len(e.Replicas) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, ri := range e.Replicas {
|
||||
if !ri.Ready {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// FirstReplica returns the first replica info, or nil if none.
|
||||
func (e *BlockVolumeEntry) FirstReplica() *ReplicaInfo {
|
||||
if len(e.Replicas) > 0 {
|
||||
@@ -116,6 +138,15 @@ func (e *BlockVolumeEntry) ReplicaByServer(server string) *ReplicaInfo {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *BlockVolumeEntry) recomputeReplicaState() {
|
||||
e.ReplicaReady = e.AllReplicasReady()
|
||||
if !e.HasReplica() {
|
||||
e.ReplicaDegraded = e.TransportDegraded
|
||||
return
|
||||
}
|
||||
e.ReplicaDegraded = e.TransportDegraded || !e.ReplicaReady
|
||||
}
|
||||
|
||||
// BestReplicaForPromotion returns the best replica for promotion, or nil if none eligible.
|
||||
// Criteria: highest HealthScore, tie-break by highest WALHeadLSN, then first in list.
|
||||
func (e *BlockVolumeEntry) BestReplicaForPromotion() *ReplicaInfo {
|
||||
@@ -193,6 +224,7 @@ func (r *BlockVolumeRegistry) Register(entry *BlockVolumeEntry) error {
|
||||
if _, ok := r.volumes[entry.Name]; ok {
|
||||
return fmt.Errorf("block volume %q already registered", entry.Name)
|
||||
}
|
||||
entry.recomputeReplicaState()
|
||||
r.volumes[entry.Name] = entry
|
||||
r.addToServer(entry.VolumeServer, entry.Name)
|
||||
// Also index replica servers so ListByServer finds them.
|
||||
@@ -366,8 +398,15 @@ type ReplicaAddrChange struct {
|
||||
NewCtrlAddr string
|
||||
}
|
||||
|
||||
func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage, nvmeAddr string) []ReplicaAddrChange {
|
||||
var addrChanges []ReplicaAddrChange
|
||||
// HeartbeatResult holds the side effects from UpdateFullHeartbeat that the
|
||||
// caller (heartbeat handler) must process.
|
||||
type HeartbeatResult struct {
|
||||
AddrChanges []ReplicaAddrChange
|
||||
PrimaryRefreshNeeded []BlockVolumeEntry // CP13-8A: entries needing primary assignment refresh
|
||||
}
|
||||
|
||||
func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage, nvmeAddr string) HeartbeatResult {
|
||||
var result HeartbeatResult
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
@@ -471,7 +510,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
existing.Status = StatusActive
|
||||
existing.LastLeaseGrant = time.Now()
|
||||
existing.HealthScore = info.HealthScore
|
||||
existing.ReplicaDegraded = info.ReplicaDegraded
|
||||
existing.TransportDegraded = info.ReplicaDegraded
|
||||
existing.WALHeadLSN = info.WalHeadLsn
|
||||
// F3: only update DurabilityMode when non-empty (prevents older VS from clearing strict mode).
|
||||
if info.DurabilityMode != "" {
|
||||
@@ -493,6 +532,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
existing.Replicas[0].DataAddr = info.ReplicaDataAddr
|
||||
existing.Replicas[0].CtrlAddr = info.ReplicaCtrlAddr
|
||||
}
|
||||
existing.recomputeReplicaState()
|
||||
} else if isReplica {
|
||||
// Replica heartbeat: update ReplicaInfo fields.
|
||||
for i := range existing.Replicas {
|
||||
@@ -507,6 +547,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
existing.Replicas[i].Role = blockvol.RoleToWire(blockvol.RoleReplica)
|
||||
existing.Replicas[i].NvmeAddr = info.NvmeAddr
|
||||
existing.Replicas[i].NQN = info.Nqn
|
||||
existing.Replicas[i].Ready = info.ReplicaDataAddr != "" && info.ReplicaCtrlAddr != ""
|
||||
if existing.WALHeadLSN > info.WalHeadLsn {
|
||||
existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn
|
||||
} else {
|
||||
@@ -521,7 +562,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
dataChanged := info.ReplicaDataAddr != "" && oldData != "" && oldData != info.ReplicaDataAddr
|
||||
ctrlChanged := info.ReplicaCtrlAddr != "" && oldCtrl != "" && oldCtrl != info.ReplicaCtrlAddr
|
||||
if dataChanged || ctrlChanged {
|
||||
addrChanges = append(addrChanges, ReplicaAddrChange{
|
||||
result.AddrChanges = append(result.AddrChanges, ReplicaAddrChange{
|
||||
VolumeName: existingName,
|
||||
PrimaryServer: existing.VolumeServer,
|
||||
OldDataAddr: oldData,
|
||||
@@ -540,6 +581,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
break
|
||||
}
|
||||
}
|
||||
existing.recomputeReplicaState()
|
||||
} else {
|
||||
// Server reports a volume that exists but has no record of this server.
|
||||
// This happens after master restart. Use epoch-based reconciliation
|
||||
@@ -574,6 +616,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
LastLeaseGrant: time.Now(),
|
||||
LeaseTTL: 30 * time.Second,
|
||||
HealthScore: info.HealthScore,
|
||||
TransportDegraded: info.ReplicaDegraded,
|
||||
WALHeadLSN: info.WalHeadLsn,
|
||||
DurabilityMode: info.DurabilityMode,
|
||||
}
|
||||
@@ -585,6 +628,7 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
}
|
||||
entry.NvmeAddr = info.NvmeAddr
|
||||
entry.NQN = info.Nqn
|
||||
entry.recomputeReplicaState()
|
||||
r.volumes[name] = entry
|
||||
r.addToServer(server, name)
|
||||
glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)",
|
||||
@@ -595,7 +639,14 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
|
||||
}
|
||||
}
|
||||
}
|
||||
return addrChanges
|
||||
// CP13-8A: collect entries that need primary refresh and clear the flag.
|
||||
for _, e := range r.volumes {
|
||||
if e.NeedsPrimaryRefresh {
|
||||
e.NeedsPrimaryRefresh = false
|
||||
result.PrimaryRefreshNeeded = append(result.PrimaryRefreshNeeded, e.clone())
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// reconcileOnRestart handles the case where a second server reports a volume
|
||||
@@ -744,9 +795,13 @@ func (r *BlockVolumeRegistry) upsertServerAsReplica(name string, existing *Block
|
||||
}
|
||||
}
|
||||
// New replica — append with forced RoleReplica.
|
||||
// CP13-8A: populate DataAddr/CtrlAddr from heartbeat so the master can
|
||||
// enqueue a Primary assignment with replica addresses for the shipper.
|
||||
ri := ReplicaInfo{
|
||||
Server: newServer,
|
||||
Path: info.Path,
|
||||
DataAddr: info.ReplicaDataAddr,
|
||||
CtrlAddr: info.ReplicaCtrlAddr,
|
||||
HealthScore: info.HealthScore,
|
||||
WALHeadLSN: info.WalHeadLsn,
|
||||
LastHeartbeat: time.Now(),
|
||||
@@ -760,6 +815,25 @@ func (r *BlockVolumeRegistry) upsertServerAsReplica(name string, existing *Block
|
||||
existing.ReplicaServer = ri.Server
|
||||
existing.ReplicaPath = ri.Path
|
||||
}
|
||||
// CP13-8A: mark that a primary refresh is needed so the caller
|
||||
// can enqueue an updated Primary assignment with replica addresses.
|
||||
existing.NeedsPrimaryRefresh = true
|
||||
}
|
||||
|
||||
// DrainPrimaryRefreshNeeded returns entries that need a primary assignment
|
||||
// refresh (e.g., after a replica re-registers post-promote) and clears the flag.
|
||||
// Caller must hold no lock (this acquires the write lock).
|
||||
func (r *BlockVolumeRegistry) DrainPrimaryRefreshNeeded() []BlockVolumeEntry {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
var result []BlockVolumeEntry
|
||||
for _, e := range r.volumes {
|
||||
if e.NeedsPrimaryRefresh {
|
||||
e.NeedsPrimaryRefresh = false
|
||||
result = append(result, e.clone())
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// UpdateDeltaHeartbeat processes incremental new/deleted block volumes.
|
||||
@@ -888,6 +962,7 @@ func (r *BlockVolumeRegistry) removeReplicaLocked(entry *BlockVolumeEntry, serve
|
||||
entry.ReplicaDataAddr = ""
|
||||
entry.ReplicaCtrlAddr = ""
|
||||
}
|
||||
entry.recomputeReplicaState()
|
||||
}
|
||||
|
||||
// SetReplica sets replica info for a registered volume.
|
||||
@@ -919,6 +994,7 @@ func (r *BlockVolumeRegistry) SetReplica(name, server, path, iscsiAddr, iqn stri
|
||||
info.WALHeadLSN = entry.Replicas[i].WALHeadLSN
|
||||
info.DataAddr = entry.Replicas[i].DataAddr
|
||||
info.CtrlAddr = entry.Replicas[i].CtrlAddr
|
||||
info.Ready = entry.Replicas[i].Ready
|
||||
entry.Replicas[i] = info
|
||||
replaced = true
|
||||
break
|
||||
@@ -927,6 +1003,7 @@ func (r *BlockVolumeRegistry) SetReplica(name, server, path, iscsiAddr, iqn stri
|
||||
if !replaced {
|
||||
entry.Replicas = append(entry.Replicas, info)
|
||||
}
|
||||
entry.recomputeReplicaState()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -955,6 +1032,7 @@ func (r *BlockVolumeRegistry) ClearReplica(name string) error {
|
||||
entry.ReplicaDataAddr = ""
|
||||
entry.ReplicaCtrlAddr = ""
|
||||
entry.Replicas = nil
|
||||
entry.recomputeReplicaState()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1000,9 +1078,13 @@ func (r *BlockVolumeRegistry) SwapPrimaryReplica(name string) (uint64, error) {
|
||||
entry.ReplicaISCSIAddr = oldPrimaryISCSI
|
||||
entry.ReplicaDataAddr = ""
|
||||
entry.ReplicaCtrlAddr = ""
|
||||
entry.ReplicaReady = false
|
||||
entry.ReplicaDegraded = true
|
||||
entry.TransportDegraded = false
|
||||
|
||||
// Update byServer index: new primary server now hosts this volume.
|
||||
r.addToServer(entry.VolumeServer, name)
|
||||
entry.recomputeReplicaState()
|
||||
return newEpoch, nil
|
||||
}
|
||||
|
||||
@@ -1039,6 +1121,7 @@ func (r *BlockVolumeRegistry) AddReplica(name string, info ReplicaInfo) error {
|
||||
entry.ReplicaDataAddr = r0.DataAddr
|
||||
entry.ReplicaCtrlAddr = r0.CtrlAddr
|
||||
}
|
||||
entry.recomputeReplicaState()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1082,6 +1165,7 @@ func (r *BlockVolumeRegistry) RemoveReplica(name, server string) error {
|
||||
entry.ReplicaDataAddr = ""
|
||||
entry.ReplicaCtrlAddr = ""
|
||||
}
|
||||
entry.recomputeReplicaState()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -277,12 +277,17 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
|
||||
// (BlockVolumeInfos on first heartbeat) or deltas (NewBlockVolumes/DeletedBlockVolumes
|
||||
// on subsequent heartbeats), never both in the same message.
|
||||
if len(heartbeat.BlockVolumeInfos) > 0 || heartbeat.HasNoBlockVolumes {
|
||||
addrChanges := ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos, heartbeat.BlockNvmeAddr)
|
||||
hbResult := ms.blockRegistry.UpdateFullHeartbeat(dn.Url(), heartbeat.BlockVolumeInfos, heartbeat.BlockNvmeAddr)
|
||||
// CP13-8: If a replica's receiver address changed (e.g., restart with port conflict),
|
||||
// immediately refresh the primary's assignment with the new addresses.
|
||||
for _, ac := range addrChanges {
|
||||
for _, ac := range hbResult.AddrChanges {
|
||||
ms.refreshPrimaryForAddrChange(ac)
|
||||
}
|
||||
// CP13-8A: After heartbeat reconciliation, send primary refreshes for
|
||||
// entries where a replica re-registered post-promote.
|
||||
for _, entry := range hbResult.PrimaryRefreshNeeded {
|
||||
ms.enqueuePrimaryRefresh(entry)
|
||||
}
|
||||
// T2 (B-06): After updating registry from heartbeat, check if this server
|
||||
// is a replica for any volume whose primary is dead. If so, promote.
|
||||
ms.reevaluateOrphanedPrimaries(dn.Url())
|
||||
|
||||
164
weed/server/qa_promote_replication_test.go
Normal file
164
weed/server/qa_promote_replication_test.go
Normal file
@@ -0,0 +1,164 @@
|
||||
// Tests for post-promote replication continuity.
|
||||
//
|
||||
// Bug: block_promote removes the promoted replica from entry.Replicas.
|
||||
// The new primary's assignment has zero ReplicaAddrs → no shipper →
|
||||
// replication dead. sync_all barrier passes vacuously with 0 shippers.
|
||||
//
|
||||
// This test suite verifies:
|
||||
// 1. After promote + old primary re-register, the new primary's assignment
|
||||
// includes the re-registered replica's addresses
|
||||
// 2. sync_all with 0 shippers and RF>1 is detected as a gap
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
blockvol "github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
||||
)
|
||||
|
||||
// TestPromote_AssignmentHasReplicaAddrs verifies that after promote + old
|
||||
// primary re-register, the assignment queue contains an updated assignment
|
||||
// for the new primary WITH replica addresses.
|
||||
func TestPromote_AssignmentHasReplicaAddrs(t *testing.T) {
|
||||
ms := testMasterServerForFailover(t)
|
||||
|
||||
pathA := "/data/vs1/vol1.blk"
|
||||
pathB := "/data/vs2/vol1.blk"
|
||||
|
||||
ms.blockRegistry.MarkBlockCapable("vs1")
|
||||
ms.blockRegistry.MarkBlockCapable("vs2")
|
||||
|
||||
ms.blockRegistry.Register(&BlockVolumeEntry{
|
||||
Name: "vol1",
|
||||
VolumeServer: "vs1",
|
||||
Path: pathA,
|
||||
ISCSIAddr: "vs1:3260",
|
||||
SizeBytes: 1 << 30,
|
||||
Epoch: 1,
|
||||
Role: blockvol.RoleToWire(blockvol.RolePrimary),
|
||||
Status: StatusActive,
|
||||
LeaseTTL: 30 * time.Second,
|
||||
LastLeaseGrant: time.Now().Add(-1 * time.Minute),
|
||||
ReplicaServer: "vs2",
|
||||
ReplicaPath: pathB,
|
||||
Replicas: []ReplicaInfo{
|
||||
{
|
||||
Server: "vs2",
|
||||
Path: pathB,
|
||||
ISCSIAddr: "vs2:3260",
|
||||
HealthScore: 1.0,
|
||||
Role: blockvol.RoleToWire(blockvol.RoleReplica),
|
||||
LastHeartbeat: time.Now(),
|
||||
DataAddr: "vs2:14260",
|
||||
CtrlAddr: "vs2:14261",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// Step 1: Promote vs2 to primary (vs1 "disconnects").
|
||||
ms.failoverBlockVolumes("vs1")
|
||||
|
||||
entry := lookupEntryT(t, ms.blockRegistry, "vol1")
|
||||
if entry.VolumeServer != "vs2" {
|
||||
t.Fatalf("promote: primary=%s, want vs2", entry.VolumeServer)
|
||||
}
|
||||
if len(entry.Replicas) != 0 {
|
||||
t.Fatalf("after promote: replicas=%d, want 0 (old primary removed)", len(entry.Replicas))
|
||||
}
|
||||
t.Logf("after promote: primary=vs2 epoch=%d replicas=%d", entry.Epoch, len(entry.Replicas))
|
||||
|
||||
// Drain the assignment queue (promotion enqueued an assignment for vs2).
|
||||
promotionAssignments := ms.blockAssignmentQueue.Peek("vs2")
|
||||
// Confirm all to drain.
|
||||
for _, a := range promotionAssignments {
|
||||
ms.blockAssignmentQueue.Confirm("vs2", a.Path, a.Epoch)
|
||||
}
|
||||
t.Logf("promotion assignment: %d items (drained)", len(promotionAssignments))
|
||||
if len(promotionAssignments) > 0 {
|
||||
a := promotionAssignments[0]
|
||||
t.Logf(" path=%s role=%d replicaAddrs=%d replicaDataAddr=%s",
|
||||
a.Path, a.Role, len(a.ReplicaAddrs), a.ReplicaDataAddr)
|
||||
if len(a.ReplicaAddrs) > 0 || a.ReplicaDataAddr != "" {
|
||||
t.Log(" WARNING: promotion assignment already has replica addrs (unexpected)")
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: vs1 reconnects via heartbeat (lower epoch → added as replica).
|
||||
ms.blockRegistry.MarkBlockCapable("vs1")
|
||||
hbResult := ms.blockRegistry.UpdateFullHeartbeat("vs1", []*master_pb.BlockVolumeInfoMessage{
|
||||
{
|
||||
Path: pathA,
|
||||
VolumeSize: 1 << 30,
|
||||
Epoch: 1,
|
||||
Role: blockvol.RoleToWire(blockvol.RolePrimary), // stale
|
||||
ReplicaDataAddr: "vs1:14260",
|
||||
ReplicaCtrlAddr: "vs1:14261",
|
||||
},
|
||||
}, "")
|
||||
|
||||
// CP13-8A: process primary refresh (simulating what the heartbeat handler does).
|
||||
for _, refreshEntry := range hbResult.PrimaryRefreshNeeded {
|
||||
ms.enqueuePrimaryRefresh(refreshEntry)
|
||||
}
|
||||
|
||||
entry = lookupEntryT(t, ms.blockRegistry, "vol1")
|
||||
t.Logf("after vs1 re-register: primary=%s replicas=%d", entry.VolumeServer, len(entry.Replicas))
|
||||
|
||||
if len(entry.Replicas) == 0 {
|
||||
t.Fatal("vs1 should be in replicas after re-register")
|
||||
}
|
||||
|
||||
// Step 3: Check if the assignment queue has an UPDATED assignment for vs2
|
||||
// with the new replica's addresses.
|
||||
updatedAssignments := ms.blockAssignmentQueue.Peek("vs2")
|
||||
t.Logf("post-re-register assignments for vs2: %d items", len(updatedAssignments))
|
||||
|
||||
foundReplicaAddrs := false
|
||||
for _, a := range updatedAssignments {
|
||||
if a.Path == entry.Path {
|
||||
t.Logf(" assignment: path=%s role=%d replicaDataAddr=%s replicaAddrs=%d",
|
||||
a.Path, a.Role, a.ReplicaDataAddr, len(a.ReplicaAddrs))
|
||||
if a.ReplicaDataAddr != "" || len(a.ReplicaAddrs) > 0 {
|
||||
foundReplicaAddrs = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !foundReplicaAddrs {
|
||||
t.Fatalf("BUG: after promote + re-register, new primary vs2 has NO assignment "+
|
||||
"with replica addresses.\n"+
|
||||
"This means the shipper will never be configured and replication is dead.\n"+
|
||||
"The master must send an updated assignment to vs2 after vs1 re-registers as replica.")
|
||||
}
|
||||
t.Log("post-promote assignment has replica addresses — shipper will be configured")
|
||||
}
|
||||
|
||||
// TestPromote_ReplicasEmptyAfterPromote documents the current behavior:
|
||||
// after PromoteBestReplica, entry.Replicas is empty.
|
||||
func TestPromote_ReplicasEmptyAfterPromote(t *testing.T) {
|
||||
ms := testMasterServerForFailover(t)
|
||||
registerVolumeWithReplica(t, ms, "vol1", "vs1", "vs2", 1, 30*time.Second)
|
||||
|
||||
ms.blockRegistry.UpdateEntry("vol1", func(e *BlockVolumeEntry) {
|
||||
e.LastLeaseGrant = time.Now().Add(-1 * time.Minute)
|
||||
})
|
||||
ms.failoverBlockVolumes("vs1")
|
||||
|
||||
entry := lookupEntryT(t, ms.blockRegistry, "vol1")
|
||||
if entry.VolumeServer != "vs2" {
|
||||
t.Fatalf("primary=%s, want vs2", entry.VolumeServer)
|
||||
}
|
||||
// Document: Replicas is empty after promote. This is by design
|
||||
// (old primary "needs rebuild"), but it means the new primary
|
||||
// has no shipper target until a replica re-registers.
|
||||
t.Logf("after promote: replicas=%d (expected 0 — old primary removed)", len(entry.Replicas))
|
||||
|
||||
// Check the assignment queue for vs2.
|
||||
assignments := ms.blockAssignmentQueue.Peek("vs2")
|
||||
for _, a := range assignments {
|
||||
t.Logf("assignment for vs2: path=%s epoch=%d role=%d replicaDataAddr=%q replicaAddrs=%d",
|
||||
a.Path, a.Epoch, a.Role, a.ReplicaDataAddr, len(a.ReplicaAddrs))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user