scylladb/test/object_store/test_basic.py
Kefu Chai af8bc8ba63 sstable: switch to uuid identifier for naming S3 sstable objects
before this change, we created a new UUID for each new sstable managed
by the s3_storage, and used the string representation of UUID defined
by RFC 4122, like "0aa490de-7a85-46e2-8f90-38b8f496d53b", for naming
the objects stored on s3_storage. but this is not the representation
we use for sstables stored on the local filesystem when the
"uuid_sstable_identifiers_enabled" option is enabled: there we use a
base36-based representation, which is shorter.
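as an illustration, here is a minimal Python sketch of why a base36
encoding is shorter than the RFC 4122 form; note that Scylla's actual
fmt::formatter<sstables::generation_type> lays out the timeuuid's
fields in its own way, so this plain encoding only approximates it:

    import uuid

    BASE36 = '0123456789abcdefghijklmnopqrstuvwxyz'

    def to_base36(n):
        # encode a non-negative integer with base36 digits
        digits = []
        while n:
            n, r = divmod(n, 36)
            digits.append(BASE36[r])
        return ''.join(reversed(digits)) or '0'

    u = uuid.UUID('0aa490de-7a85-46e2-8f90-38b8f496d53b')
    print(len(str(u)))            # 36 chars in the RFC 4122 form
    print(len(to_base36(u.int)))  # 24 here; at most 25 for any 128-bit value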

to be consistent with the naming of the sstables created for the local
filesystem, and, more importantly, to simplify the interaction between
the local copy of sstables and those stored on object storage, we should
use the same string representation of the sstable identifier in both
places.

so, in this change:

1. instead of creating a new UUID, just reuse the generation of the
   sstable for the object's key.
2. do not store the uuid in the sstable_registry system table, as the
   generation of the sstable already serves the same purpose.
3. switch the sstable identifier representation from the one defined
   by RFC 4122 (implemented by fmt::formatter<utils::UUID>) to the
   base36-based one (implemented by
   fmt::formatter<sstables::generation_type>).
4. enable the `uuid_sstable_identifiers` cluster feature if it is
   enabled in the `test_env_config`, so that the sstable manager
   can create uuid-based generations for new sstables.
5. throw if the generation of an sstable is not uuid-based when
   accessing or manipulating an sstable with the S3 storage backend,
   as the backend now relies on this option. otherwise we would end
   up with sstable keys like s3://bucket/number/basename, which
   cannot serve as a unique id for an sstable if the bucket is
   shared across multiple tables (see the sketch after this list).
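
to make the key scheme concrete, a minimal Python sketch of items 1
and 5; the function name and the digit check below are hypothetical
stand-ins for the real C++ logic in the S3 storage backend:

    def s3_object_key(bucket, generation, basename):
        # a purely numeric (pre-uuid) generation is only unique within a
        # single table's directory, so it cannot name objects in a bucket
        # shared across tables; mirror item 5 and reject it
        if str(generation).isdigit():
            raise ValueError(f'S3 storage requires uuid-based generation, got {generation}')
        # item 1: the key reuses the sstable's own generation instead of
        # a separately created UUID
        return f's3://{bucket}/{generation}/{basename}'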

Fixes #14175
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
2023-10-23 10:08:22 +08:00


#!/usr/bin/env python3
# Use the run.py library from ../cql-pytest:
import sys
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
import run
from util import format_tuples

import os
import requests
import signal
import yaml
import pytest
import xml.etree.ElementTree as ET
from contextlib import contextmanager

from test.pylib.rest_client import ScyllaRESTAPIClient

def get_scylla_with_s3_cmd(ssl, s3_server):
    '''return a function which in turn returns the command for running scylla'''
    scylla = run.find_scylla()
    print('Scylla under test:', scylla)

    def make_run_cmd(pid, d):
        '''return the command args and environment variables for running scylla'''
        if ssl:
            cmd, env = run.run_scylla_ssl_cql_cmd(pid, d)
        else:
            cmd, env = run.run_scylla_cmd(pid, d)
        cmd += ['--object-storage-config-file', s3_server.config_file]
        return cmd, env
    return make_run_cmd

def check_with_cql(ip, ssl):
    '''return a checker which checks the readiness of scylla'''
    def checker():
        if ssl:
            return run.check_ssl_cql(ip)
        else:
            return run.check_cql(ip)
    return checker

def run_with_dir(run_cmd_gen, run_dir):
    print(f'Start scylla (dir={run_dir})')
    # block signals around fork() so the child starts with them masked;
    # the parent restores the saved mask afterwards
    mask = signal.pthread_sigmask(signal.SIG_BLOCK, {})
    signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT, signal.SIGQUIT, signal.SIGTERM})
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid == 0:
        # child
        cmd, env = run_cmd_gen(os.getpid(), run_dir)
        log = os.path.join(run_dir, 'log')
        log_fd = os.open(log, os.O_WRONLY | os.O_CREAT | os.O_APPEND, mode=0o666)
        # redirect stdout and stderr to the log file.
        # close and dup2 the original fds associated with stdout and stderr:
        # pytest replaces sys.stdout and sys.stderr with its own output
        # buffers to capture them, so if we intend to redirect the child
        # process's stdout and stderr to the specified fd, we have to use
        # the fd numbers *used* by the child process, not the ones used
        # by the test.
        outputs = [(sys.stdout, 1),
                   (sys.stderr, 2)]
        for output, output_fd in outputs:
            output.flush()
            os.close(output_fd)
            os.dup2(log_fd, output_fd)
        # start a new session so the parent can signal the whole process
        # group with os.killpg()
        os.setsid()
        os.execve(cmd[0], cmd, dict(os.environ, **env))
    # parent
    signal.pthread_sigmask(signal.SIG_SETMASK, mask)
    return pid

def kill_with_dir(old_pid, run_dir):
    try:
        print('Kill scylla')
        # the child called os.setsid(), so its pid is also its process
        # group id
        os.killpg(old_pid, signal.SIGINT)
        os.waitpid(old_pid, 0)
    except ProcessLookupError:
        pass
    scylla_link = os.path.join(run_dir, 'test_scylla')
    os.unlink(scylla_link)

class Cluster:
    def __init__(self, cql, ip: str):
        self.cql = cql
        self.ip = ip
        self.api = ScyllaRESTAPIClient()

@contextmanager
def managed_cluster(run_dir, ssl, s3_server):
    # launch a one-node scylla cluster which uses the given s3_server as
    # its object storage backend, and yield an instance of Cluster.
    # before this function returns, the cluster is torn down.
    run_scylla_cmd = get_scylla_with_s3_cmd(ssl, s3_server)
    pid = run_with_dir(run_scylla_cmd, run_dir)
    ip = run.pid_to_ip(pid)
    run.wait_for_services(pid, [check_with_cql(ip, ssl)])
    cluster = run.get_cql_cluster(ip)
    try:
        yield Cluster(cluster, ip)
    finally:
        cluster.shutdown()
        kill_with_dir(pid, run_dir)

@pytest.mark.asyncio
async def test_basic(test_tempdir, s3_server, ssl):
    '''verify ownership table is updated, and tables written to S3 can be read after scylla restarts'''
    ks = 'test_ks'
    cf = 'test_cf'
    rows = [('0', 'zero'),
            ('1', 'one'),
            ('2', 'two')]

    with managed_cluster(test_tempdir, ssl, s3_server) as cluster:
        print(f'Create keyspace (minio listening at {s3_server.address})')
        replication_opts = format_tuples({'class': 'NetworkTopologyStrategy',
                                          'replication_factor': '1'})
        storage_opts = format_tuples(type='S3',
                                     endpoint=s3_server.address,
                                     bucket=s3_server.bucket_name)
        conn = cluster.cql.connect()
        conn.execute((f"CREATE KEYSPACE {ks} WITH"
                      f" REPLICATION = {replication_opts} AND STORAGE = {storage_opts};"))
        conn.execute(f"CREATE TABLE {ks}.{cf} ( name text primary key, value text );")
        for row in rows:
            cql_fmt = "INSERT INTO {}.{} ( name, value ) VALUES ('{}', '{}');"
            conn.execute(cql_fmt.format(ks, cf, *row))
        res = conn.execute(f"SELECT * FROM {ks}.{cf};")

        await cluster.api.flush_keyspace(cluster.ip, ks)

        # Check that the ownership table is populated properly
        res = conn.execute("SELECT * FROM system.sstables;")
        for row in res:
            assert row.location.startswith(test_tempdir), \
                f'Unexpected entry location in registry: {row.location}'
            assert row.status == 'sealed', f'Unexpected entry status in registry: {row.status}'

    print('Restart scylla')
    with managed_cluster(test_tempdir, ssl, s3_server) as cluster:
        conn = cluster.cql.connect()
        res = conn.execute(f"SELECT * FROM {ks}.{cf};")
        have_res = {x.name: x.value for x in res}
        assert have_res == dict(rows), f'Unexpected table content: {have_res}'

        print('Drop table')
        conn.execute(f"DROP TABLE {ks}.{cf};")
        # Check that the ownership table is de-populated
        res = conn.execute("SELECT * FROM system.sstables;")
        rows = "\n".join(f"{row.location} {row.status}" for row in res)
        assert not rows, 'Unexpected entries in registry'

@pytest.mark.asyncio
async def test_garbage_collect(test_tempdir, s3_server, ssl):
    '''verify ownership table is garbage-collected on boot'''
    ks = 'test_ks'
    cf = 'test_cf'
    rows = [('0', 'zero'),
            ('1', 'one'),
            ('2', 'two')]

    def list_bucket(s3_server):
        # list the keys of all objects in the bucket via the plain
        # S3 ListObjects REST endpoint
        r = requests.get(f'http://{s3_server.address}:{s3_server.port}/{s3_server.bucket_name}')
        bucket_list_res = ET.fromstring(r.content)
        objects = []
        for elem in bucket_list_res:
            if elem.tag.endswith('Contents'):
                for opt in elem:
                    if opt.tag.endswith('Key'):
                        objects.append(opt.text)
        return objects

    sstable_entries = []
    with managed_cluster(test_tempdir, ssl, s3_server) as cluster:
        print(f'Create keyspace (minio listening at {s3_server.address})')
        replication_opts = format_tuples({'class': 'NetworkTopologyStrategy',
                                          'replication_factor': '1'})
        storage_opts = format_tuples(type='S3',
                                     endpoint=s3_server.address,
                                     bucket=s3_server.bucket_name)
        conn = cluster.cql.connect()
        conn.execute((f"CREATE KEYSPACE {ks} WITH"
                      f" REPLICATION = {replication_opts} AND STORAGE = {storage_opts};"))
        conn.execute(f"CREATE TABLE {ks}.{cf} ( name text primary key, value text );")
        for row in rows:
            cql_fmt = "INSERT INTO {}.{} ( name, value ) VALUES ('{}', '{}');"
            conn.execute(cql_fmt.format(ks, cf, *row))
        await cluster.api.flush_keyspace(cluster.ip, ks)

        # Mark the sstables as "removing" to simulate the problem
        res = conn.execute("SELECT * FROM system.sstables;")
        for row in res:
            sstable_entries.append(tuple((row.location, row.generation)))
        print(f'Found entries: {[str(ent[1]) for ent in sstable_entries]}')
        for sst in sstable_entries:
            conn.execute(f"UPDATE system.sstables SET status = 'removing' WHERE location = '{sst[0]}' AND generation = {sst[1]};")

    print('Restart scylla')
    with managed_cluster(test_tempdir, ssl, s3_server) as cluster:
        conn = cluster.cql.connect()
        res = conn.execute(f"SELECT * FROM {ks}.{cf};")
        have_res = {x.name: x.value for x in res}
        # Must be empty as no sstables should have been picked up
        assert not have_res, f'Sstables not cleaned, got {have_res}'
        # Make sure objects also disappeared
        objects = list_bucket(s3_server)
        print(f'Found objects: {objects}')
        for o in objects:
            for ent in sstable_entries:
                assert not o.startswith(str(ent[1])), f'Sstable object not cleaned, found {o}'