Files
scylladb/test/alternator/test_streams.py
Nadav Har'El 92ee959e9b test/alternator: speed up test_streams.py by using module-scope fixtures
Previously, all stream-table fixtures in this test file used
scope="function", forcing a fresh table to be created for every test,
slowing down the test a bit (though not much), and discouraging writing
small new tests.

This was a workaround for a DynamoDB quirk (that Alternator doesn't have):
LATEST shard iterators have a time slack and may point slightly before
the true stream head, causing leftover events from a previous test to
appear in the next test's reads.

We fix this by draining the stream inside latest_iterators() and
shards_and_latest_iterators() after obtaining the LATEST iterators:
fetch records in a loop until two consecutive polling rounds both return
empty, guaranteeing the iterators are positioned past all pre-existing
events before the caller writes anything.  With this guarantee in place,
all stream-table fixtures can safely use scope="module".

After this patch, test_streams.py continues to pass on DynamoDB.
On Alternator, the test file's run time went down a bit, from
20.2 seconds to 17.7 seconds.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2026-03-10 17:14:04 +02:00

2070 lines
116 KiB
Python

# Copyright 2020-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# Tests for stream operations: ListStreams, DescribeStream, GetShardIterator,
# GetRecords.
import time
import urllib.request
from contextlib import contextmanager, ExitStack
from urllib.error import URLError
import pytest
from boto3.dynamodb.types import TypeDeserializer
from botocore.exceptions import ClientError
from test.alternator.util import is_aws, scylla_config_temporary, unique_table_name, create_test_table, new_test_table, random_string, full_scan, freeze, list_tables, get_region, manual_request
# All tests in this file are expected to fail with tablets due to #23838.
# To ensure that Alternator Streams is still being tested, instead of
# xfailing these tests, we temporarily coerce the tests below to avoid
# using default tablets setting, even if it's available. We do this by
# using the following tags when creating each table below:
TAGS = [{'Key': 'system:initial_tablets', 'Value': 'none'}]
# The four StreamViewType settings allowed by the DynamoDB Streams API.
stream_types = [ 'OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES']
def disable_stream(dynamodbstreams, table):
    """Disable the stream on the given table and wait until every one of
    the table's streams (current and historic) reaches state DISABLED.

    Fails the test if this does not happen within a 60-second wall-clock
    timeout.
    """
    table.update(StreamSpecification={'StreamEnabled': False})
    # Wait for the stream to really be disabled. A table may have multiple
    # historic streams - we need all of them to become DISABLED. One of
    # them (the current one) may remain DISABLING for some time.
    # Fix: use time.monotonic(), not time.process_time(). process_time()
    # measures CPU time, which does not advance during time.sleep(), so
    # the old deadline could essentially never expire as intended.
    exp = time.monotonic() + 60
    while time.monotonic() < exp:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        disabled = True
        for stream in streams['Streams']:
            desc = dynamodbstreams.describe_stream(StreamArn=stream['StreamArn'])['StreamDescription']
            if desc['StreamStatus'] != 'DISABLED':
                disabled = False
                break
        if disabled:
            print('disabled stream on {}'.format(table.name))
            return
        time.sleep(0.5)
    pytest.fail("timed out")
# Cannot use fixtures. Because real dynamodb cannot _remove_ a stream
# once created. It can only expire 24h later. So reusing test_table for
# example works great for scylla/local testing, but once we run against
# actual aws instances, we get lingering streams, and worse: cannot
# create new ones to replace it, because we have overlapping types etc.
#
# So we have to create and delete a table per test. And not run this
# test too often against aws.
@contextmanager
def create_stream_test_table(dynamodb, StreamViewType=None):
    """Create a temporary test table, optionally with a stream of the
    given StreamViewType enabled, yield it, and delete it on exit.

    Deletion is retried because on real DynamoDB a table whose stream is
    still being created cannot be deleted immediately.
    """
    spec = {'StreamEnabled': False}
    # Fix: compare to None with "is not", per PEP 8, instead of "!=".
    if StreamViewType is not None:
        spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
    table = create_test_table(dynamodb, StreamSpecification=spec,
        Tags=TAGS,
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
                    { 'AttributeName': 'c', 'KeyType': 'RANGE' }
        ],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
        ])
    try:
        yield table
    finally:
        print(f"Deleting table {table.name}")
        while True:
            try:
                table.delete()
                break
            except ClientError as ce:
                # if the table has a stream currently being created we cannot
                # delete the table immediately. Again, only with real dynamo
                if ce.response['Error']['Code'] == 'ResourceInUseException':
                    print('Could not delete table yet. Sleeping 5s.')
                    time.sleep(5)
                    continue
                raise
def wait_for_active_stream(dynamodbstreams, table, timeout=60):
    """Wait until the given table has a usable stream and return the tuple
    (stream ARN, stream label).

    A stream is considered usable when its status is ENABLED, or when no
    StreamStatus is reported at all (presumably for servers that do not
    report one - TODO confirm). Fails the test if no stream becomes usable
    within `timeout` wall-clock seconds.
    """
    # Fix: use time.monotonic(), not time.process_time() - process_time()
    # does not advance while sleeping, so it cannot implement a wall-clock
    # deadline.
    exp = time.monotonic() + timeout
    while time.monotonic() < exp:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        for stream in streams['Streams']:
            arn = stream['StreamArn']
            if arn is None:
                continue
            desc = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
            if 'StreamStatus' not in desc or desc.get('StreamStatus') == 'ENABLED':
                return (arn, stream['StreamLabel'])
        # real dynamo takes some time until a stream is usable
        print("Stream not available. Sleep 5s...")
        time.sleep(5)
    # Fix: the old "assert False" is stripped under python -O; fail loudly.
    pytest.fail("timed out waiting for an active stream")
# Local java dynamodb server version behaves differently from
# the "real" one. Most importantly, it does not verify a number of
# parameters, and consequently does not throw when called with borked
# args. This will try to check if we are in fact running against
# this test server, and if so, just raise the error here and be done
# with it. All this just so we can run through the tests on
# aws, scylla and local.
def is_local_java(dynamodbstreams):
    """Heuristically detect the local Java DynamoDB test server.

    There is no official way to ask, but the local server runs on Jetty,
    so probe the endpoint and inspect the Server header of the HTTP
    error response.
    """
    endpoint = dynamodbstreams.meta.endpoint_url
    try:
        urllib.request.urlopen(endpoint)
    except URLError as err:
        # Only HTTP errors carry headers; plain connection failures do not.
        if not hasattr(err, 'info'):
            return False
        return err.info()['Server'].startswith('Jetty')
    return False
def ensure_java_server(dynamodbstreams, error='ValidationException'):
    """Assert we are talking to the local Java server and, unless `error`
    is None, raise a ClientError with the given code - used to simulate
    the validation error the local server fails to produce itself."""
    if not is_local_java(dynamodbstreams):
        assert False
    if error is None:
        return
    raise ClientError({'Error': { 'Code' : error }}, '')
def test_list_streams_create(dynamodb, dynamodbstreams):
    """A stream can be enabled at table creation, for every view type."""
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table:
            wait_for_active_stream(dynamodbstreams, table)
def test_list_streams_alter(dynamodb, dynamodbstreams):
    """A stream can be enabled on an existing table via UpdateTable, for
    every view type."""
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=None) as table:
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': view_type})
            wait_for_active_stream(dynamodbstreams, table)
def test_list_streams_paged(dynamodb, dynamodbstreams):
    """Walk ListStreams one page at a time (Limit=1) and verify that two
    freshly-created stream tables both eventually appear."""
    # There is no reason to run this test for all stream types - we have
    # other tests for creating tables with all stream types, and for using
    # them. This one is only about list_streams.
    for view_type in stream_types[0:1]:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table1:
            with create_stream_test_table(dynamodb, StreamViewType=view_type) as table2:
                wait_for_active_stream(dynamodbstreams, table1)
                wait_for_active_stream(dynamodbstreams, table2)
                page = dynamodbstreams.list_streams(Limit=1)
                assert page
                assert page.get('Streams')
                assert page.get('LastEvaluatedStreamArn')
                # Tables we still have to see on some page:
                remaining = {table1.name, table2.name}
                while True:
                    remaining -= {s['TableName'] for s in page['Streams']}
                    if not remaining:
                        break
                    page = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=page['LastEvaluatedStreamArn'])
# ListStreams with paging should be able correctly return a full list of
# pre-existing streams even if additional tables were added between pages
# and caused Scylla's hash table of tables to be reorganized.
# Reproduces something similar to what we saw in #12601. Unfortunately to
# reproduce the failure, we needed to create a fairly large number of tables
# below to reproduce the situation that the internal hash table got
# reshuffled. It's also not exactly the situation of issue #12601 - there
# we suspect the hash table got different order for different pages for
# other reasons - not because of added tables.
def test_list_streams_paged_with_new_table(dynamodb, dynamodbstreams):
    """Check that paged ListStreams returns every pre-existing stream
    exactly once, even when many new tables are created between fetching
    the first and the second page (see the comment above and #12601)."""
    N1 = 4 # Number of tables to create initially
    LIMIT = 2 # Number of tables out of N1 to list in the first page
    N2 = 30 # Number of additional tables to create later
    N1_tables = []
    with ExitStack() as created_tables:
        # Create the initial batch of stream-enabled tables.
        for i in range(N1):
            table = created_tables.enter_context(create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY'))
            wait_for_active_stream(dynamodbstreams, table)
            N1_tables.append(table.name)
        # Fetch the first page of the stream listing.
        streams = dynamodbstreams.list_streams(Limit=LIMIT)
        assert streams
        assert 'Streams' in streams
        assert len(streams['Streams']) == LIMIT
        first_tables = [s['TableName'] for s in streams['Streams']]
        last_arn = streams['LastEvaluatedStreamArn']
        # Create many more tables between the two pages, to perturb the
        # server's internal table map (see comment above this test).
        for i in range(N2):
            table = created_tables.enter_context(create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY'))
            wait_for_active_stream(dynamodbstreams, table)
        # Get the rest of the streams (no limit, assuming we don't have
        # a huge number of streams)
        streams = dynamodbstreams.list_streams(ExclusiveStartStreamArn=last_arn)
        assert 'Streams' in streams
        tables = first_tables + [s['TableName'] for s in streams['Streams']]
        # Each of the N1 tables must have been returned from the paged
        # iteration, and only once. The tables in N2 may or may not be
        # returned. Tables not related to this test may also be returned.
        for t in N1_tables:
            assert tables.count(t) == 1
def test_list_streams_zero_limit(dynamodb, dynamodbstreams):
    """ListStreams with Limit=0 must be rejected with ValidationException."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        # Fix: the setup call used to sit inside the pytest.raises block,
        # so an unexpected ClientError from wait_for_active_stream() could
        # have satisfied the expectation and masked a real failure. Keep
        # only the call under test inside the raises block.
        wait_for_active_stream(dynamodbstreams, table)
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.list_streams(Limit=0)
# The DynamoDB documentation for ListStreams says that for Limit, "the upper
# limit is 100.". Indeed, if we try 101, we get a ValidationException.
# There is no reason why Alternator must implement exactly the same limit,
# but supporting a huge number (below we test 100,000) is not a good idea
# because theoretically (if we have a huge number of tables...) it can result
# in a huge response, which we don't want to allow.
@pytest.mark.xfail(reason="no upper limit for Limit")
def test_list_streams_too_high_limit(dynamodb, dynamodbstreams):
    """ListStreams with a huge Limit (DynamoDB caps it at 100) should be
    rejected with ValidationException; Alternator does not cap it yet."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        # Fix: keep setup out of the pytest.raises block so an unexpected
        # ClientError from it cannot satisfy the expectation (same fix as
        # test_list_streams_zero_limit).
        wait_for_active_stream(dynamodbstreams, table)
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.list_streams(Limit=100000)
def test_create_streams_wrong_type(dynamodb, dynamodbstreams, test_table):
    """UpdateTable with an invalid StreamViewType ('Fisk') must be rejected
    with ValidationException."""
    with pytest.raises(ClientError, match='ValidationException'):
        # should throw
        test_table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'Fisk'});
        # just in case the test fails, disable stream again
        # NOTE(review): this cleanup line only runs if the bad update did
        # NOT throw; disabling a never-enabled stream may itself raise
        # (cf. test_streams_disable_on_disabled) - confirm intended.
        test_table.update(StreamSpecification={'StreamEnabled': False});
def test_list_streams_empty(dynamodb, dynamodbstreams, test_table):
    """A table with no stream yields a present-but-empty Streams list."""
    result = dynamodbstreams.list_streams(TableName=test_table.name)
    assert 'Streams' in result
    assert not result['Streams'] # empty
def test_list_streams_with_nonexistent_last_stream(dynamodb, dynamodbstreams):
    """ListStreams with a bogus ExclusiveStartStreamArn must be rejected
    with ValidationException.

    The local Java server does not validate the ARN, so on that server the
    code falls through to ensure_java_server(), which raises the expected
    ClientError itself - keeping pytest.raises satisfied on all servers.
    """
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        with pytest.raises(ClientError, match='ValidationException'):
            streams = dynamodbstreams.list_streams(TableName=table.name, ExclusiveStartStreamArn='kossaapaaasdafsdaasdasdasdasasdasfadfadfasdasdas')
            # The lines below are only reached when list_streams did not
            # throw, i.e. on the local Java server:
            assert 'Streams' in streams
            assert not streams['Streams'] # empty
            # local java dynamodb does _not_ raise validation error for
            # malformed stream arn here. verify
            ensure_java_server(dynamodbstreams)
def test_describe_stream(dynamodb, dynamodbstreams):
    """Basic sanity checks on DescribeStream output for a fresh stream."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        sd = desc.get('StreamDescription')
        assert sd
        assert sd['StreamArn'] == arn
        assert sd['StreamStatus'] != 'DISABLED'
        assert sd['StreamViewType'] == 'KEYS_ONLY'
        assert sd['TableName'] == table.name
        shards = sd.get('Shards')
        assert shards
        first_shard = shards[0]
        assert first_shard.get('ShardId')
        seq_range = first_shard.get('SequenceNumberRange')
        assert seq_range
        assert seq_range.get('StartingSequenceNumber')
        # We don't know what the sequence number is supposed to be, but the
        # DynamoDB documentation requires that it contains only numeric
        # characters and some libraries rely on this. Reproduces issue #7158:
        assert seq_range['StartingSequenceNumber'].isdecimal()
@pytest.mark.xfail(reason="alternator does not have creation time on streams")
def test_describe_stream_create_time(dynamodb, dynamodbstreams):
    """DescribeStream should report CreationRequestDateTime. The attribute
    is optional in the API; DynamoDB supplies it, Alternator does not."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        arn = dynamodbstreams.list_streams(TableName=table.name)['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        sd = desc.get('StreamDescription')
        assert sd
        # note these are non-required attributes
        assert 'CreationRequestDateTime' in sd
def test_describe_nonexistent_stream(dynamodb, dynamodbstreams):
    """DescribeStream on a bogus ARN must fail; the local Java server uses
    a different error code than real DynamoDB/Alternator."""
    expected = 'ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'
    with pytest.raises(ClientError, match=expected):
        dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf')
def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
    """DescribeStream with a bogus ExclusiveStartShardId should return an
    empty shard list (real DynamoDB / Alternator) or throw (the local
    Java server)."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        try:
            desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId='zzzzzzzzzzzzzzzzzzzzzzzzsfasdgagadfadfgagkjsdfsfsdjfjks')
            assert not desc['StreamDescription']['Shards']
        # Fix: was a bare "except:", which also swallowed AssertionError
        # from the check above (and even KeyboardInterrupt). Only the
        # local Java server is expected to throw here, and it throws a
        # ClientError, so catch exactly that.
        except ClientError:
            # local java throws here. real does not.
            ensure_java_server(dynamodbstreams, error=None)
def test_get_shard_iterator(dynamodb, dynamodbstreams):
    """Exercise GetShardIterator: the four iterator types with valid
    arguments, followed by a series of invalid-argument combinations
    that must each produce the appropriate error."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn'];
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        shard_id = desc['StreamDescription']['Shards'][0]['ShardId'];
        seq = desc['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber'];
        # spec says shard_id must be 65 chars or less
        assert len(shard_id) <= 65
        # Sequence-number-relative iterator types, with SequenceNumber given:
        for type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            iter = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=type, SequenceNumber=seq
            )
            assert iter.get('ShardIterator')
        # Position-relative iterator types, without SequenceNumber:
        for type in ['TRIM_HORIZON', 'LATEST']:
            iter = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=type
            )
            assert iter.get('ShardIterator')
        for type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            # must have seq in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=type
                )
        for type in ['TRIM_HORIZON', 'LATEST']:
            # should not set "seq" in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=type, SequenceNumber=seq
                )
        # bad arn
        # (note: "type" still holds 'LATEST' from the loop above, so the
        # error must come from the malformed ARN, checked first)
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn='sdfadsfsdfsdgdfsgsfdabadfbabdadsfsdfsdfsdfsdfsdfsdfdfdssdffbdfdf', ShardId=shard_id, ShardIteratorType=type, SequenceNumber=seq
            )
        # bad shard id
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId='semprinidfaasdasfsdvacsdcfsdsvsdvsdvsdvsdvsdv',
                ShardIteratorType='LATEST'
            )
        # bad iter type
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='bulle', SequenceNumber=seq
            )
        # bad seq
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='LATEST', SequenceNumber='sdfsafglldfngjdafnasdflgnaldklkafdsgklnasdlv'
            )
def test_get_shard_iterator_for_nonexistent_stream(dynamodb, dynamodbstreams):
    """GetShardIterator with a bogus stream ARN must fail; the local Java
    server uses a different error code than real DynamoDB/Alternator."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
        shards = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']['Shards']
        expected = 'ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'
        with pytest.raises(ClientError, match=expected):
            dynamodbstreams.get_shard_iterator(
                StreamArn='sdfadfsddafgdafsgjnadflkgnalngalsdfnlkasnlkasdfasdfasf', ShardId=shards[0]['ShardId'], ShardIteratorType='LATEST'
            )
def test_get_shard_iterator_for_nonexistent_shard(dynamodb, dynamodbstreams):
    """GetShardIterator for a shard id that does not exist in the stream
    must fail with ResourceNotFoundException."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        arn = dynamodbstreams.list_streams(TableName=table.name)['Streams'][0]['StreamArn']
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId='adfasdasdasdasdasdasdasdasdasasdasd', ShardIteratorType='LATEST'
            )
def test_get_records(dynamodb, dynamodbstreams):
    """End-to-end GetRecords test: write an item, update it, then poll the
    stream until both the INSERT and the subsequent MODIFY event (with
    the correct old/new images) have been observed.

    Fixes over the previous version: the record payload local was named
    "dynamodb", shadowing the fixture parameter of the same name, and
    "type"/"iter" shadowed builtins; they are renamed here.
    """
    # TODO: add tests for storage/transactionable variations and global/local index
    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
        p = 'piglet'
        c = 'ninja'
        val = 'lucifers'
        val2 = 'flowers'
        table.put_item(Item={'p': p, 'c': c, 'a1': val, 'a2': val2})
        nval = 'semprini'
        nval2 = 'nudge'
        table.update_item(Key={'p': p, 'c': c},
            AttributeUpdates={ 'a1': {'Value': nval, 'Action': 'PUT'},
                               'a2': {'Value': nval2, 'Action': 'PUT'}
            })
        has_insert = False
        # in truth, we should sleep already here, since at least scylla
        # will not be able to produce any stream content until
        # ~30s after insert/update (confidence interval)
        # but it is useful to see a working null-iteration as well, so
        # lets go already.
        while True:
            # Each round re-lists the shards and reads every shard from its
            # starting sequence number, so events can be seen again; the
            # has_insert logic below tolerates reprocessing the INSERT.
            desc = dynamodbstreams.describe_stream(StreamArn=arn)
            iterators = []
            while True:
                shards = desc['StreamDescription']['Shards']
                for shard in shards:
                    shard_id = shard['ShardId']
                    start = shard['SequenceNumberRange']['StartingSequenceNumber']
                    it = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',SequenceNumber=start)['ShardIterator']
                    iterators.append(it)
                last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
                if not last_shard:
                    break
                desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=last_shard)
            next_iterators = []
            while iterators:
                it = iterators.pop(0)
                response = dynamodbstreams.get_records(ShardIterator=it, Limit=1000)
                if 'NextShardIterator' in response:
                    next_iterators.append(response['NextShardIterator'])
                records = response.get('Records')
                if records:
                    for record in records:
                        event_type = record['eventName']
                        # The record payload - renamed from "dynamodb" to
                        # avoid shadowing the fixture parameter.
                        data = record['dynamodb']
                        keys = data['Keys']
                        assert keys.get('p')
                        assert keys.get('c')
                        assert keys['p'].get('S')
                        assert keys['p']['S'] == p
                        assert keys['c'].get('S')
                        assert keys['c']['S'] == c
                        if event_type == 'MODIFY' or event_type == 'INSERT':
                            assert data.get('NewImage')
                            newimage = data['NewImage']
                            assert newimage.get('a1')
                            assert newimage.get('a2')
                            if event_type == 'INSERT' or (event_type == 'MODIFY' and not has_insert):
                                assert newimage['a1']['S'] == val
                                assert newimage['a2']['S'] == val2
                                has_insert = True
                                continue
                            if event_type == 'MODIFY':
                                assert has_insert
                                assert newimage['a1']['S'] == nval
                                assert newimage['a2']['S'] == nval2
                                assert data.get('OldImage')
                                oldimage = data['OldImage']
                                assert oldimage.get('a1')
                                assert oldimage.get('a2')
                                assert oldimage['a1']['S'] == val
                                assert oldimage['a2']['S'] == val2
                                return
            print("Not done. Sleep 10s...")
            time.sleep(10)
            # NOTE(review): this assignment is immediately overwritten at
            # the top of the outer loop, which rebuilds the iterator list
            # from each shard's starting sequence number; kept unchanged
            # for fidelity with the original flow.
            iterators = next_iterators
def test_get_records_nonexistent_iterator(dynamodbstreams):
    """GetRecords with a malformed shard iterator must be rejected."""
    bogus_iterator = 'sdfsdfsgagaddafgagasgasgasdfasdfasdfasdfasdgasdasdgasdg'
    with pytest.raises(ClientError, match='ValidationException'):
        dynamodbstreams.get_records(ShardIterator=bogus_iterator, Limit=1000)
##############################################################################
# Fixtures for creating a table with a stream enabled with one of the allowed
# StreamViewType settings (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES).
# Unfortunately changing the StreamViewType setting of an existing stream is
# not allowed (see test_streams_change_type), and while removing and re-adding
# a stream is possible, it is very slow. So we create four different fixtures
# with the four different StreamViewType settings for these four fixtures.
@contextmanager
def create_table_ss(dynamodb, dynamodbstreams, type):
    """Create a test table with a (p,c) key schema and a stream of the
    given StreamViewType, wait for the stream to become active, and yield
    (table, stream arn). The table is deleted on exit."""
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    # Fix: delete the table even if the using test raises - without the
    # try/finally the table leaked on test failure (a real problem on
    # actual DynamoDB, where lingering streams cannot be removed).
    try:
        yield table, arn
    finally:
        table.delete()
def create_table_sss_lsi(dynamodb, dynamodbstreams, type):
    """Generator (used via "yield from" in fixtures): create a streamed
    test table that also has an LSI, wait for the stream to become active,
    and yield (table, stream arn). The table is deleted on cleanup."""
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }, { 'AttributeName': 'a', 'AttributeType': 'S' }],
        LocalSecondaryIndexes=[
            {
                'IndexName': 'a_idx',
                'KeySchema': [
                    { 'AttributeName': 'p', 'KeyType': 'HASH' },
                    { 'AttributeName': 'a', 'KeyType': 'RANGE' }],
                'Projection': {
                    'ProjectionType': 'ALL',
                }
            }
        ],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    # Fix: delete the table even if the using test raises - without the
    # try/finally the table leaked on test failure.
    try:
        yield table, arn
    finally:
        table.delete()
def create_table_s_no_ck(dynamodb, dynamodbstreams, type):
    """Generator (used via "yield from" in fixtures): create a streamed
    test table with only a partition key (no clustering key), wait for the
    stream to become active, and yield (table, stream arn). The table is
    deleted on cleanup."""
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    # Fix: delete the table even if the using test raises - without the
    # try/finally the table leaked on test failure.
    try:
        yield table, arn
    finally:
        table.delete()
@pytest.fixture(scope="module")
def test_table_sss_new_and_old_images_lsi(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn) with a NEW_AND_OLD_IMAGES stream and an LSI.
    yield from create_table_sss_lsi(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
@pytest.fixture(scope="module")
def test_table_ss_keys_only(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn) with a KEYS_ONLY stream, shared by tests.
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as stream:
        yield stream
@pytest.fixture(scope="module")
def test_table_ss_new_image(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn) with a NEW_IMAGE stream, shared by tests.
    with create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') as stream:
        yield stream
@pytest.fixture(scope="module")
def test_table_ss_old_image(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn) with an OLD_IMAGE stream, shared by tests.
    with create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') as stream:
        yield stream
@pytest.fixture(scope="module")
def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn) with a NEW_AND_OLD_IMAGES stream.
    with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream:
        yield stream
@pytest.fixture(scope="module")
def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn): no clustering key, KEYS_ONLY stream.
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'KEYS_ONLY')
@pytest.fixture(scope="module")
def test_table_s_no_ck_new_image(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn): no clustering key, NEW_IMAGE stream.
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_IMAGE')
@pytest.fixture(scope="module")
def test_table_s_no_ck_old_image(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn): no clustering key, OLD_IMAGE stream.
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'OLD_IMAGE')
@pytest.fixture(scope="module")
def test_table_s_no_ck_new_and_old_images(dynamodb, dynamodbstreams):
    # Module-scoped (table, arn): no clustering key, NEW_AND_OLD_IMAGES stream.
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
# Test that it is, sadly, not allowed to use UpdateTable on a table which
# already has a stream enabled to change that stream's StreamViewType.
# Relates to #6939
def test_streams_change_type(test_table_ss_keys_only):
    """UpdateTable may not change the StreamViewType of an existing stream
    (mirrors DynamoDB's restriction). Relates to #6939."""
    table, arn = test_table_ss_keys_only
    with pytest.raises(ClientError, match='ValidationException.*already'):
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE'});
        # If the above change succeeded (because of issue #6939), switch it back :-)
        # (this line only runs when the first update did not throw)
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'});
# It is not allowed to enable stream for a table that already has a stream,
# even if of the same type.
def test_streams_enable_on_enabled(test_table_ss_new_and_old_images):
    """Enabling a stream on a table that already has one must fail, even
    when the requested view type matches the existing stream."""
    table, _arn = test_table_ss_new_and_old_images
    spec = {'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}
    with pytest.raises(ClientError, match='ValidationException.*already.*enabled'):
        table.update(StreamSpecification=spec)
# It is not allowed to disable a stream on a table that does not have a stream.
def test_streams_disable_on_disabled(test_table):
    """Disabling a stream on a table that has no stream must fail."""
    spec = {'StreamEnabled': False}
    with pytest.raises(ClientError, match='ValidationException.*stream.*disable'):
        test_table.update(StreamSpecification=spec)
# Utility function for listing all the shards of the given stream arn.
# Implemented by multiple calls to DescribeStream, possibly several pages
# until all the shards are returned. The return of this function should be
# cached - it is potentially slow, and DynamoDB documentation even states
# DescribeStream may only be called at a maximum rate of 10 times per second.
# list_shards() only returns the shard IDs, not the information about the
# shards' sequence number range, which is also returned by DescribeStream.
def list_shards(dynamodbstreams, arn):
    """Return the IDs of all shards of stream `arn`, following
    DescribeStream paging, and verify the paging invariants along the way
    (unique, lexically sorted, nothing after the last shard).

    See the comment above: the result should be cached by callers, since
    DescribeStream is rate-limited.
    """
    # By default DescribeStream lists a limit of 100 shards. For faster
    # tests we reduced the number of shards in the testing setup to
    # 32 (16 vnodes x 2 cpus), see issue #6979, so to still exercise this
    # paging feature, lets use a limit of 10.
    limit = 10
    page = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit)['StreamDescription']
    assert len(page['Shards']) <= limit
    shards = [shard['ShardId'] for shard in page['Shards']]
    while 'LastEvaluatedShardId' in page:
        # 7409 kinesis ignores LastEvaluatedShardId and just looks at last shard
        assert shards[-1] == page['LastEvaluatedShardId']
        page = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit,
            ExclusiveStartShardId=page['LastEvaluatedShardId'])['StreamDescription']
        assert len(page['Shards']) <= limit
        shards += [shard['ShardId'] for shard in page['Shards']]
    print('Number of shards in stream: {}'.format(len(shards)))
    assert len(set(shards)) == len(shards)
    # 7409 - kinesis required shards to be in lexical order.
    # verify.
    assert shards == sorted(shards)
    # special test: ensure we get nothing more if we ask for starting at the
    # last of the last
    page = dynamodbstreams.describe_stream(StreamArn=arn,
        ExclusiveStartShardId=shards[-1])['StreamDescription']
    assert len(page['Shards']) == 0
    return shards
# Utility function for getting shard iterators starting at "LATEST" for
# all the shards of the given stream arn.
# On DynamoDB (but not Alternator), LATEST has a time slack: it may point to
# a position slightly before the true end of the stream, so writes from a
# previous test that reused the same table can appear to be "in the future"
# relative to the returned iterators and therefore show up unexpectedly in
# the current test's reads. To work around this we drain any already-pending
# records from the iterators before returning them, so the caller is
# guaranteed to see only events written *after* this call returns.
def latest_iterators(dynamodbstreams, arn):
    """Return one LATEST shard iterator per shard of stream `arn`, drained
    so the caller only sees events written after this call returns (see
    the comment block above for why DynamoDB needs the drain)."""
    iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator'])
    assert len(set(iterators)) == len(iterators)
    # Drain any records that are already visible at the LATEST position.
    # This drain loop is not necessary on Alternator, and needlessly slows
    # the test down, so skip it when not running against real DynamoDB.
    if not dynamodbstreams._endpoint.host.endswith('.amazonaws.com'):
        return iterators
    # Fix: keep fetching until TWO consecutive polling rounds return no
    # records, as the draining contract requires - a single empty round
    # (the previous behavior) is not enough of a guarantee that the
    # iterators have moved past all pre-existing events.
    empty_rounds = 0
    while empty_rounds < 2:
        events = []
        iterators = fetch_more(dynamodbstreams, iterators, events)
        empty_rounds = empty_rounds + 1 if not events else 0
    return iterators
# Similar to latest_iterators(), just also returns the shard id which produced
# each iterator.
def shards_and_latest_iterators(dynamodbstreams, arn):
    """Like latest_iterators(), but return a list of (shard id, iterator)
    pairs so the caller also knows which shard produced each iterator."""
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Drain pre-existing records from the iterators, for the same reason as
    # explained in latest_iterators(). Not needed on Alternator.
    if not dynamodbstreams._endpoint.host.endswith('.amazonaws.com'):
        return shards_and_iterators
    # Fix: require TWO consecutive empty polling rounds before declaring
    # the stream drained (previously one empty round sufficed), matching
    # the draining contract used by latest_iterators().
    empty_rounds = 0
    while empty_rounds < 2:
        events = []
        new_iters = fetch_more(dynamodbstreams, [it for _, it in shards_and_iterators], events)
        shards_and_iterators = list(zip([sh for sh, _ in shards_and_iterators], new_iters))
        empty_rounds = empty_rounds + 1 if not events else 0
    return shards_and_iterators
# Utility function for fetching more content from the stream (given its
# array of iterators) into an "output" array. Call repeatedly to get more
# content - the function returns a new array of iterators which should be
# used to replace the input list of iterators.
# Note that the order of the updates is guaranteed for the same partition,
# but cannot be trusted for *different* partitions.
def fetch_more(dynamodbstreams, iterators, output):
    """Fetch one batch of records from each of the given shard iterators,
    appending every record to `output`, and return the replacement
    iterator list (iterators with no NextShardIterator are dropped).

    Call repeatedly to read more of the stream. Record order is only
    guaranteed within a single partition, not across partitions.
    """
    next_iterators = []
    for shard_iterator in iterators:
        response = dynamodbstreams.get_records(ShardIterator=shard_iterator)
        if 'NextShardIterator' in response:
            next_iterators.append(response['NextShardIterator'])
        output.extend(response['Records'])
    assert len(set(next_iterators)) == len(next_iterators)
    return next_iterators
def print_events(expected_events, output, failed_at=None):
    """Debugging aid: dump the expected events and the actually-received
    output events when compare_events() fails or gives up.

    failed_at is the index of the output event at which the comparison
    failed, or None when the failure was a timeout.
    """
    if failed_at is None:
        # Fix: the message used to read "timeouted" (not a word), and used
        # an f-string with no placeholders.
        print('compare_events: timed out')
    else:
        print(f'compare_events: failed at output event {failed_at}')
    for index, event in enumerate(expected_events):
        expected_type, expected_key, expected_old_image, expected_new_image = event
        print(f'expected event {index}: type={expected_type}, key={expected_key}, old_image={expected_old_image}, new_image={expected_new_image}')
    for index, event in enumerate(output):
        print(f'output event {index}: type={event["eventName"]}, key={event["dynamodb"]["Keys"]}, old_image={event["dynamodb"].get("OldImage")}, new_image={event["dynamodb"].get("NewImage")}')
# Utility function for comparing "output" as fetched by fetch_more(), to a list
# expected_events, each of which looks like:
# [type, keys, old_image, new_image]
# where type is REMOVE, INSERT or MODIFY.
# The "mode" parameter specifies which StreamViewType mode (KEYS_ONLY,
# OLD_IMAGE, NEW_IMAGE, NEW_AND_OLD_IMAGES) was supposedly used to generate
# "output". This mode dictates what we can compare - e.g., in KEYS_ONLY mode
# the compare_events() function ignores the old and new image in
# expected_events.
# The "expected_region" parameter specifies the value of awsRegion field.
# compare_events() throws an exception immediately if it sees an unexpected
# event, but if some of the expected events are just missing in the "output",
# it only returns false - suggesting maybe the caller needs to try again
# later - maybe more output is coming.
# Note that the order of events is only guaranteed (and therefore compared)
# inside a single partition.
def compare_events(expected_events, output, mode, expected_region):
    """Compare the stream records in "output" against "expected_events".

    Raises AssertionError as soon as an output event is *wrong*; returns
    False if some expected events are merely missing (the caller should
    read more of the stream and retry), True when everything matched.
    See the comment block above for the expected_events format."""
    # The order of expected_events is only meaningful inside a partition, so
    # let's convert it into a map indexed by partition key.
    expected_events_map = {}
    for event in expected_events:
        expected_type, expected_key, expected_old_image, expected_new_image = event
        # For simplicity, we actually use the entire key, not just the partition
        # key. We only lose a bit of testing power we didn't plan to test anyway
        # (that events for different items in the same partition are ordered).
        key = freeze(expected_key)
        if not key in expected_events_map:
            expected_events_map[key] = []
        expected_events_map[key].append(event)
    # Iterate over the events in output. An event for a certain key needs to
    # be the *first* remaining event for this key in expected_events_map (and
    # then we remove this matched event from expected_events_map)
    for e, event in enumerate(output):
        try:
            # In DynamoDB, eventSource is 'aws:dynamodb'. We decided to set it to
            # a *different* value - 'scylladb:alternator'. Issue #6931.
            assert 'eventSource' in event
            # For lack of a direct equivalent of a region, Alternator provides the
            # DC name instead. Reproduces #6931.
            assert 'awsRegion' in event
            assert event['awsRegion'] == expected_region
            # Reproduces #6931.
            assert 'eventVersion' in event
            assert event['eventVersion'] in ['1.0', '1.1']
            # Check that eventID appears, but can't check much on what it is.
            assert 'eventID' in event
            op = event['eventName']
            record = event['dynamodb']
            # record['Keys'] is "serialized" JSON, ({'S', 'thestring'}), so we
            # want to deserialize it to match our expected_events content.
            deserializer = TypeDeserializer()
            key = {x:deserializer.deserialize(y) for (x,y) in record['Keys'].items()}
            expected_type, expected_key, expected_old_image, expected_new_image = expected_events_map[freeze(key)].pop(0)
            assert op == expected_type
            assert record['StreamViewType'] == mode
            # We don't know what ApproximateCreationDateTime should be, but we do
            # know it needs to be a timestamp - there is conflicting documentation
            # in what format (ISO 8601?). In any case, boto3 parses this timestamp
            # for us, so we can't check it here, beyond checking it exists.
            assert 'ApproximateCreationDateTime' in record
            # We don't know what SequenceNumber is supposed to be, but the DynamoDB
            # documentation requires that it contains only numeric characters and
            # some libraries rely on this. This reproduces issue #7158:
            assert 'SequenceNumber' in record
            assert record['SequenceNumber'].isdecimal()
            # Alternator doesn't set the SizeBytes member. Issue #6931.
            #assert 'SizeBytes' in record
            # Depending on the stream mode, check the presence (or required
            # absence) of the old/new images and compare their contents:
            if mode == 'KEYS_ONLY':
                assert not 'NewImage' in record
                assert not 'OldImage' in record
            elif mode == 'NEW_IMAGE':
                assert not 'OldImage' in record
                if expected_new_image == None:
                    assert not 'NewImage' in record
                else:
                    new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
                    assert expected_new_image == new_image
            elif mode == 'OLD_IMAGE':
                assert not 'NewImage' in record
                if expected_old_image == None:
                    assert not 'OldImage' in record
                else:
                    old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
                    assert expected_old_image == old_image
            elif mode == 'NEW_AND_OLD_IMAGES':
                if expected_new_image == None:
                    assert not 'NewImage' in record
                else:
                    new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
                    assert expected_new_image == new_image
                if expected_old_image == None:
                    assert not 'OldImage' in record
                else:
                    old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
                    assert expected_old_image == old_image
            else:
                pytest.fail('cannot happen')
        except AssertionError:
            print_events(expected_events, output, failed_at=e)
            raise
    # After the above loop, expected_events_map should remain empty arrays.
    # If it isn't, one of the expected events did not yet happen. Return False.
    for entry in expected_events_map.values():
        if len(entry) > 0:
            return False
    return True
def fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_events, mode):
    # Stream updates are asynchronous, and may even be delayed artificially
    # (see Alternator's alternator_streams_time_window_s config). So when
    # compare_events() reports that some expected events are still missing
    # (by returning False) we keep polling for up to 20 seconds.
    # Note that compare_events() throws as soon as it sees *wrong*
    # (unexpected) data, so even failing tests usually finish reasonably
    # fast - depending on the alternator_streams_time_window_s parameter -
    # instead of waiting for the arbitrary timeout below. This optimization
    # is important to keep *failing* tests reasonably fast.
    deadline = time.time() + 20
    region = get_region(dynamodb)
    output = []
    while time.time() < deadline:
        iterators = fetch_more(dynamodbstreams, iterators, output)
        print("after fetch_more number expected_events={}, output={}".format(len(expected_events), len(output)))
        if compare_events(expected_events, output, mode, region):
            # All expected events were found - success.
            return
        time.sleep(0.5)
    # The final compare_events() still returned False - some expected events
    # never showed up on the stream.
    print_events(expected_events, output)
    pytest.fail('missing events in output: {}'.format(output))
# Convenience function used to implement several tests below. It runs a given
# function "updatefunc" which is supposed to do some updates to the table
# and also return an expected_events list. do_test() then fetches the streams
# data and compares it to the expected_events using compare_events().
def do_test(test_table_ss_stream, dynamodb, dynamodbstreams, updatefunc, mode):
    table, arn = test_table_ss_stream
    # Take LATEST iterators *before* doing any updates, so every update
    # made by updatefunc will be visible through them.
    iterators = latest_iterators(dynamodbstreams, arn)
    partition_key = random_string()
    sort_key = random_string()
    expected = updatefunc(table, partition_key, sort_key)
    fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected, mode)
def do_batch_test(test_table_ss_stream, dynamodb, dynamodbstreams, updatefunc, mode, item_count=3):
    """Like do_test(), but hands updatefunc lists of item_count partition
    and clustering keys, for tests exercising batched operations."""
    table, arn = test_table_ss_stream
    iterators = latest_iterators(dynamodbstreams, arn)
    partitions = [random_string() for _ in range(item_count)]
    clusterings = [f"ck_{i}" for i in range(item_count)]
    expected = updatefunc(table, partitions, clusterings)
    fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected, mode)
# Test a single PutItem of a new item. Should result in a single INSERT
# event. Reproduces #6930.
def test_streams_putitem_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        item = {'p': p, 'c': c, 'x': 2}
        table.put_item(Item=item)
        return [['INSERT', {'p': p, 'c': c}, None, item]]
    do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, updates, 'KEYS_ONLY')
# Replacing an item should result in a MODIFY, rather than REMOVE and MODIFY.
# Moreover, the old item should be visible in OldImage. Reproduces #6930.
def test_streams_putitem_new_items_override_old(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        # Each successive PutItem fully replaces the previous item, so each
        # replacement should produce a single MODIFY event whose OldImage
        # is the previous item:
        items = [
            {'p': p, 'c': c, 'a': 1},
            {'p': p, 'c': c, 'b': 2},
            {'p': p, 'c': c, 'a': 3, 'b': 4},
            {'p': p, 'c': c},
        ]
        events = []
        previous = None
        for item in items:
            table.put_item(Item=item)
            events.append(['INSERT' if previous is None else 'MODIFY', key, previous, item])
            previous = item
        return events
    do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, updates, 'NEW_AND_OLD_IMAGES')
# Same as test_streams_putitem_new_items_overrides_old, but for a replaced
# column is used in LSI. Reproduces #6930.
def test_streams_putitem_new_item_overrides_old_lsi(test_table_sss_new_and_old_images_lsi, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        first = {'p': p, 'c': c, 'a': '1'}
        table.put_item(Item=first)
        second = {'p': p, 'c': c}
        table.put_item(Item=second)
        # The overwrite removed the LSI key 'a', so the item must have
        # disappeared from the index as well:
        assert len(full_scan(table, IndexName='a_idx')) == 0
        return [['INSERT', key, None, first],
                ['MODIFY', key, first, second]]
    do_test(test_table_sss_new_and_old_images_lsi, dynamodb, dynamodbstreams, updates, 'NEW_AND_OLD_IMAGES')
# Test PutItem streams in a table with no clustering key.
def test_streams_putitem_no_ck_new_items_override_old(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, p, _):
        # Each successive PutItem fully replaces the previous item:
        items = [
            {'p': p, 'a': 1},
            {'p': p, 'b': 2},
            {'p': p, 'a': 3, 'b': 4},
            {'p': p},
        ]
        events = []
        previous = None
        for item in items:
            table.put_item(Item=item)
            events.append(['INSERT' if previous is None else 'MODIFY', {'p': p}, previous, item])
            previous = item
        return events
    do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, updates, 'NEW_AND_OLD_IMAGES')
# In test_streams_new_and_old_images among other things we test that a
# DeleteItem operation gives the correct old image. That test used a table
# which has a sort key, the following test checks this (old image in DeleteItem
# case) for a table that does NOT have a sort key. Alternator uses different
# types of tombstones for deletions from tables with a CK (row tombstone) vs.
# with a partition key only (partition tombstone) - and those may be handled
# differently by CDC. In issue #26382 - which this test reproduces - we
# discovered that our implementation doesn't select a preimage for partition
# deletions, resulting in a missing OldImage.
def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_images, test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
    def updates(table, p, _):
        key = {'p': p}
        table.update_item(Key=key,
            AttributeUpdates={'x': {'Value': 1, 'Action': 'PUT'}})
        table.delete_item(Key=key)
        item = {'p': p, 'x': 1}
        # The REMOVE event must carry the deleted item as its OldImage:
        return [['INSERT', key, None, item],
                ['REMOVE', key, item, None]]
    do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, updates, 'OLD_IMAGE')
    do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, updates, 'NEW_AND_OLD_IMAGES')
# Test a single UpdateItem. Should result in a single INSERT event.
# Reproduces #6918.
def test_streams_updateitem_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        return [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}]]
    do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, updates, 'KEYS_ONLY')
# Test OLD_IMAGE using UpdateItem. Verify that the OLD_IMAGE indeed includes,
# as needed, the entire old item and not just the modified columns.
# Reproduces issue #6935
def test_streams_updateitem_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        table.update_item(Key=key,
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        table.update_item(Key=key,
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        # The first event has OldImage None because the item didn't exist
        # yet (reproduces issue #6933). The second event's OldImage must
        # contain the *entire* previous item, key attributes included.
        return [['INSERT', key, None, {'p': p, 'c': c, 'x': 2}],
                ['MODIFY', key, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}]]
    do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, updates, 'OLD_IMAGE')
# Above we verified that if an item did not previously exist, the OLD_IMAGE
# would be missing, but if the item did previously exist, OLD_IMAGE should
# be present and must include the key. Here we confirm a special case of
# the latter - a pre-existing *empty* item, which has just the key. In this
# case, since the item did exist, OLD_IMAGE should be returned - and include
# just the key. This is a special case of reproducing #6935 - the first
# patch for this issue failed in this special case.
def test_streams_updateitem_old_image_empty_item(test_table_ss_old_image, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        # Create an *empty* item, consisting of nothing but the key:
        table.update_item(Key=key)
        table.update_item(Key=key,
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        # The second event's OLD_IMAGE must be present - the empty item with
        # just its key - not entirely missing.
        return [['INSERT', key, None, {'p': p, 'c': c}],
                ['MODIFY', key, {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}]]
    do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, updates, 'OLD_IMAGE')
# Test that OLD_IMAGE indeed includes the entire old item and not just the
# modified attributes, in the special case of attributes which are a key of
# a secondary index.
# The unique thing about this case is that as currently implemented,
# secondary-index key attributes are real Scylla columns, contrasting with
# other attributes which are just elements of a map. And our CDC
# implementation treats those cases differently - when a map is modified
# the preimage includes the entire content of the map, but for regular
# columns they are only included in the preimage if they change.
# Currently fails in Alternator because the item's key is missing in
# OldImage (#6935) and the LSI key is also missing (#7030).
@pytest.fixture(scope="module")
def test_table_ss_old_image_and_lsi(dynamodb, dynamodbstreams):
    # A table with an OLD_IMAGE stream plus an LSI keyed on attribute 'k',
    # shared by the LSI-related OLD_IMAGE tests below.
    lsi = {
        'IndexName': 'index',
        'KeySchema': [
            {'AttributeName': 'p', 'KeyType': 'HASH'},
            {'AttributeName': 'k', 'KeyType': 'RANGE'}],
        'Projection': { 'ProjectionType': 'ALL' },
    }
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH'},
            {'AttributeName': 'c', 'KeyType': 'RANGE'}],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
            { 'AttributeName': 'k', 'AttributeType': 'S' }],
        LocalSecondaryIndexes=[lsi],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE' })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    yield table, arn
    table.delete()
def test_streams_updateitem_old_image_lsi(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        table.update_item(Key=key,
            UpdateExpression='SET x = :x, k = :k',
            ExpressionAttributeValues={':x': 2, ':k': 'dog'})
        table.update_item(Key=key,
            UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
        first = {'p': p, 'c': c, 'x': 2, 'k': 'dog'}
        # In issue #7030, the LSI key value 'k' was missing from OldImage.
        return [['INSERT', key, None, first],
                ['MODIFY', key, first, {'p': p, 'c': c, 'x': 2, 'k': 'dog', 'y': 3}]]
    do_test(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams, updates, 'OLD_IMAGE')
# This test is the same as the previous (test_streams_updateitem_old_image_lsi)
# except that the *old* value of the LSI key k is missing. Since PR 8568, CDC
# adds a special deleted$k marker for a missing column in the preimage, and
# this test verifies that Alternator Streams doesn't put this extra marker in
# its output.
def test_streams_updateitem_old_image_lsi_missing_column(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        # Deliberately do *not* set the "k" attribute (the LSI key):
        table.update_item(Key=key,
            UpdateExpression='SET x = :x',
            ExpressionAttributeValues={':x': 2})
        table.update_item(Key=key,
            UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
        # "k" must be absent from OldImage (it was never set), and no
        # internal marker such as "deleted$k" may leak into the events.
        return [['INSERT', key, None, {'p': p, 'c': c, 'x': 2}],
                ['MODIFY', key, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}]]
    do_test(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams, updates, 'OLD_IMAGE')
# In general, DynamoDB doesn't emit events if an operation is a nop. This test
# verifies that UpdateItem doesn't result in a log row when the updated
# item exists and it is identical to the old one.
# Corresponding tests for PutItem are included in tests based on do_updates_1.
# The case for a PutItem within a BatchWriteItem is tested in
# test_streams_batch_overwrite_identical. Reproduces #6918.
def test_streams_updateitem_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        table.update_item(Key=key,
            UpdateExpression='SET x = :x',
            ExpressionAttributeValues={':x': 2})
        # The following two updates leave the item exactly as it already
        # is, so they must not generate any stream events:
        table.update_item(Key=key,
            UpdateExpression='ADD x :x',
            ExpressionAttributeValues={':x': 0})
        table.update_item(Key=key,
            UpdateExpression='SET x = :x',
            ExpressionAttributeValues={':x': 2})
        return [['INSERT', key, None, {'p': p, 'c': c, 'x': 2}]]
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        # Run the same scenario against every stream view mode:
        for fixture, mode in [(test_table_ss_keys_only, 'KEYS_ONLY'),
                              (test_table_ss_new_image, 'NEW_IMAGE'),
                              (test_table_ss_old_image, 'OLD_IMAGE'),
                              (test_table_ss_new_and_old_images, 'NEW_AND_OLD_IMAGES')]:
            do_test(fixture, dynamodb, dynamodbstreams, updates, mode)
# The above test_streams_updateitem_identical tested that if we UpdateItem an
# item changing an attribute to a value identical to the one it already has,
# no event is generated on the stream. In this test we change an attribute
# value from one map to another which are really the same value, but have
# different serialization (the map's elements are ordered differently).
# Since the value is nevertheless the same, here too we expect not to see
# a change event in the stream. Reproduces issue #27375.
@pytest.mark.xfail(reason="issue #27375")
def test_streams_updateitem_equal_but_not_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        # We use manual_request() to let us be sure that we pass the JSON
        # values to the server in exactly the order we want to, without the
        # Python SDK possibly rearranging them.
        # Set x to be a map: {'dog': 1, 'cat': 2, 'mouse': 3}.
        payload = '''{
            "TableName": "''' + table.name + '''",
            "Key": {"p": {"S": "''' + p + '''"}, "c": {"S": "''' + c + '''"}},
            "UpdateExpression": "SET x = :x",
            "ExpressionAttributeValues": {":x": %s}
        }'''
        manual_request(dynamodb, 'UpdateItem',
            payload % '{"M": {"dog": {"N": "1"}, "cat": {"N": "2"}, "mouse": {"N": "3"}}}')
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': {'dog': 1, 'cat': 2, 'mouse': 3}}])
        # Overwriting x with an identical map shouldn't produce any events,
        # so we won't add anything to the "events" list:
        manual_request(dynamodb, 'UpdateItem',
            payload % '{"M": {"dog": {"N": "1"}, "cat": {"N": "2"}, "mouse": {"N": "3"}}}')
        # Now try to overwrite x with a map that has the same elements in
        # a different order. These two values, despite not being identical
        # in JSON form, should be considered equal and again no event should
        # be generated.
        manual_request(dynamodb, 'UpdateItem',
            payload % '{"M": {"cat": {"N": "2"}, "dog": {"N": "1"}, "mouse": {"N": "3"}}}')
        return events
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
        do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
        do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
        do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Tests that deleting a missing attribute with UpdateItem doesn't generate a
# REMOVE event. Other cases are tested in test_streams_batch_delete_missing
# and in tests based on do_updates_1. Reproduces #6918.
def test_streams_updateitem_delete_missing(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        # Create an item:
        table.update_item(Key=key,
            AttributeUpdates={'x': {'Value': 1, 'Action': 'PUT'}})
        # Deleting an attribute the item doesn't have must not produce any
        # event, whether done through AttributeUpdates or UpdateExpression:
        table.update_item(Key=key,
            AttributeUpdates={'y': {'Action': 'DELETE'}})
        table.update_item(Key=key,
            UpdateExpression='REMOVE z')
        return [['INSERT', key, None, {'p': p, 'c': c, 'x': 1}]]
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        # Run the same scenario against every stream view mode:
        for fixture, mode in [(test_table_ss_keys_only, 'KEYS_ONLY'),
                              (test_table_ss_new_image, 'NEW_IMAGE'),
                              (test_table_ss_old_image, 'OLD_IMAGE'),
                              (test_table_ss_new_and_old_images, 'NEW_AND_OLD_IMAGES')]:
            do_test(fixture, dynamodb, dynamodbstreams, updates, mode)
# Tests similar to the above tests for OLD_IMAGE, just for NEW_IMAGE mode.
# Verify that the NEW_IMAGE includes the entire old item (including the key),
# that deleting the item results in a missing NEW_IMAGE, and that setting the
# item to be empty has a different result - a NEW_IMAGE with just a key.
# Reproduces issue #7107.
def test_streams_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        events = []
        # Adding attribute "x": NewImage must contain both the new x and
        # the key columns (p and c).
        table.update_item(Key=key,
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        events.append(['INSERT', key, None, {'p': p, 'c': c, 'x': 2}])
        # Adding just attribute "y": NewImage must contain all attributes,
        # including the old "x":
        table.update_item(Key=key,
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', key, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Removing x and y: NewImage becomes empty but still exists and
        # contains the key columns:
        table.update_item(Key=key,
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', key, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Deleting the item: NewImage is missing entirely:
        table.delete_item(Key=key)
        events.append(['REMOVE', key, {'p': p, 'c': c}, None])
        return events
    do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, updates, 'NEW_IMAGE')
# Test similar to the above test for NEW_IMAGE corner cases, but here for
# NEW_AND_OLD_IMAGES mode.
# Although it is likely that if both OLD_IMAGE and NEW_IMAGE work correctly then
# so will the combined NEW_AND_OLD_IMAGES mode, it is still possible that the
# implementation of the combined mode has unique bugs, so it is worth testing
# it separately.
# Reproduces issue #7107.
def test_streams_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        key = {'p': p, 'c': c}
        events = []
        # The item doesn't exist yet, so the first event's OldImage is
        # missing:
        table.update_item(Key=key,
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        events.append(['INSERT', key, None, {'p': p, 'c': c, 'x': 2}])
        # Adding just attribute "y": NewImage must contain all attributes
        # including the old "x", and OldImage must contain the key
        # attributes, not just "x":
        table.update_item(Key=key,
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', key, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Removing x and y: NewImage becomes empty but still exists and
        # contains the key attributes:
        table.update_item(Key=key,
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', key, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Adding z to the now-empty item: OldImage is not missing - it just
        # contains only the key attributes:
        table.update_item(Key=key,
            UpdateExpression='SET z = :val1', ExpressionAttributeValues={':val1': 4})
        events.append(['MODIFY', key, {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}])
        # Deleting the item: NewImage is missing entirely:
        table.delete_item(Key=key)
        events.append(['REMOVE', key, {'p': p, 'c': c, 'z': 4}, None])
        return events
    do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, updates, 'NEW_AND_OLD_IMAGES')
# Test that when a stream shard has no data to read, GetRecords returns an
# empty Records array - not a missing one. Reproduces issue #6926.
def test_streams_no_records(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # Take an arbitrary shard and ask for its LATEST iterator; being
    # LATEST, it has nothing to read.
    shards = dynamodbstreams.describe_stream(StreamArn=arn, Limit=1)['StreamDescription']['Shards']
    shard = shards[0]
    iterator = dynamodbstreams.get_shard_iterator(StreamArn=arn,
        ShardId=shard['ShardId'], ShardIteratorType='LATEST')['ShardIterator']
    response = dynamodbstreams.get_records(ShardIterator=iterator)
    # Either the shard is still open (a NextShardIterator is returned), or
    # it is closed (and has an EndingSequenceNumber):
    assert 'NextShardIterator' in response or 'EndingSequenceNumber' in shard['SequenceNumberRange']
    # Records must be present and empty - not a missing attribute:
    assert 'Records' in response
    assert response['Records'] == []
# Test that after fetching the last result from a shard, we don't get it
# yet again. Reproduces issue #6942.
def test_streams_last_result(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    table.update_item(Key={'p': random_string(), 'c': random_string()},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually (we may need to retry this for a while), *one* of the
    # stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Found the shard with the data! Test that it only has
                # one event and that if we try to read again, we don't
                # get more data (this was issue #6942).
                assert len(response['Records']) == 1
                assert 'NextShardIterator' in response
                response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                assert response['Records'] == []
                return
        # No shard had the event yet - poll again after a short delay,
        # until the 15-second deadline above expires.
        time.sleep(0.5)
    pytest.fail("timed out")
# In test_streams_last_result above we tested that there are no further events
# after reading the only one. In this test we verify that if we *do* perform
# another change on the same key, we do get another event and it happens on the
# *same* shard.
def test_streams_another_result(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Finally found the shard reporting the changes to this key
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys']== {'p': {'S': p}, 'c': {'S': c}}
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Found the shard with the data. It only has one event so if
                # we try to read again, we find nothing (this is the same as
                # what test_streams_last_result tests).
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert response['Records'] == []
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Do another UpdateItem operation to the same key, so it is
                # expected to write to the *same* shard:
                table.update_item(Key={'p': p, 'c': c},
                    UpdateExpression='SET x = :val2', ExpressionAttributeValues={':val2': 7})
                # Again we may need to wait for the event to appear on the stream:
                timeout = time.time() + 15
                while time.time() < timeout:
                    response = dynamodbstreams.get_records(ShardIterator=iter)
                    if 'Records' in response and response['Records'] != []:
                        # The second event arrived on the same shard, as
                        # expected:
                        assert len(response['Records']) == 1
                        assert response['Records'][0]['dynamodb']['Keys']== {'p': {'S': p}, 'c': {'S': c}}
                        assert 'NextShardIterator' in response
                        # The test is done, successfully.
                        return
                    time.sleep(0.5)
                pytest.fail("timed out")
        time.sleep(0.5)
    pytest.fail("timed out")
# Test the SequenceNumber attribute returned for stream events, and the
# "AT_SEQUENCE_NUMBER" iterator that can be used to re-read from the same
# event again given its saved "sequence number".
def test_streams_at_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Finally found the shard reporting the changes to this key:
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert 'NextShardIterator' in response
                # Remember this event's SequenceNumber so we can re-read it
                # below:
                sequence_number = response['Records'][0]['dynamodb']['SequenceNumber']
                # Found the shard with the data. It only has one event so if
                # we try to read again, we find nothing (this is the same as
                # what test_streams_last_result tests).
                iter = response['NextShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert response['Records'] == []
                assert 'NextShardIterator' in response
                # If we use the SequenceNumber of the first event to create an
                # AT_SEQUENCE_NUMBER iterator, we can read the same event again.
                # We don't need a loop and a timeout, because this event is already
                # available.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',
                    SequenceNumber=sequence_number)['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number
                return
        # No shard had the event yet - poll again after a short delay,
        # until the 15-second deadline above expires.
        time.sleep(0.5)
    pytest.fail("timed out")
# Test the SequenceNumber attribute returned for stream events, and the
# "AFTER_SEQUENCE_NUMBER" iterator that can be used to re-read *after* the same
# event again given its saved "sequence number".
def test_streams_after_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
    # Do two UpdateItem operations to the same key, that are expected to leave
    # two events in the stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the two events:
    timeout = time.time() + 15
    while time.time() < timeout:
        # Note: named "it" rather than "iter" to avoid shadowing the builtin.
        for (shard_id, it) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=it)
            if 'Records' in response and len(response['Records']) == 2:
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
                # #7424 - AWS sdk assumes sequence numbers can be compared
                # as bigints, and are monotonically growing.
                assert int(sequence_number_1) < int(sequence_number_2)
                # If we use the SequenceNumber of the first event to create an
                # AFTER_SEQUENCE_NUMBER iterator, we can read the second event
                # (only) again. We don't need a loop and a timeout, because this
                # event is already available.
                it = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AFTER_SEQUENCE_NUMBER',
                    SequenceNumber=sequence_number_1)['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=it)
                assert 'Records' in response
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_2
                return
        time.sleep(0.5)
    pytest.fail("timed out")
# Test the "TRIM_HORIZON" iterator, which can be used to re-read *all* the
# previously-read events of the stream shard again.
def test_streams_trim_horizon(dynamodb, dynamodbstreams):
    # This test needs a brand-new stream, without old data from other
    # tests, so we can't reuse the test_table_ss_keys_only fixture.
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
        # Do two UpdateItem operations to the same key, that are expected to leave
        # two events in the stream.
        p = random_string()
        c = random_string()
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
        # Eventually, *one* of the stream shards will return the two events:
        timeout = time.time() + 15
        while time.time() < timeout:
            # Note: named "it" rather than "iter" to avoid shadowing the builtin.
            for (shard_id, it) in shards_and_iterators:
                response = dynamodbstreams.get_records(ShardIterator=it)
                if 'Records' in response and len(response['Records']) == 2:
                    assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                    sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
                    # If we use the TRIM_HORIZON iterator, we should receive the
                    # same two events again, in the same order.
                    # Note that we assume that the fixture gave us a brand new
                    # stream, with no old events saved from other tests. If we
                    # couldn't assume this, this test would need to become much
                    # more complex, and would need to read from this shard until
                    # we find the two events we are looking for.
                    it = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                        ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
                    response = dynamodbstreams.get_records(ShardIterator=it)
                    assert 'Records' in response
                    assert len(response['Records']) == 2
                    assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_1
                    assert response['Records'][1]['dynamodb']['SequenceNumber'] == sequence_number_2
                    return
            time.sleep(0.5)
        pytest.fail("timed out")
# Test the StartingSequenceNumber information returned by DescribeStream.
# The DynamoDB documentation explains that StartingSequenceNumber is
# "The first sequence number for the stream records contained within a shard"
# which suggests that it should be the sequence number of the first event
# that can actually be read from the shard. However, in Alternator the
# StartingSequenceNumber may be *before* any of the events on the shard -
# it is the time that the shard was created. In Alternator, it is possible
# that no event appeared on this shard for some time after it was created,
# and it is also possible that after more than 24 hours, some of the original
# items that existed in the shard have expired. Nevertheless, we believe
# that the important thing is that reading a shard starting at
# StartingSequenceNumber will result in reading all the available items -
# similar to how TRIM_HORIZON works. This is what the following test verifies.
def test_streams_starting_sequence_number(dynamodb, dynamodbstreams):
    # This test needs a brand-new stream, without old data from other
    # tests, so we can't reuse the test_table_ss_keys_only fixture.
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        # Do two UpdateItem operations to the same key, that are expected to leave
        # two events in the stream.
        p = random_string()
        c = random_string()
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
        # Get for all the stream shards the iterator starting at the shard's
        # StartingSequenceNumber:
        response = dynamodbstreams.describe_stream(StreamArn=arn)
        shards = response['StreamDescription']['Shards']
        while 'LastEvaluatedShardId' in response['StreamDescription']:
            response = dynamodbstreams.describe_stream(StreamArn=arn,
                ExclusiveStartShardId=response['StreamDescription']['LastEvaluatedShardId'])
            shards.extend(response['StreamDescription']['Shards'])
        iterators = []
        for shard in shards:
            shard_id = shard['ShardId']
            start = shard['SequenceNumberRange']['StartingSequenceNumber']
            assert start.isdecimal()
            iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
                ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',
                SequenceNumber=start)['ShardIterator'])
        # Eventually, *one* of the stream shards will return the two events:
        timeout = time.time() + 15
        while time.time() < timeout:
            # Note: named "it" rather than "iter" to avoid shadowing the builtin.
            for it in iterators:
                response = dynamodbstreams.get_records(ShardIterator=it)
                if 'Records' in response and len(response['Records']) == 2:
                    assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    return
            time.sleep(0.5)
        pytest.fail("timed out")
# Above we tested some specific operations in small tests aimed to reproduce
# a specific bug, in the following tests we do all the different operations,
# PutItem, DeleteItem, BatchWriteItem and UpdateItem, and check the resulting
# stream for correctness.
# The following tests focus on multiple operations on the *same* item. Those
# should appear in the stream in the correct order.
# Reproduces #6918, #6930.
def do_updates_1(table, p, c):
    """Perform a fixed sequence of writes to the item whose key is (p, c),
    and return the list of stream events those writes are expected to
    generate.

    Each expected event is a 4-element list:
    [eventName, keys, old_image, new_image], where a missing image is
    represented by None. Write operations that should leave *no* event
    (no-op overwrites, deleting a non-existent item) are performed too,
    but contribute no entry to the returned list.
    """
    events = []
    # a first put_item appears as an INSERT event. Note also empty old_image.
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
    # a second put_item of the *same* key and same value, doesn't appear in the log at all!
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    # a second put_item of the *same* key and different value, appears as a MODIFY event
    table.put_item(Item={'p': p, 'c': c, 'y': 3})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'y': 3}])
    # deleting an item appears as a REMOVE event. Note no new_image at all, but there is an old_image.
    table.delete_item(Key={'p': p, 'c': c})
    events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}, None])
    # deleting a non-existent item doesn't appear in the log at all.
    table.delete_item(Key={'p': p, 'c': c})
    # If update_item creates an item, the event is INSERT as well.
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET b = :val1',
        ExpressionAttributeValues={':val1': 4})
    events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'b': 4}])
    # If update_item modifies the item, note how old and new image includes both old and new columns
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1',
        ExpressionAttributeValues={':val1': 5})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4}, {'p': p, 'c': c, 'b': 4, 'x': 5}])
    # TODO: incredibly, if we uncomment the "REMOVE b" update below, it will be
    # completely missing from the DynamoDB stream - the test continues to
    # pass even though we didn't add another expected event, and even though
    # the preimage in the following expected event includes this "b" we will
    # remove. I couldn't reproduce this apparent DynamoDB bug in a smaller test.
    #table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE b')
    # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}])
    return events
# The tested events are the same as in do_updates_1, but the table doesn't have
# a clustering key.
def do_updates_1_no_ck(table, p, _):
    """Same write sequence and expected events as do_updates_1, but for a
    table whose key is just the partition key p (no clustering key).

    The third parameter is ignored - it exists only so this function has
    the same signature as do_updates_1 and can be passed to the same
    do_test() driver.
    """
    events = []
    # a first put_item appears as an INSERT event. Note also empty old_image.
    table.put_item(Item={'p': p, 'x': 2})
    events.append(['INSERT', {'p': p}, None, {'p': p, 'x': 2}])
    # a second put_item of the *same* key and same value, doesn't appear in the log at all!
    table.put_item(Item={'p': p, 'x': 2})
    # a second put_item of the *same* key and different value, appears as a MODIFY event
    table.put_item(Item={'p': p, 'y': 3})
    events.append(['MODIFY', {'p': p}, {'p': p, 'x': 2}, {'p': p, 'y': 3}])
    # deleting an item appears as a REMOVE event. Note no new_image at all, but there is an old_image.
    table.delete_item(Key={'p': p})
    events.append(['REMOVE', {'p': p}, {'p': p, 'y': 3}, None])
    # deleting a non-existent item doesn't appear in the log at all.
    table.delete_item(Key={'p': p})
    # If update_item creates an item, the event is INSERT as well.
    table.update_item(Key={'p': p},
        UpdateExpression='SET b = :val1',
        ExpressionAttributeValues={':val1': 4})
    events.append(['INSERT', {'p': p}, None, {'p': p, 'b': 4}])
    # If update_item modifies the item, note how old and new image includes both old and new columns
    table.update_item(Key={'p': p},
        UpdateExpression='SET x = :val1',
        ExpressionAttributeValues={':val1': 5})
    events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4}, {'p': p, 'b': 4, 'x': 5}])
    # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]})
    events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}])
    return events
def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
    # Run the do_updates_1 scenario on a KEYS_ONLY stream. The config
    # override is a no-op when running against real DynamoDB.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY')
def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
    # Run the do_updates_1 scenario on a NEW_IMAGE stream. The config
    # override is a no-op when running against real DynamoDB.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE')
def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
    # Run the do_updates_1 scenario on an OLD_IMAGE stream. The config
    # override is a no-op when running against real DynamoDB.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE')
def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    # Run the do_updates_1 scenario on a NEW_AND_OLD_IMAGES stream. The
    # config override is a no-op when running against real DynamoDB.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES')
def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams):
    # Same as test_streams_1_keys_only, but for a table without a
    # clustering key.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY')
def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams):
    # Same as test_streams_1_new_image, but for a table without a
    # clustering key.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE')
def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
    # Same as test_streams_1_old_image, but for a table without a
    # clustering key.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'OLD_IMAGE')
def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
    # Same as test_streams_1_new_and_old_images, but for a table without a
    # clustering key.
    nop = is_aws(dynamodb)
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=nop):
        do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES')
# Tests that a DeleteItem within a BatchWriteItem that tries to remove a
# missing item doesn't generate a REMOVE event.
# Reproduces #6918.
def test_streams_batch_delete_missing(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, ps, cs):
        # Batch-deleting items that were never written must not generate
        # any stream events, so the list of expected events is empty.
        requests = [{'DeleteRequest': {'Key': {'p': p, 'c': c}}}
                    for p, c in zip(ps, cs)]
        table.meta.client.batch_write_item(RequestItems={table.name: requests})
        return []
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        # Run the same scenario against every stream view type:
        for fixture, view in [(test_table_ss_keys_only, 'KEYS_ONLY'),
                              (test_table_ss_new_image, 'NEW_IMAGE'),
                              (test_table_ss_old_image, 'OLD_IMAGE'),
                              (test_table_ss_new_and_old_images, 'NEW_AND_OLD_IMAGES')]:
            do_batch_test(fixture, dynamodb, dynamodbstreams, updates, view)
# This test verifies that a PutItem within a BatchWriteItem generates no event
# when it replaces an item with an identical one. This no-op behaviour is also
# tested for a standard PutItem (see tests based on do_updates_1), and an
# UpdateItem (see test_streams_updateitem_overwrite_identical). Reproduces
# #6918.
def test_streams_batch_overwrite_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, ps, cs):
        items = [{'p': p, 'c': c, 'x': i}
                 for i, (p, c) in enumerate(zip(ps, cs), start=1)]
        def put_batch():
            table.meta.client.batch_write_item(RequestItems={
                table.name: [{'PutRequest': {'Item': dict(item)}} for item in items]})
        # The first batch inserts brand-new items, so each of them is
        # reported as its own INSERT event.
        put_batch()
        expected = [['INSERT', {'p': item['p'], 'c': item['c']}, None, dict(item)]
                    for item in items]
        # Overwriting every item with an identical copy must not produce
        # any events at all.
        put_batch()
        return expected
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        # Run the same scenario against every stream view type:
        for fixture, view in [(test_table_ss_keys_only, 'KEYS_ONLY'),
                              (test_table_ss_new_image, 'NEW_IMAGE'),
                              (test_table_ss_old_image, 'OLD_IMAGE'),
                              (test_table_ss_new_and_old_images, 'NEW_AND_OLD_IMAGES')]:
            do_batch_test(fixture, dynamodb, dynamodbstreams, updates, view)
def test_streams_batch_overwrite_different(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def updates(table, ps, cs):
        def put_all(x):
            table.meta.client.batch_write_item(RequestItems={
                table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': x}}}
                             for p, c in zip(ps, cs)]})
        # The first batch inserts brand-new items, so each of them is
        # reported as its own INSERT event ...
        put_all(1)
        expected = [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 1}]
                    for p, c in zip(ps, cs)]
        # ... but overwriting each item with a *different* value must be
        # reported as a MODIFY event.
        put_all(2)
        expected += [['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 1},
                      {'p': p, 'c': c, 'x': 2}] for p, c in zip(ps, cs)]
        return expected
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        # Run the same scenario against every stream view type:
        for fixture, view in [(test_table_ss_keys_only, 'KEYS_ONLY'),
                              (test_table_ss_new_image, 'NEW_IMAGE'),
                              (test_table_ss_old_image, 'OLD_IMAGE'),
                              (test_table_ss_new_and_old_images, 'NEW_AND_OLD_IMAGES')]:
            do_batch_test(fixture, dynamodb, dynamodbstreams, updates, view)
# A fixture which creates a test table with a stream enabled, and returns a
# bunch of interesting information collected from the CreateTable response.
# This fixture is module-scoped - it can be shared by multiple tests below,
# because we are not going to actually use or change this stream, we will
# just do multiple tests on its setup.
@pytest.fixture(scope="module")
def test_table_stream_with_result(dynamodb, dynamodbstreams):
tablename = unique_table_name()
result = dynamodb.meta.client.create_table(TableName=tablename,
Tags=TAGS,
BillingMode='PAY_PER_REQUEST',
StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
{ 'AttributeName': 'c', 'KeyType': 'RANGE' }
],
AttributeDefinitions=[
{ 'AttributeName': 'p', 'AttributeType': 'S' },
{ 'AttributeName': 'c', 'AttributeType': 'S' },
])
waiter = dynamodb.meta.client.get_waiter('table_exists')
waiter.config.delay = 1
waiter.config.max_attempts = 200
waiter.wait(TableName=tablename)
table = dynamodb.Table(tablename)
yield result, table
while True:
try:
table.delete()
return
except ClientError as ce:
# if the table has a stream currently being created we cannot
# delete the table immediately. Again, only with real dynamo
if ce.response['Error']['Code'] == 'ResourceInUseException':
print('Could not delete table yet. Sleeping 5s.')
time.sleep(5)
continue;
raise
# Wait for a table to return to "ACTIVE" status. We need to call this after
# doing an UpdateTable to a table - because before this wait finishes we are
# not allowed to update the same table again or delete it.
def wait_for_status_active(table):
    """Wait (up to roughly 60 seconds) for the given table to return to
    "ACTIVE" status, polling DescribeTable once a second.

    This must be called after doing an UpdateTable on a table - until the
    table is back to ACTIVE we are not allowed to update it again or
    delete it. Fails the test if the timeout expires.
    """
    for _ in range(60):
        desc = table.meta.client.describe_table(TableName=table.name)
        if desc['Table']['TableStatus'] == 'ACTIVE':
            return
        time.sleep(1)
    pytest.fail("wait_for_status_active() timed out")
# Test that in a table with Streams enabled, LatestStreamArn is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same ARN as
# returned by ListStreams. Reproduces issue #7157.
def test_latest_stream_arn(test_table_stream_with_result, dynamodbstreams):
    (create_result, table) = test_table_stream_with_result
    # CreateTable must have reported the new stream's ARN.
    assert 'LatestStreamArn' in create_result['TableDescription']
    expected_arn = create_result['TableDescription']['LatestStreamArn']
    # ListStreams (via wait_for_active_stream) must report the same ARN,
    (listed_arn, _) = wait_for_active_stream(dynamodbstreams, table)
    assert listed_arn == expected_arn
    # ... and so must DescribeTable,
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamArn' in desc
    assert desc['LatestStreamArn'] == expected_arn
    # ... and so must UpdateTable. This "update" changes nothing - it
    # just re-sets BillingMode to its current value.
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert desc['LatestStreamArn'] == expected_arn
    wait_for_status_active(table)
# Test that in a table with Streams enabled, LatestStreamLabel is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same "label" as
# returned by ListStreams. Reproduces issue #7162.
def test_latest_stream_label(test_table_stream_with_result, dynamodbstreams):
    (create_result, table) = test_table_stream_with_result
    # CreateTable must have reported the new stream's label.
    assert 'LatestStreamLabel' in create_result['TableDescription']
    expected_label = create_result['TableDescription']['LatestStreamLabel']
    # ListStreams (via wait_for_active_stream) must report the same label,
    (_, listed_label) = wait_for_active_stream(dynamodbstreams, table)
    assert listed_label == expected_label
    # ... and so must DescribeTable,
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamLabel' in desc
    assert desc['LatestStreamLabel'] == expected_label
    # ... and so must UpdateTable. This "update" changes nothing - it
    # just re-sets BillingMode to its current value.
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert desc['LatestStreamLabel'] == expected_label
    wait_for_status_active(table)
# Test that in a table with Streams enabled, StreamSpecification is returned
# by CreateTable, DescribeTable and UpdateTable. Reproduces issue #7163.
def test_stream_specification(test_table_stream_with_result, dynamodbstreams):
    # The specification that the fixture's table was created with:
    expected_spec = {'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}
    (create_result, table) = test_table_stream_with_result
    # CreateTable must have echoed this specification back,
    assert 'StreamSpecification' in create_result['TableDescription']
    assert create_result['TableDescription']['StreamSpecification'] == expected_spec
    # ... and DescribeTable must report it too,
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'StreamSpecification' in desc
    assert desc['StreamSpecification'] == expected_spec
    # ... and so must UpdateTable. This "update" changes nothing - it
    # just re-sets BillingMode to its current value.
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert desc['StreamSpecification'] == expected_spec
    wait_for_status_active(table)
# The following test checks the behavior of *closed* shards.
# We achieve a closed shard by disabling the stream - the DynamoDB
# documentation states that "If you disable a stream, any shards that are
# still open will be closed. The data in the stream will continue to be
# readable for 24 hours". In the test we verify that indeed, after a shard
# is closed, it is still readable with GetRecords (reproduces issue #7239).
# Moreover, upon reaching the end of data in the shard, the NextShardIterator
# attribute should say that the end was reached. The DynamoDB documentation
# says that NextShardIterator should be "set to null" in this case - but it
# is not clear what "null" means in this context: Should NextShardIterator
# be missing? Or a "null" JSON type? Or an empty string? This test verifies
# that the right answer is that NextShardIterator should be *missing*
# (reproduces issue #7237).
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_closed_read(dynamodb, dynamodbstreams):
    # This test can't use the shared table test_table_ss_keys_only,
    # because it wants to disable streaming, so let's create a new table:
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
        # Do an UpdateItem operation that is expected to leave one event in the
        # stream.
        table.update_item(Key={'p': random_string(), 'c': random_string()},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
        disable_stream(dynamodbstreams, table)
        # Even after streaming is disabled for the table, we can still read
        # from the earlier stream (it is guaranteed to work for 24 hours).
        # The iterators we got earlier should still be fully usable, and
        # eventually *one* of the stream shards will return one event:
        timeout = time.time() + 15
        while time.time() < timeout:
            # Note: named "it" rather than "iter" to avoid shadowing the builtin.
            for (shard_id, it) in shards_and_iterators:
                response = dynamodbstreams.get_records(ShardIterator=it)
                if 'Records' in response and response['Records'] != []:
                    # Found the shard with the data! Test that it only has
                    # one event. NextShardIterator should either be missing now,
                    # indicating that it is a closed shard (DynamoDB does this),
                    # or, it may (and currently does in Alternator) return another
                    # and reading from *that* iterator should then tell us that
                    # we reached the end of the shard (i.e., zero results and
                    # missing NextShardIterator).
                    assert len(response['Records']) == 1
                    if 'NextShardIterator' in response:
                        response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                        assert len(response['Records']) == 0
                        assert 'NextShardIterator' not in response
                    # Until now we verified that we can read the closed shard
                    # using an old iterator. Let's test now that the closed
                    # shard id is also still valid, and a new iterator can be
                    # created for it, and the old data can be read from it:
                    it = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                        ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
                    response = dynamodbstreams.get_records(ShardIterator=it)
                    assert len(response['Records']) == 1
                    return
            time.sleep(0.5)
        pytest.fail("timed out")
# In the above test (test_streams_closed_read) we used a disabled stream as
# a means to generate a closed shard, and tested the behavior of that closed
# shard. In the following test, we do more extensive testing on the
# behavior of a disabled stream and verify that it is sill usable (for 24
# hours), reproducing issue #7239: The disabled stream's ARN should still be
# listed for the table, this ARN should continue to work, listing the
# stream's shards should give an indication that they are all closed - but
# all these shards should still be readable.
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_disabled_stream(dynamodb, dynamodbstreams):
    # This test can't use the shared table test_table_ss_keys_only,
    # because it wants to disable streaming, so let's create a new table:
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        iterators = latest_iterators(dynamodbstreams, arn)
        # Do an UpdateItem operation that is expected to leave one event in the
        # stream.
        table.update_item(Key={'p': random_string(), 'c': random_string()},
            UpdateExpression='SET x = :x', ExpressionAttributeValues={':x': 5})
        # Wait for this one update to become available in the stream before we
        # disable the stream. Otherwise, theoretically (although unlikely in
        # practice) we may disable the stream before the update was saved to it.
        timeout = time.time() + 15
        found = False
        while time.time() < timeout and not found:
            # Note: named "it" rather than "iter" to avoid shadowing the builtin.
            for it in iterators:
                response = dynamodbstreams.get_records(ShardIterator=it)
                if 'Records' in response and len(response['Records']) > 0:
                    found = True
                    break
            time.sleep(0.5)
        assert found
        disable_stream(dynamodbstreams, table)
        # Check that the stream ARN which we previously got for the disabled
        # stream is still listed by ListStreams
        arns = [stream['StreamArn'] for stream in dynamodbstreams.list_streams(TableName=table.name)['Streams']]
        assert arn in arns
        # DescribeStream on the disabled stream still works and lists its shards.
        # All these shards are listed as being closed (i.e., should have
        # EndingSequenceNumber). The basic details of the stream (e.g., the view
        # type) are available and the status of the stream is DISABLED.
        response = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
        assert response['StreamStatus'] == 'DISABLED'
        assert response['StreamViewType'] == 'KEYS_ONLY'
        assert response['TableName'] == table.name
        shards_info = response['Shards']
        while 'LastEvaluatedShardId' in response:
            response = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']
            assert response['StreamStatus'] == 'DISABLED'
            assert response['StreamViewType'] == 'KEYS_ONLY'
            assert response['TableName'] == table.name
            shards_info.extend(response['Shards'])
        print('Number of shards in stream: {}'.format(len(shards_info)))
        for shard in shards_info:
            assert 'EndingSequenceNumber' in shard['SequenceNumberRange']
            assert shard['SequenceNumberRange']['EndingSequenceNumber'].isdecimal()
        # We can get TRIM_HORIZON iterators for all these shards, to read all
        # the old data they still have (this data should be saved for 24 hours
        # after the stream was disabled)
        iterators = []
        for shard in shards_info:
            iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
                ShardId=shard['ShardId'], ShardIteratorType='TRIM_HORIZON')['ShardIterator'])
        # We can read the one change we did in one of these iterators. The data
        # should be available immediately - no need for retries with timeout.
        nrecords = 0
        for it in iterators:
            response = dynamodbstreams.get_records(ShardIterator=it)
            if 'Records' in response:
                nrecords += len(response['Records'])
            # The shard is closed, so NextShardIterator should either be missing
            # now, indicating that it is a closed shard (DynamoDB does this),
            # or, it may (and currently does in Alternator) return an iterator
            # and reading from *that* iterator should then tell us that
            # we reached the end of the shard (i.e., zero results and
            # missing NextShardIterator).
            if 'NextShardIterator' in response:
                response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                assert len(response['Records']) == 0
                assert 'NextShardIterator' not in response
        assert nrecords == 1
# When streams are enabled for a table, we get a unique ARN which should be
# unique but not change unless streams are eventually disabled for this table.
# If this ARN changes unexpectedly, it can confuse existing readers who are
# still using this to read from the stream. We (incorrectly) suspected in
# issue #12601 that changes to the version of the schema lead to a change of
# the ARN. In this test we show that it doesn't happen.
def test_stream_arn_unchanging(dynamodb, dynamodbstreams):
    """Verify a table's stream ARN remains stable across schema changes.

    A stream's ARN must stay the same for as long as the stream is enabled;
    a changing ARN would confuse readers already holding it. We incorrectly
    suspected in issue #12601 that a schema-version change alters the ARN,
    so here we change the schema (by tagging the table) and confirm that
    ListStreams still reports the same single stream with the original ARN.
    """
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        stream_arn, _ = wait_for_active_stream(dynamodbstreams, table)
        client = table.meta.client
        # Tagging the table changes its schema (and schema version):
        resource_arn = client.describe_table(TableName=table.name)['Table']['TableArn']
        client.tag_resource(ResourceArn=resource_arn,
                            Tags=[{'Key': 'animal', 'Value': 'dog' }])
        # Despite the schema change, the stream ARN must not have changed:
        listed = dynamodbstreams.list_streams(TableName=table.name)
        assert 'Streams' in listed
        assert len(listed['Streams']) == 1
        assert listed['Streams'][0]['StreamArn'] == stream_arn
# Enabling a stream shouldn't cause any extra table to appear in ListTables.
# In issue #19911, enabling streams on a table called xyz caused the name
# "xyz_scylla_cdc_log" to appear in ListTables. The following test creates
# a table with a long unique name, and ensures that only one table containing
# this name as a substring is listed.
# In test_gsi.py and test_lsi.py we have similar tests for GSI and LSI.
# Reproduces #19911
def test_stream_list_tables(dynamodb):
    """Reproduces #19911: enabling a stream must not make any extra table
    (e.g., the internal "<name>_scylla_cdc_log" table) appear in ListTables.

    We create a stream-enabled table whose long name is unique (courtesy of
    unique_table_name()), and verify that no *other* listed table contains
    that name as a substring.
    """
    with new_test_table(dynamodb,
        Tags=TAGS,
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
        AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
    ) as table:
        listed = list_tables(dynamodb)
        # The table itself must be listed...
        assert table.name in listed
        # ...but its unique name must not be a proper substring of any
        # other listed table's name:
        assert all(table.name not in other
                   for other in listed if other != table.name)
# The DynamoDB documentation for GetRecords says that "GetRecords can retrieve
# a maximum of 1 MB of data or 1000 stream records, whichever comes first.",
# and that for Limit, "the upper limit is 1000". Indeed, if we try Limit=1001,
# we get a ValidationException. There is no reason why Alternator must
# implement exactly the same maximum, but since it's documented, there is
# no reason not to. In any case, some maximum is needed unless we make
# sure the relevant code (executor::get_records()) has preemption points -
# and currently it does not. Reproduces issue #23534
def test_get_records_too_high_limit(test_table_ss_keys_only, dynamodbstreams):
    """GetRecords documents Limit's upper bound as 1000. Verify that 1000 is
    accepted, while out-of-range values - 1001 (above the maximum) as well
    as 0 and -1 (the minimum is 1) - are rejected with a ValidationException.
    Reproduces issue #23534.
    """
    table, arn = test_table_ss_keys_only
    # Pick an arbitrary shard and get a LATEST iterator for it. A LATEST
    # iterator has no data to read, but we don't care - we only want the
    # GetRecords request to be validated, not to inspect its results.
    first_shard = dynamodbstreams.describe_stream(
        StreamArn=arn, Limit=1)['StreamDescription']['Shards'][0]
    it = dynamodbstreams.get_shard_iterator(
        StreamArn=arn, ShardId=first_shard['ShardId'],
        ShardIteratorType='LATEST')['ShardIterator']
    # The documented maximum, Limit=1000, must be allowed:
    dynamodbstreams.get_records(ShardIterator=it, Limit=1000)
    # Values above the maximum or below the minimum must be rejected:
    for bad_limit in [1001, 0, -1]:
        with pytest.raises(ClientError, match='ValidationException.*[Ll]imit'):
            dynamodbstreams.get_records(ShardIterator=it, Limit=bad_limit)
# padded_name() creates a unique name of given length by taking the
# output of unique_table_name() and padding it with extra 'x' characters:
def padded_name(length):
    """Return a unique name of exactly the given length, built by padding
    the output of unique_table_name() with trailing 'x' characters."""
    base = unique_table_name()
    assert length >= len(base)
    return base.ljust(length, 'x')
# When Alternator enables streams, CDC creates a new table whose name is the
# same as the existing table with the suffix "_scylla_cdc_log". In issue
# #24598, we discovered that we could create a table with length 222 but
# then creating the stream crashed Scylla. We accept that either the table
# creation or the stream creation can fail if we lower the limits in the
# future, but it mustn't crash as it did in #24598.
# We have two versions of this test - one with the stream created with the
# table, and one with the stream added to the existing table.
# Reproduces #24598
def test_stream_table_name_length_222_create(dynamodb):
    """Reproduces #24598: creating a table with a 222-character name together
    with a stream crashed Scylla. It is acceptable for either the table
    creation or the stream creation to be refused with an error if we lower
    the limits in the future - but it must never crash.
    """
    try:
        with new_test_table(dynamodb, name=padded_name(222),
            Tags=TAGS,
            StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
            KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
            AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
        ):
            pass
    except ClientError as e:
        # Alternator may legitimately refuse the 222-character name, either
        # for the table itself or for its stream - reaching here means it
        # at least didn't crash.
        msg = str(e)
        assert 'table name is longer than' in msg or 'TableName must be' in msg
# Reproduces #24598
def test_stream_table_name_length_222_update(dynamodb, dynamodbstreams):
    """Reproduces #24598, the UpdateTable variant: enabling a stream on an
    already-existing table with a 222-character name must not crash, though
    either the table creation or the stream creation may be refused.
    """
    try:
        with new_test_table(dynamodb, name=padded_name(222),
            Tags=TAGS,
            KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
            AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
        ) as table:
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
            # DynamoDB doesn't allow deleting the base table while the
            # stream is still being added, so wait for it to become active
            # before the context manager deletes the table.
            wait_for_active_stream(dynamodbstreams, table)
    except ClientError as e:
        msg = str(e)
        assert 'table name is longer than' in msg or 'TableName must be' in msg
# When the table has a shorter name length, like 192, we should be able to
# create both the table and streams, with no problems.
def test_stream_table_name_length_192_create(dynamodb):
    """With a shorter, 192-character table name, creating the table together
    with its stream must succeed with no error (unlike the 222-character
    case tested above, which is allowed to fail gracefully)."""
    with new_test_table(dynamodb, name=padded_name(192),
        Tags=TAGS,
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
        AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
    ):
        pass
def test_stream_table_name_length_192_update(dynamodb, dynamodbstreams):
    """With a shorter, 192-character table name, enabling a stream on the
    existing table via UpdateTable must succeed with no error."""
    with new_test_table(dynamodb, name=padded_name(192),
        Tags=TAGS,
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
        AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
    ) as table:
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
        # DynamoDB doesn't allow deleting the base table while the stream
        # is still being added, so wait for it to become active before the
        # context manager deletes the table.
        wait_for_active_stream(dynamodbstreams, table)
# In earlier tests, we tested the stream events logged for BatchWriteItem,
# but it was usually a single item in the batch, or, in do_batch_test(),
# multiple items in different partitions. This test checks the
# remaining case, of a batch writing multiple items in one partition -
# and checks that the correct events appear for them on the stream.
# Turns out we had a bug (#28439) in this case, but *only* in always_use_lwt
# write isolation mode, which writes all the items in the batch with the
# same timestamp. The test is parameterized to try all write isolation
# modes, and reproduces #28439 when it failed only in always_use_lwt mode.
# This is a Scylla-only test because it checks write isolation modes, which
# don't exist in DynamoDB.
@pytest.mark.parametrize('mode', ['only_rmw_uses_lwt', pytest.param('always_use_lwt', marks=pytest.mark.xfail(reason='#28439')), 'unsafe_rmw', 'forbid_rmw'])
def test_streams_multiple_items_one_partition(dynamodb, dynamodbstreams, scylla_only, mode):
    """Check the stream events logged for a BatchWriteItem that writes
    multiple items into a *single* partition (earlier tests covered a single
    item, or multiple items in different partitions). Reproduces #28439,
    which failed only in the always_use_lwt write-isolation mode - where all
    items in the batch are written with the same timestamp - so the test is
    parameterized over all write-isolation modes. Scylla-only, because
    write-isolation modes don't exist in DynamoDB.
    """
    with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream:
        table, stream_arn = stream
        # Select the requested write-isolation mode via a system tag:
        client = table.meta.client
        base_arn = client.describe_table(TableName=table.name)['Table']['TableArn']
        client.tag_resource(ResourceArn=base_arn,
                            Tags=[{'Key': 'system:write_isolation', 'Value': mode}])
        # The actual test: one BatchWriteItem writing three different items
        # into the same partition p, and the three INSERT events expected
        # to appear on the stream for them:
        def do_updates(table, p, c):
            sort_keys = [c + suffix for suffix in ('1', '2', '3')]
            requests = [{'PutRequest': {'Item': {'p': p, 'c': s, 'x': s}}}
                        for s in sort_keys]
            table.meta.client.batch_write_item(RequestItems={table.name: requests})
            return [['INSERT', {'p': p, 'c': s}, None, {'p': p, 'c': s, 'x': s}]
                    for s in sort_keys]
        do_test(stream, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# TODO: tests on multiple partitions
# TODO: write a test that disabling the stream and re-enabling it works, but
# requires the user to wait for the first stream to become DISABLED before
# creating the new one. Then ListStreams should return the two streams,
# one DISABLED and one ENABLED? I'm not sure we want or can do this in
# Alternator.
# TODO: Can we test shard splitting? (Shard splitting requires the user to
# call DescribeStream again, either periodically or after shards end. We
# don't do this in any of our tests.)