service: storage_service: coroutinize handle_state_normal

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
Pavel Solodovnikov
2021-12-25 09:28:43 +03:00
parent ba113439de
commit adfc8f8346
2 changed files with 15 additions and 18 deletions

View File

@@ -860,14 +860,14 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
co_await replicate_to_all_cores(std::move(tmptr));
}
void storage_service::handle_state_normal(inet_address endpoint) {
future<> storage_service::handle_state_normal(inet_address endpoint) {
slogger.debug("endpoint={} handle_state_normal", endpoint);
auto tokens = get_tokens_for(endpoint);
slogger.debug("Node {} state normal, token {}", endpoint, tokens);
auto tmlock = std::make_unique<token_metadata_lock>(get_token_metadata_lock().get0());
auto tmptr = get_mutable_token_metadata_ptr().get0();
auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
auto tmptr = co_await get_mutable_token_metadata_ptr();
if (tmptr->is_member(endpoint)) {
slogger.info("Node {} state jump to normal", endpoint);
}
@@ -944,9 +944,9 @@ void storage_service::handle_state_normal(inet_address endpoint) {
// Update pending ranges after update of normal tokens immediately to avoid
// a race where natural endpoint was updated to contain node A, but A was
// not yet removed from pending endpoints
tmptr->update_normal_tokens(owned_tokens, endpoint).get();
update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint)).get();
replicate_to_all_cores(std::move(tmptr)).get();
co_await tmptr->update_normal_tokens(owned_tokens, endpoint);
co_await update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint));
co_await replicate_to_all_cores(std::move(tmptr));
tmlock.reset();
for (auto ep : endpoints_to_remove) {
@@ -954,20 +954,17 @@ void storage_service::handle_state_normal(inet_address endpoint) {
}
slogger.debug("handle_state_normal: endpoint={} owned_tokens = {}", endpoint, owned_tokens);
if (!owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) {
update_peer_info(endpoint).get();
db::system_keyspace::update_tokens(endpoint, owned_tokens).then_wrapped([endpoint] (auto&& f) {
try {
f.get();
} catch (...) {
slogger.error("handle_state_normal: fail to update tokens for {}: {}", endpoint, std::current_exception());
}
return make_ready_future<>();
}).get();
co_await update_peer_info(endpoint);
try {
co_await db::system_keyspace::update_tokens(endpoint, owned_tokens);
} catch (...) {
slogger.error("handle_state_normal: fail to update tokens for {}: {}", endpoint, std::current_exception());
}
}
// Send joined notification only when this node was not a member prior to this
if (!is_member) {
notify_joined(endpoint).get();
co_await notify_joined(endpoint);
}
if (slogger.is_enabled(logging::log_level::debug)) {
@@ -1154,7 +1151,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
co_await handle_state_bootstrap(endpoint);
} else if (move_name == sstring(versioned_value::STATUS_NORMAL) ||
move_name == sstring(versioned_value::SHUTDOWN)) {
handle_state_normal(endpoint);
co_await handle_state_normal(endpoint);
} else if (move_name == sstring(versioned_value::REMOVING_TOKEN) ||
move_name == sstring(versioned_value::REMOVED_TOKEN)) {
handle_state_removing(endpoint, pieces);

View File

@@ -586,7 +586,7 @@ private:
*
* @param endpoint node
*/
void handle_state_normal(inet_address endpoint);
future<> handle_state_normal(inet_address endpoint);
/**
* Handle node preparing to leave the ring