test: auth_cluster: use safe_driver_shutdown() for Cluster teardown

A handful of cassandra-driver Cluster.shutdown() call sites in the
auth_cluster tests were missed by the previous sweep that introduced
safe_driver_shutdown(), because the local variable holding the Cluster
is named "c" rather than "cluster".

Direct Cluster.shutdown() is racy: the driver's "Task Scheduler"
thread may raise RuntimeError ("cannot schedule new futures after
shutdown") during or after the call, occasionally failing tests.
safe_driver_shutdown() suppresses this expected RuntimeError and
joins the scheduler thread.

Replace the remaining c.shutdown() calls in:
  - test/cluster/auth_cluster/test_startup_response.py
  - test/cluster/auth_cluster/test_maintenance_socket.py
with safe_driver_shutdown(c) and add the corresponding import from
test.pylib.driver_utils.

No behavioral change to the tests; only the driver teardown is
hardened against a known driver-side race.

Fixes SCYLLADB-1662

Closes scylladb/scylladb#29576
This commit is contained in:
Dario Mirovic
2026-04-20 19:31:52 +02:00
committed by Marcin Maliszkiewicz
parent 6f7bf30a14
commit cf237e060a
6 changed files with 18 additions and 14 deletions

View File

@@ -16,6 +16,8 @@ Usage:
import argparse, os, sys
from typing import Sequence
from test.pylib.driver_utils import safe_driver_shutdown
def read_statements(path: str) -> list[tuple[int, str]]:
stms: list[tuple[int, str]] = []
with open(path, 'r', encoding='utf-8') as f:
@@ -56,7 +58,7 @@ def exec_statements(statements: list[tuple[int, str]], socket_path: str, timeout
print(f"ERROR executing statement from file line {lineno}: {s}\n{e}", file=sys.stderr)
return 1
finally:
cluster.shutdown()
safe_driver_shutdown(cluster)
return 0
def main(argv: Sequence[str]) -> int:

View File

@@ -58,7 +58,7 @@ async def get_ready_maintenance_session(socket_path: str, timeout: int = 60):
session.execute("SELECT key FROM system.local LIMIT 1")
return session
except Exception:
c.shutdown()
safe_driver_shutdown(c)
return None
session = await wait_for(try_connect, deadline)
@@ -90,7 +90,7 @@ async def connect_with_credentials(ip: str, username: str, password: str, timeou
try:
return c.connect()
except NoHostAvailable:
c.shutdown()
safe_driver_shutdown(c)
return None
return await wait_for(try_connect, time.time() + timeout)
@@ -240,7 +240,7 @@ async def test_no_default_superuser_maintenance_socket_ops(manager: ManagerClien
except Unauthorized:
return True
finally:
c.shutdown()
safe_driver_shutdown(c)
await wait_for(check_superuser_revoked, time.time() + 60)
@@ -257,11 +257,11 @@ async def test_no_default_superuser_maintenance_socket_ops(manager: ManagerClien
auth_provider=PlainTextAuthProvider(username=new_role, password=new_role_password))
try:
c.connect()
c.shutdown()
return None # Still cached, retry
except NoHostAvailable:
c.shutdown()
return True
finally:
safe_driver_shutdown(c)
await wait_for(check_role_dropped, time.time() + 60)

View File

@@ -19,7 +19,7 @@ from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.protocol import ResultMessage
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
from test.pylib.manager_client import ManagerClient
from test.pylib.manager_client import ManagerClient, safe_driver_shutdown
from test.pylib.util import unique_name
@@ -138,7 +138,7 @@ def _prepare_and_execute(host: str, query: str) -> tuple[bytes, bool, int]:
return prepared_metadata_id, captured["metadata_changed"], len(rows)
finally:
session.shutdown()
cluster.shutdown()
safe_driver_shutdown(cluster)
@pytest.mark.asyncio

View File

@@ -14,7 +14,7 @@ from unittest import mock
from cassandra.cluster import Cluster, DefaultConnection, NoHostAvailable
from cassandra import connection
from cassandra.auth import PlainTextAuthProvider
from test.pylib.manager_client import ManagerClient
from test.pylib.manager_client import ManagerClient, safe_driver_shutdown
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
@pytest.mark.asyncio
@@ -51,7 +51,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
# We expect failure or timeout
pass
finally:
c.shutdown()
safe_driver_shutdown(c)
def attempt_good_connection():
nonlocal connections_observed
@@ -66,7 +66,7 @@ async def test_startup_no_auth_response(manager: ManagerClient, build_mode):
if count >= num_connections/2:
connections_observed = True
finally:
c.shutdown()
safe_driver_shutdown(c)
loop = asyncio.get_running_loop()

View File

@@ -7,7 +7,7 @@
import asyncio
import pytest
from test.cluster.util import new_test_keyspace, new_test_table
from test.pylib.manager_client import ManagerClient
from test.pylib.manager_client import ManagerClient, safe_driver_shutdown
from test.pylib.util import wait_for
from cassandra.connection import UnixSocketEndPoint
from cassandra.policies import WhiteListRoundRobinPolicy
@@ -89,4 +89,4 @@ async def test_describe_cluster_sanity(manager: ManagerClient, mode: str):
assert describe_results[0].cluster == system_local_results[0].cluster_name
finally:
if mode == "maintenance":
cluster.shutdown()
safe_driver_shutdown(cluster)

View File

@@ -39,6 +39,8 @@ import typing
import uuid
import yaml
from test.pylib.driver_utils import safe_driver_shutdown
################################################################################
# Common aliases.
@@ -612,7 +614,7 @@ async def main(seed: int, partition_count: Optional[int], row_count: Optional[in
if list(result_rows) != [row]:
raise RuntimeError("Expected: {}, got: {}".format([row], result_rows))
finally:
cluster.shutdown()
safe_driver_shutdown(cluster)
if __name__ == "__main__":
parser = argparse.ArgumentParser()