diff --git a/database.cc b/database.cc index 94a9501654..be4a99e8f4 100644 --- a/database.cc +++ b/database.cc @@ -2117,6 +2117,9 @@ database::database(const db::config& cfg, database_config dbcfg) [this] { ++_stats->sstable_read_queue_overloaded; return std::make_exception_ptr(std::runtime_error("sstable inactive read queue overloaded")); + }, + [this] { + return _querier_cache.evict_one(); }) // No timeouts or queue length limits - a failure here can kill an entire repair. // Trust the caller to limit concurrency. diff --git a/mutation_reader.cc b/mutation_reader.cc index 8089364f5a..f29c043f1d 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -568,6 +568,9 @@ future> reader_concur return make_exception_future>(_make_queue_overloaded_exception()); } auto r = resources(1, static_cast(memory)); + if (!may_proceed(r) && _evict_an_inactive_reader) { + while (_evict_an_inactive_reader() && !may_proceed(r)); + } if (may_proceed(r)) { _resources -= r; return make_ready_future>(make_lw_shared(*this, r)); @@ -697,7 +700,7 @@ class restricting_mutation_reader : public flat_mutation_reader::impl { }; std::variant _state; - static const std::size_t new_reader_base_cost{16 * 1024}; + static const ssize_t new_reader_base_cost{16 * 1024}; template GCC6_CONCEPT( diff --git a/querier.cc b/querier.cc index f6ff7b06ef..bbe7293d00 100644 --- a/querier.cc +++ b/querier.cc @@ -224,6 +224,23 @@ void querier_cache::set_entry_ttl(std::chrono::seconds entry_ttl) { _expiry_timer.rearm(lowres_clock::now() + _entry_ttl / 2, _entry_ttl / 2); } +bool querier_cache::evict_one() { + if (_entries.empty()) { + return false; + } + + auto it = _meta_entries.begin(); + const auto end = _meta_entries.end(); + while (it != end) { + const auto is_live = bool(*it); + it = _meta_entries.erase(it); + if (is_live) { + return true; + } + } + return false; +} + querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, bool is_first_page) : _cache(&cache) , _key(key) diff --git a/querier.hh b/querier.hh index ac38249db8..682e9d9e13 100644 --- a/querier.hh +++ b/querier.hh @@ -224,6 +224,9 @@ public: /// Inserted queriers will have a TTL. When this expires the querier is /// evicted. This is to avoid excess and unnecessary resource usage due to /// abandoned queriers. +/// Provides a way to evict readers one-by-one via `evict_one()`. This can be +/// used by the concurrency-limiting code to evict cached readers to free up +/// resources for admitting new ones. class querier_cache { public: static const std::chrono::seconds default_entry_ttl; @@ -282,6 +285,10 @@ private: bool is_expired(const lowres_clock::time_point& now) const { return !_entry_ptr || _entry_ptr->is_expired(now); } + + explicit operator bool() const { + return bool(_entry_ptr); + } }; entries _entries; @@ -329,8 +336,13 @@ public: tracing::trace_state_ptr trace_state, const noncopyable_function& create_fun); - void set_entry_ttl(std::chrono::seconds entry_ttl); + + /// Evict a querier. + /// + /// Return true if a querier was evicted and false otherwise (if the cache + /// is empty). + bool evict_one(); }; class querier_cache_context { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 976bdd4e9f..158f57f6d9 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -130,6 +130,7 @@ private: size_t _max_queue_length = std::numeric_limits::max(); std::function _make_queue_overloaded_exception = default_make_queue_overloaded_exception; + std::function _evict_an_inactive_reader; bool has_available_units(const resources& r) const { return bool(_resources) && _resources >= r; @@ -152,10 +153,12 @@ public: reader_concurrency_semaphore(unsigned count, size_t memory, size_t max_queue_length = std::numeric_limits::max(), - std::function raise_queue_overloaded_exception = default_make_queue_overloaded_exception) + std::function raise_queue_overloaded_exception = default_make_queue_overloaded_exception, + std::function evict_an_inactive_reader = {}) : _resources(count, memory) , _max_queue_length(max_queue_length) - , _make_queue_overloaded_exception(raise_queue_overloaded_exception) { + , _make_queue_overloaded_exception(raise_queue_overloaded_exception) + , _evict_an_inactive_reader(std::move(evict_an_inactive_reader)) { } reader_concurrency_semaphore(const reader_concurrency_semaphore&) = delete;