diff --git a/db/view/view.cc b/db/view/view.cc index 5abdd35302..5455edb8fd 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1219,8 +1219,12 @@ future view_update_builder::stop() const { return make_ready_future(stop_iteration::yes); } -future> view_update_builder::build_some() { +future>> view_update_builder::build_some() { (void)co_await advance_all(); + if (!_update && !_existing) { + // Tell the caller there is no more data to build. + co_return std::nullopt; + } bool do_advance_updates = false; bool do_advance_existings = false; if (_update && _update->is_partition_start()) { diff --git a/db/view/view.hh b/db/view/view.hh index e0697a1d69..24067171e4 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -275,7 +275,15 @@ public: } view_update_builder(view_update_builder&& other) noexcept = default; - future> build_some(); + + // build_some() works on batches of 100 (max_rows_for_view_updates) + // updated rows, but can_skip_view_updates() can decide that some of + // these rows do not effect the view, and as a result build_some() can + // fewer than 100 rows - in extreme cases even zero (see issue #12297). + // So we can't use an empty returned vector to signify that the view + // update building is done - and we wrap the return value in an + // std::optional, which is disengaged when the iteration is done. + future>> build_some(); future<> close() noexcept; diff --git a/replica/table.cc b/replica/table.cc index 505d9b07dc..3252a6dde7 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1809,20 +1809,20 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base, std::exception_ptr err = nullptr; while (true) { - utils::chunked_vector updates; + std::optional> updates; try { updates = co_await builder.build_some(); } catch (...) { err = std::current_exception(); break; } - if (updates.empty()) { + if (!updates) { break; } - tracing::trace(tr_state, "Generated {} view update mutations", updates.size()); - auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates)); + tracing::trace(tr_state, "Generated {} view update mutations", updates->size()); + auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(*updates)); try { - co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats, tr_state, + co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state, std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no); } catch (...) { // Ignore exceptions: any individual failure to propagate a view update will be reported @@ -1947,14 +1947,14 @@ future<> table::populate_views( while (true) { try { auto updates = co_await builder.build_some(); - if (updates.empty()) { + if (!updates) { break; } - size_t update_size = memory_usage_of(updates); + size_t update_size = memory_usage_of(*updates); size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size); auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for); units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for)); - co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats, + co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes); } catch (...) { if (!err) { diff --git a/test/cql-pytest/test_materialized_view.py b/test/cql-pytest/test_materialized_view.py index 09d9a6b751..a0907e5fce 100644 --- a/test/cql-pytest/test_materialized_view.py +++ b/test/cql-pytest/test_materialized_view.py @@ -541,3 +541,106 @@ def test_view_update_and_alter_base(cql, test_keyspace, scylla_only): # Try to modify an item. This failed in #11542. cql.execute(f'UPDATE {table} SET v=-1 WHERE p=1') assert len(list(cql.execute(f"SELECT v from {mv}"))) == 0 + +# Reproducer for issue #12297, reproducing a specific way in which a view +# table could be made inconsistent with the base table: +# The test writes 500 rows to one partition in a base table, and then uses +# USING TIMESTAMP with the right value to cause a base partition deletion +# which deletes not the entire partition but just its last 50 rows. As the +# 50 rows of the base partition get deleted, we expect 50 rows from the +# view table to also get deleted - but bug #12297 was that this wasn't +# happening - rather, all rows remained in the view. +# The bug cannot be reproduced with 100 rows (and deleting the last 10) +# but 113 rows (and 101 rows after deleting the last 12) does reproduce +# it. Reproducing the bug also required a setup where USING TIMESTAMP +# deleted the *last* rows - using it to delete the *first* rows did not +# have a bug (the view rows were deleted fine). +@pytest.mark.parametrize("size", [100, 113, 500]) +def test_long_skipped_view_update_delete_with_timestamp(cql, test_keyspace, size): + with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table: + with new_materialized_view(cql, table, '*', 'p, x, c', 'p is not null and x is not null and c is not null') as mv: + # Write size rows with c=0..(size-1). Because the iteration is in + # reverse order, the first row in clustering order (c=0) will + # have the latest write timestamp. + for i in reversed(range(size)): + cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})') + assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1")) + # Get the timestamp of the size*0.9th item. Because we wrote items + # in reverse, items 0.9-1.0*size all have earlier timestamp than + # that. + t = list(cql.execute(f"SELECT writetime(y) FROM {table} WHERE p = 1 and c = {int(size*0.9)}"))[0].writetime_y + cql.execute(f'DELETE FROM {table} USING TIMESTAMP {t} WHERE p=1') + # After the deletion we expect to see size*0.9 rows remaining + # (timestamp ties cannot happen for separate writes, if they + # did we could have a bit less), but most importantly, the view + # should have exactly the same rows. + assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1")) + +# Same test as above, just that in this version the view partition key is +# different from the base's, so we can be sure that Scylla needs to go +# through the loop of deleting many view rows and cannot delete an entire +# view partition in one fell swoop. In the above test, Scylla *may* contain +# such an optimization (currently it doesn't), so it may reach a different +# code path. +def test_long_skipped_view_update_delete_with_timestamp2(cql, test_keyspace): + size = 200 + with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table: + with new_materialized_view(cql, table, '*', 'x, p, c', 'p is not null and x is not null and c is not null') as mv: + for i in reversed(range(size)): + cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})') + assert list(cql.execute(f"SELECT c FROM {table}")) == sorted(list(cql.execute(f"SELECT c FROM {mv}"))) + t = list(cql.execute(f"SELECT writetime(y) FROM {table} WHERE p = 1 and c = {int(size*0.9)}"))[0].writetime_y + cql.execute(f'DELETE FROM {table} USING TIMESTAMP {t} WHERE p=1') + assert list(cql.execute(f"SELECT c FROM {table}")) == sorted(list(cql.execute(f"SELECT c FROM {mv}"))) + +# Another, more fundemental, reproducer for issue #12297 where a certain +# modification to a base partition modifing more than 100 rows was not +# applied to the view beyond the 100th row. +# The test above, test_long_skipped_view_update_delete_with_timestamp was one +# such specific case, which involved a partition tombstone and a specific +# choice of timestamp which causes the first 100 rows to NOT be changed. +# In this test we show that the bug is not just about do-nothing tombstones: +# In any base modification which involves more than 100 rows, if the first +# 100 rows don't change the view (as decided by the can_skip_view_updates() +# function), the other rows are wrongly skipped at well and not applied to +# the view! +# The specific case we use here is an update that sets some irrelevant +# (not-selected-by-the-view) column y on 200 rows, and additionally writes +# a new row as the 201st row. With bug #12297, that 201st row will be +# missing in the view. +def test_long_skipped_view_update_irrelevant_column(cql, test_keyspace): + size = 200 + with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table: + # Note that column "y" is not selected by the materialized view + with new_materialized_view(cql, table, 'p, x, c', 'p, x, c', 'p is not null and x is not null and c is not null') as mv: + for i in range(size): + cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})') + # In a single batch (a single mutation), update "y" column in all + # 'size' existing rows, plus add one new row in the last position + # (the partition is sorted by the "c" column). The first 'size' + # UPDATEs can be skipped in the view (because y isn't selected), + # but the last INSERT can't be skipped - it really adds a new row. + cmd = 'BEGIN BATCH ' + for i in range(size): + cmd += f'UPDATE {table} SET y=7 where p=1 and c={i}; ' + cmd += f'INSERT INTO {table} (p,c,x,y) VALUES (1,{size+1},{size+1},{size+1}); ' + cmd += 'APPLY BATCH;' + cql.execute(cmd) + # We should now have the same size+1 rows in both base and view + assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1")) + +# After the previous tests checked elaborate conditions where modifying a +# base-table partition resulted in many skipped view updates, let's also +# check the more basic situation where the base-table partition modification +# (in this case, a deletion) result in many view-table updates, and all +# of them should happen even if the code needs to do it internally in +# several batches of 100 (for example). +def test_mv_long_delete(cql, test_keyspace): + size = 300 + with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table: + with new_materialized_view(cql, table, '*', 'p, x, c', 'p is not null and x is not null and c is not null') as mv: + for i in range(size): + cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})') + cql.execute(f'DELETE FROM {table} WHERE p=1') + assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == [] + assert list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1")) == []