diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 15d6ac0e3b..1b6a28de9e 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -530,6 +530,7 @@ indexed_table_select_statement::do_execute_base_query( query::result_merger merger; service::query_ranges_to_vnodes_generator ranges_to_vnodes; size_t concurrency = 1; + size_t previous_result_size = 0; base_query_state(uint64_t row_limit, service::query_ranges_to_vnodes_generator&& ranges_to_vnodes_) : merger(row_limit, query::max_partitions) , ranges_to_vnodes(std::move(ranges_to_vnodes_)) @@ -538,12 +539,14 @@ indexed_table_select_statement::do_execute_base_query( base_query_state(const base_query_state&) = delete; }; + const bool is_paged = bool(paging_state); base_query_state query_state{cmd->get_row_limit() * queried_ranges_count, std::move(ranges_to_vnodes)}; - return do_with(std::move(query_state), [this, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { + return do_with(std::move(query_state), [this, is_paged, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { auto& merger = query_state.merger; auto& ranges_to_vnodes = query_state.ranges_to_vnodes; auto& concurrency = query_state.concurrency; - return repeat([this, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() { + auto& previous_result_size = query_state.previous_result_size; + return repeat([this, is_paged, &previous_result_size, &ranges_to_vnodes, &merger, &proxy, &state, &options, &concurrency, cmd, timeout]() { // Starting with 1 range, we check if the result was a short read, and if not, // we continue exponentially, asking for 2x more ranges than before dht::partition_range_vector prange = ranges_to_vnodes(concurrency); @@ -577,12 +580,17 @@ indexed_table_select_statement::do_execute_base_query( command->slice.set_range(*_schema, base_pk, row_ranges); } } - concurrency *= 2; + if (previous_result_size < max_base_table_query_result_bytes && concurrency < max_base_table_query_concurrency) { + concurrency *= 2; + } return proxy.query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()}) - .then([&ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) { + .then([is_paged, &previous_result_size, &ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) { auto is_short_read = qr.query_result->is_short_read(); + // Results larger than 1MB should be shipped to the client immediately + const bool page_limit_reached = is_paged && qr.query_result->buf().size() >= max_base_table_query_result_bytes; + previous_result_size = qr.query_result->buf().size(); merger(std::move(qr.query_result)); - return stop_iteration(is_short_read || ranges_to_vnodes.empty()); + return stop_iteration(is_short_read || ranges_to_vnodes.empty() || page_limit_reached); }); }).then([&merger]() { return merger.get(); @@ -622,6 +630,8 @@ indexed_table_select_statement::do_execute_base_query( query::result_merger merger; std::vector primary_keys; std::vector::iterator current_primary_key; + size_t previous_result_size = 0; + size_t next_iteration_size = 0; base_query_state(uint64_t row_limit, std::vector&& keys) : merger(row_limit, query::max_partitions) , primary_keys(std::move(keys)) @@ -632,17 +642,24 @@ indexed_table_select_statement::do_execute_base_query( }; base_query_state query_state{cmd->get_row_limit(), std::move(primary_keys)}; - return do_with(std::move(query_state), [this, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { + const bool is_paged = bool(paging_state); + return do_with(std::move(query_state), [this, is_paged, &proxy, &state, &options, cmd, timeout] (auto&& query_state) { auto &merger = query_state.merger; auto &keys = query_state.primary_keys; auto &key_it = query_state.current_primary_key; - return repeat([this, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() { + auto &previous_result_size = query_state.previous_result_size; + auto &next_iteration_size = query_state.next_iteration_size; + return repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &proxy, &state, &options, cmd, timeout]() { // Starting with 1 key, we check if the result was a short read, and if not, // we continue exponentially, asking for 2x more key than before auto already_done = std::distance(keys.begin(), key_it); - auto next_iteration = already_done + 1; - next_iteration = std::min(next_iteration, keys.size() - already_done); - auto key_it_end = key_it + next_iteration; + // If the previous result already provided 1MB worth of data, + // stop increasing the number of fetched partitions + if (previous_result_size < max_base_table_query_result_bytes) { + next_iteration_size = already_done + 1; + } + next_iteration_size = std::min({next_iteration_size, keys.size() - already_done, max_base_table_query_concurrency}); + auto key_it_end = key_it + next_iteration_size; auto command = ::make_lw_shared(*cmd); query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions); @@ -658,11 +675,14 @@ indexed_table_select_statement::do_execute_base_query( .then([] (service::storage_proxy::coordinator_query_result qr) { return std::move(qr.query_result); }); - }, std::move(oneshot_merger)).then([&key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr> result) { + }, std::move(oneshot_merger)).then([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr> result) { auto is_short_read = result->is_short_read(); + // Results larger than 1MB should be shipped to the client immediately + const bool page_limit_reached = is_paged && result->buf().size() >= max_base_table_query_result_bytes; + previous_result_size = result->buf().size(); merger(std::move(result)); key_it = key_it_end; - return stop_iteration(is_short_read || key_it == keys.end()); + return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached); }); }).then([&merger] () { return merger.get(); diff --git a/cql3/statements/select_statement.hh b/cql3/statements/select_statement.hh index cdf212d9ee..f2af46b1cd 100644 --- a/cql3/statements/select_statement.hh +++ b/cql3/statements/select_statement.hh @@ -194,6 +194,9 @@ class indexed_table_select_statement : public select_statement { noncopyable_function _get_partition_ranges_for_posting_list; noncopyable_function _get_partition_slice_for_posting_list; public: + static constexpr size_t max_base_table_query_result_bytes = 1024*1024; + static constexpr size_t max_base_table_query_concurrency = 4096; + static ::shared_ptr prepare(database& db, schema_ptr schema, uint32_t bound_terms, diff --git a/test/boost/index_with_paging_test.cc b/test/boost/index_with_paging_test.cc index 25132127c0..e010afced4 100644 --- a/test/boost/index_with_paging_test.cc +++ b/test/boost/index_with_paging_test.cc @@ -39,7 +39,10 @@ SEASTAR_TEST_CASE(test_index_with_paging) { auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); - assert_that(res).is_rows().with_size(4321); + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE_NE(rows, nullptr); + // It's fine to get less rows than requested due to paging limit, but never more than that + BOOST_REQUIRE_LE(rows->rs().get_metadata().column_count(), 4321); }); eventually([&] {