Files
scylladb/test/cluster/dtest/commitlog_test.py
Calle Wilund e48170ca8e commitlog_replay: Handle fully corrupt files same as partial corruption.
Fixes #26744

If a segment to replay is corrupted such that the main header is non-zero
but still invalid, we throw header_checksum_error. This was not handled in
replayer, which grouped this into the "user error/fundamental problem"
category. However, assuming we allow for "real" disk corruption, this should
really be treated the same as data corruption, i.e. reported as data loss, not
as a failure to start up.

The `test_one_big_mutation_corrupted_on_startup` test accidentally sometimes
provoked this issue: its random file wrecking would, on rare occasions,
trigger this exact error, and the test then failed because Scylla did not
start up, instead of losing data as expected.

Changed test to consistently cause this exact error instead.
2025-12-10 15:37:04 +01:00

832 lines
36 KiB
Python

#
# Copyright (C) 2015-present The Apache Software Foundation
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import glob
import logging
import os
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
import pytest
from cassandra.cluster import Session
from ccmlib.scylla_cluster import ScyllaCluster
from ccmlib.scylla_node import ScyllaNode
from dtest_class import Tester, create_cf, create_ks
from tools.assertions import (
assert_all,
assert_almost_equal,
assert_lists_equal_ignoring_order,
assert_row_count,
assert_row_count_in_select_less,
)
from tools.data import insert_c1c2, rows_to_list
from tools.metrics import get_node_metrics
logger = logging.getLogger(__name__)
@pytest.mark.single_node
class TestCommitLog(Tester):
    """CommitLog Tests"""

    @pytest.fixture(autouse=True)
    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
        # These tests deliberately provoke disk-full and I/O errors; whitelist
        # the resulting log lines so the log checker does not fail the run.
        fixture_dtest_setup.ignore_log_patterns += [
            r"commitlog - Exception in segment reservation: storage_io_error \(Storage I/O error: 13: filesystem error: open failed",
            "Shutting down communications due to I/O errors until operator intervention",
            r"commitlog - Failed to (flush|persist) commits to disk.*storage_io_error \(Storage I/O error: 28: No space left on device\)",
            r"storage_proxy - .*\(Could not write mutation .* to commitlog\): storage_io_error \(Storage I/O error: 28: No space left on device\)",
        ]
    @pytest.fixture(autouse=True)
    def fixture_set_cluster_settings(self, fixture_dtest_setup):
        """Populate a single-node cluster with RPC enabled and keep a handle to the node."""
        fixture_dtest_setup.cluster.set_configuration_options({"start_rpc": "true"})
        fixture_dtest_setup.cluster.populate(1)
        # Exactly one node (see @pytest.mark.single_node on the class).
        [self.node1] = fixture_dtest_setup.cluster.nodelist()
def prepare(self, configuration=None, create_test_keyspace=True, **kwargs):
if configuration is None:
configuration = {}
conf = {"commitlog_sync_period_in_ms": 1000}
conf.update(configuration)
self.cluster.set_configuration_options(values=conf, **kwargs)
self.cluster.start()
unknown_options = self.cluster.nodelist()[0].grep_log("config - Unknown option")
if unknown_options:
pytest.fail(f"Unknown option found! Please check the test! {unknown_options}")
self.session1 = self.patient_cql_connection(self.node1)
if create_test_keyspace:
self.session1.execute("DROP KEYSPACE IF EXISTS ks;")
create_ks(self.session1, "ks", 1)
self.session1.execute("DROP TABLE IF EXISTS test;")
query = """
CREATE TABLE test (
key int primary key,
col1 int
)
"""
self.session1.execute(query)
def _get_commitlog_path(self):
"""Returns the commitlog path"""
return os.path.join(self.node1.get_path(), "commitlog")
def _get_commitlog_files(self):
"""Returns the commitlog files in the directory"""
path = self._get_commitlog_path()
return glob.glob(path + "/CommitLog-*.log")
def _get_commitlog_size(self, include_stdout=False, allow_errors=False):
"""Returns the commitlog directory size in MB"""
path = self._get_commitlog_path()
files = glob.glob(f"{path}/*CommitLog-*.log")
if not files:
return 0, "" if include_stdout else 0
cmd_args = ["du", "-m"]
cmd_args.extend(files)
p = subprocess.Popen(cmd_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
exit_status = p.returncode
# allow_error is needed for running du
# while the node is up and running
# and commitlog files may be created and deleted
# while du runs.
if exit_status:
if allow_errors:
return -1, stderr.decode()
pytest.fail("du exited with a non-zero status: %d" % exit_status + "\n%s" % stderr.decode())
# du will print a line for file
size = 0
for l in stdout.decode().split("\n"):
if not l:
continue
a = l.split()
if len(a) == 2:
size += int(a[0])
else:
logger.warning(f"Unrecognized du output line: {l}")
return size, stdout.decode() if include_stdout else size
def _segment_size_test(self, segment_size_in_mb, compressed=False):
"""Execute a basic commitlog test and validate the commitlog files"""
conf = {"commitlog_segment_size_in_mb": segment_size_in_mb}
if compressed:
conf["commitlog_compression"] = [{"class_name": "LZ4Compressor"}]
conf["memtable_heap_space_in_mb"] = 512
self.prepare(configuration=conf, create_test_keyspace=False)
segment_size = segment_size_in_mb * 1024 * 1024
self.node1.stress(["write", "n=150000", "-rate", "threads=25"])
time.sleep(1)
commitlogs = self._get_commitlog_files()
assert len(commitlogs) > 0, "No commit log files were created"
# the most recently-written segment of the commitlog may be smaller
# than the expected size, so we allow exactly one segment to be smaller
smaller_found = False
for i, f in enumerate(commitlogs):
size = os.path.getsize(f)
size_in_mb = int(size / 1024 / 1024)
logger.debug(f"segment file {f} {size_in_mb}; smaller already found: {smaller_found}")
if size_in_mb < 1 or size < (segment_size * 0.1):
continue # commitlog not yet used
try:
if compressed:
# if compression is used, we assume there will be at most a 50% compression ratio
assert size <= segment_size, f"expect {size} < {segment_size}"
assert size > segment_size / 2, f"expect {size} > {segment_size / 2}"
else:
# if no compression is used, the size will be close to what we expect
assert_almost_equal(size, segment_size, error=0.05)
except AssertionError as e:
# the last segment may be smaller
if not smaller_found:
assert size <= segment_size, f"expects size <= segment_size, actual are {size} {segment_size}"
smaller_found = True
else:
raise e
    def test_commitlog_replay_on_startup(self):
        """Test commit log replay"""
        node1 = self.node1
        node1.set_configuration_options(batch_commitlog=True)
        node1.start(wait_for_binary_proto=True)
        logger.debug("Insert data")
        session = self.patient_cql_connection(node1)
        create_ks(session, "Test", 1)
        session.execute(
            """
            CREATE TABLE users (
                user_name varchar PRIMARY KEY,
                password varchar,
                gender varchar,
                state varchar,
                birth_year bigint
            );
            """
        )
        session.execute("INSERT INTO Test. users (user_name, password, gender, state, birth_year) VALUES('gandalf', 'p@$$', 'male', 'WA', 1955);")
        logger.debug("Verify data is present")
        session = self.patient_cql_connection(node1)
        res = session.execute("SELECT * FROM Test. users")
        assert rows_to_list(res) == [["gandalf", 1955, "male", "p@$$", "WA"]]
        logger.debug("Stop node abruptly")
        # Unclean stop: no memtable flush, so the row exists only in the commitlog.
        node1.stop(gently=False)
        logger.debug("Verify commitlog was written before abrupt stop")
        commitlog_files = self._get_commitlog_files()
        assert len(commitlog_files) > 0, f"expecting positive, len(commitlog_files)={len(commitlog_files)}"
        logger.debug("Verify no SSTables were flushed before abrupt stop")
        data_dir = os.path.join(node1.get_path(), "data")
        cf_id = next(s for s in os.listdir(os.path.join(data_dir, "test")) if s.startswith("users"))
        cf_data_dir = glob.glob(f"{data_dir}/test/{cf_id}")[0]
        cf_data_dir_files = os.listdir(cf_data_dir)
        # Ignore bookkeeping subdirectories; only sstable files matter here.
        for special_dir in ["backups", "upload", "staging"]:
            if special_dir in cf_data_dir_files:
                cf_data_dir_files.remove(special_dir)
        assert len(cf_data_dir_files) == 0, f"expecting 0, len(cf_data_dir_files)={len(cf_data_dir_files)}"
        logger.debug("Verify commit log was replayed on startup")
        node1.start(wait_for_binary_proto=False)
        assert node1.is_running(), "node is not running"
        node1.watch_log_for("Log replay complete")
        # Here we verify there was more than 0 replayed mutations
        zero_replays = node1.grep_log(" 0 replayed mutations", filter_expr="DEBUG")
        assert len(zero_replays) == 0, f"expect 0, len(zero_replays)={len(zero_replays)}"
        logger.debug("Make query and ensure data is present")
        session = self.patient_cql_connection(node1)
        res = session.execute("SELECT * FROM Test. users")
        assert_lists_equal_ignoring_order(rows_to_list(res), [["gandalf", 1955, "male", "p@$$", "WA"]])
def test_commitlog_replay_with_alter_table(self):
"""
Test commit log replay with alter table
The goal of the test is to verify that commitlog replay works correctly even if the commitlog contains
mutations written using old versions of the schema.
Based on test_commitlog_replay_on_startup
"""
node1 = self.node1
node1.set_configuration_options(batch_commitlog=True)
node1.start(wait_for_binary_proto=True)
logger.debug("Create table")
session = self.patient_cql_connection(node1)
create_ks(session, "Test", 1)
session.execute(
"""
CREATE TABLE cf (
pk1 int,
ck1 int,
r2 int,
r3 text,
r5 set<int>,
PRIMARY KEY(pk1, ck1)
);
"""
)
logger.debug("Insert some data")
n_partitions = 5
for key in range(n_partitions):
session.execute("INSERT INTO Test.cf (pk1, ck1, r2, r3, r5) VALUES(%d, 9, 8, 'seven', {6, 5});" % (key))
session.execute("INSERT INTO Test.cf (pk1, ck1, r3) VALUES(%d, 8, 'eight');" % (key))
logger.debug("Flush")
self.cluster.flush()
logger.debug("Insert more data and alter table")
for key in range(n_partitions):
session.execute("INSERT INTO Test.cf (pk1, ck1, r2, r3, r5) VALUES(%d, 0, 1, 'two', {3, 4});" % (key))
session.execute("ALTER TABLE Test.cf ADD r1 int;")
for key in range(n_partitions):
session.execute("INSERT INTO Test.cf (pk1, ck1, r1, r2, r3, r5) VALUES(%d, 1, 2, 3, 'four', {5, 6, 7});" % (key))
session.execute("ALTER TABLE Test.cf DROP r2;")
for key in range(n_partitions):
session.execute("INSERT INTO Test.cf (pk1, ck1, r1, r3) VALUES(%d, 2, 3, 'four');" % (key))
session.execute("ALTER TABLE Test.cf DROP r5;")
session.execute("ALTER TABLE Test.cf ADD r2 varint;")
session.execute("ALTER TABLE Test.cf ADD r4 int;")
for key in range(n_partitions):
session.execute("INSERT INTO Test.cf (pk1, ck1, r2, r4) VALUES(%d, 0, 99, 999);" % (key))
logger.debug("Verify data is present")
session = self.patient_cql_connection(node1)
for key in range(n_partitions):
assert_all(
session,
f"SELECT * FROM Test.cf where pk1 = {key}",
[
[key, 0, None, 99, "two", 999],
[key, 1, 2, None, "four", None],
[key, 2, 3, None, "four", None],
[key, 8, None, None, "eight", None],
[key, 9, None, None, "seven", None],
],
ignore_order=True,
)
logger.debug("Stop node abruptly")
node1.stop(gently=False)
logger.debug("Verify commitlog was written before abrupt stop")
commitlog_files = self._get_commitlog_files()
assert len(commitlog_files) > 0, f"expect >0, len(commitlog_files)={len(commitlog_files)}"
logger.debug("Verify commitlog was replayed on startup")
node1.start(wait_for_binary_proto=False)
node1.watch_log_for("Log replay complete")
replays = node1.grep_log(r" (\d+) replayed mutations", filter_expr="DEBUG")
assert len(replays) > 0, f"expect >0, len(replays)={len(replays)}"
replayed_mutations = 0
for line, m in replays:
replayed_mutations += int(m.group(1))
assert replayed_mutations >= 4, f"expect >=4, replayed_mutations={replayed_mutations}"
logger.debug("Make query and ensure data is present")
session = self.patient_cql_connection(node1)
for key in range(n_partitions):
assert_all(
session,
f"SELECT * FROM Test.cf where pk1 = {key}",
[
[key, 0, None, 99, "two", 999],
[key, 1, 2, None, "four", None],
[key, 2, 3, None, "four", None],
[key, 8, None, None, "eight", None],
[key, 9, None, None, "seven", None],
],
ignore_order=True,
)
def test_default_segment_size(self):
"""Test default commitlog_segment_size_in_mb (32MB)"""
self._segment_size_test(32)
def test_small_segment_size(self):
"""Test a small commitlog_segment_size_in_mb (5MB)"""
self._segment_size_test(5)
def test_default_compressed_segment_size(self):
"""Test default compressed commitlog_segment_size_in_mb (32MB)"""
# Scylla: Unknown option commitlog_compression
self._segment_size_test(32, compressed=True)
def test_small_compressed_segment_size(self):
"""Test a small compressed commitlog_segment_size_in_mb (5MB)"""
# Scylla: Unknown option commitlog_compression
self._segment_size_test(5, compressed=True)
def prepare_cluster_with_ks_cf(self, jvm_args=None):
node1 = self.node1
jvm_args = jvm_args or []
if not "--commitlog-sync-period-in-ms" in jvm_args:
commit_log_sync_period = 10000
if isinstance(self.cluster, ScyllaCluster) and self.cluster.scylla_mode == "debug":
commit_log_sync_period *= 3
jvm_args.extend(["--commitlog-sync-period-in-ms", str(commit_log_sync_period)])
self.cluster.start(jvm_args=jvm_args)
logger.debug("Create table")
session = self.patient_cql_connection(node1)
create_ks(session, "Test", 1)
session.execute(
"""
CREATE TABLE cf (
pk1 INT,
ck1 INT,
v1 int,
PRIMARY KEY(pk1, ck1)
);
"""
)
return session, node1
    def test_periodic_commitlog(self):
        """
        Test periodic mode of commitlog flushing
        'periodic' mode is where all commitlog writes are ready the moment they are stored in
        a memory buffer and the memory buffer is flushed to a storage periodically.
        """
        session, node1 = self.prepare_cluster_with_ks_cf()
        logger.debug("Insert 100 rows")
        for i in range(100):
            session.execute(f"INSERT INTO Test.cf (pk1, ck1, v1) VALUES ({i}, {i}, {i})")
        assert_row_count(session=session, table_name="Test.cf", expected=100)
        logger.debug("Stop node abruptly")
        # In periodic mode the buffer may not have been synced yet, so an
        # unclean stop can legitimately lose the tail of the inserts.
        node1.stop(gently=False)
        logger.debug("Start node")
        node1.start(wait_other_notice=True, wait_for_binary_proto=True)
        logger.debug("Make query and ensure data is present as expected")
        session = self.patient_cql_connection(node1)
        # At most 100 rows can come back; fewer is acceptable in periodic mode.
        assert_row_count_in_select_less(session=session, query="select * from Test.cf", max_rows_expected=100)
    def _test_total_space_limit_of_commitlog(  # noqa: PLR0915
        self,
        commitlog_segment_size_in_mb,
        commitlog_total_space_in_mb,
    ):
        """
        `commitlog_reuse_segments` is enabled by default; reusing commitlog
        segments without deleting and recreating them improves performance.
        `commitlog_segment_size_in_mb` is the size limit of a single commitlog.
        The space usage of the commitlog is limited by `commitlog_total_space_in_mb`;
        after flushing segment buffers or when a segment is recycled, nothing is
        filled but the disk space isn't released.
        This case tests with different sizes of `commitlog_total_space_in_mb`
        and `commitlog_segment_size_in_mb`, fills data by cs workload for
        reaching the `commitlog_disk_usage_threshold`, then flushing will be
        triggered automatically.
        When the cs workload writes data, commitlog disk usage will be limited
        by `commitlog_total_space_in_mb`. Currently scylla allows to have one
        more segment, so the real limit is increased to be a multiple of
        segment size.
        A value of -1 for either parameter means "use the Scylla default".
        Related Scylla PR: https://github.com/scylladb/scylla/pull/6368
        """
        node1 = self.node1
        # Trace-level commitlog logging is needed to read back the derived limit.
        log_level = "trace" if commitlog_total_space_in_mb == -1 else "debug"
        total_space_limit = commitlog_total_space_in_mb
        if commitlog_segment_size_in_mb == -1:
            commitlog_segment_size_in_mb = 32  # default
        logger.debug(f"commitlog_segment_size_in_mb={commitlog_segment_size_in_mb}")
        logger.debug(f"commitlog_total_space_in_mb={commitlog_total_space_in_mb}")
        logger.debug(f"total_space_limit={total_space_limit}")
        # Calculate a reasonably low commitlog_disk_usage_threshold, flushing will be triggered
        # at this point. We want something that is within reachable range
        commitlog_disk_usage_threshold = int(total_space_limit / 2)
        opts = {"commitlog_segment_size_in_mb": commitlog_segment_size_in_mb, "commitlog_total_space_in_mb": commitlog_total_space_in_mb, "commitlog_reuse_segments": True, "commitlog_use_hard_size_limit": True}
        if total_space_limit != -1:
            opts.update({"commitlog_flush_threshold_in_mb": commitlog_disk_usage_threshold})
        # By default periodic commitlog_sync mode will be used, so we have chance
        # to accumulate more commitlogs.
        # NOTE(review): commit_log_sync_period is computed but never applied —
        # presumably it was meant to be added to opts or jvm args; verify.
        commit_log_sync_period = 10000
        if isinstance(self.cluster, ScyllaCluster) and self.cluster.scylla_mode == "debug":
            commit_log_sync_period *= 3
        node1.set_configuration_options(values=opts)
        logger.debug(f"Commitlog size before start: {self._get_commitlog_size()}M")
        # Bound memory so the memory-derived commitlog limit stays predictable.
        memory = max(0, commitlog_total_space_in_mb) + 512
        logger.debug(f"Start cluster with '--smp 1' '--memory {memory}M' ...")
        args = ["--smp", "1", "--memory", f"{memory}M", "--logger-log-level", f"commitlog={log_level}"]
        self.cluster.start(jvm_args=args, wait_for_binary_proto=True)
        if commitlog_total_space_in_mb == -1:
            # Recover the memory-derived limit from the startup log line.
            matches = node1.grep_log(r"Commitlog .* maximum disk size: (\d+) MB / cpu \(\d+ cpus\)")
            total_space_limit = int(matches[0][1].group(1))
            commitlog_disk_usage_threshold = int(total_space_limit / 2)
            logger.debug(f"total_space_limit={total_space_limit}")
            logger.debug(f"commitlog_disk_usage_threshold={commitlog_disk_usage_threshold}")
        logger.debug("Create test keyspace and table")
        session = self.patient_cql_connection(node1)
        create_ks(session, "ks", 1)
        create_cf(session, "cf", columns={"c1": "text", "c2": "text"})
        # Fill data by CS workload, a set of commitlogs will be generated for testing the space limit
        cs_n = max(int(total_space_limit * 200), 20000)
        cs_cmd = ["write", f"n={cs_n}", "-rate", "threads=25", "-col", "size=FIXED(1000)", "-log", "interval=100"]
        logger.debug(f"Starting CS workload: {cs_cmd}")
        executor = ThreadPoolExecutor(max_workers=1)
        start = time.time()
        cs_task = executor.submit(lambda: node1.stress(cs_cmd))
        # Insert a few data by insert_c1c2(), the data will be verified in the end
        total_size = 0
        unit_size = commitlog_segment_size_in_mb
        reach_threshold_cases = []
        # Scylla allows to create one more commitlog file out of the space limit
        # and commitlog segments may be go over commitlog_segment_size_in_mb in 1MB as well.
        actual_space_limit = (total_space_limit // commitlog_segment_size_in_mb + 1) * (commitlog_segment_size_in_mb + 1)

        def check_commitlog_size(allow_errors: bool):
            # Measure the directory size; when the hard limit is exceeded and
            # errors are not allowed, dump the per-file sizes and fail.
            dir_size, stdout = self._get_commitlog_size(include_stdout=True, allow_errors=allow_errors)
            if dir_size > actual_space_limit and not allow_errors:
                logger.debug(f"Commitlog file sizes in MB:\n{stdout}")
                assert dir_size <= actual_space_limit, f"Out of total space limit\n"
            return dir_size

        # NOTE(review): the `not reach_threshold_cases` condition exits the
        # loop after the first threshold hit, which appears to make the
        # `>= 5` branch below unreachable — verify intent.
        while not cs_task.done() and not reach_threshold_cases:
            # Insert a few data
            insert_c1c2(session, keys=range(total_size, total_size + unit_size))
            total_size += unit_size
            dir_size = check_commitlog_size(allow_errors=True)
            if dir_size > commitlog_disk_usage_threshold:
                reach_threshold_cases.append(dir_size)
            if len(reach_threshold_cases) > 0:
                logger.debug(
                    f"Current commitlog size: {dir_size}M "
                    f"({round(100 * dir_size / total_space_limit, 2)})%, "
                    f"Well-used cases: {len(reach_threshold_cases)} / 5, "
                    f"Time passed: {round(time.time() - start, 2)}s, "
                    f"commitlog_disk_usage_threshold: {commitlog_disk_usage_threshold}M"
                )
            # Have enough commitlog to trigger flushing
            if len(reach_threshold_cases) >= 5:
                logger.debug("Waiting the cs workload to be done ...")
                logger.debug(cs_task.result())
                break
        logger.debug(cs_task.result())
        node1.stop(gently=False)
        # After the node stopped, the on-disk usage must respect the hard limit.
        dir_size = check_commitlog_size(allow_errors=False)
        logger.debug(f"Final commitlog size: [{self._get_commitlog_path()}] {dir_size}M")
        # set commitlog config back to default
        node1.set_configuration_options(values={"commitlog_segment_size_in_mb": 32, "commitlog_total_space_in_mb": -1, "commitlog_reuse_segments": True, "commitlog_use_hard_size_limit": True})
        logger.debug("Restart node1 to enable default commitlog configure")
        node1.start(wait_for_binary_proto=True)
        session = self.patient_cql_connection(node1)
        assert_row_count_in_select_less(session=session, query="select * from ks.cf", max_rows_expected=total_size + 1)
        logger.debug("Test with more data after rollback to default config")
        insert_c1c2(session, n=int(total_size * 1.5))
        assert_row_count(session=session, table_name="ks.cf", expected=int(total_size * 1.5))
def test_total_space_limit_of_commitlog_with_memory_based_limit(self):
"""
Test with auto-sized commitlog files, and total space limit (based on available memory)
"""
self._test_total_space_limit_of_commitlog(commitlog_segment_size_in_mb=-1, commitlog_total_space_in_mb=-1)
def test_total_space_limit_of_commitlog_with_small_limit(self):
"""
Test with 5M commitlog files, total space limit is 30M
"""
self._test_total_space_limit_of_commitlog(commitlog_segment_size_in_mb=5, commitlog_total_space_in_mb=30)
    def test_alter_keyspace_durable_writes_false(self):
        """
        Test 'CREATE KEYSPACE ... WITH durable_writes = false;'
        The commitlog should not be used when durable_writes is false.
        """
        node1 = self.node1
        node1.set_configuration_options(batch_commitlog=True)
        node1.start()
        session = self.patient_cql_connection(node1)
        logger.debug("Create keyspace")
        create_ks(session, "dw", 1)
        logger.debug("Create table")
        session.execute("CREATE TABLE dw.cf(id int PRIMARY KEY)")
        logger.debug("Set durable_writes")
        session.execute("ALTER KEYSPACE dw WITH durable_writes = true")
        logger.debug("Insert data")
        # Rows 1 and 2 are written while durable_writes is on (commitlogged).
        session.execute("INSERT INTO dw.cf(id) VALUES (1);")
        session.execute("INSERT INTO dw.cf(id) VALUES (2);")
        logger.debug("Unset durable_writes")
        session.execute("ALTER KEYSPACE dw WITH durable_writes = false")
        logger.debug("Insert data")
        # Rows 3 and 4 are written while durable_writes is off.
        session.execute("INSERT INTO dw.cf(id) VALUES (3);")
        session.execute("INSERT INTO dw.cf(id) VALUES (4);")
        logger.debug("Stop node abruptly")
        node1.stop(gently=False)
        logger.debug("Start node again")
        node1.start()
        session = self.patient_cql_connection(node1)
        # Only the rows written with durable_writes on survive the unclean restart.
        assert_all(session=session, query="SELECT * FROM dw.cf", expected=[[1], [2]], ignore_order=True)
    def test_alter_keyspace_durable_writes_true(self):
        """
        Test 'ALTER KEYSPACE ... WITH durable_writes = true;'
        After changing durable_writes from false to true, following inserts should
        become persistent immediately.
        """
        node1 = self.node1
        node1.set_configuration_options(batch_commitlog=True)
        node1.start()
        session = self.patient_cql_connection(node1)
        logger.debug("Create keyspace")
        create_ks(session, "dw", 1)
        logger.debug("Create table")
        session.execute("CREATE TABLE dw.cf(id int PRIMARY KEY);")
        logger.debug("Unset durable_writes")
        session.execute("ALTER KEYSPACE dw WITH durable_writes = false;")
        logger.debug("Insert data")
        # Rows 1 and 2 are written while durable_writes is off (not commitlogged).
        session.execute("INSERT INTO dw.cf(id) VALUES (1);")
        session.execute("INSERT INTO dw.cf(id) VALUES (2);")
        logger.debug("Set durable_writes")
        session.execute("ALTER KEYSPACE dw WITH durable_writes = true;")
        logger.debug("Insert data")
        # Rows 3 and 4 are written while durable_writes is on.
        session.execute("INSERT INTO dw.cf(id) VALUES (3);")
        session.execute("INSERT INTO dw.cf(id) VALUES (4);")
        logger.debug("Stop node abruptly")
        node1.stop(gently=False)
        logger.debug("Start node again")
        node1.start()
        session = self.patient_cql_connection(node1)
        # Only the rows written with durable_writes on survive the unclean restart.
        assert_all(session=session, query="SELECT * FROM dw.cf;", expected=[[3], [4]], ignore_order=True)
    def prepare_big_mutation(self):
        """Start the node (batch commitlog) and prepare a table whose full row
        exceeds the commitlog mutation size limit.

        Stores on self: ``ks``, ``cf``, ``big_columns``, ``values`` (one row of
        bind values) and ``insert_statement`` (prepared INSERT over all columns).
        """
        self.prepare(batch_commitlog=True)
        self.node1: ScyllaNode
        session: Session = self.patient_cql_connection(self.node1)
        # Create keyspace
        self.ks = "test_ks"
        create_ks(session, self.ks, rf=1)
        # the mutation limit is about half of the segment size (default 32MB)
        # so 20 columns of 1MB each is guaranteed to be above it
        self.big_columns = 20
        # add one more column as the primary key is not allowed to be this big
        columns = self.big_columns + 1
        # One int key plus 20 values of ~1MB each.
        self.values = [1, *[f"value_{i:05d}_{'x' * 1024 * 1024}" for i in range(1, columns)]]
        # Create table
        self.cf = "big_commitlog"
        create_statement = f"CREATE TABLE {self.cf} (c00000 int PRIMARY KEY, {', '.join(f'c{i:05d} varchar' for i in range(1, columns))});"
        session.execute(create_statement)
        self.insert_statement = session.prepare(f"INSERT INTO {self.ks}.{self.cf} ({', '.join(f'c{i:05d}' for i in range(columns))}) VALUES({', '.join('?' for i in range(columns))});")
    def test_one_big_mutation_replay_on_startup(self):
        """
        Test commit log replay with a single big (larger than mutation limit) commitlog mutation
        The node is forcefully shut down after the transaction is committed
        After restarting the node, check for relevant logs and whether the data exists
        """
        self.prepare_big_mutation()
        node1 = self.node1
        session: Session = self.patient_cql_connection(node1)
        session.execute(self.insert_statement, self.values)
        # check data
        in_table = rows_to_list(session.execute(f"SELECT * FROM {self.ks}.{self.cf};"))
        assert in_table == [self.values]
        # stop node forcibly so no memtable flush happens
        node1.stop(gently=False)
        # restart the node
        node1.start(wait_for_binary_proto=True)
        session: Session = self.patient_cql_connection(node1)
        # check the data and the logs: the large-data subsystem must have
        # reported the oversized row, and the row must survive the restart
        in_table = rows_to_list(session.execute(f"SELECT * FROM {self.ks}.{self.cf};"))
        assert node1.grep_log(f"large_data - Writing large row {self.ks}/{self.cf}:")
        assert in_table == [self.values]
def test_one_big_mutation_corrupted_on_startup(self):
"""
Test commit log replay with a single big (larger than mutation limit) commitlog mutation
While the node is shut down, corrupt the commitlog
After restarting the node, check for relevant logs and whether the data exists
"""
self.prepare_big_mutation()
node1 = self.node1
session: Session = self.patient_cql_connection(node1)
session.execute(self.insert_statement, self.values)
# check data
in_table = rows_to_list(session.execute(f"SELECT * FROM {self.ks}.{self.cf};"))
assert in_table == [self.values]
# stop node forcibly so no memtable flush happens
node1.stop(gently=False)
# corrupt the commitlogs
def corrupt_file(file_path: str):
print(f"Writing junk into file header: {file_path}")
with open(file_path, "r+b") as f:
f.seek(18)
f.write(os.urandom(474))
for commitlog in self._get_commitlog_files():
corrupt_file(commitlog)
# restart the node
node1.start(wait_for_binary_proto=True)
session: Session = self.patient_cql_connection(node1)
# check the data and the logs
in_table = rows_to_list(session.execute(f"SELECT * FROM {self.ks}.{self.cf};"))
assert not node1.grep_log(f"large_data - Writing large row {self.ks}/{self.cf}")
assert node1.grep_log("commitlog_replayer - Corrupted file")
assert in_table == []
    def test_one_big_mutation_rollback_on_startup(self):
        """
        Test commit log replay with a single big (larger than mutation limit) commitlog mutation
        The node is shut down while the transaction is in progress
        After restarting the node, check for relevant logs and whether the data exists
        """
        self.prepare_big_mutation()
        node1 = self.node1
        session: Session = self.patient_cql_connection(node1)
        # execute_async so it can be interrupted
        session.execute_async(self.insert_statement, self.values)
        # stop node forcibly so no memtable flush happens
        node1.stop(gently=False)
        # If the commitlog already holds more MB than the number of big ~1MB
        # columns, the insert reached disk before the kill — nothing to roll back.
        if self._get_commitlog_size()[0] > self.big_columns:
            pytest.skip("Skipping rollback scenario as data was written to commitlog before stopping node")
        # restart the node
        node1.start(wait_for_binary_proto=True)
        session: Session = self.patient_cql_connection(node1)
        # check the data and the logs: the interrupted write must leave no trace
        in_table = rows_to_list(session.execute(f"SELECT * FROM {self.ks}.{self.cf};"))
        assert not node1.grep_log(f"large_data - Writing large row {self.ks}/{self.cf}")
        assert in_table == []
def test_pinned_cl_segment_doesnt_resurrect_data(self):
"""
The tested scenario is as follows:
* Two tables, ks1.tbl1 and ks2.tbl2.
* A commitlog segment contains writes for both tables.
* ks1.tbl1 has very low write traffix, memtables are very rarely
flushed, this results in ks1.tbl1 "pinning" any commitlog segment
which has writes, that are still in the memtable.
* ks2.tbl2 deletes the data, some of which is still in the segment
pinned by ks1.tbl1.
* ks2.tbl2 flushes the memtable, data gets to sstables, commitlog
segments containing writes for both tables are still pinned.
* ks2.tbl2 compacts and purges away both the data and the tombstone.
* The node has an unclean restart, the pinned segments are replayed and
data is resurrected, as the tombstone is already purged.
This test uses gc_grace_seconds=0 to make the test fast.
See https://github.com/scylladb/scylladb/issues/14870
"""
node1 = self.node1
node1.set_configuration_options(batch_commitlog=True, values={"commitlog_segment_size_in_mb": 1, "enable_cache": False})
node1.start()
session = self.patient_cql_connection(node1)
logger.debug("Create keyspace")
create_ks(session, "ks1", 1)
create_ks(session, "ks2", 1)
logger.debug("Create table")
session.execute("CREATE TABLE ks1.tbl1 (pk int, ck int, PRIMARY KEY (pk, ck))")
session.execute("CREATE TABLE ks2.tbl2 (pk int, ck int, v text, PRIMARY KEY (pk, ck)) WITH gc_grace_seconds = 0")
def get_segments_num():
metrics_res = get_node_metrics(node_ip=self.cluster.get_node_ip(1), metrics=["scylla_commitlog_segments"])
return int(metrics_res["scylla_commitlog_segments"])
def get_cl_segments():
cl_path = self._get_commitlog_path()
return {os.path.basename(s) for s in glob.glob(os.path.join(cl_path, "CommitLog-*"))}
segments_before_writes = get_segments_num()
segments_after_writes = segments_before_writes
logger.debug(f"Have {segments_after_writes} segments before writing data")
insert_id_tbl1 = session.prepare("INSERT INTO ks1.tbl1 (pk, ck) VALUES (?, ?)")
insert_id_tbl2 = session.prepare("INSERT INTO ks2.tbl2 (pk, ck, v) VALUES (?, ?, ?)")
pk1 = 0
pk2 = 1
ck = 0
value = "v" * 1024
logger.debug(f"Filling segment with mixed data from ks1.tbl1 and ks2.tbl2")
# Ensure at least one segment with writes from both tables
while segments_after_writes < segments_before_writes + 1:
session.execute(insert_id_tbl1, (pk1, ck))
session.execute(insert_id_tbl2, (pk1, ck, value))
ck = ck + 1
segments_after_writes = get_segments_num()
logger.debug(f"Filling segment(s) with ks2.tbl2 only")
while segments_after_writes < segments_before_writes + 3:
session.execute(insert_id_tbl2, (pk1, ck, value))
ck = ck + 1
segments_after_writes = get_segments_num()
session.execute(f"DELETE FROM ks2.tbl2 WHERE pk = {pk1}")
# We need to make sure the segment in which the above delete landed in
# is full, otherwise the memtable flush will not be able to destroy it.
logger.debug(f"Filling another segment with ks2.tbl2 (pk={pk2})")
while segments_after_writes < segments_before_writes + 4:
session.execute(insert_id_tbl2, (pk2, ck, value))
ck = ck + 1
segments_after_writes = get_segments_num()
segments_before = get_cl_segments()
logger.debug(f"Wrote {ck} rows, now have {segments_after_writes} segments ({segments_before}")
logger.debug("Flush ks2.tbl2")
node1.flush(ks="ks2", table="tbl2")
node1.compact(keyspace="ks2", tables=["tbl2"])
segments_after = get_cl_segments()
logger.debug(f"After flush+compact, now have {get_segments_num()} segments ({segments_after})")
assert len(list(session.execute(f"SELECT * FROM ks2.tbl2 WHERE pk = {pk1}"))) == 0
# Need to ensure at least one segment was freed.
# We assume the last segment, containing the tombstone, was among the freed ones.
removed_segments = segments_before - segments_after
assert len(removed_segments) > 0
logger.debug(f"The following segments were removed: {removed_segments}")
logger.debug("Kill + restart the node")
node1.stop(gently=False)
node1.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node1)
# CL replay should not have resurrected the data
assert len(list(session.execute(f"SELECT * FROM ks2.tbl2 WHERE pk = {pk1}"))) == 0