node_ops: extract classes related to node operations
Node operations will be integrated with task manager and so node_ops directory needs to be created. To have an access to node ops related classes from task manager and preserve consistent naming, move the classes to node_ops/node_ops_data.cc.
This commit is contained in:
@@ -1101,6 +1101,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'rust/wasmtime_bindings/src/lib.rs',
|
||||
'utils/to_string.cc',
|
||||
'service/topology_state_machine.cc',
|
||||
'node_ops/node_ops_ctl.cc'
|
||||
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
|
||||
+ scylla_raft_core
|
||||
)
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "range.hh"
|
||||
#include "frozen_schema.hh"
|
||||
#include "repair/repair.hh"
|
||||
#include "node_ops/node_ops_ctl.hh"
|
||||
#include "utils/digest_algorithm.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "service/paxos/prepare_response.hh"
|
||||
|
||||
206
node_ops/node_ops_ctl.cc
Normal file
206
node_ops/node_ops_ctl.cc
Normal file
@@ -0,0 +1,206 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "node_ops/node_ops_ctl.hh"
|
||||
#include "service/storage_service.hh"
|
||||
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
|
||||
static logging::logger nlogger("node_ops");
|
||||
|
||||
node_ops_ctl::node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid)
|
||||
: ss(ss_)
|
||||
, host_id(id)
|
||||
, endpoint(ep)
|
||||
, tmptr(ss.get_token_metadata_ptr())
|
||||
, req(cmd, uuid)
|
||||
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
|
||||
{}
|
||||
|
||||
node_ops_ctl::~node_ops_ctl() {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
on_internal_error_noexcept(nlogger, "node_ops_ctl destroyed without stopping");
|
||||
}
|
||||
}
|
||||
|
||||
const node_ops_id& node_ops_ctl::uuid() const noexcept {
|
||||
return req.ops_uuid;
|
||||
}
|
||||
|
||||
// may be called multiple times
|
||||
void node_ops_ctl::start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node) {
|
||||
desc = std::move(desc_);
|
||||
|
||||
nlogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
|
||||
|
||||
refresh_sync_nodes(std::move(sync_to_node));
|
||||
}
|
||||
|
||||
void node_ops_ctl::refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node) {
|
||||
// sync data with all normal token owners
|
||||
sync_nodes.clear();
|
||||
const auto& topo = tmptr->get_topology();
|
||||
auto can_sync_with_node = [] (const locator::node& node) {
|
||||
// Sync with reachable token owners.
|
||||
// Note that although nodes in `being_replaced` and `being_removed`
|
||||
// are still token owners, they are known to be dead and can't be sync'ed with.
|
||||
switch (node.get_state()) {
|
||||
case locator::node::state::normal:
|
||||
case locator::node::state::being_decommissioned:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
};
|
||||
topo.for_each_node([&] (const locator::node* np) {
|
||||
seastar::thread::maybe_yield();
|
||||
// FIXME: use node* rather than endpoint
|
||||
auto node = np->endpoint();
|
||||
if (!ignore_nodes.contains(node) && can_sync_with_node(*np) && sync_to_node(node)) {
|
||||
sync_nodes.insert(node);
|
||||
}
|
||||
});
|
||||
|
||||
for (auto& node : sync_nodes) {
|
||||
if (!ss.gossiper().is_alive(node)) {
|
||||
nodes_down.emplace(node);
|
||||
}
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
|
||||
nlogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
nlogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::stop() noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
}
|
||||
|
||||
// Caller should set the required req members before prepare
|
||||
future<> node_ops_ctl::prepare(node_ops_cmd cmd) noexcept {
|
||||
return send_to_all(cmd);
|
||||
}
|
||||
|
||||
void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
on_internal_error(nlogger, "heartbeat_updater already started");
|
||||
}
|
||||
heartbeat_updater_done_fut = heartbeat_updater(cmd);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::query_pending_op() {
|
||||
req.cmd = node_ops_cmd::query_pending_ops;
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> {
|
||||
auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
|
||||
if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
|
||||
throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::stop_heartbeat_updater() noexcept {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
as.request_abort();
|
||||
co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt);
|
||||
}
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::done(node_ops_cmd cmd) noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
co_await send_to_all(cmd);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::abort(node_ops_cmd cmd) noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
co_await send_to_all(cmd);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
|
||||
nlogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
|
||||
try {
|
||||
co_await abort(cmd);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception());
|
||||
}
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) {
|
||||
req.cmd = cmd;
|
||||
req.ignore_nodes = boost::copy_range<std::list<gms::inet_address>>(ignore_nodes);
|
||||
sstring op_desc = ::format("{}", cmd);
|
||||
nlogger.info("{}[{}]: Started {}", desc, uuid(), req);
|
||||
auto cmd_category = categorize_node_ops_cmd(cmd);
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
|
||||
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) ||
|
||||
(nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) {
|
||||
// Note that we still send abort commands to failed nodes.
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
|
||||
} catch (const seastar::rpc::unknown_verb_error&) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
nlogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc);
|
||||
} else {
|
||||
nlogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc);
|
||||
}
|
||||
nodes_unknown_verb.emplace(node);
|
||||
} catch (const seastar::rpc::closed_error&) {
|
||||
nlogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc);
|
||||
nodes_down.emplace(node);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception());
|
||||
nodes_failed.emplace(node);
|
||||
}
|
||||
});
|
||||
std::vector<sstring> errors;
|
||||
if (!nodes_failed.empty()) {
|
||||
errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed));
|
||||
}
|
||||
if (!nodes_unknown_verb.empty()) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb));
|
||||
} else {
|
||||
errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb));
|
||||
}
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down));
|
||||
}
|
||||
if (!errors.empty()) {
|
||||
co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; "))));
|
||||
}
|
||||
nlogger.info("{}[{}]: Finished {}", desc, uuid(), req);
|
||||
}
|
||||
|
||||
future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) {
|
||||
nlogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
|
||||
while (!as.abort_requested()) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
|
||||
};
|
||||
});
|
||||
co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
|
||||
}
|
||||
nlogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
|
||||
}
|
||||
163
node_ops/node_ops_ctl.hh
Normal file
163
node_ops/node_ops_ctl.hh
Normal file
@@ -0,0 +1,163 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "dht/token.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "node_ops/id.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
|
||||
#include <list>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace service {
|
||||
class storage_service;
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
class token_metadata;
|
||||
}
|
||||
|
||||
class node_ops_info {
|
||||
public:
|
||||
node_ops_id ops_uuid;
|
||||
shared_ptr<abort_source> as;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
|
||||
public:
|
||||
node_ops_info(node_ops_id ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
|
||||
node_ops_info(const node_ops_info&) = delete;
|
||||
node_ops_info(node_ops_info&&) = delete;
|
||||
|
||||
void check_abort();
|
||||
};
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
replace_prepare,
|
||||
replace_prepare_mark_alive,
|
||||
replace_prepare_pending_ranges,
|
||||
replace_heartbeat,
|
||||
replace_abort,
|
||||
replace_done,
|
||||
decommission_prepare,
|
||||
decommission_heartbeat,
|
||||
decommission_abort,
|
||||
decommission_done,
|
||||
bootstrap_prepare,
|
||||
bootstrap_heartbeat,
|
||||
bootstrap_abort,
|
||||
bootstrap_done,
|
||||
query_pending_ops,
|
||||
repair_updater,
|
||||
};
|
||||
|
||||
enum class node_ops_cmd_category {
|
||||
prepare,
|
||||
heartbeat,
|
||||
sync_data,
|
||||
abort,
|
||||
done,
|
||||
other
|
||||
};
|
||||
|
||||
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept;
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd);
|
||||
|
||||
// The cmd and ops_uuid are mandatory for each request.
|
||||
// The ignore_nodes and leaving_node are optional.
|
||||
struct node_ops_cmd_request {
|
||||
// Mandatory field, set by all cmds
|
||||
node_ops_cmd cmd;
|
||||
// Mandatory field, set by all cmds
|
||||
node_ops_id ops_uuid;
|
||||
// Optional field, list nodes to ignore, set by all cmds
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
// Optional field, list leaving nodes, set by decommission and removenode cmd
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
// Optional field, map existing nodes to replacing nodes, set by replace cmd
|
||||
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
|
||||
// Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd
|
||||
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap_nodes;
|
||||
// Optional field, list uuids of tables being repaired, set by repair cmd
|
||||
std::list<table_id> repair_tables;
|
||||
node_ops_cmd_request(node_ops_cmd command,
|
||||
node_ops_id uuid,
|
||||
std::list<gms::inet_address> ignore = {},
|
||||
std::list<gms::inet_address> leaving = {},
|
||||
std::unordered_map<gms::inet_address, gms::inet_address> replace = {},
|
||||
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap = {},
|
||||
std::list<table_id> tables = {})
|
||||
: cmd(command)
|
||||
, ops_uuid(std::move(uuid))
|
||||
, ignore_nodes(std::move(ignore))
|
||||
, leaving_nodes(std::move(leaving))
|
||||
, replace_nodes(std::move(replace))
|
||||
, bootstrap_nodes(std::move(bootstrap))
|
||||
, repair_tables(std::move(tables)) {
|
||||
}
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const node_ops_cmd_request& req);
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
// Mandatory field, set by all cmds
|
||||
bool ok;
|
||||
// Optional field, set by query_pending_ops cmd
|
||||
std::list<node_ops_id> pending_ops;
|
||||
node_ops_cmd_response(bool o, std::list<node_ops_id> pending = {})
|
||||
: ok(o)
|
||||
, pending_ops(std::move(pending)) {
|
||||
}
|
||||
};
|
||||
|
||||
class node_ops_ctl {
|
||||
std::unordered_set<gms::inet_address> nodes_unknown_verb;
|
||||
std::unordered_set<gms::inet_address> nodes_down;
|
||||
std::unordered_set<gms::inet_address> nodes_failed;
|
||||
|
||||
public:
|
||||
const service::storage_service& ss;
|
||||
sstring desc;
|
||||
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
|
||||
gms::inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
|
||||
lw_shared_ptr<const locator::token_metadata> tmptr;
|
||||
std::unordered_set<gms::inet_address> sync_nodes;
|
||||
std::unordered_set<gms::inet_address> ignore_nodes;
|
||||
node_ops_cmd_request req;
|
||||
std::chrono::seconds heartbeat_interval;
|
||||
abort_source as;
|
||||
std::optional<future<>> heartbeat_updater_done_fut;
|
||||
|
||||
explicit node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id());
|
||||
~node_ops_ctl();
|
||||
const node_ops_id& uuid() const noexcept;
|
||||
// may be called multiple times
|
||||
void start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; });
|
||||
void refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; });
|
||||
future<> stop() noexcept;
|
||||
// Caller should set the required req members before prepare
|
||||
future<> prepare(node_ops_cmd cmd) noexcept;
|
||||
void start_heartbeat_updater(node_ops_cmd cmd);
|
||||
future<> query_pending_op();
|
||||
future<> stop_heartbeat_updater() noexcept;
|
||||
future<> done(node_ops_cmd cmd) noexcept;
|
||||
future<> abort(node_ops_cmd cmd) noexcept;
|
||||
future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept;
|
||||
future<> send_to_all(node_ops_cmd cmd);
|
||||
future<> heartbeat_updater(node_ops_cmd cmd);
|
||||
};
|
||||
@@ -85,20 +85,6 @@ struct repair_uniq_id {
|
||||
};
|
||||
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
|
||||
|
||||
class node_ops_info {
|
||||
public:
|
||||
node_ops_id ops_uuid;
|
||||
shared_ptr<abort_source> as;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
|
||||
public:
|
||||
node_ops_info(node_ops_id ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
|
||||
node_ops_info(const node_ops_info&) = delete;
|
||||
node_ops_info(node_ops_info&&) = delete;
|
||||
|
||||
void check_abort();
|
||||
};
|
||||
|
||||
// NOTE: repair_start() can be run on any node, but starts a node-global
|
||||
// operation.
|
||||
// repair_start() starts the requested repair on this node. It returns an
|
||||
@@ -249,91 +235,6 @@ enum class row_level_diff_detect_algorithm : uint8_t {
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo);
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
replace_prepare,
|
||||
replace_prepare_mark_alive,
|
||||
replace_prepare_pending_ranges,
|
||||
replace_heartbeat,
|
||||
replace_abort,
|
||||
replace_done,
|
||||
decommission_prepare,
|
||||
decommission_heartbeat,
|
||||
decommission_abort,
|
||||
decommission_done,
|
||||
bootstrap_prepare,
|
||||
bootstrap_heartbeat,
|
||||
bootstrap_abort,
|
||||
bootstrap_done,
|
||||
query_pending_ops,
|
||||
repair_updater,
|
||||
};
|
||||
|
||||
enum class node_ops_cmd_category {
|
||||
prepare,
|
||||
heartbeat,
|
||||
sync_data,
|
||||
abort,
|
||||
done,
|
||||
other
|
||||
};
|
||||
|
||||
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept;
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd);
|
||||
|
||||
// The cmd and ops_uuid are mandatory for each request.
|
||||
// The ignore_nodes and leaving_node are optional.
|
||||
struct node_ops_cmd_request {
|
||||
// Mandatory field, set by all cmds
|
||||
node_ops_cmd cmd;
|
||||
// Mandatory field, set by all cmds
|
||||
node_ops_id ops_uuid;
|
||||
// Optional field, list nodes to ignore, set by all cmds
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
// Optional field, list leaving nodes, set by decommission and removenode cmd
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
// Optional field, map existing nodes to replacing nodes, set by replace cmd
|
||||
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
|
||||
// Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd
|
||||
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap_nodes;
|
||||
// Optional field, list uuids of tables being repaired, set by repair cmd
|
||||
std::list<table_id> repair_tables;
|
||||
node_ops_cmd_request(node_ops_cmd command,
|
||||
node_ops_id uuid,
|
||||
std::list<gms::inet_address> ignore = {},
|
||||
std::list<gms::inet_address> leaving = {},
|
||||
std::unordered_map<gms::inet_address, gms::inet_address> replace = {},
|
||||
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap = {},
|
||||
std::list<table_id> tables = {})
|
||||
: cmd(command)
|
||||
, ops_uuid(std::move(uuid))
|
||||
, ignore_nodes(std::move(ignore))
|
||||
, leaving_nodes(std::move(leaving))
|
||||
, replace_nodes(std::move(replace))
|
||||
, bootstrap_nodes(std::move(bootstrap))
|
||||
, repair_tables(std::move(tables)) {
|
||||
}
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const node_ops_cmd_request& req);
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
// Mandatory field, set by all cmds
|
||||
bool ok;
|
||||
// Optional field, set by query_pending_ops cmd
|
||||
std::list<node_ops_id> pending_ops;
|
||||
node_ops_cmd_response(bool o, std::list<node_ops_id> pending = {})
|
||||
: ok(o)
|
||||
, pending_ops(std::move(pending)) {
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct repair_update_system_table_request {
|
||||
tasks::task_id repair_uuid;
|
||||
table_id table_uuid;
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "node_ops/node_ops_ctl.hh"
|
||||
#include "repair/repair.hh"
|
||||
#include "tasks/task_manager.hh"
|
||||
|
||||
|
||||
@@ -84,6 +84,7 @@
|
||||
#include "service/raft/raft_address_map.hh"
|
||||
#include "protocol_server.hh"
|
||||
#include "types/set.hh"
|
||||
#include "node_ops/node_ops_ctl.hh"
|
||||
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
@@ -4111,213 +4112,6 @@ future<> storage_service::stop_gossiping() {
|
||||
});
|
||||
}
|
||||
|
||||
class node_ops_ctl {
|
||||
std::unordered_set<gms::inet_address> nodes_unknown_verb;
|
||||
std::unordered_set<gms::inet_address> nodes_down;
|
||||
std::unordered_set<gms::inet_address> nodes_failed;
|
||||
|
||||
public:
|
||||
const storage_service& ss;
|
||||
sstring desc;
|
||||
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
|
||||
inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
|
||||
locator::token_metadata_ptr tmptr;
|
||||
std::unordered_set<gms::inet_address> sync_nodes;
|
||||
std::unordered_set<gms::inet_address> ignore_nodes;
|
||||
node_ops_cmd_request req;
|
||||
std::chrono::seconds heartbeat_interval;
|
||||
abort_source as;
|
||||
std::optional<future<>> heartbeat_updater_done_fut;
|
||||
|
||||
explicit node_ops_ctl(const storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id())
|
||||
: ss(ss_)
|
||||
, host_id(id)
|
||||
, endpoint(ep)
|
||||
, tmptr(ss.get_token_metadata_ptr())
|
||||
, req(cmd, uuid)
|
||||
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
|
||||
{}
|
||||
|
||||
~node_ops_ctl() {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
on_internal_error_noexcept(slogger, "node_ops_ctl destroyed without stopping");
|
||||
}
|
||||
}
|
||||
|
||||
const node_ops_id& uuid() const noexcept {
|
||||
return req.ops_uuid;
|
||||
}
|
||||
|
||||
// may be called multiple times
|
||||
void start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
|
||||
desc = std::move(desc_);
|
||||
|
||||
slogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
|
||||
|
||||
refresh_sync_nodes(std::move(sync_to_node));
|
||||
}
|
||||
|
||||
void refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
|
||||
// sync data with all normal token owners
|
||||
sync_nodes.clear();
|
||||
const auto& topo = tmptr->get_topology();
|
||||
auto can_sync_with_node = [] (const locator::node& node) {
|
||||
// Sync with reachable token owners.
|
||||
// Note that although nodes in `being_replaced` and `being_removed`
|
||||
// are still token owners, they are known to be dead and can't be sync'ed with.
|
||||
switch (node.get_state()) {
|
||||
case locator::node::state::normal:
|
||||
case locator::node::state::being_decommissioned:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
};
|
||||
topo.for_each_node([&] (const locator::node* np) {
|
||||
seastar::thread::maybe_yield();
|
||||
// FIXME: use node* rather than endpoint
|
||||
auto node = np->endpoint();
|
||||
if (!ignore_nodes.contains(node) && can_sync_with_node(*np) && sync_to_node(node)) {
|
||||
sync_nodes.insert(node);
|
||||
}
|
||||
});
|
||||
|
||||
for (auto& node : sync_nodes) {
|
||||
if (!ss.gossiper().is_alive(node)) {
|
||||
nodes_down.emplace(node);
|
||||
}
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
|
||||
}
|
||||
|
||||
future<> stop() noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
}
|
||||
|
||||
// Caller should set the required req members before prepare
|
||||
future<> prepare(node_ops_cmd cmd) noexcept {
|
||||
return send_to_all(cmd);
|
||||
}
|
||||
|
||||
void start_heartbeat_updater(node_ops_cmd cmd) {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
on_internal_error(slogger, "heartbeat_updater already started");
|
||||
}
|
||||
heartbeat_updater_done_fut = heartbeat_updater(cmd);
|
||||
}
|
||||
|
||||
future<> query_pending_op() {
|
||||
req.cmd = node_ops_cmd::query_pending_ops;
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> {
|
||||
auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
slogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
|
||||
if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
|
||||
throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> stop_heartbeat_updater() noexcept {
|
||||
if (heartbeat_updater_done_fut) {
|
||||
as.request_abort();
|
||||
co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt);
|
||||
}
|
||||
}
|
||||
|
||||
future<> done(node_ops_cmd cmd) noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
co_await send_to_all(cmd);
|
||||
}
|
||||
|
||||
future<> abort(node_ops_cmd cmd) noexcept {
|
||||
co_await stop_heartbeat_updater();
|
||||
co_await send_to_all(cmd);
|
||||
}
|
||||
|
||||
future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
|
||||
slogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
|
||||
try {
|
||||
co_await abort(cmd);
|
||||
} catch (...) {
|
||||
slogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception());
|
||||
}
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
|
||||
future<> send_to_all(node_ops_cmd cmd) {
|
||||
req.cmd = cmd;
|
||||
req.ignore_nodes = boost::copy_range<std::list<gms::inet_address>>(ignore_nodes);
|
||||
sstring op_desc = ::format("{}", cmd);
|
||||
slogger.info("{}[{}]: Started {}", desc, uuid(), req);
|
||||
auto cmd_category = categorize_node_ops_cmd(cmd);
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
|
||||
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) ||
|
||||
(nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) {
|
||||
// Note that we still send abort commands to failed nodes.
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
slogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
|
||||
} catch (const seastar::rpc::unknown_verb_error&) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
slogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc);
|
||||
} else {
|
||||
slogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc);
|
||||
}
|
||||
nodes_unknown_verb.emplace(node);
|
||||
} catch (const seastar::rpc::closed_error&) {
|
||||
slogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, op_desc);
|
||||
nodes_down.emplace(node);
|
||||
} catch (...) {
|
||||
slogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception());
|
||||
nodes_failed.emplace(node);
|
||||
}
|
||||
});
|
||||
std::vector<sstring> errors;
|
||||
if (!nodes_failed.empty()) {
|
||||
errors.emplace_back(::format("The {} command failed for nodes={}", op_desc, nodes_failed));
|
||||
}
|
||||
if (!nodes_unknown_verb.empty()) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
errors.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb));
|
||||
} else {
|
||||
errors.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb));
|
||||
}
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
errors.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_down));
|
||||
}
|
||||
if (!errors.empty()) {
|
||||
co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(errors, "; "))));
|
||||
}
|
||||
slogger.info("{}[{}]: Finished {}", desc, uuid(), req);
|
||||
}
|
||||
|
||||
future<> heartbeat_updater(node_ops_cmd cmd) {
|
||||
slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
|
||||
while (!as.abort_requested()) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
slogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
|
||||
} catch (...) {
|
||||
slogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
|
||||
};
|
||||
});
|
||||
co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
|
||||
}
|
||||
slogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
|
||||
}
|
||||
};
|
||||
|
||||
static
|
||||
void on_streaming_finished() {
|
||||
utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get();
|
||||
|
||||
@@ -43,6 +43,7 @@
|
||||
|
||||
class node_ops_cmd_request;
|
||||
class node_ops_cmd_response;
|
||||
struct node_ops_ctl;
|
||||
class node_ops_info;
|
||||
enum class node_ops_cmd : uint32_t;
|
||||
class repair_service;
|
||||
@@ -90,7 +91,6 @@ class raft_group0;
|
||||
enum class disk_error { regular, commit };
|
||||
|
||||
class node_ops_meta_data;
|
||||
struct node_ops_ctl;
|
||||
|
||||
/**
|
||||
* This abstraction contains the token/identifier of this node
|
||||
@@ -224,7 +224,7 @@ private:
|
||||
return _gossiper;
|
||||
};
|
||||
|
||||
friend struct node_ops_ctl;
|
||||
friend struct ::node_ops_ctl;
|
||||
public:
|
||||
|
||||
locator::effective_replication_map_factory& get_erm_factory() noexcept {
|
||||
|
||||
Reference in New Issue
Block a user