service: storage_service: coroutinize node_ops_cmd_heartbeat_updater()

Also, pass `node_ops_cmd` by value to avoid lifetime issues in the
coroutine: a reference parameter could dangle once the coroutine suspends.

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
Pavel Solodovnikov
2022-04-22 11:01:26 +03:00
parent 654e6726d1
commit 4af27ca653
2 changed files with 30 additions and 32 deletions

View File

@@ -1793,38 +1793,36 @@ future<> storage_service::do_stop_ms() {
});
}
// NOTE(review): this span looks like a rendered diff whose +/- markers were
// stripped, so two versions of node_ops_cmd_heartbeat_updater() are interleaved:
//   * a seastar::async version taking `cmd` by const reference and using .get()
//   * a coroutine version taking `cmd` by value and using co_await
// Going by the commit message, the seastar::async version is presumably the
// removed side -- confirm against the repository before editing.
//
// Both versions do the same work: map `cmd` to an operation name used in log
// messages, then keep sending a heartbeat request to every address in `nodes`
// until *heartbeat_updater_done becomes true.

// --- seastar::async version: signature and command-to-name mapping ---
future<> storage_service::node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done) {
// `cmd` is copied into the lambda capture, so the body does not depend on the
// caller's object staying alive.
return seastar::async([this, cmd, uuid, nodes = std::move(nodes), heartbeat_updater_done] {
std::string ops;
if (cmd == node_ops_cmd::decommission_heartbeat) {
ops = "decommission";
} else if (cmd == node_ops_cmd::removenode_heartbeat) {
ops = "removenode";
} else if (cmd == node_ops_cmd::replace_heartbeat) {
ops = "replace";
} else if (cmd == node_ops_cmd::bootstrap_heartbeat) {
ops = "bootstrap";
} else {
throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported"));
// --- coroutine version: `cmd` is taken by value ---
future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done) {
// Operation name used as the prefix of every log line below.
std::string ops;
if (cmd == node_ops_cmd::decommission_heartbeat) {
ops = "decommission";
} else if (cmd == node_ops_cmd::removenode_heartbeat) {
ops = "removenode";
} else if (cmd == node_ops_cmd::replace_heartbeat) {
ops = "replace";
} else if (cmd == node_ops_cmd::bootstrap_heartbeat) {
ops = "bootstrap";
} else {
// Only the four *_heartbeat commands above are accepted.
throw std::runtime_error(format("node_ops_cmd_heartbeat_updater: node_ops_cmd is not supported"));
}
slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid);
while (!(*heartbeat_updater_done)) {
// Fresh request each round; only cmd and uuid are set, the remaining
// fields are left brace-initialized empty.
auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}};
// `req` is captured by reference: it is a local of this loop iteration and
// the whole send is awaited before the iteration ends.
co_await parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) {
slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node);
return make_ready_future<>();
});
// The handler only logs; it does not rethrow, so a failed send does not
// end the loop.
}).handle_exception([ops, uuid] (std::exception_ptr ep) {
slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep);
});
// Wait up to 10 seconds before the next round, in 1-second steps so the
// done flag is re-checked every second.
int nr_seconds = 10;
while (!(*heartbeat_updater_done) && nr_seconds--) {
co_await sleep(std::chrono::seconds(1));
}
// --- seastar::async version of the same loop: .get() where the coroutine
// version uses co_await ---
slogger.info("{}[{}]: Started heartbeat_updater", ops, uuid);
while (!(*heartbeat_updater_done)) {
auto req = node_ops_cmd_request{cmd, uuid, {}, {}, {}};
parallel_for_each(nodes, [this, ops, uuid, &req] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([ops, uuid, node] (node_ops_cmd_response resp) {
slogger.debug("{}[{}]: Got heartbeat response from node={}", ops, uuid, node);
return make_ready_future<>();
});
}).handle_exception([ops, uuid] (std::exception_ptr ep) {
slogger.warn("{}[{}]: Failed to send heartbeat: {}", ops, uuid, ep);
}).get();
int nr_seconds = 10;
while (!(*heartbeat_updater_done) && nr_seconds--) {
sleep(std::chrono::seconds(1)).get();
}
}
slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid);
});
}
// Logged once the done flag has been observed and the loop has exited.
slogger.info("{}[{}]: Stopped heartbeat_updater", ops, uuid);
}
future<> storage_service::decommission() {

View File

@@ -748,7 +748,7 @@ public:
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
// NOTE(review): the next two declarations differ only in how `cmd` is passed
// (const reference vs. by value). They are presumably the removed and added
// sides of the diff rather than two overloads -- confirm against the repository.
future<> node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
future<mode> get_operation_mode();