cql3: limit the concurrency of indexed statements

Indexed select statements fetch primary key information from
their internal materialized views and then use it to query
the base table. Unfortunately, the current mechanism for retrieving
base table rows makes it easy to overwhelm the replicas with unbounded
concurrency. The number of concurrent operations doubles on each
iteration until a short read is encountered, but that alone does not
cap the concurrency: if data is fetched row by row, short reads
usually do not occur, and concurrency can easily reach 1M or higher.
To avoid overloading the replicas, the concurrency of indexed queries
is now capped at 4096.
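
The effect of the cap on the doubling can be illustrated with a small
standalone sketch. It is not the code from this patch: the helper
concurrency_per_round is hypothetical, the starting value is an
assumption, and it assumes no short read ever occurs. Only the guard
itself and the constant are taken from the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Same value as the constant introduced by this commit.
constexpr std::size_t max_base_table_query_concurrency = 4096;

// Hypothetical helper (not part of the patch): records the concurrency
// after each of `rounds` rounds, assuming it starts at `initial` and
// that no short read interrupts the growth.
std::vector<std::size_t> concurrency_per_round(std::size_t initial, std::size_t rounds) {
    std::vector<std::size_t> seen;
    std::size_t concurrency = initial;
    for (std::size_t i = 0; i < rounds; ++i) {
        // Same guard as in the patch: stop doubling once the cap is reached.
        if (concurrency < max_base_table_query_concurrency) {
            concurrency *= 2;
        }
        seen.push_back(concurrency);
    }
    return seen;
}
```

Starting from 1, the value reaches 4096 after 12 rounds and stays
there. Without the guard, 20 doublings from 1 would give 1048576,
which is the "1M" order of magnitude mentioned above. Since the guard
checks before doubling, a starting value that is not a power of two
could end up above the cap (but below twice the cap); whether that can
happen depends on the initial value in the real code, which this diff
does not show.
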
The exact number is open to debate. The reasoning is as follows:
with 2KiB rows, which are moderately large but not huge, 4096
concurrent reads fetch about 8MiB of data, close to the 10MB
granularity used by replicas. With 200B rows, which are rather small,
the result is still close to 1MB.
At the same time, 4096 separate tasks also mean 4096 allocations,
so increasing the number further would also strain the allocator.
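
The sizing argument above can be checked with a few lines of
arithmetic. The helper bytes_in_flight is hypothetical and assumes
that each of the 4096 concurrent reads returns exactly one row of the
given size; the figures in the message are rounded versions of these.

```cpp
#include <cassert>
#include <cstddef>

// Same value as the constant introduced by this commit.
constexpr std::size_t max_base_table_query_concurrency = 4096;

// Hypothetical helper: bytes fetched when every concurrent read
// returns a single row of `row_size_bytes` bytes.
constexpr std::size_t bytes_in_flight(std::size_t row_size_bytes) {
    return max_base_table_query_concurrency * row_size_bytes;
}

// 2KiB rows: 4096 * 2048 bytes = 8MiB, the same order as 10MB.
static_assert(bytes_in_flight(2048) == 8388608, "2KiB rows");
// 200B rows: 4096 * 200 bytes = 819200 bytes, roughly 0.8MB.
static_assert(bytes_in_flight(200) == 819200, "200B rows");
```
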

Fixes #8799

Tests: unit(release),
       manual: observing metrics of modified index_paging_test
Piotr Sarna
2021-06-07 12:58:11 +02:00
parent bd168d57ff
commit 8eeac10ded
2 changed files with 6 additions and 2 deletions

@@ -574,7 +574,9 @@ indexed_table_select_statement::do_execute_base_query(
command->slice.set_range(*_schema, base_pk, row_ranges);
}
}
-concurrency *= 2;
+if (concurrency < max_base_table_query_concurrency) {
+    concurrency *= 2;
+}
return proxy.query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([&ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) {
auto is_short_read = qr.query_result->is_short_read();
@@ -638,7 +640,7 @@ indexed_table_select_statement::do_execute_base_query(
// we continue exponentially, asking for 2x more key than before
auto already_done = std::distance(keys.begin(), key_it);
auto next_iteration = already_done + 1;
-next_iteration = std::min<size_t>(next_iteration, keys.size() - already_done);
+next_iteration = std::min<size_t>({next_iteration, keys.size() - already_done, max_base_table_query_concurrency});
auto key_it_end = key_it + next_iteration;
auto command = ::make_lw_shared<query::read_command>(*cmd);
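
The batch sizing in the hunk above can be replayed in isolation. This
is a minimal sketch, not the statement's real loop: the helper
batch_sizes is hypothetical, it tracks only key counts, and it assumes
every batch completes in full. The three-way std::min follows the
patched line.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Same value as the constant introduced by this commit.
constexpr std::size_t max_base_table_query_concurrency = 4096;

// Hypothetical helper: sizes of consecutive key batches for
// `total_keys` keys, assuming each batch is fully processed.
std::vector<std::size_t> batch_sizes(std::size_t total_keys) {
    std::vector<std::size_t> sizes;
    std::size_t already_done = 0;
    while (already_done < total_keys) {
        // Ask for as many keys as were already processed, plus one,
        // so the running total roughly doubles on each iteration...
        std::size_t next_iteration = already_done + 1;
        // ...but never more than what is left, and never more than the cap.
        next_iteration = std::min<std::size_t>(
            {next_iteration, total_keys - already_done, max_base_table_query_concurrency});
        sizes.push_back(next_iteration);
        already_done += next_iteration;
    }
    return sizes;
}
```

For 10000 keys this yields batches of 1, 2, 4, and so on up to 2048,
then one batch of 4096 and a final batch of 1809, for 14 batches in
total. Without the cap, the batch following 2048 would still be 4096,
but larger key sets would keep doubling past it.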

@@ -188,6 +188,8 @@ class indexed_table_select_statement : public select_statement {
noncopyable_function<dht::partition_range_vector(const query_options&)> _get_partition_ranges_for_posting_list;
noncopyable_function<query::partition_slice(const query_options&)> _get_partition_slice_for_posting_list;
public:
+static constexpr size_t max_base_table_query_concurrency = 4096;
static ::shared_ptr<cql3::statements::select_statement> prepare(database& db,
schema_ptr schema,
uint32_t bound_terms,