mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Time-based cache eviction
Cached queriers should not sit in the cache indefinitely otherwise abandoned reads would cause excess and unncessary resource-usage. Attach an expiry timer to each cache-entry which evicts it after the TTL passes.
This commit is contained in:
@@ -1316,6 +1316,10 @@ public:
|
||||
return *_stats;
|
||||
}
|
||||
|
||||
void set_querier_cache_entry_ttl(std::chrono::seconds entry_ttl) {
|
||||
_querier_cache.set_entry_ttl(entry_ttl);
|
||||
}
|
||||
|
||||
friend class distributed_loader;
|
||||
};
|
||||
|
||||
|
||||
33
querier.cc
33
querier.cc
@@ -142,6 +142,19 @@ querier::can_use querier::can_be_used_for_page(emit_only_live_rows only_live, co
|
||||
return can_use::yes;
|
||||
}
|
||||
|
||||
// The time-to-live of a cache-entry.
|
||||
const std::chrono::seconds querier_cache::default_entry_ttl{10};
|
||||
|
||||
void querier_cache::scan_cache_entries() {
|
||||
const auto now = lowres_clock::now();
|
||||
|
||||
auto it = _meta_entries.begin();
|
||||
const auto end = _meta_entries.end();
|
||||
while (it != end && it->is_expired(now)) {
|
||||
it = _meta_entries.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state) {
|
||||
const auto queriers = _entries.equal_range(key);
|
||||
|
||||
@@ -150,8 +163,8 @@ querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, co
|
||||
return _entries.end();
|
||||
}
|
||||
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const std::pair<const utils::UUID, querier>& elem) {
|
||||
return elem.second.matches(range);
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const std::pair<const utils::UUID, entry>& elem) {
|
||||
return elem.second.get().matches(range);
|
||||
});
|
||||
|
||||
if (it == queriers.second) {
|
||||
@@ -161,6 +174,12 @@ querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, co
|
||||
return it;
|
||||
}
|
||||
|
||||
querier_cache::querier_cache(std::chrono::seconds entry_ttl)
|
||||
: _expiry_timer([this] { scan_cache_entries(); })
|
||||
, _entry_ttl(entry_ttl) {
|
||||
_expiry_timer.arm_periodic(entry_ttl / 2);
|
||||
}
|
||||
|
||||
void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) {
|
||||
// FIXME: see #3159
|
||||
// In reverse mode flat_mutation_reader drops any remaining rows of the
|
||||
@@ -171,7 +190,8 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt
|
||||
}
|
||||
|
||||
tracing::trace(trace_state, "Caching querier with key {}", key);
|
||||
_entries.emplace(key, std::move(q));
|
||||
const auto it = _entries.emplace(key, entry::param{std::move(q), _entry_ttl}).first;
|
||||
_meta_entries.emplace_back(_entries, it);
|
||||
}
|
||||
|
||||
querier querier_cache::lookup(utils::UUID key,
|
||||
@@ -186,7 +206,7 @@ querier querier_cache::lookup(utils::UUID key,
|
||||
return create_fun();
|
||||
}
|
||||
|
||||
auto q = std::move(it->second);
|
||||
auto q = std::move(it->second).get();
|
||||
_entries.erase(it);
|
||||
|
||||
const auto can_be_used = q.can_be_used_for_page(only_live, s, range, slice);
|
||||
@@ -199,6 +219,11 @@ querier querier_cache::lookup(utils::UUID key,
|
||||
return create_fun();
|
||||
}
|
||||
|
||||
void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) {
|
||||
_entry_ttl = entry_ttl;
|
||||
_expiry_timer.rearm(lowres_clock::now() + _entry_ttl / 2, _entry_ttl / 2);
|
||||
}
|
||||
|
||||
querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, bool is_first_page)
|
||||
: _cache(&cache)
|
||||
, _key(key)
|
||||
|
||||
78
querier.hh
78
querier.hh
@@ -24,6 +24,7 @@
|
||||
#include "mutation_compactor.hh"
|
||||
#include "mutation_reader.hh"
|
||||
|
||||
#include <seastar/core/weak_ptr.hh>
|
||||
#include <variant>
|
||||
|
||||
/// One-stop object for serving queries.
|
||||
@@ -219,19 +220,89 @@ public:
|
||||
/// page will have a different schema version or will start from a position
|
||||
/// that is before the end position of the previous page. lookup() will
|
||||
/// recognize these cases and drop the previous querier and create a new one.
|
||||
///
|
||||
/// Inserted queriers will have a TTL. When this expires the querier is
|
||||
/// evicted. This is to avoid excess and unnecessary resource usage due to
|
||||
/// abandoned queriers.
|
||||
class querier_cache {
|
||||
using entries = std::unordered_map<utils::UUID, querier>;
|
||||
public:
|
||||
static const std::chrono::seconds default_entry_ttl;
|
||||
|
||||
private:
|
||||
class entry : public weakly_referencable<entry> {
|
||||
querier _querier;
|
||||
lowres_clock::time_point _expires;
|
||||
public:
|
||||
// Since entry cannot be moved and unordered_map::emplace can pass only
|
||||
// a single param to it's mapped-type we need to force a single-param
|
||||
// constructor for entry. Oh C++...
|
||||
struct param {
|
||||
querier q;
|
||||
std::chrono::seconds ttl;
|
||||
};
|
||||
|
||||
explicit entry(param p)
|
||||
: _querier(std::move(p.q))
|
||||
, _expires(lowres_clock::now() + p.ttl) {
|
||||
}
|
||||
|
||||
bool is_expired(const lowres_clock::time_point& now) const {
|
||||
return _expires <= now;
|
||||
}
|
||||
|
||||
const querier& get() const & {
|
||||
return _querier;
|
||||
}
|
||||
|
||||
querier&& get() && {
|
||||
return std::move(_querier);
|
||||
}
|
||||
};
|
||||
|
||||
using entries = std::unordered_map<utils::UUID, entry>;
|
||||
|
||||
class meta_entry {
|
||||
entries& _entries;
|
||||
weak_ptr<entry> _entry_ptr;
|
||||
entries::iterator _entry_it;
|
||||
|
||||
public:
|
||||
meta_entry(entries& e, entries::iterator it)
|
||||
: _entries(e)
|
||||
, _entry_ptr(it->second.weak_from_this())
|
||||
, _entry_it(it) {
|
||||
}
|
||||
|
||||
~meta_entry() {
|
||||
if (_entry_ptr) {
|
||||
_entries.erase(_entry_it);
|
||||
}
|
||||
}
|
||||
|
||||
bool is_expired(const lowres_clock::time_point& now) const {
|
||||
return !_entry_ptr || _entry_ptr->is_expired(now);
|
||||
}
|
||||
};
|
||||
|
||||
entries _entries;
|
||||
std::list<meta_entry> _meta_entries;
|
||||
timer<lowres_clock> _expiry_timer;
|
||||
std::chrono::seconds _entry_ttl;
|
||||
|
||||
entries::iterator find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state);
|
||||
|
||||
void scan_cache_entries();
|
||||
|
||||
public:
|
||||
querier_cache() = default;
|
||||
querier_cache(std::chrono::seconds entry_ttl = default_entry_ttl);
|
||||
|
||||
querier_cache(const querier_cache&) = delete;
|
||||
querier_cache& operator=(const querier_cache&) = delete;
|
||||
|
||||
// this is captured
|
||||
querier_cache(querier_cache&&) = delete;
|
||||
querier_cache& operator=(querier_cache&&) = delete;
|
||||
|
||||
void insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state);
|
||||
|
||||
/// Lookup a querier in the cache.
|
||||
@@ -257,6 +328,9 @@ public:
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
const noncopyable_function<querier()>& create_fun);
|
||||
|
||||
|
||||
void set_entry_ttl(std::chrono::seconds entry_ttl);
|
||||
};
|
||||
|
||||
class querier_cache_context {
|
||||
|
||||
Reference in New Issue
Block a user