mv: rename view_update_builder readers and cached fragments

Rename the members of view_update_builder to reflect their roles
more precisely:

  _updates   -> _update_reader
  _existings -> _existing_reader
  _update    -> _update_fragment
  _existing  -> _existing_fragment

This makes the code easier to follow by distinguishing the readers
(which produce a stream of fragments) from the cached fragments
(the most recently read mutation_fragment_v2 from each reader).
This commit is contained in:
Wojciech Mitros
2026-04-14 00:29:30 +02:00
parent 6edacdea74
commit 490b3f5c6f
2 changed files with 67 additions and 68 deletions

View File

@@ -1345,31 +1345,31 @@ bool view_updates::generate_partition_tombstone_update(
}
future<> view_update_builder::close() noexcept {
return when_all_succeed(_updates.close(), _existings->close()).discard_result();
return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result();
}
future<stop_iteration> view_update_builder::advance_all() {
auto existings_f = _existings ? (*_existings)() : make_ready_future<mutation_fragment_v2_opt>();
return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
_update = std::move(std::get<0>(fragments).get());
_existing = std::move(std::get<1>(fragments).get());
auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future<mutation_fragment_v2_opt>();
return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
_update_fragment = std::move(std::get<0>(fragments).get());
_existing_fragment = std::move(std::get<1>(fragments).get());
return stop_iteration::no;
});
}
future<stop_iteration> view_update_builder::advance_updates() {
return _updates().then([this] (auto&& update) mutable {
_update = std::move(update);
return _update_reader().then([this] (auto&& update) mutable {
_update_fragment = std::move(update);
return stop_iteration::no;
});
}
future<stop_iteration> view_update_builder::advance_existings() {
if (!_existings) {
if (!_existing_reader) {
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return (*_existings)().then([this] (auto&& existing) mutable {
_existing = std::move(existing);
return (*_existing_reader)().then([this] (auto&& existing) mutable {
_existing_fragment = std::move(existing);
return stop_iteration::no;
});
}
@@ -1380,18 +1380,17 @@ future<stop_iteration> view_update_builder::stop() const {
future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_update_builder::build_some() {
(void)co_await advance_all();
if (!_update && !_existing) {
if (!_update_fragment && !_existing_fragment) {
// Tell the caller there is no more data to build.
co_return std::nullopt;
}
bool do_advance_updates = false;
bool do_advance_existings = false;
bool is_partition_tombstone_applied_on_all_views = false;
if (_update && _update->is_partition_start()) {
_key = _update->as_partition_start().key().key();
_update_partition_tombstone = _update->as_partition_start().partition_tombstone();
if (_update_fragment && _update_fragment->is_partition_start()) {
_key = _update_fragment->as_partition_start().key().key();
_update_partition_tombstone = _update_fragment->as_partition_start().partition_tombstone();
do_advance_updates = true;
if (_update_partition_tombstone) {
// For views that have the same partition key as base, generate an update of partition tombstone to delete
// the entire partition in one operation, instead of generating an update for each row.
@@ -1402,8 +1401,8 @@ future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_up
}
}
}
if (_existing && _existing->is_partition_start()) {
_existing_partition_tombstone = _existing->as_partition_start().partition_tombstone();
if (_existing_fragment && _existing_fragment->is_partition_start()) {
_existing_partition_tombstone = _existing_fragment->as_partition_start().partition_tombstone();
do_advance_existings = true;
}
if (do_advance_updates) {
@@ -1411,13 +1410,13 @@ future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_up
} else if (do_advance_existings) {
co_await advance_existings();
}
if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing && _existing->is_clustering_row()) {
if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing_fragment && _existing_fragment->is_clustering_row()) {
co_await seastar::sleep(std::chrono::milliseconds(10));
}
// If the partition tombstone update is applied to all the views and there are no other updates, we can skip going over
// all the rows trying to generate row updates, because the partition tombstones already cover everything.
if (is_partition_tombstone_applied_on_all_views && _update->is_end_of_partition()) {
if (is_partition_tombstone_applied_on_all_views && _update_fragment->is_end_of_partition()) {
_skip_row_updates = true;
}
@@ -1497,22 +1496,22 @@ future<stop_iteration> view_update_builder::on_results() {
});
return rows_for_view_updates >= max_rows_for_view_updates;
};
if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) {
auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position());
if (_update_fragment && !_update_fragment->is_end_of_partition() && _existing_fragment && !_existing_fragment->is_end_of_partition()) {
auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position());
if (cmp < 0) {
// We have an update where there was nothing before
if (_update->is_range_tombstone_change()) {
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
} else if (_update->is_clustering_row()) {
auto update = std::move(*_update).as_clustering_row();
if (_update_fragment->is_range_tombstone_change()) {
_update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone();
} else if (_update_fragment->is_clustering_row()) {
auto update = std::move(*_update_fragment).as_clustering_row();
update.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
auto existing = tombstone
? std::optional<clustering_row>(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
: std::nullopt;
generate_update(std::move(update), std::move(existing));
} else if (_update->is_static_row()) {
auto update = std::move(*_update).as_static_row();
} else if (_update_fragment->is_static_row()) {
auto update = std::move(*_update_fragment).as_static_row();
auto tombstone = _existing_partition_tombstone;
auto existing = tombstone
? std::optional<static_row>(std::in_place)
@@ -1526,10 +1525,10 @@ future<stop_iteration> view_update_builder::on_results() {
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates).
// Due to how the read command for existing rows is constructed, it is also possible that there is a static
// row is included, even though we didn't modify it.
if (_existing->is_range_tombstone_change()) {
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
} else if (_existing->is_clustering_row()) {
auto existing = std::move(*_existing).as_clustering_row();
if (_existing_fragment->is_range_tombstone_change()) {
_existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone();
} else if (_existing_fragment->is_clustering_row()) {
auto existing = std::move(*_existing_fragment).as_clustering_row();
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
// The way we build the read command used for existing rows, we should always have a non-empty
@@ -1539,8 +1538,8 @@ future<stop_iteration> view_update_builder::on_results() {
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
}
} else if (_existing->is_static_row()) {
auto existing = std::move(*_existing).as_static_row();
} else if (_existing_fragment->is_static_row()) {
auto existing = std::move(*_existing_fragment).as_static_row();
auto tombstone = _update_partition_tombstone;
// The static row might be unintentionally included when fetching existing clustering rows,
// even if the static row was not updated. We can detect it. A static row can be affected either by:
@@ -1558,65 +1557,65 @@ future<stop_iteration> view_update_builder::on_results() {
return should_stop_updates() ? stop () : advance_existings();
}
// We're updating a row that had pre-existing data
if (_update->is_range_tombstone_change()) {
SCYLLA_ASSERT(_existing->is_range_tombstone_change());
_existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone();
_update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone();
} else if (_update->is_clustering_row()) {
SCYLLA_ASSERT(_existing->is_clustering_row());
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
if (_update_fragment->is_range_tombstone_change()) {
SCYLLA_ASSERT(_existing_fragment->is_range_tombstone_change());
_existing_current_tombstone = std::move(*_existing_fragment).as_range_tombstone_change().tombstone();
_update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone();
} else if (_update_fragment->is_clustering_row()) {
SCYLLA_ASSERT(_existing_fragment->is_clustering_row());
_update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
});
_existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
_existing_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
});
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
} else if (_update->is_static_row()) {
if (!_existing->is_static_row()) {
generate_update(std::move(*_update_fragment).as_clustering_row(), { std::move(*_existing_fragment).as_clustering_row() });
} else if (_update_fragment->is_static_row()) {
if (!_existing_fragment->is_static_row()) {
on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}",
mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing)));
mutation_fragment_v2::printer(*_schema, *_update_fragment), mutation_fragment_v2::printer(*_schema, *_existing_fragment)));
}
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone);
generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone);
}
return should_stop_updates() ? stop() : advance_all();
}
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
if (tombstone && _existing && !_existing->is_end_of_partition()) {
if (_existing->is_range_tombstone_change()) {
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
} else if (_existing->is_clustering_row()) {
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
if (tombstone && _existing_fragment && !_existing_fragment->is_end_of_partition()) {
if (_existing_fragment->is_range_tombstone_change()) {
_existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone();
} else if (_existing_fragment->is_clustering_row()) {
auto existing = clustering_row(*_schema, _existing_fragment->as_clustering_row());
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
} else if (_existing->is_static_row()) {
auto existing = static_row(*_schema, _existing->as_static_row());
} else if (_existing_fragment->is_static_row()) {
auto existing = static_row(*_schema, _existing_fragment->as_static_row());
auto update = static_row();
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
}
return should_stop_updates() ? stop() : advance_existings();
}
if (_update && !_update->is_end_of_partition()) {
if (_update->is_range_tombstone_change()) {
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
} else if (_update->is_clustering_row()) {
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
if (_update_fragment && !_update_fragment->is_end_of_partition()) {
if (_update_fragment->is_range_tombstone_change()) {
_update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone();
} else if (_update_fragment->is_clustering_row()) {
_update_fragment->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
});
auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
auto existing = existing_tombstone
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
? std::optional<clustering_row>(std::in_place, _update_fragment->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
: std::nullopt;
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
} else if (_update->is_static_row()) {
generate_update(std::move(*_update_fragment).as_clustering_row(), std::move(existing));
} else if (_update_fragment->is_static_row()) {
auto existing_tombstone = _existing_partition_tombstone;
auto existing = existing_tombstone
? std::optional<static_row>(std::in_place)
: std::nullopt;
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
}
return should_stop_updates() ? stop() : advance_updates();
}

View File

@@ -203,14 +203,14 @@ class view_update_builder {
const replica::table& _base;
schema_ptr _schema; // The base schema
std::vector<view_updates> _view_updates;
mutation_reader _updates;
mutation_reader_opt _existings;
mutation_reader _update_reader;
mutation_reader_opt _existing_reader;
tombstone _update_partition_tombstone;
tombstone _update_current_tombstone;
tombstone _existing_partition_tombstone;
tombstone _existing_current_tombstone;
mutation_fragment_v2_opt _update;
mutation_fragment_v2_opt _existing;
mutation_fragment_v2_opt _update_fragment;
mutation_fragment_v2_opt _existing_fragment;
gc_clock::time_point _now;
partition_key _key = partition_key::make_empty();
bool _skip_row_updates = false;
@@ -225,8 +225,8 @@ public:
, _base(base)
, _schema(std::move(s))
, _view_updates(std::move(views_to_update))
, _updates(std::move(updates))
, _existings(std::move(existings))
, _update_reader(std::move(updates))
, _existing_reader(std::move(existings))
, _now(now) {
}
view_update_builder(view_update_builder&& other) noexcept = default;