#!/usr/bin/env python3 import asyncio import os import pytest import shutil import logging import json from test.pylib.minio_server import MinioServer from cassandra.protocol import ConfigurationException from test.pylib.manager_client import ManagerClient from test.cluster.util import reconnect_driver from test.cluster.object_store.conftest import format_tuples, keyspace_options from test.cqlpy.rest_api import scylla_inject_error from test.cluster.test_config import wait_for_config from test.cluster.util import new_test_keyspace logger = logging.getLogger(__name__) @pytest.mark.parametrize('mode', ['normal', 'encrypted']) @pytest.mark.asyncio async def test_basic(manager: ManagerClient, object_storage, tmp_path, mode): '''verify ownership table is updated, and tables written to object storage can be read after scylla restarts''' objconf = object_storage.create_endpoint_conf() cfg = {'enable_user_defined_functions': False, 'object_storage_endpoints': objconf, 'experimental_features': ['keyspace-storage-options']} if mode == 'encrypted': d = tmp_path / "system_keys" d.mkdir() cfg = cfg | { 'system_key_directory': str(d), 'user_info_encryption': { 'enabled': True, 'key_provider': 'LocalFileSystemKeyProviderFactory' } } server = await manager.server_add(config=cfg) cql = manager.get_cql() workdir = await manager.server_get_workdir(server.server_id) print(f'Create keyspace (storage server listening at {object_storage.address})') async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks: await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);") await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)]) assert not os.path.exists(os.path.join(workdir, f'data/{ks}')), "object storage backed keyspace has local directory created" # Sanity check that the path is constructed correctly assert os.path.exists(os.path.join(workdir, 'data/system')), "Datadir is elsewhere" desc = cql.execute(f"DESCRIBE KEYSPACE {ks}").one().create_statement # The storage_opts wraps options with '{ }' while the DESCRIBE # does it like '{}' so strip the corner branches and spaces for check assert f"{{'type': '{object_storage.type}', 'bucket': '{object_storage.bucket_name}', 'endpoint': '{object_storage.address}'}}" in desc, "DESCRIBE generates unexpected storage options" res = cql.execute(f"SELECT * FROM {ks}.test;") rows = {x.name: x.value for x in res} assert len(rows) > 0, 'Test table is empty' await manager.api.flush_keyspace(server.ip_addr, ks) # Check that the ownership table is populated properly res = cql.execute("SELECT * FROM system.sstables;") tid = cql.execute(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'test'").one() for row in res: assert row.owner == tid.id, \ f'Unexpected entry owner in registry: {row.owner}' assert row.status == 'sealed', f'Unexpected entry status in registry: {row.status}' print('Restart scylla') await manager.server_restart(server.server_id) cql = await reconnect_driver(manager) # Shouldn't be recreated by populator code assert not os.path.exists(os.path.join(workdir, f'data/{ks}')), "object storage backed keyspace has local directory resurrected" res = cql.execute(f"SELECT * FROM {ks}.test;") have_res = {x.name: x.value for x in res} assert have_res == rows, f'Unexpected table content: {have_res}' print('Drop table') cql.execute(f"DROP TABLE {ks}.test;") # Check that the ownership table is de-populated res = cql.execute("SELECT * FROM system.sstables;") rows = "\n".join(f"{row.owner} {row.status}" for row in res) assert not rows, 'Unexpected entries in registry' @pytest.mark.asyncio async def test_garbage_collect(manager: ManagerClient, object_storage): '''verify ownership table is garbage-collected on boot''' sstable_entries = [] objconf = object_storage.create_endpoint_conf() cfg = {'enable_user_defined_functions': False, 'object_storage_endpoints': objconf, 'experimental_features': ['keyspace-storage-options']} cmd = ['--logger-log-level', 's3=trace:http=debug:gcp_storage=trace'] server = await manager.server_add(config=cfg, cmdline=cmd) cql = manager.get_cql() print(f'Create keyspace (storage server listening at {object_storage.address})') async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks: await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);") await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)]) await manager.api.flush_keyspace(server.ip_addr, ks) # Mark the sstables as "removing" to simulate the problem res = cql.execute("SELECT * FROM system.sstables;") for row in res: sstable_entries.append((row.owner, row.generation)) print(f'Found entries: {[ str(ent[1]) for ent in sstable_entries ]}') for owner, gen in sstable_entries: cql.execute("UPDATE system.sstables SET status = 'removing'" f" WHERE owner = {owner} AND generation = {gen};") print('Restart scylla') await manager.server_restart(server.server_id) cql = await reconnect_driver(manager) res = cql.execute(f"SELECT * FROM {ks}.test;") have_res = {x.name: x.value for x in res} # Must be empty as no sstables should have been picked up assert not have_res, f'Sstables not cleaned, got {have_res}' # Make sure objects also disappeared objects = object_storage.get_resource().Bucket(object_storage.bucket_name).objects.all() print(f'Found objects: {[ objects ]}') for o in objects: for ent in sstable_entries: assert not o.key.startswith(str(ent[1])), f'Sstable object not cleaned, found {o.key}' @pytest.mark.asyncio async def test_populate_from_quarantine(manager: ManagerClient, object_storage): '''verify sstables are populated from quarantine state''' objconf = object_storage.create_endpoint_conf() cfg = {'enable_user_defined_functions': False, 'object_storage_endpoints': objconf, 'experimental_features': ['keyspace-storage-options']} server = await manager.server_add(config=cfg) cql = manager.get_cql() print(f'Create keyspace (storage server listening at {object_storage.address})') async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks: await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);") await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)]) res = cql.execute(f"SELECT * FROM {ks}.test;") rows = {x.name: x.value for x in res} assert len(rows) > 0, 'Test table is empty' await manager.api.flush_keyspace(server.ip_addr, ks) # Move the sstables into "quarantine" res = cql.execute("SELECT * FROM system.sstables;") assert len(list(res)) > 0, 'No entries in registry' for row in res: cql.execute("UPDATE system.sstables SET state = 'quarantine'" f" WHERE owner = {row.owner} AND generation = {row.generation};") print('Restart scylla') await manager.server_restart(server.server_id) cql = await reconnect_driver(manager) res = cql.execute(f"SELECT * FROM {ks}.test;") have_res = {x.name: x.value for x in res} # Quarantine entries must have been processed normally assert have_res == rows, f'Unexpected table content: {have_res}' @pytest.mark.asyncio async def test_misconfigured_storage(manager: ManagerClient, object_storage): '''creating keyspace with unknown endpoint is not allowed''' # scylladb/scylladb#15074 objconf = object_storage.create_endpoint_conf() cfg = {'enable_user_defined_functions': False, 'object_storage_endpoints': objconf, 'experimental_features': ['keyspace-storage-options']} server = await manager.server_add(config=cfg) cql = manager.get_cql() print(f'Create keyspace (storage server listening at {object_storage.address})') replication_opts = format_tuples({'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}) storage_opts = format_tuples(type=f'{object_storage.type}', endpoint='unknown_endpoint', bucket=object_storage.bucket_name) with pytest.raises(ConfigurationException): cql.execute((f"CREATE KEYSPACE test_ks WITH" f" REPLICATION = {replication_opts} AND STORAGE = {storage_opts};")) @pytest.mark.asyncio async def test_memtable_flush_retries(manager: ManagerClient, tmpdir, object_storage): '''verify that memtable flush doesn't crash in case storage access keys are incorrect''' print('Spoof the object-store config') objconf = object_storage.create_endpoint_conf() cfg = {'enable_user_defined_functions': False, 'object_storage_endpoints': objconf, 'experimental_features': ['keyspace-storage-options']} server = await manager.server_add(config=cfg) cql = manager.get_cql() print(f'Create keyspace (storage server listening at {object_storage.address})') async with new_test_keyspace(manager, keyspace_options(object_storage)) as ks: await cql.run_async(f"CREATE TABLE {ks}.test (name text PRIMARY KEY, value int);") await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (name, value) VALUES ('{k}', {k});") for k in range(4)]) res = cql.execute(f"SELECT * FROM {ks}.test;") rows = {x.name: x.value for x in res} with scylla_inject_error(cql, "s3_client_fail_authorization"): print(f'Flush keyspace') flush = asyncio.create_task(manager.api.flush_keyspace(server.ip_addr, ks)) print(f'Wait few seconds') await asyncio.sleep(8) print(f'Wait for flush to finish') await flush print(f'Check the sstables table') res = cql.execute("SELECT * FROM system.sstables;") ssts = "\n".join(f"{row.owner} {row.generation} {row.status}" for row in res) print(f'sstables:\n{ssts}') print('Restart scylla') await manager.server_restart(server.server_id) cql = await reconnect_driver(manager) res = cql.execute(f"SELECT * FROM {ks}.test;") have_res = { x.name: x.value for x in res } assert have_res == dict(rows), f'Unexpected table content: {have_res}' @pytest.mark.asyncio @pytest.mark.parametrize('config_with_full_url', [True, False]) async def test_get_object_store_endpoints(manager: ManagerClient, config_with_full_url): if config_with_full_url: objconf = MinioServer.create_conf('http://a:123', 'region') else: objconf = MinioServer.create_conf('a', 'region') objconf[0]["port"] = 123 objconf[0]["use_https"] = False del objconf[0]["type"] cfg = {'object_storage_endpoints': objconf} print('Scylla returns the object storage endpoints') server = await manager.server_add(config=cfg) endpoints = await manager.api.get_config(server.ip_addr, 'object_storage_endpoints') name = objconf[0]['name'] del objconf[0]['name'] print('Also check the returned string is valid JSON') assert name in endpoints assert json.loads(endpoints[name]) == objconf[0] print('Check that system.config contains the object storage endpoints') cql = manager.get_cql() res = json.loads(cql.execute("SELECT value FROM system.config WHERE name = 'object_storage_endpoints';").one().value) assert name in res assert json.loads(res[name]) == objconf[0] @pytest.mark.asyncio async def test_create_keyspace_after_config_update(manager: ManagerClient, object_storage): server = await manager.server_add() cql = manager.get_cql() print('Trying to create a keyspace with an endpoint not configured in object_storage_endpoints should trip storage_manager::is_known_endpoint()') endpoint = 'http://a:456' replication_opts = format_tuples({'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}) storage_opts = format_tuples(type=f'{object_storage.type}', endpoint=endpoint, bucket=object_storage.bucket_name) with pytest.raises(ConfigurationException): cql.execute((f'CREATE KEYSPACE random_ks WITH' f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};')) print('Update config with a new endpoint and SIGHUP Scylla to reload configuration') new_endpoint = MinioServer.create_conf(endpoint, 'region') await manager.server_update_config(server.server_id, 'object_storage_endpoints', new_endpoint) await wait_for_config(manager, server, 'object_storage_endpoints', {endpoint: '{ "type": "s3", "aws_region": "region", "iam_role_arn": "" }'}) print('Passing a known endpoint will make the CREATE KEYSPACE stmt to succeed') cql.execute((f'CREATE KEYSPACE random_ks WITH' f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};'))