Merge "Unify inactive readers" from Botond
" Currently inactive readers are stored in two different places: * reader concurrency semaphore * querier cache With the latter registering its inactive readers with the former. This is an unnecessarily complex (and possibly surprising) setup that we want to move away from. This series solves this by moving the responsibility if storing of inactive reads solely to the reader concurrency semaphore, including all supported eviction policies. The querier cache is now only responsible for indexing queriers and maintaining relevant stats. This makes the ownership of the inactive readers much more clear, hopefully making Benny's work on introducing close() and abort() a little bit easier. Tests: unit(release, debug:v1) " * 'unify-inactive-readers/v2' of https://github.com/denesb/scylla: reader_concurrency_semaphore: store inactive readers directly querier_cache: store readers in the reader concurrency semaphore directly querier_cache: retire memory based cache eviction querier_cache: delegate expiry to the reader_concurrency_semaphore reader_concurrency_semaphore: introduce ttl for inactive reads querier_cache: use new eviction notify mechanism to maintain stats reader_concurrency_semaphore: add eviction notification facility reader_concurrency_semaphore: extract evict code into method evict()
@@ -377,7 +377,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _version(empty_version)
, _compaction_manager(make_compaction_manager(_cfg, dbcfg, as))
, _enable_incremental_backups(cfg.incremental_backups())
, _querier_cache(dbcfg.available_memory * 0.04)
, _large_data_handler(std::make_unique<db::cql_table_large_data_handler>(_cfg.compaction_large_partition_warning_threshold_mb()*1024*1024,
_cfg.compaction_large_row_warning_threshold_mb()*1024*1024,
_cfg.compaction_large_cell_warning_threshold_mb()*1024*1024,
@@ -590,10 +589,6 @@ database::setup_metrics() {
sm::description("Counts querier cache entries that were evicted to free up resources "
"(limited by reader concurrency limits) necessary to create new readers.")),

sm::make_derive("querier_cache_memory_based_evictions", _querier_cache.get_stats().memory_based_evictions,
sm::description("Counts querier cache entries that were evicted because the memory usage "
"of the cached queriers was above the limit.")),

sm::make_gauge("querier_cache_population", _querier_cache.get_stats().population,
sm::description("The number of entries currently in the querier cache.")),

@@ -259,15 +259,6 @@ queriers are evicted from the cache until enough permits are recovered
to admit all new readers, or until the cache is empty. Queriers are
evicted in LRU order.

###### Memory based

To avoid excessive memory usage the size of the querier cache is
limited. To avoid crossing this limit, the cumulative size of all the
cached queriers is calculated before inserting a new one. If, together
with the to-be-added querier, the limit would be crossed, queriers
are evicted such that the memory consumption stays below the limit.
Queriers are evicted in LRU order.

#### Diagnostics

To observe the effectiveness of the caching, as well as aid in finding
@@ -290,10 +281,7 @@ any problems a number of counters are added:
5. `querier_cache_resource_based_evictions` counts the cached entries
that were evicted due to reader-resource (those limited by
reader-concurrency limits) shortage.
6. `querier_cache_memory_based_evictions` counts the cached entries
that were evicted due to reaching the cache's memory limits (currently
set to 4% of the shards' memory).
7. `querier_cache_querier_population` is the current number of querier
6. `querier_cache_querier_population` is the current number of querier
entries in the cache.

Note:

@@ -1000,24 +1000,6 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
return make_flat_mutation_reader<foreign_reader>(std::move(schema), std::move(permit), std::move(reader), fwd_sm);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class inactive_evictable_reader : public reader_concurrency_semaphore::inactive_read {
|
||||
flat_mutation_reader_opt _reader;
|
||||
public:
|
||||
inactive_evictable_reader(flat_mutation_reader reader)
|
||||
: _reader(std::move(reader)) {
|
||||
}
|
||||
flat_mutation_reader reader() && {
|
||||
return std::move(*_reader);
|
||||
}
|
||||
virtual void evict() override {
|
||||
_reader = {};
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// Encapsulates all data and logic that is local to the remote shard the
|
||||
// reader lives on.
|
||||
class evictable_reader : public flat_mutation_reader::impl {
|
||||
@@ -1100,7 +1082,7 @@ public:
|
||||
|
||||
void evictable_reader::do_pause(flat_mutation_reader reader) {
|
||||
assert(!_irh);
|
||||
_irh = _permit.semaphore().register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
|
||||
_irh = _permit.semaphore().register_inactive_read(std::move(reader));
|
||||
}
|
||||
|
||||
void evictable_reader::maybe_pause(flat_mutation_reader reader) {
|
||||
@@ -1116,8 +1098,7 @@ flat_mutation_reader_opt evictable_reader::try_resume() {
|
||||
if (!ir_ptr) {
|
||||
return {};
|
||||
}
|
||||
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
|
||||
return std::move(ir).reader();
|
||||
return std::move(*ir_ptr);
|
||||
}
|
||||
|
||||
void evictable_reader::update_next_position(flat_mutation_reader& reader) {
|
||||
@@ -1946,7 +1927,7 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle
|
||||
reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader) {
|
||||
return sem.register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
|
||||
return sem.register_inactive_read(std::move(reader));
|
||||
}
|
||||
|
||||
flat_mutation_reader_opt
|
||||
@@ -1955,8 +1936,7 @@ reader_lifecycle_policy::try_resume(reader_concurrency_semaphore& sem, reader_co
|
||||
if (!ir_ptr) {
|
||||
return {};
|
||||
}
|
||||
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
|
||||
return std::move(ir).reader();
|
||||
return std::move(*ir_ptr);
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle
|
||||
|
||||
querier.cc
@@ -183,74 +183,55 @@ static can_use can_be_used_for_page(const Querier& q, const schema& s, const dht
|
||||
// The time-to-live of a cache-entry.
|
||||
const std::chrono::seconds querier_cache::default_entry_ttl{10};
|
||||
|
||||
void querier_cache::scan_cache_entries() {
|
||||
const auto now = lowres_clock::now();
|
||||
|
||||
auto it = _entries.begin();
|
||||
const auto end = _entries.end();
|
||||
while (it != end && it->is_expired(now)) {
|
||||
++_stats.time_based_evictions;
|
||||
--_stats.population;
|
||||
it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it = _entries.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
static querier_cache::entries::iterator find_querier(querier_cache::entries& entries, querier_cache::index& index, utils::UUID key,
|
||||
static std::unique_ptr<querier_base> find_querier(querier_cache::index& index, utils::UUID key,
|
||||
dht::partition_ranges_view ranges, tracing::trace_state_ptr trace_state) {
|
||||
const auto queriers = index.equal_range(key);
|
||||
|
||||
if (queriers.first == index.end()) {
|
||||
tracing::trace(trace_state, "Found no cached querier for key {}", key);
|
||||
return entries.end();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const querier_cache::entry& e) {
|
||||
return ranges_match(e.value().schema(), e.value().ranges(), ranges);
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const querier_cache::index::value_type& e) {
|
||||
return ranges_match(e.second->schema(), e.second->ranges(), ranges);
|
||||
});
|
||||
|
||||
if (it == queriers.second) {
|
||||
tracing::trace(trace_state, "Found cached querier(s) for key {} but none matches the query range(s) {}", key, ranges);
|
||||
return entries.end();
|
||||
return nullptr;
|
||||
}
|
||||
tracing::trace(trace_state, "Found cached querier for key {} and range(s) {}", key, ranges);
|
||||
return it->pos();
|
||||
auto ptr = std::move(it->second);
|
||||
index.erase(it);
|
||||
return std::move(ptr);
|
||||
}
|
||||
|
||||
querier_cache::querier_cache(size_t max_cache_size, std::chrono::seconds entry_ttl)
|
||||
: _expiry_timer([this] { scan_cache_entries(); })
|
||||
, _entry_ttl(entry_ttl)
|
||||
, _max_queriers_memory_usage(max_cache_size) {
|
||||
_expiry_timer.arm_periodic(entry_ttl / 2);
|
||||
querier_cache::querier_cache(std::chrono::seconds entry_ttl)
|
||||
: _entry_ttl(entry_ttl) {
|
||||
}
|
||||
|
||||
class querier_inactive_read : public reader_concurrency_semaphore::inactive_read {
|
||||
querier_cache::entries& _entries;
|
||||
querier_cache::entries::iterator _pos;
|
||||
querier_cache::stats& _stats;
|
||||
|
||||
public:
|
||||
querier_inactive_read(querier_cache::entries& entries, querier_cache::entries::iterator pos, querier_cache::stats& stats)
|
||||
: _entries(entries)
|
||||
, _pos(pos)
|
||||
, _stats(stats) {
|
||||
struct querier_utils {
|
||||
static flat_mutation_reader get_reader(querier_base& q) {
|
||||
return std::move(std::get<flat_mutation_reader>(q._reader));
|
||||
}
|
||||
virtual void evict() override {
|
||||
_entries.erase(_pos);
|
||||
++_stats.resource_based_evictions;
|
||||
--_stats.population;
|
||||
static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) {
|
||||
return std::move(std::get<reader_concurrency_semaphore::inactive_read_handle>(q._reader));
|
||||
}
|
||||
static void set_reader(querier_base& q, flat_mutation_reader r) {
|
||||
q._reader = std::move(r);
|
||||
}
|
||||
static void set_inactive_read_handle(querier_base& q, reader_concurrency_semaphore::inactive_read_handle h) {
|
||||
q._reader = std::move(h);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Querier>
|
||||
static void insert_querier(
|
||||
querier_cache::entries& entries,
|
||||
utils::UUID key,
|
||||
querier_cache::index& index,
|
||||
querier_cache::stats& stats,
|
||||
size_t max_queriers_memory_usage,
|
||||
utils::UUID key,
|
||||
Querier&& q,
|
||||
lowres_clock::time_point expires,
|
||||
std::chrono::seconds ttl,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
// FIXME: see #3159
|
||||
// In reverse mode flat_mutation_reader drops any remaining rows of the
|
||||
@@ -264,57 +245,46 @@ static void insert_querier(
|
||||
|
||||
tracing::trace(trace_state, "Caching querier with key {}", key);
|
||||
|
||||
auto memory_usage = boost::accumulate(entries | boost::adaptors::transformed(
|
||||
[] (const querier_cache::entry& e) { return e.value().memory_usage(); }), size_t(0));
|
||||
|
||||
// We add the memory-usage of the to-be-added querier to the memory-usage
// of all the cached queriers. We now need to make sure this number is
// smaller than the maximum allowed memory usage. If it isn't, we evict
// cached queriers and subtract their memory usage from this number until
// it goes below the limit.
|
||||
memory_usage += q.memory_usage();
|
||||
|
||||
if (memory_usage >= max_queriers_memory_usage) {
|
||||
auto it = entries.begin();
|
||||
while (it != entries.end() && memory_usage >= max_queriers_memory_usage) {
|
||||
memory_usage -= it->value().memory_usage();
|
||||
it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it = entries.erase(it);
|
||||
--stats.population;
|
||||
++stats.memory_based_evictions;
|
||||
}
|
||||
}
|
||||
|
||||
auto& sem = q.permit().semaphore();
|
||||
|
||||
auto& e = entries.emplace_back(key, std::move(q), expires);
|
||||
e.set_pos(--entries.end());
|
||||
auto it = index.emplace(key, std::make_unique<Querier>(std::move(q)));
|
||||
|
||||
++stats.population;
|
||||
|
||||
if (auto irh = sem.register_inactive_read(std::make_unique<querier_inactive_read>(entries, e.pos(), stats))) {
|
||||
e.set_inactive_handle(std::move(irh));
|
||||
index.insert(e);
|
||||
auto notify_handler = [&stats, &index, it] (reader_concurrency_semaphore::evict_reason reason) {
|
||||
index.erase(it);
|
||||
switch (reason) {
|
||||
case reader_concurrency_semaphore::evict_reason::permit:
|
||||
++stats.resource_based_evictions;
|
||||
break;
|
||||
case reader_concurrency_semaphore::evict_reason::time:
|
||||
++stats.time_based_evictions;
|
||||
break;
|
||||
case reader_concurrency_semaphore::evict_reason::manual:
|
||||
break;
|
||||
}
|
||||
--stats.population;
|
||||
};
|
||||
|
||||
if (auto irh = sem.register_inactive_read(querier_utils::get_reader(*it->second), ttl, std::move(notify_handler))) {
|
||||
querier_utils::set_inactive_read_handle(*it->second, std::move(irh));
|
||||
}
|
||||
}
|
||||
|
||||
void querier_cache::insert(utils::UUID key, data_querier&& q, tracing::trace_state_ptr trace_state) {
|
||||
insert_querier(_entries, _data_querier_index, _stats, _max_queriers_memory_usage, key, std::move(q), lowres_clock::now() + _entry_ttl,
|
||||
std::move(trace_state));
|
||||
insert_querier(key, _data_querier_index, _stats, std::move(q), _entry_ttl, std::move(trace_state));
|
||||
}
|
||||
|
||||
void querier_cache::insert(utils::UUID key, mutation_querier&& q, tracing::trace_state_ptr trace_state) {
|
||||
insert_querier(_entries, _mutation_querier_index, _stats, _max_queriers_memory_usage, key, std::move(q), lowres_clock::now() + _entry_ttl,
|
||||
std::move(trace_state));
|
||||
insert_querier(key, _mutation_querier_index, _stats, std::move(q), _entry_ttl, std::move(trace_state));
|
||||
}
|
||||
|
||||
void querier_cache::insert(utils::UUID key, shard_mutation_querier&& q, tracing::trace_state_ptr trace_state) {
|
||||
insert_querier(_entries, _shard_mutation_querier_index, _stats, _max_queriers_memory_usage, key, std::move(q), lowres_clock::now() + _entry_ttl,
|
||||
std::move(trace_state));
|
||||
insert_querier(key, _shard_mutation_querier_index, _stats, std::move(q), _entry_ttl, std::move(trace_state));
|
||||
}
|
||||
|
||||
template <typename Querier>
|
||||
static std::optional<Querier> lookup_querier(
|
||||
querier_cache::entries& entries,
|
||||
querier_cache::index& index,
|
||||
querier_cache::stats& stats,
|
||||
utils::UUID key,
|
||||
@@ -322,20 +292,23 @@ static std::optional<Querier> lookup_querier(
|
||||
dht::partition_ranges_view ranges,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto it = find_querier(entries, index, key, ranges, trace_state);
|
||||
auto base_ptr = find_querier(index, key, ranges, trace_state);
|
||||
++stats.lookups;
|
||||
if (it == entries.end()) {
|
||||
if (!base_ptr) {
|
||||
++stats.misses;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto* q_ptr = dynamic_cast<Querier*>(&it->value());
|
||||
auto* q_ptr = dynamic_cast<Querier*>(base_ptr.get());
|
||||
if (!q_ptr) {
|
||||
throw std::runtime_error("lookup_querier(): found querier is not of the expected type");
|
||||
}
|
||||
auto q = std::move(*q_ptr);
|
||||
q.permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
entries.erase(it);
|
||||
auto& q = *q_ptr;
|
||||
auto read_ptr = q.permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(q));
|
||||
if (!read_ptr) {
|
||||
throw std::runtime_error("lookup_querier(): found querier that is evicted");
|
||||
}
|
||||
querier_utils::set_reader(q, std::move(*read_ptr.get()));
|
||||
--stats.population;
|
||||
|
||||
const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice);
|
||||
@@ -354,7 +327,7 @@ std::optional<data_querier> querier_cache::lookup_data_querier(utils::UUID key,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
return lookup_querier<data_querier>(_entries, _data_querier_index, _stats, key, s, range, slice, std::move(trace_state));
|
||||
return lookup_querier<data_querier>(_data_querier_index, _stats, key, s, range, slice, std::move(trace_state));
|
||||
}
|
||||
|
||||
std::optional<mutation_querier> querier_cache::lookup_mutation_querier(utils::UUID key,
|
||||
@@ -362,7 +335,7 @@ std::optional<mutation_querier> querier_cache::lookup_mutation_querier(utils::UU
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
return lookup_querier<mutation_querier>(_entries, _mutation_querier_index, _stats, key, s, range, slice, std::move(trace_state));
|
||||
return lookup_querier<mutation_querier>(_mutation_querier_index, _stats, key, s, range, slice, std::move(trace_state));
|
||||
}
|
||||
|
||||
std::optional<shard_mutation_querier> querier_cache::lookup_shard_mutation_querier(utils::UUID key,
|
||||
@@ -370,41 +343,46 @@ std::optional<shard_mutation_querier> querier_cache::lookup_shard_mutation_queri
|
||||
const dht::partition_range_vector& ranges,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
return lookup_querier<shard_mutation_querier>(_entries, _shard_mutation_querier_index, _stats, key, s, ranges, slice,
|
||||
return lookup_querier<shard_mutation_querier>(_shard_mutation_querier_index, _stats, key, s, ranges, slice,
|
||||
std::move(trace_state));
|
||||
}
|
||||
|
||||
void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) {
|
||||
_entry_ttl = entry_ttl;
|
||||
_expiry_timer.rearm(lowres_clock::now() + _entry_ttl / 2, _entry_ttl / 2);
|
||||
}
|
||||
|
||||
bool querier_cache::evict_one() {
|
||||
if (_entries.empty()) {
|
||||
return false;
|
||||
}
|
||||
auto maybe_evict_from_index = [this] (index& idx) -> bool {
|
||||
if (idx.empty()) {
|
||||
return false;
|
||||
}
|
||||
auto it = idx.begin();
|
||||
it->second->permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(*it->second));
|
||||
idx.erase(it);
|
||||
++_stats.resource_based_evictions;
|
||||
--_stats.population;
|
||||
return true;
|
||||
};
|
||||
|
||||
++_stats.resource_based_evictions;
|
||||
--_stats.population;
|
||||
auto& sem = _entries.front().value().permit().semaphore();
|
||||
sem.unregister_inactive_read(std::move(_entries.front()).get_inactive_handle());
|
||||
_entries.pop_front();
|
||||
|
||||
return true;
|
||||
return maybe_evict_from_index(_data_querier_index) || maybe_evict_from_index(_mutation_querier_index) || maybe_evict_from_index(_shard_mutation_querier_index);
|
||||
}
|
||||
|
||||
void querier_cache::evict_all_for_table(const utils::UUID& schema_id) {
|
||||
auto it = _entries.begin();
|
||||
const auto end = _entries.end();
|
||||
while (it != end) {
|
||||
if (it->value().schema().id() == schema_id) {
|
||||
--_stats.population;
|
||||
it->value().permit().semaphore().unregister_inactive_read(std::move(*it).get_inactive_handle());
|
||||
it = _entries.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
auto evict_from_index = [this, schema_id] (index& idx) {
|
||||
for (auto it = idx.begin(); it != idx.end();) {
|
||||
if (it->second->schema().id() == schema_id) {
|
||||
it->second->permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(*it->second));
|
||||
it = idx.erase(it);
|
||||
--_stats.population;
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
evict_from_index(_data_querier_index);
|
||||
evict_from_index(_mutation_querier_index);
|
||||
evict_from_index(_shard_mutation_querier_index);
|
||||
}
|
||||
|
||||
querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, query::is_first_page is_first_page)
|
||||
|
||||
querier.hh
@@ -117,12 +117,14 @@ struct position_view {
|
||||
};
|
||||
|
||||
class querier_base {
|
||||
friend class querier_utils;
|
||||
|
||||
protected:
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
std::unique_ptr<const dht::partition_range> _range;
|
||||
std::unique_ptr<const query::partition_slice> _slice;
|
||||
flat_mutation_reader _reader;
|
||||
std::variant<flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
|
||||
dht::partition_ranges_view _query_ranges;
|
||||
|
||||
public:
|
||||
@@ -224,8 +226,8 @@ public:
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::max_result_size max_size) {
|
||||
return ::query::consume_page(_reader, _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time,
|
||||
timeout, max_size).then([this] (auto&& results) {
|
||||
return ::query::consume_page(std::get<flat_mutation_reader>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
|
||||
partition_limit, query_time, timeout, max_size).then([this] (auto&& results) {
|
||||
_last_ckey = std::get<std::optional<clustering_key>>(std::move(results));
|
||||
constexpr auto size = std::tuple_size<std::decay_t<decltype(results)>>::value;
|
||||
static_assert(size <= 2);
|
||||
@@ -304,7 +306,7 @@ public:
|
||||
}
|
||||
|
||||
flat_mutation_reader reader() && {
|
||||
return std::move(_reader);
|
||||
return std::move(std::get<flat_mutation_reader>(_reader));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -352,85 +354,21 @@ public:
|
||||
// The number of queriers evicted to free up resources to be able to
|
||||
// create new readers.
|
||||
uint64_t resource_based_evictions = 0;
|
||||
// The number of queriers evicted because the maximum memory usage
|
||||
// was reached.
|
||||
uint64_t memory_based_evictions = 0;
|
||||
// The number of queriers currently in the cache.
|
||||
uint64_t population = 0;
|
||||
};
|
||||
|
||||
class entry : public boost::intrusive::set_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
|
||||
// Self reference so that we can remove the entry given an `entry&`.
|
||||
std::list<entry>::iterator _pos;
|
||||
const utils::UUID _key;
|
||||
const lowres_clock::time_point _expires;
|
||||
std::unique_ptr<querier_base> _value;
|
||||
reader_concurrency_semaphore::inactive_read_handle _handle;
|
||||
|
||||
public:
|
||||
template <typename Querier>
|
||||
entry(utils::UUID key, Querier q, lowres_clock::time_point expires)
|
||||
: _key(key)
|
||||
, _expires(expires)
|
||||
, _value(std::make_unique<Querier>(std::move(q))) {
|
||||
}
|
||||
|
||||
std::list<entry>::iterator pos() const {
|
||||
return _pos;
|
||||
}
|
||||
|
||||
void set_pos(std::list<entry>::iterator pos) {
|
||||
_pos = pos;
|
||||
}
|
||||
|
||||
void set_inactive_handle(reader_concurrency_semaphore::inactive_read_handle handle) {
|
||||
_handle = std::move(handle);
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle get_inactive_handle() && {
|
||||
return std::move(_handle);
|
||||
}
|
||||
|
||||
const utils::UUID& key() const {
|
||||
return _key;
|
||||
}
|
||||
|
||||
bool is_expired(const lowres_clock::time_point& now) const {
|
||||
return _expires <= now;
|
||||
}
|
||||
|
||||
const querier_base& value() const {
|
||||
return *_value;
|
||||
}
|
||||
|
||||
querier_base& value() {
|
||||
return *_value;
|
||||
}
|
||||
};
|
||||
|
||||
struct key_of_entry {
|
||||
using type = utils::UUID;
|
||||
const type& operator()(const entry& e) { return e.key(); }
|
||||
};
|
||||
|
||||
using entries = std::list<entry>;
|
||||
using index = boost::intrusive::multiset<entry, boost::intrusive::key_of_value<key_of_entry>,
|
||||
boost::intrusive::constant_time_size<false>>;
|
||||
using index = std::unordered_multimap<utils::UUID, std::unique_ptr<querier_base>>;
|
||||
|
||||
private:
|
||||
entries _entries;
|
||||
index _data_querier_index;
|
||||
index _mutation_querier_index;
|
||||
index _shard_mutation_querier_index;
|
||||
timer<lowres_clock> _expiry_timer;
|
||||
std::chrono::seconds _entry_ttl;
|
||||
stats _stats;
|
||||
size_t _max_queriers_memory_usage;
|
||||
|
||||
void scan_cache_entries();
|
||||
|
||||
public:
|
||||
explicit querier_cache(size_t max_cache_size = 1'000'000, std::chrono::seconds entry_ttl = default_entry_ttl);
|
||||
explicit querier_cache(std::chrono::seconds entry_ttl = default_entry_ttl);
|
||||
|
||||
querier_cache(const querier_cache&) = delete;
|
||||
querier_cache& operator=(const querier_cache&) = delete;
|
||||
@@ -482,6 +420,9 @@ public:
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state);
|
||||
|
||||
/// Change the ttl of cache entries
|
||||
///
|
||||
/// Applies only to entries inserted after the change.
|
||||
void set_entry_ttl(std::chrono::seconds entry_ttl);
|
||||
|
||||
/// Evict a querier.
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include "utils/exceptions.hh"
|
||||
#include "schema.hh"
|
||||
#include "utils/human_readable.hh"
|
||||
#include "flat_mutation_reader.hh"
|
||||
|
||||
logger rcslog("reader_concurrency_semaphore");
|
||||
|
||||
@@ -338,6 +339,13 @@ void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept
|
||||
maybe_dump_reader_permit_diagnostics(_semaphore, *_semaphore._permit_list, "timed out");
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read::inactive_read(flat_mutation_reader reader)
|
||||
: reader(std::make_unique<flat_mutation_reader>(std::move(reader))) {
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read::~inactive_read() {
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
_resources += r;
|
||||
while (!_wait_list.empty() && has_available_units(_wait_list.front().res)) {
|
||||
@@ -372,24 +380,40 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
broken(std::make_exception_ptr(broken_semaphore{}));
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(std::unique_ptr<inactive_read> ir) {
|
||||
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader reader,
|
||||
eviction_notify_handler notify_handler) {
|
||||
return register_inactive_read(std::move(reader), std::chrono::duration_values<std::chrono::seconds>::max(), std::move(notify_handler));
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader reader,
|
||||
std::chrono::seconds ttl, eviction_notify_handler notify_handler) {
|
||||
// Implies _inactive_reads.empty(); we don't queue new readers before
|
||||
// evicting all inactive reads.
|
||||
if (_wait_list.empty()) {
|
||||
inactive_read ir(std::move(reader));
|
||||
ir.notify_handler = std::move(notify_handler);
|
||||
const auto [it, _] = _inactive_reads.emplace(_next_id++, std::move(ir));
|
||||
(void)_;
|
||||
if (ttl != std::chrono::duration_values<std::chrono::seconds>::max()) {
|
||||
it->second.ttl_timer.emplace([this, it = it] {
|
||||
evict(it, evict_reason::time);
|
||||
});
|
||||
it->second.ttl_timer->arm(lowres_clock::now() + ttl);
|
||||
}
|
||||
++_stats.inactive_reads;
|
||||
return inactive_read_handle(*this, it->first);
|
||||
}
|
||||
|
||||
// The evicted reader will release its permit, hopefully allowing us to
|
||||
// admit some readers from the _wait_list.
|
||||
ir->evict();
|
||||
if (notify_handler) {
|
||||
notify_handler(evict_reason::permit);
|
||||
}
|
||||
++_stats.permit_based_evictions;
|
||||
return inactive_read_handle();
|
||||
}
|
||||
|
||||
std::unique_ptr<reader_concurrency_semaphore::inactive_read> reader_concurrency_semaphore::unregister_inactive_read(inactive_read_handle irh) {
|
||||
std::unique_ptr<flat_mutation_reader> reader_concurrency_semaphore::unregister_inactive_read(inactive_read_handle irh) {
|
||||
if (irh && irh._sem != this) {
|
||||
throw std::runtime_error(fmt::format(
|
||||
"reader_concurrency_semaphore::unregister_inactive_read(): "
|
||||
@@ -405,7 +429,7 @@ std::unique_ptr<reader_concurrency_semaphore::inactive_read> reader_concurrency_
|
||||
auto ir = std::move(it->second);
|
||||
_inactive_reads.erase(it);
|
||||
--_stats.inactive_reads;
|
||||
return ir;
|
||||
return std::move(ir.reader);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
@@ -414,16 +438,29 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read() {
|
||||
if (_inactive_reads.empty()) {
|
||||
return false;
|
||||
}
|
||||
auto it = _inactive_reads.begin();
|
||||
it->second->evict();
|
||||
_inactive_reads.erase(it);
|
||||
|
||||
++_stats.permit_based_evictions;
|
||||
--_stats.inactive_reads;
|
||||
|
||||
evict(_inactive_reads.begin(), evict_reason::manual);
|
||||
return true;
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_reads_type::iterator reader_concurrency_semaphore::evict(inactive_reads_type::iterator it, evict_reason reason) {
|
||||
auto ir = std::move(it->second);
|
||||
if (ir.notify_handler) {
|
||||
ir.notify_handler(reason);
|
||||
}
|
||||
switch (reason) {
|
||||
case evict_reason::permit:
|
||||
++_stats.permit_based_evictions;
|
||||
break;
|
||||
case evict_reason::time:
|
||||
++_stats.time_based_evictions;
|
||||
break;
|
||||
case evict_reason::manual:
|
||||
break;
|
||||
}
|
||||
--_stats.inactive_reads;
|
||||
return _inactive_reads.erase(it);
|
||||
}
|
||||
|
||||
bool reader_concurrency_semaphore::has_available_units(const resources& r) const {
|
||||
return bool(_resources) && _resources >= r;
|
||||
}
|
||||
@@ -449,12 +486,7 @@ future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admi
|
||||
auto r = resources(1, static_cast<ssize_t>(memory));
|
||||
auto it = _inactive_reads.begin();
|
||||
while (!may_proceed(r) && it != _inactive_reads.end()) {
|
||||
auto ir = std::move(it->second);
|
||||
it = _inactive_reads.erase(it);
|
||||
ir->evict();
|
||||
|
||||
++_stats.permit_based_evictions;
|
||||
--_stats.inactive_reads;
|
||||
it = evict(it, evict_reason::permit);
|
||||
}
|
||||
if (may_proceed(r)) {
|
||||
permit.on_admission();
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
class flat_mutation_reader;
|
||||
|
||||
/// Specific semaphore for controlling reader concurrency
|
||||
///
|
||||
/// Use `make_permit()` to create a permit to track the resource consumption
|
||||
@@ -53,12 +55,14 @@ public:
|
||||
|
||||
friend class reader_permit;
|
||||
|
||||
class inactive_read {
|
||||
public:
|
||||
virtual void evict() = 0;
|
||||
virtual ~inactive_read() = default;
|
||||
enum class evict_reason {
|
||||
permit, // evicted due to permit shortage
|
||||
time, // evicted due to expiring ttl
|
||||
manual, // evicted manually via `try_evict_one_inactive_read()`
|
||||
};
|
||||
|
||||
using eviction_notify_handler = noncopyable_function<void(evict_reason)>;
|
||||
|
||||
class inactive_read_handle {
|
||||
reader_concurrency_semaphore* _sem = nullptr;
|
||||
uint64_t _id = 0;
|
||||
@@ -85,6 +89,8 @@ public:
|
||||
struct stats {
|
||||
// The number of inactive reads evicted to free up permits.
|
||||
uint64_t permit_based_evictions = 0;
|
||||
// The number of inactive reads evicted due to expiring.
|
||||
uint64_t time_based_evictions = 0;
|
||||
// The number of inactive reads currently registered.
|
||||
uint64_t inactive_reads = 0;
|
||||
// Total number of successful reads executed through this semaphore.
|
||||
@@ -114,6 +120,18 @@ private:
|
||||
void operator()(entry& e) noexcept;
|
||||
};
|
||||
|
||||
struct inactive_read {
|
||||
std::unique_ptr<flat_mutation_reader> reader;
|
||||
eviction_notify_handler notify_handler;
|
||||
std::optional<timer<lowres_clock>> ttl_timer;
|
||||
|
||||
explicit inactive_read(flat_mutation_reader);
|
||||
inactive_read(inactive_read&&) = default;
|
||||
~inactive_read();
|
||||
};
|
||||
|
||||
using inactive_reads_type = std::map<uint64_t, inactive_read>;
|
||||
|
||||
private:
|
||||
const resources _initial_resources;
|
||||
resources _resources;
|
||||
@@ -124,11 +142,13 @@ private:
|
||||
size_t _max_queue_length = std::numeric_limits<size_t>::max();
|
||||
std::function<void()> _prethrow_action;
|
||||
uint64_t _next_id = 1;
|
||||
std::map<uint64_t, std::unique_ptr<inactive_read>> _inactive_reads;
|
||||
inactive_reads_type _inactive_reads;
|
||||
stats _stats;
|
||||
std::unique_ptr<permit_list> _permit_list;
|
||||
|
||||
private:
|
||||
inactive_reads_type::iterator evict(inactive_reads_type::iterator it, evict_reason reason);
|
||||
|
||||
bool has_available_units(const resources& r) const;
|
||||
|
||||
bool may_proceed(const resources& r) const;
|
||||
@@ -175,13 +195,14 @@ public:
|
||||
/// interface.
|
||||
/// The semaphore takes ownership of the created object and destroys it if
|
||||
/// it is evicted.
|
||||
inactive_read_handle register_inactive_read(std::unique_ptr<inactive_read> ir);
|
||||
inactive_read_handle register_inactive_read(flat_mutation_reader ir, eviction_notify_handler handler = {});
|
||||
inactive_read_handle register_inactive_read(flat_mutation_reader ir, std::chrono::seconds ttl, eviction_notify_handler handler = {});
|
||||
|
||||
/// Unregister the previously registered inactive read.
|
||||
///
|
||||
/// If the read was not evicted, the inactive read object, passed in to the
|
||||
/// register call, will be returned. Otherwise a nullptr is returned.
|
||||
std::unique_ptr<inactive_read> unregister_inactive_read(inactive_read_handle irh);
|
||||
std::unique_ptr<flat_mutation_reader> unregister_inactive_read(inactive_read_handle irh);
|
||||
|
||||
/// Try to evict an inactive read.
|
||||
///
|
||||
|
||||
@@ -262,7 +262,6 @@ SEASTAR_THREAD_TEST_CASE(test_read_all) {
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::misses), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::time_based_evictions), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::memory_based_evictions), 0u);
|
||||
|
||||
require_eventually_empty_caches(env.db());
|
||||
|
||||
@@ -308,7 +307,6 @@ SEASTAR_THREAD_TEST_CASE(test_evict_a_shard_reader_on_each_page) {
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::drops), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::time_based_evictions), 0u);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::resource_based_evictions), npages);
|
||||
tests::require_equal(aggregate_querier_cache_stat(env.db(), &query::querier_cache::stats::memory_based_evictions), 0u);
|
||||
|
||||
require_eventually_empty_caches(env.db());
|
||||
|
||||
|
||||
@@ -154,15 +154,14 @@ public:
|
||||
dht::partition_range original_range;
|
||||
query::partition_slice original_slice;
|
||||
uint64_t row_limit;
|
||||
size_t memory_usage;
|
||||
|
||||
dht::partition_range expected_range;
|
||||
query::partition_slice expected_slice;
|
||||
};
|
||||
|
||||
test_querier_cache(const noncopyable_function<sstring(size_t)>& external_make_value, std::chrono::seconds entry_ttl = 24h, size_t cache_size = 100000)
|
||||
test_querier_cache(const noncopyable_function<sstring(size_t)>& external_make_value, std::chrono::seconds entry_ttl = 24h)
|
||||
: _sem(reader_concurrency_semaphore::no_limits{})
|
||||
, _cache(cache_size, entry_ttl)
|
||||
, _cache(entry_ttl)
|
||||
, _mutations(make_mutations(_s, external_make_value))
|
||||
, _mutation_source([this] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||
auto rd = flat_mutation_reader_from_mutations(std::move(permit), _mutations, range);
|
||||
@@ -218,7 +217,6 @@ public:
|
||||
gc_clock::now(), db::no_timeout, query::max_result_size(std::numeric_limits<uint64_t>::max())).get0();
|
||||
auto&& dk = dk_ck.first;
|
||||
auto&& ck = dk_ck.second;
|
||||
const auto memory_usage = querier.memory_usage();
|
||||
_cache.insert(cache_key, std::move(querier), nullptr);
|
||||
|
||||
// Either no keys at all (nothing read) or at least partition key.
|
||||
@@ -254,7 +252,7 @@ public:
|
||||
return expected_slice;
|
||||
}();
|
||||
|
||||
return {key, std::move(range), std::move(slice), row_limit, memory_usage, std::move(expected_range), std::move(expected_slice)};
|
||||
return {key, std::move(range), std::move(slice), row_limit, std::move(expected_range), std::move(expected_slice)};
|
||||
}
|
||||
|
||||
entry_info produce_first_page_and_save_data_querier(unsigned key, const dht::partition_range& range,
|
||||
@@ -353,28 +351,18 @@ public:
|
||||
test_querier_cache& no_evictions() {
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, _expected_stats.time_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, _expected_stats.memory_based_evictions);
|
||||
return *this;
|
||||
}
|
||||
|
||||
test_querier_cache& time_based_evictions() {
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, ++_expected_stats.time_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, _expected_stats.memory_based_evictions);
|
||||
return *this;
|
||||
}
|
||||
|
||||
test_querier_cache& resource_based_evictions() {
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, _expected_stats.time_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, ++_expected_stats.resource_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, _expected_stats.memory_based_evictions);
|
||||
return *this;
|
||||
}
|
||||
|
||||
test_querier_cache& memory_based_evictions() {
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().time_based_evictions, _expected_stats.time_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().resource_based_evictions, _expected_stats.resource_based_evictions);
|
||||
BOOST_REQUIRE_EQUAL(_cache.get_stats().memory_based_evictions, ++_expected_stats.memory_based_evictions);
|
||||
return *this;
|
||||
}
|
||||
};
|
||||
@@ -589,38 +577,6 @@ sstring make_string_blob(size_t size) {
|
||||
return s;
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_memory_based_cache_eviction) {
|
||||
auto cache_size = memory::stats().total_memory() * 0.04;
|
||||
test_querier_cache t([] (size_t) {
|
||||
const size_t blob_size = 1 << 10; // 1K
|
||||
return make_string_blob(blob_size);
|
||||
}, 24h, cache_size);
|
||||
|
||||
size_t i = 0;
|
||||
const auto entry = t.produce_first_page_and_save_data_querier(i++);
|
||||
|
||||
const size_t queriers_needed_to_fill_cache = floor(cache_size / entry.memory_usage);
|
||||
|
||||
// Fill the cache but don't overflow.
|
||||
for (; i < queriers_needed_to_fill_cache; ++i) {
|
||||
t.produce_first_page_and_save_data_querier(i);
|
||||
}
|
||||
|
||||
const auto pop_before = t.get_semaphore().get_stats().inactive_reads;
|
||||
|
||||
// Should overflow the limit and trigger the eviction of the oldest entry.
|
||||
t.produce_first_page_and_save_data_querier(queriers_needed_to_fill_cache);
|
||||
|
||||
t.assert_cache_lookup_data_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice)
|
||||
.misses()
|
||||
.no_drops()
|
||||
.memory_based_evictions();
|
||||
|
||||
// Since the last insert should have evicted an existing entry, we should
|
||||
// have the same number of registered inactive reads.
|
||||
BOOST_REQUIRE_EQUAL(t.get_semaphore().get_stats().inactive_reads, pop_before);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
@@ -775,22 +731,17 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
fut.get();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class inactive_read : public reader_concurrency_semaphore::inactive_read {
|
||||
public:
|
||||
virtual void evict() override {
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
|
||||
reader_concurrency_semaphore sem1(reader_concurrency_semaphore::no_limits{}, "sem1");
|
||||
reader_concurrency_semaphore sem2(reader_concurrency_semaphore::no_limits{}, ""); // to see the message for an unnamed semaphore
|
||||
|
||||
auto sem1_h1 = sem1.register_inactive_read(std::make_unique<inactive_read>());
|
||||
auto sem2_h1 = sem2.register_inactive_read(std::make_unique<inactive_read>());
|
||||
auto schema = schema_builder("ks", "cf")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("v", int32_type)
|
||||
.build();
|
||||
|
||||
auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader(schema, sem1.make_permit(schema.get(), get_name())));
|
||||
auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader(schema, sem2.make_permit(schema.get(), get_name())));
|
||||
|
||||
// Sanity check that lookup still works with empty handle.
|
||||
BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{}));
|
||||
|
||||