Files
scylladb/test/alternator/test_manual_requests.py
Nadav Har'El 32afcdbaf0 test/alternator: enable, and add, tests for gzip'ed requests
After in the previous patch we implemented support in Alternator for
gzip-compressed requests ("Content-Encoding: gzip"), here we enable
an existing xfail-ing test for this feature, and also add more tests
for more cases:

  * A test for longer compressed requests, or a short compressed
    request which expands to a longer request. Since the decompression
    uses small buffers, this test reaches additional code paths.

  * Check for various cases of a malformed gzip'ed request, and also
    an attempt to use an unsupported Content-Encoding. DynamoDB
    returns error 500 for both cases, so we want to test that we
    do to - and not silently ignore such errors.

  * Check that two concatenated gzip'ed streams is a valid request,
    and check that garbage at the end of the gzip - or a missing
    character at the end of the gzip - is recognized as an error.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2025-11-27 09:42:47 +02:00

590 lines
31 KiB
Python

# Copyright 2020-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# Tests for manual requests - not necessarily generated
# by boto3, in order to allow non-validated input to get through
import base64
import json
import pytest
import requests
import urllib3
from botocore.exceptions import BotoCoreError, ClientError
from packaging.version import Version
from test.alternator.util import random_bytes, random_string
def gen_json(n):
return '{"":'*n + '{}' + '}'*n
def get_signed_request(dynamodb, target, payload):
# Usually "payload" will be a Python string and we'll write it as UTF-8.
# but in some tests we may want to write bytes directly - potentially
# bytes which include invalid UTF-8.
payload_bytes = payload if isinstance(payload, bytes) else payload.encode(encoding='UTF-8')
# NOTE: Signing routines use boto3 implementation details and may be prone
# to unexpected changes
class Request:
url=dynamodb.meta.client._endpoint.host
headers={'X-Amz-Target': 'DynamoDB_20120810.' + target, 'Content-Type': 'application/x-amz-json-1.0'}
body=payload_bytes
method='POST'
context={}
params={}
req = Request()
signer = dynamodb.meta.client._request_signer
signer.get_auth(signer.signing_name, signer.region_name).add_auth(request=req)
return req
# Test that deeply nested objects (e.g. with depth of 200k) are parsed correctly,
# i.e. do not cause stack overflows for the server. It's totally fine for the
# server to refuse these packets with an error message though.
# NOTE: The test uses raw HTTP requests, because it's not easy to send
# a deeply nested object via boto3 - it quickly crashes on 'too deep recursion'
# for objects with depth as low as 150 (with sys.getrecursionlimit() == 3000).
# Hence, a request is manually crafted to contain a deeply nested JSON document.
def test_deeply_nested_put(dynamodb, test_table):
big_json = gen_json(200000)
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}, "attribute":' + big_json + '}}'
req = get_signed_request(dynamodb, 'PutItem', payload)
# Check that the request delivery succeeded and the server
# responded with a comprehensible message - it can be either
# a success report or an error - both are acceptable as long as
# the oversized message did not make the server crash.
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
print(response, response.text)
# If the PutItem request above failed, the deeply nested item
# was not put into the database, so it's fine for this request
# to receive a response that it was not found. An error informing
# about not being able to process this request is also acceptable,
# as long as the server didn't crash.
item = test_table.get_item(Key={'p': 'x', 'c':'x'}, ConsistentRead=True)
print(item)
# Test that a too deeply nested object is refused,
# assuming max depth of 32 - and keeping the nested level
# low enough for Python not to choke on it with too deep recursion
def test_exceed_nested_level_a_little(dynamodb, test_table):
p = 'xxx'
c = 'yyy'
nested = dict()
nested_it = nested
for i in range(50):
nested_it['a'] = dict()
nested_it = nested_it['a']
with pytest.raises(ClientError, match='.*Exception.*nested'):
test_table.put_item(Item={'p': p, 'c': c, 'nested': nested})
# Test that we indeed allow the maximum level of 32 nested objects
def test_almost_exceed_nested_level(dynamodb, test_table):
p = 'xxx'
c = 'yyy'
nested = dict()
nested_it = nested
for i in range(30): # 30 added levels + top level + the item itself == 32 total
nested_it['a'] = dict()
nested_it = nested_it['a']
test_table.put_item(Item={'p': p, 'c': c, 'nested': nested})
def test_too_large_request(dynamodb, test_table):
p = 'abc'
c = 'def'
big = 'x' * (16 * 1024 * 1024 + 7)
# The exception type differs due to differences between HTTP servers
# in alternator and DynamoDB. The former returns 413, the latter
# a ClientError explaining that the element size was too large.
with pytest.raises(BotoCoreError):
try:
test_table.put_item(Item={'p': p, 'c': c, 'big': big})
except ClientError:
raise BotoCoreError()
# Tests that a request larger than the 16MB limit is rejected, improving on
# the rather blunt test in test_too_large_request() above. The following two
# tests verify that:
# 1. An over-long request is rejected no matter if it is sent using a
# Content-Length header or chunked encoding (reproduces issue #8196).
# 2. The client should be able to recognize this error as a 413 error, not
# some I/O error like broken pipe. Reproduces issue #8195 and, rarely,
# #12166.
#
# Because issue #12166 is still open, currently we can (in theory, but
# almost never in practice) get a "connection reset by peer" instead of a
# clean 413 reply if the packets get reordered and the RST arrives before
# the reply. So we accept this failure mode too to avoid test flakiness.
# When #12166 is fixed, we should stop allowing that failure mode.
def test_too_large_request_chunked(dynamodb, test_table):
if Version(urllib3.__version__) < Version('1.26'):
pytest.skip("urllib3 before 1.26.0 threw broken pipe and did not read response and cause issue #8195. Fixed by pull request urllib3/urllib3#1524")
# To make a request very large, we just stuff it with a lot of spaces :-)
spaces = ' ' * (17 * 1024 * 1024)
req = get_signed_request(dynamodb, 'PutItem',
'{"TableName": "' + test_table.name + '", ' + spaces + '"Item": {"p": {"S": "x"}, "c": {"S": "x"}}}')
def generator(s):
yield s
try:
response = requests.post(req.url, headers=req.headers, data=generator(req.body), verify=False)
# Until #12166 is fixed, we need this except. See comment above why.
except requests.exceptions.ConnectionError as e:
return
# In issue #8196, Alternator did not recognize the request is too long
# because it uses chunked encoding instead of Content-Length, so the
# request succeeded, and the status_code was 200 instead of 413.
assert response.status_code == 413
# 17 MB is enough to verify that Alternator set a 16 MB limit on request
# length (as DynamoDB does), but let's also try a bigger size, which in
# at some point during the development caused us to reserve too much memory
# and hang.
@pytest.mark.parametrize("mb", [17, 50])
def test_too_large_request_content_length(dynamodb, test_table, mb):
if Version(urllib3.__version__) < Version('1.26'):
pytest.skip("urllib3 before 1.26.0 threw broken pipe and did not read response and cause issue #8195. Fixed by pull request urllib3/urllib3#1524")
spaces = ' ' * (mb * 1024 * 1024)
req = get_signed_request(dynamodb, 'PutItem',
'{"TableName": "' + test_table.name + '", ' + spaces + '"Item": {"p": {"S": "x"}, "c": {"S": "x"}}}')
try:
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
# Until #12166 is fixed, we need this except. See comment above why.
except requests.exceptions.ConnectionError as e:
return
# In issue #8195, Alternator closed the connection early, causing the
# library to incorrectly throw an exception (Broken Pipe) instead noticing
# the error code 413 which the server did send.
assert response.status_code == 413
# In addition to oversized request bodies that cause Scylla to OOM and the
# previous tests verified are limited to 16 MB, there is also a risk that
# huge headers that the Seastar HTTP server reads and saves in memory can
# OOM Scylla. DynamoDB limits the total size of the headers to 16 KB, so
# we can use the same limit too. In all useful cases, headers will be
# much shorter.
# Reproduces #23438.
@pytest.mark.xfail(reason="issue #23438")
def test_too_large_request_headers(dynamodb, test_table):
# First prepare a valid signed request, which works:
req = get_signed_request(dynamodb, 'PutItem',
'{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}')
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.status_code == 200
# Add to the valid request, two extra headers "header1" and "header2",
# with short values. These extra headers are ignored (and do not change
# the signature computation), and the request still works:
headers = dict(req.headers)
headers.update({'header1': 'dog', 'header2': 'cat'})
response = requests.post(req.url, headers=headers, data=req.body, verify=False)
assert response.status_code == 200
# Finally, make the two extra headers long - totaling more than 16 KB.
# The request should now fail with a 400 Bad Request. Although such a
# 400 Bad Request could have many reasons, we know the only difference
# between this request and the previous ones is the length of the extra
# headers, so it proves the server caught the oversized headers.
headers.update({'header1': 'x'*8192, 'header2': 'y'*8192})
response = requests.post(req.url, headers=headers, data=req.body, verify=False)
assert response.status_code == 400
# In addition to oversized request bodies and headers tested in the above
# tests, there is also a risk that a huge request *line* (the URL) can
# cause the Seastar HTTP server to read it into memory and OOM Scylla.
# DynamoDB limits the total size of the request line to 16 KB, so can we
# can use the same limit too. In all useful cases, the request line will
# be much shorter (for ordinary API requests, it is even empty).
# Reproduces #23438.
@pytest.mark.xfail(reason="issue #23438")
def test_too_large_request_line(dynamodb, test_table):
# First prepare a valid signed request, which works:
req = get_signed_request(dynamodb, 'PutItem',
'{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}')
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.status_code == 200
# Add to the valid request's URL some unnecessary garbage at the end,
# but short. Because the URL is part of the signed data, we expect to
# either get a InvalidSignatureException (this is what happens on AWS)
# or a 404 error (this is what happens on Scylla).
url = req.url + '/' + 'garbage'
response = requests.post(url, headers=req.headers, data=req.body, verify=False)
assert response.status_code == 404 or 'InvalidSignatureException' in response.text
# Finally, add some very long garbage to the end of the URL. Now we
# don't want to the 404 or InvalidSignatureException that were fine
# with the short URL - because either of those errors would mean that
# the server read the entire URL, and stored it entirely in memory.
# This time, we need to see a 400 Bad Request - but not one with a
# InvalidSignatureException error in its body.
url = req.url + '/' + 'x' * 17000
response = requests.post(url, headers=req.headers, data=req.body, verify=False)
assert response.status_code == 400 and not 'InvalidSignatureException' in response.text
def test_incorrect_json(dynamodb, test_table):
correct_req = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
# Check all non-full prefixes of a correct JSON - none of them are valid JSON's themselves
# NOTE: DynamoDB returns two kinds of errors on incorrect input - SerializationException
# or "Page Not Found". Alternator returns "ValidationExeption" for simplicity.
validate_resp = lambda t: "SerializationException" in t or "ValidationException" in t or "Page Not Found" in t
for i in range(len(correct_req)):
req = get_signed_request(dynamodb, 'PutItem', correct_req[:i])
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert validate_resp(response.text)
incorrect_reqs = [
'}}}', '}{', 'habababa', '7', '124463gwe', '><#', '????', '"""', '{"""}', '{""}', '{7}',
'{3: }}', '{"2":{}', ',', '{,}', '{{}}', '"a": "b"', '{{{', '{'*10000 + '}'*9999, '{'*10000 + '}'*10007
]
for incorrect_req in incorrect_reqs:
req = get_signed_request(dynamodb, 'PutItem', incorrect_req)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert validate_resp(response.text)
# Test that the value returned by PutItem is always a JSON object, not an empty string (see #6568)
def test_put_item_return_type(dynamodb, test_table):
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.text
# json::loads throws on invalid input
json.loads(response.text)
# Test that TagResource and UntagResource requests return empty HTTP body on success
def test_tags_return_empty_body(dynamodb, test_table):
descr = test_table.meta.client.describe_table(TableName=test_table.name)['Table']
arn = descr['TableArn']
req = get_signed_request(dynamodb, 'TagResource', '{"ResourceArn": "' + arn + '", "Tags": [{"Key": "k", "Value": "v"}]}')
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert not response.text
req = get_signed_request(dynamodb, 'UntagResource', '{"ResourceArn": "' + arn + '", "TagKeys": ["k"]}')
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert not response.text
# Test that incorrect number values are detected
def test_incorrect_numbers(dynamodb, test_table):
for incorrect in ["NaN", "Infinity", "-Infinity", "-NaN", "dog", "-dog"]:
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}, "v": {"N": "' + incorrect + '"}}}'
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert "ValidationException" in response.text and "numeric" in response.text
# Although the DynamoDB API responses are JSON, additional conventions apply
# to these responses - such as how error codes are encoded in JSON. For this
# reason, DynamoDB uses the content type 'application/x-amz-json-1.0' instead
# of the standard 'application/json'. This test verifies that we return the
# correct content type header.
# While most DynamoDB libraries we tried do not care about an unexpected
# content-type, it turns out that one (aiodynamo) does. Moreover, AWS already
# defined x-amz-json-1.1 - see
# https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
# which differs (only) in how it encodes error replies.
# So in the future it may become even more important that Scylla return the
# correct content type.
def test_content_type(dynamodb, test_table):
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
# Note that get_signed_request() uses x-amz-json-1.0 to encode the
# *request*. In the future this may or may not effect the content type
# in the response (today, DynamoDB doesn't allow any other content type
# in the request anyway).
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.headers['Content-Type'] == 'application/x-amz-json-1.0'
# Alternator implements long responses using a different code path - using a
# streaming body writer instead of a contiguous string object. So we want to
# verify that we got the right Content-type in the streaming-respose code
# path as well.
def test_content_type_long(dynamodb, test_table):
# Alternator currently defines a "long" response as being over 100KB
# (see executor.hh, is_big()). So we'll do a Query returning 200 KB.
p = random_string()
with test_table.batch_writer() as batch:
for i in range(20):
batch.put_item({'p': p, 'c': str(i), 'x': 'x'*10000})
payload = '{"TableName": "' + test_table.name + '", "KeyConditions": {"p": {"AttributeValueList": [{"S": "' + p + '"}], "ComparisonOperator": "EQ"}}}'
req = get_signed_request(dynamodb, 'Query', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.status_code == 200
assert len(response.text) > 200000
assert response.headers['Content-Type'] == 'application/x-amz-json-1.0'
# Error messages also have a Content-Type, and those too have separate code
# generating them, so let's test this case as well.
def test_content_type_error(dynamodb, test_table):
# PutItem without a TableName will generate an error:
payload = '{"Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
req = get_signed_request(dynamodb, 'PutItem', payload)
r = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert r.status_code == 400 and 'ValidationException' in r.text
assert r.headers['Content-Type'] == 'application/x-amz-json-1.0'
# An unknown operation should result with an UnknownOperationException:
def test_unknown_operation(dynamodb):
req = get_signed_request(dynamodb, 'BoguousOperationName', '{}')
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.status_code == 400
assert 'UnknownOperationException' in response.text
print(response.text)
# Reproduce issue #10278, where double-quotes in an error message resulted
# in a broken JSON structure in the error message, which confuses boto3 to
# misunderstand the error response.
# Because this test uses boto3, we can reproduce the error, but not really
# understand what it is. We have another variant of this test below -
# test_exception_escape_raw() - that does the same thing without boto3
# so we can see the error happens during the response JSON parsing.
def test_exception_escape(test_table_s):
# ADD expects its parameter :inc to be an integer. We'll send a string,
# so expect a ValidationException.
with pytest.raises(ClientError) as error:
test_table_s.update_item(Key={'p': 'hello'},
UpdateExpression='ADD n :inc',
ExpressionAttributeValues={':inc': '1'})
r = error.value.response
assert r['Error']['Code'] == 'ValidationException'
assert r['ResponseMetadata']['HTTPStatusCode'] == 400
# Similar to test_exception_escape above, but do the request manually,
# without boto3. This avoids boto3's blundering attempts of covering up
# the error it seens - and allows us to notice that the bug is that
# Alternator returns an unparsable JSON response.
# Reproduces #10278
def test_exception_escape_raw(dynamodb, test_table_s):
payload = '{"TableName": "' + test_table_s.name + '", "Key": {"p": {"S": "hello"}}, "UpdateExpression": "ADD n :inc", "ExpressionAttributeValues": {":inc": {"S": "1"}}}'
req = get_signed_request(dynamodb, 'UpdateItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.status_code == 400
# In issue #10278, the JSON parsing fails:
r = json.loads(response.text)
assert 'ValidationException' in r['__type']
def put_item_binary_data_in_key(dynamodb, test_table_b, item_data):
payload = '{"TableName": "%s", "Item": {"p": {"B": "%s"}}}' % (test_table_b.name, item_data)
req = get_signed_request(dynamodb, 'PutItem', payload)
return requests.post(req.url, headers=req.headers, data=req.body, verify=True)
def put_item_binary_data_in_non_key(dynamodb, test_table_b, item_data):
payload ='''{
"TableName": "%s",
"Item": {
"p": {
"B": "%s"
},
"c": {
"B": "%s"
}
}
}''' % (test_table_b.name, base64.b64encode(random_bytes()).decode(), item_data)
req = get_signed_request(dynamodb, 'PutItem', payload)
return requests.post(req.url, headers=req.headers, data=req.body, verify=True)
# Reproduces issue #6487 where setting binary values with missing "=" padding characters
# was allowed in Scylla.
def test_base64_missing_padding(dynamodb, test_table_b):
r = put_item_binary_data_in_key(dynamodb, test_table_b, "YWJjZGVmZ2g")
assert r.status_code == 400 and 'SerializationException' in r.text
r = put_item_binary_data_in_non_key(dynamodb, test_table_b, "YWJjZGVmZ2g")
assert r.status_code == 400 and 'SerializationException' in r.text
# Tests the case where non base64 text is placed as binary data value.
def test_base64_malformed(dynamodb, test_table_b):
r = put_item_binary_data_in_key(dynamodb, test_table_b, "YWJj??!!")
assert r.status_code == 400 and 'SerializationException' in r.text
r = put_item_binary_data_in_non_key(dynamodb, test_table_b, "YWJj??!!")
assert r.status_code == 400 and 'SerializationException' in r.text
# The check for valid base64 encoding had a bug (#25701) where it used a
# 255-byte lookup-table instead of 256 bytes - so sending a byte 255 as part
# of an invalid base64 string could lead the code to go beyond this lookup
# table's bounds, and not recognize the invalid string or worse (the out-of-
# bound read may be detected and crash Scylla).
# Reproduces issue #25701.
def test_base64_malformed_255(dynamodb, test_table_b):
# No valid UTF-8 can contain the byte we want to use 255 (0xFF), so we
# can't use the function put_item_binary_data_in_key() as in other tests
# because that function only writes UTF-8. So we need to compose the
# payload with the byte 0xFF directly as a Python "bytes" object.
# We need to pad the length of that test string to be a multiple of 4 -
# e.g., "\xFFdog", so the validation code doesn't fail early.
table_name_bytes = test_table_b.name.encode('UTF-8')
payload_bytes = b'{"TableName": "' + table_name_bytes + b'", "Item": {"p": {"B": "\xFFdog"}}}'
req = get_signed_request(dynamodb, 'PutItem', payload_bytes)
r = requests.post(req.url, headers=req.headers, data=req.body, verify=True)
assert r.status_code == 400 and 'SerializationException' in r.text
def update_item_binary_data(dynamodb, test_table_b, item_data):
payload ='''{
"TableName": "%s",
"Key": { "p": { "B": "%s" } },
"UpdateExpression": "SET a = :val",
"ExpressionAttributeValues": {":val": {"B": "%s"} }
}''' % (test_table_b.name, base64.b64encode(random_bytes()).decode(), item_data)
req = get_signed_request(dynamodb, 'UpdateItem', payload)
return requests.post(req.url, headers=req.headers, data=req.body, verify=True)
# The same tests as above for invalid B (binary) values, just for UpdateItem
# instead of PutItem, attempting to reproduce issue #17539.
# An UpdateExpression's SET operation contains the value already in JSON
# encoding, but it should not be trusted implicitly without verifying it is
# valid.
def test_base64_missing_padding_updateitem(dynamodb, test_table_b):
r = update_item_binary_data(dynamodb, test_table_b, "fakebase64")
assert r.status_code == 400 and 'SerializationException' in r.text
def test_base64_malformed_updateitem(dynamodb, test_table_b):
r = update_item_binary_data(dynamodb, test_table_b, "YWJj??!!")
assert r.status_code == 400 and 'SerializationException' in r.text
def scan_with_binary_data_in_cond_expr(dynamodb, test_table_b, filter_expr, expr_attr_values):
payload ='''{
"TableName": "%s",
"FilterExpression": "%s",
"ExpressionAttributeValues": { %s }
}''' % (test_table_b.name, filter_expr, expr_attr_values)
req = get_signed_request(dynamodb, 'Scan', payload)
return requests.post(req.url, headers=req.headers, data=req.body, verify=True)
# Tests the case where malformed binary data is placed as part of filter expression
def test_base64_malformed_cond_expr(dynamodb, test_table_b):
# put some data
c_data = base64.b64encode(b"fefe").decode()
r = put_item_binary_data_in_non_key(dynamodb, test_table_b, c_data)
assert r.status_code == 200
malformed_data = "ZmVmZQ=!" # has the same length as c_data to test begins_with
exp_attr = '''":v": {"B": "%s"}''' % malformed_data
# note that expression "c = :v" or "c in(:v)" would fail on dynamodb but not on alternator
# as we don't deserialize in this case
for exp in [
"c > :v",
":v > c",
"NOT c > :v",
"contains(c, :v)",
"contains(:v, c)",
"c between :v and :v",
":v between c and c",
"begins_with(c, :v)",
"begins_with(:v, c)"]:
r = scan_with_binary_data_in_cond_expr(dynamodb, test_table_b, exp, exp_attr)
assert r.status_code == 400, "Failed on expression \"%s\"" % (exp)
# The assert checks response text and expects error of type ValidationException
# or optionally SerializationException.
# The assert specifically forbids 'assert' word in message to check it is not a fallback
# from RAPIDJSON_ASSERT re-implementation added in util/rjson.hh, that prevents crashing
# on mishandled requests, but produces non-communicative message (see #23233).
def assert_validation_exception(response_text, request_info, accept_serialization_exception=False):
assert "assert" not in response_text, f"RAPIDJSON_ASSERT fallback message for {request_info}"
r = json.loads(response_text)
assert "__type" in r, f"Unexpectedly no error for {request_info}"
assert 'ValidationException' in r['__type'] or \
(accept_serialization_exception and "SerializationException" in r['__type']), \
f"Unexpected error type {r['__type']} for {request_info}"
# Tests some invalid payloads (empty values, wrong types) to BatchWriteItem. Reproduces #23233
def test_batch_write_item_invalid_payload(dynamodb, test_table):
cases = [
'', 'null', '""', '{}',
'{"RequestItems": null}',
'{"RequestItems": ""}',
'{"RequestItems": []}',
'{"RequestItems": {"__TABLE__": null}}',
'{"RequestItems": {"__TABLE__": [null]}}',
'{"RequestItems": {"__TABLE__": [{}}]}}',
'{"RequestItems": {"__TABLE__": [{"PutRequest": null}]}}',
'{"RequestItems": {"__TABLE__": [{"PutRequest": {}}]}}',
'{"RequestItems": {"__TABLE__": [{"PutRequest": {"Item": null}}]}}',
'{"RequestItems": {"__TABLE__": [{"PutRequest": {"Item": {}}}]}}',
'{"RequestItems": {"__TABLE__": [{"DeleteRequest": null}]}}',
'{"RequestItems": {"__TABLE__": [{"DeleteRequest": {}}]}}',
'{"RequestItems": {"__TABLE__": [{"DeleteRequest": {"Key": null}}]}}',
'{"RequestItems": {"__TABLE__": [{"DeleteRequest": {"Key": {}}}]}}'
]
for body in cases:
body = body.replace("__TABLE__", test_table.name)
req = get_signed_request(dynamodb, "BatchWriteItem", body)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert_validation_exception(response.text, f"payload: \'{body}\'", accept_serialization_exception=True)
# Tests payloads with no or empty request list, that used to be handled without error by Alternator,
# while DynamoDB returns ValidationException.
def test_batch_write_item_empty_request_list(dynamodb, test_table, test_table_s):
cases = [
'{"RequestItems": {}}',
'{"RequestItems": {"__TABLE__": []}}',
'{"RequestItems": {"__TABLE_S__": [{"PutRequest": {"Item": {"p": {"S": "hello"}}}}], "__TABLE__": []}}'
]
for body in cases:
body = body.replace("__TABLE__", test_table.name).replace("__TABLE_S__", test_table_s.name)
req = get_signed_request(dynamodb, "BatchWriteItem", body)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert_validation_exception(response.text, f"payload: \'{body}\'")
# Tests that non-object payload of a request will result with ValidationException.
def test_request_payload_must_be_object(dynamodb):
for body in ['null', '[]']:
req = get_signed_request(dynamodb, "ListTables", body)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert_validation_exception(response.text, f"payload: \'{body}\'", accept_serialization_exception=True)
req = get_signed_request(dynamodb, "ListTables", '{}')
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
r = json.loads(response.text)
assert "__type" not in r and "TableNames" in r
# Verify that HTTP keep-alive, i.e., connection reuse, works correctly.
# We send two requests, and check that if keep-alive is requested both
# requests were sent over the same connection, but if keep-alive is disabled
# the two requests are sent over different connections.
# Checking both cases - with and without keep-alive - is important also for
# verifying the validity of the test. It confirms that the test's code that
# tries to detect when a connection is wrongly not reused, does succeed to
# detect non-reuse of the connection.
# This test was requested in #23067
@pytest.mark.parametrize("use_keep_alive", [True, False])
def test_keep_alive(dynamodb, test_table, use_keep_alive):
p = random_string()
req = get_signed_request(dynamodb, 'PutItem', '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "' + p + '"}, "c": {"S": "x"}}}')
# Adding a "Connection: close" header in the !use_keep_alive case tells
# the server to close the connection after the first request, forcing
# the test to use a different connection for the second request.
if not use_keep_alive:
req.headers['Connection'] = 'close'
# Use a requests library's "Session" to allow the library to reuse the
# same connection if the server keeps the connection alive.
session = requests.Session()
# Monkey-patch urllib's connect() functions for both HTTP and HTTPS,
# which the requests library uses, to tell us how many new connections
# were created. If a connection is reused, one fewer connection is created.
connect_count = 0
original_http_connect = urllib3.connection.HTTPConnection.connect
original_https_connect = urllib3.connection.HTTPSConnection.connect
def patched_http_connect(self):
nonlocal connect_count
connect_count += 1
return original_http_connect(self)
def patched_https_connect(self):
nonlocal connect_count
connect_count += 1
return original_https_connect(self)
urllib3.connection.HTTPConnection.connect = patched_http_connect
urllib3.connection.HTTPSConnection.connect = patched_https_connect
try:
# Note that by default (stream=False), post() reads the entire
# response body before returning. So the connection should be
# immediately reusable if the server kept it alive.
session.post(req.url, headers=req.headers, data=req.body, verify=False)
session.post(req.url, headers=req.headers, data=req.body, verify=False)
if use_keep_alive:
assert connect_count == 1 # one connection reused.
else:
assert connect_count == 2 # connection not reused, so two opened.
finally:
urllib3.connection.HTTPConnection.connect = original_http_connect
urllib3.connection.HTTPSConnection.connect = original_https_connect