Files
scylladb/mutation_reader.cc
2022-03-30 15:42:51 +03:00

1707 lines
64 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <boost/range/algorithm/heap_algorithm.hpp>
#include <boost/range/algorithm/reverse.hpp>
#include <boost/move/iterator.hpp>
#include <variant>
#include <seastar/core/future-util.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/closeable.hh>
#include "mutation_reader.hh"
#include "readers/flat_mutation_reader.hh"
#include "readers/empty.hh"
#include "schema_registry.hh"
#include "mutation_compactor.hh"
#include "dht/sharder.hh"
#include "readers/empty_v2.hh"
#include "readers/combined.hh"
// Logger shared by the readers in this file; registered under the name
// "mutation_reader".
logging::logger mrlog("mutation_reader");
/// Creates a snapshot_source whose snapshots are all empty mutation sources
/// (see make_empty_mutation_source()).
snapshot_source make_empty_snapshot_source() {
    auto take_snapshot = [] {
        return make_empty_mutation_source();
    };
    return snapshot_source(std::move(take_snapshot));
}
/// Creates a mutation_source that never produces data.
///
/// Readers created from it are empty flat readers; all other reader
/// parameters (range, slice, priority class, trace state, forwarding modes)
/// are ignored. Its partition presence checker answers
/// `definitely_doesnt_exist` for every key.
mutation_source make_empty_mutation_source() {
    auto make_reader = [] (schema_ptr schema,
            reader_permit permit,
            const dht::partition_range&,
            const query::partition_slice&,
            const io_priority_class&,
            tracing::trace_state_ptr,
            streamed_mutation::forwarding,
            mutation_reader::forwarding) {
        return make_empty_flat_reader(schema, std::move(permit));
    };
    auto make_presence_checker = [] {
        return [] (const dht::decorated_key&) {
            return partition_presence_checker_result::definitely_doesnt_exist;
        };
    };
    return mutation_source(std::move(make_reader), std::move(make_presence_checker));
}
/// Creates a mutation_source that combines the output of all `addends`.
///
/// Each reader request is forwarded, with identical arguments, to every
/// underlying source; the resulting readers are merged with
/// make_combined_reader().
mutation_source make_combined_mutation_source(std::vector<mutation_source> addends) {
    auto make_reader = [sources = std::move(addends)] (schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            const io_priority_class& pc,
            tracing::trace_state_ptr trace_state,
            streamed_mutation::forwarding fwd_sm,
            mutation_reader::forwarding fwd_mr) {
        std::vector<flat_mutation_reader_v2> readers;
        readers.reserve(sources.size());
        for (const auto& source : sources) {
            readers.emplace_back(source.make_reader_v2(schema, permit, range, slice, pc, trace_state, fwd_sm, fwd_mr));
        }
        return make_combined_reader(schema, std::move(permit), std::move(readers), fwd_sm, fwd_mr);
    };
    return mutation_source(std::move(make_reader));
}
namespace {
// Outcome of a fill_buffer() executed on another shard: the fragments that
// were read (wrapped in a foreign_ptr) and whether the remote reader
// reported end-of-stream.
struct remote_fill_buffer_result_v2 {
    foreign_ptr<std::unique_ptr<const flat_mutation_reader_v2::tracked_buffer>> buffer;
    bool end_of_stream = false;

    remote_fill_buffer_result_v2() = default;

    // Takes ownership of `fragments`, moving them into a heap-allocated
    // buffer wrapped in a foreign_ptr.
    remote_fill_buffer_result_v2(flat_mutation_reader_v2::tracked_buffer&& fragments, bool eos)
        : buffer(make_foreign(std::make_unique<const flat_mutation_reader_v2::tracked_buffer>(std::move(fragments))))
        , end_of_stream(eos) {
    }
};
}
/// See make_foreign_reader() for description.
class foreign_reader : public flat_mutation_reader_v2::impl {
    template <typename T>
    using foreign_unique_ptr = foreign_ptr<std::unique_ptr<T>>;

    using fragment_buffer = flat_mutation_reader_v2::tracked_buffer;

    // The wrapped reader; it is only ever operated on from its owner shard.
    foreign_unique_ptr<flat_mutation_reader_v2> _reader;
    // Future of the background fill_buffer() started on the remote shard
    // after the last forwarded operation, or null if there is none.
    foreign_unique_ptr<future<>> _read_ahead_future;
    streamed_mutation::forwarding _fwd_sm;

    // Forward an operation to the reader on the remote shard.
    // If the remote reader has an ongoing read-ahead, bring it to the
    // foreground (wait on it) and execute the operation after.
    // After the operation completes, kick off a new read-ahead (fill_buffer())
    // and move it to the background (save its future but don't wait on it
    // now). If all works well read-aheads complete by the next operation and
    // we don't have to wait on the remote reader filling its buffer.
    //
    // `op` is invoked with no arguments on the remote shard and must return a
    // future carrying zero or one value; that value (if any) is what the
    // returned future resolves to. The permit is marked as blocked for the
    // whole duration of the operation.
    template <typename Operation, typename Result = futurize_t<std::invoke_result_t<Operation>>>
    Result forward_operation(Operation op) {
        reader_permit::blocked_guard bg{_permit};
        return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(),
                read_ahead_future = std::exchange(_read_ahead_future, nullptr),
                op = std::move(op)] () mutable {
            auto exec_op_and_read_ahead = [=] () mutable {
                // Not really variadic, we expect 0 (void) or 1 parameter.
                return op().then([=] (auto... result) {
                    // No point in reading ahead once the remote reader is exhausted.
                    auto f = reader->is_end_of_stream() ? nullptr : std::make_unique<future<>>(reader->fill_buffer());
                    return make_ready_future<std::tuple<foreign_unique_ptr<future<>>, decltype(result)...>>(
                            std::tuple(make_foreign(std::move(f)), std::move(result)...));
                });
            };
            if (read_ahead_future) {
                return read_ahead_future->then(std::move(exec_op_and_read_ahead));
            } else {
                return exec_op_and_read_ahead();
            }
        }).then([this] (auto fut_and_result) {
            // Element 0 is always the new read-ahead future, element 1 (if
            // present) is the result of the operation.
            _read_ahead_future = std::get<0>(std::move(fut_and_result));
            static_assert(std::tuple_size<decltype(fut_and_result)>::value <= 2);
            if constexpr (std::tuple_size<decltype(fut_and_result)>::value == 1) {
                return make_ready_future<>();
            } else {
                auto result = std::get<1>(std::move(fut_and_result));
                return make_ready_future<decltype(result)>(std::move(result));
            }
        }).finally([bg = std::move(bg)] { });
    }
public:
    foreign_reader(schema_ptr schema,
            reader_permit permit,
            foreign_unique_ptr<flat_mutation_reader_v2> reader,
            streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);

    // `this` is captured by continuations, so the object must stay at a
    // stable address: neither copyable nor movable.
    foreign_reader(const foreign_reader&) = delete;
    foreign_reader& operator=(const foreign_reader&) = delete;
    foreign_reader(foreign_reader&&) = delete;
    foreign_reader& operator=(foreign_reader&&) = delete;

    virtual future<> fill_buffer() override;
    virtual future<> next_partition() override;
    virtual future<> fast_forward_to(const dht::partition_range& pr) override;
    virtual future<> fast_forward_to(position_range pr) override;
    virtual future<> close() noexcept override;
};
// Wraps `remote_reader`, a reader owned by another shard, so that it can be
// driven through this object. `fwd_sm` is remembered because it changes how
// next_partition() treats the local buffer.
foreign_reader::foreign_reader(schema_ptr schema,
        reader_permit permit,
        foreign_unique_ptr<flat_mutation_reader_v2> remote_reader,
        streamed_mutation::forwarding fwd_sm)
    : impl(std::move(schema), std::move(permit))
    , _reader(std::move(remote_reader))
    , _fwd_sm(fwd_sm)
{ }
// Fetches the remote reader's buffer and copies its fragments into the local
// buffer. Does nothing if the stream has ended or the local buffer is full.
future<> foreign_reader::fill_buffer() {
    const bool nothing_to_do = _end_of_stream || is_buffer_full();
    if (nothing_to_do) {
        return make_ready_future();
    }

    // Runs on the remote shard: fill the remote buffer only if it is empty,
    // then hand over its content together with the end-of-stream flag.
    auto fetch_remote_buffer = [remote = _reader.get()] () {
        auto filled = remote->is_buffer_empty() ? remote->fill_buffer() : make_ready_future<>();
        return filled.then([=] {
            return make_ready_future<remote_fill_buffer_result_v2>(remote_fill_buffer_result_v2(remote->detach_buffer(), remote->is_end_of_stream()));
        });
    };

    return forward_operation(std::move(fetch_remote_buffer)).then([this] (remote_fill_buffer_result_v2 fetched) mutable {
        _end_of_stream = fetched.end_of_stream;
        for (const auto& fragment : *fetched.buffer) {
            // Need a copy since the fragment is on the remote shard.
            push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, fragment));
        }
    });
}
// Moves to the next partition. With streamed_mutation::forwarding::yes the
// whole local buffer is dropped; otherwise only the rest of the current
// partition is dropped and, if buffered data remains, the remote reader is
// not contacted at all.
future<> foreign_reader::next_partition() {
    if (_fwd_sm == streamed_mutation::forwarding::yes) {
        clear_buffer();
    } else {
        clear_buffer_to_next_partition();
        if (!is_buffer_empty()) {
            co_return;
        }
    }
    _end_of_stream = false;
    co_await forward_operation([remote = _reader.get()] () {
        return remote->next_partition();
    });
}
// Drops all buffered fragments and fast-forwards the remote reader to `pr`.
// `pr` is captured by reference and dereferenced on the remote shard.
future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) {
    clear_buffer();
    _end_of_stream = false;
    auto remote_fast_forward = [remote = _reader.get(), &pr] () {
        return remote->fast_forward_to(pr);
    };
    return forward_operation(std::move(remote_fast_forward));
}
// Forwards the local buffer to the start of `pr`, then fast-forwards the
// remote reader to the same position range.
future<> foreign_reader::fast_forward_to(position_range pr) {
    forward_buffer_to(pr.start());
    _end_of_stream = false;
    auto remote_fast_forward = [remote = _reader.get(), range = std::move(pr)] () {
        return remote->fast_forward_to(std::move(range));
    };
    return forward_operation(std::move(remote_fast_forward));
}
// Closes the remote reader on its owner shard. Any pending read-ahead is
// waited for first; a failed read-ahead is logged and otherwise ignored.
future<> foreign_reader::close() noexcept {
    if (_reader) {
        const auto owner_shard = _reader.get_owner_shard();
        return smp::submit_to(owner_shard,
                [remote = std::move(_reader), pending = std::exchange(_read_ahead_future, nullptr)] () mutable {
            auto read_ahead = pending ? std::move(*pending.get()) : make_ready_future<>();
            return read_ahead.then_wrapped([remote = std::move(remote)] (future<> f) mutable {
                if (f.failed()) {
                    auto ex = f.get_exception();
                    mrlog.warn("foreign_reader: benign read_ahead failure during close: {}. Ignoring.", ex);
                }
                return remote->close();
            });
        });
    }

    // No reader to close; a read-ahead without a reader should not exist.
    if (_read_ahead_future) {
        on_internal_error_noexcept(mrlog, "foreign_reader::close can't wait on read_ahead future with disengaged reader");
    }
    return make_ready_future<>();
}
// Wraps `reader` in a foreign_reader, unless it already lives on the calling
// shard, in which case the reader itself is returned without wrapping.
flat_mutation_reader_v2 make_foreign_reader(schema_ptr schema,
        reader_permit permit,
        foreign_ptr<std::unique_ptr<flat_mutation_reader_v2>> reader,
        streamed_mutation::forwarding fwd_sm) {
    const bool is_local = reader.get_owner_shard() == this_shard_id();
    if (!is_local) {
        return make_flat_mutation_reader_v2<foreign_reader>(std::move(schema), std::move(permit), std::move(reader), fwd_sm);
    }
    return std::move(*reader);
}
// Internal consistency check: if `condition` does not hold, reports an
// internal error with `msg` formatted using `arg...`.
template <typename... Arg>
static void require(bool condition, const char* msg, const Arg&... arg) {
    if (condition) {
        return;
    }
    on_internal_error(mrlog, format(msg, arg...));
}
// Encapsulates all data and logic that is local to the remote shard the
// reader lives on.
//
// The underlying reader can be paused: it is then registered with the
// permit's semaphore as an inactive read. If it cannot be resumed later, it
// is recreated so that the read continues from the last known position.
class evictable_reader_v2 : public flat_mutation_reader_v2::impl {
public:
    using auto_pause = bool_class<class auto_pause_tag>;
private:
    // If yes, the underlying reader is paused after every operation.
    auto_pause _auto_pause;
    mutation_source _ms;
    const dht::partition_range* _pr;
    const query::partition_slice& _ps;
    const io_priority_class& _pc;
    tracing::global_trace_state_ptr _trace_state;
    const mutation_reader::forwarding _fwd_mr;
    reader_concurrency_semaphore::inactive_read_handle _irh;
    bool _reader_recreated = false; // set if reader was recreated since last operation
    position_in_partition::tri_compare _tri_cmp;
    std::optional<dht::decorated_key> _last_pkey;
    position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
    // These are used when the reader has to be recreated (after having been
    // evicted while paused) and the range and/or slice it is recreated with
    // differs from the original ones.
    std::optional<dht::partition_range> _range_override;
    std::optional<query::partition_slice> _slice_override;
    // The underlying reader while it is held here (i.e. not paused).
    flat_mutation_reader_v2_opt _reader;
private:
    void do_pause(flat_mutation_reader_v2 reader);
    void maybe_pause(flat_mutation_reader_v2 reader);
    flat_mutation_reader_v2_opt try_resume();
    void update_next_position();
    void adjust_partition_slice();
    flat_mutation_reader_v2 recreate_reader();
    future<flat_mutation_reader_v2> resume_or_create_reader();
    void validate_partition_start(const partition_start& ps);
    void validate_position_in_partition(position_in_partition_view pos) const;
    void examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3);
public:
    evictable_reader_v2(
            auto_pause ap,
            mutation_source ms,
            schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            const io_priority_class& pc,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding fwd_mr);
    virtual future<> fill_buffer() override;
    virtual future<> next_partition() override;
    virtual future<> fast_forward_to(const dht::partition_range& pr) override;
    // Not supported. The error is reported through the returned future
    // rather than thrown, so callers that attach continuations to the result
    // observe it the same way as any other failure.
    virtual future<> fast_forward_to(position_range) override {
        return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
    }
    // Closes the underlying reader, whether it is currently held or paused.
    // If it is neither (never created, or could not be resumed) there is
    // nothing to close.
    virtual future<> close() noexcept override {
        if (_reader) {
            return _reader->close();
        }
        if (auto reader_opt = try_resume()) {
            return reader_opt->close();
        }
        return make_ready_future<>();
    }
    // Gives up the inactive read handle; only callable on an rvalue.
    reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
        return std::move(_irh);
    }
    // Pauses the underlying reader if it is currently held.
    void pause() {
        if (_reader) {
            do_pause(std::move(*_reader));
        }
    }
    reader_permit permit() {
        return _permit;
    }
};
// Registers `reader` as an inactive read with the permit's semaphore and
// keeps the returned handle. Must not be called while a handle is held.
void evictable_reader_v2::do_pause(flat_mutation_reader_v2 reader) {
    assert(!_irh);
    auto& semaphore = _permit.semaphore();
    _irh = semaphore.register_inactive_read(std::move(reader));
}
// Called at the end of an operation: pauses `reader` when auto-pause is on,
// otherwise stores it in _reader.
void evictable_reader_v2::maybe_pause(flat_mutation_reader_v2 reader) {
    if (!_auto_pause) {
        _reader = std::move(reader);
        return;
    }
    do_pause(std::move(reader));
}
// Tries to get the paused reader back from the semaphore, consuming the
// inactive read handle. Returns a disengaged optional if no reader was
// returned.
flat_mutation_reader_v2_opt evictable_reader_v2::try_resume() {
    auto resumed = _permit.semaphore().unregister_inactive_read(std::move(_irh));
    if (!resumed) {
        return {};
    }
    return std::move(*resumed);
}
// Updates _last_pkey and _next_position_in_partition based on the current
// content of the buffer, so that a recreated reader knows where to continue.
// Leaves both untouched if the buffer is empty.
void evictable_reader_v2::update_next_position() {
if (is_buffer_empty()) {
return;
}
// Search backwards for the most recent partition start in the buffer; its
// key is the partition the read is currently in.
auto rbegin = std::reverse_iterator(buffer().end());
auto rend = std::reverse_iterator(buffer().begin());
if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment_v2::is_partition_start)); pk_it != rend) {
_last_pkey = pk_it->as_partition_start().key();
}
// The next position is derived from the region of the last buffered fragment.
const auto last_pos = buffer().back().position();
switch (last_pos.region()) {
case partition_region::partition_start:
_next_position_in_partition = position_in_partition::for_static_row();
break;
case partition_region::static_row:
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
break;
case partition_region::clustered:
// If the underlying reader already has the partition end buffered, take it
// now: the partition is then complete and the read continues from the
// start of the next one.
if (!_reader->is_buffer_empty() && _reader->peek_buffer().is_end_of_partition()) {
push_mutation_fragment(_reader->pop_mutation_fragment());
_next_position_in_partition = position_in_partition::for_partition_start();
} else {
_next_position_in_partition = position_in_partition::after_key(last_pos);
}
break;
case partition_region::partition_end:
_next_position_in_partition = position_in_partition::for_partition_start();
break;
}
}
// Builds _slice_override: a copy of _ps whose default row ranges are trimmed
// to _next_position_in_partition and installed as the ranges for _last_pkey.
// Dereferences _last_pkey, so it must be engaged when this is called.
void evictable_reader_v2::adjust_partition_slice() {
const auto reversed = _ps.options.contains(query::partition_slice::option::reversed);
// For reversed slices the trimming is done on the native-reversed form; the
// result is converted back to the legacy form at the end.
_slice_override = reversed ? query::legacy_reverse_slice_to_native_reverse_slice(*_schema, _ps) : _ps;
auto ranges = _slice_override->default_row_ranges();
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
_slice_override->clear_ranges();
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
if (reversed) {
_slice_override = query::native_reverse_slice_to_legacy_reverse_slice(*_schema, std::move(*_slice_override));
}
}
// Creates a new underlying reader from _ms.
// If nothing was read yet (_last_pkey is disengaged), the reader is created
// with the original range and slice. Otherwise the range (and, when stopped
// among clustering rows, also the slice) is overridden so that the new reader
// continues from where the previous one stopped, and _reader_recreated is set.
flat_mutation_reader_v2 evictable_reader_v2::recreate_reader() {
const dht::partition_range* range = _pr;
const query::partition_slice* slice = &_ps;
_range_override.reset();
_slice_override.reset();
if (_last_pkey) {
// Whether the new range should start with the last partition (inclusive)
// or right after it (exclusive).
bool partition_range_is_inclusive = true;
switch (_next_position_in_partition.region()) {
case partition_region::partition_start:
partition_range_is_inclusive = false;
break;
case partition_region::static_row:
break;
case partition_region::clustered:
// Continue inside the partition: restrict the clustering ranges too.
adjust_partition_slice();
slice = &*_slice_override;
break;
case partition_region::partition_end:
partition_range_is_inclusive = false;
break;
}
// The original range contained a single partition and we've read it
// all. We'd have to create a reader with an empty range that would
// immediately be at EOS. This is not possible so just create an empty
// reader instead.
// This should be extremely rare (who'd create a multishard reader to
// read a single partition) but still, let's make sure we handle it
// correctly.
if (_pr->is_singular() && !partition_range_is_inclusive) {
return make_empty_flat_reader_v2(_schema, _permit);
}
_range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
range = &*_range_override;
_reader_recreated = true;
}
return _ms.make_reader_v2(
_schema,
_permit,
*range,
*slice,
_pc,
_trace_state,
streamed_mutation::forwarding::no,
_fwd_mr);
}
// Returns a reader to operate on, trying in order: the reader held in
// _reader, the paused reader (if it can be resumed), and finally a reader
// created by recreate_reader().
future<flat_mutation_reader_v2> evictable_reader_v2::resume_or_create_reader() {
if (_reader) {
co_return std::move(*_reader);
}
if (auto reader_opt = try_resume()) {
co_return std::move(*reader_opt);
}
// When the reader is created the first time and we are actually resuming a
// saved reader in `recreate_reader()`, we have two cases here:
// * the reader is still alive (in inactive state)
// * the reader was evicted
// We check for this below with `needs_readmission()` and it is very
// important to not allow for preemption between said check and
// `recreate_reader()`, otherwise the reader might be evicted between the
// check and `recreate_reader()` and the latter will recreate it without
// waiting for re-admission.
if (_permit.needs_readmission()) {
co_await _permit.wait_readmission();
}
// No suspension point between the wait above and this call, see the comment
// above.
co_return recreate_reader();
}
// Validates the first partition start produced by a recreated reader against
// _last_pkey and against the range the reader was created with. A failed
// check is reported via require().
// If the read was expected to continue inside the last partition but the new
// reader starts at a larger one, the interrupted partition is closed with a
// partition_end and the next position is reset to partition start.
void evictable_reader_v2::validate_partition_start(const partition_start& ps) {
const auto tri_cmp = dht::ring_position_comparator(*_schema);
// If we recreated the reader after fast-forwarding it we won't have
// _last_pkey set. In this case it is enough to check if the partition
// is in range.
if (_last_pkey) {
const auto cmp_res = tri_cmp(*_last_pkey, ps.key());
if (_next_position_in_partition.region() != partition_region::partition_start) { // we expect to continue from the same partition
// We cannot assume the partition we stopped the read at is still alive
// when we recreate the reader. It might have been compacted away in the
// meanwhile, so allow for a larger partition too.
require(
cmp_res <= 0,
"{}(): validation failed, expected partition with key larger or equal to _last_pkey {}, but got {}",
__FUNCTION__,
*_last_pkey,
ps.key());
// Reset next pos if we are not continuing from the same partition
if (cmp_res < 0) {
// Close previous partition, we are not going to continue it.
push_mutation_fragment(*_schema, _permit, partition_end{});
_next_position_in_partition = position_in_partition::for_partition_start();
}
} else { // should be a larger partition
require(
cmp_res < 0,
"{}(): validation failed, expected partition with key larger than _last_pkey {}, but got {}",
__FUNCTION__,
*_last_pkey,
ps.key());
}
}
// Validate against the overridden range if there is one, else the original.
const auto& prange = _range_override ? *_range_override : *_pr;
require(
// TODO: somehow avoid this copy
prange.contains(ps.key(), tri_cmp),
"{}(): validation failed, expected partition with key that falls into current range {}, but got {}",
__FUNCTION__,
prange,
ps.key());
}
// Validates that `pos` is not before _next_position_in_partition and, when a
// slice override is active and `pos` is in the clustered region, that it
// falls into one of the row ranges the override has for _last_pkey.
// A failed check is reported via require().
void evictable_reader_v2::validate_position_in_partition(position_in_partition_view pos) const {
require(
_tri_cmp(_next_position_in_partition, pos) <= 0,
"{}(): validation failed, expected position in partition that is larger-than-equal than _next_position_in_partition {}, but got {}",
__FUNCTION__,
_next_position_in_partition,
pos);
if (_slice_override && pos.region() == partition_region::clustered) {
const auto reversed = _ps.options.contains(query::partition_slice::option::reversed);
// For reversed reads the ranges are looked up in the native-reversed form
// of the override slice.
std::optional<query::partition_slice> native_slice;
if (reversed) {
native_slice = query::legacy_reverse_slice_to_native_reverse_slice(*_schema, *_slice_override);
}
auto& slice = reversed ? *native_slice : *_slice_override;
const auto ranges = slice.row_ranges(*_schema, _last_pkey->key());
const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) {
// TODO: somehow avoid this copy
auto range = position_range(cr);
// We cannot use range.contains() because that treats range as a
// [a, b) range, meaning a range tombstone change with position
// after_key(b) will be considered outside of it. Such range
// tombstone changes can be emitted however when recreating the
// reader on clustering range edge.
return _tri_cmp(range.start(), pos) <= 0 && _tri_cmp(pos, range.end()) <= 0;
});
require(
any_contains,
"{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}",
__FUNCTION__,
slice,
pos);
}
}
// Examines the first (up to) three fragments read from a recreated reader.
// Validates the partition start and the position of the first fragment that
// is kept, and disengages (drops) those of mf1 and mf2 that lie before
// _next_position_in_partition, i.e. were already emitted by the previous
// reader. mf3 is only validated, never dropped.
void evictable_reader_v2::examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3) {
if (!mf1) {
return; // the reader is at EOS
}
// If engaged, the first fragment is always a partition-start.
validate_partition_start(mf1->as_partition_start());
if (_tri_cmp(mf1->position(), _next_position_in_partition) < 0) {
mf1 = {}; // drop mf1
}
// Evaluated after validate_partition_start(), which may have reset the next
// position to partition start.
const auto continue_same_partition = _next_position_in_partition.region() != partition_region::partition_start;
// If we have a first fragment, we are guaranteed to have a second one -- if not else, a partition-end.
if (mf2->is_end_of_partition()) {
return; // no further fragments, nothing to do
}
// We want to validate the position of the first non-dropped fragment.
// If mf2 is a static row and we need to drop it, this will be mf3.
if (mf2->is_static_row() && _tri_cmp(mf2->position(), _next_position_in_partition) < 0) {
mf2 = {}; // drop mf2
} else {
if (continue_same_partition) {
validate_position_in_partition(mf2->position());
}
return;
}
if (mf3->is_end_of_partition()) {
return; // no further fragments, nothing to do
} else if (continue_same_partition) {
validate_position_in_partition(mf3->position());
}
}
// Only stores its arguments; no underlying reader is created here.
// `range`, `slice` and `pc` are kept as pointer/reference, so they must
// outlive this object.
evictable_reader_v2::evictable_reader_v2(
        auto_pause ap,
        mutation_source ms,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        const io_priority_class& pc,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr)
    : impl(std::move(schema), std::move(permit))
    , _auto_pause(ap)
    , _ms(std::move(ms))
    , _pr(&range)
    , _ps(slice)
    , _pc(pc)
    , _trace_state(std::move(trace_state))
    , _fwd_mr(fwd_mr)
    , _tri_cmp(*_schema)
{ }
// Fills the buffer from the underlying reader, resuming or recreating it
// first if needed, and pauses it again at the end if auto-pause is on.
// When the reader had to be recreated, its first fragments are validated
// against the position the previous reader stopped at, and fragments that
// were already emitted are dropped.
future<> evictable_reader_v2::fill_buffer() {
    if (is_end_of_stream()) {
        co_return;
    }
    _reader = co_await resume_or_create_reader();

    if (_reader_recreated) {
        // Recreating the reader breaks snapshot isolation and creates all sorts
        // of complications around the continuity of range tombstone changes,
        // e.g. a range tombstone started by the previous reader object
        // might not exist anymore with the new reader object.
        // To avoid complications we reset the tombstone state on each reader
        // recreation by emitting a null tombstone change, if we read at least
        // one clustering fragment from the partition.
        if (_next_position_in_partition.region() == partition_region::clustered
                && _tri_cmp(_next_position_in_partition, position_in_partition::before_all_clustered_rows()) > 0) {
            push_mutation_fragment(*_schema, _permit, range_tombstone_change{position_in_partition_view::before_key(_next_position_in_partition), {}});
        }
        auto mf1 = co_await (*_reader)();
        auto mf2 = co_await (*_reader)();
        auto mf3 = co_await (*_reader)();
        examine_first_fragments(mf1, mf2, mf3);
        // Put back whatever was not dropped, in reverse order so that the
        // original order is restored.
        if (mf3) {
            _reader->unpop_mutation_fragment(std::move(*mf3));
        }
        if (mf2) {
            _reader->unpop_mutation_fragment(std::move(*mf2));
        }
        if (mf1) {
            _reader->unpop_mutation_fragment(std::move(*mf1));
        }
        _reader_recreated = false;
    } else {
        co_await _reader->fill_buffer();
    }

    _reader->move_buffer_content_to(*this);

    // Ensure that each buffer represents forward progress. Only a concern when
    // the last fragment in the buffer is range tombstone change. In this case
    // ensure that:
    // * buffer().back().position() > _next_position_in_partition;
    // * _reader.peek()->position() > buffer().back().position();
    if (!is_buffer_empty() && buffer().back().is_range_tombstone_change()) {
        auto* next_mf = co_await _reader->peek();

        // First make sure we've made progress w.r.t. _next_position_in_partition.
        // Keep pulling fragments only while the last buffered position has not
        // yet moved past _next_position_in_partition. Comparing the other way
        // around would keep reading after progress was already made, and
        // would not read at all when the buffer tail is still behind.
        while (next_mf && _tri_cmp(buffer().back().position(), _next_position_in_partition) <= 0) {
            push_mutation_fragment(_reader->pop_mutation_fragment());
            next_mf = co_await _reader->peek();
        }

        // Then make sure the next fragment is at a different position than
        // the last buffered one. The position is copied because pushing more
        // fragments changes what buffer().back() refers to.
        const auto last_pos = position_in_partition(buffer().back().position());
        while (next_mf && _tri_cmp(last_pos, next_mf->position()) == 0) {
            push_mutation_fragment(_reader->pop_mutation_fragment());
            next_mf = co_await _reader->peek();
        }
    }

    update_next_position();
    _end_of_stream = _reader->is_end_of_stream();
    maybe_pause(std::move(*_reader));
}
// Skips to the next partition. The underlying reader is only involved if the
// local buffer holds no fragments of a following partition.
future<> evictable_reader_v2::next_partition() {
    _next_position_in_partition = position_in_partition::for_partition_start();
    clear_buffer_to_next_partition();
    if (is_buffer_empty()) {
        auto underlying = co_await resume_or_create_reader();
        co_await underlying.next_partition();
        maybe_pause(std::move(underlying));
    }
}
// Fast-forwards to `pr`, discarding the buffer and the saved position.
// A held or resumable underlying reader is fast-forwarded; otherwise nothing
// more is done here and only the new range is remembered (a pointer to `pr`
// is kept).
future<> evictable_reader_v2::fast_forward_to(const dht::partition_range& pr) {
    clear_buffer();
    _end_of_stream = false;
    _pr = &pr;
    _last_pkey.reset();
    _next_position_in_partition = position_in_partition::for_partition_start();

    if (_reader) {
        co_await _reader->fast_forward_to(pr);
        _range_override.reset();
        co_return;
    }

    auto resumed = try_resume();
    if (!resumed) {
        co_return;
    }
    co_await resumed->fast_forward_to(pr);
    _range_override.reset();
    maybe_pause(std::move(*resumed));
}
// Non-owning handle: stores only the address of `r`.
evictable_reader_handle_v2::evictable_reader_handle_v2(evictable_reader_v2& r)
    : _r(&r) {
}
// Pauses the reader this handle refers to.
void evictable_reader_handle_v2::pause() {
    _r->pause();
}
// Creates an evictable reader with auto-pause enabled.
flat_mutation_reader_v2 make_auto_paused_evictable_reader_v2(
        mutation_source ms,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        const io_priority_class& pc,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr) {
    using reader_type = evictable_reader_v2;
    return make_flat_mutation_reader_v2<reader_type>(
            reader_type::auto_pause::yes,
            std::move(ms),
            std::move(schema),
            std::move(permit),
            pr,
            ps,
            pc,
            std::move(trace_state),
            fwd_mr);
}
// Creates an evictable reader with auto-pause disabled, together with a
// handle through which it can be paused explicitly.
std::pair<flat_mutation_reader_v2, evictable_reader_handle_v2> make_manually_paused_evictable_reader_v2(
        mutation_source ms,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        const io_priority_class& pc,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr) {
    auto impl = std::make_unique<evictable_reader_v2>(
            evictable_reader_v2::auto_pause::no,
            std::move(ms),
            std::move(schema),
            std::move(permit),
            pr,
            ps,
            pc,
            std::move(trace_state),
            fwd_mr);
    // The handle must be taken before ownership of the implementation moves
    // into the flat_mutation_reader_v2 wrapper.
    evictable_reader_handle_v2 handle(*impl);
    return std::pair(flat_mutation_reader_v2(std::move(impl)), handle);
}
namespace {
// A special-purpose shard reader.
//
// Shard reader manages a reader located on a remote shard. It transparently
// supports read-ahead (background fill_buffer() calls).
// This reader is not for general use, it was designed to serve the
// multishard_combining_reader.
// Although it implements the flat_mutation_reader_v2:impl interface it cannot be
// wrapped into a flat_mutation_reader_v2, as it needs to be managed by a shared
// pointer.
class shard_reader_v2 : public flat_mutation_reader_v2::impl {
private:
shared_ptr<reader_lifecycle_policy_v2> _lifecycle_policy;
// The shard the remote reader is created and operated on.
const unsigned _shard;
// Copy of the current partition range, held through a foreign_ptr.
foreign_ptr<lw_shared_ptr<const dht::partition_range>> _pr;
const query::partition_slice& _ps;
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
// Engaged while a background fill started by read_ahead() is pending.
std::optional<future<>> _read_ahead;
// The remote reader; null until the first do_fill_buffer() completes.
foreign_ptr<std::unique_ptr<evictable_reader_v2>> _reader;
private:
future<> do_fill_buffer();
public:
shard_reader_v2(
schema_ptr schema,
reader_permit permit,
shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
unsigned shard,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: impl(std::move(schema), std::move(permit))
, _lifecycle_policy(std::move(lifecycle_policy))
, _shard(shard)
, _pr(make_foreign(make_lw_shared<const dht::partition_range>(pr)))
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr) {
}
// Neither movable nor copyable.
shard_reader_v2(shard_reader_v2&&) = delete;
shard_reader_v2& operator=(shard_reader_v2&&) = delete;
shard_reader_v2(const shard_reader_v2&) = delete;
shard_reader_v2& operator=(const shard_reader_v2&) = delete;
// Returns the first buffered fragment; calls front() on the buffer without
// checking that it is non-empty.
const mutation_fragment_v2& peek_buffer() const {
return buffer().front();
}
virtual future<> fill_buffer() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr) override;
virtual future<> fast_forward_to(position_range) override;
virtual future<> close() noexcept override;
// True once the remote reader exists, the local buffer is drained and
// end-of-stream was reached.
bool done() const {
return _reader && is_buffer_empty() && is_end_of_stream();
}
void read_ahead();
bool is_read_ahead_in_progress() const {
return _read_ahead.has_value();
}
};
// Waits for any pending read-ahead, then stops the remote reader on its
// shard and hands its inactive-read handle plus all unconsumed fragments to
// the lifecycle policy. Failures are logged and swallowed.
future<> shard_reader_v2::close() noexcept {
if (_read_ahead) {
try {
co_await *std::exchange(_read_ahead, std::nullopt);
} catch (...) {
mrlog.warn("shard_reader::close(): read_ahead on shard {} failed: {}", _shard, std::current_exception());
}
}
try {
co_await smp::submit_to(_shard, [this] {
// The remote reader was never created, nothing to stop.
if (!_reader) {
return make_ready_future<>();
}
auto irh = std::move(*_reader).inactive_read_handle();
return with_closeable(flat_mutation_reader_v2(_reader.release()), [this] (flat_mutation_reader_v2& reader) mutable {
auto permit = reader.permit();
const auto& schema = *reader.schema();
// Unconsumed fragments are those still in the remote reader's buffer,
// preceded by those in this reader's own buffer. The latter are walked
// back to front and pushed to the front, which preserves their order.
auto unconsumed_fragments = reader.detach_buffer();
auto rit = std::reverse_iterator(buffer().cend());
auto rend = std::reverse_iterator(buffer().cbegin());
for (; rit != rend; ++rit) {
unconsumed_fragments.emplace_front(schema, permit, *rit); // we are copying from the remote shard.
}
return unconsumed_fragments;
}).then([this, irh = std::move(irh)] (flat_mutation_reader_v2::tracked_buffer&& buf) mutable {
return _lifecycle_policy->destroy_reader({std::move(irh), std::move(buf)});
});
});
} catch (...) {
mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception());
}
}
// Fills the remote reader's buffer on _shard and copies the resulting
// fragments into the local buffer. On the first call the remote reader is
// created as part of the same cross-shard call.
future<> shard_reader_v2::do_fill_buffer() {
auto fill_buf_fut = make_ready_future<remote_fill_buffer_result_v2>();
// Used on first use only, to bring back both the newly created reader and
// the result of its first fill.
struct reader_and_buffer_fill_result {
foreign_ptr<std::unique_ptr<evictable_reader_v2>> reader;
remote_fill_buffer_result_v2 result;
};
if (!_reader) {
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema)] () -> future<reader_and_buffer_fill_result> {
// Mutation source that delegates reader creation to the lifecycle policy.
auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr ts,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, pc, std::move(ts), fwd_mr);
});
auto s = gs.get();
auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout());
auto rreader = make_foreign(std::make_unique<evictable_reader_v2>(evictable_reader_v2::auto_pause::yes, std::move(ms),
s, std::move(permit), *_pr, _ps, _pc, _trace_state, _fwd_mr));
// The exception is saved so the reader can be closed outside of the catch
// block, before the exception is rethrown.
std::exception_ptr ex;
try {
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
reader_permit::used_guard ug{rreader->permit()};
co_await rreader->fill_buffer();
auto res = remote_fill_buffer_result_v2(rreader->detach_buffer(), rreader->is_end_of_stream());
co_return reader_and_buffer_fill_result{std::move(rreader), std::move(res)};
} catch (...) {
ex = std::current_exception();
}
co_await rreader->close();
std::rethrow_exception(std::move(ex));
}).then([this] (reader_and_buffer_fill_result res) {
_reader = std::move(res.reader);
return std::move(res.result);
});
} else {
fill_buf_fut = smp::submit_to(_shard, [this] () mutable {
// The guard is moved into the continuation so it lives until the fill
// completes.
reader_permit::used_guard ug{_reader->permit()};
return _reader->fill_buffer().then([this, ug = std::move(ug)] {
return remote_fill_buffer_result_v2(_reader->detach_buffer(), _reader->is_end_of_stream());
});
});
}
return fill_buf_fut.then([this] (remote_fill_buffer_result_v2 res) mutable {
_end_of_stream = res.end_of_stream;
// Each fragment is copy-constructed with this reader's schema and permit.
for (const auto& mf : *res.buffer) {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf));
}
});
}
// Fills the buffer. A pending read-ahead is waited for and counts as the
// fill; otherwise a new fill is started only if the buffer is empty.
// The permit is marked as blocked for the duration of the call.
future<> shard_reader_v2::fill_buffer() {
// FIXME: want to move this to the inner scopes but it makes clang miscompile the code.
reader_permit::blocked_guard guard(_permit);
if (_read_ahead) {
co_await *std::exchange(_read_ahead, std::nullopt);
co_return;
}
if (!is_buffer_empty()) {
co_return;
}
co_await do_fill_buffer();
}
// Moves to the next partition. Does nothing if the remote reader does not
// exist yet. A pending read-ahead is waited for first; the remote reader is
// only contacted if the local buffer has no data left after dropping the
// current partition.
future<> shard_reader_v2::next_partition() {
if (!_reader) {
co_return;
}
// FIXME: want to move this to the inner scopes but it makes clang miscompile the code.
reader_permit::blocked_guard guard(_permit);
if (_read_ahead) {
co_await *std::exchange(_read_ahead, std::nullopt);
}
clear_buffer_to_next_partition();
if (!is_buffer_empty()) {
co_return;
}
co_return co_await smp::submit_to(_shard, [this] {
return _reader->next_partition();
});
}
// Fast-forwards to `pr`. If the remote reader was not created yet, only the
// stored range is replaced. Otherwise any pending read-ahead is waited for,
// the local buffer is dropped and the remote reader is fast-forwarded to a
// copy of `pr` made on its shard; that copy is also passed to the lifecycle
// policy and becomes the new stored range.
future<> shard_reader_v2::fast_forward_to(const dht::partition_range& pr) {
if (!_reader && !_read_ahead) {
// No need to fast-forward uncreated readers, they will be passed the new
// range when created.
_pr = make_foreign(make_lw_shared<const dht::partition_range>(pr));
co_return;
}
reader_permit::blocked_guard guard(_permit);
if (_read_ahead) {
co_await *std::exchange(_read_ahead, std::nullopt);
}
_end_of_stream = false;
clear_buffer();
// `pr` is captured by reference and copied inside the lambda, which runs on
// _shard.
_pr = co_await smp::submit_to(_shard, [this, &pr] () -> future<foreign_ptr<lw_shared_ptr<const dht::partition_range>>> {
auto new_pr = make_lw_shared<const dht::partition_range>(pr);
co_await _reader->fast_forward_to(*new_pr);
_lifecycle_policy->update_read_range(new_pr);
co_return make_foreign(std::move(new_pr));
});
}
// Forwarding by position range is not implemented by this reader: the returned
// future always carries a std::bad_function_call.
future<> shard_reader_v2::fast_forward_to(position_range) {
    auto unsupported = make_backtraced_exception_ptr<std::bad_function_call>();
    return make_exception_future<>(std::move(unsupported));
}
// Starts a buffer fill without waiting for it and stores its future in
// `_read_ahead`. Does nothing when a read-ahead is already stored, the reader
// is at end of stream, or the local buffer still has fragments.
void shard_reader_v2::read_ahead() {
    const bool read_ahead_pointless = _read_ahead || is_end_of_stream() || !is_buffer_empty();
    if (!read_ahead_pointless) {
        _read_ahead.emplace(do_fill_buffer());
    }
}
} // anonymous namespace
// See make_multishard_combining_reader() for description.
class multishard_combining_reader_v2 : public flat_mutation_reader_v2::impl {
    // Entry of the shard selection heap: a shard and the token it is positioned at.
    struct shard_and_token {
        shard_id shard;
        dht::token token;

        bool operator<(const shard_and_token& o) const {
            // Reversed, as we want a min-heap.
            return token > o.token;
        }
    };

    const dht::sharder& _sharder;
    // One reader per shard, indexed by shard id.
    std::vector<std::unique_ptr<shard_reader_v2>> _shard_readers;
    // Contains the position of each shard with token granularity, organized
    // into a min-heap. Used to select the shard with the smallest token each
    // time a shard reader produces a new partition.
    std::vector<shard_and_token> _shard_selection_min_heap;
    // Shard currently being read from. Assigned by on_partition_range_change(),
    // which the constructor calls; the initializer only guarantees the member
    // is never left indeterminate.
    unsigned _current_shard = 0;
    // Whether maybe_move_to_next_shard() switched shards since fill_buffer()
    // last reset the flag. Initialized so that it never holds an indeterminate
    // value, regardless of the order in which member functions are called.
    bool _crossed_shards = false;
    // Upper bound used when kicking off background read-aheads; doubled by
    // handle_empty_reader_buffer(), capped at the shard count.
    unsigned _concurrency = 1;

    // Rebuilds the shard selection heap and picks the starting shard for `pr`.
    void on_partition_range_change(const dht::partition_range& pr);
    // Switches to the shard at the top of the heap, unless the heap is empty
    // or `t` (when given) sorts before the heap's smallest token.
    // Returns true if the current shard changed.
    bool maybe_move_to_next_shard(const dht::token* const t = nullptr);
    // Decides what to do when the current shard reader has nothing buffered.
    future<> handle_empty_reader_buffer();

public:
    multishard_combining_reader_v2(
            const dht::sharder& sharder,
            shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
            schema_ptr s,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            const io_priority_class& pc,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding fwd_mr);

    // this is captured (by continuations), so the object must be neither
    // copied nor moved.
    multishard_combining_reader_v2(const multishard_combining_reader_v2&) = delete;
    multishard_combining_reader_v2& operator=(const multishard_combining_reader_v2&) = delete;
    multishard_combining_reader_v2(multishard_combining_reader_v2&&) = delete;
    multishard_combining_reader_v2& operator=(multishard_combining_reader_v2&&) = delete;

    virtual future<> fill_buffer() override;
    virtual future<> next_partition() override;
    virtual future<> fast_forward_to(const dht::partition_range& pr) override;
    virtual future<> fast_forward_to(position_range pr) override;
    virtual future<> close() noexcept override;
};
// Resets shard selection for the partition range `pr`: the current shard
// becomes the owner of the range's start token (or of the minimum token when
// the range has no start bound) and the heap is refilled with the start tokens
// of the shards that follow it.
void multishard_combining_reader_v2::on_partition_range_change(const dht::partition_range& pr) {
    _shard_selection_min_heap.clear();
    _shard_selection_min_heap.reserve(_sharder.shard_count());

    auto start_token = pr.start() ? pr.start()->value().token() : dht::minimum_token();
    _current_shard = _sharder.shard_of(start_token);

    auto range_sharder = dht::ring_position_range_sharder(_sharder, pr);
    // The first value produced is thrown away, as it is the ring range of the
    // current shard.
    auto entry = range_sharder.next(*_schema);
    // Do at most one full round, until we are back at the shard we started
    // from (`_current_shard`). Stop earlier if the sharder has no ranges for
    // the remaining shards.
    while ((entry = range_sharder.next(*_schema)) && entry->shard != _current_shard) {
        _shard_selection_min_heap.push_back(shard_and_token{entry->shard, entry->ring_range.start()->value().token()});
        boost::push_heap(_shard_selection_min_heap);
    }
}
// Switches to the shard with the smallest token in the selection heap.
//
// No switch happens when the heap is empty, or when `t` is given and sorts
// before the heap's smallest token. When a switch happens and `t` is given,
// the shard being left is put back into the heap at position `*t`.
// Returns whether the current shard was changed.
bool multishard_combining_reader_v2::maybe_move_to_next_shard(const dht::token* const t) {
    if (_shard_selection_min_heap.empty()) {
        return false;
    }
    if (t && *t < _shard_selection_min_heap.front().token) {
        return false;
    }

    // Extract the heap's top entry.
    boost::pop_heap(_shard_selection_min_heap);
    const auto successor = _shard_selection_min_heap.back().shard;
    _shard_selection_min_heap.pop_back();

    if (t) {
        _shard_selection_min_heap.push_back(shard_and_token{_current_shard, *t});
        boost::push_heap(_shard_selection_min_heap);
    }

    _crossed_shards = true;
    _current_shard = successor;
    return true;
}
// Called when the current shard reader has no buffered fragments.
//
// An exhausted reader makes us either finish the stream (no shards left in the
// heap) or move on to the next shard. A reader with a read-ahead in progress
// is simply waited on. Otherwise the reader's buffer is filled, after
// optionally launching read-aheads on the upcoming shards.
future<> multishard_combining_reader_v2::handle_empty_reader_buffer() {
    auto& current = *_shard_readers[_current_shard];

    if (current.is_end_of_stream()) {
        if (!_shard_selection_min_heap.empty()) {
            maybe_move_to_next_shard();
        } else {
            _end_of_stream = true;
        }
        return make_ready_future<>();
    }

    if (current.is_read_ahead_in_progress()) {
        return current.fill_buffer();
    }

    // If we crossed shards and the next reader has an empty buffer we
    // double concurrency so the next time we cross shards we will have
    // more chances of hitting the reader's buffer.
    if (_crossed_shards) {
        _concurrency = std::min(_concurrency * 2, _sharder.shard_count());

        // Read ahead shouldn't change the min selection heap so we work on a local copy.
        auto upcoming = _shard_selection_min_heap;

        // If concurrency > 1 we kick-off concurrency-1 read-aheads in the
        // background. They will be brought to the foreground when we move
        // to their respective shard.
        for (unsigned launched = 1; launched < _concurrency && !upcoming.empty(); ++launched) {
            boost::pop_heap(upcoming);
            const auto shard = upcoming.back().shard;
            upcoming.pop_back();
            _shard_readers[shard]->read_ahead();
        }
    }
    return current.fill_buffer();
}
// Sets up shard selection for `pr` and creates one shard_reader_v2 per shard
// reported by the sharder, each constructed with the same schema, permit,
// lifecycle policy, range, slice, priority class, trace state and forwarding
// mode.
multishard_combining_reader_v2::multishard_combining_reader_v2(
        const dht::sharder& sharder,
        shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
        schema_ptr s,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        const io_priority_class& pc,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr)
    : impl(std::move(s), std::move(permit))
    , _sharder(sharder)
{
    on_partition_range_change(pr);

    const auto shard_count = _sharder.shard_count();
    _shard_readers.reserve(shard_count);
    for (unsigned shard = 0; shard != shard_count; ++shard) {
        _shard_readers.push_back(std::make_unique<shard_reader_v2>(_schema, _permit, lifecycle_policy, shard, pr, ps, pc, trace_state, fwd_mr));
    }
}
// Fills our buffer from the current shard reader until the buffer is full or
// the stream ends. A switch to another shard is considered every time the
// current shard reader is about to emit a partition start.
future<> multishard_combining_reader_v2::fill_buffer() {
    _crossed_shards = false;
    return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this] {
        auto& source = *_shard_readers[_current_shard];
        if (source.is_buffer_empty()) {
            return handle_empty_reader_buffer();
        }

        while (!source.is_buffer_empty() && !is_buffer_full()) {
            const auto& head = source.peek_buffer();
            if (head.is_partition_start()) {
                const auto& partition_token = head.as_partition_start().key().token();
                if (maybe_move_to_next_shard(&partition_token)) {
                    // The shard changed: the fragment stays in its reader's
                    // buffer and the next iteration reads from the new shard.
                    return make_ready_future<>();
                }
            }
            push_mutation_fragment(source.pop_mutation_fragment());
        }
        return make_ready_future<>();
    });
}
// Drops the rest of the current partition from our buffer. The current shard
// reader is only asked to skip as well when that leaves our buffer empty.
future<> multishard_combining_reader_v2::next_partition() {
    clear_buffer_to_next_partition();
    if (!is_buffer_empty()) {
        return make_ready_future<>();
    }
    return _shard_readers[_current_shard]->next_partition();
}
// Resets local state, recomputes shard selection for `pr` and forwards every
// shard reader to `pr`.
future<> multishard_combining_reader_v2::fast_forward_to(const dht::partition_range& pr) {
    clear_buffer();
    _end_of_stream = false;
    on_partition_range_change(pr);

    auto forward_one = [&pr] (std::unique_ptr<shard_reader_v2>& shard_reader) {
        return shard_reader->fast_forward_to(pr);
    };
    return parallel_for_each(_shard_readers, std::move(forward_one));
}
// Forwarding by position range is not implemented by this reader: the returned
// future always carries a std::bad_function_call.
future<> multishard_combining_reader_v2::fast_forward_to(position_range pr) {
    auto unsupported = make_backtraced_exception_ptr<std::bad_function_call>();
    return make_exception_future<>(std::move(unsupported));
}
// Closes every shard reader; the returned future resolves once all of them
// have been closed.
future<> multishard_combining_reader_v2::close() noexcept {
    auto close_one = [] (std::unique_ptr<shard_reader_v2>& shard_reader) {
        return shard_reader->close();
    };
    return parallel_for_each(_shard_readers, std::move(close_one));
}
// Creates a multishard combining reader that uses the sharder of `schema`.
flat_mutation_reader_v2 make_multishard_combining_reader_v2(
        shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        const io_priority_class& pc,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr) {
    // Fetch the sharder in its own statement, before `schema` is moved from
    // in the call below.
    const dht::sharder& schema_sharder = schema->get_sharder();
    return make_flat_mutation_reader_v2<multishard_combining_reader_v2>(
            schema_sharder,
            std::move(lifecycle_policy),
            std::move(schema),
            std::move(permit),
            pr,
            ps,
            pc,
            std::move(trace_state),
            fwd_mr);
}
// Same as make_multishard_combining_reader_v2(), except that the sharder is
// supplied by the caller instead of being taken from the schema.
flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests(
        const dht::sharder& sharder,
        shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        const io_priority_class& pc,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr) {
    return make_flat_mutation_reader_v2<multishard_combining_reader_v2>(
            sharder,
            std::move(lifecycle_policy),
            std::move(schema),
            std::move(permit),
            pr,
            ps,
            pc,
            std::move(trace_state),
            fwd_mr);
}
class queue_reader final : public flat_mutation_reader::impl {
friend class queue_reader_handle;
private:
queue_reader_handle* _handle = nullptr;
std::optional<promise<>> _not_full;
std::optional<promise<>> _full;
std::exception_ptr _ex;
private:
void push_and_maybe_notify(mutation_fragment&& mf) {
push_mutation_fragment(std::move(mf));
if (_full && is_buffer_full()) {
_full->set_value();
_full.reset();
}
}
public:
explicit queue_reader(schema_ptr s, reader_permit permit)
: impl(std::move(s), std::move(permit)) {
}
virtual future<> fill_buffer() override {
if (_ex) {
return make_exception_future<>(_ex);
}
if (_end_of_stream || !is_buffer_empty()) {
return make_ready_future<>();
}
if (_not_full) {
_not_full->set_value();
_not_full.reset();
}
_full.emplace();
return _full->get_future();
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
return fill_buffer().then([this] {
return next_partition();
});
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(position_range) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept override {
// wake up any waiters to prevent broken_promise errors
if (_full) {
_full->set_value();
_full.reset();
} else if (_not_full) {
_not_full->set_value();
_not_full.reset();
}
// detach from the queue_reader_handle
// since it should never access the reader after close.
if (_handle) {
_handle->_reader = nullptr;
_handle = nullptr;
}
return make_ready_future<>();
}
future<> push(mutation_fragment&& mf) {
push_and_maybe_notify(std::move(mf));
if (!is_buffer_full()) {
return make_ready_future<>();
}
_not_full.emplace();
return _not_full->get_future();
}
void push_end_of_stream() {
_end_of_stream = true;
if (_full) {
_full->set_value();
_full.reset();
}
}
void abort(std::exception_ptr ep) noexcept {
_ex = std::move(ep);
if (_full) {
_full->set_exception(_ex);
_full.reset();
} else if (_not_full) {
_not_full->set_exception(_ex);
_not_full.reset();
}
}
};
// Aborts with an "Abandoned queue_reader_handle" error. If building that error
// throws, the exception thrown while building it is used instead.
void queue_reader_handle::abandon() noexcept {
    auto reason = [] () noexcept -> std::exception_ptr {
        try {
            return std::make_exception_ptr<std::runtime_error>(std::runtime_error("Abandoned queue_reader_handle"));
        } catch (...) {
            return std::current_exception();
        }
    }();
    abort(std::move(reason));
}
// Attaches the handle to `reader` and registers the handle with it.
queue_reader_handle::queue_reader_handle(queue_reader& reader) noexcept
    : _reader(&reader)
{
    reader._handle = this;
}
// Takes over the reader and the exception of `o`, leaving `o` detached. The
// reader, if any, is re-pointed at this handle.
queue_reader_handle::queue_reader_handle(queue_reader_handle&& o) noexcept
    : _reader(std::exchange(o._reader, nullptr))
    , _ex(std::exchange(o._ex, nullptr))
{
    if (!_reader) {
        return;
    }
    _reader->_handle = this;
}
// Destroying a handle abandons it: abandon() aborts the reader, if one is
// still attached.
queue_reader_handle::~queue_reader_handle() {
abandon();
}
// Move-assigns from `o`.
//
// The reader currently attached to this handle (if any) is abandoned first.
// Then the reader and the exception of `o` are taken over, `o` is left
// detached, and the reader (if any) is re-pointed at this handle.
//
// Self-assignment is a no-op. Without the guard, abandon() would abort this
// handle's own reader and the handle would end up detached.
queue_reader_handle& queue_reader_handle::operator=(queue_reader_handle&& o) {
    if (this == &o) {
        return *this;
    }
    abandon();
    _reader = std::exchange(o._reader, nullptr);
    _ex = std::exchange(o._ex, {});
    if (_reader) {
        _reader->_handle = this;
    }
    return *this;
}
// Pushes `mf` into the attached reader. Without a reader the returned future
// fails: with the stored exception if there is one, with a "Dangling" runtime
// error otherwise.
future<> queue_reader_handle::push(mutation_fragment mf) {
    if (_reader) {
        return _reader->push(std::move(mf));
    }
    if (_ex) {
        return make_exception_future<>(_ex);
    }
    return make_exception_future<>(std::runtime_error("Dangling queue_reader_handle"));
}
void queue_reader_handle::push_end_of_stream() {
if (!_reader) {
throw std::runtime_error("Dangling queue_reader_handle");
}
_reader->push_end_of_stream();
_reader->_handle = nullptr;
_reader = nullptr;
}
// A handle is terminated once it no longer has a reader attached.
bool queue_reader_handle::is_terminated() const {
    return !_reader;
}
void queue_reader_handle::abort(std::exception_ptr ep) {
_ex = std::move(ep);
if (_reader) {
_reader->abort(_ex);
_reader->_handle = nullptr;
_reader = nullptr;
}
}
// Returns the exception stored in the handle (set by abort() or taken over
// from a moved-from handle); null when none is stored.
std::exception_ptr queue_reader_handle::get_exception() const noexcept {
return _ex;
}
// Creates a queue_reader together with the handle used to feed it.
std::pair<flat_mutation_reader, queue_reader_handle> make_queue_reader(schema_ptr s, reader_permit permit) {
    auto reader_impl = std::make_unique<queue_reader>(std::move(s), std::move(permit));
    // The handle must be attached while we still own the impl.
    queue_reader_handle feeder(*reader_impl);
    return std::pair<flat_mutation_reader, queue_reader_handle>(
            flat_mutation_reader(std::move(reader_impl)),
            std::move(feeder));
}
class queue_reader_v2 final : public flat_mutation_reader_v2::impl {
friend class queue_reader_handle_v2;
private:
queue_reader_handle_v2* _handle = nullptr;
std::optional<promise<>> _not_full;
std::optional<promise<>> _full;
std::exception_ptr _ex;
private:
void push_and_maybe_notify(mutation_fragment_v2&& mf) {
push_mutation_fragment(std::move(mf));
if (_full && is_buffer_full()) {
_full->set_value();
_full.reset();
}
}
public:
explicit queue_reader_v2(schema_ptr s, reader_permit permit)
: impl(std::move(s), std::move(permit)) {
}
virtual future<> fill_buffer() override {
if (_ex) {
return make_exception_future<>(_ex);
}
if (_end_of_stream || !is_buffer_empty()) {
return make_ready_future<>();
}
if (_not_full) {
_not_full->set_value();
_not_full.reset();
}
_full.emplace();
return _full->get_future();
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
return fill_buffer().then([this] {
return next_partition();
});
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(position_range) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept override {
// wake up any waiters to prevent broken_promise errors
if (_full) {
_full->set_value();
_full.reset();
} else if (_not_full) {
_not_full->set_value();
_not_full.reset();
}
// detach from the queue_reader_handle
// since it should never access the reader after close.
if (_handle) {
_handle->_reader = nullptr;
_handle = nullptr;
}
return make_ready_future<>();
}
future<> push(mutation_fragment_v2&& mf) {
push_and_maybe_notify(std::move(mf));
if (!is_buffer_full()) {
return make_ready_future<>();
}
_not_full.emplace();
return _not_full->get_future();
}
void push_end_of_stream() {
_end_of_stream = true;
if (_full) {
_full->set_value();
_full.reset();
}
}
void abort(std::exception_ptr ep) noexcept {
_ex = std::move(ep);
if (_full) {
_full->set_exception(_ex);
_full.reset();
} else if (_not_full) {
_not_full->set_exception(_ex);
_not_full.reset();
}
}
};
// Aborts with an "Abandoned queue_reader_handle_v2" error. If building that
// error throws, the exception thrown while building it is used instead.
void queue_reader_handle_v2::abandon() noexcept {
    auto reason = [] () noexcept -> std::exception_ptr {
        try {
            return std::make_exception_ptr<std::runtime_error>(std::runtime_error("Abandoned queue_reader_handle_v2"));
        } catch (...) {
            return std::current_exception();
        }
    }();
    abort(std::move(reason));
}
// Attaches the handle to `reader` and registers the handle with it.
queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_v2& reader) noexcept
    : _reader(&reader)
{
    reader._handle = this;
}
// Takes over the reader and the exception of `o`, leaving `o` detached. The
// reader, if any, is re-pointed at this handle.
queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_handle_v2&& o) noexcept
    : _reader(std::exchange(o._reader, nullptr))
    , _ex(std::exchange(o._ex, nullptr))
{
    if (!_reader) {
        return;
    }
    _reader->_handle = this;
}
// Destroying a handle abandons it: abandon() aborts the reader, if one is
// still attached.
queue_reader_handle_v2::~queue_reader_handle_v2() {
abandon();
}
// Move-assigns from `o`.
//
// The reader currently attached to this handle (if any) is abandoned first.
// Then the reader and the exception of `o` are taken over, `o` is left
// detached, and the reader (if any) is re-pointed at this handle.
//
// Self-assignment is a no-op. Without the guard, abandon() would abort this
// handle's own reader and the handle would end up detached.
queue_reader_handle_v2& queue_reader_handle_v2::operator=(queue_reader_handle_v2&& o) {
    if (this == &o) {
        return *this;
    }
    abandon();
    _reader = std::exchange(o._reader, nullptr);
    _ex = std::exchange(o._ex, {});
    if (_reader) {
        _reader->_handle = this;
    }
    return *this;
}
// Pushes `mf` into the attached reader. Without a reader the returned future
// fails: with the stored exception if there is one, with a "Dangling" runtime
// error otherwise.
future<> queue_reader_handle_v2::push(mutation_fragment_v2 mf) {
    if (_reader) {
        return _reader->push(std::move(mf));
    }
    if (_ex) {
        return make_exception_future<>(_ex);
    }
    return make_exception_future<>(std::runtime_error("Dangling queue_reader_handle_v2"));
}
void queue_reader_handle_v2::push_end_of_stream() {
if (!_reader) {
throw std::runtime_error("Dangling queue_reader_handle_v2");
}
_reader->push_end_of_stream();
_reader->_handle = nullptr;
_reader = nullptr;
}
// A handle is terminated once it no longer has a reader attached.
bool queue_reader_handle_v2::is_terminated() const {
    return !_reader;
}
void queue_reader_handle_v2::abort(std::exception_ptr ep) {
_ex = std::move(ep);
if (_reader) {
_reader->abort(_ex);
_reader->_handle = nullptr;
_reader = nullptr;
}
}
// Returns the exception stored in the handle (set by abort() or taken over
// from a moved-from handle); null when none is stored.
std::exception_ptr queue_reader_handle_v2::get_exception() const noexcept {
return _ex;
}
// Creates a queue_reader_v2 together with the handle used to feed it.
std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> make_queue_reader_v2(schema_ptr s, reader_permit permit) {
    auto reader_impl = std::make_unique<queue_reader_v2>(std::move(s), std::move(permit));
    // The handle must be attached while we still own the impl.
    queue_reader_handle_v2 feeder(*reader_impl);
    return std::pair<flat_mutation_reader_v2, queue_reader_handle_v2>(
            flat_mutation_reader_v2(std::move(reader_impl)),
            std::move(feeder));
}
namespace {
// Reader that passes the fragments of `_reader` through `_compactor` and emits
// whatever the compactor hands back.
//
// The reader passes itself to the compactor as the consumer (`*this`), so the
// private consume*() member functions below are the callbacks through which
// compacted fragments arrive; they push the fragments into this reader's
// buffer.
class compacting_reader : public flat_mutation_reader_v2::impl {
// Gives the compactor access to the private consume*() callbacks.
friend class compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::yes>;
private:
flat_mutation_reader_v2 _reader;
compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::yes> _compactor;
noop_compacted_fragments_consumer _gc_consumer;
// Uncompacted stream
// Last partition start read from `_reader`; moved into the output by
// maybe_push_partition_start().
partition_start _last_uncompacted_partition_start;
// Kind of the last fragment read from `_reader`.
mutation_fragment_v2::kind _last_uncompacted_kind = mutation_fragment_v2::kind::partition_end;
// Compacted stream
// Set by consume_new_partition(), cleared once the partition start was pushed.
bool _has_compacted_partition_start = false;
// While set, range tombstone changes and the partition end coming from the
// compactor are not pushed (see maybe_inject_partition_end()).
bool _ignore_partition_end = false;
private:
// Pushes the pending partition start, if there is one.
void maybe_push_partition_start() {
if (_has_compacted_partition_start) {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(_last_uncompacted_partition_start)));
_has_compacted_partition_start = false;
}
}
// Closes the current partition towards the compactor if the input was left
// in the middle of one.
void maybe_inject_partition_end() {
// The compactor needs a valid stream, but downstream doesn't care about
// the injected partition end, so ignore it.
if (_last_uncompacted_kind != mutation_fragment_v2::kind::partition_end) {
_ignore_partition_end = true;
_compactor.consume_end_of_partition(*this, _gc_consumer);
_ignore_partition_end = false;
}
}
// Callback: a new partition starts. The partition start itself is pushed
// lazily, by maybe_push_partition_start().
void consume_new_partition(const dht::decorated_key& dk) {
_has_compacted_partition_start = true;
// We need to reset the partition's tombstone here. If the tombstone is
// compacted away, `consume(tombstone)` below is simply not called. If
// it is not compacted away, `consume(tombstone)` below will restore it.
_last_uncompacted_partition_start.partition_tombstone() = {};
}
// Callback: the partition tombstone. Stored in the saved partition start,
// which is then pushed.
void consume(tombstone t) {
_last_uncompacted_partition_start.partition_tombstone() = t;
maybe_push_partition_start();
}
// Callback: a static row. Pushed after the pending partition start, if any.
stop_iteration consume(static_row&& sr, tombstone, bool) {
maybe_push_partition_start();
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(sr)));
return stop_iteration::no;
}
// Callback: a clustering row. Pushed after the pending partition start, if any.
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
maybe_push_partition_start();
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(cr)));
return stop_iteration::no;
}
// Callback: a range tombstone change.
stop_iteration consume(range_tombstone_change&& rtc) {
// The compactor will close the active tombstone (if any) on partition
// end. We ignore this when we don't care about the partition-end.
if (_ignore_partition_end) {
return stop_iteration::no;
}
maybe_push_partition_start();
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(rtc)));
return stop_iteration::no;
}
// Callback: end of the partition. The partition end fragment is not pushed
// while `_ignore_partition_end` is set.
stop_iteration consume_end_of_partition() {
maybe_push_partition_start();
if (!_ignore_partition_end) {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end{}));
}
return stop_iteration::no;
}
// Callback: end of the stream. Nothing to do.
void consume_end_of_stream() {
}
// Forwarding mode this reader was created with.
streamed_mutation::forwarding _fwd;
public:
// Wraps `source`, taking its schema and permit. `compaction_time` and
// `get_max_purgeable` are handed to the compactor.
compacting_reader(flat_mutation_reader_v2 source, gc_clock::time_point compaction_time,
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no)
: impl(source.schema(), source.permit())
, _reader(std::move(source))
, _compactor(*_schema, compaction_time, get_max_purgeable)
, _last_uncompacted_partition_start(dht::decorated_key(dht::minimum_token(), partition_key::make_empty()), tombstone{})
, _fwd(fwd) {
}
// Reads from `_reader` and feeds the compactor until our buffer is full or
// the stream ends.
virtual future<> fill_buffer() override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return _reader.fill_buffer().then([this] {
if (_reader.is_buffer_empty()) {
_end_of_stream = _reader.is_end_of_stream();
// In forwarding mode a partition start that is still pending when
// the input ends is pushed here.
if (_end_of_stream && _fwd) {
maybe_push_partition_start();
}
}
// It is important to not consume more than we actually need.
// Doing so leads to corner cases around `next_partition()`. The
// fragments consumed after our buffer is full might not be
// emitted by the compactor, so on a following `next_partition()`
// call we won't be able to determine whether we are at a
// partition boundary or not and thus whether we need to forward
// it to the underlying reader or not.
// This problem doesn't exist when we want more fragments, in this
// case we'll keep reading until the compactor emits something or
// we read EOS, and thus we'll know where we are.
while (!_reader.is_buffer_empty() && !is_buffer_full()) {
auto mf = _reader.pop_mutation_fragment();
_last_uncompacted_kind = mf.mutation_fragment_kind();
switch (mf.mutation_fragment_kind()) {
case mutation_fragment_v2::kind::static_row:
_compactor.consume(std::move(mf).as_static_row(), *this, _gc_consumer);
break;
case mutation_fragment_v2::kind::clustering_row:
_compactor.consume(std::move(mf).as_clustering_row(), *this, _gc_consumer);
break;
case mutation_fragment_v2::kind::range_tombstone_change:
_compactor.consume(std::move(mf).as_range_tombstone_change(), *this, _gc_consumer);
break;
case mutation_fragment_v2::kind::partition_start:
// Keep the partition start around; it is only pushed once the
// compactor calls back for this partition.
_last_uncompacted_partition_start = std::move(mf).as_partition_start();
_compactor.consume_new_partition(_last_uncompacted_partition_start.key());
if (_last_uncompacted_partition_start.partition_tombstone()) {
_compactor.consume(_last_uncompacted_partition_start.partition_tombstone(), *this, _gc_consumer);
}
if (_fwd) {
_compactor.force_partition_not_empty(*this);
}
break;
case mutation_fragment_v2::kind::partition_end:
_compactor.consume_end_of_partition(*this, _gc_consumer);
break;
}
}
});
});
}
// Drops the current partition from our buffer. Only when the buffer ends up
// empty is the compactor's partition closed (if needed) and the underlying
// reader asked to skip as well.
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (!is_buffer_empty()) {
return make_ready_future<>();
}
_end_of_stream = false;
maybe_inject_partition_end();
return _reader.next_partition();
}
// Discards buffered fragments, closes the compactor's partition (if needed)
// and forwards the underlying reader to `pr`.
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
clear_buffer();
_end_of_stream = false;
maybe_inject_partition_end();
return _reader.fast_forward_to(pr);
}
// Forwards our buffer and the underlying reader to the position range `pr`.
virtual future<> fast_forward_to(position_range pr) override {
forward_buffer_to(pr.start());
_end_of_stream = false;
return _reader.fast_forward_to(std::move(pr));
}
// Closes the underlying reader.
virtual future<> close() noexcept override {
return _reader.close();
}
};
} // anonymous namespace
// Wraps `source` into a compacting_reader.
//
// `compaction_time`, `get_max_purgeable` and `fwd` are passed through to the
// compacting_reader constructor. `get_max_purgeable` is received by value, so
// it is moved on rather than copied: copying a std::function copies its
// target, which may allocate.
flat_mutation_reader_v2 make_compacting_reader(flat_mutation_reader_v2 source, gc_clock::time_point compaction_time,
        std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable, streamed_mutation::forwarding fwd) {
    return make_flat_mutation_reader_v2<compacting_reader>(std::move(source), compaction_time, std::move(get_max_purgeable), fwd);
}