reader_concurrency_semaphore: remove wait_admission and consume_resources()

Permits are now created with `make_permit()` and code is using the
permit to do all resource consumption tracking and admission waiting, so
we can remove these from the semaphore. This allows us to remove some
now unused code from the permit as well, namely the `base_cost` which
was used to track the resource amount the permit was created with. Now
this amount is also tracked with a `resource_units` RAII object, returned
from `reader_permit::wait_admission()`, so it can be removed. Curiously,
this reduces the reader permit to be glorified semaphore pointer. Still,
the permit abstraction is worth keeping, because it allows us to make
changes to how the resource tracking part of the semaphore works,
without having to change the huge amount of code sites passing around
the permit.
This commit is contained in:
Botond Dénes
2020-05-18 12:31:42 +03:00
parent a08467da29
commit f417b9a3ea
3 changed files with 18 additions and 57 deletions

View File

@@ -26,13 +26,6 @@
#include "utils/exceptions.hh"
reader_permit::impl::impl(reader_concurrency_semaphore& semaphore, reader_resources base_cost) : semaphore(semaphore), base_cost(base_cost) {
}
reader_permit::impl::~impl() {
semaphore.signal(base_cost);
}
reader_permit::resource_units::resource_units(reader_concurrency_semaphore& semaphore, reader_resources res) noexcept
: _semaphore(&semaphore), _resources(res) {
}
@@ -69,41 +62,32 @@ void reader_permit::resource_units::reset(reader_resources res) {
_resources = res;
}
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, reader_resources base_cost)
: _impl(make_lw_shared<reader_permit::impl>(semaphore, base_cost)) {
}
reader_concurrency_semaphore* reader_permit::semaphore() {
return _impl ? &_impl->semaphore : nullptr;
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore)
: _semaphore(&semaphore) {
}
future<reader_permit::resource_units> reader_permit::wait_admission(size_t memory, db::timeout_clock::time_point timeout) {
if (!_impl) {
if (!_semaphore) {
return make_ready_future<resource_units>(resource_units());
}
return _impl->semaphore.do_wait_admission(memory, timeout);
return _semaphore->do_wait_admission(memory, timeout);
}
reader_permit::resource_units reader_permit::consume_memory(size_t memory) {
if (!_impl) {
if (!_semaphore) {
return resource_units();
}
const auto res = reader_resources{0, ssize_t(memory)};
_impl->semaphore.consume(res);
return resource_units(_impl->semaphore, res);
_semaphore->consume(res);
return resource_units(*_semaphore, res);
}
reader_permit::resource_units reader_permit::consume_resources(reader_resources res) {
if (!_impl) {
if (!_semaphore) {
return resource_units();
}
_impl->semaphore.consume(res);
return resource_units(_impl->semaphore, res);
}
void reader_permit::release() {
_impl->semaphore.signal(_impl->base_cost);
_impl->base_cost = {};
_semaphore->consume(res);
return resource_units(*_semaphore, res);
}
reader_permit no_reader_permit() {
@@ -198,20 +182,8 @@ future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admi
return fut;
}
future<reader_permit> reader_concurrency_semaphore::wait_admission(size_t memory, db::timeout_clock::time_point timeout) {
return do_wait_admission(memory, timeout).then([this] (reader_permit::resource_units res) {
auto underlying_res = std::exchange(res._resources, {});
return reader_permit(*this, underlying_res);
});
}
reader_permit reader_concurrency_semaphore::consume_resources(resources r) {
_resources -= r;
return reader_permit(*this, r);
}
reader_permit reader_concurrency_semaphore::make_permit() {
return reader_permit(*this, {});
return reader_permit(*this);
}
void reader_concurrency_semaphore::broken(std::exception_ptr ex) {

View File

@@ -191,11 +191,6 @@ public:
return _inactive_read_stats;
}
future<reader_permit> wait_admission(size_t memory, db::timeout_clock::time_point timeout);
/// Consume the specific amount of resources without waiting.
reader_permit consume_resources(resources r);
reader_permit make_permit();
const resources available_resources() const {

View File

@@ -70,14 +70,6 @@ class reader_concurrency_semaphore;
/// `resource_units` RAII object that should be held onto while the respective
/// resources are in use.
class reader_permit {
struct impl {
reader_concurrency_semaphore& semaphore;
reader_resources base_cost;
impl(reader_concurrency_semaphore& semaphore, reader_resources base_cost);
~impl();
};
friend reader_permit no_reader_permit();
friend class reader_concurrency_semaphore;
@@ -102,22 +94,24 @@ public:
};
private:
lw_shared_ptr<impl> _impl;
reader_concurrency_semaphore* _semaphore;
private:
reader_permit() = default;
reader_permit(reader_concurrency_semaphore& semaphore, reader_resources base_cost);
explicit reader_permit(reader_concurrency_semaphore& semaphore);
public:
bool operator==(const reader_permit& o) const {
return _impl == o._impl;
return _semaphore == o._semaphore;
}
operator bool() const {
return bool(_impl);
return bool(_semaphore);
}
reader_concurrency_semaphore* semaphore();
reader_concurrency_semaphore* semaphore() {
return _semaphore;
}
future<resource_units> wait_admission(size_t memory, db::timeout_clock::time_point timeout);