diff --git a/db/view/view.cc b/db/view/view.cc index adfbcc8498..20588eed9e 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1637,7 +1637,7 @@ future<> view_update_generator::mutate_MV( auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto& keyspace_name = mut.s->ks_name(); auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token); - auto remote_endpoints = _proxy.local().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name); + auto remote_endpoints = _proxy.local().local_db().find_keyspace(keyspace_name).get_effective_replication_map()->get_pending_endpoints(view_token); auto sem_units = pending_view_updates.split(mut.fm.representation().size()); const bool update_synchronously = should_update_synchronously(*mut.s); diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index d02dd45c3d..33cd81f442 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -64,23 +64,6 @@ sstring abstract_replication_strategy::to_qualified_class_name(std::string_view return strategy_class_registry::to_qualified_class_name(strategy_class_name); } -inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const { - const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token); - auto res = erm.get_replication_map().find(key_token); - return res->second; -} - -stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function& func) const { - const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token); - auto res = erm.get_replication_map().find(key_token); - for (const auto& ep : res->second) { - if (func(ep) == stop_iteration::yes) { - return stop_iteration::yes; - } - } - return stop_iteration::no; -} - inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints_without_node_being_replaced(const token& search_token) const { inet_address_vector_replica_set natural_endpoints = get_natural_endpoints(search_token); maybe_remove_node_being_replaced(*_tmptr, *_rs, natural_endpoints); @@ -110,8 +93,41 @@ void maybe_remove_node_being_replaced(const token_metadata& tm, } } -inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token, const sstring& ks_name) const { - return _tmptr->pending_endpoints_for(search_token, ks_name); +static const std::unordered_set* find_token(const ring_mapping& ring_mapping, const token& token) { + if (ring_mapping.empty()) { + return nullptr; + } + const auto interval = token_metadata::range_to_interval(range(token)); + const auto it = ring_mapping.find(interval); + return it != ring_mapping.end() ? &it->second : nullptr; +} + +inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token) const { + inet_address_vector_topology_change endpoints; + const auto* pending_endpoints = find_token(_pending_endpoints, search_token); + if (pending_endpoints) { + // interval_map does not work with std::vector, convert to inet_address_vector_topology_change + endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end()); + } + return endpoints; +} + +std::optional vnode_effective_replication_map::get_endpoints_for_reading(const token& token) const { + const auto* endpoints = find_token(_read_endpoints, token); + if (endpoints == nullptr) { + return {}; + } + return inet_address_vector_replica_set(endpoints->begin(), endpoints->end()); +} + +bool vnode_effective_replication_map::has_pending_ranges(inet_address endpoint) const { + for (const auto& item : _pending_endpoints) { + const auto& nodes = item.second; + if (nodes.contains(endpoint)) { + return true; + } + } + return false; } std::unique_ptr vnode_effective_replication_map::make_splitter() const { @@ -287,10 +303,10 @@ future> vnode_effective_replication_map::get_range_addresses() const { const token_metadata& tm = *_tmptr; std::unordered_map ret; - for (const auto& [t, eps] : _replication_map) { + for (auto& t : tm.sorted_tokens()) { dht::token_range_vector ranges = tm.get_primary_ranges_for(t); for (auto& r : ranges) { - ret.emplace(r, eps); + ret.emplace(r, get_natural_endpoints(t)); } co_await coroutine::maybe_yield(); } @@ -328,65 +344,138 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p co_return ret; } +static const auto default_replication_map_key = dht::token::from_int64(0); + future calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) { replication_map replication_map; + ring_mapping pending_endpoints; + ring_mapping read_endpoints; + const auto depend_on_token = rs->natural_endpoints_depend_on_token(); const auto& sorted_tokens = tmptr->sorted_tokens(); + replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1); + if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) { + const auto& all_tokens = topology_changes->all_tokens; + const auto& base_token_metadata = topology_changes->base_token_metadata + ? *topology_changes->base_token_metadata + : *tmptr; + const auto& current_tokens = tmptr->get_token_to_endpoint(); + for (size_t i = 0, size = all_tokens.size(); i < size; ++i) { + co_await coroutine::maybe_yield(); - if (!sorted_tokens.empty()) { - replication_map.reserve(sorted_tokens.size()); - if (rs->natural_endpoints_depend_on_token()) { - for (const auto &t : sorted_tokens) { - auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); - replication_map.emplace(t, eps.get_vector()); + const auto token = all_tokens[i]; + + auto current_endpoints = co_await rs->calculate_natural_endpoints(token, base_token_metadata); + auto target_endpoints = co_await rs->calculate_natural_endpoints(token, topology_changes->target_token_metadata); + + auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { + using interval = ring_mapping::interval_type; + if (!depend_on_token) { + target += std::make_pair( + interval::open(dht::minimum_token(), dht::maximum_token()), + std::move(endpoints)); + } else if (i == 0) { + target += std::make_pair( + interval::open(all_tokens.back(), dht::maximum_token()), + endpoints); + target += std::make_pair( + interval::left_open(dht::minimum_token(), token), + std::move(endpoints)); + } else { + target += std::make_pair( + interval::left_open(all_tokens[i - 1], token), + std::move(endpoints)); + } + }; + + { + std::unordered_set endpoints_diff; + for (const auto& e: target_endpoints) { + if (!current_endpoints.contains(e)) { + endpoints_diff.insert(e); + } + } + if (!endpoints_diff.empty()) { + add_mapping(pending_endpoints, std::move(endpoints_diff)); + } } - } else { - auto eps = co_await rs->calculate_natural_endpoints(sorted_tokens.front(), *tmptr); - for (const auto &t : sorted_tokens) { - replication_map.emplace(t, eps.get_vector()); - co_await coroutine::maybe_yield(); + + // in order not to waste memory, we update read_endpoints only if the + // new endpoints differs from the old one + if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) { + add_mapping(read_endpoints, std::move(target_endpoints).extract_set()); + } + + if (!depend_on_token) { + replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector()); + break; + } else if (current_tokens.contains(token)) { + replication_map.emplace(token, std::move(current_endpoints).extract_vector()); } } + } else if (depend_on_token) { + for (const auto &t : sorted_tokens) { + auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); + replication_map.emplace(t, std::move(eps).extract_vector()); + } + } else { + auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr); + replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector()); } auto rf = rs->get_replication_factor(*tmptr); - co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), rf); + co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), + std::move(pending_endpoints), std::move(read_endpoints), rf); } -future vnode_effective_replication_map::clone_endpoints_gently() const { - replication_map cloned_endpoints; +auto vnode_effective_replication_map::clone_data_gently() const -> future> { + auto result = std::make_unique(); for (auto& i : _replication_map) { - cloned_endpoints.emplace(i.first, i.second); + result->replication_map.emplace(i.first, i.second); co_await coroutine::maybe_yield(); } - co_return cloned_endpoints; + for (const auto& i : _pending_endpoints) { + result->pending_endpoints += i; + co_await coroutine::maybe_yield(); + } + + for (const auto& i : _read_endpoints) { + result->read_endpoints += i; + co_await coroutine::maybe_yield(); + } + co_return std::move(result); +} + +const inet_address_vector_replica_set& vnode_effective_replication_map::do_get_natural_endpoints(const token& search_token) const { + const token& key_token = _rs->natural_endpoints_depend_on_token() + ? _tmptr->first_token(search_token) + : default_replication_map_key; + const auto it = _replication_map.find(key_token); + return it->second; } inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const { - return _rs->get_natural_endpoints(search_token, *this); + return do_get_natural_endpoints(search_token); } stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& search_token, const noncopyable_function& func) const { - return _rs->for_each_natural_endpoint_until(search_token, *this, func); -} - -future<> vnode_effective_replication_map::clear_gently() noexcept { - co_await utils::clear_gently(_replication_map); - co_await utils::clear_gently(_tmptr); + for (const auto& ep : do_get_natural_endpoints(search_token)) { + if (func(ep) == stop_iteration::yes) { + return stop_iteration::yes; + } + } + return stop_iteration::no; } vnode_effective_replication_map::~vnode_effective_replication_map() { if (is_registered()) { _factory->erase_effective_replication_map(this); try { - struct background_clear_holder { - locator::replication_map replication_map; - locator::token_metadata_ptr tmptr; - }; - auto holder = make_lw_shared({std::move(_replication_map), std::move(_tmptr)}); - auto fut = when_all(utils::clear_gently(holder->replication_map), utils::clear_gently(holder->tmptr)).discard_result().then([holder] {}); - _factory->submit_background_work(std::move(fut)); + _factory->submit_background_work(clear_gently(std::move(_replication_map), + std::move(_pending_endpoints), + std::move(_read_endpoints), + std::move(_tmptr))); } catch (...) { // ignore } @@ -425,8 +514,9 @@ future effective_replication_map_factory::c mutable_vnode_effective_replication_map_ptr new_erm; if (ref_erm) { auto rf = ref_erm->get_replication_factor(); - auto local_replication_map = co_await ref_erm->clone_endpoints_gently(); - new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_replication_map), rf); + auto local_data = co_await ref_erm->clone_data_gently(); + new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_data->replication_map), + std::move(local_data->pending_endpoints), std::move(local_data->read_endpoints), rf); } else { new_erm = co_await calculate_effective_replication_map(std::move(rs), std::move(tmptr)); } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index d9e5275b2c..744eb172d6 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -11,6 +11,8 @@ #include #include #include +#include +#include #include "gms/inet_address.hh" #include "gms/feature_service.hh" #include "locator/snitch_base.hh" @@ -65,6 +67,7 @@ protected: replication_strategy_type _my_type; bool _per_table = false; bool _uses_tablets = false; + bool _natural_endpoints_depend_on_token = true; template void err(const char* fmt, Args&&... args) const { @@ -90,7 +93,7 @@ public: // Evaluates to true iff calculate_natural_endpoints // returns different results for different tokens. - virtual bool natural_endpoints_depend_on_token() const noexcept { return true; } + bool natural_endpoints_depend_on_token() const noexcept { return _natural_endpoints_depend_on_token; } // The returned vector has size O(number of normal token owners), which is O(number of nodes in the cluster). // Note: it is not guaranteed that the function will actually yield. If the complexity of a particular implementation @@ -110,9 +113,6 @@ public: static sstring to_qualified_class_name(std::string_view strategy_class_name); - virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const; - // Returns the last stop_iteration result of the called func - virtual stop_iteration for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function& func) const; virtual void validate_options(const gms::feature_service&) const = 0; virtual std::optional> recognized_options(const topology&) const = 0; virtual size_t get_replication_factor(const token_metadata& tm) const = 0; @@ -154,6 +154,7 @@ public: future get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set pending_tokens, inet_address pending_address, locator::endpoint_dc_rack dr) const; }; +using ring_mapping = boost::icl::interval_map>; using replication_strategy_ptr = seastar::shared_ptr; using mutable_replication_strategy_ptr = seastar::shared_ptr; @@ -197,7 +198,17 @@ public: /// Returns the set of pending replicas for a given token. /// Pending replica is a replica which gains ownership of data. /// Non-empty only during topology change. - virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const = 0; + virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const = 0; + + /// Returns a list of nodes to which a read request should be directed. + /// Returns not null only during topology changes, if request_read_new was called and + /// new set of replicas differs from the old one. + virtual std::optional get_endpoints_for_reading(const token& search_token) const = 0; + + /// Returns true if there are any pending ranges for this endpoint. + /// This operation is expensive, for vnode_erm it iterates + /// over all pending ranges which is O(number of tokens). + virtual bool has_pending_ranges(inet_address endpoint) const = 0; /// Returns a token_range_splitter which is line with the replica assignment of this replication map. /// The splitter can live longer than this instance. @@ -247,9 +258,10 @@ public: sstring to_sstring() const; }; - private: replication_map _replication_map; + ring_mapping _pending_endpoints; + ring_mapping _read_endpoints; std::optional _factory_key = std::nullopt; effective_replication_map_factory* _factory = nullptr; @@ -258,24 +270,30 @@ private: public: // effective_replication_map inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override; inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override; - inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override; + inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override; + std::optional get_endpoints_for_reading(const token& search_token) const override; + bool has_pending_ranges(inet_address endpoint) const override; std::unique_ptr make_splitter() const override; public: - explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept + explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, + ring_mapping pending_endpoints, ring_mapping read_endpoints, size_t replication_factor) noexcept : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) , _replication_map(std::move(replication_map)) + , _pending_endpoints(std::move(pending_endpoints)) + , _read_endpoints(std::move(read_endpoints)) { } vnode_effective_replication_map() = delete; vnode_effective_replication_map(vnode_effective_replication_map&&) = default; ~vnode_effective_replication_map(); - const replication_map& get_replication_map() const noexcept { - return _replication_map; - } - - future<> clear_gently() noexcept; - - future clone_endpoints_gently() const; + struct cloned_data { + replication_map replication_map; + ring_mapping pending_endpoints; + ring_mapping read_endpoints; + }; + // boost::icl::interval_map is not no_throw_move_constructible -> can't return cloned_data by val, + // since future_state requires T to be no_throw_move_constructible. + future> clone_data_gently() const; stop_iteration for_each_natural_endpoint_until(const token& search_token, const noncopyable_function& func) const; @@ -309,6 +327,7 @@ public: private: dht::token_range_vector do_get_ranges(noncopyable_function consider_range_for_endpoint) const; + const inet_address_vector_replica_set& do_get_natural_endpoints(const token& search_token) const; public: static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr); @@ -336,9 +355,11 @@ using mutable_vnode_effective_replication_map_ptr = shared_ptr( - std::move(rs), std::move(tmptr), std::move(replication_map), replication_factor); + std::move(rs), std::move(tmptr), std::move(replication_map), + std::move(pending_endpoints), std::move(read_endpoints), replication_factor); } // Apply the replication strategy over the current configuration and the given token_metadata. diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 43126347c0..f7940243db 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -17,38 +17,23 @@ namespace locator { everywhere_replication_strategy::everywhere_replication_strategy(const replication_strategy_config_options& config_options) : - abstract_replication_strategy(config_options, replication_strategy_type::everywhere_topology) {} + abstract_replication_strategy(config_options, replication_strategy_type::everywhere_topology) { + _natural_endpoints_depend_on_token = false; +} future everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const { - auto eps = tm.get_all_endpoints(); - return make_ready_future(endpoint_set(eps.begin(), eps.end())); + if (tm.sorted_tokens().empty()) { + endpoint_set result{inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()})}; + return make_ready_future(std::move(result)); + } + const auto& all_endpoints = tm.get_all_endpoints(); + return make_ready_future(endpoint_set(all_endpoints.begin(), all_endpoints.end())); } size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const { return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners(); } -inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map& erm) const { - const auto& tm = *erm.get_token_metadata_ptr(); - if (tm.sorted_tokens().empty()) { - return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); - } - return boost::copy_range(tm.get_all_endpoints()); -} - -stop_iteration everywhere_replication_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map& erm, const noncopyable_function& func) const { - const auto& tm = *erm.get_token_metadata_ptr(); - if (tm.sorted_tokens().empty()) { - return func(utils::fb_utilities::get_broadcast_address()); - } - for (const auto& ep : tm.get_all_endpoints()) { - if (func(ep)) { - return stop_iteration::yes; - } - } - return stop_iteration::no; -} - using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.EverywhereStrategy"); static registry registrator_short_name("EverywhereStrategy"); diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index a123efbfa1..292d3755fd 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -18,8 +18,6 @@ class everywhere_replication_strategy : public abstract_replication_strategy { public: everywhere_replication_strategy(const replication_strategy_config_options& config_options); - virtual bool natural_endpoints_depend_on_token() const noexcept override { return false; } - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; virtual void validate_options(const gms::feature_service&) const override { /* noop */ } @@ -34,12 +32,5 @@ public: virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override { return true; } - - /** - * We need to override this because the default implementation depends - * on token calculations but everywhere_replication_strategy may be used before tokens are set up. - */ - virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override; - virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const override; }; } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index b3df8248b5..b3e479f2c1 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -15,7 +15,9 @@ namespace locator { local_strategy::local_strategy(const replication_strategy_config_options& config_options) : - abstract_replication_strategy(config_options, replication_strategy_type::local) {} + abstract_replication_strategy(config_options, replication_strategy_type::local) { + _natural_endpoints_depend_on_token = false; +} future local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { return make_ready_future(endpoint_set({utils::fb_utilities::get_broadcast_address()})); @@ -33,14 +35,6 @@ size_t local_strategy::get_replication_factor(const token_metadata&) const { return 1; } -inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map&) const { - return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); -} - -stop_iteration local_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const { - return func(utils::fb_utilities::get_broadcast_address()); -} - using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.LocalStrategy"); static registry registrator_short_name("LocalStrategy"); diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index be4d2856b7..60e58e1d4b 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -27,8 +27,6 @@ public: virtual ~local_strategy() {}; virtual size_t get_replication_factor(const token_metadata&) const override; - virtual bool natural_endpoints_depend_on_token() const noexcept override { return false; } - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; virtual void validate_options(const gms::feature_service&) const override; @@ -38,13 +36,6 @@ public: virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override { return false; } - - /** - * We need to override this because the default implementation depends - * on token calculations but LocalStrategy may be used before tokens are set up. - */ - inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override; - virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const override; }; } diff --git a/locator/tablets.cc b/locator/tablets.cc index 280e9e2f6e..5a732d80bf 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -228,7 +228,7 @@ public: return result; } - virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override { + virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override { auto&& tablets = get_tablet_map(); auto tablet = tablets.get_tablet_id(search_token); auto&& info = tablets.get_tablet_transition_info(tablet); @@ -240,6 +240,23 @@ public: return {get_endpoint_for_host_id(info->pending_replica.host)}; } + virtual std::optional get_endpoints_for_reading(const token& search_token) const override { + return std::nullopt; + } + + virtual bool has_pending_ranges(inet_address endpoint) const override { + const auto host_id = _tmptr->get_host_id_if_known(endpoint); + if (!host_id.has_value()) { + return false; + } + for (const auto& [id, transition_info]: get_tablet_map().transitions()) { + if (transition_info.pending_replica.host == *host_id) { + return true; + } + } + return false; + } + virtual std::unique_ptr make_splitter() const override { class splitter : public token_range_splitter { token_metadata_ptr _tmptr; // To keep the tablet map alive. diff --git a/locator/tablets.hh b/locator/tablets.hh index ae57cb2724..ddb812cd15 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -152,6 +152,10 @@ public: return _tablets; } + const auto& transitions() const { + return _transitions; + } + /// Returns an iterable range over tablet_id:s which includes all tablets in token ring order. auto tablet_ids() const { return boost::irange(0, tablet_count()) | boost::adaptors::transformed([] (size_t i) { diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 361fed8d97..7574fcd78e 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -59,20 +59,7 @@ private: // The map between the existing node to be replaced and the replacing node std::unordered_map _replacing_endpoints; - using ring_mapping = boost::icl::interval_map>; - // For each keyspace, migration_info contains ranges of tokens and - // corresponding replicas to which writes or reads will be directed: - // - pending_endpoints - will be appended to normal endpoints for writes; - // - read_endpoints - will completely replace normal endpoints for reads. - // This data structure is filled only during data migration between nodes - // when they are added or removed from the cluster. - // During normal operation, token mapping to nodes is - // implemented in the effective_replication_map. - struct migration_info { - ring_mapping pending_endpoints; - ring_mapping read_endpoints; - }; - std::unordered_map _keyspace_to_migration_info; + std::optional _topology_change_info; std::vector _sorted_tokens; @@ -80,7 +67,7 @@ private: topology _topology; - std::optional _topology_transition_state; + token_metadata::read_new_t _read_new = token_metadata::read_new_t::no; long _ring_version = 0; static thread_local long _static_ring_version; @@ -242,21 +229,10 @@ public: static range interval_to_range(boost::icl::interval::interval_type i); public: - bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const; - - /** - * Calculate pending ranges according to bootstrapping, leaving and replacing nodes. - * - * We construct an updated version of the token_metadata by incorporating - * all proposed modifications (join, bootstrap, and replace operations). - * Subsequently, for each token range, we compare the outcomes of the calculate_natural_endpoints - * function applied to both the previous and the new token_metadata. - * Endpoints present in the updated version but absent in the original one - * ought to be appended to the pending_ranges. - */ - future<> update_pending_ranges( - const token_metadata& unpimplified_this, - const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack); + future<> update_topology_change_info(dc_rack_fn& get_dc_rack); + const std::optional& get_topology_change_info() const { + return _topology_change_info; + } public: token get_predecessor(token t) const; @@ -274,22 +250,14 @@ public: size_t count_normal_token_owners() const; private: future<> update_normal_token_owners(); - - enum class endpoints_field { - pending_endpoints, - read_endpoints - }; - const std::unordered_set* maybe_migration_endpoints(endpoints_field field, - const token& token, - const sstring& keyspace_name) const; public: // returns empty vector if keyspace_name not found. inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; std::optional endpoints_for_reading(const token& token, const sstring& keyspace_name) const; - void set_topology_transition_state(std::optional state) { - _topology_transition_state = state; + void set_read_new(token_metadata::read_new_t read_new) { + _read_new = read_new; } public: @@ -359,10 +327,6 @@ future> token_metadata_impl::clone_async() } ret->_leaving_endpoints = _leaving_endpoints; ret->_replacing_endpoints = _replacing_endpoints; - for (const auto& p : _keyspace_to_migration_info) { - ret->_keyspace_to_migration_info.emplace(p); - co_await coroutine::maybe_yield(); - } ret->_ring_version = _ring_version; co_return ret; } @@ -381,7 +345,7 @@ future> token_metadata_impl::clone_only_tok co_await coroutine::maybe_yield(); } ret->_tablets = _tablets; - ret->_topology_transition_state =_topology_transition_state; + ret->_read_new = _read_new; co_return ret; } @@ -391,7 +355,6 @@ future<> token_metadata_impl::clear_gently() noexcept { co_await utils::clear_gently(_bootstrap_tokens); co_await utils::clear_gently(_leaving_endpoints); co_await utils::clear_gently(_replacing_endpoints); - co_await utils::clear_gently(_keyspace_to_migration_info); co_await utils::clear_gently(_sorted_tokens); co_await _topology.clear_gently(); co_await _tablets.clear_gently(); @@ -723,157 +686,84 @@ token_metadata_impl::interval_to_range(boost::icl::interval::interval_typ return range({{i.lower(), start_inclusive}}, {{i.upper(), end_inclusive}}); } -bool -token_metadata_impl::has_pending_ranges(sstring keyspace_name, inet_address endpoint) const { - const auto it = _keyspace_to_migration_info.find(keyspace_name); - if (it == _keyspace_to_migration_info.end()) { - return false; - } - for (const auto& item : it->second.pending_endpoints) { - const auto& nodes = item.second; - if (nodes.contains(endpoint)) { - return true; - } - } - return false; -} - -future<> token_metadata_impl::update_pending_ranges( - const token_metadata& unpimplified_this, - const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack) { - tlogger.debug("calculate_pending_ranges: keyspace_name={}, bootstrap_tokens={}, leaving nodes={}, replacing_endpoints={}", - keyspace_name, _bootstrap_tokens, _leaving_endpoints, _replacing_endpoints); +future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) { if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) { - tlogger.debug("No bootstrapping, leaving nodes, replacing nodes -> empty pending ranges for {}", keyspace_name); - _keyspace_to_migration_info.erase(keyspace_name); - return make_ready_future<>(); + co_await utils::clear_gently(_topology_change_info); + _topology_change_info.reset(); + co_return; } - return async([this, &unpimplified_this, &strategy, keyspace_name, &get_dc_rack] () mutable { - // true if there is a node replaced with the same IP - bool replace_with_same_endpoint = false; - // new_token_metadata incorporates all the changes from leaving, bootstrapping and replacing - const auto new_token_metadata = token_metadata(std::invoke([&]() -> std::unique_ptr { - auto result = clone_only_token_map(false).get0(); - - // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints - std::unordered_map> new_normal_tokens; - if (!_replacing_endpoints.empty()) { - for (const auto& [token, inet_address]: _token_to_endpoint_map) { - const auto it = _replacing_endpoints.find(inet_address); - if (it == _replacing_endpoints.end()) { - continue; - } - new_normal_tokens[it->second].insert(token); + // true if there is a node replaced with the same IP + bool replace_with_same_endpoint = false; + // target_token_metadata incorporates all the changes from leaving, bootstrapping and replacing + auto target_token_metadata = co_await clone_only_token_map(false); + { + // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints + std::unordered_map> new_normal_tokens; + if (!_replacing_endpoints.empty()) { + for (const auto& [token, inet_address]: _token_to_endpoint_map) { + const auto it = _replacing_endpoints.find(inet_address); + if (it == _replacing_endpoints.end()) { + continue; } - for (const auto& [replace_from, replace_to]: _replacing_endpoints) { - if (replace_from == replace_to) { - replace_with_same_endpoint = true; - } else { - result->remove_endpoint(replace_from); - } + new_normal_tokens[it->second].insert(token); + } + for (const auto& [replace_from, replace_to]: _replacing_endpoints) { + if (replace_from == replace_to) { + replace_with_same_endpoint = true; + } else { + target_token_metadata->remove_endpoint(replace_from); } } - for (const auto& [token, inet_address]: _bootstrap_tokens) { - new_normal_tokens[inet_address].insert(token); - } - // apply new_normal_tokens - for (auto& [endpoint, tokens]: new_normal_tokens) { - result->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal); - result->update_normal_tokens(std::move(tokens), endpoint).get(); - } - // apply leaving endpoints - for (const auto& endpoint: _leaving_endpoints) { - result->remove_endpoint(endpoint); - } - result->sort_tokens(); - return result; - })); - - // We require a distinct token_metadata instance when replace_from equals replace_to, - // as it ensures the node is included in pending_ranges. - // Otherwise, the node would be excluded from both pending_ranges and - // get_natural_endpoints_without_node_being_replaced, - // causing the coordinator to overlook it entirely. - const token_metadata* base_token_metadata; - std::optional self_copy; - if (replace_with_same_endpoint) { - self_copy = token_metadata(std::invoke([&]() -> std::unique_ptr { - auto result = clone_only_token_map(false).get0(); - for (const auto& [replace_from, replace_to]: _replacing_endpoints) { - if (replace_from == replace_to) { - result->remove_endpoint(replace_from); - } - } - result->sort_tokens(); - return result; - })); - base_token_metadata = &*self_copy; - } else { - base_token_metadata = &unpimplified_this; } + for (const auto& [token, inet_address]: _bootstrap_tokens) { + new_normal_tokens[inet_address].insert(token); + } + // apply new_normal_tokens + for (auto& [endpoint, tokens]: new_normal_tokens) { + target_token_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal); + co_await target_token_metadata->update_normal_tokens(std::move(tokens), endpoint); + } + // apply leaving endpoints + for (const auto& endpoint: _leaving_endpoints) { + target_token_metadata->remove_endpoint(endpoint); + } + target_token_metadata->sort_tokens(); + } - // merge tokens from token_to_endpoint and bootstrap_tokens, - // preserving tokens of leaving endpoints - const auto tokens = std::invoke([&]() -> std::vector { - auto tokens = std::vector(); - tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size()); - tokens.resize(sorted_tokens().size()); - std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(tokens)); - for (const auto& p: get_bootstrap_tokens()) { - tokens.push_back(p.first); + // We require a distinct token_metadata instance when replace_from equals replace_to, + // as it ensures the node is included in pending_ranges. + // Otherwise, the node would be excluded from both pending_ranges and + // get_natural_endpoints_without_node_being_replaced, + // causing the coordinator to overlook it entirely. + std::unique_ptr base_token_metadata; + if (replace_with_same_endpoint) { + base_token_metadata = co_await clone_only_token_map(false); + for (const auto& [replace_from, replace_to]: _replacing_endpoints) { + if (replace_from == replace_to) { + base_token_metadata->remove_endpoint(replace_from); } - std::sort(begin(tokens), end(tokens)); - return tokens; - }); + } + base_token_metadata->sort_tokens(); + } - _keyspace_to_migration_info[keyspace_name] = std::invoke([&]() -> migration_info { - migration_info migration_info; - for (size_t i = 0, size = tokens.size(); i < size; ++i) { - seastar::thread::maybe_yield(); + // merge tokens from token_to_endpoint and bootstrap_tokens, + // preserving tokens of leaving endpoints + auto all_tokens = std::vector(); + all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size()); + all_tokens.resize(sorted_tokens().size()); + std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(all_tokens)); + for (const auto& p: get_bootstrap_tokens()) { + all_tokens.push_back(p.first); + } + std::sort(begin(all_tokens), end(all_tokens)); - const auto token = tokens[i]; - - const auto old_endpoints = strategy.calculate_natural_endpoints(token, *base_token_metadata).get0(); - auto new_endpoints = strategy.calculate_natural_endpoints(token, new_token_metadata).get0(); - - auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { - using interval = ring_mapping::interval_type; - if (i == 0) { - target += std::make_pair( - interval::open(tokens.back(), dht::maximum_token()), - endpoints); - target += std::make_pair( - interval::left_open(dht::minimum_token(), token), - std::move(endpoints)); - } else { - target += std::make_pair( - interval::left_open(tokens[i - 1], token), - std::move(endpoints)); - } - }; - - std::unordered_set pending_endpoints; - for (const auto& e: new_endpoints) { - if (!old_endpoints.contains(e)) { - pending_endpoints.insert(e); - } - } - if (!pending_endpoints.empty()) { - add_mapping(migration_info.pending_endpoints, std::move(pending_endpoints)); - } - - // in order not to waste memory, we update read_endpoints only if the - // new endpoints differs from the old one - if (_topology_transition_state == service::topology::transition_state::write_both_read_new && - new_endpoints.get_vector() != old_endpoints.get_vector()) - { - add_mapping(migration_info.read_endpoints, std::move(new_endpoints).extract_set()); - } - } - return migration_info; - }); - }); + auto prev_value = std::move(_topology_change_info); + _topology_change_info.emplace(token_metadata(std::move(target_token_metadata)), + base_token_metadata ? std::optional(token_metadata(std::move(base_token_metadata))): std::nullopt, + std::move(all_tokens), + _read_new); + co_await utils::clear_gently(prev_value); } size_t token_metadata_impl::count_normal_token_owners() const { @@ -911,50 +801,6 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) { _replacing_endpoints.erase(existing_node); } -const std::unordered_set* token_metadata_impl::maybe_migration_endpoints(endpoints_field field, - const token& token, - const sstring& keyspace_name) const -{ - // Fast path 0: migration_info not found for this keyspace_name - const auto migration_info_it = _keyspace_to_migration_info.find(keyspace_name); - if (migration_info_it == _keyspace_to_migration_info.end()) { - return nullptr; - } - - // Fast path 1: empty ring_mapping for this keyspace_name - const auto& migration_info = migration_info_it->second; - const auto& ring_mapping = field == endpoints_field::pending_endpoints - ? migration_info.pending_endpoints - : migration_info.read_endpoints; - if (ring_mapping.empty()) { - return nullptr; - } - - // Slow path: lookup remapping - const auto interval = range_to_interval(range(token)); - const auto it = ring_mapping.find(interval); - return it != ring_mapping.end() ? &it->second : nullptr; -} - -inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { - inet_address_vector_topology_change endpoints; - const auto* pending_endpoints = maybe_migration_endpoints(endpoints_field::pending_endpoints, - token, keyspace_name); - if (pending_endpoints) { - // interval_map does not work with std::vector, convert to inet_address_vector_topology_change - endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end()); - } - return endpoints; -} - -std::optional token_metadata_impl::endpoints_for_reading(const token& token, const sstring& keyspace_name) const { - const auto* endpoints = maybe_migration_endpoints(endpoints_field::read_endpoints, token, keyspace_name); - if (endpoints == nullptr) { - return std::nullopt; - } - return inet_address_vector_replica_set(endpoints->begin(), endpoints->end()); -} - std::map token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() const { std::map ret(_token_to_endpoint_map.begin(), _token_to_endpoint_map.end()); ret.insert(_bootstrap_tokens.begin(), _bootstrap_tokens.end()); @@ -969,6 +815,23 @@ std::multimap token_metadata_impl::get_endpoint_to_token_ma return cloned; } +topology_change_info::topology_change_info(token_metadata target_token_metadata_, + std::optional base_token_metadata_, + std::vector all_tokens_, + token_metadata::read_new_t read_new_) + : target_token_metadata(std::move(target_token_metadata_)) + , base_token_metadata(std::move(base_token_metadata_)) + , all_tokens(std::move(all_tokens_)) + , read_new(read_new_) +{ +} + +future<> topology_change_info::clear_gently() { + co_await utils::clear_gently(target_token_metadata); + co_await utils::clear_gently(base_token_metadata); + co_await utils::clear_gently(all_tokens); +} + token_metadata::token_metadata(std::unique_ptr impl) : _impl(std::move(impl)) { } @@ -1222,14 +1085,14 @@ token_metadata::interval_to_range(boost::icl::interval::interval_type i) return token_metadata_impl::interval_to_range(std::move(i)); } -bool -token_metadata::has_pending_ranges(sstring keyspace_name, inet_address endpoint) const { - return _impl->has_pending_ranges(std::move(keyspace_name), endpoint); +future<> +token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) { + return _impl->update_topology_change_info(get_dc_rack); } -future<> -token_metadata::update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack) { - return _impl->update_pending_ranges(*this, strategy, keyspace_name, get_dc_rack); +const std::optional& +token_metadata::get_topology_change_info() const { + return _impl->get_topology_change_info(); } token @@ -1247,19 +1110,9 @@ token_metadata::count_normal_token_owners() const { return _impl->count_normal_token_owners(); } -inet_address_vector_topology_change -token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { - return _impl->pending_endpoints_for(token, keyspace_name); -} - -std::optional -token_metadata::endpoints_for_reading(const token& token, const sstring& keyspace_name) const { - return _impl->endpoints_for_reading(token, keyspace_name); -} - void -token_metadata::set_topology_transition_state(std::optional state) { - _impl->set_topology_transition_state(state); +token_metadata::set_read_new(read_new_t read_new) { + _impl->set_read_new(read_new); } std::multimap diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 27a9b86a72..e6f8d8a736 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -28,7 +28,6 @@ #include "locator/types.hh" #include "locator/topology.hh" -#include "service/topology_state_machine.hh" // forward declaration since replica/database.hh includes this file namespace replica { @@ -70,6 +69,7 @@ struct host_id_or_endpoint { }; class token_metadata_impl; +struct topology_change_info; class token_metadata final { std::unique_ptr _impl; @@ -244,18 +244,9 @@ public: static boost::icl::interval::interval_type range_to_interval(range r); static range interval_to_range(boost::icl::interval::interval_type i); - bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const; - /** - * Calculate pending ranges according to bootstrapping, leaving and replacing nodes. - * - * We construct an updated version of the token_metadata by incorporating - * all proposed modifications (join, bootstrap, and replace operations). - * Subsequently, for each token range, we compare the outcomes of the calculate_natural_endpoints - * function applied to both the previous and the new token_metadata. - * Endpoints present in the updated version but absent in the original one - * ought to be appended to the pending_ranges. - */ - future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack); + future<> update_topology_change_info(dc_rack_fn& get_dc_rack); + + const std::optional& get_topology_change_info() const; token get_predecessor(token t) const; @@ -265,18 +256,13 @@ public: * Bootstrapping tokens are not taken into account. */ size_t count_normal_token_owners() const; - // returns empty vector if keyspace_name not found. - inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; - - // This function returns a list of nodes to which a read request should be directed. - // Returns not null only during topology changes, if _topology_change_stage == read_new and - // new set of replicas differs from the old one. - std::optional endpoints_for_reading(const token& token, const sstring& keyspace_name) const; - - // updates the current topology_transition_state of this instance, - // this value is preserved in all clone functions, - // by default it's not set - void set_topology_transition_state(std::optional state); + // Updates the read_new flag, switching read requests from + // the old endpoints to the new ones during topology changes: + // read_new_t::no - no read_endpoints will be stored on update_pending_ranges, all reads goes to normal endpoints; + // read_new_t::yes - triggers update_pending_ranges to compute and store new ranges for read requests. + // The value is preserved in all clone functions, the default is read_new_t::no. + using read_new_t = bool_class; + void set_read_new(read_new_t value); /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ std::multimap get_endpoint_to_token_map_for_reading() const; @@ -292,6 +278,19 @@ public: friend class token_metadata_impl; }; +struct topology_change_info { + token_metadata target_token_metadata; + std::optional base_token_metadata; + std::vector all_tokens; + token_metadata::read_new_t read_new; + + topology_change_info(token_metadata target_token_metadata_, + std::optional base_token_metadata_, + std::vector all_tokens_, + token_metadata::read_new_t read_new_); + future<> clear_gently(); +}; + using token_metadata_ptr = lw_shared_ptr; using mutable_token_metadata_ptr = lw_shared_ptr; using token_metadata_lock = semaphore_units<>; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 1e142d4b31..8531901af0 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2845,7 +2845,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok replica::table& table = _db.local().find_column_family(s->id()); auto erm = table.get_effective_replication_map(); inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token); - inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token, s->ks_name()); + inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token); slogger.trace("creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints, pending_endpoints); tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints); @@ -3208,7 +3208,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level storage_proxy::paxos_participants storage_proxy::get_paxos_participants(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token &token, db::consistency_level cl_for_paxos) { inet_address_vector_replica_set natural_endpoints = erm.get_natural_endpoints_without_node_being_replaced(token); - inet_address_vector_topology_change pending_endpoints = erm.get_pending_endpoints(token, ks_name); + inet_address_vector_topology_change pending_endpoints = erm.get_pending_endpoints(token); if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) { auto local_dc_filter = erm.get_topology().get_local_dc_filter(); @@ -6008,7 +6008,7 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, i } inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const { - auto endpoints = erm.get_token_metadata_ptr()->endpoints_for_reading(token, ks_name); + auto endpoints = erm.get_endpoints_for_reading(token); if (!endpoints) { endpoints = erm.get_natural_endpoints_without_node_being_replaced(token); } diff --git a/service/storage_service.cc b/service/storage_service.cc index e37e4c4c82..25024d4b86 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -263,7 +263,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela slogger.info("waiting for schema information to complete"); co_await sleep_abortable(std::chrono::seconds(1), _abort_source); } - co_await update_pending_ranges("joining"); + co_await update_topology_change_info("joining"); auto tmptr = get_token_metadata_ptr(); if (!_db.local().get_config().consistent_rangemovement() || @@ -360,7 +360,20 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s co_await add_normal_node(id, rs); } - tmptr->set_topology_transition_state(_topology_state_machine._topology.tstate); + tmptr->set_read_new(std::invoke([](std::optional state) { + using read_new_t = locator::token_metadata::read_new_t; + if (!state.has_value()) { + return read_new_t::no; + } + switch (*state) { + case topology::transition_state::commit_cdc_generation: + case topology::transition_state::publish_cdc_generation: + case topology::transition_state::write_both_read_old: + return read_new_t::no; + case topology::transition_state::write_both_read_new: + return read_new_t::yes; + } + }, _topology_state_machine._topology.tstate)); for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) { locator::host_id host_id{id.uuid()}; @@ -385,7 +398,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); } else { tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip); - co_await update_pending_ranges(tmptr, ::format("bootstrapping node {}/{}", id, ip)); + co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip)); } break; case node_state::decommissioning: @@ -394,7 +407,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); tmptr->update_host_id(host_id, ip); tmptr->add_leaving_endpoint(ip); - co_await update_pending_ranges(tmptr, ::format("{} {}/{}", rs.state, id, ip)); + co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip)); break; case node_state::replacing: { assert(_topology_state_machine._topology.req_param.contains(id)); @@ -407,7 +420,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s assert(existing_ip); tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); tmptr->add_replacing_endpoint(*existing_ip, ip); - co_await update_pending_ranges(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip)); + co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip)); } break; case node_state::rebuilding: @@ -2037,7 +2050,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st auto endpoint = get_broadcast_address(); tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::joining); tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint); - return update_pending_ranges(std::move(tmptr), ::format("bootstrapping node {}", endpoint)); + return update_topology_change_info(std::move(tmptr), ::format("bootstrapping node {}", endpoint)); }).get(); } @@ -2137,7 +2150,7 @@ future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_t replacing_node, std::current_exception()); } slogger.info("handle_state_replacing: Update pending ranges for replacing node {}", replacing_node); - co_await update_pending_ranges(tmptr, ::format("handle_state_replacing {}", replacing_node)); + co_await update_topology_change_info(tmptr, ::format("handle_state_replacing {}", replacing_node)); } future<> storage_service::handle_state_bootstrap(inet_address endpoint) { @@ -2169,7 +2182,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) { if (_gossiper.uses_host_id(endpoint)) { tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint); } - co_await update_pending_ranges(tmptr, ::format("handle_state_bootstrap {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("handle_state_bootstrap {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); } @@ -2311,7 +2324,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) { co_await tmptr->update_normal_tokens(owned_tokens, endpoint); } - co_await update_pending_ranges(tmptr, ::format("handle_state_normal {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("handle_state_normal {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); tmlock.reset(); @@ -2376,7 +2389,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) { // normally tmptr->add_leaving_endpoint(endpoint); - co_await update_pending_ranges(tmptr, ::format("handle_state_leaving", endpoint)); + co_await update_topology_change_info(tmptr, ::format("handle_state_leaving", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); } @@ -2439,7 +2452,7 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect slogger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint); // Note that the endpoint is being removed tmptr->add_leaving_endpoint(endpoint); - return update_pending_ranges(std::move(tmptr), ::format("handle_state_removing {}", endpoint)); + return update_topology_change_info(std::move(tmptr), ::format("handle_state_removing {}", endpoint)); }); // find the endpoint coordinating this removal that we need to notify when we're done auto* value = _gossiper.get_application_state_ptr(endpoint, application_state::REMOVAL_COORDINATOR); @@ -2589,7 +2602,7 @@ future<> storage_service::on_remove(gms::inet_address endpoint) { auto tmlock = co_await get_token_metadata_lock(); auto tmptr = co_await get_mutable_token_metadata_ptr(); tmptr->remove_endpoint(endpoint); - co_await update_pending_ranges(tmptr, ::format("on_remove {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("on_remove {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); } @@ -3568,11 +3581,11 @@ future<> storage_service::decommission() { throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode)); } - ss.update_pending_ranges(::format("decommission {}", endpoint)).get(); + ss.update_topology_change_info(::format("decommission {}", endpoint)).get(); auto non_system_keyspaces = db.get_non_local_vnode_based_strategy_keyspaces(); for (const auto& keyspace_name : non_system_keyspaces) { - if (ss.get_token_metadata().has_pending_ranges(keyspace_name, ss.get_broadcast_address())) { + if (ss._db.local().find_keyspace(keyspace_name).get_effective_replication_map()->has_pending_ranges(ss.get_broadcast_address())) { throw std::runtime_error("data is currently moving to this node; unable to leave the ring"); } } @@ -4128,7 +4141,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->add_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("removenode {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes)); }).get(); node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { @@ -4136,7 +4149,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->del_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("removenode {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes)); }); }); } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { @@ -4176,7 +4189,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->add_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("decommission {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes)); }).get(); node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { @@ -4184,7 +4197,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->del_leaving_endpoint(node); } - return update_pending_ranges(tmptr, ::format("decommission {}", req.leaving_nodes)); + return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes)); }); }); } else if (req.cmd == node_ops_cmd::decommission_heartbeat) { @@ -4246,7 +4259,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); tmptr->del_replacing_endpoint(existing_node); } - return update_pending_ranges(tmptr, ::format("replace {}", req.replace_nodes)); + return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes)); }); }); } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { @@ -4263,7 +4276,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad // Update the pending_ranges for the replacing node slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); mutate_token_metadata([&req, this] (mutable_token_metadata_ptr tmptr) mutable { - return update_pending_ranges(tmptr, ::format("replace {}", req.replace_nodes)); + return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes)); }).get(); } else if (req.cmd == node_ops_cmd::replace_heartbeat) { slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); @@ -4288,7 +4301,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining); tmptr->add_bootstrap_tokens(tokens, endpoint); } - return update_pending_ranges(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); + return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); }).get(); node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { @@ -4298,7 +4311,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); tmptr->remove_bootstrap_tokens(tokens); } - return update_pending_ranges(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); + return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); }); }); } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) { @@ -4714,7 +4727,7 @@ future<> storage_service::excise(std::unordered_set tokens, inet_address tmptr->remove_endpoint(endpoint); tmptr->remove_bootstrap_tokens(tokens); - co_await update_pending_ranges(tmptr, ::format("excise {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint)); co_await replicate_to_all_cores(std::move(tmptr)); tmlock.reset(); @@ -4760,7 +4773,7 @@ future<> storage_service::leave_ring() { co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) { auto endpoint = get_broadcast_address(); tmptr->remove_endpoint(endpoint); - return update_pending_ranges(std::move(tmptr), ::format("leave_ring {}", endpoint)); + return update_topology_change_info(std::move(tmptr), ::format("leave_ring {}", endpoint)); }); auto expire_time = _gossiper.compute_expire_time().time_since_epoch().count(); @@ -4922,28 +4935,22 @@ future<> storage_service::mutate_token_metadata(std::function (mutable_ co_await replicate_to_all_cores(std::move(tmptr)); } -future<> storage_service::update_pending_ranges(mutable_token_metadata_ptr tmptr, sstring reason) { +future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) { assert(this_shard_id() == 0); try { - auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms(); - for (const auto& [keyspace_name, erm] : ks_erms) { - auto& strategy = erm->get_replication_strategy(); - slogger.debug("Updating pending ranges for keyspace={} starts ({})", keyspace_name, reason); - locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); }); - co_await tmptr->update_pending_ranges(strategy, keyspace_name, get_dc_rack_from_gossiper); - slogger.debug("Updating pending ranges for keyspace={} ends ({})", keyspace_name, reason); - } + locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); }); + co_await tmptr->update_topology_change_info(get_dc_rack_from_gossiper); } catch (...) { auto ep = std::current_exception(); - slogger.error("Failed to update pending ranges for {}: {}", reason, ep); + slogger.error("Failed to update topology change info for {}: {}", reason, ep); std::rethrow_exception(std::move(ep)); } } -future<> storage_service::update_pending_ranges(sstring reason, acquire_merge_lock acquire_merge_lock) { +future<> storage_service::update_topology_change_info(sstring reason, acquire_merge_lock acquire_merge_lock) { return mutate_token_metadata([this, reason = std::move(reason)] (mutable_token_metadata_ptr tmptr) mutable { - return update_pending_ranges(std::move(tmptr), std::move(reason)); + return update_topology_change_info(std::move(tmptr), std::move(reason)); }, acquire_merge_lock); } @@ -4951,7 +4958,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) { // Update pending ranges since keyspace can be changed after we calculate pending ranges. sstring reason = ::format("keyspace {}", ks_name); return container().invoke_on(0, [reason = std::move(reason)] (auto& ss) mutable { - return ss.update_pending_ranges(reason, acquire_merge_lock::no).handle_exception([reason = std::move(reason)] (auto ep) { + return ss.update_topology_change_info(reason, acquire_merge_lock::no).handle_exception([reason = std::move(reason)] (auto ep) { slogger.warn("Failure to update pending ranges for {} ignored", reason); }); }); @@ -5556,7 +5563,7 @@ future<> storage_service::wait_for_normal_state_handled_on_boot(const std::unord future storage_service::is_cleanup_allowed(sstring keyspace) { return container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) { auto my_address = ss.get_broadcast_address(); - auto pending_ranges = ss.get_token_metadata().has_pending_ranges(keyspace, my_address); + auto pending_ranges = ss._db.local().find_keyspace(keyspace).get_effective_replication_map()->has_pending_ranges(my_address); bool is_bootstrap_mode = ss._operation_mode == mode::BOOTSTRAP; slogger.debug("is_cleanup_allowed: keyspace={}, is_bootstrap_mode={}, pending_ranges={}", keyspace, is_bootstrap_mode, pending_ranges); diff --git a/service/storage_service.hh b/service/storage_service.hh index 506bd353f2..700c6c97e1 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -190,8 +190,8 @@ private: // Update pending ranges locally and then replicate to all cores. // Should be serialized under token_metadata_lock. // Must be called on shard 0. - future<> update_pending_ranges(mutable_token_metadata_ptr tmptr, sstring reason); - future<> update_pending_ranges(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes); + future<> update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason); + future<> update_topology_change_info(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes); future<> keyspace_changed(const sstring& ks_name); void register_metrics(); future<> snitch_reconfigured(); diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 33697035d4..2bf1333914 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -11,6 +11,7 @@ #include "utils/fb_utilities.hh" #include "locator/token_metadata.hh" #include "locator/simple_strategy.hh" +#include "locator/everywhere_replication_strategy.hh" using namespace locator; @@ -24,8 +25,8 @@ namespace { }; } - token_metadata create_token_metadata(inet_address this_endpoint) { - return token_metadata({ + mutable_token_metadata_ptr create_token_metadata(inet_address this_endpoint) { + return make_lw_shared(token_metadata::config { topology::config { .this_host_id = host_id::create_random_id(), .this_endpoint = this_endpoint, @@ -33,59 +34,87 @@ namespace { } }); } + + template + mutable_vnode_erm_ptr create_erm(mutable_token_metadata_ptr tmptr, replication_strategy_config_options opts = {}) { + dc_rack_fn get_dc_rack_fn = get_dc_rack; + tmptr->update_topology_change_info(get_dc_rack_fn).get(); + auto strategy = seastar::make_shared(std::move(opts)); + return calculate_effective_replication_map(std::move(strategy), std::move(tmptr)).get0(); + } } -SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - +SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_bootstrap_first_node) { const auto e1 = inet_address("192.168.0.1"); const auto t1 = dht::token::from_int64(1); + auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { - {"replication_factor", "1"} - }); - token_metadata.add_bootstrap_token(t1, e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(0), ks_name), - inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), - inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), - inet_address_vector_topology_change{e1}); + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->add_bootstrap_token(t1, e1); + + { + auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), + inet_address_vector_topology_change{e1}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), + inet_address_vector_topology_change{e1}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), + inet_address_vector_topology_change{e1}); + BOOST_REQUIRE(!erm->get_endpoints_for_reading(t1).has_value()); + } + { + token_metadata->set_read_new(token_metadata::read_new_t::yes); + auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); + BOOST_REQUIRE_EQUAL(erm->get_endpoints_for_reading(t1), inet_address_vector_replica_set{e1}); + } +} + +SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy) { + const auto e1 = inet_address("192.168.0.1"); + const auto e2 = inet_address("192.168.0.2"); + const auto t1 = dht::token::from_int64(10); + const auto t2 = dht::token::from_int64(20); + + auto token_metadata = create_token_metadata(e1); + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_normal_tokens({t1}, e1).get(); + token_metadata->add_bootstrap_token(t2, e2); + token_metadata->set_read_new(token_metadata::read_new_t::yes); + + auto erm = create_erm(token_metadata); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(t2), + inet_address_vector_topology_change{e2}); + BOOST_REQUIRE_EQUAL(erm->get_endpoints_for_reading(t2), + (inet_address_vector_replica_set{e2, e1})); } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto e1 = inet_address("192.168.0.1"); const auto t1 = dht::token::from_int64(1); const auto e2 = inet_address("192.168.0.2"); const auto t2 = dht::token::from_int64(100); + auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { - {"replication_factor", "1"} - }); - token_metadata.update_normal_tokens({t1}, e1).get(); - token_metadata.add_bootstrap_token(t2, e2); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(0), ks_name), + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_normal_tokens({t1}, e1).get(); + token_metadata->add_bootstrap_token(t2, e2); + + auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); const auto t100 = dht::token::from_int64(100); @@ -93,32 +122,29 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); + auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { - {"replication_factor", "2"} - }); - token_metadata.update_normal_tokens({t1, t1000}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.add_bootstrap_token(t100, e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_topology(e3, get_dc_rack(e3)); + token_metadata->update_normal_tokens({t1, t1000}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->add_bootstrap_token(t100, e1); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); const auto t100 = dht::token::from_int64(100); @@ -126,33 +152,30 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); + auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { - {"replication_factor", "2"} - }); - token_metadata.update_normal_tokens({t1, t1000}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.update_normal_tokens({t100}, e1).get(); - token_metadata.add_leaving_endpoint(e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_topology(e3, get_dc_rack(e3)); + token_metadata->update_normal_tokens({t1, t1000}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->update_normal_tokens({t100}, e1).get(); + token_metadata->add_leaving_endpoint(e1); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); const auto t100 = dht::token::from_int64(100); @@ -161,38 +184,35 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); const auto e4 = inet_address("192.168.0.4"); + auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - token_metadata.update_topology(e4, get_dc_rack(e4)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { - {"replication_factor", "2"} - }); - token_metadata.update_normal_tokens({t1000}, e1).get(); - token_metadata.update_normal_tokens({t1, t100}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.add_replacing_endpoint(e3, e4); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name), + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_topology(e3, get_dc_rack(e3)); + token_metadata->update_topology(e4, get_dc_rack(e4)); + token_metadata->update_normal_tokens({t1000}, e1).get(); + token_metadata->update_normal_tokens({t1, t100}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->add_replacing_endpoint(e3, e4); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1000), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1000)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1001), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1001)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(10), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(10)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{}); } SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto t10 = dht::token::from_int64(10); const auto t100 = dht::token::from_int64(100); @@ -200,66 +220,66 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); - auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - token_metadata.update_topology(e2, get_dc_rack(e2)); - token_metadata.update_topology(e3, get_dc_rack(e3)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { - {"replication_factor", "2"} - }); - token_metadata.update_normal_tokens({t1, t1000}, e2).get(); - token_metadata.update_normal_tokens({t10}, e3).get(); - token_metadata.add_bootstrap_token(t100, e1); - auto check_endpoints = [&](int64_t t, inet_address_vector_replica_set expected_replicas, + auto token_metadata = create_token_metadata(e1); + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_topology(e2, get_dc_rack(e2)); + token_metadata->update_topology(e3, get_dc_rack(e3)); + token_metadata->update_normal_tokens({t1, t1000}, e2).get(); + token_metadata->update_normal_tokens({t10}, e3).get(); + token_metadata->add_bootstrap_token(t100, e1); + + auto check_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t, + inet_address_vector_replica_set expected_replicas, seastar::compat::source_location sl = seastar::compat::source_location::current()) { BOOST_TEST_INFO("line: " << sl.line()); const auto expected_set = std::unordered_set(expected_replicas.begin(), expected_replicas.end()); - const auto actual_replicas = token_metadata.endpoints_for_reading(dht::token::from_int64(t), ks_name); + const auto actual_replicas = erm->get_endpoints_for_reading(dht::token::from_int64(t)); BOOST_REQUIRE(actual_replicas.has_value()); const auto actual_set = std::unordered_set(actual_replicas->begin(), actual_replicas->end()); BOOST_REQUIRE_EQUAL(expected_set, actual_set); }; - auto check_no_endpoints = [&](int64_t t, + auto check_no_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t, seastar::compat::source_location sl = seastar::compat::source_location::current()) { BOOST_TEST_INFO("line: " << sl.line()); - BOOST_REQUIRE(!token_metadata.endpoints_for_reading(dht::token::from_int64(t), ks_name).has_value()); + BOOST_REQUIRE(!erm->get_endpoints_for_reading(dht::token::from_int64(t)).has_value()); }; - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - check_no_endpoints(2); + { + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); + check_no_endpoints(erm, 2); + } - token_metadata.set_topology_transition_state(service::topology::transition_state::write_both_read_new); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); + { + token_metadata->set_read_new(locator::token_metadata::read_new_t::yes); + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); - check_endpoints(2, {e3, e1}); - check_endpoints(10, {e3, e1}); - check_endpoints(11, {e1, e2}); - check_endpoints(100, {e1, e2}); - check_no_endpoints(101); - check_no_endpoints(1001); - check_no_endpoints(1); + check_endpoints(erm, 2, {e3, e1}); + check_endpoints(erm, 10, {e3, e1}); + check_endpoints(erm, 11, {e1, e2}); + check_endpoints(erm, 100, {e1, e2}); + check_no_endpoints(erm, 101); + check_no_endpoints(erm, 1001); + check_no_endpoints(erm, 1); + } } SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; - const auto t1 = dht::token::from_int64(1); const auto e1 = inet_address("192.168.0.1"); + auto token_metadata = create_token_metadata(e1); - token_metadata.update_topology(e1, get_dc_rack(e1)); - const auto replication_strategy = simple_strategy(replication_strategy_config_options { - {"replication_factor", "2"} - }); - token_metadata.update_normal_tokens({t1}, e1).get(); - token_metadata.add_replacing_endpoint(e1, e1); - token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology(e1, get_dc_rack(e1)); + token_metadata->update_normal_tokens({t1}, e1).get(); + token_metadata->add_replacing_endpoint(e1, e1); + + auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata.get_endpoint(t1), e1); + BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1); } diff --git a/utils/sequenced_set.hh b/utils/sequenced_set.hh index 97d523ff00..19f2498408 100644 --- a/utils/sequenced_set.hh +++ b/utils/sequenced_set.hh @@ -129,6 +129,10 @@ public: return _set; } + auto extract_vector() && noexcept { + return std::move(_vec); + } + auto extract_set() && noexcept { return std::move(_set); } diff --git a/utils/stall_free.hh b/utils/stall_free.hh index bb0a0dda69..6d855175e9 100644 --- a/utils/stall_free.hh +++ b/utils/stall_free.hh @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include "utils/collection-concepts.hh" using namespace seastar; @@ -95,7 +97,7 @@ concept TriviallyClearableSequence = template concept Container = Iterable && requires (T x, typename T::iterator it) { - { x.erase(it) } -> std::same_as; + x.erase(it); }; template @@ -174,6 +176,14 @@ future<> clear_gently(foreign_ptr& o) noexcept { }); } +template +requires (std::is_rvalue_reference_v && ...) +future<> clear_gently(T&&... o) noexcept { + return do_with(std::move(o)..., [](auto&... args) { + return when_all(clear_gently(args)...).discard_result(); + }); +} + template future<> clear_gently(T& o) noexcept { if (o.use_count() == 1) {