From ee9bfb583c01aefa693eed2e87536cedecd00228 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 29 Jun 2023 18:23:05 +0200 Subject: [PATCH] combined: mergers: remove recursion in operator()() In mutation_reader_merger and clustering_order_reader_merger, the operator()() is responsible for producing mutation fragments that will be merged and pushed to the combined reader's buffer. Sometimes, it might have to advance existing readers, open new ones and/or close some existing ones, which requires calling a helper method and then calling operator()() recursively. In some unlucky circumstances, a stack overflow can occur: - Readers have to be opened incrementally, - Most or all readers must not produce any fragments and need to report end of stream without preemption, - There have to be enough readers opened within the lifetime of the combined reader (~500), - All of the above needs to happen within a single task quota. In order to prevent such a situation, the code of both reader merger classes was modified not to perform recursion at all. Most of the code of the operator()() was moved to maybe_produce_batch, which does not recur if it is not possible for it to produce a fragment; instead, it returns std::nullopt, and operator()() calls this method in a loop via seastar::repeat_until_value. A regression test is added. 
Fixes: scylladb/scylladb#14415 Closes #14452 --- readers/combined.cc | 44 +++++++++++++++++++----------- test/boost/mutation_reader_test.cc | 44 ++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 16 deletions(-) diff --git a/readers/combined.cc b/readers/combined.cc index cf676e9f4c..0cbe31a6d8 100644 --- a/readers/combined.cc +++ b/readers/combined.cc @@ -32,6 +32,7 @@ struct mutation_fragment_and_stream_id { }; using mutation_fragment_batch = boost::iterator_range::iterator>; +using mutation_fragment_batch_opt = std::optional; template concept FragmentProducer = requires(Producer p, dht::partition_range part_range, position_range pos_range) { @@ -226,6 +227,7 @@ private: streamed_mutation::forwarding _fwd_sm; mutation_reader::forwarding _fwd_mr; private: + future maybe_produce_batch(); void maybe_add_readers_at_partition_boundary(); void maybe_add_readers(const std::optional& pos); void add_readers(std::vector new_readers); @@ -469,15 +471,21 @@ mutation_reader_merger::mutation_reader_merger(schema_ptr schema, } future mutation_reader_merger::operator()() { + return repeat_until_value([this] { return maybe_produce_batch(); }); +} + +future mutation_reader_merger::maybe_produce_batch() { // Avoid merging-related logic if we know that only a single reader owns // current partition. 
if (_single_reader.reader != reader_iterator{}) { if (_single_reader.reader->is_buffer_empty()) { if (_single_reader.reader->is_end_of_stream()) { _current.clear(); - return make_ready_future(_current, &_single_reader); + return make_ready_future(mutation_fragment_batch(_current, &_single_reader)); } - return _single_reader.reader->fill_buffer().then([this] { return operator()(); }); + return _single_reader.reader->fill_buffer().then([] { + return make_ready_future(); + }); } _current.clear(); _current.emplace_back(_single_reader.reader->pop_mutation_fragment(), &*_single_reader.reader); @@ -485,22 +493,22 @@ future mutation_reader_merger::operator()() { if (_current.back().fragment.is_end_of_partition()) { _next.emplace_back(std::exchange(_single_reader.reader, {}), mutation_fragment_v2::kind::partition_end); } - return make_ready_future(_current); + return make_ready_future(_current); } if (in_gallop_mode()) { return advance_galloping_reader().then([this] (needs_merge needs_merge) { if (!needs_merge) { - return make_ready_future(_current); + return make_ready_future(_current); } // Galloping reader may have lost to some other reader. In that case, we should proceed // with standard merging logic. - return (*this)(); + return make_ready_future(); }); } if (!_next.empty()) { - return prepare_next().then([this] { return (*this)(); }); + return prepare_next().then([] { return make_ready_future(); }); } _current.clear(); @@ -509,7 +517,7 @@ future mutation_reader_merger::operator()() { // readers for the next one. 
if (_fragment_heap.empty()) { if (!_halted_readers.empty() || _reader_heap.empty()) { - return make_ready_future(_current); + return make_ready_future(_current); } auto key = [] (const merger_vector& heap) -> const dht::decorated_key& { @@ -529,7 +537,7 @@ future mutation_reader_merger::operator()() { _current.emplace_back(std::move(_fragment_heap.back().fragment), &*_single_reader.reader); _fragment_heap.clear(); _gallop_mode_hits = 0; - return make_ready_future(_current); + return make_ready_future(_current); } } @@ -555,7 +563,7 @@ future mutation_reader_merger::operator()() { _gallop_mode_hits = 1; } - return make_ready_future(_current); + return make_ready_future(_current); } future<> mutation_reader_merger::next_partition() { @@ -918,7 +926,7 @@ class clustering_order_reader_merger { // // If the galloping reader wins with other readers again, the fragment is returned as the next batch. // Otherwise, the reader is pushed onto _peeked_readers and we retry in non-galloping mode. - future peek_galloping_reader() { + future peek_galloping_reader() { return _galloping_reader->reader.peek().then([this] (mutation_fragment_v2* mf) { bool erase = false; if (mf) { @@ -943,7 +951,7 @@ class clustering_order_reader_merger { || _cmp(mf->position(), _peeked_readers.front()->reader.peek_buffer().position()) < 0)) { _current_batch.emplace_back(_galloping_reader->reader.pop_mutation_fragment(), &_galloping_reader->reader); - return make_ready_future(_current_batch); + return make_ready_future(_current_batch); } // One of the existing readers won with the galloping reader, @@ -969,7 +977,7 @@ class clustering_order_reader_merger { return maybe_erase.then([this] { _galloping_reader = {}; _gallop_mode_hits = 0; - return (*this)(); + return make_ready_future(); }); }); } @@ -994,6 +1002,10 @@ public: // returned by the previous operator() call after calling operator() again // (the data from the previous batch is destroyed). 
future operator()() { + return repeat_until_value([this] { return maybe_produce_batch(); }); + } + + future maybe_produce_batch() { _current_batch.clear(); if (in_gallop_mode()) { @@ -1001,7 +1013,7 @@ public: } if (!_unpeeked_readers.empty()) { - return peek_readers().then([this] { return (*this)(); }); + return peek_readers().then([] { return make_ready_future(); }); } // Before we return a batch of fragments using currently opened readers we must check the queue @@ -1026,7 +1038,7 @@ public: _all_readers.push_front(std::move(r)); _unpeeked_readers.push_back(_all_readers.begin()); } - return peek_readers().then([this] { return (*this)(); }); + return peek_readers().then([] { return make_ready_future(); }); } if (_peeked_readers.empty()) { @@ -1040,7 +1052,7 @@ public: } _should_emit_partition_end = false; } - return make_ready_future(_current_batch); + return make_ready_future(_current_batch); } // Take all fragments with the next smallest position (there may be multiple such fragments). 
@@ -1074,7 +1086,7 @@ public: _gallop_mode_hits = 1; } - return make_ready_future(_current_batch); + return make_ready_future(_current_batch); } future<> next_partition() { diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index be9b7cb736..60e4742dcb 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -669,6 +669,50 @@ SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping assertions.produces_end_of_stream(); } +class selector_of_empty_readers : public reader_selector { + schema_ptr _schema; + reader_permit _permit; + size_t _remaining; +public: + selector_of_empty_readers(schema_ptr s, reader_permit permit, size_t count) + : reader_selector(s, dht::ring_position_view::min()) + , _schema(s) + , _permit(std::move(permit)) + , _remaining(count) { + } + virtual std::vector create_new_readers(const std::optional& pos) override { + if (_remaining == 0) { + return {}; + } + --_remaining; + std::vector ret; + ret.push_back(make_empty_flat_reader_v2(_schema, _permit)); + return ret; + } + virtual std::vector fast_forward_to(const dht::partition_range& pr) override { + assert(false); // Fast forward not supported by this reader + return {}; + } +}; + +// Reproduces scylladb/scylladb#14415 +SEASTAR_THREAD_TEST_CASE(test_combined_reader_with_incrementally_opened_empty_readers) { + static constexpr size_t empty_reader_count = 10 * 1000; + + simple_schema s; + tests::reader_concurrency_semaphore_wrapper semaphore; + auto permit = semaphore.make_permit(); + + auto reader = make_combined_reader(s.schema(), permit, + std::make_unique(s.schema(), permit, empty_reader_count), + streamed_mutation::forwarding::no, + mutation_reader::forwarding::no); + + // Expect that the reader won't produce a stack overflow + assert_that(std::move(reader)) + .produces_end_of_stream(); +} + SEASTAR_TEST_CASE(combined_mutation_reader_test) { return sstables::test_env::do_with_async([] 
(sstables::test_env& env) { simple_schema s;