diff --git a/db/hints/manager.cc b/db/hints/manager.cc index c357c10a36..e413d0aa2c 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -83,6 +83,9 @@ void manager::register_metrics(const sstring& group_name) { sm::make_counter("discarded", _stats.discarded, sm::description("Number of hints that were discarded during sending (too old, schema changed, etc.).")), + sm::make_counter("send_errors", _stats.send_errors, + sm::description("Number of unexpected errors during sending, sending will be retried later")), + sm::make_counter("corrupted_files", _stats.corrupted_files, sm::description("Number of hints files that were discarded during sending because the file was corrupted.")), @@ -863,7 +866,8 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrmark_hint_as_in_progress(rp); // Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`). - (void)with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { + auto h = ctx_ptr->file_send_gate.hold(); + (void)std::invoke([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { try { auto m = this->get_mutation(ctx_ptr, buf); gc_clock::duration gc_grace_sec = m.s->gc_grace_seconds(); @@ -872,7 +876,10 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr gc_grace_sec - manager::hints_flush_period) { + if (const auto now = gc_clock::now().time_since_epoch(); now - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) { + manager_logger.debug("send_hints(): the hint is too old, skipping it, " + "secs since file last modification {}, gc_grace_sec {}, hints_flush_period {}", + now - secs_since_file_mod, gc_grace_sec, manager::hints_flush_period); return make_ready_future<>(); } @@ -880,6 +887,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrshard_stats().sent; }).handle_exception([this, ctx_ptr] (auto eptr) { manager_logger.trace("send_one_hint(): failed to send to {}: {}", end_point_key(), eptr); + ++this->shard_stats().send_errors; return make_exception_future<>(std::move(eptr)); }); @@ -896,10 +904,11 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrshard_stats().send_errors; return make_exception_future<>(std::move(eptr)); } return make_ready_future<>(); - }).then_wrapped([this, units = std::move(units), rp, ctx_ptr] (future<>&& f) { + }).then_wrapped([this, units = std::move(units), rp, ctx_ptr, h = std::move(h)] (future<>&& f) { // Information about the error was already printed somewhere higher. // We just need to account in the ctx that sending of this hint has failed. if (!f.failed()) { diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 58c066b6d0..f42f898276 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -74,6 +74,7 @@ private: uint64_t dropped = 0; uint64_t sent = 0; uint64_t discarded = 0; + uint64_t send_errors = 0; uint64_t corrupted_files = 0; }; diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 921cca71bd..02bfdfaef6 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -1193,6 +1193,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { _shared = std::move(tmptr); _shared->set_version_tracker(_versions_barrier.start()); + tlogger.debug("new token_metadata is set, version {}", _shared->get_version()); } void shared_token_metadata::update_fence_version(token_metadata::version_t version) { @@ -1215,6 +1216,7 @@ void shared_token_metadata::update_fence_version(token_metadata::version_t versi _fence_version, version)); } _fence_version = version; + tlogger.debug("new fence_version is set, version {}", _fence_version); } future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function (token_metadata&)> func) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 440c15b4bd..edb7f0c07d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1740,7 +1740,6 @@ class topology_coordinator { case node_state::bootstrapping: { topology_mutation_builder builder(node.guard.write_timestamp()); builder.del_transition_state() - .set_version(_topo_sm._topology.version + 1) .with_node(node.id) .set("node_state", node_state::normal); co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, @@ -5860,7 +5859,6 @@ future storage_service::raft_topology_cmd_handler(shar future<> storage_service::update_fence_version(token_metadata::version_t new_version) { return container().invoke_on_all([new_version] (storage_service& ss) { - slogger.debug("update_fence_version, version {}", new_version); ss._shared_token_metadata.update_fence_version(new_version); }); } diff --git a/test.py b/test.py index 07abd2acbd..4170614aff 100755 --- a/test.py +++ b/test.py @@ -851,7 +851,8 @@ class PythonTest(Test): "--log-level=DEBUG", # Capture logs "-o", "junit_family=xunit2", - "--junit-xml={}".format(self.xmlout)] + "--junit-xml={}".format(self.xmlout), + "-rs"] if options.markers: self.args.append(f"-m={options.markers}") @@ -926,6 +927,7 @@ class TopologyTest(PythonTest): test_path = os.path.join(self.suite.options.tmpdir, self.mode) async with get_cluster_manager(self.mode + '/' + self.uname, self.suite.clusters, test_path) as manager: + self.args.insert(0, "--mode={}".format(self.mode)) self.args.insert(0, "--manager-api={}".format(manager.sock_path)) try: diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 257623d400..325200cb74 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -12,7 +12,7 @@ from typing import List, Optional, Callable, Any from time import time import logging -from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient +from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient, ScyllaMetricsClient from test.pylib.util import wait_for from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer @@ -43,6 +43,7 @@ class ManagerClient(): # A client for communicating with ScyllaClusterManager (server) self.client = UnixRESTClient(sock_path) self.api = ScyllaRESTAPIClient() + self.metrics = ScyllaMetricsClient() async def stop(self): """Close driver""" diff --git a/test/pylib/random_tables.py b/test/pylib/random_tables.py index ecca26c501..b6bbb620ac 100644 --- a/test/pylib/random_tables.py +++ b/test/pylib/random_tables.py @@ -91,6 +91,14 @@ class UUIDType(ValueType): return uuid.UUID(f"{{00000000-0000-0000-0000-{seed:012}}}") +class CounterType(ValueType): + def __init__(self): + self.name: str = 'counter' + + def val(self, seed: int) -> int: + return seed + + class Column(): """A column definition. If no value type specified it picks a random one. diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index 6a9aef8d86..7c98795241 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -233,6 +233,58 @@ class ScyllaRESTAPIClient(): await self.client.post(f"/system/logger/{logger}?level={level}", host=node_ip) +class ScyllaMetrics: + def __init__(self, lines: list[str]): + self.lines: list[str] = lines + + def lines_by_prefix(self, prefix: str): + """Returns all metrics whose name starts with a prefix, e.g. + metrics.lines_by_prefix('scylla_hints_manager_') + """ + return [l for l in self.lines if l.startswith(prefix)] + + def get(self, name: str, labels = None, shard: str ='total'): + """Get the metric value by name. Allows to specify additional labels filter, e.g. + metrics.get('scylla_transport_cql_errors_total', {'type': 'protocol_error'}). + If shard is not set, returns the sum of metric values across all shards, + otherwise returns the metric value from the specified shard. + """ + result = None + for l in self.lines: + if not l.startswith(name): + continue + labels_start = l.find('{') + labels_finish = l.find('}') + if labels_start == -1 or labels_finish == -1: + raise ValueError(f'invalid metric format [{l}]') + def match_kv(kv): + key, val = kv.split('=') + val = val.strip('"') + return shard == 'total' or val == shard if key == 'shard' \ + else labels is None or labels.get(key, None) == val + match = all(match_kv(kv) for kv in l[labels_start + 1:labels_finish].split(',')) + if match: + value = float(l[labels_finish + 2:]) + if result is None: + result = value + else: + result += value + if shard != 'total': + break + return result + + +class ScyllaMetricsClient: + """Async Scylla Metrics API client""" + + def __init__(self, port: int = 9180): + self.client = TCPRESTClient(port) + + async def query(self, server_ip: IPAddress) -> ScyllaMetrics: + data = await self.client.get_text('/metrics', host=server_ip) + return ScyllaMetrics(data.split('\n')) + + class InjectionHandler(): """An async client for communicating with injected code by REST API""" def __init__(self, api: ScyllaRESTAPIClient, injection: str, node_ip: str): diff --git a/test/topology/conftest.py b/test/topology/conftest.py index 03e6920df9..2f48ae1fe0 100644 --- a/test/topology/conftest.py +++ b/test/topology/conftest.py @@ -36,6 +36,8 @@ print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}") def pytest_addoption(parser): parser.addoption('--manager-api', action='store', required=True, help='Manager unix socket path') + parser.addoption('--mode', action='store', required=True, + help='Scylla build mode. Tests can use it to adjust their behavior.') parser.addoption('--host', action='store', default='localhost', help='CQL server host to connect to') parser.addoption('--port', action='store', default='9042', @@ -197,3 +199,20 @@ async def random_tables(request, manager): failed = request.node.stash[FAILED_KEY] if not failed and not await manager.is_dirty(): tables.drop_all() + +@pytest.fixture(scope="function") +def mode(request): + return request.config.getoption('mode') + +skipped_funcs = {} +def skip_mode(mode: str, reason: str): + def wrap(func): + skipped_funcs[(func, mode)] = reason + return func + return wrap + +@pytest.fixture(scope="function", autouse=True) +def skip_mode_fixture(request, mode): + skip_reason = skipped_funcs.get((request.function, mode)) + if skip_reason is not None: + pytest.skip(f'{request.node.name} skipped, reason: {skip_reason}') diff --git a/test/topology_experimental_raft/test_fencing.py b/test/topology_experimental_raft/test_fencing.py new file mode 100644 index 0000000000..bd901eda5f --- /dev/null +++ b/test/topology_experimental_raft/test_fencing.py @@ -0,0 +1,176 @@ +# +# Copyright (C) 2023-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from test.pylib.manager_client import ManagerClient +from test.pylib.random_tables import RandomTables, Column, IntType, CounterType +from test.pylib.util import unique_name, wait_for_cql_and_get_hosts, wait_for +from cassandra import WriteFailure, ConsistencyLevel +from test.pylib.internal_types import ServerInfo +from test.pylib.rest_client import ScyllaMetrics +from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module +from cassandra.query import SimpleStatement +from test.topology.conftest import skip_mode +import pytest +import logging +import time + + +logger = logging.getLogger(__name__) + + +def host_by_server(hosts: list[Host], srv: ServerInfo): + for h in hosts: + if h.address == srv.ip_addr: + return h + raise ValueError(f"can't find host for server {srv}") + + +async def set_version(manager: ManagerClient, host: Host, new_version: int): + await manager.cql.run_async("update system.topology set version=%s where key = 'topology'", + parameters=[new_version], + host=host) + + +async def set_fence_version(manager: ManagerClient, host: Host, new_version: int): + await manager.cql.run_async("update system.scylla_local set value=%s where key = 'topology_fence_version'", + parameters=[str(new_version)], + host=host) + + +async def get_version(manager: ManagerClient, host: Host): + rows = await manager.cql.run_async( + "select version from system.topology where key = 'topology'", + host=host) + return rows[0].version + + +def send_errors_metric(metrics: ScyllaMetrics): + return metrics.get('scylla_hints_manager_send_errors') + + +def sent_metric(metrics: ScyllaMetrics): + return metrics.get('scylla_hints_manager_sent') + + +@pytest.mark.asyncio +async def test_fence_writes(request, manager: ManagerClient): + logger.info("Bootstrapping first two nodes") + servers = [await manager.server_add(), await manager.server_add()] + + # The third node is started as the last one, so we can be sure that is has + # the latest topology version + logger.info("Bootstrapping the last node") + servers += [await manager.server_add()] + + logger.info(f'Creating new tables') + random_tables = RandomTables(request.node.name, manager, unique_name(), 3) + table1 = await random_tables.add_table(name='t1', pks=1, columns=[ + Column("pk", IntType), + Column('int_c', IntType) + ]) + table2 = await random_tables.add_table(name='t2', pks=1, columns=[ + Column("pk", IntType), + Column('counter_c', CounterType) + ]) + await manager.cql.run_async(f"USE {random_tables.keyspace}") + + logger.info(f'Waiting for cql and hosts') + host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0] + + version = await get_version(manager, host2) + logger.info(f"version on host2 {version}") + + await set_version(manager, host2, version - 1) + logger.info(f"decremented version on host2") + + await manager.server_restart(servers[2].server_id, wait_others=2) + logger.info(f"host2 restarted") + + host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0] + + logger.info(f"trying to write through host2 to regular column [{host2}]") + with pytest.raises(WriteFailure, match="stale topology exception"): + await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host2) + + logger.info(f"trying to write through host2 to counter column [{host2}]") + with pytest.raises(WriteFailure, match="stale topology exception"): + await manager.cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2) + + random_tables.drop_all() + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_fence_hints(request, manager: ManagerClient): + logger.info("Bootstrapping cluster with three nodes") + s0 = await manager.server_add(config={ + 'error_injections_at_startup': ['decrease_hints_flush_period'] + }) + s1 = await manager.server_add() + s2 = await manager.server_add() + + logger.info(f'Creating test table') + random_tables = RandomTables(request.node.name, manager, unique_name(), 3) + table1 = await random_tables.add_table(name='t1', pks=1, columns=[ + Column("pk", IntType), + Column('int_c', IntType) + ]) + await manager.cql.run_async(f"USE {random_tables.keyspace}") + + logger.info(f'Waiting for cql and hosts') + hosts = await wait_for_cql_and_get_hosts(manager.cql, [s0, s2], time.time() + 60) + + host2 = host_by_server(hosts, s2) + new_version = (await get_version(manager, host2)) + 1 + logger.info(f"Set version and fence_version to {new_version} on node {host2}") + await set_version(manager, host2, new_version) + await set_fence_version(manager, host2, new_version) + + select_all_stmt = SimpleStatement("select * from t1", consistency_level=ConsistencyLevel.ONE) + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 0 + + logger.info(f"Stopping node {host2}") + await manager.server_stop_gracefully(s2.server_id) + + host0 = host_by_server(hosts, s0) + logger.info(f"Writing through {host0} to regular column") + await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host0) + + logger.info(f"Starting last node {host2}") + await manager.server_start(s2.server_id) + + logger.info(f"Waiting for failed hints on {host0}") + async def at_least_one_hint_failed(): + metrics_data = await manager.metrics.query(s0.ip_addr) + if send_errors_metric(metrics_data) >= 1 and sent_metric(metrics_data) == 0: + return True + logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}") + await wait_for(at_least_one_hint_failed, time.time() + 5) + + host2 = (await wait_for_cql_and_get_hosts(manager.cql, [s2], time.time() + 60))[0] + + # Check there is no new data on host2. + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 0 + + logger.info("Restarting first node with new version") + await set_version(manager, host0, new_version) + await set_fence_version(manager, host0, new_version) + await manager.server_restart(s0.server_id, wait_others=2) + + logger.info(f"Waiting for sent hints on {host0}") + async def exactly_one_hint_sent(): + metrics_data = await manager.metrics.query(s0.ip_addr) + if send_errors_metric(metrics_data) == 0 and sent_metric(metrics_data) == 1: + return True + logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}") + await wait_for(exactly_one_hint_sent, time.time() + 5) + + # Check the hint is delivered, and we see the new data on host2 + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 1 + + random_tables.drop_all()