diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 189d74159c..c2f0b215c4 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -71,6 +71,14 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring& } } +inet_address_vector_replica_set abstract_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) const { + if (!can_yield) { + return calculate_natural_endpoints_sync(search_token, tm); + } else { + return calculate_natural_endpoints_async(search_token, tm).get0(); + } +} + inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) { return do_get_natural_endpoints(search_token, *_shared_token_metadata.get(), can_yield); } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index a4985dacdf..1e67d22d3f 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -96,7 +96,11 @@ public: // is small, that implementation may not yield since by itself it won't cause a reactor stall (assuming practical // cluster sizes and number of tokens per node). The caller is responsible for yielding if they call this function // in a loop. - virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0; + inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const; + + // FIXME: temporary, until all users are converted to use the async version + virtual inet_address_vector_replica_set calculate_natural_endpoints_sync(const token& search_token, const token_metadata& tm) const = 0; + virtual future calculate_natural_endpoints_async(const token& search_token, const token_metadata& tm) const = 0; virtual ~abstract_replication_strategy() {} static std::unique_ptr create_replication_strategy(const sstring& strategy_name, const shared_token_metadata& stm, const replication_strategy_config_options& config_options); diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index e274d4b9e4..e6700000e3 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -47,15 +47,19 @@ namespace locator { everywhere_replication_strategy::everywhere_replication_strategy(const shared_token_metadata& token_metadata, snitch_ptr& snitch, const replication_strategy_config_options& config_options) : abstract_replication_strategy(token_metadata, snitch, config_options, replication_strategy_type::everywhere_topology) {} -inet_address_vector_replica_set everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const { +inet_address_vector_replica_set everywhere_replication_strategy::calculate_natural_endpoints_sync(const token& search_token, const token_metadata& tm) const { return boost::copy_range(tm.get_all_endpoints()); } -inet_address_vector_replica_set everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { +future everywhere_replication_strategy::calculate_natural_endpoints_async(const token& search_token, const token_metadata& tm) const { + return make_ready_future(boost::copy_range(tm.get_all_endpoints())); +} + +inet_address_vector_replica_set everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) { if (tm.sorted_tokens().empty()) { return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); } - return calculate_natural_endpoints(search_token, tm, can_yield); + return calculate_natural_endpoints_sync(search_token, tm); } size_t everywhere_replication_strategy::get_replication_factor() const { diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index 74dcd5e7a0..5e50778890 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -46,7 +46,8 @@ class everywhere_replication_strategy : public abstract_replication_strategy { public: everywhere_replication_strategy(const shared_token_metadata& token_metadata, snitch_ptr& snitch, const replication_strategy_config_options& config_options); - virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; + virtual inet_address_vector_replica_set calculate_natural_endpoints_sync(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints_async(const token& search_token, const token_metadata& tm) const override; inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; virtual void validate_options() const override { /* noop */ } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index d222060384..c42d56895b 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -34,10 +34,14 @@ inet_address_vector_replica_set local_strategy::do_get_natural_endpoints(const t return calculate_natural_endpoints(t, tm, can_yield); } -inet_address_vector_replica_set local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const { +inet_address_vector_replica_set local_strategy::calculate_natural_endpoints_sync(const token& t, const token_metadata& tm) const { return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); } +future local_strategy::calculate_natural_endpoints_async(const token& t, const token_metadata& tm) const { + return make_ready_future(inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()})); +} + void local_strategy::validate_options() const { } diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index f6d4ab582d..f45d2a19ca 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -35,12 +35,14 @@ using inet_address = gms::inet_address; using token = dht::token; class local_strategy : public abstract_replication_strategy { -protected: - virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; public: local_strategy(const shared_token_metadata& token_metadata, snitch_ptr& snitch, const replication_strategy_config_options& config_options); virtual ~local_strategy() {}; virtual size_t get_replication_factor() const override; + + virtual inet_address_vector_replica_set calculate_natural_endpoints_sync(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints_async(const token& search_token, const token_metadata& tm) const override; + /** * We need to override this even if we override calculateNaturalEndpoints, * because the default implementation depends on token calculations but diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index fcc7ed5bd2..e55d40a454 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -37,6 +37,10 @@ */ #include + +#include +#include + #include "locator/network_topology_strategy.hh" #include "utils/sequenced_set.hh" #include @@ -268,16 +272,12 @@ public: }; inet_address_vector_replica_set -network_topology_strategy::calculate_natural_endpoints( - const token& search_token, const token_metadata& tm, can_yield can_yield) const { +network_topology_strategy::calculate_natural_endpoints_sync( + const token& search_token, const token_metadata& tm) const { natural_endpoints_tracker tracker(tm, _dc_rep_factor); for (auto& next : tm.ring_range(search_token)) { - if (can_yield) { - seastar::thread::maybe_yield(); - } - inet_address ep = *tm.get_endpoint(next); if (tracker.add_endpoint_and_check_if_done(ep)) { break; @@ -287,6 +287,24 @@ network_topology_strategy::calculate_natural_endpoints( return boost::copy_range(tracker.replicas().get_vector()); } +future +network_topology_strategy::calculate_natural_endpoints_async( + const token& search_token, const token_metadata& tm) const { + + natural_endpoints_tracker tracker(tm, _dc_rep_factor); + + for (auto& next : tm.ring_range(search_token)) { + co_await coroutine::maybe_yield(); + + inet_address ep = *tm.get_endpoint(next); + if (tracker.add_endpoint_and_check_if_done(ep)) { + break; + } + } + + co_return boost::copy_range(tracker.replicas().get_vector()); +} + void network_topology_strategy::validate_options() const { for (auto& c : _config_options) { if (c.first == sstring("replication_factor")) { diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh index 1a56b25632..577b0cce71 100644 --- a/locator/network_topology_strategy.hh +++ b/locator/network_topology_strategy.hh @@ -74,8 +74,10 @@ protected: * calculate endpoints in one pass through the tokens by tracking our * progress in each DC, rack etc. */ - virtual inet_address_vector_replica_set calculate_natural_endpoints( - const token& search_token, const token_metadata& tm, can_yield) const override; + virtual inet_address_vector_replica_set calculate_natural_endpoints_sync( + const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints_async( + const token& search_token, const token_metadata& tm) const override; virtual void validate_options() const override; diff --git a/locator/simple_strategy.cc b/locator/simple_strategy.cc index 7fe07cec47..c833f9f890 100644 --- a/locator/simple_strategy.cc +++ b/locator/simple_strategy.cc @@ -20,6 +20,10 @@ */ #include + +#include +#include + #include "simple_strategy.hh" #include "utils/class_registrator.hh" #include @@ -42,7 +46,7 @@ simple_strategy::simple_strategy(const shared_token_metadata& token_metadata, sn } } -inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const { +inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints_sync(const token& t, const token_metadata& tm) const { const std::vector& tokens = tm.sorted_tokens(); if (tokens.empty()) { @@ -57,9 +61,7 @@ inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints(con if (endpoints.size() == replicas) { break; } - if (can_yield) { - seastar::thread::maybe_yield(); - } + auto ep = tm.get_endpoint(token); assert(ep); @@ -69,6 +71,32 @@ inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints(con return boost::copy_range(endpoints.get_vector()); } +future simple_strategy::calculate_natural_endpoints_async(const token& t, const token_metadata& tm) const { + const std::vector& tokens = tm.sorted_tokens(); + + if (tokens.empty()) { + co_return inet_address_vector_replica_set(); + } + + size_t replicas = get_replication_factor(); + utils::sequenced_set endpoints; + endpoints.reserve(replicas); + + for (auto& token : tm.ring_range(t)) { + if (endpoints.size() == replicas) { + break; + } + + auto ep = tm.get_endpoint(token); + assert(ep); + + endpoints.push_back(*ep); + co_await coroutine::maybe_yield(); + } + + co_return boost::copy_range(endpoints.get_vector()); +} + size_t simple_strategy::get_replication_factor() const { return _replication_factor; } diff --git a/locator/simple_strategy.hh b/locator/simple_strategy.hh index 0d2f4eed38..7eef4941cd 100644 --- a/locator/simple_strategy.hh +++ b/locator/simple_strategy.hh @@ -29,8 +29,6 @@ namespace locator { class simple_strategy : public abstract_replication_strategy { -protected: - virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; public: simple_strategy(const shared_token_metadata& token_metadata, snitch_ptr& snitch, const replication_strategy_config_options& config_options); virtual ~simple_strategy() {}; @@ -40,6 +38,9 @@ public: virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override { return true; } + + virtual inet_address_vector_replica_set calculate_natural_endpoints_sync(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints_async(const token& search_token, const token_metadata& tm) const override; private: size_t _replication_factor = 1; };