diff --git a/service/mapreduce_service.cc b/service/mapreduce_service.cc index 24d180fb92..5a021559e1 100644 --- a/service/mapreduce_service.cc +++ b/service/mapreduce_service.cc @@ -548,25 +548,23 @@ future<> mapreduce_service::uninit_messaging_service() { return ser::mapreduce_request_rpc_verbs::unregister(&_messaging); } +std::optional get_next_partition_range(query_ranges_to_vnodes_generator& generator) { + if (auto vnode = generator(1); !vnode.empty()) { + return vnode[0]; + } + return {}; +} + future mapreduce_service::dispatch(query::mapreduce_request req, tracing::trace_state_ptr tr_state) { schema_ptr schema = local_schema_registry().get(req.cmd.schema_version); replica::table& cf = _db.local().find_column_family(schema); auto erm = cf.get_effective_replication_map(); - // next_vnode is used to iterate through all vnodes produced by - // query_ranges_to_vnodes_generator. - auto next_vnode = [ - generator = query_ranges_to_vnodes_generator(erm->make_splitter(), schema, req.pr) - ] () mutable -> std::optional { - if (auto vnode = generator(1); !vnode.empty()) { - return vnode[0]; - } - return {}; - }; // Group vnodes by assigned endpoint. std::map vnodes_per_addr; const auto& topo = get_token_metadata_ptr()->get_topology(); - while (std::optional vnode = next_vnode()) { + auto generator = query_ranges_to_vnodes_generator(erm->make_splitter(), schema, req.pr); + while (std::optional vnode = get_next_partition_range(generator)) { host_id_vector_replica_set live_endpoints = _proxy.get_live_endpoints(*erm, end_token(*vnode)); // Do not choose an endpoint outside the current datacenter if a request has a local consistency if (db::is_datacenter_local(req.cl)) {