mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-01 21:55:50 +00:00
Merge 'More Alternator tests for BatchWriteItem' from Nadav Har'El
The goal of this small pull request is to reproduce issue #28439, which found a bug in the Alternator Streams output when BatchWriteItem is called to write multiple items in the same partition, and always_use_lwt write isolation mode is used. * The first patch reproduces this specific bug in Alternator Streams. * The second patch adds missing (Fixes #28171) tests for BatchWriteItem in different write modes, and shows that BatchWriteItem itself works correctly - the bug is just in Alternator Streams' reporting of this write. Closes scylladb/scylladb#28528 * github.com:scylladb/scylladb: test/alternator: add test for BatchWriteItem with different write isolations test/alternator: reproducer for Alternator Streams bug
This commit is contained in:
@@ -482,6 +482,7 @@ def test_get_records_nonexistent_iterator(dynamodbstreams):
|
||||
# and if in the future we can work around the DynamoDB problem, we can return
|
||||
# these fixtures to module scope.
|
||||
|
||||
@contextmanager
|
||||
def create_table_ss(dynamodb, dynamodbstreams, type):
|
||||
table = create_test_table(dynamodb,
|
||||
Tags=TAGS,
|
||||
@@ -529,19 +530,23 @@ def test_table_sss_new_and_old_images_lsi(dynamodb, dynamodbstreams):
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_keys_only(dynamodb, dynamodbstreams):
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY')
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as stream:
|
||||
yield stream
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_new_image(dynamodb, dynamodbstreams):
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE')
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') as stream:
|
||||
yield stream
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_old_image(dynamodb, dynamodbstreams):
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE')
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') as stream:
|
||||
yield stream
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):
|
||||
yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream:
|
||||
yield stream
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams):
|
||||
@@ -2011,6 +2016,33 @@ def test_stream_table_name_length_192_update(dynamodb, dynamodbstreams):
|
||||
# is in the process of being added
|
||||
wait_for_active_stream(dynamodbstreams, table)
|
||||
|
||||
# In earlier tests, we tested the stream events logged for BatchWriteItem,
|
||||
# but it was usually a single item in the batch or in do_batch_test(),
|
||||
# it was multiple items in different partitions. This test checks the
|
||||
# remaining case, of a batch writing multiple items in one partition -
|
||||
# and checks that the correct events appear for them on the stream.
|
||||
# Turns out we had a bug (#28439) in this case, but *only* in always_use_lwt
|
||||
# write isolation mode, which writes all the items in the batch with the
|
||||
# same timestamp. The test is parameterized to try all write isolation
|
||||
# modes, and reproduces #28439 when it failed only in always_use_lwt mode.
|
||||
# This is a Scylla-only test because it checks write isolation modes, which
|
||||
# don't exist in DynamoDB.
|
||||
@pytest.mark.parametrize('mode', ['only_rmw_uses_lwt', pytest.param('always_use_lwt', marks=pytest.mark.xfail(reason='#28439')), 'unsafe_rmw', 'forbid_rmw'])
|
||||
def test_streams_multiple_items_one_partition(dynamodb, dynamodbstreams, scylla_only, mode):
|
||||
with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream:
|
||||
table, stream_arn = stream
|
||||
# Set write isolation mode on the table to the chosen "mode":
|
||||
table_arn = table.meta.client.describe_table(TableName=table.name)['Table']['TableArn']
|
||||
table.meta.client.tag_resource(ResourceArn=table_arn, Tags=[{'Key': 'system:write_isolation', 'Value': mode}])
|
||||
# Now try the test, a single BatchWriteItem writing three different
|
||||
# items in the same partition p:
|
||||
def do_updates(table, p, c):
|
||||
cs = [c + '1', c + '2', c + '3']
|
||||
table.meta.client.batch_write_item(RequestItems = {
|
||||
table.name: [{'PutRequest': {'Item': {'p': p, 'c': cc, 'x': cc}}} for cc in cs]})
|
||||
return [['INSERT', {'p': p, 'c': cc}, None, {'p': p, 'c': cc, 'x': cc}] for cc in cs]
|
||||
do_test(stream, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
||||
|
||||
# TODO: tests on multiple partitions
|
||||
# TODO: write a test that disabling the stream and re-enabling it works, but
|
||||
# requires the user to wait for the first stream to become DISABLED before
|
||||
|
||||
@@ -51,7 +51,7 @@
|
||||
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
from .util import create_test_table, random_string
|
||||
from .util import create_test_table, random_string, new_test_table
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def all_tests_are_scylla_only(scylla_only):
|
||||
@@ -430,3 +430,53 @@ def test_isolation_updateitem_returnvalues(table_forbid_rmw, tables_permit_rmw):
|
||||
UpdateExpression='SET a = :val',
|
||||
ExpressionAttributeValues={':val': 1},
|
||||
ReturnValues=returnvalues)
|
||||
|
||||
#############################################################################
|
||||
# BatchWriteItem tests.
|
||||
# BatchWriteItem writes are always pure write - never RMW (read-modify-write)
|
||||
# operations - because none of the RMW options are supported: Batch writes
|
||||
# don't support an UpdateExpression, a ConditionExpression or ReturnValues.
|
||||
# Still, even in the pure write case, the write code paths are different for
|
||||
# the different write isolation modes, and we need to exercise them.
|
||||
|
||||
# For completeness, this test exercises a single batch with more than one
|
||||
# partition, more than one clustering key in the same partition, and a
|
||||
# combination of PutRequest and DeleteRequest.
|
||||
def test_isolation_batchwriteitem(dynamodb):
|
||||
# Unfortunately we can't use the four table fixtures that all other tests
|
||||
# use, because those fixtures only have a partition key and we also want
|
||||
# a sort key (so we can test the case of multiple items in the same
|
||||
# partition). So we have to create four new tables just for this test.
|
||||
for mode in ['only_rmw_uses_lwt', 'always_use_lwt', 'unsafe_rmw', 'forbid_rmw']:
|
||||
with new_test_table(dynamodb,
|
||||
Tags=[{'Key': 'system:write_isolation', 'Value': mode}],
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
|
||||
{ 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
||||
AttributeDefinitions=[
|
||||
{ 'AttributeName': 'p', 'AttributeType': 'S' },
|
||||
{ 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table:
|
||||
p1 = random_string()
|
||||
p2 = random_string()
|
||||
# Set up two items in p1, only one of them will be deleted later
|
||||
table.put_item(Item={'p': p1, 'c': 'item1', 'x': 'hello'})
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item1', 'x': 'hello'}
|
||||
table.put_item(Item={'p': p1, 'c': 'item2', 'x': 'hi'})
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'}
|
||||
# Perform the batch write, writing to two different partitions
|
||||
# (p1 and p2), multiple items in one partition (p1), and
|
||||
# one of the writes is a DeleteRequest (of item1 that we wrote
|
||||
# above).
|
||||
table.meta.client.batch_write_item(RequestItems = {
|
||||
table.name: [
|
||||
{'PutRequest': {'Item': {'p': p1, 'c': 'item3', 'x': 'dog'}}},
|
||||
{'PutRequest': {'Item': {'p': p1, 'c': 'item4', 'x': 'cat'}}},
|
||||
{'DeleteRequest': {'Key': {'p': p1, 'c': 'item1'}}},
|
||||
{'PutRequest': {'Item': {'p': p2, 'c': 'item5', 'x': 'mouse'}}}
|
||||
]})
|
||||
# After the batch write, item1 will be gone, item2..item5 should
|
||||
# exist with the right content.
|
||||
assert 'Item' not in table.get_item(Key={'p': p1, 'c': 'item1'}, ConsistentRead=True)
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item2'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item2', 'x': 'hi'}
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item3'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item3', 'x': 'dog'}
|
||||
assert table.get_item(Key={'p': p1, 'c': 'item4'}, ConsistentRead=True)['Item'] == {'p': p1, 'c': 'item4', 'x': 'cat'}
|
||||
assert table.get_item(Key={'p': p2, 'c': 'item5'}, ConsistentRead=True)['Item'] == {'p': p2, 'c': 'item5', 'x': 'mouse'}
|
||||
|
||||
Reference in New Issue
Block a user