diff --git a/cdc/log.cc b/cdc/log.cc index 77d607fc7e..e56065711a 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -1726,7 +1726,8 @@ public: const mutation& m) { auto& p = m.partition(); - if (p.clustered_rows().empty() && p.static_row().empty()) { + const bool no_ck_schema_partition_deletion = m.schema()->clustering_key_size() == 0 && bool(p.partition_tombstone()); + if (p.clustered_rows().empty() && p.static_row().empty() && !no_ck_schema_partition_deletion) { return make_ready_future>(); } @@ -1775,12 +1776,12 @@ public: }); } } - if (!p.clustered_rows().empty()) { + if (!p.clustered_rows().empty() || no_ck_schema_partition_deletion) { const bool has_row_delete = std::any_of(p.clustered_rows().begin(), p.clustered_rows().end(), [] (const rows_entry& re) { return re.row().deleted_at(); }); // for postimage we need everything... - if (has_row_delete || _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage()) { + if (has_row_delete || _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage() || no_ck_schema_partition_deletion) { for (const column_definition& c: _schema->regular_columns()) { regular_columns.emplace_back(c.id); columns.emplace_back(&c); diff --git a/cdc/split.cc b/cdc/split.cc index 48d2b6f280..24fa75727d 100644 --- a/cdc/split.cc +++ b/cdc/split.cc @@ -111,6 +111,15 @@ struct batch { ret.insert(std::make_pair(change.key, all_columns)); } } + // While deleting a full partition avoids row-by-row logging for performance + // reasons, we must explicitly log single-row deletions for tables without a + // clustering key. This ensures consistent behavior with deletions of single + // rows from tables with a clustering key. See issue #26382. + if (partition_deletions && s.clustering_key_size() == 0) { + cdc::one_kind_column_set all_columns{s.regular_columns_count()}; + all_columns.set(0, s.regular_columns_count(), true); + ret.emplace(clustering_key::make_empty(), all_columns); + } auto process_change_type = [&] (const auto& changes) { for (const auto& change : changes) { diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 71ce1de884..651bcaa3ac 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -548,6 +548,10 @@ def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams): def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams): yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'KEYS_ONLY') +@pytest.fixture(scope="function") +def test_table_s_no_ck_old_image(dynamodb, dynamodbstreams): + yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'OLD_IMAGE') + @pytest.fixture(scope="function") def test_table_s_no_ck_new_and_old_images(dynamodb, dynamodbstreams): yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') @@ -870,8 +874,7 @@ def test_streams_putitem_no_ck_new_items_override_old(test_table_s_no_ck_new_and # differently by CDC. In issue #26382 - which this test reproduces - we # discovered that our implementation doesn't select a preimage for partition # deletions, resulting in a missing OldImage. -@pytest.mark.xfail(reason="issue #26382") -def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): +def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_images, test_table_s_no_ck_old_image, dynamodb, dynamodbstreams): def do_updates(table, p, _): events = [] table.update_item(Key={'p': p}, @@ -881,6 +884,7 @@ def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_image table.delete_item(Key={'p': p}) events.append(['REMOVE', {'p': p}, {'p': p, 'x': 1}, None]) return events + do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE') do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') # Test a single UpdateItem. Should result in a single INSERT event. diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index 664529bb5f..70b332c087 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -6,7 +6,9 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ +#include #include +#include #include #undef SEASTAR_TESTING_MAIN #include @@ -1989,6 +1991,63 @@ SEASTAR_THREAD_TEST_CASE(test_batch_pre_post_image) { test_batch_images(true, true); } +// Deleting a row in a table with a clustering key and preimage enabled logs +// the preimage. In tables without a clustering key, though, the preimage is +// missing. Reproduces #26382. +SEASTAR_THREAD_TEST_CASE(test_preimage_delete_no_clustering_key) { + do_with_cql_env_thread([](cql_test_env& e) { + using oper_ut = std::underlying_type_t; + for (const auto pre : {cdc::image_mode::on, cdc::image_mode::full}) { + cquery_nofail(e, format("CREATE TABLE ks.t (pk INT, val INT, PRIMARY KEY (pk)) WITH cdc = {{'enabled': true, 'preimage': '{}'}}", pre)); + cquery_nofail(e, "INSERT INTO ks.t (pk, val) VALUES (1, 2)"); + cquery_nofail(e, "INSERT INTO ks.t (pk, val) VALUES (1, 3)"); + cquery_nofail(e, "DELETE FROM ks.t WHERE pk = 1"); + + const auto result = get_result(e, { + data_type_for(), int32_type, int32_type}, + "SELECT \"cdc$operation\", pk, val FROM ks.t_scylla_cdc_log"); + + const std::vector> expected = { + {oper_ut(cdc::operation::insert), int32_t(1), int32_t(2)}, + {oper_ut(cdc::operation::pre_image), int32_t(1), int32_t(2)}, + {oper_ut(cdc::operation::insert), int32_t(1), int32_t(3)}, + {oper_ut(cdc::operation::pre_image), int32_t(1), int32_t(3)}, + {oper_ut(cdc::operation::partition_delete), int32_t(1), data_value::make_null(int32_type)} + }; + + BOOST_REQUIRE_EQUAL(expected, result); + cquery_nofail(e, "DROP TABLE ks.t"); + } + }).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_preimage_delete_clustering_key) { + do_with_cql_env_thread([](cql_test_env& e) { + using oper_ut = std::underlying_type_t; + for (const auto pre : {cdc::image_mode::on, cdc::image_mode::full}) { + cquery_nofail(e, format("CREATE TABLE ks.t (pk INT, ck INT, val INT, PRIMARY KEY (pk, ck)) WITH cdc = {{'enabled': true, 'preimage': '{}'}}", pre)); + cquery_nofail(e, "INSERT INTO ks.t (pk, ck, val) VALUES (1, 11, 2)"); + cquery_nofail(e, "INSERT INTO ks.t (pk, ck, val) VALUES (1, 11, 3)"); + cquery_nofail(e, "DELETE FROM ks.t WHERE pk = 1 AND ck = 11"); + + const auto result = get_result(e, { + data_type_for(), int32_type, int32_type}, + "SELECT \"cdc$operation\", pk, val FROM ks.t_scylla_cdc_log"); + + const std::vector> expected = { + {oper_ut(cdc::operation::insert), int32_t(1), int32_t(2)}, + {oper_ut(cdc::operation::pre_image), int32_t(1), int32_t(2)}, + {oper_ut(cdc::operation::insert), int32_t(1), int32_t(3)}, + {oper_ut(cdc::operation::pre_image), int32_t(1), int32_t(3)}, + {oper_ut(cdc::operation::row_delete), int32_t(1), data_value::make_null(int32_type)} + }; + + BOOST_REQUIRE_EQUAL(expected, result); + cquery_nofail(e, "DROP TABLE ks.t"); + } + }).get(); +} + // Regression test for #7716 SEASTAR_THREAD_TEST_CASE(test_postimage_with_no_regular_columns) { do_with_cql_env_thread([] (cql_test_env& e) {