combined: mergers: remove recursion in operator()()

In mutation_reader_merger and clustering_order_reader_merger, the
operator()() is responsible for producing mutation fragments that will
be merged and pushed to the combined reader's buffer. Sometimes, it
might have to advance existing readers, open new and / or close some
existing ones, which requires calling a helper method and then calling
operator()() recursively.

In some unlucky circumstances, a stack overflow can occur:

- Readers have to be opened incrementally,
- Most or all readers must not produce any fragments and need to report
  end of stream without preemption,
- There have to be enough readers opened within the lifetime of the
  combined reader (~500),
- All of the above needs to happen within a single task quota.

In order to prevent such a situation, the code of both reader merger
classes was modified not to perform recursion at all. Most of the code
of operator()() was moved to maybe_produce_batch, which does not
recurse; when it cannot produce a fragment it returns std::nullopt
instead, and operator()() invokes it in a loop via
seastar::repeat_until_value.

A regression test is added.

Fixes: scylladb/scylladb#14415

Closes #14452
This commit is contained in:
Piotr Dulikowski
2023-06-29 18:23:05 +02:00
committed by Botond Dénes
parent 5648bfb9a0
commit ee9bfb583c
2 changed files with 72 additions and 16 deletions

View File

@@ -32,6 +32,7 @@ struct mutation_fragment_and_stream_id {
};
using mutation_fragment_batch = boost::iterator_range<merger_vector<mutation_fragment_and_stream_id>::iterator>;
using mutation_fragment_batch_opt = std::optional<mutation_fragment_batch>;
template<typename Producer>
concept FragmentProducer = requires(Producer p, dht::partition_range part_range, position_range pos_range) {
@@ -226,6 +227,7 @@ private:
streamed_mutation::forwarding _fwd_sm;
mutation_reader::forwarding _fwd_mr;
private:
future<mutation_fragment_batch_opt> maybe_produce_batch();
void maybe_add_readers_at_partition_boundary();
void maybe_add_readers(const std::optional<dht::ring_position_view>& pos);
void add_readers(std::vector<flat_mutation_reader_v2> new_readers);
@@ -469,15 +471,21 @@ mutation_reader_merger::mutation_reader_merger(schema_ptr schema,
}
future<mutation_fragment_batch> mutation_reader_merger::operator()() {
return repeat_until_value([this] { return maybe_produce_batch(); });
}
future<mutation_fragment_batch_opt> mutation_reader_merger::maybe_produce_batch() {
// Avoid merging-related logic if we know that only a single reader owns
// current partition.
if (_single_reader.reader != reader_iterator{}) {
if (_single_reader.reader->is_buffer_empty()) {
if (_single_reader.reader->is_end_of_stream()) {
_current.clear();
return make_ready_future<mutation_fragment_batch>(_current, &_single_reader);
return make_ready_future<mutation_fragment_batch_opt>(mutation_fragment_batch(_current, &_single_reader));
}
return _single_reader.reader->fill_buffer().then([this] { return operator()(); });
return _single_reader.reader->fill_buffer().then([] {
return make_ready_future<mutation_fragment_batch_opt>();
});
}
_current.clear();
_current.emplace_back(_single_reader.reader->pop_mutation_fragment(), &*_single_reader.reader);
@@ -485,22 +493,22 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
if (_current.back().fragment.is_end_of_partition()) {
_next.emplace_back(std::exchange(_single_reader.reader, {}), mutation_fragment_v2::kind::partition_end);
}
return make_ready_future<mutation_fragment_batch>(_current);
return make_ready_future<mutation_fragment_batch_opt>(_current);
}
if (in_gallop_mode()) {
return advance_galloping_reader().then([this] (needs_merge needs_merge) {
if (!needs_merge) {
return make_ready_future<mutation_fragment_batch>(_current);
return make_ready_future<mutation_fragment_batch_opt>(_current);
}
// Galloping reader may have lost to some other reader. In that case, we should proceed
// with standard merging logic.
return (*this)();
return make_ready_future<mutation_fragment_batch_opt>();
});
}
if (!_next.empty()) {
return prepare_next().then([this] { return (*this)(); });
return prepare_next().then([] { return make_ready_future<mutation_fragment_batch_opt>(); });
}
_current.clear();
@@ -509,7 +517,7 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
// readers for the next one.
if (_fragment_heap.empty()) {
if (!_halted_readers.empty() || _reader_heap.empty()) {
return make_ready_future<mutation_fragment_batch>(_current);
return make_ready_future<mutation_fragment_batch_opt>(_current);
}
auto key = [] (const merger_vector<reader_and_fragment>& heap) -> const dht::decorated_key& {
@@ -529,7 +537,7 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
_current.emplace_back(std::move(_fragment_heap.back().fragment), &*_single_reader.reader);
_fragment_heap.clear();
_gallop_mode_hits = 0;
return make_ready_future<mutation_fragment_batch>(_current);
return make_ready_future<mutation_fragment_batch_opt>(_current);
}
}
@@ -555,7 +563,7 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()() {
_gallop_mode_hits = 1;
}
return make_ready_future<mutation_fragment_batch>(_current);
return make_ready_future<mutation_fragment_batch_opt>(_current);
}
future<> mutation_reader_merger::next_partition() {
@@ -918,7 +926,7 @@ class clustering_order_reader_merger {
//
// If the galloping reader wins with other readers again, the fragment is returned as the next batch.
// Otherwise, the reader is pushed onto _peeked_readers and we retry in non-galloping mode.
future<mutation_fragment_batch> peek_galloping_reader() {
future<mutation_fragment_batch_opt> peek_galloping_reader() {
return _galloping_reader->reader.peek().then([this] (mutation_fragment_v2* mf) {
bool erase = false;
if (mf) {
@@ -943,7 +951,7 @@ class clustering_order_reader_merger {
|| _cmp(mf->position(), _peeked_readers.front()->reader.peek_buffer().position()) < 0)) {
_current_batch.emplace_back(_galloping_reader->reader.pop_mutation_fragment(), &_galloping_reader->reader);
return make_ready_future<mutation_fragment_batch>(_current_batch);
return make_ready_future<mutation_fragment_batch_opt>(_current_batch);
}
// One of the existing readers won with the galloping reader,
@@ -969,7 +977,7 @@ class clustering_order_reader_merger {
return maybe_erase.then([this] {
_galloping_reader = {};
_gallop_mode_hits = 0;
return (*this)();
return make_ready_future<mutation_fragment_batch_opt>();
});
});
}
@@ -994,6 +1002,10 @@ public:
// returned by the previous operator() call after calling operator() again
// (the data from the previous batch is destroyed).
future<mutation_fragment_batch> operator()() {
return repeat_until_value([this] { return maybe_produce_batch(); });
}
future<mutation_fragment_batch_opt> maybe_produce_batch() {
_current_batch.clear();
if (in_gallop_mode()) {
@@ -1001,7 +1013,7 @@ public:
}
if (!_unpeeked_readers.empty()) {
return peek_readers().then([this] { return (*this)(); });
return peek_readers().then([] { return make_ready_future<mutation_fragment_batch_opt>(); });
}
// Before we return a batch of fragments using currently opened readers we must check the queue
@@ -1026,7 +1038,7 @@ public:
_all_readers.push_front(std::move(r));
_unpeeked_readers.push_back(_all_readers.begin());
}
return peek_readers().then([this] { return (*this)(); });
return peek_readers().then([] { return make_ready_future<mutation_fragment_batch_opt>(); });
}
if (_peeked_readers.empty()) {
@@ -1040,7 +1052,7 @@ public:
}
_should_emit_partition_end = false;
}
return make_ready_future<mutation_fragment_batch>(_current_batch);
return make_ready_future<mutation_fragment_batch_opt>(_current_batch);
}
// Take all fragments with the next smallest position (there may be multiple such fragments).
@@ -1074,7 +1086,7 @@ public:
_gallop_mode_hits = 1;
}
return make_ready_future<mutation_fragment_batch>(_current_batch);
return make_ready_future<mutation_fragment_batch_opt>(_current_batch);
}
future<> next_partition() {

View File

@@ -669,6 +669,50 @@ SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping
assertions.produces_end_of_stream();
}
class selector_of_empty_readers : public reader_selector {
schema_ptr _schema;
reader_permit _permit;
size_t _remaining;
public:
selector_of_empty_readers(schema_ptr s, reader_permit permit, size_t count)
: reader_selector(s, dht::ring_position_view::min())
, _schema(s)
, _permit(std::move(permit))
, _remaining(count) {
}
virtual std::vector<flat_mutation_reader_v2> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
if (_remaining == 0) {
return {};
}
--_remaining;
std::vector<flat_mutation_reader_v2> ret;
ret.push_back(make_empty_flat_reader_v2(_schema, _permit));
return ret;
}
virtual std::vector<flat_mutation_reader_v2> fast_forward_to(const dht::partition_range& pr) override {
assert(false); // Fast forward not supported by this reader
return {};
}
};
// Reproduces scylladb/scylladb#14415: recursion in the merger's
// operator()() could overflow the stack when many empty readers were
// opened incrementally within a single task quota.
SEASTAR_THREAD_TEST_CASE(test_combined_reader_with_incrementally_opened_empty_readers) {
    static constexpr size_t empty_reader_count = 10 * 1000;

    simple_schema s;
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();

    auto selector = std::make_unique<selector_of_empty_readers>(s.schema(), permit, empty_reader_count);
    auto combined = make_combined_reader(s.schema(), permit, std::move(selector),
            streamed_mutation::forwarding::no,
            mutation_reader::forwarding::no);

    // The reader must reach end-of-stream without a stack overflow.
    assert_that(std::move(combined))
        .produces_end_of_stream();
}
SEASTAR_TEST_CASE(combined_mutation_reader_test) {
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
simple_schema s;