mapreduce: change next_vnode lambda to get_next_partition_range function

The motivation of this code reorganization is to shorten
the time when ERM is being kept, done later in this patch series.

Ref. scylladb#21831
This commit is contained in:
Andrzej Jackowski
2025-06-03 13:39:25 +02:00
parent 2d716f3ffe
commit c4e8a2c44e

View File

@@ -548,25 +548,23 @@ future<> mapreduce_service::uninit_messaging_service() {
return ser::mapreduce_request_rpc_verbs::unregister(&_messaging);
}
std::optional<dht::partition_range> get_next_partition_range(query_ranges_to_vnodes_generator& generator) {
if (auto vnode = generator(1); !vnode.empty()) {
return vnode[0];
}
return {};
}
future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_request req, tracing::trace_state_ptr tr_state) {
schema_ptr schema = local_schema_registry().get(req.cmd.schema_version);
replica::table& cf = _db.local().find_column_family(schema);
auto erm = cf.get_effective_replication_map();
// next_vnode is used to iterate through all vnodes produced by
// query_ranges_to_vnodes_generator.
auto next_vnode = [
generator = query_ranges_to_vnodes_generator(erm->make_splitter(), schema, req.pr)
] () mutable -> std::optional<dht::partition_range> {
if (auto vnode = generator(1); !vnode.empty()) {
return vnode[0];
}
return {};
};
// Group vnodes by assigned endpoint.
std::map<locator::host_id, dht::partition_range_vector> vnodes_per_addr;
const auto& topo = get_token_metadata_ptr()->get_topology();
while (std::optional<dht::partition_range> vnode = next_vnode()) {
auto generator = query_ranges_to_vnodes_generator(erm->make_splitter(), schema, req.pr);
while (std::optional<dht::partition_range> vnode = get_next_partition_range(generator)) {
host_id_vector_replica_set live_endpoints = _proxy.get_live_endpoints(*erm, end_token(*vnode));
// Do not choose an endpoint outside the current datacenter if a request has a local consistency
if (db::is_datacenter_local(req.cl)) {