From b46ebd4fe53cc85bc780cdc99142cda9b7c8a0da Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sat, 25 Dec 2021 09:14:18 +0300 Subject: [PATCH] service: storage_service: coroutinize `do_update_system_peers_table` Signed-off-by: Pavel Solodovnikov --- service/storage_service.cc | 23 +++++++++++------------ service/storage_service.hh | 2 +- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 3fe545ec6f..e7d517a9e2 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1176,7 +1176,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta co_return; } if (get_token_metadata().is_member(endpoint)) { - do_update_system_peers_table(endpoint, state, value); + co_await do_update_system_peers_table(endpoint, state, value); if (state == application_state::RPC_READY) { slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready()); co_await notify_cql_change(endpoint, ep_state->is_cql_ready()); @@ -1218,15 +1218,14 @@ static future<> update_table(gms::inet_address endpoint, sstring col, T value) { } } -// Runs inside seastar::async context -void storage_service::do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value) { +future<> storage_service::do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value) { slogger.debug("Update system.peers table: endpoint={}, app_state={}, versioned_value={}", endpoint, state, value); if (state == application_state::RELEASE_VERSION) { - update_table(endpoint, "release_version", value.value).get(); + co_await update_table(endpoint, "release_version", value.value); } else if (state == application_state::DC) { - update_table(endpoint, "data_center", value.value).get(); + co_await update_table(endpoint, "data_center", value.value); } else if (state == application_state::RACK) { - update_table(endpoint, "rack", value.value).get(); + co_await update_table(endpoint, "rack", value.value); } else if (state == application_state::RPC_ADDRESS) { auto col = sstring("rpc_address"); inet_address ep; @@ -1234,15 +1233,15 @@ void storage_service::do_update_system_peers_table(gms::inet_address endpoint, c ep = gms::inet_address(value.value); } catch (...) { slogger.error("fail to update {} for {}: invalid rcpaddr {}", col, endpoint, value.value); - return; + co_return; } - update_table(endpoint, col, ep.addr()).get(); + co_await update_table(endpoint, col, ep.addr()); } else if (state == application_state::SCHEMA) { - update_table(endpoint, "schema_version", utils::UUID(value.value)).get(); + co_await update_table(endpoint, "schema_version", utils::UUID(value.value)); } else if (state == application_state::HOST_ID) { - update_table(endpoint, "host_id", utils::UUID(value.value)).get(); + co_await update_table(endpoint, "host_id", utils::UUID(value.value)); } else if (state == application_state::SUPPORTED_FEATURES) { - update_table(endpoint, "supported_features", value.value).get(); + co_await update_table(endpoint, "supported_features", value.value); } } @@ -1256,7 +1255,7 @@ void storage_service::update_peer_info(gms::inet_address endpoint) { for (auto& entry : ep_state->get_application_state_map()) { auto& app_state = entry.first; auto& value = entry.second; - do_update_system_peers_table(endpoint, app_state, value); + do_update_system_peers_table(endpoint, app_state, value).get(); } } diff --git a/service/storage_service.hh b/service/storage_service.hh index 86a5ffb3a8..3603ba2c73 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -563,7 +563,7 @@ public: virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {} private: void update_peer_info(inet_address endpoint); - void do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value); + future<> do_update_system_peers_table(gms::inet_address endpoint, const application_state& state, const versioned_value& value); std::unordered_set get_tokens_for(inet_address endpoint); private: