From f5b41a807526da77049afb1f7ef875d76e25bb52 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 31 Jul 2023 17:41:58 +0400 Subject: [PATCH 01/12] raft topology: don't increment version when transitioning to node_state::normal This version increment in not accompanied by a global_token_metadata_barrier, which means the new version won't be reflected in fence_version and basically will have no effect in terms of fencing. --- service/storage_service.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index e97a60643b..bb65475ce5 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1700,7 +1700,6 @@ class topology_coordinator { case node_state::bootstrapping: { topology_mutation_builder builder(node.guard.write_timestamp()); builder.del_transition_state() - .set_version(_topo_sm._topology.version + 1) .with_node(node.id) .set("node_state", node_state::normal); co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, From 5361de76f985b53ca00276089e4d6ac0be953127 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 11 Aug 2023 17:37:09 +0400 Subject: [PATCH 02/12] random_tables.py: add counter column type We'll need it for fencing test. --- test/pylib/random_tables.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/pylib/random_tables.py b/test/pylib/random_tables.py index ecca26c501..b6bbb620ac 100644 --- a/test/pylib/random_tables.py +++ b/test/pylib/random_tables.py @@ -91,6 +91,14 @@ class UUIDType(ValueType): return uuid.UUID(f"{{00000000-0000-0000-0000-{seed:012}}}") +class CounterType(ValueType): + def __init__(self): + self.name: str = 'counter' + + def val(self, seed: int) -> int: + return seed + + class Column(): """A column definition. If no value type specified it picks a random one. From 360453fd87a48b60e8c599a8794bfa697a9d8540 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 31 Jul 2023 17:47:42 +0400 Subject: [PATCH 03/12] fencing: add simple data plane test The test starts a three node cluster and manually decrements the version on the last node. It then tries to write some data through the last node and expects to get 'stale topology' exception. --- .../test_fencing.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 test/topology_experimental_raft/test_fencing.py diff --git a/test/topology_experimental_raft/test_fencing.py b/test/topology_experimental_raft/test_fencing.py new file mode 100644 index 0000000000..3b197baaa3 --- /dev/null +++ b/test/topology_experimental_raft/test_fencing.py @@ -0,0 +1,66 @@ +# +# Copyright (C) 2023-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from test.pylib.manager_client import ManagerClient +from test.pylib.random_tables import RandomTables, Column, IntType, CounterType +from test.pylib.util import unique_name, wait_for_cql_and_get_hosts +from cassandra import WriteFailure + +import pytest +import logging +import time + + +logger = logging.getLogger(__name__) + + +@pytest.mark.asyncio +async def test_fence_writes(request, manager: ManagerClient): + logger.info("Bootstrapping first two nodes") + servers = [await manager.server_add(), await manager.server_add()] + + # The third node is started as the last one, so we can be sure that is has + # the latest topology version + logger.info("Bootstrapping the last node") + servers += [await manager.server_add()] + + logger.info(f'Creating new tables') + random_tables = RandomTables(request.node.name, manager, unique_name(), 3) + table1 = await random_tables.add_table(name='t1', pks=1, columns=[ + Column("pk", IntType), + Column('int_c', IntType) + ]) + table2 = await random_tables.add_table(name='t2', pks=1, columns=[ + Column("pk", IntType), + Column('counter_c', CounterType) + ]) + await manager.cql.run_async(f"USE {random_tables.keyspace}") + + logger.info(f'Waiting for cql and hosts') + host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0] + + version = (await manager.cql.run_async( + "select version from system.topology where key = 'topology'", + host=host2))[0].version + logger.info(f"version on host2 {version}") + + await manager.cql.run_async(f"update system.topology set version={version - 1} where key = 'topology'", + host=host2) + logger.info(f"decremented version on host2") + + await manager.server_restart(servers[2].server_id, wait_others=2) + logger.info(f"host2 restarted") + + host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0] + + logger.info(f"trying to write through host2 to regular column [{host2}]") + with pytest.raises(WriteFailure, match="stale topology exception"): + await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host2) + + logger.info(f"trying to write through host2 to counter column [{host2}]") + with pytest.raises(WriteFailure, match="stale topology exception"): + await manager.cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2) + + random_tables.drop_all() From fa25e6d63ea803e51146a1288f1f9ff969f530c3 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 31 Jul 2023 18:16:21 +0400 Subject: [PATCH 04/12] token_metadata: add debug logs We log the new version when the new token metadata is set. Also, the log for fence_version is moved in shared_token_metadata from storage_service for uniformity. --- locator/token_metadata.cc | 2 ++ service/storage_service.cc | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index fcdb9cc193..f193853774 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -1193,6 +1193,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { _shared = std::move(tmptr); _shared->set_version_tracker(_versions_barrier.start()); + tlogger.debug("new token_metadata is set, version {}", _shared->get_version()); } void shared_token_metadata::update_fence_version(token_metadata::version_t version) { @@ -1215,6 +1216,7 @@ void shared_token_metadata::update_fence_version(token_metadata::version_t versi _fence_version, version)); } _fence_version = version; + tlogger.debug("new fence_version is set, version {}", _fence_version); } future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function (token_metadata&)> func) { diff --git a/service/storage_service.cc b/service/storage_service.cc index bb65475ce5..f7cfd532a8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5819,7 +5819,6 @@ future storage_service::raft_topology_cmd_handler(shar future<> storage_service::update_fence_version(token_metadata::version_t new_version) { return container().invoke_on_all([new_version] (storage_service& ss) { - slogger.debug("update_fence_version, version {}", new_version); ss._shared_token_metadata.update_fence_version(new_version); }); } From 1b7603af231804069f46e6e2edf09691ec3800e3 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 14 Aug 2023 13:44:00 +0400 Subject: [PATCH 05/12] hints manager: add send_errors counter There was no indication of problems in the hints manager metrics before. We need this counter for fencing tests in the later commit, but it seems to be useful on its own. --- db/hints/manager.cc | 5 +++++ db/hints/manager.hh | 1 + 2 files changed, 6 insertions(+) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index c357c10a36..8739976a4b 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -83,6 +83,9 @@ void manager::register_metrics(const sstring& group_name) { sm::make_counter("discarded", _stats.discarded, sm::description("Number of hints that were discarded during sending (too old, schema changed, etc.).")), + sm::make_counter("send_errors", _stats.send_errors, + sm::description("Number of unexpected errors during sending, sending will be retried later")), + sm::make_counter("corrupted_files", _stats.corrupted_files, sm::description("Number of hints files that were discarded during sending because the file was corrupted.")), @@ -880,6 +883,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrshard_stats().sent; }).handle_exception([this, ctx_ptr] (auto eptr) { manager_logger.trace("send_one_hint(): failed to send to {}: {}", end_point_key(), eptr); + ++this->shard_stats().send_errors; return make_exception_future<>(std::move(eptr)); }); @@ -896,6 +900,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrshard_stats().send_errors; return make_exception_future<>(std::move(eptr)); } return make_ready_future<>(); diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 58c066b6d0..f42f898276 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -74,6 +74,7 @@ private: uint64_t dropped = 0; uint64_t sent = 0; uint64_t discarded = 0; + uint64_t send_errors = 0; uint64_t corrupted_files = 0; }; From 0b7a90dff6a815d170332eec4eb710c2e9423e8f Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 14 Aug 2023 14:04:23 +0400 Subject: [PATCH 06/12] pylib: add ScyllaMetrics This patch adds facilities to work with Scylla metrics from test.py tests. The new metrics property was added to ManagerClient, its query method sends a request to Scylla metrics endpoint and returns and object to conveniently access the result. ScyllaMetrics is copy-pasted from test_shedding.py. It's difficult to reuse code between 'new' and 'old' styles of tests, we can't just import pylib in 'old' tests because of some problems with python search directories. A past commit of mine that attempted to solve this problem was rejected on review. --- test/pylib/manager_client.py | 3 ++- test/pylib/rest_client.py | 52 ++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 231b7b8383..13a4bed7b4 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -12,7 +12,7 @@ from typing import List, Optional, Callable, Any from time import time import logging -from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient +from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient, ScyllaMetricsClient from test.pylib.util import wait_for from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer @@ -43,6 +43,7 @@ class ManagerClient(): # A client for communicating with ScyllaClusterManager (server) self.client = UnixRESTClient(sock_path) self.api = ScyllaRESTAPIClient() + self.metrics = ScyllaMetricsClient() async def stop(self): """Close driver""" diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index 6ae2e0d20f..6fac01cea8 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -226,6 +226,58 @@ class ScyllaRESTAPIClient(): await self.client.post(f"/system/logger/{logger}?level={level}", host=node_ip) +class ScyllaMetrics: + def __init__(self, lines: list[str]): + self.lines: list[str] = lines + + def lines_by_prefix(self, prefix: str): + """Returns all metrics whose name starts with a prefix, e.g. + metrics.lines_by_prefix('scylla_hints_manager_') + """ + return [l for l in self.lines if l.startswith(prefix)] + + def get(self, name: str, labels = None, shard: str ='total'): + """Get the metric value by name. Allows to specify additional labels filter, e.g. + metrics.get('scylla_transport_cql_errors_total', {'type': 'protocol_error'}). + If shard is not set, returns the sum of metric values across all shards, + otherwise returns the metric value from the specified shard. + """ + result = None + for l in self.lines: + if not l.startswith(name): + continue + labels_start = l.find('{') + labels_finish = l.find('}') + if labels_start == -1 or labels_finish == -1: + raise ValueError(f'invalid metric format [{l}]') + def match_kv(kv): + key, val = kv.split('=') + val = val.strip('"') + return shard == 'total' or val == shard if key == 'shard' \ + else labels is None or labels.get(key, None) == val + match = all(match_kv(kv) for kv in l[labels_start + 1:labels_finish].split(',')) + if match: + value = float(l[labels_finish + 2:]) + if result is None: + result = value + else: + result += value + if shard != 'total': + break + return result + + +class ScyllaMetricsClient: + """Async Scylla Metrics API client""" + + def __init__(self, port: int = 9180): + self.client = TCPRESTClient(port) + + async def query(self, server_ip: IPAddress) -> ScyllaMetrics: + data = await self.client.get_text('/metrics', host=server_ip) + return ScyllaMetrics(data.split('\n')) + + class InjectionHandler(): """An async client for communicating with injected code by REST API""" def __init__(self, api: ScyllaRESTAPIClient, injection: str, node_ip: str): From 9fd3df13a283eb4f200b94ea0817480bd7f8f695 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Wed, 16 Aug 2023 15:58:11 +0400 Subject: [PATCH 07/12] hints: send_one_hint: extend the scope of file_send_gate holder The problem was that the holder in with_gate call was released too early. This happened before the possible call to on_hint_send_failure in then_wrapped. As a result, the effects of on_hint_send_failure (segment_replay_failed flag) were not visible in send_one_file after ctx_ptr->file_send_gate.close(), so we could decide that the segment was sent in full and delete it even if sending of some hints led to errors. Fixes #15110 --- db/hints/manager.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 8739976a4b..afb82aef56 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -866,7 +866,8 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptrmark_hint_as_in_progress(rp); // Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`). - (void)with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { + auto h = ctx_ptr->file_send_gate.hold(); + (void)std::invoke([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable { try { auto m = this->get_mutation(ctx_ptr, buf); gc_clock::duration gc_grace_sec = m.s->gc_grace_seconds(); @@ -904,7 +905,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr(std::move(eptr)); } return make_ready_future<>(); - }).then_wrapped([this, units = std::move(units), rp, ctx_ptr] (future<>&& f) { + }).then_wrapped([this, units = std::move(units), rp, ctx_ptr, h = std::move(h)] (future<>&& f) { // Information about the error was already printed somewhere higher. // We just need to account in the ctx that sending of this hint has failed. if (!f.failed()) { From 439c91851f1c6e2c38e91b402c405fdbb52eb35f Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Wed, 16 Aug 2023 14:51:32 +0400 Subject: [PATCH 08/12] hints: add debug log for dropped hints Dropping data is rather important event, let's log it at least at the debug level. It'll help in debugging tests. --- db/hints/manager.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index afb82aef56..e413d0aa2c 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -876,7 +876,10 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr gc_grace_sec - manager::hints_flush_period) { + if (const auto now = gc_clock::now().time_since_epoch(); now - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) { + manager_logger.debug("send_hints(): the hint is too old, skipping it, " + "secs since file last modification {}, gc_grace_sec {}, hints_flush_period {}", + now - secs_since_file_mod, gc_grace_sec, manager::hints_flush_period); return make_ready_future<>(); } From a639d161e64ab845f0e85adaa176b52095dae7f6 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 18 Aug 2023 00:39:32 +0400 Subject: [PATCH 09/12] test.py: add mode fixture Sometimes a test wants to know what mode it is running in so that e.g. it can skip itself in some of them. --- test.py | 1 + test/topology/conftest.py | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/test.py b/test.py index c7e90c3eb8..84fd02bd89 100755 --- a/test.py +++ b/test.py @@ -928,6 +928,7 @@ class TopologyTest(PythonTest): test_path = os.path.join(self.suite.options.tmpdir, self.mode) async with get_cluster_manager(self.mode + '/' + self.uname, self.suite.clusters, test_path) as manager: + self.args.insert(0, "--mode={}".format(self.mode)) self.args.insert(0, "--manager-api={}".format(manager.sock_path)) try: diff --git a/test/topology/conftest.py b/test/topology/conftest.py index 8385e7080f..392bfc005b 100644 --- a/test/topology/conftest.py +++ b/test/topology/conftest.py @@ -35,6 +35,8 @@ print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}") def pytest_addoption(parser): parser.addoption('--manager-api', action='store', required=True, help='Manager unix socket path') + parser.addoption('--mode', action='store', required=True, + help='Scylla build mode. Tests can use it to adjust their behavior.') parser.addoption('--host', action='store', default='localhost', help='CQL server host to connect to') parser.addoption('--port', action='store', default='9042', @@ -190,3 +192,7 @@ async def random_tables(request, manager): failed = request.node.stash[FAILED_KEY] if not failed and not await manager.is_dirty(): tables.drop_all() + +@pytest.fixture(scope="function") +def mode(request): + return request.config.getoption('mode') From c434d26b36c97d925fe182e0de191a15ee507ee9 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 17 Aug 2023 16:17:24 +0400 Subject: [PATCH 10/12] test.py: add skip_mode decorator and fixture Syntactic sugar for marking tests to be skipped in a particular mode. There is skip_in_debug/skip_in_release in suite.yaml, but they can be applied only on the entire file, which is unnatural and inconvenient. Also, they don't allow to specify a reason why the test is skipped. Separate dictionary skipped_funcs is needed since we can't use pytest fixtures in decorators. --- test/topology/conftest.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/topology/conftest.py b/test/topology/conftest.py index 392bfc005b..92db2cd1bf 100644 --- a/test/topology/conftest.py +++ b/test/topology/conftest.py @@ -196,3 +196,16 @@ async def random_tables(request, manager): @pytest.fixture(scope="function") def mode(request): return request.config.getoption('mode') + +skipped_funcs = {} +def skip_mode(mode: str, reason: str): + def wrap(func): + skipped_funcs[(func, mode)] = reason + return func + return wrap + +@pytest.fixture(scope="function", autouse=True) +def skip_mode_fixture(request, mode): + skip_reason = skipped_funcs.get((request.function, mode)) + if skip_reason is not None: + pytest.skip(f'{request.node.name} skipped, reason: {skip_reason}') From 3ccd2abad4dd419f6206450a1201da522cc9bcef Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 17 Aug 2023 16:22:06 +0400 Subject: [PATCH 11/12] test.py: output the skipped tests pytest option -rs forces it to print all the skipped tests along with the reasons. Without this option we can't tell why certain tests were skipped, maybe some of them shouldn't already. --- test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test.py b/test.py index 84fd02bd89..7598e8b228 100755 --- a/test.py +++ b/test.py @@ -853,7 +853,8 @@ class PythonTest(Test): "--log-level=DEBUG", # Capture logs "-o", "junit_family=xunit2", - "--junit-xml={}".format(self.xmlout)] + "--junit-xml={}".format(self.xmlout), + "-rs"] if options.markers: self.args.append(f"-m={options.markers}") From 1ddc76ffd1e1ea331de6dc5ea40099bc596329fe Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 14 Aug 2023 14:20:50 +0400 Subject: [PATCH 12/12] test_fencing: add test_fence_hints The test makes a write through the first node with the third node down, this causes a hint to be stored on the first node for the second. We increment the version and fence_version on the third node, restart it, and expect to see a hint delivery failure because of versions mismatch. Then we update the versions of the first node and expect hint to be successfully delivered. --- .../test_fencing.py | 126 ++++++++++++++++-- 1 file changed, 118 insertions(+), 8 deletions(-) diff --git a/test/topology_experimental_raft/test_fencing.py b/test/topology_experimental_raft/test_fencing.py index 3b197baaa3..bd901eda5f 100644 --- a/test/topology_experimental_raft/test_fencing.py +++ b/test/topology_experimental_raft/test_fencing.py @@ -5,9 +5,13 @@ # from test.pylib.manager_client import ManagerClient from test.pylib.random_tables import RandomTables, Column, IntType, CounterType -from test.pylib.util import unique_name, wait_for_cql_and_get_hosts -from cassandra import WriteFailure - +from test.pylib.util import unique_name, wait_for_cql_and_get_hosts, wait_for +from cassandra import WriteFailure, ConsistencyLevel +from test.pylib.internal_types import ServerInfo +from test.pylib.rest_client import ScyllaMetrics +from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module +from cassandra.query import SimpleStatement +from test.topology.conftest import skip_mode import pytest import logging import time @@ -16,6 +20,40 @@ import time logger = logging.getLogger(__name__) +def host_by_server(hosts: list[Host], srv: ServerInfo): + for h in hosts: + if h.address == srv.ip_addr: + return h + raise ValueError(f"can't find host for server {srv}") + + +async def set_version(manager: ManagerClient, host: Host, new_version: int): + await manager.cql.run_async("update system.topology set version=%s where key = 'topology'", + parameters=[new_version], + host=host) + + +async def set_fence_version(manager: ManagerClient, host: Host, new_version: int): + await manager.cql.run_async("update system.scylla_local set value=%s where key = 'topology_fence_version'", + parameters=[str(new_version)], + host=host) + + +async def get_version(manager: ManagerClient, host: Host): + rows = await manager.cql.run_async( + "select version from system.topology where key = 'topology'", + host=host) + return rows[0].version + + +def send_errors_metric(metrics: ScyllaMetrics): + return metrics.get('scylla_hints_manager_send_errors') + + +def sent_metric(metrics: ScyllaMetrics): + return metrics.get('scylla_hints_manager_sent') + + @pytest.mark.asyncio async def test_fence_writes(request, manager: ManagerClient): logger.info("Bootstrapping first two nodes") @@ -41,13 +79,10 @@ async def test_fence_writes(request, manager: ManagerClient): logger.info(f'Waiting for cql and hosts') host2 = (await wait_for_cql_and_get_hosts(manager.cql, [servers[2]], time.time() + 60))[0] - version = (await manager.cql.run_async( - "select version from system.topology where key = 'topology'", - host=host2))[0].version + version = await get_version(manager, host2) logger.info(f"version on host2 {version}") - await manager.cql.run_async(f"update system.topology set version={version - 1} where key = 'topology'", - host=host2) + await set_version(manager, host2, version - 1) logger.info(f"decremented version on host2") await manager.server_restart(servers[2].server_id, wait_others=2) @@ -64,3 +99,78 @@ async def test_fence_writes(request, manager: ManagerClient): await manager.cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2) random_tables.drop_all() + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_fence_hints(request, manager: ManagerClient): + logger.info("Bootstrapping cluster with three nodes") + s0 = await manager.server_add(config={ + 'error_injections_at_startup': ['decrease_hints_flush_period'] + }) + s1 = await manager.server_add() + s2 = await manager.server_add() + + logger.info(f'Creating test table') + random_tables = RandomTables(request.node.name, manager, unique_name(), 3) + table1 = await random_tables.add_table(name='t1', pks=1, columns=[ + Column("pk", IntType), + Column('int_c', IntType) + ]) + await manager.cql.run_async(f"USE {random_tables.keyspace}") + + logger.info(f'Waiting for cql and hosts') + hosts = await wait_for_cql_and_get_hosts(manager.cql, [s0, s2], time.time() + 60) + + host2 = host_by_server(hosts, s2) + new_version = (await get_version(manager, host2)) + 1 + logger.info(f"Set version and fence_version to {new_version} on node {host2}") + await set_version(manager, host2, new_version) + await set_fence_version(manager, host2, new_version) + + select_all_stmt = SimpleStatement("select * from t1", consistency_level=ConsistencyLevel.ONE) + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 0 + + logger.info(f"Stopping node {host2}") + await manager.server_stop_gracefully(s2.server_id) + + host0 = host_by_server(hosts, s0) + logger.info(f"Writing through {host0} to regular column") + await manager.cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host0) + + logger.info(f"Starting last node {host2}") + await manager.server_start(s2.server_id) + + logger.info(f"Waiting for failed hints on {host0}") + async def at_least_one_hint_failed(): + metrics_data = await manager.metrics.query(s0.ip_addr) + if send_errors_metric(metrics_data) >= 1 and sent_metric(metrics_data) == 0: + return True + logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}") + await wait_for(at_least_one_hint_failed, time.time() + 5) + + host2 = (await wait_for_cql_and_get_hosts(manager.cql, [s2], time.time() + 60))[0] + + # Check there is no new data on host2. + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 0 + + logger.info("Restarting first node with new version") + await set_version(manager, host0, new_version) + await set_fence_version(manager, host0, new_version) + await manager.server_restart(s0.server_id, wait_others=2) + + logger.info(f"Waiting for sent hints on {host0}") + async def exactly_one_hint_sent(): + metrics_data = await manager.metrics.query(s0.ip_addr) + if send_errors_metric(metrics_data) == 0 and sent_metric(metrics_data) == 1: + return True + logger.info(f"Metrics on {s0}: {metrics_data.lines_by_prefix('scylla_hints_manager_')}") + await wait_for(exactly_one_hint_sent, time.time() + 5) + + # Check the hint is delivered, and we see the new data on host2 + rows = await manager.cql.run_async(select_all_stmt, host=host2) + assert len(list(rows)) == 1 + + random_tables.drop_all()