Disable O_DSYNC in minio to avoid unnecessary slowdown in S3 tests. Closes scylladb/scylladb#28579
311 lines
11 KiB
Python
Executable File
311 lines
11 KiB
Python
Executable File
#!/usr/bin/python3
|
|
#
|
|
# Copyright (C) 2022-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
"""Minio server for testing.
|
|
Provides helpers to set up and manage a minio server for testing.
|
|
"""
|
|
import os
|
|
import argparse
|
|
import asyncio
|
|
from asyncio.subprocess import Process
|
|
from typing import Generator, Optional
|
|
import json
|
|
import logging
|
|
import pathlib
|
|
import random
|
|
import subprocess
|
|
import shutil
|
|
import time
|
|
import tempfile
|
|
import socket
|
|
import string
|
|
import yaml
|
|
from io import BufferedWriter
|
|
|
|
class MinioServer:
    """Manage a throw-away minio server process for S3 tests.

    ``start()`` launches ``minio server`` on a randomly chosen local port,
    exports the connection settings through environment variables and uses
    the ``mc`` client to create a user, a bucket and an access policy.
    ``stop()`` kills the process, restores the environment and removes the
    temporary directory created by the constructor.
    """

    # Names of the environment variables exported while the server runs.
    ENV_ADDRESS = 'S3_SERVER_ADDRESS_FOR_TEST'
    ENV_PORT = 'S3_SERVER_PORT_FOR_TEST'
    ENV_BUCKET = 'S3_BUCKET_FOR_TEST'
    ENV_ACCESS_KEY = 'AWS_ACCESS_KEY_ID'
    ENV_SECRET_KEY = 'AWS_SECRET_ACCESS_KEY'
    DEFAULT_REGION = 'local'

    # Opened by start(); receives the output of minio and mc.
    log_file: BufferedWriter

    def __init__(self, tempdir_base, address, logger):
        """Prepare paths and credentials; nothing is started yet.

        Args:
            tempdir_base: directory in which a ``minio-`` prefixed temporary
                directory is created for the server data, mc config and log.
            address: address the server listens on.
            logger: logger used for progress and error messages.
        """
        self.srv_exe = shutil.which('minio')
        self.address = address
        self.port = None
        tempdir = tempfile.mkdtemp(dir=tempdir_base, prefix="minio-")
        self.tempdir = pathlib.Path(tempdir)
        self.rootdir = self.tempdir / 'minio_root'
        self.mcdir = self.tempdir / 'mc'
        self.logger = logger
        self.cmd: Optional[Process] = None
        self.default_user = 'minioadmin'
        self.default_pass = 'minioadmin'
        self.bucket_name = 'testbucket'
        # Credentials already present in the environment win; otherwise
        # random hex-digit strings are generated.
        self.access_key = os.environ.get(self.ENV_ACCESS_KEY, ''.join(random.choice(string.hexdigits) for _ in range(16)))
        self.secret_key = os.environ.get(self.ENV_SECRET_KEY, ''.join(random.choice(string.hexdigits) for _ in range(32)))
        self.log_filename = (self.tempdir / 'minio').with_suffix(".log")
        self.old_env = dict()
        self.default_config = None

    def __repr__(self):
        return f"[minio] {self.address}:{self.port}/{self.bucket_name}"

    def check_server(self, port):
        """Return True if a TCP connection to ``self.address:port`` succeeds."""
        with socket.socket() as s:
            try:
                s.connect((self.address, port))
            except socket.error:
                return False
            return True

    def log_to_file(self, str):
        """Append ``str`` plus a newline to the log file and flush it."""
        self.log_file.write(f'{str}\n'.encode())
        self.log_file.flush()

    def _close_log_file(self):
        """Close the log file if start() opened it and it is still open."""
        log_file = getattr(self, 'log_file', None)
        if log_file is not None and not log_file.closed:
            log_file.close()

    async def mc(self, *args, ignore_failure=False, timeout=0):
        """Run ``mc`` with ``args``, retrying failures until ``timeout`` expires.

        Args:
            *args: arguments appended to ``mc --debug --config-dir <mcdir>``.
            ignore_failure: when true, a failing command is logged and
                ignored instead of being retried or raised.
            timeout: seconds during which a failing command is retried
                (every 0.1 s); with the default of 0 the first failure raises.

        Raises:
            subprocess.CalledProcessError: the command still fails once the
                retry window is over.
        """
        retry_until: float = time.time() + timeout
        retry_step: float = 0.1
        cmd = ['mc',
               '--debug',
               '--config-dir', str(self.mcdir)]
        cmd.extend(args)

        while True:
            # Run mc through asyncio so that waiting for it does not block
            # the event loop.
            proc = await asyncio.create_subprocess_exec(*cmd, stdout=self.log_file, stderr=self.log_file)
            returncode = await proc.wait()
            if returncode == 0:
                break
            if ignore_failure:
                self.log_to_file('ignoring')
                break
            command = ' '.join(args)
            self.log_to_file(f'failed to run "mc {command}"')
            if time.time() >= retry_until:
                raise subprocess.CalledProcessError(returncode, cmd)
            self.log_to_file(f'retry after {retry_step} seconds')
            await asyncio.sleep(retry_step)

    def _bucket_policy(self):
        """Return the access policy document for the test bucket."""
        # the default anonymous public policy does not allow us to access
        # the taggings, so let's add the tagging actions manually.
        #
        # the original access policy is dumped using:
        #   mc anonymous set public local/testbucket
        #   mc anonymous get-json local/testbucket
        #
        # we added following actions to the policy for accessing objects in the
        # bucket created for testing:
        # - GetObjectTagging
        # - PutObjectTagging
        # - DeleteObjectTagging
        #
        # the full list of actions can be found at
        # https://docs.aws.amazon.com/AmazonS3/latest/API/API_Operations.html
        bucket_actions = [
            "s3:ListBucket",
            "s3:ListBucketMultipartUploads",
            "s3:GetBucketLocation",
        ]
        object_actions = [
            "s3:AbortMultipartUpload",
            "s3:DeleteObject",
            "s3:GetObject",
            "s3:ListMultipartUploadParts",
            "s3:PutObject",
            "s3:GetObjectTagging",
            "s3:PutObjectTagging",
            "s3:DeleteObjectTagging"
        ]
        statement = [
            {
                'Action': bucket_actions,
                'Effect': 'Allow',
                'Principal': {'AWS': ['*']},
                'Resource': [f'arn:aws:s3:::{self.bucket_name}']
            },
            {
                'Action': object_actions,
                'Effect': 'Allow',
                'Principal': {'AWS': ['*']},
                'Resource': [f'arn:aws:s3:::{self.bucket_name}/*']
            }
        ]
        return {'Statement': statement,
                'Version': '2012-10-17'}

    def _get_local_ports(self, num_ports: int) -> Generator[int, None, None]:
        """Yield ``num_ports`` random ports from the kernel's local port range.

        The range is read from ``/proc/sys/net/ipv4/ip_local_port_range``.
        Ports are neither checked for availability nor de-duplicated.
        """
        with open('/proc/sys/net/ipv4/ip_local_port_range', encoding='ascii') as port_range:
            min_port, max_port = map(int, port_range.read().split())
        for _ in range(num_ports):
            yield random.randint(min_port, max_port)

    @staticmethod
    def create_conf(url: str, region: str):
        """Return a one-element list describing an S3 endpoint named ``url``."""
        endpoint = {'name': url,
                    # don't put credentials here. We're exporting env vars, which should
                    # be picked up properly by scylla.
                    # https://github.com/scylladb/scylla-pkg/issues/3845
                    #'aws_access_key_id': acc_key,
                    #'aws_secret_access_key': secret_key,
                    'aws_region': region,
                    'iam_role_arn': '',
                    'type': 's3',
                    }
        return [endpoint]

    async def _run_server(self, port):
        """Spawn minio on ``port`` and wait up to 30 s for it to accept connections.

        Returns:
            The spawned process. It is returned even if the port never
            became reachable within the waiting period (a warning is logged).

        Raises:
            RuntimeError: the process exited while we were waiting for it.
        """
        self.logger.info(f'Starting minio server at {self.address}:{port}')
        cmd = await asyncio.create_subprocess_exec(
            self.srv_exe,
            'server', '--address', f'{self.address}:{port}', self.rootdir,
            preexec_fn=os.setsid,
            stderr=self.log_file,
            stdout=self.log_file,
            env={
                **os.environ,
                'MINIO_BROWSER': 'off',
                # presumably turns off synchronous (O_DSYNC) writes, which
                # only slow the tests down
                'MINIO_FS_OSYNC': 'off',
            },
        )
        timeout = time.time() + 30
        while time.time() < timeout:
            if cmd.returncode is not None:
                # the minio server exited before it started to serve. maybe
                # the port is used by another server?
                self.logger.info('minio exited with %s', cmd.returncode)
                raise RuntimeError("Failed to start minio server")
            if self.check_server(port):
                self.logger.info('minio is up and running')
                break

            await asyncio.sleep(0.1)
        else:
            # Not fatal here: the first mc command issued by start() keeps
            # retrying for a while as well.
            self.logger.warning('minio did not accept connections on port %s within 30 seconds', port)

        return cmd

    def _set_environ(self):
        """Snapshot ``os.environ`` and export the server settings into it."""
        self.old_env = dict(os.environ)
        os.environ[self.ENV_ADDRESS] = f'{self.address}'
        os.environ[self.ENV_PORT] = f'{self.port}'
        os.environ[self.ENV_BUCKET] = f'{self.bucket_name}'
        os.environ[self.ENV_ACCESS_KEY] = f'{self.access_key}'
        os.environ[self.ENV_SECRET_KEY] = f'{self.secret_key}'

    def _get_environs(self):
        """Return the names of the environment variables this class manages."""
        return [self.ENV_ADDRESS,
                self.ENV_PORT,
                self.ENV_BUCKET,
                self.ENV_ACCESS_KEY,
                self.ENV_SECRET_KEY]

    def get_envs_settings(self):
        """Return the managed variables and their current values as a dict.

        Raises:
            KeyError: one of the variables is not set.
        """
        return {key: os.environ[key] for key in self._get_environs()}

    def _unset_environ(self):
        """Restore the managed variables to the snapshot taken by _set_environ()."""
        for env in self._get_environs():
            if env in self.old_env:
                # Membership test rather than truthiness, so a variable that
                # used to be set to '' is restored instead of removed.
                os.environ[env] = self.old_env[env]
            else:
                # Tolerate a variable that has already been removed.
                os.environ.pop(env, None)

    def print_environ(self):
        """Print ``export NAME=value`` lines for the managed variables."""
        msgs = []
        for key in self._get_environs():
            value = os.environ[key]
            msgs.append(f'export {key}={value}')
        print('\n'.join(msgs))

    async def start(self):
        """Start minio and configure user, bucket and policy through mc.

        Failures are logged rather than raised; ``self.cmd`` stays (or is
        reset to) ``None`` when the server is not running afterwards.
        """
        if self.srv_exe is None:
            self.logger.error("Minio not installed, get it from https://dl.minio.io/server/minio/release/linux-amd64/minio and put into PATH")
            return

        self.log_file = self.log_filename.open("wb")
        os.mkdir(self.rootdir)

        retries = 42  # just retry a fixed number of times
        for port in self._get_local_ports(retries):
            try:
                self.cmd = await self._run_server(port)
            except RuntimeError:
                continue
            self.port = port
            break
        else:
            self.logger.error("Failed to start Minio server")
            # stop() does nothing without a running process, so release the
            # log file handle here.
            self._close_log_file()
            return

        self._set_environ()

        try:
            alias = 'local'
            self.log_to_file(f'Configuring access to {self.address}:{self.port}')
            await self.mc('alias', 'rm', alias, ignore_failure=True)
            # wait for the server to be ready when running the first command which should not fail
            await self.mc('alias', 'set', alias, f'http://{self.address}:{self.port}', self.default_user, self.default_pass, timeout=30)
            self.log_to_file(f'Creating user with key {self.access_key}')
            await self.mc('admin', 'user', 'add', alias, self.access_key, self.secret_key)
            self.log_to_file(f'Configuring bucket {self.bucket_name}')
            await self.mc('mb', f'{alias}/{self.bucket_name}')
            with tempfile.NamedTemporaryFile(mode='w', encoding='UTF-8', suffix='.json') as policy_file:
                json.dump(self._bucket_policy(), policy_file, indent=2)
                # mc reads the file by name, so the content must be on disk
                policy_file.flush()
                await self.mc('admin', 'policy', 'create', alias, 'test-policy', policy_file.name)
            await self.mc('admin', 'policy', 'attach', alias, 'test-policy', '--user', self.access_key)

        except Exception as e:
            self.logger.error(f'MC failed: {e}')
            await self.stop()

    async def stop(self):
        """Kill the server, restore the environment and delete the temp dir.

        Does nothing (besides logging) when no server process is running.
        """
        self.logger.info('Killing minio server')
        if not self.cmd:
            return

        # so the test's process environment is not polluted by a test case
        # which launches the MinioServer by itself.
        self._unset_environ()
        try:
            self.cmd.kill()
        except ProcessLookupError:
            pass
        else:
            await self.cmd.wait()
        finally:
            self.logger.info('Killed minio server')
            self.cmd = None
            # close the log before its directory is removed
            self._close_log_file()
            shutil.rmtree(self.tempdir)
|
|
|
|
|
|
async def main():
    """Run a minio server interactively until the user presses Enter.

    Command line options:
        --tempdir: parent directory for the temporary working directory.
        --host: address the server listens on (default 127.0.0.1).

    Prints the ``export`` lines for the environment variables describing the
    server, then waits for input before shutting the server down.

    Raises:
        SystemExit: the server could not be started.
    """
    parser = argparse.ArgumentParser(description="Start a MinIO server")
    parser.add_argument('--tempdir')
    parser.add_argument('--host', default='127.0.0.1')
    args = parser.parse_args()
    with tempfile.TemporaryDirectory(suffix='-minio', dir=args.tempdir) as tempdir:
        if args.tempdir is None:
            print(f'{tempdir=}')
        server = MinioServer(tempdir, args.host, logging.getLogger('minio'))
        await server.start()
        if server.cmd is None:
            # start() only logs its failures; without this check
            # print_environ() would die with a KeyError and the prompt below
            # would claim that a server is running.
            raise SystemExit('failed to start minio server')
        server.print_environ()
        try:
            _ = input('server started. press any key to stop: ')
        except KeyboardInterrupt:
            pass
        finally:
            await server.stop()
|
|
|
|
# Script entry point: drive the interactive server from a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
|