Files
seaweedfs/weed/server/volume_server.go
Chris Lu 10cc06333b cluster: restrict Ping RPC to known peers of the requested type (#9445)
Ping previously dialled whatever host:port the caller asked for. Gate
each server's Ping handler on cluster membership: masters check the
topology, registered cluster nodes, and configured master peers; volume
servers only accept their seed/current masters; filers accept tracked
peer filers, the master-learned volume server set, and configured
masters.

Use address-indexed peer lookups to keep Ping target validation O(1):
- topology maintains a pb.ServerAddress -> *DataNode index alongside
  the dc/rack/node tree, kept in sync from doLinkChildNode and
  UnlinkChildNode plus the ip/port-rewrite branch in
  GetOrCreateDataNode. GetTopology now returns nil on a detached
  subtree instead of panicking, so the linkage hooks can no-op safely.
- vid_map tracks a refcount per volume-server address so
  hasVolumeServer answers without scanning every vid location. The
  add path skips empty-address entries the same way the delete path
  already does, so a zero-value Location cannot leak a permanent
  serverRefCount[""] bucket.
- masters reuse a cached master-address set from MasterClient instead
  of walking the configured peer slice on every request.
- volume servers compare against a pre-built seed-master set and
  protect currentMaster reads/writes with an RWMutex, fixing the
  data race with the heartbeat goroutine. The seed slice is copied
  on construction so external mutation cannot desync it from the
  frozen lookup set.
- cluster.check drops the direct volume-to-volume sweep; volume
  servers no longer carry a peer-volume list, and the note next to
  the dropped probe is reworded to make clear that direct
  volume-to-volume reachability is intentionally not validated by
  this command.

Update the volume-server integration tests that drove Ping through the
new admission gate: success-path coverage now targets the master peer
(the only type a volume server tracks), and the unknown/unreachable
path asserts the InvalidArgument the gate now returns instead of the
old downstream dial error.

Mirror the same admission gate in the Rust volume server crate: a
seed-master HashSet built once at startup plus a tokio RwLock over the
heartbeat-tracked current master, both consulted in is_known_ping_target
on every Ping, with InvalidArgument returned for any target that isn't
a recognised master.
2026-05-12 13:00:52 -07:00

208 lines
7.5 KiB
Go

package weed_server
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage"
)
// VolumeServer holds the state of a volume server: its on-disk store,
// request guard, upload/download throttling, master tracking and the
// settings passed to NewVolumeServer. It embeds
// UnimplementedVolumeServerServer so it can be registered as the gRPC
// VolumeServer service.
//
// NOTE(review): if the leading int64 counters are updated with sync/atomic
// elsewhere, keeping them first preserves 64-bit alignment on 32-bit
// platforms — confirm before reordering fields.
type VolumeServer struct {
	volume_server_pb.UnimplementedVolumeServerServer
	// Bytes of upload/download data currently in flight (presumably
	// maintained by the data handlers — confirm at call sites).
	inFlightUploadDataSize   int64
	inFlightDownloadDataSize int64
	// Configured concurrency limits; also exported as stats gauges in
	// NewVolumeServer.
	concurrentUploadLimit   int64
	concurrentDownloadLimit int64
	// Condition variables, each with its own mutex, used to wait for
	// in-flight data to drop below the limits above.
	inFlightUploadDataLimitCond   *sync.Cond
	inFlightDownloadDataLimitCond *sync.Cond
	// Maximum time to wait on the condition variables above.
	inflightUploadDataTimeout   time.Duration
	inflightDownloadDataTimeout time.Duration
	hasSlowRead                 bool
	readBufferSizeMB            int
	// SeedMasterNodes is a private copy of the master list given to
	// NewVolumeServer.
	SeedMasterNodes []pb.ServerAddress
	// seedMasterSet mirrors SeedMasterNodes keyed by the canonical http
	// form. It is computed once in NewVolumeServer so admission paths can
	// answer is-this-a-seed-master in O(1).
	seedMasterSet map[string]struct{}
	// whiteList is the white list passed to NewVolumeServer, WITHOUT the
	// entries from the guard.white_list config key; Reload re-merges those.
	whiteList []string
	// currentMaster is the master this server is currently heartbeating to.
	// Read and write it only while holding currentMasterLock.
	currentMaster     pb.ServerAddress
	currentMasterLock sync.RWMutex
	pulsePeriod       time.Duration
	dataCenter        string
	rack              string
	store             *storage.Store
	guard             *security.Guard
	grpcDialOption    grpc.DialOption
	needleMapKind     storage.NeedleMapKind
	// ldbTimout keeps its historical spelling; it is referenced by that name.
	ldbTimout                     int64
	FixJpgOrientation             bool
	ReadMode                      string
	AllowUntrustedRemoteEndpoints bool
	// Throttling rates in bytes per second (converted from MB/s in
	// NewVolumeServer).
	compactionBytePerSecond  int64
	maintenanceBytePerSecond int64
	// Metrics push target consumed by stats.LoopPushingMetric. These are not
	// set in NewVolumeServer itself (presumably filled in from the master
	// during checkWithMaster — confirm).
	metricsAddress     string
	metricsIntervalSec int
	fileSizeLimitBytes int64
	// isHeartbeating starts out true; stopChan is an unbuffered stop signal.
	isHeartbeating bool
	stopChan       chan bool
}
// NewVolumeServer builds a VolumeServer and wires its HTTP handlers onto
// adminMux, and onto publicMux when that is a distinct mux. It then starts
// the background heartbeat and metrics-push goroutines.
//
// JWT signing settings (with defaults of 10s for write tokens and 60s for
// read tokens) and the extra guard.white_list entries are read from the
// global viper configuration. masterNodes and whiteList are copied, so the
// caller may reuse or mutate its slices afterwards without affecting the
// server.
//
// checkWithMaster runs before the store is created. NOTE(review): it
// presumably fills in master-provided settings such as the metrics
// address/interval used at the end of this function — confirm.
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
	port int, grpcPort int, publicUrl string, id string,
	folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType, diskTags [][]string,
	idxFolder string,
	needleMapKind storage.NeedleMapKind,
	masterNodes []pb.ServerAddress, pulsePeriod time.Duration,
	dataCenter string, rack string,
	whiteList []string,
	fixJpgOrientation bool,
	readMode string,
	compactionMBPerSecond int,
	maintenanceMBPerSecond int,
	fileSizeLimitMB int,
	concurrentUploadLimit int64,
	concurrentDownloadLimit int64,
	inflightUploadDataTimeout time.Duration,
	inflightDownloadDataTimeout time.Duration,
	hasSlowRead bool,
	readBufferSizeMB int,
	ldbTimeout int64,
	allowUntrustedRemoteEndpoints bool,
) *VolumeServer {
	v := util.GetViper()
	signingKey := v.GetString("jwt.signing.key")
	v.SetDefault("jwt.signing.expires_after_seconds", 10)
	expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
	enableUiAccess := v.GetBool("access.ui")
	readSigningKey := v.GetString("jwt.signing.read.key")
	v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
	readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")

	// Own a private copy of the caller's white list (nil stays nil, empty
	// stays empty). Reload re-derives the effective list from vs.whiteList,
	// so it must not share a backing array with the caller's slice.
	baseWhiteList := append(whiteList[:0:0], whiteList...)

	vs := &VolumeServer{
		pulsePeriod:                   pulsePeriod,
		dataCenter:                    dataCenter,
		rack:                          rack,
		needleMapKind:                 needleMapKind,
		FixJpgOrientation:             fixJpgOrientation,
		ReadMode:                      readMode,
		grpcDialOption:                security.LoadClientTLS(util.GetViper(), "grpc.volume"),
		compactionBytePerSecond:       int64(compactionMBPerSecond) * 1024 * 1024,
		maintenanceBytePerSecond:      int64(maintenanceMBPerSecond) * 1024 * 1024,
		fileSizeLimitBytes:            int64(fileSizeLimitMB) * 1024 * 1024,
		isHeartbeating:                true,
		stopChan:                      make(chan bool),
		inFlightUploadDataLimitCond:   sync.NewCond(new(sync.Mutex)),
		inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
		concurrentUploadLimit:         concurrentUploadLimit,
		concurrentDownloadLimit:       concurrentDownloadLimit,
		inflightUploadDataTimeout:     inflightUploadDataTimeout,
		inflightDownloadDataTimeout:   inflightDownloadDataTimeout,
		hasSlowRead:                   hasSlowRead,
		readBufferSizeMB:              readBufferSizeMB,
		ldbTimout:                     ldbTimeout,
		whiteList:                     baseWhiteList,
		AllowUntrustedRemoteEndpoints: allowUntrustedRemoteEndpoints,
	}

	// The guard sees the configured list plus guard.white_list. Build it in
	// a fresh array: appending to baseWhiteList (or to the caller's slice)
	// could write into spare capacity that vs.whiteList shares.
	guardWhiteList := append(baseWhiteList[:0:0], baseWhiteList...)
	guardWhiteList = append(guardWhiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)

	// Copy the caller's slice so subsequent external mutation cannot desync
	// SeedMasterNodes from the frozen lookup set built below.
	seedMasters := make([]pb.ServerAddress, len(masterNodes))
	copy(seedMasters, masterNodes)
	vs.SeedMasterNodes = seedMasters
	vs.seedMasterSet = make(map[string]struct{}, len(seedMasters))
	for _, m := range seedMasters {
		vs.seedMasterSet[m.ToHttpAddress()] = struct{}{}
	}

	vs.checkWithMaster()

	vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, id, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, diskTags, ldbTimeout)
	vs.guard = security.NewGuard(guardWhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)

	handleStaticResources(adminMux)
	adminMux.HandleFunc("/status", requestIDMiddleware(vs.statusHandler))
	adminMux.HandleFunc("/healthz", requestIDMiddleware(vs.healthzHandler))
	if signingKey == "" || enableUiAccess {
		// only expose the volume server details for safe environments
		adminMux.HandleFunc("/ui/index.html", requestIDMiddleware(vs.uiStatusHandler))
		/*
			adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
			adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
			adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
		*/
	}
	adminMux.HandleFunc("/", requestIDMiddleware(vs.privateStoreHandler))
	if publicMux != adminMux {
		// separated admin and public port
		handleStaticResources(publicMux)
		publicMux.HandleFunc("/", requestIDMiddleware(vs.publicReadOnlyHandler))
	}

	stats.VolumeServerConcurrentDownloadLimit.Set(float64(vs.concurrentDownloadLimit))
	stats.VolumeServerConcurrentUploadLimit.Set(float64(vs.concurrentUploadLimit))
	stats.VolumeServerStartTimeSeconds.Set(float64(time.Now().Unix()))

	go vs.heartbeat()
	go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)

	return vs
}
// SetStopping logs that the server is stopping and flags the underlying
// store accordingly. It does not close the store; Shutdown does that.
func (vs *VolumeServer) SetStopping() {
	glog.V(0).Infoln("Stopping volume server...")
	s := vs.store
	s.SetStopping()
}
// LoadNewVolumes logs the request and delegates to the store's
// LoadNewVolumes (presumably picking up volumes added on disk since the
// last load — see storage.Store).
func (vs *VolumeServer) LoadNewVolumes() {
	glog.V(0).Infoln(" Loading new volume ids ...")
	s := vs.store
	s.LoadNewVolumes()
}
// Shutdown closes the underlying store, logging before and after the close.
func (vs *VolumeServer) Shutdown() {
	glog.V(0).Infoln("Shutting down volume server...")
	s := vs.store
	s.Close()
	glog.V(0).Infoln("Shut down successfully!")
}
// Reload reloads the "security" configuration and pushes a refreshed
// white list to the guard. The new list is the white list given at
// construction (vs.whiteList) followed by the current guard.white_list
// config entries.
func (vs *VolumeServer) Reload() {
	glog.V(0).Infoln("Reload volume server...")
	util.LoadConfiguration("security", false)
	v := util.GetViper()
	// Cap the base slice at its length so append always allocates a fresh
	// array. A plain append(vs.whiteList, ...) would write in place whenever
	// vs.whiteList has spare capacity, mutating memory that may be shared
	// with the slice originally passed to NewVolumeServer.
	n := len(vs.whiteList)
	merged := append(vs.whiteList[:n:n], util.StringSplit(v.GetString("guard.white_list"), ",")...)
	vs.guard.UpdateWhiteList(merged)
}
// MaintenanceMode reports whether the volume server is in maintenance
// (i.e. read-only) mode. It returns false when the store has not been
// created yet.
func (vs *VolumeServer) MaintenanceMode() bool {
	if s := vs.store; s != nil {
		return s.State.Proto().GetMaintenance()
	}
	return false
}
// CheckMaintenanceMode returns an error naming this volume server when it
// is in maintenance mode, and nil otherwise.
func (vs *VolumeServer) CheckMaintenanceMode() error {
	if vs.MaintenanceMode() {
		return fmt.Errorf("volume server %s is in maintenance mode", vs.store.Id)
	}
	return nil
}