From 8eeac10ded098ba738c268453c8e82636a5f930f Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 7 Jun 2021 12:58:11 +0200 Subject: [PATCH 1/3] cql3: limit the concurrency of indexed statements Indexed select statements fetch primary key information from their internal materialized views and then use it to query the base table. Unfortunately, the current mechanism for retrieving base table rows makes it easy to overwhelm the replicas with unbounded concurrency - the number of concurrent ops is increased exponentially until a short read is encountered, but it's not enough to cap the concurrency - if data is fetched row-by-row, then short reads usually don't occur and as a result it's easy to see concurrency of 1M or higher. In order to avoid overloading the replicas, the concurrency of indexed queries is now capped at 4096. The number can be subject to debate, its reasoning is as follows: for 2KiB rows, so moderately large but not huge, they result in fetching 10MB of data, which is the granularity used by replicas. For 200B rows, which is rather small, the result would still be around 1MB. At the same time, 4096 separate tasks also means 4096 allocations, so increasing the number also strains the allocator. Fixes #8799 Tests: unit(release), manual: observing metrics of modified index_paging_test --- cql3/statements/select_statement.cc | 6 ++++-- cql3/statements/select_statement.hh | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index b83309f83b..38d2e82a2b 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -574,7 +574,9 @@ indexed_table_select_statement::do_execute_base_query( command->slice.set_range(*_schema, base_pk, row_ranges); } } - concurrency *= 2; + if (concurrency < max_base_table_query_concurrency) { + concurrency *= 2; + } return proxy.query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) .then([&ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) { auto is_short_read = qr.query_result->is_short_read(); @@ -638,7 +640,7 @@ indexed_table_select_statement::do_execute_base_query( // we continue exponentially, asking for 2x more key than before auto already_done = std::distance(keys.begin(), key_it); auto next_iteration = already_done + 1; - next_iteration = std::min(next_iteration, keys.size() - already_done); + next_iteration = std::min({next_iteration, keys.size() - already_done, max_base_table_query_concurrency}); auto key_it_end = key_it + next_iteration; auto command = ::make_lw_shared(*cmd); diff --git a/cql3/statements/select_statement.hh b/cql3/statements/select_statement.hh index d254fb6cf3..87279ced18 100644 --- a/cql3/statements/select_statement.hh +++ b/cql3/statements/select_statement.hh @@ -188,6 +188,8 @@ class indexed_table_select_statement : public select_statement { noncopyable_function _get_partition_ranges_for_posting_list; noncopyable_function _get_partition_slice_for_posting_list; public: + static constexpr size_t max_base_table_query_concurrency = 4096; + static ::shared_ptr prepare(database& db, schema_ptr schema, uint32_t bound_terms, From 60e55b6c7fb8b756cabf8d6368e09b892463f666 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 7 Jun 2021 15:42:17 +0200 Subject: [PATCH 2/3] cql3: return indexed pages after 1MB worth of data Currently there's no practical limit of the resulting page size for an indexed query, because it simply translates a page worth of base primary keys into base rows. In order to avoid sending too large pages, the result is returned after hitting a 1MB limit. --- cql3/statements/select_statement.cc | 22 ++++++++++++++-------- cql3/statements/select_statement.hh | 1 + 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 38d2e82a2b..bab2e1e542 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -535,12 +535,13 @@ indexed_table_select_statement::do_execute_base_query( base_query_state(const base_query_state&) = delete; }; + const bool is_paged = bool(paging_state); base_query_state query_state{cmd->get_row_limit() * queried_ranges_count, std::move(ranges_to_vnodes)}; - return do_with(std::move(query_state), [this, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { + return do_with(std::move(query_state), [this, is_paged, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { auto& merger = query_state.merger; auto& ranges_to_vnodes = query_state.ranges_to_vnodes; auto& concurrency = query_state.concurrency; - return repeat([this, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() { + return repeat([this, is_paged, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() { // Starting with 1 range, we check if the result was a short read, and if not, // we continue exponentially, asking for 2x more ranges than before dht::partition_range_vector prange = ranges_to_vnodes(concurrency); @@ -578,10 +579,12 @@ indexed_table_select_statement::do_execute_base_query( concurrency *= 2; } return proxy.query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) - .then([&ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) { + .then([is_paged, &ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) { auto is_short_read = qr.query_result->is_short_read(); + // Results larger than 1MB should be shipped to the client immediately + const bool page_limit_reached = is_paged && qr.query_result->buf().size() >= max_base_table_query_result_bytes; merger(std::move(qr.query_result)); - return stop_iteration(is_short_read || ranges_to_vnodes.empty()); + return stop_iteration(is_short_read || ranges_to_vnodes.empty() || page_limit_reached); }); }).then([&merger]() { return merger.get(); @@ -631,11 +634,12 @@ indexed_table_select_statement::do_execute_base_query( }; base_query_state query_state{cmd->get_row_limit(), std::move(primary_keys)}; - return do_with(std::move(query_state), [this, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { + const bool is_paged = bool(paging_state); + return do_with(std::move(query_state), [this, is_paged, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { auto &merger = query_state.merger; auto &keys = query_state.primary_keys; auto &key_it = query_state.current_primary_key; - return repeat([this, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() { + return repeat([this, is_paged, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() { // Starting with 1 key, we check if the result was a short read, and if not, // we continue exponentially, asking for 2x more key than before auto already_done = std::distance(keys.begin(), key_it); @@ -657,11 +661,13 @@ indexed_table_select_statement::do_execute_base_query( .then([] (service::storage_proxy::coordinator_query_result qr) { return std::move(qr.query_result); }); - }, std::move(oneshot_merger)).then([&key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr> result) { + }, std::move(oneshot_merger)).then([is_paged, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr> result) { auto is_short_read = result->is_short_read(); + // Results larger than 1MB should be shipped to the client immediately + const bool page_limit_reached = is_paged && result->buf().size() >= max_base_table_query_result_bytes; merger(std::move(result)); key_it = key_it_end; - return stop_iteration(is_short_read || key_it == keys.end()); + return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached); }); }).then([&merger] () { return merger.get(); diff --git a/cql3/statements/select_statement.hh b/cql3/statements/select_statement.hh index 87279ced18..649d113ff6 100644 --- a/cql3/statements/select_statement.hh +++ b/cql3/statements/select_statement.hh @@ -188,6 +188,7 @@ class indexed_table_select_statement : public select_statement { noncopyable_function _get_partition_ranges_for_posting_list; noncopyable_function _get_partition_slice_for_posting_list; public: + static constexpr size_t max_base_table_query_result_bytes = 1024*1024; static constexpr size_t max_base_table_query_concurrency = 4096; static ::shared_ptr prepare(database& db, From df0d44486a728573c7272ce6e1320354290e1b71 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 7 Jun 2021 16:01:11 +0200 Subject: [PATCH 3/3] cql3: limit the transitional result size for indexed queries Unpaged indexed queries already have a concurrency limit of 4096, but now the concurrency is further limited by previous number of bytes fetched. Once this number reached 1MB, the concurrency will not be increased in consecutive queries to avoid overload. --- cql3/statements/select_statement.cc | 28 ++++++++++++++++++++-------- test/boost/index_with_paging_test.cc | 5 ++++- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index bab2e1e542..21fccc08e1 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -527,6 +527,7 @@ indexed_table_select_statement::do_execute_base_query( query::result_merger merger; service::query_ranges_to_vnodes_generator ranges_to_vnodes; size_t concurrency = 1; + size_t previous_result_size = 0; base_query_state(uint64_t row_limit, service::query_ranges_to_vnodes_generator&& ranges_to_vnodes_) : merger(row_limit, query::max_partitions) , ranges_to_vnodes(std::move(ranges_to_vnodes_)) @@ -541,7 +542,8 @@ indexed_table_select_statement::do_execute_base_query( auto& merger = query_state.merger; auto& ranges_to_vnodes = query_state.ranges_to_vnodes; auto& concurrency = query_state.concurrency; - return repeat([this, is_paged, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() { + auto& previous_result_size = query_state.previous_result_size; + return repeat([this, is_paged, &previous_result_size, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() { // Starting with 1 range, we check if the result was a short read, and if not, // we continue exponentially, asking for 2x more ranges than before dht::partition_range_vector prange = ranges_to_vnodes(concurrency); @@ -575,14 +577,15 @@ indexed_table_select_statement::do_execute_base_query( command->slice.set_range(*_schema, base_pk, row_ranges); } } - if (concurrency < max_base_table_query_concurrency) { + if (previous_result_size < max_base_table_query_result_bytes && concurrency < max_base_table_query_concurrency) { concurrency *= 2; } return proxy.query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) - .then([is_paged, &ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) { + .then([is_paged, &previous_result_size, &ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) { auto is_short_read = qr.query_result->is_short_read(); // Results larger than 1MB should be shipped to the client immediately const bool page_limit_reached = is_paged && qr.query_result->buf().size() >= max_base_table_query_result_bytes; + previous_result_size = qr.query_result->buf().size(); merger(std::move(qr.query_result)); return stop_iteration(is_short_read || ranges_to_vnodes.empty() || page_limit_reached); }); @@ -624,6 +627,8 @@ indexed_table_select_statement::do_execute_base_query( query::result_merger merger; std::vector primary_keys; std::vector::iterator current_primary_key; + size_t previous_result_size = 0; + size_t next_iteration_size = 0; base_query_state(uint64_t row_limit, std::vector&& keys) : merger(row_limit, query::max_partitions) , primary_keys(std::move(keys)) @@ -639,13 +644,19 @@ indexed_table_select_statement::do_execute_base_query( auto &merger = query_state.merger; auto &keys = query_state.primary_keys; auto &key_it = query_state.current_primary_key; - return repeat([this, is_paged, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() { + auto &previous_result_size = query_state.previous_result_size; + auto &next_iteration_size = query_state.next_iteration_size; + return repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() { // Starting with 1 key, we check if the result was a short read, and if not, // we continue exponentially, asking for 2x more key than before auto already_done = std::distance(keys.begin(), key_it); - auto next_iteration = already_done + 1; - next_iteration = std::min({next_iteration, keys.size() - already_done, max_base_table_query_concurrency}); - auto key_it_end = key_it + next_iteration; + // If the previous result already provided 1MB worth of data, + // stop increasing the number of fetched partitions + if (previous_result_size < max_base_table_query_result_bytes) { + next_iteration_size = already_done + 1; + } + next_iteration_size = std::min({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency}); + auto key_it_end = key_it + next_iteration_size; auto command = ::make_lw_shared(*cmd); query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions); @@ -661,10 +672,11 @@ indexed_table_select_statement::do_execute_base_query( .then([] (service::storage_proxy::coordinator_query_result qr) { return std::move(qr.query_result); }); - }, std::move(oneshot_merger)).then([is_paged, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr> result) { + }, std::move(oneshot_merger)).then([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr> result) { auto is_short_read = result->is_short_read(); // Results larger than 1MB should be shipped to the client immediately const bool page_limit_reached = is_paged && result->buf().size() >= max_base_table_query_result_bytes; + previous_result_size = result->buf().size(); merger(std::move(result)); key_it = key_it_end; return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached); diff --git a/test/boost/index_with_paging_test.cc b/test/boost/index_with_paging_test.cc index 25132127c0..e010afced4 100644 --- a/test/boost/index_with_paging_test.cc +++ b/test/boost/index_with_paging_test.cc @@ -39,7 +39,10 @@ SEASTAR_TEST_CASE(test_index_with_paging) { auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); - assert_that(res).is_rows().with_size(4321); + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE_NE(rows, nullptr); + // It's fine to get less rows than requested due to paging limit, but never more than that + BOOST_REQUIRE_LE(rows->rs().get_metadata().column_count(), 4321); }); eventually([&] {