#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

import asyncio
|
|
import pytest
|
|
import time
|
|
import logging
|
|
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
|
from test.cluster.util import reconnect_driver, trigger_snapshot, get_topology_coordinator, get_raft_log_size, get_raft_snap_id
|
|
|
|
|
|
# Module-level logger; its name is this module's `__name__`.
logger = logging.getLogger(__name__)
|
|
|
|
@pytest.mark.asyncio
async def test_raft_snapshot_request(manager: ManagerClient):
    """Check Raft snapshots triggered on request, and snapshot transfer to a lagging node.

    Runs on a 3-node cluster started with ``raft=trace`` logging, in two phases:

    1. Triggering a snapshot on ``servers[0]`` must change its snapshot ID and
       leave its Raft log size at 0.
    2. ``servers[2]`` is stopped, the other two are restarted (so the newly
       elected leader appends a command), and a snapshot is triggered on the
       leader. Once ``servers[2]`` is started again, its snapshot ID must
       eventually change and its log must be no larger than the leader's.
    """
    cmdline = [
        '--logger-log-level', 'raft=trace',
    ]
    servers = await manager.servers_add(3, cmdline=cmdline)
    cql = manager.get_cql()

    async def get_host(session, server):
        # Translate `server` into a driver host of `session`, waiting up to 60s
        # for it to become reachable over CQL. Call again after every
        # reconnect_driver() so queries go through the current session.
        return (await wait_for_cql_and_get_hosts(session, [server], time.time() + 60))[0]

    s1 = servers[0]
    h1 = await get_host(cql, s1)

    # Verify that snapshotting updates the snapshot ID and truncates the log.
    log_size = await get_raft_log_size(cql, h1)
    logger.info(f"Log size on {s1}: {log_size}")
    snap_id = await get_raft_snap_id(cql, h1)
    logger.info(f"Snapshot ID on {s1}: {snap_id}")
    assert log_size > 0

    await trigger_snapshot(manager, s1)

    new_log_size = await get_raft_log_size(cql, h1)
    logger.info(f"New log size on {s1}: {new_log_size}")
    new_snap_id = await get_raft_snap_id(cql, h1)
    logger.info(f"New snapshot ID on {s1}: {new_snap_id}")
    assert new_log_size == 0
    assert new_snap_id != snap_id

    # If a server misses a command and a snapshot is created on the leader,
    # the server once alive should eventually receive that snapshot.
    s2 = servers[2]
    h2 = await get_host(cql, s2)
    s2_log_size = await get_raft_log_size(cql, h2)
    logger.info(f"Log size on {s2}: {s2_log_size}")
    s2_snap_id = await get_raft_snap_id(cql, h2)
    logger.info(f"Snapshot ID on {s2}: {s2_snap_id}")

    await manager.server_stop_gracefully(s2.server_id)
    logger.info(f"Stopped {s2}")

    # Restarting the two servers will cause a newly elected leader to append a dummy command.
    await asyncio.gather(*(manager.server_restart(s.server_id) for s in servers[:2]))
    logger.info(f"Restarted {servers[:2]}")
    cql = await reconnect_driver(manager)

    # Any of the two restarted servers could have become the leader.
    # Find the leader and let s1 point to it for simplicity.
    leader_host_id = await get_topology_coordinator(manager)
    if leader_host_id != await manager.get_host_id(s1.server_id):
        s1 = servers[1]
    # Re-resolve h1 unconditionally: the driver was just reconnected, so the
    # old handle belongs to the previous session even if s1 did not change.
    h1 = await get_host(cql, s1)

    # Wait for the leader to append the command.
    async def appended_command() -> bool | None:
        size = await get_raft_log_size(cql, h1)
        return size > 0 or None

    await wait_for(appended_command, time.time() + 60)
    logger.info(f"{s1} appended new command")

    await trigger_snapshot(manager, s1)
    snap = await get_raft_snap_id(cql, h1)
    logger.info(f"New snapshot ID on {s1}: {snap}")

    await manager.server_start(s2.server_id)
    logger.info(f"Server {s2} restarted")
    cql = await reconnect_driver(manager)
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    # The driver was reconnected again; refresh both handles so the queries
    # below are issued through the current session.
    h1 = await get_host(cql, s1)
    h2 = await get_host(cql, s2)

    async def received_snapshot() -> str | None:
        # Returns s2's snapshot ID once it differs from the one recorded
        # before s2 was stopped; None keeps wait_for() polling.
        new_s2_snap_id = await get_raft_snap_id(cql, h2)
        if s2_snap_id != new_s2_snap_id:
            return new_s2_snap_id
        return None

    new_s2_snap_id = await wait_for(received_snapshot, time.time() + 60)
    logger.info(f"{s2} received new snapshot: {new_s2_snap_id}")

    new_s2_log_size = await get_raft_log_size(cql, h2)
    # The log size of s2 may be greater than 0 because the topology coordinator
    # fiber may start and commit a few entries after the last snapshot was
    # created. However, the log size of s2 cannot be greater than the current
    # leader's (s1's) log size. The leader's log was truncated, so the
    # assertion below would fail if s2 didn't truncate its log.
    current_s1_log_size = await get_raft_log_size(cql, h1)
    assert new_s2_log_size <= current_s1_log_size
|