querier_cache: move insert/lookup related logic into free functions
In preparations for introducing support multiple entry types in the querier_cache move all insert/lookup related logic into free functions. Later these functions will be templated so they can handle multiple entry types with the same code.
This commit is contained in:
72
querier.cc
72
querier.cc
@@ -160,21 +160,22 @@ void querier_cache::scan_cache_entries() {
|
||||
}
|
||||
}
|
||||
|
||||
querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state) {
|
||||
const auto queriers = _index.equal_range(key);
|
||||
static querier_cache::entries::iterator find_querier(querier_cache::entries& entries, querier_cache::index& index, utils::UUID key,
|
||||
const dht::partition_range& range, tracing::trace_state_ptr trace_state) {
|
||||
const auto queriers = index.equal_range(key);
|
||||
|
||||
if (queriers.first == _index.end()) {
|
||||
if (queriers.first == index.end()) {
|
||||
tracing::trace(trace_state, "Found no cached querier for key {}", key);
|
||||
return _entries.end();
|
||||
return entries.end();
|
||||
}
|
||||
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const entry& e) {
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const querier_cache::entry& e) {
|
||||
return e.value().matches(range);
|
||||
});
|
||||
|
||||
if (it == queriers.second) {
|
||||
tracing::trace(trace_state, "Found cached querier(s) for key {} but none matches the query range {}", key, range);
|
||||
return _entries.end();
|
||||
return entries.end();
|
||||
}
|
||||
tracing::trace(trace_state, "Found cached querier for key {} and range {}", key, range);
|
||||
return it->pos();
|
||||
@@ -187,7 +188,8 @@ querier_cache::querier_cache(size_t max_cache_size, std::chrono::seconds entry_t
|
||||
_expiry_timer.arm_periodic(entry_ttl / 2);
|
||||
}
|
||||
|
||||
void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) {
|
||||
static void insert_querier(querier_cache::entries& entries, querier_cache::index& index, querier_cache::stats& stats,
|
||||
size_t max_queriers_memory_usage, utils::UUID key, querier&& q, lowres_clock::time_point expires, tracing::trace_state_ptr trace_state) {
|
||||
// FIXME: see #3159
|
||||
// In reverse mode flat_mutation_reader drops any remaining rows of the
|
||||
// current partition when the page ends so it cannot be reused across
|
||||
@@ -198,7 +200,7 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt
|
||||
|
||||
tracing::trace(trace_state, "Caching querier with key {}", key);
|
||||
|
||||
auto memory_usage = boost::accumulate(_entries | boost::adaptors::transformed(std::mem_fn(&entry::memory_usage)), size_t(0));
|
||||
auto memory_usage = boost::accumulate(entries | boost::adaptors::transformed(std::mem_fn(&querier_cache::entry::memory_usage)), size_t(0));
|
||||
|
||||
// We add the memory-usage of the to-be added querier to the memory-usage
|
||||
// of all the cached queriers. We now need to makes sure this number is
|
||||
@@ -207,39 +209,46 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt
|
||||
// it goes below the limit.
|
||||
memory_usage += q.memory_usage();
|
||||
|
||||
if (memory_usage >= _max_queriers_memory_usage) {
|
||||
auto it = _entries.begin();
|
||||
const auto end = _entries.end();
|
||||
while (it != end && memory_usage >= _max_queriers_memory_usage) {
|
||||
++_stats.memory_based_evictions;
|
||||
if (memory_usage >= max_queriers_memory_usage) {
|
||||
auto it = entries.begin();
|
||||
const auto end = entries.end();
|
||||
while (it != end && memory_usage >= max_queriers_memory_usage) {
|
||||
++stats.memory_based_evictions;
|
||||
memory_usage -= it->memory_usage();
|
||||
--_stats.population;
|
||||
it = _entries.erase(it);
|
||||
--stats.population;
|
||||
it = entries.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
auto& e = _entries.emplace_back(key, std::move(q), lowres_clock::now() + _entry_ttl);
|
||||
e.set_pos(--_entries.end());
|
||||
_index.insert(e);
|
||||
++_stats.population;
|
||||
auto& e = entries.emplace_back(key, std::move(q), expires);
|
||||
e.set_pos(--entries.end());
|
||||
index.insert(e);
|
||||
++stats.population;
|
||||
}
|
||||
|
||||
std::optional<querier> querier_cache::lookup(utils::UUID key,
|
||||
void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) {
|
||||
insert_querier(_entries, _index, _stats, _max_queriers_memory_usage, key, std::move(q), lowres_clock::now() + _entry_ttl, std::move(trace_state));
|
||||
}
|
||||
|
||||
static std::optional<querier> lookup_querier(querier_cache::entries& entries,
|
||||
querier_cache::index& index,
|
||||
querier_cache::stats& stats,
|
||||
utils::UUID key,
|
||||
emit_only_live_rows only_live,
|
||||
const schema& s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto it = find_querier(key, range, trace_state);
|
||||
++_stats.lookups;
|
||||
if (it == _entries.end()) {
|
||||
++_stats.misses;
|
||||
auto it = find_querier(entries, index, key, range, trace_state);
|
||||
++stats.lookups;
|
||||
if (it == entries.end()) {
|
||||
++stats.misses;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto q = std::move(*it).value();
|
||||
_entries.erase(it);
|
||||
--_stats.population;
|
||||
entries.erase(it);
|
||||
--stats.population;
|
||||
|
||||
const auto can_be_used = q.can_be_used_for_page(only_live, s, range, slice);
|
||||
if (can_be_used == querier::can_use::yes) {
|
||||
@@ -248,10 +257,19 @@ std::optional<querier> querier_cache::lookup(utils::UUID key,
|
||||
}
|
||||
|
||||
tracing::trace(trace_state, "Dropping querier because {}", cannot_use_reason(can_be_used));
|
||||
++_stats.drops;
|
||||
++stats.drops;
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<querier> querier_cache::lookup(utils::UUID key,
|
||||
emit_only_live_rows only_live,
|
||||
const schema& s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
return lookup_querier(_entries, _index, _stats, key, only_live, s, range, slice, std::move(trace_state));
|
||||
}
|
||||
|
||||
void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) {
|
||||
_entry_ttl = entry_ttl;
|
||||
_expiry_timer.rearm(lowres_clock::now() + _entry_ttl / 2, _entry_ttl / 2);
|
||||
|
||||
@@ -262,7 +262,6 @@ public:
|
||||
uint64_t population = 0;
|
||||
};
|
||||
|
||||
private:
|
||||
class entry : public boost::intrusive::set_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
|
||||
// Self reference so that we can remove the entry given an `entry&`.
|
||||
std::list<entry>::iterator _pos;
|
||||
@@ -327,8 +326,6 @@ private:
|
||||
stats _stats;
|
||||
size_t _max_queriers_memory_usage;
|
||||
|
||||
entries::iterator find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state);
|
||||
|
||||
void scan_cache_entries();
|
||||
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user