diff --git a/cdc/log.cc b/cdc/log.cc index 955f1f290f..61575ea7d3 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -8,7 +8,7 @@ #include #include - +#include #include #include #include @@ -47,6 +47,7 @@ #include "tracing/trace_state.hh" #include "stats.hh" #include "utils/labels.hh" +#include "alternator/executor.hh" namespace std { @@ -1068,6 +1069,14 @@ public: return create_ck(_batch_no - 1); } + api::timestamp_type get_timestamp() const { + return _ts; + } + + ttl_opt get_ttl() const { + return _ttl; + } + // A common pattern is to allocate a row and then immediately set its `cdc$operation` column. clustering_key allocate_new_log_row(operation op) { auto log_ck = allocate_new_log_row(); @@ -1209,15 +1218,25 @@ struct process_row_visitor { row_states_map& _clustering_row_states; const bool _generate_delta_values = true; + + // true if we are processing changes that were produced by Alternator + const bool _alternator; + // will be set to true, if any kind of change in row will be detected. Used only, when processing Alternator's changes. + bool _alternator_any_value_changed = false; + + // will be set to true, if Alternator's collection column (:attrs) will be modified only by removing elements + // Used only, when processing Alternator's changes. 
+ bool _alternator_only_deletes = false; + process_row_visitor( const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder, bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state, - row_states_map& clustering_row_states, bool generate_delta_values) + row_states_map& clustering_row_states, bool generate_delta_values, bool alternator = false) : _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder), _enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state), _clustering_row_states(clustering_row_states), - _generate_delta_values(generate_delta_values) + _generate_delta_values(generate_delta_values), _alternator(alternator) {} void update_row_state(const column_definition& cdef, managed_bytes_opt value) { @@ -1227,7 +1246,17 @@ struct process_row_visitor { auto [it, _] = _clustering_row_states.try_emplace(*_base_ck); _row_state = &it->second; } - (*_row_state)[&cdef] = std::move(value); + auto [ it, inserted ] = _row_state->insert({ &cdef, std::nullopt }); + + // we ignore `_alternator_any_value_changed` for non-alternator changes. 
+ // we don't filter if `_enable_updating_state` is false, as on top of needing pre image + // we also need cdc to build post image for us + // we add check for `_alternator` here for performance reasons - no point in byte compare objects + // if the return value will be ignored + if (_alternator && _enable_updating_state) { + _alternator_any_value_changed = _alternator_any_value_changed || it->second != value; + } + it->second = std::move(value); } void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) { @@ -1377,6 +1406,8 @@ struct process_row_visitor { auto&& deleted_keys = std::get<1>(result); auto&& added_cells = std::get<2>(result); + _alternator_only_deletes = cdef.name_as_text() == alternator::executor::ATTRS_COLUMN_NAME && !deleted_keys.empty() && !added_cells.has_value(); + // FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob, // then we deserialize again when merging images below managed_bytes_opt deleted_elements = std::nullopt; @@ -1434,12 +1465,31 @@ struct process_change_visitor { const bool _enable_updating_state = false; row_states_map& _clustering_row_states; + + // clustering keys' as bytes of rows that should be ignored, when writing cdc log changes + // filtering will be done in `clean_up_noop_rows` function. Used only, when processing Alternator's changes. + // Since Alternator clustering key is always at most single column, we store unpacked clustering key. + // If Alternator table is without clustering key, that means partition has at most one row, any value present + // in _alternator_clustering_keys_to_ignore will make us ignore that single row - + // we will use an empty bytes object. 
+ std::unordered_set<bytes>& _alternator_clustering_keys_to_ignore; + cell_map& _static_row_state; + const bool _alternator_schema_has_no_clustering_key = false; + const bool _is_update = false; const bool _generate_delta_values = true; + // only called, when processing Alternator's change + void alternator_add_ckey_to_rows_to_ignore(const clustering_key& ckey) { + throwing_assert(_request_options.alternator); + auto res = ckey.explode(); + auto ckey_exploded = !res.empty() ? res[0] : bytes{}; + _alternator_clustering_keys_to_ignore.insert(ckey_exploded); + } + void static_row_cells(auto&& visit_row_cells) { _touched_parts.set(); @@ -1471,16 +1521,29 @@ struct process_change_visitor { } }; + auto row_state = get_row_state(_clustering_row_states, ckey); clustering_row_cells_visitor v( log_ck, _touched_parts, _builder, - _enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey), - _clustering_row_states, _generate_delta_values); + _enable_updating_state, &ckey, row_state, + _clustering_row_states, _generate_delta_values, _request_options.alternator); if (_is_update && _request_options.alternator) { - v._marker_op = row_state ? 
operation::update : operation::insert; } visit_row_cells(v); if (_enable_updating_state) { + if (_request_options.alternator && !v._alternator_any_value_changed) { + // we need additional checks here: + // - without `row_state != nullptr` inserting new key without additional fields (so only partition / clustering key) would be + // treated as no-change, because without additional fields given by the user `v` visitor won't visit any cells + // and _alternator_any_value_changed will be false (thus item will be skipped), + // - without `row_state == nullptr && v._alternator_only_deletes` check we won't properly ignore + // column deletes for existing items, but without the column we want to delete - + // item exists (so row_state != nullptr), but we delete non-existing column, so no-op + if (row_state != nullptr || (row_state == nullptr && v._alternator_only_deletes)) { + alternator_add_ckey_to_rows_to_ignore(ckey); + } + } // #7716: if there are no regular columns, our visitor would not have visited any cells, // hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced. // Ensure that the row state exists. 
@@ -1497,8 +1560,12 @@ struct process_change_visitor { auto log_ck = _builder.allocate_new_log_row(_row_delete_op); _builder.set_clustering_columns(log_ck, ckey); - if (_enable_updating_state && get_row_state(_clustering_row_states, ckey)) { - _clustering_row_states.erase(ckey); + if (_enable_updating_state) { + if (get_row_state(_clustering_row_states, ckey)) { + _clustering_row_states.erase(ckey); + } else if (_request_options.alternator) { + alternator_add_ckey_to_rows_to_ignore(ckey); + } } } @@ -1540,6 +1607,22 @@ struct process_change_visitor { _touched_parts.set(); auto log_ck = _builder.allocate_new_log_row(_partition_delete_op); if (_enable_updating_state) { + if (_request_options.alternator && _alternator_schema_has_no_clustering_key && _clustering_row_states.empty()) { + // Alternator's table can be with or without clustering key. If the clustering key exists, + // delete request will be `clustered_row_delete` and will be handled there. + // If the clustering key doesn't exist, delete request will be `partition_delete` and will be handled here. + // The no-clustering-key case is slightly tricky, because insert of such item is handled by `clustered_row_cells` + // and has some value as clustering_key (the value currently seems to be empty bytes object). + // We don't want to rely on knowing the value exactly, instead we rely on the fact that + // there will be at most one item in a partition. So if `_clustering_row_states` is empty, + // we know the delete is for a non-existing item and we should ignore it. + // If `_clustering_row_states` is not empty, then we know the delete is for an existing item + // and we should log it and clear `_clustering_row_states`. + // The same logic applies to `alternator_add_ckey_to_rows_to_ignore` call in `clustered_row_delete` - + // we need to insert "anything" for no-clustering-key case, so further logic will check + // if map is empty or not and will know if it should ignore the single partition item and keep it. 
+ alternator_add_ckey_to_rows_to_ignore({}); + } _clustering_row_states.clear(); } } @@ -1647,6 +1730,47 @@ private: stats::part_type_set _touched_parts; + std::unordered_set<bytes> _alternator_clustering_keys_to_ignore; + const column_definition* _alternator_clustering_key_column = nullptr; + + // the function will process mutations and remove rows that are in _alternator_clustering_keys_to_ignore + // we need to take care and reindex clustering keys (cdc$batch_seq_no) + // this is used for Alternator's changes only + // NOTE: `_alternator_clustering_keys_to_ignore` must not be empty. + mutation clean_up_noop_rows(mutation mut) { + throwing_assert(!_alternator_clustering_keys_to_ignore.empty()); + auto after_mut = mutation(_log_schema, mut.key()); + if (!_alternator_clustering_key_column) { + // no clustering key - only single row per partition + // since _alternator_clustering_keys_to_ignore is not empty we need to drop that single row + // so we just return empty mutation instead + return after_mut; + } + int batch_seq = 0; + for (rows_entry &row : mut.partition().mutable_non_dummy_rows()) { + auto cell = row.row().cells().find_cell(_alternator_clustering_key_column->id); + if (cell) { + auto val = cell->as_atomic_cell(*_alternator_clustering_key_column).value().linearize(); + + if (_alternator_clustering_keys_to_ignore.contains(val)) { + continue; + } + } + auto new_key = _builder->create_ck(batch_seq++); + after_mut.partition().clustered_row(*_log_schema, std::move(new_key)) = std::move(row.row()); + } + + if (batch_seq > 0) { + // update end_of_batch marker + // we don't need to clear previous one, as we only removed rows + // we need to set it on the last row, because original last row might have been deleted + // batch_seq == 0 -> no rows, after_mut is empty, all entries were dropped and there's nothing to write to cdc log + auto last_key = _builder->create_ck(batch_seq - 1); + after_mut.set_cell(last_key, log_meta_column_name_bytes("end_of_batch"), 
data_value(true), _builder->get_timestamp(), _builder->get_ttl()); + } + + return after_mut; + } public: transformer(db_context ctx, schema_ptr s, dht::decorated_key dk, const per_request_options& options) : _ctx(ctx) @@ -1656,7 +1780,20 @@ public: , _options(options) , _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema)) , _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets()) + , _alternator_clustering_keys_to_ignore() { + if (_options.alternator) { + auto cks = _schema->clustering_key_columns(); + const column_definition *ck_def = nullptr; + if (!cks.empty()) { + auto it = _log_schema->columns_by_name().find(cks.front().name()); + if (it == _log_schema->columns_by_name().end()) { + on_internal_error(cdc_log, fmt::format("failed to find clustering key `{}` in cdc log table `{}`", cks.front().name(), _log_schema->id())); + } + ck_def = it->second; + } + _alternator_clustering_key_column = ck_def; + } } // DON'T move the transformer after this @@ -1664,7 +1801,10 @@ public: const auto stream_id = _uses_tablets ? _ctx._cdc_metadata.get_tablet_stream(_log_schema->id(), ts, _dk.token()) : _ctx._cdc_metadata.get_vnode_stream(ts, _dk.token()); _result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema)); _builder.emplace(_result_mutations.back(), ts, _dk.key(), *_schema); - _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()); + // alternator_streams_increased_compatibility set to true reads preimage, but we need to set + // _enable_updating_state to true to keep track of changes and produce correct pre/post images even + // if upper layer didn't request them explicitly. 
+ _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()) || (_options.alternator && _options.alternator_streams_increased_compatibility); } void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override { @@ -1761,7 +1901,9 @@ public: ._builder = *_builder, ._enable_updating_state = _enable_updating_state, ._clustering_row_states = _clustering_row_states, + ._alternator_clustering_keys_to_ignore = _alternator_clustering_keys_to_ignore, ._static_row_state = _static_row_state, + ._alternator_schema_has_no_clustering_key = (_alternator_clustering_key_column == nullptr), ._is_update = _is_update, ._generate_delta_values = generate_delta_values(_builder->base_schema()) }; @@ -1771,6 +1913,19 @@ public: void end_record() override { SCYLLA_ASSERT(_builder); _builder->end_record(); + + if (_options.alternator && !_alternator_clustering_keys_to_ignore.empty()) { + // we filter mutations for Alternator's changes here. + // We do it per mutation object (user might submit a batch of those in one go + // and some might be split because of different timestamps), + // ignore key set is cleared afterwards. + // If single mutation object contains two separate changes to the same row + // and at least one of them is ignored, all of them will be ignored. + // This is not possible in Alternator - Alternator spec forbids reusing + // primary key in single batch. + _result_mutations.back() = clean_up_noop_rows(std::move(_result_mutations.back())); + _alternator_clustering_keys_to_ignore.clear(); + } } const row_states_map& clustering_row_states() const override { diff --git a/docs/alternator/compatibility.md b/docs/alternator/compatibility.md index 5b2198f3e0..dc0e3f79da 100644 --- a/docs/alternator/compatibility.md +++ b/docs/alternator/compatibility.md @@ -325,6 +325,13 @@ experimental: in July 2025 to optimize shard discovery, is not yet implemented in Alternator. 
+ * With the ``alternator_streams_increased_compatibility`` configuration + option enabled, operations that do not change the database state + (e.g., deleting a non-existent item, removing a non-existent + attribute, or re-inserting an identical item) will not produce + stream events. Without this option, such no-op operations may still + generate spurious stream events. + ## Unimplemented API features diff --git a/mutation/mutation_partition.hh b/mutation/mutation_partition.hh index 7cf6278677..7c000759e0 100644 --- a/mutation/mutation_partition.hh +++ b/mutation/mutation_partition.hh @@ -1486,6 +1486,10 @@ public: return std::ranges::subrange(_rows.begin(), _rows.end()) | std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); }); } + auto mutable_non_dummy_rows() { + return std::ranges::subrange(_rows.begin(), _rows.end()) + | std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); }); + } void accept(const schema&, mutation_partition_visitor&) const; // Returns the number of live CQL rows in this partition. diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index d54ef3202b..e6f833c269 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -291,7 +291,6 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams): # Test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail due to how Streams work without the flag. # Test requires write_isolation set to always, otherwise upper layer will split batch write into separate cdc operations, sidestepping the issue. # Reproduces SCYLLADB-1528. 
-@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams): null = None def do_updates(table, p, c): @@ -314,7 +313,6 @@ def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_it # only delete column from non-existing item used to incorrectly emit a MODIFY event # test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail # Reproduces SCYLLADB-1528 -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): null = None def do_updates(table, p, c): @@ -346,7 +344,6 @@ def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_imag do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') # the same as test_streams_noop_update_expr_on_missing_item but for a table without clustering key -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): null = None def do_updates(table, p, c): @@ -1812,42 +1809,34 @@ def do_updates_1_no_ck(table, p, _): return events -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 
'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, 
do_updates_1_no_ck, 'OLD_IMAGE') -@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES')