From b50272d663f4643972ca925441684dab5803b512 Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Wed, 23 Jul 2025 21:47:41 +0200 Subject: [PATCH 1/2] truncate: change check for write during truncate into a log warning TRUNCATE TABLE performs a memtable flush and then discards the sstables of the table being truncated. It collects the highest replay position for both of these. When the highest replay position of the discarded sstables is higher than the highest replay position of the flushed memtable, that means that we have had writes during truncate which have been flushed to disk independently of the truncate process. We check for this and trigger an on_internal_error() which throws an exception, informing the user that writing data concurrently with TRUNCATE TABLE is not advised. The problem with this is that truncate is also called from DROP KEYSPACE and DROP TABLE. These are raft operations and exceptions thrown by them are caught by the (...) exception handler in the raft applier fiber, which then exits leaving the node without the ability to execute subsequent raft commands. This commit changes the on_internal_error() into a warning log entry. It also outputs to keyspace/table names, the truncated_at timepoint, the offending replay positions which caused the check to fail. Fixes: #25173 Fixes: #25013 (cherry picked from commit 268ec72dc9c6427fb3cf962bc602fd0fe4a870ea) --- replica/database.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/replica/database.cc b/replica/database.cc index 5945150864..88e0f79e9c 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2671,7 +2671,8 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, cons // since we don't want to leave behind data on disk with RP lower than the one we set // in the truncation table. if (st.did_flush && rp != db::replay_position() && st.low_mark < rp) { - on_internal_error(dblog, "Data written after truncation time was incorrectly truncated. Truncate is known to not work well with concurrent writes. Retry!"); + dblog.warn("Data in table {}.{} is written after truncation time and was incorrectly truncated. truncated_at: {} low_mark: {} rp: {}", + cf.schema()->ks_name(), cf.schema()->cf_name(), truncated_at, st.low_mark, rp); } if (rp == db::replay_position()) { // If this shard had no mutations, st.low_mark will be an empty, default constructed From 932223414b1f1206495fd552a5fccf4d9b3eb7aa Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Tue, 29 Jul 2025 17:14:13 +0200 Subject: [PATCH 2/2] truncate: add test for truncate with concurrent writes test_validate_truncate_with_concurrent_writes checks if truncate deletes all the data written before the truncate starts, and does not delete any data after truncate completes. (cherry picked from commit 33488ba943b696d7a1e148e195027f98d0b34abe) --- .../test_truncate_concurrent_writes.py | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 test/cluster/test_truncate_concurrent_writes.py diff --git a/test/cluster/test_truncate_concurrent_writes.py b/test/cluster/test_truncate_concurrent_writes.py new file mode 100644 index 0000000000..4080283b73 --- /dev/null +++ b/test/cluster/test_truncate_concurrent_writes.py @@ -0,0 +1,139 @@ +# +# Copyright (C) 2025-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 +# + +from test.pylib.manager_client import ManagerClient +from test.pylib.rest_client import inject_error_one_shot, read_barrier +from test.cluster.conftest import skip_mode +from test.cluster.util import create_new_test_keyspace +from cassandra.query import SimpleStatement, ConsistencyLevel + +import pytest +import asyncio +import logging +import time +import random +import glob +import os + +logger = logging.getLogger(__name__) + +@pytest.mark.asyncio +async def test_validate_truncate_with_concurrent_writes(manager: ManagerClient): + + # This test validates that all the data before a truncate started has been deleted, + # and that none of the data after truncate ended has been deleted + + cmdline = [] + config = {} + + servers = await manager.servers_add(3, config=config, cmdline=cmdline, property_file=[ + {"dc": "dc1", "rack": "r1"}, + {"dc": "dc1", "rack": "r2"}, + {"dc": "dc1", "rack": "r3"}, + ]) + + cql = manager.get_cql() + ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}") + await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, val int, PRIMARY KEY(pk, ck));") + + trunc_started_event = asyncio.Event() + trunc_completed_event = asyncio.Event() + writer_halfpoint = asyncio.Event() + + cks_to_insert_before_trunc = 5000 + cks_to_insert_after_trunc = 5000 + writer_results = {} + async def writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event): + stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, ck, val) VALUES (?, ?, ?);") + # FIXME: inserting data with CL=ONE can result in writes reaching some replicas after truncate + # completed, even though the write was acknowledged by the coordinator before truncate started + # see: https://github.com/scylladb/scylladb/issues/25336 + stmt.consistency_level = ConsistencyLevel.ALL + last_ck_before_trunc = None + first_ck_after_trunc = None + cks_inserted_after_trunc = 0 + ck = 0 + while cks_inserted_after_trunc < cks_to_insert_after_trunc: + trunc_completed = trunc_completed_event.is_set() + await cql.run_async(stmt, [pk, ck, ck]) + trunc_started = trunc_started_event.is_set() + + if not trunc_started: + last_ck_before_trunc = ck + if trunc_completed: + if first_ck_after_trunc is None: + first_ck_after_trunc = ck + cks_inserted_after_trunc += 1 + + ck += 1 + + if ck == cks_to_insert_before_trunc: + writer_halfpoint.set() + + writer_results[pk] = (last_ck_before_trunc, first_ck_after_trunc, ck) + logger.info(f'write of pk: {pk} ended; records written: {ck} {last_ck_before_trunc=} {first_ck_after_trunc=}') + + async def do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event): + logger.info(f'truncate waiting') + await writer_halfpoint.wait() + logger.info(f'truncate starting') + trunc_started_event.set() + await cql.run_async(f"TRUNCATE TABLE {ks}.test;") + trunc_completed_event.set() + logger.info(f'truncate ended') + + # start the writers and truncate + tasks = [] + for pk in range(50): + tasks.append(asyncio.create_task(writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event))) + + tasks.append(asyncio.create_task(do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event))) + + await asyncio.gather(*tasks) + + # assert truncate deleted everything before truncate started + # and did not delete anything after truncate ended. + # + # don't fail the test on the first wrong result, + # instead log all data and errors before failing the test + remaining_ck_per_pk = {} + no_failures = True + for pk, cks in writer_results.items(): + last_ck_before_trunc = cks[0] + first_ck_after_trunc = cks[1] + num_cks_written = cks[2] + + stmt = SimpleStatement(f'SELECT ck FROM {ks}.test WHERE pk = {pk} LIMIT 1;') + stmt.consistency_level = ConsistencyLevel.ALL + res = await cql.run_async(stmt) + min_ck = res[0].ck + + stmt = SimpleStatement(f'SELECT count(*) as cnt FROM {ks}.test WHERE pk = {pk} AND ck >= {first_ck_after_trunc};') + stmt.consistency_level = ConsistencyLevel.ALL + res = await cql.run_async(stmt) + count_after_trunc = res[0].cnt + + logger.info(f'{pk=} {min_ck=} {num_cks_written=} {count_after_trunc=} {last_ck_before_trunc=} {first_ck_after_trunc=}') + + if min_ck <= last_ck_before_trunc: + logger.error(f'ck: {min_ck} for pk: {pk} should have been deleted but was not; {last_ck_before_trunc=}') + no_failures = False + + if min_ck > first_ck_after_trunc: + logger.error(f'ck: {first_ck_after_trunc} for pk: {pk} should not have been deleted but was; minimal ck remaining: {min_ck}') + no_failures = False + + if first_ck_after_trunc + count_after_trunc < num_cks_written: + missing_records = num_cks_written - first_ck_after_trunc - count_after_trunc + logger.error(f'{missing_records} records that were written after truncate ended were deleted; {pk=}') + no_failures = False + + if first_ck_after_trunc + count_after_trunc > num_cks_written: + extra_records = first_ck_after_trunc + count_after_trunc - num_cks_written + logger.error(f'{extra_records} too many records found after {first_ck_after_trunc=}: expected only {num_cks_written - first_ck_after_trunc}; {pk=}') + no_failures = False + + assert no_failures, 'Errors were found, please check error entries in the test log for more details' \ No newline at end of file