mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
* fix(volume): don't fatal on missing .idx for remote-tiered volume

  A .vif left behind without its .idx (orphaned by a crashed move, partial copy, or hand-edit) would trip glog.Fatalf in checkIdxFile and take the whole volume server down on boot, killing every healthy volume on it too. For remote-tiered volumes treat it as a per-volume load error so the server can come up and the operator can clean up the stray .vif.

  Refs #9331.

* fix(balance): skip remote-tiered volumes in admin balance detection

  The admin/worker balance detector had no equivalent of the shell-side guard ("does not move volume in remote storage" in command_volume_balance.go), so it scheduled moves on remote-tiered volumes. The "move" copies .idx/.vif to the destination and then calls Volume.Destroy on the source, which calls backendStorage.DeleteFile, deleting the remote object the destination's new .vif now points at.

  Populate HasRemoteCopy on the metrics emitted by both the admin maintenance scanner and the worker's master poll, then drop those volumes at the top of Detection.

  Fixes #9331.

* Apply suggestion from @gemini-code-assist[bot]

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix(volume): keep remote data on volume-move-driven delete

  The on-source delete after a volume move (admin/worker balance and shell volume.move) ran Volume.Destroy with no way to opt out of the remote-object cleanup. Volume.Destroy unconditionally calls backendStorage.DeleteFile for remote-tiered volumes, so a successful move would copy .idx/.vif to the destination and then nuke the cloud object the destination's new .vif was already pointing at.

  Add VolumeDeleteRequest.keep_remote_data and plumb it through Store.DeleteVolume / DiskLocation.DeleteVolume / Volume.Destroy. The balance task and shell volume.move set it to true; the post-tier-upload cleanup of other replicas and the over-replication trim in volume.fix.replication also set it to true since the remote object is still referenced. Other real-delete callers keep the default.

  The delete-before-receive path in VolumeCopy also sets it: the inbound copy carries a .vif that may reference the same cloud object as the existing volume.

  Refs #9331.

* test(storage): in-process remote-tier integration tests

  Cover the four operations the user is most likely to run against a cloud-tiered volume (balance/move, vacuum, EC encode, EC decode) by registering a local-disk-backed BackendStorage as the "remote" tier and exercising the real Volume / DiskLocation / EC encoder code paths.

  Locks in:
  - Destroy(keepRemoteData=true) preserves the remote object (move case)
  - Destroy(keepRemoteData=false) deletes it (real-delete case)
  - Vacuum/compact on a remote-tier volume never deletes the remote object
  - EC encode requires the local .dat (callers must download first)
  - EC encode + rebuild round-trips after a tier-down

  Tests run in-process and finish in under a second total; no cluster, binary, or external storage is required.

* fix(rust-volume): keep remote data on volume-move-driven delete

  Mirror the Go fix in seaweed-volume: plumb keep_remote_data through grpc volume_delete → Store.delete_volume → DiskLocation.delete_volume → Volume.destroy, and skip the s3-tier delete_file call when the flag is set. The pre-receive cleanup in volume_copy passes true for the same reason as the Go side: the inbound copy carries a .vif that may reference the same cloud object as the existing volume.

  The Rust loader already warns rather than fataling on a stray .vif without an .idx (volume.rs load_index_inmemory / load_index_redb), so no counterpart to the Go fatal-on-missing-idx fix is needed.

  Refs #9331.

* fix(volume): preserve remote tier on IO-error eviction; fix EC test target

  Two review nits:
  - Store.MaybeAddVolumes' periodic cleanup pass deleted IO-errored volumes with keepRemoteData=false, so a transient local fault on a remote-tiered volume would also nuke the cloud object. Track the delete reason via a parallel slice and pass keepRemoteData=v.HasRemoteFile() for IO-error evictions; TTL-expired evictions still pass false.
  - TestRemoteTier_ECEncodeDecode_AfterDownload deleted shards 0..3 but called them "parity"; by the klauspost/reedsolomon convention shards 0..DataShardsCount-1 are data and DataShardsCount..TotalShardsCount-1 are parity. Switch the loop to delete the parity range so the intent matches the indices.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
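The keep_remote_data behaviour described in the commit message can be sketched roughly as follows. This is a minimal, self-contained illustration and not the SeaweedFS implementation: `remoteStore`, `volume`, `destroy`, and `moveVolume` are hypothetical names invented for this example, and the "remote tier" is just an in-memory map. It only shows why the source-side delete of a move has to leave the remote object alone once the destination's .vif references it.

```go
package main

import "fmt"

// remoteStore is a hypothetical stand-in for a cloud tier:
// object key -> whether the object currently exists.
type remoteStore map[string]bool

// volume is a hypothetical, heavily simplified remote-tiered volume.
// remoteKey plays the role of the object a .vif points at.
type volume struct {
	id        uint32
	remoteKey string // empty when the volume has no remote copy
}

// destroy drops the volume (local cleanup is omitted in this sketch) and
// deletes the remote object unless keepRemoteData is set.
func (v *volume) destroy(remote remoteStore, keepRemoteData bool) {
	if v.remoteKey != "" && !keepRemoteData {
		delete(remote, v.remoteKey)
	}
}

// moveVolume mimics the tail end of a move: the destination adopts the same
// remote key (as a copied .vif would), then the source volume is destroyed.
func moveVolume(remote remoteStore, src *volume, keepRemoteData bool) *volume {
	dst := &volume{id: src.id, remoteKey: src.remoteKey}
	src.destroy(remote, keepRemoteData)
	return dst
}

func main() {
	// Move with keepRemoteData=true: the destination can still read the object.
	remote := remoteStore{"vol-7.dat": true}
	dst := moveVolume(remote, &volume{id: 7, remoteKey: "vol-7.dat"}, true)
	fmt.Println("keep=true, object still present:", remote[dst.remoteKey])

	// Move with keepRemoteData=false: the destination is left pointing at nothing.
	remote = remoteStore{"vol-7.dat": true}
	dst = moveVolume(remote, &volume{id: 7, remoteKey: "vol-7.dat"}, false)
	fmt.Println("keep=false, object still present:", remote[dst.remoteKey])
}
```

In this sketch the first move prints `true` and the second prints `false`, which mirrors the move case and the real-delete case listed under "Locks in" above.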
292 lines
12 KiB
Go
package shell

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/util"

	"google.golang.org/grpc"
)

func init() {
	Commands = append(Commands, &commandVolumeMove{})
}

type commandVolumeMove struct {
}

func (c *commandVolumeMove) Name() string {
	return "volume.move"
}

func (c *commandVolumeMove) Help() string {
	return `move a live volume from one volume server to another volume server

	volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id>
	volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> -disk [hdd|ssd|<tag>]

	This command moves a live volume from one volume server to another volume server. Here are the steps:

	1. This command marks the source volume as read-only, copies it to the target volume server, and records the last entry timestamp.
	2. This command asks the target volume server to mount the new volume.
	3. This command asks the target volume server to tail the source volume for updates after the timestamp, until the tail has been idle for 5 seconds, to drain any in-flight requests.
	4. This command asks the source volume server to delete the source volume.

	The option "-disk [hdd|ssd|<tag>]" can be used to change the volume disk type.

`
}

func (c *commandVolumeMove) HasTag(CommandTag) bool {
	return false
}

func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	volMoveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id")
	sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>")
	targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>")
	diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
	ioBytePerSecond := volMoveCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
	noLock := volMoveCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")

	if err = volMoveCommand.Parse(args); err != nil {
		return nil
	}

	if *noLock {
		commandEnv.noLock = true
	} else {
		if err = commandEnv.confirmIsLocked(args); err != nil {
			return
		}
	}

	sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr)

	volumeId := needle.VolumeId(*volumeIdInt)

	if sourceVolumeServer == targetVolumeServer {
		return fmt.Errorf("source and target volume servers are the same!")
	}

	return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, *ioBytePerSecond, false)
}

// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, ioBytePerSecond int64, skipTailError bool) (err error) {

	log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
	lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond, false)
	if err != nil {
		return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
	}

	log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
	if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
		if skipTailError {
			fmt.Fprintf(writer, "tail volume %d from %s to %s: %v\n", volumeId, sourceVolumeServer, targetVolumeServer, err)
		} else {
			return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
		}
	}

	log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
	if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer, false, true); err != nil {
		return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
	}

	log.Printf("moved volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
	return nil
}

func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64, restoreWritable bool) (lastAppendAtNs uint64, err error) {

	// Check whether the volume is already read-only. If it is not, mark it
	// read-only for the copy and, before returning, undo that change when the
	// copy failed or the caller asked for writability to be restored.
	var shouldMarkWritable bool
	defer func() {
		if !shouldMarkWritable {
			return
		}
		if !restoreWritable && err == nil {
			return
		}

		clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
			_, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
				VolumeId: uint32(volumeId),
			})
			return writableErr
		})
		if clientErr != nil {
			log.Printf("failed to mark volume %d as writable after copy from %s: %v", volumeId, sourceVolumeServer, clientErr)
		}
	}()

	err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
			VolumeId: uint32(volumeId),
		})
		if statusErr == nil && !resp.IsReadOnly {
			shouldMarkWritable = true
			_, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
				VolumeId: uint32(volumeId),
				Persist:  false,
			})
			return readonlyErr
		}
		return statusErr
	})
	if err != nil {
		return
	}

	err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
			VolumeId:        uint32(volumeId),
			SourceDataNode:  string(sourceVolumeServer),
			DiskType:        diskType,
			IoBytePerSecond: ioBytePerSecond,
		})
		if replicateErr != nil {
			return replicateErr
		}
		for {
			resp, recvErr := stream.Recv()
			if recvErr != nil {
				if recvErr == io.EOF {
					break
				}
				return recvErr
			}
			if resp.LastAppendAtNs != 0 {
				lastAppendAtNs = resp.LastAppendAtNs
			} else {
				fmt.Fprintf(writer, "%s => %s volume %d processed %s\n", sourceVolumeServer, targetVolumeServer, volumeId, util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
			}
		}

		return nil
	})

	return
}

func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {

	return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
			VolumeId:           uint32(volumeId),
			SinceNs:            lastAppendAtNs,
			IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
			SourceVolumeServer: string(sourceVolumeServer),
		})
		return replicateErr
	})
}

// deleteVolume removes the volume from sourceVolumeServer. When keepRemoteData
// is true, the cloud-tier object backing the volume is left intact; this is
// used on the source side of a move where another server is taking over the
// same .vif.
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, onlyEmpty bool, keepRemoteData bool) (err error) {
	return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
			VolumeId:       uint32(volumeId),
			OnlyEmpty:      onlyEmpty,
			KeepRemoteData: keepRemoteData,
		})
		return deleteErr
	})
}

func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable, persist bool) (err error) {
	return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		if writable {
			_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
				VolumeId: uint32(volumeId),
			})
		} else {
			_, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
				VolumeId: uint32(volumeId),
				Persist:  persist,
			})
		}
		return err
	})
}

func markVolumeReplicaWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, location wdclient.Location, writable, persist bool) error {
	// Report the operation that is actually requested; the original message
	// said "markVolumeReadonly" even when marking the replica writable.
	if writable {
		fmt.Printf("markVolumeWritable %d on %s ...\n", volumeId, location.Url)
	} else {
		fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
	}
	return markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist)
}

func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
	for _, location := range locations {
		if err := markVolumeReplicaWritable(grpcDialOption, volumeId, location, writable, persist); err != nil {
			return err
		}
	}
	return nil
}

// replicateVolumeToServer copies a volume from sourceAddress to targetAddress via the VolumeCopy gRPC stream.
func replicateVolumeToServer(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceAddress, targetAddress pb.ServerAddress, diskType string) error {
	return operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
			VolumeId:       uint32(volumeId),
			SourceDataNode: string(sourceAddress),
			DiskType:       diskType,
		})
		if replicateErr != nil {
			return replicateErr
		}
		for {
			resp, recvErr := stream.Recv()
			if recvErr != nil {
				if recvErr == io.EOF {
					break
				}
				return recvErr
			}
			if resp.ProcessedBytes > 0 {
				fmt.Fprintf(writer, "volume %d processed %s bytes\n", volumeId, util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
			}
		}
		return nil
	})
}

// configureVolumeReplication sets the replication setting on a volume at the given server.
func configureVolumeReplication(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, targetAddress pb.ServerAddress, replicationString string) error {
	return operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
			VolumeId:    uint32(volumeId),
			Replication: replicationString,
		})
		if configureErr != nil {
			return configureErr
		}
		if resp.Error != "" {
			return errors.New(resp.Error)
		}
		return nil
	})
}