cluster: restrict Ping RPC to known peers of the requested type (#9445)

Ping previously dialled whatever host:port the caller asked for. Gate
each server's Ping handler on cluster membership: masters check the
topology, registered cluster nodes, and configured master peers; volume
servers only accept their seed/current masters; filers accept tracked
peer filers, the master-learned volume server set, and configured
masters.

Use address-indexed peer lookups to keep Ping target validation O(1):
- topology maintains a pb.ServerAddress -> *DataNode index alongside
  the dc/rack/node tree, kept in sync from doLinkChildNode and
  UnlinkChildNode plus the ip/port-rewrite branch in
  GetOrCreateDataNode. GetTopology now returns nil on a detached
  subtree instead of panicking, so the linkage hooks can no-op safely.
- vid_map tracks a refcount per volume-server address so
  hasVolumeServer answers without scanning every vid location. The
  add path skips empty-address entries the same way the delete path
  already does, so a zero-value Location cannot leak a permanent
  serverRefCount[""] bucket.
- masters reuse a cached master-address set from MasterClient instead
  of walking the configured peer slice on every request.
- volume servers compare against a pre-built seed-master set and
  protect currentMaster reads/writes with an RWMutex, fixing the
  data race with the heartbeat goroutine. The seed slice is copied
  on construction so external mutation cannot desync it from the
  frozen lookup set.
- cluster.check drops the direct volume-to-volume sweep; volume
  servers no longer carry a peer-volume list, and the note next to
  the dropped probe is reworded to make clear that direct
  volume-to-volume reachability is intentionally not validated by
  this command.

Update the volume-server integration tests that drove Ping through the
new admission gate: success-path coverage now targets the master peer
(the only type a volume server tracks), and the unknown/unreachable
path asserts the InvalidArgument the gate now returns instead of the
old downstream dial error.

Mirror the same admission gate in the Rust volume server crate: a
seed-master HashSet built once at startup plus a tokio RwLock over the
heartbeat-tracked current master, both consulted in is_known_ping_target
on every Ping, with InvalidArgument returned for any target that isn't
a recognised master.
This commit is contained in:
Chris Lu
2026-05-12 13:00:52 -07:00
committed by GitHub
parent 5004b4e542
commit 10cc06333b
26 changed files with 935 additions and 64 deletions

View File

@@ -288,6 +288,10 @@ async fn run(
config.jwt_read_signing_expires_seconds,
);
let master_url = config.masters.first().cloned().unwrap_or_default();
// Defensive-copy the configured seed masters before freezing the lookup
// set, so any later mutation of config.masters cannot desync them.
let master_urls: Vec<String> = config.masters.clone();
let seed_master_set = VolumeServerState::build_seed_master_set(&master_urls);
let self_url = format!("{}:{}", config.ip, config.port);
let (http_client, outgoing_http_scheme) = build_outgoing_http_client(&config)?;
let outgoing_grpc_tls = load_outgoing_grpc_tls(&config)?;
@@ -324,7 +328,9 @@ async fn run(
),
read_mode: config.read_mode,
master_url,
master_urls: config.masters.clone(),
master_urls,
seed_master_set,
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url,
http_client,
outgoing_http_scheme,

View File

@@ -3898,6 +3898,24 @@ impl VolumeServer for VolumeGrpcService {
let start = now_ns();
// Empty target is a self-liveness probe and stays unauthenticated.
// Otherwise gate the dial on cluster membership: volume servers only
// know masters, so any other target type is refused. Mirrors Go's
// volume_grpc_admin.go Ping admission check. tonic forbids returning
// a body alongside an error, so we surface the InvalidArgument status
// alone — behaviour-identical to Go's status.Errorf return.
if !req.target.is_empty()
&& !self
.state
.is_known_ping_target(&req.target, &req.target_type)
.await
{
return Err(Status::invalid_argument(format!(
"unknown ping target {} of type {}",
req.target, req.target_type
)));
}
// Route ping based on target type (matches Go's volume_grpc_admin.go Ping)
let remote_time_ns = if req.target_type == "volumeServer" {
match ping_volume_server_target(&req.target, self.state.outgoing_grpc_tls.as_ref())
@@ -4560,6 +4578,8 @@ mod tests {
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
@@ -4662,6 +4682,8 @@ mod tests {
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
@@ -4710,6 +4732,206 @@ mod tests {
.remove("s3.incr_copy_test");
}
/// Build a bare-bones service with no on-disk store but a configurable
/// seed master set, for Ping admission tests. Matches the structure of
/// `make_local_service_with_volume` minus the volume bits.
fn make_service_with_seed_masters(seeds: &[&str]) -> (VolumeGrpcService, TempDir) {
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut store = Store::new(NeedleMapKind::InMemory);
store
.add_location(
dir,
dir,
10,
DiskType::HardDrive,
MinFreeSpace::Percent(1.0),
Vec::new(),
)
.unwrap();
let master_urls: Vec<String> = seeds.iter().map(|s| (*s).to_string()).collect();
let seed_master_set =
crate::server::volume_server::VolumeServerState::build_seed_master_set(&master_urls);
let state = Arc::new(VolumeServerState {
store: RwLock::new(store),
guard: RwLock::new(Guard::new(
&[],
SigningKey(vec![]),
0,
SigningKey(vec![]),
0,
)),
is_stopping: RwLock::new(false),
maintenance: std::sync::atomic::AtomicBool::new(false),
state_version: std::sync::atomic::AtomicU32::new(0),
concurrent_upload_limit: 0,
concurrent_download_limit: 0,
inflight_upload_data_timeout: std::time::Duration::from_secs(60),
inflight_download_data_timeout: std::time::Duration::from_secs(60),
inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0),
inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
upload_notify: tokio::sync::Notify::new(),
download_notify: tokio::sync::Notify::new(),
data_center: String::new(),
rack: String::new(),
file_size_limit_bytes: 0,
maintenance_byte_per_second: 0,
is_heartbeating: std::sync::atomic::AtomicBool::new(true),
has_master: false,
pre_stop_seconds: 0,
volume_state_notify: tokio::sync::Notify::new(),
write_queue: std::sync::OnceLock::new(),
s3_tier_registry: std::sync::RwLock::new(
crate::remote_storage::s3_tier::S3TierRegistry::new(),
),
read_mode: crate::config::ReadMode::Local,
master_url: master_urls.first().cloned().unwrap_or_default(),
master_urls,
seed_master_set,
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),
outgoing_grpc_tls: None,
metrics_runtime: std::sync::RwLock::new(
crate::server::volume_server::RuntimeMetricsConfig::default(),
),
metrics_notify: tokio::sync::Notify::new(),
fix_jpg_orientation: false,
has_slow_read: false,
read_buffer_size_bytes: 1024 * 1024,
security_file: String::new(),
cli_white_list: vec![],
state_file_path: String::new(),
});
(VolumeGrpcService { state }, tmp)
}
#[tokio::test]
async fn test_ping_empty_target_is_self_probe() {
// Empty target stays unauthenticated and returns Ok with timing fields
// populated — it is the local liveness probe path.
let (service, _tmp) = make_service_with_seed_masters(&[]);
let response = service
.ping(Request::new(volume_server_pb::PingRequest {
target: String::new(),
target_type: String::new(),
}))
.await
.expect("empty target ping must succeed");
let inner = response.into_inner();
assert!(inner.start_time_ns > 0);
assert!(inner.stop_time_ns >= inner.start_time_ns);
}
#[tokio::test]
async fn test_ping_seed_master_target_passes_admission() {
// A target that matches a configured seed master clears admission.
// The dial itself may or may not succeed depending on what's listening
// on the loopback; either way, the response must not be the
// InvalidArgument the gate would surface.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
let result = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:9333".to_string(),
target_type: "master".to_string(),
}))
.await;
if let Err(err) = result {
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
assert!(
!err.message().contains("unknown ping target"),
"admission gate should have allowed this target: {}",
err.message()
);
}
}
#[tokio::test]
async fn test_ping_unknown_master_target_rejected() {
// A master-type target not in the seed list and not the current
// master is refused with InvalidArgument.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
let err = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:9999".to_string(),
target_type: "master".to_string(),
}))
.await
.expect_err("unknown master target must be rejected");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert_eq!(
err.message(),
"unknown ping target localhost:9999 of type master"
);
}
#[tokio::test]
async fn test_ping_volume_server_target_always_rejected() {
// Volume servers do not maintain a peer-volume list, so volumeServer
// pings are refused regardless of address.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
let err = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:8080".to_string(),
target_type: "volumeServer".to_string(),
}))
.await
.expect_err("volumeServer target must be rejected");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert_eq!(
err.message(),
"unknown ping target localhost:8080 of type volumeServer"
);
}
#[tokio::test]
async fn test_ping_current_master_target_passes_admission() {
// A target that matches the current (post-leader-change) master also
// clears admission, even if it is not in the seed list. Pick a port
// that is extremely unlikely to be live so the test does not flake on
// a developer machine that happens to be running a real master.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
// Simulate the heartbeat goroutine having moved to a new leader.
*service.state.current_master_url.write().await = "127.0.0.1:1".to_string();
let result = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "127.0.0.1:1".to_string(),
target_type: "master".to_string(),
}))
.await;
if let Err(err) = result {
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
assert!(
!err.message().contains("unknown ping target"),
"leader-change master should be admitted: {}",
err.message()
);
}
}
#[tokio::test]
async fn test_ping_target_with_grpc_port_suffix_is_normalised() {
// Seed masters in pb.ServerAddress form (`host:port.grpcPort`) must
// match a Ping target sent in plain `host:port` form, since the gate
// normalises both sides through to_http_address.
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333.19333"]);
let result = service
.ping(Request::new(volume_server_pb::PingRequest {
target: "localhost:9333".to_string(),
target_type: "master".to_string(),
}))
.await;
if let Err(err) = result {
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
}
}
#[tokio::test]
async fn test_volume_ec_shards_generate_persists_expire_at_sec() {
let ttl = crate::storage::needle::ttl::TTL::read("3m").unwrap();

View File

@@ -397,6 +397,15 @@ async fn do_heartbeat(
let mut response_stream = client.send_heartbeat(stream).await?.into_inner();
info!("Heartbeat stream established with {}", grpc_addr);
// Publish the master we're now talking to in canonical http host:port
// form so Ping admission can recognise it once a leader change moves us
// off the seed list. Mirrors Go's vs.setCurrentMaster(masterAddress).
{
let normalised =
super::volume_server::to_http_address(current_master).into_owned();
let mut guard = state.current_master_url.write().await;
*guard = normalised;
}
if is_stopping(state) {
state.is_heartbeating.store(false, Ordering::Relaxed);
send_deregister_heartbeat(config, state, &tx).await;
@@ -1033,6 +1042,8 @@ mod tests {
read_mode: ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),

View File

@@ -81,6 +81,16 @@ pub struct VolumeServerState {
pub master_url: String,
/// Seed master addresses for UI rendering.
pub master_urls: Vec<String>,
/// Canonical http `host:port` form of every configured seed master.
/// Built once at construction so Ping admission stays O(1). Mirrors
/// Go's `seedMasterSet` on `VolumeServer`.
pub seed_master_set: std::collections::HashSet<String>,
/// Current master this server is heartbeating with, in canonical http
/// `host:port` form. Empty when no heartbeat connection is active. The
/// heartbeat goroutine writes; admission reads — the lock keeps them
/// from racing on a leader change. Mirrors Go's `currentMaster` plus
/// `currentMasterLock`.
pub current_master_url: tokio::sync::RwLock<String>,
/// This server's own address (ip:port) for filtering self from lookup results.
pub self_url: String,
/// HTTP client for proxy requests and master lookups.
@@ -117,6 +127,35 @@ impl VolumeServerState {
}
Ok(())
}
/// Build the seed master set from a list of raw `host:port[.grpcPort]`
/// addresses, normalised the same way Go's `pb.ServerAddress.ToHttpAddress`
/// does (drop the `.grpcPort` suffix, preserve everything else).
pub fn build_seed_master_set(master_urls: &[String]) -> std::collections::HashSet<String> {
master_urls
.iter()
.map(|m| to_http_address(m).into_owned())
.collect()
}
/// Returns true iff `target` (normalised to canonical http `host:port`)
/// is a master this server already knows about. Volume servers do not
/// keep a peer-volume or peer-filer list, so Ping is scoped to masters.
/// Mirrors Go's `VolumeServer.isKnownPingTarget`.
pub async fn is_known_ping_target(&self, target: &str, target_type: &str) -> bool {
if target_type != "master" {
return false;
}
let key = to_http_address(target).into_owned();
if key.is_empty() {
return false;
}
let current = self.current_master_url.read().await.clone();
if !current.is_empty() && current == key {
return true;
}
self.seed_master_set.contains(&key)
}
}
pub fn build_metrics_router() -> Router {

View File

@@ -207,6 +207,8 @@ mod tests {
read_mode: crate::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),

View File

@@ -101,6 +101,8 @@ fn test_state_with_guard(
read_mode: seaweed_volume::config::ReadMode::Local,
master_url: String::new(),
master_urls: Vec::new(),
seed_master_set: std::collections::HashSet::new(),
current_master_url: tokio::sync::RwLock::new(String::new()),
self_url: String::new(),
http_client: reqwest::Client::new(),
outgoing_http_scheme: "http".to_string(),

View File

@@ -8,6 +8,9 @@ import (
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -321,15 +324,18 @@ func TestPingVolumeTargetAndLeaveAffectsHealthz(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Volume servers only track their masters as ping peers, so drive the
// success path through the master target and rely on the master->volume
// admission for cross-type coverage.
pingResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
TargetType: cluster.VolumeServerType,
Target: clusterHarness.VolumeServerAddress(),
TargetType: cluster.MasterType,
Target: clusterHarness.MasterAddress(),
})
if err != nil {
t.Fatalf("Ping target volume server failed: %v", err)
t.Fatalf("Ping master from volume server failed: %v", err)
}
if pingResp.GetRemoteTimeNs() == 0 {
t.Fatalf("expected remote timestamp from ping target volume server")
t.Fatalf("expected remote timestamp from ping master")
}
if _, err = grpcClient.VolumeServerLeave(ctx, &volume_server_pb.VolumeServerLeaveRequest{}); err != nil {
@@ -399,20 +405,21 @@ func TestPingUnknownAndUnreachableTargetPaths(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
unknownResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
// The volume server gates Ping on a known peer list. Unknown target types
// and addresses outside that list are rejected with InvalidArgument
// before any outbound dial is attempted.
_, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
TargetType: "unknown-type",
Target: "127.0.0.1:12345",
})
if err != nil {
t.Fatalf("Ping unknown target type should not return grpc error, got: %v", err)
if err == nil {
t.Fatalf("Ping unknown target type should be rejected by admission")
}
if unknownResp.GetRemoteTimeNs() != 0 {
t.Fatalf("Ping unknown target type expected remote_time_ns=0, got %d", unknownResp.GetRemoteTimeNs())
}
if unknownResp.GetStopTimeNs() < unknownResp.GetStartTimeNs() {
t.Fatalf("Ping unknown target type expected stop_time_ns >= start_time_ns")
if got := status.Code(err); got != codes.InvalidArgument {
t.Fatalf("Ping unknown target type expected InvalidArgument, got %s: %v", got, err)
}
// Empty target stays as an unauthenticated self-liveness probe.
emptyTargetResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{})
if err != nil {
t.Fatalf("Ping empty target should not return grpc error, got: %v", err)
@@ -424,26 +431,30 @@ func TestPingUnknownAndUnreachableTargetPaths(t *testing.T) {
t.Fatalf("Ping empty target expected stop_time_ns >= start_time_ns")
}
// 127.0.0.1:1 is not in the seed/current master set, so admission rejects
// the call before it can reach the network.
_, err = grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
TargetType: cluster.MasterType,
Target: "127.0.0.1:1",
})
if err == nil {
t.Fatalf("Ping master target should fail when target is unreachable")
t.Fatalf("Ping master target should be rejected when not in the known-peer set")
}
if !strings.Contains(err.Error(), "ping master") {
t.Fatalf("Ping master unreachable error mismatch: %v", err)
if got := status.Code(err); got != codes.InvalidArgument {
t.Fatalf("Ping unknown master expected InvalidArgument, got %s: %v", got, err)
}
// Volume servers do not carry a peer-filer list at all; any filer target
// is rejected at admission regardless of reachability.
_, err = grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
TargetType: cluster.FilerType,
Target: "127.0.0.1:1",
})
if err == nil {
t.Fatalf("Ping filer target should fail when target is unreachable")
t.Fatalf("Ping filer target should be rejected by admission")
}
if !strings.Contains(err.Error(), "ping filer") {
t.Fatalf("Ping filer unreachable error mismatch: %v", err)
if got := status.Code(err); got != codes.InvalidArgument {
t.Fatalf("Ping filer expected InvalidArgument, got %s: %v", got, err)
}
}
@@ -479,6 +490,10 @@ func TestPingFilerTargetSuccess(t *testing.T) {
t.Skip("skipping integration test in short mode")
}
// Volume servers do not maintain a peer-filer index, so drive the
// filer-presence success path through the volume server's master peer:
// reaching that master in a filer-bearing cluster is sufficient signal
// that filer joined the cluster without breaking volume-server ping.
clusterHarness := framework.StartSingleVolumeClusterWithFiler(t, matrix.P1())
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
@@ -487,16 +502,16 @@ func TestPingFilerTargetSuccess(t *testing.T) {
defer cancel()
resp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
TargetType: cluster.FilerType,
Target: clusterHarness.FilerServerAddress(),
TargetType: cluster.MasterType,
Target: clusterHarness.MasterAddress(),
})
if err != nil {
t.Fatalf("Ping filer target success path failed: %v", err)
t.Fatalf("Ping master from filer-bearing cluster failed: %v", err)
}
if resp.GetRemoteTimeNs() == 0 {
t.Fatalf("Ping filer target expected non-zero remote time")
t.Fatalf("Ping master expected non-zero remote time")
}
if resp.GetStopTimeNs() < resp.GetStartTimeNs() {
t.Fatalf("Ping filer target expected stop >= start, got start=%d stop=%d", resp.GetStartTimeNs(), resp.GetStopTimeNs())
t.Fatalf("Ping master expected stop >= start, got start=%d stop=%d", resp.GetStartTimeNs(), resp.GetStopTimeNs())
}
}

View File

@@ -138,6 +138,38 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType stri
return
}
// IsKnownNode reports whether address is currently registered under nodeType
// in any filer group. The lookup is intentionally group-agnostic because callers
// (e.g. Ping admission) only know the target address, not the group it joined.
func (cluster *Cluster) IsKnownNode(nodeType string, address pb.ServerAddress) bool {
var groups *ClusterNodeGroups
switch nodeType {
case FilerType:
groups = cluster.filerGroups
case BrokerType:
groups = cluster.brokerGroups
case S3Type:
groups = cluster.s3Groups
default:
return false
}
groups.RLock()
defer groups.RUnlock()
for _, members := range groups.groupMembers {
if _, found := members.members[address]; found {
return true
}
// fall back to a port-tolerant comparison so callers that omit the
// grpc-port suffix still match a registered peer
for stored := range members.members {
if stored.Equals(address) {
return true
}
}
}
return false
}
func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
result = append(result, &master_pb.KeepConnectedResponse{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{

View File

@@ -39,3 +39,28 @@ func TestConcurrentAddRemoveNodes(t *testing.T) {
}
wg.Wait()
}
func TestIsKnownNode(t *testing.T) {
c := NewCluster()
filer := pb.ServerAddress("10.0.0.20:8888")
c.AddClusterNode("", FilerType, "dc1", "rack1", filer, "test")
if !c.IsKnownNode(FilerType, filer) {
t.Fatalf("registered filer %s should be known", filer)
}
if c.IsKnownNode(VolumeServerType, filer) {
t.Fatalf("filer address must not be accepted as a volume server target")
}
if c.IsKnownNode(FilerType, pb.ServerAddress("127.0.0.1:1")) {
t.Fatalf("unregistered low-port target must be rejected")
}
if c.IsKnownNode(FilerType, pb.ServerAddress("127.0.0.1:65000")) {
t.Fatalf("unregistered high-port target must be rejected")
}
if c.IsKnownNode(FilerType, pb.ServerAddress("example.com:443")) {
t.Fatalf("unrelated host must be rejected")
}
if c.IsKnownNode("garbage", filer) {
t.Fatalf("unknown node type must be rejected")
}
}

View File

@@ -87,6 +87,23 @@ func (ma *MetaAggregator) HasRemotePeers() bool {
return false
}
// HasPeer reports whether address is currently a tracked filer peer (or this
// filer's own address). Callers use this to gate operations on known cluster
// members.
func (ma *MetaAggregator) HasPeer(address pb.ServerAddress) bool {
if address == ma.self || address.Equals(ma.self) {
return true
}
ma.peerChansLock.Lock()
defer ma.peerChansLock.Unlock()
for peer := range ma.peerChans {
if peer == address || peer.Equals(address) {
return true
}
}
return false
}
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
lastTsNs := startFrom.UnixNano()
for {

View File

@@ -5,6 +5,9 @@ import (
"fmt"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -44,10 +47,57 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
}, nil
}
// isKnownPingTarget reports whether target is a peer the filer has learned
// about from its master subscription (other filers, volume servers) or from
// its own master list. Restricting Ping prevents the RPC from being used as
// an arbitrary outbound dialer. All lookups are O(1) so the gate adds no
// noticeable overhead even in large clusters.
func (fs *FilerServer) isKnownPingTarget(ctx context.Context, target string, targetType string) bool {
addr := pb.ServerAddress(target)
switch targetType {
case cluster.FilerType:
if fs.filer != nil && fs.filer.MetaAggregator != nil && fs.filer.MetaAggregator.HasPeer(addr) {
return true
}
return false
case cluster.VolumeServerType:
if fs.filer != nil && fs.filer.MasterClient != nil {
return fs.filer.MasterClient.HasVolumeServer(addr)
}
return false
case cluster.MasterType:
key := addr.ToHttpAddress()
if fs.option != nil && fs.option.Masters != nil {
if _, ok := fs.option.Masters.GetInstancesAsMap()[string(addr)]; ok {
return true
}
// Fall back to a port-tolerant compare for callers that supply
// the http form when masters were registered with grpc suffix.
for _, master := range fs.option.Masters.GetInstances() {
if master.ToHttpAddress() == key {
return true
}
}
}
if fs.filer != nil && fs.filer.MasterClient != nil {
if _, ok := fs.filer.MasterClient.ListMasterSet()[key]; ok {
return true
}
}
return false
}
return false
}
func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) {
resp = &filer_pb.PingResponse{
StartTimeNs: time.Now().UnixNano(),
}
// Empty target is a self-liveness probe and stays unauthenticated.
if req.Target != "" && !fs.isKnownPingTarget(ctx, req.Target, req.TargetType) {
resp.StopTimeNs = time.Now().UnixNano()
return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType)
}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})

View File

@@ -15,6 +15,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
/*
@@ -158,10 +160,42 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re
return resp, nil
}
// isKnownPingTarget reports whether target is a peer that the master has
// learned about as part of cluster membership. Restricting Ping to known
// peers avoids turning the RPC into a generic outbound dialer. The lookups
// are O(1) so the gate stays cheap on clusters with thousands of nodes.
func (ms *MasterServer) isKnownPingTarget(ctx context.Context, target string, targetType string) bool {
addr := pb.ServerAddress(target)
switch targetType {
case cluster.FilerType, cluster.BrokerType, cluster.S3Type:
return ms.Cluster.IsKnownNode(targetType, addr)
case cluster.VolumeServerType:
if ms.Topo == nil {
return false
}
return ms.Topo.LookupDataNodeByAddress(addr) != nil
case cluster.MasterType:
if ms.option != nil && ms.option.Master.Equals(addr) {
return true
}
if ms.MasterClient != nil {
_, ok := ms.MasterClient.ListMasterSet()[addr.ToHttpAddress()]
return ok
}
return false
}
return false
}
func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
resp = &master_pb.PingResponse{
StartTimeNs: time.Now().UnixNano(),
}
// Empty target is a self-liveness probe and stays unauthenticated.
if req.Target != "" && !ms.isKnownPingTarget(ctx, req.Target, req.TargetType) {
resp.StopTimeNs = time.Now().UnixNano()
return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType)
}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})

View File

@@ -0,0 +1,97 @@
package weed_server
import (
"context"
"testing"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/sequence"
"github.com/seaweedfs/seaweedfs/weed/topology"
)
func TestMasterIsKnownPingTarget(t *testing.T) {
topo := topology.NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topology.NewDataCenter("dc1")
topo.LinkChildNode(dc)
rack := topology.NewRack("rack1")
dc.LinkChildNode(rack)
dn := topology.NewDataNode("vol1")
dn.Ip = "10.0.0.10"
dn.Port = 8080
dn.GrpcPort = 18080
rack.LinkChildNode(dn)
c := cluster.NewCluster()
filerAddr := pb.ServerAddress("10.0.0.20:8888")
c.AddClusterNode("", cluster.FilerType, "dc1", "rack1", filerAddr, "test")
ms := &MasterServer{
option: &MasterOption{Master: pb.ServerAddress("10.0.0.1:9333")},
Topo: topo,
Cluster: c,
}
ctx := context.Background()
cases := []struct {
name string
target string
targetType string
want bool
}{
{"known filer", string(filerAddr), cluster.FilerType, true},
{"known volume server", "10.0.0.10:8080.18080", cluster.VolumeServerType, true},
{"known volume server http addr", "10.0.0.10:8080", cluster.VolumeServerType, true},
{"known self master", "10.0.0.1:9333", cluster.MasterType, true},
{"unknown localhost low port", "127.0.0.1:1", cluster.VolumeServerType, false},
{"unknown localhost high port", "127.0.0.1:65000", cluster.FilerType, false},
{"unrelated host", "example.com:443", cluster.MasterType, false},
{"unknown target type", string(filerAddr), "garbage", false},
{"filer address checked as volume server", string(filerAddr), cluster.VolumeServerType, false},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
got := ms.isKnownPingTarget(ctx, tc.target, tc.targetType)
if got != tc.want {
t.Fatalf("isKnownPingTarget(%q,%q) = %v, want %v", tc.target, tc.targetType, got, tc.want)
}
})
}
}
func TestVolumeServerIsKnownPingTarget(t *testing.T) {
seed := pb.ServerAddress("10.0.0.1:9333")
vs := &VolumeServer{
SeedMasterNodes: []pb.ServerAddress{seed},
seedMasterSet: map[string]struct{}{seed.ToHttpAddress(): {}},
}
vs.setCurrentMaster(pb.ServerAddress("10.0.0.2:9333"))
cases := []struct {
name string
target string
targetType string
want bool
}{
{"seed master", string(seed), cluster.MasterType, true},
{"current master", "10.0.0.2:9333", cluster.MasterType, true},
{"other volume server", "10.0.0.5:8080", cluster.VolumeServerType, false},
{"random filer", "10.0.0.6:8888", cluster.FilerType, false},
{"unknown low port", "127.0.0.1:1", cluster.MasterType, false},
{"unknown high port", "127.0.0.1:65000", cluster.MasterType, false},
{"unrelated host", "example.com:443", cluster.MasterType, false},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
got := vs.isKnownPingTarget(tc.target, tc.targetType)
if got != tc.want {
t.Fatalf("isKnownPingTarget(%q,%q) = %v, want %v", tc.target, tc.targetType, got, tc.want)
}
})
}
}

View File

@@ -447,10 +447,37 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
}
// isKnownPingTarget reports whether target is a master this volume server
// already knows about. Volume servers do not maintain a peer-volume or
// peer-filer list, so Ping is scoped to the masters they heartbeat with.
// The current-master read is taken under a lock to avoid racing with the
// heartbeat goroutine that rewrites it on leader changes, and the seed
// list is consulted via a pre-built set so the check stays O(1).
func (vs *VolumeServer) isKnownPingTarget(target string, targetType string) bool {
if targetType != cluster.MasterType {
return false
}
addr := pb.ServerAddress(target)
key := addr.ToHttpAddress()
if key == "" {
return false
}
if current := vs.getCurrentMaster(); current != "" && current.ToHttpAddress() == key {
return true
}
_, ok := vs.seedMasterSet[key]
return ok
}
func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
resp = &volume_server_pb.PingResponse{
StartTimeNs: time.Now().UnixNano(),
}
// Empty target is a self-liveness probe and stays unauthenticated.
if req.Target != "" && !vs.isKnownPingTarget(req.Target, req.TargetType) {
resp.StopTimeNs = time.Now().UnixNano()
return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType)
}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})

View File

@@ -24,9 +24,26 @@ import (
)
func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress {
return vs.getCurrentMaster()
}
// getCurrentMaster returns vs.currentMaster under a read lock so callers
// (e.g. Ping admission) do not race with the heartbeat goroutine that
// rewrites it on leader changes.
func (vs *VolumeServer) getCurrentMaster() pb.ServerAddress {
vs.currentMasterLock.RLock()
defer vs.currentMasterLock.RUnlock()
return vs.currentMaster
}
// setCurrentMaster updates vs.currentMaster under a write lock. The
// heartbeat goroutine calls this whenever it (re)connects to a master.
func (vs *VolumeServer) setCurrentMaster(master pb.ServerAddress) {
vs.currentMasterLock.Lock()
vs.currentMaster = master
vs.currentMasterLock.Unlock()
}
func (vs *VolumeServer) checkWithMaster() (err error) {
for {
for _, master := range vs.SeedMasterNodes {
@@ -126,7 +143,7 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp
return "", err
}
glog.V(0).Infof("Heartbeat to: %v", masterAddress)
vs.currentMaster = masterAddress
vs.setCurrentMaster(masterAddress)
doneChan := make(chan error, 1)
@@ -176,16 +193,19 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp
if volumeOptsChanged {
if vs.store.MaybeAdjustVolumeMax() {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.getCurrentMaster(), err)
return
}
}
}
if in.GetLeader() != "" && !vs.currentMaster.Equals(pb.ServerAddress(in.GetLeader())) {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
newLeader = pb.ServerAddress(in.GetLeader())
doneChan <- nil
return
if in.GetLeader() != "" {
current := vs.getCurrentMaster()
if !current.Equals(pb.ServerAddress(in.GetLeader())) {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), current)
newLeader = pb.ServerAddress(in.GetLeader())
doneChan <- nil
return
}
}
}
}()

View File

@@ -34,9 +34,14 @@ type VolumeServer struct {
readBufferSizeMB int
SeedMasterNodes []pb.ServerAddress
whiteList []string
currentMaster pb.ServerAddress
pulsePeriod time.Duration
// seedMasterSet mirrors SeedMasterNodes keyed by the canonical http
// form. It is computed once in NewVolumeServer so admission paths can
// answer is-this-a-seed-master in O(1).
seedMasterSet map[string]struct{}
whiteList []string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
pulsePeriod time.Duration
dataCenter string
rack string
store *storage.Store
@@ -117,7 +122,15 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
whiteList = append(whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)
vs.SeedMasterNodes = masterNodes
// Copy the caller's slice so subsequent external mutation cannot desync
// SeedMasterNodes from the frozen lookup set built below.
seedMasters := make([]pb.ServerAddress, len(masterNodes))
copy(seedMasters, masterNodes)
vs.SeedMasterNodes = seedMasters
vs.seedMasterSet = make(map[string]struct{}, len(seedMasters))
for _, m := range seedMasters {
vs.seedMasterSet[m.ToHttpAddress()] = struct{}{}
}
vs.checkWithMaster()

View File

@@ -213,28 +213,11 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
}
}
// check between volume servers
for _, sourceVolumeServer := range volumeServers {
for _, targetVolumeServer := range volumeServers {
if sourceVolumeServer == targetVolumeServer {
continue
}
fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
Target: string(targetVolumeServer),
TargetType: cluster.VolumeServerType,
})
if err == nil {
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
}
return err
})
if err != nil {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
// Direct volume-to-volume connectivity is intentionally not validated
// here. Each volume server now restricts Ping to peers it can identify
// (its configured/current masters), so it does not carry a peer-volume
// list to drive a mesh check from. The master->volume and filer->volume
// probes above do not exercise volume-to-volume reachability.
// check between filers, and need to connect to itself
for _, sourceFiler := range filers {

View File

@@ -424,6 +424,13 @@ func (n *NodeImpl) doLinkChildNode(node Node) {
}
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
node.SetParent(n)
// Maintain the topology's address index so Ping admission and other
// callers can resolve a data node from its address in O(1).
if dn, ok := node.GetValue().(*DataNode); ok {
if topo := n.GetTopology(); topo != nil {
topo.registerDataNodeAddress(dn)
}
}
glog.V(0).Infoln(n, "adds child", node.Id())
}
}
@@ -433,6 +440,13 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
defer n.Unlock()
node := n.children[nodeId]
if node != nil {
// Drop the topology address index before clearing the parent pointer
// so GetTopology() can still walk up to the root.
if dn, ok := node.GetValue().(*DataNode); ok {
if topo := n.GetTopology(); topo != nil {
topo.unregisterDataNodeAddress(dn.ServerAddress(), dn)
}
}
node.SetParent(nil)
delete(n.children, node.Id())
for dt, du := range node.GetDiskUsages().negative().usages {
@@ -484,10 +498,13 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHoldUnixTime int64,
}
func (n *NodeImpl) GetTopology() *Topology {
var p Node
p = n
var p Node = n
for p.Parent() != nil {
p = p.Parent()
}
return p.GetValue().(*Topology)
// A detached subtree (no Topology root in scope) must not panic; the
// callers above check the returned value for nil and skip the
// address-index maintenance in that case.
topo, _ := p.GetValue().(*Topology)
return topo
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -60,7 +61,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl
if c, ok := r.children[NodeId(nodeId)]; ok {
dn := c.(*DataNode)
// Log if IP or Port changed (e.g., pod rescheduled in K8s)
if dn.Ip != ip || dn.Port != port {
addrChanged := dn.Ip != ip || dn.Port != port
var oldAddr pb.ServerAddress
if addrChanged {
oldAddr = dn.ServerAddress()
glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port)
}
// Update the IP/Port in case they changed
@@ -69,6 +73,12 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl
dn.GrpcPort = grpcPort
dn.PublicUrl = publicUrl
dn.LastSeen = time.Now().Unix()
if addrChanged {
if topo := r.GetTopology(); topo != nil {
topo.unregisterDataNodeAddress(oldAddr, dn)
topo.registerDataNodeAddress(dn)
}
}
return dn
}
@@ -92,7 +102,8 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl
delete(r.children, oldId)
dn.id = NodeId(nodeId)
r.children[NodeId(nodeId)] = dn
// Update connection info in case they changed
// Update connection info in case they changed; address itself is
// unchanged on legacy transition, so the index entry stays valid.
dn.GrpcPort = grpcPort
dn.PublicUrl = publicUrl
dn.LastSeen = time.Now().Unix()

View File

@@ -73,6 +73,14 @@ type Topology struct {
lastLeaderChangeTime time.Time
hadVolumesAtLeaderChange bool
lastLeaderChangeTimeLock sync.RWMutex
// dataNodeIndex is an address -> *DataNode lookup so callers (e.g. the
// Ping admission gate) do not have to walk every dc/rack/node tier on
// every request. Keys use the canonical http form returned by
// pb.ServerAddress.ToHttpAddress so a target like "1.2.3.4:8080" finds
// the same node whether or not the grpc port suffix is present.
dataNodeIndex map[string]*DataNode
dataNodeIndexLock sync.RWMutex
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -95,10 +103,65 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.chanCrowdedVolumes = make(chan storage.VolumeInfo)
t.Configuration = &Configuration{}
t.dataNodeIndex = make(map[string]*DataNode)
return t
}
// LookupDataNodeByAddress returns the registered DataNode that serves addr,
// or nil if no such node has been observed. Lookup is O(1) and uses the
// canonical http form of the address so callers that pass either
// "host:port" or "host:port.grpc" find the same node.
func (t *Topology) LookupDataNodeByAddress(addr pb.ServerAddress) *DataNode {
if addr == "" {
return nil
}
t.dataNodeIndexLock.RLock()
defer t.dataNodeIndexLock.RUnlock()
if t.dataNodeIndex == nil {
return nil
}
return t.dataNodeIndex[addr.ToHttpAddress()]
}
// registerDataNodeAddress records dn in the address index under its current
// http address. Callers must invoke unregisterDataNodeAddress with the prior
// address whenever a node's Ip or Port changes (e.g. k8s pod reschedule).
func (t *Topology) registerDataNodeAddress(dn *DataNode) {
if dn == nil {
return
}
key := dn.ServerAddress().ToHttpAddress()
if key == "" {
return
}
t.dataNodeIndexLock.Lock()
defer t.dataNodeIndexLock.Unlock()
if t.dataNodeIndex == nil {
t.dataNodeIndex = make(map[string]*DataNode)
}
t.dataNodeIndex[key] = dn
}
// unregisterDataNodeAddress removes the index entry for addr, but only when
// the entry still points at dn. The conditional guard avoids dropping a
// freshly re-registered node whose address happens to alias the one being
// removed (e.g. legacy id transitions or a fast restart).
func (t *Topology) unregisterDataNodeAddress(addr pb.ServerAddress, dn *DataNode) {
if addr == "" {
return
}
key := addr.ToHttpAddress()
if key == "" {
return
}
t.dataNodeIndexLock.Lock()
defer t.dataNodeIndexLock.Unlock()
if existing, ok := t.dataNodeIndex[key]; ok && (dn == nil || existing == dn) {
delete(t.dataNodeIndex, key)
}
}
func (t *Topology) IsChildLocked() (bool, error) {
if t.IsLocked() {
return true, errors.New("topology is locked")

View File

@@ -108,6 +108,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
t.unregisterDataNodeAddress(dn.ServerAddress(), dn)
if dn.Parent() != nil {
dn.Parent().UnlinkChildNode(dn.Id())
}

View File

@@ -3,6 +3,7 @@ package topology
import (
"reflect"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/sequence"
"github.com/seaweedfs/seaweedfs/weed/storage"
@@ -530,3 +531,45 @@ func TestDataNodeIdBasedIdentification(t *testing.T) {
t.Errorf("expected 4 DataNodes, got %d", len(children))
}
}
func TestLookupDataNodeByAddress(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := map[string]uint32{"": 10}
// Brand-new registration must be discoverable by both the http and
// grpc forms of the address.
dn := rack.GetOrCreateDataNode("10.1.2.3", 8080, 18080, "10.1.2.3:8080", "n1", maxVolumeCounts)
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080")); got != dn {
t.Fatalf("lookup by http address: got %v, want %v", got, dn)
}
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080.18080")); got != dn {
t.Fatalf("lookup by grpc-suffix address: got %v, want %v", got, dn)
}
// Unknown addresses must miss.
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("127.0.0.1:1")); got != nil {
t.Fatalf("unknown address must not be found, got %v", got)
}
// Heartbeat from a moved pod (same id, new ip) updates the index in
// place: the old address is dropped and the new one resolves.
dnMoved := rack.GetOrCreateDataNode("10.9.9.9", 8080, 18080, "10.9.9.9:8080", "n1", maxVolumeCounts)
if dnMoved != dn {
t.Fatalf("expected same node instance after move, got different")
}
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080")); got != nil {
t.Fatalf("old address must be unregistered after move, got %v", got)
}
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.9.9.9:8080")); got != dn {
t.Fatalf("new address lookup: got %v, want %v", got, dn)
}
// UnRegisterDataNode evicts the index entry.
topo.UnRegisterDataNode(dn)
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.9.9.9:8080")); got != nil {
t.Fatalf("address must be unregistered after UnRegisterDataNode, got %v", got)
}
}

View File

@@ -439,6 +439,18 @@ func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
return mc.masters.GetInstances()
}
// ListMasterSet returns a set of configured master addresses keyed by their
// canonical http form. Unlike GetMasters this does not wait for a connection,
// so it is safe to call from admission paths that must stay non-blocking.
func (mc *MasterClient) ListMasterSet() map[string]struct{} {
addrs := mc.masters.GetInstances()
set := make(map[string]struct{}, len(addrs))
for _, a := range addrs {
set[a.ToHttpAddress()] = struct{}{}
}
return set
}
// WaitUntilConnected blocks until a master connection is established or ctx is canceled.
// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed.
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {

View File

@@ -36,18 +36,32 @@ type vidMap struct {
sync.RWMutex
vid2Locations map[uint32][]Location
ecVid2Locations map[uint32][]Location
DataCenter string
cache atomic.Pointer[vidMap]
// serverRefCount tracks how many vid locations (regular + EC) currently
// reference each volume server address. Maintaining it incrementally lets
// hasVolumeServer answer in O(1) instead of walking every volume entry.
// Keys are the canonical http form of pb.ServerAddress, so callers that
// pass either "host:port" or "host:port.grpc" find the same entry.
serverRefCount map[string]int
DataCenter string
cache atomic.Pointer[vidMap]
}
func newVidMap(dataCenter string) *vidMap {
return &vidMap{
vid2Locations: make(map[uint32][]Location),
ecVid2Locations: make(map[uint32][]Location),
serverRefCount: make(map[string]int),
DataCenter: dataCenter,
}
}
// locationServerKey returns the index key used by serverRefCount for a
// Location. The key normalises away the optional grpc-port suffix so the
// counter stays consistent with hasVolumeServer's lookup.
func locationServerKey(loc Location) string {
return loc.ServerAddress().ToHttpAddress()
}
func (vc *vidMap) isSameDataCenter(loc *Location) bool {
if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
return false
@@ -152,6 +166,28 @@ func (vc *vidMap) getLocations(vid uint32) (locations []Location, found bool) {
return
}
// hasVolumeServer reports whether any tracked volume (regular or EC) is hosted
// on addr. It walks the cache chain so recently expired maps are still
// considered. Used to gate admission of operations targeting a volume server.
// The lookup is O(1) thanks to serverRefCount; we still consult the cache
// chain to keep covering volume servers that just rolled out of the live map.
func (vc *vidMap) hasVolumeServer(addr pb.ServerAddress) bool {
key := addr.ToHttpAddress()
if key == "" {
return false
}
vc.RLock()
count := vc.serverRefCount[key]
vc.RUnlock()
if count > 0 {
return true
}
if cachedMap := vc.cache.Load(); cachedMap != nil {
return cachedMap.hasVolumeServer(addr)
}
return false
}
func (vc *vidMap) addLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
@@ -161,6 +197,7 @@ func (vc *vidMap) addLocation(vid uint32, location Location) {
locations, found := vc.vid2Locations[vid]
if !found {
vc.vid2Locations[vid] = []Location{location}
vc.incrementServerRef(locationServerKey(location))
return
}
@@ -171,6 +208,7 @@ func (vc *vidMap) addLocation(vid uint32, location Location) {
}
vc.vid2Locations[vid] = append(locations, location)
vc.incrementServerRef(locationServerKey(location))
}
@@ -183,6 +221,7 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) {
locations, found := vc.ecVid2Locations[vid]
if !found {
vc.ecVid2Locations[vid] = []Location{location}
vc.incrementServerRef(locationServerKey(location))
return
}
@@ -193,6 +232,7 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) {
}
vc.ecVid2Locations[vid] = append(locations, location)
vc.incrementServerRef(locationServerKey(location))
}
@@ -214,6 +254,7 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) {
for i, loc := range locations {
if loc.Url == location.Url {
vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
vc.decrementServerRef(locationServerKey(loc))
break
}
}
@@ -237,6 +278,7 @@ func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
for i, loc := range locations {
if loc.Url == location.Url {
vc.ecVid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
vc.decrementServerRef(locationServerKey(loc))
break
}
}
@@ -250,6 +292,38 @@ func (vc *vidMap) deleteVid(vid uint32) {
vc.Lock()
defer vc.Unlock()
for _, loc := range vc.vid2Locations[vid] {
vc.decrementServerRef(locationServerKey(loc))
}
for _, loc := range vc.ecVid2Locations[vid] {
vc.decrementServerRef(locationServerKey(loc))
}
delete(vc.vid2Locations, vid)
delete(vc.ecVid2Locations, vid)
}
// incrementServerRef increases the refcount for key. Empty keys are skipped
// so a zero-value Location (which serialises to "") does not leak a permanent
// bucket that hasVolumeServer and decrementServerRef both ignore. Callers
// must hold vc's write lock.
func (vc *vidMap) incrementServerRef(key string) {
if key == "" {
return
}
vc.serverRefCount[key]++
}
// decrementServerRef decreases the refcount for key and removes the entry
// once it falls to zero. Callers must hold vc's write lock.
func (vc *vidMap) decrementServerRef(key string) {
if key == "" {
return
}
if n, ok := vc.serverRefCount[key]; ok {
if n <= 1 {
delete(vc.serverRefCount, key)
} else {
vc.serverRefCount[key] = n - 1
}
}
}

View File

@@ -117,3 +117,50 @@ func TestConcurrentGetLocations(t *testing.T) {
cancel()
wg.Wait()
}
func TestHasVolumeServer(t *testing.T) {
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
regular := Location{Url: "10.0.0.1:8080", GrpcPort: 18080}
ecOnly := Location{Url: "10.0.0.2:8080", GrpcPort: 18080}
mc.addLocation(7, regular)
mc.addEcLocation(9, ecOnly)
addr := func(u string) pb.ServerAddress { return pb.ServerAddress(u) }
if !mc.HasVolumeServer(addr("10.0.0.1:8080")) {
t.Fatalf("regular volume server must be known by http address")
}
if !mc.HasVolumeServer(addr("10.0.0.1:8080.18080")) {
t.Fatalf("regular volume server must be known by grpc-suffix address")
}
if !mc.HasVolumeServer(addr("10.0.0.2:8080")) {
t.Fatalf("ec-only volume server must be known")
}
if mc.HasVolumeServer(addr("127.0.0.1:1")) {
t.Fatalf("unknown address must not be known")
}
// Adding the same location twice must not double-count:
// deleting once should evict the server.
mc.addLocation(7, regular)
mc.deleteLocation(7, regular)
if mc.HasVolumeServer(addr("10.0.0.1:8080")) {
t.Fatalf("server should be evicted after deleteLocation")
}
// Removing the EC entry must also drop the index entry.
mc.deleteEcLocation(9, ecOnly)
if mc.HasVolumeServer(addr("10.0.0.2:8080")) {
t.Fatalf("server should be evicted after deleteEcLocation")
}
// deleteVid removes every reference held by that vid in one call.
mc.addLocation(11, regular)
mc.addEcLocation(11, regular)
mc.InvalidateCache("11,abc")
if mc.HasVolumeServer(addr("10.0.0.1:8080")) {
t.Fatalf("server should be evicted after InvalidateCache")
}
}

View File

@@ -13,6 +13,7 @@ import (
"golang.org/x/sync/singleflight"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
)
// VolumeLocationProvider is the interface for looking up volume locations
@@ -290,6 +291,13 @@ func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string,
return vc.getStableVidMap().LookupVolumeServerUrl(vid)
}
// HasVolumeServer reports whether addr is currently a known volume server
// (hosts at least one volume or EC shard) in the cached vid map. Used by
// admission paths that must only contact peers learned from the master.
func (vc *vidMapClient) HasVolumeServer(addr pb.ServerAddress) bool {
return vc.getStableVidMap().hasVolumeServer(addr)
}
// GetDataCenter safely retrieves the data center
func (vc *vidMapClient) GetDataCenter() string {
return vc.getStableVidMap().DataCenter