diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 04e9032bbc..50e82e57e6 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -696,14 +696,14 @@ private: // compacting_reader. It's useful for allowing data from different buckets // to be compacted together. future<> consume_without_gc_writer(gc_clock::time_point compaction_time) { - auto consumer = make_interposer_consumer([this] (flat_mutation_reader reader) mutable { - return seastar::async([this, reader = std::move(reader)] () mutable { + auto consumer = make_interposer_consumer([this] (flat_mutation_reader_v2 reader) mutable { + return seastar::async([this, reader = downgrade_to_v1(std::move(reader))] () mutable { auto close_reader = deferred_close(reader); - auto cfc = make_stable_flattened_mutations_consumer(get_compacted_fragments_writer()); + auto cfc = compacted_fragments_writer(get_compacted_fragments_writer()); reader.consume_in_thread(std::move(cfc)); }); }); - return consumer(make_compacting_reader(downgrade_to_v1(make_sstable_reader()), compaction_time, max_purgeable_func())); + return consumer(upgrade_to_v2(make_compacting_reader(downgrade_to_v1(make_sstable_reader()), compaction_time, max_purgeable_func()))); } future<> consume() { @@ -714,14 +714,14 @@ private: if (!enable_garbage_collected_sstable_writer() && use_interposer_consumer()) { return consume_without_gc_writer(now); } - auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader reader) mutable + auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader_v2 reader) mutable { return seastar::async([this, reader = std::move(reader), now] () mutable { auto close_reader = deferred_close(reader); if (enable_garbage_collected_sstable_writer()) { using compact_mutations = compact_for_compaction; - auto cfc = make_stable_flattened_mutations_consumer(*schema(), now, + auto cfc = compact_mutations(*schema(), now, max_purgeable_func(), get_compacted_fragments_writer(), 
get_gc_compacted_fragments_writer()); @@ -730,17 +730,17 @@ private: return; } using compact_mutations = compact_for_compaction; - auto cfc = make_stable_flattened_mutations_consumer(*schema(), now, + auto cfc = compact_mutations(*schema(), now, max_purgeable_func(), get_compacted_fragments_writer(), noop_compacted_fragments_consumer()); reader.consume_in_thread(std::move(cfc)); }); }); - return consumer(downgrade_to_v1(make_sstable_reader())); + return consumer(make_sstable_reader()); } - virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) { + virtual reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) { return _table_s.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer)); } @@ -1466,16 +1466,16 @@ public: } } - reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { + reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) override { if (!use_interposer_consumer()) { return end_consumer; } - return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> { + return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) mutable -> future<> { auto cfg = mutation_writer::segregate_config{_io_priority, memory::stats().total_memory() / 10}; - return mutation_writer::segregate_by_partition(std::move(reader), cfg, + return mutation_writer::segregate_by_partition(downgrade_to_v1(std::move(reader)), cfg, [consumer = std::move(end_consumer), this] (flat_mutation_reader rd) { ++_bucket_count; - return consumer(std::move(rd)); + return consumer(upgrade_to_v2(std::move(rd))); }); }; } @@ -1549,8 +1549,8 @@ public: } - reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { - return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> { + reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 
end_consumer) override { + return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) mutable -> future<> { return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer)); }; } diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 2eaf9c48eb..cda032e8a5 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -87,7 +87,7 @@ uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_sour return partition_estimate; } -reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) { +reader_consumer_v2 compaction_strategy_impl::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) { return end_consumer; } @@ -667,7 +667,7 @@ uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_me return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate); } -reader_consumer compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) { +reader_consumer_v2 compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) { return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer)); } diff --git a/compaction/compaction_strategy.hh b/compaction/compaction_strategy.hh index 614c74b538..f792d5620d 100644 --- a/compaction/compaction_strategy.hh +++ b/compaction/compaction_strategy.hh @@ -122,7 +122,7 @@ public: uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate); - reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer); + reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 
end_consumer); // Returns whether or not interposer consumer is used by a given strategy. bool use_interposer_consumer() const; diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index a5d54fc3b7..37f9215a4d 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -80,7 +80,7 @@ public: virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate); - virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer); + virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer); virtual bool use_interposer_consumer() const { return false; diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index c8cceff6b9..8602eea22b 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -123,12 +123,12 @@ uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutati return partition_estimate / std::max(1UL, uint64_t(estimated_window_count)); } -reader_consumer time_window_compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) { +reader_consumer_v2 time_window_compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) { if (ms_meta.min_timestamp && ms_meta.max_timestamp && get_window_for(_options, *ms_meta.min_timestamp) == get_window_for(_options, *ms_meta.max_timestamp)) { return end_consumer; } - return [options = _options, end_consumer = std::move(end_consumer)] (flat_mutation_reader rd) mutable -> future<> { + return [options = _options, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 rd) mutable -> future<> { return mutation_writer::segregate_by_timestamp( 
std::move(rd), classify_by_timestamp(std::move(options)), diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index 09c2f9946c..944aa7558c 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -191,7 +191,7 @@ public: virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) override; - virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) override; + virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) override; virtual bool use_interposer_consumer() const override { return true; diff --git a/flat_mutation_reader_v2.hh b/flat_mutation_reader_v2.hh index 34e67fcc44..551b77a9e4 100644 --- a/flat_mutation_reader_v2.hh +++ b/flat_mutation_reader_v2.hh @@ -857,3 +857,8 @@ make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque, const dht::partition_range& pr, const query::partition_slice& slice); + +/// A consumer function that is passed a flat_mutation_reader_v2 to be consumed from +/// and returns a future<> resolved when the reader is fully consumed, and closed. +/// Note: the function assumes ownership of the reader and must close it in all cases.
+using reader_consumer_v2 = noncopyable_function (flat_mutation_reader_v2)>; diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 21a305f472..429ef4bc07 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -451,11 +451,13 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de auto& sharder = _schema->get_sharder(); const auto shard = sharder.shard_of(compaction_state.partition_start.key().token()); + auto& range_tombstones = std::get>(compaction_state.range_tombstones); + // It is possible that the reader this partition originates from does not // exist anymore. Either because we failed stopping it or because it was // evicted. if (_readers[shard].state != reader_state::saving) { - for (auto& rt : compaction_state.range_tombstones) { + for (auto& rt : range_tombstones) { stats.add_discarded(*_schema, rt); } if (compaction_state.static_row) { @@ -467,7 +469,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit); - for (auto& rt : compaction_state.range_tombstones | boost::adaptors::reversed) { + for (auto& rt : range_tombstones | boost::adaptors::reversed) { stats.add(*_schema, rt); shard_buffer.emplace_front(*_schema, _permit, std::move(rt)); } diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 9b6ff5c3a8..1bcd6965bb 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -23,6 +23,7 @@ #include "compaction/compaction_garbage_collector.hh" #include "mutation_fragment.hh" +#include "range_tombstone_assembler.hh" #include "tombstone_gc.hh" static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) { @@ -57,7 +58,7 @@ concept CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::dec struct detached_compaction_state { ::partition_start partition_start; std::optional<::static_row> static_row; - std::deque range_tombstones; + 
std::variant, std::optional> range_tombstones; }; class noop_compacted_fragments_consumer { @@ -160,7 +161,8 @@ class compact_mutation_state { uint32_t _partition_limit{}; uint64_t _partition_row_limit{}; - range_tombstone_accumulator _range_tombstones; + range_tombstone_accumulator _range_tombstones; // used when consuming v1 stream and for storing the partition tombstone + std::optional _rt_assembler; // used when consuming a v2 stream bool _static_row_live{}; uint64_t _rows_in_current_partition; @@ -177,6 +179,32 @@ class compact_mutation_state { compaction_stats _stats; private: + template + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + stop_iteration do_consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) { + if (rt.tomb <= _range_tombstones.get_partition_tombstone()) { + return stop_iteration::no; + } + if (can_purge_tombstone(rt.tomb)) { + partition_is_not_empty_for_gc_consumer(gc_consumer); + return gc_consumer.consume(std::move(rt)); + } else { + partition_is_not_empty(consumer); + return consumer.consume(std::move(rt)); + } + } + template + tombstone tombstone_for_row(const clustering_key& ckey, Consumer& consumer, GCConsumer& gc_consumer) { + if (!_rt_assembler) { // we are either consuming v1 or consumed no range tombstone [change] at all + return _range_tombstones.tombstone_for_row(ckey); + } + if (_rt_assembler->needs_flush()) { + if (auto rt_opt = _rt_assembler->flush(_schema, position_in_partition::after_key(ckey))) { + do_consume(std::move(*rt_opt), consumer, gc_consumer); + } + } + return std::max(_range_tombstones.get_partition_tombstone(), _rt_assembler->get_current_tombstone()); + } static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; } @@ -292,6 +320,9 @@ public: _rows_in_current_partition = 0; _static_row_live = false; _range_tombstones.clear(); + if (_rt_assembler) { + _rt_assembler->reset(); + } _current_partition_limit = std::min(_row_limit, _partition_row_limit); 
_max_purgeable = api::missing_timestamp; _gc_before = std::nullopt; @@ -345,7 +376,7 @@ public: template requires CompactedFragmentsConsumer && CompactedFragmentsConsumer stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) { - auto current_tombstone = _range_tombstones.tombstone_for_row(cr.key()); + auto current_tombstone = tombstone_for_row(cr.key(), consumer, gc_consumer); auto t = cr.tomb(); t.apply(current_tombstone); @@ -409,21 +440,28 @@ public: ++_stats.range_tombstones; _range_tombstones.apply(rt); // FIXME: drop tombstone if it is fully covered by other range tombstones - if (rt.tomb > _range_tombstones.get_partition_tombstone()) { - if (can_purge_tombstone(rt.tomb)) { - partition_is_not_empty_for_gc_consumer(gc_consumer); - return gc_consumer.consume(std::move(rt)); - } else { - partition_is_not_empty(consumer); - return consumer.consume(std::move(rt)); - } - } + return do_consume(std::move(rt), consumer, gc_consumer); + } + + template + requires CompactedFragmentsConsumer && CompactedFragmentsConsumer + stop_iteration consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { + ++_stats.range_tombstones; + if (!_rt_assembler) { + _rt_assembler.emplace(); + } + if (auto rt_opt = _rt_assembler->consume(_schema, std::move(rtc))) { + do_consume(std::move(*rt_opt), consumer, gc_consumer); + } return stop_iteration::no; } template requires CompactedFragmentsConsumer && CompactedFragmentsConsumer stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { + if (_rt_assembler) { + _rt_assembler->on_end_of_stream(); + } if (!_empty_partition_in_gc_consumer) { gc_consumer.consume_end_of_partition(); } @@ -475,7 +513,7 @@ public: void start_new_page(uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, - mutation_fragment::kind next_fragment_kind, + partition_region next_fragment_region, Consumer& consumer) { _empty_partition = true; _static_row_live = 
false; @@ -486,8 +524,7 @@ public: _query_time = query_time; _stats = {}; - if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone) - && _last_static_row) { + if (next_fragment_region == partition_region::clustered && _last_static_row) { // Stopping here would cause an infinite loop so ignore return value. noop_compacted_fragments_consumer nc; consume(*std::exchange(_last_static_row, {}), consumer, nc); @@ -507,6 +544,9 @@ public: /// allows the compaction state to be stored in the compacted reader. detached_compaction_state detach_state() && { partition_start ps(std::move(_last_dk), _range_tombstones.get_partition_tombstone()); + if (_rt_assembler) { + return {std::move(ps), std::move(_last_static_row), std::move(*_rt_assembler).get_range_tombstone_change()}; + } return {std::move(ps), std::move(_last_static_row), std::move(_range_tombstones).range_tombstones()}; } @@ -564,6 +604,10 @@ public: return _state->consume(std::move(rt), _consumer, _gc_consumer); } + stop_iteration consume(range_tombstone_change&& rtc) { + return _state->consume(std::move(rtc), _consumer, _gc_consumer); + } + stop_iteration consume_end_of_partition() { return _state->consume_end_of_partition(_consumer, _gc_consumer); } diff --git a/mutation_partition.cc b/mutation_partition.cc index dcbd567763..be0501f41c 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2309,7 +2309,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so // do_with() doesn't support immovable objects auto r_a_r = std::make_unique(s, source, std::move(permit), dk, slice, std::move(trace_ptr)); auto cwqrb = counter_write_query_result_builder(*s); - auto cfq = make_stable_flattened_mutations_consumer>( + auto cfq = compact_for_query( *s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb)); auto f = r_a_r->reader.consume(std::move(cfq)); return f.finally([r_a_r = std::move(r_a_r)] { 
diff --git a/mutation_reader.cc b/mutation_reader.cc index 633efc359c..86674ba29d 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -3108,6 +3108,180 @@ std::pair make_queue_reader(schema_pt return {flat_mutation_reader(std::move(impl)), std::move(handle)}; } +class queue_reader_v2 final : public flat_mutation_reader_v2::impl { + friend class queue_reader_handle_v2; + +private: + queue_reader_handle_v2* _handle = nullptr; + std::optional> _not_full; + std::optional> _full; + std::exception_ptr _ex; + +private: + void push_and_maybe_notify(mutation_fragment_v2&& mf) { + push_mutation_fragment(std::move(mf)); + if (_full && is_buffer_full()) { + _full->set_value(); + _full.reset(); + } + } + +public: + explicit queue_reader_v2(schema_ptr s, reader_permit permit) + : impl(std::move(s), std::move(permit)) { + } + virtual future<> fill_buffer() override { + if (_ex) { + return make_exception_future<>(_ex); + } + if (_end_of_stream || !is_buffer_empty()) { + return make_ready_future<>(); + } + if (_not_full) { + _not_full->set_value(); + _not_full.reset(); + } + _full.emplace(); + return _full->get_future(); + } + virtual future<> next_partition() override { + clear_buffer_to_next_partition(); + if (is_buffer_empty() && !is_end_of_stream()) { + return fill_buffer().then([this] { + return next_partition(); + }); + } + return make_ready_future<>(); + } + virtual future<> fast_forward_to(const dht::partition_range&) override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + virtual future<> fast_forward_to(position_range) override { + return make_exception_future<>(make_backtraced_exception_ptr()); + } + virtual future<> close() noexcept override { + // wake up any waiters to prevent broken_promise errors + if (_full) { + _full->set_value(); + _full.reset(); + } else if (_not_full) { + _not_full->set_value(); + _not_full.reset(); + } + // detach from the queue_reader_handle + // since it should never access the reader after close. 
+ if (_handle) { + _handle->_reader = nullptr; + _handle = nullptr; + } + return make_ready_future<>(); + } + future<> push(mutation_fragment_v2&& mf) { + push_and_maybe_notify(std::move(mf)); + if (!is_buffer_full()) { + return make_ready_future<>(); + } + _not_full.emplace(); + return _not_full->get_future(); + } + void push_end_of_stream() { + _end_of_stream = true; + if (_full) { + _full->set_value(); + _full.reset(); + } + } + void abort(std::exception_ptr ep) noexcept { + _ex = std::move(ep); + if (_full) { + _full->set_exception(_ex); + _full.reset(); + } else if (_not_full) { + _not_full->set_exception(_ex); + _not_full.reset(); + } + } +}; + +void queue_reader_handle_v2::abandon() noexcept { + std::exception_ptr ex; + try { + ex = std::make_exception_ptr(std::runtime_error("Abandoned queue_reader_handle_v2")); + } catch (...) { + ex = std::current_exception(); + } + abort(std::move(ex)); +} + +queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_v2& reader) noexcept : _reader(&reader) { + _reader->_handle = this; +} + +queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_handle_v2&& o) noexcept + : _reader(std::exchange(o._reader, nullptr)) + , _ex(std::exchange(o._ex, nullptr)) +{ + if (_reader) { + _reader->_handle = this; + } +} + +queue_reader_handle_v2::~queue_reader_handle_v2() { + abandon(); +} + +queue_reader_handle_v2& queue_reader_handle_v2::operator=(queue_reader_handle_v2&& o) { + abandon(); + _reader = std::exchange(o._reader, nullptr); + _ex = std::exchange(o._ex, {}); + if (_reader) { + _reader->_handle = this; + } + return *this; +} + +future<> queue_reader_handle_v2::push(mutation_fragment_v2 mf) { + if (!_reader) { + if (_ex) { + return make_exception_future<>(_ex); + } + return make_exception_future<>(std::runtime_error("Dangling queue_reader_handle_v2")); + } + return _reader->push(std::move(mf)); +} + +void queue_reader_handle_v2::push_end_of_stream() { + if (!_reader) { + throw std::runtime_error("Dangling 
queue_reader_handle_v2"); + } + _reader->push_end_of_stream(); + _reader->_handle = nullptr; + _reader = nullptr; +} + +bool queue_reader_handle_v2::is_terminated() const { + return _reader == nullptr; +} + +void queue_reader_handle_v2::abort(std::exception_ptr ep) { + _ex = std::move(ep); + if (_reader) { + _reader->abort(_ex); + _reader->_handle = nullptr; + _reader = nullptr; + } +} + +std::exception_ptr queue_reader_handle_v2::get_exception() const noexcept { + return _ex; +} + +std::pair make_queue_reader_v2(schema_ptr s, reader_permit permit) { + auto impl = std::make_unique(std::move(s), std::move(permit)); + auto handle = queue_reader_handle_v2(*impl); + return {flat_mutation_reader_v2(std::move(impl)), std::move(handle)}; +} + namespace { class compacting_reader : public flat_mutation_reader::impl { diff --git a/mutation_reader.hh b/mutation_reader.hh index ab7acc9528..3a33c019c3 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -359,26 +359,6 @@ snapshot_source make_empty_snapshot_source(); using mutation_source_opt = optimized_optional; -// Adapts a non-movable FlattenedConsumer to a movable one. -template -class stable_flattened_mutations_consumer { - std::unique_ptr _ptr; -public: - stable_flattened_mutations_consumer(std::unique_ptr ptr) : _ptr(std::move(ptr)) {} - auto consume_new_partition(const dht::decorated_key& dk) { return _ptr->consume_new_partition(dk); } - auto consume(tombstone t) { return _ptr->consume(t); } - auto consume(static_row&& sr) { return _ptr->consume(std::move(sr)); } - auto consume(clustering_row&& cr) { return _ptr->consume(std::move(cr)); } - auto consume(range_tombstone&& rt) { return _ptr->consume(std::move(rt)); } - auto consume_end_of_partition() { return _ptr->consume_end_of_partition(); } - auto consume_end_of_stream() { return _ptr->consume_end_of_stream(); } -}; - -template -stable_flattened_mutations_consumer make_stable_flattened_mutations_consumer(Args&&... 
args) { - return { std::make_unique(std::forward(args)...) }; -} - /// Make a foreign_reader. /// /// foreign_reader is a local representant of a reader located on a remote @@ -800,6 +780,51 @@ public: std::pair make_queue_reader(schema_ptr s, reader_permit permit); +class queue_reader_v2; + +/// Calls to different methods cannot overlap! +/// The handle can be used only while the reader is still alive. Once +/// `push_end_of_stream()` is called, the reader and the handle can be destroyed +/// in any order. The reader can be destroyed at any time. +class queue_reader_handle_v2 { + friend std::pair make_queue_reader_v2(schema_ptr, reader_permit); + friend class queue_reader_v2; + +private: + queue_reader_v2* _reader = nullptr; + std::exception_ptr _ex; + +private: + explicit queue_reader_handle_v2(queue_reader_v2& reader) noexcept; + + void abandon() noexcept; + +public: + queue_reader_handle_v2(queue_reader_handle_v2&& o) noexcept; + ~queue_reader_handle_v2(); + queue_reader_handle_v2& operator=(queue_reader_handle_v2&& o); + + future<> push(mutation_fragment_v2 mf); + + /// Terminate the queue. + /// + /// The reader will be set to EOS. The handle cannot be used anymore. + void push_end_of_stream(); + + /// Aborts the queue. + /// + /// All future operations on the handle or the reader will raise `ep`. + void abort(std::exception_ptr ep); + + /// Checks if the queue is already terminated with either a success or failure (abort) + bool is_terminated() const; + + /// Get the stored exception, if any + std::exception_ptr get_exception() const noexcept; +}; + +std::pair make_queue_reader_v2(schema_ptr s, reader_permit permit); + /// Creates a compacting reader. 
/// /// The compaction is done with a \ref mutation_compactor, using compaction-type diff --git a/mutation_writer/feed_writers.cc b/mutation_writer/feed_writers.cc index 8427e8a098..3f15e11269 100644 --- a/mutation_writer/feed_writers.cc +++ b/mutation_writer/feed_writers.cc @@ -63,4 +63,44 @@ future<> bucket_writer::close() noexcept { return std::move(_consume_fut); } +bucket_writer_v2::bucket_writer_v2(schema_ptr schema, std::pair queue_reader, reader_consumer_v2& consumer) + : _schema(schema) + , _handle(std::move(queue_reader.second)) + , _consume_fut(consumer(std::move(queue_reader.first))) +{ } + +bucket_writer_v2::bucket_writer_v2(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer) + : bucket_writer_v2(schema, make_queue_reader_v2(schema, std::move(permit)), consumer) +{ } + +future<> bucket_writer_v2::consume(mutation_fragment_v2 mf) { + if (_handle.is_terminated()) { + // When the handle is terminated, it was aborted + // or associated reader was closed prematurely. + // In this case return _consume_fut that will propagate + // the root-cause error. 
+ auto ex = _handle.get_exception(); + if (!ex) { + // shouldn't really happen + ex = make_exception_ptr(std::runtime_error("queue_reader_handle_v2 is terminated")); + } + return std::exchange(_consume_fut, make_exception_future<>(ex)).then([ex = std::move(ex)] () mutable { + return make_exception_future<>(std::move(ex)); + }); + } + return _handle.push(std::move(mf)); +} + +void bucket_writer_v2::consume_end_of_stream() { + _handle.push_end_of_stream(); +} + +void bucket_writer_v2::abort(std::exception_ptr ep) noexcept { + _handle.abort(std::move(ep)); +} + +future<> bucket_writer_v2::close() noexcept { + return std::move(_consume_fut); +} + } // mutation_writer diff --git a/mutation_writer/feed_writers.hh b/mutation_writer/feed_writers.hh index 3e6bd8fc3c..c2f1f83e51 100644 --- a/mutation_writer/feed_writers.hh +++ b/mutation_writer/feed_writers.hh @@ -90,4 +90,67 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) { } } +class bucket_writer_v2 { + schema_ptr _schema; + queue_reader_handle_v2 _handle; + future<> _consume_fut; + +private: + bucket_writer_v2(schema_ptr schema, std::pair queue_reader_v2, reader_consumer_v2& consumer); + +public: + bucket_writer_v2(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer); + + future<> consume(mutation_fragment_v2 mf); + + void consume_end_of_stream(); + + void abort(std::exception_ptr ep) noexcept; + + future<> close() noexcept; +}; + +template +requires MutationFragmentConsumerV2> +future<> feed_writer(flat_mutation_reader_v2&& rd_ref, Writer wr) { + // Only move in reader if stack was successfully allocated, so caller can close reader otherwise. + auto rd = std::move(rd_ref); + std::exception_ptr ex; + try { + while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) { + co_await rd.fill_buffer(); + while (!rd.is_buffer_empty()) { + co_await rd.pop_mutation_fragment().consume(wr); + } + } + } catch (...) 
{ + ex = std::current_exception(); + } + + co_await rd.close(); + + try { + if (ex) { + wr.abort(ex); + } else { + wr.consume_end_of_stream(); + } + } catch (...) { + if (!ex) { + ex = std::current_exception(); + } + } + + try { + co_await wr.close(); + } catch (...) { + if (!ex) { + ex = std::current_exception(); + } + } + if (ex) { + std::rethrow_exception(ex); + } +} + } diff --git a/mutation_writer/shard_based_splitting_writer.cc b/mutation_writer/shard_based_splitting_writer.cc index 2129dc0c44..e32917c1ec 100644 --- a/mutation_writer/shard_based_splitting_writer.cc +++ b/mutation_writer/shard_based_splitting_writer.cc @@ -31,21 +31,21 @@ namespace mutation_writer { class shard_based_splitting_mutation_writer { - using shard_writer = bucket_writer; + using shard_writer = bucket_writer_v2; private: schema_ptr _schema; reader_permit _permit; - reader_consumer _consumer; + reader_consumer_v2 _consumer; unsigned _current_shard; std::vector> _shards; - future<> write_to_shard(mutation_fragment&& mf) { + future<> write_to_shard(mutation_fragment_v2&& mf) { auto& writer = *_shards[_current_shard]; return writer.consume(std::move(mf)); } public: - shard_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer consumer) + shard_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer_v2 consumer) : _schema(std::move(schema)) , _permit(std::move(permit)) , _consumer(std::move(consumer)) @@ -57,23 +57,23 @@ public: if (!_shards[_current_shard]) { _shards[_current_shard] = shard_writer(_schema, _permit, _consumer); } - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(ps))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(ps))); } future<> consume(static_row&& sr) { - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(sr))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(sr))); } future<> consume(clustering_row&& cr) { - 
return write_to_shard(mutation_fragment(*_schema, _permit, std::move(cr))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(cr))); } - future<> consume(range_tombstone&& rt) { - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(rt))); + future<> consume(range_tombstone_change&& rt) { + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(rt))); } future<> consume(partition_end&& pe) { - return write_to_shard(mutation_fragment(*_schema, _permit, std::move(pe))); + return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(pe))); } void consume_end_of_stream() { @@ -97,7 +97,7 @@ public: } }; -future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer) { +future<> segregate_by_shard(flat_mutation_reader_v2 producer, reader_consumer_v2 consumer) { auto schema = producer.schema(); auto permit = producer.permit(); return feed_writer( diff --git a/mutation_writer/shard_based_splitting_writer.hh b/mutation_writer/shard_based_splitting_writer.hh index 187025a7cb..ac8b291eae 100644 --- a/mutation_writer/shard_based_splitting_writer.hh +++ b/mutation_writer/shard_based_splitting_writer.hh @@ -31,6 +31,6 @@ namespace mutation_writer { // manner. This is useful, for instance, in the resharding process where a user changes // the amount of CPU assigned to Scylla and we have to rewrite the SSTables to their new // owners. 
-future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer); +future<> segregate_by_shard(flat_mutation_reader_v2 producer, reader_consumer_v2 consumer); } // namespace mutation_writer diff --git a/mutation_writer/timestamp_based_splitting_writer.cc b/mutation_writer/timestamp_based_splitting_writer.cc index 5f05c7f4b3..6989752010 100644 --- a/mutation_writer/timestamp_based_splitting_writer.cc +++ b/mutation_writer/timestamp_based_splitting_writer.cc @@ -109,12 +109,12 @@ small_flat_map::find(const key_type& k) { class timestamp_based_splitting_mutation_writer { using bucket_id = int64_t; - class timestamp_bucket_writer : public bucket_writer { + class timestamp_bucket_writer : public bucket_writer_v2 { bool _has_current_partition = false; public: - timestamp_bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer& consumer) - : bucket_writer(schema, std::move(permit), consumer) { + timestamp_bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer) + : bucket_writer_v2(schema, std::move(permit), consumer) { } void set_has_current_partition() { _has_current_partition = true; @@ -131,13 +131,14 @@ private: schema_ptr _schema; reader_permit _permit; classify_by_timestamp _classifier; - reader_consumer _consumer; + reader_consumer_v2 _consumer; partition_start _current_partition_start; std::unordered_map _buckets; std::vector _buckets_used_for_current_partition; + std::optional _last_rtc_bucket; private: - future<> write_to_bucket(bucket_id bucket, mutation_fragment&& mf); + future<> write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf); std::optional examine_column(const atomic_cell_or_collection& c, const column_definition& cdef); std::optional examine_row(const row& r, column_kind kind); @@ -150,7 +151,7 @@ private: future<> write_marker_and_tombstone(const clustering_row& cr); public: - timestamp_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_timestamp 
classifier, reader_consumer consumer) + timestamp_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_timestamp classifier, reader_consumer_v2 consumer) : _schema(std::move(schema)) , _permit(std::move(permit)) , _classifier(std::move(classifier)) @@ -161,7 +162,7 @@ public: future<> consume(partition_start&& ps); future<> consume(static_row&& sr); future<> consume(clustering_row&& cr); - future<> consume(range_tombstone&& rt); + future<> consume(range_tombstone_change&& rt); future<> consume(partition_end&& pe); void consume_end_of_stream() { @@ -181,7 +182,7 @@ public: } }; -future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment&& mf) { +future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf) { auto it = _buckets.try_emplace(bucket, _schema, _permit, _consumer).first; auto& writer = it->second; @@ -199,7 +200,7 @@ future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bu }); } - return writer.consume(mutation_fragment(*_schema, _permit, partition_start(_current_partition_start))).then([this, bucket = it->first, &writer, mf = std::move(mf)] () mutable { + return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_start(_current_partition_start))).then([this, bucket = it->first, &writer, mf = std::move(mf)] () mutable { writer.set_has_current_partition(); _buckets_used_for_current_partition.push_back(bucket); return writer.consume(std::move(mf)); @@ -384,28 +385,29 @@ future<> timestamp_based_splitting_mutation_writer::write_marker_and_tombstone(c } if (marker_bucket_id == tomb_bucket_id) { - return write_to_bucket(*marker_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), cr.marker(), {}))); + return write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), cr.marker(), {}))); } auto write_marker_fut = 
make_ready_future<>(); if (marker_bucket_id) { - write_marker_fut = write_to_bucket(*marker_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), {}, cr.marker(), {}))); + write_marker_fut = write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), {}, cr.marker(), {}))); } auto write_tomb_fut = make_ready_future<>(); if (tomb_bucket_id) { - write_tomb_fut = write_to_bucket(*tomb_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), {}, {}))); + write_tomb_fut = write_to_bucket(*tomb_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), {}, {}))); } return when_all_succeed(std::move(write_marker_fut), std::move(write_tomb_fut)).discard_result(); } future<> timestamp_based_splitting_mutation_writer::consume(partition_start&& ps) { _current_partition_start = std::move(ps); + _last_rtc_bucket.reset(); if (auto& tomb = _current_partition_start.partition_tombstone()) { auto bucket = _classifier(tomb.timestamp); auto ps = partition_start(_current_partition_start); tomb = {}; - return write_to_bucket(bucket, mutation_fragment(mutation_fragment(*_schema, _permit, std::move(ps)))); + return write_to_bucket(bucket, mutation_fragment_v2(mutation_fragment_v2(*_schema, _permit, std::move(ps)))); } return make_ready_future<>(); } @@ -416,11 +418,11 @@ future<> timestamp_based_splitting_mutation_writer::consume(static_row&& sr) { } if (const auto bucket = examine_static_row(sr)) { - return write_to_bucket(*bucket, mutation_fragment(*_schema, _permit, std::move(sr))); + return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(sr))); } return parallel_for_each(split_static_row(std::move(sr)), [this] (std::pair& sr_piece) { - return write_to_bucket(sr_piece.first, mutation_fragment(*_schema, _permit, static_row(std::move(sr_piece.second)))); + return write_to_bucket(sr_piece.first, mutation_fragment_v2(*_schema, _permit, 
static_row(std::move(sr_piece.second)))); }); } @@ -430,23 +432,27 @@ future<> timestamp_based_splitting_mutation_writer::consume(clustering_row&& cr) } if (const auto bucket = examine_clustering_row(cr)) { - return write_to_bucket(*bucket, mutation_fragment(*_schema, _permit, std::move(cr))); + return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(cr))); } return parallel_for_each(split_clustering_row(std::move(cr)), [this] (std::pair& cr_piece) { - return write_to_bucket(cr_piece.first, mutation_fragment(*_schema, _permit, std::move(cr_piece.second))); + return write_to_bucket(cr_piece.first, mutation_fragment_v2(*_schema, _permit, std::move(cr_piece.second))); }); } -future<> timestamp_based_splitting_mutation_writer::consume(range_tombstone&& rt) { - auto timestamp = _classifier(rt.tomb.timestamp); - return write_to_bucket(timestamp, mutation_fragment(*_schema, _permit, std::move(rt))); +future<> timestamp_based_splitting_mutation_writer::consume(range_tombstone_change&& rtc) { + auto id = _classifier(rtc.tombstone().timestamp); + if (_last_rtc_bucket && *_last_rtc_bucket != id) { + co_await write_to_bucket(*_last_rtc_bucket, mutation_fragment_v2(*_schema, _permit, range_tombstone_change(rtc.position(), {}))); + } + _last_rtc_bucket = id; + co_await write_to_bucket(id, mutation_fragment_v2(*_schema, _permit, std::move(rtc))); } future<> timestamp_based_splitting_mutation_writer::consume(partition_end&& pe) { return parallel_for_each(_buckets_used_for_current_partition, [this, pe = std::move(pe)] (bucket_id bucket) { auto& writer = _buckets.at(bucket); - return writer.consume(mutation_fragment(*_schema, _permit, partition_end(pe))).then([&writer] { + return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_end(pe))).then([&writer] { writer.clear_has_current_partition(); }); }).then([this] { @@ -454,7 +460,7 @@ future<> timestamp_based_splitting_mutation_writer::consume(partition_end&& pe) }); } -future<> 
segregate_by_timestamp(flat_mutation_reader producer, classify_by_timestamp classifier, reader_consumer consumer) { +future<> segregate_by_timestamp(flat_mutation_reader_v2 producer, classify_by_timestamp classifier, reader_consumer_v2 consumer) { //FIXME: make this into a consume() variant? auto schema = producer.schema(); auto permit = producer.permit(); diff --git a/mutation_writer/timestamp_based_splitting_writer.hh b/mutation_writer/timestamp_based_splitting_writer.hh index b395bcf26d..5c62eb1c99 100644 --- a/mutation_writer/timestamp_based_splitting_writer.hh +++ b/mutation_writer/timestamp_based_splitting_writer.hh @@ -28,6 +28,6 @@ namespace mutation_writer { using classify_by_timestamp = noncopyable_function; -future<> segregate_by_timestamp(flat_mutation_reader producer, classify_by_timestamp classifier, reader_consumer consumer); +future<> segregate_by_timestamp(flat_mutation_reader_v2 producer, classify_by_timestamp classifier, reader_consumer_v2 consumer); } // namespace mutation_writer diff --git a/querier.cc b/querier.cc index 45569b5dcd..df7d4faf37 100644 --- a/querier.cc +++ b/querier.cc @@ -216,13 +216,13 @@ querier_cache::querier_cache(std::chrono::seconds entry_ttl) } struct querier_utils { - static flat_mutation_reader get_reader(querier_base& q) noexcept { - return std::move(std::get(q._reader)); + static flat_mutation_reader_v2 get_reader(querier_base& q) noexcept { + return std::move(std::get(q._reader)); } static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) noexcept { return std::move(std::get(q._reader)); } - static void set_reader(querier_base& q, flat_mutation_reader r) noexcept { + static void set_reader(querier_base& q, flat_mutation_reader_v2 r) noexcept { q._reader = std::move(r); } static void set_inactive_read_handle(querier_base& q, reader_concurrency_semaphore::inactive_read_handle h) noexcept { @@ -255,7 +255,7 @@ void querier_cache::insert_querier( auto& sem = 
q.permit().semaphore(); - auto irh = sem.register_inactive_read(upgrade_to_v2(querier_utils::get_reader(q))); + auto irh = sem.register_inactive_read(querier_utils::get_reader(q)); if (!irh) { ++stats.resource_based_evictions; return; @@ -340,7 +340,7 @@ std::optional querier_cache::lookup_querier( throw std::runtime_error("lookup_querier(): found querier that is evicted"); } reader_opt->set_timeout(timeout); - querier_utils::set_reader(q, downgrade_to_v1(std::move(*reader_opt))); + querier_utils::set_reader(q, std::move(*reader_opt)); --stats.population; const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice); @@ -393,7 +393,7 @@ std::optional querier_cache::lookup_shard_mutation_queri future<> querier_base::close() noexcept { struct variant_closer { querier_base& q; - future<> operator()(flat_mutation_reader& reader) { + future<> operator()(flat_mutation_reader_v2& reader) { return reader.close(); } future<> operator()(reader_concurrency_semaphore::inactive_read_handle& irh) { diff --git a/querier.hh b/querier.hh index 5f6cc04fde..d21ff252b8 100644 --- a/querier.hh +++ b/querier.hh @@ -86,11 +86,44 @@ auto consume_page(flat_mutation_reader& reader, gc_clock::time_point query_time) { return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( mutation_fragment* next_fragment) mutable { - const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end; - compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, consumer); + const auto next_fragment_region = next_fragment ? 
next_fragment->position().region() : partition_region::partition_end; + compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); auto last_ckey = make_lw_shared>(); - auto reader_consumer = make_stable_flattened_mutations_consumer>>( + auto reader_consumer = compact_for_query>( + compaction_state, + clustering_position_tracker(std::move(consumer), last_ckey)); + + return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable { + static_assert(sizeof...(results) <= 1); + return make_ready_future, std::decay_t...>>(std::tuple(std::move(*last_ckey), std::move(results)...)); + }); + }); +} + +/// Consume a page worth of data from the reader. +/// +/// Uses `compaction_state` for compacting the fragments and `consumer` for +/// building the results. +/// Returns a future containing a tuple with the last consumed clustering key, +/// or std::nullopt if the last row wasn't a clustering row, and whatever the +/// consumer's `consume_end_of_stream()` method returns. +template +requires CompactedFragmentsConsumer +auto consume_page(flat_mutation_reader_v2& reader, + lw_shared_ptr> compaction_state, + const query::partition_slice& slice, + Consumer&& consumer, + uint64_t row_limit, + uint32_t partition_limit, + gc_clock::time_point query_time) { + return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( + mutation_fragment_v2* next_fragment) mutable { + const auto next_fragment_region = next_fragment ? 
next_fragment->position().region() : partition_region::partition_end; + compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); + + auto last_ckey = make_lw_shared>(); + auto reader_consumer = compact_for_query>( compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); @@ -114,12 +147,12 @@ protected: reader_permit _permit; lw_shared_ptr _range; std::unique_ptr _slice; - std::variant _reader; + std::variant _reader; dht::partition_ranges_view _query_ranges; public: querier_base(reader_permit permit, lw_shared_ptr range, - std::unique_ptr slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges) + std::unique_ptr slice, flat_mutation_reader_v2 reader, dht::partition_ranges_view query_ranges) : _schema(reader.schema()) , _permit(std::move(permit)) , _range(std::move(range)) @@ -134,7 +167,7 @@ public: , _permit(std::move(permit)) , _range(make_lw_shared(std::move(range))) , _slice(std::make_unique(std::move(slice))) - , _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + , _reader(ms.make_reader_v2(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) , _query_ranges(*_range) { } @@ -217,7 +250,7 @@ public: uint32_t partition_limit, gc_clock::time_point query_time, tracing::trace_state_ptr trace_ptr = {}) { - return ::query::consume_page(std::get(_reader), _compaction_state, *_slice, std::move(consumer), row_limit, + return ::query::consume_page(std::get(_reader), _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time).then([this, trace_ptr = std::move(trace_ptr)] (auto&& results) { _last_ckey = std::get>(std::move(results)); const auto& cstats = _compaction_state->stats(); @@ -274,7 +307,7 @@ private: reader_permit permit, dht::decorated_key nominal_pkey, 
std::optional nominal_ckey) - : querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges) + : querier_base(permit, std::move(reader_range), std::move(reader_slice), upgrade_to_v2(std::move(reader)), *query_ranges) , _query_ranges(std::move(query_ranges)) , _nominal_pkey(std::move(nominal_pkey)) , _nominal_ckey(std::move(nominal_ckey)) { @@ -307,7 +340,7 @@ public: } flat_mutation_reader reader() && { - return std::move(std::get(_reader)); + return downgrade_to_v1(std::move(std::get(_reader))); } }; diff --git a/range_tombstone_assembler.hh b/range_tombstone_assembler.hh index 83a7728378..e6720b189b 100644 --- a/range_tombstone_assembler.hh +++ b/range_tombstone_assembler.hh @@ -53,6 +53,14 @@ private: return _prev_rt && _prev_rt->tombstone(); } public: + tombstone get_current_tombstone() const { + return _prev_rt ? _prev_rt->tombstone() : tombstone(); + } + + std::optional get_range_tombstone_change() && { + return std::move(_prev_rt); + } + void reset() { _prev_rt = std::nullopt; } diff --git a/replica/database.hh b/replica/database.hh index 0b0b35c919..abc386d886 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -673,6 +673,14 @@ public: // Mutations returned by the reader will all have given schema. // If I/O needs to be issued to read anything in the specified range, the operations // will be scheduled under the priority class given by pc. 
+ flat_mutation_reader_v2 make_reader_v2(schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc = default_priority_class(), + tracing::trace_state_ptr trace_state = nullptr, + streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, + mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; flat_mutation_reader make_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, diff --git a/replica/table.cc b/replica/table.cc index 03ef785fd5..b49001fe54 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -143,8 +143,8 @@ table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& pa }); } -flat_mutation_reader -table::make_reader(schema_ptr s, +flat_mutation_reader_v2 +table::make_reader_v2(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -153,7 +153,7 @@ table::make_reader(schema_ptr s, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { if (_virtual_reader) [[unlikely]] { - return (*_virtual_reader).make_reader(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr); + return (*_virtual_reader).make_reader_v2(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr); } std::vector readers; @@ -205,14 +205,26 @@ table::make_reader(schema_ptr s, readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); } - auto comb_reader = downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr)); + auto comb_reader = make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); if (_config.data_listeners && !_config.data_listeners->empty()) { - return _config.data_listeners->on_read(s, range, slice, std::move(comb_reader)); + return upgrade_to_v2(_config.data_listeners->on_read(s, range, slice, 
downgrade_to_v1(std::move(comb_reader)))); } else { return comb_reader; } } +flat_mutation_reader +table::make_reader(schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const { + return downgrade_to_v1(make_reader_v2(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr)); +} + sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional subdir) { sstring dir = _config.datadir; if (subdir) { @@ -630,7 +642,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ metadata.max_timestamp = old->get_max_timestamp(); auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count()); - auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, metadata, estimated_partitions] (flat_mutation_reader reader) mutable -> future<> { + auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, metadata, estimated_partitions] (flat_mutation_reader_v2 reader) mutable -> future<> { auto&& priority = service::get_local_memtable_flush_priority(); sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable"); cfg.backup = incremental_backups_enabled(); @@ -642,7 +654,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ auto monitor = database_sstable_write_monitor(permit, newtab, _compaction_strategy, old->get_max_timestamp()); - co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg, priority); + co_return co_await write_memtable_to_sstable(downgrade_to_v1(std::move(reader)), *old, newtab, estimated_partitions, monitor, cfg, priority); }); flat_mutation_reader reader = 
old->make_flush_reader( @@ -664,7 +676,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ co_return stop_iteration::yes; } - auto f = consumer(std::move(reader)); + auto f = consumer(upgrade_to_v2(std::move(reader))); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush // controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to @@ -2163,7 +2175,7 @@ table::as_mutation_source() const { tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_v2(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } diff --git a/streaming/consumer.cc b/streaming/consumer.cc index ab7e89c5e4..baabccdbe6 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -47,7 +47,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o auto metadata = mutation_source_metadata{}; auto& cs = cf->get_compaction_strategy(); const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions); - auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer end_consumer) mutable { + auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable { // postpone data segregation to off-strategy compaction if enabled if (offstrategy) { return end_consumer; @@ -56,7 +56,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o }; auto consumer = make_interposer_consumer(metadata, - [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy, reason] (flat_mutation_reader reader) { + 
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy, reason] (flat_mutation_reader_v2 reader) { sstables::shared_sstable sst; try { sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write(); @@ -68,7 +68,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o schema_ptr s = reader.schema(); auto& pc = service::get_local_streaming_priority(); - return sst->write_components(std::move(reader), adjusted_estimated_partitions, s, + return sst->write_components(downgrade_to_v1(std::move(reader)), adjusted_estimated_partitions, s, cf->get_sstables_manager().configure_writer(origin), encoding_stats{}, pc).then([sst] { return sst->open_data(); @@ -86,7 +86,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o return vug.local().register_staging_sstable(sst, std::move(cf)); }); }); - co_return co_await consumer(std::move(reader)); + co_return co_await consumer(upgrade_to_v2(std::move(reader))); } catch (...) 
{ ex = std::current_exception(); } diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 4cb7786f9b..a4bdbc1220 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2951,7 +2951,7 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p return api::max_timestamp; }; auto gc_grace_seconds = schema.gc_grace_seconds(); - auto consumer = make_stable_flattened_mutations_consumer>( + auto consumer = compact_for_compaction( schema, query_time, get_max_purgeable, diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc index 30d26a589e..da13034d86 100644 --- a/test/boost/mutation_writer_test.cc +++ b/test/boost/mutation_writer_test.cc @@ -191,6 +191,8 @@ class test_bucket_writer { mutation_opt _current_mutation; bool _is_first_mutation = true; + range_tombstone_change _current_rtc; + size_t _throw_after; size_t _mutation_consumed = 0; @@ -246,8 +248,10 @@ private: } verify_row_bucket_id(cr.cells(), column_kind::regular_column); } - void verify_range_tombstone(const range_tombstone& rt) { - check_timestamp(rt.tomb.timestamp); + void verify_range_tombstone_change(const range_tombstone_change& rtc) { + if (rtc.tombstone()) { + check_timestamp(rtc.tombstone().timestamp); + } } void maybe_throw() { @@ -263,6 +267,7 @@ public: , _permit(std::move(permit)) , _classify(std::move(classify)) , _buckets(buckets) + , _current_rtc(position_in_partition::before_all_clustered_rows(), tombstone()) , _throw_after(throw_after) { } void consume_new_partition(const dht::decorated_key& dk) { @@ -290,17 +295,22 @@ public: _current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(cr))); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration consume(range_tombstone_change&& rtc) { maybe_throw(); BOOST_REQUIRE(_current_mutation); - verify_range_tombstone(rt); - _current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(rt))); + 
verify_range_tombstone_change(rtc); + if (_current_rtc.tombstone()) { + auto rt = range_tombstone(_current_rtc.position(), position_in_partition_view::before_key(rtc.position()), _current_rtc.tombstone()); + _current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(rt))); + } + _current_rtc = std::move(rtc); return stop_iteration::no; } stop_iteration consume_end_of_partition() { maybe_throw(); BOOST_REQUIRE(_current_mutation); BOOST_REQUIRE(_bucket_id); + BOOST_REQUIRE(!_current_rtc.tombstone()); auto& bucket = _buckets[*_bucket_id]; if (_is_first_mutation) { @@ -352,13 +362,13 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) { std::unordered_map> buckets; - auto consumer = [&] (flat_mutation_reader bucket_reader) { - return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader& rd) { + auto consumer = [&] (flat_mutation_reader_v2 bucket_reader) { + return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader_v2& rd) { return rd.consume(test_bucket_writer(random_schema.schema(), rd.permit(), classify_fn, buckets)); }); }; - segregate_by_timestamp(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); + segregate_by_timestamp(make_flat_mutation_reader_from_mutations_v2(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); testlog.debug("Data split into {} buckets: {}", buckets.size(), boost::copy_range>(buckets | boost::adaptors::map_keys)); @@ -422,14 +432,14 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer_abort) { int throw_after = tests::random::get_int(muts.size() - 1); testlog.info("Will raise exception after {}/{} mutations", throw_after, muts.size()); - auto consumer = [&] (flat_mutation_reader bucket_reader) { - return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader& rd) { + auto consumer = [&] (flat_mutation_reader_v2 
bucket_reader) { + return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader_v2& rd) { return rd.consume(test_bucket_writer(random_schema.schema(), rd.permit(), classify_fn, buckets, throw_after)); }); }; try { - segregate_by_timestamp(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); + segregate_by_timestamp(make_flat_mutation_reader_from_mutations_v2(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get(); } catch (const test_bucket_writer::expected_exception&) { BOOST_TEST_PASSPOINT(); } catch (const seastar::broken_promise&) { diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 196ec27ff3..6edd3c226c 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3372,7 +3372,7 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { gc_before = gc_now - s->gc_grace_seconds(); auto gc_grace_seconds = s->gc_grace_seconds(); - auto cfc = make_stable_flattened_mutations_consumer>( + auto cfc = compact_for_compaction( *s, gc_now, max_purgeable_func, std::move(cr), std::move(purged_cr)); auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options());