diff --git a/db/view/view.cc b/db/view/view.cc index 777de28cf0..79f44957de 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1345,31 +1345,31 @@ bool view_updates::generate_partition_tombstone_update( } future<> view_update_builder::close() noexcept { - return when_all_succeed(_updates.close(), _existings->close()).discard_result(); + return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result(); } future view_update_builder::advance_all() { - auto existings_f = _existings ? (*_existings)() : make_ready_future(); - return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable { - _update = std::move(std::get<0>(fragments).get()); - _existing = std::move(std::get<1>(fragments).get()); + auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future(); + return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable { + _update_fragment = std::move(std::get<0>(fragments).get()); + _existing_fragment = std::move(std::get<1>(fragments).get()); return stop_iteration::no; }); } future view_update_builder::advance_updates() { - return _updates().then([this] (auto&& update) mutable { - _update = std::move(update); + return _update_reader().then([this] (auto&& update) mutable { + _update_fragment = std::move(update); return stop_iteration::no; }); } future view_update_builder::advance_existings() { - if (!_existings) { + if (!_existing_reader) { return make_ready_future(stop_iteration::no); } - return (*_existings)().then([this] (auto&& existing) mutable { - _existing = std::move(existing); + return (*_existing_reader)().then([this] (auto&& existing) mutable { + _existing_fragment = std::move(existing); return stop_iteration::no; }); } @@ -1380,18 +1380,17 @@ future view_update_builder::stop() const { future>> view_update_builder::build_some() { (void)co_await advance_all(); - if (!_update && !_existing) { + if (!_update_fragment && !_existing_fragment) { // Tell the caller there is no more data to build. co_return std::nullopt; } bool do_advance_updates = false; bool do_advance_existings = false; bool is_partition_tombstone_applied_on_all_views = false; - if (_update && _update->is_partition_start()) { - _key = _update->as_partition_start().key().key(); - _update_partition_tombstone = _update->as_partition_start().partition_tombstone(); + if (_update_fragment && _update_fragment->is_partition_start()) { + _key = _update_fragment->as_partition_start().key().key(); + _update_partition_tombstone = _update_fragment->as_partition_start().partition_tombstone(); do_advance_updates = true; - if (_update_partition_tombstone) { // For views that have the same partition key as base, generate an update of partition tombstone to delete // the entire partition in one operation, instead of generating an update for each row. @@ -1402,8 +1401,8 @@ future>> view_up } } } - if (_existing && _existing->is_partition_start()) { - _existing_partition_tombstone = _existing->as_partition_start().partition_tombstone(); + if (_existing_fragment && _existing_fragment->is_partition_start()) { + _existing_partition_tombstone = _existing_fragment->as_partition_start().partition_tombstone(); do_advance_existings = true; } if (do_advance_updates) { @@ -1411,13 +1410,13 @@ future>> view_up } else if (do_advance_existings) { co_await advance_existings(); } - if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing && _existing->is_clustering_row()) { + if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing_fragment && _existing_fragment->is_clustering_row()) { co_await seastar::sleep(std::chrono::milliseconds(10)); } // If the partition tombstone update is applied to all the views and there are no other updates, we can skip going over // all the rows trying to generate row updates, because the partition tombstones already cover everything. - if (is_partition_tombstone_applied_on_all_views && _update->is_end_of_partition()) { + if (is_partition_tombstone_applied_on_all_views && _update_fragment->is_end_of_partition()) { _skip_row_updates = true; } @@ -1497,22 +1496,22 @@ future view_update_builder::on_results() { }); return rows_for_view_updates >= max_rows_for_view_updates; }; - if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) { - auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position()); + if (_update_fragment && !_update_fragment->is_end_of_partition() && _existing_fragment && !_existing_fragment->is_end_of_partition()) { + auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position()); if (cmp < 0) { // We have an update where there was nothing before - if (_update->is_range_tombstone_change()) { - _update_current_tombstone = _update->as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - auto update = std::move(*_update).as_clustering_row(); + if (_update_fragment->is_range_tombstone_change()) { + _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + auto update = std::move(*_update_fragment).as_clustering_row(); update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); auto existing = tombstone ? std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) : std::nullopt; generate_update(std::move(update), std::move(existing)); - } else if (_update->is_static_row()) { - auto update = std::move(*_update).as_static_row(); + } else if (_update_fragment->is_static_row()) { + auto update = std::move(*_update_fragment).as_static_row(); auto tombstone = _existing_partition_tombstone; auto existing = tombstone ? std::optional(std::in_place) @@ -1526,10 +1525,10 @@ future view_update_builder::on_results() { // existing, or because we've fetched the existing row due to some partition/range deletion in the updates). // Due to how the read command for existing rows is constructed, it is also possible that there is a static // row is included, even though we didn't modify it. - if (_existing->is_range_tombstone_change()) { - _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); - } else if (_existing->is_clustering_row()) { - auto existing = std::move(*_existing).as_clustering_row(); + if (_existing_fragment->is_range_tombstone_change()) { + _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); + } else if (_existing_fragment->is_clustering_row()) { + auto existing = std::move(*_existing_fragment).as_clustering_row(); existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); // The way we build the read command used for existing rows, we should always have a non-empty @@ -1539,8 +1538,8 @@ future view_update_builder::on_results() { auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); } - } else if (_existing->is_static_row()) { - auto existing = std::move(*_existing).as_static_row(); + } else if (_existing_fragment->is_static_row()) { + auto existing = std::move(*_existing_fragment).as_static_row(); auto tombstone = _update_partition_tombstone; // The static row might be unintentionally included when fetching existing clustering rows, // even if the static row was not updated. We can detect it. A static row can be affected either by: @@ -1558,65 +1557,65 @@ future view_update_builder::on_results() { return should_stop_updates() ? stop () : advance_existings(); } // We're updating a row that had pre-existing data - if (_update->is_range_tombstone_change()) { - SCYLLA_ASSERT(_existing->is_range_tombstone_change()); - _existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone(); - _update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - SCYLLA_ASSERT(_existing->is_clustering_row()); - _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { + if (_update_fragment->is_range_tombstone_change()) { + SCYLLA_ASSERT(_existing_fragment->is_range_tombstone_change()); + _existing_current_tombstone = std::move(*_existing_fragment).as_range_tombstone_change().tombstone(); + _update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + SCYLLA_ASSERT(_existing_fragment->is_clustering_row()); + _update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); }); - _existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { + _existing_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); }); - generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() }); - } else if (_update->is_static_row()) { - if (!_existing->is_static_row()) { + generate_update(std::move(*_update_fragment).as_clustering_row(), { std::move(*_existing_fragment).as_clustering_row() }); + } else if (_update_fragment->is_static_row()) { + if (!_existing_fragment->is_static_row()) { on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", - mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing))); + mutation_fragment_v2::printer(*_schema, *_update_fragment), mutation_fragment_v2::printer(*_schema, *_existing_fragment))); } - generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone); + generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone); } return should_stop_updates() ? stop() : advance_all(); } auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); - if (tombstone && _existing && !_existing->is_end_of_partition()) { - if (_existing->is_range_tombstone_change()) { - _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); - } else if (_existing->is_clustering_row()) { - auto existing = clustering_row(*_schema, _existing->as_clustering_row()); + if (tombstone && _existing_fragment && !_existing_fragment->is_end_of_partition()) { + if (_existing_fragment->is_range_tombstone_change()) { + _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); + } else if (_existing_fragment->is_clustering_row()) { + auto existing = clustering_row(*_schema, _existing_fragment->as_clustering_row()); existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); - } else if (_existing->is_static_row()) { - auto existing = static_row(*_schema, _existing->as_static_row()); + } else if (_existing_fragment->is_static_row()) { + auto existing = static_row(*_schema, _existing_fragment->as_static_row()); auto update = static_row(); generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); } return should_stop_updates() ? stop() : advance_existings(); } - if (_update && !_update->is_end_of_partition()) { - if (_update->is_range_tombstone_change()) { - _update_current_tombstone = _update->as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { + if (_update_fragment && !_update_fragment->is_end_of_partition()) { + if (_update_fragment->is_range_tombstone_change()) { + _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + _update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); }); auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); auto existing = existing_tombstone - ? std::optional(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) + ? std::optional(std::in_place, _update_fragment->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) : std::nullopt; - generate_update(std::move(*_update).as_clustering_row(), std::move(existing)); - } else if (_update->is_static_row()) { + generate_update(std::move(*_update_fragment).as_clustering_row(), std::move(existing)); + } else if (_update_fragment->is_static_row()) { auto existing_tombstone = _existing_partition_tombstone; auto existing = existing_tombstone ? std::optional(std::in_place) : std::nullopt; - generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); + generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); } return should_stop_updates() ? stop() : advance_updates(); } diff --git a/db/view/view.hh b/db/view/view.hh index 60e08fcf92..702b358e2d 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -203,14 +203,14 @@ class view_update_builder { const replica::table& _base; schema_ptr _schema; // The base schema std::vector _view_updates; - mutation_reader _updates; - mutation_reader_opt _existings; + mutation_reader _update_reader; + mutation_reader_opt _existing_reader; tombstone _update_partition_tombstone; tombstone _update_current_tombstone; tombstone _existing_partition_tombstone; tombstone _existing_current_tombstone; - mutation_fragment_v2_opt _update; - mutation_fragment_v2_opt _existing; + mutation_fragment_v2_opt _update_fragment; + mutation_fragment_v2_opt _existing_fragment; gc_clock::time_point _now; partition_key _key = partition_key::make_empty(); bool _skip_row_updates = false; @@ -225,8 +225,8 @@ public: , _base(base) , _schema(std::move(s)) , _view_updates(std::move(views_to_update)) - , _updates(std::move(updates)) - , _existings(std::move(existings)) + , _update_reader(std::move(updates)) + , _existing_reader(std::move(existings)) , _now(now) { } view_update_builder(view_update_builder&& other) noexcept = default;