From 53dcf701b9db3ad6e1267f80f656d6b10764724b Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 24 Aug 2014 19:44:52 +0300 Subject: [PATCH] Encapsulate eventfd into classes Typically one side (read or write) of the eventfd is used within the framework, and the other side is used by an external process, so two classes are provided, depending on which side is used in the framework. The new classes are used with the thread pool. --- reactor.cc | 47 +++++++++++++++++++++++++++++++++++------------ reactor.hh | 26 +++++++++++++++++++++++--- 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/reactor.cc b/reactor.cc index 9b294ba546..10d0ef4088 100644 --- a/reactor.cc +++ b/reactor.cc @@ -239,9 +239,8 @@ void reactor::run() { thread_pool::thread_pool() : _pending(queue_length) , _completed(queue_length) - , _start_eventfd(::eventfd(0, EFD_CLOEXEC)) - , _complete_eventfd(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) - , _completion(_complete_eventfd) + , _start_eventfd(0) + , _complete_eventfd(0) , _worker_thread([this] { work(); }) { _worker_thread.detach(); complete(); @@ -250,14 +249,14 @@ thread_pool::thread_pool() void thread_pool::work() { while (true) { uint64_t count; - auto r = ::read(_start_eventfd, &count, sizeof(count)); + auto r = ::read(_start_eventfd.get_read_fd(), &count, sizeof(count)); assert(r == sizeof(count)); auto nr = _pending.consume_all([this] (work_item* wi) { wi->process(); _completed.push(wi); }); count = nr; - r = ::write(_complete_eventfd, &count, sizeof(count)); + r = ::write(_complete_eventfd.get_write_fd(), &count, sizeof(count)); assert(r == sizeof(count)); } } @@ -265,17 +264,12 @@ void thread_pool::work() { void thread_pool::submit_item(thread_pool::work_item* item) { _queue_has_room.wait().then([this, item] { _pending.push(item); - uint64_t one = 1; - auto r = ::write(_start_eventfd, &one, sizeof(one)); - assert(r == sizeof(one)); + _start_eventfd.signal(1); }); } void thread_pool::complete() { - the_reactor.readable(_completion).then([this] { - uint64_t count; - auto r = ::read(_complete_eventfd, &count, sizeof(count)); - assert(r == sizeof(count)); + _complete_eventfd.wait().then([this] (size_t count) { auto nr = _completed.consume_all([this] (work_item* wi) { wi->complete(); delete wi; @@ -285,6 +279,35 @@ void thread_pool::complete() { }); } +int writeable_eventfd::try_create_eventfd(size_t initial) { + assert(size_t(int(initial)) == initial); + int r = ::eventfd(initial, EFD_CLOEXEC); + throw_system_error_on(r == -1); + return r; +} + +void writeable_eventfd::signal(size_t count) { + uint64_t c = count; + auto r = ::write(_fd, &c, sizeof(c)); + assert(r == sizeof(c)); +} + +int readable_eventfd::try_create_eventfd(size_t initial) { + assert(size_t(int(initial)) == initial); + int r = ::eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK); + throw_system_error_on(r == -1); + return r; +} + +future readable_eventfd::wait() { + return the_reactor.readable(*_fd._s).then([this] { + uint64_t count; + int r = ::read(_fd.get_fd(), &count, sizeof(count)); + assert(r == sizeof(count)); + return make_ready_future(count); + }); +} + socket_address make_ipv4_address(ipv4_addr addr) { socket_address sa; sa.u.in.sin_family = AF_INET; diff --git a/reactor.hh b/reactor.hh index 9dd10ea036..14eebdf504 100644 --- a/reactor.hh +++ b/reactor.hh @@ -439,14 +439,33 @@ protected: friend class readable_eventfd; }; +class readable_eventfd { + pollable_fd _fd; +public: + explicit readable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {} + future wait(); + int get_write_fd() { return _fd.get_fd(); } +private: + static int try_create_eventfd(size_t initial); +}; + +class writeable_eventfd { + int _fd; +public: + explicit writeable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {} + void signal(size_t nr); + int get_read_fd() { return _fd; } +private: + static int try_create_eventfd(size_t initial); +}; + class thread_pool { static constexpr size_t queue_length = 128; struct work_item; boost::lockfree::queue _pending; boost::lockfree::queue _completed; - int _start_eventfd; - int _complete_eventfd; - pollable_fd_state _completion; + writeable_eventfd _start_eventfd; + readable_eventfd _complete_eventfd; semaphore _queue_has_room = { queue_length }; std::thread _worker_thread; struct work_item { @@ -536,6 +555,7 @@ private: friend class pollable_fd_state; friend class file; friend class thread_pool; + friend class readable_eventfd; }; extern reactor the_reactor;