mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 19:21:01 +00:00
abstract_replication_strategy: move get_ranges and get_primary_ranges* to effective_replication_map
Provide a sync get_ranges method by effective_replication_map that uses the precalculated map to get all token ranges owned by or replicated on a given endpoint. Reuse do_get_ranges as common infrastructure for all 3 cases: get_ranges, get_primary_ranges, and get_primary_ranges_within_dc. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -2189,7 +2189,7 @@ const sstring& database::get_snitch_name() const {
|
||||
}
|
||||
|
||||
dht::token_range_vector database::get_keyspace_local_ranges(sstring ks) {
|
||||
return find_keyspace(ks).get_replication_strategy().get_ranges(utils::fb_utilities::get_broadcast_address());
|
||||
return find_keyspace(ks).get_effective_replication_map()->get_ranges(utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
/*!
|
||||
|
||||
@@ -154,6 +154,32 @@ insert_token_range_to_sorted_container_while_unwrapping(
|
||||
}
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
effective_replication_map::do_get_ranges(noncopyable_function<bool(inet_address_vector_replica_set)> should_add_range) const {
|
||||
dht::token_range_vector ret;
|
||||
const auto& tm = *_tmptr;
|
||||
auto prev_tok = tm.sorted_tokens().back();
|
||||
for (const auto& tok : tm.sorted_tokens()) {
|
||||
if (should_add_range(get_natural_endpoints(tok))) {
|
||||
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
|
||||
}
|
||||
prev_tok = tok;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
effective_replication_map::get_ranges(inet_address ep) const {
|
||||
return do_get_ranges([ep] (inet_address_vector_replica_set eps) {
|
||||
for (auto a : eps) {
|
||||
if (a == ep) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
// Caller must ensure that token_metadata will not change throughout the call if can_yield::yes.
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::do_get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield can_yield) const {
|
||||
@@ -176,49 +202,27 @@ abstract_replication_strategy::do_get_ranges(inet_address ep, const token_metada
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::get_primary_ranges(inet_address ep, can_yield can_yield) {
|
||||
dht::token_range_vector ret;
|
||||
token_metadata_ptr tmptr = _shared_token_metadata.get();
|
||||
auto prev_tok = tmptr->sorted_tokens().back();
|
||||
for (auto tok : tmptr->sorted_tokens()) {
|
||||
auto&& eps = do_calculate_natural_endpoints(tok, *tmptr, can_yield);
|
||||
if (eps.size() > 0 && eps[0] == ep) {
|
||||
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
|
||||
}
|
||||
prev_tok = tok;
|
||||
if (can_yield) {
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
effective_replication_map::get_primary_ranges(inet_address ep) const {
|
||||
return do_get_ranges([ep] (inet_address_vector_replica_set eps) {
|
||||
return eps.size() > 0 && eps[0] == ep;
|
||||
});
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
abstract_replication_strategy::get_primary_ranges_within_dc(inet_address ep, can_yield can_yield) {
|
||||
dht::token_range_vector ret;
|
||||
sstring local_dc = _snitch->get_datacenter(ep);
|
||||
token_metadata_ptr tmptr = _shared_token_metadata.get();
|
||||
std::unordered_set<inet_address> local_dc_nodes = tmptr->get_topology().get_datacenter_endpoints().at(local_dc);
|
||||
auto prev_tok = tmptr->sorted_tokens().back();
|
||||
for (auto tok : tmptr->sorted_tokens()) {
|
||||
auto&& eps = do_calculate_natural_endpoints(tok, *tmptr, can_yield);
|
||||
effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const {
|
||||
sstring local_dc = _rs->_snitch->get_datacenter(ep);
|
||||
std::unordered_set<inet_address> local_dc_nodes = _tmptr->get_topology().get_datacenter_endpoints().at(local_dc);
|
||||
return do_get_ranges([ep, local_dc_nodes = std::move(local_dc_nodes)] (inet_address_vector_replica_set eps) {
|
||||
// Unlike get_primary_ranges() which checks if ep is the first
|
||||
// owner of this range, here we check if ep is the first just
|
||||
// among nodes which belong to the local dc of ep.
|
||||
for (auto& e : eps) {
|
||||
if (local_dc_nodes.contains(e)) {
|
||||
if (e == ep) {
|
||||
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
|
||||
}
|
||||
break;
|
||||
return e == ep;
|
||||
}
|
||||
}
|
||||
prev_tok = tok;
|
||||
if (can_yield) {
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
std::unordered_multimap<inet_address, dht::token_range>
|
||||
|
||||
@@ -115,14 +115,6 @@ public:
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const = 0;
|
||||
replication_strategy_type get_type() const { return _my_type; }
|
||||
|
||||
// get_ranges() returns the list of ranges held by the given endpoint.
|
||||
// The list is sorted, and its elements are non overlapping and non wrap-around.
|
||||
// It the analogue of Origin's getAddressRanges().get(endpoint).
|
||||
// This function is not efficient, and not meant for the fast path.
|
||||
dht::token_range_vector get_ranges(inet_address ep, can_yield can_yield = can_yield::no) const {
|
||||
return do_get_ranges(ep, _shared_token_metadata.get(), can_yield);
|
||||
}
|
||||
|
||||
// Use the token_metadata provided by the caller instead of _token_metadata
|
||||
// Caller must ensure that token_metadata will not change throughout the call if can_yield::yes
|
||||
dht::token_range_vector get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield can_yield = can_yield::no) const {
|
||||
@@ -140,18 +132,6 @@ protected:
|
||||
inet_address_vector_replica_set do_calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const;
|
||||
|
||||
public:
|
||||
// get_primary_ranges() returns the list of "primary ranges" for the given
|
||||
// endpoint. "Primary ranges" are the ranges that the node is responsible
|
||||
// for storing replica primarily, which means this is the first node
|
||||
// returned calculate_natural_endpoints().
|
||||
// This function is the analogue of Origin's
|
||||
// StorageService.getPrimaryRangesForEndpoint().
|
||||
dht::token_range_vector get_primary_ranges(inet_address ep, can_yield);
|
||||
// get_primary_ranges_within_dc() is similar to get_primary_ranges()
|
||||
// except it assigns a primary node for each range within each dc,
|
||||
// instead of one node globally.
|
||||
dht::token_range_vector get_primary_ranges_within_dc(inet_address ep, can_yield);
|
||||
|
||||
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm, can_yield) const;
|
||||
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm, inet_address endpoint, can_yield) const;
|
||||
|
||||
@@ -195,6 +175,28 @@ public:
|
||||
|
||||
inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const;
|
||||
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const;
|
||||
|
||||
// get_ranges() returns the list of ranges held by the given endpoint.
|
||||
// The list is sorted, and its elements are non overlapping and non wrap-around.
|
||||
// It the analogue of Origin's getAddressRanges().get(endpoint).
|
||||
// This function is not efficient, and not meant for the fast path.
|
||||
dht::token_range_vector get_ranges(inet_address ep) const;
|
||||
|
||||
// get_primary_ranges() returns the list of "primary ranges" for the given
|
||||
// endpoint. "Primary ranges" are the ranges that the node is responsible
|
||||
// for storing replica primarily, which means this is the first node
|
||||
// returned calculate_natural_endpoints().
|
||||
// This function is the analogue of Origin's
|
||||
// StorageService.getPrimaryRangesForEndpoint().
|
||||
dht::token_range_vector get_primary_ranges(inet_address ep) const;
|
||||
|
||||
// get_primary_ranges_within_dc() is similar to get_primary_ranges()
|
||||
// except it assigns a primary node for each range within each dc,
|
||||
// instead of one node globally.
|
||||
dht::token_range_vector get_primary_ranges_within_dc(inet_address ep) const;
|
||||
|
||||
private:
|
||||
dht::token_range_vector do_get_ranges(noncopyable_function<bool(inet_address_vector_replica_set)> should_add_range) const;
|
||||
};
|
||||
|
||||
using effective_replication_map_ptr = lw_shared_ptr<const effective_replication_map>;
|
||||
|
||||
@@ -706,15 +706,14 @@ future<> repair_info::repair_range(const dht::token_range& range) {
|
||||
}
|
||||
|
||||
static dht::token_range_vector get_primary_ranges_for_endpoint(
|
||||
database& db, sstring keyspace, gms::inet_address ep, utils::can_yield can_yield = utils::can_yield::no) {
|
||||
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
|
||||
return rs.get_primary_ranges(ep, can_yield);
|
||||
database& db, sstring keyspace, gms::inet_address ep) {
|
||||
return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges(ep);
|
||||
}
|
||||
|
||||
static dht::token_range_vector get_primary_ranges(
|
||||
database& db, sstring keyspace, utils::can_yield can_yield = utils::can_yield::no) {
|
||||
database& db, sstring keyspace) {
|
||||
return get_primary_ranges_for_endpoint(db, keyspace,
|
||||
utils::fb_utilities::get_broadcast_address(), can_yield);
|
||||
utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
// get_primary_ranges_within_dc() is similar to get_primary_ranges(),
|
||||
@@ -722,10 +721,8 @@ static dht::token_range_vector get_primary_ranges(
|
||||
// across the entire cluster, here each range is assigned a primary
|
||||
// owner in each of the clusters.
|
||||
static dht::token_range_vector get_primary_ranges_within_dc(
|
||||
database& db, sstring keyspace, utils::can_yield can_yield = utils::can_yield::no) {
|
||||
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
|
||||
return rs.get_primary_ranges_within_dc(
|
||||
utils::fb_utilities::get_broadcast_address(), can_yield);
|
||||
database& db, sstring keyspace) {
|
||||
return db.find_keyspace(keyspace).get_effective_replication_map()->get_primary_ranges_within_dc(utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
static sstring get_local_dc() {
|
||||
@@ -1479,8 +1476,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
continue;
|
||||
}
|
||||
auto& ks = db.local().find_keyspace(keyspace_name);
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
dht::token_range_vector ranges = strat.get_ranges(leaving_node, utils::can_yield::yes);
|
||||
dht::token_range_vector ranges = ks.get_effective_replication_map()->get_ranges(leaving_node);
|
||||
nr_ranges_total += ranges.size();
|
||||
}
|
||||
if (reason == streaming::stream_reason::decommission) {
|
||||
@@ -1502,8 +1498,9 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
}
|
||||
auto& ks = db.local().find_keyspace(keyspace_name);
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
auto erm = ks.get_effective_replication_map();
|
||||
// First get all ranges the leaving node is responsible for
|
||||
dht::token_range_vector ranges = strat.get_ranges(leaving_node, utils::can_yield::yes);
|
||||
dht::token_range_vector ranges = erm->get_ranges(leaving_node);
|
||||
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size());
|
||||
size_t nr_ranges_total = ranges.size();
|
||||
size_t nr_ranges_skipped = 0;
|
||||
|
||||
@@ -3442,7 +3442,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
|
||||
|
||||
dht::token_range_vector
|
||||
storage_service::get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const {
|
||||
return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep);
|
||||
return _db.local().find_keyspace(name).get_effective_replication_map()->get_ranges(ep);
|
||||
}
|
||||
|
||||
dht::token_range_vector
|
||||
|
||||
@@ -61,7 +61,6 @@ void print_natural_endpoints(double point, const inet_address_vector_replica_set
|
||||
testlog.debug("{}", strm.str());
|
||||
}
|
||||
|
||||
#ifndef SEASTAR_DEBUG
|
||||
static void verify_sorted(const dht::token_range_vector& trv) {
|
||||
auto not_strictly_before = [] (const dht::token_range a, const dht::token_range b) {
|
||||
return !b.start()
|
||||
@@ -71,15 +70,11 @@ static void verify_sorted(const dht::token_range_vector& trv) {
|
||||
};
|
||||
BOOST_CHECK(boost::adjacent_find(trv, not_strictly_before) == trv.end());
|
||||
}
|
||||
#endif
|
||||
|
||||
static void check_ranges_are_sorted(abstract_replication_strategy::ptr_type ars, gms::inet_address ep) {
|
||||
// Too slow in debug mode
|
||||
#ifndef SEASTAR_DEBUG
|
||||
verify_sorted(ars->get_ranges(ep));
|
||||
verify_sorted(ars->get_primary_ranges(ep, utils::can_yield::no));
|
||||
verify_sorted(ars->get_primary_ranges_within_dc(ep, utils::can_yield::no));
|
||||
#endif
|
||||
static void check_ranges_are_sorted(effective_replication_map_ptr erm, gms::inet_address ep) {
|
||||
verify_sorted(erm->get_ranges(ep));
|
||||
verify_sorted(erm->get_primary_ranges(ep));
|
||||
verify_sorted(erm->get_primary_ranges_within_dc(ep));
|
||||
}
|
||||
|
||||
void strategy_sanity_check(
|
||||
@@ -179,7 +174,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
|
||||
auto endpoints2 = erm->get_natural_endpoints(t2);
|
||||
|
||||
endpoints_check(ars_ptr, endpoints2);
|
||||
check_ranges_are_sorted(ars_ptr, rp.host);
|
||||
check_ranges_are_sorted(erm, rp.host);
|
||||
BOOST_CHECK(endpoints1 == endpoints2);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user