Files
scylladb/test/cluster/test_cdc_generation_publishing.py
Michael Litvak 3a1be33143 test_cdc_generation_publishing: fix to read monotonically
The test test_multiple_unpublished_cdc_generations reads the CDC
generation timestamps to verify they are published in the correct order.
To do so it issues reads in a loop with a short sleep period and checks
the differences between consecutive reads, assuming they are monotonic.

However, the assumption that the reads are monotonic is not valid,
because the reads are issued with consistency_level=ONE. Thus we may read
timestamps {A,B} from one node, then read timestamps {A} from another
node that hasn't applied the write of the new timestamp B yet. This
triggers the assert in the test and makes the test fail.

To ensure the reads are monotonic we change the test to use consistency
level ALL for the reads.

Fixes scylladb/scylladb#24262

Closes scylladb/scylladb#24272
2025-05-30 08:35:56 +02:00

124 lines
6.0 KiB
Python

#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.rest_client import inject_error
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.cluster.conftest import skip_mode
from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
import asyncio
import pytest
import logging
import time
from datetime import datetime
from typing import Optional
# Module-level logger, named after this module, used by the tests below to report progress.
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_cdc_generations_are_published(request, manager: ManagerClient):
    """Test that the CDC generation publisher eventually publishes committed CDC generations in the correct order.

    The test bootstraps three nodes one by one, removes the first node and then
    issues concurrent cdc_streams_check_and_repair requests. After each of these
    steps it waits until exactly one new timestamp appears in
    system_distributed.cdc_generation_timestamps and checks that this timestamp
    is the greatest one read so far.

    This test does not use any error injection, so it is not restricted to
    non-release modes.
    """
    query_gen_timestamps = SimpleStatement(
        "select time from system_distributed.cdc_generation_timestamps where key = 'timestamps'",
        consistency_level=ConsistencyLevel.ONE)

    # Both variables are rebound below; new_gen_appeared() reads their current values on every call.
    servers = list[ServerInfo]()
    gen_timestamps = set[datetime]()

    async def new_gen_appeared() -> Optional[set[datetime]]:
        """Return the currently published timestamps if they strictly extend gen_timestamps, otherwise None."""
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        new_gen_timestamps = {r.time for r in await cql.run_async(query_gen_timestamps)}

        # Each step of the test is expected to publish at most one new generation.
        assert len(gen_timestamps) + 1 >= len(new_gen_timestamps)

        # A result that is not a strict superset of what we already know (e.g. nothing new yet)
        # is not an error here: we return None so the caller can try again.
        if gen_timestamps < new_gen_timestamps:
            gen_timestamps_diff = new_gen_timestamps.difference(gen_timestamps)
            # Check that we didn't lose any CDC generation.
            assert len(gen_timestamps_diff) == 1
            # Check that the new timestamp is not lower than the previous ones.
            new_timestamp = next(iter(gen_timestamps_diff))
            assert new_timestamp == max(new_gen_timestamps)
            return new_gen_timestamps
        return None

    logger.info("Bootstrapping first node")
    servers = [await manager.server_add()]
    gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
    logger.info(f"Timestamps after bootstrapping first node: {gen_timestamps}")

    logger.info("Bootstrapping second node")
    servers += [await manager.server_add()]
    gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
    logger.info(f"Timestamps after bootstrapping second node: {gen_timestamps}")

    logger.info("Bootstrapping third node")
    servers += [await manager.server_add()]
    gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
    logger.info(f"Timestamps after bootstrapping third node: {gen_timestamps}")

    logger.info(f"Stopping node {servers[0]}")
    await manager.server_stop_gracefully(servers[0].server_id)
    logger.info(f"Removing node {servers[0]} using {servers[1]}")
    await manager.remove_node(servers[1].server_id, servers[0].server_id)
    # Keep only the two nodes that are still part of the cluster.
    servers = servers[1:]

    # Performing multiple cdc_streams_check_and_repair requests concurrently after removing a node should result
    # in creating exactly one CDC generation, because cdc_streams_check_and_repair doesn't create a new generation
    # if the current one is optimal.
    await asyncio.gather(*[manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[i % 2].ip_addr)
                           for i in range(10)])
    gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
    logger.info(f"Timestamps after check_and_repair: {gen_timestamps}")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_multiple_unpublished_cdc_generations(request, manager: ManagerClient):
    """Test that the CDC generation publisher works correctly when there is more than one unpublished CDC generation.

    The publisher is held back with the "cdc_generation_publisher_fiber" error
    injection while the remaining nodes bootstrap, so several generations pile up
    unpublished. The test then sends messages to the injection and polls
    system_distributed.cdc_generation_timestamps, checking that the set of
    published timestamps only ever grows and that every newly seen timestamp is
    not lower than the ones seen before.

    The test relies on error injection, hence it is skipped in release mode.
    """
    # Reads use consistency level ALL so that consecutive reads are monotonic; the checks in
    # new_gen_appeared() assume a later read never returns fewer timestamps than an earlier one.
    query_gen_timestamps = SimpleStatement(
        "select time from system_distributed.cdc_generation_timestamps where key = 'timestamps'",
        consistency_level=ConsistencyLevel.ALL)

    logger.info("Bootstrapping first node")
    servers = [await manager.server_add()]

    async with inject_error(manager.api, servers[0].ip_addr, "cdc_generation_publisher_fiber") as handler:
        # This injection delays publishing CDC generations committed after bootstrapping the following nodes.
        # After all 3 nodes bootstrap, there should be 3 or 4 unpublished CDC generations (4 if publishing the first
        # CDC generation hasn't started before injecting an error).
        logger.info("Bootstrapping other nodes")
        servers += await manager.servers_add(3)

        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # Rebound in the polling loop below; new_gen_appeared() reads its current value on every call.
        gen_timestamps = set[datetime]()

        async def new_gen_appeared() -> Optional[set[datetime]]:
            """Return the currently published timestamps if they strictly extend gen_timestamps, otherwise None."""
            new_gen_timestamps = {r.time for r in await cql.run_async(query_gen_timestamps)}
            # Check that we didn't lose any CDC generations.
            assert gen_timestamps <= new_gen_timestamps
            if gen_timestamps < new_gen_timestamps:
                # Check that all new timestamps are not lower than the previous ones.
                if gen_timestamps:
                    newest_known = max(gen_timestamps)
                    for new_timestamp in new_gen_timestamps.difference(gen_timestamps):
                        assert new_timestamp >= newest_known
                return new_gen_timestamps
            return None

        # Check that all 4 CDC generations are eventually published in the correct order.
        for _ in range(4):
            await handler.message()
        while len(gen_timestamps) < 4:
            # We prefer to detect CDC generation publications one-by-one, because it increases our chances of catching
            # potential bugs like incorrect order of publications. Therefore, we use very short period - 0.01 s.
            gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60, 0.01)
        assert len(gen_timestamps) == 4