diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 881b2f9785..d54ef3202b 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -286,6 +286,96 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams): # local java throws here. real does not. ensure_java_server(dynamodbstreams, error=None) +# We run a batch that mixes no-op operations (in this test, one delete of a non-existing item) with real changes (puts of two new items). +# Expected behaviour: Streams returns only the two INSERT events (nothing for the delete). Observed behaviour: a spurious MODIFY event is also returned for the delete. +# The test requires `alternator_streams_increased_compatibility` to be set to true; otherwise it will fail because of how Streams work without the flag. +# The test requires write_isolation to be set to always; otherwise the upper layer will split the batch write into separate CDC operations, sidestepping the issue. +# Reproduces SCYLLADB-1528. +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") +def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + + with table.batch_writer() as batch: + batch.put_item({'p': p, 'c': c + '0', 'a': 0}) + events.append(['INSERT',{'c':c + '0','p':p},null,{'c':c + '0','a':0,'p':p}]) + batch.delete_item(Key={'p': p, 'c': c + '1'}) + batch.put_item({'p': p, 'c': c + '2', 'a': 2}) + + events.append(['INSERT',{'c':c + '2','p':p},null,{'c':c + '2','a':2,'p':p}]) + return events + + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + +# Running update_item with an UpdateExpression that removes a column should not emit a MODIFY event when the item does not
exist. +# The test tries all combinations (removing a column from a non-existing item, removing an existing column from an existing item, removing a non-existing column from an existing item). +# Only removing a column from a non-existing item used to incorrectly emit a MODIFY event. +# The test requires `alternator_streams_increased_compatibility` to be set to true; otherwise it will fail. +# Reproduces SCYLLADB-1528. +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") +def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + # first we try to remove column from non existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c}).get('Item', None) + assert v is None + + # then we try to remove existing column from existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression="SET e = :e, g = :g", ExpressionAttributeValues={':e': 166, ':g': 166}) + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166, 'g': 166} + events.append(['INSERT',{'c':c,'p':p},null,{'c':c,'e':166,'g':166,'p':p}]) + + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166} + events.append(['MODIFY',{'c':c,'p':p},{'c':c,'e':166,'g':166,'p':p}, {'c':c,'e':166,'p':p}]) + + # finally we try again to remove the same column (non existing) from existing item + table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p, 'c': c})['Item'] + assert v == {'p': p, 'c': c, 'e': 166} + + return events + + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + +# The same as
test_streams_noop_update_expr_on_missing_item but for a table without clustering key +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") +def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): + null = None + def do_updates(table, p, c): + events = [] + # first we try to remove column from non existing item + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p}).get('Item', None) + assert v is None + + # then we try to remove existing column from existing item + table.update_item(Key={'p': p}, UpdateExpression='SET e = :e, g = :g', ExpressionAttributeValues={':e': 166, ':g': 166}) + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166, 'g': 166} + events.append(['INSERT',{'p':p},null,{'e':166,'g':166,'p':p}]) + + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166} + events.append(['MODIFY',{'p':p},{'e':166,'g':166,'p':p}, {'e':166,'p':p}]) + + # finally we try again to remove the same column (non existing) from existing item + table.update_item(Key={'p': p}, UpdateExpression='REMOVE g') + v = table.get_item(Key={'p': p})['Item'] + assert v == {'p': p, 'e': 166} + + return events + with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): + do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + def test_get_shard_iterator(dynamodb, dynamodbstreams): with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: streams = dynamodbstreams.list_streams(TableName=table.name) @@ -1672,6 +1762,14 @@ def do_updates_1(table, p, c): # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event. 
table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]}) events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}]) + + # put item with just a key, must be an insert as it doesn't exist + table.put_item(Item={'p': p, 'c': c + '1'}) + events.append(['INSERT', {'p': p, 'c': c + '1'}, None, {'p': p, 'c': c + '1'}]) + # put item with just a key but it exists, so no event + table.put_item(Item={'p': p, 'c': c + '1'}) + # delete non-existing column from item with only key fields - no event + table.update_item(Key={'p': p, 'c': c + '1'}, UpdateExpression='REMOVE g') return events # The tested events are the same as in do_updates_1, but the table doesn't have @@ -1704,36 +1802,52 @@ def do_updates_1_no_ck(table, p, _): # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event. table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]}) events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}]) + # put item with just a key, must be an insert as it doesn't exist + table.put_item(Item={'p': p + '1'}) + events.append(['INSERT', {'p': p + '1'}, None, {'p': p + '1'}]) + # put item with just a key but it exists, so no event + table.put_item(Item={'p': p + '1'}) + # delete non-existing column from item with only key fields - no event + table.update_item(Key={'p': p + '1'}, UpdateExpression='REMOVE g') + return events +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_image(test_table_ss_new_image, 
dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): 
do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'OLD_IMAGE') +@pytest.mark.xfail(reason="Temporary - fix coming in next commit") def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams): with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)): do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES')