From 3efb17a6616acd653dddcb97a7cd1f5940535de7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 09:46:06 +0200 Subject: [PATCH 01/15] range_tombstone_assembler: add get_current_tombstone() --- range_tombstone_assembler.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/range_tombstone_assembler.hh b/range_tombstone_assembler.hh index 83a7728378..d847a0dbd8 100644 --- a/range_tombstone_assembler.hh +++ b/range_tombstone_assembler.hh @@ -53,6 +53,10 @@ private: return _prev_rt && _prev_rt->tombstone(); } public: + tombstone get_current_tombstone() const { + return _prev_rt ? _prev_rt->tombstone() : tombstone(); + } + void reset() { _prev_rt = std::nullopt; } From 172c09438829995d660fba717d4a0391c5afef96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 15:25:02 +0200 Subject: [PATCH 02/15] range_tombstone_assembler: add get_range_tombstone_change() --- range_tombstone_assembler.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/range_tombstone_assembler.hh b/range_tombstone_assembler.hh index d847a0dbd8..e6720b189b 100644 --- a/range_tombstone_assembler.hh +++ b/range_tombstone_assembler.hh @@ -57,6 +57,10 @@ public: return _prev_rt ? _prev_rt->tombstone() : tombstone(); } + std::optional get_range_tombstone_change() && { + return std::move(_prev_rt); + } + void reset() { _prev_rt = std::nullopt; } From 1d842e980a04afc7b563562d5e2341561b4f065f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 09:47:39 +0200 Subject: [PATCH 03/15] compact_mutation: extract range tombstone consumption into own method Next patch wants to reuse the same code. 
--- mutation_compactor.hh | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 9b6ff5c3a8..89c6abcc3c 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -177,6 +177,20 @@ class compact_mutation_state { compaction_stats _stats; private: + template + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + stop_iteration do_consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) { + if (rt.tomb <= _range_tombstones.get_partition_tombstone()) { + return stop_iteration::no; + } + if (can_purge_tombstone(rt.tomb)) { + partition_is_not_empty_for_gc_consumer(gc_consumer); + return gc_consumer.consume(std::move(rt)); + } else { + partition_is_not_empty(consumer); + return consumer.consume(std::move(rt)); + } + } static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; } @@ -409,16 +423,7 @@ public: ++_stats.range_tombstones; _range_tombstones.apply(rt); // FIXME: drop tombstone if it is fully covered by other range tombstones - if (rt.tomb > _range_tombstones.get_partition_tombstone()) { - if (can_purge_tombstone(rt.tomb)) { - partition_is_not_empty_for_gc_consumer(gc_consumer); - return gc_consumer.consume(std::move(rt)); - } else { - partition_is_not_empty(consumer); - return consumer.consume(std::move(rt)); - } - } - return stop_iteration::no; + return do_consume(std::move(rt), consumer, gc_consumer); } template From 790e73141f2cdec75ea84db6a51831bc46f42728 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 09:50:27 +0200 Subject: [PATCH 04/15] compact_mutation: add support for consuming a v2 stream Consuming either a v1 or v2 stream is supported now, but compacted fragments are still emitted in the v1 format, thus the compactor acts as an online downgrader when consuming a v2 stream. This allows pushing out downgrade to v1 on the input side all the way into the compactor. 
This means that reads for example can now use an all v2 reader pipeline, the still mandatory downgrade to v1 happening at the last possible place: just before creating the result-set. Mandatory because our intra-node ABI is still v1. There are consumers who are ready for v2 in principle (e.g. compaction), they have to wait a little bit more. --- multishard_mutation_query.cc | 6 +++-- mutation_compactor.hh | 46 +++++++++++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 89e47f57e8..d2b7481399 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -451,11 +451,13 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de auto& sharder = _schema->get_sharder(); const auto shard = sharder.shard_of(compaction_state.partition_start.key().token()); + auto& range_tombstones = std::get>(compaction_state.range_tombstones); + // It is possible that the reader this partition originates from does not // exist anymore. Either because we failed stopping it or because it was // evicted. 
if (_readers[shard].state != reader_state::saving) { - for (auto& rt : compaction_state.range_tombstones) { + for (auto& rt : range_tombstones) { stats.add_discarded(*_schema, rt); } if (compaction_state.static_row) { @@ -467,7 +469,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit); - for (auto& rt : compaction_state.range_tombstones | boost::adaptors::reversed) { + for (auto& rt : range_tombstones | boost::adaptors::reversed) { stats.add(*_schema, rt); shard_buffer.emplace_front(*_schema, _permit, std::move(rt)); } diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 89c6abcc3c..83f8b18769 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -23,6 +23,7 @@ #include "compaction/compaction_garbage_collector.hh" #include "mutation_fragment.hh" +#include "range_tombstone_assembler.hh" #include "tombstone_gc.hh" static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) { @@ -57,7 +58,7 @@ concept CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::dec struct detached_compaction_state { ::partition_start partition_start; std::optional<::static_row> static_row; - std::deque range_tombstones; + std::variant, std::optional> range_tombstones; }; class noop_compacted_fragments_consumer { @@ -160,7 +161,8 @@ class compact_mutation_state { uint32_t _partition_limit{}; uint64_t _partition_row_limit{}; - range_tombstone_accumulator _range_tombstones; + range_tombstone_accumulator _range_tombstones; // used when consuming v1 stream and for storing the partition tombstone + std::optional _rt_assembler; // used when consuming a v2 stream bool _static_row_live{}; uint64_t _rows_in_current_partition; @@ -191,6 +193,18 @@ private: return consumer.consume(std::move(rt)); } } + template + tombstone tombstone_for_row(const clustering_key& ckey, Consumer& consumer, GCConsumer& gc_consumer) { + if (!_rt_assembler) { // we are 
either consuming v1 or consumed no range tombstone [change] at all + return _range_tombstones.tombstone_for_row(ckey); + } + if (_rt_assembler->needs_flush()) { + if (auto rt_opt = _rt_assembler->flush(_schema, position_in_partition::after_key(ckey))) { + do_consume(std::move(*rt_opt), consumer, gc_consumer); + } + } + return std::max(_range_tombstones.get_partition_tombstone(), _rt_assembler->get_current_tombstone()); + } static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; } @@ -306,6 +320,9 @@ public: _rows_in_current_partition = 0; _static_row_live = false; _range_tombstones.clear(); + if (_rt_assembler) { + _rt_assembler->reset(); + } _current_partition_limit = std::min(_row_limit, _partition_row_limit); _max_purgeable = api::missing_timestamp; _gc_before = std::nullopt; @@ -359,7 +376,7 @@ public: template requires CompactedFragmentsConsumer && CompactedFragmentsConsumer stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) { - auto current_tombstone = _range_tombstones.tombstone_for_row(cr.key()); + auto current_tombstone = tombstone_for_row(cr.key(), consumer, gc_consumer); auto t = cr.tomb(); t.apply(current_tombstone); @@ -426,9 +443,25 @@ public: return do_consume(std::move(rt), consumer, gc_consumer); } + template + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + stop_iteration consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { + ++_stats.range_tombstones; + if (!_rt_assembler) { + _rt_assembler.emplace(); + } + if (auto rt_opt = _rt_assembler->consume(_schema, std::move(rtc))) { + do_consume(std::move(*rt_opt), consumer, gc_consumer); + } + return stop_iteration::no; + } + template requires CompactedFragmentsConsumer && CompactedFragmentsConsumer stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { + if (_rt_assembler) { + _rt_assembler->on_end_of_stream(); + } if (!_empty_partition_in_gc_consumer) { 
gc_consumer.consume_end_of_partition(); } @@ -512,6 +545,9 @@ public: /// allows the compaction state to be stored in the compacted reader. detached_compaction_state detach_state() && { partition_start ps(std::move(_last_dk), _range_tombstones.get_partition_tombstone()); + if (_rt_assembler) { + return {std::move(ps), std::move(_last_static_row), std::move(*_rt_assembler).get_range_tombstone_change()}; + } return {std::move(ps), std::move(_last_static_row), std::move(_range_tombstones).range_tombstones()}; } @@ -569,6 +605,10 @@ public: return _state->consume(std::move(rt), _consumer, _gc_consumer); } + stop_iteration consume(range_tombstone_change&& rtc) { + return _state->consume(std::move(rtc), _consumer, _gc_consumer); + } + stop_iteration consume_end_of_partition() { return _state->consume_end_of_partition(_consumer, _gc_consumer); } From e8a918b25cb42e960ddebfba369f1595c6b17c3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 15:59:36 +0200 Subject: [PATCH 05/15] compact_mutation: make start_new_page() independent of mutation_fragment version By using partition_region instead of mutation_fragment::kind. This will make incremental migration of users to v2 easier. 
--- mutation_compactor.hh | 5 ++--- querier.hh | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 83f8b18769..1bcd6965bb 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -513,7 +513,7 @@ public: void start_new_page(uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, - mutation_fragment::kind next_fragment_kind, + partition_region next_fragment_region, Consumer& consumer) { _empty_partition = true; _static_row_live = false; @@ -524,8 +524,7 @@ public: _query_time = query_time; _stats = {}; - if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone) - && _last_static_row) { + if (next_fragment_region == partition_region::clustered && _last_static_row) { // Stopping here would cause an infinite loop so ignore return value. noop_compacted_fragments_consumer nc; consume(*std::exchange(_last_static_row, {}), consumer, nc); diff --git a/querier.hh b/querier.hh index 5f6cc04fde..e77a0c8b13 100644 --- a/querier.hh +++ b/querier.hh @@ -86,8 +86,8 @@ auto consume_page(flat_mutation_reader& reader, gc_clock::time_point query_time) { return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( mutation_fragment* next_fragment) mutable { - const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end; - compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, consumer); + const auto next_fragment_region = next_fragment ? 
next_fragment->position().region() : partition_region::partition_end; + compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); auto last_ckey = make_lw_shared>(); auto reader_consumer = make_stable_flattened_mutations_consumer>>( From 8556cb78cccf7f6ad9feb0f121e6625fca09c9c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 10:43:50 +0200 Subject: [PATCH 06/15] mutation_reader: add v2 clone of queue_reader As this reader is used in a wide variety of places, it would be a nightmare to upgrade all such sites in one go. So create a v2 clone and migrate users incrementally. --- mutation_reader.cc | 174 +++++++++++++++++++++++++++++++++++++++++++++ mutation_reader.hh | 45 ++++++++++++ 2 files changed, 219 insertions(+) diff --git a/mutation_reader.cc b/mutation_reader.cc index f5b0f59093..f8f10ef5db 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -2201,6 +2201,180 @@ std::pair make_queue_reader(schema_pt return {flat_mutation_reader(std::move(impl)), std::move(handle)}; } +class queue_reader_v2 final : public flat_mutation_reader_v2::impl { + friend class queue_reader_handle_v2; + +private: + queue_reader_handle_v2* _handle = nullptr; + std::optional> _not_full; + std::optional> _full; + std::exception_ptr _ex; + +private: + void push_and_maybe_notify(mutation_fragment_v2&& mf) { + push_mutation_fragment(std::move(mf)); + if (_full && is_buffer_full()) { + _full->set_value(); + _full.reset(); + } + } + +public: + explicit queue_reader_v2(schema_ptr s, reader_permit permit) + : impl(std::move(s), std::move(permit)) { + } + virtual future<> fill_buffer() override { + if (_ex) { + return make_exception_future<>(_ex); + } + if (_end_of_stream || !is_buffer_empty()) { + return make_ready_future<>(); + } + if (_not_full) { + _not_full->set_value(); + _not_full.reset(); + } + _full.emplace(); + return _full->get_future(); + } + virtual future<> next_partition() override { + 
clear_buffer_to_next_partition(); + if (is_buffer_empty() && !is_end_of_stream()) { + return fill_buffer().then([this] { + return next_partition(); + }); + } + return make_ready_future<>(); + } + virtual future<> fast_forward_to(const dht::partition_range&) override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + virtual future<> fast_forward_to(position_range) override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + virtual future<> close() noexcept override { + // wake up any waiters to prevent broken_promise errors + if (_full) { + _full->set_value(); + _full.reset(); + } else if (_not_full) { + _not_full->set_value(); + _not_full.reset(); + } + // detach from the queue_reader_handle + // since it should never access the reader after close. + if (_handle) { + _handle->_reader = nullptr; + _handle = nullptr; + } + return make_ready_future<>(); + } + future<> push(mutation_fragment_v2&& mf) { + push_and_maybe_notify(std::move(mf)); + if (!is_buffer_full()) { + return make_ready_future<>(); + } + _not_full.emplace(); + return _not_full->get_future(); + } + void push_end_of_stream() { + _end_of_stream = true; + if (_full) { + _full->set_value(); + _full.reset(); + } + } + void abort(std::exception_ptr ep) noexcept { + _ex = std::move(ep); + if (_full) { + _full->set_exception(_ex); + _full.reset(); + } else if (_not_full) { + _not_full->set_exception(_ex); + _not_full.reset(); + } + } +}; + +void queue_reader_handle_v2::abandon() noexcept { + std::exception_ptr ex; + try { + ex = std::make_exception_ptr(std::runtime_error("Abandoned queue_reader_handle_v2")); + } catch (...) 
{ + ex = std::current_exception(); + } + abort(std::move(ex)); +} + +queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_v2& reader) noexcept : _reader(&reader) { + _reader->_handle = this; +} + +queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_handle_v2&& o) noexcept + : _reader(std::exchange(o._reader, nullptr)) + , _ex(std::exchange(o._ex, nullptr)) +{ + if (_reader) { + _reader->_handle = this; + } +} + +queue_reader_handle_v2::~queue_reader_handle_v2() { + abandon(); +} + +queue_reader_handle_v2& queue_reader_handle_v2::operator=(queue_reader_handle_v2&& o) { + abandon(); + _reader = std::exchange(o._reader, nullptr); + _ex = std::exchange(o._ex, {}); + if (_reader) { + _reader->_handle = this; + } + return *this; +} + +future<> queue_reader_handle_v2::push(mutation_fragment_v2 mf) { + if (!_reader) { + if (_ex) { + return make_exception_future<>(_ex); + } + return make_exception_future<>(std::runtime_error("Dangling queue_reader_handle_v2")); + } + return _reader->push(std::move(mf)); +} + +void queue_reader_handle_v2::push_end_of_stream() { + if (!_reader) { + throw std::runtime_error("Dangling queue_reader_handle_v2"); + } + _reader->push_end_of_stream(); + _reader->_handle = nullptr; + _reader = nullptr; +} + +bool queue_reader_handle_v2::is_terminated() const { + return _reader == nullptr; +} + +void queue_reader_handle_v2::abort(std::exception_ptr ep) { + _ex = std::move(ep); + if (_reader) { + _reader->abort(_ex); + _reader->_handle = nullptr; + _reader = nullptr; + } +} + +std::exception_ptr queue_reader_handle_v2::get_exception() const noexcept { + return _ex; +} + +std::pair make_queue_reader_v2(schema_ptr s, reader_permit permit) { + auto impl = std::make_unique(std::move(s), std::move(permit)); + auto handle = queue_reader_handle_v2(*impl); + return {flat_mutation_reader_v2(std::move(impl)), std::move(handle)}; +} + namespace { class compacting_reader : public flat_mutation_reader::impl { diff --git a/mutation_reader.hh 
b/mutation_reader.hh index 9d14b4ce98..ec9850912f 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -621,6 +621,51 @@ public: std::pair make_queue_reader(schema_ptr s, reader_permit permit); +class queue_reader_v2; + +/// Calls to different methods cannot overlap! +/// The handle can be used only while the reader is still alive. Once +/// `push_end_of_stream()` is called, the reader and the handle can be destroyed +/// in any order. The reader can be destroyed at any time. +class queue_reader_handle_v2 { + friend std::pair make_queue_reader_v2(schema_ptr, reader_permit); + friend class queue_reader_v2; + +private: + queue_reader_v2* _reader = nullptr; + std::exception_ptr _ex; + +private: + explicit queue_reader_handle_v2(queue_reader_v2& reader) noexcept; + + void abandon() noexcept; + +public: + queue_reader_handle_v2(queue_reader_handle_v2&& o) noexcept; + ~queue_reader_handle_v2(); + queue_reader_handle_v2& operator=(queue_reader_handle_v2&& o); + + future<> push(mutation_fragment_v2 mf); + + /// Terminate the queue. + /// + /// The reader will be set to EOS. The handle cannot be used anymore. + void push_end_of_stream(); + + /// Aborts the queue. + /// + /// All future operations on the handle or the reader will raise `ep`. + void abort(std::exception_ptr ep); + + /// Checks if the queue is already terminated with either a success or failure (abort) + bool is_terminated() const; + + /// Get the stored exception, if any + std::exception_ptr get_exception() const noexcept; +}; + +std::pair make_queue_reader_v2(schema_ptr s, reader_permit permit); + /// Creates a compacting reader. /// /// The compaction is done with a \ref mutation_compactor, using compaction-type From 2d7625f4b3418c4347e3ce759b35df4e4a93e126 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 11:01:17 +0200 Subject: [PATCH 07/15] flat_mutation_reader_v2: add reader_consumer_v2 typedef v2 version of the reader_consumer typedef. 
--- flat_mutation_reader_v2.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flat_mutation_reader_v2.hh b/flat_mutation_reader_v2.hh index ba751d099b..efed254b5a 100644 --- a/flat_mutation_reader_v2.hh +++ b/flat_mutation_reader_v2.hh @@ -827,3 +827,8 @@ make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque, const dht::partition_range& pr, const query::partition_slice& slice); + +/// A consumer function that is passed a flat_mutation_reader to be consumed from +/// and returns a future<> resolved when the reader is fully consumed, and closed. +/// Note: the function assumes ownership of the reader and must close it in all cases. +using reader_consumer_v2 = noncopyable_function (flat_mutation_reader_v2)>; From 92244ae8ecc5e2fa37a88f173775e5debd6bd57f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 10:59:59 +0200 Subject: [PATCH 08/15] mutation_writer: add v2 clone of feed_writer and bucket_writer Since we have multiple writers using this that we don't want to migrate all at once, we create a v2 version of said classes so we can migrate them incrementally. 
--- mutation_writer/feed_writers.hh | 63 +++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/mutation_writer/feed_writers.hh b/mutation_writer/feed_writers.hh index 3e6bd8fc3c..c2f1f83e51 100644 --- a/mutation_writer/feed_writers.hh +++ b/mutation_writer/feed_writers.hh @@ -90,4 +90,67 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) { } } +class bucket_writer_v2 { + schema_ptr _schema; + queue_reader_handle_v2 _handle; + future<> _consume_fut; + +private: + bucket_writer_v2(schema_ptr schema, std::pair queue_reader_v2, reader_consumer_v2& consumer); + +public: + bucket_writer_v2(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer); + + future<> consume(mutation_fragment_v2 mf); + + void consume_end_of_stream(); + + void abort(std::exception_ptr ep) noexcept; + + future<> close() noexcept; +}; + +template +requires MutationFragmentConsumerV2> +future<> feed_writer(flat_mutation_reader_v2&& rd_ref, Writer wr) { + // Only move in reader if stack was successfully allocated, so caller can close reader otherwise. + auto rd = std::move(rd_ref); + std::exception_ptr ex; + try { + while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) { + co_await rd.fill_buffer(); + while (!rd.is_buffer_empty()) { + co_await rd.pop_mutation_fragment().consume(wr); + } + } + } catch (...) { + ex = std::current_exception(); + } + + co_await rd.close(); + + try { + if (ex) { + wr.abort(ex); + } else { + wr.consume_end_of_stream(); + } + } catch (...) { + if (!ex) { + ex = std::current_exception(); + } + } + + try { + co_await wr.close(); + } catch (...) 
{ + if (!ex) { + ex = std::current_exception(); + } + } + if (ex) { + std::rethrow_exception(ex); + } +} + } From 0601a465a2963731a15567280e1ecefc4f31c8b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 11:01:39 +0200 Subject: [PATCH 09/15] mutation_writer: migrate shard_based_splitting_writer to v2 --- compaction/compaction.cc | 4 +- mutation_writer/feed_writers.cc | 40 +++++++++++++++++++ .../shard_based_splitting_writer.cc | 22 +++++----- .../shard_based_splitting_writer.hh | 2 +- 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 04e9032bbc..cfe44eeac4 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -1551,7 +1551,9 @@ public: reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> { - return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer)); + return mutation_writer::segregate_by_shard(upgrade_to_v2(std::move(reader)), [end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 rd) { + return end_consumer(downgrade_to_v1(std::move(rd))); + }); }; } diff --git a/mutation_writer/feed_writers.cc b/mutation_writer/feed_writers.cc index 8427e8a098..3f15e11269 100644 --- a/mutation_writer/feed_writers.cc +++ b/mutation_writer/feed_writers.cc @@ -63,4 +63,44 @@ future<> bucket_writer::close() noexcept { return std::move(_consume_fut); } +bucket_writer_v2::bucket_writer_v2(schema_ptr schema, std::pair queue_reader, reader_consumer_v2& consumer) + : _schema(schema) + , _handle(std::move(queue_reader.second)) + , _consume_fut(consumer(std::move(queue_reader.first))) +{ } + +bucket_writer_v2::bucket_writer_v2(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer) + : bucket_writer_v2(schema, make_queue_reader_v2(schema, std::move(permit)), consumer) +{ } + 
+future<> bucket_writer_v2::consume(mutation_fragment_v2 mf) { + if (_handle.is_terminated()) { + // When the handle is terminated, it was aborted + // or associated reader was closed prematurely. + // In this case return _consume_fut that will propagate + // the root-cause error. + auto ex = _handle.get_exception(); + if (!ex) { + // shouldn't really happen + ex = make_exception_ptr(std::runtime_error("queue_reader_handle_v2 is terminated")); + } + return std::exchange(_consume_fut, make_exception_future<>(ex)).then([ex = std::move(ex)] () mutable { + return make_exception_future<>(std::move(ex)); + }); + } + return _handle.push(std::move(mf)); +} + +void bucket_writer_v2::consume_end_of_stream() { + _handle.push_end_of_stream(); +} + +void bucket_writer_v2::abort(std::exception_ptr ep) noexcept { + _handle.abort(std::move(ep)); +} + +future<> bucket_writer_v2::close() noexcept { + return std::move(_consume_fut); +} + } // mutation_writer diff --git a/mutation_writer/shard_based_splitting_writer.cc b/mutation_writer/shard_based_splitting_writer.cc index 2129dc0c44..e32917c1ec 100644 --- a/mutation_writer/shard_based_splitting_writer.cc +++ b/mutation_writer/shard_based_splitting_writer.cc @@ -31,21 +31,21 @@ namespace mutation_writer { class shard_based_splitting_mutation_writer { - using shard_writer = bucket_writer; + using shard_writer = bucket_writer_v2; private: schema_ptr _schema; reader_permit _permit; - reader_consumer _consumer; + reader_consumer_v2 _consumer; unsigned _current_shard; std::vector> _shards; - future<> write_to_shard(mutation_fragment&& mf) { + future<> write_to_shard(mutation_fragment_v2&& mf) { auto& writer = *_shards[_current_shard]; return writer.consume(std::move(mf)); } public: - shard_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer consumer) + shard_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer_v2 consumer) : _schema(std::move(schema)) , 
_permit(std::move(permit)) , _consumer(std::move(consumer)) @@ -57,23 +57,23 @@ public: if (!_shards[_current_shard]) { _shards[_current_shard] = shard_writer(_schema, _permit, _consumer); } - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(ps))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(ps))); } future<> consume(static_row&& sr) { - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(sr))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(sr))); } future<> consume(clustering_row&& cr) { - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(cr))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(cr))); } - future<> consume(range_tombstone&& rt) { - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(rt))); + future<> consume(range_tombstone_change&& rt) { + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(rt))); } future<> consume(partition_end&& pe) { - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(pe))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(pe))); } void consume_end_of_stream() { @@ -97,7 +97,7 @@ public: } }; -future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer) { +future<> segregate_by_shard(flat_mutation_reader_v2 producer, reader_consumer_v2 consumer) { auto schema = producer.schema(); auto permit = producer.permit(); return feed_writer( diff --git a/mutation_writer/shard_based_splitting_writer.hh b/mutation_writer/shard_based_splitting_writer.hh index 187025a7cb..ac8b291eae 100644 --- a/mutation_writer/shard_based_splitting_writer.hh +++ b/mutation_writer/shard_based_splitting_writer.hh @@ -31,6 +31,6 @@ namespace mutation_writer { // manner. 
This is useful, for instance, in the resharding process where a user changes // the amount of CPU assigned to Scylla and we have to rewrite the SSTables to their new // owners. -future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer); +future<> segregate_by_shard(flat_mutation_reader_v2 producer, reader_consumer_v2 consumer); } // namespace mutation_writer From 9826b5d732a17ccc74ae80b23fd93fa7035b8968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 11:50:29 +0200 Subject: [PATCH 10/15] mutation_writer: migrate timestamp_based_splitting_writer to v2 --- compaction/time_window_compaction_strategy.cc | 4 +- .../timestamp_based_splitting_writer.cc | 50 +++++++++++-------- .../timestamp_based_splitting_writer.hh | 2 +- test/boost/mutation_writer_test.cc | 32 ++++++++---- 4 files changed, 52 insertions(+), 36 deletions(-) diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index c8cceff6b9..e165958f32 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -130,9 +130,9 @@ reader_consumer time_window_compaction_strategy::make_interposer_consumer(const } return [options = _options, end_consumer = std::move(end_consumer)] (flat_mutation_reader rd) mutable -> future<> { return mutation_writer::segregate_by_timestamp( - std::move(rd), + upgrade_to_v2(std::move(rd)), classify_by_timestamp(std::move(options)), - std::move(end_consumer)); + [end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 rd) { return end_consumer(downgrade_to_v1(std::move(rd))); }); }; } diff --git a/mutation_writer/timestamp_based_splitting_writer.cc b/mutation_writer/timestamp_based_splitting_writer.cc index 5f05c7f4b3..6989752010 100644 --- a/mutation_writer/timestamp_based_splitting_writer.cc +++ b/mutation_writer/timestamp_based_splitting_writer.cc @@ -109,12 +109,12 @@ small_flat_map::find(const key_type& k) { 
class timestamp_based_splitting_mutation_writer { using bucket_id = int64_t; - class timestamp_bucket_writer : public bucket_writer { + class timestamp_bucket_writer : public bucket_writer_v2 { bool _has_current_partition = false; public: - timestamp_bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer& consumer) - : bucket_writer(schema, std::move(permit), consumer) { + timestamp_bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer) + : bucket_writer_v2(schema, std::move(permit), consumer) { } void set_has_current_partition() { _has_current_partition = true; @@ -131,13 +131,14 @@ private: schema_ptr _schema; reader_permit _permit; classify_by_timestamp _classifier; - reader_consumer _consumer; + reader_consumer_v2 _consumer; partition_start _current_partition_start; std::unordered_map _buckets; std::vector _buckets_used_for_current_partition; + std::optional _last_rtc_bucket; private: - future<> write_to_bucket(bucket_id bucket, mutation_fragment&& mf); + future<> write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf); std::optional examine_column(const atomic_cell_or_collection& c, const column_definition& cdef); std::optional examine_row(const row& r, column_kind kind); @@ -150,7 +151,7 @@ private: future<> write_marker_and_tombstone(const clustering_row& cr); public: - timestamp_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_timestamp classifier, reader_consumer consumer) + timestamp_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_timestamp classifier, reader_consumer_v2 consumer) : _schema(std::move(schema)) , _permit(std::move(permit)) , _classifier(std::move(classifier)) @@ -161,7 +162,7 @@ public: future<> consume(partition_start&& ps); future<> consume(static_row&& sr); future<> consume(clustering_row&& cr); - future<> consume(range_tombstone&& rt); + future<> consume(range_tombstone_change&& rt); future<> 
consume(partition_end&& pe); void consume_end_of_stream() { @@ -181,7 +182,7 @@ public: } }; -future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment&& mf) { +future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf) { auto it = _buckets.try_emplace(bucket, _schema, _permit, _consumer).first; auto& writer = it->second; @@ -199,7 +200,7 @@ future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bu }); } - return writer.consume(mutation_fragment(*_schema, _permit, partition_start(_current_partition_start))).then([this, bucket = it->first, &writer, mf = std::move(mf)] () mutable { + return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_start(_current_partition_start))).then([this, bucket = it->first, &writer, mf = std::move(mf)] () mutable { writer.set_has_current_partition(); _buckets_used_for_current_partition.push_back(bucket); return writer.consume(std::move(mf)); @@ -384,28 +385,29 @@ future<> timestamp_based_splitting_mutation_writer::write_marker_and_tombstone(c } if (marker_bucket_id == tomb_bucket_id) { - return write_to_bucket(*marker_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), cr.marker(), {}))); + return write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), cr.marker(), {}))); } auto write_marker_fut = make_ready_future<>(); if (marker_bucket_id) { - write_marker_fut = write_to_bucket(*marker_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), {}, cr.marker(), {}))); + write_marker_fut = write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), {}, cr.marker(), {}))); } auto write_tomb_fut = make_ready_future<>(); if (tomb_bucket_id) { - write_tomb_fut = write_to_bucket(*tomb_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), 
{}, {}))); + write_tomb_fut = write_to_bucket(*tomb_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), {}, {}))); } return when_all_succeed(std::move(write_marker_fut), std::move(write_tomb_fut)).discard_result(); } future<> timestamp_based_splitting_mutation_writer::consume(partition_start&& ps) { _current_partition_start = std::move(ps); + _last_rtc_bucket.reset(); if (auto& tomb = _current_partition_start.partition_tombstone()) { auto bucket = _classifier(tomb.timestamp); auto ps = partition_start(_current_partition_start); tomb = {}; - return write_to_bucket(bucket, mutation_fragment(mutation_fragment(*_schema, _permit, std::move(ps)))); + return write_to_bucket(bucket, mutation_fragment_v2(mutation_fragment_v2(*_schema, _permit, std::move(ps)))); } return make_ready_future<>(); } @@ -416,11 +418,11 @@ future<> timestamp_based_splitting_mutation_writer::consume(static_row&& sr) { } if (const auto bucket = examine_static_row(sr)) { - return write_to_bucket(*bucket, mutation_fragment(*_schema, _permit, std::move(sr))); + return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(sr))); } return parallel_for_each(split_static_row(std::move(sr)), [this] (std::pair& sr_piece) { - return write_to_bucket(sr_piece.first, mutation_fragment(*_schema, _permit, static_row(std::move(sr_piece.second)))); + return write_to_bucket(sr_piece.first, mutation_fragment_v2(*_schema, _permit, static_row(std::move(sr_piece.second)))); }); } @@ -430,23 +432,27 @@ future<> timestamp_based_splitting_mutation_writer::consume(clustering_row&& cr) } if (const auto bucket = examine_clustering_row(cr)) { - return write_to_bucket(*bucket, mutation_fragment(*_schema, _permit, std::move(cr))); + return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(cr))); } return parallel_for_each(split_clustering_row(std::move(cr)), [this] (std::pair& cr_piece) { - return write_to_bucket(cr_piece.first, 
mutation_fragment(*_schema, _permit, std::move(cr_piece.second))); + return write_to_bucket(cr_piece.first, mutation_fragment_v2(*_schema, _permit, std::move(cr_piece.second))); }); } -future<> timestamp_based_splitting_mutation_writer::consume(range_tombstone&& rt) { - auto timestamp = _classifier(rt.tomb.timestamp); - return write_to_bucket(timestamp, mutation_fragment(*_schema, _permit, std::move(rt))); +future<> timestamp_based_splitting_mutation_writer::consume(range_tombstone_change&& rtc) { + auto id = _classifier(rtc.tombstone().timestamp); + if (_last_rtc_bucket && *_last_rtc_bucket != id) { + co_await write_to_bucket(*_last_rtc_bucket, mutation_fragment_v2(*_schema, _permit, range_tombstone_change(rtc.position(), {}))); + } + _last_rtc_bucket = id; + co_await write_to_bucket(id, mutation_fragment_v2(*_schema, _permit, std::move(rtc))); } future<> timestamp_based_splitting_mutation_writer::consume(partition_end&& pe) { return parallel_for_each(_buckets_used_for_current_partition, [this, pe = std::move(pe)] (bucket_id bucket) { auto& writer = _buckets.at(bucket); - return writer.consume(mutation_fragment(*_schema, _permit, partition_end(pe))).then([&writer] { + return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_end(pe))).then([&writer] { writer.clear_has_current_partition(); }); }).then([this] { @@ -454,7 +460,7 @@ future<> timestamp_based_splitting_mutation_writer::consume(partition_end&& pe) }); } -future<> segregate_by_timestamp(flat_mutation_reader producer, classify_by_timestamp classifier, reader_consumer consumer) { +future<> segregate_by_timestamp(flat_mutation_reader_v2 producer, classify_by_timestamp classifier, reader_consumer_v2 consumer) { //FIXME: make this into a consume() variant? 
auto schema = producer.schema(); auto permit = producer.permit(); diff --git a/mutation_writer/timestamp_based_splitting_writer.hh b/mutation_writer/timestamp_based_splitting_writer.hh index b395bcf26d..5c62eb1c99 100644 --- a/mutation_writer/timestamp_based_splitting_writer.hh +++ b/mutation_writer/timestamp_based_splitting_writer.hh @@ -28,6 +28,6 @@ namespace mutation_writer { using classify_by_timestamp = noncopyable_function; -future<> segregate_by_timestamp(flat_mutation_reader producer, classify_by_timestamp classifier, reader_consumer consumer); +future<> segregate_by_timestamp(flat_mutation_reader_v2 producer, classify_by_timestamp classifier, reader_consumer_v2 consumer); } // namespace mutation_writer diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc index 30d26a589e..da13034d86 100644 --- a/test/boost/mutation_writer_test.cc +++ b/test/boost/mutation_writer_test.cc @@ -191,6 +191,8 @@ class test_bucket_writer { mutation_opt _current_mutation; bool _is_first_mutation = true; + range_tombstone_change _current_rtc; + size_t _throw_after; size_t _mutation_consumed = 0; @@ -246,8 +248,10 @@ private: } verify_row_bucket_id(cr.cells(), column_kind::regular_column); } - void verify_range_tombstone(const range_tombstone& rt) { - check_timestamp(rt.tomb.timestamp); + void verify_range_tombstone_change(const range_tombstone_change& rtc) { + if (rtc.tombstone()) { + check_timestamp(rtc.tombstone().timestamp); + } } void maybe_throw() { @@ -263,6 +267,7 @@ public: , _permit(std::move(permit)) , _classify(std::move(classify)) , _buckets(buckets) + , _current_rtc(position_in_partition::before_all_clustered_rows(), tombstone()) , _throw_after(throw_after) { } void consume_new_partition(const dht::decorated_key& dk) { @@ -290,17 +295,22 @@ public: _current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(cr))); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration 
consume(range_tombstone_change&& rtc) { maybe_throw(); BOOST_REQUIRE(_current_mutation); - verify_range_tombstone(rt); - _current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(rt))); + verify_range_tombstone_change(rtc); + if (_current_rtc.tombstone()) { + auto rt = range_tombstone(_current_rtc.position(), position_in_partition_view::before_key(rtc.position()), _current_rtc.tombstone()); + _current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(rt))); + } + _current_rtc = std::move(rtc); return stop_iteration::no; } stop_iteration consume_end_of_partition() { maybe_throw(); BOOST_REQUIRE(_current_mutation); BOOST_REQUIRE(_bucket_id); + BOOST_REQUIRE(!_current_rtc.tombstone()); auto& bucket = _buckets[*_bucket_id]; if (_is_first_mutation) { @@ -352,13 +362,13 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) { std::unordered_map> buckets; - auto consumer = [&] (flat_mutation_reader bucket_reader) { - return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader& rd) { + auto consumer = [&] (flat_mutation_reader_v2 bucket_reader) { + return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader_v2& rd) { return rd.consume(test_bucket_writer(random_schema.schema(), rd.permit(), classify_fn, buckets)); }); }; - segregate_by_timestamp(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); + segregate_by_timestamp(make_flat_mutation_reader_from_mutations_v2(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); testlog.debug("Data split into {} buckets: {}", buckets.size(), boost::copy_range>(buckets | boost::adaptors::map_keys)); @@ -422,14 +432,14 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer_abort) { int throw_after = tests::random::get_int(muts.size() - 1); testlog.info("Will raise exception after {}/{} mutations", throw_after, 
muts.size()); - auto consumer = [&] (flat_mutation_reader bucket_reader) { - return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader& rd) { + auto consumer = [&] (flat_mutation_reader_v2 bucket_reader) { + return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader_v2& rd) { return rd.consume(test_bucket_writer(random_schema.schema(), rd.permit(), classify_fn, buckets, throw_after)); }); }; try { - segregate_by_timestamp(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); + segregate_by_timestamp(make_flat_mutation_reader_from_mutations_v2(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); } catch (const test_bucket_writer::expected_exception&) { BOOST_TEST_PASSPOINT(); } catch (const seastar::broken_promise&) { From 1ba19c2aa45623f1aaeaa8e35f15f044f528e395 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 14:08:22 +0200 Subject: [PATCH 11/15] compaction/compaction_strategy: convert make_interposer_consumer() to v2 The underlying timestamp-based splitter is v2 already. 
--- compaction/compaction.cc | 7 ++++++- compaction/compaction_strategy.cc | 4 ++-- compaction/compaction_strategy.hh | 2 +- compaction/compaction_strategy_impl.hh | 2 +- compaction/time_window_compaction_strategy.cc | 8 ++++---- compaction/time_window_compaction_strategy.hh | 2 +- streaming/consumer.cc | 8 ++++---- table.cc | 6 +++--- 8 files changed, 22 insertions(+), 17 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index cfe44eeac4..9923dc150c 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -741,7 +741,12 @@ private: } virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) { - return _table_s.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer)); + auto consumer = _table_s.get_compaction_strategy().make_interposer_consumer(_ms_metadata, [end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) { + return end_consumer(downgrade_to_v1(std::move(reader))); + }); + return [consumer = std::move(consumer)] (flat_mutation_reader reader) mutable -> future<> { + return consumer(upgrade_to_v2(std::move(reader))); + }; } virtual bool use_interposer_consumer() const { diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 2eaf9c48eb..cda032e8a5 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -87,7 +87,7 @@ uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_sour return partition_estimate; } -reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) { +reader_consumer_v2 compaction_strategy_impl::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) { return end_consumer; } @@ -667,7 +667,7 @@ uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_me return 
_compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate); } -reader_consumer compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) { +reader_consumer_v2 compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) { return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer)); } diff --git a/compaction/compaction_strategy.hh b/compaction/compaction_strategy.hh index 614c74b538..f792d5620d 100644 --- a/compaction/compaction_strategy.hh +++ b/compaction/compaction_strategy.hh @@ -122,7 +122,7 @@ public: uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate); - reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer); + reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer); // Returns whether or not interposer consumer is used by a given strategy. 
bool use_interposer_consumer() const; diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index a5d54fc3b7..37f9215a4d 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -80,7 +80,7 @@ public: virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate); - virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer); + virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer); virtual bool use_interposer_consumer() const { return false; diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index e165958f32..8602eea22b 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -123,16 +123,16 @@ uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutati return partition_estimate / std::max(1UL, uint64_t(estimated_window_count)); } -reader_consumer time_window_compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) { +reader_consumer_v2 time_window_compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) { if (ms_meta.min_timestamp && ms_meta.max_timestamp && get_window_for(_options, *ms_meta.min_timestamp) == get_window_for(_options, *ms_meta.max_timestamp)) { return end_consumer; } - return [options = _options, end_consumer = std::move(end_consumer)] (flat_mutation_reader rd) mutable -> future<> { + return [options = _options, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 rd) mutable -> future<> { return mutation_writer::segregate_by_timestamp( - upgrade_to_v2(std::move(rd)), + std::move(rd), 
classify_by_timestamp(std::move(options)), - [end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 rd) { return end_consumer(downgrade_to_v1(std::move(rd))); }); + std::move(end_consumer)); }; } diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index 09c2f9946c..944aa7558c 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -191,7 +191,7 @@ public: virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) override; - virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) override; + virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) override; virtual bool use_interposer_consumer() const override { return true; diff --git a/streaming/consumer.cc b/streaming/consumer.cc index d37bf891cc..6412779dce 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -47,7 +47,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o auto metadata = mutation_source_metadata{}; auto& cs = cf->get_compaction_strategy(); const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions); - auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer end_consumer) mutable { + auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable { // postpone data segregation to off-strategy compaction if enabled if (offstrategy) { return end_consumer; @@ -56,7 +56,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o }; auto consumer = make_interposer_consumer(metadata, - [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = 
std::move(origin), offstrategy, reason] (flat_mutation_reader reader) { + [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy, reason] (flat_mutation_reader_v2 reader) { sstables::shared_sstable sst; try { sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write(); @@ -68,7 +68,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o schema_ptr s = reader.schema(); auto& pc = service::get_local_streaming_priority(); - return sst->write_components(std::move(reader), adjusted_estimated_partitions, s, + return sst->write_components(downgrade_to_v1(std::move(reader)), adjusted_estimated_partitions, s, cf->get_sstables_manager().configure_writer(origin), encoding_stats{}, pc).then([sst] { return sst->open_data(); @@ -86,7 +86,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o return vug.local().register_staging_sstable(sst, std::move(cf)); }); }); - co_return co_await consumer(std::move(reader)); + co_return co_await consumer(upgrade_to_v2(std::move(reader))); } catch (...) 
{ ex = std::current_exception(); } diff --git a/table.cc b/table.cc index 5bb9bb0aaf..7273610d23 100644 --- a/table.cc +++ b/table.cc @@ -627,7 +627,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ metadata.max_timestamp = old->get_max_timestamp(); auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count()); - auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, metadata, estimated_partitions] (flat_mutation_reader reader) mutable -> future<> { + auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, metadata, estimated_partitions] (flat_mutation_reader_v2 reader) mutable -> future<> { auto&& priority = service::get_local_memtable_flush_priority(); sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable"); cfg.backup = incremental_backups_enabled(); @@ -639,7 +639,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ auto monitor = database_sstable_write_monitor(permit, newtab, _compaction_strategy, old->get_max_timestamp()); - co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg, priority); + co_return co_await write_memtable_to_sstable(downgrade_to_v1(std::move(reader)), *old, newtab, estimated_partitions, monitor, cfg, priority); }); flat_mutation_reader reader = old->make_flush_reader( @@ -661,7 +661,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ co_return stop_iteration::yes; } - auto f = consumer(std::move(reader)); + auto f = consumer(upgrade_to_v2(std::move(reader))); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush // controller. 
Cache update does not affect the input of the memtable cpu controller, so it can be subject to From aa3c943f4c7618684939f28f3dbe7360dd297721 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 14:57:45 +0200 Subject: [PATCH 12/15] mutation_reader: remove unnecessary stable_flattened_mutations_consumer Said wrapper was conceived to make unmovable `compact_mutation` because readers wanted movable consumers. But `compact_mutation` is movable for years now, as all its unmovable bits were moved into an `lw_shared_ptr<>` member. So drop this unnecessary wrapper and its unnecessary usages. --- compaction/compaction.cc | 6 +++--- mutation_partition.cc | 2 +- mutation_reader.hh | 20 -------------------- querier.hh | 2 +- test/boost/mutation_test.cc | 2 +- test/boost/sstable_compaction_test.cc | 2 +- 6 files changed, 7 insertions(+), 27 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 9923dc150c..ac23ddc313 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -699,7 +699,7 @@ private: auto consumer = make_interposer_consumer([this] (flat_mutation_reader reader) mutable { return seastar::async([this, reader = std::move(reader)] () mutable { auto close_reader = deferred_close(reader); - auto cfc = make_stable_flattened_mutations_consumer(get_compacted_fragments_writer()); + auto cfc = compacted_fragments_writer(get_compacted_fragments_writer()); reader.consume_in_thread(std::move(cfc)); }); }); @@ -721,7 +721,7 @@ private: if (enable_garbage_collected_sstable_writer()) { using compact_mutations = compact_for_compaction; - auto cfc = make_stable_flattened_mutations_consumer(*schema(), now, + auto cfc = compact_mutations(*schema(), now, max_purgeable_func(), get_compacted_fragments_writer(), get_gc_compacted_fragments_writer()); @@ -730,7 +730,7 @@ private: return; } using compact_mutations = compact_for_compaction; - auto cfc = make_stable_flattened_mutations_consumer(*schema(), now, + auto
cfc = compact_mutations(*schema(), now, max_purgeable_func(), get_compacted_fragments_writer(), noop_compacted_fragments_consumer()); diff --git a/mutation_partition.cc b/mutation_partition.cc index dcbd567763..be0501f41c 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2309,7 +2309,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so // do_with() doesn't support immovable objects auto r_a_r = std::make_unique(s, source, std::move(permit), dk, slice, std::move(trace_ptr)); auto cwqrb = counter_write_query_result_builder(*s); - auto cfq = make_stable_flattened_mutations_consumer>( + auto cfq = compact_for_query( *s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb)); auto f = r_a_r->reader.consume(std::move(cfq)); return f.finally([r_a_r = std::move(r_a_r)] { diff --git a/mutation_reader.hh b/mutation_reader.hh index ec9850912f..e0c5a94e0d 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -359,26 +359,6 @@ snapshot_source make_empty_snapshot_source(); using mutation_source_opt = optimized_optional; -// Adapts a non-movable FlattenedConsumer to a movable one. -template -class stable_flattened_mutations_consumer { - std::unique_ptr _ptr; -public: - stable_flattened_mutations_consumer(std::unique_ptr ptr) : _ptr(std::move(ptr)) {} - auto consume_new_partition(const dht::decorated_key& dk) { return _ptr->consume_new_partition(dk); } - auto consume(tombstone t) { return _ptr->consume(t); } - auto consume(static_row&& sr) { return _ptr->consume(std::move(sr)); } - auto consume(clustering_row&& cr) { return _ptr->consume(std::move(cr)); } - auto consume(range_tombstone&& rt) { return _ptr->consume(std::move(rt)); } - auto consume_end_of_partition() { return _ptr->consume_end_of_partition(); } - auto consume_end_of_stream() { return _ptr->consume_end_of_stream(); } -}; - -template -stable_flattened_mutations_consumer make_stable_flattened_mutations_consumer(Args&&... 
args) { - return { std::make_unique(std::forward(args)...) }; -} - /// Make a foreign_reader. /// /// foreign_reader is a local representant of a reader located on a remote diff --git a/querier.hh b/querier.hh index e77a0c8b13..c5c11b8630 100644 --- a/querier.hh +++ b/querier.hh @@ -90,7 +90,7 @@ auto consume_page(flat_mutation_reader& reader, compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); auto last_ckey = make_lw_shared>(); - auto reader_consumer = make_stable_flattened_mutations_consumer>>( + auto reader_consumer = compact_for_query>( compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 20ae830de5..31ded55afa 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2951,7 +2951,7 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p return api::max_timestamp; }; auto gc_grace_seconds = schema.gc_grace_seconds(); - auto consumer = make_stable_flattened_mutations_consumer>( + auto consumer = compact_for_compaction( schema, query_time, get_max_purgeable, diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 8254ae2967..f753bbfe4b 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3372,7 +3372,7 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { gc_before = gc_now - s->gc_grace_seconds(); auto gc_grace_seconds = s->gc_grace_seconds(); - auto cfc = make_stable_flattened_mutations_consumer>( + auto cfc = compact_for_compaction( *s, gc_now, max_purgeable_func, std::move(cr), std::move(purged_cr)); auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options()); From 15d8ea983e82a455d9c3a1f6c87e04a26332ba77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 
14:08:31 +0200 Subject: [PATCH 13/15] compaction: upgrade compaction::make_interposer_consumer() to v2 Almost all (except the scrub one) actual interposer consumers are v2. --- compaction/compaction.cc | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index ac23ddc313..50e82e57e6 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -696,14 +696,14 @@ private: // compacting_reader. It's useful for allowing data from different buckets // to be compacted together. future<> consume_without_gc_writer(gc_clock::time_point compaction_time) { - auto consumer = make_interposer_consumer([this] (flat_mutation_reader reader) mutable { - return seastar::async([this, reader = std::move(reader)] () mutable { + auto consumer = make_interposer_consumer([this] (flat_mutation_reader_v2 reader) mutable { + return seastar::async([this, reader = downgrade_to_v1(std::move(reader))] () mutable { auto close_reader = deferred_close(reader); auto cfc = compacted_fragments_writer(get_compacted_fragments_writer()); reader.consume_in_thread(std::move(cfc)); }); }); - return consumer(make_compacting_reader(downgrade_to_v1(make_sstable_reader()), compaction_time, max_purgeable_func())); + return consumer(upgrade_to_v2(make_compacting_reader(downgrade_to_v1(make_sstable_reader()), compaction_time, max_purgeable_func()))); } future<> consume() { @@ -714,7 +714,7 @@ private: if (!enable_garbage_collected_sstable_writer() && use_interposer_consumer()) { return consume_without_gc_writer(now); } - auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader reader) mutable + auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader_v2 reader) mutable { return seastar::async([this, reader = std::move(reader), now] () mutable { auto close_reader = deferred_close(reader); @@ -737,16 +737,11 @@ private: reader.consume_in_thread(std::move(cfc)); }); }); 
- return consumer(downgrade_to_v1(make_sstable_reader())); + return consumer(make_sstable_reader()); } - virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) { - auto consumer = _table_s.get_compaction_strategy().make_interposer_consumer(_ms_metadata, [end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) { - return end_consumer(downgrade_to_v1(std::move(reader))); - }); - return [consumer = std::move(consumer)] (flat_mutation_reader reader) mutable -> future<> { - return consumer(upgrade_to_v2(std::move(reader))); - }; + virtual reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) { + return _table_s.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer)); } virtual bool use_interposer_consumer() const { @@ -1471,16 +1466,16 @@ public: } } - reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { + reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) override { if (!use_interposer_consumer()) { return end_consumer; } - return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> { + return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) mutable -> future<> { auto cfg = mutation_writer::segregate_config{_io_priority, memory::stats().total_memory() / 10}; - return mutation_writer::segregate_by_partition(std::move(reader), cfg, + return mutation_writer::segregate_by_partition(downgrade_to_v1(std::move(reader)), cfg, [consumer = std::move(end_consumer), this] (flat_mutation_reader rd) { ++_bucket_count; - return consumer(std::move(rd)); + return consumer(upgrade_to_v2(std::move(rd))); }); }; } @@ -1554,11 +1549,9 @@ public: } - reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { - return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> { - return 
mutation_writer::segregate_by_shard(upgrade_to_v2(std::move(reader)), [end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 rd) { - return end_consumer(downgrade_to_v1(std::move(rd))); - }); + reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) override { + return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) mutable -> future<> { + return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer)); }; } From 85c42a5d763d69ffc4f8f264db984f6f465d029f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 15:43:02 +0200 Subject: [PATCH 14/15] querier: convert querier_cache and {data,mutation}_querier to v2 The shard_mutation_querier is left using a v1 reader in its API as the multishard query code is not ready yet. When saving this reader it is upgraded to v2 and on lookup it is downgraded to v1. This should cancel out thanks to upgrade/downgrade unwrapping. --- querier.cc | 12 ++++++------ querier.hh | 45 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/querier.cc b/querier.cc index 45569b5dcd..df7d4faf37 100644 --- a/querier.cc +++ b/querier.cc @@ -216,13 +216,13 @@ querier_cache::querier_cache(std::chrono::seconds entry_ttl) } struct querier_utils { - static flat_mutation_reader get_reader(querier_base& q) noexcept { - return std::move(std::get(q._reader)); + static flat_mutation_reader_v2 get_reader(querier_base& q) noexcept { + return std::move(std::get(q._reader)); } static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) noexcept { return std::move(std::get(q._reader)); } - static void set_reader(querier_base& q, flat_mutation_reader r) noexcept { + static void set_reader(querier_base& q, flat_mutation_reader_v2 r) noexcept { q._reader = std::move(r); } static void set_inactive_read_handle(querier_base& q, 
reader_concurrency_semaphore::inactive_read_handle h) noexcept { @@ -255,7 +255,7 @@ void querier_cache::insert_querier( auto& sem = q.permit().semaphore(); - auto irh = sem.register_inactive_read(upgrade_to_v2(querier_utils::get_reader(q))); + auto irh = sem.register_inactive_read(querier_utils::get_reader(q)); if (!irh) { ++stats.resource_based_evictions; return; @@ -340,7 +340,7 @@ std::optional querier_cache::lookup_querier( throw std::runtime_error("lookup_querier(): found querier that is evicted"); } reader_opt->set_timeout(timeout); - querier_utils::set_reader(q, downgrade_to_v1(std::move(*reader_opt))); + querier_utils::set_reader(q, std::move(*reader_opt)); --stats.population; const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice); @@ -393,7 +393,7 @@ std::optional querier_cache::lookup_shard_mutation_queri future<> querier_base::close() noexcept { struct variant_closer { querier_base& q; - future<> operator()(flat_mutation_reader& reader) { + future<> operator()(flat_mutation_reader_v2& reader) { return reader.close(); } future<> operator()(reader_concurrency_semaphore::inactive_read_handle& irh) { diff --git a/querier.hh b/querier.hh index c5c11b8630..d21ff252b8 100644 --- a/querier.hh +++ b/querier.hh @@ -101,6 +101,39 @@ auto consume_page(flat_mutation_reader& reader, }); } +/// Consume a page worth of data from the reader. +/// +/// Uses `compaction_state` for compacting the fragments and `consumer` for +/// building the results. +/// Returns a future containing a tuple with the last consumed clustering key, +/// or std::nullopt if the last row wasn't a clustering row, and whatever the +/// consumer's `consume_end_of_stream()` method returns. 
+template +requires CompactedFragmentsConsumer +auto consume_page(flat_mutation_reader_v2& reader, + lw_shared_ptr> compaction_state, + const query::partition_slice& slice, + Consumer&& consumer, + uint64_t row_limit, + uint32_t partition_limit, + gc_clock::time_point query_time) { + return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( + mutation_fragment_v2* next_fragment) mutable { + const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end; + compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); + + auto last_ckey = make_lw_shared>(); + auto reader_consumer = compact_for_query>( + compaction_state, + clustering_position_tracker(std::move(consumer), last_ckey)); + + return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable { + static_assert(sizeof...(results) <= 1); + return make_ready_future, std::decay_t...>>(std::tuple(std::move(*last_ckey), std::move(results)...)); + }); + }); +} + struct position_view { const dht::decorated_key* partition_key; const clustering_key_prefix* clustering_key; @@ -114,12 +147,12 @@ protected: reader_permit _permit; lw_shared_ptr _range; std::unique_ptr _slice; - std::variant _reader; + std::variant _reader; dht::partition_ranges_view _query_ranges; public: querier_base(reader_permit permit, lw_shared_ptr range, - std::unique_ptr slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges) + std::unique_ptr slice, flat_mutation_reader_v2 reader, dht::partition_ranges_view query_ranges) : _schema(reader.schema()) , _permit(std::move(permit)) , _range(std::move(range)) @@ -134,7 +167,7 @@ public: , _permit(std::move(permit)) , _range(make_lw_shared(std::move(range))) , _slice(std::make_unique(std::move(slice))) - , _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, 
mutation_reader::forwarding::no)) + , _reader(ms.make_reader_v2(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) , _query_ranges(*_range) { } @@ -217,7 +250,7 @@ public: uint32_t partition_limit, gc_clock::time_point query_time, tracing::trace_state_ptr trace_ptr = {}) { - return ::query::consume_page(std::get(_reader), _compaction_state, *_slice, std::move(consumer), row_limit, + return ::query::consume_page(std::get(_reader), _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time).then([this, trace_ptr = std::move(trace_ptr)] (auto&& results) { _last_ckey = std::get>(std::move(results)); const auto& cstats = _compaction_state->stats(); @@ -274,7 +307,7 @@ private: reader_permit permit, dht::decorated_key nominal_pkey, std::optional nominal_ckey) - : querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges) + : querier_base(permit, std::move(reader_range), std::move(reader_slice), upgrade_to_v2(std::move(reader)), *query_ranges) , _query_ranges(std::move(query_ranges)) , _nominal_pkey(std::move(nominal_pkey)) , _nominal_ckey(std::move(nominal_ckey)) { @@ -307,7 +340,7 @@ public: } flat_mutation_reader reader() && { - return std::move(std::get(_reader)); + return downgrade_to_v1(std::move(std::get(_reader))); } }; From 1a97f4c3554e3c3e37ca480490292b61b838b51f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 6 Jan 2022 15:46:16 +0200 Subject: [PATCH 15/15] table: add make_reader_v2() In fact the existing `make_reader()` is renamed to `make_reader_v2()`, dropping the `downgrade_to_v1()` from the returned reader. To ease incremental migration we add a `make_reader()` implementation which downgrades this reader back to v1. `table::as_mutation_source()` is also updated to use the v2 reader factory method. 
--- database.hh | 8 ++++++++ table.cc | 24 ++++++++++++++++++------ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/database.hh b/database.hh index 26a9960217..39198821e6 100644 --- a/database.hh +++ b/database.hh @@ -666,6 +666,14 @@ public: // Mutations returned by the reader will all have given schema. // If I/O needs to be issued to read anything in the specified range, the operations // will be scheduled under the priority class given by pc. + flat_mutation_reader_v2 make_reader_v2(schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc = default_priority_class(), + tracing::trace_state_ptr trace_state = nullptr, + streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, + mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; flat_mutation_reader make_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, diff --git a/table.cc b/table.cc index 7273610d23..947577ada9 100644 --- a/table.cc +++ b/table.cc @@ -140,8 +140,8 @@ table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& pa }); } -flat_mutation_reader -table::make_reader(schema_ptr s, +flat_mutation_reader_v2 +table::make_reader_v2(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -150,7 +150,7 @@ table::make_reader(schema_ptr s, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { if (_virtual_reader) [[unlikely]] { - return (*_virtual_reader).make_reader(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr); + return (*_virtual_reader).make_reader_v2(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr); } std::vector readers; @@ -202,14 +202,26 @@ table::make_reader(schema_ptr s, readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); } 
- auto comb_reader = downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr)); + auto comb_reader = make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); if (_config.data_listeners && !_config.data_listeners->empty()) { - return _config.data_listeners->on_read(s, range, slice, std::move(comb_reader)); + return upgrade_to_v2(_config.data_listeners->on_read(s, range, slice, downgrade_to_v1(std::move(comb_reader)))); } else { return comb_reader; } } +flat_mutation_reader +table::make_reader(schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const { + return downgrade_to_v1(make_reader_v2(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr)); +} + sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional subdir) { sstring dir = _config.datadir; if (subdir) { @@ -2157,7 +2169,7 @@ table::as_mutation_source() const { tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_v2(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); }