From cfeec3799e616b3ad5fd67b4e4819ad030b045f2 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 21 Aug 2014 13:45:09 +0300 Subject: [PATCH] Add a an asynchronous semaphore class --- reactor.hh | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/reactor.hh b/reactor.hh index ffcab0c793..9211d1b6e4 100644 --- a/reactor.hh +++ b/reactor.hh @@ -22,6 +22,7 @@ #include #include #include +#include #include "apply.hh" class socket_address; @@ -275,10 +276,51 @@ future make_ready_future(T&&... value) { return future(s); } +struct free_deleter { + void operator()(void* p) { ::free(p); } +}; + +template +inline +std::unique_ptr allocate_aligned_buffer(size_t size, size_t align) { + static_assert(sizeof(CharType) == 1, "must allocate byte type"); + void* ret; + auto r = posix_memalign(&ret, align, size); + assert(r == 0); + return std::unique_ptr(reinterpret_cast(ret)); +} + struct listen_options { bool reuse_address = false; }; +class semaphore { +private: + size_t _count; + std::list, size_t>> _wait_list; +public: + semaphore(size_t count = 1) : _count(count) {} + future<> wait(size_t nr = 1) { + if (_count >= nr && _wait_list.empty()) { + _count -= nr; + return make_ready_future<>(); + } + promise<> pr; + auto fut = pr.get_future(); + _wait_list.push_back({ std::move(pr), nr }); + return fut; + } + void signal(size_t nr = 1) { + _count += nr; + while (!_wait_list.empty() && _wait_list.front().second <= _count) { + auto& x = _wait_list.front(); + _count -= x.second; + x.first.set_value(); + _wait_list.pop_front(); + } + } +}; + class pollable_fd_state { public: struct speculation {