diff --git a/configure.py b/configure.py index ee8b9fa009..c80cc0e2b8 100755 --- a/configure.py +++ b/configure.py @@ -1101,6 +1101,7 @@ scylla_core = (['message/messaging_service.cc', 'rust/wasmtime_bindings/src/lib.rs', 'utils/to_string.cc', 'service/topology_state_machine.cc', + 'node_ops/node_ops_ctl.cc' ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \ + scylla_raft_core ) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 12d01843e9..c87f65460d 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -30,6 +30,7 @@ #include "range.hh" #include "frozen_schema.hh" #include "repair/repair.hh" +#include "node_ops/node_ops_ctl.hh" #include "utils/digest_algorithm.hh" #include "service/paxos/proposal.hh" #include "service/paxos/prepare_response.hh" diff --git a/node_ops/node_ops_ctl.cc b/node_ops/node_ops_ctl.cc new file mode 100644 index 0000000000..9db04329ab --- /dev/null +++ b/node_ops/node_ops_ctl.cc @@ -0,0 +1,206 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "db/config.hh" +#include "gms/gossiper.hh" +#include "message/messaging_service.hh" +#include "node_ops/node_ops_ctl.hh" +#include "service/storage_service.hh" + +#include +#include + +static logging::logger nlogger("node_ops"); + +node_ops_ctl::node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid) + : ss(ss_) + , host_id(id) + , endpoint(ep) + , tmptr(ss.get_token_metadata_ptr()) + , req(cmd, uuid) + , heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds()) +{} + +node_ops_ctl::~node_ops_ctl() { + if (heartbeat_updater_done_fut) { + on_internal_error_noexcept(nlogger, "node_ops_ctl destroyed without stopping"); + } +} + +const node_ops_id& node_ops_ctl::uuid() const noexcept { + return req.ops_uuid; +} + +// may be called multiple times +void node_ops_ctl::start(sstring desc_, std::function sync_to_node) { + desc = std::move(desc_); + + nlogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint); + + refresh_sync_nodes(std::move(sync_to_node)); +} + +void node_ops_ctl::refresh_sync_nodes(std::function sync_to_node) { + // sync data with all normal token owners + sync_nodes.clear(); + const auto& topo = tmptr->get_topology(); + auto can_sync_with_node = [] (const locator::node& node) { + // Sync with reachable token owners. + // Note that although nodes in `being_replaced` and `being_removed` + // are still token owners, they are known to be dead and can't be sync'ed with. + switch (node.get_state()) { + case locator::node::state::normal: + case locator::node::state::being_decommissioned: + return true; + default: + return false; + } + }; + topo.for_each_node([&] (const locator::node* np) { + seastar::thread::maybe_yield(); + // FIXME: use node* rather than endpoint + auto node = np->endpoint(); + if (!ignore_nodes.contains(node) && can_sync_with_node(*np) && sync_to_node(node)) { + sync_nodes.insert(node); + } + }); + + for (auto& node : sync_nodes) { + if (!ss.gossiper().is_alive(node)) { + nodes_down.emplace(node); + } + } + if (!nodes_down.empty()) { + auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc); + nlogger.warn("{}", msg); + throw std::runtime_error(msg); + } + + nlogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes); +} + +future<> node_ops_ctl::stop() noexcept { + co_await stop_heartbeat_updater(); +} + +// Caller should set the required req members before prepare +future<> node_ops_ctl::prepare(node_ops_cmd cmd) noexcept { + return send_to_all(cmd); +} + +void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) { + if (heartbeat_updater_done_fut) { + on_internal_error(nlogger, "heartbeat_updater already started"); + } + heartbeat_updater_done_fut = heartbeat_updater(cmd); +} + +future<> node_ops_ctl::query_pending_op() { + req.cmd = node_ops_cmd::query_pending_ops; + co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> { + auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops); + if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) { + throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node)); + } + }); +} + +future<> node_ops_ctl::stop_heartbeat_updater() noexcept { + if (heartbeat_updater_done_fut) { + as.request_abort(); + co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt); + } +} + +future<> node_ops_ctl::done(node_ops_cmd cmd) noexcept { + co_await stop_heartbeat_updater(); + co_await send_to_all(cmd); +} + +future<> node_ops_ctl::abort(node_ops_cmd cmd) noexcept { + co_await stop_heartbeat_updater(); + co_await send_to_all(cmd); +} + +future<> node_ops_ctl::abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept { + nlogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex); + try { + co_await abort(cmd); + } catch (...) { + nlogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception()); + } + co_await coroutine::return_exception_ptr(std::move(ex)); +} + +future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) { + req.cmd = cmd; + req.ignore_nodes = boost::copy_range>(ignore_nodes); + sstring op_desc = ::format("{}", cmd); + nlogger.info("{}[{}]: Started {}", desc, uuid(), req); + auto cmd_category = categorize_node_ops_cmd(cmd); + co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { + if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || + (nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) { + // Note that we still send abort commands to failed nodes. + co_return; + } + try { + co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node); + } catch (const seastar::rpc::unknown_verb_error&) { + if (cmd_category == node_ops_cmd_category::prepare) { + nlogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc); + } else { + nlogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc); + } + nodes_unknown_verb.emplace(node); + } catch (const seastar::rpc::closed_error&) { + nlogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc); + nodes_down.emplace(node); + } catch (...) { + nlogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception()); + nodes_failed.emplace(node); + } + }); + std::vector errors; + if (!nodes_failed.empty()) { + errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed)); + } + if (!nodes_unknown_verb.empty()) { + if (cmd_category == node_ops_cmd_category::prepare) { + errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb)); + } else { + errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb)); + } + } + if (!nodes_down.empty()) { + errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down)); + } + if (!errors.empty()) { + co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; ")))); + } + nlogger.info("{}[{}]: Finished {}", desc, uuid(), req); +} + +future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) { + nlogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count()); + while (!as.abort_requested()) { + auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}}; + co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { + try { + co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node); + } catch (...) { + nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node); + }; + }); + co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {}); + } + nlogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid()); +} \ No newline at end of file diff --git a/node_ops/node_ops_ctl.hh b/node_ops/node_ops_ctl.hh new file mode 100644 index 0000000000..cc2d9dfdab --- /dev/null +++ b/node_ops/node_ops_ctl.hh @@ -0,0 +1,163 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "dht/token.hh" +#include "gms/inet_address.hh" +#include "locator/host_id.hh" +#include "node_ops/id.hh" +#include "schema/schema_fwd.hh" + +#include + +#include +#include + +namespace service { +class storage_service; +} + +namespace locator { +class token_metadata; +} + +class node_ops_info { +public: + node_ops_id ops_uuid; + shared_ptr as; + std::list ignore_nodes; + +public: + node_ops_info(node_ops_id ops_uuid_, shared_ptr as_, std::list&& ignore_nodes_) noexcept; + node_ops_info(const node_ops_info&) = delete; + node_ops_info(node_ops_info&&) = delete; + + void check_abort(); +}; + +enum class node_ops_cmd : uint32_t { + removenode_prepare, + removenode_heartbeat, + removenode_sync_data, + removenode_abort, + removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, + decommission_prepare, + decommission_heartbeat, + decommission_abort, + decommission_done, + bootstrap_prepare, + bootstrap_heartbeat, + bootstrap_abort, + bootstrap_done, + query_pending_ops, + repair_updater, +}; + +enum class node_ops_cmd_category { + prepare, + heartbeat, + sync_data, + abort, + done, + other +}; + +node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept; + +std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd); + +// The cmd and ops_uuid are mandatory for each request. +// The ignore_nodes and leaving_node are optional. +struct node_ops_cmd_request { + // Mandatory field, set by all cmds + node_ops_cmd cmd; + // Mandatory field, set by all cmds + node_ops_id ops_uuid; + // Optional field, list nodes to ignore, set by all cmds + std::list ignore_nodes; + // Optional field, list leaving nodes, set by decommission and removenode cmd + std::list leaving_nodes; + // Optional field, map existing nodes to replacing nodes, set by replace cmd + std::unordered_map replace_nodes; + // Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd + std::unordered_map> bootstrap_nodes; + // Optional field, list uuids of tables being repaired, set by repair cmd + std::list repair_tables; + node_ops_cmd_request(node_ops_cmd command, + node_ops_id uuid, + std::list ignore = {}, + std::list leaving = {}, + std::unordered_map replace = {}, + std::unordered_map> bootstrap = {}, + std::list tables = {}) + : cmd(command) + , ops_uuid(std::move(uuid)) + , ignore_nodes(std::move(ignore)) + , leaving_nodes(std::move(leaving)) + , replace_nodes(std::move(replace)) + , bootstrap_nodes(std::move(bootstrap)) + , repair_tables(std::move(tables)) { + } +}; + +std::ostream& operator<<(std::ostream& out, const node_ops_cmd_request& req); + +struct node_ops_cmd_response { + // Mandatory field, set by all cmds + bool ok; + // Optional field, set by query_pending_ops cmd + std::list pending_ops; + node_ops_cmd_response(bool o, std::list pending = {}) + : ok(o) + , pending_ops(std::move(pending)) { + } +}; + +class node_ops_ctl { + std::unordered_set nodes_unknown_verb; + std::unordered_set nodes_down; + std::unordered_set nodes_failed; + +public: + const service::storage_service& ss; + sstring desc; + locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node) + gms::inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node) + lw_shared_ptr tmptr; + std::unordered_set sync_nodes; + std::unordered_set ignore_nodes; + node_ops_cmd_request req; + std::chrono::seconds heartbeat_interval; + abort_source as; + std::optional> heartbeat_updater_done_fut; + + explicit node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id()); + ~node_ops_ctl(); + const node_ops_id& uuid() const noexcept; + // may be called multiple times + void start(sstring desc_, std::function sync_to_node = [] (gms::inet_address) { return true; }); + void refresh_sync_nodes(std::function sync_to_node = [] (gms::inet_address) { return true; }); + future<> stop() noexcept; + // Caller should set the required req members before prepare + future<> prepare(node_ops_cmd cmd) noexcept; + void start_heartbeat_updater(node_ops_cmd cmd); + future<> query_pending_op(); + future<> stop_heartbeat_updater() noexcept; + future<> done(node_ops_cmd cmd) noexcept; + future<> abort(node_ops_cmd cmd) noexcept; + future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept; + future<> send_to_all(node_ops_cmd cmd); + future<> heartbeat_updater(node_ops_cmd cmd); +}; \ No newline at end of file diff --git a/repair/repair.hh b/repair/repair.hh index 648beec94c..91f5a7324b 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -85,20 +85,6 @@ struct repair_uniq_id { }; std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x); -class node_ops_info { -public: - node_ops_id ops_uuid; - shared_ptr as; - std::list ignore_nodes; - -public: - node_ops_info(node_ops_id ops_uuid_, shared_ptr as_, std::list&& ignore_nodes_) noexcept; - node_ops_info(const node_ops_info&) = delete; - node_ops_info(node_ops_info&&) = delete; - - void check_abort(); -}; - // NOTE: repair_start() can be run on any node, but starts a node-global // operation. // repair_start() starts the requested repair on this node. It returns an @@ -249,91 +235,6 @@ enum class row_level_diff_detect_algorithm : uint8_t { std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo); -enum class node_ops_cmd : uint32_t { - removenode_prepare, - removenode_heartbeat, - removenode_sync_data, - removenode_abort, - removenode_done, - replace_prepare, - replace_prepare_mark_alive, - replace_prepare_pending_ranges, - replace_heartbeat, - replace_abort, - replace_done, - decommission_prepare, - decommission_heartbeat, - decommission_abort, - decommission_done, - bootstrap_prepare, - bootstrap_heartbeat, - bootstrap_abort, - bootstrap_done, - query_pending_ops, - repair_updater, -}; - -enum class node_ops_cmd_category { - prepare, - heartbeat, - sync_data, - abort, - done, - other -}; - -node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept; - -std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd); - -// The cmd and ops_uuid are mandatory for each request. -// The ignore_nodes and leaving_node are optional. -struct node_ops_cmd_request { - // Mandatory field, set by all cmds - node_ops_cmd cmd; - // Mandatory field, set by all cmds - node_ops_id ops_uuid; - // Optional field, list nodes to ignore, set by all cmds - std::list ignore_nodes; - // Optional field, list leaving nodes, set by decommission and removenode cmd - std::list leaving_nodes; - // Optional field, map existing nodes to replacing nodes, set by replace cmd - std::unordered_map replace_nodes; - // Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd - std::unordered_map> bootstrap_nodes; - // Optional field, list uuids of tables being repaired, set by repair cmd - std::list repair_tables; - node_ops_cmd_request(node_ops_cmd command, - node_ops_id uuid, - std::list ignore = {}, - std::list leaving = {}, - std::unordered_map replace = {}, - std::unordered_map> bootstrap = {}, - std::list tables = {}) - : cmd(command) - , ops_uuid(std::move(uuid)) - , ignore_nodes(std::move(ignore)) - , leaving_nodes(std::move(leaving)) - , replace_nodes(std::move(replace)) - , bootstrap_nodes(std::move(bootstrap)) - , repair_tables(std::move(tables)) { - } -}; - -std::ostream& operator<<(std::ostream& out, const node_ops_cmd_request& req); - -struct node_ops_cmd_response { - // Mandatory field, set by all cmds - bool ok; - // Optional field, set by query_pending_ops cmd - std::list pending_ops; - node_ops_cmd_response(bool o, std::list pending = {}) - : ok(o) - , pending_ops(std::move(pending)) { - } -}; - - struct repair_update_system_table_request { tasks::task_id repair_uuid; table_id table_uuid; diff --git a/repair/task_manager_module.hh b/repair/task_manager_module.hh index 810cc29efa..3cf234281c 100644 --- a/repair/task_manager_module.hh +++ b/repair/task_manager_module.hh @@ -8,6 +8,7 @@ #pragma once +#include "node_ops/node_ops_ctl.hh" #include "repair/repair.hh" #include "tasks/task_manager.hh" diff --git a/service/storage_service.cc b/service/storage_service.cc index 0e176304c4..c32baa91e2 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -84,6 +84,7 @@ #include "service/raft/raft_address_map.hh" #include "protocol_server.hh" #include "types/set.hh" +#include "node_ops/node_ops_ctl.hh" #include #include @@ -4111,213 +4112,6 @@ future<> storage_service::stop_gossiping() { }); } -class node_ops_ctl { - std::unordered_set nodes_unknown_verb; - std::unordered_set nodes_down; - std::unordered_set nodes_failed; - -public: - const storage_service& ss; - sstring desc; - locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node) - inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node) - locator::token_metadata_ptr tmptr; - std::unordered_set sync_nodes; - std::unordered_set ignore_nodes; - node_ops_cmd_request req; - std::chrono::seconds heartbeat_interval; - abort_source as; - std::optional> heartbeat_updater_done_fut; - - explicit node_ops_ctl(const storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id()) - : ss(ss_) - , host_id(id) - , endpoint(ep) - , tmptr(ss.get_token_metadata_ptr()) - , req(cmd, uuid) - , heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds()) - {} - - ~node_ops_ctl() { - if (heartbeat_updater_done_fut) { - on_internal_error_noexcept(slogger, "node_ops_ctl destroyed without stopping"); - } - } - - const node_ops_id& uuid() const noexcept { - return req.ops_uuid; - } - - // may be called multiple times - void start(sstring desc_, std::function sync_to_node = [] (gms::inet_address) { return true; }) { - desc = std::move(desc_); - - slogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint); - - refresh_sync_nodes(std::move(sync_to_node)); - } - - void refresh_sync_nodes(std::function sync_to_node = [] (gms::inet_address) { return true; }) { - // sync data with all normal token owners - sync_nodes.clear(); - const auto& topo = tmptr->get_topology(); - auto can_sync_with_node = [] (const locator::node& node) { - // Sync with reachable token owners. - // Note that although nodes in `being_replaced` and `being_removed` - // are still token owners, they are known to be dead and can't be sync'ed with. - switch (node.get_state()) { - case locator::node::state::normal: - case locator::node::state::being_decommissioned: - return true; - default: - return false; - } - }; - topo.for_each_node([&] (const locator::node* np) { - seastar::thread::maybe_yield(); - // FIXME: use node* rather than endpoint - auto node = np->endpoint(); - if (!ignore_nodes.contains(node) && can_sync_with_node(*np) && sync_to_node(node)) { - sync_nodes.insert(node); - } - }); - - for (auto& node : sync_nodes) { - if (!ss.gossiper().is_alive(node)) { - nodes_down.emplace(node); - } - } - if (!nodes_down.empty()) { - auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc); - slogger.warn("{}", msg); - throw std::runtime_error(msg); - } - - slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes); - } - - future<> stop() noexcept { - co_await stop_heartbeat_updater(); - } - - // Caller should set the required req members before prepare - future<> prepare(node_ops_cmd cmd) noexcept { - return send_to_all(cmd); - } - - void start_heartbeat_updater(node_ops_cmd cmd) { - if (heartbeat_updater_done_fut) { - on_internal_error(slogger, "heartbeat_updater already started"); - } - heartbeat_updater_done_fut = heartbeat_updater(cmd); - } - - future<> query_pending_op() { - req.cmd = node_ops_cmd::query_pending_ops; - co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> { - auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); - slogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops); - if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) { - throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node)); - } - }); - } - - future<> stop_heartbeat_updater() noexcept { - if (heartbeat_updater_done_fut) { - as.request_abort(); - co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt); - } - } - - future<> done(node_ops_cmd cmd) noexcept { - co_await stop_heartbeat_updater(); - co_await send_to_all(cmd); - } - - future<> abort(node_ops_cmd cmd) noexcept { - co_await stop_heartbeat_updater(); - co_await send_to_all(cmd); - } - - future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept { - slogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex); - try { - co_await abort(cmd); - } catch (...) { - slogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception()); - } - co_await coroutine::return_exception_ptr(std::move(ex)); - } - - future<> send_to_all(node_ops_cmd cmd) { - req.cmd = cmd; - req.ignore_nodes = boost::copy_range>(ignore_nodes); - sstring op_desc = ::format("{}", cmd); - slogger.info("{}[{}]: Started {}", desc, uuid(), req); - auto cmd_category = categorize_node_ops_cmd(cmd); - co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { - if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || - (nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) { - // Note that we still send abort commands to failed nodes. - co_return; - } - try { - co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); - slogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node); - } catch (const seastar::rpc::unknown_verb_error&) { - if (cmd_category == node_ops_cmd_category::prepare) { - slogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc); - } else { - slogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc); - } - nodes_unknown_verb.emplace(node); - } catch (const seastar::rpc::closed_error&) { - slogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc); - nodes_down.emplace(node); - } catch (...) { - slogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception()); - nodes_failed.emplace(node); - } - }); - std::vector errors; - if (!nodes_failed.empty()) { - errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed)); - } - if (!nodes_unknown_verb.empty()) { - if (cmd_category == node_ops_cmd_category::prepare) { - errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb)); - } else { - errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb)); - } - } - if (!nodes_down.empty()) { - errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down)); - } - if (!errors.empty()) { - co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; ")))); - } - slogger.info("{}[{}]: Finished {}", desc, uuid(), req); - } - - future<> heartbeat_updater(node_ops_cmd cmd) { - slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count()); - while (!as.abort_requested()) { - auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}}; - co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { - try { - co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); - slogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node); - } catch (...) { - slogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node); - }; - }); - co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {}); - } - slogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid()); - } -}; - static void on_streaming_finished() { utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get(); diff --git a/service/storage_service.hh b/service/storage_service.hh index af469977c8..47b15587ab 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -43,6 +43,7 @@ class node_ops_cmd_request; class node_ops_cmd_response; +struct node_ops_ctl; class node_ops_info; enum class node_ops_cmd : uint32_t; class repair_service; @@ -90,7 +91,6 @@ class raft_group0; enum class disk_error { regular, commit }; class node_ops_meta_data; -struct node_ops_ctl; /** * This abstraction contains the token/identifier of this node @@ -224,7 +224,7 @@ private: return _gossiper; }; - friend struct node_ops_ctl; + friend struct ::node_ops_ctl; public: locator::effective_replication_map_factory& get_erm_factory() noexcept {