mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* fix(ec): verify full shard set before deleting source volume (#9490) Before this change, both the worker EC task and the shell ec.encode command would delete the source .dat as soon as MountEcShards returned — even if distribute/mount failed partway, leaving fewer than 14 shards in the cluster. The deletion was logged at V(2), so by the time someone noticed missing data the only trace was a 0-byte .dat synthesized by disk_location at next restart. - Worker path adds Step 6: poll VolumeEcShardsInfo on every destination, union the bitmaps, and refuse to call deleteOriginalVolume unless all TotalShardsCount distinct shard ids are observed. A failed gate leaves the source readonly so the next detection scan can retry. - Shell ec.encode adds the same gate after EcBalance, walking the master topology with collectEcNodeShardsInfo. - VolumeDelete RPC success and .dat/.idx unlinks now log at V(0) so any source destruction is traceable in default-verbosity production logs. The EC-balance-vs-in-flight-encode race is intentionally left for a follow-up; balance should refuse to move shards for a volume whose encode job is not in Completed state. * fix(ec): trim doc comments on the new shard-verification path Drop WHAT-describing godoc on freshly added helpers; keep only the WHY notes (query-error policy in VerifyShardsAcrossServers, the #9490 reference at the call sites). * fix(ec): drop issue-number anchors from new comments Issue references age poorly — the why behind each comment already stands on its own. * fix(ec): parametrize RequireFullShardSet on totalShards Take totalShards as an argument instead of reading the package-level TotalShardsCount constant. The OSS callers continue to pass 14, but the helper is now usable with any DataShards+ParityShards ratio. 
* test(plugin_workers): make fake volume server respond to VolumeEcShardsInfo The new pre-delete verification gate calls VolumeEcShardsInfo on every destination after mount, and the fake server's UnimplementedVolumeServer returns Unimplemented — the verifier read that as zero shards on every node and aborted source deletion. Build the response from recorded mount requests so the integration test exercises the gate end-to-end. * fix(rust/volume): log .dat/.idx unlink with size in remove_volume_files Mirror the Go-side change in weed/storage/volume_write.go: stat each file before removing and emit an info-level log for .dat/.idx so a destructive call is always traceable. The OSS Rust crate previously unlinked them silently. * fix(ec/decode): verify regenerated .dat before deleting EC shards After mountDecodedVolume succeeds, the previous code immediately unmounts and deletes every EC shard. A silent failure in generate or mount could leave the cluster with neither shards nor a valid normal volume. Probe ReadVolumeFileStatus on the target and refuse to proceed if dat or idx is 0 bytes. Also make the fake volume server's VolumeEcShardsInfo reflect whichever shard files exist on disk (seeded for tests as well as mounted via RPC), so the new gate can be exercised end-to-end. * fix(ec): address PR review nits in verification + fake server - Drop unused ServerShardInventory.Sizes field. - Skip shard ids >= MaxShardCount before bitmap Set so the ShardBits bound is explicit (Set already no-ops on overflow, this is for clarity). - Nil-guard the fake server's VolumeEcShardsInfo so a malformed call doesn't panic the test process.
473 lines
17 KiB
Go
473 lines
17 KiB
Go
package shell
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
)
|
|
|
|
func init() {
|
|
Commands = append(Commands, &commandEcDecode{})
|
|
}
|
|
|
|
// commandEcDecode is the receiver for the "ec.decode" shell command. It has
// no fields; all inputs arrive through the arguments passed to Do.
type commandEcDecode struct{}
|
|
|
|
func (c *commandEcDecode) Name() string {
|
|
return "ec.decode"
|
|
}
|
|
|
|
func (c *commandEcDecode) Help() string {
|
|
return `decode a erasure coded volume into a normal volume
|
|
|
|
ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>] [-checkMinFreeSpace]
|
|
|
|
The -collection parameter supports regular expressions for pattern matching:
|
|
- Use exact match: ec.decode -collection="^mybucket$"
|
|
- Match multiple buckets: ec.decode -collection="bucket.*"
|
|
- Match all collections: ec.decode -collection=".*"
|
|
|
|
Options:
|
|
-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
|
|
-checkMinFreeSpace: check min free space when selecting the decode target (default true)
|
|
|
|
Examples:
|
|
# Decode EC shards from HDD (default)
|
|
ec.decode -collection=mybucket
|
|
|
|
# Decode EC shards from SSD
|
|
ec.decode -collection=mybucket -diskType=ssd
|
|
|
|
`
|
|
}
|
|
|
|
func (c *commandEcDecode) HasTag(CommandTag) bool {
|
|
return false
|
|
}
|
|
|
|
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
|
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
|
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
|
|
collection := decodeCommand.String("collection", "", "the collection name")
|
|
diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
|
|
checkMinFreeSpace := decodeCommand.Bool("checkMinFreeSpace", true, "check min free space when selecting the decode target")
|
|
if err = decodeCommand.Parse(args); err != nil {
|
|
return nil
|
|
}
|
|
|
|
if err = commandEnv.confirmIsLocked(args); err != nil {
|
|
return
|
|
}
|
|
|
|
vid := needle.VolumeId(*volumeId)
|
|
diskType := types.ToDiskType(*diskTypeStr)
|
|
|
|
// collect topology information
|
|
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var diskUsageState *decodeDiskUsageState
|
|
if *checkMinFreeSpace {
|
|
diskUsageState = newDecodeDiskUsageState(topologyInfo, diskType)
|
|
}
|
|
|
|
// volumeId is provided
|
|
if vid != 0 {
|
|
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState)
|
|
}
|
|
|
|
// apply to all volumes in the collection
|
|
volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fmt.Printf("ec decode volumes: %v\n", volumeIds)
|
|
for _, vid := range volumeIds {
|
|
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType, checkMinFreeSpace bool, diskUsageState *decodeDiskUsageState) (err error) {
|
|
|
|
if !commandEnv.isLocked() {
|
|
return fmt.Errorf("lock is lost")
|
|
}
|
|
|
|
// find volume location
|
|
nodeToEcShardsInfo := collectEcNodeShardsInfo(topoInfo, vid, diskType)
|
|
|
|
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcShardsInfo)
|
|
|
|
if len(nodeToEcShardsInfo) == 0 {
|
|
return fmt.Errorf("no EC shards found for volume %d (diskType %s)", vid, diskType.ReadableString())
|
|
}
|
|
|
|
var originalShardCounts map[pb.ServerAddress]int
|
|
if diskUsageState != nil {
|
|
originalShardCounts = make(map[pb.ServerAddress]int, len(nodeToEcShardsInfo))
|
|
for location, si := range nodeToEcShardsInfo {
|
|
originalShardCounts[location] = si.Count()
|
|
}
|
|
}
|
|
|
|
var eligibleTargets map[pb.ServerAddress]struct{}
|
|
if checkMinFreeSpace {
|
|
if diskUsageState == nil {
|
|
return fmt.Errorf("min free space checking requires disk usage state")
|
|
}
|
|
eligibleTargets = make(map[pb.ServerAddress]struct{})
|
|
for location := range nodeToEcShardsInfo {
|
|
if freeCount, found := diskUsageState.freeVolumeCount(location); found && freeCount > 0 {
|
|
eligibleTargets[location] = struct{}{}
|
|
}
|
|
}
|
|
if len(eligibleTargets) == 0 {
|
|
return fmt.Errorf("no eligible target datanodes with free volume slots for volume %d (diskType %s); use -checkMinFreeSpace=false to override", vid, diskType.ReadableString())
|
|
}
|
|
}
|
|
|
|
// collect ec shards to the server with most space
|
|
targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid, eligibleTargets)
|
|
if err != nil {
|
|
return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
|
|
}
|
|
|
|
// generate a normal volume
|
|
err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
|
|
if err != nil {
|
|
// Special case: if the EC index has no live entries, decoding is a no-op.
|
|
// Just purge EC shards and return success without generating/mounting an empty volume.
|
|
if isEcDecodeEmptyVolumeErr(err) {
|
|
if err := unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid); err != nil {
|
|
return err
|
|
}
|
|
if diskUsageState != nil {
|
|
diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, false)
|
|
}
|
|
return nil
|
|
}
|
|
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
|
|
}
|
|
|
|
// mount the decoded volume after server-side offline compaction succeeded
|
|
err = mountDecodedVolume(commandEnv.option.GrpcDialOption, targetNodeLocation, vid)
|
|
if err != nil {
|
|
return fmt.Errorf("mount decoded volume %d on %s: %v", vid, targetNodeLocation, err)
|
|
}
|
|
|
|
// Confirm the regenerated .dat is present and non-empty before destroying
|
|
// the shards. Without this gate, a silent failure in generate/mount could
|
|
// leave the cluster with neither shards nor volume.
|
|
if err := verifyDecodedVolumeBeforeDelete(commandEnv.option.GrpcDialOption, targetNodeLocation, vid); err != nil {
|
|
return fmt.Errorf("verify decoded volume %d on %s before deleting shards: %w", vid, targetNodeLocation, err)
|
|
}
|
|
|
|
// delete the previous ec shards
|
|
err = unmountAndDeleteEcShardsWithPrefix("deleteDecodedEcShards", commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid)
|
|
if err != nil {
|
|
return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
|
|
}
|
|
if diskUsageState != nil {
|
|
diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, true)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func isEcDecodeEmptyVolumeErr(err error) bool {
|
|
st, ok := status.FromError(err)
|
|
if !ok {
|
|
return false
|
|
}
|
|
if st.Code() != codes.FailedPrecondition {
|
|
return false
|
|
}
|
|
// Keep this robust against wording tweaks while still being specific.
|
|
return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring)
|
|
}
|
|
|
|
func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
|
|
return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
|
|
}
|
|
|
|
func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
|
|
ewg := NewErrorWaitGroup(len(nodeToShardsInfo))
|
|
|
|
// unmount and delete ec shards in parallel (one goroutine per location)
|
|
for location, si := range nodeToShardsInfo {
|
|
location, si := location, si // capture loop variables for goroutine
|
|
ewg.Add(func() error {
|
|
fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
|
|
if err := unmountEcShards(grpcDialOption, vid, location, si.Ids()); err != nil {
|
|
return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err)
|
|
}
|
|
|
|
fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
|
|
if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, si.Ids()); err != nil {
|
|
return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
return ewg.Wait()
|
|
}
|
|
|
|
func verifyDecodedVolumeBeforeDelete(grpcDialOption grpc.DialOption, target pb.ServerAddress, vid needle.VolumeId) error {
|
|
var resp *volume_server_pb.ReadVolumeFileStatusResponse
|
|
if err := operation.WithVolumeServerClient(false, target, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
|
r, e := client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{
|
|
VolumeId: uint32(vid),
|
|
})
|
|
if e != nil {
|
|
return e
|
|
}
|
|
resp = r
|
|
return nil
|
|
}); err != nil {
|
|
return fmt.Errorf("read volume file status: %w", err)
|
|
}
|
|
if resp.DatFileSize == 0 {
|
|
return fmt.Errorf("decoded .dat is 0 bytes")
|
|
}
|
|
if resp.IdxFileSize == 0 {
|
|
return fmt.Errorf("decoded .idx is 0 bytes")
|
|
}
|
|
glog.V(0).Infof("ec decode verification ok for volume %d on %s: dat=%d idx=%d", vid, target, resp.DatFileSize, resp.IdxFileSize)
|
|
return nil
|
|
}
|
|
|
|
func mountDecodedVolume(grpcDialOption grpc.DialOption, targetNodeLocation pb.ServerAddress, vid needle.VolumeId) error {
|
|
return operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
|
|
VolumeId: uint32(vid),
|
|
})
|
|
return mountErr
|
|
})
|
|
}
|
|
|
|
func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
|
|
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
|
|
|
|
err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
_, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
|
|
VolumeId: uint32(vid),
|
|
Collection: collection,
|
|
})
|
|
return genErr
|
|
})
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId, eligibleTargets map[pb.ServerAddress]struct{}) (targetNodeLocation pb.ServerAddress, err error) {
|
|
|
|
maxShardCount := -1
|
|
existingShardsInfo := erasure_coding.NewShardsInfo()
|
|
for loc, si := range nodeToShardsInfo {
|
|
if eligibleTargets != nil {
|
|
if _, ok := eligibleTargets[loc]; !ok {
|
|
continue
|
|
}
|
|
}
|
|
toBeCopiedShardCount := si.MinusParityShards().Count()
|
|
if toBeCopiedShardCount > maxShardCount {
|
|
maxShardCount = toBeCopiedShardCount
|
|
targetNodeLocation = loc
|
|
existingShardsInfo = si
|
|
}
|
|
}
|
|
if targetNodeLocation == "" {
|
|
return "", fmt.Errorf("no eligible target datanodes available to decode volume %d", vid)
|
|
}
|
|
|
|
fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToShardsInfo)
|
|
|
|
copiedShardsInfo := erasure_coding.NewShardsInfo()
|
|
for loc, si := range nodeToShardsInfo {
|
|
if loc == targetNodeLocation {
|
|
continue
|
|
}
|
|
|
|
needToCopyShardsInfo := si.Minus(existingShardsInfo).MinusParityShards()
|
|
|
|
err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
|
|
// Always collect .ecj from every shard location. Each server's .ecj
|
|
// only contains deletions for needles whose data resides in shards
|
|
// held by that server. Without merging all .ecj files, deletions
|
|
// recorded on other servers would be lost during decode.
|
|
if needToCopyShardsInfo.Count() > 0 {
|
|
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation)
|
|
} else {
|
|
fmt.Printf("collect ecj %d %s => %s\n", vid, loc, targetNodeLocation)
|
|
}
|
|
|
|
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
|
|
VolumeId: uint32(vid),
|
|
Collection: collection,
|
|
ShardIds: needToCopyShardsInfo.IdsUint32(),
|
|
CopyEcxFile: false,
|
|
CopyEcjFile: true,
|
|
CopyVifFile: needToCopyShardsInfo.Count() > 0,
|
|
SourceDataNode: string(loc),
|
|
})
|
|
if copyErr != nil {
|
|
return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation, copyErr)
|
|
}
|
|
|
|
if needToCopyShardsInfo.Count() > 0 {
|
|
fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation)
|
|
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
|
|
VolumeId: uint32(vid),
|
|
Collection: collection,
|
|
ShardIds: needToCopyShardsInfo.IdsUint32(),
|
|
})
|
|
if mountErr != nil {
|
|
return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
copiedShardsInfo.Add(needToCopyShardsInfo)
|
|
}
|
|
|
|
nodeToShardsInfo[targetNodeLocation] = existingShardsInfo.Plus(copiedShardsInfo)
|
|
|
|
return targetNodeLocation, err
|
|
}
|
|
|
|
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
|
|
var resp *master_pb.LookupVolumeResponse
|
|
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
|
|
resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp.VolumeIdLocations, nil
|
|
}
|
|
|
|
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
|
|
// compile regex pattern for collection matching
|
|
collectionRegex, err := compileCollectionPattern(collectionPattern)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid collection pattern '%s': %v", collectionPattern, err)
|
|
}
|
|
|
|
vidMap := make(map[uint32]bool)
|
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
|
for _, v := range diskInfo.EcShardInfos {
|
|
if collectionRegex.MatchString(v.Collection) {
|
|
vidMap[v.Id] = true
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
for vid := range vidMap {
|
|
vids = append(vids, needle.VolumeId(vid))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]*erasure_coding.ShardsInfo {
|
|
res := make(map[pb.ServerAddress]*erasure_coding.ShardsInfo)
|
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
|
for _, v := range diskInfo.EcShardInfos {
|
|
if v.Id == uint32(vid) {
|
|
res[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(v)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
type decodeDiskUsageState struct {
|
|
byNode map[pb.ServerAddress]*decodeDiskUsageCounts
|
|
}
|
|
|
|
// decodeDiskUsageCounts holds the usage counters of one node's disk, seeded
// from the topology's disk info and adjusted as volumes are decoded.
type decodeDiskUsageCounts struct {
	maxVolumeCount    int64 // from DiskInfo.MaxVolumeCount
	volumeCount       int64 // from DiskInfo.VolumeCount; incremented when a decode creates a volume
	remoteVolumeCount int64 // from DiskInfo.RemoteVolumeCount
	ecShardCount      int64 // countShards over DiskInfo.EcShardInfos; decremented as shards are removed
}
|
|
|
|
func newDecodeDiskUsageState(topoInfo *master_pb.TopologyInfo, diskType types.DiskType) *decodeDiskUsageState {
|
|
state := &decodeDiskUsageState{byNode: make(map[pb.ServerAddress]*decodeDiskUsageCounts)}
|
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
|
state.byNode[pb.NewServerAddressFromDataNode(dn)] = &decodeDiskUsageCounts{
|
|
maxVolumeCount: diskInfo.MaxVolumeCount,
|
|
volumeCount: diskInfo.VolumeCount,
|
|
remoteVolumeCount: diskInfo.RemoteVolumeCount,
|
|
ecShardCount: int64(countShards(diskInfo.EcShardInfos)),
|
|
}
|
|
}
|
|
})
|
|
return state
|
|
}
|
|
|
|
func (state *decodeDiskUsageState) freeVolumeCount(location pb.ServerAddress) (int64, bool) {
|
|
if state == nil {
|
|
return 0, false
|
|
}
|
|
usage, found := state.byNode[location]
|
|
if !found {
|
|
return 0, false
|
|
}
|
|
free := usage.maxVolumeCount - (usage.volumeCount - usage.remoteVolumeCount)
|
|
free -= (usage.ecShardCount + int64(erasure_coding.DataShardsCount) - 1) / int64(erasure_coding.DataShardsCount)
|
|
return free, true
|
|
}
|
|
|
|
func (state *decodeDiskUsageState) applyDecode(targetNodeLocation pb.ServerAddress, shardCounts map[pb.ServerAddress]int, createdVolume bool) {
|
|
if state == nil {
|
|
return
|
|
}
|
|
for location, shardCount := range shardCounts {
|
|
if usage, found := state.byNode[location]; found {
|
|
usage.ecShardCount -= int64(shardCount)
|
|
}
|
|
}
|
|
if createdVolume {
|
|
if usage, found := state.byNode[targetNodeLocation]; found {
|
|
usage.volumeCount++
|
|
}
|
|
}
|
|
}
|