Merge 'Improve load shedding on the replica side' from Łukasz Paszkowski

When reads arrive, they have to wait for admission on the reader
concurrency semaphore. If the node is overloaded, the reads will
be queued. They can time out while in the queue, but will not time
out once admitted.

Once the shard is sufficiently loaded, it is possible that most
queued reads will time out, because the average time it takes
for a queued read to be admitted is around that of the timeout.

If a read times out, any work we already did, or are about to do,
on it is wasted effort. Therefore, the patch tries to prevent this
by checking, during admission, whether a read still has a chance to
complete in time, and aborting it if not. It uses the following criterion:

if a read's remaining time <= (the read's timeout when it arrived at the semaphore) * (live-updateable preemptive_abort_factor),
the read is rejected and the next one from the wait list is considered.

Fixes https://github.com/scylladb/scylladb/issues/14909
Fixes: SCYLLADB-353

Backport is not needed. Better to first observe its impact.

Closes scylladb/scylladb#21649

* github.com:scylladb/scylladb:
  reader_concurrency_semaphore: Check during admission if read may timeout
  permit_reader::impl: Replace break with return after evicting inactive permit on timeout
  reader_concurrency_semaphore: Add preemptive_abort_factor to constructors
  config: Add parameters to control reads' preemptive_abort_factor
  permit_reader: Add a new state: preemptive_aborted
  reader_concurrency_semaphore: validate waiters counter when dequeueing a waiting permit
  reader_concurrency_semaphore: Remove cpu_concurrency's default value
This commit is contained in:
Botond Dénes
2026-01-29 08:27:22 +02:00
13 changed files with 161 additions and 39 deletions

View File

@@ -1394,6 +1394,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Start killing reads after their collective memory consumption goes above $normal_limit * $multiplier.")
, reader_concurrency_semaphore_cpu_concurrency(this, "reader_concurrency_semaphore_cpu_concurrency", liveness::LiveUpdate, value_status::Used, 2,
"Admit new reads while there are less than this number of requests that need CPU.")
, reader_concurrency_semaphore_preemptive_abort_factor(this, "reader_concurrency_semaphore_preemptive_abort_factor", liveness::LiveUpdate, value_status::Used, 0.3,
"Admit new reads while their remaining time is more than this factor times their timeout times when arrived to a semaphore. Its vale means\n"
"* <= 0.0 means new reads will never get rejected during admission\n"
"* >= 1.0 means new reads will always get rejected during admission\n")
, view_update_reader_concurrency_semaphore_serialize_limit_multiplier(this, "view_update_reader_concurrency_semaphore_serialize_limit_multiplier", liveness::LiveUpdate, value_status::Used, 2,
"Start serializing view update reads after their collective memory consumption goes above $normal_limit * $multiplier.")
, view_update_reader_concurrency_semaphore_kill_limit_multiplier(this, "view_update_reader_concurrency_semaphore_kill_limit_multiplier", liveness::LiveUpdate, value_status::Used, 4,

View File

@@ -446,6 +446,7 @@ public:
named_value<uint32_t> reader_concurrency_semaphore_serialize_limit_multiplier;
named_value<uint32_t> reader_concurrency_semaphore_kill_limit_multiplier;
named_value<uint32_t> reader_concurrency_semaphore_cpu_concurrency;
named_value<float> reader_concurrency_semaphore_preemptive_abort_factor;
named_value<uint32_t> view_update_reader_concurrency_semaphore_serialize_limit_multiplier;
named_value<uint32_t> view_update_reader_concurrency_semaphore_kill_limit_multiplier;
named_value<uint32_t> view_update_reader_concurrency_semaphore_cpu_concurrency;

View File

@@ -78,6 +78,7 @@ Permits are in one of the following states:
* `active/await` - a previously `active/need_cpu` permit, which needs something other than CPU to proceed, it is waiting on I/O or a remote shards, other permits can be admitted while the permit is in this state, pending resource availability;
* `inactive` - the permit was marked inactive, it can be evicted to make room for admitting more permits if needed;
* `evicted` - a former inactive permit which was evicted, the permit has to undergo admission again for the read to resume;
* `preemptive_aborted` - the permit timed out or was rejected during admission as it was detected the read might time out later during execution;
Note that some older releases will have different names for some of these states or lack some of the states altogether:

View File

@@ -148,6 +148,7 @@ public:
};
private:
const db::timeout_clock::time_point _created;
reader_concurrency_semaphore& _semaphore;
schema_ptr _schema;
@@ -237,17 +238,25 @@ private:
break;
case state::inactive:
_semaphore.evict(*this, reader_concurrency_semaphore::evict_reason::time);
break;
// Return here on purpose. The evicted permit is destroyed when closing a reader.
// As a consequence, any member access beyond this point is invalid.
return;
case state::evicted:
case state::preemptive_aborted:
break;
}
// The function call not only sets state to reader_permit::state::preemptive_aborted
// but also correctly decreases the statistics i.e. need_cpu_permits and awaits_permits.
on_permit_inactive(reader_permit::state::preemptive_aborted);
}
public:
struct value_tag {};
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
: _semaphore(semaphore)
: _created(db::timeout_clock::now())
, _semaphore(semaphore)
, _schema(std::move(schema))
, _op_name_view(op_name)
, _base_resources(base_resources)
@@ -258,7 +267,8 @@ public:
_semaphore.on_permit_created(*this);
}
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
: _semaphore(semaphore)
: _created(db::timeout_clock::now())
, _semaphore(semaphore)
, _schema(std::move(schema))
, _op_name(std::move(op_name))
, _op_name_view(_op_name)
@@ -360,6 +370,17 @@ public:
on_permit_active();
}
void on_preemptive_aborted() {
if (_state != reader_permit::state::waiting_for_admission && _state != reader_permit::state::waiting_for_memory) {
on_internal_error(rcslog, format("on_preemptive_aborted(): permit in invalid state {}", _state));
}
_ttl_timer.cancel();
_state = reader_permit::state::preemptive_aborted;
_aux_data.pr.set_exception(named_semaphore_aborted(_semaphore._name));
_semaphore.on_permit_preemptive_aborted();
}
void on_register_as_inactive() {
SCYLLA_ASSERT(_state == reader_permit::state::active || _state == reader_permit::state::active_need_cpu || _state == reader_permit::state::waiting_for_memory);
on_permit_inactive(reader_permit::state::inactive);
@@ -467,6 +488,10 @@ public:
return _semaphore.do_wait_admission(*this);
}
db::timeout_clock::time_point created() const noexcept {
return _created;
}
db::timeout_clock::time_point timeout() const noexcept {
return _ttl_timer.armed() ? _ttl_timer.get_timeout() : db::no_timeout;
}
@@ -689,6 +714,9 @@ auto fmt::formatter<reader_permit::state>::format(reader_permit::state s, fmt::f
case reader_permit::state::evicted:
name = "evicted";
break;
case reader_permit::state::preemptive_aborted:
name = "preemptive_aborted";
break;
}
return formatter<string_view>::format(name, ctx);
}
@@ -1038,6 +1066,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
register_metrics metrics)
: _initial_resources(count, memory)
, _resources(count, memory)
@@ -1047,6 +1076,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
, _cpu_concurrency(cpu_concurrency)
, _preemptive_abort_factor(preemptive_abort_factor)
, _close_readers_gate(format("[reader_concurrency_semaphore {}] close_readers", _name))
, _permit_gate(format("[reader_concurrency_semaphore {}] permit", _name))
{
@@ -1114,6 +1144,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring na
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(float(0.0)),
metrics) {}
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
@@ -1489,6 +1520,25 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
auto& permit = _wait_list.front();
dequeue_permit(permit);
try {
// Do not admit the read as it is unlikely to finish before its timeout. The condition is:
// permit's remaining time <= preemptive_abort_factor * permit's time budget
//
// The additional check for remaining_time > 0 is to avoid preemptive aborting reads
// that already timed out but are still in the wait list due to scheduling delays.
// It also effectively disables preemptive aborting when the factor is set to 0.
const auto time_budget = permit.timeout() - permit.created();
const auto remaining_time = permit.timeout() - db::timeout_clock::now();
if (remaining_time > db::timeout_clock::duration::zero() && remaining_time <= _preemptive_abort_factor() * time_budget) {
permit.on_preemptive_aborted();
using ms = std::chrono::milliseconds;
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] read shed as unlikely to finish (elapsed: {}, timeout: {}, preemptive_factor: {})",
_name,
std::chrono::duration_cast<ms>(time_budget - remaining_time),
std::chrono::duration_cast<ms>(time_budget),
_preemptive_abort_factor());
continue;
}
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
_blessed_permit = &permit;
permit.on_granted_memory();
@@ -1549,7 +1599,11 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) {
case reader_permit::state::waiting_for_admission:
case reader_permit::state::waiting_for_memory:
case reader_permit::state::waiting_for_execution:
--_stats.waiters;
if (_stats.waiters > 0) {
--_stats.waiters;
} else {
on_internal_error_noexcept(rcslog, "reader_concurrency_semaphore::dequeue_permit(): invalid state: no waiters yet dequeueing a waiting permit");
}
break;
case reader_permit::state::inactive:
case reader_permit::state::evicted:
@@ -1558,12 +1612,17 @@ void reader_concurrency_semaphore::dequeue_permit(reader_permit::impl& permit) {
case reader_permit::state::active:
case reader_permit::state::active_need_cpu:
case reader_permit::state::active_await:
case reader_permit::state::preemptive_aborted:
on_internal_error_noexcept(rcslog, format("reader_concurrency_semaphore::dequeue_permit(): unrecognized queued state: {}", permit.get_state()));
}
permit.unlink();
_permit_list.push_back(permit);
}
void reader_concurrency_semaphore::on_permit_preemptive_aborted() noexcept {
++_stats.total_reads_shed_due_to_overload;
}
void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) {
_permit_gate.enter();
_permit_list.push_back(permit);

View File

@@ -42,7 +42,7 @@ using mutation_reader_opt = optimized_optional<mutation_reader>;
/// number of waiting readers becomes equal or greater than
/// `max_queue_length` (upon calling `obtain_permit()`) an exception of
/// type `std::runtime_error` is thrown. Optionally, some additional
/// code can be executed just before throwing (`prethrow_action`
/// code can be executed just before throwing (`prethrow_action`
/// constructor parameter).
///
/// The semaphore has 3 layers of defense against consuming more memory
@@ -89,6 +89,7 @@ public:
// Total number of failed reads executed through this semaphore.
uint64_t total_failed_reads = 0;
// Total number of reads rejected because the admission queue reached its max capacity
// or rejected due to a high probability of not getting finalized on time.
uint64_t total_reads_shed_due_to_overload = 0;
// Total number of reads killed due to the memory consumption reaching the kill limit.
uint64_t total_reads_killed_due_to_kill_limit = 0;
@@ -192,6 +193,8 @@ private:
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
utils::updateable_value<uint32_t> _kill_limit_multiplier;
utils::updateable_value<uint32_t> _cpu_concurrency;
utils::updateable_value<float> _preemptive_abort_factor;
stats _stats;
std::optional<seastar::metrics::metric_groups> _metrics;
bool _stopped = false;
@@ -250,6 +253,8 @@ private:
void on_permit_created(reader_permit::impl&);
void on_permit_destroyed(reader_permit::impl&) noexcept;
void on_permit_preemptive_aborted() noexcept;
void on_permit_need_cpu() noexcept;
void on_permit_not_need_cpu() noexcept;
@@ -287,6 +292,7 @@ public:
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
register_metrics metrics);
reader_concurrency_semaphore(
@@ -296,9 +302,12 @@ public:
size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
register_metrics metrics)
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length,
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), utils::updateable_value<uint32_t>(1), metrics)
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), std::move(cpu_concurrency),
std::move(preemptive_abort_factor), metrics)
{ }
/// Create a semaphore with practically unlimited count and memory.
@@ -318,9 +327,10 @@ public:
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> cpu_concurrency = utils::updateable_value<uint32_t>(1),
utils::updateable_value<float> preemptive_abort_factor = utils::updateable_value<float>(0.0f),
register_metrics metrics = register_metrics::no)
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
std::move(kill_limit_multipler), std::move(cpu_concurrency), metrics)
std::move(kill_limit_multipler), std::move(cpu_concurrency), std::move(preemptive_abort_factor), metrics)
{}
virtual ~reader_concurrency_semaphore();

View File

@@ -70,7 +70,8 @@ reader_concurrency_semaphore& reader_concurrency_semaphore_group::add_or_update(
_max_queue_length,
_serialize_limit_multiplier,
_kill_limit_multiplier,
_cpu_concurrency
_cpu_concurrency,
_preemptive_abort_factor
);
auto&& it = result.first;
// since we serialize all group changes this change wait will be queues and no further operations

View File

@@ -26,6 +26,7 @@ class reader_concurrency_semaphore_group {
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
utils::updateable_value<uint32_t> _kill_limit_multiplier;
utils::updateable_value<uint32_t> _cpu_concurrency;
utils::updateable_value<float> _preemptive_abort_factor;
friend class database_test_wrapper;
@@ -36,11 +37,12 @@ class reader_concurrency_semaphore_group {
weighted_reader_concurrency_semaphore(size_t shares, int count, sstring name, size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency)
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor)
: weight(shares)
, memory_share(0)
, sem(utils::updateable_value(count), 0, name, max_queue_length, std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier),
std::move(cpu_concurrency), reader_concurrency_semaphore::register_metrics::yes) {}
std::move(cpu_concurrency), std::move(preemptive_abort_factor), reader_concurrency_semaphore::register_metrics::yes) {}
};
std::unordered_map<scheduling_group, weighted_reader_concurrency_semaphore> _semaphores;
@@ -54,6 +56,7 @@ public:
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
utils::updateable_value<uint32_t> cpu_concurrency,
utils::updateable_value<float> preemptive_abort_factor,
std::optional<sstring> name_prefix = std::nullopt)
: _total_memory(memory)
, _total_weight(0)
@@ -62,6 +65,7 @@ public:
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
, _cpu_concurrency(std::move(cpu_concurrency))
, _preemptive_abort_factor(std::move(preemptive_abort_factor))
, _operations_serializer(1)
, _name_prefix(std::move(name_prefix)) { }

View File

@@ -92,6 +92,7 @@ public:
active_await,
inactive,
evicted,
preemptive_aborted,
};
class impl;

View File

@@ -412,6 +412,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(0.0f),
reader_concurrency_semaphore::register_metrics::yes)
// No limits, just for accounting.
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction", reader_concurrency_semaphore::register_metrics::no)
@@ -423,6 +424,8 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(0.0f),
reader_concurrency_semaphore::register_metrics::yes)
, _view_update_read_concurrency_semaphores_group(
max_memory_concurrent_view_update_reads(),
@@ -431,6 +434,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
_cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_cpu_concurrency,
utils::updateable_value(0.0f),
"view_update")
, _row_cache_tracker(_cfg.index_cache_fraction.operator utils::updateable_value<double>(), cache_tracker::register_metrics::yes)
, _apply_stage("db_apply", &database::do_apply)
@@ -460,7 +464,8 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _reader_concurrency_semaphores_group(max_memory_concurrent_reads(), max_count_concurrent_reads, max_inactive_queue_length(),
_cfg.reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.reader_concurrency_semaphore_cpu_concurrency)
_cfg.reader_concurrency_semaphore_cpu_concurrency,
_cfg.reader_concurrency_semaphore_preemptive_abort_factor)
, _stop_barrier(std::move(barrier))
, _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); })
, _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer()))

View File

@@ -45,6 +45,8 @@ sstables_manager::sstables_manager(
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(uint32_t(1)),
utils::updateable_value(0.0f),
reader_concurrency_semaphore::register_metrics::no)
, _dir_semaphore(dir_sem)
, _resolve_host_id(std::move(resolve_host_id))

View File

@@ -2152,6 +2152,7 @@ struct scoped_execption_log_level {
SEASTAR_TEST_CASE(replica_read_timeout_no_exception) {
cql_test_config cfg;
cfg.db_config->reader_concurrency_semaphore_preemptive_abort_factor.set(0.0);
const auto read_timeout = 10ms;
const auto write_timeout = 10s;
cfg.query_timeout.emplace(timeout_config{

View File

@@ -517,6 +517,38 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
});
}
SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_abort) {
const auto preemptive_abort_factor = 0.5f;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost,
100, utils::updateable_value(std::numeric_limits<uint32_t>::max()), utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t>(1), utils::updateable_value<float>(preemptive_abort_factor));
auto stop_sem = deferred_stop(semaphore);
{
BOOST_REQUIRE(semaphore.get_stats().total_reads_shed_due_to_overload == 0);
auto timeout = db::timeout_clock::now() + 500ms;
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout, {}).get();
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout, {});
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1);
// The permits are rejected when the remaining time is less than half of its timeout when arrived to the semaphore.
// Hence, sleep 300ms to reject the permits in the waitlist during admission.
seastar::sleep(300ms).get();
permit1 = {};
const auto futures_failed = eventually_true([&] { return permit2_fut.failed(); });
BOOST_CHECK(futures_failed);
BOOST_CHECK_THROW(std::rethrow_exception(permit2_fut.get_exception()), semaphore_aborted);
BOOST_CHECK(semaphore.get_stats().total_reads_shed_due_to_overload > 0);
}
// All units should have been deposited back.
REQUIRE_EVENTUALLY_EQUAL<ssize_t>([&] { return semaphore.available_resources().memory; }, replica::new_reader_base_cost);
}
SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
return async([&] () {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost, 2);
@@ -597,7 +629,8 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
permit.resources = permit.permit->consume_resources(reader_resources(tests::random::get_int<unsigned>(0, 1), tests::random::get_int<unsigned>(1024, 16 * 1024 * 1024)));
} else {
const auto timeout_seconds = tests::random::get_int<unsigned>(0, 3);
//Ensure timeout_seconds > 0 to avoid permits being rejected during admission. The test will become flaky.
const auto timeout_seconds = tests::random::get_int<unsigned>(1, 4);
permit.permit_fut = semaphore.obtain_permit(
schema,
@@ -1226,11 +1259,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_group) {
auto serialize_multiplier = utils::updateable_value_source<uint32_t>(2);
auto kill_multiplier = utils::updateable_value_source<uint32_t>(3);
auto cpu_concurrency = utils::updateable_value_source<uint32_t>(1);
auto preemptive_abort_factor = utils::updateable_value_source<float>(0.0f);
reader_concurrency_semaphore_group sem_group(initial_resources.memory, initial_resources.count, 1000,
utils::updateable_value(serialize_multiplier),
utils::updateable_value(kill_multiplier),
utils::updateable_value(cpu_concurrency));
utils::updateable_value(cpu_concurrency),
utils::updateable_value(preemptive_abort_factor));
auto stop_sem = deferred_stop(sem_group);
circular_buffer<scheduling_group> recycle_bin;
@@ -1472,8 +1508,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_leaks
const auto initial_resources = reader_concurrency_semaphore::resources{4, 4 * 1024};
const auto serialize_multiplier = 2;
const auto kill_multiplier = 3;
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
auto stop_sem = deferred_stop(semaphore);
const size_t reader_count_target = 6;
@@ -1726,9 +1762,8 @@ SEASTAR_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_engages) {
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preserves_state) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
auto stop_sem = deferred_stop(semaphore);
auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
@@ -1789,9 +1824,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_inactive) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -1851,9 +1885,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_memory_goes_inactive) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
const auto serialize_multiplier = 2;
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count,
initial_resources.memory, 100, utils::updateable_value<uint32_t>(serialize_multiplier));
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
@@ -1897,10 +1930,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me
// This test covers all the cases where eviction should **not** happen.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicting) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -1990,10 +2020,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicti
// Check that inactive reads are evicted when they are blocking admission
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -2147,10 +2174,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
// resources.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_execution_stage_wakeup) {
const auto initial_resources = reader_concurrency_semaphore::resources{2, 4 * 1024};
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory, 100);
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();
@@ -2186,6 +2210,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
const uint32_t initial_memory = 4 * 1024;
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
const auto cpu_concurrency = 1;
const auto preemptive_abort_factor = 0.0f;
reader_concurrency_semaphore semaphore(
utils::updateable_value(count),
@@ -2194,7 +2220,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
100,
utils::updateable_value<uint32_t>(serialize_multiplier),
utils::updateable_value<uint32_t>(kill_multiplier),
utils::updateable_value<uint32_t>(1),
utils::updateable_value<uint32_t>(cpu_concurrency),
utils::updateable_value<float>(preemptive_abort_factor),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2214,6 +2241,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
const uint32_t initial_memory = 4 * 1024;
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
const auto preemptive_abort_factor = 0.0f;
reader_concurrency_semaphore semaphore(
utils::updateable_value<int>(initial_count),
@@ -2223,6 +2251,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_cpu_concu
utils::updateable_value<uint32_t>(serialize_multiplier),
utils::updateable_value<uint32_t>(kill_multiplier),
utils::updateable_value(cpu_concurrency),
utils::updateable_value<float>(preemptive_abort_factor),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2275,6 +2304,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_wait_queue_overload_c
utils::updateable_value<uint32_t>(2),
utils::updateable_value<uint32_t>(4),
utils::updateable_value<uint32_t>(1),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2328,6 +2358,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
utils::updateable_value<uint32_t>(2),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(2),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2392,6 +2423,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_always_admit_one_perm
utils::updateable_value<uint32_t>(200),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(1),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
@@ -2433,6 +2465,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resource
utils::updateable_value<uint32_t>(200),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(1),
utils::updateable_value<float>(0.0f),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);

View File

@@ -4837,8 +4837,8 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) {
// of course doesn't necessarily help release pressure on the semaphore.
SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) {
simple_schema s;
reader_concurrency_semaphore semaphore(100, 1, get_name(), std::numeric_limits<size_t>::max(), utils::updateable_value<uint32_t>(1),
utils::updateable_value<uint32_t>(1), reader_concurrency_semaphore::register_metrics::no);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 1, std::numeric_limits<size_t>::max(),
utils::updateable_value<uint32_t>(1), utils::updateable_value<uint32_t>(1));
auto stop_semaphore = deferred_stop(semaphore);
cache_tracker tracker;