Files
scylladb/test/cluster/test_truncate_concurrent_writes.py
Ferenc Szili 33488ba943 truncate: add test for truncate with concurrent writes
test_validate_truncate_with_concurrent_writes checks if truncate deletes
all the data written before the truncate starts, and does not delete any
data after truncate completes.
2025-08-05 13:54:14 +02:00

139 lines
5.7 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.cluster.conftest import skip_mode
from test.cluster.util import create_new_test_keyspace
from cassandra.query import SimpleStatement, ConsistencyLevel
import pytest
import asyncio
import logging
import time
import random
import glob
import os
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_validate_truncate_with_concurrent_writes(manager: ManagerClient):
# This test validates that all the data before a truncate started has been deleted,
# and that none of the data after truncate ended has been deleted
cmdline = []
config = {}
servers = await manager.servers_add(3, config=config, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"},
])
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, val int, PRIMARY KEY(pk, ck));")
trunc_started_event = asyncio.Event()
trunc_completed_event = asyncio.Event()
writer_halfpoint = asyncio.Event()
cks_to_insert_before_trunc = 5000
cks_to_insert_after_trunc = 5000
writer_results = {}
async def writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event):
stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, ck, val) VALUES (?, ?, ?);")
# FIXME: inserting data with CL=ONE can result in writes reaching some replicas after truncate
# completed, even though the write was acknowledged by the coordinator before truncate started
# see: https://github.com/scylladb/scylladb/issues/25336
stmt.consistency_level = ConsistencyLevel.ALL
last_ck_before_trunc = None
first_ck_after_trunc = None
cks_inserted_after_trunc = 0
ck = 0
while cks_inserted_after_trunc < cks_to_insert_after_trunc:
trunc_completed = trunc_completed_event.is_set()
await cql.run_async(stmt, [pk, ck, ck])
trunc_started = trunc_started_event.is_set()
if not trunc_started:
last_ck_before_trunc = ck
if trunc_completed:
if first_ck_after_trunc is None:
first_ck_after_trunc = ck
cks_inserted_after_trunc += 1
ck += 1
if ck == cks_to_insert_before_trunc:
writer_halfpoint.set()
writer_results[pk] = (last_ck_before_trunc, first_ck_after_trunc, ck)
logger.info(f'write of pk: {pk} ended; records written: {ck} {last_ck_before_trunc=} {first_ck_after_trunc=}')
async def do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event):
logger.info(f'truncate waiting')
await writer_halfpoint.wait()
logger.info(f'truncate starting')
trunc_started_event.set()
await cql.run_async(f"TRUNCATE TABLE {ks}.test;")
trunc_completed_event.set()
logger.info(f'truncate ended')
# start the writers and truncate
tasks = []
for pk in range(50):
tasks.append(asyncio.create_task(writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event)))
tasks.append(asyncio.create_task(do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event)))
await asyncio.gather(*tasks)
# assert truncate deleted everything before truncate started
# and did not delete anything after truncate ended.
#
# don't fail the test on the first wrong result,
# instead log all data and errors before failing the test
remaining_ck_per_pk = {}
no_failures = True
for pk, cks in writer_results.items():
last_ck_before_trunc = cks[0]
first_ck_after_trunc = cks[1]
num_cks_written = cks[2]
stmt = SimpleStatement(f'SELECT ck FROM {ks}.test WHERE pk = {pk} LIMIT 1;')
stmt.consistency_level = ConsistencyLevel.ALL
res = await cql.run_async(stmt)
min_ck = res[0].ck
stmt = SimpleStatement(f'SELECT count(*) as cnt FROM {ks}.test WHERE pk = {pk} AND ck >= {first_ck_after_trunc};')
stmt.consistency_level = ConsistencyLevel.ALL
res = await cql.run_async(stmt)
count_after_trunc = res[0].cnt
logger.info(f'{pk=} {min_ck=} {num_cks_written=} {count_after_trunc=} {last_ck_before_trunc=} {first_ck_after_trunc=}')
if min_ck <= last_ck_before_trunc:
logger.error(f'ck: {min_ck} for pk: {pk} should have been deleted but was not; {last_ck_before_trunc=}')
no_failures = False
if min_ck > first_ck_after_trunc:
logger.error(f'ck: {first_ck_after_trunc} for pk: {pk} should not have been deleted but was; minimal ck remaining: {min_ck}')
no_failures = False
if first_ck_after_trunc + count_after_trunc < num_cks_written:
missing_records = num_cks_written - first_ck_after_trunc - count_after_trunc
logger.error(f'{missing_records} records that were written after truncate ended were deleted; {pk=}')
no_failures = False
if first_ck_after_trunc + count_after_trunc > num_cks_written:
extra_records = first_ck_after_trunc + count_after_trunc - num_cks_written
logger.error(f'{extra_records} too many records found after {first_ck_after_trunc=}: expected only {num_cks_written - first_ck_after_trunc}; {pk=}')
no_failures = False
assert no_failures, 'Errors were found, please check error entries in the test log for more details'