cdc: Emit a preimage for non-clustered tables

Until this patch, CDC didn't fetch a preimage for mutations
containing only a partition tombstone. Therefore, single-row deletions
in a table without a clustering key didn't include a preimage, which was
inconsistent with single-row deletions in clustered tables. This commit
addresses this inconsistency.

The second reason is compatibility with DynamoDB Streams, which doesn't
support entire-partition deletes. Alternator, however, uses partition
tombstones for single-row deletions, and in these cases the 'OldImage'
was missing from REMOVE records.

Fixes https://github.com/scylladb/scylladb/issues/26382

Closes scylladb/scylladb#26578
This commit is contained in:
Piotr Wieczorek
2025-10-16 23:52:38 +02:00
committed by Nadav Har'El
parent 29ed1f3de7
commit 2812e67f47
4 changed files with 78 additions and 5 deletions

View File

@@ -1726,7 +1726,8 @@ public:
const mutation& m)
{
auto& p = m.partition();
if (p.clustered_rows().empty() && p.static_row().empty()) {
const bool no_ck_schema_partition_deletion = m.schema()->clustering_key_size() == 0 && bool(p.partition_tombstone());
if (p.clustered_rows().empty() && p.static_row().empty() && !no_ck_schema_partition_deletion) {
return make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>();
}
@@ -1775,12 +1776,12 @@ public:
});
}
}
if (!p.clustered_rows().empty()) {
if (!p.clustered_rows().empty() || no_ck_schema_partition_deletion) {
const bool has_row_delete = std::any_of(p.clustered_rows().begin(), p.clustered_rows().end(), [] (const rows_entry& re) {
return re.row().deleted_at();
});
// for postimage we need everything...
if (has_row_delete || _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage()) {
if (has_row_delete || _schema->cdc_options().postimage() || _schema->cdc_options().full_preimage() || no_ck_schema_partition_deletion) {
for (const column_definition& c: _schema->regular_columns()) {
regular_columns.emplace_back(c.id);
columns.emplace_back(&c);

View File

@@ -111,6 +111,15 @@ struct batch {
ret.insert(std::make_pair(change.key, all_columns));
}
}
// While deleting a full partition avoids row-by-row logging for performance
// reasons, we must explicitly log single-row deletions for tables without a
// clustering key. This ensures consistent behavior with deletions of single
// rows from tables with a clustering key. See issue #26382.
if (partition_deletions && s.clustering_key_size() == 0) {
cdc::one_kind_column_set all_columns{s.regular_columns_count()};
all_columns.set(0, s.regular_columns_count(), true);
ret.emplace(clustering_key::make_empty(), all_columns);
}
auto process_change_type = [&] (const auto& changes) {
for (const auto& change : changes) {

View File

@@ -548,6 +548,10 @@ def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):
def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams):
yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'KEYS_ONLY')
@pytest.fixture(scope="function")
def test_table_s_no_ck_old_image(dynamodb, dynamodbstreams):
yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'OLD_IMAGE')
@pytest.fixture(scope="function")
def test_table_s_no_ck_new_and_old_images(dynamodb, dynamodbstreams):
yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
@@ -870,8 +874,7 @@ def test_streams_putitem_no_ck_new_items_override_old(test_table_s_no_ck_new_and
# differently by CDC. In issue #26382 - which this test reproduces - we
# discovered that our implementation doesn't select a preimage for partition
# deletions, resulting in a missing OldImage.
@pytest.mark.xfail(reason="issue #26382")
def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_images, test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
def do_updates(table, p, _):
events = []
table.update_item(Key={'p': p},
@@ -881,6 +884,7 @@ def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_image
table.delete_item(Key={'p': p})
events.append(['REMOVE', {'p': p}, {'p': p, 'x': 1}, None])
return events
do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Test a single UpdateItem. Should result in a single INSERT event.

View File

@@ -6,7 +6,9 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <fmt/base.h>
#include <fmt/core.h>
#include <iostream>
#include <seastar/util/defer.hh>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
@@ -1989,6 +1991,63 @@ SEASTAR_THREAD_TEST_CASE(test_batch_pre_post_image) {
test_batch_images(true, true);
}
// Regression test for #26382.
// In a table without a clustering key, deleting the only row of a partition
// shows up in the CDC log as a partition_delete. With preimage enabled (both
// the 'on' and 'full' modes are exercised), the log is expected to contain a
// pre_image row right before that partition_delete, the same way a row
// deletion in a table with a clustering key is preceded by its preimage.
SEASTAR_THREAD_TEST_CASE(test_preimage_delete_no_clustering_key) {
    do_with_cql_env_thread([](cql_test_env& env) {
        using oper_ut = std::underlying_type_t<cdc::operation>;
        // Converts a CDC operation to the raw value read from "cdc$operation".
        const auto op = [](cdc::operation o) { return oper_ut(o); };

        for (const auto preimage_mode : {cdc::image_mode::on, cdc::image_mode::full}) {
            cquery_nofail(env, format("CREATE TABLE ks.t (pk INT, val INT, PRIMARY KEY (pk)) WITH cdc = {{'enabled': true, 'preimage': '{}'}}", preimage_mode));

            // Write the row twice (so the second write has something to take
            // a preimage of), then delete it.
            for (const char* statement : {
                    "INSERT INTO ks.t (pk, val) VALUES (1, 2)",
                    "INSERT INTO ks.t (pk, val) VALUES (1, 3)",
                    "DELETE FROM ks.t WHERE pk = 1"}) {
                cquery_nofail(env, statement);
            }

            // Expected log contents, in order: (operation, pk, val).
            const std::vector<std::vector<data_value>> expected_rows = {
                {op(cdc::operation::insert), int32_t(1), int32_t(2)},
                {op(cdc::operation::pre_image), int32_t(1), int32_t(2)},
                {op(cdc::operation::insert), int32_t(1), int32_t(3)},
                {op(cdc::operation::pre_image), int32_t(1), int32_t(3)},
                {op(cdc::operation::partition_delete), int32_t(1), data_value::make_null(int32_type)}
            };

            const auto log_rows = get_result(env,
                    {data_type_for<oper_ut>(), int32_type, int32_type},
                    "SELECT \"cdc$operation\", pk, val FROM ks.t_scylla_cdc_log");
            BOOST_REQUIRE_EQUAL(expected_rows, log_rows);

            // Drop the table so the next preimage mode starts from scratch.
            cquery_nofail(env, "DROP TABLE ks.t");
        }
    }).get();
}
// Checks the CDC log produced by deleting a single row from a table that has
// a clustering key, with preimage enabled (both the 'on' and 'full' modes are
// exercised): the row_delete entry is expected to be preceded by a pre_image
// row carrying the value the row held before the deletion.
SEASTAR_THREAD_TEST_CASE(test_preimage_delete_clustering_key) {
    do_with_cql_env_thread([](cql_test_env& env) {
        using oper_ut = std::underlying_type_t<cdc::operation>;
        // Converts a CDC operation to the raw value read from "cdc$operation".
        const auto op = [](cdc::operation o) { return oper_ut(o); };

        for (const auto preimage_mode : {cdc::image_mode::on, cdc::image_mode::full}) {
            cquery_nofail(env, format("CREATE TABLE ks.t (pk INT, ck INT, val INT, PRIMARY KEY (pk, ck)) WITH cdc = {{'enabled': true, 'preimage': '{}'}}", preimage_mode));

            // Write the row twice (so the second write has something to take
            // a preimage of), then delete exactly that row.
            for (const char* statement : {
                    "INSERT INTO ks.t (pk, ck, val) VALUES (1, 11, 2)",
                    "INSERT INTO ks.t (pk, ck, val) VALUES (1, 11, 3)",
                    "DELETE FROM ks.t WHERE pk = 1 AND ck = 11"}) {
                cquery_nofail(env, statement);
            }

            // Expected log contents, in order: (operation, pk, val).
            const std::vector<std::vector<data_value>> expected_rows = {
                {op(cdc::operation::insert), int32_t(1), int32_t(2)},
                {op(cdc::operation::pre_image), int32_t(1), int32_t(2)},
                {op(cdc::operation::insert), int32_t(1), int32_t(3)},
                {op(cdc::operation::pre_image), int32_t(1), int32_t(3)},
                {op(cdc::operation::row_delete), int32_t(1), data_value::make_null(int32_type)}
            };

            const auto log_rows = get_result(env,
                    {data_type_for<oper_ut>(), int32_type, int32_type},
                    "SELECT \"cdc$operation\", pk, val FROM ks.t_scylla_cdc_log");
            BOOST_REQUIRE_EQUAL(expected_rows, log_rows);

            // Drop the table so the next preimage mode starts from scratch.
            cquery_nofail(env, "DROP TABLE ks.t");
        }
    }).get();
}
// Regression test for #7716
SEASTAR_THREAD_TEST_CASE(test_postimage_with_no_regular_columns) {
do_with_cql_env_thread([] (cql_test_env& e) {