# Copyright 2022-present ScyllaDB # # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 ############################################################################# # Tests for the tools hosted by scylla ############################################################################# import contextlib import collections import datetime import glob import itertools import functools import json import math import os import pathlib import pytest import subprocess import tempfile import textwrap import random import re import shutil import uuid from . import nodetool from . import util import stat from typing import Iterable, Type, Union from cassandra.util import Duration import yaml from test.cluster.object_store.conftest import s3_server def simple_no_clustering_table(cql, keyspace): table = util.unique_name() schema = f"CREATE TABLE {keyspace}.{table} (pk int PRIMARY KEY, v int) WITH compaction = {{'class': 'NullCompactionStrategy'}}" cql.execute(schema) # Ensure at least 3 live rows for tests that depend on it live_rows_needed = 3 for pk in range(0, 10): # For the first 3 rows, always insert; for the rest, use randomness if pk < live_rows_needed: cql.execute(f"INSERT INTO {keyspace}.{table} (pk, v) VALUES ({pk}, 0)") else: x = random.randrange(0, 4) if x == 0: # partition tombstone cql.execute(f"DELETE FROM {keyspace}.{table} WHERE pk = {pk}") else: # live row cql.execute(f"INSERT INTO {keyspace}.{table} (pk, v) VALUES ({pk}, 0)") if pk == 5: nodetool.flush(cql, f"{keyspace}.{table}") nodetool.flush(cql, f"{keyspace}.{table}") return table, schema def simple_clustering_table(cql, keyspace): table = util.unique_name() schema = f"CREATE TABLE {keyspace}.{table} (pk1 int, pk2 int, ck1 int, ck2 int, v int, s int STATIC, PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH compaction = {{'class': 'NullCompactionStrategy'}}" cql.execute(schema) for pk in range(0, 10): for ck in range(0, 10): x = random.randrange(0, 8) if x == 0: # ttl cql.execute(f"INSERT INTO 
{keyspace}.{table} (pk1, pk2, ck1, ck2, v) VALUES ({pk}, {pk}, {ck}, {ck}, 0) USING TTL 6000") elif x == 1: # row tombstone cql.execute(f"DELETE FROM {keyspace}.{table} WHERE pk1 = {pk} AND pk2 = {pk} AND ck1 = {ck} AND ck2 = {ck}") elif x == 2: # cell tombstone cql.execute(f"DELETE v FROM {keyspace}.{table} WHERE pk1 = {pk} AND pk2 = {pk} AND ck1 = {ck} AND ck2 = {ck}") elif x == 3: # range tombstone l = ck * 10 u = ck * 11 cql.execute(f"DELETE FROM {keyspace}.{table} WHERE pk1 = {pk} AND pk2 = {pk} AND ck1 > {l} AND ck1 < {u}") else: # live row cql.execute(f"INSERT INTO {keyspace}.{table} (pk1, pk2, ck1, ck2, v) VALUES ({pk}, {pk}, {ck}, {ck}, 0)") if pk == 5: cql.execute(f"UPDATE {keyspace}.{table} SET s = 10 WHERE pk1 = {pk} AND pk2 = {pk}") nodetool.flush(cql, f"{keyspace}.{table}") nodetool.flush(cql, f"{keyspace}.{table}") return table, schema def clustering_table_with_collection(cql, keyspace): table = util.unique_name() schema = f"CREATE TABLE {keyspace}.{table} (pk int, ck int, v1 map, v2 set, v3 list, PRIMARY KEY (pk, ck)) WITH compaction = {{'class': 'NullCompactionStrategy'}}" cql.execute(schema) for pk in range(0, 10): for ck in range(0, 10): map_vals = {f"{p}: '{c}'" for p in range(0, pk) for c in range(0, ck)} map_str = ", ".join(map_vals) set_list_vals = list(range(0, pk)) set_list_str = ", ".join(map(str, set_list_vals)) cql.execute(f"INSERT INTO {keyspace}.{table} (pk, ck, v1, v2, v3) VALUES ({pk}, {ck}, {{{map_str}}}, {{{set_list_str}}}, [{set_list_str}])") if pk == 5: nodetool.flush(cql, f"{keyspace}.{table}") nodetool.flush(cql, f"{keyspace}.{table}") return table, schema def clustering_table_with_udt(cql, keyspace): table = util.unique_name() create_type1_schema = f"CREATE TYPE IF NOT EXISTS {keyspace}.type1 (f1 int, f2 text)" create_type2_schema = f"CREATE TYPE IF NOT EXISTS {keyspace}.type2 (f1 int, f2 frozen)" create_table_schema = f" CREATE TABLE {keyspace}.{table} (pk int, ck int, v type2, PRIMARY KEY (pk, ck)) WITH compaction = 
{{'class': 'NullCompactionStrategy'}}" cql.execute(create_type1_schema) cql.execute(create_type2_schema) cql.execute(create_table_schema) for pk in range(0, 10): for ck in range(0, 10): cql.execute(f"INSERT INTO {keyspace}.{table} (pk, ck, v) VALUES ({pk}, {ck}, {{f1: 100, f2: {{f1: 1, f2: 'asd'}}}})") if pk == 5: nodetool.flush(cql, f"{keyspace}.{table}") nodetool.flush(cql, f"{keyspace}.{table}") return (table, f'{keyspace}.type2', f'{keyspace}.type1'), "; ".join((create_type1_schema, create_type2_schema, create_table_schema)) def table_with_counters(cql, keyspace): table = util.unique_name() schema = f"CREATE TABLE {keyspace}.{table} (pk int PRIMARY KEY, v counter) WITH compaction = {{'class': 'NullCompactionStrategy'}}" cql.execute(schema) for pk in range(0, 10): for c in range(0, 4): cql.execute(f"UPDATE {keyspace}.{table} SET v = v + 1 WHERE pk = {pk};") if pk == 5: nodetool.flush(cql, f"{keyspace}.{table}") nodetool.flush(cql, f"{keyspace}.{table}") return table, schema def get_sstables_for_table(data_dir, keyspace, table): def sstable_has_no_temporary_toc(sst): path, basename = os.path.split(sst) basename_components = basename.split("-") toc_basename = "-".join(basename_components[:-1] + ["TOC.txt"]) temporary_toc_basename = "-".join(basename_components[:-1] + ["TOC.txt.tmp"]) return os.path.exists(os.path.join(path, toc_basename)) and not os.path.exists(os.path.join(path, temporary_toc_basename)) return list(filter(sstable_has_no_temporary_toc, glob.glob(os.path.join(data_dir, keyspace, table + '-*', '*-Data.db')))) @contextlib.contextmanager def scylla_sstable(table_factory, cql, ks, data_dir, s3_server=None, copy_to_s3=False, everywhere=False): symbols, schema = table_factory(cql, ks) if type(symbols) is tuple: table = symbols[0] types = symbols[1:] else: table = symbols types = [] schema_file = os.path.join(data_dir, "..", "test_tools_schema.cql") with open(schema_file, "w") as f: f.write(schema) sstables = get_sstables_for_table(data_dir, ks, table) if 
copy_to_s3: sstable_path = os.path.dirname(sstables[0]) if everywhere: sstables += upload_folder_to_s3(sstable_path, s3_server) else: sstables = upload_folder_to_s3(sstable_path, s3_server) try: yield (table, schema_file, sstables) finally: cql.execute(f"DROP TABLE {ks}.{table}") for t in types: cql.execute(f"DROP TYPE {t}") os.unlink(schema_file) def one_sstable(sstables): assert len(sstables) > 1 return [sstables[0]] def all_sstables(sstables): assert len(sstables) > 1 return sstables def upload_folder_to_s3(folder_path, s3_server): s3_resource = s3_server.get_resource() bucket = s3_resource.Bucket(s3_server.bucket_name) prefix = "just/some/s3/prefix" sstables = [] for root, dirs, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) s3_key = os.path.join(prefix, os.path.relpath(file_path, folder_path)) if file.endswith('-Data.db'): sstables.append(f"s3://{s3_server.bucket_name}/{s3_key}") bucket.upload_file(file_path, s3_key) return sstables @pytest.mark.parametrize("what", ["index", "compression-info", "summary", "statistics", "scylla-metadata"]) @pytest.mark.parametrize("where", ["s3", "mixed"]) def test_scylla_sstable_dump_component_with_s3(skip_s3_tests, cql, test_keyspace, scylla_path, scylla_data_dir, scylla_home_dir, what, where, s3_server): objconf = s3_server.create_endpoint_conf() scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml") with open(scylla_yaml_file, "a") as f: f.write(f"\n{yaml.dump({'object_storage_endpoints': objconf})}") with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir, s3_server, False if where == "local" else True, True if where == "mixed" else False) as ( _, schema_file, sstables): out = subprocess.check_output( [scylla_path, "sstable", f"dump-{what}", "--scylla-yaml-file", scylla_yaml_file, "--schema-file", schema_file] + all_sstables(sstables)) print(out) assert out assert json.loads(out) @pytest.mark.parametrize("where", ["s3", "mixed"]) def 
test_scylla_sstable_dump_data_with_s3(skip_s3_tests, cql, test_keyspace, scylla_path, scylla_data_dir, scylla_home_dir, where, s3_server): objconf = s3_server.create_endpoint_conf() scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml") with open(scylla_yaml_file, "a") as f: f.write(f"\n{yaml.dump({'object_storage_endpoints': objconf})}") with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir, s3_server, False if where == "local" else True, True if where == "mixed" else False) as ( _, schema_file, sstables): args = [scylla_path, "sstable", "dump-data", "--scylla-yaml-file", scylla_yaml_file, "--schema-file", schema_file, "--output-format", "json"] out = subprocess.check_output(args + sstables) print(out) assert out assert json.loads(out) @pytest.mark.parametrize("what", ["index", "compression-info", "summary", "statistics", "scylla-metadata"]) @pytest.mark.parametrize("which_sstables", [one_sstable, all_sstables]) def test_scylla_sstable_dump_component(cql, test_keyspace, scylla_path, scylla_data_dir, what, which_sstables): with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir) as (_, schema_file, sstables): out = subprocess.check_output([scylla_path, "sstable", f"dump-{what}", "--schema-file", schema_file] + which_sstables(sstables)) print(out) assert out json_out = json.loads(out) assert json_out if what == "scylla-metadata": assert "sstables" in json_out, f"Expected 'sstables' in json output: {json_out}" for sst_name, sst_metadata in json_out["sstables"].items(): assert "sharding" in sst_metadata, f"Expected 'sharding' metadata in sstable scylla-metadata: sstable={sst_name}: {sst_metadata}" assert sst_metadata["sharding"] != [], f"Expected non-empty sharding metadata in sstable scylla-metadata: sstable={sst_name}: {sst_metadata}" @pytest.mark.parametrize("table_factory", [ simple_no_clustering_table, simple_clustering_table, clustering_table_with_collection, clustering_table_with_udt, 
table_with_counters, ]) @pytest.mark.parametrize("merge", [True, False]) @pytest.mark.parametrize("output_format", ["text", "json"]) @pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True) def test_scylla_sstable_dump_data(request, cql, test_keyspace, scylla_path, scylla_data_dir, table_factory, merge, output_format): with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir) as (_, schema_file, sstables): args = [scylla_path, "sstable", "dump-data", "--schema-file", schema_file, "--output-format", output_format] if merge: args.append("--merge") out = subprocess.check_output(args + sstables) print(out) assert out if output_format == "json": assert json.loads(out) class deletion_time: def __init__(self, dt): self._dt = dt def __eq__(self, other): dt_format = "%Y-%m-%d %H:%M:%Sz" dt1 = datetime.datetime.strptime(self._dt, dt_format) dt2 = datetime.datetime.strptime(other._dt, dt_format) delta = abs((dt1 - dt2).total_seconds()) return delta <= 10 # Need slack for debug builds def __str__(self): return str(self._dt) def __repr__(self): return f"deletion_time({self._dt})" def test_scylla_sstable_write_cql_query_file_reader(cql, test_keyspace, scylla_path): with tempfile.TemporaryDirectory() as tmp_dir: schema_file = os.path.join(tmp_dir, 'schema.cql') with open(schema_file, 'w') as f: f.write(f"CREATE TABLE ks.tbl (pk int PRIMARY KEY)") input_file = os.path.join(tmp_dir, 'input.cql') with open(input_file, 'w') as f: f.write(" INSERT INTO scylla_sstable.tbl (pk) VALUES (0);\nINSERT INTO scylla_sstable.tbl (pk) VALUES (1);INSERT INTO scylla_sstable.tbl (pk) VALUES (2);\n; ;;INSERT INTO scylla_sstable.tbl (pk) VALUES (3)") generation = subprocess.check_output([scylla_path, "sstable", "write", "--schema-file", schema_file, "--input-format", "cql", "--input-file", input_file, "--output-dir", tmp_dir, "--logger-log-level", "scylla-sstable=trace"], text=True).strip() sstable_files = glob.glob(os.path.join(tmp_dir, 
f"*-{generation}-big-Data.db")) assert len(sstable_files) == 1 sstable_file = sstable_files[0] result = json.loads(subprocess.check_output([scylla_path, "sstable", "query", "--schema-file", schema_file, "--output-format", "json", sstable_file], text=True)) keys = {row['pk'] for row in result} assert keys == {0, 1, 2, 3} def test_scylla_sstable_write_cql(cql, test_keyspace, scylla_path): col_defs = "pk int, ck int, v int, PRIMARY KEY (pk, ck)" with util.new_test_table(cql, test_keyspace, col_defs) as table, tempfile.TemporaryDirectory() as tmp_dir: keyspace_name, table_name = table.split(".") write_query_templates = [ "INSERT INTO {} (pk, ck, v) VALUES (0, 0, 40) USING TIMESTAMP 100", "INSERT INTO {} (pk, ck, v) VALUES (0, 1, 30) USING TIMESTAMP 100", "INSERT INTO {} (pk, ck, v) VALUES (1, 0, 20) USING TIMESTAMP 100", "INSERT INTO {} (pk, ck, v) VALUES (0, 1, 10) USING TIMESTAMP 100", "INSERT INTO {} (pk, ck, v) VALUES (2, 0, 50) USING TIMESTAMP 100", "INSERT INTO {} (pk, ck, v) VALUES (2, 1, 50) USING TIMESTAMP 100", "INSERT INTO {} (pk, ck, v) VALUES (3, 0, 50) USING TIMESTAMP 100", "UPDATE {} USING TIMESTAMP 200 SET v = 180 WHERE pk = 0 AND ck = 1", "UPDATE {} USING TIMESTAMP 200 SET v = 180 WHERE pk = 0 AND ck = 2", "DELETE v FROM {} USING TIMESTAMP 300 WHERE pk = 1 AND ck = 1", "DELETE v FROM {} USING TIMESTAMP 300 WHERE pk = 1 AND ck = 2", "DELETE FROM {} USING TIMESTAMP 300 WHERE pk = 2 AND ck = 1", "DELETE FROM {} USING TIMESTAMP 300 WHERE pk = 3", "DELETE FROM {} USING TIMESTAMP 300 WHERE pk = 4", ] # Don't select mutation_source, it will differ between the two selects. 
fragments_query_template = "SELECT pk, partition_region, ck, position_weight, mutation_fragment_kind, metadata, value FROM MUTATION_FRAGMENTS({})" for write_query_template in write_query_templates: cql.execute(write_query_template.format(table)) # FIXME cannot combine JSON and MUTATION_FRAGMENTS original_json = [row._asdict() for row in cql.execute(fragments_query_template.format(table))] input_file = os.path.join(tmp_dir, 'input.cql') schema_file = os.path.join(tmp_dir, 'schema.cql') with open(input_file, 'w') as f: for write_query_template in write_query_templates: f.write(write_query_template.format(f"scylla_sstable.{table_name}") + ";\n") with open(schema_file, 'w') as f: f.write(f"CREATE TABLE {table} ({col_defs})") generation = subprocess.check_output([scylla_path, "sstable", "write", "--schema-file", schema_file, "--input-format", "cql", "--input-file", input_file, "--output-dir", tmp_dir, "--logger-log-level", "scylla-sstable=trace"], text=True).strip() sstable_files = glob.glob(os.path.join(tmp_dir, f"*-{generation}-big-Data.db")) assert len(sstable_files) == 1 sstable_file = sstable_files[0] query_file = os.path.join(tmp_dir, 'query.cql') with open(query_file, 'w') as f: f.write(fragments_query_template.format(f"scylla_sstable.{table_name}")) actual_json = json.loads(subprocess.check_output([scylla_path, "sstable", "query", "--schema-file", schema_file, "--output-format", "json", "--query-file", query_file, "--logger-log-level", "scylla-sstable=trace", sstable_file], text=True)) def wrap_deletion_time(json_rows): for row in json_rows: if row['metadata'] is None: continue metadata = json.loads(row['metadata']) if 'row_marker' in metadata and 'deletion_time' in metadata['row_marker']: metadata['row_marker']['deletion_time'] = deletion_time(metadata['row_marker']['deletion_time']) if 'tombstone' in row['metadata'] and 'deletion_time' in metadata['tombstone']: metadata['tombstone']['deletion_time'] = deletion_time(metadata['tombstone']['deletion_time']) if 
'shadowable_tombstone' in row['metadata'] and 'deletion_time' in metadata['shadowable_tombstone']: metadata['shadowable_tombstone']['deletion_time'] = deletion_time(metadata['shadowable_tombstone']['deletion_time']) if 'columns' in metadata: for name, col in metadata['columns'].items(): if 'deletion_time' in col: col['deletion_time'] = deletion_time(col['deletion_time']) row['metadata'] = metadata return json_rows # deletion time is wall-clock, allow for a few seconds of difference assert wrap_deletion_time(actual_json) == wrap_deletion_time(original_json) def test_scylla_sstable_write_cql_large_input(scylla_path): keyspace_name = "ks" table_name = "tbl" table_definition = f"CREATE TABLE {keyspace_name}.{table_name} (pk int PRIMARY KEY, v text);" with tempfile.TemporaryDirectory() as tmp_dir: schema_file = os.path.join(tmp_dir, 'schema.cql') with open(schema_file, 'w') as f: f.write(table_definition) f.flush() input_file = os.path.join(tmp_dir, 'input.cql') expected_json = [] value_size = 1 << 17 total_size = 1 << 20 value_count = math.ceil(total_size / value_size) value = 'a' * value_size with open(input_file, 'w') as f: for i in range(0, value_count): expected_json.append({'pk': i, 'v': value}) f.write(f"INSERT INTO scylla_sstable.{table_name} (pk, v) VALUES ({i}, '{value}');\n") # memtable grows in segment_size increments (128KiB) memory_limit = 1 << 17 out = subprocess.check_output([ scylla_path, "sstable", "write", "--schema-file", schema_file, "--input-format", "cql", "--input-file", input_file, "--memory-limit", str(memory_limit), "--output-dir", tmp_dir, '--logger-log-level', 'scylla-sstable=trace'], text=True) generations = out.strip().split("\n") assert len(generations) == math.ceil(total_size / memory_limit) sstable_files = glob.glob(os.path.join(tmp_dir, f"??-*-???-Data.db")) assert(len(sstable_files) == len(generations)) actual_json = json.loads(subprocess.check_output([ scylla_path, "sstable", "query", "--schema-file", schema_file, "--output-format", 
"json"] + sstable_files)) assert sorted(actual_json, key=lambda x: x['pk']) == expected_json def test_scylla_sstable_write_validation(cql, scylla_path): """Check that invalid queries are rejected.""" keyspace_name = "ks" table_name = "tbl" table_definition = f"CREATE TABLE {keyspace_name}.{table_name} (pk int PRIMARY KEY, v text);" with tempfile.TemporaryDirectory() as tmp_dir: schema_file = os.path.join(tmp_dir, 'schema.cql') with open(schema_file, 'w') as f: f.write(table_definition) f.flush() common_params = [scylla_path, "sstable", "write", "--schema-file", schema_file, "--input-format", "cql"] def check(bad_query, expected_error): input_file = os.path.join(tmp_dir, 'input.cql') with open(input_file, 'w') as f: f.write(bad_query + "\n") f.flush() res = subprocess.run(common_params + ["--input-file", input_file], text=True, capture_output=True) assert res.returncode == 1 assert res.stdout == "" assert "error processing arguments: " + expected_error in res.stderr check(f"INSERT INTO scylla_sstable.{table_name} (pk) VALUES,", "failed to parse query: exceptions::syntax_exception") check(f"INSERT INTO {table_name} (pk) VALUES (0);", "query must have keyspace and the keyspace has to be scylla_sstable") check(f"INSERT INTO foo.{table_name} (pk) VALUES (0);", "query must be against scylla_sstable keyspace, got foo instead") check(f"INSERT INTO {keyspace_name}.{table_name} (pk) VALUES (0);", f"query must be against scylla_sstable keyspace, got {keyspace_name} instead") check(f"INSERT INTO scylla_sstable.foo (pk) VALUES (0);", f"query must be against {table_name} table, got foo instead") check(f"SELECT * FROM scylla_sstable.{table_name}", "query must be an insert, update or delete query") def test_scylla_sstable_write_temp_dir(cql, scylla_path, scylla_data_dir): """Check that TEMPDIR environment variable is respected. This is very hard to test with a positive test, because cql_test_env removes its temp-dir on exit. 
So we test with a negative test: give an impossible path and check that creating the temp-dir fails. """ with tempfile.TemporaryDirectory() as tmp_dir: schema_file = os.path.join(tmp_dir, 'schema.cql') with open(schema_file, 'w') as f: f.write(f"CREATE TABLE ks.tbl (pk int PRIMARY KEY)") input_file = os.path.join(tmp_dir, 'input.cql') with open(input_file, 'w') as f: f.write("INSERT INTO scylla_sstable.tbl (pk) VALUES (0)") with tempfile.NamedTemporaryFile("r") as f: args = [scylla_path, "sstable", "write", "--schema-file", schema_file, "--input-file", input_file, "--input-format", "cql", "--output-dir", tmp_dir] res = subprocess.run(args, text=True, capture_output=True, env={'TEMPDIR': f.name}) assert res.returncode == 2 assert res.stdout == "" assert res.stderr.endswith(f"error running operation: std::filesystem::__cxx11::filesystem_error (error generic:20, filesystem error: temp_directory_path: Not a directory [{f.name}])\n") @pytest.mark.parametrize("table_factory", [ simple_no_clustering_table, simple_clustering_table, ]) def test_scylla_sstable_write_json(cql, test_keyspace, scylla_path, scylla_data_dir, table_factory): with scylla_sstable(table_factory, cql, test_keyspace, scylla_data_dir) as (_, schema_file, sstables): with tempfile.TemporaryDirectory() as tmp_dir: dump_common_args = [scylla_path, "sstable", "dump-data", "--schema-file", schema_file, "--output-format", "json", "--merge"] original_out = subprocess.check_output(dump_common_args + sstables) original_json = json.loads(original_out)["sstables"]["anonymous"] input_file = os.path.join(tmp_dir, 'input.json') with open(input_file, 'w') as f: json.dump(original_json, f) subprocess.check_call([scylla_path, "sstable", "write", "--schema-file", schema_file, "--input-format", "json", "--input-file", input_file, "--output-dir", tmp_dir, '--logger-log-level', 'scylla-sstable=trace']) sstable_files = glob.glob(os.path.join(tmp_dir, f"??-*-???-Data.db")) assert len(sstable_files) == 1 sstable_file = 
sstable_files[0] actual_out = subprocess.check_output(dump_common_args + [sstable_file]) actual_json = json.loads(actual_out)["sstables"]["anonymous"] assert actual_json == original_json def script_consume_test_table_factory(cql, keyspace): table = util.unique_name() schema = f"CREATE TABLE {keyspace}.{table} (pk int, ck int, v int, s int STATIC, PRIMARY KEY (pk, ck)) WITH compaction = {{'class': 'NullCompactionStrategy'}}" cql.execute(schema) partitions = 4 for sst in range(0, 2): for pk in range(sst * partitions, (sst + 1) * partitions): # static row cql.execute(f"UPDATE {keyspace}.{table} SET s = 10 WHERE pk = {pk}") # range tombstone cql.execute(f"DELETE FROM {keyspace}.{table} WHERE pk = {pk} AND ck >= 0 AND ck <= 4") # 2 rows for ck in range(0, 4): cql.execute(f"INSERT INTO {keyspace}.{table} (pk, ck, v) VALUES ({pk}, {ck}, 0)") nodetool.flush(cql, f"{keyspace}.{table}") return table, schema def test_scylla_sstable_script_consume_sstable(cql, test_keyspace, scylla_path, scylla_data_dir): script_file = os.path.join(scylla_data_dir, "..", "test_scylla_sstable_script_consume_sstable.lua") script = """ wr = Scylla.new_json_writer() i = 0 function arg(args, arg) ret = nil wr:key(arg) if args[arg] then ret = tonumber(args[arg]) wr:int(ret) else wr:null() end return ret end function basename(path) s, e = string.find(string.reverse(path), '/', 1, true) return string.sub(path, #path - s + 2) end function consume_stream_start(args) wr:start_object() start_sst = arg(args, "start_sst") end_sst = arg(args, "end_sst") wr:key("content") wr:start_array() end function consume_sstable_start(sst) wrote_ps = false i = i + 1 if i == start_sst then return false end wr:string(basename(sst.filename)) end function consume_partition_start(ps) if not wrote_ps then wr:string("ps") wrote_ps = true end end function consume_sstable_end() if i == end_sst then return false end end function consume_stream_end() wr:end_array() wr:end_object() end """ with open(script_file, 'w') as f: 
f.write(script) with scylla_sstable(script_consume_test_table_factory, cql, test_keyspace, scylla_data_dir) as (_, schema_file, sstables): sst1 = os.path.basename(sstables[0]) sst2 = os.path.basename(sstables[1]) def run_scenario(script_args, expected): print(f"Scenario: '{script_args}'\n") if script_args: script_args = ["--script-arg", script_args] else: script_args = [] script_args = [scylla_path, "sstable", "script", "--schema-file", schema_file, "--script-file", script_file] + script_args + sstables[0:2] res = json.loads(subprocess.check_output(script_args)) assert res == expected run_scenario("", {'start_sst': None, 'end_sst': None, 'content': [sst1, "ps", sst2, "ps"]}) run_scenario("start_sst=1", {'start_sst': 1, 'end_sst': None, 'content': [sst2, "ps"]}) run_scenario("start_sst=2", {'start_sst': 2, 'end_sst': None, 'content': [sst1, "ps"]}) run_scenario("start_sst=1:end_sst=1", {'start_sst': 1, 'end_sst': 1, 'content': []}) run_scenario("start_sst=2:end_sst=2", {'start_sst': 2, 'end_sst': 2, 'content': [sst1, "ps"]}) run_scenario("end_sst=1", {'start_sst': None, 'end_sst': 1, 'content': [sst1, "ps"]}) run_scenario("end_sst=2", {'start_sst': None, 'end_sst': 2, 'content': [sst1, "ps", sst2, "ps"]}) def test_scylla_sstable_script_slice(cql, test_keyspace, scylla_path, scylla_data_dir): class bound: @staticmethod def unpack_value(value): if isinstance(value, tuple): return value else: return None, value def __init__(self, value, weight): self.token, self.value = self.unpack_value(value) self.weight = weight def tri_cmp(self, value): if self.token is None and self.value is None: assert(self.weight) return -self.weight token, value = self.unpack_value(value) if token is None: res = 0 else: res = int(token) - int(self.token) if res == 0 and not value is None and not self.value is None : res = int(value) - int(self.value) return res if res else -self.weight def get_value(self, lookup_table, is_start): if self.token is None and self.value is None: return '-inf' if 
is_start else '+inf' if self.value is None: return "t{}".format(int(self.token)) return lookup_table[self.value] @staticmethod def before(value): return bound(value, -1) @staticmethod def at(value): return bound(value, 0) @staticmethod def after(value): return bound(value, 1) class interval: def __init__(self, start_bound, end_bound): self.start = start_bound self.end = end_bound def contains(self, value): return self.start.tri_cmp(value) >= 0 and self.end.tri_cmp(value) <= 0 def summarize_dump(dump): summary = [] for partition in list(dump["sstables"].items())[0][1]: partition_summary = {"pk": partition["key"]["value"], "token": partition["key"]["token"], "frags": []} if "static_row" in partition: partition_summary["frags"].append(("sr", None)) for clustering_fragment in partition.get("clustering_elements", []): type_str = "cr" if clustering_fragment["type"] == "clustering-row" else "rtc" partition_summary["frags"].append((type_str, clustering_fragment["key"]["value"])) summary.append(partition_summary) return summary def filter_summary(summary, partition_ranges, clustering_ranges): if not partition_ranges: return summary filtered_summary = [] for partition in summary: if any(map(lambda x: interval.contains(x, (partition["token"], partition["pk"])), partition_ranges)): filtered_summary.append({"pk": partition["pk"], "token": partition["token"], "frags": []}) for (t, k) in partition["frags"]: if t == "rtc" or k is None or not clustering_ranges or any(map(lambda x: interval.contains(x, k), clustering_ranges)): filtered_summary[-1]["frags"].append((t, k)) return filtered_summary def serialize_ranges(prefix, ranges, lookup_table): serialized_ranges = [] i = 0 for r in ranges: s = r.start.get_value(lookup_table, True) e = r.end.get_value(lookup_table, False) serialized_ranges.append("{}{}={}{},{}{}".format( prefix, i, "(" if s == '-inf' or r.start.weight > 0 else "[", s, e, ")" if e == '+inf' or r.end.weight < 0 else "]")) i = i + 1 return serialized_ranges 
scripts_path = os.path.realpath(os.path.join(__file__, '../../../tools/scylla-sstable-scripts')) script_file = os.path.join(scripts_path, 'slice.lua') with scylla_sstable(script_consume_test_table_factory, cql, test_keyspace, scylla_data_dir) as (_, schema_file, sstables): reference_summary = summarize_dump(json.loads(subprocess.check_output([scylla_path, "sstable", "dump-data", "--schema-file", schema_file, "--merge"] + sstables))) # same order as in dump pks = [(p["token"], p["pk"]) for p in reference_summary] cks = set() for p in reference_summary: for t, ck in p["frags"]: if not ck is None: cks.add(ck) cks = sorted(list(cks)) serialized_pk_lookup = {pk: subprocess.check_output([scylla_path, "types", "serialize", "--full-compound", "-t", "Int32Type", "--", pk]).strip().decode() for t, pk in pks} serialized_ck_lookup = {ck: subprocess.check_output([scylla_path, "types", "serialize", "--prefix-compound", "-t", "Int32Type", "--", ck]).strip().decode() for ck in cks} script_common_args = [scylla_path, "sstable", "script", "--schema-file", schema_file, "--merge", "--script-file", script_file] def run_scenario(scenario, partition_ranges, clustering_ranges): print(f"running scenario {scenario}") script_args = serialize_ranges("pr", partition_ranges, serialized_pk_lookup) + serialize_ranges("cr", clustering_ranges, serialized_ck_lookup) if script_args: script_args = ["--script-arg"] + [":".join(script_args)] print(f"script_args={script_args}") expected = filter_summary(reference_summary, partition_ranges, clustering_ranges) out = subprocess.check_output(script_common_args + script_args + sstables) summary = summarize_dump(json.loads(out)) assert summary == expected run_scenario("no args", [], []) run_scenario("full range", [interval(bound.before(None), bound.after(None))], []) run_scenario("(pks[0], +inf)", [interval(bound.after(pks[0]), bound.after(None))], []) run_scenario("(-inf, pks[-3]]", [interval(bound.before(None), bound.after(pks[-3]))], []) 
run_scenario("[pks[2], pks[-2]]", [interval(bound.before(pks[2]), bound.after(pks[-2]))], []) run_scenario("[pks[0], pks[1]], [pks[2], pks[3]]", [interval(bound.before(pks[1]), bound.after(pks[2])), interval(bound.before(pks[3]), bound.after(pks[4]))], []) run_scenario("[t:pks[2], t:pks[-2]]", [interval(bound.before((pks[2][0], None)), bound.after((pks[-2][0], None)))], []) run_scenario("full pk range | [-inf, cks[2]]", [interval(bound.before(None), bound.after(None))], [interval(bound.before(None), bound.after(cks[2]))]) run_scenario("[pks[0], pks[1]] | (cks[0], cks[1]], (cks[2], +inf)", [interval(bound.before(pks[1]), bound.after(pks[2]))], [interval(bound.after(cks[0]), bound.after(cks[1])), interval(bound.after(cks[2]), bound.after(None))]) @pytest.mark.parametrize("table_factory", [ simple_no_clustering_table, simple_clustering_table, clustering_table_with_collection, clustering_table_with_udt, table_with_counters, ]) def test_scylla_sstable_script(cql, request, test_keyspace, scylla_path, scylla_data_dir, table_factory): scripts_path = os.path.realpath(os.path.join(__file__, '../../../tools/scylla-sstable-scripts')) slice_script_path = os.path.join(scripts_path, 'slice.lua') dump_script_path = os.path.join(scripts_path, 'dump.lua') with scylla_sstable(table_factory, cql, test_keyspace, scylla_data_dir) as (_,schema_file, sstables): dump_common_args = [scylla_path, "sstable", "dump-data", "--schema-file", schema_file, "--output-format", "json"] script_common_args = [scylla_path, "sstable", "script", "--schema-file", schema_file] # without --merge cxx_json = json.loads(subprocess.check_output(dump_common_args + sstables)) dump_lua_json = json.loads(subprocess.check_output(script_common_args + ["--script-file", dump_script_path] + sstables)) slice_lua_json = json.loads(subprocess.check_output(script_common_args + ["--script-file", slice_script_path] + sstables)) assert dump_lua_json == cxx_json assert slice_lua_json == cxx_json # with --merge cxx_json = 
json.loads(subprocess.check_output(dump_common_args + ["--merge"] + sstables)) dump_lua_json = json.loads(subprocess.check_output(script_common_args + ["--merge", "--script-file", dump_script_path] + sstables)) slice_lua_json = json.loads(subprocess.check_output(script_common_args + ["--merge", "--script-file", slice_script_path] + sstables)) assert dump_lua_json == cxx_json assert slice_lua_json == cxx_json class TestScyllaSsstableSchemaLoadingBase: def check(self, scylla_path, extra_args, sstable, dump_reference, cwd=None, env=None): dump_common_args = [scylla_path, "sstable", "dump-data", "--output-format", "json", "--logger-log-level", "scylla-sstable=debug:schema_loader=trace"] dump = json.loads(subprocess.check_output(dump_common_args + extra_args + [sstable], cwd=cwd, env=env))["sstables"] dump = list(dump.values())[0] assert dump == dump_reference def check_fail(self, scylla_path, extra_args, sstable, error_msg=None, cwd=None): common_args = [scylla_path, "sstable", "dump-data", "--logger-log-level", "scylla-sstable=debug:schema_loader=trace"] res = subprocess.run(common_args + extra_args + [sstable], capture_output=True, text=True, cwd=cwd, env={}) print(res.stderr) if error_msg is None: error_msg = "Failed to autodetect and load schema, try again with --logger-log-level scylla-sstable=debug to learn more or provide the schema source manually" assert res.stderr.split('\n')[-2] == error_msg assert res.returncode != 0 def copy_sstable_to_external_dir(self, system_scylla_local_sstable_prepared, temp_workdir): table_data_dir, sstable_filename = os.path.split(system_scylla_local_sstable_prepared) sstable_glob = "-".join(sstable_filename.split("-")[:-1]) + "*" sstable_components = glob.glob(os.path.join(table_data_dir, sstable_glob)) for c in sstable_components: shutil.copy(c, temp_workdir) return glob.glob(os.path.join(temp_workdir, "*-Data.db"))[0] @contextlib.contextmanager def _prepare_sstable(cql, scylla_data_dir, table, write_fun=None): """ Prepares the 
    table for the needs of the schema loading tests. Namely:
    * Disable auto-compaction for the system-schema keyspace and table's keyspace.
    * Flushes said keyspaces.
    * Locates an sstable belonging to the table and returns it.
    """
    keyspace_name, table_name = table.split(".")
    with nodetool.no_autocompaction_context(cql, keyspace_name, "system_schema"):
        if write_fun is not None:
            write_fun()
        # Need to flush system keyspaces whose sstables we want to meddle
        # with, to make sure they are actually on disk.
        nodetool.flush_keyspace(cql, "system_schema")
        nodetool.flush_keyspace(cql, keyspace_name)
        sstables = glob.glob(os.path.join(scylla_data_dir, keyspace_name, table_name + "-*", "*-Data.db"))
        yield sstables[0]


@pytest.fixture(scope="class")
def system_scylla_local_sstable_prepared(cql, scylla_data_dir):
    # An sstable of system.scylla_local, prepared for schema-loading tests.
    with _prepare_sstable(cql, scylla_data_dir, "system.scylla_local") as sst:
        yield sst


@pytest.fixture(scope="class")
def system_scylla_local_schema_file():
    """ Prepares a schema.cql with the schema of system.scylla_local. """
    with tempfile.NamedTemporaryFile("w+t") as f:
        f.write("CREATE TABLE system.scylla_local (key text PRIMARY KEY, value text)")
        f.flush()
        yield f.name


@pytest.fixture(scope="class")
def scylla_home_dir(scylla_data_dir):
    """ Create a temporary directory structure to be used as SCYLLA_HOME.

    The top-level directory contains a conf dir, which contains a scylla.yaml,
    which has the workdir configuration item set. The top level directory can
    be used as SCYLLA_HOME, while the conf dir can be used as SCYLLA_CONF
    environment variables respectively, allowing scylla sstable tool to locate
    the work-directory of the node.
    """
    with tempfile.TemporaryDirectory() as scylla_home:
        conf_dir = os.path.join(scylla_home, "conf")
        os.mkdir(conf_dir)
        scylla_yaml_file = os.path.join(conf_dir, "scylla.yaml")
        with open(scylla_yaml_file, "w") as f:
            # The workdir is the parent of the data dir.
            f.write(f"workdir: {os.path.split(scylla_data_dir)[0]}")
        yield scylla_home


def _produce_reference_dump(scylla_path, schema_args, sstable):
    """ Produce a json dump, to be used as a reference, of the specified sstable. """
    dump_reference = subprocess.check_output([
        scylla_path,
        "sstable",
        "dump-data",
        "--output-format", "json",
        "--logger-log-level", "scylla-sstable=debug",
    ] + schema_args + [sstable])
    dump_reference = json.loads(dump_reference)["sstables"]
    return list(dump_reference.values())[0]


@pytest.fixture(scope="class")
def system_scylla_local_reference_dump(scylla_path, system_scylla_local_sstable_prepared):
    # Reference dump produced with the known-good --system-schema source.
    return _produce_reference_dump(scylla_path, ["--system-schema", "--keyspace", "system", "--table", "scylla_local"], system_scylla_local_sstable_prepared)


class TestScyllaSsstableSchemaLoading(TestScyllaSsstableSchemaLoadingBase):
    """ Test class containing all the schema loader tests.

    Helps in providing a natural scope of all the specialized fixtures shared
    by these tests.
    """
    keyspace = "system"
    table = "scylla_local"

    # Schema sources, sstable located in its own table directory.

    def test_table_dir_system_schema(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump):
        self.check(
            scylla_path,
            ["--system-schema", "--keyspace", self.keyspace, "--table", self.table],
            system_scylla_local_sstable_prepared,
            system_scylla_local_reference_dump)

    def test_table_dir_system_schema_deduced_keyspace_table(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump):
        self.check(
            scylla_path,
            ["--system-schema"],
            system_scylla_local_sstable_prepared,
            system_scylla_local_reference_dump)

    def test_table_dir_schema_file(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, system_scylla_local_schema_file):
        self.check(
            scylla_path,
            ["--schema-file", system_scylla_local_schema_file],
            system_scylla_local_sstable_prepared,
            system_scylla_local_reference_dump)

    def test_table_dir_data_dir(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, scylla_data_dir):
        self.check(
            scylla_path,
            ["--schema-tables", "--scylla-data-dir", scylla_data_dir, "--keyspace", self.keyspace, "--table", self.table],
            system_scylla_local_sstable_prepared,
            system_scylla_local_reference_dump)

    def test_table_dir_data_dir_deduced_keyspace_table(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, scylla_data_dir):
        self.check(
            scylla_path,
            ["--schema-tables", "--scylla-data-dir", scylla_data_dir],
            system_scylla_local_sstable_prepared,
            system_scylla_local_reference_dump)

    def test_table_dir_scylla_yaml(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, scylla_home_dir):
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        self.check(
            scylla_path,
            ["--schema-tables", "--scylla-yaml-file", scylla_yaml_file, "--keyspace", self.keyspace, "--table", self.table],
            system_scylla_local_sstable_prepared,
            system_scylla_local_reference_dump)

    def test_table_dir_scylla_yaml_deduced_keyspace_table(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, scylla_home_dir):
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        self.check(
            scylla_path,
            ["--schema-tables", "--scylla-yaml-file", scylla_yaml_file],
            system_scylla_local_sstable_prepared,
            system_scylla_local_reference_dump)

    # Schema sources, sstable copied out to an external directory, so
    # keyspace/table cannot be deduced from the path.

    def test_external_dir_system_schema(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        self.check(
            scylla_path,
            ["--system-schema", "--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump)

    def test_external_dir_schema_file(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir, system_scylla_local_schema_file):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        self.check(
            scylla_path,
            ["--schema-file", system_scylla_local_schema_file],
            ext_sstable,
            system_scylla_local_reference_dump)

    def test_external_dir_data_dir(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir, scylla_data_dir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        self.check(
            scylla_path,
            ["--schema-tables", "--scylla-data-dir", scylla_data_dir, "--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump)

    def test_external_dir_scylla_yaml(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir, scylla_home_dir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        self.check(
            scylla_path,
            ["--schema-tables", "--scylla-yaml-file", scylla_yaml_file, "--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump)

    def test_external_dir_sstable_serialization_header_keyspace_table(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        # It is important to use a controlled workdir, so scylla-sstable doesn't accidentally pick up a scylla.yaml.
        self.check(
            scylla_path,
            ["--sstable-schema", "--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump,
            cwd=temp_workdir)

    # Schema-source autodetection.

    def test_external_dir_autodetect_schema_file(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir, system_scylla_local_schema_file):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        # A schema.cql next to the sstable should be picked up automatically.
        shutil.copy(system_scylla_local_schema_file, os.path.join(temp_workdir, "schema.cql"))
        self.check(
            scylla_path,
            [],
            ext_sstable,
            system_scylla_local_reference_dump,
            cwd=temp_workdir)

    def test_external_dir_autodetect_conf_dir(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir, scylla_home_dir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        self.check(
            scylla_path,
            ["--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump,
            cwd=scylla_home_dir)

    def test_external_dir_autodetect_conf_dir_conf_env(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir, scylla_home_dir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        conf_dir = os.path.join(scylla_home_dir, "conf")
        self.check(
            scylla_path,
            ["--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump,
            env={"SCYLLA_CONF": conf_dir})

    def test_external_dir_autodetect_conf_dir_home_env(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir, scylla_home_dir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        self.check(
            scylla_path,
            ["--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump,
            env={"SCYLLA_HOME": scylla_home_dir})

    def test_external_dir_autodetect_sstable_serialization_header_keyspace_table(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        # It is important to use a controlled workdir, so scylla-sstable doesn't accidentally pick up a scylla.yaml.
        self.check(
            scylla_path,
            ["--keyspace", self.keyspace, "--table", self.table],
            ext_sstable,
            system_scylla_local_reference_dump,
            cwd=temp_workdir)

    def test_external_dir_autodetect_sstable_serialization_header(self, scylla_path, system_scylla_local_sstable_prepared, system_scylla_local_reference_dump, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        # It is important to use a controlled workdir, so scylla-sstable doesn't accidentally pick up a scylla.yaml.
        self.check(
            scylla_path,
            [],
            ext_sstable,
            system_scylla_local_reference_dump,
            cwd=temp_workdir)

    # Failure modes.

    def test_fail_nonexistent_keyspace(self, scylla_path, system_scylla_local_sstable_prepared, temp_workdir, scylla_home_dir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        self.check_fail(
            scylla_path,
            ["--schema-tables", "--scylla-yaml-file", scylla_yaml_file, "--keyspace", "non-existent-keyspace", "--table", self.table],
            ext_sstable,
            error_msg="error processing arguments: could not load schema via schema-tables: std::runtime_error (Failed to find non-existent-keyspace.scylla_local in schema tables)")

    def test_fail_nonexistent_table(self, scylla_path, system_scylla_local_sstable_prepared, temp_workdir, scylla_home_dir):
        ext_sstable = self.copy_sstable_to_external_dir(system_scylla_local_sstable_prepared, temp_workdir)
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        self.check_fail(
            scylla_path,
            ["--schema-tables", "--scylla-yaml-file", scylla_yaml_file, "--keyspace", self.keyspace, "--table", "non-existent-table"],
            ext_sstable,
            error_msg="error processing arguments: could not load schema via schema-tables: std::runtime_error (Failed to find system.non-existent-table in schema tables)")


@pytest.fixture(scope="class")
def schema_test_base_table(cql, test_keyspace):
    # Base table shared by the materialized-view / secondary-index schema tests.
    with util.new_test_table(cql, test_keyspace, "pk int, v1 text, v2 text, PRIMARY KEY (pk)") as table:
        yield table


@pytest.fixture(scope="class")
def schema_test_mv(cql, schema_test_base_table):
    # A materialized view over the base table, keyed by (v1, pk).
    with util.new_materialized_view(cql, schema_test_base_table, '*', 'v1, pk', 'v1 is not null and pk is not null') as mv:
        yield mv


@pytest.fixture(scope="class")
def schema_test_si(cql, schema_test_base_table):
    # A secondary index on v2; yields the name of the backing index table.
    keyspace, base_table = schema_test_base_table.split(".")
    si_name = f"{base_table}_by_v2"
    with util.new_secondary_index(cql, schema_test_base_table, "v2", name=si_name) as 
si:
        yield si + "_index"


@pytest.fixture(scope="class")
def schema_test_mv_sstable_prepared(cql, test_keyspace, schema_test_base_table, schema_test_mv, scylla_data_dir):
    # An sstable belonging to the materialized view, with a few rows written.
    def write():
        cql.execute(f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (0, 'v1-0', 'v2-0')")
        cql.execute(f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (1, 'v1-1', 'v2-1')")
        cql.execute(f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (2, 'v1-1', 'v2-2')")
    with _prepare_sstable(cql, scylla_data_dir, schema_test_mv, write) as sst:
        yield sst


@pytest.fixture(scope="class")
def schema_test_si_sstable_prepared(cql, test_keyspace, schema_test_base_table, schema_test_si, scylla_data_dir):
    # An sstable belonging to the secondary index's backing table.
    def write():
        cql.execute(f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (0, 'v1-0', 'v2-0')")
        cql.execute(f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (1, 'v1-1', 'v2-1')")
        cql.execute(f"INSERT INTO {schema_test_base_table} (pk, v1, v2) VALUES (2, 'v1-1', 'v2-2')")
    with _prepare_sstable(cql, scylla_data_dir, schema_test_si, write) as sst:
        yield sst


@pytest.fixture(scope="class")
def schema_test_mv_schema_file(schema_test_base_table, schema_test_mv):
    """ Prepares a schema.cql with the schema of the view, matching that in
    the `mv_sstable_prepared` fixture. """
    with tempfile.NamedTemporaryFile("w+t") as f:
        f.write(f"CREATE TABLE {schema_test_base_table} (pk int, v1 text, v2 text, PRIMARY KEY (pk));")
        f.write(f"CREATE MATERIALIZED VIEW {schema_test_mv} AS")
        f.write(f" SELECT * FROM {schema_test_base_table} WHERE v1 IS NOT NULL AND pk IS NOT NULL")
        f.write(" PRIMARY KEY (v1, pk);")
        f.flush()
        yield f.name


@pytest.fixture(scope="class")
def schema_test_si_schema_file(schema_test_base_table, schema_test_si):
    """ Prepares a schema.cql with the schema of the index, matching that in
    the `si_sstable_prepared` fixture.
    """
    keyspace, base_table = schema_test_base_table.split(".")
    with tempfile.NamedTemporaryFile("w+t") as f:
        f.write(f"CREATE TABLE {schema_test_base_table} (pk int, v1 text, v2 text, PRIMARY KEY (pk));")
        f.write(f"CREATE INDEX {base_table}_by_v2 ON {schema_test_base_table}(v2);")
        f.flush()
        yield f.name


@pytest.fixture(scope="class")
def schema_test_mv_reference_dump(scylla_path, schema_test_mv, schema_test_mv_sstable_prepared):
    # Reference dump produced with a hand-written regular-table schema that
    # matches the view's layout.
    with tempfile.NamedTemporaryFile("w+t") as f:
        f.write(f"CREATE TABLE {schema_test_mv} (v1 text, pk int, v2 text, PRIMARY KEY (v1, pk))")
        f.flush()
        return _produce_reference_dump(scylla_path, ["--schema-file", f.name], schema_test_mv_sstable_prepared)


@pytest.fixture(scope="class")
def schema_test_si_reference_dump(scylla_path, schema_test_si, schema_test_si_sstable_prepared):
    # Reference dump produced with a hand-written regular-table schema that
    # matches the index table's layout.
    with tempfile.NamedTemporaryFile("w+t") as f:
        f.write(f"CREATE TABLE {schema_test_si} (v2 text, idx_token bigint, pk int, PRIMARY KEY (v2, idx_token, pk))")
        f.flush()
        return _produce_reference_dump(scylla_path, ["--schema-file", f.name], schema_test_si_sstable_prepared)


class TestScyllaSsstableViewSchemaLoading(TestScyllaSsstableSchemaLoadingBase):
    """ Test class containing schema-loading tests for materialized views and
    indexes.

    Similar to TestScyllaSsstableSchemaLoading, but focuses on testing that
    materialized view and index schemas can be loaded with all methods. Not
    focusing on exhaustively testing data directory discovery, that is already
    tested by TestScyllaSsstableSchemaLoading.
    """

    def test_mv_table_dir_schema_file(self, scylla_path, schema_test_mv_sstable_prepared, schema_test_mv_reference_dump, schema_test_mv_schema_file):
        self.check(
            scylla_path,
            ["--schema-file", schema_test_mv_schema_file],
            schema_test_mv_sstable_prepared,
            schema_test_mv_reference_dump)

    def test_mv_external_dir_schema_file(self, scylla_path, schema_test_mv_sstable_prepared, schema_test_mv_reference_dump, schema_test_mv_schema_file, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_mv_sstable_prepared, temp_workdir)
        self.check(
            scylla_path,
            ["--schema-file", schema_test_mv_schema_file],
            ext_sstable,
            schema_test_mv_reference_dump)

    def test_mv_table_dir_autodeduced(self, scylla_path, schema_test_mv, schema_test_mv_sstable_prepared, schema_test_mv_reference_dump, scylla_home_dir):
        self.check(
            scylla_path,
            [],
            schema_test_mv_sstable_prepared,
            schema_test_mv_reference_dump)

    def test_mv_table_dir_scylla_yaml(self, scylla_path, schema_test_mv, schema_test_mv_sstable_prepared, schema_test_mv_reference_dump, scylla_home_dir):
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_mv.split(".")
        self.check(
            scylla_path,
            ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table],
            schema_test_mv_sstable_prepared,
            schema_test_mv_reference_dump)

    def test_mv_external_dir_scylla_yaml(self, scylla_path, schema_test_mv, schema_test_mv_sstable_prepared, schema_test_mv_reference_dump, scylla_home_dir, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_mv_sstable_prepared, temp_workdir)
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_mv.split(".")
        self.check(
            scylla_path,
            ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table],
            ext_sstable,
            schema_test_mv_reference_dump)

    def test_si_table_dir_schema_file(self, scylla_path, schema_test_si_sstable_prepared, schema_test_si_reference_dump, schema_test_si_schema_file):
        self.check(
            scylla_path,
            ["--schema-file", schema_test_si_schema_file],
            schema_test_si_sstable_prepared,
            schema_test_si_reference_dump)

    def test_si_external_dir_schema_file(self, scylla_path, schema_test_si_sstable_prepared, schema_test_si_reference_dump, schema_test_si_schema_file, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_si_sstable_prepared, temp_workdir)
        self.check(
            scylla_path,
            ["--schema-file", schema_test_si_schema_file],
            ext_sstable,
            schema_test_si_reference_dump)

    def test_si_table_dir_autodeduced(self, scylla_path, schema_test_si, schema_test_si_sstable_prepared, schema_test_si_reference_dump, scylla_home_dir):
        self.check(
            scylla_path,
            [],
            schema_test_si_sstable_prepared,
            schema_test_si_reference_dump)

    def test_si_table_dir_scylla_yaml(self, scylla_path, schema_test_si, schema_test_si_sstable_prepared, schema_test_si_reference_dump, scylla_home_dir):
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_si.split(".")
        self.check(
            scylla_path,
            ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table],
            schema_test_si_sstable_prepared,
            schema_test_si_reference_dump)

    def test_si_external_dir_scylla_yaml(self, scylla_path, schema_test_si, schema_test_si_sstable_prepared, schema_test_si_reference_dump, scylla_home_dir, temp_workdir):
        ext_sstable = self.copy_sstable_to_external_dir(schema_test_si_sstable_prepared, temp_workdir)
        scylla_yaml_file = os.path.join(scylla_home_dir, "conf", "scylla.yaml")
        keyspace, table = schema_test_si.split(".")
        self.check(
            scylla_path,
            ["--scylla-yaml-file", scylla_yaml_file, "--keyspace", keyspace, "--table", table],
            ext_sstable,
            schema_test_si_reference_dump)


@pytest.fixture(scope="module")
def scrub_workdir():
    """A root temporary directory to be shared by all the scrub tests"""
    with tempfile.TemporaryDirectory() as tmp_dir:
        yield tmp_dir


@pytest.fixture(scope="module")
def 
scrub_schema_file(scrub_workdir):
    """Create a schema.cql for the scrub tests"""
    fname = os.path.join(scrub_workdir, "schema.cql")
    with open(fname, "w") as f:
        f.write("CREATE TABLE ks.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))")
        f.flush()
        yield fname


@pytest.fixture(scope="module")
def scrub_good_sstable(scylla_path, scrub_workdir, scrub_schema_file):
    """A good sstable used by the scrub tests."""
    with tempfile.TemporaryDirectory(prefix="good-sstable", dir=scrub_workdir) as tmp_dir:
        sst_json_path = os.path.join(tmp_dir, "sst.json")
        with open(sst_json_path, "w") as f:
            # One partition with a single live clustering row.
            sst_json = [
                {
                    "key": {"raw": "0004000000c8"},
                    "clustering_elements": [
                        {
                            "type": "clustering-row",
                            "key": {"raw": "000400000001"},
                            "columns": {
                                "v": {
                                    "is_live": True,
                                    "type": "regular",
                                    "timestamp": 1686815362417553,
                                    "value": "vv"
                                }
                            }
                        }
                    ]
                }
            ]
            json.dump(sst_json, f)
        subprocess.check_call([scylla_path, "sstable", "write", "--schema-file", scrub_schema_file, "--input-format", "json", "--output-dir", tmp_dir, "--input-file", sst_json_path])
        ssts = glob.glob(os.path.join(tmp_dir, "*-Data.db"))
        assert len(ssts) == 1
        yield ssts[0]


@pytest.fixture(scope="module")
def scrub_bad_sstable(scylla_path, scrub_workdir, scrub_schema_file):
    """A bad sstable (out-of-order rows) used by the scrub tests."""
    with tempfile.TemporaryDirectory(prefix="bad-sstable", dir=scrub_workdir) as tmp_dir:
        sst_json_path = os.path.join(tmp_dir, "sst.json")
        with open(sst_json_path, "w") as f:
            # rows are out-of-order
            sst_json = [
                {
                    "key": {"raw": "0004000000c8"},
                    "clustering_elements": [
                        {
                            "type": "clustering-row",
                            "key": {"raw": "000400000002"},
                            "columns": {
                                "v": {
                                    "is_live": True,
                                    "type": "regular",
                                    "timestamp": 1686815362417553,
                                    "value": "vv"
                                }
                            }
                        },
                        {
                            "type": "clustering-row",
                            "key": {"raw": "000400000001"},
                            "columns": {
                                "v": {
                                    "is_live": True,
                                    "type": "regular",
                                    "timestamp": 1686815362417553,
                                    "value": "vv"
                                }
                            }
                        }
                    ]
                }
            ]
            json.dump(sst_json, f)
        # --validation-level none lets the out-of-order data be written at all.
        subprocess.check_call([scylla_path, "sstable", "write", "--schema-file", scrub_schema_file, "--output-dir", tmp_dir, "--input-format", "json", "--input-file", sst_json_path, "--validation-level", "none"])
        ssts = glob.glob(os.path.join(tmp_dir, "*-Data.db"))
        assert len(ssts) == 1
        yield ssts[0]


def subprocess_check_error(args, pattern):
    """Invoke subprocess.run() with the provided args and check that it fails
    with stderr matching the provided pattern."""
    res = subprocess.run(args, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    assert res.returncode != 0
    err = res.stderr.decode('utf-8')
    assert re.search(pattern, err) is not None


def check_scrub_output_dir(sst_dir, num_sstables):
    # The number of Data.db components tells how many sstables scrub emitted.
    assert len(glob.glob(os.path.join(sst_dir, "*-Data.db"))) == num_sstables


def test_scrub_no_sstables(scylla_path, scrub_schema_file):
    subprocess_check_error(
        [scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "validate"],
        "error processing arguments: no sstables specified on the command line")


def test_scrub_missing_scrub_mode_cli_arg(scylla_path, scrub_workdir, scrub_schema_file, scrub_good_sstable):
    subprocess_check_error(
        [scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, scrub_good_sstable],
        "error processing arguments: missing mandatory command-line argument --scrub-mode")


def test_scrub_output_dir(scylla_path, scrub_workdir, scrub_schema_file, scrub_good_sstable):
    with tempfile.TemporaryDirectory(prefix="test_scrub_output_dir", dir=scrub_workdir) as tmp_dir:
        # Empty output directory is accepted.
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "abort", "--output-dir", tmp_dir, scrub_good_sstable])


def test_scrub_abort_mode(scylla_path, scrub_workdir, scrub_schema_file, scrub_good_sstable, scrub_bad_sstable):
    # Good sstable passes and is rewritten into the output dir.
    with tempfile.TemporaryDirectory(prefix="test_scrub_abort_mode", dir=scrub_workdir) as tmp_dir:
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "abort", "--output-dir", tmp_dir, scrub_good_sstable])
        check_scrub_output_dir(tmp_dir, 1)
    # Bad sstable aborts the scrub; nothing is written.
    with tempfile.TemporaryDirectory(prefix="test_scrub_abort_mode", dir=scrub_workdir) as tmp_dir:
        subprocess_check_error(
            [scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "abort", "--output-dir", tmp_dir, scrub_bad_sstable],
            "compaction_aborted_exception \\(Compaction for ks/tbl was aborted due to: scrub compaction found invalid data\\)")
        check_scrub_output_dir(tmp_dir, 0)


def test_scrub_skip_mode(scylla_path, scrub_workdir, scrub_schema_file, scrub_good_sstable, scrub_bad_sstable):
    with tempfile.TemporaryDirectory(prefix="test_scrub_skip_mode", dir=scrub_workdir) as tmp_dir:
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "skip", "--output-dir", tmp_dir, scrub_good_sstable])
        check_scrub_output_dir(tmp_dir, 1)
    # Skip mode drops the invalid data but still produces one sstable.
    with tempfile.TemporaryDirectory(prefix="test_scrub_skip_mode", dir=scrub_workdir) as tmp_dir:
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "skip", "--output-dir", tmp_dir, scrub_bad_sstable])
        check_scrub_output_dir(tmp_dir, 1)


def test_scrub_segregate_mode(scylla_path, scrub_workdir, scrub_schema_file, scrub_good_sstable, scrub_bad_sstable):
    with tempfile.TemporaryDirectory(prefix="test_scrub_segregate_mode", dir=scrub_workdir) as tmp_dir:
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "segregate", "--output-dir", tmp_dir, scrub_good_sstable])
        check_scrub_output_dir(tmp_dir, 1)
    # Segregate mode splits the out-of-order data into two sstables.
    with tempfile.TemporaryDirectory(prefix="test_scrub_segregate_mode", dir=scrub_workdir) as tmp_dir:
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "segregate", "--output-dir", tmp_dir, scrub_bad_sstable])
        check_scrub_output_dir(tmp_dir, 2)


def test_scrub_validate_mode(scylla_path, scrub_workdir, scrub_schema_file, scrub_good_sstable, scrub_bad_sstable):
    with tempfile.TemporaryDirectory(prefix="test_scrub_validate_mode", dir=scrub_workdir) as tmp_dir:
        # Validate mode never writes output sstables.
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "validate", "--output-dir", tmp_dir, scrub_good_sstable])
        check_scrub_output_dir(tmp_dir, 0)
        subprocess.check_call([scylla_path, "sstable", "scrub", "--schema-file", scrub_schema_file, "--scrub-mode", "validate", "--output-dir", tmp_dir, scrub_bad_sstable])
        check_scrub_output_dir(tmp_dir, 0)
        # Check that validate did not move the bad sstable into quarantine
        assert os.path.exists(scrub_bad_sstable)


def _to_cql3_type(t: Type) -> str:
    # map from Python type to Cassandra type, only a small subset is supported
    py_to_cql3_type = {int: "Int32Type", str: "UTF8Type", bool: "BooleanType"}
    return py_to_cql3_type[t]


KeyType = Union[int, str, bool]


def _serialize_value(scylla_path: str, value: KeyType) -> str:
    # Serialize a single key value with `scylla types serialize`.
    return subprocess.check_output([scylla_path, "types", "serialize", "--full-compound", "-t", _to_cql3_type(type(value)), "--", str(value)]).strip().decode()


@functools.cache
def _shard_of_values(scylla_path: str, shards: int, *values: list[KeyType]) -> int:
    """Return the shard that `scylla types shardof` maps the compound key to."""
    args = [scylla_path, "types", "shardof", "--full-compound", "--shards", str(shards)]
    for value in values:
        args.extend(['-t', _to_cql3_type(type(value))])
    serialized = ''.join(_serialize_value(scylla_path, v) for v in values)
    args.extend(['--', serialized])
    output = subprocess.check_output(args).strip().decode()
    # the output looks like:
    # (file_instance, 2021-03-27, c61a3321-0459-41c3-8e56-75255feb0196): token: -5043005771368701888, shard: 1
    shard = output.rsplit(':', 1)[-1]
    return int(shard)


def _generate_key_for_shard(scylla_path: str, shards: int, shard_id: int) -> Iterable[int]:
    # this only works with the table with a single integer pk. if we want to
    # be more general, we could use a randomized generator to enumerate all
    # possible pk combinations.
    for pk in itertools.count(start=0, step=1):
        if _shard_of_values(scylla_path, shards, pk) == shard_id:
            yield pk


def _simple_table_with_keys(cql, keyspace: str, keys: Iterable[int]) -> tuple[str, str]:
    # Create a simple table populated with exactly the given partition keys.
    table = util.unique_name()
    schema = (f"CREATE TABLE {keyspace}.{table} (pk int PRIMARY KEY, v int) "
              "WITH compaction = {'class': 'NullCompactionStrategy'}")
    cql.execute(schema)
    for pk in keys:
        cql.execute(f"INSERT INTO {keyspace}.{table} (pk, v) VALUES ({pk}, 0)")
    nodetool.flush(cql, f"{keyspace}.{table}")
    return table, schema


def test_scylla_sstable_shard_of_vnodes(cql, test_keyspace_vnodes, scylla_path, scylla_data_dir) -> None:
    # cqlpy/run.py::run_scylla_cmd() passes "--smp 2" to scylla, so we
    # need to be consistent with it to get the correct sstable-shard mapping
    scylla_option_smp = 2
    shards = scylla_option_smp
    num_keys = 1
    for shard_id in range(shards):
        all_keys_for_shard = _generate_key_for_shard(scylla_path, shards, shard_id)
        keys = itertools.islice(all_keys_for_shard, num_keys)
        table_factory = functools.partial(_simple_table_with_keys, keys=keys)
        with scylla_sstable(table_factory, cql, test_keyspace_vnodes, scylla_data_dir) as (_, schema_file, sstables):
            out = subprocess.check_output([scylla_path, "sstable", "shard-of", "--vnodes", "--schema-file", schema_file, "--shards", str(shards)] + sstables)
            # all sstables contains the rows with the keys deliberately
            # created for specified shard
            sstables_json = json.loads(out)['sstables']
            expected_json = [shard_id]
            for actual_json in sstables_json.values():
                assert actual_json == 
expected_json


def test_scylla_sstable_shard_of_tablets(cql, test_keyspace_tablets, scylla_path, scylla_data_dir) -> None:
    """Check `scylla sstable shard-of --tablets` maps partitions to the expected shard."""
    # the token for 0 is mapped to shard 0, 142 is mapped to shard 1
    shard_to_key = {0: 0, 1: 142}
    for shard_id, key in shard_to_key.items():
        table_factory = functools.partial(_simple_table_with_keys, keys=[key])
        with scylla_sstable(table_factory, cql, test_keyspace_tablets, scylla_data_dir) as (_, schema_file, sstables):
            with nodetool.no_autocompaction_context(cql, "system.tablets"):
                nodetool.flush_keyspace(cql, "system")
                out = subprocess.check_output([scylla_path, "sstable", "shard-of", "--tablets",
                                               "--schema-file", schema_file] + sstables)
                sstables_json = json.loads(out)['sstables']
                for replica_sets in sstables_json.values():
                    for replica_set in replica_sets:
                        actual_shard = replica_set['shard']
                        assert actual_shard == shard_id


def test_scylla_sstable_no_args(scylla_path):
    """Invoking `scylla sstable` without an operation prints usage to stderr."""
    res = subprocess.run([scylla_path, "sstable"], capture_output=True, text=True)
    assert res.stdout == ""
    assert res.stderr == """\
Usage: scylla sstable OPERATION [OPTIONS] ...
Try `scylla sstable --help` for more information.
"""


def test_scylla_sstable_bad_scylla_yaml(cql, test_keyspace, scylla_path, scylla_data_dir):
    """ scylla-sstable should not choke on deprecated/unrecognized/etc options in scylla.yaml

    It should just log a debug-level log and proceed with reading it.
    This test checks that the config is successfully read, even if there are errors.

    Reproduces: https://github.com/scylladb/scylladb/issues/16538
    """
    with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir) as (_, schema_file, sstables):
        with tempfile.NamedTemporaryFile("w+t") as scylla_yaml:
            # "foo" is not a recognized config option -- the tool must tolerate it
            scylla_yaml.write("foo: bar")
            scylla_yaml.flush()
            res = subprocess.run([scylla_path, "sstable", "dump-data",
                                  "--scylla-yaml", scylla_yaml.name,
                                  "--schema-file", schema_file,
                                  "--logger-log-level", "scylla-sstable=debug"] + sstables,
                                 text=True, stderr=subprocess.PIPE)
            assert res.returncode == 0
            print(res.stderr)  # when the test fails, it helps to see what the actual output is
            stderr_lines = res.stderr.split('\n')
            for expected_msg in (
                    "error processing configuration item: Unknown option : foo",
                    "Successfully read scylla.yaml from"):
                assert any(map(lambda stderr_line: expected_msg in stderr_line, stderr_lines))


def test_scylla_sstable_format_version(cql, test_keyspace, scylla_data_dir):
    """Check that freshly written sstables use the expected (default) format version."""
    # Reproduces https://github.com/scylladb/scylladb/issues/16551
    #
    # an sstable component filename looks like:
    # me-3g8w_00qf_4pbog2i7h2c7am0uoe-big-Data.db
    # NOTE(review): the named-group syntax was restored here ((?P<name>...)),
    # the group "version" is looked up below.
    sstable_re = re.compile(r"""(?P<version>la|m[cdes])-  # the sstable version
                                (?P<id>[^-]+)-            # sstable identifier
                                (?P<format>\w+)-          # format: 'big'
                                (?P<component>.*)         # component: e.g., 'Data'""", re.X)
    with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir) as (_, _, sstables):
        for fn in sstables:
            stem = pathlib.Path(fn).stem
            matched = sstable_re.match(stem)
            assert matched is not None, f"unmatched sstable component path: {fn}"
            sstable_version = matched["version"]
            # "ms" is currently the default sstable format version.
            assert sstable_version == "ms", f"unexpected sstable format: {sstable_version}"


def test_create_local_key_file(scylla_path):
    """Generated local key files must be non-empty and readable only by the owner."""
    with tempfile.TemporaryDirectory() as dir:
        file = os.path.join(dir, "keyfile")
        subprocess.check_call([scylla_path, "local-file-key-generator", "generate", file])
        assert os.path.isfile(file)
        status = os.stat(file)
        # owner read/write only
        assert (status.st_mode & 0xfff) == (stat.S_IRUSR | stat.S_IWUSR)
        # use a context manager so the file handle is closed deterministically
        with open(file) as f:
            num_lines = sum(1 for _ in f)
        assert num_lines > 0


def test_create_local_key_file_with_args(scylla_path):
    """Key generation honors the algorithm/length/block-mode arguments."""
    with tempfile.TemporaryDirectory() as dir:
        file = os.path.join(dir, "keyfile")
        subprocess.check_call([scylla_path, "local-file-key-generator", "generate",
                               "-a", "DESEDE", "-l", "128", "-b", "ECB", file])
        assert os.path.isfile(file)
        status = os.stat(file)
        assert (status.st_mode & 0xfff) == (stat.S_IRUSR | stat.S_IWUSR)
        # only the first line is checked (matches the original `for ... break` loop);
        # the handle is now closed deterministically
        with open(file) as f:
            first_line = f.readline()
        assert re.match(r"DESEDE/ECB/PKCS5Padding:128:\S+", first_line)


class sstable_query_tester:
    """Runs the same query via CQL and via `scylla sstable query` and compares results.

    The CQL side uses SELECT JSON so both sides produce comparable JSON rows.
    """

    def __init__(self, cql, scylla_path, sstables, keyspace, table, temp_workdir):
        self._cql = cql
        self._scylla_path = scylla_path
        self._sstables = sstables
        self._keyspace = keyspace
        self._table = table
        self._temp_workdir = temp_workdir

    def test_json(self, query_template):
        """Compare results using --query-file and json output."""
        cql_query_result = self._cql.execute(
            query_template.replace("SELECT", "SELECT JSON").format(f"{self._keyspace}.{self._table}"))
        cql_query_result = list(map(lambda row: json.loads(row[0]), cql_query_result))
        with open(os.path.join(self._temp_workdir, "query.cql"), "w+t") as query_file:
            # the tool-side query goes against the virtual scylla_sstable keyspace
            query_file.write(query_template.format(f"scylla_sstable.{self._table}"))
            query_file.flush()
            sstable_query_result = json.loads(subprocess.check_output([
                self._scylla_path, "sstable", "query",
                "--logger-log-level", "scylla-sstable=debug",
                "--reactor-backend=linux-aio",
                "--output-format", "json",
                "--schema-tables",
                "--query-file", query_file.name] + self._sstables))
        assert sstable_query_result == cql_query_result

    def test_text(self, query_template=None):
        """Compare results using the command-line query (or read-all) and text output."""
        if query_template is None:
            cql_query_result = self._cql.execute("SELECT JSON * FROM {}".format(f"{self._keyspace}.{self._table}"))
        else:
            cql_query_result = self._cql.execute(
                query_template.replace("SELECT", "SELECT JSON").format(f"{self._keyspace}.{self._table}"))
        cql_query_result = list(map(lambda row: json.loads(row[0]), cql_query_result))
        params = [self._scylla_path, "sstable", "query",
                  "--logger-log-level", "scylla-sstable=debug",
                  "--output-format", "text",
                  "--schema-tables"]
        if query_template is not None:
            params += ["--query", query_template.format(f"scylla_sstable.{self._table}")]
        params += self._sstables
        sstable_query_result = subprocess.check_output(params, text=True)
        # Parse the text table: line 0 is the header, line 1 the separator,
        # subsequent lines are data rows matched against the CQL rows in order.
        column_names = []
        for line_index, line in enumerate(sstable_query_result.split('\n')):
            if not line:
                continue
            columns = list(map(str.strip, line.split('|')))
            if line_index == 0:  # header
                column_names = columns
            elif line_index == 1:
                continue  # header-body separator line
            else:  # body
                cql_row = cql_query_result[line_index - 2]
                assert len(columns) == len(cql_row)
                for column_index, column_value in enumerate(columns):
                    column_name = column_names[column_index]
                    assert column_name in cql_row
                    assert str(cql_row[column_name]) == column_value


# Parameters for a single-table scylla-sstable query test:
# schema + insert statement + bound values, plus optional prepare/teardown CQL.
scylla_sstable_query_simple_table_param = collections.namedtuple(
    'scylla_sstable_query_simple_table_param',
    ['schema', 'insert_statement', 'insert_statement_parameters', 'prepare', 'teardown'])

scylla_sstable_query_simple_all_types_param = scylla_sstable_query_simple_table_param(
    textwrap.dedent("""
        CREATE TABLE {}.{} (
            pk int,
            ck int,
            col_ascii ascii,
            col_bigint bigint,
            col_blob blob,
            col_boolean boolean,
            col_date date,
            col_decimal decimal,
            col_double double,
            col_duration duration,
            col_float float,
            col_inet inet,
            col_int int,
            col_smallint smallint,
            col_text text,
            col_time time,
            col_timestamp timestamp,
            col_timeuuid timeuuid,
            col_tinyint tinyint,
            col_uuid uuid,
            col_varint varint,
            PRIMARY KEY (pk, ck)
        ) WITH compaction = {{'class': 'NullCompactionStrategy'}};
    """),
    textwrap.dedent("""
        INSERT INTO {}.{} (
            pk,
            ck,
            col_ascii,
            col_bigint,
            col_blob,
            col_boolean,
            col_date,
            col_decimal,
            col_double,
            col_duration,
            col_float,
            col_inet,
            col_int,
            col_smallint,
            col_text,
            col_time,
            col_timestamp,
            col_timeuuid,
            col_tinyint,
            col_uuid,
            col_varint
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
    """),
    (
        0,
        0,
        "asasdfasfd",
        99999999,
        bytearray(b'\x78\xcc\xaf'),
        True,
        "2012-03-04",
        87689.2123,
        0.7567645,
        Duration(months=10, days=1),
        0.11,
        "127.0.0.1",
        100,
        10,
        "kjhknnhjhghgsdf",
        "08:12:54",
        1299038700000,
        uuid.UUID("f06fdf50-bdf7-11ef-84c5-e1cf2fa2d062"),
        1,
        uuid.UUID("123e4567-e89b-12d3-a456-426655440000"),
        8876876876,
    ),
    None,
    None,
)

my_udt = collections.namedtuple('my_type', ['field_1', 'field_2'])

# NOTE(review): the generic type parameters below were restored from the bound
# values (the angle brackets were stripped in the mangled source) -- confirm
# against the original file.
scylla_sstable_query_simple_collection_types_param = scylla_sstable_query_simple_table_param(
    textwrap.dedent("""
        CREATE TABLE {}.{} (
            pk int PRIMARY KEY,
            col_list list<text>,
            col_set set<text>,
            col_map map<int, text>,
            col_tuple tuple<text, int>,
            col_udt my_type
        ) WITH compaction = {{'class': 'NullCompactionStrategy'}};
    """),
    "INSERT INTO {}.{} (pk, col_list, col_set, col_map, col_tuple, col_udt) VALUES (?, ?, ?, ?, ?, ?)",
    (0, ['adads', '3443sd'], {'asdafasdf', 'aasd'}, {1: 'ak', 2: 'pa'}, ('adadad', 1), my_udt(10, 'aasdad')),
    "CREATE TYPE {}.my_type (field_1 int, field_2 text)",
    "DROP TYPE {}.my_type",
)

scylla_sstable_query_simple_counter_param = scylla_sstable_query_simple_table_param(
    textwrap.dedent("""
        CREATE TABLE {}.{} (
            pk int PRIMARY KEY,
            col_counter counter
        ) WITH compaction = {{'class': 'NullCompactionStrategy'}};
    """),
    "UPDATE {}.{} SET col_counter = col_counter + 10 WHERE pk = ?",
    (0,),
    None,
    None,
)

scylla_sstable_query_nested_udt = scylla_sstable_query_simple_table_param(
    textwrap.dedent("""
        CREATE TABLE {}.{} (
            pk int PRIMARY KEY,
            col_nested_udt list<frozen<my_type>>
        ) WITH compaction = {{'class': 'NullCompactionStrategy'}};
    """),
    "INSERT INTO {}.{} (pk, col_nested_udt) VALUES (?, ?)",
    (0, [my_udt(10, 'aasdad')]),
    "CREATE TYPE {}.my_type (field_1 int, field_2 text)",
    "DROP TYPE {}.my_type",
)


@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True)
@pytest.mark.parametrize("test_table", [
    scylla_sstable_query_simple_all_types_param,
    scylla_sstable_query_simple_collection_types_param,
    scylla_sstable_query_simple_counter_param,
    scylla_sstable_query_nested_udt,
])
def test_scylla_sstable_query_data_types(request, cql, test_keyspace, test_table, scylla_path, scylla_data_dir, temp_workdir):
    """Check read-all queries with all data-types.

    Run the same query via CQL and via scylla-sstable query and check that the
    results match.
    Test both with --query-file and the command-line query composition params.
    Test both json and text output.
    To reduce the amount of cases, --query-file is tested with json output,
    while command-line query is tested with text output.
    This test focuses on checking the correct formatting and handling of all
    CQL data-types.
    """
    if test_table.prepare is not None:
        cql.execute(test_table.prepare.format(test_keyspace))
    table = util.unique_name()
    schema = test_table.schema.format(test_keyspace, table)
    cql.execute(schema)
    insert_statement = cql.prepare(test_table.insert_statement.format(test_keyspace, table))
    cql.execute(insert_statement, test_table.insert_statement_parameters)
    nodetool.flush(cql, f"{test_keyspace}.{table}")
    nodetool.flush_keyspace(cql, "system_schema")
    sstables = get_sstables_for_table(scylla_data_dir, test_keyspace, table)
    with nodetool.no_autocompaction_context(cql, "system_schema"):
        tester = sstable_query_tester(cql, scylla_path, sstables, test_keyspace, table, temp_workdir)
        tester.test_json("SELECT * FROM {}")
    cql.execute(f"DROP TABLE {test_keyspace}.{table}")
    if test_table.teardown is not None:
        cql.execute(test_table.teardown.format(test_keyspace))


def test_scylla_sstable_query_advanced_queries(cql, test_keyspace, scylla_path, scylla_data_dir, temp_workdir):
    """Check more advanced queries.

    Run the same query via CQL and via scylla-sstable query and check that the
    results match.
    Test both with --query-file and the command-line query composition params.
    Test both json and text output.
    To reduce the amount of cases, --query-file is tested with json output,
    while command-line query is tested with text output.
    This test focuses on checking the correct handling of more advanced
    queries, which select a subset of fields and use restrictions and even
    aggregation. No attempt is made to cover all CQL features.
    """
    table = util.unique_name()
    schema = f"CREATE TABLE {test_keyspace}.{table} (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
    cql.execute(schema)
    insert_statement = cql.prepare(f"INSERT INTO {test_keyspace}.{table} (pk, ck, v) VALUES (?, ?, ?)")
    for pk in range(0, 10):
        for ck in range(0, 10):
            cql.execute(insert_statement, (pk, ck, pk + ck))
    nodetool.flush(cql, f"{test_keyspace}.{table}")
    nodetool.flush_keyspace(cql, "system_schema")
    sstables = get_sstables_for_table(scylla_data_dir, test_keyspace, table)
    with nodetool.no_autocompaction_context(cql, "system_schema"):
        tester = sstable_query_tester(cql, scylla_path, sstables, test_keyspace, table, temp_workdir)
        tester.test_text()
        tester.test_json("SELECT count(*) FROM {} WHERE pk = 0")
        tester.test_json("SELECT ck, v FROM {} WHERE pk = 0 and ck = 0")
        tester.test_text("SELECT ck, v FROM {} WHERE pk=0")
        tester.test_json("SELECT * FROM {} WHERE pk = 0 AND v = 0 ALLOW FILTERING")
        tester.test_text("SELECT pk, v FROM {} WHERE pk=0 AND v=0 ALLOW FILTERING")
    cql.execute(f"DROP TABLE {test_keyspace}.{table}")


def test_scylla_sstable_query_bad_command_line(cql, scylla_path, scylla_data_dir):
    """Check that not-allowed command line param combinations are refused."""
    nodetool.flush_keyspace(cql, "system")
    with nodetool.no_autocompaction_context(cql, "system.local"):
        sstables = get_sstables_for_table(scylla_data_dir, "system", "local")
        common_params = [scylla_path, "sstable", "query", "--system-schema",
                         "--keyspace", "system", "--table", "local",
                         "--query-file", "whatever.cql"]

        def check(bad_params):
            # -q/--query conflicts with --query-file; the tool must refuse
            res = subprocess.run(common_params + bad_params + sstables, text=True, capture_output=True)
            assert res.returncode == 1
            assert res.stdout == ""
            assert res.stderr.endswith("error processing arguments: cannot provide both -q|--query and --query-file\n")

        check(["-q", "SELECT * FROM system.local"])
        check(["--query", "SELECT * FROM system.local"])


def test_scylla_sstable_query_validation(cql, scylla_path, scylla_data_dir):
    """Check that not-allowed command line param combinations are refused."""
    with nodetool.no_autocompaction_context(cql, "system.local"):
        sstables = get_sstables_for_table(scylla_data_dir, "system", "local")
        common_params = [scylla_path, "sstable", "query", "--system-schema",
                         "--keyspace", "system", "--table", "local",
                         "--reactor-backend=linux-aio", "--query"]

        def check(bad_query, expected_error):
            res = subprocess.run(common_params + [bad_query] + sstables, text=True, capture_output=True)
            assert res.returncode == 1
            assert res.stdout == ""
            assert "error processing arguments: " + expected_error in res.stderr

        check("SELECT * FROM ,", "failed to parse query: exceptions::syntax_exception")
        check("SELECT * FROM scylla_sstable.columns; SELECT * FROM scylla_sstable.columns;",
              "expected exactly 1 query, got 2")
        check("SELECT * FROM foo", "query must have keyspace and the keyspace has to be scylla_sstable")
        check("SELECT * FROM foo.bar", "query must be against scylla_sstable keyspace, got foo instead")
        check("SELECT * FROM system.local", "query must be against scylla_sstable keyspace, got system instead")
        check("SELECT * FROM scylla_sstable.foo", "query must be against local table, got foo instead")
        check("SELECT * FROM scylla_sstable.tables", "query must be against local table, got tables instead")
        check("INSERT INTO scylla_sstable.local (key, bootstrapped) VALUES ('local', 'COMPLETED')",
              "query must be a select query")


def test_scylla_sstable_query_temp_dir(cql, scylla_path, scylla_data_dir):
    """Check that TEMPDIR environment variable is respected.

    This is very hard to test with a positive test, because cql_test_env
    removes its temp-dir on exit.
    So we test with a negative test: give an impossible path and check that
    creating the temp-dir fails.
    """
    with nodetool.no_autocompaction_context(cql, "system.local"):
        sstables = get_sstables_for_table(scylla_data_dir, "system", "local")
        # a regular file is not usable as a temp directory
        with tempfile.NamedTemporaryFile("r") as f:
            args = [scylla_path, "sstable", "query", "--system-schema",
                    "--keyspace", "system", "--table", "local"]
            res = subprocess.run(args + sstables, text=True, capture_output=True, env={'TEMPDIR': f.name})
            assert res.returncode == 2
            assert res.stdout == ""
            assert res.stderr.endswith(
                f"error running operation: std::filesystem::__cxx11::filesystem_error "
                f"(error generic:20, filesystem error: temp_directory_path: Not a directory [{f.name}])\n")


def test_scylla_sstable_query_null_data(cql, test_keyspace, scylla_path, scylla_data_dir):
    """Check that scylla-sstable query works with null cell values.

    Reproduces https://github.com/scylladb/scylladb/issues/25325
    """
    # simple_clustering_table sets the static column `s` only for some of the keys
    # and leaves it unset (null) for others.
    with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
        args = [scylla_path, "sstable", "query",
                "--output-format", "json",
                "--scylla-yaml-file", f"{os.path.dirname(scylla_data_dir)}/conf/scylla.yaml",
                "--logger-log-level", "scylla-sstable=debug"]
        args.extend(sstables)
        try:
            out = subprocess.check_output(args)
        except subprocess.CalledProcessError as e:
            pytest.fail(f"Failed to query sstable: {e}\n{e.output.decode('utf-8')}")
        assert out
        assert json.loads(out)
        # Verify that null values were properly queried
        decoded = out.decode('utf-8')
        assert ":null" in decoded, \
            f"Expected null values in the output, but they were not found. out='{decoded}'"


# Use different snapshot tags to stress the sstables::make_entry_descriptor regex pattern matching
# And try to reproduce https://github.com/scylladb/scylladb/issues/25242 where a snapshot tag
# that resembles a table- directory confused the regular expression when doing greedy matching
# to think that "snapshots" is the keyspace name and the snapshot tag to be the table name.
@pytest.mark.parametrize("test_tag", [
    "sstable-dump-4d1cc6a4-6c13-11f0-a3c9",
    "test-4d1cc6a46c1311f0a3c9",
    "dropped-1754462406"
])
def test_scylla_sstable_query_data_from_snapshot(cql, test_keyspace, scylla_path, scylla_data_dir, test_tag):
    """Check that scylla-sstable query works with sstables in a snapshot directory.

    Reproduces https://github.com/scylladb/scylladb/issues/25242
    """
    with scylla_sstable(simple_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
        nodetool.take_snapshot(cql, f"{test_keyspace}.{table}", test_tag, False)
        args = [scylla_path, "sstable", "query",
                "--output-format", "json",
                "--scylla-yaml-file", f"{os.path.dirname(scylla_data_dir)}/conf/scylla.yaml",
                "--logger-log-level", "scylla-sstable=debug"]
        # point at the snapshot copies, not the live sstables
        args.extend([f"{os.path.dirname(sst)}/snapshots/{test_tag}/{os.path.basename(sst)}" for sst in sstables])
        try:
            out = subprocess.check_output(args)
        except subprocess.CalledProcessError as e:
            pytest.fail(f"Failed to query sstable: {e}\n{e.output.decode('utf-8')}")
        finally:
            nodetool.del_snapshot(cql, test_tag)
        assert out
        print(f"out: {out.decode('utf-8')}")
        assert json.loads(out)


def test_scylla_sstable_upgrade(cql, test_keyspace, scylla_path, scylla_data_dir):
    """Check `scylla sstable upgrade`: skip-by-default, --all, and explicit version."""
    with scylla_sstable(simple_no_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
        def invoke(tmp_dir, args):
            # run upgrade with the given extra args, return its output lines
            base_args = [scylla_path, "sstable", "upgrade",
                         "--schema-file", schema_file,
                         "--output-dir", tmp_dir,
                         "--logger-log-level", "scylla-sstable=debug"]
            out = subprocess.check_output(base_args + args + sstables, text=True)
            return out.strip().split('\n')

        # Nothing to upgrade
        with tempfile.TemporaryDirectory() as tmp_dir:
            lines = invoke(tmp_dir, [])
            assert len(lines) == len(sstables)
            for line, sst in zip(lines, sstables):
                assert line.startswith(f"Nothing to do for sstable {sst}, skipping (use --all to force upgrade all sstables).")

        with tempfile.TemporaryDirectory() as tmp_dir:
            lines = invoke(tmp_dir, ["--all"])
            assert len(lines) == len(sstables)
            for line, sst in zip(lines, sstables):
                assert line.startswith(f"Upgraded sstable {sst} to")

        with tempfile.TemporaryDirectory() as tmp_dir:
            lines = invoke(tmp_dir, ["--sstable-version", "md"])  # downgrade to "md" format
            assert len(lines) == len(sstables)
            for line, sst in zip(lines, sstables):
                assert not sst.startswith("md-")
                # escape the path: it contains regex metacharacters ('.', possibly '+')
                assert re.match(f"^Upgraded sstable {re.escape(sst)} to /.*/md-.*\\.$", line)

        with tempfile.TemporaryDirectory() as tmp_dir:
            # a pre-existing unrelated file in the output dir must not confuse the tool
            with open(os.path.join(tmp_dir, "dummy.txt"), "w") as f:
                f.write("dummy")
                f.flush()
            lines = invoke(tmp_dir, [])
            assert len(lines) == len(sstables)
            for line, sst in zip(lines, sstables):
                assert line.startswith(f"Nothing to do for sstable {sst}, skipping (use --all to force upgrade all sstables).")


def test_scylla_sstable_upgrade_ignore_digest_mismatch(cql, test_keyspace, scylla_path, scylla_data_dir):
    """Test that --ignore-component-digest-mismatch allows loading sstables with corrupted component digests."""
    with scylla_sstable(simple_no_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
        assert len(sstables) >= 1
        sst = sstables[0]
        stats_file = sst.replace("-Data.db", "-Statistics.db")
        assert os.path.exists(stats_file), f"Statistics file not found: {stats_file}"
        # corrupt the Statistics component so its digest no longer matches
        with open(stats_file, "ab") as f:
            f.write(b'\x00')
        base_args = [scylla_path, "sstable", "upgrade",
                     "--schema-file", schema_file, "--all",
                     "--logger-log-level", "scylla-sstable=debug"]

        # Without --ignore-component-digest-mismatch, loading should fail due to digest mismatch
        with tempfile.TemporaryDirectory() as tmp_dir:
            result = subprocess.run(base_args + ["--output-dir", tmp_dir, sst], capture_output=True, text=True)
            assert result.returncode != 0, "Expected failure due to digest mismatch"

        # With --ignore-component-digest-mismatch, loading should succeed
        with tempfile.TemporaryDirectory() as tmp_dir:
            result = subprocess.run(base_args + ["--ignore-component-digest-mismatch", "--output-dir", tmp_dir, sst],
                                    capture_output=True, text=True)
            assert result.returncode == 0, \
                f"Expected success with --ignore-component-digest-mismatch, stderr: {result.stderr}"


def test_scylla_sstable_dump_schema(cql, test_keyspace, scylla_path, scylla_data_dir):
    """Check that the schema dumped by `scylla sstable dump-schema` round-trips through CQL."""
    def query_system_schema(schema_table: str, table_name: str) -> list:
        q = f"SELECT * FROM system_schema.{schema_table} WHERE keyspace_name = '{test_keyspace}' AND table_name = '{table_name}'"

        def process(r):
            d = {}
            for k, v in r._asdict().items():
                # id and version depend on create time, so we normalize them
                if k == 'id':
                    d[k] = '$ID'
                elif k == 'version':
                    d[k] = '$VERSION'
                elif k == 'extensions':
                    # Extensions like tombstone_gc are problematic, because the
                    # scylla-sstable tool doesn't have access to the keyspace
                    # definition, so it will come up with different defaults.
                    pass
                else:
                    d[k] = v
            return d

        return list(map(process, cql.execute(q)))

    expected_results = {}
    with scylla_sstable(clustering_table_with_udt, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
        for schema_table in ('columns', 'tables', 'scylla_tables'):
            expected_results[schema_table] = query_system_schema(schema_table, table)
        table_name = table
        dumped_schema = subprocess.check_output(
            [scylla_path, "sstable", "dump-schema", "--schema-file", schema_file, sstables[0]], text=True)
    # Re-create the table from the dumped schema and compare the system_schema rows.
    for statement in map(str.strip, dumped_schema.split(';')):
        if statement:
            cql.execute(statement)
    try:
        for schema_table in ('columns', 'tables', 'scylla_tables'):
            new_res = query_system_schema(schema_table, table_name)
            assert new_res == expected_results[schema_table]
    finally:
        cql.execute(f"DROP TABLE {test_keyspace}.{table_name}")


def test_scylla_sstable_filter(cql, test_keyspace, scylla_path, scylla_data_dir):
    """Check `scylla sstable filter` with --include/--exclude partition lists."""
    with scylla_sstable(simple_no_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
        pks = [r.pk for r in cql.execute(f"SELECT pk FROM {test_keyspace}.{table}")]
        print(f"Generated primary keys: {pks}")
        assert len(pks) > 2
        serialized_pks = [_serialize_value(scylla_path, pk) for pk in pks]

        # renamed from `filter` to avoid shadowing the builtin
        def run_filter(flag):
            """Filter the sstables on the first two partitions, return the resulting pk set."""
            with tempfile.TemporaryDirectory() as tmp_dir:
                cmd = [scylla_path, "sstable", "filter",
                       "--schema-file", schema_file,
                       "--output-dir", tmp_dir,
                       flag]
                for pk in serialized_pks[:2]:
                    cmd += ["--partition", pk]
                cmd += sstables
                out = subprocess.check_output(cmd, text=True)
                print(f"Filter: {' '.join(cmd)}\n{out}")
                out_lines = out.strip().split('\n')
                filtered_sstables = []
                for line in out_lines:
                    m = re.match(r"^Filtering.*\.\.\. output written to (.*)$", line)
                    if m:
                        filtered_sstables.append(m.group(1))
                assert len(filtered_sstables) >= 1
                query_cmd = [scylla_path, "sstable", "query",
                             "--output-format", "json",
                             "--schema-file", schema_file] + filtered_sstables
                query_res = json.loads(subprocess.check_output(query_cmd, text=True))
                print(f"Query: {' '.join(query_cmd)}\n{query_res}")
                return {row['pk'] for row in query_res}

        # the leftover debug block that copied sstables to a developer's home
        # directory on mismatch was removed; run each filter once and assert
        assert run_filter("--include") == set(pks[:2])
        assert run_filter("--exclude") == set(pks[2:])


def test_scylla_sstable_split(cql, test_keyspace, scylla_path, scylla_data_dir):
    """Check `scylla sstable split`: partitions are routed by token to the right output."""
    with scylla_sstable(simple_no_clustering_table, cql, test_keyspace, scylla_data_dir) as (table, schema_file, sstables):
        # Get partition keys from sstables only (includes dead partitions)
        sstable_rows = list(cql.execute(
            f"SELECT pk FROM MUTATION_FRAGMENTS({test_keyspace}.{table}) WHERE mutation_source >= 'sstable:' ALLOW FILTERING"))
        sstable_pks = set(r.pk for r in sstable_rows)

        # Get tokens for live partitions to use as split points
        live_rows = list(cql.execute(f"SELECT pk, token(pk) FROM {test_keyspace}.{table}"))
        live_partitions = [{'pk': r.pk, 'token': r.system_token_pk} for r in live_rows]

        print(f"Partitions in sstables: {sstable_pks}")
        print(f"Live partitions with tokens: {live_partitions}")
        assert len(live_partitions) >= 2, "Need at least 2 live partitions for split test"

        # Choose split token from a live partition
        split_partition = live_partitions[1]
        split_token = split_partition['token']
        assert split_partition['pk'] in sstable_pks, "Split partition should be present in sstables"
        print(f"Split token: {split_token}")

        with tempfile.TemporaryDirectory() as tmp_dir:
            # Test split (use --merge if multiple sstables to test that path too)
            merge_flag = ["--merge"] if len(sstables) > 1 else []
            cmd = [scylla_path, "sstable", "split",
                   "--schema-file", schema_file,
                   "--output-dir", tmp_dir,
                   "-t", str(split_token)] + merge_flag + sstables
            out = subprocess.check_output(cmd, text=True)
            print(f"Split output:\n{out}")

            # Parse output to find generated sstables
            split_sstables = []
            for line in out.strip().split('\n'):
                m = re.match(r"^\s+(.+/[^/]+-Data\.db)\s*$", line)
                if m:
                    split_sstables.append(m.group(1))

            # Should have created 2 output sstables (N+1 for N split tokens)
            assert len(split_sstables) == 2, f"Expected 2 output sstables, got {len(split_sstables)}"

            # Query each output sstable and collect partition keys
            output_pks = [set(), set()]
            for i, sst in enumerate(split_sstables):
                dump_cmd = [scylla_path, "sstable", "dump-data",
                            "--output-format", "json",
                            "--schema-file", schema_file, sst]
                dump_res = json.loads(subprocess.check_output(dump_cmd, text=True))
                for sstable_data in dump_res["sstables"].values():
                    for partition in sstable_data:
                        pk = int(partition['key']['value'])
                        token = int(partition['key']['token'])
                        output_pks[i].add(pk)
                        # Verify token is in expected range
                        if i == 0:
                            assert token < split_token, f"Token {token} in first sstable should be < {split_token}"
                        else:
                            assert token >= split_token, f"Token {token} in second sstable should be >= {split_token}"

            print(f"First sstable contains: {output_pks[0]}")
            print(f"Second sstable contains: {output_pks[1]}")

            # Verify all partitions are present and no overlap (compare partition keys, not tokens)
            all_output_pks = output_pks[0] | output_pks[1]
            assert all_output_pks == sstable_pks, "Split sstables should contain all original partitions"
            assert len(output_pks[0] & output_pks[1]) == 0, "Output sstables should not overlap"