From c63f43975f95049fbf3e63a8a5d417116578663c Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Wed, 4 Feb 2026 08:13:41 +0200 Subject: [PATCH 1/2] test/alternator: reproducer for Alternator Streams bug This patch adds a reproducer for an Alternator Streams bug described in issue #28439, where the stream returns the wrong events (and fewer of them) in the following specific combination of the following circumstances: 1. A BatchWriteItem operation writing multiple items to the *same* partition. 2. The "always_use_lwt" write isolation mode is used. (the bug doesn't occur in other write isolation modes). We didn't catch this bug earlier because the Alternator Streams test we had for BatchWriteItem had multiple items in multiple partitions, and we missed the multiple-items-in-one-partition case. Moreover, today we run all the tests in only_rmw_uses_lwt mode (in the past, we did use always_use_lwt, but changed recently in commit e7257b1393e following commit 76a766c that changed test.py). As issue #28439 explains, the underlying cause of the bug is that the always_use_lwt causes the multiple items to be written with the same timestamp, which confused the Alternator Streams code reading the CDC log. The bug is not in BatchWriteItem itself, or in ScyllaDB CDC, but just in the Alternator Streams layer. The test in this patch is parameterized to run on each of the four write isolation modes, and currently fails (and so marked xfail) just for the one mode 'always_use_lwt'. The test is scylla_only, as its purpose is to checks the different write isolation mode - which don't exist in AWS DynamoDB. Refs #28439 Signed-off-by: Nadav Har'El --- test/alternator/test_streams.py | 40 +++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 4c5e20cfcd..7cb55e8645 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -482,6 +482,7 @@ def test_get_records_nonexistent_iterator(dynamodbstreams): # and if in the future we can work around the DynamoDB problem, we can return # these fixtures to module scope. +@contextmanager def create_table_ss(dynamodb, dynamodbstreams, type): table = create_test_table(dynamodb, Tags=TAGS, @@ -529,19 +530,23 @@ def test_table_sss_new_and_old_images_lsi(dynamodb, dynamodbstreams): @pytest.fixture(scope="function") def test_table_ss_keys_only(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') + with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as stream: + yield stream @pytest.fixture(scope="function") def test_table_ss_new_image(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') + with create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') as stream: + yield stream @pytest.fixture(scope="function") def test_table_ss_old_image(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') + with create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') as stream: + yield stream @pytest.fixture(scope="function") def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams): - yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') + with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream: + yield stream @pytest.fixture(scope="function") def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams): @@ -1994,6 +1999,33 @@ def test_stream_table_name_length_192_update(dynamodb, dynamodbstreams): # is in the process of being added wait_for_active_stream(dynamodbstreams, table) +# In earlier tests, we tested the stream events logged for BatchWriteItem, +# but it was usually a single item in the batch or in do_batch_test(), +# it was multiple items in different partitions. This test checks the +# remaining case, of a batch writing multiple items in one partition - +# and checks that the correct events appear for them on the stream. +# Turns out we had a bug (#28439) in this case, but *only* in always_use_lwt +# write isolation mode, which writes all the items in the batch with the +# same timestamp. The test is parameterized to try all write isolation +# modes, and reproduces #28439 when it failed only in always_use_lwt mode. +# This is a Scylla-only test because it checks write isolation modes, which +# don't exist in DynamoDB. +@pytest.mark.parametrize('mode', ['only_rmw_uses_lwt', pytest.param('always_use_lwt', marks=pytest.mark.xfail(reason='#28439')), 'unsafe_rmw', 'forbid_rmw']) +def test_streams_multiple_items_one_partition(dynamodb, dynamodbstreams, scylla_only, mode): + with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream: + table, stream_arn = stream + # Set write isolation mode on the table to the chosen "mode": + table_arn = table.meta.client.describe_table(TableName=table.name)['Table']['TableArn'] + table.meta.client.tag_resource(ResourceArn=table_arn, Tags=[{'Key': 'system:write_isolation', 'Value': mode}]) + # Now try the test, a single BatchWriteItem writing three different + # items in the same partition p: + def do_updates(table, p, c): + cs = [c + '1', c + '2', c + '3'] + table.meta.client.batch_write_item(RequestItems = { + table.name: [{'PutRequest': {'Item': {'p': p, 'c': cc, 'x': cc}}} for cc in cs]}) + return [['INSERT', {'p': p, 'c': cc}, None, {'p': p, 'c': cc, 'x': cc}] for cc in cs] + do_test(stream, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES') + # TODO: tests on multiple partitions # TODO: write a test that disabling the stream and re-enabling it works, but # requires the user to wait for the first stream to become DISABLED before From 47e827262f12772df56fcaff2364236303b5ee77 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Wed, 4 Feb 2026 09:00:24 +0200 Subject: [PATCH 2/2] test/alternator: add test for BatchWriteItem with different write isolations Alternator's various write operations have different code paths for the different write isolation modes. Because most of the test suite runs in only a single write mode (currently - only_rmw_uses_lwt), we already introduced a test file test/alternator/test_write_isolation.py for checking the different write operations in *all* four write isolation modes. But we missed testing one write operation - BatchWriteItem. This operation isn't very "interesting" because it doesn't support *any* read-modify-option option (it doesn't support UpdateExpression, ConditionExpression or ReturnValues), but even without those, the pure write code still has different code paths with and without LWT, and should be tested. So we add the missing test here - and it passes. In issue #28439 we discovered a bug that can be seen in Alternator Streams in the case of BatchWriteItem with multiple writes to the same partition and always_use_lwt mode. The fact that the test added here passes shows that the bug is NOT in BatchWriteItem itself, which works correctly in this case - but only in the Alternator Streams layer. Fixes #28171 Signed-off-by: Nadav Har'El --- test/alternator/test_write_isolation.py | 52 ++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/test/alternator/test_write_isolation.py b/test/alternator/test_write_isolation.py index 7deeec8342..bea702cfe2 100644 --- a/test/alternator/test_write_isolation.py +++ b/test/alternator/test_write_isolation.py @@ -51,7 +51,7 @@ import pytest from botocore.exceptions import ClientError -from .util import create_test_table, random_string +from .util import create_test_table, random_string, new_test_table @pytest.fixture(scope="function", autouse=True) def all_tests_are_scylla_only(scylla_only): @@ -430,3 +430,53 @@ def test_isolation_updateitem_returnvalues(table_forbid_rmw, tables_permit_rmw): UpdateExpression='SET a = :val', ExpressionAttributeValues={':val': 1}, ReturnValues=returnvalues) + +############################################################################# +# BatchWriteItem tests. +# BatchWriteItem writes are always pure write - never RMW (read-modify-write) +# operations - because none of the RMW options are supported: Batch writes +# don't support an UpdateExpression, a ConditionExpression or ReturnValues. +# Still, even in the pure write case, the write code paths are different for +# the different write isolation modes, and we need to exercise them. + +# For completeness, this test exercises a single batch with more than one +# partition, more than one clustering key in the same partition, and a +# combination of PutRequest and DeleteRequest. +def test_isolation_batchwriteitem(dynamodb): + # Unfortunately we can't use the four table fixtures that all other tests + # use, because those fixtures only have a partition key and we also want + # a sort key (so we can test the case of multiple items in the same + # partition). So we have to create four new tables just for this test. + for mode in ['only_rmw_uses_lwt', 'always_use_lwt', 'unsafe_rmw', 'forbid_rmw']: + with new_test_table(dynamodb, + Tags=[{'Key': 'system:write_isolation', 'Value': mode}], + KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, + { 'AttributeName': 'c', 'KeyType': 'RANGE' } ], + AttributeDefinitions=[ + { 'AttributeName': 'p', 'AttributeType': 'S' }, + { 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table: + p1 = random_string() + p2 = random_string() + # Set up two items in p1, only one of them will be deleted later + table.put_item(Item={'p': p1, 'c': 'item1', 'x': 'hello'}) + assert table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item1', 'x': 'hello'} + table.put_item(Item={'p': p1, 'c': 'item2', 'x': 'hi'}) + assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'} + # Perform the batch write, writing to two different partitions + # (p1 and p2), multiple items in one partition (p1), and + # one of the writes is a DeleteRequest (of item1 that we wrote + # above). + table.meta.client.batch_write_item(RequestItems = { + table.name: [ + {'PutRequest': {'Item': {'p': p1, 'c': 'item3', 'x': 'dog'}}}, + {'PutRequest': {'Item': {'p': p1, 'c': 'item4', 'x': 'cat'}}}, + {'DeleteRequest': {'Key': {'p': p1, 'c': 'item1'}}}, + {'PutRequest': {'Item': {'p': p2, 'c': 'item5', 'x': 'mouse'}}} + ]}) + # After the batch write, item1 will be gone, item2..item5 should + # exist with the right content. + assert 'Item' not in table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True) + assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'} + assert table.get_item(Key={'p': p1, 'c': 'item3'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item3', 'x': 'dog'} + assert table.get_item(Key={'p': p1, 'c': 'item4'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item4', 'x': 'cat'} + assert table.get_item(Key={'p': p2, 'c': 'item5'}, ConsistentRead=True)['Item'] == {'p': p2, 'c': 'item5', 'x': 'mouse'}