storage_service: add node_ops_ctl class to formalize all node_ops flow

All node operations we currently support go through
similar basic flow and may add some op-specific logic
around it.

1. Select the nodes to sync with (this is op specific).
2. hearbeat updater
3. send prepare req
4. perform the body of the node operation
5. send done
--
on any error: send abort

node_ops_ctl formalizes all those steps and makes
sure errors are handled in all steps, and
the error causing abort is not masked by errors
in the abort processing, and is propagated upstream.

Some of the printouts repeat the node operation description
to remain backward compatible so not to break dtests
that wait for them.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-02-09 16:52:28 +02:00
parent f3d6868738
commit d322bbf6ff
4 changed files with 218 additions and 0 deletions

View File

@@ -1832,6 +1832,40 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes));
}
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept {
switch (cmd) {
case node_ops_cmd::removenode_prepare:
case node_ops_cmd::replace_prepare:
case node_ops_cmd::decommission_prepare:
case node_ops_cmd::bootstrap_prepare:
return node_ops_cmd_category::prepare;
case node_ops_cmd::removenode_heartbeat:
case node_ops_cmd::replace_heartbeat:
case node_ops_cmd::decommission_heartbeat:
case node_ops_cmd::bootstrap_heartbeat:
return node_ops_cmd_category::heartbeat;
case node_ops_cmd::removenode_sync_data:
return node_ops_cmd_category::sync_data;
case node_ops_cmd::removenode_abort:
case node_ops_cmd::replace_abort:
case node_ops_cmd::decommission_abort:
case node_ops_cmd::bootstrap_abort:
return node_ops_cmd_category::abort;
case node_ops_cmd::removenode_done:
case node_ops_cmd::replace_done:
case node_ops_cmd::decommission_done:
case node_ops_cmd::bootstrap_done:
return node_ops_cmd_category::done;
default:
return node_ops_cmd_category::other;
}
}
std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd) {
switch (cmd) {
case node_ops_cmd::removenode_prepare:

View File

@@ -271,6 +271,17 @@ enum class node_ops_cmd : uint32_t {
repair_updater,
};
enum class node_ops_cmd_category {
prepare,
heartbeat,
sync_data,
abort,
done,
other
};
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept;
std::ostream& operator<<(std::ostream& out, node_ops_cmd cmd);
// The cmd and ops_uuid are mandatory for each request.

View File

@@ -1908,6 +1908,168 @@ future<> storage_service::do_stop_ms() {
});
}
class node_ops_ctl {
std::unordered_set<gms::inet_address> nodes_unknown_verb;
std::unordered_set<gms::inet_address> nodes_down;
std::unordered_set<gms::inet_address> nodes_failed;
public:
const storage_service& ss;
sstring desc;
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
std::unordered_set<gms::inet_address> sync_nodes;
std::unordered_set<gms::inet_address> ignore_nodes;
node_ops_cmd_request req;
std::chrono::seconds heartbeat_interval;
abort_source as;
std::optional<future<>> heartbeat_updater_done_fut;
explicit node_ops_ctl(const storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid = node_ops_id::create_random_id())
: ss(ss_)
, host_id(id)
, endpoint(ep)
, req(cmd, uuid)
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
{}
~node_ops_ctl() {
if (heartbeat_updater_done_fut) {
on_internal_error_noexcept(slogger, "node_ops_ctl destroyed without stopping");
}
}
const node_ops_id& uuid() const noexcept {
return req.ops_uuid;
}
void start(sstring desc_) {
desc = std::move(desc_);
for (auto& node : sync_nodes) {
if (!ss.gossiper().is_alive(node)) {
nodes_down.emplace(node);
}
}
if (!nodes_down.empty()) {
auto msg = format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
slogger.info("{}[{}]: Started {} operation: node={}/{}, sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes);
}
future<> stop() noexcept {
co_await stop_heartbeat_updater();
}
// Caller should set the required req members before prepare
future<> prepare(node_ops_cmd cmd) noexcept {
return send_to_all(cmd);
}
void start_heartbeat_updater(node_ops_cmd cmd) {
if (heartbeat_updater_done_fut) {
on_internal_error(slogger, "heartbeat_updater already started");
}
heartbeat_updater_done_fut = heartbeat_updater(cmd);
}
future<> stop_heartbeat_updater() noexcept {
if (heartbeat_updater_done_fut) {
as.request_abort();
co_await *std::exchange(heartbeat_updater_done_fut, std::nullopt);
}
}
future<> done(node_ops_cmd cmd) noexcept {
co_await stop_heartbeat_updater();
co_await send_to_all(cmd);
}
future<> abort(node_ops_cmd cmd) noexcept {
co_await stop_heartbeat_updater();
co_await send_to_all(cmd);
}
future<> abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
slogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
try {
co_await abort(cmd);
} catch (...) {
slogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, std::current_exception());
}
co_await coroutine::return_exception_ptr(std::move(ex));
}
future<> send_to_all(node_ops_cmd cmd) {
req.cmd = cmd;
req.ignore_nodes = boost::copy_range<std::list<gms::inet_address>>(ignore_nodes);
sstring op_desc = format("{}", cmd);
slogger.info("{}[{}]: Started {}", desc, uuid(), req);
auto cmd_category = categorize_node_ops_cmd(cmd);
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) ||
(nodes_failed.contains(node) && (cmd_category != node_ops_cmd_category::abort))) {
// Note that we still send abort commands to failed nodes.
co_return;
}
try {
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
slogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
} catch (const seastar::rpc::unknown_verb_error&) {
if (cmd_category == node_ops_cmd_category::prepare) {
slogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, op_desc);
} else {
slogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), op_desc);
}
nodes_unknown_verb.emplace(node);
} catch (const seastar::rpc::closed_error&) {
slogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), op_desc, node);
nodes_down.emplace(node);
} catch (...) {
slogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, op_desc, std::current_exception());
nodes_failed.emplace(node);
}
});
std::vector<sstring> errors;
if (!nodes_failed.empty()) {
errors.emplace_back(format("The {} command failed for nodes={}", op_desc, nodes_failed));
}
if (!nodes_unknown_verb.empty()) {
if (cmd_category == node_ops_cmd_category::prepare) {
errors.emplace_back(format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", op_desc, nodes_unknown_verb));
} else {
errors.emplace_back(format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), op_desc, nodes_unknown_verb));
}
}
if (!nodes_down.empty()) {
errors.emplace_back(format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", op_desc, nodes_failed));
}
if (!errors.empty()) {
co_await coroutine::return_exception(std::runtime_error(join("; ", errors)));
}
slogger.info("{}[{}]: Finished {}", desc, uuid(), req);
}
future<> heartbeat_updater(node_ops_cmd cmd) {
slogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
while (!as.abort_requested()) {
auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
try {
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
slogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
} catch (...) {
slogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
};
});
co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
}
slogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
}
};
future<> storage_service::node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done) {
std::string ops;
if (cmd == node_ops_cmd::decommission_heartbeat) {

View File

@@ -115,6 +115,8 @@ public:
void cancel_watchdog();
};
struct node_ops_ctl;
/**
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
@@ -233,6 +235,15 @@ private:
return _batchlog_manager;
}
const gms::gossiper& gossiper() const noexcept {
return _gossiper;
};
gms::gossiper& gossiper() noexcept {
return _gossiper;
};
friend struct node_ops_ctl;
public:
locator::effective_replication_map_factory& get_erm_factory() noexcept {