Files
scylladb/service/storage_service.cc
Patryk Jędrzejczak 9a9202c909 Merge 'Remove gossiper topology code' from Gleb Natapov
The PR removes most of the code that assumes that group0 and raft topology is not enabled. It also makes sure that joining a cluster in no-raft mode, or upgrading a node to this version in a cluster that does not yet use raft topology, will fail.

Refs #15422

No backport needed since this removes functionality.

Closes scylladb/scylladb#28514

* https://github.com/scylladb/scylladb:
  group0: fix indentation after previous patch
  raft_group0: simplify get_group0_upgrade_state function since no upgrade can happen any more
  raft_group0: move service::group0_upgrade_state to use fmt::formatter instead of iostream
  raft_group0: remove unused code from raft_group0
  node_ops: remove topology over node ops code
  topology: fix indentation after the previous patch
  topology: drop topology_change_enabled parameter from raft_group0 code
  storage_service: remove unused handle_state_* functions
  gossiper: drop wait_for_gossip_to_settle and deprecate correspondent option
  storage_service: fix indentation after the last patch
  storage_service: remove gossiper bootstrapping code
  storage_service: drop get_group_server_if_raft_topolgy_enabled
  storage_service: drop is_topology_coordinator_enabled and its uses
  storage_service: drop run_with_api_lock_in_gossiper_mode_only
  topology: remove code that assumes raft_topology_change_enabled() may return false
  test: schema_change_test: make test_schema_digest_does_not_change_with_disabled_features tests run in raft mode
  test: schema_change_test: drop schema tests relevant for no raft mode only
  topology: remove upgrade to raft topology code
  group0: remove upgrade to group0 code
  group0: refuse to boot if a cluster is still not in a raft topology mode
  storage_service: refuse to join a cluster in legacy mode
2026-02-27 14:43:41 +01:00

6651 lines
328 KiB
C++

/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "storage_service.hh"
#include "db/view/view_building_worker.hh"
#include "utils/chunked_vector.hh"
#include <seastar/core/shard_id.hh>
#include "db/view/view_building_coordinator.hh"
#include "utils/disk_space_monitor.hh"
#include "compaction/task_manager_module.hh"
#include "gc_clock.hh"
#include "raft/raft.hh"
#include "auth/cache.hh"
#include <ranges>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sleep.hh>
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service/qos/service_level_controller.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "locator/token_metadata.hh"
#include "service/topology_guard.hh"
#include "service/session.hh"
#include "dht/boot_strapper.hh"
#include <chrono>
#include <exception>
#include <optional>
#include <fmt/ranges.h>
#include <seastar/core/sharded.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/as_future.hh>
#include "gms/endpoint_state.hh"
#include "locator/snitch_base.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/consistency_level.hh"
#include <seastar/core/when_all.hh>
#include "service/tablet_allocator.hh"
#include "locator/types.hh"
#include "locator/tablets.hh"
#include "dht/auto_refreshing_sharder.hh"
#include "mutation_writer/multishard_writer.hh"
#include "locator/tablet_metadata_guard.hh"
#include "replica/tablet_mutation_builder.hh"
#include <seastar/core/smp.hh>
#include "mutation/canonical_mutation.hh"
#include "mutation/async_utils.hh"
#include <seastar/core/on_internal_error.hh>
#include "service/strong_consistency/groups_manager.hh"
#include "service/raft/group0_state_machine.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/topology_state_machine.hh"
#include "utils/assert.hh"
#include "utils/UUID.hh"
#include "utils/to_string.hh"
#include "gms/inet_address.hh"
#include "utils/log.hh"
#include "service/migration_manager.hh"
#include "service/raft/raft_group0.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include <seastar/core/thread.hh>
#include <algorithm>
#include "locator/local_strategy.hh"
#include "utils/user_provided_param.hh"
#include "version.hh"
#include "streaming/stream_blob.hh"
#include "dht/range_streamer.hh"
#include <boost/range/algorithm.hpp>
#include <boost/range/join.hpp>
#include "transport/server.hh"
#include <seastar/core/rwlock.hh>
#include "db/batchlog_manager.hh"
#include "db/commitlog/commitlog.hh"
#include "db/hints/manager.hh"
#include "utils/exceptions.hh"
#include "message/messaging_service.hh"
#include "supervisor.hh"
#include "compaction/compaction_manager.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "db/view/view_builder.hh"
#include "replica/database.hh"
#include "replica/tablets.hh"
#include <seastar/core/metrics.hh>
#include "cdc/generation.hh"
#include "cdc/generation_service.hh"
#include "repair/repair.hh"
#include "repair/row_level.hh"
#include "gms/generation-number.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/exception.hh>
#include "utils/pretty_printers.hh"
#include "utils/stall_free.hh"
#include "utils/error_injection.hh"
#include "locator/util.hh"
#include "idl/storage_service.dist.hh"
#include "idl/streaming.dist.hh"
#include "service/storage_proxy.hh"
#include "service/raft/join_node.hh"
#include "idl/join_node.dist.hh"
#include "idl/migration_manager.dist.hh"
#include "idl/node_ops.dist.hh"
#include "transport/protocol_server.hh"
#include "node_ops/node_ops_ctl.hh"
#include "node_ops/task_manager_module.hh"
#include "service/task_manager_module.hh"
#include "service/topology_mutation.hh"
#include "cql3/query_processor.hh"
#include "service/qos/service_level_controller.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include <csignal>
#include "utils/labels.hh"
#include "view_info.hh"
#include "raft/raft.hh"
#include "debug.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <stdexcept>
#include <unistd.h>
#include <variant>
#include <utility>
using token = dht::token;
using UUID = utils::UUID;
using inet_address = gms::inet_address;
extern logging::logger cdc_log;
namespace service {
static logging::logger slogger("storage_service");

// One session manager per shard, tracking topology-operation sessions
// (see service/session.hh for the session abstraction).
static thread_local session_manager topology_session_manager;

// Accessor for the shard-local topology session manager.
session_manager& get_topology_session_manager() {
    return topology_session_manager;
}
namespace {
// Parses each string in `src_node_strings` as a host id or endpoint.
// Throws std::runtime_error (naming the offending entry) if any element
// fails to parse.
[[nodiscard]] locator::host_id_or_endpoint_list string_list_to_endpoint_list(const std::vector<sstring>& src_node_strings) {
    locator::host_id_or_endpoint_list parsed;
    parsed.reserve(src_node_strings.size());
    for (const sstring& node_str : src_node_strings) {
        try {
            parsed.emplace_back(node_str);
        } catch (...) {
            throw std::runtime_error(::format("Failed to parse node list: {}: invalid node={}: {}", src_node_strings, node_str, std::current_exception()));
        }
    }
    return parsed;
}
// Splits a comma-separated list of nodes and parses every element into a
// host id or endpoint.
[[nodiscard]] locator::host_id_or_endpoint_list parse_node_list(const std::string_view comma_separated_list) {
    auto node_strings = utils::split_comma_separated_list(comma_separated_list);
    return string_list_to_endpoint_list(node_strings);
}
// Verifies that a raft group0 RPC handler is running in the gossip
// scheduling group, reporting an internal error (without throwing) if it
// is not. The check is a no-op unless the cluster-wide
// `enforced_raft_rpc_scheduling_group` feature is enabled.
//
// `db` is unused here; presumably kept in the signature for uniformity
// with the call sites — marked [[maybe_unused]] to avoid
// -Wunused-parameter noise.
void check_raft_rpc_scheduling_group([[maybe_unused]] const replica::database& db, const gms::feature_service& feature_service, const std::string_view rpc_name) {
    if (!feature_service.enforced_raft_rpc_scheduling_group) {
        return;
    }
    // Query the current group once instead of twice (for the comparison
    // and for the error message).
    const auto current_group = current_scheduling_group();
    if (current_group != debug::gossip_scheduling_group) {
        on_internal_error_noexcept(
                slogger, seastar::format("Raft group0 RPCs should be executed in the gossip scheduling group, current group is [{}], operation [{}].",
                        current_group.name(), rpc_name));
    }
}
} // namespace
// Upper bound on waiting for other nodes to be seen alive; used by the
// wait-for-live-nodes logic at call sites outside this chunk.
static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30};
// Constructs the storage service: wires up references to its collaborating
// services, registers the task-manager modules it owns, hooks disk-error
// signals, and plugs itself into the migration manager. No I/O happens
// here. `disk_space_monitor` is stored as a raw pointer and may be null —
// confirm that call sites check it before use.
storage_service::storage_service(abort_source& abort_source,
        sharded<replica::database>& db, gms::gossiper& gossiper,
        sharded<db::system_keyspace>& sys_ks,
        sharded<db::system_distributed_keyspace>& sys_dist_ks,
        gms::feature_service& feature_service,
        sharded<service::migration_manager>& mm,
        locator::shared_token_metadata& stm,
        locator::effective_replication_map_factory& erm_factory,
        sharded<netw::messaging_service>& ms,
        sharded<repair_service>& repair,
        sharded<streaming::stream_manager>& stream_manager,
        endpoint_lifecycle_notifier& elc_notif,
        sharded<db::batchlog_manager>& bm,
        sharded<locator::snitch_ptr>& snitch,
        sharded<service::tablet_allocator>& tablet_allocator,
        sharded<cdc::generation_service>& cdc_gens,
        sharded<db::view::view_builder>& view_builder,
        sharded<db::view::view_building_worker>& view_building_worker,
        cql3::query_processor& qp,
        sharded<qos::service_level_controller>& sl_controller,
        auth::cache& auth_cache,
        sharded<client_routes_service>& client_routes,
        topology_state_machine& topology_state_machine,
        db::view::view_building_state_machine& view_building_state_machine,
        tasks::task_manager& tm,
        gms::gossip_address_map& address_map,
        std::function<future<void>(std::string_view)> compression_dictionary_updated_callback,
        utils::disk_space_monitor* disk_space_monitor,
        strong_consistency::groups_manager& groups_manager
)
    : _abort_source(abort_source)
    , _feature_service(feature_service)
    , _db(db)
    , _gossiper(gossiper)
    , _messaging(ms)
    , _migration_manager(mm)
    , _qp(qp)
    , _repair(repair)
    , _stream_manager(stream_manager)
    , _snitch(snitch)
    , _sl_controller(sl_controller)
    , _auth_cache(auth_cache)
    , _client_routes(client_routes)
    , _group0(nullptr)
    , _async_gate("storage_service")
    // Task-manager modules owned by this service; registered with `tm` in
    // the constructor body below.
    , _node_ops_module(make_shared<node_ops::task_manager_module>(tm, *this))
    , _tablets_module(make_shared<service::task_manager_module>(tm, *this))
    , _global_topology_requests_module(make_shared<service::topo::task_manager_module>(tm))
    , _address_map(address_map)
    , _shared_token_metadata(stm)
    , _erm_factory(erm_factory)
    , _lifecycle_notifier(elc_notif)
    , _batchlog_manager(bm)
    , _sys_ks(sys_ks)
    , _sys_dist_ks(sys_dist_ks)
    // Snitch reconfiguration is always handled on shard 0.
    , _snitch_reconfigure([this] {
        return container().invoke_on(0, [] (auto& ss) {
            return ss.snitch_reconfigured();
        });
    })
    , _tablet_allocator(tablet_allocator)
    , _cdc_gens(cdc_gens)
    , _view_builder(view_builder)
    , _view_building_worker(view_building_worker)
    , _topology_state_machine(topology_state_machine)
    , _view_building_state_machine(view_building_state_machine)
    , _compression_dictionary_updated_callback(std::move(compression_dictionary_updated_callback))
    , _disk_space_monitor(disk_space_monitor)
    , _groups_manager(groups_manager)
{
    tm.register_module(_node_ops_module->get_name(), _node_ops_module);
    tm.register_module(_tablets_module->get_name(), _tablets_module);
    tm.register_module(_global_topology_requests_module->get_name(), _global_topology_requests_module);
    // Virtual tasks are created on shard 0 only.
    if (this_shard_id() == 0) {
        _node_ops_module->make_virtual_task<node_ops::node_ops_virtual_task>(*this);
        _tablets_module->make_virtual_task<service::tablet_virtual_task>(*this);
        _global_topology_requests_module->make_virtual_task<service::topo::global_topology_request_virtual_task>(*this);
    }
    register_metrics();
    // On any disk error signal, isolate this node (see do_isolate_on_error).
    // The scoped_connections keep the signal subscriptions alive for the
    // lifetime of the service.
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_read_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_write_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(general_disk_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(commit_error.connect([this] { do_isolate_on_error(disk_error::commit); }))));
    // The snitch may not be initialized in all boot modes (e.g. tools) —
    // only subscribe to reconfiguration events when it is.
    if (_snitch.local_is_initialized()) {
        _listeners.emplace_back(make_lw_shared(_snitch.local()->when_reconfigured(_snitch_reconfigure)));
    }
    init_messaging_service();
    _migration_manager.local().plug_storage_service(*this);
}
// Defaulted out of line — presumably so members whose types are only
// forward-declared in the header can be destroyed here; confirm against
// storage_service.hh.
storage_service::~storage_service() = default;
// Returns the node-ops task-manager module owned by this service
// (created in the constructor; never null).
node_ops::task_manager_module& storage_service::get_node_ops_module() noexcept {
    return *_node_ops_module;
}
// Returns the auth cache this service was constructed with.
auth::cache& storage_service::auth_cache() noexcept {
    return _auth_cache;
}
// Node status as reported to external observers via the "operation_mode"
// metric (see register_metrics()). The numeric values are part of the
// monitoring interface — do not renumber existing entries.
enum class node_external_status {
    UNKNOWN = 0,
    STARTING = 1,
    JOINING = 2,
    NORMAL = 3,
    LEAVING = 4,
    DECOMMISSIONED = 5,
    DRAINING = 6,
    DRAINED = 7,
    MOVING = 8, //deprecated
    MAINTENANCE = 9
};
// Translates the internal storage_service operation mode into the stable,
// externally reported status code (see node_external_status for the
// numbering). Both NONE and STARTING read as STARTING externally, and both
// BOOTSTRAP and JOINING read as JOINING.
static node_external_status map_operation_mode(storage_service::mode mode) {
    switch (mode) {
    case storage_service::mode::NONE:
    case storage_service::mode::STARTING:
        return node_external_status::STARTING;
    case storage_service::mode::BOOTSTRAP:
    case storage_service::mode::JOINING:
        return node_external_status::JOINING;
    case storage_service::mode::NORMAL:
        return node_external_status::NORMAL;
    case storage_service::mode::LEAVING:
        return node_external_status::LEAVING;
    case storage_service::mode::DECOMMISSIONED:
        return node_external_status::DECOMMISSIONED;
    case storage_service::mode::DRAINING:
        return node_external_status::DRAINING;
    case storage_service::mode::DRAINED:
        return node_external_status::DRAINED;
    case storage_service::mode::MOVING:
        return node_external_status::MOVING;
    case storage_service::mode::MAINTENANCE:
        return node_external_status::MAINTENANCE;
    }
    return node_external_status::UNKNOWN;
}
// Registers the "node" metrics group exposing the current operation mode.
// Registration happens on shard 0 only: the reported value is node-global.
void storage_service::register_metrics() {
    if (this_shard_id() != 0) {
        // the relevant data is distributed between the shards,
        // We only need to register it once.
        return;
    }
    namespace sm = seastar::metrics;
    _metrics.add_group("node", {
        // Gauge value is the numeric node_external_status derived from
        // _operation_mode; the description documents the numbering, which
        // must stay in sync with the node_external_status enum.
        sm::make_gauge("operation_mode", sm::description("The operation mode of the current node. UNKNOWN = 0, STARTING = 1, JOINING = 2, NORMAL = 3, "
                "LEAVING = 4, DECOMMISSIONED = 5, DRAINING = 6, DRAINED = 7, MOVING = 8, MAINTENANCE = 9"), [this] {
            return static_cast<std::underlying_type_t<node_external_status>>(map_operation_mode(_operation_mode));
        })(basic_level),
    });
}
// Tells whether this node is starting up in "replace" mode, i.e. it will
// take over the tokens of an existing node rather than bootstrap fresh.
bool storage_service::is_replacing() {
    const auto& cfg = _db.local().get_config();

    // The *_first_boot options are only honoured before this node has
    // completed its first bootstrap; afterwards they are ignored with a
    // log message.
    if (!cfg.replace_node_first_boot().empty()) {
        if (!_sys_ks.local().bootstrap_complete()) {
            return true;
        }
        slogger.info("Replace node on first boot requested; this node is already bootstrapped");
        return false;
    }

    if (!cfg.replace_address_first_boot().empty()) {
        if (!_sys_ks.local().bootstrap_complete()) {
            return true;
        }
        slogger.info("Replace address on first boot requested; this node is already bootstrapped");
        return false;
    }

    // Returning true if cfg.replace_address is provided
    // will trigger an exception down the road if bootstrap_complete(),
    // as it is an error to use this option post bootstrap.
    // That said, we should just stop supporting it and force users
    // to move to the new, replace_node_first_boot config option.
    return !cfg.replace_address().empty();
}
// Returns true iff this node is the designated first node of a fresh
// cluster and hence skips bootstrap. A replacing node is never the first
// node; with no seeds configured we also answer false.
bool storage_service::is_first_node() {
    if (is_replacing()) {
        return false;
    }
    auto seeds = _gossiper.get_seeds();
    if (seeds.empty()) {
        return false;
    }
    // Node with the smallest IP address is chosen as the very first node
    // in the cluster. The first node is the only node that does not
    // bootstrap in the cluster. All other nodes will bootstrap.
    //
    // Find the minimum directly (O(n)) instead of copying the seed list
    // into a vector and sorting it (O(n log n)) just to read its front.
    const auto smallest_seed = *std::min_element(seeds.begin(), seeds.end());
    if (smallest_seed == get_broadcast_address()) {
        slogger.info("I am the first node in the cluster. Skip bootstrap. Node={}", get_broadcast_address());
        return true;
    }
    return false;
}
// A node bootstraps unless it has already completed bootstrap, or it is
// the first node of a fresh cluster.
bool storage_service::should_bootstrap() {
    return !(_sys_ks.local().bootstrap_complete() || is_first_node());
}
/* Broadcasts the chosen tokens through gossip,
 * together with a CDC generation timestamp and STATUS=NORMAL.
 *
 * `cdc_gen_id` may be disengaged (a null CDC generation id is gossiped then).
 *
 * Assumes that no other functions modify CDC_GENERATION_ID, TOKENS or STATUS
 * in the gossiper's local application state while this function runs.
 */
static future<> set_gossip_tokens(gms::gossiper& g,
        const std::unordered_set<dht::token>& tokens, std::optional<cdc::generation_id> cdc_gen_id) {
    // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
    return g.add_local_application_state(
        std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(tokens)),
        std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(cdc_gen_id)),
        std::pair(gms::application_state::STATUS, gms::versioned_value::normal(tokens))
    );
}
// Maps a raft-topology node_state onto the locator-level node state used
// by token_metadata. A `rebuilding` node is a full ring member, so it maps
// to `normal`. Reaching the end of the switch is an internal error.
static locator::node::state to_topology_node_state(node_state state) {
    switch (state) {
    case node_state::none: return locator::node::state::none;
    case node_state::bootstrapping: return locator::node::state::bootstrapping;
    case node_state::replacing: return locator::node::state::replacing;
    case node_state::normal: return locator::node::state::normal;
    case node_state::rebuilding: return locator::node::state::normal;
    case node_state::decommissioning: return locator::node::state::being_decommissioned;
    case node_state::removing: return locator::node::state::being_removed;
    case node_state::left: return locator::node::state::left;
    }
    on_internal_error(rtlogger, format("unhandled node state: {}", state));
}
// Propagates a raft-topology node's IP address into the IP-keyed local
// state: the system.peers table and, for nodes entering `normal`, the list
// of "joined" notifications to fire after the sync.
//
// `host_id_to_ip_map` is the mapping previously persisted in system.peers;
// it is used to detect that a node changed its IP so the stale row can be
// deleted. `nodes_to_notify` may be null; when set, newly-joined nodes are
// appended to it. Nodes in states other than normal/bootstrapping/replacing
// are ignored.
future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& host_id_to_ip_map, nodes_to_notify_after_sync* nodes_to_notify) {
    const auto& t = _topology_state_machine._topology;
    raft::server_id raft_id{id.uuid()};
    std::vector<future<>> sys_ks_futures;
    auto node = t.find(raft_id);
    if (!node) {
        // Unknown to the topology state machine — nothing to update.
        co_return;
    }
    const auto& rs = node->second;
    switch (rs.state) {
    case node_state::normal: {
        if (is_me(id)) {
            co_return;
        }
        // In replace-with-same-ip scenario the replaced node IP will be the same
        // as ours, we shouldn't put it into system.peers.

        // Some state that is used to fill in 'peers' table is still propagated over gossiper.
        // Populate the table with the state from the gossiper here since storage_service::on_change()
        // (which is called each time gossiper state changes) may have skipped it because the tokens
        // for the node were not in the 'normal' state yet
        auto info = get_peer_info_for_update(id);
        if (info) {
            // And then amend with the info from raft
            info->tokens = rs.ring.value().tokens;
            info->data_center = rs.datacenter;
            info->rack = rs.rack;
            info->release_version = rs.release_version;
            info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
            sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, *info));
        }
        if (nodes_to_notify) {
            nodes_to_notify->joined.emplace_back(ip, id);
        }
        // The node was previously persisted under a different IP: it changed
        // its address, so remove the stale system.peers row.
        if (const auto it = host_id_to_ip_map.find(id); it != host_id_to_ip_map.end() && it->second != ip) {
            utils::get_local_injector().inject("crash-before-prev-ip-removed", [] {
                slogger.info("crash-before-prev-ip-removed hit, killing the node");
                _exit(1);
            });
            auto old_ip = it->second;
            sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
        }
    }
    break;
    case node_state::bootstrapping:
        if (!is_me(ip)) {
            utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
                rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
                _exit(1);
            });
            // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
            sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
        }
        break;
    case node_state::replacing:
        // Save the mapping just like for bootstrap above, but only in the case of replace with different IP, so
        // that we don't have to delete the row of the node being replaced. For replace with the same IP, the
        // mapping is recovered on restart based on the state of the topology state machine and system.peers.
        if (!is_me(ip)) {
            auto replaced_id = std::get<replace_param>(t.req_param.at(raft_id)).replaced_id;
            if (const auto it = host_id_to_ip_map.find(locator::host_id(replaced_id.uuid())); it == host_id_to_ip_map.end() || it->second != ip) {
                sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
            }
        }
        break;
    default:
        break;
    }
    // All system-table updates above were started in parallel; wait for all
    // of them (failing if any failed).
    co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
}
// Collects the nodes that have permanently left the cluster (listed as
// left or ignored in the raft topology) and are no longer present in the
// locator topology of `tm`.
static std::unordered_set<locator::host_id> get_released_nodes(const service::topology& topology, const locator::token_metadata& tm) {
    std::unordered_set<locator::host_id> released;
    auto collect_if_gone = [&] (const auto& raft_id) {
        locator::host_id host{raft_id.uuid()};
        if (!tm.get_topology().has_node(host)) {
            released.insert(host);
        }
    };
    for (const auto& raft_id : topology.left_nodes) {
        collect_if_gone(raft_id);
    }
    for (const auto& raft_id : topology.ignored_nodes) {
        collect_if_gone(raft_id);
    }
    return released;
}
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
//
// `tmptr` is the fresh token metadata being built; this function fills it in
// from the topology state machine. `prev_normal` holds the nodes that were in
// the `normal` state before this reload (used to decide who newly joined);
// `prev_released`, when engaged, holds the previously-released nodes so only
// newly released ones are reported. Returns the notifications to fire after
// the new token metadata is published (see notify_nodes_after_sync()).
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal, std::optional<std::unordered_set<locator::host_id>> prev_released) {
    nodes_to_notify_after_sync nodes_to_notify;
    rtlogger.trace("Start sync_raft_topology_nodes");
    const auto& t = _topology_state_machine._topology;
    // Record a node's dc/rack, locator state and shard count in tmptr.
    auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
        tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
            to_topology_node_state(rs.state), rs.shard_count);
    };
    std::vector<future<>> sys_ks_futures;
    // Node in `left`: drop it from system.peers and gossiper state, and
    // optionally queue a "left" notification.
    auto process_left_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, bool notify) -> future<> {
        if (ip) {
            sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(*ip));
            co_await _gossiper.force_remove_endpoint(host_id, gms::null_permit_id);
            if (notify) {
                nodes_to_notify.left.push_back({*ip, host_id});
            }
        }
        if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
            update_topology(host_id, t.left_nodes_rs.at(id));
        }
    };
    // Node in `normal`: record its tokens in tmptr, and — for ourselves —
    // persist tokens and republish TOKENS/CDC_GENERATION_ID/STATUS in gossip.
    auto process_normal_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
        // NOTE(review): 8 placeholders but 9 trailing arguments — rs.cleanup
        // is silently dropped by fmt; confirm whether "cleanup={}" was meant
        // to be part of the format string.
        rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
            id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
        // Save tokens, not needed for raft topology management, but needed by legacy
        // Also ip -> id mapping is needed for address map recreation on reboot
        if (is_me(host_id)) {
            sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
            co_await _gossiper.add_local_application_state(
                std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
                std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
                std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
            );
        }
        update_topology(host_id, rs);
        co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
    };
    // Node in a transition state: handle each state's effect on the ring.
    auto process_transition_node = [&](raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
        rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
            id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
            seastar::value_of([&] () -> sstring {
                return rs.ring ? ::format("{}", rs.ring->tokens) : sstring("null");
            }));
        switch (rs.state) {
        case node_state::bootstrapping:
            if (rs.ring.has_value()) {
                update_topology(host_id, rs);
                if (_topology_state_machine._topology.normal_nodes.empty()) {
                    // This is the first node in the cluster. Insert the tokens as normal to the token ring early
                    // so we can perform writes to regular 'distributed' tables during the bootstrap procedure
                    // (such as the CDC generation write).
                    // It doesn't break anything to set the tokens to normal early in this single-node case.
                    co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
                } else {
                    tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
                    co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
                }
            }
            break;
        case node_state::decommissioning:
            [[fallthrough]];
        case node_state::removing:
            // A decommissioning or removing node loses its tokens when topology moves to left_token_ring.
            if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
                if (rs.state == node_state::removing && !_feature_service.removenode_with_left_token_ring) {
                    on_internal_error(
                        rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
                }
                break;
            }
            if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
                // no need for double writes anymore since op failed
                co_await process_normal_node(id, host_id, ip, rs);
                break;
            }
            // Still leaving: keep the tokens as normal, and mark the node as a
            // leaving endpoint so pending ranges are computed.
            update_topology(host_id, rs);
            co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
            tmptr->add_leaving_endpoint(host_id);
            co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip));
            break;
        case node_state::replacing: {
            if (!_topology_state_machine._topology.req_param.contains(id)) {
                on_internal_error(rtlogger, format("No request parameters for replacing node {}", id));
            }
            auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
            auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
            const auto replaced_host_id = locator::host_id(replaced_id.uuid());
            // Mark the node being replaced and pair it with its replacement.
            tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
            tmptr->add_replacing_endpoint(replaced_host_id, host_id);
            if (rs.ring.has_value()) {
                update_topology(host_id, rs);
                co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, existing_ip.value_or(gms::inet_address{}), id, ip));
            }
        }
        break;
        case node_state::rebuilding:
            // Rebuilding node is normal
            co_await process_normal_node(id, host_id, ip, rs);
            break;
        default:
            on_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id));
        }
    };
    sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
    auto id_to_ip_map = co_await _sys_ks.local().get_host_id_to_ip_map();
    for (const auto& id: t.left_nodes) {
        locator::host_id host_id{id.uuid()};
        auto ip = _address_map.find(host_id);
        // Notify about the departure only if the node is still present in
        // the persisted system.peers mapping (its removal was not yet seen).
        co_await process_left_node(id, host_id, ip, id_to_ip_map.find(host_id) != id_to_ip_map.end());
    }
    for (const auto& [id, rs]: t.normal_nodes) {
        locator::host_id host_id{id.uuid()};
        auto ip = _address_map.find(host_id);
        co_await process_normal_node(id, host_id, ip, rs);
        if (ip) {
            auto it = id_to_ip_map.find(host_id);
            // A "joined" notification is due when the node is new in `normal`,
            // is missing from the persisted mapping, or changed its IP.
            bool notify = it == id_to_ip_map.end() || it->second != ip || !prev_normal.contains(id);
            sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, notify ? &nodes_to_notify : nullptr));
        }
    }
    for (const auto& [id, rs]: t.transition_nodes) {
        locator::host_id host_id{id.uuid()};
        auto ip = _address_map.find(host_id);
        co_await process_transition_node(id, host_id, ip, rs);
        if (ip) {
            sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr));
        }
    }
    // Mark nodes excluded from tablet placement.
    for (auto id : t.excluded_tablet_nodes) {
        locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
        if (n) {
            n->set_excluded(true);
        }
    }
    // Mark nodes with a pending leave/remove request as draining.
    for (auto [node, req] : t.requests) {
        if (req == topology_request::leave || req == topology_request::remove) {
            locator::node* n = tmptr->get_topology().find_node(locator::host_id(node.uuid()));
            if (n) {
                n->set_draining(true);
            }
        }
    }
    // Report only nodes that became released by this reload (subtract the
    // previously-released set supplied by the caller).
    if (prev_released) {
        auto nodes_to_release = get_released_nodes(t, *tmptr);
        std::erase_if(nodes_to_release, [&] (const auto& host_id) { return prev_released->contains(host_id); });
        std::copy(nodes_to_release.begin(), nodes_to_release.end(), std::back_inserter(nodes_to_notify.released));
    }
    // Wait for all system-table updates started above.
    co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
    rtlogger.trace("End sync_raft_topology_nodes");
    co_return nodes_to_notify;
}
// Fires the notifications collected by sync_raft_topology_nodes(), in the
// same fixed order as before: released nodes first, then departed nodes,
// then joined nodes.
future<> storage_service::notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify) {
    for (const auto& host_id : nodes_to_notify.released) {
        co_await notify_released(host_id);
    }
    for (const auto& [ip, host_id] : nodes_to_notify.left) {
        co_await notify_left(ip, host_id);
    }
    for (const auto& [ip, host_id] : nodes_to_notify.joined) {
        co_await notify_joined(ip, host_id);
    }
}
future<> storage_service::topology_state_load(state_change_hint hint) {
#ifdef SEASTAR_DEBUG
static bool running = false;
SCYLLA_ASSERT(!running); // The function is not re-entrant
auto d = defer([] {
running = false;
});
running = true;
#endif
co_await utils::get_local_injector().inject("topology_state_load_error", [] {
return std::make_exception_ptr(std::runtime_error("topology_state_load_error"));
});
rtlogger.debug("reload raft topology state");
std::unordered_set<raft::server_id> prev_normal = _topology_state_machine._topology.normal_nodes | std::views::keys | std::ranges::to<std::unordered_set>();
std::optional<std::unordered_set<locator::host_id>> prev_released;
if (!_topology_state_machine._topology.is_empty()) {
prev_released = get_released_nodes(_topology_state_machine._topology, get_token_metadata());
}
std::unordered_set<locator::host_id> tablet_hosts = co_await replica::read_required_hosts(_qp);
// read topology state from disk and recreate token_metadata from it
_topology_state_machine._topology = co_await _sys_ks.local().load_topology_state(tablet_hosts);
_topology_state_machine.reload_count++;
auto& topology = _topology_state_machine._topology;
set_topology_change_kind(upgrade_state_to_topology_op_kind(topology.upgrade_state));
if (topology.upgrade_state != topology::upgrade_state_type::done) {
co_return;
}
if (_qp.auth_version < db::system_keyspace::auth_version_t::v2) {
// auth-v2 gets enabled when consistent topology changes are enabled
// (see topology::upgrade_state_type::done above) as we use the same migration procedure
co_await _qp.container().invoke_on_all([] (cql3::query_processor& qp) {
qp.auth_version = db::system_keyspace::auth_version_t::v2;
});
co_await auth_cache().load_all();
}
if (!_sl_controller.local().is_v2()) {
co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) {
sl_controller.upgrade_to_v2(_qp, _group0->client());
});
co_await _sl_controller.local().update_cache(qos::update_both_cache_levels::yes, qos::query_context::group0);
}
// the view_builder is migrated to v2 in view_builder::migrate_to_v2.
// it writes a v2 version mutation as topology_change, then we get here
// to update the service to start using the v2 table.
auto view_builder_version = co_await _sys_ks.local().get_view_builder_version();
switch (view_builder_version) {
case db::system_keyspace::view_builder_version_t::v1_5:
co_await _view_builder.invoke_on_all([] (db::view::view_builder& vb) -> future<> {
co_await vb.upgrade_to_v1_5();
});
break;
case db::system_keyspace::view_builder_version_t::v2:
co_await _view_builder.invoke_on_all([] (db::view::view_builder& vb) -> future<> {
co_await vb.upgrade_to_v2();
});
break;
default:
break;
}
co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) {
return fs.enable(topology.enabled_features | std::ranges::to<std::set<std::string_view>>());
});
// Update the legacy `enabled_features` key in `system.scylla_local`.
// It's OK to update it after enabling features because `system.topology` now
// is the source of truth about enabled features.
co_await _sys_ks.local().save_local_enabled_features(topology.enabled_features, false);
auto saved_tmpr = get_token_metadata_ptr();
{
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = _shared_token_metadata.make_token_metadata_ptr();
tmptr->invalidate_cached_rings();
tmptr->set_version(topology.version);
const auto read_new = std::invoke([](std::optional<topology::transition_state> state) {
using read_new_t = locator::token_metadata::read_new_t;
if (!state.has_value()) {
return read_new_t::no;
}
switch (*state) {
case topology::transition_state::lock:
[[fallthrough]];
case topology::transition_state::join_group0:
[[fallthrough]];
case topology::transition_state::tablet_migration:
[[fallthrough]];
case topology::transition_state::tablet_split_finalization:
[[fallthrough]];
case topology::transition_state::tablet_resize_finalization:
[[fallthrough]];
case topology::transition_state::commit_cdc_generation:
[[fallthrough]];
case topology::transition_state::tablet_draining:
[[fallthrough]];
case topology::transition_state::write_both_read_old:
[[fallthrough]];
case topology::transition_state::left_token_ring:
[[fallthrough]];
case topology::transition_state::truncate_table:
[[fallthrough]];
case topology::transition_state::snapshot_tables:
[[fallthrough]];
case topology::transition_state::rollback_to_normal:
return read_new_t::no;
case topology::transition_state::write_both_read_new:
return read_new_t::yes;
}
}, topology.tstate);
tmptr->set_read_new(read_new);
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal), std::move(prev_released));
std::optional<locator::tablet_metadata> tablets;
if (hint.tablets_hint) {
// We want to update the tablet metadata incrementally, so copy it
// from the current token metadata and update only the changed parts.
tablets = co_await get_token_metadata().tablets().copy();
co_await replica::update_tablet_metadata(_db.local(), _qp, *tablets, *hint.tablets_hint);
} else {
tablets = co_await replica::read_tablet_metadata(_qp);
}
tablets->set_balancing_enabled(topology.tablet_balancing_enabled);
tmptr->set_tablets(std::move(*tablets));
if (_feature_service.parallel_tablet_draining) {
for (auto&& [node, req]: topology.requests) {
if (req == topology_request::leave || req == topology_request::remove) {
if (tmptr->tablets().has_replica_on(locator::host_id(node.uuid()))) {
topology.paused_requests.emplace(node, req);
}
}
}
}
co_await replicate_to_all_cores(std::move(tmptr));
co_await notify_nodes_after_sync(std::move(nodes_to_notify));
rtlogger.debug("topology_state_load: token metadata replication to all cores finished");
}
co_await update_fence_version(topology.fence_version);
// As soon as a node joins token_metadata.topology we
// need to drop all its rpc connections with ignored_topology flag.
{
std::vector<future<>> futures;
get_token_metadata_ptr()->get_topology().for_each_node([&](const locator::node& n) {
const auto ep = n.host_id();
if (auto ip_opt = _address_map.find(ep); ip_opt && !saved_tmpr->get_topology().has_node(ep)) {
futures.push_back(remove_rpc_client_with_ignored_topology(*ip_opt, n.host_id()));
}
});
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
for (const auto& gen_id : topology.committed_cdc_generations) {
rtlogger.trace("topology_state_load: process committed cdc generation {}", gen_id);
co_await utils::get_local_injector().inject("topology_state_load_before_update_cdc", [](auto& handler) -> future<> {
rtlogger.info("topology_state_load_before_update_cdc hit, wait for message");
co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
});
co_await _cdc_gens.local().handle_cdc_generation(gen_id);
if (gen_id == topology.committed_cdc_generations.back()) {
co_await _sys_ks.local().update_cdc_generation_id(gen_id);
rtlogger.debug("topology_state_load: the last committed CDC generation ID: {}", gen_id);
}
}
// Ban all left and ignored nodes. We do not allow them to go back online.
co_await _messaging.local().ban_hosts(boost::join(topology.left_nodes, topology.ignored_nodes)
| std::views::transform([] (auto id) { return locator::host_id{id.uuid()}; })
| std::ranges::to<utils::chunked_vector<locator::host_id>>());
slogger.debug("topology_state_load: excluded tablet nodes: {}", topology.excluded_tablet_nodes);
}
// Reloads the in-memory topology state from the group0 tables and then wakes
// up every fiber waiting on the topology state machine event, so listeners
// observe the new state. Must run on shard 0, which owns the state machine.
future<> storage_service::topology_transition(state_change_hint hint) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    co_await topology_state_load(std::move(hint)); // reload new state
    _topology_state_machine.event.broadcast();
}
// Rebuilds the in-memory view building state machine contents from the
// system tables: the pending view building tasks, the base table currently
// being processed, the per-base-table list of views and the per-view build
// status map. Entries referring to dropped or vnode-based keyspaces are
// filtered out.
future<> storage_service::view_building_state_load() {
    rtlogger.debug("reload view building state");

    // A keyspace is relevant only if it still exists and uses tablets.
    // If the keyspace doesn't exist, filter the entry out as well - such
    // entries come from the vnode view path and haven't been cleaned up yet.
    // Tablet-view entries are removed in the same batch as the drop
    // keyspace/view mutations.
    auto is_live_tablet_keyspace = [this] (std::string_view ks_name) {
        return _db.local().has_keyspace(ks_name) && _db.local().find_keyspace(ks_name).uses_tablets();
    };

    auto tasks = co_await _sys_ks.local().get_view_building_tasks();
    auto processing_base = co_await _sys_ks.local().get_view_building_processing_base_id();

    // Group the relevant views by their base table id.
    std::map<table_id, std::vector<table_id>> base_to_views;
    auto all_views = _db.local().get_views();
    for (const view_ptr& v : all_views) {
        if (is_live_tablet_keyspace(v->ks_name())) {
            base_to_views[v->view_info()->base_id()].push_back(v->id());
        }
    }

    // Convert the (ks_name, view_name) keys of the status map to table_ids,
    // skipping entries of filtered-out keyspaces.
    db::view::views_state::view_build_status_map statuses;
    auto raw_statuses = co_await _sys_ks.local().get_view_build_status_map();
    for (auto& e : raw_statuses) {
        if (!is_live_tablet_keyspace(e.first.first)) {
            continue;
        }
        auto view_id = _db.local().find_schema(e.first.first, e.first.second)->id();
        statuses.emplace(view_id, std::move(e.second));
    }

    _view_building_state_machine.building_state = db::view::view_building_state{std::move(tasks), std::move(processing_base)};
    _view_building_state_machine.views_state = db::view::views_state{std::move(base_to_views), std::move(statuses)};
}
// Reloads the view building state and then wakes up every fiber waiting on
// the view building state machine event. Must run on shard 0, which owns the
// state machine.
future<> storage_service::view_building_transition() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    co_await view_building_state_load();
    _view_building_state_machine.event.broadcast();
}
// Reloads the Raft topology state while holding the group0 read/apply mutex,
// so the reload does not race with group0 command application.
future<> storage_service::reload_raft_topology_state(service::raft_group0_client& group0_client) {
    slogger.info("Waiting for group 0 read/apply mutex before reloading Raft topology state...");
    auto mutex_holder = co_await group0_client.hold_read_apply_mutex(_abort_source);
    slogger.info("Reloading Raft topology state");
    // topology_transition() (rather than topology_state_load()) is used on
    // purpose: it additionally broadcasts the state machine event so that
    // listeners are notified of the reloaded state.
    co_await topology_transition();
    slogger.info("Reloaded Raft topology state");
}
// Applies a Raft snapshot of the topology group0 tables to the local database.
//
// system.cdc_generations_v3 mutations may be arbitrarily large, so they are
// applied non-atomically, split into pieces that fit into the schema
// commitlog record size. This is safe because the CDC generation data is not
// used until it is referenced from the topology table, and the topology
// mutations are applied afterwards. The remaining mutations (system.topology
// and system.topology_requests) are applied atomically so the state is
// consistent after a restart.
future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
    // Partition so that non-CDC mutations come first; [it, end) holds the
    // cdc_generations_v3 mutations.
    auto it = std::partition(snp.mutations.begin(), snp.mutations.end(), [] (const canonical_mutation& m) {
        return m.column_family_id() != db::system_keyspace::cdc_generations_v3()->id();
    });

    if (it != snp.mutations.end()) {
        auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);

        // Split big mutations into smaller ones, prepare frozen_muts_to_apply
        utils::chunked_vector<frozen_mutation> frozen_muts_to_apply;
        {
            frozen_muts_to_apply.reserve(std::distance(it, snp.mutations.end()));
            const auto max_size = _db.local().schema_commitlog()->max_record_size() / 2;
            for (auto i = it; i != snp.mutations.end(); i++) {
                const auto& m = *i;
                auto mut = co_await to_mutation_gently(m, s);
                if (m.representation().size() <= max_size) {
                    frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
                } else {
                    // Freeze each split piece ('part'), not 'mut': 'mut' has
                    // just been moved into for_each_split_mutation, so using
                    // it here would freeze a moved-from mutation and lose the
                    // split data.
                    co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation part) -> future<> {
                        frozen_muts_to_apply.push_back(co_await freeze_gently(part));
                    });
                }
            }
        }

        // Apply non-atomically so as not to hit the commitlog size limit.
        // The cdc_generations_v3 data is not used in any way until
        // it's referenced from the topology table.
        // By applying the cdc_generations_v3 mutations before topology mutations
        // we ensure that the lack of atomicity isn't a problem here.
        co_await max_concurrent_for_each(frozen_muts_to_apply, 128, [&] (const frozen_mutation& m) -> future<> {
            return _db.local().apply(s, m, {}, db::commitlog::force_sync::yes, db::no_timeout);
        });
    }

    // Apply system.topology and system.topology_requests mutations atomically
    // to have a consistent state after restart
    utils::chunked_vector<frozen_mutation> muts;
    muts.reserve(std::distance(snp.mutations.begin(), it));
    for (auto cur = snp.mutations.begin(); cur != it; ++cur) {
        const auto& m = *cur;
        auto s = _db.local().find_schema(m.column_family_id());
        // FIXME: in theory, we can generate a frozen_mutation
        // directly from canonical_mutation rather than building
        // a mutation and then freezing it.
        muts.emplace_back(freeze(m.to_mutation(s)));
        co_await coroutine::maybe_yield();
    }
    co_await _db.local().apply(muts, db::no_timeout);
}
// Refreshes the service level controller's cache. A no-op until service
// levels have been switched to v2 (i.e. until the topology upgrade is done) -
// the cache is only maintained in v2 mode.
future<> storage_service::update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache, qos::query_context ctx) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (!_sl_controller.local().is_v2()) {
        // Skip the cache update while still in v1 mode.
        co_return;
    }
    co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx);
}
// Invokes the compression-dictionary-updated callback for every dictionary
// name recorded in the system keyspace, one dictionary at a time.
future<> storage_service::compression_dictionary_updated_callback_all() {
    const auto dict_names = co_await _sys_ks.local().query_all_dict_names();
    for (const auto& dict_name : dict_names) {
        co_await _compression_dictionary_updated_callback(dict_name);
    }
}
// Invokes the compression-dictionary-updated callback for a single
// dictionary. Must run on shard 0.
future<> storage_service::compression_dictionary_updated_callback(std::string_view name) {
    // Use the project's SCYLLA_ASSERT for consistency with the other shard-0
    // checks in this file (plain assert is compiled out in release builds).
    SCYLLA_ASSERT(this_shard_id() == 0);
    return _compression_dictionary_updated_callback(name);
}
// Reloads CDC tablet stream state via the local CDC generation service.
// `changed_tables`, when engaged, restricts the reload to the given tables;
// the exact semantics of a disengaged optional (presumably a full reload) are
// defined by load_cdc_tablet_streams - confirm there.
future<> storage_service::load_cdc_streams(std::optional<std::unordered_set<table_id>> changed_tables) {
    co_await _cdc_gens.local().load_cdc_tablet_streams(std::move(changed_tables));
}
// Moves a coroutine lambda onto the heap and keeps it alive until the future
// it produced resolves.
// Without this helper the coroutine lambda is destroyed as soon as the caller
// (e.g. a 'then' implementation) has invoked it and obtained the future, so
// any capture referenced after a co_await would be a use-after-free. Owning
// the closure from the finally() continuation of the returned future avoids
// that.
template <typename Coro>
static auto ensure_alive(Coro&& coro) {
    return [owned = std::make_unique<Coro>(std::move(coro))] (auto&&... args) mutable {
        auto& fn = *owned;
        // The call is sequenced before the init-capture that moves 'owned'
        // into the continuation, so 'fn' is still valid when invoked.
        return fn(std::forward<decltype(args)>(args)...).finally([owned = std::move(owned)] {});
    };
}
// {{{ ip_address_updater
// Gossiper event subscriber that propagates observed IP address changes for a
// host_id into the Raft-topology-derived state (system.peers, token metadata,
// effective replication maps). Subscribed to on_join/on_alive/on_restart
// gossip events; all three funnel into on_endpoint_change().
class storage_service::ip_address_updater: public gms::i_endpoint_state_change_subscriber {
    gms::gossip_address_map& _address_map;
    storage_service& _ss;
    // Common handler for all subscribed gossip events. 'ev' is the event name
    // and is used for logging only.
    future<>
    on_endpoint_change(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id, const char* ev) {
        rslog.debug("ip_address_updater::on_endpoint_change({}) {} {}", ev, endpoint, id);
        // If id maps to different ip in peers table it needs to be updated which is done by sync_raft_topology_nodes below
        std::optional<gms::inet_address> prev_ip = co_await _ss._sys_ks.local().get_ip_from_peers_table(id);
        if (_address_map.find(id) != endpoint) {
            // Address map refused to update IP for the host_id,
            // this means prev_ip has higher generation than endpoint.
            // Do not update address.
            co_return;
        }
        // If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
        rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
                    "old ip [{}], new ip [{}], "
                    "waiting for group 0 read/apply mutex before reloading Raft topology state...",
                    ev, id, prev_ip, endpoint);
        // We're in a gossiper event handler, so gossiper is currently holding a lock
        // for the endpoint parameter of on_endpoint_change.
        // The topology_state_load function can also try to acquire gossiper locks.
        // If we call sync_raft_topology_nodes here directly, a gossiper lock and
        // the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock.
        // To avoid this, we don't wait for sync_raft_topology_nodes to finish.
        // (ensure_alive keeps the detached coroutine's captures alive; the gate
        // holder keeps storage_service from stopping under it.)
        (void)futurize_invoke(ensure_alive([this, id, prev_ip, endpoint, h = _ss._async_gate.hold()]() -> future<> {
            auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
            co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
            // We need to call raft_topology_update_ip even if ip hasn't changed.
            // Suppose a bootstrapping node A appears in the system.peers table of
            // some other node B. Its record has only ID and IP of the node A, due to
            // the special handling of bootstrapping nodes in raft_topology_update_ip.
            // Suppose node B gets temporarily isolated from the topology coordinator.
            // The topology coordinator fences out node B and successfully finishes
            // bootstrapping of the node A. Later, when the connectivity is restored,
            // topology_state_load runs on the node B, node A is already in
            // normal state, but the gossiper on B might not yet have any state for
            // it. In this case, raft_topology_update_ip would not update
            // system.peers because the gossiper state is missing. Subsequently,
            // on_join/on_restart/on_alive events would skip updates because the IP
            // in gossiper matches the IP for that node in system.peers.
            //
            // If ip hasn't changed we set nodes_to_notify to nullptr since
            // we don't need join events in this case.
            nodes_to_notify_after_sync nodes_to_notify;
            co_await _ss.raft_topology_update_ip(id, endpoint,
                                                 co_await _ss._sys_ks.local().get_host_id_to_ip_map(),
                                                 prev_ip == endpoint ? nullptr : &nodes_to_notify);
            co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
        }));
    }
public:
    ip_address_updater(gms::gossip_address_map& address_map, storage_service& ss)
        : _address_map(address_map)
        , _ss(ss)
    {}
    virtual future<>
    on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
        return on_endpoint_change(endpoint, id, ep_state, permit_id, "on_join");
    }
    virtual future<>
    on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
        return on_endpoint_change(endpoint, id, ep_state, permit_id, "on_alive");
    }
    virtual future<>
    on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
        return on_endpoint_change(endpoint, id, ep_state, permit_id, "on_restart");
    }
};
// }}} ip_address_updater
// Background fiber that performs SSTable cleanup of vnode-based keyspaces
// whenever the topology coordinator sets this node's cleanup_status to
// 'running'. After a successful cleanup the fiber commits a group0 entry
// resetting the flag back to 'clean'. Runs until _group0_as is aborted;
// unexpected errors are retried after a short back-off.
future<> storage_service::sstable_vnodes_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
    while (!_group0_as.abort_requested()) {
        bool err = false;
        try {
            // Sleep until the coordinator asks this node to run cleanup.
            co_await _topology_state_machine.event.when([&] {
                auto me = _topology_state_machine._topology.find(server.id());
                return me && me->second.cleanup == cleanup_status::running;
            });
            std::unordered_map<sstring, std::vector<table_info>> ks_tables;
            {
                // The scope for the guard
                auto guard = co_await _group0->client().start_operation(_group0_as);
                auto me = _topology_state_machine._topology.find(server.id());
                // Recheck that cleanup is needed after the barrier
                if (!me || me->second.cleanup != cleanup_status::running) {
                    rtlogger.trace("vnodes_cleanup triggered, but not needed");
                    continue;
                }
                rtlogger.info("start vnodes_cleanup");
                // Skip tablets tables since they do their own cleanup and system tables
                // since they are local and not affected by range movements.
                auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
                ks_tables.reserve(ks_erms.size());
                for (auto&& [ks_name, erm] : ks_erms) {
                    auto& ks = _db.local().find_keyspace(ks_name);
                    const auto& cf_meta_data = ks.metadata().get()->cf_meta_data();
                    std::vector<table_info> table_infos;
                    table_infos.reserve(cf_meta_data.size());
                    for (const auto& [name, schema] : cf_meta_data) {
                        table_infos.emplace_back(table_info{name, schema->id()});
                    }
                    ks_tables.emplace(std::move(ks_name), std::move(table_infos));
                };
            }
            {
                // Quiesce in-flight topology sessions and stale writes, then
                // flush, so the cleanup compaction below sees all the data.
                rtlogger.info("vnodes_cleanup: drain closing sessions");
                co_await proxy.invoke_on_all([] (storage_proxy& sp) {
                    return get_topology_session_manager().drain_closing_sessions();
                });
                rtlogger.info("vnodes_cleanup: wait for stale pending writes");
                co_await proxy.invoke_on_all([] (storage_proxy& sp) {
                    return sp.await_stale_pending_writes();
                });
                rtlogger.info("vnodes_cleanup: flush_all_tables");
                co_await _db.invoke_on_all([&] (replica::database& db) {
                    return db.flush_all_tables();
                });
                // Run a cleanup compaction task per keyspace, in parallel.
                co_await coroutine::parallel_for_each(ks_tables, [&](auto& item) -> future<> {
                    auto& [ks_name, table_infos] = item;
                    auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module();
                    // we flush all tables before cleanup the keyspaces individually, so skip the flush-tables step here
                    auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
                        {}, ks_name, _db, table_infos, compaction::flush_mode::skip, tasks::is_user_task::no);
                    try {
                        rtlogger.info("vnodes_cleanup {} started", ks_name);
                        co_await task->done();
                        rtlogger.info("vnodes_cleanup {} finished", ks_name);
                    } catch (...) {
                        rtlogger.error("vnodes_cleanup failed keyspace={} tables={} failed: {}", task->get_status().keyspace, table_infos, std::current_exception());
                        throw;
                    }
                });
            }
            rtlogger.info("vnodes_cleanup ended");
            // Commit a group0 entry clearing the cleanup flag, retrying on
            // concurrent group0 modifications.
            while (true) {
                auto guard = co_await _group0->client().start_operation(_group0_as);
                topology_mutation_builder builder(guard.write_timestamp());
                builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
                topology_change change{{builder.build()}};
                group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup completed for {}", server.id()));
                try {
                    co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
                } catch (group0_concurrent_modification&) {
                    rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
                    continue;
                }
                break;
            }
            rtlogger.debug("vnodes_cleanup: cleanup flag cleared");
        } catch (const seastar::abort_requested_exception&) {
            rtlogger.info("vnodes_cleanup fiber aborted");
            break;
        } catch (raft::request_aborted&) {
            rtlogger.info("vnodes_cleanup fiber aborted");
            break;
        } catch (const seastar::broken_condition_variable&) {
            rtlogger.info("vnodes_cleanup fiber aborted");
            break;
        } catch (...) {
            rtlogger.error("vnodes_cleanup fiber got an error: {}", std::current_exception());
            err = true;
        }
        if (err) {
            // Back off briefly after an unexpected error before retrying.
            co_await sleep_abortable(std::chrono::seconds(1), _group0_as);
        }
    }
}
// Fiber that tracks this node's group0 Raft leadership. When the node becomes
// the leader it starts the topology change coordinator in the background;
// when leadership is lost the coordinator is aborted and its future awaited
// before going back to waiting for the next leadership change.
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder) {
    // Engaged exactly while a coordinator instance is running; used to abort it.
    std::optional<abort_source> as;
    try {
        while (!_group0_as.abort_requested()) {
            // Wait for a state change in case we are not a leader yet, or we are the leader
            // and coordinator work is running (in which case 'as' is engaged)
            while (!raft.is_leader() || as) {
                co_await raft.wait_for_state_change(&_group0_as);
                if (as) {
                    as->request_abort(); // we are no longer a leader, so abort the coordinator
                    co_await std::exchange(_topology_change_coordinator, make_ready_future<>());
                    as = std::nullopt;
                    try {
                        _tablet_allocator.local().on_leadership_lost();
                    } catch (...) {
                        rtlogger.error("tablet_allocator::on_leadership_lost() failed: {}", std::current_exception());
                    }
                }
            }
            // We are the leader now but that can change any time!
            as.emplace();
            // start topology change coordinator in the background
            _topology_change_coordinator = run_topology_coordinator(
                _sys_dist_ks, _gossiper, _messaging.local(), _shared_token_metadata,
                _sys_ks.local(), _db.local(), *_group0, _topology_state_machine, _view_building_state_machine, *as, raft,
                std::bind_front(&storage_service::raft_topology_cmd_handler, this),
                _tablet_allocator.local(),
                _cdc_gens.local(),
                get_ring_delay(),
                _lifecycle_notifier,
                _feature_service,
                _sl_controller.local(),
                _topology_cmd_rpc_tracker);
        }
    } catch (...) {
        rtlogger.info("raft_state_monitor_fiber aborted with {}", std::current_exception());
    }
    if (as) {
        as->request_abort(); // abort current coordinator if running
        co_await std::move(_topology_change_coordinator);
    }
}
// Resolves a list of host-id-or-IP entries into raft server ids, verifying
// that every resolved node is a member of the topology.
// Throws std::runtime_error if an IP cannot be resolved through the address
// map or if a resolved node is not part of the cluster.
std::unordered_set<raft::server_id> storage_service::find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const {
    std::unordered_set<raft::server_id> result;
    for (const auto& entry : hoeps) {
        const auto id = std::invoke([&]() -> raft::server_id {
            if (entry.has_host_id()) {
                return raft::server_id{entry.id().uuid()};
            }
            // Only an IP address was supplied - translate it via the address map.
            const auto hid = _address_map.find_by_addr(entry.endpoint());
            if (!hid) {
                throw std::runtime_error(::format("Cannot find a mapping to IP {}", entry.endpoint()));
            }
            return raft::server_id{hid->uuid()};
        });
        if (!_topology_state_machine._topology.contains(id)) {
            throw std::runtime_error(::format("Node {} is not found in the cluster", id));
        }
        result.insert(id);
    }
    return result;
}
std::unordered_set<raft::server_id> storage_service::ignored_nodes_from_join_params(const join_node_request_params& params) {
const locator::host_id_or_endpoint_list ignore_nodes_params = string_list_to_endpoint_list(params.ignore_nodes);
std::unordered_set<raft::server_id> ignored_nodes{find_raft_nodes_from_hoeps(ignore_nodes_params)};
if (params.replaced_id) {
// insert node that should be replaced to ignore list so that other topology operations
// can ignore it
ignored_nodes.insert(*params.replaced_id);
}
return ignored_nodes;
}
// Builds the group0 mutations that register a join (or replace) request for
// the node described by 'params': a topology-table entry for the new node and
// a request-tracking entry. For a replace, if the replaced node still has a
// pending request ('old_request_id' is non-null), two extra mutations mark
// that old request as done.
utils::chunked_vector<canonical_mutation> storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp, utils::UUID old_request_id) {
    topology_mutation_builder topo_builder(write_timestamp);
    auto ignored = ignored_nodes_from_join_params(params);
    if (!ignored.empty()) {
        // Every ignored node must currently be in 'normal' state.
        for (const auto& n : ignored) {
            if (!_topology_state_machine._topology.normal_nodes.contains(n)) {
                throw std::runtime_error(::format("replace: there is no node with id {} in normal state. Cannot ignore it.", n));
            }
        }
        topo_builder.add_ignored_nodes(std::move(ignored));
    }

    const bool is_replace = params.replaced_id.has_value();
    auto& node_builder = topo_builder.with_node(params.host_id)
            .set("node_state", node_state::none)
            .set("datacenter", params.datacenter)
            .set("rack", params.rack)
            .set("release_version", params.release_version)
            .set("num_tokens", params.num_tokens)
            .set("tokens_string", params.tokens_string)
            .set("shard_count", params.shard_count)
            .set("ignore_msb", params.ignore_msb)
            .set("cleanup_status", cleanup_status::clean)
            .set("supported_features", params.supported_features | std::ranges::to<std::set<sstring>>());
    if (is_replace) {
        node_builder
            .set("topology_request", topology_request::replace)
            .set("replaced_id", *params.replaced_id);
    } else {
        node_builder
            .set("topology_request", topology_request::join);
    }
    node_builder.set("request_id", params.request_id);

    topology_request_tracking_mutation_builder track_builder(params.request_id, _feature_service.topology_requests_type_column);
    track_builder.set("initiating_host", params.host_id.uuid())
                 .set("done", false);
    track_builder.set("request_type", is_replace ? topology_request::replace : topology_request::join);

    utils::chunked_vector<canonical_mutation> muts = {topo_builder.build(), track_builder.build()};
    if (old_request_id) {
        // Replace flow: mark the replaced node's still-pending request as done.
        // This is safe here: if the request is still pending the topology
        // coordinator hasn't started working on it, and if the coordinator
        // picks it up concurrently, the write of this new state will fail.
        topology_mutation_builder old_builder(write_timestamp);
        old_builder.with_node(*params.replaced_id).del("topology_request");
        topology_request_tracking_mutation_builder old_track(old_request_id, _feature_service.topology_requests_type_column);
        old_track.done("node was replaced before the request could start");
        muts.push_back(old_builder.build());
        muts.push_back(old_track.build());
    }
    return muts;
}
// group0_handshaker used when this node joins an existing cluster through the
// raft-topology join protocol.
// pre_server_start sends the join request RPC to the group0 contact node
// before the local raft server starts; post_server_start then waits until the
// topology coordinator's join_node_response has been processed locally.
class join_node_rpc_handshaker : public service::group0_handshaker {
private:
    service::storage_service& _ss;
    const join_node_request_params& _req;
public:
    join_node_rpc_handshaker(service::storage_service& ss, const join_node_request_params& req)
        : _ss(ss)
        , _req(req)
    {}
    // Sends the join request and signals _join_node_request_done on success;
    // throws if the topology coordinator rejects the request.
    future<> pre_server_start(const group0_info& g0_info) override {
        rtlogger.info("join: sending the join request to {}", g0_info.ip_addr);
        co_await utils::get_local_injector().inject("crash_before_group0_join", [](auto& handler) -> future<> {
            // This wait ensures that node gossips its state before crashing.
            co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
            throw std::runtime_error("deliberately crashed for orphan remover test");
        });
        auto result = co_await ser::join_node_rpc_verbs::send_join_node_request(
            &_ss._messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, _req);
        std::visit(overloaded_functor {
            [this] (const join_node_request_result::ok&) {
                rtlogger.info("join: request to join placed, waiting"
                              " for the response from the topology coordinator");
                if (utils::get_local_injector().enter("pre_server_start_drop_expiring")) {
                    _ss._gossiper.get_mutable_address_map().force_drop_expiring_entries();
                }
                _ss._join_node_request_done.set_value();
            },
            [] (const join_node_request_result::rejected& rej) {
                throw std::runtime_error(
                    format("the topology coordinator rejected request to join the cluster: {}", rej.reason));
            },
        }, result.result);
        co_return;
    }
    future<bool> post_server_start(const group0_info& g0_info, abort_source& as) override {
        // Group 0 has been started. Allow the join_node_response to be handled.
        _ss._join_node_group0_started.set_value();
        // Processing of the response is done in `join_node_response_handler`.
        // Wait for it to complete. If the topology coordinator fails to
        // deliver the rejection, it won't complete. In such a case, the
        // operator is responsible for shutting down the joining node.
        co_await _ss._join_node_response_done.get_shared_future(as);
        rtlogger.info("join: success");
        co_return true;
    }
};
// Returns true if a keyspace-RF-change topology request targeting keyspace
// 'ks' is currently active, paused, or queued.
future<bool> storage_service::ongoing_rf_change(const group0_guard& guard, sstring ks) const {
    // True iff the given request id is a keyspace-RF-change request for 'ks'.
    auto targets_ks = [&] (utils::UUID request_id) -> future<bool> {
        auto entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
        if (!std::holds_alternative<global_topology_request>(entry.request_type)) {
            co_return false;
        }
        if (std::get<global_topology_request>(entry.request_type) != global_topology_request::keyspace_rf_change) {
            co_return false;
        }
        co_return entry.new_keyspace_rf_change_ks_name.has_value() && entry.new_keyspace_rf_change_ks_name.value() == ks;
    };
    const auto& topo = _topology_state_machine._topology;
    // Check the request being executed right now...
    if (topo.global_request_id.has_value() && co_await targets_ks(topo.global_request_id.value())) {
        co_return true;
    }
    // ...then the paused RF-change requests...
    for (auto request_id : topo.paused_rf_change_requests) {
        if (co_await targets_ks(request_id)) {
            co_return true;
        }
    }
    // ...and finally the queued global requests.
    for (auto request_id : topo.global_requests_queue) {
        if (co_await targets_ks(request_id)) {
            co_return true;
        }
    }
    co_return false;
}
// Bootstraps the topology state when this node is the discovery leader - the
// very first node of a brand-new cluster. Writes the initial topology entry
// for this node plus the cluster-defining state (enabled features, service
// level / auth / view-builder versions) directly to the local database.
future<> storage_service::raft_initialize_discovery_leader(const join_node_request_params& params) {
    // The first node cannot replace anyone and must own tokens.
    if (params.replaced_id.has_value()) {
        throw std::runtime_error(::format("Cannot perform a replace operation because this is the first node in the cluster"));
    }
    if (params.num_tokens == 0 && params.tokens_string.empty()) {
        throw std::runtime_error("Cannot start the first node in the cluster as zero-token");
    }

    const auto state_id = raft_group0_client::generate_group0_state_id(utils::UUID{});
    const auto ts = utils::UUID_gen::micros_timestamp(state_id);

    rtlogger.info("adding myself as the first node to the topology");
    auto muts = build_mutation_from_join_params(params, ts);

    // The first node defines the cluster: record our feature set as the
    // cluster-wide enabled features.
    topology_mutation_builder builder(ts);
    builder.add_enabled_features(params.supported_features | std::ranges::to<std::set<sstring>>())
           .set_upgrade_state(topology::upgrade_state_type::done); // Skip upgrade, start right in the topology-on-raft mode
    muts.push_back(builder.build());

    muts.emplace_back(co_await _sys_ks.local().make_service_levels_version_mutation(2, ts));
    muts.emplace_back(co_await _sys_ks.local().make_auth_version_mutation(ts, db::system_keyspace::auth_version_t::v2));
    auto sl_driver_muts = co_await qos::service_level_controller::get_create_driver_service_level_mutations(_sys_ks.local(), ts);
    for (auto& m : sl_driver_muts) {
        muts.emplace_back(m);
    }
    if (!utils::get_local_injector().is_enabled("skip_vb_v2_version_mut")) {
        muts.emplace_back(
            co_await _sys_ks.local().make_view_builder_version_mutation(ts, db::system_keyspace::view_builder_version_t::v2));
    }

    topology_change change{std::move(muts)};
    auto history_append = db::system_keyspace::make_group0_history_state_id_mutation(state_id,
        _migration_manager.local().get_group0_client().get_history_gc_duration(), "bootstrap: adding myself as the first node to the topology");
    auto my_addr = _sys_ks.local().local_db().get_token_metadata().get_topology().my_address();
    co_await write_mutations_to_database(*this, _qp.proxy(), my_addr, std::move(change.mutations));
    co_await _qp.proxy().mutate_locally({history_append}, nullptr);
}
// Persists upgrade_state='done' into the local topology table.
// Note: despite the UPDATE syntax this is an upsert - the row is created if
// it does not exist yet.
future<> storage_service::initialize_done_topology_upgrade_state() {
    const sstring update_query = format("UPDATE {}.{} SET upgrade_state='done' WHERE key='topology'",
            db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
    co_await _qp.execute_internal(update_query, {}, cql3::query_processor::cache_internal::no).discard_result();
}
// Makes sure this node's metadata recorded in the topology table (shard
// count, ignore_msb bits, release version, supported features) matches the
// current local values, committing an update through group0 when it does not.
// Uses the persisted 'must_synchronize_topology' flag to stay correct across
// crashes mid-update (see the comment on the flag below).
future<> storage_service::update_topology_with_local_metadata(raft::server& raft_server) {
    // TODO: include more metadata here
    auto local_shard_count = smp::count;
    auto local_ignore_msb = _db.local().get_config().murmur3_partitioner_ignore_msb_bits();
    auto local_release_version = version::release();
    auto local_supported_features = _feature_service.supported_feature_set() | std::ranges::to<std::set<sstring>>();
    // True when the topology table's record for this node matches the local
    // values above. Throws if this node is no longer in the topology.
    auto synchronized = [&] () {
        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error{"Removed from topology while performing metadata update"};
        }
        auto& replica_state = it->second;
        return replica_state.shard_count == local_shard_count
            && replica_state.ignore_msb == local_ignore_msb
            && replica_state.release_version == local_release_version
            && replica_state.supported_features == local_supported_features;
    };
    // We avoid performing a read barrier if we're sure that our metadata stored in topology
    // is the same as local metadata. Note that only we can update our metadata, other nodes cannot.
    //
    // We use a persisted flag `must_update_topology` to avoid the following scenario:
    // 1. the node restarts and its metadata changes
    // 2. the node commits the new metadata to topology, but before the update is applied
    //    to the local state machine, the node crashes
    // 3. then the metadata changes back to old values and node restarts again
    // 4. the local state machine tells us that we're in sync, which is wrong
    // If the persisted flag is true, it tells us that we attempted a metadata change earlier,
    // forcing us to perform a read barrier even when the local state machine tells us we're in sync.
    if (synchronized() && !(co_await _sys_ks.local().get_must_synchronize_topology())) {
        co_return;
    }
    // Retry loop: each iteration performs a group0 read barrier (via
    // start_operation) and, if still out of sync, commits the update;
    // group0_concurrent_modification restarts the iteration.
    while (true) {
        rtlogger.info("refreshing topology to check if it's synchronized with local metadata");
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        if (synchronized()) {
            break;
        }
        // It might happen that, in the previous run, the node commits a command
        // that adds support for a feature, crashes before applying it and now
        // it is not safe to disable support for it. If there is an attempt to
        // downgrade the node then `enable_features_on_startup` called much
        // earlier won't catch it, we only can do it here after performing
        // a read barrier - so we repeat it here.
        //
        // Fortunately, there is no risk that this feature was marked as enabled
        // because it requires that the current node responded to a barrier
        // request - which will fail in this situation.
        const auto& enabled_features = _topology_state_machine._topology.enabled_features;
        const auto unsafe_to_disable_features = _topology_state_machine._topology.calculate_not_yet_enabled_features();
        _feature_service.check_features(enabled_features, unsafe_to_disable_features);
        rtlogger.info("updating topology with local metadata");
        // Set the persisted flag before committing, so a crash between the
        // commit and its local application still forces a re-check next boot.
        co_await _sys_ks.local().set_must_synchronize_topology(true);
        topology_mutation_builder builder(guard.write_timestamp());
        builder.with_node(raft_server.id())
               .set("shard_count", local_shard_count)
               .set("ignore_msb", local_ignore_msb)
               .set("release_version", local_release_version)
               .set("supported_features", local_supported_features);
        topology_change change{{builder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(
            std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id()));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("update topology with local metadata:"
                          " concurrent operation is detected, retrying.");
        }
    }
    co_await _sys_ks.local().set_must_synchronize_topology(false);
}
// Returns the raft-topology upgrade state from the in-memory topology state
// machine. Only valid on shard 0, where the state machine lives.
topology::upgrade_state_type storage_service::get_topology_upgrade_state() const {
    SCYLLA_ASSERT(this_shard_id() == 0);
    const auto& topo = _topology_state_machine._topology;
    return topo.upgrade_state;
}
// Blocks until no tablet replica is hosted on the node we replaced, i.e. all
// of its tablet replicas have been rebuilt elsewhere. Woken up on topology
// state machine transitions; the predicate is re-evaluated each time.
future<> storage_service::await_tablets_rebuilt(raft::server_id replaced_id) {
    const auto replaced_host = locator::host_id(replaced_id.uuid());
    auto drained = [this, replaced_host] {
        return !get_token_metadata().tablets().has_replica_on(replaced_host);
    };
    if (!drained()) {
        slogger.info("Waiting for tablet replicas from the replaced node to be rebuilt");
        co_await _topology_state_machine.event.when(drained);
    }
    slogger.info("Tablet replicas from the replaced node have been rebuilt");
}
// Starts the system_distributed_keyspace service on every shard.
future<> storage_service::start_sys_dist_ks() const {
    slogger.info("starting system distributed keyspace shards");
    co_await _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
}
// Joins this node to the cluster via the raft-based topology coordinator.
// Handles three startup flavours: replacing another node, bootstrapping a
// brand-new node, and restarting an already-bootstrapped node. On success the
// node is in NORMAL mode with gossip, group 0 and the topology fibers running.
//
// Parameters:
//   proxy                 - storage proxy shards (hints manager, schema version).
//   initial_contact_nodes - nodes contacted for the gossip shadow round / group 0 join.
//   loaded_endpoints      - endpoint states loaded from system tables.
//   loaded_peer_features  - persisted feature sets of known peers (replace/boot checks).
//   delay                 - not referenced in this function body.
//   start_hm              - whether to start the hint managers after group 0 setup.
//   new_generation        - gossip generation to start gossiping with.
future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints,
std::unordered_map<locator::host_id, sstring> loaded_peer_features,
std::chrono::milliseconds delay,
start_hint_manager start_hm,
gms::generation_type new_generation) {
std::unordered_set<token> bootstrap_tokens;
gms::application_state_map app_states;
/* The timestamp of the CDC streams generation that this node has proposed when joining.
* This value is nullopt only when:
* 1. this node is being upgraded from a non-CDC version,
* 2. this node is starting for the first time or restarting with CDC previously disabled,
* in which case the value should become populated before we leave the join_topology procedure.
*
* Important: this variable is using only during the startup procedure. It is moved out from
* at the end of `join_topology`; the responsibility handling of CDC generations is passed
* to cdc::generation_service.
*
* DO NOT use this variable after `join_topology` (i.e. after we call `generation_service::after_join`
* and pass it the ownership of the timestamp.
*/
std::optional<cdc::generation_id> cdc_gen_id;
std::optional<replacement_info> ri;
std::optional<gms::inet_address> replace_address;
std::optional<raft_group0::replace_info> raft_replace_info;
// Hold the token metadata lock while mutating tmptr; released right after
// replicate_to_all_cores below.
auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
auto tmptr = co_await get_mutable_token_metadata_ptr();
// Determine how we join: replacing a dead node, bootstrapping fresh, or
// restarting an existing member.
if (is_replacing()) {
if (_sys_ks.local().bootstrap_complete()) {
throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
}
ri = co_await prepare_replacement_info(initial_contact_nodes, loaded_peer_features);
// A replacement must keep the replaced node's DC/rack.
const auto& my_location = tmptr->get_topology().get_location();
if (my_location != ri->dc_rack) {
auto msg = fmt::format("Cannot replace node {}/{} with a node on a different data center or rack. Current location={}/{}, new location={}/{}",
ri->host_id, ri->address, ri->dc_rack.dc, ri->dc_rack.rack, my_location.dc, my_location.rack);
slogger.error("{}", msg);
throw std::runtime_error(msg);
}
replace_address = ri->address;
raft_replace_info = raft_group0::replace_info {
.raft_id = raft::server_id{ri->host_id.uuid()},
};
} else if (should_bootstrap()) {
co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features);
} else {
// Restart path: run a gossip shadow round to learn cluster state and
// validate that this node is still a legitimate member.
auto local_features = _feature_service.supported_feature_set();
slogger.info("Performing gossip shadow round, initial_contact_nodes={}", initial_contact_nodes);
co_await _gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::no);
_gossiper.check_snitch_name_matches(_snitch.local()->get_name());
// Check if the node is already removed from the cluster
auto local_host_id = get_token_metadata().get_my_id();
auto my_ip = get_broadcast_address();
if (!_gossiper.is_safe_for_restart(local_host_id)) {
throw std::runtime_error(::format("The node {} with host_id {} is removed from the cluster. Can not restart the removed node to join the cluster again!",
my_ip, local_host_id));
}
co_await _gossiper.reset_endpoint_state_map();
for (const auto& [host_id, st] : loaded_endpoints) {
// gossiping hasn't started yet
// so no need to lock the endpoint
co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
}
}
auto features = _feature_service.supported_feature_set();
slogger.info("Save advertised features list in the 'system.{}' table", db::system_keyspace::LOCAL);
// Save the advertised feature set to system.local table after
// all remote feature checks are complete and after gossip shadow rounds are done.
// At this point, the final feature set is already determined before the node joins the ring.
co_await _sys_ks.local().save_local_supported_features(features);
// If this is a restarting node, we should update tokens before gossip starts
auto my_tokens = co_await _sys_ks.local().get_saved_tokens();
bool restarting_normal_node = _sys_ks.local().bootstrap_complete() && !is_replacing();
if (restarting_normal_node) {
// A node cannot flip between zero-token and token-owning across restarts.
if (my_tokens.empty() && _db.local().get_config().join_ring()) {
throw std::runtime_error("Cannot restart with join_ring=true because the node has already joined the cluster as a zero-token node");
}
if (!my_tokens.empty() && !_db.local().get_config().join_ring()) {
throw std::runtime_error("Cannot restart with join_ring=false because the node already owns tokens");
}
slogger.info("Restarting a node in NORMAL status");
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore we update _token_metadata now, before gossip starts.
tmptr->update_topology(tmptr->get_my_id(), _snitch.local()->get_location(), locator::node::state::normal);
co_await tmptr->update_normal_tokens(my_tokens, tmptr->get_my_id());
cdc_gen_id = co_await _sys_ks.local().get_cdc_generation_id();
if (!cdc_gen_id) {
// We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
// unless we are restarting after upgrading from non-CDC supported version.
// In that case we won't begin a CDC generation: it should be done by one of the nodes
// after it learns that it everyone supports the CDC feature.
cdc_log.warn(
"Restarting node in NORMAL status with CDC enabled, but no streams timestamp was proposed"
" by this node according to its local tables. Are we upgrading from a non-CDC supported version?");
}
}
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
auto local_host_id = get_token_metadata().get_my_id();
// Replicate the tokens early because once gossip runs other nodes
// might send reads/writes to this node. Replicate it early to make
// sure the tokens are valid on all the shards.
co_await replicate_to_all_cores(std::move(tmptr));
tmlock.reset();
utils::get_local_injector().inject("stop_after_saving_tokens",
[] { std::raise(SIGSTOP); });
auto broadcast_rpc_address = get_token_metadata_ptr()->get_topology().my_cql_address();
// Ensure we know our own actual Schema UUID in preparation for updates
co_await db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service);
// Assemble the initial gossip application state advertised by this node.
app_states.emplace(gms::application_state::NET_VERSION, versioned_value::network_version());
app_states.emplace(gms::application_state::HOST_ID, versioned_value::host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, versioned_value::rpcaddress(broadcast_rpc_address));
app_states.emplace(gms::application_state::RELEASE_VERSION, versioned_value::release_version());
app_states.emplace(gms::application_state::SUPPORTED_FEATURES, versioned_value::supported_features(features));
app_states.emplace(gms::application_state::CACHE_HITRATES, versioned_value::cache_hitrates(""));
app_states.emplace(gms::application_state::SCHEMA_TABLES_VERSION, versioned_value(db::schema_tables::version));
app_states.emplace(gms::application_state::RPC_READY, versioned_value::cql_ready(false));
app_states.emplace(gms::application_state::VIEW_BACKLOG, versioned_value(""));
app_states.emplace(gms::application_state::SCHEMA, versioned_value::schema(_db.local().get_version()));
if (restarting_normal_node) {
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
// Exception: there might be no CDC streams timestamp proposed by us if we're upgrading from a non-CDC version.
app_states.emplace(gms::application_state::TOKENS, versioned_value::tokens(my_tokens));
app_states.emplace(gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(cdc_gen_id));
app_states.emplace(gms::application_state::STATUS, versioned_value::normal(my_tokens));
}
app_states.emplace(gms::application_state::SNITCH_NAME, versioned_value::snitch_name(_snitch.local()->get_name()));
app_states.emplace(gms::application_state::SHARD_COUNT, versioned_value::shard_count(smp::count));
app_states.emplace(gms::application_state::IGNORE_MSB_BITS, versioned_value::ignore_msb_bits(_db.local().get_config().murmur3_partitioner_ignore_msb_bits()));
for (auto&& s : _snitch.local()->get_app_states()) {
app_states.emplace(s.first, std::move(s.second));
}
// Announce future schema changes passively through gossip.
auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable {
_migration_manager.local().passive_announce(std::move(schema_version));
});
_listeners.emplace_back(make_lw_shared(std::move(schema_change_announce)));
slogger.info("Starting up server gossip");
co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});
co_await _gossiper.start_gossiping(new_generation, app_states);
utils::get_local_injector().inject("stop_after_starting_gossiping",
[] { std::raise(SIGSTOP); });
SCYLLA_ASSERT(_group0);
// Parameters sent to the topology coordinator in the join request; the
// request_id lets us poll for the request's completion later.
join_node_request_params join_params {
.host_id = _group0->load_my_id(),
.cluster_name = _db.local().get_config().cluster_name(),
.snitch_name = _db.local().get_snitch_name(),
.datacenter = _snitch.local()->get_datacenter(),
.rack = _snitch.local()->get_rack(),
.release_version = version::release(),
.num_tokens = _db.local().get_config().join_ring() ? _db.local().get_config().num_tokens() : 0,
.tokens_string = _db.local().get_config().join_ring() ? _db.local().get_config().initial_token() : sstring(),
.shard_count = smp::count,
.ignore_msb = _db.local().get_config().murmur3_partitioner_ignore_msb_bits(),
.supported_features = _feature_service.supported_feature_set() | std::ranges::to<std::vector<sstring>>(),
.request_id = utils::UUID_gen::get_time_UUID(),
};
if (raft_replace_info) {
join_params.replaced_id = raft_replace_info->raft_id;
join_params.ignore_nodes = utils::split_comma_separated_list(_db.local().get_config().ignore_dead_nodes_for_replace());
if (!locator::check_host_ids_contain_only_uuid(join_params.ignore_nodes)) {
slogger.warn("Warning: Using IP addresses for '--ignore-dead-nodes-for-replace' is deprecated and will"
" be disabled in a future release. Please use host IDs instead. Provided values: {}",
_db.local().get_config().ignore_dead_nodes_for_replace());
}
}
// setup_group0 will do nothing if the node has already set up group 0 in setup_group0_if_exist in main.cc, which
// happens when the node is restarting and not joining the new group 0 in the Raft-based recovery procedure.
// It does not matter which handshaker we choose in this case since it will not be used.
//
// We use the legacy handshaker in the Raft-based recovery procedure to join the new group 0 without involving
// the topology coordinator. We can assume this node has already been accepted by the topology coordinator once
// and joined topology.
::shared_ptr<group0_handshaker> handshaker =
!_db.local().get_config().recovery_leader.is_set()
? ::make_shared<join_node_rpc_handshaker>(*this, join_params)
: _group0->make_legacy_handshaker(raft::is_voter::no);
co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, std::move(handshaker),
*this, _qp, _migration_manager.local(), join_params);
raft::server& raft_server = _group0->group0_server();
// This is the moment when the locator::topology has gathered information about other nodes
// in the cluster -- either through gossiper, or by loading it from disk -- so it's safe
// to start the hint managers.
if (start_hm) {
co_await proxy.invoke_on_all([] (storage_proxy& local_proxy) {
return local_proxy.start_hints_manager();
});
}
set_mode(mode::JOINING);
co_await utils::get_local_injector().inject("delay_bootstrap_120s", std::chrono::seconds(120));
rtlogger.info("topology changes are using raft");
// Prevent shutdown hangs. We cannot count on wait_for_group0_stop while we are
// joining group 0.
auto sub = _abort_source.subscribe([this] () noexcept {
_group0_as.request_abort();
_topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
});
// start topology coordinator fiber
_raft_state_monitor = raft_state_monitor_fiber(raft_server, _group0->hold_group0_gate());
// start vnodes cleanup fiber
_sstable_vnodes_cleanup_fiber = sstable_vnodes_cleanup_fiber(raft_server, _group0->hold_group0_gate(), proxy);
// Need to start system_distributed_keyspace before bootstrap because bootstrapping
// process may access those tables.
co_await start_sys_dist_ks();
// Restarting members only sanity-check their membership; new nodes (boot
// or replace) wait for the topology coordinator to finish their request.
if (_sys_ks.local().bootstrap_complete()) {
if (_topology_state_machine._topology.left_nodes.contains(raft_server.id())) {
throw std::runtime_error("A node that already left the cluster cannot be restarted");
}
} else {
if (!_db.local().get_config().join_ring() && !_feature_service.zero_token_nodes) {
throw std::runtime_error("Cannot boot a node with join_ring=false because the cluster does not support the ZERO_TOKEN_NODES feature");
}
co_await utils::get_local_injector().inject("crash_before_topology_request_completion", [] (auto& handler) -> future<> {
co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
throw std::runtime_error("Crashed in crash_before_topology_request_completion");
});
auto err = co_await wait_for_topology_request_completion(join_params.request_id);
if (!err.empty()) {
throw std::runtime_error(fmt::format("{} failed. See earlier errors ({})", raft_replace_info ? "Replace" : "Bootstrap", err));
}
if (raft_replace_info) {
// Don't declare the replace done until the replaced node's tablet
// replicas have been rebuilt elsewhere.
co_await await_tablets_rebuilt(raft_replace_info->raft_id);
}
}
set_topology_change_kind(upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state));
co_await update_topology_with_local_metadata(raft_server);
// Node state is enough to know that bootstrap has completed, but to make legacy code happy
// let it know that the bootstrap is completed as well
co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED);
set_mode(mode::NORMAL);
utils::get_local_injector().inject("stop_after_setting_mode_to_normal_raft_topology",
[] { std::raise(SIGSTOP); });
if (get_token_metadata().sorted_tokens().empty()) {
auto err = ::format("join_topology: Sorted token in token_metadata is empty");
slogger.error("{}", err);
throw std::runtime_error(err);
}
co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local());
// Initializes monitor only after updating local topology.
start_tablet_split_monitor();
// Tell the gossiper about all currently-normal members so UP notifications fire.
auto ids = _topology_state_machine._topology.normal_nodes |
std::views::keys |
std::views::transform([] (raft::server_id id) { return locator::host_id{id.uuid()}; }) |
std::ranges::to<std::unordered_set<locator::host_id>>();
co_await _gossiper.notify_nodes_on_up(std::move(ids));
}
// Builds a token-range -> replica-IP-addresses map for the given keyspace, or
// for a specific table when `table_id` is provided (needed for tablet-based
// tables, whose token layout is per-table). Replica host_ids are translated
// to IP addresses through _address_map.
//
// NOTE(review): when `table_id` is disengaged we take the keyspace's static
// replication map; if that map ever reported uses_tablets() the `*table_id`
// dereference below would hit an empty optional — presumably static maps are
// vnode-based only. TODO confirm.
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(sstring keyspace, std::optional<table_id> table_id) const {
locator::effective_replication_map_ptr erm;
utils::chunked_vector<token> tokens;
if (table_id.has_value()) {
auto& cf = _db.local().find_column_family(*table_id);
erm = cf.get_effective_replication_map();
} else {
auto& ks = _db.local().find_keyspace(keyspace);
erm = ks.get_static_effective_replication_map();
}
const auto& tm = *erm->get_token_metadata_ptr();
// Tablet tables have their own sorted token list; vnode tables use the
// global ring.
if (erm->get_replication_strategy().uses_tablets()) {
const auto& tablets = tm.tablets().get_tablet_map(*table_id);
tokens = co_await tablets.get_sorted_tokens();
} else {
tokens = tm.sorted_tokens();
}
// Map each range's replica host_ids to their current IP addresses.
co_return (co_await locator::get_range_to_address_map(erm, std::move(tokens))) |
std::views::transform([&] (auto tid) { return std::make_pair(tid.first,
tid.second | std::views::transform([&] (auto id) { return _address_map.get(id); }) | std::ranges::to<inet_address_vector_replica_set>()); }) |
std::ranges::to<std::unordered_map>();
}
// Gossiper ON_JOIN callback: a freshly seen endpoint is processed exactly like
// a state change carrying its full application state map.
future<> storage_service::on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id pid) {
    slogger.debug("endpoint={} on_join: permit_id={}", endpoint, pid);
    // ep_state is kept alive by this coroutine frame for the duration of the call.
    const auto& app_states = ep_state->get_application_state_map();
    co_await on_change(endpoint, id, app_states, pid);
}
// Gossiper ON_ALIVE callback: emits an UP notification, but only for
// endpoints that are members of the cluster topology.
future<> storage_service::on_alive(gms::inet_address endpoint, locator::host_id host_id, gms::endpoint_state_ptr state, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_alive: permit_id={}", endpoint, host_id, pid);
    const auto* node = get_token_metadata().get_topology().find_node(host_id);
    if (!node || !node->is_member()) {
        slogger.debug("ignore on_alive since endpoint {}/{} is not a topology member", endpoint, host_id);
        co_return;
    }
    co_await notify_up(endpoint, host_id);
}
// Gossiper ON_CHANGE callback: reflects updated application states of a live
// cluster member into system.peers, and notifies listeners about CQL
// readiness and preferred-IP changes. Dead or unknown endpoints are ignored,
// as are endpoints whose IP is not the member's current one (see the
// IP-change comment below).
future<> storage_service::on_change(gms::inet_address endpoint, locator::host_id host_id, const gms::application_state_map& states_, gms::permit_id pid) {
// copy the states map locally since the coroutine may yield
auto states = states_;
slogger.debug("endpoint={} on_change: states={}, permit_id={}", endpoint, states, pid);
auto ep_state = _gossiper.get_endpoint_state_ptr(host_id);
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
co_return;
}
const auto& tm = get_token_metadata();
const auto* node = tm.get_topology().find_node(host_id);
// The check peers[host_id] == endpoint is needed when a node changes
// its IP - on_change can be called by the gossiper for old IP as part
// of its removal, after handle_state_normal has already been called for
// the new one. Without the check, the do_update_system_peers_table call
// overwrites the IP back to its old value.
// In essence, the code under the 'if' should fire if the given IP belongs
// to a cluster member.
if (node && node->is_member() && (co_await _sys_ks.local().get_ip_from_peers_table(host_id)) == endpoint) {
// Never write our own row into system.peers.
if (!is_me(endpoint)) {
slogger.debug("endpoint={}/{} on_change: updating system.peers table", endpoint, host_id);
if (auto info = get_peer_info_for_update(host_id, states)) {
co_await _sys_ks.local().update_peer_info(endpoint, host_id, *info);
}
}
if (states.contains(application_state::RPC_READY)) {
slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready());
co_await notify_cql_change(endpoint, host_id, ep_state->is_cql_ready());
}
// INTERNAL_IP carries the node's intra-DC address; may trigger a
// reconnect through the preferred (local) IP.
if (auto it = states.find(application_state::INTERNAL_IP); it != states.end()) {
co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(it->second.value()), host_id);
}
}
}
// If the snitch prefers local (intra-DC) addresses and the peer is in our DC,
// cache its internal IP in the messaging service on all shards so future
// connections use it. No-op when the preferred IP is already cached.
future<> storage_service::maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip, locator::host_id host_id) {
    if (!_snitch.local()->prefer_local()) {
        co_return;
    }
    const auto& topo = get_token_metadata().get_topology();
    const bool same_dc = topo.get_datacenter() == topo.get_datacenter(host_id);
    if (!same_dc || _messaging.local().get_preferred_ip(ep) == local_ip) {
        co_return;
    }
    slogger.debug("Initiated reconnect to an Internal IP {} for the {}", local_ip, ep);
    co_await _messaging.invoke_on_all([ep, local_ip] (auto& local_ms) {
        local_ms.cache_preferred_ip(ep, local_ip);
    });
}
// Gossiper ON_REMOVE callback: only logs; node removal is handled by the
// raft topology machinery elsewhere.
future<> storage_service::on_remove(gms::inet_address endpoint, locator::host_id host_id, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_remove: permit_id={}", endpoint, host_id, pid);
    co_return;
}
// Gossiper ON_DEAD callback: propagates the DOWN event to subscribers.
future<> storage_service::on_dead(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_dead: permit_id={}", endpoint, id, pid);
    co_await notify_down(endpoint, id);
}
// Gossiper ON_RESTART callback. A peer that restarted before we ever marked
// it down may leave stale connection state on our side, so treat the restart
// as a down event to reset it.
future<> storage_service::on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id pid) {
    slogger.debug("endpoint={}/{} on_restart: permit_id={}", endpoint, id, pid);
    const bool still_marked_alive = id != my_host_id() && _gossiper.is_alive(id);
    if (still_marked_alive) {
        return on_dead(endpoint, id, state, pid);
    }
    return make_ready_future();
}
// Builds a peer_info update from the endpoint's current gossip state.
// NOTE(review): when the gossiper has no state for the endpoint this returns
// an engaged-but-empty peer_info rather than nullopt — behavior preserved
// as-is; confirm whether callers rely on it.
std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(locator::host_id endpoint) {
    auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
    if (!ep_state) {
        return db::system_keyspace::peer_info{};
    }
    return get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
}
// Builds a system.peers update record from a gossip application-state map.
// Returns nullopt when no relevant state was seen at all.
//
// Fields whose authoritative source is raft topology (data_center, rack,
// release_version, supported_features) are skipped inside set_field via
// managed_by_raft=true. Note, however, that evaluating the argument
// `get_peer_info().field` still engages `ret` before set_field runs, so the
// returned optional is non-empty whenever any listed state is present —
// including raft-managed ones.
std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(locator::host_id endpoint, const gms::application_state_map& app_state_map) {
std::optional<db::system_keyspace::peer_info> ret;
// Lazily engage the optional on first use.
auto get_peer_info = [&] () -> db::system_keyspace::peer_info& {
if (!ret) {
ret.emplace();
}
return *ret;
};
// Parses the gossiped string value into the typed field; raft-managed
// fields are left untouched. A parse failure is an internal error.
auto set_field = [&]<typename T> (std::optional<T>& field,
const gms::versioned_value& value,
std::string_view name,
bool managed_by_raft)
{
if (managed_by_raft) {
return;
}
try {
field = T(value.value());
} catch (...) {
on_internal_error(slogger, fmt::format("failed to parse {} {} for {}: {}", name, value.value(),
endpoint, std::current_exception()));
}
};
for (const auto& [state, value] : app_state_map) {
switch (state) {
case application_state::DC:
set_field(get_peer_info().data_center, value, "data_center", true);
break;
case application_state::INTERNAL_IP:
set_field(get_peer_info().preferred_ip, value, "preferred_ip", false);
break;
case application_state::RACK:
set_field(get_peer_info().rack, value, "rack", true);
break;
case application_state::RELEASE_VERSION:
set_field(get_peer_info().release_version, value, "release_version", true);
break;
case application_state::RPC_ADDRESS:
set_field(get_peer_info().rpc_address, value, "rpc_address", false);
break;
case application_state::SCHEMA:
set_field(get_peer_info().schema_version, value, "schema_version", false);
break;
case application_state::TOKENS:
// tokens are updated separately
break;
case application_state::SUPPORTED_FEATURES:
set_field(get_peer_info().supported_features, value, "supported_features", true);
break;
default:
break;
}
}
return ret;
}
// Returns the endpoint's tokens as gossiped in the serialized TOKENS
// application state.
std::unordered_set<locator::token> storage_service::get_tokens_for(locator::host_id endpoint) {
    const auto serialized = _gossiper.get_application_state_value(endpoint, application_state::TOKENS);
    slogger.trace("endpoint={}, tokens_string={}", endpoint, serialized);
    auto tokens = versioned_value::tokens_from_string(serialized);
    slogger.trace("endpoint={}, tokens={}", endpoint, tokens);
    return tokens;
}
// Extracts the DC/rack location from a gossiped endpoint state; both the DC
// and RACK application states must be present, otherwise nullopt.
std::optional<locator::endpoint_dc_rack> storage_service::get_dc_rack_for(const gms::endpoint_state& ep_state) {
    const auto* dc_state = ep_state.get_application_state_ptr(gms::application_state::DC);
    const auto* rack_state = ep_state.get_application_state_ptr(gms::application_state::RACK);
    if (dc_state && rack_state) {
        return locator::endpoint_dc_rack{
            .dc = dc_state->value(),
            .rack = rack_state->value(),
        };
    }
    return std::nullopt;
}
// Resolves the endpoint's DC/rack through its gossip state; nullopt when the
// gossiper does not know the endpoint.
std::optional<locator::endpoint_dc_rack> storage_service::get_dc_rack_for(locator::host_id endpoint) {
    if (auto state_ptr = _gossiper.get_endpoint_state_ptr(endpoint)) {
        return get_dc_rack_for(*state_ptr);
    }
    return std::nullopt;
}
// Adds a subscriber to the endpoint lifecycle notification list.
void endpoint_lifecycle_notifier::register_subscriber(endpoint_lifecycle_subscriber* subscriber) {
    _subscribers.add(subscriber);
}
// Removes a previously registered lifecycle subscriber.
future<> endpoint_lifecycle_notifier::unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept {
    return _subscribers.remove(subscriber);
}
// Shuts down all client-facing transports in the order written below:
// migration manager drain, protocol (RPC/CQL) servers, gossiper, messaging
// service, stream manager. Idempotent: the first call kicks off the shutdown
// and caches its future in _transport_stopped; subsequent calls just return
// that future.
future<> storage_service::stop_transport() {
if (!_transport_stopped.has_value()) {
promise<> stopped;
_transport_stopped = stopped.get_future();
// Run the blocking-style shutdown sequence in a seastar::async thread
// and forward its completion to the cached promise.
seastar::async([this] {
slogger.info("Stop transport: starts");
slogger.debug("shutting down migration manager");
_migration_manager.invoke_on_all(&service::migration_manager::drain).get();
shutdown_protocol_servers().get();
slogger.info("Stop transport: shutdown rpc and cql server done");
_gossiper.container().invoke_on_all(&gms::gossiper::shutdown).get();
slogger.info("Stop transport: stop_gossiping done");
_messaging.invoke_on_all(&netw::messaging_service::shutdown).get();
slogger.info("Stop transport: shutdown messaging_service done");
_stream_manager.invoke_on_all(&streaming::stream_manager::shutdown).get();
slogger.info("Stop transport: shutdown stream_manager done");
slogger.info("Stop transport: done");
}).forward_to(std::move(stopped));
}
return _transport_stopped.value();
}
// Drains the node on shutdown. If a drain is already in progress or has
// finished, returns its completion future instead of draining again.
// Shard 0 only.
future<> storage_service::drain_on_shutdown() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (_operation_mode == mode::DRAINING || _operation_mode == mode::DRAINED) {
        return _drain_finished.get_future();
    }
    return do_drain();
}
// Stores a pointer to the raft group 0 service for later use.
void storage_service::set_group0(raft_group0& group0) {
    _group0 = std::addressof(group0);
}
// Creates the IP-address updater and subscribes it to gossip notifications,
// keeping the host_id -> address map fresh.
future<> storage_service::init_address_map(gms::gossip_address_map& address_map) {
    _ip_address_updater = make_shared<ip_address_updater>(address_map, *this);
    _gossiper.register_(_ip_address_updater);
    return make_ready_future<>();
}
// Detaches the IP-address updater from gossip notifications.
future<> storage_service::uninit_address_map() {
    co_await _gossiper.unregister_(_ip_address_updater);
}
future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
start_hint_manager start_hm, gms::generation_type new_generation) {
SCYLLA_ASSERT(this_shard_id() == 0);
if (_sys_ks.local().was_decommissioned()) {
auto msg = sstring("This node was decommissioned and will not rejoin the ring unless "
"all existing data is removed and the node is bootstrapped again");
slogger.error("{}", msg);
throw std::runtime_error(msg);
}
set_mode(mode::STARTING);
std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints = co_await _sys_ks.local().load_endpoint_state();
if (_group0->joined_group0()) {
// Recover the endpoint state of the node replacing with the same IP. Its IP mapping is not in system.peers.
const auto& topo = _topology_state_machine._topology;
for (const auto& [id, replica_state]: topo.transition_nodes) {
auto host_id = locator::host_id(id.uuid());
if (replica_state.state == node_state::replacing && !loaded_endpoints.contains(host_id)) {
auto replaced_id = std::get<replace_param>(topo.req_param.at(id)).replaced_id;
if (const auto it = loaded_endpoints.find(locator::host_id(replaced_id.uuid())); it != loaded_endpoints.end()) {
auto ip = it->second.endpoint;
slogger.info("Adding node {}/{} that is replacing {}/{} to loaded_endpoints", host_id, ip, replaced_id, ip);
gms::loaded_endpoint_state st{.endpoint = ip};
loaded_endpoints.emplace(host_id, std::move(st));
}
}
}
}
// Seeds are now only used as the initial contact point nodes. If the
// loaded_endpoints are empty which means this node is a completely new
// node, we use the nodes specified in seeds as the initial contact
// point nodes, otherwise use the peer nodes persisted in system table.
auto seeds = _gossiper.get_seeds();
auto initial_contact_nodes = loaded_endpoints.empty() ?
std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
loaded_endpoints | std::views::transform([] (const auto& x) {
return x.second.endpoint;
}) | std::ranges::to<std::unordered_set<gms::inet_address>>();
gms::inet_address recovery_leader_ip;
locator::host_id recovery_leader_id;
if (_db.local().get_config().recovery_leader.is_set()) {
if (_group0->joined_group0()) {
// Something is wrong unless it is a noninitial (and unneeded) restart while recreating the new group 0 in
// the Raft-based recovery procedure.
slogger.warn(
"recovery_leader is set to {} but persistent group 0 ID is present: {}. "
"The recovery_leader option will be ignored. If you are trying to run "
"the Raft-based recovery procedure, please follow the steps in the documentation.",
_db.local().get_config().recovery_leader(), _group0->load_my_id());
} else {
recovery_leader_id = locator::host_id(_db.local().get_config().recovery_leader());
auto recovery_leader_it = loaded_endpoints.find(recovery_leader_id);
if (recovery_leader_id != my_host_id() && recovery_leader_it == loaded_endpoints.end()) {
throw std::runtime_error(
fmt::format("Recovery leader {} unrecognised as a cluster member, loaded endpoints: {}",
recovery_leader_id, loaded_endpoints));
}
recovery_leader_ip = recovery_leader_id == my_host_id() ?
get_broadcast_address() : recovery_leader_it->second.endpoint;
initial_contact_nodes = std::unordered_set{recovery_leader_ip};
if (_group0->client().in_recovery()) {
throw std::runtime_error(format(
"Entered RECOVERY mode and set recovery_leader to {}. RECOVERY mode is used in the "
"gossip-based recovery procedure, while recovery_leader is used in the Raft-based recovery "
"procedure. If the Raft-based topology is enabled in the whole cluster, use the Raft-based "
"procedure. Otherwise, use the gossip-based procedure.", recovery_leader_id));
}
if (!_sys_ks.local().bootstrap_complete()) {
throw std::runtime_error("Cannot bootstrap in the Raft-based recovery procedure");
}
// The Raft-based topology has been enabled if and only if features in system.topology are non-empty.
// Note that we cannot use the in-memory state machine here. It is not loaded at this point since
// the node hasn't joined the new group 0 yet.
if (!co_await _sys_ks.local().load_topology_features_state()) {
throw std::runtime_error(
"Cannot start in the Raft-based recovery procedure - Raft-based topology has not been enabled");
}
}
}
if (recovery_leader_id) {
// The Raft-based recovery procedure.
slogger.info("Performing Raft-based recovery procedure with recovery leader {}/{}",
recovery_leader_id, recovery_leader_ip);
auto g0_info = co_await _group0->discover_group0(std::vector{recovery_leader_ip}, _qp);
if (g0_info.id.uuid() != recovery_leader_id.uuid()) {
throw std::runtime_error(fmt::format(
"Raft-based recovery procedure - found group 0 {} with leader {}/{} not matching "
"recovery leader {}/{}. The procedure must be restarted.",
g0_info.group0_id, g0_info.id, g0_info.ip_addr, recovery_leader_id, recovery_leader_ip));
}
slogger.info("Raft-based recovery procedure - found group 0 with ID {}", g0_info.group0_id);
set_topology_change_kind(topology_change_kind::raft);
} else if (_group0->client().in_recovery()) {
// The gossip-based recovery procedure.
slogger.info("Raft recovery - starting in legacy topology operations mode");
set_topology_change_kind(topology_change_kind::legacy);
} else if (_group0->joined_group0()) {
// We are a part of group 0.
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::done) {
throw std::runtime_error(
"Cannot start - cluster is not yet upgraded to use raft topology and this version does not support legacy topology operations. "
"If you are trying to upgrade the node then first upgrade the cluster to use raft topology.");
}
set_topology_change_kind(topology_change_kind::raft);
slogger.info("The node is already in group 0");
} else if (_sys_ks.local().bootstrap_complete()) {
// setup_group0_if_exist() should already throw in this case, so do internal error here.
on_internal_error(slogger, "The node does not have group 0 but has already completed bootstrap. This should not happen.");
} else {
// We are not in group 0 and we are just bootstrapping. We need to discover group 0.
const std::vector<gms::inet_address> contact_nodes{initial_contact_nodes.begin(), initial_contact_nodes.end()};
auto g0_info = co_await _group0->discover_group0(contact_nodes, _qp);
slogger.info("Found group 0 with ID {}, with leader of ID {} and IP {}",
g0_info.group0_id, g0_info.id, g0_info.ip_addr);
if (_group0->load_my_id() == g0_info.id) {
// We're creating the group 0.
slogger.info("We are creating the group 0. Start in raft topology operations mode");
set_topology_change_kind(topology_change_kind::raft);
} else {
// Ask the current member of the raft group about which mode to use
auto params = join_node_query_params {};
auto result = co_await ser::join_node_rpc_verbs::send_join_node_query(
&_messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, std::move(params));
switch (result.topo_mode) {
case join_node_query_result::topology_mode::raft:
slogger.info("Will join existing cluster in raft topology operations mode");
set_topology_change_kind(topology_change_kind::raft);
break;
case join_node_query_result::topology_mode::legacy:
throw std::runtime_error(
"Cannot join existing cluster in legacy topology operations mode because it is no longer supported. "
"Enable consistent topology changes with Raft.");
}
}
}
auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
initial_contact_nodes, loaded_endpoints | std::views::keys, loaded_peer_features.size());
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
co_return co_await join_topology(proxy, std::move(initial_contact_nodes),
std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), start_hm, new_generation);
}
// Prepares (but does not apply) a cluster-wide token metadata change.
//
// Builds, on every shard, the new token_metadata and the effective
// replication maps derived from it (keyspace-static maps for vnode
// keyspaces, per-table/per-view maps for tablet keyspaces), plus the set
// of topology sessions that must stay open across the switch. The
// returned token_metadata_change is later applied per shard with
// commit_token_metadata_change() and must eventually be released with
// token_metadata_change::destroy().
//
// Must run on shard 0. On failure, all partially replicated per-shard
// state is cleared gently and the original exception is rethrown.
future<token_metadata_change> storage_service::prepare_token_metadata_change(mutable_token_metadata_ptr tmptr, const schema_getter& schema_getter) {
    SCYLLA_ASSERT(this_shard_id() == 0);

    std::exception_ptr ex;
    token_metadata_change change;

    // Collect open sessions
    {
        // Keep the topology-level session (if any) and every per-tablet
        // migration session that is currently in transition alive.
        auto session = _topology_state_machine._topology.session;
        if (session) {
            change.open_sessions.insert(session);
        }

        for (auto&& [table, tables] : tmptr->tablets().all_table_groups()) {
            const auto& tmap = tmptr->tablets().get_tablet_map(table);
            for (auto&& [tid, trinfo]: tmap.transitions()) {
                if (trinfo.session_id) {
                    auto id = session_id(trinfo.session_id);
                    change.open_sessions.insert(id);
                }
            }
        }
    }

    try {
        auto base_shard = this_shard_id();
        change.pending_token_metadata_ptr[base_shard] = tmptr;
        auto& sharded_token_metadata = _shared_token_metadata.container();
        // clone a local copy of updated token_metadata on all other shards
        co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> {
            change.pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async());
        });

        // Precalculate new effective_replication_map for all keyspaces
        // and clone to all shards;
        //
        // TODO: at the moment create on shard 0 first
        // but in the future we may want to use hash() % smp::count
        // to evenly distribute the load.
        auto replications = schema_getter.get_keyspaces_replication();
        for (const auto& [ks_name, rs] : replications) {
            if (rs->is_per_table()) {
                // Per-table (tablet) keyspaces get dedicated maps below.
                continue;
            }
            auto erm = co_await get_erm_factory().create_static_effective_replication_map(rs, tmptr);
            change.pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm));
        }
        co_await container().invoke_on_others([&] (storage_service& ss) -> future<> {
            // Each remote shard computes its static maps from its own clone
            // of the new token_metadata.
            auto replications = schema_getter.get_keyspaces_replication();
            for (const auto& [ks_name, rs] : replications) {
                if (rs->is_per_table()) {
                    continue;
                }
                auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
                auto erm = co_await ss.get_erm_factory().create_static_effective_replication_map(rs, tmptr);
                change.pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm));
            }
        });
        // Prepare per-table erms.
        co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
            auto tmptr = change.pending_token_metadata_ptr[this_shard_id()];
            auto replications = schema_getter.get_keyspaces_replication();
            co_await schema_getter.for_each_table_schema_gently([&] (table_id id, schema_ptr table_schema) {
                auto rs = replications.at(table_schema->ks_name());
                locator::effective_replication_map_ptr erm;
                if (auto pt_rs = rs->maybe_as_per_table()) {
                    // Tablet keyspace: build a map specific to this table.
                    erm = pt_rs->make_replication_map(id, tmptr);
                } else {
                    // Vnode keyspace: share the keyspace-wide static map.
                    erm = change.pending_effective_replication_maps[this_shard_id()][table_schema->ks_name()];
                }
                // Views are tracked separately so a base table and its views
                // can be switched together on commit.
                if (table_schema->is_view()) {
                    change.pending_view_erms[this_shard_id()].emplace(id, std::move(erm));
                } else {
                    change.pending_table_erms[this_shard_id()].emplace(id, std::move(erm));
                }
                return make_ready_future();
            });
        });
    } catch (...) {
        ex = std::current_exception();
    }

    // Rollback on metadata replication error
    if (ex) {
        try {
            co_await smp::invoke_on_all([&] () -> future<> {
                auto tmptr = std::move(change.pending_token_metadata_ptr[this_shard_id()]);
                auto erms = std::move(change.pending_effective_replication_maps[this_shard_id()]);
                auto table_erms = std::move(change.pending_table_erms[this_shard_id()]);
                auto view_erms = std::move(change.pending_view_erms[this_shard_id()]);

                co_await utils::clear_gently(erms);
                co_await utils::clear_gently(tmptr);
                co_await utils::clear_gently(table_erms);
                co_await utils::clear_gently(view_erms);
            });
        } catch (...) {
            // Best-effort cleanup: the primary error below matters more.
            slogger.warn("Failure to reset pending token_metadata in cleanup path: {}. Ignored.", std::current_exception());
        }

        std::rethrow_exception(std::move(ex));
    }

    co_return change;
}
// Applies a prepared token_metadata_change on the current shard.
//
// Installs the new token_metadata, then swaps in the pre-built effective
// replication maps: first the per-keyspace static maps, then each base
// table together with all of its views, so a base and its views never
// observe inconsistent maps. Finally reconciles topology sessions:
// sessions not listed in change.open_sessions are closed and missing
// ones are created.
//
// noexcept by design: everything was already replicated by
// prepare_token_metadata_change(), so a failure here would leave shards
// inconsistent with no way to recover - the process aborts instead.
void storage_service::commit_token_metadata_change(token_metadata_change& change) noexcept {
    slogger.debug("Replicating token_metadata");
    // Apply changes on a single shard
    try {
        _shared_token_metadata.set(std::move(change.pending_token_metadata_ptr[this_shard_id()]));
        _groups_manager.update(_shared_token_metadata.get());
        auto& db = _db.local();

        auto& erms = change.pending_effective_replication_maps[this_shard_id()];
        for (auto it = erms.begin(); it != erms.end(); ) {
            auto& ks = db.find_keyspace(it->first);
            ks.update_static_effective_replication_map(std::move(it->second));
            it = erms.erase(it);
        }

        auto& table_erms = change.pending_table_erms[this_shard_id()];
        auto& view_erms = change.pending_view_erms[this_shard_id()];
        for (auto it = table_erms.begin(); it != table_erms.end(); ) {
            // Update base/views effective_replication_maps atomically.
            auto& cf = db.find_column_family(it->first);
            cf.update_effective_replication_map(std::move(it->second));
            for (const auto& view_ptr : cf.views()) {
                const auto& view_id = view_ptr->id();
                auto& view = db.find_column_family(view_id);
                auto view_it = view_erms.find(view_id);
                if (view_it == view_erms.end()) {
                    throw std::runtime_error(format("Could not find pending effective_replication_map for view {}.{} id={}", view_ptr->ks_name(), view_ptr->cf_name(), view_id));
                }
                view.update_effective_replication_map(std::move(view_it->second));
                if (view.uses_tablets()) {
                    // A new map may change tablet layout - reconsider splits.
                    register_tablet_split_candidate(view_it->first);
                }
                view_erms.erase(view_it);
            }
            if (cf.uses_tablets()) {
                register_tablet_split_candidate(it->first);
            }
            it = table_erms.erase(it);
        }
        // Every pending view map must have been consumed via its base table.
        if (!view_erms.empty()) {
            throw std::runtime_error(fmt::format("Found orphaned pending effective_replication_maps for the following views: {}", std::views::keys(view_erms)));
        }

        auto& session_mgr = get_topology_session_manager();
        session_mgr.initiate_close_of_sessions_except(change.open_sessions);
        for (auto id : change.open_sessions) {
            session_mgr.create_session(id);
        }
    } catch (...) {
        // applying the changes on all shards should never fail
        // it will end up in an inconsistent state that we can't recover from.
        slogger.error("Failed to apply token_metadata changes: {}. Aborting.", std::current_exception());
        abort();
    }
}
// Releases the per-shard pending state of this change on every shard:
// each shard drops its own token_metadata pointer and gently clears the
// effective replication maps that were staged for it.
future<> token_metadata_change::destroy() {
    return smp::invoke_on_all([this] () -> future<> {
        const auto shard = this_shard_id();
        pending_token_metadata_ptr[shard] = nullptr;
        co_await utils::clear_gently(pending_effective_replication_maps[shard]);
        co_await utils::clear_gently(pending_table_erms[shard]);
        co_await utils::clear_gently(pending_view_erms[shard]);
    });
}
// Replicates the given token_metadata (and all derived effective
// replication maps) to every shard, then commits the change everywhere.
//
// Keyspace replication strategies and table schemas are sourced from the
// live database through the local db_schema_getter adaptor. After the
// commit, pending tombstone-gc repair-time updates are flushed.
// Must run on shard 0.
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {
    SCYLLA_ASSERT(this_shard_id() == 0);

    slogger.debug("Replicating token_metadata to all cores");

    // Adapts the running database to the schema_getter interface consumed
    // by prepare_token_metadata_change().
    class db_schema_getter : public schema_getter {
    private:
        sharded<replica::database>& _db;
    public:
        db_schema_getter(sharded<replica::database>& db) : _db(db) {};
        virtual flat_hash_map<sstring, locator::replication_strategy_ptr> get_keyspaces_replication() const override {
            flat_hash_map<sstring, locator::replication_strategy_ptr> out;
            for (auto& [name, ks] : _db.local().get_keyspaces()) {
                out.emplace(name, ks.get_replication_strategy_ptr());
            }
            return out;
        };
        virtual future<> for_each_table_schema_gently(std::function<future<>(table_id, schema_ptr)> f) const override {
            // Adapt the table visitor to the (table_id, schema_ptr) signature.
            auto ff = [&f](table_id id, lw_shared_ptr<replica::table> t) -> future<> {
                return f(id, t->schema());
            };
            return _db.local().get_tables_metadata().for_each_table_gently(ff);
        };
    };
    db_schema_getter getter{_db};

    auto change = co_await prepare_token_metadata_change(tmptr, getter);
    co_await container().invoke_on_all([&change] (storage_service& ss) {
        ss.commit_token_metadata_change(change);
    });
    co_await change.destroy();

    co_await _db.local().get_compaction_manager().get_shared_tombstone_gc_state().
            flush_pending_repair_time_update(_db.local());
}
// Shuts down the storage service. The sequence is order-dependent:
// detach from the migration manager, stop transport (if still up) and
// messaging, stop the tablet/node-ops/topology-request modules and the
// background fibers, and finally detach from the gossiper.
future<> storage_service::stop() {
    co_await _migration_manager.local().unplug_storage_service();
    // if there is a background "isolate" shutdown
    // in progress, we need to sync with it. Mostly
    // relevant for tests
    if (_transport_stopped.has_value()) {
        co_await stop_transport();
    }
    co_await uninit_messaging_service();
    // make sure nobody uses the semaphore
    _listeners.clear();
    co_await _tablets_module->stop();
    co_await _node_ops_module->stop();
    co_await _global_topology_requests_module->stop();
    co_await _async_gate.close();
    // Wake the tablet-split monitor so it can observe shutdown and exit,
    // then wait for its fiber to finish.
    _tablet_split_monitor_event.signal();
    co_await std::move(_tablet_split_monitor);
    _gossiper.set_topology_state_machine(nullptr);
}
// Aborts group0-related background work and waits for the associated
// fibers to complete. Idempotent: subsequent calls are no-ops.
future<> storage_service::wait_for_group0_stop() {
    if (_group0_as.abort_requested()) {
        // A previous call already requested the abort and awaited the fibers.
        co_return;
    }
    _group0_as.request_abort();
    // Break the state-machine events so any waiters observe the abort.
    _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
    _view_building_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
    co_await when_all(
            std::move(_raft_state_monitor),
            std::move(_sstable_vnodes_cleanup_fiber),
            std::move(_upgrade_to_topology_coordinator_fiber));
}
// Runs a single gossip shadow round against the initial contact nodes to
// verify that no live node already uses our broadcast address; throws if a
// collision is detected. The gossip state gathered during the shadow round
// is discarded before returning.
//
// Note: `loaded_peer_features` is retained for interface compatibility but
// is no longer consulted since the gossip-based bootstrap code was removed.
// The former retry loop around the shadow round was dead code (its
// condition was never set) and has been removed without behavior change.
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<locator::host_id, sstring>& loaded_peer_features) {
    slogger.debug("Starting shadow gossip round to check for endpoint collision");

    return seastar::async([this, initial_contact_nodes] {
        slogger.info("Performing gossip shadow round");
        _gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::yes).get();
        // Refuse to start if the cluster's snitch name does not match ours.
        _gossiper.check_snitch_name_matches(_snitch.local()->get_name());
        auto addr = get_broadcast_address();
        if (!_gossiper.is_safe_for_bootstrap(addr)) {
            throw std::runtime_error(::format("A node with address {} already exists, cancelling join. "
                "Use replace_address if you want to replace this node.", addr));
        }
        slogger.info("Checking bootstrapping/leaving/moving nodes: ok (check_for_endpoint_collision)");
        // Drop all state gathered during the shadow round; the real gossip
        // session starts from a clean endpoint state map.
        _gossiper.reset_endpoint_state_map().get();
    });
}
// Removes the given endpoint from gossip (when its host id is known) and
// from the local system tables. A failure to update the system tables is
// logged and swallowed.
future<> storage_service::remove_endpoint(inet_address endpoint, gms::permit_id pid) {
    if (const auto host_id = _gossiper.try_get_host_id(endpoint)) {
        co_await _gossiper.remove_endpoint(*host_id, pid);
    }
    try {
        co_await _sys_ks.local().remove_endpoint(endpoint);
    } catch (...) {
        slogger.error("fail to remove endpoint={}: {}", endpoint, std::current_exception());
    }
}
// Gathers the information needed to replace an existing dead node.
//
// Resolves the replaced node's host id and IP (from replace_node_first_boot,
// or the deprecated replace_address[_first_boot] options) via a gossip
// shadow round, validates that the node is a replaceable cluster member,
// and resolves the optional ignore_dead_nodes_for_replace list into
// (host_id, endpoint state) pairs. The shadow-round gossip state is
// discarded before returning.
//
// Throws std::runtime_error when the replaced node cannot be found, has
// already left the ring, or an ignored node cannot be resolved.
future<storage_service::replacement_info>
storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<locator::host_id, sstring>& loaded_peer_features) {
    locator::host_id replace_host_id;
    gms::inet_address replace_address;

    auto& cfg = _db.local().get_config();
    if (!cfg.replace_node_first_boot().empty()) {
        replace_host_id = locator::host_id(utils::UUID(cfg.replace_node_first_boot()));
    } else if (!cfg.replace_address_first_boot().empty()) {
        replace_address = gms::inet_address(cfg.replace_address_first_boot());
        slogger.warn("The replace_address_first_boot={} option is deprecated. Please use the replace_node_first_boot option", replace_address);
    } else if (!cfg.replace_address().empty()) {
        replace_address = gms::inet_address(cfg.replace_address());
        slogger.warn("The replace_address={} option is deprecated. Please use the replace_node_first_boot option", replace_address);
    } else {
        on_internal_error(slogger, "No replace_node or replace_address configuration options found");
    }

    slogger.info("Gathering node replacement information for {}/{}", replace_host_id, replace_address);

    auto seeds = _gossiper.get_seeds();
    if (seeds.size() == 1 && seeds.contains(replace_address)) {
        throw std::runtime_error(::format("Cannot replace_address {} because no seed node is up", replace_address));
    }

    // make magic happen
    slogger.info("Performing gossip shadow round");
    co_await _gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::yes);

    // now that we've gossiped at least once, we should be able to find the node we're replacing
    if (replace_host_id) {
        auto node = _gossiper.get_node_ip(replace_host_id);
        if (!node) {
            throw std::runtime_error(::format("Replaced node with Host ID {} not found", replace_host_id));
        }
        replace_address = *node;
    } else {
        replace_host_id = _gossiper.get_host_id(replace_address);
    }

    auto state = _gossiper.get_endpoint_state_ptr(replace_host_id);
    if (!state) {
        throw std::runtime_error(::format("Cannot replace_address {} because it doesn't exist in gossip", replace_address));
    }

    // Reject to replace a node that has left the ring
    auto status = _gossiper.get_gossip_status(replace_host_id);
    if (status == gms::versioned_value::STATUS_LEFT || status == gms::versioned_value::REMOVED_TOKEN) {
        throw std::runtime_error(::format("Cannot replace_address {} because it has left the ring, status={}", replace_address, status));
    }

    // NOTE(review): the token set is intentionally left empty here -
    // presumably with raft topology the replaced node's tokens come from the
    // topology state rather than gossip; confirm against the replace path in
    // join_topology.
    std::unordered_set<dht::token> tokens;

    auto dc_rack = get_dc_rack_for(replace_host_id).value_or(locator::endpoint_dc_rack::default_location);

    auto ri = replacement_info {
        .tokens = std::move(tokens),
        .dc_rack = std::move(dc_rack),
        .host_id = std::move(replace_host_id),
        .address = replace_address,
    };

    // Resolve the ignore_dead_nodes_for_replace list: each entry may be an
    // IP address (deprecated) or a host id; we need both forms plus the
    // node's tokens and dc/rack from gossip.
    bool node_ip_specified = false;
    for (auto& hoep : parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace())) {
        locator::host_id host_id;
        gms::loaded_endpoint_state st;
        // Resolve both host_id and endpoint
        if (hoep.has_endpoint()) {
            st.endpoint = hoep.endpoint();
            node_ip_specified = true;
        } else {
            host_id = hoep.id();
            auto res = _gossiper.get_node_ip(host_id);
            if (!res) {
                throw std::runtime_error(::format("Could not find ignored node with host_id {}", host_id));
            }
            st.endpoint = *res;
        }
        auto host_id_opt = _gossiper.try_get_host_id(st.endpoint);
        if (!host_id_opt) {
            throw std::runtime_error(::format("Ignore node {}/{} has no endpoint state", host_id, st.endpoint));
        }
        if (!host_id) {
            host_id = *host_id_opt;
            if (!host_id) {
                throw std::runtime_error(::format("Could not find host_id for ignored node {}", st.endpoint));
            }
        }
        auto esp = _gossiper.get_endpoint_state_ptr(host_id);
        st.tokens = esp->get_tokens();
        st.opt_dc_rack = esp->get_dc_rack();
        ri.ignore_nodes.emplace(host_id, std::move(st));
    }
    if (node_ip_specified) {
        slogger.warn("Warning: Using IP addresses for '--ignore-dead-nodes-for-replace' is deprecated and will"
                " be disabled in the next release. Please use host IDs instead. Provided values: {}",
                _db.local().get_config().ignore_dead_nodes_for_replace());
    }

    slogger.info("Host {}/{} is replacing {}/{} ignore_nodes={}", get_token_metadata().get_my_id(), get_broadcast_address(), replace_host_id, replace_address,
            fmt::join(ri.ignore_nodes | std::views::transform ([] (const auto& x) {
                return fmt::format("{}/{}", x.first, x.second.endpoint);
            }), ","));
    co_await _gossiper.reset_endpoint_state_map();

    co_return ri;
}
// Returns, per endpoint, the raw fraction of the token ring it owns
// (no replication applied - see effective_ownership() for that).
//
// Fixes: the lambda previously read `_address_map` through the captured
// `this` instead of the storage_service instance passed in by
// run_with_no_api_lock (the form every sibling here uses, e.g.
// effective_ownership), and iterated the ownership map by value, copying
// each pair.
future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
    return run_with_no_api_lock([] (storage_service& ss) {
        const auto& tm = ss.get_token_metadata();
        auto token_map = dht::token::describe_ownership(tm.sorted_tokens());
        // describe_ownership returns tokens in an unspecified order, so
        // re-bucket the per-token fractions by owning endpoint.
        std::map<gms::inet_address, float> ownership;
        for (const auto& [token, token_ownership] : token_map) {
            locator::host_id id = tm.get_endpoint(token).value();
            // A node owning several tokens accumulates their fractions.
            ownership[ss._address_map.get(id)] += token_ownership;
        }
        return ownership;
    });
}
// Computes effective ownership (ring fraction owned, weighted by
// replication) per endpoint for the given keyspace, optionally narrowed to
// a single table. With an empty keyspace name, falls back to
// "system_traces" after checking that it is effectively the only
// non-system keyspace.
//
// Throws std::runtime_error for LocalStrategy keyspaces and when mixed
// non-system keyspaces would make the number meaningless.
future<std::map<gms::inet_address, float>> storage_service::effective_ownership(sstring keyspace_name, sstring table_name) {
    return run_with_no_api_lock([keyspace_name, table_name] (storage_service& ss) mutable -> future<std::map<gms::inet_address, float>> {
        locator::effective_replication_map_ptr erm;

        if (keyspace_name != "") {
            //find throws no such keyspace if it is missing
            const replica::keyspace& ks = ss._db.local().find_keyspace(keyspace_name);
            // This is ugly, but it follows origin
            auto&& rs = ks.get_replication_strategy();  // clang complains about typeid(ks.get_replication_strategy());
            if (rs.is_local()) {
                throw std::runtime_error("Ownership values for keyspaces with LocalStrategy are meaningless");
            }
            if (table_name.empty()) {
                erm = ks.get_static_effective_replication_map();
            } else {
                auto& cf = ss._db.local().find_column_family(keyspace_name, table_name);
                erm = cf.get_effective_replication_map();
            }
        } else {
            auto non_system_keyspaces = ss._db.local().get_non_system_keyspaces();
            //system_traces is a non-system keyspace however it needs to be counted as one for this process
            size_t special_table_count = 0;
            if (std::find(non_system_keyspaces.begin(), non_system_keyspaces.end(), "system_traces") !=
                    non_system_keyspaces.end()) {
                special_table_count += 1;
            }
            if (non_system_keyspaces.size() > special_table_count) {
                throw std::runtime_error("Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless");
            }
            keyspace_name = "system_traces";
            const auto& ks = ss._db.local().find_keyspace(keyspace_name);
            erm = ks.get_static_effective_replication_map();
        }

        // The following loops seems computationally heavy, but it's not as bad.
        // The upper two simply iterate over all the endpoints by iterating over all the
        // DC and all the instances in each DC.
        //
        // The call for get_range_for_endpoint is done once per endpoint
        const auto& tm = *erm->get_token_metadata_ptr();
        // For tablet keyspaces, ownership is computed over the table's tablet
        // token set rather than the vnode ring.
        // NOTE(review): the tablet branch looks the table up again, so an
        // empty table_name with a tablet keyspace would fail here - confirm
        // callers always pass a table name for tablet keyspaces.
        const auto tokens = co_await std::invoke([&]() -> future<utils::chunked_vector<token>> {
            if (!erm->get_replication_strategy().uses_tablets()) {
                return make_ready_future<utils::chunked_vector<token>>(tm.sorted_tokens());
            } else {
                auto& cf = ss._db.local().find_column_family(keyspace_name, table_name);
                const auto& tablets = tm.tablets().get_tablet_map(cf.schema()->id());
                return tablets.get_sorted_tokens();
            }
        });
        const auto token_ownership = dht::token::describe_ownership(tokens);
        const auto datacenter_endpoints = tm.get_topology().get_datacenter_host_ids();
        std::map<gms::inet_address, float> final_ownership;

        for (const auto& [dc, endpoints_map] : datacenter_endpoints) {
            for (auto endpoint : endpoints_map) {
                // calculate the ownership with replication and add the endpoint to the final ownership map
                try {
                    float ownership = 0.0f;
                    auto ranges = co_await ss.get_ranges_for_endpoint(*erm, endpoint);
                    for (auto& r : ranges) {
                        // get_ranges_for_endpoint will unwrap the first range.
                        // With t0 t1 t2 t3, the first range (t3,t0] will be split
                        // as (min,t0] and (t3,max]. Skipping the range (t3,max]
                        // we will get the correct ownership number as if the first
                        // range were not split.
                        if (!r.end()) {
                            continue;
                        }
                        auto end_token = r.end()->value();
                        auto loc = token_ownership.find(end_token);
                        if (loc != token_ownership.end()) {
                            ownership += loc->second;
                        }
                    }
                    final_ownership[ss._address_map.find(endpoint).value()] = ownership;
                } catch (replica::no_such_keyspace&) {
                    // In case ss.get_ranges_for_endpoint(keyspace_name, endpoint) is not found, just mark it as zero and continue
                    final_ownership[ss._address_map.find(endpoint).value()] = 0;
                }
            }
        }
        co_return final_ownership;
    });
}
// Transitions the node into operation mode `m`, enforcing the legal
// transitions around maintenance mode. Re-entering the current mode is
// tolerated but logged as a warning.
void storage_service::set_mode(mode m) {
    if (m == mode::MAINTENANCE && _operation_mode != mode::NONE) {
        // Prevent from calling `start_maintenance_mode` after `join_cluster`.
        on_fatal_internal_error(slogger, format("Node should enter maintenance mode only from mode::NONE (current mode: {})", _operation_mode));
    }
    if (m == mode::STARTING && _operation_mode == mode::MAINTENANCE) {
        // Prevent from calling `join_cluster` after `start_maintenance_mode`.
        on_fatal_internal_error(slogger, "Node in the maintenance mode cannot enter the starting mode");
    }
    if (m == _operation_mode) {
        // This shouldn't happen, but it's too much for an SCYLLA_ASSERT,
        // so -- just emit a warning in the hope that it will be
        // noticed, reported and fixed
        slogger.warn("re-entering {} mode", m);
        return;
    }
    slogger.info("entering {} mode", m);
    _operation_mode = m;
}
// Returns the Scylla release version string of this node.
sstring storage_service::get_release_version() {
    // Delegate to the build-time version module.
    auto release = version::release();
    return release;
}
// Returns the local schema version digest as a string.
sstring storage_service::get_schema_version() {
    auto digest = _db.local().get_version();
    return digest.to_sstring();
}
// Sentinel "version" key under which describe_schema_versions() groups
// hosts that did not answer the schema check RPC.
static constexpr auto UNREACHABLE = "UNREACHABLE";
// Queries every live member for its schema version and groups host
// addresses by the version string they reported. Hosts that fail to
// respond are grouped under the UNREACHABLE sentinel. Disagreements are
// logged at debug level; the map is returned to the caller regardless.
future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::describe_schema_versions() {
    auto live_hosts = _gossiper.get_live_members();
    std::unordered_map<sstring, std::vector<sstring>> results;
    netw::messaging_service& ms = _messaging.local();
    return map_reduce(std::move(live_hosts), [&ms, as = abort_source()] (auto host) mutable {
        // Map step: ask one host for its schema version; a failed RPC maps
        // to nullopt instead of propagating.
        auto f0 = ser::migration_manager_rpc_verbs::send_schema_check(&ms, host, as);
        return std::move(f0).then_wrapped([host] (auto f) {
            if (f.failed()) {
                f.ignore_ready_future();
                return std::pair<locator::host_id, std::optional<table_schema_version>>(host, std::nullopt);
            }
            return std::pair<locator::host_id, std::optional<table_schema_version>>(host, f.get());
        });
    }, std::move(results), [this] (auto results, auto host_and_version) {
        // Reduce step: bucket the host's address under its version string.
        auto version = host_and_version.second ? host_and_version.second->to_sstring() : UNREACHABLE;
        results.try_emplace(version).first->second.emplace_back(fmt::to_string(_address_map.get(host_and_version.first)));
        return results;
    }).then([this] (auto results) {
        // we're done: the results map is ready to return to the client. the rest is just debug logging:
        auto it_unreachable = results.find(UNREACHABLE);
        if (it_unreachable != results.end()) {
            slogger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", fmt::join(it_unreachable->second, ","));
        }
        auto my_version = get_schema_version();
        for (auto&& entry : results) {
            // check for version disagreement. log the hosts that don't agree.
            if (entry.first == UNREACHABLE || entry.first == my_version) {
                continue;
            }
            for (auto&& host : entry.second) {
                slogger.debug("{} disagrees ({})", host, entry.first);
            }
        }
        if (results.size() == 1) {
            slogger.debug("Schemas are in agreement.");
        }
        return results;
    });
};
// Reports the node's current operation mode (no API lock needed).
future<storage_service::mode> storage_service::get_operation_mode() {
    return run_with_no_api_lock([] (storage_service& self) {
        return make_ready_future<mode>(self._operation_mode);
    });
}
// Reports whether the gossiper is currently enabled on this node.
future<bool> storage_service::is_gossip_running() {
    return run_with_no_api_lock([] (storage_service& self) {
        return self._gossiper.is_enabled();
    });
}
// Operator-requested (REST API) restart of gossip after stop_gossiping().
// Restores the locally-saved tokens and CDC generation id into gossip
// state, forces a newer generation, and starts gossiping. If any step
// fails, gossip is stopped again on all shards as the undo action.
// NOTE(review): the original failure is swallowed - the caller observes
// success even when the undo path ran; confirm this is intentional.
future<> storage_service::start_gossiping() {
    return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) -> future<> {
        if (!ss._gossiper.is_enabled()) {
            slogger.warn("Starting gossip by operator request");
            co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::start);
            bool should_stop_gossiper = false; // undo action
            try {
                auto cdc_gen_ts = co_await ss._sys_ks.local().get_cdc_generation_id();
                if (!cdc_gen_ts) {
                    cdc_log.warn("CDC generation timestamp missing when starting gossip");
                }
                co_await set_gossip_tokens(ss._gossiper,
                        co_await ss._sys_ks.local().get_local_tokens(),
                        cdc_gen_ts);
                // Bump the generation so peers treat our state as fresh.
                ss._gossiper.force_newer_generation();
                co_await ss._gossiper.start_gossiping(gms::get_generation_number());
            } catch (...) {
                should_stop_gossiper = true;
            }
            if (should_stop_gossiper) {
                co_await ss._gossiper.container().invoke_on_all(&gms::gossiper::stop);
            }
        }
    });
}
// Operator-requested (REST API) stop of gossip on all shards.
// A no-op when the gossiper is already disabled.
future<> storage_service::stop_gossiping() {
    return run_with_api_lock(sstring("stop_gossiping"), [] (storage_service& ss) {
        if (!ss._gossiper.is_enabled()) {
            return make_ready_future<>();
        }
        slogger.warn("Stopping gossip by operator request");
        return ss._gossiper.container().invoke_on_all(&gms::gossiper::stop);
    });
}
static size_t count_normal_token_owners(const topology& topology) {
return std::count_if(topology.normal_nodes.begin(), topology.normal_nodes.end(), [] (const auto& node) {
return !node.second.ring.value().tokens.empty();
});
}
// Requests decommission of the local node through the raft topology
// coordinator and waits for the request to complete.
//
// The group0 write is retried on concurrent modification. The request is
// rejected if the node is not in the normal state, is the last node (or
// the last token-owning node) in the cluster, or fails node-removal
// validation. On success, LEFT status is published via gossip; on failure
// a std::runtime_error is thrown.
future<> storage_service::raft_decommission() {
    auto& raft_server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    utils::UUID request_id;

    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});

        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
        }

        const auto& rs = it->second;

        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
        }

        if (_topology_state_machine._topology.normal_nodes.size() == 1) {
            throw std::runtime_error("Cannot decommission last node in the cluster");
        }

        if (!rs.ring.value().tokens.empty() && count_normal_token_owners(_topology_state_machine._topology) == 1) {
            throw std::runtime_error("Cannot decommission the last token-owning node in the cluster");
        }

        auto validation_result = validate_removing_node(_db.local(), locator::host_id(raft_server.id().uuid()));
        if (std::holds_alternative<node_validation_failure>(validation_result)) {
            throw std::runtime_error(fmt::format("Decommission failed: node decommission rejected: {}",
                    std::get<node_validation_failure>(validation_result).reason));
        }

        rtlogger.info("request decommission for: {}", raft_server.id());

        // Record the leave request in the topology table and track it in the
        // topology-requests table, both keyed by the new group0 state id.
        topology_mutation_builder builder(guard.write_timestamp());
        builder.with_node(raft_server.id())
               .set("topology_request", topology_request::leave)
               .set("request_id", guard.new_group0_state_id());
        topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id(), _feature_service.topology_requests_type_column);
        rtbuilder.set("initiating_host", _group0->group0_server().id().uuid())
                 .set("done", false);
        rtbuilder.set("request_type", topology_request::leave);
        topology_change change{{builder.build(), rtbuilder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id()));
        request_id = guard.new_group0_state_id();
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            // Another group0 write won the race; re-read state and retry.
            rtlogger.info("decommission: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }

    rtlogger.info("decommission: waiting for completion (request ID: {})", request_id);
    auto error = co_await wait_for_topology_request_completion(request_id);

    if (error.empty()) {
        // Need to set it otherwise gossiper will try to send shutdown on exit
        rtlogger.info("decommission: successfully removed from topology (request ID: {}), updating gossip status", request_id);
        co_await _gossiper.add_local_application_state(std::pair(gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count())));
        rtlogger.info("Decommission succeeded. Request ID: {}", request_id);
    } else {
        auto err = fmt::format("Decommission failed. See earlier errors ({}). Request ID: {}", error, request_id);
        rtlogger.error("{}", err);
        throw std::runtime_error(err);
    }
}
// REST-facing decommission entry point, serialized with other node
// operations via the API lock. Runs the raft decommission and then winds
// the node down: stops transport, drains the batchlog, records the
// DECOMMISSIONED bootstrap state and switches the mode. The process is
// left running - the operator is expected to kill it.
future<> storage_service::decommission() {
    return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
        return seastar::async([&ss] {
            ss.check_ability_to_perform_topology_operation("decommission");

            if (ss._operation_mode != mode::NORMAL) {
                throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
            }

            ss.raft_decommission().get();

            ss.stop_transport().get();
            slogger.info("DECOMMISSIONING: stopped transport");

            ss.get_batchlog_manager().invoke_on_all([] (auto& bm) {
                return bm.drain();
            }).get();
            slogger.info("DECOMMISSIONING: stop batchlog_manager done");

            // StageManager.shutdownNow();
            ss._sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::DECOMMISSIONED).get();
            slogger.info("DECOMMISSIONING: set_bootstrap_state done");
            ss.set_mode(mode::DECOMMISSIONED);

            slogger.info("DECOMMISSIONING: done");
            // let op be responsible for killing the process
        });
    });
}
// Submits a raft-topology 'remove' request for the given node and waits for
// the topology coordinator to complete it. The target must be a known cluster
// member in the 'normal' state and must be dead (a live node should
// decommission itself instead). The nodes in ignore_nodes_params, plus the
// removed node itself, are added to the topology's ignored-nodes set so other
// topology operations do not wait on them.
future<> storage_service::raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
    auto id = raft::server_id{host_id.uuid()};
    utils::UUID request_id;
    // Each iteration re-validates the topology state under a fresh group0
    // guard; we retry whenever a concurrent group0 modification wins the race.
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto it = _topology_state_machine._topology.find(id);
        if (!it) {
            throw std::runtime_error(::format("removenode: host id {} is not found in the cluster", host_id));
        }
        auto& rs = it->second; // not usable after yield
        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("removenode: node {} is in '{}' state. Wait for it to be in 'normal' state", id, rs.state));
        }
        // Refuse to remove the last token owner: doing so would lose all user
        // data and part of the system data.
        if (!rs.ring.value().tokens.empty() && count_normal_token_owners(_topology_state_machine._topology) == 1) {
            throw std::runtime_error(::format(
                "removenode: node {} cannot be removed because it is the last token-owning "
                "node in the cluster. If this node is unrecoverable, the cluster has entered an incorrect "
                "and unrecoverable state. All user data and a part of the system data is lost.",
                id));
        }
        if (_gossiper.is_alive(host_id)) {
            // Fixed missing separator between the two concatenated literals
            // (previously rendered as "node {}the node being removed...").
            const std::string message = ::format(
                "removenode: Rejected removenode operation for node {}: "
                "the node being removed is alive, maybe you should use decommission instead?",
                id);
            rtlogger.warn("{}", message);
            throw std::runtime_error(message);
        }
        auto validation_result = validate_removing_node(_db.local(), host_id);
        if (std::holds_alternative<node_validation_failure>(validation_result)) {
            throw std::runtime_error(fmt::format("Removenode failed: node remove rejected: {}",
                std::get<node_validation_failure>(validation_result).reason));
        }
        auto ignored_ids = find_raft_nodes_from_hoeps(ignore_nodes_params);
        // insert node that should be removed to ignore list so that other topology operations
        // can ignore it
        ignored_ids.insert(id);
        rtlogger.info("request removenode for: {}, new ignored nodes: {}, existing ignore nodes: {}", id, ignored_ids, _topology_state_machine._topology.ignored_nodes);
        topology_mutation_builder builder(guard.write_timestamp());
        builder.add_ignored_nodes(ignored_ids).with_node(id)
               .set("topology_request", topology_request::remove)
               .set("request_id", guard.new_group0_state_id());
        topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id(), _feature_service.topology_requests_type_column);
        rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
                 .set("done", false);
        rtbuilder.set("request_type", topology_request::remove);
        topology_change change{{builder.build(), rtbuilder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
        request_id = guard.new_group0_state_id();
        // Reject a duplicate removal request for the same node.
        if (auto itr = _topology_state_machine._topology.requests.find(id);
                itr != _topology_state_machine._topology.requests.end() && itr->second == topology_request::remove) {
            throw std::runtime_error("Removenode failed. Concurrent request for removal already in progress");
        }
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("removenode: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }
    rtlogger.info("removenode: waiting for completion (request ID: {})", request_id);
    // Wait until request completes
    auto error = co_await wait_for_topology_request_completion(request_id);
    if (!error.empty()) {
        auto err = fmt::format("Removenode failed. See earlier errors ({}). Request ID: {}", error, request_id);
        rtlogger.error("{}", err);
        throw std::runtime_error(err);
    }
    rtlogger.info("Removenode succeeded. Request ID: {}", request_id);
}
// Adds the given hosts to the topology's ignored-nodes ("excluded") set via a
// group0 command, so subsequent topology operations do not wait for them.
// Only dead nodes may be excluded. Bounces to shard 0 (where group0 lives)
// and retries on concurrent group0 modification.
future<> storage_service::mark_excluded(const std::vector<locator::host_id>& hosts) {
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.mark_excluded(hosts);
        });
    }
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        std::unordered_set<raft::server_id> raft_hosts;
        for (auto host : hosts) {
            // Exclusion is meant for permanently unreachable nodes; refuse
            // to exclude a node the failure detector still sees as alive.
            if (_gossiper.is_alive(host)) {
                const std::string message = ::format("Cannot mark host {} as excluded because it's alive", host);
                rtlogger.warn("{}", message);
                throw std::runtime_error(message);
            }
            raft_hosts.insert(raft::server_id(host.uuid()));
        }
        topology_mutation_builder builder(guard.write_timestamp());
        builder.add_ignored_nodes(raft_hosts);
        topology_change change{{builder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("Mark as excluded: {}", hosts));
        rtlogger.info("Marking nodes as excluded: {}, previous set: {}", hosts, _topology_state_machine._topology.ignored_nodes);
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            // Someone else committed a group0 entry first; re-validate and retry.
            rtlogger.info("mark_excluded: concurrent operation is detected, retrying.");
            continue;
        }
        rtlogger.info("Nodes marked as excluded: {}", hosts);
        break;
    }
}
// External removenode entry point. Checks that topology operations are
// currently allowed, then delegates to the raft-based implementation.
// The blocking .get() calls run inside a seastar::async thread; note
// ignore_nodes_params is moved through both lambda captures in turn.
future<> storage_service::removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
    return run_with_no_api_lock([host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
        return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
            ss.check_ability_to_perform_topology_operation("removenode");
            ss.raft_removenode(host_id, std::move(ignore_nodes_params)).get();
        });
    });
}
// Entry point for the checkAndRepairCdcStreams operation. Must run on
// shard 0. Fails if the CDC generation service has not been initialized yet;
// otherwise verifies topology operations are allowed and delegates to the
// raft-based implementation.
future<> storage_service::check_and_repair_cdc_streams() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (_cdc_gens.local_is_initialized()) {
        check_ability_to_perform_topology_operation("checkAndRepairCdcStreams");
        return raft_check_and_repair_cdc_streams();
    }
    return make_exception_future<>(std::runtime_error("CDC generation service not initialized yet"));
}
// Handles a node_ops RPC from a coordinator node. Only the repair_updater
// command is still supported: it bumps the off-strategy compaction trigger
// for the listed tables on every shard. All other (legacy) node-ops commands
// are rejected with a negative response.
// Note: coordinator_host_id is part of the RPC signature but is unused here.
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req) {
    return seastar::async([this, coordinator, req = std::move(req)] () mutable {
        auto ops_uuid = req.ops_uuid;
        slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", req.cmd, ops_uuid);
        if (req.cmd == node_ops_cmd::repair_updater) {
            slogger.debug("repair[{}]: Got repair_updater request from {}", ops_uuid, coordinator);
            _db.invoke_on_all([coordinator, ops_uuid, tables = req.repair_tables] (replica::database &db) {
                for (const auto& table_id : tables) {
                    // Removed a redundant `catch (...) { throw; }` here - it
                    // was a no-op rethrow; other exceptions still propagate.
                    try {
                        auto& table = db.find_column_family(table_id);
                        table.update_off_strategy_trigger();
                        slogger.debug("repair[{}]: Updated off_strategy_trigger for table {}.{} by node {}",
                                ops_uuid, table.schema()->ks_name(), table.schema()->cf_name(), coordinator);
                    } catch (replica::no_such_column_family&) {
                        // The table could be dropped by user, ignore it.
                    }
                }
            }).get();
            return node_ops_cmd_response(true);
        }
        slogger.error("node_ops_cmd_handler cmd={}, ops_uuid={} this cmd is no longer supported", req.cmd, ops_uuid);
        return node_ops_cmd_response(false);
    });
}
// Reloads the schema via the migration manager on shard 0. The schema
// keyspace's memtables are flushed and its cache dropped first, so the
// reload observes the same on-disk state a freshly restarted node would,
// ruling out discrepancies that could stem from merging with memtable or
// cache readers.
future<> storage_service::reload_schema() {
    const auto& schema_ks_name = db::schema_tables::v3::NAME;
    co_await replica::database::flush_keyspace_on_all_shards(_db, schema_ks_name);
    co_await replica::database::drop_cache_for_keyspace_on_all_shards(_db, schema_ks_name);
    co_await _migration_manager.invoke_on(0, [] (auto& migration_mgr) {
        return migration_mgr.reload_schema();
    });
}
// Drains the node under the node-ops API lock. Idempotent: a second call on
// an already-drained node only logs a warning. Transitions the operation
// mode DRAINING -> DRAINED around do_drain().
future<> storage_service::drain() {
    return run_with_api_lock(sstring("drain"), [] (storage_service& ss) {
        if (ss._operation_mode == mode::DRAINED) {
            slogger.warn("Cannot drain node (did it already happen?)");
            return make_ready_future<>();
        }
        ss.set_mode(mode::DRAINING);
        return ss.do_drain().then([&ss] {
            // Resolve the drain-finished promise before switching to the
            // terminal mode.
            ss._drain_finished.set_value();
            ss.set_mode(mode::DRAINED);
        });
    });
}
// Performs the actual drain sequence. The shutdown ORDER below is
// load-bearing - each step's comment explains its dependency on the previous
// ones; do not reorder without revisiting those invariants.
future<> storage_service::do_drain() {
    // Need to stop transport before group0, otherwise RPCs may fail with raft_group_not_found.
    co_await stop_transport();
    // Drain view builder before group0, because the view builder uses group0 to coordinate view building.
    // Drain after transport is stopped, because view_builder::drain aborts view writes for user writes as well.
    co_await _view_builder.invoke_on_all(&db::view::view_builder::drain);
    co_await _view_building_worker.invoke_on_all(&db::view::view_building_worker::drain);
    // group0 persistence relies on local storage, so we need to stop group0 first.
    // This must be kept in sync with defer_verbose_shutdown for group0 in main.cc to
    // handle the case when initialization fails before reaching drain_on_shutdown for ss.
    _sl_controller.local().abort_group0_operations();
    co_await wait_for_group0_stop();
    if (_group0) {
        co_await _group0->abort_and_drain();
    }
    // With group0 gone, shut down the remaining local services and flush data.
    co_await tracing::tracing::tracing_instance().invoke_on_all(&tracing::tracing::shutdown);
    co_await get_batchlog_manager().invoke_on_all([] (auto& bm) {
        return bm.drain();
    });
    co_await _db.invoke_on_all(&replica::database::drain);
    co_await _sys_ks.invoke_on_all(&db::system_keyspace::shutdown);
    co_await _repair.invoke_on_all(&repair_service::shutdown);
}
// Requests a cluster-wide vnodes cleanup through group0 and waits until every
// normal node reports cleanup_status::clean. With the global-request-queue
// feature enabled the request is queued and tracked by a request id;
// otherwise it is rejected if a different global request is already pending.
future<> storage_service::do_clusterwide_vnodes_cleanup() {
    auto& raft_server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    utils::UUID request_id;
    // Retry loop: re-reads topology under a fresh guard on each attempt and
    // retries on concurrent group0 modification.
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto curr_req = _topology_state_machine._topology.global_request;
        if (!_feature_service.topology_global_request_queue && curr_req && *curr_req != global_topology_request::cleanup) {
            throw std::runtime_error{
                "topology coordinator: cluster-wide vnodes cleanup: a different topology request is already pending, try again later"};
        }
        // The local node must be a cluster member in the normal state.
        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
        }
        const auto& rs = it->second;
        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
        }
        rtlogger.info("cluster-wide vnodes cleanup requested");
        topology_mutation_builder builder(guard.write_timestamp());
        utils::chunked_vector<canonical_mutation> muts;
        if (_feature_service.topology_global_request_queue) {
            // Queue the request and create a tracking entry so its completion
            // can be awaited by id.
            request_id = guard.new_group0_state_id();
            builder.queue_global_topology_request_id(request_id);
            topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
            rtbuilder.set("done", false)
                     .set("start_time", db_clock::now())
                     .set("request_type", global_topology_request::cleanup);
            muts.push_back(rtbuilder.build());
        } else {
            // Legacy path: single (untracked) global request slot.
            builder.set_global_topology_request(global_topology_request::cleanup);
        }
        muts.push_back(builder.build());
        topology_change change{std::move(muts)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("vnodes cleanup: cluster-wide cleanup requested"));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("cluster-wide vnodes cleanup: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }
    if (request_id) {
        // Wait until request completes
        auto error = co_await wait_for_topology_request_completion(request_id);
        if (!error.empty()) {
            auto err = fmt::format("Cluster-wide vnodes cleanup failed. See earlier errors ({}). Request ID: {}", error, request_id);
            rtlogger.error("{}", err);
            throw std::runtime_error(err);
        }
    }
    // The wait above only waits until the command is processed by the topology coordinator which start
    // the vnodes cleanup process, but we still need to wait for it to complete here.
    co_await _topology_state_machine.event.when([this] {
        return std::all_of(_topology_state_machine._topology.normal_nodes.begin(), _topology_state_machine._topology.normal_nodes.end(), [] (auto& n) {
            return n.second.cleanup == cleanup_status::clean;
        });
    });
    rtlogger.info("cluster-wide vnodes cleanup done");
}
// Forcibly resets this node's cleanup status from 'needed' back to 'clean'
// via a group0 command. A no-op (with a log message) when the flag is not
// currently 'needed'. Retries on concurrent group0 modification.
future<> storage_service::reset_cleanup_needed() {
    auto& server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto me = _topology_state_machine._topology.find(server.id());
        if (!me || me->second.state != node_state::normal) {
            throw std::runtime_error(format("cannot mark the node as clean: local node {} is either not a member of the cluster or is not in a normal state", server.id()));
        }
        // Only the needed->clean transition is allowed by force.
        if (me->second.cleanup != cleanup_status::needed) {
            rtlogger.info("cannot reset cleanup flag when it is {}", me->second.cleanup);
            co_return;
        }
        topology_mutation_builder builder(guard.write_timestamp());
        builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
        topology_change change{{builder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup status reset by force for {}", server.id()));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
        } catch (group0_concurrent_modification&) {
            rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
            continue;
        }
        rtlogger.info("cleanup needed flag is reset by force");
        break;
    }
}
// Blocks until the topology request identified by `id` completes, returning
// the error string recorded for it (empty on success). `require_entry` is
// forwarded to the topology state machine's wait routine.
future<sstring> storage_service::wait_for_topology_request_completion(utils::UUID id, bool require_entry) {
    auto& sys_ks = _sys_ks.local();
    co_return co_await _topology_state_machine.wait_for_request_completion(sys_ks, id, require_entry);
}
// Aborts a pending topology request by id, executing on shard 0 where
// group0 lives.
// NOTE(review): the lambda runs on shard 0 but calls abort_request on the
// *calling* shard's _topology_state_machine (via the captured `this`), while
// taking group0, the abort source and features from shard 0's instance
// (`ss`) - confirm this cross-shard mixing is intentional.
future<> storage_service::abort_topology_request(utils::UUID request_id) {
    co_await container().invoke_on(0, [request_id, this] (storage_service& ss) {
        return _topology_state_machine.abort_request(*ss._group0, ss._group0_as, ss._feature_service, request_id);
    });
}
// Waits until the topology state machine is no longer busy. The group0 guard
// is released before each wait on the state-machine event (holding it would
// stall topology progress) and re-acquired afterwards to re-check the state.
future<> storage_service::wait_for_topology_not_busy() {
    auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
    while (_topology_state_machine._topology.is_busy()) {
        release_guard(std::move(guard));
        co_await _topology_state_machine.event.when();
        guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
    }
}
// Aborts a paused RF-change request identified by request_id. Bounces to
// shard 0 (group0 lives there), requires the RACK_LIST_RF cluster feature,
// and commits a group0 command that removes the request from the paused set
// and marks its tracking entry done ("Aborted by user request"). A no-op
// with a warning if the request is not currently paused.
future<> storage_service::abort_paused_rf_change(utils::UUID request_id) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.abort_paused_rf_change(request_id);
        });
    }
    if (!_feature_service.rack_list_rf) {
        throw std::runtime_error("The RACK_LIST_RF feature is not enabled on the cluster yet");
    }
    // Retry on concurrent group0 modification; each attempt re-checks that
    // the request is still in the paused set.
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        bool found = std::ranges::contains(_topology_state_machine._topology.paused_rf_change_requests, request_id);
        if (!found) {
            slogger.warn("RF change request with id '{}' is not paused, so it can't be aborted", request_id);
            co_return;
        }
        utils::chunked_vector<canonical_mutation> updates;
        updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
            .resume_rf_change_request(_topology_state_machine._topology.paused_rf_change_requests, request_id).build()));
        updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(request_id)
            .done("Aborted by user request")
            .build()));
        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
                format("aborting rf change request {}", request_id));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            slogger.info("aborting request {}: concurrent modification, retrying.", request_id);
            continue;
        }
        break;
    }
}
// Returns true when no global topology request is currently pending.
bool storage_service::topology_global_queue_empty() const {
    const auto& pending_request = _topology_state_machine._topology.global_request;
    return !pending_request.has_value();
}
// Accessor for the semaphore that limits concurrent do_sample_sstables runs.
semaphore& storage_service::get_do_sample_sstables_concurrency_limiter() {
    return _do_sample_sstables_concurrency_limiter;
}
// Sums the estimated sstable volume of table t over all hosts in the token
// metadata, via the estimate_sstable_volume RPC. With ignore_errors::yes a
// failed host contributes 0 instead of failing the whole estimate.
future<uint64_t> storage_service::estimate_total_sstable_volume(table_id t, ignore_errors errors) {
    co_return co_await seastar::map_reduce(
        _db.local().get_token_metadata().get_host_ids(),
        [&] (auto h) -> future<uint64_t> {
            try {
                co_return co_await ser::storage_service_rpc_verbs::send_estimate_sstable_volume(&_messaging.local(), h, t);
            }
            catch(...) {
                if (errors == ignore_errors::yes) {
                    // If the call failed we just return 0 for this one
                    slogger.info("call to estimate_total_sstable_volume failed for table {} and host {}, returning 0", t, h);
                    co_return 0;
                }
                throw;
            }
        },
        uint64_t(0),
        std::plus<uint64_t>()
    );
}
// Copies the sampled chunks into plain byte vectors and hands them to the
// dictionary-training callback, which lives on shard 0. Reports an internal
// error if no callback was plugged in via set_train_dict_callback().
future<std::vector<std::byte>> storage_service::train_dict(utils::chunked_vector<temporary_buffer<char>> sample) {
    std::vector<std::vector<std::byte>> chunks;
    chunks.reserve(sample.size());
    for (const auto& buf : sample) {
        auto byte_view = std::as_bytes(std::span(buf));
        chunks.emplace_back(byte_view.begin(), byte_view.end());
    }
    co_return co_await container().invoke_on(0, [chunks = std::move(chunks)] (auto& shard0_ss) {
        if (!shard0_ss._train_dict) {
            on_internal_error(slogger, "retrain_dict: _train_dict not plugged");
        }
        return shard0_ss._train_dict(std::move(chunks));
    });
}
// Publishes a new SSTable compression dictionary for the given table through
// group0, stored as a system-keyspace dictionary entry named
// "sstables/<table id>". Runs on shard 0 and retries the whole
// read-modify-commit on concurrent group0 modification.
future<> storage_service::publish_new_sstable_dict(table_id t_id, std::span<const std::byte> dict, service::raft_group0_client& group0_client) {
    co_await container().invoke_on(0, coroutine::lambda([t_id, dict, &group0_client] (storage_service& local_ss) -> future<> {
        auto group0_holder = local_ss._group0->hold_group0_gate();
        while (true) {
            try {
                auto name = fmt::format("sstables/{}", t_id);
                slogger.debug("publish_new_sstable_dict: trying to publish the dict as {}", name);
                auto batch = service::group0_batch(co_await group0_client.start_operation(local_ss._group0_as));
                auto write_ts = batch.write_timestamp();
                auto new_dict_ts = db_clock::now();
                // Copy the dictionary bytes into the mutation payload.
                auto data = bytes(reinterpret_cast<const bytes::value_type*>(dict.data()), dict.size());
                auto this_host_id = local_ss._db.local().get_token_metadata().get_topology().get_config().this_host_id;
                mutation publish_new_dict = co_await local_ss._sys_ks.local().get_insert_dict_mutation(name, std::move(data), this_host_id, new_dict_ts, write_ts);
                batch.add_mutation(std::move(publish_new_dict), "publish new SSTable compression dictionary");
                slogger.debug("publish_new_sstable_dict: committing");
                co_await std::move(batch).commit(group0_client, local_ss._group0_as, {});
                slogger.debug("publish_new_sstable_dict: finished");
                break;
            } catch (const service::group0_concurrent_modification&) {
                slogger.debug("group0_concurrent_modification in publish_new_sstable_dict, retrying");
            }
        }
    }));
}
// Plugs in the dictionary-training callback used by train_dict().
void storage_service::set_train_dict_callback(decltype(_train_dict) cb) {
    _train_dict = std::move(cb);
}
// Collects up to n_chunks chunks of chunk_size bytes each, sampled from the
// sstables of table t across the cluster. The sampling is batched into
// rounds of at most 16 MiB to bound per-round memory; if a round returns
// fewer chunks than requested, the data is exhausted and we stop early.
future<utils::chunked_vector<temporary_buffer<char>>> storage_service::do_sample_sstables(table_id t, uint64_t chunk_size, uint64_t n_chunks) {
    const uint64_t round_chunk_limit = 16 * 1024 * 1024 / chunk_size;
    utils::chunked_vector<temporary_buffer<char>> collected;
    collected.reserve(n_chunks);
    uint64_t collected_count = 0;
    while (collected_count < n_chunks) {
        const auto batch_size = std::min(round_chunk_limit, n_chunks - collected_count);
        auto batch = co_await do_sample_sstables_oneshot(t, chunk_size, batch_size);
        const bool exhausted = batch.size() < batch_size;
        std::move(batch.begin(), batch.end(), std::back_inserter(collected));
        if (exhausted) {
            break;
        }
        collected_count += batch_size;
    }
    co_return collected;
}
// Performs one round of cluster-wide sstable sampling: asks every host for
// its estimated sstable volume for table t, apportions the n_chunks budget
// across hosts proportionally to those estimates, then fetches the chunks
// from each host in parallel and concatenates the results.
future<utils::chunked_vector<temporary_buffer<char>>> storage_service::do_sample_sstables_oneshot(table_id t, uint64_t chunk_size, uint64_t n_chunks) {
    slogger.debug("do_sample_sstables(): called with table_id={} chunk_size={} n_chunks={}", t, chunk_size, n_chunks);
    auto& db = _db.local();
    auto& ms = _messaging.local();
    // Hosts with a zero estimate are omitted from the map entirely.
    std::unordered_map<locator::host_id, uint64_t> estimated_sizes;
    co_await coroutine::parallel_for_each(
        db.get_token_metadata().get_host_ids(),
        [&] (auto h) -> future<> {
            auto est = co_await ser::storage_service_rpc_verbs::send_estimate_sstable_volume(&ms, h, t);
            if (est) {
                estimated_sizes.emplace(h, est);
            }
        }
    );
    const auto total_size = std::ranges::fold_left(estimated_sizes | std::ranges::views::values, uint64_t(0), std::plus());
    slogger.debug("do_sample_sstables(): estimate_sstable_volume returned {}, total={}", estimated_sizes, total_size);
    std::unordered_map<locator::host_id, uint64_t> chunks_per_host;
    {
        // Distribute n_chunks across hosts proportionally to their estimated
        // sizes using cumulative rounding, so the per-host counts always sum
        // to exactly n_chunks (each host gets the difference between
        // consecutive rounded prefix shares).
        uint64_t partial_sum = 0;
        uint64_t covered_samples = 0;
        for (const auto& [k, v] : estimated_sizes) {
            partial_sum += v;
            uint64_t next_covered = static_cast<double>(partial_sum) / total_size * n_chunks;
            chunks_per_host.emplace(k, next_covered - covered_samples);
            covered_samples = next_covered;
        }
        // Just a sanity check
        auto covered = std::ranges::fold_left(chunks_per_host | std::ranges::views::values, uint64_t(0), std::plus());
        if (total_size > 0 && covered != n_chunks) {
            on_internal_error(slogger, "do_sample_sstables(): something went wrong with the sample distribution algorithm");
        }
    }
    slogger.debug("do_sample_sstables(): sending out send_sample_sstables with proportions {}", chunks_per_host);
    auto samples = co_await seastar::map_reduce(
        chunks_per_host,
        [&] (std::pair<locator::host_id, uint64_t> h_s) -> future<utils::chunked_vector<temporary_buffer<char>>> {
            const auto& [h, sz] = h_s;
            return ser::storage_service_rpc_verbs::send_sample_sstables(&ms, h, t, chunk_size, sz);
        },
        utils::chunked_vector<temporary_buffer<char>>(),
        [] (auto v, auto some_samples) {
            std::ranges::move(some_samples, std::back_inserter(v));
            return v;
        }
    );
    slogger.debug("do_sample_sstables(): returned {} chunks", samples.size());
    co_return samples;
}
// Submits a raft-topology 'rebuild' request for the local node and waits for
// it to complete. Skipped when the node owns no tokens; rejected when the
// node is not in the normal state or is the only node in the cluster.
// sdc_param optionally restricts streaming to a single source DC; with the
// force flag set, ":force" is appended to the rebuild option.
future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
    auto& raft_server = _group0->group0_server();
    auto holder = _group0->hold_group0_gate();
    utils::UUID request_id;
    // Retry loop: re-validates topology under a fresh guard on each attempt
    // and retries on concurrent group0 modification.
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        auto it = _topology_state_machine._topology.find(raft_server.id());
        if (!it) {
            throw std::runtime_error(::format("local node {} is not a member of the cluster", raft_server.id()));
        }
        const auto& rs = it->second;
        if (rs.state != node_state::normal) {
            throw std::runtime_error(::format("local node is not in the normal state (current state: {})", rs.state));
        }
        if (rs.ring.value().tokens.empty()) {
            rtlogger.warn("local node does not own any tokens, skipping redundant rebuild");
            co_return;
        }
        if (_topology_state_machine._topology.normal_nodes.size() == 1) {
            throw std::runtime_error("Cannot rebuild a single node");
        }
        rtlogger.info("request rebuild for: {} source_dc={}", raft_server.id(), sdc_param);
        topology_mutation_builder builder(guard.write_timestamp());
        // Encode the source-DC option; a trailing ":force" marks a forced rebuild.
        sstring source_dc = sdc_param.value_or("");
        if (sdc_param.force() && !source_dc.empty()) {
            source_dc += ":force";
        }
        builder.with_node(raft_server.id())
               .set("topology_request", topology_request::rebuild)
               .set("rebuild_option", source_dc)
               .set("request_id", guard.new_group0_state_id());
        topology_request_tracking_mutation_builder rtbuilder(guard.new_group0_state_id(), _feature_service.topology_requests_type_column);
        rtbuilder.set("initiating_host",_group0->group0_server().id().uuid())
                 .set("done", false);
        rtbuilder.set("request_type", topology_request::rebuild);
        topology_change change{{builder.build(), rtbuilder.build()}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc));
        request_id = guard.new_group0_state_id();
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
        } catch (group0_concurrent_modification&) {
            rtlogger.info("rebuild: concurrent operation is detected, retrying.");
            continue;
        }
        break;
    }
    // Wait until request completes
    auto err = co_await wait_for_topology_request_completion(request_id);
    if (!err.empty()) {
        throw std::runtime_error(::format("rebuild failed: {}", err));
    }
}
// Raft-based implementation of checkAndRepairCdcStreams: if the latest
// committed CDC generation is missing or not optimal for the current token
// metadata, requests a new generation through group0 and waits until a new
// one is committed (or the tracked request completes).
future<> storage_service::raft_check_and_repair_cdc_streams() {
    std::optional<cdc::generation_id_v2> last_committed_gen;
    utils::UUID request_id;
    while (true) {
        rtlogger.info("request check_and_repair_cdc_streams, refreshing topology");
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        // Determine the currently pending global request (if any) - either
        // the active one or the head of the global request queue.
        std::optional<global_topology_request> curr_req;
        if (_topology_state_machine._topology.global_request) {
            curr_req = *_topology_state_machine._topology.global_request;
            request_id = _topology_state_machine._topology.global_request_id.value();
        } else if (!_topology_state_machine._topology.global_requests_queue.empty()) {
            request_id = _topology_state_machine._topology.global_requests_queue[0];
            auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
            curr_req = std::get<global_topology_request>(req_entry.request_type);
        } else {
            request_id = utils::UUID{};
        }
        if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
            if (!_feature_service.topology_global_request_queue) {
                // Legacy single-slot mode: cannot queue behind a different request.
                throw std::runtime_error{
                    "check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
            } else {
                // Queue mode: the pending request is unrelated, so we will
                // enqueue our own below.
                request_id = utils::UUID{};
            }
        }
        if (_topology_state_machine._topology.committed_cdc_generations.empty()) {
            slogger.error("check_and_repair_cdc_streams: no committed CDC generations, requesting a new one.");
        } else {
            last_committed_gen = _topology_state_machine._topology.committed_cdc_generations.back();
            auto gen = co_await _sys_ks.local().read_cdc_generation(last_committed_gen->id);
            if (cdc::is_cdc_generation_optimal(gen, get_token_metadata())) {
                cdc_log.info("CDC generation {} does not need repair", last_committed_gen);
                co_return;
            }
            cdc_log.info("CDC generation {} needs repair, requesting a new one", last_committed_gen);
        }
        // With global request queue coalescing requests should not be needed, but test_cdc_generation_publishing assumes that multiple new_cdc_generation
        // commands will be coalesced here, so do that until the test is fixed.
        if (!request_id) {
            topology_mutation_builder builder(guard.write_timestamp());
            utils::chunked_vector<canonical_mutation> muts;
            if (_feature_service.topology_global_request_queue) {
                request_id = guard.new_group0_state_id();
                topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
                builder.queue_global_topology_request_id(request_id);
                rtbuilder.set("done", false)
                         .set("start_time", db_clock::now())
                         .set("request_type", global_topology_request::new_cdc_generation);
                muts.push_back(rtbuilder.build());
            } else {
                builder.set_global_topology_request(global_topology_request::new_cdc_generation);
            }
            muts.push_back(builder.build());
            topology_change change{std::move(muts)};
            group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
                    ::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
            try {
                co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            } catch (group0_concurrent_modification&) {
                rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
                continue;
            }
        }
        break;
    }
    if (request_id) {
        // Wait until request completes
        auto error = co_await wait_for_topology_request_completion(request_id);
        if (!error.empty()) {
            auto err = fmt::format("Check and repair cdc stream failed. See earlier errors ({}). Request ID: {}", error, request_id);
            rtlogger.error("{}", err);
            throw std::runtime_error(err);
        }
        auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
                ? std::nullopt
                : std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
        if (last_committed_gen == gen) {
            on_internal_error(rtlogger, "Wrong generation after completion of check and repair cdc stream");
        }
    } else {
        // Wait until we commit a new CDC generation.
        co_await _topology_state_machine.event.when([this, &last_committed_gen] {
            auto gen = _topology_state_machine._topology.committed_cdc_generations.empty()
                    ? std::nullopt
                    : std::optional(_topology_state_machine._topology.committed_cdc_generations.back());
            return last_committed_gen != gen;
        });
    }
}
// Entry point for the rebuild operation (under the node-ops API lock). Warns
// about tablets-enabled keyspaces (which do not need rebuild after an RF
// increase). In raft-topology mode delegates to raft_rebuild(); otherwise
// streams - or repairs, when repair-based node ops is enabled - the ranges
// owned by this node from other replicas, optionally from a single source DC.
future<> storage_service::rebuild(utils::optional_param source_dc) {
    return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) -> future<> {
        ss.check_ability_to_perform_topology_operation("rebuild");
        if (auto tablets_keyspaces = ss._db.local().get_tablets_keyspaces(); !tablets_keyspaces.empty()) {
            std::ranges::sort(tablets_keyspaces);
            slogger.warn("Rebuild is not supported for the following tablets-enabled keyspaces: {}: "
                    "Rebuild is not required for tablets-enabled keyspace after increasing replication factor. "
                    "However, recovering from local data loss on this node requires running repair on all nodes in the datacenter", tablets_keyspaces);
        }
        if (ss.raft_topology_change_enabled()) {
            co_await ss.raft_rebuild(source_dc);
        } else {
            slogger.info("rebuild from dc: {}", source_dc);
            auto tmptr = ss.get_token_metadata_ptr();
            auto ks_erms = ss._db.local().get_non_local_strategy_keyspaces_erms();
            if (ss.is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) {
                co_await ss._repair.local().rebuild_with_repair(std::move(ks_erms), tmptr, std::move(source_dc), null_topology_guard);
            } else {
                // Stream-based rebuild: pull our ranges from live replicas,
                // excluding unreachable nodes and (optionally) restricting
                // sources to a single datacenter.
                auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr, ss._abort_source,
                        tmptr->get_my_id(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard);
                streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
                if (source_dc) {
                    streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(*source_dc));
                }
                for (const auto& [keyspace_name, erm] : ks_erms) {
                    co_await streamer->add_ranges(keyspace_name, erm, co_await ss.get_ranges_for_endpoint(*erm, ss.my_host_id()), ss._gossiper, false);
                }
                try {
                    co_await streamer->stream_async();
                    slogger.info("Streaming for rebuild successful");
                } catch (...) {
                    auto ep = std::current_exception();
                    // This is used exclusively through JMX, so log the full trace but only throw a simple RTE
                    slogger.warn("Error while rebuilding node: {}", ep);
                    std::rethrow_exception(std::move(ep));
                }
            }
        }
    });
}
// Throws when topology-changing operations are not currently allowed: either
// the node is still starting (topology kind not yet determined) or it is in
// the middle of upgrading to raft topology. In the legacy and raft modes the
// operation may proceed.
void storage_service::check_ability_to_perform_topology_operation(std::string_view operation_name) const {
    switch (_topology_change_kind_enabled) {
    case topology_change_kind::legacy:
    case topology_change_kind::raft:
        return;
    case topology_change_kind::unknown:
        throw std::runtime_error(fmt::format("{} is not allowed at this time - the node is still starting", operation_name));
    case topology_change_kind::upgrading_to_raft:
        throw std::runtime_error(fmt::format("{} is not allowed at this time - the node is still in the process"
                " of upgrading to raft topology", operation_name));
    }
}
// FIXME: always reports zero. There is no general exception-handling layer
// counting unhandled exceptions yet (cf. StorageMetrics.exceptions.count()
// in origin); once one exists this should return its count.
int32_t storage_service::get_exception_count() {
    static constexpr int32_t no_unhandled_exceptions = 0;
    return no_unhandled_exceptions;
}
// Computes, for each token range the leaving endpoint is responsible for,
// which other nodes will need a copy of that range once the endpoint is
// gone. Returns a multimap from range to each new replica that must receive
// it (nodes in the post-departure replica set that are not already replicas).
future<std::unordered_multimap<dht::token_range, locator::host_id>>
storage_service::get_changed_ranges_for_leaving(const locator::vnode_effective_replication_map* erm, locator::host_id endpoint) {
    // First get all ranges the leaving endpoint is responsible for
    auto ranges = co_await get_ranges_for_endpoint(*erm, endpoint);
    slogger.debug("Node {} ranges [{}]", endpoint, ranges);
    std::unordered_map<dht::token_range, host_id_vector_replica_set> current_replica_endpoints;
    // Find (for each range) all nodes that store replicas for these ranges as well
    for (auto& r : ranges) {
        auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
        auto eps = erm->get_natural_replicas(end_token);
        current_replica_endpoints.emplace(r, std::move(eps));
        co_await coroutine::maybe_yield();
    }
    // Simulate the ring as it will look after all leaving nodes are gone.
    auto temp = co_await get_token_metadata_ptr()->clone_after_all_left();
    // endpoint might or might not be 'leaving'. If it was not leaving (that is, removenode
    // command was used), it is still present in temp and must be removed.
    if (temp.is_normal_token_owner(endpoint)) {
        temp.remove_endpoint(endpoint);
    }
    std::unordered_multimap<dht::token_range, locator::host_id> changed_ranges;
    // Go through the ranges and for each range check who will be
    // storing replicas for these ranges when the leaving endpoint
    // is gone. Whoever is present in newReplicaEndpoints list, but
    // not in the currentReplicaEndpoints list, will be needing the
    // range.
    const auto& rs = erm->get_replication_strategy();
    for (auto& r : ranges) {
        auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
        auto new_replica_endpoints = co_await rs.calculate_natural_endpoints(end_token, temp);
        auto rg = current_replica_endpoints.equal_range(r);
        for (auto it = rg.first; it != rg.second; it++) {
            const dht::token_range& range_ = it->first;
            host_id_vector_replica_set& current_eps = it->second;
            slogger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints);
            // Drop endpoints that already hold a replica; whatever remains
            // in new_replica_endpoints is a genuinely new replica.
            for (auto ep : it->second) {
                auto beg = new_replica_endpoints.begin();
                auto end = new_replica_endpoints.end();
                new_replica_endpoints.erase(std::remove(beg, end, ep), end);
            }
        }
        if (slogger.is_enabled(logging::log_level::debug)) {
            if (new_replica_endpoints.empty()) {
                slogger.debug("Range {} already in all replicas", r);
            } else {
                slogger.debug("Range {} will be responsibility of {}", r, new_replica_endpoints);
            }
        }
        for (auto& ep : new_replica_endpoints) {
            changed_ranges.emplace(r, ep);
        }
        // Replication strategy doesn't necessarily yield in calculate_natural_endpoints.
        // E.g. everywhere_replication_strategy
        co_await coroutine::maybe_yield();
    }
    co_await temp.clear_gently();
    co_return changed_ranges;
}
// Streams all data owned by this node to the remaining nodes as part of
// decommission. The batchlog is replayed first; then either the
// repair-based path is taken, or outgoing ranges are computed per
// keyspace and streamed out.
future<> storage_service::unbootstrap() {
    slogger.info("Started batchlog replay for decommission");
    co_await get_batchlog_manager().local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes);
    slogger.info("Finished batchlog replay for decommission");
    if (is_repair_based_node_ops_enabled(streaming::stream_reason::decommission)) {
        // Repair-based node operations: let repair move the data.
        co_await _repair.local().decommission_with_repair(get_token_metadata_ptr(), null_topology_guard);
    } else {
        // keyspace -> (range -> new replica) entries this node must stream out.
        std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream;
        auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
        for (const auto& [keyspace_name, erm] : ks_erms) {
            auto ranges_mm = co_await get_changed_ranges_for_leaving(erm->maybe_as_vnode_effective_replication_map(), my_host_id());
            if (slogger.is_enabled(logging::log_level::debug)) {
                std::vector<wrapping_interval<token>> ranges;
                for (auto& x : ranges_mm) {
                    ranges.push_back(x.first);
                }
                slogger.debug("Ranges needing transfer for keyspace={} are [{}]", keyspace_name, ranges);
            }
            ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm));
        }
        set_mode(mode::LEAVING);
        auto stream_success = stream_ranges(std::move(ranges_to_stream));
        // wait for the transfer runnables to signal the latch.
        slogger.debug("waiting for stream acks.");
        try {
            co_await std::move(stream_success);
        } catch (...) {
            slogger.warn("unbootstrap fails to stream : {}", std::current_exception());
            throw;
        }
        slogger.debug("stream acks all received.");
    }
}
// Populates the streamer with the ranges this node must pull in because
// of the removal of leaving_node: for every non-local-strategy keyspace,
// find the ranges this node newly becomes a replica of, pick a source
// for each, and register them as RX ranges.
future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, locator::host_id leaving_node) {
    auto my_address = my_host_id();
    auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
    for (const auto& [keyspace_name, ermp] : ks_erms) {
        auto* erm = ermp->maybe_as_vnode_effective_replication_map();
        std::unordered_multimap<dht::token_range, locator::host_id> changed_ranges = co_await get_changed_ranges_for_leaving(erm, leaving_node);
        // Keep only the ranges whose new replica is this node.
        dht::token_range_vector my_new_ranges;
        for (auto& x : changed_ranges) {
            if (x.second == my_address) {
                my_new_ranges.emplace_back(x.first);
            }
        }
        std::unordered_multimap<locator::host_id, dht::token_range> source_ranges = co_await get_new_source_ranges(erm, my_new_ranges);
        // Group the ranges by the endpoint they will be streamed from.
        std::unordered_map<locator::host_id, dht::token_range_vector> ranges_per_endpoint;
        for (auto& x : source_ranges) {
            ranges_per_endpoint[x.first].emplace_back(x.second);
        }
        streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint));
    }
}
// Streams in the data this node must take over from leaving_node
// (removenode path without repair-based ops).
//
// \param leaving_node the node being removed from the cluster.
// \param topo_guard   topology guard forwarded to the range streamer.
// \param as_ptr       caller-provided abort source; must be non-null.
//
// Fix: the null-check of as_ptr used to happen only after subscribing
// to _abort_source; validate the input before any setup work.
future<> storage_service::removenode_with_stream(locator::host_id leaving_node,
                                                 frozen_topology_guard topo_guard,
                                                 shared_ptr<abort_source> as_ptr) {
    return seastar::async([this, leaving_node, as_ptr, topo_guard] {
        if (!as_ptr) {
            throw std::runtime_error("removenode_with_stream: abort_source is nullptr");
        }
        auto tmptr = get_token_metadata_ptr();
        // Local abort source triggered when either the service-wide abort
        // source or the caller-provided one requests an abort.
        abort_source as;
        auto sub = _abort_source.subscribe([&as] () noexcept {
            if (!as.abort_requested()) {
                as.request_abort();
            }
        });
        auto as_ptr_sub = as_ptr->subscribe([&as] () noexcept {
            if (!as.abort_requested()) {
                as.request_abort();
            }
        });
        auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, tmptr->get_my_id(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard);
        removenode_add_ranges(streamer, leaving_node).get();
        try {
            streamer->stream_async().get();
        } catch (...) {
            slogger.warn("removenode_with_stream: stream failed: {}", std::current_exception());
            throw;
        }
    });
}
// Removes the given endpoint and its tokens from gossip and from the
// token metadata, then fires the released/left notifications.
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint_ip,
        locator::host_id endpoint_hid, gms::permit_id pid) {
    slogger.info("Removing tokens {} for {}", tokens, endpoint_ip);
    // FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
    co_await remove_endpoint(endpoint_ip, pid);
    {
        // Hold the token metadata lock only while mutating and
        // replicating the metadata; drop it before notifying.
        auto tmlock = co_await get_token_metadata_lock();
        auto tmptr = co_await get_mutable_token_metadata_ptr();
        tmptr->remove_endpoint(endpoint_hid);
        tmptr->remove_bootstrap_tokens(tokens);
        co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint_ip));
        co_await replicate_to_all_cores(std::move(tmptr));
    }
    co_await notify_released(endpoint_hid);
    co_await notify_left(endpoint_ip, endpoint_hid);
}
// Like excise() above, but first records the endpoint's gossip
// expiration time (ignored when expire_time is zero).
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint_ip,
        locator::host_id endpoint_hid, int64_t expire_time, gms::permit_id pid) {
    add_expire_time_if_found(endpoint_hid, expire_time);
    return excise(std::move(tokens), endpoint_ip, endpoint_hid, pid);
}
// Announces this node's departure from the ring: leaves the CDC
// generation machinery, resets the persisted bootstrap state, removes
// this node from the local token metadata, gossips a LEFT status and
// then waits ring_delay so the rest of the cluster can observe it.
future<> storage_service::leave_ring() {
    co_await _cdc_gens.local().leave_ring();
    // A later restart should not assume membership any more.
    co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP);
    co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
        auto endpoint = get_broadcast_address();
        const auto my_id = tmptr->get_my_id();
        tmptr->remove_endpoint(my_id);
        return update_topology_change_info(std::move(tmptr), ::format("leave_ring {}/{}", endpoint, my_id));
    });
    auto expire_time = _gossiper.compute_expire_time().time_since_epoch().count();
    co_await _gossiper.add_local_application_state(gms::application_state::STATUS,
            versioned_value::left(co_await _sys_ks.local().get_local_tokens(), expire_time));
    // Give other nodes time to learn about the LEFT state before we go away.
    auto delay = std::max(get_ring_delay(), gms::gossiper::INTERVAL);
    slogger.info("Announcing that I have left the ring for {}ms", delay.count());
    co_await sleep_abortable(delay, _abort_source);
}
// Streams out the given per-keyspace ranges to their new replicas as
// part of decommission ("Unbootstrap"). Each (range -> endpoint)
// multimap is inverted into per-endpoint range lists before being
// handed to the streamer. Failures are logged and rethrown.
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream_by_keyspace) {
    auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_token_metadata_ptr()->get_my_id(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission, null_topology_guard);
    for (auto& [keyspace, ranges_with_endpoints] : ranges_to_stream_by_keyspace) {
        if (ranges_with_endpoints.empty()) {
            continue;
        }
        // Invert (range -> endpoint) into (endpoint -> ranges).
        std::unordered_map<locator::host_id, dht::token_range_vector> ranges_per_endpoint;
        for (auto& [range, endpoint] : ranges_with_endpoints) {
            ranges_per_endpoint[endpoint].emplace_back(range);
            co_await coroutine::maybe_yield();
        }
        streamer.add_tx_ranges(keyspace, std::move(ranges_per_endpoint));
    }
    try {
        co_await streamer.stream_async();
        slogger.info("stream_ranges successful");
    } catch (...) {
        auto ep = std::current_exception();
        slogger.warn("stream_ranges failed: {}", ep);
        std::rethrow_exception(std::move(ep));
    }
}
// Records a gossip expiration time for the endpoint. A zero
// expire_time means "no expiration recorded" and is ignored.
void storage_service::add_expire_time_if_found(locator::host_id endpoint, int64_t expire_time) {
    if (expire_time == 0L) {
        return;
    }
    using clk = gms::gossiper::clk;
    _gossiper.add_expire_time_for_endpoint(endpoint, clk::time_point(clk::duration(expire_time)));
}
// True iff this node has joined group0 and its raft server is
// currently the leader.
bool storage_service::is_raft_leader() const noexcept {
    if (!_group0->joined_group0()) {
        return false;
    }
    return _group0->group0_server().is_leader();
}
// Stops every registered protocol server in order, logging progress.
// The first failure is logged and rethrown, aborting the remaining
// shutdowns.
future<> storage_service::shutdown_protocol_servers() {
    for (auto& srv : _protocol_servers) {
        slogger.info("Shutting down {} server", srv->name());
        try {
            co_await srv->stop_server();
        } catch (...) {
            slogger.error("Unexpected error shutting down {} server: {}",
                    srv->name(), std::current_exception());
            throw;
        }
        slogger.info("Shutting down {} server was successful", srv->name());
    }
}
// For each range this node is about to become a replica of, picks one
// alive source node to stream the data from. Sources are the range's
// current replicas, ordered by the topology's proximity ranking to this
// node; the first alive one is taken. Returns (source -> range) pairs.
//
// Throws if this node itself appears among a range's sources, which
// would mean it is being asked to stream a range it already replicates.
future<std::unordered_multimap<locator::host_id, dht::token_range>>
storage_service::get_new_source_ranges(const locator::vnode_effective_replication_map* erm, const dht::token_range_vector& ranges) const {
    auto my_address = my_host_id();
    std::unordered_map<dht::token_range, host_id_vector_replica_set> range_addresses = co_await erm->get_range_host_ids();
    std::unordered_multimap<locator::host_id, dht::token_range> source_ranges;
    // find alive sources for our new ranges
    auto tmptr = erm->get_token_metadata_ptr();
    for (auto r : ranges) {
        host_id_vector_replica_set sources;
        auto it = range_addresses.find(r);
        if (it != range_addresses.end()) {
            sources = it->second;
        }
        // Prefer closer replicas, as ranked by the topology.
        tmptr->get_topology().sort_by_proximity(my_address, sources);
        if (std::find(sources.begin(), sources.end(), my_address) != sources.end()) {
            auto err = ::format("get_new_source_ranges: sources={}, my_address={}", sources, my_address);
            slogger.warn("{}", err);
            throw std::runtime_error(err);
        }
        // Take only the first alive source for this range.
        for (auto& source : sources) {
            if (_gossiper.is_alive(source)) {
                source_ranges.emplace(source, r);
                break;
            }
        }
        co_await coroutine::maybe_yield();
    }
    co_return source_ranges;
}
// The "move" node operation is not supported; always fails with a
// runtime_error (delivered through the api lock like other ops).
//
// Fix: the error message read "not supported only more" - a typo for
// "any more".
future<> storage_service::move(token new_token) {
    return run_with_api_lock(sstring("move"), [] (storage_service& ss) mutable {
        return make_exception_future<>(std::runtime_error("Move operation is not supported any more"));
    });
}
future<utils::chunked_vector<storage_service::token_range_endpoints>>
storage_service::describe_ring(const sstring& keyspace, bool include_only_local_dc) const {
if (_db.local().find_keyspace(keyspace).uses_tablets()) {
throw std::runtime_error(fmt::format("The keyspace {} has tablet table. Query describe_ring with the table parameter!", keyspace));
}
co_return co_await locator::describe_ring(_db.local(), _gossiper, keyspace, include_only_local_dc);
}
// Describe-ring variant that understands tablet-based tables. A vnode
// table delegates to describe_ring(keyspace); a tablet table yields one
// token_range_endpoints entry per tablet, resolving each replica host's
// endpoint details once via a per-host cache.
future<utils::chunked_vector<dht::token_range_endpoints>>
storage_service::describe_ring_for_table(const sstring& keyspace_name, const sstring& table_name) const {
    slogger.debug("describe_ring for table {}.{}", keyspace_name, table_name);
    auto& t = _db.local().find_column_family(keyspace_name, table_name);
    if (!t.uses_tablets()) {
        auto ranges = co_await describe_ring(keyspace_name);
        co_return ranges;
    }
    table_id tid = t.schema()->id();
    auto erm = t.get_effective_replication_map();
    auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
    const auto& topology = erm->get_topology();
    utils::chunked_vector<dht::token_range_endpoints> ranges;
    ranges.reserve(tmap.tablet_count());
    // Cache of per-host endpoint info so each replica host is resolved once.
    std::unordered_map<locator::host_id, locator::describe_ring_endpoint_info> host_infos;
    co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
        auto range = tmap.get_token_range(id);
        auto& replicas = info.replicas;
        dht::token_range_endpoints tr;
        // Unbounded ends are left as empty strings.
        if (range.start()) {
            tr._start_token = range.start()->value().to_sstring();
        }
        if (range.end()) {
            tr._end_token = range.end()->value().to_sstring();
        }
        tr._endpoints.reserve(replicas.size());
        tr._rpc_endpoints.reserve(replicas.size());
        tr._endpoint_details.reserve(replicas.size());
        for (auto& r : replicas) {
            auto& endpoint = r.host;
            auto it = host_infos.find(endpoint);
            if (it == host_infos.end()) {
                it = host_infos.emplace(endpoint, get_describe_ring_endpoint_info(endpoint, topology, _gossiper)).first;
            }
            tr._rpc_endpoints.emplace_back(it->second.rpc_addr);
            tr._endpoints.emplace_back(fmt::to_string(it->second.details._host));
            tr._endpoint_details.emplace_back(it->second.details);
        }
        ranges.push_back(std::move(tr));
        return make_ready_future<>();
    });
    co_return ranges;
}
std::map<token, inet_address> storage_service::get_token_to_endpoint_map() {
const auto& tm = get_token_metadata();
std::map<token, inet_address> result;
for (const auto [t, id]: tm.get_token_to_endpoint()) {
result.insert({t, _address_map.get(id)});
}
for (const auto [t, id]: tm.get_bootstrap_tokens()) {
result.insert({t, _address_map.get(id)});
}
return result;
}
// Maps each tablet of the given table, keyed by the tablet's last
// token, to the IP address of its primary replica.
future<std::map<token, inet_address>> storage_service::get_tablet_to_endpoint_map(table_id table) {
    const auto& tm = get_token_metadata();
    const auto& tmap = tm.tablets().get_tablet_map(table);
    std::map<token, inet_address> result;
    // Walk the tablets in order; iteration ends when next_tablet()
    // returns an empty optional.
    for (std::optional<locator::tablet_id> tid = tmap.first_tablet(); tid; tid = tmap.next_tablet(*tid)) {
        result.emplace(tmap.get_last_token(*tid), _address_map.get(tmap.get_primary_replica(*tid, tm.get_topology()).host));
        co_await coroutine::maybe_yield();
    }
    co_return result;
}
// Returns the configured ring delay (ring_delay_ms) as a duration.
std::chrono::milliseconds storage_service::get_ring_delay() {
    const auto delay_ms = _db.local().get_config().ring_delay_ms();
    slogger.trace("Get RING_DELAY: {}ms", delay_ms);
    return std::chrono::milliseconds(delay_ms);
}
// Acquires the shared token_metadata lock. Must be called on shard 0,
// the only shard that mutates token metadata.
future<locator::token_metadata_lock> storage_service::get_token_metadata_lock() noexcept {
    SCYLLA_ASSERT(this_shard_id() == 0);
    return _shared_token_metadata.get_lock();
}
// Acquire the token_metadata lock and get a mutable_token_metadata_ptr.
// Pass that ptr to \c func, and when successfully done,
// replicate it to all cores.
//
// By default the merge_lock (that is unified with the token_metadata_lock)
// is acquired for mutating the token_metadata. Pass acquire_merge_lock::no
// when called from paths that already acquire the merge_lock, like
// db::schema_tables::do_merge_schema.
//
// Note: must be called on shard 0.
future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock acquire_merge_lock) noexcept {
    SCYLLA_ASSERT(this_shard_id() == 0);
    // Optionally serialize with schema merges via the unified merge lock.
    std::optional<token_metadata_lock> merge_guard;
    if (acquire_merge_lock) {
        merge_guard.emplace(co_await get_token_metadata_lock());
    }
    auto mutable_tm = co_await get_mutable_token_metadata_ptr();
    co_await func(mutable_tm);
    // Replicate only if func succeeded; its failure propagates without
    // publishing anything.
    co_await replicate_to_all_cores(std::move(mutable_tm));
}
// Recomputes the topology-change info in tmptr. The DC/rack of a host
// is looked up in the raft topology state machine; hosts unknown to the
// topology yield nullopt. Failures are logged (with the given reason)
// and rethrown. Must be called on shard 0.
future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    try {
        locator::dc_rack_fn get_dc_rack_by_host_id([this, &tm = *tmptr] (locator::host_id host_id) -> std::optional<locator::endpoint_dc_rack> {
            const auto server_id = raft::server_id(host_id.uuid());
            const auto* node = _topology_state_machine._topology.find(server_id);
            if (node) {
                return locator::endpoint_dc_rack {
                    .dc = node->second.datacenter,
                    .rack = node->second.rack,
                };
            }
            return std::nullopt;
        });
        co_await tmptr->update_topology_change_info(get_dc_rack_by_host_id);
    } catch (...) {
        auto ep = std::current_exception();
        slogger.error("Failed to update topology change info for {}: {}", reason, ep);
        std::rethrow_exception(std::move(ep));
    }
}
// Convenience overload: acquires a mutable token metadata pointer and
// delegates to the (tmptr, reason) overload.
future<> storage_service::update_topology_change_info(sstring reason, acquire_merge_lock acquire_merge_lock) {
    auto apply = [this, reason = std::move(reason)] (mutable_token_metadata_ptr tmptr) mutable {
        return update_topology_change_info(std::move(tmptr), std::move(reason));
    };
    return mutate_token_metadata(std::move(apply), acquire_merge_lock);
}
// Schema-change notification for a keyspace. It fires on every shard,
// but the token metadata update must run once, driven from shard 0;
// mutate_token_metadata (via update_topology_change_info) then refreshes
// the token metadata and effective_replication_map on all shards.
future<> storage_service::keyspace_changed(const sstring& ks_name) {
    if (this_shard_id() != 0) {
        return make_ready_future<>();
    }
    // Recompute pending ranges: the keyspace may have changed after
    // they were last calculated.
    return update_topology_change_info(::format("keyspace {}", ks_name), acquire_merge_lock::no);
}
// Loads tablet metadata into pending_token_metadata: incrementally when
// a change hint is provided, otherwise via a full re-read through the
// query processor. Also propagates the current tablet load-balancing
// enabled flag from the topology state machine. Shard 0 only.
future<locator::mutable_token_metadata_ptr> storage_service::prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint, mutable_token_metadata_ptr pending_token_metadata) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    if (hint) {
        // Apply only the hinted subset of tablet metadata changes.
        co_await replica::update_tablet_metadata(_db.local(), _qp, pending_token_metadata->tablets(), hint);
    } else {
        pending_token_metadata->set_tablets(co_await replica::read_tablet_metadata(_qp));
    }
    pending_token_metadata->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
    co_return pending_token_metadata;
}
// Wakes up all fibers waiting on the topology state machine's event.
void storage_service::wake_up_topology_state_machine() noexcept {
    _topology_state_machine.event.broadcast();
}
// Refreshes the local tablet metadata (incrementally when a hint is
// given), publishes it to all shards, and pokes the topology state
// machine so waiters observe the change.
future<> storage_service::update_tablet_metadata(const locator::tablet_metadata_change_hint& hint) {
    auto pending = co_await get_mutable_token_metadata_ptr();
    auto updated = co_await prepare_tablet_metadata(hint, std::move(pending));
    co_await replicate_to_all_cores(std::move(updated));
    wake_up_topology_state_machine();
}
// Drives the splitting of all compaction groups of `table` until the
// tablet map no longer needs a split, retrying unexpected failures with
// exponential backoff (5s..300s). Terminates early on shutdown/abort or
// when the table / tablet map disappears.
future<> storage_service::process_tablet_split_candidate(table_id table) noexcept {
    tasks::task_info tablet_split_task_info;
    // True only if every shard reports all storage groups of the table split.
    auto all_compaction_groups_split = [&] () mutable {
        return _db.map_reduce0([table_ = table] (replica::database& db) {
            auto all_split = db.find_column_family(table_).all_storage_groups_split();
            return make_ready_future<bool>(all_split);
        }, bool{true}, std::logical_and<bool>());
    };
    auto split_all_compaction_groups = [&] () -> future<> {
        return _db.invoke_on_all([table, tablet_split_task_info] (replica::database& db) -> future<> {
            return db.find_column_family(table).split_all_storage_groups(tablet_split_task_info);
        });
    };
    exponential_backoff_retry split_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
    while (!_async_gate.is_closed() && !_group0_as.abort_requested()) {
        bool sleep = false;
        try {
            // Ensures that latest changes to tablet metadata, in group0, are visible
            auto guard = co_await _group0->client().start_operation(_group0_as);
            auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
            if (!tmap.needs_split()) {
                release_guard(std::move(guard));
                break;
            }
            tablet_split_task_info.id = tasks::task_id{tmap.resize_task_info().tablet_task_id.uuid()};
            if (co_await all_compaction_groups_split()) {
                slogger.debug("All compaction groups of table {} are split ready.", table);
                release_guard(std::move(guard));
                break;
            } else {
                // Release the group0 guard before the potentially long split work.
                release_guard(std::move(guard));
                co_await split_all_compaction_groups();
            }
        } catch (const locator::no_such_tablet_map& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (const replica::no_such_column_family& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (const seastar::abort_requested_exception& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (raft::request_aborted& ex) {
            slogger.warn("Failed to complete splitting of table {} due to {}", table, ex);
            break;
        } catch (...) {
            // Unknown failure: retry with backoff rather than giving up.
            slogger.error("Failed to complete splitting of table {} due to {}, retrying after {} seconds",
                    table, std::current_exception(), split_retry.sleep_time());
            sleep = true;
        }
        if (sleep) {
            try {
                co_await split_retry.retry(_group0_as);
            } catch (...) {
                slogger.warn("Sleep in split monitor failed with {}", std::current_exception());
            }
        }
    }
}
// Registers a table as a tablet-split candidate and wakes the split
// monitor. Candidates are tracked on shard 0 only, where the monitor
// runs; registration errors are logged and swallowed.
void storage_service::register_tablet_split_candidate(table_id table) noexcept {
    if (this_shard_id() != 0) {
        return;
    }
    try {
        const auto& tablets = get_token_metadata().tablets();
        if (tablets.get_tablet_map(table).needs_split()) {
            _tablet_split_candidates.push_back(table);
            _tablet_split_monitor_event.signal();
        }
    } catch (...) {
        slogger.error("Unable to register table {} as candidate for tablet splitting, due to {}", table, std::current_exception());
    }
}
// Background fiber that consumes _tablet_split_candidates: processes
// each registered table, then waits on the monitor event until more
// candidates arrive or shutdown/abort is requested.
future<> storage_service::run_tablet_split_monitor() {
    auto can_proceed = [this] { return !_async_gate.is_closed() && !_group0_as.abort_requested(); };
    while (can_proceed()) {
        // Take ownership of the current batch; new candidates may be
        // registered while this batch is being processed.
        auto tablet_split_candidates = std::exchange(_tablet_split_candidates, {});
        for (auto candidate : tablet_split_candidates) {
            co_await process_tablet_split_candidate(candidate);
        }
        co_await utils::clear_gently(tablet_split_candidates);
        // Returns if there is more work to do, or shutdown was requested.
        co_await _tablet_split_monitor_event.when([&] {
            return _tablet_split_candidates.size() > 0 || !can_proceed();
        });
    }
}
// Launches the tablet split monitor fiber; it runs on shard 0 only.
void storage_service::start_tablet_split_monitor() {
    if (this_shard_id() != 0) {
        return;
    }
    slogger.info("Starting the tablet split monitor...");
    // Keep the fiber's future in a member so it stays alive.
    _tablet_split_monitor = run_tablet_split_monitor();
}
// Called after the snitch configuration changed: refreshes this node's
// location (DC/rack) in the token metadata and, if gossip is enabled,
// re-announces the snitch-derived application states. Shard 0 only.
future<> storage_service::snitch_reconfigured() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    auto& snitch = _snitch.local();
    co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> {
        // re-read local rack and DC info
        tmptr->update_topology(tmptr->get_my_id(), snitch->get_location());
        return make_ready_future<>();
    });
    if (_gossiper.is_enabled()) {
        co_await _gossiper.add_local_application_state(snitch->get_app_states());
    }
}
future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
raft_topology_cmd_result result;
rtlogger.info("topology cmd rpc {} is called index={}", cmd.cmd, cmd_index);
try {
auto& raft_server = _group0->group0_server();
auto group0_holder = _group0->hold_group0_gate();
// do barrier to make sure we always see the latest topology
co_await raft_server.read_barrier(&_group0_as);
if (raft_server.get_current_term() != term) {
// Return an error since the command is from outdated leader
co_return result;
}
auto id = raft_server.id();
group0_holder.release();
{
auto& state = _raft_topology_cmd_handler_state;
if (state.term != term) {
state.term = term;
} else if (cmd_index <= state.last_index) {
// Return an error since the command is outdated
co_return result;
}
state.last_index = cmd_index;
}
// We capture the topology version right after the checks
// above, before any yields. This is crucial since _topology_state_machine._topology
// might be altered concurrently while this method is running,
// which can cause the fence command to apply an invalid fence version.
const auto version = _topology_state_machine._topology.version;
switch (cmd.cmd) {
case raft_topology_cmd::command::barrier: {
utils::get_local_injector().inject("raft_topology_barrier_fail",
[] { throw std::runtime_error("raft topology barrier failed due to error injection"); });
// This barrier might have been issued by the topology coordinator
// as a step in enabling a feature, i.e. it noticed that all
// nodes support some feature, then issue the barrier to make
// sure that all nodes observed this fact in their local state
// (a node cannot revoke support for a feature after that), and
// after receiving a confirmation from all nodes it will mark
// the feature as enabled.
//
// However, it might happen that the node handles this request
// early in the boot process, before it did the second feature
// check that happens when the node updates its metadata
// in `system.topology`. The node might have committed a command
// that advertises support for a feature as the last node
// to do so, crashed and now it doesn't support it. This should
// be rare, but it can happen and we can detect it right here.
std::exception_ptr ex;
try {
const auto& enabled_features = _topology_state_machine._topology.enabled_features;
const auto unsafe_to_disable_features = _topology_state_machine._topology.calculate_not_yet_enabled_features();
_feature_service.check_features(enabled_features, unsafe_to_disable_features);
} catch (const gms::unsupported_feature_exception&) {
ex = std::current_exception();
}
if (ex) {
rtlogger.error("feature check during barrier failed: {}", ex);
co_await drain();
break;
}
// we already did read barrier above
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case raft_topology_cmd::command::barrier_and_drain: {
utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail_before", [] {
throw std::runtime_error("raft_topology_barrier_and_drain_fail_before injected exception");
});
co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5)));
if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) {
for (auto& n : _topology_state_machine._topology.transition_nodes) {
if (!_address_map.find(locator::host_id{n.first.uuid()})) {
rtlogger.error("The topology transition is in a double write state but the IP of the node in transition is not known");
break;
}
}
}
co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
const auto current_version = ss._shared_token_metadata.get()->get_version();
rtlogger.info("Got raft_topology_cmd::barrier_and_drain, version {}, "
"current version {}, stale versions (version: use_count): {}",
version, current_version, ss._shared_token_metadata.describe_stale_versions());
// This shouldn't happen under normal operation, it's only plausible
// if the topology change coordinator has
// moved to another node and managed to update the topology
// parallel to this method. The previous coordinator
// should be inactive now, so it won't observe this
// exception. By returning exception we aim
// to reveal any other conditions where this may arise.
if (current_version != version) {
co_await coroutine::return_exception(std::runtime_error(
::format("raft topology: command::barrier_and_drain, the version has changed, "
"version {}, current_version {}, the topology change coordinator "
" had probably migrated to another node",
version, current_version)));
}
co_await ss._shared_token_metadata.stale_versions_in_use();
co_await get_topology_session_manager().drain_closing_sessions();
rtlogger.info("raft_topology_cmd::barrier_and_drain done");
});
co_await utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail", [this] (auto& handler) -> future<> {
auto ks = handler.get("keyspace");
auto cf = handler.get("table");
auto last_token = dht::token::from_int64(std::atoll(handler.get("last_token")->data()));
auto table_id = _db.local().find_column_family(*ks, *cf).schema()->id();
auto stage = co_await replica::read_tablet_transition_stage(_qp, table_id, last_token);
if (stage) {
sstring want_stage(handler.get("stage").value());
if (*stage == locator::tablet_transition_stage_from_string(want_stage)) {
rtlogger.info("raft_topology_cmd: barrier handler waits");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
rtlogger.info("raft_topology_cmd: barrier handler continues");
}
}
});
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case raft_topology_cmd::command::stream_ranges: {
co_await with_scheduling_group(_stream_manager.local().get_scheduling_group(), coroutine::lambda([&] () -> future<> {
const auto* server_rs = _topology_state_machine._topology.find(id);
if (!server_rs) {
on_internal_error(rtlogger, format("Got {} request for node {} not found in topology", cmd.cmd, id));
}
const auto& rs = server_rs->second;
auto tstate = _topology_state_machine._topology.tstate;
auto session = _topology_state_machine._topology.session;
if (!rs.ring || rs.ring->tokens.empty()) {
rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state);
co_return;
}
if (tstate != topology::transition_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding) {
rtlogger.warn("got {} request while the topology transition state is {} and node state is {}", cmd.cmd, tstate, rs.state);
co_return;
}
utils::get_local_injector().inject("stream_ranges_fail",
[] { throw std::runtime_error("stream_range failed due to error injection"); });
utils::get_local_injector().inject("stop_before_streaming",
[] { std::raise(SIGSTOP); });
switch(rs.state) {
case node_state::bootstrapping:
case node_state::replacing: {
set_mode(mode::BOOTSTRAP);
// See issue #4001
co_await _view_builder.local().mark_existing_views_as_built();
co_await _db.invoke_on_all([] (replica::database& db) {
for (auto& cf : db.get_non_system_column_families()) {
cf->notify_bootstrap_or_replace_start();
}
});
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
if (rs.state == node_state::bootstrapping) {
if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
parent_info.id, streaming::stream_reason::bootstrap, _bootstrap_result, [this, &rs, session] (this auto) -> future<> {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
co_await utils::get_local_injector().inject("delay_bootstrap_120s", std::chrono::seconds(120));
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens, session);
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, session);
}
});
co_await task->done();
}
// Bootstrap did not complete yet, but streaming did
utils::get_local_injector().inject("stop_after_streaming",
[] { std::raise(SIGSTOP); });
} else {
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
parent_info.id, streaming::stream_reason::replace, _bootstrap_result, [this, &rs, &id, replaced_id, session] (this auto) -> future<> {
if (!_topology_state_machine._topology.req_param.contains(id)) {
on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id));
}
if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
auto ignored_nodes = _topology_state_machine._topology.ignored_nodes | std::views::transform([] (const auto& id) {
return locator::host_id(id.uuid());
}) | std::ranges::to<std::unordered_set<locator::host_id>>();
auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
auto tmptr = get_token_metadata_ptr();
auto replaced_node = locator::host_id(replaced_id.uuid());
co_await _repair.local().replace_with_repair(std::move(ks_erms), std::move(tmptr), rs.ring.value().tokens, std::move(ignored_nodes), replaced_node, session);
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, session, locator::host_id{replaced_id.uuid()});
}
});
co_await task->done();
}
co_await _db.invoke_on_all([] (replica::database& db) {
for (auto& cf : db.get_non_system_column_families()) {
cf->notify_bootstrap_or_replace_end();
}
});
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case node_state::decommissioning: {
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
parent_info.id, streaming::stream_reason::decommission, _decommission_result, [this] (this auto) -> future<> {
co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run", utils::wait_for_message(60s));
co_await unbootstrap();
});
co_await task->done();
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case node_state::normal: {
// If asked to stream a node in normal state it means that remove operation is running
// Find the node that is been removed
auto it = std::ranges::find_if(_topology_state_machine._topology.transition_nodes, [] (auto& e) { return e.second.state == node_state::removing; });
if (it == _topology_state_machine._topology.transition_nodes.end()) {
rtlogger.warn("got stream_ranges request while my state is normal but cannot find a node that is been removed");
break;
}
auto id = it->first;
rtlogger.debug("streaming to remove node {}", id);
tasks::task_info parent_info{tasks::task_id{it->second.request_id}, 0};
auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
parent_info.id, streaming::stream_reason::removenode, _remove_result[id], [this, id = locator::host_id{id.uuid()}, session] (this auto) {
auto as = make_shared<abort_source>();
auto sub = _abort_source.subscribe([as] () noexcept {
if (!as->abort_requested()) {
as->request_abort();
}
});
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
std::list<locator::host_id> ignored_ips = _topology_state_machine._topology.ignored_nodes | std::views::transform([] (const auto& id) {
return locator::host_id(id.uuid());
}) | std::ranges::to<std::list<locator::host_id>>();
auto ops = seastar::make_shared<node_ops_info>(node_ops_id::create_random_id(), as, std::move(ignored_ips));
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), id, ops, session);
} else {
return removenode_with_stream(id, session, as);
}
});
co_await task->done();
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case node_state::rebuilding: {
auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[id]).source_dc;
rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
auto task = co_await get_node_ops_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
parent_info.id, streaming::stream_reason::rebuild, _rebuild_result, [this, &source_dc, session] (this auto) -> future<> {
auto tmptr = get_token_metadata_ptr();
auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) {
utils::optional_param sdc_param;
bool force;
if ((force = source_dc.ends_with(":force"))) {
source_dc.resize(source_dc.size() - 6);
}
if (!source_dc.empty()) {
sdc_param.emplace(source_dc).set_user_provided().set_force(force);
}
co_await _repair.local().rebuild_with_repair(std::move(ks_erms), tmptr, std::move(sdc_param), session);
} else {
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, _abort_source,
tmptr->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, session);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(_gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
}
for (const auto& [keyspace_name, erm] : ks_erms) {
auto ranges = co_await get_ranges_for_endpoint(*erm, my_host_id());
co_await streamer->add_ranges(keyspace_name, erm, std::move(ranges), _gossiper, false);
}
try {
co_await streamer->stream_async();
rtlogger.info("streaming for rebuild successful");
} catch (...) {
auto ep = std::current_exception();
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
rtlogger.warn("error while rebuilding node: {}", ep);
std::rethrow_exception(std::move(ep));
}
}
});
co_await task->done();
_rebuild_result.reset();
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case node_state::left:
case node_state::none:
case node_state::removing:
on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
id, rs.state));
break;
}
}));
}
break;
case raft_topology_cmd::command::wait_for_ip: {
std::vector<raft::server_id> ids;
{
const auto& new_nodes = _topology_state_machine._topology.new_nodes;
ids.reserve(new_nodes.size());
for (const auto& [id, rs]: new_nodes) {
ids.push_back(id);
}
}
rtlogger.debug("Got raft_topology_cmd::wait_for_ip, new nodes [{}]", ids);
for (const auto& id: ids) {
co_await wait_for_gossiper(id, _gossiper, _abort_source);
}
rtlogger.debug("raft_topology_cmd::wait_for_ip done [{}]", ids);
result.status = raft_topology_cmd_result::command_status::success;
break;
}
}
} catch (const raft::request_aborted& e) {
rtlogger.warn("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
} catch (...) {
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
}
rtlogger.info("topology cmd rpc {} completed with status={} index={}",
cmd.cmd, (result.status == raft_topology_cmd_result::command_status::success) ? "succeeded" : "failed", cmd_index);
co_return result;
}
// Publishes the new fence version to the storage proxy on every shard, so
// that writes/reads fenced out by the topology coordinator are rejected.
future<> storage_service::update_fence_version(token_metadata::version_t new_version) {
    return container().invoke_on_all([new_version] (storage_service& shard_ss) {
        shard_ss._qp.proxy().update_fence_version(new_version);
    });
}
// Performs a replica-side operation for a given tablet.
// What operation is performed is determined by "op" based on the
// current state of tablet metadata. The coordinator is supposed to prepare tablet
// metadata according to its intent and trigger the operation,
// without passing any transient information.
//
// If the operation succeeds, and the coordinator is still valid, it means
// that the operation intended by the coordinator was performed.
// If the coordinator is no longer valid, the operation may succeed but
// the actual operation performed may be different than intended, it may
// be the one intended by the new coordinator. This is not a problem
// because the old coordinator should do nothing with such result.
//
// The triggers may be retried. They may also be reordered with older triggers, from
// the same or a different coordinator. There is a protocol which ensures that
// stale triggers won't cause operations to run beyond the migration stage they were
// intended for. For example, that streaming is not still running after the coordinator
// moved past the "streaming" stage, and that it won't be started when the stage is not appropriate.
// A non-stale trigger is the one which completed successfully and caused the valid coordinator
// to advance tablet migration to the next stage. Other triggers are called stale.
// We can divide stale triggers into categories:
// (1) Those which start after the tablet was moved to the next stage
// Those which start before the tablet was moved to the next stage,
// (2) ...but after the non-stale trigger finished
// (3) ...but before the non-stale trigger finished
//
// By "start" I mean the atomic block which inserts into _tablet_ops, and by "finish" I mean
// removal from _tablet_ops.
// So event ordering is local from the perspective of this replica, and is linear because
// this happens on the same shard.
//
// What prevents (1) from running is the fact that triggers check the state of tablet
// metadata, and will fail immediately if the stage is not appropriate. It can happen
// that the trigger is so stale that it will match with an appropriate stage of the next
// migration of the same tablet. This is not a problem because we fall into the same
// category as a stale trigger which was started in the new migration, so cases (2) or (3) apply.
//
// What prevents (2) from running is the fact that after the coordinator moves on to
// the next stage, it executes a token metadata barrier, which will wait for such triggers
// to complete as they hold on to erm via tablet_metadata_barrier. They should be aborted
// soon after the coordinator changes the stage by the means of tablet_metadata_barrier::get_abort_source().
//
// What prevents (3) from running is that they will join with the non-stale trigger, or non-stale
// trigger will join with them, depending on which came first. In that case they finish at the same time.
//
// It's very important that the global token metadata barrier involves all nodes which
// may receive stale triggers started in the previous stage, so that those nodes will
// see tablet metadata which reflects group0 state. This will cut-off stale triggers
// as soon as the coordinator moves to the next stage.
// Runs a replica-side tablet operation ("Streaming", "Cleanup", "Repair", ...),
// deduplicating concurrent triggers for the same tablet: a second trigger that
// arrives while the operation is in flight joins the existing one instead of
// starting a new instance (see the protocol description above this function).
//
// @param tablet   the tablet the operation applies to
// @param op_name  human-readable operation name, used for logging and the registry entry
// @param op       the actual operation; receives a tablet_metadata_guard whose
//                 abort source fires when the migration stage moves on
// @return the operation's result; rethrows the operation's failure
future<tablet_operation_result> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
        sstring op_name,
        std::function<future<tablet_operation_result>(locator::tablet_metadata_guard&)> op) {
    // The coordinator may not execute global token metadata barrier before triggering the operation, so we need
    // a barrier here to see the token metadata which is at least as recent as that of the sender.
    auto& raft_server = _group0->group0_server();
    co_await raft_server.read_barrier(&_group0_as);

    // Join an already-running instance of the operation for this tablet, if any
    // (case (3) of the stale-trigger analysis above: concurrent triggers finish together).
    if (_tablet_ops.contains(tablet)) {
        rtlogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet);
        auto result = co_await _tablet_ops[tablet].done.get_future();
        co_return result;
    }

    // The guard pins the effective replication map (so token metadata barriers wait
    // for us) and exposes an abort source fired when the tablet's stage changes.
    locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet);
    auto& as = guard.get_abort_source();
    // Chain node-level shutdown/abort into the per-operation abort source.
    auto sub = _group0_as.subscribe([&as] () noexcept {
        as.request_abort();
    });
    auto async_gate_holder = _async_gate.hold();
    promise<tablet_operation_result> p;
    // Register the in-flight operation so later triggers can join it; the defer
    // below removes the entry no matter how the operation ends.
    _tablet_ops.emplace(tablet, tablet_operation {
        op_name, seastar::shared_future<tablet_operation_result>(p.get_future())
    });
    auto erase_registry_entry = seastar::defer([&] {
        _tablet_ops.erase(tablet);
    });

    try {
        auto result = co_await op(guard);
        // Resolve joiners first, then return the same result to our caller.
        p.set_value(result);
        rtlogger.debug("{} for tablet migration of {} successful", op_name, tablet);
        co_return result;
    } catch (...) {
        // Propagate the failure to joiners as well as to our caller.
        p.set_exception(std::current_exception());
        rtlogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception());
        throw;
    }
}
// Executes a repair of the given tablet on this replica, as triggered by the
// topology coordinator. Validates that the tablet is still in a repair-related
// transition stage before doing any work; a mismatch means the coordinator
// moved on and this trigger is stale.
//
// @param tablet      tablet to repair
// @param session_id  session to use; if unset, falls back to the transition's session
// @return repair completion info wrapped in tablet_operation_repair_result
future<service::tablet_operation_repair_result> storage_service::repair_tablet(locator::global_tablet_id tablet, service::session_id session_id) {
    auto result = co_await do_tablet_operation(tablet, "Repair", [this, tablet, session_id] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
        slogger.debug("Executing repair for tablet={}", tablet);
        auto& tmap = guard.get_tablet_map();
        auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
        // Check if the request is still valid.
        // If there is mismatch, it means this repair was canceled and the coordinator moved on.
        if (!trinfo) {
            throw std::runtime_error(fmt::format("No transition info for tablet {}", tablet));
        }
        if (trinfo->stage != locator::tablet_transition_stage::repair && trinfo->stage != locator::tablet_transition_stage::rebuild_repair) {
            throw std::runtime_error(fmt::format("Tablet {} stage is not at repair", tablet));
        }
        // Prefer the explicitly passed session; otherwise use the transition's session.
        auto session = session_id ? session_id : trinfo->session_id;
        slogger.debug("repair_tablet: tablet={} session_id={}", tablet, session);
        tasks::task_info global_tablet_repair_task_info;
        std::optional<locator::tablet_replica_set> replicas = std::nullopt;
        if (trinfo->stage == locator::tablet_transition_stage::repair) {
            // User/scheduler requested repair: link it to the tablet's repair task id.
            global_tablet_repair_task_info = {tasks::task_id{tmap.get_tablet_info(tablet.tablet).repair_task_info.tablet_task_id.uuid()}, 0};
        } else {
            // rebuild_repair stage: repair only against the replicas a rebuild would
            // read from, derived from the migration streaming info.
            auto migration_streaming_info = get_migration_streaming_info(get_token_metadata_ptr()->get_topology(), tmap.get_tablet_info(tablet.tablet), *trinfo);
            replicas = locator::tablet_replica_set{migration_streaming_info.read_from.begin(), migration_streaming_info.read_from.end()};
        }
        utils::get_local_injector().inject("repair_tablet_fail_on_rpc_call",
                [] { throw std::runtime_error("repair_tablet failed due to error injection"); });
        auto time = co_await _repair.local().repair_tablet(_address_map, guard, tablet, global_tablet_repair_task_info, session, std::move(replicas), trinfo->stage);
        co_return service::tablet_operation_repair_result{time};
    });
    if (std::holds_alternative<service::tablet_operation_repair_result>(result)) {
        co_return std::get<service::tablet_operation_repair_result>(result);
    }
    // The lambda above always returns a repair result; anything else is a bug.
    on_internal_error(slogger, "Got wrong tablet_operation_repair_result");
}
// Clones a tablet's sstables from the leaving shard to the pending shard of the
// SAME node (intra-node migration). The clone is done on the leaving shard, then
// the resulting sstables are loaded into the table on the pending shard.
//
// @param tablet   tablet being migrated
// @param leaving  replica being vacated (must be on this node)
// @param pending  replica being populated (must be on the same host as leaving)
// @throws std::runtime_error if leaving and pending are on different hosts
future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id tablet, locator::tablet_replica leaving, locator::tablet_replica pending) {
    if (leaving.host != pending.host) {
        throw std::runtime_error(fmt::format("Leaving and pending tablet replicas belong to different nodes, {} and {} respectively",
                leaving.host, pending.host));
    }
    // All sstables cloned locally will be left unsealed, until they're loaded into the table.
    // This is to guarantee no unsplit sstables will be left sealed on disk, which could
    // cause problems if unsplit sstables are found after split was ACKed to coordinator.
    bool leave_unsealed = true;
    auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
        auto& table = _db.local().find_column_family(tablet.table);
        // NOTE(review): `op` appears to mark streaming as in-progress on this table for
        // the duration of the clone — confirm against table::stream_in_progress().
        auto op = table.stream_in_progress();
        co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed);
    });
    rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size());
    // Opens one cloned sstable on the current (pending) shard from its descriptor.
    auto load_sstable = [leave_unsealed] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
        auto& mng = t.get_sstables_manager();
        auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal),
                d.version, d.format, db_clock::now(), default_io_error_handler_gen());
        // The loader will consider current shard as sstable owner, despite the tablet sharder
        // will still point to leaving replica at this stage in migration. If node goes down,
        // SSTables will be loaded at pending replica and migration is retried, so correctness
        // wise, we're good.
        auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true,
                .unsealed_sstable = leave_unsealed };
        co_await sst->load(sharder, cfg);
        co_return sst;
    };
    co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> {
        // Loads cloned sstables from leaving replica into pending one.
        auto& table = _db.local().find_column_family(tablet.table);
        auto& sstm = table.get_sstables_manager();
        auto op = table.stream_in_progress();
        dht::auto_refreshing_sharder sharder(table.shared_from_this());
        std::unordered_set<sstables::shared_sstable> ssts;
        for (auto&& sst_desc : d) {
            ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc)));
        }
        // Seal each cloned sstable as it's added to the table; sealing was deferred
        // above so that no unsplit sstable is ever found sealed on disk.
        auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
            if (ssts.contains(loading_sst)) {
                auto cfg = sstm.configure_writer(loading_sst->get_origin());
                co_await loading_sst->seal_sstable(cfg.backup);
            }
            co_return;
        };
        auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add);
        // Let the view building worker pick up the newly loaded sstables.
        _view_building_worker.local().load_sstables(tablet.table, loaded_ssts);
    });
    rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending);
}
// Streams data to the pending tablet replica of a given tablet on this node.
// The source tablet replica is determined from the current transition info of the tablet.
//
// Validates that the tablet is still in the "streaming" stage (a mismatch means
// the coordinator canceled the operation and moved on), then streams either via
// file-based streaming (when the FILE_STREAM feature is on and enabled in config)
// or via the traditional range streamer. Intra-node migrations are performed by
// cloning sstables locally instead of streaming over the network.
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
    // Error injection: block streaming of a specific table until released.
    co_await utils::get_local_injector().inject("block_tablet_streaming", [this, &tablet] (auto& handler) -> future<> {
        const auto keyspace = handler.get("keyspace");
        const auto table = handler.get("table");
        SCYLLA_ASSERT(keyspace);
        SCYLLA_ASSERT(table);
        auto s = _db.local().find_column_family(tablet.table).schema();
        bool should_block = s->ks_name() == *keyspace && s->cf_name() == *table;
        while (should_block && !handler.poll_for_message() && !_async_gate.is_closed()) {
            co_await sleep(std::chrono::milliseconds(100));
        }
    });

    co_await do_tablet_operation(tablet, "Streaming", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
        auto tm = guard.get_token_metadata();
        auto& tmap = guard.get_tablet_map();
        auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);

        // Check if the request is still valid.
        // If there is mismatch, it means this streaming was canceled and the coordinator moved on.
        if (!trinfo) {
            throw std::runtime_error(fmt::format("No transition info for tablet {}", tablet));
        }

        if (trinfo->stage != locator::tablet_transition_stage::streaming) {
            throw std::runtime_error(fmt::format("Tablet {} stage is not at streaming", tablet));
        }

        auto topo_guard = trinfo->session_id;
        if (!trinfo->session_id) {
            throw std::runtime_error(fmt::format("Tablet {} session is not set", tablet));
        }

        auto pending_replica = trinfo->pending_replica;
        if (!pending_replica) {
            throw std::runtime_error(fmt::format("Tablet {} has no pending replica", tablet));
        }
        // This node must host the pending replica for this trigger to be valid.
        if (pending_replica->host != tm->get_my_id()) {
            throw std::runtime_error(fmt::format("Tablet {} has pending replica different than this one", tablet));
        }

        auto& tinfo = tmap.get_tablet_info(tablet.tablet);
        auto range = tmap.get_token_range(tablet.tablet);
        std::optional<locator::tablet_replica> leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
        locator::tablet_migration_streaming_info streaming_info = get_migration_streaming_info(tm->get_topology(), tinfo, *trinfo);
        locator::tablet_replica_set read_from{streaming_info.read_from.begin(), streaming_info.read_from.end()};
        if (trinfo->transition == locator::tablet_transition_kind::rebuild_v2) {
            // rebuild_v2: stream from the single source replica closest to us.
            auto nearest_hosts = read_from | std::views::transform([] (const auto& tr) {
                return tr.host;
            }) | std::ranges::to<host_id_vector_replica_set>();
            tm->get_topology().sort_by_proximity(trinfo->pending_replica->host, nearest_hosts);
            if (!nearest_hosts.empty()) {
                auto it = std::find_if(read_from.begin(), read_from.end(), [nearest_host = nearest_hosts[0]] (const auto& tr) { return tr.host == nearest_host; });
                if (it == read_from.end()) {
                    on_internal_error(slogger, "Nearest replica not found");
                }
                read_from = { *it };
            } else {
                read_from = {};
            }
        }

        streaming::stream_reason reason = std::invoke([&] {
            switch (trinfo->transition) {
                case locator::tablet_transition_kind::migration: return streaming::stream_reason::tablet_migration;
                case locator::tablet_transition_kind::intranode_migration: return streaming::stream_reason::tablet_migration;
                case locator::tablet_transition_kind::rebuild: return streaming::stream_reason::rebuild;
                case locator::tablet_transition_kind::rebuild_v2: return streaming::stream_reason::rebuild;
                default:
                    throw std::runtime_error(fmt::format("stream_tablet(): Invalid tablet transition: {}", trinfo->transition));
            }
        });

        // File-based streaming path (not applicable to intra-node migration).
        if (trinfo->transition != locator::tablet_transition_kind::intranode_migration && _feature_service.file_stream && _db.local().get_config().enable_file_stream()) {
            co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) {
                rtlogger.info("migration_streaming_wait: start");
                return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
            });
            auto dst_node = trinfo->pending_replica->host;
            auto dst_shard_id = trinfo->pending_replica->shard;
            auto transition = trinfo->transition;
            // Release token_metadata_ptr early so it will not block barriers for other migrations
            // Don't access trinfo after this.
            tm = {};
            co_await utils::get_local_injector().inject("stream_sstable_files", [&] (auto& handler) -> future<> {
                slogger.info("stream_sstable_files: waiting");
                while (!handler.poll_for_message()) {
                    co_await sleep_abortable(std::chrono::milliseconds(5), guard.get_abort_source());
                }
                slogger.info("stream_sstable_files: released");
            });
            for (auto src : read_from) {
                // Use file stream for tablet to stream data
                auto ops_id = streaming::file_stream_id::create_random_id();
                auto start_time = std::chrono::steady_clock::now();
                size_t stream_bytes = 0;
                try {
                    auto& table = _db.local().find_column_family(tablet.table);
                    slogger.debug("stream_sstables[{}] Streaming for tablet {} of {} started table={}.{} range={} src={}",
                            ops_id, transition, tablet, table.schema()->ks_name(), table.schema()->cf_name(), range, src);
                    auto resp = co_await streaming::tablet_stream_files(ops_id, table, range, src.host, dst_node, dst_shard_id, _messaging.local(), _abort_source, topo_guard);
                    stream_bytes = resp.stream_bytes;
                    slogger.debug("stream_sstables[{}] Streaming for tablet migration of {} successful", ops_id, tablet);
                    auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time);
                    auto bw = utils::pretty_printed_throughput(stream_bytes, duration);
                    slogger.info("stream_sstables[{}] Streaming for tablet migration of {} finished table={}.{} range={} stream_bytes={} stream_time={} stream_bw={}",
                            ops_id, tablet, table.schema()->ks_name(), table.schema()->cf_name(), range, stream_bytes, duration, bw);
                } catch (...) {
                    slogger.warn("stream_sstables[{}] Streaming for tablet migration of {} from {} failed: {}", ops_id, tablet, leaving_replica, std::current_exception());
                    throw;
                }
            }
        } else { // Caution: following code is intentionally unindented to be in sync with OSS
        if (trinfo->transition == locator::tablet_transition_kind::intranode_migration) {
            // Intra-node migration: clone sstables between shards locally.
            if (!leaving_replica || leaving_replica->host != tm->get_my_id()) {
                throw std::runtime_error(fmt::format("Invalid leaving replica for intra-node migration, tablet: {}, leaving: {}",
                        tablet, leaving_replica));
            }
            tm = nullptr;
            co_await utils::get_local_injector().inject("intranode_migration_streaming_wait", [this] (auto& handler) -> future<> {
                rtlogger.info("intranode_migration_streaming: waiting");
                while (!handler.poll_for_message() && !_async_gate.is_closed()) {
                    co_await sleep(std::chrono::milliseconds(5));
                }
                rtlogger.info("intranode_migration_streaming: released");
            });
            rtlogger.info("Starting intra-node streaming of tablet {} from shard {} to {}", tablet, leaving_replica->shard, pending_replica->shard);
            co_await clone_locally_tablet_storage(tablet, *leaving_replica, *pending_replica);
            rtlogger.info("Finished intra-node streaming of tablet {} from shard {} to {}", tablet, leaving_replica->shard, pending_replica->shard);
        } else {
            // Traditional range-streamer path.
            if (leaving_replica && leaving_replica->host == tm->get_my_id()) {
                throw std::runtime_error(fmt::format("Cannot stream within the same node using regular migration, tablet: {}, shard {} -> {}",
                        tablet, leaving_replica->shard, trinfo->pending_replica->shard));
            }
            co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) {
                rtlogger.info("migration_streaming_wait: start");
                return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
            });
            auto& table = _db.local().find_column_family(tablet.table);
            std::vector<sstring> tables = {table.schema()->cf_name()};
            auto my_id = tm->get_my_id();
            auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, std::move(tm),
                    guard.get_abort_source(),
                    my_id, _snitch.local()->get_location(),
                    format("Tablet {}", trinfo->transition),
                    reason,
                    topo_guard,
                    std::move(tables));
            // tm was moved into the streamer; null it out so it's not used by mistake.
            tm = nullptr;
            streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
                    _gossiper.get_unreachable_members()));
            std::unordered_map<locator::host_id, dht::token_range_vector> ranges_per_endpoint;
            for (auto r: read_from) {
                ranges_per_endpoint[r.host].emplace_back(range);
            }
            streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint));
            slogger.debug("Streaming for tablet migration of {} started table={}.{} range={}", tablet, table.schema()->ks_name(), table.schema()->cf_name(), range);
            co_await streamer->stream_async();
            slogger.info("Streaming for tablet migration of {} finished table={}.{} range={}", tablet, table.schema()->ks_name(), table.schema()->cf_name(), range);
        }
        } // Traditional streaming vs file-based streaming.

        // If new pending tablet replica needs splitting, streaming waits for it to complete.
        // That's to provide a guarantee that once migration is over, the coordinator can finalize
        // splitting under the promise that compaction groups of tablets are all split, ready
        // for the subsequent topology change.
        //
        // FIXME:
        // We could do the splitting not in the streaming stage, but in a later stage, so that
        // from the tablet scheduler's perspective migrations blocked on compaction are not
        // participating in streaming anymore (which is true), so it could schedule more
        // migrations. This way compaction would run in parallel with streaming which can
        // reduce the delay.
        co_await _db.invoke_on(pending_replica->shard, [tablet] (replica::database& db) {
            auto& table = db.find_column_family(tablet.table);
            return table.maybe_split_compaction_group_of(tablet.tablet);
        });

        co_await utils::get_local_injector().inject("pause_after_streaming_tablet", [] (auto& handler) {
            return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(1));
        });

        co_return tablet_operation_result();
    });
}
// Removes a tablet's data from the replica that is being vacated. Depending on
// the transition stage, the cleanup runs either on the leaving replica
// (stage == cleanup, the normal post-migration case) or on the pending replica
// (stage == cleanup_target). A stage mismatch means the coordinator canceled
// this operation and moved on.
future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
    // Error injection: simulate a crash in the middle of tablet cleanup.
    utils::get_local_injector().inject("cleanup_tablet_crash", [] {
        slogger.info("Crashing tablet cleanup");
        _exit(1);
    });
    co_await do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
        shard_id shard;
        {
            auto tm = guard.get_token_metadata();
            auto& tmap = guard.get_tablet_map();
            auto *trinfo = tmap.get_tablet_transition_info(tablet.tablet);
            // Check if the request is still valid.
            // If there is mismatch, it means this cleanup was canceled and the coordinator moved on.
            if (!trinfo) {
                throw std::runtime_error(fmt::format("No transition info for tablet {}", tablet));
            }
            if (trinfo->stage == locator::tablet_transition_stage::cleanup) {
                // Clean up the leaving replica; it must be hosted on this node.
                auto& tinfo = tmap.get_tablet_info(tablet.tablet);
                std::optional<locator::tablet_replica> leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
                if (!leaving_replica) {
                    throw std::runtime_error(fmt::format("Tablet {} has no leaving replica", tablet));
                }
                if (leaving_replica->host != tm->get_my_id()) {
                    throw std::runtime_error(fmt::format("Tablet {} has leaving replica different than this one", tablet));
                }
                shard = leaving_replica->shard;
            } else if (trinfo->stage == locator::tablet_transition_stage::cleanup_target) {
                // Clean up the pending replica instead — NOTE(review): presumably
                // used when a transition is rolled back; confirm with coordinator code.
                if (!trinfo->pending_replica) {
                    throw std::runtime_error(fmt::format("Tablet {} has no pending replica", tablet));
                }
                if (trinfo->pending_replica->host != tm->get_my_id()) {
                    throw std::runtime_error(fmt::format("Tablet {} has pending replica different than this one", tablet));
                }
                shard = trinfo->pending_replica->shard;
            } else {
                throw std::runtime_error(fmt::format("Tablet {} stage is not at cleanup/cleanup_target", tablet));
            }
        }
        // Run the actual cleanup on the shard that owns the replica being removed.
        co_await _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks, &vbw = _view_building_worker] (replica::database& db) {
            auto& table = db.find_column_family(tablet.table);
            vbw.local().cleanup_staging_sstables(table.get_effective_replication_map(), tablet.table, tablet.tablet);
            return table.cleanup_tablet(db, sys_ks.local(), tablet.tablet);
        });
        co_return tablet_operation_result();
    });
}
// Returns true if placing one more replica of the tablet on dst_rack would
// raise the maximum number of replicas held by any single rack.
static bool increases_replicas_per_rack(const locator::topology& topology, const locator::tablet_info& tinfo, sstring dst_rack) {
    // Tally the current number of replicas on each rack.
    std::unordered_map<sstring, size_t> replicas_per_rack;
    for (const auto& r : tinfo.replicas) {
        ++replicas_per_rack[topology.get_rack(r.host)];
    }
    const auto current_max = *std::ranges::max_element(replicas_per_rack | std::views::values);
    // Adding a replica to dst_rack raises the maximum exactly when dst_rack
    // already ties the fullest rack.
    return replicas_per_rack[dst_rack] + 1 > current_max;
}
// Acquires a group0 guard for a tablet metadata update, honoring the node-wide
// group0 abort source and the raft operation timeout.
future<service::group0_guard> storage_service::get_guard_for_tablet_update() {
    co_return co_await _group0->client().start_operation(_group0_as, raft_timeout{});
}
// Commits a set of tablet metadata mutations through group0, bumping the
// topology version alongside them. Returns true on success and false on a
// concurrent group0 modification, in which case the caller should retry with
// a fresh guard.
future<bool> storage_service::exec_tablet_update(service::group0_guard guard, utils::chunked_vector<canonical_mutation> updates, sstring reason) {
    rtlogger.info("{}", reason);
    rtlogger.trace("do update {} reason {}", updates, reason);
    // Every tablet metadata change also bumps the topology version.
    auto version_bump = topology_mutation_builder(guard.write_timestamp())
            .set_version(_topology_state_machine._topology.version + 1)
            .build();
    updates.emplace_back(std::move(version_bump));
    auto g0_cmd = _group0->client().prepare_command(topology_change{std::move(updates)}, guard, reason);
    try {
        co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
    } catch (group0_concurrent_modification&) {
        rtlogger.debug("exec_tablet_update(): concurrent modification, retrying");
        co_return false;
    }
    co_return true;
}
// Returns a tablet mutation builder targeting the base table of the given
// table. Colocated tables share their base table's tablet map, so tablet
// mutations are always written against the base table.
replica::tablet_mutation_builder storage_service::tablet_mutation_builder_for_base_table(api::timestamp_type ts, table_id table) {
    return replica::tablet_mutation_builder(ts, get_token_metadata_ptr()->tablets().get_base_table(table));
}
// Repair the tablets that contain the given tokens and wait for the repair to finish.
// This is used to run a manual repair requested by the user from the RESTful API.
future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant,
std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion, locator::tablet_repair_incremental_mode incremental_mode) {
auto holder = _async_gate.hold();
if (this_shard_id() != 0) {
// group0 is only set on shard 0.
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.add_repair_tablet_request(table, std::move(tokens_variant), std::move(hosts_filter), std::move(dcs_filter), await_completion, incremental_mode);
});
}
bool all_tokens = std::holds_alternative<all_tokens_tag>(tokens_variant);
utils::chunked_vector<dht::token> tokens;
if (!all_tokens) {
tokens = std::get<utils::chunked_vector<dht::token>>(tokens_variant);
}
if (!_feature_service.tablet_repair_scheduler) {
throw std::runtime_error("The TABLET_REPAIR_SCHEDULER feature is not enabled on the cluster yet");
}
auto repair_task_info = locator::tablet_task_info::make_user_repair_request(hosts_filter, dcs_filter, incremental_mode);
auto res = std::unordered_map<sstring, sstring>{{sstring("tablet_task_id"), repair_task_info.tablet_task_id.to_sstring()}};
auto start = std::chrono::steady_clock::now();
slogger.info("Starting tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} hosts_filter={} dcs_filter={} incremental_mode={}",
table, tokens, all_tokens, repair_task_info.tablet_task_id, hosts_filter, dcs_filter, incremental_mode);
while (true) {
auto guard = co_await get_guard_for_tablet_update();
// we don't allow requesting repair on tablets of colocated tables, because the repair task info
// is stored on the base table's tablet map which is shared by all the tables that are colocated with it.
// we don't have a way currently to store the repair task info for a specific colocated table.
// repair can only be requested for the base table, and this will repair the base table's tablets
// and all its colocated tablets as well.
if (!get_token_metadata().tablets().is_base_table(table)) {
auto table_schema = _db.local().find_schema(table);
auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
throw std::invalid_argument(::format(
"Cannot set repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
"Repair requests can be made only on the base table. "
"Repairing the base table will also repair all tables colocated with it.",
table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
}
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
utils::chunked_vector<canonical_mutation> updates;
if (all_tokens) {
tokens.clear();
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) -> future<> {
auto last_token = tmap.get_last_token(tid);
tokens.push_back(last_token);
co_return;
});
}
auto ts = db_clock::now();
for (const auto& token : tokens) {
auto tid = tmap.get_tablet_id(token);
auto& tinfo = tmap.get_tablet_info(tid);
auto& req_id = tinfo.repair_task_info.tablet_task_id;
if (req_id) {
throw std::runtime_error(fmt::format("Tablet {} is already in repair by tablet_task_id={}",
locator::global_tablet_id{table, tid}, req_id));
}
auto last_token = tmap.get_last_token(tid);
updates.emplace_back(
tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
.set_repair_task_info(last_token, repair_task_info, _feature_service)
.build());
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::requested,
.first_token = dht::token::to_int64(tmap.get_first_token(tid)),
.last_token = dht::token::to_int64(tmap.get_last_token(tid)),
.timestamp = ts,
.table_uuid = table,
};
if (_feature_service.tablet_repair_tasks_table) {
auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
for (auto& m : cmuts) {
updates.push_back(std::move(m));
}
}
}
sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);
if (co_await exec_tablet_update(std::move(guard), std::move(updates), std::move(reason))) {
break;
}
}
if (!await_completion) {
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
slogger.info("Issued tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} duration={}",
table, tokens, all_tokens, repair_task_info.tablet_task_id, duration);
co_return res;
}
co_await _topology_state_machine.event.wait([&] {
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
return std::all_of(tokens.begin(), tokens.end(), [&] (const dht::token& token) {
auto id = tmap.get_tablet_id(token);
return tmap.get_tablet_info(id).repair_task_info.tablet_task_id != repair_task_info.tablet_task_id;
});
});
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
slogger.info("Finished tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} duration={}",
table, tokens, all_tokens, repair_task_info.tablet_task_id, duration);
co_return res;
}
// Delete a tablet repair request by the given tablet_task_id
//
// Runs on shard 0 (the only shard where group0 is set up); calls from other
// shards are forwarded there. Scans the base table's tablet map for tablets
// whose stored repair request matches tablet_task_id and removes the repair
// task info (plus the repair session, if a repair transition is in flight)
// via a group0 tablet update. The whole read-modify-write cycle is retried
// whenever exec_tablet_update() reports failure (returns false).
future<> storage_service::del_repair_tablet_request(table_id table, locator::tablet_task_id tablet_task_id) {
    // Keep the service alive for the duration of the operation.
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.del_repair_tablet_request(table, tablet_task_id);
        });
    }
    if (!_feature_service.tablet_repair_scheduler) {
        throw std::runtime_error("The TABLET_REPAIR_SCHEDULER feature is not enabled on the cluster yet");
    }
    slogger.info("Deleting tablet repair request by API request table_id={} tablet_task_id={}", table, tablet_task_id);
    while (true) {
        auto guard = co_await get_guard_for_tablet_update();
        // see add_repair_tablet_request. repair requests can only be added on base tables.
        if (!get_token_metadata().tablets().is_base_table(table)) {
            auto table_schema = _db.local().find_schema(table);
            auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
            throw std::invalid_argument(::format(
                    "Cannot delete repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
                    "Repair requests can be added and deleted only on the base table.",
                    table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
        }
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        utils::chunked_vector<canonical_mutation> updates;
        co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) -> future<> {
            auto& tinfo = tmap.get_tablet_info(tid);
            auto& req_id = tinfo.repair_task_info.tablet_task_id;
            // Only tablets carrying this exact repair request are touched.
            if (req_id != tablet_task_id) {
                co_return;
            }
            auto last_token = tmap.get_last_token(tid);
            auto* trinfo = tmap.get_tablet_transition_info(tid);
            auto update = tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
                    .del_repair_task_info(last_token, _feature_service);
            // If the tablet is mid-repair, also drop its repair session.
            if (trinfo && trinfo->transition == locator::tablet_transition_kind::repair) {
                update.del_session(last_token);
            }
            updates.emplace_back(update.build());
        });
        sstring reason = format("Deleting tablet repair request by API request tablet_id={} tablet_task_id={}", table, tablet_task_id);
        if (co_await exec_tablet_update(std::move(guard), std::move(updates), std::move(reason))) {
            break;
        }
    }
    slogger.info("Deleted tablet repair request by API request table_id={} tablet_task_id={}", table, tablet_task_id);
}
// Moves one replica of the tablet of `table` that owns `token` from `src` to
// `dst` by scheduling a tablet migration (or intranode migration when
// src.host == dst.host) through group0. With force = loosen_constraints::yes,
// cross-DC moves and moves that would reduce per-rack availability are
// allowed with a warning instead of being rejected.
// Calls from shards other than 0 are forwarded to shard 0.
future<> storage_service::move_tablet(table_id table, dht::token token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.move_tablet(table, token, src, dst, force);
        });
    }
    co_await transit_tablet(table, token, [=, this] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) {
        utils::chunked_vector<canonical_mutation> updates;
        auto tid = tmap.get_tablet_id(token);
        auto& tinfo = tmap.get_tablet_info(tid);
        auto last_token = tmap.get_last_token(tid);
        auto gid = locator::global_tablet_id{table, tid};
        // Validate that the source actually holds a replica and that the
        // destination host and shard exist.
        if (!locator::contains(tinfo.replicas, src)) {
            throw std::runtime_error(seastar::format("Tablet {} has no replica on {}", gid, src));
        }
        auto* node = get_token_metadata().get_topology().find_node(dst.host);
        if (!node) {
            throw std::runtime_error(seastar::format("Unknown host: {}", dst.host));
        }
        if (dst.shard >= node->get_shard_count()) {
            throw std::runtime_error(seastar::format("Host {} does not have shard {}", *node, dst.shard));
        }
        if (src == dst) {
            // Nothing to do; `updates` is left empty so this becomes a no-op.
            sstring reason = format("No-op move of tablet {} to {}", gid, dst);
            return std::make_tuple(std::move(updates), std::move(reason));
        }
        // Reject if the destination host already holds a replica of this
        // tablet (does not apply to intranode moves, where src.host == dst.host).
        if (src.host != dst.host && locator::contains(tinfo.replicas, dst.host)) {
            throw std::runtime_error(fmt::format("Tablet {} has replica on {}", gid, dst.host));
        }
        auto src_dc_rack = get_token_metadata().get_topology().get_location(src.host);
        auto dst_dc_rack = get_token_metadata().get_topology().get_location(dst.host);
        if (src_dc_rack.dc != dst_dc_rack.dc) {
            // Cross-DC moves are only permitted when forced.
            if (force) {
                slogger.warn("Moving tablet {} between DCs ({} and {})", gid, src_dc_rack.dc, dst_dc_rack.dc);
            } else {
                throw std::runtime_error(fmt::format("Attempted to move tablet {} between DCs ({} and {})", gid, src_dc_rack.dc, dst_dc_rack.dc));
            }
        }
        if (src_dc_rack.rack != dst_dc_rack.rack && increases_replicas_per_rack(get_token_metadata().get_topology(), tinfo, dst_dc_rack.rack)) {
            // Moves that concentrate replicas on one rack are only permitted when forced.
            if (force) {
                slogger.warn("Moving tablet {} between racks ({} and {}) which reduces availability", gid, src_dc_rack.rack, dst_dc_rack.rack);
            } else {
                throw std::runtime_error(fmt::format("Attempted to move tablet {} between racks ({} and {}) which would reduce availability", gid, src_dc_rack.rack, dst_dc_rack.rack));
            }
        }
        // Record the migration request; intranode moves use a dedicated request kind.
        auto migration_task_info = src.host == dst.host ? locator::tablet_task_info::make_intranode_migration_request()
                                                        : locator::tablet_task_info::make_migration_request();
        migration_task_info.sched_nr++;
        migration_task_info.sched_time = db_clock::now();
        updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table)
                .set_new_replicas(last_token, locator::replace_replica(tinfo.replicas, src, dst))
                .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
                .set_transition(last_token, src.host == dst.host ? locator::tablet_transition_kind::intranode_migration
                                                                 : locator::tablet_transition_kind::migration)
                .set_migration_task_info(last_token, std::move(migration_task_info), _feature_service)
                .build());
        if (_feature_service.view_building_coordinator) {
            // The source replica is going away - abort any view building tasks tied to it.
            db::view::abort_view_building_tasks(_view_building_state_machine, updates, write_timestamp, table, src, last_token);
        }
        sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst);
        return std::make_tuple(std::move(updates), std::move(reason));
    });
}
// Adds a new replica of the tablet of `table` owning `token` on `dst` by
// scheduling a rebuild-style transition through group0. Calls from shards
// other than 0 are forwarded to shard 0, where group0 lives.
future<> storage_service::add_tablet_replica(table_id table, dht::token token, locator::tablet_replica dst, loosen_constraints force) {
    // Prevent the service from stopping while the operation is in flight.
    auto gate_held = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.add_tablet_replica(table, token, dst, force);
        });
    }
    co_await transit_tablet(table, token, [=, this] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) {
        auto tablet = tmap.get_tablet_id(token);
        auto& info = tmap.get_tablet_info(tablet);
        auto global_id = locator::global_tablet_id{table, tablet};
        // Validate the destination: known host, existing shard, and no
        // replica of this tablet already present there.
        auto* target_node = get_token_metadata().get_topology().find_node(dst.host);
        if (target_node == nullptr) {
            throw std::runtime_error(format("Unknown host: {}", dst.host));
        }
        if (dst.shard >= target_node->get_shard_count()) {
            throw std::runtime_error(format("Host {} does not have shard {}", *target_node, dst.shard));
        }
        if (locator::contains(info.replicas, dst.host)) {
            throw std::runtime_error(fmt::format("Tablet {} has replica on {}", global_id, dst.host));
        }
        // The new replica set is the old one extended with `dst`.
        locator::tablet_replica_set extended(info.replicas);
        extended.push_back(dst);
        auto last_token = tmap.get_last_token(tablet);
        utils::chunked_vector<canonical_mutation> updates;
        updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table)
                .set_new_replicas(last_token, extended)
                .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
                .set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
                .build());
        sstring reason = format("Adding replica to tablet {}, node {}", global_id, dst);
        return std::make_tuple(std::move(updates), std::move(reason));
    });
}
// Removes the replica of the tablet of `table` owning `token` located on
// `dst` by scheduling a rebuild-style transition through group0. Calls from
// shards other than 0 are forwarded to shard 0, where group0 lives.
future<> storage_service::del_tablet_replica(table_id table, dht::token token, locator::tablet_replica dst, loosen_constraints force) {
    // Prevent the service from stopping while the operation is in flight.
    auto gate_held = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.del_tablet_replica(table, token, dst, force);
        });
    }
    co_await transit_tablet(table, token, [=, this] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) {
        auto tablet = tmap.get_tablet_id(token);
        auto& info = tmap.get_tablet_info(tablet);
        auto global_id = locator::global_tablet_id{table, tablet};
        // Validate the replica being removed: known host, existing shard,
        // and the tablet must actually have a replica there.
        auto* target_node = get_token_metadata().get_topology().find_node(dst.host);
        if (target_node == nullptr) {
            throw std::runtime_error(format("Unknown host: {}", dst.host));
        }
        if (dst.shard >= target_node->get_shard_count()) {
            throw std::runtime_error(format("Host {} does not have shard {}", *target_node, dst.shard));
        }
        if (!locator::contains(info.replicas, dst.host)) {
            throw std::runtime_error(fmt::format("Tablet {} doesn't have replica on {}", global_id, dst.host));
        }
        // Keep every replica except the one being dropped.
        locator::tablet_replica_set remaining;
        remaining.reserve(info.replicas.size() - 1);
        for (const auto& replica : info.replicas) {
            if (replica != dst) {
                remaining.push_back(replica);
            }
        }
        auto last_token = tmap.get_last_token(tablet);
        utils::chunked_vector<canonical_mutation> updates;
        updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table)
                .set_new_replicas(last_token, remaining)
                .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
                .set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
                .build());
        if (_feature_service.view_building_coordinator) {
            // The replica on `dst` is going away - abort any view building tasks tied to it.
            db::view::abort_view_building_tasks(_view_building_state_machine, updates, write_timestamp, table, dst, last_token);
        }
        sstring reason = format("Removing replica from tablet {}, node {}", global_id, dst);
        return std::make_tuple(std::move(updates), std::move(reason));
    });
}
// Collects per-table load statistics for all tablet-based tables across all
// shards of this node and augments them with per-host disk statistics
// (capacity, critical-utilization flag and effective capacity). The result
// is consumed by the topology coordinator (which lives on shard 0).
//
// Fix: the original code assigned load_stats.capacity[this_host] twice,
// from two separate _disk_space_monitor->space() calls; the disk space is
// now sampled once and the same snapshot is used for both the reported
// capacity and the effective-capacity fallback.
future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables() {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // topology coordinator only exists in shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.load_stats_for_tablet_based_tables();
        });
    }
    // Refresh is triggered after table creation, need to make sure we see the new tablets.
    co_await _group0->group0_server().read_barrier(&_group0_as);
    // Snapshot the set of tablet-based tables up front.
    using table_ids_t = std::unordered_set<table_id>;
    const auto table_ids = co_await std::invoke([this] () -> future<table_ids_t> {
        table_ids_t ids;
        co_await _db.local().get_tables_metadata().for_each_table_gently([&] (table_id id, lw_shared_ptr<replica::table> table) mutable {
            if (table->uses_tablets()) {
                ids.insert(id);
            }
            return make_ready_future<>();
        });
        co_return std::move(ids);
    });
    // Helps with intra-node migration by serializing with changes to token metadata, so shards
    // participating in the migration will see migration in same stage, therefore preventing
    // double accounting (anomaly) in the reported size.
    auto tmlock = co_await get_token_metadata_lock();
    const locator::host_id this_host = _db.local().get_token_metadata().get_my_id();
    // Align to 64 bytes to avoid cache line ping-pong when updating size in map_reduce0() below
    struct alignas(64) aligned_tablet_size {
        uint64_t size = 0;
    };
    std::vector<aligned_tablet_size> tablet_sizes_per_shard(smp::count);
    // Each node combines a per-table load map from all of its shards and returns it to the coordinator.
    // So if there are 1k nodes, there will be 1k RPCs in total.
    auto load_stats = co_await _db.map_reduce0([&table_ids, &this_host, &tablet_sizes_per_shard] (replica::database& db) -> future<locator::load_stats> {
        locator::load_stats load_stats{};
        auto& tables_metadata = db.get_tables_metadata();
        for (const auto& id : table_ids) {
            auto table = tables_metadata.get_table_if_exists(id);
            if (!table) {
                // Skip tables no longer present in this shard's metadata.
                continue;
            }
            auto erm = table->get_effective_replication_map();
            auto& token_metadata = erm->get_token_metadata();
            auto me = locator::tablet_replica { token_metadata.get_my_id(), this_shard_id() };
            // It's important to tackle the anomaly in reported size, since both leaving and
            // pending replicas could otherwise be accounted during tablet migration.
            // If transition hasn't reached cleanup stage, then leaving replicas are accounted.
            // If transition is past cleanup stage, then pending replicas are accounted.
            // This helps to reduce the discrepancy window.
            auto tablet_filter = [&me] (const locator::tablet_map& tmap, locator::global_tablet_id id) {
                auto transition = tmap.get_tablet_transition_info(id.tablet);
                auto& info = tmap.get_tablet_info(id.tablet);
                // if tablet is not in transit, it's filtered in.
                if (!transition) {
                    return true;
                }
                bool is_pending = transition->pending_replica == me;
                bool is_leaving = locator::get_leaving_replica(info, *transition) == me;
                auto s = transition->reads; // read selector
                return (!is_pending && !is_leaving)
                        || (is_leaving && s == locator::read_replica_set_selector::previous)
                        || (is_pending && s == locator::read_replica_set_selector::next);
            };
            locator::combined_load_stats combined_ls { table->table_load_stats(tablet_filter) };
            load_stats.tables.emplace(id, std::move(combined_ls.table_ls));
            tablet_sizes_per_shard[this_shard_id()].size += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls);
            co_await coroutine::maybe_yield();
        }
        co_return std::move(load_stats);
    }, locator::load_stats{}, std::plus<locator::load_stats>());
    // Sample the disk space once and use the same snapshot both for the
    // reported capacity and for the effective-capacity fallback below.
    const std::filesystem::space_info si = _disk_space_monitor->space();
    load_stats.capacity[this_host] = si.capacity;
    load_stats.critical_disk_utilization[this_host] = _disk_space_monitor->disk_utilization() > _db.local().get_config().critical_disk_utilization_level();
    locator::tablet_load_stats& tls = load_stats.tablet_stats[this_host];
    // If the operator configured an explicit data file capacity, report it as
    // the effective capacity; otherwise estimate it as the currently available
    // space plus the space already occupied by tablets on this node.
    const uint64_t config_capacity = _db.local().get_config().data_file_capacity();
    if (config_capacity != 0) {
        tls.effective_capacity = config_capacity;
    } else {
        uint64_t sum_tablet_sizes = 0;
        for (const auto& ts : tablet_sizes_per_shard) {
            sum_tablet_sizes += ts.size;
        }
        tls.effective_capacity = si.available + sum_tablet_sizes;
    }
    utils::get_local_injector().inject("clear_tablet_stats_in_load_stats", [&] {
        load_stats.tablet_stats.erase(this_host);
    });
    co_return std::move(load_stats);
}
// Common driver for single-tablet transitions (used by move_tablet,
// add_tablet_replica and del_tablet_replica).
//
// prepare_mutations() is called with the current tablet map and the group0
// write timestamp, and returns the canonical mutations to apply plus a
// human-readable reason string for the group0 command.
//
// The read-validate-write cycle is retried on group0_concurrent_modification.
// After the command is committed, the function waits until the tablet no
// longer has transition info in the tablet map, i.e. the transition finished.
future<> storage_service::transit_tablet(table_id table, dht::token token, noncopyable_function<std::tuple<utils::chunked_vector<canonical_mutation>, sstring>(const locator::tablet_map&, api::timestamp_type)> prepare_mutations) {
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        bool topology_busy;
        // Wait out topology transitions, except tablet_draining and
        // tablet_migration, which do not block scheduling a tablet transition.
        while ((topology_busy = _topology_state_machine._topology.is_busy())) {
            const auto tstate = *_topology_state_machine._topology.tstate;
            if (tstate == topology::transition_state::tablet_draining ||
                tstate == topology::transition_state::tablet_migration) {
                break;
            }
            rtlogger.debug("transit_tablet(): topology state machine is busy: {}", tstate);
            // Release the guard while waiting so other group0 operations can proceed.
            release_guard(std::move(guard));
            co_await _topology_state_machine.event.when();
            guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        }
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        auto tid = tmap.get_tablet_id(token);
        // Only one transition per tablet at a time.
        if (tmap.get_tablet_transition_info(tid)) {
            throw std::runtime_error(fmt::format("Tablet {} is in transition", locator::global_tablet_id{table, tid}));
        }
        auto [ updates, reason ] = prepare_mutations(tmap, guard.write_timestamp());
        rtlogger.info("{}", reason);
        rtlogger.trace("do update {} reason {}", updates, reason);
        {
            topology_mutation_builder builder(guard.write_timestamp());
            if (topology_busy) {
                // Topology is already in a tablet-related transition state; keep it.
                rtlogger.debug("transit_tablet({}): topology busy, keeping transition state", locator::global_tablet_id{table, tid});
            } else {
                builder.set_transition_state(topology::transition_state::tablet_migration);
            }
            builder.set_version(_topology_state_machine._topology.version + 1);
            updates.push_back(builder.build());
        }
        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            break;
        } catch (group0_concurrent_modification&) {
            rtlogger.debug("transit_tablet(): concurrent modification, retrying");
        }
    }
    // Wait for transition to finish.
    co_await _topology_state_machine.event.when([&] {
        auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
        return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token));
    });
}
// Enables or disables tablet load balancing cluster-wide by writing the flag
// to the topology state through group0.
//
// When disabling, and the cluster supports it (topology_noop_request and
// topology_global_request_queue features), a no-op global topology request is
// queued in the same command and waited for afterwards; otherwise the
// function polls the topology state machine until it is no longer busy.
// Calls from shards other than 0 are forwarded to shard 0.
future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
    auto holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.set_tablet_balancing_enabled(enabled);
        });
    }
    utils::UUID request_id;
    auto reason = format("Setting tablet balancing to {}", enabled);
    // Retry the group0 update until it applies without a concurrent modification.
    while (true) {
        group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        utils::chunked_vector<canonical_mutation> updates;
        updates.push_back(canonical_mutation(
                topology_mutation_builder(guard.write_timestamp())
                    .set_tablet_balancing_enabled(enabled)
                    .build()));
        if (!enabled
                && _feature_service.topology_noop_request && _feature_service.topology_global_request_queue) {
            // Queue a trackable no-op request so the caller can wait for the
            // topology coordinator to process the queue after disabling.
            request_id = guard.new_group0_state_id();
            updates.push_back(canonical_mutation(
                    topology_mutation_builder(guard.write_timestamp())
                        .queue_global_topology_request_id(request_id)
                        .build()));
            updates.push_back(canonical_mutation(
                    topology_request_tracking_mutation_builder(request_id, _feature_service.topology_requests_type_column)
                        .set("done", false)
                        .set("request_type", global_topology_request::noop_request)
                        .build()));
        }
        rtlogger.info("{}", reason);
        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            break;
        } catch (group0_concurrent_modification&) {
            rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
        }
    }
    if (request_id) {
        // A no-op request was queued above - wait for its completion.
        co_await wait_for_topology_request_completion(request_id);
    } else if (!enabled) {
        // Fallback path (features unavailable): wait until the topology state
        // machine quiesces.
        while (_topology_state_machine._topology.is_busy()) {
            rtlogger.debug("set_tablet_balancing_enabled(): topology is busy");
            co_await _topology_state_machine.event.when();
        }
    }
}
// Waits until the topology state machine is idle. Performs a group0 read
// barrier first so the freshest topology state is observed. Calls from
// shards other than 0 are forwarded to shard 0, where group0 lives.
future<> storage_service::await_topology_quiesced() {
    auto gate_holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.await_topology_quiesced();
        });
    }
    co_await _group0->group0_server().read_barrier(&_group0_as);
    co_await _topology_state_machine.await_not_busy();
}
// Returns true iff, after a group0 read barrier, the topology is idle and
// its version equals expected_version. Calls from shards other than 0 are
// forwarded to shard 0, where group0 lives.
future<bool> storage_service::verify_topology_quiesced(token_metadata::version_t expected_version) {
    auto gate_holder = _async_gate.hold();
    if (this_shard_id() != 0) {
        // group0 is only set on shard 0.
        co_return co_await container().invoke_on(0, [&] (auto& ss) {
            return ss.verify_topology_quiesced(expected_version);
        });
    }
    co_await _group0->group0_server().read_barrier(&_group0_as);
    const auto& topology = _topology_state_machine._topology;
    co_return !topology.is_busy() && topology.version == expected_version;
}
// Handler for the join_node_request RPC, sent by a node that wants to join
// the cluster (possibly replacing a dead node). Validates the request
// (cluster name, snitch, replace preconditions) and, on success, places a
// join request entry for the node in the group0 topology state, which the
// topology coordinator then picks up.
//
// Fix: two rejection log messages had their format string accidentally split
// by a stray comma, so the continuation text was consumed as the `{}`
// argument and params.host_id was dropped. The strings are now properly
// concatenated so the host id is logged.
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
    join_node_request_result result;
    rtlogger.info("received request to join from host_id: {}", params.host_id);
    // Sanity check. We should already be using raft topology changes because
    // the node asked us via join_node_query about which node to use and
    // we responded that they should use raft. We cannot go back from raft
    // to legacy (unless we switch to recovery between handling join_node_query
    // and join_node_request, which is extremely unlikely).
    check_ability_to_perform_topology_operation("join");
    if (params.cluster_name != _db.local().get_config().cluster_name()) {
        result.result = join_node_request_result::rejected{
            .reason = ::format("Cluster name check failed. This node cannot join the cluster "
                               "because it expected cluster name \"{}\" and not \"{}\"",
                               params.cluster_name,
                               _db.local().get_config().cluster_name()),
        };
        co_return result;
    }
    if (params.snitch_name != _db.local().get_snitch_name()) {
        result.result = join_node_request_result::rejected{
            .reason = ::format("Snitch name check failed. This node cannot join the cluster "
                               "because it uses \"{}\" and not \"{}\"",
                               params.snitch_name,
                               _db.local().get_snitch_name()),
        };
        co_return result;
    }
    co_await _topology_state_machine.event.when([this] {
        // The first node defines the cluster and inserts its entry to the
        // `system.topology` without checking anything. It is possible that the
        // `join_node_request_handler` fires before the first node sets itself
        // as a normal node, therefore we might need to wait until that happens,
        // here. If we didn't do it, the topology coordinator could handle the
        // joining node as the first one and skip the necessary join node
        // handshake.
        return !_topology_state_machine._topology.normal_nodes.empty();
    });
    auto& g0_server = _group0->group0_server();
    auto g0_holder = _group0->hold_group0_gate();
    if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) {
        // There is a peculiar case that can happen if the leader is killed
        // and then replaced very quickly:
        //
        // - Cluster with nodes `A`, `B`, `C` - `A` is the topology
        //   coordinator/group0 leader,
        // - `A` is killed,
        // - New node `D` attempts to replace `A` with the same IP as `A`,
        //   sends `join_node_request` rpc to node `B`,
        // - Node `B` handles the RPC and wants to perform group0 operation
        //   and wants to perform a barrier - still thinks that `A`
        //   is the leader and is alive, sends an RPC to its IP,
        // - `D` accidentally receives the request that was meant to `A`
        //   but throws an exception because of host_id mismatch,
        // - Failure is propagated back to `B`, and then to `D` - and `D`
        //   fails the replace operation.
        //
        // We can try to detect if this failure might happen: if the new node
        // is going to replace but the ID of the replaced node is the same
        // as the leader, wait for a short while until a reelection happens.
        // If replaced ID == leader ID, then this indicates either the situation
        // above or an operator error (actually trying to replace a live node).
        const auto timeout = std::chrono::seconds(10);
        rtlogger.warn("the node {} which was requested to be"
                      " replaced has the same ID as the current group 0 leader ({});"
                      " this looks like an attempt to join a node with the same IP"
                      " as a leader which might have just crashed; waiting for"
                      " a reelection",
                      params.host_id, g0_server.current_leader());
        abort_source as;
        timer<lowres_clock> t;
        t.set_callback([&as] {
            as.request_abort();
        });
        t.arm(timeout);
        try {
            while (!g0_server.current_leader() || *params.replaced_id == g0_server.current_leader()) {
                // FIXME: Wait for the next term instead of sleeping in a loop
                // Waiting for state change is not enough because a new leader
                // might be chosen without us going through the candidate state.
                co_await sleep_abortable(std::chrono::milliseconds(100), as);
            }
        } catch (abort_requested_exception&) {
            rtlogger.warn("the node {} tries to replace the"
                          " current leader {} but the leader didn't change within"
                          " {}s. Rejecting the node",
                          params.host_id,
                          *params.replaced_id,
                          std::chrono::duration_cast<std::chrono::seconds>(timeout).count());
            result.result = join_node_request_result::rejected{
                .reason = format(
                        "It is only allowed to replace dead nodes, however the"
                        " node that was requested to be replaced is still seen"
                        " as the group0 leader after {}s, which indicates that"
                        " it might be still alive. You are either trying to replace"
                        " a live node or trying to replace a node very quickly"
                        " after it went down and reelection didn't happen within"
                        " the timeout. Refusing to continue",
                        std::chrono::duration_cast<std::chrono::seconds>(timeout).count()),
            };
            co_return result;
        }
    }
    // Retry the group0 command on concurrent modification.
    while (true) {
        auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
        if (const auto *p = _topology_state_machine._topology.find(params.host_id)) {
            // The host is already known to the topology - it either was
            // removed or has a dangling unfinished join; reject in both cases.
            const auto& rs = p->second;
            if (rs.state == node_state::left) {
                // Fixed: this was previously two arguments - a stray comma
                // after "join" split the format string, mangling the message
                // and dropping the host id.
                rtlogger.warn("the node {} attempted to join"
                              " but it was removed from the cluster. Rejecting"
                              " the node",
                              params.host_id);
                result.result = join_node_request_result::rejected{
                    .reason = "The node has already been removed from the cluster",
                };
            } else {
                // Fixed: same stray-comma format string bug as above.
                rtlogger.warn("the node {} attempted to join"
                              " again after an unfinished attempt but it is no longer"
                              " allowed to do so. Rejecting the node",
                              params.host_id);
                result.result = join_node_request_result::rejected{
                    .reason = "The node requested to join before but didn't finish the procedure. "
                              "Please clear the data directory and restart.",
                };
            }
            co_return result;
        }
        utils::UUID old_request_id;
        if (params.replaced_id) {
            // Replace preconditions: the replaced node must be dead, in the
            // 'normal' state, in the same DC/rack, and token-ownership kind
            // (zero-token vs token-owning) must match.
            auto rhid = locator::host_id{params.replaced_id->uuid()};
            if (is_me(rhid) || _gossiper.is_alive(rhid)) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("tried to replace alive node {}", *params.replaced_id),
                };
                co_return result;
            }
            auto replaced_it = _topology_state_machine._topology.normal_nodes.find(*params.replaced_id);
            if (replaced_it == _topology_state_machine._topology.normal_nodes.end()) {
                result.result = join_node_request_result::rejected{
                    .reason = ::format("Cannot replace node {} because it is not in the 'normal' state", *params.replaced_id),
                };
                co_return result;
            }
            if (replaced_it->second.datacenter != params.datacenter || replaced_it->second.rack != params.rack) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("Cannot replace node in {}/{} with node in {}/{}", replaced_it->second.datacenter, replaced_it->second.rack, params.datacenter, params.rack),
                };
                co_return result;
            }
            auto is_zero_token = params.num_tokens == 0 && params.tokens_string.empty();
            if (replaced_it->second.ring.value().tokens.empty() && !is_zero_token) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("Cannot replace the zero-token node {} with a token-owning node", *params.replaced_id),
                };
                co_return result;
            }
            if (!replaced_it->second.ring.value().tokens.empty() && is_zero_token) {
                result.result = join_node_request_result::rejected{
                    .reason = fmt::format("Cannot replace the token-owning node {} with a zero-token node", *params.replaced_id),
                };
                co_return result;
            }
            if (_topology_state_machine._topology.requests.find(*params.replaced_id) != _topology_state_machine._topology.requests.end()) {
                old_request_id = replaced_it->second.request_id;
            }
        }
        auto mutation = build_mutation_from_join_params(params, guard.write_timestamp(), old_request_id);
        topology_change change{{std::move(mutation)}};
        group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
                format("raft topology: placing join request for {}", params.host_id));
        co_await utils::get_local_injector().inject("join-node-before-add-entry", utils::wait_for_message(5min));
        try {
            co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
            break;
        } catch (group0_concurrent_modification&) {
            rtlogger.info("join_node_request: concurrent operation is detected, retrying.");
        }
    }
    rtlogger.info("placed join request for {}", params.host_id);
    // Success
    result.result = join_node_request_result::ok {};
    co_return result;
}
// Handler for the join_node_response RPC sent by the topology coordinator in
// reply to this node's join request. On "accepted", it waits for the relevant
// cluster members to be seen alive and then unblocks the join procedure; on
// "rejected", it fails the join with the coordinator's reason. The outcome is
// recorded in the _join_node_response_done promise.
future<join_node_response_result> storage_service::join_node_response_handler(join_node_response_params params) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    // Usually this handler will only run once, but there are some cases where we might get more than one RPC,
    // possibly happening at the same time, e.g.:
    //
    // - Another node becomes the topology coordinator while the old one waits for the RPC,
    // - Topology coordinator finished the RPC but failed to update the group 0 state.
    // Serialize handling the responses.
    auto lock = co_await get_units(_join_node_response_handler_mutex, 1);
    // Wait until we sent and completed the join_node_request RPC
    co_await _join_node_request_done.get_shared_future(_group0_as);
    if (_join_node_response_done.available()) {
        // We already handled this RPC. No need to retry it.
        rtlogger.info("the node got join_node_response RPC for the second time, ignoring");
        if (std::holds_alternative<join_node_response_params::accepted>(params.response)
                && _join_node_response_done.failed()) {
            // The topology coordinator accepted the node that was rejected before or failed while handling
            // the response. Inform the coordinator about it so it moves the node to the left state.
            co_await coroutine::return_exception_ptr(_join_node_response_done.get_shared_future().get_exception());
        }
        co_return join_node_response_result{};
    }
    if (utils::get_local_injector().enter("join_node_response_drop_expiring")) {
        _gossiper.get_mutable_address_map().force_drop_expiring_entries();
    }
    try {
        co_return co_await std::visit(overloaded_functor {
            [&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {
                co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", utils::wait_for_message(5min));
                // Do a read barrier to read/initialize the topology state
                co_await _group0->group0_server_with_timeouts().read_barrier(&_group0_as, raft_timeout{});
                // Calculate nodes to ignore
                // TODO: ignore_dead_nodes setting for bootstrap
                std::unordered_set<raft::server_id> ignored_ids = _topology_state_machine._topology.ignored_nodes;
                auto my_request_it =
                        _topology_state_machine._topology.req_param.find(_group0->load_my_id());
                if (my_request_it != _topology_state_machine._topology.req_param.end()) {
                    // When replacing a node, also treat the replaced node as ignored.
                    if (auto* replace = std::get_if<service::replace_param>(&my_request_it->second)) {
                        ignored_ids.insert(replace->replaced_id);
                    }
                }
                // After this RPC finishes, repair or streaming will be run, and
                // both of them require this node to see the normal nodes as UP.
                // This condition might not be true yet as this information is
                // propagated through gossip. In order to reduce the chance of
                // repair/streaming failure, wait here until we see normal nodes
                // as UP (or the timeout elapses).
                auto sync_nodes = _topology_state_machine._topology.normal_nodes | std::views::keys
                        | std::ranges::views::filter([ignored_ids] (raft::server_id id) { return !ignored_ids.contains(id); })
                        | std::views::transform([] (raft::server_id id) { return locator::host_id{id.uuid()}; })
                        | std::ranges::to<std::vector<locator::host_id>>();
                rtlogger.info("coordinator accepted request to join, "
                              "waiting for nodes {} to be alive before responding and continuing",
                              sync_nodes);
                co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout);
                rtlogger.info("nodes {} are alive", sync_nodes);
                // Unblock waiting join_node_rpc_handshaker::post_server_start,
                // which will start the raft server and continue
                _join_node_response_done.set_value();
                co_return join_node_response_result{};
            },
            [&] (const join_node_response_params::rejected& rej) -> future<join_node_response_result> {
                // Propagate the coordinator's rejection reason to whoever waits
                // on _join_node_response_done.
                auto eptr = std::make_exception_ptr(std::runtime_error(
                        format("the topology coordinator rejected request to join the cluster: {}", rej.reason)));
                _join_node_response_done.set_exception(std::move(eptr));
                co_return join_node_response_result{};
            },
        }, params.response);
    } catch (...) {
        // Any failure while handling the response also fails the join.
        auto eptr = std::current_exception();
        rtlogger.warn("error while handling the join response from the topology coordinator. "
                      "The node will not join the cluster. Error: {}", eptr);
        _join_node_response_done.set_exception(std::move(eptr));
        throw;
    }
}
// Reads every partition of the given system table and converts each one
// into a canonical mutation, yielding between partitions so large tables
// do not stall the reactor.
future<utils::chunked_vector<canonical_mutation>> storage_service::get_system_mutations(schema_ptr schema) {
    utils::chunked_vector<canonical_mutation> muts;
    auto result_set = co_await db::system_keyspace::query_mutations(_db, schema);
    muts.reserve(result_set->partitions().size());
    for (const auto& partition : result_set->partitions()) {
        auto unfrozen = co_await unfreeze_gently(partition.mut(), schema);
        muts.emplace_back(co_await make_canonical_mutation_gently(std::move(unfrozen)));
    }
    co_return muts;
}
// Overload that resolves the schema by keyspace/table name and delegates
// to the schema-based variant.
future<utils::chunked_vector<canonical_mutation>> storage_service::get_system_mutations(const sstring& ks_name, const sstring& cf_name) {
    auto schema = _db.local().find_schema(ks_name, cf_name);
    return get_system_mutations(std::move(schema));
}
// Returns the topology state of the given node. Must run on shard 0,
// where the topology state machine lives. A node that is known to the
// topology but has no live entry is reported as left.
node_state storage_service::get_node_state(locator::host_id id) {
    if (this_shard_id() != 0) {
        on_internal_error(rtlogger, "cannot access node state on non zero shard");
    }
    const raft::server_id rid{id.uuid()};
    if (!_topology_state_machine._topology.contains(rid)) {
        on_internal_error(rtlogger, format("unknown node {}", rid));
    }
    auto entry = _topology_state_machine._topology.find(rid);
    return entry ? entry->second.state : node_state::left;
}
// Registers all RPC verb handlers served by storage_service on this shard.
// Handlers that touch group0/topology state hop to shard 0, where that state
// is maintained. Undone by uninit_messaging_service().
void storage_service::init_messaging_service() {
    // Legacy node-ops command channel (replace/remove/decommission helpers).
    ser::node_ops_rpc_verbs::register_node_ops_cmd(&_messaging.local(), [this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
        auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
        // The coordinator's host id is optional in the client info.
        std::optional<locator::host_id> coordinator_host_id;
        if (const auto* id = cinfo.retrieve_auxiliary_opt<locator::host_id>("host_id")) {
            coordinator_host_id = *id;
        }
        return container().invoke_on(0, [coordinator, coordinator_host_id, req = std::move(req)] (auto& ss) mutable {
            return ss.node_ops_cmd_handler(coordinator, coordinator_host_id, std::move(req));
        });
    });
    // Wraps a raft-addressed handler: hops to shard 0 and, before invoking
    // the handler, verifies this node joined group 0 and is the intended
    // destination of the RPC.
    auto handle_raft_rpc = [this] (raft::server_id dst_id, auto handler) {
        return container().invoke_on(0, [dst_id, handler = std::move(handler)] (auto& ss) mutable {
            if (!ss._group0 || !ss._group0->joined_group0()) {
                throw std::runtime_error("The node did not join group 0 yet");
            }
            if (ss._group0->load_my_id() != dst_id) {
                throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
            }
            return handler(ss);
        });
    };
    // Tablet file streaming: runs the handler on every shard and sums the
    // number of streamed bytes into a single response.
    ser::streaming_rpc_verbs::register_tablet_stream_files(&_messaging.local(),
            [this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future<streaming::stream_files_response> {
        streaming::stream_files_response resp;
        resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future<size_t> {
            auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req);
            co_return res.stream_bytes;
        },
        size_t(0),
        std::plus<size_t>());
        co_return resp;
    });
    // Topology coordinator commands (stream/cleanup/barrier etc.) addressed
    // to this raft server.
    ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) {
        return handle_raft_rpc(dst_id, [cmd = std::move(cmd), term, cmd_index] (auto& ss) {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_topology_cmd");
            return ss.raft_topology_cmd_handler(term, cmd_index, cmd);
        });
    });
    // Serves a group0 snapshot: collects canonical mutations of the requested
    // (plus some implied) system tables under the read/apply mutex.
    ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_snapshot_pull_params params) {
        return handle_raft_rpc(dst_id, [params = std::move(params)] (storage_service& ss) -> future<raft_snapshot> {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_pull_snapshot");
            utils::chunked_vector<canonical_mutation> mutations;
            // FIXME: make it an rwlock, here we only need to lock for reads,
            // might be useful if multiple nodes are trying to pull concurrently.
            auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(ss._abort_source);
            // We may need to send additional raft-based tables to the requester that
            // are not indicated in the parameter.
            // For example, when a node joins, it requests a snapshot before it knows
            // which features are enabled, so it doesn't know yet if these tables exist
            // on other nodes.
            // In the current "legacy" mode we assume the requesting node sends 2 RPCs - one for
            // topology tables and one for auth tables, service levels, and additional tables.
            // When we detect it's the second RPC, we add additional tables based on our feature flags.
            // In the future we want to deprecate this parameter, so this condition should
            // apply only for "legacy" snapshot pull RPCs.
            std::vector<table_id> additional_tables;
            if (params.tables.size() > 0 && params.tables[0] != db::system_keyspace::topology()->id()) {
                // Second ("non-topology") RPC: add feature-gated tables.
                if (ss._feature_service.view_build_status_on_group0) {
                    additional_tables.push_back(db::system_keyspace::view_build_status_v2()->id());
                }
                if (ss._feature_service.compression_dicts) {
                    additional_tables.push_back(db::system_keyspace::dicts()->id());
                }
                if (ss._feature_service.view_building_coordinator) {
                    additional_tables.push_back(db::system_keyspace::view_building_tasks()->id());
                }
                if (ss._feature_service.cdc_with_tablets) {
                    additional_tables.push_back(db::system_keyspace::cdc_streams_state()->id());
                    additional_tables.push_back(db::system_keyspace::cdc_streams_history()->id());
                }
                if (ss._feature_service.client_routes) {
                    additional_tables.push_back(db::system_keyspace::client_routes()->id());
                }
            }
            for (const auto& table : boost::join(params.tables, additional_tables)) {
                auto schema = ss._db.local().find_schema(table);
                auto muts = co_await ss.get_system_mutations(schema);
                if (table == db::system_keyspace::cdc_generations_v3()->id()) {
                    // Test hook: inflate the CDC generation mutations past the
                    // commitlog record-size limit.
                    utils::get_local_injector().inject("cdc_generation_mutations_topology_snapshot_replication",
                            [target_size=ss._db.local().schema_commitlog()->max_record_size() * 2, &muts] {
                        // Copy mutations n times, where n is picked so that the memory size of all mutations
                        // together exceeds `schema_commitlog()->max_record_size()`.
                        // We multiply by two to account for all possible deltas (like segment::entry_overhead_size).
                        size_t current_size = 0;
                        for (const auto& m: muts) {
                            current_size += m.representation().size();
                        }
                        const auto number_of_copies = (target_size / current_size + 1) * 2;
                        muts.reserve(muts.size() * number_of_copies);
                        const auto it_begin = muts.begin();
                        const auto it_end = muts.end();
                        for (unsigned i = 0; i < number_of_copies; ++i) {
                            std::copy(it_begin, it_end, std::back_inserter(muts));
                        }
                    });
                }
                mutations.reserve(mutations.size() + muts.size());
                std::move(muts.begin(), muts.end(), std::back_inserter(mutations));
            }
            // Append assorted singleton version/marker mutations, if present.
            auto sl_driver_created_mut = co_await ss._sys_ks.local().get_service_level_driver_created_mutation();
            if (sl_driver_created_mut) {
                mutations.push_back(canonical_mutation(*sl_driver_created_mut));
            }
            auto sl_version_mut = co_await ss._sys_ks.local().get_service_levels_version_mutation();
            if (sl_version_mut) {
                mutations.push_back(canonical_mutation(*sl_version_mut));
            }
            auto auth_version_mut = co_await ss._sys_ks.local().get_auth_version_mutation();
            if (auth_version_mut) {
                mutations.emplace_back(*auth_version_mut);
            }
            auto view_builder_version_mut = co_await ss._sys_ks.local().get_view_builder_version_mutation();
            if (view_builder_version_mut) {
                mutations.emplace_back(*view_builder_version_mut);
            }
            auto vb_processing_base_mut = co_await ss._sys_ks.local().get_view_building_processing_base_id_mutation();
            if (vb_processing_base_mut) {
                mutations.emplace_back(*vb_processing_base_mut);
            }
            co_return raft_snapshot{
                .mutations = std::move(mutations),
            };
        });
    });
    // Per-tablet operations addressed to this raft server.
    ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
        return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
            return ss.stream_tablet(tablet);
        });
    });
    ser::storage_service_rpc_verbs::register_tablet_repair(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet, rpc::optional<service::session_id> session_id) {
        // session_id is optional on the wire for backward compatibility.
        return handle_raft_rpc(dst_id, [tablet, session_id = session_id.value_or(service::session_id::create_null_id())] (auto& ss) -> future<service::tablet_operation_repair_result> {
            auto res = co_await ss.repair_tablet(tablet, session_id);
            co_return res;
        });
    });
    ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
        return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
            return ss.cleanup_tablet(tablet);
        });
    });
    // Load statistics for tablet-based tables; the v1 variant keeps the old
    // wire format by dropping fields newer peers don't know about.
    ser::storage_service_rpc_verbs::register_table_load_stats(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) {
        return handle_raft_rpc(dst_id, [] (auto& ss) mutable {
            return ss.load_stats_for_tablet_based_tables();
        });
    });
    ser::storage_service_rpc_verbs::register_table_load_stats_v1(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) {
        return handle_raft_rpc(dst_id, [] (auto& ss) mutable {
            return ss.load_stats_for_tablet_based_tables().then([] (auto stats) {
                return locator::load_stats_v1{ .tables = std::move(stats.tables) };
            });
        });
    });
    // Sums the on-disk data size of a table's sstables across all shards.
    ser::storage_service_rpc_verbs::register_estimate_sstable_volume(&_messaging.local(), [this] (table_id t_id) -> future<uint64_t> {
        co_return co_await _db.map_reduce0(seastar::coroutine::lambda([&] (replica::database& local_db) -> future<uint64_t> {
            uint64_t result = 0;
            auto& t = local_db.get_tables_metadata().get_table(t_id);
            auto snap = co_await t.take_sstable_set_snapshot();
            for (const auto& sst : snap) {
                result += sst.get()->data_size();
            }
            co_return result;
        }), uint64_t(0), std::plus());
    });
    ser::storage_service_rpc_verbs::register_sample_sstables(&_messaging.local(), [this] (table_id table, uint64_t chunk_size, uint64_t n_chunks) -> future<utils::chunked_vector<temporary_buffer<char>>> {
        return _db.local().sample_data_files(table, chunk_size, n_chunks);
    });
    // Join protocol: request from a joining node, response from the topology
    // coordinator, and a query for the cluster's topology mode.
    ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) {
        return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_request");
            return ss.join_node_request_handler(std::move(params));
        });
    });
    ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) {
        return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future<join_node_response_result> {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_response");
            // Cannot use handle_raft_rpc here: the response may arrive before
            // group 0 is fully joined, so wait for our group0 startup first.
            co_await ss._join_node_group0_started.get_shared_future(ss._group0_as);
            if (ss._group0->load_my_id() != dst_id) {
                throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
            }
            co_return co_await ss.join_node_response_handler(std::move(params));
        });
    });
    ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) {
        return handle_raft_rpc(dst_id, [] (auto& ss) -> future<join_node_query_result> {
            check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query");
            // Raft topology is the only supported mode now.
            auto result = join_node_query_result{
                .topo_mode = join_node_query_result::topology_mode::raft
            };
            return make_ready_future<join_node_query_result>(std::move(result));
        });
    });
    // Notification that this node was banned from the cluster; the node
    // terminates itself unless it is already leaving/shutting down.
    ser::join_node_rpc_verbs::register_notify_banned(&_messaging.local(), [this] (const rpc::client_info& cinfo, raft::server_id dst_id) {
        auto src_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
        return container().invoke_on(0, [src_id, dst_id] (auto& ss) -> future<rpc::no_wait_type> {
            if (ss.my_host_id() != locator::host_id{dst_id.uuid()}) {
                rtlogger.warn("received notify_banned from {} for {}, but my id is {}, ignoring", src_id, dst_id, ss.my_host_id());
            } else if (ss._topology_state_machine._topology.tstate != topology::transition_state::left_token_ring &&
                    !ss._topology_state_machine._topology.left_nodes.contains(dst_id) && !ss._group0_as.abort_requested()) {
                // Ignore rpc if the node is already shutting down or during decommissioning because
                // the node is expected to shut itself down after being banned.
                rtlogger.info("received notification of being banned from the cluster from {}, terminating.", src_id);
                _exit(0);
            }
            co_return rpc::no_wait_type{};
        });
    });
}
// Unregisters, in parallel, every RPC verb that init_messaging_service()
// registered on this shard.
future<> storage_service::uninit_messaging_service() {
    auto& ms = _messaging.local();
    return when_all_succeed(
        ser::node_ops_rpc_verbs::unregister(&ms),
        ser::storage_service_rpc_verbs::unregister(&ms),
        ser::join_node_rpc_verbs::unregister(&ms),
        ser::streaming_rpc_verbs::unregister_tablet_stream_files(&ms)
    ).discard_result();
}
// Reacts to an unrecoverable disk/commitlog I/O error by shutting down
// communications until an operator intervenes. Only the first caller on
// this shard triggers the isolation.
void storage_service::do_isolate_on_error(disk_error type)
{
    if (std::exchange(_isolated, true)) {
        return;
    }
    const char* source = type == disk_error::commit ? "Commitlog" : "Disk";
    slogger.error("Shutting down communications due to I/O errors until operator intervention: {} error: {}", source, std::current_exception());
    // isolated protect us against multiple stops on _this_ shard
    //FIXME: discarded future.
    (void)isolate();
}
// Stops transport on shard 0. If the caller is not shard 0, the _isolated
// flag is test-and-set again there so two shards cannot both trigger the
// shutdown.
future<> storage_service::isolate() {
    const auto caller_shard = this_shard_id();
    // this invokes on shard 0. So if we get here _from_ shard 0,
    // we _should_ do the stop. If we call from another shard, we
    // should test-and-set again to avoid double shutdown.
    return run_with_no_api_lock([caller_shard] (storage_service& ss) -> future<> {
        // check again to ensure secondary shard does not race
        if (caller_shard != this_shard_id() && std::exchange(ss._isolated, true)) {
            return make_ready_future<>();
        }
        return ss.stop_transport();
    });
}
// Reports the status of node removal. With raft-managed topology a removal
// never lingers, so there is never anything in progress to report.
future<sstring> storage_service::get_removal_status() {
    return run_with_no_api_lock([] (storage_service&) {
        return make_ready_future<sstring>(sstring("No token removals in process."));
    });
}
// `nodetool removenode force` was unsafe and has been removed; this always
// fails with an explanatory error.
future<> storage_service::force_remove_completion() {
    auto err = std::make_exception_ptr(std::runtime_error("The unsafe nodetool removenode force is not supported anymore"));
    return make_exception_future<>(std::move(err));
}
// Thin forwarder: the effective replication map knows which token ranges
// the given endpoint replicates.
future<dht::token_range_vector>
storage_service::get_ranges_for_endpoint(const locator::effective_replication_map& erm, const locator::host_id& ep) const {
    return erm.get_ranges(ep);
}
// Caller is responsible to hold token_metadata valid until the returned future is resolved
//
// Builds the complete set of token ranges delimited by the sorted token
// list: (-inf, t0], (t0, t1], ..., (t_{n-1}, +inf). Returns an empty vector
// for an empty token list. Yields between iterations to avoid reactor stalls.
future<dht::token_range_vector>
storage_service::get_all_ranges(const std::vector<token>& sorted_tokens) const {
    if (sorted_tokens.empty()) {
        co_return dht::token_range_vector();
    }
    // Use an unsigned index type matching size() instead of the previous
    // `int`, avoiding a signed/unsigned mismatch (and theoretical narrowing).
    const size_t size = sorted_tokens.size();
    dht::token_range_vector ranges;
    ranges.reserve(size + 1);
    // Leading range: everything up to and including the first token.
    ranges.push_back(dht::token_range::make_ending_with(interval_bound<token>(sorted_tokens[0], true)));
    co_await coroutine::maybe_yield();
    for (size_t i = 1; i < size; ++i) {
        // (previous token, current token] - left-open, right-closed.
        dht::token_range r(wrapping_interval<token>::bound(sorted_tokens[i - 1], false), wrapping_interval<token>::bound(sorted_tokens[i], true));
        ranges.push_back(r);
        co_await coroutine::maybe_yield();
    }
    // Trailing range: everything after the last token.
    ranges.push_back(dht::token_range::make_starting_with(interval_bound<token>(sorted_tokens[size - 1], false)));
    co_return ranges;
}
// Resolves natural endpoints for a partition key given in nodetool's
// string form, delegating to the schema-aware overload.
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const {
    auto& cfam = _db.local().find_column_family(keyspace, cf);
    const auto s = cfam.schema();
    return get_natural_endpoints(keyspace, s, cfam, partition_key::from_nodetool_style_string(s, key));
}
// Resolves natural endpoints for a partition key given as a list of
// string components, delegating to the schema-aware overload.
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace, const sstring& cf, const std::vector<sstring>& key_components) const {
    auto& cfam = _db.local().find_column_family(keyspace, cf);
    const auto s = cfam.schema();
    return get_natural_endpoints(keyspace, s, cfam, partition_key::from_string_components(s, key_components));
}
// Computes the token owning the partition key, picks the replica set from
// the per-table (tablets) or per-keyspace (vnodes) replication map, and
// translates the replica host ids into network addresses.
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const {
    const dht::token token = schema->get_partitioner().get_token(*schema, pk.view());
    const auto& ks = _db.local().find_keyspace(keyspace);
    const auto replicas = ks.uses_tablets()
            ? cf.get_effective_replication_map()->get_natural_replicas(token)
            : ks.get_static_effective_replication_map()->get_natural_replicas(token);
    auto to_address = [&] (locator::host_id id) { return _address_map.get(id); };
    return replicas | std::views::transform(to_address) | std::ranges::to<inet_address_vector_replica_set>();
}
// Delivers an on_down event to all subscribers in a thread context so they
// may block; one failing subscriber does not stop the rest.
future<> endpoint_lifecycle_notifier::notify_down(gms::inet_address endpoint, locator::host_id hid) {
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_down(endpoint, hid);
            } catch (...) {
                slogger.warn("Down notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
// Drops RPC connections to the node on every shard and then fans the
// "node down" event out to the lifecycle subscribers.
future<> storage_service::notify_down(inet_address endpoint, locator::host_id hid) {
    co_await container().invoke_on_all([endpoint, hid] (storage_service& ss) {
        ss._messaging.local().remove_rpc_client(netw::msg_addr{endpoint, 0}, hid);
        return ss._lifecycle_notifier.notify_down(endpoint, hid);
    });
    slogger.debug("Notify node {}/{} has been down", endpoint, hid);
}
// Delivers an on_leave_cluster event to all subscribers in a thread
// context; subscriber failures are logged and swallowed.
future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint, locator::host_id hid) {
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_leave_cluster(endpoint, hid);
            } catch (...) {
                slogger.warn("Leave cluster notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
// Delivers an on_released event to all subscribers in a thread context;
// subscriber failures are logged and swallowed.
future<> endpoint_lifecycle_notifier::notify_released(locator::host_id hid) {
    return seastar::async([this, hid] {
        _subscribers.thread_for_each([hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_released(hid);
            } catch (...) {
                slogger.warn("Node released notification failed {}: {}", hid, std::current_exception());
            }
        });
    });
}
// Fans the "node left the cluster" event out to lifecycle subscribers on
// all shards.
future<> storage_service::notify_left(inet_address endpoint, locator::host_id hid) {
    co_await container().invoke_on_all([endpoint, hid] (auto&& ss) {
        return ss._lifecycle_notifier.notify_left(endpoint, hid);
    });
    // Log both address and host id, consistent with notify_down()/notify_up()
    // and notify_joined().
    slogger.debug("Notify node {}/{} has left the cluster", endpoint, hid);
}
// Fans the "node released" (no longer owns tokens) event out to lifecycle
// subscribers on all shards.
future<> storage_service::notify_released(locator::host_id hid) {
    co_await container().invoke_on_all([hid] (storage_service& ss) {
        return ss._lifecycle_notifier.notify_released(hid);
    });
    slogger.debug("Notify node {} been released from the cluster and no longer owns any tokens", hid);
}
// Delivers an on_up event to all subscribers in a thread context;
// subscriber failures are logged and swallowed.
future<> endpoint_lifecycle_notifier::notify_up(gms::inet_address endpoint, locator::host_id hid) {
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_up(endpoint, hid);
            } catch (...) {
                slogger.warn("Up notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
// Fans the "node up" event out to lifecycle subscribers on all shards, but
// only once the node is both alive and ready to serve CQL.
future<> storage_service::notify_up(inet_address endpoint, locator::host_id hid) {
    const bool ready = _gossiper.is_cql_ready(hid) && _gossiper.is_alive(hid);
    if (!ready) {
        co_return;
    }
    co_await container().invoke_on_all([endpoint, hid] (storage_service& ss) {
        return ss._lifecycle_notifier.notify_up(endpoint, hid);
    });
    slogger.debug("Notify node {}/{} has been up", endpoint, hid);
}
// Delivers an on_join_cluster event to all subscribers in a thread
// context; subscriber failures are logged and swallowed.
future<> endpoint_lifecycle_notifier::notify_joined(gms::inet_address endpoint, locator::host_id hid) {
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_join_cluster(endpoint, hid);
            } catch (...) {
                slogger.warn("Join cluster notification failed {}/{}: {}", endpoint, hid, std::current_exception());
            }
        });
    });
}
// Delivers an on_client_routes_change event to all subscribers in a thread
// context; subscriber failures are logged and swallowed.
future<> endpoint_lifecycle_notifier::notify_client_routes_change(const client_routes_service::client_route_keys& client_route_keys) {
    co_await seastar::async([this, &client_route_keys] {
        _subscribers.thread_for_each([&client_route_keys] (endpoint_lifecycle_subscriber* sub) {
            try {
                sub->on_client_routes_change(client_route_keys);
            } catch (...) {
                slogger.warn("Client routes notification failed: {}", std::current_exception());
            }
        });
    });
}
// Fans the "node joined" event out to lifecycle subscribers on all shards.
future<> storage_service::notify_joined(inet_address endpoint, locator::host_id hid) {
    // Test hook: optionally delay delivery of the join notification.
    co_await utils::get_local_injector().inject(
            "storage_service_notify_joined_sleep", std::chrono::milliseconds{500});
    co_await container().invoke_on_all([endpoint, hid] (storage_service& ss) {
        return ss._lifecycle_notifier.notify_joined(endpoint, hid);
    });
    slogger.debug("Notify node {}/{} has joined the cluster", endpoint, hid);
}
// Drops, on every shard, the RPC clients to the given node that were
// created with topology-ignoring connection settings.
future<> storage_service::remove_rpc_client_with_ignored_topology(inet_address endpoint, locator::host_id id) {
    return container().invoke_on_all([endpoint, id] (storage_service& ss) {
        ss._messaging.local().remove_rpc_client_with_ignored_topology(netw::msg_addr{endpoint, 0}, id);
    });
}
// A change in a node's CQL readiness maps directly onto an up/down
// lifecycle notification.
future<> storage_service::notify_cql_change(inet_address endpoint, locator::host_id hid, bool ready) {
    co_await (ready ? notify_up(endpoint, hid) : notify_down(endpoint, hid));
}
// Forwards the client-routes change to the local client-routes service.
future<> storage_service::notify_client_routes_change(const client_routes_service::client_route_keys& client_route_keys) {
    auto& routes = _client_routes.local();
    co_await routes.notify_client_routes_change(client_route_keys);
}
// True once the boot-time handler has processed this node's NORMAL state.
bool storage_service::is_normal_state_handled_on_boot(locator::host_id node) {
    return _normal_state_handled_on_boot.contains(node);
}
// Maps the persisted raft-topology upgrade state onto the kind of topology
// operations the node may perform.
storage_service::topology_change_kind storage_service::upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const {
    if (upgrade_state == topology::upgrade_state_type::done) {
        return topology_change_kind::raft;
    }
    if (upgrade_state == topology::upgrade_state_type::not_upgraded) {
        // Did not start upgrading to raft topology yet - use legacy
        return topology_change_kind::legacy;
    }
    // Upgrade is in progress - disallow topology operations
    return topology_change_kind::upgrading_to_raft;
}
// Decides on shard 0 (where authoritative topology state lives) whether a
// vnode-based cleanup of the keyspace may run: not while bootstrapping and
// not while this node has pending ranges for it.
future<bool> storage_service::is_vnodes_cleanup_allowed(sstring keyspace) {
    return container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) {
        const auto my_id = ss.get_token_metadata().get_my_id();
        const auto& ks = ss._db.local().find_keyspace(keyspace);
        const auto pending_ranges = ks.get_static_effective_replication_map()->has_pending_ranges(my_id);
        const bool bootstrapping = ss._operation_mode == mode::BOOTSTRAP;
        slogger.debug("is_vnodes_cleanup_allowed: keyspace={}, is_bootstrap_mode={}, pending_ranges={}",
                keyspace, bootstrapping, pending_ranges);
        return !bootstrapping && !pending_ranges;
    });
}
// Returns whether repair-based node operations are enabled for the given
// stream reason: the global switch must be on and the operation must be
// listed in the allowed_repair_based_node_ops config option.
bool storage_service::is_repair_based_node_ops_enabled(streaming::stream_reason reason) {
    // Maps operation names accepted in the config option onto stream reasons.
    static const std::unordered_map<sstring, streaming::stream_reason> reason_map{
        {"replace", streaming::stream_reason::replace},
        {"bootstrap", streaming::stream_reason::bootstrap},
        {"decommission", streaming::stream_reason::decommission},
        {"removenode", streaming::stream_reason::removenode},
        {"rebuild", streaming::stream_reason::rebuild},
    };
    const sstring& enabled_list_str = _db.local().get_config().allowed_repair_based_node_ops();
    std::vector<sstring> enabled_list = utils::split_comma_separated_list(enabled_list_str);
    std::unordered_set<streaming::stream_reason> enabled_set;
    for (const sstring& op : enabled_list) {
        try {
            auto found = reason_map.find(op);
            if (found == reason_map.end()) {
                throw std::invalid_argument(::format("unsupported operation name: {}", op));
            }
            enabled_set.insert(found->second);
        } catch (...) {
            // Re-wrap the parse error so it carries the full option value.
            throw std::invalid_argument(::format("Failed to parse allowed_repair_based_node_ops parameter [{}]: {}",
                    enabled_list_str, std::current_exception()));
        }
    }
    const bool global_enabled = _db.local().get_config().enable_repair_based_node_ops();
    slogger.info("enable_repair_based_node_ops={}, allowed_repair_based_node_ops={{{}}}", global_enabled, fmt::join(enabled_set, ", "));
    return global_enabled && enabled_set.contains(reason);
}
// Enters maintenance mode: the node marks itself normal in a locally
// mutated token metadata and owns a single dummy token.
future<> storage_service::start_maintenance_mode() {
    set_mode(mode::MAINTENANCE);
    return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> {
        tmptr->update_topology(my_host_id(), _snitch.local()->get_location(), locator::node::state::normal, smp::count);
        return tmptr->update_normal_tokens({ dht::token{} }, my_host_id());
    }, acquire_merge_lock::yes);
}
// Records the topology-change mode. The gossiper consults the topology
// state machine only when the mode is raft.
void storage_service::set_topology_change_kind(topology_change_kind kind) {
    _topology_change_kind_enabled = kind;
    auto* tsm = kind == topology_change_kind::raft ? &_topology_state_machine : nullptr;
    _gossiper.set_topology_state_machine(tsm);
}
// True when topology changes are raft-driven. The mode is maintained only
// on shard 0, so calling from any other shard is an internal error.
bool storage_service::raft_topology_change_enabled() const {
    if (this_shard_id() != 0) {
        on_internal_error(slogger, "raft_topology_change_enabled() must run on shard 0");
    }
    return _topology_change_kind_enabled == topology_change_kind::raft;
}
// Records the protocol server so it follows the node lifecycle; optionally
// starts it right away.
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
    _protocol_servers.push_back(&server);
    if (!start_instantly) {
        co_return;
    }
    co_await server.start_server();
}
// Delegates to the CDC metadata held by the local generation service.
std::vector<table_id> storage_service::get_tables_with_cdc_tablet_streams() const {
    return _cdc_gens.local().get_cdc_metadata().get_tables_with_cdc_tablet_streams();
}
// Forwards the CDC timestamp query to the local generation service.
future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f) {
    return _cdc_gens.local().query_cdc_timestamps(table, ascending, std::move(f));
}
// Forwards the CDC stream query to the local generation service.
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
    return _cdc_gens.local().query_cdc_streams(table, std::move(f));
}
} // namespace service