sstable_validation_test tests the `scylla sstable validate` command by passing it intentionally corrupted sstables. It uses an sstable cache to avoid re-creating the same sstables. However, the cache does not consider the sstable version, so if called twice with the same inputs for different versions, it will return an sstable with the original version for both calls. As a results, `ms` sstables were not tested. Fix this bug by adding the sstable version (and the schema for good measure) to the cache key. An additional bug, hidden by the first, was that we corrupted the sstable by overwriting its Index.db component. But `ms` sstables don't have an Index.db component, they have a Partitions.db component. Adjust the corrupting code to take that into account. With these two fixes, test_scylla_sstable_validate_mismatching_partition_large fails on `ms` sstables. Disable it for that version. Since it was previously practically untested, we're not losing any coverage. Fixing this test unblocks further work on making pytest take charge of running the tests. pytest exposed this problem, likely by running it on different runners (and thus reducing the effectiveness of the cache). Fixes #27822. Closes scylladb/scylladb#27825
365 lines
16 KiB
Python
365 lines
16 KiB
Python
# Copyright 2023-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
|
|
#############################################################################
|
|
# Tests for sstable validation.
|
|
#
|
|
# Namely, detecting discrepancies between index and data files.
|
|
# These tests produce pairs of sstables, that differ only slightly, mixes the
|
|
# index/data from the two of them, then runs `sstable validate` expecting it to
|
|
# find the discrepancies.
|
|
#############################################################################
|
|
|
|
|
|
import glob
|
|
import json
|
|
import os
|
|
import pytest
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def schema1_file():
|
|
"""Create a schema.cql for the schema1"""
|
|
with tempfile.NamedTemporaryFile("w+t") as f:
|
|
f.write("CREATE TABLE ks.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))")
|
|
f.flush()
|
|
yield f.name
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def schema2_file():
|
|
"""Create a schema.cql for the schema2"""
|
|
with tempfile.NamedTemporaryFile("w+t") as f:
|
|
f.write("CREATE TABLE ks.tbl (pk int, ck int, v text, s text STATIC, PRIMARY KEY (pk, ck))")
|
|
f.flush()
|
|
yield f.name
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def large_rows():
|
|
"""Create a set of large rows"""
|
|
rows = []
|
|
v_1k = 'v' * 1024
|
|
for ck in range(128):
|
|
ck_raw = "0004{:08x}".format(ck)
|
|
rows.append({ "type": "clustering-row", "key": { "raw": ck_raw }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": v_1k } } })
|
|
return rows
|
|
|
|
|
|
def find_component(generation, component_type, sst_dir):
|
|
comps = glob.glob(os.path.join(sst_dir, f"*-{str(generation)}-big-{component_type}.db"))
|
|
assert len(comps) == 1
|
|
return comps[0]
|
|
|
|
def find_partition_index_component(generation, sst_dir):
|
|
'''
|
|
Returns Index.db or Partitions.db component path for an sstable,
|
|
depending on the version.
|
|
'''
|
|
comps = (glob.glob(os.path.join(sst_dir, f"*-{str(generation)}-*-Index.db"))
|
|
+ glob.glob(os.path.join(sst_dir, f"*-{str(generation)}-*-Partitions.db")))
|
|
assert len(comps) == 1
|
|
return comps[0]
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def sstable_cache(scylla_path):
|
|
"""Content-addressable sstable cache"""
|
|
class cache:
|
|
def __init__(self, scylla_path, workdir):
|
|
self._scylla_path = scylla_path
|
|
self._in_json = os.path.join(workdir, "input.json")
|
|
self._store_dir = os.path.join(workdir, "sst_store_dir")
|
|
self._cache = {}
|
|
|
|
os.mkdir(self._store_dir)
|
|
|
|
@property
|
|
def dir(self):
|
|
return self._store_dir
|
|
|
|
def get_generation(self, sst, schema_file, version):
|
|
json_str = json.dumps(sst)
|
|
with open(schema_file, "r") as f:
|
|
schema_content = f.read()
|
|
cache_key = (json_str, schema_content, version)
|
|
generation = self._cache.get(cache_key)
|
|
if not generation is None:
|
|
return generation
|
|
with open(self._in_json, "w") as f:
|
|
f.write(json_str)
|
|
version_arg = ["--sstable-version", version] if version is not None else []
|
|
generation = subprocess.check_output([self._scylla_path, "sstable", "write", *version_arg, "--schema-file", schema_file, "--output-dir", self._store_dir, "--input-file", self._in_json, "--input-format", "json"], text=True).strip()
|
|
self._cache[cache_key] = generation
|
|
return generation
|
|
|
|
def copy_sstable_to(self, generation, target_dir):
|
|
for f in glob.glob(os.path.join(self._store_dir, f"*-{str(generation)}-big-*.*")):
|
|
shutil.copy(f, target_dir)
|
|
|
|
with tempfile.TemporaryDirectory() as workdir:
|
|
yield cache(scylla_path, workdir)
|
|
|
|
|
|
def validate_mixed_sstable_pair(ssta, sstb, scylla_path, sst_cache, sst_work_dir, schema_file, version, error_message):
|
|
"""Validate an sstable, created by mixing the index and data from two different sstables.
|
|
|
|
Check that the validation has the expected result (`error_message`).
|
|
"""
|
|
generation_a = sst_cache.get_generation(ssta, schema_file, version)
|
|
generation_b = sst_cache.get_generation(sstb, schema_file, version)
|
|
shutil.rmtree(sst_work_dir)
|
|
os.mkdir(sst_work_dir)
|
|
sst_cache.copy_sstable_to(generation_a, sst_work_dir)
|
|
shutil.copyfile(find_partition_index_component(generation_b, sst_cache.dir), find_partition_index_component(generation_a, sst_work_dir))
|
|
|
|
res = subprocess.run([scylla_path, "sstable", "validate", "--schema-file", schema_file, find_component(generation_a, "Data", sst_work_dir)],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
out = res.stdout.decode('utf-8')
|
|
err = res.stderr.decode('utf-8')
|
|
sstables = json.loads(out)['sstables']
|
|
valid = next(iter(sstables.values()))['valid']
|
|
if error_message:
|
|
assert not valid
|
|
assert error_message in err
|
|
else:
|
|
print(err)
|
|
assert valid
|
|
|
|
|
|
def make_partition(pk, ck, v):
|
|
pks = [ "000400000001", "000400000000", "000400000002", "000400000003" ]
|
|
r = { "type": "clustering-row", "key": { "raw": "0004{:08x}".format(ck) }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": v } } }
|
|
return { "key": { "raw": pks[pk] }, "clustering_elements": [ r ] }
|
|
|
|
|
|
def make_large_partition(pk, rows, with_static_row = False):
|
|
pks = [ "000400000001", "000400000000", "000400000002", "000400000003" ]
|
|
if with_static_row:
|
|
return {
|
|
"key": { "raw": pks[pk] },
|
|
"static_row": { "s": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": "s" * 128 } },
|
|
"clustering_elements": rows
|
|
}
|
|
else:
|
|
return { "key": { "raw": pks[pk] }, "clustering_elements": rows }
|
|
|
|
big_versions = ["me"]
|
|
bti_versions = ["ms"]
|
|
versions = big_versions + bti_versions + [None]
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
def test_scylla_sstable_validate_ok1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_partition(0, 0, 'v')],
|
|
[make_partition(0, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
def test_scylla_sstable_validate_ok2(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_partition(0, 0, 'v'), make_partition(1, 0, 'v'), make_partition(2, 0, 'v')],
|
|
[make_partition(0, 0, 'v'), make_partition(1, 0, 'v'), make_partition(2, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "")
|
|
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_mismatching_partition(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_partition(0, 0, 'v')],
|
|
[make_partition(1, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: partition mismatch")
|
|
|
|
# FIXME: this should pass on ms too, no?
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_mismatching_partition_large(cql, scylla_path, temp_workdir, schema1_file, large_rows, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows)],
|
|
[make_large_partition(1, large_rows)],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: partition mismatch")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
def test_scylla_sstable_validate_mismatching_position(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_partition(0, 0, 'v'), make_partition(2, 0, 'v')],
|
|
[make_partition(0, 0, 'vv'), make_partition(2, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: position mismatch")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
def test_scylla_sstable_validate_index_ends_before_data(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_partition(0, 0, 'v'), make_partition(2, 0, 'v')],
|
|
[make_partition(0, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: index is at EOF, but data file has more data")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
@pytest.mark.xfail(reason="index's EOF definition depends on data file size")
|
|
def test_scylla_sstable_validate_index_ends_before_data1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_partition(0, 0, 'v')],
|
|
[make_partition(0, 0, 'vvvvvvvvvvvvvvvvvvvvvv'), make_partition(2, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: data is at EOF, but index has more data")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
@pytest.mark.xfail(reason="index's EOF definition depends on data file size")
|
|
def test_scylla_sstable_validate_index_ends_before_data2(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_partition(0, 0, 'v')],
|
|
[make_partition(0, 0, 'v'), make_partition(2, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: data is at EOF, but index has more data")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
def test_scylla_sstable_validate_large_rows_ok1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows)],
|
|
[make_large_partition(0, large_rows)],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
def test_scylla_sstable_validate_large_rows_ok2(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows)],
|
|
[make_large_partition(0, large_rows)],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema2_file,
|
|
version = version,
|
|
error_message = "")
|
|
|
|
@pytest.mark.parametrize("version", versions)
|
|
def test_scylla_sstable_validate_large_rows_ok3(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows, True)],
|
|
[make_large_partition(0, large_rows, True)],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema2_file,
|
|
version = version,
|
|
error_message = "")
|
|
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_large_rows_mismatching_position1(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows, version):
|
|
row_0x = { "type": "clustering-row", "key": { "raw": "000400000000" }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": "v" } } }
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows)],
|
|
[make_large_partition(0, [row_0x] + large_rows[1:])],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: position mismatch: promoted index:")
|
|
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_large_rows_mismatching_position2(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows, True)],
|
|
[make_large_partition(0, large_rows)],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema2_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: position mismatch: promoted index:")
|
|
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_large_rows_mismatching_position3(cql, scylla_path, temp_workdir, schema2_file, sstable_cache, large_rows, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows)],
|
|
[make_large_partition(0, large_rows, True)],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema2_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: position mismatch: promoted index:")
|
|
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_large_rows_mismatching_row(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, [large_rows[1]] + large_rows[4:])],
|
|
[make_large_partition(0, [large_rows[2]] + large_rows[4:])],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: clustering element")
|
|
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_large_rows_end_of_pi_not_end_of_rows(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows, version):
|
|
row_0x = { "type": "clustering-row", "key": { "raw": "000400000000" }, "columns": { "v": { "is_live": True, "type": "regular", "timestamp": 1680263186359882, "value": "v" } } }
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, [row_0x]), make_partition(2, 0, 'v')],
|
|
[make_large_partition(0, large_rows), make_partition(2, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: promoted index has more blocks, but it is end of partition")
|
|
|
|
@pytest.mark.parametrize("version", big_versions)
|
|
def test_scylla_sstable_validate_large_rows_end_of_rows_not_end_of_pi(cql, scylla_path, temp_workdir, schema1_file, sstable_cache, large_rows, version):
|
|
validate_mixed_sstable_pair(
|
|
[make_large_partition(0, large_rows), make_partition(2, 0, 'v')],
|
|
[make_large_partition(0, large_rows[:96]), make_partition(2, 0, 'v')],
|
|
scylla_path,
|
|
sstable_cache,
|
|
temp_workdir,
|
|
schema_file = schema1_file,
|
|
version = version,
|
|
error_message = "mismatching index/data: promoted index has no more blocks, but partition")
|