Ensure raft group0 RPCs use the gossip scheduling group

Scylla operations use concurrency semaphores to limit the number
of concurrent operations and prevent resource exhaustion. The
semaphore is selected based on the current scheduling group.
For Raft group0 operations, it is essential to use a system semaphore
to avoid queuing behind user operations.
This commit adds a check to ensure that raft group0 RPCs are
executed under the gossip scheduling group.

(cherry picked from commit e05c082002)
Author:    Sergey Zolotukhin
Date:      2025-01-30 09:54:34 +01:00
Committer: GitHub Action
Parent:    b0fe705c80
Commit:    d34061818c

4 changed files with 23 additions and 0 deletions
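
For context, the semaphore selection the commit message describes can be reduced to a sketch like the one below. It is illustrative only, not Scylla's actual implementation: the struct, member names, and limits are invented, while `seastar::current_scheduling_group()`, `seastar::semaphore`, and scheduling-group comparison are real Seastar APIs.

```cpp
// Hypothetical sketch: semaphore selection keyed on the scheduling group
// the caller runs in. A Raft group0 RPC arriving under the user group
// would take units from the user semaphore and queue behind user
// reads/writes; running it under the gossip group routes it to the
// system semaphore instead.
#include <seastar/core/scheduling.hh>
#include <seastar/core/semaphore.hh>

struct concurrency_limits {
    seastar::semaphore user_ops{128};    // caps concurrent user operations
    seastar::semaphore system_ops{128};  // caps internal/system work
    seastar::scheduling_group gossip_sg; // assumed to be set at startup

    seastar::semaphore& pick() {
        // Selection depends only on the group the current task runs in,
        // which is why the RPC must execute in the right group.
        return seastar::current_scheduling_group() == gossip_sg
             ? system_ops : user_ops;
    }
};
```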

@@ -164,6 +164,7 @@ public:
     gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv };
     gms::feature tablet_load_stats_v2 { *this, "TABLET_LOAD_STATS_V2"sv };
     gms::feature repair_based_tablet_rebuild { *this, "REPAIR_BASED_TABLET_REBUILD"sv };
+    gms::feature enforced_raft_rpc_scheduling_group { *this, "ENFORCED_RAFT_RPC_SCHEDULING_GROUP"sv };
 public:
     const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
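
The new `enforced_raft_rpc_scheduling_group` cluster feature gates the check so it only takes effect once every node in the cluster supports it, keeping it silent during rolling upgrades with mixed versions. A rough standalone illustration of that gating idea (not the real `gms::feature` implementation):

```cpp
// Illustrative only: a cluster feature becomes enabled once every known
// node advertises support for it. Until then, gated behavior such as the
// new scheduling-group check stays off.
#include <set>
#include <string>

struct cluster_feature {
    std::set<std::string> supporting_nodes;
    std::set<std::string> all_nodes;

    // The check added by this commit returns early while this is false.
    explicit operator bool() const {
        return !all_nodes.empty() && supporting_nodes == all_nodes;
    }
};
```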

@@ -1668,6 +1668,8 @@ public:
     seastar::scheduling_group get_streaming_scheduling_group() const { return _dbcfg.streaming_scheduling_group; }
+    seastar::scheduling_group get_gossip_scheduling_group() const { return _dbcfg.gossip_scheduling_group; }
+
     compaction_manager& get_compaction_manager() {
         return _compaction_manager;
     }

@@ -157,6 +157,20 @@ namespace {
 [[nodiscard]] locator::host_id_or_endpoint_list parse_node_list(const std::string_view comma_separated_list) {
     return string_list_to_endpoint_list(utils::split_comma_separated_list(comma_separated_list));
 }
+
+void check_raft_rpc_scheduling_group(const replica::database& db, const gms::feature_service& feature_service, const std::string_view rpc_name) {
+    if (!feature_service.enforced_raft_rpc_scheduling_group) {
+        return;
+    }
+    const auto gossip_scheduling_group = db.get_gossip_scheduling_group();
+    if (current_scheduling_group() != gossip_scheduling_group) {
+        on_internal_error_noexcept(
+            slogger, seastar::format("Raft group0 RPCs should be executed in the gossip scheduling group [{}], current group is [{}], operation [{}].",
+                gossip_scheduling_group.name(), current_scheduling_group().name(), rpc_name));
+    }
+}
 } // namespace

 static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
@@ -7163,11 +7177,13 @@ void storage_service::init_messaging_service() {
     });
     ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
         return handle_raft_rpc(dst_id, [cmd = std::move(cmd), term, cmd_index] (auto& ss) {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_topology_cmd");
             return ss.raft_topology_cmd_handler(term, cmd_index, cmd);
         });
     });
     ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_snapshot_pull_params params) {
         return handle_raft_rpc(dst_id, [params = std::move(params)] (storage_service& ss) -> future<raft_snapshot> {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_pull_snapshot");
             utils::chunked_vector<canonical_mutation> mutations;
             // FIXME: make it an rwlock, here we only need to lock for reads,
             // might be useful if multiple nodes are trying to pull concurrently.
@@ -7272,11 +7288,13 @@
     });
     ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) {
         return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_request");
             return ss.join_node_request_handler(std::move(params));
         });
     });
     ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) {
         return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future<join_node_response_result> {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_response");
             co_await ss._join_node_group0_started.get_shared_future(ss._group0_as);
             if (ss._group0->load_my_id() != dst_id) {
                 throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
@@ -7286,6 +7304,7 @@
     });
     ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) {
         return handle_raft_rpc(dst_id, [] (auto& ss) -> future<join_node_query_result> {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query");
             if (!ss.legacy_topology_change_enabled() && !ss.raft_topology_change_enabled()) {
                 throw std::runtime_error("The cluster is upgrading to raft topology. Nodes cannot join at this time.");
             }
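
Note that the check only verifies the current group; it relies on the RPC dispatch path already running these handlers under the gossip scheduling group. Below is a standalone sketch of how work is pinned to a scheduling group in Seastar; the group name and shares are arbitrary, and only `create_scheduling_group`, `thread_attributes`, `async`, and `current_scheduling_group` are real Seastar APIs.

```cpp
#include <seastar/core/app-template.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/thread.hh>
#include <iostream>

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        return seastar::async([] {
            // Arbitrary demo group; Scylla creates its real groups at startup.
            auto gossip_sg = seastar::create_scheduling_group("gossip", 200).get();
            seastar::thread_attributes attr;
            attr.sched_group = gossip_sg;
            // A thread started with these attributes reports gossip_sg as
            // its current group, so a check like the one above passes.
            seastar::async(attr, [] {
                std::cout << seastar::current_scheduling_group().name() << "\n";
            }).get();
        });
    });
}
```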

@@ -300,6 +300,7 @@ private:
     }
     friend struct ::node_ops_ctl;
+    friend void check_raft_rpc_scheduling_group(storage_service&, std::string_view);
 public:
     const gms::gossiper& gossiper() const noexcept {