diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 32c7f14d56..3f4526bf15 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -482,6 +482,7 @@ def test_get_records_nonexistent_iterator(dynamodbstreams): # and if in the future we can work around the DynamoDB problem, we can return # these fixtures to module scope. +@contextmanager def create_table_ss(dynamodb, dynamodbstreams, type): table = create_test_table(dynamodb, Tags=TAGS, @@ -529,19 +530,23 @@ def test_table_sss_new_and_old_images_lsi(dynamodb, dynamodbstreams): @pytest.fixture(scope="function") def test_table_ss_keys_only(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') + with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as stream: + yield stream @pytest.fixture(scope="function") def test_table_ss_new_image(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') + with create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') as stream: + yield stream @pytest.fixture(scope="function") def test_table_ss_old_image(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') + with create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') as stream: + yield stream @pytest.fixture(scope="function") def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') + with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream: + yield stream @pytest.fixture(scope="function") def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams): @@ -2011,6 +2016,33 @@ def test_stream_table_name_length_192_update(dynamodb, dynamodbstreams): # is in the process of being added wait_for_active_stream(dynamodbstreams, table) +# In earlier tests, we tested the stream events logged for BatchWriteItem, +# but it was usually a single item in the batch or in do_batch_test(), +# it was multiple items in different partitions. This test checks the +# remaining case, of a batch writing multiple items in one partition - +# and checks that the correct events appear for them on the stream. +# Turns out we had a bug (#28439) in this case, but *only* in always_use_lwt +# write isolation mode, which writes all the items in the batch with the +# same timestamp. The test is parameterized to try all write isolation +# modes, and reproduces #28439 when it failed only in always_use_lwt mode. +# This is a Scylla-only test because it checks write isolation modes, which +# don't exist in DynamoDB. +@pytest.mark.parametrize('mode', ['only_rmw_uses_lwt', pytest.param('always_use_lwt', marks=pytest.mark.xfail(reason='#28439')), 'unsafe_rmw', 'forbid_rmw']) +def test_streams_multiple_items_one_partition(dynamodb, dynamodbstreams, scylla_only, mode): + with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream: + table, stream_arn = stream + # Set write isolation mode on the table to the chosen "mode": + table_arn = table.meta.client.describe_table(TableName=table.name)['Table']['TableArn'] + table.meta.client.tag_resource(ResourceArn=table_arn, Tags=[{'Key': 'system:write_isolation', 'Value': mode}]) + # Now try the test, a single BatchWriteItem writing three different + # items in the same partition p: + def do_updates(table, p, c): + cs = [c + '1', c + '2', c + '3'] + table.meta.client.batch_write_item(RequestItems = { + table.name: [{'PutRequest': {'Item': {'p': p, 'c': cc, 'x': cc}}} for cc in cs]}) + return [['INSERT', {'p': p, 'c': cc}, None, {'p': p, 'c': cc, 'x': cc}] for cc in cs] + do_test(stream, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + # TODO: tests on multiple partitions # TODO: write a test that disabling the stream and re-enabling it works, but # requires the user to wait for the first stream to become DISABLED before diff --git a/test/alternator/test_write_isolation.py b/test/alternator/test_write_isolation.py index 7deeec8342..bea702cfe2 100644 --- a/test/alternator/test_write_isolation.py +++ b/test/alternator/test_write_isolation.py @@ -51,7 +51,7 @@ import pytest from botocore.exceptions import ClientError -from .util import create_test_table, random_string +from .util import create_test_table, random_string, new_test_table @pytest.fixture(scope="function", autouse=True) def all_tests_are_scylla_only(scylla_only): @@ -430,3 +430,53 @@ def test_isolation_updateitem_returnvalues(table_forbid_rmw, tables_permit_rmw): UpdateExpression='SET a = :val', ExpressionAttributeValues={':val': 1}, ReturnValues=returnvalues) + +############################################################################# +# BatchWriteItem tests. +# BatchWriteItem writes are always pure write - never RMW (read-modify-write) +# operations - because none of the RMW options are supported: Batch writes +# don't support an UpdateExpression, a ConditionExpression or ReturnValues. +# Still, even in the pure write case, the write code paths are different for +# the different write isolation modes, and we need to exercise them. + +# For completeness, this test exercises a single batch with more than one +# partition, more than one clustering key in the same partition, and a +# combination of PutRequest and DeleteRequest. +def test_isolation_batchwriteitem(dynamodb): + # Unfortunately we can't use the four table fixtures that all other tests + # use, because those fixtures only have a partition key and we also want + # a sort key (so we can test the case of multiple items in the same + # partition). So we have to create four new tables just for this test. + for mode in ['only_rmw_uses_lwt', 'always_use_lwt', 'unsafe_rmw', 'forbid_rmw']: + with new_test_table(dynamodb, + Tags=[{'Key': 'system:write_isolation', 'Value': mode}], + KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, + { 'AttributeName': 'c', 'KeyType': 'RANGE' } ], + AttributeDefinitions=[ + { 'AttributeName': 'p', 'AttributeType': 'S' }, + { 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table: + p1 = random_string() + p2 = random_string() + # Set up two items in p1, only one of them will be deleted later + table.put_item(Item={'p': p1, 'c': 'item1', 'x': 'hello'}) + assert table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item1', 'x': 'hello'} + table.put_item(Item={'p': p1, 'c': 'item2', 'x': 'hi'}) + assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'} + # Perform the batch write, writing to two different partitions + # (p1 and p2), multiple items in one partition (p1), and + # one of the writes is a DeleteRequest (of item1 that we wrote + # above). + table.meta.client.batch_write_item(RequestItems = { + table.name: [ + {'PutRequest': {'Item': {'p': p1, 'c': 'item3', 'x': 'dog'}}}, + {'PutRequest': {'Item': {'p': p1, 'c': 'item4', 'x': 'cat'}}}, + {'DeleteRequest': {'Key': {'p': p1, 'c': 'item1'}}}, + {'PutRequest': {'Item': {'p': p2, 'c': 'item5', 'x': 'mouse'}}} + ]}) + # After the batch write, item1 will be gone, item2..item5 should + # exist with the right content. + assert 'Item' not in table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True) + assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'} + assert table.get_item(Key={'p': p1, 'c': 'item3'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item3', 'x': 'dog'} + assert table.get_item(Key={'p': p1, 'c': 'item4'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item4', 'x': 'cat'} + assert table.get_item(Key={'p': p2, 'c': 'item5'}, ConsistentRead=True)['Item'] == {'p': p2, 'c': 'item5', 'x': 'mouse'}