replica/database: make_multishard_streaming_reader(): expose buffer_hint parameter

Expose the buffer hint functionality added by the previous commits, to
callers of make_multishard_streaming_reader(). All callers disable it
currently, it will be used in the next patch.
This commit is contained in:
Botond Dénes
2024-11-06 09:01:02 -05:00
parent 3c25e6fcb4
commit e2344e28b6
4 changed files with 44 additions and 14 deletions

View File

@@ -303,10 +303,10 @@ mutation_reader repair_reader::make_reader(
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
}, compaction_time);
}, compaction_time, {});
}
case read_strategy::multishard_filter: {
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time),
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time, {}),
[&remote_sharder, remote_shard](const dht::decorated_key& k) {
return remote_sharder.shard_for_reads(k.token()) == remote_shard;
});

View File

@@ -2910,11 +2910,12 @@ void database::unplug_view_update_generator() noexcept {
mutation_reader make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator,
gc_clock::time_point compaction_time) {
gc_clock::time_point compaction_time,
std::optional<size_t> multishard_reader_buffer_size) {
auto& table = db.local().find_column_family(schema);
auto erm = table.get_effective_replication_map();
auto ms = mutation_source([&db, erm, compaction_time] (schema_ptr s,
auto ms = mutation_source([&db, erm, compaction_time, multishard_reader_buffer_size] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
@@ -2922,8 +2923,13 @@ mutation_reader make_multishard_streaming_reader(distributed<replica::database>&
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
auto table_id = s->id();
return make_multishard_combining_reader_v2(seastar::make_shared<streaming_reader_lifecycle_policy>(db, table_id, compaction_time),
std::move(s), erm, std::move(permit), pr, ps, std::move(trace_state), fwd_mr);
const auto buffer_hint = multishard_reader_buffer_hint(multishard_reader_buffer_size.has_value());
auto rd = make_multishard_combining_reader_v2(seastar::make_shared<streaming_reader_lifecycle_policy>(db, table_id, compaction_time),
std::move(s), erm, std::move(permit), pr, ps, std::move(trace_state), fwd_mr, buffer_hint);
if (multishard_reader_buffer_size) {
rd.set_max_buffer_size(*multishard_reader_buffer_size);
}
return rd;
});
auto&& full_slice = schema->full_slice();
return make_flat_multi_range_reader(schema, std::move(permit), std::move(ms),
@@ -2931,18 +2937,29 @@ mutation_reader make_multishard_streaming_reader(distributed<replica::database>&
}
mutation_reader make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time)
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
gc_clock::time_point compaction_time,
std::optional<size_t> multishard_reader_buffer_size)
{
const auto table_id = schema->id();
const auto& full_slice = schema->full_slice();
auto erm = db.local().find_column_family(schema).get_effective_replication_map();
return make_multishard_combining_reader_v2(
auto rd = make_multishard_combining_reader_v2(
seastar::make_shared<streaming_reader_lifecycle_policy>(db, table_id, compaction_time),
std::move(schema),
std::move(erm),
std::move(permit),
range,
full_slice);
full_slice,
{},
mutation_reader::forwarding::no,
multishard_reader_buffer_hint(multishard_reader_buffer_size.has_value()));
if (multishard_reader_buffer_size) {
rd.set_max_buffer_size(*multishard_reader_buffer_size);
}
return rd;
}
auto fmt::formatter<gc_clock::time_point>::format(gc_clock::time_point tp, fmt::format_context& ctx) const

View File

@@ -1911,11 +1911,24 @@ future<> start_large_data_handler(sharded<replica::database>& db);
// Range generator must generate disjoint, monotonically increasing ranges.
// Opt-in for compacting the output by passing `compaction_time`, see
// make_streaming_reader() for more details.
mutation_reader make_multishard_streaming_reader(distributed<replica::database>& db, schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator, gc_clock::time_point compaction_time);
// Setting multishard_reader_buffer_size enables the multishard reader's buffer
// size optimization (see make_multishard_combining_reader_v2()), using the
// given size.
mutation_reader make_multishard_streaming_reader(
distributed<replica::database>& db,
schema_ptr schema,
reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator,
gc_clock::time_point compaction_time,
std::optional<size_t> multishard_reader_buffer_size);
mutation_reader make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time);
mutation_reader make_multishard_streaming_reader(
distributed<replica::database>& db,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
gc_clock::time_point compaction_time,
std::optional<size_t> multishard_reader_buffer_size);
bool is_internal_keyspace(std::string_view name);

View File

@@ -2326,7 +2326,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
return dht::to_partition_range(*next);
}
return std::nullopt;
}, gc_clock::now());
}, gc_clock::now(), {});
auto close_tested_reader = deferred_close(tested_reader);
auto reader_factory = [db = &env.db()] (