node_ops: extract classes related to node operations

Node operations will be integrated with the task manager, so the node_ops
directory needs to be created. To have access to node-ops-related
classes from the task manager and to preserve consistent naming, move
the classes to node_ops/node_ops_ctl.cc.
This commit is contained in:
Aleksandra Martyniuk
2023-09-11 16:25:41 +02:00
parent e90e10112f
commit d0d0ad7aa4
8 changed files with 375 additions and 308 deletions

View File

@@ -1101,6 +1101,7 @@ scylla_core = (['message/messaging_service.cc',
'rust/wasmtime_bindings/src/lib.rs',
'utils/to_string.cc',
'service/topology_state_machine.cc',
'node_ops/node_ops_ctl.cc'
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)

View File

@@ -30,6 +30,7 @@
#include "range.hh"
#include "frozen_schema.hh"
#include "repair/repair.hh"
#include "node_ops/node_ops_ctl.hh"
#include "utils/digest_algorithm.hh"
#include "service/paxos/proposal.hh"
#include "service/paxos/prepare_response.hh"

206
node_ops/node_ops_ctl.cc Normal file
View File

@@ -0,0 +1,206 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "db/config.hh"
#include "gms/gossiper.hh"
#include "message/messaging_service.hh"
#include "node_ops/node_ops_ctl.hh"
#include "service/storage_service.hh"
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
static logging::logger nlogger("node_ops");
// Builds a controller for one node operation (cmd) targeting the node
// identified by host_id/ep. Snapshots the current token metadata and
// reads the heartbeat interval from this node's configuration.
node_ops_ctl::node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid)
    : ss(ss_)
    , host_id(id)
    , endpoint(ep)
    , tmptr(ss.get_token_metadata_ptr())
    , req(cmd, uuid)
    , heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
{}
node_ops_ctl::~node_ops_ctl() {
    // stop()/done()/abort() must have run (they call stop_heartbeat_updater())
    // before destruction; a still-set future means the heartbeat fiber would
    // be leaked, which is an internal error.
    if (heartbeat_updater_done_fut) {
        on_internal_error_noexcept(nlogger, "node_ops_ctl destroyed without stopping");
    }
}
// The id of this operation, as carried in every outgoing request.
const node_ops_id& node_ops_ctl::uuid() const noexcept {
    return req.ops_uuid;
}
// may be called multiple times
void node_ops_ctl::start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node) {
desc = std::move(desc_);
nlogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
refresh_sync_nodes(std::move(sync_to_node));
}
// Recomputes sync_nodes from the token-metadata topology snapshot:
// token owners in states that can be synced with, minus ignore_nodes,
// further filtered by the caller-supplied sync_to_node predicate.
// Throws if any required node is seen as down by the gossiper.
void node_ops_ctl::refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node) {
    // sync data with all normal token owners
    sync_nodes.clear();
    const auto& topo = tmptr->get_topology();
    auto can_sync_with_node = [] (const locator::node& node) {
        // Sync with reachable token owners.
        // Note that although nodes in `being_replaced` and `being_removed`
        // are still token owners, they are known to be dead and can't be sync'ed with.
        switch (node.get_state()) {
        case locator::node::state::normal:
        case locator::node::state::being_decommissioned:
            return true;
        default:
            return false;
        }
    };
    topo.for_each_node([&] (const locator::node* np) {
        // Yield between nodes — NOTE(review): maybe_yield() implies this
        // runs inside a seastar::thread context; confirm at call sites.
        seastar::thread::maybe_yield();
        // FIXME: use node* rather than endpoint
        auto node = np->endpoint();
        if (!ignore_nodes.contains(node) && can_sync_with_node(*np) && sync_to_node(node)) {
            sync_nodes.insert(node);
        }
    });
    // Record which required nodes the gossiper currently considers down.
    for (auto& node : sync_nodes) {
        if (!ss.gossiper().is_alive(node)) {
            nodes_down.emplace(node);
        }
    }
    if (!nodes_down.empty()) {
        auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
        nlogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
    nlogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
}
// Teardown: stops the heartbeat fiber if one is running; otherwise a no-op.
future<> node_ops_ctl::stop() noexcept {
    co_await stop_heartbeat_updater();
}
// Caller should set the required req members before prepare
// Broadcasts the prepare command to all sync_nodes (see send_to_all()).
future<> node_ops_ctl::prepare(node_ops_cmd cmd) noexcept {
    return send_to_all(cmd);
}
// Launches the background heartbeat fiber; starting it twice without an
// intervening stop_heartbeat_updater() is an internal error.
void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) {
    if (heartbeat_updater_done_fut) {
        on_internal_error(nlogger, "heartbeat_updater already started");
    }
    // Keep the fiber's future so stop_heartbeat_updater() can join it.
    heartbeat_updater_done_fut = heartbeat_updater(cmd);
}
// Asks every sync node whether it still tracks this operation;
// throws if any node no longer reports our uuid in its pending ops.
future<> node_ops_ctl::query_pending_op() {
    req.cmd = node_ops_cmd::query_pending_ops;
    co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> {
        auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
        nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
        if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
            throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
        }
    });
}
// Aborts the heartbeat fiber (if running) and waits for it to finish.
// Clears the stored future first, so repeated calls are no-ops.
future<> node_ops_ctl::stop_heartbeat_updater() noexcept {
    if (heartbeat_updater_done_fut) {
        as.request_abort();
        co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt);
    }
}
// Stops heartbeats, then broadcasts the completion command to sync_nodes.
future<> node_ops_ctl::done(node_ops_cmd cmd) noexcept {
    co_await stop_heartbeat_updater();
    co_await send_to_all(cmd);
}
// Stops heartbeats, then broadcasts the abort command to sync_nodes.
future<> node_ops_ctl::abort(node_ops_cmd cmd) noexcept {
    co_await stop_heartbeat_updater();
    co_await send_to_all(cmd);
}
// Logs the failure, broadcasts the abort command (ignoring any error the
// abort itself raises), then re-throws the original exception `ex`.
future<> node_ops_ctl::abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
    nlogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
    try {
        co_await abort(cmd);
    } catch (...) {
        // Best effort: the original error `ex` takes precedence.
        nlogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception());
    }
    co_await coroutine::return_exception_ptr(std::move(ex));
}
// Broadcasts `cmd` to all sync_nodes in parallel, classifying per-node
// failures into nodes_unknown_verb / nodes_down / nodes_failed (these
// sets persist across calls, so later broadcasts skip known-bad nodes).
// After the fan-out, throws a single runtime_error summarizing all
// failure categories, if any.
future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) {
    req.cmd = cmd;
    req.ignore_nodes = boost::copy_range<std::list<gms::inet_address>>(ignore_nodes);
    sstring op_desc = ::format("{}", cmd);
    nlogger.info("{}[{}]: Started {}", desc, uuid(), req);
    auto cmd_category = categorize_node_ops_cmd(cmd);
    co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
        // Skip nodes already known to be unreachable or broken — except
        // that abort commands are still sent to previously-failed nodes.
        if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) ||
                (nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) {
            // Note that we still send abort commands to failed nodes.
            co_return;
        }
        try {
            co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
            nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
        } catch (const seastar::rpc::unknown_verb_error&) {
            // Peer does not implement the verb (e.g. an older version).
            if (cmd_category == node_ops_cmd_category::prepare) {
                nlogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc);
            } else {
                nlogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc);
            }
            nodes_unknown_verb.emplace(node);
        } catch (const seastar::rpc::closed_error&) {
            // Connection dropped — treat the peer as down.
            nlogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc);
            nodes_down.emplace(node);
        } catch (...) {
            nlogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception());
            nodes_failed.emplace(node);
        }
    });
    // Collect one error message per failure category, then throw them
    // together so the caller sees the full picture in a single exception.
    std::vector<sstring> errors;
    if (!nodes_failed.empty()) {
        errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed));
    }
    if (!nodes_unknown_verb.empty()) {
        if (cmd_category == node_ops_cmd_category::prepare) {
            errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb));
        } else {
            errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb));
        }
    }
    if (!nodes_down.empty()) {
        errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down));
    }
    if (!errors.empty()) {
        co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; "))));
    }
    nlogger.info("{}[{}]: Finished {}", desc, uuid(), req);
}
// Background fiber: periodically sends `cmd` heartbeats to all sync_nodes
// until `as` is aborted (by stop_heartbeat_updater()). Per-node failures
// are logged and ignored — the fiber itself never fails.
future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) {
    nlogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
    while (!as.abort_requested()) {
        // Use a dedicated request rather than shadowing the member `req`:
        // heartbeats carry only the command and the operation id.
        auto heartbeat_req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
        co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
            try {
                co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), heartbeat_req);
                nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
            } catch (...) {
                // Best effort — a missed heartbeat is only logged.
                nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
            }
        });
        // Swallow the abort exception from the sleep; the loop condition
        // (abort_requested) terminates the fiber cleanly.
        co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
    }
    nlogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
}

163
node_ops/node_ops_ctl.hh Normal file
View File

@@ -0,0 +1,163 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "dht/token.hh"
#include "gms/inet_address.hh"
#include "locator/host_id.hh"
#include "node_ops/id.hh"
#include "schema/schema_fwd.hh"
#include <seastar/core/abort_source.hh>
#include <list>
#include <unordered_set>
namespace service {
class storage_service;
}
namespace locator {
class token_metadata;
}
// Per-operation context handed to operation participants: the operation
// id, an abort source for cancellation, and the nodes to ignore.
// Non-copyable and non-movable.
class node_ops_info {
public:
    node_ops_id ops_uuid;
    shared_ptr<abort_source> as;
    std::list<gms::inet_address> ignore_nodes;
public:
    node_ops_info(node_ops_id ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
    node_ops_info(const node_ops_info&) = delete;
    node_ops_info(node_ops_info&&) = delete;
    // Presumably throws if `as` was aborted — implementation not in this
    // header; confirm in the .cc file.
    void check_abort();
};
// RPC commands driving cluster node operations (removenode, replace,
// decommission, bootstrap) plus repair/pending-op bookkeeping. Each
// operation follows a prepare -> heartbeat -> ... -> done/abort flow.
// NOTE(review): these values travel in node_ops_cmd_request over RPC —
// presumably the numeric values must stay stable across releases, so
// append new commands at the end; confirm with the serialization layer.
enum class node_ops_cmd : uint32_t {
    removenode_prepare,
    removenode_heartbeat,
    removenode_sync_data,
    removenode_abort,
    removenode_done,
    replace_prepare,
    replace_prepare_mark_alive,
    replace_prepare_pending_ranges,
    replace_heartbeat,
    replace_abort,
    replace_done,
    decommission_prepare,
    decommission_heartbeat,
    decommission_abort,
    decommission_done,
    bootstrap_prepare,
    bootstrap_heartbeat,
    bootstrap_abort,
    bootstrap_done,
    query_pending_ops,
    repair_updater,
};
// Coarse classification of node_ops_cmd, used (via categorize_node_ops_cmd)
// by node_ops_ctl::send_to_all() to pick per-category error handling —
// e.g. abort commands are still sent to previously-failed nodes.
enum class node_ops_cmd_category {
    prepare,
    heartbeat,
    sync_data,
    abort,
    done,
    other
};
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept;
std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd);
// The cmd and ops_uuid are mandatory for each request.
// The ignore_nodes and leaving_node are optional.
struct node_ops_cmd_request {
    // Mandatory field, set by all cmds
    node_ops_cmd cmd;
    // Mandatory field, set by all cmds
    node_ops_id ops_uuid;
    // Optional field, list nodes to ignore, set by all cmds
    std::list<gms::inet_address> ignore_nodes;
    // Optional field, list leaving nodes, set by decommission and removenode cmd
    std::list<gms::inet_address> leaving_nodes;
    // Optional field, map existing nodes to replacing nodes, set by replace cmd
    std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
    // Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd
    std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap_nodes;
    // Optional field, list uuids of tables being repaired, set by repair cmd
    std::list<table_id> repair_tables;
    // All optional fields default to empty, so callers set only the ones
    // relevant to their command.
    node_ops_cmd_request(node_ops_cmd command,
            node_ops_id uuid,
            std::list<gms::inet_address> ignore = {},
            std::list<gms::inet_address> leaving = {},
            std::unordered_map<gms::inet_address, gms::inet_address> replace = {},
            std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap = {},
            std::list<table_id> tables = {})
        : cmd(command)
        , ops_uuid(std::move(uuid))
        , ignore_nodes(std::move(ignore))
        , leaving_nodes(std::move(leaving))
        , replace_nodes(std::move(replace))
        , bootstrap_nodes(std::move(bootstrap))
        , repair_tables(std::move(tables)) {
    }
};
std::ostream& operator<<(std::ostream& out, const node_ops_cmd_request& req);
// Reply to a node_ops_cmd_request.
struct node_ops_cmd_response {
    // Mandatory field, set by all cmds
    bool ok;
    // Optional field, set by query_pending_ops cmd
    std::list<node_ops_id> pending_ops;
    // NOTE(review): single-argument and not explicit, so a bare bool
    // converts implicitly to a response — presumably relied upon by
    // callers; confirm before making it explicit.
    node_ops_cmd_response(bool o, std::list<node_ops_id> pending = {})
        : ok(o)
        , pending_ops(std::move(pending)) {
    }
};
// Coordinator-side controller for a single cluster-wide node operation
// (bootstrap/decommission/removenode/replace): fans commands out to the
// participating nodes and keeps the operation alive via heartbeats.
class node_ops_ctl {
    // Nodes that answered with rpc::unknown_verb_error (see send_to_all()).
    std::unordered_set<gms::inet_address> nodes_unknown_verb;
    // Nodes seen down — via gossip (refresh_sync_nodes) or rpc::closed_error.
    std::unordered_set<gms::inet_address> nodes_down;
    // Nodes whose command failed for any other reason.
    std::unordered_set<gms::inet_address> nodes_failed;
public:
    const service::storage_service& ss;
    // Human-readable operation name used as a prefix in all log messages.
    sstring desc;
    locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
    gms::inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
    // Token-metadata snapshot taken at construction time.
    lw_shared_ptr<const locator::token_metadata> tmptr;
    // Nodes the operation must synchronize with (see refresh_sync_nodes()).
    std::unordered_set<gms::inet_address> sync_nodes;
    // Nodes explicitly excluded from synchronization.
    std::unordered_set<gms::inet_address> ignore_nodes;
    // Request template reused by broadcasts; cmd is rewritten per call.
    node_ops_cmd_request req;
    std::chrono::seconds heartbeat_interval;
    // Abort source for cancelling the heartbeat fiber.
    abort_source as;
    // Set while the heartbeat fiber runs; must be cleared (via
    // stop_heartbeat_updater()) before destruction.
    std::optional<future<>> heartbeat_updater_done_fut;
    explicit node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id());
    ~node_ops_ctl();
    const node_ops_id& uuid() const noexcept;
    // may be called multiple times
    void start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; });
    void refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; });
    future<> stop() noexcept;
    // Caller should set the required req members before prepare
    future<> prepare(node_ops_cmd cmd) noexcept;
    void start_heartbeat_updater(node_ops_cmd cmd);
    future<> query_pending_op();
    future<> stop_heartbeat_updater() noexcept;
    future<> done(node_ops_cmd cmd) noexcept;
    future<> abort(node_ops_cmd cmd) noexcept;
    future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept;
    future<> send_to_all(node_ops_cmd cmd);
    future<> heartbeat_updater(node_ops_cmd cmd);
};

View File

@@ -85,20 +85,6 @@ struct repair_uniq_id {
};
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
class node_ops_info {
public:
node_ops_id ops_uuid;
shared_ptr<abort_source> as;
std::list<gms::inet_address> ignore_nodes;
public:
node_ops_info(node_ops_id ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
node_ops_info(const node_ops_info&) = delete;
node_ops_info(node_ops_info&&) = delete;
void check_abort();
};
// NOTE: repair_start() can be run on any node, but starts a node-global
// operation.
// repair_start() starts the requested repair on this node. It returns an
@@ -249,91 +235,6 @@ enum class row_level_diff_detect_algorithm : uint8_t {
std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo);
enum class node_ops_cmd : uint32_t {
removenode_prepare,
removenode_heartbeat,
removenode_sync_data,
removenode_abort,
removenode_done,
replace_prepare,
replace_prepare_mark_alive,
replace_prepare_pending_ranges,
replace_heartbeat,
replace_abort,
replace_done,
decommission_prepare,
decommission_heartbeat,
decommission_abort,
decommission_done,
bootstrap_prepare,
bootstrap_heartbeat,
bootstrap_abort,
bootstrap_done,
query_pending_ops,
repair_updater,
};
enum class node_ops_cmd_category {
prepare,
heartbeat,
sync_data,
abort,
done,
other
};
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept;
std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd);
// The cmd and ops_uuid are mandatory for each request.
// The ignore_nodes and leaving_node are optional.
struct node_ops_cmd_request {
// Mandatory field, set by all cmds
node_ops_cmd cmd;
// Mandatory field, set by all cmds
node_ops_id ops_uuid;
// Optional field, list nodes to ignore, set by all cmds
std::list<gms::inet_address> ignore_nodes;
// Optional field, list leaving nodes, set by decommission and removenode cmd
std::list<gms::inet_address> leaving_nodes;
// Optional field, map existing nodes to replacing nodes, set by replace cmd
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
// Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap_nodes;
// Optional field, list uuids of tables being repaired, set by repair cmd
std::list<table_id> repair_tables;
node_ops_cmd_request(node_ops_cmd command,
node_ops_id uuid,
std::list<gms::inet_address> ignore = {},
std::list<gms::inet_address> leaving = {},
std::unordered_map<gms::inet_address, gms::inet_address> replace = {},
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap = {},
std::list<table_id> tables = {})
: cmd(command)
, ops_uuid(std::move(uuid))
, ignore_nodes(std::move(ignore))
, leaving_nodes(std::move(leaving))
, replace_nodes(std::move(replace))
, bootstrap_nodes(std::move(bootstrap))
, repair_tables(std::move(tables)) {
}
};
std::ostream& operator<<(std::ostream& out, const node_ops_cmd_request& req);
struct node_ops_cmd_response {
// Mandatory field, set by all cmds
bool ok;
// Optional field, set by query_pending_ops cmd
std::list<node_ops_id> pending_ops;
node_ops_cmd_response(bool o, std::list<node_ops_id> pending = {})
: ok(o)
, pending_ops(std::move(pending)) {
}
};
struct repair_update_system_table_request {
tasks::task_id repair_uuid;
table_id table_uuid;

View File

@@ -8,6 +8,7 @@
#pragma once
#include "node_ops/node_ops_ctl.hh"
#include "repair/repair.hh"
#include "tasks/task_manager.hh"

View File

@@ -84,6 +84,7 @@
#include "service/raft/raft_address_map.hh"
#include "protocol_server.hh"
#include "types/set.hh"
#include "node_ops/node_ops_ctl.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
@@ -4111,213 +4112,6 @@ future<> storage_service::stop_gossiping() {
});
}
class node_ops_ctl {
std::unordered_set<gms::inet_address> nodes_unknown_verb;
std::unordered_set<gms::inet_address> nodes_down;
std::unordered_set<gms::inet_address> nodes_failed;
public:
const storage_service& ss;
sstring desc;
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
locator::token_metadata_ptr tmptr;
std::unordered_set<gms::inet_address> sync_nodes;
std::unordered_set<gms::inet_address> ignore_nodes;
node_ops_cmd_request req;
std::chrono::seconds heartbeat_interval;
abort_source as;
std::optional<future<>> heartbeat_updater_done_fut;
explicit node_ops_ctl(const storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id())
: ss(ss_)
, host_id(id)
, endpoint(ep)
, tmptr(ss.get_token_metadata_ptr())
, req(cmd, uuid)
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
{}
~node_ops_ctl() {
if (heartbeat_updater_done_fut) {
on_internal_error_noexcept(slogger, "node_ops_ctl destroyed without stopping");
}
}
const node_ops_id& uuid() const noexcept {
return req.ops_uuid;
}
// may be called multiple times
void start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
desc = std::move(desc_);
slogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
refresh_sync_nodes(std::move(sync_to_node));
}
void refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
// sync data with all normal token owners
sync_nodes.clear();
const auto& topo = tmptr->get_topology();
auto can_sync_with_node = [] (const locator::node& node) {
// Sync with reachable token owners.
// Note that although nodes in `being_replaced` and `being_removed`
// are still token owners, they are known to be dead and can't be sync'ed with.
switch (node.get_state()) {
case locator::node::state::normal:
case locator::node::state::being_decommissioned:
return true;
default:
return false;
}
};
topo.for_each_node([&] (const locator::node* np) {
seastar::thread::maybe_yield();
// FIXME: use node* rather than endpoint
auto node = np->endpoint();
if (!ignore_nodes.contains(node) && can_sync_with_node(*np) && sync_to_node(node)) {
sync_nodes.insert(node);
}
});
for (auto& node : sync_nodes) {
if (!ss.gossiper().is_alive(node)) {
nodes_down.emplace(node);
}
}
if (!nodes_down.empty()) {
auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
}
future<> stop() noexcept {
co_await stop_heartbeat_updater();
}
// Caller should set the required req members before prepare
future<> prepare(node_ops_cmd cmd) noexcept {
return send_to_all(cmd);
}
void start_heartbeat_updater(node_ops_cmd cmd) {
if (heartbeat_updater_done_fut) {
on_internal_error(slogger, "heartbeat_updater already started");
}
heartbeat_updater_done_fut = heartbeat_updater(cmd);
}
future<> query_pending_op() {
req.cmd = node_ops_cmd::query_pending_ops;
co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> {
auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
slogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
}
});
}
future<> stop_heartbeat_updater() noexcept {
if (heartbeat_updater_done_fut) {
as.request_abort();
co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt);
}
}
future<> done(node_ops_cmd cmd) noexcept {
co_await stop_heartbeat_updater();
co_await send_to_all(cmd);
}
future<> abort(node_ops_cmd cmd) noexcept {
co_await stop_heartbeat_updater();
co_await send_to_all(cmd);
}
future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
slogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
try {
co_await abort(cmd);
} catch (...) {
slogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception());
}
co_await coroutine::return_exception_ptr(std::move(ex));
}
future<> send_to_all(node_ops_cmd cmd) {
req.cmd = cmd;
req.ignore_nodes = boost::copy_range<std::list<gms::inet_address>>(ignore_nodes);
sstring op_desc = ::format("{}", cmd);
slogger.info("{}[{}]: Started {}", desc, uuid(), req);
auto cmd_category = categorize_node_ops_cmd(cmd);
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) ||
(nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) {
// Note that we still send abort commands to failed nodes.
co_return;
}
try {
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
slogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
} catch (const seastar::rpc::unknown_verb_error&) {
if (cmd_category == node_ops_cmd_category::prepare) {
slogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc);
} else {
slogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc);
}
nodes_unknown_verb.emplace(node);
} catch (const seastar::rpc::closed_error&) {
slogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc);
nodes_down.emplace(node);
} catch (...) {
slogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception());
nodes_failed.emplace(node);
}
});
std::vector<sstring> errors;
if (!nodes_failed.empty()) {
errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed));
}
if (!nodes_unknown_verb.empty()) {
if (cmd_category == node_ops_cmd_category::prepare) {
errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb));
} else {
errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb));
}
}
if (!nodes_down.empty()) {
errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down));
}
if (!errors.empty()) {
co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; "))));
}
slogger.info("{}[{}]: Finished {}", desc, uuid(), req);
}
future<> heartbeat_updater(node_ops_cmd cmd) {
slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
while (!as.abort_requested()) {
auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
try {
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
slogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
} catch (...) {
slogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
};
});
co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
}
slogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
}
};
static
void on_streaming_finished() {
utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get();

View File

@@ -43,6 +43,7 @@
class node_ops_cmd_request;
class node_ops_cmd_response;
struct node_ops_ctl;
class node_ops_info;
enum class node_ops_cmd : uint32_t;
class repair_service;
@@ -90,7 +91,6 @@ class raft_group0;
enum class disk_error { regular, commit };
class node_ops_meta_data;
struct node_ops_ctl;
/**
* This abstraction contains the token/identifier of this node
@@ -224,7 +224,7 @@ private:
return _gossiper;
};
friend struct node_ops_ctl;
friend struct ::node_ops_ctl;
public:
locator::effective_replication_map_factory& get_erm_factory() noexcept {