diff --git a/test/topology/util.py b/test/topology/util.py index 5cdac13ed8..61f173d5fb 100644 --- a/test/topology/util.py +++ b/test/topology/util.py @@ -13,7 +13,7 @@ import functools import operator import time import re -from contextlib import suppress +from contextlib import asynccontextmanager, contextmanager, suppress from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvailable, Session, SimpleStatement # type: ignore # pylint: disable=no-name-in-module from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module @@ -22,7 +22,6 @@ from test.pylib.internal_types import ServerInfo, HostID from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import get_host_api_address, read_barrier from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name -from contextlib import asynccontextmanager from typing import Optional, List @@ -472,22 +471,22 @@ async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_e await wait_for(new_coordinator_elected, deadline=deadline) @asynccontextmanager -async def new_test_keyspace(cql, opts): +async def new_test_keyspace(cql, opts, host=None): """ A utility function for creating a new temporary keyspace with given options. It can be used in a "async with", as: async with new_test_keyspace(cql, '...') as keyspace: """ keyspace = unique_name() - await cql.run_async("CREATE KEYSPACE " + keyspace + " " + opts) + await cql.run_async("CREATE KEYSPACE " + keyspace + " " + opts, host=host) try: yield keyspace finally: - await cql.run_async("DROP KEYSPACE " + keyspace) + await cql.run_async("DROP KEYSPACE " + keyspace, host=host) previously_used_table_names = [] @asynccontextmanager -async def new_test_table(cql, keyspace, schema, extra=""): +async def new_test_table(cql, keyspace, schema, extra="", host=None, reuse_tables=True): """ A utility function for creating a new temporary table with a given schema. 
Because Scylla becomes slower when a huge number of uniquely-named tables @@ -498,16 +497,20 @@ async def new_test_table(cql, keyspace, schema, extra=""): async with create_table(cql, test_keyspace, '...') as table: """ global previously_used_table_names - if not previously_used_table_names: - previously_used_table_names.append(unique_name()) - table_name = previously_used_table_names.pop() + if reuse_tables: + if not previously_used_table_names: + previously_used_table_names.append(unique_name()) + table_name = previously_used_table_names.pop() + else: + table_name = unique_name() table = keyspace + "." + table_name - await cql.run_async("CREATE TABLE " + table + "(" + schema + ")" + extra) + await cql.run_async("CREATE TABLE " + table + "(" + schema + ")" + extra, host=host) try: yield table finally: - await cql.run_async("DROP TABLE " + table) - previously_used_table_names.append(table_name) + await cql.run_async("DROP TABLE " + table, host=host) + if reuse_tables: + previously_used_table_names.append(table_name) @asynccontextmanager async def new_materialized_view(cql, table, select, pk, where, extra=""): @@ -532,3 +535,18 @@ async def get_raft_log_size(cql, host) -> int: async def get_raft_snap_id(cql, host) -> str: query = "select snapshot_id from system.raft limit 1" return (await cql.run_async(query, host=host))[0].snapshot_id + + +@contextmanager +def disable_schema_agreement_wait(cql: Session): + """ + A context manager that temporarily disables the schema agreement wait + for the given cql session. 
+ """ + assert hasattr(cql.cluster, "max_schema_agreement_wait") + old_value = cql.cluster.max_schema_agreement_wait + cql.cluster.max_schema_agreement_wait = 0 + try: + yield + finally: + cql.cluster.max_schema_agreement_wait = old_value diff --git a/test/topology_experimental_raft/test_tombstone_gc.py b/test/topology_experimental_raft/test_tombstone_gc.py index 36e4b24c28..3a594f7b02 100644 --- a/test/topology_experimental_raft/test_tombstone_gc.py +++ b/test/topology_experimental_raft/test_tombstone_gc.py @@ -3,11 +3,17 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later # +import asyncio +from datetime import datetime, timezone +from functools import partial +import json import logging +import time import pytest from test.pylib.manager_client import ManagerClient -from test.topology.util import new_test_keyspace, new_test_table +from test.pylib.util import wait_for, wait_for_cql_and_get_hosts +from test.topology.util import disable_schema_agreement_wait, new_test_keyspace, new_test_table logger = logging.getLogger(__name__) @@ -44,3 +50,149 @@ async def test_default_tombstone_gc_does_not_override(manager: ManagerClient, rf async with new_test_table(cql, keyspace, "p int primary key, x int", " with tombstone_gc = {'mode': 'disabled'}") as table: await cql.run_async(f"ALTER TABLE {table} add y int") check_tombstone_gc_mode(cql, table, "disabled") + + +@pytest.mark.asyncio +@pytest.mark.xfail(reason="issue #15607") +async def test_group0_tombstone_gc(manager: ManagerClient): + """ + Regression test for #15607. + + Test that the tombstones are being cleaned after the group 0 catching up. 
+ + Test #1: + Arrange: + - start 3 nodes + Act: + - create new group0 tombstones by updating the schema (create/alter/delete random tables) + Assert: + - the tombstones are cleaned up eventually + + Test #2: + Arrange: + - stop one of the nodes + Act: + - create new group0 tombstones by updating the schema (create/alter/delete random tables) + Assert: + - the tombstones are not cleaned up (one of the nodes is down, so they can't catch up) + + Test #3: + Arrange: + - start the node again + Act: + - tombstones exist from the previous test + Assert: + - the tombstones are cleaned up eventually + """ + cmdline = [ + # disabling caches as the tombstones still remain in the cache even after the compaction + # (the alternative would be to drop the caches after the compaction or to filter the mutation fragments listing) + '--enable-cache', '0', + ] + servers = [await manager.server_add(cmdline=cmdline) for _ in range(3)] + + cql = manager.get_cql() + hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0] + for s in servers] + + host_primary = hosts[0] + + # create/alter/drop a few tables + async def alter_system_schema(keyspace=None, table_count=3): + if not keyspace: + async with new_test_keyspace(cql, "with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 2}", host=host_primary) as keyspace: + await alter_system_schema(keyspace, table_count) + return + + for _ in range(table_count): + async with new_test_table(cql, keyspace, "p int primary key, x int", host=host_primary, reuse_tables=False) as table: + await cql.run_async(f"ALTER TABLE {table} add y int") + + def get_tombstone(row): + if row.metadata is None: + return None + metadata = json.loads(row.metadata) + return metadata.get("tombstone") + + async def list_tombstones(tombstone_mark, host): + tombstones = {} + for tbl in ("tables", "columns"): + res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS(system_schema.{tbl})", host=host)) + tombstones[tbl] = [] + for row in
res: + tombstone = get_tombstone(row) + if tombstone and datetime.fromtimestamp(float(tombstone["timestamp"])/1_000_000, timezone.utc) < tombstone_mark: + tombstones[tbl].append(tombstone) + return tombstones + + async def tombstone_gc_completed(tombstone_mark): + # flush and compact the keyspace + await asyncio.gather(*[manager.api.keyspace_flush(srv.ip_addr, "system_schema") + for srv in servers]) + await asyncio.gather(*[manager.api.keyspace_compaction(srv.ip_addr, "system_schema") + for srv in servers]) + + # check the remaining tombstones + tombstones_count_total = 0 + tombstones_per_host = await asyncio.gather(*[list_tombstones(tombstone_mark, host) + for host in hosts]) + for tombstones in tombstones_per_host: + for tbl in tombstones.keys(): + tombstones_remaining = tombstones[tbl] + tombstones_count = len(tombstones_remaining) + tombstones_count_total += tombstones_count + logger.info(f"{tbl} tombstones remaining: {tombstones_count}") + if tombstones_count_total != 0: + return None + return True + + # should usually run much faster than 30s, but left some margin to avoid flakiness + async def verify_tombstone_gc(tombstone_mark, timeout=30): + # wait for 2 sec to let the current tombstones fully expire + await asyncio.sleep(2) + + deadline = time.time() + timeout + + # perform a single change to generate a new state_id and set the previous tombstones to expire + # (this is needed because we deduct 1s of the tombstone expiration time to account for the changes coming + # in the same second) + await alter_system_schema(table_count=1) + + await wait_for(partial(tombstone_gc_completed, tombstone_mark), deadline) + + with disable_schema_agreement_wait(cql): + async with new_test_keyspace(cql, "with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 2}", host=host_primary) as keyspace: + await alter_system_schema(keyspace) + tombstone_mark = datetime.now(timezone.utc) + + # test #1: the tombstones are cleaned up eventually + await
verify_tombstone_gc(tombstone_mark) + + # shut down one server + down_server = servers.pop() + down_host = hosts.pop() + await manager.server_stop_gracefully(down_server.server_id) + await asyncio.gather(*[manager.server_not_sees_other_server(srv.ip_addr, down_server.ip_addr) + for srv in servers]) + + await alter_system_schema(keyspace) + tombstone_mark = datetime.now(timezone.utc) + + # test #2: the tombstones are not cleaned up when one node is down + with pytest.raises(AssertionError, match="Deadline exceeded"): + # waiting for shorter time (5s normally enough for a successful case, we expect the timeout here) + await verify_tombstone_gc(tombstone_mark, timeout=5) + + # start the server again + await manager.server_start(down_server.server_id) + await asyncio.gather(*[manager.server_sees_other_server(srv.ip_addr, down_server.ip_addr) + for srv in servers]) + + servers.append(down_server) + hosts.append(down_host) + + # make sure the hosts are available (avoid test flakiness because of ConnectionError) + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + # test #3: the tombstones are cleaned up after the node is started again + await verify_tombstone_gc(tombstone_mark)