Merge "Fix regression in perf_fast_forward results" from Paweł

"
After adcb3ec20c ("row_cache: read is not
single-partition if inter-partition forwarding is enabled") we have
noticed a regression in the results of some perf_fast_forward tests.
This was caused by those tests not disabling partition-level
fast-forwarding even though it was not needed and the commit in question
fixed an incorrect optimisation in such cases.

However, after solving that issue it has also become apparent that
mutation_reader_merger performs worse when the fast-forwarding is
disabled. This was attributed to logic responsible for dropping readers
as soon as they have reached the end of stream (which cannot be done if
fast-forwarding is enabled). This problem was mitigated with avoiding a
scan of the list and removing readers in small batches.

Fixes #4246.
Fixes #4254.

Tests: unit(dev)
"

* tag 'perf_fast_forward-fix-regression/v1' of https://github.com/pdziepak/scylla:
  mutation_reader_merger: drop unneded readers in small batches
  mutation_reader_merger: track readers by iterators and not pointers
  tests/perf_fast_forward: disable partition-level fast-forwarding if not needed
This commit is contained in:
Avi Kivity
2019-02-24 19:24:00 +02:00
2 changed files with 23 additions and 13 deletions

View File

@@ -141,23 +141,25 @@ public:
// stream of mutation-fragments.
class mutation_reader_merger {
public:
using reader_iterator = std::list<flat_mutation_reader>::iterator;
struct reader_and_fragment {
flat_mutation_reader* reader;
reader_iterator reader{};
mutation_fragment fragment;
reader_and_fragment(flat_mutation_reader* r, mutation_fragment f)
reader_and_fragment(reader_iterator r, mutation_fragment f)
: reader(r)
, fragment(std::move(f)) {
}
};
struct reader_and_last_fragment_kind {
flat_mutation_reader* reader = nullptr;
reader_iterator reader{};
mutation_fragment::kind last_kind = mutation_fragment::kind::partition_end;
reader_and_last_fragment_kind() = default;
reader_and_last_fragment_kind(flat_mutation_reader* r, mutation_fragment::kind k)
reader_and_last_fragment_kind(reader_iterator r, mutation_fragment::kind k)
: reader(r)
, last_kind(k) {
}
@@ -172,6 +174,9 @@ private:
// We need a list because we need stable addresses across additions
// and removals.
std::list<flat_mutation_reader> _all_readers;
// We remove unneeded readers in batches. Until it is their time they
// are kept in _to_remove.
std::list<flat_mutation_reader> _to_remove;
// Readers positioned at a partition, different from the one we are
// reading from now. For these readers the attached fragment is
// always partition_start. Used to pick the next partition.
@@ -268,8 +273,7 @@ void mutation_reader_merger::maybe_add_readers(const std::optional<dht::ring_pos
void mutation_reader_merger::add_readers(std::vector<flat_mutation_reader> new_readers) {
for (auto&& new_reader : new_readers) {
_all_readers.emplace_back(std::move(new_reader));
auto* r = &_all_readers.back();
_next.emplace_back(r, mutation_fragment::kind::partition_end);
_next.emplace_back(std::prev(_all_readers.end()), mutation_fragment::kind::partition_end);
}
}
@@ -320,7 +324,10 @@ future<> mutation_reader_merger::prepare_next(db::timeout_clock::time_point time
// end are out of data for good for the current range.
_halted_readers.push_back(rk);
} else if (_fwd_mr == mutation_reader::forwarding::no) {
_all_readers.remove_if([mr = rk.reader] (auto& r) { return &r == mr; });
_to_remove.splice(_to_remove.end(), _all_readers, rk.reader);
if (_to_remove.size() >= 4) {
_to_remove.clear();
}
}
});
}).then([this] {
@@ -343,7 +350,7 @@ void mutation_reader_merger::prepare_forwardable_readers() {
_next.reserve(_halted_readers.size() + _fragment_heap.size() + _next.size());
std::move(_halted_readers.begin(), _halted_readers.end(), std::back_inserter(_next));
if (_single_reader.reader) {
if (_single_reader.reader != reader_iterator{}) {
_next.emplace_back(std::exchange(_single_reader.reader, {}), _single_reader.last_kind);
}
for (auto& df : _fragment_heap) {
@@ -368,7 +375,7 @@ mutation_reader_merger::mutation_reader_merger(schema_ptr schema,
future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::operator()(db::timeout_clock::time_point timeout) {
// Avoid merging-related logic if we know that only a single reader owns
// current partition.
if (_single_reader.reader) {
if (_single_reader.reader != reader_iterator{}) {
if (_single_reader.reader->is_buffer_empty()) {
if (_single_reader.reader->is_end_of_stream()) {
_current.clear();
@@ -447,8 +454,10 @@ future<> mutation_reader_merger::fast_forward_to(const dht::partition_range& pr,
_fragment_heap.clear();
_reader_heap.clear();
for (auto it = _all_readers.begin(); it != _all_readers.end(); ++it) {
_next.emplace_back(it, mutation_fragment::kind::partition_end);
}
return parallel_for_each(_all_readers, [this, &pr, timeout] (flat_mutation_reader& mr) {
_next.emplace_back(&mr, mutation_fragment::kind::partition_end);
return mr.fast_forward_to(pr, timeout);
}).then([this, &pr, timeout] {
add_readers(_selector->fast_forward_to(pr, timeout));

View File

@@ -805,13 +805,14 @@ static test_result test_slicing_using_restrictions(column_family& cf, int_range
}))
.build();
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
auto rd = cf.make_reader(cf.schema(), pr, slice);
auto rd = cf.make_reader(cf.schema(), pr, slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return test_reading_all(rd);
}
static test_result slice_rows_single_key(column_family& cf, int offset = 0, int n_read = 1) {
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes);
auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
metrics_snapshot before;
assert_partition_start(rd);
@@ -939,7 +940,7 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered
slice,
default_priority_class(),
nullptr,
streamed_mutation::forwarding::yes);
streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
uint64_t fragments = 0;
metrics_snapshot before;