diff --git a/core/reactor.cc b/core/reactor.cc index 83e71ccfc7..f287e32115 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -323,19 +323,30 @@ reactor::signal_handler::signal_handler(int signo) throw_system_error_on(r == -1); } -thread_pool::thread_pool() +inter_thread_work_queue::inter_thread_work_queue() : _pending(queue_length) , _completed(queue_length) , _start_eventfd(0) - , _complete_eventfd(0) - , _worker_thread([this] { work(); }) { + , _complete_eventfd(0) { complete(); } -thread_pool::~thread_pool() { - _stopped.store(true, std::memory_order_relaxed); - _start_eventfd.signal(1); - _worker_thread.join(); +void inter_thread_work_queue::submit_item(inter_thread_work_queue::work_item* item) { + _queue_has_room.wait().then([this, item] { + _pending.push(item); + _start_eventfd.signal(1); + }); +} + +void inter_thread_work_queue::complete() { + _complete_eventfd.wait().then([this] (size_t count) { + auto nr = _completed.consume_all([this] (work_item* wi) { + wi->complete(); + delete wi; + }); + _queue_has_room.signal(nr); + complete(); + }); } void thread_pool::work() { @@ -345,37 +356,25 @@ void thread_pool::work() { throw_system_error_on(r == -1); while (true) { uint64_t count; - auto r = ::read(_start_eventfd.get_read_fd(), &count, sizeof(count)); + auto r = ::read(inter_thread_wq._start_eventfd.get_read_fd(), &count, sizeof(count)); assert(r == sizeof(count)); if (_stopped.load(std::memory_order_relaxed)) { break; } - auto nr = _pending.consume_all([this] (work_item* wi) { + auto nr = inter_thread_wq._pending.consume_all([this] (inter_thread_work_queue::work_item* wi) { wi->process(); - _completed.push(wi); + inter_thread_wq._completed.push(wi); }); count = nr; - r = ::write(_complete_eventfd.get_write_fd(), &count, sizeof(count)); + r = ::write(inter_thread_wq._complete_eventfd.get_write_fd(), &count, sizeof(count)); assert(r == sizeof(count)); } } -void thread_pool::submit_item(thread_pool::work_item* item) { - _queue_has_room.wait().then([this, item] { - _pending.push(item); - _start_eventfd.signal(1); - }); -} - -void thread_pool::complete() { - _complete_eventfd.wait().then([this] (size_t count) { - auto nr = _completed.consume_all([this] (work_item* wi) { - wi->complete(); - delete wi; - }); - _queue_has_room.signal(nr); - complete(); - }); +thread_pool::~thread_pool() { + _stopped.store(true, std::memory_order_relaxed); + inter_thread_wq._start_eventfd.signal(1); + _worker_thread.join(); } file_desc writeable_eventfd::try_create_eventfd(size_t initial) { diff --git a/core/reactor.hh b/core/reactor.hh index b2cb2dde3a..7086be254b 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -300,16 +300,16 @@ private: static file_desc try_create_eventfd(size_t initial); }; -class thread_pool { +class thread_pool; + +class inter_thread_work_queue { static constexpr size_t queue_length = 128; struct work_item; - std::atomic _stopped = { false }; boost::lockfree::queue _pending; boost::lockfree::queue _completed; writeable_eventfd _start_eventfd; readable_eventfd _complete_eventfd; semaphore _queue_has_room = { queue_length }; - std::thread _worker_thread; struct work_item { virtual ~work_item() {} virtual void process() = 0; @@ -326,8 +326,7 @@ class thread_pool { future get_future() { return _promise.get_future(); } }; public: - thread_pool(); - ~thread_pool(); + inter_thread_work_queue(); template future submit(Func func) { auto wi = new work_item_returning(std::move(func)); @@ -339,6 +338,21 @@ private: void work(); void complete(); void submit_item(work_item* wi); + + friend class thread_pool; +}; + +class thread_pool { + inter_thread_work_queue inter_thread_wq; + std::thread _worker_thread; + std::atomic _stopped = { false }; +public: + thread_pool() : _worker_thread([this] { work(); }) {} + ~thread_pool(); + template + future submit(Func func) {return inter_thread_wq.submit(std::move(func));} +private: + void work(); }; class reactor {