reader_concurrency_semaphore: Optimize resource_units destruction by postponing wait list processing
Observed 3% throughput improvement in sstable-heavy workload bounded by CPU.
SSTable parsing involves lots of buffer operations which obtain and
destroy resource_units. Before the patch, resource_unit destruction
invoked maybe_admit_waiters(), which performs some computations on
waiting permits. We don't really need to admit on each change of
resources, since the CPU is used by other things anyway. We can batch
the computation. There is already a fiber which does this for
processing the _ready_list. We can reuse it for processing _wait_list
as well.
The changes violate an assumption made by tests that releasing
resources immediately triggers an admission check. Therefore, some of
the BOOST_REQUIRE_EQUAL checks need to be replaced with REQUIRE_EVENTUALLY_EQUAL,
as the admission check is now done in the fiber processing the _ready_list.
`perf-simple-query` --tablets --smp 1 -m 1G results obtained for
fixed 400MHz frequency:
Before:
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
112590.60 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41353 insns/op, 17992 cycles/op, 0 errors)
122620.68 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41310 insns/op, 17713 cycles/op, 0 errors)
118169.48 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41353 insns/op, 17857 cycles/op, 0 errors)
120634.65 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41328 insns/op, 17733 cycles/op, 0 errors)
117317.18 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41347 insns/op, 17822 cycles/op, 0 errors)
throughput: mean=118266.52 standard-deviation=3797.81 median=118169.48 median-absolute-deviation=2368.13 maximum=122620.68 minimum=112590.60
instructions_per_op: mean=41337.86 standard-deviation=18.73 median=41346.89 median-absolute-deviation=14.64 maximum=41352.53 minimum=41309.83
cpu_cycles_per_op: mean=17823.50 standard-deviation=111.75 median=17821.97 median-absolute-deviation=90.45 maximum=17992.04 minimum=17713.00
```
After:
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
123689.63 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40997 insns/op, 17384 cycles/op, 0 errors)
129643.24 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40997 insns/op, 17325 cycles/op, 0 errors)
128907.27 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41009 insns/op, 17325 cycles/op, 0 errors)
130342.56 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40993 insns/op, 17286 cycles/op, 0 errors)
130294.09 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40972 insns/op, 17336 cycles/op, 0 errors)
throughput: mean=128575.36 standard-deviation=2792.75 median=129643.24 median-absolute-deviation=1718.73 maximum=130342.56 minimum=123689.63
instructions_per_op: mean=40993.51 standard-deviation=13.23 median=40996.73 median-absolute-deviation=3.30 maximum=41008.86 minimum=40972.48
cpu_cycles_per_op: mean=17331.16 standard-deviation=35.02 median=17324.84 median-absolute-deviation=6.49 maximum=17383.97 minimum=17286.33
```
Closes scylladb/scylladb#21918
[avi: patch was co-authored by Łukasz Paszkowski <lukasz.paszkowski@scylladb.com>]
This commit is contained in:
committed by
Avi Kivity
parent
b32b7ab806
commit
bf3d0b3543
@@ -967,6 +967,8 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept {
|
||||
co_return;
|
||||
}
|
||||
|
||||
maybe_admit_waiters();
|
||||
|
||||
while (!_ready_list.empty()) {
|
||||
auto& permit = _ready_list.front();
|
||||
dequeue_permit(permit);
|
||||
@@ -1023,7 +1025,7 @@ void reader_concurrency_semaphore::consume(reader_permit::impl& permit, resource
|
||||
|
||||
void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
_resources += r;
|
||||
maybe_admit_waiters();
|
||||
maybe_wake_execution_loop();
|
||||
}
|
||||
|
||||
namespace sm = seastar::metrics;
|
||||
@@ -1139,7 +1141,7 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore:
|
||||
permit->on_register_as_inactive();
|
||||
if (_blessed_permit == &*permit) {
|
||||
_blessed_permit = nullptr;
|
||||
maybe_admit_waiters();
|
||||
maybe_wake_execution_loop();
|
||||
}
|
||||
if (!should_evict_inactive_read()) {
|
||||
try {
|
||||
@@ -1463,7 +1465,7 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit::impl& pe
|
||||
// So at any point in time, there should either be no waiters, or it
|
||||
// shouldn't be able to admit new reads. Otherwise something went wrong.
|
||||
maybe_dump_reader_permit_diagnostics(*this, "semaphore could admit new reads yet there are waiters", nullptr);
|
||||
maybe_admit_waiters();
|
||||
maybe_wake_execution_loop();
|
||||
} else if (admit == can_admit::maybe) {
|
||||
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] evicting inactive reads in the background to free up resources", _name);
|
||||
++_stats.reads_queued_with_eviction;
|
||||
@@ -1512,6 +1514,12 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::maybe_wake_execution_loop() noexcept {
|
||||
if (!_wait_list.empty()) {
|
||||
_ready_list_cv.signal();
|
||||
}
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::request_memory(reader_permit::impl& permit, size_t memory) {
|
||||
// Already blocked on memory?
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
@@ -1568,7 +1576,7 @@ void reader_concurrency_semaphore::on_permit_destroyed(reader_permit::impl& perm
|
||||
--_stats.current_permits;
|
||||
if (_blessed_permit == &permit) {
|
||||
_blessed_permit = nullptr;
|
||||
maybe_admit_waiters();
|
||||
maybe_wake_execution_loop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1580,13 +1588,13 @@ void reader_concurrency_semaphore::on_permit_not_need_cpu() noexcept {
|
||||
SCYLLA_ASSERT(_stats.need_cpu_permits);
|
||||
--_stats.need_cpu_permits;
|
||||
SCYLLA_ASSERT(_stats.need_cpu_permits >= _stats.awaits_permits);
|
||||
maybe_admit_waiters();
|
||||
maybe_wake_execution_loop();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_awaits() noexcept {
|
||||
++_stats.awaits_permits;
|
||||
SCYLLA_ASSERT(_stats.need_cpu_permits >= _stats.awaits_permits);
|
||||
maybe_admit_waiters();
|
||||
maybe_wake_execution_loop();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_not_awaits() noexcept {
|
||||
@@ -1652,7 +1660,7 @@ void reader_concurrency_semaphore::set_resources(resources r) {
|
||||
auto delta = r - _initial_resources;
|
||||
_initial_resources = r;
|
||||
_resources += delta;
|
||||
maybe_admit_waiters();
|
||||
maybe_wake_execution_loop();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
|
||||
|
||||
@@ -231,6 +231,8 @@ private:
|
||||
|
||||
void maybe_admit_waiters() noexcept;
|
||||
|
||||
void maybe_wake_execution_loop() noexcept;
|
||||
|
||||
// Request more memory for the permit.
|
||||
// Request is instantly granted while memory consumption of all reads is
|
||||
// below _kill_limit_multiplier.
|
||||
|
||||
@@ -851,6 +851,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto irh2 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit2));
|
||||
|
||||
BOOST_REQUIRE(eventually_true([&] { return !irh1 || !irh2; }));
|
||||
require_can_admit(true, "evictable reads");
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources);
|
||||
@@ -870,7 +871,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_enqueued_for_admission, stats_before.reads_enqueued_for_admission + 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1);
|
||||
|
||||
std::ignore = post_enqueue_hook(cookie1);
|
||||
[[maybe_unused]] auto guard = post_enqueue_hook(cookie1);
|
||||
|
||||
if (!eventually_true([&] { return permit2_fut.available(); })) {
|
||||
semaphore.broken();
|
||||
@@ -1163,9 +1164,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
|
||||
BOOST_REQUIRE(irh1);
|
||||
|
||||
// Marking p2 as awaits should now allow p3 to be admitted by evicting p1
|
||||
// Marking p2 as awaits should eventually allow p3 to be admitted by evicting p1
|
||||
rd2.mark_as_awaits();
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 0);
|
||||
REQUIRE_EVENTUALLY_EQUAL(semaphore.get_stats().waiters, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().need_cpu_permits, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().awaits_permits, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
@@ -1210,7 +1211,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1);
|
||||
|
||||
semaphore.set_resources({4, 4 * 1024});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 0);
|
||||
REQUIRE_EVENTUALLY_EQUAL(semaphore.get_stats().waiters, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(1, 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024));
|
||||
permit3_fut.get();
|
||||
@@ -1992,7 +1993,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
ncpu_guard.reset();
|
||||
BOOST_REQUIRE(!handle);
|
||||
REQUIRE_EVENTUALLY_EQUAL(bool(handle), false);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
|
||||
@@ -2021,8 +2022,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
|
||||
|
||||
ncpu_guard.reset();
|
||||
thread::yield(); // allow debug builds to schedule the fiber evicting the reads again
|
||||
BOOST_REQUIRE(!handle);
|
||||
REQUIRE_EVENTUALLY_EQUAL(bool(handle), false);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, ++evicted_reads);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user