diff --git a/mutation_reader.cc b/mutation_reader.cc index b6e416c421..756166ef68 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -141,23 +141,25 @@ public: // stream of mutation-fragments. class mutation_reader_merger { public: + using reader_iterator = std::list::iterator; + struct reader_and_fragment { - flat_mutation_reader* reader; + reader_iterator reader{}; mutation_fragment fragment; - reader_and_fragment(flat_mutation_reader* r, mutation_fragment f) + reader_and_fragment(reader_iterator r, mutation_fragment f) : reader(r) , fragment(std::move(f)) { } }; struct reader_and_last_fragment_kind { - flat_mutation_reader* reader = nullptr; + reader_iterator reader{}; mutation_fragment::kind last_kind = mutation_fragment::kind::partition_end; reader_and_last_fragment_kind() = default; - reader_and_last_fragment_kind(flat_mutation_reader* r, mutation_fragment::kind k) + reader_and_last_fragment_kind(reader_iterator r, mutation_fragment::kind k) : reader(r) , last_kind(k) { } @@ -172,6 +174,9 @@ private: // We need a list because we need stable addresses across additions // and removals. std::list _all_readers; + // We remove unneeded readers in batches. Until it is their time they + // are kept in _to_remove. + std::list _to_remove; // Readers positioned at a partition, different from the one we are // reading from now. For these readers the attached fragment is // always partition_start. Used to pick the next partition. @@ -268,8 +273,7 @@ void mutation_reader_merger::maybe_add_readers(const std::optional new_readers) { for (auto&& new_reader : new_readers) { _all_readers.emplace_back(std::move(new_reader)); - auto* r = &_all_readers.back(); - _next.emplace_back(r, mutation_fragment::kind::partition_end); + _next.emplace_back(std::prev(_all_readers.end()), mutation_fragment::kind::partition_end); } } @@ -320,7 +324,10 @@ future<> mutation_reader_merger::prepare_next(db::timeout_clock::time_point time // end are out of data for good for the current range. _halted_readers.push_back(rk); } else if (_fwd_mr == mutation_reader::forwarding::no) { - _all_readers.remove_if([mr = rk.reader] (auto& r) { return &r == mr; }); + _to_remove.splice(_to_remove.end(), _all_readers, rk.reader); + if (_to_remove.size() >= 4) { + _to_remove.clear(); + } } }); }).then([this] { @@ -343,7 +350,7 @@ void mutation_reader_merger::prepare_forwardable_readers() { _next.reserve(_halted_readers.size() + _fragment_heap.size() + _next.size()); std::move(_halted_readers.begin(), _halted_readers.end(), std::back_inserter(_next)); - if (_single_reader.reader) { + if (_single_reader.reader != reader_iterator{}) { _next.emplace_back(std::exchange(_single_reader.reader, {}), _single_reader.last_kind); } for (auto& df : _fragment_heap) { @@ -368,7 +375,7 @@ mutation_reader_merger::mutation_reader_merger(schema_ptr schema, future mutation_reader_merger::operator()(db::timeout_clock::time_point timeout) { // Avoid merging-related logic if we know that only a single reader owns // current partition. - if (_single_reader.reader) { + if (_single_reader.reader != reader_iterator{}) { if (_single_reader.reader->is_buffer_empty()) { if (_single_reader.reader->is_end_of_stream()) { _current.clear(); @@ -447,8 +454,10 @@ future<> mutation_reader_merger::fast_forward_to(const dht::partition_range& pr, _fragment_heap.clear(); _reader_heap.clear(); + for (auto it = _all_readers.begin(); it != _all_readers.end(); ++it) { + _next.emplace_back(it, mutation_fragment::kind::partition_end); + } return parallel_for_each(_all_readers, [this, &pr, timeout] (flat_mutation_reader& mr) { - _next.emplace_back(&mr, mutation_fragment::kind::partition_end); return mr.fast_forward_to(pr, timeout); }).then([this, &pr, timeout] { add_readers(_selector->fast_forward_to(pr, timeout)); diff --git a/tests/perf/perf_fast_forward.cc b/tests/perf/perf_fast_forward.cc index 52a875983a..454aa4d4ff 100644 --- a/tests/perf/perf_fast_forward.cc +++ b/tests/perf/perf_fast_forward.cc @@ -805,13 +805,14 @@ static test_result test_slicing_using_restrictions(column_family& cf, int_range })) .build(); auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); - auto rd = cf.make_reader(cf.schema(), pr, slice); + auto rd = cf.make_reader(cf.schema(), pr, slice, default_priority_class(), nullptr, + streamed_mutation::forwarding::no, mutation_reader::forwarding::no); return test_reading_all(rd); } static test_result slice_rows_single_key(column_family& cf, int offset = 0, int n_read = 1) { auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); - auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes); + auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no); metrics_snapshot before; assert_partition_start(rd); @@ -939,7 +940,7 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered slice, default_priority_class(), nullptr, - streamed_mutation::forwarding::yes); + streamed_mutation::forwarding::yes, mutation_reader::forwarding::no); uint64_t fragments = 0; metrics_snapshot before;