query_processor::prepare() could race with prepared statement invalidation: after loading from the prepared cache, we converted the cached object to a checked weak pointer and then continued asynchronous work (including error-injection waitpoints). If invalidation happened in that window, the weak handle could no longer be promoted and the prepare path could fail nondeterministically. This change keeps a strong cache entry reference alive across the whole critical section in prepare() by using a pinned cache accessor (get_pinned()), and only deriving the weak handle while the entry is pinned. This removes the lifetime gap without adding retry loops. Test coverage was extended in test/cluster/test_prepare_race.py: - reproduces the invalidation-during-prepare window with injection, - verifies prepare completes successfully, - then invalidates again and executes the same stale client prepared object, - confirms the driver transparently re-requests/re-prepares and execution succeeds. This change introduces: - no behavior change for normal prepare flow besides stronger lifetime guarantees, - no new protocol semantics, - preserves existing cache invalidation logic, - adds explicit cluster-level regression coverage for both the race and driver reprepare path. - pushes the re prepare operation twards the driver, the server will return unprepared error for the first time and the driver will have to re prepare during execution stage
199 lines
6.6 KiB
C++
199 lines
6.6 KiB
C++
/*
|
|
* Copyright (C) 2017-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "utils/loading_cache.hh"
|
|
#include "utils/hash.hh"
|
|
#include "cql3/statements/prepared_statement.hh"
|
|
#include "cql3/column_specification.hh"
|
|
#include "cql3/dialect.hh"
|
|
|
|
namespace cql3 {
|
|
|
|
using prepared_cache_entry = std::unique_ptr<statements::prepared_statement>;
|
|
|
|
struct prepared_cache_entry_size {
|
|
size_t operator()(const prepared_cache_entry& val) {
|
|
// TODO: improve the size approximation
|
|
return 10000;
|
|
}
|
|
};
|
|
|
|
typedef bytes cql_prepared_id_type;
|
|
|
|
/// \brief The key of the prepared statements cache
|
|
///
|
|
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
|
|
/// the latter was introduced for unifying the CQL and Thrift prepared
|
|
/// statements so that they can be stored in the same cache.
|
|
class prepared_cache_key_type {
|
|
public:
|
|
// derive from cql_prepared_id_type so we can customize the formatter of
|
|
// cache_key_type
|
|
struct cache_key_type : public cql_prepared_id_type {
|
|
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
|
|
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
|
|
bool operator==(const cache_key_type& other) const = default;
|
|
};
|
|
|
|
private:
|
|
cache_key_type _key;
|
|
|
|
public:
|
|
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
|
|
|
|
cache_key_type& key() { return _key; }
|
|
const cache_key_type& key() const { return _key; }
|
|
|
|
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
|
|
return key.key();
|
|
}
|
|
|
|
bool operator==(const prepared_cache_key_type& other) const = default;
|
|
};
|
|
|
|
class prepared_statements_cache {
|
|
public:
|
|
struct stats {
|
|
uint64_t prepared_cache_evictions = 0;
|
|
uint64_t privileged_entries_evictions_on_size = 0;
|
|
uint64_t unprivileged_entries_evictions_on_size = 0;
|
|
};
|
|
|
|
static stats& shard_stats() {
|
|
static thread_local stats _stats;
|
|
return _stats;
|
|
}
|
|
|
|
struct prepared_cache_stats_updater {
|
|
static void inc_hits() noexcept {}
|
|
static void inc_misses() noexcept {}
|
|
static void inc_blocks() noexcept {}
|
|
static void inc_evictions() noexcept {
|
|
++shard_stats().prepared_cache_evictions;
|
|
}
|
|
static void inc_privileged_on_cache_size_eviction() noexcept {
|
|
++shard_stats().privileged_entries_evictions_on_size;
|
|
}
|
|
static void inc_unprivileged_on_cache_size_eviction() noexcept {
|
|
++shard_stats().unprivileged_entries_evictions_on_size;
|
|
}
|
|
};
|
|
|
|
private:
|
|
using cache_key_type = typename prepared_cache_key_type::cache_key_type;
|
|
// Keep the entry in the "unprivileged" cache section till 2 hits because
|
|
// every prepared statement is accessed at least twice in the cache:
|
|
// 1) During PREPARE
|
|
// 2) During EXECUTE
|
|
//
|
|
// Therefore a typical "pollution" (when a cache entry is used only once) would involve
|
|
// 2 cache hits.
|
|
using cache_type = utils::loading_cache<cache_key_type, prepared_cache_entry, 2, utils::loading_cache_reload_enabled::no, prepared_cache_entry_size, std::hash<cache_key_type>, std::equal_to<cache_key_type>, prepared_cache_stats_updater, prepared_cache_stats_updater>;
|
|
using cache_value_ptr = typename cache_type::value_ptr;
|
|
using checked_weak_ptr = typename statements::prepared_statement::checked_weak_ptr;
|
|
|
|
public:
|
|
static const std::chrono::minutes entry_expiry;
|
|
|
|
using key_type = prepared_cache_key_type;
|
|
using pinned_value_type = cache_value_ptr;
|
|
using value_type = checked_weak_ptr;
|
|
using statement_is_too_big = typename cache_type::entry_is_too_big;
|
|
|
|
private:
|
|
cache_type _cache;
|
|
|
|
public:
|
|
prepared_statements_cache(logging::logger& logger, size_t size)
|
|
: _cache(size, entry_expiry, logger)
|
|
{}
|
|
|
|
template <typename LoadFunc>
|
|
future<pinned_value_type> get_pinned(const key_type& key, LoadFunc&& load) {
|
|
return _cache.get_ptr(key.key(), [load = std::forward<LoadFunc>(load)] (const cache_key_type&) { return load(); });
|
|
}
|
|
|
|
template <typename LoadFunc>
|
|
future<value_type> get(const key_type& key, LoadFunc&& load) {
|
|
return get_pinned(key, std::forward<LoadFunc>(load)).then([] (cache_value_ptr v_ptr) {
|
|
return make_ready_future<value_type>((*v_ptr)->checked_weak_from_this());
|
|
});
|
|
}
|
|
|
|
// "Touch" the corresponding cache entry in order to bump up its reference count.
|
|
void touch(const key_type& key) {
|
|
// loading_cache::find() returns a value_ptr object which constructor does the "thouching".
|
|
_cache.find(key.key());
|
|
}
|
|
|
|
value_type find(const key_type& key) {
|
|
cache_value_ptr vp = _cache.find(key.key());
|
|
if (vp) {
|
|
return (*vp)->checked_weak_from_this();
|
|
}
|
|
return value_type();
|
|
}
|
|
|
|
template <typename Pred>
|
|
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
|
|
void remove_if(Pred&& pred) {
|
|
_cache.remove_if([&pred] (const prepared_cache_entry& e) {
|
|
return pred(e->statement);
|
|
});
|
|
}
|
|
|
|
size_t size() const {
|
|
return _cache.size();
|
|
}
|
|
|
|
size_t memory_footprint() const {
|
|
return _cache.memory_footprint();
|
|
}
|
|
|
|
future<> stop() {
|
|
return _cache.stop();
|
|
}
|
|
};
|
|
}
|
|
|
|
namespace std {
|
|
|
|
template<>
|
|
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
|
|
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
|
|
return std::hash<cql3::cql_prepared_id_type>()(k);
|
|
}
|
|
};
|
|
|
|
template<>
|
|
struct hash<cql3::prepared_cache_key_type> final {
|
|
size_t operator()(const cql3::prepared_cache_key_type& k) const {
|
|
return std::hash<cql3::cql_prepared_id_type>()(k.key());
|
|
}
|
|
};
|
|
}
|
|
|
|
// for prepared_statements_cache log printouts
|
|
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
|
|
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
|
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
|
|
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
|
|
}
|
|
};
|
|
|
|
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
|
|
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
|
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
|
|
return fmt::format_to(ctx.out(), "{}", p.key());
|
|
}
|
|
};
|