diff --git a/reactor.cc b/reactor.cc index 9b294ba546..10d0ef4088 100644 --- a/reactor.cc +++ b/reactor.cc @@ -239,9 +239,8 @@ void reactor::run() { thread_pool::thread_pool() : _pending(queue_length) , _completed(queue_length) - , _start_eventfd(::eventfd(0, EFD_CLOEXEC)) - , _complete_eventfd(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) - , _completion(_complete_eventfd) + , _start_eventfd(0) + , _complete_eventfd(0) , _worker_thread([this] { work(); }) { _worker_thread.detach(); complete(); @@ -250,14 +249,14 @@ thread_pool::thread_pool() void thread_pool::work() { while (true) { uint64_t count; - auto r = ::read(_start_eventfd, &count, sizeof(count)); + auto r = ::read(_start_eventfd.get_read_fd(), &count, sizeof(count)); assert(r == sizeof(count)); auto nr = _pending.consume_all([this] (work_item* wi) { wi->process(); _completed.push(wi); }); count = nr; - r = ::write(_complete_eventfd, &count, sizeof(count)); + r = ::write(_complete_eventfd.get_write_fd(), &count, sizeof(count)); assert(r == sizeof(count)); } } @@ -265,17 +264,12 @@ void thread_pool::work() { void thread_pool::submit_item(thread_pool::work_item* item) { _queue_has_room.wait().then([this, item] { _pending.push(item); - uint64_t one = 1; - auto r = ::write(_start_eventfd, &one, sizeof(one)); - assert(r == sizeof(one)); + _start_eventfd.signal(1); }); } void thread_pool::complete() { - the_reactor.readable(_completion).then([this] { - uint64_t count; - auto r = ::read(_complete_eventfd, &count, sizeof(count)); - assert(r == sizeof(count)); + _complete_eventfd.wait().then([this] (size_t count) { auto nr = _completed.consume_all([this] (work_item* wi) { wi->complete(); delete wi; @@ -285,6 +279,35 @@ void thread_pool::complete() { }); } +int writeable_eventfd::try_create_eventfd(size_t initial) { + assert(size_t(int(initial)) == initial); + int r = ::eventfd(initial, EFD_CLOEXEC); + throw_system_error_on(r == -1); + return r; +} + +void writeable_eventfd::signal(size_t count) { + uint64_t c = count; + auto r = ::write(_fd, &c, sizeof(c)); + assert(r == sizeof(c)); +} + +int readable_eventfd::try_create_eventfd(size_t initial) { + assert(size_t(int(initial)) == initial); + int r = ::eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK); + throw_system_error_on(r == -1); + return r; +} + +future readable_eventfd::wait() { + return the_reactor.readable(*_fd._s).then([this] { + uint64_t count; + int r = ::read(_fd.get_fd(), &count, sizeof(count)); + assert(r == sizeof(count)); + return make_ready_future(count); + }); +} + socket_address make_ipv4_address(ipv4_addr addr) { socket_address sa; sa.u.in.sin_family = AF_INET; diff --git a/reactor.hh b/reactor.hh index 9dd10ea036..14eebdf504 100644 --- a/reactor.hh +++ b/reactor.hh @@ -439,14 +439,33 @@ protected: friend class readable_eventfd; }; +class readable_eventfd { + pollable_fd _fd; +public: + explicit readable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {} + future wait(); + int get_write_fd() { return _fd.get_fd(); } +private: + static int try_create_eventfd(size_t initial); +}; + +class writeable_eventfd { + int _fd; +public: + explicit writeable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {} + void signal(size_t nr); + int get_read_fd() { return _fd; } +private: + static int try_create_eventfd(size_t initial); +}; + class thread_pool { static constexpr size_t queue_length = 128; struct work_item; boost::lockfree::queue _pending; boost::lockfree::queue _completed; - int _start_eventfd; - int _complete_eventfd; - pollable_fd_state _completion; + writeable_eventfd _start_eventfd; + readable_eventfd _complete_eventfd; semaphore _queue_has_room = { queue_length }; std::thread _worker_thread; struct work_item { @@ -536,6 +555,7 @@ private: friend class pollable_fd_state; friend class file; friend class thread_pool; + friend class readable_eventfd; }; extern reactor the_reactor;