diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc
index 93156832b6..0a30fe8a98 100644
--- a/locator/token_metadata.cc
+++ b/locator/token_metadata.cc
@@ -59,6 +59,8 @@ private:
     // The map between the existing node to be replaced and the replacing node
     std::unordered_map _replacing_endpoints;
 
+    std::optional _topology_change_info; // Rebuilt by update_topology_change_info(); disengaged when nothing is bootstrapping, leaving or being replaced. NOTE(review): template argument looks stripped in this copy - presumably topology_change_info; confirm.
+
     using ring_mapping = boost::icl::interval_map>;
     // For each keyspace, migration_info contains ranges of tokens and
     // corresponding replicas to which writes or reads will be directed:
@@ -257,6 +259,11 @@ public:
     future<> update_pending_ranges(
         const token_metadata& unpimplified_this,
         const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
+
+    future<> update_topology_change_info(dc_rack_fn& get_dc_rack); // Recomputes _topology_change_info; get_dc_rack is queried for every endpoint that receives new normal tokens.
+    const std::optional& get_topology_change_info() const { // Read-only access to the last computed value; the optional may be disengaged.
+        return _topology_change_info;
+    }
 public:
     token get_predecessor(token t) const;
 
@@ -876,6 +883,86 @@ future<> token_metadata_impl::update_pending_ranges(
     });
 }
 
+future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) { // Rebuilds _topology_change_info from _bootstrap_tokens, _leaving_endpoints and _replacing_endpoints.
+    if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) { // no bootstrap, leave or replace recorded: drop any previous info and stop
+        co_await utils::clear_gently(_topology_change_info);
+        _topology_change_info.reset();
+        co_return;
+    }
+
+    // true if there is a node replaced with the same IP
+    bool replace_with_same_endpoint = false;
+    // target_token_metadata incorporates all the changes from leaving, bootstrapping and replacing
+    auto target_token_metadata = co_await clone_only_token_map(false); // NOTE(review): the meaning of the 'false' argument is not visible in this patch - confirm against clone_only_token_map
+    {
+        // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints
+        std::unordered_map> new_normal_tokens; // NOTE(review): template arguments look stripped in this copy; the uses below index it by endpoint and insert tokens - confirm the exact types
+        if (!_replacing_endpoints.empty()) {
+            for (const auto& [token, inet_address]: _token_to_endpoint_map) { // hand every token whose owner is being replaced over to the replacing endpoint
+                const auto it = _replacing_endpoints.find(inet_address);
+                if (it == _replacing_endpoints.end()) {
+                    continue;
+                }
+                new_normal_tokens[it->second].insert(token); // it->second is the replacing endpoint
+            }
+            for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
+                if (replace_from == replace_to) {
+                    replace_with_same_endpoint = true;
+                } else {
+                    target_token_metadata->remove_endpoint(replace_from); // the replaced node is removed from the target only when the replacement has a different address
+                }
+            }
+        }
+        for (const auto& [token, inet_address]: _bootstrap_tokens) {
+            new_normal_tokens[inet_address].insert(token);
+        }
+        // apply new_normal_tokens
+        for (auto& [endpoint, tokens]: new_normal_tokens) {
+            target_token_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal);
+            co_await target_token_metadata->update_normal_tokens(std::move(tokens), endpoint); // tokens is moved from here; new_normal_tokens is not read again afterwards
+        }
+        // apply leaving endpoints
+        for (const auto& endpoint: _leaving_endpoints) {
+            target_token_metadata->remove_endpoint(endpoint);
+        }
+        target_token_metadata->sort_tokens();
+    }
+
+    // We require a distinct token_metadata instance when replace_from equals replace_to,
+    // as it ensures the node is included in pending_ranges.
+    // Otherwise, the node would be excluded from both pending_ranges and
+    // get_natural_endpoints_without_node_being_replaced,
+    // causing the coordinator to overlook it entirely.
+    std::unique_ptr base_token_metadata; // stays null unless some node is replaced by one with the same address. NOTE(review): template argument looks stripped in this copy - confirm the pointee type
+    if (replace_with_same_endpoint) {
+        base_token_metadata = co_await clone_only_token_map(false);
+        for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
+            if (replace_from == replace_to) {
+                base_token_metadata->remove_endpoint(replace_from); // the base copy is the current token map minus the nodes replaced in place
+            }
+        }
+        base_token_metadata->sort_tokens();
+    }
+
+    // merge tokens from token_to_endpoint and bootstrap_tokens,
+    // preserving tokens of leaving endpoints
+    auto all_tokens = std::vector();
+    all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size());
+    all_tokens.resize(sorted_tokens().size()); // size the vector so std::copy below can write through begin(all_tokens)
+    std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(all_tokens));
+    for (const auto& p: get_bootstrap_tokens()) {
+        all_tokens.push_back(p.first);
+    }
+    std::sort(begin(all_tokens), end(all_tokens)); // bootstrap tokens were appended after the copied ones, so re-sort the whole vector
+
+    auto prev_value = std::move(_topology_change_info); // keep the old contents in a local so they can be cleared after the new value is installed
+    _topology_change_info.emplace(token_metadata(std::move(target_token_metadata)),
+        base_token_metadata ? std::optional(token_metadata(std::move(base_token_metadata))): std::nullopt,
+        std::move(all_tokens),
+        _read_new);
+    co_await utils::clear_gently(prev_value);
+}
+
 size_t token_metadata_impl::count_normal_token_owners() const {
     return _normal_token_owners.size();
 }
@@ -969,6 +1056,23 @@ std::multimap token_metadata_impl::get_endpoint_to_token_ma
     return cloned;
 }
 
+topology_change_info::topology_change_info(token_metadata target_token_metadata_, // Takes every argument by value and moves it into the member of the same name (read_new_ is copied).
+                                           std::optional base_token_metadata_,
+                                           std::vector all_tokens_,
+                                           token_metadata::read_new_t read_new_)
+    : target_token_metadata(std::move(target_token_metadata_))
+    , base_token_metadata(std::move(base_token_metadata_))
+    , all_tokens(std::move(all_tokens_))
+    , read_new(read_new_)
+{
+}
+
+future<> topology_change_info::clear_gently() { // Passes the three container-like members to utils::clear_gently one after another; read_new is left untouched.
+    co_await utils::clear_gently(target_token_metadata);
+    co_await utils::clear_gently(base_token_metadata);
+    co_await utils::clear_gently(all_tokens);
+}
+
 token_metadata::token_metadata(std::unique_ptr impl)
     : _impl(std::move(impl)) {
 }
@@ -1232,6 +1336,16 @@ token_metadata::update_pending_ranges(const abstract_replication_strategy& strat
     return _impl->update_pending_ranges(*this, strategy, keyspace_name, get_dc_rack);
 }
 
+future<>
+token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) { // Forwards to token_metadata_impl::update_topology_change_info.
+    return _impl->update_topology_change_info(get_dc_rack);
+}
+
+const std::optional&
+token_metadata::get_topology_change_info() const { // Forwards to token_metadata_impl::get_topology_change_info.
+    return _impl->get_topology_change_info();
+}
+
 token
 token_metadata::get_predecessor(token t) const {
     return _impl->get_predecessor(t);
diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh
index 465b53a375..947217a3cd 100644
--- a/locator/token_metadata.hh
+++ b/locator/token_metadata.hh
@@ -69,6 +69,7 @@ struct host_id_or_endpoint {
 };
 
 class token_metadata_impl;
+struct topology_change_info; // forward declaration; the struct is defined after class token_metadata because it holds token_metadata members by value
 
 class token_metadata final {
     std::unique_ptr _impl;
@@ -256,6 +257,10 @@ public:
      */
     future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
 
+    future<> update_topology_change_info(dc_rack_fn& get_dc_rack); // Recomputes the topology change info held by the implementation object.
+
+    const std::optional& get_topology_change_info() const; // Returns the last computed info; the optional may be disengaged.
+
     token get_predecessor(token t) const;
 
     const std::unordered_set& get_all_endpoints() const;
@@ -294,6 +299,19 @@ public:
     friend class token_metadata_impl;
 };
 
+struct topology_change_info { // Snapshot built by token_metadata_impl::update_topology_change_info.
+    token_metadata target_token_metadata; // token map with bootstrapping/replacing endpoints added as normal and leaving endpoints removed
+    std::optional base_token_metadata; // engaged only when a node is replaced by one with the same address; current token map without that node
+    std::vector all_tokens; // sorted union of the current sorted tokens and the bootstrap tokens
+    token_metadata::read_new_t read_new; // value of token_metadata_impl::_read_new at the time the info was built
+
+    topology_change_info(token_metadata target_token_metadata_,
+        std::optional base_token_metadata_,
+        std::vector all_tokens_,
+        token_metadata::read_new_t read_new_);
+    future<> clear_gently(); // asynchronous teardown of the members, used instead of relying on the destructor alone
+};
+
 using token_metadata_ptr = lw_shared_ptr;
 using mutable_token_metadata_ptr = lw_shared_ptr;
 using token_metadata_lock = semaphore_units<>;
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 6456c95ad8..ddefa87bb7 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -4750,11 +4750,13 @@ future<> storage_service::update_pending_ranges(mutable_token_metadata_ptr tmptr
     assert(this_shard_id() == 0);
 
     try {
+        locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); }); // hoisted out of the keyspace loop so the call below and the per-keyspace calls share one callback
+        co_await tmptr->update_topology_change_info(get_dc_rack_from_gossiper); // done once, before the per-keyspace pending range updates
+
         auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
         for (const auto& [keyspace_name, erm] : ks_erms) {
             auto& strategy = erm->get_replication_strategy();
             slogger.debug("Updating pending ranges for keyspace={} starts ({})", keyspace_name, reason);
-            locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); });
             co_await tmptr->update_pending_ranges(strategy, keyspace_name, get_dc_rack_from_gossiper);
             slogger.debug("Updating pending ranges for keyspace={} ends ({})", keyspace_name, reason);
         }