mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-09 18:32:43 +00:00
cluster: restrict Ping RPC to known peers of the requested type (#9445)
Ping previously dialled whatever host:port the caller asked for. Gate each server's Ping handler on cluster membership: masters check the topology, registered cluster nodes, and configured master peers; volume servers only accept their seed/current masters; filers accept tracked peer filers, the master-learned volume server set, and configured masters. Use address-indexed peer lookups to keep Ping target validation O(1): - topology maintains a pb.ServerAddress -> *DataNode index alongside the dc/rack/node tree, kept in sync from doLinkChildNode and UnlinkChildNode plus the ip/port-rewrite branch in GetOrCreateDataNode. GetTopology now returns nil on a detached subtree instead of panicking, so the linkage hooks can no-op safely. - vid_map tracks a refcount per volume-server address so hasVolumeServer answers without scanning every vid location. The add path skips empty-address entries the same way the delete path already does, so a zero-value Location cannot leak a permanent serverRefCount[""] bucket. - masters reuse a cached master-address set from MasterClient instead of walking the configured peer slice on every request. - volume servers compare against a pre-built seed-master set and protect currentMaster reads/writes with an RWMutex, fixing the data race with the heartbeat goroutine. The seed slice is copied on construction so external mutation cannot desync it from the frozen lookup set. - cluster.check drops the direct volume-to-volume sweep; volume servers no longer carry a peer-volume list, and the note next to the dropped probe is reworded to make clear that direct volume-to-volume reachability is intentionally not validated by this command. Update the volume-server integration tests that drove Ping through the new admission gate: success-path coverage now targets the master peer (the only type a volume server tracks), and the unknown/unreachable path asserts the InvalidArgument the gate now returns instead of the old downstream dial error. Mirror the same admission gate in the Rust volume server crate: a seed-master HashSet built once at startup plus a tokio RwLock over the heartbeat-tracked current master, both consulted in is_known_ping_target on every Ping, with InvalidArgument returned for any target that isn't a recognised master.
This commit is contained in:
@@ -288,6 +288,10 @@ async fn run(
|
||||
config.jwt_read_signing_expires_seconds,
|
||||
);
|
||||
let master_url = config.masters.first().cloned().unwrap_or_default();
|
||||
// Defensive-copy the configured seed masters before freezing the lookup
|
||||
// set, so any later mutation of config.masters cannot desync them.
|
||||
let master_urls: Vec<String> = config.masters.clone();
|
||||
let seed_master_set = VolumeServerState::build_seed_master_set(&master_urls);
|
||||
let self_url = format!("{}:{}", config.ip, config.port);
|
||||
let (http_client, outgoing_http_scheme) = build_outgoing_http_client(&config)?;
|
||||
let outgoing_grpc_tls = load_outgoing_grpc_tls(&config)?;
|
||||
@@ -324,7 +328,9 @@ async fn run(
|
||||
),
|
||||
read_mode: config.read_mode,
|
||||
master_url,
|
||||
master_urls: config.masters.clone(),
|
||||
master_urls,
|
||||
seed_master_set,
|
||||
current_master_url: tokio::sync::RwLock::new(String::new()),
|
||||
self_url,
|
||||
http_client,
|
||||
outgoing_http_scheme,
|
||||
|
||||
@@ -3898,6 +3898,24 @@ impl VolumeServer for VolumeGrpcService {
|
||||
|
||||
let start = now_ns();
|
||||
|
||||
// Empty target is a self-liveness probe and stays unauthenticated.
|
||||
// Otherwise gate the dial on cluster membership: volume servers only
|
||||
// know masters, so any other target type is refused. Mirrors Go's
|
||||
// volume_grpc_admin.go Ping admission check. tonic forbids returning
|
||||
// a body alongside an error, so we surface the InvalidArgument status
|
||||
// alone — behaviour-identical to Go's status.Errorf return.
|
||||
if !req.target.is_empty()
|
||||
&& !self
|
||||
.state
|
||||
.is_known_ping_target(&req.target, &req.target_type)
|
||||
.await
|
||||
{
|
||||
return Err(Status::invalid_argument(format!(
|
||||
"unknown ping target {} of type {}",
|
||||
req.target, req.target_type
|
||||
)));
|
||||
}
|
||||
|
||||
// Route ping based on target type (matches Go's volume_grpc_admin.go Ping)
|
||||
let remote_time_ns = if req.target_type == "volumeServer" {
|
||||
match ping_volume_server_target(&req.target, self.state.outgoing_grpc_tls.as_ref())
|
||||
@@ -4560,6 +4578,8 @@ mod tests {
|
||||
read_mode: crate::config::ReadMode::Local,
|
||||
master_url: String::new(),
|
||||
master_urls: Vec::new(),
|
||||
seed_master_set: std::collections::HashSet::new(),
|
||||
current_master_url: tokio::sync::RwLock::new(String::new()),
|
||||
self_url: String::new(),
|
||||
http_client: reqwest::Client::new(),
|
||||
outgoing_http_scheme: "http".to_string(),
|
||||
@@ -4662,6 +4682,8 @@ mod tests {
|
||||
read_mode: crate::config::ReadMode::Local,
|
||||
master_url: String::new(),
|
||||
master_urls: Vec::new(),
|
||||
seed_master_set: std::collections::HashSet::new(),
|
||||
current_master_url: tokio::sync::RwLock::new(String::new()),
|
||||
self_url: String::new(),
|
||||
http_client: reqwest::Client::new(),
|
||||
outgoing_http_scheme: "http".to_string(),
|
||||
@@ -4710,6 +4732,206 @@ mod tests {
|
||||
.remove("s3.incr_copy_test");
|
||||
}
|
||||
|
||||
/// Build a bare-bones service with no on-disk store but a configurable
|
||||
/// seed master set, for Ping admission tests. Matches the structure of
|
||||
/// `make_local_service_with_volume` minus the volume bits.
|
||||
fn make_service_with_seed_masters(seeds: &[&str]) -> (VolumeGrpcService, TempDir) {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let dir = tmp.path().to_str().unwrap();
|
||||
|
||||
let mut store = Store::new(NeedleMapKind::InMemory);
|
||||
store
|
||||
.add_location(
|
||||
dir,
|
||||
dir,
|
||||
10,
|
||||
DiskType::HardDrive,
|
||||
MinFreeSpace::Percent(1.0),
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let master_urls: Vec<String> = seeds.iter().map(|s| (*s).to_string()).collect();
|
||||
let seed_master_set =
|
||||
crate::server::volume_server::VolumeServerState::build_seed_master_set(&master_urls);
|
||||
|
||||
let state = Arc::new(VolumeServerState {
|
||||
store: RwLock::new(store),
|
||||
guard: RwLock::new(Guard::new(
|
||||
&[],
|
||||
SigningKey(vec![]),
|
||||
0,
|
||||
SigningKey(vec![]),
|
||||
0,
|
||||
)),
|
||||
is_stopping: RwLock::new(false),
|
||||
maintenance: std::sync::atomic::AtomicBool::new(false),
|
||||
state_version: std::sync::atomic::AtomicU32::new(0),
|
||||
concurrent_upload_limit: 0,
|
||||
concurrent_download_limit: 0,
|
||||
inflight_upload_data_timeout: std::time::Duration::from_secs(60),
|
||||
inflight_download_data_timeout: std::time::Duration::from_secs(60),
|
||||
inflight_upload_bytes: std::sync::atomic::AtomicI64::new(0),
|
||||
inflight_download_bytes: std::sync::atomic::AtomicI64::new(0),
|
||||
upload_notify: tokio::sync::Notify::new(),
|
||||
download_notify: tokio::sync::Notify::new(),
|
||||
data_center: String::new(),
|
||||
rack: String::new(),
|
||||
file_size_limit_bytes: 0,
|
||||
maintenance_byte_per_second: 0,
|
||||
is_heartbeating: std::sync::atomic::AtomicBool::new(true),
|
||||
has_master: false,
|
||||
pre_stop_seconds: 0,
|
||||
volume_state_notify: tokio::sync::Notify::new(),
|
||||
write_queue: std::sync::OnceLock::new(),
|
||||
s3_tier_registry: std::sync::RwLock::new(
|
||||
crate::remote_storage::s3_tier::S3TierRegistry::new(),
|
||||
),
|
||||
read_mode: crate::config::ReadMode::Local,
|
||||
master_url: master_urls.first().cloned().unwrap_or_default(),
|
||||
master_urls,
|
||||
seed_master_set,
|
||||
current_master_url: tokio::sync::RwLock::new(String::new()),
|
||||
self_url: String::new(),
|
||||
http_client: reqwest::Client::new(),
|
||||
outgoing_http_scheme: "http".to_string(),
|
||||
outgoing_grpc_tls: None,
|
||||
metrics_runtime: std::sync::RwLock::new(
|
||||
crate::server::volume_server::RuntimeMetricsConfig::default(),
|
||||
),
|
||||
metrics_notify: tokio::sync::Notify::new(),
|
||||
fix_jpg_orientation: false,
|
||||
has_slow_read: false,
|
||||
read_buffer_size_bytes: 1024 * 1024,
|
||||
security_file: String::new(),
|
||||
cli_white_list: vec![],
|
||||
state_file_path: String::new(),
|
||||
});
|
||||
|
||||
(VolumeGrpcService { state }, tmp)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping_empty_target_is_self_probe() {
|
||||
// Empty target stays unauthenticated and returns Ok with timing fields
|
||||
// populated — it is the local liveness probe path.
|
||||
let (service, _tmp) = make_service_with_seed_masters(&[]);
|
||||
let response = service
|
||||
.ping(Request::new(volume_server_pb::PingRequest {
|
||||
target: String::new(),
|
||||
target_type: String::new(),
|
||||
}))
|
||||
.await
|
||||
.expect("empty target ping must succeed");
|
||||
let inner = response.into_inner();
|
||||
assert!(inner.start_time_ns > 0);
|
||||
assert!(inner.stop_time_ns >= inner.start_time_ns);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping_seed_master_target_passes_admission() {
|
||||
// A target that matches a configured seed master clears admission.
|
||||
// The dial itself may or may not succeed depending on what's listening
|
||||
// on the loopback; either way, the response must not be the
|
||||
// InvalidArgument the gate would surface.
|
||||
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
|
||||
let result = service
|
||||
.ping(Request::new(volume_server_pb::PingRequest {
|
||||
target: "localhost:9333".to_string(),
|
||||
target_type: "master".to_string(),
|
||||
}))
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
|
||||
assert!(
|
||||
!err.message().contains("unknown ping target"),
|
||||
"admission gate should have allowed this target: {}",
|
||||
err.message()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping_unknown_master_target_rejected() {
|
||||
// A master-type target not in the seed list and not the current
|
||||
// master is refused with InvalidArgument.
|
||||
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
|
||||
let err = service
|
||||
.ping(Request::new(volume_server_pb::PingRequest {
|
||||
target: "localhost:9999".to_string(),
|
||||
target_type: "master".to_string(),
|
||||
}))
|
||||
.await
|
||||
.expect_err("unknown master target must be rejected");
|
||||
assert_eq!(err.code(), tonic::Code::InvalidArgument);
|
||||
assert_eq!(
|
||||
err.message(),
|
||||
"unknown ping target localhost:9999 of type master"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping_volume_server_target_always_rejected() {
|
||||
// Volume servers do not maintain a peer-volume list, so volumeServer
|
||||
// pings are refused regardless of address.
|
||||
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
|
||||
let err = service
|
||||
.ping(Request::new(volume_server_pb::PingRequest {
|
||||
target: "localhost:8080".to_string(),
|
||||
target_type: "volumeServer".to_string(),
|
||||
}))
|
||||
.await
|
||||
.expect_err("volumeServer target must be rejected");
|
||||
assert_eq!(err.code(), tonic::Code::InvalidArgument);
|
||||
assert_eq!(
|
||||
err.message(),
|
||||
"unknown ping target localhost:8080 of type volumeServer"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping_current_master_target_passes_admission() {
|
||||
// A target that matches the current (post-leader-change) master also
|
||||
// clears admission, even if it is not in the seed list. Pick a port
|
||||
// that is extremely unlikely to be live so the test does not flake on
|
||||
// a developer machine that happens to be running a real master.
|
||||
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333"]);
|
||||
// Simulate the heartbeat goroutine having moved to a new leader.
|
||||
*service.state.current_master_url.write().await = "127.0.0.1:1".to_string();
|
||||
|
||||
let result = service
|
||||
.ping(Request::new(volume_server_pb::PingRequest {
|
||||
target: "127.0.0.1:1".to_string(),
|
||||
target_type: "master".to_string(),
|
||||
}))
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
|
||||
assert!(
|
||||
!err.message().contains("unknown ping target"),
|
||||
"leader-change master should be admitted: {}",
|
||||
err.message()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping_target_with_grpc_port_suffix_is_normalised() {
|
||||
// Seed masters in pb.ServerAddress form (`host:port.grpcPort`) must
|
||||
// match a Ping target sent in plain `host:port` form, since the gate
|
||||
// normalises both sides through to_http_address.
|
||||
let (service, _tmp) = make_service_with_seed_masters(&["localhost:9333.19333"]);
|
||||
let result = service
|
||||
.ping(Request::new(volume_server_pb::PingRequest {
|
||||
target: "localhost:9333".to_string(),
|
||||
target_type: "master".to_string(),
|
||||
}))
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
assert_ne!(err.code(), tonic::Code::InvalidArgument, "got {err:?}");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_volume_ec_shards_generate_persists_expire_at_sec() {
|
||||
let ttl = crate::storage::needle::ttl::TTL::read("3m").unwrap();
|
||||
|
||||
@@ -397,6 +397,15 @@ async fn do_heartbeat(
|
||||
let mut response_stream = client.send_heartbeat(stream).await?.into_inner();
|
||||
|
||||
info!("Heartbeat stream established with {}", grpc_addr);
|
||||
// Publish the master we're now talking to in canonical http host:port
|
||||
// form so Ping admission can recognise it once a leader change moves us
|
||||
// off the seed list. Mirrors Go's vs.setCurrentMaster(masterAddress).
|
||||
{
|
||||
let normalised =
|
||||
super::volume_server::to_http_address(current_master).into_owned();
|
||||
let mut guard = state.current_master_url.write().await;
|
||||
*guard = normalised;
|
||||
}
|
||||
if is_stopping(state) {
|
||||
state.is_heartbeating.store(false, Ordering::Relaxed);
|
||||
send_deregister_heartbeat(config, state, &tx).await;
|
||||
@@ -1033,6 +1042,8 @@ mod tests {
|
||||
read_mode: ReadMode::Local,
|
||||
master_url: String::new(),
|
||||
master_urls: Vec::new(),
|
||||
seed_master_set: std::collections::HashSet::new(),
|
||||
current_master_url: tokio::sync::RwLock::new(String::new()),
|
||||
self_url: String::new(),
|
||||
http_client: reqwest::Client::new(),
|
||||
outgoing_http_scheme: "http".to_string(),
|
||||
|
||||
@@ -81,6 +81,16 @@ pub struct VolumeServerState {
|
||||
pub master_url: String,
|
||||
/// Seed master addresses for UI rendering.
|
||||
pub master_urls: Vec<String>,
|
||||
/// Canonical http `host:port` form of every configured seed master.
|
||||
/// Built once at construction so Ping admission stays O(1). Mirrors
|
||||
/// Go's `seedMasterSet` on `VolumeServer`.
|
||||
pub seed_master_set: std::collections::HashSet<String>,
|
||||
/// Current master this server is heartbeating with, in canonical http
|
||||
/// `host:port` form. Empty when no heartbeat connection is active. The
|
||||
/// heartbeat goroutine writes; admission reads — the lock keeps them
|
||||
/// from racing on a leader change. Mirrors Go's `currentMaster` plus
|
||||
/// `currentMasterLock`.
|
||||
pub current_master_url: tokio::sync::RwLock<String>,
|
||||
/// This server's own address (ip:port) for filtering self from lookup results.
|
||||
pub self_url: String,
|
||||
/// HTTP client for proxy requests and master lookups.
|
||||
@@ -117,6 +127,35 @@ impl VolumeServerState {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build the seed master set from a list of raw `host:port[.grpcPort]`
|
||||
/// addresses, normalised the same way Go's `pb.ServerAddress.ToHttpAddress`
|
||||
/// does (drop the `.grpcPort` suffix, preserve everything else).
|
||||
pub fn build_seed_master_set(master_urls: &[String]) -> std::collections::HashSet<String> {
|
||||
master_urls
|
||||
.iter()
|
||||
.map(|m| to_http_address(m).into_owned())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns true iff `target` (normalised to canonical http `host:port`)
|
||||
/// is a master this server already knows about. Volume servers do not
|
||||
/// keep a peer-volume or peer-filer list, so Ping is scoped to masters.
|
||||
/// Mirrors Go's `VolumeServer.isKnownPingTarget`.
|
||||
pub async fn is_known_ping_target(&self, target: &str, target_type: &str) -> bool {
|
||||
if target_type != "master" {
|
||||
return false;
|
||||
}
|
||||
let key = to_http_address(target).into_owned();
|
||||
if key.is_empty() {
|
||||
return false;
|
||||
}
|
||||
let current = self.current_master_url.read().await.clone();
|
||||
if !current.is_empty() && current == key {
|
||||
return true;
|
||||
}
|
||||
self.seed_master_set.contains(&key)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_metrics_router() -> Router {
|
||||
|
||||
@@ -207,6 +207,8 @@ mod tests {
|
||||
read_mode: crate::config::ReadMode::Local,
|
||||
master_url: String::new(),
|
||||
master_urls: Vec::new(),
|
||||
seed_master_set: std::collections::HashSet::new(),
|
||||
current_master_url: tokio::sync::RwLock::new(String::new()),
|
||||
self_url: String::new(),
|
||||
http_client: reqwest::Client::new(),
|
||||
outgoing_http_scheme: "http".to_string(),
|
||||
|
||||
@@ -101,6 +101,8 @@ fn test_state_with_guard(
|
||||
read_mode: seaweed_volume::config::ReadMode::Local,
|
||||
master_url: String::new(),
|
||||
master_urls: Vec::new(),
|
||||
seed_master_set: std::collections::HashSet::new(),
|
||||
current_master_url: tokio::sync::RwLock::new(String::new()),
|
||||
self_url: String::new(),
|
||||
http_client: reqwest::Client::new(),
|
||||
outgoing_http_scheme: "http".to_string(),
|
||||
|
||||
@@ -8,6 +8,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
|
||||
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
@@ -321,15 +324,18 @@ func TestPingVolumeTargetAndLeaveAffectsHealthz(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Volume servers only track their masters as ping peers, so drive the
|
||||
// success path through the master target and rely on the master->volume
|
||||
// admission for cross-type coverage.
|
||||
pingResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
|
||||
TargetType: cluster.VolumeServerType,
|
||||
Target: clusterHarness.VolumeServerAddress(),
|
||||
TargetType: cluster.MasterType,
|
||||
Target: clusterHarness.MasterAddress(),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Ping target volume server failed: %v", err)
|
||||
t.Fatalf("Ping master from volume server failed: %v", err)
|
||||
}
|
||||
if pingResp.GetRemoteTimeNs() == 0 {
|
||||
t.Fatalf("expected remote timestamp from ping target volume server")
|
||||
t.Fatalf("expected remote timestamp from ping master")
|
||||
}
|
||||
|
||||
if _, err = grpcClient.VolumeServerLeave(ctx, &volume_server_pb.VolumeServerLeaveRequest{}); err != nil {
|
||||
@@ -399,20 +405,21 @@ func TestPingUnknownAndUnreachableTargetPaths(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
unknownResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
|
||||
// The volume server gates Ping on a known peer list. Unknown target types
|
||||
// and addresses outside that list are rejected with InvalidArgument
|
||||
// before any outbound dial is attempted.
|
||||
_, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
|
||||
TargetType: "unknown-type",
|
||||
Target: "127.0.0.1:12345",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Ping unknown target type should not return grpc error, got: %v", err)
|
||||
if err == nil {
|
||||
t.Fatalf("Ping unknown target type should be rejected by admission")
|
||||
}
|
||||
if unknownResp.GetRemoteTimeNs() != 0 {
|
||||
t.Fatalf("Ping unknown target type expected remote_time_ns=0, got %d", unknownResp.GetRemoteTimeNs())
|
||||
}
|
||||
if unknownResp.GetStopTimeNs() < unknownResp.GetStartTimeNs() {
|
||||
t.Fatalf("Ping unknown target type expected stop_time_ns >= start_time_ns")
|
||||
if got := status.Code(err); got != codes.InvalidArgument {
|
||||
t.Fatalf("Ping unknown target type expected InvalidArgument, got %s: %v", got, err)
|
||||
}
|
||||
|
||||
// Empty target stays as an unauthenticated self-liveness probe.
|
||||
emptyTargetResp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{})
|
||||
if err != nil {
|
||||
t.Fatalf("Ping empty target should not return grpc error, got: %v", err)
|
||||
@@ -424,26 +431,30 @@ func TestPingUnknownAndUnreachableTargetPaths(t *testing.T) {
|
||||
t.Fatalf("Ping empty target expected stop_time_ns >= start_time_ns")
|
||||
}
|
||||
|
||||
// 127.0.0.1:1 is not in the seed/current master set, so admission rejects
|
||||
// the call before it can reach the network.
|
||||
_, err = grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
|
||||
TargetType: cluster.MasterType,
|
||||
Target: "127.0.0.1:1",
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("Ping master target should fail when target is unreachable")
|
||||
t.Fatalf("Ping master target should be rejected when not in the known-peer set")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "ping master") {
|
||||
t.Fatalf("Ping master unreachable error mismatch: %v", err)
|
||||
if got := status.Code(err); got != codes.InvalidArgument {
|
||||
t.Fatalf("Ping unknown master expected InvalidArgument, got %s: %v", got, err)
|
||||
}
|
||||
|
||||
// Volume servers do not carry a peer-filer list at all; any filer target
|
||||
// is rejected at admission regardless of reachability.
|
||||
_, err = grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
|
||||
TargetType: cluster.FilerType,
|
||||
Target: "127.0.0.1:1",
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("Ping filer target should fail when target is unreachable")
|
||||
t.Fatalf("Ping filer target should be rejected by admission")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "ping filer") {
|
||||
t.Fatalf("Ping filer unreachable error mismatch: %v", err)
|
||||
if got := status.Code(err); got != codes.InvalidArgument {
|
||||
t.Fatalf("Ping filer expected InvalidArgument, got %s: %v", got, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -479,6 +490,10 @@ func TestPingFilerTargetSuccess(t *testing.T) {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
// Volume servers do not maintain a peer-filer index, so drive the
|
||||
// filer-presence success path through the volume server's master peer:
|
||||
// reaching that master in a filer-bearing cluster is sufficient signal
|
||||
// that filer joined the cluster without breaking volume-server ping.
|
||||
clusterHarness := framework.StartSingleVolumeClusterWithFiler(t, matrix.P1())
|
||||
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
||||
defer conn.Close()
|
||||
@@ -487,16 +502,16 @@ func TestPingFilerTargetSuccess(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
resp, err := grpcClient.Ping(ctx, &volume_server_pb.PingRequest{
|
||||
TargetType: cluster.FilerType,
|
||||
Target: clusterHarness.FilerServerAddress(),
|
||||
TargetType: cluster.MasterType,
|
||||
Target: clusterHarness.MasterAddress(),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Ping filer target success path failed: %v", err)
|
||||
t.Fatalf("Ping master from filer-bearing cluster failed: %v", err)
|
||||
}
|
||||
if resp.GetRemoteTimeNs() == 0 {
|
||||
t.Fatalf("Ping filer target expected non-zero remote time")
|
||||
t.Fatalf("Ping master expected non-zero remote time")
|
||||
}
|
||||
if resp.GetStopTimeNs() < resp.GetStartTimeNs() {
|
||||
t.Fatalf("Ping filer target expected stop >= start, got start=%d stop=%d", resp.GetStartTimeNs(), resp.GetStopTimeNs())
|
||||
t.Fatalf("Ping master expected stop >= start, got start=%d stop=%d", resp.GetStartTimeNs(), resp.GetStopTimeNs())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,6 +138,38 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType stri
|
||||
return
|
||||
}
|
||||
|
||||
// IsKnownNode reports whether address is currently registered under nodeType
|
||||
// in any filer group. The lookup is intentionally group-agnostic because callers
|
||||
// (e.g. Ping admission) only know the target address, not the group it joined.
|
||||
func (cluster *Cluster) IsKnownNode(nodeType string, address pb.ServerAddress) bool {
|
||||
var groups *ClusterNodeGroups
|
||||
switch nodeType {
|
||||
case FilerType:
|
||||
groups = cluster.filerGroups
|
||||
case BrokerType:
|
||||
groups = cluster.brokerGroups
|
||||
case S3Type:
|
||||
groups = cluster.s3Groups
|
||||
default:
|
||||
return false
|
||||
}
|
||||
groups.RLock()
|
||||
defer groups.RUnlock()
|
||||
for _, members := range groups.groupMembers {
|
||||
if _, found := members.members[address]; found {
|
||||
return true
|
||||
}
|
||||
// fall back to a port-tolerant comparison so callers that omit the
|
||||
// grpc-port suffix still match a registered peer
|
||||
for stored := range members.members {
|
||||
if stored.Equals(address) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
|
||||
result = append(result, &master_pb.KeepConnectedResponse{
|
||||
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
|
||||
|
||||
@@ -39,3 +39,28 @@ func TestConcurrentAddRemoveNodes(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestIsKnownNode(t *testing.T) {
|
||||
c := NewCluster()
|
||||
filer := pb.ServerAddress("10.0.0.20:8888")
|
||||
c.AddClusterNode("", FilerType, "dc1", "rack1", filer, "test")
|
||||
|
||||
if !c.IsKnownNode(FilerType, filer) {
|
||||
t.Fatalf("registered filer %s should be known", filer)
|
||||
}
|
||||
if c.IsKnownNode(VolumeServerType, filer) {
|
||||
t.Fatalf("filer address must not be accepted as a volume server target")
|
||||
}
|
||||
if c.IsKnownNode(FilerType, pb.ServerAddress("127.0.0.1:1")) {
|
||||
t.Fatalf("unregistered low-port target must be rejected")
|
||||
}
|
||||
if c.IsKnownNode(FilerType, pb.ServerAddress("127.0.0.1:65000")) {
|
||||
t.Fatalf("unregistered high-port target must be rejected")
|
||||
}
|
||||
if c.IsKnownNode(FilerType, pb.ServerAddress("example.com:443")) {
|
||||
t.Fatalf("unrelated host must be rejected")
|
||||
}
|
||||
if c.IsKnownNode("garbage", filer) {
|
||||
t.Fatalf("unknown node type must be rejected")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +87,23 @@ func (ma *MetaAggregator) HasRemotePeers() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// HasPeer reports whether address is currently a tracked filer peer (or this
|
||||
// filer's own address). Callers use this to gate operations on known cluster
|
||||
// members.
|
||||
func (ma *MetaAggregator) HasPeer(address pb.ServerAddress) bool {
|
||||
if address == ma.self || address.Equals(ma.self) {
|
||||
return true
|
||||
}
|
||||
ma.peerChansLock.Lock()
|
||||
defer ma.peerChansLock.Unlock()
|
||||
for peer := range ma.peerChans {
|
||||
if peer == address || peer.Equals(address) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
|
||||
lastTsNs := startFrom.UnixNano()
|
||||
for {
|
||||
|
||||
@@ -5,6 +5,9 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
@@ -44,10 +47,57 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isKnownPingTarget reports whether target is a peer the filer has learned
|
||||
// about from its master subscription (other filers, volume servers) or from
|
||||
// its own master list. Restricting Ping prevents the RPC from being used as
|
||||
// an arbitrary outbound dialer. All lookups are O(1) so the gate adds no
|
||||
// noticeable overhead even in large clusters.
|
||||
func (fs *FilerServer) isKnownPingTarget(ctx context.Context, target string, targetType string) bool {
|
||||
addr := pb.ServerAddress(target)
|
||||
switch targetType {
|
||||
case cluster.FilerType:
|
||||
if fs.filer != nil && fs.filer.MetaAggregator != nil && fs.filer.MetaAggregator.HasPeer(addr) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
case cluster.VolumeServerType:
|
||||
if fs.filer != nil && fs.filer.MasterClient != nil {
|
||||
return fs.filer.MasterClient.HasVolumeServer(addr)
|
||||
}
|
||||
return false
|
||||
case cluster.MasterType:
|
||||
key := addr.ToHttpAddress()
|
||||
if fs.option != nil && fs.option.Masters != nil {
|
||||
if _, ok := fs.option.Masters.GetInstancesAsMap()[string(addr)]; ok {
|
||||
return true
|
||||
}
|
||||
// Fall back to a port-tolerant compare for callers that supply
|
||||
// the http form when masters were registered with grpc suffix.
|
||||
for _, master := range fs.option.Masters.GetInstances() {
|
||||
if master.ToHttpAddress() == key {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
if fs.filer != nil && fs.filer.MasterClient != nil {
|
||||
if _, ok := fs.filer.MasterClient.ListMasterSet()[key]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) {
|
||||
resp = &filer_pb.PingResponse{
|
||||
StartTimeNs: time.Now().UnixNano(),
|
||||
}
|
||||
// Empty target is a self-liveness probe and stays unauthenticated.
|
||||
if req.Target != "" && !fs.isKnownPingTarget(ctx, req.Target, req.TargetType) {
|
||||
resp.StopTimeNs = time.Now().UnixNano()
|
||||
return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType)
|
||||
}
|
||||
if req.TargetType == cluster.FilerType {
|
||||
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
|
||||
@@ -15,6 +15,8 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -158,10 +160,42 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// isKnownPingTarget reports whether target is a peer that the master has
|
||||
// learned about as part of cluster membership. Restricting Ping to known
|
||||
// peers avoids turning the RPC into a generic outbound dialer. The lookups
|
||||
// are O(1) so the gate stays cheap on clusters with thousands of nodes.
|
||||
func (ms *MasterServer) isKnownPingTarget(ctx context.Context, target string, targetType string) bool {
|
||||
addr := pb.ServerAddress(target)
|
||||
switch targetType {
|
||||
case cluster.FilerType, cluster.BrokerType, cluster.S3Type:
|
||||
return ms.Cluster.IsKnownNode(targetType, addr)
|
||||
case cluster.VolumeServerType:
|
||||
if ms.Topo == nil {
|
||||
return false
|
||||
}
|
||||
return ms.Topo.LookupDataNodeByAddress(addr) != nil
|
||||
case cluster.MasterType:
|
||||
if ms.option != nil && ms.option.Master.Equals(addr) {
|
||||
return true
|
||||
}
|
||||
if ms.MasterClient != nil {
|
||||
_, ok := ms.MasterClient.ListMasterSet()[addr.ToHttpAddress()]
|
||||
return ok
|
||||
}
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
|
||||
resp = &master_pb.PingResponse{
|
||||
StartTimeNs: time.Now().UnixNano(),
|
||||
}
|
||||
// Empty target is a self-liveness probe and stays unauthenticated.
|
||||
if req.Target != "" && !ms.isKnownPingTarget(ctx, req.Target, req.TargetType) {
|
||||
resp.StopTimeNs = time.Now().UnixNano()
|
||||
return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType)
|
||||
}
|
||||
if req.TargetType == cluster.FilerType {
|
||||
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
|
||||
97
weed/server/master_grpc_server_admin_ping_test.go
Normal file
97
weed/server/master_grpc_server_admin_ping_test.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/sequence"
|
||||
"github.com/seaweedfs/seaweedfs/weed/topology"
|
||||
)
|
||||
|
||||
func TestMasterIsKnownPingTarget(t *testing.T) {
|
||||
topo := topology.NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
|
||||
dc := topology.NewDataCenter("dc1")
|
||||
topo.LinkChildNode(dc)
|
||||
rack := topology.NewRack("rack1")
|
||||
dc.LinkChildNode(rack)
|
||||
|
||||
dn := topology.NewDataNode("vol1")
|
||||
dn.Ip = "10.0.0.10"
|
||||
dn.Port = 8080
|
||||
dn.GrpcPort = 18080
|
||||
rack.LinkChildNode(dn)
|
||||
|
||||
c := cluster.NewCluster()
|
||||
filerAddr := pb.ServerAddress("10.0.0.20:8888")
|
||||
c.AddClusterNode("", cluster.FilerType, "dc1", "rack1", filerAddr, "test")
|
||||
|
||||
ms := &MasterServer{
|
||||
option: &MasterOption{Master: pb.ServerAddress("10.0.0.1:9333")},
|
||||
Topo: topo,
|
||||
Cluster: c,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
cases := []struct {
|
||||
name string
|
||||
target string
|
||||
targetType string
|
||||
want bool
|
||||
}{
|
||||
{"known filer", string(filerAddr), cluster.FilerType, true},
|
||||
{"known volume server", "10.0.0.10:8080.18080", cluster.VolumeServerType, true},
|
||||
{"known volume server http addr", "10.0.0.10:8080", cluster.VolumeServerType, true},
|
||||
{"known self master", "10.0.0.1:9333", cluster.MasterType, true},
|
||||
{"unknown localhost low port", "127.0.0.1:1", cluster.VolumeServerType, false},
|
||||
{"unknown localhost high port", "127.0.0.1:65000", cluster.FilerType, false},
|
||||
{"unrelated host", "example.com:443", cluster.MasterType, false},
|
||||
{"unknown target type", string(filerAddr), "garbage", false},
|
||||
{"filer address checked as volume server", string(filerAddr), cluster.VolumeServerType, false},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := ms.isKnownPingTarget(ctx, tc.target, tc.targetType)
|
||||
if got != tc.want {
|
||||
t.Fatalf("isKnownPingTarget(%q,%q) = %v, want %v", tc.target, tc.targetType, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestVolumeServerIsKnownPingTarget(t *testing.T) {
|
||||
seed := pb.ServerAddress("10.0.0.1:9333")
|
||||
vs := &VolumeServer{
|
||||
SeedMasterNodes: []pb.ServerAddress{seed},
|
||||
seedMasterSet: map[string]struct{}{seed.ToHttpAddress(): {}},
|
||||
}
|
||||
vs.setCurrentMaster(pb.ServerAddress("10.0.0.2:9333"))
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
target string
|
||||
targetType string
|
||||
want bool
|
||||
}{
|
||||
{"seed master", string(seed), cluster.MasterType, true},
|
||||
{"current master", "10.0.0.2:9333", cluster.MasterType, true},
|
||||
{"other volume server", "10.0.0.5:8080", cluster.VolumeServerType, false},
|
||||
{"random filer", "10.0.0.6:8888", cluster.FilerType, false},
|
||||
{"unknown low port", "127.0.0.1:1", cluster.MasterType, false},
|
||||
{"unknown high port", "127.0.0.1:65000", cluster.MasterType, false},
|
||||
{"unrelated host", "example.com:443", cluster.MasterType, false},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := vs.isKnownPingTarget(tc.target, tc.targetType)
|
||||
if got != tc.want {
|
||||
t.Fatalf("isKnownPingTarget(%q,%q) = %v, want %v", tc.target, tc.targetType, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -447,10 +447,37 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
|
||||
|
||||
}
|
||||
|
||||
// isKnownPingTarget reports whether target is a master this volume server
|
||||
// already knows about. Volume servers do not maintain a peer-volume or
|
||||
// peer-filer list, so Ping is scoped to the masters they heartbeat with.
|
||||
// The current-master read is taken under a lock to avoid racing with the
|
||||
// heartbeat goroutine that rewrites it on leader changes, and the seed
|
||||
// list is consulted via a pre-built set so the check stays O(1).
|
||||
func (vs *VolumeServer) isKnownPingTarget(target string, targetType string) bool {
|
||||
if targetType != cluster.MasterType {
|
||||
return false
|
||||
}
|
||||
addr := pb.ServerAddress(target)
|
||||
key := addr.ToHttpAddress()
|
||||
if key == "" {
|
||||
return false
|
||||
}
|
||||
if current := vs.getCurrentMaster(); current != "" && current.ToHttpAddress() == key {
|
||||
return true
|
||||
}
|
||||
_, ok := vs.seedMasterSet[key]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
|
||||
resp = &volume_server_pb.PingResponse{
|
||||
StartTimeNs: time.Now().UnixNano(),
|
||||
}
|
||||
// Empty target is a self-liveness probe and stays unauthenticated.
|
||||
if req.Target != "" && !vs.isKnownPingTarget(req.Target, req.TargetType) {
|
||||
resp.StopTimeNs = time.Now().UnixNano()
|
||||
return resp, status.Errorf(codes.InvalidArgument, "unknown ping target %s of type %s", req.Target, req.TargetType)
|
||||
}
|
||||
if req.TargetType == cluster.FilerType {
|
||||
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
|
||||
|
||||
@@ -24,9 +24,26 @@ import (
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress {
|
||||
return vs.getCurrentMaster()
|
||||
}
|
||||
|
||||
// getCurrentMaster returns vs.currentMaster under a read lock so callers
|
||||
// (e.g. Ping admission) do not race with the heartbeat goroutine that
|
||||
// rewrites it on leader changes.
|
||||
func (vs *VolumeServer) getCurrentMaster() pb.ServerAddress {
|
||||
vs.currentMasterLock.RLock()
|
||||
defer vs.currentMasterLock.RUnlock()
|
||||
return vs.currentMaster
|
||||
}
|
||||
|
||||
// setCurrentMaster updates vs.currentMaster under a write lock. The
|
||||
// heartbeat goroutine calls this whenever it (re)connects to a master.
|
||||
func (vs *VolumeServer) setCurrentMaster(master pb.ServerAddress) {
|
||||
vs.currentMasterLock.Lock()
|
||||
vs.currentMaster = master
|
||||
vs.currentMasterLock.Unlock()
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) checkWithMaster() (err error) {
|
||||
for {
|
||||
for _, master := range vs.SeedMasterNodes {
|
||||
@@ -126,7 +143,7 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp
|
||||
return "", err
|
||||
}
|
||||
glog.V(0).Infof("Heartbeat to: %v", masterAddress)
|
||||
vs.currentMaster = masterAddress
|
||||
vs.setCurrentMaster(masterAddress)
|
||||
|
||||
doneChan := make(chan error, 1)
|
||||
|
||||
@@ -176,16 +193,19 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp
|
||||
if volumeOptsChanged {
|
||||
if vs.store.MaybeAdjustVolumeMax() {
|
||||
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
|
||||
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
|
||||
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.getCurrentMaster(), err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if in.GetLeader() != "" && !vs.currentMaster.Equals(pb.ServerAddress(in.GetLeader())) {
|
||||
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
|
||||
newLeader = pb.ServerAddress(in.GetLeader())
|
||||
doneChan <- nil
|
||||
return
|
||||
if in.GetLeader() != "" {
|
||||
current := vs.getCurrentMaster()
|
||||
if !current.Equals(pb.ServerAddress(in.GetLeader())) {
|
||||
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), current)
|
||||
newLeader = pb.ServerAddress(in.GetLeader())
|
||||
doneChan <- nil
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -34,9 +34,14 @@ type VolumeServer struct {
|
||||
readBufferSizeMB int
|
||||
|
||||
SeedMasterNodes []pb.ServerAddress
|
||||
whiteList []string
|
||||
currentMaster pb.ServerAddress
|
||||
pulsePeriod time.Duration
|
||||
// seedMasterSet mirrors SeedMasterNodes keyed by the canonical http
|
||||
// form. It is computed once in NewVolumeServer so admission paths can
|
||||
// answer is-this-a-seed-master in O(1).
|
||||
seedMasterSet map[string]struct{}
|
||||
whiteList []string
|
||||
currentMaster pb.ServerAddress
|
||||
currentMasterLock sync.RWMutex
|
||||
pulsePeriod time.Duration
|
||||
dataCenter string
|
||||
rack string
|
||||
store *storage.Store
|
||||
@@ -117,7 +122,15 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
||||
}
|
||||
|
||||
whiteList = append(whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)
|
||||
vs.SeedMasterNodes = masterNodes
|
||||
// Copy the caller's slice so subsequent external mutation cannot desync
|
||||
// SeedMasterNodes from the frozen lookup set built below.
|
||||
seedMasters := make([]pb.ServerAddress, len(masterNodes))
|
||||
copy(seedMasters, masterNodes)
|
||||
vs.SeedMasterNodes = seedMasters
|
||||
vs.seedMasterSet = make(map[string]struct{}, len(seedMasters))
|
||||
for _, m := range seedMasters {
|
||||
vs.seedMasterSet[m.ToHttpAddress()] = struct{}{}
|
||||
}
|
||||
|
||||
vs.checkWithMaster()
|
||||
|
||||
|
||||
@@ -213,28 +213,11 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
|
||||
}
|
||||
}
|
||||
|
||||
// check between volume servers
|
||||
for _, sourceVolumeServer := range volumeServers {
|
||||
for _, targetVolumeServer := range volumeServers {
|
||||
if sourceVolumeServer == targetVolumeServer {
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
|
||||
err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
|
||||
Target: string(targetVolumeServer),
|
||||
TargetType: cluster.VolumeServerType,
|
||||
})
|
||||
if err == nil {
|
||||
printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
|
||||
}
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Fprintf(writer, "%v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Direct volume-to-volume connectivity is intentionally not validated
|
||||
// here. Each volume server now restricts Ping to peers it can identify
|
||||
// (its configured/current masters), so it does not carry a peer-volume
|
||||
// list to drive a mesh check from. The master->volume and filer->volume
|
||||
// probes above do not exercise volume-to-volume reachability.
|
||||
|
||||
// check between filers, and need to connect to itself
|
||||
for _, sourceFiler := range filers {
|
||||
|
||||
@@ -424,6 +424,13 @@ func (n *NodeImpl) doLinkChildNode(node Node) {
|
||||
}
|
||||
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
|
||||
node.SetParent(n)
|
||||
// Maintain the topology's address index so Ping admission and other
|
||||
// callers can resolve a data node from its address in O(1).
|
||||
if dn, ok := node.GetValue().(*DataNode); ok {
|
||||
if topo := n.GetTopology(); topo != nil {
|
||||
topo.registerDataNodeAddress(dn)
|
||||
}
|
||||
}
|
||||
glog.V(0).Infoln(n, "adds child", node.Id())
|
||||
}
|
||||
}
|
||||
@@ -433,6 +440,13 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
|
||||
defer n.Unlock()
|
||||
node := n.children[nodeId]
|
||||
if node != nil {
|
||||
// Drop the topology address index before clearing the parent pointer
|
||||
// so GetTopology() can still walk up to the root.
|
||||
if dn, ok := node.GetValue().(*DataNode); ok {
|
||||
if topo := n.GetTopology(); topo != nil {
|
||||
topo.unregisterDataNodeAddress(dn.ServerAddress(), dn)
|
||||
}
|
||||
}
|
||||
node.SetParent(nil)
|
||||
delete(n.children, node.Id())
|
||||
for dt, du := range node.GetDiskUsages().negative().usages {
|
||||
@@ -484,10 +498,13 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHoldUnixTime int64,
|
||||
}
|
||||
|
||||
func (n *NodeImpl) GetTopology() *Topology {
|
||||
var p Node
|
||||
p = n
|
||||
var p Node = n
|
||||
for p.Parent() != nil {
|
||||
p = p.Parent()
|
||||
}
|
||||
return p.GetValue().(*Topology)
|
||||
// A detached subtree (no Topology root in scope) must not panic; the
|
||||
// callers above check the returned value for nil and skip the
|
||||
// address-index maintenance in that case.
|
||||
topo, _ := p.GetValue().(*Topology)
|
||||
return topo
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
@@ -60,7 +61,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl
|
||||
if c, ok := r.children[NodeId(nodeId)]; ok {
|
||||
dn := c.(*DataNode)
|
||||
// Log if IP or Port changed (e.g., pod rescheduled in K8s)
|
||||
if dn.Ip != ip || dn.Port != port {
|
||||
addrChanged := dn.Ip != ip || dn.Port != port
|
||||
var oldAddr pb.ServerAddress
|
||||
if addrChanged {
|
||||
oldAddr = dn.ServerAddress()
|
||||
glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port)
|
||||
}
|
||||
// Update the IP/Port in case they changed
|
||||
@@ -69,6 +73,12 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl
|
||||
dn.GrpcPort = grpcPort
|
||||
dn.PublicUrl = publicUrl
|
||||
dn.LastSeen = time.Now().Unix()
|
||||
if addrChanged {
|
||||
if topo := r.GetTopology(); topo != nil {
|
||||
topo.unregisterDataNodeAddress(oldAddr, dn)
|
||||
topo.registerDataNodeAddress(dn)
|
||||
}
|
||||
}
|
||||
return dn
|
||||
}
|
||||
|
||||
@@ -92,7 +102,8 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl
|
||||
delete(r.children, oldId)
|
||||
dn.id = NodeId(nodeId)
|
||||
r.children[NodeId(nodeId)] = dn
|
||||
// Update connection info in case they changed
|
||||
// Update connection info in case they changed; address itself is
|
||||
// unchanged on legacy transition, so the index entry stays valid.
|
||||
dn.GrpcPort = grpcPort
|
||||
dn.PublicUrl = publicUrl
|
||||
dn.LastSeen = time.Now().Unix()
|
||||
|
||||
@@ -73,6 +73,14 @@ type Topology struct {
|
||||
lastLeaderChangeTime time.Time
|
||||
hadVolumesAtLeaderChange bool
|
||||
lastLeaderChangeTimeLock sync.RWMutex
|
||||
|
||||
// dataNodeIndex is an address -> *DataNode lookup so callers (e.g. the
|
||||
// Ping admission gate) do not have to walk every dc/rack/node tier on
|
||||
// every request. Keys use the canonical http form returned by
|
||||
// pb.ServerAddress.ToHttpAddress so a target like "1.2.3.4:8080" finds
|
||||
// the same node whether or not the grpc port suffix is present.
|
||||
dataNodeIndex map[string]*DataNode
|
||||
dataNodeIndexLock sync.RWMutex
|
||||
}
|
||||
|
||||
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
|
||||
@@ -95,10 +103,65 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
|
||||
t.chanCrowdedVolumes = make(chan storage.VolumeInfo)
|
||||
|
||||
t.Configuration = &Configuration{}
|
||||
t.dataNodeIndex = make(map[string]*DataNode)
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
// LookupDataNodeByAddress returns the registered DataNode that serves addr,
|
||||
// or nil if no such node has been observed. Lookup is O(1) and uses the
|
||||
// canonical http form of the address so callers that pass either
|
||||
// "host:port" or "host:port.grpc" find the same node.
|
||||
func (t *Topology) LookupDataNodeByAddress(addr pb.ServerAddress) *DataNode {
|
||||
if addr == "" {
|
||||
return nil
|
||||
}
|
||||
t.dataNodeIndexLock.RLock()
|
||||
defer t.dataNodeIndexLock.RUnlock()
|
||||
if t.dataNodeIndex == nil {
|
||||
return nil
|
||||
}
|
||||
return t.dataNodeIndex[addr.ToHttpAddress()]
|
||||
}
|
||||
|
||||
// registerDataNodeAddress records dn in the address index under its current
|
||||
// http address. Callers must invoke unregisterDataNodeAddress with the prior
|
||||
// address whenever a node's Ip or Port changes (e.g. k8s pod reschedule).
|
||||
func (t *Topology) registerDataNodeAddress(dn *DataNode) {
|
||||
if dn == nil {
|
||||
return
|
||||
}
|
||||
key := dn.ServerAddress().ToHttpAddress()
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
t.dataNodeIndexLock.Lock()
|
||||
defer t.dataNodeIndexLock.Unlock()
|
||||
if t.dataNodeIndex == nil {
|
||||
t.dataNodeIndex = make(map[string]*DataNode)
|
||||
}
|
||||
t.dataNodeIndex[key] = dn
|
||||
}
|
||||
|
||||
// unregisterDataNodeAddress removes the index entry for addr, but only when
|
||||
// the entry still points at dn. The conditional guard avoids dropping a
|
||||
// freshly re-registered node whose address happens to alias the one being
|
||||
// removed (e.g. legacy id transitions or a fast restart).
|
||||
func (t *Topology) unregisterDataNodeAddress(addr pb.ServerAddress, dn *DataNode) {
|
||||
if addr == "" {
|
||||
return
|
||||
}
|
||||
key := addr.ToHttpAddress()
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
t.dataNodeIndexLock.Lock()
|
||||
defer t.dataNodeIndexLock.Unlock()
|
||||
if existing, ok := t.dataNodeIndex[key]; ok && (dn == nil || existing == dn) {
|
||||
delete(t.dataNodeIndex, key)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Topology) IsChildLocked() (bool, error) {
|
||||
if t.IsLocked() {
|
||||
return true, errors.New("topology is locked")
|
||||
|
||||
@@ -108,6 +108,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
|
||||
}
|
||||
dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
|
||||
dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
|
||||
t.unregisterDataNodeAddress(dn.ServerAddress(), dn)
|
||||
if dn.Parent() != nil {
|
||||
dn.Parent().UnlinkChildNode(dn.Id())
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package topology
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/sequence"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage"
|
||||
@@ -530,3 +531,45 @@ func TestDataNodeIdBasedIdentification(t *testing.T) {
|
||||
t.Errorf("expected 4 DataNodes, got %d", len(children))
|
||||
}
|
||||
}
|
||||
|
||||
func TestLookupDataNodeByAddress(t *testing.T) {
|
||||
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
|
||||
dc := topo.GetOrCreateDataCenter("dc1")
|
||||
rack := dc.GetOrCreateRack("rack1")
|
||||
|
||||
maxVolumeCounts := map[string]uint32{"": 10}
|
||||
|
||||
// Brand-new registration must be discoverable by both the http and
|
||||
// grpc forms of the address.
|
||||
dn := rack.GetOrCreateDataNode("10.1.2.3", 8080, 18080, "10.1.2.3:8080", "n1", maxVolumeCounts)
|
||||
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080")); got != dn {
|
||||
t.Fatalf("lookup by http address: got %v, want %v", got, dn)
|
||||
}
|
||||
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080.18080")); got != dn {
|
||||
t.Fatalf("lookup by grpc-suffix address: got %v, want %v", got, dn)
|
||||
}
|
||||
|
||||
// Unknown addresses must miss.
|
||||
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("127.0.0.1:1")); got != nil {
|
||||
t.Fatalf("unknown address must not be found, got %v", got)
|
||||
}
|
||||
|
||||
// Heartbeat from a moved pod (same id, new ip) updates the index in
|
||||
// place: the old address is dropped and the new one resolves.
|
||||
dnMoved := rack.GetOrCreateDataNode("10.9.9.9", 8080, 18080, "10.9.9.9:8080", "n1", maxVolumeCounts)
|
||||
if dnMoved != dn {
|
||||
t.Fatalf("expected same node instance after move, got different")
|
||||
}
|
||||
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.1.2.3:8080")); got != nil {
|
||||
t.Fatalf("old address must be unregistered after move, got %v", got)
|
||||
}
|
||||
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.9.9.9:8080")); got != dn {
|
||||
t.Fatalf("new address lookup: got %v, want %v", got, dn)
|
||||
}
|
||||
|
||||
// UnRegisterDataNode evicts the index entry.
|
||||
topo.UnRegisterDataNode(dn)
|
||||
if got := topo.LookupDataNodeByAddress(pb.ServerAddress("10.9.9.9:8080")); got != nil {
|
||||
t.Fatalf("address must be unregistered after UnRegisterDataNode, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -439,6 +439,18 @@ func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
|
||||
return mc.masters.GetInstances()
|
||||
}
|
||||
|
||||
// ListMasterSet returns a set of configured master addresses keyed by their
|
||||
// canonical http form. Unlike GetMasters this does not wait for a connection,
|
||||
// so it is safe to call from admission paths that must stay non-blocking.
|
||||
func (mc *MasterClient) ListMasterSet() map[string]struct{} {
|
||||
addrs := mc.masters.GetInstances()
|
||||
set := make(map[string]struct{}, len(addrs))
|
||||
for _, a := range addrs {
|
||||
set[a.ToHttpAddress()] = struct{}{}
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
||||
// WaitUntilConnected blocks until a master connection is established or ctx is canceled.
|
||||
// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed.
|
||||
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
|
||||
|
||||
@@ -36,18 +36,32 @@ type vidMap struct {
|
||||
sync.RWMutex
|
||||
vid2Locations map[uint32][]Location
|
||||
ecVid2Locations map[uint32][]Location
|
||||
DataCenter string
|
||||
cache atomic.Pointer[vidMap]
|
||||
// serverRefCount tracks how many vid locations (regular + EC) currently
|
||||
// reference each volume server address. Maintaining it incrementally lets
|
||||
// hasVolumeServer answer in O(1) instead of walking every volume entry.
|
||||
// Keys are the canonical http form of pb.ServerAddress, so callers that
|
||||
// pass either "host:port" or "host:port.grpc" find the same entry.
|
||||
serverRefCount map[string]int
|
||||
DataCenter string
|
||||
cache atomic.Pointer[vidMap]
|
||||
}
|
||||
|
||||
func newVidMap(dataCenter string) *vidMap {
|
||||
return &vidMap{
|
||||
vid2Locations: make(map[uint32][]Location),
|
||||
ecVid2Locations: make(map[uint32][]Location),
|
||||
serverRefCount: make(map[string]int),
|
||||
DataCenter: dataCenter,
|
||||
}
|
||||
}
|
||||
|
||||
// locationServerKey returns the index key used by serverRefCount for a
|
||||
// Location. The key normalises away the optional grpc-port suffix so the
|
||||
// counter stays consistent with hasVolumeServer's lookup.
|
||||
func locationServerKey(loc Location) string {
|
||||
return loc.ServerAddress().ToHttpAddress()
|
||||
}
|
||||
|
||||
func (vc *vidMap) isSameDataCenter(loc *Location) bool {
|
||||
if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
|
||||
return false
|
||||
@@ -152,6 +166,28 @@ func (vc *vidMap) getLocations(vid uint32) (locations []Location, found bool) {
|
||||
return
|
||||
}
|
||||
|
||||
// hasVolumeServer reports whether any tracked volume (regular or EC) is hosted
|
||||
// on addr. It walks the cache chain so recently expired maps are still
|
||||
// considered. Used to gate admission of operations targeting a volume server.
|
||||
// The lookup is O(1) thanks to serverRefCount; we still consult the cache
|
||||
// chain to keep covering volume servers that just rolled out of the live map.
|
||||
func (vc *vidMap) hasVolumeServer(addr pb.ServerAddress) bool {
|
||||
key := addr.ToHttpAddress()
|
||||
if key == "" {
|
||||
return false
|
||||
}
|
||||
vc.RLock()
|
||||
count := vc.serverRefCount[key]
|
||||
vc.RUnlock()
|
||||
if count > 0 {
|
||||
return true
|
||||
}
|
||||
if cachedMap := vc.cache.Load(); cachedMap != nil {
|
||||
return cachedMap.hasVolumeServer(addr)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (vc *vidMap) addLocation(vid uint32, location Location) {
|
||||
vc.Lock()
|
||||
defer vc.Unlock()
|
||||
@@ -161,6 +197,7 @@ func (vc *vidMap) addLocation(vid uint32, location Location) {
|
||||
locations, found := vc.vid2Locations[vid]
|
||||
if !found {
|
||||
vc.vid2Locations[vid] = []Location{location}
|
||||
vc.incrementServerRef(locationServerKey(location))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -171,6 +208,7 @@ func (vc *vidMap) addLocation(vid uint32, location Location) {
|
||||
}
|
||||
|
||||
vc.vid2Locations[vid] = append(locations, location)
|
||||
vc.incrementServerRef(locationServerKey(location))
|
||||
|
||||
}
|
||||
|
||||
@@ -183,6 +221,7 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) {
|
||||
locations, found := vc.ecVid2Locations[vid]
|
||||
if !found {
|
||||
vc.ecVid2Locations[vid] = []Location{location}
|
||||
vc.incrementServerRef(locationServerKey(location))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -193,6 +232,7 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) {
|
||||
}
|
||||
|
||||
vc.ecVid2Locations[vid] = append(locations, location)
|
||||
vc.incrementServerRef(locationServerKey(location))
|
||||
|
||||
}
|
||||
|
||||
@@ -214,6 +254,7 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) {
|
||||
for i, loc := range locations {
|
||||
if loc.Url == location.Url {
|
||||
vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
|
||||
vc.decrementServerRef(locationServerKey(loc))
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -237,6 +278,7 @@ func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
|
||||
for i, loc := range locations {
|
||||
if loc.Url == location.Url {
|
||||
vc.ecVid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
|
||||
vc.decrementServerRef(locationServerKey(loc))
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -250,6 +292,38 @@ func (vc *vidMap) deleteVid(vid uint32) {
|
||||
vc.Lock()
|
||||
defer vc.Unlock()
|
||||
|
||||
for _, loc := range vc.vid2Locations[vid] {
|
||||
vc.decrementServerRef(locationServerKey(loc))
|
||||
}
|
||||
for _, loc := range vc.ecVid2Locations[vid] {
|
||||
vc.decrementServerRef(locationServerKey(loc))
|
||||
}
|
||||
delete(vc.vid2Locations, vid)
|
||||
delete(vc.ecVid2Locations, vid)
|
||||
}
|
||||
|
||||
// incrementServerRef increases the refcount for key. Empty keys are skipped
|
||||
// so a zero-value Location (which serialises to "") does not leak a permanent
|
||||
// bucket that hasVolumeServer and decrementServerRef both ignore. Callers
|
||||
// must hold vc's write lock.
|
||||
func (vc *vidMap) incrementServerRef(key string) {
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
vc.serverRefCount[key]++
|
||||
}
|
||||
|
||||
// decrementServerRef decreases the refcount for key and removes the entry
|
||||
// once it falls to zero. Callers must hold vc's write lock.
|
||||
func (vc *vidMap) decrementServerRef(key string) {
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
if n, ok := vc.serverRefCount[key]; ok {
|
||||
if n <= 1 {
|
||||
delete(vc.serverRefCount, key)
|
||||
} else {
|
||||
vc.serverRefCount[key] = n - 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,3 +117,50 @@ func TestConcurrentGetLocations(t *testing.T) {
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestHasVolumeServer(t *testing.T) {
|
||||
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
|
||||
|
||||
regular := Location{Url: "10.0.0.1:8080", GrpcPort: 18080}
|
||||
ecOnly := Location{Url: "10.0.0.2:8080", GrpcPort: 18080}
|
||||
|
||||
mc.addLocation(7, regular)
|
||||
mc.addEcLocation(9, ecOnly)
|
||||
|
||||
addr := func(u string) pb.ServerAddress { return pb.ServerAddress(u) }
|
||||
|
||||
if !mc.HasVolumeServer(addr("10.0.0.1:8080")) {
|
||||
t.Fatalf("regular volume server must be known by http address")
|
||||
}
|
||||
if !mc.HasVolumeServer(addr("10.0.0.1:8080.18080")) {
|
||||
t.Fatalf("regular volume server must be known by grpc-suffix address")
|
||||
}
|
||||
if !mc.HasVolumeServer(addr("10.0.0.2:8080")) {
|
||||
t.Fatalf("ec-only volume server must be known")
|
||||
}
|
||||
if mc.HasVolumeServer(addr("127.0.0.1:1")) {
|
||||
t.Fatalf("unknown address must not be known")
|
||||
}
|
||||
|
||||
// Adding the same location twice must not double-count:
|
||||
// deleting once should evict the server.
|
||||
mc.addLocation(7, regular)
|
||||
mc.deleteLocation(7, regular)
|
||||
if mc.HasVolumeServer(addr("10.0.0.1:8080")) {
|
||||
t.Fatalf("server should be evicted after deleteLocation")
|
||||
}
|
||||
|
||||
// Removing the EC entry must also drop the index entry.
|
||||
mc.deleteEcLocation(9, ecOnly)
|
||||
if mc.HasVolumeServer(addr("10.0.0.2:8080")) {
|
||||
t.Fatalf("server should be evicted after deleteEcLocation")
|
||||
}
|
||||
|
||||
// deleteVid removes every reference held by that vid in one call.
|
||||
mc.addLocation(11, regular)
|
||||
mc.addEcLocation(11, regular)
|
||||
mc.InvalidateCache("11,abc")
|
||||
if mc.HasVolumeServer(addr("10.0.0.1:8080")) {
|
||||
t.Fatalf("server should be evicted after InvalidateCache")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"golang.org/x/sync/singleflight"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
)
|
||||
|
||||
// VolumeLocationProvider is the interface for looking up volume locations
|
||||
@@ -290,6 +291,13 @@ func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string,
|
||||
return vc.getStableVidMap().LookupVolumeServerUrl(vid)
|
||||
}
|
||||
|
||||
// HasVolumeServer reports whether addr is currently a known volume server
|
||||
// (hosts at least one volume or EC shard) in the cached vid map. Used by
|
||||
// admission paths that must only contact peers learned from the master.
|
||||
func (vc *vidMapClient) HasVolumeServer(addr pb.ServerAddress) bool {
|
||||
return vc.getStableVidMap().hasVolumeServer(addr)
|
||||
}
|
||||
|
||||
// GetDataCenter safely retrieves the data center
|
||||
func (vc *vidMapClient) GetDataCenter() string {
|
||||
return vc.getStableVidMap().DataCenter
|
||||
|
||||
Reference in New Issue
Block a user