diff --git a/repair/repair.cc b/repair/repair.cc index 72f6b62a47..a5aa105f77 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1832,6 +1832,40 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes)); } +node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept { + switch (cmd) { + case node_ops_cmd::removenode_prepare: + case node_ops_cmd::replace_prepare: + case node_ops_cmd::decommission_prepare: + case node_ops_cmd::bootstrap_prepare: + return node_ops_cmd_category::prepare; + + case node_ops_cmd::removenode_heartbeat: + case node_ops_cmd::replace_heartbeat: + case node_ops_cmd::decommission_heartbeat: + case node_ops_cmd::bootstrap_heartbeat: + return node_ops_cmd_category::heartbeat; + + case node_ops_cmd::removenode_sync_data: + return node_ops_cmd_category::sync_data; + + case node_ops_cmd::removenode_abort: + case node_ops_cmd::replace_abort: + case node_ops_cmd::decommission_abort: + case node_ops_cmd::bootstrap_abort: + return node_ops_cmd_category::abort; + + case node_ops_cmd::removenode_done: + case node_ops_cmd::replace_done: + case node_ops_cmd::decommission_done: + case node_ops_cmd::bootstrap_done: + return node_ops_cmd_category::done; + + default: + return node_ops_cmd_category::other; + } +} + std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd) { switch (cmd) { case node_ops_cmd::removenode_prepare: diff --git a/repair/repair.hh b/repair/repair.hh index deca79ed39..c9b7a5ea4c 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -271,6 +271,17 @@ enum class node_ops_cmd : uint32_t { repair_updater, }; +enum class node_ops_cmd_category { + prepare, + heartbeat, + sync_data, + abort, + done, + other +}; + +node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept; + std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd); // The cmd and ops_uuid are mandatory for each request. diff --git a/service/storage_service.cc b/service/storage_service.cc index ff65ee39da..99052d3201 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1908,6 +1908,168 @@ future<> storage_service::do_stop_ms() { }); } +class node_ops_ctl { + std::unordered_set nodes_unknown_verb; + std::unordered_set nodes_down; + std::unordered_set nodes_failed; + +public: + const storage_service& ss; + sstring desc; + locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node) + inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node) + std::unordered_set sync_nodes; + std::unordered_set ignore_nodes; + node_ops_cmd_request req; + std::chrono::seconds heartbeat_interval; + abort_source as; + std::optional> heartbeat_updater_done_fut; + + explicit node_ops_ctl(const storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id()) + : ss(ss_) + , host_id(id) + , endpoint(ep) + , req(cmd, uuid) + , heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds()) + {} + + ~node_ops_ctl() { + if (heartbeat_updater_done_fut) { + on_internal_error_noexcept(slogger, "node_ops_ctl destroyed without stopping"); + } + } + + const node_ops_id& uuid() const noexcept { + return req.ops_uuid; + } + + void start(sstring desc_) { + desc = std::move(desc_); + for (auto& node : sync_nodes) { + if (!ss.gossiper().is_alive(node)) { + nodes_down.emplace(node); + } + } + if (!nodes_down.empty()) { + auto msg = format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + + slogger.info("{}[{}]: Started {} operation: node={}/{}, sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes); + } + + future<> stop() noexcept { + co_await stop_heartbeat_updater(); + } + + // Caller should set the required req members before prepare + future<> prepare(node_ops_cmd cmd) noexcept { + return send_to_all(cmd); + } + + void start_heartbeat_updater(node_ops_cmd cmd) { + if (heartbeat_updater_done_fut) { + on_internal_error(slogger, "heartbeat_updater already started"); + } + heartbeat_updater_done_fut = heartbeat_updater(cmd); + } + + future<> stop_heartbeat_updater() noexcept { + if (heartbeat_updater_done_fut) { + as.request_abort(); + co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt); + } + } + + future<> done(node_ops_cmd cmd) noexcept { + co_await stop_heartbeat_updater(); + co_await send_to_all(cmd); + } + + future<> abort(node_ops_cmd cmd) noexcept { + co_await stop_heartbeat_updater(); + co_await send_to_all(cmd); + } + + future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept { + slogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex); + try { + co_await abort(cmd); + } catch (...) { + slogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception()); + } + co_await coroutine::return_exception_ptr(std::move(ex)); + } + + future<> send_to_all(node_ops_cmd cmd) { + req.cmd = cmd; + req.ignore_nodes = boost::copy_range>(ignore_nodes); + sstring op_desc = format("{}", cmd); + slogger.info("{}[{}]: Started {}", desc, uuid(), req); + auto cmd_category = categorize_node_ops_cmd(cmd); + co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { + if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || + (nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) { + // Note that we still send abort commands to failed nodes. + co_return; + } + try { + co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + slogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node); + } catch (const seastar::rpc::unknown_verb_error&) { + if (cmd_category == node_ops_cmd_category::prepare) { + slogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc); + } else { + slogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc); + } + nodes_unknown_verb.emplace(node); + } catch (const seastar::rpc::closed_error&) { + slogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), op_desc, node); + nodes_down.emplace(node); + } catch (...) { + slogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception()); + nodes_failed.emplace(node); + } + }); + std::vector errors; + if (!nodes_failed.empty()) { + errors.emplace_back(format("The {} command failed for nodes={}", op_desc, nodes_failed)); + } + if (!nodes_unknown_verb.empty()) { + if (cmd_category == node_ops_cmd_category::prepare) { + errors.emplace_back(format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb)); + } else { + errors.emplace_back(format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb)); + } + } + if (!nodes_down.empty()) { + errors.emplace_back(format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_failed)); + } + if (!errors.empty()) { + co_await coroutine::return_exception(std::runtime_error(join("; ", errors))); + } + slogger.info("{}[{}]: Finished {}", desc, uuid(), req); + } + + future<> heartbeat_updater(node_ops_cmd cmd) { + slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count()); + while (!as.abort_requested()) { + auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}}; + co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { + try { + co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + slogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node); + } catch (...) { + slogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node); + }; + }); + co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {}); + } + slogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid()); + } +}; + future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done) { std::string ops; if (cmd == node_ops_cmd::decommission_heartbeat) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 67e63db092..79b437278d 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -115,6 +115,8 @@ public: void cancel_watchdog(); }; +struct node_ops_ctl; + /** * This abstraction contains the token/identifier of this node * on the identifier space. This token gets gossiped around. @@ -233,6 +235,15 @@ private: return _batchlog_manager; } + const gms::gossiper& gossiper() const noexcept { + return _gossiper; + }; + + gms::gossiper& gossiper() noexcept { + return _gossiper; + }; + + friend struct node_ops_ctl; public: locator::effective_replication_map_factory& get_erm_factory() noexcept {