diff --git a/cdc/log.cc b/cdc/log.cc index 89fbcabe6c..c5067f5d93 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -8,7 +8,7 @@ #include #include - +#include #include #include #include @@ -47,6 +47,7 @@ #include "tracing/trace_state.hh" #include "stats.hh" #include "utils/labels.hh" +#include "alternator/executor.hh" namespace std { @@ -1068,6 +1069,14 @@ public: return create_ck(_batch_no - 1); } + api::timestamp_type get_timestamp() const { + return _ts; + } + + ttl_opt get_ttl() const { + return _ttl; + } + // A common pattern is to allocate a row and then immediately set its `cdc$operation` column. clustering_key allocate_new_log_row(operation op) { auto log_ck = allocate_new_log_row(); @@ -1209,15 +1218,25 @@ struct process_row_visitor { row_states_map& _clustering_row_states; const bool _generate_delta_values = true; + + // true if we are processing changes that were produced by Alternator + const bool _alternator; + // will be set to true, if any kind of change in row will be detected. Used only, when processing Alternator's changes. + bool _alternator_any_value_changed = false; + + // will be set to true, if Alternator's collection column (:attrs) will be modified only by removing elements + // Used only, when processing Alternator's changes. 
+ bool _alternator_only_deletes = false; + process_row_visitor( const clustering_key& log_ck, stats::part_type_set& touched_parts, log_mutation_builder& builder, bool enable_updating_state, const clustering_key* base_ck, cell_map* row_state, - row_states_map& clustering_row_states, bool generate_delta_values) + row_states_map& clustering_row_states, bool generate_delta_values, bool alternator = false) : _log_ck(log_ck), _touched_parts(touched_parts), _builder(builder), _enable_updating_state(enable_updating_state), _base_ck(base_ck), _row_state(row_state), _clustering_row_states(clustering_row_states), - _generate_delta_values(generate_delta_values) + _generate_delta_values(generate_delta_values), _alternator(alternator) {} void update_row_state(const column_definition& cdef, managed_bytes_opt value) { @@ -1227,7 +1246,17 @@ struct process_row_visitor { auto [it, _] = _clustering_row_states.try_emplace(*_base_ck); _row_state = &it->second; } - (*_row_state)[&cdef] = std::move(value); + auto [ it, inserted ] = _row_state->insert({ &cdef, std::nullopt }); + + // we ignore `_alternator_any_value_changed` for non-alternator changes. 
+ // we don't filter if `_enable_updating_state` is false, as on top of needing pre image + // we also need cdc to build post image for us + // we add check for `_alternator` here for performance reasons - no point in byte compare objects + // if the return value will be ignored + if (_alternator && _enable_updating_state) { + _alternator_any_value_changed = _alternator_any_value_changed || it->second != value; + } + it->second = std::move(value); } void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) { @@ -1377,6 +1406,8 @@ struct process_row_visitor { auto&& deleted_keys = std::get<1>(result); auto&& added_cells = std::get<2>(result); + _alternator_only_deletes = cdef.name_as_text() == alternator::executor::ATTRS_COLUMN_NAME && !deleted_keys.empty() && !added_cells.has_value(); + // FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob, // then we deserialize again when merging images below managed_bytes_opt deleted_elements = std::nullopt; @@ -1434,12 +1465,31 @@ struct process_change_visitor { const bool _enable_updating_state = false; row_states_map& _clustering_row_states; + + // clustering keys' as bytes of rows that should be ignored, when writing cdc log changes + // filtering will be done in `clean_up_noop_rows` function. Used only, when processing Alternator's changes. + // Since Alternator clustering key is always at most single column, we store unpacked clustering key. + // If Alternator table is without clustering key, that means partition has at most one row, any value present + // in _alternator_clustering_keys_to_ignore will make us ignore that single row - + // we will use an empty bytes object. 
+ std::unordered_set& _alternator_clustering_keys_to_ignore; + cell_map& _static_row_state; + const bool _alternator_schema_has_no_clustering_key = false; + const bool _is_update = false; const bool _generate_delta_values = true; + // only called, when processing Alternator's change + void alternator_add_ckey_to_rows_to_ignore(const clustering_key& ckey) { + throwing_assert(_request_options.alternator); + auto res = ckey.explode(); + auto ckey_exploded = !res.empty() ? res[0] : bytes{}; + _alternator_clustering_keys_to_ignore.insert(ckey_exploded); + } + void static_row_cells(auto&& visit_row_cells) { _touched_parts.set(); @@ -1471,16 +1521,29 @@ struct process_change_visitor { } }; + auto row_state = get_row_state(_clustering_row_states, ckey); clustering_row_cells_visitor v( log_ck, _touched_parts, _builder, - _enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey), - _clustering_row_states, _generate_delta_values); + _enable_updating_state, &ckey, row_state, + _clustering_row_states, _generate_delta_values, _request_options.alternator); if (_is_update && _request_options.alternator) { - v._marker_op = operation::update; + v._marker_op = row_state ? 
operation::update : operation::insert; } visit_row_cells(v); if (_enable_updating_state) { + if (_request_options.alternator && !v._alternator_any_value_changed) { + // we need additional checks here: + // - without `row_state != nullptr` inserting new key without additional fields (so only partition / clustering key) would be + // treated as no-change, because without additional fields given by the user `v` visitor won't visit any cells + // and _alternator_any_value_changed will be false (thus item will be skipped), + // - without `row_state == nullptr && v._alternator_only_deletes` check we won't properly ignore + // column deletes for existing items, but without the column we want to delete - + // item exists (so row_state != nullptr), but we delete non-existing column, so no-op + if (row_state != nullptr || (row_state == nullptr && v._alternator_only_deletes)) { + alternator_add_ckey_to_rows_to_ignore(ckey); + } + } // #7716: if there are no regular columns, our visitor would not have visited any cells, // hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced. // Ensure that the row state exists. 
@@ -1497,8 +1560,12 @@ struct process_change_visitor { auto log_ck = _builder.allocate_new_log_row(_row_delete_op); _builder.set_clustering_columns(log_ck, ckey); - if (_enable_updating_state && get_row_state(_clustering_row_states, ckey)) { - _clustering_row_states.erase(ckey); + if (_enable_updating_state) { + if (get_row_state(_clustering_row_states, ckey)) { + _clustering_row_states.erase(ckey); + } else if (_request_options.alternator) { + alternator_add_ckey_to_rows_to_ignore(ckey); + } } } @@ -1540,6 +1607,22 @@ struct process_change_visitor { _touched_parts.set(); auto log_ck = _builder.allocate_new_log_row(_partition_delete_op); if (_enable_updating_state) { + if (_request_options.alternator && _alternator_schema_has_no_clustering_key && _clustering_row_states.empty()) { + // Alternator's table can be with or without clustering key. If the clustering key exists, + // delete request will be `clustered_row_delete` and will be handled there. + // If the clustering key doesn't exist, delete request will be `partition_delete` and will be handled here. + // The no-clustering-key case is slightly tricky, because insert of such item is handled by `clustered_row_cells` + // and has some value as clustering_key (the value currently seems to be empty bytes object). + // We don't want to rely on knowing the value exactly, instead we rely on the fact that + // there will be at most one item in a partition. So if `_clustering_row_states` is empty, + // we know the delete is for a non-existing item and we should ignore it. + // If `_clustering_row_states` is not empty, then we know the delete is for an existing item + // we should log it and clear `_clustering_row_states`. + // The same logic applies to `alternator_add_ckey_to_rows_to_ignore` call in `clustered_row_delete` + // we need to insert "anything" for no-clustering-key case, so further logic will check + // if map is empty or not and will know if it should ignore the single partition item and keep it. 
+ alternator_add_ckey_to_rows_to_ignore({}); + } _clustering_row_states.clear(); } } @@ -1647,6 +1730,47 @@ private: stats::part_type_set _touched_parts; + std::unordered_set _alternator_clustering_keys_to_ignore; + const column_definition* _alternator_clustering_key_column = nullptr; + + // the function will process mutations and remove rows that are in _alternator_clustering_keys_to_ignore + // we need to take care and reindex clustering keys (cdc$batch_seq_no) + // this is used for Alternator's changes only + // NOTE: `_alternator_clustering_keys_to_ignore` must be not empty. + mutation clean_up_noop_rows(mutation mut) { + throwing_assert(!_alternator_clustering_keys_to_ignore.empty()); + auto after_mut = mutation(_log_schema, mut.key()); + if (!_alternator_clustering_key_column) { + // no clustering key - only single row per partition + // since _alternator_clustering_keys_to_ignore is not empty we need to drop that single row + // so we just return empty mutation instead + return after_mut; + } + int batch_seq = 0; + for (rows_entry &row : mut.partition().mutable_non_dummy_rows()) { + auto cell = row.row().cells().find_cell(_alternator_clustering_key_column->id); + if (cell) { + auto val = cell->as_atomic_cell(*_alternator_clustering_key_column).value().linearize(); + + if (_alternator_clustering_keys_to_ignore.contains(val)) { + continue; + } + } + auto new_key = _builder->create_ck(batch_seq++); + after_mut.partition().clustered_row(*_log_schema, std::move(new_key)) = std::move(row.row()); + } + + if (batch_seq > 0) { + // update end_of_batch marker + // we don't need to clear previous one, as we only removed rows + // we need to set it on the last row, because original last row might have been deleted + // batch_seq == 0 -> no rows, after_mut is empty, all entries were dropped and there's nothing to write to cdc log + auto last_key = _builder->create_ck(batch_seq - 1); + after_mut.set_cell(last_key, log_meta_column_name_bytes("end_of_batch"), 
data_value(true), _builder->get_timestamp(), _builder->get_ttl()); + } + + return after_mut; + } public: transformer(db_context ctx, schema_ptr s, dht::decorated_key dk, const per_request_options& options) : _ctx(ctx) @@ -1656,7 +1780,20 @@ public: , _options(options) , _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema)) , _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets()) + , _alternator_clustering_keys_to_ignore() { + if (_options.alternator) { + auto cks = _schema->clustering_key_columns(); + const column_definition *ck_def = nullptr; + if (!cks.empty()) { + auto it = _log_schema->columns_by_name().find(cks.front().name()); + if (it == _log_schema->columns_by_name().end()) { + on_internal_error(cdc_log, fmt::format("failed to find clustering key `{}` in cdc log table `{}`", cks.front().name(), _log_schema->id())); + } + ck_def = it->second; + } + _alternator_clustering_key_column = ck_def; + } } // DON'T move the transformer after this @@ -1664,7 +1801,10 @@ public: const auto stream_id = _uses_tablets ? _ctx._cdc_metadata.get_tablet_stream(_log_schema->id(), ts, _dk.token()) : _ctx._cdc_metadata.get_vnode_stream(ts, _dk.token()); _result_mutations.emplace_back(_log_schema, stream_id.to_partition_key(*_log_schema)); _builder.emplace(_result_mutations.back(), ts, _dk.key(), *_schema); - _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()); + // alternator_streams_increased_compatibility set to true reads preimage, but we need to set + // _enable_updating_state to true to keep track of changes and produce correct pre/post images even + // if upper layer didn't request them explicitly. 
+ _enable_updating_state = _schema->cdc_options().postimage() || (!is_last && _schema->cdc_options().preimage()) || (_options.alternator && _options.alternator_streams_increased_compatibility); } void produce_preimage(const clustering_key* ck, const one_kind_column_set& columns_to_include) override { @@ -1761,7 +1901,9 @@ public: ._builder = *_builder, ._enable_updating_state = _enable_updating_state, ._clustering_row_states = _clustering_row_states, + ._alternator_clustering_keys_to_ignore = _alternator_clustering_keys_to_ignore, ._static_row_state = _static_row_state, + ._alternator_schema_has_no_clustering_key = (_alternator_clustering_key_column == nullptr), ._is_update = _is_update, ._generate_delta_values = generate_delta_values(_builder->base_schema()) }; @@ -1771,10 +1913,19 @@ public: void end_record() override { SCYLLA_ASSERT(_builder); _builder->end_record(); - } - const row_states_map& clustering_row_states() const override { - return _clustering_row_states; + if (_options.alternator && !_alternator_clustering_keys_to_ignore.empty()) { + // we filter mutations for Alternator's changes here. + // We do it per mutation object (user might submit a batch of those in one go + // and some might be split because of different timestamps), + // ignore key set is cleared afterwards. + // If single mutation object contains two separate changes to the same row + // and at least one of them is ignored, all of them will be ignored. + // This is not possible in Alternator - Alternator spec forbids reusing + // primary key in single batch. + _result_mutations.back() = clean_up_noop_rows(std::move(_result_mutations.back())); + _alternator_clustering_keys_to_ignore.clear(); + } } // Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime. 
@@ -2013,7 +2164,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key()); } - return f.then([alternator_increased_compatibility, trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr rs) mutable { + return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr rs) mutable { auto& m = mutations[idx]; auto& s = m.schema(); @@ -2031,10 +2182,10 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout, if (should_split(m, options)) { tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key()); details.was_split = true; - process_changes_with_splitting(m, trans, preimage, postimage, alternator_increased_compatibility); + process_changes_with_splitting(m, trans, preimage, postimage); } else { tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key()); - process_changes_without_splitting(m, trans, preimage, postimage, alternator_increased_compatibility); + process_changes_without_splitting(m, trans, preimage, postimage); } auto [log_mut, touched_parts] = std::move(trans).finish(); const int generated_count = log_mut.size(); diff --git a/cdc/split.cc b/cdc/split.cc index a441fd5414..66183d9e1b 100644 --- a/cdc/split.cc +++ b/cdc/split.cc @@ -6,26 +6,15 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 */ -#include "bytes.hh" -#include "bytes_fwd.hh" -#include "mutation/atomic_cell.hh" -#include "mutation/atomic_cell_or_collection.hh" -#include "mutation/collection_mutation.hh" #include "mutation/mutation.hh" -#include "mutation/tombstone.hh" #include "schema/schema.hh" -#include #include "types/concrete_types.hh" -#include "types/types.hh" #include "types/user.hh" #include "split.hh" #include "log.hh" #include "change_visitor.hh" -#include "utils/managed_bytes.hh" -#include -#include 
extern logging::logger cdc_log; @@ -610,109 +599,8 @@ bool should_split(const mutation& m, const per_request_options& options) { || v._ts == api::missing_timestamp; } -// Returns true if the row state and the atomic and nonatomic entries represent -// an equivalent item. -static bool entries_match_row_state(const schema_ptr& base_schema, const cell_map& row_state, const std::vector& atomic_entries, - std::vector& nonatomic_entries) { - for (const auto& update : atomic_entries) { - const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id); - const auto it = row_state.find(&cdef); - if (it == row_state.end()) { - return false; - } - if (to_managed_bytes_opt(update.cell.value().linearize()) != it->second) { - return false; - } - } - if (nonatomic_entries.empty()) { - return true; - } - - for (const auto& update : nonatomic_entries) { - const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id); - const auto it = row_state.find(&cdef); - if (it == row_state.end()) { - return false; - } - - // The only collection used by Alternator is a non-frozen map. 
- auto current_raw_map = cdef.type->deserialize(*it->second); - map_type_impl::native_type current_values = value_cast(current_raw_map); - - if (current_values.size() != update.cells.size()) { - return false; - } - - std::unordered_map current_values_map; - for (const auto& entry : current_values) { - const auto attr_name = std::string_view(value_cast(entry.first)); - current_values_map[attr_name] = value_cast(entry.second); - } - - for (const auto& [key, value] : update.cells) { - const auto key_str = to_string_view(key); - if (!value.is_live()) { - if (current_values_map.contains(key_str)) { - return false; - } - } else if (current_values_map[key_str] != value.value().linearize()) { - return false; - } - } - } - return true; -} - -bool should_skip(batch& changes, const mutation& base_mutation, change_processor& processor) { - const schema_ptr& base_schema = base_mutation.schema(); - // Alternator doesn't use static updates and clustered range deletions. - if (!changes.static_updates.empty() || !changes.clustered_range_deletions.empty()) { - return false; - } - - for (clustered_row_insert& u : changes.clustered_inserts) { - const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key); - if (!row_state) { - return false; - } - if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) { - return false; - } - } - - for (clustered_row_update& u : changes.clustered_updates) { - const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key); - if (!row_state) { - return false; - } - if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) { - return false; - } - } - - // Skip only if the row being deleted does not exist (i.e. the deletion is a no-op). - for (const auto& row_deletion : changes.clustered_row_deletions) { - if (processor.clustering_row_states().contains(row_deletion.key)) { - return false; - } - } - - // Don't skip if the item exists. 
- // - // Increased DynamoDB Streams compatibility guarantees that single-item - // operations will read the item and store it in the clustering row states. - // If it is not found there, we may skip CDC. This is safe as long as the - // assumptions of this operation's write isolation are not violated. - if (changes.partition_deletions && processor.clustering_row_states().contains(clustering_key::make_empty())) { - return false; - } - - cdc_log.trace("Skipping CDC log for mutation {}", base_mutation); - return true; -} - void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) { + bool enable_preimage, bool enable_postimage) { const auto base_schema = base_mutation.schema(); auto changes = extract_changes(base_mutation); auto pk = base_mutation.key(); @@ -732,10 +620,6 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces affected_clustered_columns_per_row = btch.get_affected_clustered_columns_per_row(*base_mutation.schema()); } - if (alternator_strict_compatibility && should_skip(btch, base_mutation, processor)) { - continue; - } - const bool is_last = change_ts == last_timestamp; processor.begin_timestamp(change_ts, is_last); if (enable_preimage) { @@ -825,13 +709,7 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces } void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) { - if (alternator_strict_compatibility) { - auto changes = extract_changes(base_mutation); - if (should_skip(changes.begin()->second, base_mutation, processor)) { - return; - } - } + bool enable_preimage, bool enable_postimage) { auto ts = find_timestamp(base_mutation); processor.begin_timestamp(ts, true); diff --git a/cdc/split.hh b/cdc/split.hh index cda25a7aa9..dd57f397b5 100644 
--- a/cdc/split.hh +++ b/cdc/split.hh @@ -66,14 +66,12 @@ public: // Tells processor we have reached end of record - last part // of a given timestamp batch virtual void end_record() = 0; - - virtual const row_states_map& clustering_row_states() const = 0; }; bool should_split(const mutation& base_mutation, const per_request_options& options); void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility); + bool enable_preimage, bool enable_postimage); void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor, - bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility); + bool enable_preimage, bool enable_postimage); } diff --git a/docs/alternator/compatibility.md b/docs/alternator/compatibility.md index 311245e0c0..0ff7fc5e9a 100644 --- a/docs/alternator/compatibility.md +++ b/docs/alternator/compatibility.md @@ -317,6 +317,13 @@ experimental: in July 2025 to optimize shard discovery, is not yet implemented in Alternator. + * With the ``alternator_streams_increased_compatibility`` configuration + option enabled, operations that do not change the database state + (e.g., deleting a non-existent item, removing a non-existent + attribute, or re-inserting an identical item) will not produce + stream events. Without this option, such no-op operations may still + generate spurious stream events. 
+ ## Unimplemented API features diff --git a/mutation/mutation_partition.hh b/mutation/mutation_partition.hh index 7cf6278677..7c000759e0 100644 --- a/mutation/mutation_partition.hh +++ b/mutation/mutation_partition.hh @@ -1486,6 +1486,10 @@ public: return std::ranges::subrange(_rows.begin(), _rows.end()) | std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); }); } + auto mutable_non_dummy_rows() { + return std::ranges::subrange(_rows.begin(), _rows.end()) + | std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); }); + } void accept(const schema&, mutation_partition_visitor&) const; // Returns the number of live CQL rows in this partition. diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 60522c2752..bfaf675bda 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -305,6 +305,93 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams): # local java throws here. real does not. ensure_java_server(dynamodbstreams, error=None) +# We run a batch that mixes noop operations (in our concrete example - one delete of non-existing item) with real changes (put of two new items). +# Expected behaviour - Streams will return only two inserts (no delete). Observed behaviour - we get modify for delete as well. +# Test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail due to how Streams work without the flag. +# Test requires write_isolation set to always, otherwise upper layer will split batch write into separate cdc operations, sidestepping the issue. +# Reproduces SCYLLADB-1528. 
+def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + + with table.batch_writer() as batch: + batch.put_item({'p': p, 'c': c + '0', 'a': 0}) + events.append(['INSERT',{'c':c + '0','p':p},null,{'c':c + '0','a':0,'p':p}]) + batch.delete_item(Key={'p': p, 'c': c + '1'}) + batch.put_item({'p': p, 'c': c + '2', 'a': 2}) + + events.append(['INSERT',{'c':c + '2','p':p},null,{'c':c + '2','a':2,'p':p}]) + return events + + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + +# Running update_item with UpdateExpression set to remove column should not emit a MODIFY event when item does not exist +# The test tries all combinations (delete column from non-existing item, delete existing column from existing item, delete non-existing column from existing item) +# only delete column from non-existing item used to incorrectly emit a MODIFY event +# test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail +# Reproduces SCYLLADB-1528 +def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + # first we try to remove column from non existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c}).get('Item', None) + assert v is None + + # then we try to remove existing column from existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression="SET e = :e, g = :g", ExpressionAttributeValues={':e': 166, ':g': 166}) + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 
166, 'g': 166} + events.append(['INSERT',{'c':c,'p':p},null,{'c':c,'e':166,'g':166,'p':p}]) + + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166} + events.append(['MODIFY',{'c':c,'p':p},{'c':c,'e':166,'g':166,'p':p}, {'c':c,'e':166,'p':p}]) + + # finally we try again to remove the same column (non existing) from existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166} + + return events + + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + +# the same as test_streams_noop_update_expr_on_missing_item but for a table without clustering key +def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + # first we try to remove column from non existing item + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p}).get('Item', None) + assert v is None + + # then we try to remove existing column from existing item + table.update_item(Key={'p': p}, UpdateExpression='SET e = :e, g = :g', ExpressionAttributeValues={':e': 166, ':g': 166}) + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166, 'g': 166} + events.append(['INSERT',{'p':p},null,{'e':166,'g':166,'p':p}]) + + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166} + events.append(['MODIFY',{'p':p},{'e':166,'g':166,'p':p}, {'e':166,'p':p}]) + + # finally we try again to remove the same column (non existing) from existing item + 
table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166} + + return events + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + def test_get_shard_iterator(dynamodb, dynamodbstreams): with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: streams = dynamodbstreams.list_streams(TableName=table.name) @@ -1691,6 +1778,14 @@ def do_updates_1(table, p, c): # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event. table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]}) events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}]) + + # put item with just a key, must be an insert as it doesn't exist + table.put_item(Item={'p': p, 'c': c + '1'}) + events.append(['INSERT', {'p': p, 'c': c + '1'}, None, {'p': p, 'c': c + '1'}]) + # put item with just a key but it exists, so no event + table.put_item(Item={'p': p, 'c': c + '1'}) + # delete non-existing column from item with only key fields - no event + table.update_item(Key={'p': p, 'c': c + '1'}, UpdateExpression='REMOVE g') return events # The tested events are the same as in do_updates_1, but the table doesn't have @@ -1723,6 +1818,14 @@ def do_updates_1_no_ck(table, p, _): # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event. 
table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]}) events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}]) + # put item with just a key, must be an insert as it doesn't exist + table.put_item(Item={'p': p + '1'}) + events.append(['INSERT', {'p': p + '1'}, None, {'p': p + '1'}]) + # put item with just a key but it exists, so no event + table.put_item(Item={'p': p + '1'}) + # delete non-existing column from item with only key fields - no event + table.update_item(Key={'p': p + '1'}, UpdateExpression='REMOVE g') + return events def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):