test: Switch s3_test to use proxy
Switch `s3_test` to use the S3 proxy which is used to randomly inject retryable S3 errors to test the "retry" part of the S3 client. Fix `put_object` to make it retryable
This commit is contained in:
7
test.py
7
test.py
@@ -36,6 +36,7 @@ from scripts import coverage # type: ignore
|
||||
from test.pylib.artifact_registry import ArtifactRegistry
|
||||
from test.pylib.host_registry import HostRegistry
|
||||
from test.pylib.pool import Pool
|
||||
from test.pylib.s3_proxy import S3ProxyServer
|
||||
from test.pylib.s3_server_mock import MockS3Server
|
||||
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
|
||||
from test.pylib.util import LogPrefixAdapter
|
||||
@@ -1553,6 +1554,12 @@ async def run_all_tests(signaled: asyncio.Event, options: argparse.Namespace) ->
|
||||
await mock_s3_server.start()
|
||||
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
|
||||
|
||||
minio_uri = "http://" + os.environ[ms.ENV_ADDRESS] + ":" + os.environ[ms.ENV_PORT]
|
||||
proxy_s3_server = S3ProxyServer(await hosts.lease_host(), 9002, minio_uri, 3,
|
||||
LogPrefixAdapter(logging.getLogger('s3_proxy'), {'prefix': 's3_proxy'}))
|
||||
await proxy_s3_server.start()
|
||||
TestSuite.artifacts.add_exit_artifact(None, proxy_s3_server.stop)
|
||||
|
||||
console.print_start_blurb()
|
||||
max_failures = options.max_failures
|
||||
failed = 0
|
||||
|
||||
@@ -35,7 +35,7 @@
|
||||
// permissive enough policy and then run the test with env set respectively
|
||||
// E.g. like this
|
||||
//
|
||||
// export S3_SERVER_ADDRESS_FOR_TEST=s3.us-east-2.amazonaws.com
|
||||
// export PROXY_S3_SERVER_HOST=s3.us-east-2.amazonaws.com
|
||||
// export S3_SERVER_PORT_FOR_TEST=443
|
||||
// export S3_BUCKET_FOR_TEST=xemul
|
||||
// export AWS_ACCESS_KEY_ID=${aws_access_key_id}
|
||||
@@ -45,8 +45,8 @@
|
||||
|
||||
s3::endpoint_config_ptr make_minio_config() {
|
||||
s3::endpoint_config cfg = {
|
||||
.port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
|
||||
.use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
|
||||
.port = std::stoul(tests::getenv_safe("PROXY_S3_SERVER_PORT")),
|
||||
.use_https = false,
|
||||
.aws = {{
|
||||
.access_key_id = tests::getenv_safe("AWS_ACCESS_KEY_ID"),
|
||||
.secret_access_key = tests::getenv_safe("AWS_SECRET_ACCESS_KEY"),
|
||||
@@ -68,7 +68,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_object) {
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Put object {}\n", name);
|
||||
@@ -114,7 +114,7 @@ void do_test_client_multipart_upload(bool with_copy_upload) {
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Upload object (with copy = {})\n", with_copy_upload);
|
||||
@@ -175,7 +175,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback) {
|
||||
testlog.info("Make client");
|
||||
semaphore mem(0);
|
||||
mem.broken(); // so that any attempt to use it throws
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Upload object");
|
||||
@@ -231,7 +231,7 @@ future<> test_client_upload_file(std::string_view test_name, size_t total_size,
|
||||
|
||||
// 2. upload the file to s3
|
||||
semaphore mem{memory_size};
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"),
|
||||
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"),
|
||||
make_minio_config(),
|
||||
mem);
|
||||
co_await client->upload_file(file_path, object_name);
|
||||
@@ -289,7 +289,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Put object {}\n", name);
|
||||
@@ -331,7 +331,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) {
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Put object {}\n", name);
|
||||
@@ -354,7 +354,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}",
|
||||
tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
semaphore mem(16<<20);
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*client);
|
||||
auto data = sstring("1234567890ABCDEF").release();
|
||||
client->put_object(name, std::move(data)).get();
|
||||
@@ -396,7 +396,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects) {
|
||||
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
|
||||
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
|
||||
semaphore mem(16<<20);
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*client);
|
||||
|
||||
// Put extra object to check list-by-prefix filters it out
|
||||
@@ -420,7 +420,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete) {
|
||||
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
|
||||
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
|
||||
semaphore mem(16<<20);
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto client = s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_minio_config(), mem);
|
||||
auto close_client = deferred_close(*client);
|
||||
|
||||
populate_bucket(client, bucket, prefix, 8);
|
||||
|
||||
253
test/pylib/s3_proxy.py
Normal file
253
test/pylib/s3_proxy.py
Normal file
@@ -0,0 +1,253 @@
|
||||
#!/usr/bin/python3
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
# S3 proxy server to inject retryable errors for fuzzy testing.
|
||||
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import asyncio
|
||||
import requests
|
||||
import threading
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import uuid
|
||||
from functools import partial
|
||||
from collections import OrderedDict
|
||||
from requests import Response
|
||||
from typing_extensions import Optional
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
|
||||
class Policy:
    """Error-injection state the proxy tracks per request path.

    Decides whether a given request is forwarded to the real S3 backend,
    whether a retryable error is injected, and how many errors may be
    injected in total before the request is allowed to succeed.
    """

    def __init__(self, max_retries: int):
        # Forward the request to the real backend (may be flipped by the caller).
        self.should_forward: bool = True
        # Return an injected retryable error to the client.
        self.should_fail: bool = False
        # Number of errors injected for this path so far.
        self.error_count: int = 0
        # Inject at most this many errors — always strictly fewer than the
        # client's retry budget so a correctly-retrying client eventually
        # succeeds. max(1, ...) keeps this valid for max_retries <= 1, where
        # the previous random.choice(list(range(1, max_retries))) raised
        # IndexError on an empty sequence.
        self.max_errors: int = random.randint(1, max(1, max_retries - 1))
|
||||
|
||||
|
||||
class LRUCache:
    """Thread-safe LRU map from a request path to its injection Policy.

    Backed by an OrderedDict; the least recently used entry is evicted once
    `capacity` is exceeded. Annotations are kept as strings so the class does
    not require `Policy`/`Optional` to be resolvable at definition time.
    """

    def __init__(self, capacity: int):
        # Per-instance lock. The previous class-level `lock` attribute was a
        # single Lock shared by every LRUCache instance, needlessly
        # serializing caches that have nothing to do with each other.
        self.lock = threading.Lock()
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key: str) -> "Optional[Policy]":
        """Return the value cached for *key* (refreshing its recency), or None."""
        with self.lock:
            if key not in self.cache:
                return None
            self.cache.move_to_end(key)
            return self.cache[key]

    def put(self, key: str, value: "Policy") -> None:
        """Insert or overwrite *key*, evicting the LRU entry when over capacity."""
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                self.cache.popitem(last=False)
|
||||
|
||||
|
||||
# Simple proxy between s3 client and minio to randomly inject errors and simulate cases when the request succeeds but the wire got "broken"
|
||||
def true_or_false():
    """Fair coin flip used to randomize per-request proxy behavior."""
    outcomes = [True, False]
    return random.choice(outcomes)
|
||||
|
||||
|
||||
class InjectingHandler(BaseHTTPRequestHandler):
    """HTTP handler that proxies requests to minio, randomly injecting
    retryable S3 errors according to the per-path Policy stored in
    `self.policies`.
    """

    # HTTP status codes the S3 client treats as retryable.
    # NOTE(review): range(500, 599) excludes 599 — presumably intentional,
    # but confirm against the client's retry table.
    retryable_codes = list((408, 419, 429, 440)) + list(range(500, 599))
    # AWS error-code names that the S3 client recognizes as retryable.
    error_names = list(("InternalFailureException",
                        "InternalFailure",
                        "InternalServerError",
                        "InternalError",
                        "RequestExpiredException",
                        "RequestExpired",
                        "ServiceUnavailableException",
                        "ServiceUnavailableError",
                        "ServiceUnavailable",
                        "RequestThrottledException",
                        "RequestThrottled",
                        "ThrottlingException",
                        "ThrottledException",
                        "Throttling",
                        "SlowDownException",
                        "SlowDown",
                        "RequestTimeTooSkewedException",
                        "RequestTimeTooSkewed",
                        "RequestTimeoutException",
                        "RequestTimeout"))

    def __init__(self, policies, logger, minio_uri, max_retries, *args, **kwargs):
        # policies: shared LRUCache mapping request path -> Policy.
        self.minio_uri = minio_uri
        self.policies = policies
        self.logger = logger
        self.max_retries = max_retries
        super().__init__(*args, **kwargs)
        # NOTE(review): BaseHTTPRequestHandler.__init__ fully handles the
        # request before returning, so this assignment likely has no effect —
        # confirm whether it can be removed.
        self.close_connection = False

    def log_error(self, format, *args):
        """Route handler error lines to the configured logger, else stderr."""
        if self.logger:
            self.logger.info("%s - - [%s] %s\n" %
                             (self.client_address[0],
                              self.log_date_time_string(),
                              format % args))
        else:
            sys.stderr.write("%s - - [%s] %s\n" %
                             (self.address_string(),
                              self.log_date_time_string(),
                              format % args))

    def log_message(self, format, *args):
        # Just don't be too verbose: suppress per-request access logging
        # entirely when a logger is configured.
        if not self.logger:
            sys.stderr.write("%s - - [%s] %s\n" %
                             (self.address_string(),
                              self.log_date_time_string(),
                              format % args))

    def parsed_qs(self):
        """Parse the request's query string into a dict, additionally mapping
        value-less keys (e.g. `?uploads`) to [''] — parse_qs drops them by
        default, and S3 uses such bare keys as sub-resource markers."""
        parsed_url = urlparse(self.path)
        query_components = parse_qs(parsed_url.query)
        for key in parsed_url.query.split('&'):
            if '=' not in key:
                query_components[key] = ['']
        return query_components

    def get_policy(self):
        """Fetch (or lazily create) the injection Policy for this request path.

        A new Policy is randomized: forward+fail, forward-only, or fail-only
        (never neither, so a decision is always made).
        """
        policy = self.policies.get(self.path)
        if policy is None:
            policy = Policy(self.max_retries)
            policy.should_forward = true_or_false()
            if policy.should_forward:
                policy.should_fail = true_or_false()
            else:
                # Not forwarding at all, so the only option is to fail.
                policy.should_fail = True
            self.policies.put(self.path, policy)

        # Unfortunately MPU completion retry on already completed upload would introduce flakiness to unit tests, for example `s3_test`
        # (i.e. for CompleteMultipartUpload, fail *instead of* forwarding,
        # never after — the completion must not reach minio when we inject).
        if self.command == "POST" and "uploadId" in self.parsed_qs():
            policy.should_forward = not policy.should_fail

        return policy

    def get_retryable_http_codes(self):
        """Pick a random (status code, AWS error name) pair to inject."""
        return random.choice(self.retryable_codes), random.choice(self.error_names)

    def respond_with_error(self):
        """Send an injected retryable error as an S3-style XML error body."""
        code, error_name = self.get_retryable_http_codes()
        self.send_response(code)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        # Keep the connection open so the client can retry on the same socket.
        self.send_header('Connection', 'keep-alive')
        req_uuid = str(uuid.uuid4())
        response = f"""<?xml version="1.0" encoding="UTF-8"?>

<Error>
<Code>{error_name}</Code>
<Message>Minio proxy injected error. Client should retry.</Message>
<RequestId>{req_uuid}</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>""".encode('utf-8')
        self.send_header('Content-Length', str(len(response)))
        self.end_headers()
        self.wfile.write(response)

    def process_request(self):
        """Common handling for every HTTP verb: consult the Policy, optionally
        forward the request to minio, then either relay minio's response or
        inject a retryable error. A request may be forwarded AND still get an
        injected error — simulating 'request succeeded but the wire broke'."""
        try:
            policy = self.get_policy()

            # Error budget exhausted: let the request through cleanly so a
            # retrying client eventually succeeds.
            if policy.error_count >= policy.max_errors:
                policy.should_fail = False
                policy.should_forward = True

            response = Response()
            body = None

            # Drain the request body so the connection stays usable even
            # when we do not forward.
            content_length = self.headers['Content-Length']
            if content_length:
                body = self.rfile.read(int(content_length))

            if policy.should_forward:
                target_url = self.minio_uri + self.path
                headers = {key: value for key, value in self.headers.items()}
                response = requests.request(self.command, target_url, headers=headers, data=body)

            if policy.should_fail:
                policy.error_count += 1
                self.respond_with_error()
            else:
                # Relay minio's response, recomputing Content-Length since
                # `requests` may have decoded/re-framed the payload.
                self.send_response(response.status_code)
                for key, value in response.headers.items():
                    if key.upper() != 'CONTENT-LENGTH':
                        self.send_header(key, value)

                if self.command == 'HEAD':
                    # HEAD has no body: echo the upstream length header.
                    self.send_header("Content-Length", response.headers['Content-Length'])
                else:
                    self.send_header("Content-Length", str(len(response.content)))
                self.end_headers()
                self.wfile.write(response.content)
        except Exception as e:
            # Best-effort proxy: swallow and print so one bad request does
            # not kill the handler thread.
            print(e)

    def do_GET(self):
        self.process_request()

    def do_POST(self):
        self.process_request()

    def do_PUT(self):
        self.process_request()

    def do_DELETE(self):
        self.process_request()

    def do_HEAD(self):
        self.process_request()
|
||||
|
||||
|
||||
# Proxy server to setup `ThreadingHTTPServer` instance with custom request handler (see above), managing requests state
# in the `self.req_states`, adding custom logger, etc. This server will be started automatically from `test.py`. In
# addition, it is possible just to start this server using another script - `start_s3_proxy.py` to run it locally to
# provide proxy between tests and minio
class S3ProxyServer:
    def __init__(self, host: str, port: int, minio_uri: str, max_retries: int, logger=None):
        # Shared per-path Policy store; every handler thread reads/writes it.
        self.req_states = LRUCache(10000)
        # Bind the handler's extra constructor args; ThreadingHTTPServer will
        # instantiate one handler per request.
        handler = partial(InjectingHandler, self.req_states, logger, minio_uri, max_retries)
        self.server = ThreadingHTTPServer((host, port), handler)
        self.server_thread = None
        # Generous queue/timeout settings so slow injected-error retry storms
        # do not drop connections. NOTE(review): timeouts are in seconds, so
        # 10000 is ~2.7h — presumably "effectively infinite"; confirm.
        self.server.request_queue_size = 1000
        self.server.timeout = 10000
        self.server.socket.settimeout(10000)
        self.server.socket.listen(1000)
        self.is_running = False
        # Publish the proxy endpoint for the C++ tests (see make_minio_config).
        os.environ['PROXY_S3_SERVER_PORT'] = f'{port}'
        os.environ['PROXY_S3_SERVER_HOST'] = host

    async def start(self):
        """Start serving in a background executor thread; idempotent."""
        if not self.is_running:
            print(f'Starting S3 proxy server on {self.server.server_address}')
            loop = asyncio.get_running_loop()
            # serve_forever() blocks, so run it off the event loop; the
            # returned future is awaited in stop().
            self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
            self.is_running = True

    async def stop(self):
        """Shut the server down and wait for the serving thread; idempotent."""
        if self.is_running:
            print('Stopping S3 proxy server')
            self.server.shutdown()
            await self.server_thread
            self.is_running = False

    async def run(self):
        """Start the proxy and idle until stopped or an error occurs."""
        try:
            await self.start()
            while self.is_running:
                await asyncio.sleep(1)
        except Exception as e:
            print(f"Server error: {e}")
        await self.stop()
|
||||
25
test/pylib/start_s3_proxy.py
Executable file
25
test/pylib/start_s3_proxy.py
Executable file
@@ -0,0 +1,25 @@
|
||||
#!/usr/bin/python3
|
||||
import asyncio
|
||||
import signal
|
||||
import argparse
|
||||
from s3_proxy import S3ProxyServer
|
||||
|
||||
|
||||
async def run():
    """Parse command-line options and run the S3 proxy until a signal arrives."""
    parser = argparse.ArgumentParser(description="Start S3 proxy server")
    parser.add_argument('--host', default='127.0.0.1')
    parser.add_argument('--port', type=int, default=9002)
    parser.add_argument('--minio-uri', default="http://127.0.0.1:9000")
    parser.add_argument('--max-retries', type=int, default=5)
    opts = parser.parse_args()

    proxy = S3ProxyServer(opts.host, opts.port, opts.minio_uri, opts.max_retries)

    print('Starting S3 proxy server')
    await proxy.start()
    # Block until the user interrupts or the process is asked to terminate.
    signal.sigwait({signal.SIGINT, signal.SIGTERM})
    print('Stopping S3 proxy server')
    await proxy.stop()
|
||||
|
||||
|
||||
# Script entry point: drive the proxy's async lifecycle on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(run())
|
||||
Reference in New Issue
Block a user