diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh index eff56dc93c..b6edb454b7 100644 --- a/db/view/build_progress_virtual_reader.hh +++ b/db/view/build_progress_virtual_reader.hh @@ -23,7 +23,7 @@ #include "db/system_keyspace.hh" #include "db/timeout_clock.hh" #include "dht/i_partitioner.hh" -#include "flat_mutation_reader.hh" +#include "flat_mutation_reader_v2.hh" #include "mutation_fragment.hh" #include "mutation_reader.hh" #include "query-request.hh" @@ -52,14 +52,14 @@ namespace db::view { class build_progress_virtual_reader { replica::database& _db; - struct build_progress_reader : flat_mutation_reader::impl { + struct build_progress_reader : flat_mutation_reader_v2::impl { column_id _scylla_next_token_col; column_id _scylla_generation_number_col; column_id _legacy_last_token_col; column_id _legacy_generation_number_col; const query::partition_slice& _legacy_slice; query::partition_slice _slice; - flat_mutation_reader _underlying; + flat_mutation_reader_v2 _underlying; std::optional _previous_clustering_key; build_progress_reader( @@ -72,14 +72,14 @@ class build_progress_virtual_reader { tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) - : flat_mutation_reader::impl(std::move(legacy_schema), permit) + : flat_mutation_reader_v2::impl(std::move(legacy_schema), permit) , _scylla_next_token_col(scylla_views_build_progress.schema()->get_column_definition("next_token")->id) , _scylla_generation_number_col(scylla_views_build_progress.schema()->get_column_definition("generation_number")->id) , _legacy_last_token_col(_schema->get_column_definition("last_token")->id) , _legacy_generation_number_col(_schema->get_column_definition("generation_number")->id) , _legacy_slice(slice) , _slice(adjust_partition_slice()) - , _underlying(scylla_views_build_progress.make_reader( + , _underlying(scylla_views_build_progress.make_reader_v2( scylla_views_build_progress.schema(), std::move(permit), range, @@ -120,6 +120,16 @@ class build_progress_virtual_reader { return clustering_key_prefix::from_exploded(r); } + position_in_partition adjust_ckey(position_in_partition&& underlying_pip) { + auto underlying_ck_opt = underlying_pip.get_clustering_key_prefix(); + if (!underlying_ck_opt || !underlying_ck_opt->is_full(underlying_schema())) { + return std::move(underlying_pip); + } + return position_in_partition(underlying_pip.region(), + underlying_pip.get_bound_weight(), + adjust_ckey(*underlying_ck_opt)); + } + virtual future<> fill_buffer() override { return _underlying.fill_buffer().then([this] { _end_of_stream = _underlying.is_end_of_stream(); @@ -141,19 +151,17 @@ class build_progress_virtual_reader { continue; } _previous_clustering_key = ck; - mf = mutation_fragment(*_schema, _permit, clustering_row( + mf = mutation_fragment_v2(*_schema, _permit, clustering_row( std::move(ck), scylla_in_progress_row.tomb(), std::move(scylla_in_progress_row.marker()), std::move(legacy_in_progress_row))); - } else if (mf.is_range_tombstone()) { - auto scylla_in_progress_rt = std::move(mf).as_range_tombstone(); - mf = mutation_fragment(*_schema, _permit, range_tombstone( - adjust_ckey(scylla_in_progress_rt.start), - scylla_in_progress_rt.start_kind, - adjust_ckey(scylla_in_progress_rt.end), - scylla_in_progress_rt.end_kind, - scylla_in_progress_rt.tomb)); + } else if (mf.is_range_tombstone_change()) { + auto scylla_in_progress_rtc = std::move(mf).as_range_tombstone_change(); + auto ts = scylla_in_progress_rtc.tombstone(); + auto pos = std::move(scylla_in_progress_rtc).position(); + mf = mutation_fragment_v2(*_schema, _permit, + range_tombstone_change(adjust_ckey(std::move(pos)), ts)); } else if (mf.is_end_of_partition()) { _previous_clustering_key.reset(); } @@ -193,7 +201,7 @@ public: : _db(db) { } - flat_mutation_reader operator()( + flat_mutation_reader_v2 operator()( schema_ptr s, reader_permit permit, const dht::partition_range& range, @@ -202,7 +210,7 @@ public: tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return flat_mutation_reader(std::make_unique( + return flat_mutation_reader_v2(std::make_unique( std::move(s), std::move(permit), _db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), diff --git a/mutation_fragment_v2.hh b/mutation_fragment_v2.hh index d601b6f3ca..4d5175c981 100644 --- a/mutation_fragment_v2.hh +++ b/mutation_fragment_v2.hh @@ -53,9 +53,12 @@ public: : _pos(pos) , _tomb(tomb) { } - const position_in_partition& position() const { + const position_in_partition& position() const & { return _pos; } + position_in_partition position() && { + return std::move(_pos); + } void set_position(position_in_partition pos) { _pos = std::move(pos); }