From b10da4fa1d01afe2b08635adc78d68c8634ae799 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 18 Aug 2014 10:56:36 +0300 Subject: [PATCH] Switch epoll_add_int()/epoll_add_out() to future This integrates better with the rest of the framework, and allows adding specializations such as likely_readable() easily. --- reactor.cc | 18 ++++++++++-------- reactor.hh | 24 ++++++++++++------------ 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/reactor.cc b/reactor.cc index db7175b6d6..10ca3a1643 100644 --- a/reactor.cc +++ b/reactor.cc @@ -19,28 +19,28 @@ reactor::~reactor() { ::close(_epollfd); } -void reactor::epoll_add_in(pollable_fd_state& pfd, std::unique_ptr t) { +future reactor::readable(pollable_fd_state& pfd) { auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; pfd.events |= EPOLLIN; - assert(!pfd.pollin); - pfd.pollin = std::move(t); + pfd.pollin = promise(); ::epoll_event eevt; eevt.events = pfd.events; eevt.data.ptr = &pfd; int r = ::epoll_ctl(_epollfd, ctl, pfd.fd, &eevt); assert(r == 0); + return pfd.pollin.get_future(); } -void reactor::epoll_add_out(pollable_fd_state& pfd, std::unique_ptr t) { +future reactor::writeable(pollable_fd_state& pfd) { auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; pfd.events |= EPOLLOUT; - assert(!pfd.pollout); - pfd.pollout = std::move(t); + pfd.pollout = promise(); ::epoll_event eevt; eevt.events = pfd.events; eevt.data.ptr = &pfd; int r = ::epoll_ctl(_epollfd, ctl, pfd.fd, &eevt); assert(r == 0); + return pfd.pollout.get_future(); } void reactor::forget(pollable_fd_state& fd) { @@ -84,11 +84,13 @@ void reactor::run() { std::unique_ptr t_in, t_out; if (events & EPOLLIN) { pfd->events &= ~EPOLLIN; - add_task(std::move(pfd->pollin)); + pfd->pollin.set_value(); + pfd->pollin = promise(); } if (events & EPOLLOUT) { pfd->events &= ~EPOLLOUT; - add_task(std::move(pfd->pollout)); + pfd->pollout.set_value(); + pfd->pollout = promise(); } evt.events = pfd->events; auto op = evt.events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; diff --git a/reactor.hh b/reactor.hh index e9ae6db93f..0282d49d88 100644 --- a/reactor.hh +++ b/reactor.hh @@ -429,8 +429,8 @@ public: io_context_t _io_context; std::vector> _pending_tasks; private: - void epoll_add_in(pollable_fd_state& fd, std::unique_ptr t); - void epoll_add_out(pollable_fd_state& fd, std::unique_ptr t); + future readable(pollable_fd_state& fd); + future writeable(pollable_fd_state& fd); void forget(pollable_fd_state& fd); void abort_on_error(int ret); public: @@ -471,8 +471,8 @@ public: void operator=(const pollable_fd_state&) = delete; int fd; int events = 0; - std::unique_ptr pollin; - std::unique_ptr pollout; + promise pollin; + promise pollout; friend class reactor; friend class pollable_fd; }; @@ -599,13 +599,13 @@ future reactor::accept(pollable_fd_state& listenfd) { promise pr; future fut = pr.get_future(); - epoll_add_in(listenfd, make_task([this, pr = std::move(pr), lfd = listenfd.fd] () mutable { + readable(listenfd).then([this, pr = std::move(pr), lfd = listenfd.fd] () mutable { socket_address sa; socklen_t sl = sizeof(&sa.u.sas); int fd = ::accept4(lfd, &sa.u.sa, &sl, SOCK_NONBLOCK | SOCK_CLOEXEC); assert(fd != -1); pr.set_value(accept_result{pollable_fd(fd), sa}); - })); + }); return fut; } @@ -614,11 +614,11 @@ future reactor::read_some(pollable_fd_state& fd, void* buffer, size_t len) { promise pr; auto fut = pr.get_future(); - epoll_add_in(fd, make_task([pr = std::move(pr), rfd = fd.fd, buffer, len] () mutable { + readable(fd).then([pr = std::move(pr), rfd = fd.fd, buffer, len] () mutable { ssize_t r = ::recv(rfd, buffer, len, 0); assert(r != -1); pr.set_value(r); - })); + }); return fut; } @@ -627,14 +627,14 @@ future reactor::read_some(pollable_fd_state& fd, const std::vector& iov) { promise pr; auto fut = pr.get_future(); - epoll_add_in(fd, make_task([pr = std::move(pr), rfd = fd.fd, iov = iov] () mutable { + readable(fd).then([pr = std::move(pr), rfd = fd.fd, iov = iov] () mutable { ::msghdr mh = {}; mh.msg_iov = &iov[0]; mh.msg_iovlen = iov.size(); ssize_t r = ::recvmsg(rfd, &mh, 0); assert(r != -1); pr.set_value(r); - })); + }); return fut; } @@ -643,11 +643,11 @@ future reactor::write_some(pollable_fd_state& fd, const void* buffer, size_t len) { promise pr; auto fut = pr.get_future(); - epoll_add_out(fd, make_task([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable { + writeable(fd).then([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable { ssize_t r = ::send(sfd, buffer, len, 0); assert(r != -1); pr.set_value(r); - })); + }); return fut; }