From d34061818cee414cfa80778a15eadcf8a7e4aee3 Mon Sep 17 00:00:00 2001
From: Sergey Zolotukhin
Date: Thu, 30 Jan 2025 09:54:34 +0100
Subject: [PATCH] Ensure raft group0 RPCs use the gossip scheduling group

Scylla uses concurrency semaphores to limit the number of in-flight
operations and prevent resource exhaustion. The semaphore is selected
based on the current scheduling group. Raft group0 operations must run
under a system semaphore so that they do not queue behind user
operations.

This commit adds a check to ensure that raft group0 RPCs are executed
in the `gossiper` scheduling group.

(cherry picked from commit e05c082002d05485fd9276ffe2622a7ff32c8884)
---
 gms/feature_service.hh     |  1 +
 replica/database.hh        |  2 ++
 service/storage_service.cc | 19 +++++++++++++++++++
 service/storage_service.hh |  1 +
 4 files changed, 23 insertions(+)

diff --git a/gms/feature_service.hh b/gms/feature_service.hh
index 90e89f98da..2342edb334 100644
--- a/gms/feature_service.hh
+++ b/gms/feature_service.hh
@@ -164,6 +164,7 @@ public:
     gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv };
     gms::feature tablet_load_stats_v2 { *this, "TABLET_LOAD_STATS_V2"sv };
     gms::feature repair_based_tablet_rebuild { *this, "REPAIR_BASED_TABLET_REBUILD"sv };
+    gms::feature enforced_raft_rpc_scheduling_group { *this, "ENFORCED_RAFT_RPC_SCHEDULING_GROUP"sv };
 
 public:
     const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
diff --git a/replica/database.hh b/replica/database.hh
index 2b6bf55638..e3041873cd 100644
--- a/replica/database.hh
+++ b/replica/database.hh
@@ -1668,6 +1668,8 @@ public:
     seastar::scheduling_group get_streaming_scheduling_group() const { return _dbcfg.streaming_scheduling_group; }
 
+    seastar::scheduling_group get_gossip_scheduling_group() const { return _dbcfg.gossip_scheduling_group; }
+
     compaction_manager& get_compaction_manager() {
         return _compaction_manager;
     }
 
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 08db681b6a..a81a8b2d43 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -157,6 +157,20 @@ namespace {
 [[nodiscard]] locator::host_id_or_endpoint_list parse_node_list(const std::string_view comma_separated_list) {
     return string_list_to_endpoint_list(utils::split_comma_separated_list(comma_separated_list));
 }
+
+void check_raft_rpc_scheduling_group(const replica::database& db, const gms::feature_service& feature_service, const std::string_view rpc_name) {
+    if (!feature_service.enforced_raft_rpc_scheduling_group) {
+        return;
+    }
+
+    const auto gossip_scheduling_group = db.get_gossip_scheduling_group();
+    if (current_scheduling_group() != gossip_scheduling_group) {
+        on_internal_error_noexcept(
+            slogger, seastar::format("Raft group0 RPCs should be executed in the gossip scheduling group [{}], current group is [{}], operation [{}].",
+                gossip_scheduling_group.name(), current_scheduling_group().name(), rpc_name));
+    }
+}
+
 } // namespace
 
 static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
@@ -7163,11 +7177,13 @@ void storage_service::init_messaging_service() {
     });
     ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
         return handle_raft_rpc(dst_id, [cmd = std::move(cmd), term, cmd_index] (auto& ss) {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_topology_cmd");
             return ss.raft_topology_cmd_handler(term, cmd_index, cmd);
         });
     });
     ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_snapshot_pull_params params) {
         return handle_raft_rpc(dst_id, [params = std::move(params)] (storage_service& ss) -> future<raft_snapshot> {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_pull_snapshot");
             utils::chunked_vector<canonical_mutation> mutations;
             // FIXME: make it an rwlock, here we only need to lock for reads,
             // might be useful if multiple nodes are trying to pull concurrently.
@@ -7272,11 +7288,13 @@ void storage_service::init_messaging_service() {
     });
     ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) {
         return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_request");
             return ss.join_node_request_handler(std::move(params));
         });
     });
     ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) {
         return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future<join_node_response_result> {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_response");
             co_await ss._join_node_group0_started.get_shared_future(ss._group0_as);
             if (ss._group0->load_my_id() != dst_id) {
                 throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
@@ -7286,6 +7304,7 @@ void storage_service::init_messaging_service() {
     });
     ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) {
         return handle_raft_rpc(dst_id, [] (auto& ss) -> future<join_node_query_result> {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query");
             if (!ss.legacy_topology_change_enabled() && !ss.raft_topology_change_enabled()) {
                 throw std::runtime_error("The cluster is upgrading to raft topology. Nodes cannot join at this time.");
             }
diff --git a/service/storage_service.hh b/service/storage_service.hh
index d8d41d4e4b..8f29d9a628 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -300,6 +300,7 @@ private:
     }
 
     friend struct ::node_ops_ctl;
+    friend void check_raft_rpc_scheduling_group(storage_service&, std::string_view);
 
 public:
     const gms::gossiper& gossiper() const noexcept {
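
Note (illustration, not part of the patch): the sketch below shows, as a
standalone minimal example, the shape of the check this patch introduces.
It uses only public Seastar APIs (seastar::current_scheduling_group(),
scheduling_group::name(), on_internal_error_noexcept()); the logger name
and the `expected` parameter are hypothetical stand-ins for Scylla's
`slogger` and `db.get_gossip_scheduling_group()`.

    // Minimal standalone sketch: verify that the calling task runs in the
    // expected scheduling group, and report an internal error (without
    // throwing) when it does not.
    #include <string_view>
    #include <seastar/core/scheduling.hh>
    #include <seastar/core/on_internal_error.hh>
    #include <seastar/core/format.hh>
    #include <seastar/util/log.hh>

    static seastar::logger example_logger("sg_check"); // hypothetical name

    void check_scheduling_group(seastar::scheduling_group expected,
                                std::string_view rpc_name) {
        const auto current = seastar::current_scheduling_group();
        if (current != expected) {
            seastar::on_internal_error_noexcept(example_logger,
                seastar::format("RPC [{}] should run in scheduling group [{}], "
                                "current group is [{}]",
                                rpc_name, expected.name(), current.name()));
        }
    }

on_internal_error_noexcept() logs the violation (and aborts only if
Seastar's abort-on-internal-error flag is set) rather than throwing, so a
misrouted RPC is reported without failing the request. The patch also
gates the check behind the ENFORCED_RAFT_RPC_SCHEDULING_GROUP cluster
feature, so it only starts firing once every node in the cluster supports
routing these RPCs on the gossip scheduling group.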