Files
scylladb/test/pylib/async_cql.py
Evgeniy Naydanov 1a0c14aa50 test.py: async_cql: remove unused event_loop fixture
Newer versions of pytest-asyncio (0.24.0) allow controlling the scope
of the async loop per fixture, so this workaround is no longer needed.
2025-03-30 03:19:30 +00:00

78 lines
2.5 KiB
Python

# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""
async_cql:
This module provides a helper to run async CQL queries with Cassandra's Python driver
from an asyncio event loop.
Example usage:
from cassandra.cluster import Session, Cluster
from test.pylib.async_cql import run_async
Session.run_async = run_async
cluster = Cluster(...)
cql = cluster.connect(...)
await cql.run_async(f"SELECT * FROM {table}")
"""
import asyncio
import logging
from cassandra.cluster import ResponseFuture # type: ignore # pylint: disable=no-name-in-module
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _wrap_future(driver_response_future: ResponseFuture, all_pages: bool = False) -> asyncio.Future:
    """Wrap a cassandra Future into an asyncio.Future object.

    Result and error callbacks are registered on the driver future; they
    hand the outcome over to the event loop with call_soon_threadsafe(),
    so they are safe to be invoked from a thread other than the loop's.

    Args:
        driver_response_future: future to wrap
        all_pages: fetch all pages

    Returns:
        An asyncio.Future object which can be awaited.  It resolves to None
        when the driver reports a None result, otherwise to a list with the
        rows of the first page (or of every page when all_pages is true).
        An error reported by the driver becomes the future's exception.
    """
    loop = asyncio.get_event_loop()
    aio_future = loop.create_future()
    _result = []

    def set_result_if_pending(value):
        # Runs on the event loop.  The future may have been cancelled or
        # resolved between the driver callback and this call, so re-check
        # here instead of letting set_result() raise InvalidStateError.
        if aio_future.done():
            logger.debug("_wrap_future: on_result() on already done future: %s", value)
        else:
            aio_future.set_result(value)

    def set_exception_if_pending(exception):
        # Runs on the event loop; same re-check as set_result_if_pending().
        if aio_future.done():
            logger.debug("_wrap_future: on_error(): %s", exception)
        else:
            aio_future.set_exception(exception)

    def on_result(result):
        # Early exit: do not accumulate rows or request further pages for a
        # future nobody is waiting on any more.
        if aio_future.done():
            logger.debug("_wrap_future: on_result() on already done future: %s", result)
            return
        if result is None:
            loop.call_soon_threadsafe(set_result_if_pending, None)
            return
        _result.extend(result)
        if driver_response_future.has_more_pages and all_pages:
            # on_result() is expected to be called again with the next page.
            driver_response_future.start_fetching_next_page()
        else:
            loop.call_soon_threadsafe(set_result_if_pending, _result)

    def on_error(exception, *_):
        if aio_future.done():
            logger.debug("_wrap_future: on_error(): %s", exception)
            return
        loop.call_soon_threadsafe(set_exception_if_pending, exception)

    driver_response_future.add_callback(on_result)
    driver_response_future.add_errback(on_error)
    return aio_future
# TODO: paged result query handling (iterable?)
def run_async(self, *args, all_pages = True, **kwargs) -> asyncio.Future:
    """Run a CQL query without blocking and return an awaitable future.

    Positional and keyword arguments are forwarded to self.execute_async();
    the driver future it returns is wrapped into an asyncio.Future.
    """
    # A caller-supplied timeout always wins.  Otherwise use 200 seconds:
    # the driver defaults proved too short for very slow debug builds on
    # slow or very busy machines, where tests were observed to reach
    # 160 seconds.
    # See issue #11289.
    if "timeout" not in kwargs:
        kwargs["timeout"] = 200.0
    driver_future = self.execute_async(*args, **kwargs)
    return _wrap_future(driver_future, all_pages=all_pages)