test: Switch s3_test to use proxy

Switch `s3_test` to use the S3 proxy, which randomly injects retryable S3 errors to exercise the "retry" logic of the S3 client.
Also fix `put_object` to make it retryable.
This commit is contained in:
Ernest Zaslavsky
2024-10-10 17:52:32 +03:00
parent b1e36c868c
commit 8919e0abab
4 changed files with 297 additions and 12 deletions

View File

@@ -36,6 +36,7 @@ from scripts import coverage # type: ignore
from test.pylib.artifact_registry import ArtifactRegistry
from test.pylib.host_registry import HostRegistry
from test.pylib.pool import Pool
from test.pylib.s3_proxy import S3ProxyServer
from test.pylib.s3_server_mock import MockS3Server
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
from test.pylib.util import LogPrefixAdapter
@@ -1553,6 +1554,12 @@ async def run_all_tests(signaled: asyncio.Event, options: argparse.Namespace) ->
await mock_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
minio_uri = "http://" + os.environ[ms.ENV_ADDRESS] + ":" + os.environ[ms.ENV_PORT]
proxy_s3_server = S3ProxyServer(await hosts.lease_host(), 9002, minio_uri, 3,
LogPrefixAdapter(logging.getLogger('s3_proxy'), {'prefix': 's3_proxy'}))
await proxy_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, proxy_s3_server.stop)
console.print_start_blurb()
max_failures = options.max_failures
failed = 0

View File

@@ -35,7 +35,7 @@
// permissive enough policy and then run the test with env set respectively
// E.g. like this
//
// export S3_SERVER_ADDRESS_FOR_TEST=s3.us-east-2.amazonaws.com
// export PROXY_S3_SERVER_HOST=s3.us-east-2.amazonaws.com
// export S3_SERVER_PORT_FOR_TEST=443
// export S3_BUCKET_FOR_TEST=xemul
// export AWS_ACCESS_KEY_ID=${aws_access_key_id}
@@ -45,8 +45,8 @@
s3::endpoint_config_ptr make_minio_config() {
s3::endpoint_config cfg = {
.port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
.use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
.port = std::stoul(tests::getenv_safe("PROXY_S3_SERVER_PORT")),
.use_https = false,
.aws = {{
.access_key_id = tests::getenv_safe("AWS_ACCESS_KEY_ID"),
.secret_access_key = tests::getenv_safe("AWS_SECRET_ACCESS_KEY"),
@@ -68,7 +68,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_object) {
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*cln);
testlog.info("Put object {}\n", name);
@@ -114,7 +114,7 @@ void do_test_client_multipart_upload(bool with_copy_upload) {
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*cln);
testlog.info("Upload object (with copy = {})\n", with_copy_upload);
@@ -175,7 +175,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback) {
testlog.info("Make client");
semaphore mem(0);
mem.broken(); // so that any attempt to use it throws
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*cln);
testlog.info("Upload object");
@@ -231,7 +231,7 @@ future<> test_client_upload_file(std::string_view test_name, size_t total_size,
// 2. upload the file to s3
semaphore mem{memory_size};
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"),
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"),
make_minio_config(),
mem);
co_await client->upload_file(file_path, object_name);
@@ -289,7 +289,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*cln);
testlog.info("Put object {}\n", name);
@@ -331,7 +331,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) {
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*cln);
testlog.info("Put object {}\n", name);
@@ -354,7 +354,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) {
const sstring name(fmt::format("/{}/testobject-{}",
tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
semaphore mem(16<<20);
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*client);
auto data = sstring("1234567890ABCDEF").release();
client->put_object(name, std::move(data)).get();
@@ -396,7 +396,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects) {
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
semaphore mem(16<<20);
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*client);
// Put extra object to check list-by-prefix filters it out
@@ -420,7 +420,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete) {
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
semaphore mem(16<<20);
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
auto close_client = deferred_close(*client);
populate_bucket(client, bucket, prefix, 8);

253
test/pylib/s3_proxy.py Normal file
View File

@@ -0,0 +1,253 @@
#!/usr/bin/python3
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# S3 proxy server to inject retryable errors for fuzzy testing.
import asyncio
import os
import random
import sys
import threading
import uuid
from collections import OrderedDict
from functools import partial
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import urlparse, parse_qs

import requests
from requests import Response
from typing_extensions import Optional
sys.path.insert(0, os.path.dirname(__file__))
class Policy:
    """Per-path error-injection policy.

    Tracks whether a request for a given path should be forwarded to the
    real S3 backend and/or answered with an injected retryable error, and
    caps the number of injected errors so a retrying client can eventually
    succeed.
    """

    def __init__(self, max_retries: int):
        # Defaults: forward to the backend, inject nothing.  The handler
        # randomizes these flags when it first creates a policy for a path.
        self.should_forward: bool = True
        self.should_fail: bool = False
        self.error_count: int = 0
        # Inject strictly fewer errors than the client's retry budget so a
        # retrying client always succeeds in the end.  Guard against
        # max_retries <= 1: the original random.choice(range(1, max_retries))
        # raised IndexError on an empty range.
        self.max_errors: int = random.randint(1, max(1, max_retries - 1))
class LRUCache:
    """Thread-safe LRU cache (used here to map request paths to policies).

    Backed by an OrderedDict: a hit moves the key to the most-recently-used
    end; inserting beyond `capacity` evicts the least-recently-used entry.
    """

    def __init__(self, capacity: int):
        # Per-instance lock.  The original used a class-level lock, which is
        # shared by every LRUCache instance and needlessly serializes
        # unrelated caches.
        self.lock = threading.Lock()
        self.cache: OrderedDict = OrderedDict()
        self.capacity = capacity

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for `key` (marking it MRU), or None."""
        with self.lock:
            if key not in self.cache:
                return None
            self.cache.move_to_end(key)
            return self.cache[key]

    def put(self, key: str, value: Any) -> None:
        """Insert or refresh `key`; evict the LRU entry when over capacity."""
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                self.cache.popitem(last=False)
# Simple proxy between s3 client and minio to randomly inject errors and simulate cases when the request succeeds but the wire got "broken"
def true_or_false():
    """Flip a fair coin: return True or False with equal probability."""
    outcomes = (True, False)
    return random.choice(outcomes)
class InjectingHandler(BaseHTTPRequestHandler):
    """HTTP handler that proxies requests to minio while randomly injecting
    retryable S3 errors.

    Per-path decisions (forward? fail? how many times?) live in a shared
    `Policy` LRU cache, so retries of the same request see a consistent,
    bounded error-injection schedule and eventually succeed.
    """

    # HTTP status codes the S3 client treats as retryable.
    # NOTE(review): range(500, 599) excludes 599 itself — confirm intended.
    retryable_codes = list((408, 419, 429, 440)) + list(range(500, 599))
    # AWS error-code names the client recognizes as retryable; one is picked
    # at random for each injected error response.
    error_names = list(("InternalFailureException",
                        "InternalFailure",
                        "InternalServerError",
                        "InternalError",
                        "RequestExpiredException",
                        "RequestExpired",
                        "ServiceUnavailableException",
                        "ServiceUnavailableError",
                        "ServiceUnavailable",
                        "RequestThrottledException",
                        "RequestThrottled",
                        "ThrottlingException",
                        "ThrottledException",
                        "Throttling",
                        "SlowDownException",
                        "SlowDown",
                        "RequestTimeTooSkewedException",
                        "RequestTimeTooSkewed",
                        "RequestTimeoutException",
                        "RequestTimeout"))

    def __init__(self, policies, logger, minio_uri, max_retries, *args, **kwargs):
        # policies: shared LRUCache of per-path Policy objects.
        # minio_uri: base URI of the real S3 backend requests are forwarded to.
        # max_retries: client's retry budget; bounds injected errors per path.
        self.minio_uri = minio_uri
        self.policies = policies
        self.logger = logger
        self.max_retries = max_retries
        # BaseHTTPRequestHandler.__init__ handles the request synchronously,
        # so every attribute the do_*() methods need must be set before it.
        super().__init__(*args, **kwargs)
        # NOTE(review): this runs after the request has already been handled
        # by super().__init__ — confirm it has the intended keep-alive effect.
        self.close_connection = False

    def log_error(self, format, *args):
        # Route error lines to the injected logger when present, otherwise
        # fall back to the BaseHTTPRequestHandler-style stderr format.
        if self.logger:
            self.logger.info("%s - - [%s] %s\n" %
                             (self.client_address[0],
                              self.log_date_time_string(),
                              format % args))
        else:
            sys.stderr.write("%s - - [%s] %s\n" %
                             (self.address_string(),
                              self.log_date_time_string(),
                              format % args))

    def log_message(self, format, *args):
        # Just don't be too verbose: per-request access logs are suppressed
        # entirely when a logger was injected.
        if not self.logger:
            sys.stderr.write("%s - - [%s] %s\n" %
                             (self.address_string(),
                              self.log_date_time_string(),
                              format % args))

    def parsed_qs(self):
        """Parse the current request's query string.

        Unlike plain parse_qs, value-less keys (e.g. S3's `?uploads`) are
        kept as entries mapping to [''] instead of being dropped.
        """
        parsed_url = urlparse(self.path)
        query_components = parse_qs(parsed_url.query)
        for key in parsed_url.query.split('&'):
            if '=' not in key:
                query_components[key] = ['']
        return query_components

    def get_policy(self):
        """Return (creating and caching on first sight) the Policy for this path.

        A new policy randomly chooses whether to forward the request and/or
        fail it; a non-forwarded request is always failed so the client never
        sees a silent drop.
        """
        policy = self.policies.get(self.path)
        if policy is None:
            policy = Policy(self.max_retries)
            policy.should_forward = true_or_false()
            if policy.should_forward:
                policy.should_fail = true_or_false()
            else:
                policy.should_fail = True
            self.policies.put(self.path, policy)
        # Unfortunately MPU completion retry on already completed upload would introduce flakiness to unit tests, for example `s3_test`
        if self.command == "POST" and "uploadId" in self.parsed_qs():
            policy.should_forward = not policy.should_fail
        return policy

    def get_retryable_http_codes(self):
        """Pick a random (status code, AWS error name) pair for injection."""
        return random.choice(self.retryable_codes), random.choice(self.error_names)

    def respond_with_error(self):
        """Send an S3-style XML error response with a retryable status code."""
        code, error_name = self.get_retryable_http_codes()
        self.send_response(code)
        # NOTE(review): body is XML but declared text/plain — confirm the
        # client does not care about the Content-Type of error responses.
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.send_header('Connection', 'keep-alive')
        req_uuid = str(uuid.uuid4())
        response = f"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>{error_name}</Code>
<Message>Minio proxy injected error. Client should retry.</Message>
<RequestId>{req_uuid}</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>""".encode('utf-8')
        self.send_header('Content-Length', str(len(response)))
        self.end_headers()
        self.wfile.write(response)

    def process_request(self):
        """Core proxy logic shared by every HTTP verb.

        Depending on the path's policy, the request is forwarded to minio
        and/or answered with an injected error.  Forward-then-fail simulates
        a request that succeeded server-side while the wire "broke".
        """
        try:
            policy = self.get_policy()
            # Stop injecting once the error budget is exhausted so the
            # client's retries eventually go through.
            if policy.error_count >= policy.max_errors:
                policy.should_fail = False
                policy.should_forward = True
            response = Response()
            body = None
            content_length = self.headers['Content-Length']
            if content_length:
                body = self.rfile.read(int(content_length))
            if policy.should_forward:
                target_url = self.minio_uri + self.path
                headers = {key: value for key, value in self.headers.items()}
                response = requests.request(self.command, target_url, headers=headers, data=body)
            if policy.should_fail:
                policy.error_count += 1
                self.respond_with_error()
            else:
                self.send_response(response.status_code)
                # Content-Length is recomputed below; everything else is
                # relayed verbatim.
                for key, value in response.headers.items():
                    if key.upper() != 'CONTENT-LENGTH':
                        self.send_header(key, value)
                if self.command == 'HEAD':
                    # HEAD has no body: propagate the backend's declared length.
                    self.send_header("Content-Length", response.headers['Content-Length'])
                else:
                    self.send_header("Content-Length", str(len(response.content)))
                self.end_headers()
                self.wfile.write(response.content)
        except Exception as e:
            # NOTE(review): errors are printed and swallowed, leaving the
            # client to observe a broken connection — confirm intended.
            print(e)

    def do_GET(self):
        self.process_request()

    def do_POST(self):
        self.process_request()

    def do_PUT(self):
        self.process_request()

    def do_DELETE(self):
        self.process_request()

    def do_HEAD(self):
        self.process_request()
# Proxy server wiring: builds a `ThreadingHTTPServer` around the injecting
# request handler above, keeping per-request state in `self.req_states` and
# accepting an optional custom logger.  `test.py` starts this automatically;
# it can also be launched standalone via `start_s3_proxy.py` to proxy between
# tests and minio locally.
class S3ProxyServer:
    """Async-friendly wrapper around a threaded error-injecting S3 proxy."""

    def __init__(self, host: str, port: int, minio_uri: str, max_retries: int, logger=None):
        self.req_states = LRUCache(10000)
        factory = partial(InjectingHandler, self.req_states, logger, minio_uri, max_retries)
        self.server = ThreadingHTTPServer((host, port), factory)
        self.server_thread = None
        # Generous limits/timeouts so the proxy keeps up with many
        # concurrent test clients.
        self.server.request_queue_size = 1000
        self.server.timeout = 10000
        self.server.socket.settimeout(10000)
        self.server.socket.listen(1000)
        self.is_running = False
        # Publish the proxy endpoint for the tests (see s3_test.cc).
        os.environ['PROXY_S3_SERVER_PORT'] = f'{port}'
        os.environ['PROXY_S3_SERVER_HOST'] = host

    async def start(self):
        """Start serving on a background executor thread (no-op if running)."""
        if self.is_running:
            return
        print(f'Starting S3 proxy server on {self.server.server_address}')
        loop = asyncio.get_running_loop()
        self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
        self.is_running = True

    async def stop(self):
        """Shut down and wait for the serving thread (no-op if stopped)."""
        if not self.is_running:
            return
        print('Stopping S3 proxy server')
        self.server.shutdown()
        await self.server_thread
        self.is_running = False

    async def run(self):
        """Serve until `is_running` is cleared; stop on unexpected errors."""
        try:
            await self.start()
            while self.is_running:
                await asyncio.sleep(1)
        except Exception as e:
            print(f"Server error: {e}")
            await self.stop()

25
test/pylib/start_s3_proxy.py Executable file
View File

@@ -0,0 +1,25 @@
#!/usr/bin/python3
import asyncio
import signal
import argparse
from s3_proxy import S3ProxyServer
async def run():
    """Parse CLI options, start the S3 proxy, and serve until SIGINT/SIGTERM."""
    parser = argparse.ArgumentParser(description="Start S3 proxy server")
    parser.add_argument('--host', default='127.0.0.1')
    parser.add_argument('--port', type=int, default=9002)
    parser.add_argument('--minio-uri', default="http://127.0.0.1:9000")
    parser.add_argument('--max-retries', type=int, default=5)
    args = parser.parse_args()

    server = S3ProxyServer(args.host, args.port, args.minio_uri, args.max_retries)
    print('Starting S3 proxy server')
    await server.start()
    # Block the signals before sigwait(): without masking, Python's default
    # SIGINT handler (KeyboardInterrupt) can preempt sigwait and the process
    # would exit without shutting the server down cleanly.
    stop_signals = {signal.SIGINT, signal.SIGTERM}
    signal.pthread_sigmask(signal.SIG_BLOCK, stop_signals)
    signal.sigwait(stop_signals)
    print('Stopping S3 proxy server')
    await server.stop()


if __name__ == '__main__':
    asyncio.run(run())