diff --git a/repair/row_level.cc b/repair/row_level.cc index 801e419974..4040735acb 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -303,10 +303,10 @@ mutation_reader repair_reader::make_reader( return std::optional(dht::to_partition_range(*shard_range)); } return std::optional(); - }, compaction_time); + }, compaction_time, {}); } case read_strategy::multishard_filter: { - return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time), + return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time, {}), [&remote_sharder, remote_shard](const dht::decorated_key& k) { return remote_sharder.shard_for_reads(k.token()) == remote_shard; }); diff --git a/replica/database.cc b/replica/database.cc index 50f1fd3a91..4a5d0d3308 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2910,11 +2910,12 @@ void database::unplug_view_update_generator() noexcept { mutation_reader make_multishard_streaming_reader(distributed& db, schema_ptr schema, reader_permit permit, std::function()> range_generator, - gc_clock::time_point compaction_time) { + gc_clock::time_point compaction_time, + std::optional multishard_reader_buffer_size) { auto& table = db.local().find_column_family(schema); auto erm = table.get_effective_replication_map(); - auto ms = mutation_source([&db, erm, compaction_time] (schema_ptr s, + auto ms = mutation_source([&db, erm, compaction_time, multishard_reader_buffer_size] (schema_ptr s, reader_permit permit, const dht::partition_range& pr, const query::partition_slice& ps, @@ -2922,8 +2923,13 @@ mutation_reader make_multishard_streaming_reader(distributed& streamed_mutation::forwarding, mutation_reader::forwarding fwd_mr) { auto table_id = s->id(); - return make_multishard_combining_reader_v2(seastar::make_shared(db, table_id, compaction_time), - std::move(s), erm, std::move(permit), pr, ps, std::move(trace_state), fwd_mr); + const auto buffer_hint = multishard_reader_buffer_hint(multishard_reader_buffer_size.has_value()); + auto rd = make_multishard_combining_reader_v2(seastar::make_shared(db, table_id, compaction_time), + std::move(s), erm, std::move(permit), pr, ps, std::move(trace_state), fwd_mr, buffer_hint); + if (multishard_reader_buffer_size) { + rd.set_max_buffer_size(*multishard_reader_buffer_size); + } + return rd; }); auto&& full_slice = schema->full_slice(); return make_flat_multi_range_reader(schema, std::move(permit), std::move(ms), @@ -2931,18 +2937,29 @@ mutation_reader make_multishard_streaming_reader(distributed& } mutation_reader make_multishard_streaming_reader(distributed& db, - schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time) + schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + gc_clock::time_point compaction_time, + std::optional multishard_reader_buffer_size) { const auto table_id = schema->id(); const auto& full_slice = schema->full_slice(); auto erm = db.local().find_column_family(schema).get_effective_replication_map(); - return make_multishard_combining_reader_v2( + auto rd = make_multishard_combining_reader_v2( seastar::make_shared(db, table_id, compaction_time), std::move(schema), std::move(erm), std::move(permit), range, - full_slice); + full_slice, + {}, + mutation_reader::forwarding::no, + multishard_reader_buffer_hint(multishard_reader_buffer_size.has_value())); + if (multishard_reader_buffer_size) { + rd.set_max_buffer_size(*multishard_reader_buffer_size); + } + return rd; } auto fmt::formatter::format(gc_clock::time_point tp, fmt::format_context& ctx) const diff --git a/replica/database.hh b/replica/database.hh index ed8845d3b2..9db47ae901 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1911,11 +1911,24 @@ future<> start_large_data_handler(sharded& db); // Range generator must generate disjoint, monotonically increasing ranges. // Opt-in for compacting the output by passing `compaction_time`, see // make_streaming_reader() for more details. -mutation_reader make_multishard_streaming_reader(distributed& db, schema_ptr schema, reader_permit permit, - std::function()> range_generator, gc_clock::time_point compaction_time); +// Setting multishard_reader_buffer_size enables the multishard reader's buffer +// size optimization (see make_multishard_combining_reader_v2()), using the +// given size. +mutation_reader make_multishard_streaming_reader( + distributed& db, + schema_ptr schema, + reader_permit permit, + std::function()> range_generator, + gc_clock::time_point compaction_time, + std::optional multishard_reader_buffer_size); -mutation_reader make_multishard_streaming_reader(distributed& db, - schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time); +mutation_reader make_multishard_streaming_reader( + distributed& db, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + gc_clock::time_point compaction_time, + std::optional multishard_reader_buffer_size); bool is_internal_keyspace(std::string_view name); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index a1a0b2476b..5950225822 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2326,7 +2326,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { return dht::to_partition_range(*next); } return std::nullopt; - }, gc_clock::now()); + }, gc_clock::now(), {}); auto close_tested_reader = deferred_close(tested_reader); auto reader_factory = [db = &env.db()] (