diff --git a/alternator/ttl.cc b/alternator/ttl.cc index 81e83fa225..64322fb3b6 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -753,7 +753,7 @@ static future scan_table( auto my_host_id = erm->get_topology().my_host_id(); const auto &tablet_map = erm->get_token_metadata().tablets().get_tablet_map(s->id()); for (std::optional tablet = tablet_map.first_tablet(); tablet; tablet = tablet_map.next_tablet(*tablet)) { - auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet); + auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet, erm->get_topology()); // check if this is the primary replica for the current tablet if (tablet_primary_replica.host == my_host_id && tablet_primary_replica.shard == this_shard_id()) { co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map); diff --git a/locator/tablets.cc b/locator/tablets.cc index 890d9a9467..1b54995597 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -13,6 +13,7 @@ #include "locator/tablet_sharder.hh" #include "locator/token_range_splitter.hh" #include "db/system_keyspace.hh" +#include "locator/topology.hh" #include "replica/database.hh" #include "utils/stall_free.hh" #include "utils/rjson.hh" @@ -240,7 +241,7 @@ tablet_replica_set get_new_replicas(const tablet_info& tinfo, const tablet_migra return replace_replica(tinfo.replicas, mig.src, mig.dst); } -tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, tablet_id tid, std::function filter) { +tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, tablet_id tid, const locator::topology& topo, std::function filter) { const auto& info = tablet_map.get_tablet_info(tid); const auto* transition = tablet_map.get_tablet_transition_info(tid); @@ -250,8 +251,8 @@ tablet_replica_set get_primary_replicas(const locator::tablet_map& tablet_map, t } return transition->writes; }; - auto primary = [tid, filter = std::move(filter)] (tablet_replica_set set) -> std::optional { - return maybe_get_primary_replica(tid, set, filter); + auto primary = [tid, filter = std::move(filter), &topo] (tablet_replica_set set) -> std::optional { + return maybe_get_primary_replica(tid, set, topo, filter); }; auto add = [] (tablet_replica r1, tablet_replica r2) -> tablet_replica_set { // if primary replica is not the one leaving, then only primary will be streamed to. @@ -555,14 +556,30 @@ dht::token_range tablet_map::get_token_range_after_split(const token& t) const n return get_token_range(id_after_split, log2_tablets_after_split); } -std::optional maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, std::function filter) { - const auto replicas = replica_set | std::views::filter(std::move(filter)) | std::ranges::to(); +auto tablet_replica_comparator(const locator::topology& topo) { + return [&topo](const tablet_replica& a, const tablet_replica& b) { + const auto loc_a = topo.get_location(a.host); + const auto loc_b = topo.get_location(b.host); + if (loc_a.dc != loc_b.dc) { + return loc_a.dc < loc_b.dc; + } + if (loc_a.rack != loc_b.rack) { + return loc_a.rack < loc_b.rack; + } + return a.host < b.host; + }; +} + +std::optional maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, const locator::topology& topo, std::function filter) { + tablet_replica_set replica_set_copy = replica_set; + std::ranges::sort(replica_set_copy, tablet_replica_comparator(topo)); + const auto replicas = replica_set_copy | std::views::filter(std::move(filter)) | std::ranges::to(); return !replicas.empty() ? std::make_optional(replicas.at(size_t(id) % replicas.size())) : std::nullopt; } -tablet_replica tablet_map::get_primary_replica(tablet_id id) const { +tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topology& topo) const { const auto& replicas = get_tablet_info(id).replicas; - return replicas.at(size_t(id) % replicas.size()); + return maybe_get_primary_replica(id, replicas, topo, [&] (const auto& _) { return true; }).value(); } tablet_replica tablet_map::get_secondary_replica(tablet_id id) const { @@ -574,7 +591,7 @@ tablet_replica tablet_map::get_secondary_replica(tablet_id id) const { } std::optional tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const { - return maybe_get_primary_replica(id, get_tablet_info(id).replicas, [&] (const auto& tr) { + return maybe_get_primary_replica(id, get_tablet_info(id).replicas, topo, [&] (const auto& tr) { return tablet_task_info.selected_by_filters(tr, topo); }); } diff --git a/locator/tablets.hh b/locator/tablets.hh index d4658711e7..e3d09991e2 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -355,7 +355,7 @@ class tablet_map; /// Returns the replica set which will become the replica set of the tablet after executing a given tablet transition. tablet_replica_set get_new_replicas(const tablet_info&, const tablet_migration_info&); // If filter returns true, the replica can be chosen as primary replica. -tablet_replica_set get_primary_replicas(const locator::tablet_map&, tablet_id, std::function filter); +tablet_replica_set get_primary_replicas(const locator::tablet_map&, tablet_id, const locator::topology&, std::function filter); tablet_transition_info migration_to_transition_info(const tablet_info&, const tablet_migration_info&); /// Describes streaming required for a given tablet transition. @@ -606,7 +606,7 @@ public: dht::token_range get_token_range(tablet_id id) const; /// Returns the primary replica for the tablet - tablet_replica get_primary_replica(tablet_id id) const; + tablet_replica get_primary_replica(tablet_id id, const locator::topology& topo) const; /// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector /// \throws std::runtime_error if the tablet has less than 2 replicas. @@ -794,7 +794,7 @@ public: // Check that all tablets which have replicas on this host, have a valid replica shard (< smp::count). future check_tablet_replica_shards(const tablet_metadata& tm, host_id this_host); -std::optional maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, std::function filter); +std::optional maybe_get_primary_replica(tablet_id id, const tablet_replica_set& replica_set, const locator::topology& topo, std::function filter); struct tablet_routing_info { tablet_replica_set tablet_replicas; @@ -870,6 +870,10 @@ void assert_rf_rack_valid_keyspace(std::string_view ks, const token_metadata_ptr /// Returns the list of racks that can be used for placing replicas in a given DC. rack_list get_allowed_racks(const locator::token_metadata&, const sstring& dc); +/// Returns a comparator function that can be used to sort tablet_replicas +/// according to order in the given topology. +auto tablet_replica_comparator(const locator::topology& topo); + } template <> diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f6da2c8d60..8c0ab3670e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1190,7 +1190,7 @@ static unsigned get_cas_shard(const schema& s, dht::token token, const locator:: if (const auto& rs = erm.get_replication_strategy(); rs.uses_tablets()) { const auto& tablet_map = erm.get_token_metadata().tablets().get_tablet_map(s.id()); const auto tablet_id = tablet_map.get_tablet_id(token); - return tablet_map.get_primary_replica(tablet_id).shard % smp::count; + return tablet_map.get_primary_replica(tablet_id, erm.get_topology()).shard % smp::count; } else { on_internal_error(paxos::paxos_state::logger, format("failed to detect shard for reads for non-tablet-based rs {}, table {}.{}", diff --git a/service/storage_service.cc b/service/storage_service.cc index 2c0bfdff53..04117bf510 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5696,7 +5696,7 @@ future> storage_service::get_tablet_to_endpoint_ma const auto& tmap = tm.tablets().get_tablet_map(table); std::map result; for (std::optional tid = tmap.first_tablet(); tid; tid = tmap.next_tablet(*tid)) { - result.emplace(tmap.get_last_token(*tid), _address_map.get(tmap.get_primary_replica(*tid).host)); + result.emplace(tmap.get_last_token(*tid), _address_map.get(tmap.get_primary_replica(*tid, tm.get_topology()).host)); co_await coroutine::maybe_yield(); } co_return result; @@ -8300,4 +8300,3 @@ future<> storage_service::query_cdc_streams(table_id table, noncopyable_function } } // namespace service - diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index fd27d4e99d..9c9df95f39 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1487,7 +1487,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtlogger.info("Skipped tablet rebuild repair of {} as no tablet replica was found", gid); return make_ready_future<>(); } - auto dst = locator::maybe_get_primary_replica(gid.tablet, {tsi.read_from.begin(), tsi.read_from.end()}, [] (const auto& tr) { return true; }).value().host; + auto dst = locator::maybe_get_primary_replica(gid.tablet, {tsi.read_from.begin(), tsi.read_from.end()}, + _db.get_token_metadata().get_topology(), [] (const auto& tr) { return true; }).value().host; rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid); return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) { return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) { @@ -1713,7 +1714,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { const auto& topo = _db.get_token_metadata().get_topology(); locator::host_id dst; if (hosts_filter.empty() && dcs_filter.empty()) { - auto primary = tmap.get_primary_replica(gid.tablet); + auto primary = tmap.get_primary_replica(gid.tablet, topo); dst = primary.host; } else { auto dst_opt = tmap.maybe_get_selected_replica(gid.tablet, topo, tinfo.repair_task_info); diff --git a/sstables_loader.cc b/sstables_loader.cc index 7b9b4fdd21..536fe77b92 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -245,7 +245,7 @@ host_id_vector_replica_set sstable_streamer::get_primary_endpoints(const dht::to host_id_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(const dht::token& token, std::function filter) const { auto tid = _tablet_map.get_tablet_id(token); - auto replicas = locator::get_primary_replicas(_tablet_map, tid, [filter = std::move(filter)] (const locator::tablet_replica& replica) { + auto replicas = locator::get_primary_replicas(_tablet_map, tid, _erm->get_topology(), [filter = std::move(filter)] (const locator::tablet_replica& replica) { return filter(replica.host); }); return to_replica_set(replicas);