diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 138c977a5..5d82e92a9 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -231,17 +231,42 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m return fmt.Errorf("failed to get volume locations for EC encoding: %w", err) } - // build a map of (volumeId, serverAddress) -> freeVolumeCount + // Build a map of (volumeId, serverAddress) -> freeVolumeCount. + // Key by dn.Address so it matches wdclient.Location.Url. In deployments + // where dn.Id is a short name (e.g. Kubernetes StatefulSet pod name) + // while dn.Address is a FQDN:port, keying by dn.Id would never match the + // location Url during the health-check lookup below. freeVolumeCountMap := make(map[string]int) // key: volumeId-serverAddress eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { + addr := dn.Address + if addr == "" { + addr = dn.Id // older nodes use ip:port as id + } for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { - key := fmt.Sprintf("%d-%s", v.Id, dn.Id) + key := fmt.Sprintf("%d-%s", v.Id, addr) freeVolumeCountMap[key] = int(diskInfo.FreeVolumeCount) } } }) + // Filter replicas by free capacity BEFORE marking volumes readonly so that + // a failed health check does not strand volumes in readonly state. + filteredLocations := make(map[needle.VolumeId][]wdclient.Location) + for _, vid := range volumeIds { + var filteredLocs []wdclient.Location + for _, l := range locations[vid] { + key := fmt.Sprintf("%d-%s", vid, l.Url) + if freeCount, found := freeVolumeCountMap[key]; found && freeCount >= 2 { + filteredLocs = append(filteredLocs, l) + } + } + if len(filteredLocs) == 0 { + return fmt.Errorf("no healthy replicas (FreeVolumeCount >= 2) found for volume %d to use as source for EC encoding", vid) + } + filteredLocations[vid] = filteredLocs + } + // mark volumes as readonly ewg := NewErrorWaitGroup(maxParallelization) for _, vid := range volumeIds { @@ -263,24 +288,10 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m // by syncing missing entries between replicas before encoding bestReplicas := make(map[needle.VolumeId]wdclient.Location) for _, vid := range volumeIds { - locs := locations[vid] collection := volumeIdToCollection[vid] - // Filter locations to only include those on healthy disks (FreeVolumeCount >= 2) - var filteredLocs []wdclient.Location - for _, l := range locs { - key := fmt.Sprintf("%d-%s", vid, l.Url) - if freeCount, found := freeVolumeCountMap[key]; found && freeCount >= 2 { - filteredLocs = append(filteredLocs, l) - } - } - - if len(filteredLocs) == 0 { - return fmt.Errorf("no healthy replicas (FreeVolumeCount >= 2) found for volume %d to use as source for EC encoding", vid) - } - // Sync missing entries between replicas, then select the best one - bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocs, "", writer) + bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, filteredLocations[vid], "", writer) if selectErr != nil { return fmt.Errorf("failed to sync and select replica for volume %d: %v", vid, selectErr) }