mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-19 00:01:31 +00:00
* shell: expose retention flags on mq.topic.configure
The ConfigureTopicRequest proto already carries a TopicRetention message
({retention_seconds, enabled}), and GetTopicConfigurationResponse / the
admin UI both surface it — but the `mq.topic.configure` shell command
sent Retention: nil unconditionally, leaving CLI and IaC users with no
way to set retention without going through the admin UI. Add three
flags so the shell matches the existing surface:
-retention <duration> Go duration string (e.g. 168h, 30m)
-retentionSeconds <int> raw seconds (mutually exclusive with -retention)
-retentionEnabled bool toggle for retention enforcement
Behavior:
- If none of the retention flags are set, the request omits Retention
(`nil`) — preserving the prior "leave server-side state alone"
semantics for callers that only care about partition count.
- Setting any retention flag populates TopicRetention with both fields
so existing operators keep working when only one is provided.
- -retention and -retentionSeconds together is a hard error (ambiguous);
negative values are rejected.
The new behavior is detected via flag.FlagSet.Visit so default values
of 0 / false are distinguishable from "not provided".
Help text updated with examples for both setting a TTL and disabling
retention on an existing topic. No proto / server changes needed; this
is a CLI-only patch.
* broker, shell: address review for mq.topic.configure retention
Three follow-up fixes for the review comments on the original commit:
1. Server-side: detect retention changes in the early-return path.
Previously ConfigureTopic returned without persisting when
partitionCount + schema were unchanged, even if the request supplied
a different Retention. Now the early-return branch checks both
schema and retention, persists either or both when they differ, and
logs which fields actually changed.
2. Server-side: preserve existing retention when request.Retention is
nil in the fresh-allocation path. Capture the prior resp.Retention
before overwriting `resp = &mq_pb.ConfigureTopicResponse{}`, and
only overwrite the new resp.Retention with `request.Retention` when
the request actually supplies one. Otherwise carry the previous
retention forward, so partition-count changes (or any path that
bypasses the early-return branch) don't accidentally clear retention.
3. CLI: switch the `-retention` / `-retentionSeconds` mutual-exclusion
check from value-based (`!= 0`) to flag.FlagSet.Visit-based, so an
explicit `-retention=0 -retentionSeconds=N` is also rejected.
4. CLI: when any retention flag is set, fetch the current
GetTopicConfiguration and fill in the fields the user didn't
explicitly provide. This means `-retentionEnabled` alone no longer
zeros the existing duration, and `-retention 24h` alone preserves
the existing enabled flag. When the topic doesn't exist yet, the
GetTopicConfiguration error is treated as "no current state" and
we proceed with just the user-supplied values.
Build / vet / gofmt / `go test ./weed/shell/... ./weed/mq/broker/...`
clean.
165 lines
5.2 KiB
Go
165 lines
5.2 KiB
Go
package shell
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
)
|
|
|
|
func init() {
|
|
Commands = append(Commands, &commandMqTopicConfigure{})
|
|
}
|
|
|
|
type commandMqTopicConfigure struct {
|
|
}
|
|
|
|
func (c *commandMqTopicConfigure) Name() string {
|
|
return "mq.topic.configure"
|
|
}
|
|
|
|
func (c *commandMqTopicConfigure) Help() string {
|
|
return `configure a topic with a given name
|
|
|
|
Example:
|
|
mq.topic.configure -namespace <namespace> -topic <topic_name> -partitionCount <partition_count>
|
|
|
|
Retention (delete messages older than the configured duration):
|
|
mq.topic.configure -namespace <namespace> -topic <topic_name> \
|
|
-retention 168h -retentionEnabled
|
|
|
|
# disable retention on an existing topic
|
|
mq.topic.configure -namespace <namespace> -topic <topic_name> \
|
|
-retentionEnabled=false
|
|
|
|
-retention accepts any Go duration string ("24h", "168h", "30m"). Use
|
|
-retentionSeconds for raw seconds when scripting. Specifying both is an
|
|
error.
|
|
|
|
When you set only some retention flags (for example, -retentionEnabled
|
|
without -retention), the unspecified field is read from the current
|
|
server-side configuration so it isn't accidentally zeroed. Omitting all
|
|
retention flags leaves the existing retention configuration alone.
|
|
`
|
|
}
|
|
|
|
func (c *commandMqTopicConfigure) HasTag(CommandTag) bool {
|
|
return false
|
|
}
|
|
|
|
func (c *commandMqTopicConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
|
|
|
|
// parse parameters
|
|
mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
|
namespace := mqCommand.String("namespace", "", "namespace name")
|
|
topicName := mqCommand.String("topic", "", "topic name")
|
|
partitionCount := mqCommand.Int("partitionCount", 6, "partition count")
|
|
retention := mqCommand.Duration("retention", 0, "retention duration (Go duration string, e.g. 168h). Mutually exclusive with -retentionSeconds.")
|
|
retentionSeconds := mqCommand.Int64("retentionSeconds", 0, "retention duration in seconds. Mutually exclusive with -retention.")
|
|
retentionEnabled := mqCommand.Bool("retentionEnabled", false, "enable retention enforcement on the topic")
|
|
if err := mqCommand.Parse(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Detect which retention flags the user actually provided. Using Visit
|
|
// (rather than value comparison) means an explicit `-retention=0` is
|
|
// treated as "user provided" and still triggers the mutual-exclusion
|
|
// check or partial-merge with current state.
|
|
var userSetRetention, userSetSeconds, userSetEnabled bool
|
|
mqCommand.Visit(func(f *flag.Flag) {
|
|
switch f.Name {
|
|
case "retention":
|
|
userSetRetention = true
|
|
case "retentionSeconds":
|
|
userSetSeconds = true
|
|
case "retentionEnabled":
|
|
userSetEnabled = true
|
|
}
|
|
})
|
|
|
|
if userSetRetention && userSetSeconds {
|
|
return fmt.Errorf("-retention and -retentionSeconds are mutually exclusive")
|
|
}
|
|
if *retention < 0 || *retentionSeconds < 0 {
|
|
return fmt.Errorf("retention duration must be >= 0")
|
|
}
|
|
|
|
retentionTouched := userSetRetention || userSetSeconds || userSetEnabled
|
|
|
|
// find the broker balancer
|
|
brokerBalancer, err := findBrokerBalancer(commandEnv)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer)
|
|
|
|
// Build the retention proto. When the user touches any retention flag we
|
|
// must send a fully-populated TopicRetention so partial flags don't zero
|
|
// the other field server-side: fetch the current configuration and use
|
|
// its values for whatever the user didn't specify.
|
|
var retentionProto *mq_pb.TopicRetention
|
|
if retentionTouched {
|
|
var currentRetention *mq_pb.TopicRetention
|
|
if err := pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
|
|
cur, getErr := client.GetTopicConfiguration(context.Background(), &mq_pb.GetTopicConfigurationRequest{
|
|
Topic: &schema_pb.Topic{Namespace: *namespace, Name: *topicName},
|
|
})
|
|
if getErr != nil {
|
|
// Topic may not exist yet — that's fine, we'll create it with
|
|
// the user-supplied retention only.
|
|
return nil
|
|
}
|
|
if cur != nil {
|
|
currentRetention = cur.Retention
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
var seconds int64
|
|
var enabled bool
|
|
if currentRetention != nil {
|
|
seconds = currentRetention.RetentionSeconds
|
|
enabled = currentRetention.Enabled
|
|
}
|
|
if userSetRetention {
|
|
seconds = int64((*retention) / time.Second)
|
|
} else if userSetSeconds {
|
|
seconds = *retentionSeconds
|
|
}
|
|
if userSetEnabled {
|
|
enabled = *retentionEnabled
|
|
}
|
|
retentionProto = &mq_pb.TopicRetention{
|
|
RetentionSeconds: seconds,
|
|
Enabled: enabled,
|
|
}
|
|
}
|
|
|
|
// create / update topic
|
|
return pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
|
|
resp, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: *namespace,
|
|
Name: *topicName,
|
|
},
|
|
PartitionCount: int32(*partitionCount),
|
|
Retention: retentionProto,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
output, _ := json.MarshalIndent(resp, "", " ")
|
|
fmt.Fprintf(writer, "response:\n%+v\n", string(output))
|
|
return nil
|
|
})
|
|
|
|
}
|