Merge 'Backport 5.2 test.py stability/UX improvements' from Kamil Braun

Backport the following improvements to test.py topology tests for CI stability:
- https://github.com/scylladb/scylladb/pull/12652
- https://github.com/scylladb/scylladb/pull/12630
- https://github.com/scylladb/scylladb/pull/12619
- https://github.com/scylladb/scylladb/pull/12686
- picked from https://github.com/scylladb/scylladb/pull/12726: 9ceb6aba81
- picked from https://github.com/scylladb/scylladb/pull/12173: fc60484422
- https://github.com/scylladb/scylladb/pull/12765
- https://github.com/scylladb/scylladb/pull/12804
- https://github.com/scylladb/scylladb/pull/13342
- https://github.com/scylladb/scylladb/pull/13589
- picked from https://github.com/scylladb/scylladb/pull/13135: 7309a1bd6b
- picked from https://github.com/scylladb/scylladb/pull/13134: 21b505e67c, a4411e9ec4, c1d0ee2bce, 8e3392c64f, 794d0e4000, e407956e9f
- https://github.com/scylladb/scylladb/pull/13271
- https://github.com/scylladb/scylladb/pull/13399
- picked from https://github.com/scylladb/scylladb/pull/12699: 3508a4e41e, 08d754e13f, 62a945ccd5, 041ee3ffdd
- https://github.com/scylladb/scylladb/pull/13438 (but skipped the test_mutation_schema_change.py fix since I didn't backport this new test)
- https://github.com/scylladb/scylladb/pull/13427
- https://github.com/scylladb/scylladb/pull/13756
- https://github.com/scylladb/scylladb/pull/13789
- https://github.com/scylladb/scylladb/pull/13933 (but skipped the test_snapshot.py fix since I didn't backport this new test)

Closes #14215

* github.com:scylladb/scylladb:
  test: pylib: fix `read_barrier` implementation
  test: pylib: random_tables: perform read barrier in `verify_schema`
  test: issue a read barrier before checking ring consistency
  Merge 'scylla_cluster.py: fix read_last_line' from Gusev Petr
  test/pylib: ManagerClient helpers to wait for...
  test: pylib: Add a way to create cql connections with particular coordinators
  test/pylib: get gossiper alive endpoints
  test/topology: default replication factor 3
  test/pylib: configurable replication factor
  scylla_cluster.py: optimize node logs reading
  test/pylib: RandomTables.add_column with value column
  scylla_cluster.py: add start flag to server_add
  ServerInfo: drop host_id
  scylla_cluster.py: add config to server_add
  scylla_cluster.py: add expected_error to server_start
  scylla_cluster.py: ScyllaServer.start, refactor error reporting
  scylla_cluster.py: fix ScyllaServer.start, reset cmd if start failed
  test: improve logging in ScyllaCluster
  test: topology smp test with custom cluster
  test/pylib: topology: support clusters of initial size 0
  Merge 'test/pylib: split and refactor topology tests' from Alecco
  Merge 'test/pylib: use larger timeout for decommission/removenode' from Kamil Braun
  test: Increase START_TIMEOUT
  test/pylib: one-shot error injection helper
  test: topology: wait for token ring/group 0 consistency after decommission
  test: topology: verify that group 0 and token ring are consistent
  Merge 'pytest: start after ungraceful stop' from Alecco
  Merge 'test.py: improve test failure handling' from Kamil Braun
Committed by Botond Dénes, 2023-06-15 07:19:39 +03:00
27 changed files with 944 additions and 529 deletions

test.py

@@ -696,7 +696,7 @@ class CQLApprovalTest(Test):
logger.info("Server log:\n%s", self.server_log)
# TODO: consider dirty_on_exception=True
async with self.suite.clusters.instance(False, logger) as cluster:
async with (cm := self.suite.clusters.instance(False, logger)) as cluster:
try:
cluster.before_test(self.uname)
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
@@ -706,7 +706,8 @@ class CQLApprovalTest(Test):
self.is_before_test_ok = True
cluster.take_log_savepoint()
self.is_executed_ok = await run_test(self, options, env=self.env)
cluster.after_test(self.uname)
cluster.after_test(self.uname, self.is_executed_ok)
cm.dirty = cluster.is_dirty
self.is_after_test_ok = True
if self.is_executed_ok is False:
@@ -860,7 +861,7 @@ class PythonTest(Test):
self.is_before_test_ok = True
cluster.take_log_savepoint()
status = await run_test(self, options)
cluster.after_test(self.uname)
cluster.after_test(self.uname, status)
self.is_after_test_ok = True
self.success = status
except Exception as e:
@@ -874,9 +875,7 @@ class PythonTest(Test):
print("Test {} post-check failed: {}".format(self.name, str(e)))
print("Server log of the first server:\n{}".format(self.server_log))
logger.info(f"Discarding cluster after failed test %s...", self.name)
await self.suite.clusters.put(cluster, is_dirty=True)
else:
await self.suite.clusters.put(cluster, is_dirty=False)
await self.suite.clusters.put(cluster, is_dirty=cluster.is_dirty)
logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
return self

test/pylib/internal_types.py

@@ -18,6 +18,5 @@ class ServerInfo(NamedTuple):
"""Server id (test local) and IP address"""
server_id: ServerNum
ip_addr: IPAddress
host_id: HostID
def __str__(self):
return f"Server({self.server_id}, {self.ip_addr}, {self.host_id})"
return f"Server({self.server_id}, {self.ip_addr})"

test/pylib/manager_client.py

@@ -15,7 +15,7 @@ import logging
from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient
from test.pylib.util import wait_for
from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer
from cassandra.cluster import Session as CassandraSession # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import Cluster as CassandraCluster # type: ignore # pylint: disable=no-name-in-module
import aiohttp
@@ -48,10 +48,11 @@ class ManagerClient():
"""Close driver"""
self.driver_close()
async def driver_connect(self) -> None:
async def driver_connect(self, server: Optional[ServerInfo] = None) -> None:
"""Connect to cluster"""
if self.con_gen is not None:
servers = [s_info.ip_addr for s_info in await self.running_servers()]
targets = [server] if server else await self.running_servers()
servers = [s_info.ip_addr for s_info in targets]
logger.debug("driver connecting to %s", servers)
self.ccluster = self.con_gen(servers, self.port, self.use_ssl)
self.cql = self.ccluster.connect()
@@ -81,15 +82,17 @@ class ManagerClient():
logger.info(f"Using cluster: {cluster_str} for test {test_case_name}")
except aiohttp.ClientError as exc:
raise RuntimeError(f"Failed before test check {exc}") from exc
if self.cql is None:
servers = await self.running_servers()
if self.cql is None and servers:
# TODO: if cluster is not up yet due to taking long and HTTP timeout, wait for it
# await self._wait_for_cluster()
await self.driver_connect() # Connect driver to new cluster
async def after_test(self, test_case_name: str) -> None:
async def after_test(self, test_case_name: str, success: bool) -> None:
"""Tell harness this test finished"""
logger.debug("after_test for %s", test_case_name)
await self.client.get(f"/cluster/after-test")
logger.debug("after_test for %s (success: %s)", test_case_name, success)
cluster_str = await self.client.get_text(f"/cluster/after-test/{success}")
logger.info("Cluster after test %s: %s", test_case_name, cluster_str)
async def is_manager_up(self) -> bool:
"""Check if Manager server is up"""
@@ -118,7 +121,7 @@ class ManagerClient():
except RuntimeError as exc:
raise Exception("Failed to get list of running servers") from exc
assert isinstance(server_info_list, list), "running_servers got unknown data type"
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), HostID(info[2]))
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]))
for info in server_info_list]
async def mark_dirty(self) -> None:
@@ -136,37 +139,47 @@ class ManagerClient():
logger.debug("ManagerClient stopping gracefully %s", server_id)
await self.client.get_text(f"/cluster/server/{server_id}/stop_gracefully")
async def server_start(self, server_id: ServerNum) -> None:
"""Start specified server"""
async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None,
wait_others: int = 0, wait_interval: float = 45) -> None:
"""Start specified server and optionally wait for it to learn of other servers"""
logger.debug("ManagerClient starting %s", server_id)
await self.client.get_text(f"/cluster/server/{server_id}/start")
params = {'expected_error': expected_error} if expected_error is not None else None
await self.client.get_text(f"/cluster/server/{server_id}/start", params=params)
await self.server_sees_others(server_id, wait_others, interval = wait_interval)
self._driver_update()
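# Editor's note: an illustrative usage sketch (not part of this diff) of the
# extended `server_start` API; the error string below is hypothetical.
async def _example_restart(manager, server_id):
    await manager.server_stop_gracefully(server_id)
    # Expect startup to fail with a given message in the node's log; the
    # harness then keeps the server in the `stopped` set for a later retry.
    await manager.server_start(server_id, expected_error="Startup failed")

async def _example_restart_and_wait(manager, server_id):
    await manager.server_stop_gracefully(server_id)
    # Start normally, then poll the gossiper until this node sees at least
    # two other live nodes (for up to `wait_interval` seconds).
    await manager.server_start(server_id, wait_others=2, wait_interval=45)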
async def server_restart(self, server_id: ServerNum) -> None:
"""Restart specified server"""
async def server_restart(self, server_id: ServerNum, wait_others: int = 0,
wait_interval: float = 45) -> None:
"""Restart specified server and optionally wait for it to learn of other servers"""
logger.debug("ManagerClient restarting %s", server_id)
await self.client.get_text(f"/cluster/server/{server_id}/restart")
await self.server_sees_others(server_id, wait_others, interval = wait_interval)
self._driver_update()
async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None) -> ServerInfo:
async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo:
"""Add a new server"""
try:
data: dict[str, Any] = {}
data: dict[str, Any] = {'start': start}
if replace_cfg:
data['replace_cfg'] = replace_cfg._asdict()
if cmdline:
data['cmdline'] = cmdline
server_info = await self.client.put_json("/cluster/addserver", data, response_type="json")
if config:
data['config'] = config
server_info = await self.client.put_json("/cluster/addserver", data, response_type="json",
timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
except Exception as exc:
raise Exception("Failed to add server") from exc
try:
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
IPAddress(server_info["ip_addr"]),
HostID(server_info["host_id"]))
IPAddress(server_info["ip_addr"]))
except Exception as exc:
raise RuntimeError(f"server_add got invalid server data {server_info}") from exc
logger.debug("ManagerClient added %s", s_info)
self._driver_update()
if self.cql:
self._driver_update()
else:
await self.driver_connect()
return s_info
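# Editor's note: an illustrative sketch (not part of this diff) of the new
# `config` and `start` parameters of `server_add`; the config key used here
# is only an example.
async def _example_add_servers(manager):
    # Add and start a node with an extra scylla.yaml option and custom --smp.
    s1 = await manager.server_add(cmdline=['--smp', '3'],
                                  config={'ring_delay_ms': '0'})
    # Install a node without starting it; it lands in the cluster's
    # `stopped` set and can be started later with `server_start`.
    s2 = await manager.server_add(start=False)
    await manager.server_start(s2.server_id)
    return s1, s2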
async def remove_node(self, initiator_id: ServerNum, server_id: ServerNum,
@@ -174,13 +187,15 @@ class ManagerClient():
"""Invoke remove node Scylla REST API for a specified server"""
logger.debug("ManagerClient remove node %s on initiator %s", server_id, initiator_id)
data = {"server_id": server_id, "ignore_dead": ignore_dead}
await self.client.put_json(f"/cluster/remove-node/{initiator_id}", data)
await self.client.put_json(f"/cluster/remove-node/{initiator_id}", data,
timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
self._driver_update()
async def decommission_node(self, server_id: ServerNum) -> None:
"""Tell a node to decommission with Scylla REST API"""
logger.debug("ManagerClient decommission %s", server_id)
await self.client.get_text(f"/cluster/decommission-node/{server_id}")
await self.client.get_text(f"/cluster/decommission-node/{server_id}",
timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
self._driver_update()
async def server_get_config(self, server_id: ServerNum) -> dict[str, object]:
@@ -224,3 +239,32 @@ class ManagerClient():
except Exception as exc:
raise Exception(f"Failed to get local host id address for server {server_id}") from exc
return HostID(host_id)
async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.):
"""Wait till a server sees a minimum given count of other servers"""
if count < 1:
return
server_ip = await self.get_host_ip(server_id)
async def _sees_min_others():
alive_nodes = await self.api.get_alive_endpoints(server_ip)
if len(alive_nodes) > count:
return True
await wait_for(_sees_min_others, time() + interval, period=.1)
async def server_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
interval: float = 45.):
"""Wait till a server sees another specific server IP as alive"""
async def _sees_another_server():
alive_nodes = await self.api.get_alive_endpoints(server_ip)
if other_ip in alive_nodes:
return True
await wait_for(_sees_another_server, time() + interval, period=.1)
async def server_not_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
interval: float = 45.):
"""Wait till a server sees another specific server IP as dead"""
async def _not_sees_another_server():
alive_nodes = await self.api.get_alive_endpoints(server_ip)
if not other_ip in alive_nodes:
return True
await wait_for(_not_sees_another_server, time() + interval, period=.1)
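# Editor's note: an illustrative sketch (not part of this diff) combining the
# gossiper-based wait helpers above to observe a liveness transition.
async def _example_observe_stop(manager):
    servers = await manager.running_servers()
    a, b = servers[0], servers[1]
    # Both nodes are up, so `a` should see `b` as alive.
    await manager.server_sees_other_server(a.ip_addr, b.ip_addr)
    await manager.server_stop_gracefully(b.server_id)
    # After the stop, `a` should eventually report `b` as dead.
    await manager.server_not_sees_other_server(a.ip_addr, b.ip_addr)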

test/pylib/random_tables.py

@@ -35,10 +35,12 @@ import itertools
import logging
import random
import uuid
import time
from typing import Optional, Type, List, Set, Union, TYPE_CHECKING
if TYPE_CHECKING:
from cassandra.cluster import Session as CassandraSession # type: ignore
from test.pylib.manager_client import ManagerClient
from test.pylib.util import get_available_host, read_barrier
logger = logging.getLogger('random_tables')
@@ -173,10 +175,11 @@ class RandomTable():
return await self.manager.cql.run_async(cql_stmt)
async def add_column(self, name: str = None, ctype: Type[ValueType] = None, column: Column = None):
"""Add a value column to the table"""
if column is not None:
assert type(column) is Column, "Wrong column type to add_column"
else:
name = name if name is not None else f"c_{self.next_clustering_id():02}"
name = name if name is not None else f"v_{self.next_value_id():02}"
ctype = ctype if ctype is not None else TextType
column = Column(name, ctype=ctype)
self.columns.append(column)
@@ -244,7 +247,8 @@ class RandomTable():
class RandomTables():
"""A list of managed random tables"""
def __init__(self, test_name: str, manager: ManagerClient, keyspace: str):
def __init__(self, test_name: str, manager: ManagerClient, keyspace: str,
replication_factor: int):
self.test_name = test_name
self.manager = manager
self.keyspace = keyspace
@@ -252,7 +256,8 @@ class RandomTables():
self.removed_tables: List[RandomTable] = []
assert self.manager.cql is not None
self.manager.cql.execute(f"CREATE KEYSPACE {keyspace} WITH REPLICATION = "
"{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
"{ 'class' : 'NetworkTopologyStrategy', "
f"'replication_factor' : {replication_factor} }}")
async def add_tables(self, ntables: int = 1, ncolumns: int = 5, if_not_exists: bool = False) -> None:
"""Add random tables to the list.
@@ -297,7 +302,7 @@ class RandomTables():
assert self.manager.cql is not None
self.manager.cql.execute(f"DROP KEYSPACE {self.keyspace}")
async def verify_schema(self, table: Union[RandomTable, str] = None) -> None:
async def verify_schema(self, table: Union[RandomTable, str] = None, do_read_barrier: bool = True) -> None:
"""Verify schema of all active managed random tables"""
if isinstance(table, RandomTable):
tables = {table.name}
@@ -315,8 +320,18 @@ class RandomTables():
f"WHERE keyspace_name = '{self.keyspace}'"
logger.debug(cql_stmt1)
assert self.manager.cql is not None
res1 = {row.table_name for row in await self.manager.cql.run_async(cql_stmt1)}
cql = self.manager.cql
assert cql
host = await get_available_host(cql, time.time() + 60)
if do_read_barrier:
# Issue a read barrier on some node and then keep using that node to do the queries.
# This ensures that the queries return recent data (at least all data committed
# when `verify_schema` was called).
await read_barrier(cql, host)
res1 = {row.table_name for row in await cql.run_async(cql_stmt1, host=host)}
assert not tables - res1, f"Tables {tables - res1} not present"
for table_name in tables:
@@ -326,8 +341,7 @@ class RandomTables():
cql_stmt2 = f"SELECT column_name, position, kind, type FROM system_schema.columns " \
f"WHERE keyspace_name = '{self.keyspace}' AND table_name = '{table_name}'"
logger.debug(cql_stmt2)
assert self.manager.cql is not None
res2 = {row.column_name: row for row in await self.manager.cql.run_async(cql_stmt2)}
res2 = {row.column_name: row for row in await cql.run_async(cql_stmt2, host=host)}
assert res2.keys() == cols.keys(), f"Column names for {table_name} do not match " \
f"expected ({', '.join(cols.keys())}) " \
f"got ({', '.join(res2.keys())})"

test/pylib/rest_client.py

@@ -94,15 +94,15 @@ class RESTClient(metaclass=ABCMeta):
async def post(self, resource_uri: str, host: Optional[str] = None,
port: Optional[int] = None, params: Optional[Mapping[str, str]] = None,
json: Mapping = None) -> None:
json: Mapping = None, timeout: Optional[float] = None) -> None:
await self._fetch("POST", resource_uri, host = host, port = port, params = params,
json = json)
json = json, timeout = timeout)
async def put_json(self, resource_uri: str, data: Mapping, host: Optional[str] = None,
port: Optional[int] = None, params: Optional[dict[str, str]] = None,
response_type: Optional[str] = None) -> Any:
response_type: Optional[str] = None, timeout: Optional[float] = None) -> Any:
ret = await self._fetch("PUT", resource_uri, response_type = response_type, host = host,
port = port, params = params, json = data)
port = port, params = params, json = data, timeout = timeout)
return ret
async def delete(self, resource_uri: str, host: Optional[str] = None,
@@ -161,19 +161,20 @@ class ScyllaRESTAPIClient():
return result
async def remove_node(self, initiator_ip: IPAddress, host_id: HostID,
ignore_dead: list[IPAddress]) -> None:
ignore_dead: list[IPAddress], timeout: float) -> None:
"""Initiate remove node of host_id in initiator initiator_ip"""
logger.info("remove_node for %s on %s", host_id, initiator_ip)
await self.client.post("/storage_service/remove_node",
params = {"host_id": host_id,
"ignore_nodes": ",".join(ignore_dead)},
host = initiator_ip)
host = initiator_ip, timeout = timeout)
logger.debug("remove_node for %s finished", host_id)
async def decommission_node(self, host_ip: str) -> None:
async def decommission_node(self, host_ip: str, timeout: float) -> None:
"""Initiate decommission node of host_ip"""
logger.debug("decommission_node %s", host_ip)
await self.client.post("/storage_service/decommission", host = host_ip)
await self.client.post("/storage_service/decommission", host = host_ip,
timeout = timeout)
logger.debug("decommission_node %s finished", host_ip)
async def get_gossip_generation_number(self, node_ip: str, target_ip: str) -> int:
@@ -189,6 +190,12 @@ class ScyllaRESTAPIClient():
assert(type(data) == list)
return data
async def get_alive_endpoints(self, node_ip: str) -> list:
"""Get the list of alive nodes according to `node_ip`."""
data = await self.client.get_json(f"/gossiper/endpoint/live", host=node_ip)
assert(type(data) == list)
return data
async def enable_injection(self, node_ip: str, injection: str, one_shot: bool) -> None:
"""Enable error injection named `injection` on `node_ip`. Depending on `one_shot`,
the injection will be executed only once or every time the process passes the injection point.
@@ -217,12 +224,13 @@ class ScyllaRESTAPIClient():
@asynccontextmanager
async def inject_error(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str,
one_shot: bool):
async def inject_error(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str):
"""Attempts to inject an error. Works only in specific build modes: debug,dev,sanitize.
It will trigger a test to be skipped if attempting to enable an injection has no effect.
This is a context manager for enabling and disabling when done, therefore it can't be
used for one shot.
"""
await api.enable_injection(node_ip, injection, one_shot)
await api.enable_injection(node_ip, injection, False)
enabled = await api.get_enabled_injections(node_ip)
logging.info(f"Error injections enabled on {node_ip}: {enabled}")
if not enabled:
@@ -232,3 +240,15 @@ async def inject_error(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection:
finally:
logger.info(f"Disabling error injection {injection}")
await api.disable_injection(node_ip, injection)
async def inject_error_one_shot(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str):
"""Attempts to inject an error. Works only in specific build modes: debug,dev,sanitize.
It will trigger a test to be skipped if attempting to enable an injection has no effect.
This is a one-shot injection enable.
"""
await api.enable_injection(node_ip, injection, True)
enabled = await api.get_enabled_injections(node_ip)
logging.info(f"Error injections enabled on {node_ip}: {enabled}")
if not enabled:
pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
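# Editor's note: an illustrative sketch (not part of this diff) contrasting
# the two injection styles; 'example_injection' is a hypothetical name.
async def _example_injections(api, node_ip):
    # Scoped injection: enabled on entry, disabled when the block exits.
    async with inject_error(api, node_ip, 'example_injection'):
        pass  # exercise the code path that hits the injection point
    # One-shot injection: fires on the first pass, then disarms itself,
    # so no explicit disable step is needed.
    await inject_error_one_shot(api, node_ip, 'example_injection')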

test/pylib/scylla_cluster.py

@@ -25,7 +25,7 @@ from io import BufferedWriter
from test.pylib.host_registry import Host, HostRegistry
from test.pylib.pool import Pool
from test.pylib.rest_client import ScyllaRESTAPIClient, HTTPError
from test.pylib.util import LogPrefixAdapter
from test.pylib.util import LogPrefixAdapter, read_last_line
from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo
import aiohttp
import aiohttp.web
@@ -99,6 +99,8 @@ def make_scylla_conf(workdir: pathlib.Path, host_addr: str, seed_addrs: List[str
'permissions_update_interval_in_ms': 100,
'permissions_validity_in_ms': 100,
'force_schema_commit_log': True,
}
# Seastar options can not be passed through scylla.yaml, use command line
@@ -185,7 +187,9 @@ class ScyllaServer:
"""Starts and handles a single Scylla server, managing logs, checking if responsive,
and cleanup when finished."""
# pylint: disable=too-many-instance-attributes
START_TIMEOUT = 300 # seconds
# in seconds, used for topology operations such as bootstrap or decommission
TOPOLOGY_TIMEOUT = 1000
start_time: float
sleep_interval: float
log_file: BufferedWriter
@@ -224,11 +228,7 @@ class ScyllaServer:
async def install_and_start(self, api: ScyllaRESTAPIClient) -> None:
"""Setup and start this server"""
try:
await self.install()
except:
await self.uninstall()
raise
await self.install()
self.logger.info("starting server at host %s in %s...", self.ip_addr, self.workdir.name)
@@ -263,11 +263,19 @@ class ScyllaServer:
# Cleanup any remains of the previously running server in this path
shutil.rmtree(self.workdir, ignore_errors=True)
self.workdir.mkdir(parents=True, exist_ok=True)
self.config_filename.parent.mkdir(parents=True, exist_ok=True)
self._write_config_file()
try:
self.workdir.mkdir(parents=True, exist_ok=True)
self.config_filename.parent.mkdir(parents=True, exist_ok=True)
self._write_config_file()
self.log_file = self.log_filename.open("wb")
except:
try:
shutil.rmtree(self.workdir)
except FileNotFoundError:
pass
self.log_filename.unlink(missing_ok=True)
raise
def get_config(self) -> dict[str, object]:
"""Return the contents of conf/scylla.yaml as a dict."""
@@ -317,7 +325,7 @@ class ScyllaServer:
# initializing. When the role is ready, queries begin to
# work, so rely on this "side effect".
profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([self.ip_addr]),
request_timeout=self.START_TIMEOUT)
request_timeout=self.TOPOLOGY_TIMEOUT)
connected = False
try:
# In a cluster setup, it's possible that the CQL
@@ -358,7 +366,7 @@ class ScyllaServer:
return False
# Any other exception may indicate a problem, and is passed to the caller.
async def start(self, api: ScyllaRESTAPIClient) -> None:
async def start(self, api: ScyllaRESTAPIClient, expected_error: Optional[str] = None) -> None:
"""Start an installed server. May be used for restarts."""
env = os.environ.copy()
@@ -377,43 +385,45 @@ class ScyllaServer:
sleep_interval = 0.1
cql_up_state = CqlUpState.NOT_CONNECTED
while time.time() < self.start_time + self.START_TIMEOUT:
def report_error(message: str):
message += f", server_id {self.server_id}, IP {self.ip_addr}, workdir {self.workdir.name}"
message += f", host_id {self.host_id if hasattr(self, 'host_id') else '<missing>'}"
message += f", cql [{'connected' if cql_up_state == CqlUpState.CONNECTED else 'not connected'}]"
if expected_error is not None:
message += f", the node log was expected to contain the string [{expected_error}]"
self.logger.error(message)
self.logger.error("last line of %s:\n%s", self.log_filename, read_last_line(self.log_filename))
log_handler = logging.getLogger().handlers[0]
if hasattr(log_handler, 'baseFilename'):
logpath = log_handler.baseFilename # type: ignore
else:
logpath = "?"
raise RuntimeError(message + "\nCheck the log files:\n"
f"{logpath}\n"
f"{self.log_filename}")
while time.time() < self.start_time + self.TOPOLOGY_TIMEOUT:
if self.cmd.returncode:
with self.log_filename.open('r') as log_file:
self.logger.error("failed to start server at host %s in %s",
self.ip_addr, self.workdir.name)
self.logger.error("last line of %s:", self.log_filename)
log_file.seek(0, 0)
self.logger.error(log_file.readlines()[-1].rstrip())
log_handler = logging.getLogger().handlers[0]
if hasattr(log_handler, 'baseFilename'):
logpath = log_handler.baseFilename # type: ignore
else:
logpath = "?"
raise RuntimeError(f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}.\n"
"Check the log files:\n"
f"{logpath}\n"
f"{self.log_filename}")
self.cmd = None
if expected_error is not None:
with self.log_filename.open('r') as log_file:
for line in log_file:
if expected_error in line:
return
report_error("the node startup failed, but the log file doesn't contain the expected error")
report_error("failed to start the node")
if hasattr(self, "host_id") or await self.get_host_id(api):
cql_up_state = await self.cql_is_up()
if cql_up_state == CqlUpState.QUERIED:
if expected_error is not None:
report_error("the node started, but was expected to fail with the expected error")
return
# Sleep and retry
await asyncio.sleep(sleep_interval)
err = f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}."
if hasattr(self, "host_id"):
err += f" Managed to obtain the server's Host ID ({self.host_id})"
if cql_up_state == CqlUpState.CONNECTED:
err += " and to connect the CQL driver, but failed to execute a query."
else:
err += " but failed to connect the CQL driver."
else:
err += " Failed to obtain the server's Host ID."
err += f"\nCheck server log at {self.log_filename}."
raise RuntimeError(err)
report_error('failed to start the node, timeout reached')
async def force_schema_migration(self) -> None:
"""This is a hack to change schema hash on an existing cluster node
@@ -422,7 +432,7 @@ class ScyllaServer:
previous state propagation was missed."""
auth = PlainTextAuthProvider(username='cassandra', password='cassandra')
profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(self.seeds),
request_timeout=self.START_TIMEOUT)
request_timeout=self.TOPOLOGY_TIMEOUT)
with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
contact_points=self.seeds,
auth_provider=auth,
@@ -566,7 +576,8 @@ class ScyllaCluster:
try:
for _ in range(self.replicas):
await self.add_server()
self.keyspace_count = self._get_keyspace_count()
if self.replicas > 0:
self.keyspace_count = self._get_keyspace_count()
except Exception as exc:
# If start fails, swallow the error to throw later,
# at test time.
@@ -578,7 +589,7 @@ class ScyllaCluster:
async def uninstall(self) -> None:
"""Stop running servers and uninstall all servers"""
self.is_dirty = True
self.logger.info("Uninstalling cluster")
self.logger.info("Uninstalling cluster %s", self)
await self.stop()
await asyncio.gather(*(srv.uninstall() for srv in self.stopped.values()))
await asyncio.gather(*(self.host_registry.release_host(Host(ip))
@@ -588,6 +599,7 @@ class ScyllaCluster:
"""Release all IPs leased from the host registry by this cluster.
Call this function only if the cluster is stopped and will not be started again."""
assert not self.running
self.logger.info("Cluster %s releases ips %s", self, self.leased_ips)
while self.leased_ips:
ip = self.leased_ips.pop()
await self.host_registry.release_host(Host(ip))
@@ -617,11 +629,11 @@ class ScyllaCluster:
def _seeds(self) -> List[str]:
return [server.ip_addr for server in self.running.values()]
async def add_server(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None) -> ServerInfo:
async def add_server(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo:
"""Add a new server to the cluster"""
self.is_dirty = True
extra_config: dict[str, str] = {}
extra_config: dict[str, str] = config.copy() if config else {}
if replace_cfg:
replaced_id = replace_cfg.replaced_id
assert replaced_id in self.servers, \
@@ -662,7 +674,10 @@ class ScyllaCluster:
try:
server = self.create_server(params)
self.logger.info("Cluster %s adding server...", self)
await server.install_and_start(self.api)
if start:
await server.install_and_start(self.api)
else:
await server.install()
except Exception as exc:
self.logger.error("Failed to start Scylla server at host %s in %s: %s",
ip_addr, server.workdir.name, str(exc))
@@ -670,9 +685,12 @@ class ScyllaCluster:
self.leased_ips.remove(ip_addr)
await self.host_registry.release_host(Host(ip_addr))
raise
self.running[server.server_id] = server
if start:
self.running[server.server_id] = server
else:
self.stopped[server.server_id] = server
self.logger.info("Cluster %s added %s", self, server)
return ServerInfo(server.server_id, server.ip_addr, server.host_id)
return ServerInfo(server.server_id, server.ip_addr)
def endpoint(self) -> str:
"""Get a server id (IP) from running servers"""
@@ -704,9 +722,9 @@ class ScyllaCluster:
stopped = ", ".join(str(server) for server in self.stopped.values())
return f"ScyllaCluster(name: {self.name}, running: {running}, stopped: {stopped})"
def running_servers(self) -> List[Tuple[ServerNum, IPAddress, HostID]]:
def running_servers(self) -> List[Tuple[ServerNum, IPAddress]]:
"""Get a list of tuples of server id and IP address of running servers (and not removed)"""
return [(server.server_id, server.ip_addr, server.host_id) for server in self.running.values()
return [(server.server_id, server.ip_addr) for server in self.running.values()
if server.server_id not in self.removed]
def _get_keyspace_count(self) -> int:
@@ -730,18 +748,26 @@ class ScyllaCluster:
if self.start_exception:
# Mark as dirty so further test cases don't try to reuse this cluster.
self.is_dirty = True
raise self.start_exception
raise Exception(f'Exception when starting cluster {self}:\n{self.start_exception}')
for server in self.running.values():
server.write_log_marker(f"------ Starting test {name} ------\n")
def after_test(self, name) -> None:
"""Check that the cluster is still alive and the test
def after_test(self, name: str, success: bool) -> None:
"""Mark the cluster as dirty after a failed test.
If the cluster is not dirty, check that it's still alive and the test
hasn't left any garbage."""
assert self.start_exception is None
if self._get_keyspace_count() != self.keyspace_count:
raise RuntimeError("Test post-condition failed, "
"the test must drop all keyspaces it creates.")
if not success:
self.logger.debug(f"Test failed using cluster {self.name}, marking the cluster as dirty")
self.is_dirty = True
if self.is_dirty:
self.logger.info(f"The cluster {self.name} is dirty, not checking"
f" keyspace count post-condition")
else:
if self.running and self._get_keyspace_count() != self.keyspace_count:
raise RuntimeError(f"Test post-condition on cluster {self.name} failed, "
f"the test must drop all keyspaces it creates.")
for server in itertools.chain(self.running.values(), self.stopped.values()):
server.write_log_marker(f"------ Ending test {name} ------\n")
@@ -770,9 +796,8 @@ class ScyllaCluster:
self.logger.debug("Cluster %s marking server %s as removed", self, server_id)
self.removed.add(server_id)
async def server_start(self, server_id: ServerNum) -> ActionReturn:
async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None) -> ActionReturn:
"""Start a stopped server"""
self.logger.info("Cluster %s starting server %s", self, server_id)
if server_id in self.running:
return ScyllaCluster.ActionReturn(success=True,
msg=f"{self.running[server_id]} already running")
@@ -780,11 +805,16 @@ class ScyllaCluster:
return ScyllaCluster.ActionReturn(success=False, msg=f"Server {server_id} unknown")
self.is_dirty = True
server = self.stopped.pop(server_id)
self.logger.info("Cluster %s starting server %s ip %s", self,
server_id, server.ip_addr)
server.seeds = self._seeds()
# Put the server in `running` before starting it.
# Starting may fail and if we didn't add it now it might leak.
self.running[server_id] = server
await server.start(self.api)
await server.start(self.api, expected_error)
if expected_error is not None:
self.running.pop(server_id)
self.stopped[server_id] = server
return ScyllaCluster.ActionReturn(success=True, msg=f"{server} started")
async def server_restart(self, server_id: ServerNum) -> ActionReturn:
@@ -930,7 +960,7 @@ class ScyllaClusterManager:
add_get('/cluster/host-ip/{server_id}', self._cluster_server_ip_addr)
add_get('/cluster/host-id/{server_id}', self._cluster_host_id)
add_get('/cluster/before-test/{test_case_name}', self._before_test_req)
add_get('/cluster/after-test', self._after_test)
add_get('/cluster/after-test/{success}', self._after_test)
add_get('/cluster/mark-dirty', self._mark_dirty)
add_get('/cluster/server/{server_id}/stop', self._cluster_server_stop)
add_get('/cluster/server/{server_id}/stop_gracefully', self._cluster_server_stop_gracefully)
@@ -982,13 +1012,16 @@ class ScyllaClusterManager:
async def _after_test(self, _request) -> aiohttp.web.Response:
assert self.cluster is not None
assert self.current_test_case_full_name
self.logger.info("Finished test %s, cluster: %s", self.current_test_case_full_name, self.cluster)
success = _request.match_info["success"] == "True"
self.logger.info("Test %s %s, cluster: %s", self.current_test_case_full_name,
"SUCCEEDED" if success else "FAILED", self.cluster)
try:
self.cluster.after_test(self.current_test_case_full_name)
self.cluster.after_test(self.current_test_case_full_name, success)
finally:
self.current_test_case_full_name = ''
self.is_after_test_ok = True
return aiohttp.web.Response(text="True")
cluster_str = str(self.cluster)
return aiohttp.web.Response(text=cluster_str)
async def _mark_dirty(self, _request) -> aiohttp.web.Response:
"""Mark current cluster dirty"""
@@ -1018,7 +1051,8 @@ class ScyllaClusterManager:
"""Start a specified server (must be stopped)"""
assert self.cluster
server_id = ServerNum(int(request.match_info["server_id"]))
ret = await self.cluster.server_start(server_id)
expected_error = request.query.get("expected_error")
ret = await self.cluster.server_start(server_id, expected_error)
return aiohttp.web.Response(status=200 if ret[0] else 500, text=ret[1])
async def _cluster_server_restart(self, request) -> aiohttp.web.Response:
@@ -1033,10 +1067,9 @@ class ScyllaClusterManager:
assert self.cluster
data = await request.json()
replace_cfg = ReplaceConfig(**data["replace_cfg"]) if "replace_cfg" in data else None
s_info = await self.cluster.add_server(replace_cfg, data.get('cmdline'))
s_info = await self.cluster.add_server(replace_cfg, data.get('cmdline'), data.get('config'), data.get('start', True))
return aiohttp.web.json_response({"server_id" : s_info.server_id,
"ip_addr": s_info.ip_addr,
"host_id": s_info.host_id})
"ip_addr": s_info.ip_addr})
async def _cluster_remove_node(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
"""Run remove node on Scylla REST API for a specified server"""
@@ -1060,7 +1093,8 @@ class ScyllaClusterManager:
# initiate remove
try:
await self.cluster.api.remove_node(initiator.ip_addr, to_remove.host_id, ignore_dead)
await self.cluster.api.remove_node(initiator.ip_addr, to_remove.host_id, ignore_dead,
timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
except RuntimeError as exc:
self.logger.error("_cluster_remove_node failed initiator %s server %s ignore_dead %s, check log at %s",
initiator, to_remove, ignore_dead, initiator.log_filename)
@@ -1079,7 +1113,7 @@ class ScyllaClusterManager:
self.logger.warning("_cluster_decommission_node %s is only running node left", server_id)
server = self.cluster.running[server_id]
try:
await self.cluster.api.decommission_node(server.ip_addr)
await self.cluster.api.decommission_node(server.ip_addr, timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
except RuntimeError as exc:
self.logger.error("_cluster_decommission_node %s, check log at %s", server,
server.log_filename)

test/pylib/util.py

@@ -6,11 +6,15 @@
import time
import asyncio
import logging
import pathlib
import os
import pytest
from typing import Callable, Awaitable, Optional, TypeVar, Generic
from cassandra.cluster import NoHostAvailable, Session # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module
from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo
@@ -76,5 +80,45 @@ async def wait_for_cql_and_get_hosts(cql: Session, servers: list[ServerInfo], de
return hosts
def read_last_line(file_path: pathlib.Path, max_line_bytes = 512):
file_size = os.stat(file_path).st_size
with file_path.open('rb') as f:
f.seek(max(0, file_size - max_line_bytes), os.SEEK_SET)
line_bytes = f.read()
line_str = line_bytes.decode('utf-8', errors='ignore')
linesep = os.linesep
if line_str.endswith(linesep):
line_str = line_str[:-len(linesep)]
linesep_index = line_str.rfind(linesep)
if linesep_index != -1:
line_str = line_str[linesep_index + len(linesep):]
elif file_size > max_line_bytes:
line_str = '...' + line_str
return line_str
async def get_available_host(cql: Session, deadline: float) -> Host:
hosts = cql.cluster.metadata.all_hosts()
async def find_host():
for h in hosts:
try:
await cql.run_async(
"select key from system.local where key = 'local'", host=h)
except NoHostAvailable:
logging.debug(f"get_available_host: {h} not available")
continue
return h
return None
return await wait_for(find_host, deadline)
async def read_barrier(cql: Session, host: Host):
"""To issue a read barrier it is sufficient to attempt dropping a
non-existing table. We need to use `if exists`, otherwise the statement
would fail on prepare/validate step which happens before a read barrier is
performed.
"""
await cql.run_async("drop table if exists nosuchkeyspace.nosuchtable", host = host)
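# Editor's note: an illustrative sketch (not part of this diff) of the
# pattern `verify_schema` uses: pick one live host, issue a read barrier on
# it, then route follow-up queries to that same host so they observe all
# group 0 commands committed before the barrier.
async def _example_read_own_schema_writes(cql: Session, keyspace: str):
    host = await get_available_host(cql, time.time() + 60)
    await read_barrier(cql, host)
    rows = await cql.run_async(
        "SELECT table_name FROM system_schema.tables "
        f"WHERE keyspace_name = '{keyspace}'", host=host)
    return {r.table_name for r in rows}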
unique_name.last_ms = 0

pytest.ini

@@ -0,0 +1,9 @@
# Pytest configuration file. If we don't have one in this directory,
# pytest will look for one in our ancestor directories, and may find
# something irrelevant. So we should have one here, even if empty.
[pytest]
asyncio_mode = auto
log_cli = true
log_format = %(asctime)s.%(msecs)03d %(levelname)s> %(message)s
log_date_format = %H:%M:%S

test/pylib_test/run (new executable file)

@@ -0,0 +1,11 @@
#!/usr/bin/env python3
# Use the run.py library from ../cql-pytest:
import sys
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
import run
success = run.run_pytest(sys.path[0], sys.argv[1:])
run.summary = 'Pylib tests pass' if success else 'Pylib tests failure'
exit(0 if success else 1)

test/pylib_test/suite.yaml

@@ -0,0 +1 @@
type: Run


@@ -0,0 +1,28 @@
import os
import tempfile
import pathlib
from test.pylib.util import read_last_line
def test_read_last_line():
test_cases = [
(b"This is the first line.\nThis is the second line.\nThis is the third line.", 'This is the third line.'),
(b"This is another file.\nIt has a few lines.\nThe last line is what we're interested in.", 'The last line is what we\'re interested in.'),
(b"This file has only one line.", 'This file has only one line.'),
(b"\n", ""),
(b"\n\n\n", ""),
(b"", ""),
(b"abc\n", 'abc'),
(b"abc", '...bc', 2),
(b"lalala\nbububu", "bububu"),
(b"line1\nline2\nline3\n", "...line3", 6),
(b"line1\nline2\nline3", "line3", 6),
(b"line1\nline2\nline3\n", "line3", 7),
(b"\xbe\xbe\xbe\xbebububu\n", "bububu")
]
for test_case in test_cases:
with tempfile.NamedTemporaryFile(dir=os.getenv('TMPDIR', '/tmp')) as f:
f.write(test_case[0])
f.flush()
file_path = pathlib.Path(f.name)
actual = read_last_line(file_path, test_case[2]) if len(test_case) == 3 else read_last_line(file_path)
assert(actual == test_case[1])

test/topology/conftest.py

@@ -18,6 +18,8 @@ from cassandra.cluster import Session, ResponseFuture # type:
from cassandra.cluster import Cluster, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT # type: ignore # pylint: disable=no-name-in-module
from cassandra.policies import RoundRobinPolicy # type: ignore
from cassandra.policies import TokenAwarePolicy # type: ignore
from cassandra.policies import WhiteListRoundRobinPolicy # type: ignore
from cassandra.connection import DRIVER_NAME # type: ignore # pylint: disable=no-name-in-module
from cassandra.connection import DRIVER_VERSION # type: ignore # pylint: disable=no-name-in-module
@@ -37,6 +39,26 @@ def pytest_addoption(parser):
parser.addoption('--ssl', action='store_true',
help='Connect to CQL via an encrypted TLSv1.2 connection')
# This is a constant used in `pytest_runtest_makereport` below to store a flag
# indicating test failure in a stash which can then be accessed from fixtures.
FAILED_KEY = pytest.StashKey[bool]()
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
"""This is a post-test hook execucted by the pytest library.
Use it to access the test result and store a flag indicating failure
so we can later retrieve it in our fixtures like `manager`.
`item.stash` is the same stash as `request.node.stash` (in the `request`
fixture provided by pytest).
"""
outcome = yield
report = outcome.get_result()
item.stash[FAILED_KEY] = report.when == "call" and report.failed
# Change default pytest-asyncio event_loop fixture scope to session to
# allow async fixtures with scope larger than function. (e.g. manager fixture)
# See https://github.com/pytest-dev/pytest-asyncio/issues/68
@@ -105,6 +127,11 @@ def cluster_con(hosts: List[IPAddress], port: int, use_ssl: bool):
# See issue #11289.
# NOTE: request_timeout is the main cause of timeouts, even if logs say heartbeat
request_timeout=200)
whitelist_profile = ExecutionProfile(
load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)),
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
request_timeout=200)
if use_ssl:
# Scylla does not support any earlier TLS protocol. If you try,
# you will get mysterious EOF errors (see issue #6971) :-(
@@ -112,7 +139,7 @@ def cluster_con(hosts: List[IPAddress], port: int, use_ssl: bool):
else:
ssl_context = None
return Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
return Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile, 'whitelist': whitelist_profile},
contact_points=hosts,
port=port,
# TODO: make the protocol version an option, to allow testing with
@@ -159,7 +186,10 @@ async def manager(request, manager_internal):
test_case_name = request.node.name
await manager_internal.before_test(test_case_name)
yield manager_internal
await manager_internal.after_test(test_case_name)
# `request.node.stash` contains a flag stored in `pytest_runtest_makereport`
# that indicates test failure.
failed = request.node.stash[FAILED_KEY]
await manager_internal.after_test(test_case_name, not failed)
# "cql" fixture: set up client object for communicating with the CQL API.
# Since connection is managed by manager just return that object
@@ -192,9 +222,21 @@ def fails_without_consistent_cluster_management(request, check_pre_consistent_cl
# "random_tables" fixture: Creates and returns a temporary RandomTables object
# used in tests to make schema changes. Tables are dropped after finished.
# used in tests to make schema changes. Tables are dropped after test finishes
# unless the cluster is dirty or the test has failed.
@pytest.fixture(scope="function")
def random_tables(request, manager):
tables = RandomTables(request.node.name, manager, unique_name())
async def random_tables(request, manager):
rf_marker = request.node.get_closest_marker("replication_factor")
replication_factor = rf_marker.args[0] if rf_marker is not None else 3 # Default 3
tables = RandomTables(request.node.name, manager, unique_name(), replication_factor)
yield tables
tables.drop_all()
# Don't drop tables at the end if we failed or the cluster is dirty - it may be impossible
# (e.g. the cluster is completely dead) and it doesn't matter (we won't reuse the cluster
# anyway).
# The cluster will be marked as dirty if the test failed, but that happens
# at the end of `manager` fixture which we depend on (so these steps will be
# executed after us) - so at this point, we need to check for failure ourselves too.
failed = request.node.stash[FAILED_KEY]
if not failed and not await manager.is_dirty():
tables.drop_all()
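# Editor's note: an illustrative test sketch (not part of this diff) showing
# the `replication_factor` marker that the fixture above reads via
# `get_closest_marker`; without the marker the default of 3 is used.
@pytest.mark.replication_factor(1)
@pytest.mark.asyncio
async def test_example_rf1(manager, random_tables):
    # The keyspace behind `random_tables` was created with RF = 1.
    table = await random_tables.add_table(ncolumns=5)
    await table.add_column()
    await random_tables.verify_schema()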

test/topology/test_topology.py (deleted)

@@ -1,374 +0,0 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test consistency of schema changes with topology changes.
"""
import pytest
import logging
import asyncio
import random
import time
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.pylib.internal_types import ServerInfo
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from cassandra.cluster import Session
from test.pylib.random_tables import RandomTables
from test.pylib.rest_client import inject_error
logger = logging.getLogger(__name__)
async def get_token_ring_host_ids(manager: ManagerClient, srv: ServerInfo) -> set[str]:
"""Get the host IDs of token ring members known by `srv`."""
host_id_map = await manager.api.client.get_json('/storage_service/host_id', srv.ip_addr)
return {e['value'] for e in host_id_map}
async def get_current_group0_config(manager: ManagerClient, srv: ServerInfo) -> set[tuple[str, bool]]:
"""Get the current Raft group 0 configuration known by `srv`.
The first element of each tuple is the Raft ID of the node (which is equal to the Host ID),
the second element indicates whether the node is a voter.
"""
assert(manager.cql)
host = (await wait_for_cql_and_get_hosts(manager.cql, [srv], time.time() + 60))[0]
group0_id = (await manager.cql.run_async(
"select value from system.scylla_local where key = 'raft_group0_id'",
host=host))[0].value
config = await manager.cql.run_async(
f"select server_id, can_vote from system.raft_state where group_id = {group0_id} and disposition = 'CURRENT'",
host=host)
return {(str(m.server_id), bool(m.can_vote)) for m in config}
@pytest.mark.asyncio
async def test_add_server_add_column(manager, random_tables):
"""Add a node and then add a column to a table and verify"""
table = await random_tables.add_table(ncolumns=5)
await manager.server_add()
await table.add_column()
await random_tables.verify_schema()
@pytest.mark.asyncio
async def test_stop_server_add_column(manager, random_tables):
"""Add a node, stop an original node, add a column"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
await manager.server_add()
await manager.server_stop(servers[1].server_id)
await table.add_column()
await random_tables.verify_schema()
@pytest.mark.asyncio
async def test_restart_server_add_column(manager, random_tables):
"""Add a node, stop an original node, add a column"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
ret = await manager.server_restart(servers[1].server_id)
await table.add_column()
await random_tables.verify_schema()
@pytest.mark.asyncio
async def test_remove_node_add_column(manager, random_tables):
"""Add a node, remove an original node, add a column"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
await manager.server_add()
await manager.server_stop_gracefully(servers[1].server_id) # stop [1]
await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1]
await table.add_column()
await random_tables.verify_schema()
# TODO: check that group 0 no longer contains the removed node (#12153)
@pytest.mark.asyncio
async def test_decommission_node_add_column(manager, random_tables):
"""Add a node, remove an original node, add a column"""
table = await random_tables.add_table(ncolumns=5)
servers = await manager.running_servers()
decommission_target = servers[1]
# The sleep injections significantly increase the probability of reproducing #11780:
# 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
# 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
# enters sleep before calling storage_service::notify_joined
# 3. we start decommission on decommission_target
# 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
# 5. bootstrapped_server receives the RPC and enters sleep
# 6. decommission_target handle_state_normal wakes up,
# calls storage_service::notify_joined which drops some RPC clients
# 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
await manager.api.enable_injection(
decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
bootstrapped_server = await manager.server_add()
async def no_joining_nodes():
joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
return not joining_nodes
# Wait until decommission_target thinks that bootstrapped_server is NORMAL
# note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
await wait_for(no_joining_nodes, time.time() + 30, period=.1)
await manager.api.enable_injection(
bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
await manager.decommission_node(decommission_target.server_id)
await table.add_column()
await random_tables.verify_schema()
# TODO: check that group 0 no longer contains the decommissioned node (#12153)
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient, random_tables) -> None:
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
await manager.server_add(replace_cfg)
# TODO: check that group 0 no longer contains the replaced node (#12153)
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient, random_tables) -> None:
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
# TODO: check that group 0 no longer contains the replaced node (#12153)
@pytest.mark.asyncio
async def test_replace_reuse_ip(manager: ManagerClient, random_tables) -> None:
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = False)
await manager.server_add(replace_cfg)
# TODO: check that group 0 no longer contains the replaced node (#12153)
@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, random_tables) -> None:
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)
# TODO: check that group 0 no longer contains the replaced node (#12153)
# Checks basic functionality on the cluster with different values of the --smp parameter on the nodes.
@pytest.mark.asyncio
async def test_nodes_with_different_smp(manager: ManagerClient, random_tables: RandomTables) -> None:
# In this test it's more convenient to start with a fresh cluster.
# We don't need the default nodes,
# but there is currently no way to express this in the test infrastructure
# When the node starts it tries to communicate with others
# by sending group0_peer_exchange message to them.
# This message can be handled on arbitrary shard of the target node.
# The method manager.server_add waits for node to start, which can only happen
# if this message has been handled correctly.
#
# Note: messaging_service is initialized with server_socket::load_balancing_algorithm::port
# policy, this means that the shard for message will be chosen as client_port % smp::count.
# The client port in turn is chosen as rand() * smp::count + current_shard
# (posix_socket_impl::find_port_and_connect).
# If this succeeds to occupy a free port in 5 tries and smp::count is the same
# on both nodes, then it's guaranteed that the message will be
# processed on the same shard as the calling code.
# In the general case, we cannot assume that this same shard guarantee holds.
logger.info(f'Adding --smp=3 server')
await manager.server_add(cmdline=['--smp', '3'])
# Remove the original 3 servers, the problem is easier to reproduce with --smp values
# that we pick, not the (currently) default --smp=2 coming from the suite.
logger.info(f'Decommissioning old servers')
servers = await manager.running_servers()
for s in servers[:-1]:
await manager.decommission_node(s.server_id)
logger.info(f'Adding --smp=4 server')
await manager.server_add(cmdline=['--smp', '4'])
logger.info(f'Adding --smp=5 server')
await manager.server_add(cmdline=['--smp', '5'])
logger.info(f'Creating new tables')
await random_tables.add_tables(ntables=4, ncolumns=5)
await random_tables.verify_schema()
@pytest.mark.asyncio
async def test_remove_garbage_group0_members(manager: ManagerClient, random_tables):
"""
Verify that failing to leave group 0 or remove a node from group 0 in removenode/decommission
can be handled by executing removenode (which should clear the 'garbage' group 0 member),
even though the node is no longer a token ring member.
"""
# 4 servers, one dead
await manager.server_add()
servers = await manager.running_servers()
removed_host_id = await manager.get_host_id(servers[0].server_id)
await manager.server_stop_gracefully(servers[0].server_id)
logging.info(f'removenode {servers[0]} using {servers[1]}')
# removenode will fail after removing the server from the token ring,
# but before removing it from group 0
async with inject_error(manager.api, servers[1].ip_addr,
'removenode_fail_before_remove_from_group0', one_shot=True):
try:
await manager.remove_node(servers[1].server_id, servers[0].server_id)
except Exception:
# Note: the exception returned here is only '500 internal server error',
# need to look in test.py log for the actual message coming from Scylla.
logging.info(f'expected exception during injection')
# Query the storage_service/host_id endpoint to calculate a list of known token ring members' Host IDs
# (internally, this endpoint uses token_metadata)
token_ring_ids = await get_token_ring_host_ids(manager, servers[1])
logging.info(f'token ring members: {token_ring_ids}')
group0_members = await get_current_group0_config(manager, servers[1])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
# Token ring members should currently be a subset of group 0 members
assert token_ring_ids <= group0_ids
garbage_members = group0_ids - token_ring_ids
logging.info(f'garbage members: {garbage_members}')
assert len(garbage_members) == 1
garbage_member = next(iter(garbage_members))
# The garbage member is the one that we failed to remove
assert garbage_member == removed_host_id
# Verify that at least it's a non-voter.
assert garbage_member in {m[0] for m in group0_members if not m[1]}
logging.info(f'removenode {servers[0]} using {servers[1]} again')
# Retry removenode. It should skip the token ring removal step and remove the server from group 0.
await manager.remove_node(servers[1].server_id, servers[0].server_id)
group0_members = await get_current_group0_config(manager, servers[1])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
# Token ring members and group 0 members should now be the same.
assert token_ring_ids == group0_ids
# Verify that availability is not reduced.
# Stop one of the 3 remaining servers and try to remove it. It should succeed with only 2 servers.
logging.info(f'stop {servers[1]}')
await manager.server_stop_gracefully(servers[1].server_id)
logging.info(f'removenode {servers[1]} using {servers[2]}')
await manager.remove_node(servers[2].server_id, servers[1].server_id)
    # Perform a similar scenario with decommission. One of the nodes fails to decommission fully,
    # but it manages to leave the token ring. We observe the leftover group 0 member using the
    # same APIs as above and remove it.
# We can do this with only 2 nodes because during decommission we become a non-voter before
# leaving the token ring, thus the remaining single node will become a voting majority
# and will be able to perform removenode alone.
decommissioned_host_id = await manager.get_host_id(servers[2].server_id)
await manager.api.enable_injection(
servers[2].ip_addr, 'decommission_fail_before_leave_group0', one_shot=True)
logging.info(f'decommission {servers[2]}')
try:
await manager.decommission_node(servers[2].server_id)
except Exception:
logging.info(f'expected exception during injection')
logging.info(f'stop {servers[2]}')
await manager.server_stop_gracefully(servers[2].server_id)
token_ring_ids = await get_token_ring_host_ids(manager, servers[3])
logging.info(f'token ring members: {token_ring_ids}')
group0_members = await get_current_group0_config(manager, servers[3])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
assert token_ring_ids <= group0_ids
garbage_members = group0_ids - token_ring_ids
logging.info(f'garbage members: {garbage_members}')
assert len(garbage_members) == 1
garbage_member = next(iter(garbage_members))
assert garbage_member == decommissioned_host_id
assert garbage_member in {m[0] for m in group0_members if not m[1]}
logging.info(f'removenode {servers[2]} using {servers[3]}')
await manager.remove_node(servers[3].server_id, servers[2].server_id)
group0_members = await get_current_group0_config(manager, servers[3])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
assert token_ring_ids == group0_ids
@pytest.mark.asyncio
@pytest.mark.skip(reason="Wait for @slow attribute, #11713")
async def test_remove_node_with_concurrent_ddl(manager, random_tables):
stopped = False
ddl_failed = False
async def do_ddl():
nonlocal ddl_failed
iteration = 0
while not stopped:
logger.debug(f'ddl, iteration {iteration} started')
try:
# If the node was removed, the driver may retry "create table" on another node,
# but the request might have already been completed.
# The same applies to drop_table.
await random_tables.add_tables(5, 5, if_not_exists=True)
await random_tables.verify_schema()
while len(random_tables.tables) > 0:
await random_tables.drop_table(random_tables.tables[-1], if_exists=True)
logger.debug(f'ddl, iteration {iteration} finished')
except:
logger.exception(f'ddl, iteration {iteration} failed')
ddl_failed = True
raise
iteration += 1
async def do_remove_node():
for i in range(10):
logger.debug(f'do_remove_node [{i}], iteration started')
if ddl_failed:
logger.debug(f'do_remove_node [{i}], ddl failed, exiting')
break
server_ids = await manager.running_servers()
host_ids = await asyncio.gather(*(manager.get_host_id(s) for s in server_ids))
initiator_index, target_index = random.sample(range(len(server_ids)), 2)
initiator_ip = server_ids[initiator_index]
target_ip = server_ids[target_index]
target_host_id = host_ids[target_index]
logger.info(f'do_remove_node [{i}], running remove_node, '
f'initiator server [{initiator_ip}], target ip [{target_ip}], '
f'target host id [{target_host_id}]')
await manager.wait_for_host_known(initiator_ip, target_host_id)
logger.info(f'do_remove_node [{i}], stopping target server [{target_ip}], host_id [{target_host_id}]')
await manager.server_stop_gracefully(target_ip)
logger.info(f'do_remove_node [{i}], target server [{target_ip}] stopped, '
f'waiting for it to be down on [{initiator_ip}]')
await manager.wait_for_host_down(initiator_ip, target_ip)
logger.info(f'do_remove_node [{i}], invoking remove_node')
await manager.remove_node(initiator_ip, target_ip, target_host_id)
# TODO: check that group 0 no longer contains the removed node (#12153)
logger.info(f'do_remove_node [{i}], remove_node done')
new_server_ip = await manager.server_add()
logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done')
logger.info(f'do_remove_node [{i}], iteration finished')
ddl_task = asyncio.create_task(do_ddl())
try:
await do_remove_node()
finally:
logger.debug("do_remove_node finished, waiting for ddl fiber")
stopped = True
await ddl_task
logger.debug("ddl fiber done, finished")


@@ -0,0 +1,50 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test replacing node in different scenarios
"""
import time
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.topology.util import wait_for_token_ring_and_group0_consistency
import pytest
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient) -> None:
"""Replace an existing node with new node using a different IP address"""
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_reuse_ip(manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address"""
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = False)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address and same host id"""
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
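
The four tests above differ only in the two ReplaceConfig flags. A parametrized equivalent
(a sketch assuming only the ManagerClient API used above; not part of this commit) could
collapse them into one test:

import time
import pytest
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.topology.util import wait_for_token_ring_and_group0_consistency

@pytest.mark.parametrize("reuse_ip_addr", [False, True])
@pytest.mark.parametrize("use_host_id", [False, True])
@pytest.mark.asyncio
async def test_replace(manager: ManagerClient, reuse_ip_addr: bool, use_host_id: bool) -> None:
    """Replace a stopped node, varying IP reuse and host-id-based replacement."""
    servers = await manager.running_servers()
    await manager.server_stop(servers[0].server_id)
    replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id,
                                reuse_ip_addr=reuse_ip_addr, use_host_id=use_host_id)
    await manager.server_add(replace_cfg)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)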


@@ -0,0 +1,21 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test rejoin of a server after it was stopped suddenly (crash-like)
"""
from test.pylib.manager_client import ManagerClient
import pytest
@pytest.mark.asyncio
async def test_start_after_sudden_stop(manager: ManagerClient, random_tables) -> None:
"""Tests a server can rejoin the cluster after being stopped suddenly"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
await manager.server_stop(servers[0].server_id)
await table.add_column()
await manager.server_start(servers[0].server_id)
await random_tables.verify_schema()


@@ -0,0 +1,133 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test consistency of schema changes with topology changes.
"""
import logging
import asyncio
import random
import time
from test.pylib.util import wait_for
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.topology.util import check_token_ring_and_group0_consistency, \
wait_for_token_ring_and_group0_consistency
import pytest
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_remove_node_add_column(manager: ManagerClient, random_tables: RandomTables):
"""Add a node, remove an original node, add a column"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
await manager.server_add()
await manager.server_stop_gracefully(servers[1].server_id) # stop [1]
await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1]
await check_token_ring_and_group0_consistency(manager)
await table.add_column()
await random_tables.verify_schema()
@pytest.mark.asyncio
async def test_decommission_node_add_column(manager: ManagerClient, random_tables: RandomTables):
"""Add a node, remove an original node, add a column"""
table = await random_tables.add_table(ncolumns=5)
servers = await manager.running_servers()
decommission_target = servers[1]
# The sleep injections significantly increase the probability of reproducing #11780:
# 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
# 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
# enters sleep before calling storage_service::notify_joined
# 3. we start decommission on decommission_target
# 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
# 5. bootstrapped_server receives the RPC and enters sleep
# 6. decommission_target handle_state_normal wakes up,
# calls storage_service::notify_joined which drops some RPC clients
# 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
await manager.api.enable_injection(
decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
bootstrapped_server = await manager.server_add()
async def no_joining_nodes():
joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
return not joining_nodes
# Wait until decommission_target thinks that bootstrapped_server is NORMAL
# note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
await wait_for(no_joining_nodes, time.time() + 30, period=.1)
await manager.api.enable_injection(
bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
await manager.decommission_node(decommission_target.server_id)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
await table.add_column()
await random_tables.verify_schema()
@pytest.mark.asyncio
@pytest.mark.skip(reason="Wait for @slow attribute, #11713")
async def test_remove_node_with_concurrent_ddl(manager: ManagerClient, random_tables: RandomTables):
stopped = False
ddl_failed = False
async def do_ddl():
nonlocal ddl_failed
iteration = 0
while not stopped:
logger.debug(f'ddl, iteration {iteration} started')
try:
# If the node was removed, the driver may retry "create table" on another node,
# but the request might have already been completed.
# The same applies to drop_table.
await random_tables.add_tables(5, 5, if_not_exists=True)
await random_tables.verify_schema()
while len(random_tables.tables) > 0:
await random_tables.drop_table(random_tables.tables[-1], if_exists=True)
logger.debug(f'ddl, iteration {iteration} finished')
except:
logger.exception(f'ddl, iteration {iteration} failed')
ddl_failed = True
raise
iteration += 1
async def do_remove_node():
for i in range(10):
logger.debug(f'do_remove_node [{i}], iteration started')
if ddl_failed:
logger.debug(f'do_remove_node [{i}], ddl failed, exiting')
break
server_ids = await manager.running_servers()
host_ids = await asyncio.gather(*(manager.get_host_id(s) for s in server_ids))
initiator_index, target_index = random.sample(range(len(server_ids)), 2)
initiator_ip = server_ids[initiator_index]
target_ip = server_ids[target_index]
target_host_id = host_ids[target_index]
logger.info(f'do_remove_node [{i}], running remove_node, '
f'initiator server [{initiator_ip}], target ip [{target_ip}], '
f'target host id [{target_host_id}]')
await manager.wait_for_host_known(initiator_ip, target_host_id)
logger.info(f'do_remove_node [{i}], stopping target server [{target_ip}], host_id [{target_host_id}]')
await manager.server_stop_gracefully(target_ip)
logger.info(f'do_remove_node [{i}], target server [{target_ip}] stopped, '
f'waiting for it to be down on [{initiator_ip}]')
await manager.wait_for_host_down(initiator_ip, target_ip)
logger.info(f'do_remove_node [{i}], invoking remove_node')
await manager.remove_node(initiator_ip, target_ip, target_host_id)
# TODO: check that group 0 no longer contains the removed node (#12153)
logger.info(f'do_remove_node [{i}], remove_node done')
new_server_ip = await manager.server_add()
logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done')
logger.info(f'do_remove_node [{i}], iteration finished')
ddl_task = asyncio.create_task(do_ddl())
try:
await do_remove_node()
finally:
logger.debug("do_remove_node finished, waiting for ddl fiber")
stopped = True
await ddl_task
logger.debug("ddl fiber done, finished")


@@ -0,0 +1,128 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test removenode of a node that is no longer a token ring member
"""
import logging
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.topology.util import get_token_ring_host_ids, get_current_group0_config, \
check_token_ring_and_group0_consistency
import pytest
logger = logging.getLogger(__name__)
async def test_remove_garbage_group0_members(manager: ManagerClient):
"""
Verify that failing to leave group 0 or remove a node from group 0 in removenode/decommission
can be handled by executing removenode (which should clear the 'garbage' group 0 member),
even though the node is no longer a token ring member.
"""
# 4 servers, one dead
await manager.server_add()
servers = await manager.running_servers()
removed_host_id = await manager.get_host_id(servers[0].server_id)
await manager.server_stop_gracefully(servers[0].server_id)
logging.info(f'removenode {servers[0]} using {servers[1]}')
# removenode will fail after removing the server from the token ring,
# but before removing it from group 0
await inject_error_one_shot(manager.api, servers[1].ip_addr,
'removenode_fail_before_remove_from_group0')
try:
await manager.remove_node(servers[1].server_id, servers[0].server_id)
except Exception:
        # Note: the exception returned here is only '500 internal server error';
        # look in the test.py log for the actual message coming from Scylla.
logging.info(f'expected exception during injection')
# Query the storage_service/host_id endpoint to calculate a list of known token ring members' Host IDs
# (internally, this endpoint uses token_metadata)
token_ring_ids = await get_token_ring_host_ids(manager, servers[1])
logging.info(f'token ring members: {token_ring_ids}')
group0_members = await get_current_group0_config(manager, servers[1])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
# Token ring members should currently be a subset of group 0 members
assert token_ring_ids <= group0_ids
garbage_members = group0_ids - token_ring_ids
logging.info(f'garbage members: {garbage_members}')
assert len(garbage_members) == 1
garbage_member = next(iter(garbage_members))
# The garbage member is the one that we failed to remove
assert garbage_member == removed_host_id
    # Verify that the garbage member is at least a non-voter.
assert garbage_member in {m[0] for m in group0_members if not m[1]}
logging.info(f'removenode {servers[0]} using {servers[1]} again')
# Retry removenode. It should skip the token ring removal step and remove the server from group 0.
await manager.remove_node(servers[1].server_id, servers[0].server_id)
group0_members = await get_current_group0_config(manager, servers[1])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
# Token ring members and group 0 members should now be the same.
assert token_ring_ids == group0_ids
# Verify that availability is not reduced.
# Stop one of the 3 remaining servers and try to remove it. It should succeed with only 2 servers.
logging.info(f'stop {servers[1]}')
await manager.server_stop_gracefully(servers[1].server_id)
logging.debug(f'waiting for {servers[2]} to see {servers[1]} is down')
await manager.server_not_sees_other_server(servers[2].ip_addr, servers[1].ip_addr)
logging.info(f'removenode {servers[1]} using {servers[2]}')
await manager.remove_node(servers[2].server_id, servers[1].server_id)
    # Perform a similar scenario with decommission. One of the nodes fails to decommission fully,
    # but it manages to leave the token ring. We observe the leftover group 0 member using the
    # same APIs as above and remove it.
# We can do this with only 2 nodes because during decommission we become a non-voter before
# leaving the token ring, thus the remaining single node will become a voting majority
# and will be able to perform removenode alone.
decommissioned_host_id = await manager.get_host_id(servers[2].server_id)
await manager.api.enable_injection(
servers[2].ip_addr, 'decommission_fail_before_leave_group0', one_shot=True)
logging.info(f'decommission {servers[2]}')
try:
await manager.decommission_node(servers[2].server_id)
except Exception:
logging.info(f'expected exception during injection')
logging.info(f'stop {servers[2]}')
await manager.server_stop_gracefully(servers[2].server_id)
token_ring_ids = await get_token_ring_host_ids(manager, servers[3])
logging.info(f'token ring members: {token_ring_ids}')
group0_members = await get_current_group0_config(manager, servers[3])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
assert token_ring_ids <= group0_ids
garbage_members = group0_ids - token_ring_ids
logging.info(f'garbage members: {garbage_members}')
assert len(garbage_members) == 1
garbage_member = next(iter(garbage_members))
assert garbage_member == decommissioned_host_id
assert garbage_member in {m[0] for m in group0_members if not m[1]}
logging.info(f'removenode {servers[2]} using {servers[3]}')
await manager.remove_node(servers[3].server_id, servers[2].server_id)
await check_token_ring_and_group0_consistency(manager)
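
This file uses the new inject_error_one_shot helper where the earlier copy of this test used
the inject_error context manager. A plausible sketch of such a helper, inferred from the
manager.api.enable_injection(..., one_shot=True) calls visible in this diff (the real
implementation lives in test/pylib/rest_client.py and may differ):

from test.pylib.rest_client import ScyllaRESTAPIClient
from test.pylib.manager_client import IPAddress

async def inject_error_one_shot(api: ScyllaRESTAPIClient, ip: IPAddress, name: str) -> None:
    # A one-shot injection fires once and then disarms itself, so unlike the
    # inject_error context manager there is nothing to disable on exit.
    await api.enable_injection(ip, name, one_shot=True)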


@@ -0,0 +1,34 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test consistency of schema changes with server hard stop.
"""
import time
from test.topology.util import wait_for_token_ring_and_group0_consistency
import pytest
@pytest.mark.asyncio
async def test_topology_schema_changes(manager, random_tables):
"""Test schema consistency with restart, add, and sudden stop of servers"""
table = await random_tables.add_table(ncolumns=5)
servers = await manager.running_servers()
# Test add column after server restart
await manager.server_restart(servers[1].server_id)
await table.add_column()
await random_tables.verify_schema()
# Test add column after adding a server
await manager.server_add()
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
await table.add_column()
await random_tables.verify_schema()
# Test add column after hard stop of a server (1/3)
await manager.server_stop(servers[1].server_id)
await table.add_column()
await random_tables.verify_schema()

test/topology/util.py

@@ -0,0 +1,81 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Utility functions shared by topology tests
"""
import logging
import pytest
import time
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier
logger = logging.getLogger(__name__)
async def get_token_ring_host_ids(manager: ManagerClient, srv: ServerInfo) -> set[str]:
"""Get the host IDs of normal token owners known by `srv`."""
token_endpoint_map = await manager.api.client.get_json("/storage_service/tokens_endpoint", srv.ip_addr)
normal_endpoints = {e["value"] for e in token_endpoint_map}
logger.info(f"Normal endpoints' IPs by {srv}: {normal_endpoints}")
host_id_map = await manager.api.client.get_json('/storage_service/host_id', srv.ip_addr)
all_host_ids = {e["value"] for e in host_id_map}
logger.info(f"All host IDs by {srv}: {all_host_ids}")
normal_host_ids = {e["value"] for e in host_id_map if e["key"] in normal_endpoints}
logger.info(f"Normal endpoints' host IDs by {srv}: {normal_host_ids}")
return normal_host_ids
async def get_current_group0_config(manager: ManagerClient, srv: ServerInfo) -> set[tuple[str, bool]]:
"""Get the current Raft group 0 configuration known by `srv`.
The first element of each tuple is the Raft ID of the node (which is equal to the Host ID),
the second element indicates whether the node is a voter.
"""
assert manager.cql
host = (await wait_for_cql_and_get_hosts(manager.cql, [srv], time.time() + 60))[0]
await read_barrier(manager.cql, host)
group0_id = (await manager.cql.run_async(
"select value from system.scylla_local where key = 'raft_group0_id'",
host=host))[0].value
config = await manager.cql.run_async(
f"select server_id, can_vote from system.raft_state where group_id = {group0_id} and disposition = 'CURRENT'",
host=host)
result = {(str(m.server_id), bool(m.can_vote)) for m in config}
logger.info(f"Group 0 members by {srv}: {result}")
return result
async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> None:
"""Ensure that the normal token owners and group 0 members match
according to each currently running server.
"""
servers = await manager.running_servers()
for srv in servers:
group0_members = await get_current_group0_config(manager, srv)
group0_ids = {m[0] for m in group0_members}
token_ring_ids = await get_token_ring_host_ids(manager, srv)
assert token_ring_ids == group0_ids
async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None:
"""Weaker version of the above check; the token ring is not immediately updated after
bootstrap/replace/decommission - the normal tokens of the new node propagate through gossip.
Take this into account and wait for the equality condition to hold, with a timeout.
"""
servers = await manager.running_servers()
for srv in servers:
group0_members = await get_current_group0_config(manager, srv)
group0_ids = {m[0] for m in group0_members}
async def token_ring_matches():
token_ring_ids = await get_token_ring_host_ids(manager, srv)
diff = token_ring_ids ^ group0_ids
if diff:
logger.warning(f"Group 0 members and token ring members don't yet match" \
f" according to {srv}, symmetric difference: {diff}")
return None
return True
await wait_for(token_ring_matches, deadline, period=.5)
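
The nested token_ring_matches coroutine above relies on wait_for's retry contract: the
predicate returns None to keep polling and any non-None value to stop. A minimal sketch of
that contract (the real helper lives in test/pylib/util.py; this version only assumes the
signature visible at the call sites):

import asyncio
import time
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar('T')

async def wait_for(pred: Callable[[], Awaitable[Optional[T]]],
                   deadline: float, period: float = 1) -> T:
    # Poll `pred` until it yields a non-None result or the deadline passes.
    while True:
        res = await pred()
        if res is not None:
            return res
        if time.time() + period > deadline:
            raise TimeoutError(f"deadline exceeded while waiting for {pred}")
        await asyncio.sleep(period)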


@@ -0,0 +1,9 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This file configures pytest for all tests in this directory, and also
# defines common test fixtures for all of them to use
from test.topology.conftest import *


@@ -0,0 +1,9 @@
[pytest]
asyncio_mode = auto
log_cli = true
log_format = %(asctime)s.%(msecs)03d %(levelname)s> %(message)s
log_date_format = %H:%M:%S
markers =
slow: tests that take more than 30 seconds to run
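
With asyncio_mode = auto, pytest-asyncio collects plain `async def` tests without an explicit
marker, which is why test_remove_garbage_group0_members above carries no @pytest.mark.asyncio
decorator; the decorators that do appear elsewhere are redundant but harmless under this mode.
A minimal illustration (not part of the commit):

async def test_example(manager):
    # Runs as an asyncio test despite having no @pytest.mark.asyncio marker.
    assert await manager.running_servers()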


@@ -0,0 +1,6 @@
type: Topology
pool_size: 4
cluster_size: 0
extra_scylla_config_options:
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
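
With cluster_size: 0 the suite hands each test an initially empty cluster, so tests in this
directory add every server themselves, as test_custom below demonstrates.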


@@ -0,0 +1,19 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name
import pytest
@pytest.mark.asyncio
async def test_custom(request, manager: ManagerClient):
servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()]
tables = RandomTables(request.node.name, manager, unique_name(), 3)
table = await tables.add_table(ncolumns=5)
await table.insert_seq()
await table.add_index(2)
await tables.verify_schema(table)


@@ -0,0 +1,52 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test functionality on the cluster with different values of the --smp parameter on the nodes.
"""
import logging
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name
from test.topology.util import wait_for_token_ring_and_group0_consistency
import pytest
from pytest import FixtureRequest
logger = logging.getLogger(__name__)
# Checks basic functionality on the cluster with different values of the --smp parameter on the nodes.
@pytest.mark.asyncio
async def test_nodes_with_different_smp(request: FixtureRequest, manager: ManagerClient) -> None:
# In this test it's more convenient to start with a fresh cluster.
    # When a node starts, it tries to communicate with the others
    # by sending a group0_peer_exchange message to them.
    # This message can be handled on an arbitrary shard of the target node.
    # The method manager.server_add waits for the node to start, which can only happen
    # if this message has been handled correctly.
#
# Note: messaging_service is initialized with server_socket::load_balancing_algorithm::port
# policy, this means that the shard for message will be chosen as client_port % smp::count.
# The client port in turn is chosen as rand() * smp::count + current_shard
# (posix_socket_impl::find_port_and_connect).
    # If this succeeds in occupying a free port within 5 tries and smp::count is the same
    # on both nodes, then the message is guaranteed to be
    # processed on the same shard as the calling code.
    # In the general case, we cannot assume that this same-shard guarantee holds.
logger.info(f'Adding --smp=3 server')
await manager.server_add(cmdline=['--smp', '3'])
logger.info(f'Adding --smp=4 server')
await manager.server_add(cmdline=['--smp', '4'])
logger.info(f'Adding --smp=5 server')
await manager.server_add(cmdline=['--smp', '5'])
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
logger.info(f'Creating new tables')
tables = RandomTables(request.node.name, manager, unique_name(), 3)
await tables.add_tables(ntables=4, ncolumns=5)
await tables.verify_schema()
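
To see why equal smp::count values pin the message to the caller's shard (per the comment
above): if the client port is rand() * smp + current_shard, then port % smp == current_shard.
A quick illustration of the arithmetic (plain Python, not part of the commit):

import random

def receiving_shard(current_shard: int, sender_smp: int, receiver_smp: int) -> int:
    # The sender picks client port = rand() * sender_smp + current_shard
    # (posix_socket_impl::find_port_and_connect); under the `port` load
    # balancing policy the receiver handles it on client_port % receiver_smp.
    port = random.randrange(1, 1000) * sender_smp + current_shard
    return port % receiver_smp

# Equal smp::count on both sides: the message lands on the sender's shard.
assert all(receiving_shard(s, 4, 4) == s for s in range(4))
# Different smp::count values (as in this test): the target shard is effectively arbitrary.
print(sorted({receiving_shard(0, 3, 5) for _ in range(100)}))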


@@ -16,7 +16,7 @@ from cassandra.pool import Host # type: ignore # pylint:
 from test.pylib.manager_client import ManagerClient, IPAddress, ServerInfo
 from test.pylib.random_tables import RandomTables
-from test.pylib.rest_client import ScyllaRESTAPIClient, inject_error
+from test.pylib.rest_client import ScyllaRESTAPIClient, inject_error_one_shot
 from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
@@ -110,6 +110,7 @@ def log_run_time(f):
 @pytest.mark.asyncio
 @log_run_time
+@pytest.mark.replication_factor(1)
 async def test_raft_upgrade_basic(manager: ManagerClient, random_tables: RandomTables):
     """
     kbr-: the test takes about 7 seconds in dev mode on my laptop.
@@ -144,6 +145,7 @@ async def test_raft_upgrade_basic(manager: ManagerClient, random_tables: RandomT
 @pytest.mark.asyncio
 @log_run_time
+@pytest.mark.replication_factor(1)
 async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables: RandomTables):
     """
     We enable Raft on every server and the upgrade procedure starts. All servers join group 0. Then one
@@ -163,42 +165,41 @@
     # TODO error injection should probably be done through ScyllaClusterManager (we may need to mark the cluster as dirty).
     # In this test the cluster is dirty anyway due to a restart so it's safe.
-    async with inject_error(manager.api, srv1.ip_addr, 'group0_upgrade_before_synchronize',
-                            one_shot=True):
-        logging.info(f"Enabling Raft on {others} and restarting")
-        await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in others))
-        cql = await reconnect_driver(manager)
+    await inject_error_one_shot(manager.api, srv1.ip_addr, 'group0_upgrade_before_synchronize')
+    logging.info(f"Enabling Raft on {others} and restarting")
+    await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in others))
+    cql = await reconnect_driver(manager)

-        logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
-        hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
-        logging.info(f"Driver reconnected, hosts: {hosts}")
+    logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
+    hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
+    logging.info(f"Driver reconnected, hosts: {hosts}")

-        logging.info(f"Waiting until {hosts} enter 'synchronize' state")
-        await asyncio.gather(*(wait_for_upgrade_state('synchronize', cql, h, time.time() + 60) for h in hosts))
-        logging.info(f"{hosts} entered synchronize")
+    logging.info(f"Waiting until {hosts} enter 'synchronize' state")
+    await asyncio.gather(*(wait_for_upgrade_state('synchronize', cql, h, time.time() + 60) for h in hosts))
+    logging.info(f"{hosts} entered synchronize")

-        # TODO ensure that srv1 failed upgrade - look at logs?
-        # '[shard 0] raft_group0_upgrade - Raft upgrade failed: std::runtime_error (error injection before group 0 upgrade enters synchronize).'
+    # TODO ensure that srv1 failed upgrade - look at logs?
+    # '[shard 0] raft_group0_upgrade - Raft upgrade failed: std::runtime_error (error injection before group 0 upgrade enters synchronize).'

-        logging.info(f"Setting recovery state on {hosts}")
-        for host in hosts:
-            await cql.run_async(
-                "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'",
-                host=host)
+    logging.info(f"Setting recovery state on {hosts}")
+    for host in hosts:
+        await cql.run_async(
+            "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'",
+            host=host)

-        logging.info(f"Restarting {others}")
-        await asyncio.gather(*(restart(manager, srv) for srv in others))
-        cql = await reconnect_driver(manager)
+    logging.info(f"Restarting {others}")
+    await asyncio.gather(*(restart(manager, srv) for srv in others))
+    cql = await reconnect_driver(manager)

-        logging.info(f"{others} restarted, waiting until driver reconnects to them")
-        hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
+    logging.info(f"{others} restarted, waiting until driver reconnects to them")
+    hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)

-        logging.info(f"Checking if {hosts} are in recovery state")
-        for host in hosts:
-            rs = await cql.run_async(
-                "select value from system.scylla_local where key = 'group0_upgrade_state'",
-                host=host)
-            assert rs[0].value == 'recovery'
+    logging.info(f"Checking if {hosts} are in recovery state")
+    for host in hosts:
+        rs = await cql.run_async(
+            "select value from system.scylla_local where key = 'group0_upgrade_state'",
+            host=host)
+        assert rs[0].value == 'recovery'

     logging.info("Creating a table while in recovery state")
     table = await random_tables.add_table(ncolumns=5)
@@ -229,6 +230,7 @@ async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables:
 @pytest.mark.asyncio
 @log_run_time
+@pytest.mark.replication_factor(1)
 async def test_recovery_after_majority_loss(manager: ManagerClient, random_tables: RandomTables):
     """
     We successfully upgrade a cluster. Eventually however all servers but one fail - group 0