/*
 * Copyright (C) 2022-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "utils/assert.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/smp.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "dht/sharder.hh"
#include "readers/empty.hh"
#include "readers/evictable.hh"
#include "readers/mutation_reader.hh"
#include "readers/foreign.hh"
#include "readers/multishard.hh"
#include "readers/mutation_source.hh"
#include "schema/schema_registry.hh"
#include "locator/abstract_replication_strategy.hh"

extern logger mrlog;

namespace {

struct remote_fill_buffer_result {
    foreign_ptr<std::unique_ptr<const mutation_reader::tracked_buffer>> buffer;
    bool end_of_stream = false;

    remote_fill_buffer_result() = default;
    remote_fill_buffer_result(mutation_reader::tracked_buffer&& buffer, bool end_of_stream)
        : buffer(make_foreign(std::make_unique<const mutation_reader::tracked_buffer>(std::move(buffer))))
        , end_of_stream(end_of_stream) {
    }
};

}

/// See make_foreign_reader() for description.
class foreign_reader : public mutation_reader::impl {
    template <typename T>
    using foreign_unique_ptr = foreign_ptr<std::unique_ptr<T>>;

    using fragment_buffer = mutation_reader::tracked_buffer;

    foreign_unique_ptr<mutation_reader> _reader;
    foreign_unique_ptr<future<>> _read_ahead_future;
    streamed_mutation::forwarding _fwd_sm;

    // Forward an operation to the reader on the remote shard.
    // If the remote reader has an ongoing read-ahead, bring it to the
    // foreground (wait on it) and execute the operation after.
    // After the operation completes, kick off a new read-ahead (fill_buffer())
    // and move it to the background (save its future but don't wait on it
    // now). If all goes well, read-aheads complete by the next operation and
    // we don't have to wait on the remote reader filling its buffer.
    template <typename Operation, typename Result = futurize_t<std::invoke_result_t<Operation>>>
    Result forward_operation(Operation op) {
        reader_permit::awaits_guard awaits_guard{_permit};
        return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(),
                read_ahead_future = std::exchange(_read_ahead_future, nullptr),
                op = std::move(op)] () mutable {
            auto exec_op_and_read_ahead = [=] () mutable {
                // Not really variadic, we expect 0 (void) or 1 parameter.
                return op().then([=] (auto... result) {
                    auto f = reader->is_end_of_stream() ? nullptr : std::make_unique<future<>>(reader->fill_buffer());
                    return make_ready_future<std::tuple<foreign_unique_ptr<future<>>, decltype(result)...>>(
                            std::tuple(make_foreign(std::move(f)), std::move(result)...));
                });
            };
            if (read_ahead_future) {
                return read_ahead_future->then(std::move(exec_op_and_read_ahead));
            } else {
                return exec_op_and_read_ahead();
            }
        }).then([this] (auto fut_and_result) {
            _read_ahead_future = std::get<0>(std::move(fut_and_result)); // NOLINT(bugprone-use-after-move)
            static_assert(std::tuple_size<decltype(fut_and_result)>::value <= 2);
            if constexpr (std::tuple_size<decltype(fut_and_result)>::value == 1) {
                return make_ready_future<>();
            } else {
                auto result = std::get<1>(std::move(fut_and_result)); // NOLINT(bugprone-use-after-move)
                return make_ready_future<decltype(result)>(std::move(result));
            }
        }).finally([awaits_guard = std::move(awaits_guard)] { });
    }
public:
    foreign_reader(schema_ptr schema,
            reader_permit permit,
            foreign_unique_ptr<mutation_reader> reader,
            streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);

    // this is captured.
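    // (`this` and the remote reader pointer are captured by the lambdas and
    // continuations created in forward_operation(), so the reader must stay
    // at a stable address for its entire lifetime; hence both copy and move
    // operations are deleted below.)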
    foreign_reader(const foreign_reader&) = delete;
    foreign_reader& operator=(const foreign_reader&) = delete;
    foreign_reader(foreign_reader&&) = delete;
    foreign_reader& operator=(foreign_reader&&) = delete;

    virtual future<> fill_buffer() override;
    virtual future<> next_partition() override;
    virtual future<> fast_forward_to(const dht::partition_range& pr) override;
    virtual future<> fast_forward_to(position_range pr) override;
    virtual future<> close() noexcept override;
};

foreign_reader::foreign_reader(schema_ptr schema,
        reader_permit permit,
        foreign_unique_ptr<mutation_reader> reader,
        streamed_mutation::forwarding fwd_sm)
    : impl(std::move(schema), std::move(permit))
    , _reader(std::move(reader))
    , _fwd_sm(fwd_sm) {
}

future<> foreign_reader::fill_buffer() {
    if (_end_of_stream || is_buffer_full()) {
        return make_ready_future<>();
    }

    return forward_operation([reader = _reader.get()] () {
        auto f = reader->is_buffer_empty() ? reader->fill_buffer() : make_ready_future<>();
        return f.then([=] {
            return make_ready_future<remote_fill_buffer_result>(remote_fill_buffer_result(reader->detach_buffer(), reader->is_end_of_stream()));
        });
    }).then([this] (remote_fill_buffer_result res) mutable {
        _end_of_stream = res.end_of_stream;
        for (const auto& mf : *res.buffer) {
            // Need a copy since the mf is on the remote shard.
            push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf));
        }
    });
}

future<> foreign_reader::next_partition() {
    if (_fwd_sm == streamed_mutation::forwarding::yes) {
        clear_buffer();
        _end_of_stream = false;
    } else {
        clear_buffer_to_next_partition();
        if (!is_buffer_empty()) {
            co_return;
        }
        _end_of_stream = false;
    }
    co_await forward_operation([reader = _reader.get()] () {
        return reader->next_partition();
    });
}

future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) {
    clear_buffer();
    _end_of_stream = false;
    return forward_operation([reader = _reader.get(), &pr] () {
        return reader->fast_forward_to(pr);
    });
}

future<> foreign_reader::fast_forward_to(position_range pr) {
    clear_buffer();
    _end_of_stream = false;
    return forward_operation([reader = _reader.get(), pr = std::move(pr)] () {
        return reader->fast_forward_to(std::move(pr));
    });
}

future<> foreign_reader::close() noexcept {
    if (!_reader) {
        if (_read_ahead_future) {
            on_internal_error_noexcept(mrlog, "foreign_reader::close can't wait on read_ahead future with disengaged reader");
        }
        return make_ready_future<>();
    }
    return smp::submit_to(_reader.get_owner_shard(), [reader = std::move(_reader),
            read_ahead_future = std::exchange(_read_ahead_future, nullptr)] () mutable {
        auto read_ahead = read_ahead_future ? std::move(*read_ahead_future.get()) : make_ready_future<>();
        return read_ahead.then_wrapped([reader = std::move(reader)] (future<> f) mutable {
            if (f.failed()) {
                auto ex = f.get_exception();
                mrlog.warn("foreign_reader: benign read_ahead failure during close: {}. Ignoring.", ex);
            }
            return reader->close();
        });
    });
}

mutation_reader make_foreign_reader(schema_ptr schema,
            reader_permit permit,
            foreign_ptr<std::unique_ptr<mutation_reader>> reader,
            streamed_mutation::forwarding fwd_sm) {
    if (reader.get_owner_shard() == this_shard_id()) {
        return std::move(*reader);
    }
    return make_mutation_reader<foreign_reader>(std::move(schema), std::move(permit), std::move(reader), fwd_sm);
}

template <typename... Arg>
static void require(bool condition, fmt::format_string<Arg...> fmt, Arg&&... arg) {
    if (!condition) {
        on_internal_error(mrlog, seastar::format(fmt, std::forward<Arg>(arg)...));
    }
}

// Encapsulates all data and logic that is local to the remote shard the
// reader lives on.
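//
// The evictable reader registers its wrapped reader as inactive with the
// reader_concurrency_semaphore whenever it is paused (see maybe_pause()
// below). An inactive reader can be evicted (closed) by the semaphore at any
// time to reclaim resources. On the next operation, the evictable reader
// either resumes the still-alive inactive reader, or transparently recreates
// it from the mutation_source, positioned after the last fragment it emitted
// (tracked via _last_pkey and _next_position_in_partition).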
class evictable_reader : public mutation_reader::impl {
public:
    using auto_pause = bool_class<class auto_pause_tag>;

    class auto_pause_disable_guard {
        evictable_reader& _reader;
    public:
        auto_pause_disable_guard(evictable_reader& reader) : _reader(reader) {
            _reader._auto_pause = auto_pause::no;
        }
        ~auto_pause_disable_guard() {
            _reader._auto_pause = auto_pause::yes;
            _reader.pause();
        }
    };

private:
    auto_pause _auto_pause;
    mutation_source _ms;
    const dht::partition_range* _pr;
    const query::partition_slice& _ps;
    tracing::global_trace_state_ptr _trace_state;
    const mutation_reader::forwarding _fwd_mr;
    reader_concurrency_semaphore::inactive_read_handle _irh;
    bool _reader_recreated = false; // set if the reader was recreated since the last operation
    position_in_partition::tri_compare _tri_cmp;

    std::optional<dht::decorated_key> _last_pkey;
    position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
    // These are used when the reader has to be recreated (after having been
    // evicted while paused) and the range and/or slice it is recreated with
    // differs from the original ones.
    std::optional<dht::partition_range> _range_override;
    std::optional<query::partition_slice> _slice_override;

    mutation_reader_opt _reader;

private:
    void do_pause(mutation_reader reader) noexcept;
    void maybe_pause(mutation_reader reader) noexcept;
    mutation_reader_opt try_resume();
    void update_next_position();
    void adjust_partition_slice();
    mutation_reader recreate_reader();
    future<mutation_reader> resume_or_create_reader();
    void validate_partition_start(const partition_start& ps);
    void validate_position_in_partition(position_in_partition_view pos) const;
    void examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3);

public:
    evictable_reader(
            auto_pause ap,
            mutation_source ms,
            mutation_reader_opt reader,
            schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding fwd_mr);
    virtual future<> fill_buffer() override;
    virtual future<> next_partition() override;
    virtual future<> fast_forward_to(const dht::partition_range& pr) override;
    virtual future<> fast_forward_to(position_range) override {
        throw_with_backtrace<std::bad_function_call>();
    }
    virtual future<> close() noexcept override {
        if (_reader) {
            return _reader->close();
        }
        if (auto reader_opt = try_resume()) {
            return reader_opt->close();
        }
        return make_ready_future<>();
    }
    reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
        return std::move(_irh);
    }
    void pause() {
        if (_reader) {
            do_pause(std::move(*_reader));
        }
    }
    reader_permit permit() {
        return _permit;
    }
};

void evictable_reader::do_pause(mutation_reader reader) noexcept {
    SCYLLA_ASSERT(!_irh);
    _irh = _permit.semaphore().register_inactive_read(std::move(reader));
}

void evictable_reader::maybe_pause(mutation_reader reader) noexcept {
    if (_auto_pause) {
        do_pause(std::move(reader));
    } else {
        _reader = std::move(reader);
    }
}

mutation_reader_opt evictable_reader::try_resume() {
    if (auto reader_opt = _permit.semaphore().unregister_inactive_read(std::move(_irh))) {
        return std::move(*reader_opt);
    }
    return {};
}

void evictable_reader::update_next_position() {
    if (is_buffer_empty()) {
        return;
    }

    auto rbegin = std::reverse_iterator(buffer().end());
    auto rend = std::reverse_iterator(buffer().begin());
    if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment_v2::is_partition_start)); pk_it != rend) {
        _last_pkey = pk_it->as_partition_start().key();
    }

    const auto last_pos = buffer().back().position();
    switch (last_pos.region()) {
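    // Map the region of the last buffered fragment to the position the
    // recreated reader will have to resume from:
    // * partition-start -> resume at the static row of the same partition;
    // * static-row      -> resume before all clustered rows;
    // * clustered       -> resume right after the last emitted position,
    //                      unless a partition-end immediately follows;
    // * partition-end   -> resume at the next partition.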
    case partition_region::partition_start:
        _next_position_in_partition = position_in_partition::for_static_row();
        break;
    case partition_region::static_row:
        _next_position_in_partition = position_in_partition::before_all_clustered_rows();
        break;
    case partition_region::clustered:
        if (!_reader->is_buffer_empty() && _reader->peek_buffer().is_end_of_partition()) {
            push_mutation_fragment(_reader->pop_mutation_fragment());
            _next_position_in_partition = position_in_partition::for_partition_start();
        } else {
            _next_position_in_partition = position_in_partition::after_key(*_schema, last_pos);
        }
        break;
    case partition_region::partition_end:
        _next_position_in_partition = position_in_partition::for_partition_start();
        break;
    }
}

void evictable_reader::adjust_partition_slice() {
    _slice_override = _ps;

    auto ranges = _slice_override->default_row_ranges();
    query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);

    _slice_override->clear_ranges();
    _slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
}

mutation_reader evictable_reader::recreate_reader() {
    const dht::partition_range* range = _pr;
    const query::partition_slice* slice = &_ps;

    _range_override.reset();
    _slice_override.reset();

    if (_last_pkey) {
        bool partition_range_is_inclusive = true;

        switch (_next_position_in_partition.region()) {
        case partition_region::partition_start:
            partition_range_is_inclusive = false;
            break;
        case partition_region::static_row:
            break;
        case partition_region::clustered:
            adjust_partition_slice();
            slice = &*_slice_override;
            break;
        case partition_region::partition_end:
            partition_range_is_inclusive = false;
            break;
        }

        // The original range contained a single partition and we've read it
        // all. We'd have to create a reader with an empty range that would
        // immediately be at EOS. This is not possible, so just create an empty
        // reader instead.
        // This should be extremely rare (who'd create a multishard reader to
        // read a single partition?) but still, let's make sure we handle it
        // correctly.
        if (_pr->is_singular() && !partition_range_is_inclusive) {
            return make_empty_mutation_reader(_schema, _permit);
        }

        _range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
        range = &*_range_override;

        _reader_recreated = true;
    }

    return _ms.make_mutation_reader(
            _schema,
            _permit,
            *range,
            *slice,
            _trace_state,
            streamed_mutation::forwarding::no,
            _fwd_mr);
}

future<mutation_reader> evictable_reader::resume_or_create_reader() {
    if (_reader) {
        co_return std::move(*_reader);
    }
    if (auto reader_opt = try_resume()) {
        co_return std::move(*reader_opt);
    }
    // We get here both when the reader is created for the first time and when
    // we have to recreate an evicted reader in `recreate_reader()`. Two cases
    // are possible at this point:
    // * the reader is still alive (in inactive state)
    // * the reader was evicted
    // We check for the latter below with `needs_readmission()` and it is very
    // important not to allow preemption between said check and
    // `recreate_reader()`: otherwise the reader might be evicted between the
    // check and `recreate_reader()`, and the latter would recreate it without
    // waiting for re-admission.
    if (_permit.needs_readmission()) {
        co_await _permit.wait_readmission();
    }
    co_return recreate_reader();
}

void evictable_reader::validate_partition_start(const partition_start& ps) {
    const auto tri_cmp = dht::ring_position_comparator(*_schema);

    // If we recreated the reader after fast-forwarding it we won't have
    // _last_pkey set. In this case it is enough to check if the partition
    // is in range.
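    // Otherwise the new first partition is also validated against the last
    // seen partition key: it has to compare greater-or-equal when we expect
    // to continue mid-partition (the old partition may have been compacted
    // away in the meantime), and strictly greater when we expect to resume at
    // the next partition.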
    if (_last_pkey) {
        const auto cmp_res = tri_cmp(*_last_pkey, ps.key());
        if (_next_position_in_partition.region() != partition_region::partition_start) {
            // We expect to continue from the same partition.
            // We cannot assume the partition we stopped the read at is still
            // alive when we recreate the reader. It might have been compacted
            // away in the meanwhile, so allow for a larger partition too.
            require(
                    cmp_res <= 0,
                    "{}(): validation failed, expected partition with key larger than or equal to _last_pkey {}, but got {}",
                    __FUNCTION__,
                    *_last_pkey,
                    ps.key());
            // Reset the next position if we are not continuing from the same partition.
            if (cmp_res < 0) {
                // Close the previous partition, we are not going to continue it.
                push_mutation_fragment(*_schema, _permit, partition_end{});
                _next_position_in_partition = position_in_partition::for_partition_start();
            }
        } else {
            // Should be a larger partition.
            require(
                    cmp_res < 0,
                    "{}(): validation failed, expected partition with key larger than _last_pkey {}, but got {}",
                    __FUNCTION__,
                    *_last_pkey,
                    ps.key());
        }
    }
    const auto& prange = _range_override ? *_range_override : *_pr;
    require(
            // TODO: somehow avoid this copy
            prange.contains(ps.key(), tri_cmp),
            "{}(): validation failed, expected partition with key that falls into current range {}, but got {}",
            __FUNCTION__,
            prange,
            ps.key());
}

void evictable_reader::validate_position_in_partition(position_in_partition_view pos) const {
    require(
            _tri_cmp(_next_position_in_partition, pos) <= 0,
            "{}(): validation failed, expected position in partition larger than or equal to _next_position_in_partition {}, but got {}",
            __FUNCTION__,
            _next_position_in_partition,
            pos);

    if (_slice_override && pos.region() == partition_region::clustered) {
        const auto ranges = _slice_override->row_ranges(*_schema, _last_pkey->key());
        const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) {
            // TODO: somehow avoid this copy
            auto range = position_range(cr);
            // We cannot use range.contains() because that treats the range as
            // [a, b), meaning a range tombstone change with position
            // after_key(b) will be considered outside of it. Such range
            // tombstone changes can be emitted, however, when recreating the
            // reader on a clustering range edge.
            return _tri_cmp(range.start(), pos) <= 0 && _tri_cmp(pos, range.end()) <= 0;
        });
        require(
                any_contains,
                "{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}",
                __FUNCTION__,
                *_slice_override,
                pos);
    }
}

void evictable_reader::examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3) {
    if (!mf1) {
        return; // the reader is at EOS
    }
    // If engaged, the first fragment is always a partition-start.
    validate_partition_start(mf1->as_partition_start());
    if (_tri_cmp(mf1->position(), _next_position_in_partition) < 0) {
        mf1 = {}; // drop mf1
    }

    const auto continue_same_partition = _next_position_in_partition.region() != partition_region::partition_start;

    // If we had a first fragment, we are guaranteed to have a second one -- if nothing else, a partition-end.
    if (mf2->is_end_of_partition()) {
        return; // no further fragments, nothing to do
    }

    // We want to validate the position of the first non-dropped fragment.
    // If mf2 is a static row and we need to drop it, this will be mf3.
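    // Up to three fragments have to be examined because after recreating the
    // reader at most two already-emitted fragments can reappear before the
    // first genuinely new one: the partition-start (mf1) and possibly the
    // static row (mf2). Fragments already emitted by the previous incarnation
    // of the reader are dropped here; the caller pushes the surviving ones
    // back into the reader via unpop_mutation_fragment().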
    if (mf2->is_static_row() && _tri_cmp(mf2->position(), _next_position_in_partition) < 0) {
        mf2 = {}; // drop mf2
    } else {
        if (continue_same_partition) {
            validate_position_in_partition(mf2->position());
        }
        return;
    }

    if (mf3->is_end_of_partition()) {
        return; // no further fragments, nothing to do
    } else if (continue_same_partition) {
        validate_position_in_partition(mf3->position());
    }
}

evictable_reader::evictable_reader(
        auto_pause ap,
        mutation_source ms,
        mutation_reader_opt reader,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr)
    : impl(std::move(schema), std::move(permit))
    , _auto_pause(ap)
    , _ms(std::move(ms))
    , _pr(&pr)
    , _ps(ps)
    , _trace_state(std::move(trace_state))
    , _fwd_mr(fwd_mr)
    , _tri_cmp(*_schema)
    , _reader(std::move(reader)) {
}

future<> evictable_reader::fill_buffer() {
    if (is_end_of_stream()) {
        co_return;
    }
    _reader = co_await resume_or_create_reader();

    if (_reader_recreated) {
        // Recreating the reader breaks snapshot isolation and creates all sorts
        // of complications around the continuity of range tombstone changes,
        // e.g. a range tombstone started by the previous reader object
        // might not exist anymore with the new reader object.
        // To avoid complications we reset the tombstone state on each reader
        // recreation by emitting a null tombstone change, if we read at least
        // one clustering fragment from the partition.
        if (_next_position_in_partition.region() == partition_region::clustered &&
                _tri_cmp(_next_position_in_partition, position_in_partition::before_all_clustered_rows()) > 0) {
            push_mutation_fragment(*_schema, _permit, range_tombstone_change{position_in_partition_view::before_key(_next_position_in_partition), {}});
        }
        auto mf1 = co_await (*_reader)();
        auto mf2 = co_await (*_reader)();
        auto mf3 = co_await (*_reader)();
        examine_first_fragments(mf1, mf2, mf3);
        if (mf3) {
            _reader->unpop_mutation_fragment(std::move(*mf3));
        }
        if (mf2) {
            _reader->unpop_mutation_fragment(std::move(*mf2));
        }
        if (mf1) {
            _reader->unpop_mutation_fragment(std::move(*mf1));
        }
        _reader_recreated = false;
    } else {
        co_await _reader->fill_buffer();
    }

    _reader->move_buffer_content_to(*this);
    // Ensure that each buffer represents forward progress. This is only a
    // concern when the last fragment in the buffer is a range tombstone
    // change. In that case ensure that:
    // * buffer().back().position() > _next_position_in_partition;
    // * _reader.peek()->position() > buffer().back().position();
    if (!is_buffer_empty() && buffer().back().is_range_tombstone_change()) {
        auto* next_mf = co_await _reader->peek();
        // First make sure we've made progress w.r.t. _next_position_in_partition.
        // This loop becomes infinite when the next position is a partition start.
        // In that case progress is guaranteed anyway, so skip this loop entirely.
        while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(buffer().back().position(), _next_position_in_partition) <= 0) {
            push_mutation_fragment(_reader->pop_mutation_fragment());
            next_mf = co_await _reader->peek();
        }
        const auto last_pos = position_in_partition(buffer().back().position());
        while (next_mf && _tri_cmp(last_pos, next_mf->position()) == 0) {
            push_mutation_fragment(_reader->pop_mutation_fragment());
            next_mf = co_await _reader->peek();
        }
    }
    update_next_position();
    _end_of_stream = _reader->is_end_of_stream();
    maybe_pause(std::move(*_reader));
}

future<> evictable_reader::next_partition() {
    _next_position_in_partition = position_in_partition::for_partition_start();
    clear_buffer_to_next_partition();
    if (!is_buffer_empty()) {
        co_return;
    }
    auto reader = co_await resume_or_create_reader();
    co_await reader.next_partition();
    maybe_pause(std::move(reader));
}

future<> evictable_reader::fast_forward_to(const dht::partition_range& pr) {
    _pr = &pr;
    _last_pkey.reset();
    _next_position_in_partition = position_in_partition::for_partition_start();
    clear_buffer();
    _end_of_stream = false;
    if (_reader) {
        co_await _reader->fast_forward_to(pr);
        _range_override.reset();
        co_return;
    }
    if (auto reader_opt = try_resume()) {
        std::exception_ptr ex;
        try {
            co_await reader_opt->fast_forward_to(pr);
            _range_override.reset();
        } catch (...) {
            ex = std::current_exception();
        }
        if (ex) {
            co_await reader_opt->close();
            std::rethrow_exception(std::move(ex));
        }
        maybe_pause(std::move(*reader_opt));
    }
}

evictable_reader_handle::evictable_reader_handle(evictable_reader& r) : _r(&r) {
}

void evictable_reader_handle::pause() {
    _r->pause();
}

mutation_reader make_auto_paused_evictable_reader(
        mutation_source ms,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr) {
    return make_mutation_reader<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms), std::nullopt, std::move(schema),
            std::move(permit), pr, ps, std::move(trace_state), fwd_mr);
}

std::pair<mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(
        mutation_source ms,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr) {
    auto reader = std::make_unique<evictable_reader>(evictable_reader::auto_pause::no, std::move(ms), std::nullopt, std::move(schema),
            std::move(permit), pr, ps, std::move(trace_state), fwd_mr);
    auto handle = evictable_reader_handle(*reader.get());
    return std::pair(mutation_reader(std::move(reader)), handle);
}

namespace {

struct buffer_fill_hint {
    size_t size;
    dht::token stop_token;
};

// A special-purpose shard reader.
//
// Shard reader manages a reader located on a remote shard. It transparently
// supports read-ahead (background fill_buffer() calls).
// This reader is not for general use, it was designed to serve the
// multishard_combining_reader.
// Although it implements the mutation_reader::impl interface it cannot be
// wrapped into a mutation_reader, as it needs to be managed by a shared
// pointer.
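//
// The wrapped reader is an evictable_reader living on shard _shard. All calls
// into it go through smp::submit_to(); the fragments it produces are copied
// into this shard's buffer, since their memory is accounted to the remote
// shard and cannot simply be moved across shards.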
class shard_reader : public mutation_reader::impl {
private:
    shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
    const unsigned _shard;
    foreign_ptr<lw_shared_ptr<const dht::partition_range>> _pr;
    const query::partition_slice& _ps;
    tracing::global_trace_state_ptr _trace_state;
    const mutation_reader::forwarding _fwd_mr;
    std::optional<future<>> _read_ahead;
    foreign_ptr<std::unique_ptr<evictable_reader>> _reader;

private:
    future<remote_fill_buffer_result> fill_reader_buffer(evictable_reader& reader, std::optional<buffer_fill_hint> hint);
    future<> do_fill_buffer(std::optional<buffer_fill_hint> hint);

public:
    shard_reader(
            schema_ptr schema,
            reader_permit permit,
            shared_ptr<reader_lifecycle_policy> lifecycle_policy,
            unsigned shard,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding fwd_mr)
        : impl(std::move(schema), std::move(permit))
        , _lifecycle_policy(std::move(lifecycle_policy))
        , _shard(shard)
        , _pr(make_foreign(make_lw_shared<const dht::partition_range>(pr)))
        , _ps(ps)
        , _trace_state(std::move(trace_state))
        , _fwd_mr(fwd_mr) {
    }

    shard_reader(shard_reader&&) = delete;
    shard_reader& operator=(shard_reader&&) = delete;
    shard_reader(const shard_reader&) = delete;
    shard_reader& operator=(const shard_reader&) = delete;

    const mutation_fragment_v2& peek_buffer() const {
        return buffer().front();
    }
    future<> fill_buffer(std::optional<buffer_fill_hint> hint);
    virtual future<> fill_buffer() override {
        return fill_buffer(std::nullopt);
    }
    virtual future<> next_partition() override;
    virtual future<> fast_forward_to(const dht::partition_range& pr) override;
    virtual future<> fast_forward_to(position_range) override;
    virtual future<> close() noexcept override;
    bool done() const {
        return _reader && is_buffer_empty() && is_end_of_stream();
    }
    void read_ahead();
    bool is_read_ahead_in_progress() const {
        return _read_ahead.has_value();
    }
};

future<> shard_reader::close() noexcept {
    if (_read_ahead) {
        try {
            co_await *std::exchange(_read_ahead, std::nullopt);
        } catch (...) {
            auto ex = std::current_exception();
            if (!is_timeout_exception(ex)) {
                mrlog.warn("shard_reader::close(): read_ahead on shard {} failed: {}", _shard, ex);
            }
        }
    }

    try {
        co_await smp::submit_to(_shard, [this] {
            if (!_reader) {
                return make_ready_future<>();
            }
            auto irh = std::move(*_reader).inactive_read_handle();
            return with_closeable(mutation_reader(_reader.release()), [this] (mutation_reader& reader) mutable {
                auto permit = reader.permit();
                const auto& schema = *reader.schema();

                auto unconsumed_fragments = reader.detach_buffer();
                auto rit = std::reverse_iterator(buffer().cend());
                auto rend = std::reverse_iterator(buffer().cbegin());
                for (; rit != rend; ++rit) {
                    unconsumed_fragments.emplace_front(schema, permit, *rit); // we are copying from the remote shard
                }

                return unconsumed_fragments;
            }).then([this, irh = std::move(irh)] (mutation_reader::tracked_buffer&& buf) mutable {
                return _lifecycle_policy->destroy_reader({std::move(irh), std::move(buf)});
            });
        });
    } catch (...) {
        mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception());
    }
}

future<remote_fill_buffer_result> shard_reader::fill_reader_buffer(evictable_reader& reader, std::optional<buffer_fill_hint> hint) {
    evictable_reader::auto_pause_disable_guard auto_pause_guard{reader};
    reader_permit::need_cpu_guard ncpu_guard{reader.permit()};

    co_await reader.fill_buffer();
    if (!hint) {
        co_return remote_fill_buffer_result(reader.detach_buffer(), reader.is_end_of_stream());
    }

    mutation_reader::tracked_buffer buffer(reader.permit());
    size_t buffer_size = 0;

    auto drain_buffer_and_check_hint = [&] {
        bool reached_stop_token = false;
        while (!reader.is_buffer_empty()) {
            auto mf = reader.pop_mutation_fragment();
            reached_stop_token |= mf.is_partition_start() && mf.as_partition_start().key().token() >= hint->stop_token;
            buffer.push_back(std::move(mf));
            buffer_size += buffer.back().memory_usage();
        }
        // Finish reading if the reader is at EOS or we've read enough.
        // We've read enough if we read at least max_buffer_size_in_bytes and we also fulfilled the hint.
        return reader.is_end_of_stream() || ((reached_stop_token || buffer_size >= hint->size) && buffer_size >= max_buffer_size_in_bytes);
    };

    while (!drain_buffer_and_check_hint()) {
        co_await reader.fill_buffer();
    }
    co_return remote_fill_buffer_result(std::move(buffer), reader.is_end_of_stream());
}

future<> shard_reader::do_fill_buffer(std::optional<buffer_fill_hint> hint) {
    struct reader_and_buffer_fill_result {
        foreign_ptr<std::unique_ptr<evictable_reader>> reader;
        remote_fill_buffer_result result;
    };

    auto res = co_await std::invoke([&] () -> future<remote_fill_buffer_result> {
        if (!_reader) {
            reader_and_buffer_fill_result res = co_await smp::submit_to(_shard, coroutine::lambda([this, gs = global_schema_ptr(_schema), hint] () -> future<reader_and_buffer_fill_result> {
                auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] (
                            schema_ptr s,
                            reader_permit permit,
                            const dht::partition_range& pr,
                            const query::partition_slice& ps,
                            tracing::trace_state_ptr ts,
                            streamed_mutation::forwarding,
                            mutation_reader::forwarding fwd_mr) {
                    return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, std::move(ts), fwd_mr);
                });
                auto s = gs.get();
                auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout(), _trace_state);
                if (permit.needs_readmission()) {
                    co_await permit.wait_readmission();
                }
                auto underlying_reader = _lifecycle_policy->create_reader(s, permit, *_pr, _ps, _trace_state, _fwd_mr);
                std::exception_ptr ex;
                try {
                    // The reader might have been saved from a previous page and
                    // missed some fast-forwarding since the new page started.
                    // Fast-forward it to the correct range if that is the case.
                    if (auto pr = _lifecycle_policy->get_read_range(); pr && _pr->start() && pr->after(_pr->start()->value(), dht::ring_position_comparator(*_schema))) {
                        auto new_pr = _pr.get_owner_shard() == this_shard_id() ? _pr.release() : make_lw_shared<const dht::partition_range>(*_pr);
                        co_await underlying_reader.fast_forward_to(*new_pr);
                        _lifecycle_policy->update_read_range(new_pr);
                        _pr = make_foreign(std::move(new_pr));
                    }
                } catch (...) {
                    ex = std::current_exception();
                }
                if (ex) {
                    co_await underlying_reader.close();
                    std::rethrow_exception(std::move(ex));
                }
                auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
                        std::move(underlying_reader), s, std::move(permit), *_pr, _ps, _trace_state, _fwd_mr));
                try {
                    tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
                    auto res = co_await fill_reader_buffer(*rreader, hint);
                    co_return reader_and_buffer_fill_result{std::move(rreader), std::move(res)};
                } catch (...) {
                    ex = std::current_exception();
                }
                co_await rreader->close();
                std::rethrow_exception(std::move(ex));
            }));
            _reader = std::move(res.reader);
            co_return std::move(res.result);
        } else {
            co_return co_await smp::submit_to(_shard, coroutine::lambda([this, hint] () -> future<remote_fill_buffer_result> {
                return fill_reader_buffer(*_reader, hint);
            }));
        }
    });

    reserve_additional(res.buffer->size());
    for (const auto& mf : *res.buffer) {
        push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf));
        co_await coroutine::maybe_yield();
    }
    _end_of_stream = res.end_of_stream;
}

future<> shard_reader::fill_buffer(std::optional<buffer_fill_hint> hint) {
    // FIXME: want to move this to the inner scopes but it makes clang miscompile the code.
    reader_permit::awaits_guard guard(_permit);
    if (_read_ahead) {
        co_await *std::exchange(_read_ahead, std::nullopt);
        co_return;
    }
    if (!is_buffer_empty()) {
        co_return;
    }
    co_await do_fill_buffer(hint);
}

future<> shard_reader::next_partition() {
    if (!_reader) {
        co_return;
    }

    // FIXME: want to move this to the inner scopes but it makes clang miscompile the code.
    reader_permit::awaits_guard guard(_permit);

    if (_read_ahead) {
        co_await *std::exchange(_read_ahead, std::nullopt);
    }

    clear_buffer_to_next_partition();
    if (!is_buffer_empty()) {
        co_return;
    }
    co_return co_await smp::submit_to(_shard, [this] {
        return _reader->next_partition();
    });
}

future<> shard_reader::fast_forward_to(const dht::partition_range& pr) {
    if (!_reader && !_read_ahead) {
        // No need to fast-forward uncreated readers, they will be passed the new
        // range when created.
        _pr = make_foreign(make_lw_shared<const dht::partition_range>(pr));
        co_return;
    }

    reader_permit::awaits_guard guard(_permit);

    if (_read_ahead) {
        co_await *std::exchange(_read_ahead, std::nullopt);
    }

    _end_of_stream = false;
    clear_buffer();

    _pr = co_await smp::submit_to(_shard, [this, &pr] () -> future<foreign_ptr<lw_shared_ptr<const dht::partition_range>>> {
        auto new_pr = make_lw_shared<const dht::partition_range>(pr);
        co_await _reader->fast_forward_to(*new_pr);
        _lifecycle_policy->update_read_range(new_pr);
        co_return make_foreign(std::move(new_pr));
    });
}

future<> shard_reader::fast_forward_to(position_range) {
    return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}

void shard_reader::read_ahead() {
    if (_read_ahead || is_end_of_stream() || !is_buffer_empty()) {
        return;
    }

    _read_ahead.emplace(do_fill_buffer(std::nullopt));
}

} // anonymous namespace

// See make_multishard_combining_reader() for description.
class multishard_combining_reader : public mutation_reader::impl {
    struct shard_and_token {
        shard_id shard;
        dht::token token;

        bool operator<(const shard_and_token& o) const {
            // Reversed, as we want a min-heap.
            return token > o.token;
        }
    };
    std::any _keep_alive_sharder;
    const dht::sharder& _sharder;
    std::vector<std::unique_ptr<shard_reader>> _shard_readers;
    // Contains the position of each shard with token granularity, organized
    // into a min-heap. Used to select the shard with the smallest token each
    // time a shard reader produces a new partition.
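    // Note that shard_and_token::operator<() is deliberately reversed, so
    // that std::ranges::push_heap()/pop_heap() with std::less{} -- which
    // normally maintain a max-heap -- keep the smallest token at front().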
    std::vector<shard_and_token> _shard_selection_min_heap;
    unsigned _current_shard;
    bool _crossed_shards;
    unsigned _concurrency = 1;
    multishard_reader_buffer_hint _buffer_hint = multishard_reader_buffer_hint::no;
    read_ahead _read_ahead = read_ahead::yes;

    void on_partition_range_change(const dht::partition_range& pr);
    bool maybe_move_to_next_shard(const dht::token* const t = nullptr);
    future<> handle_empty_reader_buffer();

public:
    multishard_combining_reader(
            const dht::sharder& sharder,
            std::any keep_alive_sharder,
            shared_ptr<reader_lifecycle_policy> lifecycle_policy,
            schema_ptr s,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding fwd_mr,
            multishard_reader_buffer_hint buffer_hint,
            read_ahead read_ahead);

    // this is captured.
    multishard_combining_reader(const multishard_combining_reader&) = delete;
    multishard_combining_reader& operator=(const multishard_combining_reader&) = delete;
    multishard_combining_reader(multishard_combining_reader&&) = delete;
    multishard_combining_reader& operator=(multishard_combining_reader&&) = delete;

    virtual future<> fill_buffer() override;
    virtual future<> next_partition() override;
    virtual future<> fast_forward_to(const dht::partition_range& pr) override;
    virtual future<> fast_forward_to(position_range pr) override;
    virtual future<> close() noexcept override;
};

void multishard_combining_reader::on_partition_range_change(const dht::partition_range& pr) {
    _shard_selection_min_heap.clear();
    _shard_selection_min_heap.reserve(_sharder.shard_count());

    auto token = pr.start() ? pr.start()->value().token() : dht::minimum_token();
    _current_shard = _sharder.shard_for_reads(token);

    auto sharder = dht::ring_position_range_sharder(_sharder, pr);

    auto next = sharder.next(*_schema);
    // The first value of `next` is thrown away, as it is the ring range of the current shard.
    // We only want to do a full round, until we get back to the shard we started from (`_current_shard`).
    // We stop earlier if the sharder has no ranges for the remaining shards.
    for (next = sharder.next(*_schema); next && next->shard != _current_shard; next = sharder.next(*_schema)) {
        _shard_selection_min_heap.push_back(shard_and_token{next->shard, next->ring_range.start()->value().token()});
        std::ranges::push_heap(_shard_selection_min_heap, std::less{});
    }
}

bool multishard_combining_reader::maybe_move_to_next_shard(const dht::token* const t) {
    if (_shard_selection_min_heap.empty() || (t && *t < _shard_selection_min_heap.front().token)) {
        return false;
    }

    std::ranges::pop_heap(_shard_selection_min_heap, std::less{});
    const auto next_shard = _shard_selection_min_heap.back().shard;
    _shard_selection_min_heap.pop_back();

    if (t) {
        _shard_selection_min_heap.push_back(shard_and_token{_current_shard, *t});
        std::ranges::push_heap(_shard_selection_min_heap, std::less{});
    }

    _crossed_shards = true;
    _current_shard = next_shard;
    return true;
}

future<> multishard_combining_reader::handle_empty_reader_buffer() {
    auto& reader = *_shard_readers[_current_shard];

    if (reader.is_end_of_stream()) {
        if (_shard_selection_min_heap.empty()) {
            _end_of_stream = true;
        } else {
            maybe_move_to_next_shard();
        }
        return make_ready_future<>();
    } else if (reader.is_read_ahead_in_progress()) {
        return reader.fill_buffer();
    } else {
        // If we crossed shards and the next reader has an empty buffer we
        // double the concurrency, so the next time we cross shards we will
        // have more chances of hitting the reader's buffer.
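        // For example, with 4 shards: the first shard crossing that finds an
        // empty buffer bumps the concurrency to 2 and launches 1 background
        // read-ahead; the next one bumps it to 4 and launches up to 3.
        // Concurrency is capped at the shard count.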
        if (_crossed_shards && _read_ahead == read_ahead::yes) {
            _concurrency = std::min(_concurrency * 2, _sharder.shard_count());

            // Read-ahead shouldn't change the min selection heap, so we work on a local copy.
            auto shard_selection_min_heap_copy = _shard_selection_min_heap;

            // If concurrency > 1 we kick off concurrency - 1 read-aheads in the
            // background. They will be brought to the foreground when we move
            // to their respective shard.
            for (unsigned i = 1; i < _concurrency && !shard_selection_min_heap_copy.empty(); ++i) {
                std::ranges::pop_heap(shard_selection_min_heap_copy, std::less{});
                const auto next_shard = shard_selection_min_heap_copy.back().shard;
                shard_selection_min_heap_copy.pop_back();
                _shard_readers[next_shard]->read_ahead();
            }
        }
        std::optional<buffer_fill_hint> hint;
        if (_buffer_hint) {
            hint.emplace(max_buffer_size_in_bytes - buffer_size(),
                    _shard_selection_min_heap.empty() ? dht::maximum_token() : _shard_selection_min_heap.front().token);
        }
        return reader.fill_buffer(std::move(hint));
    }
}

multishard_combining_reader::multishard_combining_reader(
        const dht::sharder& sharder,
        std::any keep_alive_sharder,
        shared_ptr<reader_lifecycle_policy> lifecycle_policy,
        schema_ptr s,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr,
        multishard_reader_buffer_hint buffer_hint,
        read_ahead read_ahead)
    : impl(std::move(s), std::move(permit))
    , _keep_alive_sharder(std::move(keep_alive_sharder))
    , _sharder(sharder)
    , _buffer_hint(buffer_hint)
    , _read_ahead(read_ahead) {

    // The permit of the multishard reader is destroyed after the permits of its child readers.
    // Therefore its semaphore resources won't be automatically released
    // until the children acquire their own resources.
    //
    // This creates a dependency (an edge in the "resource allocation graph"),
    // where the semaphore used by the multishard reader depends on the semaphores used by the children.
    // When such dependencies create a cycle, and permits are acquired by different reads
    // in just the right order, a deadlock will happen.
    //
    // One way to prevent the deadlock is to avoid the resource dependency by ensuring
    // that the resources of the multishard reader are released before the children attempt to acquire theirs.
    // We do this here.
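    // (release_base_resources() is assumed to release the resources the
    // permit acquired up-front on admission; the permit object itself stays
    // valid and is handed to the shard_reader wrappers created below.)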
    _permit.release_base_resources();
    on_partition_range_change(pr);
    _shard_readers.reserve(_sharder.shard_count());
    for (unsigned i = 0; i < _sharder.shard_count(); ++i) {
        _shard_readers.emplace_back(std::make_unique<shard_reader>(_schema, _permit, lifecycle_policy, i, pr, ps, trace_state, fwd_mr));
    }
}

future<> multishard_combining_reader::fill_buffer() {
    _crossed_shards = false;
    return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this] {
        auto& reader = *_shard_readers[_current_shard];

        if (reader.is_buffer_empty()) {
            return handle_empty_reader_buffer();
        }

        while (!reader.is_buffer_empty() && !is_buffer_full()) {
            if (const auto& mf = reader.peek_buffer(); mf.is_partition_start() && maybe_move_to_next_shard(&mf.as_partition_start().key().token())) {
                return make_ready_future<>();
            }
            push_mutation_fragment(reader.pop_mutation_fragment());
        }
        return make_ready_future<>();
    });
}

future<> multishard_combining_reader::next_partition() {
    clear_buffer_to_next_partition();
    if (is_buffer_empty()) {
        return _shard_readers[_current_shard]->next_partition();
    }
    return make_ready_future<>();
}

future<> multishard_combining_reader::fast_forward_to(const dht::partition_range& pr) {
    clear_buffer();
    _end_of_stream = false;
    on_partition_range_change(pr);
    return parallel_for_each(_shard_readers, [&pr] (std::unique_ptr<shard_reader>& sr) {
        return sr->fast_forward_to(pr);
    });
}

future<> multishard_combining_reader::fast_forward_to(position_range pr) {
    return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}

future<> multishard_combining_reader::close() noexcept {
    return parallel_for_each(_shard_readers, [] (std::unique_ptr<shard_reader>& sr) {
        return sr->close();
    });
}

mutation_reader make_multishard_combining_reader(
        shared_ptr<reader_lifecycle_policy> lifecycle_policy,
        schema_ptr schema,
        locator::effective_replication_map_ptr erm,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr,
        multishard_reader_buffer_hint buffer_hint,
        read_ahead read_ahead) {
    auto& sharder = erm->get_sharder(*schema);
    return make_mutation_reader<multishard_combining_reader>(sharder, std::any(std::move(erm)), std::move(lifecycle_policy), std::move(schema),
            std::move(permit), pr, ps, std::move(trace_state), fwd_mr, buffer_hint, read_ahead);
}

mutation_reader make_multishard_combining_reader_for_tests(
        const dht::sharder& sharder,
        shared_ptr<reader_lifecycle_policy> lifecycle_policy,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr,
        multishard_reader_buffer_hint buffer_hint,
        read_ahead read_ahead) {
    return make_mutation_reader<multishard_combining_reader>(sharder, std::any(), std::move(lifecycle_policy), std::move(schema), std::move(permit),
            pr, ps, std::move(trace_state), fwd_mr, buffer_hint, read_ahead);
}
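
// Illustrative usage sketch (not part of this translation unit); the
// lifecycle policy, schema, replication map and permit below are placeholders
// that a real caller obtains from its own environment:
//
//   shared_ptr<reader_lifecycle_policy> policy = ...; // caller-provided
//   auto reader = make_multishard_combining_reader(
//           policy, schema, erm, permit,
//           query::full_partition_range, schema->full_slice(),
//           tracing::trace_state_ptr(), mutation_reader::forwarding::no,
//           multishard_reader_buffer_hint::no, read_ahead::yes);
//
// The resulting reader merges the per-shard streams in ring order; like any
// other mutation_reader it has to be close()d before destruction.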