Files
scylladb/test/cluster/test_raft_snapshot_request.py
Artsiom Mishuta d1198f8318 test.py: rename topology_custom folder to cluster
rename topology_custom folder to cluster
as it contains not only topology test cases
2025-03-04 10:32:44 +01:00

92 lines
3.9 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
import time
import logging
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.cluster.util import reconnect_driver, trigger_snapshot, get_topology_coordinator, get_raft_log_size, get_raft_snap_id
logger = logging.getLogger(__name__)


async def _get_host(cql, server, timeout: float = 60):
    """Wait until `server` is reachable through `cql` and return its driver `Host`.

    Waits at most `timeout` seconds. The test calls this again after every
    `reconnect_driver()` so that queries always go through a handle resolved
    from the current session.
    """
    return (await wait_for_cql_and_get_hosts(cql, [server], time.time() + timeout))[0]


@pytest.mark.asyncio
async def test_raft_snapshot_request(manager: ManagerClient):
    """Check that explicitly requested Raft snapshots take effect.

    Phase 1: triggering a snapshot on a node changes its snapshot ID and
    brings its Raft log size down to 0.

    Phase 2: a node that is down while the leader appends a command and then
    snapshots must, once started again, end up with a different snapshot ID,
    and its log must be no larger than the leader's.
    """
    cmdline = [
        '--logger-log-level', 'raft=trace',
    ]
    servers = await manager.servers_add(3, cmdline=cmdline)
    cql = manager.get_cql()

    s1 = servers[0]
    h1 = await _get_host(cql, s1)

    # Verify that snapshotting updates the snapshot ID and truncates the log.
    log_size = await get_raft_log_size(cql, h1)
    logger.info(f"Log size on {s1}: {log_size}")
    snap_id = await get_raft_snap_id(cql, h1)
    logger.info(f"Snapshot ID on {s1}: {snap_id}")
    assert log_size > 0

    await trigger_snapshot(manager, s1)

    new_log_size = await get_raft_log_size(cql, h1)
    logger.info(f"New log size on {s1}: {new_log_size}")
    new_snap_id = await get_raft_snap_id(cql, h1)
    logger.info(f"New snapshot ID on {s1}: {new_snap_id}")
    assert new_log_size == 0
    assert new_snap_id != snap_id

    # If a server misses a command and a snapshot is created on the leader,
    # the server once alive should eventually receive that snapshot.
    s2 = servers[2]
    h2 = await _get_host(cql, s2)
    s2_log_size = await get_raft_log_size(cql, h2)
    logger.info(f"Log size on {s2}: {s2_log_size}")
    s2_snap_id = await get_raft_snap_id(cql, h2)
    logger.info(f"Snapshot ID on {s2}: {s2_snap_id}")

    await manager.server_stop_gracefully(s2.server_id)
    logger.info(f"Stopped {s2}")

    # Restarting the two servers will cause a newly elected leader to append a dummy command.
    await asyncio.gather(*(manager.server_restart(s.server_id) for s in servers[:2]))
    logger.info(f"Restarted {servers[:2]}")
    cql = await reconnect_driver(manager)

    # Any of the two restarted servers could have become the leader.
    # Find the leader and let s1 point to it for simplicity.
    leader_host_id = await get_topology_coordinator(manager)
    if leader_host_id != await manager.get_host_id(s1.server_id):
        s1 = servers[1]
    h1 = await _get_host(cql, s1)

    # Wait for the leader to append the command.
    async def appended_command() -> bool | None:
        s = await get_raft_log_size(cql, h1)
        return s > 0 or None
    await wait_for(appended_command, time.time() + 60)
    logger.info(f"{s1} appended new command")

    await trigger_snapshot(manager, s1)
    snap = await get_raft_snap_id(cql, h1)
    logger.info(f"New snapshot ID on {s1}: {snap}")

    await manager.server_start(s2.server_id)
    logger.info(f"Server {s2} restarted")
    cql = await reconnect_driver(manager)
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    # The driver session was replaced again: re-resolve both handles through
    # it instead of reusing Host objects obtained from earlier sessions.
    h1 = await _get_host(cql, s1)
    h2 = await _get_host(cql, s2)

    # Poll s2 until its snapshot ID differs from the one recorded before it was stopped.
    async def received_snapshot() -> str | None:
        new_s2_snap_id = await get_raft_snap_id(cql, h2)
        if s2_snap_id != new_s2_snap_id:
            return new_s2_snap_id
        return None
    new_s2_snap_id = await wait_for(received_snapshot, time.time() + 60)
    logger.info(f"{s2} received new snapshot: {new_s2_snap_id}")

    new_s2_log_size = await get_raft_log_size(cql, h2)
    # The log size of s2 may be greater than 0 because the topology coordinator
    # fiber may start and commit a few entries after the last snapshot was
    # created. However, the log size of s2 cannot be greater than the current
    # leader's (s1's) log size. The leader's log was truncated, so the assertion
    # below would fail if s2 didn't truncate its log.
    # NOTE(review): keep this read after s2's; entries the leader appends in
    # between should only enlarge its side of the comparison.
    current_s1_log_size = await get_raft_log_size(cql, h1)
    assert new_s2_log_size <= current_s1_log_size