From ebd10bf34907a998f047a77054dda0b37a168b05 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 24 Aug 2014 10:34:08 +0300 Subject: [PATCH] Add a thread pool to augment kernel support for aio Some operations cannot be done either non-blockingly or asynchronously, so add a thread pool to execute them in. Currently the thread pool has just one thread. --- Makefile | 1 + reactor.cc | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ reactor.hh | 45 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+) diff --git a/Makefile b/Makefile index 6238a525b9..5e6fab2fab 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,7 @@ opt = $(opt.$(mode)) libs = -laio CXXFLAGS = -std=gnu++1y -g -Wall -Werror $(opt) -MD -MT $@ -MP -flto $(sanitize) -fvisibility=hidden $(libs) +CXXFLAGS += -pthread tests = test-reactor fileiotest diff --git a/reactor.cc b/reactor.cc index 31884eaa6c..7eb4fae1d1 100644 --- a/reactor.cc +++ b/reactor.cc @@ -206,6 +206,55 @@ void reactor::run() { } } +thread_pool::thread_pool() + : _pending(queue_length) + , _completed(queue_length) + , _start_eventfd(::eventfd(0, EFD_CLOEXEC)) + , _complete_eventfd(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) + , _completion(_complete_eventfd) + , _worker_thread([this] { work(); }) { + _worker_thread.detach(); + complete(); +} + +void thread_pool::work() { + while (true) { + uint64_t count; + auto r = ::read(_start_eventfd, &count, sizeof(count)); + assert(r == sizeof(count)); + auto nr = _pending.consume_all([this] (work_item* wi) { + wi->process(); + _completed.push(wi); + }); + count = nr; + r = ::write(_complete_eventfd, &count, sizeof(count)); + assert(r == sizeof(count)); + } +} + +void thread_pool::submit_item(thread_pool::work_item* item) { + _queue_has_room.wait().then([this, item] { + _pending.push(item); + uint64_t one = 1; + auto r = ::write(_start_eventfd, &one, sizeof(one)); + assert(r == sizeof(one)); + }); +} + +void thread_pool::complete() { + the_reactor.readable(_completion).then([this] { + uint64_t count; + auto r = ::read(_complete_eventfd, &count, sizeof(count)); + assert(r == sizeof(count)); + auto nr = _completed.consume_all([this] (work_item* wi) { + wi->complete(); + delete wi; + }); + _queue_has_room.signal(nr); + complete(); + }); +} + socket_address make_ipv4_address(ipv4_addr addr) { socket_address sa; sa.u.in.sin_family = AF_INET; diff --git a/reactor.hh b/reactor.hh index 21025f5d6e..0da37cb851 100644 --- a/reactor.hh +++ b/reactor.hh @@ -23,6 +23,9 @@ #include #include #include +#include +#include +#include #include "apply.hh" #include "sstring.hh" @@ -345,6 +348,46 @@ public: friend class pollable_fd; }; +class thread_pool { + static constexpr size_t queue_length = 128; + struct work_item; + boost::lockfree::queue _pending; + boost::lockfree::queue _completed; + int _start_eventfd; + int _complete_eventfd; + pollable_fd_state _completion; + semaphore _queue_has_room = { queue_length }; + std::thread _worker_thread; + struct work_item { + virtual ~work_item() {} + virtual void process() = 0; + virtual void complete() = 0; + }; + template + struct work_item_returning : work_item { + promise _promise; + boost::optional _result; + Func _func; + work_item_returning(Func&& func) : _func(std::move(func)) {} + virtual void process() override { _result = _func(); } + virtual void complete() override { _promise.set_value(std::move(*_result)); } + future get_future() { return _promise.get_future(); } + }; +public: + thread_pool(); + template + future submit(Func func) { + auto wi = new work_item_returning(std::move(func)); + auto fut = wi->get_future(); + submit_item(wi); + return fut; + } +private: + void work(); + void complete(); + void submit_item(work_item* wi); +}; + class reactor { static constexpr size_t max_aio = 128; public: @@ -354,6 +397,7 @@ public: io_context_t _io_context; semaphore _io_context_available; std::vector> _pending_tasks; + thread_pool _thread_pool; private: future<> get_epoll_future(pollable_fd_state& fd, promise<> pollable_fd_state::* pr, int event); void complete_epoll_event(pollable_fd_state& fd, promise<> pollable_fd_state::* pr, int events, int event); @@ -400,6 +444,7 @@ private: friend class pollable_fd; friend class pollable_fd_state; friend class file; + friend class thread_pool; }; extern reactor the_reactor;