Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2026-05-14 05:41:29 +00:00
* fix(volume): don't fatal on missing .idx for remote-tiered volume

  A .vif left behind without its .idx (orphaned by a crashed move, a partial copy, or a hand edit) would trip glog.Fatalf in checkIdxFile and take the whole volume server down on boot, killing every healthy volume on it too. For remote-tiered volumes, treat it as a per-volume load error so the server can come up and the operator can clean up the stray .vif. Refs #9331.

* fix(balance): skip remote-tiered volumes in admin balance detection

  The admin/worker balance detector had no equivalent of the shell-side guard ("does not move volume in remote storage" in command_volume_balance.go), so it scheduled moves on remote-tiered volumes. The "move" copies .idx/.vif to the destination and then calls Volume.Destroy on the source, which calls backendStorage.DeleteFile — deleting the remote object the destination's new .vif now points at. Populate HasRemoteCopy on the metrics emitted by both the admin maintenance scanner and the worker's master poll, then drop those volumes at the top of Detection (a sketch of this guard follows below). Fixes #9331.
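  A minimal sketch of that detection guard, assuming a simplified metric shape: the volumeMetric struct, its field layout, and the filterLocalOnly helper are illustrative, and only HasRemoteCopy itself comes from the change.

    package balancesketch // illustrative package, not the real admin/worker code

    // volumeMetric stands in for the metric emitted by the admin maintenance
    // scanner and the worker's master poll.
    type volumeMetric struct {
        VolumeID      uint32
        HasRemoteCopy bool // set when the master reports a remote storage backend
    }

    // filterLocalOnly drops remote-tiered volumes before any moves are planned,
    // mirroring the shell-side "does not move volume in remote storage" guard.
    func filterLocalOnly(metrics []volumeMetric) []volumeMetric {
        localOnly := metrics[:0]
        for _, m := range metrics {
            if m.HasRemoteCopy {
                // a move would Destroy the source and delete the shared cloud object
                continue
            }
            localOnly = append(localOnly, m)
        }
        return localOnly
    }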
* Apply suggestion from @gemini-code-assist[bot]

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix(volume): keep remote data on volume-move-driven delete

  The on-source delete after a volume move (admin/worker balance and shell volume.move) ran Volume.Destroy with no way to opt out of the remote-object cleanup. Volume.Destroy unconditionally calls backendStorage.DeleteFile for remote-tiered volumes, so a successful move would copy .idx/.vif to the destination and then nuke the cloud object the destination's new .vif was already pointing at.

  Add VolumeDeleteRequest.keep_remote_data and plumb it through Store.DeleteVolume / DiskLocation.DeleteVolume / Volume.Destroy (a sketch of the resulting Destroy guard follows the commit list). The balance task and shell volume.move set it to true; the post-tier-upload cleanup of other replicas and the over-replication trim in volume.fix.replication also set it to true since the remote object is still referenced. Other real-delete callers keep the default. The delete-before-receive path in VolumeCopy also sets it: the inbound copy carries a .vif that may reference the same cloud object as the existing volume. Refs #9331.

* test(storage): in-process remote-tier integration tests

  Cover the four operations the user is most likely to run against a cloud-tiered volume — balance/move, vacuum, EC encode, EC decode — by registering a local-disk-backed BackendStorage as the "remote" tier and exercising the real Volume / DiskLocation / EC encoder code paths. Locks in:

  - Destroy(keepRemoteData=true) preserves the remote object (move case)
  - Destroy(keepRemoteData=false) deletes it (real-delete case)
  - Vacuum/compact on a remote-tier volume never deletes the remote object
  - EC encode requires the local .dat (callers must download first)
  - EC encode + rebuild round-trips after a tier-down

  Tests run in-process and finish in under a second total — no cluster, binary, or external storage required.

* fix(rust-volume): keep remote data on volume-move-driven delete

  Mirror the Go fix in seaweed-volume: plumb keep_remote_data through grpc volume_delete → Store.delete_volume → DiskLocation.delete_volume → Volume.destroy, and skip the s3-tier delete_file call when the flag is set. The pre-receive cleanup in volume_copy passes true for the same reason as the Go side: the inbound copy carries a .vif that may reference the same cloud object as the existing volume.

  The Rust loader already warns rather than fataling on a stray .vif without an .idx (volume.rs load_index_inmemory / load_index_redb), so no counterpart to the Go fatal-on-missing-idx fix is needed. Refs #9331.

* fix(volume): preserve remote tier on IO-error eviction; fix EC test target

  Two review nits:

  - Store.MaybeAddVolumes' periodic cleanup pass deleted IO-errored volumes with keepRemoteData=false, so a transient local fault on a remote-tiered volume would also nuke the cloud object. Track the delete reason via a parallel slice and pass keepRemoteData=v.HasRemoteFile() for IO-error evictions; TTL-expired evictions still pass false.
  - TestRemoteTier_ECEncodeDecode_AfterDownload deleted shards 0..3 but called them "parity" — by the klauspost/reedsolomon convention, shards 0..DataShardsCount-1 are data and DataShardsCount..TotalShardsCount-1 are parity. Switch the loop to delete the parity range so the intent matches the indices.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
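A minimal sketch of the Volume.Destroy guard described in the keep-remote-data commits above, with BackendStorage, backend, remoteKey, and removeLocalFiles as assumed stand-ins for the real internals; only the keepRemoteData flag and the DeleteFile call are taken from the change.

    package destroysketch // illustrative package, not the real storage code

    // BackendStorage is a stand-in for the remote-tier backend interface.
    type BackendStorage interface {
        DeleteFile(key string) error
    }

    type Volume struct {
        backend   BackendStorage // nil when the volume has no remote copy
        remoteKey string
    }

    // Destroy removes the volume. keepRemoteData=true is the move path: the
    // destination's freshly copied .vif still references the cloud object, so
    // only the local files may be removed.
    func (v *Volume) Destroy(keepRemoteData bool) error {
        if v.backend != nil && !keepRemoteData {
            // real delete: no .vif references the cloud object anymore
            if err := v.backend.DeleteFile(v.remoteKey); err != nil {
                return err
            }
        }
        return v.removeLocalFiles() // hypothetical helper: unlink local .dat/.idx/.vif
    }

    func (v *Volume) removeLocalFiles() error {
        return nil // placeholder for unlinking the local volume files
    }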
223 lines · 7.2 KiB · Go
package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

func init() {
	Commands = append(Commands, &commandVolumeTierUpload{})
}

type commandVolumeTierUpload struct {
}

func (c *commandVolumeTierUpload) Name() string {
	return "volume.tier.upload"
}

func (c *commandVolumeTierUpload) Help() string {
	return `upload the dat file of a volume to a remote tier

	volume.tier.upload [-collection=""] [-fullPercent=95] [-quietFor=1h]
	volume.tier.upload [-collection=""] -volumeId=<volume_id> -dest=<storage_backend> [-keepLocalDatFile]

	e.g.:
	volume.tier.upload -volumeId=7 -dest=s3
	volume.tier.upload -volumeId=7 -dest=s3.default

	The <storage_backend> is defined in master.toml.
	For example, "s3.default" in [storage.backend.s3.default]

	This command will move the dat file of a volume to a remote tier.

	SeaweedFS enables scalable and fast local access to lots of files,
	while cloud storage is slower but more cost efficient. How can we combine them?

	Usually the data follows the 80/20 rule: only 20% of the data is frequently accessed.
	We can offload the old volumes to the cloud.

	With this, SeaweedFS can be both fast and scalable, with effectively unlimited storage space.
	Just add more local SeaweedFS volume servers to increase the throughput.

	The index file is still local, and the same O(1) disk read is applied to the remote file.

`
}

func (c *commandVolumeTierUpload) HasTag(CommandTag) bool {
	return false
}

func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeId := tierCommand.Int("volumeId", 0, "the volume id")
	collection := tierCommand.String("collection", "", "the collection name")
	fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
	quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes with no writes for this period")
	dest := tierCommand.String("dest", "", "the target tier name")
	keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether to keep the local dat file")
	disk := tierCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
	if err = tierCommand.Parse(args); err != nil {
		return nil
	}

	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	vid := needle.VolumeId(*volumeId)

	// volumeId is provided
	if vid != 0 {
		return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
	}

	// flag.String never returns nil, so check the flag value rather than the pointer;
	// leave diskType nil when no disk filter was given
	var diskType *types.DiskType
	if *disk != "" {
		_diskType := types.ToDiskType(*disk)
		diskType = &_diskType
	}

	// apply to all volumes in the collection
	// reusing collectVolumeIdsForEcEncode for now
	volumeIds, _, err := collectVolumeIdsForEcEncode(commandEnv, *collection, diskType, *fullPercentage, *quietPeriod, false)
	if err != nil {
		return err
	}
	fmt.Printf("tier upload volumes: %v\n", volumeIds)
	for _, vid := range volumeIds {
		if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
			return err
		}
	}

	return nil
}

func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
	// find volume location
	topoInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return fmt.Errorf("collect topology info: %v", err)
	}

	var existingLocations []wdclient.Location
	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		for _, disk := range dn.DiskInfos {
			for _, vi := range disk.VolumeInfos {
				if needle.VolumeId(vi.Id) == vid && (collection == "" || vi.Collection == collection) {
					fmt.Printf("found volume %d on Url:%s, GrpcPort:%d, DC:%s\n", vid, dn.Id, dn.GrpcPort, string(dc))
					existingLocations = append(existingLocations, wdclient.Location{
						Url:        dn.Id,
						PublicUrl:  dn.Id,
						GrpcPort:   int(dn.GrpcPort),
						DataCenter: string(dc),
					})
				}
			}
		}
	})

	if len(existingLocations) == 0 {
		if collection == "" {
			return fmt.Errorf("volume %d not found", vid)
		}
		return fmt.Errorf("volume %d not found in collection %s", vid, collection)
	}

	// mark all replicas readonly before uploading the dat file
	err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false)
	if err != nil {
		return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err)
	}

	// copy the .dat file to remote tier
	err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, vid, collection, existingLocations[0].ServerAddress(), dest, keepLocalDatFile)
	if err != nil {
		return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, existingLocations[0].Url, dest, err)
	}

	if keepLocalDatFile {
		return nil
	}
	// now the first replica has the .idx and .vif files.
	// ask the replicas on other volume servers to delete their own local copies
	for i, location := range existingLocations {
		if i == 0 {
			continue
		}
		fmt.Printf("delete volume %d from Url:%s\n", vid, location.Url)
		// Other replicas were never tier-uploaded; their .vif (if any) points
		// to the same cloud key just written for this volume. Keep the remote
		// object so the tier-uploaded replica still references valid data.
		err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false, true)
		if err != nil {
			return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
		}
	}

	return nil
}

func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error {

	err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
			VolumeId:               uint32(volumeId),
			Collection:             collection,
			DestinationBackendName: dest,
			KeepLocalDatFile:       keepLocalDatFile,
		})

		if stream == nil {
			if copyErr == nil {
				// when the volume is already uploaded, VolumeTierMoveDatToRemote returns
				// a nil stream and a nil error, so we return directly in this case
				fmt.Fprintf(writer, "volume %v already uploaded\n", volumeId)
				return nil
			}
			return copyErr
		}
		var lastProcessed int64
		for {
			resp, recvErr := stream.Recv()
			if recvErr != nil {
				if recvErr == io.EOF {
					break
				}
				return recvErr
			}

			// bytes copied since the last progress message, converted to MB
			processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0

			fmt.Fprintf(writer, "copied %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed)

			lastProcessed = resp.Processed
		}

		return copyErr
	})

	return err
}