From 51e80691ef3ac3e0caee9e5a557edb92c8f8fdb1 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 15 May 2023 17:48:28 +0400 Subject: [PATCH 01/21] token_metadata: replace set_topology_transition_state with set_read_new This helps isolate topology::transition_state dependencies, token_metadata doesn't need the entire enum, just this boolean flag. --- locator/token_metadata.cc | 14 +++++++------- locator/token_metadata.hh | 12 +++++++----- service/storage_service.cc | 14 +++++++++++++- test/boost/token_metadata_test.cc | 2 +- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 361fed8d97..93156832b6 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -80,7 +80,7 @@ private: topology _topology; - std::optional _topology_transition_state; + token_metadata::read_new_t _read_new = token_metadata::read_new_t::no; long _ring_version = 0; static thread_local long _static_ring_version; @@ -288,8 +288,8 @@ public: std::optional endpoints_for_reading(const token& token, const sstring& keyspace_name) const; - void set_topology_transition_state(std::optional state) { - _topology_transition_state = state; + void set_read_new(token_metadata::read_new_t read_new) { + _read_new = read_new; } public: @@ -381,7 +381,7 @@ future> token_metadata_impl::clone_only_tok co_await coroutine::maybe_yield(); } ret->_tablets = _tablets; - ret->_topology_transition_state =_topology_transition_state; + ret->_read_new = _read_new; co_return ret; } @@ -865,7 +865,7 @@ future<> token_metadata_impl::update_pending_ranges( // in order not to waste memory, we update read_endpoints only if the // new endpoints differs from the old one - if (_topology_transition_state == service::topology::transition_state::write_both_read_new && + if (_read_new == token_metadata::read_new_t::yes && new_endpoints.get_vector() != old_endpoints.get_vector()) { add_mapping(migration_info.read_endpoints, 
std::move(new_endpoints).extract_set()); @@ -1258,8 +1258,8 @@ token_metadata::endpoints_for_reading(const token& token, const sstring& keyspac } void -token_metadata::set_topology_transition_state(std::optional state) { - _impl->set_topology_transition_state(state); +token_metadata::set_read_new(read_new_t read_new) { + _impl->set_read_new(read_new); } std::multimap diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 27a9b86a72..465b53a375 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -28,7 +28,6 @@ #include "locator/types.hh" #include "locator/topology.hh" -#include "service/topology_state_machine.hh" // forward declaration since replica/database.hh includes this file namespace replica { @@ -273,10 +272,13 @@ public: // new set of replicas differs from the old one. std::optional endpoints_for_reading(const token& token, const sstring& keyspace_name) const; - // updates the current topology_transition_state of this instance, - // this value is preserved in all clone functions, - // by default it's not set - void set_topology_transition_state(std::optional state); + // Updates the read_new flag, switching read requests from + // the old endpoints to the new ones during topology changes: + // read_new_t::no - no read_endpoints will be stored on update_pending_ranges, all reads go to normal endpoints; + // read_new_t::yes - triggers update_pending_ranges to compute and store new ranges for read requests. + // The value is preserved in all clone functions, the default is read_new_t::no. 
+ using read_new_t = bool_class; + void set_read_new(read_new_t value); /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ std::multimap get_endpoint_to_token_map_for_reading() const; diff --git a/service/storage_service.cc b/service/storage_service.cc index 767801c9d2..6456c95ad8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -360,7 +360,19 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s co_await add_normal_node(id, rs); } - tmptr->set_topology_transition_state(_topology_state_machine._topology.tstate); + tmptr->set_read_new(std::invoke([](std::optional state) { + using read_new_t = locator::token_metadata::read_new_t; + if (!state.has_value()) { + return read_new_t::no; + } + switch (*state) { + case topology::transition_state::commit_cdc_generation: + case topology::transition_state::write_both_read_old: + return read_new_t::no; + case topology::transition_state::write_both_read_new: + return read_new_t::yes; + } + }, _topology_state_machine._topology.tstate)); for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) { locator::host_id host_id{id.uuid()}; diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 33697035d4..1926cc25bf 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -234,7 +234,7 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); check_no_endpoints(2); - token_metadata.set_topology_transition_state(service::topology::transition_state::write_both_read_new); + token_metadata.set_read_new(locator::token_metadata::read_new_t::yes); token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); check_endpoints(2, {e3, e1}); From 10bf8c7901f45ccb411f88e39624b89680a6d99e Mon Sep 17 00:00:00 2001 From: Petr 
Gusev Date: Mon, 15 May 2023 18:40:06 +0400 Subject: [PATCH 02/21] token_metadata: introduce topology_change_info We plan to move pending_endpoints and read_endpoints, along with their computation logic, from token_metadata to vnode_effective_replication_map. The vnode_effective_replication_map seems more appropriate for them since it contains the functionally similar _replication_map and we will be able to reuse pending_endpoints/read_endpoints across keyspaces sharing the same factory_key. At present, pending_endpoints and read_endpoints are updated in the update_pending_ranges function. The update logic comprises two parts - preparing data common to all keyspaces/replication_strategies, and calculating the migration_info for specific keyspaces. In this commit, we introduce a new topology_change_info structure to hold the first part's data and create an update_topology_change_info function to update it. This structure will later be used in vnode_effective_replication_map to compute pending_endpoints and read_endpoints. This enables the reuse of topology_change_info across all keyspaces, unlike the current update_pending_ranges implementation, which is another benefit of this refactoring. The update_topology_change_info implementation is mostly derived from update_pending_ranges, there are a few differences though: * replacing async and thread with plain co_awaits; * adding a utils::clear_gently call for the previous value to mitigate reactor stalls if target_token_metadata grows large; * substituting immediately invoked lambdas with simple variables and blocks to reduce noise, as lambdas would need to be converted into coroutines. The original update_pending_ranges remains unchanged, and will be removed entirely upon transitioning to the new implementation. Meanwhile, we add an update_topology_change_info call to storage_service::update_pending_ranges so that we can iteratively switch the system to the new implementation. 
--- locator/token_metadata.cc | 114 +++++++++++++++++++++++++++++++++++++ locator/token_metadata.hh | 18 ++++++ service/storage_service.cc | 4 +- 3 files changed, 135 insertions(+), 1 deletion(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 93156832b6..0a30fe8a98 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -59,6 +59,8 @@ private: // The map between the existing node to be replaced and the replacing node std::unordered_map _replacing_endpoints; + std::optional _topology_change_info; + using ring_mapping = boost::icl::interval_map>; // For each keyspace, migration_info contains ranges of tokens and // corresponding replicas to which writes or reads will be directed: @@ -257,6 +259,11 @@ public: future<> update_pending_ranges( const token_metadata& unpimplified_this, const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack); + + future<> update_topology_change_info(dc_rack_fn& get_dc_rack); + const std::optional& get_topology_change_info() const { + return _topology_change_info; + } public: token get_predecessor(token t) const; @@ -876,6 +883,86 @@ future<> token_metadata_impl::update_pending_ranges( }); } +future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) { + if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) { + co_await utils::clear_gently(_topology_change_info); + _topology_change_info.reset(); + co_return; + } + + // true if there is a node replaced with the same IP + bool replace_with_same_endpoint = false; + // target_token_metadata incorporates all the changes from leaving, bootstrapping and replacing + auto target_token_metadata = co_await clone_only_token_map(false); + { + // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints + std::unordered_map> new_normal_tokens; + if (!_replacing_endpoints.empty()) { + for (const auto& [token, inet_address]: 
_token_to_endpoint_map) { + const auto it = _replacing_endpoints.find(inet_address); + if (it == _replacing_endpoints.end()) { + continue; + } + new_normal_tokens[it->second].insert(token); + } + for (const auto& [replace_from, replace_to]: _replacing_endpoints) { + if (replace_from == replace_to) { + replace_with_same_endpoint = true; + } else { + target_token_metadata->remove_endpoint(replace_from); + } + } + } + for (const auto& [token, inet_address]: _bootstrap_tokens) { + new_normal_tokens[inet_address].insert(token); + } + // apply new_normal_tokens + for (auto& [endpoint, tokens]: new_normal_tokens) { + target_token_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal); + co_await target_token_metadata->update_normal_tokens(std::move(tokens), endpoint); + } + // apply leaving endpoints + for (const auto& endpoint: _leaving_endpoints) { + target_token_metadata->remove_endpoint(endpoint); + } + target_token_metadata->sort_tokens(); + } + + // We require a distinct token_metadata instance when replace_from equals replace_to, + // as it ensures the node is included in pending_ranges. + // Otherwise, the node would be excluded from both pending_ranges and + // get_natural_endpoints_without_node_being_replaced, + // causing the coordinator to overlook it entirely. 
+ std::unique_ptr base_token_metadata; + if (replace_with_same_endpoint) { + base_token_metadata = co_await clone_only_token_map(false); + for (const auto& [replace_from, replace_to]: _replacing_endpoints) { + if (replace_from == replace_to) { + base_token_metadata->remove_endpoint(replace_from); + } + } + base_token_metadata->sort_tokens(); + } + + // merge tokens from token_to_endpoint and bootstrap_tokens, + // preserving tokens of leaving endpoints + auto all_tokens = std::vector(); + all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size()); + all_tokens.resize(sorted_tokens().size()); + std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(all_tokens)); + for (const auto& p: get_bootstrap_tokens()) { + all_tokens.push_back(p.first); + } + std::sort(begin(all_tokens), end(all_tokens)); + + auto prev_value = std::move(_topology_change_info); + _topology_change_info.emplace(token_metadata(std::move(target_token_metadata)), + base_token_metadata ? std::optional(token_metadata(std::move(base_token_metadata))): std::nullopt, + std::move(all_tokens), + _read_new); + co_await utils::clear_gently(prev_value); +} + size_t token_metadata_impl::count_normal_token_owners() const { return _normal_token_owners.size(); } @@ -969,6 +1056,23 @@ std::multimap token_metadata_impl::get_endpoint_to_token_ma return cloned; } +topology_change_info::topology_change_info(token_metadata target_token_metadata_, + std::optional base_token_metadata_, + std::vector all_tokens_, + token_metadata::read_new_t read_new_) + : target_token_metadata(std::move(target_token_metadata_)) + , base_token_metadata(std::move(base_token_metadata_)) + , all_tokens(std::move(all_tokens_)) + , read_new(read_new_) +{ +} + +future<> topology_change_info::clear_gently() { + co_await utils::clear_gently(target_token_metadata); + co_await utils::clear_gently(base_token_metadata); + co_await utils::clear_gently(all_tokens); +} + token_metadata::token_metadata(std::unique_ptr impl) : 
_impl(std::move(impl)) { } @@ -1232,6 +1336,16 @@ token_metadata::update_pending_ranges(const abstract_replication_strategy& strat return _impl->update_pending_ranges(*this, strategy, keyspace_name, get_dc_rack); } +future<> +token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) { + return _impl->update_topology_change_info(get_dc_rack); +} + +const std::optional& +token_metadata::get_topology_change_info() const { + return _impl->get_topology_change_info(); +} + token token_metadata::get_predecessor(token t) const { return _impl->get_predecessor(t); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 465b53a375..947217a3cd 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -69,6 +69,7 @@ struct host_id_or_endpoint { }; class token_metadata_impl; +struct topology_change_info; class token_metadata final { std::unique_ptr _impl; @@ -256,6 +257,10 @@ public: */ future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack); + future<> update_topology_change_info(dc_rack_fn& get_dc_rack); + + const std::optional& get_topology_change_info() const; + token get_predecessor(token t) const; const std::unordered_set& get_all_endpoints() const; @@ -294,6 +299,19 @@ public: friend class token_metadata_impl; }; +struct topology_change_info { + token_metadata target_token_metadata; + std::optional base_token_metadata; + std::vector all_tokens; + token_metadata::read_new_t read_new; + + topology_change_info(token_metadata target_token_metadata_, + std::optional base_token_metadata_, + std::vector all_tokens_, + token_metadata::read_new_t read_new_); + future<> clear_gently(); +}; + using token_metadata_ptr = lw_shared_ptr; using mutable_token_metadata_ptr = lw_shared_ptr; using token_metadata_lock = semaphore_units<>; diff --git a/service/storage_service.cc b/service/storage_service.cc index 6456c95ad8..ddefa87bb7 100644 --- a/service/storage_service.cc 
+++ b/service/storage_service.cc @@ -4750,11 +4750,13 @@ future<> storage_service::update_pending_ranges(mutable_token_metadata_ptr tmptr assert(this_shard_id() == 0); try { + locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); }); + co_await tmptr->update_topology_change_info(get_dc_rack_from_gossiper); + auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms(); for (const auto& [keyspace_name, erm] : ks_erms) { auto& strategy = erm->get_replication_strategy(); slogger.debug("Updating pending ranges for keyspace={} starts ({})", keyspace_name, reason); - locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); }); co_await tmptr->update_pending_ranges(strategy, keyspace_name, get_dc_rack_from_gossiper); slogger.debug("Updating pending ranges for keyspace={} ends ({})", keyspace_name, reason); } From 084abc0e4430c765476d26e0e85d6585aa96f07c Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 28 Apr 2023 18:04:42 +0400 Subject: [PATCH 03/21] token_metadata: add pending_endpoints and read_endpoints to vnode_effective_replication_map In this commit, we just add fields and pass them through the constructor. Calculation and usage logic will be added later. 
--- locator/abstract_replication_strategy.cc | 6 ++++-- locator/abstract_replication_strategy.hh | 16 +++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index d02dd45c3d..a673033d26 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -349,7 +349,8 @@ future calculate_effective_replicat } auto rf = rs->get_replication_factor(*tmptr); - co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), rf); + co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), + ring_mapping{}, ring_mapping{}, rf); } future vnode_effective_replication_map::clone_endpoints_gently() const { @@ -426,7 +427,8 @@ future effective_replication_map_factory::c if (ref_erm) { auto rf = ref_erm->get_replication_factor(); auto local_replication_map = co_await ref_erm->clone_endpoints_gently(); - new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_replication_map), rf); + new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_replication_map), + ring_mapping{}, ring_mapping{}, rf); } else { new_erm = co_await calculate_effective_replication_map(std::move(rs), std::move(tmptr)); } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index d9e5275b2c..d48240c25d 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -11,6 +11,8 @@ #include #include #include +#include +#include #include "gms/inet_address.hh" #include "gms/feature_service.hh" #include "locator/snitch_base.hh" @@ -154,6 +156,7 @@ public: future get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set pending_tokens, inet_address pending_address, locator::endpoint_dc_rack dr) const; }; +using ring_mapping = 
boost::icl::interval_map>; using replication_strategy_ptr = seastar::shared_ptr; using mutable_replication_strategy_ptr = seastar::shared_ptr; @@ -250,6 +253,8 @@ public: private: replication_map _replication_map; + ring_mapping _pending_endpoints; + ring_mapping _read_endpoints; std::optional _factory_key = std::nullopt; effective_replication_map_factory* _factory = nullptr; @@ -261,9 +266,12 @@ public: // effective_replication_map inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override; std::unique_ptr make_splitter() const override; public: - explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept + explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, + ring_mapping pending_endpoints, ring_mapping read_endpoints, size_t replication_factor) noexcept : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) , _replication_map(std::move(replication_map)) + , _pending_endpoints(std::move(pending_endpoints)) + , _read_endpoints(std::move(read_endpoints)) { } vnode_effective_replication_map() = delete; vnode_effective_replication_map(vnode_effective_replication_map&&) = default; @@ -336,9 +344,11 @@ using mutable_vnode_effective_replication_map_ptr = shared_ptr( - std::move(rs), std::move(tmptr), std::move(replication_map), replication_factor); + std::move(rs), std::move(tmptr), std::move(replication_map), + std::move(pending_endpoints), std::move(read_endpoints), replication_factor); } // Apply the replication strategy over the current configuration and the given token_metadata. 
From 4a127c37820780a0e9902fc2f56e4f2d648c5441 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 19 May 2023 22:11:09 +0400 Subject: [PATCH 04/21] stall_free.hh: relax Container requirement We don't use the return value of erase, so we can allow it to return anything. We'll need this for ring_mapping, since boost::icl::interval_map::erase(it) returns void. --- utils/stall_free.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/stall_free.hh b/utils/stall_free.hh index bb0a0dda69..28e0a747ce 100644 --- a/utils/stall_free.hh +++ b/utils/stall_free.hh @@ -95,7 +95,7 @@ concept TriviallyClearableSequence = template concept Container = Iterable && requires (T x, typename T::iterator it) { - { x.erase(it) } -> std::same_as; + x.erase(it); }; template From 700eb90ed85af8525ec81458a966ad5ab22c68ef Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 19 May 2023 22:20:18 +0400 Subject: [PATCH 05/21] stall_free.hh: add clear_gently for rvalues --- utils/stall_free.hh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/utils/stall_free.hh b/utils/stall_free.hh index 28e0a747ce..6d855175e9 100644 --- a/utils/stall_free.hh +++ b/utils/stall_free.hh @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include "utils/collection-concepts.hh" using namespace seastar; @@ -174,6 +176,14 @@ future<> clear_gently(foreign_ptr& o) noexcept { }); } +template +requires (std::is_rvalue_reference_v && ...) +future<> clear_gently(T&&... o) noexcept { + return do_with(std::move(o)..., [](auto&... 
args) { + return when_all(clear_gently(args)...).discard_result(); + }); +} + template future<> clear_gently(T& o) noexcept { if (o.use_count() == 1) { From 959f9757d3034b2dda547854023008e333fc5b39 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 19 May 2023 22:25:30 +0400 Subject: [PATCH 06/21] vnode_erm: gentle destruction of _pending_endpoints and _read_endpoints Refactor ~vnode_effective_replication_map, use our new clear_gently overload for rvalue references. Add new fields _pending_endpoints and _read_endpoints to the call. vnode_efficient_replication_map::clear_gently is removed as it was not used. --- locator/abstract_replication_strategy.cc | 16 ++++------------ locator/abstract_replication_strategy.hh | 2 -- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index a673033d26..6f4b771c4b 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -372,22 +372,14 @@ stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until( return _rs->for_each_natural_endpoint_until(search_token, *this, func); } -future<> vnode_effective_replication_map::clear_gently() noexcept { - co_await utils::clear_gently(_replication_map); - co_await utils::clear_gently(_tmptr); -} - vnode_effective_replication_map::~vnode_effective_replication_map() { if (is_registered()) { _factory->erase_effective_replication_map(this); try { - struct background_clear_holder { - locator::replication_map replication_map; - locator::token_metadata_ptr tmptr; - }; - auto holder = make_lw_shared({std::move(_replication_map), std::move(_tmptr)}); - auto fut = when_all(utils::clear_gently(holder->replication_map), utils::clear_gently(holder->tmptr)).discard_result().then([holder] {}); - _factory->submit_background_work(std::move(fut)); + _factory->submit_background_work(clear_gently(std::move(_replication_map), + 
std::move(_pending_endpoints), + std::move(_read_endpoints), + std::move(_tmptr))); } catch (...) { // ignore } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index d48240c25d..aa53c60026 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -281,8 +281,6 @@ public: return _replication_map; } - future<> clear_gently() noexcept; - future clone_endpoints_gently() const; stop_iteration for_each_natural_endpoint_until(const token& search_token, const noncopyable_function& func) const; From 6f12c72c3f08c2c2f3fbca07070b623a512887c4 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 1 May 2023 14:45:46 +0400 Subject: [PATCH 07/21] effective_replication_map: clone_endpoints_gently -> clone_data_gently We need to account for the new fields in the clone implementation. The signature future erm::clone() const; doesn't work because the call will be made via foreign_ptr on an instance from another shard, so we need to use local values for replication_strategy and token_metadata. 
--- locator/abstract_replication_strategy.cc | 23 ++++++++++++++++------- locator/abstract_replication_strategy.hh | 9 ++++++++- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 6f4b771c4b..f0bb0637f3 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -353,15 +353,24 @@ future calculate_effective_replicat ring_mapping{}, ring_mapping{}, rf); } -future vnode_effective_replication_map::clone_endpoints_gently() const { - replication_map cloned_endpoints; +auto vnode_effective_replication_map::clone_data_gently() const -> future> { + auto result = std::make_unique(); for (auto& i : _replication_map) { - cloned_endpoints.emplace(i.first, i.second); + result->replication_map.emplace(i.first, i.second); co_await coroutine::maybe_yield(); } - co_return cloned_endpoints; + for (const auto& i : _pending_endpoints) { + result->pending_endpoints += i; + co_await coroutine::maybe_yield(); + } + + for (const auto& i : _read_endpoints) { + result->read_endpoints += i; + co_await coroutine::maybe_yield(); + } + co_return std::move(result); } inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const { @@ -418,9 +427,9 @@ future effective_replication_map_factory::c mutable_vnode_effective_replication_map_ptr new_erm; if (ref_erm) { auto rf = ref_erm->get_replication_factor(); - auto local_replication_map = co_await ref_erm->clone_endpoints_gently(); - new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_replication_map), - ring_mapping{}, ring_mapping{}, rf); + auto local_data = co_await ref_erm->clone_data_gently(); + new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_data->replication_map), + std::move(local_data->pending_endpoints), std::move(local_data->read_endpoints), rf); } else { 
new_erm = co_await calculate_effective_replication_map(std::move(rs), std::move(tmptr)); } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index aa53c60026..6a8a04c3f9 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -281,7 +281,14 @@ public: return _replication_map; } - future clone_endpoints_gently() const; + struct cloned_data { + replication_map replication_map; + ring_mapping pending_endpoints; + ring_mapping read_endpoints; + }; + // boost::icl::interval_map is not no_throw_move_constructible -> can't return cloned_data by val, + // since future_state requires T to be no_throw_move_constructible. + future> clone_data_gently() const; stop_iteration for_each_natural_endpoint_until(const token& search_token, const noncopyable_function& func) const; From e0bc98a2179507882bdc582176da9f6b7d766ae8 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 16 May 2023 12:44:11 +0400 Subject: [PATCH 08/21] sequenced_set: add extract_vector method Can be useful if we want to reuse the vector when we are done with this sequenced_set instance. --- utils/sequenced_set.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/utils/sequenced_set.hh b/utils/sequenced_set.hh index 97d523ff00..19f2498408 100644 --- a/utils/sequenced_set.hh +++ b/utils/sequenced_set.hh @@ -129,6 +129,10 @@ public: return _set; } + auto extract_vector() && noexcept { + return std::move(_vec); + } + auto extract_set() && noexcept { return std::move(_set); } From 99ff1fefe54f765e64d1d638f1a99da4ea3634a9 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Sat, 20 May 2023 20:04:35 +0400 Subject: [PATCH 09/21] abstract_replication_strategy.hh: de-virtualize natural_endpoints_depend_on_token We are going to use this function in vnode_erm::get_natural_endpoints, so for efficiency it's better to have fewer virtual calls. 
--- locator/abstract_replication_strategy.hh | 3 ++- locator/everywhere_replication_strategy.cc | 4 +++- locator/everywhere_replication_strategy.hh | 2 -- locator/local_strategy.cc | 4 +++- locator/local_strategy.hh | 2 -- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 6a8a04c3f9..36b36e134d 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -67,6 +67,7 @@ protected: replication_strategy_type _my_type; bool _per_table = false; bool _uses_tablets = false; + bool _natural_endpoints_depend_on_token = true; template void err(const char* fmt, Args&&... args) const { @@ -92,7 +93,7 @@ public: // Evaluates to true iff calculate_natural_endpoints // returns different results for different tokens. - virtual bool natural_endpoints_depend_on_token() const noexcept { return true; } + bool natural_endpoints_depend_on_token() const noexcept { return _natural_endpoints_depend_on_token; } // The returned vector has size O(number of normal token owners), which is O(number of nodes in the cluster). // Note: it is not guaranteed that the function will actually yield. 
If the complexity of a particular implementation diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 43126347c0..9e771eaad2 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -17,7 +17,9 @@ namespace locator { everywhere_replication_strategy::everywhere_replication_strategy(const replication_strategy_config_options& config_options) : - abstract_replication_strategy(config_options, replication_strategy_type::everywhere_topology) {} + abstract_replication_strategy(config_options, replication_strategy_type::everywhere_topology) { + _natural_endpoints_depend_on_token = false; +} future everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const { auto eps = tm.get_all_endpoints(); diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index a123efbfa1..94152415a4 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -18,8 +18,6 @@ class everywhere_replication_strategy : public abstract_replication_strategy { public: everywhere_replication_strategy(const replication_strategy_config_options& config_options); - virtual bool natural_endpoints_depend_on_token() const noexcept override { return false; } - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; virtual void validate_options(const gms::feature_service&) const override { /* noop */ } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index b3df8248b5..7fc8504d55 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -15,7 +15,9 @@ namespace locator { local_strategy::local_strategy(const replication_strategy_config_options& config_options) : - abstract_replication_strategy(config_options, replication_strategy_type::local) {} + 
abstract_replication_strategy(config_options, replication_strategy_type::local) { + _natural_endpoints_depend_on_token = false; +} future local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { return make_ready_future(endpoint_set({utils::fb_utilities::get_broadcast_address()})); diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index be4d2856b7..6e7009f5cb 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -27,8 +27,6 @@ public: virtual ~local_strategy() {}; virtual size_t get_replication_factor(const token_metadata&) const override; - virtual bool natural_endpoints_depend_on_token() const noexcept override { return false; } - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; virtual void validate_options(const gms::feature_service&) const override; From b9812023c636ad80ac4469379d45c37ccb4e963f Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Sat, 20 May 2023 20:47:04 +0400 Subject: [PATCH 10/21] vnode_erm::get_range_addresses: use sorted_tokens We want to refactor replication_map so that it doesn't store multiple copies of the same endpoints vector in case of natural_endpoints_depend_on_token == false. To preserve get_range_addresses behaviour we iterate over tm.sorted_tokens() instead of _replication_map. It's possible that the callers of this function are ok with a single range in case of natural_endpoints_depend_on_token == false, but to restrict the scope of the refactoring we refrain from going in that direction. 
--- locator/abstract_replication_strategy.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index f0bb0637f3..e2639730ea 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -287,10 +287,10 @@ future> vnode_effective_replication_map::get_range_addresses() const { const token_metadata& tm = *_tmptr; std::unordered_map ret; - for (const auto& [t, eps] : _replication_map) { + for (auto& t : tm.sorted_tokens()) { dht::token_range_vector ranges = tm.get_primary_ranges_for(t); for (auto& r : ranges) { - ret.emplace(r, eps); + ret.emplace(r, get_natural_endpoints(t)); } co_await coroutine::maybe_yield(); } From a8c36aad0bf50168e15051ff81dcb5d2486a1f5c Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Sat, 20 May 2023 21:01:36 +0400 Subject: [PATCH 11/21] vnode_erm: optimize replication_map We optimise memory usage of replication_map by storing the endpoints list only once in case of natural_endpoints_depend_on_token() == false. For simplicity, this list is stored in the same unordered_map with the special key default_replication_map_key. We inline both get_natural_endpoints and for_each_natural_endpoint_until from abstract_replication_strategy into vnode_erm since now the overrides in local and everywhere strategies are redundant. The default implementation works for them as empty sorted_tokens() is not a problem, we store endpoints with a special key. Function do_get_natural_endpoints was extracted, since get_natural_endpoints returns by value, but for for_each_natural_endpoint_until a reference is sufficient. 
--- locator/abstract_replication_strategy.cc | 59 ++++++++++------------ locator/abstract_replication_strategy.hh | 9 +--- locator/everywhere_replication_strategy.cc | 29 +++-------- locator/everywhere_replication_strategy.hh | 7 --- locator/local_strategy.cc | 8 --- locator/local_strategy.hh | 7 --- 6 files changed, 33 insertions(+), 86 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index e2639730ea..57df21ca09 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -64,23 +64,6 @@ sstring abstract_replication_strategy::to_qualified_class_name(std::string_view return strategy_class_registry::to_qualified_class_name(strategy_class_name); } -inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const { - const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token); - auto res = erm.get_replication_map().find(key_token); - return res->second; -} - -stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function& func) const { - const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token); - auto res = erm.get_replication_map().find(key_token); - for (const auto& ep : res->second) { - if (func(ep) == stop_iteration::yes) { - return stop_iteration::yes; - } - } - return stop_iteration::no; -} - inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints_without_node_being_replaced(const token& search_token) const { inet_address_vector_replica_set natural_endpoints = get_natural_endpoints(search_token); maybe_remove_node_being_replaced(*_tmptr, *_rs, natural_endpoints); @@ -328,24 +311,21 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p co_return ret; 
} +static const auto default_replication_map_key = dht::token::from_int64(0); + future calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) { replication_map replication_map; + const auto depend_on_token = rs->natural_endpoints_depend_on_token(); const auto& sorted_tokens = tmptr->sorted_tokens(); - - if (!sorted_tokens.empty()) { - replication_map.reserve(sorted_tokens.size()); - if (rs->natural_endpoints_depend_on_token()) { - for (const auto &t : sorted_tokens) { - auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); - replication_map.emplace(t, eps.get_vector()); - } - } else { - auto eps = co_await rs->calculate_natural_endpoints(sorted_tokens.front(), *tmptr); - for (const auto &t : sorted_tokens) { - replication_map.emplace(t, eps.get_vector()); - co_await coroutine::maybe_yield(); - } + replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1); + if (depend_on_token) { + for (const auto &t : sorted_tokens) { + auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); + replication_map.emplace(t, eps.get_vector()); } + } else { + auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr); + replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector()); } auto rf = rs->get_replication_factor(*tmptr); @@ -373,12 +353,25 @@ auto vnode_effective_replication_map::clone_data_gently() const -> futurenatural_endpoints_depend_on_token() + ? 
_tmptr->first_token(search_token) + : default_replication_map_key; + const auto it = _replication_map.find(key_token); + return it->second; +} + inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const { - return _rs->get_natural_endpoints(search_token, *this); + return do_get_natural_endpoints(search_token); } stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& search_token, const noncopyable_function& func) const { - return _rs->for_each_natural_endpoint_until(search_token, *this, func); + for (const auto& ep : do_get_natural_endpoints(search_token)) { + if (func(ep) == stop_iteration::yes) { + return stop_iteration::yes; + } + } + return stop_iteration::no; } vnode_effective_replication_map::~vnode_effective_replication_map() { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 36b36e134d..886c91f62e 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -113,9 +113,6 @@ public: static sstring to_qualified_class_name(std::string_view strategy_class_name); - virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const; - // Returns the last stop_iteration result of the called func - virtual stop_iteration for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function& func) const; virtual void validate_options(const gms::feature_service&) const = 0; virtual std::optional> recognized_options(const topology&) const = 0; virtual size_t get_replication_factor(const token_metadata& tm) const = 0; @@ -251,7 +248,6 @@ public: sstring to_sstring() const; }; - private: replication_map _replication_map; ring_mapping _pending_endpoints; @@ -278,10 +274,6 @@ public: vnode_effective_replication_map(vnode_effective_replication_map&&) 
= default; ~vnode_effective_replication_map(); - const replication_map& get_replication_map() const noexcept { - return _replication_map; - } - struct cloned_data { replication_map replication_map; ring_mapping pending_endpoints; @@ -323,6 +315,7 @@ public: private: dht::token_range_vector do_get_ranges(noncopyable_function consider_range_for_endpoint) const; + const inet_address_vector_replica_set& do_get_natural_endpoints(const token& search_token) const; public: static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr); diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 9e771eaad2..f7940243db 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -22,35 +22,18 @@ everywhere_replication_strategy::everywhere_replication_strategy(const replicati } future everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const { - auto eps = tm.get_all_endpoints(); - return make_ready_future(endpoint_set(eps.begin(), eps.end())); + if (tm.sorted_tokens().empty()) { + endpoint_set result{inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()})}; + return make_ready_future(std::move(result)); + } + const auto& all_endpoints = tm.get_all_endpoints(); + return make_ready_future(endpoint_set(all_endpoints.begin(), all_endpoints.end())); } size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const { return tm.sorted_tokens().empty() ? 
1 : tm.count_normal_token_owners(); } -inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map& erm) const { - const auto& tm = *erm.get_token_metadata_ptr(); - if (tm.sorted_tokens().empty()) { - return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); - } - return boost::copy_range(tm.get_all_endpoints()); -} - -stop_iteration everywhere_replication_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map& erm, const noncopyable_function& func) const { - const auto& tm = *erm.get_token_metadata_ptr(); - if (tm.sorted_tokens().empty()) { - return func(utils::fb_utilities::get_broadcast_address()); - } - for (const auto& ep : tm.get_all_endpoints()) { - if (func(ep)) { - return stop_iteration::yes; - } - } - return stop_iteration::no; -} - using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.EverywhereStrategy"); static registry registrator_short_name("EverywhereStrategy"); diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index 94152415a4..292d3755fd 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -32,12 +32,5 @@ public: virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override { return true; } - - /** - * We need to override this because the default implementation depends - * on token calculations but everywhere_replication_strategy may be used before tokens are set up. 
- */ - virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override; - virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const override; }; } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index 7fc8504d55..b3e479f2c1 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -35,14 +35,6 @@ size_t local_strategy::get_replication_factor(const token_metadata&) const { return 1; } -inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map&) const { - return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); -} - -stop_iteration local_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const { - return func(utils::fb_utilities::get_broadcast_address()); -} - using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.LocalStrategy"); static registry registrator_short_name("LocalStrategy"); diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index 6e7009f5cb..60e58e1d4b 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -36,13 +36,6 @@ public: virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override { return false; } - - /** - * We need to override this because the default implementation depends - * on token calculations but LocalStrategy may be used before tokens are set up. 
- */ - inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override; - virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const override; }; } From fbe3254a9e6935a7b5779031e8995624cc81d87a Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 28 Apr 2023 19:53:57 +0400 Subject: [PATCH 12/21] calculate_effective_replication_map: compute pending_endpoints and read_endpoints In this commit we add logic to calculate pending_endpoints and read_endpoints, similar to how it was done in update_pending_ranges. For situations where 'natural_endpoints_depend_on_token' is false we short-circuit the calculations, breaking out of the loop after the first iteration. In this case we add a single item with key=default_replication_map_key to the replication_map and set pending_endpoints/read_endpoints key range to the entire set of possible values. In the loop we iterate over all_tokens, which contains the union of all boundary tokens, from the old and from the new topology. In addition to updating pending_endpoints and read_endpoints in the loop, we remember the new natural endpoints in the replication_map if the current token is contained in the current set of boundary tokens. 
--- locator/abstract_replication_strategy.cc | 67 ++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 3 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 57df21ca09..2817736688 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -315,13 +315,74 @@ static const auto default_replication_map_key = dht::token::from_int64(0); future calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) { replication_map replication_map; + ring_mapping pending_endpoints; + ring_mapping read_endpoints; const auto depend_on_token = rs->natural_endpoints_depend_on_token(); const auto& sorted_tokens = tmptr->sorted_tokens(); replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1); - if (depend_on_token) { + if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) { + const auto& all_tokens = topology_changes->all_tokens; + const auto& base_token_metadata = topology_changes->base_token_metadata + ? 
*topology_changes->base_token_metadata + : *tmptr; + const auto& current_tokens = tmptr->get_token_to_endpoint(); + for (size_t i = 0, size = all_tokens.size(); i < size; ++i) { + co_await coroutine::maybe_yield(); + + const auto token = all_tokens[i]; + + auto current_endpoints = co_await rs->calculate_natural_endpoints(token, base_token_metadata); + auto target_endpoints = co_await rs->calculate_natural_endpoints(token, topology_changes->target_token_metadata); + + auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { + using interval = ring_mapping::interval_type; + if (!depend_on_token) { + target += std::make_pair( + interval::open(dht::minimum_token(), dht::maximum_token()), + std::move(endpoints)); + } else if (i == 0) { + target += std::make_pair( + interval::open(all_tokens.back(), dht::maximum_token()), + endpoints); + target += std::make_pair( + interval::left_open(dht::minimum_token(), token), + std::move(endpoints)); + } else { + target += std::make_pair( + interval::left_open(all_tokens[i - 1], token), + std::move(endpoints)); + } + }; + + { + std::unordered_set endpoints_diff; + for (const auto& e: target_endpoints) { + if (!current_endpoints.contains(e)) { + endpoints_diff.insert(e); + } + } + if (!endpoints_diff.empty()) { + add_mapping(pending_endpoints, std::move(endpoints_diff)); + } + } + + // in order not to waste memory, we update read_endpoints only if the + // new endpoints differs from the old one + if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) { + add_mapping(read_endpoints, std::move(target_endpoints).extract_set()); + } + + if (!depend_on_token) { + replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector()); + break; + } else if (current_tokens.contains(token)) { + replication_map.emplace(token, std::move(current_endpoints).extract_vector()); + } + } + } else if (depend_on_token) { for (const auto &t : sorted_tokens) { 
auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); - replication_map.emplace(t, eps.get_vector()); + replication_map.emplace(t, std::move(eps).extract_vector()); } } else { auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr); @@ -330,7 +391,7 @@ future calculate_effective_replicat auto rf = rs->get_replication_factor(*tmptr); co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), - ring_mapping{}, ring_mapping{}, rf); + std::move(pending_endpoints), std::move(read_endpoints), rf); } auto vnode_effective_replication_map::clone_data_gently() const -> future> { From e22a5c42c8a7398ab4c6ac0b3ef28b5634e44ad2 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 28 Apr 2023 21:50:56 +0400 Subject: [PATCH 13/21] vnode_effective_replication_map: get_pending_endpoints and get_endpoints_for_reading In this commit we introduce functions to erm for accessing pending_endpoints and read_endpoints similar to the corresponding functions in token_metadata. The only difference - we no longer need the keyspace_name map. The functions get_pending_endpoints and get_endpoints_for_reading are virtual, since they have different implementations for vnode and for tablets. The get_pending_endpoints already existed. For tablets it remained unchanged, while for vnode we just changed it from calling on token_metadata to using a local field. We have also removed ks_name from the signature as it's no longer needed. For vnodes, the get_endpoints_for_reading also just employs the local field. In the case of tablets, we currently return nullptr as the appropriate implementation remains unclear. 
--- locator/abstract_replication_strategy.cc | 27 ++++++++++++++++++++++-- locator/abstract_replication_strategy.hh | 10 +++++++-- locator/tablets.cc | 6 +++++- service/storage_proxy.cc | 4 ++-- 4 files changed, 40 insertions(+), 7 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 2817736688..82ac611fa7 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -93,8 +93,31 @@ void maybe_remove_node_being_replaced(const token_metadata& tm, } } -inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token, const sstring& ks_name) const { - return _tmptr->pending_endpoints_for(search_token, ks_name); +static const std::unordered_set* find_token(const ring_mapping& ring_mapping, const token& token) { + if (ring_mapping.empty()) { + return nullptr; + } + const auto interval = token_metadata::range_to_interval(range(token)); + const auto it = ring_mapping.find(interval); + return it != ring_mapping.end() ? 
&it->second : nullptr; +} + +inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token) const { + inet_address_vector_topology_change endpoints; + const auto* pending_endpoints = find_token(_pending_endpoints, search_token); + if (pending_endpoints) { + // interval_map does not work with std::vector, convert to inet_address_vector_topology_change + endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end()); + } + return endpoints; +} + +std::optional vnode_effective_replication_map::get_endpoints_for_reading(const token& token) const { + const auto* endpoints = find_token(_read_endpoints, token); + if (endpoints == nullptr) { + return {}; + } + return inet_address_vector_replica_set(endpoints->begin(), endpoints->end()); } std::unique_ptr vnode_effective_replication_map::make_splitter() const { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 886c91f62e..c7556d3485 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -198,7 +198,12 @@ public: /// Returns the set of pending replicas for a given token. /// Pending replica is a replica which gains ownership of data. /// Non-empty only during topology change. - virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const = 0; + virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const = 0; + + /// Returns a list of nodes to which a read request should be directed. + /// Returns not null only during topology changes, if request_read_new was called and + /// new set of replicas differs from the old one. + virtual std::optional get_endpoints_for_reading(const token& search_token) const = 0; /// Returns a token_range_splitter which is line with the replica assignment of this replication map. 
/// The splitter can live longer than this instance. @@ -260,7 +265,8 @@ private: public: // effective_replication_map inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override; inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override; - inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override; + inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override; + std::optional get_endpoints_for_reading(const token& search_token) const override; std::unique_ptr make_splitter() const override; public: explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, diff --git a/locator/tablets.cc b/locator/tablets.cc index 280e9e2f6e..7fa4f6608d 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -228,7 +228,7 @@ public: return result; } - virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override { + virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override { auto&& tablets = get_tablet_map(); auto tablet = tablets.get_tablet_id(search_token); auto&& info = tablets.get_tablet_transition_info(tablet); @@ -240,6 +240,10 @@ public: return {get_endpoint_for_host_id(info->pending_replica.host)}; } + virtual std::optional get_endpoints_for_reading(const token& search_token) const override { + return std::nullopt; + } + virtual std::unique_ptr make_splitter() const override { class splitter : public token_range_splitter { token_metadata_ptr _tmptr; // To keep the tablet map alive. 
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 1e142d4b31..209a10baba 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2845,7 +2845,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok replica::table& table = _db.local().find_column_family(s->id()); auto erm = table.get_effective_replication_map(); inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token); - inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token, s->ks_name()); + inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token); slogger.trace("creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints, pending_endpoints); tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints); @@ -3208,7 +3208,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level storage_proxy::paxos_participants storage_proxy::get_paxos_participants(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token &token, db::consistency_level cl_for_paxos) { inet_address_vector_replica_set natural_endpoints = erm.get_natural_endpoints_without_node_being_replaced(token); - inet_address_vector_topology_change pending_endpoints = erm.get_pending_endpoints(token, ks_name); + inet_address_vector_topology_change pending_endpoints = erm.get_pending_endpoints(token); if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) { auto local_dc_filter = erm.get_topology().get_local_dc_filter(); From d4f004f5c7a332d18f1fb574548fadbed9921901 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 1 May 2023 16:12:11 +0400 Subject: [PATCH 14/21] token_metadata_test.cc: create token_metadata and replication_strategy as shared pointers We want to switch token_metadata_test to the new implementation of 
pending_endpoints and read_endpoints in erm. To do this, it is convenient to have token_metadata and replication_strategy as shared pointers, as it fits better with the signature of calculate_effective_replication_map. In this commit we don't change the logic of the tests, we just migrate them to use pointers. --- test/boost/token_metadata_test.cc | 166 +++++++++++++++--------------- 1 file changed, 83 insertions(+), 83 deletions(-) diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 1926cc25bf..18db80dfbf 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -24,8 +24,8 @@ namespace { }; } - token_metadata create_token_metadata(inet_address this_endpoint) { - return token_metadata({ + mutable_token_metadata_ptr create_token_metadata(inet_address this_endpoint) { + return make_lw_shared(token_metadata::config { topology::config { .this_host_id = host_id::create_random_id(), .this_endpoint = this_endpoint, @@ -41,17 +41,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { const auto e1 = inet_address("192.168.0.1"); const auto t1 = dht::token::from_int64(1); auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { + token_metadata->update_topology(e1, get_dc_rack(e1)); + const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { {"replication_factor", "1"} }); - token_metadata.add_bootstrap_token(t1, e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(0), ks_name), + token_metadata->add_bootstrap_token(t1, e1); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + 
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), inet_address_vector_topology_change{e1}); } @@ -63,23 +63,23 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { const auto e2 = inet_address("192.168.0.2"); const auto t2 = dht::token::from_int64(100); auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { {"replication_factor", "1"} }); - token_metadata.update_normal_tokens({t1}, e1).get(); - token_metadata.add_bootstrap_token(t2, e2); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(0), ks_name), + token_metadata->update_normal_tokens({t1}, e1).get(); + token_metadata->add_bootstrap_token(t2, e2); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), 
ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), inet_address_vector_topology_change{}); } @@ -94,25 +94,25 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_topology(e3, get_dc_rack(e3)); + const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { {"replication_factor", "2"} }); - token_metadata.update_normal_tokens({t1, t1000}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.add_bootstrap_token(t100, e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), 
ks_name), + token_metadata->update_normal_tokens({t1, t1000}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->add_bootstrap_token(t100, e1); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), inet_address_vector_topology_change{}); } @@ -127,26 +127,26 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + 
token_metadata->update_topology(e3, get_dc_rack(e3)); + const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { {"replication_factor", "2"} }); - token_metadata.update_normal_tokens({t1, t1000}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.update_normal_tokens({t100}, e1).get(); - token_metadata.add_leaving_endpoint(e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_normal_tokens({t1, t1000}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->update_normal_tokens({t100}, e1).get(); + token_metadata->add_leaving_endpoint(e1); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), inet_address_vector_topology_change{}); } @@ 
-162,31 +162,31 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { const auto e3 = inet_address("192.168.0.3"); const auto e4 = inet_address("192.168.0.4"); auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - token_metadata.update_topology(e4, get_dc_rack(e4)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_topology(e3, get_dc_rack(e3)); + token_metadata->update_topology(e4, get_dc_rack(e4)); + const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { {"replication_factor", "2"} }); - token_metadata.update_normal_tokens({t1000}, e1).get(); - token_metadata.update_normal_tokens({t1, t100}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.add_replacing_endpoint(e3, e4); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + token_metadata->update_normal_tokens({t1000}, e1).get(); + token_metadata->update_normal_tokens({t1, t100}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->add_replacing_endpoint(e3, e4); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1000), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1000), ks_name), 
inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1001), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1001), ks_name), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(10), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(10), ks_name), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), inet_address_vector_topology_change{}); } @@ -201,15 +201,15 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_topology(e3, get_dc_rack(e3)); + const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { 
{"replication_factor", "2"} }); - token_metadata.update_normal_tokens({t1, t1000}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.add_bootstrap_token(t100, e1); + token_metadata->update_normal_tokens({t1, t1000}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->add_bootstrap_token(t100, e1); auto check_endpoints = [&](int64_t t, inet_address_vector_replica_set expected_replicas, seastar::compat::source_location sl = seastar::compat::source_location::current()) @@ -217,7 +217,7 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas BOOST_TEST_INFO("line: " << sl.line()); const auto expected_set = std::unordered_set(expected_replicas.begin(), expected_replicas.end()); - const auto actual_replicas = token_metadata.endpoints_for_reading(dht::token::from_int64(t), ks_name); + const auto actual_replicas = token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name); BOOST_REQUIRE(actual_replicas.has_value()); const auto actual_set = std::unordered_set(actual_replicas->begin(), actual_replicas->end()); @@ -228,14 +228,14 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas seastar::compat::source_location sl = seastar::compat::source_location::current()) { BOOST_TEST_INFO("line: " << sl.line()); - BOOST_REQUIRE(!token_metadata.endpoints_for_reading(dht::token::from_int64(t), ks_name).has_value()); + BOOST_REQUIRE(!token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name).has_value()); }; - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); check_no_endpoints(2); - token_metadata.set_read_new(locator::token_metadata::read_new_t::yes); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); + 
token_metadata->set_read_new(locator::token_metadata::read_new_t::yes); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); check_endpoints(2, {e3, e1}); check_endpoints(10, {e3, e1}); @@ -252,14 +252,14 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { const auto t1 = dht::token::from_int64(1); const auto e1 = inet_address("192.168.0.1"); auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { + token_metadata->update_topology(e1, get_dc_rack(e1)); + const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { {"replication_factor", "2"} }); - token_metadata.update_normal_tokens({t1}, e1).get(); - token_metadata.add_replacing_endpoint(e1, e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_normal_tokens({t1}, e1).get(); + token_metadata->add_replacing_endpoint(e1, e1); + token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.get_endpoint(t1), e1); + BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1); } From 87307781c43914b0dbb7503908100371de227800 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 1 May 2023 18:16:08 +0400 Subject: [PATCH 15/21] effective_replication_map: use new get_pending_endpoints and get_endpoints_for_reading We already use the new pending_endpoints from erm through the get_pending_ranges virtual function; in this commit we update all the remaining places to use the new implementation in erm, as well as remove the old implementation
in token_metadata. --- db/view/view.cc | 2 +- locator/token_metadata.cc | 62 ----------------- locator/token_metadata.hh | 8 --- service/storage_proxy.cc | 2 +- test/boost/token_metadata_test.cc | 107 +++++++++++++++++------------- 5 files changed, 62 insertions(+), 119 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index adfbcc8498..20588eed9e 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1637,7 +1637,7 @@ future<> view_update_generator::mutate_MV( auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto& keyspace_name = mut.s->ks_name(); auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token); - auto remote_endpoints = _proxy.local().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name); + auto remote_endpoints = _proxy.local().local_db().find_keyspace(keyspace_name).get_effective_replication_map()->get_pending_endpoints(view_token); auto sem_units = pending_view_updates.split(mut.fm.representation().size()); const bool update_synchronously = should_update_synchronously(*mut.s); diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 0a30fe8a98..491d720337 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -281,14 +281,6 @@ public: size_t count_normal_token_owners() const; private: future<> update_normal_token_owners(); - - enum class endpoints_field { - pending_endpoints, - read_endpoints - }; - const std::unordered_set* maybe_migration_endpoints(endpoints_field field, - const token& token, - const sstring& keyspace_name) const; public: // returns empty vector if keyspace_name not found. 
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; @@ -998,50 +990,6 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) { _replacing_endpoints.erase(existing_node); } -const std::unordered_set* token_metadata_impl::maybe_migration_endpoints(endpoints_field field, - const token& token, - const sstring& keyspace_name) const -{ - // Fast path 0: migration_info not found for this keyspace_name - const auto migration_info_it = _keyspace_to_migration_info.find(keyspace_name); - if (migration_info_it == _keyspace_to_migration_info.end()) { - return nullptr; - } - - // Fast path 1: empty ring_mapping for this keyspace_name - const auto& migration_info = migration_info_it->second; - const auto& ring_mapping = field == endpoints_field::pending_endpoints - ? migration_info.pending_endpoints - : migration_info.read_endpoints; - if (ring_mapping.empty()) { - return nullptr; - } - - // Slow path: lookup remapping - const auto interval = range_to_interval(range(token)); - const auto it = ring_mapping.find(interval); - return it != ring_mapping.end() ? 
&it->second : nullptr; -} - -inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { - inet_address_vector_topology_change endpoints; - const auto* pending_endpoints = maybe_migration_endpoints(endpoints_field::pending_endpoints, - token, keyspace_name); - if (pending_endpoints) { - // interval_map does not work with std::vector, convert to inet_address_vector_topology_change - endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end()); - } - return endpoints; -} - -std::optional token_metadata_impl::endpoints_for_reading(const token& token, const sstring& keyspace_name) const { - const auto* endpoints = maybe_migration_endpoints(endpoints_field::read_endpoints, token, keyspace_name); - if (endpoints == nullptr) { - return std::nullopt; - } - return inet_address_vector_replica_set(endpoints->begin(), endpoints->end()); -} - std::map token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() const { std::map ret(_token_to_endpoint_map.begin(), _token_to_endpoint_map.end()); ret.insert(_bootstrap_tokens.begin(), _bootstrap_tokens.end()); @@ -1361,16 +1309,6 @@ token_metadata::count_normal_token_owners() const { return _impl->count_normal_token_owners(); } -inet_address_vector_topology_change -token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { - return _impl->pending_endpoints_for(token, keyspace_name); -} - -std::optional -token_metadata::endpoints_for_reading(const token& token, const sstring& keyspace_name) const { - return _impl->endpoints_for_reading(token, keyspace_name); -} - void token_metadata::set_read_new(read_new_t read_new) { _impl->set_read_new(read_new); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 947217a3cd..860ace552f 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -269,14 +269,6 @@ public: * Bootstrapping tokens are 
not taken into account. */ size_t count_normal_token_owners() const; - // returns empty vector if keyspace_name not found. - inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; - - // This function returns a list of nodes to which a read request should be directed. - // Returns not null only during topology changes, if _topology_change_stage == read_new and - // new set of replicas differs from the old one. - std::optional endpoints_for_reading(const token& token, const sstring& keyspace_name) const; - // Updates the read_new flag, switching read requests from // the old endpoints to the new ones during topology changes: // read_new_t::no - no read_endpoints will be stored on update_pending_ranges, all reads goes to normal endpoints; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 209a10baba..8531901af0 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -6008,7 +6008,7 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, i } inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const { - auto endpoints = erm.get_token_metadata_ptr()->endpoints_for_reading(token, ks_name); + auto endpoints = erm.get_endpoints_for_reading(token); if (!endpoints) { endpoints = erm.get_natural_endpoints_without_node_being_replaced(token); } diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 18db80dfbf..530e7f2f03 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -46,12 +46,13 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { {"replication_factor", "1"} }); token_metadata->add_bootstrap_token(t1, e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - 
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e1}); } @@ -70,16 +71,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { }); token_metadata->update_normal_tokens({t1}, e1).get(); token_metadata->add_bootstrap_token(t2, e2); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e2}); - 
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } @@ -103,16 +105,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { token_metadata->update_normal_tokens({t1, t1000}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_bootstrap_token(t100, e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), + 
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } @@ -137,16 +140,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->update_normal_tokens({t100}, e1).get(); token_metadata->add_leaving_endpoint(e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } @@ -173,20 +177,21 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { token_metadata->update_normal_tokens({t1, t100}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_replacing_endpoint(e3, e4); - 
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1000), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1000)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1001), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1001)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(10), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(10)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{}); } @@ -211,39 +216,46 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_bootstrap_token(t100, e1); - auto 
check_endpoints = [&](int64_t t, inet_address_vector_replica_set expected_replicas, + auto check_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t, + inet_address_vector_replica_set expected_replicas, seastar::compat::source_location sl = seastar::compat::source_location::current()) { BOOST_TEST_INFO("line: " << sl.line()); const auto expected_set = std::unordered_set(expected_replicas.begin(), expected_replicas.end()); - const auto actual_replicas = token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name); + const auto actual_replicas = erm->get_endpoints_for_reading(dht::token::from_int64(t)); BOOST_REQUIRE(actual_replicas.has_value()); const auto actual_set = std::unordered_set(actual_replicas->begin(), actual_replicas->end()); BOOST_REQUIRE_EQUAL(expected_set, actual_set); }; - auto check_no_endpoints = [&](int64_t t, + auto check_no_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t, seastar::compat::source_location sl = seastar::compat::source_location::current()) { BOOST_TEST_INFO("line: " << sl.line()); - BOOST_REQUIRE(!token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name).has_value()); + BOOST_REQUIRE(!erm->get_endpoints_for_reading(dht::token::from_int64(t)).has_value()); }; - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - check_no_endpoints(2); + { + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + check_no_endpoints(erm, 2); + } - token_metadata->set_read_new(locator::token_metadata::read_new_t::yes); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + { + token_metadata->set_read_new(locator::token_metadata::read_new_t::yes); + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); - check_endpoints(2, 
{e3, e1}); - check_endpoints(10, {e3, e1}); - check_endpoints(11, {e1, e2}); - check_endpoints(100, {e1, e2}); - check_no_endpoints(101); - check_no_endpoints(1001); - check_no_endpoints(1); + check_endpoints(erm, 2, {e3, e1}); + check_endpoints(erm, 10, {e3, e1}); + check_endpoints(erm, 11, {e1, e2}); + check_endpoints(erm, 100, {e1, e2}); + check_no_endpoints(erm, 101); + check_no_endpoints(erm, 1001); + check_no_endpoints(erm, 1); + } } SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { @@ -258,8 +270,9 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { }); token_metadata->update_normal_tokens({t1}, e1).get(); token_metadata->add_replacing_endpoint(e1, e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e1}); BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1); } From 8cb709d3d60babb923a4983fe301b7d7577b2faa Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 1 May 2023 17:55:27 +0400 Subject: [PATCH 16/21] token_metadata: drop update_pending_ranges The function storage_service::update_pending_ranges is turned to update_topology_changes_info. The pending_endpoints and read_endpoints will be computed later, when the erms are rebuilt. 
--- locator/token_metadata.cc | 157 ------------------------------------- locator/token_metadata.hh | 11 --- service/storage_service.cc | 62 +++++++-------- service/storage_service.hh | 4 +- 4 files changed, 29 insertions(+), 205 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 491d720337..00718108f4 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -246,20 +246,6 @@ public: public: bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const; - /** - * Calculate pending ranges according to bootstrapping, leaving and replacing nodes. - * - * We construct an updated version of the token_metadata by incorporating - * all proposed modifications (join, bootstrap, and replace operations). - * Subsequently, for each token range, we compare the outcomes of the calculate_natural_endpoints - * function applied to both the previous and the new token_metadata. - * Endpoints present in the updated version but absent in the original one - * ought to be appended to the pending_ranges. 
- */ - future<> update_pending_ranges( - const token_metadata& unpimplified_this, - const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack); - future<> update_topology_change_info(dc_rack_fn& get_dc_rack); const std::optional& get_topology_change_info() const { return _topology_change_info; @@ -737,144 +723,6 @@ token_metadata_impl::has_pending_ranges(sstring keyspace_name, inet_address endp return false; } -future<> token_metadata_impl::update_pending_ranges( - const token_metadata& unpimplified_this, - const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack) { - tlogger.debug("calculate_pending_ranges: keyspace_name={}, bootstrap_tokens={}, leaving nodes={}, replacing_endpoints={}", - keyspace_name, _bootstrap_tokens, _leaving_endpoints, _replacing_endpoints); - if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) { - tlogger.debug("No bootstrapping, leaving nodes, replacing nodes -> empty pending ranges for {}", keyspace_name); - _keyspace_to_migration_info.erase(keyspace_name); - return make_ready_future<>(); - } - - return async([this, &unpimplified_this, &strategy, keyspace_name, &get_dc_rack] () mutable { - // true if there is a node replaced with the same IP - bool replace_with_same_endpoint = false; - // new_token_metadata incorporates all the changes from leaving, bootstrapping and replacing - const auto new_token_metadata = token_metadata(std::invoke([&]() -> std::unique_ptr { - auto result = clone_only_token_map(false).get0(); - - // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints - std::unordered_map> new_normal_tokens; - if (!_replacing_endpoints.empty()) { - for (const auto& [token, inet_address]: _token_to_endpoint_map) { - const auto it = _replacing_endpoints.find(inet_address); - if (it == _replacing_endpoints.end()) { - continue; - } - new_normal_tokens[it->second].insert(token); - } - 
for (const auto& [replace_from, replace_to]: _replacing_endpoints) { - if (replace_from == replace_to) { - replace_with_same_endpoint = true; - } else { - result->remove_endpoint(replace_from); - } - } - } - for (const auto& [token, inet_address]: _bootstrap_tokens) { - new_normal_tokens[inet_address].insert(token); - } - // apply new_normal_tokens - for (auto& [endpoint, tokens]: new_normal_tokens) { - result->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal); - result->update_normal_tokens(std::move(tokens), endpoint).get(); - } - // apply leaving endpoints - for (const auto& endpoint: _leaving_endpoints) { - result->remove_endpoint(endpoint); - } - result->sort_tokens(); - return result; - })); - - // We require a distinct token_metadata instance when replace_from equals replace_to, - // as it ensures the node is included in pending_ranges. - // Otherwise, the node would be excluded from both pending_ranges and - // get_natural_endpoints_without_node_being_replaced, - // causing the coordinator to overlook it entirely. 
- const token_metadata* base_token_metadata; - std::optional self_copy; - if (replace_with_same_endpoint) { - self_copy = token_metadata(std::invoke([&]() -> std::unique_ptr { - auto result = clone_only_token_map(false).get0(); - for (const auto& [replace_from, replace_to]: _replacing_endpoints) { - if (replace_from == replace_to) { - result->remove_endpoint(replace_from); - } - } - result->sort_tokens(); - return result; - })); - base_token_metadata = &*self_copy; - } else { - base_token_metadata = &unpimplified_this; - } - - // merge tokens from token_to_endpoint and bootstrap_tokens, - // preserving tokens of leaving endpoints - const auto tokens = std::invoke([&]() -> std::vector { - auto tokens = std::vector(); - tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size()); - tokens.resize(sorted_tokens().size()); - std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(tokens)); - for (const auto& p: get_bootstrap_tokens()) { - tokens.push_back(p.first); - } - std::sort(begin(tokens), end(tokens)); - return tokens; - }); - - _keyspace_to_migration_info[keyspace_name] = std::invoke([&]() -> migration_info { - migration_info migration_info; - for (size_t i = 0, size = tokens.size(); i < size; ++i) { - seastar::thread::maybe_yield(); - - const auto token = tokens[i]; - - const auto old_endpoints = strategy.calculate_natural_endpoints(token, *base_token_metadata).get0(); - auto new_endpoints = strategy.calculate_natural_endpoints(token, new_token_metadata).get0(); - - auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { - using interval = ring_mapping::interval_type; - if (i == 0) { - target += std::make_pair( - interval::open(tokens.back(), dht::maximum_token()), - endpoints); - target += std::make_pair( - interval::left_open(dht::minimum_token(), token), - std::move(endpoints)); - } else { - target += std::make_pair( - interval::left_open(tokens[i - 1], token), - std::move(endpoints)); - } - }; - - std::unordered_set 
pending_endpoints; - for (const auto& e: new_endpoints) { - if (!old_endpoints.contains(e)) { - pending_endpoints.insert(e); - } - } - if (!pending_endpoints.empty()) { - add_mapping(migration_info.pending_endpoints, std::move(pending_endpoints)); - } - - // in order not to waste memory, we update read_endpoints only if the - // new endpoints differs from the old one - if (_read_new == token_metadata::read_new_t::yes && - new_endpoints.get_vector() != old_endpoints.get_vector()) - { - add_mapping(migration_info.read_endpoints, std::move(new_endpoints).extract_set()); - } - } - return migration_info; - }); - }); -} - future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) { if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) { co_await utils::clear_gently(_topology_change_info); @@ -1279,11 +1127,6 @@ token_metadata::has_pending_ranges(sstring keyspace_name, inet_address endpoint) return _impl->has_pending_ranges(std::move(keyspace_name), endpoint); } -future<> -token_metadata::update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack) { - return _impl->update_pending_ranges(*this, strategy, keyspace_name, get_dc_rack); -} - future<> token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) { return _impl->update_topology_change_info(get_dc_rack); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 860ace552f..192cf2a9bf 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -245,17 +245,6 @@ public: static range interval_to_range(boost::icl::interval::interval_type i); bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const; - /** - * Calculate pending ranges according to bootstrapping, leaving and replacing nodes. - * - * We construct an updated version of the token_metadata by incorporating - * all proposed modifications (join, bootstrap, and replace operations). 
- * Subsequently, for each token range, we compare the outcomes of the calculate_natural_endpoints - * function applied to both the previous and the new token_metadata. - * Endpoints present in the updated version but absent in the original one - * ought to be appended to the pending_ranges. - */ - future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack); future<> update_topology_change_info(dc_rack_fn& get_dc_rack); diff --git a/service/storage_service.cc b/service/storage_service.cc index ddefa87bb7..79a2faee27 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -263,7 +263,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela slogger.info("waiting for schema information to complete"); co_await sleep_abortable(std::chrono::seconds(1), _abort_source); } - co_await update_pending_ranges("joining"); + co_await update_topology_change_info("joining"); auto tmptr = get_token_metadata_ptr(); if (!_db.local().get_config().consistent_rangemovement() || @@ -397,7 +397,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); } else { tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip); - co_await update_pending_ranges(tmptr, ::format("bootstrapping node {}/{}", id, ip)); + co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip)); } break; case node_state::decommissioning: @@ -406,7 +406,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); tmptr->update_host_id(host_id, ip); tmptr->add_leaving_endpoint(ip); - co_await update_pending_ranges(tmptr, ::format("{} {}/{}", rs.state, id, ip)); + co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip)); break; case node_state::replacing: { 
assert(_topology_state_machine._topology.req_param.contains(id)); @@ -419,7 +419,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s assert(existing_ip); tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); tmptr->add_replacing_endpoint(*existing_ip, ip); - co_await update_pending_ranges(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip)); + co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip)); } break; case node_state::rebuilding: @@ -1912,7 +1912,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st auto endpoint = get_broadcast_address(); tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::joining); tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint); - return update_pending_ranges(std::move(tmptr), ::format("bootstrapping node {}", endpoint)); + return update_topology_change_info(std::move(tmptr), ::format("bootstrapping node {}", endpoint)); }).get(); } @@ -2012,7 +2012,7 @@ future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_t replacing_node, std::current_exception()); } slogger.info("handle_state_replacing: Update pending ranges for replacing node {}", replacing_node); - co_await update_pending_ranges(tmptr, ::format("handle_state_replacing {}", replacing_node)); + co_await update_topology_change_info(tmptr, ::format("handle_state_replacing {}", replacing_node)); } future<> storage_service::handle_state_bootstrap(inet_address endpoint) { @@ -2044,7 +2044,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) { if (_gossiper.uses_host_id(endpoint)) { tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint); } - co_await update_pending_ranges(tmptr, ::format("handle_state_bootstrap {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("handle_state_bootstrap {}", 
endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); } @@ -2186,7 +2186,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) { co_await tmptr->update_normal_tokens(owned_tokens, endpoint); } - co_await update_pending_ranges(tmptr, ::format("handle_state_normal {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("handle_state_normal {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); tmlock.reset(); @@ -2251,7 +2251,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) { // normally tmptr->add_leaving_endpoint(endpoint); - co_await update_pending_ranges(tmptr, ::format("handle_state_leaving", endpoint)); + co_await update_topology_change_info(tmptr, ::format("handle_state_leaving {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); } @@ -2314,7 +2314,7 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect slogger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint); // Note that the endpoint is being removed tmptr->add_leaving_endpoint(endpoint); - return update_pending_ranges(std::move(tmptr), ::format("handle_state_removing {}", endpoint)); + return update_topology_change_info(std::move(tmptr), ::format("handle_state_removing {}", endpoint)); }); // find the endpoint coordinating this removal that we need to notify when we're done auto* value = _gossiper.get_application_state_ptr(endpoint, application_state::REMOVAL_COORDINATOR); @@ -2464,7 +2464,7 @@ future<> storage_service::on_remove(gms::inet_address endpoint) { auto tmlock = co_await get_token_metadata_lock(); auto tmptr = co_await get_mutable_token_metadata_ptr(); tmptr->remove_endpoint(endpoint); - co_await update_pending_ranges(tmptr, ::format("on_remove {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("on_remove {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); } @@ -3442,7 +3442,7 @@ future<>
storage_service::decommission() { throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode)); } - ss.update_pending_ranges(::format("decommission {}", endpoint)).get(); + ss.update_topology_change_info(::format("decommission {}", endpoint)).get(); auto non_system_keyspaces = db.get_non_local_vnode_based_strategy_keyspaces(); for (const auto& keyspace_name : non_system_keyspaces) { @@ -3991,7 +3991,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->add_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("removenode {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes)); }).get(); node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { @@ -3999,7 +3999,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->del_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("removenode {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes)); }); }); } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { @@ -4039,7 +4039,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->add_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("decommission {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes)); }).get(); 
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { @@ -4047,7 +4047,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->del_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("decommission {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes)); }); }); } else if (req.cmd == node_ops_cmd::decommission_heartbeat) { @@ -4109,7 +4109,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); tmptr->del_replacing_endpoint(existing_node); } - return update_pending_ranges(tmptr, ::format("replace {}", req.replace_nodes)); + return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes)); }); }); } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { @@ -4126,7 +4126,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad // Update the pending_ranges for the replacing node slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); mutate_token_metadata([&req, this] (mutable_token_metadata_ptr tmptr) mutable { - return update_pending_ranges(tmptr, ::format("replace {}", req.replace_nodes)); + return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes)); }).get(); } else if (req.cmd == node_ops_cmd::replace_heartbeat) { slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); @@ -4151,7 +4151,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad 
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining); tmptr->add_bootstrap_tokens(tokens, endpoint); } - return update_pending_ranges(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); + return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); }).get(); node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { @@ -4161,7 +4161,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); tmptr->remove_bootstrap_tokens(tokens); } - return update_pending_ranges(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); + return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); }); }); } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) { @@ -4538,7 +4538,7 @@ future<> storage_service::excise(std::unordered_set tokens, inet_address tmptr->remove_endpoint(endpoint); tmptr->remove_bootstrap_tokens(tokens); - co_await update_pending_ranges(tmptr, ::format("excise {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); tmlock.reset(); @@ -4584,7 +4584,7 @@ future<> storage_service::leave_ring() { co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) { auto endpoint = get_broadcast_address(); tmptr->remove_endpoint(endpoint); - return update_pending_ranges(std::move(tmptr), ::format("leave_ring {}", endpoint)); + return update_topology_change_info(std::move(tmptr), ::format("leave_ring {}", endpoint)); }); auto expire_time = _gossiper.compute_expire_time().time_since_epoch().count(); @@ -4746,30 +4746,22 @@ future<> 
storage_service::mutate_token_metadata(std::function (mutable_ co_await replicate_to_all_cores(std::move(tmptr)); } -future<> storage_service::update_pending_ranges(mutable_token_metadata_ptr tmptr, sstring reason) { +future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) { assert(this_shard_id() == 0); try { locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); }); co_await tmptr->update_topology_change_info(get_dc_rack_from_gossiper); - - auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms(); - for (const auto& [keyspace_name, erm] : ks_erms) { - auto& strategy = erm->get_replication_strategy(); - slogger.debug("Updating pending ranges for keyspace={} starts ({})", keyspace_name, reason); - co_await tmptr->update_pending_ranges(strategy, keyspace_name, get_dc_rack_from_gossiper); - slogger.debug("Updating pending ranges for keyspace={} ends ({})", keyspace_name, reason); - } } catch (...) { auto ep = std::current_exception(); - slogger.error("Failed to update pending ranges for {}: {}", reason, ep); + slogger.error("Failed to update topology change info for {}: {}", reason, ep); std::rethrow_exception(std::move(ep)); } } -future<> storage_service::update_pending_ranges(sstring reason, acquire_merge_lock acquire_merge_lock) { +future<> storage_service::update_topology_change_info(sstring reason, acquire_merge_lock acquire_merge_lock) { return mutate_token_metadata([this, reason = std::move(reason)] (mutable_token_metadata_ptr tmptr) mutable { - return update_pending_ranges(std::move(tmptr), std::move(reason)); + return update_topology_change_info(std::move(tmptr), std::move(reason)); }, acquire_merge_lock); } @@ -4777,7 +4769,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) { // Update pending ranges since keyspace can be changed after we calculate pending ranges. 
sstring reason = ::format("keyspace {}", ks_name); return container().invoke_on(0, [reason = std::move(reason)] (auto& ss) mutable { - return ss.update_pending_ranges(reason, acquire_merge_lock::no).handle_exception([reason = std::move(reason)] (auto ep) { + return ss.update_topology_change_info(reason, acquire_merge_lock::no).handle_exception([reason = std::move(reason)] (auto ep) { slogger.warn("Failure to update pending ranges for {} ignored", reason); }); }); diff --git a/service/storage_service.hh b/service/storage_service.hh index f745bfe61e..1ff0024913 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -190,8 +190,8 @@ private: // Update pending ranges locally and then replicate to all cores. // Should be serialized under token_metadata_lock. // Must be called on shard 0. - future<> update_pending_ranges(mutable_token_metadata_ptr tmptr, sstring reason); - future<> update_pending_ranges(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes); + future<> update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason); + future<> update_topology_change_info(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes); future<> keyspace_changed(const sstring& ks_name); void register_metrics(); future<> snitch_reconfigured(); From 5495065242b880290691a96c6c8523a1dba708d2 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 28 Apr 2023 22:21:28 +0400 Subject: [PATCH 17/21] effective_replication_map: add has_pending_ranges We add the has_pending_ranges function to erm. The implementation for vnode is similar to that of token_metadata. For tablets, we add new code that checks if the given endpoint is contained in tablet_map::_transitions. 
--- locator/abstract_replication_strategy.cc | 10 ++++++++++ locator/abstract_replication_strategy.hh | 6 ++++++ locator/tablets.cc | 13 +++++++++++++ locator/tablets.hh | 4 ++++ 4 files changed, 33 insertions(+) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 82ac611fa7..33cd81f442 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -120,6 +120,16 @@ std::optional vnode_effective_replication_map:: return inet_address_vector_replica_set(endpoints->begin(), endpoints->end()); } +bool vnode_effective_replication_map::has_pending_ranges(inet_address endpoint) const { + for (const auto& item : _pending_endpoints) { + const auto& nodes = item.second; + if (nodes.contains(endpoint)) { + return true; + } + } + return false; +} + std::unique_ptr vnode_effective_replication_map::make_splitter() const { return locator::make_splitter(_tmptr); } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index c7556d3485..744eb172d6 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -205,6 +205,11 @@ public: /// new set of replicas differs from the old one. virtual std::optional get_endpoints_for_reading(const token& search_token) const = 0; + /// Returns true if there are any pending ranges for this endpoint. + /// This operation is expensive, for vnode_erm it iterates + /// over all pending ranges which is O(number of tokens). + virtual bool has_pending_ranges(inet_address endpoint) const = 0; + /// Returns a token_range_splitter which is line with the replica assignment of this replication map. /// The splitter can live longer than this instance. 
virtual std::unique_ptr make_splitter() const = 0; @@ -267,6 +272,7 @@ public: // effective_replication_map inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override; inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override; std::optional get_endpoints_for_reading(const token& search_token) const override; + bool has_pending_ranges(inet_address endpoint) const override; std::unique_ptr make_splitter() const override; public: explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, diff --git a/locator/tablets.cc b/locator/tablets.cc index 7fa4f6608d..5a732d80bf 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -244,6 +244,19 @@ public: return std::nullopt; } + virtual bool has_pending_ranges(inet_address endpoint) const override { + const auto host_id = _tmptr->get_host_id_if_known(endpoint); + if (!host_id.has_value()) { + return false; + } + for (const auto& [id, transition_info]: get_tablet_map().transitions()) { + if (transition_info.pending_replica.host == *host_id) { + return true; + } + } + return false; + } + virtual std::unique_ptr make_splitter() const override { class splitter : public token_range_splitter { token_metadata_ptr _tmptr; // To keep the tablet map alive. diff --git a/locator/tablets.hh b/locator/tablets.hh index ae57cb2724..ddb812cd15 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -152,6 +152,10 @@ public: return _tablets; } + const auto& transitions() const { + return _transitions; + } + /// Returns an iterable range over tablet_id:s which includes all tablets in token ring order. 
auto tablet_ids() const { return boost::irange(0, tablet_count()) | boost::adaptors::transformed([] (size_t i) { From 5976277c2cef5a3ffe7acd17f28d667fad5c6def Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 28 Apr 2023 22:30:12 +0400 Subject: [PATCH 18/21] token_metadata: drop has_pending_ranges and migration_info Use the new erm::has_pending_ranges function, drop the old implementation from token_metadata. --- locator/token_metadata.cc | 42 -------------------------------------- locator/token_metadata.hh | 2 -- service/storage_service.cc | 4 ++-- 3 files changed, 2 insertions(+), 46 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 00718108f4..7574fcd78e 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -61,21 +61,6 @@ private: std::optional _topology_change_info; - using ring_mapping = boost::icl::interval_map>; - // For each keyspace, migration_info contains ranges of tokens and - // corresponding replicas to which writes or reads will be directed: - // - pending_endpoints - will be appended to normal endpoints for writes; - // - read_endpoints - will completely replace normal endpoints for reads. - // This data structure is filled only during data migration between nodes - // when they are added or removed from the cluster. - // During normal operation, token mapping to nodes is - // implemented in the effective_replication_map. 
- struct migration_info { - ring_mapping pending_endpoints; - ring_mapping read_endpoints; - }; - std::unordered_map _keyspace_to_migration_info; - std::vector _sorted_tokens; tablet_metadata _tablets; @@ -244,8 +229,6 @@ public: static range interval_to_range(boost::icl::interval::interval_type i); public: - bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const; - future<> update_topology_change_info(dc_rack_fn& get_dc_rack); const std::optional& get_topology_change_info() const { return _topology_change_info; @@ -344,10 +327,6 @@ future> token_metadata_impl::clone_async() } ret->_leaving_endpoints = _leaving_endpoints; ret->_replacing_endpoints = _replacing_endpoints; - for (const auto& p : _keyspace_to_migration_info) { - ret->_keyspace_to_migration_info.emplace(p); - co_await coroutine::maybe_yield(); - } ret->_ring_version = _ring_version; co_return ret; } @@ -376,7 +355,6 @@ future<> token_metadata_impl::clear_gently() noexcept { co_await utils::clear_gently(_bootstrap_tokens); co_await utils::clear_gently(_leaving_endpoints); co_await utils::clear_gently(_replacing_endpoints); - co_await utils::clear_gently(_keyspace_to_migration_info); co_await utils::clear_gently(_sorted_tokens); co_await _topology.clear_gently(); co_await _tablets.clear_gently(); @@ -708,21 +686,6 @@ token_metadata_impl::interval_to_range(boost::icl::interval::interval_typ return range({{i.lower(), start_inclusive}}, {{i.upper(), end_inclusive}}); } -bool -token_metadata_impl::has_pending_ranges(sstring keyspace_name, inet_address endpoint) const { - const auto it = _keyspace_to_migration_info.find(keyspace_name); - if (it == _keyspace_to_migration_info.end()) { - return false; - } - for (const auto& item : it->second.pending_endpoints) { - const auto& nodes = item.second; - if (nodes.contains(endpoint)) { - return true; - } - } - return false; -} - future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) { if (_bootstrap_tokens.empty() && 
_leaving_endpoints.empty() && _replacing_endpoints.empty()) { co_await utils::clear_gently(_topology_change_info); @@ -1122,11 +1085,6 @@ token_metadata::interval_to_range(boost::icl::interval::interval_type i) return token_metadata_impl::interval_to_range(std::move(i)); } -bool -token_metadata::has_pending_ranges(sstring keyspace_name, inet_address endpoint) const { - return _impl->has_pending_ranges(std::move(keyspace_name), endpoint); -} - future<> token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) { return _impl->update_topology_change_info(get_dc_rack); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 192cf2a9bf..e6f8d8a736 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -244,8 +244,6 @@ public: static boost::icl::interval::interval_type range_to_interval(range r); static range interval_to_range(boost::icl::interval::interval_type i); - bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const; - future<> update_topology_change_info(dc_rack_fn& get_dc_rack); const std::optional& get_topology_change_info() const; diff --git a/service/storage_service.cc b/service/storage_service.cc index 79a2faee27..0ced4c3cc5 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3446,7 +3446,7 @@ future<> storage_service::decommission() { auto non_system_keyspaces = db.get_non_local_vnode_based_strategy_keyspaces(); for (const auto& keyspace_name : non_system_keyspaces) { - if (ss.get_token_metadata().has_pending_ranges(keyspace_name, ss.get_broadcast_address())) { + if (ss._db.local().find_keyspace(keyspace_name).get_effective_replication_map()->has_pending_ranges(ss.get_broadcast_address())) { throw std::runtime_error("data is currently moving to this node; unable to leave the ring"); } } @@ -5374,7 +5374,7 @@ future<> storage_service::wait_for_normal_state_handled_on_boot(const std::unord future storage_service::is_cleanup_allowed(sstring keyspace) { return 
container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) { auto my_address = ss.get_broadcast_address(); - auto pending_ranges = ss.get_token_metadata().has_pending_ranges(keyspace, my_address); + auto pending_ranges = ss._db.local().find_keyspace(keyspace).get_effective_replication_map()->has_pending_ranges(my_address); bool is_bootstrap_mode = ss._operation_mode == mode::BOOTSTRAP; slogger.debug("is_cleanup_allowed: keyspace={}, is_bootstrap_mode={}, pending_ranges={}", keyspace, is_bootstrap_mode, pending_ranges); From e9a6fcc8e146f623b738141ce657da08895f5e8a Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 16 May 2023 14:23:10 +0400 Subject: [PATCH 19/21] token_metadata_test: refactor tests, extract create_erm No logical changes, just tidied up --- test/boost/token_metadata_test.cc | 80 +++++++++++-------------------- 1 file changed, 29 insertions(+), 51 deletions(-) diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 530e7f2f03..ee71474809 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -33,21 +33,25 @@ namespace { } }); } + + template + mutable_vnode_erm_ptr create_erm(mutable_token_metadata_ptr tmptr, replication_strategy_config_options opts = {}) { + dc_rack_fn get_dc_rack_fn = get_dc_rack; + tmptr->update_topology_change_info(get_dc_rack_fn).get(); + auto strategy = seastar::make_shared(std::move(opts)); + return calculate_effective_replication_map(std::move(strategy), std::move(tmptr)).get0(); + } } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto e1 = inet_address("192.168.0.1"); const auto t1 = dht::token::from_int64(1); + auto token_metadata = create_token_metadata(e1); token_metadata->update_topology(e1, get_dc_rack(e1)); - const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { - {"replication_factor", "1"} - }); 
token_metadata->add_bootstrap_token(t1, e1); - token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + + auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), inet_address_vector_topology_change{e1}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), @@ -57,22 +61,18 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto e1 = inet_address("192.168.0.1"); const auto t1 = dht::token::from_int64(1); const auto e2 = inet_address("192.168.0.2"); const auto t2 = dht::token::from_int64(100); + auto token_metadata = create_token_metadata(e1); token_metadata->update_topology(e1, get_dc_rack(e1)); token_metadata->update_topology(e2, get_dc_rack(e2)); - const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { - {"replication_factor", "1"} - }); token_metadata->update_normal_tokens({t1}, e1).get(); token_metadata->add_bootstrap_token(t2, e2); - token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + + auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), inet_address_vector_topology_change{}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), @@ -86,8 +86,6 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); 
const auto t100 = dht::token::from_int64(100); @@ -95,18 +93,16 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); + auto token_metadata = create_token_metadata(e1); token_metadata->update_topology(e1, get_dc_rack(e1)); token_metadata->update_topology(e2, get_dc_rack(e2)); token_metadata->update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { - {"replication_factor", "2"} - }); token_metadata->update_normal_tokens({t1, t1000}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_bootstrap_token(t100, e1); - token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), @@ -120,8 +116,6 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); const auto t100 = dht::token::from_int64(100); @@ -129,19 +123,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); + auto token_metadata = create_token_metadata(e1); token_metadata->update_topology(e1, get_dc_rack(e1)); token_metadata->update_topology(e2, get_dc_rack(e2)); token_metadata->update_topology(e3, 
get_dc_rack(e3)); - const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { - {"replication_factor", "2"} - }); token_metadata->update_normal_tokens({t1, t1000}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->update_normal_tokens({t100}, e1).get(); token_metadata->add_leaving_endpoint(e1); - token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), @@ -155,8 +147,6 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); const auto t100 = dht::token::from_int64(100); @@ -165,20 +155,18 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); const auto e4 = inet_address("192.168.0.4"); + auto token_metadata = create_token_metadata(e1); token_metadata->update_topology(e1, get_dc_rack(e1)); token_metadata->update_topology(e2, get_dc_rack(e2)); token_metadata->update_topology(e3, get_dc_rack(e3)); token_metadata->update_topology(e4, get_dc_rack(e4)); - const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { - {"replication_factor", "2"} - }); token_metadata->update_normal_tokens({t1000}, e1).get(); token_metadata->update_normal_tokens({t1, t100}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_replacing_endpoint(e3, e4); - 
token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1000)), @@ -196,8 +184,6 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { } SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); const auto t100 = dht::token::from_int64(100); @@ -205,13 +191,11 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); + auto token_metadata = create_token_metadata(e1); token_metadata->update_topology(e1, get_dc_rack(e1)); token_metadata->update_topology(e2, get_dc_rack(e2)); token_metadata->update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { - {"replication_factor", "2"} - }); token_metadata->update_normal_tokens({t1, t1000}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_bootstrap_token(t100, e1); @@ -238,15 +222,13 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas }; { - token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); check_no_endpoints(erm, 2); } { token_metadata->set_read_new(locator::token_metadata::read_new_t::yes); - 
token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); check_endpoints(erm, 2, {e3, e1}); check_endpoints(erm, 10, {e3, e1}); @@ -259,19 +241,15 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas } SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto e1 = inet_address("192.168.0.1"); + auto token_metadata = create_token_metadata(e1); token_metadata->update_topology(e1, get_dc_rack(e1)); - const auto replication_strategy = seastar::make_shared(replication_strategy_config_options { - {"replication_factor", "2"} - }); token_metadata->update_normal_tokens({t1}, e1).get(); token_metadata->add_replacing_endpoint(e1, e1); - token_metadata->update_topology_change_info(get_dc_rack_fn).get(); - auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e1}); BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1); From 8877641b0f0cee2791a321cf0928a857ccccfbcc Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 16 May 2023 14:24:05 +0400 Subject: [PATCH 20/21] token_metadata_test: check read_endpoints when bootstrapping first node --- test/boost/token_metadata_test.cc | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index ee71474809..b00a565f01 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -43,7 +43,7 @@ namespace { } } 
-SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { +SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_bootstrap_first_node) { const auto e1 = inet_address("192.168.0.1"); const auto t1 = dht::token::from_int64(1); @@ -51,13 +51,21 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { token_metadata->update_topology(e1, get_dc_rack(e1)); token_metadata->add_bootstrap_token(t1, e1); - auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); - BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), - inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), - inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), - inet_address_vector_topology_change{e1}); + { + auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), + inet_address_vector_topology_change{e1}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), + inet_address_vector_topology_change{e1}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), + inet_address_vector_topology_change{e1}); + BOOST_REQUIRE(!erm->get_endpoints_for_reading(t1).has_value()); + } + { + token_metadata->set_read_new(token_metadata::read_new_t::yes); + auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); + BOOST_REQUIRE_EQUAL(erm->get_endpoints_for_reading(t1), inet_address_vector_replica_set{e1}); + } } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { From 095f35a47d4dafe5f08f8a76c1fb49b16c2b7592 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 16 May 2023 14:25:11 +0400 Subject: [PATCH 21/21] token_metadata_test: add a test for everywhere strategy --- test/boost/token_metadata_test.cc | 21 +++++++++++++++++++++ 1 file changed, 21 
insertions(+) diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index b00a565f01..2bf1333914 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -11,6 +11,7 @@ #include "utils/fb_utilities.hh" #include "locator/token_metadata.hh" #include "locator/simple_strategy.hh" +#include "locator/everywhere_replication_strategy.hh" using namespace locator; @@ -68,6 +69,26 @@ SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_bootstrap_first_nod } } +SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy) { + const auto e1 = inet_address("192.168.0.1"); + const auto e2 = inet_address("192.168.0.2"); + const auto t1 = dht::token::from_int64(10); + const auto t2 = dht::token::from_int64(20); + + auto token_metadata = create_token_metadata(e1); + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_normal_tokens({t1}, e1).get(); + token_metadata->add_bootstrap_token(t2, e2); + token_metadata->set_read_new(token_metadata::read_new_t::yes); + + auto erm = create_erm(token_metadata); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(t2), + inet_address_vector_topology_change{e2}); + BOOST_REQUIRE_EQUAL(erm->get_endpoints_for_reading(t2), + (inet_address_vector_replica_set{e2, e1})); +} + SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { const auto e1 = inet_address("192.168.0.1"); const auto t1 = dht::token::from_int64(1);