In get_children we obtain the vector of alive nodes with get_nodes. However, between this call and sending RPCs to those nodes a preemption may occur. Currently, the liveness of a node is re-checked just before the RPCs (but only via the gossiper, not the topology — unlike get_nodes). Modify get_children so that it holds a token_metadata_ptr, preventing the topology from changing between get_nodes and the RPCs. Remove test_get_children, since it verified that get_children does not fail when a node goes down after get_nodes — a situation that can no longer occur.
227 lines
8.3 KiB
C++
227 lines
8.3 KiB
C++
/*
|
|
* Copyright (C) 2024-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "db/system_keyspace.hh"
|
|
#include "node_ops/task_manager_module.hh"
|
|
#include "service/storage_service.hh"
|
|
#include "service/topology_coordinator.hh"
|
|
#include "service/topology_state_machine.hh"
|
|
#include "tasks/task_handler.hh"
|
|
#include "tasks/virtual_task_hint.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include <variant>
|
|
#include "utils/overloaded_functor.hh"
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
namespace node_ops {
|
|
|
|
// Maps a stored topology request variant onto the task-type string shown
// by the task manager. An empty variant yields an empty string.
static sstring request_type_to_task_type(const std::variant<std::monostate, service::topology_request, service::global_topology_request>& request_type) {
    if (const auto* req = std::get_if<service::topology_request>(&request_type)) {
        // A few request names differ from the user-facing operation names.
        switch (*req) {
        case service::topology_request::join:
            return "bootstrap";
        case service::topology_request::remove:
            return "remove node";
        case service::topology_request::leave:
            return "decommission";
        default:
            return fmt::to_string(*req);
        }
    }
    if (const auto* global_req = std::get_if<service::global_topology_request>(&request_type)) {
        return fmt::to_string(*global_req);
    }
    // std::monostate: no request recorded.
    return "";
}
|
|
|
|
// Derives the task manager state from a topology request entry:
//  - no id recorded: the task exists but has not started;
//  - not done: still running;
//  - done without an error: finished successfully;
//  - done with an error: failed.
static tasks::task_manager::task_state get_state(const db::system_keyspace::topology_requests_entry& entry) {
    if (!entry.id) {
        return tasks::task_manager::task_state::created;
    } else if (!entry.done) {
        return tasks::task_manager::task_state::running;
    } else if (entry.error.empty()) {
        // An empty error string marks successful completion.
        return tasks::task_manager::task_state::done;
    } else {
        return tasks::task_manager::task_state::failed;
    }
}
|
|
|
|
// Fetches node-ops request entries from the system keyspace, skipping
// entries older than `ttl`.
static future<db::system_keyspace::topology_requests_entries> get_entries(db::system_keyspace& sys_ks, std::chrono::seconds ttl) {
    const auto oldest_allowed = db_clock::now() - ttl;
    return sys_ks.get_node_ops_request_entries(oldest_allowed);
}
|
|
|
|
// Translates a topology request entry (plus a routing hint) into the
// generic task_stats structure exposed by the task manager.
static tasks::task_stats get_task_stats(const db::system_keyspace::topology_requests_entry& entry,
        const tasks::virtual_task_hint& hint) {
    // The node a request operates on is only known through the hint.
    auto entity = hint.node_id ? fmt::to_string(*hint.node_id) : std::string{};
    return tasks::task_stats{
        .task_id = tasks::task_id{entry.id},
        .type = request_type_to_task_type(entry.request_type),
        .kind = tasks::task_kind::cluster,
        .scope = "cluster",
        .state = get_state(entry),
        .sequence_number = 0,
        .keyspace = "",
        .table = "",
        .entity = std::move(entity),
        .shard = 0,
        .start_time = entry.start_time,
        .end_time = entry.end_time,
    };
}
|
|
|
|
// Builds a full task_status for the request identified by `id`, or nullopt
// if the id is not present in the system tables.
future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto entry_opt = co_await _ss._sys_ks.local().get_topology_request_entry_opt(id.uuid());
    if (!entry_opt) {
        co_return std::nullopt;
    }
    auto& entry = *entry_opt;
    auto stats = get_task_stats(entry, hint);
    // Resolve the awaited pieces before building the status; `hint` is
    // consumed by is_abortable().
    auto abortable = co_await is_abortable(std::move(hint));
    // Pass a token_metadata snapshot so the topology cannot change while
    // the children are being collected.
    auto children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
    co_return tasks::task_status{
        .task_id = stats.task_id,
        .type = stats.type,
        .kind = stats.kind,
        .scope = stats.scope,
        .state = stats.state,
        .is_abortable = abortable,
        .start_time = stats.start_time,
        .end_time = stats.end_time,
        .error = entry.error,
        .parent_id = tasks::task_id::create_null_id(),
        .sequence_number = stats.sequence_number,
        .shard = stats.shard,
        .keyspace = stats.keyspace,
        .table = stats.table,
        .entity = stats.entity,
        .progress_units = "",
        .progress = tasks::task_manager::task::progress{},
        .children = std::move(children)
    };
}
|
|
|
|
// Node ops virtual tasks belong to the topology-change task group.
tasks::task_manager::task_group node_ops_virtual_task::get_group() const noexcept {
    return tasks::task_manager::task_group::topology_change_group;
}
|
|
|
|
// Collects the task ids of all in-flight topology requests together with
// the node each request targets.
static std::map<tasks::task_id, locator::host_id> get_requests(const service::topology& topology) {
    std::map<tasks::task_id, locator::host_id> requests;
    // Pending requests: look the node up in the topology to learn its
    // request id; nodes without a state entry are skipped.
    for (const auto& server_id : topology.requests | std::views::keys) {
        if (const auto* rs = topology.find(server_id)) {
            requests.emplace(tasks::task_id(rs->second.request_id), locator::host_id(server_id.uuid()));
        }
    }
    // Nodes already transitioning carry their request id directly.
    for (const auto& [server_id, rs] : topology.transition_nodes) {
        requests.emplace(tasks::task_id(rs.request_id), locator::host_id(server_id.uuid()));
    }
    return requests;
}
|
|
|
|
// Checks whether `task_id` denotes a node ops request, either still tracked
// in the in-memory topology or recorded in the system tables. Returns a
// hint (with node_id filled in when known) or nullopt.
future<std::optional<tasks::virtual_task_hint>> node_ops_virtual_task::contains(tasks::task_id task_id) const {
    if (!task_id.uuid().is_timestamp()) {
        // Node ops task ids are always timeuuids; anything else cannot be ours.
        co_return std::nullopt;
    }

    tasks::virtual_task_hint hint;
    const service::topology& topology = _ss._topology_state_machine._topology;
    auto requests = get_requests(topology);
    if (auto it = requests.find(task_id); it != requests.end()) {
        hint.node_id = it->second;
        co_return hint;
    }

    // Not tracked in memory — fall back to the persisted request entry.
    auto entry = co_await _ss._sys_ks.local().get_topology_request_entry_opt(task_id.uuid());
    if (entry && std::holds_alternative<service::topology_request>(entry->request_type)) {
        co_return hint;
    }
    co_return std::nullopt;
}
|
|
|
|
// A task is abortable only when it targets a concrete node — currently,
// only node operations are supported by abort_topology_request().
future<tasks::is_abortable> node_ops_virtual_task::is_abortable(tasks::virtual_task_hint hint) const {
    const bool node_request = hint.node_id.has_value();
    return make_ready_future<tasks::is_abortable>(tasks::is_abortable(node_request));
}
|
|
|
|
// Waits for the request identified by `id` to complete and reports its
// final status. Returns nullopt when the id is unknown.
future<std::optional<tasks::task_status>> node_ops_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
    auto status = co_await get_status(id, hint);
    if (!status) {
        co_return std::nullopt;
    }

    co_await _ss.wait_for_topology_request_completion(id.uuid(), false);
    // Re-read the status so the caller observes the post-completion state.
    co_return co_await get_status(id, std::move(hint));
}
|
|
|
|
// Aborts the topology request identified by `id`.
// `hint` is unused here: abort_topology_request() only needs the request id.
future<> node_ops_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept {
    co_await _ss.abort_topology_request(id.uuid());
}
|
|
|
|
// Produces task_stats for every recorded node-ops request within the user
// task TTL, annotating each with its target node when still tracked in the
// in-memory topology.
future<std::vector<tasks::task_stats>> node_ops_virtual_task::get_stats() {
    auto& sys_ks = _ss._sys_ks.local();
    auto entries = co_await get_entries(sys_ks, get_task_manager().get_user_task_ttl());
    auto requests = get_requests(_ss._topology_state_machine._topology);

    std::vector<tasks::task_stats> stats;
    stats.reserve(entries.size());
    for (const auto& [id_uuid, entry] : entries) {
        auto id = tasks::task_id{id_uuid};
        tasks::virtual_task_hint hint;
        if (auto it = requests.find(id); it != requests.end()) {
            hint.node_id = it->second;
        }
        stats.push_back(get_task_stats(entry, hint));
    }
    co_return stats;
}
|
|
|
|
// Builds a streaming sub-task under `parent_id`.
// `result` is caller-owned shared state caching the outcome of a previous
// streaming attempt, so run() can decide whether to (re)start `action`.
streaming_task_impl::streaming_task_impl(tasks::task_manager::module_ptr module,
        tasks::task_id parent_id,
        streaming::stream_reason reason,
        std::optional<shared_future<>>& result,
        std::function<future<>()> action) noexcept
    : tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", "", "", "", parent_id)
    , _reason(reason)
    , _result(result)
    , _action(std::move(action))
{}
|
|
|
|
std::string streaming_task_impl::type() const {
|
|
return fmt::format("{}: streaming", _reason);
|
|
}
|
|
|
|
// Streaming tasks are exposed to users, not hidden as internal bookkeeping.
tasks::is_internal streaming_task_impl::is_internal() const noexcept {
    return tasks::is_internal::no;
}
|
|
|
|
future<> streaming_task_impl::run() {
    // Semantics, based on the cached result of the previous attempt:
    //  - no previous attempt: start streaming now;
    //  - previous attempt still in flight: await it and share its outcome;
    //  - previous attempt succeeded: return immediately;
    //  - previous attempt failed: start a fresh attempt.
    const bool needs_start = !_result || _result->failed();
    if (needs_start) {
        if (_result) {
            service::rtlogger.info("retry streaming after previous attempt failed with {}", _result->get_future().get_exception());
        } else {
            service::rtlogger.info("start streaming");
        }
        _result = _action();
    } else {
        service::rtlogger.debug("already streaming");
    }
    co_await _result.value().get_future();
    service::rtlogger.info("streaming completed");
}
|
|
|
|
// Registers the "node_ops" module with the task manager and keeps a
// reference to the storage service for topology access.
task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
    : tasks::task_manager::module(tm, "node_ops")
    , _ss(ss)
{}
|
|
|
|
// Delegates node-set retrieval to the task manager, which derives it from
// the storage service state (presumably the set of alive nodes — see the
// get_children flow).
std::set<locator::host_id> task_manager_module::get_nodes() const {
    return get_task_manager().get_nodes(_ss);
}
|
|
|
|
}
|