Files
scylladb/node_ops/node_ops_ctl.cc
Kefu Chai 5be39740a8 tree: migrate from boost::find to std::ranges algorithms
Replace boost::find() calls with std::ranges::find() and std::ranges::contains()
to leverage modern C++ standard library features. This change reduces external
dependencies and modernizes the codebase.

The following changes were made:
- Replaced boost::find() with std::ranges::find() where index/iterator is needed
- Used std::ranges::contains() for simple element presence checks

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>

Closes scylladb/scylladb#22920
2025-02-20 09:28:57 +03:00

211 lines
8.7 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/config.hh"
#include "gms/gossiper.hh"
#include "message/messaging_service.hh"
#include "node_ops/node_ops_ctl.hh"
#include "service/storage_service.hh"
#include "replica/database.hh"
#include <fmt/ranges.h>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "idl/node_ops.dist.hh"
// File-local logger used by the node_ops_ctl methods below; messages are
// emitted under the "node_ops" logger name.
static logging::logger nlogger("node_ops");
// Builds the controller for a single node operation.
//
//  ss_   - storage service; kept by reference and used here to obtain the
//          token metadata pointer and the database configuration
//  cmd   - command used, together with `uuid`, to construct the request `req`
//  id    - stored as `host_id`
//  ep    - stored as `endpoint`
//  uuid  - operation id; later returned by uuid() through `req.ops_uuid`
node_ops_ctl::node_ops_ctl(const service::storage_service& ss_, node_ops_cmd cmd, locator::host_id id, gms::inet_address ep, node_ops_id uuid)
: ss(ss_)
, host_id(id)
, endpoint(ep)
// Token metadata pointer is obtained once, at construction time.
, tmptr(ss.get_token_metadata_ptr())
, req(cmd, uuid)
// Heartbeat period comes from the `nodeops_heartbeat_interval_seconds` config option.
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
{}
// Destroying the controller while the heartbeat future is still engaged
// (i.e. stop_heartbeat_updater() was not awaited) is reported as an internal error.
node_ops_ctl::~node_ops_ctl() {
    if (!heartbeat_updater_done_fut) {
        return;
    }
    on_internal_error_noexcept(nlogger, "node_ops_ctl destroyed without stopping");
}
// Returns the id of this operation, i.e. the `ops_uuid` field of the request.
const node_ops_id& node_ops_ctl::uuid() const noexcept {
    const auto& ops_id = req.ops_uuid;
    return ops_id;
}
// Records the operation description, logs the start of the operation and
// (re)computes the set of nodes to sync with.
//
//  desc_         - human-readable operation name, used as the prefix of log messages
//  sync_to_node  - predicate forwarded to refresh_sync_nodes()
//
// May be called multiple times. Propagates whatever refresh_sync_nodes() throws.
void node_ops_ctl::start(sstring desc_, std::function<bool(locator::host_id)> sync_to_node) {
    desc = std::move(desc_);
    const auto& ops_id = uuid();
    nlogger.info("{}[{}]: Started {} operation: node={}/{}", desc, ops_id, desc, host_id, endpoint);
    refresh_sync_nodes(std::move(sync_to_node));
}
// Recomputes `sync_nodes`, the set of token owners this operation talks to.
//
// A token owner is selected when it is not listed in `ignore_nodes`, its state
// is `normal` or `being_decommissioned`, and the caller-supplied `sync_to_node`
// predicate accepts its host id.
//
// Every selected node the gossiper does not consider alive is added to
// `nodes_down`; if `nodes_down` is non-empty afterwards, a warning is logged
// and std::runtime_error is thrown.
//
// NOTE(review): `nodes_down` is not cleared here, so entries recorded earlier
// also make this call throw -- confirm that this is intended.
void node_ops_ctl::refresh_sync_nodes(std::function<bool(locator::host_id)> sync_to_node) {
    // Only reachable token owners can be synced with. Nodes in `being_replaced`
    // and `being_removed` are still token owners, but they are known to be dead.
    const auto is_syncable = [] (const locator::node& node) {
        const auto state = node.get_state();
        return state == locator::node::state::normal
            || state == locator::node::state::being_decommissioned;
    };

    // sync data with all normal token owners
    sync_nodes.clear();
    tmptr->for_each_token_owner([&] (const locator::node& node) {
        seastar::thread::maybe_yield();
        // FIXME: use node* rather than the host id
        const auto id = node.host_id();
        if (ignore_nodes.contains(id)) {
            return;
        }
        if (!is_syncable(node)) {
            return;
        }
        if (!sync_to_node(id)) {
            return;
        }
        sync_nodes.insert(id);
    });

    for (const auto& candidate : sync_nodes) {
        if (!ss.gossiper().is_alive(candidate)) {
            nodes_down.emplace(candidate);
        }
    }

    if (nodes_down.empty()) {
        nlogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
        return;
    }

    auto msg = ::format("{}[{}]: Cannot start: nodes={} needed for {} operation are down. It is highly recommended to fix the down nodes and try again.", desc, uuid(), nodes_down, desc);
    nlogger.warn("{}", msg);
    throw std::runtime_error(msg);
}
// Stops the controller; this amounts to stopping the heartbeat updater.
// The returned future resolves when stop_heartbeat_updater() completes.
future<> node_ops_ctl::stop() noexcept {
    return stop_heartbeat_updater();
}
// Sends the prepare-phase command `cmd` to the sync nodes via send_to_all().
// Caller should set the required req members before prepare.
future<> node_ops_ctl::prepare(node_ops_cmd cmd) noexcept {
    co_await send_to_all(cmd);
}
// Launches the background heartbeat loop for `cmd` and remembers its future in
// `heartbeat_updater_done_fut` so it can be awaited by stop_heartbeat_updater().
// Starting it while a previous heartbeat future is still held is reported as
// an internal error.
void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) {
    const bool already_running = static_cast<bool>(heartbeat_updater_done_fut);
    if (already_running) {
        on_internal_error(nlogger, "heartbeat_updater already started");
    }
    heartbeat_updater_done_fut = heartbeat_updater(cmd);
}
// Asks every node in `sync_nodes` for its pending operations and verifies that
// each of them still lists this operation's uuid.
//
// Sets `req.cmd` to `query_pending_ops` before sending. Fails with
// std::runtime_error if a node's response does not contain uuid(); errors
// raised while sending are propagated as well.
future<> node_ops_ctl::query_pending_op() {
    req.cmd = node_ops_cmd::query_pending_ops;
    co_await coroutine::parallel_for_each(sync_nodes, [this] (const locator::host_id& node) -> future<> {
        const auto reply = co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
        nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, reply.pending_ops);
        const bool still_tracked = std::ranges::contains(reply.pending_ops, uuid());
        if (!still_tracked) {
            throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
        }
    });
}
// Stops the heartbeat loop, if one was started: requests abort on `as`, then
// waits for the heartbeat future. The stored future is reset before waiting,
// so a second call is a no-op.
future<> node_ops_ctl::stop_heartbeat_updater() noexcept {
    if (!heartbeat_updater_done_fut) {
        co_return;
    }
    as.request_abort();
    auto running = std::exchange(heartbeat_updater_done_fut, std::nullopt);
    co_await std::move(*running);
}
// Completes the operation: the heartbeat updater is stopped first, and only
// then is the completion command `cmd` sent to the sync nodes.
future<> node_ops_ctl::done(node_ops_cmd cmd) noexcept {
    // Heartbeats must cease before peers are told that the operation is over.
    co_await stop_heartbeat_updater();

    co_await send_to_all(cmd);
}
// Aborts the operation: the heartbeat updater is stopped first, and only then
// is the abort command `cmd` sent to the sync nodes.
future<> node_ops_ctl::abort(node_ops_cmd cmd) noexcept {
    // Heartbeats must cease before peers are told to abort.
    co_await stop_heartbeat_updater();

    co_await send_to_all(cmd);
}
// Error path helper: logs the failure `ex`, attempts to abort the operation
// with `cmd`, and finally resolves to the ORIGINAL exception `ex`.
// A failure of the abort itself is only logged as a warning and otherwise ignored.
future<> node_ops_ctl::abort_on_error(node_ops_cmd cmd, std::exception_ptr ex) noexcept {
    nlogger.error("{}[{}]: Operation failed, sync_nodes={}: {}", desc, uuid(), sync_nodes, ex);
    try {
        co_await abort(cmd);
    } catch (...) {
        const auto abort_failure = std::current_exception();
        nlogger.warn("{}[{}]: The {} command failed while handling a previous error, sync_nodes={}: {}. Ignoring", desc, uuid(), cmd, sync_nodes, abort_failure);
    }
    // Report the error that triggered the abort, not the abort's own outcome.
    co_await coroutine::return_exception_ptr(std::move(ex));
}
// Sends `cmd` to every node in `sync_nodes`, in parallel, and aggregates the outcome.
//
// Before sending, `req.cmd` is set to `cmd` and `req.ignore_nodes` is rebuilt
// from `ignore_nodes` by mapping each host id through the gossiper's address map.
//
// Nodes already recorded in `nodes_unknown_verb` or `nodes_down` are skipped;
// nodes in `nodes_failed` are skipped too, unless `cmd` is an abort command.
// Per-node failures are recorded in one of those three sets rather than thrown.
//
// After all nodes were processed, if any of the three sets is non-empty the
// returned future fails with std::runtime_error whose message joins one
// description per non-empty set with "; ".
future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) {
    req.cmd = cmd;
    auto to_address = [&] (locator::host_id id) { return ss.gossiper().get_address_map().get(id); };
    req.ignore_nodes = ignore_nodes | std::views::transform(to_address) | std::ranges::to<std::list>();

    const sstring verb_name = ::format("{}", cmd);
    nlogger.info("{}[{}]: Started {}", desc, uuid(), req);

    const auto category = categorize_node_ops_cmd(cmd);
    const bool is_prepare = category == node_ops_cmd_category::prepare;
    const bool is_abort = category == node_ops_cmd_category::abort;

    co_await coroutine::parallel_for_each(sync_nodes, [&] (const locator::host_id& node) -> future<> {
        if (nodes_unknown_verb.contains(node) || nodes_down.contains(node)) {
            co_return;
        }
        // Note that we still send abort commands to failed nodes.
        if (!is_abort && nodes_failed.contains(node)) {
            co_return;
        }
        try {
            co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, req);
            nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), verb_name, node);
        } catch (const seastar::rpc::unknown_verb_error&) {
            if (is_prepare) {
                nlogger.warn("{}[{}]: Node {} does not support the {} verb", desc, uuid(), node, verb_name);
            } else {
                nlogger.warn("{}[{}]: Node {} did not find ops_uuid={} or does not support the {} verb", desc, uuid(), node, uuid(), verb_name);
            }
            nodes_unknown_verb.emplace(node);
        } catch (const seastar::rpc::closed_error&) {
            nlogger.warn("{}[{}]: Node {} is down for {} verb", desc, uuid(), node, verb_name);
            nodes_down.emplace(node);
        } catch (...) {
            nlogger.warn("{}[{}]: Node {} failed {} verb: {}", desc, uuid(), node, verb_name, std::current_exception());
            nodes_failed.emplace(node);
        }
    });

    // One entry per non-empty failure set, in the order: failed, unknown verb, down.
    std::vector<sstring> problems;
    if (!nodes_failed.empty()) {
        problems.emplace_back(::format("The {} command failed for nodes={}", verb_name, nodes_failed));
    }
    if (!nodes_unknown_verb.empty()) {
        if (is_prepare) {
            problems.emplace_back(::format("The {} command is unsupported on nodes={}. Please upgrade your cluster and run operation again", verb_name, nodes_unknown_verb));
        } else {
            problems.emplace_back(::format("The ops_uuid={} was not found or the {} command is unsupported on nodes={}", uuid(), verb_name, nodes_unknown_verb));
        }
    }
    if (!nodes_down.empty()) {
        problems.emplace_back(::format("The {} command failed for nodes={}: the needed nodes are down. It is highly recommended to fix the down nodes and try again", verb_name, nodes_down));
    }

    if (problems.empty()) {
        nlogger.info("{}[{}]: Finished {}", desc, uuid(), req);
        co_return;
    }
    co_await coroutine::return_exception(std::runtime_error(fmt::to_string(fmt::join(problems, "; "))));
}
// Background loop that periodically sends the heartbeat command `cmd` to every
// node in `sync_nodes` until abort is requested on `as`.
//
// Each round sends the request to all sync nodes in parallel, then sleeps for
// `heartbeat_interval`. The sleep is abortable through `as`, and any exception
// it produces is discarded so the loop condition is re-evaluated instead.
//
// A failed heartbeat to a node is not fatal: it is logged as a warning,
// including the cause, and the loop carries on.
future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) {
    nlogger.info("{}[{}]: Started heartbeat_updater (interval={}s)", desc, uuid(), heartbeat_interval.count());
    while (!as.abort_requested()) {
        // A request object private to this round. It is deliberately named so that
        // it does not shadow the member `req`, which other methods modify.
        auto heartbeat_req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
        co_await coroutine::parallel_for_each(sync_nodes, [&] (const locator::host_id& node) -> future<> {
            try {
                co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), node, heartbeat_req);
                nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
            } catch (...) {
                // Best effort: keep heartbeating the other nodes, but record why this one failed.
                nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}: {}", desc, uuid(), node, std::current_exception());
            }
        });
        co_await sleep_abortable(heartbeat_interval, as).handle_exception([] (std::exception_ptr) {});
    }
    nlogger.info("{}[{}]: Stopped heartbeat_updater", desc, uuid());
}