mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-05 06:23:03 +00:00
abstract_replication_strategy: introduce effective_replication_map
effective_replication_map holds the full replication_map resulting from applying the effective replication strategy over the given token_metadata and replication_strategy_config_options. It is calculated once, in make_effective_replication_map(), and then it can be used for retrieving the endpoints/token_ranges synchronously from the precalculated map. A new virtual get_natural_endpoints(const token&, const effective_replication_map&) method has been added to abstract_replication_strategy so that local_strategy and everywhere_replication_strategy can override it as they may be needed before the token_metadata is established. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -24,6 +24,7 @@
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "utils/stall_free.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
@@ -99,6 +100,12 @@ inet_address_vector_replica_set abstract_replication_strategy::do_get_natural_en
|
||||
return res->second;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const {
|
||||
const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token);
|
||||
auto res = erm.get_replication_map().find(key_token);
|
||||
return res->second;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) {
|
||||
token_metadata_ptr tmptr = _shared_token_metadata.get();
|
||||
inet_address_vector_replica_set natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield);
|
||||
@@ -336,4 +343,23 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
|
||||
return ret;
|
||||
}
|
||||
|
||||
future<mutable_effective_replication_map_ptr> calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) {
|
||||
replication_map replication_map;
|
||||
|
||||
for (const auto &t : tmptr->sorted_tokens()) {
|
||||
replication_map.emplace(t, co_await rs->calculate_natural_endpoints(t, *tmptr));
|
||||
}
|
||||
|
||||
co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map));
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set effective_replication_map::get_natural_endpoints(const token& search_token) const {
|
||||
return _rs->get_natural_endpoints(search_token, *this);
|
||||
}
|
||||
|
||||
future<> effective_replication_map::clear_gently() noexcept {
|
||||
co_await utils::clear_gently(_replication_map);
|
||||
co_await utils::clear_gently(_tmptr);
|
||||
}
|
||||
|
||||
} // namespace locator
|
||||
|
||||
@@ -55,6 +55,8 @@ using replication_strategy_config_options = std::map<sstring, sstring>;
|
||||
|
||||
using replication_map = std::unordered_map<token, inet_address_vector_replica_set>;
|
||||
|
||||
class effective_replication_map;
|
||||
|
||||
class abstract_replication_strategy {
|
||||
private:
|
||||
long _last_invalidated_ring_version = 0;
|
||||
@@ -63,6 +65,8 @@ private:
|
||||
|
||||
replication_map&
|
||||
get_cached_endpoints(const token_metadata& tm);
|
||||
|
||||
friend class effective_replication_map;
|
||||
protected:
|
||||
replication_strategy_config_options _config_options;
|
||||
const shared_token_metadata& _shared_token_metadata;
|
||||
@@ -108,6 +112,7 @@ public:
|
||||
const replication_strategy_config_options& config_options);
|
||||
static void validate_replication_factor(sstring rf);
|
||||
inet_address_vector_replica_set get_natural_endpoints(const token& search_token, can_yield = can_yield::no);
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const;
|
||||
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no);
|
||||
virtual void validate_options() const = 0;
|
||||
virtual std::optional<std::set<sstring>> recognized_options() const = 0;
|
||||
@@ -134,6 +139,7 @@ public:
|
||||
dht::token_range_vector get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield can_yield = can_yield::no) const {
|
||||
return do_get_ranges(ep, std::move(tmptr), can_yield);
|
||||
}
|
||||
|
||||
private:
|
||||
// Caller must ensure that token_metadata will not change throughout the call if can_yield::yes.
|
||||
dht::token_range_vector do_get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield) const;
|
||||
@@ -168,4 +174,46 @@ public:
|
||||
dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address, can_yield) const;
|
||||
};
|
||||
|
||||
// Holds the full replication_map resulting from applying the
|
||||
// effective replication strategy over the given token_metadata
|
||||
// and replication_strategy_config_options.
|
||||
class effective_replication_map {
|
||||
private:
|
||||
abstract_replication_strategy::ptr_type _rs;
|
||||
token_metadata_ptr _tmptr;
|
||||
replication_map _replication_map;
|
||||
|
||||
friend class abstract_replication_strategy;
|
||||
public:
|
||||
explicit effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map) noexcept
|
||||
: _rs(std::move(rs))
|
||||
, _tmptr(std::move(tmptr))
|
||||
, _replication_map(std::move(replication_map))
|
||||
{ }
|
||||
effective_replication_map() = delete;
|
||||
effective_replication_map(effective_replication_map&&) = default;
|
||||
|
||||
const token_metadata_ptr& get_token_metadata_ptr() const noexcept {
|
||||
return _tmptr;
|
||||
}
|
||||
|
||||
const replication_map& get_replication_map() const noexcept {
|
||||
return _replication_map;
|
||||
}
|
||||
|
||||
future<> clear_gently() noexcept;
|
||||
|
||||
inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const;
|
||||
};
|
||||
|
||||
using effective_replication_map_ptr = lw_shared_ptr<const effective_replication_map>;
|
||||
using mutable_effective_replication_map_ptr = lw_shared_ptr<effective_replication_map>;
|
||||
|
||||
inline mutable_effective_replication_map_ptr make_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map) {
|
||||
return make_lw_shared<effective_replication_map>(std::move(rs), std::move(tmptr), std::move(replication_map));
|
||||
}
|
||||
|
||||
// Apply the replication strategy over the current configuration and the given token_metadata.
|
||||
future<mutable_effective_replication_map_ptr> calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr);
|
||||
|
||||
}
|
||||
|
||||
@@ -66,6 +66,14 @@ size_t everywhere_replication_strategy::get_replication_factor() const {
|
||||
return _shared_token_metadata.get()->count_normal_token_owners();
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const effective_replication_map& erm) const {
|
||||
const auto& tm = *erm.get_token_metadata_ptr();
|
||||
if (tm.sorted_tokens().empty()) {
|
||||
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
|
||||
}
|
||||
return boost::copy_range<inet_address_vector_replica_set>(tm.get_all_endpoints());
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, const shared_token_metadata&, snitch_ptr&, const replication_strategy_config_options&>;
|
||||
static registry registrator("org.apache.cassandra.locator.EverywhereStrategy");
|
||||
static registry registrator_short_name("EverywhereStrategy");
|
||||
|
||||
@@ -62,5 +62,11 @@ public:
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to override this because the default implementation depends
|
||||
* on token calculations but everywhere_replication_strategy may be used before tokens are set up.
|
||||
*/
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -54,6 +54,10 @@ size_t local_strategy::get_replication_factor() const {
|
||||
return 1;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const effective_replication_map&) const {
|
||||
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, local_strategy, const shared_token_metadata&, snitch_ptr&, const replication_strategy_config_options&>;
|
||||
static registry registrator("org.apache.cassandra.locator.LocalStrategy");
|
||||
static registry registrator_short_name("LocalStrategy");
|
||||
|
||||
@@ -58,6 +58,11 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to override this because the default implementation depends
|
||||
* on token calculations but LocalStrategy may be used before tokens are set up.
|
||||
*/
|
||||
inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user