test: add complete_multipart_upload completion tests

A primitive Python HTTP server processes S3 client requests and responds with either success or an error. The multipart uploader should fail or succeed (with or without retries) depending on that server response.
This commit is contained in:
Ernest Zaslavsky
2024-09-19 16:56:29 +03:00
parent 3be6052786
commit 5a96549c86
7 changed files with 522 additions and 0 deletions

View File

@@ -555,6 +555,8 @@ scylla_tests = set([
'test/boost/row_cache_test',
'test/boost/rust_test',
'test/boost/s3_test',
'test/boost/aws_errors_test',
'test/boost/aws_error_injection_test',
'test/boost/schema_change_test',
'test/boost/schema_changes_test',
'test/boost/schema_loader_test',
@@ -1043,6 +1045,7 @@ scylla_core = (['message/messaging_service.cc',
'utils/gz/crc_combine.cc',
'utils/gz/crc_combine_table.cc',
'utils/s3/client.cc',
'utils/s3/aws_error.cc',
'gms/version_generator.cc',
'gms/versioned_value.cc',
'gms/gossiper.cc',

View File

@@ -36,6 +36,7 @@ from scripts import coverage # type: ignore
from test.pylib.artifact_registry import ArtifactRegistry
from test.pylib.host_registry import HostRegistry
from test.pylib.pool import Pool
from test.pylib.s3_server_mock import MockS3Server
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
from test.pylib.util import LogPrefixAdapter
from test.pylib.scylla_cluster import ScyllaServer, ScyllaCluster, get_cluster_manager, merge_cmdline_options
@@ -1538,6 +1539,12 @@ async def run_all_tests(signaled: asyncio.Event, options: argparse.Namespace) ->
await ms.start()
TestSuite.artifacts.add_exit_artifact(None, ms.stop)
hosts = HostRegistry()
mock_s3_server = MockS3Server(await hosts.lease_host(), 2012,
LogPrefixAdapter(logging.getLogger('s3_mock'), {'prefix': 's3_mock'}))
await mock_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
console.print_start_blurb()
try:
TestSuite.artifacts.add_exit_artifact(None, TestSuite.hosts.cleanup)

View File

@@ -261,6 +261,10 @@ add_scylla_test(rust_test
LIBRARIES inc)
add_scylla_test(s3_test
KIND SEASTAR)
add_scylla_test(aws_errors_test
KIND BOOST)
add_scylla_test(aws_error_injection_test
KIND SEASTAR)
add_scylla_test(schema_change_test
KIND SEASTAR)
add_scylla_test(schema_changes_test

View File

@@ -0,0 +1,151 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/tmpdir.hh"
#include "utils/exceptions.hh"
#include "utils/s3/client.hh"
#include <cstdlib>
#include <seastar/core/fstream.hh>
#include <seastar/core/units.hh>
#include <seastar/http/httpd.hh>
#include <seastar/util/closeable.hh>
using namespace seastar;
using namespace std::chrono_literals;
// Failure mode the mock S3 server is asked to apply to an object's
// CompleteMultipartUpload request (see register_policy()).
// Values must stay in sync with Policy in test/pylib/s3_server_mock.py,
// since the raw underlying value is sent over the wire.
enum class failure_policy : uint8_t {
    SUCCESS = 0,
    RETRYABLE_FAILURE = 1,
    NONRETRYABLE_FAILURE = 2,
};
/// Port of the mock S3 server, published by test.py through the environment.
static uint16_t get_port() {
    const auto port_str = tests::getenv_safe("MOCK_S3_SERVER_PORT");
    return std::stoi(port_str);
}
/// Host address of the mock S3 server, published by test.py through the environment.
static std::string get_address() {
    auto host = tests::getenv_safe("MOCK_S3_SERVER_HOST");
    return host;
}
/// Build an endpoint config pointing at the mock S3 server, with dummy
/// AWS credentials (the mock does not authenticate) and plain HTTP.
static s3::endpoint_config_ptr make_minio_config() {
    return make_lw_shared<s3::endpoint_config>(s3::endpoint_config{
        .port = get_port(),
        .use_https = false,
        .aws = {{
            .access_key_id = "foo",
            .secret_access_key = "bar",
            .session_token = "baz",
            .region = "us-east-1",
        }},
    });
}
// Tell the mock S3 server which failure policy to apply to requests for
// `key` (an object path). This is a control-plane call: a plain HTTP PUT
// carrying `Key` and `Policy` query parameters, which the mock's do_PUT
// handler stores into its per-object policy LRU.
static void register_policy(const std::string& key, failure_policy policy) {
    auto cln = http::experimental::client(socket_address(net::inet_address(get_address()), get_port()));
    auto close_client = deferred_close(cln);
    auto req = http::request::make("PUT", get_address(), "/");
    // The request has no body; be explicit about the zero Content-Length.
    req._headers["Content-Length"] = "0";
    req.query_parameters["Key"] = key;
    // The raw underlying value matches the Python Policy enum on the mock side.
    req.query_parameters["Policy"] = std::to_string(std::to_underlying(policy));
    // Reply body is irrelevant; the status is not checked (the mock always replies 200).
    cln.make_request(std::move(req), [](const http::reply&, input_stream<char>&&) -> future<> { return seastar::make_ready_future(); }).get();
}
// Create a `total_size`-byte file on disk, register `policy` on the mock
// server for the target object, and upload the file through s3::client.
// `memory_size` caps the client's buffer memory, so with
// memory_size < total_size the upload is split into multiple parts.
void test_client_upload_file(std::string_view test_name, failure_policy policy, size_t total_size, size_t memory_size) {
    tmpdir tmp;
    const auto file_path = tmp.path() / fmt::format("test-{}", ::getpid());
    {
        file f = open_file_dma(file_path.native(), open_flags::create | open_flags::wo).get();
        auto output = make_file_output_stream(std::move(f)).get();
        auto close_file = deferred_close(output);
        std::string_view data = "1234567890ABCDEF";
        // so we can test !with_remainder case properly with multiple writes
        SCYLLA_ASSERT(total_size % data.size() == 0);
        for (size_t bytes_written = 0; bytes_written < total_size; bytes_written += data.size()) {
            output.write(data.data(), data.size()).get();
        }
    }
    const auto object_name = fmt::format("/{}/{}-{}", "test", test_name, ::getpid());
    // The mock keys its failure policy by object path, so register it
    // under the exact name the client will upload to.
    register_policy(object_name, policy);
    semaphore mem{memory_size};
    auto client = s3::client::make(get_address(), make_minio_config(), mem);
    auto client_shutdown = deferred_close(*client);
    client->upload_file(file_path, object_name).get();
}
// Happy path: 4.5 parts worth of data with a memory budget of exactly one
// part, forcing a multipart upload with a trailing remainder part.
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_success) {
    const size_t part_size = 5_MiB;
    const size_t total_size = 4 * part_size + part_size / 2;
    BOOST_REQUIRE_NO_THROW(test_client_upload_file(seastar_test::get_name(), failure_policy::SUCCESS, total_size, part_size));
}
// NOTE(review): despite the "retryable_success" name this expects the upload
// to fail with storage_io_error(EIO) — so at this commit upload_file is
// apparently not expected to retry a retryable CompleteMultipartUpload
// error; presumably the expectation changes once retries are implemented.
// TODO confirm against s3::client behavior.
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_retryable_success) {
    const size_t part_size = 5_MiB;
    const size_t remainder_size = part_size / 2;
    const size_t total_size = 4 * part_size + remainder_size;
    const size_t memory_size = part_size;
    BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::RETRYABLE_FAILURE, total_size, memory_size), storage_io_error, [](const storage_io_error& e) {
        return e.code().value() == EIO;
    });
}
// A non-retryable error from the mock server must surface to the caller as
// storage_io_error with EIO.
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure) {
    const size_t part_size = 5_MiB;
    const size_t total_size = 4 * part_size + part_size / 2;
    BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NONRETRYABLE_FAILURE, total_size, part_size),
                            storage_io_error,
                            [](const storage_io_error& e) { return e.code().value() == EIO; });
}
// Stream ~128 MiB (128K writes of a 1000-byte random chunk) through the
// client's upload sink so a genuine multipart upload takes place, then
// flush — which issues CompleteMultipartUpload, the request the mock
// server answers according to the registered `policy`.
void do_test_client_multipart_upload(failure_policy policy) {
    const sstring name(fmt::format("/{}/testobject-{}", "test", ::getpid()));
    register_policy(name, policy);
    testlog.info("Make client");
    semaphore mem(16 << 20);
    auto cln = s3::client::make(get_address(), make_minio_config(), mem);
    auto close_client = deferred_close(*cln);
    testlog.info("Upload object");
    auto out = output_stream<char>(cln->make_upload_sink(name));
    auto close_stream = deferred_close(out);
    static constexpr unsigned chunk_size = 1000;
    // One random chunk is generated and re-written repeatedly; content
    // doesn't matter to the mock, only the volume does.
    auto rnd = tests::random::get_bytes(chunk_size);
    for (unsigned ch = 0; ch < 128_KiB; ch++) {
        out.write(reinterpret_cast<char*>(rnd.begin()), rnd.size()).get();
    }
    testlog.info("Flush multipart upload");
    out.flush().get();
}
// Happy path: CompleteMultipartUpload succeeds on the first attempt.
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_success) {
    BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::SUCCESS));
}
// NOTE(review): as with the file-upload variant, the "retryable_success"
// name notwithstanding, an EIO failure is currently expected — the sink
// does not retry a retryable CompleteMultipartUpload error at this point.
// TODO confirm.
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_retryable_success) {
    BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
        return e.code().value() == EIO;
    });
}
// A non-retryable error from the mock must surface as storage_io_error(EIO).
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure) {
    BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NONRETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
        return e.code().value() == EIO;
    });
}

View File

@@ -0,0 +1,114 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#define BOOST_TEST_MODULE object_storage
#include "utils/s3/aws_error.hh"
#include <boost/test/unit_test.hpp>
#include <seastar/core/sstring.hh>
// Selects which flavor of AWS XML error payload build_xml_response() emits:
// a single <Error> document (singular) or an <Errors> list inside another
// root element (plural).
enum class message_style : uint8_t { singular = 1, plural = 2 };
namespace aws {
// Teach Boost.Test how to print aws_error_type values in assertion messages
// (printed as the numeric underlying value).
std::ostream& boost_test_print_type(std::ostream& os, const aws::aws_error_type& error_type) {
    os << fmt::underlying(error_type);
    return os;
}
} // namespace aws
// Render an S3-style XML error document. In `singular` style the
// <RequestId> element lives inside <Error>; in `plural` style the <Error>
// is wrapped in <OtherRoot><Errors>...</Errors> and the <RequestId> sits
// outside the list, as some AWS services return it.
static seastar::sstring
build_xml_response(const std::string& exception, const std::string& message, const std::string& requestId, message_style style = message_style::singular) {
    const bool plural = style == message_style::plural;
    const std::string request_id_tag = "<RequestId>" + requestId + "</RequestId>";
    const std::string empty;
    return fmt::format(R"(<?xml version="1.0" encoding="UTF-8"?>
{}
{}
<Error>
<Code>{}</Code>
<Message>{}</Message>
<Resource>/mybucket/myfile.bin</Resource>
{}
</Error>
{}
{}
{})",
                       plural ? "<OtherRoot>" : "",
                       plural ? "<Errors>" : "",
                       exception,
                       message,
                       plural ? empty : request_id_tag,
                       plural ? "</Errors>" : "",
                       plural ? request_id_tag : empty,
                       plural ? "</OtherRoot>" : "");
}
// Exercise aws_error::parse() on the payload shapes S3 can return: a single
// <Error> document, an <Errors> list wrapped in another root, an empty
// body, non-XML garbage, and a document preceded by whitespace.
BOOST_AUTO_TEST_CASE(TestXmlErrorPayload) {
    std::string message = "Test Message";
    std::string requestId = "Request Id";
    // Known non-retryable error, singular style.
    aws::aws_error error = aws::aws_error::parse(build_xml_response("IncompleteSignatureException", message, requestId));
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::INCOMPLETE_SIGNATURE, error.get_error_type());
    BOOST_REQUIRE_EQUAL(message, error.get_error_message());
    BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
    // Known retryable error, plural (<Errors> list) style.
    error = aws::aws_error::parse(build_xml_response("InternalFailure", message, requestId, message_style::plural));
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
    BOOST_REQUIRE_EQUAL(message, error.get_error_message());
    BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
    // Unrecognized error code maps to UNKNOWN but keeps the message.
    error = aws::aws_error::parse(build_xml_response("IDontExist", message, requestId, message_style::plural));
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
    BOOST_REQUIRE_EQUAL(message, error.get_error_message());
    BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
    // Empty body parses to OK with no message.
    error = aws::aws_error::parse("");
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
    BOOST_REQUIRE_EQUAL("", error.get_error_message());
    BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
    // Non-XML garbage also parses to OK (no error information present).
    error =
        aws::aws_error::parse("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.");
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
    BOOST_REQUIRE_EQUAL("", error.get_error_message());
    BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
    // Leading whitespace before the XML prolog must not break parsing.
    std::string response = " ";
    response += build_xml_response("InternalFailure", message, requestId, message_style::singular);
    error = aws::aws_error::parse(response);
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
    BOOST_REQUIRE_EQUAL(message, error.get_error_message());
    BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
}
// aws_error::parse() must recognize error codes of the form
// "<prefix>#<Code>" (as some AWS services return), i.e. strip everything up
// to and including the '#' before matching against the known-errors table.
BOOST_AUTO_TEST_CASE(TestErrorsWithPrefixParse) {
    std::string message = "Test Message";
    std::string exceptionPrefix = "blahblahblah#";
    std::string requestId = "Request Id";
    // Every known code must still be recognized behind the prefix.
    for (const auto& [exception, err] : aws::aws_error::get_errors()) {
        auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + std::string(exception), message, requestId));
        BOOST_REQUIRE_EQUAL(err.get_error_type(), error.get_error_type());
        BOOST_REQUIRE_EQUAL(message, error.get_error_message());
        BOOST_REQUIRE_EQUAL(err.is_retryable(), error.is_retryable());
    }
    // Unknown codes map to UNKNOWN, keeping the message intact.
    auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + "IDon'tExist", "JunkMessage", requestId));
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
    BOOST_REQUIRE_EQUAL("JunkMessage", error.get_error_message());
    BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
}
// Every known AWS error code must be recognized when it appears verbatim
// (no "#"-separated prefix) in <Code>; unknown codes must map to UNKNOWN
// while still carrying the message through.
BOOST_AUTO_TEST_CASE(TestErrorsWithoutPrefixParse) {
    const std::string msg = "Test Message";
    const std::string req_id = "Request Id";
    for (const auto& [code, expected] : aws::aws_error::get_errors()) {
        const auto parsed = aws::aws_error::parse(build_xml_response(std::string(code), msg, req_id));
        BOOST_REQUIRE_EQUAL(expected.get_error_type(), parsed.get_error_type());
        BOOST_REQUIRE_EQUAL(msg, parsed.get_error_message());
        BOOST_REQUIRE_EQUAL(expected.is_retryable(), parsed.is_retryable());
    }
    const auto parsed = aws::aws_error::parse(build_xml_response("IDon'tExist", "JunkMessage", req_id));
    BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, parsed.get_error_type());
    BOOST_REQUIRE_EQUAL("JunkMessage", parsed.get_error_message());
    BOOST_REQUIRE_EQUAL(parsed.is_retryable(), aws::retryable::no);
}

View File

@@ -0,0 +1,219 @@
#!/usr/bin/python3
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Mock S3 server to inject errors for testing.
import os
import sys
import asyncio
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs
import uuid
from enum import Enum
from functools import partial
from collections import OrderedDict
sys.path.insert(0, os.path.dirname(__file__))
class Policy(Enum):
    """Failure mode the mock server applies to a request for a given key.

    Mirrors ``failure_policy`` in test/boost/aws_error_injection_test.cc;
    the C++ side sends the raw integer value on the query string.
    """
    SUCCESS = 0
    RETRYABLE_FAILURE = 1
    NONRETRYABLE_FAILURE = 2
class LRUCache:
    """Thread-safe, fixed-capacity LRU map from S3 object key to Policy.

    Reading a missing key yields Policy.SUCCESS, so tests only need to
    register the objects that should fail.
    """
    # Single lock shared by all instances (class attribute, as in the
    # original design) — fine here since one cache exists per server.
    lock = threading.Lock()

    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key: str) -> Policy:
        """Return the policy for `key`, refreshing its recency; default SUCCESS."""
        with self.lock:
            try:
                value = self.cache[key]
            except KeyError:
                return Policy.SUCCESS
            self.cache.move_to_end(key)
            return value

    def put(self, key: str, value: Policy) -> None:
        """Insert or update `key`, evicting the least recently used entry on overflow."""
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            self.cache[key] = value
            while len(self.cache) > self.capacity:
                self.cache.popitem(last=False)
# Simple HTTP handler for testing (at this moment) and injecting errors for S3 multipart uploads. It can be easily
# extended to support whatever S3 call is needed by adding the required logic to the `do_*` handlers. One can also
# extend this handler to process additional HTTP methods, for example HEAD.
# In case one has to keep some sort of tracking or state, there is a shared LRUCache maintained by the MockS3Server
# class (see below). At the moment this LRU is used to track the failure policy for a test case, keyed by the S3
# object name. It can be re-keyed by whatever is needed to accommodate new use cases.
class InjectingHandler(BaseHTTPRequestHandler):
def __init__(self, policies, logger, *args, **kwargs):
self.policies = policies
self.logger = logger
super().__init__(*args, **kwargs)
self.close_connection = False
def log_error(self, format, *args):
if self.logger:
self.logger.info("%s - - [%s] %s\n" %
(self.client_address[0],
self.log_date_time_string(),
format % args))
else:
sys.stderr.write("%s - - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
format % args))
def log_message(self, format, *args):
# Just don't be too verbose
if not self.logger:
sys.stderr.write("%s - - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
format % args))
def parsed_qs(self):
parsed_url = urlparse(self.path)
query_components = parse_qs(parsed_url.query)
for key in parsed_url.query.split('&'):
if '=' not in key:
query_components[key] = ['']
return query_components
# Processes POST method, see `build_POST_reponse` for details
def do_POST(self):
content_length = self.headers['Content-Length']
if content_length:
put_data = self.rfile.read(int(content_length))
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.send_header('Connection', 'keep-alive')
response = self.build_POST_reponse(self.parsed_qs(), urlparse(self.path).path).encode('utf-8')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response)
# Processes PUT method, at this moment it serves for providing response for S3 upload part method, also it used to
# set values to the LRU cache entries by calling PUT with Key and Policy on the query string
def do_PUT(self):
content_length = self.headers['Content-Length']
if content_length:
put_data = self.rfile.read(int(content_length))
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.send_header('Content-Length', '0')
self.send_header('Connection', 'keep-alive')
query_components = self.parsed_qs()
if 'Key' in query_components and 'Policy' in query_components:
self.policies.put(query_components['Key'][0], Policy(int(query_components['Policy'][0])))
elif 'uploadId' in query_components and 'partNumber' in query_components:
self.send_header('ETag', "SomeTag_" + query_components.get("partNumber")[0])
else:
self.send_header('ETag', "SomeTag")
self.end_headers()
# Processes DELETE method, does nothing except providing in the response expected headers and response code
def do_DELETE(self):
self.send_response(204)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.send_header('Content-Length', '0')
self.send_header('Connection', 'keep-alive')
self.end_headers()
# This function builds the XML response to satisfy the request type, for example `InitiateMultipartUpload` or
# `CompleteMultipartUpload`, and failure policy. For example, when the policy is `RETRYABLE_FAILURE` it will
# transmit the erroneous response for the first time and then switch the policy to `SUCCESS`. One can extend this
# mechanism by adding additional policies or rewriting the response body logic
def build_POST_reponse(self, query, path):
if 'uploads' in query:
req_uuid = str(uuid.uuid4())
return f"""<InitiateMultipartUploadResult>
<Bucket>bucket</Bucket>
<Key>key</Key>
<UploadId>{req_uuid}</UploadId>
</InitiateMultipartUploadResult>"""
if 'uploadId' in query:
match self.policies.get(path):
case Policy.SUCCESS:
return """<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Location>http://Example-Bucket.s3.Region.amazonaws.com/Example-Object</Location>
<Bucket>Example-Bucket</Bucket>
<Key>Example-Object</Key>
<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
</CompleteMultipartUploadResult>"""
case Policy.RETRYABLE_FAILURE:
# should succeed on retry
self.policies.put(path, Policy.SUCCESS)
return """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>InternalError</Code>
<Message>We encountered an internal error. Please try again.</Message>
<RequestId>656c76696e6727732072657175657374</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>"""
case Policy.NONRETRYABLE_FAILURE:
return """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>InvalidAction</Code>
<Message>Something went terribly wrong</Message>
<RequestId>656c76696e6727732072657175657374</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>"""
case _:
raise ValueError("Unknown policy")
# Mock server that sets up a `ThreadingHTTPServer` instance with the custom request handler (see above), manages
# request state in `self.req_states`, attaches a custom logger, etc. This server is started automatically from
# `test.py` once pytest kicks in. In addition, it is possible to start this server standalone via another script -
# `start_s3_mock.py` - to provide the mocking functionality for tests executed during development.
class MockS3Server:
    """Threaded mock S3 server driving InjectingHandler.

    Publishes its endpoint via MOCK_S3_SERVER_HOST/PORT environment
    variables so the C++ tests can find it.
    """
    def __init__(self, host, port, logger=None):
        # Shared per-object failure policies, consulted by every handler.
        self.req_states = LRUCache(10000)
        handler = partial(InjectingHandler, self.req_states, logger)
        self.server = ThreadingHTTPServer((host, port), handler)
        self.server_thread = None
        # Generous queue/timeout settings so heavily parallel test runs
        # don't get connection resets from the mock.
        self.server.request_queue_size = 1000
        self.server.timeout = 10000
        self.server.socket.settimeout(10000)
        self.server.socket.listen(1000)
        self.is_running = False
        # Publish the endpoint for the C++ tests (see get_address()/get_port()
        # in test/boost/aws_error_injection_test.cc).
        os.environ['MOCK_S3_SERVER_PORT'] = f'{port}'
        os.environ['MOCK_S3_SERVER_HOST'] = host
    async def start(self):
        # Serve on a background executor thread; idempotent.
        if not self.is_running:
            print(f'Starting S3 mock server on {self.server.server_address}')
            loop = asyncio.get_running_loop()
            self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
            self.is_running = True
    async def stop(self):
        # Shut down and wait for the serving task to finish; idempotent.
        if self.is_running:
            print('Stopping S3 mock server')
            self.server.shutdown()
            await self.server_thread
            self.is_running = False
    async def run(self):
        # Convenience loop for standalone use: serve until stop() or error.
        try:
            await self.start()
            while self.is_running:
                await asyncio.sleep(1)
        except Exception as e:
            print(f"Server error: {e}")
            await self.stop()

24
test/pylib/start_s3_mock.py Executable file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/python3
import asyncio
import sys
import signal
import argparse
from s3_server_mock import MockS3Server
async def run():
    """Parse CLI args, start the mock S3 server and serve until SIGINT/SIGTERM."""
    parser = argparse.ArgumentParser(description="Start S3 mock server")
    parser.add_argument('--host', default='127.0.0.1')
    parser.add_argument('--port', type=int, default=2012)
    args = parser.parse_args()
    server = MockS3Server(args.host, args.port)
    print('Starting S3 mock server')
    await server.start()
    # Wait for a termination signal without stalling the event loop:
    # a blocking signal.sigwait() inside a coroutine would freeze the loop,
    # so hook the signals into an asyncio.Event instead.
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
    await stop.wait()
    print('Stopping S3 mock server')
    await server.stop()
if __name__ == '__main__':
    # Standalone entry point: run the mock server until interrupted.
    asyncio.run(run())