/* * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include "token_metadata.hh" #include #include "locator/snitch_base.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/tablets.hh" #include "utils/chunked_vector.hh" #include "utils/log.hh" #include "partition_range_compat.hh" #include #include #include #include #include #include #include #include "utils/assert.hh" #include "utils/stall_free.hh" #include "gms/gossiper.hh" namespace locator { static logging::logger tlogger("token_metadata"); template static void remove_by_value(C& container, V value) { for (auto it = container.begin(); it != container.end();) { if (it->second == value) { it = container.erase(it); } else { it++; } } } class token_metadata_impl final { private: /** * Maintains token to endpoint map of every node in the cluster. * Each Token is associated with exactly one Address, but each Address may have * multiple tokens. Hence, the BiMultiValMap collection. */ // FIXME: have to be BiMultiValMap std::unordered_map _token_to_endpoint_map; // Track the unique set of nodes in _token_to_endpoint_map std::unordered_set _normal_token_owners; std::unordered_map _bootstrap_tokens; std::unordered_set _leaving_endpoints; // The map between the existing node to be replaced and the replacing node std::unordered_map _replacing_endpoints; std::optional _topology_change_info; utils::chunked_vector _sorted_tokens; tablet_metadata _tablets; topology _topology; token_metadata::read_new_t _read_new = token_metadata::read_new_t::no; long _ring_version = 0; static thread_local long _static_ring_version; // Zero means that token_metadata versions are not supported, // this will be used in RPC handling to decide whether we // need to apply fencing or not. // The initial valid version is 1; token_metadata::version_t _version = 0; token_metadata::version_tracker_t _version_tracker; // Note: if any member is added to this class // clone_async() must be updated to copy that member. void sort_tokens(); const tablet_metadata& tablets() const { return _tablets; } tablet_metadata& tablets() { return _tablets; } void set_tablets(tablet_metadata&& tablets) { _tablets = std::move(tablets); invalidate_cached_rings(); } struct shallow_copy {}; public: token_metadata_impl(shallow_copy, const token_metadata_impl& o) noexcept : _topology(topology::shallow_copy{}, topology::config{}) {} token_metadata_impl(token_metadata::config cfg) noexcept : _topology(std::move(cfg.topo_cfg)) {}; token_metadata_impl(const token_metadata_impl&) = delete; // it's too huge for direct copy, use clone_async() token_metadata_impl(token_metadata_impl&&) noexcept = default; const utils::chunked_vector& sorted_tokens() const; future<> update_normal_tokens(std::unordered_set tokens, host_id endpoint); const token& first_token(const token& start) const; size_t first_token_index(const token& start) const; std::optional get_endpoint(const token& token) const; std::vector get_tokens(const host_id& addr) const; const std::unordered_map& get_token_to_endpoint() const { return _token_to_endpoint_map; } const std::unordered_set& get_leaving_endpoints() const { return _leaving_endpoints; } const std::unordered_map& get_bootstrap_tokens() const { return _bootstrap_tokens; } void update_topology(host_id id, std::optional opt_dr, std::optional opt_st, std::optional shard_count = std::nullopt) { _topology.add_or_update_endpoint(id, std::move(opt_dr), std::move(opt_st), std::move(shard_count)); } /** * Creates an iterable range of the sorted tokens starting at the token next * after the given one. * * @param start A token that will define the beginning of the range * * @return The requested range (see the description above) */ std::ranges::subrange ring_range(const token& start) const; std::ranges::subrange ring_range(dht::ring_position_view pos) const; topology& get_topology() { return _topology; } const topology& get_topology() const { return _topology; } void debug_show() const; /** @return a copy of host id set for read-only operations */ std::unordered_set get_host_ids() const; void add_bootstrap_token(token t, host_id endpoint); void add_bootstrap_tokens(std::unordered_set tokens, host_id endpoint); void remove_bootstrap_tokens(std::unordered_set tokens); void add_leaving_endpoint(host_id endpoint); void del_leaving_endpoint(host_id endpoint); public: void remove_endpoint(host_id endpoint); bool is_normal_token_owner(host_id endpoint) const; bool is_leaving(host_id endpoint) const; // Is this node being replaced by another node bool is_being_replaced(host_id endpoint) const; // Is any node being replaced by another node bool is_any_node_being_replaced() const; void add_replacing_endpoint(host_id existing_node, host_id replacing_node); void del_replacing_endpoint(host_id existing_node); public: /** * Create a full copy of token_metadata_impl using asynchronous continuations. * The caller must ensure that the cloned object will not change if * the function yields. */ future> clone_async() const noexcept; /** * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges, * bootstrap tokens and leaving endpoints are not included in the copy. * The caller must ensure that the cloned object will not change if * the function yields. */ future> clone_only_token_map(bool clone_sorted_tokens = true) const noexcept; /** * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all * current leave operations have finished. * * @return new token metadata */ future> clone_after_all_left() const noexcept { return clone_only_token_map(false).then([this] (std::unique_ptr all_left_metadata) { for (auto endpoint : _leaving_endpoints) { all_left_metadata->remove_endpoint(endpoint); } all_left_metadata->sort_tokens(); return all_left_metadata; }); } /** * Destroy the token_metadata members using continuations * to prevent reactor stalls. */ future<> clear_gently() noexcept; public: dht::token_range_vector get_primary_ranges_for(std::unordered_set tokens) const; dht::token_range_vector get_primary_ranges_for(token right) const; static boost::icl::interval::interval_type range_to_interval(wrapping_interval r); static wrapping_interval interval_to_range(boost::icl::interval::interval_type i); public: future<> update_topology_change_info(dc_rack_fn& get_dc_rack); const std::optional& get_topology_change_info() const { return _topology_change_info; } public: token get_predecessor(token t) const; // Returns nodes that are officially part of the ring. It does not include: // - nodes that are still joining the cluster, e.g., a node that is still // streaming data before it finishes the bootstrap process and turns into // NORMAL status, // - zero-token nodes (the ones with join_ring=false). const std::unordered_set& get_normal_token_owners() const noexcept { return _normal_token_owners; } void for_each_token_owner(std::function func) const; /* Returns the number of different endpoints that own tokens in the ring. * Bootstrapping tokens are not taken into account. */ size_t count_normal_token_owners() const; std::unordered_map> get_datacenter_token_owners() const; std::unordered_map>> get_datacenter_racks_token_owners() const; std::unordered_map>> get_datacenter_token_owners_nodes() const; std::unordered_map>>> get_datacenter_racks_token_owners_nodes() const; private: future<> update_normal_token_owners(); public: void set_read_new(token_metadata::read_new_t read_new) { _read_new = read_new; } public: long get_ring_version() const { return _ring_version; } void invalidate_cached_rings() { _ring_version = ++_static_ring_version; tlogger.debug("ring_version={}", _ring_version); } token_metadata::version_t get_version() const { return _version; } void set_version(token_metadata::version_t version) { if (version <= 0) { on_internal_error(tlogger, format("token_metadata_impl::set_version: invalid new version {}", version)); } if (version < _version) { on_internal_error(tlogger, format("token_metadata_impl::set_version: new version can't be smaller than the previous one, " "new version {}, previous version {}", version, _version)); } _version = version; } void set_version_tracker(token_metadata::version_tracker_t tracker) { _version_tracker = std::move(tracker); } friend class token_metadata; }; thread_local long token_metadata_impl::_static_ring_version; tokens_iterator::tokens_iterator(const token& start, const token_metadata_impl* token_metadata) : _token_metadata(token_metadata) { _cur_it = _token_metadata->sorted_tokens().begin() + _token_metadata->first_token_index(start); _remaining = _token_metadata->sorted_tokens().size(); } bool tokens_iterator::operator==(const tokens_iterator& it) const { return _remaining == it._remaining; } const token& tokens_iterator::operator*() const { return *_cur_it; } tokens_iterator& tokens_iterator::operator++() { ++_cur_it; if (_cur_it == _token_metadata->sorted_tokens().end()) { _cur_it = _token_metadata->sorted_tokens().begin(); } --_remaining; return *this; } host_id token_metadata::get_my_id() const { return get_topology().get_config().this_host_id; } inline std::ranges::subrange token_metadata_impl::ring_range(const token& start) const { auto begin = token_metadata::tokens_iterator(start, this); auto end = token_metadata::tokens_iterator(); return std::ranges::subrange(begin, end); } future> token_metadata_impl::clone_async() const noexcept { auto ret = co_await clone_only_token_map(); ret->_bootstrap_tokens.reserve(_bootstrap_tokens.size()); for (const auto& p : _bootstrap_tokens) { ret->_bootstrap_tokens.emplace(p); co_await coroutine::maybe_yield(); } ret->_leaving_endpoints = _leaving_endpoints; ret->_replacing_endpoints = _replacing_endpoints; ret->_ring_version = _ring_version; ret->_version = _version; co_return ret; } future> token_metadata_impl::clone_only_token_map(bool clone_sorted_tokens) const noexcept { auto ret = std::make_unique(shallow_copy{}, *this); ret->_token_to_endpoint_map.reserve(_token_to_endpoint_map.size()); for (const auto& p : _token_to_endpoint_map) { ret->_token_to_endpoint_map.emplace(p); co_await coroutine::maybe_yield(); } ret->_normal_token_owners = _normal_token_owners; ret->_topology = co_await _topology.clone_gently(); if (clone_sorted_tokens) { ret->_sorted_tokens = _sorted_tokens; co_await coroutine::maybe_yield(); } ret->_tablets = co_await _tablets.copy(); ret->_read_new = _read_new; co_return ret; } future<> token_metadata_impl::clear_gently() noexcept { _version_tracker = {}; co_await utils::clear_gently(_token_to_endpoint_map); co_await utils::clear_gently(_normal_token_owners); co_await utils::clear_gently(_bootstrap_tokens); co_await utils::clear_gently(_leaving_endpoints); co_await utils::clear_gently(_replacing_endpoints); co_await utils::clear_gently(_sorted_tokens); co_await _topology.clear_gently(); co_await _tablets.clear_gently(); co_return; } void token_metadata_impl::sort_tokens() { utils::chunked_vector sorted; sorted.reserve(_token_to_endpoint_map.size()); for (auto&& i : _token_to_endpoint_map) { sorted.push_back(i.first); } std::sort(sorted.begin(), sorted.end()); _sorted_tokens = std::move(sorted); } const tablet_metadata& token_metadata::tablets() const { return _impl->tablets(); } tablet_metadata& token_metadata::tablets() { return _impl->tablets(); } void token_metadata::set_tablets(tablet_metadata tm) { _impl->set_tablets(std::move(tm)); } const utils::chunked_vector& token_metadata_impl::sorted_tokens() const { return _sorted_tokens; } std::vector token_metadata_impl::get_tokens(const host_id& addr) const { std::vector res; for (auto&& i : _token_to_endpoint_map) { if (i.second == addr) { res.push_back(i.first); } } std::sort(res.begin(), res.end()); return res; } future<> token_metadata_impl::update_normal_tokens(std::unordered_set tokens, host_id endpoint) { if (tokens.empty()) { co_return; } if (!_topology.has_node(endpoint)) { on_internal_error(tlogger, format("token_metadata_impl: {} must be a member of topology to update normal tokens", endpoint)); } bool should_sort_tokens = false; // Phase 1: erase all tokens previously owned by the endpoint. for(auto it = _token_to_endpoint_map.begin(), ite = _token_to_endpoint_map.end(); it != ite;) { co_await coroutine::maybe_yield(); if(it->second == endpoint) { auto tokit = tokens.find(it->first); if (tokit == tokens.end()) { // token no longer owned by endpoint it = _token_to_endpoint_map.erase(it); continue; } // token ownership did not change, // no further update needed for it. tokens.erase(tokit); } ++it; } // Phase 2: // a. ... // b. update pending _bootstrap_tokens and _leaving_endpoints // c. update _token_to_endpoint_map with the new endpoint->token mappings // - set `should_sort_tokens` if new tokens were added remove_by_value(_bootstrap_tokens, endpoint); _leaving_endpoints.erase(endpoint); invalidate_cached_rings(); for (const token& t : tokens) { co_await coroutine::maybe_yield(); auto prev = _token_to_endpoint_map.insert(std::pair(t, endpoint)); should_sort_tokens |= prev.second; // new token inserted -> sort if (prev.first->second != endpoint) { tlogger.debug("Token {} changing ownership from {} to {}", t, prev.first->second, endpoint); prev.first->second = endpoint; } } co_await update_normal_token_owners(); // New tokens were added to _token_to_endpoint_map // so re-sort all tokens. if (should_sort_tokens) { sort_tokens(); } co_return; } size_t token_metadata_impl::first_token_index(const token& start) const { if (_sorted_tokens.empty()) { auto msg = format("sorted_tokens is empty in first_token_index!"); tlogger.error("{}", msg); throw std::runtime_error(msg); } auto it = std::lower_bound(_sorted_tokens.begin(), _sorted_tokens.end(), start); if (it == _sorted_tokens.end()) { return 0; } else { return std::distance(_sorted_tokens.begin(), it); } } const token& token_metadata_impl::first_token(const token& start) const { return _sorted_tokens[first_token_index(start)]; } std::optional token_metadata_impl::get_endpoint(const token& token) const { auto it = _token_to_endpoint_map.find(token); if (it == _token_to_endpoint_map.end()) { return std::nullopt; } else { return it->second; } } void token_metadata_impl::debug_show() const { auto reporter = std::make_shared>(); reporter->set_callback ([reporter, this] { fmt::print("Endpoint -> Token\n"); for (auto x : _token_to_endpoint_map) { fmt::print("inet_address={}, token={}\n", x.second, x.first); } fmt::print("Sorted Token\n"); for (auto x : _sorted_tokens) { fmt::print("token={}\n", x); } }); reporter->arm_periodic(std::chrono::seconds(1)); } std::unordered_set token_metadata_impl::get_host_ids() const { return _topology.get_nodes() | std::views::filter([&] (const node& n) { return !n.left() && !n.is_none(); }) | std::views::transform([] (const node& n) { return n.host_id(); }) | std::ranges::to(); } bool token_metadata_impl::is_normal_token_owner(host_id endpoint) const { return _normal_token_owners.contains(endpoint); } void token_metadata_impl::add_bootstrap_token(token t, host_id endpoint) { std::unordered_set tokens{t}; add_bootstrap_tokens(tokens, endpoint); } std::ranges::subrange token_metadata_impl::ring_range(const dht::ring_position_view start) const { return ring_range(start.token()); } void token_metadata_impl::add_bootstrap_tokens(std::unordered_set tokens, host_id endpoint) { for (auto t : tokens) { auto old_endpoint = _bootstrap_tokens.find(t); if (old_endpoint != _bootstrap_tokens.end() && (*old_endpoint).second != endpoint) { auto msg = format("Bootstrap Token collision between {} and {} (token {}", (*old_endpoint).second, endpoint, t); throw std::runtime_error(msg); } auto old_endpoint2 = _token_to_endpoint_map.find(t); if (old_endpoint2 != _token_to_endpoint_map.end() && (*old_endpoint2).second != endpoint) { auto msg = format("Bootstrap Token collision between {} and {} (token {}", (*old_endpoint2).second, endpoint, t); throw std::runtime_error(msg); } } std::erase_if(_bootstrap_tokens, [endpoint] (const std::pair& n) { return n.second == endpoint; }); for (auto t : tokens) { _bootstrap_tokens[t] = endpoint; } } void token_metadata_impl::remove_bootstrap_tokens(std::unordered_set tokens) { if (tokens.empty()) { tlogger.warn("tokens is empty in remove_bootstrap_tokens!"); return; } for (auto t : tokens) { _bootstrap_tokens.erase(t); } } bool token_metadata_impl::is_leaving(host_id endpoint) const { return _leaving_endpoints.contains(endpoint); } bool token_metadata_impl::is_being_replaced(host_id endpoint) const { return _replacing_endpoints.contains(endpoint); } bool token_metadata_impl::is_any_node_being_replaced() const { return !_replacing_endpoints.empty(); } void token_metadata_impl::remove_endpoint(host_id endpoint) { remove_by_value(_bootstrap_tokens, endpoint); remove_by_value(_token_to_endpoint_map, endpoint); _normal_token_owners.erase(endpoint); _topology.remove_endpoint(endpoint); _leaving_endpoints.erase(endpoint); del_replacing_endpoint(endpoint); invalidate_cached_rings(); } token token_metadata_impl::get_predecessor(token t) const { auto& tokens = sorted_tokens(); auto it = std::lower_bound(tokens.begin(), tokens.end(), t); if (it == tokens.end() || *it != t) { auto msg = format("token error in get_predecessor!"); tlogger.error("{}", msg); throw std::runtime_error(msg); } if (it == tokens.begin()) { // If the token is the first element, its preprocessor is the last element return tokens.back(); } else { return *(--it); } } dht::token_range_vector token_metadata_impl::get_primary_ranges_for(std::unordered_set tokens) const { dht::token_range_vector ranges; ranges.reserve(tokens.size() + 1); // one of the ranges will wrap for (auto right : tokens) { auto left = get_predecessor(right); ::compat::unwrap_into( wrapping_interval(interval_bound(left, false), interval_bound(right)), dht::token_comparator(), [&] (auto&& rng) { ranges.push_back(std::move(rng)); }); } return ranges; } dht::token_range_vector token_metadata_impl::get_primary_ranges_for(token right) const { return get_primary_ranges_for(std::unordered_set{right}); } boost::icl::interval::interval_type token_metadata_impl::range_to_interval(wrapping_interval r) { bool start_inclusive = false; bool end_inclusive = false; token start = dht::minimum_token(); token end = dht::maximum_token(); if (r.start()) { start = r.start()->value(); start_inclusive = r.start()->is_inclusive(); } if (r.end()) { end = r.end()->value(); end_inclusive = r.end()->is_inclusive(); } if (start_inclusive == false && end_inclusive == false) { return boost::icl::interval::open(std::move(start), std::move(end)); } else if (start_inclusive == false && end_inclusive == true) { return boost::icl::interval::left_open(std::move(start), std::move(end)); } else if (start_inclusive == true && end_inclusive == false) { return boost::icl::interval::right_open(std::move(start), std::move(end)); } else { return boost::icl::interval::closed(std::move(start), std::move(end)); } } wrapping_interval token_metadata_impl::interval_to_range(boost::icl::interval::interval_type i) { bool start_inclusive; bool end_inclusive; auto bounds = i.bounds().bits(); if (bounds == boost::icl::interval_bounds::static_open) { start_inclusive = false; end_inclusive = false; } else if (bounds == boost::icl::interval_bounds::static_left_open) { start_inclusive = false; end_inclusive = true; } else if (bounds == boost::icl::interval_bounds::static_right_open) { start_inclusive = true; end_inclusive = false; } else if (bounds == boost::icl::interval_bounds::static_closed) { start_inclusive = true; end_inclusive = true; } else { throw std::runtime_error("Invalid boost::icl::interval bounds"); } return wrapping_interval({{i.lower(), start_inclusive}}, {{i.upper(), end_inclusive}}); } future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) { if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) { co_await utils::clear_gently(_topology_change_info); _topology_change_info.reset(); co_return; } // target_token_metadata incorporates all the changes from leaving, bootstrapping and replacing auto target_token_metadata = co_await clone_only_token_map(false); { // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints std::unordered_map> new_normal_tokens; if (!_replacing_endpoints.empty()) { for (const auto& [token, inet_address]: _token_to_endpoint_map) { const auto it = _replacing_endpoints.find(inet_address); if (it == _replacing_endpoints.end()) { continue; } new_normal_tokens[it->second].insert(token); } for (const auto& [replace_from, replace_to]: _replacing_endpoints) { target_token_metadata->remove_endpoint(replace_from); } } for (const auto& [token, inet_address]: _bootstrap_tokens) { new_normal_tokens[inet_address].insert(token); } // apply new_normal_tokens for (auto& [endpoint, tokens]: new_normal_tokens) { target_token_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal); co_await target_token_metadata->update_normal_tokens(std::move(tokens), endpoint); } // apply leaving endpoints for (const auto& endpoint: _leaving_endpoints) { target_token_metadata->remove_endpoint(endpoint); } target_token_metadata->sort_tokens(); } // merge tokens from token_to_endpoint and bootstrap_tokens, // preserving tokens of leaving endpoints auto all_tokens = sorted_tokens(); all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size()); for (const auto& p: get_bootstrap_tokens()) { all_tokens.push_back(p.first); } std::sort(all_tokens.begin(), all_tokens.end()); auto prev_value = std::move(_topology_change_info); _topology_change_info.emplace(make_lw_shared(std::move(target_token_metadata)), std::move(all_tokens), _read_new); co_await utils::clear_gently(prev_value); } size_t token_metadata_impl::count_normal_token_owners() const { return _normal_token_owners.size(); } void token_metadata_impl::for_each_token_owner(std::function func) const { _topology.for_each_node([this, func = std::move(func)] (const node& node) { if (is_normal_token_owner(node.host_id())) { func(node); } }); } std::unordered_map> token_metadata_impl::get_datacenter_token_owners() const { std::unordered_map> datacenter_token_owners; _topology.for_each_node([&] (const node& n) { if (is_normal_token_owner(n.host_id())) { datacenter_token_owners[n.dc_rack().dc].insert(n.host_id()); } }); return datacenter_token_owners; } std::unordered_map>> token_metadata_impl::get_datacenter_racks_token_owners() const { std::unordered_map>> dc_racks_token_owners; _topology.for_each_node([&] (const node& n) { const auto& dc_rack = n.dc_rack(); if (is_normal_token_owner(n.host_id())) { dc_racks_token_owners[dc_rack.dc][dc_rack.rack].insert(n.host_id()); } }); return dc_racks_token_owners; } std::unordered_map>> token_metadata_impl::get_datacenter_token_owners_nodes() const { std::unordered_map>> datacenter_token_owners; _topology.for_each_node([&] (const node& n) { if (is_normal_token_owner(n.host_id())) { datacenter_token_owners[n.dc_rack().dc].insert(std::cref(n)); } }); return datacenter_token_owners; } std::unordered_map>>> token_metadata_impl::get_datacenter_racks_token_owners_nodes() const { std::unordered_map>>> dc_racks_token_owners; _topology.for_each_node([&] (const node& n) { const auto& dc_rack = n.dc_rack(); if (is_normal_token_owner(n.host_id())) { dc_racks_token_owners[dc_rack.dc][dc_rack.rack].insert(std::cref(n)); } }); return dc_racks_token_owners; } future<> token_metadata_impl::update_normal_token_owners() { std::unordered_set eps; for (auto [t, ep]: _token_to_endpoint_map) { eps.insert(ep); co_await coroutine::maybe_yield(); } _normal_token_owners = std::move(eps); } void token_metadata_impl::add_leaving_endpoint(host_id endpoint) { _leaving_endpoints.emplace(endpoint); } void token_metadata_impl::del_leaving_endpoint(host_id endpoint) { _leaving_endpoints.erase(endpoint); } void token_metadata_impl::add_replacing_endpoint(host_id existing_node, host_id replacing_node) { if (existing_node == replacing_node) { on_internal_error(tlogger, seastar::format("Can't replace node {} with itself", existing_node)); } tlogger.info("Added node {} as pending replacing endpoint which replaces existing node {}", replacing_node, existing_node); _replacing_endpoints[existing_node] = replacing_node; } void token_metadata_impl::del_replacing_endpoint(host_id existing_node) { if (_replacing_endpoints.contains(existing_node)) { tlogger.info("Removed node {} as pending replacing endpoint which replaces existing node {}", _replacing_endpoints[existing_node], existing_node); } _replacing_endpoints.erase(existing_node); } topology_change_info::topology_change_info(lw_shared_ptr target_token_metadata_, utils::chunked_vector all_tokens_, token_metadata::read_new_t read_new_) : target_token_metadata(std::move(target_token_metadata_)) , all_tokens(std::move(all_tokens_)) , read_new(read_new_) { } future<> topology_change_info::clear_gently() { co_await utils::clear_gently(target_token_metadata); co_await utils::clear_gently(all_tokens); } token_metadata::token_metadata(std::unique_ptr impl) : _impl(std::move(impl)) { } token_metadata::token_metadata(shared_token_metadata& stm, config cfg) : _shared_token_metadata(&stm) , _impl(std::make_unique(std::move(cfg))) { } token_metadata::~token_metadata() { clear_and_dispose_impl(); } token_metadata::token_metadata(token_metadata&&) noexcept = default; token_metadata& token_metadata::token_metadata::operator=(token_metadata&& o) noexcept { if (this != &o) { clear_and_dispose_impl(); _shared_token_metadata = std::exchange(o._shared_token_metadata, nullptr); _impl = std::exchange(o._impl, nullptr); } return *this; } void token_metadata::set_shared_token_metadata(shared_token_metadata& stm) { _shared_token_metadata = &stm; } shared_token_metadata& token_metadata::get_shared_token_metadata() { return *_shared_token_metadata; } const utils::chunked_vector& token_metadata::sorted_tokens() const { return _impl->sorted_tokens(); } future<> token_metadata::update_normal_tokens(std::unordered_set tokens, host_id endpoint) { return _impl->update_normal_tokens(std::move(tokens), endpoint); } const token& token_metadata::first_token(const token& start) const { return _impl->first_token(start); } size_t token_metadata::first_token_index(const token& start) const { return _impl->first_token_index(start); } std::optional token_metadata::get_endpoint(const token& token) const { return _impl->get_endpoint(token); } std::vector token_metadata::get_tokens(const host_id& addr) const { return _impl->get_tokens(addr); } const std::unordered_map& token_metadata::get_token_to_endpoint() const { return _impl->get_token_to_endpoint(); } const std::unordered_set& token_metadata::get_leaving_endpoints() const { return _impl->get_leaving_endpoints(); } const std::unordered_map& token_metadata::get_bootstrap_tokens() const { return _impl->get_bootstrap_tokens(); } void token_metadata::update_topology(host_id ep, std::optional opt_dr, std::optional opt_st, std::optional shard_count) { _impl->update_topology(ep, std::move(opt_dr), std::move(opt_st), std::move(shard_count)); } std::ranges::subrange token_metadata::ring_range(const token& start) const { return _impl->ring_range(start); } std::ranges::subrange token_metadata::ring_range(dht::ring_position_view start) const { return _impl->ring_range(start); } class token_metadata_ring_splitter : public locator::token_range_splitter { token_metadata_ptr _tmptr; std::ranges::subrange _range; public: token_metadata_ring_splitter(token_metadata_ptr tmptr) : _tmptr(std::move(tmptr)) , _range(_tmptr->sorted_tokens().empty() // ring_range() throws if the ring is empty ? std::ranges::subrange(token_metadata::tokens_iterator(), token_metadata::tokens_iterator()) : _tmptr->ring_range(dht::minimum_token())) { } void reset(dht::ring_position_view pos) override { _range = _tmptr->ring_range(pos); } std::optional next_token() override { if (_range.empty()) { return std::nullopt; } auto t = *_range.begin(); _range.advance(1); return t; } }; std::unique_ptr make_splitter(token_metadata_ptr tmptr) { return std::make_unique(std::move(tmptr)); } class local_splitter : public locator::token_range_splitter { std::optional _next_token; public: local_splitter() : _next_token(dht::minimum_token()) {} void reset(dht::ring_position_view pos) override { _next_token = pos.token(); } std::optional next_token() override { if (auto ret = std::exchange(_next_token, std::nullopt)) { return ret; } return std::nullopt; } }; topology& token_metadata::get_topology() { return _impl->get_topology(); } const topology& token_metadata::get_topology() const { return _impl->get_topology(); } void token_metadata::debug_show() const { _impl->debug_show(); } std::unordered_set token_metadata::get_host_ids() const { return _impl->get_host_ids(); } void token_metadata::add_bootstrap_token(token t, host_id endpoint) { _impl->add_bootstrap_token(t, endpoint); } void token_metadata::add_bootstrap_tokens(std::unordered_set tokens, host_id endpoint) { _impl->add_bootstrap_tokens(std::move(tokens), endpoint); } void token_metadata::remove_bootstrap_tokens(std::unordered_set tokens) { _impl->remove_bootstrap_tokens(std::move(tokens)); } void token_metadata::add_leaving_endpoint(host_id endpoint) { _impl->add_leaving_endpoint(endpoint); } void token_metadata::del_leaving_endpoint(host_id endpoint) { _impl->del_leaving_endpoint(endpoint); } void token_metadata::remove_endpoint(host_id endpoint) { _impl->remove_endpoint(endpoint); _impl->sort_tokens(); } bool token_metadata::is_normal_token_owner(host_id endpoint) const { return _impl->is_normal_token_owner(endpoint); } bool token_metadata::is_leaving(host_id endpoint) const { return _impl->is_leaving(endpoint); } bool token_metadata::is_being_replaced(host_id endpoint) const { return _impl->is_being_replaced(endpoint); } bool token_metadata::is_any_node_being_replaced() const { return _impl->is_any_node_being_replaced(); } void token_metadata::add_replacing_endpoint(host_id existing_node, host_id replacing_node) { _impl->add_replacing_endpoint(existing_node, replacing_node); } void token_metadata::del_replacing_endpoint(host_id existing_node) { _impl->del_replacing_endpoint(existing_node); } future token_metadata::clone_async() const noexcept { co_return token_metadata(co_await _impl->clone_async()); } future token_metadata::clone_only_token_map() const noexcept { co_return token_metadata(co_await _impl->clone_only_token_map()); } future token_metadata::clone_after_all_left() const noexcept { co_return token_metadata(co_await _impl->clone_after_all_left()); } void token_metadata::clear_and_dispose_impl() noexcept { if (!_shared_token_metadata) { return; } if (auto impl = std::exchange(_impl, nullptr)) { _shared_token_metadata->clear_and_dispose(std::move(impl)); } } future<> token_metadata::clear_gently() noexcept { return _impl->clear_gently(); } dht::token_range_vector token_metadata::get_primary_ranges_for(std::unordered_set tokens) const { return _impl->get_primary_ranges_for(std::move(tokens)); } dht::token_range_vector token_metadata::get_primary_ranges_for(token right) const { return _impl->get_primary_ranges_for(right); } boost::icl::interval::interval_type token_metadata::range_to_interval(wrapping_interval r) { return token_metadata_impl::range_to_interval(std::move(r)); } wrapping_interval token_metadata::interval_to_range(boost::icl::interval::interval_type i) { return token_metadata_impl::interval_to_range(std::move(i)); } future<> token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) { return _impl->update_topology_change_info(get_dc_rack); } const std::optional& token_metadata::get_topology_change_info() const { return _impl->get_topology_change_info(); } token token_metadata::get_predecessor(token t) const { return _impl->get_predecessor(t); } const std::unordered_set& token_metadata::get_normal_token_owners() const { return _impl->get_normal_token_owners(); } void token_metadata::for_each_token_owner(std::function func) const { return _impl->for_each_token_owner(func); } size_t token_metadata::count_normal_token_owners() const { return _impl->count_normal_token_owners(); } std::unordered_map> token_metadata::get_datacenter_token_owners() const { return _impl->get_datacenter_token_owners(); } std::unordered_map>> token_metadata::get_datacenter_racks_token_owners() const { return _impl->get_datacenter_racks_token_owners(); } std::unordered_map>> token_metadata::get_datacenter_token_owners_nodes() const { return _impl->get_datacenter_token_owners_nodes(); } std::unordered_map>>> token_metadata::get_datacenter_racks_token_owners_nodes() const { return _impl->get_datacenter_racks_token_owners_nodes(); } void token_metadata::set_read_new(read_new_t read_new) { _impl->set_read_new(read_new); } long token_metadata::get_ring_version() const { return _impl->get_ring_version(); } void token_metadata::invalidate_cached_rings() { _impl->invalidate_cached_rings(); } auto token_metadata::get_version() const -> version_t { return _impl->get_version(); } void token_metadata::set_version(version_t version) { _impl->set_version(version); } void token_metadata::set_version_tracker(version_tracker_t tracker) { _impl->set_version_tracker(std::move(tracker)); } version_tracker::~version_tracker() { if (_expired_at) { auto now = std::chrono::steady_clock::now(); if (*_expired_at + _log_threshold < now) { auto d = std::chrono::duration_cast>(now - *_expired_at); tlogger.warn("topology version {} held for {:.3f} [s] past expiry, released at: {}", _version, d.count(), seastar::current_backtrace()); } } } version_tracker shared_token_metadata::new_tracker(token_metadata::version_t version) { auto tracker = version_tracker(_versions_barrier.start(), version); _trackers.push_front(tracker); return tracker; } future<> shared_token_metadata::stop() noexcept { co_await _background_dispose_gate.close(); } void shared_token_metadata::clear_and_dispose(std::unique_ptr impl) noexcept { // Safe to drop the future since the gate is closed in stop() if (auto gh = _background_dispose_gate.try_hold()) { (void)impl->clear_gently().finally([i = std::move(impl), gh = std::move(gh)] {}); } } void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { if (_shared->get_ring_version() >= tmptr->get_ring_version()) { on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version())); } if (_shared->get_version() > tmptr->get_version()) { on_internal_error(tlogger, format("shared_token_metadata: must not set decreasing version: {} -> {}", _shared->get_version(), tmptr->get_version())); } else if (_shared->get_version() < tmptr->get_version()) { _stale_versions_in_use = _versions_barrier.advance_and_await(); } tmptr->set_shared_token_metadata(*this); _shared = std::move(tmptr); _shared->set_version_tracker(new_tracker(_shared->get_version())); for (auto&& v : _trackers) { if (v.version() != _shared->get_version()) { v.mark_expired(_stall_detector_threshold); } } tlogger.debug("new token_metadata is set, version {}", _shared->get_version()); } future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function (token_metadata&)> func) { auto lk = co_await get_lock(); auto tm = co_await _shared->clone_async(); // bump the token_metadata ring_version // to invalidate cached token/replication mappings // when the modified token_metadata is committed. tm.invalidate_cached_rings(); co_await func(tm); set(make_token_metadata_ptr(std::move(tm))); } void shared_token_metadata::mutate_token_metadata_for_test(seastar::noncopyable_function func) { auto& tm = *_shared; // bump the token_metadata ring_version // to invalidate cached token/replication mappings // when the modified token_metadata is committed. tm.invalidate_cached_rings(); func(tm); } future<> shared_token_metadata::mutate_on_all_shards(sharded& stm, seastar::noncopyable_function (token_metadata&)> func) { auto base_shard = this_shard_id(); SCYLLA_ASSERT(base_shard == 0); auto lk = co_await stm.local().get_lock(); std::vector pending_token_metadata_ptr; pending_token_metadata_ptr.resize(smp::count); auto tmptr = stm.local().make_token_metadata_ptr(co_await stm.local().get()->clone_async()); auto& tm = *tmptr; // bump the token_metadata ring_version // to invalidate cached token/replication mappings // when the modified token_metadata is committed. tm.invalidate_cached_rings(); co_await func(tm); // Apply the mutated token_metadata only after successfully cloning it on all shards. pending_token_metadata_ptr[base_shard] = tmptr; co_await smp::invoke_on_others(base_shard, [&] () -> future<> { pending_token_metadata_ptr[this_shard_id()] = stm.local().make_token_metadata_ptr(co_await tm.clone_async()); }); co_await stm.invoke_on_all([&] (shared_token_metadata& stm) { stm.set(std::move(pending_token_metadata_ptr[this_shard_id()])); }); } host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) { switch (restrict) { case param_type::host_id: try { _value = host_id(utils::UUID(s)); } catch (const marshal_exception& e) { throw std::invalid_argument(format("Invalid host_id {}: {}", s, e.what())); } break; case param_type::endpoint: try { _value = gms::inet_address(s); } catch (std::invalid_argument& e) { throw std::invalid_argument(format("Invalid inet_address {}: {}", s, e.what())); } break; case param_type::auto_detect: try { _value = host_id(utils::UUID(s)); } catch (const marshal_exception& e) { try { _value = gms::inet_address(s); } catch (std::invalid_argument& e) { throw std::invalid_argument(format("Invalid host_id or inet_address {}", s)); } } } } host_id host_id_or_endpoint::resolve_id(const gms::gossiper& g) const { if (has_host_id()) { return id(); } try { return g.get_host_id(endpoint()); } catch (...) { throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint())); } } gms::inet_address host_id_or_endpoint::resolve_endpoint(const gms::gossiper& g) const { if (has_endpoint()) { return endpoint(); } auto endpoint_opt = g.get_address_map().find(id()); if (!endpoint_opt) { throw std::runtime_error(format("Host ID {} not found in the cluster", id())); } return *endpoint_opt; } future<> pending_token_metadata::assign(locator::mutable_token_metadata_ptr new_token_metadata) { auto& sharded_token_metadata = new_token_metadata->get_shared_token_metadata().container(); // clone a local copy of new_token_metadata on all other shards co_await smp::invoke_on_others([this, &new_token_metadata, &sharded_token_metadata] () -> future<> { local() = sharded_token_metadata.local().make_token_metadata_ptr( co_await new_token_metadata->clone_async()); }); local() = std::move(new_token_metadata); } locator::mutable_token_metadata_ptr& pending_token_metadata::local() { return _shards[this_shard_id()]; } locator::token_metadata_ptr pending_token_metadata::local() const { return _shards[this_shard_id()]; } future<> pending_token_metadata::destroy() { return smp::invoke_on_all([this] () { _shards[this_shard_id()] = nullptr; }); } } // namespace locator