fix(shell): ec.encode health-check key mismatch on K8s deployments (#9164)

Build freeVolumeCountMap using dn.Address (falling back to dn.Id when
Address is empty) so the key matches wdclient.Location.Url during the
subsequent lookup. Keying by dn.Id silently filtered out every replica
in deployments where dn.Id is a short name (e.g. Kubernetes StatefulSet
pod name) while the location Url is an FQDN:port, causing "no healthy
replicas" even with ample free capacity.

Also filter replicas before marking volumes readonly so that a failed
health check no longer strands volumes in readonly state.

Fixes #9145
This commit is contained in:
Chris Lu
2026-04-20 15:57:30 -07:00
committed by GitHub
parent e725eb4079
commit caaa53aee3

View File

@@ -231,17 +231,42 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m
return fmt.Errorf("failed to get volume locations for EC encoding: %w", err)
}
// build a map of (volumeId, serverAddress) -> freeVolumeCount
// Build a map of (volumeId, serverAddress) -> freeVolumeCount.
// Key by dn.Address so it matches wdclient.Location.Url. In deployments
// where dn.Id is a short name (e.g. Kubernetes StatefulSet pod name)
// while dn.Address is an FQDN:port, keying by dn.Id would never match the
// location Url during the health-check lookup below.
freeVolumeCountMap := make(map[string]int) // key: volumeId-serverAddress
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
addr := dn.Address
if addr == "" {
addr = dn.Id // older nodes use ip:port as id
}
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
key := fmt.Sprintf("%d-%s", v.Id, dn.Id)
key := fmt.Sprintf("%d-%s", v.Id, addr)
freeVolumeCountMap[key] = int(diskInfo.FreeVolumeCount)
}
}
})
// Filter replicas by free capacity BEFORE marking volumes readonly so that
// a failed health check does not strand volumes in readonly state.
filteredLocations := make(map[needle.VolumeId][]wdclient.Location)
for _, vid := range volumeIds {
var filteredLocs []wdclient.Location
for _, l := range locations[vid] {
key := fmt.Sprintf("%d-%s", vid, l.Url)
if freeCount, found := freeVolumeCountMap[key]; found && freeCount >= 2 {
filteredLocs = append(filteredLocs, l)
}
}
if len(filteredLocs) == 0 {
return fmt.Errorf("no healthy replicas (FreeVolumeCount >= 2) found for volume %d to use as source for EC encoding", vid)
}
filteredLocations[vid] = filteredLocs
}
// mark volumes as readonly
ewg := NewErrorWaitGroup(maxParallelization)
for _, vid := range volumeIds {
@@ -263,24 +288,10 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m
// by syncing missing entries between replicas before encoding
bestReplicas := make(map[needle.VolumeId]wdclient.Location)
for _, vid := range volumeIds {
locs := locations[vid]
collection := volumeIdToCollection[vid]
// Filter locations to only include those on healthy disks (FreeVolumeCount >= 2)
var filteredLocs []wdclient.Location
for _, l := range locs {
key := fmt.Sprintf("%d-%s", vid, l.Url)
if freeCount, found := freeVolumeCountMap[key]; found && freeCount >= 2 {
filteredLocs = append(filteredLocs, l)
}
}
if len(filteredLocs) == 0 {
return fmt.Errorf("no healthy replicas (FreeVolumeCount >= 2) found for volume %d to use as source for EC encoding", vid)
}
// Sync missing entries between replicas, then select the best one
bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocs, "", writer)
bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocations[vid], "", writer)
if selectErr != nil {
return fmt.Errorf("failed to sync and select replica for volume %d: %v", vid, selectErr)
}