/* * Copyright (C) 2021-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include #include #include #include "mutation/mutation_fragment_v2.hh" #include "mutation/mutation.hh" #include "mutation/mutation_consumer_concepts.hh" #include "reader_permit.hh" #include "seastarx.hh" /// \brief Represents a stream of mutation fragments. /// /// Mutation fragments represent writes to the database. /// /// Each fragment has an implicit position in the database, /// which also determines its position in the stream relative to other fragments. /// The global position of a fragment is a tuple ordered lexicographically: /// /// (ring_position of a partition key, position_in_partition) /// /// The stream has a hierarchical form. All fragments which occur /// between partition_start and partition_end represent writes to the partition /// identified by the partition_start::key(). The partition key is not repeated /// with inner fragments. /// /// The stream of mutation fragments conforms to the following form: /// /// stream ::= partition* /// partition ::= partition_start static_row? clustered* partition_end /// clustered ::= clustering_row | range_tombstone_change /// /// Deletions of ranges of rows within a given partition are represented with range_tombstone_change fragments. /// At any point in the stream there is a single active clustered tombstone. /// It is initially equal to the neutral tombstone when the stream of each partition starts. /// range_tombstone_change fragments signify changes of the active clustered tombstone. /// All fragments emitted while a given clustered tombstone is active are affected by that tombstone. /// The clustered tombstone is independent from the partition tombstone carried in partition_start. /// The partition tombstone takes effect for all fragments within the partition. 
/// /// The stream guarantees that each partition ends with a neutral active clustered tombstone /// by closing active tombstones with a range_tombstone_change. /// In fast-forwarding mode, each sub-stream ends with a neutral active clustered tombstone. /// /// All fragments within a partition have weakly monotonically increasing position(). /// Consecutive range_tombstone_change fragments may share the position. /// All clustering row fragments within a partition have strictly monotonically increasing position(). /// /// \section Clustering restrictions /// /// A stream may produce writes relevant to only some clustering ranges, for /// example by specifying clustering ranges in a partition_slice passed to /// mutation_source::make_reader(). This will make the stream return information /// for a subset of writes that it would normally return should the stream be /// unrestricted. /// /// The restricted stream obeys the following rules: /// /// 0) The stream must contain fragments corresponding to all writes /// which are relevant to the requested ranges. /// /// 1) The ranges of non-neutral clustered tombstones must be enclosed in requested /// ranges. In other words, range tombstones don't extend beyond boundaries of requested ranges. /// /// 2) The stream will not return writes which are absent in the unrestricted stream, /// both for the requested clustering ranges and not requested ranges. /// This means that it's safe to populate cache with all the returned information. /// Even though it may be incomplete for non-requested ranges, it won't contain /// incorrect information. /// /// 3) All clustered fragments have position() which is within the requested /// ranges or, in case of range_tombstone_change fragments, equal to the end bound. /// /// 4) Streams may produce redundant range_tombstone_change fragments /// which do not change the current clustered tombstone, or have the same position. 
///
/// \section Intra-partition fast-forwarding mode
///
/// The stream can operate in an alternative mode when streamed_mutation::forwarding::yes
/// is passed to the stream constructor (see mutation_source).
///
/// In this mode, the original stream is not produced all at once, but is divided into sub-streams
/// which are produced one at a time, each ending with the end-of-stream condition (is_end_of_stream()).
/// The user needs to explicitly advance the stream to the next sub-stream by calling
/// fast_forward_to() or next_partition().
///
/// The original stream is divided like this:
///
/// 1) For every partition, the first sub-stream will contain
/// partition_start and the static_row.
///
/// 2) Calling fast_forward_to() moves to the next sub-stream within the
/// current partition. The stream will contain all fragments relevant to
/// the position_range passed to fast_forward_to().
///
/// 3) The position_range passed to fast_forward_to() is a clustering key restriction.
/// The same rules apply as for the clustering restrictions described above.
///
/// 4) The sub-stream will not end with a non-neutral active clustered tombstone. All range tombstones are closed.
///
/// 5) partition_end is never emitted; the user needs to call next_partition()
/// to move to the next partition in the original stream, which will open
/// the initial sub-stream of the next partition.
/// An empty sub-stream after next_partition() indicates global end-of-stream (no next partition).
///
/// \section Consuming
///
/// The best way to consume these mutation fragments is to call
/// mutation_reader::consume with a consumer that receives the fragments.
class mutation_reader final { public: class partition_range_forwarding_tag; using forwarding = bool_class; using tracked_buffer = circular_buffer>; class impl { private: tracked_buffer _buffer; size_t _buffer_size = 0; bool _close_required = false; protected: size_t max_buffer_size_in_bytes = default_max_buffer_size_in_bytes(); // The stream producer should set this to indicate that there are no // more fragments to produce. // Calling fill_buffer() will not add any new fragments // unless the reader is fast-forwarded to a new range. bool _end_of_stream = false; schema_ptr _schema; reader_permit _permit; friend class mutation_reader; protected: template void push_mutation_fragment(Args&&... args) { seastar::memory::on_alloc_point(); // for exception safety tests _buffer.emplace_back(std::forward(args)...); _buffer_size += _buffer.back().memory_usage(); } void clear_buffer() { _buffer.erase(_buffer.begin(), _buffer.end()); _buffer_size = 0; } void reserve_additional(size_t n) { _buffer.reserve(_buffer.size() + n); } void clear_buffer_to_next_partition(); template future fill_buffer_from(Source&); const tracked_buffer& buffer() const { return _buffer; } public: impl(schema_ptr s, reader_permit permit) : _buffer(permit), _schema(std::move(s)), _permit(std::move(permit)) { } virtual ~impl() {} virtual future<> fill_buffer() = 0; virtual future<> next_partition() = 0; bool is_end_of_stream() const { return _end_of_stream; } bool is_buffer_empty() const { return _buffer.empty(); } bool is_buffer_full() const { return _buffer_size >= max_buffer_size_in_bytes; } bool is_close_required() const { return _close_required; } void set_close_required() { _close_required = true; } static constexpr size_t default_max_buffer_size_in_bytes() { return 8 * 1024; } mutation_fragment_v2 pop_mutation_fragment() { auto mf = std::move(_buffer.front()); _buffer.pop_front(); _buffer_size -= mf.memory_usage(); return mf; } void unpop_mutation_fragment(mutation_fragment_v2 mf) { const auto 
memory_usage = mf.memory_usage(); _buffer.emplace_front(std::move(mf)); _buffer_size += memory_usage; } future operator()() { if (is_buffer_empty()) { if (is_end_of_stream()) { return make_ready_future(); } return fill_buffer().then([this] { return operator()(); }); } return make_ready_future(pop_mutation_fragment()); } template requires FlatMutationReaderConsumerV2 // Stops when consumer returns stop_iteration::yes or end of stream is reached. // Next call will start from the next mutation_fragment_v2 in the stream. future<> consume_pausable(Consumer consumer) { return repeat([this, consumer = std::move(consumer)] () mutable { if (is_buffer_empty()) { if (is_end_of_stream()) { return make_ready_future(stop_iteration::yes); } return fill_buffer().then([] { return make_ready_future(stop_iteration::no); }); } if constexpr (std::is_same_v, decltype(consumer(pop_mutation_fragment()))>) { return consumer(pop_mutation_fragment()); } else { auto result = stop_iteration::no; while ((result = consumer(pop_mutation_fragment())) != stop_iteration::yes && !is_buffer_empty() && !need_preempt()) {} return make_ready_future(result); } }); } template requires FlatMutationReaderConsumerV2 && FlattenedConsumerFilterV2 // A variant of consume_pausable() that expects to be run in // a seastar::thread. // Partitions for which filter(decorated_key) returns false are skipped // entirely and never reach the consumer. 
void consume_pausable_in_thread(Consumer consumer, Filter filter) { while (true) { thread::maybe_yield(); if (is_buffer_empty()) { if (is_end_of_stream()) { return; } fill_buffer().get(); continue; } auto mf = pop_mutation_fragment(); if (mf.is_partition_start() && !filter(mf.as_partition_start().key())) { next_partition().get(); continue; } if (!filter(mf)) { continue; } auto do_stop = futurize_invoke([&consumer, mf = std::move(mf)] () mutable { return consumer(std::move(mf)); }); if (do_stop.get()) { return; } } }; private: template struct consumer_adapter { mutation_reader::impl& _reader; std::optional _decorated_key; Consumer _consumer; consumer_adapter(mutation_reader::impl& reader, Consumer c) : _reader(reader) , _consumer(std::move(c)) { } future operator()(mutation_fragment_v2&& mf) { return std::move(mf).consume(*this); } future consume(static_row&& sr) { return handle_result(_consumer.consume(std::move(sr))); } future consume(clustering_row&& cr) { return handle_result(_consumer.consume(std::move(cr))); } future consume(range_tombstone_change&& rt) { return handle_result(_consumer.consume(std::move(rt))); } future consume(partition_start&& ps) { _decorated_key.emplace(std::move(ps.key())); _consumer.consume_new_partition(*_decorated_key); if (ps.partition_tombstone()) { _consumer.consume(ps.partition_tombstone()); } return make_ready_future(stop_iteration::no); } future consume(partition_end&& pe) { return futurize_invoke([this] { return _consumer.consume_end_of_partition(); }); } private: future handle_result(stop_iteration si) { if (si) { if (_consumer.consume_end_of_partition()) { return make_ready_future(stop_iteration::yes); } return _reader.next_partition().then([] { return make_ready_future(stop_iteration::no); }); } return make_ready_future(stop_iteration::no); } }; public: template requires FlattenedConsumerV2 // Stops when consumer returns stop_iteration::yes from consume_end_of_partition or end of stream is reached. 
// Next call will receive fragments from the next partition. // When consumer returns stop_iteration::yes from methods other than consume_end_of_partition then the read // of the current partition is ended, consume_end_of_partition is called and if it returns stop_iteration::no // then the read moves to the next partition. // Reference to the decorated key that is passed to consume_new_partition() remains valid until after // the call to consume_end_of_partition(). // // This method is useful because most of current consumers use this semantic. // // // This method returns whatever is returned from Consumer::consume_end_of_stream().S auto consume(Consumer consumer) { return do_with(consumer_adapter(*this, std::move(consumer)), [this] (consumer_adapter& adapter) { return consume_pausable(std::ref(adapter)).then([&adapter] { return adapter._consumer.consume_end_of_stream(); }); }); } template requires FlattenedConsumerV2 && FlattenedConsumerFilterV2 // A variant of consumee() that expects to be run in a seastar::thread. // Partitions for which filter(decorated_key) returns false are skipped // entirely and never reach the consumer. auto consume_in_thread(Consumer consumer, Filter filter) { auto adapter = consumer_adapter(*this, std::move(consumer)); consume_pausable_in_thread(std::ref(adapter), std::move(filter)); filter.on_end_of_stream(); return adapter._consumer.consume_end_of_stream(); }; /* * fast_forward_to is forbidden on mutation_reader created for a single partition. */ virtual future<> fast_forward_to(const dht::partition_range&) = 0; virtual future<> fast_forward_to(position_range) = 0; // close should cancel any outstanding background operations, // if possible, and wait on them to complete. // It should also transitively close underlying resources // and wait on them too. // // Once closed, the reader should be unusable. // // Similar to destructors, close must never fail. 
virtual future<> close() noexcept = 0; size_t buffer_size() const { return _buffer_size; } tracked_buffer detach_buffer() noexcept { _buffer_size = 0; return std::exchange(_buffer, tracked_buffer(tracking_allocator(_permit))); } void move_buffer_content_to(impl& other) { if (other._buffer.empty()) { std::swap(_buffer, other._buffer); other._buffer_size = std::exchange(_buffer_size, 0); } else { seastar::memory::on_alloc_point(); // for exception safety tests other._buffer.reserve(other._buffer.size() + _buffer.size()); std::move(_buffer.begin(), _buffer.end(), std::back_inserter(other._buffer)); _buffer.clear(); other._buffer_size += std::exchange(_buffer_size, 0); } } std::exception_ptr get_abort_exception() const noexcept { return _permit.get_abort_exception(); } db::timeout_clock::time_point timeout() const noexcept { return _permit.timeout(); } void set_timeout(db::timeout_clock::time_point timeout) noexcept { _permit.set_timeout(timeout); } }; private: std::unique_ptr _impl; mutation_reader() = default; explicit operator bool() const noexcept { return bool(_impl); } friend class optimized_optional; void do_upgrade_schema(const schema_ptr&); static void on_close_error(std::unique_ptr, std::exception_ptr ep) noexcept; public: mutation_reader(std::unique_ptr impl) noexcept : _impl(std::move(impl)) {} mutation_reader(const mutation_reader&) = delete; mutation_reader(mutation_reader&&) = default; mutation_reader& operator=(const mutation_reader&) = delete; mutation_reader& operator=(mutation_reader&& o) noexcept; ~mutation_reader(); future operator()() { _impl->set_close_required(); return _impl->operator()(); } template requires FlatMutationReaderConsumerV2 auto consume_pausable(Consumer consumer) { _impl->set_close_required(); return _impl->consume_pausable(std::move(consumer)); } template requires FlattenedConsumerV2 auto consume(Consumer consumer) { _impl->set_close_required(); return _impl->consume(std::move(consumer)); } class filter { private: std::function 
_partition_filter = [] (const dht::decorated_key&) { return true; }; std::function _mutation_fragment_filter = [] (const mutation_fragment_v2&) { return true; }; public: filter() = default; filter(std::function&& pf) : _partition_filter(std::move(pf)) { } filter(std::function&& pf, std::function&& mf) : _partition_filter(std::move(pf)) , _mutation_fragment_filter(std::move(mf)) { } template filter(Functor&& f) : _partition_filter(std::forward(f)) { } bool operator()(const dht::decorated_key& dk) const { return _partition_filter(dk); } bool operator()(const mutation_fragment_v2& mf) const { return _mutation_fragment_filter(mf); } void on_end_of_stream() const { } }; struct no_filter { bool operator()(const dht::decorated_key& dk) const { return true; } bool operator()(const mutation_fragment_v2& mf) const { return true; } void on_end_of_stream() const { } }; template requires FlattenedConsumerV2 && FlattenedConsumerFilterV2 auto consume_in_thread(Consumer consumer, Filter filter) { _impl->set_close_required(); return _impl->consume_in_thread(std::move(consumer), std::move(filter)); } template requires FlattenedConsumerV2 auto consume_in_thread(Consumer consumer) { _impl->set_close_required(); return consume_in_thread(std::move(consumer), no_filter{}); } // Skips to the next partition. // // Skips over the remaining fragments of the current partitions. If the // reader is currently positioned at a partition start nothing is done. // // If the last produced fragment comes from partition `P`, then the reader // is considered to still be in partition `P`, which means that `next_partition` // will move the reader to the partition immediately following `P`. // This case happens in particular when the last produced fragment was // `partition_end` for `P`. // // Only skips within the current partition range, i.e. if the current // partition is the last in the range the reader will be at EOS. 
// // Can be used to skip over entire partitions if interleaved with // `operator()()` calls. future<> next_partition() { _impl->set_close_required(); return _impl->next_partition(); } future<> fill_buffer() { _impl->set_close_required(); return _impl->fill_buffer(); } // Changes the range of partitions to pr. The range can only be moved // forwards. pr.begin() needs to be larger than pr.end() of the previously // used range (i.e. either the initial one passed to the constructor or a // previous fast forward target). // pr needs to be valid until the reader is destroyed or fast_forward_to() // is called again. future<> fast_forward_to(const dht::partition_range& pr) { _impl->set_close_required(); return _impl->fast_forward_to(pr); } // Skips to a later range of rows. // The new range must not overlap with the current range. // // In forwarding mode the stream does not return all fragments right away, // but only those belonging to the current clustering range. Initially // current range only covers the static row. The stream can be forwarded // (even before end-of- stream) to a later range with fast_forward_to(). // Forwarding doesn't change initial restrictions of the stream, it can // only be used to skip over data. // // Monotonicity of positions is preserved by forwarding. That is fragments // emitted after forwarding will have greater positions than any fragments // emitted before forwarding. // // For any range, all range tombstones relevant for that range which are // present in the original stream will be emitted. Range tombstones // emitted before forwarding which overlap with the new range are not // necessarily re-emitted. // // When forwarding mode is not enabled, fast_forward_to() // cannot be used. // // `fast_forward_to` can be called only when the reader is within a partition // and it affects the set of fragments returned from that partition. 
// In particular one must first enter a partition by fetching a `partition_start` // fragment before calling `fast_forward_to`. future<> fast_forward_to(position_range cr) { _impl->set_close_required(); return _impl->fast_forward_to(std::move(cr)); } // Closes the reader. // // Note: The reader object can can be safely destroyed after close returns. // since close makes sure to keep the underlying impl object alive until // the latter's close call is resolved. future<> close() noexcept { if (auto i = std::move(_impl)) { auto f = i->close(); // most close implementations are expected to return a ready future // so expedite processing it. if (f.available() && !f.failed()) { return f; } // close must not fail return f.handle_exception([i = std::move(i)] (std::exception_ptr ep) mutable { on_close_error(std::move(i), std::move(ep)); }); } return make_ready_future<>(); } // Returns true iff the stream reached the end. // There are no more fragments in the buffer and calling // fill_buffer() will not add any. 
bool is_end_of_stream() const { return _impl->is_end_of_stream() && is_buffer_empty(); } bool is_buffer_empty() const { return _impl->is_buffer_empty(); } bool is_buffer_full() const { return _impl->is_buffer_full(); } static constexpr size_t default_max_buffer_size_in_bytes() { return impl::default_max_buffer_size_in_bytes(); } mutation_fragment_v2 pop_mutation_fragment() { return _impl->pop_mutation_fragment(); } void unpop_mutation_fragment(mutation_fragment_v2 mf) { _impl->unpop_mutation_fragment(std::move(mf)); } const schema_ptr& schema() const { return _impl->_schema; } const reader_permit& permit() const { return _impl->_permit; } reader_permit& permit() { return _impl->_permit; } db::timeout_clock::time_point timeout() const noexcept { return _impl->timeout(); } void set_timeout(db::timeout_clock::time_point timeout) noexcept { _impl->set_timeout(timeout); } void set_max_buffer_size(size_t size) { _impl->max_buffer_size_in_bytes = size; } // Resolves with a pointer to the next fragment in the stream without consuming it from the stream, // or nullptr if there are no more fragments. // The returned pointer is invalidated by any other non-const call to this object. future peek() { if (!is_buffer_empty()) { return make_ready_future(&_impl->_buffer.front()); } if (is_end_of_stream()) { return make_ready_future(nullptr); } return fill_buffer().then([this] { return peek(); }); } // A peek at the next fragment in the buffer. // Cannot be called if is_buffer_empty() returns true. const mutation_fragment_v2& peek_buffer() const { return _impl->_buffer.front(); } // The actual buffer size of the reader. // Although we consistently refer to this as buffer size throughout the code // we really use "buffer size" as the size of the collective memory // used by all the mutation fragments stored in the buffer of the reader. 
size_t buffer_size() const { return _impl->buffer_size(); } const tracked_buffer& buffer() const { return _impl->buffer(); } // Detach the internal buffer of the reader. // Roughly equivalent to depleting it by calling pop_mutation_fragment() // until is_buffer_empty() returns true. // The reader will need to allocate a new buffer on the next fill_buffer() // call. tracked_buffer detach_buffer() noexcept { return _impl->detach_buffer(); } // Moves the buffer content to `other`. // // If the buffer of `other` is empty this is very efficient as the buffers // are simply swapped. Otherwise the content of the buffer is moved // fragmuent-by-fragment. // Allows efficient implementation of wrapping readers that do no // transformation to the fragment stream. void move_buffer_content_to(impl& other) { _impl->move_buffer_content_to(other); } // Causes this reader to conform to s. // Multiple calls of upgrade_schema() compose, effects of prior calls on the stream are preserved. void upgrade_schema(const schema_ptr& s) { if (s != schema()) [[unlikely]] { do_upgrade_schema(s); } } }; using mutation_reader_opt = optimized_optional; template mutation_reader make_mutation_reader(Args &&... args) { return mutation_reader(std::make_unique(std::forward(args)...)); } // Consumes mutation fragments until StopCondition is true. // The consumer will stop iff StopCondition returns true, in particular // reaching the end of stream alone won't stop the reader. 
template requires requires(StopCondition stop, ConsumeMutationFragment consume_mf, ConsumeEndOfStream consume_eos, mutation_fragment_v2 mf) { { stop() } -> std::same_as; { consume_mf(std::move(mf)) } -> std::same_as; { consume_eos() } -> std::same_as>; } future<> consume_mutation_fragments_until( mutation_reader& r, StopCondition&& stop, ConsumeMutationFragment&& consume_mf, ConsumeEndOfStream&& consume_eos) { return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos] { while (!r.is_buffer_empty()) { consume_mf(r.pop_mutation_fragment()); if (stop() || need_preempt()) { return make_ready_future<>(); } } if (r.is_end_of_stream()) { return consume_eos(); } return r.fill_buffer(); }); } // Creates a stream which is like r but with transformation applied to the elements. template requires StreamedMutationTranformerV2 mutation_reader transform(mutation_reader r, T t) { class transforming_reader : public mutation_reader::impl { mutation_reader _reader; T _t; struct consumer { transforming_reader* _owner; stop_iteration operator()(mutation_fragment_v2&& mf) { _owner->push_mutation_fragment(_owner->_t(std::move(mf))); return stop_iteration(_owner->is_buffer_full()); } }; public: transforming_reader(mutation_reader&& r, T&& t) : impl(t(r.schema()), r.permit()) , _reader(std::move(r)) , _t(std::move(t)) {} virtual future<> fill_buffer() override { if (_end_of_stream) { return make_ready_future<>(); } return _reader.consume_pausable(consumer{this}).then([this] { if (_reader.is_end_of_stream()) { _end_of_stream = true; } }); } virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { return _reader.next_partition(); } return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr) override { clear_buffer(); _end_of_stream = false; return _reader.fast_forward_to(pr); } virtual future<> fast_forward_to(position_range pr) override { clear_buffer(); _end_of_stream = false; return 
_reader.fast_forward_to(std::move(pr)); } virtual future<> close() noexcept override { return _reader.close(); } }; return make_mutation_reader(std::move(r), std::move(t)); } // Reads a single partition from a reader. Returns empty optional if there are no more partitions to be read. future read_mutation_from_mutation_reader(mutation_reader&); // Calls the consumer for each element of the reader's stream until end of stream // is reached or the consumer requests iteration to stop by returning stop_iteration::yes. // The consumer should accept mutation as the argument and return stop_iteration. // The returned future<> resolves when consumption ends. template requires MutationConsumer inline future<> consume_partitions(mutation_reader& reader, Consumer consumer) { return do_with(std::move(consumer), [&reader] (Consumer& c) -> future<> { return repeat([&reader, &c] () { return read_mutation_from_mutation_reader(reader).then([&c] (mutation_opt&& mo) -> future { if (!mo) { return make_ready_future(stop_iteration::yes); } return futurize_invoke(c, std::move(*mo)); }); }); }); } /// A consumer function that is passed a mutation_reader to be consumed from /// and returns a future<> resolved when the reader is fully consumed, and closed. /// Note: the function assumes ownership of the reader and must close it in all cases. using mutation_reader_consumer = std::function (mutation_reader)>;