diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 8d6c935131..2d8d9faa45 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -161,7 +161,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n auto range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes); //Pending ranges - metadata_clone.update_normal_tokens(_tokens, _address); + metadata_clone.update_normal_tokens(_tokens, _address).get(); auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes); //Collects the source that will have its range moved to the new node diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index f2d151b8b7..d97e35c65e 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -23,6 +23,7 @@ #include "utils/class_registrator.hh" #include "exceptions/exceptions.hh" #include +#include namespace locator { @@ -295,10 +296,11 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p token_metadata temp; if (can_yield) { temp = tmptr->clone_only_token_map().get0(); + temp.update_normal_tokens(pending_tokens, pending_address).get(); } else { temp = tmptr->clone_only_token_map_sync(); + temp.update_normal_tokens_sync(pending_tokens, pending_address); } - temp.update_normal_tokens(pending_tokens, pending_address); for (auto& x : get_address_ranges(temp, pending_address, can_yield)) { ret.push_back(x.second); } diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 38cfe0935e..9522498dcb 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -30,6 +30,7 @@ #include #include #include +#include namespace locator { @@ -88,9 +89,11 @@ public: token_metadata_impl(const token_metadata_impl&) = default; token_metadata_impl(token_metadata_impl&&) noexcept = default; const std::vector& sorted_tokens() const; - void update_normal_token(token token, inet_address endpoint); - void update_normal_tokens(std::unordered_set tokens, inet_address endpoint); - void update_normal_tokens(const std::unordered_map>& endpoint_tokens); + future<> update_normal_token(token token, inet_address endpoint); + future<> update_normal_tokens(std::unordered_set tokens, inet_address endpoint); + future<> update_normal_tokens(const std::unordered_map>& endpoint_tokens); + void update_normal_tokens_sync(std::unordered_set tokens, inet_address endpoint); + void update_normal_tokens_sync(const std::unordered_map>& endpoint_tokens); const token& first_token(const token& start) const; size_t first_token_index(const token& start) const; std::optional get_endpoint(const token& token) const; @@ -1036,17 +1039,74 @@ std::vector token_metadata_impl::get_tokens(const inet_address& addr) con /** * Update token map with a single token/endpoint pair in normal state. */ -void token_metadata_impl::update_normal_token(token t, inet_address endpoint) +future<> token_metadata_impl::update_normal_token(token t, inet_address endpoint) { - update_normal_tokens(std::unordered_set({t}), endpoint); + return update_normal_tokens(std::unordered_set({t}), endpoint); } -void token_metadata_impl::update_normal_tokens(std::unordered_set tokens, inet_address endpoint) { +future<> token_metadata_impl::update_normal_tokens(std::unordered_set tokens, inet_address endpoint) { + if (tokens.empty()) { + co_return; + } + std::unordered_map> endpoint_tokens ({{endpoint, std::move(tokens)}}); + co_return co_await update_normal_tokens(endpoint_tokens); +} + +void token_metadata_impl::update_normal_tokens_sync(std::unordered_set tokens, inet_address endpoint) { if (tokens.empty()) { return; } - std::unordered_map> endpoint_tokens ({{endpoint, tokens}}); - update_normal_tokens(endpoint_tokens); + std::unordered_map> endpoint_tokens ({{endpoint, std::move(tokens)}}); + update_normal_tokens_sync(endpoint_tokens); +} + +// Note: The sync version of this function `update_normal_tokens_sync` +// must be kept in sync with this function if any change is made. +future<> token_metadata_impl::update_normal_tokens(const std::unordered_map>& endpoint_tokens) { + if (endpoint_tokens.empty()) { + co_return; + } + + bool should_sort_tokens = false; + for (auto&& i : endpoint_tokens) { + inet_address endpoint = i.first; + const auto& tokens = i.second; + + if (tokens.empty()) { + auto msg = format("tokens is empty in update_normal_tokens"); + tlogger.error("{}", msg); + throw std::runtime_error(msg); + } + + for(auto it = _token_to_endpoint_map.begin(), ite = _token_to_endpoint_map.end(); it != ite;) { + co_await make_ready_future<>(); // maybe yield + if(it->second == endpoint) { + it = _token_to_endpoint_map.erase(it); + } else { + ++it; + } + } + + _topology.add_endpoint(endpoint); + remove_by_value(_bootstrap_tokens, endpoint); + _leaving_endpoints.erase(endpoint); + invalidate_cached_rings(); + for (const token& t : tokens) + { + co_await make_ready_future<>(); // maybe yield + auto prev = _token_to_endpoint_map.insert(std::pair(t, endpoint)); + should_sort_tokens |= prev.second; // new token inserted -> sort + if (prev.first->second != endpoint) { + tlogger.debug("Token {} changing ownership from {} to {}", t, prev.first->second, endpoint); + prev.first->second = endpoint; + } + } + } + + if (should_sort_tokens) { + sort_tokens(); + } + co_return; } /** @@ -1056,8 +1116,14 @@ void token_metadata_impl::update_normal_tokens(std::unordered_set tokens, * is expensive (CASSANDRA-3831). * * @param endpointTokens + * + * Note: The futurized version of this function `update_normal_tokens` + * must be kept in sync with this function if any change is made. + * + * This version is meant to be deprecated when the whole interface + * will be futurized. */ -void token_metadata_impl::update_normal_tokens(const std::unordered_map>& endpoint_tokens) { +void token_metadata_impl::update_normal_tokens_sync(const std::unordered_map>& endpoint_tokens) { if (endpoint_tokens.empty()) { return; } @@ -1068,7 +1134,7 @@ void token_metadata_impl::update_normal_tokens(const std::unordered_mapupdate_normal_tokens(tokens, endpoint); + all_left_metadata->update_normal_tokens(tokens, endpoint).get(); for (auto& x : strategy.get_address_ranges(*all_left_metadata, endpoint, can_yield::yes)) { new_pending_ranges.emplace(x.second, endpoint); } @@ -1674,19 +1740,29 @@ token_metadata::sorted_tokens() const { return _impl->sorted_tokens(); } -void +future<> token_metadata::update_normal_token(token token, inet_address endpoint) { - _impl->update_normal_token(token, endpoint); + return _impl->update_normal_token(token, endpoint); } -void +future<> token_metadata::update_normal_tokens(std::unordered_set tokens, inet_address endpoint) { - _impl->update_normal_tokens(std::move(tokens), endpoint); + return _impl->update_normal_tokens(std::move(tokens), endpoint); +} + +future<> +token_metadata::update_normal_tokens(const std::unordered_map>& endpoint_tokens) { + return _impl->update_normal_tokens(endpoint_tokens); } void -token_metadata::update_normal_tokens(const std::unordered_map>& endpoint_tokens) { - _impl->update_normal_tokens(endpoint_tokens); +token_metadata::update_normal_tokens_sync(std::unordered_set tokens, inet_address endpoint) { + _impl->update_normal_tokens_sync(std::move(tokens), endpoint); +} + +void +token_metadata::update_normal_tokens_sync(const std::unordered_map>& endpoint_tokens) { + _impl->update_normal_tokens_sync(endpoint_tokens); } const token& diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 84c371069b..0efd88f243 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -180,9 +180,11 @@ public: token_metadata& operator=(token_metadata&&) noexcept; ~token_metadata(); const std::vector& sorted_tokens() const; - void update_normal_token(token token, inet_address endpoint); - void update_normal_tokens(std::unordered_set tokens, inet_address endpoint); - void update_normal_tokens(const std::unordered_map>& endpoint_tokens); + future<> update_normal_token(token token, inet_address endpoint); + future<> update_normal_tokens(std::unordered_set tokens, inet_address endpoint); + future<> update_normal_tokens(const std::unordered_map>& endpoint_tokens); + void update_normal_tokens_sync(std::unordered_set tokens, inet_address endpoint); + void update_normal_tokens_sync(const std::unordered_map>& endpoint_tokens); const token& first_token(const token& start) const; size_t first_token_index(const token& start) const; std::optional get_endpoint(const token& token) const; diff --git a/repair/repair.cc b/repair/repair.cc index 47734d472d..b8539a00f6 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1810,7 +1810,7 @@ future<> bootstrap_with_repair(seastar::sharded& db, seastar::sharded< auto range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes); //Pending ranges - metadata_clone.update_normal_tokens(tokens, myip); + metadata_clone.update_normal_tokens(tokens, myip).get(); auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes); //Collects the source that will have its range moved to the new node @@ -2242,7 +2242,7 @@ future<> replace_with_repair(seastar::sharded& db, seastar::shardedupdate_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address()); + co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address()); co_return co_await do_rebuild_replace_with_repair(db, ms, std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 116ecfc810..70dfb4b951 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -279,7 +279,7 @@ void storage_service::prepare_to_join( slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}", get_broadcast_address() == *replace_address ? "the same" : "a different", get_broadcast_address(), *replace_address); - tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address); + tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address).get(); } else if (should_bootstrap()) { check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features, do_bind).get(); } else { @@ -302,7 +302,7 @@ void storage_service::prepare_to_join( // This node must know about its chosen tokens before other nodes do // since they may start sending writes to this node after it gossips status = NORMAL. // Therefore we update _token_metadata now, before gossip starts. - tmptr->update_normal_tokens(my_tokens, get_broadcast_address()); + tmptr->update_normal_tokens(my_tokens, get_broadcast_address()).get(); _cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0(); if (!_cdc_streams_ts) { @@ -543,8 +543,7 @@ void storage_service::join_token_ring(int delay) { // This node must know about its chosen tokens before other nodes do // since they may start sending writes to this node after it gossips status = NORMAL. // Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now. - tmptr->update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); - return make_ready_future<>(); + return tmptr->update_normal_tokens(_bootstrap_tokens, get_broadcast_address()); }).get(); if (!db::system_keyspace::bootstrap_complete()) { @@ -1194,7 +1193,7 @@ void storage_service::handle_state_normal(inet_address endpoint) { // Update pending ranges after update of normal tokens immediately to avoid // a race where natural endpoint was updated to contain node A, but A was // not yet removed from pending endpoints - tmptr->update_normal_tokens(owned_tokens, endpoint); + tmptr->update_normal_tokens(owned_tokens, endpoint).get(); update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint)).get(); replicate_to_all_cores(std::move(tmptr)).get(); tmlock.reset(); @@ -1247,7 +1246,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) { slogger.info("Node {} state jump to leaving", endpoint); handle_cdc_generation(cdc_streams_ts); - tmptr->update_normal_tokens(tokens, endpoint); + tmptr->update_normal_tokens(tokens, endpoint).get(); } else { auto tokens_ = tmptr->get_tokens(endpoint); std::set tmp(tokens.begin(), tokens.end()); @@ -1256,7 +1255,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) { slogger.debug("tokens_={}, tokens={}", tokens_, tmp); handle_cdc_generation(cdc_streams_ts); - tmptr->update_normal_tokens(tokens, endpoint); + tmptr->update_normal_tokens(tokens, endpoint).get(); } } @@ -1659,7 +1658,7 @@ future<> storage_service::init_server(bind_messaging_port do_bind) { // entry has been mistakenly added, delete it db::system_keyspace::remove_endpoint(ep).get(); } else { - tmptr->update_normal_tokens(tokens, ep); + tmptr->update_normal_tokens(tokens, ep).get(); if (loaded_host_ids.contains(ep)) { tmptr->update_host_id(loaded_host_ids.at(ep), ep); } diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index 824fa0347b..08a5d90bd6 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -212,7 +212,7 @@ void simple_test() { for (unsigned i = 0; i < ring_points.size(); i++) { tmptr->update_normal_token( {dht::token::kind::key, d2t(ring_points[i].point / ring_points.size())}, - ring_points[i].host); + ring_points[i].host).get(); } stm.set(tmptr); @@ -316,7 +316,7 @@ void heavy_origin_test() { } } - tmptr->update_normal_tokens(tokens); + tmptr->update_normal_tokens(tokens).get(); stm.set(std::move(tmptr)); auto ars_uptr = abstract_replication_strategy::create_replication_strategy( @@ -614,7 +614,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) { for (auto& node : nodes) { for (size_t i = 0; i < VNODES; ++i) { - tmptr->update_normal_token(dht::token::get_random_token(), node); + tmptr->update_normal_token(dht::token::get_random_token(), node).get(); } } stm.set(std::move(tmptr)); diff --git a/test/boost/storage_proxy_test.cc b/test/boost/storage_proxy_test.cc index fb973d620b..017b26eb4f 100644 --- a/test/boost/storage_proxy_test.cc +++ b/test/boost/storage_proxy_test.cc @@ -67,7 +67,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { { // Ring with minimum token auto tmptr = locator::make_token_metadata_ptr(); - tmptr->update_normal_token(dht::minimum_token(), {"10.0.0.1"}); + tmptr->update_normal_token(dht::minimum_token(), {"10.0.0.1"}).get(); check(tmptr, dht::partition_range::make_singular(ring[0]), { dht::partition_range::make_singular(ring[0]) @@ -80,8 +80,8 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { { auto tmptr = locator::make_token_metadata_ptr(); - tmptr->update_normal_token(ring[2].token(), {"10.0.0.1"}); - tmptr->update_normal_token(ring[5].token(), {"10.0.0.2"}); + tmptr->update_normal_token(ring[2].token(), {"10.0.0.1"}).get(); + tmptr->update_normal_token(ring[5].token(), {"10.0.0.2"}).get(); check(tmptr, dht::partition_range::make_singular(ring[0]), { dht::partition_range::make_singular(ring[0])