From a796a58a1ffe56fdde6c8818087a4de091aa860f Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Wed, 22 Apr 2026 00:11:33 +0200 Subject: [PATCH 1/7] mv: document single-partition builder scope Add comments to view_update_builder and make_view_update_builder() documenting that one builder instance processes at most one base partition, and that the readers provided should span the same single partition. --- db/view/view.hh | 4 ++++ db/view/view_update_generator.cc | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/db/view/view.hh b/db/view/view.hh index fe797a4022..60e08fcf92 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -231,6 +231,9 @@ public: } view_update_builder(view_update_builder&& other) noexcept = default; + // One builder instance processes at most one base partition, but it may + // need multiple build_some() calls to emit all of that partition's view + // updates. // build_some() works on batches of 100 (max_rows_for_view_updates) // updated rows, but can_skip_view_updates() can decide that some of @@ -255,6 +258,7 @@ private: future stop() const; }; +// The readers provided for the view_update_builder should span the same single partition. view_update_builder make_view_update_builder( data_dictionary::database db, const replica::table& base_table, diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index f77e0cb785..0d31fdb7af 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -370,7 +370,7 @@ static size_t memory_usage_of(const utils::chunked_vector view_update_generator::populate_views(const replica::table& table, From 6edacdea7414f113a5770a827eb3877982fddfab Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Wed, 22 Apr 2026 00:11:50 +0200 Subject: [PATCH 2/7] mv: drop redundant std::move from partition key extraction The expression std::move(std::move(_update)->as_partition_start().key().key()) contains two ineffective std::move calls: 1. The inner std::move(_update) has no effect because there is no overload for optimized_optional::operator->() which takes "this" by rvalue reference. 2. The outer std::move is applied to a const partition_key& (decorated_key::key() returns const&), producing a const partition_key&& that still binds to the copy constructor, not the move constructor. Drop both std::move calls to avoid misleading the reader. --- db/view/view.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/view/view.cc b/db/view/view.cc index 860fc6485f..777de28cf0 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1388,7 +1388,7 @@ future>> view_up bool do_advance_existings = false; bool is_partition_tombstone_applied_on_all_views = false; if (_update && _update->is_partition_start()) { - _key = std::move(std::move(_update)->as_partition_start().key().key()); + _key = _update->as_partition_start().key().key(); _update_partition_tombstone = _update->as_partition_start().partition_tombstone(); do_advance_updates = true; From 490b3f5c6f5b4e97ca38ba4e399f2f088a974dcc Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Tue, 14 Apr 2026 00:29:30 +0200 Subject: [PATCH 3/7] mv: rename view_update_builder readers and cached fragments Rename the members of view_update_builder to reflect their roles more precisely: _updates -> _update_reader _existings -> _existing_reader _update -> _update_fragment _existing -> _existing_fragment This makes the code easier to follow by distinguishing the readers (which produce a stream of fragments) from the cached fragments (the most recently read mutation_fragment_v2 from each reader). --- db/view/view.cc | 123 ++++++++++++++++++++++++------------------------ db/view/view.hh | 12 ++--- 2 files changed, 67 insertions(+), 68 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 777de28cf0..79f44957de 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1345,31 +1345,31 @@ bool view_updates::generate_partition_tombstone_update( } future<> view_update_builder::close() noexcept { - return when_all_succeed(_updates.close(), _existings->close()).discard_result(); + return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result(); } future view_update_builder::advance_all() { - auto existings_f = _existings ? (*_existings)() : make_ready_future(); - return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable { - _update = std::move(std::get<0>(fragments).get()); - _existing = std::move(std::get<1>(fragments).get()); + auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future(); + return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable { + _update_fragment = std::move(std::get<0>(fragments).get()); + _existing_fragment = std::move(std::get<1>(fragments).get()); return stop_iteration::no; }); } future view_update_builder::advance_updates() { - return _updates().then([this] (auto&& update) mutable { - _update = std::move(update); + return _update_reader().then([this] (auto&& update) mutable { + _update_fragment = std::move(update); return stop_iteration::no; }); } future view_update_builder::advance_existings() { - if (!_existings) { + if (!_existing_reader) { return make_ready_future(stop_iteration::no); } - return (*_existings)().then([this] (auto&& existing) mutable { - _existing = std::move(existing); + return (*_existing_reader)().then([this] (auto&& existing) mutable { + _existing_fragment = std::move(existing); return stop_iteration::no; }); } @@ -1380,18 +1380,17 @@ future view_update_builder::stop() const { future>> view_update_builder::build_some() { (void)co_await advance_all(); - if (!_update && !_existing) { + if (!_update_fragment && !_existing_fragment) { // Tell the caller there is no more data to build. co_return std::nullopt; } bool do_advance_updates = false; bool do_advance_existings = false; bool is_partition_tombstone_applied_on_all_views = false; - if (_update && _update->is_partition_start()) { - _key = _update->as_partition_start().key().key(); - _update_partition_tombstone = _update->as_partition_start().partition_tombstone(); + if (_update_fragment && _update_fragment->is_partition_start()) { + _key = _update_fragment->as_partition_start().key().key(); + _update_partition_tombstone = _update_fragment->as_partition_start().partition_tombstone(); do_advance_updates = true; - if (_update_partition_tombstone) { // For views that have the same partition key as base, generate an update of partition tombstone to delete // the entire partition in one operation, instead of generating an update for each row. @@ -1402,8 +1401,8 @@ future>> view_up } } } - if (_existing && _existing->is_partition_start()) { - _existing_partition_tombstone = _existing->as_partition_start().partition_tombstone(); + if (_existing_fragment && _existing_fragment->is_partition_start()) { + _existing_partition_tombstone = _existing_fragment->as_partition_start().partition_tombstone(); do_advance_existings = true; } if (do_advance_updates) { @@ -1411,13 +1410,13 @@ future>> view_up } else if (do_advance_existings) { co_await advance_existings(); } - if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing && _existing->is_clustering_row()) { + if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing_fragment && _existing_fragment->is_clustering_row()) { co_await seastar::sleep(std::chrono::milliseconds(10)); } // If the partition tombstone update is applied to all the views and there are no other updates, we can skip going over // all the rows trying to generate row updates, because the partition tombstones already cover everything. - if (is_partition_tombstone_applied_on_all_views && _update->is_end_of_partition()) { + if (is_partition_tombstone_applied_on_all_views && _update_fragment->is_end_of_partition()) { _skip_row_updates = true; } @@ -1497,22 +1496,22 @@ future view_update_builder::on_results() { }); return rows_for_view_updates >= max_rows_for_view_updates; }; - if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) { - auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position()); + if (_update_fragment && !_update_fragment->is_end_of_partition() && _existing_fragment && !_existing_fragment->is_end_of_partition()) { + auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position()); if (cmp < 0) { // We have an update where there was nothing before - if (_update->is_range_tombstone_change()) { - _update_current_tombstone = _update->as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - auto update = std::move(*_update).as_clustering_row(); + if (_update_fragment->is_range_tombstone_change()) { + _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + auto update = std::move(*_update_fragment).as_clustering_row(); update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); auto existing = tombstone ? std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) : std::nullopt; generate_update(std::move(update), std::move(existing)); - } else if (_update->is_static_row()) { - auto update = std::move(*_update).as_static_row(); + } else if (_update_fragment->is_static_row()) { + auto update = std::move(*_update_fragment).as_static_row(); auto tombstone = _existing_partition_tombstone; auto existing = tombstone ? std::optional(std::in_place) @@ -1526,10 +1525,10 @@ future view_update_builder::on_results() { // existing, or because we've fetched the existing row due to some partition/range deletion in the updates). // Due to how the read command for existing rows is constructed, it is also possible that there is a static // row is included, even though we didn't modify it. - if (_existing->is_range_tombstone_change()) { - _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); - } else if (_existing->is_clustering_row()) { - auto existing = std::move(*_existing).as_clustering_row(); + if (_existing_fragment->is_range_tombstone_change()) { + _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); + } else if (_existing_fragment->is_clustering_row()) { + auto existing = std::move(*_existing_fragment).as_clustering_row(); existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); // The way we build the read command used for existing rows, we should always have a non-empty @@ -1539,8 +1538,8 @@ future view_update_builder::on_results() { auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); } - } else if (_existing->is_static_row()) { - auto existing = std::move(*_existing).as_static_row(); + } else if (_existing_fragment->is_static_row()) { + auto existing = std::move(*_existing_fragment).as_static_row(); auto tombstone = _update_partition_tombstone; // The static row might be unintentionally included when fetching existing clustering rows, // even if the static row was not updated. We can detect it. A static row can be affected either by: @@ -1558,65 +1557,65 @@ future view_update_builder::on_results() { return should_stop_updates() ? stop () : advance_existings(); } // We're updating a row that had pre-existing data - if (_update->is_range_tombstone_change()) { - SCYLLA_ASSERT(_existing->is_range_tombstone_change()); - _existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone(); - _update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - SCYLLA_ASSERT(_existing->is_clustering_row()); - _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { + if (_update_fragment->is_range_tombstone_change()) { + SCYLLA_ASSERT(_existing_fragment->is_range_tombstone_change()); + _existing_current_tombstone = std::move(*_existing_fragment).as_range_tombstone_change().tombstone(); + _update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + SCYLLA_ASSERT(_existing_fragment->is_clustering_row()); + _update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); }); - _existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { + _existing_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); }); - generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() }); - } else if (_update->is_static_row()) { - if (!_existing->is_static_row()) { + generate_update(std::move(*_update_fragment).as_clustering_row(), { std::move(*_existing_fragment).as_clustering_row() }); + } else if (_update_fragment->is_static_row()) { + if (!_existing_fragment->is_static_row()) { on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", - mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing))); + mutation_fragment_v2::printer(*_schema, *_update_fragment), mutation_fragment_v2::printer(*_schema, *_existing_fragment))); } - generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone); + generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone); } return should_stop_updates() ? stop() : advance_all(); } auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); - if (tombstone && _existing && !_existing->is_end_of_partition()) { - if (_existing->is_range_tombstone_change()) { - _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); - } else if (_existing->is_clustering_row()) { - auto existing = clustering_row(*_schema, _existing->as_clustering_row()); + if (tombstone && _existing_fragment && !_existing_fragment->is_end_of_partition()) { + if (_existing_fragment->is_range_tombstone_change()) { + _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); + } else if (_existing_fragment->is_clustering_row()) { + auto existing = clustering_row(*_schema, _existing_fragment->as_clustering_row()); existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); - } else if (_existing->is_static_row()) { - auto existing = static_row(*_schema, _existing->as_static_row()); + } else if (_existing_fragment->is_static_row()) { + auto existing = static_row(*_schema, _existing_fragment->as_static_row()); auto update = static_row(); generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); } return should_stop_updates() ? stop() : advance_existings(); } - if (_update && !_update->is_end_of_partition()) { - if (_update->is_range_tombstone_change()) { - _update_current_tombstone = _update->as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { + if (_update_fragment && !_update_fragment->is_end_of_partition()) { + if (_update_fragment->is_range_tombstone_change()) { + _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + _update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); }); auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); auto existing = existing_tombstone - ? std::optional(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) + ? std::optional(std::in_place, _update_fragment->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) : std::nullopt; - generate_update(std::move(*_update).as_clustering_row(), std::move(existing)); - } else if (_update->is_static_row()) { + generate_update(std::move(*_update_fragment).as_clustering_row(), std::move(existing)); + } else if (_update_fragment->is_static_row()) { auto existing_tombstone = _existing_partition_tombstone; auto existing = existing_tombstone ? std::optional(std::in_place) : std::nullopt; - generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); + generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); } return should_stop_updates() ? stop() : advance_updates(); } diff --git a/db/view/view.hh b/db/view/view.hh index 60e08fcf92..702b358e2d 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -203,14 +203,14 @@ class view_update_builder { const replica::table& _base; schema_ptr _schema; // The base schema std::vector _view_updates; - mutation_reader _updates; - mutation_reader_opt _existings; + mutation_reader _update_reader; + mutation_reader_opt _existing_reader; tombstone _update_partition_tombstone; tombstone _update_current_tombstone; tombstone _existing_partition_tombstone; tombstone _existing_current_tombstone; - mutation_fragment_v2_opt _update; - mutation_fragment_v2_opt _existing; + mutation_fragment_v2_opt _update_fragment; + mutation_fragment_v2_opt _existing_fragment; gc_clock::time_point _now; partition_key _key = partition_key::make_empty(); bool _skip_row_updates = false; @@ -225,8 +225,8 @@ public: , _base(base) , _schema(std::move(s)) , _view_updates(std::move(views_to_update)) - , _updates(std::move(updates)) - , _existings(std::move(existings)) + , _update_reader(std::move(updates)) + , _existing_reader(std::move(existings)) , _now(now) { } view_update_builder(view_update_builder&& other) noexcept = default; From 7727a3708531a6290950476374e5a3be899a75b0 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Tue, 14 Apr 2026 03:12:21 +0200 Subject: [PATCH 4/7] mv: rename methods in view_update_builder for clarity Rename advance_all(), advance_updates() and advance_existings() to read_both_next_fragments(), read_next_update_fragment() and read_next_existing_fragment(), respectively. The new names make it clear that these methods read the next mutation fragment from the corresponding reader into the cached fragment member. Also rename on_results() to generate_updates(), which better describes its role of generating view updates from the previously read fragments. --- db/view/view.cc | 26 +++++++++++++------------- db/view/view.hh | 10 ++++++---- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 79f44957de..8500c18f15 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1348,7 +1348,7 @@ future<> view_update_builder::close() noexcept { return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result(); } -future view_update_builder::advance_all() { +future view_update_builder::read_both_next_fragments() { auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future(); return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable { _update_fragment = std::move(std::get<0>(fragments).get()); @@ -1357,14 +1357,14 @@ future view_update_builder::advance_all() { }); } -future view_update_builder::advance_updates() { +future view_update_builder::read_next_update_fragment() { return _update_reader().then([this] (auto&& update) mutable { _update_fragment = std::move(update); return stop_iteration::no; }); } -future view_update_builder::advance_existings() { +future view_update_builder::read_next_existing_fragment() { if (!_existing_reader) { return make_ready_future(stop_iteration::no); } @@ -1379,7 +1379,7 @@ future view_update_builder::stop() const { } future>> view_update_builder::build_some() { - (void)co_await advance_all(); + (void)co_await read_both_next_fragments(); if (!_update_fragment && !_existing_fragment) { // Tell the caller there is no more data to build. co_return std::nullopt; @@ -1406,9 +1406,9 @@ future>> view_up do_advance_existings = true; } if (do_advance_updates) { - co_await (do_advance_existings ? advance_all() : advance_updates()); + co_await (do_advance_existings ? read_both_next_fragments() : read_next_update_fragment()); } else if (do_advance_existings) { - co_await advance_existings(); + co_await read_next_existing_fragment(); } if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing_fragment && _existing_fragment->is_clustering_row()) { co_await seastar::sleep(std::chrono::milliseconds(10)); @@ -1420,7 +1420,7 @@ future>> view_up _skip_row_updates = true; } - while (!_skip_row_updates && co_await on_results() == stop_iteration::no) {}; + while (!_skip_row_updates && co_await generate_updates() == stop_iteration::no) {}; utils::chunked_vector mutations; for (auto& update : _view_updates) { @@ -1488,7 +1488,7 @@ void view_update_builder::generate_update(static_row&& update, const tombstone& } } -future view_update_builder::on_results() { +future view_update_builder::generate_updates() { constexpr size_t max_rows_for_view_updates = 100; auto should_stop_updates = [this] () -> bool { size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) { @@ -1518,7 +1518,7 @@ future view_update_builder::on_results() { : std::nullopt; generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); } - return should_stop_updates() ? stop() : advance_updates(); + return should_stop_updates() ? stop() : read_next_update_fragment(); } if (cmp > 0) { // We have something existing but no update (which will happen either because it's a range tombstone marker in @@ -1554,7 +1554,7 @@ future view_update_builder::on_results() { generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); } } - return should_stop_updates() ? stop () : advance_existings(); + return should_stop_updates() ? stop () : read_next_existing_fragment(); } // We're updating a row that had pre-existing data if (_update_fragment->is_range_tombstone_change()) { @@ -1578,7 +1578,7 @@ future view_update_builder::on_results() { generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone); } - return should_stop_updates() ? stop() : advance_all(); + return should_stop_updates() ? stop() : read_both_next_fragments(); } auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); @@ -1595,7 +1595,7 @@ future view_update_builder::on_results() { auto update = static_row(); generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); } - return should_stop_updates() ? stop() : advance_existings(); + return should_stop_updates() ? stop() : read_next_existing_fragment(); } if (_update_fragment && !_update_fragment->is_end_of_partition()) { @@ -1617,7 +1617,7 @@ future view_update_builder::on_results() { : std::nullopt; generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); } - return should_stop_updates() ? stop() : advance_updates(); + return should_stop_updates() ? stop() : read_next_update_fragment(); } return stop(); diff --git a/db/view/view.hh b/db/view/view.hh index 702b358e2d..ff18bcdf0e 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -249,11 +249,13 @@ public: private: void generate_update(clustering_row&& update, std::optional&& existing); void generate_update(static_row&& update, const tombstone& update_tomb, std::optional&& existing, const tombstone& existing_tomb); - future on_results(); - future advance_all(); - future advance_updates(); - future advance_existings(); + // generate updates from the read fragments and read new fragments for the next iteration + future generate_updates(); + + future read_both_next_fragments(); + future read_next_update_fragment(); + future read_next_existing_fragment(); future stop() const; }; From 74902dceac1629d0829f392640164791ccb7fcc8 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Wed, 15 Apr 2026 17:59:09 +0200 Subject: [PATCH 5/7] mv: simplify clustering row handling in generate_updates() Two of the three clustering-row cases in generate_updates() used mutate_as_clustering_row() to apply a tombstone to the row in-place, then immediately moved the row out of the fragment. This triggered an unnecessary memory usage recalculation in the reader permit, since: 1. apply(tombstone) does not change external memory usage (tombstone is stored inline, not heap-allocated), so the recalculation will yield the same result. 2. The fragment is consumed on the very next line, so the tracking window is effectively zero. Simplify these two cases to match the first case (cmp < 0), which already uses the simpler pattern of moving the row out of the fragment first, then applying the tombstone on the extracted row. --- db/view/view.cc | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 8500c18f15..0a2298e79a 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1563,13 +1563,11 @@ future view_update_builder::generate_updates() { _update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone(); } else if (_update_fragment->is_clustering_row()) { SCYLLA_ASSERT(_existing_fragment->is_clustering_row()); - _update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - }); - _existing_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); - }); - generate_update(std::move(*_update_fragment).as_clustering_row(), { std::move(*_existing_fragment).as_clustering_row() }); + auto update = std::move(*_update_fragment).as_clustering_row(); + update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); + auto existing = std::move(*_existing_fragment).as_clustering_row(); + existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); + generate_update(std::move(update), { std::move(existing) }); } else if (_update_fragment->is_static_row()) { if (!_existing_fragment->is_static_row()) { on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", @@ -1602,14 +1600,13 @@ future view_update_builder::generate_updates() { if (_update_fragment->is_range_tombstone_change()) { _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); } else if (_update_fragment->is_clustering_row()) { - _update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - }); + auto update = std::move(*_update_fragment).as_clustering_row(); + update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); auto existing = existing_tombstone - ? std::optional(std::in_place, _update_fragment->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) + ? std::optional(std::in_place, update.key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) : std::nullopt; - generate_update(std::move(*_update_fragment).as_clustering_row(), std::move(existing)); + generate_update(std::move(update), std::move(existing)); } else if (_update_fragment->is_static_row()) { auto existing_tombstone = _existing_partition_tombstone; auto existing = existing_tombstone From 00be36e08fc1867dc87bab36cdd68474b81292f7 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Wed, 15 Apr 2026 18:50:18 +0200 Subject: [PATCH 6/7] mv: avoid unnecessary copies of existing rows in generate_updates() In the existing-only tail block of generate_updates(), the clustering row and static row were extracted from the fragment using a deep copy constructor (e.g. clustering_row(*_schema, fragment.as_clustering_row())) even though the fragment is not used afterwards. Replace with moves, matching the pattern used in all other cases. --- db/view/view.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 0a2298e79a..a17509a0a2 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1584,12 +1584,12 @@ future view_update_builder::generate_updates() { if (_existing_fragment->is_range_tombstone_change()) { _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); } else if (_existing_fragment->is_clustering_row()) { - auto existing = clustering_row(*_schema, _existing_fragment->as_clustering_row()); + auto existing = std::move(*_existing_fragment).as_clustering_row(); existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); } else if (_existing_fragment->is_static_row()) { - auto existing = static_row(*_schema, _existing_fragment->as_static_row()); + auto existing = std::move(*_existing_fragment).as_static_row(); auto update = static_row(); generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); } From 667a928e819cf1a6d95770dd610c17e84de03b2a Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Wed, 15 Apr 2026 19:44:38 +0200 Subject: [PATCH 7/7] mv: deduplicate code for consuming fragments in view_update_builder Deduplicate the fragment-consuming logic in view_update_builder::generate_updates() by extracting it into three private methods: consume_both_fragments(), consume_update_fragment(), and consume_existing_fragment(). The three inlined blocks for cmp < 0, cmp > 0, and cmp == 0 were identical to the trailing "update only" and "existing only" blocks. The only semantic change is in the trailing "existing only" path: the outer tombstone guard is replaced by per-branch tombstone checks inside consume_existing_fragment(), which is both sufficient and more precise for the static_row case (uses partition tombstone only, not range tombstone which is irrelevant for static rows). --- db/view/view.cc | 188 ++++++++++++++++++++++-------------------------- db/view/view.hh | 4 ++ 2 files changed, 90 insertions(+), 102 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index a17509a0a2..5ef56f0002 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1374,6 +1374,86 @@ future view_update_builder::read_next_existing_fragment() { }); } +void view_update_builder::consume_both_fragments() { + if (_update_fragment->is_range_tombstone_change()) { + SCYLLA_ASSERT(_existing_fragment->is_range_tombstone_change()); + _existing_current_tombstone = std::move(*_existing_fragment).as_range_tombstone_change().tombstone(); + _update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + SCYLLA_ASSERT(_existing_fragment->is_clustering_row()); + auto update = std::move(*_update_fragment).as_clustering_row(); + update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); + auto existing = std::move(*_existing_fragment).as_clustering_row(); + existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); + generate_update(std::move(update), { std::move(existing) }); + } else if (_update_fragment->is_static_row()) { + if (!_existing_fragment->is_static_row()) { + on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", + mutation_fragment_v2::printer(*_schema, *_update_fragment), mutation_fragment_v2::printer(*_schema, *_existing_fragment))); + } + generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone); + + } +} + +void view_update_builder::consume_update_fragment() { + // We have an update where there was nothing before + if (_update_fragment->is_range_tombstone_change()) { + _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + auto update = std::move(*_update_fragment).as_clustering_row(); + update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); + auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); + auto existing = tombstone + ? std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) + : std::nullopt; + generate_update(std::move(update), std::move(existing)); + } else if (_update_fragment->is_static_row()) { + auto update = std::move(*_update_fragment).as_static_row(); + auto tombstone = _existing_partition_tombstone; + auto existing = tombstone + ? std::optional(std::in_place) + : std::nullopt; + generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); + } +} + +void view_update_builder::consume_existing_fragment() { + // We have something existing but no update (which will happen either because it's a range tombstone marker in + // existing, or because we've fetched the existing row due to some partition/range deletion in the updates). + // Due to how the read command for existing rows is constructed, it is also possible that there is a static + // row is included, even though we didn't modify it. + if (_existing_fragment->is_range_tombstone_change()) { + _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); + } else if (_existing_fragment->is_clustering_row()) { + auto existing = std::move(*_existing_fragment).as_clustering_row(); + existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); + auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); + // The way we build the read command used for existing rows, we should always have a non-empty + // tombstone, since we wouldn't have read the existing row otherwise. We don't SCYLLA_ASSERT that in case the + // read method ever changes. + if (tombstone) { + auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); + generate_update(std::move(update), { std::move(existing) }); + } + } else if (_existing_fragment->is_static_row()) { + auto existing = std::move(*_existing_fragment).as_static_row(); + auto tombstone = _update_partition_tombstone; + // The static row might be unintentionally included when fetching existing clustering rows, + // even if the static row was not updated. We can detect it. A static row can be affected either by: + // + // 1. A static row in the update mutation + // 2. A partition tombstone in the update mutation + // + // If neither of those is present, this means that the static row is included accidentally. + // If we are here, this means that (1) is not present. The `if` that follows checks for (2). + if (tombstone) { + auto update = static_row(); + generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); + } + } +} + future view_update_builder::stop() const { return make_ready_future(stop_iteration::yes); } @@ -1499,121 +1579,25 @@ future view_update_builder::generate_updates() { if (_update_fragment && !_update_fragment->is_end_of_partition() && _existing_fragment && !_existing_fragment->is_end_of_partition()) { auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position()); if (cmp < 0) { - // We have an update where there was nothing before - if (_update_fragment->is_range_tombstone_change()) { - _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); - } else if (_update_fragment->is_clustering_row()) { - auto update = std::move(*_update_fragment).as_clustering_row(); - update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); - auto existing = tombstone - ? std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) - : std::nullopt; - generate_update(std::move(update), std::move(existing)); - } else if (_update_fragment->is_static_row()) { - auto update = std::move(*_update_fragment).as_static_row(); - auto tombstone = _existing_partition_tombstone; - auto existing = tombstone - ? std::optional(std::in_place) - : std::nullopt; - generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); - } + consume_update_fragment(); return should_stop_updates() ? stop() : read_next_update_fragment(); } if (cmp > 0) { - // We have something existing but no update (which will happen either because it's a range tombstone marker in - // existing, or because we've fetched the existing row due to some partition/range deletion in the updates). - // Due to how the read command for existing rows is constructed, it is also possible that there is a static - // row is included, even though we didn't modify it. - if (_existing_fragment->is_range_tombstone_change()) { - _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); - } else if (_existing_fragment->is_clustering_row()) { - auto existing = std::move(*_existing_fragment).as_clustering_row(); - existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); - auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); - // The way we build the read command used for existing rows, we should always have a non-empty - // tombstone, since we wouldn't have read the existing row otherwise. We don't SCYLLA_ASSERT that in case the - // read method ever changes. - if (tombstone) { - auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); - generate_update(std::move(update), { std::move(existing) }); - } - } else if (_existing_fragment->is_static_row()) { - auto existing = std::move(*_existing_fragment).as_static_row(); - auto tombstone = _update_partition_tombstone; - // The static row might be unintentionally included when fetching existing clustering rows, - // even if the static row was not updated. We can detect it. A static row can be affected either by: - // - // 1. A static row in the update mutation - // 2. A partition tombstone in the update mutation - // - // If neither of those is present, this means that the static row is included accidentally. - // If we are here, this means that (1) is not present. The `if` that follows checks for (2). - if (tombstone) { - auto update = static_row(); - generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); - } - } + consume_existing_fragment(); return should_stop_updates() ? stop () : read_next_existing_fragment(); } // We're updating a row that had pre-existing data - if (_update_fragment->is_range_tombstone_change()) { - SCYLLA_ASSERT(_existing_fragment->is_range_tombstone_change()); - _existing_current_tombstone = std::move(*_existing_fragment).as_range_tombstone_change().tombstone(); - _update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone(); - } else if (_update_fragment->is_clustering_row()) { - SCYLLA_ASSERT(_existing_fragment->is_clustering_row()); - auto update = std::move(*_update_fragment).as_clustering_row(); - update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - auto existing = std::move(*_existing_fragment).as_clustering_row(); - existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); - generate_update(std::move(update), { std::move(existing) }); - } else if (_update_fragment->is_static_row()) { - if (!_existing_fragment->is_static_row()) { - on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", - mutation_fragment_v2::printer(*_schema, *_update_fragment), mutation_fragment_v2::printer(*_schema, *_existing_fragment))); - } - generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone); - - } + consume_both_fragments(); return should_stop_updates() ? stop() : read_both_next_fragments(); } - auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); - if (tombstone && _existing_fragment && !_existing_fragment->is_end_of_partition()) { - if (_existing_fragment->is_range_tombstone_change()) { - _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); - } else if (_existing_fragment->is_clustering_row()) { - auto existing = std::move(*_existing_fragment).as_clustering_row(); - existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); - auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); - generate_update(std::move(update), { std::move(existing) }); - } else if (_existing_fragment->is_static_row()) { - auto existing = std::move(*_existing_fragment).as_static_row(); - auto update = static_row(); - generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); - } + if (_existing_fragment && !_existing_fragment->is_end_of_partition()) { + consume_existing_fragment(); return should_stop_updates() ? stop() : read_next_existing_fragment(); } if (_update_fragment && !_update_fragment->is_end_of_partition()) { - if (_update_fragment->is_range_tombstone_change()) { - _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); - } else if (_update_fragment->is_clustering_row()) { - auto update = std::move(*_update_fragment).as_clustering_row(); - update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); - auto existing = existing_tombstone - ? std::optional(std::in_place, update.key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) - : std::nullopt; - generate_update(std::move(update), std::move(existing)); - } else if (_update_fragment->is_static_row()) { - auto existing_tombstone = _existing_partition_tombstone; - auto existing = existing_tombstone - ? std::optional(std::in_place) - : std::nullopt; - generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); - } + consume_update_fragment(); return should_stop_updates() ? stop() : read_next_update_fragment(); } diff --git a/db/view/view.hh b/db/view/view.hh index ff18bcdf0e..5c32f4bd15 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -257,6 +257,10 @@ private: future read_next_update_fragment(); future read_next_existing_fragment(); + void consume_both_fragments(); + void consume_update_fragment(); + void consume_existing_fragment(); + future stop() const; };