Files
scylladb/test/cluster/test_cluster_features.py
Emil Maskovsky 08518b2c12 test/raft: fix test_joining_old_node_fails flakiness
When a node without the required feature attempts to join a Raft-based
cluster with the feature enabled, there is a race between the join
rejection response ("Feature check failed") and the ban notification
("received notification of being banned"). Depending on timing, either
message may appear in the joining node's log.

This started happening after 39cec4a (which introduced notifying nodes
that they have been banned).

Updated the test to accept both error messages as valid, making the test
robust against this race condition, which is more likely in debug mode
or under slow execution.

Fixes: scylladb/scylladb#27603

No backport: This failure is only present in master.

Closes scylladb/scylladb#27760
2025-12-19 09:44:09 +02:00

212 lines
10 KiB
Python

#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
"""
Tests the cluster feature functionality.
"""
import logging
import asyncio
import time
from typing import Set, Optional, List
from cassandra.cluster import Session # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature, get_supported_features, get_enabled_features
from test.cluster.util import reconnect_driver
import pytest
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# pytest applies a module-level `pytestmark` to every test in this file.
# NOTE(review): presumably this marker makes the harness provide a 3-rack
# cluster - confirm against the marker's definition.
pytestmark = pytest.mark.prepare_3_racks_cluster
# Name of the cluster feature the tests look for in each node's lists of
# supported / enabled features.
TEST_FEATURE_NAME = "TEST_ONLY_FEATURE"
# Name of the error injection that the tests add to (or remove from) a node's
# startup injections in order to toggle its support for the test feature.
TEST_FEATURE_ENABLE_ERROR_INJECTION = "features_enable_test_feature"
# Key of the node configuration entry that holds the list of error injections
# enabled at startup.
ERROR_INJECTIONS_AT_STARTUP_CONFIG_KEY = "error_injections_at_startup"
async def get_error_injections_enabled_at_startup(manager: ManagerClient, srv: ServerInfo) -> Set[str]:
    """Return the error injections configured to be enabled at startup on `srv`.

    The value is read from the server's configuration under
    ERROR_INJECTIONS_AT_STARTUP_CONFIG_KEY. A missing or empty entry yields
    an empty set. The entry must be a list of strings, otherwise an
    AssertionError is raised.
    """
    # TODO: An "error injection enabled at startup" might not be a string, but a dictionary
    #       with some options. The tests in this module only use strings, but it's worth
    #       keeping that in mind here in case dicts start being used.
    node_config = await manager.server_get_config(srv.server_id)
    configured = node_config.get(ERROR_INJECTIONS_AT_STARTUP_CONFIG_KEY)
    if not configured:
        # Treat an absent / empty / None entry as "no injections".
        configured = []

    assert isinstance(configured, list)
    for entry in configured:
        assert isinstance(entry, str)

    return {*configured}
async def change_support_for_test_feature_and_restart(manager: ManagerClient, srvs: List[ServerInfo], enable: bool, expected_error: Optional[str] = None) -> None:
    """Stops all provided nodes, changes their support for test-only feature
       and restarts them.

    The three phases (stop, reconfigure, start) each run concurrently over
    all of `srvs`, and a phase only begins after the previous one finished
    for every node.

    :param manager: client used to stop, reconfigure and start the servers
    :param srvs: servers to reconfigure
    :param enable: True to add the feature-enabling error injection to each
                   server's startup injections, False to remove it
    :param expected_error: forwarded as the second argument of
                           `manager.server_start` for every server
    """
    logger.info("Reconfiguring and restarting nodes %s to %s support for %s",
                srvs, "enable" if enable else "disable", TEST_FEATURE_NAME)

    async def adjust_feature_in_config(srv: ServerInfo) -> None:
        """Rewrite the startup error injections of a single server."""
        injections = await get_error_injections_enabled_at_startup(manager, srv)
        if enable:
            injections.add(TEST_FEATURE_ENABLE_ERROR_INJECTION)
        else:
            # `remove` (unlike `discard`) raises KeyError when the injection
            # is not currently configured on this server.
            injections.remove(TEST_FEATURE_ENABLE_ERROR_INJECTION)
        await manager.server_update_config(srv.server_id, ERROR_INJECTIONS_AT_STARTUP_CONFIG_KEY, list(injections))

    await asyncio.gather(*(manager.server_stop(srv.server_id) for srv in srvs))
    await asyncio.gather(*(adjust_feature_in_config(srv) for srv in srvs))
    await asyncio.gather(*(manager.server_start(srv.server_id, expected_error) for srv in srvs))
@pytest.mark.asyncio
async def test_rolling_upgrade_happy_path(manager: ManagerClient) -> None:
    """Simulates an upgrade of a cluster by doing a rolling restart
       and marking the test-only feature as supported on restarted nodes.

    Before each node is restarted, the test checks that the node does not
    yet advertise the feature as supported and that no node considers the
    feature enabled. Once every node has been restarted with support, the
    feature is expected to become enabled on all of them.
    """
    servers = await manager.running_servers()

    cql = manager.cql
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    for srv in servers:
        host = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]

        # The feature should not be advertised as supported until we enable it
        # via error injection.
        assert TEST_FEATURE_NAME not in await get_supported_features(cql, host)

        # Until all nodes are updated to support the test feature, that feature
        # should not be considered enabled by any node.
        # A distinct loop variable keeps `host` (the node being upgraded)
        # from being shadowed.
        for any_host in hosts:
            assert TEST_FEATURE_NAME not in await get_enabled_features(cql, any_host)

        await change_support_for_test_feature_and_restart(manager, [srv], enable=True)

        host = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]
        assert TEST_FEATURE_NAME in await get_supported_features(cql, host)

    # The feature should become enabled on all nodes soon.
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logger.info("Waiting until %s is enabled on all nodes", TEST_FEATURE_NAME)
    await asyncio.gather(*(wait_for_feature(TEST_FEATURE_NAME, cql, h, time.time() + 60) for h in hosts))
@pytest.mark.asyncio
async def test_downgrade_after_partial_upgrade(manager: ManagerClient) -> None:
    """Simulates a partial upgrade of a cluster by enabling the test features
       in all nodes but one, then downgrading the upgraded nodes.

    While one node still lacks support, no node may report the feature as
    enabled; after the downgrade, no downgraded node may report it as
    supported.
    """
    servers = await manager.running_servers()
    # Every server except the first gets upgraded. Slicing an empty list
    # already yields an empty list, so no explicit length guard is needed.
    upgrading_servers = servers[1:]

    cql = manager.cql

    # Upgrade
    for srv in upgrading_servers:
        await change_support_for_test_feature_and_restart(manager, [srv], enable=True)
        host = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]
        assert TEST_FEATURE_NAME in await get_supported_features(cql, host)

    # There is one node that is not upgraded. The feature should not be enabled.
    # Resolve the host of each server in turn so that every node is actually
    # checked, instead of re-checking whichever host the upgrade loop left behind.
    for srv in servers:
        host = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]
        assert TEST_FEATURE_NAME not in await get_enabled_features(cql, host)

    # Downgrade, in reverse order
    for srv in reversed(upgrading_servers):
        await change_support_for_test_feature_and_restart(manager, [srv], enable=False)
        host = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]
        assert TEST_FEATURE_NAME not in await get_supported_features(cql, host)
@pytest.mark.asyncio
async def test_joining_old_node_fails(manager: ManagerClient) -> None:
    """Upgrades the cluster to enable a new feature. Then, it first tries to
       add a new node without the feature, and then replace an existing node
       with a new node without the feature. The new node should fail to join
       the cluster in both cases
    """
    # Either message may show up on the joining node: the join rejection or
    # the notification of being banned. Both are accepted.
    # NOTE(review): the `|` presumably relies on `expected_error` being
    # matched as a regular expression - confirm in ManagerClient.server_start.
    join_failure = "Feature check failed|received notification of being banned from the cluster from"

    # Restart all servers at once and mark the test feature as supported
    servers = await manager.running_servers()
    await change_support_for_test_feature_and_restart(manager, servers, enable=True)
    await manager.servers_see_each_other(servers)

    # Workaround for scylladb/python-driver#230 - the driver might not
    # reconnect after all nodes are stopped at once.
    # The returned session is not bound here: the session used below is
    # read from `manager.cql` right after the reconnect.
    await reconnect_driver(manager)

    # Wait until the feature is considered enabled by all nodes
    cql = manager.cql
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logger.info("Waiting until %s is enabled on all nodes", TEST_FEATURE_NAME)
    await asyncio.gather(*(wait_for_feature(TEST_FEATURE_NAME, cql, h, time.time() + 60) for h in hosts))

    # Try to add a node that doesn't support the feature - should fail
    new_server_info = await manager.server_add(start=False, property_file=servers[0].property_file())
    await manager.server_start(new_server_info.server_id, expected_error=join_failure)

    # Try to replace with a node that doesn't support the feature - should fail
    await manager.server_stop_gracefully(servers[0].server_id)
    replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=False)
    new_server_info = await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[0].property_file())
    await manager.server_start(new_server_info.server_id, expected_error=join_failure)
@pytest.mark.asyncio
async def test_downgrade_after_successful_upgrade_fails(manager: ManagerClient) -> None:
    """Upgrades the cluster to enable the test feature. Then, shuts down all nodes,
       disables support for the feature, then restarts all nodes. All nodes
       should fail to start as none of them support the test feature.
    """
    # Restart all servers at once and mark the test feature as supported
    servers = await manager.running_servers()
    await change_support_for_test_feature_and_restart(manager, servers, enable=True)

    # Workaround for scylladb/python-driver#230 - the driver might not
    # reconnect after all nodes are stopped at once.
    cql = await reconnect_driver(manager)

    # Wait until the feature is considered enabled by all nodes
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logger.info("Waiting until %s is enabled on all nodes", TEST_FEATURE_NAME)
    await asyncio.gather(*(wait_for_feature(TEST_FEATURE_NAME, cql, h, time.time() + 60) for h in hosts))

    # Stop all servers to disable the feature, then restart - all should fail
    await change_support_for_test_feature_and_restart(
        manager,
        servers,
        enable=False,
        expected_error=f"Feature '{TEST_FEATURE_NAME}' was previously enabled in the cluster",
    )
@pytest.mark.skip(reason="issue #14194")
@pytest.mark.asyncio
async def test_partial_upgrade_can_be_finished_with_removenode(manager: ManagerClient) -> None:
    """Upgrades all but one node in the cluster to enable the test feature.
       Then, the last one is shut down and removed via `nodetool removenode`.
       After that, the test only feature should be enabled on all nodes.
    """
    # Restart all servers but the last one at once and mark the test feature as supported
    servers = await manager.running_servers()
    upgraded_servers = servers[:-1]
    not_upgraded_server = servers[-1]
    await change_support_for_test_feature_and_restart(manager, upgraded_servers, enable=True)

    # All servers should see each other as alive
    for srv in servers:
        await manager.server_sees_others(srv.server_id, len(servers) - 1)

    # The feature should not be enabled yet on any node
    cql = manager.get_cql()
    for srv in servers:
        host = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]
        assert TEST_FEATURE_NAME not in await get_enabled_features(cql, host)

    # Remove the last node; the removal is requested through the first server
    await manager.server_stop(not_upgraded_server.server_id)
    await manager.remove_node(servers[0].server_id, not_upgraded_server.server_id)

    # The feature should eventually become enabled
    hosts = await wait_for_cql_and_get_hosts(cql, upgraded_servers, time.time() + 60)
    logger.info("Waiting until %s is enabled on all nodes", TEST_FEATURE_NAME)
    await asyncio.gather(*(wait_for_feature(TEST_FEATURE_NAME, cql, h, time.time() + 300) for h in hosts))