topology: let banned node know that it is banned
Currently if a banned node tries to connect to a cluster it fails to create connections, but has no idea why, so from inside the node it looks like it has communication problems. This patch adds new rpc NOTIFY_BANNED which is sent back to the node when its connection is dropped. On receiving the rpc the node isolates itself and print an informative message about why it did so. Closes scylladb/scylladb#26943
This commit is contained in:
committed by
Patryk Jędrzejczak
parent
d4b77c422f
commit
39cec4ae45
@@ -66,5 +66,6 @@ struct join_node_response_result {};
|
||||
verb [[ip]] join_node_query (raft::server_id dst_id, service::join_node_query_params) -> service::join_node_query_result;
|
||||
verb [[ip]] join_node_request (raft::server_id dst_id, service::join_node_request_params) -> service::join_node_request_result;
|
||||
verb join_node_response (raft::server_id dst_id, service::join_node_response_params) -> service::join_node_response_result;
|
||||
verb [[with_client_info, one_way, ip]] notify_banned (raft::server_id dst_id);
|
||||
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include "gms/generation-number.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include "message/msg_addr.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
@@ -520,7 +521,10 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
|
||||
auto peer_host_id = locator::host_id(*host_id);
|
||||
if (is_host_banned(peer_host_id)) {
|
||||
ci.server.abort_connection(ci.conn_id);
|
||||
return make_ready_future<rpc::no_wait_type>(rpc::no_wait);
|
||||
return ser::join_node_rpc_verbs::send_notify_banned(this, msg_addr{broadcast_address}, raft::server_id{*host_id}).then_wrapped([] (future<> f) {
|
||||
f.ignore_ready_future();
|
||||
return rpc::no_wait;
|
||||
});
|
||||
}
|
||||
ci.attach_auxiliary("host_id", peer_host_id);
|
||||
ci.attach_auxiliary("baddr", broadcast_address);
|
||||
@@ -681,6 +685,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::RAFT_ADD_ENTRY:
|
||||
case messaging_verb::RAFT_MODIFY_CONFIG:
|
||||
case messaging_verb::RAFT_PULL_SNAPSHOT:
|
||||
case messaging_verb::NOTIFY_BANNED:
|
||||
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
|
||||
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
|
||||
// only verbs which are 'rare' and 'cheap'.
|
||||
|
||||
@@ -210,7 +210,8 @@ enum class messaging_verb : int32_t {
|
||||
REPAIR_UPDATE_COMPACTION_CTRL = 81,
|
||||
REPAIR_UPDATE_REPAIRED_AT_FOR_MERGE = 82,
|
||||
WORK_ON_VIEW_BUILDING_TASKS = 83,
|
||||
LAST = 84,
|
||||
NOTIFY_BANNED = 84,
|
||||
LAST = 85,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
|
||||
@@ -7844,6 +7844,21 @@ void storage_service::init_messaging_service() {
|
||||
return make_ready_future<join_node_query_result>(std::move(result));
|
||||
});
|
||||
});
|
||||
ser::join_node_rpc_verbs::register_notify_banned(&_messaging.local(), [this] (const rpc::client_info& cinfo, raft::server_id dst_id) {
|
||||
auto src_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
|
||||
return container().invoke_on(0, [src_id, dst_id] (auto& ss) -> future<rpc::no_wait_type> {
|
||||
if (ss.my_host_id() != locator::host_id{dst_id.uuid()}) {
|
||||
rtlogger.warn("received notify_banned from {} for {}, but my id is {}, ignoring", src_id, dst_id, ss.my_host_id());
|
||||
} else if (ss._topology_state_machine._topology.tstate != topology::transition_state::left_token_ring &&
|
||||
!ss._topology_state_machine._topology.left_nodes.contains(dst_id) && !ss._group0_as.abort_requested()) {
|
||||
// Ignore rpc if the node is already shutting down or during decommissioning because
|
||||
// the node is expected to shut itself down after being banned.
|
||||
rtlogger.info("received notification of being banned from the cluster from {}, terminating.", src_id);
|
||||
_exit(0);
|
||||
}
|
||||
co_return rpc::no_wait_type{};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::uninit_messaging_service() {
|
||||
|
||||
@@ -22,9 +22,8 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.nightly
|
||||
async def test_banned_node_cannot_communicate(manager: ManagerClient) -> None:
|
||||
"""Test that a node banned from the cluster is not able to perform inserts
|
||||
that require communicating with other nodes."""
|
||||
async def test_banned_node_notification(manager: ManagerClient) -> None:
|
||||
"""Test that a node banned from the cluster get notification about been banned"""
|
||||
# Decrease the failure detector threshold so we don't have to wait for too long.
|
||||
config = {
|
||||
'failure_detector_timeout_in_ms': 2000
|
||||
@@ -32,11 +31,6 @@ async def test_banned_node_cannot_communicate(manager: ManagerClient) -> None:
|
||||
srvs = await manager.servers_add(3, config=config)
|
||||
cql = manager.get_cql()
|
||||
|
||||
# Use RF=2 keyspace and below CL=ALL so that performing an INSERT requires
|
||||
# communicating with another node.
|
||||
ks = await create_new_test_keyspace(cql, "with replication = {'class': 'SimpleStrategy', 'replication_factor': 2}")
|
||||
await cql.run_async(f"create table {ks}.t (pk int primary key)")
|
||||
|
||||
# Pause one of the servers so other nodes mark it as dead and we can remove it.
|
||||
# We deliberately don't shut it down, but only pause it - we want to test
|
||||
# that we solved the harder problem of safely removing nodes which didn't shut down.
|
||||
@@ -50,15 +44,8 @@ async def test_banned_node_cannot_communicate(manager: ManagerClient) -> None:
|
||||
logger.info(f"Unpausing {srvs[2]}")
|
||||
await manager.server_unpause(srvs[2].server_id)
|
||||
|
||||
# We need a separate driver session to communicate with the removed server,
|
||||
# the original driver session bugs out.
|
||||
logger.info(f"Connecting to {srvs[2]}")
|
||||
with manager.con_gen([srvs[2].ip_addr], manager.port, manager.use_ssl) as c:
|
||||
with c.connect() as s:
|
||||
logger.info(f"Connected, sending request")
|
||||
q = SimpleStatement(f'insert into {ks}.t (pk) values (0)', consistency_level=ConsistencyLevel.ALL)
|
||||
# Before introducing host banning, a removed node was able to participate
|
||||
# as if it was a normal node and, for example, could insert data into the cluster.
|
||||
# Now other nodes refuse to communicate so we'll get an exception.
|
||||
with pytest.raises((NoHostAvailable, OperationTimedOut)):
|
||||
await s.run_async(q, execution_profile='whitelist', timeout=5)
|
||||
log = await manager.server_open_log(srvs[2].server_id)
|
||||
_, matches = await log.wait_for(f"received notification of being banned from the cluster from")
|
||||
|
||||
# check that the node is notified about being banned
|
||||
assert len(matches) != 0, "The node did not log being banned from the cluster"
|
||||
Reference in New Issue
Block a user