diff --git a/test.py b/test.py index 71ebf31900..24946d5310 100755 --- a/test.py +++ b/test.py @@ -79,6 +79,7 @@ class TestSuite(ABC): suites = dict() artifacts = ArtifactRegistry() hosts = HostRegistry() + FLAKY_RETRIES = 5 _next_id = 0 def __init__(self, path, cfg, options, mode): @@ -99,6 +100,7 @@ class TestSuite(ABC): self.disabled_tests = set(self.cfg.get("disable", [])) # Skip tests disabled in specific mode. self.disabled_tests.update(self.cfg.get("skip_in_" + mode, [])) + self.flaky_tests = set(self.cfg.get("flaky", [])) # If this mode is one of the debug modes, and there are # tests disabled in a debug mode, add these tests to the skip list. if mode in debug_modes: @@ -175,7 +177,13 @@ class TestSuite(ABC): async def run(self, test, options): try: - await test.run(options) + for i in range(1, self.FLAKY_RETRIES): + await test.run(options) + if test.success or not test.is_flaky or test.is_cancelled: + break + logging.info("Retrying test %s after a flaky fail, retry %d", test.uname, i) + test.is_flaky_failure = True + test.reset() finally: self.pending_test_count -= 1 self.n_failed += int(not test.success) @@ -412,6 +420,22 @@ class Test: # Unique file name, which is also readable by human, as filename prefix self.uname = "{}.{}".format(self.shortname, self.id) self.log_filename = os.path.join(suite.options.tmpdir, self.mode, self.uname + ".log") + self.is_flaky = self.shortname in suite.flaky_tests + # True if the test was retried after it failed + self.is_flaky_failure = False + # True if the test was cancelled by a ctrl-c or timeout, so + # shouldn't be retried, even if it is flaky + self.is_cancelled = False + Test._reset(self) + + def reset(self): + """Reset this object, including all derived state.""" + for cls in reversed(self.__class__.__mro__): + if hasattr(cls, "_reset"): + cls._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" self.success = None @abstractmethod @@ -446,6 +470,11 @@ class UnitTest(Test): self.env = coverage.env(self.path) else: self.env = dict() + UnitTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" + pass def print_summary(self): print("Output of {} {}:".format(self.path, " ".join(self.args))) @@ -474,6 +503,10 @@ class BoostTest(UnitTest): boost_args += ['--'] self.args = boost_args + self.args self.casename = casename + BoostTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" self.__junit_etree = None def get_junit_etree(self): @@ -522,6 +555,10 @@ class CQLApprovalTest(Test): "--input={}".format(self.cql), "--output={}".format(self.tmpfile), ] + CQLApprovalTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" self.is_before_test_ok = False self.is_executed_ok = False self.is_new = False @@ -531,6 +568,9 @@ class CQLApprovalTest(Test): self.unidiff = None self.server_log = None self.env = dict() + old_tmpfile = pathlib.Path(self.tmpfile) + if old_tmpfile.exists(): + old_tmpfile.unlink() async def run(self, options): self.success = False @@ -617,6 +657,11 @@ class RunTest(Test): self.path = os.path.join(suite.path, shortname) self.xmlout = os.path.join(suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml") self.args = ["--junit-xml={}".format(self.xmlout)] + RunTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" + pass def print_summary(self): print("Output of {} {}:".format(self.path, " ".join(self.args))) @@ -635,13 +680,17 @@ class PythonTest(Test): def __init__(self, test_no, shortname, suite): super().__init__(test_no, shortname, suite) self.path = "pytest" - self.server_log = None - self.is_before_test_ok = False - self.is_after_test_ok = False self.xmlout = os.path.join(self.suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml") self.args = ["-o", "junit_family=xunit2", "--junit-xml={}".format(self.xmlout), os.path.join(suite.path, shortname + ".py")] + PythonTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" + self.server_log = None + self.is_before_test_ok = False + self.is_after_test_ok = False def print_summary(self): print("Output of {} {}:".format(self.path, " ".join(self.args))) @@ -694,10 +743,20 @@ class TabularConsoleOutput: def print_progress(self, test): self.last_test_no += 1 + status = "" + if test.success: + logging.info("Test {} is flaky {}".format(test.uname, + test.is_flaky_failure)) + if test.is_flaky_failure: + status = palette.warn("[ FLKY ]") + else: + status = palette.ok("[ PASS ]") + else: + status = palette.fail("[ FAIL ]") msg = "{:10s} {:^8s} {:^7s} {:8s} {}".format( "[{}/{}]".format(self.last_test_no, self.test_count), test.suite.name, test.mode[:7], - palette.ok("[ PASS ]") if test.success else palette.fail("[ FAIL ]"), + status, test.uname ) if self.verbose is False: @@ -786,6 +845,7 @@ async def run_test(test, options, gentle_kill=False, env=dict()): # return False return True except (asyncio.TimeoutError, asyncio.CancelledError) as e: + test.is_cancelled = True if process is not None: if gentle_kill: process.terminate() diff --git a/test/pylib/artifact_registry.py b/test/pylib/artifact_registry.py index dbb4e4efe7..1f611784fc 100644 --- a/test/pylib/artifact_registry.py +++ b/test/pylib/artifact_registry.py @@ -47,7 +47,7 @@ class ArtifactRegistry: the suite. Executing exit artifacts right away is a good idea because it kills running processes and frees their resources early.""" - logging.info("Cleaning up after suite %s...", suite) + logging.info("Cleaning up after suite %s...", suite.suite_key) # Only drop suite artifacts if the suite executed successfully. if not failed and suite in self.suite_artifacts: await asyncio.gather(*self.suite_artifacts[suite]) @@ -55,7 +55,7 @@ class ArtifactRegistry: if suite in self.exit_artifacts: await asyncio.gather(*self.exit_artifacts[suite]) del self.exit_artifacts[suite] - logging.info("Done cleaning up after suite %s...", suite) + logging.info("Done cleaning up after suite %s...", suite.suite_key) def add_suite_artifact(self, suite: Suite, artifact: Callable[[], Artifact]) -> None: self.suite_artifacts.setdefault(suite, []).append(artifact()) diff --git a/test/pylib/scylla_server.py b/test/pylib/scylla_server.py index 4246de95d8..9e9556bcfc 100644 --- a/test/pylib/scylla_server.py +++ b/test/pylib/scylla_server.py @@ -15,6 +15,7 @@ from typing import Optional, List, Callable from cassandra import InvalidRequest # type: ignore from cassandra.auth import PlainTextAuthProvider # type: ignore from cassandra.cluster import Cluster, NoHostAvailable # type: ignore +from cassandra.cluster import Session # type: ignore from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT # type: ignore from cassandra.policies import RoundRobinPolicy # type: ignore @@ -108,6 +109,8 @@ class ScyllaServer: self.seeds = seed self.cmd: Optional[asyncio.subprocess.Process] = None self.log_savepoint = 0 + self.control_cluster: Optional[Cluster] = None + self.control_connection: Optional[Session] = None async def stop_server() -> None: if self.is_running: @@ -227,8 +230,9 @@ class ScyllaServer: session.execute("CREATE KEYSPACE k WITH REPLICATION = {" + "'class' : 'SimpleStrategy', 'replication_factor' : 1 }") session.execute("DROP KEYSPACE k") - self.control_connection = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile}, - contact_points=[self.hostname], auth_provider=auth).connect() + self.control_cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile}, + contact_points=[self.hostname], auth_provider=auth) + self.control_connection = self.control_cluster.connect() return True except (NoHostAvailable, InvalidRequest): return False @@ -316,6 +320,11 @@ Check the log files: return try: + if self.control_connection is not None: + self.control_connection.shutdown() + if self.control_cluster is not None: + self.control_cluster.shutdown() + self.cmd.kill() except ProcessLookupError: pass @@ -326,6 +335,7 @@ Check the log files: logging.info("stopped server at host %s", hostname) self.cmd = None self.control_connection = None + self.control_cluster = None async def uninstall(self) -> None: """Clear all files left from a stopped server, including the