Files
scylladb/test/cluster/object_store/test_basic.py
Ernest Zaslavsky 321d4caf0c object_storage: add retryable machinery to object storage
remove hand rolled error handling from object storage client
and replace with common machinery that supports exception
handling and retrying when appropriate
2026-02-22 14:00:44 +02:00

298 lines
14 KiB
Python

#!/usr/bin/env python3
import asyncio
import os
import pytest
import shutil
import logging
import json
from test.pylib.minio_server import MinioServer
from cassandra.protocol import ConfigurationException
from test.pylib.manager_client import ManagerClient
from test.cluster.util import reconnect_driver
from test.cluster.object_store.conftest import format_tuples, keyspace_options
from test.cqlpy.rest_api import scylla_inject_error
from test.cluster.test_config import wait_for_config
from test.cluster.util import new_test_keyspace
logger = logging.getLogger(__name__)
@pytest.mark.parametrize('mode', ['normal', 'encrypted'])
@pytest.mark.asyncio
async def test_basic(manager: ManagerClient, object_storage, tmp_path, mode):
'''verify ownership table is updated, and tables written to object storage can be read after scylla restarts'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options']}
if mode == 'encrypted':
d = tmp_path / "system_keys"
d.mkdir()
cfg = cfg | {
'system_key_directory': str(d),
'user_info_encryption': { 'enabled': True, 'key_provider': 'LocalFileSystemKeyProviderFactory' }
}
server = await manager.server_add(config=cfg)
cql = manager.get_cql()
workdir = await manager.server_get_workdir(server.server_id)
print(f'Create keyspace (storage server listening at {object_storage.address})')
async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)])
assert not os.path.exists(os.path.join(workdir, f'data/{ks}')), "object storage backed keyspace has local directory created"
# Sanity check that the path is constructed correctly
assert os.path.exists(os.path.join(workdir, 'data/system')), "Datadir is elsewhere"
desc = cql.execute(f"DESCRIBE KEYSPACE {ks}").one().create_statement
# The storage_opts wraps options with '{ <options> }' while the DESCRIBE
# does it like '{<options>}' so strip the corner branches and spaces for check
assert f"{{'type': '{object_storage.type}', 'bucket': '{object_storage.bucket_name}', 'endpoint': '{object_storage.address}'}}" in desc, "DESCRIBE generates unexpected storage options"
res = cql.execute(f"SELECT * FROM {ks}.test;")
rows = {x.name: x.value for x in res}
assert len(rows) > 0, 'Test table is empty'
await manager.api.flush_keyspace(server.ip_addr, ks)
# Check that the ownership table is populated properly
res = cql.execute("SELECT * FROM system.sstables;")
tid = cql.execute(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'test'").one()
for row in res:
assert row.owner == tid.id, \
f'Unexpected entry owner in registry: {row.owner}'
assert row.status == 'sealed', f'Unexpected entry status in registry: {row.status}'
print('Restart scylla')
await manager.server_restart(server.server_id)
cql = await reconnect_driver(manager)
# Shouldn't be recreated by populator code
assert not os.path.exists(os.path.join(workdir, f'data/{ks}')), "object storage backed keyspace has local directory resurrected"
res = cql.execute(f"SELECT * FROM {ks}.test;")
have_res = {x.name: x.value for x in res}
assert have_res == rows, f'Unexpected table content: {have_res}'
print('Drop table')
cql.execute(f"DROP TABLE {ks}.test;")
# Check that the ownership table is de-populated
res = cql.execute("SELECT * FROM system.sstables;")
rows = "\n".join(f"{row.owner} {row.status}" for row in res)
assert not rows, 'Unexpected entries in registry'
@pytest.mark.asyncio
async def test_garbage_collect(manager: ManagerClient, object_storage):
'''verify ownership table is garbage-collected on boot'''
sstable_entries = []
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options']}
cmd = ['--logger-log-level', 's3=trace:http=debug:gcp_storage=trace']
server = await manager.server_add(config=cfg, cmdline=cmd)
cql = manager.get_cql()
print(f'Create keyspace (storage server listening at {object_storage.address})')
async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)])
await manager.api.flush_keyspace(server.ip_addr, ks)
# Mark the sstables as "removing" to simulate the problem
res = cql.execute("SELECT * FROM system.sstables;")
for row in res:
sstable_entries.append((row.owner, row.generation))
print(f'Found entries: {[ str(ent[1]) for ent in sstable_entries ]}')
for owner, gen in sstable_entries:
cql.execute("UPDATE system.sstables SET status = 'removing'"
f" WHERE owner = {owner} AND generation = {gen};")
print('Restart scylla')
await manager.server_restart(server.server_id)
cql = await reconnect_driver(manager)
res = cql.execute(f"SELECT * FROM {ks}.test;")
have_res = {x.name: x.value for x in res}
# Must be empty as no sstables should have been picked up
assert not have_res, f'Sstables not cleaned, got {have_res}'
# Make sure objects also disappeared
objects = object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all()
print(f'Found objects: {[ objects ]}')
for o in objects:
for ent in sstable_entries:
assert not o.key.startswith(str(ent[1])), f'Sstable object not cleaned, found {o.key}'
@pytest.mark.asyncio
async def test_populate_from_quarantine(manager: ManagerClient, object_storage):
'''verify sstables are populated from quarantine state'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options']}
server = await manager.server_add(config=cfg)
cql = manager.get_cql()
print(f'Create keyspace (storage server listening at {object_storage.address})')
async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)])
res = cql.execute(f"SELECT * FROM {ks}.test;")
rows = {x.name: x.value for x in res}
assert len(rows) > 0, 'Test table is empty'
await manager.api.flush_keyspace(server.ip_addr, ks)
# Move the sstables into "quarantine"
res = cql.execute("SELECT * FROM system.sstables;")
assert len(list(res)) > 0, 'No entries in registry'
for row in res:
cql.execute("UPDATE system.sstables SET state = 'quarantine'"
f" WHERE owner = {row.owner} AND generation = {row.generation};")
print('Restart scylla')
await manager.server_restart(server.server_id)
cql = await reconnect_driver(manager)
res = cql.execute(f"SELECT * FROM {ks}.test;")
have_res = {x.name: x.value for x in res}
# Quarantine entries must have been processed normally
assert have_res == rows, f'Unexpected table content: {have_res}'
@pytest.mark.asyncio
async def test_misconfigured_storage(manager: ManagerClient, object_storage):
'''creating keyspace with unknown endpoint is not allowed'''
# scylladb/scylladb#15074
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options']}
server = await manager.server_add(config=cfg)
cql = manager.get_cql()
print(f'Create keyspace (storage server listening at {object_storage.address})')
replication_opts = format_tuples({'class': 'NetworkTopologyStrategy',
'replication_factor': '1'})
storage_opts = format_tuples(type=f'{object_storage.type}',
endpoint='unknown_endpoint',
bucket=object_storage.bucket_name)
with pytest.raises(ConfigurationException):
cql.execute((f"CREATE KEYSPACE test_ks WITH"
f" REPLICATION = {replication_opts} AND STORAGE = {storage_opts};"))
@pytest.mark.asyncio
async def test_memtable_flush_retries(manager: ManagerClient, tmpdir, object_storage):
'''verify that memtable flush doesn't crash in case storage access keys are incorrect'''
print('Spoof the object-store config')
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options']}
server = await manager.server_add(config=cfg)
cql = manager.get_cql()
print(f'Create keyspace (storage server listening at {object_storage.address})')
async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)])
res = cql.execute(f"SELECT * FROM {ks}.test;")
rows = {x.name: x.value for x in res}
with scylla_inject_error(cql, "s3_client_fail_authorization"):
print(f'Flush keyspace')
flush = asyncio.create_task(manager.api.flush_keyspace(server.ip_addr, ks))
print(f'Wait few seconds')
await asyncio.sleep(8)
print(f'Wait for flush to finish')
await flush
print(f'Check the sstables table')
res = cql.execute("SELECT * FROM system.sstables;")
ssts = "\n".join(f"{row.owner} {row.generation} {row.status}" for row in res)
print(f'sstables:\n{ssts}')
print('Restart scylla')
await manager.server_restart(server.server_id)
cql = await reconnect_driver(manager)
res = cql.execute(f"SELECT * FROM {ks}.test;")
have_res = { x.name: x.value for x in res }
assert have_res == dict(rows), f'Unexpected table content: {have_res}'
@pytest.mark.asyncio
@pytest.mark.parametrize('config_with_full_url', [True, False])
async def test_get_object_store_endpoints(manager: ManagerClient, config_with_full_url):
if config_with_full_url:
objconf = MinioServer.create_conf('http://a:123', 'region')
else:
objconf = MinioServer.create_conf('a', 'region')
objconf[0]["port"] = 123
objconf[0]["use_https"] = False
del objconf[0]["type"]
cfg = {'object_storage_endpoints': objconf}
print('Scylla returns the object storage endpoints')
server = await manager.server_add(config=cfg)
endpoints = await manager.api.get_config(server.ip_addr, 'object_storage_endpoints')
name = objconf[0]['name']
del objconf[0]['name']
print('Also check the returned string is valid JSON')
assert name in endpoints
assert json.loads(endpoints[name]) == objconf[0]
print('Check that system.config contains the object storage endpoints')
cql = manager.get_cql()
res = json.loads(cql.execute("SELECT value FROM system.config WHERE name = 'object_storage_endpoints';").one().value)
assert name in res
assert json.loads(res[name]) == objconf[0]
@pytest.mark.asyncio
async def test_create_keyspace_after_config_update(manager: ManagerClient, object_storage):
server = await manager.server_add()
cql = manager.get_cql()
print('Trying to create a keyspace with an endpoint not configured in object_storage_endpoints should trip storage_manager::is_known_endpoint()')
endpoint = 'http://a:456'
replication_opts = format_tuples({'class': 'NetworkTopologyStrategy',
'replication_factor': '1'})
storage_opts = format_tuples(type=f'{object_storage.type}',
endpoint=endpoint,
bucket=object_storage.bucket_name)
with pytest.raises(ConfigurationException):
cql.execute((f'CREATE KEYSPACE random_ks WITH'
f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};'))
print('Update config with a new endpoint and SIGHUP Scylla to reload configuration')
new_endpoint = MinioServer.create_conf(endpoint, 'region')
await manager.server_update_config(server.server_id, 'object_storage_endpoints', new_endpoint)
await wait_for_config(manager, server, 'object_storage_endpoints', {endpoint: '{ "type": "s3", "aws_region": "region", "iam_role_arn": "" }'})
print('Passing a known endpoint will make the CREATE KEYSPACE stmt to succeed')
cql.execute((f'CREATE KEYSPACE random_ks WITH'
f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};'))