This flag was added to operations which have an --output-dir command-line argument. These operations write sstables and need a directory to write them to. Back in the numeric-generation world this posed a problem: if the directory contained any sstable, a generation clash was almost guaranteed, because each scylla-sstable command invocation would start output generations from 1. To avoid this, an empty output directory was a requirement, with the --unsafe-accept-nonempty-output-dir flag allowing a forced override. Now, in the timeuuid-generation days, all this is not necessary anymore: generations are unique, so it is not a problem if the output directory already contains sstables, as the probability of a generation clash is almost 0. Even if one happens, the tool will simply fail to write the new sstable with the clashing generation. Remove this historic relic of a flag and the related logic; it is just a pointless nuisance nowadays.
485 lines
20 KiB
Python
485 lines
20 KiB
Python
#
# The original Apache Cassandra license:
#
# SPDX-License-Identifier: Apache-2.0
#
# Modifications: Copyright (C) 2021-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
|
|
|
|
import glob
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
import uuid
|
|
|
|
import pytest
|
|
from ccmlib import common
|
|
|
|
from dtest_class import Tester, create_cf, create_ks
|
|
|
|
# Logger shared by every helper and test in this module.
logger = logging.getLogger(name=__name__)
# Keyspace the tests create and in which the TestHelper path helpers look for table directories.
KEYSPACE = "ks"
|
|
|
|
|
|
class TestHelper(Tester):
    """
    Helpers shared by the scrub tests: locating the table/index directories of
    KEYSPACE on the first node, listing their sstable files, flushing and
    scrubbing (online via nodetool, offline via scylla-sstable).
    """

    def _find_table_dir(self, prefix, *exact_names):
        """
        Return the path of a table directory of KEYSPACE on the first node.

        Directories named "<name>-..." for one of exact_names are preferred, in
        the given order, so that e.g. "users" is not matched by the directory of
        a table called "users_by_state". If none exists, fall back to the first
        directory (in sorted order) whose name merely starts with prefix, which
        is the historical behaviour. Return "" when nothing matches.
        """
        node1 = self.cluster.nodelist()[0]
        basepath = os.path.join(node1.get_path(), "data", KEYSPACE)
        # os.listdir() order is arbitrary; sort so the choice is deterministic.
        entries = sorted(os.listdir(basepath))
        for name in exact_names:
            for entry in entries:
                if entry.startswith(f"{name}-"):
                    return os.path.join(basepath, entry)
        for entry in entries:
            if entry.startswith(prefix):
                return os.path.join(basepath, entry)
        return ""

    def get_table_path(self, table):
        """
        Return the path where the table sstables are located, or "" if the
        table directory cannot be found.
        """
        return self._find_table_dir(table, table)

    def get_index_path(self, index):
        """
        Return the path where the index sstables are located, or "" if it
        cannot be found.

        index may be the index name (e.g. "gender_idx") or the name of its
        backing table ("<index>_index", the naming used by
        perform_node_tool_cmd() and standalonescrub()).
        """
        return self._find_table_dir(index, f"{index}_index", index)

    def get_sstable_files(self, path):
        """
        Return the basenames of the sstable files (*.db, *.txt, *.adler32,
        *.sha1) located directly in path, skipping any file whose name contains
        "-scylla." (case-insensitive). Return [] when path is empty.
        """
        if not path:
            # The path helpers return "" for a missing table; globbing relative
            # to "" would list (and let callers delete) files in the CWD.
            return []

        logger.debug(f"Checking sstables in {path}")
        ret = []
        for ext in ("*.db", "*.txt", "*.adler32", "*.sha1"):
            for fname in glob.glob(os.path.join(path, ext)):
                bname = os.path.basename(fname)
                if "-scylla." in bname.lower():
                    continue
                ret.append(bname)
        return ret

    def delete_non_essential_sstable_files(self, table):
        """
        Delete the sstable files of table except Data.db, Statistics.db and the
        index components (Index.db, or Partitions.db and Rows.db for the
        "ms"/"da" versions), then rewrite every TOC.txt so it lists only the
        components kept for its sstable version.

        Only files reported by get_sstable_files() are considered, so files it
        skips (names containing "-scylla.") are left untouched.
        """
        # NOTE: TOC file is essential for ScyllaDB, also any component in the
        # TOC is required, so we need to remove deleted components from the TOC too.
        # See https://github.com/scylladb/scylladb/issues/21145

        sstable_re = re.compile(
            r"""(?P<version>la|m[cdes]|n[a-b]|o[a]|d[a])-  # the sstable version
                (?P<id>[^-]+)-                             # sstable identifier
                (?P<format>\w+)-                           # format: 'big' or 'bti'
                (?P<component>.*)                          # component: e.g., 'Data'""",
            re.X,
        )
        keep = ("Data.db", "Index.db", "Statistics.db", "Partitions.db", "Rows.db")

        table_path = self.get_table_path(table)
        # Never operate on "" (table not found): joined paths would point into the CWD.
        assert table_path, f"Table {table} was not found in keyspace {KEYSPACE}"

        bti_tocs = []
        big_tocs = []
        for fname in self.get_sstable_files(table_path):
            fullname = os.path.join(table_path, fname)
            matched = sstable_re.fullmatch(fname)
            # Collect removed TOCs, to be restored later.
            if matched and matched["component"] == "TOC.txt":
                if matched["version"] in ("ms", "da"):
                    bti_tocs.append(fullname)
                else:
                    big_tocs.append(fullname)
            if not matched or matched["component"] not in keep:
                logger.debug(f"Deleting {fullname}")
                os.remove(fullname)
        logger.info(f"TOCS: {bti_tocs + big_tocs}")

        # Restore TOCs, listing only the components kept above for each layout.
        restored_tocs = (
            (big_tocs, ("Data.db", "Index.db", "Statistics.db", "TOC.txt")),
            (bti_tocs, ("Data.db", "Partitions.db", "Rows.db", "Statistics.db", "TOC.txt")),
        )
        for tocs, components in restored_tocs:
            for toc in tocs:
                logger.debug(f"restoring TOC {toc}")
                with open(toc, "w") as f:
                    f.writelines(f"{component}\n" for component in components)

    def get_sstables(self, table, indexes):
        """
        Return the sstable files of table and of the given indexes, as a dict
        keyed by table/index name with sorted file lists as values (index
        entries are prefixed with "<index>/"). Assert that each one has files.
        """
        table_path = self.get_table_path(table)
        table_sstables = self.get_sstable_files(table_path)
        assert table_sstables, f"sstables were not found in {table_path}"
        sstables = {table: sorted(table_sstables)}

        for index in indexes:
            index_path = self.get_index_path(index)
            index_sstables = self.get_sstable_files(index_path)
            assert index_sstables, f"No indexes were found by path: {index_path}"
            sstables[index] = sorted(f"{index}/{sstable}" for sstable in index_sstables)

        return sstables

    def launch_nodetool_cmd(self, cmd):
        """
        Launch a nodetool command and check the result is empty (no error)
        """
        node1 = self.cluster.nodelist()[0]
        response = node1.nodetool(cmd, capture_output=True)[0]
        assert len(response) == 0, response  # nodetool does not print anything unless there is an error

    def launch_standalone_scrub(self, ks, cf):
        """
        Scrub the sstables of ks.cf offline with `scylla-sstable scrub` (abort
        mode) into a temporary directory, then replace the table directory
        with the scrubbed output.

        NOTE(review): the table directory is located via get_table_path(),
        which always searches KEYSPACE; ks is only passed to scylla-sstable.
        """
        node1 = self.cluster.nodelist()[0]

        table_path = self.get_table_path(cf)
        # Fail before running the tool instead of calling rmtree("") below.
        assert table_path, f"Table {cf} was not found in keyspace {KEYSPACE}"

        with tempfile.TemporaryDirectory() as tmp_dir:
            node1.run_scylla_sstable(
                "scrub",
                additional_args=["--scrub-mode", "abort", "--output-dir", tmp_dir, "--logger-log-level", "scylla-sstable=debug"],
                keyspace=ks,
                column_families=[cf],
            )
            # Replace the table's sstables with the scrubbed ones, just like online scrub would do.
            shutil.rmtree(table_path)
            shutil.copytree(tmp_dir, table_path)

    def perform_node_tool_cmd(self, cmd, table, indexes):
        """
        Perform a nodetool command on a table and the indexes specified
        """
        self.launch_nodetool_cmd(f"{cmd} {KEYSPACE} {table}")
        for index in indexes:
            # Each index is addressed through its "<index>_index" backing table.
            self.launch_nodetool_cmd(f"{cmd} {KEYSPACE} {index}_index")

    def flush(self, table, *indexes):
        """
        Flush table and indexes via nodetool, and then return all sstables
        in a dict keyed by the table or index name.
        """
        self.perform_node_tool_cmd("flush", table, indexes)
        return self.get_sstables(table, indexes)

    def scrub(self, table, *indexes):
        """
        Scrub table and indexes via nodetool, and then return all sstables
        in a dict keyed by the table or index name.
        """
        self.perform_node_tool_cmd("scrub", table, indexes)
        return self.get_sstables(table, indexes)

    def standalonescrub(self, table, *indexes):
        """
        Launch standalone scrub on table and indexes, and then return all sstables
        in a dict keyed by the table or index name.
        """
        self.launch_standalone_scrub(KEYSPACE, table)
        for index in indexes:
            self.launch_standalone_scrub(KEYSPACE, f"{index}_index")
        return self.get_sstables(table, indexes)
|
|
|
|
|
|
@pytest.mark.dtest_full
@pytest.mark.single_node
@pytest.mark.next_gating
class TestScrubIndexes(TestHelper):
    """
    Test that we scrub indexes as well as their parent tables
    """

    # Secondary indexes that create_users() defines on the users table.
    _INDEXES = ("gender_idx", "state_idx", "birth_year_idx")

    def create_users(self, session):
        """
        Create the users table and its three secondary indexes.
        """
        columns = {"password": "varchar", "gender": "varchar", "session_token": "varchar", "state": "varchar", "birth_year": "bigint"}
        create_cf(session, "users", columns=columns)
        session.execute("CREATE INDEX gender_idx ON users (gender)")
        session.execute("CREATE INDEX state_idx ON users (state)")
        session.execute("CREATE INDEX birth_year_idx ON users (birth_year)")

    def update_users(self, session):
        """
        Insert 8 users, then delete 3 of them (user1, user5, user7).
        """
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user1', 'ch@ngem3a', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user2', 'ch@ngem3b', 'm', 'CA', 1982)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user3', 'ch@ngem3c', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user4', 'ch@ngem3d', 'm', 'CA', 1982)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user5', 'ch@ngem3e', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user6', 'ch@ngem3f', 'm', 'CA', 1982)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user7', 'ch@ngem3g', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user8', 'ch@ngem3h', 'm', 'CA', 1982)")

        session.execute("DELETE FROM users where KEY = 'user1'")
        session.execute("DELETE FROM users where KEY = 'user5'")
        session.execute("DELETE FROM users where KEY = 'user7'")

    def query_users(self, session):
        """
        Query the full table plus one query through each index and return all
        rows; the combined result is expected to have 8 rows (5 + 1 + 1 + 1).
        """
        ret = list(session.execute("SELECT * FROM users"))
        ret.extend(session.execute("SELECT * FROM users WHERE state='TX'"))
        ret.extend(session.execute("SELECT * FROM users WHERE gender='f'"))
        ret.extend(session.execute("SELECT * FROM users WHERE birth_year=1978"))
        assert len(ret) == 8, "Invalid number of records in table 'users'"
        return ret

    def _scrub_and_check(self, session, initial_users, initial_sstables):
        """
        Scrub users and its indexes via nodetool, then check the scrubbed
        sstables and the queried users against the initial ones.
        """
        scrubbed_sstables = self.scrub("users", *self._INDEXES)
        # NOTE(review): both values are dicts keyed by table/index name (see
        # TestHelper.get_sstables), so this compares the number of tables, not
        # the number of sstable files.
        assert len(initial_sstables) == len(scrubbed_sstables), "Amount of sstables before and after scrub differs"

        users = self.query_users(session)
        assert initial_users == users, "List of users before and after scrub are different"

    def test_scrub_static_table(self):
        cluster = self.cluster
        cluster.set_configuration_options(
            values={
                "tablets_initial_scale_factor": 1,
            }
        )
        cluster.populate(1).start(jvm_args=["--smp", "1"])
        node1 = cluster.nodelist()[0]

        session = self.patient_cql_connection(node1)
        create_ks(session, KEYSPACE, 1)

        self.create_users(session)
        self.update_users(session)

        initial_users = self.query_users(session)
        initial_sstables = self.flush("users", *self._INDEXES)

        self._scrub_and_check(session, initial_users, initial_sstables)
        # Scrub and check sstables and data again
        self._scrub_and_check(session, initial_users, initial_sstables)

        # Restart and check data again
        cluster.stop()
        cluster.start()

        session = self.patient_cql_connection(node1)
        session.execute(f"USE {KEYSPACE}")

        users = self.query_users(session)
        assert initial_users == users, "List of users before and after scrub are different"

    def test_standalone_scrub(self):
        cluster = self.cluster
        cluster.populate(1).start()
        node1 = cluster.nodelist()[0]

        session = self.patient_cql_connection(node1)
        create_ks(session, KEYSPACE, 1)

        self.create_users(session)
        self.update_users(session)

        initial_users = self.query_users(session)
        initial_sstables = self.flush("users", *self._INDEXES)

        # Scrub offline, with the cluster stopped.
        cluster.stop()

        scrubbed_sstables = self.standalonescrub("users", *self._INDEXES)
        assert len(initial_sstables) == len(scrubbed_sstables), "Amount of sstables before and after scrub differs"

        cluster.start()
        session = self.patient_cql_connection(node1)
        session.execute(f"USE {KEYSPACE}")

        users = self.query_users(session)
        assert initial_users == users, "List of users before and after scrub are different"

    def test_scrub_collections_table_without_indexes(self):
        cluster = self.cluster
        cluster.set_configuration_options(
            values={
                "tablets_initial_scale_factor": 1,
            }
        )
        cluster.populate(1).start()
        node1 = cluster.nodelist()[0]

        session = self.patient_cql_connection(node1)
        create_ks(session, KEYSPACE, 1)

        session.execute("CREATE TABLE users (user_id uuid PRIMARY KEY, email text, uuids list<uuid>)")

        # Every user gets the same one-element list, so the CONTAINS query matches all of them.
        shared_uuid = uuid.uuid4()
        num_users = 100
        for _ in range(num_users):
            user_uuid = uuid.uuid4()
            session.execute(f"INSERT INTO users (user_id, email) values ({user_uuid}, 'test@example.com')")
            session.execute(f"UPDATE users set uuids = [{shared_uuid}] where user_id = {user_uuid}")

        select_users = f"SELECT * from users where uuids contains {shared_uuid} ALLOW FILTERING"
        initial_users = list(session.execute(select_users))
        assert num_users == len(initial_users), "Not all users were added to table"

        initial_sstables = self.flush("users")

        # Scrub twice, checking sstables and data after each pass.
        for _ in range(2):
            scrubbed_sstables = self.scrub("users")
            assert len(initial_sstables) == len(scrubbed_sstables), "Amount of sstables before and after scrub differs"

            users = list(session.execute(select_users))
            assert initial_users == users, "List of users before and after scrub are different"
|
|
|
|
|
|
@pytest.mark.dtest_full
@pytest.mark.single_node
@pytest.mark.next_gating
class TestScrub(TestHelper):
    """
    Generic tests for scrubbing
    """

    def create_users(self, session):
        """
        Create the users table (no secondary indexes).
        """
        columns = {"password": "varchar", "gender": "varchar", "session_token": "varchar", "state": "varchar", "birth_year": "bigint"}
        create_cf(session, "users", columns=columns)

    def update_users(self, session):
        """
        Insert 8 users, then delete 3 of them (user1, user5, user7).
        """
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user1', 'ch@ngem3a', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user2', 'ch@ngem3b', 'm', 'CA', 1982)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user3', 'ch@ngem3c', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user4', 'ch@ngem3d', 'm', 'CA', 1982)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user5', 'ch@ngem3e', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user6', 'ch@ngem3f', 'm', 'CA', 1982)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user7', 'ch@ngem3g', 'f', 'TX', 1978)")
        session.execute("INSERT INTO users (KEY, password, gender, state, birth_year) VALUES ('user8', 'ch@ngem3h', 'm', 'CA', 1982)")

        session.execute("DELETE FROM users where KEY = 'user1'")
        session.execute("DELETE FROM users where KEY = 'user5'")
        session.execute("DELETE FROM users where KEY = 'user7'")

    def query_users(self, session):
        """
        Return all rows of users, asserting the 5 surviving users are present.
        """
        ret = list(session.execute("SELECT * FROM users"))
        assert len(ret) == 5, "Amount of users is different"
        return ret

    def _scrub_and_check(self, session, initial_users, initial_sstables):
        """
        Scrub users via nodetool, then check the scrubbed sstables and the
        queried users against the initial ones.
        """
        scrubbed_sstables = self.scrub("users")
        # NOTE(review): both values are dicts keyed by table name (see
        # TestHelper.get_sstables), so this compares the number of tables, not
        # the number of sstable files.
        assert len(initial_sstables) == len(scrubbed_sstables), "Amount of sstables before and after scrub differs"

        users = self.query_users(session)
        assert initial_users == users, "List of users before and after scrub are different"

    def _standalone_scrub_and_check(self, *, delete_non_essential_files=False):
        """
        Populate and flush users, stop the cluster and scrub the table offline
        (optionally after deleting its non-essential sstable files), then
        restart and check that the data is unchanged.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node1 = cluster.nodelist()[0]

        session = self.patient_cql_connection(node1)
        create_ks(session, KEYSPACE, 1)

        self.create_users(session)
        self.update_users(session)

        initial_users = self.query_users(session)
        initial_sstables = self.flush("users")

        cluster.stop()

        if delete_non_essential_files:
            self.delete_non_essential_sstable_files("users")

        scrubbed_sstables = self.standalonescrub("users")
        assert len(initial_sstables) == len(scrubbed_sstables), "Amount of sstables before and after scrub differs"

        cluster.start()
        session = self.patient_cql_connection(node1)
        session.execute(f"USE {KEYSPACE}")

        users = self.query_users(session)
        assert initial_users == users, "List of users before and after scrub are different"

    def test_nodetool_scrub(self):
        cluster = self.cluster
        cluster.set_configuration_options(
            values={
                "tablets_initial_scale_factor": 1,
            }
        )
        cluster.populate(1).start()
        node1 = cluster.nodelist()[0]

        session = self.patient_cql_connection(node1)
        create_ks(session, KEYSPACE, 1)

        self.create_users(session)
        self.update_users(session)

        initial_users = self.query_users(session)
        initial_sstables = self.flush("users")

        self._scrub_and_check(session, initial_users, initial_sstables)
        # Scrub and check sstables and data again
        self._scrub_and_check(session, initial_users, initial_sstables)

        # Restart and check data again
        cluster.stop()
        cluster.start()

        session = self.patient_cql_connection(node1)
        session.execute(f"USE {KEYSPACE}")

        users = self.query_users(session)
        assert initial_users == users, "List of users before and after scrub are different"

    def test_standalone_scrub(self):
        self._standalone_scrub_and_check()

    def test_standalone_scrub_essential_files_only(self):
        self._standalone_scrub_and_check(delete_non_essential_files=True)

    def test_scrub_with_udt(self):
        """
        @jira_ticket CASSANDRA-7665
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node1 = cluster.nodelist()[0]

        session = self.patient_cql_connection(node1)
        session.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1 };")
        session.execute("use test;")
        session.execute("CREATE TYPE point_t (x double, y double);")

        node1.nodetool("scrub")
        time.sleep(2)
        # NOTE(review): the pattern below is a Java (org.apache.cassandra)
        # exception message and no table in this test uses point_t, so this
        # check looks unable to fail against Scylla; confirm whether a
        # Scylla-specific log pattern and a table using the UDT were intended.
        match = node1.grep_log("org.apache.cassandra.serializers.MarshalException: Not enough bytes to read a set")
        assert len(match) == 0, f"{len(match)} is not equal 0"
|