From 950d606e3846973a15b00222380d2477cbdd40c8 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov Date: Thu, 2 Jun 2022 21:06:33 +0300 Subject: [PATCH 1/4] test.py: shutdown cassandra-python connection before exit Shutdown cassandra-python connections before exit, to avoid warnings/exceptions at shutdown. Cassandra-python runs a thread pool and if connections are not shut down before exit, there could be a warning that the thread pool is not destroyed before exiting main. --- test/pylib/scylla_server.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/test/pylib/scylla_server.py b/test/pylib/scylla_server.py index 4246de95d8..9e9556bcfc 100644 --- a/test/pylib/scylla_server.py +++ b/test/pylib/scylla_server.py @@ -15,6 +15,7 @@ from typing import Optional, List, Callable from cassandra import InvalidRequest # type: ignore from cassandra.auth import PlainTextAuthProvider # type: ignore from cassandra.cluster import Cluster, NoHostAvailable # type: ignore +from cassandra.cluster import Session # type: ignore from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT # type: ignore from cassandra.policies import RoundRobinPolicy # type: ignore @@ -108,6 +109,8 @@ class ScyllaServer: self.seeds = seed self.cmd: Optional[asyncio.subprocess.Process] = None self.log_savepoint = 0 + self.control_cluster: Optional[Cluster] = None + self.control_connection: Optional[Session] = None async def stop_server() -> None: if self.is_running: @@ -227,8 +230,9 @@ class ScyllaServer: session.execute("CREATE KEYSPACE k WITH REPLICATION = {" + "'class' : 'SimpleStrategy', 'replication_factor' : 1 }") session.execute("DROP KEYSPACE k") - self.control_connection = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile}, - contact_points=[self.hostname], auth_provider=auth).connect() + self.control_cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile}, + contact_points=[self.hostname], auth_provider=auth) + self.control_connection = self.control_cluster.connect() return True except (NoHostAvailable, InvalidRequest): return False @@ -316,6 +320,11 @@ Check the log files: return try: + if self.control_connection is not None: + self.control_connection.shutdown() + if self.control_cluster is not None: + self.control_cluster.shutdown() + self.cmd.kill() except ProcessLookupError: pass @@ -326,6 +335,7 @@ Check the log files: logging.info("stopped server at host %s", hostname) self.cmd = None self.control_connection = None + self.control_cluster = None async def uninstall(self) -> None: """Clear all files left from a stopped server, including the From 2b92d96c87e8d8a245fec96f6c21047b8316f14a Mon Sep 17 00:00:00 2001 From: Konstantin Osipov Date: Thu, 2 Jun 2022 21:08:03 +0300 Subject: [PATCH 2/4] test.py: proper suite name in the log Use a nice suite name rather than an internal Python object key in the log. Fixes a regression introduced when addressing a style-related review remark. --- test/pylib/artifact_registry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/pylib/artifact_registry.py b/test/pylib/artifact_registry.py index dbb4e4efe7..1f611784fc 100644 --- a/test/pylib/artifact_registry.py +++ b/test/pylib/artifact_registry.py @@ -47,7 +47,7 @@ class ArtifactRegistry: the suite. Executing exit artifacts right away is a good idea because it kills running processes and frees their resources early.""" - logging.info("Cleaning up after suite %s...", suite) + logging.info("Cleaning up after suite %s...", suite.suite_key) # Only drop suite artifacts if the suite executed successfully. if not failed and suite in self.suite_artifacts: await asyncio.gather(*self.suite_artifacts[suite]) @@ -55,7 +55,7 @@ class ArtifactRegistry: if suite in self.exit_artifacts: await asyncio.gather(*self.exit_artifacts[suite]) del self.exit_artifacts[suite] - logging.info("Done cleaning up after suite %s...", suite) + logging.info("Done cleaning up after suite %s...", suite.suite_key) def add_suite_artifact(self, suite: Suite, artifact: Callable[[], Artifact]) -> None: self.suite_artifacts.setdefault(suite, []).append(artifact()) From 4cf63efe6c1390fc1bf4a65a52d6e47f709b64e5 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov Date: Thu, 2 Jun 2022 19:46:26 +0300 Subject: [PATCH 3/4] test.py: make Test hierarchy resettable Introduce reset() hierarchy, which is similar to __init__(), i.e. allows to reset test execution state before retrying it. Useful for retrying flaky tests. --- test.py | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/test.py b/test.py index 71ebf31900..388a62d8e4 100755 --- a/test.py +++ b/test.py @@ -412,6 +412,16 @@ class Test: # Unique file name, which is also readable by human, as filename prefix self.uname = "{}.{}".format(self.shortname, self.id) self.log_filename = os.path.join(suite.options.tmpdir, self.mode, self.uname + ".log") + Test._reset(self) + + def reset(self): + """Reset this object, including all derived state.""" + for cls in reversed(self.__class__.__mro__): + if hasattr(cls, "_reset"): + cls._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" self.success = None @abstractmethod @@ -446,6 +456,11 @@ class UnitTest(Test): self.env = coverage.env(self.path) else: self.env = dict() + UnitTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" + pass def print_summary(self): print("Output of {} {}:".format(self.path, " ".join(self.args))) @@ -474,6 +489,10 @@ class BoostTest(UnitTest): boost_args += ['--'] self.args = boost_args + self.args self.casename = casename + BoostTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" self.__junit_etree = None def get_junit_etree(self): @@ -522,6 +541,10 @@ class CQLApprovalTest(Test): "--input={}".format(self.cql), "--output={}".format(self.tmpfile), ] + CQLApprovalTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" self.is_before_test_ok = False self.is_executed_ok = False self.is_new = False @@ -617,6 +640,11 @@ class RunTest(Test): self.path = os.path.join(suite.path, shortname) self.xmlout = os.path.join(suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml") self.args = ["--junit-xml={}".format(self.xmlout)] + RunTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" + pass def print_summary(self): print("Output of {} {}:".format(self.path, " ".join(self.args))) @@ -635,13 +663,17 @@ class PythonTest(Test): def __init__(self, test_no, shortname, suite): super().__init__(test_no, shortname, suite) self.path = "pytest" - self.server_log = None - self.is_before_test_ok = False - self.is_after_test_ok = False self.xmlout = os.path.join(self.suite.options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml") self.args = ["-o", "junit_family=xunit2", "--junit-xml={}".format(self.xmlout), os.path.join(suite.path, shortname + ".py")] + PythonTest._reset(self) + + def _reset(self): + """Reset the test before a retry, if it is retried as flaky""" + self.server_log = None + self.is_before_test_ok = False + self.is_after_test_ok = False def print_summary(self): print("Output of {} {}:".format(self.path, " ".join(self.args))) From 8036d19b840cadc8e0a850d5bd7bd8737e821bb5 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov Date: Thu, 2 Jun 2022 21:08:47 +0300 Subject: [PATCH 4/4] test.py: add support for flaky tests The idea is that a flaky test can be marked as flaky rather than disabled to make sure it passes in CI. This reduces chances of a regression being added while the flakiness is being resolved and the number of disabled tests doesn't grow. --- test.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/test.py b/test.py index 388a62d8e4..24946d5310 100755 --- a/test.py +++ b/test.py @@ -79,6 +79,7 @@ class TestSuite(ABC): suites = dict() artifacts = ArtifactRegistry() hosts = HostRegistry() + FLAKY_RETRIES = 5 _next_id = 0 def __init__(self, path, cfg, options, mode): @@ -99,6 +100,7 @@ class TestSuite(ABC): self.disabled_tests = set(self.cfg.get("disable", [])) # Skip tests disabled in specific mode. self.disabled_tests.update(self.cfg.get("skip_in_" + mode, [])) + self.flaky_tests = set(self.cfg.get("flaky", [])) # If this mode is one of the debug modes, and there are # tests disabled in a debug mode, add these tests to the skip list. if mode in debug_modes: @@ -175,7 +177,13 @@ class TestSuite(ABC): async def run(self, test, options): try: - await test.run(options) + for i in range(1, self.FLAKY_RETRIES): + await test.run(options) + if test.success or not test.is_flaky or test.is_cancelled: + break + logging.info("Retrying test %s after a flaky fail, retry %d", test.uname, i) + test.is_flaky_failure = True + test.reset() finally: self.pending_test_count -= 1 self.n_failed += int(not test.success) @@ -412,6 +420,12 @@ class Test: # Unique file name, which is also readable by human, as filename prefix self.uname = "{}.{}".format(self.shortname, self.id) self.log_filename = os.path.join(suite.options.tmpdir, self.mode, self.uname + ".log") + self.is_flaky = self.shortname in suite.flaky_tests + # True if the test was retried after it failed + self.is_flaky_failure = False + # True if the test was cancelled by a ctrl-c or timeout, so + # shouldn't be retried, even if it is flaky + self.is_cancelled = False Test._reset(self) def reset(self): @@ -554,6 +568,9 @@ class CQLApprovalTest(Test): self.unidiff = None self.server_log = None self.env = dict() + old_tmpfile = pathlib.Path(self.tmpfile) + if old_tmpfile.exists(): + old_tmpfile.unlink() async def run(self, options): self.success = False @@ -726,10 +743,20 @@ class TabularConsoleOutput: def print_progress(self, test): self.last_test_no += 1 + status = "" + if test.success: + logging.info("Test {} is flaky {}".format(test.uname, + test.is_flaky_failure)) + if test.is_flaky_failure: + status = palette.warn("[ FLKY ]") + else: + status = palette.ok("[ PASS ]") + else: + status = palette.fail("[ FAIL ]") msg = "{:10s} {:^8s} {:^7s} {:8s} {}".format( "[{}/{}]".format(self.last_test_no, self.test_count), test.suite.name, test.mode[:7], - palette.ok("[ PASS ]") if test.success else palette.fail("[ FAIL ]"), + status, test.uname ) if self.verbose is False: @@ -818,6 +845,7 @@ async def run_test(test, options, gentle_kill=False, env=dict()): # return False return True except (asyncio.TimeoutError, asyncio.CancelledError) as e: + test.is_cancelled = True if process is not None: if gentle_kill: process.terminate()