From 5d5777f85edf92373e9172b2cdbbdfb50a3076dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 20 Feb 2019 14:42:03 +0000 Subject: [PATCH 1/3] tests/perf_fast_forward: disable partition-level fast-forwarding if not needed Several of the test cases in perf_fast_forward do not need partition-level fast-forwarding. However, since the defaults are used to construct most of the readers the fast-forwarding is enabled regardless. This showed an apparent regression in the perf_fast_forward results after adcb3ec20ccd3e8ecda7b9ddac81671e2eb47f3a ("row_cache: read is not single-partition if inter-partition forwarding is enabled") which disabled an optimisation that was invalid when partition-level fast-forwarind was requested. This patch ensures that all single-partition reads that do not need partition-level fast-forwarding keep it disabled. --- tests/perf/perf_fast_forward.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/perf/perf_fast_forward.cc b/tests/perf/perf_fast_forward.cc index 52a875983a..454aa4d4ff 100644 --- a/tests/perf/perf_fast_forward.cc +++ b/tests/perf/perf_fast_forward.cc @@ -805,13 +805,14 @@ static test_result test_slicing_using_restrictions(column_family& cf, int_range })) .build(); auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); - auto rd = cf.make_reader(cf.schema(), pr, slice); + auto rd = cf.make_reader(cf.schema(), pr, slice, default_priority_class(), nullptr, + streamed_mutation::forwarding::no, mutation_reader::forwarding::no); return test_reading_all(rd); } static test_result slice_rows_single_key(column_family& cf, int offset = 0, int n_read = 1) { auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); - auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes); + auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no); metrics_snapshot before; assert_partition_start(rd); @@ -939,7 +940,7 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered slice, default_priority_class(), nullptr, - streamed_mutation::forwarding::yes); + streamed_mutation::forwarding::yes, mutation_reader::forwarding::no); uint64_t fragments = 0; metrics_snapshot before; From 435e24f509c671fe8be30c24e1959e55553d786e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Thu, 21 Feb 2019 13:41:00 +0000 Subject: [PATCH 2/3] mutation_reader_merger: track readers by iterators and not pointers mutation_reader_merger uses a std::list of mutation_reader to keep them alive while the rest of the logic operates on non-owning pointers. This means that when it is a time to drop some of the readers that are no longer needed, the merger needs to scan the list looking for them. That's not ideal. The solution is to make the logic use iterators to elements in that list, which allows for O(1) removal of an unneeded reader. Iterators to list are just pointers to the node and are not invalidated by unrelated additions and removals. --- mutation_reader.cc | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index b6e416c421..f0a6188710 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -141,23 +141,25 @@ public: // stream of mutation-fragments. class mutation_reader_merger { public: + using reader_iterator = std::list::iterator; + struct reader_and_fragment { - flat_mutation_reader* reader; + reader_iterator reader{}; mutation_fragment fragment; - reader_and_fragment(flat_mutation_reader* r, mutation_fragment f) + reader_and_fragment(reader_iterator r, mutation_fragment f) : reader(r) , fragment(std::move(f)) { } }; struct reader_and_last_fragment_kind { - flat_mutation_reader* reader = nullptr; + reader_iterator reader{}; mutation_fragment::kind last_kind = mutation_fragment::kind::partition_end; reader_and_last_fragment_kind() = default; - reader_and_last_fragment_kind(flat_mutation_reader* r, mutation_fragment::kind k) + reader_and_last_fragment_kind(reader_iterator r, mutation_fragment::kind k) : reader(r) , last_kind(k) { } @@ -268,8 +270,7 @@ void mutation_reader_merger::maybe_add_readers(const std::optional new_readers) { for (auto&& new_reader : new_readers) { _all_readers.emplace_back(std::move(new_reader)); - auto* r = &_all_readers.back(); - _next.emplace_back(r, mutation_fragment::kind::partition_end); + _next.emplace_back(std::prev(_all_readers.end()), mutation_fragment::kind::partition_end); } } @@ -320,7 +321,7 @@ future<> mutation_reader_merger::prepare_next(db::timeout_clock::time_point time // end are out of data for good for the current range. _halted_readers.push_back(rk); } else if (_fwd_mr == mutation_reader::forwarding::no) { - _all_readers.remove_if([mr = rk.reader] (auto& r) { return &r == mr; }); + _all_readers.erase(rk.reader); } }); }).then([this] { @@ -343,7 +344,7 @@ void mutation_reader_merger::prepare_forwardable_readers() { _next.reserve(_halted_readers.size() + _fragment_heap.size() + _next.size()); std::move(_halted_readers.begin(), _halted_readers.end(), std::back_inserter(_next)); - if (_single_reader.reader) { + if (_single_reader.reader != reader_iterator{}) { _next.emplace_back(std::exchange(_single_reader.reader, {}), _single_reader.last_kind); } for (auto& df : _fragment_heap) { @@ -368,7 +369,7 @@ mutation_reader_merger::mutation_reader_merger(schema_ptr schema, future mutation_reader_merger::operator()(db::timeout_clock::time_point timeout) { // Avoid merging-related logic if we know that only a single reader owns // current partition. - if (_single_reader.reader) { + if (_single_reader.reader != reader_iterator{}) { if (_single_reader.reader->is_buffer_empty()) { if (_single_reader.reader->is_end_of_stream()) { _current.clear(); @@ -447,8 +448,10 @@ future<> mutation_reader_merger::fast_forward_to(const dht::partition_range& pr, _fragment_heap.clear(); _reader_heap.clear(); + for (auto it = _all_readers.begin(); it != _all_readers.end(); ++it) { + _next.emplace_back(it, mutation_fragment::kind::partition_end); + } return parallel_for_each(_all_readers, [this, &pr, timeout] (flat_mutation_reader& mr) { - _next.emplace_back(&mr, mutation_fragment::kind::partition_end); return mr.fast_forward_to(pr, timeout); }).then([this, &pr, timeout] { add_readers(_selector->fast_forward_to(pr, timeout)); From b524f96a74fa6b956b4056825f8524f485bda9f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Thu, 21 Feb 2019 14:32:06 +0000 Subject: [PATCH 3/3] mutation_reader_merger: drop unneded readers in small batches It was observed that destroying readers as soon as they are not needed negatively affects performance of relatively small reads. We don't want to keep them alive for too long either, since they may own a lot of memory, but deferring the destruction slightly and removing them in batches of 4 seems to solve the problem for the small reads. --- mutation_reader.cc | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index f0a6188710..756166ef68 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -174,6 +174,9 @@ private: // We need a list because we need stable addresses across additions // and removals. std::list _all_readers; + // We remove unneeded readers in batches. Until it is their time they + // are kept in _to_remove. + std::list _to_remove; // Readers positioned at a partition, different from the one we are // reading from now. For these readers the attached fragment is // always partition_start. Used to pick the next partition. @@ -321,7 +324,10 @@ future<> mutation_reader_merger::prepare_next(db::timeout_clock::time_point time // end are out of data for good for the current range. _halted_readers.push_back(rk); } else if (_fwd_mr == mutation_reader::forwarding::no) { - _all_readers.erase(rk.reader); + _to_remove.splice(_to_remove.end(), _all_readers, rk.reader); + if (_to_remove.size() >= 4) { + _to_remove.clear(); + } } }); }).then([this] {