Encapsulate eventfd into classes

Typically one side (read or write) of the eventfd is used within the
framework, and the other side is used by an external process, so two
classes are provided, depending on which side is used in the framework.

The new classes are used with the thread pool.
This commit is contained in:
Avi Kivity
2014-08-24 19:44:52 +03:00
parent 745a420aea
commit 53dcf701b9
2 changed files with 58 additions and 15 deletions

View File

@@ -239,9 +239,8 @@ void reactor::run() {
thread_pool::thread_pool()
: _pending(queue_length)
, _completed(queue_length)
, _start_eventfd(::eventfd(0, EFD_CLOEXEC))
, _complete_eventfd(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
, _completion(_complete_eventfd)
, _start_eventfd(0)
, _complete_eventfd(0)
, _worker_thread([this] { work(); }) {
_worker_thread.detach();
complete();
@@ -250,14 +249,14 @@ thread_pool::thread_pool()
void thread_pool::work() {
while (true) {
uint64_t count;
auto r = ::read(_start_eventfd, &count, sizeof(count));
auto r = ::read(_start_eventfd.get_read_fd(), &count, sizeof(count));
assert(r == sizeof(count));
auto nr = _pending.consume_all([this] (work_item* wi) {
wi->process();
_completed.push(wi);
});
count = nr;
r = ::write(_complete_eventfd, &count, sizeof(count));
r = ::write(_complete_eventfd.get_write_fd(), &count, sizeof(count));
assert(r == sizeof(count));
}
}
@@ -265,17 +264,12 @@ void thread_pool::work() {
void thread_pool::submit_item(thread_pool::work_item* item) {
_queue_has_room.wait().then([this, item] {
_pending.push(item);
uint64_t one = 1;
auto r = ::write(_start_eventfd, &one, sizeof(one));
assert(r == sizeof(one));
_start_eventfd.signal(1);
});
}
void thread_pool::complete() {
the_reactor.readable(_completion).then([this] {
uint64_t count;
auto r = ::read(_complete_eventfd, &count, sizeof(count));
assert(r == sizeof(count));
_complete_eventfd.wait().then([this] (size_t count) {
auto nr = _completed.consume_all([this] (work_item* wi) {
wi->complete();
delete wi;
@@ -285,6 +279,35 @@ void thread_pool::complete() {
});
}
int writeable_eventfd::try_create_eventfd(size_t initial) {
assert(size_t(int(initial)) == initial);
int r = ::eventfd(initial, EFD_CLOEXEC);
throw_system_error_on(r == -1);
return r;
}
void writeable_eventfd::signal(size_t count) {
uint64_t c = count;
auto r = ::write(_fd, &c, sizeof(c));
assert(r == sizeof(c));
}
int readable_eventfd::try_create_eventfd(size_t initial) {
assert(size_t(int(initial)) == initial);
int r = ::eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK);
throw_system_error_on(r == -1);
return r;
}
future<size_t> readable_eventfd::wait() {
return the_reactor.readable(*_fd._s).then([this] {
uint64_t count;
int r = ::read(_fd.get_fd(), &count, sizeof(count));
assert(r == sizeof(count));
return make_ready_future<size_t>(count);
});
}
socket_address make_ipv4_address(ipv4_addr addr) {
socket_address sa;
sa.u.in.sin_family = AF_INET;

View File

@@ -439,14 +439,33 @@ protected:
friend class readable_eventfd;
};
class readable_eventfd {
pollable_fd _fd;
public:
explicit readable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
future<size_t> wait();
int get_write_fd() { return _fd.get_fd(); }
private:
static int try_create_eventfd(size_t initial);
};
class writeable_eventfd {
int _fd;
public:
explicit writeable_eventfd(size_t initial = 0) : _fd(try_create_eventfd(initial)) {}
void signal(size_t nr);
int get_read_fd() { return _fd; }
private:
static int try_create_eventfd(size_t initial);
};
class thread_pool {
static constexpr size_t queue_length = 128;
struct work_item;
boost::lockfree::queue<work_item*> _pending;
boost::lockfree::queue<work_item*> _completed;
int _start_eventfd;
int _complete_eventfd;
pollable_fd_state _completion;
writeable_eventfd _start_eventfd;
readable_eventfd _complete_eventfd;
semaphore _queue_has_room = { queue_length };
std::thread _worker_thread;
struct work_item {
@@ -536,6 +555,7 @@ private:
friend class pollable_fd_state;
friend class file;
friend class thread_pool;
friend class readable_eventfd;
};
extern reactor the_reactor;