Merge "mutation readers: remove next_partition() workarounds" from Botond

"
`next_partition()` used to return void, so readers that had to call
future-returning code from within it had to work around this. Now that
`next_partition()` returns a future, we can get rid of these
workarounds.

Tests: unit(release, debug)
"

* 'next-partition-cross-shard-readers/v1' of https://github.com/denesb/scylla:
  mutation_reader: reader_lifecycle_policy::stopped_reader: drop pending_next_partition flag
  mutation_reader: evictable_reader: remove next_partition() workaround
  mutation_reader: shard_reader: remove next_partition() workaround
  mutation_reader: foreign_reader: remove next_partition() workaround
This commit is contained in:
Avi Kivity
2021-01-25 11:02:06 +02:00
3 changed files with 28 additions and 57 deletions

View File

@@ -118,7 +118,6 @@ class read_context : public reader_lifecycle_policy {
reader_state state = reader_state::inexistent;
foreign_unique_ptr<remote_parts> rparts;
foreign_unique_ptr<reader_concurrency_semaphore::inactive_read_handle> handle;
bool has_pending_next_partition = false;
std::optional<flat_mutation_reader::tracked_buffer> buffer;
reader_meta() = default;
@@ -338,7 +337,6 @@ void read_context::destroy_reader(shard_id shard, future<stopped_reader> reader_
if (rm.state == reader_state::used) {
rm.state = reader_state::saving;
rm.handle = std::move(reader.handle);
rm.has_pending_next_partition = reader.has_pending_next_partition;
rm.buffer = std::move(reader.unconsumed_fragments);
} else {
mmq_log.warn(
@@ -467,12 +465,6 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
return make_ready_future<>();
}
auto maybe_next_partition = make_ready_future<>();
if (rm.has_pending_next_partition) {
maybe_next_partition = reader->next_partition();
}
return maybe_next_partition.then([this, query_uuid, query_ranges, &rm, &last_pkey, &last_ckey, gts = std::move(gts), &db, reader = std::move(reader)] () mutable {
auto& buffer = *rm.buffer;
const auto fragments = buffer.size();
const auto size_before = reader->buffer_size();
@@ -500,7 +492,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
db.get_stats().multishard_query_unpopped_fragments += fragments;
db.get_stats().multishard_query_unpopped_bytes += (size_after - size_before);
});
return make_ready_future<>();
} catch (...) {
// We don't want to fail a read just because of a failure to
// save any of the readers.

View File

@@ -857,10 +857,6 @@ class foreign_reader : public flat_mutation_reader::impl {
foreign_unique_ptr<flat_mutation_reader> _reader;
foreign_unique_ptr<future<>> _read_ahead_future;
// Set this flag when next_partition() is called.
// This pending call will be executed the next time we go to the remote
// reader (a fill_buffer() or a fast_forward_to() call).
bool _pending_next_partition = false;
streamed_mutation::forwarding _fwd_sm;
// Forward an operation to the reader on the remote shard.
@@ -874,22 +870,15 @@ class foreign_reader : public flat_mutation_reader::impl {
Result forward_operation(db::timeout_clock::time_point timeout, Operation op) {
return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(),
read_ahead_future = std::exchange(_read_ahead_future, nullptr),
pending_next_partition = std::exchange(_pending_next_partition, false),
timeout,
op = std::move(op)] () mutable {
auto exec_op_and_read_ahead = [=] () mutable {
auto maybe_next_partition = make_ready_future<>();
if (pending_next_partition) {
maybe_next_partition = reader->next_partition();
}
return maybe_next_partition.then([=] () mutable {
// Not really variadic, we expect 0 (void) or 1 parameter.
return op().then([=] (auto... result) {
auto f = reader->is_end_of_stream() ? nullptr : std::make_unique<future<>>(reader->fill_buffer(timeout));
return make_ready_future<std::tuple<foreign_unique_ptr<future<>>, decltype(result)...>>(
std::tuple(make_foreign(std::move(f)), std::move(result)...));
});
});
};
if (read_ahead_future) {
return read_ahead_future->then(std::move(exec_op_and_read_ahead));
@@ -973,15 +962,16 @@ future<> foreign_reader::next_partition() {
if (_fwd_sm == streamed_mutation::forwarding::yes) {
clear_buffer();
_end_of_stream = false;
_pending_next_partition = true;
} else {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_end_of_stream = false;
_pending_next_partition = true;
if (!is_buffer_empty()) {
co_return;
}
_end_of_stream = false;
}
return make_ready_future<>();
co_await forward_operation(db::no_timeout, [reader = _reader.get()] () {
return reader->next_partition();
});
}
future<> foreign_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
@@ -1062,7 +1052,6 @@ private:
// differs from the original ones.
std::optional<dht::partition_range> _range_override;
std::optional<query::partition_slice> _slice_override;
bool _pending_next_partition = false;
flat_mutation_reader_opt _reader;
@@ -1476,35 +1465,26 @@ evictable_reader::~evictable_reader() {
}
future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) {
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
if (pending_next_partition) {
_next_position_in_partition = position_in_partition::for_partition_start();
}
if (is_end_of_stream()) {
return make_ready_future<>();
}
return do_with(resume_or_create_reader(),
[this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable {
auto maybe_next_partition = make_ready_future<>();
if (pending_next_partition) {
maybe_next_partition = reader.next_partition();
}
return maybe_next_partition.then([this, timeout, &reader] {
return do_with(resume_or_create_reader(), [this, timeout] (flat_mutation_reader& reader) mutable {
return fill_buffer(reader, timeout).then([this, &reader] {
_end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty();
maybe_pause(std::move(reader));
});
});
});
}
future<> evictable_reader::next_partition() {
_next_position_in_partition = position_in_partition::for_partition_start();
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_pending_next_partition = true;
_next_position_in_partition = position_in_partition::for_partition_start();
if (!is_buffer_empty()) {
co_return;
}
return make_ready_future<>();
auto reader = resume_or_create_reader();
co_await reader.next_partition();
maybe_pause(std::move(reader));
}
future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
@@ -1584,7 +1564,6 @@ private:
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
bool _pending_next_partition = false;
bool _stopped = false;
std::optional<future<>> _read_ahead;
foreign_ptr<std::unique_ptr<evictable_reader>> _reader;
@@ -1662,14 +1641,13 @@ void shard_reader::stop() noexcept {
for (const auto& mf : *remote_buffer) {
buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard.
}
return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), _pending_next_partition};
return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)};
});
}).finally([zis = shared_from_this()] {}));
}
future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
auto fill_buf_fut = make_ready_future<remote_fill_buffer_result>();
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
struct reader_and_buffer_fill_result {
foreign_ptr<std::unique_ptr<evictable_reader>> reader;
@@ -1703,16 +1681,10 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
return std::move(res.result);
});
} else {
fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable {
auto maybe_next_partition = make_ready_future<>();
if (pending_next_partition) {
maybe_next_partition = _reader->next_partition();
}
return maybe_next_partition.then([this, timeout] {
fill_buf_fut = smp::submit_to(_shard, [this, timeout] () mutable {
return _reader->fill_buffer(timeout).then([this] {
return remote_fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream());
});
});
});
}
@@ -1735,11 +1707,19 @@ future<> shard_reader::fill_buffer(db::timeout_clock::time_point timeout) {
}
future<> shard_reader::next_partition() {
if (_reader) {
if (!_reader) {
co_return;
}
if (_read_ahead) {
co_await *std::exchange(_read_ahead, std::nullopt);
}
clear_buffer_to_next_partition();
_pending_next_partition = is_buffer_empty();
}
return make_ready_future<>();
if (!is_buffer_empty()) {
co_return;
}
co_return co_await smp::submit_to(_shard, [this] {
return _reader->next_partition();
});
}
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {

View File

@@ -438,7 +438,6 @@ public:
struct stopped_reader {
foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>> handle;
flat_mutation_reader::tracked_buffer unconsumed_fragments;
bool has_pending_next_partition;
};
protected: