From 4af27ca653ffec2b8ff7e4f1a81ea07bdc8ff23b Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Fri, 22 Apr 2022 11:01:26 +0300 Subject: [PATCH] service: storage_service: coroutinize `node_ops_cmd_heartbeat_updater()` Also, pass `node_ops_cmd` by value to get rid of lifetime issues when converting to coroutine. Signed-off-by: Pavel Solodovnikov --- service/storage_service.cc | 60 ++++++++++++++++++-------------------- service/storage_service.hh | 2 +- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 4c3495bdbb..f138ae77ab 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1793,38 +1793,36 @@ future<> storage_service::do_stop_ms() { }); } -future<> storage_service::node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done) { - return seastar::async([this, cmd, uuid, nodes = std::move(nodes), heartbeat_updater_done] { - std::string ops; - if (cmd == node_ops_cmd::decommission_heartbeat) { - ops = "decommission"; - } else if (cmd == node_ops_cmd::removenode_heartbeat) { - ops = "removenode"; - } else if (cmd == node_ops_cmd::replace_heartbeat) { - ops = "replace"; - } else if (cmd == node_ops_cmd::bootstrap_heartbeat) { - ops = "bootstrap"; - } else { - throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported")); +future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done) { + std::string ops; + if (cmd == node_ops_cmd::decommission_heartbeat) { + ops = "decommission"; + } else if (cmd == node_ops_cmd::removenode_heartbeat) { + ops = "removenode"; + } else if (cmd == node_ops_cmd::replace_heartbeat) { + ops = "replace"; + } else if (cmd == node_ops_cmd::bootstrap_heartbeat) { + ops = "bootstrap"; + } else { + throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported")); + } + slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid); + while (!(*heartbeat_updater_done)) { + auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}}; + co_await parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) { + slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node); + return make_ready_future<>(); + }); + }).handle_exception([ops, uuid] (std::exception_ptr ep) { + slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep); + }); + int nr_seconds = 10; + while (!(*heartbeat_updater_done) && nr_seconds--) { + co_await sleep(std::chrono::seconds(1)); } - slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid); - while (!(*heartbeat_updater_done)) { - auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}}; - parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) { - return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) { - slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node); - return make_ready_future<>(); - }); - }).handle_exception([ops, uuid] (std::exception_ptr ep) { - slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep); - }).get(); - int nr_seconds = 10; - while (!(*heartbeat_updater_done) && nr_seconds--) { - sleep(std::chrono::seconds(1)).get(); - } - } - slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid); - }); + } + slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid); } future<> storage_service::decommission() { diff --git a/service/storage_service.hh b/service/storage_service.hh index 9d6e26e071..35124fcba3 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -748,7 +748,7 @@ public: future<> removenode(sstring host_id_string, std::list ignore_nodes); future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); - future<> node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done); + future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done); future get_operation_mode();