alternator: add failing tests for Streams

Add failing tests for Streams functionality.
Trying to remove column from non-existing item is producing
a MODIFY event (while it should none).
Doing batch write with operations working on the same partition,
where one operation is without side effects and second with
will produce events for both operations, even though first changes nothing.

First test has two versions - with and without clustering key.
Second has only with clustering key, as we can't produce
batch write with two items for the same partition -
batch write can't use primary key more than once in single call.
We also add a test for batch write, where one of three operations
has no observable side effects and should not show up in Streams
output, but in current scylla's version it does show.
This commit is contained in:
Radosław Cybulski
2026-02-02 17:40:28 +01:00
parent f0e9177130
commit 2894542e57

View File

@@ -286,6 +286,96 @@ def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
# local java throws here. real does not.
ensure_java_server(dynamodbstreams, error=None)
# We run a batch that mixes noop operations (in our concrete example - one delete of non-existing item) with real changes (put of two new items).
# Expected behaviour - Streams will return only two inserts (no delete). Observed behaviour - we get modify for delete as well.
# Test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail due to how Streams work without the flag.
# Test requires write_isolation set to always, otherwise upper layer will split batch write into separate cdc operations, sidestepping the issue.
# Reproduces SCYLLADB-1528.
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_spurious_modify_mixing_noop_with_real_changes_in_batch_write_item(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams):
null = None
def do_updates(table, p, c):
events = []
with table.batch_writer() as batch:
batch.put_item({'p': p, 'c': c + '0', 'a': 0})
events.append(['INSERT',{'c':c + '0','p':p},null,{'c':c + '0','a':0,'p':p}])
batch.delete_item(Key={'p': p, 'c': c + '1'})
batch.put_item({'p': p, 'c': c + '2', 'a': 2})
events.append(['INSERT',{'c':c + '2','p':p},null,{'c':c + '2','a':2,'p':p}])
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_and_old_images_write_isolation_always, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Running update_item with UpdateExpression set to remove column should not emit a MODIFY event when item does not exist
# The test tries all combinations (delete column from non-existing item, delete existing column from existing item, delete non-existing column from existing item)
# only delete column from non-existing item used to incorrectly emit a MODIFY event
# test requires `alternator_streams_increased_compatibility` set to be true, otherwise will fail
# Reproduces SCYLLADB-1528
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_noop_update_expr_on_missing_item(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
null = None
def do_updates(table, p, c):
events = []
# first we try to remove column from non existing item
table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g')
v = table.get_item(Key={'p': p, 'c': c}).get('Item', None)
assert v is None
# then we try to remove existing column from existing item
table.update_item(Key={'p': p, 'c': c}, UpdateExpression="SET e = :e, g = :g", ExpressionAttributeValues={':e': 166, ':g': 166})
v = table.get_item(Key={'p': p, 'c': c})['Item']
assert v == {'p': p, 'c': c, 'e': 166, 'g': 166}
events.append(['INSERT',{'c':c,'p':p},null,{'c':c,'e':166,'g':166,'p':p}])
table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g')
v = table.get_item(Key={'p': p, 'c': c})['Item']
assert v == {'p': p, 'c': c, 'e': 166}
events.append(['MODIFY',{'c':c,'p':p},{'c':c,'e':166,'g':166,'p':p}, {'c':c,'e':166,'p':p}])
# finally we try again to remove the same column (non existing) from existing item
table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE g')
v = table.get_item(Key={'p': p, 'c': c})['Item']
assert v == {'p': p, 'c': c, 'e': 166}
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# the same as test_streams_noop_update_expr_on_missing_item but for a table without clustering key
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_noop_update_expr_on_missing_item_on_no_clustering_key_table(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
null = None
def do_updates(table, p, c):
events = []
# first we try to remove column from non existing item
table.update_item(Key={'p': p}, UpdateExpression='REMOVE g')
v = table.get_item(Key={'p': p}).get('Item', None)
assert v is None
# then we try to remove existing column from existing item
table.update_item(Key={'p': p}, UpdateExpression='SET e = :e, g = :g', ExpressionAttributeValues={':e': 166, ':g': 166})
v = table.get_item(Key={'p': p})['Item']
assert v == {'p': p, 'e': 166, 'g': 166}
events.append(['INSERT',{'p':p},null,{'e':166,'g':166,'p':p}])
table.update_item(Key={'p': p}, UpdateExpression='REMOVE g')
v = table.get_item(Key={'p': p})['Item']
assert v == {'p': p, 'e': 166}
events.append(['MODIFY',{'p':p},{'e':166,'g':166,'p':p}, {'e':166,'p':p}])
# finally we try again to remove the same column (non existing) from existing item
table.update_item(Key={'p': p}, UpdateExpression='REMOVE g')
v = table.get_item(Key={'p': p})['Item']
assert v == {'p': p, 'e': 166}
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
def test_get_shard_iterator(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
streams = dynamodbstreams.list_streams(TableName=table.name)
@@ -1672,6 +1762,14 @@ def do_updates_1(table, p, c):
# Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}])
# put item with just a key, must be an insert as it doesn't exist
table.put_item(Item={'p': p, 'c': c + '1'})
events.append(['INSERT', {'p': p, 'c': c + '1'}, None, {'p': p, 'c': c + '1'}])
# put item with just a key but it exists, so no event
table.put_item(Item={'p': p, 'c': c + '1'})
# delete non-existing column from item with only key fields - no event
table.update_item(Key={'p': p, 'c': c + '1'}, UpdateExpression='REMOVE g')
return events
# The tested events are the same as in do_updates_1, but the table doesn't have
@@ -1704,36 +1802,52 @@ def do_updates_1_no_ck(table, p, _):
# Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]})
events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}])
# put item with just a key, must be an insert as it doesn't exist
table.put_item(Item={'p': p + '1'})
events.append(['INSERT', {'p': p + '1'}, None, {'p': p + '1'}])
# put item with just a key but it exists, so no event
table.put_item(Item={'p': p + '1'})
# delete non-existing column from item with only key fields - no event
table.update_item(Key={'p': p + '1'}, UpdateExpression='REMOVE g')
return events
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'OLD_IMAGE')
@pytest.mark.xfail(reason="Temporary - fix coming in next commit")
def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES')