forward_service: limit the number of partition ranges fetched

The forward service uses a vector of ranges owned by a particular
shard in order to split and delegate the work. The number of ranges
can grow large, though, which can cause large allocations.
This commit limits the number of ranges handled at a time to 256.

Fixes #10725

Closes #11182
This commit is contained in:
Piotr Sarna
2022-08-01 13:44:47 +02:00
committed by Avi Kivity
parent 663f2e2a8f
commit dd2417618e

View File

@@ -277,20 +277,6 @@ public:
}
};
// Collects every range that a partition_ranges_owned_by_this_shard iterator
// yields for the input ranges `pr` into a single vector.
//
// schema - passed to the iterator's constructor and to each next() call.
// pr     - input partition ranges; moved into the iterator.
//
// Returns the yielded ranges, in the order the iterator produced them.
static dht::partition_range_vector retain_ranges_owned_by_this_shard(
        schema_ptr schema,
        dht::partition_range_vector pr
) {
    partition_ranges_owned_by_this_shard owned_iter(schema, std::move(pr));
    dht::partition_range_vector res;
    while (std::optional<dht::partition_range> p = owned_iter.next(*schema)) {
        // `p` is a fresh local on every iteration and is not read again,
        // so move the range into the result instead of copying it.
        res.push_back(std::move(*p));
    }
    return res;
}
// Returns the token metadata pointer currently held by _shared_token_metadata.
locator::token_metadata_ptr forward_service::get_token_metadata_ptr() const noexcept {
    locator::token_metadata_ptr tmptr = _shared_token_metadata.get();
    return tmptr;
}
@@ -412,16 +398,7 @@ future<query::forward_result> forward_service::execute_on_this_shard(
cql3::query_options::specific_options::DEFAULT,
cql_serialization_format::latest()
);
auto pager = service::pager::query_pagers::pager(
_proxy,
schema,
selection,
*query_state,
*query_options,
make_lw_shared<query::read_command>(std::move(req.cmd)),
retain_ranges_owned_by_this_shard(schema, std::move(req.pr)),
nullptr // No filtering restrictions
);
auto rs_builder = cql3::selection::result_set_builder(
*selection,
now,
@@ -429,10 +406,42 @@ future<query::forward_result> forward_service::execute_on_this_shard(
std::vector<size_t>() // Represents empty GROUP BY indices.
);
// Execute query.
while (!pager->is_exhausted()) {
co_await pager->fetch_page(rs_builder, DEFAULT_INTERNAL_PAGING_SIZE, now, timeout);
}
dht::partition_range_vector ranges_owned_by_this_shard;
partition_ranges_owned_by_this_shard owned_iter(schema, std::move(req.pr));
std::optional<dht::partition_range> current_range;
do {
// We serve up to 256 ranges at a time to avoid allocating a huge vector for ranges
static constexpr size_t max_ranges = 256;
while ((current_range = owned_iter.next(*schema))) {
ranges_owned_by_this_shard.push_back(*current_range);
if (ranges_owned_by_this_shard.size() >= max_ranges) {
break;
}
}
if (ranges_owned_by_this_shard.empty()) {
break;
}
flogger.trace("Forwarding to {} ranges owned by this shard", ranges_owned_by_this_shard.size());
auto pager = service::pager::query_pagers::pager(
_proxy,
schema,
selection,
*query_state,
*query_options,
make_lw_shared<query::read_command>(req.cmd),
std::move(ranges_owned_by_this_shard),
nullptr // No filtering restrictions
);
// Execute query.
while (!pager->is_exhausted()) {
co_await pager->fetch_page(rs_builder, DEFAULT_INTERNAL_PAGING_SIZE, now, timeout);
}
ranges_owned_by_this_shard.clear();
} while (current_range);
co_return co_await rs_builder.with_thread_if_needed([&req, &rs_builder, reductions = req.reduction_types, tr_state = std::move(tr_state)] {
auto rs = rs_builder.build();