diff --git a/apps/httpd/httpd.cc b/apps/httpd/httpd.cc index 9cfb3833ac..0d9b06782d 100644 --- a/apps/httpd/httpd.cc +++ b/apps/httpd/httpd.cc @@ -7,7 +7,7 @@ #include "core/sstring.hh" #include "core/app-template.hh" #include "core/circular_buffer.hh" -#include "core/smp.hh" +#include "core/distributed.hh" #include "core/queue.hh" #include "core/future-util.hh" #include "core/scollectd.hh" diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index e998cd4832..39ec852ba5 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -16,7 +16,7 @@ #include "core/stream.hh" #include "core/memory.hh" #include "core/units.hh" -#include "core/smp.hh" +#include "core/distributed.hh" #include "core/vector-data-sink.hh" #include "net/api.hh" #include "net/packet-data-source.hh" diff --git a/core/align.hh b/core/align.hh index a24c470bcd..1bfaf63859 100644 --- a/core/align.hh +++ b/core/align.hh @@ -9,26 +9,26 @@ #include template -inline +inline constexpr T align_up(T v, T align) { return (v + align - 1) & ~(align - 1); } template -inline +inline constexpr T* align_up(T* v, size_t align) { static_assert(sizeof(T) == 1, "align byte pointers only"); return reinterpret_cast(align_up(reinterpret_cast(v), align)); } template -inline +inline constexpr T align_down(T v, T align) { return v & ~(align - 1); } template -inline +inline constexpr T* align_down(T* v, size_t align) { static_assert(sizeof(T) == 1, "align byte pointers only"); return reinterpret_cast(align_down(reinterpret_cast(v), align)); diff --git a/core/smp.hh b/core/distributed.hh similarity index 99% rename from core/smp.hh rename to core/distributed.hh index 43f3732e46..15b1e22d61 100644 --- a/core/smp.hh +++ b/core/distributed.hh @@ -2,8 +2,8 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ -#ifndef SMP_HH_ -#define SMP_HH_ +#ifndef DISTRIBUTED_HH_ +#define DISTRIBUTED_HH_ #include "reactor.hh" #include "future-util.hh" @@ -226,4 +226,4 @@ foreign_ptr make_foreign(T ptr) { return foreign_ptr(std::move(ptr)); } -#endif /* SMP_HH_ */ +#endif /* DISTRIBUTED_HH_ */ diff --git a/core/file.hh b/core/file.hh index d24ffa9c57..27def5fca5 100644 --- a/core/file.hh +++ b/core/file.hh @@ -91,6 +91,7 @@ private: explicit file(int fd) : _file_impl(make_file_impl(fd)) {} public: file(file&& x) : _file_impl(std::move(x._file_impl)) {} + file& operator=(file&& x) noexcept = default; template future dma_read(uint64_t pos, CharType* buffer, size_t len) { return _file_impl->read_dma(pos, buffer, len); diff --git a/core/reactor.cc b/core/reactor.cc index b61640a1e2..9e3f836609 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -62,6 +62,20 @@ struct syscall_result { } }; +// Wrapper for a system call result containing the return value, +// an output parameter that was returned from the syscall, and errno. +template +struct syscall_result_extra { + int result; + Extra extra; + int error; + void throw_if_error() { + if (result == -1) { + throw std::system_error(error, std::system_category()); + } + } +}; + template syscall_result wrap_syscall(T result) { @@ -71,6 +85,12 @@ wrap_syscall(T result) { return sr; } +template +syscall_result_extra +wrap_syscall(int result, const Extra& extra) { + return {result, extra, errno}; +} + reactor_backend_epoll::reactor_backend_epoll() : _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) { } @@ -80,7 +100,8 @@ reactor::reactor() , _exit_future(_exit_promise.get_future()) , _cpu_started(0) , _io_context(0) - , _io_context_available(max_aio) { + , _io_context_available(max_aio) + , _reuseport(posix_reuseport_detect()) { auto r = ::io_setup(max_aio, &_io_context); assert(r >= 0); struct sigevent sev; @@ -165,12 +186,25 @@ reactor::posix_listen(socket_address sa, listen_options opts) { if (opts.reuse_address) { fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1); } + if (_reuseport) + fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); fd.bind(sa.u.sa, sizeof(sa.u.sas)); fd.listen(100); return pollable_fd(std::move(fd)); } +bool +reactor::posix_reuseport_detect() { + try { + file_desc fd = file_desc::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + return true; + } catch(std::system_error& e) { + return false; + } +} + future reactor::posix_connect(socket_address sa) { file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); @@ -306,11 +340,13 @@ posix_file_impl::flush(void) { future posix_file_impl::stat(void) { - return engine._thread_pool.submit([this] { + return engine._thread_pool.submit>([this] { struct stat st; auto ret = ::fstat(_fd, &st); - throw_system_error_on(ret == -1); - return (st); + return wrap_syscall(ret, st); + }).then([] (syscall_result_extra ret) { + ret.throw_if_error(); + return make_ready_future(ret.extra); }); } @@ -338,21 +374,20 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) { future posix_file_impl::size(void) { - return engine._thread_pool.submit([this] { - struct stat st; - auto ret = ::fstat(_fd, &st); - throw_system_error_on(ret == -1); - return st.st_size; + return posix_file_impl::stat().then([] (struct stat&& st) { + return make_ready_future(st.st_size); }); } future blockdev_file_impl::size(void) { - return engine._thread_pool.submit([this] { + return engine._thread_pool.submit>([this] { size_t size; - auto ret = ::ioctl(_fd, BLKGETSIZE64, &size); - throw_system_error_on(ret == -1); - return size; + int ret = ::ioctl(_fd, BLKGETSIZE64, &size); + return wrap_syscall(ret, size); + }).then([] (syscall_result_extra ret) { + ret.throw_if_error(); + return make_ready_future(ret.extra); }); } @@ -393,16 +428,16 @@ posix_file_impl::list_directory(std::function (directory_entry de)> nex auto eofcond = [w] { return w->eof; }; return do_until(eofcond, [w, this] { if (w->current == w->total) { - return engine._thread_pool.submit([w , this] () { + return engine._thread_pool.submit>([w , this] () { auto ret = ::syscall(__NR_getdents, _fd, reinterpret_cast(w->buffer), sizeof(w->buffer)); - throw_system_error_on(ret == -1); - return ret; - }).then([w] (int ret) { - if (ret == 0) { + return wrap_syscall(ret); + }).then([w] (syscall_result ret) { + ret.throw_if_error(); + if (ret.result == 0) { w->eof = true; } else { w->current = 0; - w->total = ret; + w->total = ret.result; } }); } @@ -859,6 +894,8 @@ void smp_message_queue::move_pending() { _pending.push(begin, end); _tx.a.pending_fifo.erase(begin, end); _current_queue_length += nr; + _last_snt_batch = nr; + _sent += nr; } void smp_message_queue::submit_item(smp_message_queue::work_item* item) { @@ -891,6 +928,8 @@ size_t smp_message_queue::process_completions() { } _current_queue_length -= nr; + _compl += nr; + _last_cmpl_batch = nr; return nr; } @@ -908,12 +947,57 @@ size_t smp_message_queue::process_incoming() { respond(wi); }); } + _received += nr; + _last_rcv_batch = nr; return nr; } -void smp_message_queue::start() { +void smp_message_queue::start(unsigned cpuid) { _tx.init(); - _complete_peer = &engine; + char instance[10]; + std::snprintf(instance, sizeof(instance), "%u-%u", engine.cpu_id(), cpuid); + _collectd_regs = { + // queue_length value:GAUGE:0:U + // Absolute value of num packets in last tx batch. + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "send-batch") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_snt_batch) + ), + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "receive-batch") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_rcv_batch) + ), + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "complete-batch") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_cmpl_batch) + ), + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "send-queue-length") + , scollectd::make_typed(scollectd::data_type::GAUGE, _current_queue_length) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "total_operations", "received-messages") + , scollectd::make_typed(scollectd::data_type::DERIVE, _received) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "total_operations", "sent-messages") + , scollectd::make_typed(scollectd::data_type::DERIVE, _sent) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "total_operations", "completed-messages") + , scollectd::make_typed(scollectd::data_type::DERIVE, _compl) + ), + }; } /* not yet implemented for OSv. TODO: do the notification like we do class smp. */ @@ -1060,19 +1144,13 @@ smp_message_queue** smp::_qs; std::thread::id smp::_tmain; unsigned smp::count = 1; -void smp::listen_all(smp_message_queue* qs) -{ - for (unsigned i = 0; i < smp::count; i++) { - qs[i]._pending_peer = &engine; - } -} - void smp::start_all_queues() { for (unsigned c = 0; c < count; c++) { - _qs[c][engine.cpu_id()].start(); + if (c != engine.cpu_id()) { + _qs[c][engine.cpu_id()].start(c); + } } - listen_all(_qs[engine.cpu_id()]); } #ifdef HAVE_DPDK diff --git a/core/reactor.hh b/core/reactor.hh index beb9285380..71415bb38b 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -48,6 +48,8 @@ #include #endif +namespace scollectd { class registration; } + class reactor; class pollable_fd; class pollable_fd_state; @@ -388,9 +390,14 @@ class smp_message_queue { boost::lockfree::capacity>; lf_queue _pending; lf_queue _completed; + size_t _received = 0; + size_t _sent = 0; + size_t _compl = 0; size_t _current_queue_length = 0; - reactor* _pending_peer; - reactor* _complete_peer; + size_t _last_snt_batch = 0; + size_t _last_rcv_batch = 0; + size_t _last_cmpl_batch = 0; + std::vector _collectd_regs; struct work_item { virtual ~work_item() {} virtual future<> process() = 0; @@ -448,7 +455,7 @@ public: submit_item(wi); return fut; } - void start(); + void start(unsigned cpuid); size_t process_incoming(); size_t process_completions(); private: @@ -618,6 +625,7 @@ private: promise<> _lowres_timer_promise; promise<> _timer_promise; std::experimental::optional _epoll_poller; + const bool _reuseport; private: void abort_on_error(int ret); template @@ -645,6 +653,7 @@ private: thread_pool _thread_pool; void run_tasks(circular_buffer>& tasks, size_t task_quota); + bool posix_reuseport_detect(); public: static boost::program_options::options_description get_options_description(); reactor(); @@ -669,6 +678,8 @@ public: pollable_fd posix_listen(socket_address sa, listen_options opts = {}); + bool posix_reuseport_available() const { return _reuseport; } + future posix_connect(socket_address sa); future accept(pollable_fd_state& listen_fd); @@ -854,7 +865,6 @@ public: return got != 0; } private: - static void listen_all(smp_message_queue* qs); static void start_all_queues(); static void pin(unsigned cpu_id); public: diff --git a/core/scollectd.cc b/core/scollectd.cc index 7c4744c370..f369e182b1 100644 --- a/core/scollectd.cc +++ b/core/scollectd.cc @@ -15,14 +15,39 @@ #include #include "scollectd.hh" -#include "core/shared_ptr.hh" -#include "core/app-template.hh" #include "core/future-util.hh" -#include "core/shared_ptr.hh" +#include "net/api.hh" + +namespace std { +inline bool operator<(const scollectd::type_instance_id & id1, + const scollectd::type_instance_id & id2) { + return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), + id1.type_instance()) + < std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), + id2.type_instance()); +} +inline bool operator==(const scollectd::type_instance_id & id1, + const scollectd::type_instance_id & id2) { + return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), + id1.type_instance()) + == std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), + id2.type_instance()); +} +} + +namespace scollectd { using namespace std::chrono_literals; +using clock_type = std::chrono::high_resolution_clock; -class scollectd::impl { + +static const ipv4_addr default_addr("239.192.74.66:25826"); +static const clock_type::duration default_period(1s); +const plugin_instance_id per_cpu_plugin_instance("#cpu"); + +future<> send_metric(const type_instance_id &, const value_list &); + +class impl { net::udp_channel _chan; timer<> _timer; @@ -35,17 +60,15 @@ class scollectd::impl { double _avg = 0; public: - // Note: we use std::shared_ptr, not the C* one. This is because currently - // seastar sp does not handle polymorphism. And we use it. - typedef std::map > value_list_map; + typedef std::map > value_list_map; typedef value_list_map::value_type value_list_pair; void add_polled(const type_instance_id & id, - const std::shared_ptr & values) { + const shared_ptr & values) { _values.insert(std::make_pair(id, values)); } void remove_polled(const type_instance_id & id) { - _values.insert(std::make_pair(id, std::shared_ptr())); + _values.insert(std::make_pair(id, shared_ptr())); } // explicitly send a type_instance value list (outside polling) future<> send_metric(const type_instance_id & id, @@ -350,35 +373,31 @@ private: std::vector _regs; }; -const ipv4_addr scollectd::default_addr("239.192.74.66:25826"); -const clock_type::duration scollectd::default_period(1s); -const scollectd::plugin_instance_id scollectd::per_cpu_plugin_instance("#cpu"); - -scollectd::impl & scollectd::get_impl() { +impl & get_impl() { static thread_local impl per_cpu_instance; return per_cpu_instance; } -void scollectd::add_polled(const type_instance_id & id, - const std::shared_ptr & values) { +void add_polled(const type_instance_id & id, + const shared_ptr & values) { get_impl().add_polled(id, values); } -void scollectd::remove_polled_metric(const type_instance_id & id) { +void remove_polled_metric(const type_instance_id & id) { get_impl().remove_polled(id); } -future<> scollectd::send_notification(const type_instance_id & id, +future<> send_notification(const type_instance_id & id, const std::string & msg) { return get_impl().send_notification(id, msg); } -future<> scollectd::send_metric(const type_instance_id & id, +future<> send_metric(const type_instance_id & id, const value_list & values) { return get_impl().send_metric(id, values); } -void scollectd::configure(const boost::program_options::variables_map & opts) { +void configure(const boost::program_options::variables_map & opts) { bool enable = opts["collectd"].as(); if (!enable) { return; @@ -398,7 +417,7 @@ void scollectd::configure(const boost::program_options::variables_map & opts) { } } -boost::program_options::options_description scollectd::get_options_description() { +boost::program_options::options_description get_options_description() { namespace bpo = boost::program_options; bpo::options_description opts("COLLECTD options"); opts.add_options()("collectd", bpo::value()->default_value(true), @@ -412,3 +431,4 @@ boost::program_options::options_description scollectd::get_options_description() "collectd host name"); return opts; } +} diff --git a/core/scollectd.hh b/core/scollectd.hh index 88b27452e5..c127b43985 100644 --- a/core/scollectd.hh +++ b/core/scollectd.hh @@ -17,9 +17,12 @@ #include #include #include +#include +#include -#include "core/reactor.hh" +#include "future.hh" #include "net/byteorder.hh" +#include "core/shared_ptr.hh" /** * Implementation of rudimentary collectd data gathering. @@ -59,389 +62,360 @@ * */ -/* all-static. using a class instead of namespace to hide implementation templates */ -class scollectd { - class impl; -public: - scollectd() = delete; +namespace scollectd { - // The value binding data types - enum class data_type : uint8_t { - COUNTER, // unsigned int 64 - GAUGE, // double - DERIVE, // signed int 64 - ABSOLUTE, // unsigned int 64 - }; - - // don't use directly. use make_typed. - template - struct typed { - typed(data_type t, T && v) - : type(t), value(std::forward(v)) { - } - data_type type; - T value; - }; - - template - static inline typed make_typed(data_type type, T&& t) { - return typed(type, std::forward(t)); - } - - typedef std::string plugin_id; - typedef std::string plugin_instance_id; - typedef std::string type_id; - - class type_instance_id { - public: - type_instance_id() = default; - type_instance_id(const plugin_id & p, const plugin_instance_id & pi, - const type_id & t, const std::string & ti = std::string()) - : _plugin(p), _plugin_instance(pi), _type(t), _type_instance(ti) { - } - type_instance_id(type_instance_id &&) = default; - type_instance_id(const type_instance_id &) = default; - - type_instance_id & operator=(type_instance_id &&) = default; - type_instance_id & operator=(const type_instance_id &) = default; - - const plugin_id & plugin() const { - return _plugin; - } - const plugin_instance_id & plugin_instance() const { - return _plugin_instance; - } - const type_id & type() const { - return _type; - } - const std::string & type_instance() const { - return _type_instance; - } - private: - plugin_id _plugin; - plugin_instance_id _plugin_instance; - type_id _type; - std::string _type_instance; - }; - - static const plugin_instance_id per_cpu_plugin_instance; - - static void configure(const boost::program_options::variables_map&); - static boost::program_options::options_description get_options_description(); - - /** - * Anchor for polled registration. - * Iff the registered type is in some way none-persistent, - * use this as receiver of the reg and ensure it dies before the - * added value(s). - * - * Use: - * uint64_t v = 0; - * registration r = add_polled_metric(v); - * ++r; - * - */ - struct registration { - registration() = default; - registration(const type_instance_id& id) - : _id(id) { - } - registration(type_instance_id&& id) - : _id(std::forward(id)) { - } - registration(const registration&) = default; - registration(registration&&) = default; - ~registration() { - unregister(); - } - registration & operator=(const registration&) = default; - registration & operator=(registration&&) = default; - - void unregister() { - remove_polled_metric(_id); - _id = type_instance_id(); - } - private: - type_instance_id _id; - }; - - typedef std::function notify_function; - - template - static type_instance_id add_polled_metric(const plugin_id & plugin, - const plugin_instance_id & plugin_instance, const type_id & type, - const std::string & type_instance, _Args&& ... args) { - return add_polled_metric( - type_instance_id(plugin, plugin_instance, type, type_instance), - std::forward<_Args>(args)...); - } - template - static future<> send_explicit_metric(const plugin_id & plugin, - const plugin_instance_id & plugin_instance, const type_id & type, - const std::string & type_instance, _Args&& ... args) { - return send_explicit_metric( - type_instance_id(plugin, plugin_instance, type, type_instance), - std::forward<_Args>(args)...); - } - template - static notify_function create_explicit_metric(const plugin_id & plugin, - const plugin_instance_id & plugin_instance, const type_id & type, - const std::string & type_instance, _Args&& ... args) { - return create_explicit_metric( - type_instance_id(plugin, plugin_instance, type, type_instance), - std::forward<_Args>(args)...); - } - template - static type_instance_id add_polled_metric(const type_instance_id & id, - _Args&& ... args) { - typedef decltype(make_type_instance(std::forward<_Args>(args)...)) impl_type; - add_polled(id, - std::make_shared( - make_type_instance(std::forward<_Args>(args)...))); - return id; - } - // "Explicit" metric sends. Sends a single value list as a message. - // Obviously not super efficient either. But maybe someone needs it sometime. - template - static future<> send_explicit_metric(const type_instance_id & id, - _Args&& ... args) { - return send_metric(id, make_type_instance(std::forward<_Args>(args)...)); - } - template - static notify_function create_explicit_metric(const type_instance_id & id, - _Args&& ... args) { - auto list = make_type_instance(std::forward<_Args>(args)...); - return [id, list=std::move(list)]() { - send_metric(id, list); - }; - } - - static void remove_polled_metric(const type_instance_id &); - - // Send a message packet (string) - static future<> send_notification(const type_instance_id & id, - const std::string & msg); - -private: - // lots of template junk to build typed value list tuples - // for registered values. - template - struct data_type_for; - - template - struct is_callable; - - template - struct is_callable::type>::value, - void>::type> : public std::true_type { - }; - - template - struct is_callable::value, void>::type> : public std::false_type { - }; - - template - struct data_type_for::value && std::is_unsigned::value, - void>::type> : public std::integral_constant { - }; - template - struct data_type_for::value && std::is_signed::value, void>::type> : public std::integral_constant< - data_type, data_type::DERIVE> { - }; - template - struct data_type_for::value, void>::type> : public std::integral_constant< - data_type, data_type::GAUGE> { - }; - template - struct data_type_for::value, void>::type> : public data_type_for< - typename std::result_of::type> { - }; - template - struct data_type_for> : public data_type_for { - }; - - template - class value { - public: - template - struct wrap { - wrap(const W & v) - : _v(v) { - } - const W & operator()() const { - return _v; - } - const W & _v; - }; - - typedef typename std::remove_reference::type value_type; - typedef typename std::conditional< - is_callable::type>::value, - value_type, wrap >::type stored_type; - - value(const value_type & t) - : value(data_type_for::value, t) { - } - value(data_type type, const value_type & t) - : _type(type), _t(t) { - } - uint64_t operator()() const { - auto v = _t(); - if (_type == data_type::GAUGE) { - return convert(double(v)); - } else { - uint64_t u = v; - return convert(u); - } - } - operator uint64_t() const { - return (*this)(); - } - operator data_type() const { - return _type; - } - data_type type() const { - return _type; - } - private: - // not super quick value -> protocol endian 64-bit values. - template - void bpack(_Iter s, _Iter e, uint64_t v) const { - while (s != e) { - *s++ = (v & 0xff); - v >>= 8; - } - } - template - typename std::enable_if::value, uint64_t>::type convert( - V v) const { - uint64_t i = v; - // network byte order - return ntohq(i); - } - template - typename std::enable_if::value, uint64_t>::type convert( - V t) const { - union { - uint64_t i; - double v; - } v; - union { - uint64_t i; - uint8_t b[8]; - } u; - v.v = t; - // intel byte order. could also obviously be faster. - // could be ignored if we just assume we're le (for now), - // but this is ok me thinks. - bpack(std::begin(u.b), std::end(u.b), v.i); - return u.i; - } - ; - - const data_type _type; - const stored_type _t; - }; - - template - class value> : public value { - public: - value(const typed & args) - : value(args.type, args.value) { - } - }; - - class value_list { - public: - virtual size_t size() const = 0; - - virtual void types(data_type *) const = 0; - virtual void values(net::packed *) const = 0; - - bool empty() const { - return size() == 0; - } - }; - - template - class values_impl: public value_list { - public: - static const size_t num_values = sizeof...(Args); - - values_impl(Args&& ...args) - : _values(std::forward(args)...) - {} - - values_impl(values_impl&& a) = default; - values_impl(const values_impl& a) = default; - - size_t size() const override { - return num_values; - } - void types(data_type * p) const override { - unpack(_values, [p](Args... args) { - const std::array tmp = { {args}...}; - std::copy(tmp.begin(), tmp.end(), p); - }); - } - void values(net::packed * p) const override { - unpack(_values, [p](Args... args) { - std::array tmp = { {args}...}; - std::copy(tmp.begin(), tmp.end(), p); - }); - } - private: - template - void unpack(const std::tuple& t, _Op&& op) const { - do_unpack(t, std::index_sequence_for {}, std::forward<_Op>(op)); - } - - template - void do_unpack(const std::tuple& t, const std::index_sequence &, _Op&& op) const { - op(std::get(t)...); - } - - std::tuple < Args... > _values; - }; - - template - static auto make_type_instance(_Args && ... args) -> values_impl < decltype(value<_Args>(std::forward<_Args>(args)))... > - { - return values_impl(std::forward<_Args>(args)))... > - (value<_Args>(std::forward<_Args>(args))...); - } - - static const ipv4_addr default_addr; - static const clock_type::duration default_period; - - static void add_polled(const type_instance_id &, const std::shared_ptr &); - static future<> send_metric(const type_instance_id &, const value_list &); - static impl & get_impl(); +// The value binding data types +enum class data_type : uint8_t { + COUNTER, // unsigned int 64 + GAUGE, // double + DERIVE, // signed int 64 + ABSOLUTE, // unsigned int 64 }; -inline bool operator<(const scollectd::type_instance_id & id1, - const scollectd::type_instance_id & id2) { - return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), - id1.type_instance()) - < std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), - id2.type_instance()); -} -inline bool operator==(const scollectd::type_instance_id & id1, - const scollectd::type_instance_id & id2) { - return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), - id1.type_instance()) - == std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), - id2.type_instance()); +// don't use directly. use make_typed. +template +struct typed { + typed(data_type t, T && v) + : type(t), value(std::forward(v)) { + } + data_type type; + T value; +}; + +template +static inline typed make_typed(data_type type, T&& t) { + return typed(type, std::forward(t)); } +typedef std::string plugin_id; +typedef std::string plugin_instance_id; +typedef std::string type_id; + +class type_instance_id { +public: + type_instance_id() = default; + type_instance_id(const plugin_id & p, const plugin_instance_id & pi, + const type_id & t, const std::string & ti = std::string()) + : _plugin(p), _plugin_instance(pi), _type(t), _type_instance(ti) { + } + type_instance_id(type_instance_id &&) = default; + type_instance_id(const type_instance_id &) = default; + + type_instance_id & operator=(type_instance_id &&) = default; + type_instance_id & operator=(const type_instance_id &) = default; + + const plugin_id & plugin() const { + return _plugin; + } + const plugin_instance_id & plugin_instance() const { + return _plugin_instance; + } + const type_id & type() const { + return _type; + } + const std::string & type_instance() const { + return _type_instance; + } +private: + plugin_id _plugin; + plugin_instance_id _plugin_instance; + type_id _type; + std::string _type_instance; +}; + +extern const plugin_instance_id per_cpu_plugin_instance; + +void configure(const boost::program_options::variables_map&); +boost::program_options::options_description get_options_description(); +void remove_polled_metric(const type_instance_id &); + +/** + * Anchor for polled registration. + * Iff the registered type is in some way none-persistent, + * use this as receiver of the reg and ensure it dies before the + * added value(s). + * + * Use: + * uint64_t v = 0; + * registration r = add_polled_metric(v); + * ++r; + * + */ +struct registration { + registration() = default; + registration(const type_instance_id& id) + : _id(id) { + } + registration(type_instance_id&& id) + : _id(std::move(id)) { + } + registration(const registration&) = default; + registration(registration&&) = default; + ~registration() { + unregister(); + } + registration & operator=(const registration&) = default; + registration & operator=(registration&&) = default; + + void unregister() { + remove_polled_metric(_id); + _id = type_instance_id(); + } +private: + type_instance_id _id; +}; + +// lots of template junk to build typed value list tuples +// for registered values. +template +struct data_type_for; + +template +struct is_callable; + +template +struct is_callable::type>::value, +void>::type> : public std::true_type { +}; + +template +struct is_callable::value, void>::type> : public std::false_type { +}; + +template +struct data_type_for::value && std::is_unsigned::value, +void>::type> : public std::integral_constant { +}; +template +struct data_type_for::value && std::is_signed::value, void>::type> : public std::integral_constant< +data_type, data_type::DERIVE> { +}; +template +struct data_type_for::value, void>::type> : public std::integral_constant< +data_type, data_type::GAUGE> { +}; +template +struct data_type_for::value, void>::type> : public data_type_for< +typename std::result_of::type> { +}; +template +struct data_type_for> : public data_type_for { +}; + +template +class value { +public: + template + struct wrap { + wrap(const W & v) + : _v(v) { + } + const W & operator()() const { + return _v; + } + const W & _v; + }; + + typedef typename std::remove_reference::type value_type; + typedef typename std::conditional< + is_callable::type>::value, + value_type, wrap >::type stored_type; + + value(const value_type & t) + : value(data_type_for::value, t) { + } + value(data_type type, const value_type & t) + : _type(type), _t(t) { + } + uint64_t operator()() const { + auto v = _t(); + if (_type == data_type::GAUGE) { + return convert(double(v)); + } else { + uint64_t u = v; + return convert(u); + } + } + operator uint64_t() const { + return (*this)(); + } + operator data_type() const { + return _type; + } + data_type type() const { + return _type; + } +private: + // not super quick value -> protocol endian 64-bit values. + template + void bpack(_Iter s, _Iter e, uint64_t v) const { + while (s != e) { + *s++ = (v & 0xff); + v >>= 8; + } + } + template + typename std::enable_if::value, uint64_t>::type convert( + V v) const { + uint64_t i = v; + // network byte order + return ntohq(i); + } + template + typename std::enable_if::value, uint64_t>::type convert( + V t) const { + union { + uint64_t i; + double v; + } v; + union { + uint64_t i; + uint8_t b[8]; + } u; + v.v = t; + // intel byte order. could also obviously be faster. + // could be ignored if we just assume we're le (for now), + // but this is ok me thinks. + bpack(std::begin(u.b), std::end(u.b), v.i); + return u.i; + } + ; + + const data_type _type; + const stored_type _t; +}; + +template +class value> : public value { +public: + value(const typed & args) +: value(args.type, args.value) { + } +}; + +class value_list { +public: + virtual size_t size() const = 0; + + virtual void types(data_type *) const = 0; + virtual void values(net::packed *) const = 0; + + bool empty() const { + return size() == 0; + } +}; + +template +class values_impl: public value_list { +public: + static const size_t num_values = sizeof...(Args); + + values_impl(Args&& ...args) + : _values(std::forward(args)...) + {} + + values_impl(values_impl&& a) = default; + values_impl(const values_impl& a) = default; + + size_t size() const override { + return num_values; + } + void types(data_type * p) const override { + unpack(_values, [p](Args... args) { + const std::array tmp = { {args}...}; + std::copy(tmp.begin(), tmp.end(), p); + }); + } + void values(net::packed * p) const override { + unpack(_values, [p](Args... args) { + std::array tmp = { {args}...}; + std::copy(tmp.begin(), tmp.end(), p); + }); + } +private: + template + void unpack(const std::tuple& t, _Op&& op) const { + do_unpack(t, std::index_sequence_for {}, std::forward<_Op>(op)); + } + + template + void do_unpack(const std::tuple& t, const std::index_sequence &, _Op&& op) const { + op(std::get(t)...); + } + + std::tuple < Args... > _values; +}; + +void add_polled(const type_instance_id &, const shared_ptr &); + +typedef std::function notify_function; +template +static auto make_type_instance(_Args && ... args) -> values_impl < decltype(value<_Args>(std::forward<_Args>(args)))... > +{ + return values_impl(std::forward<_Args>(args)))... > + (value<_Args>(std::forward<_Args>(args))...); +} +template +static type_instance_id add_polled_metric(const plugin_id & plugin, + const plugin_instance_id & plugin_instance, const type_id & type, + const std::string & type_instance, _Args&& ... args) { + return add_polled_metric( + type_instance_id(plugin, plugin_instance, type, type_instance), + std::forward<_Args>(args)...); +} +template +static future<> send_explicit_metric(const plugin_id & plugin, + const plugin_instance_id & plugin_instance, const type_id & type, + const std::string & type_instance, _Args&& ... args) { + return send_explicit_metric( + type_instance_id(plugin, plugin_instance, type, type_instance), + std::forward<_Args>(args)...); +} +template +static notify_function create_explicit_metric(const plugin_id & plugin, + const plugin_instance_id & plugin_instance, const type_id & type, + const std::string & type_instance, _Args&& ... args) { + return create_explicit_metric( + type_instance_id(plugin, plugin_instance, type, type_instance), + std::forward<_Args>(args)...); +} +template +static type_instance_id add_polled_metric(const type_instance_id & id, + _Args&& ... args) { + typedef decltype(make_type_instance(std::forward<_Args>(args)...)) impl_type; + add_polled(id, + ::make_shared( + make_type_instance(std::forward<_Args>(args)...))); + return id; +} +// "Explicit" metric sends. Sends a single value list as a message. +// Obviously not super efficient either. But maybe someone needs it sometime. +template +static future<> send_explicit_metric(const type_instance_id & id, + _Args&& ... args) { + return send_metric(id, make_type_instance(std::forward<_Args>(args)...)); +} +template +static notify_function create_explicit_metric(const type_instance_id & id, + _Args&& ... args) { + auto list = make_type_instance(std::forward<_Args>(args)...); + return [id, list=std::move(list)]() { + send_metric(id, list); + }; +} + +// Send a message packet (string) +future<> send_notification(const type_instance_id & id, const std::string & msg); +}; + #endif /* SCOLLECTD_HH_ */ diff --git a/core/sstring.hh b/core/sstring.hh index 60cee6ebe5..d7990d2794 100644 --- a/core/sstring.hh +++ b/core/sstring.hh @@ -212,6 +212,18 @@ public: } }; +template +inline +basic_sstring +operator+(const char(&s)[N], const basic_sstring& t) { + using sstring = basic_sstring; + // don't copy the terminating NUL character + sstring ret(typename sstring::initialized_later(), N-1 + t.size()); + auto p = std::copy(std::begin(s), std::end(s)-1, ret.begin()); + std::copy(t.begin(), t.end(), p); + return ret; +} + template static inline size_t str_len(const char(&s)[N]) { return N - 1; } diff --git a/main.cc b/main.cc index 3ee7f28140..07f84d4568 100644 --- a/main.cc +++ b/main.cc @@ -5,7 +5,7 @@ #include "database.hh" #include "core/app-template.hh" -#include "core/smp.hh" +#include "core/distributed.hh" #include "thrift/server.hh" namespace bpo = boost::program_options; diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 371773bc7b..3908f3672a 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -18,6 +18,7 @@ public: virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192); } friend class posix_server_socket_impl; friend class posix_ap_server_socket_impl; + friend class posix_reuseport_server_socket_impl; friend class posix_network_stack; friend class posix_ap_network_stack; }; @@ -55,6 +56,15 @@ future posix_ap_server_socket_impl::accept() { } } +future +posix_reuseport_server_socket_impl::accept() { + return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) { + std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); + return make_ready_future( + connected_socket(std::move(csi)), sa); + }); +} + void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) { auto i = sockets.find(sa.as_posix_sockaddr_in()); if (i != sockets.end()) { @@ -115,7 +125,10 @@ posix_data_sink_impl::put(packet p) { server_socket posix_network_stack::listen(socket_address sa, listen_options opt) { - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + if (_reuseport) + return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + else + return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); } future @@ -131,7 +144,10 @@ thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl: server_socket posix_ap_network_stack::listen(socket_address sa, listen_options opt) { - return server_socket(std::make_unique(sa)); + if (_reuseport) + return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + else + return server_socket(std::make_unique(sa)); } future diff --git a/net/posix-stack.hh b/net/posix-stack.hh index 255c181719..01439d3c81 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -59,21 +59,33 @@ public: virtual future accept(); }; -class posix_network_stack : public network_stack { +class posix_reuseport_server_socket_impl : public server_socket_impl { + socket_address _sa; + pollable_fd _lfd; public: - posix_network_stack(boost::program_options::variables_map opts) {} + explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} + virtual future accept(); +}; + +class posix_network_stack : public network_stack { +private: + const bool _reuseport; +public: + explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine.posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; virtual net::udp_channel make_udp_channel(ipv4_addr addr) override; static future> create(boost::program_options::variables_map opts) { return make_ready_future>(std::unique_ptr(new posix_network_stack(opts))); } - virtual bool has_per_core_namespace() override { return false; }; + virtual bool has_per_core_namespace() override { return _reuseport; }; }; class posix_ap_network_stack : public posix_network_stack { +private: + const bool _reuseport; public: - posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)) {} + posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine.posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; static future> create(boost::program_options::variables_map opts) { diff --git a/tests/sstring_test.cc b/tests/sstring_test.cc index 9c5e60fcdd..db942e9eb2 100644 --- a/tests/sstring_test.cc +++ b/tests/sstring_test.cc @@ -15,3 +15,7 @@ BOOST_AUTO_TEST_CASE(test_equality) { BOOST_AUTO_TEST_CASE(test_to_sstring) { BOOST_REQUIRE_EQUAL(to_sstring(1234567), sstring("1234567")); } + +BOOST_AUTO_TEST_CASE(test_add_literal_to_sstring) { + BOOST_REQUIRE_EQUAL("x" + sstring("y"), sstring("xy")); +} diff --git a/tests/tcp_server.cc b/tests/tcp_server.cc index 30947010b7..f226061cc6 100644 --- a/tests/tcp_server.cc +++ b/tests/tcp_server.cc @@ -5,7 +5,7 @@ #include "core/reactor.hh" #include "core/app-template.hh" #include "core/temporary_buffer.hh" -#include "core/smp.hh" +#include "core/distributed.hh" #include #include diff --git a/tests/udp_server.cc b/tests/udp_server.cc index f18501717f..05a194ea8c 100644 --- a/tests/udp_server.cc +++ b/tests/udp_server.cc @@ -2,7 +2,7 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ -#include "core/smp.hh" +#include "core/distributed.hh" #include "core/app-template.hh" #include "core/future-util.hh"