scylladb/test/alternator/test_streams.py
Nadav Har'El 5c2ca56adf test/alternator: fix test passing a spurious parameter
The test test_streams.py::test_streams_putitem_new_item_overrides_old_lsi
failed on DynamoDB (Refs #26079) because we passed an unused parameter
NonKeyAttributes to the Projection setting of an LSI. NonKeyAttributes is
only allowed when ProjectionType=INCLUDE, but we used ProjectionType=ALL.
DynamoDB refuses to create an LSI with such inconsistent parameters,
and we just need to remove this unnecessary parameter from this test.

The reason why this test didn't fail on Alternator is that Alternator
doesn't yet support or even parse the Projection parameter (Refs #5036).

We also add an xfailing test (passes on DynamoDB, fails on Alternator)
checking that a spurious NonKeyAttributes parameter is rejected. When
we get around to implementing the projection feature (#5036), this will
be yet another acceptance test for this feature.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2026-01-05 13:51:01 +02:00
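
For context, the inconsistency this commit fixes looks like this in the
Projection part of an LSI definition. This is a minimal sketch (the attribute
name 'a' is made up), not code quoted from the test itself:

    # Rejected by DynamoDB - NonKeyAttributes is only valid with INCLUDE:
    'Projection': {'ProjectionType': 'ALL', 'NonKeyAttributes': ['a']}
    # Valid - ALL projects every attribute, so NonKeyAttributes is omitted:
    'Projection': {'ProjectionType': 'ALL'}
    # Valid - with INCLUDE, NonKeyAttributes lists the projected attributes:
    'Projection': {'ProjectionType': 'INCLUDE', 'NonKeyAttributes': ['a']}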

# Copyright 2020-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# Tests for stream operations: ListStreams, DescribeStream, GetShardIterator,
# GetRecords.
import time
import urllib.request
from contextlib import contextmanager, ExitStack
from urllib.error import URLError
import pytest
from boto3.dynamodb.types import TypeDeserializer
from botocore.exceptions import ClientError
from test.alternator.util import is_aws, scylla_config_temporary, unique_table_name, create_test_table, new_test_table, random_string, full_scan, freeze, list_tables, get_region, manual_request
# All tests in this file are expected to fail with tablets due to #23838.
# To ensure that Alternator Streams is still being tested, instead of
# xfailing these tests, we temporarily coerce the tests below to avoid
# using the default tablets setting, even if it's available. We do this by
# using the following tags when creating each table below:
TAGS = [{'Key': 'system:initial_tablets', 'Value': 'none'}]
stream_types = ['OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES']
def disable_stream(dynamodbstreams, table):
    table.update(StreamSpecification={'StreamEnabled': False})
# Wait for the stream to really be disabled. A table may have multiple
# historic streams - we need all of them to become DISABLED. One of
# them (the current one) may remain DISABLING for some time.
    exp = time.time() + 60
    while time.time() < exp:
streams = dynamodbstreams.list_streams(TableName=table.name)
disabled = True
for stream in streams['Streams']:
desc = dynamodbstreams.describe_stream(StreamArn=stream['StreamArn'])['StreamDescription']
if desc['StreamStatus'] != 'DISABLED':
disabled = False
break
if disabled:
print('disabled stream on {}'.format(table.name))
return
time.sleep(0.5)
pytest.fail("timed out")
# We cannot use fixtures here, because real dynamodb cannot _remove_ a
# stream once created - it can only expire 24 hours later. So reusing
# test_table for example works great for scylla/local testing, but once we
# run against actual aws instances, we get lingering streams, and worse:
# we cannot create new ones to replace them, because we have overlapping
# types etc.
#
# So we have to create and delete a table per test, and not run these
# tests too often against aws.
@contextmanager
def create_stream_test_table(dynamodb, StreamViewType=None):
spec = { 'StreamEnabled': False }
    if StreamViewType is not None:
spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
table = create_test_table(dynamodb, StreamSpecification=spec,
Tags=TAGS,
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
{ 'AttributeName': 'c', 'KeyType': 'RANGE' }
],
AttributeDefinitions=[
{ 'AttributeName': 'p', 'AttributeType': 'S' },
{ 'AttributeName': 'c', 'AttributeType': 'S' },
])
try:
yield table
finally:
print(f"Deleting table {table.name}")
while True:
try:
table.delete()
break
except ClientError as ce:
# if the table has a stream currently being created we cannot
# delete the table immediately. Again, only with real dynamo
if ce.response['Error']['Code'] == 'ResourceInUseException':
print('Could not delete table yet. Sleeping 5s.')
time.sleep(5)
                    continue
raise
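# Typical use of the context manager above - an illustrative sketch only;
# the real tests below follow exactly this pattern:
#
#   with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
#       (arn, label) = wait_for_active_stream(dynamodbstreams, table)
#       ...  # exercise the stream; the table is deleted on exit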
def wait_for_active_stream(dynamodbstreams, table, timeout=60):
    exp = time.time() + timeout
    while time.time() < exp:
streams = dynamodbstreams.list_streams(TableName=table.name)
for stream in streams['Streams']:
arn = stream['StreamArn']
            if arn is None:
continue
desc = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
            if 'StreamStatus' not in desc or desc.get('StreamStatus') == 'ENABLED':
                return (arn, stream['StreamLabel'])
# real dynamo takes some time until a stream is usable
print("Stream not available. Sleep 5s...")
time.sleep(5)
    pytest.fail("timed out waiting for an active stream")
# Local java dynamodb server version behaves differently from
# the "real" one. Most importantly, it does not verify a number of
# parameters, and consequently does not throw when called with borked
# args. This will try to check if we are in fact running against
# this test server, and if so, just raise the error here and be done
# with it. All this just so we can run through the tests on
# aws, scylla and local.
def is_local_java(dynamodbstreams):
# no good way to check, but local server runs on a Jetty,
# so check for that.
url = dynamodbstreams.meta.endpoint_url
try:
urllib.request.urlopen(url)
except URLError as e:
if hasattr(e, 'info'):
return e.info()['Server'].startswith('Jetty')
return False
def ensure_java_server(dynamodbstreams, error='ValidationException'):
    # There is no good way to check directly, so reuse the Jetty-based
    # detection in is_local_java().
if is_local_java(dynamodbstreams):
if error != None:
raise ClientError({'Error': { 'Code' : error }}, '')
return
assert False
def test_list_streams_create(dynamodb, dynamodbstreams):
for type in stream_types:
with create_stream_test_table(dynamodb, StreamViewType=type) as table:
wait_for_active_stream(dynamodbstreams, table)
def test_list_streams_alter(dynamodb, dynamodbstreams):
for type in stream_types:
with create_stream_test_table(dynamodb, StreamViewType=None) as table:
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': type})
wait_for_active_stream(dynamodbstreams, table)
def test_list_streams_paged(dynamodb, dynamodbstreams):
# There is no reason to run this test for all stream types - we have
# other tests for creating tables with all stream types, and for using
# them. This one is only about list_streams.
for type in stream_types[0:1]:
with create_stream_test_table(dynamodb, StreamViewType=type) as table1:
with create_stream_test_table(dynamodb, StreamViewType=type) as table2:
wait_for_active_stream(dynamodbstreams, table1)
wait_for_active_stream(dynamodbstreams, table2)
streams = dynamodbstreams.list_streams(Limit=1)
assert streams
assert streams.get('Streams')
assert streams.get('LastEvaluatedStreamArn')
tables = [ table1.name, table2.name ]
while True:
for s in streams['Streams']:
name = s['TableName']
if name in tables: tables.remove(name)
if not tables:
break
streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn'])
# ListStreams with paging should be able to correctly return a full list of
# pre-existing streams even if additional tables were added between pages
# and caused Scylla's hash table of tables to be reorganized.
# Reproduces something similar to what we saw in #12601. Unfortunately, to
# reproduce the failure we needed to create a fairly large number of tables
# below, so that the internal hash table gets reshuffled. It's also not
# exactly the situation of issue #12601 - there we suspect the hash table got
# a different order for different pages for other reasons - not because of
# added tables.
def test_list_streams_paged_with_new_table(dynamodb, dynamodbstreams):
N1 = 4 # Number of tables to create initially
LIMIT = 2 # Number of tables out of N1 to list in the first page
N2 = 30 # Number of additional tables to create later
N1_tables = []
with ExitStack() as created_tables:
for i in range(N1):
table = created_tables.enter_context(create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY'))
wait_for_active_stream(dynamodbstreams, table)
N1_tables.append(table.name)
streams = dynamodbstreams.list_streams(Limit=LIMIT)
assert streams
assert 'Streams' in streams
assert len(streams['Streams']) == LIMIT
first_tables = [s['TableName'] for s in streams['Streams']]
last_arn = streams['LastEvaluatedStreamArn']
for i in range(N2):
table = created_tables.enter_context(create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY'))
wait_for_active_stream(dynamodbstreams, table)
# Get the rest of the streams (no limit, assuming we don't have
# a huge number of streams)
streams = dynamodbstreams.list_streams(ExclusiveStartStreamArn=last_arn)
assert 'Streams' in streams
tables = first_tables + [s['TableName'] for s in streams['Streams']]
# Each of the N1 tables must have been returned from the paged
# iteration, and only once. The tables in N2 may or may not be
# returned. Tables not related to this test may also be returned.
for t in N1_tables:
assert tables.count(t) == 1
def test_list_streams_zero_limit(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
with pytest.raises(ClientError, match='ValidationException'):
wait_for_active_stream(dynamodbstreams, table)
dynamodbstreams.list_streams(Limit=0)
# The DynamoDB documentation for ListStreams says that for Limit, "the upper
# limit is 100.". Indeed, if we try 101, we get a ValidationException.
# There is no reason why Alternator must implement exactly the same limit,
# but supporting a huge number (below we test 100,000) is not a good idea
# because theoretically (if we have a huge number of tables...) it can result
# in a huge response, which we don't want to allow.
@pytest.mark.xfail(reason="no upper limit for Limit")
def test_list_streams_too_high_limit(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
with pytest.raises(ClientError, match='ValidationException'):
wait_for_active_stream(dynamodbstreams, table)
dynamodbstreams.list_streams(Limit=100000)
def test_create_streams_wrong_type(dynamodb, dynamodbstreams, test_table):
with pytest.raises(ClientError, match='ValidationException'):
# should throw
        test_table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'Fisk'})
# just in case the test fails, disable stream again
    test_table.update(StreamSpecification={'StreamEnabled': False})
def test_list_streams_empty(dynamodb, dynamodbstreams, test_table):
streams = dynamodbstreams.list_streams(TableName=test_table.name)
assert 'Streams' in streams
assert not streams['Streams'] # empty
def test_list_streams_with_nonexistent_last_stream(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
with pytest.raises(ClientError, match='ValidationException'):
streams = dynamodbstreams.list_streams(TableName=table.name, ExclusiveStartStreamArn='kossaapaaasdafsdaasdasdasdasasdasfadfadfasdasdas')
assert 'Streams' in streams
assert not streams['Streams'] # empty
            # local java dynamodb does _not_ raise a validation error for a
            # malformed stream arn here; ensure_java_server() raises it for us.
ensure_java_server(dynamodbstreams)
def test_describe_stream(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
assert desc.get('StreamDescription')
assert desc['StreamDescription']['StreamArn'] == arn
assert desc['StreamDescription']['StreamStatus'] != 'DISABLED'
assert desc['StreamDescription']['StreamViewType'] == 'KEYS_ONLY'
assert desc['StreamDescription']['TableName'] == table.name
assert desc['StreamDescription'].get('Shards')
assert desc['StreamDescription']['Shards'][0].get('ShardId')
assert desc['StreamDescription']['Shards'][0].get('SequenceNumberRange')
assert desc['StreamDescription']['Shards'][0]['SequenceNumberRange'].get('StartingSequenceNumber')
# We don't know what the sequence number is supposed to be, but the
# DynamoDB documentation requires that it contains only numeric
# characters and some libraries rely on this. Reproduces issue #7158:
assert desc['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber'].isdecimal()
@pytest.mark.xfail(reason="alternator does not have creation time on streams")
def test_describe_stream_create_time(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
assert desc.get('StreamDescription')
# note these are non-required attributes
assert 'CreationRequestDateTime' in desc['StreamDescription']
def test_describe_nonexistent_stream(dynamodb, dynamodbstreams):
with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf')
def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
try:
desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId='zzzzzzzzzzzzzzzzzzzzzzzzsfasdgagadfadfgagkjsdfsfsdjfjks')
assert not desc['StreamDescription']['Shards']
        except ClientError:
# local java throws here. real does not.
ensure_java_server(dynamodbstreams, error=None)
def test_get_shard_iterator(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        shard_id = desc['StreamDescription']['Shards'][0]['ShardId']
        seq = desc['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber']
# spec says shard_id must be 65 chars or less
assert len(shard_id) <= 65
for type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
iter = dynamodbstreams.get_shard_iterator(
StreamArn=arn, ShardId=shard_id, ShardIteratorType=type, SequenceNumber=seq
)
assert iter.get('ShardIterator')
for type in ['TRIM_HORIZON', 'LATEST']:
iter = dynamodbstreams.get_shard_iterator(
StreamArn=arn, ShardId=shard_id, ShardIteratorType=type
)
assert iter.get('ShardIterator')
for type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
# must have seq in these modes
with pytest.raises(ClientError, match='ValidationException'):
dynamodbstreams.get_shard_iterator(
StreamArn=arn, ShardId=shard_id, ShardIteratorType=type
)
for type in ['TRIM_HORIZON', 'LATEST']:
# should not set "seq" in these modes
with pytest.raises(ClientError, match='ValidationException'):
dynamodbstreams.get_shard_iterator(
StreamArn=arn, ShardId=shard_id, ShardIteratorType=type, SequenceNumber=seq
)
# bad arn
with pytest.raises(ClientError, match='ValidationException'):
dynamodbstreams.get_shard_iterator(
StreamArn='sdfadsfsdfsdgdfsgsfdabadfbabdadsfsdfsdfsdfsdfsdfsdfdfdssdffbdfdf', ShardId=shard_id, ShardIteratorType=type, SequenceNumber=seq
)
# bad shard id
with pytest.raises(ClientError, match='ResourceNotFoundException'):
dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId='semprinidfaasdasfsdvacsdcfsdsvsdvsdvsdvsdvsdv',
ShardIteratorType='LATEST'
)
# bad iter type
with pytest.raises(ClientError, match='ValidationException'):
dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
ShardIteratorType='bulle', SequenceNumber=seq
)
# bad seq
with pytest.raises(ClientError, match='ValidationException'):
dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
ShardIteratorType='LATEST', SequenceNumber='sdfsafglldfngjdafnasdflgnaldklkafdsgklnasdlv'
)
def test_get_shard_iterator_for_nonexistent_stream(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
(arn, label) = wait_for_active_stream(dynamodbstreams, table)
desc = dynamodbstreams.describe_stream(StreamArn=arn)
shards = desc['StreamDescription']['Shards']
with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
dynamodbstreams.get_shard_iterator(
StreamArn='sdfadfsddafgdafsgjnadflkgnalngalsdfnlkasnlkasdfasdfasf', ShardId=shards[0]['ShardId'], ShardIteratorType='LATEST'
)
def test_get_shard_iterator_for_nonexistent_shard(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
with pytest.raises(ClientError, match='ResourceNotFoundException'):
dynamodbstreams.get_shard_iterator(
StreamArn=arn, ShardId='adfasdasdasdasdasdasdasdasdasasdasd', ShardIteratorType='LATEST'
)
def test_get_records(dynamodb, dynamodbstreams):
# TODO: add tests for storage/transactionable variations and global/local index
with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES') as table:
(arn, label) = wait_for_active_stream(dynamodbstreams, table)
p = 'piglet'
c = 'ninja'
val = 'lucifers'
val2 = 'flowers'
table.put_item(Item={'p': p, 'c': c, 'a1': val, 'a2': val2})
nval = 'semprini'
nval2 = 'nudge'
table.update_item(Key={'p': p, 'c': c},
AttributeUpdates={ 'a1': {'Value': nval, 'Action': 'PUT'},
'a2': {'Value': nval2, 'Action': 'PUT'}
})
has_insert = False
        # In truth, we should sleep already here, since at least scylla
        # will not be able to produce any stream content until
        # ~30s after insert/update (confidence interval),
        # but it is useful to see a working null-iteration as well, so
        # let's go already.
while True:
desc = dynamodbstreams.describe_stream(StreamArn=arn)
iterators = []
while True:
shards = desc['StreamDescription']['Shards']
for shard in shards:
shard_id = shard['ShardId']
start = shard['SequenceNumberRange']['StartingSequenceNumber']
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',SequenceNumber=start)['ShardIterator']
iterators.append(iter)
last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
if not last_shard:
break
desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=last_shard)
next_iterators = []
while iterators:
iter = iterators.pop(0)
response = dynamodbstreams.get_records(ShardIterator=iter, Limit=1000)
if 'NextShardIterator' in response:
next_iterators.append(response['NextShardIterator'])
records = response.get('Records')
# print("Query {} -> {}".format(iter, records))
if records:
for record in records:
# print("Record: {}".format(record))
type = record['eventName']
dynamodb = record['dynamodb']
keys = dynamodb['Keys']
assert keys.get('p')
assert keys.get('c')
assert keys['p'].get('S')
assert keys['p']['S'] == p
assert keys['c'].get('S')
assert keys['c']['S'] == c
if type == 'MODIFY' or type == 'INSERT':
assert dynamodb.get('NewImage')
                            newimage = dynamodb['NewImage']
assert newimage.get('a1')
assert newimage.get('a2')
if type == 'INSERT' or (type == 'MODIFY' and not has_insert):
assert newimage['a1']['S'] == val
assert newimage['a2']['S'] == val2
has_insert = True
continue
if type == 'MODIFY':
assert has_insert
assert newimage['a1']['S'] == nval
assert newimage['a2']['S'] == nval2
assert dynamodb.get('OldImage')
                            oldimage = dynamodb['OldImage']
assert oldimage.get('a1')
assert oldimage.get('a2')
assert oldimage['a1']['S'] == val
assert oldimage['a2']['S'] == val2
return
print("Not done. Sleep 10s...")
time.sleep(10)
iterators = next_iterators
def test_get_records_nonexistent_iterator(dynamodbstreams):
with pytest.raises(ClientError, match='ValidationException'):
dynamodbstreams.get_records(ShardIterator='sdfsdfsgagaddafgagasgasgasdfasdfasdfasdfasdgasdasdgasdg', Limit=1000)
##############################################################################
# Fixtures for creating a table with a stream enabled with one of the allowed
# StreamViewType settings (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES).
# Unfortunately changing the StreamViewType setting of an existing stream is
# not allowed (see test_streams_change_type), and while removing and re-adding
# a stream is possible, it is very slow. So we create four different fixtures,
# one with each of the four StreamViewType settings.
#
# It turns out that DynamoDB makes reusing the same table in different tests
# very difficult, because when we request a "LATEST" iterator we sometimes
# miss the immediately following write (this issue doesn't happen in
# Alternator, just in DynamoDB - presumably LATEST adds some time slack?)
# So all the fixtures we create below have scope="function", meaning that a
# separate table is created for each of the tests using these fixtures. This
# slows the tests down a bit, but not by much (about 0.05 seconds per test).
# It is still worthwhile to use a fixture rather than to create a table
# explicitly - it is convenient, safe (the table gets deleted automatically)
# and if in the future we can work around the DynamoDB problem, we can return
# these fixtures to module scope.
def create_table_ss(dynamodb, dynamodbstreams, type):
table = create_test_table(dynamodb,
Tags=TAGS,
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }],
StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
(arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
yield table, arn
table.delete()
def create_table_sss_lsi(dynamodb, dynamodbstreams, type):
table = create_test_table(dynamodb,
Tags=TAGS,
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }, { 'AttributeName': 'a', 'AttributeType': 'S' }],
LocalSecondaryIndexes=[
{
'IndexName': 'a_idx',
'KeySchema': [
{ 'AttributeName': 'p', 'KeyType': 'HASH' },
{ 'AttributeName': 'a', 'KeyType': 'RANGE' }],
'Projection': {
'ProjectionType': 'ALL',
}
}
],
StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
(arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
yield table, arn
table.delete()
def create_table_s_no_ck(dynamodb, dynamodbstreams, type):
table = create_test_table(dynamodb,
Tags=TAGS,
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }],
StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
(arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
yield table, arn
table.delete()
@pytest.fixture(scope="function")
def test_table_sss_new_and_old_images_lsi(dynamodb, dynamodbstreams):
yield from create_table_sss_lsi(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
@pytest.fixture(scope="function")
def test_table_ss_keys_only(dynamodb, dynamodbstreams):
yield from create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY')
@pytest.fixture(scope="function")
def test_table_ss_new_image(dynamodb, dynamodbstreams):
yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE')
@pytest.fixture(scope="function")
def test_table_ss_old_image(dynamodb, dynamodbstreams):
yield from create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE')
@pytest.fixture(scope="function")
def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):
yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
@pytest.fixture(scope="function")
def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams):
yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'KEYS_ONLY')
@pytest.fixture(scope="function")
def test_table_s_no_ck_new_image(dynamodb, dynamodbstreams):
yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_IMAGE')
@pytest.fixture(scope="function")
def test_table_s_no_ck_old_image(dynamodb, dynamodbstreams):
yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'OLD_IMAGE')
@pytest.fixture(scope="function")
def test_table_s_no_ck_new_and_old_images(dynamodb, dynamodbstreams):
yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
# Test that it is, sadly, not allowed to use UpdateTable on a table which
# already has a stream enabled to change that stream's StreamViewType.
# Relates to #6939
def test_streams_change_type(test_table_ss_keys_only):
table, arn = test_table_ss_keys_only
with pytest.raises(ClientError, match='ValidationException.*already'):
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE'})
    # If the above change succeeded (because of issue #6939), switch it back :-)
    table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
# It is not allowed to enable a stream on a table that already has a stream,
# even one of the same type.
def test_streams_enable_on_enabled(test_table_ss_new_and_old_images):
table, arn = test_table_ss_new_and_old_images
with pytest.raises(ClientError, match='ValidationException.*already.*enabled'):
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'})
# It is not allowed to disable a stream on a table that does not have a stream.
def test_streams_disable_on_disabled(test_table):
with pytest.raises(ClientError, match='ValidationException.*stream.*disable'):
        test_table.update(StreamSpecification={'StreamEnabled': False})
# Utility function for listing all the shards of the given stream arn.
# Implemented by multiple calls to DescribeStream, possibly several pages
# until all the shards are returned. The return of this function should be
# cached - it is potentially slow, and DynamoDB documentation even states
# DescribeStream may only be called at a maximum rate of 10 times per second.
# list_shards() only returns the shard IDs, not the information about the
# shards' sequence number range, which is also returned by DescribeStream.
def list_shards(dynamodbstreams, arn):
# By default DescribeStream lists a limit of 100 shards. For faster
# tests we reduced the number of shards in the testing setup to
# 32 (16 vnodes x 2 cpus), see issue #6979, so to still exercise this
    # paging feature, let's use a limit of 10.
limit = 10
response = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit)['StreamDescription']
assert len(response['Shards']) <= limit
shards = [x['ShardId'] for x in response['Shards']]
while 'LastEvaluatedShardId' in response:
        # #7409: the kinesis client ignores LastEvaluatedShardId and just
        # looks at the last shard
assert shards[-1] == response['LastEvaluatedShardId']
response = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit,
ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']
assert len(response['Shards']) <= limit
shards.extend([x['ShardId'] for x in response['Shards']])
print('Number of shards in stream: {}'.format(len(shards)))
assert len(set(shards)) == len(shards)
    # #7409 - kinesis requires shards to be in lexical order; verify.
assert shards == sorted(shards)
    # special test: ensure we get nothing more if we ask to start after the
    # very last shard
response = dynamodbstreams.describe_stream(StreamArn=arn,
ExclusiveStartShardId=shards[-1])['StreamDescription']
assert len(response['Shards']) == 0
return shards
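# Since the return of list_shards() should be cached (see above), a caller
# needing the same stream's shards repeatedly could memoize it, e.g. with
# functools - an illustrative sketch only, not used by the tests below:
#
#   from functools import lru_cache
#   @lru_cache(maxsize=None)
#   def cached_shards(arn):
#       return tuple(list_shards(dynamodbstreams, arn))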
# Utility function for getting shard iterators starting at "LATEST" for
# all the shards of the given stream arn.
def latest_iterators(dynamodbstreams, arn):
iterators = []
for shard_id in list_shards(dynamodbstreams, arn):
iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator'])
assert len(set(iterators)) == len(iterators)
return iterators
# Similar to latest_iterators(), just also returns the shard id which produced
# each iterator.
def shards_and_latest_iterators(dynamodbstreams, arn):
shards_and_iterators = []
for shard_id in list_shards(dynamodbstreams, arn):
shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
return shards_and_iterators
# Utility function for fetching more content from the stream (given its
# array of iterators) into an "output" array. Call repeatedly to get more
# content - the function returns a new array of iterators which should be
# used to replace the input list of iterators.
# Note that the order of the updates is guaranteed for the same partition,
# but cannot be trusted for *different* partitions.
def fetch_more(dynamodbstreams, iterators, output):
new_iterators = []
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'NextShardIterator' in response:
new_iterators.append(response['NextShardIterator'])
output.extend(response['Records'])
assert len(set(new_iterators)) == len(new_iterators)
return new_iterators
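# A typical polling loop built on fetch_more() - an illustrative sketch of
# the pattern that fetch_and_compare_events() below implements for real
# ("done" here is a hypothetical predicate):
#
#   output = []
#   iterators = latest_iterators(dynamodbstreams, arn)
#   while not done(output):
#       iterators = fetch_more(dynamodbstreams, iterators, output)
#       time.sleep(0.5)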
# Utility function for comparing "output" as fetched by fetch_more(), to a list
# expected_events, each of which looks like:
# [type, keys, old_image, new_image]
# where type is REMOVE, INSERT or MODIFY.
# The "mode" parameter specifies which StreamViewType mode (KEYS_ONLY,
# OLD_IMAGE, NEW_IMAGE, NEW_AND_OLD_IMAGES) was supposedly used to generate
# "output". This mode dictates what we can compare - e.g., in KEYS_ONLY mode
# the compare_events() function ignores the old and new image in
# expected_events.
# The "expected_region" parameter specifies the value of awsRegion field.
# compare_events() throws an exception immediately if it sees an unexpected
# event, but if some of the expected events are just missing in the "output",
# it only returns false - suggesting maybe the caller needs to try again
# later - maybe more output is coming.
# Note that the order of events is only guaranteed (and therefore compared)
# inside a single partition.
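# For example, overwriting an existing empty item {'p': 'dog', 'c': 'cat'}
# with {'p': 'dog', 'c': 'cat', 'x': 1} would be described by the expected
# event (illustrative values only):
#   ['MODIFY', {'p': 'dog', 'c': 'cat'},
#    {'p': 'dog', 'c': 'cat'}, {'p': 'dog', 'c': 'cat', 'x': 1}]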
def compare_events(expected_events, output, mode, expected_region):
# The order of expected_events is only meaningful inside a partition, so
# let's convert it into a map indexed by partition key.
expected_events_map = {}
for event in expected_events:
expected_type, expected_key, expected_old_image, expected_new_image = event
# For simplicity, we actually use the entire key, not just the partition
# key. We only lose a bit of testing power we didn't plan to test anyway
# (that events for different items in the same partition are ordered).
key = freeze(expected_key)
        if key not in expected_events_map:
expected_events_map[key] = []
expected_events_map[key].append(event)
# Iterate over the events in output. An event for a certain key needs to
# be the *first* remaining event for this key in expected_events_map (and
    # then we remove this matched event from expected_events_map)
for event in output:
# In DynamoDB, eventSource is 'aws:dynamodb'. We decided to set it to
# a *different* value - 'scylladb:alternator'. Issue #6931.
assert 'eventSource' in event
# For lack of a direct equivalent of a region, Alternator provides the
# DC name instead. Reproduces #6931.
assert 'awsRegion' in event
assert event['awsRegion'] == expected_region
# Reproduces #6931.
assert 'eventVersion' in event
assert event['eventVersion'] in ['1.0', '1.1']
# Check that eventID appears, but can't check much on what it is.
assert 'eventID' in event
op = event['eventName']
record = event['dynamodb']
        # record['Keys'] is "serialized" JSON ({'S': 'thestring'}), so we
        # want to deserialize it to match our expected_events content.
deserializer = TypeDeserializer()
key = {x:deserializer.deserialize(y) for (x,y) in record['Keys'].items()}
expected_type, expected_key, expected_old_image, expected_new_image = expected_events_map[freeze(key)].pop(0)
assert op == expected_type
assert record['StreamViewType'] == mode
# We don't know what ApproximateCreationDateTime should be, but we do
# know it needs to be a timestamp - there is conflicting documentation
# in what format (ISO 8601?). In any case, boto3 parses this timestamp
# for us, so we can't check it here, beyond checking it exists.
assert 'ApproximateCreationDateTime' in record
# We don't know what SequenceNumber is supposed to be, but the DynamoDB
# documentation requires that it contains only numeric characters and
# some libraries rely on this. This reproduces issue #7158:
assert 'SequenceNumber' in record
assert record['SequenceNumber'].isdecimal()
# Alternator doesn't set the SizeBytes member. Issue #6931.
#assert 'SizeBytes' in record
        if mode == 'KEYS_ONLY':
            assert 'NewImage' not in record
            assert 'OldImage' not in record
        elif mode == 'NEW_IMAGE':
            assert 'OldImage' not in record
            if expected_new_image is None:
                assert 'NewImage' not in record
else:
new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
assert expected_new_image == new_image
        elif mode == 'OLD_IMAGE':
            assert 'NewImage' not in record
            if expected_old_image is None:
                assert 'OldImage' not in record
else:
old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
assert expected_old_image == old_image
        elif mode == 'NEW_AND_OLD_IMAGES':
            if expected_new_image is None:
                assert 'NewImage' not in record
else:
new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
assert expected_new_image == new_image
            if expected_old_image is None:
                assert 'OldImage' not in record
else:
old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
assert expected_old_image == old_image
else:
pytest.fail('cannot happen')
    # After the above loop, expected_events_map should contain only empty
    # arrays. If it doesn't, one of the expected events did not yet happen.
    # Return False.
for entry in expected_events_map.values():
if len(entry) > 0:
return False
return True
def fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_events, mode):
# Updating the stream is asynchronous. Moreover, it may even be delayed
# artificially (see Alternator's alternator_streams_time_window_s config).
# So if compare_events() reports the stream data is missing some of the
# expected events (by returning False), we need to retry it for some time.
# Note that compare_events() throws if the stream data contains *wrong*
# (unexpected) data, so even failing tests usually finish reasonably
# fast - depending on the alternator_streams_time_window_s parameter.
    # This optimization is important to keep *failing* tests reasonably
# fast and not have to wait until the following arbitrary timeout.
timeout = time.time() + 20
region = get_region(dynamodb)
output = []
while time.time() < timeout:
iterators = fetch_more(dynamodbstreams, iterators, output)
print("after fetch_more number expected_events={}, output={}".format(len(expected_events), len(output)))
if compare_events(expected_events, output, mode, region):
# success!
return
time.sleep(0.5)
# If we're still here, the last compare_events returned false.
pytest.fail('missing events in output: {}'.format(output))
# Convenience function used to implement several tests below. It runs a given
# function "updatefunc" which is supposed to do some updates to the table
# and also return an expected_events list. do_test() then fetches the streams
# data and compares it to the expected_events using compare_events().
def do_test(test_table_ss_stream, dynamodb, dynamodbstreams, updatefunc, mode, p=None, c=None):
    # Generate fresh keys per call - a default argument like p=random_string()
    # would be evaluated only once, at function-definition time, and would
    # then be shared by every test relying on the default.
    p = random_string() if p is None else p
    c = random_string() if c is None else c
table, arn = test_table_ss_stream
iterators = latest_iterators(dynamodbstreams, arn)
expected_events = updatefunc(table, p, c)
fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_events, mode)
def do_batch_test(test_table_ss_stream, dynamodb, dynamodbstreams, updatefunc, mode, item_count = 3):
p = [random_string() for _ in range(item_count)]
c = [f"ck_{i}" for i in range(item_count)]
table, arn = test_table_ss_stream
iterators = latest_iterators(dynamodbstreams, arn)
expected_events = updatefunc(table, p, c)
fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_events, mode)
# Test a single PutItem of a new item. Should result in a single INSERT
# event. Reproduces #6930.
def test_streams_putitem_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.put_item(Item={'p': p, 'c': c, 'x': 2})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
return events
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
# Replacing an item should result in a single MODIFY event, rather than a
# REMOVE followed by an INSERT. Moreover, the old item should be visible in
# OldImage. Reproduces #6930.
def test_streams_putitem_new_items_override_old(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.put_item(Item={'p': p, 'c': c, 'a': 1})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'a': 1}])
table.put_item(Item={'p': p, 'c': c, 'b': 2})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'a': 1}, {'p': p, 'c': c, 'b': 2}])
table.put_item(Item={'p': p, 'c': c, 'a': 3, 'b': 4})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 2}, {'p': p, 'c': c, 'a': 3, 'b': 4}])
table.put_item(Item={'p': p, 'c': c})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'a': 3, 'b': 4}, {'p': p, 'c': c}])
return events
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Same as test_streams_putitem_new_items_override_old, but the replaced
# column is used as an LSI key. Reproduces #6930.
def test_streams_putitem_new_item_overrides_old_lsi(test_table_sss_new_and_old_images_lsi, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.put_item(Item={'p': p, 'c': c, 'a': '1'})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'a': '1'}])
table.put_item(Item={'p': p, 'c': c})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'a': '1'}, {'p': p, 'c': c}])
assert len(full_scan(table, IndexName='a_idx')) == 0
return events
do_test(test_table_sss_new_and_old_images_lsi, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Test PutItem streams in a table with no clustering key.
def test_streams_putitem_no_ck_new_items_override_old(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, p, _):
events = []
table.put_item(Item={'p': p, 'a': 1})
events.append(['INSERT', {'p': p}, None, {'p': p, 'a': 1}])
table.put_item(Item={'p': p, 'b': 2})
events.append(['MODIFY', {'p': p}, {'p': p, 'a': 1}, {'p': p, 'b': 2}])
table.put_item(Item={'p': p, 'a': 3, 'b': 4})
events.append(['MODIFY', {'p': p}, {'p': p, 'b': 2}, {'p': p, 'a': 3, 'b': 4}])
table.put_item(Item={'p': p})
events.append(['MODIFY', {'p': p}, {'p': p, 'a': 3, 'b': 4}, {'p': p}])
return events
do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# In test_streams_new_and_old_images among other things we test that a
# DeleteItem operation gives the correct old image. That test used a table
# which has a sort key; the following test checks the same thing (old image
# in the DeleteItem case) for a table that does NOT have a sort key.
# types of tombstones for deletions from tables with a CK (row tombstone) vs.
# with a partition key only (partition tombstone) - and those may be handled
# differently by CDC. In issue #26382 - which this test reproduces - we
# discovered that our implementation doesn't select a preimage for partition
# deletions, resulting in a missing OldImage.
def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_images, test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
def do_updates(table, p, _):
events = []
table.update_item(Key={'p': p},
AttributeUpdates={'x': {'Value': 1, 'Action': 'PUT'}})
events.append(['INSERT', {'p': p}, None, {'p': p, 'x': 1}])
table.delete_item(Key={'p': p})
events.append(['REMOVE', {'p': p}, {'p': p, 'x': 1}, None])
return events
do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Test a single UpdateItem. Should result in a single INSERT event.
# Reproduces #6918.
def test_streams_updateitem_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
return events
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
# Test OLD_IMAGE using UpdateItem. Verify that the OLD_IMAGE indeed includes,
# as needed, the entire old item and not just the modified columns.
# Reproduces issue #6935
def test_streams_updateitem_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
# Note: The "None" as OldImage here tests that the OldImage should be
# missing because the item didn't exist. This reproduces issue #6933.
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
return events
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
# Above we verified that if an item did not previously exist, the OLD_IMAGE
# would be missing, but if the item did previously exist, OLD_IMAGE should
# be present and must include the key. Here we confirm a special case of the
# latter - a pre-existing *empty* item, which has just the key. Since the
# item did exist, OLD_IMAGE should be returned - and include just the key.
# This is a special case of reproducing #6935 - the first patch for this
# issue failed in this special case.
def test_streams_updateitem_old_image_empty_item(test_table_ss_old_image, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
# Create an *empty* item, with nothing except a key:
table.update_item(Key={'p': p, 'c': c})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c}])
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
# Note that OLD_IMAGE should be present and be the empty item,
# with just a key, not entirely missing.
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}])
return events
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
# Test that OLD_IMAGE indeed includes the entire old item and not just the
# modified attributes, in the special case of attributes which are a key of
# a secondary index.
# The unique thing about this case is that as currently implemented,
# secondary-index key attributes are real Scylla columns, contrasting with
# other attributes which are just elements of a map. And our CDC
# implementation treats those cases differently - when a map is modified
# the preimage includes the entire content of the map, but for regular
# columns they are only included in the preimage if they change.
# Currently fails in Alternator because the item's key is missing in
# OldImage (#6935) and the LSI key is also missing (#7030).
@pytest.fixture(scope="function")
def test_table_ss_old_image_and_lsi(dynamodb, dynamodbstreams):
table = create_test_table(dynamodb,
Tags=TAGS,
KeySchema=[
{'AttributeName': 'p', 'KeyType': 'HASH'},
{'AttributeName': 'c', 'KeyType': 'RANGE'}],
AttributeDefinitions=[
{ 'AttributeName': 'p', 'AttributeType': 'S' },
{ 'AttributeName': 'c', 'AttributeType': 'S' },
{ 'AttributeName': 'k', 'AttributeType': 'S' }],
LocalSecondaryIndexes=[{
'IndexName': 'index',
'KeySchema': [
{'AttributeName': 'p', 'KeyType': 'HASH'},
{'AttributeName': 'k', 'KeyType': 'RANGE'}],
'Projection': { 'ProjectionType': 'ALL' }
}],
StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE' })
(arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
yield table, arn
table.delete()
def test_streams_updateitem_old_image_lsi(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :x, k = :k',
ExpressionAttributeValues={':x': 2, ':k': 'dog'})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}])
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
# In issue #7030, the 'k' value was missing from the OldImage.
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}, {'p': p, 'c': c, 'x': 2, 'k': 'dog', 'y': 3}])
return events
do_test(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
# This test is the same as the previous (test_streams_updateitem_old_image_lsi)
# except that the *old* value of the LSI key k is missing. Since PR 8568, CDC
# adds a special deleted$k marker for a missing column in the preimage, and
# this test verifies that Alternator Streams doesn't put this extra marker in
# its output.
def test_streams_updateitem_old_image_lsi_missing_column(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
# Note that we do *not* set the "k" attribute (the LSI key)
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :x',
ExpressionAttributeValues={':x': 2})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
# "k" should be missing from OldImage, because it wasn't set earlier.
# Verify the events contain only the expected attributes - not some
# internal markers like "deleted$k".
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
return events
do_test(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
# In general, DynamoDB doesn't emit events if an operation is a nop. This
# test verifies that UpdateItem doesn't result in a log row when the updated
# item exists and is identical to the old one.
# Corresponding tests for PutItem are included in tests based on do_updates_1.
# The case for a PutItem within a BatchWriteItem is tested in
# test_streams_batch_overwrite_identical. Reproduces #6918.
def test_streams_updateitem_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :x',
ExpressionAttributeValues={':x': 2})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
# Overwriting the old item with an identical new item shouldn't produce
# any events.
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='ADD x :x',
ExpressionAttributeValues={':x': 0})
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :x',
ExpressionAttributeValues={':x': 2})
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# The above test_streams_updateitem_identical tested that if we UpdateItem an
# item changing an attribute to a value identical to the one it already has,
# no event is generated on the stream. In this test we change an attribute
# value from one map to another map that is really the same value, but has a
# different serialization (the map's elements are ordered differently).
# Since the value is nevertheless the same, here too we expect not to see
# a change event in the stream. Reproduces issue #27375.
@pytest.mark.xfail(reason="issue #27375")
def test_streams_updateitem_equal_but_not_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
# We use manual_request() to let us be sure that we pass the JSON
# values to the server in exactly the order we want to, without the
# Python SDK possibly rearranging them.
# Set x to be a map: {'dog': 1, 'cat': 2, 'mouse': 3}.
payload = '''{
"TableName": "''' + table.name + '''",
"Key": {"p": {"S": "''' + p + '''"}, "c": {"S": "''' + c + '''"}},
"UpdateExpression": "SET x = :x",
"ExpressionAttributeValues": {":x": %s}
}'''
manual_request(dynamodb, 'UpdateItem',
payload % '{"M": {"dog": {"N": "1"}, "cat": {"N": "2"}, "mouse": {"N": "3"}}}')
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': {'dog': 1, 'cat': 2, 'mouse': 3}}])
        # Overwriting x with an identical map shouldn't produce any events,
# so we won't add anything to the "events" list:
manual_request(dynamodb, 'UpdateItem',
payload % '{"M": {"dog": {"N": "1"}, "cat": {"N": "2"}, "mouse": {"N": "3"}}}')
# Now try to overwrite x with a map that has the same elements in
# a different order. These two values, despite not being identical
# in JSON form, should be considered equal and again no event should
# be generated.
manual_request(dynamodb, 'UpdateItem',
payload % '{"M": {"cat": {"N": "2"}, "dog": {"N": "1"}, "mouse": {"N": "3"}}}')
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Tests that deleting a missing attribute with UpdateItem doesn't generate a
# REMOVE event. Other cases are tested in test_streams_batch_delete_missing
# and in tests based on do_updates_1. Reproduces #6918.
def test_streams_updateitem_delete_missing(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
# Create an item
table.update_item(Key={'p': p, 'c': c},
AttributeUpdates={'x': {'Value': 1, 'Action': 'PUT'}})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 1}])
# Deleting a missing attribute shouldn't produce any event for both
# AttributeUpdates and UpdateExpression.
table.update_item(Key={'p': p, 'c': c},
AttributeUpdates={'y': {'Action': 'DELETE'}})
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='REMOVE z')
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Tests similar to the above tests for OLD_IMAGE, just for NEW_IMAGE mode.
# Verify that the NEW_IMAGE includes the entire old item (including the key),
# that deleting the item results in a missing NEW_IMAGE, and that setting the
# item to be empty has a different result - a NEW_IMAGE with just a key.
# Reproduces issue #7107.
def test_streams_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
# Confirm that when adding one attribute "x", the NewImage contains both
# the new x, and the key columns (p and c).
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
# Confirm that when adding just attribute "y", the NewImage will contain
# all the attributes, including the old "x":
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
# Confirm that when deleting the columns x and y, the NewImage becomes
        # empty - but still exists and contains the key columns:
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='REMOVE x, y')
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
# Confirm that deleting the item results in a missing NewImage:
table.delete_item(Key={'p': p, 'c': c})
events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c}, None])
return events
do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
# Test similar to the above test for NEW_IMAGE corner cases, but here for
# NEW_AND_OLD_IMAGES mode.
# Although it is likely that if both OLD_IMAGE and NEW_IMAGE work correctly then
# so will the combined NEW_AND_OLD_IMAGES mode, it is still possible that the
# implementation of the combined mode has unique bugs, so it is worth testing
# it separately.
# Reproduces issue #7107.
def test_streams_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, p, c):
events = []
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
# The item doesn't yet exist, so OldImage is missing.
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
# Confirm that when adding just attribute "y", the NewImage will contain
# all the attributes, including the old "x". Also, OldImage contains the
# key attributes, not just "x":
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
# Confirm that when deleting the attributes x and y, the NewImage becomes
# empty - but still exists and contains the key attributes:
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='REMOVE x, y')
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Confirm that when adding an attribute z to the empty item, OldImage
        # is not missing - it just contains only the key attributes:
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET z = :val1', ExpressionAttributeValues={':val1': 4})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}])
# Confirm that deleting the item results in a missing NewImage:
table.delete_item(Key={'p': p, 'c': c})
events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}, None])
return events
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Test that when a stream shard has no data to read, GetRecords returns an
# empty Records array - not a missing one. Reproduces issue #6926.
def test_streams_no_records(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
# Get just one shard - any shard - and its LATEST iterator. Because it's
# LATEST, there will be no data to read from this iterator.
shard = dynamodbstreams.describe_stream(StreamArn=arn, Limit=1)['StreamDescription']['Shards'][0]
shard_id = shard['ShardId']
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']
response = dynamodbstreams.get_records(ShardIterator=iter)
assert 'NextShardIterator' in response or 'EndingSequenceNumber' in shard['SequenceNumberRange']
assert 'Records' in response
# We expect Records to be empty - there is no data at the LATEST iterator.
assert response['Records'] == []
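# Clients typically iterate over the Records array unconditionally, e.g.
#     for record in dynamodbstreams.get_records(ShardIterator=iter)['Records']:
#         handle(record)
# (handle() being whatever per-record processing the client does), which
# raises KeyError if a response omits Records - hence the assertion above
# that a shard with no data returns an empty array, not a missing one.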
# Test that after fetching the last result from a shard, we don't get it
# yet again. Reproduces issue #6942.
def test_streams_last_result(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
iterators = latest_iterators(dynamodbstreams, arn)
# Do an UpdateItem operation that is expected to leave one event in the
# stream.
table.update_item(Key={'p': random_string(), 'c': random_string()},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
# Eventually (we may need to retry this for a while), *one* of the
# stream shards will return one event:
timeout = time.time() + 15
while time.time() < timeout:
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and response['Records'] != []:
# Found the shard with the data! Test that it only has
# one event and that if we try to read again, we don't
# get more data (this was issue #6942).
assert len(response['Records']) == 1
assert 'NextShardIterator' in response
response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
assert response['Records'] == []
return
time.sleep(0.5)
pytest.fail("timed out")
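# The "poll every iterator until one returns records" loop above recurs in
# several tests below, each with different assertions inside it. For
# reference, a generic sketch of the pattern (not used by the tests):
def _poll_for_records(dynamodbstreams, iterators, timeout_seconds=15):
    # Return the first non-empty Records array, together with the
    # NextShardIterator that accompanied it, or fail the test on timeout.
    timeout = time.time() + timeout_seconds
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if response.get('Records'):
                return (response['Records'], response.get('NextShardIterator'))
        time.sleep(0.5)
    pytest.fail("_poll_for_records timed out")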
# In test_streams_last_result above we tested that there are no further events
# after reading the only one. In this test we verify that if we *do* perform
# another change on the same key, we do get another event and it happens on the
# *same* shard.
def test_streams_another_result(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
iterators = latest_iterators(dynamodbstreams, arn)
# Do an UpdateItem operation that is expected to leave one event in the
# stream.
p = random_string()
c = random_string()
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
# Eventually, *one* of the stream shards will return one event:
timeout = time.time() + 15
while time.time() < timeout:
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and response['Records'] != []:
# Finally found the shard reporting the changes to this key
assert len(response['Records']) == 1
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert 'NextShardIterator' in response
iter = response['NextShardIterator']
# Found the shard with the data. It only has one event so if
# we try to read again, we find nothing (this is the same as
# what test_streams_last_result tests).
response = dynamodbstreams.get_records(ShardIterator=iter)
assert response['Records'] == []
assert 'NextShardIterator' in response
iter = response['NextShardIterator']
# Do another UpdateItem operation to the same key, so it is
# expected to write to the *same* shard:
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val2', ExpressionAttributeValues={':val2': 7})
# Again we may need to wait for the event to appear on the stream:
timeout = time.time() + 15
while time.time() < timeout:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and response['Records'] != []:
assert len(response['Records']) == 1
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert 'NextShardIterator' in response
# The test is done, successfully.
return
time.sleep(0.5)
pytest.fail("timed out")
time.sleep(0.5)
pytest.fail("timed out")
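# The Keys (and image) attributes in stream records use DynamoDB's
# low-level encoding, e.g. {'p': {'S': 'dog'}}. The tests in this file
# compare that encoding directly, but a client that wants plain Python
# values can use boto3's TypeDeserializer (imported at the top of this
# file). A sketch, not used by the tests:
def _deserialize_image(image):
    # Convert a low-level image like {'x': {'N': '2'}} to {'x': Decimal('2')}.
    deserializer = TypeDeserializer()
    return {name: deserializer.deserialize(value) for name, value in image.items()}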
# Test the SequenceNumber attribute returned for stream events, and the
# "AT_SEQUENCE_NUMBER" iterator that can be used to re-read from the same
# event again given its saved "sequence number".
def test_streams_at_sequence_number(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
# Do an UpdateItem operation that is expected to leave one event in the
# stream.
p = random_string()
c = random_string()
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
# Eventually, *one* of the stream shards will return the one event:
timeout = time.time() + 15
while time.time() < timeout:
for (shard_id, iter) in shards_and_iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and response['Records'] != []:
# Finally found the shard reporting the changes to this key:
assert len(response['Records']) == 1
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert 'NextShardIterator' in response
sequence_number = response['Records'][0]['dynamodb']['SequenceNumber']
# Found the shard with the data. It only has one event so if
# we try to read again, we find nothing (this is the same as
# what test_streams_last_result tests).
iter = response['NextShardIterator']
response = dynamodbstreams.get_records(ShardIterator=iter)
assert response['Records'] == []
assert 'NextShardIterator' in response
# If we use the SequenceNumber of the first event to create an
# AT_SEQUENCE_NUMBER iterator, we can read the same event again.
# We don't need a loop and a timeout, because this event is already
# available.
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',
SequenceNumber=sequence_number)['ShardIterator']
response = dynamodbstreams.get_records(ShardIterator=iter)
assert 'Records' in response
assert len(response['Records']) == 1
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number
return
time.sleep(0.5)
pytest.fail("timed out")
# Test the SequenceNumber attribute returned for stream events, and the
# "AFTER_SEQUENCE_NUMBER" iterator that can be used to re-read *after* the same
# event again given its saved "sequence number".
def test_streams_after_sequence_number(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
# Do two UpdateItem operations to the same key, that are expected to leave
# two events in the stream.
p = random_string()
c = random_string()
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
# Eventually, *one* of the stream shards will return the two events:
timeout = time.time() + 15
while time.time() < timeout:
for (shard_id, iter) in shards_and_iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and len(response['Records']) == 2:
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
# #7424 - the AWS SDK assumes sequence numbers can be compared
# as bigints and are monotonically increasing.
assert int(sequence_number_1) < int(sequence_number_2)
# If we use the SequenceNumber of the first event to create an
# AFTER_SEQUENCE_NUMBER iterator, we can read the second event
# (only) again. We don't need a loop and a timeout, because this
# event is already available.
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='AFTER_SEQUENCE_NUMBER',
SequenceNumber=sequence_number_1)['ShardIterator']
response = dynamodbstreams.get_records(ShardIterator=iter)
assert 'Records' in response
assert len(response['Records']) == 1
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_2
return
time.sleep(0.5)
pytest.fail("timed out")
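# The saved SequenceNumber above is the building block of checkpointing:
# a stream consumer persists the SequenceNumber of the last record it
# processed and, after a restart, resumes just after it. A sketch, where
# shard_id and last_sequence_number are assumed to have been persisted
# earlier by the consumer:
def _resume_after_checkpoint(dynamodbstreams, arn, shard_id, last_sequence_number):
    iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
        ShardId=shard_id, ShardIteratorType='AFTER_SEQUENCE_NUMBER',
        SequenceNumber=last_sequence_number)['ShardIterator']
    return dynamodbstreams.get_records(ShardIterator=iter)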
# Test the "TRIM_HORIZON" iterator, which can be used to re-read *all* the
# previously-read events of the stream shard again.
# NOTE: This test relies on the test_table_ss_keys_only fixture giving us a
# brand new stream, with no old events saved from other tests. If we ever
# change this, we should change this test to use a different fixture.
def test_streams_trim_horizon(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
# Do two UpdateItem operations to the same key, that are expected to leave
# two events in the stream.
p = random_string()
c = random_string()
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
# Eventually, *one* of the stream shards will return the two events:
timeout = time.time() + 15
while time.time() < timeout:
for (shard_id, iter) in shards_and_iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and len(response['Records']) == 2:
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
# If we use the TRIM_HORIZON iterator, we should receive the
# same two events again, in the same order.
# Note that we assume that the fixture gave us a brand new
# stream, with no old events saved from other tests. If we
# couldn't assume this, this test would need to become much
# more complex, and would need to read from this shard until
# we find the two events we are looking for.
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
response = dynamodbstreams.get_records(ShardIterator=iter)
assert 'Records' in response
assert len(response['Records']) == 2
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_1
assert response['Records'][1]['dynamodb']['SequenceNumber'] == sequence_number_2
return
time.sleep(0.5)
pytest.fail("timed out")
# Test the StartingSequenceNumber information returned by DescribeStream.
# The DynamoDB documentation explains that StartingSequenceNumber is
# "The first sequence number for the stream records contained within a shard"
# which suggests that it should be the sequence number of the first event
# that can actually be read from the shard. However, in Alternator the
# StartingSequenceNumber may be *before* any of the events on the shard -
# it is the time that the shard was created. In Alternator, it is possible
# that no event appeared on this shard for some time after it was created,
# and it is also possible that after more than 24 hours, some of the original
# items that existed in the shard have expired. Nevertheless, we believe
# that the important thing is that reading a shard starting at
# StartingSequenceNumber will result in reading all the available items -
# similar to how TRIM_HORIZON works. This is what the following test verifies.
def test_streams_starting_sequence_number(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
# Do two UpdateItem operations to the same key, that are expected to leave
# two events in the stream.
p = random_string()
c = random_string()
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
# Get for all the stream shards the iterator starting at the shard's
# StartingSequenceNumber:
response = dynamodbstreams.describe_stream(StreamArn=arn)
shards = response['StreamDescription']['Shards']
while 'LastEvaluatedShardId' in response['StreamDescription']:
response = dynamodbstreams.describe_stream(StreamArn=arn,
ExclusiveStartShardId=response['StreamDescription']['LastEvaluatedShardId'])
shards.extend(response['StreamDescription']['Shards'])
iterators = []
for shard in shards:
shard_id = shard['ShardId']
start = shard['SequenceNumberRange']['StartingSequenceNumber']
assert start.isdecimal()
iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',
SequenceNumber=start)['ShardIterator'])
# Eventually, *one* of the stream shards will return the two events:
timeout = time.time() + 15
while time.time() < timeout:
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and len(response['Records']) == 2:
assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
return
time.sleep(0.5)
pytest.fail("timed out")
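# The DescribeStream pagination loop above (following LastEvaluatedShardId
# until it disappears) also appears in test_streams_disabled_stream below.
# For reference, the same pattern as a standalone sketch:
def _get_all_shards(dynamodbstreams, arn):
    response = dynamodbstreams.describe_stream(StreamArn=arn)
    shards = response['StreamDescription']['Shards']
    while 'LastEvaluatedShardId' in response['StreamDescription']:
        response = dynamodbstreams.describe_stream(StreamArn=arn,
            ExclusiveStartShardId=response['StreamDescription']['LastEvaluatedShardId'])
        shards.extend(response['StreamDescription']['Shards'])
    return shards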
# Above we tested some specific operations in small tests aimed to reproduce
# a specific bug. In the following tests we do all the different operations -
# PutItem, DeleteItem, BatchWriteItem and UpdateItem - and check the
# resulting stream for correctness.
# The following tests focus on multiple operations on the *same* item. Those
# should appear in the stream in the correct order.
# Reproduces #6918, #6930.
def do_updates_1(table, p, c):
events = []
# a first put_item appears as an INSERT event. Note also empty old_image.
table.put_item(Item={'p': p, 'c': c, 'x': 2})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
# a second put_item of the *same* key and same value doesn't appear in the log at all!
table.put_item(Item={'p': p, 'c': c, 'x': 2})
# a second put_item of the *same* key and different value appears as a MODIFY event
table.put_item(Item={'p': p, 'c': c, 'y': 3})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'y': 3}])
# deleting an item appears as a REMOVE event. Note no new_image at all, but there is an old_image.
table.delete_item(Key={'p': p, 'c': c})
events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}, None])
# deleting a non-existent item doesn't appear in the log at all.
table.delete_item(Key={'p': p, 'c': c})
# If update_item creates an item, the event is INSERT as well.
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET b = :val1',
ExpressionAttributeValues={':val1': 4})
events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'b': 4}])
# If update_item modifies the item, note how the old and new images include both old and new columns
table.update_item(Key={'p': p, 'c': c},
UpdateExpression='SET x = :val1',
ExpressionAttributeValues={':val1': 5})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4}, {'p': p, 'c': c, 'b': 4, 'x': 5}])
# TODO: incredibly, if we uncomment the "REMOVE b" update below, it will be
# completely missing from the DynamoDB stream - the test continues to
# pass even though we didn't add another expected event, and even though
# the preimage in the following expected event includes this "b" we will
# remove. I couldn't reproduce this apparent DynamoDB bug in a smaller test.
#table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE b')
# Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]})
events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}])
return events
# The tested events are the same as in do_updates_1, but the table doesn't have
# a clustering key.
def do_updates_1_no_ck(table, p, _):
events = []
# a first put_item appears as an INSERT event. Note also empty old_image.
table.put_item(Item={'p': p, 'x': 2})
events.append(['INSERT', {'p': p}, None, {'p': p, 'x': 2}])
# a second put_item of the *same* key and same value doesn't appear in the log at all!
table.put_item(Item={'p': p, 'x': 2})
# a second put_item of the *same* key and different value appears as a MODIFY event
table.put_item(Item={'p': p, 'y': 3})
events.append(['MODIFY', {'p': p}, {'p': p, 'x': 2}, {'p': p, 'y': 3}])
# deleting an item appears as a REMOVE event. Note no new_image at all, but there is an old_image.
table.delete_item(Key={'p': p})
events.append(['REMOVE', {'p': p}, {'p': p, 'y': 3}, None])
# deleting a non-existent item doesn't appear in the log at all.
table.delete_item(Key={'p': p})
# If update_item creates an item, the event is INSERT as well.
table.update_item(Key={'p': p},
UpdateExpression='SET b = :val1',
ExpressionAttributeValues={':val1': 4})
events.append(['INSERT', {'p': p}, None, {'p': p, 'b': 4}])
# If update_item modifies the item, note how the old and new images include both old and new columns
table.update_item(Key={'p': p},
UpdateExpression='SET x = :val1',
ExpressionAttributeValues={':val1': 5})
events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4}, {'p': p, 'b': 4, 'x': 5}])
# Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]})
events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}])
return events
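# The following eight tests run do_updates_1 (tables with a clustering key)
# or do_updates_1_no_ck (tables without one) under each of the four stream
# view types. The alternator_streams_increased_compatibility option is
# enabled for the duration of each test (a no-op when running against AWS).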
def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates_1, 'KEYS_ONLY')
def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates_1, 'NEW_IMAGE')
def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates_1, 'OLD_IMAGE')
def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES')
def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'KEYS_ONLY')
def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_IMAGE')
def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'OLD_IMAGE')
def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES')
# Tests that a DeleteItem within a BatchWriteItem that tries to remove a
# missing item doesn't generate a REMOVE event.
# Reproduces #6918.
def test_streams_batch_delete_missing(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, ps, cs):
# Deleting items that don't exist shouldn't produce any events.
table.meta.client.batch_write_item(RequestItems = {
table.name: [{'DeleteRequest': {'Key': {'p': p, 'c': c}}} for p, c in zip(ps, cs)]
})
return []
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_batch_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
do_batch_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
do_batch_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_batch_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# This test verifies that a PutItem within a BatchWriteItem generates no event
# when it replaces an item with an identical one. This no-op behaviour is also
# tested for a standard PutItem (see tests based on do_updates_1), and an
# UpdateItem (see test_streams_updateitem_overwrite_identical). Reproduces
# #6918.
def test_streams_batch_overwrite_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
# Batch-put items, then overwrite them with identical items
def do_updates(table, ps, cs):
# Emit a separate event for each item in the batch.
table.meta.client.batch_write_item(RequestItems = {
table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': i}}} for p, c, i in zip(ps, cs, range(1, len(ps) + 1))]
})
events = [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': i}] for p, c, i in zip(ps, cs, range(1, len(ps) + 1))]
# Overwriting with identical items shouldn't produce any events
table.meta.client.batch_write_item(RequestItems = {
table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': i}}} for p, c, i in zip(ps, cs, range(1, len(ps) + 1))]
})
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_batch_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
do_batch_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
do_batch_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_batch_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
def test_streams_batch_overwrite_different(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
def do_updates(table, ps, cs):
# Emit a separate event for each item in the batch.
table.meta.client.batch_write_item(RequestItems = {
table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 1}}} for p, c in zip(ps, cs)]
})
events = [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 1}] for p, c in zip(ps, cs)]
# ... but overwriting with different items should be reported as MODIFY.
table.meta.client.batch_write_item(RequestItems = {
table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 2}}} for p, c in zip(ps, cs)]
})
events.extend([['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 1}, {'p': p, 'c': c, 'x': 2}] for p, c in zip(ps, cs)])
return events
with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
do_batch_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
do_batch_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
do_batch_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
do_batch_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# A fixture which creates a test table with a stream enabled, and returns a
# bunch of interesting information collected from the CreateTable response.
# This fixture is module-scoped - it can be shared by multiple tests below,
# because we are not going to actually use or change this stream; we will
# just do multiple tests on its setup.
@pytest.fixture(scope="module")
def test_table_stream_with_result(dynamodb, dynamodbstreams):
tablename = unique_table_name()
result = dynamodb.meta.client.create_table(TableName=tablename,
Tags=TAGS,
BillingMode='PAY_PER_REQUEST',
StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
{ 'AttributeName': 'c', 'KeyType': 'RANGE' }
],
AttributeDefinitions=[
{ 'AttributeName': 'p', 'AttributeType': 'S' },
{ 'AttributeName': 'c', 'AttributeType': 'S' },
])
waiter = dynamodb.meta.client.get_waiter('table_exists')
waiter.config.delay = 1
waiter.config.max_attempts = 200
waiter.wait(TableName=tablename)
table = dynamodb.Table(tablename)
yield result, table
while True:
try:
table.delete()
return
except ClientError as ce:
# if the table has a stream currently being created, we cannot
# delete the table immediately. Again, this happens only with real DynamoDB.
if ce.response['Error']['Code'] == 'ResourceInUseException':
print('Could not delete table yet. Sleeping 5s.')
time.sleep(5)
continue
raise
# Wait for a table to return to "ACTIVE" status. We need to call this after
# doing an UpdateTable on a table - because before this wait finishes we are
# not allowed to update the same table again or delete it.
def wait_for_status_active(table):
for i in range(60):
desc = table.meta.client.describe_table(TableName=table.name)
if desc['Table']['TableStatus'] == 'ACTIVE':
return
time.sleep(1)
pytest.fail("wait_for_status_active() timed out")
# Test that in a table with Streams enabled, LatestStreamArn is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same ARN as
# returned by ListStreams. Reproduces issue #7157.
def test_latest_stream_arn(test_table_stream_with_result, dynamodbstreams):
(result, table) = test_table_stream_with_result
assert 'LatestStreamArn' in result['TableDescription']
arn_in_create_table = result['TableDescription']['LatestStreamArn']
# Check that ListStreams returns the same stream ARN as returned
# by the original CreateTable
(arn_in_list_streams, label) = wait_for_active_stream(dynamodbstreams, table)
assert arn_in_create_table == arn_in_list_streams
# Check that DescribeTable also includes the same LatestStreamArn:
desc = table.meta.client.describe_table(TableName=table.name)['Table']
assert 'LatestStreamArn' in desc
assert desc['LatestStreamArn'] == arn_in_create_table
# Check that UpdateTable also includes the same LatestStreamArn.
# The "update" changes nothing (it just sets BillingMode to what it was).
desc = table.meta.client.update_table(TableName=table.name,
BillingMode='PAY_PER_REQUEST')['TableDescription']
assert desc['LatestStreamArn'] == arn_in_create_table
wait_for_status_active(table)
# Test that in a table with Streams enabled, LatestStreamLabel is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same "label" as
# returned by ListStreams. Reproduces issue #7162.
def test_latest_stream_label(test_table_stream_with_result, dynamodbstreams):
(result, table) = test_table_stream_with_result
assert 'LatestStreamLabel' in result['TableDescription']
label_in_create_table = result['TableDescription']['LatestStreamLabel']
# Check that ListStreams returns the same stream label as returned
# by the original CreateTable
(arn, label) = wait_for_active_stream(dynamodbstreams, table)
assert label_in_create_table == label
# Check that DescribeTable also includes the same LatestStreamLabel:
desc = table.meta.client.describe_table(TableName=table.name)['Table']
assert 'LatestStreamLabel' in desc
assert desc['LatestStreamLabel'] == label_in_create_table
# Check that UpdateTable also includes the same LatestStreamLabel.
# The "update" changes nothing (it just sets BillingMode to what it was).
desc = table.meta.client.update_table(TableName=table.name,
BillingMode='PAY_PER_REQUEST')['TableDescription']
assert desc['LatestStreamLabel'] == label_in_create_table
wait_for_status_active(table)
# Test that in a table with Streams enabled, StreamSpecification is returned
# by CreateTable, DescribeTable and UpdateTable. Reproduces issue #7163.
def test_stream_specification(test_table_stream_with_result, dynamodbstreams):
# StreamSpecification as set in test_table_stream_with_result:
stream_specification = {'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}
(result, table) = test_table_stream_with_result
assert 'StreamSpecification' in result['TableDescription']
assert stream_specification == result['TableDescription']['StreamSpecification']
# Check that DescribeTable also includes the same StreamSpecification:
desc = table.meta.client.describe_table(TableName=table.name)['Table']
assert 'StreamSpecification' in desc
assert stream_specification == desc['StreamSpecification']
# Check that UpdateTable also includes the same StreamSpecification.
# The "update" changes nothing (it just sets BillingMode to what it was).
desc = table.meta.client.update_table(TableName=table.name,
BillingMode='PAY_PER_REQUEST')['TableDescription']
assert stream_specification == desc['StreamSpecification']
wait_for_status_active(table)
# The following test checks the behavior of *closed* shards.
# We achieve a closed shard by disabling the stream - the DynamoDB
# documentation states that "If you disable a stream, any shards that are
# still open will be closed. The data in the stream will continue to be
# readable for 24 hours". In the test we verify that indeed, after a shard
# is closed, it is still readable with GetRecords (reproduces issue #7239).
# Moreover, upon reaching the end of data in the shard, the NextShardIterator
# attribute should say that the end was reached. The DynamoDB documentation
# says that NextShardIterator should be "set to null" in this case - but it
# is not clear what "null" means in this context: Should NextShardIterator
# be missing? Or a "null" JSON type? Or an empty string? This test verifies
# that the right answer is that NextShardIterator should be *missing*
# (reproduces issue #7237).
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
# Do an UpdateItem operation that is expected to leave one event in the
# stream.
table.update_item(Key={'p': random_string(), 'c': random_string()},
UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
# Disable streaming for this table. Note that the test_table_ss_keys_only
# fixture has "function" scope so it is fine to ruin table, it will not
# be used in other tests.
disable_stream(dynamodbstreams, table)
# Even after streaming is disabled for the table, we can still read
# from the earlier stream (it is guaranteed to work for 24 hours).
# The iterators we got earlier should still be fully usable, and
# eventually *one* of the stream shards will return one event:
timeout = time.time() + 15
while time.time() < timeout:
for (shard_id, iter) in shards_and_iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and response['Records'] != []:
# Found the shard with the data! Test that it only has
# one event. NextShardIterator should either be missing now,
# indicating that it is a closed shard (DynamoDB does this),
# or, it may (and currently does in Alternator) return another iterator,
# and reading from *that* iterator should then tell us that
# we reached the end of the shard (i.e., zero results and
# missing NextShardIterator).
assert len(response['Records']) == 1
if 'NextShardIterator' in response:
response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
assert len(response['Records']) == 0
assert not 'NextShardIterator' in response
# Until now we verified that we can read the closed shard
# using an old iterator. Let's test now that the closed
# shard id is also still valid, and a new iterator can be
# created for it, and the old data can be read from it:
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
response = dynamodbstreams.get_records(ShardIterator=iter)
assert len(response['Records']) == 1
return
time.sleep(0.5)
pytest.fail("timed out")
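# A client drains a closed shard by following NextShardIterator until it
# is absent from the response. A sketch of that drain loop (note that it
# only terminates on a closed shard - an open shard always returns a
# NextShardIterator):
def _drain_closed_shard(dynamodbstreams, iterator):
    records = []
    while iterator is not None:
        response = dynamodbstreams.get_records(ShardIterator=iterator)
        records.extend(response.get('Records', []))
        iterator = response.get('NextShardIterator')
    return records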
# In the above test (test_streams_closed_read) we used a disabled stream as
# a means to generate a closed shard, and tested the behavior of that closed
# shard. In the following test, we do more extensive testing on the
# behavior of a disabled stream and verify that it is still usable (for 24
# hours), reproducing issue #7239: The disabled stream's ARN should still be
# listed for the table, this ARN should continue to work, listing the
# stream's shards should give an indication that they are all closed - but
# all these shards should still be readable.
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_disabled_stream(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
iterators = latest_iterators(dynamodbstreams, arn)
# Do an UpdateItem operation that is expected to leave one event in the
# stream.
table.update_item(Key={'p': random_string(), 'c': random_string()},
UpdateExpression='SET x = :x', ExpressionAttributeValues={':x': 5})
# Wait for this one update to become available in the stream before we
# disable the stream. Otherwise, theoretically (although unlikely in
# practice) we may disable the stream before the update was saved to it.
timeout = time.time() + 15
found = False
while time.time() < timeout and not found:
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response and len(response['Records']) > 0:
found = True
break
time.sleep(0.5)
assert found
# Disable streaming for this table. Note that the test_table_ss_keys_only
# fixture has "function" scope so it is fine to ruin table, it will not
# be used in other tests.
disable_stream(dynamodbstreams, table)
# Check that the stream ARN which we previously got for the disabled
# stream is still listed by ListStreams
arns = [stream['StreamArn'] for stream in dynamodbstreams.list_streams(TableName=table.name)['Streams']]
assert arn in arns
# DescribeStream on the disabled stream still works and lists its shards.
# All these shards are listed as being closed (i.e., should have
# EndingSequenceNumber). The basic details of the stream (e.g., the view
# type) are available and the status of the stream is DISABLED.
response = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
assert response['StreamStatus'] == 'DISABLED'
assert response['StreamViewType'] == 'KEYS_ONLY'
assert response['TableName'] == table.name
shards_info = response['Shards']
while 'LastEvaluatedShardId' in response:
response = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']
assert response['StreamStatus'] == 'DISABLED'
assert response['StreamViewType'] == 'KEYS_ONLY'
assert response['TableName'] == table.name
shards_info.extend(response['Shards'])
print('Number of shards in stream: {}'.format(len(shards_info)))
for shard in shards_info:
assert 'EndingSequenceNumber' in shard['SequenceNumberRange']
assert shard['SequenceNumberRange']['EndingSequenceNumber'].isdecimal()
# We can get TRIM_HORIZON iterators for all these shards, to read all
# the old data they still have (this data should be saved for 24 hours
# after the stream was disabled)
iterators = []
for shard in shards_info:
iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
ShardId=shard['ShardId'], ShardIteratorType='TRIM_HORIZON')['ShardIterator'])
# We can read the one change we made from one of these iterators. The data
# should be available immediately - no need for retries with a timeout.
nrecords = 0
for iter in iterators:
response = dynamodbstreams.get_records(ShardIterator=iter)
if 'Records' in response:
nrecords += len(response['Records'])
# The shard is closed, so NextShardIterator should either be missing
# now, indicating that it is a closed shard (DynamoDB does this),
# or, it may (and currently does in Alternator) return an iterator
# and reading from *that* iterator should then tell us that
# we reached the end of the shard (i.e., zero results and
# missing NextShardIterator).
if 'NextShardIterator' in response:
response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
assert len(response['Records']) == 0
assert not 'NextShardIterator' in response
assert nrecords == 1
# When streams are enabled for a table, we get a stream ARN which should
# be unique and should not change unless streams are eventually disabled
# for this table.
# If this ARN changes unexpectedly, it can confuse existing readers who are
# still using this to read from the stream. We (incorrectly) suspected in
# issue #12601 that changes to the version of the schema lead to a change of
# the ARN. In this test we show that it doesn't happen.
def test_stream_arn_unchanging(dynamodb, dynamodbstreams):
with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
(arn, label) = wait_for_active_stream(dynamodbstreams, table)
# Change a tag on the table. This changes its schema.
table_arn = table.meta.client.describe_table(TableName=table.name)['Table']['TableArn']
table.meta.client.tag_resource(ResourceArn=table_arn, Tags=[{'Key': 'animal', 'Value': 'dog' }])
# The change in the table's schema should not change its stream ARN
streams = dynamodbstreams.list_streams(TableName=table.name)
assert 'Streams' in streams
assert len(streams['Streams']) == 1
assert streams['Streams'][0]['StreamArn'] == arn
# Enabling a stream shouldn't cause any extra table to appear in ListTables.
# In issue #19911, enabling streams on a table called xyz caused the name
# "xyz_scylla_cdc_log" to appear in ListTables. The following test creates
# a table with a long unique name, and ensures that only one table containing
# this name as a substring is listed.
# In test_gsi.py and test_lsi.py we have similar tests for GSI and LSI.
# Reproduces #19911
def test_stream_list_tables(dynamodb):
with new_test_table(dynamodb,
Tags=TAGS,
StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
) as table:
# Check that the test table is listed by ListTables, but its long
# and unique name (created by unique_table_name()) isn't a proper
# substring of any other table's name.
tables = list_tables(dynamodb)
assert table.name in tables
for listed_name in tables:
if table.name != listed_name:
assert table.name not in listed_name
# The DynamoDB documentation for GetRecords says that "GetRecords can retrieve
# a maximum of 1 MB of data or 1000 stream records, whichever comes first.",
# and that for Limit, "the upper limit is 1000". Indeed, if we try Limit=1001,
# we get a ValidationException. There is no reason why Alternator must
# implement exactly the same maximum, but since it's documented, there is
# no reason not to. In any case, some maximum is needed unless we make
# sure the relevant code (executor::get_records()) has preemption points -
# and currently it does not. Reproduces issue #23534
def test_get_records_too_high_limit(test_table_ss_keys_only, dynamodbstreams):
table, arn = test_table_ss_keys_only
# Get just one shard - any shard - and its LATEST iterator. Because it's
# LATEST, there will be no data to read from this iterator, but we don't
# care, we just want to run the GetRecords request, we don't care what
# is returned.
shard = dynamodbstreams.describe_stream(StreamArn=arn, Limit=1)['StreamDescription']['Shards'][0]
shard_id = shard['ShardId']
iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']
# Limit=1000 should be allowed:
dynamodbstreams.get_records(ShardIterator=iter, Limit=1000)
# Limit=1001 should NOT be allowed
with pytest.raises(ClientError, match='ValidationException.*[Ll]imit'):
dynamodbstreams.get_records(ShardIterator=iter, Limit=1001)
# Limit must be >= 0:
with pytest.raises(ClientError, match='ValidationException.*[Ll]imit'):
dynamodbstreams.get_records(ShardIterator=iter, Limit=0)
with pytest.raises(ClientError, match='ValidationException.*[Ll]imit'):
dynamodbstreams.get_records(ShardIterator=iter, Limit=-1)
# padded_name() creates a unique name of a given length by taking the
# output of unique_table_name() and padding it with extra 'x' characters:
def padded_name(length):
u = unique_table_name()
assert length >= len(u)
return u + 'x'*(length-len(u))
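# For example, if unique_table_name() returns a 40-character name,
# padded_name(222) appends 182 'x' characters to produce a name of
# length 222.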
# When Alternator enables streams, CDC creates a new table whose name is the
# same as the existing table with the suffix "_scylla_cdc_log". In issue
# #24598, we discovered that we could create a table whose name has length
# 222, but then creating the stream crashed Scylla. We accept that either
# the table creation or the stream creation can fail if we lower the limits
# in the future, but it mustn't crash as it did in #24598.
# We have two versions of this test - one with the stream created with the
# table, and one with the stream added to the existing table.
# Reproduces #24598
def test_stream_table_name_length_222_create(dynamodb):
try:
with new_test_table(dynamodb, name=padded_name(222),
Tags=TAGS,
StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
) as table:
pass
except ClientError as e:
# In Alternator, we may decide that 222 is too long to create a table,
# or too long to create the stream. But if we reached here, at least
# it didn't crash.
assert 'table name is longer than' in str(e) or 'TableName must be' in str(e)
# Reproduces #24598
def test_stream_table_name_length_222_update(dynamodb, dynamodbstreams):
try:
with new_test_table(dynamodb, name=padded_name(222),
Tags=TAGS,
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
) as table:
table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
# DynamoDB doesn't allow deleting the base table while the stream
# is in the process of being added
wait_for_active_stream(dynamodbstreams, table)
except ClientError as e:
assert 'table name is longer than' in str(e) or 'TableName must be' in str(e)
# When the table has a shorter name length, like 192, we should be able to
# create both the table and the stream with no problems.
def test_stream_table_name_length_192_create(dynamodb):
with new_test_table(dynamodb, name=padded_name(192),
Tags=TAGS,
StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
) as table:
pass
def test_stream_table_name_length_192_update(dynamodb, dynamodbstreams):
with new_test_table(dynamodb, name=padded_name(192),
Tags=TAGS,
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, ]
) as table:
table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
# DynamoDB doesn't allow deleting the base table while the stream
# is in the process of being added
wait_for_active_stream(dynamodbstreams, table)
# TODO: tests on multiple partitions
# TODO: write a test that disabling the stream and re-enabling it works, but
# requires the user to wait for the first stream to become DISABLED before
# creating the new one. Then ListStreams should return the two streams,
# one DISABLED and one ENABLED? I'm not sure we want or can do this in
# Alternator.
# TODO: Can we test shard splitting? (Shard splitting requires the user -
# periodically, or after shards end - to call DescribeStream again. We
# don't do this in any of our tests.)