reader_permit: only forward resource consumption to semaphore after admission
In the next patches we plan to start tracking the memory consumption of the actual allocations made by the circular_buffer<mutation_fragment>, as well as the memory consumed by the mutation fragments. This means that readers will start consuming memory off the permit right after being constructed. Ironically this can prevent the reader from being admitted, due to its own pre-admission memory consumption. To prevent this hold on forwarding the memory consumption to the semaphore, until the permit is actually admitted.
This commit is contained in:
@@ -70,6 +70,7 @@ void reader_permit::resource_units::reset(reader_resources res) {
|
||||
class reader_permit::impl {
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
reader_resources _resources;
|
||||
bool _admitted = false;
|
||||
|
||||
public:
|
||||
impl(reader_concurrency_semaphore& semaphore) : _semaphore(semaphore) { }
|
||||
@@ -84,14 +85,23 @@ public:
|
||||
return _semaphore;
|
||||
}
|
||||
|
||||
void on_admission() {
|
||||
_admitted = true;
|
||||
_semaphore.consume(_resources);
|
||||
}
|
||||
|
||||
void consume(reader_resources res) {
|
||||
_resources += res;
|
||||
_semaphore.consume(res);
|
||||
if (_admitted) {
|
||||
_semaphore.consume(res);
|
||||
}
|
||||
}
|
||||
|
||||
void signal(reader_resources res) {
|
||||
_resources -= res;
|
||||
_semaphore.signal(res);
|
||||
if (_admitted) {
|
||||
_semaphore.signal(res);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -99,6 +109,10 @@ reader_permit::reader_permit(reader_concurrency_semaphore& semaphore)
|
||||
: _impl(make_shared<impl>(semaphore)) {
|
||||
}
|
||||
|
||||
void reader_permit::on_admission() {
|
||||
_impl->on_admission();
|
||||
}
|
||||
|
||||
reader_permit::~reader_permit() {
|
||||
}
|
||||
|
||||
@@ -131,6 +145,7 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
while (!_wait_list.empty() && has_available_units(_wait_list.front().res)) {
|
||||
auto& x = _wait_list.front();
|
||||
try {
|
||||
x.permit.on_admission();
|
||||
x.pr.set_value(reader_permit::resource_units(std::move(x.permit), x.res));
|
||||
} catch (...) {
|
||||
x.pr.set_exception(std::current_exception());
|
||||
@@ -225,6 +240,7 @@ future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admi
|
||||
--_stats.inactive_reads;
|
||||
}
|
||||
if (may_proceed(r)) {
|
||||
permit.on_admission();
|
||||
return make_ready_future<reader_permit::resource_units>(reader_permit::resource_units(std::move(permit), r));
|
||||
}
|
||||
promise<reader_permit::resource_units> pr;
|
||||
|
||||
@@ -90,6 +90,8 @@ private:
|
||||
private:
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore);
|
||||
|
||||
void on_admission();
|
||||
|
||||
public:
|
||||
~reader_permit();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user