From 1ddc76ffd1e1ea331de6dc5ea40099bc596329fe Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 14 Aug 2023 14:20:50 +0400 Subject: [PATCH] test_fencing: add test_fence_hints The test makes a write through the first node while the third node is down; this causes a hint for the third node to be stored on the first node. We increment the version and fence_version on the third node, restart it, and expect to see a hint delivery failure because of the version mismatch. Then we update the versions on the first node and expect the hint to be delivered successfully. --- .../test_fencing.py | 126 ++++++++++++++++-- 1 file changed, 118 insertions(+), 8 deletions(-) diff --git a/test/topology_experimental_raft/test_fencing.py b/test/topology_experimental_raft/test_fencing.py index 3b197baaa3..bd901eda5f 100644 --- a/test/topology_experimental_raft/test_fencing.py +++ b/test/topology_experimental_raft/test_fencing.py @@ -5,9 +5,13 @@ # from test.pylib.manager_client import ManagerClient from test.pylib.random_tables import RandomTables, Column, IntType, CounterType -from test.pylib.util import unique_name, wait_for_cql_and_get_hosts -from cassandra import WriteFailure - +from test.pylib.util import unique_name, wait_for_cql_and_get_hosts, wait_for +from cassandra import WriteFailure, ConsistencyLevel +from test.pylib.internal_types import ServerInfo +from test.pylib.rest_client import ScyllaMetrics +from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module +from cassandra.query import SimpleStatement +from test.topology.conftest import skip_mode import pytest import logging import time @@ -16,6 +20,40 @@ import time logger = logging.getLogger(__name__) +def host_by_server(hosts: list[Host], srv: ServerInfo): + for h in hosts: + if h.address == srv.ip_addr: + return h + raise ValueError(f"can't find host for server {srv}") + + +async def set_version(manager: ManagerClient, host: Host, new_version: int): + await manager.cql.run_async("update system.topology set 
version=%s where key = 'topology'", + parameters=[new_version], + host=host) + + +async def set_fence_version(manager: ManagerClient, host: Host, new_version: int): + await manager.cql.run_async("update system.scylla_local set value=%s where key = 'topology_fence_version'", + parameters=[str(new_version)], + host=host) + + +async def get_version(manager: ManagerClient, host: Host): + rows = await manager.cql.run_async( + "select version from system.topology where key = 'topology'", + host=host) + return rows[0].version + + +def send_errors_metric(metrics: ScyllaMetrics): + return metrics.get('scylla_hints_manager_send_errors') + + +def sent_metric(metrics: ScyllaMetrics): + return metrics.get('scylla_hints_manager_sent') + + @pytest.mark.asyncio async def test_fence_writes(request, manager: ManagerClient): logger.info("Bootstrapping first two nodes") @@ -41,13 +79,10 @@ async def test_fence_writes(request, manager: ManagerClient): logger.info(f'Waiting for cql and hosts') host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0] - version = (await manager.cql.run_async( - "select version from system.topology where key = 'topology'", - host=host2))[0].version + version = await get_version(manager, host2) logger.info(f"version on host2 {version}") - await manager.cql.run_async(f"update system.topology set version={version - 1} where key = 'topology'", - host=host2) + await set_version(manager, host2, version - 1) logger.info(f"decremented version on host2") await manager.server_restart(servers[2].server_id, wait_others=2) @@ -64,3 +99,78 @@ async def test_fence_writes(request, manager: ManagerClient): await manager.cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2) random_tables.drop_all() + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_fence_hints(request, manager: ManagerClient): + logger.info("Bootstrapping cluster with three nodes") + 
s0 = await manager.server_add(config={ + 'error_injections_at_startup': ['decrease_hints_flush_period'] + }) + s1 = await manager.server_add() + s2 = await manager.server_add() + + logger.info(f'Creating test table') + random_tables = RandomTables(request.node.name, manager, unique_name(), 3) + table1 = await random_tables.add_table(name='t1', pks=1, columns=[ + Column("pk", IntType), + Column('int_c', IntType) + ]) + await manager.cql.run_async(f"USE {random_tables.keyspace}") + + logger.info(f'Waiting for cql and hosts') + hosts = await wait_for_cql_and_get_hosts(manager.cql, [s0, s2], time.time() + 60) + + host2 = host_by_server(hosts, s2) + new_version = (await get_version(manager, host2)) + 1 + logger.info(f"Set version and fence_version to {new_version} on node {host2}") + await set_version(manager, host2, new_version) + await set_fence_version(manager, host2, new_version) + + select_all_stmt = SimpleStatement("select * from t1", consistency_level=ConsistencyLevel.ONE) + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 0 + + logger.info(f"Stopping node {host2}") + await manager.server_stop_gracefully(s2.server_id) + + host0 = host_by_server(hosts, s0) + logger.info(f"Writing through {host0} to regular column") + await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host0) + + logger.info(f"Starting last node {host2}") + await manager.server_start(s2.server_id) + + logger.info(f"Waiting for failed hints on {host0}") + async def at_least_one_hint_failed(): + metrics_data = await manager.metrics.query(s0.ip_addr) + if send_errors_metric(metrics_data) >= 1 and sent_metric(metrics_data) == 0: + return True + logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}") + await wait_for(at_least_one_hint_failed, time.time() + 5) + + host2 = (await wait_for_cql_and_get_hosts(manager.cql, [s2], time.time() + 60))[0] + + # Check there is no new data on host2. 
+ rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 0 + + logger.info("Restarting first node with new version") + await set_version(manager, host0, new_version) + await set_fence_version(manager, host0, new_version) + await manager.server_restart(s0.server_id, wait_others=2) + + logger.info(f"Waiting for sent hints on {host0}") + async def exactly_one_hint_sent(): + metrics_data = await manager.metrics.query(s0.ip_addr) + if send_errors_metric(metrics_data) == 0 and sent_metric(metrics_data) == 1: + return True + logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}") + await wait_for(exactly_one_hint_sent, time.time() + 5) + + # Check the hint is delivered, and we see the new data on host2 + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 1 + + random_tables.drop_all()