Merge 'raft: add fencing tests' from Petr Gusev

In this PR a simple test for fencing is added. It exercises the data
plane, meaning if it somehow happens that the node has a stale topology
version, then requests from this node will get an error 'stale
topology'. The test just decrements the node version manually through
CQL, so it's quite artificial. To test a more realistic scenario we
would need to allow the topology change fiber to sometimes skip unavailable
nodes; currently the algorithm fails and retries indefinitely in that case.

The PR also adds some logs, and removes one seemingly redundant topology
version increment, see the commit messages for details.

Closes #14901

* github.com:scylladb/scylladb:
  test_fencing: add test_fence_hints
  test.py: output the skipped tests
  test.py: add skip_mode decorator and fixture
  test.py: add mode fixture
  hints: add debug log for dropped hints
  hints: send_one_hint: extend the scope of file_send_gate holder
  pylib: add ScyllaMetrics
  hints manager: add send_errors counter
  token_metadata: add debug logs
  fencing: add simple data plane test
  random_tables.py: add counter column type
  raft topology: don't increment version when transitioning to node_state::normal
This commit is contained in:
Kamil Braun
2023-08-22 16:28:21 +02:00
10 changed files with 275 additions and 7 deletions

View File

@@ -12,7 +12,7 @@
from typing import List, Optional, Callable, Any
from time import time
import logging
from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient
from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient, ScyllaMetricsClient
from test.pylib.util import wait_for
from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo
from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer
@@ -43,6 +43,7 @@ class ManagerClient():
# A client for communicating with ScyllaClusterManager (server)
self.client = UnixRESTClient(sock_path)
self.api = ScyllaRESTAPIClient()
self.metrics = ScyllaMetricsClient()
async def stop(self):
"""Close driver"""

View File

@@ -91,6 +91,14 @@ class UUIDType(ValueType):
return uuid.UUID(f"{{00000000-0000-0000-0000-{seed:012}}}")
class CounterType(ValueType):
    """Value type whose name is 'counter' (used for counter columns)."""

    def __init__(self):
        # Type name reported by this value type.
        self.name: str = 'counter'

    def val(self, seed: int) -> int:
        """Return the value generated for `seed`: the seed itself, unchanged."""
        return seed
class Column():
"""A column definition.
If no value type specified it picks a random one.

View File

@@ -233,6 +233,58 @@ class ScyllaRESTAPIClient():
await self.client.post(f"/system/logger/{logger}?level={level}", host=node_ip)
class ScyllaMetrics:
    """Read-only view over the text lines of a metrics response.

    Metric lines are expected to have the form
    ``name{label="value",...} value``.
    """

    def __init__(self, lines: list[str]):
        # Raw response lines, kept exactly as received.
        self.lines: list[str] = lines

    def lines_by_prefix(self, prefix: str):
        """Returns all metrics whose name starts with a prefix, e.g.
           metrics.lines_by_prefix('scylla_hints_manager_')
        """
        return [l for l in self.lines if l.startswith(prefix)]

    @staticmethod
    def _labels_match(label_text: str, labels, shard: str) -> bool:
        """Tell whether the label set of one metric line passes the filters.

        `label_text` is the text between the braces of the line. The 'shard'
        label is compared against `shard` (any shard passes when `shard` is
        'total'). Every other label on the line must, when `labels` is not
        None, be present in `labels` with the same value.
        """
        for kv in label_text.split(','):
            if not kv.strip():
                # Empty label set (``name{}``) or a stray comma: nothing to check.
                continue
            # partition() splits on the first '=' only, so label values that
            # themselves contain '=' are handled.
            key, _, val = kv.partition('=')
            key = key.strip()
            val = val.strip().strip('"')
            if key == 'shard':
                if shard != 'total' and val != shard:
                    return False
            elif labels is not None and labels.get(key) != val:
                return False
        return True

    def get(self, name: str, labels = None, shard: str ='total'):
        """Get the metric value by name. Allows to specify additional labels filter, e.g.
           metrics.get('scylla_transport_cql_errors_total', {'type': 'protocol_error'}).
           If shard is not set, returns the sum of metric values across all shards,
           otherwise returns the metric value from the specified shard.

           Only lines whose metric name is exactly `name` are considered, so a
           metric that merely shares the prefix (e.g. `<name>_bytes`) is not
           added to the result.

           When `labels` is given, every non-shard label present on a line must
           appear in `labels` with the same value for the line to be counted.

           Returns None if no line matches.
           Raises ValueError if a line starting with `name` has no `{...}`
           label section or no value after it.
        """
        result = None
        for line in self.lines:
            if not line.startswith(name):
                continue
            labels_start = line.find('{')
            # The value after the label section is numeric, so the last '}'
            # on the line is the one closing the labels.
            labels_finish = line.rfind('}')
            if labels_start == -1 or labels_finish < labels_start:
                raise ValueError(f'invalid metric format [{line}]')
            if line[:labels_start].rstrip() != name:
                # A different, longer-named metric sharing the same prefix.
                continue
            if not self._labels_match(line[labels_start + 1:labels_finish], labels, shard):
                continue
            # The value is the first field after the labels; an optional
            # timestamp may follow it and is ignored.
            value_fields = line[labels_finish + 1:].split()
            if not value_fields:
                raise ValueError(f'invalid metric format [{line}]')
            value = float(value_fields[0])
            result = value if result is None else result + value
            if shard != 'total':
                # A specific shard was requested: the first match is the answer.
                break
        return result
class ScyllaMetricsClient:
    """Async Scylla Metrics API client"""

    def __init__(self, port: int = 9180):
        # REST client used for all metrics queries; targets `port`.
        self.client = TCPRESTClient(port)

    async def query(self, server_ip: IPAddress) -> ScyllaMetrics:
        """Fetch '/metrics' from `server_ip` and wrap the response.

        The response text is split on newline characters and the resulting
        lines are returned as a `ScyllaMetrics` object.
        """
        response_text = await self.client.get_text('/metrics', host=server_ip)
        metric_lines = response_text.split('\n')
        return ScyllaMetrics(metric_lines)
class InjectionHandler():
"""An async client for communicating with injected code by REST API"""
def __init__(self, api: ScyllaRESTAPIClient, injection: str, node_ip: str):

View File

@@ -36,6 +36,8 @@ print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
def pytest_addoption(parser):
parser.addoption('--manager-api', action='store', required=True,
help='Manager unix socket path')
parser.addoption('--mode', action='store', required=True,
help='Scylla build mode. Tests can use it to adjust their behavior.')
parser.addoption('--host', action='store', default='localhost',
help='CQL server host to connect to')
parser.addoption('--port', action='store', default='9042',
@@ -197,3 +199,20 @@ async def random_tables(request, manager):
failed = request.node.stash[FAILED_KEY]
if not failed and not await manager.is_dirty():
tables.drop_all()
@pytest.fixture(scope="function")
def mode(request):
    """Return the value of the 'mode' command-line option for this run."""
    build_mode = request.config.getoption('mode')
    return build_mode
# Registry filled by `skip_mode`: maps (function, mode) -> skip reason.
skipped_funcs = {}


def skip_mode(mode: str, reason: str):
    """Decorator factory that records the decorated function as skipped in `mode`.

    The function itself is returned unchanged; only an entry keyed by
    (function, mode) with value `reason` is stored in `skipped_funcs`.
    """
    def register(test_func):
        registry_key = (test_func, mode)
        skipped_funcs[registry_key] = reason
        return test_func

    return register
@pytest.fixture(scope="function", autouse=True)
def skip_mode_fixture(request, mode):
    """Skip the current test if it was registered in `skipped_funcs` for `mode`.

    Looks up (request.function, mode) in `skipped_funcs`; when a reason is
    found, the test is skipped via `pytest.skip` with that reason.
    """
    lookup_key = (request.function, mode)
    skip_reason = skipped_funcs.get(lookup_key)
    if skip_reason is None:
        return
    pytest.skip(f'{request.node.name} skipped, reason: {skip_reason}')

View File

@@ -0,0 +1,176 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, Column, IntType, CounterType
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts, wait_for
from cassandra import WriteFailure, ConsistencyLevel
from test.pylib.internal_types import ServerInfo
from test.pylib.rest_client import ScyllaMetrics
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement
from test.topology.conftest import skip_mode
import pytest
import logging
import time
# Module-level logger, named after this module's `__name__`.
logger = logging.getLogger(__name__)
def host_by_server(hosts: list[Host], srv: ServerInfo):
    """Return the first host in `hosts` whose address equals `srv.ip_addr`.

    Raises ValueError when no host matches.
    """
    found = next((candidate for candidate in hosts if candidate.address == srv.ip_addr), None)
    if found is None:
        raise ValueError(f"can't find host for server {srv}")
    return found
async def set_version(manager: ManagerClient, host: Host, new_version: int):
    """Set `version` of the 'topology' row in system.topology to `new_version`.

    The update statement is executed through `host`.
    """
    statement = "update system.topology set version=%s where key = 'topology'"
    await manager.cql.run_async(statement, parameters=[new_version], host=host)
async def set_fence_version(manager: ManagerClient, host: Host, new_version: int):
    """Set the 'topology_fence_version' entry of system.scylla_local.

    The value is written as the string form of `new_version`; the update
    statement is executed through `host`.
    """
    statement = "update system.scylla_local set value=%s where key = 'topology_fence_version'"
    version_text = str(new_version)
    await manager.cql.run_async(statement, parameters=[version_text], host=host)
async def get_version(manager: ManagerClient, host: Host):
    """Read `version` from the 'topology' row of system.topology through `host`."""
    query = "select version from system.topology where key = 'topology'"
    result_rows = await manager.cql.run_async(query, host=host)
    first_row = result_rows[0]
    return first_row.version
def send_errors_metric(metrics: ScyllaMetrics):
    """Return what `metrics.get` yields for 'scylla_hints_manager_send_errors'."""
    metric_name = 'scylla_hints_manager_send_errors'
    return metrics.get(metric_name)
def sent_metric(metrics: ScyllaMetrics):
    """Return what `metrics.get` yields for 'scylla_hints_manager_sent'."""
    metric_name = 'scylla_hints_manager_sent'
    return metrics.get(metric_name)
@pytest.mark.asyncio
async def test_fence_writes(request, manager: ManagerClient):
    """Writes sent through a node with a decremented topology version must fail.

    Starts three nodes, creates a regular-column table and a counter-column
    table, decrements the stored topology version on the third node,
    restarts it, and then expects both a regular write and a counter write
    issued through that node to raise WriteFailure matching
    "stale topology exception".
    """
    logger.info("Bootstrapping first two nodes")
    servers = [await manager.server_add(), await manager.server_add()]

    # The third node is started as the last one, so we can be sure that it has
    # the latest topology version
    logger.info("Bootstrapping the last node")
    servers += [await manager.server_add()]

    logger.info(f'Creating new tables')
    random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
    # t1 exercises the regular write path, t2 the counter write path.
    table1 = await random_tables.add_table(name='t1', pks=1, columns=[
        Column("pk", IntType),
        Column('int_c', IntType)
    ])
    table2 = await random_tables.add_table(name='t2', pks=1, columns=[
        Column("pk", IntType),
        Column('counter_c', CounterType)
    ])
    # Switch to the test keyspace so the statements below can use bare table names.
    await manager.cql.run_async(f"USE {random_tables.keyspace}")

    logger.info(f'Waiting for cql and hosts')
    host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0]

    # Make the third node's stored topology version one lower than it was.
    version = await get_version(manager, host2)
    logger.info(f"version on host2 {version}")

    await set_version(manager, host2, version - 1)
    logger.info(f"decremented version on host2")

    # NOTE(review): the restart is presumably what makes the node pick up the
    # modified version from system.topology -- confirm.
    await manager.server_restart(servers[2].server_id, wait_others=2)
    logger.info(f"host2 restarted")

    # Re-resolve the host after the restart.
    host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0]

    logger.info(f"trying to write through host2 to regular column [{host2}]")
    with pytest.raises(WriteFailure, match="stale topology exception"):
        await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host2)

    logger.info(f"trying to write through host2 to counter column [{host2}]")
    with pytest.raises(WriteFailure, match="stale topology exception"):
        await manager.cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2)

    random_tables.drop_all()
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_fence_hints(request, manager: ManagerClient):
    """Hints from a node with an older version must fail until it catches up.

    Flow, as implemented below:
      1. Start three nodes; the first one is started with the
         'decrease_hints_flush_period' error injection.
      2. Bump version and fence version on the third node, then stop it.
      3. Write a row through the first node while the third node is down.
      4. Start the third node and wait until the first node reports at least
         one hint send error and zero sent hints; the row must not be
         visible on the third node.
      5. Set the same version and fence version on the first node, restart
         it, and wait until it reports exactly one sent hint and no send
         errors; the row must now be visible on the third node.
    """
    logger.info("Bootstrapping cluster with three nodes")
    s0 = await manager.server_add(config={
        'error_injections_at_startup': ['decrease_hints_flush_period']
    })
    s1 = await manager.server_add()
    s2 = await manager.server_add()

    logger.info(f'Creating test table')
    random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
    table1 = await random_tables.add_table(name='t1', pks=1, columns=[
        Column("pk", IntType),
        Column('int_c', IntType)
    ])
    # Switch to the test keyspace so the statements below can use bare table names.
    await manager.cql.run_async(f"USE {random_tables.keyspace}")

    logger.info(f'Waiting for cql and hosts')
    hosts = await wait_for_cql_and_get_hosts(manager.cql, [s0, s2], time.time() + 60)

    # Make the third node's version and fence version one higher than before.
    host2 = host_by_server(hosts, s2)
    new_version = (await get_version(manager, host2)) + 1
    logger.info(f"Set version and fence_version to {new_version} on node {host2}")
    await set_version(manager, host2, new_version)
    await set_fence_version(manager, host2, new_version)

    # Sanity check: the table is empty on the third node before any write.
    select_all_stmt = SimpleStatement("select * from t1", consistency_level=ConsistencyLevel.ONE)
    rows = await manager.cql.run_async(select_all_stmt, host=host2)
    assert len(list(rows)) == 0

    logger.info(f"Stopping node {host2}")
    await manager.server_stop_gracefully(s2.server_id)

    # Write while the third node is down.
    # NOTE(review): presumably this makes the first node store a hint for the
    # stopped node -- confirm.
    host0 = host_by_server(hosts, s0)
    logger.info(f"Writing through {host0} to regular column")
    await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host0)

    logger.info(f"Starting last node {host2}")
    await manager.server_start(s2.server_id)

    logger.info(f"Waiting for failed hints on {host0}")
    async def at_least_one_hint_failed():
        # Returns True once the first node reports >= 1 send error and no
        # sent hints; otherwise logs the hints manager metrics and returns
        # None.
        metrics_data = await manager.metrics.query(s0.ip_addr)
        if send_errors_metric(metrics_data) >= 1 and sent_metric(metrics_data) == 0:
            return True
        logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}")
    await wait_for(at_least_one_hint_failed, time.time() + 5)

    # Re-resolve the third node's host after it was restarted.
    host2 = (await wait_for_cql_and_get_hosts(manager.cql, [s2], time.time() + 60))[0]

    # Check there is no new data on host2.
    rows = await manager.cql.run_async(select_all_stmt, host=host2)
    assert len(list(rows)) == 0

    # Bring the first node to the same version and fence version as the third.
    logger.info("Restarting first node with new version")
    await set_version(manager, host0, new_version)
    await set_fence_version(manager, host0, new_version)
    await manager.server_restart(s0.server_id, wait_others=2)

    logger.info(f"Waiting for sent hints on {host0}")
    async def exactly_one_hint_sent():
        # Returns True once the first node reports no send errors and exactly
        # one sent hint; otherwise logs the hints manager metrics and returns
        # None.
        metrics_data = await manager.metrics.query(s0.ip_addr)
        if send_errors_metric(metrics_data) == 0 and sent_metric(metrics_data) == 1:
            return True
        logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}")
    await wait_for(exactly_one_hint_sent, time.time() + 5)

    # Check the hint is delivered, and we see the new data on host2
    rows = await manager.cql.run_async(select_all_stmt, host=host2)
    assert len(list(rows)) == 1

    random_tables.drop_all()