From 16c841452349caa00831f282d8505f54b15db9c8 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 13 Jan 2015 10:12:36 +0200 Subject: [PATCH 01/16] sstring: add literal + sstring concatentation overload --- core/sstring.hh | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/sstring.hh b/core/sstring.hh index 60cee6ebe5..d7990d2794 100644 --- a/core/sstring.hh +++ b/core/sstring.hh @@ -212,6 +212,18 @@ public: } }; +template +inline +basic_sstring +operator+(const char(&s)[N], const basic_sstring& t) { + using sstring = basic_sstring; + // don't copy the terminating NUL character + sstring ret(typename sstring::initialized_later(), N-1 + t.size()); + auto p = std::copy(std::begin(s), std::end(s)-1, ret.begin()); + std::copy(t.begin(), t.end(), p); + return ret; +} + template static inline size_t str_len(const char(&s)[N]) { return N - 1; } From 4c46f4db8c7334ac6802a9c1ca1005e66f3538a8 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 13 Jan 2015 10:13:05 +0200 Subject: [PATCH 02/16] tests: test literal + sstring concatenation --- tests/sstring_test.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/sstring_test.cc b/tests/sstring_test.cc index 9c5e60fcdd..db942e9eb2 100644 --- a/tests/sstring_test.cc +++ b/tests/sstring_test.cc @@ -15,3 +15,7 @@ BOOST_AUTO_TEST_CASE(test_equality) { BOOST_AUTO_TEST_CASE(test_to_sstring) { BOOST_REQUIRE_EQUAL(to_sstring(1234567), sstring("1234567")); } + +BOOST_AUTO_TEST_CASE(test_add_literal_to_sstring) { + BOOST_REQUIRE_EQUAL("x" + sstring("y"), sstring("xy")); +} From fde6c412c266122429fc0b7b6a39c51d35f2cf24 Mon Sep 17 00:00:00 2001 From: Takuya ASADA Date: Mon, 12 Jan 2015 11:46:58 +0900 Subject: [PATCH 03/16] reactor: Add SO_REUSEPORT availability check method Signed-off-by: Takuya ASADA --- core/reactor.cc | 16 +++++++++++++++- core/reactor.hh | 4 ++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/core/reactor.cc b/core/reactor.cc index b61640a1e2..56e6c7fe75 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -80,7 +80,8 @@ reactor::reactor() , _exit_future(_exit_promise.get_future()) , _cpu_started(0) , _io_context(0) - , _io_context_available(max_aio) { + , _io_context_available(max_aio) + , _reuseport(posix_reuseport_detect()) { auto r = ::io_setup(max_aio, &_io_context); assert(r >= 0); struct sigevent sev; @@ -165,12 +166,25 @@ reactor::posix_listen(socket_address sa, listen_options opts) { if (opts.reuse_address) { fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1); } + if (_reuseport) + fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); fd.bind(sa.u.sa, sizeof(sa.u.sas)); fd.listen(100); return pollable_fd(std::move(fd)); } +bool +reactor::posix_reuseport_detect() { + try { + file_desc fd = file_desc::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + return true; + } catch(std::system_error& e) { + return false; + } +} + future reactor::posix_connect(socket_address sa) { file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); diff --git a/core/reactor.hh b/core/reactor.hh index beb9285380..bee0025bec 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -618,6 +618,7 @@ private: promise<> _lowres_timer_promise; promise<> _timer_promise; std::experimental::optional _epoll_poller; + const bool _reuseport; private: void abort_on_error(int ret); template @@ -645,6 +646,7 @@ private: thread_pool _thread_pool; void run_tasks(circular_buffer>& tasks, size_t task_quota); + bool posix_reuseport_detect(); public: static boost::program_options::options_description get_options_description(); reactor(); @@ -669,6 +671,8 @@ public: pollable_fd posix_listen(socket_address sa, listen_options opts = {}); + bool posix_reuseport_available() const { return _reuseport; } + future posix_connect(socket_address sa); future accept(pollable_fd_state& listen_fd); From 16705be1f4bc250092fe315588bc1954c2b02389 Mon Sep 17 00:00:00 2001 From: Takuya ASADA Date: Mon, 12 Jan 2015 03:34:13 +0900 Subject: [PATCH 04/16] Distribute incomming connection by kernel using SO_REUSEPORT With SO_REUSEPORT, we can bind() & accept() on each thread, kernel will dispatch incomming connections. Signed-off-by: Takuya ASADA --- net/posix-stack.cc | 20 ++++++++++++++++++-- net/posix-stack.hh | 20 ++++++++++++++++---- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 371773bc7b..3908f3672a 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -18,6 +18,7 @@ public: virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192); } friend class posix_server_socket_impl; friend class posix_ap_server_socket_impl; + friend class posix_reuseport_server_socket_impl; friend class posix_network_stack; friend class posix_ap_network_stack; }; @@ -55,6 +56,15 @@ future posix_ap_server_socket_impl::accept() { } } +future +posix_reuseport_server_socket_impl::accept() { + return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) { + std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); + return make_ready_future( + connected_socket(std::move(csi)), sa); + }); +} + void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) { auto i = sockets.find(sa.as_posix_sockaddr_in()); if (i != sockets.end()) { @@ -115,7 +125,10 @@ posix_data_sink_impl::put(packet p) { server_socket posix_network_stack::listen(socket_address sa, listen_options opt) { - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + if (_reuseport) + return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + else + return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); } future @@ -131,7 +144,10 @@ thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl: server_socket posix_ap_network_stack::listen(socket_address sa, listen_options opt) { - return server_socket(std::make_unique(sa)); + if (_reuseport) + return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + else + return server_socket(std::make_unique(sa)); } future diff --git a/net/posix-stack.hh b/net/posix-stack.hh index 255c181719..01439d3c81 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -59,21 +59,33 @@ public: virtual future accept(); }; -class posix_network_stack : public network_stack { +class posix_reuseport_server_socket_impl : public server_socket_impl { + socket_address _sa; + pollable_fd _lfd; public: - posix_network_stack(boost::program_options::variables_map opts) {} + explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} + virtual future accept(); +}; + +class posix_network_stack : public network_stack { +private: + const bool _reuseport; +public: + explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine.posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; virtual net::udp_channel make_udp_channel(ipv4_addr addr) override; static future> create(boost::program_options::variables_map opts) { return make_ready_future>(std::unique_ptr(new posix_network_stack(opts))); } - virtual bool has_per_core_namespace() override { return false; }; + virtual bool has_per_core_namespace() override { return _reuseport; }; }; class posix_ap_network_stack : public posix_network_stack { +private: + const bool _reuseport; public: - posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)) {} + posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine.posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; static future> create(boost::program_options::variables_map opts) { From d4db9493aef2e85091fffa456f90493a11fa18ed Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 13 Jan 2015 15:13:36 +0200 Subject: [PATCH 05/16] core: remove unused smp code --- core/reactor.cc | 9 --------- core/reactor.hh | 3 --- 2 files changed, 12 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index b61640a1e2..450fbc2b80 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -913,7 +913,6 @@ size_t smp_message_queue::process_incoming() { void smp_message_queue::start() { _tx.init(); - _complete_peer = &engine; } /* not yet implemented for OSv. TODO: do the notification like we do class smp. */ @@ -1060,19 +1059,11 @@ smp_message_queue** smp::_qs; std::thread::id smp::_tmain; unsigned smp::count = 1; -void smp::listen_all(smp_message_queue* qs) -{ - for (unsigned i = 0; i < smp::count; i++) { - qs[i]._pending_peer = &engine; - } -} - void smp::start_all_queues() { for (unsigned c = 0; c < count; c++) { _qs[c][engine.cpu_id()].start(); } - listen_all(_qs[engine.cpu_id()]); } #ifdef HAVE_DPDK diff --git a/core/reactor.hh b/core/reactor.hh index beb9285380..11e60edffd 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -389,8 +389,6 @@ class smp_message_queue { lf_queue _pending; lf_queue _completed; size_t _current_queue_length = 0; - reactor* _pending_peer; - reactor* _complete_peer; struct work_item { virtual ~work_item() {} virtual future<> process() = 0; @@ -854,7 +852,6 @@ public: return got != 0; } private: - static void listen_all(smp_message_queue* qs); static void start_all_queues(); static void pin(unsigned cpu_id); public: From 2b4a309ad6fc93473594dc842a4dbafb5ee679f3 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 13 Jan 2015 15:13:37 +0200 Subject: [PATCH 06/16] core: do not start smp queue to self --- core/reactor.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/reactor.cc b/core/reactor.cc index 450fbc2b80..badf500c17 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -1062,7 +1062,9 @@ unsigned smp::count = 1; void smp::start_all_queues() { for (unsigned c = 0; c < count; c++) { - _qs[c][engine.cpu_id()].start(); + if (c != engine.cpu_id()) { + _qs[c][engine.cpu_id()].start(); + } } } From 98c9a7f52f94e3324e5121de2c96e6ff944c829a Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 13 Jan 2015 15:13:38 +0200 Subject: [PATCH 07/16] core: make sconnectd.hh include file list more reasonable Currently it include the whole reactor.hh, but it needs only a small part of what it brings. --- core/scollectd.hh | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/scollectd.hh b/core/scollectd.hh index 88b27452e5..6856120464 100644 --- a/core/scollectd.hh +++ b/core/scollectd.hh @@ -17,9 +17,14 @@ #include #include #include +#include +#include -#include "core/reactor.hh" +#include "future.hh" #include "net/byteorder.hh" +#include "net/api.hh" + +using clock_type = std::chrono::high_resolution_clock; /** * Implementation of rudimentary collectd data gathering. From 759681903432a857d5c71e63bdd1a414fbfe465e Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 13 Jan 2015 19:08:48 +0200 Subject: [PATCH 08/16] file: define move assignment operator --- core/file.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/core/file.hh b/core/file.hh index d24ffa9c57..27def5fca5 100644 --- a/core/file.hh +++ b/core/file.hh @@ -91,6 +91,7 @@ private: explicit file(int fd) : _file_impl(make_file_impl(fd)) {} public: file(file&& x) : _file_impl(std::move(x._file_impl)) {} + file& operator=(file&& x) noexcept = default; template future dma_read(uint64_t pos, CharType* buffer, size_t len) { return _file_impl->read_dma(pos, buffer, len); From 58f614d858ea778dd27119b149fdd1fb9c5423fb Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 13 Jan 2015 17:26:49 +0100 Subject: [PATCH 09/16] Make align methods constexpr for usage in statics/constexprs --- core/align.hh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/align.hh b/core/align.hh index a24c470bcd..1bfaf63859 100644 --- a/core/align.hh +++ b/core/align.hh @@ -9,26 +9,26 @@ #include template -inline +inline constexpr T align_up(T v, T align) { return (v + align - 1) & ~(align - 1); } template -inline +inline constexpr T* align_up(T* v, size_t align) { static_assert(sizeof(T) == 1, "align byte pointers only"); return reinterpret_cast(align_up(reinterpret_cast(v), align)); } template -inline +inline constexpr T align_down(T v, T align) { return v & ~(align - 1); } template -inline +inline constexpr T* align_down(T* v, size_t align) { static_assert(sizeof(T) == 1, "align byte pointers only"); return reinterpret_cast(align_down(reinterpret_cast(v), align)); From e1552ad3b6fbab5b824324d50fad5c1fba764ed4 Mon Sep 17 00:00:00 2001 From: Takuya ASADA Date: Wed, 14 Jan 2015 18:13:54 +0900 Subject: [PATCH 10/16] core: rename smp.hh to distributed.hh Signed-off-by: Takuya ASADA --- apps/httpd/httpd.cc | 2 +- apps/memcached/memcache.cc | 2 +- core/{smp.hh => distributed.hh} | 6 +++--- tests/tcp_server.cc | 2 +- tests/udp_server.cc | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) rename core/{smp.hh => distributed.hh} (99%) diff --git a/apps/httpd/httpd.cc b/apps/httpd/httpd.cc index 9cfb3833ac..0d9b06782d 100644 --- a/apps/httpd/httpd.cc +++ b/apps/httpd/httpd.cc @@ -7,7 +7,7 @@ #include "core/sstring.hh" #include "core/app-template.hh" #include "core/circular_buffer.hh" -#include "core/smp.hh" +#include "core/distributed.hh" #include "core/queue.hh" #include "core/future-util.hh" #include "core/scollectd.hh" diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index e998cd4832..39ec852ba5 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -16,7 +16,7 @@ #include "core/stream.hh" #include "core/memory.hh" #include "core/units.hh" -#include "core/smp.hh" +#include "core/distributed.hh" #include "core/vector-data-sink.hh" #include "net/api.hh" #include "net/packet-data-source.hh" diff --git a/core/smp.hh b/core/distributed.hh similarity index 99% rename from core/smp.hh rename to core/distributed.hh index 43f3732e46..15b1e22d61 100644 --- a/core/smp.hh +++ b/core/distributed.hh @@ -2,8 +2,8 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ -#ifndef SMP_HH_ -#define SMP_HH_ +#ifndef DISTRIBUTED_HH_ +#define DISTRIBUTED_HH_ #include "reactor.hh" #include "future-util.hh" @@ -226,4 +226,4 @@ foreign_ptr make_foreign(T ptr) { return foreign_ptr(std::move(ptr)); } -#endif /* SMP_HH_ */ +#endif /* DISTRIBUTED_HH_ */ diff --git a/tests/tcp_server.cc b/tests/tcp_server.cc index 30947010b7..f226061cc6 100644 --- a/tests/tcp_server.cc +++ b/tests/tcp_server.cc @@ -5,7 +5,7 @@ #include "core/reactor.hh" #include "core/app-template.hh" #include "core/temporary_buffer.hh" -#include "core/smp.hh" +#include "core/distributed.hh" #include #include diff --git a/tests/udp_server.cc b/tests/udp_server.cc index f18501717f..05a194ea8c 100644 --- a/tests/udp_server.cc +++ b/tests/udp_server.cc @@ -2,7 +2,7 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ -#include "core/smp.hh" +#include "core/distributed.hh" #include "core/app-template.hh" #include "core/future-util.hh" From ceeab9a33008f9780d5350ff7bdb914a7da77818 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 14 Jan 2015 14:50:14 +0200 Subject: [PATCH 11/16] replace ad-hoc index list generator with standard one --- core/apply.hh | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/core/apply.hh b/core/apply.hh index 5e99a4043f..26e45d913c 100644 --- a/core/apply.hh +++ b/core/apply.hh @@ -6,32 +6,13 @@ #define APPLY_HH_ #include - -template -struct index_list { -}; - -template -struct index_list_helper; - -template -struct index_list_helper> { - using type = index_list; -}; - -template -struct index_list_helper> { - using type = typename index_list_helper>::type; -}; - -template -using make_index_list = typename index_list_helper<0, N, index_list<>>::type; +#include template struct apply_helper; template -struct apply_helper, index_list> { +struct apply_helper, std::index_sequence> { static auto apply(Func func, std::tuple args) { return func(std::get(std::move(args))...); } @@ -39,7 +20,7 @@ struct apply_helper, index_list> { template auto apply(Func func, std::tuple&& args) { - using helper = apply_helper, make_index_list>; + using helper = apply_helper, std::index_sequence_for>; return helper::apply(std::move(func), std::move(args)); } From 1ecc67885709b81f3e09c9f0bbc1f7515fb8ae24 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 14 Jan 2015 16:47:12 +0200 Subject: [PATCH 12/16] reactor: fix potential allocations in syscall thread pool Throwing an exception may allocate memory, which we don't want to do in the syscall thread pool (only reactor threads support allocation, for now). Move the conversion of negative syscall results to exceptions to a separate task, running in the reactor thread, to avoid this. --- core/reactor.cc | 57 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 2e3ca5bed1..b62990ac16 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -62,6 +62,20 @@ struct syscall_result { } }; +// Wrapper for a system call result containing the return value, +// an output parameter that was returned from the syscall, and errno. +template +struct syscall_result_extra { + int result; + Extra extra; + int error; + void throw_if_error() { + if (result == -1) { + throw std::system_error(error, std::system_category()); + } + } +}; + template syscall_result wrap_syscall(T result) { @@ -71,6 +85,12 @@ wrap_syscall(T result) { return sr; } +template +syscall_result_extra +wrap_syscall(int result, const Extra& extra) { + return {result, extra, errno}; +} + reactor_backend_epoll::reactor_backend_epoll() : _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) { } @@ -320,11 +340,13 @@ posix_file_impl::flush(void) { future posix_file_impl::stat(void) { - return engine._thread_pool.submit([this] { + return engine._thread_pool.submit>([this] { struct stat st; auto ret = ::fstat(_fd, &st); - throw_system_error_on(ret == -1); - return (st); + return wrap_syscall(ret, st); + }).then([] (syscall_result_extra ret) { + ret.throw_if_error(); + return make_ready_future(ret.extra); }); } @@ -352,21 +374,20 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) { future posix_file_impl::size(void) { - return engine._thread_pool.submit([this] { - struct stat st; - auto ret = ::fstat(_fd, &st); - throw_system_error_on(ret == -1); - return st.st_size; + return posix_file_impl::stat().then([] (struct stat&& st) { + return make_ready_future(st.st_size); }); } future blockdev_file_impl::size(void) { - return engine._thread_pool.submit([this] { + return engine._thread_pool.submit>([this] { size_t size; - auto ret = ::ioctl(_fd, BLKGETSIZE64, &size); - throw_system_error_on(ret == -1); - return size; + int ret = ::ioctl(_fd, BLKGETSIZE64, &size); + return wrap_syscall(ret, size); + }).then([] (syscall_result_extra ret) { + ret.throw_if_error(); + return make_ready_future(ret.extra); }); } @@ -407,16 +428,16 @@ posix_file_impl::list_directory(std::function (directory_entry de)> nex auto eofcond = [w] { return w->eof; }; return do_until(eofcond, [w, this] { if (w->current == w->total) { - return engine._thread_pool.submit([w , this] () { + return engine._thread_pool.submit>([w , this] () { auto ret = ::syscall(__NR_getdents, _fd, reinterpret_cast(w->buffer), sizeof(w->buffer)); - throw_system_error_on(ret == -1); - return ret; - }).then([w] (int ret) { - if (ret == 0) { + return wrap_syscall(ret); + }).then([w] (syscall_result ret) { + ret.throw_if_error(); + if (ret.result == 0) { w->eof = true; } else { w->current = 0; - w->total = ret; + w->total = ret.result; } }); } From 660434c3a0eef61d28a732854e88965ccc15d7d9 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 14 Jan 2015 15:36:29 +0200 Subject: [PATCH 13/16] core: change collectd to be a namespace instead of class Class was used because it provides better encapsulation, but namespace is easier to deal with. --- core/scollectd.cc | 52 +++- core/scollectd.hh | 734 ++++++++++++++++++++++------------------------ 2 files changed, 388 insertions(+), 398 deletions(-) diff --git a/core/scollectd.cc b/core/scollectd.cc index 7c4744c370..9f889123a5 100644 --- a/core/scollectd.cc +++ b/core/scollectd.cc @@ -15,14 +15,39 @@ #include #include "scollectd.hh" -#include "core/shared_ptr.hh" -#include "core/app-template.hh" #include "core/future-util.hh" -#include "core/shared_ptr.hh" +#include "net/api.hh" + +namespace std { +inline bool operator<(const scollectd::type_instance_id & id1, + const scollectd::type_instance_id & id2) { + return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), + id1.type_instance()) + < std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), + id2.type_instance()); +} +inline bool operator==(const scollectd::type_instance_id & id1, + const scollectd::type_instance_id & id2) { + return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), + id1.type_instance()) + == std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), + id2.type_instance()); +} +} + +namespace scollectd { using namespace std::chrono_literals; +using clock_type = std::chrono::high_resolution_clock; -class scollectd::impl { + +static const ipv4_addr default_addr("239.192.74.66:25826"); +static const clock_type::duration default_period(1s); +const plugin_instance_id per_cpu_plugin_instance("#cpu"); + +future<> send_metric(const type_instance_id &, const value_list &); + +class impl { net::udp_channel _chan; timer<> _timer; @@ -350,35 +375,31 @@ private: std::vector _regs; }; -const ipv4_addr scollectd::default_addr("239.192.74.66:25826"); -const clock_type::duration scollectd::default_period(1s); -const scollectd::plugin_instance_id scollectd::per_cpu_plugin_instance("#cpu"); - -scollectd::impl & scollectd::get_impl() { +impl & get_impl() { static thread_local impl per_cpu_instance; return per_cpu_instance; } -void scollectd::add_polled(const type_instance_id & id, +void add_polled(const type_instance_id & id, const std::shared_ptr & values) { get_impl().add_polled(id, values); } -void scollectd::remove_polled_metric(const type_instance_id & id) { +void remove_polled_metric(const type_instance_id & id) { get_impl().remove_polled(id); } -future<> scollectd::send_notification(const type_instance_id & id, +future<> send_notification(const type_instance_id & id, const std::string & msg) { return get_impl().send_notification(id, msg); } -future<> scollectd::send_metric(const type_instance_id & id, +future<> send_metric(const type_instance_id & id, const value_list & values) { return get_impl().send_metric(id, values); } -void scollectd::configure(const boost::program_options::variables_map & opts) { +void configure(const boost::program_options::variables_map & opts) { bool enable = opts["collectd"].as(); if (!enable) { return; @@ -398,7 +419,7 @@ void scollectd::configure(const boost::program_options::variables_map & opts) { } } -boost::program_options::options_description scollectd::get_options_description() { +boost::program_options::options_description get_options_description() { namespace bpo = boost::program_options; bpo::options_description opts("COLLECTD options"); opts.add_options()("collectd", bpo::value()->default_value(true), @@ -412,3 +433,4 @@ boost::program_options::options_description scollectd::get_options_description() "collectd host name"); return opts; } +} diff --git a/core/scollectd.hh b/core/scollectd.hh index 6856120464..c9442505eb 100644 --- a/core/scollectd.hh +++ b/core/scollectd.hh @@ -22,9 +22,6 @@ #include "future.hh" #include "net/byteorder.hh" -#include "net/api.hh" - -using clock_type = std::chrono::high_resolution_clock; /** * Implementation of rudimentary collectd data gathering. @@ -64,389 +61,360 @@ using clock_type = std::chrono::high_resolution_clock; * */ -/* all-static. using a class instead of namespace to hide implementation templates */ -class scollectd { - class impl; -public: - scollectd() = delete; +namespace scollectd { - // The value binding data types - enum class data_type : uint8_t { - COUNTER, // unsigned int 64 - GAUGE, // double - DERIVE, // signed int 64 - ABSOLUTE, // unsigned int 64 - }; - - // don't use directly. use make_typed. - template - struct typed { - typed(data_type t, T && v) - : type(t), value(std::forward(v)) { - } - data_type type; - T value; - }; - - template - static inline typed make_typed(data_type type, T&& t) { - return typed(type, std::forward(t)); - } - - typedef std::string plugin_id; - typedef std::string plugin_instance_id; - typedef std::string type_id; - - class type_instance_id { - public: - type_instance_id() = default; - type_instance_id(const plugin_id & p, const plugin_instance_id & pi, - const type_id & t, const std::string & ti = std::string()) - : _plugin(p), _plugin_instance(pi), _type(t), _type_instance(ti) { - } - type_instance_id(type_instance_id &&) = default; - type_instance_id(const type_instance_id &) = default; - - type_instance_id & operator=(type_instance_id &&) = default; - type_instance_id & operator=(const type_instance_id &) = default; - - const plugin_id & plugin() const { - return _plugin; - } - const plugin_instance_id & plugin_instance() const { - return _plugin_instance; - } - const type_id & type() const { - return _type; - } - const std::string & type_instance() const { - return _type_instance; - } - private: - plugin_id _plugin; - plugin_instance_id _plugin_instance; - type_id _type; - std::string _type_instance; - }; - - static const plugin_instance_id per_cpu_plugin_instance; - - static void configure(const boost::program_options::variables_map&); - static boost::program_options::options_description get_options_description(); - - /** - * Anchor for polled registration. - * Iff the registered type is in some way none-persistent, - * use this as receiver of the reg and ensure it dies before the - * added value(s). - * - * Use: - * uint64_t v = 0; - * registration r = add_polled_metric(v); - * ++r; - * - */ - struct registration { - registration() = default; - registration(const type_instance_id& id) - : _id(id) { - } - registration(type_instance_id&& id) - : _id(std::forward(id)) { - } - registration(const registration&) = default; - registration(registration&&) = default; - ~registration() { - unregister(); - } - registration & operator=(const registration&) = default; - registration & operator=(registration&&) = default; - - void unregister() { - remove_polled_metric(_id); - _id = type_instance_id(); - } - private: - type_instance_id _id; - }; - - typedef std::function notify_function; - - template - static type_instance_id add_polled_metric(const plugin_id & plugin, - const plugin_instance_id & plugin_instance, const type_id & type, - const std::string & type_instance, _Args&& ... args) { - return add_polled_metric( - type_instance_id(plugin, plugin_instance, type, type_instance), - std::forward<_Args>(args)...); - } - template - static future<> send_explicit_metric(const plugin_id & plugin, - const plugin_instance_id & plugin_instance, const type_id & type, - const std::string & type_instance, _Args&& ... args) { - return send_explicit_metric( - type_instance_id(plugin, plugin_instance, type, type_instance), - std::forward<_Args>(args)...); - } - template - static notify_function create_explicit_metric(const plugin_id & plugin, - const plugin_instance_id & plugin_instance, const type_id & type, - const std::string & type_instance, _Args&& ... args) { - return create_explicit_metric( - type_instance_id(plugin, plugin_instance, type, type_instance), - std::forward<_Args>(args)...); - } - template - static type_instance_id add_polled_metric(const type_instance_id & id, - _Args&& ... args) { - typedef decltype(make_type_instance(std::forward<_Args>(args)...)) impl_type; - add_polled(id, - std::make_shared( - make_type_instance(std::forward<_Args>(args)...))); - return id; - } - // "Explicit" metric sends. Sends a single value list as a message. - // Obviously not super efficient either. But maybe someone needs it sometime. - template - static future<> send_explicit_metric(const type_instance_id & id, - _Args&& ... args) { - return send_metric(id, make_type_instance(std::forward<_Args>(args)...)); - } - template - static notify_function create_explicit_metric(const type_instance_id & id, - _Args&& ... args) { - auto list = make_type_instance(std::forward<_Args>(args)...); - return [id, list=std::move(list)]() { - send_metric(id, list); - }; - } - - static void remove_polled_metric(const type_instance_id &); - - // Send a message packet (string) - static future<> send_notification(const type_instance_id & id, - const std::string & msg); - -private: - // lots of template junk to build typed value list tuples - // for registered values. - template - struct data_type_for; - - template - struct is_callable; - - template - struct is_callable::type>::value, - void>::type> : public std::true_type { - }; - - template - struct is_callable::value, void>::type> : public std::false_type { - }; - - template - struct data_type_for::value && std::is_unsigned::value, - void>::type> : public std::integral_constant { - }; - template - struct data_type_for::value && std::is_signed::value, void>::type> : public std::integral_constant< - data_type, data_type::DERIVE> { - }; - template - struct data_type_for::value, void>::type> : public std::integral_constant< - data_type, data_type::GAUGE> { - }; - template - struct data_type_for::value, void>::type> : public data_type_for< - typename std::result_of::type> { - }; - template - struct data_type_for> : public data_type_for { - }; - - template - class value { - public: - template - struct wrap { - wrap(const W & v) - : _v(v) { - } - const W & operator()() const { - return _v; - } - const W & _v; - }; - - typedef typename std::remove_reference::type value_type; - typedef typename std::conditional< - is_callable::type>::value, - value_type, wrap >::type stored_type; - - value(const value_type & t) - : value(data_type_for::value, t) { - } - value(data_type type, const value_type & t) - : _type(type), _t(t) { - } - uint64_t operator()() const { - auto v = _t(); - if (_type == data_type::GAUGE) { - return convert(double(v)); - } else { - uint64_t u = v; - return convert(u); - } - } - operator uint64_t() const { - return (*this)(); - } - operator data_type() const { - return _type; - } - data_type type() const { - return _type; - } - private: - // not super quick value -> protocol endian 64-bit values. - template - void bpack(_Iter s, _Iter e, uint64_t v) const { - while (s != e) { - *s++ = (v & 0xff); - v >>= 8; - } - } - template - typename std::enable_if::value, uint64_t>::type convert( - V v) const { - uint64_t i = v; - // network byte order - return ntohq(i); - } - template - typename std::enable_if::value, uint64_t>::type convert( - V t) const { - union { - uint64_t i; - double v; - } v; - union { - uint64_t i; - uint8_t b[8]; - } u; - v.v = t; - // intel byte order. could also obviously be faster. - // could be ignored if we just assume we're le (for now), - // but this is ok me thinks. - bpack(std::begin(u.b), std::end(u.b), v.i); - return u.i; - } - ; - - const data_type _type; - const stored_type _t; - }; - - template - class value> : public value { - public: - value(const typed & args) - : value(args.type, args.value) { - } - }; - - class value_list { - public: - virtual size_t size() const = 0; - - virtual void types(data_type *) const = 0; - virtual void values(net::packed *) const = 0; - - bool empty() const { - return size() == 0; - } - }; - - template - class values_impl: public value_list { - public: - static const size_t num_values = sizeof...(Args); - - values_impl(Args&& ...args) - : _values(std::forward(args)...) - {} - - values_impl(values_impl&& a) = default; - values_impl(const values_impl& a) = default; - - size_t size() const override { - return num_values; - } - void types(data_type * p) const override { - unpack(_values, [p](Args... args) { - const std::array tmp = { {args}...}; - std::copy(tmp.begin(), tmp.end(), p); - }); - } - void values(net::packed * p) const override { - unpack(_values, [p](Args... args) { - std::array tmp = { {args}...}; - std::copy(tmp.begin(), tmp.end(), p); - }); - } - private: - template - void unpack(const std::tuple& t, _Op&& op) const { - do_unpack(t, std::index_sequence_for {}, std::forward<_Op>(op)); - } - - template - void do_unpack(const std::tuple& t, const std::index_sequence &, _Op&& op) const { - op(std::get(t)...); - } - - std::tuple < Args... > _values; - }; - - template - static auto make_type_instance(_Args && ... args) -> values_impl < decltype(value<_Args>(std::forward<_Args>(args)))... > - { - return values_impl(std::forward<_Args>(args)))... > - (value<_Args>(std::forward<_Args>(args))...); - } - - static const ipv4_addr default_addr; - static const clock_type::duration default_period; - - static void add_polled(const type_instance_id &, const std::shared_ptr &); - static future<> send_metric(const type_instance_id &, const value_list &); - static impl & get_impl(); +// The value binding data types +enum class data_type : uint8_t { + COUNTER, // unsigned int 64 + GAUGE, // double + DERIVE, // signed int 64 + ABSOLUTE, // unsigned int 64 }; -inline bool operator<(const scollectd::type_instance_id & id1, - const scollectd::type_instance_id & id2) { - return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), - id1.type_instance()) - < std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), - id2.type_instance()); -} -inline bool operator==(const scollectd::type_instance_id & id1, - const scollectd::type_instance_id & id2) { - return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(), - id1.type_instance()) - == std::tie(id2.plugin(), id2.plugin_instance(), id2.type(), - id2.type_instance()); +// don't use directly. use make_typed. +template +struct typed { + typed(data_type t, T && v) + : type(t), value(std::forward(v)) { + } + data_type type; + T value; +}; + +template +static inline typed make_typed(data_type type, T&& t) { + return typed(type, std::forward(t)); } +typedef std::string plugin_id; +typedef std::string plugin_instance_id; +typedef std::string type_id; + +class type_instance_id { +public: + type_instance_id() = default; + type_instance_id(const plugin_id & p, const plugin_instance_id & pi, + const type_id & t, const std::string & ti = std::string()) + : _plugin(p), _plugin_instance(pi), _type(t), _type_instance(ti) { + } + type_instance_id(type_instance_id &&) = default; + type_instance_id(const type_instance_id &) = default; + + type_instance_id & operator=(type_instance_id &&) = default; + type_instance_id & operator=(const type_instance_id &) = default; + + const plugin_id & plugin() const { + return _plugin; + } + const plugin_instance_id & plugin_instance() const { + return _plugin_instance; + } + const type_id & type() const { + return _type; + } + const std::string & type_instance() const { + return _type_instance; + } +private: + plugin_id _plugin; + plugin_instance_id _plugin_instance; + type_id _type; + std::string _type_instance; +}; + +extern const plugin_instance_id per_cpu_plugin_instance; + +void configure(const boost::program_options::variables_map&); +boost::program_options::options_description get_options_description(); +void remove_polled_metric(const type_instance_id &); + +/** + * Anchor for polled registration. + * Iff the registered type is in some way none-persistent, + * use this as receiver of the reg and ensure it dies before the + * added value(s). + * + * Use: + * uint64_t v = 0; + * registration r = add_polled_metric(v); + * ++r; + * + */ +struct registration { + registration() = default; + registration(const type_instance_id& id) + : _id(id) { + } + registration(type_instance_id&& id) + : _id(std::forward(id)) { + } + registration(const registration&) = default; + registration(registration&&) = default; + ~registration() { + unregister(); + } + registration & operator=(const registration&) = default; + registration & operator=(registration&&) = default; + + void unregister() { + remove_polled_metric(_id); + _id = type_instance_id(); + } +private: + type_instance_id _id; +}; + +// lots of template junk to build typed value list tuples +// for registered values. +template +struct data_type_for; + +template +struct is_callable; + +template +struct is_callable::type>::value, +void>::type> : public std::true_type { +}; + +template +struct is_callable::value, void>::type> : public std::false_type { +}; + +template +struct data_type_for::value && std::is_unsigned::value, +void>::type> : public std::integral_constant { +}; +template +struct data_type_for::value && std::is_signed::value, void>::type> : public std::integral_constant< +data_type, data_type::DERIVE> { +}; +template +struct data_type_for::value, void>::type> : public std::integral_constant< +data_type, data_type::GAUGE> { +}; +template +struct data_type_for::value, void>::type> : public data_type_for< +typename std::result_of::type> { +}; +template +struct data_type_for> : public data_type_for { +}; + +template +class value { +public: + template + struct wrap { + wrap(const W & v) + : _v(v) { + } + const W & operator()() const { + return _v; + } + const W & _v; + }; + + typedef typename std::remove_reference::type value_type; + typedef typename std::conditional< + is_callable::type>::value, + value_type, wrap >::type stored_type; + + value(const value_type & t) + : value(data_type_for::value, t) { + } + value(data_type type, const value_type & t) + : _type(type), _t(t) { + } + uint64_t operator()() const { + auto v = _t(); + if (_type == data_type::GAUGE) { + return convert(double(v)); + } else { + uint64_t u = v; + return convert(u); + } + } + operator uint64_t() const { + return (*this)(); + } + operator data_type() const { + return _type; + } + data_type type() const { + return _type; + } +private: + // not super quick value -> protocol endian 64-bit values. + template + void bpack(_Iter s, _Iter e, uint64_t v) const { + while (s != e) { + *s++ = (v & 0xff); + v >>= 8; + } + } + template + typename std::enable_if::value, uint64_t>::type convert( + V v) const { + uint64_t i = v; + // network byte order + return ntohq(i); + } + template + typename std::enable_if::value, uint64_t>::type convert( + V t) const { + union { + uint64_t i; + double v; + } v; + union { + uint64_t i; + uint8_t b[8]; + } u; + v.v = t; + // intel byte order. could also obviously be faster. + // could be ignored if we just assume we're le (for now), + // but this is ok me thinks. + bpack(std::begin(u.b), std::end(u.b), v.i); + return u.i; + } + ; + + const data_type _type; + const stored_type _t; +}; + +template +class value> : public value { +public: + value(const typed & args) +: value(args.type, args.value) { + } +}; + +class value_list { +public: + virtual size_t size() const = 0; + + virtual void types(data_type *) const = 0; + virtual void values(net::packed *) const = 0; + + bool empty() const { + return size() == 0; + } +}; + +template +class values_impl: public value_list { +public: + static const size_t num_values = sizeof...(Args); + + values_impl(Args&& ...args) + : _values(std::forward(args)...) + {} + + values_impl(values_impl&& a) = default; + values_impl(const values_impl& a) = default; + + size_t size() const override { + return num_values; + } + void types(data_type * p) const override { + unpack(_values, [p](Args... args) { + const std::array tmp = { {args}...}; + std::copy(tmp.begin(), tmp.end(), p); + }); + } + void values(net::packed * p) const override { + unpack(_values, [p](Args... args) { + std::array tmp = { {args}...}; + std::copy(tmp.begin(), tmp.end(), p); + }); + } +private: + template + void unpack(const std::tuple& t, _Op&& op) const { + do_unpack(t, std::index_sequence_for {}, std::forward<_Op>(op)); + } + + template + void do_unpack(const std::tuple& t, const std::index_sequence &, _Op&& op) const { + op(std::get(t)...); + } + + std::tuple < Args... > _values; +}; + +void add_polled(const type_instance_id &, const std::shared_ptr &); + +typedef std::function notify_function; +template +static auto make_type_instance(_Args && ... args) -> values_impl < decltype(value<_Args>(std::forward<_Args>(args)))... > +{ + return values_impl(std::forward<_Args>(args)))... > + (value<_Args>(std::forward<_Args>(args))...); +} +template +static type_instance_id add_polled_metric(const plugin_id & plugin, + const plugin_instance_id & plugin_instance, const type_id & type, + const std::string & type_instance, _Args&& ... args) { + return add_polled_metric( + type_instance_id(plugin, plugin_instance, type, type_instance), + std::forward<_Args>(args)...); +} +template +static future<> send_explicit_metric(const plugin_id & plugin, + const plugin_instance_id & plugin_instance, const type_id & type, + const std::string & type_instance, _Args&& ... args) { + return send_explicit_metric( + type_instance_id(plugin, plugin_instance, type, type_instance), + std::forward<_Args>(args)...); +} +template +static notify_function create_explicit_metric(const plugin_id & plugin, + const plugin_instance_id & plugin_instance, const type_id & type, + const std::string & type_instance, _Args&& ... args) { + return create_explicit_metric( + type_instance_id(plugin, plugin_instance, type, type_instance), + std::forward<_Args>(args)...); +} +template +static type_instance_id add_polled_metric(const type_instance_id & id, + _Args&& ... args) { + typedef decltype(make_type_instance(std::forward<_Args>(args)...)) impl_type; + add_polled(id, + std::make_shared( + make_type_instance(std::forward<_Args>(args)...))); + return id; +} +// "Explicit" metric sends. Sends a single value list as a message. +// Obviously not super efficient either. But maybe someone needs it sometime. +template +static future<> send_explicit_metric(const type_instance_id & id, + _Args&& ... args) { + return send_metric(id, make_type_instance(std::forward<_Args>(args)...)); +} +template +static notify_function create_explicit_metric(const type_instance_id & id, + _Args&& ... args) { + auto list = make_type_instance(std::forward<_Args>(args)...); + return [id, list=std::move(list)]() { + send_metric(id, list); + }; +} + +// Send a message packet (string) +future<> send_notification(const type_instance_id & id, const std::string & msg); +}; + #endif /* SCOLLECTD_HH_ */ From 340a871a98576af905b387e251a5face2ebd2fbc Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 14 Jan 2015 15:36:30 +0200 Subject: [PATCH 14/16] core: use seastar shared ptr in scollectd --- core/scollectd.cc | 10 ++++------ core/scollectd.hh | 5 +++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/scollectd.cc b/core/scollectd.cc index 9f889123a5..f369e182b1 100644 --- a/core/scollectd.cc +++ b/core/scollectd.cc @@ -60,17 +60,15 @@ class impl { double _avg = 0; public: - // Note: we use std::shared_ptr, not the C* one. This is because currently - // seastar sp does not handle polymorphism. And we use it. - typedef std::map > value_list_map; + typedef std::map > value_list_map; typedef value_list_map::value_type value_list_pair; void add_polled(const type_instance_id & id, - const std::shared_ptr & values) { + const shared_ptr & values) { _values.insert(std::make_pair(id, values)); } void remove_polled(const type_instance_id & id) { - _values.insert(std::make_pair(id, std::shared_ptr())); + _values.insert(std::make_pair(id, shared_ptr())); } // explicitly send a type_instance value list (outside polling) future<> send_metric(const type_instance_id & id, @@ -381,7 +379,7 @@ impl & get_impl() { } void add_polled(const type_instance_id & id, - const std::shared_ptr & values) { + const shared_ptr & values) { get_impl().add_polled(id, values); } diff --git a/core/scollectd.hh b/core/scollectd.hh index c9442505eb..602ca537f1 100644 --- a/core/scollectd.hh +++ b/core/scollectd.hh @@ -22,6 +22,7 @@ #include "future.hh" #include "net/byteorder.hh" +#include "core/shared_ptr.hh" /** * Implementation of rudimentary collectd data gathering. @@ -355,7 +356,7 @@ private: std::tuple < Args... > _values; }; -void add_polled(const type_instance_id &, const std::shared_ptr &); +void add_polled(const type_instance_id &, const shared_ptr &); typedef std::function notify_function; template @@ -393,7 +394,7 @@ static type_instance_id add_polled_metric(const type_instance_id & id, _Args&& ... args) { typedef decltype(make_type_instance(std::forward<_Args>(args)...)) impl_type; add_polled(id, - std::make_shared( + ::make_shared( make_type_instance(std::forward<_Args>(args)...))); return id; } From 354280fd7ccac57d748f971f890e44ae81004bfd Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 14 Jan 2015 15:36:31 +0200 Subject: [PATCH 15/16] core: scollectd: use move instead of forward on rvalue reference --- core/scollectd.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/scollectd.hh b/core/scollectd.hh index 602ca537f1..c127b43985 100644 --- a/core/scollectd.hh +++ b/core/scollectd.hh @@ -147,7 +147,7 @@ struct registration { : _id(id) { } registration(type_instance_id&& id) - : _id(std::forward(id)) { + : _id(std::move(id)) { } registration(const registration&) = default; registration(registration&&) = default; From c6c6c6055d4664b8d90a2440698ae85b2947d745 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 14 Jan 2015 15:36:32 +0200 Subject: [PATCH 16/16] core: add various statistics about smp communication --- core/reactor.cc | 54 +++++++++++++++++++++++++++++++++++++++++++++++-- core/reactor.hh | 11 +++++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index b62990ac16..9e3f836609 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -894,6 +894,8 @@ void smp_message_queue::move_pending() { _pending.push(begin, end); _tx.a.pending_fifo.erase(begin, end); _current_queue_length += nr; + _last_snt_batch = nr; + _sent += nr; } void smp_message_queue::submit_item(smp_message_queue::work_item* item) { @@ -926,6 +928,8 @@ size_t smp_message_queue::process_completions() { } _current_queue_length -= nr; + _compl += nr; + _last_cmpl_batch = nr; return nr; } @@ -943,11 +947,57 @@ size_t smp_message_queue::process_incoming() { respond(wi); }); } + _received += nr; + _last_rcv_batch = nr; return nr; } -void smp_message_queue::start() { +void smp_message_queue::start(unsigned cpuid) { _tx.init(); + char instance[10]; + std::snprintf(instance, sizeof(instance), "%u-%u", engine.cpu_id(), cpuid); + _collectd_regs = { + // queue_length value:GAUGE:0:U + // Absolute value of num packets in last tx batch. + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "send-batch") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_snt_batch) + ), + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "receive-batch") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_rcv_batch) + ), + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "complete-batch") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_cmpl_batch) + ), + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "queue_length", "send-queue-length") + , scollectd::make_typed(scollectd::data_type::GAUGE, _current_queue_length) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "total_operations", "received-messages") + , scollectd::make_typed(scollectd::data_type::DERIVE, _received) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "total_operations", "sent-messages") + , scollectd::make_typed(scollectd::data_type::DERIVE, _sent) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("smp" + , instance + , "total_operations", "completed-messages") + , scollectd::make_typed(scollectd::data_type::DERIVE, _compl) + ), + }; } /* not yet implemented for OSv. TODO: do the notification like we do class smp. */ @@ -1098,7 +1148,7 @@ void smp::start_all_queues() { for (unsigned c = 0; c < count; c++) { if (c != engine.cpu_id()) { - _qs[c][engine.cpu_id()].start(); + _qs[c][engine.cpu_id()].start(c); } } } diff --git a/core/reactor.hh b/core/reactor.hh index 33bf71e4bd..71415bb38b 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -48,6 +48,8 @@ #include #endif +namespace scollectd { class registration; } + class reactor; class pollable_fd; class pollable_fd_state; @@ -388,7 +390,14 @@ class smp_message_queue { boost::lockfree::capacity>; lf_queue _pending; lf_queue _completed; + size_t _received = 0; + size_t _sent = 0; + size_t _compl = 0; size_t _current_queue_length = 0; + size_t _last_snt_batch = 0; + size_t _last_rcv_batch = 0; + size_t _last_cmpl_batch = 0; + std::vector _collectd_regs; struct work_item { virtual ~work_item() {} virtual future<> process() = 0; @@ -446,7 +455,7 @@ public: submit_item(wi); return fut; } - void start(); + void start(unsigned cpuid); size_t process_incoming(); size_t process_completions(); private: