Merge 'cql3: limit the concurrency of indexed statements' from Piotr Sarna

Indexed select statements fetch primary key information from
their internal materialized views and then use it to query
the base table. Unfortunately, the current mechanism for retrieving
base table rows makes it easy to overwhelm the replicas with unbounded
concurrency - the number of concurrent ops is increased exponentially
until a short read is encountered, but it's not enough to cap the
concurrency - if data is fetched row-by-row, then short reads usually
don't occur and as a result it's easy to see concurrency of 1M or
higher. In order to avoid overloading the replicas, the concurrency
of indexed queries is now capped at 4096 and additionally throttled
if enough results are already fetched. For paged queries it means that
the query returns as soon as 1MB of data is ready, and for unpaged ones
the concurrency will no longer be doubled as soon as the previous
iteration fetched 1MB of results.

The fixed 4096 value can be subject to debate, its reasoning is as follows:
for 2KiB rows, so moderately large but not huge, they result in
fetching 10MB of data, which is the granularity used by replicas.
For 200B rows, which is rather small, the result would still be
around 1MB.
At the same time, 4096 separate tasks also means 4096 allocations,
so increasing the number also strains the allocator.

Fixes #8799

Tests: unit(release),
       manual: observing metrics of modified index_paging_test

Closes #8814

* github.com:scylladb/scylla:
  cql3: limit the transitional result size for indexed queries
  cql3: return indexed pages after 1MB worth of data
  cql3: limit the concurrency of indexed statements
This commit is contained in:
Avi Kivity
2021-06-07 18:00:51 +03:00
3 changed files with 39 additions and 13 deletions

View File

@@ -530,6 +530,7 @@ indexed_table_select_statement::do_execute_base_query(
query::result_merger merger;
service::query_ranges_to_vnodes_generator ranges_to_vnodes;
size_t concurrency = 1;
size_t previous_result_size = 0;
base_query_state(uint64_t row_limit, service::query_ranges_to_vnodes_generator&& ranges_to_vnodes_)
: merger(row_limit, query::max_partitions)
, ranges_to_vnodes(std::move(ranges_to_vnodes_))
@@ -538,12 +539,14 @@ indexed_table_select_statement::do_execute_base_query(
base_query_state(const base_query_state&) = delete;
};
const bool is_paged = bool(paging_state);
base_query_state query_state{cmd->get_row_limit() * queried_ranges_count, std::move(ranges_to_vnodes)};
return do_with(std::move(query_state), [this, &proxy, &state, &options, cmd, timeout] (auto&& query_state) {
return do_with(std::move(query_state), [this, is_paged, &proxy, &state, &options, cmd, timeout] (auto&& query_state) {
auto& merger = query_state.merger;
auto& ranges_to_vnodes = query_state.ranges_to_vnodes;
auto& concurrency = query_state.concurrency;
return repeat([this, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() {
auto& previous_result_size = query_state.previous_result_size;
return repeat([this, is_paged, &previous_result_size, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() {
// Starting with 1 range, we check if the result was a short read, and if not,
// we continue exponentially, asking for 2x more ranges than before
dht::partition_range_vector prange = ranges_to_vnodes(concurrency);
@@ -577,12 +580,17 @@ indexed_table_select_statement::do_execute_base_query(
command->slice.set_range(*_schema, base_pk, row_ranges);
}
}
concurrency *= 2;
if (previous_result_size < max_base_table_query_result_bytes && concurrency < max_base_table_query_concurrency) {
concurrency *= 2;
}
return proxy.query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([&ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) {
.then([is_paged, &previous_result_size, &ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) {
auto is_short_read = qr.query_result->is_short_read();
// Results larger than 1MB should be shipped to the client immediately
const bool page_limit_reached = is_paged && qr.query_result->buf().size() >= max_base_table_query_result_bytes;
previous_result_size = qr.query_result->buf().size();
merger(std::move(qr.query_result));
return stop_iteration(is_short_read || ranges_to_vnodes.empty());
return stop_iteration(is_short_read || ranges_to_vnodes.empty() || page_limit_reached);
});
}).then([&merger]() {
return merger.get();
@@ -622,6 +630,8 @@ indexed_table_select_statement::do_execute_base_query(
query::result_merger merger;
std::vector<primary_key> primary_keys;
std::vector<primary_key>::iterator current_primary_key;
size_t previous_result_size = 0;
size_t next_iteration_size = 0;
base_query_state(uint64_t row_limit, std::vector<primary_key>&& keys)
: merger(row_limit, query::max_partitions)
, primary_keys(std::move(keys))
@@ -632,17 +642,24 @@ indexed_table_select_statement::do_execute_base_query(
};
base_query_state query_state{cmd->get_row_limit(), std::move(primary_keys)};
return do_with(std::move(query_state), [this, &proxy, &state, &options, cmd, timeout] (auto&& query_state) {
const bool is_paged = bool(paging_state);
return do_with(std::move(query_state), [this, is_paged, &proxy, &state, &options, cmd, timeout] (auto&& query_state) {
auto &merger = query_state.merger;
auto &keys = query_state.primary_keys;
auto &key_it = query_state.current_primary_key;
return repeat([this, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() {
auto &previous_result_size = query_state.previous_result_size;
auto &next_iteration_size = query_state.next_iteration_size;
return repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() {
// Starting with 1 key, we check if the result was a short read, and if not,
// we continue exponentially, asking for 2x more key than before
auto already_done = std::distance(keys.begin(), key_it);
auto next_iteration = already_done + 1;
next_iteration = std::min<size_t>(next_iteration, keys.size() - already_done);
auto key_it_end = key_it + next_iteration;
// If the previous result already provided 1MB worth of data,
// stop increasing the number of fetched partitions
if (previous_result_size < max_base_table_query_result_bytes) {
next_iteration_size = already_done + 1;
}
next_iteration_size = std::min<size_t>({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency});
auto key_it_end = key_it + next_iteration_size;
auto command = ::make_lw_shared<query::read_command>(*cmd);
query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions);
@@ -658,11 +675,14 @@ indexed_table_select_statement::do_execute_base_query(
.then([] (service::storage_proxy::coordinator_query_result qr) {
return std::move(qr.query_result);
});
}, std::move(oneshot_merger)).then([&key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr<lw_shared_ptr<query::result>> result) {
}, std::move(oneshot_merger)).then([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr<lw_shared_ptr<query::result>> result) {
auto is_short_read = result->is_short_read();
// Results larger than 1MB should be shipped to the client immediately
const bool page_limit_reached = is_paged && result->buf().size() >= max_base_table_query_result_bytes;
previous_result_size = result->buf().size();
merger(std::move(result));
key_it = key_it_end;
return stop_iteration(is_short_read || key_it == keys.end());
return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached);
});
}).then([&merger] () {
return merger.get();

View File

@@ -194,6 +194,9 @@ class indexed_table_select_statement : public select_statement {
noncopyable_function<dht::partition_range_vector(const query_options&)> _get_partition_ranges_for_posting_list;
noncopyable_function<query::partition_slice(const query_options&)> _get_partition_slice_for_posting_list;
public:
static constexpr size_t max_base_table_query_result_bytes = 1024*1024;
static constexpr size_t max_base_table_query_concurrency = 4096;
static ::shared_ptr<cql3::statements::select_statement> prepare(database& db,
schema_ptr schema,
uint32_t bound_terms,

View File

@@ -39,7 +39,10 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(4321);
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
BOOST_REQUIRE_NE(rows, nullptr);
// It's fine to get less rows than requested due to paging limit, but never more than that
BOOST_REQUIRE_LE(rows->rs().get_metadata().column_count(), 4321);
});
eventually([&] {