test: add complete_multipart_upload completion tests
A primitive python http server is processing s3 client requests and issues either success or error. A multipart uploader should fail or succeed (with or without retries) depending on aforementioned server response
This commit is contained in:
@@ -555,6 +555,8 @@ scylla_tests = set([
|
||||
'test/boost/row_cache_test',
|
||||
'test/boost/rust_test',
|
||||
'test/boost/s3_test',
|
||||
'test/boost/aws_errors_test',
|
||||
'test/boost/aws_error_injection_test',
|
||||
'test/boost/schema_change_test',
|
||||
'test/boost/schema_changes_test',
|
||||
'test/boost/schema_loader_test',
|
||||
@@ -1043,6 +1045,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/gz/crc_combine.cc',
|
||||
'utils/gz/crc_combine_table.cc',
|
||||
'utils/s3/client.cc',
|
||||
'utils/s3/aws_error.cc',
|
||||
'gms/version_generator.cc',
|
||||
'gms/versioned_value.cc',
|
||||
'gms/gossiper.cc',
|
||||
|
||||
7
test.py
7
test.py
@@ -36,6 +36,7 @@ from scripts import coverage # type: ignore
|
||||
from test.pylib.artifact_registry import ArtifactRegistry
|
||||
from test.pylib.host_registry import HostRegistry
|
||||
from test.pylib.pool import Pool
|
||||
from test.pylib.s3_server_mock import MockS3Server
|
||||
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
|
||||
from test.pylib.util import LogPrefixAdapter
|
||||
from test.pylib.scylla_cluster import ScyllaServer, ScyllaCluster, get_cluster_manager, merge_cmdline_options
|
||||
@@ -1538,6 +1539,12 @@ async def run_all_tests(signaled: asyncio.Event, options: argparse.Namespace) ->
|
||||
await ms.start()
|
||||
TestSuite.artifacts.add_exit_artifact(None, ms.stop)
|
||||
|
||||
hosts = HostRegistry()
|
||||
mock_s3_server = MockS3Server(await hosts.lease_host(), 2012,
|
||||
LogPrefixAdapter(logging.getLogger('s3_mock'), {'prefix': 's3_mock'}))
|
||||
await mock_s3_server.start()
|
||||
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
|
||||
|
||||
console.print_start_blurb()
|
||||
try:
|
||||
TestSuite.artifacts.add_exit_artifact(None, TestSuite.hosts.cleanup)
|
||||
|
||||
@@ -261,6 +261,10 @@ add_scylla_test(rust_test
|
||||
LIBRARIES inc)
|
||||
add_scylla_test(s3_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(aws_errors_test
|
||||
KIND BOOST)
|
||||
add_scylla_test(aws_error_injection_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(schema_change_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(schema_changes_test
|
||||
|
||||
151
test/boost/aws_error_injection_test.cc
Normal file
151
test/boost/aws_error_injection_test.cc
Normal file
@@ -0,0 +1,151 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
#include "utils/s3/client.hh"
|
||||
#include <cstdlib>
|
||||
#include <seastar/core/fstream.hh>
|
||||
#include <seastar/core/units.hh>
|
||||
#include <seastar/http/httpd.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
using namespace seastar;
|
||||
using namespace std::chrono_literals;
|
||||
enum class failure_policy : uint8_t {
|
||||
SUCCESS = 0,
|
||||
RETRYABLE_FAILURE = 1,
|
||||
NONRETRYABLE_FAILURE = 2,
|
||||
};
|
||||
|
||||
static uint16_t get_port() {
|
||||
return std::stoi(tests::getenv_safe("MOCK_S3_SERVER_PORT"));
|
||||
}
|
||||
|
||||
static std::string get_address() {
|
||||
return tests::getenv_safe("MOCK_S3_SERVER_HOST");
|
||||
}
|
||||
|
||||
static s3::endpoint_config_ptr make_minio_config() {
|
||||
s3::endpoint_config cfg = {
|
||||
.port = get_port(),
|
||||
.use_https = false,
|
||||
.aws = {{
|
||||
.access_key_id = "foo",
|
||||
.secret_access_key = "bar",
|
||||
.session_token = "baz",
|
||||
.region = "us-east-1",
|
||||
}},
|
||||
};
|
||||
return make_lw_shared<s3::endpoint_config>(std::move(cfg));
|
||||
}
|
||||
|
||||
static void register_policy(const std::string& key, failure_policy policy) {
|
||||
auto cln = http::experimental::client(socket_address(net::inet_address(get_address()), get_port()));
|
||||
auto close_client = deferred_close(cln);
|
||||
auto req = http::request::make("PUT", get_address(), "/");
|
||||
req._headers["Content-Length"] = "0";
|
||||
req.query_parameters["Key"] = key;
|
||||
req.query_parameters["Policy"] = std::to_string(std::to_underlying(policy));
|
||||
cln.make_request(std::move(req), [](const http::reply&, input_stream<char>&&) -> future<> { return seastar::make_ready_future(); }).get();
|
||||
}
|
||||
|
||||
void test_client_upload_file(std::string_view test_name, failure_policy policy, size_t total_size, size_t memory_size) {
|
||||
tmpdir tmp;
|
||||
const auto file_path = tmp.path() / fmt::format("test-{}", ::getpid());
|
||||
{
|
||||
file f = open_file_dma(file_path.native(), open_flags::create | open_flags::wo).get();
|
||||
auto output = make_file_output_stream(std::move(f)).get();
|
||||
auto close_file = deferred_close(output);
|
||||
std::string_view data = "1234567890ABCDEF";
|
||||
// so we can test !with_remainder case properly with multiple writes
|
||||
SCYLLA_ASSERT(total_size % data.size() == 0);
|
||||
|
||||
for (size_t bytes_written = 0; bytes_written < total_size; bytes_written += data.size()) {
|
||||
output.write(data.data(), data.size()).get();
|
||||
}
|
||||
}
|
||||
|
||||
const auto object_name = fmt::format("/{}/{}-{}", "test", test_name, ::getpid());
|
||||
register_policy(object_name, policy);
|
||||
|
||||
semaphore mem{memory_size};
|
||||
auto client = s3::client::make(get_address(), make_minio_config(), mem);
|
||||
auto client_shutdown = deferred_close(*client);
|
||||
client->upload_file(file_path, object_name).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_success) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
BOOST_REQUIRE_NO_THROW(test_client_upload_file(seastar_test::get_name(), failure_policy::SUCCESS, total_size, memory_size));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_retryable_success) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::RETRYABLE_FAILURE, total_size, memory_size), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NONRETRYABLE_FAILURE, total_size, memory_size),
|
||||
storage_io_error,
|
||||
[](const storage_io_error& e) { return e.code().value() == EIO; });
|
||||
}
|
||||
|
||||
void do_test_client_multipart_upload(failure_policy policy) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}", "test", ::getpid()));
|
||||
register_policy(name, policy);
|
||||
testlog.info("Make client");
|
||||
semaphore mem(16 << 20);
|
||||
auto cln = s3::client::make(get_address(), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Upload object");
|
||||
auto out = output_stream<char>(cln->make_upload_sink(name));
|
||||
auto close_stream = deferred_close(out);
|
||||
|
||||
static constexpr unsigned chunk_size = 1000;
|
||||
auto rnd = tests::random::get_bytes(chunk_size);
|
||||
for (unsigned ch = 0; ch < 128_KiB; ch++) {
|
||||
out.write(reinterpret_cast<char*>(rnd.begin()), rnd.size()).get();
|
||||
}
|
||||
|
||||
testlog.info("Flush multipart upload");
|
||||
out.flush().get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_success) {
|
||||
BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::SUCCESS));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_retryable_success) {
|
||||
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure) {
|
||||
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NONRETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO;
|
||||
});
|
||||
}
|
||||
114
test/boost/aws_errors_test.cc
Normal file
114
test/boost/aws_errors_test.cc
Normal file
@@ -0,0 +1,114 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_MODULE object_storage
|
||||
|
||||
#include "utils/s3/aws_error.hh"
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
enum class message_style : uint8_t { singular = 1, plural = 2 };
|
||||
|
||||
namespace aws {
|
||||
std::ostream& boost_test_print_type(std::ostream& os, const aws::aws_error_type& error_type) {
|
||||
return os << fmt::underlying(error_type);
|
||||
}
|
||||
} // namespace aws
|
||||
|
||||
static seastar::sstring
|
||||
build_xml_response(const std::string& exception, const std::string& message, const std::string& requestId, message_style style = message_style::singular) {
|
||||
return fmt::format(R"(<?xml version="1.0" encoding="UTF-8"?>
|
||||
{}
|
||||
{}
|
||||
<Error>
|
||||
<Code>{}</Code>
|
||||
<Message>{}</Message>
|
||||
<Resource>/mybucket/myfile.bin</Resource>
|
||||
{}
|
||||
</Error>
|
||||
{}
|
||||
{}
|
||||
{})",
|
||||
style == message_style::plural ? "<OtherRoot>" : "",
|
||||
style == message_style::plural ? "<Errors>" : "",
|
||||
exception,
|
||||
message,
|
||||
style == message_style::singular ? "<RequestId>" + requestId + "</RequestId>" : "",
|
||||
style == message_style::plural ? "</Errors>" : "",
|
||||
style == message_style::plural ? "<RequestId>" + requestId + "</RequestId>" : "",
|
||||
style == message_style::plural ? "</OtherRoot>" : "");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(TestXmlErrorPayload) {
|
||||
std::string message = "Test Message";
|
||||
std::string requestId = "Request Id";
|
||||
aws::aws_error error = aws::aws_error::parse(build_xml_response("IncompleteSignatureException", message, requestId));
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INCOMPLETE_SIGNATURE, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
|
||||
error = aws::aws_error::parse(build_xml_response("InternalFailure", message, requestId, message_style::plural));
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
|
||||
error = aws::aws_error::parse(build_xml_response("IDontExist", message, requestId, message_style::plural));
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
|
||||
error = aws::aws_error::parse("");
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
|
||||
error =
|
||||
aws::aws_error::parse("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.");
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
|
||||
std::string response = " ";
|
||||
response += build_xml_response("InternalFailure", message, requestId, message_style::singular);
|
||||
error = aws::aws_error::parse(response);
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(TestErrorsWithPrefixParse) {
|
||||
std::string message = "Test Message";
|
||||
std::string exceptionPrefix = "blahblahblah#";
|
||||
std::string requestId = "Request Id";
|
||||
for (const auto& [exception, err] : aws::aws_error::get_errors()) {
|
||||
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + std::string(exception), message, requestId));
|
||||
BOOST_REQUIRE_EQUAL(err.get_error_type(), error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(err.is_retryable(), error.is_retryable());
|
||||
}
|
||||
|
||||
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + "IDon'tExist", "JunkMessage", requestId));
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("JunkMessage", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(TestErrorsWithoutPrefixParse) {
|
||||
std::string message = "Test Message";
|
||||
std::string requestId = "Request Id";
|
||||
for (const auto& [exception, err] : aws::aws_error::get_errors()) {
|
||||
auto error = aws::aws_error::parse(build_xml_response(std::string(exception), message, requestId));
|
||||
BOOST_REQUIRE_EQUAL(err.get_error_type(), error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(err.is_retryable(), error.is_retryable());
|
||||
}
|
||||
auto error = aws::aws_error::parse(build_xml_response("IDon'tExist", "JunkMessage", requestId));
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("JunkMessage", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
}
|
||||
219
test/pylib/s3_server_mock.py
Normal file
219
test/pylib/s3_server_mock.py
Normal file
@@ -0,0 +1,219 @@
|
||||
#!/usr/bin/python3
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
# Mock S3 server to inject errors for testing.
|
||||
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
import threading
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from functools import partial
|
||||
from collections import OrderedDict
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
|
||||
class Policy(Enum):
|
||||
SUCCESS = 0
|
||||
RETRYABLE_FAILURE = 1
|
||||
NONRETRYABLE_FAILURE = 2
|
||||
|
||||
|
||||
class LRUCache:
|
||||
lock = threading.Lock()
|
||||
|
||||
def __init__(self, capacity: int):
|
||||
self.cache = OrderedDict()
|
||||
self.capacity = capacity
|
||||
|
||||
def get(self, key: str) -> Policy:
|
||||
with self.lock:
|
||||
if key not in self.cache:
|
||||
return Policy.SUCCESS
|
||||
self.cache.move_to_end(key)
|
||||
return self.cache[key]
|
||||
|
||||
def put(self, key: str, value: Policy) -> None:
|
||||
with self.lock:
|
||||
if key in self.cache:
|
||||
self.cache.move_to_end(key)
|
||||
self.cache[key] = value
|
||||
if len(self.cache) > self.capacity:
|
||||
self.cache.popitem(last=False)
|
||||
|
||||
|
||||
# Simple HTTP handler for testing (at this moment) and injecting errors for S3 multipart uploads. It can be easily
|
||||
# extended to support whatever S3 call is needed by adding any logic to the `do_*` handlers. Also one can extend this
|
||||
# handler to process additional HTTP methods, for example HEAD.
|
||||
# In case one have to keep some sort of tracking or state there is shared LRUCache maintained by the MockS3Server class
|
||||
# (see below). At this moment this LRU is used to track failure policies for the test case by tracking it using the S3
|
||||
# object name. One can extend it to whatever key to accommodate new needs.
|
||||
class InjectingHandler(BaseHTTPRequestHandler):
|
||||
|
||||
def __init__(self, policies, logger, *args, **kwargs):
|
||||
self.policies = policies
|
||||
self.logger = logger
|
||||
super().__init__(*args, **kwargs)
|
||||
self.close_connection = False
|
||||
|
||||
def log_error(self, format, *args):
|
||||
if self.logger:
|
||||
self.logger.info("%s - - [%s] %s\n" %
|
||||
(self.client_address[0],
|
||||
self.log_date_time_string(),
|
||||
format % args))
|
||||
else:
|
||||
sys.stderr.write("%s - - [%s] %s\n" %
|
||||
(self.address_string(),
|
||||
self.log_date_time_string(),
|
||||
format % args))
|
||||
|
||||
def log_message(self, format, *args):
|
||||
# Just don't be too verbose
|
||||
if not self.logger:
|
||||
sys.stderr.write("%s - - [%s] %s\n" %
|
||||
(self.address_string(),
|
||||
self.log_date_time_string(),
|
||||
format % args))
|
||||
|
||||
def parsed_qs(self):
|
||||
parsed_url = urlparse(self.path)
|
||||
query_components = parse_qs(parsed_url.query)
|
||||
for key in parsed_url.query.split('&'):
|
||||
if '=' not in key:
|
||||
query_components[key] = ['']
|
||||
return query_components
|
||||
|
||||
# Processes POST method, see `build_POST_reponse` for details
|
||||
def do_POST(self):
|
||||
content_length = self.headers['Content-Length']
|
||||
if content_length:
|
||||
put_data = self.rfile.read(int(content_length))
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
response = self.build_POST_reponse(self.parsed_qs(), urlparse(self.path).path).encode('utf-8')
|
||||
self.send_header('Content-Length', str(len(response)))
|
||||
self.end_headers()
|
||||
self.wfile.write(response)
|
||||
|
||||
# Processes PUT method, at this moment it serves for providing response for S3 upload part method, also it used to
|
||||
# set values to the LRU cache entries by calling PUT with Key and Policy on the query string
|
||||
def do_PUT(self):
|
||||
content_length = self.headers['Content-Length']
|
||||
if content_length:
|
||||
put_data = self.rfile.read(int(content_length))
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.send_header('Content-Length', '0')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
query_components = self.parsed_qs()
|
||||
if 'Key' in query_components and 'Policy' in query_components:
|
||||
self.policies.put(query_components['Key'][0], Policy(int(query_components['Policy'][0])))
|
||||
elif 'uploadId' in query_components and 'partNumber' in query_components:
|
||||
self.send_header('ETag', "SomeTag_" + query_components.get("partNumber")[0])
|
||||
else:
|
||||
self.send_header('ETag', "SomeTag")
|
||||
self.end_headers()
|
||||
|
||||
# Processes DELETE method, does nothing except providing in the response expected headers and response code
|
||||
def do_DELETE(self):
|
||||
self.send_response(204)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.send_header('Content-Length', '0')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
self.end_headers()
|
||||
|
||||
# This function builds the XML response to satisfy the request type, for example `InitiateMultipartUpload` or
|
||||
# `CompleteMultipartUpload`, and failure policy. For example, when the policy is `RETRYABLE_FAILURE` it will
|
||||
# transmit the erroneous response for the first time and then switch the policy to `SUCCESS`. One can extend this
|
||||
# mechanism by adding additional policies or rewriting the response body logic
|
||||
def build_POST_reponse(self, query, path):
|
||||
if 'uploads' in query:
|
||||
req_uuid = str(uuid.uuid4())
|
||||
return f"""<InitiateMultipartUploadResult>
|
||||
<Bucket>bucket</Bucket>
|
||||
<Key>key</Key>
|
||||
<UploadId>{req_uuid}</UploadId>
|
||||
</InitiateMultipartUploadResult>"""
|
||||
if 'uploadId' in query:
|
||||
match self.policies.get(path):
|
||||
case Policy.SUCCESS:
|
||||
return """<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
|
||||
<Location>http://Example-Bucket.s3.Region.amazonaws.com/Example-Object</Location>
|
||||
<Bucket>Example-Bucket</Bucket>
|
||||
<Key>Example-Object</Key>
|
||||
<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
|
||||
</CompleteMultipartUploadResult>"""
|
||||
case Policy.RETRYABLE_FAILURE:
|
||||
# should succeed on retry
|
||||
self.policies.put(path, Policy.SUCCESS)
|
||||
return """<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<Error>
|
||||
<Code>InternalError</Code>
|
||||
<Message>We encountered an internal error. Please try again.</Message>
|
||||
<RequestId>656c76696e6727732072657175657374</RequestId>
|
||||
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
|
||||
</Error>"""
|
||||
case Policy.NONRETRYABLE_FAILURE:
|
||||
return """<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<Error>
|
||||
<Code>InvalidAction</Code>
|
||||
<Message>Something went terribly wrong</Message>
|
||||
<RequestId>656c76696e6727732072657175657374</RequestId>
|
||||
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
|
||||
</Error>"""
|
||||
case _:
|
||||
raise ValueError("Unknown policy")
|
||||
|
||||
|
||||
# Mock server to setup `ThreadingHTTPServer` instance with custom request handler (see above), managing requests state
|
||||
# in the `self.req_states`, adding custom logger, etc. This server will be started automatically from `test.py` once the
|
||||
# pytest kicks in. In addition, it is possible just to start this server using another script - `start_mock.py` to
|
||||
# run it locally to provide mocking functionality for tests executed during development process
|
||||
class MockS3Server:
|
||||
def __init__(self, host, port, logger=None):
|
||||
self.req_states = LRUCache(10000)
|
||||
handler = partial(InjectingHandler, self.req_states, logger)
|
||||
self.server = ThreadingHTTPServer((host, port), handler)
|
||||
self.server_thread = None
|
||||
self.server.request_queue_size = 1000
|
||||
self.server.timeout = 10000
|
||||
self.server.socket.settimeout(10000)
|
||||
self.server.socket.listen(1000)
|
||||
self.is_running = False
|
||||
os.environ['MOCK_S3_SERVER_PORT'] = f'{port}'
|
||||
os.environ['MOCK_S3_SERVER_HOST'] = host
|
||||
|
||||
async def start(self):
|
||||
if not self.is_running:
|
||||
print(f'Starting S3 mock server on {self.server.server_address}')
|
||||
loop = asyncio.get_running_loop()
|
||||
self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
|
||||
self.is_running = True
|
||||
|
||||
async def stop(self):
|
||||
if self.is_running:
|
||||
print('Stopping S3 mock server')
|
||||
self.server.shutdown()
|
||||
await self.server_thread
|
||||
self.is_running = False
|
||||
|
||||
async def run(self):
|
||||
try:
|
||||
await self.start()
|
||||
while self.is_running:
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
print(f"Server error: {e}")
|
||||
await self.stop()
|
||||
24
test/pylib/start_s3_mock.py
Executable file
24
test/pylib/start_s3_mock.py
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/python3
|
||||
import asyncio
|
||||
import sys
|
||||
import signal
|
||||
import argparse
|
||||
from s3_server_mock import MockS3Server
|
||||
|
||||
|
||||
async def run():
|
||||
parser = argparse.ArgumentParser(description="Start S3 mock server")
|
||||
parser.add_argument('--host', default='127.0.0.1')
|
||||
parser.add_argument('--port', type=int, default=2012)
|
||||
args = parser.parse_args()
|
||||
server = MockS3Server(args.host, args.port)
|
||||
|
||||
print('Starting S3 mock server')
|
||||
await server.start()
|
||||
signal.sigwait({signal.SIGINT, signal.SIGTERM})
|
||||
print('Stopping S3 mock server')
|
||||
await server.stop()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run())
|
||||
Reference in New Issue
Block a user