Merge "Convert input side of mutation compactor to v2" from Botond

"
With this series the mutation compactor can now consume a v2 stream. On
the output side it still uses v1, so it can now act as an online
v2->v1 converter. This allows us to push out v2->v1 conversion to as far
as the compactor, usually the next to last component in a read pipeline,
just before the final consumer. For reads this is as far as we can go,
as the intra-node ABI and hence the result-sets built are v1. For
compaction we could go further and eliminate conversion altogether, but
this requires some further work on both the compactor and the sstable
writer and so it is left to be done later.
To summarize, this patchset enables a v2 input for the compactor and it
updates compaction and single partition reads to use it.
"

* 'mutation-compactor-consume-v2/v1' of https://github.com/denesb/scylla:
  table: add make_reader_v2()
  querier: convert querier_cache and {data,mutation}_querier to v2
  compaction: upgrade compaction::make_interposer_consumer() to v2
  mutation_reader: remove unnecessary stable_flattened_mutations_consumer
  compaction/compaction_strategy: convert make_interposer_consumer() to v2
  mutation_writer: migrate timestamp_based_splitting_writer to v2
  mutation_writer: migrate shard_based_splitting_writer to v2
  mutation_writer: add v2 clone of feed_writer and bucket_writer
  flat_mutation_reader_v2: add reader_consumer_v2 typedef
  mutation_reader: add v2 clone of queue_reader
  compact_mutation: make start_new_page() independent of mutation_fragment version
  compact_mutation: add support for consuming a v2 stream
  compact_mutation: extract range tombstone consumption into own method
  range_tombstone_assembler: add get_range_tombstone_change()
  range_tombstone_assembler: add get_current_tombstone()
This commit is contained in:
Avi Kivity
2022-01-12 14:37:19 +02:00
27 changed files with 566 additions and 136 deletions

View File

@@ -696,14 +696,14 @@ private:
// compacting_reader. It's useful for allowing data from different buckets
// to be compacted together.
future<> consume_without_gc_writer(gc_clock::time_point compaction_time) {
auto consumer = make_interposer_consumer([this] (flat_mutation_reader reader) mutable {
return seastar::async([this, reader = std::move(reader)] () mutable {
auto consumer = make_interposer_consumer([this] (flat_mutation_reader_v2 reader) mutable {
return seastar::async([this, reader = downgrade_to_v1(std::move(reader))] () mutable {
auto close_reader = deferred_close(reader);
auto cfc = make_stable_flattened_mutations_consumer<compacted_fragments_writer>(get_compacted_fragments_writer());
auto cfc = compacted_fragments_writer(get_compacted_fragments_writer());
reader.consume_in_thread(std::move(cfc));
});
});
return consumer(make_compacting_reader(downgrade_to_v1(make_sstable_reader()), compaction_time, max_purgeable_func()));
return consumer(upgrade_to_v2(make_compacting_reader(downgrade_to_v1(make_sstable_reader()), compaction_time, max_purgeable_func())));
}
future<> consume() {
@@ -714,14 +714,14 @@ private:
if (!enable_garbage_collected_sstable_writer() && use_interposer_consumer()) {
return consume_without_gc_writer(now);
}
auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader reader) mutable
auto consumer = make_interposer_consumer([this, now] (flat_mutation_reader_v2 reader) mutable
{
return seastar::async([this, reader = std::move(reader), now] () mutable {
auto close_reader = deferred_close(reader);
if (enable_garbage_collected_sstable_writer()) {
using compact_mutations = compact_for_compaction<compacted_fragments_writer, compacted_fragments_writer>;
auto cfc = make_stable_flattened_mutations_consumer<compact_mutations>(*schema(), now,
auto cfc = compact_mutations(*schema(), now,
max_purgeable_func(),
get_compacted_fragments_writer(),
get_gc_compacted_fragments_writer());
@@ -730,17 +730,17 @@ private:
return;
}
using compact_mutations = compact_for_compaction<compacted_fragments_writer, noop_compacted_fragments_consumer>;
auto cfc = make_stable_flattened_mutations_consumer<compact_mutations>(*schema(), now,
auto cfc = compact_mutations(*schema(), now,
max_purgeable_func(),
get_compacted_fragments_writer(),
noop_compacted_fragments_consumer());
reader.consume_in_thread(std::move(cfc));
});
});
return consumer(downgrade_to_v1(make_sstable_reader()));
return consumer(make_sstable_reader());
}
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) {
virtual reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) {
return _table_s.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer));
}
@@ -1466,16 +1466,16 @@ public:
}
}
reader_consumer make_interposer_consumer(reader_consumer end_consumer) override {
reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) override {
if (!use_interposer_consumer()) {
return end_consumer;
}
return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> {
return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) mutable -> future<> {
auto cfg = mutation_writer::segregate_config{_io_priority, memory::stats().total_memory() / 10};
return mutation_writer::segregate_by_partition(std::move(reader), cfg,
return mutation_writer::segregate_by_partition(downgrade_to_v1(std::move(reader)), cfg,
[consumer = std::move(end_consumer), this] (flat_mutation_reader rd) {
++_bucket_count;
return consumer(std::move(rd));
return consumer(upgrade_to_v2(std::move(rd)));
});
};
}
@@ -1549,8 +1549,8 @@ public:
}
reader_consumer make_interposer_consumer(reader_consumer end_consumer) override {
return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> {
reader_consumer_v2 make_interposer_consumer(reader_consumer_v2 end_consumer) override {
return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 reader) mutable -> future<> {
return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer));
};
}

View File

@@ -87,7 +87,7 @@ uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_sour
return partition_estimate;
}
reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) {
reader_consumer_v2 compaction_strategy_impl::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) {
return end_consumer;
}
@@ -667,7 +667,7 @@ uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_me
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate);
}
reader_consumer compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) {
reader_consumer_v2 compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) {
return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer));
}

View File

@@ -122,7 +122,7 @@ public:
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer);
// Returns whether or not interposer consumer is used by a given strategy.
bool use_interposer_consumer() const;

View File

@@ -80,7 +80,7 @@ public:
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer);
virtual bool use_interposer_consumer() const {
return false;

View File

@@ -123,12 +123,12 @@ uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutati
return partition_estimate / std::max(1UL, uint64_t(estimated_window_count));
}
reader_consumer time_window_compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) {
reader_consumer_v2 time_window_compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) {
if (ms_meta.min_timestamp && ms_meta.max_timestamp
&& get_window_for(_options, *ms_meta.min_timestamp) == get_window_for(_options, *ms_meta.max_timestamp)) {
return end_consumer;
}
return [options = _options, end_consumer = std::move(end_consumer)] (flat_mutation_reader rd) mutable -> future<> {
return [options = _options, end_consumer = std::move(end_consumer)] (flat_mutation_reader_v2 rd) mutable -> future<> {
return mutation_writer::segregate_by_timestamp(
std::move(rd),
classify_by_timestamp(std::move(options)),

View File

@@ -191,7 +191,7 @@ public:
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) override;
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) override;
virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) override;
virtual bool use_interposer_consumer() const override {
return true;

View File

@@ -857,3 +857,8 @@ make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque<m
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr, reader_permit, std::deque<mutation_fragment_v2>, const dht::partition_range& pr, const query::partition_slice& slice);
/// A consumer function that is passed a flat_mutation_reader_v2 to be consumed from
/// and returns a future<> resolved when the reader is fully consumed, and closed.
/// Note: the function assumes ownership of the reader and must close it in all cases.
using reader_consumer_v2 = noncopyable_function<future<> (flat_mutation_reader_v2)>;

View File

@@ -451,11 +451,13 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de
auto& sharder = _schema->get_sharder();
const auto shard = sharder.shard_of(compaction_state.partition_start.key().token());
auto& range_tombstones = std::get<std::deque<range_tombstone>>(compaction_state.range_tombstones);
// It is possible that the reader this partition originates from does not
// exist anymore. Either because we failed stopping it or because it was
// evicted.
if (_readers[shard].state != reader_state::saving) {
for (auto& rt : compaction_state.range_tombstones) {
for (auto& rt : range_tombstones) {
stats.add_discarded(*_schema, rt);
}
if (compaction_state.static_row) {
@@ -467,7 +469,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de
auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit);
for (auto& rt : compaction_state.range_tombstones | boost::adaptors::reversed) {
for (auto& rt : range_tombstones | boost::adaptors::reversed) {
stats.add(*_schema, rt);
shard_buffer.emplace_front(*_schema, _permit, std::move(rt));
}

View File

@@ -23,6 +23,7 @@
#include "compaction/compaction_garbage_collector.hh"
#include "mutation_fragment.hh"
#include "range_tombstone_assembler.hh"
#include "tombstone_gc.hh"
static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) {
@@ -57,7 +58,7 @@ concept CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::dec
struct detached_compaction_state {
::partition_start partition_start;
std::optional<::static_row> static_row;
std::deque<range_tombstone> range_tombstones;
std::variant<std::deque<range_tombstone>, std::optional<range_tombstone_change>> range_tombstones;
};
class noop_compacted_fragments_consumer {
@@ -160,7 +161,8 @@ class compact_mutation_state {
uint32_t _partition_limit{};
uint64_t _partition_row_limit{};
range_tombstone_accumulator _range_tombstones;
range_tombstone_accumulator _range_tombstones; // used when consuming v1 stream and for storing the partition tombstone
std::optional<range_tombstone_assembler> _rt_assembler; // used when consuming a v2 stream
bool _static_row_live{};
uint64_t _rows_in_current_partition;
@@ -177,6 +179,32 @@ class compact_mutation_state {
compaction_stats _stats;
private:
template <typename Consumer, typename GCConsumer>
requires CompactedFragmentsConsumer<Consumer> && CompactedFragmentsConsumer<GCConsumer>
stop_iteration do_consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) {
if (rt.tomb <= _range_tombstones.get_partition_tombstone()) {
return stop_iteration::no;
}
if (can_purge_tombstone(rt.tomb)) {
partition_is_not_empty_for_gc_consumer(gc_consumer);
return gc_consumer.consume(std::move(rt));
} else {
partition_is_not_empty(consumer);
return consumer.consume(std::move(rt));
}
}
template <typename Consumer, typename GCConsumer>
tombstone tombstone_for_row(const clustering_key& ckey, Consumer& consumer, GCConsumer& gc_consumer) {
if (!_rt_assembler) { // we are either consuming v1 or consumed no range tombstone [change] at all
return _range_tombstones.tombstone_for_row(ckey);
}
if (_rt_assembler->needs_flush()) {
if (auto rt_opt = _rt_assembler->flush(_schema, position_in_partition::after_key(ckey))) {
do_consume(std::move(*rt_opt), consumer, gc_consumer);
}
}
return std::max(_range_tombstones.get_partition_tombstone(), _rt_assembler->get_current_tombstone());
}
static constexpr bool only_live() {
return OnlyLive == emit_only_live_rows::yes;
}
@@ -292,6 +320,9 @@ public:
_rows_in_current_partition = 0;
_static_row_live = false;
_range_tombstones.clear();
if (_rt_assembler) {
_rt_assembler->reset();
}
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
_max_purgeable = api::missing_timestamp;
_gc_before = std::nullopt;
@@ -345,7 +376,7 @@ public:
template <typename Consumer, typename GCConsumer>
requires CompactedFragmentsConsumer<Consumer> && CompactedFragmentsConsumer<GCConsumer>
stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) {
auto current_tombstone = _range_tombstones.tombstone_for_row(cr.key());
auto current_tombstone = tombstone_for_row(cr.key(), consumer, gc_consumer);
auto t = cr.tomb();
t.apply(current_tombstone);
@@ -409,21 +440,28 @@ public:
++_stats.range_tombstones;
_range_tombstones.apply(rt);
// FIXME: drop tombstone if it is fully covered by other range tombstones
if (rt.tomb > _range_tombstones.get_partition_tombstone()) {
if (can_purge_tombstone(rt.tomb)) {
partition_is_not_empty_for_gc_consumer(gc_consumer);
return gc_consumer.consume(std::move(rt));
} else {
partition_is_not_empty(consumer);
return consumer.consume(std::move(rt));
}
}
return do_consume(std::move(rt), consumer, gc_consumer);
}
template <typename Consumer, typename GCConsumer>
requires CompactedFragmentsConsumer<Consumer> && CompactedFragmentsConsumer<GCConsumer>
stop_iteration consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) {
++_stats.range_tombstones;
if (!_rt_assembler) {
_rt_assembler.emplace();
}
if (auto rt_opt = _rt_assembler->consume(_schema, std::move(rtc))) {
do_consume(std::move(*rt_opt), consumer, gc_consumer);
}
return stop_iteration::no;
}
template <typename Consumer, typename GCConsumer>
requires CompactedFragmentsConsumer<Consumer> && CompactedFragmentsConsumer<GCConsumer>
stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) {
if (_rt_assembler) {
_rt_assembler->on_end_of_stream();
}
if (!_empty_partition_in_gc_consumer) {
gc_consumer.consume_end_of_partition();
}
@@ -475,7 +513,7 @@ public:
void start_new_page(uint64_t row_limit,
uint32_t partition_limit,
gc_clock::time_point query_time,
mutation_fragment::kind next_fragment_kind,
partition_region next_fragment_region,
Consumer& consumer) {
_empty_partition = true;
_static_row_live = false;
@@ -486,8 +524,7 @@ public:
_query_time = query_time;
_stats = {};
if ((next_fragment_kind == mutation_fragment::kind::clustering_row || next_fragment_kind == mutation_fragment::kind::range_tombstone)
&& _last_static_row) {
if (next_fragment_region == partition_region::clustered && _last_static_row) {
// Stopping here would cause an infinite loop so ignore return value.
noop_compacted_fragments_consumer nc;
consume(*std::exchange(_last_static_row, {}), consumer, nc);
@@ -507,6 +544,9 @@ public:
/// allows the compaction state to be stored in the compacted reader.
detached_compaction_state detach_state() && {
partition_start ps(std::move(_last_dk), _range_tombstones.get_partition_tombstone());
if (_rt_assembler) {
return {std::move(ps), std::move(_last_static_row), std::move(*_rt_assembler).get_range_tombstone_change()};
}
return {std::move(ps), std::move(_last_static_row), std::move(_range_tombstones).range_tombstones()};
}
@@ -564,6 +604,10 @@ public:
return _state->consume(std::move(rt), _consumer, _gc_consumer);
}
stop_iteration consume(range_tombstone_change&& rtc) {
return _state->consume(std::move(rtc), _consumer, _gc_consumer);
}
stop_iteration consume_end_of_partition() {
return _state->consume_end_of_partition(_consumer, _gc_consumer);
}

View File

@@ -2309,7 +2309,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
// do_with() doesn't support immovable objects
auto r_a_r = std::make_unique<range_and_reader>(s, source, std::move(permit), dk, slice, std::move(trace_ptr));
auto cwqrb = counter_write_query_result_builder(*s);
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>>(
auto cfq = compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>(
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb));
auto f = r_a_r->reader.consume(std::move(cfq));
return f.finally([r_a_r = std::move(r_a_r)] {

View File

@@ -3108,6 +3108,180 @@ std::pair<flat_mutation_reader, queue_reader_handle> make_queue_reader(schema_pt
return {flat_mutation_reader(std::move(impl)), std::move(handle)};
}
// Single-producer / single-consumer reader: fragments are pushed in through a
// queue_reader_handle_v2 and popped out through the regular
// flat_mutation_reader_v2 interface. Backpressure is implemented with two
// optional promises: _not_full (producer waits until the consumer drains the
// buffer) and _full (consumer waits until the producer fills the buffer or
// signals end-of-stream/abort).
class queue_reader_v2 final : public flat_mutation_reader_v2::impl {
friend class queue_reader_handle_v2;
private:
// Back-pointer to the handle; cleared on close() or when the handle
// terminates, so neither side touches the other after detaching.
queue_reader_handle_v2* _handle = nullptr;
std::optional<promise<>> _not_full; // resolved when the buffer has room again
std::optional<promise<>> _full; // resolved when the buffer is full, or on EOS/abort
std::exception_ptr _ex; // sticky error set by abort()
private:
void push_and_maybe_notify(mutation_fragment_v2&& mf) {
push_mutation_fragment(std::move(mf));
// Wake a consumer blocked in fill_buffer() once the buffer fills up.
if (_full && is_buffer_full()) {
_full->set_value();
_full.reset();
}
}
public:
explicit queue_reader_v2(schema_ptr s, reader_permit permit)
: impl(std::move(s), std::move(permit)) {
}
virtual future<> fill_buffer() override {
// Re-raise the abort error on every attempt to read past it.
if (_ex) {
return make_exception_future<>(_ex);
}
if (_end_of_stream || !is_buffer_empty()) {
return make_ready_future<>();
}
// Buffer is empty: release a producer blocked in push(), then wait for
// it to refill the buffer (or to signal end-of-stream/abort).
if (_not_full) {
_not_full->set_value();
_not_full.reset();
}
_full.emplace();
return _full->get_future();
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
// Keep pulling from the producer until the next partition (or EOS) shows up.
if (is_buffer_empty() && !is_end_of_stream()) {
return fill_buffer().then([this] {
return next_partition();
});
}
return make_ready_future<>();
}
// Fast-forwarding is not supported by the queue reader.
virtual future<> fast_forward_to(const dht::partition_range&) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(position_range) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept override {
// wake up any waiters to prevent broken_promise errors
if (_full) {
_full->set_value();
_full.reset();
} else if (_not_full) {
_not_full->set_value();
_not_full.reset();
}
// detach from the queue_reader_handle
// since it should never access the reader after close.
if (_handle) {
_handle->_reader = nullptr;
_handle = nullptr;
}
return make_ready_future<>();
}
// Producer side: enqueue a fragment; if this fills the buffer, the returned
// future resolves only once the consumer drains it (backpressure).
future<> push(mutation_fragment_v2&& mf) {
push_and_maybe_notify(std::move(mf));
if (!is_buffer_full()) {
return make_ready_future<>();
}
_not_full.emplace();
return _not_full->get_future();
}
void push_end_of_stream() {
_end_of_stream = true;
// Let a waiting consumer observe the end-of-stream.
if (_full) {
_full->set_value();
_full.reset();
}
}
// Fail whichever side is currently waiting with `ep`; the error is also
// stored so later fill_buffer() calls keep failing with it.
void abort(std::exception_ptr ep) noexcept {
_ex = std::move(ep);
if (_full) {
_full->set_exception(_ex);
_full.reset();
} else if (_not_full) {
_not_full->set_exception(_ex);
_not_full.reset();
}
}
};
// Abort the queue with a generic "abandoned" error; used when the handle is
// destroyed or assigned over without a clean push_end_of_stream().
void queue_reader_handle_v2::abandon() noexcept {
std::exception_ptr ex;
try {
ex = std::make_exception_ptr<std::runtime_error>(std::runtime_error("Abandoned queue_reader_handle_v2"));
} catch (...) {
// make_exception_ptr itself failed (e.g. allocation) — propagate that instead.
ex = std::current_exception();
}
abort(std::move(ex));
}
queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_v2& reader) noexcept : _reader(&reader) {
// Link the reader back to this handle so either side can detach the other.
_reader->_handle = this;
}
queue_reader_handle_v2::queue_reader_handle_v2(queue_reader_handle_v2&& o) noexcept
: _reader(std::exchange(o._reader, nullptr))
, _ex(std::exchange(o._ex, nullptr))
{
// Re-point the reader at the new handle location.
if (_reader) {
_reader->_handle = this;
}
}
queue_reader_handle_v2::~queue_reader_handle_v2() {
abandon();
}
queue_reader_handle_v2& queue_reader_handle_v2::operator=(queue_reader_handle_v2&& o) {
// Terminate any queue we currently own before taking over o's.
// NOTE(review): self-move-assignment leaves the handle terminated — confirm
// callers never self-assign.
abandon();
_reader = std::exchange(o._reader, nullptr);
_ex = std::exchange(o._ex, {});
if (_reader) {
_reader->_handle = this;
}
return *this;
}
// Push one fragment into the queue; fails if the queue was already
// terminated (aborted, or the reader was closed/destroyed).
future<> queue_reader_handle_v2::push(mutation_fragment_v2 mf) {
if (!_reader) {
if (_ex) {
return make_exception_future<>(_ex);
}
return make_exception_future<>(std::runtime_error("Dangling queue_reader_handle_v2"));
}
return _reader->push(std::move(mf));
}
void queue_reader_handle_v2::push_end_of_stream() {
if (!_reader) {
throw std::runtime_error("Dangling queue_reader_handle_v2");
}
_reader->push_end_of_stream();
// EOS detaches reader and handle; the handle cannot be used any more.
_reader->_handle = nullptr;
_reader = nullptr;
}
bool queue_reader_handle_v2::is_terminated() const {
return _reader == nullptr;
}
void queue_reader_handle_v2::abort(std::exception_ptr ep) {
// Remember the error so later push() calls keep failing with it, then detach.
_ex = std::move(ep);
if (_reader) {
_reader->abort(_ex);
_reader->_handle = nullptr;
_reader = nullptr;
}
}
std::exception_ptr queue_reader_handle_v2::get_exception() const noexcept {
return _ex;
}
// Create a connected (reader, producer-handle) pair over the given schema and
// permit. The consumer reads from the first element, the producer pushes
// fragments through the second.
std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> make_queue_reader_v2(schema_ptr s, reader_permit permit) {
auto impl = std::make_unique<queue_reader_v2>(std::move(s), std::move(permit));
auto handle = queue_reader_handle_v2(*impl);
return {flat_mutation_reader_v2(std::move(impl)), std::move(handle)};
}
namespace {
class compacting_reader : public flat_mutation_reader::impl {

View File

@@ -359,26 +359,6 @@ snapshot_source make_empty_snapshot_source();
using mutation_source_opt = optimized_optional<mutation_source>;
// Adapts a non-movable FlattenedConsumer to a movable one.
template<typename FlattenedConsumer>
class stable_flattened_mutations_consumer {
std::unique_ptr<FlattenedConsumer> _ptr;
public:
stable_flattened_mutations_consumer(std::unique_ptr<FlattenedConsumer> ptr) : _ptr(std::move(ptr)) {}
auto consume_new_partition(const dht::decorated_key& dk) { return _ptr->consume_new_partition(dk); }
auto consume(tombstone t) { return _ptr->consume(t); }
auto consume(static_row&& sr) { return _ptr->consume(std::move(sr)); }
auto consume(clustering_row&& cr) { return _ptr->consume(std::move(cr)); }
auto consume(range_tombstone&& rt) { return _ptr->consume(std::move(rt)); }
auto consume_end_of_partition() { return _ptr->consume_end_of_partition(); }
auto consume_end_of_stream() { return _ptr->consume_end_of_stream(); }
};
template<typename FlattenedConsumer, typename... Args>
stable_flattened_mutations_consumer<FlattenedConsumer> make_stable_flattened_mutations_consumer(Args&&... args) {
return { std::make_unique<FlattenedConsumer>(std::forward<Args>(args)...) };
}
/// Make a foreign_reader.
///
/// foreign_reader is a local representant of a reader located on a remote
@@ -800,6 +780,51 @@ public:
std::pair<flat_mutation_reader, queue_reader_handle> make_queue_reader(schema_ptr s, reader_permit permit);
class queue_reader_v2;
/// Calls to different methods cannot overlap!
/// The handle can be used only while the reader is still alive. Once
/// `push_end_of_stream()` is called, the reader and the handle can be destroyed
/// in any order. The reader can be destroyed at any time.
// Producer-side handle of the queue created by make_queue_reader_v2().
class queue_reader_handle_v2 {
friend std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> make_queue_reader_v2(schema_ptr, reader_permit);
friend class queue_reader_v2;
private:
queue_reader_v2* _reader = nullptr; // null once the queue is terminated
std::exception_ptr _ex; // error the queue was aborted with, if any
private:
explicit queue_reader_handle_v2(queue_reader_v2& reader) noexcept;
// Abort the queue with a generic "abandoned" error.
void abandon() noexcept;
public:
queue_reader_handle_v2(queue_reader_handle_v2&& o) noexcept;
~queue_reader_handle_v2();
queue_reader_handle_v2& operator=(queue_reader_handle_v2&& o);
// Push a fragment; the returned future implements backpressure.
future<> push(mutation_fragment_v2 mf);
/// Terminate the queue.
///
/// The reader will be set to EOS. The handle cannot be used anymore.
void push_end_of_stream();
/// Aborts the queue.
///
/// All future operations on the handle or the reader will raise `ep`.
void abort(std::exception_ptr ep);
/// Checks if the queue is already terminated with either a success or failure (abort)
bool is_terminated() const;
/// Get the stored exception, if any
std::exception_ptr get_exception() const noexcept;
};
std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> make_queue_reader_v2(schema_ptr s, reader_permit permit);
/// Creates a compacting reader.
///
/// The compaction is done with a \ref mutation_compactor, using compaction-type

View File

@@ -63,4 +63,44 @@ future<> bucket_writer::close() noexcept {
return std::move(_consume_fut);
}
// Wire the writer up: keep the producer handle and immediately hand the
// reader end to the consumer, retaining the consumer's future for close().
bucket_writer_v2::bucket_writer_v2(schema_ptr schema, std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> queue_reader, reader_consumer_v2& consumer)
: _schema(schema)
, _handle(std::move(queue_reader.second))
, _consume_fut(consumer(std::move(queue_reader.first)))
{ }
bucket_writer_v2::bucket_writer_v2(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer)
: bucket_writer_v2(schema, make_queue_reader_v2(schema, std::move(permit)), consumer)
{ }
// Forward one fragment to the consumer through the queue.
future<> bucket_writer_v2::consume(mutation_fragment_v2 mf) {
if (_handle.is_terminated()) {
// When the handle is terminated, it was aborted
// or associated reader was closed prematurely.
// In this case return _consume_fut that will propagate
// the root-cause error.
auto ex = _handle.get_exception();
if (!ex) {
// shouldn't really happen
ex = make_exception_ptr(std::runtime_error("queue_reader_handle_v2 is terminated"));
}
return std::exchange(_consume_fut, make_exception_future<>(ex)).then([ex = std::move(ex)] () mutable {
return make_exception_future<>(std::move(ex));
});
}
return _handle.push(std::move(mf));
}
void bucket_writer_v2::consume_end_of_stream() {
_handle.push_end_of_stream();
}
void bucket_writer_v2::abort(std::exception_ptr ep) noexcept {
_handle.abort(std::move(ep));
}
future<> bucket_writer_v2::close() noexcept {
// Hand back the consumer's future; it resolves when the consumer finishes.
return std::move(_consume_fut);
}
} // mutation_writer

View File

@@ -90,4 +90,67 @@ future<> feed_writer(flat_mutation_reader&& rd_ref, Writer wr) {
}
}
// Feeds fragments written via consume() into a reader_consumer_v2 through a
// v2 queue reader; v2 counterpart of bucket_writer.
class bucket_writer_v2 {
schema_ptr _schema;
queue_reader_handle_v2 _handle; // producer end of the queue
future<> _consume_fut; // consumer's future over the reader end
private:
bucket_writer_v2(schema_ptr schema, std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> queue_reader_v2, reader_consumer_v2& consumer);
public:
bucket_writer_v2(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer);
future<> consume(mutation_fragment_v2 mf);
void consume_end_of_stream();
void abort(std::exception_ptr ep) noexcept;
future<> close() noexcept;
};
// Drain `rd_ref` into `wr` fragment by fragment; on error abort the writer,
// otherwise signal end-of-stream. The reader and the writer are always
// closed, and the first error encountered is rethrown at the end.
template <typename Writer>
requires MutationFragmentConsumerV2<Writer, future<>>
future<> feed_writer(flat_mutation_reader_v2&& rd_ref, Writer wr) {
// Only move in reader if stack was successfully allocated, so caller can close reader otherwise.
auto rd = std::move(rd_ref);
std::exception_ptr ex;
try {
while (!rd.is_end_of_stream() || !rd.is_buffer_empty()) {
co_await rd.fill_buffer();
while (!rd.is_buffer_empty()) {
co_await rd.pop_mutation_fragment().consume(wr);
}
}
} catch (...) {
ex = std::current_exception();
}
co_await rd.close();
try {
if (ex) {
wr.abort(ex);
} else {
wr.consume_end_of_stream();
}
} catch (...) {
// Keep the original error if we already have one.
if (!ex) {
ex = std::current_exception();
}
}
try {
co_await wr.close();
} catch (...) {
if (!ex) {
ex = std::current_exception();
}
}
if (ex) {
std::rethrow_exception(ex);
}
}
}

View File

@@ -31,21 +31,21 @@
namespace mutation_writer {
class shard_based_splitting_mutation_writer {
using shard_writer = bucket_writer;
using shard_writer = bucket_writer_v2;
private:
schema_ptr _schema;
reader_permit _permit;
reader_consumer _consumer;
reader_consumer_v2 _consumer;
unsigned _current_shard;
std::vector<std::optional<shard_writer>> _shards;
future<> write_to_shard(mutation_fragment&& mf) {
future<> write_to_shard(mutation_fragment_v2&& mf) {
auto& writer = *_shards[_current_shard];
return writer.consume(std::move(mf));
}
public:
shard_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer consumer)
shard_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer_v2 consumer)
: _schema(std::move(schema))
, _permit(std::move(permit))
, _consumer(std::move(consumer))
@@ -57,23 +57,23 @@ public:
if (!_shards[_current_shard]) {
_shards[_current_shard] = shard_writer(_schema, _permit, _consumer);
}
return write_to_shard(mutation_fragment(*_schema, _permit, std::move(ps)));
return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(ps)));
}
future<> consume(static_row&& sr) {
return write_to_shard(mutation_fragment(*_schema, _permit, std::move(sr)));
return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(sr)));
}
future<> consume(clustering_row&& cr) {
return write_to_shard(mutation_fragment(*_schema, _permit, std::move(cr)));
return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(cr)));
}
future<> consume(range_tombstone&& rt) {
return write_to_shard(mutation_fragment(*_schema, _permit, std::move(rt)));
future<> consume(range_tombstone_change&& rt) {
return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(rt)));
}
future<> consume(partition_end&& pe) {
return write_to_shard(mutation_fragment(*_schema, _permit, std::move(pe)));
return write_to_shard(mutation_fragment_v2(*_schema, _permit, std::move(pe)));
}
void consume_end_of_stream() {
@@ -97,7 +97,7 @@ public:
}
};
future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer) {
future<> segregate_by_shard(flat_mutation_reader_v2 producer, reader_consumer_v2 consumer) {
auto schema = producer.schema();
auto permit = producer.permit();
return feed_writer(

View File

@@ -31,6 +31,6 @@ namespace mutation_writer {
// manner. This is useful, for instance, in the resharding process where a user changes
// the amount of CPU assigned to Scylla and we have to rewrite the SSTables to their new
// owners.
future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer);
future<> segregate_by_shard(flat_mutation_reader_v2 producer, reader_consumer_v2 consumer);
} // namespace mutation_writer

View File

@@ -109,12 +109,12 @@ small_flat_map<Key, Value, Size>::find(const key_type& k) {
class timestamp_based_splitting_mutation_writer {
using bucket_id = int64_t;
class timestamp_bucket_writer : public bucket_writer {
class timestamp_bucket_writer : public bucket_writer_v2 {
bool _has_current_partition = false;
public:
timestamp_bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer& consumer)
: bucket_writer(schema, std::move(permit), consumer) {
timestamp_bucket_writer(schema_ptr schema, reader_permit permit, reader_consumer_v2& consumer)
: bucket_writer_v2(schema, std::move(permit), consumer) {
}
void set_has_current_partition() {
_has_current_partition = true;
@@ -131,13 +131,14 @@ private:
schema_ptr _schema;
reader_permit _permit;
classify_by_timestamp _classifier;
reader_consumer _consumer;
reader_consumer_v2 _consumer;
partition_start _current_partition_start;
std::unordered_map<bucket_id, timestamp_bucket_writer> _buckets;
std::vector<bucket_id> _buckets_used_for_current_partition;
std::optional<bucket_id> _last_rtc_bucket;
private:
future<> write_to_bucket(bucket_id bucket, mutation_fragment&& mf);
future<> write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf);
std::optional<bucket_id> examine_column(const atomic_cell_or_collection& c, const column_definition& cdef);
std::optional<bucket_id> examine_row(const row& r, column_kind kind);
@@ -150,7 +151,7 @@ private:
future<> write_marker_and_tombstone(const clustering_row& cr);
public:
timestamp_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_timestamp classifier, reader_consumer consumer)
timestamp_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, classify_by_timestamp classifier, reader_consumer_v2 consumer)
: _schema(std::move(schema))
, _permit(std::move(permit))
, _classifier(std::move(classifier))
@@ -161,7 +162,7 @@ public:
future<> consume(partition_start&& ps);
future<> consume(static_row&& sr);
future<> consume(clustering_row&& cr);
future<> consume(range_tombstone&& rt);
future<> consume(range_tombstone_change&& rt);
future<> consume(partition_end&& pe);
void consume_end_of_stream() {
@@ -181,7 +182,7 @@ public:
}
};
future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment&& mf) {
future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment_v2&& mf) {
auto it = _buckets.try_emplace(bucket, _schema, _permit, _consumer).first;
auto& writer = it->second;
@@ -199,7 +200,7 @@ future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bu
});
}
return writer.consume(mutation_fragment(*_schema, _permit, partition_start(_current_partition_start))).then([this, bucket = it->first, &writer, mf = std::move(mf)] () mutable {
return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_start(_current_partition_start))).then([this, bucket = it->first, &writer, mf = std::move(mf)] () mutable {
writer.set_has_current_partition();
_buckets_used_for_current_partition.push_back(bucket);
return writer.consume(std::move(mf));
@@ -384,28 +385,29 @@ future<> timestamp_based_splitting_mutation_writer::write_marker_and_tombstone(c
}
if (marker_bucket_id == tomb_bucket_id) {
return write_to_bucket(*marker_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), cr.marker(), {})));
return write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), cr.marker(), {})));
}
auto write_marker_fut = make_ready_future<>();
if (marker_bucket_id) {
write_marker_fut = write_to_bucket(*marker_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), {}, cr.marker(), {})));
write_marker_fut = write_to_bucket(*marker_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), {}, cr.marker(), {})));
}
auto write_tomb_fut = make_ready_future<>();
if (tomb_bucket_id) {
write_tomb_fut = write_to_bucket(*tomb_bucket_id, mutation_fragment(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), {}, {})));
write_tomb_fut = write_to_bucket(*tomb_bucket_id, mutation_fragment_v2(*_schema, _permit, clustering_row(cr.key(), cr.tomb(), {}, {})));
}
return when_all_succeed(std::move(write_marker_fut), std::move(write_tomb_fut)).discard_result();
}
future<> timestamp_based_splitting_mutation_writer::consume(partition_start&& ps) {
_current_partition_start = std::move(ps);
_last_rtc_bucket.reset();
if (auto& tomb = _current_partition_start.partition_tombstone()) {
auto bucket = _classifier(tomb.timestamp);
auto ps = partition_start(_current_partition_start);
tomb = {};
return write_to_bucket(bucket, mutation_fragment(mutation_fragment(*_schema, _permit, std::move(ps))));
return write_to_bucket(bucket, mutation_fragment_v2(mutation_fragment_v2(*_schema, _permit, std::move(ps))));
}
return make_ready_future<>();
}
@@ -416,11 +418,11 @@ future<> timestamp_based_splitting_mutation_writer::consume(static_row&& sr) {
}
if (const auto bucket = examine_static_row(sr)) {
return write_to_bucket(*bucket, mutation_fragment(*_schema, _permit, std::move(sr)));
return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(sr)));
}
return parallel_for_each(split_static_row(std::move(sr)), [this] (std::pair<bucket_id, static_row>& sr_piece) {
return write_to_bucket(sr_piece.first, mutation_fragment(*_schema, _permit, static_row(std::move(sr_piece.second))));
return write_to_bucket(sr_piece.first, mutation_fragment_v2(*_schema, _permit, static_row(std::move(sr_piece.second))));
});
}
@@ -430,23 +432,27 @@ future<> timestamp_based_splitting_mutation_writer::consume(clustering_row&& cr)
}
if (const auto bucket = examine_clustering_row(cr)) {
return write_to_bucket(*bucket, mutation_fragment(*_schema, _permit, std::move(cr)));
return write_to_bucket(*bucket, mutation_fragment_v2(*_schema, _permit, std::move(cr)));
}
return parallel_for_each(split_clustering_row(std::move(cr)), [this] (std::pair<bucket_id, clustering_row>& cr_piece) {
return write_to_bucket(cr_piece.first, mutation_fragment(*_schema, _permit, std::move(cr_piece.second)));
return write_to_bucket(cr_piece.first, mutation_fragment_v2(*_schema, _permit, std::move(cr_piece.second)));
});
}
future<> timestamp_based_splitting_mutation_writer::consume(range_tombstone&& rt) {
auto timestamp = _classifier(rt.tomb.timestamp);
return write_to_bucket(timestamp, mutation_fragment(*_schema, _permit, std::move(rt)));
future<> timestamp_based_splitting_mutation_writer::consume(range_tombstone_change&& rtc) {
auto id = _classifier(rtc.tombstone().timestamp);
if (_last_rtc_bucket && *_last_rtc_bucket != id) {
co_await write_to_bucket(*_last_rtc_bucket, mutation_fragment_v2(*_schema, _permit, range_tombstone_change(rtc.position(), {})));
}
_last_rtc_bucket = id;
co_await write_to_bucket(id, mutation_fragment_v2(*_schema, _permit, std::move(rtc)));
}
future<> timestamp_based_splitting_mutation_writer::consume(partition_end&& pe) {
return parallel_for_each(_buckets_used_for_current_partition, [this, pe = std::move(pe)] (bucket_id bucket) {
auto& writer = _buckets.at(bucket);
return writer.consume(mutation_fragment(*_schema, _permit, partition_end(pe))).then([&writer] {
return writer.consume(mutation_fragment_v2(*_schema, _permit, partition_end(pe))).then([&writer] {
writer.clear_has_current_partition();
});
}).then([this] {
@@ -454,7 +460,7 @@ future<> timestamp_based_splitting_mutation_writer::consume(partition_end&& pe)
});
}
future<> segregate_by_timestamp(flat_mutation_reader producer, classify_by_timestamp classifier, reader_consumer consumer) {
future<> segregate_by_timestamp(flat_mutation_reader_v2 producer, classify_by_timestamp classifier, reader_consumer_v2 consumer) {
//FIXME: make this into a consume() variant?
auto schema = producer.schema();
auto permit = producer.permit();

View File

@@ -28,6 +28,6 @@
namespace mutation_writer {
using classify_by_timestamp = noncopyable_function<int64_t(api::timestamp_type)>;
future<> segregate_by_timestamp(flat_mutation_reader producer, classify_by_timestamp classifier, reader_consumer consumer);
future<> segregate_by_timestamp(flat_mutation_reader_v2 producer, classify_by_timestamp classifier, reader_consumer_v2 consumer);
} // namespace mutation_writer

View File

@@ -216,13 +216,13 @@ querier_cache::querier_cache(std::chrono::seconds entry_ttl)
}
struct querier_utils {
static flat_mutation_reader get_reader(querier_base& q) noexcept {
return std::move(std::get<flat_mutation_reader>(q._reader));
static flat_mutation_reader_v2 get_reader(querier_base& q) noexcept {
return std::move(std::get<flat_mutation_reader_v2>(q._reader));
}
static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) noexcept {
return std::move(std::get<reader_concurrency_semaphore::inactive_read_handle>(q._reader));
}
static void set_reader(querier_base& q, flat_mutation_reader r) noexcept {
static void set_reader(querier_base& q, flat_mutation_reader_v2 r) noexcept {
q._reader = std::move(r);
}
static void set_inactive_read_handle(querier_base& q, reader_concurrency_semaphore::inactive_read_handle h) noexcept {
@@ -255,7 +255,7 @@ void querier_cache::insert_querier(
auto& sem = q.permit().semaphore();
auto irh = sem.register_inactive_read(upgrade_to_v2(querier_utils::get_reader(q)));
auto irh = sem.register_inactive_read(querier_utils::get_reader(q));
if (!irh) {
++stats.resource_based_evictions;
return;
@@ -340,7 +340,7 @@ std::optional<Querier> querier_cache::lookup_querier(
throw std::runtime_error("lookup_querier(): found querier that is evicted");
}
reader_opt->set_timeout(timeout);
querier_utils::set_reader(q, downgrade_to_v1(std::move(*reader_opt)));
querier_utils::set_reader(q, std::move(*reader_opt));
--stats.population;
const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice);
@@ -393,7 +393,7 @@ std::optional<shard_mutation_querier> querier_cache::lookup_shard_mutation_queri
future<> querier_base::close() noexcept {
struct variant_closer {
querier_base& q;
future<> operator()(flat_mutation_reader& reader) {
future<> operator()(flat_mutation_reader_v2& reader) {
return reader.close();
}
future<> operator()(reader_concurrency_semaphore::inactive_read_handle& irh) {

View File

@@ -86,11 +86,44 @@ auto consume_page(flat_mutation_reader& reader,
gc_clock::time_point query_time) {
return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] (
mutation_fragment* next_fragment) mutable {
const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end;
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, consumer);
const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end;
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer);
auto last_ckey = make_lw_shared<std::optional<clustering_key_prefix>>();
auto reader_consumer = make_stable_flattened_mutations_consumer<compact_for_query<OnlyLive, clustering_position_tracker<Consumer>>>(
auto reader_consumer = compact_for_query<OnlyLive, clustering_position_tracker<Consumer>>(
compaction_state,
clustering_position_tracker(std::move(consumer), last_ckey));
return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable {
static_assert(sizeof...(results) <= 1);
return make_ready_future<std::tuple<std::optional<clustering_key_prefix>, std::decay_t<decltype(results)>...>>(std::tuple(std::move(*last_ckey), std::move(results)...));
});
});
}
/// Consume a page worth of data from the reader.
///
/// Uses `compaction_state` for compacting the fragments and `consumer` for
/// building the results.
/// Returns a future containing a tuple with the last consumed clustering key,
/// or std::nullopt if the last row wasn't a clustering row, and whatever the
/// consumer's `consume_end_of_stream()` method returns.
template <emit_only_live_rows OnlyLive, typename Consumer>
requires CompactedFragmentsConsumer<Consumer>
auto consume_page(flat_mutation_reader_v2& reader,
lw_shared_ptr<compact_for_query_state<OnlyLive>> compaction_state,
const query::partition_slice& slice,
Consumer&& consumer,
uint64_t row_limit,
uint32_t partition_limit,
gc_clock::time_point query_time) {
return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] (
mutation_fragment_v2* next_fragment) mutable {
const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end;
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer);
auto last_ckey = make_lw_shared<std::optional<clustering_key_prefix>>();
auto reader_consumer = compact_for_query<OnlyLive, clustering_position_tracker<Consumer>>(
compaction_state,
clustering_position_tracker(std::move(consumer), last_ckey));
@@ -114,12 +147,12 @@ protected:
reader_permit _permit;
lw_shared_ptr<const dht::partition_range> _range;
std::unique_ptr<const query::partition_slice> _slice;
std::variant<flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
std::variant<flat_mutation_reader_v2, reader_concurrency_semaphore::inactive_read_handle> _reader;
dht::partition_ranges_view _query_ranges;
public:
querier_base(reader_permit permit, lw_shared_ptr<const dht::partition_range> range,
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges)
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader_v2 reader, dht::partition_ranges_view query_ranges)
: _schema(reader.schema())
, _permit(std::move(permit))
, _range(std::move(range))
@@ -134,7 +167,7 @@ public:
, _permit(std::move(permit))
, _range(make_lw_shared<const dht::partition_range>(std::move(range)))
, _slice(std::make_unique<const query::partition_slice>(std::move(slice)))
, _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _reader(ms.make_reader_v2(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _query_ranges(*_range)
{ }
@@ -217,7 +250,7 @@ public:
uint32_t partition_limit,
gc_clock::time_point query_time,
tracing::trace_state_ptr trace_ptr = {}) {
return ::query::consume_page(std::get<flat_mutation_reader>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
return ::query::consume_page(std::get<flat_mutation_reader_v2>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
partition_limit, query_time).then([this, trace_ptr = std::move(trace_ptr)] (auto&& results) {
_last_ckey = std::get<std::optional<clustering_key>>(std::move(results));
const auto& cstats = _compaction_state->stats();
@@ -274,7 +307,7 @@ private:
reader_permit permit,
dht::decorated_key nominal_pkey,
std::optional<clustering_key_prefix> nominal_ckey)
: querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges)
: querier_base(permit, std::move(reader_range), std::move(reader_slice), upgrade_to_v2(std::move(reader)), *query_ranges)
, _query_ranges(std::move(query_ranges))
, _nominal_pkey(std::move(nominal_pkey))
, _nominal_ckey(std::move(nominal_ckey)) {
@@ -307,7 +340,7 @@ public:
}
flat_mutation_reader reader() && {
return std::move(std::get<flat_mutation_reader>(_reader));
return downgrade_to_v1(std::move(std::get<flat_mutation_reader_v2>(_reader)));
}
};

View File

@@ -53,6 +53,14 @@ private:
return _prev_rt && _prev_rt->tombstone();
}
public:
tombstone get_current_tombstone() const {
return _prev_rt ? _prev_rt->tombstone() : tombstone();
}
std::optional<range_tombstone_change> get_range_tombstone_change() && {
return std::move(_prev_rt);
}
void reset() {
_prev_rt = std::nullopt;
}

View File

@@ -673,6 +673,14 @@ public:
// Mutations returned by the reader will all have given schema.
// If I/O needs to be issued to read anything in the specified range, the operations
// will be scheduled under the priority class given by pc.
flat_mutation_reader_v2 make_reader_v2(schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
flat_mutation_reader make_reader(schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,

View File

@@ -143,8 +143,8 @@ table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& pa
});
}
flat_mutation_reader
table::make_reader(schema_ptr s,
flat_mutation_reader_v2
table::make_reader_v2(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -153,7 +153,7 @@ table::make_reader(schema_ptr s,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
if (_virtual_reader) [[unlikely]] {
return (*_virtual_reader).make_reader(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr);
return (*_virtual_reader).make_reader_v2(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr);
}
std::vector<flat_mutation_reader_v2> readers;
@@ -205,14 +205,26 @@ table::make_reader(schema_ptr s,
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
auto comb_reader = downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr));
auto comb_reader = make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
if (_config.data_listeners && !_config.data_listeners->empty()) {
return _config.data_listeners->on_read(s, range, slice, std::move(comb_reader));
return upgrade_to_v2(_config.data_listeners->on_read(s, range, slice, downgrade_to_v1(std::move(comb_reader))));
} else {
return comb_reader;
}
}
flat_mutation_reader
table::make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
return downgrade_to_v1(make_reader_v2(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional<sstring> subdir) {
sstring dir = _config.datadir;
if (subdir) {
@@ -630,7 +642,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
metadata.max_timestamp = old->get_max_timestamp();
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count());
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, metadata, estimated_partitions] (flat_mutation_reader reader) mutable -> future<> {
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs, metadata, estimated_partitions] (flat_mutation_reader_v2 reader) mutable -> future<> {
auto&& priority = service::get_local_memtable_flush_priority();
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer("memtable");
cfg.backup = incremental_backups_enabled();
@@ -642,7 +654,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
auto monitor = database_sstable_write_monitor(permit, newtab, _compaction_strategy,
old->get_max_timestamp());
co_return co_await write_memtable_to_sstable(std::move(reader), *old, newtab, estimated_partitions, monitor, cfg, priority);
co_return co_await write_memtable_to_sstable(downgrade_to_v1(std::move(reader)), *old, newtab, estimated_partitions, monitor, cfg, priority);
});
flat_mutation_reader reader = old->make_flush_reader(
@@ -664,7 +676,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
co_return stop_iteration::yes;
}
auto f = consumer(std::move(reader));
auto f = consumer(upgrade_to_v2(std::move(reader)));
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
@@ -2163,7 +2175,7 @@ table::as_mutation_source() const {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return this->make_reader(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
return this->make_reader_v2(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
}

View File

@@ -47,7 +47,7 @@ std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring o
auto metadata = mutation_source_metadata{};
auto& cs = cf->get_compaction_strategy();
const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer end_consumer) mutable {
auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable {
// postpone data segregation to off-strategy compaction if enabled
if (offstrategy) {
return end_consumer;
@@ -56,7 +56,7 @@ std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring o
};
auto consumer = make_interposer_consumer(metadata,
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy, reason] (flat_mutation_reader reader) {
[cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy, reason] (flat_mutation_reader_v2 reader) {
sstables::shared_sstable sst;
try {
sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
@@ -68,7 +68,7 @@ std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring o
schema_ptr s = reader.schema();
auto& pc = service::get_local_streaming_priority();
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
return sst->write_components(downgrade_to_v1(std::move(reader)), adjusted_estimated_partitions, s,
cf->get_sstables_manager().configure_writer(origin),
encoding_stats{}, pc).then([sst] {
return sst->open_data();
@@ -86,7 +86,7 @@ std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring o
return vug.local().register_staging_sstable(sst, std::move(cf));
});
});
co_return co_await consumer(std::move(reader));
co_return co_await consumer(upgrade_to_v2(std::move(reader)));
} catch (...) {
ex = std::current_exception();
}

View File

@@ -2951,7 +2951,7 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p
return api::max_timestamp;
};
auto gc_grace_seconds = schema.gc_grace_seconds();
auto consumer = make_stable_flattened_mutations_consumer<compact_for_compaction<survived_compacted_fragments_consumer, purged_compacted_fragments_consumer>>(
auto consumer = compact_for_compaction<survived_compacted_fragments_consumer, purged_compacted_fragments_consumer>(
schema,
query_time,
get_max_purgeable,

View File

@@ -191,6 +191,8 @@ class test_bucket_writer {
mutation_opt _current_mutation;
bool _is_first_mutation = true;
range_tombstone_change _current_rtc;
size_t _throw_after;
size_t _mutation_consumed = 0;
@@ -246,8 +248,10 @@ private:
}
verify_row_bucket_id(cr.cells(), column_kind::regular_column);
}
void verify_range_tombstone(const range_tombstone& rt) {
check_timestamp(rt.tomb.timestamp);
void verify_range_tombstone_change(const range_tombstone_change& rtc) {
if (rtc.tombstone()) {
check_timestamp(rtc.tombstone().timestamp);
}
}
void maybe_throw() {
@@ -263,6 +267,7 @@ public:
, _permit(std::move(permit))
, _classify(std::move(classify))
, _buckets(buckets)
, _current_rtc(position_in_partition::before_all_clustered_rows(), tombstone())
, _throw_after(throw_after)
{ }
void consume_new_partition(const dht::decorated_key& dk) {
@@ -290,17 +295,22 @@ public:
_current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(cr)));
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
stop_iteration consume(range_tombstone_change&& rtc) {
maybe_throw();
BOOST_REQUIRE(_current_mutation);
verify_range_tombstone(rt);
_current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(rt)));
verify_range_tombstone_change(rtc);
if (_current_rtc.tombstone()) {
auto rt = range_tombstone(_current_rtc.position(), position_in_partition_view::before_key(rtc.position()), _current_rtc.tombstone());
_current_mutation->apply(mutation_fragment(*_schema, _permit, std::move(rt)));
}
_current_rtc = std::move(rtc);
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
maybe_throw();
BOOST_REQUIRE(_current_mutation);
BOOST_REQUIRE(_bucket_id);
BOOST_REQUIRE(!_current_rtc.tombstone());
auto& bucket = _buckets[*_bucket_id];
if (_is_first_mutation) {
@@ -352,13 +362,13 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) {
std::unordered_map<int64_t, std::vector<mutation>> buckets;
auto consumer = [&] (flat_mutation_reader bucket_reader) {
return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader& rd) {
auto consumer = [&] (flat_mutation_reader_v2 bucket_reader) {
return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader_v2& rd) {
return rd.consume(test_bucket_writer(random_schema.schema(), rd.permit(), classify_fn, buckets));
});
};
segregate_by_timestamp(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get();
segregate_by_timestamp(make_flat_mutation_reader_from_mutations_v2(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get();
testlog.debug("Data split into {} buckets: {}", buckets.size(), boost::copy_range<std::vector<int64_t>>(buckets | boost::adaptors::map_keys));
@@ -422,14 +432,14 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer_abort) {
int throw_after = tests::random::get_int(muts.size() - 1);
testlog.info("Will raise exception after {}/{} mutations", throw_after, muts.size());
auto consumer = [&] (flat_mutation_reader bucket_reader) {
return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader& rd) {
auto consumer = [&] (flat_mutation_reader_v2 bucket_reader) {
return with_closeable(std::move(bucket_reader), [&] (flat_mutation_reader_v2& rd) {
return rd.consume(test_bucket_writer(random_schema.schema(), rd.permit(), classify_fn, buckets, throw_after));
});
};
try {
segregate_by_timestamp(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get();
segregate_by_timestamp(make_flat_mutation_reader_from_mutations_v2(random_schema.schema(), semaphore.make_permit(), muts), classify_fn, std::move(consumer)).get();
} catch (const test_bucket_writer::expected_exception&) {
BOOST_TEST_PASSPOINT();
} catch (const seastar::broken_promise&) {

View File

@@ -3372,7 +3372,7 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
gc_before = gc_now - s->gc_grace_seconds();
auto gc_grace_seconds = s->gc_grace_seconds();
auto cfc = make_stable_flattened_mutations_consumer<compact_for_compaction<compacting_sstable_writer_test, compacting_sstable_writer_test>>(
auto cfc = compact_for_compaction<compacting_sstable_writer_test, compacting_sstable_writer_test>(
*s, gc_now, max_purgeable_func, std::move(cr), std::move(purged_cr));
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options());