diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index cb35f015c2..1582d18e10 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -118,7 +118,6 @@ class read_context : public reader_lifecycle_policy { reader_state state = reader_state::inexistent; foreign_unique_ptr rparts; foreign_unique_ptr handle; - bool has_pending_next_partition = false; std::optional buffer; reader_meta() = default; @@ -338,7 +337,6 @@ void read_context::destroy_reader(shard_id shard, future reader_ if (rm.state == reader_state::used) { rm.state = reader_state::saving; rm.handle = std::move(reader.handle); - rm.has_pending_next_partition = reader.has_pending_next_partition; rm.buffer = std::move(reader.unconsumed_fragments); } else { mmq_log.warn( @@ -467,12 +465,6 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las return make_ready_future<>(); } - auto maybe_next_partition = make_ready_future<>(); - if (rm.has_pending_next_partition) { - maybe_next_partition = reader->next_partition(); - } - - return maybe_next_partition.then([this, query_uuid, query_ranges, &rm, &last_pkey, &last_ckey, gts = std::move(gts), &db, reader = std::move(reader)] () mutable { auto& buffer = *rm.buffer; const auto fragments = buffer.size(); const auto size_before = reader->buffer_size(); @@ -500,7 +492,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las db.get_stats().multishard_query_unpopped_fragments += fragments; db.get_stats().multishard_query_unpopped_bytes += (size_after - size_before); - }); + return make_ready_future<>(); } catch (...) { // We don't want to fail a read just because of a failure to // save any of the readers. diff --git a/mutation_reader.cc b/mutation_reader.cc index 73a0778670..22c382bfbc 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -857,10 +857,6 @@ class foreign_reader : public flat_mutation_reader::impl { foreign_unique_ptr _reader; foreign_unique_ptr> _read_ahead_future; - // Set this flag when next_partition() is called. - // This pending call will be executed the next time we go to the remote - // reader (a fill_buffer() or a fast_forward_to() call). - bool _pending_next_partition = false; streamed_mutation::forwarding _fwd_sm; // Forward an operation to the reader on the remote shard. @@ -874,22 +870,15 @@ class foreign_reader : public flat_mutation_reader::impl { Result forward_operation(db::timeout_clock::time_point timeout, Operation op) { return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(), read_ahead_future = std::exchange(_read_ahead_future, nullptr), - pending_next_partition = std::exchange(_pending_next_partition, false), timeout, op = std::move(op)] () mutable { auto exec_op_and_read_ahead = [=] () mutable { - auto maybe_next_partition = make_ready_future<>(); - if (pending_next_partition) { - maybe_next_partition = reader->next_partition(); - } - return maybe_next_partition.then([=] () mutable { // Not really variadic, we expect 0 (void) or 1 parameter. return op().then([=] (auto... result) { auto f = reader->is_end_of_stream() ? nullptr : std::make_unique>(reader->fill_buffer(timeout)); return make_ready_future>, decltype(result)...>>( std::tuple(make_foreign(std::move(f)), std::move(result)...)); }); - }); }; if (read_ahead_future) { return read_ahead_future->then(std::move(exec_op_and_read_ahead)); @@ -973,15 +962,16 @@ future<> foreign_reader::next_partition() { if (_fwd_sm == streamed_mutation::forwarding::yes) { clear_buffer(); _end_of_stream = false; - _pending_next_partition = true; } else { clear_buffer_to_next_partition(); - if (is_buffer_empty()) { - _end_of_stream = false; - _pending_next_partition = true; + if (!is_buffer_empty()) { + co_return; } + _end_of_stream = false; } - return make_ready_future<>(); + co_await forward_operation(db::no_timeout, [reader = _reader.get()] () { + return reader->next_partition(); + }); } future<> foreign_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { @@ -1062,7 +1052,6 @@ private: // differs from the original ones. std::optional _range_override; std::optional _slice_override; - bool _pending_next_partition = false; flat_mutation_reader_opt _reader; @@ -1476,35 +1465,26 @@ evictable_reader::~evictable_reader() { } future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) { - const auto pending_next_partition = std::exchange(_pending_next_partition, false); - if (pending_next_partition) { - _next_position_in_partition = position_in_partition::for_partition_start(); - } if (is_end_of_stream()) { return make_ready_future<>(); } - return do_with(resume_or_create_reader(), - [this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable { - auto maybe_next_partition = make_ready_future<>(); - if (pending_next_partition) { - maybe_next_partition = reader.next_partition(); - } - return maybe_next_partition.then([this, timeout, &reader] { + return do_with(resume_or_create_reader(), [this, timeout] (flat_mutation_reader& reader) mutable { return fill_buffer(reader, timeout).then([this, &reader] { _end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty(); maybe_pause(std::move(reader)); }); - }); }); } future<> evictable_reader::next_partition() { + _next_position_in_partition = position_in_partition::for_partition_start(); clear_buffer_to_next_partition(); - if (is_buffer_empty()) { - _pending_next_partition = true; - _next_position_in_partition = position_in_partition::for_partition_start(); + if (!is_buffer_empty()) { + co_return; } - return make_ready_future<>(); + auto reader = resume_or_create_reader(); + co_await reader.next_partition(); + maybe_pause(std::move(reader)); } future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { @@ -1584,7 +1564,6 @@ private: const io_priority_class& _pc; tracing::global_trace_state_ptr _trace_state; const mutation_reader::forwarding _fwd_mr; - bool _pending_next_partition = false; bool _stopped = false; std::optional> _read_ahead; foreign_ptr> _reader; @@ -1662,14 +1641,13 @@ void shard_reader::stop() noexcept { for (const auto& mf : *remote_buffer) { buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard. } - return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), _pending_next_partition}; + return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)}; }); }).finally([zis = shared_from_this()] {})); } future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) { auto fill_buf_fut = make_ready_future(); - const auto pending_next_partition = std::exchange(_pending_next_partition, false); struct reader_and_buffer_fill_result { foreign_ptr> reader; @@ -1703,16 +1681,10 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) { return std::move(res.result); }); } else { - fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable { - auto maybe_next_partition = make_ready_future<>(); - if (pending_next_partition) { - maybe_next_partition = _reader->next_partition(); - } - return maybe_next_partition.then([this, timeout] { + fill_buf_fut = smp::submit_to(_shard, [this, timeout] () mutable { return _reader->fill_buffer(timeout).then([this] { return remote_fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream()); }); - }); }); } @@ -1735,11 +1707,19 @@ future<> shard_reader::fill_buffer(db::timeout_clock::time_point timeout) { } future<> shard_reader::next_partition() { - if (_reader) { + if (!_reader) { + co_return; + } + if (_read_ahead) { + co_await *std::exchange(_read_ahead, std::nullopt); + } clear_buffer_to_next_partition(); - _pending_next_partition = is_buffer_empty(); - } - return make_ready_future<>(); + if (!is_buffer_empty()) { + co_return; + } + co_return co_await smp::submit_to(_shard, [this] { + return _reader->next_partition(); + }); } future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { diff --git a/mutation_reader.hh b/mutation_reader.hh index 40245744af..281e68e490 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -438,7 +438,6 @@ public: struct stopped_reader { foreign_ptr> handle; flat_mutation_reader::tracked_buffer unconsumed_fragments; - bool has_pending_next_partition; }; protected: