token_metdata: futurize update_normal_tokens

The function complexity if O(#tokens) in the worst case
as for each endpoint token to traverses _token_to_endpoint_map
lineraly to erase the endpoint mapping if it exists.

This change renames the current implementation of
update_normal_tokens to update_normal_tokens_sync
and clones the code as a coroutine that returns a future
and may yield if needed.

Eventually we should futurize the whole token_metadata
and abstract_replication_strategy interface and get rid
of the synchronous functions.  Until then the sync
version is still required from call sites that
are neither returning a future nor run in a seastar thread.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2020-12-17 15:53:49 +02:00
parent e7f4cd89a9
commit e089c22ec1
8 changed files with 117 additions and 38 deletions

View File

@@ -161,7 +161,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
auto range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
//Pending ranges
metadata_clone.update_normal_tokens(_tokens, _address);
metadata_clone.update_normal_tokens(_tokens, _address).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
//Collects the source that will have its range moved to the new node

View File

@@ -23,6 +23,7 @@
#include "utils/class_registrator.hh"
#include "exceptions/exceptions.hh"
#include <boost/range/algorithm/remove_if.hpp>
#include <seastar/core/coroutine.hh>
namespace locator {
@@ -295,10 +296,11 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
token_metadata temp;
if (can_yield) {
temp = tmptr->clone_only_token_map().get0();
temp.update_normal_tokens(pending_tokens, pending_address).get();
} else {
temp = tmptr->clone_only_token_map_sync();
temp.update_normal_tokens_sync(pending_tokens, pending_address);
}
temp.update_normal_tokens(pending_tokens, pending_address);
for (auto& x : get_address_ranges(temp, pending_address, can_yield)) {
ret.push_back(x.second);
}

View File

@@ -30,6 +30,7 @@
#include <algorithm>
#include <boost/icl/interval.hpp>
#include <boost/icl/interval_map.hpp>
#include <seastar/core/coroutine.hh>
namespace locator {
@@ -88,9 +89,11 @@ public:
token_metadata_impl(const token_metadata_impl&) = default;
token_metadata_impl(token_metadata_impl&&) noexcept = default;
const std::vector<token>& sorted_tokens() const;
void update_normal_token(token token, inet_address endpoint);
void update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint);
void update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
future<> update_normal_token(token token, inet_address endpoint);
future<> update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint);
future<> update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
void update_normal_tokens_sync(std::unordered_set<token> tokens, inet_address endpoint);
void update_normal_tokens_sync(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
const token& first_token(const token& start) const;
size_t first_token_index(const token& start) const;
std::optional<inet_address> get_endpoint(const token& token) const;
@@ -1036,17 +1039,74 @@ std::vector<token> token_metadata_impl::get_tokens(const inet_address& addr) con
/**
* Update token map with a single token/endpoint pair in normal state.
*/
void token_metadata_impl::update_normal_token(token t, inet_address endpoint)
future<> token_metadata_impl::update_normal_token(token t, inet_address endpoint)
{
update_normal_tokens(std::unordered_set<token>({t}), endpoint);
return update_normal_tokens(std::unordered_set<token>({t}), endpoint);
}
void token_metadata_impl::update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint) {
future<> token_metadata_impl::update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint) {
if (tokens.empty()) {
co_return;
}
std::unordered_map<inet_address, std::unordered_set<token>> endpoint_tokens ({{endpoint, std::move(tokens)}});
co_return co_await update_normal_tokens(endpoint_tokens);
}
void token_metadata_impl::update_normal_tokens_sync(std::unordered_set<token> tokens, inet_address endpoint) {
if (tokens.empty()) {
return;
}
std::unordered_map<inet_address, std::unordered_set<token>> endpoint_tokens ({{endpoint, tokens}});
update_normal_tokens(endpoint_tokens);
std::unordered_map<inet_address, std::unordered_set<token>> endpoint_tokens ({{endpoint, std::move(tokens)}});
update_normal_tokens_sync(endpoint_tokens);
}
// Note: The sync version of this function `update_normal_tokens_sync`
// must be kept in sync with this function if any change is made.
future<> token_metadata_impl::update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
if (endpoint_tokens.empty()) {
co_return;
}
bool should_sort_tokens = false;
for (auto&& i : endpoint_tokens) {
inet_address endpoint = i.first;
const auto& tokens = i.second;
if (tokens.empty()) {
auto msg = format("tokens is empty in update_normal_tokens");
tlogger.error("{}", msg);
throw std::runtime_error(msg);
}
for(auto it = _token_to_endpoint_map.begin(), ite = _token_to_endpoint_map.end(); it != ite;) {
co_await make_ready_future<>(); // maybe yield
if(it->second == endpoint) {
it = _token_to_endpoint_map.erase(it);
} else {
++it;
}
}
_topology.add_endpoint(endpoint);
remove_by_value(_bootstrap_tokens, endpoint);
_leaving_endpoints.erase(endpoint);
invalidate_cached_rings();
for (const token& t : tokens)
{
co_await make_ready_future<>(); // maybe yield
auto prev = _token_to_endpoint_map.insert(std::pair<token, inet_address>(t, endpoint));
should_sort_tokens |= prev.second; // new token inserted -> sort
if (prev.first->second != endpoint) {
tlogger.debug("Token {} changing ownership from {} to {}", t, prev.first->second, endpoint);
prev.first->second = endpoint;
}
}
}
if (should_sort_tokens) {
sort_tokens();
}
co_return;
}
/**
@@ -1056,8 +1116,14 @@ void token_metadata_impl::update_normal_tokens(std::unordered_set<token> tokens,
* is expensive (CASSANDRA-3831).
*
* @param endpointTokens
*
* Note: The futurized version of this function `update_normal_tokens`
* must be kept in sync with this function if any change is made.
*
* This version is meant to be deprecated when the whole interface
* will be futurized.
*/
void token_metadata_impl::update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
void token_metadata_impl::update_normal_tokens_sync(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
if (endpoint_tokens.empty()) {
return;
}
@@ -1068,7 +1134,7 @@ void token_metadata_impl::update_normal_tokens(const std::unordered_map<inet_add
const auto& tokens = i.second;
if (tokens.empty()) {
auto msg = format("tokens is empty in update_normal_tokens");
auto msg = format("tokens is empty in update_normal_tokens_sync");
tlogger.error("{}", msg);
throw std::runtime_error(msg);
}
@@ -1499,7 +1565,7 @@ void token_metadata_impl::calculate_pending_ranges_for_bootstrap(
for (auto& x : tmp) {
auto& endpoint = x.first;
auto& tokens = x.second;
all_left_metadata->update_normal_tokens(tokens, endpoint);
all_left_metadata->update_normal_tokens(tokens, endpoint).get();
for (auto& x : strategy.get_address_ranges(*all_left_metadata, endpoint, can_yield::yes)) {
new_pending_ranges.emplace(x.second, endpoint);
}
@@ -1674,19 +1740,29 @@ token_metadata::sorted_tokens() const {
return _impl->sorted_tokens();
}
void
future<>
token_metadata::update_normal_token(token token, inet_address endpoint) {
_impl->update_normal_token(token, endpoint);
return _impl->update_normal_token(token, endpoint);
}
void
future<>
token_metadata::update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint) {
_impl->update_normal_tokens(std::move(tokens), endpoint);
return _impl->update_normal_tokens(std::move(tokens), endpoint);
}
future<>
token_metadata::update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
return _impl->update_normal_tokens(endpoint_tokens);
}
void
token_metadata::update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
_impl->update_normal_tokens(endpoint_tokens);
token_metadata::update_normal_tokens_sync(std::unordered_set<token> tokens, inet_address endpoint) {
_impl->update_normal_tokens_sync(std::move(tokens), endpoint);
}
void
token_metadata::update_normal_tokens_sync(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
_impl->update_normal_tokens_sync(endpoint_tokens);
}
const token&

View File

@@ -180,9 +180,11 @@ public:
token_metadata& operator=(token_metadata&&) noexcept;
~token_metadata();
const std::vector<token>& sorted_tokens() const;
void update_normal_token(token token, inet_address endpoint);
void update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint);
void update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
future<> update_normal_token(token token, inet_address endpoint);
future<> update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint);
future<> update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
void update_normal_tokens_sync(std::unordered_set<token> tokens, inet_address endpoint);
void update_normal_tokens_sync(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
const token& first_token(const token& start) const;
size_t first_token_index(const token& start) const;
std::optional<inet_address> get_endpoint(const token& token) const;

View File

@@ -1810,7 +1810,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
auto range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
//Pending ranges
metadata_clone.update_normal_tokens(tokens, myip);
metadata_clone.update_normal_tokens(tokens, myip).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, utils::can_yield::yes);
//Collects the source that will have its range moved to the new node
@@ -2242,7 +2242,7 @@ future<> replace_with_repair(seastar::sharded<database>& db, seastar::sharded<ne
// update a cloned version of tmptr
// no need to set the original version
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
co_return co_await do_rebuild_replace_with_repair(db, ms, std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason);
}

View File

@@ -279,7 +279,7 @@ void storage_service::prepare_to_join(
slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
get_broadcast_address() == *replace_address ? "the same" : "a different",
get_broadcast_address(), *replace_address);
tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address);
tmptr->update_normal_tokens(_bootstrap_tokens, *replace_address).get();
} else if (should_bootstrap()) {
check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features, do_bind).get();
} else {
@@ -302,7 +302,7 @@ void storage_service::prepare_to_join(
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore we update _token_metadata now, before gossip starts.
tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
tmptr->update_normal_tokens(my_tokens, get_broadcast_address()).get();
_cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0();
if (!_cdc_streams_ts) {
@@ -543,8 +543,7 @@ void storage_service::join_token_ring(int delay) {
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now.
tmptr->update_normal_tokens(_bootstrap_tokens, get_broadcast_address());
return make_ready_future<>();
return tmptr->update_normal_tokens(_bootstrap_tokens, get_broadcast_address());
}).get();
if (!db::system_keyspace::bootstrap_complete()) {
@@ -1194,7 +1193,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
// Update pending ranges after update of normal tokens immediately to avoid
// a race where natural endpoint was updated to contain node A, but A was
// not yet removed from pending endpoints
tmptr->update_normal_tokens(owned_tokens, endpoint);
tmptr->update_normal_tokens(owned_tokens, endpoint).get();
update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint)).get();
replicate_to_all_cores(std::move(tmptr)).get();
tmlock.reset();
@@ -1247,7 +1246,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) {
slogger.info("Node {} state jump to leaving", endpoint);
handle_cdc_generation(cdc_streams_ts);
tmptr->update_normal_tokens(tokens, endpoint);
tmptr->update_normal_tokens(tokens, endpoint).get();
} else {
auto tokens_ = tmptr->get_tokens(endpoint);
std::set<token> tmp(tokens.begin(), tokens.end());
@@ -1256,7 +1255,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) {
slogger.debug("tokens_={}, tokens={}", tokens_, tmp);
handle_cdc_generation(cdc_streams_ts);
tmptr->update_normal_tokens(tokens, endpoint);
tmptr->update_normal_tokens(tokens, endpoint).get();
}
}
@@ -1659,7 +1658,7 @@ future<> storage_service::init_server(bind_messaging_port do_bind) {
// entry has been mistakenly added, delete it
db::system_keyspace::remove_endpoint(ep).get();
} else {
tmptr->update_normal_tokens(tokens, ep);
tmptr->update_normal_tokens(tokens, ep).get();
if (loaded_host_ids.contains(ep)) {
tmptr->update_host_id(loaded_host_ids.at(ep), ep);
}

View File

@@ -212,7 +212,7 @@ void simple_test() {
for (unsigned i = 0; i < ring_points.size(); i++) {
tmptr->update_normal_token(
{dht::token::kind::key, d2t(ring_points[i].point / ring_points.size())},
ring_points[i].host);
ring_points[i].host).get();
}
stm.set(tmptr);
@@ -316,7 +316,7 @@ void heavy_origin_test() {
}
}
tmptr->update_normal_tokens(tokens);
tmptr->update_normal_tokens(tokens).get();
stm.set(std::move(tmptr));
auto ars_uptr = abstract_replication_strategy::create_replication_strategy(
@@ -614,7 +614,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
for (auto& node : nodes) {
for (size_t i = 0; i < VNODES; ++i) {
tmptr->update_normal_token(dht::token::get_random_token(), node);
tmptr->update_normal_token(dht::token::get_random_token(), node).get();
}
}
stm.set(std::move(tmptr));

View File

@@ -67,7 +67,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
{
// Ring with minimum token
auto tmptr = locator::make_token_metadata_ptr();
tmptr->update_normal_token(dht::minimum_token(), {"10.0.0.1"});
tmptr->update_normal_token(dht::minimum_token(), {"10.0.0.1"}).get();
check(tmptr, dht::partition_range::make_singular(ring[0]), {
dht::partition_range::make_singular(ring[0])
@@ -80,8 +80,8 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
{
auto tmptr = locator::make_token_metadata_ptr();
tmptr->update_normal_token(ring[2].token(), {"10.0.0.1"});
tmptr->update_normal_token(ring[5].token(), {"10.0.0.2"});
tmptr->update_normal_token(ring[2].token(), {"10.0.0.1"}).get();
tmptr->update_normal_token(ring[5].token(), {"10.0.0.2"}).get();
check(tmptr, dht::partition_range::make_singular(ring[0]), {
dht::partition_range::make_singular(ring[0])