Files
scylladb/test/cluster/dtest/scrub_test.py
Artsiom Mishuta 7b30a3981b test.py: enable strict_config,xfail_strict,strict-markers
this commit enables 3 strict pytest options:

strict_config - any warnings encountered while parsing the pytest section of the configuration file will raise errors.
xfail_strict - tests marked with @pytest.mark.xfail that actually succeed will by default fail the test suite.
strict-markers - markers not registered in the markers section of the configuration file will raise errors.

and fix errors that occur after enabling these options

Closes scylladb/scylladb#28859
2026-03-05 12:54:26 +02:00

481 lines
19 KiB
Python

#
# The original Apache Cassandra license:
#
# SPDX-License-Identifier: Apache-2.0
#
# Modifications: Copyright (C) 2021-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import glob
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
import uuid
import pytest
from ccmlib import common
from dtest_class import Tester, create_cf, create_ks
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Name of the keyspace the helpers use when building data-directory paths
# and nodetool command lines; most tests create their tables in it.
KEYSPACE = "ks"
class TestHelper(Tester):
    """
    Helpers shared by the scrub tests.

    All helpers operate on the first node of the cluster and on the
    KEYSPACE keyspace: they locate sstable directories, list or remove
    sstable component files and run flush/scrub either through nodetool
    or through the standalone scylla-sstable tool.
    """

    def get_table_path(self, table):
        """
        Return the path where the table sstables are located

        The keyspace data directory of the first node is searched for the
        first entry whose name starts with `table`. An empty string is
        returned when nothing matches.
        """
        node1 = self.cluster.nodelist()[0]
        basepath = os.path.join(node1.get_path(), "data", KEYSPACE)
        for x in os.listdir(basepath):
            if x.startswith(table):
                return os.path.join(basepath, x)
        return ""

    def get_index_path(self, index):
        """
        Return the path where the index sstables are located

        Index directories are found with exactly the same prefix match as
        table directories, so this delegates to get_table_path().
        """
        return self.get_table_path(index)

    def get_sstable_files(self, path):
        """
        Return the sstable files at a specific location

        Only the base names of files matching *.db, *.txt, *.adler32 and
        *.sha1 are returned; files whose lower-cased name contains
        "-scylla." are skipped.
        """
        ret = []
        logger.debug(f"Checking sstables in {path}")
        for ext in ("*.db", "*.txt", "*.adler32", "*.sha1"):
            for fname in glob.glob(os.path.join(path, ext)):
                bname = os.path.basename(fname)
                if "-scylla." in bname.lower():
                    continue
                ret.append(bname)
        return ret

    def delete_non_essential_sstable_files(self, table):
        """
        Delete all sstable files of `table` except for the essential
        components (Data.db, Index.db, Statistics.db, Partitions.db and
        Rows.db), then re-create every removed TOC.txt so that it lists
        the essential components of its sstable format plus the TOC itself.
        """
        # NOTE: TOC file is essential for ScyllaDB, also any component in the
        # TOC is required, so we need to remove deleted components from the TOC too.
        # See https://github.com/scylladb/scylladb/issues/21145
        sstable_re = re.compile(
            r"""(?P<version>la|m[cdes]|n[a-b]|o[a]|d[a])- # the sstable version
                (?P<id>[^-]+)- # sstable identifier
                (?P<format>\w+)- # format: 'big' or 'bti'
                (?P<component>.*) # component: e.g., 'Data'""",
            re.X,
        )
        essential_components = ("Data.db", "Index.db", "Statistics.db", "Partitions.db", "Rows.db")
        big_toc_components = ("Data.db", "Index.db", "Statistics.db", "TOC.txt")
        bti_toc_components = ("Data.db", "Partitions.db", "Rows.db", "Statistics.db", "TOC.txt")

        # Resolve the table directory once: it does not change while the
        # component files inside it are being removed.
        table_path = self.get_table_path(table)

        bti_tocs = []
        big_tocs = []
        for fname in self.get_sstable_files(table_path):
            # get_sstable_files() already returns base names.
            fullname = os.path.join(table_path, fname)
            matched = sstable_re.fullmatch(fname)
            # Collect removed TOCs, to be restored later.
            if matched and matched["component"] == "TOC.txt":
                if matched["version"] in ["ms", "da"]:
                    bti_tocs.append(fullname)
                else:
                    big_tocs.append(fullname)
            if not matched or matched["component"] not in essential_components:
                logger.debug(f"Deleting {fullname}")
                os.remove(fullname)
        logger.info(f"TOCS: {bti_tocs + big_tocs}")

        # Restore TOCs; the file is flushed and closed when the `with` block exits.
        for tocs, components in ((big_tocs, big_toc_components), (bti_tocs, bti_toc_components)):
            for toc in tocs:
                logger.debug(f"restoring TOC {toc}")
                with open(toc, "w") as f:
                    f.writelines(f"{component}\n" for component in components)

    def get_sstables(self, table, indexes):
        """
        Return the sstables for a table and the specified indexes of this table

        The result is a dict keyed by the table or index name. Table entries
        are sorted file names, index entries are sorted "<index>/<file name>"
        strings. An assertion fails if no file is found for any of them.
        """
        sstables = {}

        table_path = self.get_table_path(table)
        table_sstables = self.get_sstable_files(table_path)
        assert len(table_sstables) > 0, f"sstables were not found in {table_path}"
        sstables[table] = sorted(table_sstables)

        for index in indexes:
            index_path = self.get_index_path(index)
            index_sstables = self.get_sstable_files(index_path)
            assert len(index_sstables) > 0, f"No indexes were found by path: {index_path}"
            sstables[index] = sorted(f"{index}/{sstable}" for sstable in index_sstables)

        return sstables

    def launch_nodetool_cmd(self, cmd):
        """
        Launch a nodetool command and check the result is empty (no error)
        """
        node1 = self.cluster.nodelist()[0]
        response = node1.nodetool(cmd, capture_output=True)[0]
        assert len(response) == 0, response  # nodetool does not print anything unless there is an error

    def launch_standalone_scrub(self, ks, cf):
        """
        Launch the standalone scrub

        The scrubbed sstables are written to a temporary directory and then
        replace the whole content of the table directory.
        """
        node1 = self.cluster.nodelist()[0]
        table_path = self.get_table_path(cf)
        with tempfile.TemporaryDirectory() as tmp_dir:
            node1.run_scylla_sstable("scrub", additional_args=["--scrub-mode", "abort", "--output-dir", tmp_dir, "--logger-log-level", "scylla-sstable=debug"], keyspace=ks, column_families=[cf])
            # Replace the table's sstables with the scrubbed ones, just like online scrub would do.
            shutil.rmtree(table_path)
            shutil.copytree(tmp_dir, table_path)

    def perform_node_tool_cmd(self, cmd, table, indexes):
        """
        Perform a nodetool command on a table and the indexes specified

        Each index is addressed as "<index>_index".
        """
        self.launch_nodetool_cmd(f"{cmd} {KEYSPACE} {table}")
        for index in indexes:
            self.launch_nodetool_cmd(f"{cmd} {KEYSPACE} {index}_index")

    def flush(self, table, *indexes):
        """
        Flush table and indexes via nodetool, and then return all sstables
        in a dict keyed by the table or index name.
        """
        self.perform_node_tool_cmd("flush", table, indexes)
        return self.get_sstables(table, indexes)

    def scrub(self, table, *indexes):
        """
        Scrub table and indexes via nodetool, and then return all sstables
        in a dict keyed by the table or index name.
        """
        self.perform_node_tool_cmd("scrub", table, indexes)
        return self.get_sstables(table, indexes)

    def standalonescrub(self, table, *indexes):
        """
        Launch standalone scrub on table and indexes, and then return all sstables
        in a dict keyed by the table or index name.
        """
        self.launch_standalone_scrub(KEYSPACE, table)
        for index in indexes:
            self.launch_standalone_scrub(KEYSPACE, f"{index}_index")
        return self.get_sstables(table, indexes)
@pytest.mark.single_node
class TestScrubIndexes(TestHelper):
    """
    Test that we scrub indexes as well as their parent tables
    """

    def create_users(self, session):
        """Create the `users` table and one secondary index per indexed column."""
        create_cf(
            session,
            "users",
            columns={
                "password": "varchar",
                "gender": "varchar",
                "session_token": "varchar",
                "state": "varchar",
                "birth_year": "bigint",
            },
        )
        index_statements = (
            "CREATE INDEX gender_idx ON users (gender)",
            "CREATE INDEX state_idx ON users (state)",
            "CREATE INDEX birth_year_idx ON users (birth_year)",
        )
        for statement in index_statements:
            session.execute(statement)

    def update_users(self, session):
        """Insert eight users, then delete three of them (user1, user5, user7)."""
        statements = (
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user1', 'ch@ngem3a', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user2', 'ch@ngem3b', 'm', 'CA', 1982)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user3', 'ch@ngem3c', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user4', 'ch@ngem3d', 'm', 'CA', 1982)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user5', 'ch@ngem3e', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user6', 'ch@ngem3f', 'm', 'CA', 1982)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user7', 'ch@ngem3g', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user8', 'ch@ngem3h', 'm', 'CA', 1982)",
            "DELETE FROM users where KEY = 'user1'",
            "DELETE FROM users where KEY = 'user5'",
            "DELETE FROM users where KEY = 'user7'",
        )
        for statement in statements:
            session.execute(statement)

    def query_users(self, session):
        """
        Return the concatenated rows of a full scan and of one query per
        index, asserting that eight rows were read in total.
        """
        queries = (
            "SELECT * FROM users",
            "SELECT * FROM users WHERE state='TX'",
            "SELECT * FROM users WHERE gender='f'",
            "SELECT * FROM users WHERE birth_year=1978",
        )
        rows = []
        for query in queries:
            rows.extend(session.execute(query))
        assert len(rows) == 8, "Invalid number of records in table 'users'"
        return rows

    def test_scrub_static_table(self):
        """
        Scrub an indexed table twice via nodetool and check that the data
        is unchanged after each scrub and after a cluster restart.
        """
        index_names = ("gender_idx", "state_idx", "birth_year_idx")

        cluster = self.cluster
        cluster.set_configuration_options(values={"tablets_initial_scale_factor": 1})
        cluster.populate(1).start(jvm_args=["--smp", "1"])
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, KEYSPACE, 1)
        self.create_users(session)
        self.update_users(session)

        users_before = self.query_users(session)
        sstables_before = self.flush("users", *index_names)

        # Scrub and verify twice: the second pass re-scrubs already scrubbed sstables.
        for _ in range(2):
            sstables_after = self.scrub("users", *index_names)
            # NOTE(review): flush()/scrub() return a dict keyed by table/index
            # name, so this compares the number of keys rather than the number
            # of sstable files - confirm that is intended.
            assert len(sstables_before) == len(sstables_after), "Amount of sstables before and after scrub differs"
            users_after = self.query_users(session)
            assert users_before == users_after, "List of users before and after scrub are different"

        # Restart and check data again
        cluster.stop()
        cluster.start()
        session = self.patient_cql_connection(node)
        session.execute("USE %s" % KEYSPACE)
        users_after = self.query_users(session)
        assert users_before == users_after, "List of users before and after scrub are different"

    def test_standalone_scrub(self):
        """
        Scrub an indexed table with the standalone tool while the cluster is
        stopped and check that the data is unchanged after starting it again.
        """
        index_names = ("gender_idx", "state_idx", "birth_year_idx")

        cluster = self.cluster
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, KEYSPACE, 1)
        self.create_users(session)
        self.update_users(session)

        users_before = self.query_users(session)
        sstables_before = self.flush("users", *index_names)

        cluster.stop()
        sstables_after = self.standalonescrub("users", *index_names)
        assert len(sstables_before) == len(sstables_after), "Amount of sstables before and after scrub differs"

        cluster.start()
        session = self.patient_cql_connection(node)
        session.execute("USE %s" % KEYSPACE)
        users_after = self.query_users(session)
        assert users_before == users_after, "List of users before and after scrub are different"

    def test_scrub_collections_table_without_indexes(self):
        """
        Scrub a table with a list column (and no indexes) twice via nodetool
        and check that a filtering query on the list returns the same rows.
        """
        cluster = self.cluster
        cluster.set_configuration_options(values={"tablets_initial_scale_factor": 1})
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, KEYSPACE, 1)
        session.execute("CREATE TABLE users (user_id uuid PRIMARY KEY, email text, uuids list<uuid>)")

        # Every user gets the same uuid in its list, so one query finds them all.
        _id = uuid.uuid4()
        num_users = 100
        for _ in range(num_users):
            user_uuid = uuid.uuid4()
            session.execute(f"INSERT INTO users (user_id, email) values ({user_uuid}, 'test@example.com')")
            session.execute(f"UPDATE users set uuids = [{_id}] where user_id = {user_uuid}")

        select_by_list_item = f"SELECT * from users where uuids contains {_id} ALLOW FILTERING"

        users_before = list(session.execute(select_by_list_item))
        assert num_users == len(users_before), "Not all users were added to table"

        sstables_before = self.flush("users")

        # Scrub and verify twice.
        for _ in range(2):
            sstables_after = self.scrub("users")
            assert len(sstables_before) == len(sstables_after), "Amount of sstables before and after scrub differs"
            users_after = list(session.execute(select_by_list_item))
            assert users_before == users_after, "List of users before and after scrub are different"
@pytest.mark.single_node
class TestScrub(TestHelper):
    """
    Generic tests for scrubbing
    """

    def create_users(self, session):
        """Create the `users` table (no secondary indexes)."""
        create_cf(
            session,
            "users",
            columns={
                "password": "varchar",
                "gender": "varchar",
                "session_token": "varchar",
                "state": "varchar",
                "birth_year": "bigint",
            },
        )

    def update_users(self, session):
        """Insert eight users, then delete three of them (user1, user5, user7)."""
        statements = (
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user1', 'ch@ngem3a', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user2', 'ch@ngem3b', 'm', 'CA', 1982)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user3', 'ch@ngem3c', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user4', 'ch@ngem3d', 'm', 'CA', 1982)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user5', 'ch@ngem3e', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user6', 'ch@ngem3f', 'm', 'CA', 1982)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user7', 'ch@ngem3g', 'f', 'TX', 1978)",
            "INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user8', 'ch@ngem3h', 'm', 'CA', 1982)",
            "DELETE FROM users where KEY = 'user1'",
            "DELETE FROM users where KEY = 'user5'",
            "DELETE FROM users where KEY = 'user7'",
        )
        for statement in statements:
            session.execute(statement)

    def query_users(self, session):
        """Return all rows of `users`, asserting that exactly five are left."""
        rows = list(session.execute("SELECT * FROM users"))
        assert len(rows) == 5, "Amount of users is different"
        return rows

    def test_nodetool_scrub(self):
        """
        Scrub a table twice via nodetool and check that the data is
        unchanged after each scrub and after a cluster restart.
        """
        cluster = self.cluster
        cluster.set_configuration_options(values={"tablets_initial_scale_factor": 1})
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, KEYSPACE, 1)
        self.create_users(session)
        self.update_users(session)

        users_before = self.query_users(session)
        sstables_before = self.flush("users")

        # Scrub and verify twice: the second pass re-scrubs already scrubbed sstables.
        for _ in range(2):
            sstables_after = self.scrub("users")
            # NOTE(review): flush()/scrub() return a dict keyed by table/index
            # name, so this compares the number of keys rather than the number
            # of sstable files - confirm that is intended.
            assert len(sstables_before) == len(sstables_after), "Amount of sstables before and after scrub differs"
            users_after = self.query_users(session)
            assert users_before == users_after, "List of users before and after scrub are different"

        # Restart and check data again
        cluster.stop()
        cluster.start()
        session = self.patient_cql_connection(node)
        session.execute("USE %s" % KEYSPACE)
        users_after = self.query_users(session)
        assert users_before == users_after, "List of users before and after scrub are different"

    def test_standalone_scrub(self):
        """
        Scrub a table with the standalone tool while the cluster is stopped
        and check that the data is unchanged after starting it again.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, KEYSPACE, 1)
        self.create_users(session)
        self.update_users(session)

        users_before = self.query_users(session)
        sstables_before = self.flush("users")

        cluster.stop()
        sstables_after = self.standalonescrub("users")
        assert len(sstables_before) == len(sstables_after), "Amount of sstables before and after scrub differs"

        cluster.start()
        session = self.patient_cql_connection(node)
        session.execute("USE %s" % KEYSPACE)
        users_after = self.query_users(session)
        assert users_before == users_after, "List of users before and after scrub are different"

    def test_standalone_scrub_essential_files_only(self):
        """
        Same as test_standalone_scrub, but the non-essential sstable
        component files are deleted before the standalone scrub runs.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        create_ks(session, KEYSPACE, 1)
        self.create_users(session)
        self.update_users(session)

        users_before = self.query_users(session)
        sstables_before = self.flush("users")

        cluster.stop()
        self.delete_non_essential_sstable_files("users")
        sstables_after = self.standalonescrub("users")
        assert len(sstables_before) == len(sstables_after), "Amount of sstables before and after scrub differs"

        cluster.start()
        session = self.patient_cql_connection(node)
        session.execute("USE %s" % KEYSPACE)
        users_after = self.query_users(session)
        assert users_before == users_after, "List of users before and after scrub are different"

    def test_scrub_with_udt(self):
        """
        Run a node-wide scrub after creating a user-defined type and check
        that the node log contains no MarshalException.

        @jira_ticket CASSANDRA-7665
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node = cluster.nodelist()[0]

        session = self.patient_cql_connection(node)
        session.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1 };")
        session.execute("use test;")
        session.execute("CREATE TYPE point_t (x double, y double);")

        node.nodetool("scrub")
        # Wait briefly before inspecting the log.
        time.sleep(2)
        matches = node.grep_log("org.apache.cassandra.serializers.MarshalException: Not enough bytes to read a set")
        assert len(matches) == 0, f"{len(matches)} is not equal 0"