/* * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include "locator/abstract_replication_strategy.hh" #include "locator/tablet_replication_strategy.hh" #include "locator/local_strategy.hh" #include "utils/class_registrator.hh" #include "exceptions/exceptions.hh" #include #include #include #include #include "replica/database.hh" #include "utils/stall_free.hh" #include #include namespace locator { using ring_mapping_underlying_type = boost::icl::interval_map>; struct ring_mapping_impl { ring_mapping_underlying_type map; }; ring_mapping::ring_mapping() : _impl(std::make_unique()) { } ring_mapping::ring_mapping(ring_mapping&&) noexcept = default; ring_mapping& ring_mapping::operator=(ring_mapping&&) noexcept = default; ring_mapping::~ring_mapping() = default; auto* ring_mapping::operator->() { return &_impl->map; } auto* ring_mapping::operator->() const { return &std::as_const(_impl->map); } auto& ring_mapping::operator*() { return _impl->map; } auto& ring_mapping::operator*() const { return std::as_const(_impl->map); } logging::logger rslogger("replication_strategy"); abstract_replication_strategy::abstract_replication_strategy( replication_strategy_params params, replication_strategy_type my_type) : _config_options(params.options) , _my_type(my_type) { if (params.initial_tablets.has_value()) { _uses_tablets = true; } } abstract_replication_strategy::ptr_type abstract_replication_strategy::create_replication_strategy(const sstring& strategy_name, replication_strategy_params params, const locator::topology* topo) { try { return create_object(strategy_name, std::move(params), std::move(topo)); } catch (const no_such_class& e) { throw exceptions::configuration_exception(e.what()); } } // class registry signature must match the actual replication strategies' signature using strategy_class_registry = class_registry< locator::abstract_replication_strategy, replication_strategy_params, const topology*>; sstring abstract_replication_strategy::to_qualified_class_name(std::string_view strategy_class_name) { return strategy_class_registry::to_qualified_class_name(strategy_class_name); } static const std::unordered_set* find_token(const ring_mapping& ring_mapping, const token& token) { if (ring_mapping->empty()) { return nullptr; } const auto interval = token_metadata::range_to_interval(wrapping_interval(token)); const auto it = ring_mapping->find(interval); return it != ring_mapping->end() ? &it->second : nullptr; } host_id_vector_topology_change vnode_effective_replication_map::get_pending_replicas(const token& search_token) const { const auto* pending_endpoints = find_token(_pending_endpoints, search_token); if (!pending_endpoints) { return host_id_vector_topology_change{}; } return *pending_endpoints | std::ranges::to(); } host_id_vector_replica_set vnode_effective_replication_map::get_replicas_for_reading(const token& token, bool is_vnode) const { const auto* endpoints = find_token(_read_endpoints, token); if (endpoints == nullptr) { return get_natural_replicas(token, is_vnode); } return *endpoints | std::ranges::to(); } std::optional vnode_effective_replication_map::check_locality(const token& token) const { return {}; } bool vnode_effective_replication_map::has_pending_ranges(locator::host_id endpoint) const { for (const auto& item : *_pending_endpoints) { const auto& nodes = item.second; if (nodes.contains(endpoint)) { return true; } } return false; } std::unordered_set vnode_effective_replication_map::get_all_pending_nodes() const { std::unordered_set endpoints; for (const auto& item : *_pending_endpoints) { endpoints.insert(item.second.begin(), item.second.end()); } return endpoints; } std::unique_ptr vnode_effective_replication_map::make_splitter() const { return locator::make_splitter(_tmptr); } const dht::sharder& vnode_effective_replication_map::get_sharder(const schema& s) const { return s.get_sharder(); } const per_table_replication_strategy* abstract_replication_strategy::maybe_as_per_table() const { if (!_per_table) { return nullptr; } return dynamic_cast(this); } const tablet_aware_replication_strategy* abstract_replication_strategy::maybe_as_tablet_aware() const { if (!_uses_tablets) { return nullptr; } return dynamic_cast(this); } size_t replication_factor_data::parse(const sstring& rf) { if (rf.empty() || std::any_of(rf.begin(), rf.end(), [] (char c) {return !isdigit(c);})) { throw exceptions::configuration_exception( format("Replication factor must be numeric and non-negative, found '{}'", rf)); } try { return std::stol(rf); } catch (...) { throw exceptions::configuration_exception( sstring("Replication factor must be numeric; found ") + rf); } } replication_factor_data abstract_replication_strategy::parse_replication_factor(const replication_strategy_config_option& rf) { return replication_factor_data(rf); } size_t get_replication_factor(const replication_strategy_config_option& opt) { return replication_factor_data(opt).count(); } replication_factor_data::replication_factor_data(const replication_strategy_config_option& rf) { std::visit(overloaded_functor { [&] (const sstring& rf) { auto rf_value = parse(rf); _data.emplace(rf_value); _count = rf_value; }, [&] (const std::vector& racks) { _data.emplace>(racks); _count = racks.size(); } }, rf); } void replication_factor_data::validate(const std::unordered_set& allowed_racks) { std::visit(overloaded_functor { [&] (const size_t& rf) {}, [&] (const std::vector& racks) { for (const auto& rack : racks) { if (!allowed_racks.contains(rack)) { throw exceptions::configuration_exception( fmt::format("Unrecognized rack name '{}'. allowed_racks={}", rack, allowed_racks)); } } } }, _data); } rack_diff diff_racks(const rack_list& old_racks, const rack_list& new_racks) { std::set old_racks_set(old_racks.begin(), old_racks.end()); std::set new_racks_set(new_racks.begin(), new_racks.end()); rack_diff diff; std::ranges::set_difference(new_racks_set, old_racks_set, std::back_inserter(diff.added)); std::ranges::set_difference(old_racks_set, new_racks_set, std::back_inserter(diff.removed)); return diff; } static void insert_token_range_to_sorted_container_while_unwrapping( const dht::token& prev_tok, const dht::token& tok, dht::token_range_vector& ret) { if (prev_tok < tok) { auto pos = ret.end(); if (!ret.empty() && !std::prev(pos)->end()) { // We inserted a wrapped range (a, b] previously as // (-inf, b], (a, +inf). So now we insert in the next-to-last // position to keep the last range (a, +inf) at the end. pos = std::prev(pos); } ret.insert(pos, dht::token_range{ dht::token_range::bound(prev_tok, false), dht::token_range::bound(tok, true)}); } else { ret.emplace_back( dht::token_range::bound(prev_tok, false), std::nullopt); // Insert in front to maintain sorded order ret.emplace( ret.begin(), std::nullopt, dht::token_range::bound(tok, true)); } } future vnode_effective_replication_map::do_get_ranges(noncopyable_function consider_range_for_endpoint) const { dht::token_range_vector ret; const auto& tm = *_tmptr; const auto& sorted_tokens = tm.sorted_tokens(); if (sorted_tokens.empty()) { on_internal_error(rslogger, "Token metadata is empty"); } auto prev_tok = sorted_tokens.back(); for (const auto& tok : sorted_tokens) { bool add_range = false; for_each_natural_endpoint_until(tok, [&] (const host_id& ep) { return consider_range_for_endpoint(add_range, ep); }); if (add_range) { insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret); } prev_tok = tok; co_await coroutine::maybe_yield(); } co_return ret; } future vnode_effective_replication_map::get_ranges(host_id ep) const { // The callback function below is called for each endpoint // in each token natural endpoints. // Add the range if `ep` is found in the token's natural endpoints return do_get_ranges([ep] (bool& add_range, const host_id& e) { if ((add_range = (e == ep))) { // stop iteration a match is found return stop_iteration::yes; } return stop_iteration::no; }); } // Caller must ensure that token_metadata will not change throughout the call. future abstract_replication_strategy::get_ranges(locator::host_id ep, token_metadata_ptr tmptr) const { co_return co_await get_ranges(ep, *tmptr); } // Caller must ensure that token_metadata will not change throughout the call. future abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metadata& tm) const { dht::token_range_vector ret; if (!tm.is_normal_token_owner(ep)) { co_return ret; } const auto& sorted_tokens = tm.sorted_tokens(); if (sorted_tokens.empty()) { on_internal_error(rslogger, "Token metadata is empty"); } auto prev_tok = sorted_tokens.back(); for (auto tok : sorted_tokens) { bool should_add = false; if (get_type() == replication_strategy_type::everywhere_topology) { // everywhere_topology deserves this special fast path. // Using the common path would make the function quadratic in the number of endpoints. should_add = true; } else { auto eps = co_await calculate_natural_endpoints(tok, tm); should_add = eps.contains(ep); } if (should_add) { insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret); } prev_tok = tok; } co_return ret; } future vnode_effective_replication_map::get_primary_ranges(locator::host_id ep) const { // The callback function below is called for each endpoint // in each token natural endpoints. // Add the range if `ep` is the primary replica in the token's natural endpoints. // The primary replica is first in the natural endpoints list. return do_get_ranges([ep] (bool& add_range, const locator::host_id& e) { add_range = (e == ep); // stop the iteration once the first node was considered. return stop_iteration::yes; }); } future vnode_effective_replication_map::get_primary_ranges_within_dc(locator::host_id ep) const { const topology& topo = _tmptr->get_topology(); sstring local_dc = topo.get_datacenter(ep); std::unordered_set local_dc_nodes = _tmptr->get_datacenter_token_owners().at(local_dc); // The callback function below is called for each endpoint // in each token natural endpoints. // Add the range if `ep` is the datacenter primary replica in the token's natural endpoints. // The primary replica in each datacenter is determined by the natural endpoints list order. return do_get_ranges([ep, local_dc_nodes = std::move(local_dc_nodes)] (bool& add_range, const locator::host_id& e) { // Unlike get_primary_ranges() which checks if ep is the first // owner of this range, here we check if ep is the first just // among nodes which belong to the local dc of ep. if (!local_dc_nodes.contains(e)) { return stop_iteration::no; } add_range = (e == ep); // stop the iteration once the first node contained the local datacenter was considered. return stop_iteration::yes; }); } future> vnode_effective_replication_map::get_range_host_ids() const { const token_metadata& tm = *_tmptr; std::unordered_map ret; for (auto& t : tm.sorted_tokens()) { dht::token_range_vector ranges = tm.get_primary_ranges_for(t); for (auto& r : ranges) { ret.emplace(r, do_get_replicas(t, true)); } co_await coroutine::maybe_yield(); } co_return ret; } future> abstract_replication_strategy::get_range_host_ids(const token_metadata& tm) const { std::unordered_map ret; for (auto& t : tm.sorted_tokens()) { dht::token_range_vector ranges = tm.get_primary_ranges_for(t); auto eps = co_await calculate_natural_endpoints(t, tm); for (auto& r : ranges) { ret.emplace(r, eps.get_vector()); } } co_return ret; } future abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const { dht::token_range_vector ret; auto temp = co_await tmptr->clone_only_token_map(); temp.update_topology(pending_address, std::move(dr)); co_await temp.update_normal_tokens(pending_tokens, pending_address); for (const auto& t : temp.sorted_tokens()) { auto eps = co_await calculate_natural_endpoints(t, temp); if (eps.contains(pending_address)) { dht::token_range_vector r = temp.get_primary_ranges_for(t); rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address); ret.insert(ret.end(), r.begin(), r.end()); } } co_await temp.clear_gently(); co_return ret; } static const auto default_replication_map_key = dht::token::from_int64(0); future calculate_vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) { replication_map replication_map; ring_mapping pending_endpoints; ring_mapping read_endpoints; std::unordered_set dirty_endpoints; const auto depend_on_token = rs->natural_endpoints_depend_on_token(); const auto& sorted_tokens = tmptr->sorted_tokens(); replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1); if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) { const auto& all_tokens = topology_changes->all_tokens; const auto& current_tokens = tmptr->get_token_to_endpoint(); for (size_t i = 0, size = all_tokens.size(); i < size; ++i) { co_await coroutine::maybe_yield(); const auto token = all_tokens[i]; auto current_endpoints = co_await rs->calculate_natural_endpoints(token, *tmptr); auto target_endpoints = co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata); auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { using interval = ring_mapping_underlying_type::interval_type; if (!depend_on_token) { *target += std::make_pair( interval::open(dht::minimum_token(), dht::maximum_token()), std::move(endpoints)); } else if (i == 0) { *target += std::make_pair( interval::open(all_tokens.back(), dht::maximum_token()), endpoints); *target += std::make_pair( interval::left_open(dht::minimum_token(), token), std::move(endpoints)); } else { *target += std::make_pair( interval::left_open(all_tokens[i - 1], token), std::move(endpoints)); } }; { host_id_set endpoints_diff; for (const auto& e: target_endpoints) { if (!current_endpoints.contains(e)) { endpoints_diff.insert(e); } } if (!endpoints_diff.empty()) { add_mapping(pending_endpoints, std::move(endpoints_diff).extract_set()); } } // If an endpoint is in target endpoints, but not in current endpoints it means // it loses a range and becomes dirty for (auto& h : current_endpoints) { if (!target_endpoints.contains(h)) { dirty_endpoints.emplace(h); } } // in order not to waste memory, we update read_endpoints only if the // new endpoints differs from the old one if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) { add_mapping(read_endpoints, std::move(target_endpoints).extract_set()); } if (!depend_on_token) { replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector()); break; } else if (current_tokens.contains(token)) { replication_map.emplace(token, std::move(current_endpoints).extract_vector()); } } } else if (depend_on_token) { for (const auto &t : sorted_tokens) { auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); replication_map.emplace(t, std::move(eps).extract_vector()); } } else { auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr); replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector()); } auto rf = rs->get_replication_factor(*tmptr); co_return make_vnode_effective_replication_map_ptr(std::move(rs), std::move(tmptr), std::move(replication_map), std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), rf); } future vnode_effective_replication_map::clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const { replication_map replication_map; ring_mapping pending_endpoints; ring_mapping read_endpoints; std::unordered_set dirty_endpoints; for (auto& i : _replication_map) { replication_map.emplace(i.first, i.second); co_await coroutine::maybe_yield(); } for (const auto& i : *_pending_endpoints) { *pending_endpoints += i; co_await coroutine::maybe_yield(); } for (const auto& i : *_read_endpoints) { *read_endpoints += i; co_await coroutine::maybe_yield(); } // no need to yield while copying since this is bound by nodes, not vnodes dirty_endpoints = _dirty_endpoints; co_return make_vnode_effective_replication_map_ptr(std::move(rs), std::move(tmptr), std::move(replication_map), std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), _replication_factor); } host_id_vector_replica_set vnode_effective_replication_map::do_get_replicas(const token& tok, bool is_vnode) const { const token& key_token = _rs->natural_endpoints_depend_on_token() ? (is_vnode ? tok : _tmptr->first_token(tok)) : default_replication_map_key; const auto it = _replication_map.find(key_token); if (it == _replication_map.end()) { on_internal_error(rslogger, format("Token {} not found in replication map: natural_endpoints_depend_on_token={} token={} vnode_token={}", key_token, _rs->natural_endpoints_depend_on_token(), tok, _tmptr->first_token(tok))); } return it->second; } host_id_vector_replica_set vnode_effective_replication_map::get_replicas(const token& tok, bool is_vnode) const { return do_get_replicas(tok, is_vnode); } host_id_vector_replica_set vnode_effective_replication_map::get_natural_replicas(const token& search_token, bool is_vnode) const { return get_replicas(search_token, is_vnode); } stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& vnode_tok, const noncopyable_function& func) const { for (const auto& ep : do_get_replicas(vnode_tok, true)) { if (func(ep) == stop_iteration::yes) { return stop_iteration::yes; } } return stop_iteration::no; } static_effective_replication_map::~static_effective_replication_map() { if (is_registered()) { _factory->erase_effective_replication_map(this); } } vnode_effective_replication_map::~vnode_effective_replication_map() { if (is_registered()) { try { _factory->submit_background_work(dispose_gently(std::move(_replication_map), std::move(*_pending_endpoints), std::move(*_read_endpoints), std::move(_tmptr))); } catch (...) { // ignore } } } effective_replication_map::effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, size_t replication_factor) noexcept : _rs(std::move(rs)) , _tmptr(std::move(tmptr)) , _replication_factor(replication_factor) , _validity_abort_source(std::make_unique()) { } static_effective_replication_map::factory_key static_effective_replication_map::make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr) { return factory_key(rs->get_type(), rs->get_config_options(), tmptr->get_ring_version()); } future effective_replication_map_factory::create_static_effective_replication_map(replication_strategy_ptr rs, const token_metadata_ptr& tmptr) { // lookup key on local shard auto key = static_effective_replication_map::make_factory_key(rs, tmptr); auto erm = find_effective_replication_map(key); if (erm) { rslogger.debug("create_static_effective_replication_map: found {} [{}]", key, fmt::ptr(erm.get())); co_return erm; } mutable_static_effective_replication_map_ptr new_erm; if (rs->is_local()) { // Local replication strategy does not benefit from cloning across shards // to save an expensive calculate function like `calculate_vnode_effective_replication_map` new_erm = make_local_effective_replication_map_ptr(std::move(rs), std::move(tmptr)); } else { // try to find a reference erm on shard 0 // TODO: // - use hash of key to distribute the load // - instaintiate only on NUMA nodes auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future> { auto erm = ermf.find_effective_replication_map(key); co_return make_foreign(std::move(erm)); }); if (ref_erm) { new_erm = co_await ref_erm->clone_gently(std::move(rs), std::move(tmptr)); } else { new_erm = co_await calculate_vnode_effective_replication_map(std::move(rs), std::move(tmptr)); } } co_return insert_effective_replication_map(std::move(new_erm), std::move(key)); } static_effective_replication_map_ptr effective_replication_map_factory::find_effective_replication_map(const static_effective_replication_map::factory_key& key) const { auto it = _effective_replication_maps.find(key); if (it != _effective_replication_maps.end()) { return it->second->shared_from_this(); } return {}; } static_effective_replication_map_ptr effective_replication_map_factory::insert_effective_replication_map(mutable_static_effective_replication_map_ptr erm, static_effective_replication_map::factory_key key) { auto [it, inserted] = _effective_replication_maps.insert({key, erm.get()}); if (inserted) { rslogger.debug("insert_effective_replication_map: inserted {} [{}]", key, fmt::ptr(erm.get())); erm->set_factory(*this, std::move(key)); return erm; } auto res = it->second->shared_from_this(); rslogger.debug("insert_effective_replication_map: found {} [{}]", key, fmt::ptr(res.get())); return res; } bool effective_replication_map_factory::erase_effective_replication_map(static_effective_replication_map* erm) { const auto& key = erm->get_factory_key(); auto it = _effective_replication_maps.find(key); if (it == _effective_replication_maps.end()) { rslogger.warn("Could not unregister effective_replication_map {} [{}]: key not found", key, fmt::ptr(erm)); return false; } if (it->second != erm) { rslogger.warn("Could not unregister effective_replication_map {} [{}]: different instance [{}] is currently registered", key, fmt::ptr(erm), fmt::ptr(it->second)); return false; } rslogger.debug("erase_effective_replication_map: erased {} [{}]", key, fmt::ptr(erm)); _effective_replication_maps.erase(it); return true; } future<> effective_replication_map_factory::stop() noexcept { _stopped = true; if (!_effective_replication_maps.empty()) { for (auto it = _effective_replication_maps.begin(); it != _effective_replication_maps.end(); it = _effective_replication_maps.erase(it)) { auto& [key, erm] = *it; rslogger.debug("effective_replication_map_factory::stop found outstanding map {} [{}]", key, fmt::ptr(erm)); // unregister outstanding effective_replication_maps // so they won't try to submit background work // to gently clear their contents when they are destroyed. erm->unregister(); } // FIXME: reinstate the internal error // when https://github.com/scylladb/scylla/issues/8995 // is fixed and shutdown order ensures that no outstanding maps // are expected here. // (see also https://github.com/scylladb/scylla/issues/9684) // on_internal_error_noexcept(rslogger, "effective_replication_map_factory stopped with outstanding maps"); } return std::exchange(_background_work, make_ready_future<>()); } void effective_replication_map_factory::submit_background_work(future<> fut) { if (fut.available() && !fut.failed()) { return; } if (_stopped) { on_internal_error(rslogger, "Cannot submit background work: registry already stopped"); } _background_work = _background_work.then([fut = std::move(fut)] () mutable { return std::move(fut).handle_exception([] (std::exception_ptr ex) { // Ignore errors since we have nothing else to do about them. rslogger.warn("effective_replication_map_factory background task failed: {}. Ignored.", std::move(ex)); }); }); } future<> global_static_effective_replication_map::get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name) { return sharded_db.invoke_on(0, [this, &sharded_db, keyspace_name] (replica::database& db) -> future<> { // To ensure we get the same effective_replication_map // on all shards, acquire the shared_token_metadata lock. // // As a sanity check compare the ring_version on each shard // to the reference version on shard 0. // // This invariant is achieved by storage_service::mutate_token_metadata // and storage_service::replicate_to_all_cores that first acquire the // shared_token_metadata lock, then prepare a mutated token metadata // that will have an incremented ring_version, use it to re-calculate // all e_r_m:s and clone both on all shards. including the ring version, // all under the lock. auto lk = co_await db.get_shared_token_metadata().get_lock(); auto erm = db.find_keyspace(keyspace_name).get_static_effective_replication_map(); utils::get_local_injector().inject("get_keyspace_erms_throw_no_such_keyspace", [&keyspace_name] { throw data_dictionary::no_such_keyspace{keyspace_name}; }); auto ring_version = erm->get_token_metadata().get_ring_version(); _erms[0] = make_foreign(std::move(erm)); co_await coroutine::parallel_for_each(std::views::iota(1u, smp::count), [this, &sharded_db, keyspace_name, ring_version] (unsigned shard) -> future<> { _erms[shard] = co_await sharded_db.invoke_on(shard, [keyspace_name, ring_version] (const replica::database& db) { const auto& ks = db.find_keyspace(keyspace_name); auto erm = ks.get_static_effective_replication_map(); auto local_ring_version = erm->get_token_metadata().get_ring_version(); if (local_ring_version != ring_version) { on_internal_error(rslogger, format("Inconsistent effective_replication_map ring_verion {}, expected {}", local_ring_version, ring_version)); } return make_foreign(std::move(erm)); }); }); }); } future make_global_static_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name) { global_static_effective_replication_map ret; co_await ret.get_keyspace_erms(sharded_db, keyspace_name); co_return ret; } } // namespace locator auto fmt::formatter::format(locator::replication_strategy_type t, fmt::format_context& ctx) const -> decltype(ctx.out()) { std::string_view name; switch (t) { using enum locator::replication_strategy_type; case simple: name = "simple"; break; case local: name = "local"; break; case network_topology: name = "network_topology"; break; case everywhere_topology: name = "everywhere_topology"; break; }; return fmt::format_to(ctx.out(), "{}", name); } auto fmt::formatter::format(const locator::static_effective_replication_map::factory_key& key, fmt::format_context& ctx) const -> decltype(ctx.out()) { auto out = fmt::format_to(ctx.out(), "{}.{}", key.rs_type, key.ring_version); char sep = ':'; for (const auto& [opt, val] : key.rs_config_options) { out = fmt::format_to(out, "{}{}={}", sep, opt, val); sep = ','; } return out; } auto fmt::formatter::format(const locator::replication_factor_data& rf, fmt::format_context& ctx) const -> decltype(ctx.out()) { return fmt::format_to(ctx.out(), "{}", rf.count()); }