reader_permit: only forward resource consumption to semaphore after admission

In the next patches we plan to start tracking the memory consumption of
the actual allocations made by the circular_buffer<mutation_fragment>,
as well as the memory consumed by the mutation fragments.
This means that readers will start consuming memory off the permit right
after being constructed. Ironically this can prevent the reader from
being admitted, due to its own pre-admission memory consumption. To
prevent this hold on forwarding the memory consumption to the semaphore,
until the permit is actually admitted.
This commit is contained in:
Botond Dénes
2020-09-15 11:20:44 +03:00
parent e1eee0dc34
commit 4c8ab10563
2 changed files with 20 additions and 2 deletions

View File

@@ -70,6 +70,7 @@ void reader_permit::resource_units::reset(reader_resources res) {
class reader_permit::impl {
reader_concurrency_semaphore& _semaphore;
reader_resources _resources;
bool _admitted = false;
public:
impl(reader_concurrency_semaphore& semaphore) : _semaphore(semaphore) { }
@@ -84,14 +85,23 @@ public:
return _semaphore;
}
void on_admission() {
_admitted = true;
_semaphore.consume(_resources);
}
void consume(reader_resources res) {
_resources += res;
_semaphore.consume(res);
if (_admitted) {
_semaphore.consume(res);
}
}
void signal(reader_resources res) {
_resources -= res;
_semaphore.signal(res);
if (_admitted) {
_semaphore.signal(res);
}
}
};
@@ -99,6 +109,10 @@ reader_permit::reader_permit(reader_concurrency_semaphore& semaphore)
: _impl(make_shared<impl>(semaphore)) {
}
void reader_permit::on_admission() {
_impl->on_admission();
}
reader_permit::~reader_permit() {
}
@@ -131,6 +145,7 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
while (!_wait_list.empty() && has_available_units(_wait_list.front().res)) {
auto& x = _wait_list.front();
try {
x.permit.on_admission();
x.pr.set_value(reader_permit::resource_units(std::move(x.permit), x.res));
} catch (...) {
x.pr.set_exception(std::current_exception());
@@ -225,6 +240,7 @@ future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admi
--_stats.inactive_reads;
}
if (may_proceed(r)) {
permit.on_admission();
return make_ready_future<reader_permit::resource_units>(reader_permit::resource_units(std::move(permit), r));
}
promise<reader_permit::resource_units> pr;

View File

@@ -90,6 +90,8 @@ private:
private:
explicit reader_permit(reader_concurrency_semaphore& semaphore);
void on_admission();
public:
~reader_permit();