From 985403ab99b097f64c280e201edd541f636b4368 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 13 Jan 2022 17:27:08 +0200 Subject: [PATCH] view: convert build_progress_virtual_reader to flat_mutation_reader_v2 build_progress_virtual_reader is a virtual reader that trims off the last clustering key column from an underlying base table. It is here converted to flat_mutation_reader_v2. Because range_tombstone_change uses position_in_partition, not clustering_key_prefix, we need a new adjust_ckey() overload. Note the transformation is likely incorrect. When trimming the last clustering key column, an inclusive bound changes should change to exclusive. However, the original code did not do this, so we don't fix it here. It's immaterial anyway since the base table doesn't include range tombstones. Test: unit (dev) (which has a test for this reader) Closes #9913 --- db/view/build_progress_virtual_reader.hh | 40 ++++++++++++++---------- mutation_fragment_v2.hh | 5 ++- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh index eff56dc93c..b6edb454b7 100644 --- a/db/view/build_progress_virtual_reader.hh +++ b/db/view/build_progress_virtual_reader.hh @@ -23,7 +23,7 @@ #include "db/system_keyspace.hh" #include "db/timeout_clock.hh" #include "dht/i_partitioner.hh" -#include "flat_mutation_reader.hh" +#include "flat_mutation_reader_v2.hh" #include "mutation_fragment.hh" #include "mutation_reader.hh" #include "query-request.hh" @@ -52,14 +52,14 @@ namespace db::view { class build_progress_virtual_reader { replica::database& _db; - struct build_progress_reader : flat_mutation_reader::impl { + struct build_progress_reader : flat_mutation_reader_v2::impl { column_id _scylla_next_token_col; column_id _scylla_generation_number_col; column_id _legacy_last_token_col; column_id _legacy_generation_number_col; const query::partition_slice& _legacy_slice; query::partition_slice _slice; - flat_mutation_reader _underlying; + flat_mutation_reader_v2 _underlying; std::optional _previous_clustering_key; build_progress_reader( @@ -72,14 +72,14 @@ class build_progress_virtual_reader { tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) - : flat_mutation_reader::impl(std::move(legacy_schema), permit) + : flat_mutation_reader_v2::impl(std::move(legacy_schema), permit) , _scylla_next_token_col(scylla_views_build_progress.schema()->get_column_definition("next_token")->id) , _scylla_generation_number_col(scylla_views_build_progress.schema()->get_column_definition("generation_number")->id) , _legacy_last_token_col(_schema->get_column_definition("last_token")->id) , _legacy_generation_number_col(_schema->get_column_definition("generation_number")->id) , _legacy_slice(slice) , _slice(adjust_partition_slice()) - , _underlying(scylla_views_build_progress.make_reader( + , _underlying(scylla_views_build_progress.make_reader_v2( scylla_views_build_progress.schema(), std::move(permit), range, @@ -120,6 +120,16 @@ class build_progress_virtual_reader { return clustering_key_prefix::from_exploded(r); } + position_in_partition adjust_ckey(position_in_partition&& underlying_pip) { + auto underlying_ck_opt = underlying_pip.get_clustering_key_prefix(); + if (!underlying_ck_opt || !underlying_ck_opt->is_full(underlying_schema())) { + return std::move(underlying_pip); + } + return position_in_partition(underlying_pip.region(), + underlying_pip.get_bound_weight(), + adjust_ckey(*underlying_ck_opt)); + } + virtual future<> fill_buffer() override { return _underlying.fill_buffer().then([this] { _end_of_stream = _underlying.is_end_of_stream(); @@ -141,19 +151,17 @@ class build_progress_virtual_reader { continue; } _previous_clustering_key = ck; - mf = mutation_fragment(*_schema, _permit, clustering_row( + mf = mutation_fragment_v2(*_schema, _permit, clustering_row( std::move(ck), scylla_in_progress_row.tomb(), std::move(scylla_in_progress_row.marker()), std::move(legacy_in_progress_row))); - } else if (mf.is_range_tombstone()) { - auto scylla_in_progress_rt = std::move(mf).as_range_tombstone(); - mf = mutation_fragment(*_schema, _permit, range_tombstone( - adjust_ckey(scylla_in_progress_rt.start), - scylla_in_progress_rt.start_kind, - adjust_ckey(scylla_in_progress_rt.end), - scylla_in_progress_rt.end_kind, - scylla_in_progress_rt.tomb)); + } else if (mf.is_range_tombstone_change()) { + auto scylla_in_progress_rtc = std::move(mf).as_range_tombstone_change(); + auto ts = scylla_in_progress_rtc.tombstone(); + auto pos = std::move(scylla_in_progress_rtc).position(); + mf = mutation_fragment_v2(*_schema, _permit, + range_tombstone_change(adjust_ckey(std::move(pos)), ts)); } else if (mf.is_end_of_partition()) { _previous_clustering_key.reset(); } @@ -193,7 +201,7 @@ public: : _db(db) { } - flat_mutation_reader operator()( + flat_mutation_reader_v2 operator()( schema_ptr s, reader_permit permit, const dht::partition_range& range, @@ -202,7 +210,7 @@ public: tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return flat_mutation_reader(std::make_unique( + return flat_mutation_reader_v2(std::make_unique( std::move(s), std::move(permit), _db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), diff --git a/mutation_fragment_v2.hh b/mutation_fragment_v2.hh index d601b6f3ca..4d5175c981 100644 --- a/mutation_fragment_v2.hh +++ b/mutation_fragment_v2.hh @@ -53,9 +53,12 @@ public: : _pos(pos) , _tomb(tomb) { } - const position_in_partition& position() const { + const position_in_partition& position() const & { return _pos; } + position_in_partition position() && { + return std::move(_pos); + } void set_position(position_in_partition pos) { _pos = std::move(pos); }