From 81da6b756fddfa549269f3514dd2456e6df92edf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 21 Jan 2021 16:35:55 +0200 Subject: [PATCH 1/4] mutation_reader: foreign_reader: remove next_partition() workaround `next_partition()` now returns a future<>, so we can forward it to the remote shard in the scope of the next partition call, remove the now obsolete workaround for the synchronous next partition. --- mutation_reader.cc | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 73a0778670..7765421617 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -857,10 +857,6 @@ class foreign_reader : public flat_mutation_reader::impl { foreign_unique_ptr _reader; foreign_unique_ptr> _read_ahead_future; - // Set this flag when next_partition() is called. - // This pending call will be executed the next time we go to the remote - // reader (a fill_buffer() or a fast_forward_to() call). - bool _pending_next_partition = false; streamed_mutation::forwarding _fwd_sm; // Forward an operation to the reader on the remote shard. @@ -874,22 +870,15 @@ class foreign_reader : public flat_mutation_reader::impl { Result forward_operation(db::timeout_clock::time_point timeout, Operation op) { return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(), read_ahead_future = std::exchange(_read_ahead_future, nullptr), - pending_next_partition = std::exchange(_pending_next_partition, false), timeout, op = std::move(op)] () mutable { auto exec_op_and_read_ahead = [=] () mutable { - auto maybe_next_partition = make_ready_future<>(); - if (pending_next_partition) { - maybe_next_partition = reader->next_partition(); - } - return maybe_next_partition.then([=] () mutable { // Not really variadic, we expect 0 (void) or 1 parameter. return op().then([=] (auto... result) { auto f = reader->is_end_of_stream() ? 
nullptr : std::make_unique>(reader->fill_buffer(timeout)); return make_ready_future>, decltype(result)...>>( std::tuple(make_foreign(std::move(f)), std::move(result)...)); }); - }); }; if (read_ahead_future) { return read_ahead_future->then(std::move(exec_op_and_read_ahead)); @@ -973,15 +962,16 @@ future<> foreign_reader::next_partition() { if (_fwd_sm == streamed_mutation::forwarding::yes) { clear_buffer(); _end_of_stream = false; - _pending_next_partition = true; } else { clear_buffer_to_next_partition(); - if (is_buffer_empty()) { - _end_of_stream = false; - _pending_next_partition = true; + if (!is_buffer_empty()) { + co_return; } + _end_of_stream = false; } - return make_ready_future<>(); + co_await forward_operation(db::no_timeout, [reader = _reader.get()] () { + return reader->next_partition(); + }); } future<> foreign_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { From febd2feb4cf1b7250e108fd3633263e839a99352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 21 Jan 2021 17:10:01 +0200 Subject: [PATCH 2/4] mutation_reader: shard_reader: remove next_partition() workaround `next_partition()` now returns a future<>, so we can forward it to the remote shard in the scope of the next partition call, remove the now obsolete workaround for the synchronous next partition. 
--- mutation_reader.cc | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 7765421617..ee51e741b1 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1574,7 +1574,6 @@ private: const io_priority_class& _pc; tracing::global_trace_state_ptr _trace_state; const mutation_reader::forwarding _fwd_mr; - bool _pending_next_partition = false; bool _stopped = false; std::optional> _read_ahead; foreign_ptr> _reader; @@ -1652,14 +1651,13 @@ void shard_reader::stop() noexcept { for (const auto& mf : *remote_buffer) { buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard. } - return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), _pending_next_partition}; + return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), false}; }); }).finally([zis = shared_from_this()] {})); } future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) { auto fill_buf_fut = make_ready_future(); - const auto pending_next_partition = std::exchange(_pending_next_partition, false); struct reader_and_buffer_fill_result { foreign_ptr> reader; @@ -1693,16 +1691,10 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) { return std::move(res.result); }); } else { - fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable { - auto maybe_next_partition = make_ready_future<>(); - if (pending_next_partition) { - maybe_next_partition = _reader->next_partition(); - } - return maybe_next_partition.then([this, timeout] { + fill_buf_fut = smp::submit_to(_shard, [this, timeout] () mutable { return _reader->fill_buffer(timeout).then([this] { return remote_fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream()); }); - }); }); } @@ -1725,11 +1717,19 @@ future<> shard_reader::fill_buffer(db::timeout_clock::time_point timeout) { } future<> 
shard_reader::next_partition() { - if (_reader) { + if (!_reader) { + co_return; + } + if (_read_ahead) { + co_await *std::exchange(_read_ahead, std::nullopt); + } clear_buffer_to_next_partition(); - _pending_next_partition = is_buffer_empty(); - } - return make_ready_future<>(); + if (!is_buffer_empty()) { + co_return; + } + co_return co_await smp::submit_to(_shard, [this] { + return _reader->next_partition(); + }); } future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { From 4eb65b12a0f4015a8535876071146dc279337426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 21 Jan 2021 17:15:27 +0200 Subject: [PATCH 3/4] mutation_reader: evictable_reader: remove next_partition() workaround `next_partition()` now returns a future<>, so we can forward it to the remote shard in the scope of the next partition call, remove the now obsolete workaround for the synchronous next partition. --- mutation_reader.cc | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index ee51e741b1..2963a74bfa 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1052,7 +1052,6 @@ private: // differs from the original ones. 
std::optional _range_override; std::optional _slice_override; - bool _pending_next_partition = false; flat_mutation_reader_opt _reader; @@ -1466,35 +1465,26 @@ evictable_reader::~evictable_reader() { } future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) { - const auto pending_next_partition = std::exchange(_pending_next_partition, false); - if (pending_next_partition) { - _next_position_in_partition = position_in_partition::for_partition_start(); - } if (is_end_of_stream()) { return make_ready_future<>(); } - return do_with(resume_or_create_reader(), - [this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable { - auto maybe_next_partition = make_ready_future<>(); - if (pending_next_partition) { - maybe_next_partition = reader.next_partition(); - } - return maybe_next_partition.then([this, timeout, &reader] { + return do_with(resume_or_create_reader(), [this, timeout] (flat_mutation_reader& reader) mutable { return fill_buffer(reader, timeout).then([this, &reader] { _end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty(); maybe_pause(std::move(reader)); }); - }); }); } future<> evictable_reader::next_partition() { + _next_position_in_partition = position_in_partition::for_partition_start(); clear_buffer_to_next_partition(); - if (is_buffer_empty()) { - _pending_next_partition = true; - _next_position_in_partition = position_in_partition::for_partition_start(); + if (!is_buffer_empty()) { + co_return; } - return make_ready_future<>(); + auto reader = resume_or_create_reader(); + co_await reader.next_partition(); + maybe_pause(std::move(reader)); } future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { From 226088d12e05335903a33242e7e4fd08e23152f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 21 Jan 2021 18:02:48 +0200 Subject: [PATCH 4/4] mutation_reader: reader_lifecycle_policy::stopped_reader: drop 
pending_next_partition flag It's not used anymore. --- multishard_mutation_query.cc | 10 +--------- mutation_reader.cc | 2 +- mutation_reader.hh | 1 - 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index cb35f015c2..1582d18e10 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -118,7 +118,6 @@ class read_context : public reader_lifecycle_policy { reader_state state = reader_state::inexistent; foreign_unique_ptr rparts; foreign_unique_ptr handle; - bool has_pending_next_partition = false; std::optional buffer; reader_meta() = default; @@ -338,7 +337,6 @@ void read_context::destroy_reader(shard_id shard, future reader_ if (rm.state == reader_state::used) { rm.state = reader_state::saving; rm.handle = std::move(reader.handle); - rm.has_pending_next_partition = reader.has_pending_next_partition; rm.buffer = std::move(reader.unconsumed_fragments); } else { mmq_log.warn( @@ -467,12 +465,6 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las return make_ready_future<>(); } - auto maybe_next_partition = make_ready_future<>(); - if (rm.has_pending_next_partition) { - maybe_next_partition = reader->next_partition(); - } - - return maybe_next_partition.then([this, query_uuid, query_ranges, &rm, &last_pkey, &last_ckey, gts = std::move(gts), &db, reader = std::move(reader)] () mutable { auto& buffer = *rm.buffer; const auto fragments = buffer.size(); const auto size_before = reader->buffer_size(); @@ -500,7 +492,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las db.get_stats().multishard_query_unpopped_fragments += fragments; db.get_stats().multishard_query_unpopped_bytes += (size_after - size_before); - }); + return make_ready_future<>(); } catch (...) { // We don't want to fail a read just because of a failure to // save any of the readers. 
diff --git a/mutation_reader.cc b/mutation_reader.cc index 2963a74bfa..22c382bfbc 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1641,7 +1641,7 @@ void shard_reader::stop() noexcept { for (const auto& mf : *remote_buffer) { buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard. } - return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), false}; + return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)}; }); }).finally([zis = shared_from_this()] {})); } diff --git a/mutation_reader.hh b/mutation_reader.hh index 40245744af..281e68e490 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -438,7 +438,6 @@ public: struct stopped_reader { foreign_ptr> handle; flat_mutation_reader::tracked_buffer unconsumed_fragments; - bool has_pending_next_partition; }; protected: