Add a thread pool to augment kernel support for aio

Some operations cannot be done either non-blockingly or asynchronously, so
add a thread pool to execute them in.

Currently the thread pool has just one thread.
This commit is contained in:
Avi Kivity
2014-08-24 10:34:08 +03:00
parent 7f609a1959
commit ebd10bf349
3 changed files with 95 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ opt = $(opt.$(mode))
libs = -laio
CXXFLAGS = -std=gnu++1y -g -Wall -Werror $(opt) -MD -MT $@ -MP -flto $(sanitize) -fvisibility=hidden $(libs)
CXXFLAGS += -pthread
tests = test-reactor fileiotest

View File

@@ -206,6 +206,55 @@ void reactor::run() {
}
}
thread_pool::thread_pool()
: _pending(queue_length)
, _completed(queue_length)
, _start_eventfd(::eventfd(0, EFD_CLOEXEC))
, _complete_eventfd(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
, _completion(_complete_eventfd)
, _worker_thread([this] { work(); }) {
_worker_thread.detach();
complete();
}
void thread_pool::work() {
while (true) {
uint64_t count;
auto r = ::read(_start_eventfd, &count, sizeof(count));
assert(r == sizeof(count));
auto nr = _pending.consume_all([this] (work_item* wi) {
wi->process();
_completed.push(wi);
});
count = nr;
r = ::write(_complete_eventfd, &count, sizeof(count));
assert(r == sizeof(count));
}
}
void thread_pool::submit_item(thread_pool::work_item* item) {
_queue_has_room.wait().then([this, item] {
_pending.push(item);
uint64_t one = 1;
auto r = ::write(_start_eventfd, &one, sizeof(one));
assert(r == sizeof(one));
});
}
void thread_pool::complete() {
the_reactor.readable(_completion).then([this] {
uint64_t count;
auto r = ::read(_complete_eventfd, &count, sizeof(count));
assert(r == sizeof(count));
auto nr = _completed.consume_all([this] (work_item* wi) {
wi->complete();
delete wi;
});
_queue_has_room.signal(nr);
complete();
});
}
socket_address make_ipv4_address(ipv4_addr addr) {
socket_address sa;
sa.u.in.sin_family = AF_INET;

View File

@@ -23,6 +23,9 @@
#include <vector>
#include <algorithm>
#include <list>
#include <thread>
#include <boost/lockfree/queue.hpp>
#include <boost/optional.hpp>
#include "apply.hh"
#include "sstring.hh"
@@ -345,6 +348,46 @@ public:
friend class pollable_fd;
};
class thread_pool {
static constexpr size_t queue_length = 128;
struct work_item;
boost::lockfree::queue<work_item*> _pending;
boost::lockfree::queue<work_item*> _completed;
int _start_eventfd;
int _complete_eventfd;
pollable_fd_state _completion;
semaphore _queue_has_room = { queue_length };
std::thread _worker_thread;
struct work_item {
virtual ~work_item() {}
virtual void process() = 0;
virtual void complete() = 0;
};
template <typename T, typename Func>
struct work_item_returning : work_item {
promise<T> _promise;
boost::optional<T> _result;
Func _func;
work_item_returning(Func&& func) : _func(std::move(func)) {}
virtual void process() override { _result = _func(); }
virtual void complete() override { _promise.set_value(std::move(*_result)); }
future<T> get_future() { return _promise.get_future(); }
};
public:
thread_pool();
template <typename T, typename Func>
future<T> submit(Func func) {
auto wi = new work_item_returning<T, Func>(std::move(func));
auto fut = wi->get_future();
submit_item(wi);
return fut;
}
private:
void work();
void complete();
void submit_item(work_item* wi);
};
class reactor {
static constexpr size_t max_aio = 128;
public:
@@ -354,6 +397,7 @@ public:
io_context_t _io_context;
semaphore _io_context_available;
std::vector<std::unique_ptr<task>> _pending_tasks;
thread_pool _thread_pool;
private:
future<> get_epoll_future(pollable_fd_state& fd, promise<> pollable_fd_state::* pr, int event);
void complete_epoll_event(pollable_fd_state& fd, promise<> pollable_fd_state::* pr, int events, int event);
@@ -400,6 +444,7 @@ private:
friend class pollable_fd;
friend class pollable_fd_state;
friend class file;
friend class thread_pool;
};
extern reactor the_reactor;