diff --git a/test.py b/test.py index 633aa3c755..0ff4ce0d45 100755 --- a/test.py +++ b/test.py @@ -696,7 +696,7 @@ class CQLApprovalTest(Test): logger.info("Server log:\n%s", self.server_log) # TODO: consider dirty_on_exception=True - async with self.suite.clusters.instance(False, logger) as cluster: + async with (cm := self.suite.clusters.instance(False, logger)) as cluster: try: cluster.before_test(self.uname) logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname) @@ -706,7 +706,8 @@ class CQLApprovalTest(Test): self.is_before_test_ok = True cluster.take_log_savepoint() self.is_executed_ok = await run_test(self, options, env=self.env) - cluster.after_test(self.uname) + cluster.after_test(self.uname, self.is_executed_ok) + cm.dirty = cluster.is_dirty self.is_after_test_ok = True if self.is_executed_ok is False: @@ -860,7 +861,7 @@ class PythonTest(Test): self.is_before_test_ok = True cluster.take_log_savepoint() status = await run_test(self, options) - cluster.after_test(self.uname) + cluster.after_test(self.uname, status) self.is_after_test_ok = True self.success = status except Exception as e: @@ -874,9 +875,7 @@ class PythonTest(Test): print("Test {} post-check failed: {}".format(self.name, str(e))) print("Server log of the first server:\n{}".format(self.server_log)) logger.info(f"Discarding cluster after failed test %s...", self.name) - await self.suite.clusters.put(cluster, is_dirty=True) - else: - await self.suite.clusters.put(cluster, is_dirty=False) + await self.suite.clusters.put(cluster, is_dirty=cluster.is_dirty) logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ") return self diff --git a/test/pylib/internal_types.py b/test/pylib/internal_types.py index c6673dd999..b3e8cba577 100644 --- a/test/pylib/internal_types.py +++ b/test/pylib/internal_types.py @@ -18,6 +18,5 @@ class ServerInfo(NamedTuple): """Server id (test local) and IP address""" server_id: ServerNum ip_addr: IPAddress - host_id: HostID def __str__(self): - return f"Server({self.server_id}, {self.ip_addr}, {self.host_id})" + return f"Server({self.server_id}, {self.ip_addr})" diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index eabd958285..9e90b99a63 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -15,7 +15,7 @@ import logging from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient from test.pylib.util import wait_for from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo -from test.pylib.scylla_cluster import ReplaceConfig +from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer from cassandra.cluster import Session as CassandraSession # type: ignore # pylint: disable=no-name-in-module from cassandra.cluster import Cluster as CassandraCluster # type: ignore # pylint: disable=no-name-in-module import aiohttp @@ -48,10 +48,11 @@ class ManagerClient(): """Close driver""" self.driver_close() - async def driver_connect(self) -> None: + async def driver_connect(self, server: Optional[ServerInfo] = None) -> None: """Connect to cluster""" if self.con_gen is not None: - servers = [s_info.ip_addr for s_info in await self.running_servers()] + targets = [server] if server else await self.running_servers() + servers = [s_info.ip_addr for s_info in targets] logger.debug("driver connecting to %s", servers) self.ccluster = self.con_gen(servers, self.port, self.use_ssl) self.cql = self.ccluster.connect() @@ -81,15 +82,17 @@ class ManagerClient(): logger.info(f"Using cluster: 
{cluster_str} for test {test_case_name}") except aiohttp.ClientError as exc: raise RuntimeError(f"Failed before test check {exc}") from exc - if self.cql is None: + servers = await self.running_servers() + if self.cql is None and servers: # TODO: if cluster is not up yet due to taking long and HTTP timeout, wait for it # await self._wait_for_cluster() await self.driver_connect() # Connect driver to new cluster - async def after_test(self, test_case_name: str) -> None: + async def after_test(self, test_case_name: str, success: bool) -> None: """Tell harness this test finished""" - logger.debug("after_test for %s", test_case_name) - await self.client.get(f"/cluster/after-test") + logger.debug("after_test for %s (success: %s)", test_case_name, success) + cluster_str = await self.client.get_text(f"/cluster/after-test/{success}") + logger.info("Cluster after test %s: %s", test_case_name, cluster_str) async def is_manager_up(self) -> bool: """Check if Manager server is up""" @@ -118,7 +121,7 @@ class ManagerClient(): except RuntimeError as exc: raise Exception("Failed to get list of running servers") from exc assert isinstance(server_info_list, list), "running_servers got unknown data type" - return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), HostID(info[2])) + return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1])) for info in server_info_list] async def mark_dirty(self) -> None: @@ -136,37 +139,47 @@ class ManagerClient(): logger.debug("ManagerClient stopping gracefully %s", server_id) await self.client.get_text(f"/cluster/server/{server_id}/stop_gracefully") - async def server_start(self, server_id: ServerNum) -> None: - """Start specified server""" + async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None, + wait_others: int = 0, wait_interval: float = 45) -> None: + """Start specified server and optionally wait for it to learn of other servers""" logger.debug("ManagerClient starting %s", server_id) - await self.client.get_text(f"/cluster/server/{server_id}/start") + params = {'expected_error': expected_error} if expected_error is not None else None + await self.client.get_text(f"/cluster/server/{server_id}/start", params=params) + await self.server_sees_others(server_id, wait_others, interval = wait_interval) self._driver_update() - async def server_restart(self, server_id: ServerNum) -> None: - """Restart specified server""" + async def server_restart(self, server_id: ServerNum, wait_others: int = 0, + wait_interval: float = 45) -> None: + """Restart specified server and optionally wait for it to learn of other servers""" logger.debug("ManagerClient restarting %s", server_id) await self.client.get_text(f"/cluster/server/{server_id}/restart") + await self.server_sees_others(server_id, wait_others, interval = wait_interval) self._driver_update() - async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None) -> ServerInfo: + async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo: """Add a new server""" try: - data: dict[str, Any] = {} + data: dict[str, Any] = {'start': start} if replace_cfg: data['replace_cfg'] = replace_cfg._asdict() if cmdline: data['cmdline'] = cmdline - server_info = await self.client.put_json("/cluster/addserver", data, response_type="json") + if config: + data['config'] = config + server_info = await self.client.put_json("/cluster/addserver", 
data, response_type="json",
+                                                     timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
         except Exception as exc:
             raise Exception("Failed to add server") from exc
         try:
             s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
-                                IPAddress(server_info["ip_addr"]),
-                                HostID(server_info["host_id"]))
+                                IPAddress(server_info["ip_addr"]))
         except Exception as exc:
             raise RuntimeError(f"server_add got invalid server data {server_info}") from exc
         logger.debug("ManagerClient added %s", s_info)
-        self._driver_update()
+        if self.cql:
+            self._driver_update()
+        else:
+            await self.driver_connect()
         return s_info
 
     async def remove_node(self, initiator_id: ServerNum, server_id: ServerNum,
@@ -174,13 +187,15 @@
         """Invoke remove node Scylla REST API for a specified server"""
         logger.debug("ManagerClient remove node %s on initiator %s", server_id, initiator_id)
         data = {"server_id": server_id, "ignore_dead": ignore_dead}
-        await self.client.put_json(f"/cluster/remove-node/{initiator_id}", data)
+        await self.client.put_json(f"/cluster/remove-node/{initiator_id}", data,
+                                   timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
         self._driver_update()
 
     async def decommission_node(self, server_id: ServerNum) -> None:
         """Tell a node to decommission with Scylla REST API"""
         logger.debug("ManagerClient decommission %s", server_id)
-        await self.client.get_text(f"/cluster/decommission-node/{server_id}")
+        await self.client.get_text(f"/cluster/decommission-node/{server_id}",
+                                   timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
         self._driver_update()
 
     async def server_get_config(self, server_id: ServerNum) -> dict[str, object]:
@@ -224,3 +239,32 @@
         except Exception as exc:
             raise Exception(f"Failed to get local host id address for server {server_id}") from exc
         return HostID(host_id)
+
+    async def server_sees_others(self, server_id: ServerNum, count: int, interval: float = 45.):
+        """Wait till a server sees at least a given count of other servers"""
+        if count < 1:
+            return
+        server_ip = await self.get_host_ip(server_id)
+        async def _sees_min_others():
+            alive_nodes = await self.api.get_alive_endpoints(server_ip)
+            if len(alive_nodes) > count:
+                return True
+        await wait_for(_sees_min_others, time() + interval, period=.1)
+
+    async def server_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
+                                       interval: float = 45.):
+        """Wait till a server sees another specific server IP as alive"""
+        async def _sees_another_server():
+            alive_nodes = await self.api.get_alive_endpoints(server_ip)
+            if other_ip in alive_nodes:
+                return True
+        await wait_for(_sees_another_server, time() + interval, period=.1)
+
+    async def server_not_sees_other_server(self, server_ip: IPAddress, other_ip: IPAddress,
+                                           interval: float = 45.):
+        """Wait till a server sees another specific server IP as dead"""
+        async def _not_sees_another_server():
+            alive_nodes = await self.api.get_alive_endpoints(server_ip)
+            if other_ip not in alive_nodes:
+                return True
+        await wait_for(_not_sees_another_server, time() + interval, period=.1)
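The three waiters above poll the gossiper's live-endpoint list (the `get_alive_endpoints` call added to `ScyllaRESTAPIClient` later in this patch) until the condition holds or the deadline passes. A minimal usage sketch, assuming a `manager: ManagerClient` with at least two running servers; the `demo` coroutine itself is hypothetical, not part of the patch:

```python
async def demo(manager) -> None:
    srv_a, srv_b = (await manager.running_servers())[:2]
    # Stop one node and wait until the surviving node gossips it as dead...
    await manager.server_stop_gracefully(srv_b.server_id)
    await manager.server_not_sees_other_server(srv_a.ip_addr, srv_b.ip_addr)
    # ...then bring it back and wait until it is seen as alive again.
    await manager.server_start(srv_b.server_id)
    await manager.server_sees_other_server(srv_a.ip_addr, srv_b.ip_addr)
```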
diff --git a/test/pylib/random_tables.py b/test/pylib/random_tables.py
index 7289fcaddf..ecca26c501 100644
--- a/test/pylib/random_tables.py
+++ b/test/pylib/random_tables.py
@@ -35,10 +35,12 @@
 import itertools
 import logging
 import random
 import uuid
+import time
 from typing import Optional, Type, List, Set, Union, TYPE_CHECKING
 if TYPE_CHECKING:
     from cassandra.cluster import Session as CassandraSession  # type: ignore
 from test.pylib.manager_client import ManagerClient
+from test.pylib.util import get_available_host, read_barrier
 
 logger = logging.getLogger('random_tables')
 
@@ -173,10 +175,11 @@
         return await self.manager.cql.run_async(cql_stmt)
 
     async def add_column(self, name: str = None, ctype: Type[ValueType] = None, column: Column = None):
+        """Add a value column to the table"""
         if column is not None:
             assert type(column) is Column, "Wrong column type to add_column"
         else:
-            name = name if name is not None else f"c_{self.next_clustering_id():02}"
+            name = name if name is not None else f"v_{self.next_value_id():02}"
             ctype = ctype if ctype is not None else TextType
             column = Column(name, ctype=ctype)
         self.columns.append(column)
@@ -244,7 +247,8 @@
 class RandomTables():
     """A list of managed random tables"""
-    def __init__(self, test_name: str, manager: ManagerClient, keyspace: str):
+    def __init__(self, test_name: str, manager: ManagerClient, keyspace: str,
+                 replication_factor: int):
         self.test_name = test_name
         self.manager = manager
         self.keyspace = keyspace
@@ -252,7 +256,8 @@
         self.removed_tables: List[RandomTable] = []
         assert self.manager.cql is not None
         self.manager.cql.execute(f"CREATE KEYSPACE {keyspace} WITH REPLICATION = "
-                                 "{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
+                                 "{ 'class' : 'NetworkTopologyStrategy', "
+                                 f"'replication_factor' : {replication_factor} }}")
 
     async def add_tables(self, ntables: int = 1, ncolumns: int = 5, if_not_exists: bool = False) -> None:
         """Add random tables to the list.
@@ -297,7 +302,7 @@
         assert self.manager.cql is not None
         self.manager.cql.execute(f"DROP KEYSPACE {self.keyspace}")
 
-    async def verify_schema(self, table: Union[RandomTable, str] = None) -> None:
+    async def verify_schema(self, table: Union[RandomTable, str] = None, do_read_barrier: bool = True) -> None:
         """Verify schema of all active managed random tables"""
         if isinstance(table, RandomTable):
             tables = {table.name}
@@ -315,8 +320,18 @@
                     f"WHERE keyspace_name = '{self.keyspace}'"
         logger.debug(cql_stmt1)
-        assert self.manager.cql is not None
-        res1 = {row.table_name for row in await self.manager.cql.run_async(cql_stmt1)}
+
+        cql = self.manager.cql
+        assert cql
+
+        host = await get_available_host(cql, time.time() + 60)
+        if do_read_barrier:
+            # Issue a read barrier on some node and then keep using that node to do the queries.
+            # This ensures that the queries return recent data (at least all data committed
+            # when `verify_schema` was called).
+ await read_barrier(cql, host) + + res1 = {row.table_name for row in await cql.run_async(cql_stmt1, host=host)} assert not tables - res1, f"Tables {tables - res1} not present" for table_name in tables: @@ -326,8 +341,7 @@ class RandomTables(): cql_stmt2 = f"SELECT column_name, position, kind, type FROM system_schema.columns " \ f"WHERE keyspace_name = '{self.keyspace}' AND table_name = '{table_name}'" logger.debug(cql_stmt2) - assert self.manager.cql is not None - res2 = {row.column_name: row for row in await self.manager.cql.run_async(cql_stmt2)} + res2 = {row.column_name: row for row in await cql.run_async(cql_stmt2, host=host)} assert res2.keys() == cols.keys(), f"Column names for {table_name} do not match " \ f"expected ({', '.join(cols.keys())}) " \ f"got ({', '.join(res2.keys())})" diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index c2ea2a5133..e6b5ffec62 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -94,15 +94,15 @@ class RESTClient(metaclass=ABCMeta): async def post(self, resource_uri: str, host: Optional[str] = None, port: Optional[int] = None, params: Optional[Mapping[str, str]] = None, - json: Mapping = None) -> None: + json: Mapping = None, timeout: Optional[float] = None) -> None: await self._fetch("POST", resource_uri, host = host, port = port, params = params, - json = json) + json = json, timeout = timeout) async def put_json(self, resource_uri: str, data: Mapping, host: Optional[str] = None, port: Optional[int] = None, params: Optional[dict[str, str]] = None, - response_type: Optional[str] = None) -> Any: + response_type: Optional[str] = None, timeout: Optional[float] = None) -> Any: ret = await self._fetch("PUT", resource_uri, response_type = response_type, host = host, - port = port, params = params, json = data) + port = port, params = params, json = data, timeout = timeout) return ret async def delete(self, resource_uri: str, host: Optional[str] = None, @@ -161,19 +161,20 @@ class ScyllaRESTAPIClient(): return result async def remove_node(self, initiator_ip: IPAddress, host_id: HostID, - ignore_dead: list[IPAddress]) -> None: + ignore_dead: list[IPAddress], timeout: float) -> None: """Initiate remove node of host_id in initiator initiator_ip""" logger.info("remove_node for %s on %s", host_id, initiator_ip) await self.client.post("/storage_service/remove_node", params = {"host_id": host_id, "ignore_nodes": ",".join(ignore_dead)}, - host = initiator_ip) + host = initiator_ip, timeout = timeout) logger.debug("remove_node for %s finished", host_id) - async def decommission_node(self, host_ip: str) -> None: + async def decommission_node(self, host_ip: str, timeout: float) -> None: """Initiate decommission node of host_ip""" logger.debug("decommission_node %s", host_ip) - await self.client.post("/storage_service/decommission", host = host_ip) + await self.client.post("/storage_service/decommission", host = host_ip, + timeout = timeout) logger.debug("decommission_node %s finished", host_ip) async def get_gossip_generation_number(self, node_ip: str, target_ip: str) -> int: @@ -189,6 +190,12 @@ class ScyllaRESTAPIClient(): assert(type(data) == list) return data + async def get_alive_endpoints(self, node_ip: str) -> list: + """Get the list of alive nodes according to `node_ip`.""" + data = await self.client.get_json(f"/gossiper/endpoint/live", host=node_ip) + assert(type(data) == list) + return data + async def enable_injection(self, node_ip: str, injection: str, one_shot: bool) -> None: """Enable error injection named 
`injection` on `node_ip`.
        Depending on `one_shot`, the injection will be executed only once or every time
        the process passes the injection point.
        """
@@ -217,12 +224,13 @@
 
 
 @asynccontextmanager
-async def inject_error(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str,
-                       one_shot: bool):
+async def inject_error(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str):
     """Attempts to inject an error. Works only in specific build modes: debug,dev,sanitize.
     It will trigger a test to be skipped if attempting to enable an injection has no effect.
+    This is a context manager that enables the injection on entry and disables it when done,
+    so it cannot be used for one-shot injections.
     """
-    await api.enable_injection(node_ip, injection, one_shot)
+    await api.enable_injection(node_ip, injection, False)
    enabled = await api.get_enabled_injections(node_ip)
    logging.info(f"Error injections enabled on {node_ip}: {enabled}")
    if not enabled:
@@ -232,3 +240,15 @@
    finally:
        logger.info(f"Disabling error injection {injection}")
        await api.disable_injection(node_ip, injection)
+
+
+async def inject_error_one_shot(api: ScyllaRESTAPIClient, node_ip: IPAddress, injection: str):
+    """Attempts to inject an error. Works only in specific build modes: debug,dev,sanitize.
+    It will trigger a test to be skipped if attempting to enable an injection has no effect.
+    This enables a one-shot injection: it will trigger only once.
+    """
+    await api.enable_injection(node_ip, injection, True)
+    enabled = await api.get_enabled_injections(node_ip)
+    logging.info(f"Error injections enabled on {node_ip}: {enabled}")
+    if not enabled:
+        pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
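With this split, `inject_error` always arms a persistent injection and disables it when the block exits, while the new `inject_error_one_shot` coroutine arms an injection that fires a single time. A usage sketch; the injection name and the `demo` wrapper are illustrative (`manager` is a `ManagerClient`, `server` a `ServerInfo`):

```python
from test.pylib.rest_client import inject_error, inject_error_one_shot

async def demo(manager, server) -> None:
    # Persistent: enabled on entry, disabled again when the block exits.
    async with inject_error(manager.api, server.ip_addr, 'some_injection_point'):
        await manager.decommission_node(server.server_id)

    # One-shot: fires the next time the process passes the injection point;
    # there is nothing to disable afterwards.
    await inject_error_one_shot(manager.api, server.ip_addr, 'some_injection_point')
```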
diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py
index b63166d7f3..93f40709db 100644
--- a/test/pylib/scylla_cluster.py
+++ b/test/pylib/scylla_cluster.py
@@ -25,7 +25,7 @@ from io import BufferedWriter
 from test.pylib.host_registry import Host, HostRegistry
 from test.pylib.pool import Pool
 from test.pylib.rest_client import ScyllaRESTAPIClient, HTTPError
-from test.pylib.util import LogPrefixAdapter
+from test.pylib.util import LogPrefixAdapter, read_last_line
 from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo
 import aiohttp
 import aiohttp.web
@@ -99,6 +99,8 @@ def make_scylla_conf(workdir: pathlib.Path, host_addr: str, seed_addrs: List[str
 
         'permissions_update_interval_in_ms': 100,
         'permissions_validity_in_ms': 100,
+
+        'force_schema_commit_log': True,
     }
 
 # Seastar options can not be passed through scylla.yaml, use command line
@@ -185,7 +187,9 @@ class ScyllaServer:
     """Starts and handles a single Scylla server, managing logs, checking if responsive,
     and cleanup when finished."""
     # pylint: disable=too-many-instance-attributes
-    START_TIMEOUT = 300     # seconds
+
+    # in seconds, used for topology operations such as bootstrap or decommission
+    TOPOLOGY_TIMEOUT = 1000
     start_time: float
     sleep_interval: float
     log_file: BufferedWriter
@@ -224,11 +228,7 @@
 
     async def install_and_start(self, api: ScyllaRESTAPIClient) -> None:
         """Setup and start this server"""
-        try:
-            await self.install()
-        except:
-            await self.uninstall()
-            raise
+        await self.install()
 
         self.logger.info("starting server at host %s in %s...", self.ip_addr,
                          self.workdir.name)
@@ -263,11 +263,19 @@
 
         # Cleanup any remains of the previously running server in this path
         shutil.rmtree(self.workdir, ignore_errors=True)
 
-        self.workdir.mkdir(parents=True, exist_ok=True)
-        self.config_filename.parent.mkdir(parents=True, exist_ok=True)
-        self._write_config_file()
+        try:
+            self.workdir.mkdir(parents=True, exist_ok=True)
+            self.config_filename.parent.mkdir(parents=True, exist_ok=True)
+            self._write_config_file()
 
-        self.log_file = self.log_filename.open("wb")
+            self.log_file = self.log_filename.open("wb")
+        except:
+            try:
+                shutil.rmtree(self.workdir)
+            except FileNotFoundError:
+                pass
+            self.log_filename.unlink(missing_ok=True)
+            raise
 
     def get_config(self) -> dict[str, object]:
         """Return the contents of conf/scylla.yaml as a dict."""
@@ -317,7 +325,7 @@
         # initializing. When the role is ready, queries begin to
         # work, so rely on this "side effect".
         profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([self.ip_addr]),
-                                   request_timeout=self.START_TIMEOUT)
+                                   request_timeout=self.TOPOLOGY_TIMEOUT)
         connected = False
         try:
             # In a cluster setup, it's possible that the CQL
@@ -358,7 +366,7 @@
             return False
         # Any other exception may indicate a problem, and is passed to the caller.
 
-    async def start(self, api: ScyllaRESTAPIClient) -> None:
+    async def start(self, api: ScyllaRESTAPIClient, expected_error: Optional[str] = None) -> None:
         """Start an installed server. May be used for restarts."""
 
         env = os.environ.copy()
@@ -377,43 +385,45 @@
         sleep_interval = 0.1
         cql_up_state = CqlUpState.NOT_CONNECTED
 
-        while time.time() < self.start_time + self.START_TIMEOUT:
+        def report_error(message: str):
+            message += f", server_id {self.server_id}, IP {self.ip_addr}, workdir {self.workdir.name}"
+            message += f", host_id {self.host_id if hasattr(self, 'host_id') else ''}"
+            message += f", cql [{'connected' if cql_up_state == CqlUpState.CONNECTED else 'not connected'}]"
+            if expected_error is not None:
+                message += f", the node log was expected to contain the string [{expected_error}]"
+            self.logger.error(message)
+            self.logger.error("last line of %s:\n%s", self.log_filename, read_last_line(self.log_filename))
+            log_handler = logging.getLogger().handlers[0]
+            if hasattr(log_handler, 'baseFilename'):
+                logpath = log_handler.baseFilename  # type: ignore
+            else:
+                logpath = "?"
+            raise RuntimeError(message + "\nCheck the log files:\n"
+                               f"{logpath}\n"
+                               f"{self.log_filename}")
+
+        while time.time() < self.start_time + self.TOPOLOGY_TIMEOUT:
             if self.cmd.returncode:
-                with self.log_filename.open('r') as log_file:
-                    self.logger.error("failed to start server at host %s in %s",
-                                      self.ip_addr, self.workdir.name)
-                    self.logger.error("last line of %s:", self.log_filename)
-                    log_file.seek(0, 0)
-                    self.logger.error(log_file.readlines()[-1].rstrip())
-                    log_handler = logging.getLogger().handlers[0]
-                    if hasattr(log_handler, 'baseFilename'):
-                        logpath = log_handler.baseFilename  # type: ignore
-                    else:
-                        logpath = "?"
- raise RuntimeError(f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}.\n" - "Check the log files:\n" - f"{logpath}\n" - f"{self.log_filename}") + self.cmd = None + if expected_error is not None: + with self.log_filename.open('r') as log_file: + for line in log_file: + if expected_error in line: + return + report_error("the node startup failed, but the log file doesn't contain the expected error") + report_error("failed to start the node") if hasattr(self, "host_id") or await self.get_host_id(api): cql_up_state = await self.cql_is_up() if cql_up_state == CqlUpState.QUERIED: + if expected_error is not None: + report_error("the node started, but was expected to fail with the expected error") return # Sleep and retry await asyncio.sleep(sleep_interval) - err = f"Failed to start server with ID = {self.server_id}, IP = {self.ip_addr}." - if hasattr(self, "host_id"): - err += f" Managed to obtain the server's Host ID ({self.host_id})" - if cql_up_state == CqlUpState.CONNECTED: - err += " and to connect the CQL driver, but failed to execute a query." - else: - err += " but failed to connect the CQL driver." - else: - err += " Failed to obtain the server's Host ID." - err += f"\nCheck server log at {self.log_filename}." - raise RuntimeError(err) + report_error('failed to start the node, timeout reached') async def force_schema_migration(self) -> None: """This is a hack to change schema hash on an existing cluster node @@ -422,7 +432,7 @@ class ScyllaServer: previous state propagation was missed.""" auth = PlainTextAuthProvider(username='cassandra', password='cassandra') profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy(self.seeds), - request_timeout=self.START_TIMEOUT) + request_timeout=self.TOPOLOGY_TIMEOUT) with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile}, contact_points=self.seeds, auth_provider=auth, @@ -566,7 +576,8 @@ class ScyllaCluster: try: for _ in range(self.replicas): await self.add_server() - self.keyspace_count = self._get_keyspace_count() + if self.replicas > 0: + self.keyspace_count = self._get_keyspace_count() except Exception as exc: # If start fails, swallow the error to throw later, # at test time. @@ -578,7 +589,7 @@ class ScyllaCluster: async def uninstall(self) -> None: """Stop running servers and uninstall all servers""" self.is_dirty = True - self.logger.info("Uninstalling cluster") + self.logger.info("Uninstalling cluster %s", self) await self.stop() await asyncio.gather(*(srv.uninstall() for srv in self.stopped.values())) await asyncio.gather(*(self.host_registry.release_host(Host(ip)) @@ -588,6 +599,7 @@ class ScyllaCluster: """Release all IPs leased from the host registry by this cluster. 
Call this function only if the cluster is stopped and will not be started again.""" assert not self.running + self.logger.info("Cluster %s releases ips %s", self, self.leased_ips) while self.leased_ips: ip = self.leased_ips.pop() await self.host_registry.release_host(Host(ip)) @@ -617,11 +629,11 @@ class ScyllaCluster: def _seeds(self) -> List[str]: return [server.ip_addr for server in self.running.values()] - async def add_server(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None) -> ServerInfo: + async def add_server(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo: """Add a new server to the cluster""" self.is_dirty = True - extra_config: dict[str, str] = {} + extra_config: dict[str, str] = config.copy() if config else {} if replace_cfg: replaced_id = replace_cfg.replaced_id assert replaced_id in self.servers, \ @@ -662,7 +674,10 @@ class ScyllaCluster: try: server = self.create_server(params) self.logger.info("Cluster %s adding server...", self) - await server.install_and_start(self.api) + if start: + await server.install_and_start(self.api) + else: + await server.install() except Exception as exc: self.logger.error("Failed to start Scylla server at host %s in %s: %s", ip_addr, server.workdir.name, str(exc)) @@ -670,9 +685,12 @@ class ScyllaCluster: self.leased_ips.remove(ip_addr) await self.host_registry.release_host(Host(ip_addr)) raise - self.running[server.server_id] = server + if start: + self.running[server.server_id] = server + else: + self.stopped[server.server_id] = server self.logger.info("Cluster %s added %s", self, server) - return ServerInfo(server.server_id, server.ip_addr, server.host_id) + return ServerInfo(server.server_id, server.ip_addr) def endpoint(self) -> str: """Get a server id (IP) from running servers""" @@ -704,9 +722,9 @@ class ScyllaCluster: stopped = ", ".join(str(server) for server in self.stopped.values()) return f"ScyllaCluster(name: {self.name}, running: {running}, stopped: {stopped})" - def running_servers(self) -> List[Tuple[ServerNum, IPAddress, HostID]]: + def running_servers(self) -> List[Tuple[ServerNum, IPAddress]]: """Get a list of tuples of server id and IP address of running servers (and not removed)""" - return [(server.server_id, server.ip_addr, server.host_id) for server in self.running.values() + return [(server.server_id, server.ip_addr) for server in self.running.values() if server.server_id not in self.removed] def _get_keyspace_count(self) -> int: @@ -730,18 +748,26 @@ class ScyllaCluster: if self.start_exception: # Mark as dirty so further test cases don't try to reuse this cluster. self.is_dirty = True - raise self.start_exception + raise Exception(f'Exception when starting cluster {self}:\n{self.start_exception}') for server in self.running.values(): server.write_log_marker(f"------ Starting test {name} ------\n") - def after_test(self, name) -> None: - """Check that the cluster is still alive and the test + def after_test(self, name: str, success: bool) -> None: + """Mark the cluster as dirty after a failed test. 
+ If the cluster is not dirty, check that it's still alive and the test hasn't left any garbage.""" assert self.start_exception is None - if self._get_keyspace_count() != self.keyspace_count: - raise RuntimeError("Test post-condition failed, " - "the test must drop all keyspaces it creates.") + if not success: + self.logger.debug(f"Test failed using cluster {self.name}, marking the cluster as dirty") + self.is_dirty = True + if self.is_dirty: + self.logger.info(f"The cluster {self.name} is dirty, not checking" + f" keyspace count post-condition") + else: + if self.running and self._get_keyspace_count() != self.keyspace_count: + raise RuntimeError(f"Test post-condition on cluster {self.name} failed, " + f"the test must drop all keyspaces it creates.") for server in itertools.chain(self.running.values(), self.stopped.values()): server.write_log_marker(f"------ Ending test {name} ------\n") @@ -770,9 +796,8 @@ class ScyllaCluster: self.logger.debug("Cluster %s marking server %s as removed", self, server_id) self.removed.add(server_id) - async def server_start(self, server_id: ServerNum) -> ActionReturn: + async def server_start(self, server_id: ServerNum, expected_error: Optional[str] = None) -> ActionReturn: """Start a stopped server""" - self.logger.info("Cluster %s starting server %s", self, server_id) if server_id in self.running: return ScyllaCluster.ActionReturn(success=True, msg=f"{self.running[server_id]} already running") @@ -780,11 +805,16 @@ class ScyllaCluster: return ScyllaCluster.ActionReturn(success=False, msg=f"Server {server_id} unknown") self.is_dirty = True server = self.stopped.pop(server_id) + self.logger.info("Cluster %s starting server %s ip %s", self, + server_id, server.ip_addr) server.seeds = self._seeds() # Put the server in `running` before starting it. # Starting may fail and if we didn't add it now it might leak. 
self.running[server_id] = server - await server.start(self.api) + await server.start(self.api, expected_error) + if expected_error is not None: + self.running.pop(server_id) + self.stopped[server_id] = server return ScyllaCluster.ActionReturn(success=True, msg=f"{server} started") async def server_restart(self, server_id: ServerNum) -> ActionReturn: @@ -930,7 +960,7 @@ class ScyllaClusterManager: add_get('/cluster/host-ip/{server_id}', self._cluster_server_ip_addr) add_get('/cluster/host-id/{server_id}', self._cluster_host_id) add_get('/cluster/before-test/{test_case_name}', self._before_test_req) - add_get('/cluster/after-test', self._after_test) + add_get('/cluster/after-test/{success}', self._after_test) add_get('/cluster/mark-dirty', self._mark_dirty) add_get('/cluster/server/{server_id}/stop', self._cluster_server_stop) add_get('/cluster/server/{server_id}/stop_gracefully', self._cluster_server_stop_gracefully) @@ -982,13 +1012,16 @@ class ScyllaClusterManager: async def _after_test(self, _request) -> aiohttp.web.Response: assert self.cluster is not None assert self.current_test_case_full_name - self.logger.info("Finished test %s, cluster: %s", self.current_test_case_full_name, self.cluster) + success = _request.match_info["success"] == "True" + self.logger.info("Test %s %s, cluster: %s", self.current_test_case_full_name, + "SUCCEEDED" if success else "FAILED", self.cluster) try: - self.cluster.after_test(self.current_test_case_full_name) + self.cluster.after_test(self.current_test_case_full_name, success) finally: self.current_test_case_full_name = '' self.is_after_test_ok = True - return aiohttp.web.Response(text="True") + cluster_str = str(self.cluster) + return aiohttp.web.Response(text=cluster_str) async def _mark_dirty(self, _request) -> aiohttp.web.Response: """Mark current cluster dirty""" @@ -1018,7 +1051,8 @@ class ScyllaClusterManager: """Start a specified server (must be stopped)""" assert self.cluster server_id = ServerNum(int(request.match_info["server_id"])) - ret = await self.cluster.server_start(server_id) + expected_error = request.query.get("expected_error") + ret = await self.cluster.server_start(server_id, expected_error) return aiohttp.web.Response(status=200 if ret[0] else 500, text=ret[1]) async def _cluster_server_restart(self, request) -> aiohttp.web.Response: @@ -1033,10 +1067,9 @@ class ScyllaClusterManager: assert self.cluster data = await request.json() replace_cfg = ReplaceConfig(**data["replace_cfg"]) if "replace_cfg" in data else None - s_info = await self.cluster.add_server(replace_cfg, data.get('cmdline')) + s_info = await self.cluster.add_server(replace_cfg, data.get('cmdline'), data.get('config'), data.get('start', True)) return aiohttp.web.json_response({"server_id" : s_info.server_id, - "ip_addr": s_info.ip_addr, - "host_id": s_info.host_id}) + "ip_addr": s_info.ip_addr}) async def _cluster_remove_node(self, request: aiohttp.web.Request) -> aiohttp.web.Response: """Run remove node on Scylla REST API for a specified server""" @@ -1060,7 +1093,8 @@ class ScyllaClusterManager: # initate remove try: - await self.cluster.api.remove_node(initiator.ip_addr, to_remove.host_id, ignore_dead) + await self.cluster.api.remove_node(initiator.ip_addr, to_remove.host_id, ignore_dead, + timeout=ScyllaServer.TOPOLOGY_TIMEOUT) except RuntimeError as exc: self.logger.error("_cluster_remove_node failed initiator %s server %s ignore_dead %s, check log at %s", initiator, to_remove, ignore_dead, initiator.log_filename) @@ -1079,7 +1113,7 @@ class ScyllaClusterManager: 
self.logger.warning("_cluster_decommission_node %s is only running node left", server_id) server = self.cluster.running[server_id] try: - await self.cluster.api.decommission_node(server.ip_addr) + await self.cluster.api.decommission_node(server.ip_addr, timeout=ScyllaServer.TOPOLOGY_TIMEOUT) except RuntimeError as exc: self.logger.error("_cluster_decommission_node %s, check log at %s", server, server.log_filename) diff --git a/test/pylib/util.py b/test/pylib/util.py index a426218e8d..227bf69b62 100644 --- a/test/pylib/util.py +++ b/test/pylib/util.py @@ -6,11 +6,15 @@ import time import asyncio import logging +import pathlib +import os +import pytest from typing import Callable, Awaitable, Optional, TypeVar, Generic -from cassandra.cluster import NoHostAvailable, Session # type: ignore # pylint: disable=no-name-in-module -from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module +from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module +from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module +from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module from test.pylib.internal_types import ServerInfo @@ -76,5 +80,45 @@ async def wait_for_cql_and_get_hosts(cql: Session, servers: list[ServerInfo], de return hosts +def read_last_line(file_path: pathlib.Path, max_line_bytes = 512): + file_size = os.stat(file_path).st_size + with file_path.open('rb') as f: + f.seek(max(0, file_size - max_line_bytes), os.SEEK_SET) + line_bytes = f.read() + line_str = line_bytes.decode('utf-8', errors='ignore') + linesep = os.linesep + if line_str.endswith(linesep): + line_str = line_str[:-len(linesep)] + linesep_index = line_str.rfind(linesep) + if linesep_index != -1: + line_str = line_str[linesep_index + len(linesep):] + elif file_size > max_line_bytes: + line_str = '...' + line_str + return line_str + + +async def get_available_host(cql: Session, deadline: float) -> Host: + hosts = cql.cluster.metadata.all_hosts() + async def find_host(): + for h in hosts: + try: + await cql.run_async( + "select key from system.local where key = 'local'", host=h) + except NoHostAvailable: + logging.debug(f"get_available_host: {h} not available") + continue + return h + return None + return await wait_for(find_host, deadline) + + +async def read_barrier(cql: Session, host: Host): + """To issue a read barrier it is sufficient to attempt dropping a + non-existing table. We need to use `if exists`, otherwise the statement + would fail on prepare/validate step which happens before a read barrier is + performed. + """ + await cql.run_async("drop table if exists nosuchkeyspace.nosuchtable", host = host) + unique_name.last_ms = 0 diff --git a/test/pylib_test/__init__.py b/test/pylib_test/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/pylib_test/pytest.ini b/test/pylib_test/pytest.ini new file mode 100644 index 0000000000..1b586bd36f --- /dev/null +++ b/test/pylib_test/pytest.ini @@ -0,0 +1,9 @@ +# Pytest configuration file. If we don't have one in this directory, +# pytest will look for one in our ancestor directories, and may find +# something irrelevant. So we should have one here, even if empty. 
diff --git a/test/pylib_test/__init__.py b/test/pylib_test/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/test/pylib_test/pytest.ini b/test/pylib_test/pytest.ini
new file mode 100644
index 0000000000..1b586bd36f
--- /dev/null
+++ b/test/pylib_test/pytest.ini
@@ -0,0 +1,9 @@
+# Pytest configuration file. If we don't have one in this directory,
+# pytest will look for one in our ancestor directories, and may find
+# something irrelevant. So we should have one here, even if empty.
+[pytest]
+asyncio_mode = auto
+
+log_cli = true
+log_format = %(asctime)s.%(msecs)03d %(levelname)s> %(message)s
+log_date_format = %H:%M:%S
diff --git a/test/pylib_test/run b/test/pylib_test/run
new file mode 100755
index 0000000000..0b4ac6762a
--- /dev/null
+++ b/test/pylib_test/run
@@ -0,0 +1,11 @@
+#!/usr/bin/env python3
+# Use the run.py library from ../cql-pytest:
+import sys
+sys.path.insert(1, sys.path[0] + '/../cql-pytest')
+import run
+
+success = run.run_pytest(sys.path[0], sys.argv[1:])
+
+run.summary = 'Pylib tests pass' if success else 'Pylib tests failure'
+
+exit(0 if success else 1)
diff --git a/test/pylib_test/suite.yaml b/test/pylib_test/suite.yaml
new file mode 100644
index 0000000000..8919604e66
--- /dev/null
+++ b/test/pylib_test/suite.yaml
@@ -0,0 +1 @@
+type: Run
diff --git a/test/pylib_test/test_util.py b/test/pylib_test/test_util.py
new file mode 100644
index 0000000000..8c14186f01
--- /dev/null
+++ b/test/pylib_test/test_util.py
@@ -0,0 +1,28 @@
+import os
+import tempfile
+import pathlib
+from test.pylib.util import read_last_line
+
+def test_read_last_line():
+    test_cases = [
+        (b"This is the first line.\nThis is the second line.\nThis is the third line.", 'This is the third line.'),
+        (b"This is another file.\nIt has a few lines.\nThe last line is what we're interested in.", 'The last line is what we\'re interested in.'),
+        (b"This file has only one line.", 'This file has only one line.'),
+        (b"\n", ""),
+        (b"\n\n\n", ""),
+        (b"", ""),
+        (b"abc\n", 'abc'),
+        (b"abc", '...bc', 2),
+        (b"lalala\nbububu", "bububu"),
+        (b"line1\nline2\nline3\n", "...line3", 6),
+        (b"line1\nline2\nline3", "line3", 6),
+        (b"line1\nline2\nline3\n", "line3", 7),
+        (b"\xbe\xbe\xbe\xbebububu\n", "bububu")
+    ]
+    for test_case in test_cases:
+        with tempfile.NamedTemporaryFile(dir=os.getenv('TMPDIR', '/tmp')) as f:
+            f.write(test_case[0])
+            f.flush()
+            file_path = pathlib.Path(f.name)
+            actual = read_last_line(file_path, test_case[2]) if len(test_case) == 3 else read_last_line(file_path)
+            assert(actual == test_case[1])
diff --git a/test/topology/conftest.py b/test/topology/conftest.py
index 82a7627e96..4cdd776e8e 100644
--- a/test/topology/conftest.py
+++ b/test/topology/conftest.py
@@ -18,6 +18,8 @@
 from cassandra.cluster import Session, ResponseFuture  # type:
 from cassandra.cluster import Cluster, ConsistencyLevel  # type: ignore # pylint: disable=no-name-in-module
 from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT  # type: ignore # pylint: disable=no-name-in-module
 from cassandra.policies import RoundRobinPolicy  # type: ignore
+from cassandra.policies import TokenAwarePolicy  # type: ignore
+from cassandra.policies import WhiteListRoundRobinPolicy  # type: ignore
 from cassandra.connection import DRIVER_NAME  # type: ignore # pylint: disable=no-name-in-module
 from cassandra.connection import DRIVER_VERSION  # type: ignore # pylint: disable=no-name-in-module
 
@@ -37,6 +39,26 @@
     parser.addoption('--ssl', action='store_true',
                      help='Connect to CQL via an encrypted TLSv1.2 connection')
 
+
+# This is a constant used in `pytest_runtest_makereport` below to store a flag
+# indicating test failure in a stash which can then be accessed from fixtures.
+FAILED_KEY = pytest.StashKey[bool]()
+
+
+@pytest.hookimpl(tryfirst=True, hookwrapper=True)
+def pytest_runtest_makereport(item, call):
+    """This is a post-test hook executed by the pytest library.
+ Use it to access the test result and store a flag indicating failure + so we can later retrieve it in our fixtures like `manager`. + + `item.stash` is the same stash as `request.node.stash` (in the `request` + fixture provided by pytest). + """ + outcome = yield + report = outcome.get_result() + item.stash[FAILED_KEY] = report.when == "call" and report.failed + + # Change default pytest-asyncio event_loop fixture scope to session to # allow async fixtures with scope larger than function. (e.g. manager fixture) # See https://github.com/pytest-dev/pytest-asyncio/issues/68 @@ -105,6 +127,11 @@ def cluster_con(hosts: List[IPAddress], port: int, use_ssl: bool): # See issue #11289. # NOTE: request_timeout is the main cause of timeouts, even if logs say heartbeat request_timeout=200) + whitelist_profile = ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy(hosts)), + consistency_level=ConsistencyLevel.LOCAL_QUORUM, + serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL, + request_timeout=200) if use_ssl: # Scylla does not support any earlier TLS protocol. If you try, # you will get mysterious EOF errors (see issue #6971) :-( @@ -112,7 +139,7 @@ def cluster_con(hosts: List[IPAddress], port: int, use_ssl: bool): else: ssl_context = None - return Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile}, + return Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile, 'whitelist': whitelist_profile}, contact_points=hosts, port=port, # TODO: make the protocol version an option, to allow testing with @@ -159,7 +186,10 @@ async def manager(request, manager_internal): test_case_name = request.node.name await manager_internal.before_test(test_case_name) yield manager_internal - await manager_internal.after_test(test_case_name) + # `request.node.stash` contains a flag stored in `pytest_runtest_makereport` + # that indicates test failure. + failed = request.node.stash[FAILED_KEY] + await manager_internal.after_test(test_case_name, not failed) # "cql" fixture: set up client object for communicating with the CQL API. # Since connection is managed by manager just return that object @@ -192,9 +222,21 @@ def fails_without_consistent_cluster_management(request, check_pre_consistent_cl # "random_tables" fixture: Creates and returns a temporary RandomTables object -# used in tests to make schema changes. Tables are dropped after finished. +# used in tests to make schema changes. Tables are dropped after test finishes +# unless the cluster is dirty or the test has failed. @pytest.fixture(scope="function") -def random_tables(request, manager): - tables = RandomTables(request.node.name, manager, unique_name()) +async def random_tables(request, manager): + rf_marker = request.node.get_closest_marker("replication_factor") + replication_factor = rf_marker.args[0] if rf_marker is not None else 3 # Default 3 + tables = RandomTables(request.node.name, manager, unique_name(), replication_factor) yield tables - tables.drop_all() + + # Don't drop tables at the end if we failed or the cluster is dirty - it may be impossible + # (e.g. the cluster is completely dead) and it doesn't matter (we won't reuse the cluster + # anyway). + # The cluster will be marked as dirty if the test failed, but that happens + # at the end of `manager` fixture which we depend on (so these steps will be + # executed after us) - so at this point, we need to check for failure ourselves too. 
+ failed = request.node.stash[FAILED_KEY] + if not failed and not await manager.is_dirty(): + tables.drop_all() diff --git a/test/topology/test_topology.py b/test/topology/test_topology.py deleted file mode 100644 index e6b711f836..0000000000 --- a/test/topology/test_topology.py +++ /dev/null @@ -1,374 +0,0 @@ -# -# Copyright (C) 2022-present ScyllaDB -# -# SPDX-License-Identifier: AGPL-3.0-or-later -# -""" -Test consistency of schema changes with topology changes. -""" -import pytest -import logging -import asyncio -import random -import time - -from test.pylib.util import wait_for, wait_for_cql_and_get_hosts - -from test.pylib.internal_types import ServerInfo -from test.pylib.scylla_cluster import ReplaceConfig -from test.pylib.manager_client import ManagerClient -from cassandra.cluster import Session -from test.pylib.random_tables import RandomTables -from test.pylib.rest_client import inject_error - -logger = logging.getLogger(__name__) - - -async def get_token_ring_host_ids(manager: ManagerClient, srv: ServerInfo) -> set[str]: - """Get the host IDs of token ring members known by `srv`.""" - host_id_map = await manager.api.client.get_json('/storage_service/host_id', srv.ip_addr) - return {e['value'] for e in host_id_map} - - -async def get_current_group0_config(manager: ManagerClient, srv: ServerInfo) -> set[tuple[str, bool]]: - """Get the current Raft group 0 configuration known by `srv`. - The first element of each tuple is the Raft ID of the node (which is equal to the Host ID), - the second element indicates whether the node is a voter. - """ - assert(manager.cql) - host = (await wait_for_cql_and_get_hosts(manager.cql, [srv], time.time() + 60))[0] - group0_id = (await manager.cql.run_async( - "select value from system.scylla_local where key = 'raft_group0_id'", - host=host))[0].value - config = await manager.cql.run_async( - f"select server_id, can_vote from system.raft_state where group_id = {group0_id} and disposition = 'CURRENT'", - host=host) - return {(str(m.server_id), bool(m.can_vote)) for m in config} - - -@pytest.mark.asyncio -async def test_add_server_add_column(manager, random_tables): - """Add a node and then add a column to a table and verify""" - table = await random_tables.add_table(ncolumns=5) - await manager.server_add() - await table.add_column() - await random_tables.verify_schema() - - -@pytest.mark.asyncio -async def test_stop_server_add_column(manager, random_tables): - """Add a node, stop an original node, add a column""" - servers = await manager.running_servers() - table = await random_tables.add_table(ncolumns=5) - await manager.server_add() - await manager.server_stop(servers[1].server_id) - await table.add_column() - await random_tables.verify_schema() - - -@pytest.mark.asyncio -async def test_restart_server_add_column(manager, random_tables): - """Add a node, stop an original node, add a column""" - servers = await manager.running_servers() - table = await random_tables.add_table(ncolumns=5) - ret = await manager.server_restart(servers[1].server_id) - await table.add_column() - await random_tables.verify_schema() - - -@pytest.mark.asyncio -async def test_remove_node_add_column(manager, random_tables): - """Add a node, remove an original node, add a column""" - servers = await manager.running_servers() - table = await random_tables.add_table(ncolumns=5) - await manager.server_add() - await manager.server_stop_gracefully(servers[1].server_id) # stop [1] - await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1] - await 
table.add_column() - await random_tables.verify_schema() - # TODO: check that group 0 no longer contains the removed node (#12153) - - -@pytest.mark.asyncio -async def test_decommission_node_add_column(manager, random_tables): - """Add a node, remove an original node, add a column""" - table = await random_tables.add_table(ncolumns=5) - servers = await manager.running_servers() - decommission_target = servers[1] - # The sleep injections significantly increase the probability of reproducing #11780: - # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state - # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server), - # enters sleep before calling storage_service::notify_joined - # 3. we start decommission on decommission_target - # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server - # 5. bootstrapped_server receives the RPC and enters sleep - # 6. decommission_target handle_state_normal wakes up, - # calls storage_service::notify_joined which drops some RPC clients - # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail - await manager.api.enable_injection( - decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True) - bootstrapped_server = await manager.server_add() - async def no_joining_nodes(): - joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr) - return not joining_nodes - # Wait until decommission_target thinks that bootstrapped_server is NORMAL - # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal - await wait_for(no_joining_nodes, time.time() + 30, period=.1) - await manager.api.enable_injection( - bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True) - await manager.decommission_node(decommission_target.server_id) - await table.add_column() - await random_tables.verify_schema() - # TODO: check that group 0 no longer contains the decommissioned node (#12153) - -@pytest.mark.asyncio -async def test_replace_different_ip(manager: ManagerClient, random_tables) -> None: - servers = await manager.running_servers() - await manager.server_stop(servers[0].server_id) - replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False) - await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node (#12153) - -@pytest.mark.asyncio -async def test_replace_different_ip_using_host_id(manager: ManagerClient, random_tables) -> None: - servers = await manager.running_servers() - await manager.server_stop(servers[0].server_id) - replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True) - await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node (#12153) - -@pytest.mark.asyncio -async def test_replace_reuse_ip(manager: ManagerClient, random_tables) -> None: - servers = await manager.running_servers() - await manager.server_stop(servers[0].server_id) - replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = False) - await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node (#12153) - -@pytest.mark.asyncio -async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, random_tables) -> None: - servers = await manager.running_servers() - await 
manager.server_stop(servers[0].server_id) - replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True) - await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node (#12153) - - -# Checks basic functionality on the cluster with different values of the --smp parameter on the nodes. -@pytest.mark.asyncio -async def test_nodes_with_different_smp(manager: ManagerClient, random_tables: RandomTables) -> None: - # In this test it's more convenient to start with a fresh cluster. - # We don't need the default nodes, - # but there is currently no way to express this in the test infrastructure - - # When the node starts it tries to communicate with others - # by sending group0_peer_exchange message to them. - # This message can be handled on arbitrary shard of the target node. - # The method manager.server_add waits for node to start, which can only happen - # if this message has been handled correctly. - # - # Note: messaging_service is initialized with server_socket::load_balancing_algorithm::port - # policy, this means that the shard for message will be chosen as client_port % smp::count. - # The client port in turn is chosen as rand() * smp::count + current_shard - # (posix_socket_impl::find_port_and_connect). - # If this succeeds to occupy a free port in 5 tries and smp::count is the same - # on both nodes, then it's guaranteed that the message will be - # processed on the same shard as the calling code. - # In the general case, we cannot assume that this same shard guarantee holds. - logger.info(f'Adding --smp=3 server') - await manager.server_add(cmdline=['--smp', '3']) - - # Remove the original 3 servers, the problem is easier to reproduce with --smp values - # that we pick, not the (currently) default --smp=2 coming from the suite. - logger.info(f'Decommissioning old servers') - servers = await manager.running_servers() - for s in servers[:-1]: - await manager.decommission_node(s.server_id) - - logger.info(f'Adding --smp=4 server') - await manager.server_add(cmdline=['--smp', '4']) - logger.info(f'Adding --smp=5 server') - await manager.server_add(cmdline=['--smp', '5']) - - logger.info(f'Creating new tables') - await random_tables.add_tables(ntables=4, ncolumns=5) - await random_tables.verify_schema() - - -@pytest.mark.asyncio -async def test_remove_garbage_group0_members(manager: ManagerClient, random_tables): - """ - Verify that failing to leave group 0 or remove a node from group 0 in removenode/decommission - can be handled by executing removenode (which should clear the 'garbage' group 0 member), - even though the node is no longer a token ring member. - """ - # 4 servers, one dead - await manager.server_add() - servers = await manager.running_servers() - removed_host_id = await manager.get_host_id(servers[0].server_id) - await manager.server_stop_gracefully(servers[0].server_id) - - logging.info(f'removenode {servers[0]} using {servers[1]}') - # removenode will fail after removing the server from the token ring, - # but before removing it from group 0 - async with inject_error(manager.api, servers[1].ip_addr, - 'removenode_fail_before_remove_from_group0', one_shot=True): - try: - await manager.remove_node(servers[1].server_id, servers[0].server_id) - except Exception: - # Note: the exception returned here is only '500 internal server error', - # need to look in test.py log for the actual message coming from Scylla. 
- logging.info(f'expected exception during injection') - - # Query the storage_service/host_id endpoint to calculate a list of known token ring members' Host IDs - # (internally, this endpoint uses token_metadata) - token_ring_ids = await get_token_ring_host_ids(manager, servers[1]) - logging.info(f'token ring members: {token_ring_ids}') - - group0_members = await get_current_group0_config(manager, servers[1]) - logging.info(f'group 0 members: {group0_members}') - group0_ids = {m[0] for m in group0_members} - - # Token ring members should currently be a subset of group 0 members - assert token_ring_ids <= group0_ids - - garbage_members = group0_ids - token_ring_ids - logging.info(f'garbage members: {garbage_members}') - assert len(garbage_members) == 1 - garbage_member = next(iter(garbage_members)) - - # The garbage member is the one that we failed to remove - assert garbage_member == removed_host_id - - # Verify that at least it's a non-voter. - assert garbage_member in {m[0] for m in group0_members if not m[1]} - - logging.info(f'removenode {servers[0]} using {servers[1]} again') - # Retry removenode. It should skip the token ring removal step and remove the server from group 0. - await manager.remove_node(servers[1].server_id, servers[0].server_id) - - group0_members = await get_current_group0_config(manager, servers[1]) - logging.info(f'group 0 members: {group0_members}') - group0_ids = {m[0] for m in group0_members} - - # Token ring members and group 0 members should now be the same. - assert token_ring_ids == group0_ids - - # Verify that availability is not reduced. - # Stop one of the 3 remaining servers and try to remove it. It should succeed with only 2 servers. - - logging.info(f'stop {servers[1]}') - await manager.server_stop_gracefully(servers[1].server_id) - - logging.info(f'removenode {servers[1]} using {servers[2]}') - await manager.remove_node(servers[2].server_id, servers[1].server_id) - - # Perform a similar scenario with decommission. One of the node fails to decommission fully, - # but it manages to leave the token ring. We observe the leftovers using the same APIs as above - # and remove the leftovers. - # We can do this with only 2 nodes because during decommission we become a non-voter before - # leaving the token ring, thus the remaining single node will become a voting majority - # and will be able to perform removenode alone. 
- - decommissioned_host_id = await manager.get_host_id(servers[2].server_id) - await manager.api.enable_injection( - servers[2].ip_addr, 'decommission_fail_before_leave_group0', one_shot=True) - logging.info(f'decommission {servers[2]}') - try: - await manager.decommission_node(servers[2].server_id) - except Exception: - logging.info(f'expected exception during injection') - logging.info(f'stop {servers[2]}') - await manager.server_stop_gracefully(servers[2].server_id) - - token_ring_ids = await get_token_ring_host_ids(manager, servers[3]) - logging.info(f'token ring members: {token_ring_ids}') - - group0_members = await get_current_group0_config(manager, servers[3]) - logging.info(f'group 0 members: {group0_members}') - group0_ids = {m[0] for m in group0_members} - - assert token_ring_ids <= group0_ids - - garbage_members = group0_ids - token_ring_ids - logging.info(f'garbage members: {garbage_members}') - assert len(garbage_members) == 1 - garbage_member = next(iter(garbage_members)) - - assert garbage_member == decommissioned_host_id - assert garbage_member in {m[0] for m in group0_members if not m[1]} - - logging.info(f'removenode {servers[2]} using {servers[3]}') - await manager.remove_node(servers[3].server_id, servers[2].server_id) - - group0_members = await get_current_group0_config(manager, servers[3]) - logging.info(f'group 0 members: {group0_members}') - group0_ids = {m[0] for m in group0_members} - - assert token_ring_ids == group0_ids - - -@pytest.mark.asyncio -@pytest.mark.skip(reason="Wait for @slow attribute, #11713") -async def test_remove_node_with_concurrent_ddl(manager, random_tables): - stopped = False - ddl_failed = False - - async def do_ddl(): - nonlocal ddl_failed - iteration = 0 - while not stopped: - logger.debug(f'ddl, iteration {iteration} started') - try: - # If the node was removed, the driver may retry "create table" on another node, - # but the request might have already been completed. - # The same applies to drop_table. 
- - await random_tables.add_tables(5, 5, if_not_exists=True) - await random_tables.verify_schema() - while len(random_tables.tables) > 0: - await random_tables.drop_table(random_tables.tables[-1], if_exists=True) - logger.debug(f'ddl, iteration {iteration} finished') - except: - logger.exception(f'ddl, iteration {iteration} failed') - ddl_failed = True - raise - iteration += 1 - - async def do_remove_node(): - for i in range(10): - logger.debug(f'do_remove_node [{i}], iteration started') - if ddl_failed: - logger.debug(f'do_remove_node [{i}], ddl failed, exiting') - break - server_ids = await manager.running_servers() - host_ids = await asyncio.gather(*(manager.get_host_id(s) for s in server_ids)) - initiator_index, target_index = random.sample(range(len(server_ids)), 2) - initiator_ip = server_ids[initiator_index] - target_ip = server_ids[target_index] - target_host_id = host_ids[target_index] - logger.info(f'do_remove_node [{i}], running remove_node, ' - f'initiator server [{initiator_ip}], target ip [{target_ip}], ' - f'target host id [{target_host_id}]') - await manager.wait_for_host_known(initiator_ip, target_host_id) - logger.info(f'do_remove_node [{i}], stopping target server [{target_ip}], host_id [{target_host_id}]') - await manager.server_stop_gracefully(target_ip) - logger.info(f'do_remove_node [{i}], target server [{target_ip}] stopped, ' - f'waiting for it to be down on [{initiator_ip}]') - await manager.wait_for_host_down(initiator_ip, target_ip) - logger.info(f'do_remove_node [{i}], invoking remove_node') - await manager.remove_node(initiator_ip, target_ip, target_host_id) - # TODO: check that group 0 no longer contains the removed node (#12153) - logger.info(f'do_remove_node [{i}], remove_node done') - new_server_ip = await manager.server_add() - logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done') - logger.info(f'do_remove_node [{i}], iteration finished') - - ddl_task = asyncio.create_task(do_ddl()) - try: - await do_remove_node() - finally: - logger.debug("do_remove_node finished, waiting for ddl fiber") - stopped = True - await ddl_task - logger.debug("ddl fiber done, finished") diff --git a/test/topology/test_topology_ip.py b/test/topology/test_topology_ip.py new file mode 100644 index 0000000000..e2b32a8787 --- /dev/null +++ b/test/topology/test_topology_ip.py @@ -0,0 +1,50 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +""" +Test replacing node in different scenarios +""" +import time +from test.pylib.scylla_cluster import ReplaceConfig +from test.pylib.manager_client import ManagerClient +from test.topology.util import wait_for_token_ring_and_group0_consistency +import pytest + + +@pytest.mark.asyncio +async def test_replace_different_ip(manager: ManagerClient) -> None: + """Replace an existing node with new node using a different IP address""" + servers = await manager.running_servers() + await manager.server_stop(servers[0].server_id) + replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False) + await manager.server_add(replace_cfg) + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + +@pytest.mark.asyncio +async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None: + """Replace an existing node with new node reusing the replaced node host id""" + servers = await manager.running_servers() + await manager.server_stop(servers[0].server_id) + replace_cfg = ReplaceConfig(replaced_id = 
servers[0].server_id, reuse_ip_addr = False, use_host_id = True) + await manager.server_add(replace_cfg) + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + +@pytest.mark.asyncio +async def test_replace_reuse_ip(manager: ManagerClient) -> None: + """Replace an existing node with new node using the same IP address""" + servers = await manager.running_servers() + await manager.server_stop(servers[0].server_id) + replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = False) + await manager.server_add(replace_cfg) + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + +@pytest.mark.asyncio +async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None: + """Replace an existing node with new node using the same IP address and same host id""" + servers = await manager.running_servers() + await manager.server_stop(servers[0].server_id) + replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True) + await manager.server_add(replace_cfg) + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) diff --git a/test/topology/test_topology_rejoin.py b/test/topology/test_topology_rejoin.py new file mode 100644 index 0000000000..afc25c3309 --- /dev/null +++ b/test/topology/test_topology_rejoin.py @@ -0,0 +1,21 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +""" +Test rejoin of a server after it was stopped suddenly (crash-like) +""" +from test.pylib.manager_client import ManagerClient +import pytest + + +@pytest.mark.asyncio +async def test_start_after_sudden_stop(manager: ManagerClient, random_tables) -> None: + """Tests a server can rejoin the cluster after being stopped suddenly""" + servers = await manager.running_servers() + table = await random_tables.add_table(ncolumns=5) + await manager.server_stop(servers[0].server_id) + await table.add_column() + await manager.server_start(servers[0].server_id) + await random_tables.verify_schema() diff --git a/test/topology/test_topology_remove_decom.py b/test/topology/test_topology_remove_decom.py new file mode 100644 index 0000000000..302e72e660 --- /dev/null +++ b/test/topology/test_topology_remove_decom.py @@ -0,0 +1,133 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +""" +Test consistency of schema changes with topology changes. 
+""" +import logging +import asyncio +import random +import time +from test.pylib.util import wait_for +from test.pylib.manager_client import ManagerClient +from test.pylib.random_tables import RandomTables +from test.topology.util import check_token_ring_and_group0_consistency, \ + wait_for_token_ring_and_group0_consistency +import pytest + + +logger = logging.getLogger(__name__) + + +@pytest.mark.asyncio +async def test_remove_node_add_column(manager: ManagerClient, random_tables: RandomTables): + """Add a node, remove an original node, add a column""" + servers = await manager.running_servers() + table = await random_tables.add_table(ncolumns=5) + await manager.server_add() + await manager.server_stop_gracefully(servers[1].server_id) # stop [1] + await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1] + await check_token_ring_and_group0_consistency(manager) + await table.add_column() + await random_tables.verify_schema() + + +@pytest.mark.asyncio +async def test_decommission_node_add_column(manager: ManagerClient, random_tables: RandomTables): + """Add a node, remove an original node, add a column""" + table = await random_tables.add_table(ncolumns=5) + servers = await manager.running_servers() + decommission_target = servers[1] + # The sleep injections significantly increase the probability of reproducing #11780: + # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state + # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server), + # enters sleep before calling storage_service::notify_joined + # 3. we start decommission on decommission_target + # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server + # 5. bootstrapped_server receives the RPC and enters sleep + # 6. decommission_target handle_state_normal wakes up, + # calls storage_service::notify_joined which drops some RPC clients + # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail + await manager.api.enable_injection( + decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True) + bootstrapped_server = await manager.server_add() + async def no_joining_nodes(): + joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr) + return not joining_nodes + # Wait until decommission_target thinks that bootstrapped_server is NORMAL + # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal + await wait_for(no_joining_nodes, time.time() + 30, period=.1) + await manager.api.enable_injection( + bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True) + await manager.decommission_node(decommission_target.server_id) + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + await table.add_column() + await random_tables.verify_schema() + + +@pytest.mark.asyncio +@pytest.mark.skip(reason="Wait for @slow attribute, #11713") +async def test_remove_node_with_concurrent_ddl(manager: ManagerClient, random_tables: RandomTables): + stopped = False + ddl_failed = False + + async def do_ddl(): + nonlocal ddl_failed + iteration = 0 + while not stopped: + logger.debug(f'ddl, iteration {iteration} started') + try: + # If the node was removed, the driver may retry "create table" on another node, + # but the request might have already been completed. + # The same applies to drop_table. 
+ + await random_tables.add_tables(5, 5, if_not_exists=True) + await random_tables.verify_schema() + while len(random_tables.tables) > 0: + await random_tables.drop_table(random_tables.tables[-1], if_exists=True) + logger.debug(f'ddl, iteration {iteration} finished') + except: + logger.exception(f'ddl, iteration {iteration} failed') + ddl_failed = True + raise + iteration += 1 + + async def do_remove_node(): + for i in range(10): + logger.debug(f'do_remove_node [{i}], iteration started') + if ddl_failed: + logger.debug(f'do_remove_node [{i}], ddl failed, exiting') + break + server_ids = await manager.running_servers() + host_ids = await asyncio.gather(*(manager.get_host_id(s) for s in server_ids)) + initiator_index, target_index = random.sample(range(len(server_ids)), 2) + initiator_ip = server_ids[initiator_index] + target_ip = server_ids[target_index] + target_host_id = host_ids[target_index] + logger.info(f'do_remove_node [{i}], running remove_node, ' + f'initiator server [{initiator_ip}], target ip [{target_ip}], ' + f'target host id [{target_host_id}]') + await manager.wait_for_host_known(initiator_ip, target_host_id) + logger.info(f'do_remove_node [{i}], stopping target server [{target_ip}], host_id [{target_host_id}]') + await manager.server_stop_gracefully(target_ip) + logger.info(f'do_remove_node [{i}], target server [{target_ip}] stopped, ' + f'waiting for it to be down on [{initiator_ip}]') + await manager.wait_for_host_down(initiator_ip, target_ip) + logger.info(f'do_remove_node [{i}], invoking remove_node') + await manager.remove_node(initiator_ip, target_ip, target_host_id) + # TODO: check that group 0 no longer contains the removed node (#12153) + logger.info(f'do_remove_node [{i}], remove_node done') + new_server_ip = await manager.server_add() + logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done') + logger.info(f'do_remove_node [{i}], iteration finished') + + ddl_task = asyncio.create_task(do_ddl()) + try: + await do_remove_node() + finally: + logger.debug("do_remove_node finished, waiting for ddl fiber") + stopped = True + await ddl_task + logger.debug("ddl fiber done, finished") diff --git a/test/topology/test_topology_remove_garbage_group0.py b/test/topology/test_topology_remove_garbage_group0.py new file mode 100644 index 0000000000..69975c8535 --- /dev/null +++ b/test/topology/test_topology_remove_garbage_group0.py @@ -0,0 +1,128 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +""" +Test removenode with a node that is no longer a token ring member +""" +import logging +from test.pylib.manager_client import ManagerClient +from test.pylib.rest_client import inject_error_one_shot +from test.topology.util import get_token_ring_host_ids, get_current_group0_config, \ + check_token_ring_and_group0_consistency +import pytest + + +logger = logging.getLogger(__name__) + + +async def test_remove_garbage_group0_members(manager: ManagerClient): + """ + Verify that failing to leave group 0 or remove a node from group 0 in removenode/decommission + can be handled by executing removenode (which should clear the 'garbage' group 0 member), + even though the node is no longer a token ring member.
+ """ + # 4 servers, one dead + await manager.server_add() + servers = await manager.running_servers() + removed_host_id = await manager.get_host_id(servers[0].server_id) + await manager.server_stop_gracefully(servers[0].server_id) + + logging.info(f'removenode {servers[0]} using {servers[1]}') + # removenode will fail after removing the server from the token ring, + # but before removing it from group 0 + await inject_error_one_shot(manager.api, servers[1].ip_addr, + 'removenode_fail_before_remove_from_group0') + try: + await manager.remove_node(servers[1].server_id, servers[0].server_id) + except Exception: + # Note: the exception returned here is only '500 internal server error', + # need to look in test.py log for the actual message coming from Scylla. + logging.info(f'expected exception during injection') + + # Query the storage_service/host_id endpoint to calculate a list of known token ring members' Host IDs + # (internally, this endpoint uses token_metadata) + token_ring_ids = await get_token_ring_host_ids(manager, servers[1]) + logging.info(f'token ring members: {token_ring_ids}') + + group0_members = await get_current_group0_config(manager, servers[1]) + logging.info(f'group 0 members: {group0_members}') + group0_ids = {m[0] for m in group0_members} + + # Token ring members should currently be a subset of group 0 members + assert token_ring_ids <= group0_ids + + garbage_members = group0_ids - token_ring_ids + logging.info(f'garbage members: {garbage_members}') + assert len(garbage_members) == 1 + garbage_member = next(iter(garbage_members)) + + # The garbage member is the one that we failed to remove + assert garbage_member == removed_host_id + + # Verify that at least it's a non-voter. + assert garbage_member in {m[0] for m in group0_members if not m[1]} + + logging.info(f'removenode {servers[0]} using {servers[1]} again') + # Retry removenode. It should skip the token ring removal step and remove the server from group 0. + await manager.remove_node(servers[1].server_id, servers[0].server_id) + + group0_members = await get_current_group0_config(manager, servers[1]) + logging.info(f'group 0 members: {group0_members}') + group0_ids = {m[0] for m in group0_members} + + # Token ring members and group 0 members should now be the same. + assert token_ring_ids == group0_ids + + # Verify that availability is not reduced. + # Stop one of the 3 remaining servers and try to remove it. It should succeed with only 2 servers. + + logging.info(f'stop {servers[1]}') + await manager.server_stop_gracefully(servers[1].server_id) + + logging.debug(f'waiting for {servers[2]} to see {servers[1]} is down') + await manager.server_not_sees_other_server(servers[2].ip_addr, servers[1].ip_addr) + logging.info(f'removenode {servers[1]} using {servers[2]}') + await manager.remove_node(servers[2].server_id, servers[1].server_id) + + # Perform a similar scenario with decommission. One of the node fails to decommission fully, + # but it manages to leave the token ring. We observe the leftovers using the same APIs as above + # and remove the leftovers. + # We can do this with only 2 nodes because during decommission we become a non-voter before + # leaving the token ring, thus the remaining single node will become a voting majority + # and will be able to perform removenode alone. 
+ + decommissioned_host_id = await manager.get_host_id(servers[2].server_id) + await manager.api.enable_injection( + servers[2].ip_addr, 'decommission_fail_before_leave_group0', one_shot=True) + logging.info(f'decommission {servers[2]}') + try: + await manager.decommission_node(servers[2].server_id) + except Exception: + logging.info(f'expected exception during injection') + logging.info(f'stop {servers[2]}') + await manager.server_stop_gracefully(servers[2].server_id) + + token_ring_ids = await get_token_ring_host_ids(manager, servers[3]) + logging.info(f'token ring members: {token_ring_ids}') + + group0_members = await get_current_group0_config(manager, servers[3]) + logging.info(f'group 0 members: {group0_members}') + group0_ids = {m[0] for m in group0_members} + + assert token_ring_ids <= group0_ids + + garbage_members = group0_ids - token_ring_ids + logging.info(f'garbage members: {garbage_members}') + assert len(garbage_members) == 1 + garbage_member = next(iter(garbage_members)) + + assert garbage_member == decommissioned_host_id + assert garbage_member in {m[0] for m in group0_members if not m[1]} + + logging.info(f'removenode {servers[2]} using {servers[3]}') + await manager.remove_node(servers[3].server_id, servers[2].server_id) + + await check_token_ring_and_group0_consistency(manager) + diff --git a/test/topology/test_topology_schema.py b/test/topology/test_topology_schema.py new file mode 100644 index 0000000000..3d5e30091b --- /dev/null +++ b/test/topology/test_topology_schema.py @@ -0,0 +1,34 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +""" +Test consistency of schema changes with server hard stop. +""" +import time +from test.topology.util import wait_for_token_ring_and_group0_consistency +import pytest + + +@pytest.mark.asyncio +async def test_topology_schema_changes(manager, random_tables): + """Test schema consistency with restart, add, and sudden stop of servers""" + table = await random_tables.add_table(ncolumns=5) + servers = await manager.running_servers() + + # Test add column after server restart + await manager.server_restart(servers[1].server_id) + await table.add_column() + await random_tables.verify_schema() + + # Test add column after adding a server + await manager.server_add() + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + await table.add_column() + await random_tables.verify_schema() + + # Test add column after hard stop of a server (1/3) + await manager.server_stop(servers[1].server_id) + await table.add_column() + await random_tables.verify_schema() diff --git a/test/topology/util.py b/test/topology/util.py new file mode 100644 index 0000000000..0d34bb0b0b --- /dev/null +++ b/test/topology/util.py @@ -0,0 +1,81 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +""" +Utilities shared by the topology tests: token ring and group 0 consistency helpers.
+""" +import logging +import pytest +import time +from test.pylib.internal_types import ServerInfo +from test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier + + +logger = logging.getLogger(__name__) + + +async def get_token_ring_host_ids(manager: ManagerClient, srv: ServerInfo) -> set[str]: + """Get the host IDs of normal token owners known by `srv`.""" + token_endpoint_map = await manager.api.client.get_json("/storage_service/tokens_endpoint", srv.ip_addr) + normal_endpoints = {e["value"] for e in token_endpoint_map} + logger.info(f"Normal endpoints' IPs by {srv}: {normal_endpoints}") + host_id_map = await manager.api.client.get_json('/storage_service/host_id', srv.ip_addr) + all_host_ids = {e["value"] for e in host_id_map} + logger.info(f"All host IDs by {srv}: {all_host_ids}") + normal_host_ids = {e["value"] for e in host_id_map if e["key"] in normal_endpoints} + logger.info(f"Normal endpoints' host IDs by {srv}: {normal_host_ids}") + return normal_host_ids + + +async def get_current_group0_config(manager: ManagerClient, srv: ServerInfo) -> set[tuple[str, bool]]: + """Get the current Raft group 0 configuration known by `srv`. + The first element of each tuple is the Raft ID of the node (which is equal to the Host ID), + the second element indicates whether the node is a voter. + """ + assert manager.cql + host = (await wait_for_cql_and_get_hosts(manager.cql, [srv], time.time() + 60))[0] + await read_barrier(manager.cql, host) + group0_id = (await manager.cql.run_async( + "select value from system.scylla_local where key = 'raft_group0_id'", + host=host))[0].value + config = await manager.cql.run_async( + f"select server_id, can_vote from system.raft_state where group_id = {group0_id} and disposition = 'CURRENT'", + host=host) + result = {(str(m.server_id), bool(m.can_vote)) for m in config} + logger.info(f"Group 0 members by {srv}: {result}") + return result + + +async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> None: + """Ensure that the normal token owners and group 0 members match + according to each currently running server. + """ + servers = await manager.running_servers() + for srv in servers: + group0_members = await get_current_group0_config(manager, srv) + group0_ids = {m[0] for m in group0_members} + token_ring_ids = await get_token_ring_host_ids(manager, srv) + assert token_ring_ids == group0_ids + + +async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None: + """Weaker version of the above check; the token ring is not immediately updated after + bootstrap/replace/decommission - the normal tokens of the new node propagate through gossip. + Take this into account and wait for the equality condition to hold, with a timeout. 
+ """ + servers = await manager.running_servers() + for srv in servers: + group0_members = await get_current_group0_config(manager, srv) + group0_ids = {m[0] for m in group0_members} + async def token_ring_matches(): + token_ring_ids = await get_token_ring_host_ids(manager, srv) + diff = token_ring_ids ^ group0_ids + if diff: + logger.warning(f"Group 0 members and token ring members don't yet match" \ + f" according to {srv}, symmetric difference: {diff}") + return None + return True + await wait_for(token_ring_matches, deadline, period=.5) diff --git a/test/topology_custom/__init__.py b/test/topology_custom/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/topology_custom/conftest.py b/test/topology_custom/conftest.py new file mode 100644 index 0000000000..74f19717c3 --- /dev/null +++ b/test/topology_custom/conftest.py @@ -0,0 +1,9 @@ +# +# Copyright (C) 2023-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +# This file configures pytest for all tests in this directory, and also +# defines common test fixtures for all of them to use + +from test.topology.conftest import * diff --git a/test/topology_custom/pytest.ini b/test/topology_custom/pytest.ini new file mode 100644 index 0000000000..aa0aef9e89 --- /dev/null +++ b/test/topology_custom/pytest.ini @@ -0,0 +1,9 @@ +[pytest] +asyncio_mode = auto + +log_cli = true +log_format = %(asctime)s.%(msecs)03d %(levelname)s> %(message)s +log_date_format = %H:%M:%S + +markers = + slow: tests that take more than 30 seconds to run diff --git a/test/topology_custom/suite.yaml b/test/topology_custom/suite.yaml new file mode 100644 index 0000000000..194dd0fc2a --- /dev/null +++ b/test/topology_custom/suite.yaml @@ -0,0 +1,6 @@ +type: Topology +pool_size: 4 +cluster_size: 0 +extra_scylla_config_options: + authenticator: AllowAllAuthenticator + authorizer: AllowAllAuthorizer diff --git a/test/topology_custom/test_custom.py b/test/topology_custom/test_custom.py new file mode 100644 index 0000000000..74f4b21284 --- /dev/null +++ b/test/topology_custom/test_custom.py @@ -0,0 +1,19 @@ +# +# Copyright (C) 2023-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from test.pylib.manager_client import ManagerClient +from test.pylib.random_tables import RandomTables +from test.pylib.util import unique_name +import pytest + + +@pytest.mark.asyncio +async def test_custom(request, manager: ManagerClient): + servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()] + tables = RandomTables(request.node.name, manager, unique_name(), 3) + table = await tables.add_table(ncolumns=5) + await table.insert_seq() + await table.add_index(2) + await tables.verify_schema(table) diff --git a/test/topology_custom/test_topology_smp.py b/test/topology_custom/test_topology_smp.py new file mode 100644 index 0000000000..28d31a3cf4 --- /dev/null +++ b/test/topology_custom/test_topology_smp.py @@ -0,0 +1,52 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +""" +Test functionality on the cluster with different values of the --smp parameter on the nodes. 
+""" +import logging +import time +from test.pylib.manager_client import ManagerClient +from test.pylib.random_tables import RandomTables +from test.pylib.util import unique_name +from test.topology.util import wait_for_token_ring_and_group0_consistency +import pytest +from pytest import FixtureRequest + +logger = logging.getLogger(__name__) + + +# Checks basic functionality on the cluster with different values of the --smp parameter on the nodes. +@pytest.mark.asyncio +async def test_nodes_with_different_smp(request: FixtureRequest, manager: ManagerClient) -> None: + # In this test it's more convenient to start with a fresh cluster. + + # When the node starts it tries to communicate with others + # by sending group0_peer_exchange message to them. + # This message can be handled on arbitrary shard of the target node. + # The method manager.server_add waits for node to start, which can only happen + # if this message has been handled correctly. + # + # Note: messaging_service is initialized with server_socket::load_balancing_algorithm::port + # policy, this means that the shard for message will be chosen as client_port % smp::count. + # The client port in turn is chosen as rand() * smp::count + current_shard + # (posix_socket_impl::find_port_and_connect). + # If this succeeds to occupy a free port in 5 tries and smp::count is the same + # on both nodes, then it's guaranteed that the message will be + # processed on the same shard as the calling code. + # In the general case, we cannot assume that this same shard guarantee holds. + logger.info(f'Adding --smp=3 server') + await manager.server_add(cmdline=['--smp', '3']) + logger.info(f'Adding --smp=4 server') + await manager.server_add(cmdline=['--smp', '4']) + logger.info(f'Adding --smp=5 server') + await manager.server_add(cmdline=['--smp', '5']) + + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + + logger.info(f'Creating new tables') + tables = RandomTables(request.node.name, manager, unique_name(), 3) + await tables.add_tables(ntables=4, ncolumns=5) + await tables.verify_schema() diff --git a/test/topology_raft_disabled/test_raft_upgrade.py b/test/topology_raft_disabled/test_raft_upgrade.py index 1f384da4b1..5d7085c449 100644 --- a/test/topology_raft_disabled/test_raft_upgrade.py +++ b/test/topology_raft_disabled/test_raft_upgrade.py @@ -16,7 +16,7 @@ from cassandra.pool import Host # type: ignore # pylint: from test.pylib.manager_client import ManagerClient, IPAddress, ServerInfo from test.pylib.random_tables import RandomTables -from test.pylib.rest_client import ScyllaRESTAPIClient, inject_error +from test.pylib.rest_client import ScyllaRESTAPIClient, inject_error_one_shot from test.pylib.util import wait_for, wait_for_cql_and_get_hosts @@ -110,6 +110,7 @@ def log_run_time(f): @pytest.mark.asyncio @log_run_time +@pytest.mark.replication_factor(1) async def test_raft_upgrade_basic(manager: ManagerClient, random_tables: RandomTables): """ kbr-: the test takes about 7 seconds in dev mode on my laptop. @@ -144,6 +145,7 @@ async def test_raft_upgrade_basic(manager: ManagerClient, random_tables: RandomT @pytest.mark.asyncio @log_run_time +@pytest.mark.replication_factor(1) async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables: RandomTables): """ We enable Raft on every server and the upgrade procedure starts. All servers join group 0. 
Then one @@ -163,42 +165,41 @@ async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables: # TODO error injection should probably be done through ScyllaClusterManager (we may need to mark the cluster as dirty). # In this test the cluster is dirty anyway due to a restart so it's safe. - async with inject_error(manager.api, srv1.ip_addr, 'group0_upgrade_before_synchronize', - one_shot=True): - logging.info(f"Enabling Raft on {others} and restarting") - await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in others)) - cql = await reconnect_driver(manager) + await inject_error_one_shot(manager.api, srv1.ip_addr, 'group0_upgrade_before_synchronize') + logging.info(f"Enabling Raft on {others} and restarting") + await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in others)) + cql = await reconnect_driver(manager) - logging.info(f"Cluster restarted, waiting until driver reconnects to {others}") - hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60) - logging.info(f"Driver reconnected, hosts: {hosts}") + logging.info(f"Cluster restarted, waiting until driver reconnects to {others}") + hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60) + logging.info(f"Driver reconnected, hosts: {hosts}") - logging.info(f"Waiting until {hosts} enter 'synchronize' state") - await asyncio.gather(*(wait_for_upgrade_state('synchronize', cql, h, time.time() + 60) for h in hosts)) - logging.info(f"{hosts} entered synchronize") + logging.info(f"Waiting until {hosts} enter 'synchronize' state") + await asyncio.gather(*(wait_for_upgrade_state('synchronize', cql, h, time.time() + 60) for h in hosts)) + logging.info(f"{hosts} entered synchronize") - # TODO ensure that srv1 failed upgrade - look at logs? - # '[shard 0] raft_group0_upgrade - Raft upgrade failed: std::runtime_error (error injection before group 0 upgrade enters synchronize).' + # TODO ensure that srv1 failed upgrade - look at logs? + # '[shard 0] raft_group0_upgrade - Raft upgrade failed: std::runtime_error (error injection before group 0 upgrade enters synchronize).' 
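
One way to close the TODO above would be to scan srv1's log for the quoted message after the restart. A sketch, assuming a hypothetical server_log_path helper (this patch does not add one; at this point the harness only captures self.server_log in test.py):

    # `server_log_path` is hypothetical — substitute however the suite
    # exposes the per-server log file.
    with open(server_log_path(srv1.server_id)) as f:
        assert 'Raft upgrade failed' in f.read()
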
- logging.info(f"Setting recovery state on {hosts}") - for host in hosts: - await cql.run_async( - "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", - host=host) + logging.info(f"Setting recovery state on {hosts}") + for host in hosts: + await cql.run_async( + "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", + host=host) - logging.info(f"Restarting {others}") - await asyncio.gather(*(restart(manager, srv) for srv in others)) - cql = await reconnect_driver(manager) + logging.info(f"Restarting {others}") + await asyncio.gather(*(restart(manager, srv) for srv in others)) + cql = await reconnect_driver(manager) - logging.info(f"{others} restarted, waiting until driver reconnects to them") - hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60) + logging.info(f"{others} restarted, waiting until driver reconnects to them") + hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60) - logging.info(f"Checking if {hosts} are in recovery state") - for host in hosts: - rs = await cql.run_async( - "select value from system.scylla_local where key = 'group0_upgrade_state'", - host=host) - assert rs[0].value == 'recovery' + logging.info(f"Checking if {hosts} are in recovery state") + for host in hosts: + rs = await cql.run_async( + "select value from system.scylla_local where key = 'group0_upgrade_state'", + host=host) + assert rs[0].value == 'recovery' logging.info("Creating a table while in recovery state") table = await random_tables.add_table(ncolumns=5) @@ -229,6 +230,7 @@ async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables: @pytest.mark.asyncio @log_run_time +@pytest.mark.replication_factor(1) async def test_recovery_after_majority_loss(manager: ManagerClient, random_tables: RandomTables): """ We successfully upgrade a cluster. Eventually however all servers but one fail - group 0