Files
scylladb/test/cluster/object_store/conftest.py
Calle Wilund 3e8a9a0beb pytest: use ephemeral port publish for docker mock servers
Changes dockerized_service to use ephemeral port publish, and
query the published port from podman/docker.
Modifies client code to use slightly changed usage syntax.
2026-03-11 12:32:01 +01:00

275 lines
9.0 KiB
Python

#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import inspect
import logging
import os
from operator import attrgetter

import boto3
import pytest
import requests

from test.pylib.dockerized_service import DockerizedServer
# use minio_server
from test.pylib.minio_server import MinioServer
from test.pylib.suite.python import add_s3_options
def pytest_addoption(parser):
    """Pytest hook: hand the option parser to ``add_s3_options``.

    NOTE(review): presumably this registers the ``--s3-server-*`` and
    ``--aws-*`` options that ``create_s3_server`` reads -- confirm in
    ``test.pylib.suite.python``.
    """
    add_s3_options(parser)
def format_tuples(tuples=None, **kwargs):
    '''Format key/value pairs as a brace-delimited, single-quoted CQL map.

    ``tuples`` is an optional mapping; ``kwargs`` are merged on top of it
    (a keyword wins over an equal key in ``tuples``).  Every key and value is
    rendered through an f-string and wrapped in single quotes, e.g.
    ``format_tuples({'a': 1}, b='x')`` returns ``{ 'a': '1', 'b': 'x' }``.

    The caller's mapping is never modified.  Values are not escaped, so a
    value containing a single quote produces a malformed literal.
    '''
    # Merge into a fresh dict: calling ``tuples.update(kwargs)`` would leak
    # the keyword overrides into the dictionary owned by the caller.
    merged = {**(tuples or {}), **kwargs}
    body = ', '.join(f"'{key}': '{value}'" for key, value in merged.items())
    return f'{{ {body} }}'
def keyspace_options(object_storage, rf=1):
    """Build the ``WITH ...`` clause for a keyspace stored on ``object_storage``.

    ``object_storage`` must expose ``type``, ``address`` and ``bucket_name``;
    ``rf`` is substituted as the ``replication_factor`` of a
    ``NetworkTopologyStrategy`` replication map.
    """
    storage_clause = format_tuples(
        type=f'{object_storage.type}',
        endpoint=object_storage.address,
        bucket=object_storage.bucket_name,
    )
    replication_clause = f"{{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}}"
    return f"WITH replication = {replication_clause} AND STORAGE = {storage_clause}"
class S3_Server:
    """Connection details for an S3 endpoint whose lifetime is managed elsewhere.

    ``start``/``stop`` are no-ops here; subclasses that own a server process
    override them.
    """

    def __init__(self, tempdir: str, address: str, port: int, acc_key: str,
                 secret_key: str, region: str, bucket_name):
        self.tempdir = tempdir
        self.ip, self.port = address, port
        # Plain-HTTP URL used both as the boto3 endpoint and the storage address.
        self.address = f'http://{address}:{port}'
        self.acc_key, self.secret_key = acc_key, secret_key
        self.region = region
        self.bucket_name = bucket_name

    def __repr__(self):
        return f"[unknown] {self.address}/{self.bucket_name}"

    @property
    def type(self):
        """Storage type tag for this backend (always ``'S3'``)."""
        return 'S3'

    def create_endpoint_conf(self):
        """Return ``MinioServer.create_conf`` for this address and region."""
        return MinioServer.create_conf(self.address, self.region)

    def get_resource(self):
        """Creates boto3.resource object that can be used to communicate to the given server"""
        signing_config = boto3.session.Config(signature_version='s3v4')
        return boto3.resource(
            's3',
            endpoint_url=self.address,
            aws_access_key_id=self.acc_key,
            aws_secret_access_key=self.secret_key,
            aws_session_token=None,
            config=signing_config,
            verify=False,
        )

    async def start(self):
        """Nothing to start: the endpoint is external."""

    async def stop(self):
        """Nothing to stop: the endpoint is external."""
class MinioWrapper(S3_Server):
    """``S3_Server`` backed by a ``MinioServer`` that this object owns.

    The wrapped server is created on ``127.0.0.1`` and its address, port,
    credentials and bucket name are copied into the ``S3_Server`` fields.
    ``create_endpoint_conf`` is inherited unchanged from ``S3_Server``.

    NOTE(review): address and port are read from the ``MinioServer`` at
    construction time; if ``MinioServer`` only settles its port inside
    ``start()``, ``self.address`` would be stale -- confirm against
    ``test.pylib.minio_server``.
    """

    def __init__(self, tempdir):
        self.server = MinioServer(tempdir,
                                  '127.0.0.1',
                                  logging.getLogger('minio'))
        super().__init__(
            tempdir,
            self.server.address,
            self.server.port,
            self.server.access_key,
            self.server.secret_key,
            MinioServer.DEFAULT_REGION,
            self.server.bucket_name)

    @staticmethod
    async def _settle(outcome):
        """Await ``outcome`` if it is awaitable, otherwise return it as is."""
        if inspect.isawaitable(outcome):
            return await outcome
        return outcome

    async def start(self):
        """Start the wrapped server and wait until its ``start()`` completes.

        Returning the result of ``self.server.start()`` without awaiting it
        would hand an un-run coroutine back to the caller whenever that
        method is a coroutine function, so the server would never start.
        """
        return await self._settle(self.server.start())

    async def stop(self):
        """Stop the wrapped server and wait until its ``stop()`` completes."""
        return await self._settle(self.server.stop())
def create_s3_server(pytestconfig, tmpdir):
    """Pick the S3 backend for a test, in order of precedence.

    1. ``--s3-server-address`` given: an ``S3_Server`` built from the
       command-line options.
    2. The ``MinioServer.ENV_ADDRESS`` environment variable set: an
       ``S3_Server`` built from the ``MinioServer.ENV_*`` variables and
       ``MinioServer.DEFAULT_REGION``.
    3. Otherwise: a ``MinioWrapper`` rooted in ``tmpdir``.

    The returned object is not started.
    """
    (cli_address, cli_port, cli_access_key,
     cli_secret_key, cli_region, cli_bucket) = (
        pytestconfig.getoption(flag)
        for flag in ('--s3-server-address', '--s3-server-port',
                     '--aws-access-key', '--aws-secret-key',
                     '--aws-region', '--s3-server-bucket')
    )

    from_env = os.environ.get
    env_address = from_env(MinioServer.ENV_ADDRESS)
    env_port = from_env(MinioServer.ENV_PORT)
    env_access_key = from_env(MinioServer.ENV_ACCESS_KEY)
    env_secret_key = from_env(MinioServer.ENV_SECRET_KEY)
    env_region = MinioServer.DEFAULT_REGION
    env_bucket = from_env(MinioServer.ENV_BUCKET)

    workdir = tmpdir.strpath

    if cli_address:
        return S3_Server(workdir, cli_address, cli_port, cli_access_key,
                         cli_secret_key, cli_region, cli_bucket)
    if env_address:
        # The environment only carries strings, hence the int() on the port.
        return S3_Server(workdir, env_address, int(env_port), env_access_key,
                         env_secret_key, env_region, env_bucket)
    return MinioWrapper(workdir)
@pytest.fixture(scope="function")
async def s3_server(pytestconfig, tmpdir):
    """Function-scoped fixture yielding a started S3 backend.

    The backend comes from ``create_s3_server`` and is stopped when the test
    finishes.
    """
    server = create_s3_server(pytestconfig, tmpdir)
    try:
        # Start inside the try block so that stop() still runs when start()
        # fails part-way and leaves something behind to clean up.
        await server.start()
        yield server
    finally:
        await server.stop()
class GSFront:
    """Handle to a 'GS' object-storage endpoint that is already running.

    Holds the endpoint URL, bucket name and an optional credentials file;
    ``start``/``stop`` do nothing.
    """

    def __init__(self, endpoint, bucket_name, credentials_file):
        self.endpoint, self.bucket_name = endpoint, bucket_name
        self.credentials_file = credentials_file

    @property
    def address(self):
        """Alias for ``endpoint``."""
        return self.endpoint

    @property
    def type(self):
        """Storage type tag for this backend (always ``'GS'``)."""
        return 'GS'

    def create_endpoint_conf(self):
        """Return a one-element list describing this endpoint.

        A missing credentials file is reported as the string ``'none'``.
        """
        return [{
            'name': self.endpoint,
            'type': 'gs',
            'credentials_file': self.credentials_file or 'none',
        }]

    def get_resource(self):
        """Creates boto3.resource object that can be used to communicate to the given server"""
        signing_config = boto3.session.Config(signature_version='s3v4')
        return boto3.resource(
            's3',
            endpoint_url=self.endpoint,
            config=signing_config,
            verify=False,
        )

    async def start(self):
        """Nothing to start: the endpoint is external."""

    async def stop(self):
        """Nothing to stop: the endpoint is external."""
class GSServer(GSFront):
    """``GSFront`` backed by a ``fake-gcs-server`` container owned by this object.

    ``start()`` launches the container through ``DockerizedServer``, exports
    the endpoint details through environment variables (``publish``) and
    creates the test bucket.  ``stop()`` stops the container and restores the
    environment (``unpublish``).
    """

    def __init__(self, tmpdir):
        super().__init__(None, 'testbucket', None)
        self.server = None
        self.host = None
        self.port = None
        # Environment values saved by publish(); empty while nothing is published.
        self.oldvars = {}
        # Environment variable name -> getter producing its value from self.
        self.vars = {'GS_SERVER_ADDRESS_FOR_TEST': attrgetter('endpoint'),
                     'GS_BUCKET_FOR_TEST': attrgetter('bucket_name'),
                     'GS_CREDENTIALS_FILE': attrgetter('credentials_file'),
                     }
        self.tmpdir = tmpdir

    def publish(self):
        """Set ``endpoint`` from host/port and export the ``self.vars`` variables.

        The previous value of every variable is remembered in ``oldvars``.
        Variables whose value is empty or ``None`` are not exported.
        """
        self.endpoint = f'http://{self.host}:{self.port}'
        self.oldvars = {k: os.environ.get(k) for k in self.vars}
        for k, v in self.vars.items():
            val = v(self)
            if val:
                os.environ[k] = val

    def unpublish(self):
        """Restore the environment saved by ``publish()``.

        Safe to call when ``publish()`` never ran (for example because the
        container failed to start) and safe to call more than once: in both
        cases there is nothing saved, so nothing is touched.
        """
        saved, self.oldvars = self.oldvars, {}
        for name, previous in saved.items():
            if previous is not None:
                os.environ[name] = previous
            else:
                # The variable did not exist before publish(): remove it again.
                os.environ.pop(name, None)

    def _image_args(self, host, port):
        """Return the command-line arguments for the fake-gcs-server image.

        Passed as the ``image_args`` callable to ``DockerizedServer``; only
        ``port`` is used.
        """
        # pylint: disable=unused-argument
        # note: need to set 'public-host' to the IP we connect to, to make XML (s3) API work for listing
        return ["-scheme", "http", "-log-level", "debug", "--port", f'{port}', '-public-host', '127.0.0.1']

    async def start(self):
        """Start the container, publish the endpoint and create the test bucket.

        Raises:
            Exception: if the bucket-creation request does not answer with
                status 200 or 201.
        """
        self.server = DockerizedServer("docker.io/fsouza/fake-gcs-server:1.52.3", self.tmpdir,
                                       logfilenamebase="fake-gcs-server",
                                       image_args=self._image_args,
                                       success_string="server started at",
                                       failure_string="address already in use",
                                       port=4443
                                       )
        await self.server.start()
        # Use the host/port reported by DockerizedServer after start-up.
        self.port = self.server.port
        self.host = self.server.host
        self.publish()
        # create bucket. can't use boto, because fake server does not support xml syntax
        # for anything beyond listing
        response = requests.post(f'{self.endpoint}/storage/v1/b?project=testproject', json={
            'name': self.bucket_name, 'location': 'US', 'storageClass': 'STANDARD',
            'iamConfiguration': {
                'uniformBucketLevelAccess': {
                    'enabled': True,
                }
            }
        }, timeout=10)
        if response.status_code not in [200, 201]:
            raise Exception(f'Could not create test bucket: {response}')

    async def stop(self):
        """Stop the container (if one was created) and restore the environment."""
        try:
            if self.server:
                await self.server.stop()
        finally:
            # Restore the environment even when stopping the container fails.
            self.unpublish()
@pytest.fixture(scope="function", params=['s3','gs'])
async def object_storage(request, pytestconfig, tmpdir):
    """Fixture yielding a started object-storage backend, once per type.

    For ``'s3'`` the backend comes from ``create_s3_server``.  For ``'gs'`` an
    existing endpoint is used (``GSFront``) when both
    ``GS_SERVER_ADDRESS_FOR_TEST`` and ``GS_BUCKET_FOR_TEST`` are set;
    otherwise a local ``GSServer`` is created.  The backend is stopped when
    the test finishes.
    """
    if request.param != 'gs':
        backend = create_s3_server(pytestconfig, tmpdir)
    else:
        gs_endpoint = os.environ.get('GS_SERVER_ADDRESS_FOR_TEST')
        gs_bucket = os.environ.get('GS_BUCKET_FOR_TEST')
        gs_credentials = os.environ.get('GS_CREDENTIALS_FILE')
        if gs_endpoint is None or gs_bucket is None:
            backend = GSServer(tmpdir)
        else:
            backend = GSFront(gs_endpoint, gs_bucket, gs_credentials)

    try:
        await backend.start()
        yield backend
    finally:
        await backend.stop()
@pytest.fixture(scope="function")
async def s3_storage(pytestconfig, tmpdir):
    """Function-scoped fixture yielding a started S3 backend.

    The backend is built by ``create_s3_server``; ``stop()`` is always called
    afterwards, including when ``start()`` itself raises.
    """
    backend = create_s3_server(pytestconfig, tmpdir)
    try:
        await backend.start()
        yield backend
    finally:
        await backend.stop()