Files
scylladb/test/cluster/object_store/conftest.py
Avi Kivity 0df85c8ae8 Revert "Merge 'Unify configuration of object storage endpoints' from Pavel Emelyanov"
This reverts commit 1bb897c7ca, reversing
changes made to 954f2cbd2f. It makes
incompatible changes to the object storage configuration format, breaking
tests [1]. It's likely that it doesn't break any production configuration,
but we can't be sure.

Fixes #27966

Closes scylladb/scylladb#27969
2026-01-05 08:53:41 +02:00

270 lines
8.8 KiB
Python

#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import os
import logging
# use minio_server
from test.pylib.minio_server import MinioServer
from test.pylib.suite.python import add_s3_options
from test.pylib.dockerized_service import DockerizedServer
from operator import attrgetter
import pytest
import boto3
import requests
def pytest_addoption(parser):
add_s3_options(parser)
def format_tuples(tuples=None, **kwargs):
'''format a dict to structured values (tuples) in CQL'''
if tuples is None:
tuples = {}
tuples.update(kwargs)
body = ', '.join(f"'{key}': '{value}'" for key, value in tuples.items())
return f'{{ {body} }}'
class S3_Server:
def __init__(self, tempdir: str, address: str, port: int, acc_key: str, secret_key: str, region: str, bucket_name):
self.tempdir = tempdir
self.address = address
self.port = port
self.acc_key = acc_key
self.secret_key = secret_key
self.region = region
self.bucket_name = bucket_name
def __repr__(self):
return f"[unknown] {self.address}:{self.port}/{self.bucket_name}"
@property
def type(self):
return 'S3'
def create_endpoint_conf(self):
return MinioServer.create_conf(self.address, self.port, self.region)
def get_resource(self):
"""Creates boto3.resource object that can be used to communicate to the given server"""
return boto3.resource('s3',
endpoint_url=f'http://{self.address}:{self.port}',
aws_access_key_id=self.acc_key,
aws_secret_access_key=self.secret_key,
aws_session_token=None,
config=boto3.session.Config(signature_version='s3v4'),
verify=False
)
async def start(self):
pass
async def stop(self):
pass
class MinioWrapper(S3_Server):
def __init__(self, tempdir):
self.server = MinioServer(tempdir,
'127.0.0.1',
logging.getLogger('minio'))
self.tempdir = tempdir
self.address = self.server.address
self.port = self.server.port
self.acc_key = self.server.access_key
self.secret_key = self.server.access_key
self.region = MinioServer.DEFAULT_REGION
self.bucket_name = self.server.bucket_name
def create_endpoint_conf(self):
return MinioServer.create_conf(self.address, self.port, self.region)
async def start(self):
return self.server.start()
async def stop(self):
return self.server.stop()
def create_s3_server(pytestconfig, tmpdir):
server = None
s3_server_address = pytestconfig.getoption('--s3-server-address')
s3_server_port = pytestconfig.getoption('--s3-server-port')
aws_acc_key = pytestconfig.getoption('--aws-access-key')
aws_secret_key = pytestconfig.getoption('--aws-secret-key')
aws_region = pytestconfig.getoption('--aws-region')
s3_server_bucket = pytestconfig.getoption('--s3-server-bucket')
default_address = os.environ.get(MinioServer.ENV_ADDRESS)
default_port = os.environ.get(MinioServer.ENV_PORT)
default_acc_key = os.environ.get(MinioServer.ENV_ACCESS_KEY)
default_secret_key = os.environ.get(MinioServer.ENV_SECRET_KEY)
default_region = MinioServer.DEFAULT_REGION
default_bucket = os.environ.get(MinioServer.ENV_BUCKET)
tempdir = tmpdir.strpath
if s3_server_address:
server = S3_Server(tempdir,
s3_server_address,
s3_server_port,
aws_acc_key,
aws_secret_key,
aws_region,
s3_server_bucket)
elif default_address:
server = S3_Server(tempdir,
default_address,
int(default_port),
default_acc_key,
default_secret_key,
default_region,
default_bucket)
else:
server = MinioWrapper(tempdir)
return server
@pytest.fixture(scope="function")
async def s3_server(pytestconfig, tmpdir):
server = create_s3_server(pytestconfig, tmpdir)
await server.start()
try:
yield server
finally:
await server.stop()
class GSFront:
def __init__(self, endpoint, bucket_name, credentials_file):
self.endpoint = endpoint
self.bucket_name = bucket_name
self.credentials_file = credentials_file
@property
def address(self):
return self.endpoint
@property
def type(self):
return 'GS'
def create_endpoint_conf(self):
endpoint = {'name': self.endpoint,
'type': 'gs',
'credentials_file': self.credentials_file if self.credentials_file else 'none'
}
return [endpoint]
def get_resource(self):
"""Creates boto3.resource object that can be used to communicate to the given server"""
return boto3.resource('s3',
endpoint_url=self.endpoint,
config=boto3.session.Config(signature_version='s3v4'),
verify=False
)
async def start(self):
pass
async def stop(self):
pass
class GSServer(GSFront):
def __init__(self, tmpdir):
super(GSServer, self).__init__(None, 'testbucket', None)
self.server = None
self.host = None
self.port = None
self.oldvars = {}
self.vars = { 'GS_SERVER_ADDRESS_FOR_TEST': attrgetter('endpoint'),
'GS_BUCKET_FOR_TEST': attrgetter('bucket_name'),
'GS_CREDENTIALS_FILE': attrgetter('credentials_file'),
}
self.tmpdir = tmpdir
def publish(self):
self.endpoint = f'http://{self.host}:{self.port}'
self.oldvars = { k : os.environ.get(k) for k in self.vars }
for k, v in self.vars.items():
val = v(self)
if val:
os.environ[k] = val
def unpublish(self):
for k in self.vars:
v = self.oldvars[k]
if v:
os.environ[k] = v
elif os.environ.get(k):
del os.environ[k]
def _docker_args(self, host, port):
# pylint: disable=unused-argument
return ["-p", f'{port}:{port}']
def _image_args(self, host, port):
# pylint: disable=unused-argument
return ["-scheme", "http", "-log-level", "debug", "--port", f'{port}', '-public-host', f'127.0.0.1:{port}']
async def start(self):
self.server = DockerizedServer("docker.io/fsouza/fake-gcs-server:1.52.3", self.tmpdir,
logfilenamebase="fake-gcs-server",
docker_args=self._docker_args,
image_args=self._image_args,
success_string="server started at",
failure_string="address already in use"
)
await self.server.start()
self.port = self.server.port
self.host = self.server.host
self.publish()
# create bucket. can't use boto, because fake server does not support xml syntax
# for anything beyong listing
response = requests.post(f'{self.endpoint}/storage/v1/b?project=testproject', json = {
'name': self.bucket_name, 'location' : 'US', 'storageClass' : 'STANDARD',
'iamConfiguration': {
'uniformBucketLevelAccess' : {
'enabled': True,
}
}
}, timeout = 10)
if response.status_code not in [200, 201]:
raise Exception(f'Could not create test bucket: {response}')
async def stop(self):
if self.server:
await self.server.stop()
self.unpublish()
@pytest.fixture(scope="function", params=['s3','gs'])
async def object_storage(request, pytestconfig, tmpdir):
server = None
if request.param == 'gs':
endpoint = os.environ.get('GS_SERVER_ADDRESS_FOR_TEST')
bucket = os.environ.get('GS_BUCKET_FOR_TEST')
credentials_file = os.environ.get('GS_CREDENTIALS_FILE')
if endpoint is not None and bucket is not None:
server = GSFront(endpoint, bucket, credentials_file)
else:
server = GSServer(tmpdir)
else:
server = create_s3_server(pytestconfig, tmpdir)
try:
await server.start()
yield server
finally:
await server.stop()
@pytest.fixture(scope="function")
async def s3_storage(pytestconfig, tmpdir):
server = create_s3_server(pytestconfig, tmpdir)
try:
await server.start()
yield server
finally:
await server.stop()