854 lines
44 KiB
Python
854 lines
44 KiB
Python
# Copyright 2021 ScyllaDB
|
||
#
|
||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||
|
||
# Tests for the Time To Live (TTL) feature for item expiration.
|
||
|
||
import math
|
||
import re
|
||
import time
|
||
from contextlib import contextmanager
|
||
from decimal import Decimal
|
||
|
||
import pytest
|
||
from botocore.exceptions import ClientError
|
||
|
||
from .util import new_test_table, random_string, full_query, unique_table_name, is_aws, client_no_transform, multiset, scylla_config_read
|
||
|
||
# The following fixture is to ensure that Alternator TTL is being tested with both vnodes and tablets.
|
||
# This fixture will run automatically for every test in the module.
|
||
# It sets the TAGS variable in the module’s global namespace to the current parameter value before each test.
|
||
# All tests that use the global TAGS variable will see the correct value, and each test will be run for
|
||
# both values. Thanks to this, the tests will run for both vnodes and tablets without the need to change
|
||
# their argument list.
|
||
@pytest.fixture(
    params=[
        [{'Key': 'system:initial_tablets', 'Value': 'none'}],
        [{'Key': 'system:initial_tablets', 'Value': '0'}],
    ],
    ids=["using vnodes", "using tablets"],
    autouse=True,
)
def tags_param(request):
    """Publish the current fixture parameter as the module-level TAGS.

    Being autouse, this runs before every test in the module, once per
    parameter, so tests that read the global TAGS see the tag list for the
    current run ("using vnodes" or "using tablets").
    """
    # Writing through globals() is the same as "global TAGS; TAGS = ...".
    globals()['TAGS'] = request.param
|
||
|
||
# passes_or_raises() is similar to pytest.raises(), except that while raises()
|
||
# expects a certain exception must happen, the new passes_or_raises()
|
||
# expects the code to either pass (not raise), or if it throws, it must
|
||
# throw the specific specified exception.
|
||
@contextmanager
def passes_or_raises(expected_exception, match=None):
    """Accept either success or one specific kind of failure.

    The code inside the "with" block may finish without raising, or it may
    raise expected_exception. Any other outcome fails the test via
    pytest.fail().

    expected_exception: an exception class, or a tuple of classes, exactly
        as accepted by an "except" clause.
    match: optional regular expression. When given, it must be found (with
        re.search) in str() of the raised exception, otherwise the test
        fails.
    """
    # Sadly __tracebackhide__=True only drops some of the unhelpful backtrace
    # lines. See https://github.com/pytest-dev/pytest/issues/2057
    __tracebackhide__ = True
    try:
        # The user's "with" code runs during the yield. If it doesn't throw
        # we simply fall off the end of the function - passes_or_raises()
        # passed as the "passes" case.
        yield
    except expected_exception as err:
        if match is None or re.search(match, str(err)):
            # passes_or_raises() passed as the "raises" case.
            return
        pytest.fail(f"exception message '{err}' did not match '{match}'")
    except Exception as err:
        # expected_exception may be a tuple of classes, which has no
        # __name__ of its own - build the name list from its members so the
        # failure message never dies with an AttributeError.
        if isinstance(expected_exception, tuple):
            expected_classes = expected_exception
        else:
            expected_classes = (expected_exception,)
        expected_names = ' or '.join(cls.__name__ for cls in expected_classes)
        pytest.fail(f"Got unexpected exception type {type(err).__name__} instead of {expected_names}: {err}")
|
||
|
||
# Some of the TTL tests below can be very slow on DynamoDB, or even in
|
||
# Scylla in some configurations. The following two fixtures allow tests to
|
||
# run when they can be run reasonably quickly, but be skipped otherwise -
|
||
# unless the "--runveryslow" option is passed to pytest. This is similar
|
||
# to our @pytest.mark.veryslow, but the latter is a blunter instrument.
|
||
|
||
# The waits_for_expiration fixture indicates that this test waits for expired
|
||
# items to be deleted. This is always very slow on AWS, and whether it is
|
||
# very slow on Scylla or reasonably fast depends on the
|
||
# alternator_ttl_period_in_seconds configuration - test/alternator/run sets
|
||
# it very low, but Scylla may have been run manually.
|
||
@pytest.fixture(scope="module")
def waits_for_expiration(dynamodb, request):
    """Skip the requesting test when waiting for expiration would be slow.

    On AWS the test only runs with the 'runveryslow' option. Otherwise the
    alternator_ttl_period_in_seconds configuration is read, and the test is
    skipped when that period is above one second and 'runveryslow' was not
    given.
    """
    skip_reason = 'need --runveryslow option to run'

    def veryslow_requested():
        # Looked up lazily, only on the paths that actually need the option.
        return request.config.getoption('runveryslow')

    if is_aws(dynamodb):
        if not veryslow_requested():
            pytest.skip(skip_reason)
        return
    configured_period = scylla_config_read(dynamodb, 'alternator_ttl_period_in_seconds')
    assert configured_period is not None
    if float(configured_period) > 1 and not veryslow_requested():
        pytest.skip(skip_reason)
|
||
|
||
# The veryslow_on_aws says that this test is very slow on AWS, but
|
||
# always reasonably fast on Scylla. If fastness on Scylla requires a
|
||
# specific setting of alternator_ttl_period_in_seconds, don't use this
|
||
# fixture - use the above waits_for_expiration instead.
|
||
@pytest.fixture(scope="module")
def veryslow_on_aws(dynamodb, request):
    """Skip the requesting test on AWS unless 'runveryslow' was given."""
    if not is_aws(dynamodb):
        return
    if request.config.getoption('runveryslow'):
        return
    pytest.skip('need --runveryslow option to run')
|
||
|
||
# Test the DescribeTimeToLive operation on a table where the time-to-live
|
||
# feature was *not* enabled.
|
||
def test_describe_ttl_without_ttl(test_table):
    """DescribeTimeToLive on test_table reports DISABLED and no attribute."""
    reply = test_table.meta.client.describe_time_to_live(TableName=test_table.name)
    assert 'TimeToLiveDescription' in reply
    description = reply['TimeToLiveDescription']
    assert 'TimeToLiveStatus' in description
    assert description['TimeToLiveStatus'] == 'DISABLED'
    assert 'AttributeName' not in description
|
||
|
||
# Test that UpdateTimeToLive can be used to pick an expiration-time attribute
|
||
# and this information becomes available via DescribeTimeToLive
|
||
def test_ttl_enable(dynamodb):
    """Enable TTL, read the setting back, and check it can't be re-enabled."""
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]) as table:
        ttl_client = table.meta.client
        enable_spec = {'AttributeName': 'expiration', 'Enabled': True}
        update_reply = ttl_client.update_time_to_live(
            TableName=table.name, TimeToLiveSpecification=enable_spec)
        assert 'TimeToLiveSpecification' in update_reply
        assert update_reply['TimeToLiveSpecification'] == enable_spec
        # Verify that DescribeTimeToLive can recall this setting:
        describe_reply = ttl_client.describe_time_to_live(TableName=table.name)
        assert 'TimeToLiveDescription' in describe_reply
        assert describe_reply['TimeToLiveDescription'] == {
            'TimeToLiveStatus': 'ENABLED', 'AttributeName': 'expiration'}
        # Verify that UpdateTimeToLive cannot enable TTL if it is already
        # enabled. A user is not allowed to change the expiration attribute
        # without disabling TTL first, and it's an error even to try to
        # enable TTL with exactly the same attribute as already enabled
        # (the error message uses slightly different wording in those two
        # cases, hence the alternation in the pattern). The same attribute
        # is tried first, then a different one.
        already_enabled = 'ValidationException.*(active|enabled)'
        for rejected_spec in (enable_spec, {'AttributeName': 'new', 'Enabled': True}):
            with pytest.raises(ClientError, match=already_enabled):
                ttl_client.update_time_to_live(
                    TableName=table.name, TimeToLiveSpecification=rejected_spec)
|
||
|
||
# Test various *wrong* ways of disabling TTL. Although we test here various
|
||
# error cases of how to disable TTL incorrectly, we don't actually check in
|
||
# this test case the successful disabling case, because DynamoDB refuses to
|
||
# disable TTL if it was enabled in the last hour (according to DynamoDB's
|
||
# documentation). We have below a much longer test for the successful TTL
|
||
# disable case.
|
||
def test_ttl_disable_errors(dynamodb):
    """Exercise the incorrect ways of disabling TTL on a fresh table."""
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]) as table:
        ttl_client = table.meta.client

        def set_ttl(attribute, enabled):
            # One UpdateTimeToLive request against the table under test.
            return ttl_client.update_time_to_live(
                TableName=table.name,
                TimeToLiveSpecification={'AttributeName': attribute, 'Enabled': enabled})

        # We can't disable TTL if it's not already enabled.
        with pytest.raises(ClientError, match='ValidationException.*disabled'):
            set_ttl('expiration', False)
        # So enable TTL, before disabling it:
        set_ttl('expiration', True)
        description = ttl_client.describe_time_to_live(TableName=table.name)['TimeToLiveDescription']
        assert description == {
            'TimeToLiveStatus': 'ENABLED', 'AttributeName': 'expiration'}
        # To disable TTL, the user must name the current expiration
        # attribute - naming a different one is an error.
        with pytest.raises(ClientError, match='ValidationException.*different'):
            set_ttl('dog', False)
        # Finally disable TTL the right way. On DynamoDB this fails because
        # the TTL setting may not be modified twice within one hour, but in
        # our implementation it can also pass quickly - so accept both.
        with passes_or_raises(ClientError, match='ValidationException.*multiple times'):
            set_ttl('expiration', False)
|
||
|
||
# Test *successful* disabling of TTL. This is an extremely slow test on AWS,
|
||
# as DynamoDB refuses to disable TTL if it was enabled in the last half hour
|
||
# (the documentation suggests a full hour, but in practice it seems 30
|
||
# minutes). But on Scylla it is currently almost instantaneous.
|
||
def test_ttl_disable(dynamodb, veryslow_on_aws):
    """Enable TTL, then retry disabling it until the request is accepted."""
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]) as table:
        ttl_client = table.meta.client

        def ttl_description():
            return ttl_client.describe_time_to_live(TableName=table.name)['TimeToLiveDescription']

        ttl_client.update_time_to_live(
            TableName=table.name,
            TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True})
        assert ttl_description() == {
            'TimeToLiveStatus': 'ENABLED', 'AttributeName': 'expiration'}
        began = time.time()
        deadline = began + 4000
        while time.time() < deadline:
            try:
                ttl_client.update_time_to_live(
                    TableName=table.name,
                    TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': False})
            except ClientError as error:
                # As long as we get "Time to live has been modified multiple
                # times within a fixed interval", we need to try again -
                # the documentation says for a full hour! Any other error
                # message fails the test right here.
                assert 'times' in error.response['Error']['Message']
                print(time.time() - began)
                time.sleep(60)
            else:
                # The disable request was accepted; stop retrying.
                break
        assert ttl_description() == {
            'TimeToLiveStatus': 'DISABLED'}
|
||
|
||
# Test various errors in the UpdateTimeToLive request.
|
||
def test_update_ttl_errors(dynamodb):
    """Send malformed UpdateTimeToLive requests and expect ClientError."""
    client = dynamodb.meta.client
    # Can't set TTL on a non-existent table
    missing_table = unique_table_name()
    with pytest.raises(ClientError, match='ResourceNotFoundException'):
        client.update_time_to_live(
            TableName=missing_table,
            TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True})
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]) as table:
        # Each entry is (expected error pattern, bad specification). A
        # pattern of None means any ClientError is accepted.
        rejected = [
            # AttributeName must be between 1 and 255 characters long.
            ('ValidationException.*length', {'AttributeName': 'x'*256, 'Enabled': True}),
            ('ValidationException.*length', {'AttributeName': '', 'Enabled': True}),
            # Missing mandatory parameters - AttributeName or Enabled.
            ('ValidationException.*[aA]ttributeName', {'Enabled': True}),
            ('ValidationException.*[eE]nabled', {'AttributeName': 'hello'}),
            # Wrong types for these mandatory parameters (e.g., string for
            # Enabled). The error type is currently a bit different in
            # Alternator (ValidationException) and in DynamoDB
            # (SerializationException), so no pattern is checked.
            (None, {'AttributeName': 'hello', 'Enabled': 'dog'}),
            (None, {'AttributeName': 3, 'Enabled': True}),
        ]
        for pattern, bad_spec in rejected:
            with pytest.raises(ClientError, match=pattern):
                client.update_time_to_live(
                    TableName=table.name, TimeToLiveSpecification=bad_spec)
|
||
|
||
# Basic test that expiration indeed expires items that should be expired,
|
||
# and doesn't expire items which shouldn't be expired.
|
||
# On AWS, this is an extremely slow test - DynamoDB documentation says that
|
||
# expiration may even be delayed for 48 hours. But in practice, at the time
|
||
# of this writing, for tiny tables we see delays of around 10 minutes.
|
||
# The following test is set to always run for "duration" seconds, currently
|
||
# 20 minutes on AWS. During this time, we expect to see the items which should
|
||
# have expired to be expired - and the items that should not have expired
|
||
# should still exist.
|
||
# When running against Scylla configured (for testing purposes) to expire
|
||
# items with very short delays, "duration" can be set much lower so this
|
||
# test will finish in a much more reasonable time.
|
||
@pytest.mark.veryslow
def test_ttl_expiration(dynamodb):
    """Write items with assorted "expiration" values and check which expire.

    The table is polled for up to "duration" seconds (1200 on AWS, 10
    otherwise), stopping early once exactly the expected items are gone.
    """
    duration = 1200 if is_aws(dynamodb) else 10
    # delta is a quarter of the test duration, but no less than one second.
    # It schedules some expirations a bit after the test starts, not
    # immediately.
    delta = math.ceil(duration / 4)
    assert delta >= 1
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]) as table:
        # p0: an expiring item inserted *before* TTL is enabled, to verify
        # that items which already exist when TTL is configured are handled.
        p0 = random_string()
        table.put_item(Item={'p': p0, 'expiration': int(time.time())-60})
        # Enable TTL, using the attribute "expiration":
        ttl_spec = {'AttributeName': 'expiration', 'Enabled': True}
        update_reply = table.meta.client.update_time_to_live(
            TableName=table.name, TimeToLiveSpecification=ttl_spec)
        assert update_reply['TimeToLiveSpecification'] == ttl_spec
        # p1: no "expiration" attribute at all, so it should never expire.
        p1 = random_string()
        table.put_item(Item={'p': p1})
        # p2: "expiration" passed one minute ago, so it should expire ASAP.
        p2 = random_string()
        table.put_item(Item={'p': p2, 'expiration': int(time.time())-60})
        # p3: expiration more than 5 years in the past, which according to
        # the DynamoDB documentation is ignored - p3 should never expire.
        p3 = random_string()
        table.put_item(Item={'p': p3, 'expiration': 162777600})
        # p4: expiration delta into the future - a small part of the test
        # duration - so it should expire by the time the test ends.
        p4 = random_string()
        table.put_item(Item={'p': p4, 'expiration': int(time.time())+delta})
        # p5: starts like p4, but the polling loop below keeps pushing its
        # expiration forward, so it should never expire.
        p5 = random_string()
        table.put_item(Item={'p': p5, 'expiration': int(time.time())+delta})
        # p6: expiration two durations into the future, so it will not
        # expire by the time the test ends.
        p6 = random_string()
        table.put_item(Item={'p': p6, 'expiration': int(time.time()+duration*2)})
        # p7: like p4, but the expiration is wrongly encoded as a string
        # rather than a number, so the item should never expire.
        p7 = random_string()
        table.put_item(Item={'p': p7, 'expiration': str(int(time.time())+delta)})
        # p8 and p9: already-passed expiration times like p2, but written as
        # more elaborate numbers - p8 in exponent form (like 1234567e1) and
        # p9 with a fraction (like 12345678.1234). Both should expire ASAP;
        # this verifies that the TTL code's number parsing doesn't get
        # confused (in our original implementation, it did).
        p8 = random_string()
        with client_no_transform(table.meta.client) as raw_client:
            raw_client.put_item(TableName=table.name, Item={'p': {'S': p8}, 'expiration': {'N': str((int(time.time())-60)//10) + "e1"}})
        # Note that Python's time.time() returns floating point.
        p9 = random_string()
        print(Decimal(str(time.time()-60)))
        table.put_item(Item={'p': p9, 'expiration': Decimal(str(time.time()-60))})

        def exists(key):
            return 'Item' in table.get_item(Key={'p': key})

        # (label, key, should still exist at the end), in lookup order.
        expectations = [
            ('p0', p0, False),
            ('p1', p1, True),
            ('p2', p2, False),
            ('p3', p3, True),
            ('p4', p4, False),
            ('p5', p5, True),
            ('p6', p6, True),
            ('p7', p7, True),
            ('p8', p8, False),
            ('p9', p9, False),
        ]

        def check_expired():
            # p1,p3,p5,p6,p7 should be alive; p0,p2,p4,p8,p9 should be gone.
            # all() stops at the first item that is not in its expected
            # state.
            return all(exists(key) == survives for _, key, survives in expectations)

        # We could have just done time.sleep(duration) here, but in case a
        # user is watching this long test, print the status on every poll.
        # Polling also lets us test what happens when p5's expiration time
        # is continuously pushed back into the future.
        start_time = time.time()
        while time.time() < start_time + duration:
            print(f"--- {int(time.time()-start_time)} seconds")
            for label, key, _ in expectations:
                if exists(key):
                    print(f"{label} alive")
            # Always keep p5's expiration delta into the future
            table.update_item(
                Key={'p': p5},
                AttributeUpdates={'expiration': {'Value': int(time.time())+delta, 'Action': 'PUT'}})
            if check_expired():
                break
            time.sleep(duration/15.0)

        assert check_expired()
|
||
|
||
# test_ttl_expiration above used a table with just a hash key. Because the
|
||
# code to *delete* items in a table which also has a range key is subtly
|
||
# different than the code for just a hash key, we want to test the case of
|
||
# such a table as well. This test is simpler than test_ttl_expiration because
|
||
# all we want to check is that the deletion works - not the expiration time
|
||
# parsing and semantics.
|
||
def test_ttl_expiration_with_rangekey(dynamodb, waits_for_expiration):
    """Check that expiration deletes items in a table with a range key."""
    # Unlike test_ttl_expiration, this test doesn't have a fixed duration -
    # it finishes as soon as the item we expect to be expired is expired.
    max_duration = 1200 if is_aws(dynamodb) else 240
    sleep = 30 if is_aws(dynamodb) else 0.1
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'c', 'KeyType': 'RANGE'},
            ],
            AttributeDefinitions=[
                {'AttributeName': 'p', 'AttributeType': 'S'},
                {'AttributeName': 'c', 'AttributeType': 'S'},
            ]) as table:
        # Enable TTL, using the attribute "expiration":
        ttl_spec = {'AttributeName': 'expiration', 'Enabled': True}
        update_reply = table.meta.client.update_time_to_live(
            TableName=table.name, TimeToLiveSpecification=ttl_spec)
        assert update_reply['TimeToLiveSpecification'] == ttl_spec
        # (p1, c1) has no "expiration" attribute, so it should never expire.
        p1 = random_string()
        c1 = random_string()
        table.put_item(Item={'p': p1, 'c': c1})
        # (p2, c2) should expire ASAP - its "expiration" passed a minute ago.
        p2 = random_string()
        c2 = random_string()
        table.put_item(Item={'p': p2, 'c': c2, 'expiration': int(time.time())-60})

        def exists(p, c):
            return 'Item' in table.get_item(Key={'p': p, 'c': c})

        deadline = time.time() + max_duration
        while time.time() < deadline:
            if exists(p1, c1) and not exists(p2, c2):
                # p2 expired, p1 didn't. We're done
                break
            time.sleep(sleep)
        assert exists(p1, c1)
        assert not exists(p2, c2)
|
||
|
||
# While it probably makes little sense to do this, the designated
|
||
# expiration-time attribute *may* be the hash or range key attributes.
|
||
# So let's test that this indeed works and items indeed expire
|
||
# Just like test_ttl_expiration() above, these tests are extremely slow
|
||
# on AWS because DynamoDB delays expiration by around 10 minutes.
|
||
def test_ttl_expiration_hash(dynamodb, waits_for_expiration):
    """Use the numeric hash key "p" itself as the expiration attribute."""
    max_duration = 1200 if is_aws(dynamodb) else 240
    sleep = 30 if is_aws(dynamodb) else 0.1
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}]) as table:
        table.meta.client.update_time_to_live(
            TableName=table.name,
            TimeToLiveSpecification={'AttributeName': 'p', 'Enabled': True})
        # p1 is in the past, and should be expired. p2 is in the distant
        # future and should not be expired.
        p1 = int(time.time()) - 60
        p2 = int(time.time()) + 3600
        for key in (p1, p2):
            table.put_item(Item={'p': key})
        start_time = time.time()

        def exists(key):
            return 'Item' in table.get_item(Key={'p': key})

        def check_expired():
            return not exists(p1) and exists(p2)

        while time.time() < start_time + max_duration:
            print(f"--- {int(time.time()-start_time)} seconds")
            for label, key in (('p1', p1), ('p2', p2)):
                if exists(key):
                    print(f"{label} alive")
            if check_expired():
                break
            time.sleep(sleep)
        # After the delay, p2 should be alive, p1 should not
        assert check_expired()
|
||
|
||
def test_ttl_expiration_range(dynamodb, waits_for_expiration):
    """Use the numeric range key "c" itself as the expiration attribute."""
    max_duration = 1200 if is_aws(dynamodb) else 240
    sleep = 30 if is_aws(dynamodb) else 0.1
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'c', 'KeyType': 'RANGE'},
            ],
            AttributeDefinitions=[
                {'AttributeName': 'p', 'AttributeType': 'S'},
                {'AttributeName': 'c', 'AttributeType': 'N'},
            ]) as table:
        table.meta.client.update_time_to_live(
            TableName=table.name,
            TimeToLiveSpecification={'AttributeName': 'c', 'Enabled': True})
        # c1 is in the past, and should be expired. c2 is in the distant
        # future and should not be expired. Both share the hash key p.
        p = random_string()
        c1 = int(time.time()) - 60
        c2 = int(time.time()) + 3600
        for sort_key in (c1, c2):
            table.put_item(Item={'p': p, 'c': sort_key})
        start_time = time.time()

        def exists(sort_key):
            return 'Item' in table.get_item(Key={'p': p, 'c': sort_key})

        def check_expired():
            return not exists(c1) and exists(c2)

        while time.time() < start_time + max_duration:
            print(f"--- {int(time.time()-start_time)} seconds")
            for label, sort_key in (('c1', c1), ('c2', c2)):
                if exists(sort_key):
                    print(f"{label} alive")
            if check_expired():
                break
            time.sleep(sleep)
        # After the delay, c2 should be alive, c1 should not
        assert check_expired()
|
||
|
||
# In the above key-attribute tests, the key attribute we used for the
|
||
# expiration-time attribute had the expected numeric type. If the key
|
||
# attribute has a non-numeric type (e.g., string), it can never contain
|
||
# an expiration time, so nothing will ever expire - but although DynamoDB
|
||
# knows this, it doesn't refuse this setting anyway - although it could.
|
||
# This test demonstrates that:
|
||
@pytest.mark.veryslow
def test_ttl_expiration_hash_wrong_type(dynamodb):
    """A string-typed hash key used as the TTL attribute never expires."""
    duration = 900 if is_aws(dynamodb) else 3
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]) as table:
        table.meta.client.update_time_to_live(
            TableName=table.name,
            TimeToLiveSpecification={'AttributeName': 'p', 'Enabled': True})
        # p1 is in the past, but it's a string, not the required numeric
        # type, so the item should never expire.
        p1 = str(int(time.time()) - 60)
        table.put_item(Item={'p': p1})

        def p1_exists():
            return 'Item' in table.get_item(Key={'p': p1})

        start_time = time.time()
        while time.time() < start_time + duration:
            print(f"--- {int(time.time()-start_time)} seconds")
            if p1_exists():
                print("p1 alive")
            time.sleep(duration/30)
        # After the whole duration has passed, p1 should still be alive.
        assert p1_exists()
|
||
|
||
# Test how in a table with a GSI or LSI, expiring an item also removes it
|
||
# from the GSI and LSI.
|
||
# We already tested above the various reasons for an item expiring or not
|
||
# expiring, so we don't need to continue testing these various cases here,
|
||
# and can test just one expiring item. This also allows us to finish the
|
||
# test as soon as the item expires, instead of after a fixed "duration" as
|
||
# in the above tests.
|
||
def test_ttl_expiration_gsi_lsi(dynamodb, waits_for_expiration):
    """Check that an expired item also disappears from the GSI and the LSI.

    One item is written without an expiration and must become visible in
    the base table, the GSI and the LSI. It is then rewritten with an
    expiration in the past and must disappear from all three within
    max_duration seconds.
    """
    # In our experiments we noticed that when a table has secondary indexes,
    # items are expired with significant delay. Whereas a 10 minute delay
    # for regular tables was typical, in the table we have here we saw
    # a typical delay of 30 minutes before items expired.
    max_duration = 3600 if is_aws(dynamodb) else 240
    sleep = 30 if is_aws(dynamodb) else 0.1
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'c', 'KeyType': 'RANGE'},
            ],
            LocalSecondaryIndexes=[
                {
                    'IndexName': 'lsi',
                    'KeySchema': [
                        {'AttributeName': 'p', 'KeyType': 'HASH'},
                        {'AttributeName': 'l', 'KeyType': 'RANGE'},
                    ],
                    'Projection': {'ProjectionType': 'ALL'},
                },
            ],
            GlobalSecondaryIndexes=[
                {
                    'IndexName': 'gsi',
                    'KeySchema': [
                        {'AttributeName': 'g', 'KeyType': 'HASH'},
                    ],
                    'Projection': {'ProjectionType': 'ALL'},
                },
            ],
            AttributeDefinitions=[
                {'AttributeName': 'p', 'AttributeType': 'S'},
                {'AttributeName': 'c', 'AttributeType': 'S'},
                {'AttributeName': 'g', 'AttributeType': 'S'},
                {'AttributeName': 'l', 'AttributeType': 'S'},
            ]) as table:
        ttl_spec = {'AttributeName': 'expiration', 'Enabled': True}
        response = table.meta.client.update_time_to_live(
            TableName=table.name, TimeToLiveSpecification=ttl_spec)
        assert 'TimeToLiveSpecification' in response
        assert response['TimeToLiveSpecification'] == ttl_spec
        p = random_string()
        c = random_string()
        g = random_string()
        l = random_string()

        def check_alive():
            # Returns (base_alive, gsi_alive, lsi_alive) for the single item.
            base_alive = 'Item' in table.get_item(Key={'p': p, 'c': c})
            gsi_alive = bool(full_query(table, IndexName='gsi',
                ConsistentRead=False,
                KeyConditionExpression="g=:g",
                ExpressionAttributeValues={':g': g}))
            lsi_alive = bool(full_query(table, IndexName='lsi',
                KeyConditionExpression="p=:p and l=:l",
                ExpressionAttributeValues={':p': p, ':l': l}))
            return (base_alive, gsi_alive, lsi_alive)

        # Non-expiring item, so should be visible in the base table, GSI and
        # LSI. It can take a bit of time for the GSI value to become visible.
        table.put_item(Item={'p': p, 'c': c, 'g': g, 'l': l})
        start_time = time.time()
        while time.time() < start_time + 30:
            base_alive, gsi_alive, lsi_alive = check_alive()
            if base_alive and gsi_alive and lsi_alive:
                break
            # This wait is only 30 seconds long, so never sleep more than a
            # second at a time: with the 30-second AWS polling interval the
            # loop would otherwise look at the table just once.
            time.sleep(min(sleep, 1))
        else:
            # Timed out. The values above were sampled *before* the last
            # sleep, so take one final look instead of asserting on them.
            base_alive, gsi_alive, lsi_alive = check_alive()
        assert base_alive and gsi_alive and lsi_alive

        # Rewrite the item with expiration one minute in the past, so item
        # should expire ASAP - in all of base, GSI and LSI.
        expiration = int(time.time()) - 60
        table.put_item(Item={'p': p, 'c': c, 'g': g, 'l': l, 'expiration': expiration})
        start_time = time.time()
        while time.time() < start_time + max_duration:
            base_alive, gsi_alive, lsi_alive = check_alive()
            # If the base item, gsi item and lsi item have all expired, the
            # test is done - and successful:
            if not base_alive and not gsi_alive and not lsi_alive:
                return
            time.sleep(sleep)
        # Only reached on timeout. Re-sample after the last sleep so the
        # verdict is not based on stale values.
        base_alive, gsi_alive, lsi_alive = check_alive()
        assert not base_alive and not gsi_alive and not lsi_alive
|
||
|
||
# Above in test_ttl_expiration_hash() and test_ttl_expiration_range() we
|
||
# checked the case where TTL's expiration-time attribute is not a regular
|
||
# attribute but one of the two key attributes. Alternator encodes these
|
||
# attributes differently (as a real column in the schema - not a serialized
|
||
# JSON inside a map column), so we needed to check that such an attribute is
|
||
# usable as well. LSI *also* currently implements their keys differently
|
||
# (like the base key), so let's check that having the TTL column an LSI key
|
||
# still works. Even though it is unlikely that any user would want to have
|
||
# such a use case.
|
||
# For GSIs, to support the ability to add a GSI to an existing table, a
|
||
# GSI key can be any regular attribute in the base table, so we don't need
|
||
# a special test for GSIs.
|
||
def test_ttl_expiration_lsi_key(dynamodb, waits_for_expiration):
    """Use the numeric LSI range key "l" as the expiration attribute.

    A single item whose "l" is one minute in the past is written, and the
    test passes as soon as it is gone from the base table. It fails if the
    item is still there after max_duration seconds.
    """
    # In my experiments, a 30-minute (1800 seconds) is the typical delay
    # for expiration in this test for DynamoDB
    max_duration = 3600 if is_aws(dynamodb) else 240
    sleep = 30 if is_aws(dynamodb) else 0.1
    with new_test_table(
            dynamodb,
            Tags=TAGS,
            KeySchema=[
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'c', 'KeyType': 'RANGE'},
            ],
            LocalSecondaryIndexes=[
                {
                    'IndexName': 'lsi',
                    'KeySchema': [
                        {'AttributeName': 'p', 'KeyType': 'HASH'},
                        {'AttributeName': 'l', 'KeyType': 'RANGE'},
                    ],
                    'Projection': {'ProjectionType': 'ALL'},
                },
            ],
            AttributeDefinitions=[
                {'AttributeName': 'p', 'AttributeType': 'S'},
                {'AttributeName': 'c', 'AttributeType': 'S'},
                {'AttributeName': 'l', 'AttributeType': 'N'},
            ]) as table:
        # The expiration-time attribute is the LSI key "l":
        ttl_spec = {'AttributeName': 'l', 'Enabled': True}
        response = table.meta.client.update_time_to_live(
            TableName=table.name, TimeToLiveSpecification=ttl_spec)
        assert 'TimeToLiveSpecification' in response
        assert response['TimeToLiveSpecification'] == ttl_spec
        p = random_string()
        c = random_string()
        # expiration one minute in the past, so item should expire ASAP.
        # The expiration time itself is stored as the LSI key "l".
        expiration = int(time.time()) - 60
        table.put_item(Item={'p': p, 'c': c, 'l': expiration})

        def base_alive():
            return 'Item' in table.get_item(Key={'p': p, 'c': c})

        start_time = time.time()
        while time.time() < start_time + max_duration:
            if not base_alive():
                # test is done - and successful:
                return
            time.sleep(sleep)
        # Timed out. Look once more after the final sleep before declaring
        # failure, so the last polling interval is not wasted.
        if base_alive():
            pytest.fail('base not expired')
|
||
|
||
# Check that in the DynamoDB Streams API, an event appears about an item
|
||
# becoming expired. This event should be a REMOVE event, contain
|
||
# the appropriate information about the expired item (its key and/or its
|
||
# content), and a special userIdentity flag saying that this is not a regular
|
||
# REMOVE but an expiration. Reproduces issue #11523.
|
||
def test_ttl_expiration_streams(dynamodb, dynamodbstreams, waits_for_expiration):
    # Writes one already-expired item to a table that has both TTL and a
    # NEW_AND_OLD_IMAGES stream enabled, then polls until the item is gone
    # from the table AND a stream record carrying the service userIdentity
    # (the marker of an expiration) has been seen.

    # Alternator Streams currently doesn't work with tablets, so until
    # #23838 is solved, skip this test on tablets.
    for tag in TAGS:
        if tag['Key'] == 'system:initial_tablets' and tag['Value'].isdigit():
            pytest.skip("Streams test skipped on tablets due to #23838")

    # In my experiments, a 30-minute (1800 seconds) delay is the typical
    # expiration delay in this test. If the test doesn't finish within
    # max_duration, we report a failure.
    max_duration = 3600 if is_aws(dynamodb) else 10
    # A stream record produced by expiration (as opposed to a user's delete)
    # carries exactly this userIdentity:
    expiration_identity = {'Type': 'Service', 'PrincipalId': 'dynamodb.amazonaws.com'}
    with new_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[
            { 'AttributeName': 'p', 'KeyType': 'HASH' },
            { 'AttributeName': 'c', 'KeyType': 'RANGE' },
        ],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
        ],
        StreamSpecification={
            'StreamEnabled': True,
            'StreamViewType': 'NEW_AND_OLD_IMAGES'}
        ) as table:
        ttl_spec = {'AttributeName': 'expiration', 'Enabled': True}
        table.meta.client.update_time_to_live(TableName=table.name,
            TimeToLiveSpecification=ttl_spec)

        # Before writing to the table, wait for the stream to become active
        # so we can be sure that the expiration - even if it miraculously
        # happens in a second (it usually takes 30 minutes) - is guaranteed
        # to reach the stream.
        stream_enabled = False
        start_time = time.time()
        while time.time() < start_time + 120:
            table_desc = table.meta.client.describe_table(TableName=table.name)['Table']
            if 'LatestStreamArn' in table_desc:
                arn = table_desc['LatestStreamArn']
                stream_desc = dynamodbstreams.describe_stream(StreamArn=arn)
                if stream_desc['StreamDescription']['StreamStatus'] == 'ENABLED':
                    stream_enabled = True
                    break
            time.sleep(10)
        assert stream_enabled, 'stream did not become ENABLED within 120 seconds'

        # Write a single expiring item. Set its expiration one minute in the
        # past, so the item should expire ASAP.
        p = random_string()
        c = random_string()
        expiration = int(time.time()) - 60
        table.put_item(Item={'p': p, 'c': c, 'animal': 'dog', 'expiration': expiration})

        # Wait (up to max_duration) for the item to expire, and for the
        # expiration event to reach the stream:
        start_time = time.time()
        # event_found is sticky: once the event was seen in one poll it stays
        # True while we keep waiting for the item to vanish from the table.
        event_found = False
        while time.time() < start_time + max_duration:
            print(f"--- {int(time.time()-start_time)} seconds")
            item_expired = 'Item' not in table.get_item(Key={'p': p, 'c': c})
            for record in read_entire_stream(dynamodbstreams, table):
                if record.get('userIdentity') == expiration_identity:
                    # The expiration event should be a REMOVE, and because we
                    # asked for old images it should carry both the key and
                    # the full content of the expired item.
                    assert record['eventName'] == 'REMOVE'
                    assert record['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                    assert record['dynamodb']['OldImage'] == {'p': {'S': p}, 'c': {'S': c}, 'animal': {'S': 'dog'}, 'expiration': {'N': str(expiration)} }
                    event_found = True
            print(f"item expired {item_expired} event {event_found}")
            if item_expired and event_found:
                return
            time.sleep(max_duration/15)
        pytest.fail('item did not expire or event did not reach stream')
|
||
|
||
# Utility function for reading the entire contents of a table's DynamoDB
|
||
# Streams. This function is only useful when we expect only a handful of
|
||
# items, and performance is not important, because nothing is cached between
|
||
# calls. So it's only used in "veryslow"-marked tests above.
|
||
def read_entire_stream(dynamodbstreams, table):
    # Returns a list of all the records currently readable from the table's
    # latest stream, or an empty list if the table has no stream.
    #
    # dynamodbstreams: a client on which describe_stream(),
    #     get_shard_iterator() and get_records() are called.
    # table: a table object; its name and meta.client.describe_table()
    #     are used to find the stream's ARN.

    # Look for the latest stream. If there is none, return nothing
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    if 'LatestStreamArn' not in desc:
        return []
    arn = desc['LatestStreamArn']
    # List all shards of the stream in an array "shards". The list may be
    # paged, in which case LastEvaluatedShardId tells us where to resume.
    response = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
    shards = [x['ShardId'] for x in response['Shards']]
    while 'LastEvaluatedShardId' in response:
        response = dynamodbstreams.describe_stream(StreamArn=arn,
            ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']
        shards.extend(x['ShardId'] for x in response['Shards'])
    records = []
    for shard_id in shards:
        # Get an iterator for everything (TRIM_HORIZON) in the shard
        shard_iterator = dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
        # A response without NextShardIterator ends the loop as well.
        while shard_iterator is not None:
            response = dynamodbstreams.get_records(ShardIterator=shard_iterator)
            # DynamoDB will continue returning records until reaching the
            # current end, and only then we will get an empty response.
            if not response['Records']:
                break
            records.extend(response['Records'])
            shard_iterator = response.get('NextShardIterator')
    return records
|
||
|
||
# Whereas tests above use tiny tables with just a few items, this one creates
|
||
# a table with significantly more items - one that will need to be scanned
|
||
# in more than one page - so verifies that the expiration scanner service
|
||
# does the paging properly.
|
||
@pytest.mark.veryslow
def test_ttl_expiration_long(dynamodb, waits_for_expiration):
    # Write 100*N items to the table, 1/100th of them are expired. The test
    # completes when all of them are expired (or on timeout).
    # To complicate matters for the paging that we're trying to test,
    # have N partitions with 100 items in each, so potentially the paging
    # might stop in the middle of a partition.
    # We need to pick a relatively big N to cause the 100*N items to exceed
    # the size of a single page (I tested this by stopping the scanner after
    # the first page and checking which N starts to generate incorrect results)
    N = 400
    max_duration = 1200 if is_aws(dynamodb) else 15
    with new_test_table(dynamodb,
        Tags=TAGS,
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
                    { 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
        AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'N' },
                               { 'AttributeName': 'c', 'AttributeType': 'N' }]) as table:
        ttl_spec = {'AttributeName': 'expiration', 'Enabled': True}
        table.meta.client.update_time_to_live(TableName=table.name,
            TimeToLiveSpecification=ttl_spec)
        with table.batch_writer() as batch:
            for p in range(N):
                for c in range(100):
                    # Only the first item (c==0) in each partition will expire
                    expiration = int(time.time())-60 if c==0 else int(time.time())+3600
                    batch.put_item(Item={
                        'p': p, 'c': c, 'expiration': expiration })
        # One item out of every 100 should disappear, leaving 99 per partition.
        expected_count = 99*N
        start_time = time.time()
        while time.time() < start_time + max_duration:
            # Count all the items currently in the table, following
            # LastEvaluatedKey through every page of the scan.
            count = 0
            scan_args = {'ConsistentRead': True, 'Select': 'COUNT'}
            while True:
                response = table.scan(**scan_args)
                count += response.get('Count', 0)
                if 'LastEvaluatedKey' not in response:
                    break
                scan_args['ExclusiveStartKey'] = response['LastEvaluatedKey']
            print(count)
            if count == expected_count:
                break
            time.sleep(max_duration/100.0)
        assert count == expected_count
|
||
|
||
# Alternator uses a tag "system:ttl_attribute" to store the TTL attribute
|
||
# chosen by UpdateTimeToLive. However, this tag is not supposed to be
|
||
# readable or writable by the user directly - it should be read or written
|
||
# only with the usual UpdateTimeToLive and DescribeTimeToLive operations.
|
||
# The following two tests confirm that this is the case. The first test
|
||
# checks that the internal tag is invisible, i.e., not returned by
|
||
# ListTagsOfResource. Basically we check that enabling TTL does not add
|
||
# any tags to the list of tags.
|
||
# Reproduces issue #24098.
|
||
def test_ttl_tag_is_invisible(dynamodb):
    # Enable TTL on a freshly created table, then check that the tags listed
    # for the table are still exactly the ones it was created with.
    key_schema = [{ 'AttributeName': 'p', 'KeyType': 'HASH' }]
    attribute_definitions = [{ 'AttributeName': 'p', 'AttributeType': 'S' }]
    with new_test_table(dynamodb, Tags=TAGS, KeySchema=key_schema,
                        AttributeDefinitions=attribute_definitions) as table:
        ttl_client = table.meta.client
        ttl_client.update_time_to_live(
            TableName=table.name,
            TimeToLiveSpecification={'AttributeName': 'x', 'Enabled': True})
        # TTL must be reported as enabled on attribute "x"...
        ttl_description = ttl_client.describe_time_to_live(
            TableName=table.name)['TimeToLiveDescription']
        assert ttl_description == {'TimeToLiveStatus': 'ENABLED', 'AttributeName': 'x'}
        # ...yet ListTagsOfResource must show no extra tag such as
        # "system:ttl_attribute" (or anything else) beyond the creation tags.
        table_arn = ttl_client.describe_table(TableName=table.name)['Table']['TableArn']
        visible_tags = ttl_client.list_tags_of_resource(ResourceArn=table_arn)['Tags']
        assert multiset(TAGS) == multiset(visible_tags)
|
||
|
||
# Now check that the internal tag system:ttl_attribute cannot be written with
|
||
# TagResource or UntagResource (it can only be modified by UpdateTimeToLive).
|
||
# This is a Scylla-only test because in DynamoDB, there is nothing
|
||
# special about the tag name "system:ttl_attribute", and it can be written.
|
||
# Reproduces issue #24098.
|
||
def test_ttl_tag_is_unwritable(test_table, scylla_only):
    # Both setting and removing the tag "system:ttl_attribute" through the
    # tagging API are expected to be rejected with a ValidationException
    # whose message mentions "internal".
    internal_tag = 'system:ttl_attribute'
    expected_error = 'ValidationException.*internal'
    tagging_client = test_table.meta.client
    table_arn = tagging_client.describe_table(
        TableName=test_table.name)['Table']['TableArn']
    with pytest.raises(ClientError, match=expected_error):
        tagging_client.tag_resource(
            ResourceArn=table_arn,
            Tags=[{'Key': internal_tag, 'Value': 'x'}])
    with pytest.raises(ClientError, match=expected_error):
        tagging_client.untag_resource(
            ResourceArn=table_arn,
            TagKeys=[internal_tag])
|