test_validate_truncate_with_concurrent_writes checks that TRUNCATE deletes all data written before the truncate starts, and does not delete any data written after the truncate completes.
139 lines
5.7 KiB
Python
139 lines
5.7 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import inject_error_one_shot, read_barrier
|
|
from test.cluster.conftest import skip_mode
|
|
from test.cluster.util import create_new_test_keyspace
|
|
from cassandra.query import SimpleStatement, ConsistencyLevel
|
|
|
|
import pytest
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import random
|
|
import glob
|
|
import os
|
|
|
|
# Module-level logger; the test in this module reports progress and verification errors through it.
logger: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
@pytest.mark.asyncio
async def test_validate_truncate_with_concurrent_writes(manager: ManagerClient):
    """Validate TRUNCATE TABLE against a stream of concurrent writes.

    Several writer tasks each insert rows with increasing clustering keys
    (ck = 0, 1, 2, ...) into their own partition. A single TRUNCATE TABLE is
    issued once some writer has inserted ``cks_to_insert_before_trunc`` rows.
    After all tasks finish, every partition is checked:

    * every row whose write was acknowledged before the truncate started must
      have been deleted, and
    * every row whose write was issued after the truncate completed must still
      be present.

    Rows whose writes overlapped the truncate may or may not survive; they are
    not checked. All partitions are checked and every problem is logged before
    the test fails, so a single run reports the full picture.
    """
    cmdline = []
    config = {}

    await manager.servers_add(3, config=config, cmdline=cmdline, property_file=[
        {"dc": "dc1", "rack": "r1"},
        {"dc": "dc1", "rack": "r2"},
        {"dc": "dc1", "rack": "r3"},
    ])

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, val int, PRIMARY KEY(pk, ck));")

    trunc_started_event = asyncio.Event()
    trunc_completed_event = asyncio.Event()
    # Set by the first writer that has inserted cks_to_insert_before_trunc rows; the truncate waits for it.
    writer_halfpoint = asyncio.Event()

    num_partitions = 50
    cks_to_insert_before_trunc = 5000
    cks_to_insert_after_trunc = 5000
    # pk -> (last ck acknowledged before truncate started (or None),
    #        first ck issued after truncate completed,
    #        total number of cks written)
    writer_results = {}

    # Prepared once and shared by all writers instead of being prepared
    # (synchronously) inside every writer task.
    insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, ck, val) VALUES (?, ?, ?);")
    # FIXME: inserting data with CL=ONE can result in writes reaching some replicas after truncate
    # completed, even though the write was acknowledged by the coordinator before truncate started
    # see: https://github.com/scylladb/scylladb/issues/25336
    insert_stmt.consistency_level = ConsistencyLevel.ALL

    async def writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event):
        """Insert ck = 0, 1, 2, ... into partition ``pk``.

        Stops once cks_to_insert_after_trunc rows have been issued after the
        truncate completed, then records its boundaries in writer_results.
        """
        last_ck_before_trunc = None
        first_ck_after_trunc = None
        cks_inserted_after_trunc = 0
        ck = 0

        while cks_inserted_after_trunc < cks_to_insert_after_trunc:
            # Sampled BEFORE issuing the write: if the truncate had already
            # completed, this write strictly follows it and must survive.
            trunc_completed = trunc_completed_event.is_set()
            await cql.run_async(insert_stmt, [pk, ck, ck])
            # Sampled AFTER the write was acknowledged: if the truncate had not
            # started yet, this write strictly precedes it and must be deleted.
            trunc_started = trunc_started_event.is_set()

            if not trunc_started:
                last_ck_before_trunc = ck

            if trunc_completed:
                if first_ck_after_trunc is None:
                    first_ck_after_trunc = ck
                cks_inserted_after_trunc += 1

            ck += 1

            if ck == cks_to_insert_before_trunc:
                writer_halfpoint.set()

        writer_results[pk] = (last_ck_before_trunc, first_ck_after_trunc, ck)
        logger.info(f'write of pk: {pk} ended; records written: {ck} {last_ck_before_trunc=} {first_ck_after_trunc=}')

    async def do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event):
        """Wait for the writers' half point, then TRUNCATE the table, flagging start and completion."""
        logger.info('truncate waiting')
        await writer_halfpoint.wait()
        logger.info('truncate starting')
        # Flag "started" before the statement is even sent, so any write
        # acknowledged before this point definitely precedes the truncate.
        trunc_started_event.set()
        await cql.run_async(f"TRUNCATE TABLE {ks}.test;")
        trunc_completed_event.set()
        logger.info('truncate ended')

    async def check_pk(pk, last_ck_before_trunc, first_ck_after_trunc, num_cks_written):
        """Check one partition after the truncate.

        Logs every problem found. Returns True if none were found.
        """
        ok = True

        # The first row in clustering order is the smallest surviving ck.
        stmt = SimpleStatement(f'SELECT ck FROM {ks}.test WHERE pk = {pk} LIMIT 1;')
        stmt.consistency_level = ConsistencyLevel.ALL
        res = await cql.run_async(stmt)
        # An empty partition must be reported, not crash with IndexError.
        min_ck = res[0].ck if res else None

        stmt = SimpleStatement(f'SELECT count(*) as cnt FROM {ks}.test WHERE pk = {pk} AND ck >= {first_ck_after_trunc};')
        stmt.consistency_level = ConsistencyLevel.ALL
        res = await cql.run_async(stmt)
        count_after_trunc = res[0].cnt

        logger.info(f'{pk=} {min_ck=} {num_cks_written=} {count_after_trunc=} {last_ck_before_trunc=} {first_ck_after_trunc=}')

        if min_ck is None:
            logger.error(f'ck: {first_ck_after_trunc} for pk: {pk} should not have been deleted but was; no rows remaining')
            ok = False
        else:
            # last_ck_before_trunc is None when no write to this partition was
            # acknowledged before the truncate started: nothing must be gone.
            if last_ck_before_trunc is not None and min_ck <= last_ck_before_trunc:
                logger.error(f'ck: {min_ck} for pk: {pk} should have been deleted but was not; {last_ck_before_trunc=}')
                ok = False

            if min_ck > first_ck_after_trunc:
                logger.error(f'ck: {first_ck_after_trunc} for pk: {pk} should not have been deleted but was; minimal ck remaining: {min_ck}')
                ok = False

        # Every ck in [first_ck_after_trunc, num_cks_written) was issued after
        # the truncate completed, so exactly that many rows must be present.
        expected_after_trunc = num_cks_written - first_ck_after_trunc
        if count_after_trunc < expected_after_trunc:
            missing_records = expected_after_trunc - count_after_trunc
            logger.error(f'{missing_records} records that were written after truncate ended were deleted; {pk=}')
            ok = False

        if count_after_trunc > expected_after_trunc:
            extra_records = count_after_trunc - expected_after_trunc
            logger.error(f'{extra_records} too many records found after {first_ck_after_trunc=}: expected only {expected_after_trunc}; {pk=}')
            ok = False

        return ok

    # Start the writers and the truncate.
    tasks = [
        asyncio.create_task(writer(pk, writer_halfpoint, trunc_started_event, trunc_completed_event))
        for pk in range(num_partitions)
    ]
    tasks.append(asyncio.create_task(do_truncate(writer_halfpoint, trunc_started_event, trunc_completed_event)))

    try:
        await asyncio.gather(*tasks)
    finally:
        # If any task failed (e.g. TRUNCATE raised), gather() re-raises but
        # leaves the others running. The writers would then loop forever,
        # because trunc_completed_event is never set, so cancel what is left.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    # Assert that the truncate deleted everything written before it started and
    # nothing written after it ended. Don't fail on the first wrong result;
    # log all data and errors for every partition first.
    no_failures = True
    for pk, (last_ck_before_trunc, first_ck_after_trunc, num_cks_written) in sorted(writer_results.items()):
        if not await check_pk(pk, last_ck_before_trunc, first_ck_after_trunc, num_cks_written):
            no_failures = False

    assert no_failures, 'Errors were found, please check error entries in the test log for more details'