Improve choice distribution for primary replica

I noticed during tests that `maybe_get_primary_replica`
would not distribute uniformly the choice of primary replica
because `info.replicas` on some shards would have an order whilst
on others it'd be ordered differently, thus making the function choose
a node as primary replica multiple times when it clearly could've
chosen a different nodes.

This patch sorts the replica set before passing it through the
scope filter.

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
This commit is contained in:
Robert Bindar
2025-10-30 14:47:11 +02:00
parent d4e43bd34c
commit 817fdadd49
7 changed files with 39 additions and 18 deletions

View File

@@ -753,7 +753,7 @@ static future<bool> scan_table(
auto my_host_id = erm->get_topology().my_host_id();
const auto &tablet_map = erm->get_token_metadata().tablets().get_tablet_map(s->id());
for (std::optional tablet = tablet_map.first_tablet(); tablet; tablet = tablet_map.next_tablet(*tablet)) {
auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet);
auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet, erm->get_topology());
// check if this is the primary replica for the current tablet
if (tablet_primary_replica.host == my_host_id && tablet_primary_replica.shard == this_shard_id()) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);

View File

@@ -13,6 +13,7 @@
#include "locator/tablet_sharder.hh"
#include "locator/token_range_splitter.hh"
#include "db/system_keyspace.hh"
#include "locator/topology.hh"
#include "replica/database.hh"
#include "utils/stall_free.hh"
#include "utils/rjson.hh"
@@ -240,7 +241,7 @@ tablet_replica_set get_new_replicas(const tablet_info& tinfo, const tablet_migra
return replace_replica(tinfo.replicas, mig.src, mig.dst);
}
tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, tablet_id tid, std::function<bool(const tablet_replica&)> filter) {
tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, tablet_id tid, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter) {
const auto& info = tablet_map.get_tablet_info(tid);
const auto* transition = tablet_map.get_tablet_transition_info(tid);
@@ -250,8 +251,8 @@ tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, t
}
return transition->writes;
};
auto primary = [tid, filter = std::move(filter)] (tablet_replica_set set) -> std::optional<tablet_replica> {
return maybe_get_primary_replica(tid, set, filter);
auto primary = [tid, filter = std::move(filter), &topo] (tablet_replica_set set) -> std::optional<tablet_replica> {
return maybe_get_primary_replica(tid, set, topo, filter);
};
auto add = [] (tablet_replica r1, tablet_replica r2) -> tablet_replica_set {
// if primary replica is not the one leaving, then only primary will be streamed to.
@@ -555,14 +556,30 @@ dht::token_range tablet_map::get_token_range_after_split(const token& t) const n
return get_token_range(id_after_split, log2_tablets_after_split);
}
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, std::function<bool(const tablet_replica&)> filter) {
const auto replicas = replica_set | std::views::filter(std::move(filter)) | std::ranges::to<tablet_replica_set>();
auto tablet_replica_comparator(const locator::topology& topo) {
return [&topo](const tablet_replica& a, const tablet_replica& b) {
const auto loc_a = topo.get_location(a.host);
const auto loc_b = topo.get_location(b.host);
if (loc_a.dc != loc_b.dc) {
return loc_a.dc < loc_b.dc;
}
if (loc_a.rack != loc_b.rack) {
return loc_a.rack < loc_b.rack;
}
return a.host < b.host;
};
}
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter) {
tablet_replica_set replica_set_copy = replica_set;
std::ranges::sort(replica_set_copy, tablet_replica_comparator(topo));
const auto replicas = replica_set_copy | std::views::filter(std::move(filter)) | std::ranges::to<tablet_replica_set>();
return !replicas.empty() ? std::make_optional(replicas.at(size_t(id) % replicas.size())) : std::nullopt;
}
tablet_replica tablet_map::get_primary_replica(tablet_id id) const {
tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topology& topo) const {
const auto& replicas = get_tablet_info(id).replicas;
return replicas.at(size_t(id) % replicas.size());
return maybe_get_primary_replica(id, replicas, topo, [&] (const auto& _) { return true; }).value();
}
tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
@@ -574,7 +591,7 @@ tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
}
std::optional<tablet_replica> tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const {
return maybe_get_primary_replica(id, get_tablet_info(id).replicas, [&] (const auto& tr) {
return maybe_get_primary_replica(id, get_tablet_info(id).replicas, topo, [&] (const auto& tr) {
return tablet_task_info.selected_by_filters(tr, topo);
});
}

View File

@@ -355,7 +355,7 @@ class tablet_map;
/// Returns the replica set which will become the replica set of the tablet after executing a given tablet transition.
tablet_replica_set get_new_replicas(const tablet_info&, const tablet_migration_info&);
// If filter returns true, the replica can be chosen as primary replica.
tablet_replica_set get_primary_replicas(const locator::tablet_map&, tablet_id, std::function<bool(const tablet_replica&)> filter);
tablet_replica_set get_primary_replicas(const locator::tablet_map&, tablet_id, const locator::topology&, std::function<bool(const tablet_replica&)> filter);
tablet_transition_info migration_to_transition_info(const tablet_info&, const tablet_migration_info&);
/// Describes streaming required for a given tablet transition.
@@ -606,7 +606,7 @@ public:
dht::token_range get_token_range(tablet_id id) const;
/// Returns the primary replica for the tablet
tablet_replica get_primary_replica(tablet_id id) const;
tablet_replica get_primary_replica(tablet_id id, const locator::topology& topo) const;
/// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector
/// \throws std::runtime_error if the tablet has less than 2 replicas.
@@ -794,7 +794,7 @@ public:
// Check that all tablets which have replicas on this host, have a valid replica shard (< smp::count).
future<bool> check_tablet_replica_shards(const tablet_metadata& tm, host_id this_host);
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, std::function<bool(const tablet_replica&)> filter);
std::optional<tablet_replica> maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, const locator::topology& topo, std::function<bool(const tablet_replica&)> filter);
struct tablet_routing_info {
tablet_replica_set tablet_replicas;
@@ -870,6 +870,10 @@ void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr
/// Returns the list of racks that can be used for placing replicas in a given DC.
rack_list get_allowed_racks(const locator::token_metadata&, const sstring& dc);
/// Returns a comparator function that can be used to sort tablet_replicas
/// according to <dc, rack, host_id> order in the given topology.
auto tablet_replica_comparator(const locator::topology& topo);
}
template <>

View File

@@ -1190,7 +1190,7 @@ static unsigned get_cas_shard(const schema& s, dht::token token, const locator::
if (const auto& rs = erm.get_replication_strategy(); rs.uses_tablets()) {
const auto& tablet_map = erm.get_token_metadata().tablets().get_tablet_map(s.id());
const auto tablet_id = tablet_map.get_tablet_id(token);
return tablet_map.get_primary_replica(tablet_id).shard % smp::count;
return tablet_map.get_primary_replica(tablet_id, erm.get_topology()).shard % smp::count;
} else {
on_internal_error(paxos::paxos_state::logger,
format("failed to detect shard for reads for non-tablet-based rs {}, table {}.{}",

View File

@@ -5696,7 +5696,7 @@ future<std::map<token, inet_address>> storage_service::get_tablet_to_endpoint_ma
const auto& tmap = tm.tablets().get_tablet_map(table);
std::map<token, inet_address> result;
for (std::optional<locator::tablet_id> tid = tmap.first_tablet(); tid; tid = tmap.next_tablet(*tid)) {
result.emplace(tmap.get_last_token(*tid), _address_map.get(tmap.get_primary_replica(*tid).host));
result.emplace(tmap.get_last_token(*tid), _address_map.get(tmap.get_primary_replica(*tid, tm.get_topology()).host));
co_await coroutine::maybe_yield();
}
co_return result;
@@ -8300,4 +8300,3 @@ future<> storage_service::query_cdc_streams(table_id table, noncopyable_function
}
} // namespace service

View File

@@ -1487,7 +1487,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.info("Skipped tablet rebuild repair of {} as no tablet replica was found", gid);
return make_ready_future<>();
}
auto dst = locator::maybe_get_primary_replica(gid.tablet, {tsi.read_from.begin(), tsi.read_from.end()}, [] (const auto& tr) { return true; }).value().host;
auto dst = locator::maybe_get_primary_replica(gid.tablet, {tsi.read_from.begin(), tsi.read_from.end()},
_db.get_token_metadata().get_topology(), [] (const auto& tr) { return true; }).value().host;
rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
@@ -1713,7 +1714,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
const auto& topo = _db.get_token_metadata().get_topology();
locator::host_id dst;
if (hosts_filter.empty() && dcs_filter.empty()) {
auto primary = tmap.get_primary_replica(gid.tablet);
auto primary = tmap.get_primary_replica(gid.tablet, topo);
dst = primary.host;
} else {
auto dst_opt = tmap.maybe_get_selected_replica(gid.tablet, topo, tinfo.repair_task_info);

View File

@@ -245,7 +245,7 @@ host_id_vector_replica_set sstable_streamer::get_primary_endpoints(const dht::to
host_id_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(const dht::token& token, std::function<bool(const locator::host_id&)> filter) const {
auto tid = _tablet_map.get_tablet_id(token);
auto replicas = locator::get_primary_replicas(_tablet_map, tid, [filter = std::move(filter)] (const locator::tablet_replica& replica) {
auto replicas = locator::get_primary_replicas(_tablet_map, tid, _erm->get_topology(), [filter = std::move(filter)] (const locator::tablet_replica& replica) {
return filter(replica.host);
});
return to_replica_set(replicas);