From dd2417618e5f0f11e5308d0123c7d6cddfed12fe Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 1 Aug 2022 13:44:47 +0200 Subject: [PATCH] forward_service: limit the number of partition ranges fetched The forward service uses a vector of ranges owned by a particular shard in order to split and delegate the work. The number can grow large though, which can cause large allocations. This commit limits the number of ranges handled at a time to 256. Fixes #10725 Closes #11182 --- service/forward_service.cc | 65 ++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/service/forward_service.cc b/service/forward_service.cc index d5160c09c3..c81fbb34f1 100644 --- a/service/forward_service.cc +++ b/service/forward_service.cc @@ -277,20 +277,6 @@ public: } }; -static dht::partition_range_vector retain_ranges_owned_by_this_shard( - schema_ptr schema, - dht::partition_range_vector pr -) { - partition_ranges_owned_by_this_shard owned_iter(schema, std::move(pr)); - dht::partition_range_vector res; - while (std::optional p = owned_iter.next(*schema)) { - res.push_back(*p); - } - - return res; -} - - locator::token_metadata_ptr forward_service::get_token_metadata_ptr() const noexcept { return _shared_token_metadata.get(); } @@ -412,16 +398,7 @@ future forward_service::execute_on_this_shard( cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest() ); - auto pager = service::pager::query_pagers::pager( - _proxy, - schema, - selection, - *query_state, - *query_options, - make_lw_shared(std::move(req.cmd)), - retain_ranges_owned_by_this_shard(schema, std::move(req.pr)), - nullptr // No filtering restrictions - ); + auto rs_builder = cql3::selection::result_set_builder( *selection, now, @@ -429,10 +406,42 @@ future forward_service::execute_on_this_shard( std::vector() // Represents empty GROUP BY indices. ); - // Execute query. - while (!pager->is_exhausted()) { - co_await pager->fetch_page(rs_builder, DEFAULT_INTERNAL_PAGING_SIZE, now, timeout); - } + dht::partition_range_vector ranges_owned_by_this_shard; + partition_ranges_owned_by_this_shard owned_iter(schema, std::move(req.pr)); + + std::optional current_range; + do { + // We serve up to 256 ranges at a time to avoid allocating a huge vector for ranges + static constexpr size_t max_ranges = 256; + while ((current_range = owned_iter.next(*schema))) { + ranges_owned_by_this_shard.push_back(*current_range); + if (ranges_owned_by_this_shard.size() >= max_ranges) { + break; + } + } + if (ranges_owned_by_this_shard.empty()) { + break; + } + flogger.trace("Forwarding to {} ranges owned by this shard", ranges_owned_by_this_shard.size()); + + auto pager = service::pager::query_pagers::pager( + _proxy, + schema, + selection, + *query_state, + *query_options, + make_lw_shared(req.cmd), + std::move(ranges_owned_by_this_shard), + nullptr // No filtering restrictions + ); + + // Execute query. + while (!pager->is_exhausted()) { + co_await pager->fetch_page(rs_builder, DEFAULT_INTERNAL_PAGING_SIZE, now, timeout); + } + + ranges_owned_by_this_shard.clear(); + } while (current_range); co_return co_await rs_builder.with_thread_if_needed([&req, &rs_builder, reductions = req.reduction_types, tr_state = std::move(tr_state)] { auto rs = rs_builder.build();