Files
scylladb/service/topology_coordinator.cc
copilot-swe-agent[bot] 3b793ef09f Merge topology_request_tracking_mutation_builder calls for keyspace_rf_change
Combined the two separate topology_request_tracking_mutation_builder calls
into one, setting both start_time and done status in the same mutation builder
to reduce redundancy.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:53:23 +00:00

4100 lines
216 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <chrono>
#include <fmt/ranges.h>
#include <memory>
#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/util/noncopyable_function.hh>
#include <variant>
#include "auth/service.hh"
#include "cdc/generation.hh"
#include "cdc/generation_service.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/system_keyspace.hh"
#include "dht/boot_strapper.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include "locator/host_id.hh"
#include "locator/tablets.hh"
#include "locator/token_metadata.hh"
#include "locator/network_topology_strategy.hh"
#include "message/messaging_service.hh"
#include "mutation/async_utils.hh"
#include "raft/raft.hh"
#include "replica/database.hh"
#include "replica/tablet_mutation_builder.hh"
#include "replica/tablets.hh"
#include "db/config.hh"
#include "db/view/view_builder.hh"
#include "service/qos/service_level_controller.hh"
#include "service/migration_manager.hh"
#include "service/raft/group0_voter_handler.hh"
#include "service/raft/join_node.hh"
#include "service/raft/raft_group0.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/tablet_allocator.hh"
#include "service/tablet_operation.hh"
#include "service/topology_state_machine.hh"
#include "db/view/view_building_coordinator.hh"
#include "topology_mutation.hh"
#include "utils/assert.hh"
#include "utils/error_injection.hh"
#include "utils/stall_free.hh"
#include "utils/to_string.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "idl/join_node.dist.hh"
#include "idl/storage_service.dist.hh"
#include "replica/exceptions.hh"
#include "service/paxos/prepare_response.hh"
#include "idl/storage_proxy.dist.hh"
#include "utils/updateable_value.hh"
#include "repair/repair.hh"
#include "idl/repair.dist.hh"
#include "service/topology_coordinator.hh"
#include <boost/range/join.hpp>
using token = dht::token;
using inet_address = gms::inet_address;
namespace service {
logging::logger rtlogger("raft_topology");
locator::host_id to_host_id(raft::server_id id) {
return locator::host_id{id.uuid()};
}
future<> wait_for_gossiper(raft::server_id id, const gms::gossiper& g, abort_source& as) {
const auto timeout = std::chrono::seconds{30};
const auto deadline = lowres_clock::now() + timeout;
while (true) {
auto hids = g.get_node_ip(to_host_id(id));
if (hids) {
co_return;
}
if (lowres_clock::now() > deadline) {
co_await coroutine::exception(std::make_exception_ptr(
wait_for_ip_timeout(id, std::chrono::duration_cast<std::chrono::seconds>(timeout).count())));
}
static thread_local logger::rate_limit rate_limit{std::chrono::seconds(1)};
rtlogger.log(log_level::warn, rate_limit, "cannot map {} to ip, retrying.", id);
co_await sleep_abortable(std::chrono::milliseconds(5), as);
}
}
namespace {
// Doesn't throw error on absence of values.
sstring get_application_state_gently(const gms::application_state_map& epmap, gms::application_state app_state) {
const auto it = epmap.find(app_state);
if (it == epmap.end()) {
return sstring{};
}
// it's versioned_value::value(), not std::optional::value() - it does not throw
return it->second.value();
}
} // namespace
class topology_coordinator : public endpoint_lifecycle_subscriber {
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
gms::gossiper& _gossiper;
netw::messaging_service& _messaging;
locator::shared_token_metadata& _shared_tm;
db::system_keyspace& _sys_ks;
replica::database& _db;
utils::updateable_value<uint32_t> _tablet_load_stats_refresh_interval_in_seconds;
service::raft_group0& _group0;
service::topology_state_machine& _topo_sm;
db::view::view_building_state_machine& _vb_sm;
abort_source& _as;
gms::feature_service& _feature_service;
endpoint_lifecycle_notifier& _lifecycle_notifier;
qos::service_level_controller& _sl_controller;
raft::server& _raft;
const raft::term_t _term;
uint64_t _last_cmd_index = 0;
raft_topology_cmd_handler_type _raft_topology_cmd_handler;
tablet_allocator& _tablet_allocator;
std::unique_ptr<db::view::view_building_coordinator> _vb_coordinator;
cdc::generation_service& _cdc_gens;
// The reason load_stats_ptr is a shared ptr is that load balancer can yield, and we don't want it
// to suffer lifetime issues when stats refresh fiber overrides the current stats.
std::unordered_map<locator::host_id, locator::load_stats> _load_stats_per_node;
serialized_action _tablet_load_stats_refresh;
static constexpr std::chrono::seconds cdc_streams_gc_refresh_interval = std::chrono::seconds(60);
std::chrono::milliseconds _ring_delay;
gate::holder _group0_holder;
using drop_guard_and_retake = bool_class<class retake_guard_tag>;
// Engaged if an ongoing topology change should be rolled back. The string inside
// will indicate a reason for the rollback.
std::optional<sstring> _rollback;
group0_voter_handler _voter_handler;
topology_coordinator_cmd_rpc_tracker& _topology_cmd_rpc_tracker;
const locator::token_metadata& get_token_metadata() const noexcept {
return *_shared_tm.get();
}
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept {
return _shared_tm.get();
}
// This is a topology snapshot for a given node. It contains pointers into the topology state machine
// that may be outdated after guard is released so the structure is meant to be destroyed together
// with the guard
struct node_to_work_on {
group0_guard guard;
const topology_state_machine::topology_type* topology;
raft::server_id id;
const replica_state* rs;
std::optional<topology_request> request;
std::optional<request_param> req_param;
};
future<group0_guard> cleanup_group0_config_if_needed(group0_guard guard) {
auto& topo = _topo_sm._topology;
auto rconf = _group0.group0_server().get_configuration();
if (!rconf.is_joint()) {
// Find nodes that 'left' but still in the config and remove them
auto to_remove = std::ranges::to<std::vector<raft::server_id>>(
rconf.current
| std::views::transform([&] (const raft::config_member& m) { return m.addr.id; })
| std::views::filter([&] (const raft::server_id& id) { return topo.left_nodes.contains(id); }));
if (!to_remove.empty()) {
// Remove from group 0 nodes that left. They may failed to do so by themselves
release_guard(std::move(guard));
try {
rtlogger.debug("topology coordinator fiber removing {}"
" from raft since they are in `left` state", to_remove);
co_await _group0.group0_server().modify_config({}, to_remove, &_as);
} catch (const raft::commit_status_unknown&) {
rtlogger.warn("topology coordinator fiber got commit_status_unknown status"
" while removing {} from raft", to_remove);
}
guard = co_await start_operation();
}
}
co_return std::move(guard);
}
struct cancel_requests {
group0_guard guard;
std::unordered_set<raft::server_id> dead_nodes;
};
// Request to start a cluster-wide cleanup of vnodes-based tables on nodes marked
// as cleanup_need. This step is required before removenode or decommission
// topology operations.
struct start_vnodes_cleanup {
group0_guard guard;
topology_request request;
raft::server_id request_server_id;
};
// Return dead nodes
std::unordered_set<locator::host_id> get_dead_nodes() const {
std::unordered_set<locator::host_id> dead_set;
for (auto& n : _topo_sm._topology.normal_nodes) {
bool alive = false;
try {
alive = _gossiper.is_alive(locator::host_id(n.first.uuid()));
} catch (...) {}
if (!alive) {
dead_set.insert(locator::host_id(n.first.uuid()));
}
}
return dead_set;
}
// Return dead nodes and while at it checking if there are live nodes that either need cleanup
// or running one already
std::unordered_set<raft::server_id> get_dead_node(bool& cleanup_running, bool& cleanup_needed) const {
std::unordered_set<raft::server_id> dead_set;
cleanup_needed = cleanup_running = false;
for (auto& n : _topo_sm._topology.normal_nodes) {
bool alive = false;
try {
alive = _gossiper.is_alive(locator::host_id(n.first.uuid()));
} catch (...) {}
if (!alive) {
dead_set.insert(n.first);
} else {
cleanup_running |= (n.second.cleanup == cleanup_status::running);
cleanup_needed |= (n.second.cleanup == cleanup_status::needed);
}
}
return dead_set;
}
std::optional<request_param> get_request_param(raft::server_id id) const {
return _topo_sm._topology.get_request_param(id);
};
// Returns:
// guard - there is nothing to do.
// cancel_requests - no request can be started so cancel the queue
// start_vnodes_cleanup - cleanup needs to be started
// node_to_work_on - the node the topology coordinator should work on
std::variant<group0_guard, cancel_requests, start_vnodes_cleanup, node_to_work_on> get_next_task(group0_guard guard) {
auto& topo = _topo_sm._topology;
if (topo.transition_nodes.size() != 0) {
// If there is a node that is the middle of topology operation continue with it
return get_node_to_work_on(std::move(guard));
}
bool cleanup_running;
bool cleanup_needed;
const auto dead_nodes = get_dead_node(cleanup_running, cleanup_needed);
if (cleanup_running || topo.requests.empty()) {
// Ether there is no requests or there is a live node that runs cleanup. Wait for it to complete.
return std::move(guard);
}
std::optional<std::pair<raft::server_id, topology_request>> next_req;
for (auto& req : topo.requests) {
auto enough_live_nodes = [&] {
if (req.second == topology_request::rebuild) {
// For rebuild only the node itself should be alive to start it
// it may still fail if down node has data for the rebuild process
return !dead_nodes.contains(req.first);
}
auto exclude_nodes = get_excluded_nodes_for_topology_request(topo, req.first, req.second);
for (auto id : dead_nodes) {
if (!exclude_nodes.contains(id)) {
return false;
}
}
return true;
};
if (enough_live_nodes()) {
if (!next_req || next_req->second > req.second) {
next_req = req;
}
}
}
if (!next_req) {
// We did not find a request that has enough live node to proceed
// Cancel all requests to let admin know that no operation can succeed
rtlogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes);
return cancel_requests{std::move(guard), std::move(dead_nodes)};
}
auto [id, req] = *next_req;
if (cleanup_needed && (req == topology_request::remove || req == topology_request::leave)) {
// If the highest prio request is removenode or decommission we need to start cleanup if one is needed
return start_vnodes_cleanup(std::move(guard), req, id);
}
return node_to_work_on(std::move(guard), &topo, id, &topo.find(id)->second, req, get_request_param(id));
};
node_to_work_on get_node_to_work_on(group0_guard guard) const {
auto& topo = _topo_sm._topology;
if (topo.transition_nodes.empty()) {
on_internal_error(rtlogger, ::format(
"could not find node to work on"
" even though the state requires it (state: {})", topo.tstate));
}
auto e = &*topo.transition_nodes.begin();
return node_to_work_on{std::move(guard), &topo, e->first, &e->second, std::nullopt, get_request_param(e->first)};
};
future<group0_guard> start_operation() {
rtlogger.debug("obtaining group 0 guard...");
auto guard = co_await _group0.client().start_operation(_as);
rtlogger.debug("guard taken, prev_state_id: {}, new_state_id: {}, coordinator term: {}, current Raft term: {}",
guard.observed_group0_state_id(), guard.new_group0_state_id(), _term, _raft.get_current_term());
if (_term != _raft.get_current_term()) {
throw term_changed_error{};
}
co_return std::move(guard);
}
void release_node(std::optional<node_to_work_on> node) {
// Leaving the scope destroys the object and releases the guard.
}
node_to_work_on retake_node(group0_guard guard, raft::server_id id) const {
auto& topo = _topo_sm._topology;
auto it = topo.find(id);
SCYLLA_ASSERT(it);
std::optional<topology_request> req;
auto rit = topo.requests.find(id);
if (rit != topo.requests.end()) {
req = rit->second;
}
std::optional<request_param> req_param;
auto pit = topo.req_param.find(id);
if (pit != topo.req_param.end()) {
req_param = pit->second;
}
return node_to_work_on{std::move(guard), &topo, id, &it->second, std::move(req), std::move(req_param)};
}
group0_guard take_guard(node_to_work_on&& node) {
return std::move(node.guard);
}
future<> update_topology_state(
group0_guard guard, utils::chunked_vector<canonical_mutation>&& updates, const sstring& reason) {
try {
rtlogger.info("updating topology state: {}", reason);
rtlogger.trace("update_topology_state mutations: {}", updates);
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
} catch (group0_concurrent_modification&) {
rtlogger.info("race while changing state: {}. Retrying", reason);
throw;
}
};
raft::server_id parse_replaced_node(const std::optional<request_param>& req_param) const {
return service::topology::parse_replaced_node(req_param);
}
future<> exec_direct_command_helper(raft::server_id id, uint64_t cmd_index, const raft_topology_cmd& cmd) {
rtlogger.debug("send {} command with term {} and index {} to {}",
cmd.cmd, _term, cmd_index, id);
_topology_cmd_rpc_tracker.active_dst.emplace(id);
auto _ = seastar::defer([this, id] { _topology_cmd_rpc_tracker.active_dst.erase(id); });
auto result = _db.get_token_metadata().get_topology().is_me(to_host_id(id)) ?
co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) :
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
&_messaging, to_host_id(id), id, _term, cmd_index, cmd);
if (result.status == raft_topology_cmd_result::command_status::fail) {
co_await coroutine::exception(std::make_exception_ptr(
std::runtime_error(::format("failed status returned from {}", id))));
}
};
future<node_to_work_on> exec_direct_command(node_to_work_on&& node, const raft_topology_cmd& cmd) {
auto id = node.id;
release_node(std::move(node));
const auto cmd_index = ++_last_cmd_index;
_topology_cmd_rpc_tracker.current = cmd.cmd;
_topology_cmd_rpc_tracker.index = cmd_index;
co_await exec_direct_command_helper(id, cmd_index, cmd);
co_return retake_node(co_await start_operation(), id);
};
future<> exec_global_command_helper(auto nodes, const raft_topology_cmd& cmd) {
const auto cmd_index = ++_last_cmd_index;
_topology_cmd_rpc_tracker.current = cmd.cmd;
_topology_cmd_rpc_tracker.index = cmd_index;
auto f = co_await coroutine::as_future(
seastar::parallel_for_each(std::move(nodes), [this, &cmd, cmd_index] (raft::server_id id) {
return exec_direct_command_helper(id, cmd_index, cmd);
}));
if (f.failed()) {
co_await coroutine::return_exception(std::runtime_error(
::format("raft topology: exec_global_command({}) failed with {}",
cmd.cmd, f.get_exception())));
}
};
future<group0_guard> exec_global_command(
group0_guard guard, const raft_topology_cmd& cmd,
const std::unordered_set<raft::server_id>& exclude_nodes,
drop_guard_and_retake drop_and_retake = drop_guard_and_retake::yes) {
rtlogger.info("executing global topology command {}, excluded nodes: {}", cmd.cmd, exclude_nodes);
auto nodes = boost::range::join(_topo_sm._topology.normal_nodes, _topo_sm._topology.transition_nodes)
| std::views::filter([&exclude_nodes] (const std::pair<const raft::server_id, replica_state>& n) {
return !exclude_nodes.contains(n.first);
})
| std::views::keys;
if (drop_and_retake) {
release_guard(std::move(guard));
}
co_await exec_global_command_helper(std::move(nodes), cmd);
if (drop_and_retake) {
guard = co_await start_operation();
}
co_return guard;
}
std::unordered_set<raft::server_id> get_excluded_nodes_for_topology_request(const topology_state_machine::topology_type& topo,
raft::server_id id, const std::optional<topology_request>& req) const {
// ignored_nodes is not per request any longer, but for now consider ignored nodes only
// for remove and replace operations since only those operations support it on streaming level.
// Specifically, streaming for bootstrapping doesn't support ignored_nodes
// (see raft_topology_cmd::command::stream_ranges handler in storage_proxy, neither
// bootstrap_with_repair nor bs.bootstrap take an ignored_nodes parameter).
// If we ignored a dead node for a join request here, this request wouldn't be cancelled by
// get_next_task and the stream_ranges would fail with timeout after wasting some time
// trying to access the dead node.
const auto is_remove_or_replace = std::invoke([&]() {
if (req) {
return *req == topology_request::remove || *req == topology_request::replace;
}
const auto it = topo.transition_nodes.find(id);
if (it == topo.transition_nodes.end()) {
return false;
}
const auto s = it->second.state;
return s == node_state::removing || s == node_state::replacing;
});
std::unordered_set<raft::server_id> exclude_nodes;
if (is_remove_or_replace) {
exclude_nodes = topo.ignored_nodes;
}
return exclude_nodes;
}
std::unordered_set<raft::server_id> get_excluded_nodes_for_topology_request(const node_to_work_on& node) const {
return get_excluded_nodes_for_topology_request(*node.topology, node.id, node.request);
}
future<group0_guard> remove_from_group0(group0_guard guard, const raft::server_id& id) {
rtlogger.info("removing node {} from group 0 configuration...", id);
release_guard(std::move(guard));
co_await _voter_handler.on_node_removed(id, _as);
co_await _group0.remove_from_raft_config(id);
rtlogger.info("node {} removed from group 0 configuration", id);
co_return co_await start_operation();
}
future<> step_down_as_nonvoter() {
// Become a nonvoter which triggers a leader stepdown.
co_await _voter_handler.on_node_removed(_raft.id(), _as);
if (_raft.is_leader()) {
co_await _raft.wait_for_state_change(&_as);
}
// throw term_changed_error so we leave the coordinator loop instead of trying another
// read_barrier which may fail with an (harmless, but unnecessary and annoying) error
// telling us we're not in the configuration anymore (we'll get removed by the new
// coordinator)
throw term_changed_error{};
}
struct bootstrapping_info {
const std::unordered_set<token>& bootstrap_tokens;
const replica_state& rs;
};
// Returns data for a new CDC generation in the form of mutations for the CDC_GENERATIONS_V3 table
// and the generation's UUID.
//
// If there's a bootstrapping node, its tokens should be included in the new generation.
// Pass them and a reference to the bootstrapping node's replica_state through `binfo`.
future<std::pair<utils::UUID, utils::chunked_vector<mutation>>> prepare_new_cdc_generation_data(
locator::token_metadata_ptr tmptr, const group0_guard& guard, std::optional<bootstrapping_info> binfo,
noncopyable_function<std::pair<size_t, uint8_t>(locator::host_id)> get_sharding_info_for_host_id) {
auto get_sharding_info = [&] (dht::token end) -> std::pair<size_t, uint8_t> {
if (binfo && binfo->bootstrap_tokens.contains(end)) {
return {binfo->rs.shard_count, binfo->rs.ignore_msb};
} else {
auto ep = tmptr->get_endpoint(end);
if (!ep) {
// get_sharding_info is only called for bootstrap tokens
// or for tokens present in token_metadata
on_internal_error(rtlogger, ::format(
"make_new_cdc_generation_data: get_sharding_info:"
" can't find endpoint for token {}", end));
}
try {
return get_sharding_info_for_host_id(*ep);
} catch (...) {
on_internal_error(rtlogger, ::format(
"make_new_cdc_generation_data: get_sharding_info:"
" can't get sharding info for node {}, owner of token {}."
" Reason: {}", *ep, end, std::current_exception()));
}
}
};
auto gen_uuid = guard.new_group0_state_id();
auto gen_desc = cdc::make_new_generation_description(
binfo ? binfo->bootstrap_tokens : std::unordered_set<token>{}, get_sharding_info, tmptr);
auto gen_table_schema = _db.find_schema(
db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
const size_t max_command_size = _raft.max_command_size();
const size_t mutation_size_threshold = max_command_size / 2;
auto gen_mutations = co_await cdc::get_cdc_generation_mutations_v3(
gen_table_schema, gen_uuid, gen_desc, mutation_size_threshold, guard.write_timestamp());
co_return std::pair{gen_uuid, std::move(gen_mutations)};
}
// Broadcasts all mutations returned from `prepare_new_cdc_generation_data` except the last one.
// Each mutation is sent in separate raft command. It takes `group0_guard`, and if the number of mutations
// is greater than one, the guard is dropped, and a new one is created and returned, otherwise the old one
// will be returned. Commands are sent in parallel and unguarded (the guard used for sending the last mutation
// will guarantee that the term hasn't been changed). Returns the generation's UUID, guard and last mutation,
// which will be sent with additional topology data by the caller.
//
// If we send the last mutation in the `write_mutation` command, we would use a total of `n + 1` commands
// instead of `n-1 + 1` (where `n` is the number of mutations), so it's better to send it in `topology_change`
// (we need to send it after all `write_mutations`) with some small metadata.
//
// With the default commitlog segment size, `mutation_size_threshold` will be 4 MB. In large clusters e.g.
// 100 nodes, 64 shards per node, 256 vnodes cdc generation data can reach the size of 30 MB, thus
// there will be no more than 8 commands.
//
// In a multi-DC cluster with 100ms latencies between DCs, this operation should take about 200ms since we
// send the commands concurrently, but even if the commands were replicated sequentially by Raft,
// it should take no more than 1.6s which is incomparably smaller than bootstrapping operation
// (bootstrapping is quick if there is no data in the cluster, but usually if one has 100 nodes they
// have tons of data, so indeed streaming/repair will take much longer (hours/days)).
future<std::tuple<utils::UUID, group0_guard, canonical_mutation>> prepare_and_broadcast_cdc_generation_data(
locator::token_metadata_ptr tmptr, group0_guard guard, std::optional<bootstrapping_info> binfo) {
auto get_sharding_info_for_host_id = [&] (locator::host_id ep) -> std::pair<size_t, uint8_t> {
auto ptr = _topo_sm._topology.find(raft::server_id{ep.uuid()});
if (!ptr) {
throw std::runtime_error(::format(
"raft topology: prepare_and_broadcast_cdc_generation_data: get_sharding_info_for_host_id:"
" couldn't find node {} in topology", ep));
}
auto& rs = ptr->second;
return {rs.shard_count, rs.ignore_msb};
};
co_return co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), binfo, get_sharding_info_for_host_id);
}
future<std::tuple<utils::UUID, group0_guard, canonical_mutation>> prepare_and_broadcast_cdc_generation_data(
locator::token_metadata_ptr tmptr, group0_guard guard, std::optional<bootstrapping_info> binfo,
noncopyable_function<std::pair<size_t, uint8_t>(locator::host_id)> get_sharding_info_for_host_id) {
auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(tmptr, guard, binfo, std::move(get_sharding_info_for_host_id));
if (gen_mutations.empty()) {
on_internal_error(rtlogger, "cdc_generation_data: gen_mutations is empty");
}
utils::chunked_vector<canonical_mutation> updates{gen_mutations.begin(), gen_mutations.end()};
if (updates.size() > 1) {
release_guard(std::move(guard));
co_await parallel_for_each(updates.begin(), std::prev(updates.end()), [this, gen_uuid = gen_uuid] (canonical_mutation& m) {
auto const reason = format(
"insert CDC generation data (UUID: {}), part", gen_uuid);
rtlogger.trace("do update {} reason {}", m, reason);
write_mutations change{{std::move(m)}};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), reason);
return _group0.client().add_entry_unguarded(std::move(g0_cmd), &_as);
});
guard = co_await start_operation();
}
co_return std::tuple{gen_uuid, std::move(guard), std::move(updates.back())};
}
// Deletes obsolete CDC generations. These are the published generations that stopped operating
// sufficiently long ago (see comment below).
//
// Appends necessary mutations to `updates` and updates the `reason` string.
future<> clean_obsolete_cdc_generations(
const group0_guard& guard,
utils::chunked_vector<canonical_mutation>& updates,
sstring& reason) {
const auto& committed_gens = _topo_sm._topology.committed_cdc_generations;
if (committed_gens.empty()) {
co_return;
}
// If some node can still accept a write to a CDC generation, we cannot delete this generation.
// A request coordinator accepts a write to one of the previous generations (the ones that stopped
// operating before `now`) if the write's timestamp is higher than `now - 5s` where `now` is
// provided by the coordinator's local clock. So, we can safely delete a generation if it
// stopped operating more than 5s ago on all nodes. But since their clocks can be desynchronized,
// we have to account for that. We assume that the max clock difference is within `ring_delay`.
// So we delete only generations that stopped operating more than `5s + ring_delay` ago.
auto ts_upper_bound = db_clock::now() - (cdc::get_generation_leeway() + _ring_delay);
utils::get_local_injector().inject("clean_obsolete_cdc_generations_change_ts_ub", [&] {
ts_upper_bound = db_clock::now();
});
// Theoretically, some generations might not be published within `5s + ring_delay` after creation.
// If that happens, we reduce `ts_upper_bound` to ensure they aren't deleted.
if (!_topo_sm._topology.unpublished_cdc_generations.empty()) {
auto first_unpublished_ts = _topo_sm._topology.unpublished_cdc_generations.front().ts;
if (first_unpublished_ts < ts_upper_bound) {
ts_upper_bound = first_unpublished_ts;
}
}
std::optional<std::vector<cdc::generation_id_v2>::const_iterator> first_nonobsolete_gen_it;
for (auto it = committed_gens.begin(); it != committed_gens.end() && it->ts <= ts_upper_bound; it++) {
if (it + 1 == committed_gens.end() || (it + 1)->ts > ts_upper_bound) {
first_nonobsolete_gen_it = it;
}
}
if (!first_nonobsolete_gen_it || *first_nonobsolete_gen_it == committed_gens.begin()) {
co_return;
}
auto mut_ts = guard.write_timestamp();
// Insert a tombstone covering all obsolete generations.
auto s = _db.find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
auto id_upper_bound = (*first_nonobsolete_gen_it)->id;
mutation m(s, partition_key::from_singular(*s, cdc::CDC_GENERATIONS_V3_KEY));
auto range = query::clustering_range::make_ending_with({
clustering_key_prefix::from_single_value(*s, timeuuid_type->decompose(id_upper_bound)), false});
auto bv = bound_view::from_range(range);
m.partition().apply_delete(*s, range_tombstone{bv.first, bv.second, tombstone{mut_ts, gc_clock::now()}});
updates.push_back(canonical_mutation(m));
std::vector<cdc::generation_id_v2> new_committed_gens(*first_nonobsolete_gen_it, committed_gens.end());
topology_mutation_builder builder(guard.write_timestamp());
builder.set_committed_cdc_generations(std::move(new_committed_gens));
updates.push_back(builder.build());
reason += ::format("deleted data of CDC generations with time UUID lower than {}", id_upper_bound);
}
// If there are some unpublished CDC generations, publishes the one with the oldest timestamp
// to user-facing description tables.
//
// Appends necessary mutations to `updates` and updates the `reason` string.
future<> publish_oldest_cdc_generation(
const group0_guard& guard,
utils::chunked_vector<canonical_mutation>& updates,
sstring& reason) {
const auto& unpublished_gens = _topo_sm._topology.unpublished_cdc_generations;
if (unpublished_gens.empty()) {
co_return;
}
// The generation under index 0 is the oldest because unpublished_cdc_generations are sorted by timestamp.
auto gen_id = unpublished_gens[0];
auto gen_data = co_await _sys_ks.read_cdc_generation(gen_id.id);
co_await _sys_dist_ks.local().create_cdc_desc(
gen_id.ts, gen_data, { get_token_metadata().count_normal_token_owners() });
std::vector<cdc::generation_id_v2> new_unpublished_gens(unpublished_gens.begin() + 1, unpublished_gens.end());
topology_mutation_builder builder(guard.write_timestamp());
builder.set_unpublished_cdc_generations(std::move(new_unpublished_gens));
updates.push_back(builder.build());
reason += ::format("published CDC generation with ID {}, ", gen_id);
}
// The background fiber of the topology coordinator that continually publishes committed yet unpublished
// CDC generations. Every generation is published in a separate group 0 operation.
//
// It also continually cleans the obsolete CDC generation data.
future<> cdc_generation_publisher_fiber() {
rtlogger.debug("start CDC generation publisher fiber");
while (!_as.abort_requested()) {
co_await utils::get_local_injector().inject("cdc_generation_publisher_fiber", [] (auto& handler) -> future<> {
rtlogger.info("CDC generation publisher fiber sleeps after injection");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
rtlogger.info("CDC generation publisher fiber finishes sleeping after injection");
}, false);
bool sleep = false;
try {
auto guard = co_await start_operation();
utils::chunked_vector<canonical_mutation> updates;
sstring reason;
co_await publish_oldest_cdc_generation(guard, updates, reason);
co_await clean_obsolete_cdc_generations(guard, updates, reason);
if (!updates.empty()) {
co_await update_topology_state(std::move(guard), std::move(updates), std::move(reason));
} else {
release_guard(std::move(guard));
}
if (_topo_sm._topology.unpublished_cdc_generations.empty()) {
// No CDC generations to publish. Wait until one appears or the topology coordinator aborts.
rtlogger.debug("CDC generation publisher fiber has nothing to do. Sleeping.");
co_await _topo_sm.event.when([&] () {
return !_topo_sm._topology.unpublished_cdc_generations.empty() || _as.abort_requested();
});
rtlogger.debug("CDC generation publisher fiber wakes up");
}
} catch (raft::request_aborted&) {
rtlogger.debug("CDC generation publisher fiber aborted");
} catch (seastar::abort_requested_exception&) {
rtlogger.debug("CDC generation publisher fiber aborted");
} catch (group0_concurrent_modification&) {
} catch (term_changed_error&) {
rtlogger.debug("CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term());
} catch (...) {
rtlogger.error("CDC generation publisher fiber got error {}", std::current_exception());
sleep = true;
}
if (sleep) {
try {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (...) {
rtlogger.debug("CDC generation publisher: sleep failed: {}", std::current_exception());
}
}
co_await coroutine::maybe_yield();
}
}
future<> cdc_streams_gc_fiber() {
auto can_proceed = [this] { return !_async_gate.is_closed() && !_as.abort_requested(); };
while (can_proceed()) {
bool sleep = true;
try {
auto guard = co_await start_operation();
utils::chunked_vector<canonical_mutation> updates;
co_await _cdc_gens.garbage_collect_cdc_streams(updates, guard.write_timestamp());
if (!updates.empty()) {
co_await update_topology_state(std::move(guard), std::move(updates), "CDC streams GC");
} else {
release_guard(std::move(guard));
}
} catch (raft::request_aborted&) {
rtlogger.debug("CDC streams GC fiber aborted");
sleep = false;
} catch (seastar::abort_requested_exception&) {
rtlogger.debug("CDC streams GC fiber aborted");
sleep = false;
} catch (...) {
rtlogger.warn("CDC streams GC fiber got error {}", std::current_exception());
}
auto refresh_interval = utils::get_local_injector().is_enabled("short_cdc_streams_gc_refresh_interval") ?
std::chrono::seconds(1) : cdc_streams_gc_refresh_interval;
if (sleep && can_proceed()) {
try {
co_await seastar::sleep_abortable(refresh_interval, _as);
} catch (...) {
rtlogger.debug("CDC streams GC: sleep failed: {}", std::current_exception());
}
}
}
}
// If a node crashes after initiating gossip and before joining group0, it becomes orphan and
// remains in gossip. This background fiber periodically checks for such nodes and purges those
// entries from gossiper by committing the node as left in raft.
future<> gossiper_orphan_remover_fiber() {
rtlogger.debug("start gossiper orphan remover fiber");
bool do_speedup_fiber = false;
co_await utils::get_local_injector().inject("fast_orphan_removal_fiber", [&](auto& handler) -> future<> {
do_speedup_fiber = true;
// While testing, orphan ip is introduced, and its presence is captured. Wait is added before remover thread so that the presence of orphan ip is
// confirmed and the difference of gossiper ips before and after this thread application can be easily asserted.
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
});
while (!_as.abort_requested()) {
try {
auto guard = co_await start_operation();
utils::chunked_vector<canonical_mutation> updates;
int32_t timeout = 60;
co_await utils::get_local_injector().inject("speedup_orphan_removal", [&](auto& handler) -> future<> {
// Removes all unjoined nodes. Just for testing purposes.
timeout = 0;
co_return;
});
std::string reason = ::format("Ban the orphan nodes in group0. Orphan nodes HostId/IP:");
_gossiper.for_each_endpoint_state([&](const gms::endpoint_state& eps) -> void {
// Since generation is in seconds unit, converting current time to seconds eases comparison computations.
auto current_timestamp =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
auto generation = eps.get_heart_beat_state().get_generation().value();
auto host_id = eps.get_host_id();
if (current_timestamp - generation > timeout && !_topo_sm._topology.contains(raft::server_id{host_id.id}) && !_gossiper.is_alive(host_id)) {
topology_mutation_builder builder(guard.write_timestamp());
// This topology mutation moves a node to left state and bans it. Hence, the value of below fields are not useful.
// The dummy_value used for few fields indicates the trivialness of this row entry, and is used to detect this special case.
static constexpr uint32_t dummy_value = 0;
builder.with_node(raft::server_id{host_id.id})
.set("datacenter", get_application_state_gently(eps.get_application_state_map(), gms::application_state::DC))
.set("rack", get_application_state_gently(eps.get_application_state_map(), gms::application_state::RACK))
.set("release_version", get_application_state_gently(eps.get_application_state_map(), gms::application_state::RELEASE_VERSION))
.set("num_tokens", dummy_value)
.set("tokens_string", sstring{})
.set("shard_count", dummy_value)
.set("ignore_msb", dummy_value)
.set("request_id", dummy_value)
.set("cleanup_status", cleanup_status::clean)
.set("node_state", node_state::left);
reason.append(::format(" {}/{},", host_id, eps.get_ip()));
updates.push_back({builder.build()});
}
});
if (!updates.empty()) {
co_await update_topology_state(std::move(guard), std::move(updates), reason);
}
} catch (raft::request_aborted&) {
rtlogger.debug("gossiper orphan remover fiber aborted");
} catch (seastar::abort_requested_exception&) {
rtlogger.debug("gossiper orphan remover fiber aborted");
} catch (group0_concurrent_modification&) {
} catch (term_changed_error&) {
rtlogger.debug("gossiper orphan remover fiber notices term change {} -> {}", _term, _raft.get_current_term());
} catch (...) {
rtlogger.error("gossiper orphan remover fiber got error {}", std::current_exception());
}
try {
co_await seastar::sleep_abortable(do_speedup_fiber ? std::chrono::milliseconds(1) : std::chrono::seconds(10), _as);
} catch (...) {
rtlogger.debug("gossiper orphan remover: sleep failed: {}", std::current_exception());
}
co_await coroutine::maybe_yield();
}
}
future<> group0_voter_refresher_fiber() {
rtlogger.debug("start group0 members refresh fiber");
// Skip waiting for the event in the first iteration to start the first voter refresh immediately.
//
// This is necessary to ensure we update the voters e.g. in case of a topology coordinator change
// (the previous topology coordinator might have died in which case we'll need to remove it from
// the voters).
bool skip_wait = true;
while (!_as.abort_requested()) {
try {
// only wait for the event in case we are not retrying the operation
if (!skip_wait) {
co_await await_event();
// Introduce a brief delay to potentially batch multiple changes that occur in close succession
co_await seastar::sleep_abortable(std::chrono::milliseconds{100}, _as);
}
skip_wait = false;
if (!_feature_service.group0_limited_voters) {
rtlogger.debug("group0 voters refresh fiber iteration skipped because the feature is disabled");
continue;
}
co_await _voter_handler.refresh(_as);
} catch (raft::request_aborted&) {
rtlogger.debug("group0 voters refresh fiber aborted");
break; // exit the loop immediately (not waiting for the abort source to be set)
} catch (seastar::abort_requested_exception&) {
rtlogger.debug("group0 voters refresh fiber aborted");
break; // exit the loop immediately (not waiting for the abort source to be set)
} catch (group0_concurrent_modification&) {
rtlogger.debug("group0 voters refresh fiber notices concurrent modification, retrying...");
skip_wait = true;
} catch (term_changed_error&) {
rtlogger.debug("group0 voters refresh fiber notices term change {} -> {}", _term, _raft.get_current_term());
break; // exit the loop immediately (term change means we're not the coordinator anymore)
} catch (...) {
rtlogger.error("group0 voters refresh fiber got error {}", std::current_exception());
}
}
}
// Precondition: there is no node request and no ongoing topology transition
// (checked under the guard we're holding).
future<> handle_global_request(group0_guard guard) {
global_topology_request req;
utils::UUID req_id;
db::system_keyspace::topology_requests_entry req_entry;
if (_topo_sm._topology.global_request) {
req = *_topo_sm._topology.global_request;
req_id = _topo_sm._topology.global_request_id.value();
} else {
assert(_feature_service.topology_global_request_queue);
req_id = _topo_sm._topology.global_requests_queue[0];
req_entry = co_await _sys_ks.get_topology_request_entry(req_id, true);
req = std::get<global_topology_request>(req_entry.request_type);
}
switch (req) {
case global_topology_request::new_cdc_generation: {
rtlogger.info("new CDC generation requested");
auto tmptr = get_token_metadata_ptr();
auto [gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), std::nullopt);
guard = std::move(guard_);
topology_mutation_builder builder(guard.write_timestamp());
// We don't delete the request now, but only after the generation is committed. If we deleted
// the request now and received another new_cdc_generation request later, but before committing
// the new generation, the second request would also create a new generation. Deleting requests
// after the generation is committed prevents this from happening. The second request would have
// no effect - it would just overwrite the first request.
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
.set_new_cdc_generation_data_uuid(gen_uuid)
.set_global_topology_request(req)
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id);
// Set start_time when we begin executing the request
topology_request_tracking_mutation_builder rtbuilder(req_id);
rtbuilder.set("start_time", db_clock::now());
auto reason = ::format(
"insert CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build(), rtbuilder.build()}, reason);
}
break;
case global_topology_request::cleanup:
co_await start_vnodes_cleanup_on_dirty_nodes(std::move(guard), req_id);
break;
case global_topology_request::keyspace_rf_change: {
rtlogger.info("keyspace_rf_change requested");
sstring ks_name;
std::unordered_map<sstring, sstring> saved_ks_props;
if (_topo_sm._topology.new_keyspace_rf_change_ks_name) {
ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data;
} else {
ks_name = *req_entry.new_keyspace_rf_change_ks_name;
saved_ks_props = *req_entry.new_keyspace_rf_change_data;
}
utils::chunked_vector<canonical_mutation> updates;
sstring error;
if (_db.has_keyspace(ks_name)) {
try {
auto& ks = _db.find_keyspace(ks_name);
auto tmptr = get_token_metadata_ptr();
cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
new_ks_props.validate();
auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, _db.features(), _db.get_config());
size_t unimportant_init_tablet_count = 2; // must be a power of 2
locator::tablet_map new_tablet_map{unimportant_init_tablet_count};
auto tables_with_mvs = ks.metadata()->tables();
auto views = ks.metadata()->views();
tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
for (const auto& table_or_mv : tables_with_mvs) {
if (!tmptr->tablets().is_base_table(table_or_mv->id())) {
// Apply the transition only on base tables.
// If this table has a base table then the transition will be applied on the base table, and
// the base table will coordinate the transition for the entire group.
continue;
}
auto old_tablets = co_await tmptr->tablets().get_tablet_map(table_or_mv->id()).clone_gently();
locator::replication_strategy_params params{ks_md->strategy_options(), old_tablets.tablet_count(), ks.metadata()->consistency_option()};
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params, tmptr->get_topology());
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, co_await old_tablets.clone_gently());
replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id());
co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> {
auto last_token = new_tablet_map.get_last_token(tablet_id);
updates.emplace_back(co_await make_canonical_mutation_gently(
replica::tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id())
.set_new_replicas(last_token, tablet_info.replicas)
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
.set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
.build()
));
// Calculate abandoning replica and abort view building tasks on them
auto old_tablet_info = old_tablets.get_tablet_info(last_token);
auto abandoning_replicas = locator::substract_sets(old_tablet_info.replicas, tablet_info.replicas);
if (!abandoning_replicas.empty()) {
if (abandoning_replicas.size() != 1) {
on_internal_error(rtlogger, fmt::format("Keyspace RF abandons {} replicas for table {} and tablet id {}", abandoning_replicas.size(), table_or_mv->id(), tablet_id));
}
_vb_coordinator->abort_tasks(updates, guard, table_or_mv->id(), *abandoning_replicas.begin(), last_token);
}
co_await coroutine::maybe_yield();
});
}
auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp());
for (auto& m: schema_muts) {
updates.emplace_back(m);
}
} catch (const std::exception& e) {
error = e.what();
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, desired new ks opts: {}, error: {}",
saved_ks_props, std::current_exception());
updates.clear(); // remove all tablets mutations and only create mutations deleting the global req
}
} else {
error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist";
}
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topo_sm._topology.version + 1)
.del_global_topology_request()
.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.build()));
// Set start_time when we begin executing the request and mark as done
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
.set("start_time", db_clock::now())
.done(error)
.build()));
sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props);
rtlogger.trace("do update {} reason {}", updates, reason);
mixed_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", utils::wait_for_message(30s));
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
}
break;
case global_topology_request::truncate_table: {
rtlogger.info("TRUNCATE TABLE requested");
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::truncate_table)
.set_global_topology_request(req)
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.set_session(session_id(req_id));
// Set start_time when we begin executing the request
topology_request_tracking_mutation_builder rtbuilder(req_id);
rtbuilder.set("start_time", db_clock::now());
co_await update_topology_state(std::move(guard), {builder.build(), rtbuilder.build()}, "TRUNCATE TABLE requested");
}
break;
}
}
// Preconditions:
// - There are no topology operations in progress
// - `features_to_enable` represents a set of features that are currently
// marked as supported by all normal nodes and it is not empty
future<> enable_features(group0_guard guard, std::set<sstring> features_to_enable) {
if (!_topo_sm._topology.transition_nodes.empty()) {
on_internal_error(rtlogger,
"topology coordinator attempted to enable features even though there is"
" a topology operations in progress");
}
if (utils::get_local_injector().enter("raft_topology_suppress_enabling_features")) {
// Prevent enabling features while the injection is enabled.
// The topology coordinator will detect in the next iteration
// that there are still some cluster features to enable and will
// reach this place again. In order not to spin in a loop, sleep
// for a short while.
co_await sleep(std::chrono::milliseconds(100));
co_return;
}
// If we are here, then we noticed that all normal nodes support some
// features that are not enabled yet. Perform a global barrier to make
// sure that:
//
// 1. All normal nodes saw (and persisted) a view of the system.topology
// table that is equal to what the topology coordinator sees (or newer,
// but in that case updating the topology state will fail),
// 2. None of the normal nodes is restarting at the moment and trying to
// downgrade (this is done by a special check in the barrier handler).
//
// It's sufficient to only include normal nodes because:
//
// - There are no transitioning nodes due to the precondition,
// - New and left nodes are not part of group 0.
//
// After we get a successful confirmation from each normal node, we have
// a guarantee that they won't attempt to revoke support for those
// features. That's because we do not allow nodes to boot without
// a feature that is supported by all nodes in the cluster, even if
// the feature is not enabled yet.
guard = co_await exec_global_command(std::move(guard),
raft_topology_cmd{raft_topology_cmd::command::barrier},
{_raft.id()},
drop_guard_and_retake::no);
topology_mutation_builder builder(guard.write_timestamp());
builder.add_enabled_features(features_to_enable);
auto reason = ::format("enabling features: {}", features_to_enable);
co_await update_topology_state(std::move(guard), {builder.build()}, reason);
rtlogger.info("enabled features: {}", features_to_enable);
}
future<group0_guard> global_token_metadata_barrier(group0_guard&& guard, std::unordered_set<raft::server_id> exclude_nodes = {}, bool* fenced = nullptr) {
auto version = _topo_sm._topology.version;
bool drain_failed = false;
try {
guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier_and_drain, exclude_nodes, drop_guard_and_retake::yes);
} catch (...) {
rtlogger.warn("drain rpc failed, proceed to fence old writes: {}", std::current_exception());
drain_failed = true;
}
if (drain_failed) {
guard = co_await start_operation();
}
topology_mutation_builder builder(guard.write_timestamp());
// Other nodes are guaranteed to be drained on success only up to the version which was current
// prior to barrier_and_drain RPCs were sent.
builder.set_fence_version(version);
auto reason = ::format("advance fence version to {}", version);
co_await update_topology_state(std::move(guard), {builder.build()}, reason);
if (fenced) {
*fenced = true;
}
guard = co_await start_operation();
if (drain_failed) {
// if drain failed need to wait for fence to be active on all nodes
co_return co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier, exclude_nodes, drop_guard_and_retake::yes);
} else {
co_return std::move(guard);
}
}
future<group0_guard> global_tablet_token_metadata_barrier(group0_guard guard) {
// FIXME: Don't require all nodes to be up, only tablet replicas.
return global_token_metadata_barrier(std::move(guard), _topo_sm._topology.ignored_nodes);
}
// Represents a two-state state machine which changes monotonically
// from "not executed" to "executed successfully". This state
// machine is transient, lives only on this coordinator.
// The transition is achieved by execution of an idempotent async
// operation which is tracked by a future. Even though the async
// action is idempotent, it is costly, so we want to avoid
// re-executing it if it was already started by this coordinator,
// that's why we track it.
using background_action_holder = std::optional<future<>>;
// Transient state of tablet migration which lives on this coordinator.
// It is guaranteed to die when migration is finished.
// Next migration of the same tablet is guaranteed to use a different instance.
struct tablet_migration_state {
background_action_holder streaming;
background_action_holder rebuild_repair;
background_action_holder cleanup;
background_action_holder repair;
background_action_holder repair_update_compaction_ctrl;
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
// Record the repair_time returned by the repair_tablet rpc call
db_clock::time_point repair_time;
service::session_id session_id;
};
std::unordered_map<locator::global_tablet_id, tablet_migration_state> _tablets;
// Set to true when any action started on behalf of a background_action_holder
// for any tablet finishes, or fails and needs to be restarted.
bool _tablets_ready = false;
seastar::named_gate _async_gate;
bool action_failed(background_action_holder& holder) const {
return holder && holder->failed();
}
// This function drives background_action_holder towards "executed successfully"
// by starting the action if it is not already running or if the previous instance
// of the action failed. If the action is already running, it does nothing.
// Returns true iff background_action_holder reached the "executed successfully" state.
bool advance_in_background(locator::global_tablet_id gid, background_action_holder& holder, const char* name,
std::function<future<>()> action) {
if (!holder || holder->failed()) {
if (action_failed(holder)) {
// Prevent warnings about abandoned failed future. Logged below.
holder->ignore_ready_future();
}
holder = futurize_invoke(action).then_wrapped([this, gid, name] (future<> f) {
if (f.failed()) {
auto ep = f.get_exception();
rtlogger.warn("{} for tablet {} failed: {}", name, gid, ep);
return seastar::sleep_abortable(std::chrono::seconds(1), _as).then([ep] () mutable {
std::rethrow_exception(ep);
});
}
return f;
}).finally([this, g = _async_gate.hold(), gid, name] () noexcept {
rtlogger.debug("{} for tablet {} resolved.", name, gid);
_tablets_ready = true;
_topo_sm.event.broadcast();
});
return false;
}
if (!holder->available()) {
rtlogger.debug("Tablet {} still doing {}", gid, name);
return false;
}
return true;
}
future<> for_each_tablet_group_transition(std::function<void(const locator::tablet_map&,
table_id,
const locator::table_group_set&,
locator::tablet_id,
const locator::tablet_transition_info&)> func) {
auto tm = get_token_metadata_ptr();
for (auto&& [base_table, tables] : tm->tablets().all_table_groups()) {
co_await coroutine::maybe_yield();
const auto& tmap = tm->tablets().get_tablet_map(base_table);
for (auto&& [tablet, trinfo]: tmap.transitions()) {
co_await coroutine::maybe_yield();
func(tmap, base_table, tables, tablet, trinfo);
}
}
}
bool is_excluded(raft::server_id server_id) const {
return _topo_sm._topology.excluded_tablet_nodes.contains(server_id);
}
void generate_migration_update(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const tablet_migration_info& mig) {
const auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table);
auto last_token = tmap.get_last_token(mig.tablet.tablet);
if (tmap.get_tablet_transition_info(mig.tablet.tablet)) {
rtlogger.warn("Tablet already in transition, ignoring migration: {}", mig);
return;
}
auto migration_task_info = mig.kind == locator::tablet_transition_kind::migration ? locator::tablet_task_info::make_migration_request()
: locator::tablet_task_info::make_intranode_migration_request();
migration_task_info.sched_nr++;
migration_task_info.sched_time = db_clock::now();
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), mig.tablet.table)
.set_new_replicas(last_token, locator::get_new_replicas(tmap.get_tablet_info(mig.tablet.tablet), mig))
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
.set_transition(last_token, mig.kind)
.set_migration_task_info(last_token, std::move(migration_task_info), _feature_service)
.build());
}
void generate_repair_update(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const locator::global_tablet_id& gid, db_clock::time_point sched_time) {
auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(gid.table);
auto last_token = tmap.get_last_token(gid.tablet);
if (tmap.get_tablet_transition_info(gid.tablet)) {
rtlogger.warn("Tablet already in transition, ignoring repair: {}", gid);
return;
}
auto& info = tmap.get_tablet_info(gid.tablet);
auto repair_task_info = info.repair_task_info;
if (!repair_task_info.is_user_repair_request()) {
repair_task_info = locator::tablet_task_info::make_auto_repair_request();
}
repair_task_info.sched_nr++;
repair_task_info.sched_time = db_clock::now();
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), gid.table)
.set_new_replicas(last_token, tmap.get_tablet_info(gid.tablet).replicas)
.set_stage(last_token, locator::tablet_transition_stage::repair)
.set_transition(last_token, locator::tablet_transition_kind::repair)
.set_repair_task_info(last_token, repair_task_info, _feature_service)
.set_session(last_token, session_id(utils::UUID_gen::get_time_UUID()))
.build());
}
void generate_resize_update(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, table_id table_id, locator::resize_decision resize_decision) {
// FIXME: indent.
auto s = _db.find_schema(table_id);
const auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
// Sequence number is monotonically increasing, globally. Therefore, it can be used to identify a decision.
resize_decision.sequence_number = tmap.resize_decision().next_sequence_number();
rtlogger.debug("Generating resize decision for table {} of type {} and sequence number {}",
table_id, resize_decision.type_name(), resize_decision.sequence_number);
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), table_id)
.set_resize_decision(std::move(resize_decision), _feature_service)
.build());
}
future<> generate_migration_updates(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const migration_plan& plan) {
if (plan.resize_plan().finalize_resize.empty() || plan.has_nodes_to_drain()) {
// schedule tablet migration only if there are no pending resize finalisations or if the node is draining.
for (const tablet_migration_info& mig : plan.migrations()) {
co_await coroutine::maybe_yield();
generate_migration_update(out, guard, mig);
}
}
auto sched_time = db_clock::now();
for (const auto& gid : plan.repair_plan().repairs()) {
co_await coroutine::maybe_yield();
generate_repair_update(out, guard, gid, sched_time);
}
for (auto [table_id, resize_decision] : plan.resize_plan().resize) {
co_await coroutine::maybe_yield();
generate_resize_update(out, guard, table_id, resize_decision);
}
}
// When "drain" is true, we migrate tablets only as long as there are nodes to drain
// and then change the transition state to write_both_read_old. Also, while draining,
// we ignore pending topology requests which normally interrupt load balancing.
// When "drain" is false, we do regular load balancing.
future<> handle_tablet_migration(group0_guard guard, bool drain) {
// This step acts like a pump which advances state machines of individual tablets,
// batching barriers and group0 updates.
// If progress cannot be made, e.g. because all transitions are streaming, we block
// and wait for notification.
rtlogger.debug("handle_tablet_migration()");
utils::chunked_vector<canonical_mutation> updates;
bool needs_barrier = false;
bool has_transitions = false;
shared_promise barrier;
auto fail_barrier = seastar::defer([&] {
if (needs_barrier) {
barrier.set_exception(seastar::broken_promise());
}
});
_tablets_ready = false;
// We operate here on groups of co-located tablets.
// The tablets of several tables may be co-located and share the same tablet map, and in
// particular their transitions are shared. Therefore, each transition must be handled for
// all tablets in a co-location group at once.
co_await for_each_tablet_group_transition([&] (const locator::tablet_map& tmap,
table_id base_table,
const locator::table_group_set& tables,
locator::tablet_id tid,
const locator::tablet_transition_info& trinfo) {
// we have `gid` which is the tablet id of the base table of the tablet group, which we use as a
// representative of the group for some of the operations - such as logging, and as the key in the _tablets
// map where we store the migration state.
// `gids` is all the tablet ids of tablets in the co-location group. When we execute some operation such as
// streaming, cleanup, etc, we do it for each tablet in `gids`.
locator::global_tablet_id gid { base_table, tid };
auto gids = tables | std::views::transform([tid] (table_id table) { return locator::global_tablet_id{table, tid}; }) | std::ranges::to<std::vector>();
has_transitions = true;
auto last_token = tmap.get_last_token(gid.tablet);
auto& tablet_state = _tablets[gid];
auto get_mutation_builder = [&] () {
return replica::tablet_mutation_builder(guard.write_timestamp(), base_table);
};
auto transition_to = [&] (locator::tablet_transition_stage stage) {
rtlogger.debug("Will set tablet {} stage to {}", gid, stage);
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, stage)
.build());
};
auto do_barrier = [&] {
return advance_in_background(gid, tablet_state.barriers[trinfo.stage], "barrier", [&] {
needs_barrier = true;
return barrier.get_shared_future();
});
};
auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) {
if (do_barrier()) {
transition_to(stage);
}
};
auto check_excluded_replicas = [&] {
auto tsi = get_migration_streaming_info(get_token_metadata().get_topology(), tmap.get_tablet_info(gid.tablet), trinfo);
for (auto r : tsi.read_from) {
if (is_excluded(raft::server_id(r.host.uuid()))) {
rtlogger.debug("Aborting streaming of {} because read-from {} is marked as ignored", gid, r);
return true;
}
}
for (auto r : tsi.written_to) {
if (is_excluded(raft::server_id(r.host.uuid()))) {
rtlogger.debug("Aborting streaming of {} because written-to {} is marked as ignored", gid, r);
return true;
}
}
return false;
};
switch (trinfo.stage) {
case locator::tablet_transition_stage::allow_write_both_read_old:
if (action_failed(tablet_state.barriers[trinfo.stage])) {
if (check_excluded_replicas()) {
transition_to_with_barrier(locator::tablet_transition_stage::cleanup_target);
break;
}
}
if (do_barrier()) {
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_old);
auto leaving_replica = get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
if (leaving_replica) {
_vb_coordinator->abort_tasks(updates, guard, gid.table, *leaving_replica, last_token);
}
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::write_both_read_old)
// Create session a bit earlier to avoid adding barrier
// to the streaming stage to create sessions on replicas.
.set_session(last_token, session_id(utils::UUID_gen::get_time_UUID()))
.build());
}
break;
case locator::tablet_transition_stage::write_both_read_old:
if (action_failed(tablet_state.barriers[trinfo.stage])) {
if (check_excluded_replicas()) {
transition_to_with_barrier(locator::tablet_transition_stage::cleanup_target);
break;
}
}
if (trinfo.transition == locator::tablet_transition_kind::rebuild_v2) {
transition_to_with_barrier(locator::tablet_transition_stage::rebuild_repair);
} else {
transition_to_with_barrier(locator::tablet_transition_stage::streaming);
}
break;
case locator::tablet_transition_stage::rebuild_repair: {
if (action_failed(tablet_state.rebuild_repair)) {
bool fail = utils::get_local_injector().enter("rebuild_repair_stage_fail");
if (fail || check_excluded_replicas()) {
if (do_barrier()) {
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::cleanup_target);
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::cleanup_target)
.del_session(last_token)
.build());
}
break;
}
}
if (advance_in_background(gid, tablet_state.rebuild_repair, "rebuild_repair", [&] {
utils::get_local_injector().inject("rebuild_repair_stage_fail",
[] { throw std::runtime_error("rebuild_repair failed due to error injection"); });
if (!trinfo.pending_replica) {
rtlogger.info("Skipped tablet rebuild repair of {} as no pending replica found", gid);
return make_ready_future<>();
}
auto tsi = get_migration_streaming_info(get_token_metadata().get_topology(), tmap.get_tablet_info(gid.tablet), trinfo);
if (tsi.read_from.empty()) {
rtlogger.info("Skipped tablet rebuild repair of {} as no tablet replica was found", gid);
return make_ready_future<>();
}
auto dst = locator::maybe_get_primary_replica(gid.tablet, {tsi.read_from.begin(), tsi.read_from.end()},
_db.get_token_metadata().get_topology(), [] (const auto& tr) { return true; }).value().host;
rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid, session_id).discard_result();
});
});
})) {
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::streaming);
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::streaming)
.build());
}
}
break;
// The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't
// get admitted before global_tablet_token_metadata_barrier() is finished for earlier
// stage in case of coordinator failover.
case locator::tablet_transition_stage::streaming: {
if (drain) {
utils::get_local_injector().inject("stream_tablet_fail_on_drain",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
}
if (action_failed(tablet_state.streaming) || utils::get_local_injector().enter("stream_tablet_fail")) {
const bool cleanup = utils::get_local_injector().enter("stream_tablet_move_to_cleanup");
bool critical_disk_utilization = false;
if (auto stats = _tablet_allocator.get_load_stats()) {
const auto& util = stats->critical_disk_utilization;
auto it = util.find(trinfo.pending_replica->host);
critical_disk_utilization = (it != util.end() && it->second);
}
if (cleanup || check_excluded_replicas() || critical_disk_utilization) {
if (do_barrier()) {
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::cleanup_target);
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::cleanup_target)
.del_session(last_token)
.build());
}
break;
}
}
bool wait = utils::get_local_injector().enter("stream_tablet_wait");
if (!wait && advance_in_background(gid, tablet_state.streaming, "streaming", [&] {
utils::get_local_injector().inject("stream_tablet_move_to_cleanup",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
if (!trinfo.pending_replica) {
rtlogger.info("Skipped tablet streaming ({}) of {} as no pending replica found", trinfo.transition, gid);
return make_ready_future<>();
}
rtlogger.info("Initiating tablet streaming ({}) of {} to {}", trinfo.transition, gid, *trinfo.pending_replica);
auto dst = trinfo.pending_replica->host;
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid);
});
});
})) {
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::write_both_read_new);
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::write_both_read_new)
.del_session(last_token)
.build());
}
}
break;
case locator::tablet_transition_stage::write_both_read_new: {
utils::get_local_injector().inject("crash-in-tablet-write-both-read-new", [] {
rtlogger.info("crash-in-tablet-write-both-read-new hit, killing the node");
_exit(1);
});
auto next_stage = locator::tablet_transition_stage::use_new;
if (action_failed(tablet_state.barriers[trinfo.stage])) {
auto& tinfo = tmap.get_tablet_info(gid.tablet);
unsigned excluded_old = 0;
for (auto r : tinfo.replicas) {
if (is_excluded(raft::server_id(r.host.uuid()))) {
excluded_old++;
}
}
unsigned excluded_new = 0;
for (auto r : trinfo.next) {
if (is_excluded(raft::server_id(r.host.uuid()))) {
excluded_new++;
}
}
// Cannot revert if this is intra-node migration.
// We will lose data if the migrating node restarted, in which case the leaving replica
// doesn't contain writes anymore as sstables are attached only on the pending replica.
// Luckily, this we never reach this condition since excluded_new cannot be larger
// than excluded_old for intra-node migration.
if (excluded_new > excluded_old && trinfo.transition != locator::tablet_transition_kind::intranode_migration) {
rtlogger.debug("During {} stage of {} {} new nodes and {} old nodes were excluded", trinfo.stage, gid, excluded_new, excluded_old);
next_stage = locator::tablet_transition_stage::cleanup_target;
}
}
transition_to_with_barrier(next_stage);
}
break;
case locator::tablet_transition_stage::use_new:
transition_to_with_barrier(locator::tablet_transition_stage::cleanup);
break;
case locator::tablet_transition_stage::cleanup: {
bool wait = utils::get_local_injector().enter("cleanup_tablet_wait");
if (!wait && advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] {
auto maybe_dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
if (!maybe_dst) {
rtlogger.info("Tablet cleanup of {} skipped because no replicas leaving", gid);
return make_ready_future<>();
}
locator::tablet_replica& dst = *maybe_dst;
if (is_excluded(raft::server_id(dst.host.uuid()))) {
rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded", gid, dst);
return make_ready_future<>();
}
rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
}).then([] {
return utils::get_local_injector().inject("wait_after_tablet_cleanup", [] (auto& handler) -> future<> {
rtlogger.info("Waiting after tablet cleanup");
return handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{60});
});
});
})) {
transition_to(locator::tablet_transition_stage::end_migration);
}
}
break;
case locator::tablet_transition_stage::cleanup_target:
if (do_barrier()) {
if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
if (!trinfo.pending_replica) {
rtlogger.info("Tablet cleanup of {} skipped because no replicas pending", gid);
return make_ready_future<>();
}
locator::tablet_replica dst = *trinfo.pending_replica;
if (is_excluded(raft::server_id(dst.host.uuid()))) {
rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded and doesn't need to revert migration", gid, dst);
return make_ready_future<>();
}
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
transition_to(locator::tablet_transition_stage::revert_migration);
}
}
break;
case locator::tablet_transition_stage::revert_migration:
// Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs.
// See do_tablet_operation() doc.
if (do_barrier()) {
_tablets.erase(gid);
updates.emplace_back(get_mutation_builder()
.del_transition(last_token)
.del_migration_task_info(last_token, _feature_service)
.build());
auto leaving_replica = get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
if (leaving_replica) {
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *leaving_replica, last_token);
}
}
break;
case locator::tablet_transition_stage::end_migration: {
// Update load_stats after a migration or rebuild
update_load_stats_on_end_migration(gid, tmap, trinfo);
// Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs.
// See do_tablet_operation() doc.
bool defer_transition = utils::get_local_injector().enter("handle_tablet_migration_end_migration");
if (!defer_transition && do_barrier()) {
_tablets.erase(gid);
updates.emplace_back(get_mutation_builder()
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.del_migration_task_info(last_token, _feature_service)
.build());
_vb_coordinator->generate_tablet_migration_updates(updates, guard, tmap, gid, trinfo);
}
}
break;
case locator::tablet_transition_stage::repair: {
bool fail_repair = utils::get_local_injector().enter("handle_tablet_migration_repair_fail");
if (fail_repair || action_failed(tablet_state.repair)) {
if (do_barrier()) {
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
.del_session(last_token)
.build());
}
break;
}
if (advance_in_background(gid, tablet_state.repair, "repair", [&] () -> future<> {
auto& tinfo = tmap.get_tablet_info(gid.tablet);
bool valid = tinfo.repair_task_info.is_valid();
if (!valid) {
rtlogger.info("Skipping tablet repair for tablet={} which is cancelled by user", gid);
co_return;
}
auto* trinfo = tmap.get_tablet_transition_info(gid.tablet);
if (trinfo) {
tablet_state.session_id = trinfo->session_id;
}
auto sched_time = tinfo.repair_task_info.sched_time;
auto tablet = gid;
auto hosts_filter = tinfo.repair_task_info.repair_hosts_filter;
auto dcs_filter = tinfo.repair_task_info.repair_dcs_filter;
const auto& topo = _db.get_token_metadata().get_topology();
locator::host_id dst;
if (hosts_filter.empty() && dcs_filter.empty()) {
auto primary = tmap.get_primary_replica(gid.tablet, topo);
dst = primary.host;
} else {
auto dst_opt = tmap.maybe_get_selected_replica(gid.tablet, topo, tinfo.repair_task_info);
if (!dst_opt) {
co_return;
}
dst = dst_opt.value().host;
}
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid, session_id);
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
dst, tablet, duration, res.repair_time);
})) {
if (utils::get_local_injector().enter("delay_end_repair_update")) {
break;
}
auto& tinfo = tmap.get_tablet_info(gid.tablet);
bool valid = tinfo.repair_task_info.is_valid();
auto hosts_filter = tinfo.repair_task_info.repair_hosts_filter;
auto dcs_filter = tinfo.repair_task_info.repair_dcs_filter;
auto incremental = tinfo.repair_task_info.repair_incremental_mode != locator::tablet_repair_incremental_mode::disabled;
bool is_filter_off = hosts_filter.empty() && dcs_filter.empty();
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::end_repair);
auto update = get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
.del_repair_task_info(last_token, _feature_service)
.del_session(last_token);
// Skip update repair time in case hosts filter or dcs filter is set.
if (valid && is_filter_off) {
auto sched_time = tinfo.repair_task_info.sched_time;
auto time = tablet_state.repair_time;
update.set_repair_time(last_token, time);
auto repaired_at = sstring("None");
if (_feature_service.tablet_incremental_repair && incremental) {
auto sstables_repaired_at = tinfo.sstables_repaired_at + 1;
if (utils::get_local_injector().enter("repair_tablet_no_update_sstables_repair_at")) {
rtlogger.info("Skip update system.tablet ssstables_repaired_at={}", sstables_repaired_at);
} else {
update.set_sstables_repair_at(last_token, sstables_repaired_at);
repaired_at = seastar::format("{}", sstables_repaired_at);
}
}
rtlogger.debug("Set tablet repair time sched_time={} repair_time={} sstables_repaired_at={} last_token={}",
sched_time, time, repaired_at, last_token);
}
updates.emplace_back(update.build());
}
}
break;
case locator::tablet_transition_stage::end_repair: {
if (do_barrier()) {
if (action_failed(tablet_state.repair_update_compaction_ctrl)) {
rtlogger.warn("Failed to perform repair_update_compaction_ctrl for tablet repair tablet_id={}", gid);
_tablets.erase(gid);
updates.emplace_back(get_mutation_builder().del_transition(last_token).build());
break;
}
bool feature = _feature_service.tablet_incremental_repair;
if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [ms = &_messaging,
gid = gid, sid = tablet_state.session_id, _replicas = tmap.get_tablet_info(gid.tablet).replicas, feature] () -> future<> {
if (feature) {
auto replicas = std::move(_replicas);
co_await coroutine::parallel_for_each(replicas, [replicas, ms, gid, sid] (locator::tablet_replica& r) -> future<> {
co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(ms, r.host, gid, sid);
});
}
})) {
_tablets.erase(gid);
updates.emplace_back(get_mutation_builder().del_transition(last_token).build());
}
}
}
break;
}
});
// In order to keep the cluster saturated, ask the load balancer for more transitions.
// Unless there is a pending topology change operation.
bool preempt = false;
if (!drain) {
// When draining, this method is invoked with an active node transition, which
// would normally cause preemption, which we don't want here.
auto ts = guard.write_timestamp();
auto [new_preempt, new_guard] = should_preempt_balancing(std::move(guard));
preempt = new_preempt;
guard = std::move(new_guard);
if (ts != guard.write_timestamp()) {
// We rely on the fact that should_preempt_balancing() does not release the guard
// so that tablet metadata reading and updates are atomic.
on_internal_error(rtlogger, "should_preempt_balancing() retook the guard");
}
}
bool has_nodes_to_drain = false;
if (!preempt) {
auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr(), {}, get_dead_nodes());
has_nodes_to_drain = plan.has_nodes_to_drain();
if (!drain || plan.has_nodes_to_drain()) {
co_await generate_migration_updates(updates, guard, plan);
}
}
// The updates have to be executed under the same guard which was used to read tablet metadata
// to ensure that we don't reinsert tablet rows which were concurrently deleted by schema change
// which happens outside the topology coordinator.
bool has_updates = !updates.empty();
if (has_updates) {
co_await utils::get_local_injector().inject("tablet_transition_updates", utils::wait_for_message(2min));
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Tablet migration"));
}
if (needs_barrier) {
// If has_updates is true then we have dropped the guard and need to re-obtain it.
// It's fine to start an independent operation here. The barrier doesn't have to be executed
// atomically with the read which set needs_barrier, because it's fine if the global barrier
// works with a more recent set of nodes and it's fine if it propagates a more recent topology.
if (!guard) {
guard = co_await start_operation();
}
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
barrier.set_value();
fail_barrier.cancel();
co_return;
}
if (has_updates) {
co_return;
}
if (has_transitions) {
// Streaming may have finished after we checked. To avoid missed notification, we need
// to check atomically with event.wait()
if (!_tablets_ready) {
rtlogger.debug("Going to sleep with active tablet transitions");
release_guard(std::move(guard));
co_await await_event();
}
co_return;
}
if (drain) {
if (has_nodes_to_drain) {
// Prevent jumping to write_both_read_old with un-drained tablets.
// This can happen when all candidate nodes are down.
rtlogger.warn("Tablet draining stalled: No tablets migrating but there are nodes to drain");
release_guard(std::move(guard));
co_await sleep(3s); // Throttle retries
co_return;
}
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::write_both_read_old)
.set_session(session_id(guard.new_group0_state_id()))
.set_version(_topo_sm._topology.version + 1)
.build());
} else {
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
.build());
}
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
}
// Migrates tablet size from leaving to pending host after migration,
// or creates a new tablet size on pending host after a rebuild
void update_load_stats_on_end_migration(locator::global_tablet_id gid, const locator::tablet_map& tmap, const locator::tablet_transition_info& trinfo) {
if (auto old_load_stats = _tablet_allocator.get_load_stats()) {
lw_shared_ptr<locator::load_stats> new_load_stats;
auto& tinfo = tmap.get_tablet_info(gid.tablet);
auto leaving = locator::get_leaving_replica(tinfo, trinfo);
auto pending = trinfo.pending_replica;
const dht::token_range trange {tmap.get_token_range(gid.tablet)};
if (leaving && pending) {
// Handle tablet migration
new_load_stats = old_load_stats->migrate_tablet_size(leaving->host, pending->host, gid, trange);
} else if (!leaving && pending) {
// Handle rebuild: compute the average tablet size of existing replicas
new_load_stats = make_lw_shared<locator::load_stats>(*old_load_stats);
uint64_t tablet_size_sum = 0;
size_t replica_count = 0;
const locator::range_based_tablet_id rb_tid {gid.table, trange};
for (auto r : tinfo.replicas) {
auto tablet_size_opt = new_load_stats->get_tablet_size(r.host, rb_tid);
if (tablet_size_opt) {
tablet_size_sum += *tablet_size_opt;
replica_count++;
}
}
if (replica_count && new_load_stats->tablet_stats.contains(pending->host)) {
new_load_stats->tablet_stats.at(pending->host).tablet_sizes[gid.table][trange] = tablet_size_sum / replica_count;
}
}
if (new_load_stats) {
_tablet_allocator.set_load_stats(std::move(new_load_stats));
}
}
}
future<> handle_tablet_resize_finalization(group0_guard g) {
co_await utils::get_local_injector().inject("handle_tablet_resize_finalization_wait", [] (auto& handler) -> future<> {
rtlogger.info("handle_tablet_resize_finalization: waiting");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{60});
});
// Executes a global barrier to guarantee that any process (e.g. repair) holding stale version
// of token metadata will complete before we update topology.
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
co_await utils::get_local_injector().inject("tablet_resize_finalization_post_barrier", utils::wait_for_message(std::chrono::minutes(2)));
auto tm = get_token_metadata_ptr();
auto plan = co_await _tablet_allocator.balance_tablets(tm, {}, get_dead_nodes());
utils::chunked_vector<canonical_mutation> updates;
updates.reserve(plan.resize_plan().finalize_resize.size() * 2 + 1);
for (auto& table_id : plan.resize_plan().finalize_resize) {
auto s = _db.find_schema(table_id);
auto new_tablet_map = co_await _tablet_allocator.resize_tablets(tm, table_id);
co_await replica::tablet_map_to_mutations(
new_tablet_map,
table_id,
s->ks_name(),
s->cf_name(),
guard.write_timestamp(),
_feature_service,
[&] (mutation m) -> future<> {
updates.emplace_back(co_await make_canonical_mutation_gently(m));
});
// Clears the resize decision for a table.
generate_resize_update(updates, guard, table_id, locator::resize_decision{});
_vb_coordinator->generate_tablet_resize_updates(updates, guard, table_id, tm->tablets().get_tablet_map(table_id), new_tablet_map);
for (auto table_id : tm->tablets().all_table_groups().at(table_id)) {
co_await _cdc_gens.generate_tablet_resize_update(updates, table_id, new_tablet_map, guard.write_timestamp());
}
const auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
auto old_cnt = tmap.tablet_count();
auto new_cnt = new_tablet_map.tablet_count();
if (old_cnt > new_cnt) {
std::unordered_set<locator::host_id> replicas;
for (auto& tinfo : tmap.tablets()) {
for (auto& r : tinfo.replicas) {
if (!is_excluded(raft::server_id(r.host.uuid()))) {
replicas.insert(r.host);
}
}
}
if (_feature_service.tablet_incremental_repair) {
rtlogger.info("Send rpc verb repair_update_repaired_at_for_merge table={} replicas={} old_cnt={} new_cnt={}", table_id, replicas, old_cnt, new_cnt);
if (utils::get_local_injector().enter("handle_tablet_resize_finalization_for_merge_error")) {
rtlogger.info("Got handle_tablet_resize_finalization_for_merge_error old_cnt={} new_cnt={}", old_cnt, new_cnt);
co_await sleep_abortable(std::chrono::minutes(1), _as);
}
co_await coroutine::parallel_for_each(replicas, [ms = &_messaging, table_id] (const locator::host_id& h) -> future<> {
co_await ser::repair_rpc_verbs::send_repair_update_repaired_at_for_merge(ms, h, table_id);
});
}
}
}
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet resize finalization"));
if (auto old_load_stats = _tablet_allocator.get_load_stats()) {
guard = co_await start_operation();
auto new_tm = get_token_metadata_ptr();
auto reconciled_stats = old_load_stats->reconcile_tablets_resize(plan.resize_plan().finalize_resize, *tm, *new_tm);
if (reconciled_stats) {
_tablet_allocator.set_load_stats(reconciled_stats);
}
}
}
future<> handle_truncate_table(group0_guard guard) {
// Execute a barrier to make sure the nodes we are performing truncate on see the session
// and are able to create a topology_guard using the frozen_guard we are sending over RPC
// TODO: Exclude nodes which don't contain replicas of the table we are truncating
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
const utils::UUID& global_request_id = _topo_sm._topology.global_request_id.value();
std::optional<sstring> error;
// We should perform TRUNCATE only if the session is still valid. It could be cleared if a previous truncate
// handler performed the truncate and cleared the session, but crashed before finalizing the request
if (_topo_sm._topology.session) {
const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id, true);
const table_id& table_id = topology_requests_entry.truncate_table_id;
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(table_id);
if (table) {
const sstring& ks_name = table->schema()->ks_name();
const sstring& cf_name = table->schema()->cf_name();
rtlogger.info("Performing TRUNCATE TABLE for {}.{}", ks_name, cf_name);
// Collect the IDs of the hosts with replicas, but ignore excluded nodes
std::unordered_set<locator::host_id> replica_hosts;
const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
for (const locator::tablet_replica& replica: tinfo.replicas) {
if (!_topo_sm._topology.excluded_tablet_nodes.contains(raft::server_id(replica.host.uuid()))) {
replica_hosts.insert(replica.host);
}
}
return make_ready_future<>();
});
// Release the guard to avoid blocking group0 for long periods of time while invoking RPCs
release_guard(std::move(guard));
co_await utils::get_local_injector().inject("truncate_table_wait", utils::wait_for_message(std::chrono::minutes(2)));
// Check if all the nodes with replicas are alive
for (const locator::host_id& replica_host: replica_hosts) {
if (!_gossiper.is_alive(replica_host)) {
throw std::runtime_error(::format("Cannot perform TRUNCATE on table {}.{} because host {} is down", ks_name, cf_name, replica_host));
}
}
// Send the RPC to all replicas
const service::frozen_topology_guard frozen_guard { _topo_sm._topology.session };
co_await coroutine::parallel_for_each(replica_hosts, [&] (const locator::host_id& host_id) -> future<> {
co_await ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
});
} else {
error = ::format("Cannot TRUNCATE table with UUID {} because it does not exist.", table_id);
}
// Clear the session and save the error message
while (true) {
if (!guard) {
guard = co_await start_operation();
}
utils::chunked_vector<canonical_mutation> updates;
updates.push_back(topology_mutation_builder(guard.write_timestamp())
.del_session()
.build());
if (error) {
updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
.set("error", *error)
.build());
}
try {
co_await update_topology_state(std::move(guard), std::move(updates), "Clear truncate session");
break;
} catch (group0_concurrent_modification&) {
}
}
}
utils::get_local_injector().inject("truncate_crash_after_session_clear", [] {
rtlogger.info("truncate_crash_after_session_clear hit, killing the node");
_exit(1);
});
// Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point
if (!guard) {
guard = co_await start_operation();
}
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
// Finalize the request
while (true) {
if (!guard) {
guard = co_await start_operation();
}
utils::chunked_vector<canonical_mutation> updates;
updates.push_back(topology_mutation_builder(guard.write_timestamp())
.del_transition_state()
.del_global_topology_request()
.del_global_topology_request_id()
.build());
updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
.done()
.build());
try {
co_await update_topology_state(std::move(guard), std::move(updates), "Truncate has completed");
break;
} catch (group0_concurrent_modification&) {
}
}
}
// This function must not release and reacquire the guard, callers rely
// on the fact that the block which calls this is atomic.
// FIXME: Don't take the ownership of the guard to make the above guarantee explicit.
std::pair<bool, group0_guard> should_preempt_balancing(group0_guard guard) {
auto work = get_next_task(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&work)) {
return std::make_pair(true, std::move(node->guard));
}
if (auto* cancel = std::get_if<cancel_requests>(&work)) {
// request queue needs to be canceled, so preempt balancing
return std::make_pair(true, std::move(cancel->guard));
}
if (auto* cleanup = std::get_if<start_vnodes_cleanup>(&work)) {
// cleanup has to be started
return std::make_pair(true, std::move(cleanup->guard));
}
guard = std::get<group0_guard>(std::move(work));
if (_topo_sm._topology.global_request || !_topo_sm._topology.global_requests_queue.empty()) {
return std::make_pair(true, std::move(guard));
}
if (!_topo_sm._topology.calculate_not_yet_enabled_features().empty()) {
return std::make_pair(true, std::move(guard));
}
return std::make_pair(false, std::move(guard));
}
void cleanup_ignored_nodes_on_left(topology_mutation_builder& builder, raft::server_id id) {
if (_topo_sm._topology.ignored_nodes.contains(id)) {
auto l = _topo_sm._topology.ignored_nodes;
l.erase(id);
builder.set_ignored_nodes(l);
}
}
void trigger_load_stats_refresh() {
(void)_tablet_load_stats_refresh.trigger().handle_exception([] (auto ep) {
rtlogger.warn("Error during tablet load stats refresh: {}", ep);
});
}
future<> cancel_all_requests(group0_guard guard, std::unordered_set<raft::server_id> dead_nodes) {
utils::chunked_vector<canonical_mutation> muts;
std::vector<raft::server_id> reject_join;
if (_topo_sm._topology.requests.empty()) {
co_return;
}
auto ts = guard.write_timestamp();
for (auto& [id, req] : _topo_sm._topology.requests) {
topology_mutation_builder builder(ts);
topology_request_tracking_mutation_builder rtbuilder(_topo_sm._topology.find(id)->second.request_id);
auto node_builder = builder.with_node(id).del("topology_request");
auto done_msg = fmt::format("Canceled. Dead nodes: {}", dead_nodes);
rtbuilder.done(done_msg);
switch (req) {
case topology_request::replace:
[[fallthrough]];
case topology_request::join: {
node_builder.set("node_state", node_state::left);
reject_join.emplace_back(id);
try {
co_await wait_for_gossiper(id, _gossiper, _as);
} catch (...) {
rtlogger.warn("wait_for_ip failed during cancellation: {}", std::current_exception());
}
}
break;
case topology_request::leave:
[[fallthrough]];
case topology_request::rebuild:
[[fallthrough]];
case topology_request::remove: {
}
break;
}
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
}
co_await update_topology_state(std::move(guard), std::move(muts), "cancel all topology requests");
for (auto id : reject_join) {
try {
co_await respond_to_joining_node(id, join_node_response_params{
.response = join_node_response_params::rejected{
.reason = "request canceled because some required nodes are dead"
},
});
} catch (...) {
rtlogger.warn("attempt to send rejection response to {} failed: {}. "
"The node may hang. It's safe to shut it down manually now.",
id, std::current_exception());
}
}
}
// Returns `true` iff there was work to do.
future<bool> handle_topology_transition(group0_guard guard) {
auto tstate = _topo_sm._topology.tstate;
if (!tstate) {
// When adding a new source of work, make sure to update should_preempt_balancing() as well.
auto work = get_next_task(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&work)) {
co_await handle_node_transition(std::move(*node));
co_return true;
}
if (auto* cancel = std::get_if<cancel_requests>(&work)) {
co_await cancel_all_requests(std::move(cancel->guard), std::move(cancel->dead_nodes));
co_return true;
}
if (auto* cleanup = std::get_if<start_vnodes_cleanup>(&work)) {
co_await start_vnodes_cleanup_on_dirty_nodes(std::move(cleanup->guard),
std::pair(cleanup->request, cleanup->request_server_id));
co_return true;
}
guard = std::get<group0_guard>(std::move(work));
if (_topo_sm._topology.global_request || !_topo_sm._topology.global_requests_queue.empty()) {
co_await handle_global_request(std::move(guard));
co_return true;
}
if (auto feats = _topo_sm._topology.calculate_not_yet_enabled_features(); !feats.empty()) {
co_await enable_features(std::move(guard), std::move(feats));
co_return true;
}
if (auto guard_opt = co_await maybe_migrate_system_tables(std::move(guard)); !guard_opt) {
// The guard is consumed, it means we did migration
co_return true;
} else {
guard = std::move(*guard_opt);
}
// If there is no other work, evaluate load and start tablet migration if there is imbalance.
if (co_await maybe_start_tablet_migration(std::move(guard))) {
co_return true;
}
co_return false;
}
rtlogger.info("entered `{}` transition state", *tstate);
switch (*tstate) {
case topology::transition_state::join_group0: {
auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard)));
// If responding to the joining node failed, move the node to the left state and
// stop the topology transition.
if (!accepted) {
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::left);
rtbuilder.done("join is not accepted");
auto reason = ::format("bootstrap: failed to accept {}", node.id);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason);
rtlogger.info("node {} moved to left state", node.id);
break;
}
switch (node.rs->state) {
case node_state::bootstrapping: {
SCYLLA_ASSERT(!node.rs->ring);
auto num_tokens = std::get<join_param>(node.req_param.value()).num_tokens;
auto tokens_string = std::get<join_param>(node.req_param.value()).tokens_string;
auto guard = take_guard(std::move(node));
topology_mutation_builder builder(guard.write_timestamp());
// A node has just been accepted. If it is a zero-token node, we can
// instantly move its state to normal. Otherwise, we need to assign
// random tokens and prepare a new CDC generation.
if (num_tokens == 0 && tokens_string.empty()) {
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::normal);
rtbuilder.done();
auto reason = ::format("bootstrap: joined a zero-token node {}", node.id);
co_await _voter_handler.on_node_added(node.id, _as);
utils::chunked_vector<canonical_mutation> updates{builder.build(), rtbuilder.build()};
co_await mark_view_build_statuses_on_node_join(updates, guard, node.id);
co_await update_topology_state(std::move(guard), std::move(updates), reason);
break;
}
auto tmptr = get_token_metadata_ptr();
std::unordered_set<token> bootstrap_tokens;
try {
bootstrap_tokens = dht::boot_strapper::get_bootstrap_tokens(tmptr, tokens_string, num_tokens, dht::check_token_endpoint::yes);
} catch (...) {
_rollback = fmt::format("Failed to assign tokens: {}", std::current_exception());
break;
}
auto [gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(
tmptr, std::move(guard), bootstrapping_info{bootstrap_tokens, *node.rs});
// Write the new CDC generation data through raft.
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
.set_new_cdc_generation_data_uuid(gen_uuid)
.with_node(node.id)
.set("tokens", bootstrap_tokens);
auto reason = ::format(
"bootstrap: insert tokens and CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard_), {std::move(mutation), builder.build()}, reason);
co_await utils::get_local_injector().inject("topology_coordinator_pause_after_updating_cdc_generation", utils::wait_for_message(5min));
}
break;
case node_state::replacing: {
SCYLLA_ASSERT(!node.rs->ring);
auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
SCYLLA_ASSERT(it != _topo_sm._topology.normal_nodes.end());
SCYLLA_ASSERT(it->second.ring && it->second.state == node_state::normal);
topology_mutation_builder builder(node.guard.write_timestamp());
// If a zero-token node is replacing another zero-token node,
// we can instantly move the new node's state to normal.
if (it->second.ring->tokens.empty()) {
node = retake_node(co_await remove_from_group0(std::move(node.guard), replaced_id), node.id);
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::normal);
// Move the old node to the left state.
topology_mutation_builder builder2(node.guard.write_timestamp());
cleanup_ignored_nodes_on_left(builder2, replaced_id);
builder2.with_node(replaced_id)
.set("node_state", node_state::left);
rtbuilder.done();
co_await _voter_handler.on_node_added(node.id, _as);
utils::chunked_vector<canonical_mutation> updates{builder.build(), builder2.build(), rtbuilder.build()};
co_await mark_view_build_statuses_on_node_join(updates, node.guard, node.id);
co_await remove_view_build_statuses_on_left_node(updates, node.guard, replaced_id);
co_await update_topology_state(take_guard(std::move(node)), std::move(updates),
fmt::format("replace: replaced node {} with the new zero-token node {}", replaced_id, node.id));
break;
}
guard = take_guard(std::move(node));
builder.set_transition_state(topology::transition_state::write_both_read_old)
.set_version(_topo_sm._topology.version + 1)
.set_session(session_id(guard.new_group0_state_id()))
.with_node(node.id)
.set("tokens", it->second.ring->tokens);
co_await update_topology_state(std::move(guard), {builder.build()},
"replace: transition to write_both_read_old and take ownership of the replaced node's tokens");
}
break;
default:
on_internal_error(rtlogger,
format("topology is in join_group0 state, but the node"
" being worked on ({}) is in unexpected state '{}'; should be"
" either 'bootstrapping' or 'replacing'", node.id, node.rs->state));
}
}
break;
case topology::transition_state::commit_cdc_generation: {
// make sure all nodes know about new topology and have the new CDC generation data
// (we require all nodes to be alive for topo change for now)
// Note: if there was a replace or removenode going on, we'd need to put the replaced/removed
// node into `exclude_nodes` parameter in `exec_global_command`, but CDC generations are never
// introduced during replace/remove.
topology_mutation_builder builder(guard.write_timestamp());
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
builder.del_global_topology_request();
builder.del_global_topology_request_id();
builder.del_transition_state();
}
try {
guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier, {_raft.id()});
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.error("transition_state::commit_cdc_generation, "
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
_rollback = fmt::format("Failed to commit cdc generation: {}", std::current_exception());
}
if (_rollback) {
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation && _feature_service.topology_global_request_queue) {
topology_request_tracking_mutation_builder rtbuilder(*_topo_sm._topology.global_request_id);
// if this is global command fail it since there is nothing to rollback
rtbuilder.done(*_rollback);
_rollback.reset();
co_await update_topology_state(std::move(guard), {builder.build(), rtbuilder.build()}, "committed new CDC generation command failed");
}
break;
}
// We don't need to add delay to the generation timestamp if this is the first generation.
bool add_ts_delay = !_topo_sm._topology.committed_cdc_generations.empty();
// Begin the race.
// See the large FIXME below.
auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay);
auto cdc_gen_uuid = _topo_sm._topology.new_cdc_generation_data_uuid;
if (!cdc_gen_uuid) {
on_internal_error(rtlogger,
"new CDC generation data UUID missing in `commit_cdc_generation` state");
}
cdc::generation_id_v2 cdc_gen_id {
.ts = cdc_gen_ts,
.id = *cdc_gen_uuid,
};
if (!_topo_sm._topology.committed_cdc_generations.empty()) {
// Sanity check.
// This could happen if the topology coordinator's clock is broken.
auto last_committed_gen_id = _topo_sm._topology.committed_cdc_generations.back();
if (last_committed_gen_id.ts >= cdc_gen_ts) {
on_internal_error(rtlogger, ::format(
"new CDC generation has smaller timestamp than the previous one."
" Old generation ID: {}, new generation ID: {}", last_committed_gen_id, cdc_gen_id));
}
}
// Tell all nodes to start using the new CDC generation by updating the topology
// with the generation's ID and timestamp.
// At the same time move the topology change procedure to the next step.
//
// FIXME: as in previous implementation with gossiper and ring_delay, this assumes that all nodes
// will learn about the new CDC generation before their clocks reach the generation's timestamp.
// With this group 0 based implementation, it means that the command must be committed,
// replicated and applied on all nodes before their clocks reach the generation's timestamp
// (i.e. within 2 * ring_delay = 60 seconds by default if clocks are synchronized). If this
// doesn't hold some coordinators might use the wrong CDC streams for some time and CDC stream
// readers will miss some data. It's likely that Raft replication doesn't converge as quickly
// as gossiping does.
//
// We could use a two-phase algorithm instead: first tell all nodes to prepare for using
// the new generation, then tell all nodes to commit. If some nodes don't manage to prepare
// in time, we abort the generation switch. If all nodes prepare, we commit. If a node prepares
// but doesn't receive a commit in time, it stops coordinating CDC-enabled writes until it
// receives a commit or abort. This solution does not have a safety problem like the one
// above, but it has an availability problem when nodes get disconnected from group 0 majority
// in the middle of a CDC generation switch (when they are prepared to switch but not
// committed) - they won't coordinate CDC-enabled writes until they reconnect to the
// majority and commit.
utils::chunked_vector<canonical_mutation> updates;
builder.add_new_committed_cdc_generation(cdc_gen_id);
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
if (_feature_service.topology_global_request_queue) {
topology_request_tracking_mutation_builder rtbuilder(*_topo_sm._topology.global_request_id);
rtbuilder.done();
updates.push_back(rtbuilder.build());
}
} else {
builder.set_transition_state(topology::transition_state::write_both_read_old);
builder.set_session(session_id(guard.new_group0_state_id()));
builder.set_version(_topo_sm._topology.version + 1);
}
updates.push_back(builder.build());
auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
co_await update_topology_state(std::move(guard), std::move(updates), std::move(str));
}
break;
case topology::transition_state::tablet_draining:
co_await utils::get_local_injector().inject("suspend_decommission", utils::wait_for_message(1min));
try {
co_await handle_tablet_migration(std::move(guard), true);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.error("tablets draining failed with {}. Aborting the topology operation", std::current_exception());
_rollback = fmt::format("Failed to drain tablets: {}", std::current_exception());
}
break;
case topology::transition_state::write_both_read_old: {
auto node = get_node_to_work_on(std::move(guard));
// make sure all nodes know about new topology (we require all nodes to be alive for topo change for now)
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_old, "
"global_token_metadata_barrier failed, error {}",
std::current_exception());
_rollback = fmt::format("global_token_metadata_barrier failed in write_both_read_old state {}", std::current_exception());
break;
}
if (_group0.is_member(node.id, true)) {
// If we remove a node, we make it a non-voter early to improve availability in some situations.
// There is no downside to it because the removed node is already considered dead by us.
//
// FIXME: removenode may be aborted and the already dead node can be resurrected. We should consider
// restoring its voter state on the recovery path.
if (node.rs->state == node_state::removing && !_feature_service.group0_limited_voters) {
co_await _voter_handler.on_node_removed(node.id, _as);
}
}
if (node.rs->state == node_state::replacing && !_feature_service.group0_limited_voters) {
// We make a replaced node a non-voter early, just like a removed node.
auto replaced_node_id = parse_replaced_node(node.req_param);
if (_group0.is_member(replaced_node_id, true)) {
co_await _voter_handler.on_node_removed(replaced_node_id, _as);
}
}
utils::get_local_injector().inject("crash_coordinator_before_stream", [] { abort(); });
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
auto state = node.rs->state;
try {
// No need to stream data when the node is zero-token.
if (!node.rs->ring->tokens.empty()) {
if (node.rs->state == node_state::removing) {
// tell all token owners to stream data of the removed node to new range owners
auto exclude_nodes = get_excluded_nodes_for_topology_request(node);
auto normal_zero_token_nodes = _topo_sm._topology.get_normal_zero_token_nodes();
std::move(normal_zero_token_nodes.begin(), normal_zero_token_nodes.end(),
std::inserter(exclude_nodes, exclude_nodes.begin()));
node = retake_node(
co_await exec_global_command(take_guard(std::move(node)), cmd, exclude_nodes), node.id);
} else {
// Tell joining/leaving/replacing node to stream its ranges
node = co_await exec_direct_command(std::move(node), cmd);
}
}
} catch (term_changed_error&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is {}): {}", state, std::current_exception());
_rollback = fmt::format("Failed stream ranges: {}", std::current_exception());
break;
}
co_await utils::get_local_injector().inject("topology_coordinator/write_both_read_old/before_version_increment",
utils::wait_for_message(std::chrono::minutes(5)));
// Streaming completed. We can now move tokens state to topology::transition_state::write_both_read_new
topology_mutation_builder builder(node.guard.write_timestamp());
builder
.set_transition_state(topology::transition_state::write_both_read_new)
.del_session()
.set_version(_topo_sm._topology.version + 1);
auto str = ::format("{}: streaming completed for node {}", node.rs->state, node.id);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
}
break;
case topology::transition_state::write_both_read_new: {
while (utils::get_local_injector().enter("topology_coordinator_pause_after_streaming")) {
co_await sleep_abortable(std::chrono::milliseconds(10), _as);
}
auto node = get_node_to_work_on(std::move(guard));
bool barrier_failed = false;
// In this state writes goes to old and new replicas but reads start to be done from new replicas
// Before we stop writing to old replicas we need to wait for all previous reads to complete
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_new, "
"global_token_metadata_barrier failed, error {}",
std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
// If barrier above failed it means there may be unfenced reads from old replicas.
// Lets wait for the ring delay for those writes to complete or fence to propagate
// before continuing.
// FIXME: nodes that cannot be reached need to be isolated either automatically or
// by an administrator
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
co_await utils::get_local_injector().inject("topology_coordinator/write_both_read_new/after_barrier",
utils::wait_for_message(std::chrono::minutes(5)));
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
switch(node.rs->state) {
case node_state::bootstrapping: {
co_await utils::get_local_injector().inject("delay_node_bootstrap", utils::wait_for_message(std::chrono::minutes(5)));
utils::chunked_vector<canonical_mutation> muts;
// Since after bootstrapping a new node some nodes lost some ranges they need to cleanup
muts = mark_nodes_as_cleanup_needed(node, false);
topology_mutation_builder builder(node.guard.write_timestamp());
builder.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", node_state::normal);
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
co_await _voter_handler.on_node_added(node.id, _as);
co_await mark_view_build_statuses_on_node_join(muts, node.guard, node.id);
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), "bootstrap: read fence completed");
trigger_load_stats_refresh();
}
break;
case node_state::removing: {
co_await utils::get_local_injector().inject("delay_node_removal", utils::wait_for_message(std::chrono::minutes(5)));
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
}
[[fallthrough]];
case node_state::decommissioning: {
topology_mutation_builder builder(node.guard.write_timestamp());
node_state next_state;
utils::chunked_vector<canonical_mutation> muts;
muts.reserve(2);
if (node.rs->state == node_state::decommissioning) {
next_state = node.rs->state;
builder.set_transition_state(topology::transition_state::left_token_ring);
} else {
next_state = node_state::left;
builder.del_transition_state();
cleanup_ignored_nodes_on_left(builder, node.id);
muts.push_back(rtbuilder.build());
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
}
builder.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.del("tokens")
.set("node_state", next_state);
auto str = ::format("{}: read fence completed", node.rs->state);
muts.push_back(builder.build());
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
}
break;
case node_state::replacing: {
auto replaced_node_id = parse_replaced_node(node.req_param);
node = retake_node(co_await remove_from_group0(std::move(node.guard), replaced_node_id), node.id);
utils::chunked_vector<canonical_mutation> muts;
topology_mutation_builder builder1(node.guard.write_timestamp());
// Move new node to 'normal'
builder1.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", node_state::normal);
muts.push_back(builder1.build());
// Move old node to 'left'
topology_mutation_builder builder2(node.guard.write_timestamp());
cleanup_ignored_nodes_on_left(builder2, replaced_node_id);
builder2.with_node(replaced_node_id)
.del("tokens")
.set("node_state", node_state::left);
muts.push_back(builder2.build());
muts.push_back(rtbuilder.build());
co_await mark_view_build_statuses_on_node_join(muts, node.guard, node.id);
co_await remove_view_build_statuses_on_left_node(muts, node.guard, replaced_node_id);
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(replaced_node_id.uuid()), muts);
co_await _voter_handler.on_node_added(node.id, _as);
co_await update_topology_state(take_guard(std::move(node)), std::move(muts),
"replace: read fence completed");
trigger_load_stats_refresh();
}
break;
default:
on_fatal_internal_error(rtlogger, ::format(
"Ring state on node {} is write_both_read_new while the node is in state {}",
node.id, node.rs->state));
}
// Reads are fenced. We can now remove topology::transition_state and move node state to normal
}
break;
case topology::transition_state::tablet_migration:
co_await handle_tablet_migration(std::move(guard), false);
break;
case topology::transition_state::tablet_split_finalization:
[[fallthrough]];
case topology::transition_state::tablet_resize_finalization:
co_await handle_tablet_resize_finalization(std::move(guard));
break;
case topology::transition_state::lock:
release_guard(std::move(guard));
co_await await_event();
break;
case topology::transition_state::left_token_ring: {
auto node = get_node_to_work_on(std::move(guard));
auto finish_left_token_ring_transition = [&](node_to_work_on& node) -> future<> {
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
utils::get_local_injector().inject("finish_left_token_ring_transition_throw", [] {
throw std::runtime_error("finish_left_token_ring_transition failed due to error injection");
});
utils::chunked_vector<canonical_mutation> muts;
topology_mutation_builder builder(node.guard.write_timestamp());
cleanup_ignored_nodes_on_left(builder, node.id);
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::left);
muts.push_back(builder.build());
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
auto str = node.rs->state == node_state::decommissioning
? ::format("finished decommissioning node {}", node.id)
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
};
// Denotes the case when this path is already executed before and first topology state update was successful.
// So we can skip all steps and perform the second topology state update operation (which modifies node state to left)
// and remove node conditionally from group0.
if (auto [done, error] = co_await _sys_ks.get_topology_request_state(node.rs->request_id, false); done) {
co_await finish_left_token_ring_transition(node);
break;
}
if (node.id == _raft.id()) {
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
rtlogger.info("coordinator is decommissioning; giving up leadership");
co_await step_down_as_nonvoter();
// Note: if we restart after this point and become a voter
// and then a coordinator again, it's fine - we'll just repeat this step.
// (If we're in `left` state when we try to restart we won't
// be able to become a voter - we'll be banned from the cluster.)
}
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.error("transition_state::left_token_ring, "
"raft_topology_cmd::command::barrier failed, error {}",
std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
// Lets wait for the ring delay for those writes to complete and new topology to propagate
// before continuing.
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
// Make decommissioning node a non voter before reporting operation completion below.
// Otherwise the decommissioned node may see the completion and exit before it is removed from
// the config at which point the removal from the config will hang if the cluster had only two
// nodes before the decommission.
co_await _voter_handler.on_node_removed(node.id, _as);
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
// Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
// so there's no guarantee that it would learn about entering that state even if it was still
// a member of group0, hence we use a separate direct RPC in this state to shut it down.
//
// There is the possibility that the node will never get the message
// and decommission will hang on that node.
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
auto node_id = node.id;
bool shutdown_failed = false;
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
if (shutdown_failed) {
node = retake_node(co_await start_operation(), node_id);
}
co_await finish_left_token_ring_transition(node);
}
break;
case topology::transition_state::rollback_to_normal: {
auto node = get_node_to_work_on(std::move(guard));
// The barrier waits for all double writes started during the operation to complete. It allowed to fail
// since we will fence the requests later.
bool barrier_failed = false;
auto state = node.rs->state;
try {
node.guard = co_await exec_global_command(std::move(node.guard),raft_topology_cmd::command::barrier_and_drain, get_excluded_nodes_for_topology_request(node), drop_guard_and_retake::yes);
} catch (term_changed_error&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.warn("failed to run barrier_and_drain during rollback of {} after {} failure: {}",
node.id, state, std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
node.guard = co_await start_operation();
}
node = retake_node(std::move(node.guard), node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.set_fence_version(_topo_sm._topology.version) // fence requests in case the drain above failed
.set_transition_state(topology::transition_state::tablet_migration) // in case tablet drain failed we need to complete tablet transitions
.with_node(node.id)
.set("node_state", node_state::normal);
rtbuilder.done();
auto str = fmt::format("complete rollback of {} to state normal after {} failure", node.id, node.rs->state);
co_await _voter_handler.on_node_added(node.id, _as);
rtlogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str);
}
break;
case topology::transition_state::truncate_table:
co_await handle_truncate_table(std::move(guard));
break;
}
co_return true;
};
// Called when there is no ongoing topology transition.
// Used to start new topology transitions using node requests or perform node operations
// that don't change the topology (like rebuild).
future<> handle_node_transition(node_to_work_on&& node) {
rtlogger.info("coordinator fiber found a node to work on id={} state={}", node.id, node.rs->state);
switch (node.rs->state) {
case node_state::none: {
if (_topo_sm._topology.normal_nodes.empty()) {
rtlogger.info("skipping join node handshake for the first node in the cluster");
} else {
auto validation_result = validate_joining_node(node);
if (utils::get_local_injector().enter("handle_node_transition_drop_expiring")) {
_gossiper.get_mutable_address_map().force_drop_expiring_entries();
}
// When the validation succeeded, it's important that all nodes in the
// cluster are aware of the IP address of the new node before we proceed to
// the topology::transition_state::join_group0 state, since in this state
// node IPs are already used to populate pending nodes in erm.
// This applies both to new and replacing nodes.
// If the wait_for_ip is unsuccessful, we should inform the new
// node about this failure.
// If the validation doesn't pass, we only need to call wait_for_ip on the current node,
// so that we can communicate the failure of the join request directly to
// the joining node.
{
std::exception_ptr wait_for_ip_error;
try {
if (holds_alternative<join_node_response_params::rejected>(validation_result)) {
release_guard(std::move(node.guard));
co_await wait_for_gossiper(node.id, _gossiper, _as);
node.guard = co_await start_operation();
} else {
auto exclude_nodes = get_excluded_nodes_for_topology_request(node);
exclude_nodes.insert(node.id);
node.guard = co_await exec_global_command(std::move(node.guard),
raft_topology_cmd::command::wait_for_ip,
exclude_nodes);
}
} catch (term_changed_error&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
wait_for_ip_error = std::current_exception();
rtlogger.warn("raft_topology_cmd::command::wait_for_ip failed, error {}",
wait_for_ip_error);
}
if (wait_for_ip_error) {
node.guard = co_await start_operation();
}
node = retake_node(std::move(node.guard), node.id);
if (wait_for_ip_error && holds_alternative<join_node_response_params::accepted>(validation_result)) {
validation_result = join_node_response_params::rejected {
.reason = ::format("wait_for_ip failed, error {}", wait_for_ip_error)
};
}
}
if (std::holds_alternative<join_node_response_params::rejected>(validation_result)) {
// Transition to left
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.with_node(node.id)
.del("topology_request")
.set("node_state", node_state::left);
rtbuilder.done("Join is rejected during validation");
auto reason = ::format("bootstrap: node rejected");
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason);
rtlogger.info("rejected node moved to left state {}", node.id);
try {
co_await respond_to_joining_node(node.id, join_node_response_params{
.response = std::move(validation_result),
});
} catch (const std::runtime_error& e) {
rtlogger.warn("attempt to send rejection response to {} failed. "
"The node may hang. It's safe to shut it down manually now. Error: {}",
node.id, e.what());
}
break;
}
}
}
[[fallthrough]];
case node_state::normal: {
// if the state is none there have to be either 'join' or 'replace' request
// if the state is normal there have to be either 'leave', 'remove' or 'rebuild' request
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.set("start_time", db_clock::now());
switch (node.request.value()) {
case topology_request::join: {
SCYLLA_ASSERT(!node.rs->ring);
// Write chosen tokens through raft.
builder.set_transition_state(topology::transition_state::join_group0)
.with_node(node.id)
.set("node_state", node_state::bootstrapping)
.del("topology_request");
auto reason = ::format("bootstrap: accept node");
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, reason);
break;
}
case topology_request::leave:
SCYLLA_ASSERT(node.rs->ring);
// start decommission and put tokens of decommissioning nodes into write_both_read_old state
// meaning that reads will go to the replica being decommissioned
// but writes will go to new owner as well
builder.set_transition_state(topology::transition_state::tablet_draining)
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", node_state::decommissioning)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"start decommission");
break;
case topology_request::remove: {
SCYLLA_ASSERT(node.rs->ring);
builder.set_transition_state(topology::transition_state::tablet_draining)
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", node_state::removing)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"start removenode");
break;
}
case topology_request::replace: {
SCYLLA_ASSERT(!node.rs->ring);
builder.set_transition_state(topology::transition_state::join_group0)
.with_node(node.id)
.set("node_state", node_state::replacing)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"replace: accept node");
break;
}
case topology_request::rebuild: {
topology_mutation_builder builder(node.guard.write_timestamp());
builder.with_node(node.id)
.set("node_state", node_state::rebuilding)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()},
"start rebuilding");
break;
}
}
break;
}
case node_state::rebuilding: {
bool retake = false;
auto id = node.id;
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::stream_ranges);
rtbuilder.done();
} catch (term_changed_error&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is rebuilding): {}", std::current_exception());
rtbuilder.done("streaming failed");
retake = true;
}
if (retake) {
node = retake_node(co_await start_operation(), id);
}
builder.del_session().with_node(node.id)
.set("node_state", node_state::normal)
.del("rebuild_option");
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed");
}
break;
case node_state::bootstrapping:
case node_state::decommissioning:
case node_state::removing:
case node_state::replacing:
// Should not get here
on_fatal_internal_error(rtlogger, ::format(
"Found node {} in state {} but there is no ongoing topology transition",
node.id, node.rs->state));
case node_state::left:
// Should not get here
on_fatal_internal_error(rtlogger, ::format(
"Topology coordinator is called for node {} in state 'left'", node.id));
break;
}
}
std::variant<join_node_response_params::accepted, join_node_response_params::rejected>
validate_joining_node(const node_to_work_on& node) {
if (*node.request == topology_request::replace) {
auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
if (!_topo_sm._topology.normal_nodes.contains(replaced_id)) {
return join_node_response_params::rejected {
.reason = ::format("Cannot replace node {} because it is not in the 'normal' state", replaced_id),
};
}
}
std::vector<sstring> unsupported_features;
const auto& supported_features = node.rs->supported_features;
std::ranges::set_difference(node.topology->enabled_features, supported_features, std::back_inserter(unsupported_features));
if (!unsupported_features.empty()) {
rtlogger.warn("node {} does not understand some features: {}", node.id, unsupported_features);
return join_node_response_params::rejected{
.reason = seastar::format("Feature check failed. The node does not support some features that are enabled by the cluster: {}",
unsupported_features),
};
}
return join_node_response_params::accepted {};
}
// Tries to finish accepting the joining node by updating the cluster
// configuration and sending the acceptance response.
//
// Returns the retaken node and information on whether responding to the
// join request succeeded.
future<std::tuple<node_to_work_on, bool>> finish_accepting_node(node_to_work_on&& node) {
if (_topo_sm._topology.normal_nodes.empty()) {
// This is the first node, it joins without the handshake.
co_return std::tuple{std::move(node), true};
}
auto id = node.id;
SCYLLA_ASSERT(!_topo_sm._topology.transition_nodes.empty());
release_node(std::move(node));
if (!_raft.get_configuration().contains(id)) {
co_await _raft.modify_config({raft::config_member({id, {}}, {})}, {}, &_as);
}
auto responded = false;
try {
co_await respond_to_joining_node(id, join_node_response_params{
.response = join_node_response_params::accepted{},
});
responded = true;
} catch (const std::runtime_error& e) {
rtlogger.warn("attempt to send acceptance response to {} failed. "
"The node may hang. It's safe to shut it down manually now. Error: {}",
node.id, e.what());
}
co_return std::tuple{retake_node(co_await start_operation(), id), responded};
}
future<> respond_to_joining_node(raft::server_id id, join_node_response_params&& params) {
co_await ser::join_node_rpc_verbs::send_join_node_response(
&_messaging, to_host_id(id), id,
std::move(params)
);
}
utils::chunked_vector<canonical_mutation> mark_nodes_as_cleanup_needed(node_to_work_on& node, bool rollback) {
auto& topo = _topo_sm._topology;
utils::chunked_vector<canonical_mutation> muts;
muts.reserve(topo.normal_nodes.size());
std::unordered_set<locator::host_id> dirty_nodes;
for (auto& [_, ermp] : _db.get_non_local_strategy_keyspaces_erms()) {
auto* erm = ermp->maybe_as_vnode_effective_replication_map();
const std::unordered_set<locator::host_id>& nodes = rollback ? erm->get_all_pending_nodes() : erm->get_dirty_endpoints();
dirty_nodes.insert(nodes.begin(), nodes.end());
}
for (auto& n : dirty_nodes) {
auto id = raft::server_id(n.uuid());
// mark all nodes (except self) as cleanup needed
if (node.id != id) {
topology_mutation_builder builder(node.guard.write_timestamp());
builder.with_node(id).set("cleanup_status", cleanup_status::needed);
muts.emplace_back(builder.build());
rtlogger.debug("mark node {} as needed for cleanup", id);
}
}
return muts;
}
future<> start_vnodes_cleanup_on_dirty_nodes(group0_guard guard, std::variant<std::pair<topology_request, raft::server_id>, utils::UUID> cmd) {
sstring cleanup_reason;
if (const auto* global_request_id = std::get_if<utils::UUID>(&cmd)) {
cleanup_reason = ::format("by global request {}", *global_request_id);
} else {
const auto& req = std::get<std::pair<topology_request, raft::server_id>>(cmd);
cleanup_reason = ::format("required by '{}' of the node {}", req.first, req.second);
}
rtlogger.info("starting vnodes cleanup {}", cleanup_reason);
// We need a barrier to make sure that no request coordinators
// are sending requests to old replicas/ranges that we're going to cleanup.
if (_topo_sm._topology.fence_version < _topo_sm._topology.version) {
bool fenced = false;
bool failed = false;
try {
rtlogger.info("vnodes cleanup {}: running global_token_metadata_barrier", cleanup_reason);
guard = co_await global_token_metadata_barrier(std::move(guard), _topo_sm._topology.ignored_nodes, &fenced);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (...) {
// The barrier does its best to ensure no data plane requests are affected
// by fencing. It runs barrier_and_drain command on all nodes and then
// commits fence_version := version to the raft group0. If the first barrier
// failed it runs another barrier with the new fence_version.
// Both of these barriers might fail, but if we managed to commit the
// new fence_version to group0 then we can continue with the cleanup. Why?
// Cleanup on a replica first calls `read_barrier()`. This ensures that
// new requests from old coordinators are fenced out before they start
// executing, preventing them from corrupting the cleaned-up replica state.
// There may still be active requests on the replica from the old topology
// if `barrier_and_drain` failed on this replica. We wait for those requests
// to finish using `await_pending_writes()` before starting the cleanup.
if (!fenced) {
throw;
}
failed = true;
rtlogger.warn("vnodes cleanup {}: global_token_metadata_barrier threw an error {}; "
"this is not fatal, continuing cleanup.",
cleanup_reason, std::current_exception());
}
if (failed) {
guard = co_await start_operation();
}
}
auto& topo = _topo_sm._topology;
utils::chunked_vector<canonical_mutation> muts;
if (const auto* global_request_id = std::get_if<utils::UUID>(&cmd)) {
muts.reserve(topo.normal_nodes.size() + 2);
topology_mutation_builder builder(guard.write_timestamp());
builder.del_global_topology_request();
if (_feature_service.topology_global_request_queue) {
// Set start_time when we begin executing the request
topology_request_tracking_mutation_builder start_rtbuilder(*global_request_id);
start_rtbuilder.set("start_time", db_clock::now());
muts.emplace_back(start_rtbuilder.build());
topology_request_tracking_mutation_builder rtbuilder(*global_request_id);
builder.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, *global_request_id);
rtbuilder.done();
muts.emplace_back(rtbuilder.build());
}
muts.emplace_back(builder.build());
} else {
muts.reserve(topo.normal_nodes.size());
}
size_t nodes_to_cleanup = 0;
for (auto& [id, rs] : topo.normal_nodes) {
if (rs.cleanup == cleanup_status::needed) {
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(id).set("cleanup_status", cleanup_status::running);
muts.emplace_back(builder.build());
rtlogger.debug("mark node {} as cleanup running", id);
++nodes_to_cleanup;
}
}
if (!muts.empty()) {
co_await update_topology_state(std::move(guard), std::move(muts), "Starting cleanup");
}
rtlogger.info("vnodes cleanup {} started, nodes to cleanup {}", cleanup_reason, nodes_to_cleanup);
}
future<> run_view_building_coordinator() {
while (!_feature_service.view_building_coordinator && !_as.abort_requested()) {
condition_variable feature_cv;
auto listener = _feature_service.view_building_coordinator.when_enabled([&feature_cv] {
feature_cv.broadcast();
});
auto abort = _as.subscribe([&feature_cv] () noexcept {
feature_cv.broadcast();
});
co_await feature_cv.wait();
}
if (_as.abort_requested()) {
co_return;
}
_lifecycle_notifier.register_subscriber(_vb_coordinator.get());
try {
co_await _vb_coordinator->run();
} catch (...) {
on_fatal_internal_error(rtlogger, format("unhandled exception in view_building_coordinator::run(): {}", std::current_exception()));
}
co_await _lifecycle_notifier.unregister_subscriber(_vb_coordinator.get());
}
future<> mark_view_build_statuses_on_node_join(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, raft::server_id node_id) {
co_await _vb_coordinator->mark_view_build_statuses_on_node_join(out, guard, locator::host_id{node_id.uuid()});
}
future<> remove_view_build_statuses_on_left_node(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, raft::server_id node_id) {
co_await _vb_coordinator->remove_view_build_statuses_on_left_node(out, guard, locator::host_id{node_id.uuid()});
}
// Returns the guard if no work done. Otherwise, performs a table migration and consumes the guard.
future<std::optional<group0_guard>> maybe_migrate_system_tables(group0_guard guard);
// Returns true if the state machine was transitioned into tablet migration path.
future<bool> maybe_start_tablet_migration(group0_guard);
// Returns true if the state machine was transitioned into tablet resize finalization path.
future<bool> maybe_start_tablet_resize_finalization(group0_guard, const table_resize_plan& plan);
future<> refresh_tablet_load_stats();
future<> start_tablet_load_stats_refresher();
// Precondition: the state machine upgrade state is not at upgrade_state::done.
future<> do_upgrade_step(group0_guard);
future<> build_coordinator_state(group0_guard);
future<> await_event() {
_as.check();
co_await _topo_sm.event.when();
}
future<> fence_previous_coordinator();
future<> rollback_current_topology_op(group0_guard&& guard);
// Returns true if the coordinator should sleep after the exception.
bool handle_topology_coordinator_error(std::exception_ptr eptr) noexcept;
public:
topology_coordinator(
sharded<db::system_distributed_keyspace>& sys_dist_ks, gms::gossiper& gossiper,
netw::messaging_service& messaging, locator::shared_token_metadata& shared_tm,
db::system_keyspace& sys_ks, replica::database& db, service::raft_group0& group0,
service::topology_state_machine& topo_sm, db::view::view_building_state_machine& vb_sm, abort_source& as, raft::server& raft_server,
raft_topology_cmd_handler_type raft_topology_cmd_handler,
tablet_allocator& tablet_allocator,
cdc::generation_service& cdc_gens,
std::chrono::milliseconds ring_delay,
gms::feature_service& feature_service,
endpoint_lifecycle_notifier& lifecycle_notifier,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
, _tablet_load_stats_refresh_interval_in_seconds(db.get_config().tablet_load_stats_refresh_interval_in_seconds)
, _group0(group0), _topo_sm(topo_sm), _vb_sm(vb_sm), _as(as)
, _feature_service(feature_service), _lifecycle_notifier(lifecycle_notifier)
, _sl_controller(sl_controller)
, _raft(raft_server), _term(raft_server.get_current_term())
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
, _tablet_allocator(tablet_allocator)
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
, _cdc_gens(cdc_gens)
, _tablet_load_stats_refresh([this] { return refresh_tablet_load_stats(); })
, _ring_delay(ring_delay)
, _group0_holder(_group0.hold_group0_gate())
, _voter_handler(group0, topo_sm._topology, gossiper, feature_service)
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
, _async_gate("topology_coordinator")
{}
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
future<bool> maybe_run_upgrade();
future<> run();
future<> stop();
virtual void on_up(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); };
virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); };
};
future<std::optional<group0_guard>> topology_coordinator::maybe_migrate_system_tables(group0_guard guard) {
// Some of the upgrades are guarded by _feature_service. If the feature gets enabled,
// it's in `topology_coordinator::enable_features` ,so topology_coordinator will re-run its loop
// and `maybe_migrate_system_tables` will be called.
// Check if we can upgrade the view_build_status table to v2, being managed by group0.
// First we upgrade to an intermediate version v1_5 where we write to both tables, then
// we upgrade to v2.
const auto view_builder_version = co_await _sys_ks.get_view_builder_version();
if (view_builder_version == db::system_keyspace::view_builder_version_t::v1 && _feature_service.view_build_status_on_group0) {
rtlogger.info("Migrating view_builder to v1_5");
auto tmptr = get_token_metadata_ptr();
co_await db::view::view_builder::migrate_to_v1_5(tmptr, _sys_ks, _sys_ks.query_processor(), _group0.client(), _as, std::move(guard));
co_return std::nullopt;
}
if (view_builder_version == db::system_keyspace::view_builder_version_t::v1_5) {
if (!get_dead_nodes().empty()) {
rtlogger.debug("Not all nodes are alive. Skipping system table migration until there are any dead nodes.");
co_return std::move(guard);
}
rtlogger.info("Migrating view_builder to v2");
// do a barrier to ensure all nodes applied the migration to v1_5 before we continue to v2
guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier, {_raft.id()});
auto tmptr = get_token_metadata_ptr();
co_await db::view::view_builder::migrate_to_v2(tmptr, _sys_ks, _sys_ks.query_processor(), _group0.client(), _as, std::move(guard));
co_return std::nullopt;
}
if (_sl_controller.is_v2() && _feature_service.driver_service_level) {
const auto sl_driver_created = co_await _sys_ks.get_service_level_driver_created();
if (!sl_driver_created.value_or(false)) {
co_return co_await _sl_controller.migrate_to_driver_service_level(std::move(guard), _sys_ks);
}
}
co_return std::move(guard);
}
future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard guard) {
rtlogger.debug("Evaluating tablet balance");
if (utils::get_local_injector().enter("tablet_load_stats_refresh_before_rebalancing")) {
co_await _tablet_load_stats_refresh.trigger();
}
auto tm = get_token_metadata_ptr();
auto plan = co_await _tablet_allocator.balance_tablets(tm, {}, get_dead_nodes());
if (plan.empty()) {
rtlogger.debug("Tablet load balancer did not make any plan");
co_return false;
}
utils::chunked_vector<canonical_mutation> updates;
co_await generate_migration_updates(updates, guard, plan);
// We only want to consider transitioning into tablet resize finalization path, if there's no other work
// to be done (e.g. start migration or/and emit split decision).
if (updates.empty()) {
co_return co_await maybe_start_tablet_resize_finalization(std::move(guard), plan.resize_plan());
}
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), "Starting tablet migration");
co_return true;
}
future<bool> topology_coordinator::maybe_start_tablet_resize_finalization(group0_guard guard, const table_resize_plan& plan) {
if (plan.finalize_resize.empty()) {
co_return false;
}
if (utils::get_local_injector().enter("tablet_split_finalization_postpone")) {
co_return false;
}
auto resize_finalization_transition_state = [this] {
return _feature_service.tablet_merge ? topology::transition_state::tablet_resize_finalization : topology::transition_state::tablet_split_finalization;
};
utils::chunked_vector<canonical_mutation> updates;
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(resize_finalization_transition_state())
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), "Started tablet resize finalization");
co_return true;
}
future<> topology_coordinator::refresh_tablet_load_stats() {
auto tm = get_token_metadata_ptr();
locator::load_stats stats;
static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
std::unordered_map<table_id, size_t> total_replicas;
bool table_load_stats_invalid = false;
for (auto& [dc, nodes] : tm->get_datacenter_token_owners_nodes()) {
locator::load_stats dc_stats;
rtlogger.debug("raft topology: Refreshing table load stats for DC {} that has {} token owners", dc, nodes.size());
co_await coroutine::parallel_for_each(nodes, [&] (const auto& node) -> future<> {
auto dst = node.get().host_id();
auto dst_server = raft::server_id(dst.uuid());
_as.check();
// FIXME: if a node is down, completion status cannot be relied upon, but the average tablet size
// could still be inferred from the replicas available.
auto timeout = netw::messaging_service::clock_type::now() + wait_for_live_nodes_timeout;
abort_source as;
auto request_abort = [&as] () mutable noexcept {
as.request_abort();
};
auto t = timer<lowres_clock>(request_abort);
t.arm(timeout);
auto sub = _as.subscribe(request_abort);
locator::load_stats node_stats;
if (!_gossiper.is_alive(dst)) {
if (_load_stats_per_node.contains(dst)) {
node_stats = _load_stats_per_node[dst];
} else {
rtlogger.debug("raft topology: Unable to refresh table load on {} because it's down.", dst);
table_load_stats_invalid = true;
co_return;
}
} else if (_feature_service.tablet_load_stats_v2) {
node_stats = co_await ser::storage_service_rpc_verbs::send_table_load_stats(&_messaging,
dst,
as,
dst_server);
} else {
node_stats = locator::load_stats::from_v1(
co_await ser::storage_service_rpc_verbs::send_table_load_stats_v1(&_messaging,
dst,
as,
dst_server));
}
_load_stats_per_node[dst] = node_stats;
dc_stats += node_stats;
});
for (auto& [table_id, table_stats] : dc_stats.tables) {
co_await coroutine::maybe_yield();
auto& t = _db.find_column_family(table_id);
auto& rs = t.get_effective_replication_map()->get_replication_strategy();
if (!rs.uses_tablets()) {
continue;
}
const auto* nts_ptr = dynamic_cast<const locator::network_topology_strategy*>(&rs);
if (!nts_ptr) {
on_internal_error(rtlogger, "Cannot convert replication_strategy that uses tablets into network_topology_strategy");
}
auto rf_for_this_dc = nts_ptr->get_replication_factor(dc);
if (rf_for_this_dc <= 0) {
continue;
}
total_replicas[table_id] += rf_for_this_dc;
rtlogger.debug("raft topology: Refreshed table load stats for DC {}, table={}, RF={}, size_in_bytes={}, split_ready_seq_number={}",
dc, table_id, rf_for_this_dc, table_stats.size_in_bytes, table_stats.split_ready_seq_number);
}
stats += dc_stats;
}
for (auto& [table_id, table_load_stats] : stats.tables) {
auto table_total_replicas = total_replicas.at(table_id);
if (table_total_replicas == 0) {
continue;
}
// Takes into account the RF of each DC, so we can compute the average total size
// for a single table replica. This allows the load balancer to compute, in turn,
// the average tablet size by dividing total size by tablet count.
table_load_stats.size_in_bytes /= table_total_replicas;
}
if (table_load_stats_invalid) {
stats.tables.clear();
}
rtlogger.debug("raft topology: Refreshed table load stats for all DC(s).");
_tablet_allocator.set_load_stats(make_lw_shared<const locator::load_stats>(std::move(stats)));
}
future<> topology_coordinator::start_tablet_load_stats_refresher() {
auto can_proceed = [this] { return !_async_gate.is_closed() && !_as.abort_requested(); };
while (can_proceed()) {
bool sleep = true;
try {
co_await _tablet_load_stats_refresh.trigger();
_topo_sm.event.broadcast(); // wake up load balancer.
} catch (raft::request_aborted&) {
rtlogger.debug("raft topology: Tablet load stats refresher aborted");
sleep = false;
} catch (seastar::abort_requested_exception&) {
rtlogger.debug("raft topology: Tablet load stats refresher aborted");
sleep = false;
} catch (...) {
rtlogger.warn("Found error while refreshing load stats for tablets: {}, retrying...", std::current_exception());
}
auto refresh_interval = std::chrono::seconds(_tablet_load_stats_refresh_interval_in_seconds.get());
if (sleep && can_proceed()) {
try {
co_await seastar::sleep_abortable(refresh_interval, _as);
} catch (...) {
rtlogger.debug("raft topology: Tablet load stats refresher: sleep failed: {}", std::current_exception());
}
}
}
}
future<> topology_coordinator::do_upgrade_step(group0_guard guard) {
switch (_topo_sm._topology.upgrade_state) {
case topology::upgrade_state_type::not_upgraded:
on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error(
"topology_coordinator was started even though upgrade to raft topology was not requested yet")));
case topology::upgrade_state_type::build_coordinator_state:
utils::get_local_injector().inject("topology_coordinator_fail_to_build_state_during_upgrade", [] {
throw std::runtime_error("failed to build topology coordinator state due to error injection");
});
co_await build_coordinator_state(std::move(guard));
co_return;
case topology::upgrade_state_type::done:
on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error(
"topology_coordinator::do_upgrade_step called after upgrade was completed")));
}
}
future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
// Wait until all nodes reach use_post_raft_procedures
rtlogger.info("waiting for all nodes to finish upgrade to raft schema");
release_guard(std::move(guard));
co_await _group0.wait_for_all_nodes_to_finish_upgrade(_as);
auto tmptr = get_token_metadata_ptr();
auto sl_version = co_await _sys_ks.get_service_levels_version();
if (!sl_version || *sl_version < 2) {
rtlogger.info("migrating service levels data");
co_await qos::service_level_controller::migrate_to_v2(tmptr->get_normal_token_owners().size(), _sys_ks, _sys_ks.query_processor(), _group0.client(), _as);
}
auto auth_version = co_await _sys_ks.get_auth_version();
if (auth_version < db::system_keyspace::auth_version_t::v2) {
rtlogger.info("migrating system_auth keyspace data");
co_await auth::migrate_to_auth_v2(_sys_ks, _group0.client(),
[this] (abort_source&) { return start_operation();}, _as);
}
rtlogger.info("building initial raft topology state and CDC generation");
guard = co_await start_operation();
auto get_application_state = [&] (locator::host_id host_id, const gms::application_state_map& epmap, gms::application_state app_state) -> sstring {
const auto it = epmap.find(app_state);
if (it == epmap.end()) {
throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {}: application state {} is missing in gossip",
host_id, app_state));
}
// it's versioned_value::value(), not std::optional::value() - it does not throw
return it->second.value();
};
// Create a new CDC generation
auto get_sharding_info_for_host_id = [&] (locator::host_id host_id) -> std::pair<size_t, uint8_t> {
const auto eptr = _gossiper.get_endpoint_state_ptr(host_id);
if (!eptr) {
throw std::runtime_error(format("no gossiper endpoint state for node {}", host_id));
}
const auto& epmap = eptr->get_application_state_map();
const auto shard_count = std::stoi(get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT));
const auto ignore_msb = std::stoi(get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS));
return std::make_pair<size_t, uint8_t>(shard_count, ignore_msb);
};
auto [cdc_gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), std::nullopt, get_sharding_info_for_host_id);
guard = std::move(guard_);
topology_mutation_builder builder(guard.write_timestamp());
std::set<sstring> enabled_features;
// Build per-node state
for (const auto& node: tmptr->get_topology().get_nodes()) {
if (!node.get().is_member()) {
continue;
}
const auto& host_id = node.get().host_id();
const auto eptr = _gossiper.get_endpoint_state_ptr(host_id);
if (!eptr) {
throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {} as gossip contains no data for it", host_id));
}
const auto& epmap = eptr->get_application_state_map();
const auto datacenter = get_application_state(host_id, epmap, gms::application_state::DC);
const auto rack = get_application_state(host_id, epmap, gms::application_state::RACK);
const auto tokens_v = tmptr->get_tokens(host_id);
const std::unordered_set<dht::token> tokens(tokens_v.begin(), tokens_v.end());
const auto release_version = get_application_state(host_id, epmap, gms::application_state::RELEASE_VERSION);
const auto num_tokens = tokens.size();
const auto shard_count = get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT);
const auto ignore_msb = get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS);
const auto supported_features_s = get_application_state(host_id, epmap, gms::application_state::SUPPORTED_FEATURES);
const auto supported_features = gms::feature_service::to_feature_set(supported_features_s);
if (enabled_features.empty()) {
enabled_features = supported_features;
} else {
std::erase_if(enabled_features, [&] (const sstring& f) { return !supported_features.contains(f); });
}
builder.with_node(raft::server_id(host_id.uuid()))
.set("datacenter", datacenter)
.set("rack", rack)
.set("tokens", tokens)
.set("node_state", node_state::normal)
.set("release_version", release_version)
.set("num_tokens", (uint32_t)num_tokens)
.set("tokens_string", "")
.set("shard_count", (uint32_t)std::stoi(shard_count))
.set("ignore_msb", (uint32_t)std::stoi(ignore_msb))
.set("cleanup_status", cleanup_status::clean)
.set("request_id", utils::UUID())
.set("supported_features", supported_features);
rtlogger.debug("node {} will contain the following parameters: "
"datacenter={}, rack={}, tokens={}, shard_count={}, ignore_msb={}, supported_features={}",
host_id, datacenter, rack, tokens, shard_count, ignore_msb, supported_features);
}
// Build the static columns
const bool add_ts_delay = true; // This is not the first generation, so add delay
auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay);
const cdc::generation_id_v2 cdc_gen_id {
.ts = cdc_gen_ts,
.id = cdc_gen_uuid,
};
builder.set_version(topology::initial_version)
.set_fence_version(topology::initial_version)
.add_new_committed_cdc_generation(cdc_gen_id)
.add_enabled_features(std::move(enabled_features));
// Commit
builder.set_upgrade_state(topology::upgrade_state_type::done);
auto reason = "upgrade: build the initial state";
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
}
future<> topology_coordinator::fence_previous_coordinator() {
// Write empty change to make sure that a guard taken by any previous coordinator cannot
// be used to do a successful write any more. Otherwise the following can theoretically happen
// while a coordinator tries to execute RPC R and move to state S.
// 1. Leader A executes topology RPC R
// 2. Leader A takes guard G
// 3. Leader A calls update_topology_state(S)
// 4. Leadership moves to B (while update_topology_state is still running)
// 5. B executed topology RPC R again
// 6. while the RPC is running leadership moves to A again
// 7. A completes update_topology_state(S)
// Topology state machine moves to state S while RPC R is still running.
// If RPC is idempotent that should not be a problem since second one executed by B will do nothing,
// but better to be safe and cut off previous write attempt
while (!_as.abort_requested()) {
try {
auto guard = co_await start_operation();
topology_mutation_builder builder(guard.write_timestamp());
co_await update_topology_state(std::move(guard), {builder.build()}, fmt::format("Starting new topology coordinator {}", _group0.group0_server().id()));
break;
} catch (group0_concurrent_modification&) {
// If we failed to write because of concurrent modification lets retry
continue;
} catch (raft::request_aborted&) {
// Abort was requested. Break the loop
rtlogger.debug("request to fence previous coordinator was aborted");
break;
} catch (seastar::abort_requested_exception&) {
// Abort was requested. Break the loop
rtlogger.debug("request to fence previous coordinator was aborted");
break;
} catch (...) {
rtlogger.error("failed to fence previous coordinator {}", std::current_exception());
}
try {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (abort_requested_exception&) {
// Abort was requested. Break the loop
break;
} catch (...) {
rtlogger.debug("sleep failed while fencing previous coordinator: {}", std::current_exception());
}
}
}
future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard) {
rtlogger.info("start rolling back topology change");
// Look for a node which operation should be aborted
// (there should be one since we are in the rollback)
node_to_work_on node = get_node_to_work_on(std::move(guard));
topology::transition_state transition_state;
switch (node.rs->state) {
case node_state::bootstrapping:
[[fallthrough]];
case node_state::replacing:
// To rollback bootstrap of replace just move the topology to left_token_ring. The node we tried to
// add will be removed from the group0 by the transition handler. It will also be notified to shutdown.
transition_state = topology::transition_state::left_token_ring;
break;
case node_state::removing: {
auto id = node.id;
// The node was removed already. We need to add it back. Lets do it as non voter.
// If it ever boots again it will make itself a voter.
release_node(std::move(node));
co_await _group0.group0_server().modify_config({raft::config_member{{id, {}}, raft::is_voter::no}}, {}, &_as);
node = retake_node(co_await start_operation(), id);
}
[[fallthrough]];
case node_state::decommissioning:
// To rollback decommission or remove just move the topology to rollback_to_normal.
transition_state = topology::transition_state::rollback_to_normal;
break;
default:
on_internal_error(rtlogger, fmt::format("tried to rollback in unsupported state {}", node.rs->state));
}
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.set_transition_state(transition_state)
.set_version(_topo_sm._topology.version + 1);
rtbuilder.set("error", fmt::format("Rolled back: {}", *_rollback));
utils::chunked_vector<canonical_mutation> muts;
// We are in the process of aborting remove or decommission which may have streamed some
// ranges to other nodes. Cleanup is needed.
muts = mark_nodes_as_cleanup_needed(node, true);
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
std::string str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag",
node.id, node.rs->state, transition_state);
rtlogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), std::move(muts), str);
}
bool topology_coordinator::handle_topology_coordinator_error(std::exception_ptr eptr) noexcept {
try {
std::rethrow_exception(std::move(eptr));
} catch (raft::request_aborted&) {
rtlogger.debug("topology change coordinator fiber aborted");
} catch (seastar::abort_requested_exception&) {
rtlogger.debug("topology change coordinator fiber aborted");
} catch (raft::commit_status_unknown&) {
rtlogger.warn("topology change coordinator fiber got commit_status_unknown");
} catch (group0_concurrent_modification&) {
rtlogger.info("topology change coordinator fiber got group0_concurrent_modification");
} catch (term_changed_error&) {
// Term changed. We may no longer be a leader
rtlogger.debug("topology change coordinator fiber notices term change {} -> {}", _term, _raft.get_current_term());
} catch (seastar::rpc::remote_verb_error&) {
rtlogger.warn("topology change coordinator fiber got rpc::remote_verb_error {}", std::current_exception());
return true;
} catch (...) {
rtlogger.error("topology change coordinator fiber got error {}", std::current_exception());
return true;
}
return false;
}
future<bool> topology_coordinator::maybe_run_upgrade() {
if (_topo_sm._topology.upgrade_state == topology::upgrade_state_type::done) {
// Upgrade was already done, nothing to do
co_return true;
}
rtlogger.info("topology coordinator fiber is upgrading the cluster to raft topology mode");
auto abort = _as.subscribe([this] () noexcept {
_topo_sm.event.broadcast();
});
while (!_as.abort_requested() && _topo_sm._topology.upgrade_state != topology::upgrade_state_type::done) {
bool sleep = false;
try {
auto guard = co_await start_operation();
co_await do_upgrade_step(std::move(guard));
} catch (...) {
sleep = handle_topology_coordinator_error(std::current_exception());
}
if (sleep) {
try {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (...) {
rtlogger.debug("sleep failed: {}", std::current_exception());
}
}
co_await coroutine::maybe_yield();
}
co_return !_as.abort_requested();
}
future<> topology_coordinator::run() {
auto abort = _as.subscribe([this] () noexcept {
_topo_sm.event.broadcast();
});
co_await fence_previous_coordinator();
auto cdc_generation_publisher = cdc_generation_publisher_fiber();
auto cdc_streams_gc = cdc_streams_gc_fiber();
auto tablet_load_stats_refresher = start_tablet_load_stats_refresher();
auto gossiper_orphan_remover = gossiper_orphan_remover_fiber();
auto group0_voter_refresher = group0_voter_refresher_fiber();
auto vb_coordinator_fiber = run_view_building_coordinator();
while (!_as.abort_requested()) {
bool sleep = false;
try {
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", utils::wait_for_message(5min));
auto guard = co_await cleanup_group0_config_if_needed(co_await start_operation());
if (_rollback) {
co_await rollback_current_topology_op(std::move(guard));
_rollback = std::nullopt;
continue;
}
bool had_work = co_await handle_topology_transition(std::move(guard));
if (!had_work) {
// Nothing to work on. Wait for topology change event.
rtlogger.debug("topology coordinator fiber has nothing to do. Sleeping.");
co_await await_event();
rtlogger.debug("topology coordinator fiber got an event");
}
co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", utils::wait_for_message(30s));
} catch (...) {
sleep = handle_topology_coordinator_error(std::current_exception());
}
if (sleep) {
try {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (...) {
rtlogger.debug("sleep failed: {}", std::current_exception());
}
}
co_await coroutine::maybe_yield();
}
co_await _async_gate.close();
co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
co_await std::move(cdc_generation_publisher);
co_await std::move(cdc_streams_gc);
co_await std::move(gossiper_orphan_remover);
co_await std::move(group0_voter_refresher);
co_await std::move(vb_coordinator_fiber);
co_await _vb_coordinator->stop();
}
future<> topology_coordinator::stop() {
// if topology_coordinator::run() is aborted either because we are not a
// leader anymore, or we are shutting down as a leader, we have to handle
// futures in _tablets in case any of them failed, before these failures
// are checked by future's destructor.
co_await coroutine::parallel_for_each(_tablets, [] (auto& tablet) -> future<> {
auto& [gid, tablet_state] = tablet;
rtlogger.debug("Checking tablet migration state for {}", gid);
auto stop_background_action = [] (background_action_holder& holder, locator::global_tablet_id gid, std::function<std::string()> fmt_desc) -> future<> {
if (holder) {
try {
co_await std::move(*holder);
} catch (...) {
rtlogger.warn("Tablet '{}' migration failed {}: {}",
gid, fmt_desc(), std::current_exception());
}
}
};
// we should have at most only a single active barrier for each tablet,
// but let's check all of them because we never reset these holders
// once they are added as barriers
for (auto& [stage, barrier]: tablet_state.barriers) {
SCYLLA_ASSERT(barrier.has_value());
co_await stop_background_action(barrier, gid, [stage] { return format("at stage {}", tablet_transition_stage_to_string(stage)); });
}
co_await stop_background_action(tablet_state.streaming, gid, [] { return "during streaming"; });
co_await stop_background_action(tablet_state.cleanup, gid, [] { return "during cleanup"; });
co_await stop_background_action(tablet_state.rebuild_repair, gid, [] { return "during rebuild_repair"; });
co_await stop_background_action(tablet_state.repair, gid, [] { return "during repair"; });
});
}
future<> run_topology_coordinator(
seastar::sharded<db::system_distributed_keyspace>& sys_dist_ks, gms::gossiper& gossiper,
netw::messaging_service& messaging, locator::shared_token_metadata& shared_tm,
db::system_keyspace& sys_ks, replica::database& db, service::raft_group0& group0,
service::topology_state_machine& topo_sm, db::view::view_building_state_machine& vb_sm, seastar::abort_source& as, raft::server& raft,
raft_topology_cmd_handler_type raft_topology_cmd_handler,
tablet_allocator& tablet_allocator,
cdc::generation_service& cdc_gens,
std::chrono::milliseconds ring_delay,
endpoint_lifecycle_notifier& lifecycle_notifier,
gms::feature_service& feature_service,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
topology_coordinator coordinator{
sys_dist_ks, gossiper, messaging, shared_tm,
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
std::move(raft_topology_cmd_handler),
tablet_allocator,
cdc_gens,
ring_delay,
feature_service, lifecycle_notifier,
sl_controller,
topology_cmd_rpc_tracker};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);
try {
rtlogger.info("start topology coordinator fiber");
const bool upgrade_done = co_await coordinator.maybe_run_upgrade();
if (upgrade_done) {
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
return coordinator.run();
});
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
try {
if (raft.is_leader()) {
rtlogger.warn("unhandled exception in topology_coordinator::run: {}; stepping down as a leader", ex);
const auto stepdown_timeout_ticks = std::chrono::seconds(5) / raft_tick_interval;
co_await raft.stepdown(raft::logical_clock::duration(stepdown_timeout_ticks));
}
} catch (...) {
rtlogger.error("failed to step down before aborting: {}", std::current_exception());
}
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
}
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
co_await coordinator.stop();
}
} // namespace service