Merge branch 'master' of github.com:cloudius-systems/seastar into db

This commit is contained in:
Avi Kivity
2015-01-14 17:00:46 +02:00
16 changed files with 581 additions and 454 deletions

View File

@@ -7,7 +7,7 @@
#include "core/sstring.hh"
#include "core/app-template.hh"
#include "core/circular_buffer.hh"
#include "core/smp.hh"
#include "core/distributed.hh"
#include "core/queue.hh"
#include "core/future-util.hh"
#include "core/scollectd.hh"

View File

@@ -16,7 +16,7 @@
#include "core/stream.hh"
#include "core/memory.hh"
#include "core/units.hh"
#include "core/smp.hh"
#include "core/distributed.hh"
#include "core/vector-data-sink.hh"
#include "net/api.hh"
#include "net/packet-data-source.hh"

View File

@@ -9,26 +9,26 @@
#include <cstdlib>
template <typename T>
inline
inline constexpr
T align_up(T v, T align) {
return (v + align - 1) & ~(align - 1);
}
template <typename T>
inline
inline constexpr
T* align_up(T* v, size_t align) {
static_assert(sizeof(T) == 1, "align byte pointers only");
return reinterpret_cast<T*>(align_up(reinterpret_cast<uintptr_t>(v), align));
}
template <typename T>
inline
inline constexpr
T align_down(T v, T align) {
return v & ~(align - 1);
}
template <typename T>
inline
inline constexpr
T* align_down(T* v, size_t align) {
static_assert(sizeof(T) == 1, "align byte pointers only");
return reinterpret_cast<T*>(align_down(reinterpret_cast<uintptr_t>(v), align));

View File

@@ -2,8 +2,8 @@
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#ifndef SMP_HH_
#define SMP_HH_
#ifndef DISTRIBUTED_HH_
#define DISTRIBUTED_HH_
#include "reactor.hh"
#include "future-util.hh"
@@ -226,4 +226,4 @@ foreign_ptr<T> make_foreign(T ptr) {
return foreign_ptr<T>(std::move(ptr));
}
#endif /* SMP_HH_ */
#endif /* DISTRIBUTED_HH_ */

View File

@@ -91,6 +91,7 @@ private:
explicit file(int fd) : _file_impl(make_file_impl(fd)) {}
public:
file(file&& x) : _file_impl(std::move(x._file_impl)) {}
file& operator=(file&& x) noexcept = default;
template <typename CharType>
future<size_t> dma_read(uint64_t pos, CharType* buffer, size_t len) {
return _file_impl->read_dma(pos, buffer, len);

View File

@@ -62,6 +62,20 @@ struct syscall_result {
}
};
// Wrapper for a system call result containing the return value,
// an output parameter that was returned from the syscall, and errno.
template <typename Extra>
struct syscall_result_extra {
int result;
Extra extra;
int error;
void throw_if_error() {
if (result == -1) {
throw std::system_error(error, std::system_category());
}
}
};
template <typename T>
syscall_result<T>
wrap_syscall(T result) {
@@ -71,6 +85,12 @@ wrap_syscall(T result) {
return sr;
}
template <typename Extra>
syscall_result_extra<Extra>
wrap_syscall(int result, const Extra& extra) {
return {result, extra, errno};
}
reactor_backend_epoll::reactor_backend_epoll()
: _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) {
}
@@ -80,7 +100,8 @@ reactor::reactor()
, _exit_future(_exit_promise.get_future())
, _cpu_started(0)
, _io_context(0)
, _io_context_available(max_aio) {
, _io_context_available(max_aio)
, _reuseport(posix_reuseport_detect()) {
auto r = ::io_setup(max_aio, &_io_context);
assert(r >= 0);
struct sigevent sev;
@@ -165,12 +186,25 @@ reactor::posix_listen(socket_address sa, listen_options opts) {
if (opts.reuse_address) {
fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1);
}
if (_reuseport)
fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
fd.bind(sa.u.sa, sizeof(sa.u.sas));
fd.listen(100);
return pollable_fd(std::move(fd));
}
bool
reactor::posix_reuseport_detect() {
try {
file_desc fd = file_desc::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
return true;
} catch(std::system_error& e) {
return false;
}
}
future<pollable_fd>
reactor::posix_connect(socket_address sa) {
file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
@@ -306,11 +340,13 @@ posix_file_impl::flush(void) {
future<struct stat>
posix_file_impl::stat(void) {
return engine._thread_pool.submit<struct stat>([this] {
return engine._thread_pool.submit<syscall_result_extra<struct stat>>([this] {
struct stat st;
auto ret = ::fstat(_fd, &st);
throw_system_error_on(ret == -1);
return (st);
return wrap_syscall(ret, st);
}).then([] (syscall_result_extra<struct stat> ret) {
ret.throw_if_error();
return make_ready_future<struct stat>(ret.extra);
});
}
@@ -338,21 +374,20 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) {
future<size_t>
posix_file_impl::size(void) {
return engine._thread_pool.submit<size_t>([this] {
struct stat st;
auto ret = ::fstat(_fd, &st);
throw_system_error_on(ret == -1);
return st.st_size;
return posix_file_impl::stat().then([] (struct stat&& st) {
return make_ready_future<size_t>(st.st_size);
});
}
future<size_t>
blockdev_file_impl::size(void) {
return engine._thread_pool.submit<size_t>([this] {
return engine._thread_pool.submit<syscall_result_extra<size_t>>([this] {
size_t size;
auto ret = ::ioctl(_fd, BLKGETSIZE64, &size);
throw_system_error_on(ret == -1);
return size;
int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
return wrap_syscall(ret, size);
}).then([] (syscall_result_extra<size_t> ret) {
ret.throw_if_error();
return make_ready_future<size_t>(ret.extra);
});
}
@@ -393,16 +428,16 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex
auto eofcond = [w] { return w->eof; };
return do_until(eofcond, [w, this] {
if (w->current == w->total) {
return engine._thread_pool.submit<int>([w , this] () {
return engine._thread_pool.submit<syscall_result<long>>([w , this] () {
auto ret = ::syscall(__NR_getdents, _fd, reinterpret_cast<linux_dirent*>(w->buffer), sizeof(w->buffer));
throw_system_error_on(ret == -1);
return ret;
}).then([w] (int ret) {
if (ret == 0) {
return wrap_syscall(ret);
}).then([w] (syscall_result<long> ret) {
ret.throw_if_error();
if (ret.result == 0) {
w->eof = true;
} else {
w->current = 0;
w->total = ret;
w->total = ret.result;
}
});
}
@@ -859,6 +894,8 @@ void smp_message_queue::move_pending() {
_pending.push(begin, end);
_tx.a.pending_fifo.erase(begin, end);
_current_queue_length += nr;
_last_snt_batch = nr;
_sent += nr;
}
void smp_message_queue::submit_item(smp_message_queue::work_item* item) {
@@ -891,6 +928,8 @@ size_t smp_message_queue::process_completions() {
}
_current_queue_length -= nr;
_compl += nr;
_last_cmpl_batch = nr;
return nr;
}
@@ -908,12 +947,57 @@ size_t smp_message_queue::process_incoming() {
respond(wi);
});
}
_received += nr;
_last_rcv_batch = nr;
return nr;
}
void smp_message_queue::start() {
void smp_message_queue::start(unsigned cpuid) {
_tx.init();
_complete_peer = &engine;
char instance[10];
std::snprintf(instance, sizeof(instance), "%u-%u", engine.cpu_id(), cpuid);
_collectd_regs = {
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last tx batch.
scollectd::add_polled_metric(scollectd::type_instance_id("smp"
, instance
, "queue_length", "send-batch")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_snt_batch)
),
scollectd::add_polled_metric(scollectd::type_instance_id("smp"
, instance
, "queue_length", "receive-batch")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_rcv_batch)
),
scollectd::add_polled_metric(scollectd::type_instance_id("smp"
, instance
, "queue_length", "complete-batch")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_cmpl_batch)
),
scollectd::add_polled_metric(scollectd::type_instance_id("smp"
, instance
, "queue_length", "send-queue-length")
, scollectd::make_typed(scollectd::data_type::GAUGE, _current_queue_length)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("smp"
, instance
, "total_operations", "received-messages")
, scollectd::make_typed(scollectd::data_type::DERIVE, _received)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("smp"
, instance
, "total_operations", "sent-messages")
, scollectd::make_typed(scollectd::data_type::DERIVE, _sent)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("smp"
, instance
, "total_operations", "completed-messages")
, scollectd::make_typed(scollectd::data_type::DERIVE, _compl)
),
};
}
/* not yet implemented for OSv. TODO: do the notification like we do class smp. */
@@ -1060,19 +1144,13 @@ smp_message_queue** smp::_qs;
std::thread::id smp::_tmain;
unsigned smp::count = 1;
void smp::listen_all(smp_message_queue* qs)
{
for (unsigned i = 0; i < smp::count; i++) {
qs[i]._pending_peer = &engine;
}
}
void smp::start_all_queues()
{
for (unsigned c = 0; c < count; c++) {
_qs[c][engine.cpu_id()].start();
if (c != engine.cpu_id()) {
_qs[c][engine.cpu_id()].start(c);
}
}
listen_all(_qs[engine.cpu_id()]);
}
#ifdef HAVE_DPDK

View File

@@ -48,6 +48,8 @@
#include <osv/newpoll.hh>
#endif
namespace scollectd { class registration; }
class reactor;
class pollable_fd;
class pollable_fd_state;
@@ -388,9 +390,14 @@ class smp_message_queue {
boost::lockfree::capacity<queue_length>>;
lf_queue _pending;
lf_queue _completed;
size_t _received = 0;
size_t _sent = 0;
size_t _compl = 0;
size_t _current_queue_length = 0;
reactor* _pending_peer;
reactor* _complete_peer;
size_t _last_snt_batch = 0;
size_t _last_rcv_batch = 0;
size_t _last_cmpl_batch = 0;
std::vector<scollectd::registration> _collectd_regs;
struct work_item {
virtual ~work_item() {}
virtual future<> process() = 0;
@@ -448,7 +455,7 @@ public:
submit_item(wi);
return fut;
}
void start();
void start(unsigned cpuid);
size_t process_incoming();
size_t process_completions();
private:
@@ -618,6 +625,7 @@ private:
promise<> _lowres_timer_promise;
promise<> _timer_promise;
std::experimental::optional<poller> _epoll_poller;
const bool _reuseport;
private:
void abort_on_error(int ret);
template <typename T, typename E>
@@ -645,6 +653,7 @@ private:
thread_pool _thread_pool;
void run_tasks(circular_buffer<std::unique_ptr<task>>& tasks, size_t task_quota);
bool posix_reuseport_detect();
public:
static boost::program_options::options_description get_options_description();
reactor();
@@ -669,6 +678,8 @@ public:
pollable_fd posix_listen(socket_address sa, listen_options opts = {});
bool posix_reuseport_available() const { return _reuseport; }
future<pollable_fd> posix_connect(socket_address sa);
future<pollable_fd, socket_address> accept(pollable_fd_state& listen_fd);
@@ -854,7 +865,6 @@ public:
return got != 0;
}
private:
static void listen_all(smp_message_queue* qs);
static void start_all_queues();
static void pin(unsigned cpu_id);
public:

View File

@@ -15,14 +15,39 @@
#include <unordered_map>
#include "scollectd.hh"
#include "core/shared_ptr.hh"
#include "core/app-template.hh"
#include "core/future-util.hh"
#include "core/shared_ptr.hh"
#include "net/api.hh"
namespace std {
inline bool operator<(const scollectd::type_instance_id & id1,
const scollectd::type_instance_id & id2) {
return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(),
id1.type_instance())
< std::tie(id2.plugin(), id2.plugin_instance(), id2.type(),
id2.type_instance());
}
inline bool operator==(const scollectd::type_instance_id & id1,
const scollectd::type_instance_id & id2) {
return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(),
id1.type_instance())
== std::tie(id2.plugin(), id2.plugin_instance(), id2.type(),
id2.type_instance());
}
}
namespace scollectd {
using namespace std::chrono_literals;
using clock_type = std::chrono::high_resolution_clock;
class scollectd::impl {
static const ipv4_addr default_addr("239.192.74.66:25826");
static const clock_type::duration default_period(1s);
const plugin_instance_id per_cpu_plugin_instance("#cpu");
future<> send_metric(const type_instance_id &, const value_list &);
class impl {
net::udp_channel _chan;
timer<> _timer;
@@ -35,17 +60,15 @@ class scollectd::impl {
double _avg = 0;
public:
// Note: we use std::shared_ptr, not the C* one. This is because currently
// seastar sp does not handle polymorphism. And we use it.
typedef std::map<type_instance_id, std::shared_ptr<value_list> > value_list_map;
typedef std::map<type_instance_id, shared_ptr<value_list> > value_list_map;
typedef value_list_map::value_type value_list_pair;
void add_polled(const type_instance_id & id,
const std::shared_ptr<value_list> & values) {
const shared_ptr<value_list> & values) {
_values.insert(std::make_pair(id, values));
}
void remove_polled(const type_instance_id & id) {
_values.insert(std::make_pair(id, std::shared_ptr<value_list>()));
_values.insert(std::make_pair(id, shared_ptr<value_list>()));
}
// explicitly send a type_instance value list (outside polling)
future<> send_metric(const type_instance_id & id,
@@ -350,35 +373,31 @@ private:
std::vector<registration> _regs;
};
const ipv4_addr scollectd::default_addr("239.192.74.66:25826");
const clock_type::duration scollectd::default_period(1s);
const scollectd::plugin_instance_id scollectd::per_cpu_plugin_instance("#cpu");
scollectd::impl & scollectd::get_impl() {
impl & get_impl() {
static thread_local impl per_cpu_instance;
return per_cpu_instance;
}
void scollectd::add_polled(const type_instance_id & id,
const std::shared_ptr<value_list> & values) {
void add_polled(const type_instance_id & id,
const shared_ptr<value_list> & values) {
get_impl().add_polled(id, values);
}
void scollectd::remove_polled_metric(const type_instance_id & id) {
void remove_polled_metric(const type_instance_id & id) {
get_impl().remove_polled(id);
}
future<> scollectd::send_notification(const type_instance_id & id,
future<> send_notification(const type_instance_id & id,
const std::string & msg) {
return get_impl().send_notification(id, msg);
}
future<> scollectd::send_metric(const type_instance_id & id,
future<> send_metric(const type_instance_id & id,
const value_list & values) {
return get_impl().send_metric(id, values);
}
void scollectd::configure(const boost::program_options::variables_map & opts) {
void configure(const boost::program_options::variables_map & opts) {
bool enable = opts["collectd"].as<bool>();
if (!enable) {
return;
@@ -398,7 +417,7 @@ void scollectd::configure(const boost::program_options::variables_map & opts) {
}
}
boost::program_options::options_description scollectd::get_options_description() {
boost::program_options::options_description get_options_description() {
namespace bpo = boost::program_options;
bpo::options_description opts("COLLECTD options");
opts.add_options()("collectd", bpo::value<bool>()->default_value(true),
@@ -412,3 +431,4 @@ boost::program_options::options_description scollectd::get_options_description()
"collectd host name");
return opts;
}
}

View File

@@ -17,9 +17,12 @@
#include <memory>
#include <string>
#include <tuple>
#include <chrono>
#include <boost/program_options.hpp>
#include "core/reactor.hh"
#include "future.hh"
#include "net/byteorder.hh"
#include "core/shared_ptr.hh"
/**
* Implementation of rudimentary collectd data gathering.
@@ -59,389 +62,360 @@
*
*/
/* all-static. using a class instead of namespace to hide implementation templates */
class scollectd {
class impl;
public:
scollectd() = delete;
namespace scollectd {
// The value binding data types
enum class data_type : uint8_t {
COUNTER, // unsigned int 64
GAUGE, // double
DERIVE, // signed int 64
ABSOLUTE, // unsigned int 64
};
// don't use directly. use make_typed.
template<typename T>
struct typed {
typed(data_type t, T && v)
: type(t), value(std::forward<T>(v)) {
}
data_type type;
T value;
};
template<typename T>
static inline typed<T> make_typed(data_type type, T&& t) {
return typed<T>(type, std::forward<T>(t));
}
typedef std::string plugin_id;
typedef std::string plugin_instance_id;
typedef std::string type_id;
class type_instance_id {
public:
type_instance_id() = default;
type_instance_id(const plugin_id & p, const plugin_instance_id & pi,
const type_id & t, const std::string & ti = std::string())
: _plugin(p), _plugin_instance(pi), _type(t), _type_instance(ti) {
}
type_instance_id(type_instance_id &&) = default;
type_instance_id(const type_instance_id &) = default;
type_instance_id & operator=(type_instance_id &&) = default;
type_instance_id & operator=(const type_instance_id &) = default;
const plugin_id & plugin() const {
return _plugin;
}
const plugin_instance_id & plugin_instance() const {
return _plugin_instance;
}
const type_id & type() const {
return _type;
}
const std::string & type_instance() const {
return _type_instance;
}
private:
plugin_id _plugin;
plugin_instance_id _plugin_instance;
type_id _type;
std::string _type_instance;
};
static const plugin_instance_id per_cpu_plugin_instance;
static void configure(const boost::program_options::variables_map&);
static boost::program_options::options_description get_options_description();
/**
* Anchor for polled registration.
* Iff the registered type is in some way none-persistent,
* use this as receiver of the reg and ensure it dies before the
* added value(s).
*
* Use:
* uint64_t v = 0;
* registration r = add_polled_metric(v);
* ++r;
* <scope end, above dies>
*/
struct registration {
registration() = default;
registration(const type_instance_id& id)
: _id(id) {
}
registration(type_instance_id&& id)
: _id(std::forward<type_instance_id>(id)) {
}
registration(const registration&) = default;
registration(registration&&) = default;
~registration() {
unregister();
}
registration & operator=(const registration&) = default;
registration & operator=(registration&&) = default;
void unregister() {
remove_polled_metric(_id);
_id = type_instance_id();
}
private:
type_instance_id _id;
};
typedef std::function<void()> notify_function;
template<typename ... _Args>
static type_instance_id add_polled_metric(const plugin_id & plugin,
const plugin_instance_id & plugin_instance, const type_id & type,
const std::string & type_instance, _Args&& ... args) {
return add_polled_metric(
type_instance_id(plugin, plugin_instance, type, type_instance),
std::forward<_Args>(args)...);
}
template<typename ... _Args>
static future<> send_explicit_metric(const plugin_id & plugin,
const plugin_instance_id & plugin_instance, const type_id & type,
const std::string & type_instance, _Args&& ... args) {
return send_explicit_metric(
type_instance_id(plugin, plugin_instance, type, type_instance),
std::forward<_Args>(args)...);
}
template<typename ... _Args>
static notify_function create_explicit_metric(const plugin_id & plugin,
const plugin_instance_id & plugin_instance, const type_id & type,
const std::string & type_instance, _Args&& ... args) {
return create_explicit_metric(
type_instance_id(plugin, plugin_instance, type, type_instance),
std::forward<_Args>(args)...);
}
template<typename ... _Args>
static type_instance_id add_polled_metric(const type_instance_id & id,
_Args&& ... args) {
typedef decltype(make_type_instance(std::forward<_Args>(args)...)) impl_type;
add_polled(id,
std::make_shared<impl_type>(
make_type_instance(std::forward<_Args>(args)...)));
return id;
}
// "Explicit" metric sends. Sends a single value list as a message.
// Obviously not super efficient either. But maybe someone needs it sometime.
template<typename ... _Args>
static future<> send_explicit_metric(const type_instance_id & id,
_Args&& ... args) {
return send_metric(id, make_type_instance(std::forward<_Args>(args)...));
}
template<typename ... _Args>
static notify_function create_explicit_metric(const type_instance_id & id,
_Args&& ... args) {
auto list = make_type_instance(std::forward<_Args>(args)...);
return [id, list=std::move(list)]() {
send_metric(id, list);
};
}
static void remove_polled_metric(const type_instance_id &);
// Send a message packet (string)
static future<> send_notification(const type_instance_id & id,
const std::string & msg);
private:
// lots of template junk to build typed value list tuples
// for registered values.
template<typename T, typename En = void>
struct data_type_for;
template<typename T, typename En = void>
struct is_callable;
template<typename T>
struct is_callable<T,
typename std::enable_if<
!std::is_void<typename std::result_of<T()>::type>::value,
void>::type> : public std::true_type {
};
template<typename T>
struct is_callable<T,
typename std::enable_if<std::is_fundamental<T>::value, void>::type> : public std::false_type {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<
std::is_integral<T>::value && std::is_unsigned<T>::value,
void>::type> : public std::integral_constant<data_type,
data_type::COUNTER> {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<
std::is_integral<T>::value && std::is_signed<T>::value, void>::type> : public std::integral_constant<
data_type, data_type::DERIVE> {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<std::is_floating_point<T>::value, void>::type> : public std::integral_constant<
data_type, data_type::GAUGE> {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<is_callable<T>::value, void>::type> : public data_type_for<
typename std::result_of<T()>::type> {
};
template<typename T>
struct data_type_for<typed<T>> : public data_type_for<T> {
};
template<typename T>
class value {
public:
template<typename W>
struct wrap {
wrap(const W & v)
: _v(v) {
}
const W & operator()() const {
return _v;
}
const W & _v;
};
typedef typename std::remove_reference<T>::type value_type;
typedef typename std::conditional<
is_callable<typename std::remove_reference<T>::type>::value,
value_type, wrap<value_type> >::type stored_type;
value(const value_type & t)
: value<T>(data_type_for<value_type>::value, t) {
}
value(data_type type, const value_type & t)
: _type(type), _t(t) {
}
uint64_t operator()() const {
auto v = _t();
if (_type == data_type::GAUGE) {
return convert(double(v));
} else {
uint64_t u = v;
return convert(u);
}
}
operator uint64_t() const {
return (*this)();
}
operator data_type() const {
return _type;
}
data_type type() const {
return _type;
}
private:
// not super quick value -> protocol endian 64-bit values.
template<typename _Iter>
void bpack(_Iter s, _Iter e, uint64_t v) const {
while (s != e) {
*s++ = (v & 0xff);
v >>= 8;
}
}
template<typename V>
typename std::enable_if<std::is_integral<V>::value, uint64_t>::type convert(
V v) const {
uint64_t i = v;
// network byte order
return ntohq(i);
}
template<typename V>
typename std::enable_if<std::is_floating_point<V>::value, uint64_t>::type convert(
V t) const {
union {
uint64_t i;
double v;
} v;
union {
uint64_t i;
uint8_t b[8];
} u;
v.v = t;
// intel byte order. could also obviously be faster.
// could be ignored if we just assume we're le (for now),
// but this is ok me thinks.
bpack(std::begin(u.b), std::end(u.b), v.i);
return u.i;
}
;
const data_type _type;
const stored_type _t;
};
template<typename T>
class value<typed<T>> : public value<T> {
public:
value(const typed<T> & args)
: value<T>(args.type, args.value) {
}
};
class value_list {
public:
virtual size_t size() const = 0;
virtual void types(data_type *) const = 0;
virtual void values(net::packed<uint64_t> *) const = 0;
bool empty() const {
return size() == 0;
}
};
template<typename ... Args>
class values_impl: public value_list {
public:
static const size_t num_values = sizeof...(Args);
values_impl(Args&& ...args)
: _values(std::forward<Args>(args)...)
{}
values_impl(values_impl<Args...>&& a) = default;
values_impl(const values_impl<Args...>& a) = default;
size_t size() const override {
return num_values;
}
void types(data_type * p) const override {
unpack(_values, [p](Args... args) {
const std::array<data_type, sizeof...(args)> tmp = { {args}...};
std::copy(tmp.begin(), tmp.end(), p);
});
}
void values(net::packed<uint64_t> * p) const override {
unpack(_values, [p](Args... args) {
std::array<uint64_t, num_values> tmp = { {args}...};
std::copy(tmp.begin(), tmp.end(), p);
});
}
private:
template<typename _Op>
void unpack(const std::tuple<Args...>& t, _Op&& op) const {
do_unpack(t, std::index_sequence_for<Args...> {}, std::forward<_Op>(op));
}
template<size_t ...S, typename _Op>
void do_unpack(const std::tuple<Args...>& t, const std::index_sequence<S...> &, _Op&& op) const {
op(std::get<S>(t)...);
}
std::tuple < Args... > _values;
};
template<typename... _Args>
static auto make_type_instance(_Args && ... args) -> values_impl < decltype(value<_Args>(std::forward<_Args>(args)))... >
{
return values_impl<decltype(value<_Args>(std::forward<_Args>(args)))... >
(value<_Args>(std::forward<_Args>(args))...);
}
static const ipv4_addr default_addr;
static const clock_type::duration default_period;
static void add_polled(const type_instance_id &, const std::shared_ptr<value_list> &);
static future<> send_metric(const type_instance_id &, const value_list &);
static impl & get_impl();
// The value binding data types
enum class data_type : uint8_t {
COUNTER, // unsigned int 64
GAUGE, // double
DERIVE, // signed int 64
ABSOLUTE, // unsigned int 64
};
inline bool operator<(const scollectd::type_instance_id & id1,
const scollectd::type_instance_id & id2) {
return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(),
id1.type_instance())
< std::tie(id2.plugin(), id2.plugin_instance(), id2.type(),
id2.type_instance());
}
inline bool operator==(const scollectd::type_instance_id & id1,
const scollectd::type_instance_id & id2) {
return std::tie(id1.plugin(), id1.plugin_instance(), id1.type(),
id1.type_instance())
== std::tie(id2.plugin(), id2.plugin_instance(), id2.type(),
id2.type_instance());
// don't use directly. use make_typed.
template<typename T>
struct typed {
typed(data_type t, T && v)
: type(t), value(std::forward<T>(v)) {
}
data_type type;
T value;
};
template<typename T>
static inline typed<T> make_typed(data_type type, T&& t) {
return typed<T>(type, std::forward<T>(t));
}
typedef std::string plugin_id;
typedef std::string plugin_instance_id;
typedef std::string type_id;
class type_instance_id {
public:
type_instance_id() = default;
type_instance_id(const plugin_id & p, const plugin_instance_id & pi,
const type_id & t, const std::string & ti = std::string())
: _plugin(p), _plugin_instance(pi), _type(t), _type_instance(ti) {
}
type_instance_id(type_instance_id &&) = default;
type_instance_id(const type_instance_id &) = default;
type_instance_id & operator=(type_instance_id &&) = default;
type_instance_id & operator=(const type_instance_id &) = default;
const plugin_id & plugin() const {
return _plugin;
}
const plugin_instance_id & plugin_instance() const {
return _plugin_instance;
}
const type_id & type() const {
return _type;
}
const std::string & type_instance() const {
return _type_instance;
}
private:
plugin_id _plugin;
plugin_instance_id _plugin_instance;
type_id _type;
std::string _type_instance;
};
extern const plugin_instance_id per_cpu_plugin_instance;
void configure(const boost::program_options::variables_map&);
boost::program_options::options_description get_options_description();
void remove_polled_metric(const type_instance_id &);
/**
* Anchor for polled registration.
* Iff the registered type is in some way none-persistent,
* use this as receiver of the reg and ensure it dies before the
* added value(s).
*
* Use:
* uint64_t v = 0;
* registration r = add_polled_metric(v);
* ++r;
* <scope end, above dies>
*/
struct registration {
registration() = default;
registration(const type_instance_id& id)
: _id(id) {
}
registration(type_instance_id&& id)
: _id(std::move(id)) {
}
registration(const registration&) = default;
registration(registration&&) = default;
~registration() {
unregister();
}
registration & operator=(const registration&) = default;
registration & operator=(registration&&) = default;
void unregister() {
remove_polled_metric(_id);
_id = type_instance_id();
}
private:
type_instance_id _id;
};
// lots of template junk to build typed value list tuples
// for registered values.
template<typename T, typename En = void>
struct data_type_for;
template<typename T, typename En = void>
struct is_callable;
template<typename T>
struct is_callable<T,
typename std::enable_if<
!std::is_void<typename std::result_of<T()>::type>::value,
void>::type> : public std::true_type {
};
template<typename T>
struct is_callable<T,
typename std::enable_if<std::is_fundamental<T>::value, void>::type> : public std::false_type {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<
std::is_integral<T>::value && std::is_unsigned<T>::value,
void>::type> : public std::integral_constant<data_type,
data_type::COUNTER> {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<
std::is_integral<T>::value && std::is_signed<T>::value, void>::type> : public std::integral_constant<
data_type, data_type::DERIVE> {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<std::is_floating_point<T>::value, void>::type> : public std::integral_constant<
data_type, data_type::GAUGE> {
};
template<typename T>
struct data_type_for<T,
typename std::enable_if<is_callable<T>::value, void>::type> : public data_type_for<
typename std::result_of<T()>::type> {
};
template<typename T>
struct data_type_for<typed<T>> : public data_type_for<T> {
};
template<typename T>
class value {
public:
template<typename W>
struct wrap {
wrap(const W & v)
: _v(v) {
}
const W & operator()() const {
return _v;
}
const W & _v;
};
typedef typename std::remove_reference<T>::type value_type;
typedef typename std::conditional<
is_callable<typename std::remove_reference<T>::type>::value,
value_type, wrap<value_type> >::type stored_type;
value(const value_type & t)
: value<T>(data_type_for<value_type>::value, t) {
}
value(data_type type, const value_type & t)
: _type(type), _t(t) {
}
uint64_t operator()() const {
auto v = _t();
if (_type == data_type::GAUGE) {
return convert(double(v));
} else {
uint64_t u = v;
return convert(u);
}
}
operator uint64_t() const {
return (*this)();
}
operator data_type() const {
return _type;
}
data_type type() const {
return _type;
}
private:
// not super quick value -> protocol endian 64-bit values.
template<typename _Iter>
void bpack(_Iter s, _Iter e, uint64_t v) const {
while (s != e) {
*s++ = (v & 0xff);
v >>= 8;
}
}
template<typename V>
typename std::enable_if<std::is_integral<V>::value, uint64_t>::type convert(
V v) const {
uint64_t i = v;
// network byte order
return ntohq(i);
}
template<typename V>
typename std::enable_if<std::is_floating_point<V>::value, uint64_t>::type convert(
V t) const {
union {
uint64_t i;
double v;
} v;
union {
uint64_t i;
uint8_t b[8];
} u;
v.v = t;
// intel byte order. could also obviously be faster.
// could be ignored if we just assume we're le (for now),
// but this is ok me thinks.
bpack(std::begin(u.b), std::end(u.b), v.i);
return u.i;
}
;
const data_type _type;
const stored_type _t;
};
template<typename T>
class value<typed<T>> : public value<T> {
public:
value(const typed<T> & args)
: value<T>(args.type, args.value) {
}
};
class value_list {
public:
virtual size_t size() const = 0;
virtual void types(data_type *) const = 0;
virtual void values(net::packed<uint64_t> *) const = 0;
bool empty() const {
return size() == 0;
}
};
template<typename ... Args>
class values_impl: public value_list {
public:
static const size_t num_values = sizeof...(Args);
values_impl(Args&& ...args)
: _values(std::forward<Args>(args)...)
{}
values_impl(values_impl<Args...>&& a) = default;
values_impl(const values_impl<Args...>& a) = default;
size_t size() const override {
return num_values;
}
void types(data_type * p) const override {
unpack(_values, [p](Args... args) {
const std::array<data_type, sizeof...(args)> tmp = { {args}...};
std::copy(tmp.begin(), tmp.end(), p);
});
}
void values(net::packed<uint64_t> * p) const override {
unpack(_values, [p](Args... args) {
std::array<uint64_t, num_values> tmp = { {args}...};
std::copy(tmp.begin(), tmp.end(), p);
});
}
private:
template<typename _Op>
void unpack(const std::tuple<Args...>& t, _Op&& op) const {
do_unpack(t, std::index_sequence_for<Args...> {}, std::forward<_Op>(op));
}
template<size_t ...S, typename _Op>
void do_unpack(const std::tuple<Args...>& t, const std::index_sequence<S...> &, _Op&& op) const {
op(std::get<S>(t)...);
}
std::tuple < Args... > _values;
};
void add_polled(const type_instance_id &, const shared_ptr<value_list> &);
typedef std::function<void()> notify_function;
template<typename... _Args>
static auto make_type_instance(_Args && ... args) -> values_impl < decltype(value<_Args>(std::forward<_Args>(args)))... >
{
return values_impl<decltype(value<_Args>(std::forward<_Args>(args)))... >
(value<_Args>(std::forward<_Args>(args))...);
}
template<typename ... _Args>
static type_instance_id add_polled_metric(const plugin_id & plugin,
const plugin_instance_id & plugin_instance, const type_id & type,
const std::string & type_instance, _Args&& ... args) {
return add_polled_metric(
type_instance_id(plugin, plugin_instance, type, type_instance),
std::forward<_Args>(args)...);
}
template<typename ... _Args>
static future<> send_explicit_metric(const plugin_id & plugin,
const plugin_instance_id & plugin_instance, const type_id & type,
const std::string & type_instance, _Args&& ... args) {
return send_explicit_metric(
type_instance_id(plugin, plugin_instance, type, type_instance),
std::forward<_Args>(args)...);
}
template<typename ... _Args>
static notify_function create_explicit_metric(const plugin_id & plugin,
const plugin_instance_id & plugin_instance, const type_id & type,
const std::string & type_instance, _Args&& ... args) {
return create_explicit_metric(
type_instance_id(plugin, plugin_instance, type, type_instance),
std::forward<_Args>(args)...);
}
template<typename ... _Args>
static type_instance_id add_polled_metric(const type_instance_id & id,
_Args&& ... args) {
typedef decltype(make_type_instance(std::forward<_Args>(args)...)) impl_type;
add_polled(id,
::make_shared<impl_type>(
make_type_instance(std::forward<_Args>(args)...)));
return id;
}
// "Explicit" metric sends. Sends a single value list as a message.
// Obviously not super efficient either. But maybe someone needs it sometime.
template<typename ... _Args>
static future<> send_explicit_metric(const type_instance_id & id,
_Args&& ... args) {
return send_metric(id, make_type_instance(std::forward<_Args>(args)...));
}
template<typename ... _Args>
static notify_function create_explicit_metric(const type_instance_id & id,
_Args&& ... args) {
auto list = make_type_instance(std::forward<_Args>(args)...);
return [id, list=std::move(list)]() {
send_metric(id, list);
};
}
// Send a message packet (string)
future<> send_notification(const type_instance_id & id, const std::string & msg);
};
#endif /* SCOLLECTD_HH_ */

View File

@@ -212,6 +212,18 @@ public:
}
};
template <typename char_type, typename size_type, size_type Max, size_type N>
inline
basic_sstring<char_type, size_type, Max>
operator+(const char(&s)[N], const basic_sstring<char_type, size_type, Max>& t) {
using sstring = basic_sstring<char_type, size_type, Max>;
// don't copy the terminating NUL character
sstring ret(typename sstring::initialized_later(), N-1 + t.size());
auto p = std::copy(std::begin(s), std::end(s)-1, ret.begin());
std::copy(t.begin(), t.end(), p);
return ret;
}
template <size_t N>
static inline
size_t str_len(const char(&s)[N]) { return N - 1; }

View File

@@ -5,7 +5,7 @@
#include "database.hh"
#include "core/app-template.hh"
#include "core/smp.hh"
#include "core/distributed.hh"
#include "thrift/server.hh"
namespace bpo = boost::program_options;

View File

@@ -18,6 +18,7 @@ public:
virtual output_stream<char> output() override { return output_stream<char>(posix_data_sink(_fd), 8192); }
friend class posix_server_socket_impl;
friend class posix_ap_server_socket_impl;
friend class posix_reuseport_server_socket_impl;
friend class posix_network_stack;
friend class posix_ap_network_stack;
};
@@ -55,6 +56,15 @@ future<connected_socket, socket_address> posix_ap_server_socket_impl::accept() {
}
}
future<connected_socket, socket_address>
posix_reuseport_server_socket_impl::accept() {
return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) {
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
return make_ready_future<connected_socket, socket_address>(
connected_socket(std::move(csi)), sa);
});
}
void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) {
auto i = sockets.find(sa.as_posix_sockaddr_in());
if (i != sockets.end()) {
@@ -115,7 +125,10 @@ posix_data_sink_impl::put(packet p) {
server_socket
posix_network_stack::listen(socket_address sa, listen_options opt) {
return server_socket(std::make_unique<posix_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
if (_reuseport)
return server_socket(std::make_unique<posix_reuseport_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
else
return server_socket(std::make_unique<posix_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
}
future<connected_socket>
@@ -131,7 +144,10 @@ thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl:
server_socket
posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
return server_socket(std::make_unique<posix_ap_server_socket_impl>(sa));
if (_reuseport)
return server_socket(std::make_unique<posix_reuseport_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
else
return server_socket(std::make_unique<posix_ap_server_socket_impl>(sa));
}
future<connected_socket>

View File

@@ -59,21 +59,33 @@ public:
virtual future<connected_socket, socket_address> accept();
};
class posix_network_stack : public network_stack {
class posix_reuseport_server_socket_impl : public server_socket_impl {
socket_address _sa;
pollable_fd _lfd;
public:
posix_network_stack(boost::program_options::variables_map opts) {}
explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
virtual future<connected_socket, socket_address> accept();
};
class posix_network_stack : public network_stack {
private:
const bool _reuseport;
public:
explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine.posix_reuseport_available()) {}
virtual server_socket listen(socket_address sa, listen_options opts) override;
virtual future<connected_socket> connect(socket_address sa) override;
virtual net::udp_channel make_udp_channel(ipv4_addr addr) override;
static future<std::unique_ptr<network_stack>> create(boost::program_options::variables_map opts) {
return make_ready_future<std::unique_ptr<network_stack>>(std::unique_ptr<network_stack>(new posix_network_stack(opts)));
}
virtual bool has_per_core_namespace() override { return false; };
virtual bool has_per_core_namespace() override { return _reuseport; };
};
class posix_ap_network_stack : public posix_network_stack {
private:
const bool _reuseport;
public:
posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)) {}
posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine.posix_reuseport_available()) {}
virtual server_socket listen(socket_address sa, listen_options opts) override;
virtual future<connected_socket> connect(socket_address sa) override;
static future<std::unique_ptr<network_stack>> create(boost::program_options::variables_map opts) {

View File

@@ -15,3 +15,7 @@ BOOST_AUTO_TEST_CASE(test_equality) {
BOOST_AUTO_TEST_CASE(test_to_sstring) {
BOOST_REQUIRE_EQUAL(to_sstring(1234567), sstring("1234567"));
}
BOOST_AUTO_TEST_CASE(test_add_literal_to_sstring) {
BOOST_REQUIRE_EQUAL("x" + sstring("y"), sstring("xy"));
}

View File

@@ -5,7 +5,7 @@
#include "core/reactor.hh"
#include "core/app-template.hh"
#include "core/temporary_buffer.hh"
#include "core/smp.hh"
#include "core/distributed.hh"
#include <vector>
#include <iostream>

View File

@@ -2,7 +2,7 @@
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#include "core/smp.hh"
#include "core/distributed.hh"
#include "core/app-template.hh"
#include "core/future-util.hh"