mv: fix view_update_builder losing fragments across batch boundaries

When a mutation generates more view updates than max_rows_for_view_updates
(100), view_update_builder::build_some() splits the work into multiple
batches. There was a bug in how fragments were read between batches:

When should_stop_updates() returned true, the old code called stop()
which returned stop_iteration::yes without reading the next fragments.
On the next build_some() call, read_both_next_fragments() was called
at the start, which advanced BOTH readers - skipping any fragment that
was already read but not yet consumed. A row could be left unconsumed
if either:
- the 100th (last in the batch) update was a row insertion and we still
  had insertions/updates remaining
- the 100th (last in the batch) update was a row deletion and we still
  had deletions/updates remaining
For the most common case where work is split into batches, i.e. range
deletions, we couldn't hit this because a range delete generates only
view row deletions.
On tables with a single materialized view, we also couldn't hit this
for any batch with fewer than 50 statements (unless the batch also
contained range deletions), because one non-range-delete update can
generate up to 2 view updates.
However, for a range of scenarios outside these two, we could lose
view updates, resulting in persistent inconsistencies.

The fix:
- read_both_next_fragments(), read_next_update_fragment() and
  read_next_existing_fragment() now accept a stop_iteration parameter,
  so the next fragments are always read after consuming (even when
  stopping), but stop_iteration::yes is correctly propagated to break
  the loop.
- build_some() no longer re-reads fragments at the start. Instead, an
  initialize() method performs the initial read once at construction.
- because we now only advance the readers after consuming, we won't
  advance them past end_of_partition, so we extend the termination
  condition in build_some() to treat a fragment as exhausted when it is
  either disengaged or an end_of_partition. build_some() also reports
  that there is no more data when _skip_row_updates is set, which keeps
  that optimization working.

Fixes: scylladb/scylladb#29155

Closes scylladb/scylladb#29498
This commit is contained in:
Wojciech Mitros
2026-04-15 20:45:26 +02:00
committed by Piotr Dulikowski
parent c59985c38b
commit ae0d77257f
4 changed files with 60 additions and 30 deletions

View File

@@ -1346,29 +1346,29 @@ future<> view_update_builder::close() noexcept {
return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result();
}
future<stop_iteration> view_update_builder::read_both_next_fragments() {
future<stop_iteration> view_update_builder::read_both_next_fragments(stop_iteration si) {
auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future<mutation_fragment_v2_opt>();
return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
return when_all(_update_reader(), std::move(existings_f)).then([this, si] (auto&& fragments) mutable {
_update_fragment = std::move(std::get<0>(fragments).get());
_existing_fragment = std::move(std::get<1>(fragments).get());
return stop_iteration::no;
return si;
});
}
future<stop_iteration> view_update_builder::read_next_update_fragment() {
return _update_reader().then([this] (auto&& update) mutable {
future<stop_iteration> view_update_builder::read_next_update_fragment(stop_iteration si) {
return _update_reader().then([this, si] (auto&& update) mutable {
_update_fragment = std::move(update);
return stop_iteration::no;
return si;
});
}
future<stop_iteration> view_update_builder::read_next_existing_fragment() {
future<stop_iteration> view_update_builder::read_next_existing_fragment(stop_iteration si) {
if (!_existing_reader) {
return make_ready_future<stop_iteration>(stop_iteration::no);
return make_ready_future<stop_iteration>(si);
}
return (*_existing_reader)().then([this] (auto&& existing) mutable {
return (*_existing_reader)().then([this, si] (auto&& existing) mutable {
_existing_fragment = std::move(existing);
return stop_iteration::no;
return si;
});
}
@@ -1452,13 +1452,13 @@ void view_update_builder::consume_existing_fragment() {
}
}
future<stop_iteration> view_update_builder::stop() const {
return make_ready_future<stop_iteration>(stop_iteration::yes);
future<> view_update_builder::initialize() {
return read_both_next_fragments().discard_result();
}
future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_update_builder::build_some() {
(void)co_await read_both_next_fragments();
if (!_update_fragment && !_existing_fragment) {
if (((!_update_fragment || _update_fragment->is_end_of_partition()) && (!_existing_fragment || _existing_fragment->is_end_of_partition()))
|| _skip_row_updates) {
// Tell the caller there is no more data to build.
co_return std::nullopt;
}
@@ -1578,31 +1578,31 @@ future<stop_iteration> view_update_builder::generate_updates() {
auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position());
if (cmp < 0) {
consume_update_fragment();
return should_stop_updates() ? stop() : read_next_update_fragment();
return read_next_update_fragment(should_stop_updates() ? stop_iteration::yes : stop_iteration::no);
}
if (cmp > 0) {
consume_existing_fragment();
return should_stop_updates() ? stop () : read_next_existing_fragment();
return read_next_existing_fragment(should_stop_updates() ? stop_iteration::yes : stop_iteration::no);
}
// We're updating a row that had pre-existing data
consume_both_fragments();
return should_stop_updates() ? stop() : read_both_next_fragments();
return read_both_next_fragments(should_stop_updates() ? stop_iteration::yes : stop_iteration::no);
}
if (_existing_fragment && !_existing_fragment->is_end_of_partition()) {
consume_existing_fragment();
return should_stop_updates() ? stop() : read_next_existing_fragment();
return read_next_existing_fragment(should_stop_updates() ? stop_iteration::yes : stop_iteration::no);
}
if (_update_fragment && !_update_fragment->is_end_of_partition()) {
consume_update_fragment();
return should_stop_updates() ? stop() : read_next_update_fragment();
return read_next_update_fragment(should_stop_updates() ? stop_iteration::yes : stop_iteration::no);
}
return stop();
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
view_update_builder make_view_update_builder(
future<view_update_builder> make_view_update_builder(
data_dictionary::database db,
const replica::table& base_table,
const schema_ptr& base,
@@ -1613,7 +1613,9 @@ view_update_builder make_view_update_builder(
auto vs = views_to_update | std::views::transform([&] (view_ptr v) {
return view_updates(std::move(v), base);
}) | std::ranges::to<std::vector<view_updates>>();
return view_update_builder(std::move(db), base_table, base, std::move(vs), std::move(updates), std::move(existings), now);
auto builder = view_update_builder(std::move(db), base_table, base, std::move(vs), std::move(updates), std::move(existings), now);
co_await builder.initialize();
co_return builder;
}
future<query::clustering_row_ranges> calculate_affected_clustering_ranges(data_dictionary::database db,

View File

@@ -244,6 +244,7 @@ public:
// std::optional, which is disengaged when the iteration is done.
future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> build_some();
future<> initialize();
future<> close() noexcept;
private:
@@ -253,19 +254,17 @@ private:
// generate updates from the read fragments and read new fragments for the next iteration
future<stop_iteration> generate_updates();
future<stop_iteration> read_both_next_fragments();
future<stop_iteration> read_next_update_fragment();
future<stop_iteration> read_next_existing_fragment();
future<stop_iteration> read_both_next_fragments(stop_iteration si = stop_iteration::no);
future<stop_iteration> read_next_update_fragment(stop_iteration si = stop_iteration::no);
future<stop_iteration> read_next_existing_fragment(stop_iteration si = stop_iteration::no);
void consume_both_fragments();
void consume_update_fragment();
void consume_existing_fragment();
future<stop_iteration> stop() const;
};
// The readers provided for the view_update_builder should span the same single partition.
view_update_builder make_view_update_builder(
future<view_update_builder> make_view_update_builder(
data_dictionary::database db,
const replica::table& base_table,
const schema_ptr& base_schema,

View File

@@ -384,7 +384,7 @@ future<> view_update_generator::populate_views(const replica::table& table,
mutation_reader&& reader,
gc_clock::time_point now) {
auto schema = reader.schema();
view_update_builder builder = make_view_update_builder(
view_update_builder builder = co_await make_view_update_builder(
get_db().as_data_dictionary(),
table,
schema,
@@ -460,7 +460,7 @@ future<> view_update_generator::generate_and_propagate_view_updates(const replic
db::timeout_clock::time_point timeout) {
auto base_token = m.token();
auto m_schema = m.schema();
view_update_builder builder = make_view_update_builder(
view_update_builder builder = co_await make_view_update_builder(
get_db().as_data_dictionary(),
table,
base,

View File

@@ -1757,3 +1757,32 @@ def test_mv_select_key_columns(cql, test_keyspace, cassandra_bug):
# hurt to make sure.
cql.execute(f'insert into {table} (p, c, v1, v2, v3) values (1, 2, 3, 4, 5)')
assert [(3,1,2,4)] == list(cql.execute(f'select * from {mv} where v1=3'))
# When a mutation generates more view updates than max_rows_for_view_updates
# (100), view_update_builder splits the work into multiple batches. There was
# a bug where fragments read between batches were lost: when stopping a batch,
# the next fragments were not read, and on the next build_some() call,
# read_both_next_fragments() advanced both readers - skipping any fragment
# that was already buffered but not yet consumed. This caused an existing fragment
# to be lost, leading to a missed view update and ghost rows in the materialized view.
# Reproduces scylladb/scylladb#29155.
def test_view_update_builder_does_not_lose_fragments_across_batches(cql, test_keyspace):
with new_test_table(cql, test_keyspace, 'p int, c int, primary key (p, c)') as table:
with new_materialized_view(cql, table, '*', 'c, p', 'c is not null and p is not null') as mv:
# Insert a row whose deletion will be lost if the bug is present. The row needs a clustering key
# higher than the first 100 clustering keys in the batch, so that it is still unprocessed after the
# first batch (100) of view updates has been generated.
cql.execute(f'INSERT INTO {table} (p, c) VALUES (1, {200})')
# Build a single batch that generates more than 100 view updates (105 inserts into the same
# partition) and also deletes the pre-inserted row.
cmd = f'BEGIN BATCH '
for i in range(105):
cmd += f'INSERT INTO {table} (p, c) VALUES (1, {i}); '
cmd += f'DELETE FROM {table} WHERE p=1 AND c={200}; '
cmd += 'APPLY BATCH;'
cql.execute(cmd)
# Only the 105 rows inserted by the batch should remain, in both the base table and the view.
# A 106th row in the view would be a ghost row left behind by the lost deletion of c=200.
base_count = cql.execute(f'SELECT count(*) FROM {table}').one().count
view_count = cql.execute(f'SELECT count(*) FROM {mv}').one().count
assert base_count == 105
assert view_count == 105