From adfc8f83469ac4250181e32ea01882aa042fe7b2 Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sat, 25 Dec 2021 09:28:43 +0300 Subject: [PATCH] service: storage_service: coroutinize `handle_state_normal` Signed-off-by: Pavel Solodovnikov --- service/storage_service.cc | 31 ++++++++++++++----------------- service/storage_service.hh | 2 +- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index dad3a0c2fe..f98382d5a6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -860,14 +860,14 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) { co_await replicate_to_all_cores(std::move(tmptr)); } -void storage_service::handle_state_normal(inet_address endpoint) { +future<> storage_service::handle_state_normal(inet_address endpoint) { slogger.debug("endpoint={} handle_state_normal", endpoint); auto tokens = get_tokens_for(endpoint); slogger.debug("Node {} state normal, token {}", endpoint, tokens); - auto tmlock = std::make_unique(get_token_metadata_lock().get0()); - auto tmptr = get_mutable_token_metadata_ptr().get0(); + auto tmlock = std::make_unique(co_await get_token_metadata_lock()); + auto tmptr = co_await get_mutable_token_metadata_ptr(); if (tmptr->is_member(endpoint)) { slogger.info("Node {} state jump to normal", endpoint); } @@ -944,9 +944,9 @@ void storage_service::handle_state_normal(inet_address endpoint) { // Update pending ranges after update of normal tokens immediately to avoid // a race where natural endpoint was updated to contain node A, but A was // not yet removed from pending endpoints - tmptr->update_normal_tokens(owned_tokens, endpoint).get(); - update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint)).get(); - replicate_to_all_cores(std::move(tmptr)).get(); + co_await tmptr->update_normal_tokens(owned_tokens, endpoint); + co_await update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint)); + co_await replicate_to_all_cores(std::move(tmptr)); tmlock.reset(); for (auto ep : endpoints_to_remove) { @@ -954,20 +954,17 @@ void storage_service::handle_state_normal(inet_address endpoint) { } slogger.debug("handle_state_normal: endpoint={} owned_tokens = {}", endpoint, owned_tokens); if (!owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) { - update_peer_info(endpoint).get(); - db::system_keyspace::update_tokens(endpoint, owned_tokens).then_wrapped([endpoint] (auto&& f) { - try { - f.get(); - } catch (...) { - slogger.error("handle_state_normal: fail to update tokens for {}: {}", endpoint, std::current_exception()); - } - return make_ready_future<>(); - }).get(); + co_await update_peer_info(endpoint); + try { + co_await db::system_keyspace::update_tokens(endpoint, owned_tokens); + } catch (...) { + slogger.error("handle_state_normal: fail to update tokens for {}: {}", endpoint, std::current_exception()); + } } // Send joined notification only when this node was not a member prior to this if (!is_member) { - notify_joined(endpoint).get(); + co_await notify_joined(endpoint); } if (slogger.is_enabled(logging::log_level::debug)) { @@ -1154,7 +1151,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta co_await handle_state_bootstrap(endpoint); } else if (move_name == sstring(versioned_value::STATUS_NORMAL) || move_name == sstring(versioned_value::SHUTDOWN)) { - handle_state_normal(endpoint); + co_await handle_state_normal(endpoint); } else if (move_name == sstring(versioned_value::REMOVING_TOKEN) || move_name == sstring(versioned_value::REMOVED_TOKEN)) { handle_state_removing(endpoint, pieces); diff --git a/service/storage_service.hh b/service/storage_service.hh index 2812bcf65b..a61b6d7d0f 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -586,7 +586,7 @@ private: * * @param endpoint node */ - void handle_state_normal(inet_address endpoint); + future<> handle_state_normal(inet_address endpoint); /** * Handle node preparing to leave the ring