commit 193ac5919df51df2366604fc5cbfd73d15bb52f5 Author: Avi Kivity Date: Sun Aug 10 08:56:33 2014 +0300 Initial commit diff --git a/Makefile b/Makefile new file mode 100644 index 0000000000..16ff470ba6 --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ + + +CXXFLAGS = -std=gnu++1y -g -Wall -O2 -MD -MT $@ -MP -flto + +tests = test-reactor + +all: seastar $(tests) + +seastar: main.o reactor.o + $(CXX) $(CXXFLAGS) -o $@ $^ + +test-reactor: test-reactor.o reactor.o + $(CXX) $(CXXFLAGS) -o $@ $^ + + +-include *.d diff --git a/main.cc b/main.cc new file mode 100644 index 0000000000..e79623840d --- /dev/null +++ b/main.cc @@ -0,0 +1,18 @@ +/* + * main.cc + * + * Created on: Aug 1, 2014 + * Author: avi + */ + + +#include "reactor.hh" + +int main(int ac, char** av) +{ + reactor r; + r.run(); + return 0; +} + + diff --git a/reactor.cc b/reactor.cc new file mode 100644 index 0000000000..295df005c0 --- /dev/null +++ b/reactor.cc @@ -0,0 +1,85 @@ +/* + * reactor.cc + * + * Created on: Aug 1, 2014 + * Author: avi + */ + +#include "reactor.hh" +#include +#include +#include + +reactor::reactor() + : _epollfd(epoll_create1(EPOLL_CLOEXEC)) { + assert(_epollfd != -1); +} + +reactor::~reactor() { + ::close(_epollfd); +} + +void reactor::epoll_add_in(pollable_fd& pfd, std::unique_ptr t) { + auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + pfd.events |= EPOLLIN; + assert(!pfd.pollin); + pfd.pollin = std::move(t); + ::epoll_event eevt; + eevt.events = pfd.events; + eevt.data.ptr = &pfd; + int r = ::epoll_ctl(_epollfd, ctl, pfd.fd, &eevt); + assert(r == 0); +} + +void reactor::epoll_add_out(pollable_fd& pfd, std::unique_ptr t) { + auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + pfd.events |= EPOLLOUT; + assert(!pfd.pollout); + pfd.pollout = std::move(t); + ::epoll_event eevt; + eevt.events = pfd.events; + eevt.data.ptr = &pfd; + int r = ::epoll_ctl(_epollfd, ctl, pfd.fd, &eevt); + assert(r == 0); +} + +std::unique_ptr +reactor::listen(socket_address sa) +{ + int fd = ::socket(sa.u.sa.sa_family, SOCK_STREAM, SOCK_NONBLOCK | SOCK_CLOEXEC); + assert(fd != -1); + int r = ::bind(fd, &sa.u.sa, sizeof(sa.u.sas)); + assert(r != -1); + ::listen(fd, 100); + return std::unique_ptr(new pollable_fd(fd)); +} + +void reactor::run() { + while (true) { + std::array eevt; + int nr = ::epoll_wait(_epollfd, eevt.data(), eevt.size(), -1); + assert(nr != -1); + for (int i = 0; i < nr; ++i) { + auto& evt = eevt[i]; + auto pfd = reinterpret_cast(evt.data.ptr); + auto events = evt.events; + if (events & EPOLLIN) { + auto t = std::move(pfd->pollin); + t->run(); + } + if (events & EPOLLOUT) { + auto t = std::move(pfd->pollout); + t->run(); + } + } + } +} + +socket_address make_ipv4_address(ipv4_addr addr) { + socket_address sa; + sa.u.in.sin_family = AF_INET; + sa.u.in.sin_port = htons(addr.port); + std::memcpy(&sa.u.in.sin_addr, addr.host, 4); + return sa; +} + diff --git a/reactor.hh b/reactor.hh new file mode 100644 index 0000000000..47ee018b64 --- /dev/null +++ b/reactor.hh @@ -0,0 +1,168 @@ +/* + * reactor.hh + * + * Created on: Aug 1, 2014 + * Author: avi + */ + +#ifndef REACTOR_HH_ +#define REACTOR_HH_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class socket_address; +class reactor; +class pollable_fd; + +struct ipv4_addr { + uint8_t host[4]; + uint16_t port; +}; + +socket_address make_ipv4_address(ipv4_addr addr); + +class socket_address { +private: + union { + ::sockaddr_storage sas; + ::sockaddr sa; + ::sockaddr_in in; + } u; + friend socket_address make_ipv4_address(ipv4_addr addr); + friend class reactor; +}; + +template +class promise; + +template +struct future_state { + virtual ~future_state(); + promise* promise; + bool value_valid = false; + bool ex_valid = false; + union { + T value; + std::exception_ptr ex; + } u; + void set(const T& value); + void set(T&& value); + void set_exception(std::exception_ptr ex); + T get() { + while (promise) { + promise->wait(); + } + if (ex) { + std::rethrow_exception(ex); + } + return std::move(u.value); + } +}; + +template +class future { + std::unique_ptr> _state; +public: + T get() { + return _state.get(); + } + template + void then(Func func) { + + } +}; + +class reactor { + class task; +public: + int _epollfd; + io_context_t _io_context; +private: + class task { + public: + virtual ~task() {} + virtual void run() = 0; + }; + template + class lambda_task : public task { + Func _func; + public: + lambda_task(Func func) : _func(func) {} + virtual void run() { _func(); } + }; + + template + std::unique_ptr + make_task(Func func) { + return std::make_unique>(func); + } + + void epoll_add_in(pollable_fd& fd, std::unique_ptr t); + void epoll_add_out(pollable_fd& fd, std::unique_ptr t); + void abort_on_error(int ret); +public: + reactor(); + ~reactor(); + + std::unique_ptr listen(socket_address sa); + + template + void accept(pollable_fd& listenfd, Func with_pfd_sockaddr); + + future> accept(pollable_fd& listen_fd) + + future read_some(pollable_fd& fd, void* buffer, size_t size); + template + void read_some(pollable_fd& fd, void* buffer, size_t len, Func with_len); + + void run(); + + friend class pollable_fd; +}; + +class pollable_fd { +protected: + explicit pollable_fd(int fd) : fd(fd) {} + pollable_fd(const pollable_fd&) = delete; + void operator=(const pollable_fd&) = delete; + int fd; + int events = 0; + std::unique_ptr pollin; + std::unique_ptr pollout; + friend class reactor; +}; + +template +inline +void reactor::accept(pollable_fd& listenfd, Func with_pfd_sockaddr) { + auto lfd = listenfd.fd; + epoll_add_in(listenfd, make_task([=] { + socket_address sa; + socklen_t sl = sizeof(&sa.u.sas); + int fd = ::accept4(lfd, &sa.u.sa, &sl, SOCK_NONBLOCK | SOCK_CLOEXEC); + assert(fd != -1); + auto pfd = std::unique_ptr(new pollable_fd(fd)); + with_pfd_sockaddr(std::move(pfd), sa); + })); +} + +template +void reactor::read_some(pollable_fd& fd, void* buffer, size_t len, Func with_len) { + auto rfd = fd.fd; + epoll_add_in(fd, make_task([=] { + ssize_t r = ::recv(rfd, buffer, len, 0); + assert(r != -1); + with_len(len); + })); +} + + +#endif /* REACTOR_HH_ */ diff --git a/sstring.hh b/sstring.hh new file mode 100644 index 0000000000..f20197f8c2 --- /dev/null +++ b/sstring.hh @@ -0,0 +1,124 @@ +/* + * sstring.hh + * + * Created on: Jul 31, 2014 + * Author: avi + */ + +#ifndef SSTRING_HH_ +#define SSTRING_HH_ + +#include +#include +#include +#include +#include + +template +class basic_sstring { + union { + struct external_type { + char* str; + size_type size; + } external; + struct internal_type { + char str[max_size - 1]; + int8_t size; + } internal; + static_assert(sizeof(external_type) < sizeof(internal_type), "max_size too small"); + static_assert(max_size <= 127, "max_size too large"); + } u; + bool is_internal() const noexcept { + return u.internal.size >= 0; + } + bool is_external() const noexcept { + return !is_internal(); + } + const char* str() const { + return is_internal() ? u.internal.str : u.external.str; + } + char* str() { + return is_internal() ? u.internal.str : u.external.str; + } +public: + basic_sstring() noexcept { + u.internal.size = 0; + u.internal.str[0] = '\0'; + } + basic_sstring(const basic_sstring& x) { + if (x.is_internal()) { + u.internal = x.u.internal; + } else { + u.external.str = new char[x.u.external.size + 1]; + std::copy(x.u.str, x.u.str + x.u.extenal.size + 1, u.external.str); + u.external.size = x.u.external.size; + } + } + basic_sstring(basic_sstring&& x) noexcept { + u = x.u; + x.u.internal.size = 0; + x.u.internal.str[0] = '\0'; + } + basic_sstring(const char* x, size_t len) { + if (size_type(size) != size) { + throw std::overflow_error("sstring overflow"); + } + if (size + 1 <= sizeof(u.internal.str)) { + std::copy(x, x + size + 1, u.internal.str); + u.internal.size = size; + } else { + u.internal.size = -1; + u.external.str = new char[size + 1]; + u.external.size = size; + std::copy(x, x + size + 1, u.external.str); + } + } + basic_sstring(const char* x) : basic_sstring(x, std::strlen(x)) {} + basic_sstring(std::string& x) : basic_sstring(x.c_str(), x.size()) {} + ~basic_sstring() noexcept { + if (!is_external()) { + delete[] u.external.str; + } + } + basic_sstring& operator=(const basic_sstring& x) { + basic_sstring tmp(x); + swap(tmp); + } + basic_sstring& operator=(basic_sstring&& x) noexcept { + reset(); + swap(x); + } + operator std::string() const { + return str(); + } + size_t size() const noexcept { + return is_internal() ? u.internal.size : u.external.size; + } + bool empty() const noexcept { + return u.internal.size == 0; + } + void reset() noexcept { + if (is_external()) { + delete u.external.str; + } + u.internal.size = 0; + } + void swap(basic_sstring& x) noexcept { + basic_sstring tmp; + tmp.u = x.u; + x.u = u; + u = tmp.u; + } +}; + +template +inline +void swap(basic_sstring& x, + basic_sstring& y) noexcept +{ + return x.swap(y); +} + +using sstring = basic_sstring; + +#endif /* SSTRING_HH_ */ diff --git a/test-reactor.cc b/test-reactor.cc new file mode 100644 index 0000000000..85aad2ba71 --- /dev/null +++ b/test-reactor.cc @@ -0,0 +1,29 @@ +/* + * test-reactor.cc + * + * Created on: Aug 2, 2014 + * Author: avi + */ + +#include "reactor.hh" +#include + +struct test { + reactor r; + std::unique_ptr listener; + void on_accept(std::unique_ptr pfd, socket_address sa) { + std::cout << "got connection\n"; + r.accept(*listener, [=] (std::unique_ptr pfd, socket_address sa) { + on_accept(std::move(pfd), sa); + }); + } + +int main(int ac, char** av) +{ + test t; + ipv4_addr addr{{}, 10000}; + t.listener = r.listen(make_ipv4_address(addr)); + r.accept(*listener, [&] + r.run(); +} +