Previously, all stream-table fixtures in this test file used scope="function", forcing a fresh table to be created for every test, slowing down the test a bit (though not much), and discouraging writing small new tests. This was a workaround for a DynamoDB quirk (that Alternator doesn't have): LATEST shard iterators have a time slack and may point slightly before the true stream head, causing leftover events from a previous test to appear in the next test's reads. We fix this by draining the stream inside latest_iterators() and shards_and_latest_iterators() after obtaining the LATEST iterators: fetch records in a loop until two consecutive polling rounds both return empty, guaranteeing the iterators are positioned past all pre-existing events before the caller writes anything. With this guarantee in place, all stream-table fixtures can safely use scope="module". After this patch, test_streams.py continues to pass on DynamoDB. On Alternator, the test file's run time went down a bit, from 20.2 seconds to 17.7 seconds. Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2070 lines
116 KiB
Python
2070 lines
116 KiB
Python
# Copyright 2020-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
|
|
# Tests for stream operations: ListStreams, DescribeStream, GetShardIterator,
|
|
# GetRecords.
|
|
|
|
import time
|
|
import urllib.request
|
|
from contextlib import contextmanager, ExitStack
|
|
from urllib.error import URLError
|
|
|
|
import pytest
|
|
from boto3.dynamodb.types import TypeDeserializer
|
|
from botocore.exceptions import ClientError
|
|
|
|
from test.alternator.util import is_aws, scylla_config_temporary, unique_table_name, create_test_table, new_test_table, random_string, full_scan, freeze, list_tables, get_region, manual_request
|
|
|
|
# All tests in this file are expected to fail with tablets due to #23838.
|
|
# To ensure that Alternator Streams is still being tested, instead of
|
|
# xfailing these tests, we temporarily coerce the tests below to avoid
|
|
# using default tablets setting, even if it's available. We do this by
|
|
# using the following tags when creating each table below:
|
|
TAGS = [{'Key': 'system:initial_tablets', 'Value': 'none'}]
|
|
|
|
stream_types = [ 'OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES']
|
|
|
|
def disable_stream(dynamodbstreams, table):
    """Disable the stream on the given table and wait until every stream of
    the table (including historic ones) reaches the DISABLED state, failing
    the test after a 60-second timeout.
    """
    table.update(StreamSpecification={'StreamEnabled': False})
    # Wait for the stream to really be disabled. A table may have multiple
    # historic streams - we need all of them to become DISABLED. One of
    # them (the current one) may remain DISABLING for some time.
    # Use a monotonic wall clock for the timeout: the previous code used
    # time.process_time() (CPU time), which barely advances while we sleep,
    # so the nominal 60 seconds could stretch to a much longer wall wait.
    deadline = time.monotonic() + 60
    while time.monotonic() < deadline:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        disabled = True
        for stream in streams['Streams']:
            desc = dynamodbstreams.describe_stream(StreamArn=stream['StreamArn'])['StreamDescription']
            if desc['StreamStatus'] != 'DISABLED':
                disabled = False
                break
        if disabled:
            print('disabled stream on {}'.format(table.name))
            return
        time.sleep(0.5)
    pytest.fail("timed out")
|
|
|
|
# Cannot use fixtures. Because real dynamodb cannot _remove_ a stream
|
|
# once created. It can only expire 24h later. So reusing test_table for
|
|
# example works great for scylla/local testing, but once we run against
|
|
# actual aws instances, we get lingering streams, and worse: cannot
|
|
# create new ones to replace it, because we have overlapping types etc.
|
|
#
|
|
# So we have to create and delete a table per test. And not run this
|
|
# test too often against aws.
|
|
@contextmanager
def create_stream_test_table(dynamodb, StreamViewType=None):
    """Context manager creating a test table with a 'p'/'c' hash+range
    schema and (optionally) a stream of the given StreamViewType. The table
    is deleted on exit - even if the body raised - retrying while the
    server reports the table as still in use.
    """
    if StreamViewType is not None:
        spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
    else:
        spec = {'StreamEnabled': False}
    table = create_test_table(dynamodb, StreamSpecification=spec,
        Tags=TAGS,
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' },
                   { 'AttributeName': 'c', 'KeyType': 'RANGE' }],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
        ])
    try:
        yield table
    finally:
        print(f"Deleting table {table.name}")
        while True:
            try:
                table.delete()
                break
            except ClientError as ce:
                # if the table has a stream currently being created we cannot
                # delete the table immediately. Again, only with real dynamo
                if ce.response['Error']['Code'] == 'ResourceInUseException':
                    print('Could not delete table yet. Sleeping 5s.')
                    time.sleep(5)
                    continue
                raise
|
|
|
def wait_for_active_stream(dynamodbstreams, table, timeout=60):
    """Wait until the given table has a usable (ENABLED) stream, and return
    (StreamArn, StreamLabel). Fails the test if no stream becomes usable
    within `timeout` seconds.
    """
    # Monotonic wall clock: the previous time.process_time() (CPU time)
    # barely advances during the 5-second sleeps below, so the timeout was
    # effectively unbounded in wall-clock terms.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        for stream in streams['Streams']:
            arn = stream['StreamArn']
            if arn is None:
                continue
            desc = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
            # Accept a stream with a missing StreamStatus as usable too.
            if 'StreamStatus' not in desc or desc.get('StreamStatus') == 'ENABLED':
                return (arn, stream['StreamLabel'])
        # real dynamo takes some time until a stream is usable
        print("Stream not available. Sleep 5s...")
        time.sleep(5)
    pytest.fail("timed out waiting for an active stream")
|
|
|
|
# Local java dynamodb server version behaves differently from
|
|
# the "real" one. Most importantly, it does not verify a number of
|
|
# parameters, and consequently does not throw when called with borked
|
|
# args. This will try to check if we are in fact running against
|
|
# this test server, and if so, just raise the error here and be done
|
|
# with it. All this just so we can run through the tests on
|
|
# aws, scylla and local.
|
|
def is_local_java(dynamodbstreams):
    """Return True when the endpoint appears to be the local Java DynamoDB
    test server (which serves over Jetty), False otherwise."""
    # no good way to check, but local server runs on a Jetty,
    # so check for that.
    endpoint = dynamodbstreams.meta.endpoint_url
    try:
        urllib.request.urlopen(endpoint)
    except URLError as err:
        if hasattr(err, 'info'):
            return err.info()['Server'].startswith('Jetty')
    return False
|
|
|
|
def ensure_java_server(dynamodbstreams, error='ValidationException'):
    """If we are talking to the local Java test server, simulate the error
    that real DynamoDB would have returned (or do nothing when error is
    None). On any other server, fail the test - the caller only gets here
    when real-server behavior was not observed."""
    if is_local_java(dynamodbstreams):
        if error is not None:
            raise ClientError({'Error': { 'Code' : error }}, '')
        return
    pytest.fail('expected error did not occur on a non-local server')
|
|
|
|
def test_list_streams_create(dynamodb, dynamodbstreams):
    """Creating a table with each allowed StreamViewType must eventually
    produce an active, discoverable stream."""
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table:
            wait_for_active_stream(dynamodbstreams, table)
|
|
|
|
def test_list_streams_alter(dynamodb, dynamodbstreams):
    """Enabling a stream via UpdateTable (instead of at table creation)
    must also produce an active stream, for every StreamViewType."""
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=None) as table:
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': view_type})
            wait_for_active_stream(dynamodbstreams, table)
|
|
|
|
def test_list_streams_paged(dynamodb, dynamodbstreams):
    """ListStreams with Limit=1 must page through the streams, and the
    paged iteration must eventually return both test tables' streams."""
    # There is no reason to run this test for all stream types - we have
    # other tests for creating tables with all stream types, and for using
    # them. This one is only about list_streams.
    for view_type in stream_types[0:1]:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table1, \
                create_stream_test_table(dynamodb, StreamViewType=view_type) as table2:
            wait_for_active_stream(dynamodbstreams, table1)
            wait_for_active_stream(dynamodbstreams, table2)
            streams = dynamodbstreams.list_streams(Limit=1)
            assert streams
            assert streams.get('Streams')
            assert streams.get('LastEvaluatedStreamArn')
            # Page until streams of both our tables were seen.
            remaining = {table1.name, table2.name}
            while True:
                for s in streams['Streams']:
                    remaining.discard(s['TableName'])
                if not remaining:
                    break
                streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn'])
|
|
|
|
# ListStreams with paging should be able correctly return a full list of
|
|
# pre-existing streams even if additional tables were added between pages
|
|
# and caused Scylla's hash table of tables to be reorganized.
|
|
# Reproduces something similar to what we saw in #12601. Unfortunately to
|
|
# reproduce the failure, we needed to create a fairly large number of tables
|
|
# below to reproduce the situation that the internal hash table got
|
|
# reshuffled. It's also not exactly the situation of issue #12601 - there
|
|
# we suspect the hash table got different order for different pages for
|
|
# other reasons - not because of added tables.
|
|
def test_list_streams_paged_with_new_table(dynamodb, dynamodbstreams):
    """Paged ListStreams must return each pre-existing stream exactly once,
    even if new tables were created between fetching the two pages (which
    may reorganize Scylla's internal hash table of tables)."""
    N1 = 4 # Number of tables to create initially
    LIMIT = 2 # Number of tables out of N1 to list in the first page
    N2 = 30 # Number of additional tables to create later
    N1_tables = []
    with ExitStack() as created_tables:
        for _ in range(N1):
            t = created_tables.enter_context(create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY'))
            wait_for_active_stream(dynamodbstreams, t)
            N1_tables.append(t.name)
        first_page = dynamodbstreams.list_streams(Limit=LIMIT)
        assert first_page
        assert 'Streams' in first_page
        assert len(first_page['Streams']) == LIMIT
        first_tables = [s['TableName'] for s in first_page['Streams']]
        last_arn = first_page['LastEvaluatedStreamArn']
        # Create additional tables between the pages, to shuffle the
        # internal ordering of tables.
        for _ in range(N2):
            t = created_tables.enter_context(create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY'))
            wait_for_active_stream(dynamodbstreams, t)
        # Get the rest of the streams (no limit, assuming we don't have
        # a huge number of streams)
        rest = dynamodbstreams.list_streams(ExclusiveStartStreamArn=last_arn)
        assert 'Streams' in rest
        tables = first_tables + [s['TableName'] for s in rest['Streams']]
        # Each of the N1 tables must have been returned from the paged
        # iteration, and only once. The tables in N2 may or may not be
        # returned. Tables not related to this test may also be returned.
        for name in N1_tables:
            assert tables.count(name) == 1
|
|
|
|
def test_list_streams_zero_limit(dynamodb, dynamodbstreams):
    """ListStreams with Limit=0 is invalid and must raise
    ValidationException."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        # Wait for the stream *outside* the pytest.raises block: previously
        # it was inside, where an unexpected ClientError from the wait
        # itself could satisfy the expected-exception check and mask a
        # broken Limit validation.
        wait_for_active_stream(dynamodbstreams, table)
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.list_streams(Limit=0)
|
|
|
|
# The DynamoDB documentation for ListStreams says that for Limit, "the upper
|
|
# limit is 100.". Indeed, if we try 101, we get a ValidationException.
|
|
# There is no reason why Alternator must implement exactly the same limit,
|
|
# but supporting a huge number (below we test 100,000) is not a good idea
|
|
# because theoretically (if we have a huge number of tables...) it can result
|
|
# in a huge response, which we don't want to allow.
|
|
@pytest.mark.xfail(reason="no upper limit for Limit")
def test_list_streams_too_high_limit(dynamodb, dynamodbstreams):
    """ListStreams with Limit above DynamoDB's documented maximum (100)
    should raise ValidationException; Alternator currently accepts any
    limit, hence the xfail."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        # Wait outside the raises block so an unexpected ClientError from
        # the wait itself cannot satisfy the expected-exception check.
        wait_for_active_stream(dynamodbstreams, table)
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.list_streams(Limit=100000)
|
|
|
|
def test_create_streams_wrong_type(dynamodb, dynamodbstreams, test_table):
    """Enabling a stream with a bogus StreamViewType must be rejected."""
    with pytest.raises(ClientError, match='ValidationException'):
        # should throw
        test_table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'Fisk'})
    # just in case the test fails, disable stream again
    test_table.update(StreamSpecification={'StreamEnabled': False})
|
|
|
|
def test_list_streams_empty(dynamodb, dynamodbstreams, test_table):
    """A table without a stream must yield an empty Streams list."""
    response = dynamodbstreams.list_streams(TableName=test_table.name)
    assert 'Streams' in response
    assert not response['Streams'] # empty
|
|
|
|
def test_list_streams_with_nonexistent_last_stream(dynamodb, dynamodbstreams):
    """ListStreams with a malformed ExclusiveStartStreamArn must raise
    ValidationException (the local java server does not validate this)."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        with pytest.raises(ClientError, match='ValidationException'):
            response = dynamodbstreams.list_streams(TableName=table.name, ExclusiveStartStreamArn='kossaapaaasdafsdaasdasdasdasasdasfadfadfasdasdas')
            # Only reached when list_streams did not throw:
            assert 'Streams' in response
            assert not response['Streams'] # empty
            # local java dynamodb does _not_ raise validation error for
            # malformed stream arn here. verify
            ensure_java_server(dynamodbstreams)
|
|
|
|
def test_describe_stream(dynamodb, dynamodbstreams):
    """Basic sanity checks of DescribeStream's output on a fresh stream."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        description = desc.get('StreamDescription')
        assert description
        assert description['StreamArn'] == arn
        assert description['StreamStatus'] != 'DISABLED'
        assert description['StreamViewType'] == 'KEYS_ONLY'
        assert description['TableName'] == table.name
        assert description.get('Shards')
        first_shard = description['Shards'][0]
        assert first_shard.get('ShardId')
        seq_range = first_shard.get('SequenceNumberRange')
        assert seq_range
        assert seq_range.get('StartingSequenceNumber')
        # We don't know what the sequence number is supposed to be, but the
        # DynamoDB documentation requires that it contains only numeric
        # characters and some libraries rely on this. Reproduces issue #7158:
        assert seq_range['StartingSequenceNumber'].isdecimal()
|
|
|
|
@pytest.mark.xfail(reason="alternator does not have creation time on streams")
def test_describe_stream_create_time(dynamodb, dynamodbstreams):
    """DescribeStream should report CreationRequestDateTime. It is a
    non-required attribute, and Alternator doesn't have it - hence the
    xfail."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        description = desc.get('StreamDescription')
        assert description
        # note these are non-required attributes
        assert 'CreationRequestDateTime' in description
|
|
|
|
def test_describe_nonexistent_stream(dynamodb, dynamodbstreams):
    """DescribeStream on a bogus ARN must fail; the local java server uses
    a different error code than real DynamoDB/Alternator."""
    expected = 'ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'
    with pytest.raises(ClientError, match=expected):
        dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf')
|
|
|
|
def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
    """DescribeStream with an unknown ExclusiveStartShardId: real DynamoDB
    returns an empty shard list, while the local java server throws."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        try:
            desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId='zzzzzzzzzzzzzzzzzzzzzzzzsfasdgagadfadfgagkjsdfsfsdjfjks')
            assert not desc['StreamDescription']['Shards']
        except ClientError:
            # The previous bare "except:" also swallowed a failure of the
            # assert above; catch only the client error which the local
            # java server throws here (real servers do not throw).
            ensure_java_server(dynamodbstreams, error=None)
|
|
|
|
def test_get_shard_iterator(dynamodb, dynamodbstreams):
    """Exercise GetShardIterator: every iterator type with and without a
    sequence number, plus the invalid-argument combinations that must
    raise."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)

        first_shard = desc['StreamDescription']['Shards'][0]
        shard_id = first_shard['ShardId']
        seq = first_shard['SequenceNumberRange']['StartingSequenceNumber']

        # spec says shard_id must be 65 chars or less
        assert len(shard_id) <= 65

        # These iterator types require a sequence number:
        for iter_type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            response = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type, SequenceNumber=seq
            )
            assert response.get('ShardIterator')

        # These take no sequence number:
        for iter_type in ['TRIM_HORIZON', 'LATEST']:
            response = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type
            )
            assert response.get('ShardIterator')

        for iter_type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            # must have seq in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type
                )

        for iter_type in ['TRIM_HORIZON', 'LATEST']:
            # should not set "seq" in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type, SequenceNumber=seq
                )

        # bad arn (iterator type spelled out explicitly; the original relied
        # on the leftover loop value, which was also 'LATEST')
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn='sdfadsfsdfsdgdfsgsfdabadfbabdadsfsdfsdfsdfsdfsdfsdfdfdssdffbdfdf', ShardId=shard_id, ShardIteratorType='LATEST', SequenceNumber=seq
            )
        # bad shard id
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId='semprinidfaasdasfsdvacsdcfsdsvsdvsdvsdvsdvsdv',
                ShardIteratorType='LATEST'
            )
        # bad iter type
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='bulle', SequenceNumber=seq
            )
        # bad seq
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='LATEST', SequenceNumber='sdfsafglldfngjdafnasdflgnaldklkafdsgklnasdlv'
            )
|
|
|
|
def test_get_shard_iterator_for_nonexistent_stream(dynamodb, dynamodbstreams):
    """GetShardIterator with a bogus stream ARN must fail (the error code
    differs between the local java server and real servers)."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        shards = desc['StreamDescription']['Shards']
        expected = 'ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'
        with pytest.raises(ClientError, match=expected):
            dynamodbstreams.get_shard_iterator(
                StreamArn='sdfadfsddafgdafsgjnadflkgnalngalsdfnlkasnlkasdfasdfasf', ShardId=shards[0]['ShardId'], ShardIteratorType='LATEST'
            )
|
|
|
|
def test_get_shard_iterator_for_nonexistent_shard(dynamodb, dynamodbstreams):
    """GetShardIterator with an unknown shard id must raise
    ResourceNotFoundException."""
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        arn = dynamodbstreams.list_streams(TableName=table.name)['Streams'][0]['StreamArn']
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId='adfasdasdasdasdasdasdasdasdasasdasd', ShardIteratorType='LATEST'
            )
|
|
|
|
def test_get_records(dynamodb, dynamodbstreams):
    """End-to-end GetRecords test: put and then update a single item, then
    repeatedly poll all shards of the stream (starting from each shard's
    StartingSequenceNumber) until both the INSERT event and the MODIFY
    event have been observed and verified (keys, new image, old image)."""
    # TODO: add tests for storage/transactionable variations and global/local index
    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)

        p = 'piglet'
        c = 'ninja'
        val = 'lucifers'
        val2 = 'flowers'
        table.put_item(Item={'p': p, 'c': c, 'a1': val, 'a2': val2})

        nval = 'semprini'
        nval2 = 'nudge'
        table.update_item(Key={'p': p, 'c': c},
            AttributeUpdates={ 'a1': {'Value': nval, 'Action': 'PUT'},
                'a2': {'Value': nval2, 'Action': 'PUT'}
            })

        # Set once the first write's values were seen in the stream.
        has_insert = False

        # in truth, we should sleep already here, since at least scylla
        # will not be able to produce any stream content until
        # ~30s after insert/update (confidence interval)
        # but it is useful to see a working null-iteration as well, so
        # lets go already.
        while True:
            desc = dynamodbstreams.describe_stream(StreamArn=arn)
            iterators = []

            # Collect an AT_SEQUENCE_NUMBER iterator at the very start of
            # every shard, paging through DescribeStream as needed.
            while True:
                shards = desc['StreamDescription']['Shards']

                for shard in shards:
                    shard_id = shard['ShardId']
                    start = shard['SequenceNumberRange']['StartingSequenceNumber']
                    iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',SequenceNumber=start)['ShardIterator']
                    iterators.append(iter)

                last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
                if not last_shard:
                    break

                desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=last_shard)

            # Poll each iterator once; remember the follow-up iterators so
            # the next outer-loop round continues where this one stopped.
            next_iterators = []
            while iterators:
                iter = iterators.pop(0)
                response = dynamodbstreams.get_records(ShardIterator=iter, Limit=1000)
                if 'NextShardIterator' in response:
                    next_iterators.append(response['NextShardIterator'])

                records = response.get('Records')
                # print("Query {} -> {}".format(iter, records))
                if records:
                    for record in records:
                        # print("Record: {}".format(record))
                        type = record['eventName']
                        # NOTE: this rebinding shadows the "dynamodb" fixture
                        # parameter; from here on the name refers to the
                        # record's 'dynamodb' payload.
                        dynamodb = record['dynamodb']
                        keys = dynamodb['Keys']

                        assert keys.get('p')
                        assert keys.get('c')
                        assert keys['p'].get('S')
                        assert keys['p']['S'] == p
                        assert keys['c'].get('S')
                        assert keys['c']['S'] == c

                        if type == 'MODIFY' or type == 'INSERT':
                            assert dynamodb.get('NewImage')
                            newimage = dynamodb['NewImage'];
                            assert newimage.get('a1')
                            assert newimage.get('a2')

                            # The first observed event may be the INSERT, or
                            # a MODIFY carrying the first write's values -
                            # both are accepted as the "insert" here.
                            if type == 'INSERT' or (type == 'MODIFY' and not has_insert):
                                assert newimage['a1']['S'] == val
                                assert newimage['a2']['S'] == val2
                                has_insert = True
                                continue
                            if type == 'MODIFY':
                                assert has_insert
                                assert newimage['a1']['S'] == nval
                                assert newimage['a2']['S'] == nval2
                                assert dynamodb.get('OldImage')
                                oldimage = dynamodb['OldImage'];
                                assert oldimage.get('a1')
                                assert oldimage.get('a2')
                                assert oldimage['a1']['S'] == val
                                assert oldimage['a2']['S'] == val2
                                # Both events seen and verified - success.
                                return
            print("Not done. Sleep 10s...")
            time.sleep(10)
            iterators = next_iterators
|
|
|
|
def test_get_records_nonexistent_iterator(dynamodbstreams):
    """GetRecords with a malformed shard iterator must raise
    ValidationException."""
    bogus_iterator = 'sdfsdfsgagaddafgagasgasgasdfasdfasdfasdfasdgasdasdgasdg'
    with pytest.raises(ClientError, match='ValidationException'):
        dynamodbstreams.get_records(ShardIterator=bogus_iterator, Limit=1000)
|
|
|
|
##############################################################################
|
|
|
|
# Fixtures for creating a table with a stream enabled with one of the allowed
|
|
# StreamViewType settings (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES).
|
|
# Unfortunately changing the StreamViewType setting of an existing stream is
|
|
# not allowed (see test_streams_change_type), and while removing and re-adding
|
|
# a stream is possible, it is very slow. So we create four different fixtures
|
|
# with the four different StreamViewType settings for these four fixtures.
|
|
|
|
@contextmanager
def create_table_ss(dynamodb, dynamodbstreams, type):
    """Context manager creating a test table (hash+range key) with a stream
    of the given StreamViewType, waiting until the stream is active.
    Yields (table, stream_arn) and deletes the table on exit."""
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    try:
        yield table, arn
    finally:
        # Delete the table even when the body raised; previously deletion
        # was skipped on exception, leaking the table (and its stream,
        # which on real AWS lingers for 24 hours).
        table.delete()
|
|
|
|
def create_table_sss_lsi(dynamodb, dynamodbstreams, type):
    """Generator (used as a fixture body via "yield from") creating a test
    table with an LSI and a stream of the given StreamViewType. Yields
    (table, stream_arn); deletes the table when the generator finishes."""
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }, { 'AttributeName': 'a', 'AttributeType': 'S' }],
        LocalSecondaryIndexes=[
            {
                'IndexName': 'a_idx',
                'KeySchema': [
                    { 'AttributeName': 'p', 'KeyType': 'HASH' },
                    { 'AttributeName': 'a', 'KeyType': 'RANGE' }],
                'Projection': {
                    'ProjectionType': 'ALL',
                }
            }
        ],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    try:
        yield table, arn
    finally:
        # Delete the table even if an exception is thrown into the
        # generator; previously deletion could be skipped, leaking the
        # table on real AWS.
        table.delete()
|
|
|
|
def create_table_s_no_ck(dynamodb, dynamodbstreams, type):
    """Generator (used as a fixture body via "yield from") creating a test
    table with only a hash key (no clustering key) and a stream of the
    given StreamViewType. Yields (table, stream_arn); deletes the table
    when the generator finishes."""
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    try:
        yield table, arn
    finally:
        # Delete the table even if an exception is thrown into the
        # generator, to avoid leaking tables on real AWS.
        table.delete()
|
|
|
|
@pytest.fixture(scope="module")
def test_table_sss_new_and_old_images_lsi(dynamodb, dynamodbstreams):
    """Module-scoped fixture: table with an LSI and a NEW_AND_OLD_IMAGES
    stream. Yields (table, stream_arn)."""
    yield from create_table_sss_lsi(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
|
|
|
|
@pytest.fixture(scope="module")
def test_table_ss_keys_only(dynamodb, dynamodbstreams):
    """Module-scoped fixture: table with a KEYS_ONLY stream. Yields
    (table, stream_arn)."""
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as table_and_arn:
        yield table_and_arn
|
|
|
|
@pytest.fixture(scope="module")
def test_table_ss_new_image(dynamodb, dynamodbstreams):
    """Module-scoped fixture: table with a NEW_IMAGE stream. Yields
    (table, stream_arn)."""
    with create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE') as table_and_arn:
        yield table_and_arn
|
|
|
|
@pytest.fixture(scope="module")
def test_table_ss_old_image(dynamodb, dynamodbstreams):
    """Module-scoped fixture: table with an OLD_IMAGE stream. Yields
    (table, stream_arn)."""
    with create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE') as table_and_arn:
        yield table_and_arn
|
|
|
|
@pytest.fixture(scope="module")
def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):
    """Module-scoped fixture: table with a NEW_AND_OLD_IMAGES stream.
    Yields (table, stream_arn)."""
    with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as table_and_arn:
        yield table_and_arn
|
|
|
|
@pytest.fixture(scope="module")
def test_table_s_no_ck_keys_only(dynamodb, dynamodbstreams):
    """Module-scoped fixture: hash-key-only table with a KEYS_ONLY stream.
    Yields (table, stream_arn)."""
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'KEYS_ONLY')
|
|
|
|
@pytest.fixture(scope="module")
def test_table_s_no_ck_new_image(dynamodb, dynamodbstreams):
    """Module-scoped fixture: hash-key-only table with a NEW_IMAGE stream.
    Yields (table, stream_arn)."""
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_IMAGE')
|
|
|
|
@pytest.fixture(scope="module")
def test_table_s_no_ck_old_image(dynamodb, dynamodbstreams):
    """Module-scoped fixture: hash-key-only table with an OLD_IMAGE stream.
    Yields (table, stream_arn)."""
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'OLD_IMAGE')
|
|
|
|
@pytest.fixture(scope="module")
def test_table_s_no_ck_new_and_old_images(dynamodb, dynamodbstreams):
    """Module-scoped fixture: hash-key-only table with a NEW_AND_OLD_IMAGES
    stream. Yields (table, stream_arn)."""
    yield from create_table_s_no_ck(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Test that it is, sadly, not allowed to use UpdateTable on a table which
|
|
# already has a stream enabled to change that stream's StreamViewType.
|
|
# Relates to #6939
|
|
def test_streams_change_type(test_table_ss_keys_only):
    """UpdateTable may not change the StreamViewType of an existing stream.
    Relates to #6939."""
    table, arn = test_table_ss_keys_only
    with pytest.raises(ClientError, match='ValidationException.*already'):
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE'})
        # If the above change succeeded (because of issue #6939), switch it back :-)
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
|
|
|
|
# It is not allowed to enable stream for a table that already has a stream,
|
|
# even if of the same type.
|
|
def test_streams_enable_on_enabled(test_table_ss_new_and_old_images):
    """Enabling a stream on a table whose stream is already enabled must be
    rejected, even when the requested type is identical."""
    table, arn = test_table_ss_new_and_old_images
    spec = {'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}
    with pytest.raises(ClientError, match='ValidationException.*already.*enabled'):
        table.update(StreamSpecification=spec)
|
|
|
|
# It is not allowed to disable a stream on a table that does not have a stream.
|
|
def test_streams_disable_on_disabled(test_table):
    """Disabling a stream on a table that has no stream must be rejected."""
    with pytest.raises(ClientError, match='ValidationException.*stream.*disable'):
        test_table.update(StreamSpecification={'StreamEnabled': False})
|
|
|
|
# Utility function for listing all the shards of the given stream arn.
|
|
# Implemented by multiple calls to DescribeStream, possibly several pages
|
|
# until all the shards are returned. The return of this function should be
|
|
# cached - it is potentially slow, and DynamoDB documentation even states
|
|
# DescribeStream may only be called at a maximum rate of 10 times per second.
|
|
# list_shards() only returns the shard IDs, not the information about the
|
|
# shards' sequence number range, which is also returned by DescribeStream.
|
|
def list_shards(dynamodbstreams, arn):
    """Return the full list of shard ids of the given stream, paging
    through DescribeStream with a small Limit, and verifying uniqueness,
    lexical ordering, and the LastEvaluatedShardId contract (#7409)."""
    # By default DescribeStream lists a limit of 100 shards. For faster
    # tests we reduced the number of shards in the testing setup to
    # 32 (16 vnodes x 2 cpus), see issue #6979, so to still exercise this
    # paging feature, lets use a limit of 10.
    limit = 10
    desc = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit)['StreamDescription']
    assert len(desc['Shards']) <= limit
    shards = [shard['ShardId'] for shard in desc['Shards']]
    while 'LastEvaluatedShardId' in desc:
        # 7409 kinesis ignores LastEvaluatedShardId and just looks at last shard
        assert shards[-1] == desc['LastEvaluatedShardId']
        desc = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit,
            ExclusiveStartShardId=desc['LastEvaluatedShardId'])['StreamDescription']
        assert len(desc['Shards']) <= limit
        shards.extend(shard['ShardId'] for shard in desc['Shards'])

    print('Number of shards in stream: {}'.format(len(shards)))
    assert len(set(shards)) == len(shards)
    # 7409 - kinesis required shards to be in lexical order.
    # verify.
    assert shards == sorted(shards)

    # special test: ensure we get nothing more if we ask for starting at the last
    # of the last
    desc = dynamodbstreams.describe_stream(StreamArn=arn,
        ExclusiveStartShardId=shards[-1])['StreamDescription']
    assert len(desc['Shards']) == 0

    return shards
|
|
|
|
# Utility function for getting shard iterators starting at "LATEST" for
|
|
# all the shards of the given stream arn.
|
|
# On DynamoDB (but not Alternator), LATEST has a time slack: it may point to
|
|
# a position slightly before the true end of the stream, so writes from a
|
|
# previous test that reused the same table can appear to be "in the future"
|
|
# relative to the returned iterators and therefore show up unexpectedly in
|
|
# the current test's reads. To work around this we drain any already-pending
|
|
# records from the iterators before returning them, so the caller is
|
|
# guaranteed to see only events written *after* this call returns.
|
|
def latest_iterators(dynamodbstreams, arn):
    """Return a LATEST shard iterator for every shard of the given stream.
    On AWS, first drain any records already pending at the LATEST position
    (DynamoDB's LATEST has a time slack), so the caller is guaranteed to
    see only events written after this call returns."""
    iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator'])
    assert len(set(iterators)) == len(iterators)
    # The drain loop below is not necessary on Alternator, whose LATEST
    # iterators are exact, and would needlessly slow the test down.
    if not dynamodbstreams._endpoint.host.endswith('.amazonaws.com'):
        return iterators
    # GetRecords may legitimately return an empty page even when more
    # records are available further along the shard, so a single empty
    # round does not prove we are caught up. Require two consecutive
    # all-empty polling rounds before declaring the stream drained.
    empty_rounds = 0
    while empty_rounds < 2:
        events = []
        iterators = fetch_more(dynamodbstreams, iterators, events)
        empty_rounds = empty_rounds + 1 if not events else 0
    return iterators
|
|
|
|
# Similar to latest_iterators(), just also returns the shard id which produced
|
|
# each iterator.
|
|
def shards_and_latest_iterators(dynamodbstreams, arn):
    """Like latest_iterators(), but returns (shard_id, iterator) pairs so
    the caller also knows which shard produced each iterator."""
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Drain pre-existing records on AWS, for the same reason as in
    # latest_iterators(); unnecessary on Alternator.
    if not dynamodbstreams._endpoint.host.endswith('.amazonaws.com'):
        return shards_and_iterators
    # One empty GetRecords round does not prove the stream is drained -
    # GetRecords may return an empty page with more records still ahead -
    # so require two consecutive all-empty rounds.
    empty_rounds = 0
    while empty_rounds < 2:
        events = []
        shard_ids = [sh for sh, _ in shards_and_iterators]
        new_iters = fetch_more(dynamodbstreams, [it for _, it in shards_and_iterators], events)
        shards_and_iterators = list(zip(shard_ids, new_iters))
        empty_rounds = empty_rounds + 1 if not events else 0
    return shards_and_iterators
|
|
|
|
# Utility function for fetching more content from the stream (given its
# array of iterators) into an "output" array. Call repeatedly to get more
# content - the function returns a new array of iterators which should be
# used to replace the input list of iterators.
# Note that the order of the updates is guaranteed for the same partition,
# but cannot be trusted for *different* partitions.
def fetch_more(dynamodbstreams, iterators, output):
    """Read one GetRecords page from every iterator, appending all records
    to 'output', and return the list of follow-up iterators (shards that
    ended - no NextShardIterator - are dropped)."""
    next_iterators = []
    for shard_iter in iterators:
        response = dynamodbstreams.get_records(ShardIterator=shard_iter)
        output.extend(response['Records'])
        if 'NextShardIterator' in response:
            next_iterators.append(response['NextShardIterator'])
    # Sanity check: each shard must have produced a distinct iterator.
    assert len(next_iterators) == len(set(next_iterators))
    return next_iterators
|
|
|
|
def print_events(expected_events, output, failed_at=None):
    """Dump the expected events and the actual stream output, to help debug
    a compare_events() mismatch.

    failed_at is the index of the output event on which compare_events()
    raised; None means compare_events() never failed outright - the caller
    simply timed out waiting for missing events.
    """
    if failed_at is None:
        # Fixed typo ("timeouted") and dropped the needless f-string.
        print('compare_events: timed out')
    else:
        print(f'compare_events: failed at output event {failed_at}')
    for index, event in enumerate(expected_events):
        expected_type, expected_key, expected_old_image, expected_new_image = event
        print(f'expected event {index}: type={expected_type}, key={expected_key}, old_image={expected_old_image}, new_image={expected_new_image}')
    for index, event in enumerate(output):
        print(f'output event {index}: type={event["eventName"]}, key={event["dynamodb"]["Keys"]}, old_image={event["dynamodb"].get("OldImage")}, new_image={event["dynamodb"].get("NewImage")}')
|
|
|
|
# Utility function for comparing "output" as fetched by fetch_more(), to a list
# expected_events, each of which looks like:
#   [type, keys, old_image, new_image]
# where type is REMOVE, INSERT or MODIFY.
# The "mode" parameter specifies which StreamViewType mode (KEYS_ONLY,
# OLD_IMAGE, NEW_IMAGE, NEW_AND_OLD_IMAGES) was supposedly used to generate
# "output". This mode dictates what we can compare - e.g., in KEYS_ONLY mode
# the compare_events() function ignores the old and new image in
# expected_events.
# The "expected_region" parameter specifies the value of awsRegion field.
# compare_events() throws an exception immediately if it sees an unexpected
# event, but if some of the expected events are just missing in the "output",
# it only returns False - suggesting maybe the caller needs to try again
# later - maybe more output is coming.
# Note that the order of events is only guaranteed (and therefore compared)
# inside a single partition.
def compare_events(expected_events, output, mode, expected_region):
    # The order of expected_events is only meaningful inside a partition, so
    # let's convert it into a map indexed by partition key.
    expected_events_map = {}
    for event in expected_events:
        expected_type, expected_key, expected_old_image, expected_new_image = event
        # For simplicity, we actually use the entire key, not just the partition
        # key. We only lose a bit of testing power we didn't plan to test anyway
        # (that events for different items in the same partition are ordered).
        # freeze() makes the key dict hashable so it can be a map key.
        key = freeze(expected_key)
        if not key in expected_events_map:
            expected_events_map[key] = []
        expected_events_map[key].append(event)
    # Iterate over the events in output. An event for a certain key needs to
    # be the *first* remaining event for this key in expected_events_map (and
    # then we remove this matched event from expected_events_map).
    for e, event in enumerate(output):
        try:
            # In DynamoDB, eventSource is 'aws:dynamodb'. We decided to set it to
            # a *different* value - 'scylladb:alternator'. Issue #6931.
            assert 'eventSource' in event
            # For lack of a direct equivalent of a region, Alternator provides the
            # DC name instead. Reproduces #6931.
            assert 'awsRegion' in event
            assert event['awsRegion'] == expected_region
            # Reproduces #6931.
            assert 'eventVersion' in event
            assert event['eventVersion'] in ['1.0', '1.1']
            # Check that eventID appears, but can't check much on what it is.
            assert 'eventID' in event
            op = event['eventName']
            record = event['dynamodb']
            # record['Keys'] is "serialized" JSON, ({'S', 'thestring'}), so we
            # want to deserialize it to match our expected_events content.
            deserializer = TypeDeserializer()
            key = {x:deserializer.deserialize(y) for (x,y) in record['Keys'].items()}
            # Pop the first remaining expected event for this key; the assert
            # below will fail if the stream delivered events out of order.
            expected_type, expected_key, expected_old_image, expected_new_image = expected_events_map[freeze(key)].pop(0)
            assert op == expected_type
            assert record['StreamViewType'] == mode
            # We don't know what ApproximateCreationDateTime should be, but we do
            # know it needs to be a timestamp - there is conflicting documentation
            # in what format (ISO 8601?). In any case, boto3 parses this timestamp
            # for us, so we can't check it here, beyond checking it exists.
            assert 'ApproximateCreationDateTime' in record
            # We don't know what SequenceNumber is supposed to be, but the DynamoDB
            # documentation requires that it contains only numeric characters and
            # some libraries rely on this. This reproduces issue #7158:
            assert 'SequenceNumber' in record
            assert record['SequenceNumber'].isdecimal()
            # Alternator doesn't set the SizeBytes member. Issue #6931.
            #assert 'SizeBytes' in record
            # Which images may appear in the record depends on the stream's
            # StreamViewType mode:
            if mode == 'KEYS_ONLY':
                assert not 'NewImage' in record
                assert not 'OldImage' in record
            elif mode == 'NEW_IMAGE':
                assert not 'OldImage' in record
                if expected_new_image == None:
                    assert not 'NewImage' in record
                else:
                    new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
                    assert expected_new_image == new_image
            elif mode == 'OLD_IMAGE':
                assert not 'NewImage' in record
                if expected_old_image == None:
                    assert not 'OldImage' in record
                else:
                    old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
                    assert expected_old_image == old_image
            elif mode == 'NEW_AND_OLD_IMAGES':
                if expected_new_image == None:
                    assert not 'NewImage' in record
                else:
                    new_image = {x:deserializer.deserialize(y) for (x,y) in record['NewImage'].items()}
                    assert expected_new_image == new_image
                if expected_old_image == None:
                    assert not 'OldImage' in record
                else:
                    old_image = {x:deserializer.deserialize(y) for (x,y) in record['OldImage'].items()}
                    assert expected_old_image == old_image
            else:
                pytest.fail('cannot happen')
        except AssertionError:
            # Dump both lists before re-raising, so the failure is debuggable.
            print_events(expected_events, output, failed_at=e)
            raise

    # After the above loop, expected_events_map should remain empty arrays.
    # If it isn't, one of the expected events did not yet happen. Return False.
    for entry in expected_events_map.values():
        if len(entry) > 0:
            return False
    return True
|
|
|
|
def fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_events, mode):
    """Poll the stream shards until all expected_events appear, or time out.

    Updating the stream is asynchronous, and may even be artificially delayed
    (see Alternator's alternator_streams_time_window_s config), so when
    compare_events() reports that events are still missing (returns False)
    we retry for a while. compare_events() raises immediately on *wrong*
    (unexpected) data, which keeps failing tests reasonably fast instead of
    always waiting for the full arbitrary timeout below.
    """
    deadline = time.time() + 20
    region = get_region(dynamodb)
    collected = []
    while time.time() < deadline:
        iterators = fetch_more(dynamodbstreams, iterators, collected)
        print("after fetch_more number expected_events={}, output={}".format(len(expected_events), len(collected)))
        if compare_events(expected_events, collected, mode, region):
            return  # all expected events were observed - success!
        time.sleep(0.5)
    # The deadline passed and compare_events() still reports missing events.
    print_events(expected_events, collected)
    pytest.fail('missing events in output: {}'.format(collected))
|
|
|
|
# Convenience function used to implement several tests below. It runs a given
# function "updatefunc" which is supposed to do some updates to the table
# and also return an expected_events list. do_test() then fetches the streams
# data and compares it to the expected_events using compare_events().
def do_test(test_table_ss_stream, dynamodb, dynamodbstreams, updatefunc, mode):
    table, arn = test_table_ss_stream
    # Take LATEST iterators *before* performing any updates, so everything
    # updatefunc() writes is ahead of them.
    iterators = latest_iterators(dynamodbstreams, arn)
    partition_key = random_string()
    clustering_key = random_string()
    expected_events = updatefunc(table, partition_key, clustering_key)
    fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_events, mode)
|
|
|
|
def do_batch_test(test_table_ss_stream, dynamodb, dynamodbstreams, updatefunc, mode, item_count = 3):
    """Like do_test(), but passes updatefunc() *lists* of item_count
    partition and clustering keys, for batch-style update functions."""
    table, arn = test_table_ss_stream
    iterators = latest_iterators(dynamodbstreams, arn)
    partition_keys = [random_string() for _ in range(item_count)]
    clustering_keys = [f"ck_{i}" for i in range(item_count)]
    expected_events = updatefunc(table, partition_keys, clustering_keys)
    fetch_and_compare_events(dynamodb, dynamodbstreams, iterators, expected_events, mode)
|
|
|
|
# Test a single PutItem of a new item. Should result in a single INSERT
# event. Reproduces #6930.
def test_streams_putitem_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        table.put_item(Item={'p': p, 'c': c, 'x': 2})
        # Writing a brand-new item must yield exactly one INSERT event.
        return [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}]]
    do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, updates, 'KEYS_ONLY')
|
|
|
|
# Replacing an item should result in a MODIFY, rather than REMOVE and MODIFY.
# Moreover, the old item should be visible in OldImage. Reproduces #6930.
def test_streams_putitem_new_items_override_old(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.put_item(Item={'p': p, 'c': c, 'a': 1})
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'a': 1}])
        # PutItem *replaces* the item, so 'a' must vanish from the new image
        # and the event must be a single MODIFY - not a REMOVE plus INSERT.
        table.put_item(Item={'p': p, 'c': c, 'b': 2})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'a': 1}, {'p': p, 'c': c, 'b': 2}])
        table.put_item(Item={'p': p, 'c': c, 'a': 3, 'b': 4})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 2}, {'p': p, 'c': c, 'a': 3, 'b': 4}])
        # Replacing with an item that has only the key: the new image is the
        # bare key, but the item still exists (so again MODIFY, not REMOVE).
        table.put_item(Item={'p': p, 'c': c})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'a': 3, 'b': 4}, {'p': p, 'c': c}])
        return events
    do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Same as test_streams_putitem_new_items_overrides_old, but when the replaced
# column is used as an LSI key. Reproduces #6930.
def test_streams_putitem_new_item_overrides_old_lsi(test_table_sss_new_and_old_images_lsi, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.put_item(Item={'p': p, 'c': c, 'a': '1'})
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'a': '1'}])
        # Replacing the item with one lacking 'a' (the LSI key) must drop 'a'
        # from the base item and remove the corresponding LSI entry.
        table.put_item(Item={'p': p, 'c': c})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'a': '1'}, {'p': p, 'c': c}])
        assert len(full_scan(table, IndexName='a_idx')) == 0
        return events
    do_test(test_table_sss_new_and_old_images_lsi, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Test PutItem streams in a table with no clustering key. Same sequence as
# test_streams_putitem_new_items_override_old, but the item key is just 'p'.
def test_streams_putitem_no_ck_new_items_override_old(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
    # The second argument (the clustering key passed by do_test) is unused,
    # since this table only has a partition key.
    def do_updates(table, p, _):
        events = []
        table.put_item(Item={'p': p, 'a': 1})
        events.append(['INSERT', {'p': p}, None, {'p': p, 'a': 1}])
        # Each PutItem fully replaces the item; the event must be a MODIFY
        # carrying both the complete old and complete new images.
        table.put_item(Item={'p': p, 'b': 2})
        events.append(['MODIFY', {'p': p}, {'p': p, 'a': 1}, {'p': p, 'b': 2}])
        table.put_item(Item={'p': p, 'a': 3, 'b': 4})
        events.append(['MODIFY', {'p': p}, {'p': p, 'b': 2}, {'p': p, 'a': 3, 'b': 4}])
        table.put_item(Item={'p': p})
        events.append(['MODIFY', {'p': p}, {'p': p, 'a': 3, 'b': 4}, {'p': p}])
        return events
    do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# In test_streams_new_and_old_images among other things we test that a
# DeleteItem operation gives the correct old image. That test used a table
# which has a sort key; the following test checks this (old image in DeleteItem
# case) for a table that does NOT have a sort key. Alternator uses different
# types of tombstones for deletions from tables with a CK (row tombstone) vs.
# with a partition key only (partition tombstone) - and those may be handled
# differently by CDC. In issue #26382 - which this test reproduces - we
# discovered that our implementation doesn't select a preimage for partition
# deletions, resulting in a missing OldImage.
def test_streams_deleteitem_old_image_no_ck(test_table_s_no_ck_new_and_old_images, test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
    def do_updates(table, p, _):
        events = []
        # Create the item, then delete it. The REMOVE event must carry the
        # complete pre-deletion item as its OldImage.
        table.update_item(Key={'p': p},
            AttributeUpdates={'x': {'Value': 1, 'Action': 'PUT'}})
        events.append(['INSERT', {'p': p}, None, {'p': p, 'x': 1}])
        table.delete_item(Key={'p': p})
        events.append(['REMOVE', {'p': p}, {'p': p, 'x': 1}, None])
        return events
    # Run the same scenario in both stream modes that include an old image.
    do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
    do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Test a single UpdateItem. Should result in a single INSERT event.
# Reproduces #6918.
def test_streams_updateitem_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
    def updates(table, p, c):
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # An UpdateItem creating a previously-nonexistent item is an INSERT.
        return [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}]]
    do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, updates, 'KEYS_ONLY')
|
|
|
|
# Test OLD_IMAGE using UpdateItem. Verify that the OLD_IMAGE indeed includes,
# as needed, the entire old item and not just the modified columns.
# Reproduces issue #6935.
def test_streams_updateitem_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # Note: The "None" as OldImage here tests that the OldImage should be
        # missing because the item didn't exist. This reproduces issue #6933.
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        # The OldImage must include the *whole* previous item - the key
        # columns and 'x' - not just the attribute touched by this update.
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        return events
    do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
|
|
|
|
# Above we verified that if an item did not previously exist, the OLD_IMAGE
# would be missing, but if the item did previously exist, OLD_IMAGE should
# be present and must include the key. Here we confirm the special case of
# the latter - the case of a pre-existing *empty* item, which has just the
# key - in this case since the item did exist, OLD_IMAGE should be returned -
# and include just the key. This is a special case of reproducing #6935 -
# the first patch for this issue failed in this special case.
def test_streams_updateitem_old_image_empty_item(test_table_ss_old_image, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        # Create an *empty* item, with nothing except a key:
        table.update_item(Key={'p': p, 'c': c})
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c}])
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        # Note that OLD_IMAGE should be present and be the empty item,
        # with just a key, not entirely missing.
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}])
        return events
    do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
|
|
|
|
# Test that OLD_IMAGE indeed includes the entire old item and not just the
# modified attributes, in the special case of attributes which are a key of
# a secondary index.
# The unique thing about this case is that as currently implemented,
# secondary-index key attributes are real Scylla columns, contrasting with
# other attributes which are just elements of a map. And our CDC
# implementation treats those cases differently - when a map is modified
# the preimage includes the entire content of the map, but for regular
# columns they are only included in the preimage if they change.
# Currently fails in Alternator because the item's key is missing in
# OldImage (#6935) and the LSI key is also missing (#7030).
#
# Fixture: an OLD_IMAGE-stream table with an LSI whose range key is 'k'.
# Module scope is safe because latest_iterators() drains pre-existing events.
@pytest.fixture(scope="module")
def test_table_ss_old_image_and_lsi(dynamodb, dynamodbstreams):
    table = create_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH'},
            {'AttributeName': 'c', 'KeyType': 'RANGE'}],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
            { 'AttributeName': 'k', 'AttributeType': 'S' }],
        LocalSecondaryIndexes=[{
            'IndexName': 'index',
            'KeySchema': [
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'k', 'KeyType': 'RANGE'}],
            'Projection': { 'ProjectionType': 'ALL' }
        }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE' })
    # Tests using this fixture need the stream to be usable, so wait for it.
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    yield table, arn
    # Teardown: drop the table (and with it the stream) after the module.
    table.delete()
|
|
|
|
def test_streams_updateitem_old_image_lsi(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams):
    """Verify OLD_IMAGE includes an LSI-key attribute ('k') that the update
    did not touch. Reproduces #6935 and #7030."""
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :x, k = :k',
            ExpressionAttributeValues={':x': 2, ':k': 'dog'})
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}])
        # Update only 'y'; the preimage must still carry the untouched 'k'.
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
        # In issue #7030, the 'k' value was missing from the OldImage.
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}, {'p': p, 'c': c, 'x': 2, 'k': 'dog', 'y': 3}])
        return events
    do_test(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
|
|
|
|
# This test is the same as the previous (test_streams_updateitem_old_image_lsi)
# except that the *old* value of the LSI key k is missing. Since PR 8568, CDC
# adds a special deleted$k marker for a missing column in the preimage, and
# this test verifies that Alternator Streams doesn't put this extra marker in
# its output.
def test_streams_updateitem_old_image_lsi_missing_column(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        # Note that we do *not* set the "k" attribute (the LSI key).
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :x',
            ExpressionAttributeValues={':x': 2})
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
        # "k" should be missing from OldImage, because it wasn't set earlier.
        # Verify the events contain only the expected attributes - not some
        # internal markers like "deleted$k".
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        return events
    do_test(test_table_ss_old_image_and_lsi, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
|
|
|
|
# In general, DynamoDB doesn't emit events if an operation is a nop. This
# test verifies that UpdateItem doesn't result in a log row when the updated
# item exists and it is identical to the old one.
# Corresponding tests for PutItem are included in tests based on do_updates_1.
# The case for a PutItem within a BatchWriteItem is tested in
# test_streams_batch_overwrite_identical. Reproduces #6918.
def test_streams_updateitem_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :x',
            ExpressionAttributeValues={':x': 2})
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        # Overwriting the old item with an identical new item shouldn't produce
        # any events - neither via "ADD x 0" nor via "SET x = 2" (both leave
        # x at its current value), so nothing more is appended to "events".
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='ADD x :x',
            ExpressionAttributeValues={':x': 0})
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :x',
            ExpressionAttributeValues={':x': 2})
        return events
    # This nop-elision behavior is opt-in in Alternator; the config option is
    # skipped (nop=True) when running against AWS, where it doesn't exist.
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
        do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
        do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
        do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# The above test_streams_updateitem_identical tested that if we UpdateItem an
# item changing an attribute to a value identical to the one it already has,
# no event is generated on the stream. In this test we change an attribute
# value from one map to another which are really the same value, but have
# different serialization (the map's elements are ordered differently).
# Since the value is nevertheless the same, here too we expect not to see
# a change event in the stream. Reproduces issue #27375.
@pytest.mark.xfail(reason="issue #27375")
def test_streams_updateitem_equal_but_not_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        # We use manual_request() to let us be sure that we pass the JSON
        # values to the server in exactly the order we want to, without the
        # Python SDK possibly rearranging them.
        # Set x to be a map: {'dog': 1, 'cat': 2, 'mouse': 3}.
        # "%s" below is substituted with the serialized map value per call.
        payload = '''{
            "TableName": "''' + table.name + '''",
            "Key": {"p": {"S": "''' + p + '''"}, "c": {"S": "''' + c + '''"}},
            "UpdateExpression": "SET x = :x",
            "ExpressionAttributeValues": {":x": %s}
        }'''
        manual_request(dynamodb, 'UpdateItem',
            payload % '{"M": {"dog": {"N": "1"}, "cat": {"N": "2"}, "mouse": {"N": "3"}}}')
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': {'dog': 1, 'cat': 2, 'mouse': 3}}])
        # Overwriting x an identical map item shouldn't produce any events,
        # so we won't add anything to the "events" list:
        manual_request(dynamodb, 'UpdateItem',
            payload % '{"M": {"dog": {"N": "1"}, "cat": {"N": "2"}, "mouse": {"N": "3"}}}')
        # Now try to overwrite x with a map that has the same elements in
        # a different order. These two values, despite not being identical
        # in JSON form, should be considered equal and again no event should
        # be generated.
        manual_request(dynamodb, 'UpdateItem',
            payload % '{"M": {"cat": {"N": "2"}, "dog": {"N": "1"}, "mouse": {"N": "3"}}}')
        return events
    # Nop-elision must be enabled for this comparison to be meaningful (see
    # test_streams_updateitem_identical above).
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
        do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
        do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
        do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Tests that deleting a missing attribute with UpdateItem doesn't generate a
# REMOVE event. Other cases are tested in test_streams_batch_delete_missing
# and in tests based on do_updates_1. Reproduces #6918.
def test_streams_updateitem_delete_missing(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        # Create an item
        table.update_item(Key={'p': p, 'c': c},
            AttributeUpdates={'x': {'Value': 1, 'Action': 'PUT'}})
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 1}])
        # Deleting a missing attribute shouldn't produce any event for both
        # AttributeUpdates and UpdateExpression - so nothing more is added
        # to the "events" list.
        table.update_item(Key={'p': p, 'c': c},
            AttributeUpdates={'y': {'Action': 'DELETE'}})
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='REMOVE z')
        return events
    # Nop-elision needs to be explicitly enabled in Alternator (the config
    # option is skipped with nop=True when running against AWS).
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
        do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
        do_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
        do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Tests similar to the above tests for OLD_IMAGE, just for NEW_IMAGE mode.
# Verify that the NEW_IMAGE includes the entire old item (including the key),
# that deleting the item results in a missing NEW_IMAGE, and that setting the
# item to be empty has a different result - a NEW_IMAGE with just a key.
# Reproduces issue #7107.
def test_streams_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # Confirm that when adding one attribute "x", the NewImage contains both
        # the new x, and the key columns (p and c).
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        # Confirm that when adding just attribute "y", the NewImage will contain
        # all the attributes, including the old "x":
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Confirm that when deleting the columns x and y, the NewImage becomes
        # empty - but still exists and contains the key columns.
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Confirm that deleting the item results in a missing NewImage:
        table.delete_item(Key={'p': p, 'c': c})
        events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c}, None])
        return events
    do_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
|
|
|
|
# Test similar to the above test for NEW_IMAGE corner cases, but here for
# NEW_AND_OLD_IMAGES mode.
# Although it is likely that if both OLD_IMAGE and NEW_IMAGE work correctly then
# so will the combined NEW_AND_OLD_IMAGES mode, it is still possible that the
# implementation of the combined mode has unique bugs, so it is worth testing
# it separately.
# Reproduces issue #7107.
def test_streams_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # The item doesn't yet exist, so OldImage is missing.
        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        # Confirm that when adding just attribute "y", the NewImage will contain
        # all the attributes, including the old "x". Also, OldImage contains the
        # key attributes, not just "x":
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Confirm that when deleting the attributes x and y, the NewImage becomes
        # empty - but still exists and contains the key attributes:
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Confirm that when adding an attribute z to the empty item, OldItem is
        # not missing - it just contains only the key attributes:
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET z = :val1', ExpressionAttributeValues={':val1': 4})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}])
        # Confirm that deleting the item results in a missing NewImage:
        table.delete_item(Key={'p': p, 'c': c})
        events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}, None])
        return events
    do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Test that when a stream shard has no data to read, GetRecords returns an
# empty Records array - not a missing one. Reproduces issue #6926.
def test_streams_no_records(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # Pick an arbitrary shard and take its LATEST iterator. Because it's
    # LATEST, there will be no data to read from this iterator.
    first_shard = dynamodbstreams.describe_stream(StreamArn=arn, Limit=1)['StreamDescription']['Shards'][0]
    shard_iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
        ShardId=first_shard['ShardId'], ShardIteratorType='LATEST')['ShardIterator']
    response = dynamodbstreams.get_records(ShardIterator=shard_iter)
    # An open shard must hand back a NextShardIterator; only a closed shard
    # (one with an EndingSequenceNumber) may omit it.
    assert 'NextShardIterator' in response or 'EndingSequenceNumber' in first_shard['SequenceNumberRange']
    # 'Records' must be present even with no data at the LATEST position -
    # and it must be an empty list, not missing.
    assert 'Records' in response
    assert response['Records'] == []
|
|
|
|
# Test that after fetching the last result from a shard, we don't get it
# yet again. Reproduces issue #6942.
def test_streams_last_result(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    table.update_item(Key={'p': random_string(), 'c': random_string()},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually (we may need to retry this for a while), *one* of the
    # stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Found the shard with the data! Test that it only has
                # one event and that if we try to read again, we don't
                # get more data (this was issue #6942).
                assert len(response['Records']) == 1
                assert 'NextShardIterator' in response
                response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                assert response['Records'] == []
                # Success - the single event was not repeated.
                return
        time.sleep(0.5)
    pytest.fail("timed out")
|
|
|
|
# In test_streams_last_result above we tested that there is no further events
|
|
# after reading the only one. In this test we verify that if we *do* perform
|
|
# another change on the same key, we do get another event and it happens on the
|
|
# *same* shard.
|
|
def test_streams_another_result(test_table_ss_keys_only, dynamodbstreams):
    """After reading a shard's only event (as in test_streams_last_result),
    a further write to the *same* key must produce another event on the
    *same* shard, readable from the NextShardIterator we were left holding.
    """
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Finally found the shard reporting the changes to this key
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys']== {'p': {'S': p}, 'c': {'S': c}}
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Found the shard with the data. It only has one event so if
                # we try to read again, we find nothing (this is the same as
                # what test_streams_last_result tests).
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert response['Records'] == []
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Do another UpdateItem operation to the same key, so it is
                # expected to write to the *same* shard:
                table.update_item(Key={'p': p, 'c': c},
                    UpdateExpression='SET x = :val2', ExpressionAttributeValues={':val2': 7})
                # Again we may need to wait for the event to appear on the stream:
                timeout = time.time() + 15
                while time.time() < timeout:
                    response = dynamodbstreams.get_records(ShardIterator=iter)
                    if 'Records' in response and response['Records'] != []:
                        # The second write must also be exactly one event,
                        # for the same key, on this same shard.
                        assert len(response['Records']) == 1
                        assert response['Records'][0]['dynamodb']['Keys']== {'p': {'S': p}, 'c': {'S': c}}
                        assert 'NextShardIterator' in response
                        # The test is done, successfully.
                        return
                    time.sleep(0.5)
                pytest.fail("timed out")
        time.sleep(0.5)
    pytest.fail("timed out")
|
|
|
|
# Test the SequenceNumber attribute returned for stream events, and the
|
|
# "AT_SEQUENCE_NUMBER" iterator that can be used to re-read from the same
|
|
# event again given its saved "sequence number".
|
|
def test_streams_at_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    """Check the SequenceNumber reported for a stream event, and that an
    AT_SEQUENCE_NUMBER iterator built from a saved sequence number reads
    that very same event again."""
    table, arn = test_table_ss_keys_only
    shard_iters = shards_and_latest_iterators(dynamodbstreams, arn)
    # A single UpdateItem leaves exactly one event in the stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    expected_key = {'p': {'S': p}, 'c': {'S': c}}
    # Poll all the shards until one of them surfaces the event.
    deadline = time.time() + 15
    while time.time() < deadline:
        for shard_id, it in shard_iters:
            resp = dynamodbstreams.get_records(ShardIterator=it)
            if resp.get('Records'):
                # This shard carries the change for our key.
                assert len(resp['Records']) == 1
                assert resp['Records'][0]['dynamodb']['Keys'] == expected_key
                assert 'NextShardIterator' in resp
                seq = resp['Records'][0]['dynamodb']['SequenceNumber']
                # There was only one event, so reading on should find
                # nothing (same as test_streams_last_result checks).
                resp = dynamodbstreams.get_records(
                    ShardIterator=resp['NextShardIterator'])
                assert resp['Records'] == []
                assert 'NextShardIterator' in resp
                # An AT_SEQUENCE_NUMBER iterator at the saved sequence
                # number must return the very same event. No retry loop is
                # needed here - the event is already known to be available.
                it = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',
                    SequenceNumber=seq)['ShardIterator']
                resp = dynamodbstreams.get_records(ShardIterator=it)
                assert 'Records' in resp
                assert len(resp['Records']) == 1
                assert resp['Records'][0]['dynamodb']['Keys'] == expected_key
                assert resp['Records'][0]['dynamodb']['SequenceNumber'] == seq
                return
        time.sleep(0.5)
    pytest.fail("timed out")
|
|
|
|
# Test the SequenceNumber attribute returned for stream events, and the
|
|
# "AFTER_SEQUENCE_NUMBER" iterator that can be used to re-read *after* the same
|
|
# event again given its saved "sequence number".
|
|
def test_streams_after_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    """Check the SequenceNumber attribute of stream events, and that an
    AFTER_SEQUENCE_NUMBER iterator reads only the events *after* the event
    whose sequence number was saved."""
    table, arn = test_table_ss_keys_only
    shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
    # Do two UpdateItem operations to the same key, that are expected to leave
    # two events in the stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the two events:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and len(response['Records']) == 2:
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']

                # #7424 - AWS sdk assumes sequence numbers can be compared
                # as bigints, and are monotonically growing.
                assert int(sequence_number_1) < int(sequence_number_2)

                # If we use the SequenceNumber of the first event to create an
                # AFTER_SEQUENCE_NUMBER iterator, we can read the second event
                # (only) again. We don't need a loop and a timeout, because this
                # event is already available.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AFTER_SEQUENCE_NUMBER',
                    SequenceNumber=sequence_number_1)['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_2
                return
        time.sleep(0.5)
    pytest.fail("timed out")
|
|
|
|
# Test the "TRIM_HORIZON" iterator, which can be used to re-read *all* the
|
|
# previously-read events of the stream shard again.
|
|
def test_streams_trim_horizon(dynamodb, dynamodbstreams):
    """A TRIM_HORIZON iterator must replay *all* events still stored in a
    shard, in their original order."""
    # This test needs a brand-new stream, without old data from other
    # tests, so we can't reuse the test_table_ss_keys_only fixture.
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        shards_and_iterators = shards_and_latest_iterators(dynamodbstreams, arn)
        # Do two UpdateItem operations to the same key, that are expected to leave
        # two events in the stream.
        p = random_string()
        c = random_string()
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
        # Eventually, *one* of the stream shards will return the two events:
        timeout = time.time() + 15
        while time.time() < timeout:
            for (shard_id, iter) in shards_and_iterators:
                response = dynamodbstreams.get_records(ShardIterator=iter)
                if 'Records' in response and len(response['Records']) == 2:
                    assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                    sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
                    # If we use the TRIM_HORIZON iterator, we should receive the
                    # same two events again, in the same order.
                    # Note that we assume that the fixture gave us a brand new
                    # stream, with no old events saved from other tests. If we
                    # couldn't assume this, this test would need to become much
                    # more complex, and would need to read from this shard until
                    # we find the two events we are looking for.
                    iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                        ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
                    response = dynamodbstreams.get_records(ShardIterator=iter)
                    assert 'Records' in response
                    assert len(response['Records']) == 2
                    assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_1
                    assert response['Records'][1]['dynamodb']['SequenceNumber'] == sequence_number_2
                    return
            time.sleep(0.5)
        pytest.fail("timed out")
|
|
|
|
# Test the StartingSequenceNumber information returned by DescribeStream.
|
|
# The DynamoDB documentation explains that StartingSequenceNumber is
|
|
# "The first sequence number for the stream records contained within a shard"
|
|
# which suggests that is should be the sequence number of the first event
|
|
# that can actually be read from the shard. However, in Alternator the
|
|
# StartingSequenceNumber may be *before* any of the events on the shard -
|
|
# it is the time that the shard was created. In Alternator, it is possible
|
|
# that no event appeared on this shard for some time after it was created,
|
|
# and it is also possible that after more than 24 hours, some of the original
|
|
# items that existed in the shard have expired. Nevertheless, we believe
|
|
# that the important thing is that reading a shard starting at
|
|
# StartingSequenceNumber will result in reading all the available items -
|
|
# similar to how TRIM_HORIZON works. This is what the following test verifies.
|
|
def test_streams_starting_sequence_number(dynamodb, dynamodbstreams):
    """Reading a shard from its advertised StartingSequenceNumber must
    yield all of the shard's available events, like TRIM_HORIZON does."""
    # A brand-new stream is required (no leftover events from other
    # tests), so the shared test_table_ss_keys_only fixture is not used.
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        # Two updates to one key leave two events in the stream.
        p = random_string()
        c = random_string()
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
        # Collect all the stream's shards, following DescribeStream paging.
        desc = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
        shards = desc['Shards']
        while 'LastEvaluatedShardId' in desc:
            desc = dynamodbstreams.describe_stream(StreamArn=arn,
                ExclusiveStartShardId=desc['LastEvaluatedShardId'])['StreamDescription']
            shards.extend(desc['Shards'])
        # For every shard, open an iterator positioned at that shard's
        # StartingSequenceNumber.
        iterators = []
        for shard in shards:
            first_seq = shard['SequenceNumberRange']['StartingSequenceNumber']
            assert first_seq.isdecimal()
            iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
                ShardId=shard['ShardId'], ShardIteratorType='AT_SEQUENCE_NUMBER',
                SequenceNumber=first_seq)['ShardIterator'])

        # Eventually, one of the shards must surface both events.
        expected_key = {'p': {'S': p}, 'c': {'S': c}}
        deadline = time.time() + 15
        while time.time() < deadline:
            for it in iterators:
                resp = dynamodbstreams.get_records(ShardIterator=it)
                if 'Records' in resp and len(resp['Records']) == 2:
                    assert resp['Records'][0]['dynamodb']['Keys'] == expected_key
                    assert resp['Records'][1]['dynamodb']['Keys'] == expected_key
                    return
            time.sleep(0.5)

    pytest.fail("timed out")
|
|
|
|
# Above we tested some specific operations in small tests aimed to reproduce
|
|
# a specific bug. In the following tests we do all the different operations,
|
|
# PutItem, DeleteItem, BatchWriteItem and UpdateItem, and check the resulting
|
|
# stream for correctness.
|
|
# The following tests focus on multiple operations on the *same* item. Those
|
|
# should appear in the stream in the correct order.
|
|
# Reproduces #6918, #6930.
|
|
def do_updates_1(table, p, c):
    """Perform a fixed series of writes on the item with key (p, c) and
    return, in order, the stream events those writes are expected to
    generate. Each expected event has the form
    [eventName, keys, old_image, new_image], where None means no image."""
    events = []
    expect = events.append
    # An initial put_item shows up as an INSERT event, with no old image.
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    expect(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
    # Overwriting the same key with the exact same item produces no event.
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    # Overwriting the same key with a different item produces a MODIFY.
    table.put_item(Item={'p': p, 'c': c, 'y': 3})
    expect(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'y': 3}])
    # A delete shows up as a REMOVE event: an old image but no new image.
    table.delete_item(Key={'p': p, 'c': c})
    expect(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}, None])
    # Deleting an item that doesn't exist produces no event at all.
    table.delete_item(Key={'p': p, 'c': c})
    # An update_item that creates the item is also an INSERT.
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET b = :val1',
        ExpressionAttributeValues={':val1': 4})
    expect(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'b': 4}])
    # An update_item that modifies the item is a MODIFY; note that both
    # images carry the old and the new columns.
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1',
        ExpressionAttributeValues={':val1': 5})
    expect(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4}, {'p': p, 'c': c, 'b': 4, 'x': 5}])
    # TODO: incredibly, if we uncomment the "REMOVE b" update below, it will be
    # completely missing from the DynamoDB stream - the test continues to
    # pass even though we didn't add another expected event, and even though
    # the preimage in the following expected event includes this "b" we will
    # remove. I couldn't reproduce this apparent DynamoDB bug in a smaller test.
    #table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE b')
    # A BatchWriteItem put modifying the item is a MODIFY event as well.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]})
    expect(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}])
    return events
|
|
|
|
# The tested events are the same as in do_updates_1, but the table doesn't have
|
|
# a clustering key.
|
|
def do_updates_1_no_ck(table, p, _):
    """Same write sequence and expected events as do_updates_1, but for a
    table whose schema has only a partition key 'p' (no clustering key).
    The third parameter is ignored; it exists only so this function has the
    same signature as do_updates_1 and the two are interchangeable.
    Returns the list of expected stream events, each of the form
    [eventName, keys, old_image, new_image] (None means no image)."""
    events = []
    # a first put_item appears as an INSERT event. Note also empty old_image.
    table.put_item(Item={'p': p, 'x': 2})
    events.append(['INSERT', {'p': p}, None, {'p': p, 'x': 2}])
    # a second put_item of the *same* key and same value, doesn't appear in the log at all!
    table.put_item(Item={'p': p, 'x': 2})
    # a second put_item of the *same* key and different value, appears as a MODIFY event
    table.put_item(Item={'p': p, 'y': 3})
    events.append(['MODIFY', {'p': p}, {'p': p, 'x': 2}, {'p': p, 'y': 3}])
    # deleting an item appears as a REMOVE event. Note no new_image at all, but there is an old_image.
    table.delete_item(Key={'p': p})
    events.append(['REMOVE', {'p': p}, {'p': p, 'y': 3}, None])
    # deleting a non-existent item doesn't appear in the log at all.
    table.delete_item(Key={'p': p})
    # If update_item creates an item, the event is INSERT as well.
    table.update_item(Key={'p': p},
        UpdateExpression='SET b = :val1',
        ExpressionAttributeValues={':val1': 4})
    events.append(['INSERT', {'p': p}, None, {'p': p, 'b': 4}])
    # If update_item modifies the item, note how old and new image includes both old and new columns
    table.update_item(Key={'p': p},
        UpdateExpression='SET x = :val1',
        ExpressionAttributeValues={':val1': 5})
    events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4}, {'p': p, 'b': 4, 'x': 5}])
    # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'x': 5}}}]})
    events.append(['MODIFY', {'p': p}, {'p': p, 'b': 4, 'x': 5}, {'p': p, 'x': 5}])
    return events
|
|
|
|
def test_streams_1_keys_only(test_table_ss_keys_only, dynamodb, dynamodbstreams):
    """Run the do_updates_1 write sequence and verify the resulting stream
    events on a KEYS_ONLY stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_ss_keys_only, dynamodb, dynamodbstreams,
                do_updates_1, 'KEYS_ONLY')
|
|
|
|
def test_streams_1_new_image(test_table_ss_new_image, dynamodb, dynamodbstreams):
    """Run the do_updates_1 write sequence and verify the resulting stream
    events on a NEW_IMAGE stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_ss_new_image, dynamodb, dynamodbstreams,
                do_updates_1, 'NEW_IMAGE')
|
|
|
|
def test_streams_1_old_image(test_table_ss_old_image, dynamodb, dynamodbstreams):
    """Run the do_updates_1 write sequence and verify the resulting stream
    events on an OLD_IMAGE stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_ss_old_image, dynamodb, dynamodbstreams,
                do_updates_1, 'OLD_IMAGE')
|
|
|
|
def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    """Run the do_updates_1 write sequence and verify the resulting stream
    events on a NEW_AND_OLD_IMAGES stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams,
                do_updates_1, 'NEW_AND_OLD_IMAGES')
|
|
|
|
def test_streams_1_no_ck_keys_only(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams):
    """Run the clustering-key-less do_updates_1_no_ck write sequence and
    verify the resulting stream events on a KEYS_ONLY stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_s_no_ck_keys_only, dynamodb, dynamodbstreams,
                do_updates_1_no_ck, 'KEYS_ONLY')
|
|
|
|
def test_streams_1_no_ck_new_image(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams):
    """Run the clustering-key-less do_updates_1_no_ck write sequence and
    verify the resulting stream events on a NEW_IMAGE stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_s_no_ck_new_image, dynamodb, dynamodbstreams,
                do_updates_1_no_ck, 'NEW_IMAGE')
|
|
|
|
def test_streams_1_no_ck_old_image(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams):
    """Run the clustering-key-less do_updates_1_no_ck write sequence and
    verify the resulting stream events on an OLD_IMAGE stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_s_no_ck_old_image, dynamodb, dynamodbstreams,
                do_updates_1_no_ck, 'OLD_IMAGE')
|
|
|
|
def test_streams_1_no_ck_new_and_old_images(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams):
    """Run the clustering-key-less do_updates_1_no_ck write sequence and
    verify the resulting stream events on a NEW_AND_OLD_IMAGES stream."""
    compat = scylla_config_temporary(dynamodb,
        'alternator_streams_increased_compatibility', 'true',
        nop=is_aws(dynamodb))
    with compat:
        do_test(test_table_s_no_ck_new_and_old_images, dynamodb, dynamodbstreams,
                do_updates_1_no_ck, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# Tests that a DeleteItem within a BatchWriteItem that tries to remove a
|
|
# missing item doesn't generate a REMOVE event.
|
|
# Reproduces #6918.
|
|
def test_streams_batch_delete_missing(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    """A DeleteItem inside a BatchWriteItem that targets a nonexistent item
    must not generate a REMOVE event. Reproduces #6918."""
    def do_updates(table, ps, cs):
        # None of these items exist, so no events are expected at all.
        table.meta.client.batch_write_item(RequestItems = {
            table.name: [{'DeleteRequest': {'Key': {'p': p, 'c': c}}} for p, c in zip(ps, cs)]
        })
        return []
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        # Repeat the check for every supported stream view type.
        for fixture, view_type in [
                (test_table_ss_keys_only, 'KEYS_ONLY'),
                (test_table_ss_new_image, 'NEW_IMAGE'),
                (test_table_ss_old_image, 'OLD_IMAGE'),
                (test_table_ss_new_and_old_images, 'NEW_AND_OLD_IMAGES')]:
            do_batch_test(fixture, dynamodb, dynamodbstreams, do_updates, view_type)
|
|
|
|
# This test verifies that a PutItem within a BatchWriteItem generates no event
|
|
# when it replaces an item with an identical one. This no-op behaviour is also
|
|
# tested for a standard PutItem (see tests based on do_updates_1), and an
|
|
# UpdateItem (see test_streams_updateitem_overwrite_identical). Reproduces
|
|
# #6918.
|
|
def test_streams_batch_overwrite_identical(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    """A PutItem inside a BatchWriteItem that replaces an item with an
    identical copy must generate no stream event. The same no-op behavior
    is also checked for a plain PutItem (do_updates_1-based tests) and for
    UpdateItem (test_streams_updateitem_overwrite_identical).
    Reproduces #6918."""
    def do_updates(table, ps, cs):
        # The first batch writes fresh items, so every one is an INSERT.
        table.meta.client.batch_write_item(RequestItems = {
            table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': i}}}
                         for i, (p, c) in enumerate(zip(ps, cs), 1)]
        })
        events = [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': i}]
                  for i, (p, c) in enumerate(zip(ps, cs), 1)]
        # Re-writing the exact same items must not add any events.
        table.meta.client.batch_write_item(RequestItems = {
            table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': i}}}
                         for i, (p, c) in enumerate(zip(ps, cs), 1)]
        })
        return events
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        # Repeat the check for every supported stream view type.
        for fixture, view_type in [
                (test_table_ss_keys_only, 'KEYS_ONLY'),
                (test_table_ss_new_image, 'NEW_IMAGE'),
                (test_table_ss_old_image, 'OLD_IMAGE'),
                (test_table_ss_new_and_old_images, 'NEW_AND_OLD_IMAGES')]:
            do_batch_test(fixture, dynamodb, dynamodbstreams, do_updates, view_type)
|
|
|
|
def test_streams_batch_overwrite_different(test_table_ss_keys_only, test_table_ss_new_image, test_table_ss_old_image, test_table_ss_new_and_old_images, dynamodb, dynamodbstreams):
    """Counterpart of test_streams_batch_overwrite_identical: overwriting
    items via BatchWriteItem with *different* content must produce MODIFY
    events (while the initial writes produce INSERT events)."""
    def do_updates(table, ps, cs):
        # Emit a separate event for each item in the batch.
        table.meta.client.batch_write_item(RequestItems = {
            table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 1}}} for p, c in zip(ps, cs)]
        })
        events = [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 1}] for p, c in zip(ps, cs)]
        # ... but overwriting with different items should be reported as MODIFY.
        table.meta.client.batch_write_item(RequestItems = {
            table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 2}}} for p, c in zip(ps, cs)]
        })
        events.extend([['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 1}, {'p': p, 'c': c, 'x': 2}] for p, c in zip(ps, cs)])
        return events
    # Repeat the check for every supported stream view type.
    with scylla_config_temporary(dynamodb, 'alternator_streams_increased_compatibility', 'true', nop=is_aws(dynamodb)):
        do_batch_test(test_table_ss_keys_only, dynamodb, dynamodbstreams, do_updates, 'KEYS_ONLY')
        do_batch_test(test_table_ss_new_image, dynamodb, dynamodbstreams, do_updates, 'NEW_IMAGE')
        do_batch_test(test_table_ss_old_image, dynamodb, dynamodbstreams, do_updates, 'OLD_IMAGE')
        do_batch_test(test_table_ss_new_and_old_images, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# A fixture which creates a test table with a stream enabled, and returns a
|
|
# bunch of interesting information collected from the CreateTable response.
|
|
# This fixture is module-scoped - it can be shared by multiple tests below,
|
|
# because we are not going to actually use or change this stream, we will
|
|
# just do multiple tests on its setup.
|
|
@pytest.fixture(scope="module")
def test_table_stream_with_result(dynamodb, dynamodbstreams):
    """Create a table with a KEYS_ONLY stream and yield both the raw
    CreateTable response and the table resource.

    Module-scoped: the tests using this fixture never read or modify the
    stream itself - they only inspect stream-related metadata - so one
    shared table is safe for the whole module.
    """
    tablename = unique_table_name()
    result = dynamodb.meta.client.create_table(TableName=tablename,
        Tags=TAGS,
        BillingMode='PAY_PER_REQUEST',
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
            { 'AttributeName': 'c', 'KeyType': 'RANGE' }
        ],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
        ])
    # Poll quickly (1s) so the fixture is ready as soon as possible, but
    # allow many attempts for slow setups.
    waiter = dynamodb.meta.client.get_waiter('table_exists')
    waiter.config.delay = 1
    waiter.config.max_attempts = 200
    waiter.wait(TableName=tablename)
    table = dynamodb.Table(tablename)
    yield result, table
    # Teardown: delete the table, retrying as long as DynamoDB refuses
    # because the stream is still being created.
    while True:
        try:
            table.delete()
            return
        except ClientError as ce:
            # if the table has a stream currently being created we cannot
            # delete the table immediately. Again, only with real dynamo
            if ce.response['Error']['Code'] == 'ResourceInUseException':
                print('Could not delete table yet. Sleeping 5s.')
                time.sleep(5)
                continue
            raise
|
|
|
|
# Wait for a table to return to "ACTIVE" status. We need to call this after
|
|
# doing an UpdateTable to a table - because before this wait finishes we are
|
|
# not allowed to update the same table again or delete it.
|
|
def wait_for_status_active(table):
    """Wait (up to 60 seconds) for a table to return to "ACTIVE" status.

    Must be called after an UpdateTable operation - until the table is
    back to ACTIVE it may not be updated again or deleted. Fails the
    current test if the table does not become ACTIVE in time.
    """
    for _ in range(60):
        desc = table.meta.client.describe_table(TableName=table.name)
        if desc['Table']['TableStatus'] == 'ACTIVE':
            return
        time.sleep(1)
    pytest.fail("wait_for_status_active() timed out")
|
|
|
|
# Test that in a table with Streams enabled, LatestStreamArn is returned
|
|
# by CreateTable, DescribeTable and UpdateTable, and is the same ARN as
|
|
# returned by ListStreams. Reproduces issue #7157.
|
|
def test_latest_stream_arn(test_table_stream_with_result, dynamodbstreams):
    """LatestStreamArn must be reported consistently by CreateTable,
    DescribeTable and UpdateTable, and match what ListStreams returns.
    Reproduces issue #7157."""
    (result, table) = test_table_stream_with_result
    assert 'LatestStreamArn' in result['TableDescription']
    expected_arn = result['TableDescription']['LatestStreamArn']
    # ListStreams must report the same ARN as the original CreateTable.
    listed_arn, _ = wait_for_active_stream(dynamodbstreams, table)
    assert expected_arn == listed_arn
    # DescribeTable must report the same LatestStreamArn too.
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamArn' in desc
    assert desc['LatestStreamArn'] == expected_arn
    # So must UpdateTable. The "update" changes nothing - it merely sets
    # BillingMode to the value it already has.
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert desc['LatestStreamArn'] == expected_arn
    # UpdateTable left the table transiently non-ACTIVE; wait before the
    # shared table is used or deleted.
    wait_for_status_active(table)
|
|
|
|
# Test that in a table with Streams enabled, LatestStreamLabel is returned
|
|
# by CreateTable, DescribeTable and UpdateTable, and is the same "label" as
|
|
# returned by ListStreams. Reproduces issue #7162.
|
|
def test_latest_stream_label(test_table_stream_with_result, dynamodbstreams):
    """LatestStreamLabel must be reported consistently by CreateTable,
    DescribeTable and UpdateTable, and match what ListStreams returns.
    Reproduces issue #7162."""
    (result, table) = test_table_stream_with_result
    assert 'LatestStreamLabel' in result['TableDescription']
    label_in_create_table = result['TableDescription']['LatestStreamLabel']
    # Check that ListStreams returns the same stream label as returned
    # by the original CreateTable
    (arn, label) = wait_for_active_stream(dynamodbstreams, table)
    assert label_in_create_table == label
    # Check that DescribeTable also includes the same LatestStreamLabel:
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamLabel' in desc
    assert desc['LatestStreamLabel'] == label_in_create_table
    # Check that UpdateTable also includes the same LatestStreamLabel.
    # The "update" changes nothing (it just sets BillingMode to what it was).
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert desc['LatestStreamLabel'] == label_in_create_table
    # UpdateTable left the table transiently non-ACTIVE; wait before the
    # shared (module-scoped) table is used or deleted.
    wait_for_status_active(table)
|
|
|
|
# Test that in a table with Streams enabled, StreamSpecification is returned
|
|
# by CreateTable, DescribeTable and UpdateTable. Reproduces issue #7163.
|
|
def test_stream_specification(test_table_stream_with_result, dynamodbstreams):
    """StreamSpecification must be returned by CreateTable, DescribeTable
    and UpdateTable on a streams-enabled table. Reproduces issue #7163."""
    # The StreamSpecification the test_table_stream_with_result fixture
    # created its table with:
    expected_spec = {'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}
    (result, table) = test_table_stream_with_result
    create_desc = result['TableDescription']
    assert 'StreamSpecification' in create_desc
    assert expected_spec == create_desc['StreamSpecification']
    # DescribeTable must report the same StreamSpecification.
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'StreamSpecification' in desc
    assert expected_spec == desc['StreamSpecification']
    # So must UpdateTable. The "update" changes nothing - it merely sets
    # BillingMode to the value it already has.
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert expected_spec == desc['StreamSpecification']
    # UpdateTable left the table transiently non-ACTIVE; wait before the
    # shared table is used or deleted.
    wait_for_status_active(table)
|
|
|
|
# The following test checks the behavior of *closed* shards.
# We achieve a closed shard by disabling the stream - the DynamoDB
# documentation states that "If you disable a stream, any shards that are
# still open will be closed. The data in the stream will continue to be
# readable for 24 hours". In the test we verify that indeed, after a shard
# is closed, it is still readable with GetRecords (reproduces issue #7239).
# Moreover, upon reaching the end of data in the shard, the NextShardIterator
# attribute should say that the end was reached. The DynamoDB documentation
# says that NextShardIterator should be "set to null" in this case - but it
# is not clear what "null" means in this context: Should NextShardIterator
# be missing? Or a "null" JSON type? Or an empty string? This test verifies
# that the right answer is that NextShardIterator should be *missing*
# (reproduces issue #7237).
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_closed_read(dynamodb, dynamodbstreams):
    # This test can't use the shared table test_table_ss_keys_only,
    # because it wants to disable streaming, so let's create a new table:
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        shard_iters = shards_and_latest_iterators(dynamodbstreams, arn)
        # Perform a single UpdateItem, which should leave exactly one
        # event in the stream.
        table.update_item(Key={'p': random_string(), 'c': random_string()},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
        disable_stream(dynamodbstreams, table)

        # Even after streaming is disabled for the table, we can still read
        # from the earlier stream (it is guaranteed to work for 24 hours).
        # The iterators we got earlier should still be fully usable, and
        # eventually *one* of the stream shards will return the one event:
        deadline = time.time() + 15
        while time.time() < deadline:
            for shard_id, shard_iter in shard_iters:
                response = dynamodbstreams.get_records(ShardIterator=shard_iter)
                if response.get('Records'):
                    # Found the shard holding the data. It must contain
                    # exactly the one event we wrote. NextShardIterator
                    # should either be missing now, indicating a closed
                    # shard (this is what DynamoDB does), or - as
                    # Alternator currently does - be another iterator
                    # whose read reports the end of the shard (zero
                    # records and a missing NextShardIterator).
                    assert len(response['Records']) == 1
                    if 'NextShardIterator' in response:
                        response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                        assert len(response['Records']) == 0
                        assert 'NextShardIterator' not in response
                    # We verified above that an old iterator can read the
                    # closed shard. Also check that the closed shard's id
                    # remains valid: a fresh TRIM_HORIZON iterator can be
                    # created for it, and the old data read through it:
                    fresh_iter = dynamodbstreams.get_shard_iterator(
                        StreamArn=arn, ShardId=shard_id,
                        ShardIteratorType='TRIM_HORIZON')['ShardIterator']
                    response = dynamodbstreams.get_records(ShardIterator=fresh_iter)
                    assert len(response['Records']) == 1
                    return
            time.sleep(0.5)
        pytest.fail("timed out")
|
|
|
|
# In the above test (test_streams_closed_read) we used a disabled stream as
# a means to generate a closed shard, and tested the behavior of that closed
# shard. In the following test, we do more extensive testing on the
# behavior of a disabled stream and verify that it is still usable (for 24
# hours), reproducing issue #7239: The disabled stream's ARN should still be
# listed for the table, this ARN should continue to work, listing the
# stream's shards should give an indication that they are all closed - but
# all these shards should still be readable.
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_disabled_stream(dynamodb, dynamodbstreams):
    # This test can't use the shared table test_table_ss_keys_only,
    # because it wants to disable streaming, so let's create a new table:
    with create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY') as (table, arn):
        iterators = latest_iterators(dynamodbstreams, arn)
        # Do an UpdateItem operation that is expected to leave one event in the
        # stream.
        table.update_item(Key={'p': random_string(), 'c': random_string()},
            UpdateExpression='SET x = :x', ExpressionAttributeValues={':x': 5})

        # Wait for this one update to become available in the stream before we
        # disable the stream. Otherwise, theoretically (although unlikely in
        # practice) we may disable the stream before the update was saved to it.
        timeout = time.time() + 15
        found = False
        while time.time() < timeout and not found:
            for iter in iterators:
                response = dynamodbstreams.get_records(ShardIterator=iter)
                if 'Records' in response and len(response['Records']) > 0:
                    found = True
                    break
            time.sleep(0.5)
        assert found

        disable_stream(dynamodbstreams, table)

        # Check that the stream ARN which we previously got for the disabled
        # stream is still listed by ListStreams
        arns = [stream['StreamArn'] for stream in dynamodbstreams.list_streams(TableName=table.name)['Streams']]
        assert arn in arns

        # DescribeStream on the disabled stream still works and lists its shards.
        # All these shards are listed as being closed (i.e., should have
        # EndingSequenceNumber). The basic details of the stream (e.g., the view
        # type) are available and the status of the stream is DISABLED.
        response = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
        assert response['StreamStatus'] == 'DISABLED'
        assert response['StreamViewType'] == 'KEYS_ONLY'
        assert response['TableName'] == table.name
        shards_info = response['Shards']
        # DescribeStream may page the shard list; follow LastEvaluatedShardId
        # until all pages have been collected into shards_info:
        while 'LastEvaluatedShardId' in response:
            response = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']
            assert response['StreamStatus'] == 'DISABLED'
            assert response['StreamViewType'] == 'KEYS_ONLY'
            assert response['TableName'] == table.name
            shards_info.extend(response['Shards'])
        print('Number of shards in stream: {}'.format(len(shards_info)))
        # Every shard of a disabled stream must be closed, i.e., have a
        # numeric EndingSequenceNumber:
        for shard in shards_info:
            assert 'EndingSequenceNumber' in shard['SequenceNumberRange']
            assert shard['SequenceNumberRange']['EndingSequenceNumber'].isdecimal()

        # We can get TRIM_HORIZON iterators for all these shards, to read all
        # the old data they still have (this data should be saved for 24 hours
        # after the stream was disabled)
        iterators = []
        for shard in shards_info:
            iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,
                ShardId=shard['ShardId'], ShardIteratorType='TRIM_HORIZON')['ShardIterator'])

        # We can read the one change we did in one of these iterators. The data
        # should be available immediately - no need for retries with timeout.
        nrecords = 0
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response:
                nrecords += len(response['Records'])
            # The shard is closed, so NextShardIterator should either be missing
            # now, indicating that it is a closed shard (DynamoDB does this),
            # or, it may (and currently does in Alternator) return an iterator
            # and reading from *that* iterator should then tell us that
            # we reached the end of the shard (i.e., zero results and
            # missing NextShardIterator).
            if 'NextShardIterator' in response:
                response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                assert len(response['Records']) == 0
                assert not 'NextShardIterator' in response
        # Exactly the single update we wrote is visible across all shards:
        assert nrecords == 1
|
|
|
|
# When streams are enabled for a table, we get a unique ARN which should be
# unique but not change unless streams are eventually disabled for this table.
# If this ARN changes unexpectedly, it can confuse existing readers who are
# still using this to read from the stream. We (incorrectly) suspected in
# issue #12601 that changes to the version of the schema lead to a change of
# the ARN. In this test we show that it doesn't happen.
def test_stream_arn_unchanging(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        arn, label = wait_for_active_stream(dynamodbstreams, table)
        # Setting a tag on the table modifies its schema:
        client = table.meta.client
        table_arn = client.describe_table(TableName=table.name)['Table']['TableArn']
        client.tag_resource(ResourceArn=table_arn,
                            Tags=[{'Key': 'animal', 'Value': 'dog'}])
        # The schema change must not have changed the stream's ARN:
        listed = dynamodbstreams.list_streams(TableName=table.name)
        assert 'Streams' in listed
        assert len(listed['Streams']) == 1
        assert listed['Streams'][0]['StreamArn'] == arn
|
|
|
|
# Enabling a stream shouldn't cause any extra table to appear in ListTables.
# In issue #19911, enabling streams on a table called xyz caused the name
# "xyz_scylla_cdc_log" to appear in ListTables. The following test creates
# a table with a long unique name, and ensures that only one table containing
# this name as a substring is listed.
# In test_gsi.py and test_lsi.py we have similar tests for GSI and LSI.
# Reproduces #19911
def test_stream_list_tables(dynamodb):
    with new_test_table(dynamodb,
        Tags=TAGS,
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]
    ) as table:
        # The test table must be listed by ListTables, but its long and
        # unique name (created by unique_table_name()) must not be a
        # proper substring of any other listed table's name:
        listed = list_tables(dynamodb)
        assert table.name in listed
        assert all(table.name not in other
                   for other in listed if other != table.name)
|
|
|
|
# The DynamoDB documentation for GetRecords says that "GetRecords can retrieve
# a maximum of 1 MB of data or 1000 stream records, whichever comes first.",
# and that for Limit, "the upper limit is 1000". Indeed, if we try Limit=1001,
# we get a ValidationException. There is no reason why Alternator must
# implement exactly the same maximum, but since it's documented, there is
# no reason not to. In any case, some maximum is needed unless we make
# sure the relevant code (executor::get_records()) has preemption points -
# and currently it does not. Reproduces issue #23534
def test_get_records_too_high_limit(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # Grab any one shard and a LATEST iterator for it. A LATEST iterator
    # has nothing to read, but that doesn't matter - we only care about
    # GetRecords' validation of the Limit parameter, not about what the
    # request returns.
    desc = dynamodbstreams.describe_stream(StreamArn=arn, Limit=1)
    shard_id = desc['StreamDescription']['Shards'][0]['ShardId']
    shard_iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
        ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']
    # Limit=1000 is the documented maximum and must be accepted:
    dynamodbstreams.get_records(ShardIterator=shard_iter, Limit=1000)
    # Above the maximum, zero, or negative limits must all be rejected:
    for bad_limit in [1001, 0, -1]:
        with pytest.raises(ClientError, match='ValidationException.*[Ll]imit'):
            dynamodbstreams.get_records(ShardIterator=shard_iter, Limit=bad_limit)
|
|
|
|
# padded_name() creates a unique name of exactly the given length: the
# output of unique_table_name() padded on the right with 'x' characters.
def padded_name(length):
    base = unique_table_name()
    assert length >= len(base)
    return base.ljust(length, 'x')
|
|
|
|
# When Alternator enables streams, CDC creates a new table whose name is the
# same as the existing table with the suffix "_scylla_cdc_log". In issue
# #24598, we discovered that we could create a table with length 222 but
# then creating the stream crashed Scylla. We accept that either the table
# creation or the stream creation can fail if we lower the limits in the
# future, but it mustn't crash as it did in #24598.
# We have two versions of this test - one with the stream created with the
# table, and one with the stream added to the existing table.
# Reproduces #24598
def test_stream_table_name_length_222_create(dynamodb):
    try:
        with new_test_table(dynamodb, name=padded_name(222),
            Tags=TAGS,
            StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]
        ):
            pass
    except ClientError as e:
        # Alternator may decide that 222 characters is too long for the
        # table or too long for the stream. Either failure is acceptable -
        # what matters is that the server didn't crash (issue #24598).
        assert 'table name is longer than' in str(e) or 'TableName must be' in str(e)
|
|
|
|
# Reproduces #24598
def test_stream_table_name_length_222_update(dynamodb, dynamodbstreams):
    # Same scenario as the previous test, except the stream is enabled
    # with UpdateTable on an already-existing 222-character-named table.
    try:
        with new_test_table(dynamodb, name=padded_name(222),
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]
        ) as table:
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
            # DynamoDB refuses to delete the base table while the stream
            # is still being created, so wait for it to become active:
            wait_for_active_stream(dynamodbstreams, table)
    except ClientError as e:
        # Either the table creation or the stream creation may legally
        # fail for this name length - but the server must not crash.
        assert 'table name is longer than' in str(e) or 'TableName must be' in str(e)
|
|
|
|
# When the table has a shorter name length, like 192, we should be able to
# create both the table and streams, with no problems.
def test_stream_table_name_length_192_create(dynamodb):
    with new_test_table(dynamodb, name=padded_name(192),
        Tags=TAGS,
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]
    ):
        # Successfully creating the table together with its stream is the
        # entire test - nothing more to check here.
        pass
|
|
|
|
def test_stream_table_name_length_192_update(dynamodb, dynamodbstreams):
    # Same as the previous test, but the stream is enabled with
    # UpdateTable on an already-existing 192-character-named table.
    with new_test_table(dynamodb, name=padded_name(192),
        Tags=TAGS,
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]
    ) as table:
        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
        # DynamoDB doesn't allow deleting the base table while the stream
        # is in the process of being added, so wait for it to be active:
        wait_for_active_stream(dynamodbstreams, table)
|
|
|
|
# In earlier tests, we tested the stream events logged for BatchWriteItem,
# but it was usually a single item in the batch or in do_batch_test(),
# it was multiple items in different partitions. This test checks the
# remaining case, of a batch writing multiple items in one partition -
# and checks that the correct events appear for them on the stream.
# Turns out we had a bug (#28439) in this case, but *only* in always_use_lwt
# write isolation mode, which writes all the items in the batch with the
# same timestamp. The test is parameterized to try all write isolation
# modes, and reproduces #28439 when it failed only in always_use_lwt mode.
# This is a Scylla-only test because it checks write isolation modes, which
# don't exist in DynamoDB.
@pytest.mark.parametrize('mode', ['only_rmw_uses_lwt', pytest.param('always_use_lwt', marks=pytest.mark.xfail(reason='#28439')), 'unsafe_rmw', 'forbid_rmw'])
def test_streams_multiple_items_one_partition(dynamodb, dynamodbstreams, scylla_only, mode):
    with create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES') as stream:
        table, stream_arn = stream
        # Switch the table to the write isolation mode under test:
        client = table.meta.client
        table_arn = client.describe_table(TableName=table.name)['Table']['TableArn']
        client.tag_resource(ResourceArn=table_arn,
            Tags=[{'Key': 'system:write_isolation', 'Value': mode}])
        # The actual scenario: one BatchWriteItem writing three different
        # items in the same partition p.
        def do_updates(table, p, c):
            keys = [c + suffix for suffix in ['1', '2', '3']]
            requests = [{'PutRequest': {'Item': {'p': p, 'c': k, 'x': k}}}
                        for k in keys]
            table.meta.client.batch_write_item(RequestItems={table.name: requests})
            # Expected stream events: one INSERT per item, with no old image.
            return [['INSERT', {'p': p, 'c': k}, None, {'p': p, 'c': k, 'x': k}]
                    for k in keys]
        do_test(stream, dynamodb, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
|
|
|
|
# TODO: tests on multiple partitions
|
|
# TODO: write a test that disabling the stream and re-enabling it works, but
|
|
# requires the user to wait for the first stream to become DISABLED before
|
|
# creating the new one. Then ListStreams should return the two streams,
|
|
# one DISABLED and one ENABLED? I'm not sure we want or can do this in
|
|
# Alternator.
|
|
# TODO: Can we test shard splitting? (shard splitting
# requires the user - periodically or following shards ending - to call
# DescribeStream again). We don't do this in any of our tests.
|