Files
seaweedfs/weed/plugin/worker/volume_metrics.go
Chris Lu 1c0e24f06a fix(balance): don't move remote-tiered volumes; don't fatal on missing .idx (#9335)
* fix(volume): don't fatal on missing .idx for remote-tiered volume

A .vif left behind without its .idx (orphaned by a crashed move, partial
copy, or hand-edit) would trip glog.Fatalf in checkIdxFile and take the
whole volume server down on boot, killing every healthy volume on it
too. For remote-tiered volumes treat it as a per-volume load error so
the server can come up and the operator can clean up the stray .vif.

Refs #9331.

* fix(balance): skip remote-tiered volumes in admin balance detection

The admin/worker balance detector had no equivalent of the shell-side
guard ("does not move volume in remote storage" in
command_volume_balance.go), so it scheduled moves on remote-tiered
volumes. The "move" copies .idx/.vif to the destination and then calls
Volume.Destroy on the source, which calls backendStorage.DeleteFile —
deleting the remote object the destination's new .vif now points at.

Populate HasRemoteCopy on the metrics emitted by both the admin
maintenance scanner and the worker's master poll, then drop those
volumes at the top of Detection.

Fixes #9331.

* Apply suggestion from @gemini-code-assist[bot]

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix(volume): keep remote data on volume-move-driven delete

The on-source delete after a volume move (admin/worker balance and
shell volume.move) ran Volume.Destroy with no way to opt out of the
remote-object cleanup. Volume.Destroy unconditionally calls
backendStorage.DeleteFile for remote-tiered volumes, so a successful
move would copy .idx/.vif to the destination and then nuke the cloud
object the destination's new .vif was already pointing at.

Add VolumeDeleteRequest.keep_remote_data and plumb it through
Store.DeleteVolume / DiskLocation.DeleteVolume / Volume.Destroy. The
balance task and shell volume.move set it to true; the post-tier-upload
cleanup of other replicas and the over-replication trim in
volume.fix.replication also set it to true since the remote object is
still referenced. Other real-delete callers keep the default. The
delete-before-receive path in VolumeCopy also sets it: the inbound copy
carries a .vif that may reference the same cloud object as the
existing volume.

Refs #9331.

* test(storage): in-process remote-tier integration tests

Cover the four operations the user is most likely to run against a
cloud-tiered volume — balance/move, vacuum, EC encode, EC decode — by
registering a local-disk-backed BackendStorage as the "remote" tier and
exercising the real Volume / DiskLocation / EC encoder code paths.

Locks in:
- Destroy(keepRemoteData=true) preserves the remote object (move case)
- Destroy(keepRemoteData=false) deletes it (real-delete case)
- Vacuum/compact on a remote-tier volume never deletes the remote object
- EC encode requires the local .dat (callers must download first)
- EC encode + rebuild round-trips after a tier-down

Tests run in-process and finish in under a second total — no cluster,
binary, or external storage required.

* fix(rust-volume): keep remote data on volume-move-driven delete

Mirror the Go fix in seaweed-volume: plumb keep_remote_data through
grpc volume_delete → Store.delete_volume → DiskLocation.delete_volume
→ Volume.destroy, and skip the s3-tier delete_file call when the flag
is set. The pre-receive cleanup in volume_copy passes true for the
same reason as the Go side: the inbound copy carries a .vif that may
reference the same cloud object as the existing volume.

The Rust loader already warns rather than fataling on a stray .vif
without an .idx (volume.rs load_index_inmemory / load_index_redb), so
no counterpart to the Go fatal-on-missing-idx fix is needed.

Refs #9331.

* fix(volume): preserve remote tier on IO-error eviction; fix EC test target

Two review nits:

- Store.MaybeAddVolumes' periodic cleanup pass deleted IO-errored
  volumes with keepRemoteData=false, so a transient local fault on a
  remote-tiered volume would also nuke the cloud object. Track the
  delete reason via a parallel slice and pass keepRemoteData=v.HasRemoteFile()
  for IO-error evictions; TTL-expired evictions still pass false.

- TestRemoteTier_ECEncodeDecode_AfterDownload deleted shards 0..3 but
  called them "parity" — by the klauspost/reedsolomon convention shards
  0..DataShardsCount-1 are data and DataShardsCount..TotalShardsCount-1
  are parity. Switch the loop to delete the parity range so the
  intent matches the indices.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-05-06 15:19:43 -07:00

217 lines
7.1 KiB
Go

package pluginworker
import (
"context"
"errors"
"fmt"
"regexp"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
)
// CollectVolumeMetricsFromMasters dials the provided master addresses in order
// until one returns a usable volume list, then converts that into per-volume
// health metrics, an active-topology view, and a replica-location map.
func CollectVolumeMetricsFromMasters(
ctx context.Context,
masterAddresses []string,
collectionFilter string,
grpcDialOption grpc.DialOption,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, map[uint32][]workertypes.ReplicaLocation, error) {
if grpcDialOption == nil {
return nil, nil, nil, fmt.Errorf("grpc dial option is not configured")
}
if len(masterAddresses) == 0 {
return nil, nil, nil, fmt.Errorf("no master addresses provided in cluster context")
}
for _, masterAddress := range masterAddresses {
response, err := FetchVolumeList(ctx, masterAddress, grpcDialOption)
if err != nil {
glog.Warningf("Plugin worker failed master volume list at %s: %v", masterAddress, err)
continue
}
metrics, activeTopology, replicaMap, buildErr := buildVolumeMetrics(response, collectionFilter)
if buildErr != nil {
// Configuration errors (e.g. invalid regex) will fail on every master,
// so return immediately instead of masking them with retries.
if isConfigError(buildErr) {
return nil, nil, nil, buildErr
}
glog.Warningf("Plugin worker failed to build metrics from master %s: %v", masterAddress, buildErr)
continue
}
return metrics, activeTopology, replicaMap, nil
}
return nil, nil, nil, fmt.Errorf("failed to load topology from all provided masters")
}
// FetchVolumeList dials the given master address (trying both the address as
// given and the gRPC port variant) and returns the master's volume list. Used
// by detection helpers that already know which master address to talk to.
func FetchVolumeList(ctx context.Context, address string, grpcDialOption grpc.DialOption) (*master_pb.VolumeListResponse, error) {
var lastErr error
for _, candidate := range MasterAddressCandidates(address) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
dialCtx, cancelDial := context.WithTimeout(ctx, 5*time.Second)
conn, err := pb.GrpcDial(dialCtx, candidate, false, grpcDialOption)
cancelDial()
if err != nil {
lastErr = err
continue
}
client := master_pb.NewSeaweedClient(conn)
callCtx, cancelCall := context.WithTimeout(ctx, 10*time.Second)
response, callErr := client.VolumeList(callCtx, &master_pb.VolumeListRequest{})
cancelCall()
_ = conn.Close()
if callErr == nil {
return response, nil
}
lastErr = callErr
}
if lastErr == nil {
lastErr = fmt.Errorf("no valid master address candidate")
}
return nil, lastErr
}
func buildVolumeMetrics(
response *master_pb.VolumeListResponse,
collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, map[uint32][]workertypes.ReplicaLocation, error) {
if response == nil || response.TopologyInfo == nil {
return nil, nil, nil, fmt.Errorf("volume list response has no topology info")
}
activeTopology := topology.NewActiveTopology(10)
if err := activeTopology.UpdateTopology(response.TopologyInfo); err != nil {
return nil, nil, nil, err
}
var collectionRegex *regexp.Regexp
trimmedFilter := strings.TrimSpace(collectionFilter)
filterMode := CollectionFilterMode(trimmedFilter)
if trimmedFilter != "" && filterMode != CollectionFilterAll && filterMode != CollectionFilterEach && trimmedFilter != "*" {
var err error
collectionRegex, err = regexp.Compile(trimmedFilter)
if err != nil {
return nil, nil, nil, &configError{err: fmt.Errorf("invalid collection_filter regex %q: %w", trimmedFilter, err)}
}
}
volumeSizeLimitBytes := uint64(response.VolumeSizeLimitMb) * 1024 * 1024
now := time.Now()
metrics := make([]*workertypes.VolumeHealthMetrics, 0, 256)
replicaMap := make(map[uint32][]workertypes.ReplicaLocation)
for _, dc := range response.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for diskType, diskInfo := range node.DiskInfos {
for _, volume := range diskInfo.VolumeInfos {
// Build replica map from ALL volumes BEFORE collection filtering,
// since replicas may span filtered/unfiltered nodes.
replicaMap[volume.Id] = append(replicaMap[volume.Id], workertypes.ReplicaLocation{
DataCenter: dc.Id,
Rack: rack.Id,
NodeID: node.Id,
})
if collectionRegex != nil && !collectionRegex.MatchString(volume.Collection) {
continue
}
metric := &workertypes.VolumeHealthMetrics{
VolumeID: volume.Id,
Server: node.Id,
ServerAddress: string(pb.NewServerAddressFromDataNode(node)),
DiskType: diskType,
DiskId: volume.DiskId,
DataCenter: dc.Id,
Rack: rack.Id,
Collection: volume.Collection,
Size: volume.Size,
DeletedBytes: volume.DeletedByteCount,
LastModified: time.Unix(volume.ModifiedAtSecond, 0),
ReplicaCount: 1,
ExpectedReplicas: int(volume.ReplicaPlacement),
IsReadOnly: volume.ReadOnly,
HasRemoteCopy: volume.RemoteStorageName != "",
}
if metric.Size > 0 {
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
}
if volumeSizeLimitBytes > 0 {
metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes)
}
metric.Age = now.Sub(metric.LastModified)
metrics = append(metrics, metric)
}
}
}
}
}
replicaCounts := make(map[uint32]int)
for _, metric := range metrics {
replicaCounts[metric.VolumeID]++
}
for _, metric := range metrics {
metric.ReplicaCount = replicaCounts[metric.VolumeID]
}
return metrics, activeTopology, replicaMap, nil
}
// configError wraps configuration errors that should not be retried across masters.
type configError struct {
err error
}
func (e *configError) Error() string { return e.err.Error() }
func (e *configError) Unwrap() error { return e.err }
func isConfigError(err error) bool {
var ce *configError
return errors.As(err, &ce)
}
// MasterAddressCandidates returns address forms to try when dialing a master:
// the address as given plus the gRPC variant (port + 10000). Both are tried
// because callers may pass either an HTTP-port or gRPC-port address.
func MasterAddressCandidates(address string) []string {
trimmed := strings.TrimSpace(address)
if trimmed == "" {
return nil
}
candidateSet := map[string]struct{}{
trimmed: {},
}
converted := pb.ServerToGrpcAddress(trimmed)
candidateSet[converted] = struct{}{}
candidates := make([]string, 0, len(candidateSet))
for candidate := range candidateSet {
candidates = append(candidates, candidate)
}
sort.Strings(candidates)
return candidates
}