Files
scylladb/core/reactor.hh
Gleb Natapov 6ad9114c0b reactor: add at_destroy() function to the reactor and use it
Unfortunately, at_exit() cannot be used to delete objects: when it runs,
the reactor is still active and a deleted object may still be in use.
We need another API that runs its tasks after the reactor has already stopped.
at_destroy() will be that API.
2014-12-30 15:21:10 +02:00

1484 lines
47 KiB
C++

/*
* Copyright 2014 Cloudius Systems
*/
#ifndef REACTOR_HH_
#define REACTOR_HH_
#include <memory>
#include <type_traits>
#include <libaio.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unordered_map>
#include <netinet/ip.h>
#include <cstring>
#include <cassert>
#include <stdexcept>
#include <iostream>
#include <unistd.h>
#include <vector>
#include <queue>
#include <algorithm>
#include <thread>
#include <system_error>
#include <chrono>
#include <ratio>
#include <atomic>
#include <experimental/optional>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/program_options.hpp>
#include "util/eclipse.hh"
#include "future.hh"
#include "posix.hh"
#include "apply.hh"
#include "sstring.hh"
#include "timer-set.hh"
#include "deleter.hh"
#include "net/api.hh"
#include "temporary_buffer.hh"
#include "circular_buffer.hh"
#include "file.hh"
#include "semaphore.hh"
#include "core/scattered_message.hh"
#ifdef HAVE_OSV
#include <osv/newpoll.hh>
#endif
class reactor;
class pollable_fd;
class pollable_fd_state;
class lowres_clock;
template <typename CharType>
class input_stream;
template <typename CharType>
class output_stream;
// Deleter for std::unique_ptr that releases memory obtained from the C
// allocator (malloc(), posix_memalign(), ...) with ::free().
struct free_deleter {
    void operator()(void* p) { ::free(p); }
};

// Allocates `size` bytes aligned to `align` with posix_memalign() and returns
// them owned by a unique_ptr that releases the memory with ::free().
//
// CharType must be a one-byte type (enforced at compile time).  `align` has
// to satisfy posix_memalign()'s requirements.
//
// Throws std::system_error carrying the error code returned by
// posix_memalign() when the allocation fails.  The failure used to be checked
// with assert() only, so a build with NDEBUG would wrap an uninitialized
// pointer and hand it to the caller.
template <typename CharType>
inline
std::unique_ptr<CharType[], free_deleter> allocate_aligned_buffer(size_t size, size_t align) {
    static_assert(sizeof(CharType) == 1, "must allocate byte type");
    void* ret = nullptr;
    auto r = posix_memalign(&ret, align, size);
    if (r != 0) {
        // posix_memalign() reports failure through its return value, not errno.
        throw std::system_error(r, std::system_category(), "posix_memalign");
    }
    return std::unique_ptr<CharType[], free_deleter>(reinterpret_cast<CharType*>(ret));
}
using clock_type = std::chrono::high_resolution_clock;
// A timer bound to the clock type `Clock`.  Instances are linked into a
// timer_set through the intrusive hook `_link`; the reactor and the timer_set
// are friends so that they can manipulate the bookkeeping flags directly.
// Only armed() is defined here; every other member function is defined
// outside this class body.
template <typename Clock = std::chrono::high_resolution_clock>
class timer {
public:
typedef typename Clock::time_point time_point;
typedef typename Clock::duration duration;
typedef Clock clock;
private:
using callback_t = std::function<void()>;
// Intrusive list hook used by timer_set<timer, &timer::_link>.
boost::intrusive::list_member_hook<> _link;
// Invoked by reactor::complete_timers() when the timer fires while armed.
callback_t _callback;
time_point _expiry;
// When engaged, reactor::complete_timers() re-arms the timer with
// arm_periodic(*_period) before invoking the callback.
boost::optional<duration> _period;
// State flags; reactor::complete_timers() clears _armed and _queued and sets
// _expired as the timer moves through the expired list.
bool _armed = false;
bool _queued = false;
bool _expired = false;
public:
~timer();
future<> expired();
void set_callback(callback_t&& callback);
void arm(time_point until, boost::optional<duration> period = {});
void rearm(time_point until, boost::optional<duration> period = {});
void arm(duration delta);
void arm_periodic(duration delta);
// True while the timer is armed.
bool armed() const { return _armed; }
bool cancel();
time_point get_timeout();
friend class reactor;
friend class timer_set<timer, &timer::_link>;
};
// A coarse clock whose current value is kept in a static atomic counter and
// read with a relaxed load, so now() never performs a system call.  The
// counter is advanced by update(), which is defined outside this class body.
class lowres_clock {
public:
typedef int64_t rep;
// The lowres_clock's resolution is 10ms. However, to make it easier to
// do calculations with std::chrono::milliseconds, we set the clock's
// period to 1ms instead of 10ms.
typedef std::ratio<1, 1000> period;
typedef std::chrono::duration<rep, period> duration;
typedef std::chrono::time_point<lowres_clock, duration> time_point;
lowres_clock();
// Returns the most recently published tick count as a time_point.
static time_point now() {
auto nr = _now.load(std::memory_order_relaxed);
return time_point(duration(nr));
}
private:
static void update();
// _now is updated by cpu0 and read by other cpus. Keep _now on its own
// cache line to avoid false sharing.
static std::atomic<rep> _now [[gnu::aligned(64)]];
// High resolution timer to drive this low resolution clock
static timer<> _timer [[gnu::aligned(64)]];
// High resolution timer expires every 10 milliseconds
static constexpr std::chrono::milliseconds _granularity{10};
};
// Per-descriptor polling state: the descriptor itself, the event masks that
// track what has been requested, installed in epoll and reported, and one
// promise per direction.  Not copyable.  The destructor is defined later in
// this header.
class pollable_fd_state {
public:
    // Initial guess of the epoll events assumed to be available on a
    // descriptor; it seeds events_known.
    struct speculation {
        int events = 0;
        explicit speculation(int epoll_events_guessed = 0)
                : events(epoll_events_guessed) {}
    };

    ~pollable_fd_state();

    explicit pollable_fd_state(file_desc fd, speculation speculate = speculation())
            : fd(std::move(fd))
            , events_known(speculate.events) {}

    pollable_fd_state(const pollable_fd_state&) = delete;
    void operator=(const pollable_fd_state&) = delete;

    // Adds `events` to the set of events believed to be ready.
    void speculate_epoll(int events) { events_known |= events; }

    file_desc fd;
    int events_requested = 0; // wanted by pollin/pollout promises
    int events_epoll = 0;     // installed in epoll
    int events_known = 0;     // returned from epoll
    promise<> pollin;
    promise<> pollout;

    friend class reactor;
    friend class pollable_fd;
};
// Returns the sum of the iov_len fields of all entries in `iov`
// (0 for an empty vector).
inline
size_t iovec_len(const std::vector<iovec>& iov)
{
    size_t total = 0;
    for (auto it = iov.begin(); it != iov.end(); ++it) {
        total += it->iov_len;
    }
    return total;
}
// Move-only handle that owns a heap-allocated pollable_fd_state.  The I/O
// member functions are only declared here and return futures; their
// definitions are outside this class body.
class pollable_fd {
public:
using speculation = pollable_fd_state::speculation;
// Owning pointer to the polling state; close() resets it.
std::unique_ptr<pollable_fd_state> _s;
// Takes ownership of `fd`; `speculate` seeds the state's known events.
pollable_fd(file_desc fd, speculation speculate = speculation())
: _s(std::make_unique<pollable_fd_state>(std::move(fd), speculate)) {}
public:
pollable_fd(pollable_fd&&) = default;
pollable_fd& operator=(pollable_fd&&) = default;
future<size_t> read_some(char* buffer, size_t size);
future<size_t> read_some(uint8_t* buffer, size_t size);
future<size_t> read_some(const std::vector<iovec>& iov);
future<> write_all(const char* buffer, size_t size);
future<> write_all(const uint8_t* buffer, size_t size);
future<size_t> write_some(net::packet& p);
future<> write_all(net::packet& p);
future<pollable_fd, socket_address> accept();
future<size_t> sendmsg(struct msghdr *msg);
future<size_t> recvmsg(struct msghdr *msg);
future<size_t> sendto(socket_address addr, const void* buf, size_t len);
// Returns a mutable reference to the owned descriptor even through a const
// handle, because the state lives behind the unique_ptr.
file_desc& get_file_desc() const { return _s->fd; }
// Destroys the owned state; _s is null afterwards, so the accessors that
// dereference it must not be called again.
void close() { _s.reset(); }
protected:
// Raw descriptor number of the owned file_desc.
int get_fd() const { return _s->fd.get(); }
friend class reactor;
friend class readable_eventfd;
friend class writeable_eventfd;
};
// Abstract interface behind connected_socket: concrete implementations
// provide the input and output streams of one connection.
class connected_socket_impl {
public:
    virtual ~connected_socket_impl() = default;

    // Returns an input_stream<char> for this socket.
    virtual input_stream<char> input() = 0;

    // Returns an output_stream<char> for this socket.
    virtual output_stream<char> output() = 0;
};
// Value-type handle that owns a connected_socket_impl.  input() and output()
// are defined outside this class body.
class connected_socket {
    std::unique_ptr<connected_socket_impl> _csi;

public:
    // Takes ownership of the implementation object.
    explicit connected_socket(std::unique_ptr<connected_socket_impl> csi)
            : _csi(std::move(csi)) {
    }

    input_stream<char> input();
    output_stream<char> output();
};
// Abstract interface behind server_socket.
class server_socket_impl {
public:
    virtual ~server_socket_impl() = default;

    // Returns a future that carries the accepted connection together with a
    // socket_address.
    virtual future<connected_socket, socket_address> accept() = 0;
};
namespace std {

// Hash support so that ::sockaddr_in can serve as a key in unordered
// containers.  Only the port and the IPv4 address contribute; they are
// combined with XOR.
template <>
struct hash<::sockaddr_in> {
    size_t operator()(::sockaddr_in a) const {
        const auto port = a.sin_port;
        const auto addr = a.sin_addr.s_addr;
        return port ^ addr;
    }
};

}
bool operator==(const ::sockaddr_in a, const ::sockaddr_in b);
// Value-type handle that owns a server_socket_impl and forwards accept()
// to it.
class server_socket {
    std::unique_ptr<server_socket_impl> _ssi;

public:
    // Takes ownership of the implementation object.
    explicit server_socket(std::unique_ptr<server_socket_impl> ssi)
            : _ssi(std::move(ssi)) {
    }

    // Delegates to the implementation's accept().
    future<connected_socket, socket_address> accept() {
        auto& impl = *_ssi;
        return impl.accept();
    }
};
// Abstract interface of a network stack: creates listening sockets and UDP
// channels.
class network_stack {
public:
    virtual ~network_stack() = default;

    virtual server_socket listen(socket_address sa, listen_options opts) = 0;

    virtual net::udp_channel make_udp_channel(ipv4_addr addr = {}) = 0;

    // Default implementation has nothing to set up and returns a ready future.
    virtual future<> initialize() {
        return make_ready_future();
    }

    virtual bool has_per_core_namespace() = 0;
};
// Static registry that maps a stack name to a factory returning
// future<std::unique_ptr<network_stack>>.  All state lives in function-local
// statics, so it is constructed on first use.
class network_stack_registry {
public:
using options = boost::program_options::variables_map;
private:
// Name -> factory map, created on first call.
static std::unordered_map<sstring,
std::function<future<std::unique_ptr<network_stack>> (options opts)>>& _map() {
static std::unordered_map<sstring,
std::function<future<std::unique_ptr<network_stack>> (options opts)>> map;
return map;
}
// Storage for the name of the default stack, created on first call.
static sstring& _default() {
static sstring def;
return def;
}
public:
// Shared options_description object, created on first call.
static boost::program_options::options_description& options_description() {
static boost::program_options::options_description opts;
return opts;
}
// The functions below are defined outside this class body.
static void register_stack(sstring name,
boost::program_options::options_description opts,
std::function<future<std::unique_ptr<network_stack>> (options opts)> create,
bool make_default = false);
static sstring default_stack();
static std::vector<sstring> list();
static future<std::unique_ptr<network_stack>> create(options opts);
static future<std::unique_ptr<network_stack>> create(sstring name, options opts);
};
// Helper whose constructor registers a network stack with
// network_stack_registry.
class network_stack_registrator {
public:
    using options = boost::program_options::variables_map;

    // Forwards all arguments to network_stack_registry::register_stack().
    //
    // name         - name under which the stack is registered
    // opts         - option descriptions of the stack
    // factory      - callable that creates the stack from parsed options
    // make_default - passed through unchanged to register_stack()
    //
    // The by-value parameters are sinks, so they are moved into
    // register_stack() (which also takes them by value) instead of being
    // copied a second time.
    explicit network_stack_registrator(sstring name,
            boost::program_options::options_description opts,
            std::function<future<std::unique_ptr<network_stack>> (options opts)> factory,
            bool make_default = false) {
        network_stack_registry::register_stack(std::move(name), std::move(opts),
                std::move(factory), make_default);
    }
};
class writeable_eventfd;
// The waiting side of an eventfd, wrapped in a pollable_fd.  wait(),
// write_side() and try_create_eventfd() are defined outside this class body.
class readable_eventfd {
    pollable_fd _fd;

public:
    // Creates a new eventfd through try_create_eventfd(initial).
    explicit readable_eventfd(size_t initial = 0)
            : _fd(try_create_eventfd(initial)) {
    }

    readable_eventfd(readable_eventfd&&) = default;

    writeable_eventfd write_side();
    future<size_t> wait();

    // Returns the descriptor number held by the wrapped pollable_fd.
    int get_write_fd() {
        return _fd.get_fd();
    }

private:
    // Adopts an already existing descriptor.
    explicit readable_eventfd(file_desc&& fd)
            : _fd(std::move(fd)) {
    }

    static file_desc try_create_eventfd(size_t initial);

    friend class writeable_eventfd;
};
// The signalling side of an eventfd, held as a plain file_desc.  signal(),
// read_side() and try_create_eventfd() are defined outside this class body.
class writeable_eventfd {
    file_desc _fd;

public:
    // Creates a new eventfd through try_create_eventfd(initial).
    explicit writeable_eventfd(size_t initial = 0)
            : _fd(try_create_eventfd(initial)) {
    }

    writeable_eventfd(writeable_eventfd&&) = default;

    readable_eventfd read_side();
    void signal(size_t nr);

    // Returns the descriptor number held by this object.
    int get_read_fd() {
        return _fd.get();
    }

private:
    // Adopts an already existing descriptor.
    explicit writeable_eventfd(file_desc&& fd)
            : _fd(std::move(fd)) {
    }

    static file_desc try_create_eventfd(size_t initial);

    friend class readable_eventfd;
};
// reactor_notifier is a reduced form of Linux's eventfd interface: semaphore
// behavior is off and signal() always signals exactly 1.
//
// signal() makes an ongoing wait() invoke its continuation.  When no wait()
// is in progress, the next wait() continues immediately.
class reactor_notifier {
public:
    virtual ~reactor_notifier() = default;

    virtual future<> wait() = 0;
    virtual void signal() = 0;
};
class thread_pool;
class smp;
// Queue used to hand work items to another thread (thread_pool is a friend)
// and to receive them back once processed.  Both directions use bounded
// single-producer/single-consumer lock-free queues of raw work_item pointers.
class syscall_work_queue {
static constexpr size_t queue_length = 128;
struct work_item;
using lf_queue = boost::lockfree::spsc_queue<work_item*,
boost::lockfree::capacity<queue_length>>;
// Items submitted and not yet processed.
lf_queue _pending;
// Items processed and waiting for complete() to be called.
lf_queue _completed;
writeable_eventfd _start_eventfd;
// Initialised with queue_length units, matching the capacity of the queues.
semaphore _queue_has_room = { queue_length };
// Type-erased unit of work: process() does the work, complete() publishes
// the outcome.
struct work_item {
virtual ~work_item() {}
virtual void process() = 0;
virtual void complete() = 0;
};
// Work item that runs `Func` and delivers its T result through a promise.
template <typename T, typename Func>
struct work_item_returning : work_item {
Func _func;
promise<T> _promise;
boost::optional<T> _result;
work_item_returning(Func&& func) : _func(std::move(func)) {}
// Stores the function's return value in _result.
virtual void process() override { _result = this->_func(); }
// Moves the stored result into the promise; dereferences _result, so
// process() must have run first.
virtual void complete() override { _promise.set_value(std::move(*_result)); }
future<T> get_future() { return _promise.get_future(); }
};
public:
syscall_work_queue();
// Wraps `func` in a heap-allocated work item, passes the raw pointer to
// submit_item() and returns the future for the result.  The item is not
// freed here; whoever consumes it after submit_item() is responsible.
template <typename T, typename Func>
future<T> submit(Func func) {
auto wi = new work_item_returning<T, Func>(std::move(func));
auto fut = wi->get_future();
submit_item(wi);
return fut;
}
private:
void work();
void complete();
void submit_item(work_item* wi);
friend class thread_pool;
};
// Message queue between two reactors (class smp is a friend and indexes an
// array of these).  Requests travel through _pending, responses through
// _completed; both are bounded single-producer/single-consumer lock-free
// queues of raw work_item pointers.
class smp_message_queue {
static constexpr size_t queue_length = 128;
static constexpr size_t batch_size = 16;
struct work_item;
using lf_queue = boost::lockfree::spsc_queue<work_item*,
boost::lockfree::capacity<queue_length>>;
lf_queue _pending;
lf_queue _completed;
size_t _current_queue_length = 0;
reactor* _pending_peer;
reactor* _complete_peer;
// Type-erased request: process() starts the work and returns a future for
// its completion, complete() publishes the outcome.
struct work_item {
virtual ~work_item() {}
virtual future<> process() = 0;
virtual void complete() = 0;
};
// Request that runs `Func` (which returns `Future`) and relays either its
// value or its exception to the promise handed out by get_future().
template <typename Func, typename Future>
struct async_work_item : work_item {
smp_message_queue& _q;
Func _func;
using value_type = typename Future::value_type;
std::experimental::optional<value_type> _result;
std::exception_ptr _ex; // if !_result
typename Future::promise_type _promise; // used on local side
async_work_item(smp_message_queue& q, Func&& func) : _q(q), _func(std::move(func)) {}
// Invokes _func and records its outcome.  The inner try/catch captures a
// failure delivered through the returned future; the outer one captures an
// exception thrown by the call to _func itself, in which case a ready
// future is returned so the item still completes.
virtual future<> process() override {
try {
return this->_func().rescue([this] (auto&& get_result) {
try {
_result = get_result();
} catch (...) {
_ex = std::current_exception();
}
});
} catch (...) {
_ex = std::current_exception();
return make_ready_future();
}
}
// Fulfils the promise with the stored value if there is one, otherwise
// with the stored exception.
virtual void complete() override {
if (_result) {
_promise.set_value(std::move(*_result));
} else {
// FIXME: _ex was allocated on another cpu
_promise.set_exception(std::move(_ex));
}
}
Future get_future() { return _promise.get_future(); }
};
// Union wrapper with empty constructor and destructor: the member `a` is
// constructed only when init() is called (placement new) and is never
// destroyed by ~tx_side().
union tx_side {
tx_side() {}
~tx_side() {}
void init() { new (&a) aa; }
struct aa {
std::deque<work_item*> pending_fifo;
} a;
} _tx;
std::vector<work_item*> _completed_fifo;
public:
smp_message_queue();
// Wraps `func` in a heap-allocated async_work_item, passes the raw pointer
// to submit_item() and returns the future for the result.  The item is not
// freed here.
template <typename Func>
std::result_of_t<Func()> submit(Func func) {
using future = std::result_of_t<Func()>;
auto wi = new async_work_item<Func, future>(*this, std::move(func));
auto fut = wi->get_future();
submit_item(wi);
return fut;
}
void start();
// Both return a count; smp::poll_queues() adds the counts up to decide
// whether any work was done.
size_t process_incoming();
size_t process_completions();
private:
void work();
void submit_item(work_item* wi);
void respond(work_item* wi);
void move_pending();
void flush_request_batch();
void flush_response_batch();
friend class smp;
};
// Runs submitted functions through a syscall_work_queue.  On OSv builds
// (HAVE_OSV defined) the pool is not implemented: submit() prints a message
// and aborts.
class thread_pool {
#ifndef HAVE_OSV
// FIXME: implement using reactor_notifier abstraction we used for SMP
syscall_work_queue inter_thread_wq;
posix_thread _worker_thread;
std::atomic<bool> _stopped = { false };
pthread_t _notify;
public:
thread_pool();
~thread_pool();
// Forwards `func` to the work queue and returns the future for its result.
template <typename T, typename Func>
future<T> submit(Func func) {return inter_thread_wq.submit<T>(std::move(func));}
#else
public:
// Not available on OSv: reports the fact on stdout and aborts the process.
template <typename T, typename Func>
future<T> submit(Func func) { std::cout << "thread_pool not yet implemented on osv\n"; abort(); }
#endif
private:
void work();
};
// The "reactor_backend" interface provides a method of waiting for various
// basic events on one thread. We have one implementation based on epoll and
// file-descriptors (reactor_backend_epoll) and one implementation based on
// OSv-specific file-descriptor-less mechanisms (reactor_backend_osv).
class reactor_backend {
public:
virtual ~reactor_backend() {};
// wait_and_process() waits for some events to become available, and
// processes one or more of them.
// NOTE(review): this comment used to describe a `block` flag and a
// pre_process() hook; neither exists in the signature below, so whether and
// how long the call may block is up to the implementation - confirm there.
virtual void wait_and_process() = 0;
// Methods that allow polling on file descriptors. This will only work on
// reactor_backend_epoll. Other reactor_backend will probably abort if
// they are called (which is fine if no file descriptors are waited on):
virtual future<> readable(pollable_fd_state& fd) = 0;
virtual future<> writeable(pollable_fd_state& fd) = 0;
virtual void forget(pollable_fd_state& fd) = 0;
// Methods that allow polling on a reactor_notifier. This is currently
// used only for reactor_backend_osv, but in the future it should really
// replace the above functions.
virtual future<> notified(reactor_notifier *n) = 0;
// Methods for sending notification events between threads.
virtual std::unique_ptr<reactor_notifier> make_reactor_notifier() = 0;
};
// Linux reactor backend built on file descriptors and epoll.  It can wait on
// several file descriptors at once; other event sources (timers, signals,
// inter-thread notifications) are turned into file descriptors through
// timerfd, signalfd and eventfd respectively.
class reactor_backend_epoll : public reactor_backend {
private:
    file_desc _epollfd;

    // Helpers shared by the readable/writeable paths; `pr` selects which
    // promise member of the fd state is involved.
    future<> get_epoll_future(pollable_fd_state& fd,
            promise<> pollable_fd_state::* pr, int event);
    void complete_epoll_event(pollable_fd_state& fd,
            promise<> pollable_fd_state::* pr, int events, int event);

public:
    reactor_backend_epoll();
    virtual ~reactor_backend_epoll() override { }

    // reactor_backend interface; definitions live outside this class body.
    virtual void wait_and_process() override;
    virtual future<> readable(pollable_fd_state& fd) override;
    virtual future<> writeable(pollable_fd_state& fd) override;
    virtual void forget(pollable_fd_state& fd) override;
    virtual future<> notified(reactor_notifier *n) override;
    virtual std::unique_ptr<reactor_notifier> make_reactor_notifier() override;
};
#ifdef HAVE_OSV
// reactor_backend using OSv-specific features, without any file descriptors.
// This implementation cannot currently wait on file descriptors, but unlike
// reactor_backend_epoll it doesn't need file descriptors for waiting on a
// timer, for example, so file descriptors are not necessary.
class reactor_notifier_osv;
class reactor_backend_osv : public reactor_backend {
private:
// OSv poller object this backend is built around.
osv::newpoll::poller _poller;
future<> get_poller_future(reactor_notifier_osv *n);
promise<> _timer_promise;
public:
reactor_backend_osv();
virtual ~reactor_backend_osv() override { }
// reactor_backend interface; definitions live outside this class body.
virtual void wait_and_process() override;
virtual future<> readable(pollable_fd_state& fd) override;
virtual future<> writeable(pollable_fd_state& fd) override;
virtual void forget(pollable_fd_state& fd) override;
virtual future<> notified(reactor_notifier *n) override;
virtual std::unique_ptr<reactor_notifier> make_reactor_notifier() override;
friend class reactor_notifier_osv;
};
#endif /* HAVE_OSV */
// The event loop object.  One instance exists per thread (see the
// thread_local `engine` declared after this class).  It owns the polling
// backend, the task queues, the timer sets and the network stack, and exposes
// the asynchronous I/O entry points used by the rest of the framework.
class reactor {
private:
    // Type-erased poll function; concrete instances are built by make_pollfn().
    struct pollfn {
        virtual ~pollfn() {}
        virtual bool poll_and_check_more_work() = 0;
    };
public:
    // Owns a pollfn built from a `bool ()` callable and calls do_register()
    // on construction.
    class poller {
        std::unique_ptr<pollfn> _pollfn;
        class registration_task;
        class deregistration_task;
        registration_task* _registration_task;
    public:
        template <typename Func> // signature: bool ()
        explicit poller(Func&& poll_and_check_more_work)
                : _pollfn(make_pollfn(std::forward<Func>(poll_and_check_more_work))) {
            do_register();
        }
        ~poller();
        poller(poller&& x);
        poller& operator=(poller&& x);
        void do_register();
        friend class reactor;
    };
private:
    // FIXME: make _backend a unique_ptr<reactor_backend>, not a compile-time #ifdef.
#ifdef HAVE_OSV
    reactor_backend_osv _backend;
#else
    reactor_backend_epoll _backend;
#endif
    std::vector<pollfn*> _pollers;
    static constexpr size_t max_aio = 128;
    promise<> _exit_promise;
    future<> _exit_future;
    unsigned _id = 0;
    bool _stopped = false;
    bool _handle_sigint = true;
    promise<std::unique_ptr<network_stack>> _network_stack_ready_promise;
    int _return = 0;
    timer_t _timer;
    promise<> _start_promise;
    semaphore _cpu_started;
    uint64_t _tasks_processed = 0;
    timer_set<timer<>, &timer<>::_link> _timers;
    timer_set<timer<>, &timer<>::_link>::timer_list_t _expired_timers;
    timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link> _lowres_timers;
    timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
    io_context_t _io_context;
    semaphore _io_context_available;
    circular_buffer<std::unique_ptr<task>> _pending_tasks;
    // Tasks queued by at_destroy().
    circular_buffer<std::unique_ptr<task>> _at_destroy_tasks;
    size_t _task_quota;
    std::unique_ptr<network_stack> _network_stack;
    // _lowres_clock will only be created on cpu 0
    std::unique_ptr<lowres_clock> _lowres_clock;
    lowres_clock::time_point _lowres_next_timeout;
    promise<> _lowres_timer_promise;
    promise<> _timer_promise;
    std::experimental::optional<poller> _epoll_poller;
private:
    void abort_on_error(int ret);
    template <typename T, typename E>
    void complete_timers(T&, E&, std::function<future<> ()>, std::function<void ()>);
    /**
     * Returns TRUE if all pollers allow blocking.
     *
     * @return FALSE if at least one of the pollers requires a non-blocking
     * execution.
     */
    bool poll_once();
    template <typename Func> // signature: bool ()
    static std::unique_ptr<pollfn> make_pollfn(Func&& func);
    struct signal_handler {
        signal_handler(int signo);
        promise<> _promise;
        static thread_local std::atomic<uint64_t> pending;
    };
    std::unordered_map<int, signal_handler> _signal_handlers;
    void poll_signal();
    friend void sigaction(int signo, siginfo_t* siginfo, void* ignore);
    thread_pool _thread_pool;
    void run_tasks(circular_buffer<std::unique_ptr<task>>& tasks, size_t task_quota);
public:
    static boost::program_options::options_description get_options_description();
    reactor();
    reactor(const reactor&) = delete;
    // Cancels every timer still sitting on the expired lists.
    //
    // The list element must be taken by reference: copying it (as the code
    // used to do) cancels a temporary copy, leaves the real timer linked, and
    // the `while (!list.empty())` loop can then never make progress.  The
    // loop relies on cancel() removing the timer from the list.
    ~reactor() {
        auto eraser = [](auto& list) {
            while (!list.empty()) {
                auto& timer = *list.begin();
                timer.cancel();
            }
        };
        eraser(_expired_timers);
        eraser(_expired_lowres_timers);
    }
    void operator=(const reactor&) = delete;
    void configure(boost::program_options::variables_map config);
    server_socket listen(socket_address sa, listen_options opts = {});
    pollable_fd posix_listen(socket_address sa, listen_options opts = {});
    future<pollable_fd, socket_address> accept(pollable_fd_state& listen_fd);
    future<size_t> read_some(pollable_fd_state& fd, void* buffer, size_t size);
    future<size_t> read_some(pollable_fd_state& fd, const std::vector<iovec>& iov);
    future<size_t> write_some(pollable_fd_state& fd, const void* buffer, size_t size);
    future<> write_all(pollable_fd_state& fd, const void* buffer, size_t size);
    future<file> open_file_dma(sstring name);
    template <typename Func>
    future<io_event> submit_io(Func prepare_io);
    future<> receive_signal(int signo);
    int run();
    void exit(int ret);
    future<> when_started() { return _start_promise.get_future(); }
    // Chains `func` as a continuation onto the exit future, so exit handlers
    // run in registration order.
    template <typename Func>
    void at_exit(Func&& func) {
        _exit_future = _exit_future.then(std::forward<Func>(func));
    }
    // Queues `func` as a task on _at_destroy_tasks.  Unlike at_exit()
    // handlers, these are meant to run once the reactor is no longer active;
    // the code that drains the queue is outside this class body.
    template <typename Func>
    void at_destroy(Func&& func) {
        _at_destroy_tasks.push_back(make_task(std::forward<Func>(func)));
    }
    // Appends `t` to the queue of pending tasks.
    void add_task(std::unique_ptr<task>&& t) { _pending_tasks.push_back(std::move(t)); }
    network_stack& net() { return *_network_stack; }
    unsigned cpu_id() const { return _id; }
    // Installs, at most once, a poller that drives the backend through
    // wait_and_process() and always reports true.
    void start_epoll() {
        if (!_epoll_poller) {
            _epoll_poller = poller([this] {
                wait_and_process(); return true;
            });
        }
    }
private:
    /**
     * Add a new "poller" - a non-blocking function returning a boolean, that
     * will be called every iteration of a main loop.
     * If it returns FALSE then reactor's main loop is forbidden to block in the
     * current iteration.
     *
     * @param p a new "poller" function object to register
     */
    void register_poller(pollfn* p);
    void unregister_poller(pollfn* p);
    void replace_poller(pollfn* old, pollfn* neww);
    struct collectd_registrations;
    collectd_registrations register_collectd_metrics();
    future<> write_all_part(pollable_fd_state& fd, const void* buffer, size_t size, size_t completed);
    void process_io();
    void add_timer(timer<>*);
    void del_timer(timer<>*);
    void add_timer(timer<lowres_clock>*);
    void del_timer(timer<lowres_clock>*);
    future<> run_exit_tasks();
    void stop();
    friend class pollable_fd;
    friend class pollable_fd_state;
    friend class posix_file_impl;
    friend class blockdev_file_impl;
    friend class readable_eventfd;
    friend class timer<>;
    friend class timer<lowres_clock>;
    friend class smp;
    friend class smp_message_queue;
    friend class poller;
public:
    // The functions below forward directly to the backend.
    void wait_and_process() {
        _backend.wait_and_process();
    }
    future<> readable(pollable_fd_state& fd) {
        return _backend.readable(fd);
    }
    future<> writeable(pollable_fd_state& fd) {
        return _backend.writeable(fd);
    }
    void forget(pollable_fd_state& fd) {
        _backend.forget(fd);
    }
    future<> notified(reactor_notifier *n) {
        return _backend.notified(n);
    }
    void enable_timer(clock_type::time_point when);
    future<> timers_completed() {
        return _timer_promise.get_future();
    }
    future<> lowres_timers_completed() {
        return _lowres_timer_promise.get_future();
    }
    std::unique_ptr<reactor_notifier> make_reactor_notifier() {
        return _backend.make_reactor_notifier();
    }
};
// Wraps a `bool ()` callable in a heap-allocated pollfn.
//
// The wrapper stores a member of type `Func` exactly as deduced: when an
// lvalue is passed, `Func` is an lvalue reference type and the wrapper only
// refers to the caller's object, which must then outlive the returned pollfn.
template <typename Func> // signature: bool ()
inline
std::unique_ptr<reactor::pollfn>
reactor::make_pollfn(Func&& func) {
    struct callable_pollfn : pollfn {
        Func func;
        callable_pollfn(Func&& f) : func(std::forward<Func>(f)) {}
        virtual bool poll_and_check_more_work() override {
            return func();
        }
    };
    return std::make_unique<callable_pollfn>(std::forward<Func>(func));
}
extern thread_local reactor engine;
extern __thread size_t task_quota;
// Static facade over the set of reactor threads: holds the per-pair message
// queues (`_qs`, indexed [destination][source]) and provides submit_to() for
// running a function on another cpu.
class smp {
#if HAVE_DPDK
    using thread_adaptor = std::function<void ()>;
#else
    using thread_adaptor = posix_thread;
#endif
    static std::vector<thread_adaptor> _threads;
    static smp_message_queue** _qs;
    static std::thread::id _tmain;

    // Compile-time classification of what Func() returns; used to select the
    // matching submit_to() overload.
    template <typename Func>
    using returns_future = is_future<std::result_of_t<Func()>>;
    template <typename Func>
    using returns_void = std::is_same<std::result_of_t<Func()>, void>;

public:
    static boost::program_options::options_description get_options_description();
    static void configure(boost::program_options::variables_map vm);
    static void join_all();

    // True when called on the thread recorded in _tmain.
    static bool main_thread() {
        return std::this_thread::get_id() == _tmain;
    }

    // Overload for functions that already return a future.  Runs `func`
    // directly when `t` is the calling cpu, otherwise submits it to the queue
    // from the calling cpu to `t`.
    template <typename Func>
    static std::result_of_t<Func()> submit_to(unsigned t, Func func,
            std::enable_if_t<returns_future<Func>::value, void*> = nullptr) {
        if (t != engine.cpu_id()) {
            return _qs[t][engine.cpu_id()].submit(std::move(func));
        }
        return func();
    }

    // Overload for functions returning a plain (non-void) value: the value is
    // wrapped in a ready future and the future-returning overload is used.
    template <typename Func>
    static future<std::result_of_t<Func()>> submit_to(unsigned t, Func func,
            std::enable_if_t<!returns_future<Func>::value && !returns_void<Func>::value, void*> = nullptr) {
        auto wrapped = [func = std::move(func)] () mutable {
            return make_ready_future<std::result_of_t<Func()>>(func());
        };
        return submit_to(t, std::move(wrapped));
    }

    // Overload for functions returning void: runs the function and yields an
    // empty ready future.
    template <typename Func>
    static future<> submit_to(unsigned t, Func func,
            std::enable_if_t<!returns_future<Func>::value && returns_void<Func>::value, void*> = nullptr) {
        auto wrapped = [func = std::move(func)] () mutable {
            func();
            return make_ready_future<>();
        };
        return submit_to(t, std::move(wrapped));
    }

    // Services the queues between the calling cpu and every other cpu:
    // flushes batched responses/requests and processes incoming requests and
    // completions.  Returns true if anything was processed.
    static bool poll_queues() {
        size_t processed = 0;
        for (unsigned peer = 0; peer < count; peer++) {
            const unsigned self = engine.cpu_id();
            if (self == peer) {
                continue;
            }
            auto& incoming = _qs[self][peer];
            incoming.flush_response_batch();
            processed += incoming.process_incoming();
            auto& outgoing = _qs[peer][engine.cpu_id()];
            outgoing.flush_request_batch();
            processed += outgoing.process_completions();
        }
        return processed != 0;
    }

private:
    static void listen_all(smp_message_queue* qs);
    static void start_all_queues();
    static void pin(unsigned cpu_id);

public:
    static unsigned count;
};
// Tells the calling thread's reactor to forget this descriptor state before
// it goes away.
inline
pollable_fd_state::~pollable_fd_state()
{
    auto& self = *this;
    engine.forget(self);
}
// Abstract producer of byte buffers; the implementation behind data_source.
class data_source_impl {
public:
    virtual ~data_source_impl() = default;

    // Returns a future carrying a temporary_buffer<char>.
    virtual future<temporary_buffer<char>> get() = 0;
};
// Move-only handle that owns a data_source_impl and forwards get() to it.
class data_source {
    std::unique_ptr<data_source_impl> _dsi;

public:
    // Takes ownership of the implementation object.
    explicit data_source(std::unique_ptr<data_source_impl> dsi)
            : _dsi(std::move(dsi)) {
    }

    data_source(data_source&& x) = default;

    // Delegates to the implementation.
    future<temporary_buffer<char>> get() {
        auto& impl = *_dsi;
        return impl.get();
    }
};
// Abstract consumer of data; the implementation behind data_sink.
// Implementations must provide put(net::packet) and close(); the two buffer
// overloads are convenience adaptors that build a packet and forward to
// put(net::packet).
class data_sink_impl {
public:
    virtual ~data_sink_impl() {}

    // Consumes one packet.
    virtual future<> put(net::packet data) = 0;

    // Builds a single packet with one fragment per buffer, in order, and
    // forwards it to put(net::packet).
    virtual future<> put(std::vector<temporary_buffer<char>> data) {
        net::packet p;
        p.reserve(data.size());
        for (auto& buf : data) {
            // Read the pointer and size before calling release(): as sibling
            // arguments of one call their evaluation order is unspecified,
            // so the fragment could otherwise be built from a buffer that
            // release() has already been called on.
            net::fragment frag{buf.get_write(), buf.size()};
            p = net::packet(std::move(p), std::move(frag), buf.release());
        }
        return put(std::move(p));
    }

    // Wraps a single buffer in a one-fragment packet and forwards it to
    // put(net::packet).
    virtual future<> put(temporary_buffer<char> buf) {
        // Same sequencing concern as above.
        net::fragment frag{buf.get_write(), buf.size()};
        return put(net::packet(std::move(frag), buf.release()));
    }

    virtual future<> close() = 0;
};
// Move-only handle that owns a data_sink_impl; every operation is forwarded
// to the implementation.
class data_sink {
    std::unique_ptr<data_sink_impl> _dsi;

public:
    // Takes ownership of the implementation object.
    explicit data_sink(std::unique_ptr<data_sink_impl> dsi)
            : _dsi(std::move(dsi)) {
    }

    data_sink(data_sink&& x) = default;

    future<> put(std::vector<temporary_buffer<char>> data) {
        auto& impl = *_dsi;
        return impl.put(std::move(data));
    }

    future<> put(temporary_buffer<char> data) {
        auto& impl = *_dsi;
        return impl.put(std::move(data));
    }

    future<> put(net::packet p) {
        auto& impl = *_dsi;
        return impl.put(std::move(p));
    }

    future<> close() {
        auto& impl = *_dsi;
        return impl.close();
    }
};
// Buffered reader on top of a data_source.  CharType must be a one-byte type.
// Data fetched from the source is held in _buf until consumed.
template <typename CharType>
class input_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_source _fd;
// Data already fetched from _fd but not yet handed to the caller.
temporary_buffer<CharType> _buf;
bool _eof = false;
private:
using tmp_buf = temporary_buffer<CharType>;
// Number of buffered bytes not yet consumed.
size_t available() const { return _buf.size(); }
public:
// Consumer concept, for consume() method:
struct ConsumerConcept {
// call done(tmp_buf) to signal end of processing. tmp_buf parameter to
// done is unconsumed data
template <typename Done>
void operator()(tmp_buf data, Done done);
};
using char_type = CharType;
// Takes ownership of `fd` and starts with an empty buffer.
// NOTE(review): buf_size is accepted but not used by this constructor.
explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {}
future<temporary_buffer<CharType>> read_exactly(size_t n);
template <typename Consumer>
future<> consume(Consumer& c);
// Returns the current value of the end-of-stream flag.
bool eof() { return _eof; }
private:
future<temporary_buffer<CharType>> read_exactly_part(size_t n, tmp_buf buf, size_t completed);
};
// Facilitates data buffering before it's handed over to data_sink.
//
// When trim_to_size is true it's guaranteed that data sink will not receive
// chunks larger than the configured size, which could be the case when a
// single write call is made with data larger than the configured size.
//
// The data sink will not receive empty chunks.
//
template <typename CharType>
class output_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
// Staging buffer; default-constructed, the constructor does not size it.
temporary_buffer<CharType> _buf;
// Configured chunk size (see the class comment).
size_t _size;
// Offsets used by available() and possibly_available() below.
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size;
private:
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf);
public:
using char_type = CharType;
// Takes ownership of the sink; `size` is the configured chunk size and
// `trim_to_size` enables the size guarantee described above.
output_stream(data_sink fd, size_t size, bool trim_to_size = false)
: _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
future<> write(const char_type* buf, size_t n);
future<> write(const char_type* buf);
future<> write(const sstring& s);
future<> write(net::packet p);
future<> write(scattered_message<char_type> msg);
future<> flush();
// Closes the underlying sink.
future<> close() { return _fd.close(); }
private:
};
// Writes a NUL-terminated string; the length is taken with strlen() and the
// terminator itself is not written.
template<typename CharType>
inline
future<> output_stream<CharType>::write(const char_type* buf) {
    const auto length = strlen(buf);
    return write(buf, length);
}
// Writes the s.size() characters of `s`.
template<typename CharType>
inline
future<> output_stream<CharType>::write(const sstring& s) {
    const auto* data = s.c_str();
    return write(data, s.size());
}
// Writes a scattered message by releasing it into its underlying
// representation and forwarding that to the matching write() overload.
template<typename CharType>
future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
    auto released = std::move(msg).release();
    return write(std::move(released));
}
// Zero-copy write of a packet straight to the sink.
//
// Empty packets are dropped.  When trim_to_size is set and the packet is
// larger than the configured size, the first _size bytes are sent and the
// remainder is written recursively once that put completes.  Must not be
// mixed with buffered writes (asserted).
template<typename CharType>
future<> output_stream<CharType>::write(net::packet p) {
    static_assert(std::is_same<CharType, char>::value, "packet works on char");

    if (p.len() == 0) {
        return make_ready_future<>();
    }

    assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");

    const bool must_split = _trim_to_size && p.len() > _size;
    if (!must_split) {
        // TODO: aggregate buffers for later coalescing. Currently we flush right
        // after appending the message anyway, so it doesn't matter.
        return _fd.put(std::move(p));
    }

    auto first_chunk = p.share(0, _size);
    p.trim_front(_size);
    return _fd.put(std::move(first_chunk)).then([this, rest = std::move(p)] () mutable {
        return write(std::move(rest));
    });
}
// Returns the sum of the iov_len fields of the `len` entries starting at
// `begin` (0 when len is 0).
inline
size_t iovec_len(const iovec* begin, size_t len)
{
    size_t total = 0;
    for (size_t i = 0; i < len; ++i) {
        total += begin[i].iov_len;
    }
    return total;
}
// Waits until `listenfd` is readable, then accepts one connection.
//
// Returns a future carrying the new connection as a pollable_fd (created
// non-blocking and close-on-exec, and speculated to be writable) together
// with the peer address filled in by accept().
//
// `listenfd` is captured by reference and must stay alive until the returned
// future resolves.
inline
future<pollable_fd, socket_address>
reactor::accept(pollable_fd_state& listenfd) {
    return readable(listenfd).then([this, &listenfd] () mutable {
        socket_address sa;
        // Must be the size of the address storage itself.  The previous
        // sizeof(&sa.u.sas) was the size of a pointer, which told accept()
        // that the buffer is only pointer-sized.
        socklen_t sl = sizeof(sa.u.sas);
        file_desc fd = listenfd.fd.accept(sa.u.sa, sl, SOCK_NONBLOCK | SOCK_CLOEXEC);
        pollable_fd pfd(std::move(fd), pollable_fd::speculation(EPOLLOUT));
        return make_ready_future<pollable_fd, socket_address>(std::move(pfd), std::move(sa));
    });
}
// Waits until `fd` is readable and reads up to `len` bytes into `buffer`.
// If the read yields no result the whole operation is retried.  When the
// buffer was filled completely, EPOLLIN is speculated so that the next read
// can be attempted without waiting.  Resolves to the number of bytes read.
inline
future<size_t>
reactor::read_some(pollable_fd_state& fd, void* buffer, size_t len) {
    return readable(fd).then([this, &fd, buffer, len] () mutable {
        auto got = fd.fd.read(buffer, len);
        if (got) {
            if (size_t(*got) == len) {
                fd.speculate_epoll(EPOLLIN);
            }
            return make_ready_future<size_t>(*got);
        }
        return read_some(fd, buffer, len);
    });
}
// Scatter-read variant: waits until `fd` is readable and receives into the
// buffers described by `iov` using recvmsg().  The iovec vector is copied
// into the continuation.  If the receive yields no result the operation is
// retried; when every buffer was filled, EPOLLIN is speculated.  Resolves to
// the number of bytes received.
inline
future<size_t>
reactor::read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) {
    return readable(fd).then([this, &fd, iov = iov] () mutable {
        ::msghdr hdr = {};
        hdr.msg_iov = &iov[0];
        hdr.msg_iovlen = iov.size();
        auto got = fd.fd.recvmsg(&hdr, 0);
        if (got) {
            if (size_t(*got) == iovec_len(iov)) {
                fd.speculate_epoll(EPOLLIN);
            }
            return make_ready_future<size_t>(*got);
        }
        return read_some(fd, iov);
    });
}
// Waits until `fd` is writable and sends up to `len` bytes from `buffer`
// with MSG_NOSIGNAL.  If the send yields no result the operation is retried.
// When everything was sent, EPOLLOUT is speculated so that the next write can
// be attempted without waiting.  Resolves to the number of bytes sent.
inline
future<size_t>
reactor::write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
    return writeable(fd).then([this, &fd, buffer, len] () mutable {
        auto sent = fd.fd.send(buffer, len, MSG_NOSIGNAL);
        if (sent) {
            if (size_t(*sent) == len) {
                fd.speculate_epoll(EPOLLOUT);
            }
            return make_ready_future<size_t>(*sent);
        }
        return write_some(fd, buffer, len);
    });
}
// Recursive helper for write_all(): `completed` bytes of `buffer` have
// already been written; keeps calling write_some() on the remainder until
// all `len` bytes are out.
inline
future<>
reactor::write_all_part(pollable_fd_state& fd, const void* buffer, size_t len, size_t completed) {
    if (completed == len) {
        return make_ready_future<>();
    }
    auto cursor = static_cast<const char*>(buffer) + completed;
    auto remaining = len - completed;
    return write_some(fd, cursor, remaining).then(
            [this, &fd, buffer, len, completed] (size_t part) mutable {
        return write_all_part(fd, buffer, len, completed + part);
    });
}
// Writes all `len` bytes of `buffer` to `fd` by delegating to
// write_all_part() starting from offset 0.
// `len` must be non-zero (asserted).
inline
future<>
reactor::write_all(pollable_fd_state& fd, const void* buffer, size_t len) {
assert(len);
return write_all_part(fd, buffer, len, 0);
}
// Timer-expiry loop for one timer set.
//
// Waits on the future returned by completed_fn(). Once it resolves, the
// timers in `timers` that are expired as of timers.now() are moved into
// `expired_timers` and their callbacks are run; then enable_fn() is called
// and the function invokes itself again, so the cycle repeats for the next
// completion.
//
// `timers` and `expired_timers` are captured by reference in the
// continuation, so they must outlive it.
template <typename T, typename E>
void reactor::complete_timers(T& timers, E& expired_timers,
std::function<future<> ()> completed_fn,
std::function<void ()> enable_fn) {
completed_fn().then([this, &timers, &expired_timers, completed_fn,
enable_fn = std::move(enable_fn)] () mutable {
expired_timers = timers.expire(timers.now());
// Flag the whole batch as expired before any callback runs.
for (auto& t : expired_timers) {
t._expired = true;
}
while (!expired_timers.empty()) {
// Unlink the timer from the expired list before touching its state.
auto t = &*expired_timers.begin();
expired_timers.pop_front();
t->_queued = false;
// Only timers still marked armed fire.
if (t->_armed) {
t->_armed = false;
// A periodic timer is re-armed before its callback is invoked.
if (t->_period) {
t->arm_periodic(*t->_period);
}
t->_callback();
}
}
enable_fn();
// Schedule the next round with the same functions.
complete_timers(timers, expired_timers, std::move(completed_fn), std::move(enable_fn));
});
}
// Copy/read loop behind read_exactly(): `out` is the n-byte destination and
// `completed` is how many bytes of it are already filled. Drains whatever is
// buffered into `out`, and if that is not enough fetches another buffer from
// the source and recurses.
//
// If the source returns an empty buffer before `n` bytes were gathered, the
// future resolves to that empty buffer rather than to the partial data.
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly_part(size_t n, tmp_buf out, size_t completed) {
    if (available()) {
        auto chunk = std::min(n - completed, available());
        auto src = _buf.get();
        std::copy(src, src + chunk, out.get_write() + completed);
        _buf.trim_front(chunk);
        completed += chunk;
    }
    if (completed != n) {
        // Everything buffered has been consumed; pull more from the source.
        return _fd.get().then([this, n, out = std::move(out), completed] (auto buf) mutable {
            if (buf.size() == 0) {
                return make_ready_future<tmp_buf>(std::move(buf));
            }
            _buf = std::move(buf);
            return this->read_exactly_part(n, std::move(out), completed);
        });
    }
    return make_ready_future<tmp_buf>(std::move(out));
}
// Resolves to a buffer of exactly `n` bytes read from the stream, or to an
// empty buffer if the source returns an empty buffer first.
//
// Four cases, depending on how much is currently buffered:
//   - exactly n:  hand the internal buffer over to the caller;
//   - more than n: share the first n bytes and keep the rest buffered;
//   - some, but fewer than n: allocate an n-byte buffer and run the
//     copy/read loop in read_exactly_part();
//   - nothing:    fetch one buffer from the source and start over.
template <typename CharType>
future<temporary_buffer<CharType>>
input_stream<CharType>::read_exactly(size_t n) {
    const auto buffered = _buf.size();
    if (buffered == n) {
        return make_ready_future<tmp_buf>(std::move(_buf));
    }
    if (buffered > n) {
        auto front = _buf.share(0, n);
        _buf.trim_front(n);
        return make_ready_future<tmp_buf>(std::move(front));
    }
    if (buffered != 0) {
        tmp_buf dest(n);
        return read_exactly_part(n, std::move(dest), 0);
    }
    return _fd.get().then([this, n] (auto buf) mutable {
        if (buf.size() == 0) {
            return make_ready_future<tmp_buf>(std::move(buf));
        }
        _buf = std::move(buf);
        return this->read_exactly(n);
    });
}
// Feeds buffered input to `consumer` until the consumer signals completion
// or an empty buffer has been delivered after end-of-file.
//
// The consumer is invoked as consumer(buffer, done_fn). Calling
// done_fn(rest) stops the loop; a non-empty `rest` is stored back into _buf
// so that it is available to later reads.
//
// `consumer` is captured by reference in the continuation, so it must stay
// alive until the returned future resolves.
template <typename CharType>
template <typename Consumer>
future<>
input_stream<CharType>::consume(Consumer& consumer) {
if (_buf.empty() && !_eof) {
// Nothing buffered and EOF not seen yet: fetch the next buffer. An empty
// buffer from the source is recorded as end-of-file.
return _fd.get().then([this, &consumer] (tmp_buf buf) {
_buf = std::move(buf);
_eof = _buf.empty();
return consume(consumer);
});
} else {
auto tmp = std::move(_buf);
// An empty buffer here means EOF was reached: the consumer still gets
// called once with it, after which the loop ends.
bool done = tmp.empty();
// NOTE(review): `done` is a stack local captured by reference, so the
// consumer presumably must invoke the callback before returning -- confirm.
consumer(std::move(tmp), [this, &done] (tmp_buf unconsumed) {
done = true;
if (!unconsumed.empty()) {
_buf = std::move(unconsumed);
}
});
if (!done) {
return consume(consumer);
} else {
return make_ready_future<>();
}
}
}
#include <iostream>
#include "sstring.hh"
// Sends @buf to the sink in pieces of exactly _size bytes. A final piece
// shorter than _size is not sent: it is copied into the internal buffer
// (_buf/_end) instead. Requires the internal buffer to be empty on entry.
template <typename CharType>
future<>
output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
    assert(_end == 0);
    if (buf.size() >= _size) {
        // Peel off one full chunk, send it, then deal with the remainder.
        auto chunk = buf.share(0, _size);
        buf.trim_front(_size);
        return _fd.put(std::move(chunk)).then([this, buf = std::move(buf)] () mutable {
            return split_and_put(std::move(buf));
        });
    }
    // Short tail: stash it in the internal buffer.
    if (!_buf) {
        _buf = temporary_buffer<char>(_size);
    }
    std::copy(buf.get(), buf.get() + buf.size(), _buf.get_write());
    _end = buf.size();
    return make_ready_future<>();
}
// Buffered write of `n` characters starting at `buf`.
//
// Small writes are accumulated in _buf (capacity _size, fill level _end).
// Writes at or above a bulk threshold skip further accumulation: the data is
// copied into a freshly allocated temporary_buffer and handed to the sink,
// either directly or through split_and_put() when _trim_to_size is set.
//
// The input is always copied before this function returns; no continuation
// holds the caller's `buf` pointer.
template <typename CharType>
future<>
output_stream<CharType>::write(const char_type* buf, size_t n) {
// With data already buffered, the bulk path needs enough input to top up
// _buf (_size - _end) plus one more full _size; with an empty buffer, _size.
auto bulk_threshold = _end ? (2 * _size - _end) : _size;
if (n >= bulk_threshold) {
if (_end) {
// Fill the free space of _buf so it can be flushed as a full buffer.
auto now = _size - _end;
std::copy(buf, buf + now, _buf.get_write() + _end);
_end = _size;
// Copy the rest of the input now; it is sent after the flush completes.
temporary_buffer<char> tmp(n - now);
std::copy(buf + now, buf + n, tmp.get_write());
return flush().then([this, tmp = std::move(tmp)]() mutable {
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
});
} else {
// Nothing buffered: send a copy of the whole input.
temporary_buffer<char> tmp(n);
std::copy(buf, buf + n, tmp.get_write());
if (_trim_to_size) {
return split_and_put(std::move(tmp));
} else {
return _fd.put(std::move(tmp));
}
}
}
// Small write: allocate the accumulation buffer on first use.
if (!_buf) {
_buf = temporary_buffer<char>(_size);
}
// Append as much of the input as fits into the free space of _buf.
auto now = std::min(n, _size - _end);
std::copy(buf, buf + now, _buf.get_write() + _end);
_end += now;
if (now == n) {
return make_ready_future<>();
} else {
// _buf is now full. Put the overflow at the start of a new buffer, make
// that the accumulation buffer, and send the full one.
temporary_buffer<CharType> next(_size);
std::copy(buf + now, buf + n, next.get_write());
_end = n - now;
std::swap(next, _buf);
return _fd.put(std::move(next));
}
}
// Sends whatever is currently accumulated in the internal buffer to the
// sink. Does nothing (returns a ready future) when the buffer is empty.
// The internal buffer is moved out and the fill level is reset to zero.
template <typename CharType>
future<>
output_stream<CharType>::flush() {
    if (_end) {
        // Shrink the buffer to the bytes actually written before sending it.
        _buf.trim(_end);
        _end = 0;
        return _fd.put(std::move(_buf));
    }
    return make_ready_future<>();
}
// Reads up to `size` bytes into `buffer`; forwards to reactor::read_some()
// on the global `engine` with this descriptor's state.
inline
future<size_t> pollable_fd::read_some(char* buffer, size_t size) {
return engine.read_some(*_s, buffer, size);
}
// uint8_t overload of read_some(); forwards to reactor::read_some() on the
// global `engine` with this descriptor's state.
inline
future<size_t> pollable_fd::read_some(uint8_t* buffer, size_t size) {
return engine.read_some(*_s, buffer, size);
}
// Scatter-read overload; forwards the iovec list to reactor::read_some() on
// the global `engine` with this descriptor's state.
inline
future<size_t> pollable_fd::read_some(const std::vector<iovec>& iov) {
return engine.read_some(*_s, iov);
}
// Writes all `size` bytes of `buffer`; forwards to reactor::write_all() on
// the global `engine` with this descriptor's state.
inline
future<> pollable_fd::write_all(const char* buffer, size_t size) {
return engine.write_all(*_s, buffer, size);
}
// uint8_t overload of write_all(); forwards to reactor::write_all() on the
// global `engine` with this descriptor's state.
inline
future<> pollable_fd::write_all(const uint8_t* buffer, size_t size) {
return engine.write_all(*_s, buffer, size);
}
// Waits for the descriptor to become writeable and issues one writev() over
// the fragments of `p`. Resolves to the number of bytes written, which may be
// less than p.len().
//
// The packet's fragment array is passed to writev() as an iovec array; the
// static_assert checks that the two layouts agree. `p` is captured by
// reference and must stay alive until the returned future resolves.
inline
future<size_t> pollable_fd::write_some(net::packet& p) {
    return engine.writeable(*_s).then([this, &p] () mutable {
        static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) &&
            sizeof(iovec::iov_base) == sizeof(net::fragment::base) &&
            offsetof(iovec, iov_len) == offsetof(net::fragment, size) &&
            sizeof(iovec::iov_len) == sizeof(net::fragment::size) &&
            alignof(iovec) == alignof(net::fragment) &&
            sizeof(iovec) == sizeof(net::fragment)
            , "net::fragment and iovec should be equivalent");
        iovec* frags = reinterpret_cast<iovec*>(p.fragment_array());
        auto nwritten = get_file_desc().writev(frags, p.nr_frags());
        if (nwritten) {
            // Whole packet accepted: record a speculative EPOLLOUT.
            if (size_t(*nwritten) == p.len()) {
                _s->speculate_epoll(EPOLLOUT);
            }
            return make_ready_future<size_t>(*nwritten);
        }
        // No result from writev(): wait for writeability again and retry.
        return write_some(p);
    });
}
// Writes the whole packet `p`, calling write_some() repeatedly and trimming
// the bytes already written off the front of `p` after each partial write.
// `p` is modified in place and must stay alive until the future resolves.
inline
future<> pollable_fd::write_all(net::packet& p) {
    return write_some(p).then([this, &p] (size_t size) {
        if (p.len() != size) {
            // Partial write: drop what went out and send the rest.
            p.trim_front(size);
            return write_all(p);
        }
        return make_ready_future<>();
    });
}
// Accepts a connection on this descriptor; forwards to reactor::accept() on
// the global `engine` with this descriptor's state.
inline
future<pollable_fd, socket_address> pollable_fd::accept() {
return engine.accept(*_s);
}
// Waits for the descriptor to become readable and issues one recvmsg() into
// `msg`. Resolves to the number of bytes received. When recvmsg() reports no
// result, the operation waits for readability again and retries.
//
// `msg` is captured as a raw pointer and must stay valid until the returned
// future resolves.
inline
future<size_t> pollable_fd::recvmsg(struct msghdr *msg) {
    return engine.readable(*_s).then([this, msg] {
        auto r = get_file_desc().recvmsg(msg, 0);
        if (!r) {
            return recvmsg(msg);
        }
        // We always speculate here to optimize for throughput in a workload
        // with multiple outstanding requests. This way the caller can consume
        // all messages without resorting to epoll. However, this adds an extra
        // recvmsg() call when we hit the empty-queue condition, so it may
        // hurt a request-response workload in which the queue is empty when we
        // initially enter recvmsg(). If that turns out to be a problem, we can
        // improve speculation by using recvmmsg().
        _s->speculate_epoll(EPOLLIN);
        return make_ready_future<size_t>(*r);
    });
}
// Waits for the descriptor to become writeable and issues one sendmsg() of
// `msg`. Resolves to the number of bytes sent. `msg` is captured as a raw
// pointer and must stay valid until the returned future resolves.
inline
future<size_t> pollable_fd::sendmsg(struct msghdr* msg) {
    return engine.writeable(*_s).then([this, msg] () mutable {
        auto nsent = get_file_desc().sendmsg(msg, 0);
        if (nsent) {
            // For UDP this will always speculate. We can't know whether there
            // is room or not, but most of the time there should be, so the
            // cost of mis-speculation is amortized.
            if (size_t(*nsent) == iovec_len(msg->msg_iov, msg->msg_iovlen)) {
                _s->speculate_epoll(EPOLLOUT);
            }
            return make_ready_future<size_t>(*nsent);
        }
        // No result from sendmsg(): wait for writeability again and retry.
        return sendmsg(msg);
    });
}
// Waits for the descriptor to become writeable and issues one sendto() of
// `len` bytes from `buf` to `addr`. Resolves to the number of bytes sent.
// `addr` is copied into the continuation; `buf` is captured as a raw pointer
// and must stay valid until the returned future resolves.
inline
future<size_t> pollable_fd::sendto(socket_address addr, const void* buf, size_t len) {
    return engine.writeable(*_s).then([this, buf, len, addr] () mutable {
        auto nsent = get_file_desc().sendto(addr, buf, len, 0);
        if (nsent) {
            // Same speculation policy as in sendmsg(): speculate when the
            // whole payload was accepted.
            if (size_t(*nsent) == len) {
                _s->speculate_epoll(EPOLLOUT);
            }
            return make_ready_future<size_t>(*nsent);
        }
        // No result from sendto(): wait for writeability again and retry.
        return sendto(std::move(addr), buf, len);
    });
}
// Destructor: if the timer is still marked as queued, removes it via
// engine.del_timer() before the object goes away.
template <typename Clock>
inline
timer<Clock>::~timer() {
if (_queued) {
engine.del_timer(this);
}
}
// Stores `callback` (moved in) as this timer's _callback, replacing any
// callback set previously.
template <typename Clock>
inline
void timer<Clock>::set_callback(callback_t&& callback) {
_callback = std::move(callback);
}
// Arms the timer to expire at `until`, with an optional repeat `period`.
// The timer must not already be armed (asserted); use rearm() otherwise.
template <typename Clock>
inline
void timer<Clock>::arm(time_point until, boost::optional<duration> period) {
assert(!_armed);
_period = period;
_armed = true;
_expired = false;
_expiry = until;
// Register with the reactor, then record that the timer is queued.
engine.add_timer(this);
_queued = true;
}
// Like arm(), but may be called on a timer that is already armed: an armed
// timer is cancelled first and then armed with the new expiry and period.
template <typename Clock>
inline
void timer<Clock>::rearm(time_point until, boost::optional<duration> period) {
if (_armed) {
cancel();
}
arm(until, period);
}
// Arms the timer to expire `delta` after the current Clock::now().
template <typename Clock>
inline
void timer<Clock>::arm(duration delta) {
return arm(Clock::now() + delta);
}
// Arms the timer to expire `delta` after the current Clock::now() and records
// `delta` as its repeat period.
template <typename Clock>
inline
void timer<Clock>::arm_periodic(duration delta) {
arm(Clock::now() + delta, {delta});
}
// Disarms the timer. Returns true if the timer was armed (and is now
// cancelled), false if it was not armed to begin with. A timer that is still
// queued is also removed via engine.del_timer().
template <typename Clock>
inline
bool timer<Clock>::cancel() {
    if (_armed) {
        _armed = false;
        if (_queued) {
            engine.del_timer(this);
            _queued = false;
        }
        return true;
    }
    return false;
}
// Returns the stored expiry time point (_expiry).
template <typename Clock>
inline
typename timer<Clock>::time_point timer<Clock>::get_timeout() {
return _expiry;
}
// Returns the input stream provided by the underlying implementation
// object (_csi).
inline
input_stream<char>
connected_socket::input() {
return _csi->input();
}
// Returns the output stream provided by the underlying implementation
// object (_csi).
inline
output_stream<char>
connected_socket::output() {
return _csi->output();
}
#endif /* REACTOR_HH_ */