replica: table: Reserve reader list capacity through a callback

add_memtables_to_reader_list() will be adapted to compaction groups.
For point queries, it will add memtables of a single group.
With the callback, add_memtables_to_reader_list() can tell its
caller the exact amount of memtable readers to be added, so it
can reserve precisely the readers capacity.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2022-12-15 15:19:53 -03:00
parent e841508685
commit f2ea79f26c
2 changed files with 18 additions and 10 deletions

View File

@@ -612,6 +612,7 @@ private:
return _config.dirty_memory_manager->region_group();
}
// reserve_fn will be called before any element is added to readers
void add_memtables_to_reader_list(std::vector<flat_mutation_reader_v2>& readers,
const schema_ptr& s,
const reader_permit& permit,
@@ -620,7 +621,8 @@ private:
const io_priority_class& pc,
const tracing::trace_state_ptr& trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const;
mutation_reader::forwarding fwd_mr,
std::function<void(size_t)> reserve_fn) const;
public:
sstring dir() const {
return _config.datadir;

View File

@@ -156,7 +156,9 @@ table::add_memtables_to_reader_list(std::vector<flat_mutation_reader_v2>& reader
const io_priority_class& pc,
const tracing::trace_state_ptr& trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
std::function<void(size_t)> reserve_fn) const {
reserve_fn(_compaction_group->memtables()->size());
for (auto&& mt : *_compaction_group->memtables()) {
if (auto reader_opt = mt->make_flat_reader_opt(s, permit, range, slice, pc, trace_state, fwd, fwd_mr)) {
readers.emplace_back(std::move(*reader_opt));
@@ -188,7 +190,6 @@ table::make_reader_v2(schema_ptr s,
auto& slice = unreversed_slice ? *unreversed_slice : query_slice;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_compaction_group->memtables()->size() + 1);
// We're assuming that cache and memtables are both read atomically
// for single-key queries, so we don't need to special case memtable
@@ -210,7 +211,9 @@ table::make_reader_v2(schema_ptr s,
// https://github.com/scylladb/scylla/issues/309
// https://github.com/scylladb/scylla/issues/185
add_memtables_to_reader_list(readers, s, permit, range, slice, pc, trace_state, fwd, fwd_mr);
add_memtables_to_reader_list(readers, s, permit, range, slice, pc, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
readers.reserve(memtable_count + 1);
});
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
if (cache_enabled() && !bypass_cache && !(reversed && _config.reversed_reads_auto_bypass_cache())) {
@@ -257,8 +260,9 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_compaction_group->memtables()->size() + 1);
add_memtables_to_reader_list(readers, s, permit, range, slice, pc, trace_state, fwd, fwd_mr);
add_memtables_to_reader_list(readers, s, permit, range, slice, pc, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
readers.reserve(memtable_count + 1);
});
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
});
@@ -273,8 +277,9 @@ flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_p
const auto fwd = streamed_mutation::forwarding::no;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_compaction_group->memtables()->size() + 1);
add_memtables_to_reader_list(readers, schema, permit, range, slice, pc, trace_state, fwd, fwd_mr);
add_memtables_to_reader_list(readers, schema, permit, range, slice, pc, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
readers.reserve(memtable_count + 1);
});
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
}
@@ -2359,9 +2364,10 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_compaction_group->memtables()->size() + 1);
add_memtables_to_reader_list(readers, s, permit, range, slice, pc, trace_state, fwd, fwd_mr);
add_memtables_to_reader_list(readers, s, permit, range, slice, pc, trace_state, fwd, fwd_mr, [&] (size_t memtable_count) {
readers.reserve(memtable_count + 1);
});
auto excluded_ssts = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(excluded);
auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema));