raft/test: add test for the group0 tombstone GC

Test that the group0 fast tombstone GC works correctly.
This commit is contained in:
Emil Maskovsky
2024-07-26 18:08:26 +02:00
parent a840949ea0
commit fa45fdf5f7
2 changed files with 183 additions and 13 deletions

View File

@@ -13,7 +13,7 @@ import functools
import operator
import time
import re
from contextlib import suppress
from contextlib import asynccontextmanager, contextmanager, suppress
from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvailable, Session, SimpleStatement # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
@@ -22,7 +22,6 @@ from test.pylib.internal_types import ServerInfo, HostID
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import get_host_api_address, read_barrier
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
from contextlib import asynccontextmanager
from typing import Optional, List
@@ -472,22 +471,22 @@ async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_e
await wait_for(new_coordinator_elected, deadline=deadline)
@asynccontextmanager
async def new_test_keyspace(cql, opts, host=None):
    """
    A utility function for creating a new temporary keyspace with given
    options. It can be used in a "async with", as:

        async with new_test_keyspace(cql, '...') as keyspace:

    If host is given, both the CREATE and DROP statements are routed to
    that specific host (useful when the driver's schema-agreement wait is
    disabled and all schema statements must go through one coordinator).
    """
    keyspace = unique_name()
    await cql.run_async("CREATE KEYSPACE " + keyspace + " " + opts, host=host)
    try:
        yield keyspace
    finally:
        # Always drop the keyspace, even if the body raised.
        await cql.run_async("DROP KEYSPACE " + keyspace, host=host)
# Pool of table names recycled across tests (see new_test_table below).
previously_used_table_names = []


@asynccontextmanager
async def new_test_table(cql, keyspace, schema, extra="", host=None, reuse_tables=True):
    """
    A utility function for creating a new temporary table with a given schema.
    Because Scylla becomes slower when a huge number of uniquely-named tables
    are created and dropped, by default table names are recycled through the
    previously_used_table_names pool (reuse_tables=True). It can be used in
    an "async with", as:

        async with create_table(cql, test_keyspace, '...') as table:

    If host is given, the CREATE and DROP statements are routed to that
    specific host. Pass reuse_tables=False to always get a brand-new unique
    name — needed e.g. by tests inspecting schema tombstones, which must not
    resurrect the name of a previously dropped table.
    """
    global previously_used_table_names
    if reuse_tables:
        # Recycle a name from the pool, generating one if the pool is empty.
        if not previously_used_table_names:
            previously_used_table_names.append(unique_name())
        table_name = previously_used_table_names.pop()
    else:
        table_name = unique_name()
    table = keyspace + "." + table_name
    await cql.run_async("CREATE TABLE " + table + "(" + schema + ")" + extra, host=host)
    try:
        yield table
    finally:
        await cql.run_async("DROP TABLE " + table, host=host)
        if reuse_tables:
            # Return the name to the pool so a later table can reuse it.
            previously_used_table_names.append(table_name)
@asynccontextmanager
async def new_materialized_view(cql, table, select, pk, where, extra=""):
@@ -532,3 +535,18 @@ async def get_raft_log_size(cql, host) -> int:
async def get_raft_snap_id(cql, host) -> str:
    """Return the Raft snapshot id recorded in system.raft on the given host."""
    rows = await cql.run_async("select snapshot_id from system.raft limit 1", host=host)
    return rows[0].snapshot_id
@contextmanager
def disable_schema_agreement_wait(cql: Session):
    """
    A context manager that temporarily sets the driver's schema-agreement
    wait to zero for the given cql session, restoring the previous value
    on exit.
    """
    cluster = cql.cluster
    assert hasattr(cluster, "max_schema_agreement_wait")
    saved = cluster.max_schema_agreement_wait
    cluster.max_schema_agreement_wait = 0
    try:
        yield
    finally:
        cluster.max_schema_agreement_wait = saved

View File

@@ -3,11 +3,17 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
from datetime import datetime, timezone
from functools import partial
import json
import logging
import time
import pytest
from test.pylib.manager_client import ManagerClient
from test.topology.util import new_test_keyspace, new_test_table
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.topology.util import disable_schema_agreement_wait, new_test_keyspace, new_test_table
logger = logging.getLogger(__name__)
@@ -44,3 +50,149 @@ async def test_default_tombstone_gc_does_not_override(manager: ManagerClient, rf
async with new_test_table(cql, keyspace, "p int primary key, x int", " with tombstone_gc = {'mode': 'disabled'}") as table:
await cql.run_async(f"ALTER TABLE {table} add y int")
check_tombstone_gc_mode(cql, table, "disabled")
@pytest.mark.asyncio
@pytest.mark.xfail(reason="issue #15607")
async def test_group0_tombstone_gc(manager: ManagerClient):
    """
    Regression test for #15607.

    Test that the tombstones are being cleaned after the group 0 catching up.

    Test #1:
      Arrange:
        - start 3 nodes
      Act:
        - create new group0 tombstones by updating the schema (create/alter/delete random tables)
      Assert:
        - the tombstones are cleaned up eventually

    Test #2:
      Arrange:
        - stop one of the nodes
      Act:
        - create new group0 tombstones by updating the schema (create/alter/delete random tables)
      Assert:
        - the tombstones are not cleaned up (one of the nodes is down, so they can't catch up)

    Test #3:
      Arrange:
        - start the node again
      Act:
        - tombstones exist from the previous test
      Assert:
        - the tombstones are cleaned up eventually
    """
    cmdline = [
        # disabling caches as the tombstones still remain in the cache even after the compaction
        # (the alternative would be to drop the caches after the compaction or to filter the mutation fragments listing)
        '--enable-cache', '0',
    ]
    servers = [await manager.server_add(cmdline=cmdline) for _ in range(3)]
    cql = manager.get_cql()
    hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
             for s in servers]
    # All schema statements go through one coordinator, since the
    # schema-agreement wait is disabled below.
    host_primary = hosts[0]

    # create/alter/drop a few tables
    async def alter_system_schema(keyspace=None, table_count=3):
        if not keyspace:
            async with new_test_keyspace(cql, "with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 2}", host=host_primary) as keyspace:
                # BUG FIX: the recursive call is a coroutine and must be awaited;
                # without the await no tables were ever created on this path.
                await alter_system_schema(keyspace, table_count)
            return
        for _ in range(table_count):
            async with new_test_table(cql, keyspace, "p int primary key, x int", host=host_primary, reuse_tables=False) as table:
                await cql.run_async(f"ALTER TABLE {table} add y int")

    def get_tombstone(row):
        # Rows without metadata cannot carry a tombstone.
        if row.metadata is None:
            return None
        metadata = json.loads(row.metadata)
        return metadata.get("tombstone")

    async def list_tombstones(tombstone_mark, host):
        # Collect, per schema table, the tombstones older than tombstone_mark.
        tombstones = {}
        for tbl in ("tables", "columns"):
            # NOTE(review): cql.execute is the driver's blocking API; the rest of
            # the test uses run_async — confirm the synchronous call is intended.
            res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS(system_schema.{tbl})", host=host))
            tombstones[tbl] = []
            for row in res:
                tombstone = get_tombstone(row)
                # Tombstone timestamps are microseconds since the epoch.
                if tombstone and datetime.fromtimestamp(float(tombstone["timestamp"])/1_000_000, timezone.utc) < tombstone_mark:
                    tombstones[tbl].append(tombstone)
        return tombstones

    async def tombstone_gc_completed(tombstone_mark):
        # flush and compact the keyspace
        await asyncio.gather(*[manager.api.keyspace_flush(srv.ip_addr, "system_schema")
                               for srv in servers])
        await asyncio.gather(*[manager.api.keyspace_compaction(srv.ip_addr, "system_schema")
                               for srv in servers])
        # check the remaining tombstones
        tombstones_count_total = 0
        tombstones_per_host = await asyncio.gather(*[list_tombstones(tombstone_mark, host)
                                                     for host in hosts])
        for tombstones in tombstones_per_host:
            for tbl in tombstones.keys():
                tombstones_remaining = tombstones[tbl]
                tombstones_count = len(tombstones_remaining)
                tombstones_count_total += tombstones_count
                logger.info(f"{tbl} tombstones remaining: {tombstones_count}")
        if tombstones_count_total != 0:
            # None tells wait_for to keep polling.
            return None
        return True

    # should usually run much faster than 30s, but left some margin to avoid flakiness
    async def verify_tombstone_gc(tombstone_mark, timeout=30):
        # wait for 2 sec to let the current tombstones fully expire
        await asyncio.sleep(2)
        deadline = time.time() + timeout
        # perform a single change to generate a new state_id and set the previous tombstones to expire
        # (this is needed because 1s is subtracted from the tombstone expiration time to account for
        # the changes coming in the same second)
        await alter_system_schema(table_count=1)
        await wait_for(partial(tombstone_gc_completed, tombstone_mark), deadline)

    with disable_schema_agreement_wait(cql):
        async with new_test_keyspace(cql, "with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 2}", host=host_primary) as keyspace:
            await alter_system_schema(keyspace)
            tombstone_mark = datetime.now(timezone.utc)
            # test #1: the tombstones are cleaned up eventually
            await verify_tombstone_gc(tombstone_mark)

            # shut down one server
            down_server = servers.pop()
            down_host = hosts.pop()
            await manager.server_stop_gracefully(down_server.server_id)
            await asyncio.gather(*[manager.server_not_sees_other_server(srv.ip_addr, down_server.ip_addr)
                                   for srv in servers])

            await alter_system_schema(keyspace)
            tombstone_mark = datetime.now(timezone.utc)
            # test #2: the tombstones are not cleaned up when one node is down
            with pytest.raises(AssertionError, match="Deadline exceeded"):
                # waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
                await verify_tombstone_gc(tombstone_mark, timeout=5)

            # start the server again
            await manager.server_start(down_server.server_id)
            await asyncio.gather(*[manager.server_sees_other_server(srv.ip_addr, down_server.ip_addr)
                                   for srv in servers])
            servers.append(down_server)
            hosts.append(down_host)
            # make sure the hosts are available (avoid test flakiness because of ConnectionError)
            await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
            # test #3: the tombstones are cleaned up after the node is started again
            await verify_tombstone_gc(tombstone_mark)