Files
scylladb/test/cluster/test_maintenance_mode.py
Dawid Mędrek b357c8278f test/cluster/test_maintenance_mode.py: Wait for initialization
If we try to perform queries too early, before the call to
`storage_service::start_maintenance_mode` has finished, we will
fail with the following error:

```
ERROR 2025-11-12 20:32:27,064 [shard 0:sl:d] token_metadata - sorted_tokens is empty in first_token_index!
```

To avoid that, we should wait until initialization is complete.
2025-11-13 11:07:45 +01:00

105 lines
4.7 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from cassandra.protocol import ConfigurationException
from cassandra.connection import UnixSocketEndPoint
from cassandra.policies import WhiteListRoundRobinPolicy
from test.pylib.manager_client import ManagerClient
from test.cluster.conftest import cluster_con
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import new_test_keyspace
import pytest
import logging
import socket
import time
logger = logging.getLogger(__name__)
def _token_in_ranges(token, ranges):
    """Return True if `token` lies in any of the `(start, end]` ranges.

    A range whose start is not smaller than its end is treated as wrapping
    around the end of the token ring: it matches tokens that are either
    `<= end` or `> start`.
    """
    for start, end in ranges:
        if start < end:
            if start < token <= end:
                return True
        elif token <= end or start < token:
            return True
    return False


@pytest.mark.asyncio
async def test_maintenance_mode(manager: ManagerClient) -> None:
    """
    The test checks that in maintenance mode server A is not available for other nodes and for clients.
    It is possible to connect by the maintenance socket to server A and perform local CQL operations.

    Scenario:
      1. Start two servers, create a table and insert 256 rows through server B.
      2. Pick a key whose token falls into a token range reported for server A.
      3. Restart server A with `maintenance_mode` enabled and wait until its log
         reports that maintenance-mode initialization has completed.
      4. Verify that the regular CQL port of server A refuses connections, that
         the chosen row is readable and updatable through the maintenance socket,
         that a schema change is rejected with ConfigurationException, and that
         server B reports server A as "shutdown".
      5. Restart server A with `maintenance_mode` disabled and verify that the
         update made in maintenance mode is visible through the regular session.
    """
    server_a, server_b = await manager.server_add(), await manager.server_add()
    socket_endpoint = UnixSocketEndPoint(await manager.server_get_maintenance_socket_path(server_a.server_id))

    # The regular session is connected through server B only, so it stays usable
    # while server A is stopped or running in maintenance mode.
    cluster = cluster_con([server_b.ip_addr])
    cql = cluster.connect()

    async with new_test_keyspace(manager, "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}") as ks:
        table = f"{ks}.t"
        await cql.run_async(f"CREATE TABLE {table} (k int PRIMARY KEY, v int)")

        # Token ranges of the server A
        # [(start_token, end_token)]
        ranges = [(int(row[0]), int(row[1])) for row in await cql.run_async(f"""SELECT start_token, end_token, endpoint
FROM system.token_ring WHERE keyspace_name = '{ks}'
AND endpoint = '{server_a.ip_addr}' ALLOW FILTERING""")]

        # Insert data to the cluster and find a key that is stored on server A.
        for i in range(256):
            await cql.run_async(f"INSERT INTO {table} (k, v) VALUES ({i}, {i})")

        # [(key, token of this key)]
        keys_with_tokens = [(int(row[0]), int(row[1])) for row in await cql.run_async(f"SELECT k, token(k) FROM {table}")]

        # Any key owned by server A is good enough, so stop at the first match.
        key_on_server_a = next(
            (key for key, token in keys_with_tokens if _token_in_ranges(token, ranges)),
            None,
        )
        if key_on_server_a is None:
            # There is only a chance ~(1/2)^256 that all keys are stored on the server B
            # In this case we skip the test
            pytest.skip("All keys are stored on the server B")

        # Start server A in maintenance mode
        await manager.server_stop_gracefully(server_a.server_id)
        # NOTE(review): the option is set with the string "true" here but reset with the
        # bool False further down - confirm that both forms are accepted as intended.
        await manager.server_update_config(server_a.server_id, "maintenance_mode", "true")
        await manager.server_start(server_a.server_id)

        # Querying before maintenance-mode initialization has finished fails on the
        # server with "sorted_tokens is empty in first_token_index!", so wait for
        # the log line that marks the end of initialization.
        log = await manager.server_open_log(server_a.server_id)
        await log.wait_for(r"initialization completed \(maintenance mode\)")

        # Check that the regular CQL port is not available.
        # The probing socket is closed as soon as the check is done.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            assert probe.connect_ex((server_a.ip_addr, 9042)) != 0

        maintenance_cluster = cluster_con([socket_endpoint],
                                          load_balancing_policy=WhiteListRoundRobinPolicy([socket_endpoint]))
        maintenance_cql = maintenance_cluster.connect()

        # Check that local data is available in maintenance mode
        res = await maintenance_cql.run_async(f"SELECT v FROM {table} WHERE k = {key_on_server_a}")
        assert res[0][0] == key_on_server_a

        # Check that group0 operations are disabled
        with pytest.raises(ConfigurationException):
            await maintenance_cql.run_async(f"CREATE TABLE {ks}.t2 (k int PRIMARY KEY, v int)")

        # Modify the row through the maintenance socket; the new value is checked
        # after the restart in normal mode.
        await maintenance_cql.run_async(f"UPDATE {table} SET v = {key_on_server_a + 1} WHERE k = {key_on_server_a}")

        # Ensure that server B recognizes server A as being shutdown, not as being alive.
        res = await cql.run_async(f"SELECT status FROM system.cluster_status WHERE peer = '{server_a.ip_addr}'")
        assert res[0][0] == "shutdown"

        await manager.server_stop_gracefully(server_a.server_id)

        # Restart in normal mode to see if the changes made in maintenance mode are persisted
        await manager.server_update_config(server_a.server_id, "maintenance_mode", False)
        await manager.server_start(server_a.server_id, wait_others=1)
        await wait_for_cql_and_get_hosts(cql, [server_a], time.time() + 60)
        await manager.servers_see_each_other([server_a, server_b])

        res = await cql.run_async(f"SELECT v FROM {table} WHERE k = {key_on_server_a}")
        assert res[0][0] == key_on_server_a + 1