diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index d0dd3c50e4..5d4f184b08 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -24,6 +24,7 @@ #include "exceptions/exceptions.hh" #include #include +#include "utils/stall_free.hh" namespace locator { @@ -99,6 +100,12 @@ inet_address_vector_replica_set abstract_replication_strategy::do_get_natural_en return res->second; } +inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const { + const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token); + auto res = erm.get_replication_map().find(key_token); + return res->second; +} + inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) { token_metadata_ptr tmptr = _shared_token_metadata.get(); inet_address_vector_replica_set natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield); @@ -336,4 +343,23 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p return ret; } +future calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) { + replication_map replication_map; + + for (const auto &t : tmptr->sorted_tokens()) { + replication_map.emplace(t, co_await rs->calculate_natural_endpoints(t, *tmptr)); + } + + co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map)); +} + +inet_address_vector_replica_set effective_replication_map::get_natural_endpoints(const token& search_token) const { + return _rs->get_natural_endpoints(search_token, *this); +} + +future<> effective_replication_map::clear_gently() noexcept { + co_await utils::clear_gently(_replication_map); + co_await utils::clear_gently(_tmptr); +} + } // namespace locator diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index fbe7eeaf7b..417a02f175 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -55,6 +55,8 @@ using replication_strategy_config_options = std::map; using replication_map = std::unordered_map; +class effective_replication_map; + class abstract_replication_strategy { private: long _last_invalidated_ring_version = 0; @@ -63,6 +65,8 @@ private: replication_map& get_cached_endpoints(const token_metadata& tm); + + friend class effective_replication_map; protected: replication_strategy_config_options _config_options; const shared_token_metadata& _shared_token_metadata; @@ -108,6 +112,7 @@ public: const replication_strategy_config_options& config_options); static void validate_replication_factor(sstring rf); inet_address_vector_replica_set get_natural_endpoints(const token& search_token, can_yield = can_yield::no); + virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const; inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no); virtual void validate_options() const = 0; virtual std::optional> recognized_options() const = 0; @@ -134,6 +139,7 @@ public: dht::token_range_vector get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield can_yield = can_yield::no) const { return do_get_ranges(ep, std::move(tmptr), can_yield); } + private: // Caller must ensure that token_metadata will not change throughout the call if can_yield::yes. dht::token_range_vector do_get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield) const; @@ -168,4 +174,46 @@ public: dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set pending_tokens, inet_address pending_address, can_yield) const; }; +// Holds the full replication_map resulting from applying the +// effective replication strategy over the given token_metadata +// and replication_strategy_config_options. +class effective_replication_map { +private: + abstract_replication_strategy::ptr_type _rs; + token_metadata_ptr _tmptr; + replication_map _replication_map; + + friend class abstract_replication_strategy; +public: + explicit effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map) noexcept + : _rs(std::move(rs)) + , _tmptr(std::move(tmptr)) + , _replication_map(std::move(replication_map)) + { } + effective_replication_map() = delete; + effective_replication_map(effective_replication_map&&) = default; + + const token_metadata_ptr& get_token_metadata_ptr() const noexcept { + return _tmptr; + } + + const replication_map& get_replication_map() const noexcept { + return _replication_map; + } + + future<> clear_gently() noexcept; + + inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const; +}; + +using effective_replication_map_ptr = lw_shared_ptr; +using mutable_effective_replication_map_ptr = lw_shared_ptr; + +inline mutable_effective_replication_map_ptr make_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map) { + return make_lw_shared(std::move(rs), std::move(tmptr), std::move(replication_map)); +} + +// Apply the replication strategy over the current configuration and the given token_metadata. +future calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr); + } diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index e64547c567..cfe04b0853 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -66,6 +66,14 @@ size_t everywhere_replication_strategy::get_replication_factor() const { return _shared_token_metadata.get()->count_normal_token_owners(); } +inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const effective_replication_map& erm) const { + const auto& tm = *erm.get_token_metadata_ptr(); + if (tm.sorted_tokens().empty()) { + return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); + } + return boost::copy_range(tm.get_all_endpoints()); +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.EverywhereStrategy"); static registry registrator_short_name("EverywhereStrategy"); diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index 23bb8ec9f4..b1361cb530 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -62,5 +62,11 @@ public: virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override { return true; } + + /** + * We need to override this because the default implementation depends + * on token calculations but everywhere_replication_strategy may be used before tokens are set up. + */ + virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override; }; } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index 185fa663fc..6bb13a9616 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -54,6 +54,10 @@ size_t local_strategy::get_replication_factor() const { return 1; } +inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const effective_replication_map&) const { + return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.locator.LocalStrategy"); static registry registrator_short_name("LocalStrategy"); diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index ae43394130..dbe5504d37 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -58,6 +58,11 @@ public: return false; } + /** + * We need to override this because the default implementation depends + * on token calculations but LocalStrategy may be used before tokens are set up. + */ + inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override; }; }