test_fencing: add test_fence_hints
The test makes a write through the first node with the third node down, this causes a hint to be stored on the first node for the second. We increment the version and fence_version on the third node, restart it, and expect to see a hint delivery failure because of versions mismatch. Then we update the versions of the first node and expect hint to be successfully delivered.
This commit is contained in:
@@ -5,9 +5,13 @@
|
||||
#
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables, Column, IntType, CounterType
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from cassandra import WriteFailure
|
||||
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts, wait_for
|
||||
from cassandra import WriteFailure, ConsistencyLevel
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.rest_client import ScyllaMetrics
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.query import SimpleStatement
|
||||
from test.topology.conftest import skip_mode
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
@@ -16,6 +20,40 @@ import time
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def host_by_server(hosts: list[Host], srv: ServerInfo):
|
||||
for h in hosts:
|
||||
if h.address == srv.ip_addr:
|
||||
return h
|
||||
raise ValueError(f"can't find host for server {srv}")
|
||||
|
||||
|
||||
async def set_version(manager: ManagerClient, host: Host, new_version: int):
|
||||
await manager.cql.run_async("update system.topology set version=%s where key = 'topology'",
|
||||
parameters=[new_version],
|
||||
host=host)
|
||||
|
||||
|
||||
async def set_fence_version(manager: ManagerClient, host: Host, new_version: int):
|
||||
await manager.cql.run_async("update system.scylla_local set value=%s where key = 'topology_fence_version'",
|
||||
parameters=[str(new_version)],
|
||||
host=host)
|
||||
|
||||
|
||||
async def get_version(manager: ManagerClient, host: Host):
|
||||
rows = await manager.cql.run_async(
|
||||
"select version from system.topology where key = 'topology'",
|
||||
host=host)
|
||||
return rows[0].version
|
||||
|
||||
|
||||
def send_errors_metric(metrics: ScyllaMetrics):
|
||||
return metrics.get('scylla_hints_manager_send_errors')
|
||||
|
||||
|
||||
def sent_metric(metrics: ScyllaMetrics):
|
||||
return metrics.get('scylla_hints_manager_sent')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fence_writes(request, manager: ManagerClient):
|
||||
logger.info("Bootstrapping first two nodes")
|
||||
@@ -41,13 +79,10 @@ async def test_fence_writes(request, manager: ManagerClient):
|
||||
logger.info(f'Waiting for cql and hosts')
|
||||
host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0]
|
||||
|
||||
version = (await manager.cql.run_async(
|
||||
"select version from system.topology where key = 'topology'",
|
||||
host=host2))[0].version
|
||||
version = await get_version(manager, host2)
|
||||
logger.info(f"version on host2 {version}")
|
||||
|
||||
await manager.cql.run_async(f"update system.topology set version={version - 1} where key = 'topology'",
|
||||
host=host2)
|
||||
await set_version(manager, host2, version - 1)
|
||||
logger.info(f"decremented version on host2")
|
||||
|
||||
await manager.server_restart(servers[2].server_id, wait_others=2)
|
||||
@@ -64,3 +99,78 @@ async def test_fence_writes(request, manager: ManagerClient):
|
||||
await manager.cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2)
|
||||
|
||||
random_tables.drop_all()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_fence_hints(request, manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster with three nodes")
|
||||
s0 = await manager.server_add(config={
|
||||
'error_injections_at_startup': ['decrease_hints_flush_period']
|
||||
})
|
||||
s1 = await manager.server_add()
|
||||
s2 = await manager.server_add()
|
||||
|
||||
logger.info(f'Creating test table')
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
|
||||
table1 = await random_tables.add_table(name='t1', pks=1, columns=[
|
||||
Column("pk", IntType),
|
||||
Column('int_c', IntType)
|
||||
])
|
||||
await manager.cql.run_async(f"USE {random_tables.keyspace}")
|
||||
|
||||
logger.info(f'Waiting for cql and hosts')
|
||||
hosts = await wait_for_cql_and_get_hosts(manager.cql, [s0, s2], time.time() + 60)
|
||||
|
||||
host2 = host_by_server(hosts, s2)
|
||||
new_version = (await get_version(manager, host2)) + 1
|
||||
logger.info(f"Set version and fence_version to {new_version} on node {host2}")
|
||||
await set_version(manager, host2, new_version)
|
||||
await set_fence_version(manager, host2, new_version)
|
||||
|
||||
select_all_stmt = SimpleStatement("select * from t1", consistency_level=ConsistencyLevel.ONE)
|
||||
rows = await manager.cql.run_async(select_all_stmt, host=host2)
|
||||
assert len(list(rows)) == 0
|
||||
|
||||
logger.info(f"Stopping node {host2}")
|
||||
await manager.server_stop_gracefully(s2.server_id)
|
||||
|
||||
host0 = host_by_server(hosts, s0)
|
||||
logger.info(f"Writing through {host0} to regular column")
|
||||
await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host0)
|
||||
|
||||
logger.info(f"Starting last node {host2}")
|
||||
await manager.server_start(s2.server_id)
|
||||
|
||||
logger.info(f"Waiting for failed hints on {host0}")
|
||||
async def at_least_one_hint_failed():
|
||||
metrics_data = await manager.metrics.query(s0.ip_addr)
|
||||
if send_errors_metric(metrics_data) >= 1 and sent_metric(metrics_data) == 0:
|
||||
return True
|
||||
logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}")
|
||||
await wait_for(at_least_one_hint_failed, time.time() + 5)
|
||||
|
||||
host2 = (await wait_for_cql_and_get_hosts(manager.cql, [s2], time.time() + 60))[0]
|
||||
|
||||
# Check there is no new data on host2.
|
||||
rows = await manager.cql.run_async(select_all_stmt, host=host2)
|
||||
assert len(list(rows)) == 0
|
||||
|
||||
logger.info("Restarting first node with new version")
|
||||
await set_version(manager, host0, new_version)
|
||||
await set_fence_version(manager, host0, new_version)
|
||||
await manager.server_restart(s0.server_id, wait_others=2)
|
||||
|
||||
logger.info(f"Waiting for sent hints on {host0}")
|
||||
async def exactly_one_hint_sent():
|
||||
metrics_data = await manager.metrics.query(s0.ip_addr)
|
||||
if send_errors_metric(metrics_data) == 0 and sent_metric(metrics_data) == 1:
|
||||
return True
|
||||
logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}")
|
||||
await wait_for(exactly_one_hint_sent, time.time() + 5)
|
||||
|
||||
# Check the hint is delivered, and we see the new data on host2
|
||||
rows = await manager.cql.run_async(select_all_stmt, host=host2)
|
||||
assert len(list(rows)) == 1
|
||||
|
||||
random_tables.drop_all()
|
||||
|
||||
Reference in New Issue
Block a user