feat: add collection.mark shell command

Add collection.mark to mark all existing normal volume replicas in a collection as readonly or writable. The command runs in preview mode by default and requires -apply to execute changes. It reuses existing volume mark RPCs, supports default collection aliases, skips EC shards, and adds unit tests for option parsing and target collection logic.
This commit is contained in:
lianyu.sun
2026-05-20 20:32:52 +08:00
parent 868849392c
commit 50c2bbf94c
2 changed files with 314 additions and 0 deletions
+166
View File
@@ -0,0 +1,166 @@
package shell
import (
"flag"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
func init() {
Commands = append(Commands, &commandCollectionMark{})
}
type commandCollectionMark struct {
}
func (c *commandCollectionMark) Name() string {
return "collection.mark"
}
func (c *commandCollectionMark) Help() string {
return `Mark all existing normal volume replicas in one collection writable or readonly
collection.mark -collection <collection_name> -readonly -apply
collection.mark -collection <collection_name> -writable -apply
Use '_default' or '_default_' for the empty-named collection.
Without -apply, this command only prints the volume replicas that would be marked.
`
}
func (c *commandCollectionMark) HasTag(CommandTag) bool {
return false
}
type collectionMarkOptions struct {
collection string
markWritable bool
apply bool
}
type collectionMarkTarget struct {
volumeId needle.VolumeId
collection string
server pb.ServerAddress
readOnly bool
}
func (c *commandCollectionMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
opts, err := parseCollectionMarkOptions(args)
if err != nil {
return err
}
infoAboutSimulationMode(writer, opts.apply, "-apply")
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
targets, skippedEC := collectCollectionMarkTargets(topologyInfo, opts.collection)
action := "readonly"
if opts.markWritable {
action = "writable"
}
for _, target := range targets {
fmt.Fprintf(writer, "volume %d collection %q on %s currentReadonly:%t will be marked %s\n",
target.volumeId, target.collection, target.server, target.readOnly, action)
}
if skippedEC > 0 {
fmt.Fprintf(writer, "Skipped %d EC shard volume(s); collection.mark only marks normal volumes.\n", skippedEC)
}
if len(targets) == 0 {
fmt.Fprintf(writer, "No normal volume replicas found for collection %q.\n", opts.collection)
return nil
}
if !opts.apply {
fmt.Fprintf(writer, "Found %d normal volume replica(s) for collection %q. Use -apply to mark them %s.\n",
len(targets), opts.collection, action)
return nil
}
for _, target := range targets {
if err := markVolumeWritable(commandEnv.option.GrpcDialOption, target.volumeId, target.server, opts.markWritable, true); err != nil {
return fmt.Errorf("mark volume %d on %s as %s: %w", target.volumeId, target.server, action, err)
}
}
fmt.Fprintf(writer, "Marked %d normal volume replica(s) in collection %q as %s.\n", len(targets), opts.collection, action)
return nil
}
func parseCollectionMarkOptions(args []string) (*collectionMarkOptions, error) {
collectionMarkCommand := flag.NewFlagSet("collection.mark", flag.ContinueOnError)
collectionName := collectionMarkCommand.String("collection", "", "collection to mark. Use '_default' or '_default_' for the empty-named collection.")
writable := collectionMarkCommand.Bool("writable", false, "collection mark writable")
readonly := collectionMarkCommand.Bool("readonly", false, "collection mark readonly")
apply := collectionMarkCommand.Bool("apply", false, "apply the collection mark")
if err := collectionMarkCommand.Parse(args); err != nil {
return nil, err
}
if *collectionName == "" {
return nil, fmt.Errorf("empty collection name is not allowed")
}
if (*writable && *readonly) || (!*writable && !*readonly) {
return nil, fmt.Errorf("use -readonly or -writable")
}
return &collectionMarkOptions{
collection: normalizeCollectionMarkName(*collectionName),
markWritable: *writable,
apply: *apply,
}, nil
}
func normalizeCollectionMarkName(collectionName string) string {
if collectionName == CollectionDefault || collectionName == "_default_" {
return ""
}
return collectionName
}
func collectCollectionMarkTargets(topo *master_pb.TopologyInfo, collection string) (targets []collectionMarkTarget, skippedEC int) {
if topo == nil {
return nil, 0
}
for _, dc := range topo.GetDataCenterInfos() {
for _, rack := range dc.GetRackInfos() {
for _, dn := range rack.GetDataNodeInfos() {
server := pb.NewServerAddressFromDataNode(dn)
for _, diskType := range sortMapKey(dn.GetDiskInfos()) {
diskInfo := dn.GetDiskInfos()[diskType]
for _, vi := range diskInfo.GetVolumeInfos() {
if vi.GetCollection() != collection {
continue
}
targets = append(targets, collectionMarkTarget{
volumeId: needle.VolumeId(vi.GetId()),
collection: vi.GetCollection(),
server: server,
readOnly: vi.GetReadOnly(),
})
}
for _, ecShardInfo := range diskInfo.GetEcShardInfos() {
if ecShardInfo.GetCollection() == collection {
skippedEC++
}
}
}
}
}
}
return targets, skippedEC
}
+148
View File
@@ -0,0 +1,148 @@
package shell
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
func TestCollectionMarkNormalizeCollectionName(t *testing.T) {
tests := []struct {
name string
in string
want string
}{
{name: "named collection", in: "photos", want: "photos"},
{name: "default keyword", in: "_default", want: ""},
{name: "delete command default keyword", in: "_default_", want: ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := normalizeCollectionMarkName(tt.in)
if got != tt.want {
t.Fatalf("normalizeCollectionMarkName(%q) = %q, want %q", tt.in, got, tt.want)
}
})
}
}
func TestCollectionMarkParseOptions(t *testing.T) {
opts, err := parseCollectionMarkOptions([]string{"-collection", "photos", "-readonly"})
if err != nil {
t.Fatalf("parse readonly options: %v", err)
}
if opts.collection != "photos" {
t.Fatalf("collection = %q, want photos", opts.collection)
}
if opts.markWritable {
t.Fatalf("markWritable = true, want false for -readonly")
}
if opts.apply {
t.Fatalf("apply = true, want false without -apply")
}
opts, err = parseCollectionMarkOptions([]string{"-collection", "_default_", "-writable", "-apply"})
if err != nil {
t.Fatalf("parse writable options: %v", err)
}
if opts.collection != "" {
t.Fatalf("collection = %q, want empty default collection", opts.collection)
}
if !opts.markWritable {
t.Fatalf("markWritable = false, want true for -writable")
}
if !opts.apply {
t.Fatalf("apply = false, want true with -apply")
}
}
func TestCollectionMarkParseOptionsRequiresCollectionAndSingleMode(t *testing.T) {
tests := []struct {
name string
args []string
}{
{name: "missing collection", args: []string{"-readonly"}},
{name: "missing mode", args: []string{"-collection", "photos"}},
{name: "conflicting modes", args: []string{"-collection", "photos", "-readonly", "-writable"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if _, err := parseCollectionMarkOptions(tt.args); err == nil {
t.Fatalf("parseCollectionMarkOptions(%v) returned nil error", tt.args)
}
})
}
}
func TestCollectionMarkCollectTargets(t *testing.T) {
topo := &master_pb.TopologyInfo{
DataCenterInfos: []*master_pb.DataCenterInfo{
{
RackInfos: []*master_pb.RackInfo{
{
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: "node-a",
Address: "10.0.0.1:8080",
GrpcPort: 9080,
DiskInfos: map[string]*master_pb.DiskInfo{
"hdd": {
VolumeInfos: []*master_pb.VolumeInformationMessage{
{Id: 3, Collection: "photos", ReadOnly: false},
{Id: 4, Collection: "other", ReadOnly: false},
},
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
{Id: 9, Collection: "photos"},
},
},
},
},
{
Id: "node-b",
Address: "10.0.0.2:8080",
GrpcPort: 9081,
DiskInfos: map[string]*master_pb.DiskInfo{
"hdd": {
VolumeInfos: []*master_pb.VolumeInformationMessage{
{Id: 3, Collection: "photos", ReadOnly: true},
},
},
},
},
},
},
},
},
},
}
targets, skippedEC := collectCollectionMarkTargets(topo, "photos")
if skippedEC != 1 {
t.Fatalf("skippedEC = %d, want 1", skippedEC)
}
if len(targets) != 2 {
t.Fatalf("len(targets) = %d, want 2: %#v", len(targets), targets)
}
if got := uint32(targets[0].volumeId); got != 3 {
t.Fatalf("target[0].volumeId = %d, want 3", got)
}
if got := string(targets[0].server); got != "10.0.0.1:8080.9080" {
t.Fatalf("target[0].server = %q, want 10.0.0.1:8080.9080", got)
}
if targets[0].readOnly {
t.Fatalf("target[0].readOnly = true, want false")
}
if got := uint32(targets[1].volumeId); got != 3 {
t.Fatalf("target[1].volumeId = %d, want 3", got)
}
if got := string(targets[1].server); got != "10.0.0.2:8080.9081" {
t.Fatalf("target[1].server = %q, want 10.0.0.2:8080.9081", got)
}
if !targets[1].readOnly {
t.Fatalf("target[1].readOnly = false, want true")
}
}