mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 19:21:01 +00:00
service: make Raft group 0 aware of system.dicts
Adds glue which causes the contents of system.dicts to be sent in group 0 snapshots, and causes a callback to be called when system.dicts is updated locally. The callback is currently empty and will be hooked up to the RPC compressor tracker in one of the next commits.
This commit is contained in:
@@ -156,6 +156,7 @@ public:
|
||||
gms::feature test_only_feature { *this, "TEST_ONLY_FEATURE"sv };
|
||||
gms::feature address_nodes_by_host_ids { *this, "ADDRESS_NODES_BY_HOST_IDS"sv };
|
||||
|
||||
gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
@@ -156,6 +156,9 @@ group0_state_machine::modules_to_reload group0_state_machine::get_modules_to_rel
|
||||
} else if (id == db::system_keyspace::role_members()->id() || id == db::system_keyspace::role_attributes()->id()) {
|
||||
modules.service_levels_effective_cache = true;
|
||||
}
|
||||
if (mut.column_family_id() == db::system_keyspace::dicts()->id()) {
|
||||
modules.compression_dictionary = true;
|
||||
}
|
||||
}
|
||||
|
||||
return modules;
|
||||
@@ -165,6 +168,9 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) {
|
||||
if (modules.service_levels_cache || modules.service_levels_effective_cache) { // this also updates SL effective cache
|
||||
co_await _ss.update_service_levels_cache(qos::update_both_cache_levels(modules.service_levels_cache), qos::query_context::group0);
|
||||
}
|
||||
if (modules.compression_dictionary) {
|
||||
co_await _ss.compression_dictionary_updated_callback();
|
||||
}
|
||||
}
|
||||
|
||||
future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merger) {
|
||||
|
||||
@@ -95,6 +95,7 @@ class group0_state_machine : public raft_state_machine {
|
||||
struct modules_to_reload {
|
||||
bool service_levels_cache = false;
|
||||
bool service_levels_effective_cache = false;
|
||||
bool compression_dictionary = false;
|
||||
};
|
||||
|
||||
raft_group0_client& _client;
|
||||
|
||||
@@ -918,6 +918,11 @@ future<> storage_service::update_service_levels_cache(qos::update_both_cache_lev
|
||||
co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx);
|
||||
}
|
||||
|
||||
future<> storage_service::compression_dictionary_updated_callback() {
|
||||
assert(this_shard_id() == 0);
|
||||
return _compression_dictionary_updated_callback();
|
||||
}
|
||||
|
||||
// Moves the coroutine lambda onto the heap and extends its
|
||||
// lifetime until the resulting future is completed.
|
||||
// This allows to use captures in coroutine lambda after co_await-s.
|
||||
@@ -6990,6 +6995,9 @@ void storage_service::init_messaging_service() {
|
||||
if (ss._feature_service.view_build_status_on_group0) {
|
||||
additional_tables.push_back(db::system_keyspace::view_build_status_v2()->id());
|
||||
}
|
||||
if (ss._feature_service.compression_dicts) {
|
||||
additional_tables.push_back(db::system_keyspace::dicts()->id());
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& table : boost::join(params.tables, additional_tables)) {
|
||||
|
||||
@@ -906,6 +906,13 @@ public:
|
||||
// update_both_cache_levels::no - update only effective service levels cache
|
||||
future<> update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache = qos::update_both_cache_levels::yes, qos::query_context ctx = qos::query_context::unspecified);
|
||||
|
||||
// Should be called whenever new compression dictionaries are published to system.dicts.
|
||||
// This is an arbitrary callback passed through the constructor,
|
||||
// but its intended usage is to set up the RPC connections to use the new dictionaries.
|
||||
//
|
||||
// Must be called on shard 0.
|
||||
future<> compression_dictionary_updated_callback();
|
||||
|
||||
future<> do_cluster_cleanup();
|
||||
|
||||
// Starts the upgrade procedure to topology on raft.
|
||||
@@ -991,6 +998,8 @@ private:
|
||||
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
|
||||
abort_source _group0_as;
|
||||
|
||||
std::function<future<void>()> _compression_dictionary_updated_callback;
|
||||
|
||||
friend class join_node_rpc_handshaker;
|
||||
friend class node_ops::node_ops_virtual_task;
|
||||
friend class node_ops::task_manager_module;
|
||||
|
||||
Reference in New Issue
Block a user