Files
scylladb/utils/loading_shared_values.hh
Kefu Chai 7215d4bfe9 utils: do not include unused headers
these unused includes were identified by clang-include-cleaner. after
auditing these source files, all of the reports have been confirmed.

please note, because quite a few source files relied on
`utils/to_string.hh` to pull in the specialization of
`fmt::formatter<std::optional<T>>`, after removing
`#include <fmt/std.h>` from `utils/to_string.hh`, we have to
include `fmt/std.h` directly.

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
2025-01-14 07:56:39 -05:00

370 lines
13 KiB
C++

/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "utils/assert.hh"
#include <vector>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/bitops.hh>
#include <boost/intrusive/unordered_set.hpp>
#include "seastarx.hh"
namespace bi = boost::intrusive;
namespace utils {
// Default statistics policy: every counter hook is an empty, non-throwing no-op.
struct do_nothing_loading_shared_values_stats {
    // Increase the number of times entry was found ready
    static void inc_hits() noexcept {
    }

    // Increase the number of times entry was not found
    static void inc_misses() noexcept {
    }

    // Increase the number of times entry was not ready (>= misses)
    static void inc_blocks() noexcept {
    }

    // Increase the number of times entry was evicted
    static void inc_evictions() noexcept {
    }
};
// Entries stay around as long as there is any live external reference (entry_ptr) to them.
// Supports asynchronous insertion, ensures that only one entry will be loaded.
// InitialBucketsCount is required to be a non-zero power of two. This is enforced at compile time
// by a static_assert in the constructor; no exception is thrown for it at run time.
template<typename Key,
typename Tp,
typename Hash = std::hash<Key>,
typename EqualPred = std::equal_to<Key>,
typename Stats = do_nothing_loading_shared_values_stats,
size_t InitialBucketsCount = 16>
requires requires () {
Stats::inc_hits();
Stats::inc_misses();
Stats::inc_blocks();
Stats::inc_evictions();
}
class loading_shared_values {
public:
using key_type = Key;
using value_type = Tp;
static constexpr size_t initial_buckets_count = InitialBucketsCount;
private:
// A single cached slot: the key, the (possibly not yet loaded) value and the promise that
// waiters of an in-progress load block on. Linked into the parent's intrusive set via the base hook.
class entry : public bi::unordered_set_base_hook<bi::store_hash<true>>, public enable_lw_shared_from_this<entry> {
public:
    /// Creates an entry for key \p k that belongs to \p parent. The value is initially absent.
    entry(loading_shared_values& parent, key_type k)
        : _parent(parent)
        , _key(std::move(k))
    {}

    /// Unlinks the entry from the parent's set (if it is still linked) and bumps the evictions counter.
    ~entry() {
        if (!orphaned()) {
            _parent._set.erase(_parent._set.iterator_to(*this));
        }
        Stats::inc_evictions();
    }

    /// The key this entry was created for.
    const key_type& key() const noexcept {
        return _key;
    }

    /// The stored value. Must only be called when ready() is true.
    value_type& value() noexcept {
        return *_val;
    }

    /// \copydoc value()
    const value_type& value() const noexcept {
        return *_val;
    }

    /// \brief "Release" the object from the contained value.
    /// After this call the state of the value kept inside this object is undefined and it may no longer be used.
    ///
    /// \return The r-value reference to the value kept inside this object.
    value_type&& release() {
        return std::move(*_val);
    }

    /// Stores \p new_val as the entry's value, constructing it in place inside the optional.
    void set_value(value_type new_val) {
        _val.emplace(std::move(new_val));
    }

    /// True iff a value has been stored with set_value().
    bool ready() const noexcept {
        return _val.has_value();
    }

    /// True iff the entry is not linked into any set.
    bool orphaned() const {
        return !is_linked();
    }

    /// Promise used to signal completion (or failure) of the load to waiters.
    shared_promise<>& loaded() {
        return _loaded;
    }

    /// Entries compare equal when EqualPred says their keys are equal.
    friend bool operator==(const entry& a, const entry& b) {
        return EqualPred()(a.key(), b.key());
    }

    /// Hash of an entry is the Hash of its key.
    friend std::size_t hash_value(const entry& v) {
        return Hash()(v.key());
    }

private:
    loading_shared_values& _parent;
    key_type _key;
    std::optional<value_type> _val;
    shared_promise<> _loaded;
};
// Heterogeneous equality between a lookup key and a stored entry.
// Both argument orders are provided; each call uses a freshly default-constructed KeyEqual.
template<typename KeyType, typename KeyEqual>
struct key_eq {
    // entry on the left, key on the right
    bool operator()(const entry& c, const KeyType& k) const {
        return KeyEqual()(c.key(), k);
    }

    // key on the left, entry on the right
    bool operator()(const KeyType& k, const entry& c) const {
        return KeyEqual()(k, c.key());
    }
};
// Intrusive hash set that links the entries. power_2_buckets<true> declares that the bucket array
// length is always a power of two; compare_hash<true> lets lookups compare the hash stored in the
// entry's hook (store_hash<true>) before invoking the equality predicate.
using set_type = bi::unordered_set<entry, bi::power_2_buckets<true>, bi::compare_hash<true>>;
// Descriptor (pointer + length) of the externally owned bucket array handed to set_type.
using bi_set_bucket_traits = typename set_type::bucket_traits;
using set_iterator = typename set_type::iterator;
// Compile-time switch for the private rehash<>(): whether it may reduce the number of buckets.
enum class shrinking_is_allowed { no, yes };
public:
// Shared handle to a cached value. The underlying entry stays alive for as long as at least one
// entry_ptr (or an in-flight load) references it.
class entry_ptr {
    lw_shared_ptr<entry> _e;

public:
    using element_type = value_type;

    // Null handle.
    entry_ptr() = default;

    // Null handle, allows `entry_ptr p = nullptr`.
    entry_ptr(std::nullptr_t) noexcept {}

    // Wraps an existing entry reference.
    explicit entry_ptr(lw_shared_ptr<entry> e)
        : _e(std::move(e))
    {}

    // Drops the reference held by this handle.
    entry_ptr& operator=(std::nullptr_t) noexcept {
        _e = nullptr;
        return *this;
    }

    // True iff the handle refers to an entry.
    explicit operator bool() const noexcept {
        return bool(_e);
    }

    bool operator==(const entry_ptr&) const = default;

    // Access to the wrapped value. Call only when bool(*this).
    element_type& operator*() const noexcept {
        return _e->value();
    }

    element_type* operator->() const noexcept {
        return &(**this);
    }

    /// \brief Get the wrapped value. Avoid the copy if this is the last reference to this value.
    /// If this is the last reference then the wrapped value is going to be std::move()ed. Otherwise it's going to
    /// be copied.
    /// The handle is reset to null afterwards.
    /// \return The wrapped value.
    element_type release() {
        const bool sole_owner = _e.owned();
        element_type res = sole_owner ? _e->release() : _e->value();
        _e = nullptr;
        return res;
    }

    // Returns the key this entry is associated with.
    // Valid if bool(*this).
    const key_type& key() const {
        return _e->key();
    }

    // Returns true iff the entry is not linked in the set.
    // Call only when bool(*this).
    bool orphaned() const {
        return _e->orphaned();
    }

    friend class loading_shared_values;

    // Prints the address of the referenced entry (not the value).
    friend std::ostream& operator<<(std::ostream& os, const entry_ptr& ep) {
        return os << ep._e.get();
    }
};
private:
// Storage for the hash buckets. Must stay declared before _set: the constructor builds _set from
// _buckets.data()/_buckets.size(), and members are initialized in declaration order.
std::vector<typename set_type::bucket_type> _buckets;
// Intrusive set linking the live entries; entries unlink themselves from it in ~entry().
set_type _set;
public:
// Returns the key of the entry referenced by e_ptr. Call only with a non-null e_ptr.
static const key_type& to_key(const entry_ptr& e_ptr) noexcept {
    return e_ptr.key();
}
/// Creates an empty container with InitialBucketsCount buckets.
///
/// InitialBucketsCount must be a non-zero power of two and all Stats hooks must be noexcept;
/// both requirements are checked at compile time by the static_asserts below.
/// Allocating the bucket array may throw.
loading_shared_values()
    : _buckets(InitialBucketsCount)
    , _set(bi_set_bucket_traits(_buckets.data(), _buckets.size()))
{
    static_assert(InitialBucketsCount && ((InitialBucketsCount & (InitialBucketsCount - 1)) == 0), "Initial buckets count should be a power of two");
    static_assert(noexcept(Stats::inc_hits()), "Stats::inc_hits must be non-throwing");
    static_assert(noexcept(Stats::inc_misses()), "Stats::inc_misses must be non-throwing");
    static_assert(noexcept(Stats::inc_blocks()), "Stats::inc_blocks must be non-throwing");
    static_assert(noexcept(Stats::inc_evictions()), "Stats::inc_evictions must be non-throwing");
}
// NOTE(review): each linked entry keeps a reference to the container that created it, and the
// defaulted move does not update it — moving a non-empty container looks unsafe; confirm that
// callers only move empty instances.
loading_shared_values(loading_shared_values&&) = default;
// Copying is not supported.
loading_shared_values(const loading_shared_values&) = delete;
// Every entry must already be gone (i.e. no entry_ptr may outlive the container).
~loading_shared_values() {
    SCYLLA_ASSERT(_set.empty());
}
/// \brief
/// Returns a future which resolves with a shared pointer to the entry for the given key.
/// Always returns a valid pointer if succeeds.
///
/// If entry is missing, the loader is invoked. If entry is already loading, this invocation
/// will wait for prior loading to complete and use its result when it's done.
///
/// The loader object does not survive deferring, so the caller must deal with its liveness.
///
/// \param key key to look up or load
/// \param loader callable taking `const key_type&` whose (futurized) result is future<value_type>
/// \return future resolving to a non-null entry_ptr, or a failed future carrying the loader's
///         exception or any exception thrown while setting up the load. Never throws directly.
template<typename Loader>
requires std::same_as<typename futurize<std::invoke_result_t<Loader, const key_type&>>::type, future<value_type>>
future<entry_ptr> get_or_load(const key_type& key, Loader&& loader) noexcept {
try {
auto i = _set.find(key, Hash(), key_eq<key_type, EqualPred>());
lw_shared_ptr<entry> e;
future<> f = make_ready_future<>();
if (i != _set.end()) {
// The key is already known: either fully loaded or still being loaded by an earlier call.
e = i->shared_from_this();
// take a short cut if the value is ready
if (e->ready()) {
Stats::inc_hits();
return make_ready_future<entry_ptr>(entry_ptr(std::move(e)));
}
// Still loading: wait on the promise that the original loading continuation will resolve.
f = e->loaded().get_shared_future();
} else {
Stats::inc_misses();
e = make_lw_shared<entry>(*this, key);
// Best-effort growth of the bucket array; a failure here is ignored.
rehash_before_insert();
_set.insert(*e);
// get_shared_future() may throw, so make sure to call it before invoking the loader(key)
f = e->loaded().get_shared_future();
// Future indirectly forwarded to `e`.
// The continuation's own future is discarded on purpose: the outcome is published through
// e->loaded(). The continuation holds a copy of `e`, keeping the entry alive until the load resolves.
(void)futurize_invoke([&] { return loader(key); }).then_wrapped([e](future<value_type>&& val_fut) mutable {
if (val_fut.failed()) {
e->loaded().set_exception(val_fut.get_exception());
} else {
e->set_value(val_fut.get());
e->loaded().set_value();
}
});
}
if (!f.available()) {
// The value is not there yet: the caller has to wait for the load to finish.
Stats::inc_blocks();
return f.then([e]() mutable {
return entry_ptr(std::move(e));
});
} else if (f.failed()) {
// The load already failed; `e` is dropped when this function returns.
return make_exception_future<entry_ptr>(std::move(f).get_exception());
} else {
// The load completed without deferring. Note that on the miss path this is counted
// in addition to the inc_misses() above.
Stats::inc_hits();
return make_ready_future<entry_ptr>(entry_ptr(std::move(e)));
}
} catch (...) {
// Keep the noexcept promise: report any synchronous failure through the returned future.
return make_exception_future<entry_ptr>(std::current_exception());
}
}
/// \brief Try to rehash the container so that the load factor is between 0.25 and 0.75.
/// Unlike the rehash performed before insertions, this one is allowed to shrink the bucket array.
/// \throw May throw if allocation of a new buckets array throws.
void rehash() {
    const size_t live_entries = _set.size();
    rehash<shrinking_is_allowed::yes>(live_entries);
}
/// Current length of the bucket array backing the set.
size_t buckets_count() const {
    return _buckets.size();
}
/// Number of entries currently linked in the set (loaded or still loading).
size_t size() const {
    return _set.size();
}
/// Looks up \p key using a caller-supplied hasher.
///
/// Returns a null entry_ptr when the key is absent or its value is still being loaded.
/// Note: \p key_equal_func itself is not used; equality is evaluated with a default-constructed
/// KeyEqual (through key_eq).
template<typename KeyType, typename KeyHasher, typename KeyEqual>
entry_ptr find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
    const set_iterator pos = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
    const bool usable = pos != _set.end() && pos->ready();
    if (usable) {
        return entry_ptr(pos->shared_from_this());
    }
    return entry_ptr();
}
// Removes a given key from this container.
// If a given key is currently loading, the loading will succeed and will return entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType, typename KeyHasher, typename KeyEqual>
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) {
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
if (it != _set.end()) {
_set.erase(it);
}
}
// Removes a given key from this container.
// If a given key is currently loading, the loading will succeed and will return entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType>
void remove(const KeyType& key) {
remove(key, Hash(), EqualPred());
}
// Removes all values which match a given predicate or are currently loading.
// Guarantees that no values which match the predicate and whose loading was initiated
// before this call will be present after this call (or appear at any time later).
// Same effects as if remove(e.key()) was called on each matching entry.
// The predicate is only evaluated for entries whose value is ready.
template<typename Pred>
requires std::is_invocable_r_v<bool, Pred, const Tp&>
void remove_if(const Pred& pred) {
    for (auto cursor = _set.begin(); cursor != _set.end(); ) {
        // Step past the candidate first so that unlinking it cannot affect the cursor.
        const auto candidate = cursor;
        ++cursor;
        if (!candidate->ready() || pred(candidate->value())) {
            _set.erase(candidate);
        }
    }
}
// Non-templated overload using the container's own Hash and EqualPred; kept so that
// instantiations which never need the templated find() stay cheap for the compiler.
// Returns a null entry_ptr if the key is absent or its value is not loaded yet.
entry_ptr find(const key_type& key) noexcept {
    return find<key_type, Hash, EqualPred>(key, Hash(), EqualPred());
}
private:
// Grows the bucket array (never shrinks it) so that it can accommodate one more entry.
// Best effort: a failure to rehash must not fail the insertion itself.
void rehash_before_insert() noexcept {
    const size_t size_after_insert = _set.size() + 1;
    try {
        rehash<shrinking_is_allowed::no>(size_after_insert);
    } catch (...) {
        // if rehashing fails - continue with the current buckets array
    }
}
/// \brief Resize the bucket array so that it suits \p new_size entries.
///
/// Tries to keep the load factor between 0.25 (only when shrinking is allowed) and 0.75:
///  - if \p new_size exceeds 3/4 of the current bucket count, the bucket count is doubled;
///  - if shrinking is allowed and \p new_size is below 1/4 of the current bucket count, the bucket
///    count is reduced to the largest power of two not exceeding 4 * new_size, but never below
///    InitialBucketsCount.
/// Does nothing if the resulting bucket count equals the current one.
///
/// \tparam ShrinkingIsAllowed whether the bucket array may be made smaller
/// \param new_size number of entries the container is expected to hold
/// \throw May throw if allocation of a new buckets array throws; the container is left unchanged then.
template <shrinking_is_allowed ShrinkingIsAllowed>
void rehash(size_t new_size) {
    const size_t current_buckets_count = buckets_count();
    size_t new_buckets_count = current_buckets_count;

    if (ShrinkingIsAllowed == shrinking_is_allowed::yes && new_size < current_buckets_count / 4) {
        new_buckets_count = new_size ? size_t(1) << log2floor(new_size * 4) : size_t(1);
        // Clamp to the minimum instead of giving up: otherwise a big bucket array that holds only a
        // handful of entries would never be shrunk.
        if (new_buckets_count < InitialBucketsCount) {
            new_buckets_count = InitialBucketsCount;
        }
    } else if (new_size > 3 * current_buckets_count / 4) {
        new_buckets_count = current_buckets_count * 2;
    }

    if (new_buckets_count == current_buckets_count) {
        return;
    }

    // Allocate first: if this throws, neither _set nor _buckets has been touched.
    std::vector<typename set_type::bucket_type> new_buckets(new_buckets_count);
    _set.rehash(bi_set_bucket_traits(new_buckets.data(), new_buckets.size()));
    // Moving the vector hands over its buffer, so the pointer given to _set above stays valid.
    _buckets = std::move(new_buckets);
}
};
}