test: add per-test bucket isolation to object_store fixtures

Create a unique S3/GCS bucket for each test function using the pytest
test name (from request.node.name), sanitized into a valid bucket name.
This ensures tests do not share state through a common bucket and makes
bucket names meaningful for debugging (e.g. test-basic-s3-a1b2c3d4).
Each fixture now calls create_test_bucket() on setup and
destroy_test_bucket() on teardown.
This commit is contained in:
Ernest Zaslavsky
2026-04-15 20:25:04 +03:00
parent 8e02e99c36
commit e9724f52a9

View File

@@ -6,6 +6,8 @@
import os
import logging
import re
import uuid
# use minio_server
from test.pylib.minio_server import MinioServer
@@ -35,6 +37,22 @@ def keyspace_options(object_storage, rf=1):
return f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND STORAGE = {storage_opts}"
def _make_bucket_name(test_name: str) -> str:
"""Generate a valid S3 bucket name from a pytest test name.
S3 bucket naming rules: lowercase, digits, hyphens only; 3-63 chars;
must start/end with a letter or digit.
"""
# Lowercase, replace non-alphanumeric runs with a single hyphen, strip leading/trailing hyphens
name = re.sub(r'[^a-z0-9]+', '-', test_name.lower()).strip('-')
# Add a short unique suffix to avoid collisions (e.g. parametrized tests with same prefix)
suffix = uuid.uuid4().hex[:8]
# Truncate so total length (name + '-' + suffix) <= 63
max_prefix = 63 - len(suffix) - 1
name = name[:max_prefix].rstrip('-')
return f"{name}-{suffix}"
class S3_Server:
def __init__(self, tempdir: str, address: str, port: int, acc_key: str, secret_key: str, region: str, bucket_name):
self.tempdir = tempdir
@@ -67,6 +85,22 @@ class S3_Server:
verify=False
)
def create_test_bucket(self, test_name: str):
    """Create a fresh bucket, uniquely named after the running test, via boto3."""
    bucket_name = _make_bucket_name(test_name)
    self.bucket_name = bucket_name
    self.get_resource().Bucket(bucket_name).create()
def destroy_test_bucket(self):
    """Best-effort teardown: empty and delete the per-test bucket using boto3.

    Failures are logged rather than raised so that bucket cleanup can
    never mask the outcome of the test itself.
    """
    try:
        resource = self.get_resource()
        bucket = resource.Bucket(self.bucket_name)
        # A non-empty bucket cannot be deleted, so remove all objects first.
        bucket.objects.all().delete()
        bucket.delete()
    except Exception:
        logging.warning('Failed to destroy test bucket %s', self.bucket_name, exc_info=True)
async def start(self):
    """No-op: presumably the backing S3 process is managed externally — TODO confirm."""
    pass
@@ -134,12 +168,14 @@ def create_s3_server(pytestconfig, tmpdir):
return server
@pytest.fixture(scope="function")
async def s3_server(request, pytestconfig, tmpdir):
    """Function-scoped S3 server with a unique per-test bucket.

    Bucket creation happens inside the try block and teardown is guarded
    by `bucket_created`, so a failure in create_test_bucket() still
    reaches server.stop() — matching the pattern used by the
    object_storage and s3_storage fixtures.
    """
    server = create_s3_server(pytestconfig, tmpdir)
    bucket_created = False
    try:
        await server.start()
        server.create_test_bucket(request.node.name)
        bucket_created = True
        yield server
    finally:
        if bucket_created:
            server.destroy_test_bucket()
        await server.stop()
class GSFront:
@@ -171,6 +207,22 @@ class GSFront:
verify=False
)
def create_test_bucket(self, test_name: str):
    """Create a fresh bucket, uniquely named after the running test, via boto3."""
    bucket_name = _make_bucket_name(test_name)
    self.bucket_name = bucket_name
    self.get_resource().Bucket(bucket_name).create()
def destroy_test_bucket(self):
    """Best-effort teardown: empty and delete the per-test bucket using boto3.

    Failures are logged rather than raised so that bucket cleanup can
    never mask the outcome of the test itself.
    """
    try:
        resource = self.get_resource()
        bucket = resource.Bucket(self.bucket_name)
        # A non-empty bucket cannot be deleted, so remove all objects first.
        bucket.objects.all().delete()
        bucket.delete()
    except Exception:
        logging.warning('Failed to destroy test bucket %s', self.bucket_name, exc_info=True)
async def start(self):
    """No-op: presumably the backing GCS front-end is managed externally — TODO confirm."""
    pass
async def stop(self):
@@ -236,6 +288,32 @@ class GSServer(GSFront):
if response.status_code not in [200, 201]:
raise Exception(f'Could not create test bucket: {response}')
def create_test_bucket(self, test_name: str):
    """Create a unique per-test bucket through the GCS JSON API.

    The fake GCS server does not accept the S3 XML bucket-creation
    request, so the bucket is created via /storage/v1/b directly.
    """
    self.bucket_name = _make_bucket_name(test_name)
    payload = {
        'name': self.bucket_name,
        'location': 'US',
        'storageClass': 'STANDARD',
        'iamConfiguration': {
            'uniformBucketLevelAccess': {
                'enabled': True,
            },
        },
    }
    response = requests.post(
        f'{self.endpoint}/storage/v1/b?project=testproject',
        json=payload,
        timeout=10,
    )
    if response.status_code not in (200, 201):
        raise Exception(f'Could not create test bucket: {response}')
def destroy_test_bucket(self):
    """Best-effort teardown: empty and delete the per-test bucket.

    Object listing/deletion goes through boto3 (listing works against the
    fake GCS server); the bucket itself is removed through the GCS JSON
    API. Failures are logged rather than raised so cleanup can never mask
    the outcome of the test itself.
    """
    try:
        # List and delete all objects first using boto3 (listing works on fake GCS)
        resource = self.get_resource()
        bucket = resource.Bucket(self.bucket_name)
        bucket.objects.all().delete()
        # Delete the bucket via GCS HTTP API
        requests.delete(f'{self.endpoint}/storage/v1/b/{self.bucket_name}', timeout=10)
    except Exception:
        logging.warning('Failed to destroy test bucket %s', self.bucket_name, exc_info=True)
async def stop(self):
    """Stop the backing server process, if one was started (self.server set — assumption, confirm against start())."""
    if self.server:
        await self.server.stop()
@@ -257,18 +335,27 @@ async def object_storage(request, pytestconfig, tmpdir):
else:
server = create_s3_server(pytestconfig, tmpdir)
bucket_created = False
try:
await server.start()
server.create_test_bucket(request.node.name)
bucket_created = True
yield server
finally:
if bucket_created:
server.destroy_test_bucket()
await server.stop()
@pytest.fixture(scope="function")
async def s3_storage(request, pytestconfig, tmpdir):
    """Yield a started S3 server with a bucket unique to the current test.

    The bucket is created only after the server starts; teardown deletes
    the bucket (when it was actually created) and then stops the server.
    """
    s3 = create_s3_server(pytestconfig, tmpdir)
    have_bucket = False
    try:
        await s3.start()
        s3.create_test_bucket(request.node.name)
        have_bucket = True
        yield s3
    finally:
        if have_bucket:
            s3.destroy_test_bucket()
        await s3.stop()