diff --git a/db/config.cc b/db/config.cc index 410d0a6138..7940e792de 100644 --- a/db/config.cc +++ b/db/config.cc @@ -330,14 +330,14 @@ const config_type& config_type_for -const config_type& config_type_for>() { +const config_type& config_type_for>() { static config_type ct( - "dictionary training conditions", printable_to_json>); + "dictionary training conditions", printable_to_json>); return ct; } template <> -const config_type& config_type_for() { +const config_type& config_type_for() { static config_type ct( "advanced rpc compressor config", printable_vector_to_json>); return ct; @@ -530,9 +530,9 @@ struct convert { template <> -class convert> { +class convert> { public: - static bool decode(const Node& node, enum_option& rhs) { + static bool decode(const Node& node, enum_option& rhs) { std::string name; if (!convert::decode(node, name)) { return false; @@ -1110,7 +1110,7 @@ db::config::config(std::shared_ptr exts) "Specifies RPC compression algorithms supported by this node. ") , internode_compression_enable_advanced(this, "internode_compression_enable_advanced", liveness::MustRestart, value_status::Used, false, "Enables the new implementation of RPC compression. If disabled, Scylla will fall back to the old implementation.") - , rpc_dict_training_when(this, "rpc_dict_training_when", liveness::LiveUpdate, value_status::Used, netw::dict_training_loop::when::type::NEVER, + , rpc_dict_training_when(this, "rpc_dict_training_when", liveness::LiveUpdate, value_status::Used, netw::dict_training_when::type::NEVER, "Specifies when RPC compression dictionary training is performed by this node.\n" "* `never` disables it unconditionally.\n" "* `when_leader` enables it only whenever the node is the Raft leader.\n" @@ -2025,8 +2025,8 @@ template struct utils::config_file::named_value>; template struct utils::config_file::named_value>; template struct utils::config_file::named_value>; -template struct utils::config_file::named_value>; -template struct utils::config_file::named_value; +template struct utils::config_file::named_value>; +template struct utils::config_file::named_value; template struct utils::config_file::named_value>>; template struct utils::config_file::named_value>>; template struct utils::config_file::named_value>>; diff --git a/db/config.hh b/db/config.hh index bf278e2c15..650317821a 100644 --- a/db/config.hh +++ b/db/config.hh @@ -9,6 +9,7 @@ #pragma once +#include #include #include @@ -23,8 +24,7 @@ #include "gms/inet_address.hh" #include "db/hints/host_filter.hh" #include "utils/error_injection.hh" -#include "message/dict_trainer.hh" -#include "message/advanced_rpc_compressor.hh" +#include "message/rpc_compression_types.hh" #include "db/consistency_level_type.hh" #include "db/tri_mode_restriction.hh" #include "sstables/compressor.hh" @@ -325,9 +325,9 @@ public: named_value internode_compression_zstd_min_message_size; named_value internode_compression_zstd_max_message_size; named_value internode_compression_checksumming; - named_value internode_compression_algorithms; + named_value internode_compression_algorithms; named_value internode_compression_enable_advanced; - named_value> rpc_dict_training_when; + named_value> rpc_dict_training_when; named_value rpc_dict_training_min_time_seconds; named_value rpc_dict_training_min_bytes; named_value inter_dc_tcp_nodelay; @@ -739,8 +739,8 @@ extern template struct utils::config_file::named_value>; extern template struct utils::config_file::named_value>; extern template struct utils::config_file::named_value>; -extern template struct utils::config_file::named_value>; -extern template struct utils::config_file::named_value; +extern template struct utils::config_file::named_value>; +extern template struct utils::config_file::named_value; extern template struct utils::config_file::named_value>>; extern template struct utils::config_file::named_value>>; extern template struct utils::config_file::named_value>>; diff --git a/message/advanced_rpc_compressor.hh b/message/advanced_rpc_compressor.hh index 70d7d01375..57de678e88 100644 --- a/message/advanced_rpc_compressor.hh +++ b/message/advanced_rpc_compressor.hh @@ -11,9 +11,10 @@ #include #include #include + +#include "rpc_compression_types.hh" #include "utils/refcounted.hh" #include "utils/updateable_value.hh" -#include "utils/enum_option.hh" #include "shared_dict.hh" namespace netw { @@ -28,103 +29,6 @@ class dict_sampler; using dict_ptr = lw_shared_ptr>>; class control_protocol_frame; -// An enum wrapper, describing supported RPC compression algorithms. -// Always contains a valid value —- the constructors won't allow -// an invalid/unknown enum variant to be constructed. -struct compression_algorithm { - using underlying = uint8_t; - enum class type : underlying { - RAW, - LZ4, - ZSTD, - COUNT, - } _value; - // Construct from an integer. - // Used to deserialize the algorithm from the first byte of the frame. - constexpr compression_algorithm(underlying x) { - if (x < std::to_underlying(type::RAW) || x >= std::to_underlying(type::COUNT)) { - throw std::runtime_error(fmt::format("Invalid value {} for enum compression_algorithm", static_cast(x))); - } - _value = static_cast(x); - } - // Construct from `type`. Makes sure that `type` has a valid value. - constexpr compression_algorithm(type x) : compression_algorithm(std::to_underlying(x)) {} - - // These names are used in multiple places: - // RPC negotiation, in metric labels, and config. - static constexpr std::string_view names[] = { - "raw", - "lz4", - "zstd", - }; - static_assert(std::size(names) == static_cast(compression_algorithm::type::COUNT)); - - // Implements enum_option. - static auto map() { - std::unordered_map ret; - for (size_t i = 0; i < std::size(names); ++i) { - ret.insert(std::make_pair(std::string(names[i]), compression_algorithm(i).get())); - } - return ret; - } - - constexpr std::string_view name() const noexcept { return names[idx()]; } - constexpr underlying idx() const noexcept { return std::to_underlying(_value); } - constexpr type get() const noexcept { return _value; } - constexpr static size_t count() { return static_cast(type::COUNT); }; - bool operator<=>(const compression_algorithm &) const = default; -}; - - -// Represents a set of compression algorithms. -// Backed by a bitset. -// Used for convenience during algorithm negotiations. -class compression_algorithm_set { - uint8_t _bitset; - static_assert(std::numeric_limits::digits > compression_algorithm::count()); - constexpr compression_algorithm_set(uint8_t v) noexcept : _bitset(v) {} -public: - // Returns a set containing the given algorithm and all algorithms weaker (smaller in the enum order) - // than it. - constexpr static compression_algorithm_set this_or_lighter(compression_algorithm algo) noexcept { - auto x = 1 << (algo.idx()); - return {x + (x - 1)}; - } - // Returns the strongest (greatest in the enum order) algorithm in the set. - constexpr compression_algorithm heaviest() const { - return {std::bit_width(_bitset) - 1}; - } - // The usual set operations. - constexpr static compression_algorithm_set singleton(compression_algorithm algo) noexcept { - return {1 << algo.idx()}; - } - constexpr compression_algorithm_set intersection(compression_algorithm_set o) const noexcept { - return {_bitset & o._bitset}; - } - constexpr compression_algorithm_set difference(compression_algorithm_set o) const noexcept { - return {_bitset &~ o._bitset}; - } - constexpr compression_algorithm_set sum(compression_algorithm_set o) const noexcept { - return {_bitset | o._bitset}; - } - constexpr bool contains(compression_algorithm algo) const noexcept { - return _bitset & (1 << algo.idx()); - } - constexpr bool operator==(const compression_algorithm_set&) const = default; - // Returns the contained bitset. Used for serialization. - constexpr uint8_t value() const noexcept { - return _bitset; - } - // Reconstructs the set from the output of `value()`. Used for deserialization. - constexpr static compression_algorithm_set from_value(uint8_t bitset) { - compression_algorithm_set x = bitset; - x.heaviest(); // This is a validation check. It will throw if the bitset contains some illegal/unknown bits. - return x; - } -}; - -using algo_config = std::vector>; - // See docs/dev/advanced_rpc_compression.md, // section `Negotiation` for more information about the protocol. struct control_protocol { @@ -248,7 +152,7 @@ struct per_algorithm_stats { // prevent a misuse of the API (dangling references). class advanced_rpc_compressor::tracker : public utils::refcounted { public: - using algo_config = algo_config; + using algo_config = netw::algo_config; struct config { utils::updateable_value zstd_min_msg_size{0}; utils::updateable_value zstd_max_msg_size{std::numeric_limits::max()}; diff --git a/message/advanced_rpc_compressor_protocol.hh b/message/advanced_rpc_compressor_protocol.hh index d3d83a2763..037787a9d1 100644 --- a/message/advanced_rpc_compressor_protocol.hh +++ b/message/advanced_rpc_compressor_protocol.hh @@ -9,7 +9,7 @@ #pragma once #include "shared_dict.hh" -#include "advanced_rpc_compressor.hh" +#include "rpc_compression_types.hh" namespace netw { diff --git a/message/dict_trainer.hh b/message/dict_trainer.hh index ec02a6e67f..11f1d023ec 100644 --- a/message/dict_trainer.hh +++ b/message/dict_trainer.hh @@ -8,6 +8,7 @@ #pragma once +#include "rpc_compression_types.hh" #include "utils/reservoir_sampling.hh" #include "utils/updateable_value.hh" #include @@ -88,28 +89,7 @@ class dict_training_loop { seastar::semaphore _pause{0}; seastar::abort_source _pause_as; public: - struct when { - enum class type { - NEVER, - WHEN_LEADER, - ALWAYS, - COUNT, - }; - static constexpr std::string_view names[] = { - "never", - "when_leader", - "always", - }; - static_assert(std::size(names) == static_cast(type::COUNT)); - // Implements enum_option. - static std::unordered_map map() { - std::unordered_map ret; - for (size_t i = 0; i < std::size(names); ++i) { - ret.insert({std::string(names[i]), type(i)}); - } - return ret; - } - }; + using when = netw::dict_training_when; void pause(); void unpause(); void cancel() noexcept; diff --git a/message/dictionary_service.cc b/message/dictionary_service.cc index 03b918b838..8583cefda0 100644 --- a/message/dictionary_service.cc +++ b/message/dictionary_service.cc @@ -54,11 +54,11 @@ dictionary_service::dictionary_service( void dictionary_service::maybe_toggle_dict_training() { auto when = _rpc_dict_training_when(); netw::dict_trainer_logger.debug("dictionary_service::maybe_toggle_dict_training(), called, _is_leader={}, when={}", _is_leader, when); - if (when == netw::dict_training_loop::when::type::NEVER) { + if (when == netw::dict_training_when::type::NEVER) { _training_fiber.pause(); - } else if (when == netw::dict_training_loop::when::type::ALWAYS) { + } else if (when == netw::dict_training_when::type::ALWAYS) { _training_fiber.unpause(); - } else if (when == netw::dict_training_loop::when::type::WHEN_LEADER) { + } else if (when == netw::dict_training_when::type::WHEN_LEADER) { _is_leader ? _training_fiber.unpause() : _training_fiber.pause(); } }; diff --git a/message/dictionary_service.hh b/message/dictionary_service.hh index 256faf69c4..2b76ecca9a 100644 --- a/message/dictionary_service.hh +++ b/message/dictionary_service.hh @@ -40,7 +40,7 @@ namespace gms { class dictionary_service { db::system_keyspace& _sys_ks; locator::host_id _our_host_id; - utils::updateable_value> _rpc_dict_training_when; + utils::updateable_value> _rpc_dict_training_when; service::raft_group0_client& _raft_group0_client; abort_source& _as; netw::dict_training_loop _training_fiber; @@ -48,7 +48,7 @@ class dictionary_service { bool _is_leader = false; utils::observer _leadership_observer; - utils::observer> _when_observer; + utils::observer> _when_observer; std::optional _feature_observer; void maybe_toggle_dict_training(); @@ -61,7 +61,7 @@ public: locator::host_id our_host_id = Uninitialized(); utils::updateable_value rpc_dict_training_min_time_seconds = Uninitialized(); utils::updateable_value rpc_dict_training_min_bytes = Uninitialized(); - utils::updateable_value> rpc_dict_training_when = Uninitialized(); + utils::updateable_value> rpc_dict_training_when = Uninitialized(); }; // Note: the training fiber will start as soon as the relevant cluster feature is enabled. dictionary_service( diff --git a/message/rpc_compression_types.hh b/message/rpc_compression_types.hh new file mode 100644 index 0000000000..f3363cd07a --- /dev/null +++ b/message/rpc_compression_types.hh @@ -0,0 +1,155 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "utils/enum_option.hh" + +namespace netw { + +// An enum wrapper, describing supported RPC compression algorithms. +// Always contains a valid value -- the constructors won't allow +// an invalid/unknown enum variant to be constructed. +struct compression_algorithm { + using underlying = uint8_t; + enum class type : underlying { + RAW, + LZ4, + ZSTD, + COUNT, + } _value; + + // Construct from an integer. + // Used to deserialize the algorithm from the first byte of the frame. + constexpr compression_algorithm(underlying x) { + if (x < std::to_underlying(type::RAW) || x >= std::to_underlying(type::COUNT)) { + throw std::runtime_error(std::string("Invalid value ") + std::to_string(unsigned(x)) + " for enum compression_algorithm"); + } + _value = static_cast(x); + } + + // Construct from `type`. Makes sure that `type` has a valid value. + constexpr compression_algorithm(type x) : compression_algorithm(std::to_underlying(x)) {} + + // These names are used in multiple places: + // RPC negotiation, in metric labels, and config. + static constexpr std::string_view names[] = { + "raw", + "lz4", + "zstd", + }; + static_assert(std::size(names) == static_cast(compression_algorithm::type::COUNT)); + + // Implements enum_option. + static auto map() { + std::unordered_map ret; + for (size_t i = 0; i < std::size(names); ++i) { + ret.insert(std::make_pair(std::string(names[i]), compression_algorithm(i).get())); + } + return ret; + } + + constexpr std::string_view name() const noexcept { return names[idx()]; } + constexpr underlying idx() const noexcept { return std::to_underlying(_value); } + constexpr type get() const noexcept { return _value; } + constexpr static size_t count() { return static_cast(type::COUNT); } + bool operator<=>(const compression_algorithm&) const = default; +}; + +// Represents a set of compression algorithms. +// Backed by a bitset. +// Used for convenience during algorithm negotiations. +class compression_algorithm_set { + uint8_t _bitset; + static_assert(std::numeric_limits::digits > compression_algorithm::count()); + constexpr compression_algorithm_set(uint8_t v) noexcept : _bitset(v) {} +public: + // Returns a set containing the given algorithm and all algorithms weaker (smaller in the enum order) + // than it. + constexpr static compression_algorithm_set this_or_lighter(compression_algorithm algo) noexcept { + auto x = 1 << algo.idx(); + return {uint8_t(x + (x - 1))}; + } + + // Returns the strongest (greatest in the enum order) algorithm in the set. + constexpr compression_algorithm heaviest() const { + return {compression_algorithm::underlying(std::bit_width(_bitset) - 1)}; + } + + // The usual set operations. + constexpr static compression_algorithm_set singleton(compression_algorithm algo) noexcept { + return {uint8_t(1 << algo.idx())}; + } + constexpr compression_algorithm_set intersection(compression_algorithm_set o) const noexcept { + return {uint8_t(_bitset & o._bitset)}; + } + constexpr compression_algorithm_set difference(compression_algorithm_set o) const noexcept { + return {uint8_t(_bitset &~ o._bitset)}; + } + constexpr compression_algorithm_set sum(compression_algorithm_set o) const noexcept { + return {uint8_t(_bitset | o._bitset)}; + } + constexpr bool contains(compression_algorithm algo) const noexcept { + return _bitset & (1 << algo.idx()); + } + constexpr bool operator==(const compression_algorithm_set&) const = default; + + // Returns the contained bitset. Used for serialization. + constexpr uint8_t value() const noexcept { + return _bitset; + } + + // Reconstructs the set from the output of `value()`. Used for deserialization. + constexpr static compression_algorithm_set from_value(uint8_t bitset) { + compression_algorithm_set x = bitset; + x.heaviest(); // Validation: throws on illegal/unknown bits. + return x; + } +}; + +using algo_config = std::vector>; + +struct dict_training_when { + enum class type { + NEVER, + WHEN_LEADER, + ALWAYS, + COUNT, + }; + + static constexpr std::string_view names[] = { + "never", + "when_leader", + "always", + }; + static_assert(std::size(names) == static_cast(type::COUNT)); + + // Implements enum_option. + static std::unordered_map map() { + std::unordered_map ret; + for (size_t i = 0; i < std::size(names); ++i) { + ret.insert({std::string(names[i]), type(i)}); + } + return ret; + } +}; + +} // namespace netw