diff --git a/gms/feature_service.hh b/gms/feature_service.hh
index 90e89f98da..2342edb334 100644
--- a/gms/feature_service.hh
+++ b/gms/feature_service.hh
@@ -164,6 +164,7 @@ public:
     gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv };
     gms::feature tablet_load_stats_v2 { *this, "TABLET_LOAD_STATS_V2"sv };
     gms::feature repair_based_tablet_rebuild { *this, "REPAIR_BASED_TABLET_REBUILD"sv };
+    gms::feature enforced_raft_rpc_scheduling_group { *this, "ENFORCED_RAFT_RPC_SCHEDULING_GROUP"sv };
 
 public:
     const std::unordered_map>& registered_features() const;
diff --git a/replica/database.hh b/replica/database.hh
index 2b6bf55638..e3041873cd 100644
--- a/replica/database.hh
+++ b/replica/database.hh
@@ -1668,6 +1668,8 @@ public:
 
     seastar::scheduling_group get_streaming_scheduling_group() const { return _dbcfg.streaming_scheduling_group; }
 
+    seastar::scheduling_group get_gossip_scheduling_group() const { return _dbcfg.gossip_scheduling_group; }
+
     compaction_manager& get_compaction_manager() {
         return _compaction_manager;
     }
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 08db681b6a..a81a8b2d43 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -157,6 +157,20 @@ namespace {
 [[nodiscard]] locator::host_id_or_endpoint_list parse_node_list(const std::string_view comma_separated_list) {
     return string_list_to_endpoint_list(utils::split_comma_separated_list(comma_separated_list));
 }
+
+// Reports an internal error (without throwing) when a Raft group0 RPC handler runs outside the gossip scheduling group.
+// Does nothing while feature_service.enforced_raft_rpc_scheduling_group evaluates to false.
+void check_raft_rpc_scheduling_group(const replica::database& db, const gms::feature_service& feature_service, const std::string_view rpc_name) {
+    if (!feature_service.enforced_raft_rpc_scheduling_group) {
+        return;
+    }
+    const auto expected_sg = db.get_gossip_scheduling_group();
+    const auto actual_sg = current_scheduling_group();
+    if (actual_sg != expected_sg) {
+        on_internal_error_noexcept(slogger, seastar::format("Raft group0 RPCs should be executed in the gossip scheduling group [{}], current group is [{}], operation [{}].", expected_sg.name(), actual_sg.name(), rpc_name));
+    }
+}
+
 } // namespace
 
 static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
@@ -7163,11 +7177,13 @@ void storage_service::init_messaging_service() {
     });
     ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
         return handle_raft_rpc(dst_id, [cmd = std::move(cmd), term, cmd_index] (auto& ss) {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_topology_cmd");
             return ss.raft_topology_cmd_handler(term, cmd_index, cmd);
         });
     });
     ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_snapshot_pull_params params) {
         return handle_raft_rpc(dst_id, [params = std::move(params)] (storage_service& ss) -> future {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_pull_snapshot");
             utils::chunked_vector mutations;
             // FIXME: make it an rwlock, here we only need to lock for reads,
             // might be useful if multiple nodes are trying to pull concurrently.
@@ -7272,11 +7288,13 @@ void storage_service::init_messaging_service() {
     });
     ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) {
         return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_request");
             return ss.join_node_request_handler(std::move(params));
         });
     });
     ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) {
         return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_response");
             co_await ss._join_node_group0_started.get_shared_future(ss._group0_as);
             if (ss._group0->load_my_id() != dst_id) {
                 throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
@@ -7286,6 +7304,7 @@ void storage_service::init_messaging_service() {
     });
     ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) {
         return handle_raft_rpc(dst_id, [] (auto& ss) -> future {
+            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query");
             if (!ss.legacy_topology_change_enabled() && !ss.raft_topology_change_enabled()) {
                 throw std::runtime_error("The cluster is upgrading to raft topology. Nodes cannot join at this time.");
             }