From 2894542e57ae4669c3fba2f8115b29cfcf75d756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?= Date: Mon, 2 Feb 2026 17:40:28 +0100 Subject: [PATCH 1/3] alternator: add failing tests for Streams Add failing tests for Streams functionality. Trying to remove column from non-existing item is producing a MODIFY event (while it should none). Doing batch write with operations working on the same partition, where one operation is without side effects and second with will produce events for both operations, even though first changes nothing. First test has two versions - with and without clustering key. Second has only with clustering key, as we can't produce batch write with two items for the same partition - batch write can't use primary key more than once in single call. We also add a test for batch write, where one of three operations has no observable side effects and should not show up in Streams output, but in current scylla's version it does show. --- test/alternator/test_streams.py | 114 ++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 881b2f9785..d54ef3202b 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -286,6 +286,96 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams): # local java throws here. real does not. ensure_java_server(dynamodbstreams, error=None) +# We run a batch that mixes noop operations (in our concrete example - one delete of non-existing item) with real changes (put of two new items). +# Expected behaviour - Streams will return only two inserts (no delete). Observed behaviour - we get modify for delete as well. +# Test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail due to how Streams work without the flag. 
+# Test requires write_isolation set to always, otherwise upper layer will split batch write into separate cdc operations, sidestepping the issue. +# Reproduces SCYLLADB-1528. +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") +def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + + with table.batch_writer() as batch: + batch.put_item({'p': p, 'c': c + '0', 'a': 0}) + events.append(['INSERT',{'c':c + '0','p':p},null,{'c':c + '0','a':0,'p':p}]) + batch.delete_item(Key={'p': p, 'c': c + '1'}) + batch.put_item({'p': p, 'c': c + '2', 'a': 2}) + + events.append(['INSERT',{'c':c + '2','p':p},null,{'c':c + '2','a':2,'p':p}]) + return events + + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + +# Running update_item with UpdateExpression set to remove column should not emit a MODIFY event when item does not exist +# The test tries all combinations (delete column from non-existing item, delete existing column from existing item, delete non-existing column from existing item) +# only delete column from non-existing item used to incorrectly emit a MODIFY event +# test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail +# Reproduces SCYLLADB-1528 +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") +def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + # first we try to remove column from non existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': 
c}).get('Item', None) + assert v is None + + # then we try to remove existing column from existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression="SET e = :e, g = :g", ExpressionAttributeValues={':e': 166, ':g': 166}) + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166, 'g': 166} + events.append(['INSERT',{'c':c,'p':p},null,{'c':c,'e':166,'g':166,'p':p}]) + + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166} + events.append(['MODIFY',{'c':c,'p':p},{'c':c,'e':166,'g':166,'p':p}, {'c':c,'e':166,'p':p}]) + + # finally we try again to remove the same column (non existing) from existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166} + + return events + + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + +# the same as test_streams_noop_update_expr_on_missing_item but for a table without clustering key +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") +def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + # first we try to remove column from non existing item + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p}).get('Item', None) + assert v is None + + # then we try to remove existing column from existing item + table.update_item(Key={'p': p}, UpdateExpression='SET e = :e, g = :g', ExpressionAttributeValues={':e': 166, ':g': 166}) + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166, 'g': 
166} + events.append(['INSERT',{'p':p},null,{'e':166,'g':166,'p':p}]) + + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166} + events.append(['MODIFY',{'p':p},{'e':166,'g':166,'p':p}, {'e':166,'p':p}]) + + # finally we try again to remove the same column (non existing) from existing item + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166} + + return events + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + def test_get_shard_iterator(dynamodb, dynamodbstreams): with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: streams = dynamodbstreams.list_streams(TableName=table.name) @@ -1672,6 +1762,14 @@ def do_updates_1(table, p, c): # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event. table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]}) events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}]) + + # put item with just a key, must be an insert as it doesn't exist + table.put_item(Item={'p': p, 'c': c + '1'}) + events.append(['INSERT', {'p': p, 'c': c + '1'}, None, {'p': p, 'c': c + '1'}]) + # put item with just a key but it exists, so no event + table.put_item(Item={'p': p, 'c': c + '1'}) + # delete non-existing column from item with only key fields - no event + table.update_item(Key={'p': p, 'c': c + '1'}, UpdateExpression='REMOVE g') return events # The tested events are the same as in do_updates_1, but the table doesn't have @@ -1704,36 +1802,52 @@ def do_updates_1_no_ck(table, p, _): # Test BatchWriteItem as well. 
This modifies the item, so will be a MODIFY event. table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]}) events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}]) + # put item with just a key, must be an insert as it doesn't exist + table.put_item(Item={'p': p + '1'}) + events.append(['INSERT', {'p': p + '1'}, None, {'p': p + '1'}]) + # put item with just a key but it exists, so no event + table.put_item(Item={'p': p + '1'}) + # delete non-existing column from item with only key fields - no event + table.update_item(Key={'p': p + '1'}, UpdateExpression='REMOVE g') + return events +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', 
nop=is_aws(dynamodb)): do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'OLD_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES') From 6e5aaa85b6033dbee0a40addbd05daa1b6d7f518 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?= Date: Mon, 2 Feb 2026 13:15:49 +0100 Subject: [PATCH 2/3] alternator: fix Alternator writing unnecesary cdc entries Work in this patch is a result of two bugs - spurious MODIFY event, when remove column is used in 
`update_item` on a non-existing item, and spurious events when a batch write
mixed noop operations with operations involving actual changes (the former
would still emit cdc log entries). The latter issue required a rework of
Piotr Wieczorek's algorithm, which fixed the former issue as well.

Piotr Wieczorek previously wrote checks that should prevent unnecessary cdc
events from being written. His implementation missed the fact that a single
`mutation` object passed to the cdc code to be analysed for cdc log entries
can contain modifications for multiple rows (with the same timestamp - for
example as a result of a BatchWriteItem call). His code tries to skip the
whole `mutation`, which in such a case is not possible, because
BatchWriteItem might have one item that does nothing and a second item that
does a modification (this is the reason for the second bug).

His algorithm was extended and moved. Originally it worked as follows - the
user would send a `mutation` object with some changes to be "augmented". The
cdc code would process those changes and build a set of cdc log changes based
on them, which would be added to the cdc log table. Piotr added a
`should_skip` function, which processed the user changes and tried to
determine whether they should all be dropped or not.

The new version, instead of trying to skip adding rows to the cdc log
`mutation` object, builds a rows-to-ignore set. After the whole cdc log
`mutation` object is completed, it processes it and goes through it row by
row. Any row that was previously added to the `rows_to_ignore` set is now
removed. The remaining rows are written to a new cdc log `mutation` with a
new clustering key (the `cdc$batch_seq_no` index value should probably be
consecutive - we just want to be safe here) and the new `mutation` object is
returned to be sent to the cdc log table.

The first bug is fixed as a side effect of the new algorithm, which contains
more precise checks detecting whether a given mutation actually made a
difference. 
Fixes: #28368 Fixes: SCYLLADB-538 Fixes: SCYLLADB-1528 Refs: #28452 --- cdc/log.cc | 175 +++++++++++++++++++++++++++++-- docs/alternator/compatibility.md | 7 ++ mutation/mutation_partition.hh | 4 + test/alternator/test_streams.py | 11 -- 4 files changed, 176 insertions(+), 21 deletions(-) diff --git a/cdc/log.cc b/cdc/log.cc index 955f1f290f..61575ea7d3 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -8,7 +8,7 @@ #include #include - +#include #include #include #include @@ -47,6 +47,7 @@ #include "tracing/trace_state.hh" #include "stats.hh" #include "utils/labels.hh" +#include "alternator/executor.hh" namespace std { @@ -1068,6 +1069,14 @@ public: return create_ck(_batch_no - 1); } + api::timestamp_type get_timestamp() const { + return _ts; + } + + ttl_opt get_ttl() const { + return _ttl; + } + // A common pattern is to allocate a row and then immediately set its `cdc$operation` column. clustering_key allocate_new_log_row(operation op) { auto log_ck = allocate_new_log_row(); @@ -1209,15 +1218,25 @@ struct process_row_visitor { row_states_map& _clustering_row_states; const bool _generate_delta_values = true; + + // true if we are processing changes that were produced by Alternator + const bool _alternator; + // will be set to true, if any kind of change in row will be detected. Used only, when processing Alternator's changes. + bool _alternator_any_value_changed = false; + + // will be set to true, if Alternator's collection column (:attrs) will be modified only by removing elements + // Used only, when processing Alternator's changes. 
+ bool _alternator_only_deletes = false; + process_row_visitor( const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder, bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state, - row_states_map& clustering_row_states, bool generate_delta_values) + row_states_map& clustering_row_states, bool generate_delta_values, bool alternator = false) : _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder), _enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state), _clustering_row_states(clustering_row_states), - _generate_delta_values(generate_delta_values) + _generate_delta_values(generate_delta_values), _alternator(alternator) {} void update_row_state(const column_definition& cdef, managed_bytes_opt value) { @@ -1227,7 +1246,17 @@ struct process_row_visitor { auto [it, _] = _clustering_row_states.try_emplace(*_base_ck); _row_state = &it->second; } - (*_row_state)[&cdef] = std::move(value); + auto [ it, inserted ] = _row_state->insert({ &cdef, std::nullopt }); + + // we ignore `_alternator_any_value_changed` for non-alternator changes. 
+ // we don't filter if `_enable_updating_state` is false, as on top of needing pre image + // we also need cdc to build post image for us + // we add check for `_alternator` here for performance reasons - no point in byte compare objects + // if the return value will be ignored + if (_alternator && _enable_updating_state) { + _alternator_any_value_changed = _alternator_any_value_changed || it->second != value; + } + it->second = std::move(value); } void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) { @@ -1377,6 +1406,8 @@ struct process_row_visitor { auto&& deleted_keys = std::get<1>(result); auto&& added_cells = std::get<2>(result); + _alternator_only_deletes = cdef.name_as_text() == alternator::executor::ATTRS_COLUMN_NAME && !deleted_keys.empty() && !added_cells.has_value(); + // FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob, // then we deserialize again when merging images below managed_bytes_opt deleted_elements = std::nullopt; @@ -1434,12 +1465,31 @@ struct process_change_visitor { const bool _enable_updating_state = false; row_states_map& _clustering_row_states; + + // clustering keys' as bytes of rows that should be ignored, when writing cdc log changes + // filtering will be done in `clean_up_noop_rows` function. Used only, when processing Alternator's changes. + // Since Alternator clustering key is always at most single column, we store unpacked clustering key. + // If Alternator table is without clustering key, that means partition has at most one row, any value present + // in _alternator_clustering_keys_to_ignore will make us ignore that single row - + // we will use an empty bytes object. 
+ std::unordered_set& _alternator_clustering_keys_to_ignore; + cell_map& _static_row_state; + const bool _alternator_schema_has_no_clustering_key = false; + const bool _is_update = false; const bool _generate_delta_values = true; + // only called, when processing Alternator's change + void alternator_add_ckey_to_rows_to_ignore(const clustering_key& ckey) { + throwing_assert(_request_options.alternator); + auto res = ckey.explode(); + auto ckey_exploded = !res.empty() ? res[0] : bytes{}; + _alternator_clustering_keys_to_ignore.insert(ckey_exploded); + } + void static_row_cells(auto&& visit_row_cells) { _touched_parts.set(); @@ -1471,16 +1521,29 @@ struct process_change_visitor { } }; + auto row_state = get_row_state(_clustering_row_states, ckey); clustering_row_cells_visitor v( log_ck, _touched_parts, _builder, - _enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey), - _clustering_row_states, _generate_delta_values); + _enable_updating_state, &ckey, row_state, + _clustering_row_states, _generate_delta_values, _request_options.alternator); if (_is_update && _request_options.alternator) { - v._marker_op = operation::update; + v._marker_op = row_state ? 
operation::update : operation::insert; } visit_row_cells(v); if (_enable_updating_state) { + if (_request_options.alternator && !v._alternator_any_value_changed) { + // we need additional checks here: + // - without `row_state != nullptr` inserting new key without additional fields (so only partition / clustering key) would be + // treated as no-change, because without additional fields given by the user `v` visitor won't visit any cells + // and _alternator_any_value_changed will be false (thus item will be skipped), + // - without `row_state == nullptr && v._alternator_only_deletes` check we won't properly ignore + // column deletes for existing items, but without the column we want to delete - + // item exists (so row_state != nullptr), but we delete non-existing column, so no-op + if (row_state != nullptr || (row_state == nullptr && v._alternator_only_deletes)) { + alternator_add_ckey_to_rows_to_ignore(ckey); + } + } // #7716: if there are no regular columns, our visitor would not have visited any cells, // hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced. // Ensure that the row state exists. 
@@ -1497,8 +1560,12 @@ struct process_change_visitor { auto log_ck = _builder.allocate_new_log_row(_row_delete_op); _builder.set_clustering_columns(log_ck, ckey); - if (_enable_updating_state && get_row_state(_clustering_row_states, ckey)) { - _clustering_row_states.erase(ckey); + if (_enable_updating_state) { + if (get_row_state(_clustering_row_states, ckey)) { + _clustering_row_states.erase(ckey); + } else if (_request_options.alternator) { + alternator_add_ckey_to_rows_to_ignore(ckey); + } } } @@ -1540,6 +1607,22 @@ struct process_change_visitor { _touched_parts.set(); auto log_ck = _builder.allocate_new_log_row(_partition_delete_op); if (_enable_updating_state) { + if (_request_options.alternator && _alternator_schema_has_no_clustering_key && _clustering_row_states.empty()) { + // Alternator's table can be with or without clustering key. If the clustering key exists, + // delete request will be `clustered_row_delete` and will be hanlded there. + // If the clustering key doesn't exist, delete request will be `partition_delete` and will be handled here. + // The no-clustering-key case is slightly tricky, because insert of such item is handled by `clustered_row_cells` + // and has some value as clustering_key (the value currently seems to be empty bytes object). + // We don't want to rely on knowing the value exactly, instead we rely on the fact that + // there will be at most one item in a partition. So if `_clustering_row_states` is empty, + // we know the delete is for a non-existing item and we should ignore it. + // If `_clustering_row_states` is not empty, then we know the delete is for an existing item + // we should log it and clear `_clustering_row_states`. + // The same logic applies to `alternator_add_ckey_to_rows_to_ignore` call in `clustered_row_delete` + // we need to insert "anything" for no-clustering-key case, so further logic will check + // if map is empty or not and will know if it should ignore the single partition item and keep it. 
+ alternator_add_ckey_to_rows_to_ignore({}); + } _clustering_row_states.clear(); } } @@ -1647,6 +1730,47 @@ private: stats::part_type_set _touched_parts; + std::unordered_set _alternator_clustering_keys_to_ignore; + const column_definition* _alternator_clustering_key_column = nullptr; + + // the function will process mutations and remove rows that are in _alternator_clustering_keys_to_ignore + // we need to take care and reindex clustering keys (cdc$batch_seq_no) + // this is used for Alternator's changes only + // NOTE: `_alternator_clustering_keys_to_ignore` must be not empty. + mutation clean_up_noop_rows(mutation mut) { + throwing_assert(!_alternator_clustering_keys_to_ignore.empty()); + auto after_mut = mutation(_log_schema, mut.key()); + if (!_alternator_clustering_key_column) { + // no clustering key - only single row per partition + // since _alternator_clustering_keys_to_ignore is not empty we need to drop that single row + // so we just return empty mutation instead + return after_mut; + } + int batch_seq = 0; + for (rows_entry &row : mut.partition().mutable_non_dummy_rows()) { + auto cell = row.row().cells().find_cell(_alternator_clustering_key_column->id); + if (cell) { + auto val = cell->as_atomic_cell(*_alternator_clustering_key_column).value().linearize(); + + if (_alternator_clustering_keys_to_ignore.contains(val)) { + continue; + } + } + auto new_key = _builder->create_ck(batch_seq++); + after_mut.partition().clustered_row(*_log_schema, std::move(new_key)) = std::move(row.row()); + } + + if (batch_seq > 0) { + // update end_of_batch marker + // we don't need to clear previous one, as we only removed rows + // we need to set it on the last row, because original last row might have been deleted + // batch_seq == 0 -> no rows, after_mut is empty, all entries were dropped and there's nothing to write to cdc log + auto last_key = _builder->create_ck(batch_seq - 1); + after_mut.set_cell(last_key, log_meta_column_name_bytes("end_of_batch"), 
data_value(true), _builder->get_timestamp(), _builder->get_ttl()); + } + + return after_mut; + } public: transformer(db_context ctx, schema_ptr s, dht::decorated_key dk, const per_request_options& options) : _ctx(ctx) @@ -1656,7 +1780,20 @@ public: , _options(options) , _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema)) , _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets()) + , _alternator_clustering_keys_to_ignore() { + if (_options.alternator) { + auto cks = _schema->clustering_key_columns(); + const column_definition *ck_def = nullptr; + if (!cks.empty()) { + auto it = _log_schema->columns_by_name().find(cks.front().name()); + if (it == _log_schema->columns_by_name().end()) { + on_internal_error(cdc_log, fmt::format("failed to find clustering key `{}` in cdc log table `{}`", cks.front().name(), _log_schema->id())); + } + ck_def = it->second; + } + _alternator_clustering_key_column = ck_def; + } } // DON'T move the transformer after this @@ -1664,7 +1801,10 @@ public: const auto stream_id = _uses_tablets ? _ctx._cdc_metadata.get_tablet_stream(_log_schema->id(), ts, _dk.token()) : _ctx._cdc_metadata.get_vnode_stream(ts, _dk.token()); _result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema)); _builder.emplace(_result_mutations.back(), ts, _dk.key(), *_schema); - _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()); + // alternator_streams_increased_compatibility set to true reads preimage, but we need to set + // _enable_updating_state to true to keep track of changes and produce correct pre/post images even + // if upper layer didn't request them explicitly. 
+ _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()) || (_options.alternator && _options.alternator_streams_increased_compatibility); } void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override { @@ -1761,7 +1901,9 @@ public: ._builder = *_builder, ._enable_updating_state = _enable_updating_state, ._clustering_row_states = _clustering_row_states, + ._alternator_clustering_keys_to_ignore = _alternator_clustering_keys_to_ignore, ._static_row_state = _static_row_state, + ._alternator_schema_has_no_clustering_key = (_alternator_clustering_key_column == nullptr), ._is_update = _is_update, ._generate_delta_values = generate_delta_values(_builder->base_schema()) }; @@ -1771,6 +1913,19 @@ public: void end_record() override { SCYLLA_ASSERT(_builder); _builder->end_record(); + + if (_options.alternator && !_alternator_clustering_keys_to_ignore.empty()) { + // we filter mutations for Alternator's changes here. + // We do it per mutation object (user might submit a batch of those in one go + // and some might be splitted because of different timestamps), + // ignore key set is cleared afterwards. + // If single mutation object contains two separate changes to the same row + // and at least one of them is ignored, all of them will be ignored. + // This is not possible in Alternator - Alternator spec forbids reusing + // primary key in single batch. + _result_mutations.back() = clean_up_noop_rows(std::move(_result_mutations.back())); + _alternator_clustering_keys_to_ignore.clear(); + } } const row_states_map& clustering_row_states() const override { diff --git a/docs/alternator/compatibility.md b/docs/alternator/compatibility.md index 5b2198f3e0..dc0e3f79da 100644 --- a/docs/alternator/compatibility.md +++ b/docs/alternator/compatibility.md @@ -325,6 +325,13 @@ experimental: in July 2025 to optimize shard discovery, is not yet implemented in Alternator. 
+ * With the ``alternator_streams_increased_compatibility`` configuration + option enabled, operations that do not change the database state + (e.g., deleting a non-existent item, removing a non-existent + attribute, or re-inserting an identical item) will not produce + stream events. Without this option, such no-op operations may still + generate spurious stream events. + ## Unimplemented API features diff --git a/mutation/mutation_partition.hh b/mutation/mutation_partition.hh index 7cf6278677..7c000759e0 100644 --- a/mutation/mutation_partition.hh +++ b/mutation/mutation_partition.hh @@ -1486,6 +1486,10 @@ public: return std::ranges::subrange(_rows.begin(), _rows.end()) | std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); }); } + auto mutable_non_dummy_rows() { + return std::ranges::subrange(_rows.begin(), _rows.end()) + | std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); }); + } void accept(const schema&, mutation_partition_visitor&) const; // Returns the number of live CQL rows in this partition. diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index d54ef3202b..e6f833c269 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -291,7 +291,6 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams): # Test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail due to how Streams work without the flag. # Test requires write_isolation set to always, otherwise upper layer will split batch write into separate cdc operations, sidestepping the issue. # Reproduces SCYLLADB-1528. 
-@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams): null = None def do_updates(table, p, c): @@ -314,7 +313,6 @@ def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_it # only delete column from non-existing item used to incorrectly emit a MODIFY event # test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail # Reproduces SCYLLADB-1528 -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): null = None def do_updates(table, p, c): @@ -346,7 +344,6 @@ def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_imag do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') # the same as test_streams_noop_update_expr_on_missing_item but for a table without clustering key -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): null = None def do_updates(table, p, c): @@ -1812,42 +1809,34 @@ def do_updates_1_no_ck(table, p, _): return events -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 
'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, 
do_updates_1_no_ck, 'OLD_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES') From 04b9d3875ff5b1a94dc2c2ddd3500dce22ad273b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?= Date: Mon, 2 Feb 2026 13:04:59 +0100 Subject: [PATCH 3/3] alternator: remove unnecessary code After our fix, which prevents no-op changes from being written into the cdc log, we will remove Piotr Wieczorek's previous attempt, which is now unnecessary. --- cdc/log.cc | 10 ++-- cdc/split.cc | 126 +-------------------------------------------------- cdc/split.hh | 6 +-- 3 files changed, 7 insertions(+), 135 deletions(-) diff --git a/cdc/log.cc b/cdc/log.cc index 61575ea7d3..42c9728357 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -1928,10 +1928,6 @@ public: } } - const row_states_map& clustering_row_states() const override { - return _clustering_row_states; - } - // Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime. // The `transformer` object on which this method was called on should not be used anymore.
std::tuple, stats::part_type_set> finish() && { @@ -2168,7 +2164,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key()); } - return f.then([alternator_increased_compatibility, trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr rs) mutable { + return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr rs) mutable { auto& m = mutations[idx]; auto& s = m.schema(); @@ -2186,10 +2182,10 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, if (should_split(m, options)) { tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key()); details.was_split = true; - process_changes_with_splitting(m, trans, preimage, postimage, alternator_increased_compatibility); + process_changes_with_splitting(m, trans, preimage, postimage); } else { tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key()); - process_changes_without_splitting(m, trans, preimage, postimage, alternator_increased_compatibility); + process_changes_without_splitting(m, trans, preimage, postimage); } auto [log_mut, touched_parts] = std::move(trans).finish(); const int generated_count = log_mut.size(); diff --git a/cdc/split.cc b/cdc/split.cc index a441fd5414..66183d9e1b 100644 --- a/cdc/split.cc +++ b/cdc/split.cc @@ -6,26 +6,15 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 */ -#include "bytes.hh" -#include "bytes_fwd.hh" -#include "mutation/atomic_cell.hh" -#include "mutation/atomic_cell_or_collection.hh" -#include "mutation/collection_mutation.hh" #include "mutation/mutation.hh" -#include "mutation/tombstone.hh" #include "schema/schema.hh" -#include #include "types/concrete_types.hh" -#include "types/types.hh" #include "types/user.hh" #include "split.hh" #include "log.hh" #include "change_visitor.hh" -#include 
"utils/managed_bytes.hh" -#include -#include extern logging::logger cdc_log; @@ -610,109 +599,8 @@ bool should_split(const mutation& m, const per_request_options& options) { || v._ts == api::missing_timestamp; } -// Returns true if the row state and the atomic and nonatomic entries represent -// an equivalent item. -static bool entries_match_row_state(const schema_ptr& base_schema, const cell_map& row_state, const std::vector& atomic_entries, - std::vector& nonatomic_entries) { - for (const auto& update : atomic_entries) { - const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id); - const auto it = row_state.find(&cdef); - if (it == row_state.end()) { - return false; - } - if (to_managed_bytes_opt(update.cell.value().linearize()) != it->second) { - return false; - } - } - if (nonatomic_entries.empty()) { - return true; - } - - for (const auto& update : nonatomic_entries) { - const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id); - const auto it = row_state.find(&cdef); - if (it == row_state.end()) { - return false; - } - - // The only collection used by Alternator is a non-frozen map. 
- auto current_raw_map = cdef.type->deserialize(*it->second); - map_type_impl::native_type current_values = value_cast(current_raw_map); - - if (current_values.size() != update.cells.size()) { - return false; - } - - std::unordered_map current_values_map; - for (const auto& entry : current_values) { - const auto attr_name = std::string_view(value_cast(entry.first)); - current_values_map[attr_name] = value_cast(entry.second); - } - - for (const auto& [key, value] : update.cells) { - const auto key_str = to_string_view(key); - if (!value.is_live()) { - if (current_values_map.contains(key_str)) { - return false; - } - } else if (current_values_map[key_str] != value.value().linearize()) { - return false; - } - } - } - return true; -} - -bool should_skip(batch& changes, const mutation& base_mutation, change_processor& processor) { - const schema_ptr& base_schema = base_mutation.schema(); - // Alternator doesn't use static updates and clustered range deletions. - if (!changes.static_updates.empty() || !changes.clustered_range_deletions.empty()) { - return false; - } - - for (clustered_row_insert& u : changes.clustered_inserts) { - const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key); - if (!row_state) { - return false; - } - if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) { - return false; - } - } - - for (clustered_row_update& u : changes.clustered_updates) { - const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key); - if (!row_state) { - return false; - } - if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) { - return false; - } - } - - // Skip only if the row being deleted does not exist (i.e. the deletion is a no-op). - for (const auto& row_deletion : changes.clustered_row_deletions) { - if (processor.clustering_row_states().contains(row_deletion.key)) { - return false; - } - } - - // Don't skip if the item exists. 
- // - // Increased DynamoDB Streams compatibility guarantees that single-item - // operations will read the item and store it in the clustering row states. - // If it is not found there, we may skip CDC. This is safe as long as the - // assumptions of this operation's write isolation are not violated. - if (changes.partition_deletions && processor.clustering_row_states().contains(clustering_key::make_empty())) { - return false; - } - - cdc_log.trace("Skipping CDC log for mutation {}", base_mutation); - return true; -} - void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) { + bool enable_preimage, bool enable_postimage) { const auto base_schema = base_mutation.schema(); auto changes = extract_changes(base_mutation); auto pk = base_mutation.key(); @@ -732,10 +620,6 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces affected_clustered_columns_per_row = btch.get_affected_clustered_columns_per_row(*base_mutation.schema()); } - if (alternator_strict_compatibility && should_skip(btch, base_mutation, processor)) { - continue; - } - const bool is_last = change_ts == last_timestamp; processor.begin_timestamp(change_ts, is_last); if (enable_preimage) { @@ -825,13 +709,7 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces } void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) { - if (alternator_strict_compatibility) { - auto changes = extract_changes(base_mutation); - if (should_skip(changes.begin()->second, base_mutation, processor)) { - return; - } - } + bool enable_preimage, bool enable_postimage) { auto ts = find_timestamp(base_mutation); processor.begin_timestamp(ts, true); diff --git a/cdc/split.hh b/cdc/split.hh index cda25a7aa9..dd57f397b5 100644 
--- a/cdc/split.hh +++ b/cdc/split.hh @@ -66,14 +66,12 @@ public: // Tells processor we have reached end of record - last part // of a given timestamp batch virtual void end_record() = 0; - - virtual const row_states_map& clustering_row_states() const = 0; }; bool should_split(const mutation& base_mutation, const per_request_options& options); void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility); + bool enable_preimage, bool enable_postimage); void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility); + bool enable_preimage, bool enable_postimage); }