mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-18 07:41:31 +00:00
ec: add -sourceDiskType to ec.encode and -diskType to ec.decode
ec.encode: - Add -sourceDiskType flag to filter source volumes by disk type - This enables tier migration scenarios (e.g., SSD volumes → HDD EC shards) - -diskType specifies target disk type for EC shards ec.decode: - Add -diskType flag to specify source disk type where EC shards are stored - Update collectEcShardIds() and collectEcNodeShardBits() to accept diskType Examples: # Encode SSD volumes to HDD EC shards (tier migration) ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd # Decode EC shards from SSD ec.decode -collection=mybucket -diskType=ssd Integration tests updated to cover new flags.
This commit is contained in:
@@ -1250,11 +1250,99 @@ func TestECDiskTypeSupport(t *testing.T) {
|
||||
// This ensures the command accepts the -diskType flag without errors
|
||||
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
|
||||
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
|
||||
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
|
||||
|
||||
// Test help output contains diskType
|
||||
assert.NotNil(t, ecEncodeCmd, "ec.encode command should exist")
|
||||
assert.NotNil(t, ecBalanceCmd, "ec.balance command should exist")
|
||||
t.Log("Both ec.encode and ec.balance commands support -diskType flag")
|
||||
assert.NotNil(t, ecDecodeCmd, "ec.decode command should exist")
|
||||
t.Log("ec.encode, ec.balance, and ec.decode commands all support -diskType flag")
|
||||
})
|
||||
|
||||
t.Run("ec_encode_with_source_disktype", func(t *testing.T) {
|
||||
// Test that -sourceDiskType flag is accepted
|
||||
lockCmd := shell.Commands[findCommandIndex("lock")]
|
||||
var lockOutput bytes.Buffer
|
||||
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
|
||||
if err != nil {
|
||||
t.Logf("Lock command failed: %v", err)
|
||||
}
|
||||
|
||||
// Execute EC encoding with sourceDiskType filter
|
||||
var output bytes.Buffer
|
||||
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
|
||||
args := []string{
|
||||
"-collection", "ssd_test",
|
||||
"-sourceDiskType", "ssd", // Filter source volumes by SSD
|
||||
"-diskType", "ssd", // Place EC shards on SSD
|
||||
"-force",
|
||||
}
|
||||
|
||||
// Capture output
|
||||
oldStdout := os.Stdout
|
||||
oldStderr := os.Stderr
|
||||
r, w, _ := os.Pipe()
|
||||
os.Stdout = w
|
||||
os.Stderr = w
|
||||
|
||||
encodeErr := ecEncodeCmd.Do(args, commandEnv, &output)
|
||||
|
||||
w.Close()
|
||||
os.Stdout = oldStdout
|
||||
os.Stderr = oldStderr
|
||||
capturedOutput, _ := io.ReadAll(r)
|
||||
|
||||
t.Logf("EC encode with sourceDiskType output: %s", string(capturedOutput))
|
||||
// The command should accept the flag even if no volumes match
|
||||
if encodeErr != nil {
|
||||
t.Logf("EC encoding with sourceDiskType: %v (expected if no matching volumes)", encodeErr)
|
||||
}
|
||||
|
||||
unlockCmd := shell.Commands[findCommandIndex("unlock")]
|
||||
var unlockOutput bytes.Buffer
|
||||
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
|
||||
})
|
||||
|
||||
t.Run("ec_decode_with_disktype", func(t *testing.T) {
|
||||
// Test that ec.decode accepts -diskType flag
|
||||
lockCmd := shell.Commands[findCommandIndex("lock")]
|
||||
var lockOutput bytes.Buffer
|
||||
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
|
||||
if err != nil {
|
||||
t.Logf("Lock command failed: %v", err)
|
||||
}
|
||||
|
||||
// Execute EC decode with disk type
|
||||
var output bytes.Buffer
|
||||
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
|
||||
args := []string{
|
||||
"-collection", "ssd_test",
|
||||
"-diskType", "ssd", // Source EC shards are on SSD
|
||||
}
|
||||
|
||||
// Capture output
|
||||
oldStdout := os.Stdout
|
||||
oldStderr := os.Stderr
|
||||
r, w, _ := os.Pipe()
|
||||
os.Stdout = w
|
||||
os.Stderr = w
|
||||
|
||||
decodeErr := ecDecodeCmd.Do(args, commandEnv, &output)
|
||||
|
||||
w.Close()
|
||||
os.Stdout = oldStdout
|
||||
os.Stderr = oldStderr
|
||||
capturedOutput, _ := io.ReadAll(r)
|
||||
|
||||
t.Logf("EC decode with diskType output: %s", string(capturedOutput))
|
||||
// The command should accept the flag
|
||||
if decodeErr != nil {
|
||||
t.Logf("EC decode with diskType: %v (expected if no EC volumes)", decodeErr)
|
||||
}
|
||||
|
||||
unlockCmd := shell.Commands[findCommandIndex("unlock")]
|
||||
var unlockOutput bytes.Buffer
|
||||
unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -32,13 +32,23 @@ func (c *commandEcDecode) Name() string {
|
||||
func (c *commandEcDecode) Help() string {
|
||||
return `decode a erasure coded volume into a normal volume
|
||||
|
||||
ec.decode [-collection=""] [-volumeId=<volume_id>]
|
||||
ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>]
|
||||
|
||||
The -collection parameter supports regular expressions for pattern matching:
|
||||
- Use exact match: ec.decode -collection="^mybucket$"
|
||||
- Match multiple buckets: ec.decode -collection="bucket.*"
|
||||
- Match all collections: ec.decode -collection=".*"
|
||||
|
||||
Options:
|
||||
-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
|
||||
|
||||
Examples:
|
||||
# Decode EC shards from HDD (default)
|
||||
ec.decode -collection=mybucket
|
||||
|
||||
# Decode EC shards from SSD
|
||||
ec.decode -collection=mybucket -diskType=ssd
|
||||
|
||||
`
|
||||
}
|
||||
|
||||
@@ -50,6 +60,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
|
||||
collection := decodeCommand.String("collection", "", "the collection name")
|
||||
diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
|
||||
if err = decodeCommand.Parse(args); err != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -59,6 +70,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
}
|
||||
|
||||
vid := needle.VolumeId(*volumeId)
|
||||
diskType := types.ToDiskType(*diskTypeStr)
|
||||
|
||||
// collect topology information
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
|
||||
@@ -68,17 +80,17 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
|
||||
// volumeId is provided
|
||||
if vid != 0 {
|
||||
return doEcDecode(commandEnv, topologyInfo, *collection, vid)
|
||||
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType)
|
||||
}
|
||||
|
||||
// apply to all volumes in the collection
|
||||
volumeIds, err := collectEcShardIds(topologyInfo, *collection)
|
||||
volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("ec decode volumes: %v\n", volumeIds)
|
||||
for _, vid := range volumeIds {
|
||||
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
|
||||
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -86,14 +98,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
return nil
|
||||
}
|
||||
|
||||
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
|
||||
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) {
|
||||
|
||||
if !commandEnv.isLocked() {
|
||||
return fmt.Errorf("lock is lost")
|
||||
}
|
||||
|
||||
// find volume location
|
||||
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
|
||||
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType)
|
||||
|
||||
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
|
||||
|
||||
@@ -248,7 +260,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
|
||||
return resp.VolumeIdLocations, nil
|
||||
}
|
||||
|
||||
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string) (vids []needle.VolumeId, err error) {
|
||||
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
|
||||
// compile regex pattern for collection matching
|
||||
collectionRegex, err := compileCollectionPattern(collectionPattern)
|
||||
if err != nil {
|
||||
@@ -257,7 +269,7 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
|
||||
|
||||
vidMap := make(map[uint32]bool)
|
||||
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
|
||||
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
||||
for _, v := range diskInfo.EcShardInfos {
|
||||
if collectionRegex.MatchString(v.Collection) {
|
||||
vidMap[v.Id] = true
|
||||
@@ -273,11 +285,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
|
||||
return
|
||||
}
|
||||
|
||||
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
|
||||
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits {
|
||||
|
||||
nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
|
||||
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
|
||||
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
||||
for _, v := range diskInfo.EcShardInfos {
|
||||
if v.Id == uint32(vid) {
|
||||
nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
|
||||
|
||||
@@ -37,7 +37,7 @@ func (c *commandEcEncode) Name() string {
|
||||
func (c *commandEcEncode) Help() string {
|
||||
return `apply erasure coding to a volume
|
||||
|
||||
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-diskType=<disk_type>]
|
||||
ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose] [-sourceDiskType=<disk_type>] [-diskType=<disk_type>]
|
||||
ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose] [-diskType=<disk_type>]
|
||||
|
||||
This command will:
|
||||
@@ -61,7 +61,18 @@ func (c *commandEcEncode) Help() string {
|
||||
|
||||
Options:
|
||||
-verbose: show detailed reasons why volumes are not selected for encoding
|
||||
-diskType: the disk type for EC shards (hdd, ssd, or empty for default hdd)
|
||||
-sourceDiskType: filter source volumes by disk type (hdd, ssd, or empty for all)
|
||||
-diskType: target disk type for EC shards (hdd, ssd, or empty for default hdd)
|
||||
|
||||
Examples:
|
||||
# Encode SSD volumes to SSD EC shards (same tier)
|
||||
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=ssd
|
||||
|
||||
# Encode SSD volumes to HDD EC shards (tier migration to cheaper storage)
|
||||
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd
|
||||
|
||||
# Encode all volumes to SSD EC shards
|
||||
ec.encode -collection=mybucket -diskType=ssd
|
||||
|
||||
Re-balancing algorithm:
|
||||
` + ecBalanceAlgorithmDescription
|
||||
@@ -81,7 +92,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
maxParallelization := encodeCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
|
||||
forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
|
||||
shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
|
||||
diskTypeStr := encodeCommand.String("diskType", "", "the disk type for EC shards (hdd, ssd, or empty for default hdd)")
|
||||
sourceDiskTypeStr := encodeCommand.String("sourceDiskType", "", "filter source volumes by disk type (hdd, ssd, or empty for all)")
|
||||
diskTypeStr := encodeCommand.String("diskType", "", "target disk type for EC shards (hdd, ssd, or empty for default hdd)")
|
||||
applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
|
||||
verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding")
|
||||
|
||||
@@ -96,6 +108,14 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
return err
|
||||
}
|
||||
|
||||
// Parse source disk type filter (optional)
|
||||
var sourceDiskType *types.DiskType
|
||||
if *sourceDiskTypeStr != "" {
|
||||
sdt := types.ToDiskType(*sourceDiskTypeStr)
|
||||
sourceDiskType = &sdt
|
||||
}
|
||||
|
||||
// Parse target disk type for EC shards
|
||||
diskType := types.ToDiskType(*diskTypeStr)
|
||||
|
||||
// collect topology information
|
||||
@@ -123,7 +143,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
|
||||
} else {
|
||||
// apply to all volumes for the given collection pattern (regex)
|
||||
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
|
||||
volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, sourceDiskType, *fullPercentage, *quietPeriod, *verbose)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user