mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-23 01:50:35 +00:00
Switch epoll_add_int()/epoll_add_out() to future<void>
This integrates better with the rest of the framework, and allows adding specializations such as likely_readable() easily.
This commit is contained in:
18
reactor.cc
18
reactor.cc
@@ -19,28 +19,28 @@ reactor::~reactor() {
|
||||
::close(_epollfd);
|
||||
}
|
||||
|
||||
void reactor::epoll_add_in(pollable_fd_state& pfd, std::unique_ptr<task> t) {
|
||||
future<void> reactor::readable(pollable_fd_state& pfd) {
|
||||
auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
|
||||
pfd.events |= EPOLLIN;
|
||||
assert(!pfd.pollin);
|
||||
pfd.pollin = std::move(t);
|
||||
pfd.pollin = promise<void>();
|
||||
::epoll_event eevt;
|
||||
eevt.events = pfd.events;
|
||||
eevt.data.ptr = &pfd;
|
||||
int r = ::epoll_ctl(_epollfd, ctl, pfd.fd, &eevt);
|
||||
assert(r == 0);
|
||||
return pfd.pollin.get_future();
|
||||
}
|
||||
|
||||
void reactor::epoll_add_out(pollable_fd_state& pfd, std::unique_ptr<task> t) {
|
||||
future<void> reactor::writeable(pollable_fd_state& pfd) {
|
||||
auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
|
||||
pfd.events |= EPOLLOUT;
|
||||
assert(!pfd.pollout);
|
||||
pfd.pollout = std::move(t);
|
||||
pfd.pollout = promise<void>();
|
||||
::epoll_event eevt;
|
||||
eevt.events = pfd.events;
|
||||
eevt.data.ptr = &pfd;
|
||||
int r = ::epoll_ctl(_epollfd, ctl, pfd.fd, &eevt);
|
||||
assert(r == 0);
|
||||
return pfd.pollout.get_future();
|
||||
}
|
||||
|
||||
void reactor::forget(pollable_fd_state& fd) {
|
||||
@@ -84,11 +84,13 @@ void reactor::run() {
|
||||
std::unique_ptr<task> t_in, t_out;
|
||||
if (events & EPOLLIN) {
|
||||
pfd->events &= ~EPOLLIN;
|
||||
add_task(std::move(pfd->pollin));
|
||||
pfd->pollin.set_value();
|
||||
pfd->pollin = promise<void>();
|
||||
}
|
||||
if (events & EPOLLOUT) {
|
||||
pfd->events &= ~EPOLLOUT;
|
||||
add_task(std::move(pfd->pollout));
|
||||
pfd->pollout.set_value();
|
||||
pfd->pollout = promise<void>();
|
||||
}
|
||||
evt.events = pfd->events;
|
||||
auto op = evt.events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
|
||||
|
||||
24
reactor.hh
24
reactor.hh
@@ -429,8 +429,8 @@ public:
|
||||
io_context_t _io_context;
|
||||
std::vector<std::unique_ptr<task>> _pending_tasks;
|
||||
private:
|
||||
void epoll_add_in(pollable_fd_state& fd, std::unique_ptr<task> t);
|
||||
void epoll_add_out(pollable_fd_state& fd, std::unique_ptr<task> t);
|
||||
future<void> readable(pollable_fd_state& fd);
|
||||
future<void> writeable(pollable_fd_state& fd);
|
||||
void forget(pollable_fd_state& fd);
|
||||
void abort_on_error(int ret);
|
||||
public:
|
||||
@@ -471,8 +471,8 @@ public:
|
||||
void operator=(const pollable_fd_state&) = delete;
|
||||
int fd;
|
||||
int events = 0;
|
||||
std::unique_ptr<task> pollin;
|
||||
std::unique_ptr<task> pollout;
|
||||
promise<void> pollin;
|
||||
promise<void> pollout;
|
||||
friend class reactor;
|
||||
friend class pollable_fd;
|
||||
};
|
||||
@@ -599,13 +599,13 @@ future<accept_result>
|
||||
reactor::accept(pollable_fd_state& listenfd) {
|
||||
promise<accept_result> pr;
|
||||
future<accept_result> fut = pr.get_future();
|
||||
epoll_add_in(listenfd, make_task([this, pr = std::move(pr), lfd = listenfd.fd] () mutable {
|
||||
readable(listenfd).then([this, pr = std::move(pr), lfd = listenfd.fd] () mutable {
|
||||
socket_address sa;
|
||||
socklen_t sl = sizeof(&sa.u.sas);
|
||||
int fd = ::accept4(lfd, &sa.u.sa, &sl, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
||||
assert(fd != -1);
|
||||
pr.set_value(accept_result{pollable_fd(fd), sa});
|
||||
}));
|
||||
});
|
||||
return fut;
|
||||
}
|
||||
|
||||
@@ -614,11 +614,11 @@ future<size_t>
|
||||
reactor::read_some(pollable_fd_state& fd, void* buffer, size_t len) {
|
||||
promise<size_t> pr;
|
||||
auto fut = pr.get_future();
|
||||
epoll_add_in(fd, make_task([pr = std::move(pr), rfd = fd.fd, buffer, len] () mutable {
|
||||
readable(fd).then([pr = std::move(pr), rfd = fd.fd, buffer, len] () mutable {
|
||||
ssize_t r = ::recv(rfd, buffer, len, 0);
|
||||
assert(r != -1);
|
||||
pr.set_value(r);
|
||||
}));
|
||||
});
|
||||
return fut;
|
||||
}
|
||||
|
||||
@@ -627,14 +627,14 @@ future<size_t>
|
||||
reactor::read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) {
|
||||
promise<size_t> pr;
|
||||
auto fut = pr.get_future();
|
||||
epoll_add_in(fd, make_task([pr = std::move(pr), rfd = fd.fd, iov = iov] () mutable {
|
||||
readable(fd).then([pr = std::move(pr), rfd = fd.fd, iov = iov] () mutable {
|
||||
::msghdr mh = {};
|
||||
mh.msg_iov = &iov[0];
|
||||
mh.msg_iovlen = iov.size();
|
||||
ssize_t r = ::recvmsg(rfd, &mh, 0);
|
||||
assert(r != -1);
|
||||
pr.set_value(r);
|
||||
}));
|
||||
});
|
||||
return fut;
|
||||
}
|
||||
|
||||
@@ -643,11 +643,11 @@ future<size_t>
|
||||
reactor::write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
|
||||
promise<size_t> pr;
|
||||
auto fut = pr.get_future();
|
||||
epoll_add_out(fd, make_task([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable {
|
||||
writeable(fd).then([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable {
|
||||
ssize_t r = ::send(sfd, buffer, len, 0);
|
||||
assert(r != -1);
|
||||
pr.set_value(r);
|
||||
}));
|
||||
});
|
||||
return fut;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user