mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 12:06:44 +00:00
Readers serving user reads need to obtain a permit to start reading. There is a restriction on how many active readers can be admitted, based on their count and their memory consumption. Since the saved readers of cached queriers are technically active (they hold a permit), they can block new readers from obtaining a permit. New readers have a higher priority, because a cached reader might be abandoned or, at best, used later; so in the face of memory pressure we evict cached readers to free up permits for new readers. Cached queriers are evicted in LRU order, as the oldest queriers are the most likely to be evicted based on their TTL anyway.
267 lines
9.7 KiB
C++
267 lines
9.7 KiB
C++
/*
|
|
* Copyright (C) 2018 ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* This file is part of Scylla.
|
|
*
|
|
* Scylla is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Scylla is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "querier.hh"
|
|
|
|
#include "schema.hh"
|
|
|
|
// Translates a querier::can_use verdict into a short human-readable phrase.
// It is used in the "Dropping querier because {}" trace message emitted by
// querier_cache::lookup(). Any value not covered by the switch yields
// "unknown reason".
static sstring cannot_use_reason(querier::can_use cu)
{
    using verdict = querier::can_use;

    const char* reason = "unknown reason";
    switch (cu) {
    case verdict::yes:
        reason = "can be used";
        break;
    case verdict::no_emit_only_live_rows_mismatch:
        reason = "emit only live rows mismatch";
        break;
    case verdict::no_schema_version_mismatch:
        reason = "schema version mismatch";
        break;
    case verdict::no_ring_pos_mismatch:
        reason = "ring pos mismatch";
        break;
    case verdict::no_clustering_pos_mismatch:
        reason = "clustering pos mismatch";
        break;
    }
    return reason;
}
|
|
|
|
// Reports where the underlying reader currently stands.
//
// Returns:
// - the partition key the active compaction state reports as its current
//   partition. A null partition key is treated by can_be_used_for_page() as
//   "nothing read so far".
// - a pointer to the last clustering key stored in *_last_ckey, or null when
//   that optional-like holder is empty.
// Both pointers refer into state owned by this querier.
querier::position querier::current_position() const {
    // _compaction_state is a variant; every alternative exposes
    // current_partition() through a pointer-like handle.
    const dht::decorated_key* const partition = std::visit([] (const auto& state) {
        return state->current_partition();
    }, _compaction_state);

    const clustering_key_prefix* last_row_key = nullptr;
    if (*_last_ckey) {
        last_row_key = &**_last_ckey;
    }

    return {partition, last_row_key};
}
|
|
|
|
// Checks that `range`, the partition range requested for the next page,
// resumes exactly at the partition this querier stopped in.
//
// `pos.partition_key` is dereferenced and must be non-null. The caller,
// can_be_used_for_page(), checks this before calling.
//
// The bound that must match is the range's end for reversed, non-singular
// ranges, and its start otherwise. It must compare equal to the stopping
// partition and have the expected inclusiveness (see below).
bool querier::ring_position_matches(const dht::partition_range& range, const querier::position& pos) const {
    const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(_slice->options.contains(query::partition_slice::option::reversed));
    const auto comparator = dht::ring_position_comparator(*_schema);
    const auto stopped_at = dht::ring_position_view(*pos.partition_key);

    // If there are no clustering columns, or the select is distinct, there are
    // no clustering rows at all. In that case nothing more can remain in the
    // last page's partition, so the resume bound is exclusive. Otherwise
    // clustering rows might still remain (when we stopped on a clustering
    // key), so the bound is inclusive.
    const bool may_have_rows = _schema->clustering_key_size() > 0
            && !_slice->options.contains<query::partition_slice::option::distinct>();
    const bool expected_inclusiveness = may_have_rows && pos.clustering_key;

    const auto& resume_bound = (is_reversed && !range.is_singular()) ? range.end() : range.start();
    if (!resume_bound) {
        return false;
    }
    return comparator(resume_bound->value(), stopped_at) == 0
            && resume_bound->is_inclusive() == expected_inclusiveness;
}
|
|
|
|
// Checks that the clustering row ranges `slice` selects for the partition in
// `pos` resume right after the last clustering key this querier emitted.
//
// `pos.partition_key` is dereferenced and must be non-null.
//
// Returns true when:
// - the partition has no row ranges left (the query is effectively over), or
// - we stopped outside any clustering row and the slice uses its default row
//   ranges for this partition, or
// - the first row range's resume bound (end when reversed, start otherwise)
//   is present, exclusive, and equal to `*pos.clustering_key`.
bool querier::clustering_position_matches(const query::partition_slice& slice, const querier::position& pos) const {
    const auto& row_ranges = slice.row_ranges(*_schema, pos.partition_key->key());

    if (row_ranges.empty()) {
        // A valid slice for the last page of a query with clustering
        // restrictions: no further results are expected, so any clustering
        // position is acceptable.
        return true;
    }

    if (!pos.clustering_key) {
        // We stopped at a non-clustering position, so this partition must be
        // using the slice's default row ranges (identity check, not equality).
        return &row_ranges == &slice.default_row_ranges();
    }

    const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(_slice->options.contains(query::partition_slice::option::reversed));

    // When the page ended mid-partition, the first row range has to pick up
    // from the last clustering key, excluding it.
    const auto& first_row_range = row_ranges.front();
    const auto& resume_bound = is_reversed ? first_row_range.end() : first_row_range.start();
    if (!resume_bound || resume_bound->is_inclusive()) {
        return false;
    }

    clustering_key_prefix::equality same_key(*_schema);
    return same_key(resume_bound->value(), *pos.clustering_key);
}
|
|
|
|
// Decides whether this querier was created for (a page of) the same partition
// range as `range`.
//
// Both ranges must agree on singularity. Then:
// - singular ranges must have equal start bounds;
// - non-singular ranges must have equal start bounds OR equal end bounds.
// Two bounds are equal when both are absent, or both are present and
// bound::equal() holds under the schema's ring-position comparator.
bool querier::matches(const dht::partition_range& range) const {
    const auto& own_range = *_range;
    if (own_range.is_singular() != range.is_singular()) {
        return false;
    }

    const auto cmp = dht::ring_position_comparator(*_schema);
    const auto bound_eq = [&] (const stdx::optional<dht::partition_range::bound>& a,
                               const stdx::optional<dht::partition_range::bound>& b) -> bool {
        if (!a || !b) {
            // Equal only if both are missing.
            return !a && !b;
        }
        return a->equal(*b, cmp);
    };

    if (bound_eq(own_range.start(), range.start())) {
        return true;
    }
    return !own_range.is_singular() && bound_eq(own_range.end(), range.end());
}
|
|
|
|
// Determines whether this cached querier can continue serving the page
// described by (`only_live`, `s`, `range`, `slice`).
//
// Checks run in this order; the first failure decides the result:
// 1. `only_live` must agree with the kind of compaction state held. The
//    data-query state counts as "emit only live rows".
// 2. `s` must have the same schema version this querier was built with.
// 3. If nothing has been read yet (no current partition), the querier is
//    accepted without positional checks.
// 4. The ring position, then the clustering position, must match where the
//    querier stopped.
querier::can_use querier::can_be_used_for_page(emit_only_live_rows only_live, const schema& s,
        const dht::partition_range& range, const query::partition_slice& slice) const {
    const bool holds_data_query_state =
            std::holds_alternative<lw_shared_ptr<compact_for_data_query_state>>(_compaction_state);
    if (only_live != emit_only_live_rows(holds_data_query_state)) {
        return can_use::no_emit_only_live_rows_mismatch;
    }

    if (s.version() != _schema->version()) {
        return can_use::no_schema_version_mismatch;
    }

    const auto pos = current_position();
    if (!pos.partition_key) {
        // Nothing has been read so far, so there is no position to verify.
        return can_use::yes;
    }

    if (!ring_position_matches(range, pos)) {
        return can_use::no_ring_pos_mismatch;
    }

    return clustering_position_matches(slice, pos) ? can_use::yes : can_use::no_clustering_pos_mismatch;
}
|
|
|
|
// The time-to-live of a cache-entry.
|
|
const std::chrono::seconds querier_cache::default_entry_ttl{10};
|
|
|
|
void querier_cache::scan_cache_entries() {
|
|
const auto now = lowres_clock::now();
|
|
|
|
auto it = _meta_entries.begin();
|
|
const auto end = _meta_entries.end();
|
|
while (it != end && it->is_expired(now)) {
|
|
it = _meta_entries.erase(it);
|
|
}
|
|
}
|
|
|
|
// Finds the cached querier stored under `key` whose original partition range
// matches `range` (see querier::matches()).
//
// Returns an iterator to the matching entry, or _entries.end() on any miss:
// either there is no entry for `key`, or none of its entries matches
// `range`. Callers such as lookup() treat _entries.end() as the only "not
// found" sentinel, so every miss path must return it.
querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state) {
    const auto queriers = _entries.equal_range(key);

    // An empty equal_range means there is no entry for `key`. Comparing the two
    // ends of the range is correct for both ordered and unordered containers.
    if (queriers.first == queriers.second) {
        tracing::trace(trace_state, "Found no cached querier for key {}", key);
        return _entries.end();
    }

    const auto it = std::find_if(queriers.first, queriers.second, [&] (const std::pair<const utils::UUID, entry>& elem) {
        return elem.second.get().matches(range);
    });

    if (it == queriers.second) {
        tracing::trace(trace_state, "Found cached querier(s) for key {} but none matches the query range {}", key, range);
        // queriers.second is only one-past the entries for `key`. It can be a
        // live entry for a different key, so it must not be handed back as a hit.
        return _entries.end();
    }

    tracing::trace(trace_state, "Found cached querier for key {} and range {}", key, range);
    return it;
}
|
|
|
|
// Creates a cache whose entries live for `entry_ttl`.
// The expiry timer fires periodically every `entry_ttl / 2` and runs
// scan_cache_entries() to drop expired entries.
querier_cache::querier_cache(std::chrono::seconds entry_ttl)
    : _expiry_timer([this] { this->scan_cache_entries(); })
    , _entry_ttl(entry_ttl) {
    const auto scan_period = entry_ttl / 2;
    _expiry_timer.arm_periodic(scan_period);
}
|
|
|
|
// Stores `q` in the cache under `key` with the current entry TTL. It also
// appends a matching meta entry, which is used for expiry and eviction order.
//
// Reversed queriers are never cached; in that case `q` is left untouched.
// NOTE(review): `.first` on emplace()'s result implies a unique-key map. If
// `key` were already present, `q` would not be stored, yet a second meta
// entry for the existing element would still be appended. Confirm this
// cannot happen.
void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_ptr trace_state) {
    // FIXME: see #3159
    // In reverse mode flat_mutation_reader drops any remaining rows of the
    // current partition when the page ends so it cannot be reused across
    // pages.
    if (!q.is_reversed()) {
        tracing::trace(trace_state, "Caching querier with key {}", key);
        const auto emplaced = _entries.emplace(key, entry::param{std::move(q), _entry_ttl});
        _meta_entries.emplace_back(_entries, emplaced.first);
    }
}
|
|
|
|
// Returns a querier for the page described by the arguments.
//
// If a cached querier for `key` matches `range`, it is removed from the cache.
// It is returned when can_be_used_for_page() accepts it. Otherwise it is
// dropped (the reason is traced) and `create_fun` builds a fresh querier.
// A cache miss also falls back to `create_fun`.
querier querier_cache::lookup(utils::UUID key,
        emit_only_live_rows only_live,
        const schema& s,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        const noncopyable_function<querier()>& create_fun) {
    const auto it = find_querier(key, range, trace_state);
    if (it == _entries.end()) {
        return create_fun();
    }

    // Take the querier out before erasing its entry. Whether or not it turns
    // out to be usable, it leaves the cache.
    auto q = std::move(it->second).get();
    _entries.erase(it);

    const auto verdict = q.can_be_used_for_page(only_live, s, range, slice);
    if (verdict != querier::can_use::yes) {
        tracing::trace(trace_state, "Dropping querier because {}", cannot_use_reason(verdict));
        return create_fun();
    }

    tracing::trace(trace_state, "Reusing querier");
    return q;
}
|
|
|
|
// Changes the TTL applied to entries inserted from now on. It also restarts
// the periodic expiry scan: the first scan is half a TTL from now, then one
// every half TTL.
void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) {
    _entry_ttl = entry_ttl;
    const auto scan_period = _entry_ttl / 2;
    _expiry_timer.rearm(lowres_clock::now() + scan_period, scan_period);
}
|
|
|
|
bool querier_cache::evict_one() {
|
|
if (_entries.empty()) {
|
|
return false;
|
|
}
|
|
|
|
auto it = _meta_entries.begin();
|
|
const auto end = _meta_entries.end();
|
|
while (it != end) {
|
|
const auto is_live = bool(*it);
|
|
it = _meta_entries.erase(it);
|
|
if (is_live) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Binds a query to `cache` under `key`.
// `is_first_page` suppresses cache lookups (see lookup()); insert() is not
// affected by it. Only a pointer to `cache` is kept, so it must outlive this
// context.
querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, bool is_first_page)
    : _cache(&cache)
    , _key(key)
    , _is_first_page(is_first_page)
{ }
|
|
|
|
// Hands `q` to the bound cache for reuse by a later page.
// This is a no-op when there is no cache or when the key is the default
// (null) UUID.
void querier_cache_context::insert(querier&& q, tracing::trace_state_ptr trace_state) {
    const bool can_cache = _cache && _key != utils::UUID{};
    if (!can_cache) {
        return;
    }
    _cache->insert(_key, std::move(q), std::move(trace_state));
}
|
|
|
|
// Obtains a querier for the current page.
// The cache is consulted only when a cache is bound, the key is not the
// default (null) UUID, and this is not the first page of the query.
// In every other case `create_fun` builds a fresh querier.
querier querier_cache_context::lookup(emit_only_live_rows only_live,
        const schema& s,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        const noncopyable_function<querier()>& create_fun) {
    const bool consult_cache = _cache && _key != utils::UUID{} && !_is_first_page;
    if (!consult_cache) {
        return create_fun();
    }
    return _cache->lookup(_key, only_live, s, range, slice, std::move(trace_state), create_fun);
}
|