Files
scylladb/test/object_store/run
Pavel Emelyanov 2f6aa5b52e code: Introduce conf/object_storage.yaml configuration file
In order to access real S3 bucket, the client should use signed requests
over https. Partially this is due to security considerations, partially
this is unavoidable, because multipart-uploading is banned for unsigned
requests on the S3. Also, signed requests over plain http require
signing the payload as well, which is a bit troublesome, so it's better
to stick to secure https and keep payload unsigned.

To prepare signed requests the code needs to know three things:
- aws key
- aws secret
- aws region name

The latter could be derived from the endpoint URL, but it's simpler to
configure it explicitly, all the more so there's an option to use S3
URLs without region name in them we could want to use some time.

To keep the described configuration the proposed place is the
object_storage.yaml file with the format

endpoints:
  - name: a.b.c
    port: 443
    aws_key: 12345
    aws_secret: abcdefghijklmnop
    ...

When loaded, the map gets into db::config and later will be propagated
down to sstables code (see next patch).

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2023-05-03 20:19:15 +03:00

111 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env python3
# Use the run.py library from ../cql-pytest:
import sys
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
import run
import atexit
import os
import requests
import shutil
import yaml
print('Scylla is: ' + run.find_scylla() + '.')
success = True
ssl = '--ssl' in sys.argv
if ssl:
cmd = run.run_scylla_ssl_cql_cmd
check_cql = run.check_ssl_cql
else:
cmd = run.run_scylla_cmd
check_cql = run.check_cql
test_tempdir = run.pid_to_dir(os.getpid())
os.mkdir(test_tempdir)
s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST']
s3_public_bucket = os.environ['S3_PUBLIC_BUCKET_FOR_TEST']
def get_tempdir(pid):
return test_tempdir
with open(test_tempdir + '/object_storage.yaml', 'w') as config_file:
yaml.dump({ 'endpoints': [
{
'name': s3_server_address,
}
]
}, config_file)
def run_scylla_cmd(pid, dir):
global cmd
(c, e) = cmd(pid, dir)
c += ['--object-storage-config-file', test_tempdir + '/object_storage.yaml']
return (c, e)
def teardown(pid):
print('Kill scylla')
sys.stdout.flush()
log = run.abort_run_with_dir(pid, test_tempdir)
shutil.copyfileobj(log, sys.stdout.buffer)
print(f'Start scylla (dir={test_tempdir}')
pid = run.run_with_generated_dir(run_scylla_cmd, get_tempdir)
atexit.register(lambda: teardown(pid))
ip = run.pid_to_ip(pid)
run.wait_for_services(pid, [ lambda: check_cql(ip) ])
print(f'Create keyspace (minio listening at {s3_server_address})')
cluster = run.get_cql_cluster(ip)
conn = cluster.connect()
conn.execute("CREATE KEYSPACE test_ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': '1' } AND STORAGE = { 'type': 'S3', 'endpoint': '" + f'{s3_server_address}' + ":9000', 'bucket': '" + f'{s3_public_bucket}' + "' };")
conn.execute("CREATE TABLE test_ks.test_cf ( name text primary key, value text );")
conn.execute("INSERT INTO test_ks.test_cf ( name, value ) VALUES ('0', 'zero');")
conn.execute("INSERT INTO test_ks.test_cf ( name, value ) VALUES ('1', 'one');")
conn.execute("INSERT INTO test_ks.test_cf ( name, value ) VALUES ('2', 'two');")
res = conn.execute("SELECT * FROM test_ks.test_cf;")
r = requests.post(f'http://{ip}:10000/storage_service/keyspace_flush/test_ks', timeout=60)
if r.status_code != 200:
print(f'Error flushing keyspace: {r}')
success = False
# Check that the ownership table is populated properly
res = conn.execute("SELECT * FROM system.sstables;")
for row in res:
if not row.location.startswith(test_tempdir):
print(f'Unexpected entry location in registry: {row.location}')
success = False
if row.status != 'sealed':
print(f'Unexpected entry status in registry: {row.status}')
success = False
cluster.shutdown()
print('Restart scylla')
pid = run.restart_with_dir(pid, run_scylla_cmd, test_tempdir)
ip = run.pid_to_ip(pid)
run.wait_for_services(pid, [ lambda: check_cql(ip) ])
cluster = run.get_cql_cluster(ip)
conn = cluster.connect()
res = conn.execute("SELECT * FROM test_ks.test_cf;")
want_res = { '0': 'zero', '1': 'one', '2': 'two' }
have_res = { x.name: x.value for x in res }
if want_res != have_res:
print(f'Unexpected table content: {have_res}')
success = False
print('Drop table')
conn.execute("DROP TABLE test_ks.test_cf;")
# Check that the ownership table is de-populated
res = conn.execute("SELECT * FROM system.sstables;")
for row in res:
print(f'Unexpected entry in registry: {row.location} {row.status}')
success = False
cluster.shutdown()
sys.exit(0 if success else 1)