From ec3fed5c4dd281f610c63709b35c37ff518eafdc Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 10 Jul 2017 14:36:05 -0400 Subject: [PATCH 01/12] utils: introduce loading_shared_values This class implements an key-value container that is populated using the provided asynchronous callback. The value is loaded when there are active references to the value for the given key. Container ensures that only one entry is loaded per key at any given time. The returned value is a lw_shared_ptr to the actual value. The value for a specific key is immediately evicted when there are no more references to it. The container is based on the boost::intrusive::unordered_set and is rehashed (grown) if needed every time a new value is added (asynchronously loaded). The container has a rehash() method that would grow or shrink the container as needed in order to get the load factor into the [0.25, 0.75] range. Signed-off-by: Vlad Zolotarov --- utils/loading_shared_values.hh | 322 +++++++++++++++++++++++++++++++++ 1 file changed, 322 insertions(+) create mode 100644 utils/loading_shared_values.hh diff --git a/utils/loading_shared_values.hh b/utils/loading_shared_values.hh new file mode 100644 index 0000000000..c5c833cb8b --- /dev/null +++ b/utils/loading_shared_values.hh @@ -0,0 +1,322 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "seastarx.hh" +#include "stdx.hh" + +namespace bi = boost::intrusive; + +namespace utils { + +struct do_nothing_loading_shared_values_stats { + static void inc_hits() noexcept {} // Increase the number of times entry was found ready + static void inc_misses() noexcept {} // Increase the number of times entry was not found + static void inc_blocks() noexcept {} // Increase the number of times entry was not ready (>= misses) + static void inc_evictions() noexcept {} // Increase the number of times entry was evicted +}; + +// Entries stay around as long as there is any live external reference (entry_ptr) to them. +// Supports asynchronous insertion, ensures that only one entry will be loaded. +// InitialBucketsCount is required to be greater than zero. Otherwise a constructor will throw an +// std::invalid_argument exception. +template, + typename EqualPred = std::equal_to, + typename Stats = do_nothing_loading_shared_values_stats, + size_t InitialBucketsCount = 16> +GCC6_CONCEPT( requires requires () { + Stats::inc_hits(); + Stats::inc_misses(); + Stats::inc_blocks(); + Stats::inc_evictions(); +}) +class loading_shared_values { +public: + using key_type = Key; + using value_type = Tp; + static constexpr size_t initial_buckets_count = InitialBucketsCount; + +private: + class entry : public bi::unordered_set_base_hook>, public enable_lw_shared_from_this { + private: + loading_shared_values& _parent; + key_type _key; + stdx::optional _val; + shared_promise<> _loaded; + + public: + const key_type& key() const noexcept { + return _key; + } + + const value_type& value() const noexcept { + return *_val; + } + + value_type& value() noexcept { + return *_val; + } + + /// \brief "Release" the object from the contained value. + /// After this call the state of the value kept inside this object is undefined and it may no longer be used. + /// + /// \return The r-value reference to the value kept inside this object. + value_type&& release() { + return *std::move(_val); + } + + void set_value(value_type new_val) { + _val.emplace(std::move(new_val)); + } + + shared_promise<>& loaded() { + return _loaded; + } + + bool ready() const noexcept { + return bool(_val); + } + + struct key_eq { + bool operator()(const key_type& k, const entry& c) const { + return EqualPred()(k, c.key()); + } + + bool operator()(const entry& c, const key_type& k) const { + return EqualPred()(c.key(), k); + } + }; + + entry(loading_shared_values& parent, key_type k) + : _parent(parent), _key(std::move(k)) {} + + ~entry() { + _parent._set.erase(_parent._set.iterator_to(*this)); + Stats::inc_evictions(); + } + + friend bool operator==(const entry& a, const entry& b){ + return EqualPred()(a.key(), b.key()); + } + + friend std::size_t hash_value(const entry& v) { + return Hash()(v.key()); + } + }; + + using set_type = bi::unordered_set, bi::compare_hash>; + using bi_set_bucket_traits = typename set_type::bucket_traits; + using set_iterator = typename set_type::iterator; + using value_extractor_fn = std::function; + enum class shrinking_is_allowed { no, yes }; + +public: + using iterator = boost::transform_iterator; + +public: + // Pointer to entry value + class entry_ptr { + lw_shared_ptr _e; + public: + using element_type = value_type; + entry_ptr() = default; + explicit entry_ptr(lw_shared_ptr e) : _e(std::move(e)) {} + entry_ptr& operator=(std::nullptr_t) noexcept { + _e = nullptr; + return *this; + } + explicit operator bool() const noexcept { return bool(_e); } + element_type& operator*() const noexcept { return _e->value(); } + element_type* operator->() const noexcept { return &_e->value(); } + + /// \brief Get the wrapped value. Avoid the copy if this is the last reference to this value. + /// If this is the last reference then the wrapped value is going to be std::move()ed. Otherwise it's going to + /// be copied. + /// \return The wrapped value. + element_type release() { + auto res = _e.owned() ? _e->release() : _e->value(); + _e = nullptr; + return res; + } + + friend class loading_shared_values; + }; + +private: + std::vector _buckets; + set_type _set; + value_extractor_fn _value_extractor_fn; + +public: + static const key_type& to_key(const entry_ptr& e_ptr) noexcept { + return e_ptr._e->key(); + } + + /// \throw std::invalid_argument if InitialBucketsCount is zero + loading_shared_values() + : _buckets(InitialBucketsCount) + , _set(bi_set_bucket_traits(_buckets.data(), _buckets.size())) + , _value_extractor_fn([] (entry& e) -> value_type& { return e.value(); }) + { + static_assert(noexcept(Stats::inc_evictions()), "Stats::inc_evictions must be non-throwing"); + static_assert(noexcept(Stats::inc_hits()), "Stats::inc_hits must be non-throwing"); + static_assert(noexcept(Stats::inc_misses()), "Stats::inc_misses must be non-throwing"); + static_assert(noexcept(Stats::inc_blocks()), "Stats::inc_blocks must be non-throwing"); + + static_assert(InitialBucketsCount && ((InitialBucketsCount & (InitialBucketsCount - 1)) == 0), "Initial buckets count should be a power of two"); + } + loading_shared_values(loading_shared_values&&) = default; + loading_shared_values(const loading_shared_values&) = delete; + ~loading_shared_values() { + assert(!_set.size()); + } + + /// \brief + /// Returns a future which resolves with a shared pointer to the entry for the given key. + /// Always returns a valid pointer if succeeds. + /// + /// If entry is missing, the loader is invoked. If entry is already loading, this invocation + /// will wait for prior loading to complete and use its result when it's done. + /// + /// The loader object does not survive deferring, so the caller must deal with its liveness. + template + future get_or_load(const key_type& key, Loader&& loader) noexcept { + static_assert(std::is_same, std::result_of_t>::value, "Bad Loader signature"); + try { + auto i = _set.find(key, Hash(), typename entry::key_eq()); + lw_shared_ptr e; + future<> f = make_ready_future<>(); + if (i != _set.end()) { + e = i->shared_from_this(); + // take a short cut if the value is ready + if (e->ready()) { + return make_ready_future(entry_ptr(std::move(e))); + } + f = e->loaded().get_shared_future(); + } else { + Stats::inc_misses(); + e = make_lw_shared(*this, key); + rehash_before_insert(); + _set.insert(*e); + // get_shared_future() may throw, so make sure to call it before invoking the loader(key) + f = e->loaded().get_shared_future(); + loader(key).then_wrapped([e](future&& val_fut) mutable { + if (val_fut.failed()) { + e->loaded().set_exception(val_fut.get_exception()); + } else { + e->set_value(val_fut.get0()); + e->loaded().set_value(); + } + }); + } + if (!f.available()) { + Stats::inc_blocks(); + return f.then([e]() mutable { + return entry_ptr(std::move(e)); + }); + } else if (f.failed()) { + return make_exception_future(std::move(f).get_exception()); + } else { + Stats::inc_hits(); + return make_ready_future(entry_ptr(std::move(e))); + } + } catch (...) { + return make_exception_future(std::current_exception()); + } + } + + /// \brief Try to rehash the container so that the load factor is between 0.25 and 0.75. + /// \throw May throw if allocation of a new buckets array throws. + void rehash() { + rehash(_set.size()); + } + + size_t buckets_count() const { + return _buckets.size(); + } + + size_t size() const { + return _set.size(); + } + + iterator end() { + return boost::make_transform_iterator(_set.end(), _value_extractor_fn); + } + + iterator begin() { + return boost::make_transform_iterator(_set.begin(), _value_extractor_fn); + } + + iterator find(const key_type& key) noexcept { + set_iterator it = _set.find(key, Hash(), typename entry::key_eq()); + if (it == _set.end() || !it->ready()) { + return end(); + } + return boost::make_transform_iterator(it, _value_extractor_fn); + } + +private: + void rehash_before_insert() noexcept { + try { + rehash(_set.size() + 1); + } catch (...) { + // if rehashing fails - continue with the current buckets array + } + } + + template + void rehash(size_t new_size) { + size_t new_buckets_count = 0; + + // Try to keep the load factor between 0.25 (when shrinking is allowed) and 0.75. + if (ShrinkingIsAllowed == shrinking_is_allowed::yes && new_size < buckets_count() / 4) { + if (!new_size) { + new_buckets_count = 1; + } else { + new_buckets_count = size_t(1) << log2floor(new_size * 4); + } + } else if (new_size > 3 * buckets_count() / 4) { + new_buckets_count = buckets_count() * 2; + } + + if (new_buckets_count < InitialBucketsCount) { + return; + } + + std::vector new_buckets(new_buckets_count); + _set.rehash(bi_set_bucket_traits(new_buckets.data(), new_buckets.size())); + _buckets = std::move(new_buckets); + } +}; + +} From d56684b1a542f1f18148c62bbf852cbfa8abd9d7 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 10 Jul 2017 22:21:14 -0400 Subject: [PATCH 02/12] sstables::shared_index_list: use utils::loading_shared_values Since utils::loading_shared_values API is based on the original shared_index_list this change is mostly a drop-in replacement of the corresponding parts. Signed-off-by: Vlad Zolotarov --- sstables/shared_index_lists.hh | 90 ++++++---------------------------- 1 file changed, 16 insertions(+), 74 deletions(-) diff --git a/sstables/shared_index_lists.hh b/sstables/shared_index_lists.hh index 343544a946..ee12807ed0 100644 --- a/sstables/shared_index_lists.hh +++ b/sstables/shared_index_lists.hh @@ -22,10 +22,9 @@ #pragma once #include "types.hh" -#include #include -#include #include +#include "utils/loading_shared_values.hh" namespace sstables { @@ -37,50 +36,26 @@ using index_list = std::vector; class shared_index_lists { public: using key_type = uint64_t; - struct stats { + static thread_local struct stats { uint64_t hits = 0; // Number of times entry was found ready uint64_t misses = 0; // Number of times entry was not found uint64_t blocks = 0; // Number of times entry was not ready (>= misses) - }; -private: - class entry : public enable_lw_shared_from_this { - public: - key_type key; - index_list list; - shared_promise<> loaded; - shared_index_lists& parent; + } _shard_stats; - entry(shared_index_lists& parent, key_type key) - : key(key), parent(parent) - { } - ~entry() { - parent._lists.erase(key); - } - bool operator==(const entry& e) const { return key == e.key; } - bool operator!=(const entry& e) const { return key != e.key; } + struct stats_updater { + static void inc_hits() noexcept { ++_shard_stats.hits; } + static void inc_misses() noexcept { ++_shard_stats.misses; } + static void inc_blocks() noexcept { ++_shard_stats.blocks; } + static void inc_evictions() noexcept {} }; - std::unordered_map _lists; - static thread_local stats _shard_stats; -public: + + using loading_shared_lists_type = utils::loading_shared_values, std::equal_to, stats_updater>; // Pointer to index_list - class list_ptr { - lw_shared_ptr _e; - public: - using element_type = index_list; - list_ptr() = default; - explicit list_ptr(lw_shared_ptr e) : _e(std::move(e)) {} - explicit operator bool() const { return static_cast(_e); } - index_list& operator*() { return _e->list; } - const index_list& operator*() const { return _e->list; } - index_list* operator->() { return &_e->list; } - const index_list* operator->() const { return &_e->list; } + using list_ptr = loading_shared_lists_type::entry_ptr; +private: - index_list release() { - auto res = _e.owned() ? index_list(std::move(_e->list)) : index_list(_e->list); - _e = {}; - return std::move(res); - } - }; + loading_shared_lists_type _lists; +public: shared_index_lists() = default; shared_index_lists(shared_index_lists&&) = delete; @@ -94,41 +69,8 @@ public: // // The loader object does not survive deferring, so the caller must deal with its liveness. template - future get_or_load(key_type key, Loader&& loader) { - auto i = _lists.find(key); - lw_shared_ptr e; - auto f = [&] { - if (i != _lists.end()) { - e = i->second->shared_from_this(); - return e->loaded.get_shared_future(); - } else { - ++_shard_stats.misses; - e = make_lw_shared(*this, key); - auto f = e->loaded.get_shared_future(); - auto res = _lists.emplace(key, e.get()); - assert(res.second); - futurize_apply(loader, key).then_wrapped([e](future&& f) mutable { - if (f.failed()) { - e->loaded.set_exception(f.get_exception()); - } else { - e->list = f.get0(); - e->loaded.set_value(); - } - }); - return f; - } - }(); - if (!f.available()) { - ++_shard_stats.blocks; - return f.then([e]() mutable { - return list_ptr(std::move(e)); - }); - } else if (f.failed()) { - return make_exception_future(std::move(f).get_exception()); - } else { - ++_shard_stats.hits; - return make_ready_future(list_ptr(std::move(e))); - } + future get_or_load(const key_type& key, Loader&& loader) { + return _lists.get_or_load(key, std::forward(loader)); } static const stats& shard_stats() { return _shard_stats; } From 6024014f92303d8866b4d932a7a7ff86c0b38226 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Tue, 18 Jul 2017 09:55:28 -0400 Subject: [PATCH 03/12] utils::loading_cache: rework on top of utils::loading_shared_values Get rid of the "proprietary" solution for asynchronous values on-demand loading. Use utils::loading_shared_values instead. We would still need to maintain intrusive set and list for efficient shrink and invalidate operations but their entry is not going to contain the actual key and value anymore but rather a loading_shared_values::entry_ptr which is essentially a shared pointer to a key-value pair value. In general, we added another level of dereferencing in order to get the key value but since we use the bi::store_hash in the hook and the bi::compare_hash in the bi::unordered_set this should not translate into an additional set lookup latency. Signed-off-by: Vlad Zolotarov --- utils/loading_cache.hh | 360 +++++++++++++++++++---------------------- 1 file changed, 165 insertions(+), 195 deletions(-) diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index 4ea37c4434..0134d2eb49 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -29,76 +29,49 @@ #include #include -#include "utils/exceptions.hh" +#include "exceptions/exceptions.hh" +#include "utils/loading_shared_values.hh" +#include "log.hh" namespace bi = boost::intrusive; namespace utils { -// Simple variant of the "LoadingCache" used for permissions in origin. -typedef lowres_clock loading_cache_clock_type; -typedef bi::list_base_hook> auto_unlink_list_hook; +using loading_cache_clock_type = seastar::lowres_clock; +using auto_unlink_list_hook = bi::list_base_hook>; -template -class timestamped_val : public auto_unlink_list_hook, public bi::unordered_set_base_hook> { +template +class timestamped_val { public: - typedef bi::list> lru_list_type; - typedef Key key_type; - typedef Tp value_type; + using value_type = Tp; + using loading_values_type = typename utils::loading_shared_values; + class lru_entry; private: - std::experimental::optional _opt_value; + value_type _value; loading_cache_clock_type::time_point _loaded; loading_cache_clock_type::time_point _last_read; - lru_list_type& _lru_list; /// MRU item is at the front, LRU - at the back - Key _key; + lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back public: - struct key_eq { - bool operator()(const Key& k, const timestamped_val& c) const { - return EqualPred()(k, c.key()); - } - - bool operator()(const timestamped_val& c, const Key& k) const { - return EqualPred()(c.key(), k); - } - }; - - timestamped_val(lru_list_type& lru_list, const Key& key) - : _loaded(loading_cache_clock_type::now()) + timestamped_val(value_type val) + : _value(std::move(val)) + , _loaded(loading_cache_clock_type::now()) , _last_read(_loaded) - , _lru_list(lru_list) - , _key(key) {} - - timestamped_val(lru_list_type& lru_list, Key&& key) - : _loaded(loading_cache_clock_type::now()) - , _last_read(_loaded) - , _lru_list(lru_list) - , _key(std::move(key)) {} - - timestamped_val(const timestamped_val&) = default; + {} timestamped_val(timestamped_val&&) = default; - // Make sure copy/move-assignments don't go through the template below - timestamped_val& operator=(const timestamped_val&) = default; - timestamped_val& operator=(timestamped_val&) = default; - timestamped_val& operator=(timestamped_val&&) = default; + timestamped_val& operator=(value_type new_val) { + assert(_lru_entry_ptr); - template - timestamped_val& operator=(U&& new_val) { - _opt_value = std::forward(new_val); + _value = std::move(new_val); _loaded = loading_cache_clock_type::now(); return *this; } - const Tp& value() { - _last_read = loading_cache_clock_type::now(); + value_type& value() noexcept { touch(); - return _opt_value.value(); - } - - explicit operator bool() const noexcept { - return bool(_opt_value); + return _value; } loading_cache_clock_type::time_point last_read() const noexcept { @@ -109,65 +82,90 @@ public: return _loaded; } - const Key& key() const { - return _key; - } - - friend bool operator==(const timestamped_val& a, const timestamped_val& b){ - return EqualPred()(a.key(), b.key()); - } - - friend std::size_t hash_value(const timestamped_val& v) { - return Hash()(v.key()); + bool ready() const noexcept { + return _lru_entry_ptr; } private: + void touch() noexcept { + assert(_lru_entry_ptr); + _last_read = loading_cache_clock_type::now(); + _lru_entry_ptr->touch(); + } + + void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept { + _lru_entry_ptr = lru_entry_ptr; + } +}; + +/// \brief This is and LRU list entry which is also an anchor for a loading_cache value. +template +class timestamped_val::lru_entry : public auto_unlink_list_hook { +private: + using ts_value_type = timestamped_val; + using loading_values_type = typename ts_value_type::loading_values_type; + +public: + using lru_list_type = bi::list>; + using timestamped_val_ptr = typename loading_values_type::entry_ptr; + +private: + timestamped_val_ptr _ts_val_ptr; + lru_list_type& _lru_list; + +public: + lru_entry(timestamped_val_ptr ts_val, lru_list_type& lru_list) + : _ts_val_ptr(std::move(ts_val)) + , _lru_list(lru_list) + { + _ts_val_ptr->set_anchor_back_reference(this); + } + + ~lru_entry() { + _ts_val_ptr->set_anchor_back_reference(nullptr); + } + /// Set this item as the most recently used item. /// The MRU item is going to be at the front of the _lru_list, the LRU item - at the back. void touch() noexcept { auto_unlink_list_hook::unlink(); _lru_list.push_front(*this); } -}; -class shared_mutex { -private: - lw_shared_ptr _mutex_ptr; - -public: - shared_mutex() : _mutex_ptr(make_lw_shared(1)) {} - semaphore& get() const noexcept { - return *_mutex_ptr; + const Key& key() const noexcept { + return loading_values_type::to_key(_ts_val_ptr); } + + timestamped_val& timestamped_value() noexcept { return *_ts_val_ptr; } + const timestamped_val& timestamped_value() const noexcept { return *_ts_val_ptr; } + timestamped_val_ptr timestamped_value_ptr() noexcept { return _ts_val_ptr; } }; +enum class loading_cache_reload_enabled { no, yes }; + template, typename EqualPred = std::equal_to, - typename Alloc = std::allocator>, - typename SharedMutexMapAlloc = std::allocator>> + typename LoadingSharedValuesStats = utils::do_nothing_loading_shared_values_stats, + typename Alloc = std::allocator::lru_entry>> class loading_cache { private: - typedef timestamped_val ts_value_type; - typedef bi::unordered_set, bi::compare_hash> set_type; - typedef std::unordered_map write_mutex_map_type; - typedef typename ts_value_type::lru_list_type lru_list_type; - typedef typename set_type::bucket_traits bi_set_bucket_traits; - - static constexpr int initial_num_buckets = 256; - static constexpr int max_num_buckets = 1024 * 1024; + using ts_value_type = timestamped_val; + using loading_values_type = typename ts_value_type::loading_values_type; + using timestamped_val_ptr = typename loading_values_type::entry_ptr; + using ts_value_lru_entry = typename ts_value_type::lru_entry; + using set_iterator = typename loading_values_type::iterator; + using lru_list_type = typename ts_value_lru_entry::lru_list_type; public: - typedef Tp value_type; - typedef Key key_type; - typedef typename set_type::iterator iterator; + using value_type = Tp; + using key_type = Key; + template loading_cache(size_t max_size, std::chrono::milliseconds expiry, std::chrono::milliseconds refresh, logging::logger& logger, Func&& load) - : _buckets(initial_num_buckets) - , _set(bi_set_bucket_traits(_buckets.data(), _buckets.size())) - , _max_size(max_size) + : _max_size(max_size) , _expiry(expiry) , _refresh(refresh) , _logger(logger) @@ -189,7 +187,7 @@ public: } ~loading_cache() { - _set.clear_and_dispose([] (ts_value_type* ptr) { loading_cache::destroy_ts_value(ptr); }); + _lru_list.erase_and_dispose(_lru_list.begin(), _lru_list.end(), [] (ts_value_lru_entry* ptr) { loading_cache::destroy_ts_value(ptr); }); } future get(const Key& k) { @@ -198,16 +196,24 @@ public: return _load(k); } - // If the key is not in the cache yet, then find_or_create() is going to - // create a new uninitialized value in the map. If the value is already - // in the cache (the fast path) simply return the value. Otherwise, take - // the mutex and try to load the value (the slow path). - iterator ts_value_it = find_or_create(k); - if (*ts_value_it) { - return make_ready_future(ts_value_it->value()); - } else { - return slow_load(k); - } + return _loading_values.get_or_load(k, [this] (const Key& k) { + return _load(k).then([this] (value_type val) { + return ts_value_type(std::move(val)); + }); + }).then([this, k] (timestamped_val_ptr ts_val_ptr) { + // check again since it could have already been inserted and initialized + if (!ts_val_ptr->ready()) { + _logger.trace("{}: storing the value for the first time", k); + + ts_value_lru_entry* new_lru_entry = Alloc().allocate(1); + new(new_lru_entry) ts_value_lru_entry(std::move(ts_val_ptr), _lru_list); + + // This will "touch" the entry and add it to the LRU list. + return make_ready_future(new_lru_entry->timestamped_value_ptr()->value()); + } + + return make_ready_future(ts_val_ptr->value()); + }); } future<> stop() { @@ -215,58 +221,42 @@ public: } private: + set_iterator set_find(const Key& k) noexcept { + set_iterator it = _loading_values.find(k); + set_iterator end_it = set_end(); + + if (it == end_it || !it->ready()) { + return end_it; + } + return it; + } + + set_iterator set_end() noexcept { + return _loading_values.end(); + } + + set_iterator set_begin() noexcept { + return _loading_values.begin(); + } + bool caching_enabled() const { return _expiry != std::chrono::milliseconds(0); } - /// Look for the entry with the given key. It it doesn't exist - create a new one and add it to the _set. - /// - /// \param k The key to look for - /// - /// \return An iterator to the value with the given key (always dirrerent from _set.end()) - template - iterator find_or_create(KeyType&& k) { - iterator i = _set.find(k, Hash(), typename ts_value_type::key_eq()); - if (i == _set.end()) { - ts_value_type* new_ts_val = Alloc().allocate(1); - new(new_ts_val) ts_value_type(_lru_list, std::forward(k)); - auto p = _set.insert(*new_ts_val); - i = p.first; - } - - return i; - } - - static void destroy_ts_value(ts_value_type* val) { - val->~ts_value_type(); + static void destroy_ts_value(ts_value_lru_entry* val) { + val->~ts_value_lru_entry(); Alloc().deallocate(val, 1); } - future slow_load(const Key& k) { - // If the key is not in the cache yet, then _write_mutex_map[k] is going - // to create a new value with the initialized mutex. The mutex is going - // to serialize the producers and only the first one is going to - // actually issue a load operation and initialize the value with the - // received result. The rest are going to see (and read) the initialized - // value when they enter the critical section. - shared_mutex sm = _write_mutex_map[k]; - return with_semaphore(sm.get(), 1, [this, k] { - iterator ts_value_it = find_or_create(k); - if (*ts_value_it) { - return make_ready_future(ts_value_it->value()); + future<> reload(ts_value_lru_entry& lru_entry) { + return _load(lru_entry.key()).then_wrapped([this, key = lru_entry.key()] (auto&& f) mutable { + // if the entry has been evicted by now - simply end here + set_iterator it = set_find(key); + if (it == set_end()) { + _logger.trace("{}: entry was dropped during the reload", key); + return make_ready_future<>(); } - _logger.trace("{}: storing the value for the first time", k); - return _load(k).then([this, k] (Tp t) { - // we have to "re-read" the _set here because the value may have been evicted by now - iterator ts_value_it = find_or_create(std::move(k)); - *ts_value_it = std::move(t); - return make_ready_future(ts_value_it->value()); - }); - }).finally([sm] {}); - } - future<> reload(ts_value_type& ts_val) { - return _load(ts_val.key()).then_wrapped([this, &ts_val] (auto&& f) { // The exceptions are related to the load operation itself. // We should ignore them for the background reads - if // they persist the value will age and will be reloaded in @@ -274,83 +264,66 @@ private: // will be propagated up to the user and will fail the // corresponding query. try { - ts_val = f.get0(); + *it = f.get0(); } catch (std::exception& e) { - _logger.debug("{}: reload failed: {}", ts_val.key(), e.what()); + _logger.debug("{}: reload failed: {}", key, e.what()); } catch (...) { - _logger.debug("{}: reload failed: unknown error", ts_val.key()); + _logger.debug("{}: reload failed: unknown error", key); } - }); - } - void erase(iterator it) { - _set.erase_and_dispose(it, [] (ts_value_type* ptr) { loading_cache::destroy_ts_value(ptr); }); - // no need to delete the item from _lru_list - it's auto-deleted + return make_ready_future<>(); + }); } void drop_expired() { auto now = loading_cache_clock_type::now(); - _lru_list.remove_and_dispose_if([now, this] (const ts_value_type& v) { + _lru_list.remove_and_dispose_if([now, this] (const ts_value_lru_entry& lru_entry) { using namespace std::chrono; // An entry should be discarded if it hasn't been reloaded for too long or nobody cares about it anymore + const ts_value_type& v = lru_entry.timestamped_value(); auto since_last_read = now - v.last_read(); auto since_loaded = now - v.loaded(); if (_expiry < since_last_read || _expiry < since_loaded) { - _logger.trace("drop_expired(): {}: dropping the entry: _expiry {}, ms passed since: loaded {} last_read {}", v.key(), _expiry.count(), duration_cast(since_loaded).count(), duration_cast(since_last_read).count()); + _logger.trace("drop_expired(): {}: dropping the entry: _expiry {}, ms passed since: loaded {} last_read {}", lru_entry.key(), _expiry.count(), duration_cast(since_loaded).count(), duration_cast(since_last_read).count()); return true; } return false; - }, [this] (ts_value_type* p) { - erase(_set.iterator_to(*p)); + }, [this] (ts_value_lru_entry* p) { + loading_cache::destroy_ts_value(p); }); } // Shrink the cache to the _max_size discarding the least recently used items void shrink() { - if (_set.size() > _max_size) { - auto num_items_to_erase = _set.size() - _max_size; + if (_loading_values.size() > _max_size) { + auto num_items_to_erase = _loading_values.size() - _max_size; for (size_t i = 0; i < num_items_to_erase; ++i) { using namespace std::chrono; - ts_value_type& ts_val = *_lru_list.rbegin(); - _logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", ts_val.key(), duration_cast(loading_cache_clock_type::now() - ts_val.last_read()).count()); - erase(_set.iterator_to(ts_val)); + auto it = _lru_list.rbegin(); + // This may happen if there are pending insertions into the loading_shared_values that hasn't been yet finalized. + // In this case the number of elements in the _lru_list will be less than the _loading_values.size(). + if (_lru_list.rbegin() == _lru_list.rend()) { + return; + } + ts_value_lru_entry& lru_entry = *it; + _logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count()); + loading_cache::destroy_ts_value(&lru_entry); } } } - void rehash() { - size_t new_buckets_count = 0; - - // Don't grow or shrink too fast even if there is a steep drop/growth in the number of elements in the set. - // Exponential growth/backoff should be good enough. - // - // Try to keep the load factor between 0.25 and 1.0. - if (_set.size() < _current_buckets_count / 4) { - new_buckets_count = _current_buckets_count / 4; - } else if (_set.size() > _current_buckets_count) { - new_buckets_count = _current_buckets_count * 2; + // Try to bring the load factors of the _loading_values into a known range. + void periodic_rehash() noexcept { + try { + _loading_values.rehash(); + } catch (...) { + // if rehashing fails - continue with the current buckets array } - - if (new_buckets_count < initial_num_buckets || new_buckets_count > max_num_buckets) { - return; - } - - std::vector new_buckets(new_buckets_count); - _set.rehash(bi_set_bucket_traits(new_buckets.data(), new_buckets.size())); - _logger.trace("rehash(): buckets count changed: {} -> {}", _current_buckets_count, new_buckets_count); - - _buckets.swap(new_buckets); - _current_buckets_count = new_buckets_count; } void on_timer() { _logger.trace("on_timer(): start"); - auto timer_start_tp = loading_cache_clock_type::now(); - - // Clear all cached mutexes - _write_mutex_map.clear(); - // Clean up items that were not touched for the whole _expiry period. drop_expired(); @@ -358,30 +331,27 @@ private: shrink(); // check if rehashing is needed and do it if it is. - rehash(); + periodic_rehash(); // Reload all those which vlaue needs to be reloaded. - with_gate(_timer_reads_gate, [this, timer_start_tp] { - return parallel_for_each(_set.begin(), _set.end(), [this, curr_time = timer_start_tp] (auto& ts_val) { - _logger.trace("on_timer(): {}: checking the value age", ts_val.key()); - if (ts_val && ts_val.loaded() + _refresh < curr_time) { - _logger.trace("on_timer(): {}: reloading the value", ts_val.key()); - return this->reload(ts_val); + with_gate(_timer_reads_gate, [this] { + return parallel_for_each(_lru_list.begin(), _lru_list.end(), [this] (ts_value_lru_entry& lru_entry) { + _logger.trace("on_timer(): {}: checking the value age", lru_entry.key()); + if (lru_entry.timestamped_value().loaded() + _refresh < loading_cache_clock_type::now()) { + _logger.trace("on_timer(): {}: reloading the value", lru_entry.key()); + return this->reload(lru_entry); } return now(); - }).finally([this, timer_start_tp] { + }).finally([this] { _logger.trace("on_timer(): rearming"); - _timer.arm(timer_start_tp + _timer_period); + _timer.arm(loading_cache_clock_type::now() + _timer_period); }); }); } - std::vector _buckets; - size_t _current_buckets_count = initial_num_buckets; - set_type _set; - write_mutex_map_type _write_mutex_map; + loading_values_type _loading_values; lru_list_type _lru_list; - size_t _max_size; + size_t _max_size = 0; std::chrono::milliseconds _expiry; std::chrono::milliseconds _refresh; loading_cache_clock_type::duration _timer_period; From c24d85f632cf1365666526b86612269a97181aec Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 11 Sep 2017 14:47:44 -0400 Subject: [PATCH 04/12] utils::loading_cache: add EntrySize template parameter Allow a variable entry size parameter. Provide an EntrySize functor that would return a size for a specific entry. Signed-off-by: Vlad Zolotarov --- auth/auth.cc | 2 +- utils/loading_cache.hh | 100 ++++++++++++++++++++++++++++++++--------- 2 files changed, 79 insertions(+), 23 deletions(-) diff --git a/auth/auth.cc b/auth/auth.cc index c3471d45d8..d72dc6116c 100644 --- a/auth/auth.cc +++ b/auth/auth.cc @@ -114,7 +114,7 @@ struct hash { class auth::auth::permissions_cache { public: - typedef utils::loading_cache, permission_set, utils::tuple_hash> cache_type; + typedef utils::loading_cache, permission_set, utils::simple_entry_size, utils::tuple_hash> cache_type; typedef typename cache_type::key_type key_type; permissions_cache() diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index 0134d2eb49..ed20544a08 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -40,7 +40,7 @@ namespace utils { using loading_cache_clock_type = seastar::lowres_clock; using auto_unlink_list_hook = bi::list_base_hook>; -template +template class timestamped_val { public: using value_type = Tp; @@ -52,12 +52,14 @@ private: loading_cache_clock_type::time_point _loaded; loading_cache_clock_type::time_point _last_read; lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back + size_t _size = 0; public: timestamped_val(value_type val) : _value(std::move(val)) , _loaded(loading_cache_clock_type::now()) , _last_read(_loaded) + , _size(EntrySize()(_value)) {} timestamped_val(timestamped_val&&) = default; @@ -66,6 +68,9 @@ public: _value = std::move(new_val); _loaded = loading_cache_clock_type::now(); + _lru_entry_ptr->cache_size() -= _size; + _size = EntrySize()(_value); + _lru_entry_ptr->cache_size() += _size; return *this; } @@ -82,6 +87,10 @@ public: return _loaded; } + size_t size() const { + return _size; + } + bool ready() const noexcept { return _lru_entry_ptr; } @@ -98,11 +107,18 @@ private: } }; +template +struct simple_entry_size { + size_t operator()(const Tp& val) { + return 1; + } +}; + /// \brief This is and LRU list entry which is also an anchor for a loading_cache value. -template -class timestamped_val::lru_entry : public auto_unlink_list_hook { +template +class timestamped_val::lru_entry : public auto_unlink_list_hook { private: - using ts_value_type = timestamped_val; + using ts_value_type = timestamped_val; using loading_values_type = typename ts_value_type::loading_values_type; public: @@ -112,19 +128,27 @@ public: private: timestamped_val_ptr _ts_val_ptr; lru_list_type& _lru_list; + size_t& _cache_size; public: - lru_entry(timestamped_val_ptr ts_val, lru_list_type& lru_list) + lru_entry(timestamped_val_ptr ts_val, lru_list_type& lru_list, size_t& cache_size) : _ts_val_ptr(std::move(ts_val)) , _lru_list(lru_list) + , _cache_size(cache_size) { _ts_val_ptr->set_anchor_back_reference(this); + _cache_size += _ts_val_ptr->size(); } ~lru_entry() { + _cache_size -= _ts_val_ptr->size(); _ts_val_ptr->set_anchor_back_reference(nullptr); } + size_t& cache_size() noexcept { + return _cache_size; + } + /// Set this item as the most recently used item. /// The MRU item is going to be at the front of the _lru_list, the LRU item - at the back. void touch() noexcept { @@ -143,15 +167,50 @@ public: enum class loading_cache_reload_enabled { no, yes }; +/// \brief Loading cache is a cache that loads the value into the cache using the given asynchronous callback. +/// +/// Each cached value is reloaded after the "refresh" time period since it was loaded for the last time. +/// +/// The values are going to be evicted from the cache if they are not accessed during the "expiration" period or haven't +/// been reloaded even once during the same period. +/// +/// If "expiration" is set to zero - the caching is going to be disabled and get_XXX(...) is going to call the "loader" callback +/// every time in order to get the requested value. +/// +/// \note In order to avoid the eviction of cached entries due to "aging" of the contained value the user has to choose +/// the "expiration" to be at least ("refresh" + "load latency"). This way the value is going to stay in the cache and is going to be +/// read in a non-blocking way as long as it's frequently accessed. +/// +/// The cache is also limited in size and if adding the next value is going +/// to exceed the cache size limit the least recently used value(s) is(are) going to be evicted until the size of the cache +/// becomes such that adding the new value is not going to break the size limit. If the new entry's size is greater than +/// the cache size then the get_XXX(...) method is going to return a future with the loading_cache::entry_is_too_big exception. +/// +/// The size of the cache is defined as a sum of sizes of all cached entries. +/// The size of each entry is defined by the value returned by the \tparam EntrySize predicate applied on it. +/// +/// The get(key) or get_ptr(key) methods ensures that the "loader" callback is called only once for each cached entry regardless of how many +/// callers are calling for the get_XXX(key) for the same "key" at the same time. Only after the value is evicted from the cache +/// it's going to be "loaded" in the context of get_XXX(key). As long as the value is cached get_XXX(key) is going to return the +/// cached value immediately and reload it in the background every "refresh" time period as described above. +/// +/// \tparam Key type of the cache key +/// \tparam Tp type of the cached value +/// \tparam EntrySize predicate to calculate the entry size +/// \tparam Hash hash function +/// \tparam EqualPred equality predicate +/// \tparam LoadingSharedValuesStats statistics incrementing class (see utils::loading_shared_values) +/// \tparam Alloc elements allocator template, typename Hash = std::hash, typename EqualPred = std::equal_to, typename LoadingSharedValuesStats = utils::do_nothing_loading_shared_values_stats, - typename Alloc = std::allocator::lru_entry>> + typename Alloc = std::allocator::lru_entry>> class loading_cache { private: - using ts_value_type = timestamped_val; + using ts_value_type = timestamped_val; using loading_values_type = typename ts_value_type::loading_values_type; using timestamped_val_ptr = typename loading_values_type::entry_ptr; using ts_value_lru_entry = typename ts_value_type::lru_entry; @@ -162,6 +221,7 @@ public: using value_type = Tp; using key_type = Key; + class entry_is_too_big : public std::exception {}; template loading_cache(size_t max_size, std::chrono::milliseconds expiry, std::chrono::milliseconds refresh, logging::logger& logger, Func&& load) @@ -205,8 +265,12 @@ public: if (!ts_val_ptr->ready()) { _logger.trace("{}: storing the value for the first time", k); + if (ts_val_ptr->size() > _max_size) { + return make_exception_future(entry_is_too_big()); + } + ts_value_lru_entry* new_lru_entry = Alloc().allocate(1); - new(new_lru_entry) ts_value_lru_entry(std::move(ts_val_ptr), _lru_list); + new(new_lru_entry) ts_value_lru_entry(std::move(ts_val_ptr), _lru_list, _current_size); // This will "touch" the entry and add it to the LRU list. return make_ready_future(new_lru_entry->timestamped_value_ptr()->value()); @@ -295,20 +359,11 @@ private: // Shrink the cache to the _max_size discarding the least recently used items void shrink() { - if (_loading_values.size() > _max_size) { - auto num_items_to_erase = _loading_values.size() - _max_size; - for (size_t i = 0; i < num_items_to_erase; ++i) { - using namespace std::chrono; - auto it = _lru_list.rbegin(); - // This may happen if there are pending insertions into the loading_shared_values that hasn't been yet finalized. - // In this case the number of elements in the _lru_list will be less than the _loading_values.size(). - if (_lru_list.rbegin() == _lru_list.rend()) { - return; - } - ts_value_lru_entry& lru_entry = *it; - _logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count()); - loading_cache::destroy_ts_value(&lru_entry); - } + while (_current_size > _max_size) { + using namespace std::chrono; + ts_value_lru_entry& lru_entry = *_lru_list.rbegin(); + _logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count()); + loading_cache::destroy_ts_value(&lru_entry); } } @@ -351,6 +406,7 @@ private: loading_values_type _loading_values; lru_list_type _lru_list; + size_t _current_size = 0; size_t _max_size = 0; std::chrono::milliseconds _expiry; std::chrono::milliseconds _refresh; From a60a77dfc827df82fe11a6ac8dda3d619c021b9f Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 11 Sep 2017 15:08:32 -0400 Subject: [PATCH 05/12] utils::loading_cache: add the ability to work with not-copy-constructable values Current get(...) interface restricts the cache to work only with copy-constructable values (it returns future). To make it able to work with non-copyable value we need to introduce an interface that would return something like a reference to the cached value (like regular containers do). We can't return future since the caller would have to ensure somehow that the underlying value is still alive. The much more safe and easy-to-use way would be to return a shared_ptr-like pointer to that value. "Luckily" to us we value we actually store in a cache is already wrapped into the lw_shared_ptr and we may simply return an object that impersonates itself as a smart_pointer value while it keeps a "reference" to an object stored in the cache. Signed-off-by: Vlad Zolotarov --- utils/loading_cache.hh | 59 ++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index ed20544a08..e826de9fe2 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -46,6 +46,7 @@ public: using value_type = Tp; using loading_values_type = typename utils::loading_shared_values; class lru_entry; + class value_ptr; private: value_type _value; @@ -74,10 +75,8 @@ public: return *this; } - value_type& value() noexcept { - touch(); - return _value; - } + value_type& value() noexcept { return _value; } + const value_type& value() const noexcept { return _value; } loading_cache_clock_type::time_point last_read() const noexcept { return _last_read; @@ -114,6 +113,26 @@ struct simple_entry_size { } }; +template +class timestamped_val::value_ptr { +private: + using ts_value_type = timestamped_val; + using loading_values_type = typename ts_value_type::loading_values_type; + +public: + using timestamped_val_ptr = typename loading_values_type::entry_ptr; + using value_type = Tp; + +private: + timestamped_val_ptr _ts_val_ptr; + +public: + value_ptr(timestamped_val_ptr ts_val_ptr) : _ts_val_ptr(std::move(ts_val_ptr)) { _ts_val_ptr->touch(); } + explicit operator bool() const noexcept { return bool(_ts_val_ptr); } + value_type& operator*() const noexcept { return _ts_val_ptr->value(); } + value_type* operator->() const noexcept { return &_ts_val_ptr->value(); } +}; + /// \brief This is and LRU list entry which is also an anchor for a loading_cache value. template class timestamped_val::lru_entry : public auto_unlink_list_hook { @@ -220,6 +239,7 @@ private: public: using value_type = Tp; using key_type = Key; + using value_ptr = typename ts_value_type::value_ptr; class entry_is_too_big : public std::exception {}; @@ -250,11 +270,9 @@ public: _lru_list.erase_and_dispose(_lru_list.begin(), _lru_list.end(), [] (ts_value_lru_entry* ptr) { loading_cache::destroy_ts_value(ptr); }); } - future get(const Key& k) { - // If caching is disabled - always load in the foreground - if (!caching_enabled()) { - return _load(k); - } + future get_ptr(const Key& k) { + // We shouldn't be here if caching is disabled + assert(caching_enabled()); return _loading_values.get_or_load(k, [this] (const Key& k) { return _load(k).then([this] (value_type val) { @@ -266,17 +284,32 @@ public: _logger.trace("{}: storing the value for the first time", k); if (ts_val_ptr->size() > _max_size) { - return make_exception_future(entry_is_too_big()); + return make_exception_future(entry_is_too_big()); } ts_value_lru_entry* new_lru_entry = Alloc().allocate(1); new(new_lru_entry) ts_value_lru_entry(std::move(ts_val_ptr), _lru_list, _current_size); - // This will "touch" the entry and add it to the LRU list. - return make_ready_future(new_lru_entry->timestamped_value_ptr()->value()); + // This will "touch" the entry and add it to the LRU list + value_ptr vp(new_lru_entry->timestamped_value_ptr()); + + return make_ready_future(std::move(vp)); } - return make_ready_future(ts_val_ptr->value()); + return make_ready_future(std::move(ts_val_ptr)); + }); + } + + future get(const Key& k) { + // If caching is disabled - always load in the foreground + if (!caching_enabled()) { + return _load(k).then([] (Tp val) { + return make_ready_future(std::move(val)); + }); + } + + return get_ptr(k).then([] (value_ptr v_ptr) { + return make_ready_future(*v_ptr); }); } From fa2f8162a5c2ab3debab991587cebd72373acdd8 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 11 Sep 2017 15:26:14 -0400 Subject: [PATCH 06/12] utils::loading_cache: add the ability to create a cache that would not reload the values Sometimes we don't want the cached values to be periodically reloaded. This patch adds the ability to control this using a ReloadEnabled template parameter. In case the reloading is not needed the "loading" function is not given to the constructor but rather to the get_ptr(key, loader) method (currently it's the only method that is used, we may add the corresponding get(key, loader) method in the future when needed). Signed-off-by: Vlad Zolotarov --- auth/auth.cc | 2 +- utils/loading_cache.hh | 76 ++++++++++++++++++++++++++++++++---------- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/auth/auth.cc b/auth/auth.cc index d72dc6116c..50900b0e0d 100644 --- a/auth/auth.cc +++ b/auth/auth.cc @@ -114,7 +114,7 @@ struct hash { class auth::auth::permissions_cache { public: - typedef utils::loading_cache, permission_set, utils::simple_entry_size, utils::tuple_hash> cache_type; + typedef utils::loading_cache, permission_set, utils::loading_cache_reload_enabled::yes, utils::simple_entry_size, utils::tuple_hash> cache_type; typedef typename cache_type::key_type key_type; permissions_cache() diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index e826de9fe2..4c7d34aed2 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -188,7 +188,8 @@ enum class loading_cache_reload_enabled { no, yes }; /// \brief Loading cache is a cache that loads the value into the cache using the given asynchronous callback. /// -/// Each cached value is reloaded after the "refresh" time period since it was loaded for the last time. +/// Each cached value if reloading is enabled (\tparam ReloadEnabled == loading_cache_reload_enabled::yes) is reloaded after +/// the "refresh" time period since it was loaded for the last time. /// /// The values are going to be evicted from the cache if they are not accessed during the "expiration" period or haven't /// been reloaded even once during the same period. @@ -197,8 +198,10 @@ enum class loading_cache_reload_enabled { no, yes }; /// every time in order to get the requested value. /// /// \note In order to avoid the eviction of cached entries due to "aging" of the contained value the user has to choose -/// the "expiration" to be at least ("refresh" + "load latency"). This way the value is going to stay in the cache and is going to be -/// read in a non-blocking way as long as it's frequently accessed. +/// the "expiration" to be at least ("refresh" + "max load latency"). This way the value is going to stay in the cache and is going to be +/// read in a non-blocking way as long as it's frequently accessed. Note however that since reloading is an asynchronous +/// procedure it may get delayed by other running task. Therefore choosing the "expiration" too close to the ("refresh" + "max load latency") +/// value one risks to have his/her cache values evicted when the system is heavily loaded. /// /// The cache is also limited in size and if adding the next value is going /// to exceed the cache size limit the least recently used value(s) is(are) going to be evicted until the size of the cache @@ -215,6 +218,7 @@ enum class loading_cache_reload_enabled { no, yes }; /// /// \tparam Key type of the cache key /// \tparam Tp type of the cached value +/// \tparam ReloadEnabled if loading_cache_reload_enabled::yes allow reloading the values otherwise don't reload /// \tparam EntrySize predicate to calculate the entry size /// \tparam Hash hash function /// \tparam EqualPred equality predicate @@ -222,6 +226,7 @@ enum class loading_cache_reload_enabled { no, yes }; /// \tparam Alloc elements allocator template, typename Hash = std::hash, typename EqualPred = std::equal_to, @@ -243,26 +248,49 @@ public: class entry_is_too_big : public std::exception {}; +private: + loading_cache(size_t max_size, std::chrono::milliseconds expiry, std::chrono::milliseconds refresh, logging::logger& logger) + : _max_size(max_size) + , _expiry(expiry) + , _refresh(refresh) + , _logger(logger) + , _timer([this] { on_timer(); }) + { + // Sanity check: if expiration period is given then non-zero refresh period and maximal size are required + if (caching_enabled() && (_refresh == std::chrono::milliseconds(0) || _max_size == 0)) { + throw exceptions::configuration_exception("loading_cache: caching is enabled but refresh period and/or max_size are zero"); + } + } + +public: template loading_cache(size_t max_size, std::chrono::milliseconds expiry, std::chrono::milliseconds refresh, logging::logger& logger, Func&& load) - : _max_size(max_size) - , _expiry(expiry) - , _refresh(refresh) - , _logger(logger) - , _load(std::forward(load)) { + : loading_cache(max_size, expiry, refresh, logger) + { + static_assert(ReloadEnabled == loading_cache_reload_enabled::yes, "This constructor should only be invoked when ReloadEnabled == loading_cache_reload_enabled::yes"); + + _load = std::forward(load); // If expiration period is zero - caching is disabled if (!caching_enabled()) { return; } - // Sanity check: if expiration period is given then non-zero refresh period and maximal size are required - if (_refresh == std::chrono::milliseconds(0) || _max_size == 0) { - throw exceptions::configuration_exception("loading_cache: caching is enabled but refresh period and/or max_size are zero"); + _timer_period = std::min(_expiry, _refresh); + _timer.arm(_timer_period); + } + + loading_cache(size_t max_size, std::chrono::milliseconds expiry, logging::logger& logger) + : loading_cache(max_size, expiry, loading_cache_clock_type::time_point::max().time_since_epoch(), logger) + { + static_assert(ReloadEnabled == loading_cache_reload_enabled::no, "This constructor should only be invoked when ReloadEnabled == loading_cache_reload_enabled::no"); + + // If expiration period is zero - caching is disabled + if (!caching_enabled()) { + return; } - _timer_period = std::min(_expiry, _refresh); - _timer.set_callback([this] { on_timer(); }); + _timer_period = _expiry; _timer.arm(_timer_period); } @@ -270,12 +298,13 @@ public: _lru_list.erase_and_dispose(_lru_list.begin(), _lru_list.end(), [] (ts_value_lru_entry* ptr) { loading_cache::destroy_ts_value(ptr); }); } - future get_ptr(const Key& k) { + template + future get_ptr(const Key& k, LoadFunc&& load) { // We shouldn't be here if caching is disabled assert(caching_enabled()); - return _loading_values.get_or_load(k, [this] (const Key& k) { - return _load(k).then([this] (value_type val) { + return _loading_values.get_or_load(k, [this, load = std::forward(load)] (const Key& k) mutable { + return load(k).then([this] (value_type val) { return ts_value_type(std::move(val)); }); }).then([this, k] (timestamped_val_ptr ts_val_ptr) { @@ -300,7 +329,14 @@ public: }); } + future get_ptr(const Key& k) { + static_assert(ReloadEnabled == loading_cache_reload_enabled::yes); + return get_ptr(k, _load); + } + future get(const Key& k) { + static_assert(ReloadEnabled == loading_cache_reload_enabled::yes); + // If caching is disabled - always load in the foreground if (!caching_enabled()) { return _load(k).then([] (Tp val) { @@ -380,7 +416,7 @@ private: const ts_value_type& v = lru_entry.timestamped_value(); auto since_last_read = now - v.last_read(); auto since_loaded = now - v.loaded(); - if (_expiry < since_last_read || _expiry < since_loaded) { + if (_expiry < since_last_read || (ReloadEnabled == loading_cache_reload_enabled::yes && _expiry < since_loaded)) { _logger.trace("drop_expired(): {}: dropping the entry: _expiry {}, ms passed since: loaded {} last_read {}", lru_entry.key(), _expiry.count(), duration_cast(since_loaded).count(), duration_cast(since_last_read).count()); return true; } @@ -421,6 +457,12 @@ private: // check if rehashing is needed and do it if it is. periodic_rehash(); + if (ReloadEnabled == loading_cache_reload_enabled::no) { + _logger.trace("on_timer(): rearming"); + _timer.arm(loading_cache_clock_type::now() + _timer_period); + return; + } + // Reload all those which vlaue needs to be reloaded. with_gate(_timer_reads_gate, [this] { return parallel_for_each(_lru_list.begin(), _lru_list.end(), [this] (ts_value_lru_entry& lru_entry) { From a13362e74b406cb67678dc6f5c520980d5c28d29 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 11 Sep 2017 15:29:21 -0400 Subject: [PATCH 07/12] utils::loading_cache: add a bunch of standard synchronous methods Add a few standard synchronous methods to the cache, e.g. find(), remove_if(), etc. Signed-off-by: Vlad Zolotarov --- utils/loading_cache.hh | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index 4c7d34aed2..33ea820b70 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -240,6 +240,7 @@ private: using ts_value_lru_entry = typename ts_value_type::lru_entry; using set_iterator = typename loading_values_type::iterator; using lru_list_type = typename ts_value_lru_entry::lru_list_type; + using value_extractor_fn = std::function; public: using value_type = Tp; @@ -247,6 +248,7 @@ public: using value_ptr = typename ts_value_type::value_ptr; class entry_is_too_big : public std::exception {}; + using iterator = boost::transform_iterator; private: loading_cache(size_t max_size, std::chrono::milliseconds expiry, std::chrono::milliseconds refresh, logging::logger& logger) @@ -255,6 +257,7 @@ private: , _refresh(refresh) , _logger(logger) , _timer([this] { on_timer(); }) + , _value_extractor_fn([] (ts_value_type& v) -> value_type& { return v.value(); }) { // Sanity check: if expiration period is given then non-zero refresh period and maximal size are required if (caching_enabled() && (_refresh == std::chrono::milliseconds(0) || _max_size == 0)) { @@ -353,6 +356,38 @@ public: return _timer_reads_gate.close().finally([this] { _timer.cancel(); }); } + iterator find(const Key& k) noexcept { + return boost::make_transform_iterator(set_find(k), _value_extractor_fn); + } + + iterator end() { + return boost::make_transform_iterator(_loading_values.end(), _value_extractor_fn); + } + + iterator begin() { + return boost::make_transform_iterator(_loading_values.begin(), _value_extractor_fn); + } + + template + void remove_if(Pred&& pred) { + static_assert(std::is_same>::value, "Bad Pred signature"); + + _lru_list.remove_and_dispose_if([this, &pred] (const ts_value_lru_entry& v) { + return pred(v.timestamped_value().value()); + }, [this] (ts_value_lru_entry* p) { + loading_cache::destroy_ts_value(p); + }); + } + + size_t size() const { + return _loading_values.size(); + } + + /// \brief returns the memory size the currently cached entries occupy according to the EntrySize predicate. + size_t memory_footprint() const { + return _current_size; + } + private: set_iterator set_find(const Key& k) noexcept { set_iterator it = _loading_values.find(k); @@ -490,6 +525,7 @@ private: std::function(const Key&)> _load; timer _timer; seastar::gate _timer_reads_gate; + value_extractor_fn _value_extractor_fn; }; } From 4e72a56310f9b5cf3b234ee2846116b426464dcf Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 11 Sep 2017 15:30:56 -0400 Subject: [PATCH 08/12] utils::loading_cache: added static_asserts for checking the callbacks signatures Signed-off-by: Vlad Zolotarov --- utils/loading_cache.hh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index 33ea820b70..33b31bce7a 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -271,6 +271,7 @@ public: : loading_cache(max_size, expiry, refresh, logger) { static_assert(ReloadEnabled == loading_cache_reload_enabled::yes, "This constructor should only be invoked when ReloadEnabled == loading_cache_reload_enabled::yes"); + static_assert(std::is_same, std::result_of_t>::value, "Bad Func signature"); _load = std::forward(load); @@ -303,6 +304,7 @@ public: template future get_ptr(const Key& k, LoadFunc&& load) { + static_assert(std::is_same, std::result_of_t>::value, "Bad LoadFunc signature"); // We shouldn't be here if caching is disabled assert(caching_enabled()); From 9a43398d6aef03a260cc1d1082860ba207db2842 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 11 Sep 2017 15:33:34 -0400 Subject: [PATCH 09/12] utils::loading_cache: make the size limitation more strict Ensure that the size of the cache is never bigger than the "max_size". Before this patch the size of the cache could have been indefinitely bigger than the requested value during the refresh time period which is clearly an undesirable behaviour. Signed-off-by: Vlad Zolotarov --- utils/loading_cache.hh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/utils/loading_cache.hh b/utils/loading_cache.hh index 33b31bce7a..014a0a1741 100644 --- a/utils/loading_cache.hh +++ b/utils/loading_cache.hh @@ -324,9 +324,12 @@ public: ts_value_lru_entry* new_lru_entry = Alloc().allocate(1); new(new_lru_entry) ts_value_lru_entry(std::move(ts_val_ptr), _lru_list, _current_size); - // This will "touch" the entry and add it to the LRU list + // This will "touch" the entry and add it to the LRU list - we must do this before the shrink() call. value_ptr vp(new_lru_entry->timestamped_value_ptr()); + // Remove the least recently used items if map is too big. + shrink(); + return make_ready_future(std::move(vp)); } @@ -488,9 +491,6 @@ private: // Clean up items that were not touched for the whole _expiry period. drop_expired(); - // Remove the least recently used items if map is too big. - shrink(); - // check if rehashing is needed and do it if it is. periodic_rehash(); From 8f912b46b1dbd1af012547ed95c00d30e4e8f9ad Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Wed, 9 Aug 2017 15:43:07 -0400 Subject: [PATCH 10/12] cql3: prepared statements cache on top of loading_cache This is a template class that implements caching of prepared statements for a given ID type: - Each cache instance is given 1/256 of the total shard memory. If the new entry is going to overflow this memory limit - the less recently used entries are going to be evicted so that the new entry could be added. - The memory consumption of a single prepared statement is defined by a cql3::prepared_cache_entry_size functor class that returns a number of bytes for a given prepared statement (currently returns 10000 bytes for any statement). - The cache entry is going to be evicted if not used for 60 minutes or more. Signed-off-by: Vlad Zolotarov --- cql3/prepared_statements_cache.hh | 168 ++++++++++++++++++++++++++++++ cql3/query_processor.cc | 3 + 2 files changed, 171 insertions(+) create mode 100644 cql3/prepared_statements_cache.hh diff --git a/cql3/prepared_statements_cache.hh b/cql3/prepared_statements_cache.hh new file mode 100644 index 0000000000..5177db8706 --- /dev/null +++ b/cql3/prepared_statements_cache.hh @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2017 ScyllaDB + * + * Modified by ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "utils/loading_cache.hh" +#include "cql3/statements/prepared_statement.hh" + +namespace cql3 { + +using prepared_cache_entry = std::unique_ptr; + +struct prepared_cache_entry_size { + size_t operator()(const prepared_cache_entry& val) { + // TODO: improve the size approximation + return 10000; + } +}; + +typedef bytes cql_prepared_id_type; +typedef int32_t thrift_prepared_id_type; + +/// \brief The key of the prepared statements cache +/// +/// We are going to store the CQL and Thrift prepared statements in the same cache therefore we need generate the key +/// that is going to be unique in both cases. Thrift use int32_t as a prepared statement ID, CQL - MD5 digest. +/// +/// We are going to use an std::pair as a key. For CQL statements we will use {CQL_PREP_ID, std::numeric_limits::max()} as a key +/// and for Thrift - {CQL_PREP_ID_TYPE(0), THRIFT_PREP_ID}. This way CQL and Thrift keys' values will never collide. +class prepared_cache_key_type { +public: + using cache_key_type = std::pair; + +private: + cache_key_type _key; + +public: + prepared_cache_key_type() = default; + explicit prepared_cache_key_type(cql_prepared_id_type cql_id) : _key(std::move(cql_id), std::numeric_limits::max()) {} + explicit prepared_cache_key_type(thrift_prepared_id_type thrift_id) : _key(cql_prepared_id_type(), thrift_id) {} + + cache_key_type& key() { return _key; } + const cache_key_type& key() const { return _key; } + + static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) { + return key.key().first; + } + static thrift_prepared_id_type thrift_id(const prepared_cache_key_type& key) { + return key.key().second; + } +}; + +class prepared_statements_cache { +public: + struct stats { + uint64_t prepared_cache_evictions = 0; + }; + + static stats& shard_stats() { + static thread_local stats _stats; + return _stats; + } + + struct prepared_cache_stats_updater { + static void inc_hits() noexcept {} + static void inc_misses() noexcept {} + static void inc_blocks() noexcept {} + static void inc_evictions() noexcept { + ++shard_stats().prepared_cache_evictions; + } + }; + +private: + using cache_key_type = typename prepared_cache_key_type::cache_key_type; + using cache_type = utils::loading_cache, prepared_cache_stats_updater>; + using cache_value_ptr = typename cache_type::value_ptr; + using cache_iterator = typename cache_type::iterator; + using checked_weak_ptr = typename statements::prepared_statement::checked_weak_ptr; + using value_extractor_fn = std::function; + + static const std::chrono::minutes entry_expiry; + +public: + using key_type = prepared_cache_key_type; + using value_type = checked_weak_ptr; + using statement_is_too_big = typename cache_type::entry_is_too_big; + /// \note both iterator::reference and iterator::value_type are checked_weak_ptr + using iterator = boost::transform_iterator; + +private: + cache_type _cache; + value_extractor_fn _value_extractor_fn; + +public: + prepared_statements_cache(logging::logger& logger) + : _cache(memory::stats().total_memory() / 256, entry_expiry, logger) + , _value_extractor_fn([] (prepared_cache_entry& e) -> value_type { return e->checked_weak_from_this(); }) + {} + + template + future get(const key_type& key, LoadFunc&& load) { + return _cache.get_ptr(key.key(), [load = std::forward(load)] (const cache_key_type&) { return load(); }).then([] (cache_value_ptr v_ptr) { + return make_ready_future((*v_ptr)->checked_weak_from_this()); + }); + } + + iterator find(const key_type& key) { + return boost::make_transform_iterator(_cache.find(key.key()), _value_extractor_fn); + } + + iterator end() { + return boost::make_transform_iterator(_cache.end(), _value_extractor_fn); + } + + iterator begin() { + return boost::make_transform_iterator(_cache.begin(), _value_extractor_fn); + } + + template + void remove_if(Pred&& pred) { + static_assert(std::is_same)>>::value, "Bad Pred signature"); + + _cache.remove_if([&pred] (const prepared_cache_entry& e) { + return pred(e->statement); + }); + } + + size_t size() const { + return _cache.size(); + } + + size_t memory_footprint() const { + return _cache.memory_footprint(); + } +}; +} + +namespace std { // for prepared_statements_cache log printouts +inline std::ostream& operator<<(std::ostream& os, const typename cql3::prepared_cache_key_type::cache_key_type& p) { + os << "{cql_id: " << p.first << ", thrift_id: " << p.second << "}"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const cql3::prepared_cache_key_type& p) { + os << p.key(); + return os; +} +} diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 72868736f7..cd89813e84 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -50,6 +50,7 @@ #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1 #include +#include "cql3/prepared_statements_cache.hh" namespace cql3 { @@ -62,6 +63,8 @@ distributed _the_query_processor; const sstring query_processor::CQL_VERSION = "3.3.1"; +const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono::minutes(60); + class query_processor::internal_state { service::query_state _qs; public: From 66568be96960e870cde4c1f3b128c296aa7e0e63 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Thu, 10 Aug 2017 22:02:53 -0400 Subject: [PATCH 11/12] cql3::query_processor: implement CQL and Thrift prepared statements caches using cql3::prepared_statements_cache - Transition the prepared statements caches for both CQL and Trhift to the cql3::prepared_statements_cache class. - Add the corresponding metrics to the query_processor: - Evictions count. - Current entries count. - Current memory footprint. Fixes #2474 Signed-off-by: Vlad Zolotarov --- cql3/query_processor.cc | 93 +++++++++------------------- cql3/query_processor.hh | 118 +++++++++++++++++++++++------------- tests/cql_test_env.cc | 4 +- tests/cql_test_env.hh | 7 ++- tests/schema_change_test.cc | 2 +- thrift/handler.cc | 2 +- transport/server.cc | 18 +++--- 7 files changed, 121 insertions(+), 123 deletions(-) diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index cd89813e84..834dbe5628 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -50,7 +50,6 @@ #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1 #include -#include "cql3/prepared_statements_cache.hh" namespace cql3 { @@ -58,6 +57,7 @@ using namespace statements; using namespace cql_transport::messages; logging::logger log("query_processor"); +logging::logger prep_cache_log("prepared_statements_cache"); distributed _the_query_processor; @@ -98,6 +98,7 @@ query_processor::query_processor(distributed& proxy, , _proxy(proxy) , _db(db) , _internal_state(new internal_state()) + , _prepared_cache(prep_cache_log) { namespace sm = seastar::metrics; @@ -133,6 +134,15 @@ query_processor::query_processor(distributed& proxy, sm::make_derive("batches_unlogged_from_logged", _cql_stats.batches_unlogged_from_logged, sm::description("Counts a total number of LOGGED batches that were executed as UNLOGGED batches.")), + + sm::make_derive("prepared_cache_evictions", [] { return prepared_statements_cache::shard_stats().prepared_cache_evictions; }, + sm::description("Counts a number of prepared statements cache entries evictions.")), + + sm::make_gauge("prepared_cache_size", [this] { return _prepared_cache.size(); }, + sm::description("A number of entries in the prepared statements cache.")), + + sm::make_gauge("prepared_cache_memory_footprint", [this] { return _prepared_cache.memory_footprint(); }, + sm::description("Size (in bytes) of the prepared statements cache.")), }); service::get_local_migration_manager().register_listener(_migration_subscriber.get()); @@ -200,31 +210,21 @@ query_processor::process_statement(::shared_ptr statement, } future<::shared_ptr> -query_processor::prepare(const std::experimental::string_view& query_string, service::query_state& query_state) +query_processor::prepare(sstring query_string, service::query_state& query_state) { auto& client_state = query_state.get_client_state(); - return prepare(query_string, client_state, client_state.is_thrift()); + return prepare(std::move(query_string), client_state, client_state.is_thrift()); } future<::shared_ptr> -query_processor::prepare(const std::experimental::string_view& query_string, - const service::client_state& client_state, - bool for_thrift) +query_processor::prepare(sstring query_string, const service::client_state& client_state, bool for_thrift) { - auto existing = get_stored_prepared_statement(query_string, client_state.get_raw_keyspace(), for_thrift); - if (existing) { - return make_ready_future<::shared_ptr>(existing); + using namespace cql_transport::messages; + if (for_thrift) { + return prepare_one(std::move(query_string), client_state, compute_thrift_id, prepared_cache_key_type::thrift_id); + } else { + return prepare_one(std::move(query_string), client_state, compute_id, prepared_cache_key_type::cql_id); } - - return futurize<::shared_ptr>::apply([this, &query_string, &client_state, for_thrift] { - auto prepared = get_statement(query_string, client_state); - auto bound_terms = prepared->statement->get_bound_terms(); - if (bound_terms > std::numeric_limits::max()) { - throw exceptions::invalid_request_exception(sprint("Too many markers(?). %d markers exceed the allowed maximum of %d", bound_terms, std::numeric_limits::max())); - } - assert(bound_terms == prepared->bound_names.size()); - return store_prepared_statement(query_string, client_state.get_raw_keyspace(), std::move(prepared), for_thrift); - }); } ::shared_ptr @@ -232,50 +232,11 @@ query_processor::get_stored_prepared_statement(const std::experimental::string_v const sstring& keyspace, bool for_thrift) { + using namespace cql_transport::messages; if (for_thrift) { - auto statement_id = compute_thrift_id(query_string, keyspace); - auto it = _thrift_prepared_statements.find(statement_id); - if (it == _thrift_prepared_statements.end()) { - return ::shared_ptr(); - } - return ::make_shared(statement_id, it->second->checked_weak_from_this()); + return get_stored_prepared_statement_one(query_string, keyspace, compute_thrift_id, prepared_cache_key_type::thrift_id); } else { - auto statement_id = compute_id(query_string, keyspace); - auto it = _prepared_statements.find(statement_id); - if (it == _prepared_statements.end()) { - return ::shared_ptr(); - } - return ::make_shared(statement_id, it->second->checked_weak_from_this()); - } -} - -future<::shared_ptr> -query_processor::store_prepared_statement(const std::experimental::string_view& query_string, - const sstring& keyspace, - std::unique_ptr prepared, - bool for_thrift) -{ -#if 0 - // Concatenate the current keyspace so we don't mix prepared statements between keyspace (#5352). - // (if the keyspace is null, queryString has to have a fully-qualified keyspace so it's fine. - long statementSize = measure(prepared.statement); - // don't execute the statement if it's bigger than the allowed threshold - if (statementSize > MAX_CACHE_PREPARED_MEMORY) - throw new InvalidRequestException(String.format("Prepared statement of size %d bytes is larger than allowed maximum of %d bytes.", - statementSize, - MAX_CACHE_PREPARED_MEMORY)); -#endif - prepared->raw_cql_statement = query_string.data(); - if (for_thrift) { - auto statement_id = compute_thrift_id(query_string, keyspace); - auto msg = ::make_shared(statement_id, prepared->checked_weak_from_this()); - _thrift_prepared_statements.emplace(statement_id, std::move(prepared)); - return make_ready_future<::shared_ptr>(std::move(msg)); - } else { - auto statement_id = compute_id(query_string, keyspace); - auto msg = ::make_shared(statement_id, prepared->checked_weak_from_this()); - _prepared_statements.emplace(statement_id, std::move(prepared)); - return make_ready_future<::shared_ptr>(std::move(msg)); + return get_stored_prepared_statement_one(query_string, keyspace, compute_id, prepared_cache_key_type::cql_id); } } @@ -292,19 +253,19 @@ static sstring hash_target(const std::experimental::string_view& query_string, c return keyspace + query_string.to_string(); } -bytes query_processor::compute_id(const std::experimental::string_view& query_string, const sstring& keyspace) +prepared_cache_key_type query_processor::compute_id(const std::experimental::string_view& query_string, const sstring& keyspace) { - return md5_calculate(hash_target(query_string, keyspace)); + return prepared_cache_key_type(md5_calculate(hash_target(query_string, keyspace))); } -int32_t query_processor::compute_thrift_id(const std::experimental::string_view& query_string, const sstring& keyspace) +prepared_cache_key_type query_processor::compute_thrift_id(const std::experimental::string_view& query_string, const sstring& keyspace) { auto target = hash_target(query_string, keyspace); uint32_t h = 0; for (auto&& c : hash_target(query_string, keyspace)) { h = 31*h + c; } - return static_cast(h); + return prepared_cache_key_type(static_cast(h)); } std::unique_ptr @@ -625,7 +586,7 @@ void query_processor::migration_subscriber::on_drop_view(const sstring& ks_name, void query_processor::migration_subscriber::remove_invalid_prepared_statements(sstring ks_name, std::experimental::optional cf_name) { - _qp->invalidate_prepared_statements([&] (::shared_ptr stmt) { + _qp->_prepared_cache.remove_if([&] (::shared_ptr stmt) { return this->should_invalidate(ks_name, cf_name, stmt); }); } diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 1b01eda4eb..88847cf539 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -57,6 +57,7 @@ #include "statements/prepared_statement.hh" #include "transport/messages/result_message.hh" #include "untyped_result_set.hh" +#include "prepared_statements_cache.hh" namespace cql3 { @@ -71,9 +72,32 @@ class batch_statement; */ struct internal_query_state; +class prepared_statement_is_too_big : public std::exception { +public: + static constexpr int max_query_prefix = 100; + +private: + sstring _msg; + +public: + prepared_statement_is_too_big(const sstring& query_string) + : _msg(seastar::format("Prepared statement is too big: {}", query_string.substr(0, max_query_prefix))) + { + // mark that we clipped the query string + if (query_string.size() > max_query_prefix) { + _msg += "..."; + } + } + + virtual const char* what() const noexcept override { + return _msg.c_str(); + } +}; + class query_processor { public: class migration_subscriber; + private: std::unique_ptr _migration_subscriber; distributed& _proxy; @@ -134,9 +158,7 @@ private: } }; #endif - - std::unordered_map> _prepared_statements; - std::unordered_map> _thrift_prepared_statements; + prepared_statements_cache _prepared_cache; std::unordered_map> _internal_statements; #if 0 @@ -228,21 +250,14 @@ private: } #endif public: - statements::prepared_statement::checked_weak_ptr get_prepared(const bytes& id) { - auto it = _prepared_statements.find(id); - if (it == _prepared_statements.end()) { + statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) { + auto it = _prepared_cache.find(key); + if (it == _prepared_cache.end()) { return statements::prepared_statement::checked_weak_ptr(); } - return it->second->checked_weak_from_this(); + return *it; } - statements::prepared_statement::checked_weak_ptr get_prepared_for_thrift(int32_t id) { - auto it = _thrift_prepared_statements.find(id); - if (it == _thrift_prepared_statements.end()) { - return statements::prepared_statement::checked_weak_ptr(); - } - return it->second->checked_weak_from_this(); - } #if 0 public static void validateKey(ByteBuffer key) throws InvalidRequestException { @@ -494,42 +509,61 @@ public: #endif future<::shared_ptr> - prepare(const std::experimental::string_view& query_string, service::query_state& query_state); + prepare(sstring query_string, service::query_state& query_state); future<::shared_ptr> - prepare(const std::experimental::string_view& query_string, const service::client_state& client_state, bool for_thrift); + prepare(sstring query_string, const service::client_state& client_state, bool for_thrift); - static bytes compute_id(const std::experimental::string_view& query_string, const sstring& keyspace); - static int32_t compute_thrift_id(const std::experimental::string_view& query_string, const sstring& keyspace); + static prepared_cache_key_type compute_id(const std::experimental::string_view& query_string, const sstring& keyspace); + static prepared_cache_key_type compute_thrift_id(const std::experimental::string_view& query_string, const sstring& keyspace); private: + /// + /// \tparam ResultMsgType type of the returned result message (CQL or Thrift) + /// \tparam PreparedKeyGenerator a function that generates the prepared statement cache key for given query and keyspace + /// \tparam IdGetter a function that returns the corresponding prepared statement ID (CQL or Thrift) for a given prepared statement cache key + /// \param query_string + /// \param client_state + /// \param id_gen prepared ID generator, called before the first deferring + /// \param id_getter prepared ID getter, passed to deferred context by reference. The caller must ensure its liveness. + /// \return + template + future<::shared_ptr> + prepare_one(sstring query_string, const service::client_state& client_state, PreparedKeyGenerator&& id_gen, IdGetter&& id_getter) { + return do_with(id_gen(query_string, client_state.get_raw_keyspace()), std::move(query_string), [this, &client_state, &id_getter] (const prepared_cache_key_type& key, const sstring& query_string) { + return _prepared_cache.get(key, [this, &query_string, &client_state] { + auto prepared = get_statement(query_string, client_state); + auto bound_terms = prepared->statement->get_bound_terms(); + if (bound_terms > std::numeric_limits::max()) { + throw exceptions::invalid_request_exception(sprint("Too many markers(?). %d markers exceed the allowed maximum of %d", bound_terms, std::numeric_limits::max())); + } + assert(bound_terms == prepared->bound_names.size()); + prepared->raw_cql_statement = query_string; + return make_ready_future>(std::move(prepared)); + }).then([&key, &id_getter] (auto prep_ptr) { + return make_ready_future<::shared_ptr>(::make_shared(id_getter(key), std::move(prep_ptr))); + }).handle_exception_type([&query_string] (typename prepared_statements_cache::statement_is_too_big&) { + return make_exception_future<::shared_ptr>(prepared_statement_is_too_big(query_string)); + }); + }); + }; + + template + ::shared_ptr + get_stored_prepared_statement_one(const std::experimental::string_view& query_string, const sstring& keyspace, KeyGenerator&& key_gen, IdGetter&& id_getter) + { + auto cache_key = key_gen(query_string, keyspace); + auto it = _prepared_cache.find(cache_key); + if (it == _prepared_cache.end()) { + return ::shared_ptr(); + } + + return ::make_shared(id_getter(cache_key), *it); + } + ::shared_ptr get_stored_prepared_statement(const std::experimental::string_view& query_string, const sstring& keyspace, bool for_thrift); - future<::shared_ptr> - store_prepared_statement(const std::experimental::string_view& query_string, const sstring& keyspace, std::unique_ptr prepared, bool for_thrift); - - // Erases the statements for which filter returns true. - template - void invalidate_prepared_statements(Pred filter) { - static_assert(std::is_same)>>::value, - "bad Pred signature"); - for (auto it = _prepared_statements.begin(); it != _prepared_statements.end(); ) { - if (filter(it->second->statement)) { - it = _prepared_statements.erase(it); - } else { - ++it; - } - } - for (auto it = _thrift_prepared_statements.begin(); it != _thrift_prepared_statements.end(); ) { - if (filter(it->second->statement)) { - it = _thrift_prepared_statements.erase(it); - } else { - ++it; - } - } - } - #if 0 public ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 8c75a72b9e..648c4eb81e 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -127,7 +127,7 @@ public: }); } - virtual future prepare(sstring query) override { + virtual future prepare(sstring query) override { return qp().invoke_on_all([query, this] (auto& local_qp) { auto qs = this->make_query_state(); return local_qp.prepare(query, *qs).finally([qs] {}).discard_result(); @@ -137,7 +137,7 @@ public: } virtual future<::shared_ptr> execute_prepared( - bytes id, + cql3::prepared_cache_key_type id, std::vector values) override { auto prepared = local_qp().get_prepared(id); diff --git a/tests/cql_test_env.hh b/tests/cql_test_env.hh index ffd66d4b66..956a22bfd2 100644 --- a/tests/cql_test_env.hh +++ b/tests/cql_test_env.hh @@ -32,6 +32,7 @@ #include "transport/messages/result_message_base.hh" #include "cql3/query_options_fwd.hh" #include "cql3/values.hh" +#include "cql3/prepared_statements_cache.hh" #include "bytes.hh" #include "schema.hh" @@ -43,7 +44,7 @@ namespace cql3 { class not_prepared_exception : public std::runtime_error { public: - not_prepared_exception(const bytes& id) : std::runtime_error(sprint("Not prepared: %s", id)) {} + not_prepared_exception(const cql3::prepared_cache_key_type& id) : std::runtime_error(sprint("Not prepared: %s", id)) {} }; namespace db { @@ -59,10 +60,10 @@ public: virtual future<::shared_ptr> execute_cql( const sstring& text, std::unique_ptr qo) = 0; - virtual future prepare(sstring query) = 0; + virtual future prepare(sstring query) = 0; virtual future<::shared_ptr> execute_prepared( - bytes id, std::vector values) = 0; + cql3::prepared_cache_key_type id, std::vector values) = 0; virtual future<> create_table(std::function schema_maker) = 0; diff --git a/tests/schema_change_test.cc b/tests/schema_change_test.cc index e4f4525a2f..905b2b7731 100644 --- a/tests/schema_change_test.cc +++ b/tests/schema_change_test.cc @@ -364,7 +364,7 @@ SEASTAR_TEST_CASE(test_prepared_statement_is_invalidated_by_schema_change) { logging::logger_registry().set_logger_level("query_processor", logging::log_level::debug); e.execute_cql("create keyspace tests with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").get(); e.execute_cql("create table tests.table1 (pk int primary key, c1 int, c2 int);").get(); - bytes id = e.prepare("select * from tests.table1;").get0(); + auto id = e.prepare("select * from tests.table1;").get0(); e.execute_cql("alter table tests.table1 add s1 int;").get(); diff --git a/thrift/handler.cc b/thrift/handler.cc index c65b224b4c..1d2f34971c 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -1002,7 +1002,7 @@ public: void execute_prepared_cql3_query(tcxx::function cob, tcxx::function exn_cob, const int32_t itemId, const std::vector & values, const ConsistencyLevel::type consistency) { with_exn_cob(std::move(exn_cob), [&] { - auto prepared = _query_processor.local().get_prepared_for_thrift(itemId); + auto prepared = _query_processor.local().get_prepared(cql3::prepared_cache_key_type(itemId)); if (!prepared) { throw make_exception("Prepared query with id %d not found", itemId); } diff --git a/transport/server.cc b/transport/server.cc index 3e1f5be758..36d8860df5 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -830,15 +830,15 @@ future cql_server::connection::process_prepare(uint16_t stream, b const auto& cs = *client_state; return parallel_for_each(cpus.begin(), cpus.end(), [this, query, cpu_id, &cs] (unsigned int c) mutable { if (c != cpu_id) { - return smp::submit_to(c, [this, query, &cs] () mutable { - return _server._query_processor.local().prepare(query, cs, false).discard_result(); + return smp::submit_to(c, [this, query = std::move(query), &cs] () mutable { + return _server._query_processor.local().prepare(std::move(query), cs, false).discard_result(); }); } else { return make_ready_future<>(); } - }).then([this, query, stream, &cs] { + }).then([this, query, stream, &cs] () mutable { tracing::trace(cs.get_trace_state(), "Done preparing on remote shards"); - return _server._query_processor.local().prepare(query, cs, false).then([this, stream, &cs] (auto msg) { + return _server._query_processor.local().prepare(std::move(query), cs, false).then([this, stream, &cs] (auto msg) { tracing::trace(cs.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] { return messages::result_message::prepared::cql::get_id(msg); })); @@ -852,8 +852,9 @@ future cql_server::connection::process_prepare(uint16_t stream, b future cql_server::connection::process_execute(uint16_t stream, bytes_view buf, service::client_state client_state) { - auto id = read_short_bytes(buf); - auto prepared = _server._query_processor.local().get_prepared(id); + cql3::prepared_cache_key_type cache_key(read_short_bytes(buf)); + auto& id = cql3::prepared_cache_key_type::cql_id(cache_key); + auto prepared = _server._query_processor.local().get_prepared(cache_key); if (!prepared) { throw exceptions::prepared_query_not_found_exception(id); } @@ -929,8 +930,9 @@ cql_server::connection::process_batch(uint16_t stream, bytes_view buf, service:: break; } case 1: { - auto id = read_short_bytes(buf); - ps = _server._query_processor.local().get_prepared(id); + cql3::prepared_cache_key_type cache_key(read_short_bytes(buf)); + auto& id = cql3::prepared_cache_key_type::cql_id(cache_key); + ps = _server._query_processor.local().get_prepared(cache_key); if (!ps) { throw exceptions::prepared_query_not_found_exception(id); } From cea15486c4f315624cad0e10fab963273c19d69c Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Fri, 15 Sep 2017 13:55:31 -0400 Subject: [PATCH 12/12] tests: loading_cache_test: initial commit Test utils::loading_shared_values and utils::loading_cache. Signed-off-by: Vlad Zolotarov --- configure.py | 1 + tests/loading_cache_test.cc | 321 ++++++++++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+) create mode 100644 tests/loading_cache_test.cc diff --git a/configure.py b/configure.py index 0b8536155e..387d9d80a3 100755 --- a/configure.py +++ b/configure.py @@ -246,6 +246,7 @@ scylla_tests = [ 'tests/vint_serialization_test', 'tests/compress_test', 'tests/chunked_vector_test', + 'tests/loading_cache_test', ] apps = [ diff --git a/tests/loading_cache_test.cc b/tests/loading_cache_test.cc new file mode 100644 index 0000000000..0e919db250 --- /dev/null +++ b/tests/loading_cache_test.cc @@ -0,0 +1,321 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include "utils/loading_shared_values.hh" +#include "utils/loading_cache.hh" +#include +#include +#include +#include +#include + + +#include "seastarx.hh" + +#include "tests/test-utils.hh" +#include "tmpdir.hh" +#include "log.hh" + +#include +#include +#include + +/// Get a random integer in the [0, max) range. +/// \param upper bound of the random value range +/// \return The uniformly distributed random integer from the [0, \ref max) range. +static int rand_int(int max) { + std::random_device rd; // only used once to initialise (seed) engine + std::mt19937 rng(rd()); // random-number engine used (Mersenne-Twister in this case) + std::uniform_int_distribution uni(0, max - 1); // guaranteed unbiased + return uni(rng); +} + + +#include "disk-error-handler.hh" + +thread_local disk_error_signal_type general_disk_error; +thread_local disk_error_signal_type commit_error; + +static const sstring test_file_name = "loading_cache_test.txt"; +static const sstring test_string = "1"; +static bool file_prepared = false; +static constexpr int num_loaders = 1000; + +static logging::logger test_logger("loading_cache_test"); + +static thread_local int load_count; +static const tmpdir& get_tmpdir() { + static thread_local tmpdir tmp; + return tmp; +} + +static future<> prepare() { + if (file_prepared) { + return make_ready_future<>(); + } + + return open_file_dma((boost::filesystem::path(get_tmpdir().path) / test_file_name.c_str()).c_str(), open_flags::create | open_flags::wo).then([] (file f) { + return do_with(std::move(f), [] (file& f) { + return f.dma_write(0, test_string.c_str(), test_string.size() + 1).then([] (size_t s) { + BOOST_REQUIRE_EQUAL(s, test_string.size() + 1); + file_prepared = true; + }); + }); + }); +} + +static future loader(const int& k) { + return open_file_dma((boost::filesystem::path(get_tmpdir().path) / test_file_name.c_str()).c_str(), open_flags::ro).then([] (file f) -> future { + return do_with(std::move(f), [] (file& f) -> future { + return f.dma_read_exactly(0, test_string.size() + 1).then([] (auto buf) { + sstring str(buf.get()); + BOOST_REQUIRE_EQUAL(str, test_string); + ++load_count; + return make_ready_future(std::move(str)); + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_loading_shared_values_parallel_loading_same_key) { + return seastar::async([] { + std::vector ivec(num_loaders); + load_count = 0; + utils::loading_shared_values shared_values; + std::list::entry_ptr> anchors_list; + + prepare().get(); + + std::fill(ivec.begin(), ivec.end(), 0); + + parallel_for_each(ivec, [&] (int& k) { + return shared_values.get_or_load(k, loader).then([&] (auto entry_ptr) { + anchors_list.emplace_back(std::move(entry_ptr)); + }); + }).get(); + + // "loader" must be called exactly once + BOOST_REQUIRE_EQUAL(load_count, 1); + BOOST_REQUIRE_EQUAL(shared_values.size(), 1); + anchors_list.clear(); + }); +} + +SEASTAR_TEST_CASE(test_loading_shared_values_parallel_loading_different_keys) { + return seastar::async([] { + std::vector ivec(num_loaders); + load_count = 0; + utils::loading_shared_values shared_values; + std::list::entry_ptr> anchors_list; + + prepare().get(); + + std::iota(ivec.begin(), ivec.end(), 0); + + parallel_for_each(ivec, [&] (int& k) { + return shared_values.get_or_load(k, loader).then([&] (auto entry_ptr) { + anchors_list.emplace_back(std::move(entry_ptr)); + }); + }).get(); + + // "loader" must be called once for each key + BOOST_REQUIRE_EQUAL(load_count, num_loaders); + BOOST_REQUIRE_EQUAL(shared_values.size(), num_loaders); + anchors_list.clear(); + }); +} + +SEASTAR_TEST_CASE(test_loading_shared_values_rehash) { + return seastar::async([] { + std::vector ivec(num_loaders); + load_count = 0; + utils::loading_shared_values shared_values; + std::list::entry_ptr> anchors_list; + + prepare().get(); + + std::iota(ivec.begin(), ivec.end(), 0); + + // verify that load factor is always in the (0.25, 0.75) range + for (int k = 0; k < num_loaders; ++k) { + shared_values.get_or_load(k, loader).then([&] (auto entry_ptr) { + anchors_list.emplace_back(std::move(entry_ptr)); + }).get(); + BOOST_REQUIRE_LE(shared_values.size(), 3 * shared_values.buckets_count() / 4); + } + + BOOST_REQUIRE_GE(shared_values.size(), shared_values.buckets_count() / 4); + + // minimum buckets count (by default) is 16, so don't check for less than 4 elements + for (int k = 0; k < num_loaders - 4; ++k) { + anchors_list.pop_back(); + shared_values.rehash(); + BOOST_REQUIRE_GE(shared_values.size(), shared_values.buckets_count() / 4); + } + + anchors_list.clear(); + }); +} + +SEASTAR_TEST_CASE(test_loading_shared_values_parallel_loading_explicit_eviction) { + return seastar::async([] { + std::vector ivec(num_loaders); + load_count = 0; + utils::loading_shared_values shared_values; + std::vector::entry_ptr> anchors_vec(num_loaders); + + prepare().get(); + + std::iota(ivec.begin(), ivec.end(), 0); + + parallel_for_each(ivec, [&] (int& k) { + return shared_values.get_or_load(k, loader).then([&] (auto entry_ptr) { + anchors_vec[k] = std::move(entry_ptr); + }); + }).get(); + + int rand_key = rand_int(num_loaders); + BOOST_REQUIRE(shared_values.find(rand_key) != shared_values.end()); + anchors_vec[rand_key] = nullptr; + BOOST_REQUIRE_MESSAGE(shared_values.find(rand_key) == shared_values.end(), format("explicit removal for key {} failed", rand_key)); + anchors_vec.clear(); + }); +} + +SEASTAR_TEST_CASE(test_loading_cache_loading_same_key) { + return seastar::async([] { + using namespace std::chrono; + std::vector ivec(num_loaders); + load_count = 0; + utils::loading_cache loading_cache(num_loaders, 1s, test_logger); + + prepare().get(); + + std::fill(ivec.begin(), ivec.end(), 0); + + parallel_for_each(ivec, [&] (int& k) { + return loading_cache.get_ptr(k, loader).discard_result(); + }).get(); + + // "loader" must be called exactly once + BOOST_REQUIRE_EQUAL(load_count, 1); + BOOST_REQUIRE_EQUAL(loading_cache.size(), 1); + loading_cache.stop().get(); + }); +} + +SEASTAR_TEST_CASE(test_loading_cache_loading_different_keys) { + return seastar::async([] { + using namespace std::chrono; + std::vector ivec(num_loaders); + load_count = 0; + utils::loading_cache loading_cache(num_loaders, 1s, test_logger); + + prepare().get(); + + std::iota(ivec.begin(), ivec.end(), 0); + + parallel_for_each(ivec, [&] (int& k) { + return loading_cache.get_ptr(k, loader).discard_result(); + }).get(); + + BOOST_REQUIRE_EQUAL(load_count, num_loaders); + BOOST_REQUIRE_EQUAL(loading_cache.size(), num_loaders); + loading_cache.stop().get(); + }); +} + +SEASTAR_TEST_CASE(test_loading_cache_loading_expiry_eviction) { + return seastar::async([] { + using namespace std::chrono; + utils::loading_cache loading_cache(num_loaders, 20ms, test_logger); + + prepare().get(); + + loading_cache.get_ptr(0, loader).discard_result().get(); + + BOOST_REQUIRE(loading_cache.find(0) != loading_cache.end()); + + // timers get delayed sometimes (especially in a debug mode) + constexpr int max_retry = 10; + int i = 0; + do_until( + [&] { return i++ > max_retry || loading_cache.find(0) == loading_cache.end(); }, + [] { return sleep(40ms); } + ).get(); + BOOST_REQUIRE(loading_cache.find(0) == loading_cache.end()); + loading_cache.stop().get(); + }); +} + +SEASTAR_TEST_CASE(test_loading_cache_loading_reloading) { + return seastar::async([] { + using namespace std::chrono; + load_count = 0; + utils::loading_cache loading_cache(num_loaders, 100ms, 20ms, test_logger, loader); + prepare().get(); + loading_cache.get_ptr(0, loader).discard_result().get(); + sleep(60ms).get(); + BOOST_REQUIRE_MESSAGE(load_count >= 2, format("load_count is {}", load_count)); + loading_cache.stop().get(); + }); +} + +SEASTAR_TEST_CASE(test_loading_cache_max_size_eviction) { + return seastar::async([] { + using namespace std::chrono; + load_count = 0; + utils::loading_cache loading_cache(1, 1s, test_logger); + + prepare().get(); + + for (int i = 0; i < num_loaders; ++i) { + loading_cache.get_ptr(i % 2, loader).discard_result().get(); + } + + BOOST_REQUIRE_EQUAL(load_count, num_loaders); + BOOST_REQUIRE_EQUAL(loading_cache.size(), 1); + loading_cache.stop().get(); + }); +} + +SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) { + return seastar::async([] { + using namespace std::chrono; + load_count = 0; + utils::loading_cache loading_cache(1, 100ms, 10ms, test_logger, loader); + + prepare().get(); + + auto curr_time = lowres_clock::now(); + int i = 0; + + // this will cause reloading when values are being actively evicted due to the limited cache size + do_until( + [&] { return lowres_clock::now() - curr_time > 1s; }, + [&] { return loading_cache.get_ptr(i++ % 2).discard_result(); } + ).get(); + + BOOST_REQUIRE_EQUAL(loading_cache.size(), 1); + loading_cache.stop().get(); + }); +} \ No newline at end of file