From 6dbbb80aaeb25fb78fc7e120f2debcea7feccd1e Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 27 Jul 2025 16:51:10 +0300 Subject: [PATCH] locator: abstract_replication_strategy: implement local_replication_strategy Derive both vnode_effective_replication_map and local_effective_replication_map from static_effective_replication_map as both are static and per-keyspace. However, local_effective_replication_map does not need vnodes for the mapping of all tokens to the local node. Note that everywhere_replication_strategy is not abstracted in a similar way, although it could, since the plan is to get rid of it once all system keyspaces are converted to local or tablets replication (and propagated everywhere if needed using raft group0) Signed-off-by: Benny Halevy --- alternator/ttl.cc | 10 +- dht/range_streamer.cc | 3 +- locator/abstract_replication_strategy.cc | 39 +++-- locator/abstract_replication_strategy.hh | 150 +++++++++++++++---- locator/local_strategy.cc | 62 ++++++++ locator/token_metadata.cc | 17 +++ repair/repair.cc | 11 +- replica/database.hh | 1 + service/storage_service.cc | 10 +- service/topology_coordinator.cc | 3 +- test/boost/network_topology_strategy_test.cc | 3 +- 11 files changed, 253 insertions(+), 56 deletions(-) diff --git a/alternator/ttl.cc b/alternator/ttl.cc index a70c8efb4a..685bef3201 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -769,10 +769,14 @@ static future scan_table( } } } else { // VNodes - locator::static_effective_replication_map_ptr erm = + locator::static_effective_replication_map_ptr ermp = db.real_database().find_keyspace(s->ks_name()).get_static_effective_replication_map(); + auto* erm = ermp->maybe_as_vnode_effective_replication_map(); + if (!erm) { + on_internal_error(tlogger, format("Keyspace {} is local", s->ks_name())); + } auto my_host_id = erm->get_topology().my_host_id(); - token_ranges_owned_by_this_shard my_ranges(s, co_await ranges_holder_primary::make(erm.get(), my_host_id)); + 
token_ranges_owned_by_this_shard my_ranges(s, co_await ranges_holder_primary::make(erm, my_host_id)); while (std::optional range = my_ranges.next_partition_range()) { // Note that because of issue #9167 we need to run a separate // query on each partition range, and can't pass several of @@ -792,7 +796,7 @@ static future scan_table( // by tasking another node to take over scanning of the dead node's primary // ranges. What we do here is that this node will also check expiration // on its *secondary* ranges - but only those whose primary owner is down. - token_ranges_owned_by_this_shard my_secondary_ranges(s, co_await ranges_holder_secondary::make(erm.get(), my_host_id, gossiper)); + token_ranges_owned_by_this_shard my_secondary_ranges(s, co_await ranges_holder_secondary::make(erm, my_host_id, gossiper)); while (std::optional range = my_secondary_ranges.next_partition_range()) { expiration_stats.secondary_ranges_scanned++; dht::partition_range_vector partition_ranges; diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 686a842589..00c1fa71c1 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -220,7 +220,8 @@ future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::stati throw std::runtime_error("Mixed sending and receiving is not supported"); } _nr_rx_added++; - auto erm = ermp.get(); + auto erm = ermp->maybe_as_vnode_effective_replication_map(); + SCYLLA_ASSERT(erm != nullptr); auto ranges_for_keyspace = !is_replacing && use_strict_sources_for_ranges(keyspace_name, *erm) ? 
get_all_ranges_with_strict_sources_for(keyspace_name, erm, std::move(ranges), gossiper) : get_all_ranges_with_sources_for(keyspace_name, erm, std::move(ranges)); diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 2488753b1a..4eb820e80e 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -8,6 +8,7 @@ #include "locator/abstract_replication_strategy.hh" #include "locator/tablet_replication_strategy.hh" +#include "locator/local_strategy.hh" #include "utils/class_registrator.hh" #include "exceptions/exceptions.hh" #include @@ -495,9 +496,14 @@ stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until( return stop_iteration::no; } -vnode_effective_replication_map::~vnode_effective_replication_map() { +static_effective_replication_map::~static_effective_replication_map() { if (is_registered()) { _factory->erase_effective_replication_map(this); + } +} + +vnode_effective_replication_map::~vnode_effective_replication_map() { + if (is_registered()) { try { _factory->submit_background_work(clear_gently(std::move(_replication_map), std::move(*_pending_endpoints), @@ -531,19 +537,26 @@ future effective_replication_map_factory:: co_return erm; } - // try to find a reference erm on shard 0 - // TODO: - // - use hash of key to distribute the load - // - instaintiate only on NUMA nodes - auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future> { - auto erm = ermf.find_effective_replication_map(key); - co_return make_foreign(std::move(erm)); - }); mutable_static_effective_replication_map_ptr new_erm; - if (ref_erm) { - new_erm = co_await ref_erm->clone_gently(std::move(rs), std::move(tmptr)); + + if (rs->is_local()) { + // Local replication strategy does not benefit from cloning across shards + // to save an expensive calculate function like `calculate_vnode_effective_replication_map` + new_erm = 
make_local_effective_replication_map_ptr(std::move(rs), std::move(tmptr)); } else { - new_erm = co_await calculate_vnode_effective_replication_map(std::move(rs), std::move(tmptr)); + // try to find a reference erm on shard 0 + // TODO: + // - use hash of key to distribute the load + // - instantiate only on NUMA nodes + auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future> { + auto erm = ermf.find_effective_replication_map(key); + co_return make_foreign(std::move(erm)); + }); + if (ref_erm) { + new_erm = co_await ref_erm->clone_gently(std::move(rs), std::move(tmptr)); + } else { + new_erm = co_await calculate_vnode_effective_replication_map(std::move(rs), std::move(tmptr)); + } } co_return insert_effective_replication_map(std::move(new_erm), std::move(key)); } @@ -568,7 +581,7 @@ static_effective_replication_map_ptr effective_replication_map_factory::insert_e return res; } -bool effective_replication_map_factory::erase_effective_replication_map(vnode_effective_replication_map* erm) { +bool effective_replication_map_factory::erase_effective_replication_map(static_effective_replication_map* erm) { const auto& key = erm->get_factory_key(); auto it = _effective_replication_maps.find(key); if (it == _effective_replication_maps.end()) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 65cf71cbd5..dde756607d 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -55,7 +55,7 @@ using replication_map = std::unordered_map; using host_id_set = utils::basic_sequenced_set; -class vnode_effective_replication_map; +class static_effective_replication_map; class effective_replication_map_factory; class per_table_replication_strategy; class tablet_aware_replication_strategy; @@ -63,7 +63,7 @@ class effective_replication_map; class abstract_replication_strategy : public seastar::enable_shared_from_this { - friend class 
vnode_effective_replication_map; + friend class static_effective_replication_map; friend class per_table_replication_strategy; friend class tablet_aware_replication_strategy; protected: @@ -290,7 +290,9 @@ public: virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const = 0; }; -using static_effective_replication_map = vnode_effective_replication_map; +class static_effective_replication_map; +class vnode_effective_replication_map; +class local_effective_replication_map; using static_effective_replication_map_ptr = shared_ptr; using mutable_static_effective_replication_map_ptr = shared_ptr; @@ -301,7 +303,7 @@ using mutable_static_erm_ptr = mutable_static_effective_replication_map_ptr; // effective replication strategy over the given token_metadata // and replication_strategy_config_options. // Used for token-based replication strategies. -class vnode_effective_replication_map : public enable_shared_from_this +class static_effective_replication_map : public enable_shared_from_this , public effective_replication_map { public: struct factory_key { @@ -320,6 +322,86 @@ public: bool operator==(const factory_key& o) const = default; }; +private: + std::optional _factory_key = std::nullopt; + effective_replication_map_factory* _factory = nullptr; + + friend class abstract_replication_strategy; + friend class effective_replication_map_factory; +public: + explicit static_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, size_t replication_factor) noexcept + : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) + { } + static_effective_replication_map() = delete; + static_effective_replication_map(static_effective_replication_map&&) = default; + ~static_effective_replication_map(); + + virtual const vnode_effective_replication_map* maybe_as_vnode_effective_replication_map() const { + return nullptr; + } + + virtual const local_effective_replication_map* 
maybe_as_local_effective_replication_map() const { + return nullptr; + } + + virtual future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const = 0; + +public: + static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr); + + const factory_key& get_factory_key() const noexcept { + return *_factory_key; + } + + void set_factory(effective_replication_map_factory& factory, factory_key key) noexcept { + _factory = &factory; + _factory_key.emplace(std::move(key)); + } + + bool is_registered() const noexcept { + return _factory != nullptr; + } + + void unregister() noexcept { + _factory = nullptr; + } +}; + +// Apply the replication strategy over the current configuration and the given token_metadata. +future calculate_vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr); + +// Class to hold a coherent view of a keyspace +// effective replication map on all shards +class global_static_effective_replication_map { + std::vector> _erms; + +public: + global_static_effective_replication_map() : _erms(smp::count) {} + global_static_effective_replication_map(global_static_effective_replication_map&&) = default; + global_static_effective_replication_map& operator=(global_static_effective_replication_map&&) = default; + + future<> get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name); + + const static_effective_replication_map& get() const noexcept { + return *_erms[this_shard_id()]; + } + + const static_effective_replication_map& operator*() const noexcept { + return get(); + } + + const static_effective_replication_map* operator->() const noexcept { + return &get(); + } +}; + +future make_global_static_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name); + +// Holds the full replication_map resulting from applying the +// effective replication strategy over the given token_metadata +// and replication_strategy_config_options. 
+// Used for token-based replication strategies. +class vnode_effective_replication_map : public static_effective_replication_map { private: replication_map _replication_map; ring_mapping _pending_endpoints; @@ -343,7 +425,7 @@ public: // effective_replication_map public: explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, ring_mapping pending_endpoints, ring_mapping read_endpoints, std::unordered_set dirty_endpoints, size_t replication_factor) noexcept - : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) + : static_effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) , _replication_map(std::move(replication_map)) , _pending_endpoints(std::move(pending_endpoints)) , _read_endpoints(std::move(read_endpoints)) @@ -353,7 +435,11 @@ public: vnode_effective_replication_map(vnode_effective_replication_map&&) = default; ~vnode_effective_replication_map(); - future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const; + virtual const vnode_effective_replication_map* maybe_as_vnode_effective_replication_map() const override { + return this; + } + + virtual future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const override; // get_primary_ranges() returns the list of "primary ranges" for the given // endpoint. "Primary ranges" are the ranges that the node is responsible @@ -417,35 +503,42 @@ inline mutable_static_erm_ptr make_vnode_effective_replication_map_ptr(replicati std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), replication_factor); } -// Apply the replication strategy over the current configuration and the given token_metadata. 
-future calculate_vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr); - -// Class to hold a coherent view of a keyspace -// static effective replication map on all shards -class global_static_effective_replication_map { - std::vector> _erms; - +// Holds the effective replication map for the local replication +// strategy, which maps all tokens to the local node and therefore +// needs no vnode-based replication_map. +// Used for the local replication strategy only. +class local_effective_replication_map : public static_effective_replication_map { + host_id_vector_replica_set _replica_set; + dht::token_range_vector _local_ranges; public: - global_static_effective_replication_map() : _erms(smp::count) {} - global_static_effective_replication_map(global_static_effective_replication_map&&) = default; - global_static_effective_replication_map& operator=(global_static_effective_replication_map&&) = default; + explicit local_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) noexcept + : static_effective_replication_map(std::move(rs), std::move(tmptr), 1) + , _replica_set({_tmptr->get_topology().my_host_id()}) + , _local_ranges({dht::token_range::make_open_ended_both_sides()}) + { } + local_effective_replication_map() = delete; + local_effective_replication_map(local_effective_replication_map&&) = default; - future<> get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name); - - const static_effective_replication_map& get() const noexcept { - return *_erms[this_shard_id()]; + virtual const local_effective_replication_map* maybe_as_local_effective_replication_map() const override { + return this; } - const static_effective_replication_map& operator*() const noexcept { - return get(); - } + virtual future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const override; - const static_effective_replication_map* operator->() const noexcept { - return &get(); - } + 
host_id_vector_replica_set get_natural_replicas(const token& search_token) const override; + host_id_vector_topology_change get_pending_replicas(const token& search_token) const override; + host_id_vector_replica_set get_replicas_for_reading(const token& token) const override; + host_id_vector_replica_set get_replicas(const token& search_token) const override; + std::optional check_locality(const token& token) const override; + bool has_pending_ranges(locator::host_id endpoint) const override; + std::unique_ptr make_splitter() const override; + const dht::sharder& get_sharder(const schema& s) const override; + future get_ranges(host_id ep) const override; }; -future make_global_static_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name); +inline mutable_static_erm_ptr make_local_effective_replication_map_ptr(replication_strategy_ptr rs, token_metadata_ptr tmptr) { + return seastar::make_shared(std::move(rs), std::move(tmptr)); +} } // namespace locator @@ -515,6 +608,7 @@ private: void submit_background_work(future<> fut); + friend class static_effective_replication_map; friend class vnode_effective_replication_map; }; diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index 242c1288c2..ca575e2c3a 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -8,6 +8,7 @@ #include #include "local_strategy.hh" +#include "dht/token.hh" #include "utils/class_registrator.hh" #include "exceptions/exceptions.hh" @@ -40,6 +41,67 @@ sstring local_strategy::sanity_check_read_replicas(const effective_replication_m return {}; } +future local_effective_replication_map::clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const { + return make_ready_future(make_local_effective_replication_map_ptr(std::move(rs), std::move(tmptr))); +} + +host_id_vector_replica_set local_effective_replication_map::get_natural_replicas(const token&) const { + return _replica_set; +} + +host_id_vector_topology_change 
local_effective_replication_map::get_pending_replicas(const token&) const { + return host_id_vector_topology_change{}; +} + +host_id_vector_replica_set local_effective_replication_map::get_replicas_for_reading(const token& token) const { + return _replica_set; +} + +host_id_vector_replica_set local_effective_replication_map::get_replicas(const token&) const { + return _replica_set; +} + +std::optional local_effective_replication_map::check_locality(const token& token) const { + return std::nullopt; +} + +bool local_effective_replication_map::has_pending_ranges(locator::host_id endpoint) const { + return false; +} + +std::unique_ptr local_effective_replication_map::make_splitter() const { + class local_splitter : public token_range_splitter { + std::optional _cur; + public: + local_splitter() + : _cur(dht::minimum_token()) + { } + + void reset(dht::ring_position_view pos) override { + _cur = pos.token(); + } + + std::optional next_token() override { + if (auto cur = std::exchange(_cur, std::nullopt)) { + return cur; + } + return std::nullopt; + } + }; + return std::make_unique(); +} + +const dht::sharder& local_effective_replication_map::get_sharder(const schema& s) const { + return s.get_sharder(); +} + +future local_effective_replication_map::get_ranges(host_id ep) const { + if (ep == _tmptr->get_topology().my_host_id()) { + return make_ready_future(_local_ranges); + } + return make_ready_future(); +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.LocalStrategy"); static registry registrator_short_name("LocalStrategy"); diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 2acb6bf56b..60ad38943b 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -948,6 +948,23 @@ std::unique_ptr make_splitter(token_metadata_ptr return std::make_unique(std::move(tmptr)); } +class local_splitter : public locator::token_range_splitter { + std::optional _next_token; +public: + local_splitter() : 
_next_token(dht::minimum_token()) {} + + void reset(dht::ring_position_view pos) override { + _next_token = pos.token(); + } + + std::optional next_token() override { + if (auto ret = std::exchange(_next_token, std::nullopt)) { + return ret; + } + return std::nullopt; + } +}; + topology& token_metadata::get_topology() { return _impl->get_topology(); diff --git a/repair/repair.cc b/repair/repair.cc index 1213217551..80fbd81097 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -9,6 +9,7 @@ #include "db/config.hh" #include "repair.hh" #include "gms/gossip_address_map.hh" +#include "locator/abstract_replication_strategy.hh" #include "repair/row_level.hh" #include "locator/network_topology_strategy.hh" @@ -1211,15 +1212,17 @@ future repair_service::do_repair_start(gms::gossip_address_map& addr_map, s } auto germs = make_lw_shared(co_await locator::make_global_static_effective_replication_map(sharded_db, keyspace)); - auto& erm = germs->get(); - auto& topology = erm.get_token_metadata().get_topology(); - auto my_host_id = erm.get_topology().my_host_id(); + auto* ermp = germs->get().maybe_as_vnode_effective_replication_map(); - if (erm.get_replication_strategy().is_local()) { + if (!ermp) { rlogger.info("repair[{}]: completed successfully: nothing to repair for keyspace {} with local replication strategy", id.uuid(), keyspace); co_return id.id; } + auto& erm = *ermp; + auto& topology = erm.get_token_metadata().get_topology(); + auto my_host_id = erm.get_topology().my_host_id(); + if (!_gossiper.local().is_normal(my_host_id)) { throw std::runtime_error("Node is not in NORMAL status yet!"); } diff --git a/replica/database.hh b/replica/database.hh index 1b4c43918c..228214aa31 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1798,6 +1798,7 @@ public: std::vector get_all_keyspaces() const; std::vector get_non_local_strategy_keyspaces() const; std::vector get_non_local_vnode_based_strategy_keyspaces() const; + // All static_effective_replication_map_ptr must 
hold a vnode_effective_replication_map std::unordered_map get_non_local_strategy_keyspaces_erms() const; std::vector get_tablets_keyspaces() const; column_family& find_column_family(std::string_view ks, std::string_view name); diff --git a/service/storage_service.cc b/service/storage_service.cc index 2ff2633023..da7edbfad4 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5247,9 +5247,8 @@ future<> storage_service::unbootstrap() { std::unordered_map> ranges_to_stream; auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms(); - for (const auto& [keyspace_name, ermp] : ks_erms) { - auto* erm = ermp.get(); - auto ranges_mm = co_await get_changed_ranges_for_leaving(erm, my_host_id()); + for (const auto& [keyspace_name, erm] : ks_erms) { + auto ranges_mm = co_await get_changed_ranges_for_leaving(erm->maybe_as_vnode_effective_replication_map(), my_host_id()); if (slogger.is_enabled(logging::log_level::debug)) { std::vector> ranges; for (auto& x : ranges_mm) { @@ -5280,7 +5279,7 @@ future<> storage_service::removenode_add_ranges(lw_shared_ptrmaybe_as_vnode_effective_replication_map(); std::unordered_multimap changed_ranges = co_await get_changed_ranges_for_leaving(erm, leaving_node); dht::token_range_vector my_new_ranges; for (auto& x : changed_ranges) { @@ -6037,7 +6036,8 @@ future storage_service::raft_topology_cmd_handler(raft streamer->add_source_filter(std::make_unique(source_dc)); } for (const auto& [keyspace_name, erm] : ks_erms) { - co_await streamer->add_ranges(keyspace_name, erm, co_await get_ranges_for_endpoint(*erm, my_host_id()), _gossiper, false); + auto ranges = co_await get_ranges_for_endpoint(*erm, my_host_id()); + co_await streamer->add_ranges(keyspace_name, erm, std::move(ranges), _gossiper, false); } try { co_await streamer->stream_async(); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 56ec46dedf..64578327f0 100644 --- a/service/topology_coordinator.cc +++ 
b/service/topology_coordinator.cc @@ -2938,7 +2938,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { muts.reserve(topo.normal_nodes.size()); std::unordered_set dirty_nodes; - for (auto& [_, erm] : _db.get_non_local_strategy_keyspaces_erms()) { + for (auto& [_, ermp] : _db.get_non_local_strategy_keyspaces_erms()) { + auto* erm = ermp->maybe_as_vnode_effective_replication_map(); const std::unordered_set& nodes = rollback ? erm->get_all_pending_nodes() : erm->get_dirty_endpoints(); dirty_nodes.insert(nodes.begin(), nodes.end()); } diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index e51a042880..c1b0fb9d2e 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -74,7 +74,8 @@ static void verify_sorted(const dht::token_range_vector& trv) { BOOST_CHECK(std::ranges::adjacent_find(trv, not_strictly_before) == trv.end()); } -static future<> check_ranges_are_sorted(static_effective_replication_map_ptr erm, locator::host_id ep) { +static future<> check_ranges_are_sorted(static_effective_replication_map_ptr ermp, locator::host_id ep) { + auto* erm = ermp->maybe_as_vnode_effective_replication_map(); verify_sorted(co_await erm->get_ranges(ep)); verify_sorted(co_await erm->get_primary_ranges(ep)); verify_sorted(co_await erm->get_primary_ranges_within_dc(ep));