diff --git a/querier.cc b/querier.cc index 7d1c006ea9..fec2ee3cf3 100644 --- a/querier.cc +++ b/querier.cc @@ -160,21 +160,22 @@ void querier_cache::scan_cache_entries() { } } -querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state) { - const auto queriers = _index.equal_range(key); +static querier_cache::entries::iterator find_querier(querier_cache::entries& entries, querier_cache::index& index, utils::UUID key, + const dht::partition_range& range, tracing::trace_state_ptr trace_state) { + const auto queriers = index.equal_range(key); - if (queriers.first == _index.end()) { + if (queriers.first == index.end()) { tracing::trace(trace_state, "Found no cached querier for key {}", key); - return _entries.end(); + return entries.end(); } - const auto it = std::find_if(queriers.first, queriers.second, [&] (const entry& e) { + const auto it = std::find_if(queriers.first, queriers.second, [&] (const querier_cache::entry& e) { return e.value().matches(range); }); if (it == queriers.second) { tracing::trace(trace_state, "Found cached querier(s) for key {} but none matches the query range {}", key, range); - return _entries.end(); + return entries.end(); } tracing::trace(trace_state, "Found cached querier for key {} and range {}", key, range); return it->pos(); @@ -187,7 +188,8 @@ querier_cache::querier_cache(size_t max_cache_size, std::chrono::seconds entry_t _expiry_timer.arm_periodic(entry_ttl / 2); } -void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) { +static void insert_querier(querier_cache::entries& entries, querier_cache::index& index, querier_cache::stats& stats, + size_t max_queriers_memory_usage, utils::UUID key, querier&& q, lowres_clock::time_point expires, tracing::trace_state_ptr trace_state) { // FIXME: see #3159 // In reverse mode flat_mutation_reader drops any remaining rows of the // current partition when the page ends so it cannot be reused across @@ -198,7 +200,7 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt tracing::trace(trace_state, "Caching querier with key {}", key); - auto memory_usage = boost::accumulate(_entries | boost::adaptors::transformed(std::mem_fn(&entry::memory_usage)), size_t(0)); + auto memory_usage = boost::accumulate(entries | boost::adaptors::transformed(std::mem_fn(&querier_cache::entry::memory_usage)), size_t(0)); // We add the memory-usage of the to-be added querier to the memory-usage // of all the cached queriers. We now need to makes sure this number is @@ -207,39 +209,46 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt // it goes below the limit. memory_usage += q.memory_usage(); - if (memory_usage >= _max_queriers_memory_usage) { - auto it = _entries.begin(); - const auto end = _entries.end(); - while (it != end && memory_usage >= _max_queriers_memory_usage) { - ++_stats.memory_based_evictions; + if (memory_usage >= max_queriers_memory_usage) { + auto it = entries.begin(); + const auto end = entries.end(); + while (it != end && memory_usage >= max_queriers_memory_usage) { + ++stats.memory_based_evictions; memory_usage -= it->memory_usage(); - --_stats.population; - it = _entries.erase(it); + --stats.population; + it = entries.erase(it); } } - auto& e = _entries.emplace_back(key, std::move(q), lowres_clock::now() + _entry_ttl); - e.set_pos(--_entries.end()); - _index.insert(e); - ++_stats.population; + auto& e = entries.emplace_back(key, std::move(q), expires); + e.set_pos(--entries.end()); + index.insert(e); + ++stats.population; } -std::optional querier_cache::lookup(utils::UUID key, +void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) { + insert_querier(_entries, _index, _stats, _max_queriers_memory_usage, key, std::move(q), lowres_clock::now() + _entry_ttl, std::move(trace_state)); +} + +static std::optional lookup_querier(querier_cache::entries& entries, + querier_cache::index& index, + querier_cache::stats& stats, + utils::UUID key, emit_only_live_rows only_live, const schema& s, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state) { - auto it = find_querier(key, range, trace_state); - ++_stats.lookups; - if (it == _entries.end()) { - ++_stats.misses; + auto it = find_querier(entries, index, key, range, trace_state); + ++stats.lookups; + if (it == entries.end()) { + ++stats.misses; return std::nullopt; } auto q = std::move(*it).value(); - _entries.erase(it); - --_stats.population; + entries.erase(it); + --stats.population; const auto can_be_used = q.can_be_used_for_page(only_live, s, range, slice); if (can_be_used == querier::can_use::yes) { @@ -248,10 +257,19 @@ std::optional querier_cache::lookup(utils::UUID key, } tracing::trace(trace_state, "Dropping querier because {}", cannot_use_reason(can_be_used)); - ++_stats.drops; + ++stats.drops; return std::nullopt; } +std::optional querier_cache::lookup(utils::UUID key, + emit_only_live_rows only_live, + const schema& s, + const dht::partition_range& range, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_state) { + return lookup_querier(_entries, _index, _stats, key, only_live, s, range, slice, std::move(trace_state)); +} + void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) { _entry_ttl = entry_ttl; _expiry_timer.rearm(lowres_clock::now() + _entry_ttl / 2, _entry_ttl / 2); diff --git a/querier.hh b/querier.hh index f786e6bd4c..01ab85585c 100644 --- a/querier.hh +++ b/querier.hh @@ -262,7 +262,6 @@ public: uint64_t population = 0; }; -private: class entry : public boost::intrusive::set_base_hook> { // Self reference so that we can remove the entry given an `entry&`. std::list::iterator _pos; @@ -327,8 +326,6 @@ private: stats _stats; size_t _max_queriers_memory_usage; - entries::iterator find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state); - void scan_cache_entries(); public: