diff --git a/replica/database.cc b/replica/database.cc
index 5945150864..88e0f79e9c 100644
--- a/replica/database.cc
+++ b/replica/database.cc
@@ -2671,7 +2671,8 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, cons
     // since we don't want to leave behind data on disk with RP lower than the one we set
     // in the truncation table.
     if (st.did_flush && rp != db::replay_position() && st.low_mark < rp) {
-        on_internal_error(dblog, "Data written after truncation time was incorrectly truncated. Truncate is known to not work well with concurrent writes. Retry!");
+        dblog.warn("Data in table {}.{} is written after truncation time and was incorrectly truncated. truncated_at: {} low_mark: {} rp: {}",
+                cf.schema()->ks_name(), cf.schema()->cf_name(), truncated_at, st.low_mark, rp);
     }
     if (rp == db::replay_position()) {
         // If this shard had no mutations, st.low_mark will be an empty, default constructed
diff --git a/test/cluster/test_truncate_concurrent_writes.py b/test/cluster/test_truncate_concurrent_writes.py
new file mode 100644
index 0000000000..4080283b73
--- /dev/null
+++ b/test/cluster/test_truncate_concurrent_writes.py
@@ -0,0 +1,132 @@
+#
+# Copyright (C) 2025-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
+#
+
+from test.pylib.manager_client import ManagerClient
+from test.cluster.util import create_new_test_keyspace
+from cassandra.query import SimpleStatement, ConsistencyLevel
+
+import pytest
+import asyncio
+import logging
+
+logger = logging.getLogger(__name__)
+
+@pytest.mark.asyncio
+async def test_validate_truncate_with_concurrent_writes(manager: ManagerClient):
+
+    # This test validates that all the data before a truncate started has been deleted,
+    # and that none of the data after truncate ended has been deleted
+
+    cmdline = []
+    config = {}
+
+    await manager.servers_add(3, config=config, cmdline=cmdline, property_file=[
+        {"dc": "dc1", "rack": "r1"},
+        {"dc": "dc1", "rack": "r2"},
+        {"dc": "dc1", "rack": "r3"},
+    ])
+
+    cql = manager.get_cql()
+    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}")
+    await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, val int, PRIMARY KEY(pk, ck));")
+
+    trunc_started_event = asyncio.Event()
+    trunc_completed_event = asyncio.Event()
+    writer_halfpoint = asyncio.Event()
+
+    cks_to_insert_before_trunc = 5000
+    cks_to_insert_after_trunc = 5000
+    writer_results = {}
+    async def writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event):
+        stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, ck, val) VALUES (?, ?, ?);")
+        # FIXME: inserting data with CL=ONE can result in writes reaching some replicas after truncate
+        # completed, even though the write was acknowledged by the coordinator before truncate started
+        # see: https://github.com/scylladb/scylladb/issues/25336
+        stmt.consistency_level = ConsistencyLevel.ALL
+        last_ck_before_trunc = None
+        first_ck_after_trunc = None
+        cks_inserted_after_trunc = 0
+        ck = 0
+        while cks_inserted_after_trunc < cks_to_insert_after_trunc:
+            trunc_completed = trunc_completed_event.is_set()
+            await cql.run_async(stmt, [pk, ck, ck])
+            trunc_started = trunc_started_event.is_set()
+
+            if not trunc_started:
+                last_ck_before_trunc = ck
+            if trunc_completed:
+                if first_ck_after_trunc is None:
+                    first_ck_after_trunc = ck
+                cks_inserted_after_trunc += 1
+
+            ck += 1
+
+            if ck == cks_to_insert_before_trunc:
+                writer_halfpoint.set()
+
+        writer_results[pk] = (last_ck_before_trunc, first_ck_after_trunc, ck)
+        logger.info(f'write of pk: {pk} ended; records written: {ck} {last_ck_before_trunc=} {first_ck_after_trunc=}')
+
+    async def do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event):
+        logger.info('truncate waiting')
+        await writer_halfpoint.wait()
+        logger.info('truncate starting')
+        trunc_started_event.set()
+        await cql.run_async(f"TRUNCATE TABLE {ks}.test;")
+        trunc_completed_event.set()
+        logger.info('truncate ended')
+
+    # start the writers and truncate
+    tasks = []
+    for pk in range(50):
+        tasks.append(asyncio.create_task(writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event)))
+
+    tasks.append(asyncio.create_task(do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event)))
+
+    await asyncio.gather(*tasks)
+
+    # assert truncate deleted everything before truncate started
+    # and did not delete anything after truncate ended.
+    #
+    # don't fail the test on the first wrong result,
+    # instead log all data and errors before failing the test
+    no_failures = True
+    for pk, cks in writer_results.items():
+        last_ck_before_trunc = cks[0]
+        first_ck_after_trunc = cks[1]
+        num_cks_written = cks[2]
+
+        stmt = SimpleStatement(f'SELECT ck FROM {ks}.test WHERE pk = {pk} LIMIT 1;')
+        stmt.consistency_level = ConsistencyLevel.ALL
+        res = await cql.run_async(stmt)
+        min_ck = res[0].ck
+
+        stmt = SimpleStatement(f'SELECT count(*) as cnt FROM {ks}.test WHERE pk = {pk} AND ck >= {first_ck_after_trunc};')
+        stmt.consistency_level = ConsistencyLevel.ALL
+        res = await cql.run_async(stmt)
+        count_after_trunc = res[0].cnt
+
+        logger.info(f'{pk=} {min_ck=} {num_cks_written=} {count_after_trunc=} {last_ck_before_trunc=} {first_ck_after_trunc=}')
+
+        if min_ck <= last_ck_before_trunc:
+            logger.error(f'ck: {min_ck} for pk: {pk} should have been deleted but was not; {last_ck_before_trunc=}')
+            no_failures = False
+
+        if min_ck > first_ck_after_trunc:
+            logger.error(f'ck: {first_ck_after_trunc} for pk: {pk} should not have been deleted but was; minimal ck remaining: {min_ck}')
+            no_failures = False
+
+        if first_ck_after_trunc + count_after_trunc < num_cks_written:
+            missing_records = num_cks_written - first_ck_after_trunc - count_after_trunc
+            logger.error(f'{missing_records} records that were written after truncate ended were deleted; {pk=}')
+            no_failures = False
+
+        if first_ck_after_trunc + count_after_trunc > num_cks_written:
+            extra_records = first_ck_after_trunc + count_after_trunc - num_cks_written
+            logger.error(f'{extra_records} too many records found after {first_ck_after_trunc=}: expected only {num_cks_written - first_ck_after_trunc}; {pk=}')
+            no_failures = False
+
+    assert no_failures, 'Errors were found, please check error entries in the test log for more details'