#!/usr/bin/env python3
# Use the run.py library from ../cql-pytest:
import sys
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
import run

import atexit
import os
import requests
import shutil
import yaml

print('Scylla is: ' + run.find_scylla() + '.')
success = True

ssl = '--ssl' in sys.argv
if ssl:
    cmd = run.run_scylla_ssl_cql_cmd
    check_cql = run.check_ssl_cql
else:
    cmd = run.run_scylla_cmd
    check_cql = run.check_cql

test_tempdir = run.pid_to_dir(os.getpid())
os.mkdir(test_tempdir)
s3_server_address = os.environ['S3_SERVER_ADDRESS_FOR_TEST']
s3_server_port = int(os.environ['S3_SERVER_PORT_FOR_TEST'])
s3_public_bucket = os.environ['S3_PUBLIC_BUCKET_FOR_TEST']

def get_tempdir(pid):
    return test_tempdir

with open(test_tempdir + '/object_storage.yaml', 'w') as config_file:
    yaml.dump({ 'endpoints': [
            {
                'name': s3_server_address,
                'port': s3_server_port,
            }
        ]
    }, config_file)

def run_scylla_cmd(pid, dir):
    global cmd
    (c, e) = cmd(pid, dir)
    c += ['--object-storage-config-file', test_tempdir + '/object_storage.yaml']
    return (c, e)

def teardown(pid):
    print('Kill scylla')
    sys.stdout.flush()
    log = run.abort_run_with_dir(pid, test_tempdir)
    shutil.copyfileobj(log, sys.stdout.buffer)

print(f'Start scylla (dir={test_tempdir}')
pid = run.run_with_generated_dir(run_scylla_cmd, get_tempdir)
atexit.register(lambda: teardown(pid))

ip = run.pid_to_ip(pid)
run.wait_for_services(pid, [ lambda: check_cql(ip) ])

print(f'Create keyspace (minio listening at {s3_server_address})')
cluster = run.get_cql_cluster(ip)
conn = cluster.connect()
conn.execute("CREATE KEYSPACE test_ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': '1' } AND STORAGE = { 'type': 'S3', 'endpoint': '" + f'{s3_server_address}' + "', 'bucket': '" + f'{s3_public_bucket}' + "' };")
conn.execute("CREATE TABLE test_ks.test_cf ( name text primary key, value text );")
conn.execute("INSERT INTO test_ks.test_cf ( name, value ) VALUES ('0', 'zero');")
conn.execute("INSERT INTO test_ks.test_cf ( name, value ) VALUES ('1', 'one');")
conn.execute("INSERT INTO test_ks.test_cf ( name, value ) VALUES ('2', 'two');")
res = conn.execute("SELECT * FROM test_ks.test_cf;")

r = requests.post(f'http://{ip}:10000/storage_service/keyspace_flush/test_ks', timeout=60)
if r.status_code != 200:
    print(f'Error flushing keyspace: {r}')
    success = False

# Check that the ownership table is populated properly
res = conn.execute("SELECT * FROM system.sstables;")
for row in res:
    if not row.location.startswith(test_tempdir):
        print(f'Unexpected entry location in registry: {row.location}')
        success = False
    if row.status != 'sealed':
        print(f'Unexpected entry status in registry: {row.status}')
        success = False

cluster.shutdown()

print('Restart scylla')
pid = run.restart_with_dir(pid, run_scylla_cmd, test_tempdir)
ip = run.pid_to_ip(pid)
run.wait_for_services(pid, [ lambda: check_cql(ip) ])

cluster = run.get_cql_cluster(ip)
conn = cluster.connect()
res = conn.execute("SELECT * FROM test_ks.test_cf;")
want_res = { '0': 'zero', '1': 'one', '2': 'two' }
have_res = { x.name: x.value for x in res }
if want_res != have_res:
    print(f'Unexpected table content: {have_res}')
    success = False

print('Drop table')
conn.execute("DROP TABLE test_ks.test_cf;")
# Check that the ownership table is de-populated
res = conn.execute("SELECT * FROM system.sstables;")
for row in res:
    print(f'Unexpected entry in registry: {row.location} {row.status}')
    success = False

cluster.shutdown()

sys.exit(0 if success else 1)
