mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 17:10:35 +00:00
query_processor::prepare() could race with prepared statement invalidation: after loading from the prepared cache, we converted the cached object to a checked weak pointer and then continued asynchronous work (including error-injection waitpoints). If invalidation happened in that window, the weak handle could no longer be promoted and the prepare path could fail nondeterministically. This change keeps a strong cache entry reference alive across the whole critical section in prepare() by using a pinned cache accessor (get_pinned()), and only deriving the weak handle while the entry is pinned. This removes the lifetime gap without adding retry loops. Test coverage was extended in test/cluster/test_prepare_race.py: - reproduces the invalidation-during-prepare window with injection, - verifies prepare completes successfully, - then invalidates again and executes the same stale client prepared object, - confirms the driver transparently re-requests/re-prepares and execution succeeds. This change introduces: - no behavior change for normal prepare flow besides stronger lifetime guarantees, - no new protocol semantics, - preserves existing cache invalidation logic, - adds explicit cluster-level regression coverage for both the race and the driver re-prepare path, - pushes the re-prepare operation towards the driver: the server will return an unprepared error the first time, and the driver will have to re-prepare during the execution stage.
66 lines
3.2 KiB
Python
66 lines
3.2 KiB
Python
#
|
|
# Copyright (C) 2026-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
import asyncio
|
|
import pytest
|
|
|
|
from test.cluster.util import new_test_keyspace, new_test_table
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import inject_error_one_shot
|
|
|
|
|
|
async def _result_or_fail(future, timeout, failure_message):
    """Return the result of ``future``, failing the test if it does not finish in time.

    Waits up to ``timeout`` seconds for ``future``. If it is still pending,
    the test is failed with ``failure_message``; otherwise the future's result
    is returned (re-raising any exception the future finished with).
    """
    done, _ = await asyncio.wait({future}, timeout=timeout)
    if not done:
        pytest.fail(failure_message)
    return done.pop().result()


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode")
async def test_prepare_fails_if_cached_statement_is_invalidated_mid_prepare(manager: ManagerClient) -> None:
    """Exercise PREPARE racing with invalidation of prepared statements.

    NOTE: despite the "fails" in the test name, the assertions below expect
    PREPARE to complete successfully.

    Steps:
      1. Arm a one-shot error injection and start PREPARE in a worker thread,
         then wait until the server log shows the injection point was reached.
      2. ALTER the table (comment only) while PREPARE is paused, release the
         injection, and check that PREPARE finishes and the statement can be
         executed.
      3. ALTER the table again, re-arm the injection, execute the same
         prepared object, wait for the injection point to be reached again,
         release it, and check that the execution returns the expected row.
    """
    injection = "query_processor_prepare_wait_after_cache_get"
    injection_reached = f"{injection}: waiting for message"

    server = await manager.server_add()
    cql = manager.get_cql()
    log = await manager.server_open_log(server.server_id)

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY") as table:
            query = f"SELECT * FROM {table} WHERE pk = ?"
            loop = asyncio.get_running_loop()
            await cql.run_async(f"INSERT INTO {table} (pk) VALUES (7)")
            await cql.run_async(f"INSERT INTO {table} (pk) VALUES (8)")

            handler = await inject_error_one_shot(manager.api, server.ip_addr, injection)
            mark = await log.mark()
            # cql.prepare() blocks, so it runs in the default executor while
            # this coroutine keeps driving the scenario.
            prepare_future = loop.run_in_executor(None, cql.prepare, query)
            await log.wait_for(injection_reached, from_mark=mark, timeout=60)

            # Trigger table schema update (metadata-only) to invalidate prepared statements while PREPARE is paused.
            await cql.run_async(f"ALTER TABLE {table} WITH comment = 'invalidate-prepared-race'")

            # Release the paused PREPARE and require it to finish.
            await handler.message()
            result = await _result_or_fail(
                prepare_future,
                15,
                "Timed out waiting for PREPARE to complete after signaling injection",
            )
            print(f"PREPARE succeeded as expected: {result!r}")

            # cql.execute() is a blocking driver call; run it in the executor
            # (like the other blocking driver calls in this test) so the event
            # loop is not stalled while the request is in flight.
            rows = await loop.run_in_executor(None, cql.execute, result, [7])
            row = rows.one()
            assert row is not None and row.pk == 7

            # Invalidate prepared statements again, then execute the same prepared object.
            # The driver should transparently re-prepare and re-request execution.
            await cql.run_async(f"ALTER TABLE {table} WITH comment = 'invalidate-prepared-race-again'")

            reprepare_handler = await inject_error_one_shot(manager.api, server.ip_addr, injection)
            reprepare_mark = await log.mark()
            execute_future = loop.run_in_executor(None, cql.execute, result, [8])
            # Seeing the injection message again shows that the prepare path
            # was entered once more as part of this execution.
            await log.wait_for(injection_reached, from_mark=reprepare_mark, timeout=60)

            await reprepare_handler.message()
            retried_rows = await _result_or_fail(
                execute_future,
                15,
                "Timed out waiting for driver execute to finish after re-prepare signaling",
            )
            retried_row = retried_rows.one()
            assert retried_row is not None and retried_row.pk == 8
|