diff --git a/alternator/ttl.cc b/alternator/ttl.cc index a70c8efb4a..685bef3201 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -769,10 +769,14 @@ static future scan_table( } } } else { // VNodes - locator::static_effective_replication_map_ptr erm = + locator::static_effective_replication_map_ptr ermp = db.real_database().find_keyspace(s->ks_name()).get_static_effective_replication_map(); + auto* erm = ermp->maybe_as_vnode_effective_replication_map(); + if (!erm) { + on_internal_error(tlogger, format("Keyspace {} is local", s->ks_name())); + } auto my_host_id = erm->get_topology().my_host_id(); - token_ranges_owned_by_this_shard my_ranges(s, co_await ranges_holder_primary::make(erm.get(), my_host_id)); + token_ranges_owned_by_this_shard my_ranges(s, co_await ranges_holder_primary::make(erm, my_host_id)); while (std::optional range = my_ranges.next_partition_range()) { // Note that because of issue #9167 we need to run a separate // query on each partition range, and can't pass several of @@ -792,7 +796,7 @@ static future scan_table( // by tasking another node to take over scanning of the dead node's primary // ranges. What we do here is that this node will also check expiration // on its *secondary* ranges - but only those whose primary owner is down. 
- token_ranges_owned_by_this_shard my_secondary_ranges(s, co_await ranges_holder_secondary::make(erm.get(), my_host_id, gossiper)); + token_ranges_owned_by_this_shard my_secondary_ranges(s, co_await ranges_holder_secondary::make(erm, my_host_id, gossiper)); while (std::optional range = my_secondary_ranges.next_partition_range()) { expiration_stats.secondary_ranges_scanned++; dht::partition_range_vector partition_ranges; diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 686a842589..00c1fa71c1 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -220,7 +220,8 @@ future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::stati throw std::runtime_error("Mixed sending and receiving is not supported"); } _nr_rx_added++; - auto erm = ermp.get(); + auto erm = ermp->maybe_as_vnode_effective_replication_map(); + SCYLLA_ASSERT(erm != nullptr); auto ranges_for_keyspace = !is_replacing && use_strict_sources_for_ranges(keyspace_name, *erm) ? get_all_ranges_with_strict_sources_for(keyspace_name, erm, std::move(ranges), gossiper) : get_all_ranges_with_sources_for(keyspace_name, erm, std::move(ranges)); diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 2488753b1a..4eb820e80e 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -8,6 +8,7 @@ #include "locator/abstract_replication_strategy.hh" #include "locator/tablet_replication_strategy.hh" +#include "locator/local_strategy.hh" #include "utils/class_registrator.hh" #include "exceptions/exceptions.hh" #include @@ -495,9 +496,14 @@ stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until( return stop_iteration::no; } -vnode_effective_replication_map::~vnode_effective_replication_map() { +static_effective_replication_map::~static_effective_replication_map() { if (is_registered()) { _factory->erase_effective_replication_map(this); + } +} + 
+vnode_effective_replication_map::~vnode_effective_replication_map() { + if (is_registered()) { try { _factory->submit_background_work(clear_gently(std::move(_replication_map), std::move(*_pending_endpoints), @@ -531,19 +537,26 @@ future effective_replication_map_factory:: co_return erm; } - // try to find a reference erm on shard 0 - // TODO: - // - use hash of key to distribute the load - // - instaintiate only on NUMA nodes - auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future> { - auto erm = ermf.find_effective_replication_map(key); - co_return make_foreign(std::move(erm)); - }); mutable_static_effective_replication_map_ptr new_erm; - if (ref_erm) { - new_erm = co_await ref_erm->clone_gently(std::move(rs), std::move(tmptr)); + + if (rs->is_local()) { + // Local replication strategy maps are cheap to compute, so there is no benefit + // in cloning them across shards to avoid an expensive recomputation like `calculate_vnode_effective_replication_map` + new_erm = make_local_effective_replication_map_ptr(std::move(rs), std::move(tmptr)); } else { - new_erm = co_await calculate_vnode_effective_replication_map(std::move(rs), std::move(tmptr)); + // try to find a reference erm on shard 0 + // TODO: + // - use hash of key to distribute the load + // - instantiate only on NUMA nodes + auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future> { + auto erm = ermf.find_effective_replication_map(key); + co_return make_foreign(std::move(erm)); + }); + if (ref_erm) { + new_erm = co_await ref_erm->clone_gently(std::move(rs), std::move(tmptr)); + } else { + new_erm = co_await calculate_vnode_effective_replication_map(std::move(rs), std::move(tmptr)); + } } co_return insert_effective_replication_map(std::move(new_erm), std::move(key)); } @@ -568,7 +581,7 @@ static_effective_replication_map_ptr effective_replication_map_factory::insert_e return res; } -bool
effective_replication_map_factory::erase_effective_replication_map(vnode_effective_replication_map* erm) { +bool effective_replication_map_factory::erase_effective_replication_map(static_effective_replication_map* erm) { const auto& key = erm->get_factory_key(); auto it = _effective_replication_maps.find(key); if (it == _effective_replication_maps.end()) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 65cf71cbd5..dde756607d 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -55,7 +55,7 @@ using replication_map = std::unordered_map; using host_id_set = utils::basic_sequenced_set; -class vnode_effective_replication_map; +class static_effective_replication_map; class effective_replication_map_factory; class per_table_replication_strategy; class tablet_aware_replication_strategy; @@ -63,7 +63,7 @@ class effective_replication_map; class abstract_replication_strategy : public seastar::enable_shared_from_this { - friend class vnode_effective_replication_map; + friend class static_effective_replication_map; friend class per_table_replication_strategy; friend class tablet_aware_replication_strategy; protected: @@ -290,7 +290,9 @@ public: virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const = 0; }; -using static_effective_replication_map = vnode_effective_replication_map; +class static_effective_replication_map; +class vnode_effective_replication_map; +class local_effective_replication_map; using static_effective_replication_map_ptr = shared_ptr; using mutable_static_effective_replication_map_ptr = shared_ptr; @@ -301,7 +303,7 @@ using mutable_static_erm_ptr = mutable_static_effective_replication_map_ptr; // effective replication strategy over the given token_metadata // and replication_strategy_config_options. // Used for token-based replication strategies. 
-class vnode_effective_replication_map : public enable_shared_from_this +class static_effective_replication_map : public enable_shared_from_this , public effective_replication_map { public: struct factory_key { @@ -320,6 +322,86 @@ public: bool operator==(const factory_key& o) const = default; }; +private: + std::optional _factory_key = std::nullopt; + effective_replication_map_factory* _factory = nullptr; + + friend class abstract_replication_strategy; + friend class effective_replication_map_factory; +public: + explicit static_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, size_t replication_factor) noexcept + : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) + { } + static_effective_replication_map() = delete; + static_effective_replication_map(static_effective_replication_map&&) = default; + ~static_effective_replication_map(); + + virtual const vnode_effective_replication_map* maybe_as_vnode_effective_replication_map() const { + return nullptr; + } + + virtual const local_effective_replication_map* maybe_as_local_effective_replication_map() const { + return nullptr; + } + + virtual future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const = 0; + +public: + static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr); + + const factory_key& get_factory_key() const noexcept { + return *_factory_key; + } + + void set_factory(effective_replication_map_factory& factory, factory_key key) noexcept { + _factory = &factory; + _factory_key.emplace(std::move(key)); + } + + bool is_registered() const noexcept { + return _factory != nullptr; + } + + void unregister() noexcept { + _factory = nullptr; + } +}; + +// Apply the replication strategy over the current configuration and the given token_metadata. 
+future calculate_vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr); + +// Class to hold a coherent view of a keyspace +// effective replication map on all shards +class global_static_effective_replication_map { + std::vector> _erms; + +public: + global_static_effective_replication_map() : _erms(smp::count) {} + global_static_effective_replication_map(global_static_effective_replication_map&&) = default; + global_static_effective_replication_map& operator=(global_static_effective_replication_map&&) = default; + + future<> get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name); + + const static_effective_replication_map& get() const noexcept { + return *_erms[this_shard_id()]; + } + + const static_effective_replication_map& operator*() const noexcept { + return get(); + } + + const static_effective_replication_map* operator->() const noexcept { + return &get(); + } +}; + +future make_global_static_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name); + +// Holds the full replication_map resulting from applying the +// effective replication strategy over the given token_metadata +// and replication_strategy_config_options. +// Used for token-based replication strategies. 
+class vnode_effective_replication_map : public static_effective_replication_map { private: replication_map _replication_map; ring_mapping _pending_endpoints; @@ -343,7 +425,7 @@ public: // effective_replication_map public: explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, ring_mapping pending_endpoints, ring_mapping read_endpoints, std::unordered_set dirty_endpoints, size_t replication_factor) noexcept - : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) + : static_effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) , _replication_map(std::move(replication_map)) , _pending_endpoints(std::move(pending_endpoints)) , _read_endpoints(std::move(read_endpoints)) @@ -353,7 +435,11 @@ public: vnode_effective_replication_map(vnode_effective_replication_map&&) = default; ~vnode_effective_replication_map(); - future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const; + virtual const vnode_effective_replication_map* maybe_as_vnode_effective_replication_map() const override { + return this; + } + + virtual future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const override; // get_primary_ranges() returns the list of "primary ranges" for the given // endpoint. "Primary ranges" are the ranges that the node is responsible @@ -417,35 +503,42 @@ inline mutable_static_erm_ptr make_vnode_effective_replication_map_ptr(replicati std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), replication_factor); } -// Apply the replication strategy over the current configuration and the given token_metadata. 
-future calculate_vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr); - -// Class to hold a coherent view of a keyspace -// static effective replication map on all shards -class global_static_effective_replication_map { - std::vector> _erms; - +// Effective replication map for the local replication strategy: +// the local node is the sole replica and owns the entire token +// range, so no per-token replication_map is computed. +// Used for LocalStrategy (node-local) keyspaces. +class local_effective_replication_map : public static_effective_replication_map { + host_id_vector_replica_set _replica_set; + dht::token_range_vector _local_ranges; public: - global_static_effective_replication_map() : _erms(smp::count) {} - global_static_effective_replication_map(global_static_effective_replication_map&&) = default; - global_static_effective_replication_map& operator=(global_static_effective_replication_map&&) = default; + explicit local_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) noexcept + : static_effective_replication_map(std::move(rs), std::move(tmptr), 1) + , _replica_set({_tmptr->get_topology().my_host_id()}) + , _local_ranges({dht::token_range::make_open_ended_both_sides()}) + { } + local_effective_replication_map() = delete; + local_effective_replication_map(local_effective_replication_map&&) = default; - future<> get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name); - - const static_effective_replication_map& get() const noexcept { - return *_erms[this_shard_id()]; + virtual const local_effective_replication_map* maybe_as_local_effective_replication_map() const override { + return this; } - const static_effective_replication_map& operator*() const noexcept { - return get(); - } + virtual future clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const override; - const static_effective_replication_map* operator->() const noexcept { - return &get(); - } +
host_id_vector_replica_set get_natural_replicas(const token& search_token) const override; + host_id_vector_topology_change get_pending_replicas(const token& search_token) const override; + host_id_vector_replica_set get_replicas_for_reading(const token& token) const override; + host_id_vector_replica_set get_replicas(const token& search_token) const override; + std::optional check_locality(const token& token) const override; + bool has_pending_ranges(locator::host_id endpoint) const override; + std::unique_ptr make_splitter() const override; + const dht::sharder& get_sharder(const schema& s) const override; + future get_ranges(host_id ep) const override; }; -future make_global_static_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name); +inline mutable_static_erm_ptr make_local_effective_replication_map_ptr(replication_strategy_ptr rs, token_metadata_ptr tmptr) { + return seastar::make_shared(std::move(rs), std::move(tmptr)); +} } // namespace locator @@ -515,6 +608,7 @@ private: void submit_background_work(future<> fut); + friend class static_effective_replication_map; friend class vnode_effective_replication_map; }; diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index 242c1288c2..ca575e2c3a 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -8,6 +8,7 @@ #include #include "local_strategy.hh" +#include "dht/token.hh" #include "utils/class_registrator.hh" #include "exceptions/exceptions.hh" @@ -40,6 +41,67 @@ sstring local_strategy::sanity_check_read_replicas(const effective_replication_m return {}; } +future local_effective_replication_map::clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const { + return make_ready_future(make_local_effective_replication_map_ptr(std::move(rs), std::move(tmptr))); +} + +host_id_vector_replica_set local_effective_replication_map::get_natural_replicas(const token&) const { + return _replica_set; +} + +host_id_vector_topology_change 
local_effective_replication_map::get_pending_replicas(const token&) const { + return host_id_vector_topology_change{}; +} + +host_id_vector_replica_set local_effective_replication_map::get_replicas_for_reading(const token& token) const { + return _replica_set; +} + +host_id_vector_replica_set local_effective_replication_map::get_replicas(const token&) const { + return _replica_set; +} + +std::optional local_effective_replication_map::check_locality(const token& token) const { + return std::nullopt; +} + +bool local_effective_replication_map::has_pending_ranges(locator::host_id endpoint) const { + return false; +} + +std::unique_ptr local_effective_replication_map::make_splitter() const { + class local_splitter : public token_range_splitter { + std::optional _cur; + public: + local_splitter() + : _cur(dht::minimum_token()) + { } + + void reset(dht::ring_position_view pos) override { + _cur = pos.token(); + } + + std::optional next_token() override { + if (auto cur = std::exchange(_cur, std::nullopt)) { + return cur; + } + return std::nullopt; + } + }; + return std::make_unique(); +} + +const dht::sharder& local_effective_replication_map::get_sharder(const schema& s) const { + return s.get_sharder(); +} + +future local_effective_replication_map::get_ranges(host_id ep) const { + if (ep == _tmptr->get_topology().my_host_id()) { + return make_ready_future(_local_ranges); + } + return make_ready_future(); +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.LocalStrategy"); static registry registrator_short_name("LocalStrategy"); diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 2acb6bf56b..60ad38943b 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -948,6 +948,23 @@ std::unique_ptr make_splitter(token_metadata_ptr return std::make_unique(std::move(tmptr)); } +class local_splitter : public locator::token_range_splitter { + std::optional _next_token; +public: + local_splitter() : 
_next_token(dht::minimum_token()) {} + + void reset(dht::ring_position_view pos) override { + _next_token = pos.token(); + } + + std::optional next_token() override { + if (auto ret = std::exchange(_next_token, std::nullopt)) { + return ret; + } + return std::nullopt; + } +}; + topology& token_metadata::get_topology() { return _impl->get_topology(); diff --git a/repair/repair.cc b/repair/repair.cc index 1213217551..80fbd81097 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -9,6 +9,7 @@ #include "db/config.hh" #include "repair.hh" #include "gms/gossip_address_map.hh" +#include "locator/abstract_replication_strategy.hh" #include "repair/row_level.hh" #include "locator/network_topology_strategy.hh" @@ -1211,15 +1212,17 @@ future repair_service::do_repair_start(gms::gossip_address_map& addr_map, s } auto germs = make_lw_shared(co_await locator::make_global_static_effective_replication_map(sharded_db, keyspace)); - auto& erm = germs->get(); - auto& topology = erm.get_token_metadata().get_topology(); - auto my_host_id = erm.get_topology().my_host_id(); + auto* ermp = germs->get().maybe_as_vnode_effective_replication_map(); - if (erm.get_replication_strategy().is_local()) { + if (!ermp) { rlogger.info("repair[{}]: completed successfully: nothing to repair for keyspace {} with local replication strategy", id.uuid(), keyspace); co_return id.id; } + auto& erm = *ermp; + auto& topology = erm.get_token_metadata().get_topology(); + auto my_host_id = erm.get_topology().my_host_id(); + if (!_gossiper.local().is_normal(my_host_id)) { throw std::runtime_error("Node is not in NORMAL status yet!"); } diff --git a/replica/database.hh b/replica/database.hh index 1b4c43918c..228214aa31 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1798,6 +1798,7 @@ public: std::vector get_all_keyspaces() const; std::vector get_non_local_strategy_keyspaces() const; std::vector get_non_local_vnode_based_strategy_keyspaces() const; + // All static_effective_replication_map_ptr must 
hold a vnode_effective_replication_map std::unordered_map get_non_local_strategy_keyspaces_erms() const; std::vector get_tablets_keyspaces() const; column_family& find_column_family(std::string_view ks, std::string_view name); diff --git a/service/storage_service.cc b/service/storage_service.cc index 2ff2633023..da7edbfad4 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5247,9 +5247,8 @@ future<> storage_service::unbootstrap() { std::unordered_map> ranges_to_stream; auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms(); - for (const auto& [keyspace_name, ermp] : ks_erms) { - auto* erm = ermp.get(); - auto ranges_mm = co_await get_changed_ranges_for_leaving(erm, my_host_id()); + for (const auto& [keyspace_name, erm] : ks_erms) { + auto ranges_mm = co_await get_changed_ranges_for_leaving(erm->maybe_as_vnode_effective_replication_map(), my_host_id()); if (slogger.is_enabled(logging::log_level::debug)) { std::vector> ranges; for (auto& x : ranges_mm) { @@ -5280,7 +5279,7 @@ future<> storage_service::removenode_add_ranges(lw_shared_ptrmaybe_as_vnode_effective_replication_map(); std::unordered_multimap changed_ranges = co_await get_changed_ranges_for_leaving(erm, leaving_node); dht::token_range_vector my_new_ranges; for (auto& x : changed_ranges) { @@ -6037,7 +6036,8 @@ future storage_service::raft_topology_cmd_handler(raft streamer->add_source_filter(std::make_unique(source_dc)); } for (const auto& [keyspace_name, erm] : ks_erms) { - co_await streamer->add_ranges(keyspace_name, erm, co_await get_ranges_for_endpoint(*erm, my_host_id()), _gossiper, false); + auto ranges = co_await get_ranges_for_endpoint(*erm, my_host_id()); + co_await streamer->add_ranges(keyspace_name, erm, std::move(ranges), _gossiper, false); } try { co_await streamer->stream_async(); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 56ec46dedf..64578327f0 100644 --- a/service/topology_coordinator.cc +++ 
b/service/topology_coordinator.cc @@ -2938,7 +2938,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { muts.reserve(topo.normal_nodes.size()); std::unordered_set dirty_nodes; - for (auto& [_, erm] : _db.get_non_local_strategy_keyspaces_erms()) { + for (auto& [_, ermp] : _db.get_non_local_strategy_keyspaces_erms()) { + auto* erm = ermp->maybe_as_vnode_effective_replication_map(); const std::unordered_set& nodes = rollback ? erm->get_all_pending_nodes() : erm->get_dirty_endpoints(); dirty_nodes.insert(nodes.begin(), nodes.end()); } diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index e51a042880..c1b0fb9d2e 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -74,7 +74,8 @@ static void verify_sorted(const dht::token_range_vector& trv) { BOOST_CHECK(std::ranges::adjacent_find(trv, not_strictly_before) == trv.end()); } -static future<> check_ranges_are_sorted(static_effective_replication_map_ptr erm, locator::host_id ep) { +static future<> check_ranges_are_sorted(static_effective_replication_map_ptr ermp, locator::host_id ep) { + auto* erm = ermp->maybe_as_vnode_effective_replication_map(); verify_sorted(co_await erm->get_ranges(ep)); verify_sorted(co_await erm->get_primary_ranges(ep)); verify_sorted(co_await erm->get_primary_ranges_within_dc(ep));