From 6a982ee0dcc901117e930c34bf2ca3dd409ce866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Wed, 4 Dec 2024 18:02:03 +0100 Subject: [PATCH] service: make Raft group 0 aware of system.dicts Adds glue which causes the contents of system.dicts to be sent in group 0 snapshots, and causes a callback to be called when system.dicts is updated locally. The callback is currently empty and will be hooked up to the RPC compressor tracker in one of the next commits. --- gms/feature_service.hh | 1 + service/raft/group0_state_machine.cc | 6 ++++++ service/raft/group0_state_machine.hh | 1 + service/storage_service.cc | 8 ++++++++ service/storage_service.hh | 9 +++++++++ 5 files changed, 25 insertions(+) diff --git a/gms/feature_service.hh b/gms/feature_service.hh index a0758f22c7..dd585cb793 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -156,6 +156,7 @@ public: gms::feature test_only_feature { *this, "TEST_ONLY_FEATURE"sv }; gms::feature address_nodes_by_host_ids { *this, "ADDRESS_NODES_BY_HOST_IDS"sv }; + gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv }; public: const std::unordered_map>& registered_features() const; diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index 3c472ed09c..edb6b09f8a 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -156,6 +156,9 @@ group0_state_machine::modules_to_reload group0_state_machine::get_modules_to_rel } else if (id == db::system_keyspace::role_members()->id() || id == db::system_keyspace::role_attributes()->id()) { modules.service_levels_effective_cache = true; } + if (mut.column_family_id() == db::system_keyspace::dicts()->id()) { + modules.compression_dictionary = true; + } } return modules; @@ -165,6 +168,9 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) { if (modules.service_levels_cache || modules.service_levels_effective_cache) { // this also updates SL effective cache co_await _ss.update_service_levels_cache(qos::update_both_cache_levels(modules.service_levels_cache), qos::query_context::group0); } + if (modules.compression_dictionary) { + co_await _ss.compression_dictionary_updated_callback(); + } } future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merger) { diff --git a/service/raft/group0_state_machine.hh b/service/raft/group0_state_machine.hh index 27220c0cd9..976753af90 100644 --- a/service/raft/group0_state_machine.hh +++ b/service/raft/group0_state_machine.hh @@ -95,6 +95,7 @@ class group0_state_machine : public raft_state_machine { struct modules_to_reload { bool service_levels_cache = false; bool service_levels_effective_cache = false; + bool compression_dictionary = false; }; raft_group0_client& _client; diff --git a/service/storage_service.cc b/service/storage_service.cc index 167e0bdecc..62c5502cfd 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -918,6 +918,11 @@ future<> storage_service::update_service_levels_cache(qos::update_both_cache_lev co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx); } +future<> storage_service::compression_dictionary_updated_callback() { + assert(this_shard_id() == 0); + return _compression_dictionary_updated_callback(); +} + // Moves the coroutine lambda onto the heap and extends its // lifetime until the resulting future is completed. // This allows to use captures in coroutine lambda after co_await-s. @@ -6990,6 +6995,9 @@ void storage_service::init_messaging_service() { if (ss._feature_service.view_build_status_on_group0) { additional_tables.push_back(db::system_keyspace::view_build_status_v2()->id()); } + if (ss._feature_service.compression_dicts) { + additional_tables.push_back(db::system_keyspace::dicts()->id()); + } } for (const auto& table : boost::join(params.tables, additional_tables)) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 7d540ae38c..d21d9df704 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -906,6 +906,13 @@ public: // update_both_cache_levels::no - update only effective service levels cache future<> update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache = qos::update_both_cache_levels::yes, qos::query_context ctx = qos::query_context::unspecified); + // Should be called whenever new compression dictionaries are published to system.dicts. + // This is an arbitrary callback passed through the constructor, + // but its intended usage is to set up the RPC connections to use the new dictionaries. + // + // Must be called on shard 0. + future<> compression_dictionary_updated_callback(); + future<> do_cluster_cleanup(); // Starts the upgrade procedure to topology on raft. @@ -991,6 +998,8 @@ private: // We need to be able to abort all group0 operation during shutdown, so we need special abort source for that abort_source _group0_as; + std::function()> _compression_dictionary_updated_callback; + friend class join_node_rpc_handshaker; friend class node_ops::node_ops_virtual_task; friend class node_ops::task_manager_module;