topology: let banned node know that it is banned

Currently if a banned node tries to connect to a cluster it fails to
create connections, but has no idea why, so from inside the node it
looks like it has communication problems. This patch adds new rpc
NOTIFY_BANNED which is sent back to the node when its connection is
dropped. On receiving the rpc the node isolates itself and prints an
informative message about why it did so.

Closes scylladb/scylladb#26943
This commit is contained in:
Gleb Natapov
2025-11-10 15:32:38 +02:00
committed by Patryk Jędrzejczak
parent d4b77c422f
commit 39cec4ae45
5 changed files with 31 additions and 22 deletions

View File

@@ -66,5 +66,6 @@ struct join_node_response_result {};
verb [[ip]] join_node_query (raft::server_id dst_id, service::join_node_query_params) -> service::join_node_query_result;
verb [[ip]] join_node_request (raft::server_id dst_id, service::join_node_request_params) -> service::join_node_request_result;
verb join_node_response (raft::server_id dst_id, service::join_node_response_params) -> service::join_node_response_result;
verb [[with_client_info, one_way, ip]] notify_banned (raft::server_id dst_id);
}

View File

@@ -9,6 +9,7 @@
#include "gms/generation-number.hh"
#include "gms/inet_address.hh"
#include <seastar/core/shard_id.hh>
#include "message/msg_addr.hh"
#include "utils/assert.hh"
#include <fmt/ranges.h>
#include <seastar/core/coroutine.hh>
@@ -520,7 +521,10 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
auto peer_host_id = locator::host_id(*host_id);
if (is_host_banned(peer_host_id)) {
ci.server.abort_connection(ci.conn_id);
return make_ready_future<rpc::no_wait_type>(rpc::no_wait);
return ser::join_node_rpc_verbs::send_notify_banned(this, msg_addr{broadcast_address}, raft::server_id{*host_id}).then_wrapped([] (future<> f) {
f.ignore_ready_future();
return rpc::no_wait;
});
}
ci.attach_auxiliary("host_id", peer_host_id);
ci.attach_auxiliary("baddr", broadcast_address);
@@ -681,6 +685,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_ADD_ENTRY:
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::RAFT_PULL_SNAPSHOT:
case messaging_verb::NOTIFY_BANNED:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.

View File

@@ -210,7 +210,8 @@ enum class messaging_verb : int32_t {
REPAIR_UPDATE_COMPACTION_CTRL = 81,
REPAIR_UPDATE_REPAIRED_AT_FOR_MERGE = 82,
WORK_ON_VIEW_BUILDING_TASKS = 83,
LAST = 84,
NOTIFY_BANNED = 84,
LAST = 85,
};
} // namespace netw

View File

@@ -7844,6 +7844,21 @@ void storage_service::init_messaging_service() {
return make_ready_future<join_node_query_result>(std::move(result));
});
});
ser::join_node_rpc_verbs::register_notify_banned(&_messaging.local(), [this] (const rpc::client_info& cinfo, raft::server_id dst_id) {
auto src_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(0, [src_id, dst_id] (auto& ss) -> future<rpc::no_wait_type> {
if (ss.my_host_id() != locator::host_id{dst_id.uuid()}) {
rtlogger.warn("received notify_banned from {} for {}, but my id is {}, ignoring", src_id, dst_id, ss.my_host_id());
} else if (ss._topology_state_machine._topology.tstate != topology::transition_state::left_token_ring &&
!ss._topology_state_machine._topology.left_nodes.contains(dst_id) && !ss._group0_as.abort_requested()) {
// Ignore the rpc if the node is already shutting down or is being decommissioned, because
// the node is expected to shut itself down after being banned.
rtlogger.info("received notification of being banned from the cluster from {}, terminating.", src_id);
_exit(0);
}
co_return rpc::no_wait_type{};
});
});
}
future<> storage_service::uninit_messaging_service() {

View File

@@ -22,9 +22,8 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.nightly
async def test_banned_node_cannot_communicate(manager: ManagerClient) -> None:
"""Test that a node banned from the cluster is not able to perform inserts
that require communicating with other nodes."""
async def test_banned_node_notification(manager: ManagerClient) -> None:
"""Test that a node banned from the cluster get notification about been banned"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': 2000
@@ -32,11 +31,6 @@ async def test_banned_node_cannot_communicate(manager: ManagerClient) -> None:
srvs = await manager.servers_add(3, config=config)
cql = manager.get_cql()
# Use RF=2 keyspace and below CL=ALL so that performing an INSERT requires
# communicating with another node.
ks = await create_new_test_keyspace(cql, "with replication = {'class': 'SimpleStrategy', 'replication_factor': 2}")
await cql.run_async(f"create table {ks}.t (pk int primary key)")
# Pause one of the servers so other nodes mark it as dead and we can remove it.
# We deliberately don't shut it down, but only pause it - we want to test
# that we solved the harder problem of safely removing nodes which didn't shut down.
@@ -50,15 +44,8 @@ async def test_banned_node_cannot_communicate(manager: ManagerClient) -> None:
logger.info(f"Unpausing {srvs[2]}")
await manager.server_unpause(srvs[2].server_id)
# We need a separate driver session to communicate with the removed server,
# the original driver session bugs out.
logger.info(f"Connecting to {srvs[2]}")
with manager.con_gen([srvs[2].ip_addr], manager.port, manager.use_ssl) as c:
with c.connect() as s:
logger.info(f"Connected, sending request")
q = SimpleStatement(f'insert into {ks}.t (pk) values (0)', consistency_level=ConsistencyLevel.ALL)
# Before introducing host banning, a removed node was able to participate
# as if it was a normal node and, for example, could insert data into the cluster.
# Now other nodes refuse to communicate so we'll get an exception.
with pytest.raises((NoHostAvailable, OperationTimedOut)):
await s.run_async(q, execution_profile='whitelist', timeout=5)
log = await manager.server_open_log(srvs[2].server_id)
_, matches = await log.wait_for(f"received notification of being banned from the cluster from")
# check that the node is notified about being banned
assert len(matches) != 0, "The node did not log being banned from the cluster"