Files
seaweedfs/weed/command/mq_agent.go
Lars Lehtonen 29e14f89f1 fix(weed/command) address unhandled errors (#9208)
* fix(weed/command) address unhandled errors

* fix(command): don't log graceful-shutdown sentinels; plug response-body leak

- s3: Serve on unix socket treated http.ErrServerClosed as fatal; now
  excluded like the other Serve/ServeTLS paths in this file.
- mq_agent, mq_broker: filter grpc.ErrServerStopped so clean shutdown
  doesn't log as an error.
- worker_runtime: the added decodeErr early-continue skipped
  resp.Body.Close(); drop it since the existing check below already
  surfaces the decode error.
- mount_std: the pre-mount Unmount commonly fails when nothing is
  mounted; demote to V(1) Infof.
- fuse_std: tidy panic message to match sibling cases.

* fix(mq_broker): filter grpc.ErrServerStopped on localhost listener

The localhost listener goroutine logged any Serve error unconditionally,
which includes grpc.ErrServerStopped on graceful shutdown. Match the
main listener's check so clean stops don't surface as errors.

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
2026-04-23 22:15:05 -07:00

95 lines
2.7 KiB
Go

package command
import (
"github.com/seaweedfs/seaweedfs/weed/mq/agent"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// mqAgentOptions holds the options of the mq.agent command. Its flag-backed
// fields are bound in init, and runMqAgent fills in the parsed broker list
// before starting the agent.
var mqAgentOptions MessageQueueAgentOptions
// MessageQueueAgentOptions holds the settings used to start the message queue
// agent. The pointer fields brokersString, ip and port are bound to
// command-line flags in init.
type MessageQueueAgentOptions struct {
// brokers is the parsed broker address list; runMqAgent derives it from brokersString.
brokers []pb.ServerAddress
// brokersString is the raw "-broker" flag value (comma-separated message queue brokers).
brokersString *string
// NOTE(review): filerGroup is neither bound to a flag nor read in the code visible here — confirm it is still needed.
filerGroup *string
// ip is the "-ip" flag value: the message queue agent host address.
ip *string
// port is the "-port" flag value: the message queue agent gRPC server port.
port *int
}
// init finishes wiring the mq.agent command: it registers the command's flags
// into mqAgentOptions and attaches the run handler.
func init() {
	// Command-line flags; each returned pointer is stored in mqAgentOptions.
	mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port")
	mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "", "message queue agent host address")
	mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers")

	// Run is assigned here instead of in the cmdMqAgent literal to break an
	// initialization cycle.
	cmdMqAgent.Run = runMqAgent
}
// cmdMqAgent describes the "mq.agent" subcommand: its usage line and its short
// and long help text. The Run handler and the flags are attached in init.
var cmdMqAgent = &Command{
UsageLine: "mq.agent [-port=16777] [-broker=<ip:port>]",
Short: "<WIP> start a message queue agent",
Long: `start a message queue agent
The agent runs on local server to accept gRPC calls to write or read messages.
The messages are sent to message queue brokers.
`,
}
// runMqAgent is the Run handler of the mq.agent command. It loads the security
// configuration, converts the "-broker" flag value into a broker address list
// stored on mqAgentOptions, and then starts the agent. The result of
// startQueueAgent is returned unchanged. cmd and args are not used.
func runMqAgent(cmd *Command, args []string) bool {
	util.LoadSecurityConfiguration()

	// The flag holds a comma-separated list; split it into individual addresses.
	rawBrokers := pb.ServerAddresses(*mqAgentOptions.brokersString)
	mqAgentOptions.brokers = rawBrokers.ToAddresses()

	started := mqAgentOptions.startQueueAgent()
	return started
}
// startQueueAgent creates the message queue agent from the configured seed
// brokers, registers it (plus gRPC reflection) on a gRPC server, and serves on
// the listener opened for the configured ip and port. The call blocks until
// that server's Serve returns.
//
// If util.NewIpAndLocalListeners also returns a second (localhost) listener, a
// separate gRPC server backed by the same agent is served on it from a
// background goroutine.
//
// A failure to open the listeners is fatal (glog.Fatalf). Serve errors are
// logged, except grpc.ErrServerStopped, which only signals a graceful stop and
// is ignored on both listeners. The function returns true whenever it returns.
func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent")

	agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{
		SeedBrokers: mqAgentOpt.brokers,
	}, grpcDialOption)

	// start grpc listener
	grpcL, localL, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0)
	if err != nil {
		glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err)
	}

	// Create main gRPC server
	grpcS := pb.NewGrpcServer()
	mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer)
	reflection.Register(grpcS)

	// Start localhost listener if available
	if localL != nil {
		localGrpcS := pb.NewGrpcServer()
		mq_agent_pb.RegisterSeaweedMessagingAgentServer(localGrpcS, agentServer)
		reflection.Register(localGrpcS)
		go func() {
			glog.V(0).Infof("MQ Agent listening on localhost:%d", *mqAgentOpt.port)
			// Apply the same filter as the main listener below: a clean stop
			// reports grpc.ErrServerStopped and must not be logged as an error.
			if err := localGrpcS.Serve(localL); err != nil && err != grpc.ErrServerStopped {
				glog.Errorf("MQ Agent localhost listener error: %v", err)
			}
		}()
	}

	glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port)
	if err := grpcS.Serve(grpcL); err != nil && err != grpc.ErrServerStopped {
		glog.Errorf("MQ Agent failed to start: %v", err)
	}

	return true
}