Files
seaweedfs/weed/shell/command_mq_topic_configure.go
Parviz Miriyev 2212cc8a5f shell: expose retention flags on mq.topic.configure (#9416)
* shell: expose retention flags on mq.topic.configure

The ConfigureTopicRequest proto already carries a TopicRetention message
({retention_seconds, enabled}), and GetTopicConfigurationResponse / the
admin UI both surface it — but the `mq.topic.configure` shell command
sent Retention: nil unconditionally, leaving CLI and IaC users with no
way to set retention without going through the admin UI. Add three
flags so the shell matches the existing surface:

  -retention <duration>     Go duration string (e.g. 168h, 30m)
  -retentionSeconds <int>   raw seconds (mutually exclusive with -retention)
  -retentionEnabled         bool toggle for retention enforcement

Behavior:
- If none of the retention flags are set, the request omits Retention
  (`nil`) — preserving the prior "leave server-side state alone"
  semantics for callers that only care about partition count.
- Setting any retention flag populates TopicRetention with both fields
  so existing operators keep working when only one is provided.
- -retention and -retentionSeconds together is a hard error (ambiguous);
  negative values are rejected.

The new behavior is detected via flag.FlagSet.Visit so default values
of 0 / false are distinguishable from "not provided".

Help text updated with examples for both setting a TTL and disabling
retention on an existing topic. No proto / server changes needed; this
is a CLI-only patch.

* broker, shell: address review for mq.topic.configure retention

Three follow-up fixes for the review comments on the original commit:

1. Server-side: detect retention changes in the early-return path.
   Previously ConfigureTopic returned without persisting when
   partitionCount + schema were unchanged, even if the request supplied
   a different Retention. Now the early-return branch checks both
   schema and retention, persists either or both when they differ, and
   logs which fields actually changed.

2. Server-side: preserve existing retention when request.Retention is
   nil in the fresh-allocation path. Capture the prior resp.Retention
   before overwriting `resp = &mq_pb.ConfigureTopicResponse{}`, and
   only overwrite the new resp.Retention with `request.Retention` when
   the request actually supplies one. Otherwise carry the previous
   retention forward, so partition-count changes (or any path that
   bypasses the early-return branch) don't accidentally clear retention.

3. CLI: switch the `-retention` / `-retentionSeconds` mutual-exclusion
   check from value-based (`!= 0`) to flag.FlagSet.Visit-based, so an
   explicit `-retention=0 -retentionSeconds=N` is also rejected.

4. CLI: when any retention flag is set, fetch the current
   GetTopicConfiguration and fill in the fields the user didn't
   explicitly provide. This means `-retentionEnabled` alone no longer
   zeros the existing duration, and `-retention 24h` alone preserves
   the existing enabled flag. When the topic doesn't exist yet, the
   GetTopicConfiguration error is treated as "no current state" and
   we proceed with just the user-supplied values.

Build / vet / gofmt / `go test ./weed/shell/... ./weed/mq/broker/...`
clean.
2026-05-12 12:37:09 -07:00

165 lines
5.2 KiB
Go

package shell
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func init() {
Commands = append(Commands, &commandMqTopicConfigure{})
}
type commandMqTopicConfigure struct {
}
func (c *commandMqTopicConfigure) Name() string {
return "mq.topic.configure"
}
func (c *commandMqTopicConfigure) Help() string {
return `configure a topic with a given name
Example:
mq.topic.configure -namespace <namespace> -topic <topic_name> -partitionCount <partition_count>
Retention (delete messages older than the configured duration):
mq.topic.configure -namespace <namespace> -topic <topic_name> \
-retention 168h -retentionEnabled
# disable retention on an existing topic
mq.topic.configure -namespace <namespace> -topic <topic_name> \
-retentionEnabled=false
-retention accepts any Go duration string ("24h", "168h", "30m"). Use
-retentionSeconds for raw seconds when scripting. Specifying both is an
error.
When you set only some retention flags (for example, -retentionEnabled
without -retention), the unspecified field is read from the current
server-side configuration so it isn't accidentally zeroed. Omitting all
retention flags leaves the existing retention configuration alone.
`
}
func (c *commandMqTopicConfigure) HasTag(CommandTag) bool {
return false
}
func (c *commandMqTopicConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
// parse parameters
mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
namespace := mqCommand.String("namespace", "", "namespace name")
topicName := mqCommand.String("topic", "", "topic name")
partitionCount := mqCommand.Int("partitionCount", 6, "partition count")
retention := mqCommand.Duration("retention", 0, "retention duration (Go duration string, e.g. 168h). Mutually exclusive with -retentionSeconds.")
retentionSeconds := mqCommand.Int64("retentionSeconds", 0, "retention duration in seconds. Mutually exclusive with -retention.")
retentionEnabled := mqCommand.Bool("retentionEnabled", false, "enable retention enforcement on the topic")
if err := mqCommand.Parse(args); err != nil {
return err
}
// Detect which retention flags the user actually provided. Using Visit
// (rather than value comparison) means an explicit `-retention=0` is
// treated as "user provided" and still triggers the mutual-exclusion
// check or partial-merge with current state.
var userSetRetention, userSetSeconds, userSetEnabled bool
mqCommand.Visit(func(f *flag.Flag) {
switch f.Name {
case "retention":
userSetRetention = true
case "retentionSeconds":
userSetSeconds = true
case "retentionEnabled":
userSetEnabled = true
}
})
if userSetRetention && userSetSeconds {
return fmt.Errorf("-retention and -retentionSeconds are mutually exclusive")
}
if *retention < 0 || *retentionSeconds < 0 {
return fmt.Errorf("retention duration must be >= 0")
}
retentionTouched := userSetRetention || userSetSeconds || userSetEnabled
// find the broker balancer
brokerBalancer, err := findBrokerBalancer(commandEnv)
if err != nil {
return err
}
fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer)
// Build the retention proto. When the user touches any retention flag we
// must send a fully-populated TopicRetention so partial flags don't zero
// the other field server-side: fetch the current configuration and use
// its values for whatever the user didn't specify.
var retentionProto *mq_pb.TopicRetention
if retentionTouched {
var currentRetention *mq_pb.TopicRetention
if err := pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
cur, getErr := client.GetTopicConfiguration(context.Background(), &mq_pb.GetTopicConfigurationRequest{
Topic: &schema_pb.Topic{Namespace: *namespace, Name: *topicName},
})
if getErr != nil {
// Topic may not exist yet — that's fine, we'll create it with
// the user-supplied retention only.
return nil
}
if cur != nil {
currentRetention = cur.Retention
}
return nil
}); err != nil {
return err
}
var seconds int64
var enabled bool
if currentRetention != nil {
seconds = currentRetention.RetentionSeconds
enabled = currentRetention.Enabled
}
if userSetRetention {
seconds = int64((*retention) / time.Second)
} else if userSetSeconds {
seconds = *retentionSeconds
}
if userSetEnabled {
enabled = *retentionEnabled
}
retentionProto = &mq_pb.TopicRetention{
RetentionSeconds: seconds,
Enabled: enabled,
}
}
// create / update topic
return pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
resp, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
Topic: &schema_pb.Topic{
Namespace: *namespace,
Name: *topicName,
},
PartitionCount: int32(*partitionCount),
Retention: retentionProto,
})
if err != nil {
return err
}
output, _ := json.MarshalIndent(resp, "", " ")
fmt.Fprintf(writer, "response:\n%+v\n", string(output))
return nil
})
}