Files
seaweedfs/weed/command/master.go
Chris Lu 9ae905e456 feat(security): hot-reload HTTPS certs without restart (k8s cert-manager) (#9181)
* feat(security): hot-reload HTTPS certs for master/volume/filer/webdav/admin

S3 and filer already use a refreshing pemfile provider for their HTTPS
cert, so rotated certificates (e.g. from k8s cert-manager) are picked up
without a restart. Master, volume, webdav, and admin, however, passed
cert/key paths straight to ServeTLS/ListenAndServeTLS and loaded once at
startup — rotating those certs required a pod restart.

Add a small helper NewReloadingServerCertificate in weed/security that
wraps pemfile.Provider and returns a tls.Config.GetCertificate closure,
then wire it into the four remaining HTTPS entry points. httpdown now
also calls ServeTLS when TLSConfig carries a GetCertificate/Certificates
but CertFile/KeyFile are empty, so volume server can pre-populate
TLSConfig.

A unit test exercises the rotation path (write cert, rotate on disk,
assert the callback returns the new cert) with a short refresh window.

* refactor(security): route filer/s3 HTTPS through the shared cert reloader

Before: filer.go and s3.go each kept a *certprovider.Provider on the
options struct plus a duplicated GetCertificateWithUpdate method. Both
were loading pemfile themselves. Behaviorally they already reloaded, but
the logic was duplicated two ways and neither path was shared with the
newly-added master/volume/webdav/admin wiring.

After: both use security.NewReloadingServerCertificate like the other
servers. The per-struct certProvider field and GetCertificateWithUpdate
method are removed, along with the now-unused certprovider and pemfile
imports. Net: -32 lines, one code path for all HTTPS cert reloading.

No behavior change — the refresh window, cache, and handshake contract
are identical (the helper wraps the same pemfile.NewProvider).

* feat(security): hot-reload HTTPS client certs for mount/backup/upload/etc

The HTTP client in weed/util/http/client loaded the mTLS client cert
once at startup via tls.LoadX509KeyPair. That left every long-lived
HTTPS client process (weed mount, backup, filer.copy, filer→volume,
s3→filer/volume) unable to pick up a rotated client cert without a
restart — even though the same cert-manager setup was already rotating
the server side fine.

Swap the client cert loader for a tls.Config.GetClientCertificate
callback backed by the same refreshing pemfile provider. New TLS
handshakes pick up the rotated cert; in-flight pooled connections keep
their old cert and drop as normal transport churn happens.

To keep this reusable from both server and client TLS code without an
import cycle (weed/security already imports weed/util/http/client for
LoadHTTPClientFromFile), extract the pemfile wrapper into a new
weed/security/certreload subpackage. weed/security keeps its thin
NewReloadingServerCertificate wrapper. The existing unit test moves
with the implementation.

gRPC mTLS was already handled by security.LoadServerTLS /
LoadClientTLS; this PR does not change any gRPC paths. MQ broker, MQ
agent, Kafka gateway, and FUSE mount control plane are gRPC-only and
therefore already rotate.

CA bundles (ClientCAs / RootCAs / grpc.ca) are still loaded once — noted
as a known limitation in the wiki.

* fix(security): address PR review feedback on cert reloader

Bots (gemini-code-assist + coderabbit) flagged three real issues and a
couple of nits. Addressing them here:

1. KeyMaterial used context.Background(). The grpc pemfile provider's
   KeyMaterial blocks until material arrives or the context deadline
   expires; with Background() a slow disk could hang the TLS handshake
   indefinitely. Switched both the server and client callbacks to use
   hello.Context() / cri.Context() so a stuck read is bounded by the
   handshake timeout.

2. Admin server loaded TLS inside the serve goroutine. If the cert was
   bad, the goroutine returned but startAdminServer kept blocking on
   <-ctx.Done() with no listener, making the process look healthy with
   nothing bound. Moved TLS setup to run before the goroutine starts
   and propagate errors via fmt.Errorf; also captures the provider and
   defers Close().

3. HTTP client discarded the certprovider.Provider from
   NewClientGetCertificate. That leaked the refresh goroutine, and
   NewHttpClientWithTLS had a worse case where a CA-file failure after
   provider creation orphaned the provider entirely. Added a
   certProvider field and a Close() method on HTTPClient, and made
   the constructors close the provider on subsequent error paths.

4. Server-side paths (master/volume/filer/s3/webdav/admin) now retain
   the provider. filer and webdav run ServeTLS synchronously, so a
   plain defer works. master/volume/s3 dispatch goroutines and return
   while the server keeps running, so they hook Close() into
   grace.OnInterrupt.

5. Test: certreload_test now tolerates transient read/parse errors
   during file rotation (writeSelfSigned rewrites cert before key) and
   reports the last error only if the deadline expires.

No user-visible behavior change for the happy path.

* test(tls): add end-to-end HTTPS cert rotation integration test

Boots a real `weed master` with HTTPS enabled, captures the leaf cert
served at TLS handshake time, atomically rewrites the cert/key files
on disk (the same rename-in-place pattern kubelet does when it swaps
a cert-manager Secret), and asserts that a subsequent TLS handshake
observes the rotated leaf — with no process restart, no SIGHUP, no
reloader sidecar. Verifies the full path: on-disk change → pemfile
refresh tick → provider.KeyMaterial → tls.Config.GetCertificate →
server TLS handshake.

Runtime is ~1s by exposing the reloader's refresh window as an env
var (WEED_TLS_CERT_REFRESH_INTERVAL) and setting it to 500ms for the
test. The same env var is user-facing — documented in the wiki — so
operators running short-lived certs (Vault, cert-manager with
duration: 24h, etc.) can tighten the rotation-pickup window without a
rebuild. Defaults to 5h to preserve prior behavior.

security.CredRefreshingInterval is kept for API compatibility but now
aliases certreload.DefaultRefreshInterval so the same env controls
both gRPC mTLS and HTTPS reload.

* ci(tls): wire the TLS rotation integration test into GitHub Actions

Mirrors the existing vacuum-integration-tests.yml shape: Ubuntu runner,
Go 1.25, build weed, run `go test` in test/tls_rotation, upload master
logs on failure. 10-minute job timeout; the test itself finishes in
about a second because WEED_TLS_CERT_REFRESH_INTERVAL is set to 500ms
inside the test.

Runs on every push to master and on every PR to master.

* fix(tls): address follow-up PR review comments

Three new comments on the integration test + volume shutdown path:

1. Test: peekServerCert was swallowing every dial/handshake error,
   which meant waitForCert's "last err: <nil>" fatal message lost all
   diagnostic value. Thread errors back through: peekServerCert now
   returns (*x509.Certificate, error), and waitForCert records the
   latest error so a CI flake points at the actual cause (master
   didn't come up, handshake rejected, CA pool mismatch, etc.).

2. Test: set HOME=<tempdir> on the master subprocess. Viper today
   registers the literal path "$HOME/.seaweedfs" without env
   expansion, so a developer's ~/.seaweedfs/security.toml is
   accidentally invisible — the test was relying on that. Pinning
   HOME is belt-and-braces against a future viper upgrade that does
   expand env vars.

3. volume.go: startClusterHttpService's provider close was registered
   via grace.OnInterrupt, which fires on SIGTERM but NOT on the
   v.shutdownCtx.Done() path used by mini / integration tests. The
   pemfile refresh goroutine leaked in that shutdown path. Now the
   helper returns a close func and the caller invokes it on BOTH
   shutdown paths for parity.

Also add MinVersion: TLS 1.2 to the test's tls.Config to quiet the
ast-grep static-analysis nit — zero-risk since the pool only trusts
our in-memory CA.

Test runs clean 3/3.
2026-04-21 20:20:11 -07:00

461 lines
17 KiB
Go

package command
import (
"context"
"crypto/tls"
"fmt"
"math/rand/v2"
"net"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/version"
hashicorpRaft "github.com/hashicorp/raft"
"slices"
"github.com/gorilla/mux"
"github.com/seaweedfs/raft/protobuf"
"github.com/spf13/viper"
"google.golang.org/grpc/reflection"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
weed_server "github.com/seaweedfs/seaweedfs/weed/server"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var (
m MasterOptions
)
const (
raftJoinCheckDelay = 1500 * time.Millisecond // delay before checking if we should join a raft cluster
)
type MasterOptions struct {
port *int
portGrpc *int
ip *string
ipBind *string
metaFolder *string
peers *string
mastersDeprecated *string // deprecated, for backward compatibility in master.follower
volumeSizeLimitMB *uint
volumePreallocate *bool
maxParallelVacuumPerServer *int
// pulseSeconds *int
defaultReplication *string
garbageThreshold *float64
whiteList *string
disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
raftResumeState *bool
metricsHttpPort *int
metricsHttpIp *string
heartbeatInterval *time.Duration
electionTimeout *time.Duration
raftHashicorp *bool
raftBootstrap *bool
telemetryUrl *string
telemetryEnabled *bool
debug *bool
debugPort *int
// shutdownCtx, when non-nil, tells startMaster to shut down once the ctx
// is cancelled. Used by integration tests and by weed mini; nil for
// standalone weed master.
shutdownCtx context.Context
}
func init() {
cmdMaster.Run = runMaster // break init cycle
m.port = cmdMaster.Flag.Int("port", 9333, "http listen port")
m.portGrpc = cmdMaster.Flag.Int("port.grpc", 0, "grpc listen port")
m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier")
m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095; use 'none' for single-master mode")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
m.maxParallelVacuumPerServer = cmdMaster.Flag.Int("maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel per volume server")
// m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "", "Default replication type if not specified.")
m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
m.metricsHttpIp = cmdMaster.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.")
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", true, "resume previous state on start master server")
m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft")
m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
m.telemetryUrl = cmdMaster.Flag.String("telemetry.url", "https://telemetry.seaweedfs.com/api/collect", "telemetry server URL to send usage statistics")
m.telemetryEnabled = cmdMaster.Flag.Bool("telemetry", false, "enable telemetry reporting")
m.debug = cmdMaster.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
m.debugPort = cmdMaster.Flag.Int("debug.port", 6060, "http port for debugging")
}
var cmdMaster = &Command{
UsageLine: "master -port=9333",
Short: "start a master server",
Long: `start a master server to provide volume=>location mapping service and sequence number of file ids
The configuration file "security.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
The example security.toml configuration file can be generated by "weed scaffold -config=security"
For single-master setups, use -peers=none to skip Raft quorum wait and enable instant startup.
This is ideal for development or standalone deployments.
`,
}
var (
masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file")
masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file")
)
func runMaster(cmd *Command, args []string) bool {
if *m.debug {
grace.StartDebugServer(*m.debugPort)
}
util.LoadSecurityConfiguration()
util.LoadConfiguration("master", false)
// bind viper configuration to command line flags
if v := util.GetViper().GetString("master.mdir"); v != "" {
*m.metaFolder = v
}
grace.SetupProfiling(*masterCpuProfile, *masterMemProfile)
parent, _ := util.FullPath(*m.metaFolder).DirAndName()
if util.FileExists(string(parent)) && !util.FileExists(*m.metaFolder) {
os.MkdirAll(*m.metaFolder, 0755)
}
if err := util.TestFolderWritable(util.ResolvePath(*m.metaFolder)); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
}
masterWhiteList := util.StringSplit(*m.whiteList, ",")
if *m.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
}
switch {
case *m.metricsHttpIp != "":
// noting to do, use m.metricsHttpIp
case *m.ipBind != "":
*m.metricsHttpIp = *m.ipBind
case *m.ip != "":
*m.metricsHttpIp = *m.ip
}
go stats_collect.StartMetricsServer(*m.metricsHttpIp, *m.metricsHttpPort)
go stats_collect.LoopPushingMetric("masterServer", util.JoinHostPort(*m.ip, *m.port), *m.metricsAddress, *m.metricsIntervalSec)
startMaster(m, masterWhiteList)
return true
}
func startMaster(masterOption MasterOptions, masterWhiteList []string) {
backend.LoadConfiguration(util.GetViper())
if *masterOption.portGrpc == 0 {
*masterOption.portGrpc = 10000 + *masterOption.port
}
if *masterOption.ipBind == "" {
*masterOption.ipBind = *masterOption.ip
}
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
masterPeers := make(map[string]pb.ServerAddress)
for _, peer := range peers {
masterPeers[string(peer)] = peer
}
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers)
listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", version.Version(), listeningAddress)
masterListener, masterLocalListener, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
// start raftServer
metaDir := path.Join(*masterOption.metaFolder, fmt.Sprintf("m%d", *masterOption.port))
isSingleMaster := isSingleMasterMode(*masterOption.peers)
raftServerOption := &weed_server.RaftServerOption{
GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
Peers: masterPeers,
ServerAddr: myMasterAddress,
DataDir: util.ResolvePath(metaDir),
Topo: ms.Topo,
RaftResumeState: *masterOption.raftResumeState,
SingleMaster: isSingleMaster,
HeartbeatInterval: *masterOption.heartbeatInterval,
ElectionTimeout: *masterOption.electionTimeout,
RaftBootstrap: *masterOption.raftBootstrap,
}
var raftServer *weed_server.RaftServer
var err error
if *masterOption.raftHashicorp {
if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil {
glog.Fatalf("NewHashicorpRaftServer: %s", err)
}
} else {
raftServer, err = weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}
// For single-master mode with a fresh log, initialize cluster immediately.
// When resuming with existing state, the server is already a member and
// will self-elect via fastResume — sending another JoinCommand would block
// because goraft's setCommitIndex returns early on JoinCommand entries,
// preventing the new entry's event from being notified when old uncommitted
// JoinCommands exist in the log.
if isSingleMaster && !raftServer.HasExistingState() {
glog.V(0).Infof("Single-master mode: initializing cluster immediately")
raftServer.DoJoinCommand()
}
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods(http.MethodGet, http.MethodHead)
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods(http.MethodGet, http.MethodHead)
if *masterOption.raftHashicorp {
r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods(http.MethodGet)
}
// starting grpc server
grpcPort := *masterOption.portGrpc
grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0)
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
if *masterOption.raftHashicorp {
raftServer.TransportManager.Register(grpcS)
} else {
protobuf.RegisterRaftServer(grpcS, raftServer)
}
reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", version.Version(), *masterOption.ipBind, grpcPort)
if grpcLocalL != nil {
go grpcS.Serve(grpcLocalL)
}
go grpcS.Serve(grpcL)
pb.ServeGrpcOnLocalSocket(grpcS, grpcPort)
// For multi-master mode with non-Hashicorp raft, wait and check if we should join
if !*masterOption.raftHashicorp && !isSingleMaster {
go func() {
// Stagger bootstrap by peer index so masters don't all check
// simultaneously. Peer 0 waits ~1.5s, peer 1 ~3s, etc.
idx := peerIndex(myMasterAddress, peers)
delay := time.Duration(float64(raftJoinCheckDelay) * (rand.Float64()*0.25 + 1) * float64(idx+1))
glog.V(0).Infof("bootstrap check in %v (peer index %d of %d)", delay, idx, len(peers))
time.Sleep(delay)
ms.Topo.RaftServerAccessLock.RLock()
isEmptyMaster := ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty()
isFirst := idx == 0
if isEmptyMaster && isFirst {
existingLeader := ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress)
if existingLeader == "" {
raftServer.DoJoinCommand()
} else {
glog.V(0).Infof("skip bootstrap: existing leader %s found from peers", existingLeader)
}
} else if !isEmptyMaster {
glog.V(0).Infof("skip bootstrap: leader=%q logEmpty=%v", ms.Topo.RaftServer.Leader(), ms.Topo.RaftServer.IsLogEmpty())
} else {
glog.V(0).Infof("skip bootstrap: %v is not the first master in peers (index %d)", myMasterAddress, idx)
}
ms.Topo.RaftServerAccessLock.RUnlock()
}()
}
go ms.MasterClient.KeepConnectedToMaster(context.Background())
// start http server
var (
clientCertFile,
certFile,
keyFile string
)
useTLS := false
useMTLS := false
if viper.GetString("https.master.key") != "" {
useTLS = true
certFile = viper.GetString("https.master.cert")
keyFile = viper.GetString("https.master.key")
}
if viper.GetString("https.master.ca") != "" {
useMTLS = true
clientCertFile = viper.GetString("https.master.ca")
}
if masterLocalListener != nil {
go newHttpServer(r, nil).Serve(masterLocalListener)
}
var tlsConfig *tls.Config
if useMTLS {
tlsConfig = security.LoadClientTLSHTTP(clientCertFile)
security.FixTlsConfig(util.GetViper(), tlsConfig)
}
if useTLS {
getCert, certProvider, err := security.NewReloadingServerCertificate(certFile, keyFile)
if err != nil {
glog.Fatalf("failed to load master HTTPS certificate: %v", err)
}
// Master runs ServeTLS in a goroutine and this function then blocks
// on shutdownCtx / select{}; tie the pem refresh goroutine to the
// existing interrupt hook instead of a local defer that would fire
// while the server is still running.
grace.OnInterrupt(certProvider.Close)
if tlsConfig == nil {
tlsConfig = &tls.Config{}
}
tlsConfig.GetCertificate = getCert
go newHttpServer(r, tlsConfig).ServeTLS(masterListener, "", "")
} else {
go newHttpServer(r, nil).Serve(masterListener)
}
grace.OnInterrupt(ms.Shutdown)
grace.OnInterrupt(grpcS.Stop)
grace.OnReload(func() {
if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
ms.Topo.HashicorpRaft.LeadershipTransfer()
}
})
if masterOption.shutdownCtx != nil {
<-masterOption.shutdownCtx.Done()
ms.Shutdown()
grpcS.Stop()
} else {
select {}
}
}
func isSingleMasterMode(peers string) bool {
p := strings.ToLower(strings.TrimSpace(peers))
return p == "none"
}
func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers string) (masterAddress pb.ServerAddress, cleanedPeers []pb.ServerAddress) {
glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers)
masterAddress = pb.NewServerAddress(masterIp, masterPort, masterGrpcPort)
// Handle special case: -peers=none for single-master setup
if isSingleMasterMode(peers) {
glog.V(0).Infof("Running in single-master mode (peers=none), no quorum required")
cleanedPeers = []pb.ServerAddress{masterAddress}
return
}
peers = strings.TrimSpace(peers)
seenPeers := make(map[string]struct{})
for _, peer := range pb.ServerAddresses(peers).ToAddresses() {
normalizedPeer := normalizeMasterPeerAddress(peer, masterAddress)
key := string(normalizedPeer)
if _, found := seenPeers[key]; found {
continue
}
seenPeers[key] = struct{}{}
cleanedPeers = append(cleanedPeers, normalizedPeer)
}
hasSelf := false
for _, peer := range cleanedPeers {
if peer.ToHttpAddress() == masterAddress.ToHttpAddress() {
hasSelf = true
break
}
}
if !hasSelf {
cleanedPeers = append(cleanedPeers, masterAddress)
}
if len(cleanedPeers)%2 == 0 {
glog.Fatalf("Only odd number of masters are supported: %+v", cleanedPeers)
}
return
}
func normalizeMasterPeerAddress(peer pb.ServerAddress, self pb.ServerAddress) pb.ServerAddress {
if peer.ToHttpAddress() == self.ToHttpAddress() {
return self
}
_, grpcPort, err := net.SplitHostPort(peer.ToGrpcAddress())
if err != nil {
return peer
}
grpcPortValue, err := strconv.Atoi(grpcPort)
if err != nil {
return peer
}
return pb.NewServerAddressWithGrpcPort(peer.ToHttpAddress(), grpcPortValue)
}
// peerIndex returns the 0-based position of self in the sorted peer list.
// Peer 0 is the designated bootstrap node. Returns -1 if self is not found.
func peerIndex(self pb.ServerAddress, peers []pb.ServerAddress) int {
slices.SortFunc(peers, func(a, b pb.ServerAddress) int {
return strings.Compare(a.ToHttpAddress(), b.ToHttpAddress())
})
for i, peer := range peers {
if peer.ToHttpAddress() == self.ToHttpAddress() {
return i
}
}
glog.Warningf("peerIndex: self %s not found in peers %v", self, peers)
return -1
}
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc)
return &weed_server.MasterOption{
Master: masterAddress,
MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB),
VolumePreallocate: *m.volumePreallocate,
MaxParallelVacuumPerServer: *m.maxParallelVacuumPerServer,
// PulseSeconds: *m.pulseSeconds,
DefaultReplicaPlacement: *m.defaultReplication,
GarbageThreshold: *m.garbageThreshold,
WhiteList: whiteList,
DisableHttp: *m.disableHttp,
MetricsAddress: *m.metricsAddress,
MetricsIntervalSec: *m.metricsIntervalSec,
TelemetryUrl: *m.telemetryUrl,
TelemetryEnabled: *m.telemetryEnabled,
}
}