diff --git a/db/view/view.cc b/db/view/view.cc index 73ca6b6aa7..72fcb75ee9 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1346,29 +1346,29 @@ future<> view_update_builder::close() noexcept { return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result(); } -future view_update_builder::read_both_next_fragments() { +future view_update_builder::read_both_next_fragments(stop_iteration si) { auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future(); - return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable { + return when_all(_update_reader(), std::move(existings_f)).then([this, si] (auto&& fragments) mutable { _update_fragment = std::move(std::get<0>(fragments).get()); _existing_fragment = std::move(std::get<1>(fragments).get()); - return stop_iteration::no; + return si; }); } -future view_update_builder::read_next_update_fragment() { - return _update_reader().then([this] (auto&& update) mutable { +future view_update_builder::read_next_update_fragment(stop_iteration si) { + return _update_reader().then([this, si] (auto&& update) mutable { _update_fragment = std::move(update); - return stop_iteration::no; + return si; }); } -future view_update_builder::read_next_existing_fragment() { +future view_update_builder::read_next_existing_fragment(stop_iteration si) { if (!_existing_reader) { - return make_ready_future(stop_iteration::no); + return make_ready_future(si); } - return (*_existing_reader)().then([this] (auto&& existing) mutable { + return (*_existing_reader)().then([this, si] (auto&& existing) mutable { _existing_fragment = std::move(existing); - return stop_iteration::no; + return si; }); } @@ -1452,13 +1452,13 @@ void view_update_builder::consume_existing_fragment() { } } -future view_update_builder::stop() const { - return make_ready_future(stop_iteration::yes); +future<> view_update_builder::initialize() { + return read_both_next_fragments().discard_result(); 
} future>> view_update_builder::build_some() { - (void)co_await read_both_next_fragments(); - if (!_update_fragment && !_existing_fragment) { + if (((!_update_fragment || _update_fragment->is_end_of_partition()) && (!_existing_fragment || _existing_fragment->is_end_of_partition())) + || _skip_row_updates) { // Tell the caller there is no more data to build. co_return std::nullopt; } @@ -1578,31 +1578,31 @@ future view_update_builder::generate_updates() { auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position()); if (cmp < 0) { consume_update_fragment(); - return should_stop_updates() ? stop() : read_next_update_fragment(); + return read_next_update_fragment(should_stop_updates() ? stop_iteration::yes : stop_iteration::no); } if (cmp > 0) { consume_existing_fragment(); - return should_stop_updates() ? stop () : read_next_existing_fragment(); + return read_next_existing_fragment(should_stop_updates() ? stop_iteration::yes : stop_iteration::no); } // We're updating a row that had pre-existing data consume_both_fragments(); - return should_stop_updates() ? stop() : read_both_next_fragments(); + return read_both_next_fragments(should_stop_updates() ? stop_iteration::yes : stop_iteration::no); } if (_existing_fragment && !_existing_fragment->is_end_of_partition()) { consume_existing_fragment(); - return should_stop_updates() ? stop() : read_next_existing_fragment(); + return read_next_existing_fragment(should_stop_updates() ? stop_iteration::yes : stop_iteration::no); } if (_update_fragment && !_update_fragment->is_end_of_partition()) { consume_update_fragment(); - return should_stop_updates() ? stop() : read_next_update_fragment(); + return read_next_update_fragment(should_stop_updates() ? 
stop_iteration::yes : stop_iteration::no); } - return stop(); + return make_ready_future(stop_iteration::yes); } -view_update_builder make_view_update_builder( +future make_view_update_builder( data_dictionary::database db, const replica::table& base_table, const schema_ptr& base, @@ -1613,7 +1613,9 @@ view_update_builder make_view_update_builder( auto vs = views_to_update | std::views::transform([&] (view_ptr v) { return view_updates(std::move(v), base); }) | std::ranges::to>(); - return view_update_builder(std::move(db), base_table, base, std::move(vs), std::move(updates), std::move(existings), now); + auto builder = view_update_builder(std::move(db), base_table, base, std::move(vs), std::move(updates), std::move(existings), now); + co_await builder.initialize(); + co_return builder; } future calculate_affected_clustering_ranges(data_dictionary::database db, diff --git a/db/view/view.hh b/db/view/view.hh index 5c32f4bd15..1f69af7b89 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -244,6 +244,7 @@ public: // std::optional, which is disengaged when the iteration is done. future>> build_some(); + future<> initialize(); future<> close() noexcept; private: @@ -253,19 +254,17 @@ private: // generate updates from the read fragments and read new fragments for the next iteration future generate_updates(); - future read_both_next_fragments(); - future read_next_update_fragment(); - future read_next_existing_fragment(); + future read_both_next_fragments(stop_iteration si = stop_iteration::no); + future read_next_update_fragment(stop_iteration si = stop_iteration::no); + future read_next_existing_fragment(stop_iteration si = stop_iteration::no); void consume_both_fragments(); void consume_update_fragment(); void consume_existing_fragment(); - - future stop() const; }; // The readers provided for the view_update_builder should span the same single partition. 
-view_update_builder make_view_update_builder( +future make_view_update_builder( data_dictionary::database db, const replica::table& base_table, const schema_ptr& base_schema, diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 266ea7e4b8..7dcbebca71 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -384,7 +384,7 @@ future<> view_update_generator::populate_views(const replica::table& table, mutation_reader&& reader, gc_clock::time_point now) { auto schema = reader.schema(); - view_update_builder builder = make_view_update_builder( + view_update_builder builder = co_await make_view_update_builder( get_db().as_data_dictionary(), table, schema, @@ -460,7 +460,7 @@ future<> view_update_generator::generate_and_propagate_view_updates(const replic db::timeout_clock::time_point timeout) { auto base_token = m.token(); auto m_schema = m.schema(); - view_update_builder builder = make_view_update_builder( + view_update_builder builder = co_await make_view_update_builder( get_db().as_data_dictionary(), table, base, diff --git a/test/cqlpy/test_materialized_view.py b/test/cqlpy/test_materialized_view.py index ec9860784b..144fd20a3e 100644 --- a/test/cqlpy/test_materialized_view.py +++ b/test/cqlpy/test_materialized_view.py @@ -1757,3 +1757,32 @@ def test_mv_select_key_columns(cql, test_keyspace, cassandra_bug): # hurt to make sure. cql.execute(f'insert into {table} (p, c, v1, v2, v3) values (1, 2, 3, 4, 5)') assert [(3,1,2,4)] == list(cql.execute(f'select * from {mv} where v1=3')) + +# When a mutation generates more view updates than max_rows_for_view_updates +# (100), view_update_builder splits the work into multiple batches. 
There was +# a bug where fragments read between batches were lost: when stopping a batch, +# the next fragments were not read, and on the next build_some() call, +# read_both_next_fragments() advanced both readers - skipping any fragment +# that was already buffered but not yet consumed. This caused an existing fragment +# to be lost, leading to a missed view update and ghost rows in the materialized view. +# Reproduces scylladb/scylladb#29155. +def test_view_update_builder_does_not_lose_fragments_across_batches(cql, test_keyspace): + with new_test_table(cql, test_keyspace, 'p int, c int, primary key (p, c)') as table: + with new_materialized_view(cql, table, '*', 'c, p', 'c is not null and p is not null') as mv: + # Insert a row whose deletion will be lost if the bug is present. The row needs to have a clustering key + # higher than the first 100 clustering keys in the batch, so that it will be unprocessed after the first + # batch (100) of view updates is generated. + cql.execute(f'INSERT INTO {table} (p, c) VALUES (1, {200})') + + # Create a batch that creates >100 view updates and deletes the prepared row. + cmd = f'BEGIN BATCH ' + for i in range(105): + cmd += f'INSERT INTO {table} (p, c) VALUES (1, {i}); ' + cmd += f'DELETE FROM {table} WHERE p=1 AND c={200}; ' + cmd += 'APPLY BATCH;' + cql.execute(cmd) + + base_count = cql.execute(f'SELECT count(*) FROM {table}').one().count + view_count = cql.execute(f'SELECT count(*) FROM {mv}').one().count + assert base_count == 105 + assert view_count == 105