diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 814e225ad..34b34632f 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -16,7 +16,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/plugin" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "google.golang.org/protobuf/encoding/protojson" @@ -735,8 +734,8 @@ func (s *AdminServer) parseOrBuildClusterContext(raw json.RawMessage) (*plugin_p if len(contextMessage.MasterGrpcAddresses) == 0 { contextMessage.MasterGrpcAddresses = append(contextMessage.MasterGrpcAddresses, fallback.MasterGrpcAddresses...) } - if len(contextMessage.FilerGrpcAddresses) == 0 { - contextMessage.FilerGrpcAddresses = append(contextMessage.FilerGrpcAddresses, fallback.FilerGrpcAddresses...) + if len(contextMessage.FilerAddresses) == 0 { + contextMessage.FilerAddresses = append(contextMessage.FilerAddresses, fallback.FilerAddresses...) } if len(contextMessage.VolumeGrpcAddresses) == 0 { contextMessage.VolumeGrpcAddresses = append(contextMessage.VolumeGrpcAddresses, fallback.VolumeGrpcAddresses...) 
@@ -755,7 +754,7 @@ func (s *AdminServer) parseOrBuildClusterContext(raw json.RawMessage) (*plugin_p func (s *AdminServer) buildDefaultPluginClusterContext() *plugin_pb.ClusterContext { clusterContext := &plugin_pb.ClusterContext{ MasterGrpcAddresses: make([]string, 0), - FilerGrpcAddresses: make([]string, 0), + FilerAddresses: make([]string, 0), VolumeGrpcAddresses: make([]string, 0), S3GrpcAddresses: make([]string, 0), Metadata: map[string]string{ @@ -768,20 +767,20 @@ func (s *AdminServer) buildDefaultPluginClusterContext() *plugin_pb.ClusterConte clusterContext.MasterGrpcAddresses = append(clusterContext.MasterGrpcAddresses, masterAddress) } - // Master returns filers in dual-port form (host:httpPort.grpcPort); - // workers dial these directly, so collapse to host:grpcPort first. + // Master returns filers in pb.ServerAddress form (host:httpPort.grpcPort). + // Forward that verbatim; each worker converts to a gRPC or HTTP address as + // it needs (dialing wants gRPC, the admin shell wants the ServerAddress). 
filerSeen := map[string]struct{}{} for _, filer := range s.GetAllFilers() { filer = strings.TrimSpace(filer) if filer == "" { continue } - grpcAddr := pb.ServerAddress(filer).ToGrpcAddress() - if _, exists := filerSeen[grpcAddr]; exists { + if _, exists := filerSeen[filer]; exists { continue } - filerSeen[grpcAddr] = struct{}{} - clusterContext.FilerGrpcAddresses = append(clusterContext.FilerGrpcAddresses, grpcAddr) + filerSeen[filer] = struct{}{} + clusterContext.FilerAddresses = append(clusterContext.FilerAddresses, filer) } volumeSeen := map[string]struct{}{} @@ -829,7 +828,7 @@ func (s *AdminServer) buildDefaultPluginClusterContext() *plugin_pb.ClusterConte } sort.Strings(clusterContext.MasterGrpcAddresses) - sort.Strings(clusterContext.FilerGrpcAddresses) + sort.Strings(clusterContext.FilerAddresses) sort.Strings(clusterContext.VolumeGrpcAddresses) sort.Strings(clusterContext.S3GrpcAddresses) diff --git a/weed/admin/plugin/cluster_rate_limit.go b/weed/admin/plugin/cluster_rate_limit.go index e1802a5f7..f1537ad3c 100644 --- a/weed/admin/plugin/cluster_rate_limit.go +++ b/weed/admin/plugin/cluster_rate_limit.go @@ -113,7 +113,7 @@ func cloneClusterContext(in *plugin_pb.ClusterContext) *plugin_pb.ClusterContext } out := &plugin_pb.ClusterContext{ MasterGrpcAddresses: in.MasterGrpcAddresses, - FilerGrpcAddresses: in.FilerGrpcAddresses, + FilerAddresses: in.FilerAddresses, VolumeGrpcAddresses: in.VolumeGrpcAddresses, S3GrpcAddresses: in.S3GrpcAddresses, } diff --git a/weed/pb/plugin.proto b/weed/pb/plugin.proto index 9007908fc..9333a456c 100644 --- a/weed/pb/plugin.proto +++ b/weed/pb/plugin.proto @@ -348,7 +348,9 @@ message JobResult { message ClusterContext { repeated string master_grpc_addresses = 1; - repeated string filer_grpc_addresses = 2; + // Filers in pb.ServerAddress form (host:httpPort.grpcPort). Consumers + // convert to a gRPC or HTTP address as needed; see weed/pb/server_address.go. 
+ repeated string filer_addresses = 2; repeated string volume_grpc_addresses = 3; map<string, string> metadata = 4; repeated string s3_grpc_addresses = 5; diff --git a/weed/pb/plugin_pb/plugin.pb.go b/weed/pb/plugin_pb/plugin.pb.go index 9a3ecab90..06c0b02e8 100644 --- a/weed/pb/plugin_pb/plugin.pb.go +++ b/weed/pb/plugin_pb/plugin.pb.go @@ -3564,10 +3564,12 @@ func (x *JobResult) GetSummary() string { type ClusterContext struct { state protoimpl.MessageState `protogen:"open.v1"` MasterGrpcAddresses []string `protobuf:"bytes,1,rep,name=master_grpc_addresses,json=masterGrpcAddresses,proto3" json:"master_grpc_addresses,omitempty"` - FilerGrpcAddresses []string `protobuf:"bytes,2,rep,name=filer_grpc_addresses,json=filerGrpcAddresses,proto3" json:"filer_grpc_addresses,omitempty"` - VolumeGrpcAddresses []string `protobuf:"bytes,3,rep,name=volume_grpc_addresses,json=volumeGrpcAddresses,proto3" json:"volume_grpc_addresses,omitempty"` - Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` - S3GrpcAddresses []string `protobuf:"bytes,5,rep,name=s3_grpc_addresses,json=s3GrpcAddresses,proto3" json:"s3_grpc_addresses,omitempty"` + // Filers in pb.ServerAddress form (host:httpPort.grpcPort). Consumers + // convert to a gRPC or HTTP address as needed; see weed/pb/server_address.go. 
+ FilerAddresses []string `protobuf:"bytes,2,rep,name=filer_addresses,json=filerAddresses,proto3" json:"filer_addresses,omitempty"` + VolumeGrpcAddresses []string `protobuf:"bytes,3,rep,name=volume_grpc_addresses,json=volumeGrpcAddresses,proto3" json:"volume_grpc_addresses,omitempty"` + Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + S3GrpcAddresses []string `protobuf:"bytes,5,rep,name=s3_grpc_addresses,json=s3GrpcAddresses,proto3" json:"s3_grpc_addresses,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3609,9 +3611,9 @@ func (x *ClusterContext) GetMasterGrpcAddresses() []string { return nil } -func (x *ClusterContext) GetFilerGrpcAddresses() []string { +func (x *ClusterContext) GetFilerAddresses() []string { if x != nil { - return x.FilerGrpcAddresses + return x.FilerAddresses } return nil } @@ -4273,10 +4275,10 @@ const file_plugin_proto_rawDesc = "" + "\asummary\x18\x02 \x01(\tR\asummary\x1aT\n" + "\x11OutputValuesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + - "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xd5\x02\n" + + "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xcc\x02\n" + "\x0eClusterContext\x122\n" + - "\x15master_grpc_addresses\x18\x01 \x03(\tR\x13masterGrpcAddresses\x120\n" + - "\x14filer_grpc_addresses\x18\x02 \x03(\tR\x12filerGrpcAddresses\x122\n" + + "\x15master_grpc_addresses\x18\x01 \x03(\tR\x13masterGrpcAddresses\x12'\n" + + "\x0ffiler_addresses\x18\x02 \x03(\tR\x0efilerAddresses\x122\n" + "\x15volume_grpc_addresses\x18\x03 \x03(\tR\x13volumeGrpcAddresses\x12@\n" + "\bmetadata\x18\x04 \x03(\v2$.plugin.ClusterContext.MetadataEntryR\bmetadata\x12*\n" + "\x11s3_grpc_addresses\x18\x05 \x03(\tR\x0fs3GrpcAddresses\x1a;\n" + diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go 
index 344376f9b..9d7271289 100644 --- a/weed/plugin/worker/admin_script_handler.go +++ b/weed/plugin/worker/admin_script_handler.go @@ -19,9 +19,9 @@ import ( ) const ( - adminScriptJobType = "admin_script" - maxAdminScriptOutputBytes = 16 * 1024 - defaultAdminScriptRunMins = 17 + adminScriptJobType = "admin_script" + maxAdminScriptOutputBytes = 16 * 1024 + defaultAdminScriptRunMins = 17 adminScriptDetectTickMinutes = 17 ) @@ -546,13 +546,31 @@ func (h *AdminScriptHandler) buildAdminScriptCommandEnv( ctx context.Context, clusterContext *plugin_pb.ClusterContext, ) (*shell.CommandEnv, context.CancelFunc, error) { + options, err := h.buildAdminScriptShellOptions(clusterContext) + if err != nil { + return nil, nil, err + } + + commandEnv := shell.NewCommandEnv(&options) + commandEnv.ForceNoLock() + + ctx, cancel := context.WithCancel(ctx) + go commandEnv.MasterClient.KeepConnectedToMaster(ctx) + + return commandEnv, cancel, nil +} + +// buildAdminScriptShellOptions maps a cluster context onto shell.ShellOptions. +// It is split out from buildAdminScriptCommandEnv so the address handling is +// testable without standing up a master client. +func (h *AdminScriptHandler) buildAdminScriptShellOptions(clusterContext *plugin_pb.ClusterContext) (shell.ShellOptions, error) { if clusterContext == nil { - return nil, nil, fmt.Errorf("cluster context is required") + return shell.ShellOptions{}, fmt.Errorf("cluster context is required") } masters := normalizeAddressList(clusterContext.MasterGrpcAddresses) if len(masters) == 0 { - return nil, nil, fmt.Errorf("missing master addresses for admin script") + return shell.ShellOptions{}, fmt.Errorf("missing master addresses for admin script") } filerGroup := "" @@ -564,20 +582,17 @@ func (h *AdminScriptHandler) buildAdminScriptCommandEnv( Directory: "/", } - filers := normalizeAddressList(clusterContext.FilerGrpcAddresses) + // FilerAddresses are pb.ServerAddress strings (host:httpPort.grpcPort). 
+ // ShellOptions.FilerAddress is itself a ServerAddress; shell commands derive + // the gRPC or HTTP port from it as needed, so store it verbatim. + filers := normalizeAddressList(clusterContext.FilerAddresses) if len(filers) > 0 { options.FilerAddress = pb.ServerAddress(filers[0]) } else { glog.V(1).Infof("admin script worker missing filer address; filer-dependent commands may fail") } - commandEnv := shell.NewCommandEnv(&options) - commandEnv.ForceNoLock() - - ctx, cancel := context.WithCancel(ctx) - go commandEnv.MasterClient.KeepConnectedToMaster(ctx) - - return commandEnv, cancel, nil + return options, nil } func normalizeAddressList(addresses []string) []string { diff --git a/weed/plugin/worker/admin_script_handler_test.go b/weed/plugin/worker/admin_script_handler_test.go index 888a94bab..5f134078b 100644 --- a/weed/plugin/worker/admin_script_handler_test.go +++ b/weed/plugin/worker/admin_script_handler_test.go @@ -10,6 +10,36 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +func TestAdminScriptFilerAddressKeepsGrpcPortOffConvention(t *testing.T) { + // FilerAddresses arrive as pb.ServerAddress (host:httpPort.grpcPort). The + // shell derives the gRPC port from FilerAddress via ToGrpcAddress(); it must + // land on the real gRPC port even when that port is off the httpPort+10000 + // convention, not double the offset into a non-existent filer:28890. 
+ h := NewAdminScriptHandler(nil) + options, err := h.buildAdminScriptShellOptions(&plugin_pb.ClusterContext{ + MasterGrpcAddresses: []string{"master:19333"}, + FilerAddresses: []string{"filer:8888.18890"}, + }) + if err != nil { + t.Fatalf("buildAdminScriptShellOptions returned err = %v", err) + } + if grpc := options.FilerAddress.ToGrpcAddress(); grpc != "filer:18890" { + t.Fatalf("filer gRPC address = %q, want filer:18890 (not filer:28890)", grpc) + } + if http := options.FilerAddress.ToHttpAddress(); http != "filer:8888" { + t.Fatalf("filer HTTP address = %q, want filer:8888", http) + } +} + +func TestAdminScriptShellOptionsRequireMasters(t *testing.T) { + h := NewAdminScriptHandler(nil) + if _, err := h.buildAdminScriptShellOptions(&plugin_pb.ClusterContext{ + FilerAddresses: []string{"filer:8888.18890"}, + }); err == nil { + t.Fatalf("expected error when master addresses are missing") + } +} + func TestAdminScriptDescriptorDefaults(t *testing.T) { descriptor := NewAdminScriptHandler(nil).Descriptor() if descriptor == nil { diff --git a/weed/worker/tasks/iceberg/exec_test.go b/weed/worker/tasks/iceberg/exec_test.go index 2044fb2de..54aa11f7c 100644 --- a/weed/worker/tasks/iceberg/exec_test.go +++ b/weed/worker/tasks/iceberg/exec_test.go @@ -246,7 +246,7 @@ func startFakeFilerWithAddress(t *testing.T) (*fakeFilerServer, filer_pb.Seaweed } // Return the gRPC address in dialable form (host:grpcPort) since - // dialFiler now dials FilerGrpcAddresses verbatim. + // dialFiler dials the resolved filer_grpc_address verbatim. 
return fakeServer, client, listener.Addr().String() } diff --git a/weed/worker/tasks/iceberg/handler.go b/weed/worker/tasks/iceberg/handler.go index 32dfb28b5..d2bfe8850 100644 --- a/weed/worker/tasks/iceberg/handler.go +++ b/weed/worker/tasks/iceberg/handler.go @@ -376,10 +376,13 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq // Detection interval is managed by the scheduler via AdminRuntimeDefaults.DetectionIntervalMinutes. - // Get filer gRPC addresses from cluster context + // ClusterContext.FilerAddresses are pb.ServerAddress strings + // (host:httpPort.grpcPort); collapse each to a dialable gRPC address. filerGrpcAddresses := make([]string, 0) if request.ClusterContext != nil { - filerGrpcAddresses = append(filerGrpcAddresses, request.ClusterContext.FilerGrpcAddresses...) + for _, filer := range request.ClusterContext.FilerAddresses { + filerGrpcAddresses = append(filerGrpcAddresses, pb.ServerAddress(filer).ToGrpcAddress()) + } } if len(filerGrpcAddresses) == 0 { _ = sender.SendActivity(pluginworker.BuildDetectorActivity("skipped", "no filer addresses in cluster context", nil)) @@ -640,12 +643,10 @@ func (h *Handler) sendEmptyDetection(sender pluginworker.DetectionSender) error }) } -// dialFiler connects to a filer at the given gRPC address. The address is -// expected to be already dialable (host:grpcPort) as supplied via -// ClusterContext.FilerGrpcAddresses or a job proposal parameter; we don't -// run it through pb.ServerAddress.ToGrpcAddress because that helper's -// fallback adds +10000 to any single-port address, turning a real gRPC -// port like 18888 into a non-existent 28888. +// dialFiler connects to a filer at the given gRPC address. The address must +// already be a dialable host:grpcPort: Detect resolves it from +// ClusterContext.FilerAddresses via ToGrpcAddress and stores that resolved form +// in the job proposal parameter, so dialFiler dials it verbatim. 
func (h *Handler) dialFiler(ctx context.Context, grpcAddress string) (*grpc.ClientConn, error) { opCtx, opCancel := context.WithTimeout(ctx, filerConnectTimeout) defer opCancel() diff --git a/weed/worker/tasks/s3_lifecycle/handler.go b/weed/worker/tasks/s3_lifecycle/handler.go index 57a6f751a..896a363f1 100644 --- a/weed/worker/tasks/s3_lifecycle/handler.go +++ b/weed/worker/tasks/s3_lifecycle/handler.go @@ -170,19 +170,22 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq } filerAddresses := []string{} if request.ClusterContext != nil { - filerAddresses = append(filerAddresses, request.ClusterContext.FilerGrpcAddresses...) + filerAddresses = append(filerAddresses, request.ClusterContext.FilerAddresses...) } if len(filerAddresses) == 0 { _ = sender.SendActivity(pluginworker.BuildDetectorActivity("skipped", "no filer addresses in cluster context", nil)) return sender.SendComplete(&plugin_pb.DetectionComplete{JobType: jobType, Success: true}) } + // FilerAddresses are pb.ServerAddress strings (host:httpPort.grpcPort); + // Execute dials filer_grpc_address verbatim, so resolve it to a gRPC address. 
+ filerGrpcAddress := pb.ServerAddress(filerAddresses[0]).ToGrpcAddress() proposal := &plugin_pb.JobProposal{ JobType: jobType, ProposalId: fmt.Sprintf("s3-lifecycle-%d", time.Now().UnixNano()), Priority: plugin_pb.JobPriority_JOB_PRIORITY_NORMAL, Parameters: map[string]*plugin_pb.ConfigValue{ - "filer_grpc_address": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: filerAddresses[0]}}, + "filer_grpc_address": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: filerGrpcAddress}}, }, } if err := sender.SendProposals(&plugin_pb.DetectionProposals{ diff --git a/weed/worker/tasks/s3_lifecycle/handler_test.go b/weed/worker/tasks/s3_lifecycle/handler_test.go index af715f21b..c1b98da89 100644 --- a/weed/worker/tasks/s3_lifecycle/handler_test.go +++ b/weed/worker/tasks/s3_lifecycle/handler_test.go @@ -175,7 +175,7 @@ func TestDetect_NoFilerAddressesCompletesWithSkipActivity(t *testing.T) { JobType: jobType, ClusterContext: &plugin_pb.ClusterContext{ S3GrpcAddresses: []string{"s3a:8333"}, - // no FilerGrpcAddresses + // no FilerAddresses }, }, r) require.NoError(t, err) @@ -196,8 +196,10 @@ func TestDetect_HappyPathProposesOneJobWithFirstFilerAddress(t *testing.T) { err := h.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ JobType: jobType, ClusterContext: &plugin_pb.ClusterContext{ - S3GrpcAddresses: []string{"s3a:8333"}, - FilerGrpcAddresses: []string{"filer-a:18888", "filer-b:18888"}, + S3GrpcAddresses: []string{"s3a:8333"}, + // pb.ServerAddress dual-port form; the gRPC port is pinned off the + // +10000 convention so a raw-forwarding regression resurfaces here. 
+ FilerAddresses: []string{"filer-a:8888.18890", "filer-b:8888.18890"}, }, }, r) require.NoError(t, err) @@ -210,7 +212,7 @@ func TestDetect_HappyPathProposesOneJobWithFirstFilerAddress(t *testing.T) { assert.NotEmpty(t, prop.ProposalId, "proposal id must be unique-per-run") require.Contains(t, prop.Parameters, "filer_grpc_address") val := prop.Parameters["filer_grpc_address"].GetStringValue() - assert.Equal(t, "filer-a:18888", val, "first reachable filer is dialed") + assert.Equal(t, "filer-a:18890", val, "first filer resolved to its gRPC port, not host:28890") require.Len(t, r.completes, 1) assert.True(t, r.completes[0].Success) @@ -224,8 +226,8 @@ func TestDetect_EmptyJobTypeAccepted(t *testing.T) { r := &recordingSender{} err := h.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ ClusterContext: &plugin_pb.ClusterContext{ - S3GrpcAddresses: []string{"s3a:8333"}, - FilerGrpcAddresses: []string{"f:18888"}, + S3GrpcAddresses: []string{"s3a:8333"}, + FilerAddresses: []string{"f:8888.18890"}, }, }, r) require.NoError(t, err) @@ -241,8 +243,8 @@ func TestDetect_PropagatesProposalsSendError(t *testing.T) { err := h.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ JobType: jobType, ClusterContext: &plugin_pb.ClusterContext{ - S3GrpcAddresses: []string{"s3a:8333"}, - FilerGrpcAddresses: []string{"f:18888"}, + S3GrpcAddresses: []string{"s3a:8333"}, + FilerAddresses: []string{"f:8888.18890"}, }, }, r) assert.ErrorIs(t, err, want) @@ -261,8 +263,8 @@ func TestDetect_PropagatesCompleteSendError(t *testing.T) { err := h.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ JobType: jobType, ClusterContext: &plugin_pb.ClusterContext{ - S3GrpcAddresses: []string{"s3a:8333"}, - FilerGrpcAddresses: []string{"f:18888"}, + S3GrpcAddresses: []string{"s3a:8333"}, + FilerAddresses: []string{"f:8888.18890"}, }, }, r) assert.ErrorIs(t, err, want)