mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-13 21:31:32 +00:00
* fix(weed/command) address unhandled errors * fix(command): don't log graceful-shutdown sentinels; plug response-body leak - s3: Serve on unix socket treated http.ErrServerClosed as fatal; now excluded like the other Serve/ServeTLS paths in this file. - mq_agent, mq_broker: filter grpc.ErrServerStopped so clean shutdown doesn't log as an error. - worker_runtime: the added decodeErr early-continue skipped resp.Body.Close(); drop it since the existing check below already surfaces the decode error. - mount_std: the pre-mount Unmount commonly fails when nothing is mounted; demote to V(1) Infof. - fuse_std: tidy panic message to match sibling cases. * fix(mq_broker): filter grpc.ErrServerStopped on localhost listener The localhost listener goroutine logged any Serve error unconditionally, which includes grpc.ErrServerStopped on graceful shutdown. Match the main listener's check so clean stops don't surface as errors. --------- Co-authored-by: Chris Lu <chris.lu@gmail.com>
95 lines
2.7 KiB
Go
95 lines
2.7 KiB
Go
package command
|
|
|
|
import (
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/agent"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/reflection"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
var (
|
|
mqAgentOptions MessageQueueAgentOptions
|
|
)
|
|
|
|
type MessageQueueAgentOptions struct {
|
|
brokers []pb.ServerAddress
|
|
brokersString *string
|
|
filerGroup *string
|
|
ip *string
|
|
port *int
|
|
}
|
|
|
|
func init() {
|
|
cmdMqAgent.Run = runMqAgent // break init cycle
|
|
mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers")
|
|
mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "", "message queue agent host address")
|
|
mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port")
|
|
}
|
|
|
|
var cmdMqAgent = &Command{
|
|
UsageLine: "mq.agent [-port=16777] [-broker=<ip:port>]",
|
|
Short: "<WIP> start a message queue agent",
|
|
Long: `start a message queue agent
|
|
|
|
The agent runs on local server to accept gRPC calls to write or read messages.
|
|
The messages are sent to message queue brokers.
|
|
|
|
`,
|
|
}
|
|
|
|
func runMqAgent(cmd *Command, args []string) bool {
|
|
|
|
util.LoadSecurityConfiguration()
|
|
|
|
mqAgentOptions.brokers = pb.ServerAddresses(*mqAgentOptions.brokersString).ToAddresses()
|
|
|
|
return mqAgentOptions.startQueueAgent()
|
|
|
|
}
|
|
|
|
func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
|
|
|
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent")
|
|
|
|
agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{
|
|
SeedBrokers: mqAgentOpt.brokers,
|
|
}, grpcDialOption)
|
|
|
|
// start grpc listener
|
|
grpcL, localL, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0)
|
|
if err != nil {
|
|
glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err)
|
|
}
|
|
|
|
// Create main gRPC server
|
|
grpcS := pb.NewGrpcServer()
|
|
mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer)
|
|
reflection.Register(grpcS)
|
|
|
|
// Start localhost listener if available
|
|
if localL != nil {
|
|
localGrpcS := pb.NewGrpcServer()
|
|
mq_agent_pb.RegisterSeaweedMessagingAgentServer(localGrpcS, agentServer)
|
|
reflection.Register(localGrpcS)
|
|
go func() {
|
|
glog.V(0).Infof("MQ Agent listening on localhost:%d", *mqAgentOpt.port)
|
|
if err := localGrpcS.Serve(localL); err != nil {
|
|
glog.Errorf("MQ Agent localhost listener error: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port)
|
|
if err := grpcS.Serve(grpcL); err != nil && err != grpc.ErrServerStopped {
|
|
glog.Errorf("MQ Agent failed to start: %v", err)
|
|
}
|
|
|
|
return true
|
|
|
|
}
|