Files
scylladb/service/storage_service.cc
Pavel Emelyanov f112e42ddd raft: Fix split mutations freeze
Commit faa0ee9844 accidentally broke the way the split snapshot mutation was
frozen -- instead of appending the sub-mutation `m`, the commit kept the
old variable name `mut`, which in the new code refers to the "old",
non-split mutation.

Fixes #29051

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#29052
2026-03-24 08:53:50 +02:00

6301 lines
308 KiB
C++

/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "storage_service.hh"
#include "db/view/view_building_worker.hh"
#include "utils/chunked_vector.hh"
#include <seastar/core/shard_id.hh>
#include "db/view/view_building_coordinator.hh"
#include "utils/disk_space_monitor.hh"
#include "compaction/task_manager_module.hh"
#include "gc_clock.hh"
#include "raft/raft.hh"
#include "auth/cache.hh"
#include <ranges>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sleep.hh>
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service/qos/service_level_controller.hh"
#include "locator/token_metadata.hh"
#include "service/topology_guard.hh"
#include "service/session.hh"
#include "dht/boot_strapper.hh"
#include <chrono>
#include <exception>
#include <optional>
#include <fmt/ranges.h>
#include <seastar/core/sharded.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/as_future.hh>
#include "gms/endpoint_state.hh"
#include "locator/snitch_base.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/consistency_level.hh"
#include <seastar/core/when_all.hh>
#include "service/tablet_allocator.hh"
#include "locator/types.hh"
#include "locator/tablets.hh"
#include "dht/auto_refreshing_sharder.hh"
#include "mutation_writer/multishard_writer.hh"
#include "locator/tablet_metadata_guard.hh"
#include "replica/tablet_mutation_builder.hh"
#include <seastar/core/smp.hh>
#include "mutation/canonical_mutation.hh"
#include "mutation/async_utils.hh"
#include <seastar/core/on_internal_error.hh>
#include "service/strong_consistency/groups_manager.hh"
#include "service/raft/group0_state_machine.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/topology_state_machine.hh"
#include "utils/assert.hh"
#include "utils/UUID.hh"
#include "utils/to_string.hh"
#include "gms/inet_address.hh"
#include "utils/log.hh"
#include "service/migration_manager.hh"
#include "service/raft/raft_group0.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include <seastar/core/thread.hh>
#include <algorithm>
#include "locator/local_strategy.hh"
#include "utils/user_provided_param.hh"
#include "version.hh"
#include "streaming/stream_blob.hh"
#include "dht/range_streamer.hh"
#include <boost/range/algorithm.hpp>
#include <boost/range/join.hpp>
#include "transport/server.hh"
#include <seastar/core/rwlock.hh>
#include "db/batchlog_manager.hh"
#include "db/commitlog/commitlog.hh"
#include "db/hints/manager.hh"
#include "utils/exceptions.hh"
#include "message/messaging_service.hh"
#include "supervisor.hh"
#include "compaction/compaction_manager.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "db/view/view_builder.hh"
#include "replica/database.hh"
#include "replica/tablets.hh"
#include <seastar/core/metrics.hh>
#include "cdc/generation.hh"
#include "cdc/generation_service.hh"
#include "repair/repair.hh"
#include "repair/row_level.hh"
#include "gms/generation-number.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/exception.hh>
#include "utils/pretty_printers.hh"
#include "utils/stall_free.hh"
#include "utils/error_injection.hh"
#include "locator/util.hh"
#include "idl/storage_service.dist.hh"
#include "idl/streaming.dist.hh"
#include "service/storage_proxy.hh"
#include "service/raft/join_node.hh"
#include "idl/join_node.dist.hh"
#include "idl/migration_manager.dist.hh"
#include "idl/node_ops.dist.hh"
#include "transport/protocol_server.hh"
#include "node_ops/node_ops_ctl.hh"
#include "node_ops/task_manager_module.hh"
#include "service/task_manager_module.hh"
#include "service/topology_mutation.hh"
#include "cql3/query_processor.hh"
#include "service/qos/service_level_controller.hh"
#include <csignal>
#include "utils/labels.hh"
#include "view_info.hh"
#include "raft/raft.hh"
#include "debug.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <stdexcept>
#include <unistd.h>
#include <variant>
#include <utility>
// Local aliases for types used throughout this translation unit.
using token = dht::token;
using UUID = utils::UUID;
using inet_address = gms::inet_address;
// CDC logger; defined in another translation unit (the CDC subsystem).
extern logging::logger cdc_log;
namespace service {
static logging::logger slogger("storage_service");
// Shard-local session_manager instance used for topology sessions.
static thread_local session_manager topology_session_manager;
// Returns this shard's topology session manager.
session_manager& get_topology_session_manager() {
return topology_session_manager;
}
namespace {
// Parses a list of node strings (host IDs or endpoints) into a
// host_id_or_endpoint_list. Throws std::runtime_error naming the
// offending entry if any string fails to parse.
[[nodiscard]] locator::host_id_or_endpoint_list string_list_to_endpoint_list(const std::vector<sstring>& src_node_strings) {
    locator::host_id_or_endpoint_list parsed_nodes;
    parsed_nodes.reserve(src_node_strings.size());
    for (const sstring& node_str : src_node_strings) {
        try {
            parsed_nodes.emplace_back(node_str);
        } catch (...) {
            // Wrap the parse failure with the full input list for context.
            throw std::runtime_error(::format("Failed to parse node list: {}: invalid node={}: {}", src_node_strings, node_str, std::current_exception()));
        }
    }
    return parsed_nodes;
}
// Verifies that a Raft group0 RPC is running in the gossip scheduling
// group; reports an internal error (without throwing) otherwise.
// The check is a no-op unless the cluster-wide enforcement feature is on.
void check_raft_rpc_scheduling_group(const replica::database& db, const gms::feature_service& feature_service, const std::string_view rpc_name) {
    const bool enforced = feature_service.enforced_raft_rpc_scheduling_group;
    if (enforced && current_scheduling_group() != debug::gossip_scheduling_group) {
        on_internal_error_noexcept(
            slogger, seastar::format("Raft group0 RPCs should be executed in the gossip scheduling group, current group is [{}], operation [{}].",
            current_scheduling_group().name(), rpc_name));
    }
}
} // namespace
// Upper bound on waiting for the required set of live nodes -- the use
// sites are elsewhere in this file (presumably the join/startup paths;
// confirm at use site).
static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
// Constructor: stores references to all externally owned subsystems the
// storage service coordinates, registers its task-manager modules and
// metrics, and hooks disk-error and snitch-reconfiguration signals.
// No I/O happens here; actual startup is driven later by the
// join/boot code paths.
storage_service::storage_service(abort_source& abort_source,
sharded<replica::database>& db, gms::gossiper& gossiper,
sharded<db::system_keyspace>& sys_ks,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
gms::feature_service& feature_service,
sharded<service::migration_manager>& mm,
locator::shared_token_metadata& stm,
locator::effective_replication_map_factory& erm_factory,
sharded<netw::messaging_service>& ms,
sharded<repair_service>& repair,
sharded<streaming::stream_manager>& stream_manager,
endpoint_lifecycle_notifier& elc_notif,
sharded<db::batchlog_manager>& bm,
sharded<locator::snitch_ptr>& snitch,
sharded<service::tablet_allocator>& tablet_allocator,
sharded<cdc::generation_service>& cdc_gens,
sharded<db::view::view_builder>& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
cql3::query_processor& qp,
sharded<qos::service_level_controller>& sl_controller,
auth::cache& auth_cache,
sharded<client_routes_service>& client_routes,
topology_state_machine& topology_state_machine,
db::view::view_building_state_machine& view_building_state_machine,
tasks::task_manager& tm,
gms::gossip_address_map& address_map,
std::function<future<void>(std::string_view)> compression_dictionary_updated_callback,
utils::disk_space_monitor* disk_space_monitor,
strong_consistency::groups_manager& groups_manager
)
: _abort_source(abort_source)
, _feature_service(feature_service)
, _db(db)
, _gossiper(gossiper)
, _messaging(ms)
, _migration_manager(mm)
, _qp(qp)
, _repair(repair)
, _stream_manager(stream_manager)
, _snitch(snitch)
, _sl_controller(sl_controller)
, _auth_cache(auth_cache)
, _client_routes(client_routes)
, _group0(nullptr)
, _async_gate("storage_service")
, _node_ops_module(make_shared<node_ops::task_manager_module>(tm, *this))
, _tablets_module(make_shared<service::task_manager_module>(tm, *this))
, _global_topology_requests_module(make_shared<service::topo::task_manager_module>(tm))
, _address_map(address_map)
, _shared_token_metadata(stm)
, _erm_factory(erm_factory)
, _lifecycle_notifier(elc_notif)
, _batchlog_manager(bm)
, _sys_ks(sys_ks)
, _sys_dist_ks(sys_dist_ks)
// Snitch reconfiguration is always funneled to shard 0.
, _snitch_reconfigure([this] {
return container().invoke_on(0, [] (auto& ss) {
return ss.snitch_reconfigured();
});
})
, _tablet_allocator(tablet_allocator)
, _cdc_gens(cdc_gens)
, _view_builder(view_builder)
, _view_building_worker(view_building_worker)
, _topology_state_machine(topology_state_machine)
, _view_building_state_machine(view_building_state_machine)
, _compression_dictionary_updated_callback(std::move(compression_dictionary_updated_callback))
, _disk_space_monitor(disk_space_monitor)
, _groups_manager(groups_manager)
{
// Expose the node-ops / tablets / global-topology-requests modules
// through the task manager on every shard.
tm.register_module(_node_ops_module->get_name(), _node_ops_module);
tm.register_module(_tablets_module->get_name(), _tablets_module);
tm.register_module(_global_topology_requests_module->get_name(), _global_topology_requests_module);
// Virtual tasks represent cluster-wide operations; create them only
// once, on shard 0.
if (this_shard_id() == 0) {
_node_ops_module->make_virtual_task<node_ops::node_ops_virtual_task>(*this);
_tablets_module->make_virtual_task<service::tablet_virtual_task>(*this);
_global_topology_requests_module->make_virtual_task<service::topo::global_topology_request_virtual_task>(*this);
}
register_metrics();
// Isolate this node when the storage layer reports sstable read/write,
// general disk, or commitlog errors (see do_isolate_on_error).
_listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_read_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
_listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_write_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
_listeners.emplace_back(make_lw_shared(bs2::scoped_connection(general_disk_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
_listeners.emplace_back(make_lw_shared(bs2::scoped_connection(commit_error.connect([this] { do_isolate_on_error(disk_error::commit); }))));
// Re-run snitch-dependent setup whenever the snitch is reconfigured.
if (_snitch.local_is_initialized()) {
_listeners.emplace_back(make_lw_shared(_snitch.local()->when_reconfigured(_snitch_reconfigure)));
}
init_messaging_service();
_migration_manager.local().plug_storage_service(*this);
}
// Defaulted out of line -- presumably so member types that are only
// forward-declared in the header can be destroyed here.
storage_service::~storage_service() = default;
// Accessor for the node-ops task manager module created in the constructor.
node_ops::task_manager_module& storage_service::get_node_ops_module() noexcept {
return *_node_ops_module;
}
// Accessor for the auth cache supplied at construction.
auth::cache& storage_service::auth_cache() noexcept {
return _auth_cache;
}
// Externally visible node status, exposed via the "operation_mode"
// metric (see register_metrics). The numeric values are part of the
// external monitoring interface -- do not renumber existing entries.
enum class node_external_status {
UNKNOWN = 0,
STARTING = 1,
JOINING = 2,
NORMAL = 3,
LEAVING = 4,
DECOMMISSIONED = 5,
DRAINING = 6,
DRAINED = 7,
MOVING = 8, //deprecated
MAINTENANCE = 9
};
// Translates the internal storage_service operation mode into the
// externally exposed node status enumeration. NONE maps to STARTING and
// BOOTSTRAP to JOINING; every other mode maps to its namesake.
static node_external_status map_operation_mode(storage_service::mode m) {
    using ext = node_external_status;
    using internal = storage_service::mode;
    switch (m) {
    case internal::NONE:           return ext::STARTING;
    case internal::STARTING:       return ext::STARTING;
    case internal::BOOTSTRAP:      return ext::JOINING;
    case internal::JOINING:        return ext::JOINING;
    case internal::NORMAL:         return ext::NORMAL;
    case internal::LEAVING:        return ext::LEAVING;
    case internal::DECOMMISSIONED: return ext::DECOMMISSIONED;
    case internal::DRAINING:       return ext::DRAINING;
    case internal::DRAINED:        return ext::DRAINED;
    case internal::MOVING:         return ext::MOVING;
    case internal::MAINTENANCE:    return ext::MAINTENANCE;
    }
    // Unreachable for valid enumerators; gives a defined answer for
    // out-of-range values and silences "no return" warnings.
    return ext::UNKNOWN;
}
// Registers the per-node "operation_mode" gauge. The gauge is node-wide
// rather than per-shard, so only shard 0 registers it.
void storage_service::register_metrics() {
if (this_shard_id() != 0) {
// the relevant data is distributed between the shards,
// We only need to register it once.
return;
}
namespace sm = seastar::metrics;
_metrics.add_group("node", {
sm::make_gauge("operation_mode", sm::description("The operation mode of the current node. UNKNOWN = 0, STARTING = 1, JOINING = 2, NORMAL = 3, "
"LEAVING = 4, DECOMMISSIONED = 5, DRAINING = 6, DRAINED = 7, MOVING = 8, MAINTENANCE = 9"), [this] {
// Report the stable external numbering, not the raw internal mode value.
return static_cast<std::underlying_type_t<node_external_status>>(map_operation_mode(_operation_mode));
})(basic_level),
});
}
// Returns true when this node was started in order to replace another
// node. The first-boot options are ignored (with an informational log)
// once bootstrap has completed; the legacy replace_address option is
// passed through as-is -- validation of its misuse after bootstrap
// happens later in the startup path.
bool storage_service::is_replacing() {
    const auto& cfg = _db.local().get_config();
    if (!cfg.replace_node_first_boot().empty()) {
        const bool already_bootstrapped = _sys_ks.local().bootstrap_complete();
        if (already_bootstrapped) {
            slogger.info("Replace node on first boot requested; this node is already bootstrapped");
        }
        return !already_bootstrapped;
    }
    if (!cfg.replace_address_first_boot().empty()) {
        const bool already_bootstrapped = _sys_ks.local().bootstrap_complete();
        if (already_bootstrapped) {
            slogger.info("Replace address on first boot requested; this node is already bootstrapped");
        }
        return !already_bootstrapped;
    }
    // Returning true if cfg.replace_address is provided
    // will trigger an exception down the road if bootstrap_complete(),
    // as it is an error to use this option post bootstrap.
    // That said, we should just stop supporting it and force users
    // to move to the new, replace_node_first_boot config option.
    return !cfg.replace_address().empty();
}
// Returns true iff this node is the designated "first node" of the
// cluster -- the one that skips bootstrap. A replacing node is never
// first; with no seeds configured nobody is first.
//
// Improvement: the original copied all seeds into a std::vector and
// fully sorted it (O(n log n) plus an allocation) only to look at the
// front element. std::min_element finds the same smallest address in
// O(n) with no copy.
bool storage_service::is_first_node() {
    if (is_replacing()) {
        return false;
    }
    const auto seeds = _gossiper.get_seeds();
    if (seeds.empty()) {
        return false;
    }
    // Node with the smallest IP address is chosen as the very first node
    // in the cluster. The first node is the only node that does not
    // bootstrap in the cluster. All other nodes will bootstrap.
    const auto smallest_seed = *std::min_element(seeds.begin(), seeds.end());
    if (smallest_seed == get_broadcast_address()) {
        slogger.info("I am the first node in the cluster. Skip bootstrap. Node={}", get_broadcast_address());
        return true;
    }
    return false;
}
// A node bootstraps unless it already completed bootstrap, or it is the
// cluster's very first node (which has nobody to stream from).
bool storage_service::should_bootstrap() {
    if (_sys_ks.local().bootstrap_complete()) {
        return false;
    }
    return !is_first_node();
}
/* Publishes STATUS=NORMAL through gossip; the chosen tokens are carried
 * inside the versioned STATUS value (versioned_value::normal(tokens)).
 *
 * Assumes that no other functions modify CDC_GENERATION_ID, TOKENS or STATUS
 * in the gossiper's local application state while this function runs.
 */
static future<> set_gossip_tokens(gms::gossiper& g,
const std::unordered_set<dht::token>& tokens) {
return g.add_local_application_state(std::pair(gms::application_state::STATUS, gms::versioned_value::normal(tokens)));
}
// Maps a raft-topology node_state onto the locator topology node state.
// Note that a rebuilding node counts as normal from the token ring's
// point of view; an unhandled value is an internal error.
static locator::node::state to_topology_node_state(node_state ns) {
    using ls = locator::node::state;
    switch (ns) {
    case node_state::bootstrapping:   return ls::bootstrapping;
    case node_state::decommissioning: return ls::being_decommissioned;
    case node_state::removing:        return ls::being_removed;
    case node_state::normal:          return ls::normal;
    case node_state::left:            return ls::left;
    case node_state::replacing:       return ls::replacing;
    case node_state::rebuilding:      return ls::normal;
    case node_state::none:            return ls::none;
    }
    on_internal_error(rtlogger, format("unhandled node state: {}", ns));
}
// Updates the persisted id<->ip bookkeeping (system.peers) for a single
// node `id` whose current IP is `ip`, according to the node's state in
// the raft topology. `host_id_to_ip_map` is the mapping previously read
// from system.peers; it is used to detect IP changes (normal state) and
// replace-with-same-IP (replacing state). When `nodes_to_notify` is
// non-null, nodes that (re)joined are appended to it so listeners can
// be notified after the sync completes.
future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& host_id_to_ip_map, nodes_to_notify_after_sync* nodes_to_notify) {
const auto& t = _topology_state_machine._topology;
raft::server_id raft_id{id.uuid()};
std::vector<future<>> sys_ks_futures;
auto node = t.find(raft_id);
if (!node) {
// Not part of the raft topology -- nothing to persist.
co_return;
}
const auto& rs = node->second;
switch (rs.state) {
case node_state::normal: {
if (is_me(id)) {
co_return;
}
// In replace-with-same-ip scenario the replaced node IP will be the same
// as ours, we shouldn't put it into system.peers.
// Some state that is used to fill in 'peers' table is still propagated over gossiper.
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
// for the node were not in the 'normal' state yet
auto info = get_peer_info_for_update(id);
if (info) {
// And then amend with the info from raft
info->tokens = rs.ring.value().tokens;
info->data_center = rs.datacenter;
info->rack = rs.rack;
info->release_version = rs.release_version;
info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, *info));
}
if (nodes_to_notify) {
nodes_to_notify->joined.emplace_back(ip, id);
}
// If the node's IP changed, delete the stale system.peers row keyed
// by the old IP.
if (const auto it = host_id_to_ip_map.find(id); it != host_id_to_ip_map.end() && it->second != ip) {
utils::get_local_injector().inject("crash-before-prev-ip-removed", [] {
slogger.info("crash-before-prev-ip-removed hit, killing the node");
_exit(1);
});
auto old_ip = it->second;
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
}
}
break;
case node_state::bootstrapping:
if (!is_me(ip)) {
utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
_exit(1);
});
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
}
break;
case node_state::replacing:
// Save the mapping just like for bootstrap above, but only in the case of replace with different IP, so
// that we don't have to delete the row of the node being replaced. For replace with the same IP, the
// mapping is recovered on restart based on the state of the topology state machine and system.peers.
if (!is_me(ip)) {
auto replaced_id = std::get<replace_param>(t.req_param.at(raft_id)).replaced_id;
if (const auto it = host_id_to_ip_map.find(locator::host_id(replaced_id.uuid())); it == host_id_to_ip_map.end() || it->second != ip) {
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
}
}
break;
default:
// Other node states require no peers-table changes here.
break;
}
// The system-keyspace updates above were started concurrently; wait for
// all of them (failing if any one failed).
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
}
// Collects hosts that are gone from the cluster (left or ignored in the
// raft topology) and are no longer present in the token metadata's
// topology -- i.e. nodes whose resources may be released.
static std::unordered_set<locator::host_id> get_released_nodes(const service::topology& topology, const locator::token_metadata& tm) {
    std::unordered_set<locator::host_id> released;
    for (const auto& raft_id : boost::join(topology.left_nodes, topology.ignored_nodes)) {
        const locator::host_id host{raft_id.uuid()};
        if (!tm.get_topology().has_node(host)) {
            released.insert(host);
        }
    }
    return released;
}
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
//
// `tmptr` is the mutable token metadata being rebuilt. `prev_normal` is
// the set of nodes that were 'normal' before this reload -- used to
// decide which join notifications to emit. `prev_released`, when
// engaged, is the set of nodes already reported as released, so that
// only newly-released nodes are queued for notification.
// Returns the accumulated notification lists; the caller passes them to
// notify_nodes_after_sync() after the new token metadata is published.
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal, std::optional<std::unordered_set<locator::host_id>> prev_released) {
nodes_to_notify_after_sync nodes_to_notify;
rtlogger.trace("Start sync_raft_topology_nodes");
const auto& t = _topology_state_machine._topology;
// Records a node's DC/rack, state and shard count in the new token metadata.
auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
to_topology_node_state(rs.state), rs.shard_count);
};
std::vector<future<>> sys_ks_futures;
// A node that left the cluster: drop its peers row and gossiper state.
auto process_left_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, bool notify) -> future<> {
if (ip) {
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(*ip));
co_await _gossiper.force_remove_endpoint(host_id, gms::null_permit_id);
if (notify) {
nodes_to_notify.left.push_back({*ip, host_id});
}
}
if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
update_topology(host_id, t.left_nodes_rs.at(id));
}
};
// A 'normal' node: persist its tokens (for this node itself also
// publish STATUS=NORMAL over gossip) and insert them into the ring.
auto process_normal_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
// NOTE(review): the format string below has 8 placeholders but 9
// arguments -- rs.cleanup is never printed (fmt ignores the extra
// argument). Confirm whether a trailing "cleanup={}" was intended.
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
// Save tokens, not needed for raft topology management, but needed by legacy
// Also ip -> id mapping is needed for address map recreation on reboot
if (is_me(host_id)) {
sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
co_await _gossiper.add_local_application_state(std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens)));
}
update_topology(host_id, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
};
// A node mid-operation: update the ring according to the specific
// transition (bootstrap / decommission / remove / replace / rebuild).
auto process_transition_node = [&](raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
seastar::value_of([&] () -> sstring {
return rs.ring ? ::format("{}", rs.ring->tokens) : sstring("null");
}));
switch (rs.state) {
case node_state::bootstrapping:
if (rs.ring.has_value()) {
update_topology(host_id, rs);
if (_topology_state_machine._topology.normal_nodes.empty()) {
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
// (such as the CDC generation write).
// It doesn't break anything to set the tokens to normal early in this single-node case.
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
} else {
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
}
}
break;
case node_state::decommissioning:
[[fallthrough]];
case node_state::removing:
// A decommissioning or removing node loses its tokens when topology moves to left_token_ring.
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
if (rs.state == node_state::removing && !_feature_service.removenode_with_left_token_ring) {
on_internal_error(
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
}
break;
}
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
// no need for double writes anymore since op failed
co_await process_normal_node(id, host_id, ip, rs);
break;
}
// Still owns its tokens, but is marked as leaving for write targeting.
update_topology(host_id, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
tmptr->add_leaving_endpoint(host_id);
co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip));
break;
case node_state::replacing: {
if (!_topology_state_machine._topology.req_param.contains(id)) {
on_internal_error(rtlogger, format("No request parameters for replacing node {}", id));
}
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
if (rs.ring.has_value()) {
update_topology(host_id, rs);
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, existing_ip.value_or(gms::inet_address{}), id, ip));
}
}
break;
case node_state::rebuilding:
// Rebuilding node is normal
co_await process_normal_node(id, host_id, ip, rs);
break;
default:
on_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id));
}
};
sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
auto id_to_ip_map = co_await _sys_ks.local().get_host_id_to_ip_map();
for (const auto& id: t.left_nodes) {
locator::host_id host_id{id.uuid()};
auto ip = _address_map.find(host_id);
// Notify about a left node only if we still had a peers-table row for it.
co_await process_left_node(id, host_id, ip, id_to_ip_map.find(host_id) != id_to_ip_map.end());
}
for (const auto& [id, rs]: t.normal_nodes) {
locator::host_id host_id{id.uuid()};
auto ip = _address_map.find(host_id);
co_await process_normal_node(id, host_id, ip, rs);
if (ip) {
auto it = id_to_ip_map.find(host_id);
// Emit a join notification only when something changed: no previous
// peers row, a different IP, or the node only just became normal.
bool notify = it == id_to_ip_map.end() || it->second != ip || !prev_normal.contains(id);
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, notify ? &nodes_to_notify : nullptr));
}
}
for (const auto& [id, rs]: t.transition_nodes) {
locator::host_id host_id{id.uuid()};
auto ip = _address_map.find(host_id);
co_await process_transition_node(id, host_id, ip, rs);
if (ip) {
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr));
}
}
// Mark nodes excluded from tablet placement.
for (auto id : t.excluded_tablet_nodes) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
if (n) {
n->set_excluded(true);
}
}
// Nodes with a pending leave/remove request are marked as draining.
for (auto [node, req] : t.requests) {
if (req == topology_request::leave || req == topology_request::remove) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(node.uuid()));
if (n) {
n->set_draining(true);
}
}
}
if (prev_released) {
// Queue notifications only for nodes released since the previous reload.
auto nodes_to_release = get_released_nodes(t, *tmptr);
std::erase_if(nodes_to_release, [&] (const auto& host_id) { return prev_released->contains(host_id); });
std::copy(nodes_to_release.begin(), nodes_to_release.end(), std::back_inserter(nodes_to_notify.released));
}
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
rtlogger.trace("End sync_raft_topology_nodes");
co_return nodes_to_notify;
}
// Delivers the notifications accumulated by sync_raft_topology_nodes,
// in the order the lists are stored: released nodes first, then nodes
// that left, then nodes that joined. Each notification is awaited
// before the next one is issued.
future<> storage_service::notify_nodes_after_sync(nodes_to_notify_after_sync&& notifications) {
    for (const auto& host_id : notifications.released) {
        co_await notify_released(host_id);
    }
    for (const auto& [ip, host_id] : notifications.left) {
        co_await notify_left(ip, host_id);
    }
    for (const auto& [ip, host_id] : notifications.joined) {
        co_await notify_joined(ip, host_id);
    }
}
// Reloads the raft topology state from disk and rebuilds this node's
// derived state from it: token metadata (including tablets), enabled
// features, system tables, gossiper state, the fence version, committed
// CDC generations, and host bans for departed nodes. `hint` optionally
// narrows the work (e.g. incremental tablet-metadata updates).
// Not re-entrant (asserted in debug builds).
future<> storage_service::topology_state_load(state_change_hint hint) {
#ifdef SEASTAR_DEBUG
static bool running = false;
SCYLLA_ASSERT(!running); // The function is not re-entrant
auto d = defer([] {
running = false;
});
running = true;
#endif
co_await utils::get_local_injector().inject("topology_state_load_error", [] {
return std::make_exception_ptr(std::runtime_error("topology_state_load_error"));
});
rtlogger.debug("reload raft topology state");
// Snapshot the pre-reload view so we can later notify only about changes.
std::unordered_set<raft::server_id> prev_normal = _topology_state_machine._topology.normal_nodes | std::views::keys | std::ranges::to<std::unordered_set>();
std::optional<std::unordered_set<locator::host_id>> prev_released;
if (!_topology_state_machine._topology.is_empty()) {
prev_released = get_released_nodes(_topology_state_machine._topology, get_token_metadata());
}
std::unordered_set<locator::host_id> tablet_hosts = co_await replica::read_required_hosts(_qp);
// read topology state from disk and recreate token_metadata from it
_topology_state_machine._topology = co_await _sys_ks.local().load_topology_state(tablet_hosts);
_topology_state_machine.reload_count++;
auto& topology = _topology_state_machine._topology;
// the view_builder is migrated to v2 in view_builder::migrate_to_v2.
// it writes a v2 version mutation as topology_change, then we get here
// to update the service to start using the v2 table.
auto view_builder_version = co_await _sys_ks.local().get_view_builder_version();
switch (view_builder_version) {
case db::system_keyspace::view_builder_version_t::v1_5:
co_await _view_builder.invoke_on_all([] (db::view::view_builder& vb) -> future<> {
co_await vb.upgrade_to_v1_5();
});
break;
case db::system_keyspace::view_builder_version_t::v2:
co_await _view_builder.invoke_on_all([] (db::view::view_builder& vb) -> future<> {
co_await vb.upgrade_to_v2();
});
break;
default:
break;
}
// Enable every cluster feature recorded in the topology, on all shards.
co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) {
return fs.enable(topology.enabled_features | std::ranges::to<std::set<std::string_view>>());
});
// Update the legacy `enabled_features` key in `system.scylla_local`.
// It's OK to update it after enabling features because `system.topology` now
// is the source of truth about enabled features.
co_await _sys_ks.local().save_local_enabled_features(topology.enabled_features, false);
auto saved_tmpr = get_token_metadata_ptr();
{
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = _shared_token_metadata.make_token_metadata_ptr();
tmptr->invalidate_cached_rings();
tmptr->set_version(topology.version);
// Only write_both_read_new directs reads to the new replicas; every
// other transition state (or no transition) keeps reading from the old ones.
const auto read_new = std::invoke([](std::optional<topology::transition_state> state) {
using read_new_t = locator::token_metadata::read_new_t;
if (!state.has_value()) {
return read_new_t::no;
}
switch (*state) {
case topology::transition_state::lock:
[[fallthrough]];
case topology::transition_state::join_group0:
[[fallthrough]];
case topology::transition_state::tablet_migration:
[[fallthrough]];
case topology::transition_state::tablet_split_finalization:
[[fallthrough]];
case topology::transition_state::tablet_resize_finalization:
[[fallthrough]];
case topology::transition_state::commit_cdc_generation:
[[fallthrough]];
case topology::transition_state::tablet_draining:
[[fallthrough]];
case topology::transition_state::write_both_read_old:
[[fallthrough]];
case topology::transition_state::left_token_ring:
[[fallthrough]];
case topology::transition_state::truncate_table:
[[fallthrough]];
case topology::transition_state::snapshot_tables:
[[fallthrough]];
case topology::transition_state::rollback_to_normal:
return read_new_t::no;
case topology::transition_state::write_both_read_new:
return read_new_t::yes;
}
}, topology.tstate);
tmptr->set_read_new(read_new);
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal), std::move(prev_released));
std::optional<locator::tablet_metadata> tablets;
if (hint.tablets_hint) {
// We want to update the tablet metadata incrementally, so copy it
// from the current token metadata and update only the changed parts.
tablets = co_await get_token_metadata().tablets().copy();
co_await replica::update_tablet_metadata(_db.local(), _qp, *tablets, *hint.tablets_hint);
} else {
// No hint -- reload the complete tablet metadata from disk.
tablets = co_await replica::read_tablet_metadata(_qp);
}
tablets->set_balancing_enabled(topology.tablet_balancing_enabled);
tmptr->set_tablets(std::move(*tablets));
if (_feature_service.parallel_tablet_draining) {
// Leave/remove requests for nodes that still hold tablet replicas
// must wait for draining; record them as paused.
for (auto&& [node, req]: topology.requests) {
if (req == topology_request::leave || req == topology_request::remove) {
if (tmptr->tablets().has_replica_on(locator::host_id(node.uuid()))) {
topology.paused_requests.emplace(node, req);
}
}
}
}
co_await replicate_to_all_cores(std::move(tmptr));
co_await notify_nodes_after_sync(std::move(nodes_to_notify));
rtlogger.debug("topology_state_load: token metadata replication to all cores finished");
}
co_await update_fence_version(topology.fence_version);
// As soon as a node joins token_metadata.topology we
// need to drop all its rpc connections with ignored_topology flag.
{
std::vector<future<>> futures;
get_token_metadata_ptr()->get_topology().for_each_node([&](const locator::node& n) {
const auto ep = n.host_id();
if (auto ip_opt = _address_map.find(ep); ip_opt && !saved_tmpr->get_topology().has_node(ep)) {
futures.push_back(remove_rpc_client_with_ignored_topology(*ip_opt, n.host_id()));
}
});
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
// Apply each committed CDC generation, in commit order.
for (const auto& gen_id : topology.committed_cdc_generations) {
rtlogger.trace("topology_state_load: process committed cdc generation {}", gen_id);
co_await utils::get_local_injector().inject("topology_state_load_before_update_cdc", [](auto& handler) -> future<> {
rtlogger.info("topology_state_load_before_update_cdc hit, wait for message");
co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
});
co_await _cdc_gens.local().handle_cdc_generation(gen_id);
if (gen_id == topology.committed_cdc_generations.back()) {
rtlogger.debug("topology_state_load: the last committed CDC generation ID: {}", gen_id);
}
}
// Ban all left and ignored nodes. We do not allow them to go back online.
co_await _messaging.local().ban_hosts(boost::join(topology.left_nodes, topology.ignored_nodes)
| std::views::transform([] (auto id) { return locator::host_id{id.uuid()}; })
| std::ranges::to<utils::chunked_vector<locator::host_id>>());
slogger.debug("topology_state_load: excluded tablet nodes: {}", topology.excluded_tablet_nodes);
}
future<> storage_service::topology_transition(state_change_hint hint) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    // Re-read the persisted topology state into memory, then wake up every
    // fiber that is blocked waiting for a topology change.
    co_await topology_state_load(std::move(hint));
    _topology_state_machine.event.broadcast();
}
future<> storage_service::view_building_state_load() {
    rtlogger.debug("reload view building state");

    // Only views living in tablet-based keyspaces participate in the
    // coordinated view-building state machine.
    auto is_tablet_keyspace = [this] (std::string_view ks_name) {
        // If the keyspace doesn't exist, also filter it out.
        // It should be entry from vnode view, which hasn't been cleaned up yet.
        // Entries from tablet-views should be removed in the same batch as drop keyspace/view mutations.
        return _db.local().has_keyspace(ks_name) && _db.local().find_keyspace(ks_name).uses_tablets();
    };

    auto vb_tasks = co_await _sys_ks.local().get_view_building_tasks();
    auto processing_base_table = co_await _sys_ks.local().get_view_building_processing_base_id();

    // Group the surviving views by the id of their base table.
    std::map<table_id, std::vector<table_id>> views_per_base;
    for (const view_ptr& v : _db.local().get_views()) {
        if (is_tablet_keyspace(v->ks_name())) {
            views_per_base[v->view_info()->base_id()].push_back(v->id());
        }
    }

    // Re-key the build status map from (ks_name, view_name) to table_id,
    // dropping entries for filtered-out keyspaces.
    auto raw_status_map = co_await _sys_ks.local().get_view_build_status_map();
    db::view::views_state::view_build_status_map status_map;
    for (auto& [name_key, status] : raw_status_map) {
        if (!is_tablet_keyspace(name_key.first)) {
            continue;
        }
        auto id = _db.local().find_schema(name_key.first, name_key.second)->id();
        status_map.emplace(id, std::move(status));
    }

    db::view::view_building_state building_state {std::move(vb_tasks), std::move(processing_base_table)};
    db::view::views_state views_state {std::move(views_per_base), std::move(status_map)};
    _view_building_state_machine.building_state = std::move(building_state);
    _view_building_state_machine.views_state = std::move(views_state);
}
future<> storage_service::view_building_transition() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    // Refresh the in-memory view-building state from the system tables and
    // notify all waiters of the change.
    co_await view_building_state_load();
    _view_building_state_machine.event.broadcast();
}
future<> storage_service::reload_raft_topology_state(service::raft_group0_client& group0_client) {
    slogger.info("Waiting for group 0 read/apply mutex before reloading Raft topology state...");
    // Holding the read/apply mutex guarantees that no group 0 command is
    // applied concurrently with the state reload.
    auto mutex_holder = co_await group0_client.hold_read_apply_mutex(_abort_source);
    slogger.info("Reloading Raft topology state");
    // topology_transition() (rather than topology_state_load()) is used on
    // purpose: it additionally broadcasts the change to listeners.
    co_await topology_transition();
    slogger.info("Reloaded Raft topology state");
}
// Applies the mutations carried by a raft group 0 snapshot to the local
// database. cdc_generations_v3 mutations are applied first and
// non-atomically (they may be large and are inert until referenced from the
// topology table); the remaining topology/topology_requests mutations are
// then applied atomically so the node has a consistent state after restart.
future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
    // Partition so that all non-cdc_generations_v3 mutations come first;
    // `it` marks the beginning of the cdc_generations_v3 part.
    auto it = std::partition(snp.mutations.begin(), snp.mutations.end(), [] (const canonical_mutation& m) {
        return m.column_family_id() != db::system_keyspace::cdc_generations_v3()->id();
    });
    if (it != snp.mutations.end()) {
        auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
        // Split big mutations into smaller ones, prepare frozen_muts_to_apply
        utils::chunked_vector<frozen_mutation> frozen_muts_to_apply;
        {
            frozen_muts_to_apply.reserve(std::distance(it, snp.mutations.end()));
            // Cap each piece at half the commitlog record size so a single
            // apply cannot overflow a commitlog segment.
            const auto max_size = _db.local().schema_commitlog()->max_record_size() / 2;
            for (auto i = it; i != snp.mutations.end(); i++) {
                const auto& m = *i;
                auto mut = co_await to_mutation_gently(m, s);
                if (m.representation().size() <= max_size) {
                    frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
                } else {
                    // Too large for one record: split and freeze each
                    // sub-mutation `m` separately (see issue #29051 -- the
                    // sub-mutation, not the original `mut`, must be frozen).
                    co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> {
                        frozen_muts_to_apply.push_back(co_await freeze_gently(m));
                    });
                }
            }
        }
        // Apply non-atomically so as not to hit the commitlog size limit.
        // The cdc_generations_v3 data is not used in any way until
        // it's referenced from the topology table.
        // By applying the cdc_generations_v3 mutations before topology mutations
        // we ensure that the lack of atomicity isn't a problem here.
        co_await max_concurrent_for_each(frozen_muts_to_apply, 128, [&] (const frozen_mutation& m) -> future<> {
            return _db.local().apply(s, m, {}, db::commitlog::force_sync::yes, db::no_timeout);
        });
    }
    // Apply system.topology and system.topology_requests mutations atomically
    // to have a consistent state after restart
    utils::chunked_vector<frozen_mutation> muts;
    muts.reserve(std::distance(snp.mutations.begin(), it));
    for (auto cur = snp.mutations.begin(); cur != it; ++cur) {
        const auto& m = *cur;
        auto s = _db.local().find_schema(m.column_family_id());
        // FIXME: in theory, we can generate a frozen_mutation
        // directly from canonical_mutation rather than building
        // a mutation and then freezing it.
        muts.emplace_back(freeze(m.to_mutation(s)));
        co_await coroutine::maybe_yield();
    }
    co_await _db.local().apply(muts, db::no_timeout);
}
future<> storage_service::update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache, qos::query_context ctx) {
    // Service-level cache refreshes are driven from shard 0 only.
    SCYLLA_ASSERT(this_shard_id() == 0);
    co_return co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx);
}
future<> storage_service::compression_dictionary_updated_callback_all() {
    // Re-run the per-dictionary callback for every dictionary name known to
    // the system keyspace, sequentially.
    auto dict_names = co_await _sys_ks.local().query_all_dict_names();
    for (const auto& dict_name : dict_names) {
        co_await _compression_dictionary_updated_callback(dict_name);
    }
}
// Invoked when a single compression dictionary changes; forwards to the
// registered callback. Must run on shard 0.
future<> storage_service::compression_dictionary_updated_callback(std::string_view name) {
    // Use SCYLLA_ASSERT for consistency with the rest of this file
    // (plain assert() compiles out in release builds).
    SCYLLA_ASSERT(this_shard_id() == 0);
    return _compression_dictionary_updated_callback(name);
}
future<> storage_service::load_cdc_streams(std::optional<std::unordered_set<table_id>> changed_tables) {
    // Forward to the shard-local CDC generation service.
    co_return co_await _cdc_gens.local().load_cdc_tablet_streams(std::move(changed_tables));
}
// Moves the coroutine lambda onto the heap and extends its
// lifetime until the resulting future is completed.
// This allows to use captures in coroutine lambda after co_await-s.
// Without this helper the coroutine lambda is destroyed immediately after
// the caller (e.g. 'then' function implementation) has invoked it and got the future,
// so referencing the captures after co_await would be use-after-free.
template <typename Coro>
static auto ensure_alive(Coro&& coro) {
    // Decay the deduced type so the helper also works when passed an lvalue
    // (with the old make_unique<Coro>(std::move(coro)), an lvalue argument
    // deduced Coro as T& and failed to compile); forward instead of move so
    // lvalues are copied and rvalues are moved.
    using coro_t = std::decay_t<Coro>;
    return [coro_ptr = std::make_unique<coro_t>(std::forward<Coro>(coro))](auto&&... args) mutable {
        auto& coro = *coro_ptr;
        // Invoke first, then hand ownership of the callable to the
        // continuation so it stays alive until the future resolves.
        return coro(std::forward<decltype(args)>(args)...).finally([coro_ptr = std::move(coro_ptr)] {});
    };
}
// {{{ ip_address_updater
// Gossiper subscriber that keeps the host_id <-> IP mappings used by the
// Raft topology code (system.peers, token_metadata, effective replication
// maps) in sync with addresses learned via gossip.
class storage_service::ip_address_updater: public gms::i_endpoint_state_change_subscriber {
    gms::gossip_address_map& _address_map;
    storage_service& _ss;
    // Common handler for on_join/on_alive/on_restart; `ev` is only used for
    // logging which event triggered the update.
    future<>
    on_endpoint_change(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id, const char* ev) {
        rslog.debug("ip_address_updater::on_endpoint_change({}) {} {}", ev, endpoint, id);
        // If id maps to different ip in peers table it needs to be updated which is done by sync_raft_topology_nodes below
        std::optional<gms::inet_address> prev_ip = co_await _ss._sys_ks.local().get_ip_from_peers_table(id);
        if (_address_map.find(id) != endpoint) {
            // Address map refused to update IP for the host_id,
            // this means prev_ip has higher generation than endpoint.
            // Do not update address.
            co_return;
        }
        // If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
        rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
                    "old ip [{}], new ip [{}], "
                    "waiting for group 0 read/apply mutex before reloading Raft topology state...",
                    ev, id, prev_ip, endpoint);
        // We're in a gossiper event handler, so gossiper is currently holding a lock
        // for the endpoint parameter of on_endpoint_change.
        // The topology_state_load function can also try to acquire gossiper locks.
        // If we call sync_raft_topology_nodes here directly, a gossiper lock and
        // the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock.
        // To avoid this, we don't wait for sync_raft_topology_nodes to finish.
        (void)futurize_invoke(ensure_alive([this, id, prev_ip, endpoint, h = _ss._async_gate.hold()]() -> future<> {
            auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
            co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
            // We need to call raft_topology_update_ip even if ip hasn't changed.
            // Suppose a bootstrapping node A appears in the system.peers table of
            // some other node B. Its record has only ID and IP of the node A, due to
            // the special handling of bootstrapping nodes in raft_topology_update_ip.
            // Suppose node B gets temporarily isolated from the topology coordinator.
            // The topology coordinator fences out node B and successfully finishes
            // bootstrapping of the node A. Later, when the connectivity is restored,
            // topology_state_load runs on the node B, node A is already in
            // normal state, but the gossiper on B might not yet have any state for
            // it. In this case, raft_topology_update_ip would not update
            // system.peers because the gossiper state is missing. Subsequently,
            // on_join/on_restart/on_alive events would skip updates because the IP
            // in gossiper matches the IP for that node in system.peers.
            //
            // If ip hasn't changed we set nodes_to_notify to nullptr since
            // we don't need join events in this case.
            nodes_to_notify_after_sync nodes_to_notify;
            co_await _ss.raft_topology_update_ip(id, endpoint,
                co_await _ss._sys_ks.local().get_host_id_to_ip_map(),
                prev_ip == endpoint ? nullptr : &nodes_to_notify);
            co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
        }));
    }
public:
    ip_address_updater(gms::gossip_address_map& address_map, storage_service& ss)
        : _address_map(address_map)
        , _ss(ss)
    {}
    virtual future<>
    on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
        return on_endpoint_change(endpoint, id, ep_state, permit_id, "on_join");
    }
    virtual future<>
    on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
        return on_endpoint_change(endpoint, id, ep_state, permit_id, "on_alive");
    }
    virtual future<>
    on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
        return on_endpoint_change(endpoint, id, ep_state, permit_id, "on_restart");
    }
};
// }}} ip_address_updater
// Background fiber that waits for this node's cleanup_status to become
// `running` in the topology, performs SSTable cleanup for all vnode-based
// keyspaces, and then clears the flag via a group 0 command. Runs until
// _group0_as is aborted.
future<> storage_service::sstable_vnodes_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
    while (!_group0_as.abort_requested()) {
        bool err = false;
        try {
            // Sleep until the topology marks this node's cleanup as running.
            co_await _topology_state_machine.event.when([&] {
                auto me = _topology_state_machine._topology.find(server.id());
                return me && me->second.cleanup == cleanup_status::running;
            });
            std::unordered_map<sstring, std::vector<table_info>> ks_tables;
            {
                // The scope for the guard
                auto guard = co_await _group0->client().start_operation(_group0_as);
                auto me = _topology_state_machine._topology.find(server.id());
                // Recheck that cleanup is needed after the barrier
                if (!me || me->second.cleanup != cleanup_status::running) {
                    rtlogger.trace("vnodes_cleanup triggered, but not needed");
                    continue;
                }
                rtlogger.info("start vnodes_cleanup");
                // Skip tablets tables since they do their own cleanup and system tables
                // since they are local and not affected by range movements.
                auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
                ks_tables.reserve(ks_erms.size());
                for (auto&& [ks_name, erm] : ks_erms) {
                    auto& ks = _db.local().find_keyspace(ks_name);
                    const auto& cf_meta_data = ks.metadata().get()->cf_meta_data();
                    std::vector<table_info> table_infos;
                    table_infos.reserve(cf_meta_data.size());
                    for (const auto& [name, schema] : cf_meta_data) {
                        table_infos.emplace_back(table_info{name, schema->id()});
                    }
                    ks_tables.emplace(std::move(ks_name), std::move(table_infos));
                };
            }
            {
                // Quiesce in-flight work before compacting: close streaming
                // sessions, wait out stale writes, then flush memtables so
                // cleanup sees all data in SSTables.
                rtlogger.info("vnodes_cleanup: drain closing sessions");
                co_await proxy.invoke_on_all([] (storage_proxy& sp) {
                    return get_topology_session_manager().drain_closing_sessions();
                });
                rtlogger.info("vnodes_cleanup: wait for stale pending writes");
                co_await proxy.invoke_on_all([] (storage_proxy& sp) {
                    return sp.await_stale_pending_writes();
                });
                rtlogger.info("vnodes_cleanup: flush_all_tables");
                co_await _db.invoke_on_all([&] (replica::database& db) {
                    return db.flush_all_tables();
                });
                // Run a cleanup compaction task per keyspace, in parallel.
                co_await coroutine::parallel_for_each(ks_tables, [&](auto& item) -> future<> {
                    auto& [ks_name, table_infos] = item;
                    auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module();
                    // we flush all tables before cleanup the keyspaces individually, so skip the flush-tables step here
                    auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
                        {}, ks_name, _db, table_infos, compaction::flush_mode::skip, tasks::is_user_task::no);
                    try {
                        rtlogger.info("vnodes_cleanup {} started", ks_name);
                        co_await task->done();
                        rtlogger.info("vnodes_cleanup {} finished", ks_name);
                    } catch (...) {
                        rtlogger.error("vnodes_cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception());
                        throw;
                    }
                });
            }
            rtlogger.info("vnodes_cleanup ended");
            // Persist cleanup_status=clean; retry on concurrent group 0
            // modifications until the command is accepted.
            while (true) {
                auto guard = co_await _group0->client().start_operation(_group0_as);
                topology_mutation_builder builder(guard.write_timestamp());
                builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
                topology_change change{{builder.build()}};
                group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup completed for {}", server.id()));
                try {
                    co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
                } catch (group0_concurrent_modification&) {
                    rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
                    continue;
                }
                break;
            }
            rtlogger.debug("vnodes_cleanup: cleanup flag cleared");
        } catch (const seastar::abort_requested_exception&) {
            rtlogger.info("vnodes_cleanup fiber aborted");
            break;
        } catch (raft::request_aborted&) {
            rtlogger.info("vnodes_cleanup fiber aborted");
            break;
        } catch (const seastar::broken_condition_variable&) {
            rtlogger.info("vnodes_cleanup fiber aborted");
            break;
        } catch (...) {
            rtlogger.error("vnodes_cleanup fiber got an error: {}", std::current_exception());
            err = true;
        }
        if (err) {
            // Back off briefly before retrying after an unexpected error.
            co_await sleep_abortable(std::chrono::seconds(1), _group0_as);
        }
    }
}
// Monitors this node's Raft leadership. When the node becomes the group 0
// leader, starts the topology change coordinator in the background; when
// leadership is lost, aborts the coordinator (via the local abort_source
// `as`) and waits for it to finish. `as` being engaged means a coordinator
// is (or was) running.
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder) {
    std::optional<abort_source> as;
    try {
        while (!_group0_as.abort_requested()) {
            // Wait for a state change in case we are not a leader yet, or we are the leader
            // and coordinator work is running (in which case 'as' is engaged)
            while (!raft.is_leader() || as) {
                co_await raft.wait_for_state_change(&_group0_as);
                if (as) {
                    as->request_abort(); // we are no longer a leader, so abort the coordinator
                    // Wait for the old coordinator future, replacing it with a
                    // ready future so a later co_await of the member is safe.
                    co_await std::exchange(_topology_change_coordinator, make_ready_future<>());
                    as = std::nullopt;
                    try {
                        _tablet_allocator.local().on_leadership_lost();
                    } catch (...) {
                        rtlogger.error("tablet_allocator::on_leadership_lost() failed: {}", std::current_exception());
                    }
                }
            }
            // We are the leader now but that can change any time!
            as.emplace();
            // start topology change coordinator in the background
            _topology_change_coordinator = run_topology_coordinator(
                    _sys_dist_ks, _gossiper, _messaging.local(), _shared_token_metadata,
                    _sys_ks.local(), _db.local(), *_group0, _topology_state_machine, _view_building_state_machine, *as, raft,
                    std::bind_front(&storage_service::raft_topology_cmd_handler, this),
                    _tablet_allocator.local(),
                    _cdc_gens.local(),
                    get_ring_delay(),
                    _lifecycle_notifier,
                    _feature_service,
                    _sl_controller.local(),
                    _topology_cmd_rpc_tracker);
        }
    } catch (...) {
        rtlogger.info("raft_state_monitor_fiber aborted with {}", std::current_exception());
    }
    if (as) {
        as->request_abort(); // abort current coordinator if running
        co_await std::move(_topology_change_coordinator);
    }
}
std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const {
    // Resolve each host-id-or-endpoint entry to a raft server id, checking
    // that the node is actually present in the current topology.
    std::unordered_set<raft::server_id> result;
    for (const auto& hoep : hoeps) {
        std::optional<raft::server_id> id;
        if (hoep.has_host_id()) {
            id = raft::server_id{hoep.id().uuid()};
        } else {
            // Only an IP was supplied -- translate it via the address map.
            auto host_id = _address_map.find_by_addr(hoep.endpoint());
            if (!host_id) {
                throw std::runtime_error(::format("Cannot find a mapping to IP {}", hoep.endpoint()));
            }
            id = raft::server_id{host_id->uuid()};
        }
        if (!_topology_state_machine._topology.contains(*id)) {
            throw std::runtime_error(::format("Node {} is not found in the cluster", *id));
        }
        result.insert(*id);
    }
    return result;
}
std::unordered_set<raft::server_id> storage_service::ignored_nodes_from_join_params(const join_node_request_params& params) {
    // Start from the ignore list explicitly requested by the joining node.
    auto ignored = find_raft_nodes_from_hoeps(string_list_to_endpoint_list(params.ignore_nodes));
    if (params.replaced_id) {
        // For a replace operation, also ignore the node being replaced so
        // that other topology operations can proceed without it.
        ignored.insert(*params.replaced_id);
    }
    return ignored;
}
// Builds the group 0 mutations that register a join (or replace) request in
// the topology tables: one mutation for system.topology (node row + ignored
// nodes) and one for the request-tracking table. When old_request_id is set
// (replace path), two extra mutations mark the replaced node's still-pending
// request as done.
utils::chunked_vector<canonical_mutation> storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp, utils::UUID old_request_id) {
    topology_mutation_builder builder(write_timestamp);
    auto ignored_nodes = ignored_nodes_from_join_params(params);
    if (!ignored_nodes.empty()) {
        // Only nodes in normal state can be ignored.
        auto bad_id = std::find_if_not(ignored_nodes.begin(), ignored_nodes.end(), [&] (auto n) {
            return _topology_state_machine._topology.normal_nodes.contains(n);
        });
        if (bad_id != ignored_nodes.end()) {
            throw std::runtime_error(::format("replace: there is no node with id {} in normal state. Cannot ignore it.", *bad_id));
        }
        builder.add_ignored_nodes(std::move(ignored_nodes));
    }
    // Record the joining node's static metadata in its topology row.
    auto& node_builder = builder.with_node(params.host_id)
        .set("node_state", node_state::none)
        .set("datacenter", params.datacenter)
        .set("rack", params.rack)
        .set("release_version", params.release_version)
        .set("num_tokens", params.num_tokens)
        .set("tokens_string", params.tokens_string)
        .set("shard_count", params.shard_count)
        .set("ignore_msb", params.ignore_msb)
        .set("cleanup_status", cleanup_status::clean)
        .set("supported_features", params.supported_features | std::ranges::to<std::set<sstring>>());
    if (params.replaced_id) {
        node_builder
            .set("topology_request", topology_request::replace)
            .set("replaced_id", *params.replaced_id);
    } else {
        node_builder
            .set("topology_request", topology_request::join);
    }
    node_builder.set("request_id", params.request_id);
    // Mirror the request in the request-tracking table.
    topology_request_tracking_mutation_builder rtbuilder(params.request_id, _feature_service.topology_requests_type_column);
    rtbuilder.set("initiating_host", params.host_id.uuid())
             .set("done", false);
    rtbuilder.set("request_type", params.replaced_id ? topology_request::replace : topology_request::join);
    utils::chunked_vector<canonical_mutation> muts = {builder.build(), rtbuilder.build()};
    if (old_request_id) {
        // If this is a replace operation, we need to mark the old request for replaced node as done if exists
        // It should be safe to do so here since if the request is still pending it means the topology coordinator does not
        // work on it yet, and if the topology coordinator will pick it up meanwhile the write of this new state will fail
        topology_mutation_builder builder(write_timestamp);
        builder.with_node(*params.replaced_id).del("topology_request");
        topology_request_tracking_mutation_builder rtbuilder(old_request_id, _feature_service.topology_requests_type_column);
        rtbuilder.done("node was replaced before the request could start");
        muts.push_back(builder.build());
        muts.push_back(rtbuilder.build());
    }
    return muts;
}
// Group 0 handshaker used by a joining node: before the local Raft server
// starts it sends the join request RPC to an existing member, and after the
// server starts it waits for the topology coordinator's response, which is
// delivered asynchronously via the join_node_response handler.
class join_node_rpc_handshaker : public service::group0_handshaker {
private:
    service::storage_service& _ss;
    const join_node_request_params& _req;
public:
    join_node_rpc_handshaker(service::storage_service& ss, const join_node_request_params& req)
        : _ss(ss)
        , _req(req)
    {}
    // Sends the join request to the contact node and resolves
    // _join_node_request_done on acceptance; throws on rejection.
    future<> pre_server_start(const group0_info& g0_info) override {
        rtlogger.info("join: sending the join request to {}", g0_info.ip_addr);
        co_await utils::get_local_injector().inject("crash_before_group0_join", [](auto& handler) -> future<> {
            // This wait ensures that node gossips its state before crashing.
            co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
            throw std::runtime_error("deliberately crashed for orphan remover test");
        });
        auto result = co_await ser::join_node_rpc_verbs::send_join_node_request(
            &_ss._messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, _req);
        std::visit(overloaded_functor {
            [this] (const join_node_request_result::ok&) {
                rtlogger.info("join: request to join placed, waiting"
                              " for the response from the topology coordinator");
                if (utils::get_local_injector().enter("pre_server_start_drop_expiring")) {
                    _ss._gossiper.get_mutable_address_map().force_drop_expiring_entries();
                }
                _ss._join_node_request_done.set_value();
            },
            [] (const join_node_request_result::rejected& rej) {
                throw std::runtime_error(
                    format("the topology coordinator rejected request to join the cluster: {}", rej.reason));
            },
        }, result.result);
        co_return;
    }
    future<bool> post_server_start(const group0_info& g0_info, abort_source& as) override {
        // Group 0 has been started. Allow the join_node_response to be handled.
        _ss._join_node_group0_started.set_value();
        // Processing of the response is done in `join_node_response_handler`.
        // Wait for it to complete. If the topology coordinator fails to
        // deliver the rejection, it won't complete. In such a case, the
        // operator is responsible for shutting down the joining node.
        co_await _ss._join_node_response_done.get_shared_future(as);
        rtlogger.info("join: success");
        co_return true;
    }
};
future<bool> storage_service::ongoing_rf_change(const group0_guard& guard, sstring ks) const {
    // True iff a keyspace_rf_change request targeting `ks` is currently
    // executing, paused, or still queued.
    auto is_rf_change_for_ks = [&] (utils::UUID request_id) -> future<bool> {
        auto entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
        if (!std::holds_alternative<global_topology_request>(entry.request_type)) {
            co_return false;
        }
        if (std::get<global_topology_request>(entry.request_type) != global_topology_request::keyspace_rf_change) {
            co_return false;
        }
        co_return entry.new_keyspace_rf_change_ks_name.has_value() && entry.new_keyspace_rf_change_ks_name.value() == ks;
    };
    const auto& topo = _topology_state_machine._topology;
    if (topo.global_request_id.has_value() && co_await is_rf_change_for_ks(topo.global_request_id.value())) {
        co_return true;
    }
    for (auto request_id : topo.paused_rf_change_requests) {
        if (co_await is_rf_change_for_ks(request_id)) {
            co_return true;
        }
    }
    for (auto request_id : topo.global_requests_queue) {
        if (co_await is_rf_change_for_ks(request_id)) {
            co_return true;
        }
    }
    co_return false;
}
// Bootstraps the very first node of a fresh cluster: writes the initial
// topology state (our own join request, the cluster's enabled feature set,
// auth/service-level/view-builder version markers) directly into the local
// database, together with a group 0 history entry.
future<> storage_service::raft_initialize_discovery_leader(const join_node_request_params& params) {
    if (params.replaced_id.has_value()) {
        throw std::runtime_error(::format("Cannot perform a replace operation because this is the first node in the cluster"));
    }
    if (params.num_tokens == 0 && params.tokens_string.empty()) {
        throw std::runtime_error("Cannot start the first node in the cluster as zero-token");
    }
    // All mutations below share the timestamp derived from the new group 0
    // state id, so they form one consistent initial state.
    const auto new_group0_state_id = raft_group0_client::generate_group0_state_id(utils::UUID{});
    auto write_timestamp = utils::UUID_gen::micros_timestamp(new_group0_state_id);
    rtlogger.info("adding myself as the first node to the topology");
    auto insert_join_request_mutations = build_mutation_from_join_params(params, write_timestamp);
    // We are the first node and we define the cluster.
    // Set the enabled_features field to our features.
    topology_mutation_builder builder(write_timestamp);
    builder.add_enabled_features(params.supported_features | std::ranges::to<std::set<sstring>>())
           .set_upgrade_state_done(); // Start right in the topology-on-raft mode
    auto enable_features_mutation = builder.build();
    insert_join_request_mutations.push_back(std::move(enable_features_mutation));
    auto sl_status_mutation = co_await _sys_ks.local().make_service_levels_version_mutation(2, write_timestamp);
    insert_join_request_mutations.emplace_back(std::move(sl_status_mutation));
    insert_join_request_mutations.emplace_back(co_await _sys_ks.local().make_auth_version_mutation(write_timestamp, db::system_keyspace::auth_version_t::v2));
    auto sl_driver_mutations = co_await qos::service_level_controller::get_create_driver_service_level_mutations(_sys_ks.local(), write_timestamp);
    for (auto& m : sl_driver_mutations) {
        insert_join_request_mutations.emplace_back(m);
    }
    if (!utils::get_local_injector().is_enabled("skip_vb_v2_version_mut")) {
        insert_join_request_mutations.emplace_back(
            co_await _sys_ks.local().make_view_builder_version_mutation(write_timestamp, db::system_keyspace::view_builder_version_t::v2));
    }
    topology_change change{std::move(insert_join_request_mutations)};
    auto history_append = db::system_keyspace::make_group0_history_state_id_mutation(new_group0_state_id,
        _migration_manager.local().get_group0_client().get_history_gc_duration(), "bootstrap: adding myself as the first node to the topology");
    auto mutation_creator_addr = _sys_ks.local().local_db().get_token_metadata().get_topology().my_address();
    // No group 0 exists yet -- apply the state and the history entry locally.
    co_await write_mutations_to_database(*this, _qp.proxy(), mutation_creator_addr, std::move(change.mutations));
    co_await _qp.proxy().mutate_locally({history_append}, nullptr);
}
future<> storage_service::initialize_done_topology_upgrade_state() {
    // Persist upgrade_state='done' in the topology table so the node starts
    // directly in topology-on-raft mode.
    const sstring query = format("UPDATE {}.{} SET upgrade_state='done' WHERE key='topology'",
            db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
    co_await _qp.execute_internal(query, {}, cql3::query_processor::cache_internal::no).discard_result();
}
// Ensures this node's metadata in the topology table (shard count, msb bits,
// release version, supported features) matches the locally observed values,
// committing an update via group 0 if they differ. Uses the persisted
// must_synchronize_topology flag to stay correct across crashes mid-update.
future<> storage_service::update_topology_with_local_metadata(raft::server& raft_server) {
    // TODO: include more metadata here
    auto local_shard_count = smp::count;
    auto local_ignore_msb = _db.local().get_config().murmur3_partitioner_ignore_msb_bits();
    auto local_release_version = version::release();
    auto local_supported_features = _feature_service.supported_feature_set() | std::ranges::to<std::set<sstring>>();
    // True when the topology's record of this node matches local values.
    auto synchronized = [&] () {
        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error{"Removed from topology while performing metadata update"};
        }
        auto& replica_state = it->second;
        return replica_state.shard_count == local_shard_count
            && replica_state.ignore_msb == local_ignore_msb
            && replica_state.release_version == local_release_version
            && replica_state.supported_features == local_supported_features;
    };
    // We avoid performing a read barrier if we're sure that our metadata stored in topology
    // is the same as local metadata. Note that only we can update our metadata, other nodes cannot.
    //
    // We use a persisted flag `must_update_topology` to avoid the following scenario:
    // 1. the node restarts and its metadata changes
    // 2. the node commits the new metadata to topology, but before the update is applied
    //    to the local state machine, the node crashes
    // 3. then the metadata changes back to old values and node restarts again
    // 4. the local state machine tells us that we're in sync, which is wrong
    // If the persisted flag is true, it tells us that we attempted a metadata change earlier,
    // forcing us to perform a read barrier even when the local state machine tells us we're in sync.
    if (synchronized() && !(co_await _sys_ks.local().get_must_synchronize_topology())) {
        co_return;
    }
    // Retry loop: each iteration performs a read barrier (via
    // start_operation) and, if still out of sync, commits the update.
    while (true) {
        rtlogger.info("refreshing topology to check if it's synchronized with local metadata");
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        if (synchronized()) {
            break;
        }
        // It might happen that, in the previous run, the node commits a command
        // that adds support for a feature, crashes before applying it and now
        // it is not safe to disable support for it. If there is an attempt to
        // downgrade the node then `enable_features_on_startup` called much
        // earlier won't catch it, we only can do it here after performing
        // a read barrier - so we repeat it here.
        //
        // Fortunately, there is no risk that this feature was marked as enabled
        // because it requires that the current node responded to a barrier
        // request - which will fail in this situation.
        const auto& enabled_features = _topology_state_machine._topology.enabled_features;
        const auto unsafe_to_disable_features = _topology_state_machine._topology.calculate_not_yet_enabled_features();
        _feature_service.check_features(enabled_features, unsafe_to_disable_features);
        rtlogger.info("updating topology with local metadata");
        // Set the persisted flag before committing, so a crash between commit
        // and local apply still forces a re-check on the next startup.
        co_await _sys_ks.local().set_must_synchronize_topology(true);
        topology_mutation_builder builder(guard.write_timestamp());
        builder.with_node(raft_server.id())
               .set("shard_count", local_shard_count)
               .set("ignore_msb", local_ignore_msb)
               .set("release_version", local_release_version)
               .set("supported_features", local_supported_features);
        topology_change change{{builder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(
            std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id()));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("update topology with local metadata:"
                          " concurrent operation is detected, retrying.");
        }
    }
    co_await _sys_ks.local().set_must_synchronize_topology(false);
}
future<> storage_service::await_tablets_rebuilt(raft::server_id replaced_id) {
    // Block until no tablet replica references the replaced node any more.
    const auto replaced_host = locator::host_id(replaced_id.uuid());
    auto tablets_gone = [&] {
        return !get_token_metadata().tablets().has_replica_on(replaced_host);
    };
    if (!tablets_gone()) {
        slogger.info("Waiting for tablet replicas from the replaced node to be rebuilt");
        co_await _topology_state_machine.event.when([&] {
            return tablets_gone();
        });
    }
    slogger.info("Tablet replicas from the replaced node have been rebuilt");
}
future<> storage_service::start_sys_dist_ks() const {
    slogger.info("starting system distributed keyspace shards");
    // Fan the start() call out to the keyspace service on every shard.
    return _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
}
// Joins this node to the cluster topology using raft-based topology operations.
//
// High-level sequence (order is significant):
//  1. Prepare replacement info / check for endpoint collisions / run a gossip
//     shadow round, depending on whether we are replacing, bootstrapping or
//     restarting.
//  2. Persist the supported feature set and saved tokens, then start gossiping.
//  3. Join group 0 (via the join-node RPC handshaker, or the legacy handshaker
//     during the Raft-based recovery procedure).
//  4. Start hint managers, the topology-coordinator monitor fiber and the
//     vnodes-cleanup fiber, then wait for the topology request to complete
//     (for a new node) and mark bootstrap as COMPLETED.
//
// Parameters:
//  - proxy: sharded storage proxy (used to start hint managers and for schema
//    version announcement).
//  - initial_contact_nodes: seed/peer addresses used for shadow rounds and
//    group 0 discovery.
//  - loaded_endpoints: endpoint state loaded from system tables, injected into
//    the gossiper before gossip starts.
//  - start_hm: whether to start the hint managers once topology is known.
//  - new_generation: gossip generation to start with.
future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
        std::unordered_set<gms::inet_address> initial_contact_nodes,
        std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints,
        start_hint_manager start_hm,
        gms::generation_type new_generation) {
    gms::application_state_map app_states;
    std::optional<replacement_info> ri;
    std::optional<raft_group0::replace_info> raft_replace_info;
    if (is_replacing()) {
        // A replacing node must be a fresh node: replace cannot resume on a
        // node that already completed bootstrap.
        if (_sys_ks.local().bootstrap_complete()) {
            throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
        }
        ri = co_await prepare_replacement_info(initial_contact_nodes);
        // The replacing node must be placed in the same DC/rack as the node
        // it replaces.
        const auto& my_location = _snitch.local()->get_location();
        if (my_location != ri->dc_rack) {
            auto msg = fmt::format("Cannot replace node {} with a node on a different data center or rack. Current location={}/{}, new location={}/{}",
                    ri->host_id, ri->dc_rack.dc, ri->dc_rack.rack, my_location.dc, my_location.rack);
            slogger.error("{}", msg);
            throw std::runtime_error(msg);
        }
        raft_replace_info = raft_group0::replace_info {
            .raft_id = raft::server_id{ri->host_id.uuid()},
        };
    } else if (should_bootstrap()) {
        co_await check_for_endpoint_collision(initial_contact_nodes);
    } else {
        // Restarting node: learn the current cluster state via a gossip
        // shadow round, then seed the gossiper with the persisted endpoints.
        slogger.info("Performing gossip shadow round, initial_contact_nodes={}", initial_contact_nodes);
        co_await _gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::no);
        _gossiper.check_snitch_name_matches(_snitch.local()->get_name());
        co_await _gossiper.reset_endpoint_state_map();
        for (const auto& [host_id, st] : loaded_endpoints) {
            // gossiping hasn't started yet
            // so no need to lock the endpoint
            co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
        }
    }
    auto features = _feature_service.supported_feature_set();
    slogger.info("Save advertised features list in the 'system.{}' table", db::system_keyspace::LOCAL);
    // Save the advertised feature set to system.local table after
    // all remote feature checks are complete and after gossip shadow rounds are done.
    // At this point, the final feature set is already determined before the node joins the ring.
    co_await _sys_ks.local().save_local_supported_features(features);

    // If this is a restarting node, we should update tokens before gossip starts
    auto my_tokens = co_await _sys_ks.local().get_saved_tokens();
    bool restarting_normal_node = _sys_ks.local().bootstrap_complete() && !is_replacing();
    if (restarting_normal_node) {
        // A node cannot flip between token-owning and zero-token modes across
        // restarts: join_ring must match whether tokens were ever persisted.
        if (my_tokens.empty() && _db.local().get_config().join_ring()) {
            throw std::runtime_error("Cannot restart with join_ring=true because the node has already joined the cluster as a zero-token node");
        }
        if (!my_tokens.empty() && !_db.local().get_config().join_ring()) {
            throw std::runtime_error("Cannot restart with join_ring=false because the node already owns tokens");
        }
        slogger.info("Restarting a node in NORMAL status");
    }

    // have to start the gossip service before we can see any info on other nodes. this is necessary
    // for bootstrap to get the load info it needs.
    // (we won't be part of the storage ring though until we add a counterId to our state, below.)
    // Seed the host ID-to-endpoint map with our own ID.
    auto local_host_id = get_token_metadata().get_my_id();

    utils::get_local_injector().inject("stop_after_saving_tokens",
        [] { std::raise(SIGSTOP); });

    // Assemble the initial gossip application state advertised by this node.
    auto broadcast_rpc_address = get_token_metadata_ptr()->get_topology().my_cql_address();
    app_states.emplace(gms::application_state::NET_VERSION, versioned_value::network_version());
    app_states.emplace(gms::application_state::HOST_ID, versioned_value::host_id(local_host_id));
    app_states.emplace(gms::application_state::RPC_ADDRESS, versioned_value::rpcaddress(broadcast_rpc_address));
    app_states.emplace(gms::application_state::RELEASE_VERSION, versioned_value::release_version());
    app_states.emplace(gms::application_state::SUPPORTED_FEATURES, versioned_value::supported_features(features));
    app_states.emplace(gms::application_state::CACHE_HITRATES, versioned_value::cache_hitrates(""));
    app_states.emplace(gms::application_state::SCHEMA_TABLES_VERSION, versioned_value(db::schema_tables::version));
    app_states.emplace(gms::application_state::RPC_READY, versioned_value::cql_ready(false));
    app_states.emplace(gms::application_state::VIEW_BACKLOG, versioned_value(""));
    app_states.emplace(gms::application_state::SCHEMA, versioned_value::schema(_db.local().get_version()));
    if (restarting_normal_node) {
        // Advertise NORMAL status with the saved tokens right away; a
        // restarting member never re-bootstraps.
        app_states.emplace(gms::application_state::STATUS, versioned_value::normal(my_tokens));
    }
    app_states.emplace(gms::application_state::SNITCH_NAME, versioned_value::snitch_name(_snitch.local()->get_name()));
    app_states.emplace(gms::application_state::SHARD_COUNT, versioned_value::shard_count(smp::count));
    app_states.emplace(gms::application_state::IGNORE_MSB_BITS, versioned_value::ignore_msb_bits(_db.local().get_config().murmur3_partitioner_ignore_msb_bits()));
    for (auto&& s : _snitch.local()->get_app_states()) {
        app_states.emplace(s.first, std::move(s.second));
    }

    // Re-announce the schema version via gossip whenever it changes locally.
    auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable {
        _migration_manager.local().passive_announce(std::move(schema_version));
    });
    _listeners.emplace_back(make_lw_shared(std::move(schema_change_announce)));

    slogger.info("Starting up server gossip");

    co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});
    co_await _gossiper.start_gossiping(new_generation, app_states);
    utils::get_local_injector().inject("stop_after_starting_gossiping",
        [] { std::raise(SIGSTOP); });

    SCYLLA_ASSERT(_group0);

    // Parameters for the join-node request sent to the topology coordinator.
    join_node_request_params join_params {
        .host_id = _group0->load_my_id(),
        .cluster_name = _db.local().get_config().cluster_name(),
        .snitch_name = _db.local().get_snitch_name(),
        .datacenter = _snitch.local()->get_datacenter(),
        .rack = _snitch.local()->get_rack(),
        .release_version = version::release(),
        // Zero-token nodes (join_ring=false) request no tokens.
        .num_tokens = _db.local().get_config().join_ring() ? _db.local().get_config().num_tokens() : 0,
        .tokens_string = _db.local().get_config().join_ring() ? _db.local().get_config().initial_token() : sstring(),
        .shard_count = smp::count,
        .ignore_msb = _db.local().get_config().murmur3_partitioner_ignore_msb_bits(),
        .supported_features = _feature_service.supported_feature_set() | std::ranges::to<std::vector<sstring>>(),
        .request_id = utils::UUID_gen::get_time_UUID(),
    };

    if (raft_replace_info) {
        join_params.replaced_id = raft_replace_info->raft_id;
        join_params.ignore_nodes = utils::split_comma_separated_list(_db.local().get_config().ignore_dead_nodes_for_replace());
        if (!locator::check_host_ids_contain_only_uuid(join_params.ignore_nodes)) {
            slogger.warn("Warning: Using IP addresses for '--ignore-dead-nodes-for-replace' is deprecated and will"
                    " be disabled in a future release. Please use host IDs instead. Provided values: {}",
                    _db.local().get_config().ignore_dead_nodes_for_replace());
        }
    }

    // setup_group0 will do nothing if the node has already set up group 0 in setup_group0_if_exist in main.cc, which
    // happens when the node is restarting and not joining the new group 0 in the Raft-based recovery procedure.
    // It does not matter which handshaker we choose in this case since it will not be used.
    //
    // We use the legacy handshaker in the Raft-based recovery procedure to join the new group 0 without involving
    // the topology coordinator. We can assume this node has already been accepted by the topology coordinator once
    // and joined topology.
    ::shared_ptr<group0_handshaker> handshaker =
            !_db.local().get_config().recovery_leader.is_set()
            ? ::make_shared<join_node_rpc_handshaker>(*this, join_params)
            : _group0->make_legacy_handshaker(raft::is_voter::no);
    co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, std::move(handshaker),
            *this, _qp, _migration_manager.local(), join_params);

    raft::server& raft_server = _group0->group0_server();

    // This is the moment when the locator::topology has gathered information about other nodes
    // in the cluster -- either through gossiper, or by loading it from disk -- so it's safe
    // to start the hint managers.
    if (start_hm) {
        co_await proxy.invoke_on_all([] (storage_proxy& local_proxy) {
            return local_proxy.start_hints_manager();
        });
    }

    set_mode(mode::JOINING);

    co_await utils::get_local_injector().inject("delay_bootstrap_120s", std::chrono::seconds(120));

    rtlogger.info("topology changes are using raft");

    // Prevent shutdown hangs. We cannot count on wait_for_group0_stop while we are
    // joining group 0.
    auto sub = _abort_source.subscribe([this] () noexcept {
        _group0_as.request_abort();
        _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
    });

    // start topology coordinator fiber
    _raft_state_monitor = raft_state_monitor_fiber(raft_server, _group0->hold_group0_gate());
    // start vnodes cleanup fiber
    _sstable_vnodes_cleanup_fiber = sstable_vnodes_cleanup_fiber(raft_server, _group0->hold_group0_gate(), proxy);

    // Need to start system_distributed_keyspace before bootstrap because bootstrapping
    // process may access those tables.
    co_await start_sys_dist_ks();

    if (_sys_ks.local().bootstrap_complete()) {
        // A node that was banished from the cluster must not come back.
        if (_topology_state_machine._topology.left_nodes.contains(raft_server.id())) {
            throw std::runtime_error("A node that already left the cluster cannot be restarted");
        }
    } else {
        if (!_db.local().get_config().join_ring() && !_feature_service.zero_token_nodes) {
            throw std::runtime_error("Cannot boot a node with join_ring=false because the cluster does not support the ZERO_TOKEN_NODES feature");
        }

        co_await utils::get_local_injector().inject("crash_before_topology_request_completion", [] (auto& handler) -> future<> {
            co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
            throw std::runtime_error("Crashed in crash_before_topology_request_completion");
        });

        // Block until the topology coordinator finishes (or fails) our
        // bootstrap/replace request.
        auto err = co_await wait_for_topology_request_completion(join_params.request_id);
        if (!err.empty()) {
            throw std::runtime_error(fmt::format("{} failed. See earlier errors ({})", raft_replace_info ? "Replace" : "Bootstrap", err));
        }

        if (raft_replace_info) {
            co_await await_tablets_rebuilt(raft_replace_info->raft_id);
        }
    }

    co_await update_topology_with_local_metadata(raft_server);

    // Node state is enough to know that bootstrap has completed, but to make legacy code happy
    // let it know that the bootstrap is completed as well
    co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED);
    set_mode(mode::NORMAL);

    // Load schema version into the database object
    co_await db::schema_tables::update_schema_version_and_announce(_sys_ks, proxy, co_await db::schema_tables::get_group0_schema_version(_sys_ks.local()));

    utils::get_local_injector().inject("stop_after_setting_mode_to_normal_raft_topology",
        [] { std::raise(SIGSTOP); });

    if (get_token_metadata().sorted_tokens().empty()) {
        auto err = ::format("join_topology: Sorted token in token_metadata is empty");
        slogger.error("{}", err);
        throw std::runtime_error(err);
    }

    co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local());

    // Initializes monitor only after updating local topology.
    start_tablet_split_monitor();

    // Notify listeners about all nodes already in NORMAL state.
    auto ids = _topology_state_machine._topology.normal_nodes |
            std::views::keys |
            std::views::transform([] (raft::server_id id) { return locator::host_id{id.uuid()}; }) |
            std::ranges::to<std::unordered_set<locator::host_id>>();
    co_await _gossiper.notify_nodes_on_up(std::move(ids));
}
// Builds a map from token ranges to the IP addresses of their replicas.
//
// If table_id is provided, the table's own effective replication map is used
// (required for tablet-based tables, whose token distribution is per-table);
// otherwise the keyspace's static effective replication map is used.
// Replica host IDs are translated to IP addresses via the gossip address map.
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(sstring keyspace, std::optional<table_id> table_id) const {
    locator::effective_replication_map_ptr erm;
    utils::chunked_vector<token> tokens;
    if (table_id.has_value()) {
        auto& cf = _db.local().find_column_family(*table_id);
        erm = cf.get_effective_replication_map();
    } else {
        auto& ks = _db.local().find_keyspace(keyspace);
        erm = ks.get_static_effective_replication_map();
    }
    const auto& tm = *erm->get_token_metadata_ptr();
    if (erm->get_replication_strategy().uses_tablets()) {
        // NOTE(review): table_id is dereferenced unconditionally here, so this
        // branch assumes callers always pass a table_id when the replication
        // strategy uses tablets — confirm against callers.
        const auto& tablets = tm.tablets().get_tablet_map(*table_id);
        tokens = co_await tablets.get_sorted_tokens();
    } else {
        tokens = tm.sorted_tokens();
    }
    // Convert each (range -> host_id list) entry into (range -> IP list).
    co_return (co_await locator::get_range_to_address_map(erm, std::move(tokens))) |
            std::views::transform([&] (auto tid) { return std::make_pair(tid.first,
                tid.second | std::views::transform([&] (auto id) { return _address_map.get(id); }) | std::ranges::to<inet_address_vector_replica_set>()); }) |
            std::ranges::to<std::unordered_map>();
}
// Gossiper callback: a new endpoint has joined. Funnel its full application
// state map through the common on_change() handler.
future<> storage_service::on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id pid) {
    slogger.debug("endpoint={} on_join: permit_id={}", endpoint, pid);
    const auto& app_states = ep_state->get_application_state_map();
    co_return co_await on_change(endpoint, id, app_states, pid);
}
// Gossiper callback: an endpoint has been marked alive. Only endpoints that
// are members of the current topology trigger an "up" notification.
future<> storage_service::on_alive(gms::inet_address endpoint, locator::host_id host_id, gms::endpoint_state_ptr state, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_alive: permit_id={}", endpoint, host_id, pid);
    const auto& topology = get_token_metadata().get_topology();
    const auto* node = topology.find_node(host_id);
    if (!node || !node->is_member()) {
        slogger.debug("ignore on_alive since endpoint {}/{} is not a topology member", endpoint, host_id);
        co_return;
    }
    co_await notify_up(endpoint, host_id);
}
// Gossiper callback: an endpoint's application state changed.
// For live cluster members this updates system.peers (only for states not
// managed by raft topology — see get_peer_info_for_update), propagates CQL
// readiness changes, and handles INTERNAL_IP updates for prefer_local snitches.
future<> storage_service::on_change(gms::inet_address endpoint, locator::host_id host_id, const gms::application_state_map& states_, gms::permit_id pid) {
    // copy the states map locally since the coroutine may yield
    auto states = states_;
    slogger.debug("endpoint={} on_change: states={}, permit_id={}", endpoint, states, pid);
    auto ep_state = _gossiper.get_endpoint_state_ptr(host_id);
    if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
        slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
        co_return;
    }
    const auto& tm = get_token_metadata();
    const auto* node = tm.get_topology().find_node(host_id);
    // The check peers[host_id] == endpoint is needed when a node changes
    // its IP - on_change can be called by the gossiper for old IP as part
    // of its removal, after handle_state_normal has already been called for
    // the new one. Without the check, the do_update_system_peers_table call
    // overwrites the IP back to its old value.
    // In essence, the code under the 'if' should fire if the given IP belongs
    // to a cluster member.
    if (node && node->is_member() && (co_await _sys_ks.local().get_ip_from_peers_table(host_id)) == endpoint) {
        if (!is_me(endpoint)) {
            slogger.debug("endpoint={}/{} on_change: updating system.peers table", endpoint, host_id);
            // get_peer_info_for_update() returns nullopt when no persistable
            // (non-raft-managed) state is present in this change.
            if (auto info = get_peer_info_for_update(host_id, states)) {
                co_await _sys_ks.local().update_peer_info(endpoint, host_id, *info);
            }
        }
        if (states.contains(application_state::RPC_READY)) {
            slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready());
            co_await notify_cql_change(endpoint, host_id, ep_state->is_cql_ready());
        }
        if (auto it = states.find(application_state::INTERNAL_IP); it != states.end()) {
            co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(it->second.value()), host_id);
        }
    }
}
// Caches the peer's internal (preferred) IP on all messaging shards, which
// makes subsequent connections use it. Applies only when the snitch prefers
// local communication and the peer is in our own datacenter.
future<> storage_service::maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip, locator::host_id host_id) {
    if (!_snitch.local()->prefer_local()) {
        co_return;
    }

    const auto& topo = get_token_metadata().get_topology();
    const bool same_dc = topo.get_datacenter() == topo.get_datacenter(host_id);
    const bool already_preferred = _messaging.local().get_preferred_ip(ep) == local_ip;
    if (!same_dc || already_preferred) {
        co_return;
    }

    slogger.debug("Initiated reconnect to an Internal IP {} for the {}", local_ip, ep);
    co_await _messaging.invoke_on_all([ep, local_ip] (auto& ms) {
        ms.cache_preferred_ip(ep, local_ip);
    });
}
// Gossiper callback: an endpoint was removed. Nothing to do here beyond
// tracing — membership changes are driven by raft topology, not gossip.
future<> storage_service::on_remove(gms::inet_address endpoint, locator::host_id host_id, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_remove: permit_id={}", endpoint, host_id, pid);
    return make_ready_future<>();
}
// Gossiper callback: an endpoint was marked dead. Propagate a "down"
// notification to lifecycle subscribers.
future<> storage_service::on_dead(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_dead: permit_id={}", endpoint, id, pid);
    return notify_down(endpoint, id);
}
// Gossiper callback: a remote endpoint restarted. If it restarted before we
// ever marked it down, its cached connections are stale, so treat it as a
// "dead" event to reset the connection pool.
future<> storage_service::on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_restart: permit_id={}", endpoint, id, pid);
    const bool still_marked_alive = id != my_host_id() && _gossiper.is_alive(id);
    if (!still_marked_alive) {
        return make_ready_future();
    }
    return on_dead(endpoint, id, state, pid);
}
// Derives the system.peers row content from the endpoint's current gossip
// state. If the gossiper has no state for the endpoint, an engaged but empty
// peer_info is returned (note: not nullopt).
std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(locator::host_id endpoint) {
    auto state_ptr = _gossiper.get_endpoint_state_ptr(endpoint);
    if (!state_ptr) {
        return db::system_keyspace::peer_info{};
    }
    return get_peer_info_for_update(endpoint, state_ptr->get_application_state_map());
}
// Translates a gossip application-state map into the subset of system.peers
// columns that are still gossip-managed.
//
// States flagged managed_by_raft=true (DC, RACK, RELEASE_VERSION,
// SUPPORTED_FEATURES) are deliberately skipped here: raft topology owns them,
// and persisting gossip values would race with it. TOKENS is also skipped
// because tokens are updated separately.
//
// Returns nullopt when none of the provided states map to a persistable
// column, so callers can skip the system.peers write entirely.
std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(locator::host_id endpoint, const gms::application_state_map& app_state_map) {
    std::optional<db::system_keyspace::peer_info> ret;

    // Lazily materialize the result only when the first persistable state is
    // seen, keeping ret == nullopt otherwise.
    auto get_peer_info = [&] () -> db::system_keyspace::peer_info& {
        if (!ret) {
            ret.emplace();
        }
        return *ret;
    };
    auto set_field = [&]<typename T> (std::optional<T>& field,
            const gms::versioned_value& value,
            std::string_view name,
            bool managed_by_raft)
    {
        // Raft-owned fields are never persisted from gossip.
        if (managed_by_raft) {
            return;
        }
        try {
            field = T(value.value());
        } catch (...) {
            // A malformed gossip value here indicates a programming error on
            // the sender's side; crash loudly rather than persist garbage.
            on_internal_error(slogger, fmt::format("failed to parse {} {} for {}: {}", name, value.value(),
                    endpoint, std::current_exception()));
        }
    };
    for (const auto& [state, value] : app_state_map) {
        switch (state) {
        case application_state::DC:
            set_field(get_peer_info().data_center, value, "data_center", true);
            break;
        case application_state::INTERNAL_IP:
            set_field(get_peer_info().preferred_ip, value, "preferred_ip", false);
            break;
        case application_state::RACK:
            set_field(get_peer_info().rack, value, "rack", true);
            break;
        case application_state::RELEASE_VERSION:
            set_field(get_peer_info().release_version, value, "release_version", true);
            break;
        case application_state::RPC_ADDRESS:
            set_field(get_peer_info().rpc_address, value, "rpc_address", false);
            break;
        case application_state::SCHEMA:
            set_field(get_peer_info().schema_version, value, "schema_version", false);
            break;
        case application_state::TOKENS:
            // tokens are updated separately
            break;
        case application_state::SUPPORTED_FEATURES:
            set_field(get_peer_info().supported_features, value, "supported_features", true);
            break;
        default:
            break;
        }
    }

    return ret;
}
// Extracts the DC/rack location from gossiped endpoint state.
// Returns nullopt unless both the DC and RACK application states are present.
std::optional<locator::endpoint_dc_rack> storage_service::get_dc_rack_for(const gms::endpoint_state& ep_state) {
    const auto* dc_state = ep_state.get_application_state_ptr(gms::application_state::DC);
    const auto* rack_state = ep_state.get_application_state_ptr(gms::application_state::RACK);
    if (dc_state == nullptr || rack_state == nullptr) {
        return std::nullopt;
    }
    return locator::endpoint_dc_rack{
        .dc = dc_state->value(),
        .rack = rack_state->value(),
    };
}
// Looks up the endpoint's gossip state and extracts its DC/rack location.
// Returns nullopt when the gossiper has no state for the endpoint.
std::optional<locator::endpoint_dc_rack> storage_service::get_dc_rack_for(locator::host_id endpoint) {
    if (auto state_ptr = _gossiper.get_endpoint_state_ptr(endpoint)) {
        return get_dc_rack_for(*state_ptr);
    }
    return std::nullopt;
}
// Registers a subscriber for endpoint lifecycle notifications (up/down/etc.).
void endpoint_lifecycle_notifier::register_subscriber(endpoint_lifecycle_subscriber* subscriber)
{
    _subscribers.add(subscriber);
}
// Removes a previously registered lifecycle subscriber. The returned future
// resolves once any in-flight notification to it has completed.
future<> endpoint_lifecycle_notifier::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept
{
    return _subscribers.remove(subscriber);
}
// Stops all client/cluster transport. Idempotent: the first caller kicks off
// the shutdown sequence and stores its future; subsequent callers just wait
// on the same future.
//
// Shutdown order is deliberate: drain the migration manager, stop the
// protocol (RPC/CQL) servers, shut down gossip, then messaging, then
// streaming.
future<> storage_service::stop_transport() {
    if (!_transport_stopped.has_value()) {
        promise<> stopped;
        _transport_stopped = stopped.get_future();

        seastar::async([this] {
            slogger.info("Stop transport: starts");

            slogger.debug("shutting down migration manager");
            _migration_manager.invoke_on_all(&service::migration_manager::drain).get();

            shutdown_protocol_servers().get();
            slogger.info("Stop transport: shutdown rpc and cql server done");

            _gossiper.container().invoke_on_all(&gms::gossiper::shutdown).get();
            slogger.info("Stop transport: stop_gossiping done");

            _messaging.invoke_on_all(&netw::messaging_service::shutdown).get();
            slogger.info("Stop transport: shutdown messaging_service done");

            _stream_manager.invoke_on_all(&streaming::stream_manager::shutdown).get();
            slogger.info("Stop transport: shutdown stream_manager done");

            slogger.info("Stop transport: done");
        }).forward_to(std::move(stopped));
    }

    return _transport_stopped.value();
}
// Drains the node as part of shutdown. If a drain is already in progress or
// has finished, join the existing drain instead of starting a new one.
future<> storage_service::drain_on_shutdown() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (_operation_mode == mode::DRAINING || _operation_mode == mode::DRAINED) {
        return _drain_finished.get_future();
    }
    return do_drain();
}
// Wires in the raft group 0 service; the pointer is used throughout
// join/topology operations. Must be called before join_cluster().
void storage_service::set_group0(raft_group0& group0) {
    _group0 = &group0;
}
// Creates the IP-address updater and registers it with the gossiper so that
// host-id -> IP mappings are kept current. Undone by uninit_address_map().
future<> storage_service::init_address_map(gms::gossip_address_map& address_map) {
    _ip_address_updater = make_shared<ip_address_updater>(address_map, *this);
    _gossiper.register_(_ip_address_updater);
    co_return;
}
// Unregisters the IP-address updater installed by init_address_map().
future<> storage_service::uninit_address_map() {
    return _gossiper.unregister_(_ip_address_updater);
}
// Entry point for joining the cluster on startup.
//
// Determines the initial contact nodes (seeds for a brand-new node, persisted
// peers otherwise), handles the Raft-based recovery procedure when
// recovery_leader is configured, discovers group 0 when needed, and finally
// delegates to join_topology().
future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
        start_hint_manager start_hm, gms::generation_type new_generation) {
    SCYLLA_ASSERT(this_shard_id() == 0);

    // A decommissioned node must be wiped before it can rejoin.
    if (_sys_ks.local().was_decommissioned()) {
        auto msg = sstring("This node was decommissioned and will not rejoin the ring unless "
                           "all existing data is removed and the node is bootstrapped again");
        slogger.error("{}", msg);
        throw std::runtime_error(msg);
    }

    set_mode(mode::STARTING);

    std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints = co_await _sys_ks.local().load_endpoint_state();

    if (_group0->joined_group0()) {
        // Recover the endpoint state of the node replacing with the same IP. Its IP mapping is not in system.peers.
        const auto& topo = _topology_state_machine._topology;
        for (const auto& [id, replica_state]: topo.transition_nodes) {
            auto host_id = locator::host_id(id.uuid());
            if (replica_state.state == node_state::replacing && !loaded_endpoints.contains(host_id)) {
                auto replaced_id = std::get<replace_param>(topo.req_param.at(id)).replaced_id;
                if (const auto it = loaded_endpoints.find(locator::host_id(replaced_id.uuid())); it != loaded_endpoints.end()) {
                    auto ip = it->second.endpoint;
                    slogger.info("Adding node {}/{} that is replacing {}/{} to loaded_endpoints", host_id, ip, replaced_id, ip);
                    gms::loaded_endpoint_state st{.endpoint = ip};
                    loaded_endpoints.emplace(host_id, std::move(st));
                }
            }
        }
    }

    // Seeds are now only used as the initial contact point nodes. If the
    // loaded_endpoints are empty which means this node is a completely new
    // node, we use the nodes specified in seeds as the initial contact
    // point nodes, otherwise use the peer nodes persisted in system table.
    auto seeds = _gossiper.get_seeds();
    auto initial_contact_nodes = loaded_endpoints.empty() ?
        std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
        loaded_endpoints | std::views::transform([] (const auto& x) {
            return x.second.endpoint;
        }) | std::ranges::to<std::unordered_set<gms::inet_address>>();

    // Raft-based recovery: when recovery_leader is set (and we are not yet in
    // group 0), the leader becomes the single contact node.
    gms::inet_address recovery_leader_ip;
    locator::host_id recovery_leader_id;
    if (_db.local().get_config().recovery_leader.is_set()) {
        if (_group0->joined_group0()) {
            // Something is wrong unless it is a noninitial (and unneeded) restart while recreating the new group 0 in
            // the Raft-based recovery procedure.
            slogger.warn(
                    "recovery_leader is set to {} but persistent group 0 ID is present: {}. "
                    "The recovery_leader option will be ignored. If you are trying to run "
                    "the Raft-based recovery procedure, please follow the steps in the documentation.",
                    _db.local().get_config().recovery_leader(), _group0->load_my_id());
        } else {
            recovery_leader_id = locator::host_id(_db.local().get_config().recovery_leader());
            auto recovery_leader_it = loaded_endpoints.find(recovery_leader_id);
            if (recovery_leader_id != my_host_id() && recovery_leader_it == loaded_endpoints.end()) {
                throw std::runtime_error(
                        fmt::format("Recovery leader {} unrecognised as a cluster member, loaded endpoints: {}",
                                recovery_leader_id, loaded_endpoints));
            }

            recovery_leader_ip = recovery_leader_id == my_host_id() ?
                    get_broadcast_address() : recovery_leader_it->second.endpoint;
            initial_contact_nodes = std::unordered_set{recovery_leader_ip};

            if (!_sys_ks.local().bootstrap_complete()) {
                throw std::runtime_error("Cannot bootstrap in the Raft-based recovery procedure");
            }
        }
    }

    if (recovery_leader_id) {
        // The Raft-based recovery procedure.
        slogger.info("Performing Raft-based recovery procedure with recovery leader {}/{}",
                recovery_leader_id, recovery_leader_ip);

        // Verify the discovered group 0 leader really is the configured
        // recovery leader before proceeding.
        auto g0_info = co_await _group0->discover_group0(std::vector{recovery_leader_ip}, _qp);
        if (g0_info.id.uuid() != recovery_leader_id.uuid()) {
            throw std::runtime_error(fmt::format(
                    "Raft-based recovery procedure - found group 0 {} with leader {}/{} not matching "
                    "recovery leader {}/{}. The procedure must be restarted.",
                    g0_info.group0_id, g0_info.id, g0_info.ip_addr, recovery_leader_id, recovery_leader_ip));
        }

        slogger.info("Raft-based recovery procedure - found group 0 with ID {}", g0_info.group0_id);
    } else if (_group0->joined_group0()) {
        slogger.info("The node is already in group 0");
    } else if (_sys_ks.local().bootstrap_complete()) {
        // setup_group0_if_exist() should already throw in this case, so do internal error here.
        on_internal_error(slogger, "The node does not have group 0 but has already completed bootstrap. This should not happen.");
    } else {
        // We are not in group 0 and we are just bootstrapping. We need to discover group 0.
        const std::vector<gms::inet_address> contact_nodes{initial_contact_nodes.begin(), initial_contact_nodes.end()};
        auto g0_info = co_await _group0->discover_group0(contact_nodes, _qp);
        slogger.info("Found group 0 with ID {}, with leader of ID {} and IP {}",
                g0_info.group0_id, g0_info.id, g0_info.ip_addr);

        if (_group0->load_my_id() == g0_info.id) {
            // We're creating the group 0.
            slogger.info("We are creating the group 0. Start in raft topology operations mode");
        } else {
            // Ask the current member of the raft group about which mode to use
            auto params = join_node_query_params {};
            auto result = co_await ser::join_node_rpc_verbs::send_join_node_query(
                    &_messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, std::move(params));
            switch (result.topo_mode) {
            case join_node_query_result::topology_mode::raft:
                slogger.info("Will join existing cluster in raft topology operations mode");
                break;
            case join_node_query_result::topology_mode::legacy:
                throw std::runtime_error(
                        "Cannot join existing cluster in legacy topology operations mode because it is no longer supported. "
                        "Enable consistent topology changes with Raft.");
            }
        }
    }

    auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
    slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
            initial_contact_nodes, loaded_endpoints | std::views::keys, loaded_peer_features.size());
    for (auto& x : loaded_peer_features) {
        slogger.info("peer={}, supported_features={}", x.first, x.second);
    }
    co_return co_await join_topology(proxy, std::move(initial_contact_nodes),
            std::move(loaded_endpoints), start_hm, new_generation);
}
// Prepares (but does not apply) a cluster-wide token_metadata update.
//
// Builds, per shard: a clone of the new token_metadata, the static
// effective_replication_maps for all non-per-table keyspaces, and per-table /
// per-view erms. Also collects the set of topology sessions that must stay
// open. On any failure everything prepared so far is cleared gently and the
// exception is rethrown, leaving the current metadata untouched.
//
// The returned token_metadata_change is applied atomically per shard by
// commit_token_metadata_change() and released with destroy().
future<token_metadata_change> storage_service::prepare_token_metadata_change(mutable_token_metadata_ptr tmptr, const schema_getter& schema_getter) {
    SCYLLA_ASSERT(this_shard_id() == 0);

    std::exception_ptr ex;

    token_metadata_change change;

    // Collect open sessions
    {
        auto session = _topology_state_machine._topology.session;
        if (session) {
            change.open_sessions.insert(session);
        }

        // Tablet migrations in transition keep their own sessions open.
        for (auto&& [table, tables] : tmptr->tablets().all_table_groups()) {
            const auto& tmap = tmptr->tablets().get_tablet_map(table);
            for (auto&& [tid, trinfo]: tmap.transitions()) {
                if (trinfo.session_id) {
                    auto id = session_id(trinfo.session_id);
                    change.open_sessions.insert(id);
                }
            }
        }
    }

    try {
        auto base_shard = this_shard_id();
        change.pending_token_metadata_ptr[base_shard] = tmptr;
        auto& sharded_token_metadata = _shared_token_metadata.container();
        // clone a local copy of updated token_metadata on all other shards
        co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
            change.pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
        });

        // Precalculate new effective_replication_map for all keyspaces
        // and clone to all shards;
        //
        // TODO: at the moment create on shard 0 first
        // but in the future we may want to use hash() % smp::count
        // to evenly distribute the load.
        auto replications = schema_getter.get_keyspaces_replication();
        for (const auto& [ks_name, rs] : replications) {
            if (rs->is_per_table()) {
                continue;
            }
            auto erm = co_await get_erm_factory().create_static_effective_replication_map(rs, tmptr);
            change.pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm));
        }
        co_await container().invoke_on_others([&] (storage_service& ss) -> future<> {
            auto replications = schema_getter.get_keyspaces_replication();
            for (const auto& [ks_name, rs] : replications) {
                if (rs->is_per_table()) {
                    continue;
                }
                auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
                auto erm = co_await ss.get_erm_factory().create_static_effective_replication_map(rs, tmptr);
                change.pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm));
            }
        });
        // Prepare per-table erms.
        co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
            auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
            auto replications = schema_getter.get_keyspaces_replication();
            co_await schema_getter.for_each_table_schema_gently([&] (table_id id, schema_ptr table_schema) {
                auto rs = replications.at(table_schema->ks_name());
                locator::effective_replication_map_ptr erm;
                if (auto pt_rs = rs->maybe_as_per_table()) {
                    erm = pt_rs->make_replication_map(id, tmptr);
                } else {
                    // Non-per-table tables share the keyspace's static erm
                    // prepared above.
                    erm = change.pending_effective_replication_maps[this_shard_id()][table_schema->ks_name()];
                }
                if (table_schema->is_view()) {
                    change.pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
                } else {
                    change.pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
                }
                return make_ready_future();
            });
        });
    } catch (...) {
        ex = std::current_exception();
    }

    // Rollback on metadata replication error
    if (ex) {
        try {
            co_await smp::invoke_on_all([&] () -> future<> {
                auto tmptr = std::move(change.pending_token_metadata_ptr[this_shard_id()]);
                auto erms = std::move(change.pending_effective_replication_maps[this_shard_id()]);
                auto table_erms = std::move(change.pending_table_erms[this_shard_id()]);
                auto view_erms = std::move(change.pending_view_erms[this_shard_id()]);

                co_await utils::clear_gently(erms);
                co_await utils::clear_gently(tmptr);
                co_await utils::clear_gently(table_erms);
                co_await utils::clear_gently(view_erms);
            });
        } catch (...) {
            slogger.warn("Failure to reset pending token_metadata in cleanup path: {}. Ignored.", std::current_exception());
        }

        std::rethrow_exception(std::move(ex));
    }

    co_return change;
}
// Applies, on the current shard, a change previously built by
// prepare_token_metadata_change(): installs the new token_metadata, the
// static per-keyspace erms, and the per-table/per-view erms (base table and
// its views are switched together), then reconciles topology sessions.
//
// noexcept by design: after prepare succeeded, applying must not fail — a
// failure here would leave shards inconsistent, so we abort() instead.
void storage_service::commit_token_metadata_change(token_metadata_change& change) noexcept {
    slogger.debug("Replicating token_metadata");
    // Apply changes on a single shard
    try {
        _shared_token_metadata.set(std::move(change.pending_token_metadata_ptr[this_shard_id()]));
        _groups_manager.update(_shared_token_metadata.get());
        auto& db =_db.local();

        auto& erms = change.pending_effective_replication_maps[this_shard_id()];
        for (auto it = erms.begin(); it != erms.end(); ) {
            auto& ks = db.find_keyspace(it->first);
            ks.update_static_effective_replication_map(std::move(it->second));
            it = erms.erase(it);
        }

        auto& table_erms = change.pending_table_erms[this_shard_id()];
        auto& view_erms = change.pending_view_erms[this_shard_id()];
        for (auto it = table_erms.begin(); it != table_erms.end(); ) {
            // Update base/views effective_replication_maps atomically.
            auto& cf = db.find_column_family(it->first);
            cf.update_effective_replication_map(std::move(it->second));
            for (const auto& view_ptr : cf.views()) {
                const auto& view_id = view_ptr->id();
                auto& view = db.find_column_family(view_id);
                auto view_it = view_erms.find(view_id);
                if (view_it == view_erms.end()) {
                    throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
                }
                view.update_effective_replication_map(std::move(view_it->second));
                if (view.uses_tablets()) {
                    register_tablet_split_candidate(view_it->first);
                }
                view_erms.erase(view_it);
            }
            if (cf.uses_tablets()) {
                register_tablet_split_candidate(it->first);
            }
            it = table_erms.erase(it);
        }

        // Every pending view erm must have been consumed via its base table.
        if (!view_erms.empty()) {
            throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
        }

        // Close sessions that are no longer open in the new topology and
        // (re-)create the ones that are.
        auto& session_mgr = get_topology_session_manager();
        session_mgr.initiate_close_of_sessions_except(change.open_sessions);
        for (auto id : change.open_sessions) {
            session_mgr.create_session(id);
        }
    } catch (...) {
        // applying the changes on all shards should never fail
        // it will end up in an inconsistent state that we can't recover from.
        slogger.error("Failed to apply token_metadata changes: {}. Aborting.", std::current_exception());
        abort();
    }
}
// Releases the per-shard pending state on every shard, clearing large
// containers gently to avoid reactor stalls.
future<> token_metadata_change::destroy() {
    return smp::invoke_on_all([this] () -> future<> {
        const auto shard = this_shard_id();
        pending_token_metadata_ptr[shard] = nullptr;
        co_await utils::clear_gently(pending_effective_replication_maps[shard]);
        co_await utils::clear_gently(pending_table_erms[shard]);
        co_await utils::clear_gently(pending_view_erms[shard]);
    });
}
// Replicates a new token_metadata (and the derived effective replication
// maps) to all shards: prepare on every shard, commit atomically per shard,
// then release the scaffolding and flush any pending tombstone-gc repair-time
// updates.
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
    SCYLLA_ASSERT(this_shard_id() == 0);

    slogger.debug("Replicating token_metadata to all cores");

    // Adapter exposing the local database's keyspaces/tables to the generic
    // prepare_token_metadata_change() machinery.
    class db_schema_getter : public schema_getter {
    private:
        sharded<replica::database>& _db;
    public:
        db_schema_getter(sharded<replica::database>& db) : _db(db) {};

        virtual flat_hash_map<sstring, locator::replication_strategy_ptr> get_keyspaces_replication() const override {
            flat_hash_map<sstring, locator::replication_strategy_ptr> out;
            for (auto& [name, ks] : _db.local().get_keyspaces()) {
                out.emplace(name, ks.get_replication_strategy_ptr());
            }
            return out;
        };

        virtual future<> for_each_table_schema_gently(std::function<future<>(table_id, schema_ptr)> f) const override {
            auto ff = [&f](table_id id, lw_shared_ptr<replica::table> t) -> future<> {
                return f(id, t->schema());
            };
            return _db.local().get_tables_metadata().for_each_table_gently(ff);
        };
    };

    db_schema_getter getter{_db};
    auto change = co_await prepare_token_metadata_change(tmptr, getter);
    co_await container().invoke_on_all([&change] (storage_service& ss) {
        ss.commit_token_metadata_change(change);
    });
    co_await change.destroy();

    co_await _db.local().get_compaction_manager().get_shared_tombstone_gc_state().
        flush_pending_repair_time_update(_db.local());
}
// Stop the storage service: detach from the migration manager, stop
// transport if an in-flight "isolate" shutdown didn't already, tear down
// messaging, stop the task-manager modules, close the async gate and join
// the tablet-split monitor fiber.
future<> storage_service::stop() {
    co_await _migration_manager.local().unplug_storage_service();
    // if there is a background "isolate" shutdown
    // in progress, we need to sync with it. Mostly
    // relevant for tests
    if (_transport_stopped.has_value()) {
        co_await stop_transport();
    }
    co_await uninit_messaging_service();
    // make sure nobody uses the semaphore
    _listeners.clear();
    co_await _tablets_module->stop();
    co_await _node_ops_module->stop();
    co_await _global_topology_requests_module->stop();
    co_await _async_gate.close();
    // Wake the tablet-split monitor fiber so it can observe shutdown, then
    // wait for it to finish.
    _tablet_split_monitor_event.signal();
    co_await std::move(_tablet_split_monitor);
}
// Request shutdown of group0-related background work and wait for it.
// Idempotent: only the first call requests the abort. Breaking the topology
// and view-building state-machine events wakes their waiters with
// abort_requested_exception; then the background fibers are joined.
future<> storage_service::wait_for_group0_stop() {
    if (!_group0_as.abort_requested()) {
        _group0_as.request_abort();
        _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
        _view_building_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
        co_await when_all(std::move(_raft_state_monitor), std::move(_sstable_vnodes_cleanup_fiber), std::move(_upgrade_to_topology_coordinator_fiber));
    }
}
// Run a gossip-only "shadow round" against the initial contact nodes to
// verify that no live node already uses this node's broadcast address.
// Throws std::runtime_error if the address is already taken. On success the
// endpoint state learned in the shadow round is discarded — real gossip
// starts later.
//
// Cleanups vs. the previous version: removed the unused `local_features`
// local and the do/while loop whose condition variable was never set to
// true (the body always ran exactly once).
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes) {
    slogger.debug("Starting shadow gossip round to check for endpoint collision");
    return seastar::async([this, initial_contact_nodes] {
        slogger.info("Performing gossip shadow round");
        _gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::yes).get();
        _gossiper.check_snitch_name_matches(_snitch.local()->get_name());
        auto addr = get_broadcast_address();
        if (!_gossiper.is_safe_for_bootstrap(addr)) {
            throw std::runtime_error(::format("A node with address {} already exists, cancelling join. "
                "Use replace_address if you want to replace this node.", addr));
        }
        slogger.info("Checking bootstrapping/leaving/moving nodes: ok (check_for_endpoint_collision)");
        // Discard the state collected during the shadow round.
        _gossiper.reset_endpoint_state_map().get();
    });
}
// Remove the given endpoint from gossip (when its host id is known) and
// from the system tables. A failure to update the system tables is logged
// and swallowed (best effort).
future<> storage_service::remove_endpoint(inet_address endpoint, gms::permit_id pid) {
    if (auto host_id = _gossiper.try_get_host_id(endpoint)) {
        co_await _gossiper.remove_endpoint(*host_id, pid);
    }
    try {
        co_await _sys_ks.local().remove_endpoint(endpoint);
    } catch (...) {
        slogger.error("fail to remove endpoint={}: {}", endpoint, std::current_exception());
    }
}
// Gather the information needed to replace a dead node: resolve the replaced
// node's host id and address from configuration plus a gossip shadow round,
// and validate the node is actually replaceable.
// Throws std::runtime_error if the replaced node cannot be found in gossip,
// has already left the ring, or the only seed is the node being replaced.
future<storage_service::replacement_info>
storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes) {
    locator::host_id replace_host_id;
    gms::inet_address replace_address;
    auto& cfg = _db.local().get_config();
    // Prefer the modern replace_node_first_boot option; the two
    // address-based options are deprecated but still honored.
    if (!cfg.replace_node_first_boot().empty()) {
        replace_host_id = locator::host_id(utils::UUID(cfg.replace_node_first_boot()));
    } else if (!cfg.replace_address_first_boot().empty()) {
        replace_address = gms::inet_address(cfg.replace_address_first_boot());
        slogger.warn("The replace_address_first_boot={} option is deprecated. Please use the replace_node_first_boot option", replace_address);
    } else if (!cfg.replace_address().empty()) {
        replace_address = gms::inet_address(cfg.replace_address());
        slogger.warn("The replace_address={} option is deprecated. Please use the replace_node_first_boot option", replace_address);
    } else {
        on_internal_error(slogger, "No replace_node or replace_address configuration options found");
    }
    slogger.info("Gathering node replacement information for {}/{}", replace_host_id, replace_address);
    auto seeds = _gossiper.get_seeds();
    if (seeds.size() == 1 && seeds.contains(replace_address)) {
        throw std::runtime_error(::format("Cannot replace_address {} because no seed node is up", replace_address));
    }
    // make magic happen
    slogger.info("Performing gossip shadow round");
    co_await _gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::yes);
    // now that we've gossiped at least once, we should be able to find the node we're replacing
    if (replace_host_id) {
        auto node = _gossiper.get_node_ip(replace_host_id);
        if (!node) {
            throw std::runtime_error(::format("Replaced node with Host ID {} not found", replace_host_id));
        }
        replace_address = *node;
    } else {
        // Only the (deprecated) address was configured; derive the host id.
        replace_host_id = _gossiper.get_host_id(replace_address);
    }
    auto state = _gossiper.get_endpoint_state_ptr(replace_host_id);
    if (!state) {
        throw std::runtime_error(::format("Cannot replace_address {} because it doesn't exist in gossip", replace_address));
    }
    // Reject to replace a node that has left the ring
    auto status = _gossiper.get_gossip_status(replace_host_id);
    if (status == gms::versioned_value::STATUS_LEFT || status == gms::versioned_value::REMOVED_TOKEN) {
        throw std::runtime_error(::format("Cannot replace_address {} because it has left the ring, status={}", replace_address, status));
    }
    // Fall back to the default DC/rack if the replaced node's location is unknown.
    auto dc_rack = get_dc_rack_for(replace_host_id).value_or(locator::endpoint_dc_rack::default_location);
    auto ri = replacement_info {
        .dc_rack = std::move(dc_rack),
        .host_id = std::move(replace_host_id),
    };
    slogger.info("Host {}/{} is replacing {}/{}", get_token_metadata().get_my_id(), get_broadcast_address(), replace_host_id, replace_address);
    // Discard the shadow-round state before real gossip starts.
    co_await _gossiper.reset_endpoint_state_map();
    co_return ri;
}
// Return the token-ownership fraction per endpoint address, without taking
// replication into account (each token range counts only toward the
// endpoint that owns its end token).
future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
    return run_with_no_api_lock([this] (storage_service& ss) {
        const auto& tm = ss.get_token_metadata();
        auto token_map = dht::token::describe_ownership(tm.sorted_tokens());
        // describeOwnership returns tokens in an unspecified order, let's re-order them
        std::map<gms::inet_address, float> ownership;
        // Iterate by const reference — the previous by-value form copied
        // every pair on each iteration.
        for (const auto& [token, token_ownership] : token_map) {
            locator::host_id id = tm.get_endpoint(token).value();
            ownership[_address_map.get(id)] += token_ownership;
        }
        return ownership;
    });
}
// Compute per-endpoint ownership for the given keyspace (and optionally a
// specific table), taking replication into account. When keyspace_name is
// empty, all non-system keyspaces must share replication settings and
// "system_traces" is used as the representative. Returns a map from
// endpoint address to fractional ownership.
future<std::map<gms::inet_address, float>> storage_service::effective_ownership(sstring keyspace_name, sstring table_name) {
    return run_with_no_api_lock([keyspace_name, table_name] (storage_service& ss) mutable -> future<std::map<gms::inet_address, float>> {
        locator::effective_replication_map_ptr erm;
        if (keyspace_name != "") {
            //find throws no such keyspace if it is missing
            const replica::keyspace& ks = ss._db.local().find_keyspace(keyspace_name);
            // This is ugly, but it follows origin
            auto&& rs = ks.get_replication_strategy(); // clang complains about typeid(ks.get_replication_strategy());
            if (rs.is_local()) {
                throw std::runtime_error("Ownership values for keyspaces with LocalStrategy are meaningless");
            }
            if (table_name.empty()) {
                erm = ks.get_static_effective_replication_map();
            } else {
                auto& cf = ss._db.local().find_column_family(keyspace_name, table_name);
                erm = cf.get_effective_replication_map();
            }
        } else {
            auto non_system_keyspaces = ss._db.local().get_non_system_keyspaces();
            //system_traces is a non-system keyspace however it needs to be counted as one for this process
            size_t special_table_count = 0;
            if (std::find(non_system_keyspaces.begin(), non_system_keyspaces.end(), "system_traces") !=
                    non_system_keyspaces.end()) {
                special_table_count += 1;
            }
            if (non_system_keyspaces.size() > special_table_count) {
                throw std::runtime_error("Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless");
            }
            keyspace_name = "system_traces";
            const auto& ks = ss._db.local().find_keyspace(keyspace_name);
            erm = ks.get_static_effective_replication_map();
        }
        // The following loops seems computationally heavy, but it's not as bad.
        // The upper two simply iterate over all the endpoints by iterating over all the
        // DC and all the instances in each DC.
        //
        // The call for get_range_for_endpoint is done once per endpoint
        const auto& tm = *erm->get_token_metadata_ptr();
        // For tablets-based tables the token set comes from the table's
        // tablet map rather than the global sorted token ring.
        // NOTE(review): on the tablets branch table_name may still be empty
        // if keyspace_name was defaulted to "system_traces" above — confirm
        // that combination cannot reach the tablets path.
        const auto tokens = co_await std::invoke([&]() -> future<utils::chunked_vector<token>> {
            if (!erm->get_replication_strategy().uses_tablets()) {
                return make_ready_future<utils::chunked_vector<token>>(tm.sorted_tokens());
            } else {
                auto& cf = ss._db.local().find_column_family(keyspace_name, table_name);
                const auto& tablets = tm.tablets().get_tablet_map(cf.schema()->id());
                return tablets.get_sorted_tokens();
            }
        });
        const auto token_ownership = dht::token::describe_ownership(tokens);
        const auto datacenter_endpoints = tm.get_topology().get_datacenter_host_ids();
        std::map<gms::inet_address, float> final_ownership;
        for (const auto& [dc, endpoints_map] : datacenter_endpoints) {
            for (auto endpoint : endpoints_map) {
                // calculate the ownership with replication and add the endpoint to the final ownership map
                try {
                    float ownership = 0.0f;
                    auto ranges = co_await ss.get_ranges_for_endpoint(*erm, endpoint);
                    for (auto& r : ranges) {
                        // get_ranges_for_endpoint will unwrap the first range.
                        // With t0 t1 t2 t3, the first range (t3,t0] will be split
                        // as (min,t0] and (t3,max]. Skippping the range (t3,max]
                        // we will get the correct ownership number as if the first
                        // range were not split.
                        if (!r.end()) {
                            continue;
                        }
                        auto end_token = r.end()->value();
                        auto loc = token_ownership.find(end_token);
                        if (loc != token_ownership.end()) {
                            ownership += loc->second;
                        }
                    }
                    final_ownership[ss._address_map.find(endpoint).value()] = ownership;
                } catch (replica::no_such_keyspace&) {
                    // In case ss.get_ranges_for_endpoint(keyspace_name, endpoint) is not found, just mark it as zero and continue
                    final_ownership[ss._address_map.find(endpoint).value()] = 0;
                }
            }
        }
        co_return final_ownership;
    });
}
// Transition the service's operation mode. Enforces two invariants:
// maintenance mode may only be entered from NONE, and a node that entered
// maintenance mode may never proceed to STARTING.
void storage_service::set_mode(mode m) {
    if (m == mode::MAINTENANCE && _operation_mode != mode::NONE) {
        // Prevent from calling `start_maintenance_mode` after `join_cluster`.
        on_fatal_internal_error(slogger, format("Node should enter maintenance mode only from mode::NONE (current mode: {})", _operation_mode));
    }
    if (m == mode::STARTING && _operation_mode == mode::MAINTENANCE) {
        // Prevent from calling `join_cluster` after `start_maintenance_mode`.
        on_fatal_internal_error(slogger, "Node in the maintenance mode cannot enter the starting mode");
    }
    if (m == _operation_mode) {
        // This shouldn't happen, but it's too much for an SCYLLA_ASSERT,
        // so -- just emit a warning in the hope that it will be
        // noticed, reported and fixed
        slogger.warn("re-entering {} mode", m);
        return;
    }
    slogger.info("entering {} mode", m);
    _operation_mode = m;
}
// Return the Scylla release version string for this node.
sstring storage_service::get_release_version() {
    return version::release();
}
// Return the local database's schema version rendered as a string.
sstring storage_service::get_schema_version() {
    return _db.local().get_version().to_sstring();
}
// Bucket key used for hosts that did not answer the schema-check RPC.
static constexpr auto UNREACHABLE = "UNREACHABLE";
// Ask every live member for its schema version and group the hosts by the
// version they report; unresponsive hosts land under the "UNREACHABLE" key.
// Returns a map of version string -> host address strings.
future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::describe_schema_versions() {
    auto live_hosts = _gossiper.get_live_members();
    std::unordered_map<sstring, std::vector<sstring>> results;
    netw::messaging_service& ms = _messaging.local();
    return map_reduce(std::move(live_hosts), [&ms, as = abort_source()] (auto host) mutable {
        // Map step: query one host; a failed RPC maps to nullopt.
        auto f0 = ser::migration_manager_rpc_verbs::send_schema_check(&ms, host, as);
        return std::move(f0).then_wrapped([host] (auto f) {
            if (f.failed()) {
                f.ignore_ready_future();
                return std::pair<locator::host_id, std::optional<table_schema_version>>(host, std::nullopt);
            }
            return std::pair<locator::host_id, std::optional<table_schema_version>>(host, f.get());
        });
    }, std::move(results), [this] (auto results, auto host_and_version) {
        // Reduce step: append the host's address under its version bucket
        // (or UNREACHABLE when the RPC failed).
        auto version = host_and_version.second ? host_and_version.second->to_sstring() : UNREACHABLE;
        results.try_emplace(version).first->second.emplace_back(fmt::to_string(_address_map.get(host_and_version.first)));
        return results;
    }).then([this] (auto results) {
        // we're done: the results map is ready to return to the client. the rest is just debug logging:
        auto it_unreachable = results.find(UNREACHABLE);
        if (it_unreachable != results.end()) {
            slogger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", fmt::join(it_unreachable->second, ","));
        }
        auto my_version = get_schema_version();
        for (auto&& entry : results) {
            // check for version disagreement. log the hosts that don't agree.
            if (entry.first == UNREACHABLE || entry.first == my_version) {
                continue;
            }
            for (auto&& host : entry.second) {
                slogger.debug("{} disagrees ({})", host, entry.first);
            }
        }
        if (results.size() == 1) {
            slogger.debug("Schemas are in agreement.");
        }
        return results;
    });
};
// Report the current operation mode (without taking the API lock).
future<storage_service::mode> storage_service::get_operation_mode() {
    return run_with_no_api_lock([] (storage_service& ss) {
        const auto current = ss._operation_mode;
        return make_ready_future<mode>(current);
    });
}
// Report whether the gossiper is currently enabled (without the API lock).
future<bool> storage_service::is_gossip_running() {
    return run_with_no_api_lock([] (storage_service& ss) {
        const bool enabled = ss._gossiper.is_enabled();
        return enabled;
    });
}
// Operator-requested (re)start of gossip. No-op if gossip is already
// enabled. Restores the locally saved tokens into gossip state, forces a
// newer generation and starts gossiping; if any of that fails, gossip is
// stopped again as an undo action.
future<> storage_service::start_gossiping() {
    return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) -> future<> {
        if (!ss._gossiper.is_enabled()) {
            slogger.warn("Starting gossip by operator request");
            co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::start);
            bool should_stop_gossiper = false; // undo action
            try {
                co_await set_gossip_tokens(ss._gossiper,
                        co_await ss._sys_ks.local().get_local_tokens());
                // A newer generation makes peers accept our restarted state.
                ss._gossiper.force_newer_generation();
                co_await ss._gossiper.start_gossiping(gms::get_generation_number());
            } catch (...) {
                // A co_await is not allowed inside a catch block, so only
                // record the failure here and run the undo below.
                // NOTE(review): the caught exception is not re-thrown, so the
                // caller sees success even if starting gossip failed —
                // presumably intentional best-effort behavior; confirm.
                should_stop_gossiper = true;
            }
            if (should_stop_gossiper) {
                co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::stop);
            }
        }
    });
}
// Operator-requested stop of gossip on all shards; no-op when gossip is
// already disabled.
future<> storage_service::stop_gossiping() {
    return run_with_api_lock(sstring("stop_gossiping"), [] (storage_service& ss) {
        if (!ss._gossiper.is_enabled()) {
            return make_ready_future<>();
        }
        slogger.warn("Stopping gossip by operator request");
        return ss._gossiper.container().invoke_on_all(&gms::gossiper::stop);
    });
}
// Count the normal-state nodes that own at least one token.
static size_t count_normal_token_owners(const topology& topology) {
    size_t owners = 0;
    for (const auto& [id, rs] : topology.normal_nodes) {
        if (!rs.ring.value().tokens.empty()) {
            ++owners;
        }
    }
    return owners;
}
// Submit a group0 "leave" request for the local node and wait until the
// topology coordinator finishes the decommission. Retries the group0 write
// on concurrent modification. Throws std::runtime_error if the node is not
// a normal member, is the last (token-owning) node, fails validation, or
// the coordinator reports an error.
future<> storage_service::raft_decommission() {
    auto& raft_server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    utils::UUID request_id;
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
        }
        const auto& rs = it->second;
        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
        }
        if (_topology_state_machine._topology.normal_nodes.size() == 1) {
            throw std::runtime_error("Cannot decommission last node in the cluster");
        }
        if (!rs.ring.value().tokens.empty() && count_normal_token_owners(_topology_state_machine._topology) == 1) {
            throw std::runtime_error("Cannot decommission the last token-owning node in the cluster");
        }
        auto validation_result = validate_removing_node(_db.local(), locator::host_id(raft_server.id().uuid()));
        if (std::holds_alternative<node_validation_failure>(validation_result)) {
            throw std::runtime_error(fmt::format("Decommission failed: node decommission rejected: {}",
                    std::get<node_validation_failure>(validation_result).reason));
        }
        rtlogger.info("request decommission for: {}", raft_server.id());
        // Publish the topology request and its request-tracking row in a
        // single group0 command.
        topology_mutation_builder builder(guard.write_timestamp());
        builder.with_node(raft_server.id())
               .set("topology_request", topology_request::leave)
               .set("request_id", guard.new_group0_state_id());
        topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id(), _feature_service.topology_requests_type_column);
        rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
                 .set("done", false);
        rtbuilder.set("request_type", topology_request::leave);
        topology_change change{{builder.build(), rtbuilder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id()));
        request_id = guard.new_group0_state_id();
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("decommission: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }
    rtlogger.info("decommission: waiting for completion (request ID: {})", request_id);
    auto error = co_await wait_for_topology_request_completion(request_id);
    if (error.empty()) {
        // Need to set it otherwise gossiper will try to send shutdown on exit
        rtlogger.info("decommission: successfully removed from topology (request ID: {}), updating gossip status", request_id);
        co_await _gossiper.add_local_application_state(std::pair(gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count())));
        rtlogger.info("Decommission succeeded. Request ID: {}", request_id);
    } else {
        auto err = fmt::format("Decommission failed. See earlier errors ({}). Request ID: {}", error, request_id);
        rtlogger.error("{}", err);
        throw std::runtime_error(err);
    }
}
// Public decommission entry point, serialized through the API lock. Runs
// the raft-based decommission and then shuts down local services, leaving
// the process alive in DECOMMISSIONED mode — the operator is expected to
// kill the process afterwards.
future<> storage_service::decommission(sharded<db::snapshot_ctl>& snapshot_ctl) {
    return run_with_api_lock(sstring("decommission"), [&] (storage_service& ss) {
        return seastar::async([&] {
            if (ss._operation_mode != mode::NORMAL) {
                throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
            }
            // Block new backup/snapshot operations for the duration of the
            // decommission.
            snapshot_ctl.invoke_on_all([](auto& sctl) {
                return sctl.disable_all_operations();
            }).get();
            slogger.info("DECOMMISSIONING: disabled backup and snapshots");
            ss.raft_decommission().get();
            ss.stop_transport().get();
            slogger.info("DECOMMISSIONING: stopped transport");
            ss.get_batchlog_manager().invoke_on_all([] (auto& bm) {
                return bm.drain();
            }).get();
            slogger.info("DECOMMISSIONING: stop batchlog_manager done");
            // StageManager.shutdownNow();
            ss._sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::DECOMMISSIONED).get();
            slogger.info("DECOMMISSIONING: set_bootstrap_state done");
            ss.set_mode(mode::DECOMMISSIONED);
            slogger.info("DECOMMISSIONING: done");
            // let op be responsible for killing the process
        });
    });
}
// Submit a group0 "remove" request for a dead node identified by host_id and
// wait until the topology coordinator completes the removal. Nodes listed in
// ignore_nodes_params (plus the removed node itself) are added to the
// ignored set so other topology operations do not wait for them. Retries
// the group0 write on concurrent modification.
future<> storage_service::raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
    auto id = raft::server_id{host_id.uuid()};
    utils::UUID request_id;
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto it = _topology_state_machine._topology.find(id);
        if (!it) {
            throw std::runtime_error(::format("removenode: host id {} is not found in the cluster", host_id));
        }
        auto& rs = it->second; // not usable after yield
        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("removenode: node {} is in '{}' state. Wait for it to be in 'normal' state", id, rs.state));
        }
        if (!rs.ring.value().tokens.empty() && count_normal_token_owners(_topology_state_machine._topology) == 1) {
            throw std::runtime_error(::format(
                    "removenode: node {} cannot be removed because it is the last token-owning "
                    "node in the cluster. If this node is unrecoverable, the cluster has entered an incorrect "
                    "and unrecoverable state. All user data and a part of the system data is lost.",
                    id));
        }
        // A live node must be decommissioned rather than removed.
        if (_gossiper.is_alive(host_id)) {
            const std::string message = ::format(
                "removenode: Rejected removenode operation for node {}"
                "the node being removed is alive, maybe you should use decommission instead?",
                id);
            rtlogger.warn("{}", message);
            throw std::runtime_error(message);
        }
        auto validation_result = validate_removing_node(_db.local(), host_id);
        if (std::holds_alternative<node_validation_failure>(validation_result)) {
            throw std::runtime_error(fmt::format("Removenode failed: node remove rejected: {}",
                    std::get<node_validation_failure>(validation_result).reason));
        }
        auto ignored_ids = find_raft_nodes_from_hoeps(ignore_nodes_params);
        // insert node that should be removed to ignore list so that other topology operations
        // can ignore it
        ignored_ids.insert(id);
        rtlogger.info("request removenode for: {}, new ignored nodes: {}, existing ignore nodes: {}", id, ignored_ids, _topology_state_machine._topology.ignored_nodes);
        // Publish the topology request and its request-tracking row in a
        // single group0 command.
        topology_mutation_builder builder(guard.write_timestamp());
        builder.add_ignored_nodes(ignored_ids).with_node(id)
               .set("topology_request", topology_request::remove)
               .set("request_id", guard.new_group0_state_id());
        topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id(), _feature_service.topology_requests_type_column);
        rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
                 .set("done", false);
        rtbuilder.set("request_type", topology_request::remove);
        topology_change change{{builder.build(), rtbuilder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
        request_id = guard.new_group0_state_id();
        // Reject a duplicate removal request for the same node.
        if (auto itr = _topology_state_machine._topology.requests.find(id);
                itr != _topology_state_machine._topology.requests.end() && itr->second == topology_request::remove) {
            throw std::runtime_error("Removenode failed. Concurrent request for removal already in progress");
        }
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("removenode: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }
    rtlogger.info("removenode: waiting for completion (request ID: {})", request_id);
    // Wait until request completes
    auto error = co_await wait_for_topology_request_completion(request_id);
    if (!error.empty()) {
        auto err = fmt::format("Removenode failed. See earlier errors ({}). Request ID: {}", error, request_id);
        rtlogger.error("{}", err);
        throw std::runtime_error(err);
    }
    rtlogger.info("Removenode succeeded. Request ID: {}", request_id);
}
// Mark the given (dead) hosts as ignored in the topology so topology
// operations stop waiting for them. Delegates to shard 0 (where group0
// lives) and retries on concurrent group0 modification. Throws
// std::runtime_error if any of the hosts is still alive.
future<> storage_service::mark_excluded(const std::vector<locator::host_id>& hosts) {
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.mark_excluded(hosts);
        });
    }
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        std::unordered_set<raft::server_id> raft_hosts;
        for (auto host : hosts) {
            // Refuse to exclude a host that gossip still considers alive.
            if (_gossiper.is_alive(host)) {
                const std::string message = ::format("Cannot mark host {} as excluded because it's alive", host);
                rtlogger.warn("{}", message);
                throw std::runtime_error(message);
            }
            raft_hosts.insert(raft::server_id(host.uuid()));
        }
        topology_mutation_builder builder(guard.write_timestamp());
        builder.add_ignored_nodes(raft_hosts);
        topology_change change{{builder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("Mark as excluded: {}", hosts));
        rtlogger.info("Marking nodes as excluded: {}, previous set: {}", hosts, _topology_state_machine._topology.ignored_nodes);
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("mark_excluded: concurrent operation is detected, retrying.");
            continue;
        }
        rtlogger.info("Nodes marked as excluded: {}", hosts);
        break;
    }
}
// Public removenode entry point: run raft_removenode in a seastar thread
// (without taking the API lock).
future<> storage_service::removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
    return run_with_no_api_lock([host_id, ignored = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
        return seastar::async([&ss, host_id, ignored = std::move(ignored)] () mutable {
            ss.raft_removenode(host_id, std::move(ignored)).get();
        });
    });
}
// Trigger a raft-based check-and-repair of the CDC streams. Must run on
// shard 0; fails with a runtime_error future if the CDC generation service
// has not been initialized yet.
future<> storage_service::check_and_repair_cdc_streams() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (_cdc_gens.local_is_initialized()) {
        return raft_check_and_repair_cdc_streams();
    }
    return make_exception_future<>(std::runtime_error("CDC generation service not initialized yet"));
}
// Handle a node_ops RPC from `coordinator`. Only the `repair_updater`
// command is still supported: it bumps the off-strategy compaction trigger
// for the given tables on all shards. Any other command is rejected with a
// negative response. (A redundant `catch (...) { throw; }` no-op rethrow
// was removed from the per-table loop.)
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req) {
    return seastar::async([this, coordinator, req = std::move(req)] () mutable {
        auto ops_uuid = req.ops_uuid;
        slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", req.cmd, ops_uuid);
        if (req.cmd == node_ops_cmd::repair_updater) {
            slogger.debug("repair[{}]: Got repair_updater request from {}", ops_uuid, coordinator);
            _db.invoke_on_all([coordinator, ops_uuid, tables = req.repair_tables] (replica::database &db) {
                for (const auto& table_id : tables) {
                    try {
                        auto& table = db.find_column_family(table_id);
                        table.update_off_strategy_trigger();
                        slogger.debug("repair[{}]: Updated off_strategy_trigger for table {}.{} by node {}",
                                ops_uuid, table.schema()->ks_name(), table.schema()->cf_name(), coordinator);
                    } catch (replica::no_such_column_family&) {
                        // The table could be dropped by user, ignore it.
                    }
                }
            }).get();
            return node_ops_cmd_response(true);
        }
        slogger.error("node_ops_cmd_handler cmd={}, ops_uuid={} this cmd is no longer supported", req.cmd, ops_uuid);
        return node_ops_cmd_response(false);
    });
}
// Reload the schema from disk, bypassing memtables and cache.
future<> storage_service::reload_schema() {
    // Flush memtables and clear cache so that we use the same state we would after node restart
    // to rule out potential discrepancies which could stem from merging with memtable/cache readers.
    co_await replica::database::flush_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
    co_await replica::database::drop_cache_for_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
    // The actual reload is coordinated by the migration manager on shard 0.
    co_await _migration_manager.invoke_on(0, [] (auto& migration_mgr) {
        return migration_mgr.reload_schema();
    });
}
// Operator-requested drain, serialized through the API lock. Transitions
// the node through DRAINING to DRAINED and signals _drain_finished; a node
// that is already drained is a warned no-op.
future<> storage_service::drain() {
    return run_with_api_lock(sstring("drain"), [] (storage_service& ss) -> future<> {
        if (ss._operation_mode == mode::DRAINED) {
            slogger.warn("Cannot drain node (did it already happen?)");
            co_return;
        }
        ss.set_mode(mode::DRAINING);
        co_await ss.do_drain();
        ss._drain_finished.set_value();
        ss.set_mode(mode::DRAINED);
    });
}
// Perform the actual drain sequence. The shutdown order is significant —
// see the inline comments: transport first, then view building, then
// group0, then the remaining local services.
future<> storage_service::do_drain() {
    co_await utils::get_local_injector().inject("storage_service_drain_wait", utils::wait_for_message(60s));
    // Need to stop transport before group0, otherwise RPCs may fail with raft_group_not_found.
    co_await stop_transport();
    // Drain view builder before group0, because the view builder uses group0 to coordinate view building.
    // Drain after transport is stopped, because view_builder::drain aborts view writes for user writes as well.
    co_await _view_builder.invoke_on_all(&db::view::view_builder::drain);
    co_await _view_building_worker.invoke_on_all(&db::view::view_building_worker::drain);
    // group0 persistence relies on local storage, so we need to stop group0 first.
    // This must be kept in sync with defer_verbose_shutdown for group0 in main.cc to
    // handle the case when initialization fails before reaching drain_on_shutdown for ss.
    _sl_controller.local().abort_group0_operations();
    co_await wait_for_group0_stop();
    if (_group0) {
        co_await _group0->abort_and_drain();
    }
    co_await tracing::tracing::tracing_instance().invoke_on_all(&tracing::tracing::shutdown);
    co_await get_batchlog_manager().invoke_on_all([] (auto& bm) {
        return bm.drain();
    });
    co_await _db.invoke_on_all(&replica::database::drain);
    co_await _sys_ks.invoke_on_all(&db::system_keyspace::shutdown);
    co_await _repair.invoke_on_all(&repair_service::shutdown);
}
// Request a cluster-wide cleanup of vnode-based data and wait until every
// normal node reports cleanup_status::clean. Supports both the queued
// global-request path (topology_global_request_queue feature) and the
// legacy single-slot global request. Retries the group0 write on
// concurrent modification.
future<> storage_service::do_clusterwide_vnodes_cleanup() {
    auto& raft_server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    utils::UUID request_id;
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto curr_req = _topology_state_machine._topology.global_request;
        // Without the request queue there is only a single global request
        // slot; refuse to clobber a different pending request.
        if (!_feature_service.topology_global_request_queue && curr_req && *curr_req != global_topology_request::cleanup) {
            throw std::runtime_error{
                    "topology coordinator: cluster-wide vnodes cleanup: a different topology request is already pending, try again later"};
        }
        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
        }
        const auto& rs = it->second;
        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
        }
        rtlogger.info("cluster-wide vnodes cleanup requested");
        topology_mutation_builder builder(guard.write_timestamp());
        utils::chunked_vector<canonical_mutation> muts;
        if (_feature_service.topology_global_request_queue) {
            // Queued path: the request gets its own ID and a tracking row.
            request_id = guard.new_group0_state_id();
            builder.queue_global_topology_request_id(request_id);
            topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
            rtbuilder.set("done", false)
                     .set("start_time", db_clock::now())
                     .set("request_type", global_topology_request::cleanup);
            muts.push_back(rtbuilder.build());
        } else {
            builder.set_global_topology_request(global_topology_request::cleanup);
        }
        muts.push_back(builder.build());
        topology_change change{std::move(muts)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("vnodes cleanup: cluster-wide cleanup requested"));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("cluster-wide vnodes cleanup: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }
    // request_id is only set on the queued path; the legacy path has no
    // per-request tracking row to wait on.
    if (request_id) {
        // Wait until request completes
        auto error = co_await wait_for_topology_request_completion(request_id);
        if (!error.empty()) {
            auto err = fmt::format("Cluster-wide vnodes cleanup failed. See earlier errors ({}). Request ID: {}", error, request_id);
            rtlogger.error("{}", err);
            throw std::runtime_error(err);
        }
    }
    // The wait above only waits until the command is processed by the topology coordinator which start
    // the vnodes cleanup process, but we still need to wait for it to complete here.
    co_await _topology_state_machine.event.when([this] {
        return std::all_of(_topology_state_machine._topology.normal_nodes.begin(), _topology_state_machine._topology.normal_nodes.end(), [] (auto& n) {
            return n.second.cleanup == cleanup_status::clean;
        });
    });
    rtlogger.info("cluster-wide vnodes cleanup done");
}
// Forcefully reset the local node's cleanup flag from "needed" back to
// "clean" via a group0 command. If the flag is in any other state, this is
// a logged no-op. Retries on concurrent group0 modification.
future<> storage_service::reset_cleanup_needed() {
    auto& server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto me = _topology_state_machine._topology.find(server.id());
        if (!me || me->second.state != node_state::normal) {
            throw std::runtime_error(format("cannot mark the node as clean: local node {} is either not a member of the cluster or is not in a normal state", server.id()));
        }
        if (me->second.cleanup != cleanup_status::needed) {
            rtlogger.info("cannot reset cleanup flag when it is {}", me->second.cleanup);
            co_return;
        }
        topology_mutation_builder builder(guard.write_timestamp());
        builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
        topology_change change{{builder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup status reset by force for {}", server.id()));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
        } catch (group0_concurrent_modification&) {
            rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
            continue;
        }
        rtlogger.info("cleanup needed flag is reset by force");
        break;
    }
}
// Waits until the topology request identified by `id` completes. Returns an
// empty string on success, or an error description on failure. When
// require_entry is set, a missing tracking entry is treated as an error.
future<sstring> storage_service::wait_for_topology_request_completion(utils::UUID id, bool require_entry) {
    auto result = co_await _topology_state_machine.wait_for_request_completion(_sys_ks.local(), id, require_entry);
    co_return result;
}
// Aborts an in-flight topology request. The actual abort is performed on
// shard 0, where group0 state lives.
future<> storage_service::abort_topology_request(utils::UUID request_id) {
    auto abort_on_shard0 = [request_id, this] (storage_service& ss) {
        return _topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id);
    };
    co_await container().invoke_on(0, std::move(abort_on_shard0));
}
// Blocks until the topology state machine reports no ongoing topology
// operation. The group0 guard is released while waiting on the event so
// other operations can make progress, and re-acquired before re-checking.
future<> storage_service::wait_for_topology_not_busy() {
    auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
    while (_topology_state_machine._topology.is_busy()) {
        release_guard(std::move(guard));
        co_await _topology_state_machine.event.when();
        guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
    }
}
// Aborts a paused replication-factor change request identified by request_id.
//
// Must run on shard 0 (group0 only lives there); calls from other shards are
// forwarded. Requires the RACK_LIST_RF cluster feature. The abort removes
// the request from the paused queue and closes its tracking entry in a single
// group0 command, retrying on concurrent modification. If the request is not
// currently paused, the call is a no-op (with a warning).
future<> storage_service::abort_paused_rf_change(utils::UUID request_id) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.abort_paused_rf_change(request_id);
        });
    }
    if (!_feature_service.rack_list_rf) {
        throw std::runtime_error("The RACK_LIST_RF feature is not enabled on the cluster yet");
    }
    while (true) {
        // Fresh guard per attempt so the paused queue is re-read.
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        bool found = std::ranges::contains(_topology_state_machine._topology.paused_rf_change_requests, request_id);
        if (!found) {
            slogger.warn("RF change request with id '{}' is not paused, so it can't be aborted", request_id);
            co_return;
        }
        utils::chunked_vector<canonical_mutation> updates;
        // Remove the request from the paused queue...
        updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
                .resume_rf_change_request(_topology_state_machine._topology.paused_rf_change_requests, request_id).build()));
        // ...and mark its tracking entry as done with an abort reason.
        updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(request_id)
                .done("Aborted by user request")
                .build()));
        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
                format("aborting rf change request {}", request_id));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            slogger.info("aborting request {}: concurrent modification, retrying.", request_id);
            continue;
        }
        break;
    }
}
// Returns true when there is no global topology request currently in flight.
bool storage_service::topology_global_queue_empty() const {
    const auto& pending = _topology_state_machine._topology.global_request;
    return !pending.has_value();
}
// Returns the semaphore used to limit concurrent do_sample_sstables() calls.
semaphore& storage_service::get_do_sample_sstables_concurrency_limiter() {
    return _do_sample_sstables_concurrency_limiter;
}
// Sums, over every host in the cluster, each host's estimate of the total
// sstable volume of table `t`. With ignore_errors::yes a host whose RPC
// fails simply contributes 0; otherwise the first failure propagates.
future<uint64_t> storage_service::estimate_total_sstable_volume(table_id t, ignore_errors errors) {
    auto per_host_estimate = [&] (auto host) -> future<uint64_t> {
        try {
            co_return co_await ser::storage_service_rpc_verbs::send_estimate_sstable_volume(&_messaging.local(), host, t);
        } catch (...) {
            if (errors == ignore_errors::yes) {
                // If the call failed we just return 0 for this one
                slogger.info("call to estimate_total_sstable_volume failed for table {} and host {}, returning 0", t, host);
                co_return 0;
            }
            throw;
        }
    };
    co_return co_await seastar::map_reduce(
        _db.local().get_token_metadata().get_host_ids(),
        std::move(per_host_estimate),
        uint64_t(0),
        std::plus<uint64_t>()
    );
}
// Trains a compression dictionary from the sampled chunks. The sample
// buffers are first converted into plain byte vectors so they can be moved
// to shard 0, where the pluggable trainer callback runs.
future<std::vector<std::byte>> storage_service::train_dict(utils::chunked_vector<temporary_buffer<char>> sample) {
    std::vector<std::vector<std::byte>> chunks;
    chunks.reserve(sample.size());
    for (const auto& buf : sample) {
        auto byte_view = std::as_bytes(std::span(buf));
        chunks.emplace_back(byte_view.begin(), byte_view.end());
    }
    co_return co_await container().invoke_on(0, [chunks = std::move(chunks)] (auto& local) {
        if (!local._train_dict) {
            on_internal_error(slogger, "retrain_dict: _train_dict not plugged");
        }
        return local._train_dict(std::move(chunks));
    });
}
// Publishes a new SSTable compression dictionary for the given table through
// group0, so every node observes the same dictionary.
//
// Runs on shard 0 under the group0 gate. The whole prepare-and-commit cycle
// retries on group0_concurrent_modification. The dictionary is stored under
// the name "sstables/<table-id>" together with the publishing host id and a
// publication timestamp.
future<> storage_service::publish_new_sstable_dict(table_id t_id, std::span<const std::byte> dict, service::raft_group0_client& group0_client) {
    co_await container().invoke_on(0, coroutine::lambda([t_id, dict, &group0_client] (storage_service& local_ss) -> future<> {
        auto group0_holder = local_ss._group0->hold_group0_gate();
        while (true) {
            try {
                auto name = fmt::format("sstables/{}", t_id);
                slogger.debug("publish_new_sstable_dict: trying to publish the dict as {}", name);
                auto batch = service::group0_batch(co_await group0_client.start_operation(local_ss._group0_as));
                auto write_ts = batch.write_timestamp();
                auto new_dict_ts = db_clock::now();
                // Copy the dictionary payload into an owning bytes buffer.
                auto data = bytes(reinterpret_cast<const bytes::value_type*>(dict.data()), dict.size());
                auto this_host_id = local_ss._db.local().get_token_metadata().get_topology().get_config().this_host_id;
                mutation publish_new_dict = co_await local_ss._sys_ks.local().get_insert_dict_mutation(name, std::move(data), this_host_id, new_dict_ts, write_ts);
                batch.add_mutation(std::move(publish_new_dict), "publish new SSTable compression dictionary");
                slogger.debug("publish_new_sstable_dict: committing");
                co_await std::move(batch).commit(group0_client, local_ss._group0_as, {});
                slogger.debug("publish_new_sstable_dict: finished");
                break;
            } catch (const service::group0_concurrent_modification&) {
                // Lost the race with another group0 write; retry.
                slogger.debug("group0_concurrent_modification in publish_new_sstable_dict, retrying");
            }
        }
    }));
}
// Plugs in the dictionary-training callback invoked by train_dict().
void storage_service::set_train_dict_callback(decltype(_train_dict) cb) {
    _train_dict = std::move(cb);
}
// Collects up to n_chunks chunks of chunk_size bytes each, sampled from the
// sstables of table `t` across the whole cluster.
//
// The work is split into rounds of roughly 16 MiB each so a single RPC round
// does not hold too much data in memory. A round returning fewer chunks than
// requested means the cluster has no more data to sample, so we stop early.
future<utils::chunked_vector<temporary_buffer<char>>> storage_service::do_sample_sstables(table_id t, uint64_t chunk_size, uint64_t n_chunks) {
    auto result = utils::chunked_vector<temporary_buffer<char>>();
    if (chunk_size == 0) {
        // Avoid division by zero below; zero-sized chunks are meaningless.
        co_return result;
    }
    // Clamp to at least one chunk per round: for chunk_size > 16 MiB the
    // division yields 0, which would make chunks_this_round 0 and the loop
    // below spin forever without making progress.
    uint64_t max_chunks_per_round = std::max<uint64_t>(1, 16 * 1024 * 1024 / chunk_size);
    uint64_t chunks_done = 0;
    result.reserve(n_chunks);
    while (chunks_done < n_chunks) {
        auto chunks_this_round = std::min(max_chunks_per_round, n_chunks - chunks_done);
        auto round_result = co_await do_sample_sstables_oneshot(t, chunk_size, chunks_this_round);
        std::move(round_result.begin(), round_result.end(), std::back_inserter(result));
        if (round_result.size() < chunks_this_round) {
            // Short round: no more data available cluster-wide.
            break;
        }
        chunks_done += chunks_this_round;
    }
    co_return result;
}
// Performs a single round of cluster-wide sstable sampling for table `t`.
//
// First asks every host for an estimate of its sstable volume for the table,
// then distributes the requested n_chunks across hosts proportionally to
// those estimates, and finally fetches chunk_size-byte chunks from each host.
// May return fewer than n_chunks if hosts hold less data than estimated.
future<utils::chunked_vector<temporary_buffer<char>>> storage_service::do_sample_sstables_oneshot(table_id t, uint64_t chunk_size, uint64_t n_chunks) {
    slogger.debug("do_sample_sstables(): called with table_id={} chunk_size={} n_chunks={}", t, chunk_size, n_chunks);
    auto& db = _db.local();
    auto& ms = _messaging.local();
    std::unordered_map<locator::host_id, uint64_t> estimated_sizes;
    co_await coroutine::parallel_for_each(
        db.get_token_metadata().get_host_ids(),
        [&] (auto h) -> future<> {
            auto est = co_await ser::storage_service_rpc_verbs::send_estimate_sstable_volume(&ms, h, t);
            if (est) {
                // Hosts reporting a zero estimate are excluded entirely.
                estimated_sizes.emplace(h, est);
            }
        }
    );
    const auto total_size = std::ranges::fold_left(estimated_sizes | std::ranges::views::values, uint64_t(0), std::plus());
    slogger.debug("do_sample_sstables(): estimate_sstable_volume returned {}, total={}", estimated_sizes, total_size);
    std::unordered_map<locator::host_id, uint64_t> chunks_per_host;
    {
        // Split n_chunks between hosts proportionally to their estimated
        // volume. Using a running prefix sum (rather than rounding each
        // host's share independently) guarantees the shares add up to
        // exactly n_chunks.
        uint64_t partial_sum = 0;
        uint64_t covered_samples = 0;
        for (const auto& [k, v] : estimated_sizes) {
            partial_sum += v;
            uint64_t next_covered = static_cast<double>(partial_sum) / total_size * n_chunks;
            chunks_per_host.emplace(k, next_covered - covered_samples);
            covered_samples = next_covered;
        }
        // Just a sanity check
        auto covered = std::ranges::fold_left(chunks_per_host | std::ranges::views::values, uint64_t(0), std::plus());
        if (total_size > 0 && covered != n_chunks) {
            on_internal_error(slogger, "do_sample_sstables(): something went wrong with the sample distribution algorithm");
        }
    }
    slogger.debug("do_sample_sstables(): sending out send_sample_sstables with proportions {}", chunks_per_host);
    auto samples = co_await seastar::map_reduce(
        chunks_per_host,
        [&] (std::pair<locator::host_id, uint64_t> h_s) -> future<utils::chunked_vector<temporary_buffer<char>>> {
            const auto& [h, sz] = h_s;
            return ser::storage_service_rpc_verbs::send_sample_sstables(&ms, h, t, chunk_size, sz);
        },
        utils::chunked_vector<temporary_buffer<char>>(),
        [] (auto v, auto some_samples) {
            std::ranges::move(some_samples, std::back_inserter(v));
            return v;
        }
    );
    slogger.debug("do_sample_sstables(): returned {} chunks", samples.size());
    co_return samples;
}
// Submits a "rebuild" topology request for the local node via group0 and
// waits for the request to complete.
//
// sdc_param optionally names the source DC to stream from; a ":force" suffix
// is appended to the rebuild option when the caller forced the DC choice.
// Preconditions, re-checked under the guard on every retry: the node must be
// a normal cluster member, own tokens, and not be the only node. Retries on
// group0_concurrent_modification; throws if the rebuild ultimately fails.
future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
    auto& raft_server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    utils::UUID request_id;
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
        }
        const auto& rs = it->second;
        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
        }
        if (rs.ring.value().tokens.empty()) {
            rtlogger.warn("local node does not own any tokens, skipping redundant rebuild");
            co_return;
        }
        if (_topology_state_machine._topology.normal_nodes.size() == 1) {
            throw std::runtime_error("Cannot rebuild a single node");
        }
        rtlogger.info("request rebuild for: {} source_dc={}", raft_server.id(), sdc_param);
        topology_mutation_builder builder(guard.write_timestamp());
        sstring source_dc = sdc_param.value_or("");
        if (sdc_param.force() && !source_dc.empty()) {
            // The "forced" flag is encoded into the rebuild option string.
            source_dc += ":force";
        }
        // NOTE(review): new_group0_state_id() appears to be stable for the
        // lifetime of the guard — all three uses below are assumed to yield
        // the same id; confirm against group0_guard's implementation.
        builder.with_node(raft_server.id())
            .set("topology_request", topology_request::rebuild)
            .set("rebuild_option", source_dc)
            .set("request_id", guard.new_group0_state_id());
        topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id(), _feature_service.topology_requests_type_column);
        rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
            .set("done", false);
        rtbuilder.set("request_type", topology_request::rebuild);
        topology_change change{{builder.build(), rtbuilder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc));
        request_id = guard.new_group0_state_id();
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("rebuild: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }
    // Wait until request completes
    auto err = co_await wait_for_topology_request_completion(request_id);
    if (!err.empty()) {
        throw std::runtime_error(::format("rebuild failed: {}", err));
    }
}
// Checks whether the latest committed CDC generation is optimal and, if it
// is not (or none exists), requests a new one via a global topology request,
// then waits for the new generation to be committed.
//
// With the topology_global_request_queue feature the request is queued and
// tracked under a request id; without it the request is coalesced with any
// already-pending new_cdc_generation request. Retries the whole cycle on
// group0_concurrent_modification.
future<> storage_service::raft_check_and_repair_cdc_streams() {
    std::optional<cdc::generation_id> last_committed_gen;
    utils::UUID request_id;
    while (true) {
        rtlogger.info("request check_and_repair_cdc_streams, refreshing topology");
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        // Determine whether a global request is already active or queued,
        // and if so which one, so we can coalesce with it below.
        std::optional<global_topology_request> curr_req;
        if (_topology_state_machine._topology.global_request) {
            curr_req = *_topology_state_machine._topology.global_request;
            request_id = _topology_state_machine._topology.global_request_id.value();
        } else if (!_topology_state_machine._topology.global_requests_queue.empty()) {
            request_id = _topology_state_machine._topology.global_requests_queue[0];
            auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
            curr_req = std::get<global_topology_request>(req_entry.request_type);
        } else {
            request_id = utils::UUID{};
        }
        if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
            if (!_feature_service.topology_global_request_queue) {
                // Without the queue only one global request may be pending.
                throw std::runtime_error{
                    "check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
            } else {
                // With the queue we can always enqueue our own request.
                request_id = utils::UUID{};
            }
        }
        if (_topology_state_machine._topology.committed_cdc_generations.empty()) {
            slogger.error("check_and_repair_cdc_streams: no committed CDC generations, requesting a new one.");
        } else {
            last_committed_gen = _topology_state_machine._topology.committed_cdc_generations.back();
            auto gen = co_await _sys_ks.local().read_cdc_generation(last_committed_gen->id);
            if (cdc::is_cdc_generation_optimal(gen, get_token_metadata())) {
                cdc_log.info("CDC generation {} does not need repair", last_committed_gen);
                co_return;
            }
            cdc_log.info("CDC generation {} needs repair, requesting a new one", last_committed_gen);
        }
        // With global request queue coalescing requests should not be needed, but test_cdc_generation_publishing assumes that multiple new_cdc_generation
        // commands will be coalesced here, so do that until the test is fixed.
        if (!request_id) {
            topology_mutation_builder builder(guard.write_timestamp());
            utils::chunked_vector<canonical_mutation> muts;
            if (_feature_service.topology_global_request_queue) {
                request_id = guard.new_group0_state_id();
                topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
                builder.queue_global_topology_request_id(request_id);
                rtbuilder.set("done", false)
                    .set("start_time", db_clock::now())
                    .set("request_type", global_topology_request::new_cdc_generation);
                muts.push_back(rtbuilder.build());
            } else {
                builder.set_global_topology_request(global_topology_request::new_cdc_generation);
            }
            muts.push_back(builder.build());
            topology_change change{std::move(muts)};
            group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
                ::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
            try {
                co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            } catch (group0_concurrent_modification&) {
                rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
                continue;
            }
        }
        break;
    }
    if (request_id) {
        // Wait until request completes
        auto error = co_await wait_for_topology_request_completion(request_id);
        if (!error.empty()) {
            auto err = fmt::format("Check and repair cdc stream failed. See earlier errors ({}). Request ID: {}", error, request_id);
            rtlogger.error("{}", err);
            throw std::runtime_error(err);
        }
        // A successful repair must have produced a different generation.
        auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
            ? std::nullopt
            : std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
        if (last_committed_gen == gen) {
            on_internal_error(rtlogger, "Wrong generation after completion of check and repair cdc stream");
        }
    } else {
        // Wait until we commit a new CDC generation.
        co_await _topology_state_machine.event.when([this, &last_committed_gen] {
            auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
                ? std::nullopt
                : std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
            return last_committed_gen != gen;
        });
    }
}
// Public rebuild entry point: takes the API lock, warns about tablet
// keyspaces (which do not need rebuild), and runs the raft-based rebuild.
future<> storage_service::rebuild(utils::optional_param source_dc) {
    return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) -> future<> {
        auto tablets_keyspaces = ss._db.local().get_tablets_keyspaces();
        if (!tablets_keyspaces.empty()) {
            std::ranges::sort(tablets_keyspaces);
            slogger.warn("Rebuild is not supported for the following tablets-enabled keyspaces: {}: "
                    "Rebuild is not required for tablets-enabled keyspace after increasing replication factor. "
                    "However, recovering from local data loss on this node requires running repair on all nodes in the datacenter", tablets_keyspaces);
        }
        co_await ss.raft_rebuild(source_dc);
    });
}
// FIXME
// We return 0 for no exceptions, it should probably be
// replaced by some general exception handling that would count
// the unhandled exceptions.
//return (int)StorageMetrics.exceptions.count();
int32_t storage_service::get_exception_count() {
    constexpr int32_t no_exceptions_counted = 0;
    return no_exceptions_counted;
}
// Computes, for every range replicated on `endpoint`, which nodes will
// newly become replicas of that range once `endpoint` leaves the ring.
// Returns a multimap from range to each gaining node — exactly the ranges
// that need to be streamed to those nodes before the endpoint can leave.
future<std::unordered_multimap<dht::token_range, locator::host_id>>
storage_service::get_changed_ranges_for_leaving(const locator::vnode_effective_replication_map* erm, locator::host_id endpoint) {
    // First get all ranges the leaving endpoint is responsible for
    auto ranges = co_await get_ranges_for_endpoint(*erm, endpoint);
    slogger.debug("Node {} ranges [{}]", endpoint, ranges);
    std::unordered_map<dht::token_range, host_id_vector_replica_set> current_replica_endpoints;
    // Find (for each range) all nodes that store replicas for these ranges as well
    for (auto& r : ranges) {
        auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
        auto eps = erm->get_natural_replicas(end_token);
        current_replica_endpoints.emplace(r, std::move(eps));
        co_await coroutine::maybe_yield();
    }
    // Simulate the ring as it will look after all leaving nodes are gone.
    auto temp = co_await get_token_metadata_ptr()->clone_after_all_left();
    // endpoint might or might not be 'leaving'. If it was not leaving (that is, removenode
    // command was used), it is still present in temp and must be removed.
    if (temp.is_normal_token_owner(endpoint)) {
        temp.remove_endpoint(endpoint);
    }
    std::unordered_multimap<dht::token_range, locator::host_id> changed_ranges;
    // Go through the ranges and for each range check who will be
    // storing replicas for these ranges when the leaving endpoint
    // is gone. Whoever is present in newReplicaEndpoints list, but
    // not in the currentReplicaEndpoints list, will be needing the
    // range.
    const auto& rs = erm->get_replication_strategy();
    for (auto& r : ranges) {
        auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
        auto new_replica_endpoints = co_await rs.calculate_natural_endpoints(end_token, temp);
        auto rg = current_replica_endpoints.equal_range(r);
        for (auto it = rg.first; it != rg.second; it++) {
            const dht::token_range& range_ = it->first;
            host_id_vector_replica_set& current_eps = it->second;
            slogger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints);
            // Remove every current replica from the new replica set; the
            // leftovers are the nodes that gain the range.
            for (auto ep : it->second) {
                auto beg = new_replica_endpoints.begin();
                auto end = new_replica_endpoints.end();
                new_replica_endpoints.erase(std::remove(beg, end, ep), end);
            }
        }
        if (slogger.is_enabled(logging::log_level::debug)) {
            if (new_replica_endpoints.empty()) {
                slogger.debug("Range {} already in all replicas", r);
            } else {
                slogger.debug("Range {} will be responsibility of {}", r, new_replica_endpoints);
            }
        }
        for (auto& ep : new_replica_endpoints) {
            changed_ranges.emplace(r, ep);
        }
        // Replication strategy doesn't necessarily yield in calculate_natural_endpoints.
        // E.g. everywhere_replication_strategy
        co_await coroutine::maybe_yield();
    }
    co_await temp.clear_gently();
    co_return changed_ranges;
}
// Streams all locally-owned data away in preparation for decommission.
//
// First replays the batchlog so no pending batches are lost, then either
// performs repair-based decommission (when enabled) or computes, per
// keyspace, the ranges each remaining node must take over and streams them.
future<> storage_service::unbootstrap() {
    slogger.info("Started batchlog replay for decommission");
    co_await get_batchlog_manager().local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes);
    slogger.info("Finished batchlog replay for decommission");
    if (is_repair_based_node_ops_enabled(streaming::stream_reason::decommission)) {
        co_await _repair.local().decommission_with_repair(get_token_metadata_ptr(), null_topology_guard);
    } else {
        // Compute, per keyspace, which ranges must move to which node
        // once this node leaves.
        std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream;
        auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
        for (const auto& [keyspace_name, erm] : ks_erms) {
            auto ranges_mm = co_await get_changed_ranges_for_leaving(erm->maybe_as_vnode_effective_replication_map(), my_host_id());
            if (slogger.is_enabled(logging::log_level::debug)) {
                std::vector<wrapping_interval<token>> ranges;
                for (auto& x : ranges_mm) {
                    ranges.push_back(x.first);
                }
                slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges);
            }
            ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm));
        }
        set_mode(mode::LEAVING);
        auto stream_success = stream_ranges(std::move(ranges_to_stream));
        // wait for the transfer runnables to signal the latch.
        slogger.debug("waiting for stream acks.");
        try {
            co_await std::move(stream_success);
        } catch (...) {
            slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
            throw;
        }
        slogger.debug("stream acks all received.");
    }
}
// Registers on the streamer, per keyspace, the ranges this node gains when
// `leaving_node` is removed, each paired with a source endpoint to fetch it
// from.
future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, locator::host_id leaving_node) {
    auto local_host = my_host_id();
    auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
    for (const auto& [keyspace_name, ermp] : ks_erms) {
        auto* erm = ermp->maybe_as_vnode_effective_replication_map();
        // Ranges whose replica set changes once the leaving node is gone.
        auto changed_ranges = co_await get_changed_ranges_for_leaving(erm, leaving_node);
        // Pick out the ranges this node becomes responsible for.
        dht::token_range_vector gained_ranges;
        for (const auto& [range, new_owner] : changed_ranges) {
            if (new_owner == local_host) {
                gained_ranges.emplace_back(range);
            }
        }
        // Map each gained range to an alive source to stream it from.
        auto source_ranges = co_await get_new_source_ranges(erm, gained_ranges);
        std::unordered_map<locator::host_id, dht::token_range_vector> ranges_per_endpoint;
        for (const auto& [source, range] : source_ranges) {
            ranges_per_endpoint[source].emplace_back(range);
        }
        streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint));
    }
}
// Streams to this node the data it takes over from a node being removed
// (removenode path).
//
// Runs in a seastar::thread (seastar::async). Streaming is aborted when
// either the service-wide abort source or the caller-provided one fires;
// both are funneled into a single local abort_source handed to the streamer.
future<> storage_service::removenode_with_stream(locator::host_id leaving_node,
                                                 frozen_topology_guard topo_guard,
                                                 shared_ptr<abort_source> as_ptr) {
    return seastar::async([this, leaving_node, as_ptr, topo_guard] {
        auto tmptr = get_token_metadata_ptr();
        abort_source as;
        // Forward service-wide aborts into the local abort source.
        auto sub = _abort_source.subscribe([&as] () noexcept {
            if (!as.abort_requested()) {
                as.request_abort();
            }
        });
        if (!as_ptr) {
            throw std::runtime_error("removenode_with_stream: abort_source is nullptr");
        }
        // Forward the caller's aborts as well.
        auto as_ptr_sub = as_ptr->subscribe([&as] () noexcept {
            if (!as.abort_requested()) {
                as.request_abort();
            }
        });
        auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, tmptr->get_my_id(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard);
        removenode_add_ranges(streamer, leaving_node).get();
        try {
            streamer->stream_async().get();
        } catch (...) {
            slogger.warn("removenode_with_stream: stream failed: {}", std::current_exception());
            throw;
        }
    });
}
// Streams out, per keyspace, the given ranges to their destination
// endpoints. Used on the decommission (unbootstrap) path.
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream_by_keyspace) {
    auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_token_metadata_ptr()->get_my_id(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission, null_topology_guard);
    for (auto& [keyspace, ranges_with_endpoints] : ranges_to_stream_by_keyspace) {
        if (ranges_with_endpoints.empty()) {
            continue;
        }
        // Group the (range, endpoint) pairs by destination endpoint.
        std::unordered_map<locator::host_id, dht::token_range_vector> ranges_per_endpoint;
        for (const auto& [range, endpoint] : ranges_with_endpoints) {
            ranges_per_endpoint[endpoint].emplace_back(range);
            co_await coroutine::maybe_yield();
        }
        streamer.add_tx_ranges(keyspace, std::move(ranges_per_endpoint));
    }
    try {
        co_await streamer.stream_async();
        slogger.info("stream_ranges successful");
    } catch (...) {
        auto ep = std::current_exception();
        slogger.warn("stream_ranges failed: {}", ep);
        std::rethrow_exception(std::move(ep));
    }
}
// True iff this node has joined group0 and is currently its raft leader.
bool storage_service::is_raft_leader() const noexcept {
    if (!_group0->joined_group0()) {
        return false;
    }
    return _group0->group0_server().is_leader();
}
// Stops every registered client-protocol server sequentially, logging
// progress; a failure to stop any server is logged and re-thrown.
future<> storage_service::shutdown_protocol_servers() {
    for (auto& server : _protocol_servers) {
        slogger.info("Shutting down {} server", server->name());
        auto stopped = co_await coroutine::as_future(server->stop_server());
        if (stopped.failed()) {
            auto ex = stopped.get_exception();
            slogger.error("Unexpected error shutting down {} server: {}",
                    server->name(), ex);
            std::rethrow_exception(std::move(ex));
        }
        slogger.info("Shutting down {} server was successful", server->name());
    }
}
// For each range this node is about to gain, picks one alive replica (the
// topologically closest) to stream that range's data from. Returns a
// multimap from chosen source host to range. Throws if this node already
// appears among a range's sources, which would indicate inconsistent state.
future<std::unordered_multimap<locator::host_id, dht::token_range>>
storage_service::get_new_source_ranges(const locator::vnode_effective_replication_map* erm, const dht::token_range_vector& ranges) const {
    auto my_address = my_host_id();
    std::unordered_map<dht::token_range, host_id_vector_replica_set> range_addresses = co_await erm->get_range_host_ids();
    std::unordered_multimap<locator::host_id, dht::token_range> source_ranges;
    // find alive sources for our new ranges
    auto tmptr = erm->get_token_metadata_ptr();
    // Iterate by const reference — token ranges are not cheap to copy.
    for (const auto& r : ranges) {
        host_id_vector_replica_set sources;
        if (auto it = range_addresses.find(r); it != range_addresses.end()) {
            sources = it->second;
        }
        // Prefer topologically closer sources.
        tmptr->get_topology().sort_by_proximity(my_address, sources);
        // We cannot already be a replica of a range we are gaining.
        if (std::ranges::contains(sources, my_address)) {
            auto err = ::format("get_new_source_ranges: sources={}, my_address={}", sources, my_address);
            slogger.warn("{}", err);
            throw std::runtime_error(err);
        }
        // Take the first (closest) alive source, if any.
        for (auto& source : sources) {
            if (_gossiper.is_alive(source)) {
                source_ranges.emplace(source, r);
                break;
            }
        }
        co_await coroutine::maybe_yield();
    }
    co_return source_ranges;
}
// The legacy token "move" operation was removed; always fail with a clear
// error. (The previous message read "not supported only more" — a garbled
// form of "anymore".)
future<> storage_service::move(token new_token) {
    return run_with_api_lock(sstring("move"), [] (storage_service& ss) mutable {
        return make_exception_future<>(std::runtime_error("Move operation is not supported anymore"));
    });
}
// Describes the vnode ring of a keyspace. Tablet keyspaces have per-table
// rings and must be queried through describe_ring_for_table() instead.
future<utils::chunked_vector<storage_service::token_range_endpoints>>
storage_service::describe_ring(const sstring& keyspace, bool include_only_local_dc) const {
    const auto& ks = _db.local().find_keyspace(keyspace);
    if (ks.uses_tablets()) {
        throw std::runtime_error(fmt::format("The keyspace {} has tablet table. Query describe_ring with the table parameter!", keyspace));
    }
    co_return co_await locator::describe_ring(_db.local(), _gossiper, keyspace, include_only_local_dc);
}
// Describes the ring of a single table. Vnode tables delegate to the
// keyspace-level describe_ring(); tablet tables produce one entry per
// tablet, carrying the tablet's token range and its replica endpoints.
// Per-endpoint details are cached in host_infos to avoid repeated lookups.
future<utils::chunked_vector<dht::token_range_endpoints>>
storage_service::describe_ring_for_table(const sstring& keyspace_name, const sstring& table_name) const {
    slogger.debug("describe_ring for table {}.{}", keyspace_name, table_name);
    auto& t = _db.local().find_column_family(keyspace_name, table_name);
    if (!t.uses_tablets()) {
        auto ranges = co_await describe_ring(keyspace_name);
        co_return ranges;
    }
    table_id tid = t.schema()->id();
    auto erm = t.get_effective_replication_map();
    auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
    const auto& topology = erm->get_topology();
    utils::chunked_vector<dht::token_range_endpoints> ranges;
    ranges.reserve(tmap.tablet_count());
    // Cache of endpoint info, keyed by host, shared across tablets.
    std::unordered_map<locator::host_id, locator::describe_ring_endpoint_info> host_infos;
    co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
        auto range = tmap.get_token_range(id);
        auto& replicas = info.replicas;
        dht::token_range_endpoints tr;
        // Unbounded ends are represented by empty token strings.
        if (range.start()) {
            tr._start_token = range.start()->value().to_sstring();
        }
        if (range.end()) {
            tr._end_token = range.end()->value().to_sstring();
        }
        tr._endpoints.reserve(replicas.size());
        tr._rpc_endpoints.reserve(replicas.size());
        tr._endpoint_details.reserve(replicas.size());
        for (auto& r : replicas) {
            auto& endpoint = r.host;
            auto it = host_infos.find(endpoint);
            if (it == host_infos.end()) {
                it = host_infos.emplace(endpoint, get_describe_ring_endpoint_info(endpoint, topology, _gossiper)).first;
            }
            tr._rpc_endpoints.emplace_back(it->second.rpc_addr);
            tr._endpoints.emplace_back(fmt::to_string(it->second.details._host));
            tr._endpoint_details.emplace_back(it->second.details);
        }
        ranges.push_back(std::move(tr));
        return make_ready_future<>();
    });
    co_return ranges;
}
// Builds a token -> IP address map covering both normal token owners and
// nodes that are still bootstrapping.
std::map<token, inet_address> storage_service::get_token_to_endpoint_map() {
    const auto& tm = get_token_metadata();
    std::map<token, inet_address> result;
    // Bind by const reference — `const auto [t, id]` would copy every
    // (token, host_id) pair out of the maps.
    for (const auto& [t, id] : tm.get_token_to_endpoint()) {
        result.insert({t, _address_map.get(id)});
    }
    for (const auto& [t, id] : tm.get_bootstrap_tokens()) {
        result.insert({t, _address_map.get(id)});
    }
    return result;
}
// Maps each tablet's last token of `table` to the IP address of the
// tablet's primary replica.
future<std::map<token, inet_address>> storage_service::get_tablet_to_endpoint_map(table_id table) {
    const auto& tm = get_token_metadata();
    const auto& tmap = tm.tablets().get_tablet_map(table);
    std::map<token, inet_address> result;
    std::optional<locator::tablet_id> tid = tmap.first_tablet();
    while (tid) {
        const auto& primary = tmap.get_primary_replica(*tid, tm.get_topology());
        result.emplace(tmap.get_last_token(*tid), _address_map.get(primary.host));
        co_await coroutine::maybe_yield();
        tid = tmap.next_tablet(*tid);
    }
    co_return result;
}
// Returns the configured ring delay — how long topology operations wait for
// gossip to settle.
std::chrono::milliseconds storage_service::get_ring_delay() {
    const auto delay_ms = _db.local().get_config().ring_delay_ms();
    slogger.trace("Get RING_DELAY: {}ms", delay_ms);
    return std::chrono::milliseconds(delay_ms);
}
// Acquires the token_metadata lock. Shard 0 only — token metadata mutation
// is coordinated from there.
future<locator::token_metadata_lock> storage_service::get_token_metadata_lock() noexcept {
    SCYLLA_ASSERT(this_shard_id() == 0);
    return _shared_token_metadata.get_lock();
}
// Acquire the token_metadata lock and get a mutable_token_metadata_ptr.
// Pass that ptr to \c func, and when successfully done,
// replicate it to all cores.
//
// By default the merge_lock (that is unified with the token_metadata_lock)
// is acquired for mutating the token_metadata. Pass acquire_merge_lock::no
// when called from paths that already acquire the merge_lock, like
// db::schema_tables::do_merge_schema.
//
// Note: must be called on shard 0.
future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock acquire_merge_lock) noexcept {
    SCYLLA_ASSERT(this_shard_id() == 0);
    std::optional<token_metadata_lock> tmlock;
    if (acquire_merge_lock) {
        // Serialize with schema merges; callers already holding the merge
        // lock pass acquire_merge_lock::no.
        tmlock.emplace(co_await get_token_metadata_lock());
    }
    // Let the caller mutate a private copy, then publish it to all shards.
    auto tmptr = co_await get_mutable_token_metadata_ptr();
    co_await func(tmptr);
    co_await replicate_to_all_cores(std::move(tmptr));
}
// Recomputes the topology-change info (pending ranges etc.) in the given
// mutable token metadata, resolving each node's DC/rack from the raft
// topology state. `reason` is only used for error logging. Shard 0 only.
future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    try {
        // Resolve a host's DC/rack from the raft topology; nodes unknown to
        // the topology state machine yield nullopt.
        locator::dc_rack_fn get_dc_rack_by_host_id([this, &tm = *tmptr] (locator::host_id host_id) -> std::optional<locator::endpoint_dc_rack> {
            const auto server_id = raft::server_id(host_id.uuid());
            const auto* node = _topology_state_machine._topology.find(server_id);
            if (node) {
                return locator::endpoint_dc_rack {
                    .dc = node->second.datacenter,
                    .rack = node->second.rack,
                };
            }
            return std::nullopt;
        });
        co_await tmptr->update_topology_change_info(get_dc_rack_by_host_id);
    } catch (...) {
        auto ep = std::current_exception();
        slogger.error("Failed to update topology change info for {}: {}", reason, ep);
        std::rethrow_exception(std::move(ep));
    }
}
// Convenience overload: acquires mutable token metadata via
// mutate_token_metadata() and delegates to the pointer-taking overload.
future<> storage_service::update_topology_change_info(sstring reason, acquire_merge_lock acquire_merge_lock) {
    auto updater = [this, reason = std::move(reason)] (mutable_token_metadata_ptr tmptr) mutable {
        return update_topology_change_info(std::move(tmptr), std::move(reason));
    };
    return mutate_token_metadata(std::move(updater), acquire_merge_lock);
}
// The keyspace_changed notification fires on every shard after a keyspace
// schema change, but token metadata must be mutated exactly once, from
// shard 0, after all shards finished database::update_keyspace.
// mutate_token_metadata (via update_topology_change_info) then refreshes
// token metadata and effective_replication_map on all shards.
future<> storage_service::keyspace_changed(const sstring& ks_name) {
    if (this_shard_id() != 0) {
        return make_ready_future<>();
    }
    // Recompute pending ranges: a keyspace change may invalidate ones
    // calculated earlier.
    return update_topology_change_info(::format("keyspace {}", ks_name), acquire_merge_lock::no);
}
future<locator::mutable_token_metadata_ptr> storage_service::prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint, mutable_token_metadata_ptr pending_token_metadata) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (!hint) {
        // No hint: reload the entire tablet metadata from the system tables.
        pending_token_metadata->set_tablets(co_await replica::read_tablet_metadata(_qp));
    } else {
        // A hint allows updating only the affected parts of the tablet metadata.
        co_await replica::update_tablet_metadata(_db.local(), _qp, pending_token_metadata->tablets(), hint);
    }
    // Carry over the load-balancing switch from the raft topology state.
    pending_token_metadata->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
    co_return pending_token_metadata;
}
// Wakes up all fibers waiting on the topology state machine event so they
// re-examine the current topology/tablet state.
void storage_service::wake_up_topology_state_machine() noexcept {
    _topology_state_machine.event.broadcast();
}
future<> storage_service::update_tablet_metadata(const locator::tablet_metadata_change_hint& hint) {
    // Build the updated tablet metadata on a mutable copy, publish it to every
    // shard, then nudge the topology state machine to act on the change.
    auto tmptr = co_await get_mutable_token_metadata_ptr();
    auto updated = co_await prepare_tablet_metadata(hint, std::move(tmptr));
    co_await replicate_to_all_cores(std::move(updated));
    wake_up_topology_state_machine();
}
// Drives the replica-side part of a tablet split for one table: splits every
// storage (compaction) group on all shards so the table is ready for the
// coordinator to finalize the split. Loops until all local storage groups are
// split, the split is no longer needed, or shutdown/abort is requested.
// Unexpected failures are retried with exponential backoff (5s..300s);
// expected shutdown/"gone" conditions terminate the loop. Never throws.
future<> storage_service::process_tablet_split_candidate(table_id table) noexcept {
    tasks::task_info tablet_split_task_info;
    // True iff every shard reports all storage groups of the table as split.
    auto all_compaction_groups_split = [&] () mutable {
        return _db.map_reduce0([table_ = table] (replica::database& db) {
            auto all_split = db.find_column_family(table_).all_storage_groups_split();
            return make_ready_future<bool>(all_split);
        }, bool{true}, std::logical_and<bool>());
    };
    // Kicks off storage-group splitting on every shard.
    auto split_all_compaction_groups = [&] () -> future<> {
        return _db.invoke_on_all([table, tablet_split_task_info] (replica::database& db) -> future<> {
            return db.find_column_family(table).split_all_storage_groups(tablet_split_task_info);
        });
    };
    exponential_backoff_retry split_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
    while (!_async_gate.is_closed() && !_group0_as.abort_requested()) {
        bool sleep = false;
        try {
            // Ensures that latest changes to tablet metadata, in group0, are visible
            auto guard = co_await _group0->client().start_operation(_group0_as);
            auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
            if (!tmap.needs_split()) {
                // Split was cancelled or already finalized; nothing to do.
                release_guard(std::move(guard));
                break;
            }
            tablet_split_task_info.id = tasks::task_id{tmap.resize_task_info().tablet_task_id.uuid()};
            if (co_await all_compaction_groups_split()) {
                slogger.debug("All compaction groups of table {} are split ready.", table);
                release_guard(std::move(guard));
                break;
            } else {
                // Do the actual splitting work outside the group0 guard.
                release_guard(std::move(guard));
                co_await split_all_compaction_groups();
            }
        } catch (const locator::no_such_tablet_map& ex) {
            // Tablet map is gone (e.g. the table was dropped) - give up.
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (const replica::no_such_column_family& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (const seastar::abort_requested_exception& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (raft::request_aborted& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (seastar::gate_closed_exception& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (...) {
            // Anything else is considered transient - retry with backoff.
            slogger.error("Failed to complete splitting of table {} due to {}, retrying after {} seconds",
                    table, std::current_exception(), split_retry.sleep_time());
            sleep = true;
        }
        if (sleep) {
            try {
                co_await split_retry.retry(_group0_as);
            } catch (...) {
                // The sleep can fail on abort; the loop condition handles exit.
                slogger.warn("Sleep in split monitor failed with {}", std::current_exception());
            }
        }
    }
}
void storage_service::register_tablet_split_candidate(table_id table) noexcept {
    // The split monitor runs only on shard 0; other shards ignore the call.
    if (this_shard_id() != 0) {
        return;
    }
    try {
        const auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        if (!tmap.needs_split()) {
            return;
        }
        // Queue the table and wake the monitor fiber.
        _tablet_split_candidates.push_back(table);
        _tablet_split_monitor_event.signal();
    } catch (...) {
        slogger.error("Unable to register table {} as candidate for tablet splitting, due to {}", table, std::current_exception());
    }
}
future<> storage_service::run_tablet_split_monitor() {
    // Monitor fiber: drain the candidate queue, then sleep until either new
    // candidates show up or shutdown is requested.
    auto shutting_down = [this] { return _async_gate.is_closed() || _group0_as.abort_requested(); };
    while (!shutting_down()) {
        auto candidates = std::exchange(_tablet_split_candidates, {});
        for (const auto& candidate_table : candidates) {
            co_await process_tablet_split_candidate(candidate_table);
        }
        co_await utils::clear_gently(candidates);
        // Returns when there is more work to do, or shutdown was requested.
        co_await _tablet_split_monitor_event.when([&] {
            return !_tablet_split_candidates.empty() || shutting_down();
        });
    }
}
void storage_service::start_tablet_split_monitor() {
    // A single monitor fiber is kept, pinned to shard 0.
    if (this_shard_id() == 0) {
        slogger.info("Starting the tablet split monitor...");
        _tablet_split_monitor = run_tablet_split_monitor();
    }
}
future<> storage_service::snitch_reconfigured() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    auto& snitch = _snitch.local();
    // Refresh this node's DC/rack in the token metadata on all shards.
    co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) {
        // re-read local rack and DC info
        tmptr->update_topology(tmptr->get_my_id(), snitch->get_location());
        return make_ready_future<>();
    });
    // Advertise the snitch-derived application states via gossip, if running.
    if (_gossiper.is_enabled()) {
        co_await _gossiper.add_local_application_state(snitch->get_app_states());
    }
}
// Implements the node-local part of the topology barrier-and-drain: verifies
// the locally installed token metadata version matches the topology version,
// waits until stale token metadata versions are no longer in use on any
// shard, and drains closing topology sessions. Always executes on shard 0;
// calls from other shards are forwarded there.
future<> storage_service::local_topology_barrier() {
    if (this_shard_id() != 0) {
        co_await container().invoke_on(0, [] (storage_service& ss) {
            return ss.local_topology_barrier();
        });
        co_return;
    }
    auto version = _topology_state_machine._topology.version;
    utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail_before", [] {
        throw std::runtime_error("raft_topology_barrier_and_drain_fail_before injected exception");
    });
    co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5)));
    if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) {
        // During double writes we must be able to reach transitioning nodes;
        // log loudly if a transitioning node has no known IP address.
        for (auto& n : _topology_state_machine._topology.transition_nodes) {
            if (!_address_map.find(locator::host_id{n.first.uuid()})) {
                rtlogger.error("The topology transition is in a double write state but the IP of the node in transition is not known");
                break;
            }
        }
    }
    co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
        const auto current_version = ss._shared_token_metadata.get()->get_version();
        rtlogger.info("Got raft_topology_cmd::barrier_and_drain, version {}, "
                      "current version {}, stale versions (version: use_count): {}",
                      version, current_version, ss._shared_token_metadata.describe_stale_versions());
        // This shouldn't happen under normal operation, it's only plausible
        // if the topology change coordinator has
        // moved to another node and managed to update the topology
        // parallel to this method. The previous coordinator
        // should be inactive now, so it won't observe this
        // exception. By returning exception we aim
        // to reveal any other conditions where this may arise.
        if (current_version != version) {
            co_await coroutine::return_exception(std::runtime_error(
                    ::format("raft topology: command::barrier_and_drain, the version has changed, "
                             "version {}, current_version {}, the topology change coordinator "
                             " had probably migrated to another node",
                             version, current_version)));
        }
        // Wait out in-flight requests still using older token metadata versions.
        co_await ss._shared_token_metadata.stale_versions_in_use();
        // Drain topology sessions that are in the process of closing.
        co_await get_topology_session_manager().drain_closing_sessions();
        rtlogger.info("raft_topology_cmd::barrier_and_drain done");
    });
}
// Handles a raft_topology_cmd RPC issued by the topology change coordinator.
// The (term, cmd_index) pair protects against stale coordinators: commands
// from an outdated raft term, or with a non-increasing index within the same
// term, are rejected. The result status is `success` only if the command
// fully executed; any exception is logged and reported as failure.
future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
    raft_topology_cmd_result result;
    rtlogger.info("topology cmd rpc {} is called index={}", cmd.cmd, cmd_index);
    try {
        auto& raft_server = _group0->group0_server();
        auto group0_holder = _group0->hold_group0_gate();
        // do barrier to make sure we always see the latest topology
        co_await raft_server.read_barrier(&_group0_as);
        if (raft_server.get_current_term() != term) {
            // Return an error since the command is from outdated leader
            co_return result;
        }
        auto id = raft_server.id();
        group0_holder.release();
        {
            // Deduplicate: within a term, only commands with a strictly
            // increasing index are accepted.
            auto& state = _raft_topology_cmd_handler_state;
            if (state.term != term) {
                state.term = term;
            } else if (cmd_index <= state.last_index) {
                // Return an error since the command is outdated
                co_return result;
            }
            state.last_index = cmd_index;
        }
        switch (cmd.cmd) {
            case raft_topology_cmd::command::barrier: {
                utils::get_local_injector().inject("raft_topology_barrier_fail",
                    [] { throw std::runtime_error("raft topology barrier failed due to error injection"); });
                // This barrier might have been issued by the topology coordinator
                // as a step in enabling a feature, i.e. it noticed that all
                // nodes support some feature, then issue the barrier to make
                // sure that all nodes observed this fact in their local state
                // (a node cannot revoke support for a feature after that), and
                // after receiving a confirmation from all nodes it will mark
                // the feature as enabled.
                //
                // However, it might happen that the node handles this request
                // early in the boot process, before it did the second feature
                // check that happens when the node updates its metadata
                // in `system.topology`. The node might have committed a command
                // that advertises support for a feature as the last node
                // to do so, crashed and now it doesn't support it. This should
                // be rare, but it can happen and we can detect it right here.
                std::exception_ptr ex;
                try {
                    const auto& enabled_features = _topology_state_machine._topology.enabled_features;
                    const auto unsafe_to_disable_features = _topology_state_machine._topology.calculate_not_yet_enabled_features();
                    _feature_service.check_features(enabled_features, unsafe_to_disable_features);
                } catch (const gms::unsupported_feature_exception&) {
                    ex = std::current_exception();
                }
                if (ex) {
                    // This node no longer supports a feature the cluster relies
                    // on; drain and leave the command unacknowledged (failure).
                    rtlogger.error("feature check during barrier failed: {}", ex);
                    co_await drain();
                    break;
                }
                // we already did read barrier above
                result.status = raft_topology_cmd_result::command_status::success;
            }
            break;
            case raft_topology_cmd::command::barrier_and_drain: {
                co_await local_topology_barrier();
                co_await utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail", [this] (auto& handler) -> future<> {
                    auto ks = handler.get("keyspace");
                    auto cf = handler.get("table");
                    auto last_token = dht::token::from_int64(std::atoll(handler.get("last_token")->data()));
                    auto table_id = _db.local().find_column_family(*ks, *cf).schema()->id();
                    auto stage = co_await replica::read_tablet_transition_stage(_qp, table_id, last_token);
                    if (stage) {
                        sstring want_stage(handler.get("stage").value());
                        if (*stage == locator::tablet_transition_stage_from_string(want_stage)) {
                            rtlogger.info("raft_topology_cmd: barrier handler waits");
                            co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
                            rtlogger.info("raft_topology_cmd: barrier handler continues");
                        }
                    }
                });
                result.status = raft_topology_cmd_result::command_status::success;
            }
            break;
            case raft_topology_cmd::command::stream_ranges: {
                // Stream/repair data according to this node's state in the
                // topology. Runs in the streaming scheduling group.
                co_await with_scheduling_group(_stream_manager.local().get_scheduling_group(), coroutine::lambda([&] () -> future<> {
                    const auto* server_rs = _topology_state_machine._topology.find(id);
                    if (!server_rs) {
                        on_internal_error(rtlogger, format("Got {} request for node {} not found in topology", cmd.cmd, id));
                    }
                    const auto rs = server_rs->second;
                    auto tstate = _topology_state_machine._topology.tstate;
                    auto session = _topology_state_machine._topology.session;
                    if (!rs.ring || rs.ring->tokens.empty()) {
                        rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state);
                        co_return;
                    }
                    if (tstate != topology::transition_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding) {
                        rtlogger.warn("got {} request while the topology transition state is {} and node state is {}", cmd.cmd, tstate, rs.state);
                        co_return;
                    }
                    utils::get_local_injector().inject("stream_ranges_fail",
                        [] { throw std::runtime_error("stream_range failed due to error injection"); });
                    utils::get_local_injector().inject("stop_before_streaming",
                        [] { std::raise(SIGSTOP); });
                    switch(rs.state) {
                        case node_state::bootstrapping:
                        case node_state::replacing: {
                            set_mode(mode::BOOTSTRAP);
                            // See issue #4001
                            co_await _view_builder.local().mark_existing_views_as_built();
                            co_await _db.invoke_on_all([] (replica::database& db) {
                                for (auto& cf : db.get_non_system_column_families()) {
                                    cf->notify_bootstrap_or_replace_start();
                                }
                            });
                            tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
                            if (rs.state == node_state::bootstrapping) {
                                if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
                                    auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
                                            parent_info.id, streaming::stream_reason::bootstrap, _bootstrap_result, [this, &rs, session] (this auto) -> future<> {
                                        if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
                                            co_await utils::get_local_injector().inject("delay_bootstrap_120s", std::chrono::seconds(120));
                                            co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens, session);
                                        } else {
                                            dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
                                                locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
                                            co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, session);
                                        }
                                    });
                                    co_await task->done();
                                }
                                // Bootstrap did not complete yet, but streaming did
                                utils::get_local_injector().inject("stop_after_streaming",
                                    [] { std::raise(SIGSTOP); });
                            } else {
                                auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
                                auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
                                        parent_info.id, streaming::stream_reason::replace, _bootstrap_result, [this, &rs, &id, replaced_id, session] (this auto) -> future<> {
                                    if (!_topology_state_machine._topology.req_param.contains(id)) {
                                        on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id));
                                    }
                                    if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
                                        auto ignored_nodes = _topology_state_machine._topology.ignored_nodes | std::views::transform([] (const auto& id) {
                                            return locator::host_id(id.uuid());
                                        }) | std::ranges::to<std::unordered_set<locator::host_id>>();
                                        auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
                                        auto tmptr = get_token_metadata_ptr();
                                        auto replaced_node = locator::host_id(replaced_id.uuid());
                                        co_await _repair.local().replace_with_repair(std::move(ks_erms), std::move(tmptr), rs.ring.value().tokens, std::move(ignored_nodes), replaced_node, session);
                                    } else {
                                        dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
                                            locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
                                        co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, session, locator::host_id{replaced_id.uuid()});
                                    }
                                });
                                co_await task->done();
                            }
                            co_await _db.invoke_on_all([] (replica::database& db) {
                                for (auto& cf : db.get_non_system_column_families()) {
                                    cf->notify_bootstrap_or_replace_end();
                                }
                            });
                            result.status = raft_topology_cmd_result::command_status::success;
                        }
                        break;
                        case node_state::decommissioning: {
                            tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
                            auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
                                    parent_info.id, streaming::stream_reason::decommission, _decommission_result, [this] (this auto) -> future<> {
                                co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run", utils::wait_for_message(60s));
                                co_await unbootstrap();
                            });
                            co_await task->done();
                            result.status = raft_topology_cmd_result::command_status::success;
                        }
                        break;
                        case node_state::normal: {
                            // If asked to stream a node in normal state it means that remove operation is running
                            // Find the node that is being removed
                            auto it = std::ranges::find_if(_topology_state_machine._topology.transition_nodes, [] (auto& e) { return e.second.state == node_state::removing; });
                            if (it == _topology_state_machine._topology.transition_nodes.end()) {
                                rtlogger.warn("got stream_ranges request while my state is normal but cannot find a node that is been removed");
                                break;
                            }
                            // Note: shadows the outer `id` (this node) with the
                            // id of the node being removed.
                            auto id = it->first;
                            rtlogger.debug("streaming to remove node {}", id);
                            tasks::task_info parent_info{tasks::task_id{it->second.request_id}, 0};
                            auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
                                    parent_info.id, streaming::stream_reason::removenode, _remove_result[id], [this, id = locator::host_id{id.uuid()}, session] (this auto) {
                                auto as = make_shared<abort_source>();
                                auto sub = _abort_source.subscribe([as] () noexcept {
                                    if (!as->abort_requested()) {
                                        as->request_abort();
                                    }
                                });
                                if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
                                    std::list<locator::host_id> ignored_ips = _topology_state_machine._topology.ignored_nodes | std::views::transform([] (const auto& id) {
                                        return locator::host_id(id.uuid());
                                    }) | std::ranges::to<std::list<locator::host_id>>();
                                    auto ops = seastar::make_shared<node_ops_info>(node_ops_id::create_random_id(), as, std::move(ignored_ips));
                                    return _repair.local().removenode_with_repair(get_token_metadata_ptr(), id, ops, session);
                                } else {
                                    return removenode_with_stream(id, session, as);
                                }
                            });
                            co_await task->done();
                            result.status = raft_topology_cmd_result::command_status::success;
                        }
                        break;
                        case node_state::rebuilding: {
                            auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[id]).source_dc;
                            rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
                            tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
                            auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
                                    parent_info.id, streaming::stream_reason::rebuild, _rebuild_result, [this, &source_dc, session] (this auto) -> future<> {
                                auto tmptr = get_token_metadata_ptr();
                                auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
                                if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) {
                                    utils::optional_param sdc_param;
                                    bool force;
                                    // A ":force" suffix on the source DC means the user
                                    // wants to force the use of that DC.
                                    if ((force = source_dc.ends_with(":force"))) {
                                        source_dc.resize(source_dc.size() - 6);
                                    }
                                    if (!source_dc.empty()) {
                                        sdc_param.emplace(source_dc).set_user_provided().set_force(force);
                                    }
                                    co_await _repair.local().rebuild_with_repair(std::move(ks_erms), tmptr, std::move(sdc_param), session);
                                } else {
                                    auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, _abort_source,
                                            tmptr->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, session);
                                    streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(_gossiper.get_unreachable_members()));
                                    if (source_dc != "") {
                                        streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
                                    }
                                    for (const auto& [keyspace_name, erm] : ks_erms) {
                                        auto ranges = co_await get_ranges_for_endpoint(*erm, my_host_id());
                                        co_await streamer->add_ranges(keyspace_name, erm, std::move(ranges), _gossiper, false);
                                    }
                                    try {
                                        co_await streamer->stream_async();
                                        rtlogger.info("streaming for rebuild successful");
                                    } catch (...) {
                                        auto ep = std::current_exception();
                                        // This is used exclusively through JMX, so log the full trace but only throw a simple RTE
                                        rtlogger.warn("error while rebuilding node: {}", ep);
                                        std::rethrow_exception(std::move(ep));
                                    }
                                }
                            });
                            co_await task->done();
                            _rebuild_result.reset();
                            result.status = raft_topology_cmd_result::command_status::success;
                        }
                        break;
                        case node_state::left:
                        case node_state::none:
                        case node_state::removing:
                            on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
                                    id, rs.state));
                            break;
                    }
                }));
            }
            break;
            case raft_topology_cmd::command::wait_for_ip: {
                // Wait until the IP addresses of all joining nodes are known
                // via gossip.
                std::vector<raft::server_id> ids;
                {
                    const auto& new_nodes = _topology_state_machine._topology.new_nodes;
                    ids.reserve(new_nodes.size());
                    for (const auto& [id, rs]: new_nodes) {
                        ids.push_back(id);
                    }
                }
                rtlogger.debug("Got raft_topology_cmd::wait_for_ip, new nodes [{}]", ids);
                for (const auto& id: ids) {
                    co_await wait_for_gossiper(id, _gossiper, _abort_source);
                }
                rtlogger.debug("raft_topology_cmd::wait_for_ip done [{}]", ids);
                result.status = raft_topology_cmd_result::command_status::success;
                break;
            }
        }
    } catch (const raft::request_aborted& e) {
        rtlogger.warn("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
    } catch (...) {
        rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
    }
    rtlogger.info("topology cmd rpc {} completed with status={} index={}",
            cmd.cmd, (result.status == raft_topology_cmd_result::command_status::success) ? "succeeded" : "failed", cmd_index);
    co_return result;
}
future<> storage_service::update_fence_version(token_metadata::version_t new_version) {
    // Push the new fence version into the storage proxy on every shard.
    return container().invoke_on_all([new_version] (storage_service& local_ss) {
        local_ss._qp.proxy().update_fence_version(new_version);
    });
}
// Performs a replica-side operation for a given tablet.
// What operation is performed is determined by "op" based on the
// current state of tablet metadata. The coordinator is supposed to prepare tablet
// metadata according to his intent and trigger the operation,
// without passing any transient information.
//
// If the operation succeeds, and the coordinator is still valid, it means
// that the operation intended by the coordinator was performed.
// If the coordinator is no longer valid, the operation may succeed but
// the actual operation performed may be different than intended, it may
// be the one intended by the new coordinator. This is not a problem
// because the old coordinator should do nothing with such result.
//
// The triggers may be retried. They may also be reordered with older triggers, from
// the same or a different coordinator. There is a protocol which ensures that
// stale triggers won't cause operations to run beyond the migration stage they were
// intended for. For example, that streaming is not still running after the coordinator
// moved past the "streaming" stage, and that it won't be started when the stage is not appropriate.
// A non-stale trigger is the one which completed successfully and caused the valid coordinator
// to advance tablet migration to the next stage. Other triggers are called stale.
// We can divide stale triggers into categories:
// (1) Those which start after the tablet was moved to the next stage
// Those which start before the tablet was moved to the next stage,
// (2) ...but after the non-stale trigger finished
// (3) ...but before the non-stale trigger finished
//
// By "start" I mean the atomic block which inserts into _tablet_ops, and by "finish" I mean
// removal from _tablet_ops.
// So event ordering is local from the perspective of this replica, and is linear because
// this happens on the same shard.
//
// What prevents (1) from running is the fact that triggers check the state of tablet
// metadata, and will fail immediately if the stage is not appropriate. It can happen
// that the trigger is so stale that it will match with an appropriate stage of the next
// migration of the same tablet. This is not a problem because we fall into the same
// category as a stale trigger which was started in the new migration, so cases (2) or (3) apply.
//
// What prevents (2) from running is the fact that after the coordinator moves on to
// the next stage, it executes a token metadata barrier, which will wait for such triggers
// to complete as they hold on to erm via tablet_metadata_barrier. They should be aborted
// soon after the coordinator changes the stage by the means of tablet_metadata_barrier::get_abort_source().
//
// What prevents (3) from running is that they will join with the non-stale trigger, or non-stale
// trigger will join with them, depending on which came first. In that case they finish at the same time.
//
// It's very important that the global token metadata barrier involves all nodes which
// may receive stale triggers started in the previous stage, so that those nodes will
// see tablet metadata which reflects group0 state. This will cut-off stale triggers
// as soon as the coordinator moves to the next stage.
future<tablet_operation_result> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
        sstring op_name,
        std::function<future<tablet_operation_result>(locator::tablet_metadata_guard&)> op) {
    // The coordinator may not execute global token metadata barrier before triggering the operation, so we need
    // a barrier here to see the token metadata which is at least as recent as that of the sender.
    auto& raft_server = _group0->group0_server();
    co_await raft_server.read_barrier(&_group0_as);
    if (_tablet_ops.contains(tablet)) {
        // A retried trigger joins the operation already in flight rather than
        // starting a second one (case (3) in the comment above).
        rtlogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet);
        auto result = co_await _tablet_ops[tablet].done.get_future();
        co_return result;
    }
    // The guard pins the tablet's metadata/erm and provides an abort source
    // which fires when the coordinator advances the tablet to the next stage.
    locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet);
    auto& as = guard.get_abort_source();
    // Also abort the operation on global group0 abort (e.g. shutdown).
    auto sub = _group0_as.subscribe([&as] () noexcept {
        as.request_abort();
    });
    auto async_gate_holder = _async_gate.hold();
    promise<tablet_operation_result> p;
    // Publish the in-flight operation so concurrent retries can join it.
    _tablet_ops.emplace(tablet, tablet_operation {
        op_name, seastar::shared_future<tablet_operation_result>(p.get_future())
    });
    auto erase_registry_entry = seastar::defer([&] {
        _tablet_ops.erase(tablet);
    });
    try {
        auto result = co_await op(guard);
        // Resolve joined retries with the same result before returning it.
        p.set_value(result);
        rtlogger.debug("{} for tablet migration of {} successful", op_name, tablet);
        co_return result;
    } catch (...) {
        p.set_exception(std::current_exception());
        rtlogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception());
        throw;
    }
}
// Replica-side entry point for repairing a single tablet, triggered by the
// topology coordinator. Validates that the tablet is still in a repair
// transition stage, then delegates to the repair service; concurrent retries
// join the in-flight operation via do_tablet_operation.
future<service::tablet_operation_repair_result> storage_service::repair_tablet(locator::global_tablet_id tablet, service::session_id session_id) {
    auto result = co_await do_tablet_operation(tablet, "Repair", [this, tablet, session_id] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
        slogger.debug("Executing repair for tablet={}", tablet);
        auto& tmap = guard.get_tablet_map();
        auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
        // Check if the request is still valid.
        // If there is mismatch, it means this repair was canceled and the coordinator moved on.
        if (!trinfo) {
            throw std::runtime_error(fmt::format("No transition info for tablet {}", tablet));
        }
        if (trinfo->stage != locator::tablet_transition_stage::repair && trinfo->stage != locator::tablet_transition_stage::rebuild_repair) {
            throw std::runtime_error(fmt::format("Tablet {} stage is not at repair", tablet));
        }
        // Prefer the coordinator-supplied session id; fall back to the one
        // recorded in the transition info.
        auto session = session_id ? session_id : trinfo->session_id;
        slogger.debug("repair_tablet: tablet={} session_id={}", tablet, session);
        tasks::task_info global_tablet_repair_task_info;
        std::optional<locator::tablet_replica_set> replicas = std::nullopt;
        if (trinfo->stage == locator::tablet_transition_stage::repair) {
            // Regular repair: reuse the task id recorded in the tablet metadata.
            global_tablet_repair_task_info = {tasks::task_id{tmap.get_tablet_info(tablet.tablet).repair_task_info.tablet_task_id.uuid()}, 0};
        } else {
            // rebuild_repair: restrict the repair to the replicas we read from
            // during the migration's streaming phase.
            auto migration_streaming_info = get_migration_streaming_info(get_token_metadata_ptr()->get_topology(), tmap.get_tablet_info(tablet.tablet), *trinfo);
            replicas = locator::tablet_replica_set{migration_streaming_info.read_from.begin(), migration_streaming_info.read_from.end()};
        }
        utils::get_local_injector().inject("repair_tablet_fail_on_rpc_call",
            [] { throw std::runtime_error("repair_tablet failed due to error injection"); });
        auto time = co_await _repair.local().repair_tablet(_address_map, guard, tablet, global_tablet_repair_task_info, session, std::move(replicas), trinfo->stage);
        co_return service::tablet_operation_repair_result{time};
    });
    if (std::holds_alternative<service::tablet_operation_repair_result>(result)) {
        co_return std::get<service::tablet_operation_repair_result>(result);
    }
    on_internal_error(slogger, "Got wrong tablet_operation_repair_result");
}
// Clones the on-disk storage of a tablet from its leaving replica to its
// pending replica when both replicas live on the same node (intra-node,
// cross-shard migration). The sstables are cloned on the leaving shard and
// then opened and added to the table on the pending shard, where they are
// sealed only at the moment they are integrated into the table.
// Throws if the two replicas are not on the same host.
future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id tablet, locator::tablet_replica leaving, locator::tablet_replica pending) {
    if (leaving.host != pending.host) {
        throw std::runtime_error(fmt::format("Leaving and pending tablet replicas belong to different nodes, {} and {} respectively",
                leaving.host, pending.host));
    }
    // All sstables cloned locally will be left unsealed, until they're loaded into the table.
    // This is to guarantee no unsplit sstables will be left sealed on disk, which could
    // cause problems if unsplit sstables are found after split was ACKed to coordinator.
    bool leave_unsealed = true;
    auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
        auto& table = _db.local().find_column_family(tablet.table);
        // Keep the table's streaming state alive for the duration of the clone.
        auto op = table.stream_in_progress();
        co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed);
    });
    rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size());
    auto ignore_digest_mismatch = _db.local().get_config().ignore_component_digest_mismatch();
    // Opens one cloned sstable descriptor on the calling shard, still unsealed.
    auto load_sstable = [leave_unsealed, ignore_digest_mismatch] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
        auto& mng = t.get_sstables_manager();
        auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal),
                d.version, d.format, db_clock::now(), default_io_error_handler_gen());
        // The loader will consider current shard as sstable owner, despite the tablet sharder
        // will still point to leaving replica at this stage in migration. If node goes down,
        // SSTables will be loaded at pending replica and migration is retried, so correctness
        // wise, we're good.
        auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true,
                .unsealed_sstable = leave_unsealed,
                .ignore_component_digest_mismatch = ignore_digest_mismatch };
        co_await sst->load(sharder, cfg);
        co_return sst;
    };
    co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> {
        // Loads cloned sstables from leaving replica into pending one.
        auto& table = _db.local().find_column_family(tablet.table);
        auto& sstm = table.get_sstables_manager();
        auto op = table.stream_in_progress();
        dht::auto_refreshing_sharder sharder(table.shared_from_this());
        std::unordered_set<sstables::shared_sstable> ssts;
        for (auto&& sst_desc : d) {
            ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc)));
        }
        // Seal each cloned sstable only at the point it is added to the table.
        auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
            if (ssts.contains(loading_sst)) {
                auto cfg = sstm.configure_writer(loading_sst->get_origin());
                co_await loading_sst->seal_sstable(cfg.backup);
            }
            co_return;
        };
        auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add);
        // Let the view building worker pick up the newly loaded sstables.
        _view_building_worker.local().load_sstables(tablet.table, loaded_ssts);
    });
    rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending);
}
// Streams data to the pending tablet replica of a given tablet on this node.
// The source tablet replica is determined from the current transition info of the tablet.
//
// Runs under do_tablet_operation(), which provides the tablet_metadata_guard.
// Throws std::runtime_error when the transition info no longer matches the
// request (the coordinator canceled/moved on) or is malformed.
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
    // Test-only hook: block streaming for a specific keyspace/table until the
    // injection handler posts a message or the service is shutting down.
    co_await utils::get_local_injector().inject("block_tablet_streaming", [this, &tablet] (auto& handler) -> future<> {
        const auto keyspace = handler.get("keyspace");
        const auto table = handler.get("table");
        SCYLLA_ASSERT(keyspace);
        SCYLLA_ASSERT(table);
        auto s = _db.local().find_column_family(tablet.table).schema();
        bool should_block = s->ks_name() == *keyspace && s->cf_name() == *table;
        while (should_block && !handler.poll_for_message() && !_async_gate.is_closed()) {
            co_await sleep(std::chrono::milliseconds(100));
        }
    });
    co_await do_tablet_operation(tablet, "Streaming", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
        auto tm = guard.get_token_metadata();
        auto& tmap = guard.get_tablet_map();
        auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
        // Check if the request is still valid.
        // If there is mismatch, it means this streaming was canceled and the coordinator moved on.
        if (!trinfo) {
            throw std::runtime_error(fmt::format("No transition info for tablet {}", tablet));
        }
        if (trinfo->stage != locator::tablet_transition_stage::streaming) {
            throw std::runtime_error(fmt::format("Tablet {} stage is not at streaming", tablet));
        }
        auto topo_guard = trinfo->session_id;
        if (!trinfo->session_id) {
            throw std::runtime_error(fmt::format("Tablet {} session is not set", tablet));
        }
        auto pending_replica = trinfo->pending_replica;
        if (!pending_replica) {
            throw std::runtime_error(fmt::format("Tablet {} has no pending replica", tablet));
        }
        if (pending_replica->host != tm->get_my_id()) {
            throw std::runtime_error(fmt::format("Tablet {} has pending replica different than this one", tablet));
        }
        auto& tinfo = tmap.get_tablet_info(tablet.tablet);
        auto range = tmap.get_token_range(tablet.tablet);
        std::optional<locator::tablet_replica> leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
        locator::tablet_migration_streaming_info streaming_info = get_migration_streaming_info(tm->get_topology(), tinfo, *trinfo);
        locator::tablet_replica_set read_from{streaming_info.read_from.begin(), streaming_info.read_from.end()};
        if (trinfo->transition == locator::tablet_transition_kind::rebuild_v2) {
            // For rebuild_v2 we stream from a single source: the replica whose
            // host is closest (by topology proximity) to the pending replica.
            auto nearest_hosts = read_from | std::views::transform([] (const auto& tr) {
                return tr.host;
            }) | std::ranges::to<host_id_vector_replica_set>();
            tm->get_topology().sort_by_proximity(trinfo->pending_replica->host, nearest_hosts);
            if (!nearest_hosts.empty()) {
                auto it = std::find_if(read_from.begin(), read_from.end(), [nearest_host = nearest_hosts[0]] (const auto& tr) { return tr.host == nearest_host; });
                if (it == read_from.end()) {
                    on_internal_error(slogger, "Nearest replica not found");
                }
                read_from = { *it };
            } else {
                read_from = {};
            }
        }
        // Map the transition kind to the stream reason reported to the stream manager.
        streaming::stream_reason reason = std::invoke([&] {
            switch (trinfo->transition) {
            case locator::tablet_transition_kind::migration: return streaming::stream_reason::tablet_migration;
            case locator::tablet_transition_kind::intranode_migration: return streaming::stream_reason::tablet_migration;
            case locator::tablet_transition_kind::rebuild: return streaming::stream_reason::rebuild;
            case locator::tablet_transition_kind::rebuild_v2: return streaming::stream_reason::rebuild;
            default:
                throw std::runtime_error(fmt::format("stream_tablet(): Invalid tablet transition: {}", trinfo->transition));
            }
        });
        if (trinfo->transition != locator::tablet_transition_kind::intranode_migration && _feature_service.file_stream && _db.local().get_config().enable_file_stream()) {
            // File-based streaming path: ships whole sstable files instead of
            // mutation fragments. Not applicable to intra-node migration.
            co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) {
                rtlogger.info("migration_streaming_wait: start");
                return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
            });
            auto dst_node = trinfo->pending_replica->host;
            auto dst_shard_id = trinfo->pending_replica->shard;
            auto transition = trinfo->transition;
            // Release token_metadata_ptr early so it will not block barriers for other migrations.
            // Don't access trinfo after this.
            tm = {};
            for (auto src : read_from) {
                // Use file stream for tablet to stream data
                auto ops_id = streaming::file_stream_id::create_random_id();
                auto start_time = std::chrono::steady_clock::now();
                size_t stream_bytes = 0;
                try {
                    auto& table = _db.local().find_column_family(tablet.table);
                    slogger.debug("stream_sstables[{}] Streaming for tablet {} of {} started table={}.{} range={} src={}",
                            ops_id, transition, tablet, table.schema()->ks_name(), table.schema()->cf_name(), range, src);
                    auto resp = co_await streaming::tablet_stream_files(ops_id, table, range, src.host, dst_node, dst_shard_id, _messaging.local(), _abort_source, topo_guard);
                    stream_bytes = resp.stream_bytes;
                    slogger.debug("stream_sstables[{}] Streaming for tablet migration of {} successful", ops_id, tablet);
                    auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time);
                    auto bw = utils::pretty_printed_throughput(stream_bytes, duration);
                    slogger.info("stream_sstables[{}] Streaming for tablet migration of {} finished table={}.{} range={} stream_bytes={} stream_time={} stream_bw={}",
                            ops_id, tablet, table.schema()->ks_name(), table.schema()->cf_name(), range, stream_bytes, duration, bw);
                } catch (...) {
                    slogger.warn("stream_sstables[{}] Streaming for tablet migration of {} from {} failed: {}", ops_id, tablet, leaving_replica, std::current_exception());
                    throw;
                }
            }
        } else { // Caution: following code is intentionally unindented to be in sync with OSS
        if (trinfo->transition == locator::tablet_transition_kind::intranode_migration) {
            // Intra-node migration: clone storage locally between shards, no network streaming.
            if (!leaving_replica || leaving_replica->host != tm->get_my_id()) {
                throw std::runtime_error(fmt::format("Invalid leaving replica for intra-node migration, tablet: {}, leaving: {}",
                                                     tablet, leaving_replica));
            }
            tm = nullptr;
            co_await utils::get_local_injector().inject("intranode_migration_streaming_wait", [this] (auto& handler) -> future<> {
                rtlogger.info("intranode_migration_streaming: waiting");
                while (!handler.poll_for_message() && !_async_gate.is_closed()) {
                    co_await sleep(std::chrono::milliseconds(5));
                }
                rtlogger.info("intranode_migration_streaming: released");
            });
            rtlogger.info("Starting intra-node streaming of tablet {} from shard {} to {}", tablet, leaving_replica->shard, pending_replica->shard);
            co_await clone_locally_tablet_storage(tablet, *leaving_replica, *pending_replica);
            rtlogger.info("Finished intra-node streaming of tablet {} from shard {} to {}", tablet, leaving_replica->shard, pending_replica->shard);
        } else {
            // Regular mutation-based streaming via range_streamer.
            if (leaving_replica && leaving_replica->host == tm->get_my_id()) {
                throw std::runtime_error(fmt::format("Cannot stream within the same node using regular migration, tablet: {}, shard {} -> {}",
                                                     tablet, leaving_replica->shard, trinfo->pending_replica->shard));
            }
            co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) {
                rtlogger.info("migration_streaming_wait: start");
                return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
            });
            auto& table = _db.local().find_column_family(tablet.table);
            std::vector<sstring> tables = {table.schema()->cf_name()};
            auto my_id = tm->get_my_id();
            auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, std::move(tm),
                    guard.get_abort_source(),
                    my_id, _snitch.local()->get_location(),
                    format("Tablet {}", trinfo->transition),
                    reason,
                    topo_guard,
                    std::move(tables));
            tm = nullptr;
            // Skip sources currently considered unreachable by the failure detector.
            streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
                    _gossiper.get_unreachable_members()));
            std::unordered_map<locator::host_id, dht::token_range_vector> ranges_per_endpoint;
            for (auto r: read_from) {
                ranges_per_endpoint[r.host].emplace_back(range);
            }
            streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint));
            slogger.debug("Streaming for tablet migration of {} started table={}.{} range={}", tablet, table.schema()->ks_name(), table.schema()->cf_name(), range);
            co_await streamer->stream_async();
            slogger.info("Streaming for tablet migration of {} finished table={}.{} range={}", tablet, table.schema()->ks_name(), table.schema()->cf_name(), range);
        }
        } // Traditional streaming vs file-based streaming.
        // If new pending tablet replica needs splitting, streaming waits for it to complete.
        // That's to provide a guarantee that once migration is over, the coordinator can finalize
        // splitting under the promise that compaction groups of tablets are all split, ready
        // for the subsequent topology change.
        //
        // FIXME:
        // We could do the splitting not in the streaming stage, but in a later stage, so that
        // from the tablet scheduler's perspective migrations blocked on compaction are not
        // participating in streaming anymore (which is true), so it could schedule more
        // migrations. This way compaction would run in parallel with streaming which can
        // reduce the delay.
        co_await _db.invoke_on(pending_replica->shard, [tablet] (replica::database& db) {
            auto& table = db.find_column_family(tablet.table);
            return table.maybe_split_compaction_group_of(tablet.tablet);
        });
        co_await utils::get_local_injector().inject("pause_after_streaming_tablet", [] (auto& handler) {
            return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(1));
        });
        co_return tablet_operation_result();
    });
}
// Performs cleanup of a tablet replica on this node, as directed by the
// topology coordinator. Depending on the transition stage, this cleans up
// either the leaving replica (stage "cleanup") or the pending/target replica
// (stage "cleanup_target"). Throws std::runtime_error if the transition info
// no longer matches this request (the coordinator canceled and moved on).
future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
    // Test-only hook: simulate a node crash in the middle of tablet cleanup.
    utils::get_local_injector().inject("cleanup_tablet_crash", [] {
        slogger.info("Crashing tablet cleanup");
        _exit(1);
    });
    co_await do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
        shard_id shard;
        {
            // Scope block: the token_metadata_ptr (tm) and tablet map
            // references are released before the cross-shard invoke below,
            // so we don't hold metadata pinned during the actual cleanup.
            auto tm = guard.get_token_metadata();
            auto& tmap = guard.get_tablet_map();
            auto *trinfo = tmap.get_tablet_transition_info(tablet.tablet);
            // Check if the request is still valid.
            // If there is mismatch, it means this cleanup was canceled and the coordinator moved on.
            if (!trinfo) {
                throw std::runtime_error(fmt::format("No transition info for tablet {}", tablet));
            }
            if (trinfo->stage == locator::tablet_transition_stage::cleanup) {
                // Cleanup of the leaving replica: verify it is hosted here and
                // pick its shard as the cleanup target.
                auto& tinfo = tmap.get_tablet_info(tablet.tablet);
                std::optional<locator::tablet_replica> leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
                if (!leaving_replica) {
                    throw std::runtime_error(fmt::format("Tablet {} has no leaving replica", tablet));
                }
                if (leaving_replica->host != tm->get_my_id()) {
                    throw std::runtime_error(fmt::format("Tablet {} has leaving replica different than this one", tablet));
                }
                shard = leaving_replica->shard;
            } else if (trinfo->stage == locator::tablet_transition_stage::cleanup_target) {
                // Cleanup of the migration target (pending replica), e.g.
                // when a migration is being rolled back.
                if (!trinfo->pending_replica) {
                    throw std::runtime_error(fmt::format("Tablet {} has no pending replica", tablet));
                }
                if (trinfo->pending_replica->host != tm->get_my_id()) {
                    throw std::runtime_error(fmt::format("Tablet {} has pending replica different than this one", tablet));
                }
                shard = trinfo->pending_replica->shard;
            } else {
                throw std::runtime_error(fmt::format("Tablet {} stage is not at cleanup/cleanup_target", tablet));
            }
        }
        // Run the actual cleanup on the shard that owns the replica: first
        // drop any staging sstables tracked by the view building worker,
        // then delete the tablet's data.
        co_await _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks, &vbw = _view_building_worker] (replica::database& db) {
            auto& table = db.find_column_family(tablet.table);
            vbw.local().cleanup_staging_sstables(table.get_effective_replication_map(), tablet.table, tablet.tablet);
            return table.cleanup_tablet(db, sys_ks.local(), tablet.tablet);
        });
        co_return tablet_operation_result();
    });
}
// Returns true when adding one more replica of this tablet on rack `dst_rack`
// would exceed the current maximum number of replicas hosted on any single
// rack, i.e. would make the per-rack replica distribution less even.
static bool increases_replicas_per_rack(const locator::topology& topology, const locator::tablet_info& tinfo, sstring dst_rack) {
    std::unordered_map<sstring, size_t> replicas_per_rack;
    for (const auto& r : tinfo.replicas) {
        ++replicas_per_rack[topology.get_rack(r.host)];
    }
    size_t current_max = 0;
    for (const auto& [rack, count] : replicas_per_rack) {
        current_max = std::max(current_max, count);
    }
    // operator[] default-inserts 0 if dst_rack currently hosts no replica.
    return replicas_per_rack[dst_rack] + 1 > current_max;
}
// Acquires a group0 guard used to serialize tablet metadata updates with
// other group0 operations. May throw on timeout or abort.
future<service::group0_guard> storage_service::get_guard_for_tablet_update() {
    co_return co_await _group0->client().start_operation(_group0_as, raft_timeout{});
}
// Commits the given tablet metadata mutations as a single group0 command,
// appending a topology version bump so other nodes observe the change.
// Returns true on success, false when a concurrent group0 modification was
// detected (the caller is expected to retry with a fresh guard).
future<bool> storage_service::exec_tablet_update(service::group0_guard guard, utils::chunked_vector<canonical_mutation> updates, sstring reason) {
    rtlogger.info("{}", reason);
    rtlogger.trace("do update {} reason {}", updates, reason);
    // Every tablet metadata change must advance the topology version.
    auto version_bump = topology_mutation_builder(guard.write_timestamp())
            .set_version(_topology_state_machine._topology.version + 1)
            .build();
    updates.emplace_back(std::move(version_bump));
    auto cmd = _group0->client().prepare_command(topology_change{std::move(updates)}, guard, reason);
    try {
        co_await _group0->client().add_entry(std::move(cmd), std::move(guard), _group0_as, raft_timeout{});
    } catch (group0_concurrent_modification&) {
        rtlogger.debug("exec_tablet_update(): concurrent modification, retrying");
        co_return false;
    }
    co_return true;
}
// Builds a tablet mutation builder targeting the base table of `table`.
// Colocated tables share their base table's tablet map, so tablet metadata
// mutations must always be written against the base table.
replica::tablet_mutation_builder storage_service::tablet_mutation_builder_for_base_table(api::timestamp_type ts, table_id table) {
    return replica::tablet_mutation_builder(ts, get_token_metadata_ptr()->tablets().get_base_table(table));
}
// Repairs the tablets containing the given tokens and optionally waits for
// the repair to finish.
// This is used to run a manual repair requested by user from the restful API.
//
// Returns a map containing the generated "tablet_task_id" so the caller can
// track or delete the request later. Throws if the TABLET_REPAIR_SCHEDULER
// feature is not enabled, if `table` is a colocated (non-base) table, or if
// any targeted tablet already has a repair request pending.
future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant,
        std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion, locator::tablet_repair_incremental_mode incremental_mode) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.add_repair_tablet_request(table, std::move(tokens_variant), std::move(hosts_filter), std::move(dcs_filter), await_completion, incremental_mode);
        });
    }
    bool all_tokens = std::holds_alternative<all_tokens_tag>(tokens_variant);
    utils::chunked_vector<dht::token> tokens;
    if (!all_tokens) {
        tokens = std::get<utils::chunked_vector<dht::token>>(tokens_variant);
    }
    if (!_feature_service.tablet_repair_scheduler) {
        throw std::runtime_error("The TABLET_REPAIR_SCHEDULER feature is not enabled on the cluster yet");
    }
    auto repair_task_info = locator::tablet_task_info::make_user_repair_request(hosts_filter, dcs_filter, incremental_mode);
    auto res = std::unordered_map<sstring, sstring>{{sstring("tablet_task_id"), repair_task_info.tablet_task_id.to_sstring()}};
    auto start = std::chrono::steady_clock::now();
    slogger.info("Starting tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} hosts_filter={} dcs_filter={} incremental_mode={}",
            table, tokens, all_tokens, repair_task_info.tablet_task_id, hosts_filter, dcs_filter, incremental_mode);
    // Retry loop: exec_tablet_update() returns false on concurrent group0
    // modification, in which case we re-read the tablet map under a fresh
    // guard and rebuild the mutations.
    while (true) {
        auto guard = co_await get_guard_for_tablet_update();
        // we don't allow requesting repair on tablets of colocated tables, because the repair task info
        // is stored on the base table's tablet map which is shared by all the tables that are colocated with it.
        // we don't have a way currently to store the repair task info for a specific colocated table.
        // repair can only be requested for the base table, and this will repair the base table's tablets
        // and all its colocated tablets as well.
        if (!get_token_metadata().tablets().is_base_table(table)) {
            auto table_schema = _db.local().find_schema(table);
            auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
            throw std::invalid_argument(::format(
                    "Cannot set repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
                    "Repair requests can be made only on the base table. "
                    "Repairing the base table will also repair all tables colocated with it.",
                    table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
        }
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        utils::chunked_vector<canonical_mutation> updates;
        if (all_tokens) {
            // Expand "all tokens" into one last-token per tablet, re-derived
            // on each retry since the tablet map may have changed.
            tokens.clear();
            co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) -> future<> {
                auto last_token = tmap.get_last_token(tid);
                tokens.push_back(last_token);
                co_return;
            });
        }
        auto ts = db_clock::now();
        for (const auto& token : tokens) {
            auto tid = tmap.get_tablet_id(token);
            auto& tinfo = tmap.get_tablet_info(tid);
            auto& req_id = tinfo.repair_task_info.tablet_task_id;
            // Reject if the tablet already has a repair request pending.
            if (req_id) {
                throw std::runtime_error(fmt::format("Tablet {} is already in repair by tablet_task_id={}",
                        locator::global_tablet_id{table, tid}, req_id));
            }
            auto last_token = tmap.get_last_token(tid);
            updates.emplace_back(
                    tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
                            .set_repair_task_info(last_token, repair_task_info, _feature_service)
                            .build());
            // Also record the request in the repair tasks table when supported.
            db::system_keyspace::repair_task_entry entry{
                .task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
                .operation = db::system_keyspace::repair_task_operation::requested,
                .first_token = dht::token::to_int64(tmap.get_first_token(tid)),
                .last_token = dht::token::to_int64(tmap.get_last_token(tid)),
                .timestamp = ts,
                .table_uuid = table,
            };
            if (_feature_service.tablet_repair_tasks_table) {
                auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
                for (auto& m : cmuts) {
                    updates.push_back(std::move(m));
                }
            }
        }
        sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);
        if (co_await exec_tablet_update(std::move(guard), std::move(updates), std::move(reason))) {
            break;
        }
    }
    if (!await_completion) {
        auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
        slogger.info("Issued tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} duration={}",
                table, tokens, all_tokens, repair_task_info.tablet_task_id, duration);
        co_return res;
    }
    // Wait until the repair task id has been cleared from every targeted
    // tablet, which signals the repair scheduler finished (or dropped) them.
    co_await _topology_state_machine.event.wait([&] {
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        return std::all_of(tokens.begin(), tokens.end(), [&] (const dht::token& token) {
            auto id = tmap.get_tablet_id(token);
            return tmap.get_tablet_info(id).repair_task_info.tablet_task_id != repair_task_info.tablet_task_id;
        });
    });
    auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
    slogger.info("Finished tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} duration={}",
            table, tokens, all_tokens, repair_task_info.tablet_task_id, duration);
    co_return res;
}
// Delete a tablet repair request by the given tablet_task_id.
// Clears the repair task info from every tablet of `table` that carries this
// task id; if a tablet is currently mid-repair transition, its session is
// deleted as well. Throws if the TABLET_REPAIR_SCHEDULER feature is not
// enabled or if `table` is a colocated (non-base) table.
future<> storage_service::del_repair_tablet_request(table_id table, locator::tablet_task_id tablet_task_id) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.del_repair_tablet_request(table, tablet_task_id);
        });
    }
    if (!_feature_service.tablet_repair_scheduler) {
        throw std::runtime_error("The TABLET_REPAIR_SCHEDULER feature is not enabled on the cluster yet");
    }
    slogger.info("Deleting tablet repair request by API request table_id={} tablet_task_id={}", table, tablet_task_id);
    // Retry loop: rebuilt from a fresh guard whenever exec_tablet_update()
    // reports a concurrent group0 modification.
    while (true) {
        auto guard = co_await get_guard_for_tablet_update();
        // see add_repair_tablet_request. repair requests can only be added on base tables.
        if (!get_token_metadata().tablets().is_base_table(table)) {
            auto table_schema = _db.local().find_schema(table);
            auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
            throw std::invalid_argument(::format(
                    "Cannot delete repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
                    "Repair requests can be added and deleted only on the base table.",
                    table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
        }
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        utils::chunked_vector<canonical_mutation> updates;
        co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) -> future<> {
            auto& tinfo = tmap.get_tablet_info(tid);
            auto& req_id = tinfo.repair_task_info.tablet_task_id;
            // Only tablets tagged with the requested task id are touched.
            if (req_id != tablet_task_id) {
                co_return;
            }
            auto last_token = tmap.get_last_token(tid);
            auto* trinfo = tmap.get_tablet_transition_info(tid);
            auto update = tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
                    .del_repair_task_info(last_token, _feature_service);
            // If the tablet is already transitioning through a repair, drop
            // its streaming/repair session too.
            if (trinfo && trinfo->transition == locator::tablet_transition_kind::repair) {
                update.del_session(last_token);
            }
            updates.emplace_back(update.build());
        });
        sstring reason = format("Deleting tablet repair request by API request tablet_id={} tablet_task_id={}", table, tablet_task_id);
        if (co_await exec_tablet_update(std::move(guard), std::move(updates), std::move(reason))) {
            break;
        }
    }
    slogger.info("Deleted tablet repair request by API request table_id={} tablet_task_id={}", table, tablet_task_id);
}
// Moves the replica of the tablet owning `token` from `src` to `dst` and
// waits for the resulting migration to finish. With `force`, cross-DC moves
// and rack-availability-reducing moves are allowed (with a warning) instead
// of being rejected. A move with src == dst is a no-op.
future<> storage_service::move_tablet(table_id table, dht::token token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.move_tablet(table, token, src, dst, force);
        });
    }
    // transit_tablet() runs this callback under a group0 guard (possibly
    // multiple times on retry) to produce the migration mutations.
    co_await transit_tablet(table, token, [=, this] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) {
        utils::chunked_vector<canonical_mutation> updates;
        auto tid = tmap.get_tablet_id(token);
        auto& tinfo = tmap.get_tablet_info(tid);
        auto last_token = tmap.get_last_token(tid);
        auto gid = locator::global_tablet_id{table, tid};
        // Validate source and destination before building any mutations.
        if (!locator::contains(tinfo.replicas, src)) {
            throw std::runtime_error(seastar::format("Tablet {} has no replica on {}", gid, src));
        }
        auto* node = get_token_metadata().get_topology().find_node(dst.host);
        if (!node) {
            throw std::runtime_error(seastar::format("Unknown host: {}", dst.host));
        }
        if (dst.shard >= node->get_shard_count()) {
            throw std::runtime_error(seastar::format("Host {} does not have shard {}", *node, dst.shard));
        }
        if (src == dst) {
            // Nothing to do: return no mutations so no migration is started.
            sstring reason = format("No-op move of tablet {} to {}", gid, dst);
            return std::make_tuple(std::move(updates), std::move(reason));
        }
        if (src.host != dst.host && locator::contains(tinfo.replicas, dst.host)) {
            throw std::runtime_error(fmt::format("Tablet {} has replica on {}", gid, dst.host));
        }
        // Cross-DC and availability-reducing moves are rejected unless forced.
        auto src_dc_rack = get_token_metadata().get_topology().get_location(src.host);
        auto dst_dc_rack = get_token_metadata().get_topology().get_location(dst.host);
        if (src_dc_rack.dc != dst_dc_rack.dc) {
            if (force) {
                slogger.warn("Moving tablet {} between DCs ({} and {})", gid, src_dc_rack.dc, dst_dc_rack.dc);
            } else {
                throw std::runtime_error(fmt::format("Attempted to move tablet {} between DCs ({} and {})", gid, src_dc_rack.dc, dst_dc_rack.dc));
            }
        }
        if (src_dc_rack.rack != dst_dc_rack.rack && increases_replicas_per_rack(get_token_metadata().get_topology(), tinfo, dst_dc_rack.rack)) {
            if (force) {
                slogger.warn("Moving tablet {} between racks ({} and {}) which reduces availability", gid, src_dc_rack.rack, dst_dc_rack.rack);
            } else {
                throw std::runtime_error(fmt::format("Attempted to move tablet {} between racks ({} and {}) which would reduce availability", gid, src_dc_rack.rack, dst_dc_rack.rack));
            }
        }
        // Same-host moves become intra-node migrations; cross-host moves are
        // regular migrations.
        auto migration_task_info = src.host == dst.host ? locator::tablet_task_info::make_intranode_migration_request()
                : locator::tablet_task_info::make_migration_request();
        migration_task_info.sched_nr++;
        migration_task_info.sched_time = db_clock::now();
        updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table)
                .set_new_replicas(last_token, locator::replace_replica(tinfo.replicas, src, dst))
                .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
                .set_transition(last_token, src.host == dst.host ? locator::tablet_transition_kind::intranode_migration
                        : locator::tablet_transition_kind::migration)
                .set_migration_task_info(last_token, std::move(migration_task_info), _feature_service)
                .build());
        // Abort any in-flight view building work targeting the source replica.
        if (_feature_service.view_building_coordinator) {
            db::view::abort_view_building_tasks(_view_building_state_machine, updates, write_timestamp, table, src, last_token);
        }
        sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst);
        return std::make_tuple(std::move(updates), std::move(reason));
    });
}
// Adds a new replica of the tablet owning `token` on `dst` (a rebuild-style
// transition) and waits for the transition to finish.
// NOTE(review): `force` is accepted for interface symmetry with move_tablet()
// but is not consulted in this function — presumably no constraints are
// loosened here; confirm that is intentional.
future<> storage_service::add_tablet_replica(table_id table, dht::token token, locator::tablet_replica dst, loosen_constraints force) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.add_tablet_replica(table, token, dst, force);
        });
    }
    // transit_tablet() runs this callback under a group0 guard (possibly
    // multiple times on retry) to produce the transition mutations.
    co_await transit_tablet(table, token, [=, this] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) {
        utils::chunked_vector<canonical_mutation> updates;
        auto tid = tmap.get_tablet_id(token);
        auto& tinfo = tmap.get_tablet_info(tid);
        auto last_token = tmap.get_last_token(tid);
        auto gid = locator::global_tablet_id{table, tid};
        // Validate the destination host/shard and that it doesn't already
        // hold a replica of this tablet.
        auto* node = get_token_metadata().get_topology().find_node(dst.host);
        if (!node) {
            throw std::runtime_error(format("Unknown host: {}", dst.host));
        }
        if (dst.shard >= node->get_shard_count()) {
            throw std::runtime_error(format("Host {} does not have shard {}", *node, dst.shard));
        }
        if (locator::contains(tinfo.replicas, dst.host)) {
            throw std::runtime_error(fmt::format("Tablet {} has replica on {}", gid, dst.host));
        }
        locator::tablet_replica_set new_replicas(tinfo.replicas);
        new_replicas.push_back(dst);
        updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table)
                .set_new_replicas(last_token, new_replicas)
                .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
                .set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
                .build());
        sstring reason = format("Adding replica to tablet {}, node {}", gid, dst);
        return std::make_tuple(std::move(updates), std::move(reason));
    });
}
// Removes the replica of the tablet owning `token` from `dst` (a
// rebuild-style transition) and waits for the transition to finish.
// NOTE(review): like add_tablet_replica(), `force` is accepted but not
// consulted here — confirm intentional.
future<> storage_service::del_tablet_replica(table_id table, dht::token token, locator::tablet_replica dst, loosen_constraints force) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.del_tablet_replica(table, token, dst, force);
        });
    }
    // transit_tablet() runs this callback under a group0 guard (possibly
    // multiple times on retry) to produce the transition mutations.
    co_await transit_tablet(table, token, [=, this] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) {
        utils::chunked_vector<canonical_mutation> updates;
        auto tid = tmap.get_tablet_id(token);
        auto& tinfo = tmap.get_tablet_info(tid);
        auto last_token = tmap.get_last_token(tid);
        auto gid = locator::global_tablet_id{table, tid};
        // Validate the target host/shard and that it actually holds a replica.
        auto* node = get_token_metadata().get_topology().find_node(dst.host);
        if (!node) {
            throw std::runtime_error(format("Unknown host: {}", dst.host));
        }
        if (dst.shard >= node->get_shard_count()) {
            throw std::runtime_error(format("Host {} does not have shard {}", *node, dst.shard));
        }
        if (!locator::contains(tinfo.replicas, dst)) {
            throw std::runtime_error(fmt::format("Tablet {} doesn't have replica on {}", gid, dst.host));
        }
        // New replica set = current set minus dst.
        locator::tablet_replica_set new_replicas;
        new_replicas.reserve(tinfo.replicas.size() - 1);
        std::copy_if(tinfo.replicas.begin(), tinfo.replicas.end(), std::back_inserter(new_replicas), [&dst] (auto r) { return r != dst; });
        updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table)
                .set_new_replicas(last_token, new_replicas)
                .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
                .set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
                .build());
        // Abort any in-flight view building work targeting the removed replica.
        if (_feature_service.view_building_coordinator) {
            db::view::abort_view_building_tasks(_view_building_state_machine, updates, write_timestamp, table, dst, last_token);
        }
        sstring reason = format("Removing replica from tablet {}, node {}", gid, dst);
        return std::make_tuple(std::move(updates), std::move(reason));
    });
}
// Collects load statistics for all tablet-based tables on this node:
// per-table load from every shard (combined via map_reduce), plus per-host
// disk capacity, critical-utilization flag and effective tablet capacity.
// Must run on shard 0 (where group0/the topology coordinator live); other
// shards forward there.
future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables() {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // topology coordinator only exists in shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.load_stats_for_tablet_based_tables();
        });
    }
    // Refresh is triggered after table creation, need to make sure we see the new tablets.
    co_await _group0->group0_server().read_barrier(&_group0_as);
    using table_ids_t = std::unordered_set<table_id>;
    const auto table_ids = co_await std::invoke([this] () -> future<table_ids_t> {
        table_ids_t ids;
        co_await _db.local().get_tables_metadata().for_each_table_gently([&] (table_id id, lw_shared_ptr<replica::table> table) mutable {
            if (table->uses_tablets()) {
                ids.insert(id);
            }
            return make_ready_future<>();
        });
        co_return std::move(ids);
    });
    // Helps with intra-node migration by serializing with changes to token metadata, so shards
    // participating in the migration will see migration in same stage, therefore preventing
    // double accounting (anomaly) in the reported size.
    auto tmlock = co_await get_token_metadata_lock();
    const locator::host_id this_host = _db.local().get_token_metadata().get_my_id();
    // Align to 64 bytes to avoid cache line ping-pong when updating size in map_reduce0() below
    struct alignas(64) aligned_tablet_size {
        uint64_t size = 0;
    };
    std::vector<aligned_tablet_size> tablet_sizes_per_shard(smp::count);
    // Each node combines a per-table load map from all of its shards and returns it to the coordinator.
    // So if there are 1k nodes, there will be 1k RPCs in total.
    auto load_stats = co_await _db.map_reduce0([&table_ids, &this_host, &tablet_sizes_per_shard] (replica::database& db) -> future<locator::load_stats> {
        locator::load_stats load_stats{};
        auto& tables_metadata = db.get_tables_metadata();
        for (const auto& id : table_ids) {
            auto table = tables_metadata.get_table_if_exists(id);
            if (!table) {
                // Table may have been dropped since the id snapshot was taken.
                continue;
            }
            locator::combined_load_stats combined_ls { table->table_load_stats() };
            load_stats.tables.emplace(id, std::move(combined_ls.table_ls));
            tablet_sizes_per_shard[this_shard_id()].size += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls);
            co_await coroutine::maybe_yield();
        }
        co_return std::move(load_stats);
    }, locator::load_stats{}, std::plus<locator::load_stats>());
    // Query disk stats once and use them for both capacity and effective
    // capacity (previously capacity[this_host] was assigned twice from two
    // separate space() calls).
    const std::filesystem::space_info si = _disk_space_monitor->space();
    load_stats.capacity[this_host] = si.capacity;
    load_stats.critical_disk_utilization[this_host] = _disk_space_monitor->disk_utilization() > _db.local().get_config().critical_disk_utilization_level();
    locator::tablet_load_stats& tls = load_stats.tablet_stats[this_host];
    const uint64_t config_capacity = _db.local().get_config().data_file_capacity();
    if (config_capacity != 0) {
        // An operator-configured data file capacity overrides the measured one.
        tls.effective_capacity = config_capacity;
    } else {
        // Effective capacity = free space plus what our tablets already occupy.
        uint64_t sum_tablet_sizes = 0;
        for (const auto& ts : tablet_sizes_per_shard) {
            sum_tablet_sizes += ts.size;
        }
        tls.effective_capacity = si.available + sum_tablet_sizes;
    }
    co_return std::move(load_stats);
}
// Generic driver for user-initiated tablet transitions (move/add/del replica).
// Repeatedly: acquires a group0 guard, waits out incompatible topology
// operations, calls `prepare_mutations` to build the transition mutations,
// commits them together with a topology version bump, and retries on
// concurrent group0 modification. Finally waits for the tablet's transition
// to complete. Throws if the tablet is already in transition.
future<> storage_service::transit_tablet(table_id table, dht::token token, noncopyable_function<std::tuple<utils::chunked_vector<canonical_mutation>, sstring>(const locator::tablet_map&, api::timestamp_type)> prepare_mutations) {
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        bool topology_busy;
        // If the topology state machine is busy with something other than
        // tablet draining/migration, release the guard and wait for it to
        // settle, then take a fresh guard. Tablet draining/migration states
        // are compatible with injecting another tablet transition.
        while ((topology_busy = _topology_state_machine._topology.is_busy())) {
            const auto tstate = *_topology_state_machine._topology.tstate;
            if (tstate == topology::transition_state::tablet_draining ||
                tstate == topology::transition_state::tablet_migration) {
                break;
            }
            rtlogger.debug("transit_tablet(): topology state machine is busy: {}", tstate);
            release_guard(std::move(guard));
            co_await _topology_state_machine.event.when();
            guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        }
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        auto tid = tmap.get_tablet_id(token);
        if (tmap.get_tablet_transition_info(tid)) {
            throw std::runtime_error(fmt::format("Tablet {} is in transition", locator::global_tablet_id{table, tid}));
        }
        auto [ updates, reason ] = prepare_mutations(tmap, guard.write_timestamp());
        rtlogger.info("{}", reason);
        rtlogger.trace("do update {} reason {}", updates, reason);
        {
            topology_mutation_builder builder(guard.write_timestamp());
            if (topology_busy) {
                // Already in tablet_draining/tablet_migration: keep the
                // current transition state rather than overwriting it.
                rtlogger.debug("transit_tablet({}): topology busy, keeping transition state", locator::global_tablet_id{table, tid});
            } else {
                builder.set_transition_state(topology::transition_state::tablet_migration);
            }
            // Always bump the topology version so other nodes observe the change.
            builder.set_version(_topology_state_machine._topology.version + 1);
            updates.push_back(builder.build());
        }
        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            break;
        } catch (group0_concurrent_modification&) {
            // Someone else committed first; rebuild everything under a fresh guard.
            rtlogger.debug("transit_tablet(): concurrent modification, retrying");
        }
    }
    // Wait for transition to finish.
    co_await _topology_state_machine.event.when([&] {
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token));
    });
}
// Enables or disables the tablet load balancer cluster-wide via group 0.
//
// When disabling, and the cluster features support it, a no-op global
// topology request is queued in the same group 0 command; waiting for that
// request to complete guarantees the balancer has actually quiesced. On older
// clusters (no request id) we instead poll until the topology state machine
// is no longer busy. The group 0 commit is retried on concurrent modification.
future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
    auto holder = _async_gate.hold();

    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.set_tablet_balancing_enabled(enabled);
        });
    }

    utils::UUID request_id;
    auto reason = format("Setting tablet balancing to {}", enabled);
    while (true) {
        group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});

        utils::chunked_vector<canonical_mutation> updates;
        updates.push_back(canonical_mutation(
                topology_mutation_builder(guard.write_timestamp())
                        .set_tablet_balancing_enabled(enabled)
                        .build()));

        if (!enabled
                && _feature_service.topology_noop_request && _feature_service.topology_global_request_queue) {
            // Queue a no-op request so the caller can wait for its completion
            // as a barrier proving the balancer is idle.
            request_id = guard.new_group0_state_id();
            updates.push_back(canonical_mutation(
                    topology_mutation_builder(guard.write_timestamp())
                            .queue_global_topology_request_id(request_id)
                            .build()));
            updates.push_back(canonical_mutation(
                    topology_request_tracking_mutation_builder(request_id, _feature_service.topology_requests_type_column)
                            .set("done", false)
                            .set("request_type", global_topology_request::noop_request)
                            .build()));
        }

        rtlogger.info("{}", reason);

        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            break;
        } catch (group0_concurrent_modification&) {
            rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
        }
    }

    if (request_id) {
        // Barrier: wait for the queued no-op request to be processed.
        co_await wait_for_topology_request_completion(request_id);
    } else if (!enabled) {
        // Legacy path: poll until the topology state machine quiesces.
        while (_topology_state_machine._topology.is_busy()) {
            rtlogger.debug("set_tablet_balancing_enabled(): topology is busy");
            co_await _topology_state_machine.event.when();
        }
    }
}
// Waits until the raft topology state machine is idle, first performing a
// group 0 read barrier so we observe the latest committed topology state.
// May be called from any shard; the work itself happens on shard 0.
future<> storage_service::await_topology_quiesced() {
    auto gate_holder = _async_gate.hold();

    if (this_shard_id() == 0) {
        co_await _group0->group0_server().read_barrier(&_group0_as);
        co_await _topology_state_machine.await_not_busy();
        co_return;
    }

    // group0 is only set on shard 0 — forward the call there.
    co_await container().invoke_on(0, [] (storage_service& ss) {
        return ss.await_topology_quiesced();
    });
}
// Returns true iff, after a group 0 read barrier, the topology version equals
// `expected_version` and the topology state machine is idle. Callable from
// any shard; the check runs on shard 0 where group 0 lives.
future<bool> storage_service::verify_topology_quiesced(token_metadata::version_t expected_version) {
    auto gate_holder = _async_gate.hold();

    if (this_shard_id() != 0) {
        // group0 is only set on shard 0 — forward the call there.
        co_return co_await container().invoke_on(0, [expected_version] (storage_service& ss) {
            return ss.verify_topology_quiesced(expected_version);
        });
    }

    // Observe the latest committed state before checking.
    co_await _group0->group0_server().read_barrier(&_group0_as);
    const bool version_matches = _topology_state_machine._topology.version == expected_version;
    const bool idle = !_topology_state_machine._topology.is_busy();
    co_return version_matches && idle;
}
// Handles the join_node_request RPC sent by a node that wants to join the
// cluster (possibly replacing a dead node). Validates the request against
// our configuration and the current topology state, and on success places a
// join request entry into group 0 for the topology coordinator to pick up.
//
// Returns join_node_request_result::ok on success, or ::rejected with a
// human-readable reason. The group 0 commit retries on concurrent
// modification.
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
    join_node_request_result result;
    rtlogger.info("received request to join from host_id: {}", params.host_id);

    // Configuration sanity checks: the joining node must agree with us on the
    // cluster name and snitch.
    if (params.cluster_name != _db.local().get_config().cluster_name()) {
        result.result = join_node_request_result::rejected{
            .reason = ::format("Cluster name check failed. This node cannot join the cluster "
                               "because it expected cluster name \"{}\" and not \"{}\"",
                               params.cluster_name,
                               _db.local().get_config().cluster_name()),
        };
        co_return result;
    }

    if (params.snitch_name != _db.local().get_snitch_name()) {
        result.result = join_node_request_result::rejected{
            .reason = ::format("Snitch name check failed. This node cannot join the cluster "
                               "because it uses \"{}\" and not \"{}\"",
                               params.snitch_name,
                               _db.local().get_snitch_name()),
        };
        co_return result;
    }

    co_await _topology_state_machine.event.when([this] {
        // The first node defines the cluster and inserts its entry to the
        // `system.topology` without checking anything. It is possible that the
        // `join_node_request_handler` fires before the first node sets itself
        // as a normal node, therefore we might need to wait until that happens,
        // here. If we didn't do it, the topology coordinator could handle the
        // joining node as the first one and skip the necessary join node
        // handshake.
        return !_topology_state_machine._topology.normal_nodes.empty();
    });

    auto& g0_server = _group0->group0_server();
    auto g0_holder = _group0->hold_group0_gate();

    if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) {
        // There is a peculiar case that can happen if the leader is killed
        // and then replaced very quickly:
        //
        // - Cluster with nodes `A`, `B`, `C` - `A` is the topology
        //   coordinator/group0 leader,
        // - `A` is killed,
        // - New node `D` attempts to replace `A` with the same IP as `A`,
        //   sends `join_node_request` rpc to node `B`,
        // - Node `B` handles the RPC and wants to perform group0 operation
        //   and wants to perform a barrier - still thinks that `A`
        //   is the leader and is alive, sends an RPC to its IP,
        // - `D` accidentally receives the request that was meant to `A`
        //   but throws an exception because of host_id mismatch,
        // - Failure is propagated back to `B`, and then to `D` - and `D`
        //   fails the replace operation.
        //
        // We can try to detect if this failure might happen: if the new node
        // is going to replace but the ID of the replaced node is the same
        // as the leader, wait for a short while until a reelection happens.
        // If replaced ID == leader ID, then this indicates either the situation
        // above or an operator error (actually trying to replace a live node).
        const auto timeout = std::chrono::seconds(10);
        rtlogger.warn("the node {} which was requested to be"
                " replaced has the same ID as the current group 0 leader ({});"
                " this looks like an attempt to join a node with the same IP"
                " as a leader which might have just crashed; waiting for"
                " a reelection",
                params.host_id, g0_server.current_leader());

        // Poll for a new leader, aborting after `timeout` via a timer-driven
        // abort source.
        abort_source as;
        timer<lowres_clock> t;
        t.set_callback([&as] {
            as.request_abort();
        });
        t.arm(timeout);

        try {
            while (!g0_server.current_leader() || *params.replaced_id == g0_server.current_leader()) {
                // FIXME: Wait for the next term instead of sleeping in a loop
                // Waiting for state change is not enough because a new leader
                // might be chosen without us going through the candidate state.
                co_await sleep_abortable(std::chrono::milliseconds(100), as);
            }
        } catch (abort_requested_exception&) {
            rtlogger.warn("the node {} tries to replace the"
                    " current leader {} but the leader didn't change within"
                    " {}s. Rejecting the node",
                    params.host_id,
                    *params.replaced_id,
                    std::chrono::duration_cast<std::chrono::seconds>(timeout).count());
            result.result = join_node_request_result::rejected{
                .reason = format(
                        "It is only allowed to replace dead nodes, however the"
                        " node that was requested to be replaced is still seen"
                        " as the group0 leader after {}s, which indicates that"
                        " it might be still alive. You are either trying to replace"
                        " a live node or trying to replace a node very quickly"
                        " after it went down and reelection didn't happen within"
                        " the timeout. Refusing to continue",
                        std::chrono::duration_cast<std::chrono::seconds>(timeout).count()),
            };
            co_return result;
        }
    }

    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});

        // A node already known to the topology must not join again.
        if (const auto* p = _topology_state_machine._topology.find(params.host_id)) {
            const auto& rs = p->second;
            if (rs.state == node_state::left) {
                // Fix: these literals used to be separated from the format
                // string by a stray comma, so `{}` was filled with the
                // continuation text and the host id was never logged.
                rtlogger.warn("the node {} attempted to join"
                        " but it was removed from the cluster. Rejecting"
                        " the node",
                        params.host_id);
                result.result = join_node_request_result::rejected{
                    .reason = "The node has already been removed from the cluster",
                };
            } else {
                rtlogger.warn("the node {} attempted to join"
                        " again after an unfinished attempt but it is no longer"
                        " allowed to do so. Rejecting the node",
                        params.host_id);
                result.result = join_node_request_result::rejected{
                    .reason = "The node requested to join before but didn't finish the procedure. "
                              "Please clear the data directory and restart.",
                };
            }
            co_return result;
        }

        utils::UUID old_request_id;
        if (params.replaced_id) {
            // Replace-specific validation: the replaced node must be dead,
            // normal, in the same DC/rack, and of matching token ownership
            // (zero-token vs token-owning).
            auto rhid = locator::host_id{params.replaced_id->uuid()};
            if (is_me(rhid) || _gossiper.is_alive(rhid)) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("tried to replace alive node {}", *params.replaced_id),
                };
                co_return result;
            }

            auto replaced_it = _topology_state_machine._topology.normal_nodes.find(*params.replaced_id);
            if (replaced_it == _topology_state_machine._topology.normal_nodes.end()) {
                result.result = join_node_request_result::rejected{
                    .reason = ::format("Cannot replace node {} because it is not in the 'normal' state", *params.replaced_id),
                };
                co_return result;
            }

            if (replaced_it->second.datacenter != params.datacenter || replaced_it->second.rack != params.rack) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("Cannot replace node in {}/{} with node in {}/{}", replaced_it->second.datacenter, replaced_it->second.rack, params.datacenter, params.rack),
                };
                co_return result;
            }

            auto is_zero_token = params.num_tokens == 0 && params.tokens_string.empty();
            if (replaced_it->second.ring.value().tokens.empty() && !is_zero_token) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("Cannot replace the zero-token node {} with a token-owning node", *params.replaced_id),
                };
                co_return result;
            }

            if (!replaced_it->second.ring.value().tokens.empty() && is_zero_token) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("Cannot replace the token-owning node {} with a zero-token node", *params.replaced_id),
                };
                co_return result;
            }

            if (_topology_state_machine._topology.requests.find(*params.replaced_id) != _topology_state_machine._topology.requests.end()) {
                old_request_id = replaced_it->second.request_id;
            }
        }

        auto mutation = build_mutation_from_join_params(params, guard.write_timestamp(), old_request_id);

        topology_change change{{std::move(mutation)}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
                format("raft topology: placing join request for {}", params.host_id));
        co_await utils::get_local_injector().inject("join-node-before-add-entry", utils::wait_for_message(5min));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            break;
        } catch (group0_concurrent_modification&) {
            rtlogger.info("join_node_request: concurrent operation is detected, retrying.");
        }
    }

    rtlogger.info("placed join request for {}", params.host_id);

    // Success
    result.result = join_node_request_result::ok {};
    co_return result;
}
// Handles the join_node_response RPC, i.e. the topology coordinator's verdict
// (accepted/rejected) on this node's earlier join_node_request. Runs only on
// shard 0. The outcome is recorded in the `_join_node_response_done` shared
// promise, which unblocks the rest of the boot sequence (or fails it).
future<join_node_response_result> storage_service::join_node_response_handler(join_node_response_params params) {
    SCYLLA_ASSERT(this_shard_id() == 0);

    // Usually this handler will only run once, but there are some cases where we might get more than one RPC,
    // possibly happening at the same time, e.g.:
    //
    // - Another node becomes the topology coordinator while the old one waits for the RPC,
    // - Topology coordinator finished the RPC but failed to update the group 0 state.

    // Serialize handling the responses.
    auto lock = co_await get_units(_join_node_response_handler_mutex, 1);

    // Wait until we sent and completed the join_node_request RPC
    co_await _join_node_request_done.get_shared_future(_group0_as);

    if (_join_node_response_done.available()) {
        // We already handled this RPC. No need to retry it.
        rtlogger.info("the node got join_node_response RPC for the second time, ignoring");

        if (std::holds_alternative<join_node_response_params::accepted>(params.response)
                && _join_node_response_done.failed()) {
            // The topology coordinator accepted the node that was rejected before or failed while handling
            // the response. Inform the coordinator about it so it moves the node to the left state.
            co_await coroutine::return_exception_ptr(_join_node_response_done.get_shared_future().get_exception());
        }

        co_return join_node_response_result{};
    }

    if (utils::get_local_injector().enter("join_node_response_drop_expiring")) {
        _gossiper.get_mutable_address_map().force_drop_expiring_entries();
    }

    try {
        // Dispatch on the coordinator's verdict.
        co_return co_await std::visit(overloaded_functor {
            [&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {
                co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", utils::wait_for_message(5min));

                // Do a read barrier to read/initialize the topology state
                co_await _group0->group0_server_with_timeouts().read_barrier(&_group0_as, raft_timeout{});

                // Calculate nodes to ignore
                // TODO: ignore_dead_nodes setting for bootstrap
                std::unordered_set<raft::server_id> ignored_ids = _topology_state_machine._topology.ignored_nodes;
                auto my_request_it =
                        _topology_state_machine._topology.req_param.find(_group0->load_my_id());
                if (my_request_it != _topology_state_machine._topology.req_param.end()) {
                    if (auto* replace = std::get_if<service::replace_param>(&my_request_it->second)) {
                        // When replacing, the replaced node is dead by
                        // definition — don't wait for it to be alive.
                        ignored_ids.insert(replace->replaced_id);
                    }
                }

                // After this RPC finishes, repair or streaming will be run, and
                // both of them require this node to see the normal nodes as UP.
                // This condition might not be true yet as this information is
                // propagated through gossip. In order to reduce the chance of
                // repair/streaming failure, wait here until we see normal nodes
                // as UP (or the timeout elapses).
                auto sync_nodes = _topology_state_machine._topology.normal_nodes | std::views::keys
                        | std::ranges::views::filter([ignored_ids] (raft::server_id id) { return !ignored_ids.contains(id); })
                        | std::views::transform([] (raft::server_id id) { return locator::host_id{id.uuid()}; })
                        | std::ranges::to<std::vector<locator::host_id>>();
                rtlogger.info("coordinator accepted request to join, "
                        "waiting for nodes {} to be alive before responding and continuing",
                        sync_nodes);
                co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout);
                rtlogger.info("nodes {} are alive", sync_nodes);

                // Unblock waiting join_node_rpc_handshaker::post_server_start,
                // which will start the raft server and continue
                _join_node_response_done.set_value();

                co_return join_node_response_result{};
            },
            [&] (const join_node_response_params::rejected& rej) -> future<join_node_response_result> {
                // Record the rejection; the boot sequence waiting on the
                // promise will fail with this error.
                auto eptr = std::make_exception_ptr(std::runtime_error(
                        format("the topology coordinator rejected request to join the cluster: {}", rej.reason)));
                _join_node_response_done.set_exception(std::move(eptr));
                co_return join_node_response_result{};
            },
        }, params.response);
    } catch (...) {
        // Any failure while processing the acceptance also fails the join.
        auto eptr = std::current_exception();
        rtlogger.warn("error while handling the join response from the topology coordinator. "
                "The node will not join the cluster. Error: {}", eptr);
        _join_node_response_done.set_exception(std::move(eptr));
        throw;
    }
}
// Reads every partition of the given system table and returns its contents
// as canonical mutations, yielding between conversions.
future<utils::chunked_vector<canonical_mutation>> storage_service::get_system_mutations(schema_ptr schema) {
    auto result_set = co_await db::system_keyspace::query_mutations(_db, schema);

    utils::chunked_vector<canonical_mutation> mutations;
    mutations.reserve(result_set->partitions().size());
    for (const auto& partition : result_set->partitions()) {
        // Unfreeze, then convert to canonical form — both done "gently"
        // (with preemption points) to avoid reactor stalls on big partitions.
        auto unfrozen = co_await unfreeze_gently(partition.mut(), schema);
        mutations.emplace_back(co_await make_canonical_mutation_gently(std::move(unfrozen)));
    }
    co_return mutations;
}
// Name-based convenience overload: resolves the schema for the given
// keyspace/table and delegates to the schema-based overload.
future<utils::chunked_vector<canonical_mutation>> storage_service::get_system_mutations(const sstring& ks_name, const sstring& cf_name) {
    return get_system_mutations(_db.local().find_schema(ks_name, cf_name));
}
// Returns the topology state of the node identified by `id`. Must run on
// shard 0, where the topology state machine lives. A node the topology has
// never heard of is an internal error; a node that is known but absent from
// the live (normal/transition) maps is reported as left.
node_state storage_service::get_node_state(locator::host_id id) {
    if (this_shard_id() != 0) {
        on_internal_error(rtlogger, "cannot access node state on non zero shard");
    }

    const raft::server_id rid{id.uuid()};
    if (!_topology_state_machine._topology.contains(rid)) {
        on_internal_error(rtlogger, format("unknown node {}", rid));
    }

    const auto* entry = _topology_state_machine._topology.find(rid);
    return entry ? entry->second.state : node_state::left;
}
// Registers all RPC verb handlers owned by storage_service on the local
// messaging service. Counterpart of uninit_messaging_service().
void storage_service::init_messaging_service() {
    // Legacy (gossip-based) node-operations command handler; always runs the
    // actual handler on shard 0.
    ser::node_ops_rpc_verbs::register_node_ops_cmd(&_messaging.local(), [this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
        auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
        std::optional<locator::host_id> coordinator_host_id;
        if (const auto* id = cinfo.retrieve_auxiliary_opt<locator::host_id>("host_id")) {
            coordinator_host_id = *id;
        }
        return container().invoke_on(0, [coordinator, coordinator_host_id, req = std::move(req)] (auto& ss) mutable {
            return ss.node_ops_cmd_handler(coordinator, coordinator_host_id, std::move(req));
        });
    });

    // Helper for raft-addressed RPCs: hops to shard 0 (where group 0 lives),
    // checks that we joined group 0 and that the RPC was really meant for our
    // raft id, then invokes the handler.
    auto handle_raft_rpc = [this] (raft::server_id dst_id, auto handler) {
        return container().invoke_on(0, [dst_id, handler = std::move(handler)] (auto& ss) mutable {
            if (!ss._group0 || !ss._group0->joined_group0()) {
                throw std::runtime_error("The node did not join group 0 yet");
            }
            if (ss._group0->load_my_id() != dst_id) {
                throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
            }
            return handler(ss);
        });
    };

    // Tablet file streaming: the per-shard handlers run on every shard and
    // the transferred byte counts are summed.
    ser::streaming_rpc_verbs::register_tablet_stream_files(&_messaging.local(),
            [this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future<streaming::stream_files_response> {
        streaming::stream_files_response resp;
        resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future<size_t> {
            auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req);
            co_return res.stream_bytes;
        },
        size_t(0),
        std::plus<size_t>());
        co_return resp;
    });

    // Topology coordinator -> node commands (barriers, streaming, etc).
    ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
        return handle_raft_rpc(dst_id, [cmd = std::move(cmd), term, cmd_index] (auto& ss) {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_topology_cmd");
            return ss.raft_topology_cmd_handler(term, cmd_index, cmd);
        });
    });

    // Serves group 0 snapshot data (system table mutations) to a puller.
    ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_snapshot_pull_params params) {
        return handle_raft_rpc(dst_id, [params = std::move(params)] (storage_service& ss) -> future<raft_snapshot> {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_pull_snapshot");
            utils::chunked_vector<canonical_mutation> mutations;
            // FIXME: make it an rwlock, here we only need to lock for reads,
            // might be useful if multiple nodes are trying to pull concurrently.
            auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(ss._abort_source);

            // We may need to send additional raft-based tables to the requester that
            // are not indicated in the parameter.
            // For example, when a node joins, it requests a snapshot before it knows
            // which features are enabled, so it doesn't know yet if these tables exist
            // on other nodes.
            // In the current "legacy" mode we assume the requesting node sends 2 RPCs - one for
            // topology tables and one for auth tables, service levels, and additional tables.
            // When we detect it's the second RPC, we add additional tables based on our feature flags.
            // In the future we want to deprecate this parameter, so this condition should
            // apply only for "legacy" snapshot pull RPCs.
            std::vector<table_id> additional_tables;
            if (params.tables.size() > 0 && params.tables[0] != db::system_keyspace::topology()->id()) {
                if (ss._feature_service.view_build_status_on_group0) {
                    additional_tables.push_back(db::system_keyspace::view_build_status_v2()->id());
                }
                if (ss._feature_service.compression_dicts) {
                    additional_tables.push_back(db::system_keyspace::dicts()->id());
                }
                if (ss._feature_service.view_building_coordinator) {
                    additional_tables.push_back(db::system_keyspace::view_building_tasks()->id());
                }
                if (ss._feature_service.cdc_with_tablets) {
                    additional_tables.push_back(db::system_keyspace::cdc_streams_state()->id());
                    additional_tables.push_back(db::system_keyspace::cdc_streams_history()->id());
                }
                if (ss._feature_service.client_routes) {
                    additional_tables.push_back(db::system_keyspace::client_routes()->id());
                }
            }

            // Collect all requested (plus implied) tables' mutations.
            for (const auto& table : boost::join(params.tables, additional_tables)) {
                auto schema = ss._db.local().find_schema(table);
                auto muts = co_await ss.get_system_mutations(schema);
                if (table == db::system_keyspace::cdc_generations_v3()->id()) {
                    utils::get_local_injector().inject("cdc_generation_mutations_topology_snapshot_replication",
                        [target_size=ss._db.local().schema_commitlog()->max_record_size() * 2, &muts] {
                            // Copy mutations n times, where n is picked so that the memory size of all mutations
                            // together exceeds `schema_commitlog()->max_record_size()`.
                            // We multiply by two to account for all possible deltas (like segment::entry_overhead_size).
                            size_t current_size = 0;
                            for (const auto& m: muts) {
                                current_size += m.representation().size();
                            }
                            const auto number_of_copies = (target_size / current_size + 1) * 2;
                            muts.reserve(muts.size() * number_of_copies);
                            const auto it_begin = muts.begin();
                            const auto it_end = muts.end();
                            for (unsigned i = 0; i < number_of_copies; ++i) {
                                std::copy(it_begin, it_end, std::back_inserter(muts));
                            }
                        });
                }
                mutations.reserve(mutations.size() + muts.size());
                std::move(muts.begin(), muts.end(), std::back_inserter(mutations));
            }

            // Append various singleton version/marker mutations, if present.
            auto sl_driver_created_mut = co_await ss._sys_ks.local().get_service_level_driver_created_mutation();
            if (sl_driver_created_mut) {
                mutations.push_back(canonical_mutation(*sl_driver_created_mut));
            }
            auto sl_version_mut = co_await ss._sys_ks.local().get_service_levels_version_mutation();
            if (sl_version_mut) {
                mutations.push_back(canonical_mutation(*sl_version_mut));
            }

            auto auth_version_mut = co_await ss._sys_ks.local().get_auth_version_mutation();
            if (auth_version_mut) {
                mutations.emplace_back(*auth_version_mut);
            }

            auto view_builder_version_mut = co_await ss._sys_ks.local().get_view_builder_version_mutation();
            if (view_builder_version_mut) {
                mutations.emplace_back(*view_builder_version_mut);
            }

            auto vb_processing_base_mut = co_await ss._sys_ks.local().get_view_building_processing_base_id_mutation();
            if (vb_processing_base_mut) {
                mutations.emplace_back(*vb_processing_base_mut);
            }

            co_return raft_snapshot{
                .mutations = std::move(mutations),
            };
        });
    });

    // Tablet data streaming / repair / cleanup, addressed by raft id.
    ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
        return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
            return ss.stream_tablet(tablet);
        });
    });

    ser::storage_service_rpc_verbs::register_tablet_repair(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet, rpc::optional<service::session_id> session_id) {
        return handle_raft_rpc(dst_id, [tablet, session_id = session_id.value_or(service::session_id::create_null_id())] (auto& ss) -> future<service::tablet_operation_repair_result> {
            auto res = co_await ss.repair_tablet(tablet, session_id);
            co_return res;
        });
    });

    ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
        return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
            return ss.cleanup_tablet(tablet);
        });
    });

    // Load statistics for the tablet balancer; v1 is the legacy wire format.
    ser::storage_service_rpc_verbs::register_table_load_stats(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) {
        return handle_raft_rpc(dst_id, [] (auto& ss) mutable {
            return ss.load_stats_for_tablet_based_tables();
        });
    });

    ser::storage_service_rpc_verbs::register_table_load_stats_v1(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) {
        return handle_raft_rpc(dst_id, [] (auto& ss) mutable {
            return ss.load_stats_for_tablet_based_tables().then([] (auto stats) {
                return locator::load_stats_v1{ .tables = std::move(stats.tables) };
            });
        });
    });

    // Sums the on-disk data size of a table's sstables across all shards.
    ser::storage_service_rpc_verbs::register_estimate_sstable_volume(&_messaging.local(), [this] (table_id t_id) -> future<uint64_t> {
        co_return co_await _db.map_reduce0(seastar::coroutine::lambda([&] (replica::database& local_db) -> future<uint64_t> {
            uint64_t result = 0;
            auto& t = local_db.get_tables_metadata().get_table(t_id);
            auto snap = co_await t.take_sstable_set_snapshot();
            for (const auto& sst : snap) {
                result += sst.get()->data_size();
            }
            co_return result;
        }), uint64_t(0), std::plus());
    });

    ser::storage_service_rpc_verbs::register_sample_sstables(&_messaging.local(), [this] (table_id table, uint64_t chunk_size, uint64_t n_chunks) -> future<utils::chunked_vector<temporary_buffer<char>>> {
        return _db.local().sample_data_files(table, chunk_size, n_chunks);
    });

    // Join handshake verbs. The request/query are raft-addressed; the
    // response additionally waits for the local group 0 stub to be up.
    ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) {
        return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_request");
            return ss.join_node_request_handler(std::move(params));
        });
    });

    ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) {
        return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future<join_node_response_result> {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_response");
            // Can't use handle_raft_rpc here: the joining node's group 0
            // server may not be started yet, so wait for the signal that it
            // at least loaded its id before checking the destination.
            co_await ss._join_node_group0_started.get_shared_future(ss._group0_as);
            if (ss._group0->load_my_id() != dst_id) {
                throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
            }

            co_return co_await ss.join_node_response_handler(std::move(params));
        });
    });

    ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) {
        return handle_raft_rpc(dst_id, [] (auto& ss) -> future<join_node_query_result> {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query");
            auto result = join_node_query_result{
                .topo_mode = join_node_query_result::topology_mode::raft
            };
            return make_ready_future<join_node_query_result>(std::move(result));
        });
    });

    // Notification that this node was banned from the cluster: terminate
    // unless we are already shutting down / decommissioning.
    ser::join_node_rpc_verbs::register_notify_banned(&_messaging.local(), [this] (const rpc::client_info& cinfo, raft::server_id dst_id) {
        auto src_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
        return container().invoke_on(0, [src_id, dst_id] (auto& ss) -> future<rpc::no_wait_type> {
            if (ss.my_host_id() != locator::host_id{dst_id.uuid()}) {
                rtlogger.warn("received notify_banned from {} for {}, but my id is {}, ignoring", src_id, dst_id, ss.my_host_id());
            } else if (ss._topology_state_machine._topology.tstate != topology::transition_state::left_token_ring &&
                       !ss._topology_state_machine._topology.left_nodes.contains(dst_id) && !ss._group0_as.abort_requested()) {
                // Ignore rpc if the node is already shutting down or during decommissioning because
                // the node is expected to shut itself down after being banned.
                rtlogger.info("received notification of being banned from the cluster from {}, terminating.", src_id);
                _exit(0);
            }
            co_return rpc::no_wait_type{};
        });
    });
}
// Unregisters all RPC verbs registered by init_messaging_service(), waiting
// for every unregistration to complete.
future<> storage_service::uninit_messaging_service() {
    auto& ms = _messaging.local();
    auto f_node_ops = ser::node_ops_rpc_verbs::unregister(&ms);
    auto f_ss = ser::storage_service_rpc_verbs::unregister(&ms);
    auto f_join = ser::join_node_rpc_verbs::unregister(&ms);
    auto f_stream = ser::streaming_rpc_verbs::unregister_tablet_stream_files(&ms);
    return when_all_succeed(std::move(f_node_ops), std::move(f_ss), std::move(f_join), std::move(f_stream)).discard_result();
}
// Reacts to an unrecoverable disk/commitlog I/O error by isolating the node
// (shutting down its communications) until an operator intervenes. The
// _isolated flag guards against repeated shutdown attempts on this shard.
void storage_service::do_isolate_on_error(disk_error type)
{
    if (std::exchange(_isolated, true)) {
        return;
    }
    const char* subsystem = (type == disk_error::commit) ? "Commitlog" : "Disk";
    slogger.error("Shutting down communications due to I/O errors until operator intervention: {} error: {}", subsystem, std::current_exception());
    //FIXME: discarded future.
    (void)isolate();
}
// Stops transport (client/internode communications) on the node.
// run_with_no_api_lock() executes on shard 0: when called from shard 0 we
// always stop; when called from another shard we test-and-set shard 0's
// _isolated flag so two shards racing here can't stop transport twice.
future<> storage_service::isolate() {
    const auto caller_shard = this_shard_id();
    return run_with_no_api_lock([caller_shard] (storage_service& ss) -> future<> {
        // Note the short-circuit: the flag is only exchanged when the call
        // originated on a secondary shard.
        const bool already_stopping =
                caller_shard != this_shard_id() && std::exchange(ss._isolated, true);
        if (already_stopping) {
            return make_ready_future<>();
        }
        return ss.stop_transport();
    });
}
// Legacy status endpoint: with raft-based topology there is no token-removal
// progress to report, so this always returns a fixed message.
future<sstring> storage_service::get_removal_status() {
    return run_with_no_api_lock([] (storage_service&) {
        return make_ready_future<sstring>(sstring("No token removals in process."));
    });
}
// `nodetool removenode force` was unsafe and is intentionally no longer
// supported; always fails with an explanatory error.
future<> storage_service::force_remove_completion() {
    auto err = std::runtime_error("The unsafe nodetool removenode force is not supported anymore");
    return make_exception_future<>(std::move(err));
}
// Returns the token ranges replicated on `ep` according to the given
// effective replication map.
future<dht::token_range_vector>
storage_service::get_ranges_for_endpoint(const locator::effective_replication_map& erm, const locator::host_id& ep) const {
    // The replication map owns the range-to-replica assignment; delegate.
    return erm.get_ranges(ep);
}
// Resolves the replica addresses for a partition key given in nodetool's
// textual key format.
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const {
    auto& table = _db.local().find_column_family(keyspace, cf);
    auto schema = table.schema();
    auto pk = partition_key::from_nodetool_style_string(schema, key);
    return get_natural_endpoints(keyspace, schema, table, pk);
}
// Resolves the replica addresses for a partition key given as its individual
// string-encoded components.
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace, const sstring& cf, const std::vector<sstring>& key_components) const {
    auto& table = _db.local().find_column_family(keyspace, cf);
    auto schema = table.schema();
    auto pk = partition_key::from_string_components(schema, key_components);
    return get_natural_endpoints(keyspace, schema, table, pk);
}
// Maps a partition key to its token, looks up the natural replica set under
// the keyspace's replication strategy, and translates the replica host ids
// into network addresses.
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const {
    const auto token = schema->get_partitioner().get_token(*schema, pk.view());
    const auto& ks = _db.local().find_keyspace(keyspace);

    // Tablet keyspaces carry a per-table replication map; vnode keyspaces
    // use the keyspace-wide static one.
    const auto replicas = ks.uses_tablets()
            ? cf.get_effective_replication_map()->get_natural_replicas(token)
            : ks.get_static_effective_replication_map()->get_natural_replicas(token);

    inet_address_vector_replica_set endpoints;
    endpoints.reserve(replicas.size());
    for (const auto& id : replicas) {
        endpoints.push_back(_address_map.get(id));
    }
    return endpoints;
}
future<> endpoint_lifecycle_notifier::notify_down(gms::inet_address endpoint, locator::host_id hid) {
    // Runs in a seastar thread so subscribers may block; a failure in one
    // subscriber is logged and does not stop notification of the rest.
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_down(endpoint, hid);
            } catch (...) {
                slogger.warn("Down notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
future<> storage_service::notify_down(inet_address endpoint, locator::host_id hid) {
    // On every shard: drop the cached RPC client for the node, then fan the
    // "down" event out to the lifecycle subscribers.
    co_await container().invoke_on_all([endpoint, hid] (storage_service& ss) {
        ss._messaging.local().remove_rpc_client(netw::msg_addr{endpoint, 0}, hid);
        return ss._lifecycle_notifier.notify_down(endpoint, hid);
    });
    slogger.debug("Notify node {}/{} has been down", endpoint, hid);
}
future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint, locator::host_id hid) {
    // Runs in a seastar thread so subscribers may block; a failure in one
    // subscriber is logged and does not stop notification of the rest.
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_leave_cluster(endpoint, hid);
            } catch (...) {
                slogger.warn("Leave cluster notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
future<> endpoint_lifecycle_notifier::notify_released(locator::host_id hid) {
    // Runs in a seastar thread so subscribers may block; a failure in one
    // subscriber is logged and does not stop notification of the rest.
    return seastar::async([this, hid] {
        _subscribers.thread_for_each([hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_released(hid);
            } catch (...) {
                slogger.warn("Node released notification failed {}: {}", hid, std::current_exception());
            }
        });
    });
}
future<> storage_service::notify_left(inet_address endpoint, locator::host_id hid) {
    // Fan the "node left" event out to the lifecycle subscribers on every shard.
    co_await container().invoke_on_all([endpoint, hid] (storage_service& ss) {
        return ss._lifecycle_notifier.notify_left(endpoint, hid);
    });
    // Log both address and host id, consistent with the other notify_* logs.
    slogger.debug("Notify node {}/{} has left the cluster", endpoint, hid);
}
future<> storage_service::notify_released(locator::host_id hid) {
    // Fan the "node released" event out to the lifecycle subscribers on
    // every shard.
    co_await container().invoke_on_all([hid] (auto&& ss) {
        return ss._lifecycle_notifier.notify_released(hid);
    });
    // Fixed grammar of the log message ("been released" -> "has been released").
    slogger.debug("Notify node {} has been released from the cluster and no longer owns any tokens", hid);
}
future<> endpoint_lifecycle_notifier::notify_up(gms::inet_address endpoint, locator::host_id hid) {
    // Runs in a seastar thread so subscribers may block; a failure in one
    // subscriber is logged and does not stop notification of the rest.
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_up(endpoint, hid);
            } catch (...) {
                slogger.warn("Up notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
future<> storage_service::notify_up(inet_address endpoint, locator::host_id hid) {
    // Only announce nodes the gossiper currently sees as both CQL-ready
    // and alive; otherwise the notification is silently skipped.
    const bool ready = _gossiper.is_cql_ready(hid) && _gossiper.is_alive(hid);
    if (!ready) {
        co_return;
    }
    co_await container().invoke_on_all([endpoint, hid] (storage_service& ss) {
        return ss._lifecycle_notifier.notify_up(endpoint, hid);
    });
    slogger.debug("Notify node {}/{} has been up", endpoint, hid);
}
future<> endpoint_lifecycle_notifier::notify_joined(gms::inet_address endpoint, locator::host_id hid) {
    // Runs in a seastar thread so subscribers may block; a failure in one
    // subscriber is logged and does not stop notification of the rest.
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_join_cluster(endpoint, hid);
            } catch (...) {
                slogger.warn("Join cluster notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
future<> endpoint_lifecycle_notifier::notify_client_routes_change(const client_routes_service::client_route_keys& client_route_keys) {
    // Runs in a seastar thread so subscribers may block; a failure in one
    // subscriber is logged and does not stop notification of the rest.
    co_await seastar::async([this, &client_route_keys] {
        _subscribers.thread_for_each([&client_route_keys] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_client_routes_change(client_route_keys);
            } catch (...) {
                slogger.warn("Client routes notification failed: {}", std::current_exception());
            }
        });
    });
}
future<> storage_service::notify_joined(inet_address endpoint, locator::host_id hid) {
    // Test hook: optionally delay the notification.
    co_await utils::get_local_injector().inject(
            "storage_service_notify_joined_sleep", std::chrono::milliseconds{500});
    // Fan the "joined" event out to the lifecycle subscribers on every shard.
    co_await container().invoke_on_all([endpoint, hid] (storage_service& ss) {
        return ss._lifecycle_notifier.notify_joined(endpoint, hid);
    });
    slogger.debug("Notify node {}/{} has joined the cluster", endpoint, hid);
}
future<> storage_service::remove_rpc_client_with_ignored_topology(inet_address endpoint, locator::host_id id) {
    // Drop, on every shard, the cached RPC client that ignores topology.
    return container().invoke_on_all([endpoint, id] (storage_service& ss) {
        ss._messaging.local().remove_rpc_client_with_ignored_topology(netw::msg_addr{endpoint, 0}, id);
    });
}
future<> storage_service::notify_cql_change(inet_address endpoint, locator::host_id hid, bool ready) {
    // A node becoming CQL-ready is announced as "up", losing readiness as "down".
    co_await (ready ? notify_up(endpoint, hid) : notify_down(endpoint, hid));
}
future<> storage_service::notify_client_routes_change(const client_routes_service::client_route_keys& client_route_keys) {
    // Thin forwarder to the local client-routes notifier; no coroutine frame
    // is needed, the callee's future is returned directly.
    return _client_routes.local().notify_client_routes_change(client_route_keys);
}
future<bool> storage_service::is_vnodes_cleanup_allowed(sstring keyspace) {
    // Decide on shard 0. Cleanup is disallowed while bootstrapping or while
    // this node has pending ranges for the keyspace.
    return container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) {
        const auto my_id = ss.get_token_metadata().get_my_id();
        const auto& ks = ss._db.local().find_keyspace(keyspace);
        const bool pending = ks.get_static_effective_replication_map()->has_pending_ranges(my_id);
        const bool bootstrapping = ss._operation_mode == mode::BOOTSTRAP;
        slogger.debug("is_vnodes_cleanup_allowed: keyspace={}, is_bootstrap_mode={}, pending_ranges={}",
                keyspace, bootstrapping, pending);
        return !bootstrapping && !pending;
    });
}
bool storage_service::is_repair_based_node_ops_enabled(streaming::stream_reason reason) {
    // Operation names accepted by the allowed_repair_based_node_ops option,
    // mapped to their stream reasons.
    static const std::unordered_map<sstring, streaming::stream_reason> reason_map{
        {"replace", streaming::stream_reason::replace},
        {"bootstrap", streaming::stream_reason::bootstrap},
        {"decommission", streaming::stream_reason::decommission},
        {"removenode", streaming::stream_reason::removenode},
        {"rebuild", streaming::stream_reason::rebuild},
    };
    const sstring& raw_list = _db.local().get_config().allowed_repair_based_node_ops();
    std::unordered_set<streaming::stream_reason> allowed;
    for (const sstring& name : utils::split_comma_separated_list(raw_list)) {
        try {
            if (auto it = reason_map.find(name); it != reason_map.end()) {
                allowed.insert(it->second);
            } else {
                throw std::invalid_argument(::format("unsupported operation name: {}", name));
            }
        } catch (...) {
            // Re-wrap any failure with the full option value for context.
            throw std::invalid_argument(::format("Failed to parse allowed_repair_based_node_ops parameter [{}]: {}",
                    raw_list, std::current_exception()));
        }
    }
    const bool global_enabled = _db.local().get_config().enable_repair_based_node_ops();
    slogger.info("enable_repair_based_node_ops={}, allowed_repair_based_node_ops={{{}}}", global_enabled, fmt::join(allowed, ", "));
    // Repair-based ops are used only when globally enabled AND this specific
    // operation is in the allow-list.
    return global_enabled && allowed.contains(reason);
}
future<> storage_service::start_maintenance_mode() {
    // Enter maintenance mode: register this node in the topology and make it
    // the owner of a single default-constructed token.
    set_mode(mode::MAINTENANCE);
    return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> {
        tmptr->update_topology(my_host_id(), _snitch.local()->get_location(), locator::node::state::normal, smp::count);
        return tmptr->update_normal_tokens({ dht::token{} }, my_host_id());
    }, acquire_merge_lock::yes);
}
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
    // Track the server so it can be managed later; optionally start it now.
    _protocol_servers.push_back(&server);
    if (!start_instantly) {
        co_return;
    }
    co_await server.start_server();
}
std::vector<table_id> storage_service::get_tables_with_cdc_tablet_streams() const {
    // Thin forwarder to the local CDC metadata.
    auto&& cdc_meta = _cdc_gens.local().get_cdc_metadata();
    return cdc_meta.get_tables_with_cdc_tablet_streams();
}
future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f) {
    // Thin forwarder to the local CDC generation service.
    auto& cdc = _cdc_gens.local();
    return cdc.query_cdc_timestamps(table, ascending, std::move(f));
}
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
    // Thin forwarder to the local CDC generation service.
    auto& cdc = _cdc_gens.local();
    return cdc.query_cdc_streams(table, std::move(f));
}
future<> storage_service::on_cleanup_for_drop_table(const table_id& id) {
    // Tell the repair service on every shard (if it was started there) that
    // the table is being dropped. No coroutine frame is needed: the future
    // from invoke_on_all is returned directly.
    return container().invoke_on_all([id] (storage_service& ss) {
        if (ss._repair.local_is_initialized()) {
            ss._repair.local().on_cleanup_for_drop_table(id);
        }
    });
}
} // namespace service