Files
scylladb/locator/abstract_replication_strategy.cc
Aleksandra Martyniuk 761ace4f05 config: add enforce_rack_list option
Add enforce_rack_list option. When the option is set to true,
all tablet keyspaces have rack list replication factor.

When the option is on:
- CREATE STATEMENT always auto-extends rf to rack lists;
- ALTER STATEMENT fails when there is numeric rf in any DC.

The flag is set to false by default and a node needs to be restarted
in order to change its value. Starting a node with enforce_rack_list
option will fail, if there are any tablet keyspaces with numeric rf
in any DC.

enforce_rack_list is a per-node option, and a user needs to ensure
that no tablet keyspace is altered or created while nodes in
the cluster have inconsistent values of this option.
2026-01-20 09:58:51 +01:00

777 lines
32 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablet_replication_strategy.hh"
#include "locator/local_strategy.hh"
#include "utils/class_registrator.hh"
#include "exceptions/exceptions.hh"
#include <fmt/ranges.h>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "replica/database.hh"
#include "utils/stall_free.hh"
#include <boost/icl/interval.hpp>
#include <boost/icl/interval_map.hpp>
#include <variant>
namespace locator {
// Underlying boost ICL map: token interval -> set of replica host ids.
using ring_mapping_underlying_type = boost::icl::interval_map<token, std::unordered_set<locator::host_id>>;

// Pimpl body for ring_mapping; keeps the boost::icl dependency out of the header.
struct ring_mapping_impl {
    ring_mapping_underlying_type map;
};

ring_mapping::ring_mapping()
    : _impl(std::make_unique<ring_mapping_impl>()) {
}

// Move-only type: copies of the interval map are deliberately not provided.
ring_mapping::ring_mapping(ring_mapping&&) noexcept = default;

ring_mapping&
ring_mapping::operator=(ring_mapping&&) noexcept = default;

ring_mapping::~ring_mapping() = default;

// Smart-pointer-style accessors to the wrapped interval map; the const
// overloads hand out const views of it.
auto* ring_mapping::operator->() {
    return &_impl->map;
}

auto* ring_mapping::operator->() const {
    return &std::as_const(_impl->map);
}

auto& ring_mapping::operator*() {
    return _impl->map;
}

auto& ring_mapping::operator*() const {
    return std::as_const(_impl->map);
}

// Module-wide logger for replication-strategy related messages.
logging::logger rslogger("replication_strategy");
// Base-class constructor: stores the raw strategy options and the concrete
// strategy type. The presence of `initial_tablets` in the parameters marks
// the strategy as tablet-based.
abstract_replication_strategy::abstract_replication_strategy(
        replication_strategy_params params,
        replication_strategy_type my_type)
    : _config_options(params.options)
    , _my_type(my_type) {
    if (params.initial_tablets.has_value()) {
        _uses_tablets = true;
    }
}
// Instantiate a replication strategy by (possibly unqualified) class name via
// the class registry. An unknown class name surfaces as a user-facing
// configuration error rather than an internal one.
abstract_replication_strategy::ptr_type abstract_replication_strategy::create_replication_strategy(const sstring& strategy_name, replication_strategy_params params, const locator::topology* topo) {
    try {
        // `topo` is a raw, non-owning pointer, so it is passed as-is;
        // std::move on a trivially-copyable pointer is a no-op.
        return create_object<abstract_replication_strategy, replication_strategy_params, const locator::topology*>(strategy_name, std::move(params), topo);
    } catch (const no_such_class& e) {
        throw exceptions::configuration_exception(e.what());
    }
}
// class registry signature must match the actual replication strategies' signature
using strategy_class_registry = class_registry<
    locator::abstract_replication_strategy,
    replication_strategy_params,
    const topology*>;

// Expand a short strategy name (e.g. "NetworkTopologyStrategy") to the
// fully-qualified class name understood by the class registry.
sstring abstract_replication_strategy::to_qualified_class_name(std::string_view strategy_class_name) {
    return strategy_class_registry::to_qualified_class_name(strategy_class_name);
}
// Look up the host set for the interval covering `token`. Returns nullptr
// when the mapping is empty or no interval contains the token.
static const std::unordered_set<locator::host_id>* find_token(const ring_mapping& ring_mapping, const token& token) {
    if (ring_mapping->empty()) {
        return nullptr;
    }
    const auto probe = token_metadata::range_to_interval(wrapping_interval<dht::token>(token));
    const auto match = ring_mapping->find(probe);
    if (match == ring_mapping->end()) {
        return nullptr;
    }
    return &match->second;
}
// Replicas that are gaining ownership of the range covering `search_token`
// during an ongoing topology change; empty when there is no such change.
host_id_vector_topology_change vnode_effective_replication_map::get_pending_replicas(const token& search_token) const {
    if (const auto* pending = find_token(_pending_endpoints, search_token)) {
        return *pending | std::ranges::to<host_id_vector_topology_change>();
    }
    return host_id_vector_topology_change{};
}
// Replicas to read from: prefer the read-endpoint override installed during
// topology changes, otherwise fall back to the natural replicas.
host_id_vector_replica_set vnode_effective_replication_map::get_replicas_for_reading(const token& token, bool is_vnode) const {
    if (const auto* read_eps = find_token(_read_endpoints, token)) {
        return *read_eps | std::ranges::to<host_id_vector_replica_set>();
    }
    return get_natural_replicas(token, is_vnode);
}
std::optional<tablet_routing_info> vnode_effective_replication_map::check_locality(const token& token) const {
return {};
}
// True if `endpoint` appears in the pending-replica set of any token
// interval, i.e. it is gaining at least one range in an ongoing topology
// change.
bool vnode_effective_replication_map::has_pending_ranges(locator::host_id endpoint) const {
    return std::ranges::any_of(*_pending_endpoints, [&] (const auto& item) {
        return item.second.contains(endpoint);
    });
}
std::unordered_set<locator::host_id> vnode_effective_replication_map::get_all_pending_nodes() const {
std::unordered_set<locator::host_id> endpoints;
for (const auto& item : *_pending_endpoints) {
endpoints.insert(item.second.begin(), item.second.end());
}
return endpoints;
}
// Create a splitter over the ring captured by this map's token metadata.
std::unique_ptr<token_range_splitter> vnode_effective_replication_map::make_splitter() const {
    return locator::make_splitter(_tmptr);
}

// vnode-based keyspaces shard by the schema's own static sharder.
const dht::sharder& vnode_effective_replication_map::get_sharder(const schema& s) const {
    return s.get_sharder();
}
// Downcast to the per-table interface, or nullptr when this strategy has
// not flagged itself as per-table.
const per_table_replication_strategy* abstract_replication_strategy::maybe_as_per_table() const {
    return _per_table
        ? dynamic_cast<const per_table_replication_strategy*>(this)
        : nullptr;
}
// Downcast to the tablet-aware interface, or nullptr when this strategy
// does not use tablets.
const tablet_aware_replication_strategy* abstract_replication_strategy::maybe_as_tablet_aware() const {
    return _uses_tablets
        ? dynamic_cast<const tablet_aware_replication_strategy*>(this)
        : nullptr;
}
// Parse a numeric replication factor from its string form.
// Throws exceptions::configuration_exception on empty input, non-digit
// characters, or values out of range for long.
size_t replication_factor_data::parse(const sstring& rf) {
    // Cast to unsigned char before isdigit(): calling it with a plain char
    // that has a negative value is undefined behavior.
    if (rf.empty() || std::any_of(rf.begin(), rf.end(), [] (char c) {return !isdigit(static_cast<unsigned char>(c));})) {
        throw exceptions::configuration_exception(
            format("Replication factor must be numeric and non-negative, found '{}'", rf));
    }
    try {
        return std::stol(rf);
    } catch (...) {
        // All characters are digits, so this can only be std::out_of_range.
        throw exceptions::configuration_exception(
            sstring("Replication factor must be numeric; found ") + rf);
    }
}
// Wrap a strategy option in replication_factor_data; throws
// exceptions::configuration_exception for malformed numeric input.
replication_factor_data abstract_replication_strategy::parse_replication_factor(const replication_strategy_config_option& rf)
{
    return replication_factor_data(rf);
}

// Convenience helper: the effective replica count of an RF option
// (the number itself, or the length of the rack list).
size_t get_replication_factor(const replication_strategy_config_option& opt) {
    return replication_factor_data(opt).count();
}
bool uses_rack_list_exclusively(const replication_strategy_config_options& opts) {
for (auto& [_, val] : opts) {
if (!std::holds_alternative<rack_list>(val)) {
return false;
}
}
return true;
}
// Build RF data from a raw strategy option: a plain string is parsed as a
// numeric RF, while a vector of strings is a rack list whose count equals
// the number of racks.
replication_factor_data::replication_factor_data(const replication_strategy_config_option& rf) {
    if (const auto* numeric = std::get_if<sstring>(&rf)) {
        const auto parsed = parse(*numeric);
        _data.emplace<size_t>(parsed);
        _count = parsed;
    } else {
        const auto& racks = std::get<std::vector<sstring>>(rf);
        _data.emplace<std::vector<sstring>>(racks);
        _count = racks.size();
    }
}
void replication_factor_data::validate(const std::unordered_set<sstring>& allowed_racks) {
std::visit(overloaded_functor {
[&] (const size_t& rf) {},
[&] (const std::vector<sstring>& racks) {
for (const auto& rack : racks) {
if (!allowed_racks.contains(rack)) {
throw exceptions::configuration_exception(
fmt::format("Unrecognized rack name '{}'. allowed_racks={}", rack, allowed_racks));
}
}
}
}, _data);
}
// Compute which racks were added and which removed between two rack lists.
// The lists are compared as ordered sets, so the output is deduplicated
// and sorted.
rack_diff diff_racks(const rack_list& old_racks, const rack_list& new_racks) {
    const std::set<sstring> before(old_racks.begin(), old_racks.end());
    const std::set<sstring> after(new_racks.begin(), new_racks.end());
    rack_diff result;
    std::ranges::set_difference(after, before, std::back_inserter(result.added));
    std::ranges::set_difference(before, after, std::back_inserter(result.removed));
    return result;
}
// Append the ring range (prev_tok, tok] to `ret`, keeping `ret` sorted and
// unwrapped. When prev_tok >= tok the range wraps around the ring and is
// stored as two non-wrapping pieces: (-inf, tok] and (prev_tok, +inf).
static
void
insert_token_range_to_sorted_container_while_unwrapping(
        const dht::token& prev_tok,
        const dht::token& tok,
        dht::token_range_vector& ret) {
    if (prev_tok < tok) {
        auto pos = ret.end();
        if (!ret.empty() && !std::prev(pos)->end()) {
            // We inserted a wrapped range (a, b] previously as
            // (-inf, b], (a, +inf). So now we insert in the next-to-last
            // position to keep the last range (a, +inf) at the end.
            pos = std::prev(pos);
        }
        ret.insert(pos,
            dht::token_range{
                dht::token_range::bound(prev_tok, false),
                dht::token_range::bound(tok, true)});
    } else {
        // Wrapping range: upper half first, open-ended towards +inf.
        ret.emplace_back(
            dht::token_range::bound(prev_tok, false),
            std::nullopt);
        // Insert in front to maintain sorted order
        ret.emplace(
            ret.begin(),
            std::nullopt,
            dht::token_range::bound(tok, true));
    }
}
// Walk every token in the ring and collect the ranges selected by the
// callback. For each token the callback is invoked once per natural replica
// (until it returns stop_iteration::yes); it reports via its bool
// out-parameter whether the range ending at that token should be included.
future<dht::token_range_vector>
vnode_effective_replication_map::do_get_ranges(noncopyable_function<stop_iteration(bool&, const host_id&)> consider_range_for_endpoint) const {
    dht::token_range_vector ret;
    const auto& tm = *_tmptr;
    const auto& sorted_tokens = tm.sorted_tokens();
    if (sorted_tokens.empty()) {
        on_internal_error(rslogger, "Token metadata is empty");
    }
    // Start from the last token so the first iteration handles the wrapping range.
    auto prev_tok = sorted_tokens.back();
    for (const auto& tok : sorted_tokens) {
        bool add_range = false;
        for_each_natural_endpoint_until(tok, [&] (const host_id& ep) {
            return consider_range_for_endpoint(add_range, ep);
        });
        if (add_range) {
            insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
        }
        prev_tok = tok;
        co_await coroutine::maybe_yield();
    }
    co_return ret;
}
// Collect the token ranges replicated on `ep`: a range is included when
// `ep` appears among the natural endpoints of the vnode that owns it.
future<dht::token_range_vector>
vnode_effective_replication_map::get_ranges(host_id ep) const {
    return do_get_ranges([ep] (bool& add_range, const host_id& candidate) {
        add_range = (candidate == ep);
        // Stop scanning this vnode's replicas as soon as a match is found.
        return add_range ? stop_iteration::yes : stop_iteration::no;
    });
}
// Caller must ensure that token_metadata will not change throughout the call.
future<dht::token_range_vector>
abstract_replication_strategy::get_ranges(locator::host_id ep, token_metadata_ptr tmptr) const {
    // Delegate to the by-reference overload; the coroutine frame holds the
    // ptr, keeping the metadata alive for the duration of the call.
    co_return co_await get_ranges(ep, *tmptr);
}
// Caller must ensure that token_metadata will not change throughout the call.
// Returns the token ranges whose natural endpoints (recomputed from `tm`,
// not from a cached replication map) include `ep`. Non-owners get an empty
// result.
future<dht::token_range_vector>
abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metadata& tm) const {
    dht::token_range_vector ret;
    if (!tm.is_normal_token_owner(ep)) {
        co_return ret;
    }
    const auto& sorted_tokens = tm.sorted_tokens();
    if (sorted_tokens.empty()) {
        on_internal_error(rslogger, "Token metadata is empty");
    }
    // Start from the last token so the first iteration handles the wrapping range.
    auto prev_tok = sorted_tokens.back();
    for (auto tok : sorted_tokens) {
        bool should_add = false;
        if (get_type() == replication_strategy_type::everywhere_topology) {
            // everywhere_topology deserves this special fast path.
            // Using the common path would make the function quadratic in the number of endpoints.
            should_add = true;
        } else {
            auto eps = co_await calculate_natural_endpoints(tok, tm);
            should_add = eps.contains(ep);
        }
        if (should_add) {
            insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
        }
        prev_tok = tok;
    }
    co_return ret;
}
// Collect the ranges for which `ep` is the primary replica. The primary
// replica is the first entry in a vnode's natural endpoint list, so only
// the first endpoint visited matters: include the range iff it is `ep`,
// then stop the per-vnode scan.
future<dht::token_range_vector>
vnode_effective_replication_map::get_primary_ranges(locator::host_id ep) const {
    return do_get_ranges([ep] (bool& add_range, const locator::host_id& first_replica) {
        add_range = (first_replica == ep);
        return stop_iteration::yes;
    });
}
// Collect the ranges for which `ep` is the primary replica within its own
// datacenter. DC-primacy is decided by natural endpoint order, restricted
// to nodes of `ep`'s DC.
future<dht::token_range_vector>
vnode_effective_replication_map::get_primary_ranges_within_dc(locator::host_id ep) const {
    const topology& topo = _tmptr->get_topology();
    sstring local_dc = topo.get_datacenter(ep);
    std::unordered_set<locator::host_id> local_dc_nodes = _tmptr->get_datacenter_token_owners().at(local_dc);
    // The callback function below is called for each endpoint
    // in each token natural endpoints.
    // Add the range if `ep` is the datacenter primary replica in the token's natural endpoints.
    // The primary replica in each datacenter is determined by the natural endpoints list order.
    // The node set is moved into the closure since the lambda outlives this frame.
    return do_get_ranges([ep, local_dc_nodes = std::move(local_dc_nodes)] (bool& add_range, const locator::host_id& e) {
        // Unlike get_primary_ranges() which checks if ep is the first
        // owner of this range, here we check if ep is the first just
        // among nodes which belong to the local dc of ep.
        if (!local_dc_nodes.contains(e)) {
            return stop_iteration::no;
        }
        add_range = (e == ep);
        // stop the iteration once the first node contained the local datacenter was considered.
        return stop_iteration::yes;
    });
}
// Map every primary range to its replica set, using the precomputed
// replication map of this e_r_m (no endpoint recalculation).
future<std::unordered_map<dht::token_range, host_id_vector_replica_set>>
vnode_effective_replication_map::get_range_host_ids() const {
    const token_metadata& tm = *_tmptr;
    std::unordered_map<dht::token_range, host_id_vector_replica_set> ret;
    for (auto& t : tm.sorted_tokens()) {
        dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
        for (auto& r : ranges) {
            // t is a vnode token, hence is_vnode=true.
            ret.emplace(r, do_get_replicas(t, true));
        }
        co_await coroutine::maybe_yield();
    }
    co_return ret;
}
// Map every primary range to its replica set, recomputing natural endpoints
// from `tm` for each token (used when no effective replication map exists).
future<std::unordered_map<dht::token_range, host_id_vector_replica_set>>
abstract_replication_strategy::get_range_host_ids(const token_metadata& tm) const {
    std::unordered_map<dht::token_range, host_id_vector_replica_set> ret;
    for (auto& t : tm.sorted_tokens()) {
        dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
        // calculate_natural_endpoints is a coroutine, so each iteration
        // already contains a potential yield point.
        auto eps = co_await calculate_natural_endpoints(t, tm);
        for (auto& r : ranges) {
            ret.emplace(r, eps.get_vector());
        }
    }
    co_return ret;
}
// Compute the ranges that `pending_address` would own if it joined with
// `pending_tokens` in rack/DC `dr`. Works on a temporary clone of the
// token map so the live metadata is never mutated.
future<dht::token_range_vector>
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const {
    dht::token_range_vector ret;
    auto temp = co_await tmptr->clone_only_token_map();
    temp.update_topology(pending_address, std::move(dr));
    co_await temp.update_normal_tokens(pending_tokens, pending_address);
    for (const auto& t : temp.sorted_tokens()) {
        auto eps = co_await calculate_natural_endpoints(t, temp);
        if (eps.contains(pending_address)) {
            dht::token_range_vector r = temp.get_primary_ranges_for(t);
            rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address);
            ret.insert(ret.end(), r.begin(), r.end());
        }
    }
    // Dispose of the clone incrementally to avoid reactor stalls.
    co_await temp.clear_gently();
    co_return ret;
}
// Key used for strategies whose natural endpoints do not depend on the
// token: the whole ring maps to a single entry under this key.
static const auto default_replication_map_key = dht::token::from_int64(0);

// Build a vnode effective replication map from scratch: the per-token
// replica map plus, during topology changes, the pending/read endpoint
// interval maps and the set of "dirty" endpoints (those losing ranges).
future<mutable_static_effective_replication_map_ptr> calculate_vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) {
    replication_map replication_map;
    ring_mapping pending_endpoints;
    ring_mapping read_endpoints;
    std::unordered_set<locator::host_id> dirty_endpoints;
    const auto depend_on_token = rs->natural_endpoints_depend_on_token();
    const auto& sorted_tokens = tmptr->sorted_tokens();
    replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1);
    if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) {
        // Topology change in progress: walk the union of current and target
        // tokens, computing both current and target replica sets so the
        // pending/read/dirty data can be derived per ring interval.
        const auto& all_tokens = topology_changes->all_tokens;
        const auto& current_tokens = tmptr->get_token_to_endpoint();
        for (size_t i = 0, size = all_tokens.size(); i < size; ++i) {
            co_await coroutine::maybe_yield();
            const auto token = all_tokens[i];
            auto current_endpoints = co_await rs->calculate_natural_endpoints(token, *tmptr);
            auto target_endpoints = co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata);
            // Record `endpoints` for the ring interval that ends at `token`.
            // The first token's interval wraps around the ring, so it is
            // inserted as two non-wrapping pieces.
            auto add_mapping = [&](ring_mapping& target, std::unordered_set<locator::host_id>&& endpoints) {
                using interval = ring_mapping_underlying_type::interval_type;
                if (!depend_on_token) {
                    *target += std::make_pair(
                        interval::open(dht::minimum_token(), dht::maximum_token()),
                        std::move(endpoints));
                } else if (i == 0) {
                    *target += std::make_pair(
                        interval::open(all_tokens.back(), dht::maximum_token()),
                        endpoints);
                    *target += std::make_pair(
                        interval::left_open(dht::minimum_token(), token),
                        std::move(endpoints));
                } else {
                    *target += std::make_pair(
                        interval::left_open(all_tokens[i - 1], token),
                        std::move(endpoints));
                }
            };
            {
                // Pending endpoints: replicas present in the target layout
                // but not in the current one (they are gaining this range).
                host_id_set endpoints_diff;
                for (const auto& e: target_endpoints) {
                    if (!current_endpoints.contains(e)) {
                        endpoints_diff.insert(e);
                    }
                }
                if (!endpoints_diff.empty()) {
                    add_mapping(pending_endpoints, std::move(endpoints_diff).extract_set());
                }
            }
            // If an endpoint is in current endpoints, but not in target endpoints,
            // it means it loses the range and becomes dirty.
            for (auto& h : current_endpoints) {
                if (!target_endpoints.contains(h)) {
                    dirty_endpoints.emplace(h);
                }
            }
            // in order not to waste memory, we update read_endpoints only if the
            // new endpoints differs from the old one
            if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) {
                add_mapping(read_endpoints, std::move(target_endpoints).extract_set());
            }
            if (!depend_on_token) {
                // Token-independent strategy: one map entry covers the ring.
                replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector());
                break;
            } else if (current_tokens.contains(token)) {
                replication_map.emplace(token, std::move(current_endpoints).extract_vector());
            }
        }
    } else if (depend_on_token) {
        // Steady state, token-dependent: one replica vector per vnode token.
        for (const auto &t : sorted_tokens) {
            auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr);
            replication_map.emplace(t, std::move(eps).extract_vector());
        }
    } else {
        // Steady state, token-independent: single entry under the default key.
        auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr);
        replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
    }
    auto rf = rs->get_replication_factor(*tmptr);
    co_return make_vnode_effective_replication_map_ptr(std::move(rs), std::move(tmptr), std::move(replication_map),
        std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), rf);
}
// Produce a per-shard copy of this map (the reference instance lives on
// shard 0), yielding between elements to avoid reactor stalls.
future<mutable_static_effective_replication_map_ptr> vnode_effective_replication_map::clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const {
    replication_map replication_map;
    ring_mapping pending_endpoints;
    ring_mapping read_endpoints;
    std::unordered_set<locator::host_id> dirty_endpoints;
    for (auto& i : _replication_map) {
        replication_map.emplace(i.first, i.second);
        co_await coroutine::maybe_yield();
    }
    for (const auto& i : *_pending_endpoints) {
        *pending_endpoints += i;
        co_await coroutine::maybe_yield();
    }
    for (const auto& i : *_read_endpoints) {
        *read_endpoints += i;
        co_await coroutine::maybe_yield();
    }
    // no need to yield while copying since this is bound by nodes, not vnodes
    dirty_endpoints = _dirty_endpoints;
    co_return make_vnode_effective_replication_map_ptr(std::move(rs), std::move(tmptr), std::move(replication_map),
        std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), _replication_factor);
}
// Look up the replica set for `tok` in the precomputed replication map.
// For token-dependent strategies the key is the owning vnode's token
// (`tok` itself when is_vnode, else the first token >= tok); otherwise the
// single default key is used. A miss is an internal error: the map is
// built to cover every vnode.
host_id_vector_replica_set vnode_effective_replication_map::do_get_replicas(const token& tok,
        bool is_vnode) const
{
    const token& key_token = _rs->natural_endpoints_depend_on_token()
        ? (is_vnode ? tok : _tmptr->first_token(tok))
        : default_replication_map_key;
    const auto it = _replication_map.find(key_token);
    if (it == _replication_map.end()) {
        on_internal_error(rslogger, format("Token {} not found in replication map: natural_endpoints_depend_on_token={} token={} vnode_token={}",
            key_token, _rs->natural_endpoints_depend_on_token(), tok, _tmptr->first_token(tok)));
    }
    return it->second;
}
// Public lookup of the replica set owning `tok`; see do_get_replicas().
host_id_vector_replica_set vnode_effective_replication_map::get_replicas(const token& tok, bool is_vnode) const {
    return do_get_replicas(tok, is_vnode);
}

// For vnode-based replication natural replicas and replicas coincide.
host_id_vector_replica_set vnode_effective_replication_map::get_natural_replicas(const token& search_token, bool is_vnode) const {
    return get_replicas(search_token, is_vnode);
}
// Invoke `func` on each natural replica of the vnode owning `vnode_tok`,
// stopping early if the callback requests it. Returns whether iteration
// was cut short.
stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& vnode_tok, const noncopyable_function<stop_iteration(const host_id&)>& func) const {
    const auto replicas = do_get_replicas(vnode_tok, true);
    for (const auto& replica : replicas) {
        if (func(replica) == stop_iteration::yes) {
            return stop_iteration::yes;
        }
    }
    return stop_iteration::no;
}
// Unregister from the owning factory (if any) so the factory's key->map
// index never holds a dangling pointer.
static_effective_replication_map::~static_effective_replication_map() {
    if (is_registered()) {
        _factory->erase_effective_replication_map(this);
    }
}
// Hand the bulky members to the factory's background queue so they are
// freed incrementally instead of stalling the reactor in the destructor.
vnode_effective_replication_map::~vnode_effective_replication_map() {
    if (is_registered()) {
        try {
            _factory->submit_background_work(dispose_gently(std::move(_replication_map),
                std::move(*_pending_endpoints),
                std::move(*_read_endpoints),
                std::move(_tmptr)));
        } catch (...) {
            // ignore - destructors must not throw; worst case the members
            // are freed synchronously when this frame unwinds.
        }
    }
}
// Base-class constructor: takes shared ownership of the strategy and the
// token metadata snapshot this map was computed from.
effective_replication_map::effective_replication_map(replication_strategy_ptr rs,
        token_metadata_ptr tmptr,
        size_t replication_factor) noexcept
    : _rs(std::move(rs))
    , _tmptr(std::move(tmptr))
    , _replication_factor(replication_factor)
    , _validity_abort_source(std::make_unique<abort_source>())
{ }
// Factory cache key: strategy type + raw config options + ring version.
// Two maps with equal keys are interchangeable.
static_effective_replication_map::factory_key static_effective_replication_map::make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr) {
    return factory_key(rs->get_type(), rs->get_config_options(), tmptr->get_ring_version());
}
// Get-or-create an e_r_m for (strategy, ring version): reuse a local cache
// hit, otherwise clone the shard-0 reference instance, otherwise compute
// from scratch.
future<static_effective_replication_map_ptr> effective_replication_map_factory::create_static_effective_replication_map(replication_strategy_ptr rs, const token_metadata_ptr& tmptr) {
    // lookup key on local shard
    auto key = static_effective_replication_map::make_factory_key(rs, tmptr);
    auto erm = find_effective_replication_map(key);
    if (erm) {
        rslogger.debug("create_static_effective_replication_map: found {} [{}]", key, fmt::ptr(erm.get()));
        co_return erm;
    }
    mutable_static_effective_replication_map_ptr new_erm;
    if (rs->is_local()) {
        // Local replication strategy does not benefit from cloning across shards
        // to save an expensive calculate function like `calculate_vnode_effective_replication_map`
        new_erm = make_local_effective_replication_map_ptr(std::move(rs), std::move(tmptr));
    } else {
        // try to find a reference erm on shard 0
        // TODO:
        // - use hash of key to distribute the load
        // - instantiate only on NUMA nodes
        auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future<foreign_ptr<static_effective_replication_map_ptr>> {
            auto erm = ermf.find_effective_replication_map(key);
            co_return make_foreign<static_effective_replication_map_ptr>(std::move(erm));
        });
        if (ref_erm) {
            new_erm = co_await ref_erm->clone_gently(std::move(rs), std::move(tmptr));
        } else {
            new_erm = co_await calculate_vnode_effective_replication_map(std::move(rs), std::move(tmptr));
        }
    }
    // Register; another fiber may have inserted the same key meanwhile, in
    // which case the already-registered instance is returned instead.
    co_return insert_effective_replication_map(std::move(new_erm), std::move(key));
}
// Return a shared reference to the registered map for `key`, or an empty
// pointer when none is cached on this shard.
static_effective_replication_map_ptr effective_replication_map_factory::find_effective_replication_map(const static_effective_replication_map::factory_key& key) const {
    const auto found = _effective_replication_maps.find(key);
    if (found == _effective_replication_maps.end()) {
        return {};
    }
    return found->second->shared_from_this();
}
// Register `erm` under `key`. If another instance already holds the key
// (lost a race), return that one instead and let `erm` be dropped.
static_effective_replication_map_ptr effective_replication_map_factory::insert_effective_replication_map(mutable_static_effective_replication_map_ptr erm, static_effective_replication_map::factory_key key) {
    // The map stores a non-owning pointer; `key` is copied into the map
    // here and only moved into the erm after a successful insert.
    auto [it, inserted] = _effective_replication_maps.insert({key, erm.get()});
    if (inserted) {
        rslogger.debug("insert_effective_replication_map: inserted {} [{}]", key, fmt::ptr(erm.get()));
        erm->set_factory(*this, std::move(key));
        return erm;
    }
    auto res = it->second->shared_from_this();
    rslogger.debug("insert_effective_replication_map: found {} [{}]", key, fmt::ptr(res.get()));
    return res;
}
// Remove `erm` from the registry. Returns false (with a warning) when the
// key is unknown or a different instance is registered under it.
bool effective_replication_map_factory::erase_effective_replication_map(static_effective_replication_map* erm) {
    const auto& key = erm->get_factory_key();
    const auto entry = _effective_replication_maps.find(key);
    if (entry == _effective_replication_maps.end()) {
        rslogger.warn("Could not unregister effective_replication_map {} [{}]: key not found", key, fmt::ptr(erm));
        return false;
    }
    if (entry->second != erm) {
        rslogger.warn("Could not unregister effective_replication_map {} [{}]: different instance [{}] is currently registered", key, fmt::ptr(erm), fmt::ptr(entry->second));
        return false;
    }
    rslogger.debug("erase_effective_replication_map: erased {} [{}]", key, fmt::ptr(erm));
    _effective_replication_maps.erase(entry);
    return true;
}
// Shut the factory down: unregister any maps still alive (so their
// destructors won't submit new background work) and return the pending
// background-work chain for the caller to await.
future<> effective_replication_map_factory::stop() noexcept {
    _stopped = true;
    if (!_effective_replication_maps.empty()) {
        for (auto it = _effective_replication_maps.begin(); it != _effective_replication_maps.end(); it = _effective_replication_maps.erase(it)) {
            auto& [key, erm] = *it;
            rslogger.debug("effective_replication_map_factory::stop found outstanding map {} [{}]", key, fmt::ptr(erm));
            // unregister outstanding effective_replication_maps
            // so they won't try to submit background work
            // to gently clear their contents when they are destroyed.
            erm->unregister();
        }
        // FIXME: reinstate the internal error
        // when https://github.com/scylladb/scylla/issues/8995
        // is fixed and shutdown order ensures that no outstanding maps
        // are expected here.
        // (see also https://github.com/scylladb/scylla/issues/9684)
        // on_internal_error_noexcept(rslogger, "effective_replication_map_factory stopped with outstanding maps");
    }
    return std::exchange(_background_work, make_ready_future<>());
}
// Chain `fut` onto the factory's background-work future; its eventual
// failure is logged and swallowed. Already-resolved successful futures are
// dropped without chaining. Must not be called after stop().
void effective_replication_map_factory::submit_background_work(future<> fut) {
    if (fut.available() && !fut.failed()) {
        return;
    }
    if (_stopped) {
        on_internal_error(rslogger, "Cannot submit background work: registry already stopped");
    }
    _background_work = _background_work.then([fut = std::move(fut)] () mutable {
        return std::move(fut).handle_exception([] (std::exception_ptr ex) {
            // Ignore errors since we have nothing else to do about them.
            rslogger.warn("effective_replication_map_factory background task failed: {}. Ignored.", std::move(ex));
        });
    });
}
// Collect the keyspace's static e_r_m from every shard into _erms, verifying
// all shards agree on the ring version. Throws
// data_dictionary::no_such_keyspace if the keyspace does not exist.
future<> global_static_effective_replication_map::get_keyspace_erms(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
    return sharded_db.invoke_on(0, [this, &sharded_db, keyspace_name] (replica::database& db) -> future<> {
        // To ensure we get the same effective_replication_map
        // on all shards, acquire the shared_token_metadata lock.
        //
        // As a sanity check compare the ring_version on each shard
        // to the reference version on shard 0.
        //
        // This invariant is achieved by storage_service::mutate_token_metadata
        // and storage_service::replicate_to_all_cores that first acquire the
        // shared_token_metadata lock, then prepare a mutated token metadata
        // that will have an incremented ring_version, use it to re-calculate
        // all e_r_m:s and clone both on all shards. including the ring version,
        // all under the lock.
        auto lk = co_await db.get_shared_token_metadata().get_lock();
        auto erm = db.find_keyspace(keyspace_name).get_static_effective_replication_map();
        utils::get_local_injector().inject("get_keyspace_erms_throw_no_such_keyspace",
                [&keyspace_name] { throw data_dictionary::no_such_keyspace{keyspace_name}; });
        auto ring_version = erm->get_token_metadata().get_ring_version();
        _erms[0] = make_foreign(std::move(erm));
        co_await coroutine::parallel_for_each(std::views::iota(1u, smp::count), [this, &sharded_db, keyspace_name, ring_version] (unsigned shard) -> future<> {
            _erms[shard] = co_await sharded_db.invoke_on(shard, [keyspace_name, ring_version] (const replica::database& db) {
                const auto& ks = db.find_keyspace(keyspace_name);
                auto erm = ks.get_static_effective_replication_map();
                auto local_ring_version = erm->get_token_metadata().get_ring_version();
                if (local_ring_version != ring_version) {
                    // Fixed message typo: "ring_verion" -> "ring_version".
                    on_internal_error(rslogger, format("Inconsistent effective_replication_map ring_version {}, expected {}", local_ring_version, ring_version));
                }
                return make_foreign(std::move(erm));
            });
        });
    });
}
// Convenience factory: build a global (all-shards) view of a keyspace's
// static effective replication map.
future<global_static_effective_replication_map> make_global_static_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
    global_static_effective_replication_map ret;
    co_await ret.get_keyspace_erms(sharded_db, keyspace_name);
    co_return ret;
}
} // namespace locator
// Render a replication_strategy_type as its lowercase name; an
// out-of-range enum value renders as an empty string (as before).
auto fmt::formatter<locator::replication_strategy_type>::format(locator::replication_strategy_type t,
        fmt::format_context& ctx) const -> decltype(ctx.out()) {
    using enum locator::replication_strategy_type;
    const auto name = [t] () -> std::string_view {
        switch (t) {
        case simple:
            return "simple";
        case local:
            return "local";
        case network_topology:
            return "network_topology";
        case everywhere_topology:
            return "everywhere_topology";
        }
        return {};
    }();
    return fmt::format_to(ctx.out(), "{}", name);
}
// Render a factory key as "<type>.<ring_version>:opt1=v1,opt2=v2,...".
auto fmt::formatter<locator::static_effective_replication_map::factory_key>::format(const locator::static_effective_replication_map::factory_key& key,
        fmt::format_context& ctx) const -> decltype(ctx.out()) {
    auto out = fmt::format_to(ctx.out(), "{}.{}", key.rs_type, key.ring_version);
    bool first = true;
    for (const auto& [opt, val] : key.rs_config_options) {
        // ':' introduces the option list; subsequent entries use ','.
        out = fmt::format_to(out, "{}{}={}", first ? ':' : ',', opt, val);
        first = false;
    }
    return out;
}
// RF data formats as its effective replica count (rack lists print their
// length, not the rack names).
auto fmt::formatter<locator::replication_factor_data>::format(const locator::replication_factor_data& rf, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{}", rf.count());
}