topology: Require entry in the map for update_normal_tokens()

The method in question tries to stay on the safe side and adds the
endpoint whose tokens it updates into the topology. From now on
it's up to the caller to put the endpoint into the topology in advance.

So most of what this patch does is place topology.update_endpoint()
calls at the relevant places in the code.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-08-23 15:05:18 +03:00
parent 5fc9854eae
commit 4cbe6ee9f4
6 changed files with 24 additions and 6 deletions

View File

@@ -288,6 +288,7 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
dht::token_range_vector ret;
token_metadata temp;
temp = co_await tmptr->clone_only_token_map();
temp.update_topology(pending_address, std::move(dr));
co_await temp.update_normal_tokens(pending_tokens, pending_address);
for (const auto& t : temp.sorted_tokens()) {
auto eps = co_await calculate_natural_endpoints(t, temp);

View File

@@ -422,6 +422,10 @@ future<> token_metadata_impl::update_normal_tokens(std::unordered_set<token> tok
co_return;
}
if (!is_member(endpoint)) {
on_internal_error(tlogger, format("token_metadata_impl: {} must be member to update normal tokens", endpoint));
}
bool should_sort_tokens = false;
// Phase 1: erase all tokens previously owned by the endpoint.
@@ -442,11 +446,10 @@ future<> token_metadata_impl::update_normal_tokens(std::unordered_set<token> tok
}
// Phase 2:
// a. Add the endpoint to _topology if needed.
// a. ...
// b. update pending _bootstrap_tokens and _leaving_endpoints
// c. update _token_to_endpoint_map with the new endpoint->token mappings
// - set `should_sort_tokens` if new tokens were added
_topology.add_endpoint(endpoint, {});
remove_by_value(_bootstrap_tokens, endpoint);
_leaving_endpoints.erase(endpoint);
invalidate_cached_rings();
@@ -1263,10 +1266,6 @@ void topology::add_endpoint(const inet_address& ep, endpoint_dc_rack dr)
}
// Refresh the DC/rack location recorded for an endpoint.
// NOTE(review): this is a stripped diff hunk ("-4" lines per the hunk header),
// so some of the lines shown below may be ones the commit deletes -- confirm
// against the post-commit file before relying on this body.
void topology::update_endpoint(inet_address ep, endpoint_dc_rack dr) {
// Bail out unless the endpoint is already known to the topology and the
// local snitch instance has been initialized (short-circuit: the snitch is
// only consulted for endpoints present in _current_locations).
if (!_current_locations.contains(ep) || !locator::i_endpoint_snitch::snitch_instance().local_is_initialized()) {
return;
}
// Delegate to add_endpoint(), which presumably replaces the endpoint's
// existing DC/rack entry -- verify against add_endpoint()'s definition.
add_endpoint(ep, std::move(dr));
}

View File

@@ -1377,6 +1377,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
//Pending ranges
metadata_clone.update_topology(myip, {});
metadata_clone.update_normal_tokens(tokens, myip).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
metadata_clone.clear_gently().get();
@@ -1831,6 +1832,7 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
// update a cloned version of tmptr
// no need to set the original version
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), {});
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes));
}

View File

@@ -326,6 +326,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
get_broadcast_address() == *replace_address ? "the same" : "a different",
get_broadcast_address(), *replace_address);
tmptr->update_topology(*replace_address, {});
co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address);
} else if (should_bootstrap()) {
co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features);
@@ -362,6 +363,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore we update _token_metadata now, before gossip starts.
tmptr->update_topology(get_broadcast_address(), {});
co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
cdc_gen_id = co_await db::system_keyspace::get_cdc_generation_id();
@@ -538,6 +540,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now.
tmptr->update_topology(get_broadcast_address(), {});
return tmptr->update_normal_tokens(bootstrap_tokens, get_broadcast_address());
});
@@ -683,6 +686,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
auto endpoint = get_broadcast_address();
tmptr->update_topology(endpoint, {});
tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
return update_pending_ranges(std::move(tmptr), format("bootstrapping node {}", endpoint));
}).get();
@@ -869,6 +873,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
tmptr->remove_endpoint(endpoint);
}
tmptr->update_topology(endpoint, {});
tmptr->add_bootstrap_tokens(tokens, endpoint);
if (_gossiper.uses_host_id(endpoint)) {
tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint);
@@ -961,6 +966,9 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
// Update pending ranges after update of normal tokens immediately to avoid
// a race where natural endpoint was updated to contain node A, but A was
// not yet removed from pending endpoints
if (!is_member) {
tmptr->update_topology(endpoint, {});
}
co_await tmptr->update_normal_tokens(owned_tokens, endpoint);
co_await update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint));
co_await replicate_to_all_cores(std::move(tmptr));
@@ -1009,6 +1017,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) {
// FIXME: this code should probably resolve token collisions too, like handle_state_normal
slogger.info("Node {} state jump to leaving", endpoint);
tmptr->update_topology(endpoint, {});
co_await tmptr->update_normal_tokens(tokens, endpoint);
} else {
auto tokens_ = tmptr->get_tokens(endpoint);
@@ -1395,6 +1404,7 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service,
// entry has been mistakenly added, delete it
_sys_ks.local().remove_endpoint(ep).get();
} else {
tmptr->update_topology(ep, {});
tmptr->update_normal_tokens(tokens, ep).get();
if (loaded_host_ids.contains(ep)) {
tmptr->update_host_id(loaded_host_ids.at(ep), ep);

View File

@@ -208,6 +208,7 @@ void simple_test() {
// Initialize the token_metadata
stm.mutate_token_metadata([&endpoint_tokens] (token_metadata& tm) -> future<> {
for (auto&& i : endpoint_tokens) {
tm.update_topology(i.first, {});
co_await tm.update_normal_tokens(std::move(i.second), i.first);
}
}).get();
@@ -312,6 +313,7 @@ void heavy_origin_test() {
stm.mutate_token_metadata([&tokens] (token_metadata& tm) -> future<> {
for (auto&& i : tokens) {
tm.update_topology(i.first, {});
co_await tm.update_normal_tokens(std::move(i.second), i.first);
}
}).get();
@@ -625,6 +627,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
stm.mutate_token_metadata([&endpoint_tokens] (token_metadata& tm) -> future<> {
for (auto&& i : endpoint_tokens) {
tm.update_topology(i.first, {});
co_await tm.update_normal_tokens(std::move(i.second), i.first);
}
}).get();

View File

@@ -55,6 +55,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
{
// Ring with minimum token
auto tmptr = locator::make_token_metadata_ptr();
tmptr->update_topology(gms::inet_address("10.0.0.1"), {});
tmptr->update_normal_tokens(std::unordered_set<dht::token>({dht::minimum_token()}), gms::inet_address("10.0.0.1")).get();
check(tmptr, dht::partition_range::make_singular(ring[0]), {
@@ -68,7 +69,9 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
{
auto tmptr = locator::make_token_metadata_ptr();
tmptr->update_topology(gms::inet_address("10.0.0.1"), {});
tmptr->update_normal_tokens(std::unordered_set<dht::token>({ring[2].token()}), gms::inet_address("10.0.0.1")).get();
tmptr->update_topology(gms::inet_address("10.0.0.2"), {});
tmptr->update_normal_tokens(std::unordered_set<dht::token>({ring[5].token()}), gms::inet_address("10.0.0.2")).get();
check(tmptr, dht::partition_range::make_singular(ring[0]), {