seaweedfs/weed/shell/command_volume_merge.go
Chris Lu 1c0e24f06a fix(balance): don't move remote-tiered volumes; don't fatal on missing .idx (#9335)
* fix(volume): don't fatal on missing .idx for remote-tiered volume

A .vif left behind without its .idx (orphaned by a crashed move, partial
copy, or hand-edit) would trip glog.Fatalf in checkIdxFile and take the
whole volume server down on boot, killing every healthy volume on it
too. For remote-tiered volumes treat it as a per-volume load error so
the server can come up and the operator can clean up the stray .vif.

Refs #9331.
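
Roughly the shape of the guard, as a self-contained sketch (not the
actual diff; idxExists and hasRemoteFile are hypothetical stand-ins
for the loader state that checkIdxFile actually consults):

    package storage

    import (
        "fmt"

        "github.com/seaweedfs/seaweedfs/weed/glog"
    )

    func checkIdxFileSketch(volumeId uint32, idxExists, hasRemoteFile bool) error {
        if idxExists {
            return nil
        }
        if hasRemoteFile {
            // remote-tiered: degrade to a per-volume load error so the
            // server still boots and the stray .vif can be removed
            return fmt.Errorf("remote-tiered volume %d has no .idx file", volumeId)
        }
        // a purely local volume without its index stays fatal
        glog.Fatalf("volume %d: missing .idx file", volumeId)
        return nil
    }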

* fix(balance): skip remote-tiered volumes in admin balance detection

The admin/worker balance detector had no equivalent of the shell-side
guard ("does not move volume in remote storage" in
command_volume_balance.go), so it scheduled moves on remote-tiered
volumes. The "move" copies .idx/.vif to the destination and then calls
Volume.Destroy on the source, which calls backendStorage.DeleteFile —
deleting the remote object the destination's new .vif now points at.

Populate HasRemoteCopy on the metrics emitted by both the admin
maintenance scanner and the worker's master poll, then drop those
volumes at the top of Detection.

Fixes #9331.
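
Sketched, the Detection-side filter looks something like this (the
types here are illustrative, not the admin package's real ones; only
the HasRemoteCopy field comes from this change):

    package maintenance

    // volumeMetric is a stand-in for the real metrics type.
    type volumeMetric struct {
        VolumeID      uint32
        HasRemoteCopy bool
    }

    // balanceCandidates drops remote-tiered volumes before any planning.
    func balanceCandidates(metrics []volumeMetric) []volumeMetric {
        kept := make([]volumeMetric, 0, len(metrics))
        for _, m := range metrics {
            if m.HasRemoteCopy {
                // a move copies .idx/.vif and then Destroys the source,
                // which would delete the cloud object both copies reference
                continue
            }
            kept = append(kept, m)
        }
        return kept
    }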

* Apply suggestion from @gemini-code-assist[bot]

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix(volume): keep remote data on volume-move-driven delete

The on-source delete after a volume move (admin/worker balance and
shell volume.move) ran Volume.Destroy with no way to opt out of the
remote-object cleanup. Volume.Destroy unconditionally calls
backendStorage.DeleteFile for remote-tiered volumes, so a successful
move would copy .idx/.vif to the destination and then nuke the cloud
object the destination's new .vif was already pointing at.

Add VolumeDeleteRequest.keep_remote_data and plumb it through
Store.DeleteVolume / DiskLocation.DeleteVolume / Volume.Destroy. The
balance task and shell volume.move set it to true; the post-tier-upload
cleanup of other replicas and the over-replication trim in
volume.fix.replication also set it to true since the remote object is
still referenced. Other real-delete callers keep the default. The
delete-before-receive path in VolumeCopy also sets it: the inbound copy
carries a .vif that may reference the same cloud object as the
existing volume.

Refs #9331.
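
A minimal sketch of the gate the flag ends up controlling (only
keep_remote_data and DeleteFile are named by the change; the interface
and helper parameters are invented for illustration):

    package storage

    type remoteBackend interface {
        DeleteFile(key string) error
    }

    func destroySketch(b remoteBackend, remoteKey string, hasRemoteFile, keepRemoteData bool, removeLocalFiles func() error) error {
        if hasRemoteFile && !keepRemoteData {
            // only a true delete removes the cloud object; volume moves,
            // post-tier-upload replica cleanup, and over-replication trims
            // all pass keepRemoteData=true since the object is still referenced
            if err := b.DeleteFile(remoteKey); err != nil {
                return err
            }
        }
        return removeLocalFiles() // remove .dat/.idx/.vif on this server
    }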

* test(storage): in-process remote-tier integration tests

Cover the four operations the user is most likely to run against a
cloud-tiered volume — balance/move, vacuum, EC encode, EC decode — by
registering a local-disk-backed BackendStorage as the "remote" tier and
exercising the real Volume / DiskLocation / EC encoder code paths.

Locks in:
- Destroy(keepRemoteData=true) preserves the remote object (move case)
- Destroy(keepRemoteData=false) deletes it (real-delete case)
- Vacuum/compact on a remote-tier volume never deletes the remote object
- EC encode requires the local .dat (callers must download first)
- EC encode + rebuild round-trips after a tier-down

Tests run in-process and finish in under a second total — no cluster,
binary, or external storage required.
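
The move/real-delete invariant those tests lock in, reduced to a toy
self-contained sketch (fakeBackend and destroy are invented here; the
real tests drive the actual Volume / DiskLocation code paths):

    package storage_test

    import "testing"

    type fakeBackend struct{ deleted bool }

    func (b *fakeBackend) DeleteFile(key string) error { b.deleted = true; return nil }

    func destroy(b *fakeBackend, keepRemoteData bool) error {
        if !keepRemoteData {
            return b.DeleteFile("1.dat")
        }
        return nil
    }

    func TestKeepRemoteDataSketch(t *testing.T) {
        b := &fakeBackend{}
        _ = destroy(b, true) // the balance/move case
        if b.deleted {
            t.Fatal("keepRemoteData=true must preserve the remote object")
        }
        _ = destroy(b, false) // the real-delete case
        if !b.deleted {
            t.Fatal("keepRemoteData=false must delete the remote object")
        }
    }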

* fix(rust-volume): keep remote data on volume-move-driven delete

Mirror the Go fix in seaweed-volume: plumb keep_remote_data through
grpc volume_delete → Store.delete_volume → DiskLocation.delete_volume
→ Volume.destroy, and skip the s3-tier delete_file call when the flag
is set. The pre-receive cleanup in volume_copy passes true for the
same reason as the Go side: the inbound copy carries a .vif that may
reference the same cloud object as the existing volume.

The Rust loader already warns rather than fataling on a stray .vif
without an .idx (volume.rs load_index_inmemory / load_index_redb), so
no counterpart to the Go fatal-on-missing-idx fix is needed.

Refs #9331.

* fix(volume): preserve remote tier on IO-error eviction; fix EC test target

Two review nits:

- Store.MaybeAddVolumes' periodic cleanup pass deleted IO-errored
  volumes with keepRemoteData=false, so a transient local fault on a
  remote-tiered volume would also nuke the cloud object. Track the
  delete reason via a parallel slice and pass keepRemoteData=v.HasRemoteFile()
  for IO-error evictions; TTL-expired evictions still pass false (see
  the sketch after this list).

- TestRemoteTier_ECEncodeDecode_AfterDownload deleted shards 0..3 but
  called them "parity" — by the klauspost/reedsolomon convention shards
  0..DataShardsCount-1 are data and DataShardsCount..TotalShardsCount-1
  are parity. Switch the loop to delete the parity range so the
  intent matches the indices.
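
A compilable sketch of the first fix's reason split (evictReason and
keepRemoteDataFor are illustrative; HasRemoteFile is the accessor
named above):

    package storage

    type evictReason int

    const (
        evictTTLExpired evictReason = iota
        evictIOError
    )

    func keepRemoteDataFor(reason evictReason, hasRemoteFile bool) bool {
        // a transient local fault must not destroy the cloud copy;
        // a TTL expiry is a real delete and may remove it
        return reason == evictIOError && hasRemoteFile
    }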

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-05-06 15:19:43 -07:00


package shell

import (
	"bytes"
	"container/heap"
	"context"
	"flag"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"google.golang.org/grpc"
)

// mergeIdleTimeoutSeconds is the timeout for idle streams during needle tailing.
// This ensures that slow or stalled streams don't block the merge indefinitely.
// Set to 5 seconds to handle network congestion and avoid premature stream termination.
// Can be made configurable in the future if needed for different deployment scenarios.
const mergeIdleTimeoutSeconds = 5

// mergeDeduplicationWindowNs defines the time window for deduplication across replicas.
// Since the same needle ID can have different timestamps on different servers due to
// clock skew and replication lag, we deduplicate needles with the same ID within this window.
// Set to 5 seconds in nanoseconds to handle typical server clock differences.
const mergeDeduplicationWindowNs = 5 * time.Second

func init() {
	Commands = append(Commands, &commandVolumeMerge{})
}
type commandVolumeMerge struct{}

func (c *commandVolumeMerge) Name() string {
	return "volume.merge"
}

func (c *commandVolumeMerge) Help() string {
	return `merge replicas for a volume id in timestamp order into a fresh copy
volume.merge -volumeId <volume id>
This command:
1) marks the volume readonly on replicas (if not already)
2) allocates a temporary copy on a third location
3) merges replicas in append timestamp order, skipping duplicates
4) replaces the original replicas with the merged volume
5) restores writable state if it was writable before
`
}

func (c *commandVolumeMerge) HasTag(CommandTag) bool {
	return false
}
func (c *commandVolumeMerge) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	mergeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeIdInt := mergeCommand.Int("volumeId", 0, "the volume id")
	targetNodeStr := mergeCommand.String("target", "", "optional target volume server <host>:<port> for temporary merge output")
	noLock := mergeCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")
	if err = mergeCommand.Parse(args); err != nil {
		return err
	}
	if *volumeIdInt == 0 {
		return fmt.Errorf("volumeId is required")
	}
	if *noLock {
		commandEnv.noLock = true
	} else if err = commandEnv.confirmIsLocked(args); err != nil {
		return err
	}

	volumeId := needle.VolumeId(*volumeIdInt)
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}
	volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
	replicas := volumeReplicas[uint32(volumeId)]
	if len(replicas) < 2 {
		return fmt.Errorf("volume %d has %d replica(s); merge requires at least two", volumeId, len(replicas))
	}
	volumeInfo := replicas[0].info
	replicaPlacement, err := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
	if err != nil {
		return fmt.Errorf("parse replica placement for volume %d: %w", volumeId, err)
	}

	// allocate the temporary merge volume, either on the requested target
	// or on a third location that hosts none of the replicas
	var targetServer pb.ServerAddress
	if *targetNodeStr != "" {
		targetServer = pb.ServerAddress(*targetNodeStr)
		if isReplicaServer(targetServer, replicas) {
			return fmt.Errorf("target %s already hosts volume %d", *targetNodeStr, volumeId)
		}
		if err = allocateMergeVolume(commandEnv.option.GrpcDialOption, targetServer, volumeInfo, replicaPlacement); err != nil {
			return err
		}
	} else {
		targetServer, err = allocateMergeVolumeOnThirdLocation(commandEnv.option.GrpcDialOption, allLocations, replicas, volumeInfo, replicaPlacement)
		if err != nil {
			return err
		}
	}
	cleanupTarget := true
	defer func() {
		if !cleanupTarget {
			return
		}
		if delErr := deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false, false); delErr != nil {
			glog.Warningf("failed to clean up temporary merge volume %d on %s: %v", volumeId, targetServer, delErr)
		}
	}()

	writableReplicaIndices, err := ensureVolumeReadonly(commandEnv, replicas)
	if err != nil {
		return err
	}
	if len(writableReplicaIndices) > 0 {
		defer func() {
			// Only restore writable state for replicas that were originally writable
			writableReplicas := make([]*VolumeReplica, 0, len(writableReplicaIndices))
			for _, idx := range writableReplicaIndices {
				writableReplicas = append(writableReplicas, replicas[idx])
			}
			if restoreErr := markReplicasWritable(commandEnv.option.GrpcDialOption, writableReplicas, true, false); restoreErr != nil {
				glog.Warningf("failed to restore writable state for volume %d: %v", volumeId, restoreErr)
			}
		}()
	}

	// tail every replica concurrently and merge the needle streams in
	// timestamp order into the temporary volume
	done := make(chan struct{})
	defer close(done)
	sources := make([]needleStream, 0, len(replicas))
	for _, replica := range replicas {
		server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
		sources = append(sources, startTailNeedleStream(commandEnv.option.GrpcDialOption, volumeId, server, done))
	}
	mergeErr := operation.WithVolumeServerClient(false, targetServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		version := needle.Version(volumeInfo.Version)
		if version == 0 {
			version = needle.GetCurrentVersion()
		}
		return mergeNeedleStreams(sources, func(streamIndex int, n *needle.Needle) error {
			blob, size, err := needleBlobFromNeedle(n, version)
			if err != nil {
				return err
			}
			_, err = client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
				VolumeId:   uint32(volumeId),
				NeedleId:   uint64(n.Id),
				Size:       int32(size),
				NeedleBlob: blob,
			})
			return err
		})
	})
	if mergeErr != nil {
		return mergeErr
	}

	// replace each original replica with a copy of the merged volume,
	// then drop the temporary copy
	for _, replica := range replicas {
		sourceServer := pb.NewServerAddressFromDataNode(replica.location.dataNode)
		if _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, targetServer, sourceServer, "", 0, false); err != nil {
			return err
		}
	}
	if err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false, false); err != nil {
		return err
	}
	cleanupTarget = false
	fmt.Fprintf(writer, "merged volume %d from %d replicas via %s\n", volumeId, len(replicas), targetServer)
	return nil
}
// needleStream yields needles from one replica in append order.
type needleStream interface {
	Next() (*needle.Needle, bool)
	Err() error
}

type tailNeedleStream struct {
	ch    <-chan *needle.Needle
	errMu sync.Mutex
	err   error
}

func (s *tailNeedleStream) Next() (*needle.Needle, bool) {
	n, ok := <-s.ch
	return n, ok
}

func (s *tailNeedleStream) Err() error {
	s.errMu.Lock()
	defer s.errMu.Unlock()
	return s.err
}

func (s *tailNeedleStream) setErr(err error) {
	s.errMu.Lock()
	s.err = err
	s.errMu.Unlock()
}

// startTailNeedleStream tails a volume replica in the background and exposes
// the needles as a needleStream. Closing done cancels the tail.
func startTailNeedleStream(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, server pb.ServerAddress, done <-chan struct{}) *tailNeedleStream {
	ch := make(chan *needle.Needle, 32)
	stream := &tailNeedleStream{ch: ch}
	go func() {
		err := operation.TailVolumeFromSource(server, grpcDialOption, volumeId, 0, mergeIdleTimeoutSeconds, func(n *needle.Needle) error {
			select {
			case ch <- n:
			case <-done:
				return fmt.Errorf("merge cancelled")
			}
			return nil
		})
		// record the error before closing the channel, so a consumer that
		// observes the channel close is guaranteed to see the error in Err()
		stream.setErr(err)
		close(ch)
	}()
	return stream
}
type needleMergeItem struct {
	streamIndex int
	needle      *needle.Needle
	timestamp   uint64
}

// needleMergeHeap is a min-heap ordered by timestamp, breaking ties by needle ID.
type needleMergeHeap []needleMergeItem

func (h needleMergeHeap) Len() int { return len(h) }
func (h needleMergeHeap) Less(i, j int) bool {
	if h[i].timestamp == h[j].timestamp {
		return h[i].needle.Id < h[j].needle.Id
	}
	return h[i].timestamp < h[j].timestamp
}
func (h needleMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *needleMergeHeap) Push(x any) {
	*h = append(*h, x.(needleMergeItem))
}
func (h *needleMergeHeap) Pop() any {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}
// mergeNeedleStreams performs a k-way merge of the replica streams in
// timestamp order, skipping cross-stream duplicates, and feeds each surviving
// needle to consume.
func mergeNeedleStreams(streams []needleStream, consume func(int, *needle.Needle) error) error {
	h := &needleMergeHeap{}
	heap.Init(h)
	for i, stream := range streams {
		if n, ok := stream.Next(); ok {
			heap.Push(h, needleMergeItem{streamIndex: i, needle: n, timestamp: needleTimestamp(n)})
		}
	}

	// Track seen needle IDs (by stream) within a time window to skip cross-stream duplicates only.
	// Needles with the same ID from different streams within mergeDeduplicationWindowNs are considered
	// cross-stream duplicates and skipped. Same-stream duplicates (overwrites) are kept.
	// Map: needleId -> streamIndex that first processed it in this window
	seenAtTimestamp := make(map[types.NeedleId]int)
	var windowStartTimestamp uint64
	windowInitialized := false
	for h.Len() > 0 {
		item := heap.Pop(h).(needleMergeItem)
		ts := item.timestamp
		n := item.needle
		// Initialize window on first timestamp, or move to new window when outside current window
		if !windowInitialized {
			windowStartTimestamp = ts
			windowInitialized = true
		} else if ts > windowStartTimestamp+uint64(mergeDeduplicationWindowNs) {
			// Moving to a new window: clear the seen map to reduce memory usage.
			// This is safe because we only skip duplicates within the same time window.
			seenAtTimestamp = make(map[types.NeedleId]int)
			windowStartTimestamp = ts
		}
		// Skip cross-stream duplicates: if we've already seen this needle ID from a DIFFERENT stream
		// within this time window, skip it. Same-stream duplicates (overwrites) are kept.
		if seenStreamIdx, exists := seenAtTimestamp[n.Id]; exists && seenStreamIdx != item.streamIndex {
			// Cross-stream duplicate from a different stream - skip this occurrence
			if nextN, ok := streams[item.streamIndex].Next(); ok {
				heap.Push(h, needleMergeItem{streamIndex: item.streamIndex, needle: nextN, timestamp: needleTimestamp(nextN)})
			}
			continue
		}
		// Record this stream's occurrence of this needle ID in this window
		// (overwrite if from same stream, since we process in timestamp order)
		seenAtTimestamp[n.Id] = item.streamIndex
		if err := consume(item.streamIndex, n); err != nil {
			return err
		}
		if nextN, ok := streams[item.streamIndex].Next(); ok {
			heap.Push(h, needleMergeItem{streamIndex: item.streamIndex, needle: nextN, timestamp: needleTimestamp(nextN)})
		}
	}

	for _, stream := range streams {
		if err := stream.Err(); err != nil {
			return err
		}
	}
	return nil
}
// needleTimestamp returns the best-available ordering timestamp for a needle:
// the append time in nanoseconds, falling back to LastModified, then zero.
func needleTimestamp(n *needle.Needle) uint64 {
	if n.AppendAtNs != 0 {
		return n.AppendAtNs
	}
	if n.LastModified != 0 {
		return uint64(time.Unix(int64(n.LastModified), 0).UnixNano())
	}
	return 0
}
// memoryBackendFile implements backend.BackendStorageFile using an in-memory buffer
type memoryBackendFile struct {
	buf *bytes.Buffer
}

func (m *memoryBackendFile) ReadAt(p []byte, off int64) (n int, err error) {
	data := m.buf.Bytes()
	if off >= int64(len(data)) {
		return 0, io.EOF
	}
	n = copy(p, data[off:])
	if off+int64(n) < int64(len(data)) {
		return n, nil
	}
	return n, io.EOF
}

func (m *memoryBackendFile) WriteAt(p []byte, off int64) (n int, err error) {
	data := m.buf.Bytes()
	if off > int64(len(data)) {
		// Pad with zeros
		m.buf.Write(make([]byte, off-int64(len(data))))
		// Refresh data snapshot after padding to see the padded length
		data = m.buf.Bytes()
	}
	if off == int64(len(data)) {
		return m.buf.Write(p)
	}
	// Overwrite existing data: preserve any trailing bytes beyond the write range
	newLen := off + int64(len(p))
	if newLen < int64(len(data)) {
		newLen = int64(len(data))
	}
	newData := make([]byte, newLen)
	copy(newData, data)
	copy(newData[off:], p)
	m.buf = bytes.NewBuffer(newData)
	return len(p), nil
}

func (m *memoryBackendFile) Truncate(off int64) error {
	data := m.buf.Bytes()
	if off > int64(len(data)) {
		m.buf.Write(make([]byte, off-int64(len(data))))
	} else {
		m.buf = bytes.NewBuffer(data[:off])
	}
	return nil
}

func (m *memoryBackendFile) Close() error {
	return nil
}

func (m *memoryBackendFile) GetStat() (datSize int64, modTime time.Time, err error) {
	return int64(m.buf.Len()), time.Now(), nil
}

func (m *memoryBackendFile) Name() string {
	return "memory"
}

func (m *memoryBackendFile) Sync() error {
	return nil
}

func newMemoryBackendFile() *memoryBackendFile {
	return &memoryBackendFile{
		buf: &bytes.Buffer{},
	}
}
func needleBlobFromNeedle(n *needle.Needle, version needle.Version) ([]byte, types.Size, error) {
	// Use in-memory buffer for serialization to avoid expensive temporary file I/O
	memFile := newMemoryBackendFile()
	defer memFile.Close()
	_, size, actualSize, err := n.Append(memFile, version)
	if err != nil {
		return nil, 0, err
	}
	buf := make([]byte, actualSize)
	read, err := memFile.ReadAt(buf, 0)
	if err != nil && err != io.EOF {
		return nil, 0, err
	}
	return buf[:read], size, nil
}
// allocateMergeVolumeOnThirdLocation finds a volume server that hosts none of
// the existing replicas, has the required disk type, and can allocate the
// temporary merge volume.
func allocateMergeVolumeOnThirdLocation(grpcDialOption grpc.DialOption, allLocations []location, replicas []*VolumeReplica, info *master_pb.VolumeInformationMessage, replicaPlacement *super_block.ReplicaPlacement) (pb.ServerAddress, error) {
	replicaNodes := map[string]struct{}{}
	for _, replica := range replicas {
		replicaNodes[replica.location.dataNode.Id] = struct{}{}
	}
	for _, loc := range allLocations {
		if _, exists := replicaNodes[loc.dataNode.Id]; exists {
			continue
		}
		if !locationHasDiskType(loc, info.DiskType) {
			continue
		}
		server := pb.NewServerAddressFromDataNode(loc.dataNode)
		if err := allocateMergeVolume(grpcDialOption, server, info, replicaPlacement); err != nil {
			glog.V(1).Infof("failed to allocate merge volume on %s with replication %s: %v", server, replicaPlacement.String(), err)
			continue
		}
		return server, nil
	}
	return "", fmt.Errorf("no third location available to merge volume %d", info.Id)
}

func allocateMergeVolume(grpcDialOption grpc.DialOption, server pb.ServerAddress, info *master_pb.VolumeInformationMessage, replicaPlacement *super_block.ReplicaPlacement) error {
	return operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		_, err := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
			VolumeId:    info.Id,
			Collection:  info.Collection,
			Preallocate: 0,
			Replication: replicaPlacement.String(),
			Ttl:         needle.LoadTTLFromUint32(info.Ttl).String(),
			DiskType:    info.DiskType,
			Version:     info.Version,
		})
		return err
	})
}
// ensureVolumeReadonly marks all replicas as readonly and returns the indices of replicas that were writable
func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([]int, error) {
	var writableReplicaIndices []int
	for i, replica := range replicas {
		server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
		err := operation.WithVolumeServerClient(false, server, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
			resp, err := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: replica.info.Id})
			if err != nil {
				return err
			}
			if !resp.IsReadOnly {
				writableReplicaIndices = append(writableReplicaIndices, i)
			}
			return nil
		})
		if err != nil {
			return nil, err
		}
	}
	if len(writableReplicaIndices) > 0 {
		if err := markReplicasWritable(commandEnv.option.GrpcDialOption, replicas, false, false); err != nil {
			return nil, err
		}
	}
	return writableReplicaIndices, nil
}
func isReplicaServer(target pb.ServerAddress, replicas []*VolumeReplica) bool {
	for _, replica := range replicas {
		if pb.NewServerAddressFromDataNode(replica.location.dataNode).Equals(target) {
			return true
		}
	}
	return false
}

func locationHasDiskType(loc location, diskType string) bool {
	for _, diskInfo := range loc.dataNode.DiskInfos {
		if diskInfo.Type == diskType {
			return true
		}
	}
	return false
}

// markReplicasWritable flips the writable/readonly state on every replica,
// optionally persisting the readonly marker.
func markReplicasWritable(grpcDialOption grpc.DialOption, replicas []*VolumeReplica, writable bool, persist bool) error {
	for _, replica := range replicas {
		server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
		err := operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
			if writable {
				_, err := client.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{VolumeId: replica.info.Id})
				return err
			}
			_, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{VolumeId: replica.info.Id, Persist: persist})
			return err
		})
		if err != nil {
			return err
		}
	}
	return nil
}