From d5bcadcfdacbb238f8248e7b8b410db3862d6eee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 21 Dec 2017 17:09:34 +0200 Subject: [PATCH] Time-based cache eviction Cached queriers should not sit in the cache indefinitely otherwise abandoned reads would cause excess and unncessary resource-usage. Attach an expiry timer to each cache-entry which evicts it after the TTL passes. --- database.hh | 4 +++ querier.cc | 33 ++++++++++++++++++++--- querier.hh | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 109 insertions(+), 6 deletions(-) diff --git a/database.hh b/database.hh index 66fb871086..f6a121fd79 100644 --- a/database.hh +++ b/database.hh @@ -1316,6 +1316,10 @@ public: return *_stats; } + void set_querier_cache_entry_ttl(std::chrono::seconds entry_ttl) { + _querier_cache.set_entry_ttl(entry_ttl); + } + friend class distributed_loader; }; diff --git a/querier.cc b/querier.cc index f411c7d042..f6ff7b06ef 100644 --- a/querier.cc +++ b/querier.cc @@ -142,6 +142,19 @@ querier::can_use querier::can_be_used_for_page(emit_only_live_rows only_live, co return can_use::yes; } +// The time-to-live of a cache-entry. +const std::chrono::seconds querier_cache::default_entry_ttl{10}; + +void querier_cache::scan_cache_entries() { + const auto now = lowres_clock::now(); + + auto it = _meta_entries.begin(); + const auto end = _meta_entries.end(); + while (it != end && it->is_expired(now)) { + it = _meta_entries.erase(it); + } +} + querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state) { const auto queriers = _entries.equal_range(key); @@ -150,8 +163,8 @@ querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, co return _entries.end(); } - const auto it = std::find_if(queriers.first, queriers.second, [&] (const std::pair& elem) { - return elem.second.matches(range); + const auto it = std::find_if(queriers.first, queriers.second, [&] (const std::pair& elem) { + return elem.second.get().matches(range); }); if (it == queriers.second) { @@ -161,6 +174,12 @@ querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, co return it; } +querier_cache::querier_cache(std::chrono::seconds entry_ttl) + : _expiry_timer([this] { scan_cache_entries(); }) + , _entry_ttl(entry_ttl) { + _expiry_timer.arm_periodic(entry_ttl / 2); +} + void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) { // FIXME: see #3159 // In reverse mode flat_mutation_reader drops any remaining rows of the @@ -171,7 +190,8 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt } tracing::trace(trace_state, "Caching querier with key {}", key); - _entries.emplace(key, std::move(q)); + const auto it = _entries.emplace(key, entry::param{std::move(q), _entry_ttl}).first; + _meta_entries.emplace_back(_entries, it); } querier querier_cache::lookup(utils::UUID key, @@ -186,7 +206,7 @@ querier querier_cache::lookup(utils::UUID key, return create_fun(); } - auto q = std::move(it->second); + auto q = std::move(it->second).get(); _entries.erase(it); const auto can_be_used = q.can_be_used_for_page(only_live, s, range, slice); @@ -199,6 +219,11 @@ querier querier_cache::lookup(utils::UUID key, return create_fun(); } +void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) { + _entry_ttl = entry_ttl; + _expiry_timer.rearm(lowres_clock::now() + _entry_ttl / 2, _entry_ttl / 2); +} + querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, bool is_first_page) : _cache(&cache) , _key(key) diff --git a/querier.hh b/querier.hh index 38ab94ab5f..ac38249db8 100644 --- a/querier.hh +++ b/querier.hh @@ -24,6 +24,7 @@ #include "mutation_compactor.hh" #include "mutation_reader.hh" +#include #include /// One-stop object for serving queries. @@ -219,19 +220,89 @@ public: /// page will have a different schema version or will start from a position /// that is before the end position of the previous page. lookup() will /// recognize these cases and drop the previous querier and create a new one. +/// +/// Inserted queriers will have a TTL. When this expires the querier is +/// evicted. This is to avoid excess and unnecessary resource usage due to +/// abandoned queriers. class querier_cache { - using entries = std::unordered_map; +public: + static const std::chrono::seconds default_entry_ttl; + +private: + class entry : public weakly_referencable { + querier _querier; + lowres_clock::time_point _expires; + public: + // Since entry cannot be moved and unordered_map::emplace can pass only + // a single param to it's mapped-type we need to force a single-param + // constructor for entry. Oh C++... + struct param { + querier q; + std::chrono::seconds ttl; + }; + + explicit entry(param p) + : _querier(std::move(p.q)) + , _expires(lowres_clock::now() + p.ttl) { + } + + bool is_expired(const lowres_clock::time_point& now) const { + return _expires <= now; + } + + const querier& get() const & { + return _querier; + } + + querier&& get() && { + return std::move(_querier); + } + }; + + using entries = std::unordered_map; + + class meta_entry { + entries& _entries; + weak_ptr _entry_ptr; + entries::iterator _entry_it; + + public: + meta_entry(entries& e, entries::iterator it) + : _entries(e) + , _entry_ptr(it->second.weak_from_this()) + , _entry_it(it) { + } + + ~meta_entry() { + if (_entry_ptr) { + _entries.erase(_entry_it); + } + } + + bool is_expired(const lowres_clock::time_point& now) const { + return !_entry_ptr || _entry_ptr->is_expired(now); + } + }; entries _entries; + std::list _meta_entries; + timer _expiry_timer; + std::chrono::seconds _entry_ttl; entries::iterator find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state); + void scan_cache_entries(); + public: - querier_cache() = default; + querier_cache(std::chrono::seconds entry_ttl = default_entry_ttl); querier_cache(const querier_cache&) = delete; querier_cache& operator=(const querier_cache&) = delete; + // this is captured + querier_cache(querier_cache&&) = delete; + querier_cache& operator=(querier_cache&&) = delete; + void insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state); /// Lookup a querier in the cache. @@ -257,6 +328,9 @@ public: const query::partition_slice& slice, tracing::trace_state_ptr trace_state, const noncopyable_function& create_fun); + + + void set_entry_ttl(std::chrono::seconds entry_ttl); }; class querier_cache_context {