raft/test: add test for the group0 tombstone GC
Test that the group0 fast tombstone GC works correctly.
This commit is contained in:
@@ -13,7 +13,7 @@ import functools
|
||||
import operator
|
||||
import time
|
||||
import re
|
||||
from contextlib import suppress
|
||||
from contextlib import asynccontextmanager, contextmanager, suppress
|
||||
|
||||
from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvailable, Session, SimpleStatement # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
@@ -22,7 +22,6 @@ from test.pylib.internal_types import ServerInfo, HostID
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import get_host_api_address, read_barrier
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional, List
|
||||
|
||||
|
||||
@@ -472,22 +471,22 @@ async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_e
|
||||
await wait_for(new_coordinator_elected, deadline=deadline)
|
||||
|
||||
@asynccontextmanager
async def new_test_keyspace(cql, opts, host=None):
    """
    A utility function for creating a new temporary keyspace with given
    options. It can be used in a "async with", as:
        async with new_test_keyspace(cql, '...') as keyspace:

    If `host` is given, both the CREATE and the DROP statements are routed
    to that specific host (useful when schema changes must originate from
    one particular node).
    """
    keyspace = unique_name()
    await cql.run_async("CREATE KEYSPACE " + keyspace + " " + opts, host=host)
    try:
        yield keyspace
    finally:
        # Always drop the keyspace, even if the body raised.
        await cql.run_async("DROP KEYSPACE " + keyspace, host=host)
|
||||
|
||||
# Cache of table names released by new_test_table(); reusing names keeps the
# total number of uniquely-named tables low (Scylla becomes slower when a huge
# number of uniquely-named tables is created over a test run).
previously_used_table_names = []

@asynccontextmanager
async def new_test_table(cql, keyspace, schema, extra="", host=None, reuse_tables=True):
    """
    A utility function for creating a new temporary table with a given schema.
    Because Scylla becomes slower when a huge number of uniquely-named tables
    is created, dropped table names are cached and reused by default
    (reuse_tables=True); pass reuse_tables=False to force a fresh unique name
    for every table. It can be used in a "async with", as:
        async with create_table(cql, test_keyspace, '...') as table:

    If `host` is given, the CREATE/DROP statements are routed to that host.
    """
    global previously_used_table_names
    if reuse_tables:
        # Pop a previously-released name, minting a new one only when the
        # cache is empty.
        if not previously_used_table_names:
            previously_used_table_names.append(unique_name())
        table_name = previously_used_table_names.pop()
    else:
        table_name = unique_name()
    table = keyspace + "." + table_name
    await cql.run_async("CREATE TABLE " + table + "(" + schema + ")" + extra, host=host)
    try:
        yield table
    finally:
        await cql.run_async("DROP TABLE " + table, host=host)
        # Return the name to the cache only in reuse mode.
        if reuse_tables:
            previously_used_table_names.append(table_name)
|
||||
|
||||
@asynccontextmanager
|
||||
async def new_materialized_view(cql, table, select, pk, where, extra=""):
|
||||
@@ -532,3 +535,18 @@ async def get_raft_log_size(cql, host) -> int:
|
||||
async def get_raft_snap_id(cql, host) -> str:
    """Return the current group 0 snapshot id as stored in system.raft on `host`."""
    query = "select snapshot_id from system.raft limit 1"
    return (await cql.run_async(query, host=host))[0].snapshot_id
|
||||
|
||||
|
||||
@contextmanager
def disable_schema_agreement_wait(cql: "Session"):
    """
    A context manager that temporarily disables the schema agreement wait
    for the given cql session.

    Sets `cluster.max_schema_agreement_wait` to 0 for the duration of the
    `with` body and restores the previous value on exit, even if the body
    raises.
    """
    # Fail loudly if the driver object does not expose the knob we rely on.
    assert hasattr(cql.cluster, "max_schema_agreement_wait")
    old_value = cql.cluster.max_schema_agreement_wait
    cql.cluster.max_schema_agreement_wait = 0
    try:
        yield
    finally:
        cql.cluster.max_schema_agreement_wait = old_value
|
||||
|
||||
@@ -3,11 +3,17 @@
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
import asyncio
|
||||
from datetime import datetime, timezone
|
||||
from functools import partial
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.topology.util import new_test_keyspace, new_test_table
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
||||
from test.topology.util import disable_schema_agreement_wait, new_test_keyspace, new_test_table
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -44,3 +50,149 @@ async def test_default_tombstone_gc_does_not_override(manager: ManagerClient, rf
|
||||
async with new_test_table(cql, keyspace, "p int primary key, x int", " with tombstone_gc = {'mode': 'disabled'}") as table:
|
||||
await cql.run_async(f"ALTER TABLE {table} add y int")
|
||||
check_tombstone_gc_mode(cql, table, "disabled")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.xfail(reason="issue #15607")
async def test_group0_tombstone_gc(manager: ManagerClient):
    """
    Regression test for #15607.

    Test that the tombstones are being cleaned after the group 0 catching up.

    Test #1:
    Arrange:
        - start 3 nodes
    Act:
        - create new group0 tombstones by updating the schema (create/alter/delete random tables)
    Assert:
        - the tombstones are cleaned up eventually

    Test #2:
    Arrange:
        - stop one of the nodes
    Act:
        - create new group0 tombstones by updating the schema (create/alter/delete random tables)
    Assert:
        - the tombstones are not cleaned up (one of the nodes is down, so they can't catch up)

    Test #3:
    Arrange:
        - start the node again
    Act:
        - tombstones exist from the previous test
    Assert:
        - the tombstones are cleaned up eventually
    """
    cmdline = [
        # disabling caches as the tombstones still remain in the cache even after the compaction
        # (the alternative would be to drop the caches after the compaction or to filter the mutation fragments listing)
        '--enable-cache', '0',
    ]
    servers = [await manager.server_add(cmdline=cmdline) for _ in range(3)]

    cql = manager.get_cql()
    hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
             for s in servers]

    # All schema changes are routed to a single node so the other replicas
    # have to catch up via group 0.
    host_primary = hosts[0]

    # create/alter/drop a few tables
    async def alter_system_schema(keyspace=None, table_count=3):
        if not keyspace:
            # No keyspace given: create a temporary one and recurse into it.
            async with new_test_keyspace(cql, "with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 2}", host=host_primary) as keyspace:
                # FIX: the recursive call must be awaited — without `await`
                # the coroutine is never executed and no tables are created.
                await alter_system_schema(keyspace, table_count)
            return

        for _ in range(table_count):
            async with new_test_table(cql, keyspace, "p int primary key, x int", host=host_primary, reuse_tables=False) as table:
                await cql.run_async(f"ALTER TABLE {table} add y int")

    def get_tombstone(row):
        # Row metadata is a JSON blob; absence of metadata means no tombstone.
        if row.metadata is None:
            return None
        metadata = json.loads(row.metadata)
        return metadata.get("tombstone")

    async def list_tombstones(tombstone_mark, host):
        # Collect, per schema table, the tombstones older than `tombstone_mark`.
        tombstones = {}
        for tbl in ("tables", "columns"):
            res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS(system_schema.{tbl})", host=host))
            tombstones[tbl] = []
            for row in res:
                tombstone = get_tombstone(row)
                # Tombstone timestamps are microseconds since the epoch.
                if tombstone and datetime.fromtimestamp(float(tombstone["timestamp"])/1_000_000, timezone.utc) < tombstone_mark:
                    tombstones[tbl].append(tombstone)
        return tombstones

    async def tombstone_gc_completed(tombstone_mark):
        # flush and compact the keyspace so that expired tombstones are purged
        await asyncio.gather(*[manager.api.keyspace_flush(srv.ip_addr, "system_schema")
                               for srv in servers])
        await asyncio.gather(*[manager.api.keyspace_compaction(srv.ip_addr, "system_schema")
                               for srv in servers])

        # check the remaining tombstones
        tombstones_count_total = 0
        tombstones_per_host = await asyncio.gather(*[list_tombstones(tombstone_mark, host)
                                                     for host in hosts])
        for tombstones in tombstones_per_host:
            for tbl in tombstones.keys():
                tombstones_remaining = tombstones[tbl]
                tombstones_count = len(tombstones_remaining)
                tombstones_count_total += tombstones_count
                logger.info(f"{tbl} tombstones remaining: {tombstones_count}")
        # wait_for() treats None as "not yet" and retries.
        if tombstones_count_total != 0:
            return None
        return True

    # should usually run much faster than 30s, but left some margin to avoid flakiness
    async def verify_tombstone_gc(tombstone_mark, timeout=30):
        # wait for 2 sec to let the current tombstones fully expire
        await asyncio.sleep(2)

        deadline = time.time() + timeout

        # perform a single change to generate a new state_id and set the previous tombstones to expire
        # (this is needed because we deduct 1s of the tombstone expiration time to account for the changes coming
        # in the same second)
        await alter_system_schema(table_count=1)

        await wait_for(partial(tombstone_gc_completed, tombstone_mark), deadline)

    with disable_schema_agreement_wait(cql):
        async with new_test_keyspace(cql, "with replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 2}", host=host_primary) as keyspace:
            await alter_system_schema(keyspace)
            tombstone_mark = datetime.now(timezone.utc)

            # test #1: the tombstones are cleaned up eventually
            await verify_tombstone_gc(tombstone_mark)

            # shut down one server
            down_server = servers.pop()
            down_host = hosts.pop()
            await manager.server_stop_gracefully(down_server.server_id)
            await asyncio.gather(*[manager.server_not_sees_other_server(srv.ip_addr, down_server.ip_addr)
                                   for srv in servers])

            await alter_system_schema(keyspace)
            tombstone_mark = datetime.now(timezone.utc)

            # test #2: the tombstones are not cleaned up when one node is down
            with pytest.raises(AssertionError, match="Deadline exceeded"):
                # waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
                await verify_tombstone_gc(tombstone_mark, timeout=5)

            # start the server again
            await manager.server_start(down_server.server_id)
            await asyncio.gather(*[manager.server_sees_other_server(srv.ip_addr, down_server.ip_addr)
                                   for srv in servers])

            servers.append(down_server)
            hosts.append(down_host)

            # make sure the hosts are available (avoid test flakiness because of ConnectionError)
            await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

            # test #3: the tombstones are cleaned up after the node is started again
            await verify_tombstone_gc(tombstone_mark)
|
||||
|
||||
Reference in New Issue
Block a user