Files
scylladb/test/pylib/s3_proxy.py
Yaron Kaikov d671ca9f53 fix: remove return from finally block in s3_proxy.py
during any jenkins job that trigger `test.py` we get:
```
/jenkins/workspace/releng-testing/byo/byo_build_tests_dtest/scylla/test/pylib/s3_proxy.py:152: SyntaxWarning: 'return' in a 'finally' block
```

The 'return' statement in the finally block was causing a SyntaxWarning.
Moving the return outside the finally block ensures proper exception
handling while maintaining the intended behavior.

Closes scylladb/scylladb#27823
2025-12-24 06:48:03 +02:00

281 lines
10 KiB
Python

#!/usr/bin/python3
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
# S3 proxy server to inject retryable errors for fuzzy testing.
import logging
import os
import random
import sys
import asyncio
import requests
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs
import uuid
from functools import partial
from collections import OrderedDict
from requests import Response
from typing_extensions import Optional
sys.path.insert(0, os.path.dirname(__file__))
class Policy:
def __init__(self, max_retries: int):
self.should_forward: bool = True
self.should_fail: bool = False
self.server_should_fail: bool = False
self.error_count: int = 0
self.max_errors: int = random.choice(list(range(1, max_retries)))
class LRUCache:
lock = threading.Lock()
def __init__(self, capacity: int):
self.cache = OrderedDict()
self.capacity = capacity
def get(self, key: str) -> Optional[Policy]:
with self.lock:
if key not in self.cache:
return None
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key: str, value: Policy) -> None:
with self.lock:
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False)
def remove(self, key: str):
with self.lock:
if key in self.cache:
del self.cache[key]
# Simple proxy between s3 client and minio to randomly inject errors and simulate cases when the request succeeds but the wire got "broken"
def true_or_false():
return random.choice([True, False])
class InjectingHandler(BaseHTTPRequestHandler):
retryable_codes = list((408, 419, 429, 440)) + list(range(500, 599))
error_names = list(("InternalFailureException",
"InternalFailure",
"InternalServerError",
"InternalError",
"RequestExpiredException",
"RequestExpired",
"ServiceUnavailableException",
"ServiceUnavailableError",
"ServiceUnavailable",
"RequestThrottledException",
"RequestThrottled",
"ThrottlingException",
"ThrottledException",
"Throttling",
"SlowDownException",
"SlowDown",
"RequestTimeTooSkewedException",
"RequestTimeTooSkewed",
"RequestTimeoutException",
"RequestTimeout",
# Not really retryable, but we can use it to test the client behavior when the token is expired
"ExpiredTokenException",
))
def __init__(self, policies, logger, minio_uri, max_retries, *args, **kwargs):
self.minio_uri = minio_uri
self.policies = policies
self.logger = logger
self.max_retries = max_retries
super().__init__(*args, **kwargs)
self.close_connection = False
def log_message(self, format, *args):
if not self.logger.isEnabledFor(logging.INFO):
return
self.logger.info("%s - - [%s] %s",
self.client_address[0],
self.log_date_time_string(),
format % args)
def parsed_qs(self):
parsed_url = urlparse(self.path)
query_components = parse_qs(parsed_url.query)
for key in parsed_url.query.split('&'):
if '=' not in key:
query_components[key] = ['']
return query_components
def get_policy(self):
policy = self.policies.get(self.path)
if policy is None:
policy = Policy(self.max_retries)
policy.should_forward = true_or_false()
if policy.should_forward:
policy.should_fail = true_or_false()
else:
policy.should_fail = True
if policy.should_fail:
policy.server_should_fail = true_or_false()
self.policies.put(self.path, policy)
# Unfortunately MPU completion retry on already completed upload would introduce flakiness to unit tests, for example `s3_test`
if self.command == "POST" and "uploadId" in self.parsed_qs():
policy.should_forward = not policy.should_fail
return policy
def get_retryable_http_codes(self):
return random.choice(self.retryable_codes), random.choice(self.error_names)
def respond_with_error(self, reset_connection: bool):
if reset_connection:
try:
# Forcefully close the connection to simulate a connection reset
self.request.shutdown_request()
except OSError:
pass
return
code, error_name = self.get_retryable_http_codes()
self.send_response(code)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.send_header('Connection', 'keep-alive')
req_uuid = str(uuid.uuid4())
response = f"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>{error_name}</Code>
<Message>Minio proxy injected error. {"The provided token has expired." if error_name == "ExpiredTokenException" else "Client should retry."}</Message>
<RequestId>{req_uuid}</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>""".encode('utf-8')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response)
def process_request(self):
try:
policy = self.get_policy()
if policy.error_count >= policy.max_errors:
policy.should_fail = False
policy.server_should_fail = False
policy.should_forward = True
response = Response()
body = None
content_length = self.headers['Content-Length']
if content_length:
body = self.rfile.read(int(content_length))
if policy.should_forward:
target_url = self.minio_uri + self.path
headers = {key: value for key, value in self.headers.items()}
response = requests.request(self.command, target_url, headers=headers, data=body)
if policy.should_fail:
policy.error_count += 1
self.respond_with_error(reset_connection=policy.server_should_fail)
else:
# Once the request is successfully processed, we remove the policy from the cache to make following request to the resource being illegible to fail
self.policies.remove(self.path)
self.send_response(response.status_code)
for key, value in response.headers.items():
if key.upper() != 'CONTENT-LENGTH':
self.send_header(key, value)
if self.command == 'HEAD':
self.send_header("Content-Length", response.headers['Content-Length'])
else:
self.send_header("Content-Length", str(len(response.content)))
self.end_headers()
self.wfile.write(response.content)
except Exception as e:
self.logger.error("%s", e)
def do_GET(self):
self.process_request()
def do_POST(self):
self.process_request()
def do_PUT(self):
self.process_request()
def do_DELETE(self):
self.process_request()
def do_HEAD(self):
self.process_request()
# Proxy server to setup `ThreadingHTTPServer` instance with custom request handler (see above), managing requests state
# in the `self.req_states`, adding custom logger, etc. This server will be started automatically from `test.py`. In
# addition, it is possible just to start this server using another script - `start_s3_proxy.py` to run it locally to
# provide proxy between tests and minio
class S3ProxyServer:
def __init__(self, host: str, port: int, minio_uri: str, max_retries: int, seed: int, logger):
self.logger = logger
self.logger.info('Setting minio proxy random seed to %s', seed)
random.seed(seed)
self.req_states = LRUCache(10000)
handler = partial(InjectingHandler, self.req_states, logger, minio_uri, max_retries)
self.server = ThreadingHTTPServer((host, port), handler)
self.server_thread = None
self.server.request_queue_size = 1000
self.server.timeout = 10000
self.server.socket.settimeout(10000)
self.server.socket.listen(1000)
self.is_running = False
self.envs = {'PROXY_S3_SERVER_PORT': f'{port}', 'PROXY_S3_SERVER_HOST': f'{host}'}
def _set_environ(self):
for key, value in self.envs.items():
os.environ[key] = value
def _unset_environ(self):
for key in self.envs.keys():
del os.environ[key]
def get_envs_settings(self):
return self.envs
async def start(self):
if not self.is_running:
self.logger.info('Starting S3 proxy server on %s', self.server.server_address)
self._set_environ()
loop = asyncio.get_running_loop()
self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
self.is_running = True
async def stop(self):
if self.is_running:
self.logger.info('Stopping S3 proxy server')
self._unset_environ()
self.server.shutdown()
await self.server_thread
self.is_running = False
async def run(self):
try:
await self.start()
while self.is_running:
await asyncio.sleep(1)
except Exception as e:
self.logger.error("Server error: %s", e)
await self.stop()