Merge branch 'master' of github.com:cloudius-systems/seastar into db

This commit is contained in:
Avi Kivity
2015-04-22 10:12:16 +03:00
20 changed files with 3238 additions and 177 deletions

2362
Doxyfile Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -336,7 +336,7 @@ private:
size_t _resize_up_threshold = load_factor * initial_bucket_count;
cache_type::bucket_type* _buckets;
cache_type _cache;
timer_set<item, &item::_timer_link> _alive;
seastar::timer_set<item, &item::_timer_link> _alive;
timer<> _timer;
cache_stats _stats;
timer<> _flush_timer;

View File

@@ -27,12 +27,6 @@ def get_flags():
if line.rstrip('\n').startswith('flags'):
return re.sub(r'^flags\s+: ', '', line).split()
def has_avx():
return 'avx' in get_flags()
def has_avx2():
return 'avx2' in get_flags()
def add_tristate(arg_parser, name, dest, help):
arg_parser.add_argument('--enable-' + name, dest = dest, action = 'store_true', default = None,
help = 'Enable ' + help)
@@ -51,6 +45,36 @@ def apply_tristate(var, test, note, missing):
return False
return False
#
# dpdk_cflags - fetch the DPDK specific CFLAGS
#
# Run a simple makefile that "includes" the DPDK main makefile and prints the
# MACHINE_CFLAGS value
#
def dpdk_cflags (dpdk_target):
with tempfile.NamedTemporaryFile() as sfile:
dpdk_target = os.path.abspath(dpdk_target)
dpdk_target = re.sub(r'\/+$', '', dpdk_target)
dpdk_sdk_path = os.path.dirname(dpdk_target)
dpdk_target_name = os.path.basename(dpdk_target)
dpdk_arch = dpdk_target_name.split('-')[0]
sfile.file.write(bytes('include ' + dpdk_sdk_path + '/mk/rte.vars.mk' + "\n", 'utf-8'))
sfile.file.write(bytes('all:' + "\n\t", 'utf-8'))
sfile.file.write(bytes('@echo $(MACHINE_CFLAGS)' + "\n", 'utf-8'))
sfile.file.flush()
dpdk_cflags = subprocess.check_output(['make', '--no-print-directory',
'-f', sfile.name,
'RTE_SDK=' + dpdk_sdk_path,
'RTE_TARGET=' + dpdk_target_name,
'RTE_ARCH=' + dpdk_arch])
dpdk_cflags_str = dpdk_cflags.decode('utf-8')
dpdk_cflags_str = re.sub(r'\n+$', '', dpdk_cflags_str)
dpdk_cflags_final = ''
return dpdk_cflags_str
def try_compile(compiler, source = '', flags = []):
with tempfile.NamedTemporaryFile() as sfile:
sfile.file.write(bytes(source, 'utf-8'))
@@ -452,16 +476,19 @@ if args.with_osv:
args.hwloc = False
args.user_cflags = (args.user_cflags +
' -DDEFAULT_ALLOCATOR -fvisibility=default -DHAVE_OSV -I' +
args.with_osv + '/include')
args.with_osv + ' -I' + args.with_osv + '/include -I' +
args.with_osv + '/arch/x64')
if args.dpdk_target:
args.user_cflags = (args.user_cflags +
' -DHAVE_DPDK -I' +
args.dpdk_target + '/include -Wno-error=literal-suffix -Wno-literal-suffix -Wno-invalid-offsetof -m64' +
' -mavx' if has_avx() else '' +
' -mavx2' if has_avx2() else '')
libs += (' -L' + args.dpdk_target + '/lib ' +
'-Wl,--whole-archive -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio_uio -lrte_pmd_i40e -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -Wl,--no-whole-archive -lrte_distributor -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_hash -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_kvargs -lrte_mbuf -lrte_ip_frag -lethdev -lrte_eal -lrte_malloc -lrte_mempool -lrte_ring -lrte_cmdline -lrte_cfgfile -lrt -lm -ldl')
' -DHAVE_DPDK -I' + args.dpdk_target + '/include ' +
dpdk_cflags(args.dpdk_target) +
' -Wno-error=literal-suffix -Wno-literal-suffix -Wno-invalid-offsetof')
libs += (' -L' + args.dpdk_target + '/lib ')
if args.with_osv:
libs += '-lintel_dpdk -lrt -lm -ldl'
else:
libs += '-Wl,--whole-archive -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio_uio -lrte_pmd_i40e -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -Wl,--no-whole-archive -lrte_distributor -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_hash -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_kvargs -lrte_mbuf -lrte_ip_frag -lethdev -lrte_eal -lrte_malloc -lrte_mempool -lrte_ring -lrte_cmdline -lrte_cfgfile -lrt -lm -ldl'
warnings = [w
for w in warnings

View File

@@ -68,6 +68,9 @@ void eal::init(cpuset cpus, boost::program_options::variables_map opts)
} else if (!opts.count("dpdk-pmd")) {
args.push_back(string2vector("--no-huge"));
}
#ifdef HAVE_OSV
args.push_back(string2vector("--no-shconf"));
#endif
std::vector<char*> cargs;

View File

@@ -799,9 +799,9 @@ void* allocate_aligned(size_t align, size_t size) {
if (size <= sizeof(free_object)) {
size = sizeof(free_object);
}
if (size <= max_small_allocation) {
if (size <= max_small_allocation && align <= page_size) {
// Our small allocator only guarantees alignment for power-of-two
// allocations.
// allocations which are not larger than a page.
size = 1 << log2(size);
return cpu_mem.allocate_small(size);
} else {

View File

@@ -175,6 +175,11 @@ void reactor::signals::action(int signo, siginfo_t* siginfo, void* ignore) {
reactor::reactor()
: _backend()
#ifdef HAVE_OSV
, _timer_thread(
[&] { timer_thread_func(); }, sched::thread::attr().stack(4096).name("timer_thread").pin(sched::cpu::current()))
, _engine_thread(sched::thread::current())
#endif
, _exit_future(_exit_promise.get_future())
, _cpu_started(0)
, _io_context(0)
@@ -183,12 +188,16 @@ reactor::reactor()
auto r = ::io_setup(max_aio, &_io_context);
assert(r >= 0);
#ifdef HAVE_OSV
_timer_thread.start();
#else
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev._sigev_un._tid = syscall(SYS_gettid);
sev.sigev_signo = SIGALRM;
r = timer_create(CLOCK_REALTIME, &sev, &_timer);
assert(r >= 0);
#endif
memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) {
// push it in the front of the queue so we reclaim memory quickly
_pending_tasks.push_front(make_task([fn = std::move(reclaim_fn)] {
@@ -197,6 +206,41 @@ reactor::reactor()
});
}
#ifdef HAVE_OSV
void reactor::timer_thread_func() {
sched::timer tmr(*sched::thread::current());
WITH_LOCK(_timer_mutex) {
while (!_stopped) {
if (_timer_due != 0) {
set_timer(tmr, _timer_due);
_timer_cond.wait(_timer_mutex, &tmr);
if (tmr.expired()) {
_timer_due = 0;
_engine_thread->unsafe_stop();
_pending_tasks.push_front(make_task([this] {
complete_timers(_timers, _expired_timers, [this] {
if (!_timers.empty()) {
enable_timer(_timers.get_next_timeout());
}
});
}));
_engine_thread->wake();
} else {
tmr.cancel();
}
} else {
_timer_cond.wait(_timer_mutex);
}
}
}
}
void reactor::set_timer(sched::timer &tmr, s64 t) {
using namespace osv::clock;
tmr.set(wall::time_point(std::chrono::nanoseconds(t)));
}
#endif
void reactor::configure(boost::program_options::variables_map vm) {
auto network_stack_ready = vm.count("network-stack")
? network_stack_registry::create(sstring(vm["network-stack"].as<std::string>()), vm)
@@ -620,11 +664,19 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex
void reactor::enable_timer(clock_type::time_point when)
{
#ifndef HAVE_OSV
itimerspec its;
its.it_interval = {};
its.it_value = to_timespec(when);
auto ret = timer_settime(_timer, TIMER_ABSTIME, &its, NULL);
throw_system_error_on(ret == -1);
#else
using ns = std::chrono::nanoseconds;
WITH_LOCK(_timer_mutex) {
_timer_due = std::chrono::duration_cast<ns>(when.time_since_epoch()).count();
_timer_cond.wake_one();
}
#endif
}
void reactor::add_timer(timer<>* tmr) {
@@ -810,6 +862,7 @@ int reactor::run() {
smp_poller = poller(smp::poll_queues);
}
#ifndef HAVE_OSV
_signals.handle_signal(SIGALRM, [this] {
complete_timers(_timers, _expired_timers, [this] {
if (!_timers.empty()) {
@@ -817,6 +870,7 @@ int reactor::run() {
}
});
});
#endif
poller drain_cross_cpu_freelist([] {
return memory::drain_cross_cpu_freelist();

View File

@@ -65,6 +65,9 @@
#include "core/enum.hh"
#ifdef HAVE_OSV
#include <osv/sched.hh>
#include <osv/mutex.h>
#include <osv/condvar.h>
#include <osv/newpoll.hh>
#endif
@@ -122,7 +125,7 @@ public:
bool cancel();
time_point get_timeout();
friend class reactor;
friend class timer_set<timer, &timer::_link>;
friend class seastar::timer_set<timer, &timer::_link>;
};
class lowres_clock {
@@ -638,6 +641,11 @@ private:
// FIXME: make _backend a unique_ptr<reactor_backend>, not a compile-time #ifdef.
#ifdef HAVE_OSV
reactor_backend_osv _backend;
sched::thread _timer_thread;
sched::thread *_engine_thread;
mutable mutex _timer_mutex;
condvar _timer_cond;
s64 _timer_due = 0;
#else
reactor_backend_epoll _backend;
#endif
@@ -654,10 +662,10 @@ private:
promise<> _start_promise;
semaphore _cpu_started;
uint64_t _tasks_processed = 0;
timer_set<timer<>, &timer<>::_link> _timers;
timer_set<timer<>, &timer<>::_link>::timer_list_t _expired_timers;
timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link> _lowres_timers;
timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
seastar::timer_set<timer<>, &timer<>::_link> _timers;
seastar::timer_set<timer<>, &timer<>::_link>::timer_list_t _expired_timers;
seastar::timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link> _lowres_timers;
seastar::timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
io_context_t _io_context;
semaphore _io_context_available;
circular_buffer<std::unique_ptr<task>> _pending_tasks;
@@ -782,6 +790,11 @@ public:
});
}
}
#ifdef HAVE_OSV
void timer_thread_func();
void set_timer(sched::timer &tmr, s64 t);
#endif
private:
/**
* Add a new "poller" - a non-blocking function returning a boolean, that

View File

@@ -99,8 +99,10 @@ public:
return std::move(_deleter);
}
static temporary_buffer aligned(size_t alignment, size_t size) {
auto buf = static_cast<CharType*>(::memalign(alignment, size * sizeof(CharType)));
if (size && !buf) {
void *ptr = nullptr;
auto ret = ::posix_memalign(&ptr, alignment, size * sizeof(CharType));
auto buf = static_cast<CharType*>(ptr);
if (ret) {
throw std::bad_alloc();
}
return temporary_buffer(buf, size, make_free_deleter(buf));

View File

@@ -11,8 +11,8 @@
* BSD license as described in the LICENSE file in the top-level directory.
*/
#ifndef __OSV_TIMER_SET_HH
#define __OSV_TIMER_SET_HH
#ifndef __TIMER_SET_HH
#define __TIMER_SET_HH
#include <chrono>
#include <limits>
@@ -23,6 +23,7 @@
namespace bi = boost::intrusive;
namespace seastar {
/**
* A data structure designed for holding and expiring timers. It's
* optimized for timer non-delivery by deferring sorting cost until
@@ -251,5 +252,6 @@ public:
return Timer::clock::now();
}
};
};
#endif

View File

@@ -284,7 +284,7 @@ public:
add_param(req, url.substr(curr, end_param - curr) );
curr = end_param + 1;
}
add_param(req, url.substr(pos + 1));
add_param(req, url.substr(curr));
return req._url.substr(0, pos);
}

View File

@@ -161,6 +161,38 @@ static constexpr const char* pktmbuf_pool_name = "dpdk_net_pktmbuf_pool";
static constexpr uint8_t packet_read_size = 32;
/******************************************************************************/
struct port_stats {
port_stats() {
std::memset(&rx, 0, sizeof(rx));
std::memset(&tx, 0, sizeof(tx));
}
struct {
struct {
uint64_t mcast; // number of received multicast packets
uint64_t pause_xon; // number of received PAUSE XON frames
uint64_t pause_xoff; // number of received PAUSE XOFF frames
} good;
struct {
uint64_t dropped; // missed packets (e.g. full FIFO)
uint64_t crc; // packets with CRC error
uint64_t len; // packets with a bad length
uint64_t total; // total number of erroneous received packets
} bad;
} rx;
struct {
struct {
uint64_t pause_xon; // number of sent PAUSE XON frames
uint64_t pause_xoff; // number of sent PAUSE XOFF frames
} good;
struct {
uint64_t total; // total number of failed transmitted packets
} bad;
} tx;
};
class dpdk_device : public device {
uint8_t _port_idx;
@@ -168,11 +200,18 @@ class dpdk_device : public device {
net::hw_features _hw_features;
uint8_t _queues_ready = 0;
unsigned _home_cpu;
bool _use_lro;
bool _enable_fc;
std::vector<uint8_t> _redir_table;
#ifdef RTE_VERSION_1_7
struct rte_eth_rxconf _rx_conf_default = {};
struct rte_eth_txconf _tx_conf_default = {};
#endif
port_stats _stats;
timer<> _stats_collector;
const std::string _stats_plugin_name;
const std::string _stats_plugin_inst;
std::vector<scollectd::registration> _collectd_regs;
public:
rte_eth_dev_info _dev_info = {};
@@ -213,18 +252,129 @@ private:
*/
void check_port_link_status();
/**
* Configures the HW Flow Control
*/
void set_hw_flow_control();
public:
dpdk_device(uint8_t port_idx, uint16_t num_queues)
dpdk_device(uint8_t port_idx, uint16_t num_queues, bool use_lro,
bool enable_fc)
: _port_idx(port_idx)
, _num_queues(num_queues)
, _home_cpu(engine().cpu_id()) {
, _home_cpu(engine().cpu_id())
, _use_lro(use_lro)
, _enable_fc(enable_fc)
, _stats_plugin_name("network")
, _stats_plugin_inst(std::string("port") + std::to_string(_port_idx))
{
/* now initialise the port we will use */
int ret = init_port_start();
if (ret != 0) {
rte_exit(EXIT_FAILURE, "Cannot initialise port %u\n", _port_idx);
}
_stats_collector.set_callback([&] {
rte_eth_stats rte_stats = {};
int rc = rte_eth_stats_get(_port_idx, &rte_stats);
if (rc) {
printf("Failed to get port statistics: %s\n", strerror(rc));
}
_stats.rx.good.mcast = rte_stats.imcasts;
_stats.rx.good.pause_xon = rte_stats.rx_pause_xon;
_stats.rx.good.pause_xoff = rte_stats.rx_pause_xoff;
_stats.rx.bad.crc = rte_stats.ibadcrc;
_stats.rx.bad.dropped = rte_stats.imissed;
_stats.rx.bad.len = rte_stats.ibadlen;
_stats.rx.bad.total = rte_stats.ierrors;
_stats.tx.good.pause_xon = rte_stats.tx_pause_xon;
_stats.tx.good.pause_xoff = rte_stats.tx_pause_xoff;
_stats.tx.bad.total = rte_stats.oerrors;
});
// Register port statistics collectd pollers
// Rx Good
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, _stats_plugin_inst
, "if_multicast", _stats_plugin_inst + " Rx Multicast")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.mcast)
));
// Rx Errors
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, _stats_plugin_inst
, "if_rx_errors", "Bad CRC")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.bad.crc)
));
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, _stats_plugin_inst
, "if_rx_errors", "Dropped")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.bad.dropped)
));
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, _stats_plugin_inst
, "if_rx_errors", "Bad Length")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.bad.len)
));
// Coupled counters:
// Good
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, _stats_plugin_inst
, "if_packets", _stats_plugin_inst + " Pause XON")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.pause_xon)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.good.pause_xon)
));
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, _stats_plugin_inst
, "if_packets", _stats_plugin_inst + " Pause XOFF")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.pause_xoff)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.good.pause_xoff)
));
// Errors
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, _stats_plugin_inst
, "if_errors", _stats_plugin_inst)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.bad.total)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.bad.total)
));
}
~dpdk_device() {
_stats_collector.cancel();
}
ethernet_address hw_address() override {
struct ether_addr mac;
rte_eth_macaddr_get(_port_idx, &mac);
@@ -235,6 +385,8 @@ public:
return _hw_features;
}
net::hw_features& hw_features_ref() { return _hw_features; }
const rte_eth_rxconf* def_rx_conf() const {
#ifdef RTE_VERSION_1_7
return &_rx_conf_default;
@@ -284,38 +436,36 @@ class dpdk_qp : public net::qp {
* way.
*
* @param p packet to translate
* @param dev Parent dpdk_device
* @param fc Buffers' factory to use
* @param qp dpdk_qp handle
*
* @return the HEAD tx_buf of the cluster or nullptr in case of a
* failure
*/
static tx_buf* from_packet_zc(
packet&& p, dpdk_device& dev, tx_buf_factory& fc) {
packet&& p, dpdk_qp& qp) {
return from_packet(check_frag0, translate_one_frag, copy_one_frag,
[](packet&& _p, tx_buf& _last_seg) {
_last_seg.set_packet(std::move(_p));
}, std::move(p), dev, fc);
}, std::move(p), qp);
}
/**
* Creates a tx_buf cluster representing a given packet in a "copy" way.
*
* @param p packet to translate
* @param dev Parent dpdk_device
* @param fc Buffers' factory to use
* @param qp dpdk_qp handle
*
* @return the HEAD tx_buf of the cluster or nullptr in case of a
* failure
*/
static tx_buf* from_packet_copy(
packet&& p, dpdk_device& dev, tx_buf_factory& fc) {
packet&& p, dpdk_qp& qp) {
return from_packet([](packet& _p) { return true; },
copy_one_frag, copy_one_frag,
[](packet&& _p, tx_buf& _last_seg) {},
std::move(p), dev, fc);
std::move(p), qp);
}
private:
/**
@@ -326,8 +476,7 @@ class dpdk_qp : public net::qp {
* @param do_one_frag Functor that handles a single frag translation
* @param fin Functor that performs a cluster finalization
* @param p packet to translate
* @param dev Parent dpdk_device object
* @param fc Buffers' factory to use
* @param qp dpdk_qp handle
*
* @return the HEAD tx_buf of the cluster or nullptr in case of a
* failure
@@ -337,7 +486,7 @@ class dpdk_qp : public net::qp {
static tx_buf* from_packet(
FirstFragCheck frag0_check, TrOneFunc do_one_frag,
CopyOneFunc copy_one_frag, FinalizeFunc fin,
packet&& p, dpdk_device& dev, tx_buf_factory& fc) {
packet&& p, dpdk_qp& qp) {
// Too fragmented - linearize
if (p.nr_frags() > max_frags) {
@@ -352,10 +501,10 @@ class dpdk_qp : public net::qp {
// copied and if yes - send it in a copy way
//
if (!frag0_check(p)) {
if (!copy_one_frag(fc, p.frag(0), head, last_seg, nsegs)) {
if (!copy_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
return nullptr;
}
} else if (!do_one_frag(fc, p.frag(0), head, last_seg, nsegs)) {
} else if (!do_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
return nullptr;
}
@@ -363,7 +512,7 @@ class dpdk_qp : public net::qp {
for (unsigned i = 1; i < p.nr_frags(); i++) {
rte_mbuf *h = nullptr, *new_last_seg = nullptr;
if (!do_one_frag(fc, p.frag(i), h, new_last_seg, nsegs)) {
if (!do_one_frag(qp, p.frag(i), h, new_last_seg, nsegs)) {
me(head)->recycle();
return nullptr;
}
@@ -387,7 +536,7 @@ class dpdk_qp : public net::qp {
rte_mbuf_l2_len(head) = sizeof(struct ether_hdr);
rte_mbuf_l3_len(head) = oi.ip_hdr_len;
}
if (dev.hw_features().tx_csum_l4_offload) {
if (qp.port().hw_features().tx_csum_l4_offload) {
if (oi.protocol == ip_protocol_num::tcp) {
head->ol_flags |= PKT_TX_TCP_CKSUM;
// TODO: Take a VLAN header into an account here
@@ -420,7 +569,7 @@ class dpdk_qp : public net::qp {
*
* @param do_one_buf Functor responsible for a single rte_mbuf
* handling
* @param fc Buffers' factory to allocate the tx_buf from (in)
* @param qp dpdk_qp handle (in)
* @param frag Fragment to copy (in)
* @param head Head of the cluster (out)
* @param last_seg Last segment of the cluster (out)
@@ -429,7 +578,7 @@ class dpdk_qp : public net::qp {
* @return TRUE in case of success
*/
template <class DoOneBufFunc>
static bool do_one_frag(DoOneBufFunc do_one_buf, tx_buf_factory& fc,
static bool do_one_frag(DoOneBufFunc do_one_buf, dpdk_qp& qp,
fragment& frag, rte_mbuf*& head,
rte_mbuf*& last_seg, unsigned& nsegs) {
//
@@ -446,7 +595,7 @@ class dpdk_qp : public net::qp {
assert(frag.size);
// Create a HEAD of mbufs' cluster and set the first bytes into it
len = do_one_buf(fc, head, base, left_to_set);
len = do_one_buf(qp, head, base, left_to_set);
if (!len) {
return false;
}
@@ -461,7 +610,7 @@ class dpdk_qp : public net::qp {
//
rte_mbuf* prev_seg = head;
while (left_to_set) {
len = do_one_buf(fc, m, base, left_to_set);
len = do_one_buf(qp, m, base, left_to_set);
if (!len) {
me(head)->recycle();
return false;
@@ -484,7 +633,7 @@ class dpdk_qp : public net::qp {
/**
* Zero-copy handling of a single net::fragment.
*
* @param fc Buffers' factory to allocate the tx_buf from (in)
* @param qp dpdk_qp handle (in)
* @param frag Fragment to copy (in)
* @param head Head of the cluster (out)
* @param last_seg Last segment of the cluster (out)
@@ -492,17 +641,17 @@ class dpdk_qp : public net::qp {
*
* @return TRUE in case of success
*/
static bool translate_one_frag(tx_buf_factory& fc, fragment& frag,
static bool translate_one_frag(dpdk_qp& qp, fragment& frag,
rte_mbuf*& head, rte_mbuf*& last_seg,
unsigned& nsegs) {
return do_one_frag(set_one_data_buf, fc, frag, head,
return do_one_frag(set_one_data_buf, qp, frag, head,
last_seg, nsegs);
}
/**
* Copies one net::fragment into the cluster of rte_mbuf's.
*
* @param fc Buffers' factory to allocate the tx_buf from (in)
* @param qp dpdk_qp handle (in)
* @param frag Fragment to copy (in)
* @param head Head of the cluster (out)
* @param last_seg Last segment of the cluster (out)
@@ -513,10 +662,10 @@ class dpdk_qp : public net::qp {
*
* @return TRUE in case of success
*/
static bool copy_one_frag(tx_buf_factory& fc, fragment& frag,
static bool copy_one_frag(dpdk_qp& qp, fragment& frag,
rte_mbuf*& head, rte_mbuf*& last_seg,
unsigned& nsegs) {
return do_one_frag(copy_one_data_buf, fc, frag, head,
return do_one_frag(copy_one_data_buf, qp, frag, head,
last_seg, nsegs);
}
@@ -524,7 +673,7 @@ class dpdk_qp : public net::qp {
* Allocates a single rte_mbuf and sets it to point to a given data
* buffer.
*
* @param fc Buffers' factory to allocate the tx_buf from (in)
* @param qp dpdk_qp handle (in)
* @param m New allocated rte_mbuf (out)
* @param va virtual address of a data buffer (in)
* @param buf_len length of the data to copy (in)
@@ -532,7 +681,7 @@ class dpdk_qp : public net::qp {
* @return The actual number of bytes that has been set in the mbuf
*/
static size_t set_one_data_buf(
tx_buf_factory& fc, rte_mbuf*& m, char* va, size_t buf_len) {
dpdk_qp& qp, rte_mbuf*& m, char* va, size_t buf_len) {
using namespace memory;
translation tr = translate(va, buf_len);
@@ -547,10 +696,10 @@ class dpdk_qp : public net::qp {
phys_addr_t pa = tr.addr;
if (!tr.size) {
return copy_one_data_buf(fc, m, va, buf_len);
return copy_one_data_buf(qp, m, va, buf_len);
}
tx_buf* buf = fc.get();
tx_buf* buf = qp.get_tx_buf();
if (!buf) {
return 0;
}
@@ -567,7 +716,7 @@ class dpdk_qp : public net::qp {
/**
* Allocates a single rte_mbuf and copies a given data into it.
*
* @param fc Buffers' factory to allocate the tx_buf from (in)
* @param qp dpdk_qp handle (in)
* @param m New allocated rte_mbuf (out)
* @param data Data to copy from (in)
* @param buf_len length of the data to copy (in)
@@ -575,9 +724,9 @@ class dpdk_qp : public net::qp {
* @return The actual number of bytes that has been copied
*/
static size_t copy_one_data_buf(
tx_buf_factory& fc, rte_mbuf*& m, char* data, size_t buf_len)
dpdk_qp& qp, rte_mbuf*& m, char* data, size_t buf_len)
{
tx_buf* buf = fc.get();
tx_buf* buf = qp.get_tx_buf();
if (!buf) {
return 0;
}
@@ -590,6 +739,7 @@ class dpdk_qp : public net::qp {
rte_mbuf_data_len(m) = len;
rte_mbuf_pkt_len(m) = len;
qp._stats.tx.good.update_copy_stats(1, len);
rte_memcpy(rte_pktmbuf_mtod(m, void*), data, len);
@@ -885,7 +1035,8 @@ class dpdk_qp : public net::qp {
};
public:
explicit dpdk_qp(dpdk_device* dev, uint8_t qid);
explicit dpdk_qp(dpdk_device* dev, uint8_t qid,
const std::string stats_plugin_name);
virtual void rx_start() override;
virtual future<> send(packet p) override {
@@ -897,17 +1048,18 @@ public:
if (HugetlbfsMemBackend) {
// Zero-copy send
return _send(pb, [&] (packet&& p) {
return tx_buf::from_packet_zc(
std::move(p), *_dev, _tx_buf_factory);
return tx_buf::from_packet_zc(std::move(p), *this);
});
} else {
// "Copy"-send
return _send(pb, [&](packet&& p) {
return tx_buf::from_packet_copy(
std::move(p), *_dev, _tx_buf_factory);
return tx_buf::from_packet_copy(std::move(p), *this);
});
}
}
dpdk_device& port() const { return *_dev; }
tx_buf* get_tx_buf() { return _tx_buf_factory.get(); }
private:
template <class Func>
@@ -930,10 +1082,17 @@ private:
_tx_burst.data() + _tx_burst_idx,
_tx_burst.size() - _tx_burst_idx);
uint64_t nr_frags = 0, bytes = 0;
for (int i = 0; i < sent; i++) {
rte_mbuf* m = _tx_burst[_tx_burst_idx + i];
bytes += rte_mbuf_pkt_len(m);
nr_frags += rte_mbuf_nb_segs(m);
pb.pop_front();
}
_stats.tx.good.update_frags_stats(nr_frags, bytes);
_tx_burst_idx += sent;
if (_tx_burst_idx == _tx_burst.size()) {
@@ -1039,9 +1198,19 @@ private:
* Translate rte_mbuf into the "packet".
* @param m mbuf to translate
*
* @return a "packet" object representing the newly received data
* @return a "optional" object representing the newly received data if in an
* "engaged" state or an error if in a "disengaged" state.
*/
packet from_mbuf(rte_mbuf* m);
std::experimental::optional<packet> from_mbuf(rte_mbuf* m);
/**
* Transform an LRO rte_mbuf cluster into the "packet" object.
* @param m HEAD of the mbufs' cluster to transform
*
* @return a "optional" object representing the newly received LRO packet if
* in an "engaged" state or an error if in a "disengaged" state.
*/
std::experimental::optional<packet> from_mbuf_lro(rte_mbuf* m);
private:
dpdk_device* _dev;
@@ -1049,6 +1218,8 @@ private:
rte_mempool *_pktmbuf_pool_rx;
std::vector<rte_mbuf*> _rx_free_pkts;
std::vector<rte_mbuf*> _rx_free_bufs;
std::vector<fragment> _frags;
std::vector<char*> _bufs;
size_t _num_rx_free_segs = 0;
reactor::poller _rx_gc_poller;
std::unique_ptr<void, free_deleter> _rx_xmem;
@@ -1079,8 +1250,38 @@ int dpdk_device::init_port_start()
_tx_conf_default.tx_free_thresh = 0; /* Use PMD default values */
_tx_conf_default.tx_rs_thresh = 0; /* Use PMD default values */
#else
// Clear txq_flags - we want to support all available offload features.
_dev_info.default_txconf.txq_flags = 0;
// Clear txq_flags - we want to support all available offload features
// except for multi-mempool and refcnt'ing which we don't need
_dev_info.default_txconf.txq_flags =
ETH_TXQ_FLAGS_NOMULTMEMP | ETH_TXQ_FLAGS_NOREFCOUNT;
//
// Disable features that are not supported by port's HW
//
if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
_dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
}
if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
_dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
}
if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
_dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
}
if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
_dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
}
if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
_dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
}
if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO)) {
_dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
}
#endif
/* for port configuration all features are off by default */
@@ -1133,6 +1334,19 @@ int dpdk_device::init_port_start()
port_conf.rxmode.hw_vlan_strip = 1;
}
// Enable HW CRC stripping
port_conf.rxmode.hw_strip_crc = 1;
#ifdef RTE_ETHDEV_HAS_LRO_SUPPORT
// Enable LRO
if (_use_lro && (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO)) {
printf("LRO is on\n");
port_conf.rxmode.enable_lro = 1;
_hw_features.rx_lro = true;
} else
#endif
printf("LRO is off\n");
// Check that all CSUM features are either all set all together or not set
// all together. If this assumption breaks we need to rework the below logic
// by splitting the csum offload feature bit into separate bits for IPv4,
@@ -1209,8 +1423,48 @@ int dpdk_device::init_port_start()
return 0;
}
void dpdk_device::set_hw_flow_control()
{
// Read the port's current/default flow control settings
struct rte_eth_fc_conf fc_conf;
auto ret = rte_eth_dev_flow_ctrl_get(_port_idx, &fc_conf);
if (ret == -ENOTSUP) {
goto not_supported;
}
if (ret < 0) {
rte_exit(EXIT_FAILURE, "Port %u: failed to get hardware flow control settings: (error %d)\n", _port_idx, ret);
}
if (_enable_fc) {
fc_conf.mode = RTE_FC_FULL;
} else {
fc_conf.mode = RTE_FC_NONE;
}
ret = rte_eth_dev_flow_ctrl_set(_port_idx, &fc_conf);
if (ret == -ENOTSUP) {
goto not_supported;
}
if (ret < 0) {
rte_exit(EXIT_FAILURE, "Port %u: failed to set hardware flow control (error %d)\n", _port_idx, ret);
}
printf("Port %u: %s HW FC\n", _port_idx,
(_enable_fc ? "Enabling" : "Disabling"));
return;
not_supported:
printf("Port %u: Changing HW FC settings is not supported\n", _port_idx);
}
void dpdk_device::init_port_fini()
{
// Changing FC requires HW reset, so set it before the port is initialized.
set_hw_flow_control();
if (rte_eth_dev_start(_port_idx) < 0) {
rte_exit(EXIT_FAILURE, "Cannot start port %d\n", _port_idx);
}
@@ -1364,6 +1618,9 @@ void dpdk_device::check_port_link_status()
("full-duplex") : ("half-duplex\n")) <<
std::endl;
_link_ready_promise.set_value();
// We may start collecting statistics only after the Link is UP.
_stats_collector.arm_periodic(2s);
} else if (count++ < max_check_time) {
std::cout << "." << std::flush;
return;
@@ -1377,8 +1634,9 @@ void dpdk_device::check_port_link_status()
}
template <bool HugetlbfsMemBackend>
dpdk_qp<HugetlbfsMemBackend>::dpdk_qp(dpdk_device* dev, uint8_t qid)
: _dev(dev), _qid(qid),
dpdk_qp<HugetlbfsMemBackend>::dpdk_qp(dpdk_device* dev, uint8_t qid,
const std::string stats_plugin_name)
: qp(true, stats_plugin_name, qid), _dev(dev), _qid(qid),
_rx_gc_poller([&] { return rx_gc(); }),
_tx_buf_factory(qid),
_tx_gc_poller([&] { return _tx_buf_factory.gc(); })
@@ -1405,6 +1663,34 @@ dpdk_qp<HugetlbfsMemBackend>::dpdk_qp(dpdk_device* dev, uint8_t qid)
rte_eth_dev_socket_id(_dev->port_idx()), _dev->def_tx_conf()) < 0) {
rte_exit(EXIT_FAILURE, "Cannot initialize tx queue\n");
}
// Register error statistics: Rx total and checksum errors
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "if_rx_errors", "Bad CSUM")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.bad.csum)
));
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "if_rx_errors", "Total")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.bad.total)
));
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "if_rx_errors", "No Memory")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.bad.no_mem)
));
}
template <bool HugetlbfsMemBackend>
@@ -1413,47 +1699,112 @@ void dpdk_qp<HugetlbfsMemBackend>::rx_start() {
}
template<>
inline packet dpdk_qp<false>::from_mbuf(rte_mbuf* m)
inline std::experimental::optional<packet>
dpdk_qp<false>::from_mbuf_lro(rte_mbuf* m)
{
//
// Try to allocate a buffer for packet's data. If we fail - give the
// application an mbuf itself. If we succeed - copy the data into this
// buffer, create a packet based on this buffer and return the mbuf to its
// pool.
// Try to allocate a buffer for the whole packet's data.
// If we fail - construct the packet from mbufs.
// If we succeed - copy the data into this buffer, create a packet based on
// this buffer and return the mbuf to its pool.
//
auto len = rte_pktmbuf_data_len(m);
char* buf = (char*)malloc(len);
if (!buf) {
fragment f{rte_pktmbuf_mtod(m, char*), len};
return packet(f, make_deleter(deleter(), [m] { rte_pktmbuf_free(m); }));
} else {
rte_memcpy(buf, rte_pktmbuf_mtod(m, char*), len);
auto pkt_len = rte_pktmbuf_pkt_len(m);
char* buf = (char*)malloc(pkt_len);
if (buf) {
// Copy the contents of the packet into the buffer we've just allocated
size_t offset = 0;
for (rte_mbuf* m1 = m; m1 != nullptr; m1 = m1->next) {
char* data = rte_pktmbuf_mtod(m1, char*);
auto len = rte_pktmbuf_data_len(m1);
rte_memcpy(buf + offset, data, len);
offset += len;
}
rte_pktmbuf_free(m);
fragment f{buf, len};
return packet(f, make_free_deleter(buf));
return packet(fragment{buf, pkt_len}, make_free_deleter(buf));
}
// Drop if allocation failed
rte_pktmbuf_free(m);
return std::experimental::nullopt;
}
template<>
inline std::experimental::optional<packet>
dpdk_qp<false>::from_mbuf(rte_mbuf* m)
{
if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
//
// Try to allocate a buffer for packet's data. If we fail - give the
// application an mbuf itself. If we succeed - copy the data into this
// buffer, create a packet based on this buffer and return the mbuf to
// its pool.
//
auto len = rte_pktmbuf_data_len(m);
char* buf = (char*)malloc(len);
if (!buf) {
// Drop if allocation failed
rte_pktmbuf_free(m);
return std::experimental::nullopt;
} else {
rte_memcpy(buf, rte_pktmbuf_mtod(m, char*), len);
rte_pktmbuf_free(m);
return packet(fragment{buf, len}, make_free_deleter(buf));
}
} else {
return from_mbuf_lro(m);
}
}
template<>
inline packet dpdk_qp<true>::from_mbuf(rte_mbuf* m)
inline std::experimental::optional<packet>
dpdk_qp<true>::from_mbuf_lro(rte_mbuf* m)
{
char* data = rte_pktmbuf_mtod(m, char*);
_frags.clear();
_bufs.clear();
fragment f{data, rte_pktmbuf_data_len(m)};
packet p(f, make_free_deleter(data));
for (; m != nullptr; m = m->next) {
char* data = rte_pktmbuf_mtod(m, char*);
_frags.emplace_back(fragment{data, rte_pktmbuf_data_len(m)});
_bufs.push_back(data);
}
return packet(_frags.begin(), _frags.end(),
make_deleter(deleter(),
[bufs_vec = std::move(_bufs)] {
for (auto&& b : bufs_vec) {
free(b);
}
}));
}
template<>
inline std::experimental::optional<packet> dpdk_qp<true>::from_mbuf(rte_mbuf* m)
{
_rx_free_pkts.push_back(m);
_num_rx_free_segs += rte_mbuf_nb_segs(m);
return p;
if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
char* data = rte_pktmbuf_mtod(m, char*);
return packet(fragment{data, rte_pktmbuf_data_len(m)},
make_free_deleter(data));
} else {
return from_mbuf_lro(m);
}
}
template <bool HugetlbfsMemBackend>
inline bool dpdk_qp<HugetlbfsMemBackend>::refill_one_cluster(rte_mbuf* head)
{
while (head != NULL) {
struct rte_mbuf *m_next = head->next;
for (; head != nullptr; head = head->next) {
if (!refill_rx_mbuf(head)) {
//
// If we failed to allocate a new buffer - push the rest of the
@@ -1463,7 +1814,6 @@ inline bool dpdk_qp<HugetlbfsMemBackend>::refill_one_cluster(rte_mbuf* head)
return false;
}
_rx_free_bufs.push_back(head);
head = m_next;
}
return true;
@@ -1512,18 +1862,22 @@ template <bool HugetlbfsMemBackend>
void dpdk_qp<HugetlbfsMemBackend>::process_packets(
struct rte_mbuf **bufs, uint16_t count)
{
update_rx_count(count);
uint64_t nr_frags = 0, bytes = 0;
for (uint16_t i = 0; i < count; i++) {
struct rte_mbuf *m = bufs[i];
offload_info oi;
// TODO: Remove this when implement LRO support
if (!rte_pktmbuf_is_contiguous(m)) {
rte_exit(EXIT_FAILURE,
"DPDK-Rx: Have got a fragmented buffer - not supported\n");
std::experimental::optional<packet> p = from_mbuf(m);
// Drop the packet if translation above has failed
if (!p) {
_stats.rx.bad.inc_no_mem();
continue;
}
packet p = from_mbuf(m);
nr_frags += rte_mbuf_nb_segs(m);
bytes += rte_mbuf_pkt_len(m);
// Set stipped VLAN value if available
if ((_dev->_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) &&
@@ -1535,6 +1889,7 @@ void dpdk_qp<HugetlbfsMemBackend>::process_packets(
if (_dev->hw_features().rx_csum_offload) {
if (m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
// Packet with bad checksum, just drop it.
_stats.rx.bad.inc_csum_err();
continue;
}
// Note that when _hw_features.rx_csum_offload is on, the receive
@@ -1542,12 +1897,20 @@ void dpdk_qp<HugetlbfsMemBackend>::process_packets(
// the checksum again, because we did this here.
}
p.set_offload_info(oi);
(*p).set_offload_info(oi);
if (m->ol_flags & PKT_RX_RSS_HASH) {
p.set_rss_hash(rte_mbuf_rss_hash(m));
(*p).set_rss_hash(rte_mbuf_rss_hash(m));
}
_dev->l2receive(std::move(p));
_dev->l2receive(std::move(*p));
}
_stats.rx.good.update_pkts_bunch(count);
_stats.rx.good.update_frags_stats(nr_frags, bytes);
if (!HugetlbfsMemBackend) {
_stats.rx.good.copy_frags = _stats.rx.good.nr_frags;
_stats.rx.good.copy_bytes = _stats.rx.good.bytes;
}
}
@@ -1613,9 +1976,11 @@ std::unique_ptr<qp> dpdk_device::init_local_queue(boost::program_options::variab
std::unique_ptr<qp> qp;
if (opts.count("hugepages")) {
qp = std::make_unique<dpdk_qp<true>>(this, qid);
qp = std::make_unique<dpdk_qp<true>>(this, qid,
_stats_plugin_name + "-" + _stats_plugin_inst);
} else {
qp = std::make_unique<dpdk_qp<false>>(this, qid);
qp = std::make_unique<dpdk_qp<false>>(this, qid,
_stats_plugin_name + "-" + _stats_plugin_inst);
}
smp::submit_to(_home_cpu, [this] () mutable {
@@ -1631,7 +1996,9 @@ std::unique_ptr<qp> dpdk_device::init_local_queue(boost::program_options::variab
std::unique_ptr<net::device> create_dpdk_net_device(
uint8_t port_idx,
uint8_t num_queues)
uint8_t num_queues,
bool use_lro,
bool enable_fc)
{
static bool called = false;
@@ -1647,7 +2014,8 @@ std::unique_ptr<net::device> create_dpdk_net_device(
printf("ports number: %d\n", rte_eth_dev_count());
}
return std::make_unique<dpdk::dpdk_device>(port_idx, num_queues);
return std::make_unique<dpdk::dpdk_device>(port_idx, num_queues, use_lro,
enable_fc);
}
boost::program_options::options_description
@@ -1655,6 +2023,11 @@ get_dpdk_net_options_description()
{
boost::program_options::options_description opts(
"DPDK net options");
opts.add_options()
("hw-fc",
boost::program_options::value<std::string>()->default_value("on"),
"Enable HW Flow Control (on / off)");
#if 0
opts.add_options()
("csum-offload",

View File

@@ -30,7 +30,9 @@
std::unique_ptr<net::device> create_dpdk_net_device(
uint8_t port_idx = 0,
uint8_t num_queues = 1);
uint8_t num_queues = 1,
bool use_lro = true,
bool enable_fc = true);
boost::program_options::options_description get_dpdk_net_options_description();

View File

@@ -85,7 +85,9 @@ void create_native_net_device(boost::program_options::variables_map opts) {
if (opts.count("dpdk-pmd")) {
// Hardcoded port index 0.
// TODO: Inherit it from the opts
dev = create_dpdk_net_device(0, smp::count);
dev = create_dpdk_net_device(0, smp::count,
!(opts.count("lro") && opts["lro"].as<std::string>() == "off"),
!(opts.count("hw-fc") && opts["hw-fc"].as<std::string>() == "off"));
} else
#endif
dev = create_virtio_net_device(opts);
@@ -325,6 +327,9 @@ boost::program_options::options_description nns_options() {
#ifdef HAVE_DPDK
("dpdk-pmd", "Use DPDK PMD drivers")
#endif
("lro",
boost::program_options::value<std::string>()->default_value("on"),
"Enable LRO")
;
add_native_net_options_description(opts);

View File

@@ -62,47 +62,136 @@ bool qp::poll_tx() {
}
}
} while (work && _tx_packetq.size() < 128);
}
if (!_tx_packetq.empty()) {
_last_tx_bunch = send(_tx_packetq);
_packets_snt += _last_tx_bunch;
_stats.tx.good.update_pkts_bunch(send(_tx_packetq));
return true;
}
return false;
}
qp::qp()
qp::qp(bool register_copy_stats,
const std::string stats_plugin_name, uint8_t qid)
: _tx_poller([this] { return poll_tx(); })
, _stats_plugin_name(stats_plugin_name)
, _queue_name(std::string("queue") + std::to_string(qid))
, _collectd_regs({
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last tx bunch.
scollectd::add_polled_metric(scollectd::type_instance_id("network"
//
// Packets rate: DERIVE:0:u
//
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "if_packets", _queue_name)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.packets)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.good.packets)
),
//
// Bytes rate: DERIVE:0:U
//
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "if_octets", _queue_name)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.bytes)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.good.bytes)
),
//
// Queue length: GAUGE:0:U
//
// Tx
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "queue_length", "tx-packet-queue")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_tx_bunch)
, scollectd::make_typed(scollectd::data_type::GAUGE
, std::bind(&decltype(_tx_packetq)::size, &_tx_packetq))
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("network"
//
// Number of packets in last bunch: GAUGE:0:U
//
// Tx
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "total_operations", "tx-packets")
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_snt)
, "requests", "tx-packet-queue-last-bunch")
, scollectd::make_typed(scollectd::data_type::GAUGE
, _stats.tx.good.last_bunch)
),
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last rx bunch.
scollectd::add_polled_metric(scollectd::type_instance_id("network"
// Rx
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "queue_length", "rx-packet-queue")
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_rx_bunch)
, "requests", "rx-packet-queue-last-bunch")
, scollectd::make_typed(scollectd::data_type::GAUGE
, _stats.rx.good.last_bunch)
),
// total_operations value:DERIVE:0:U
scollectd::add_polled_metric(scollectd::type_instance_id("network"
//
// Fragments rate: DERIVE:0:U
//
// Tx
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "total_operations", "rx-packets")
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_rcv)
, "total_operations", "tx-frags")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.good.nr_frags)
),
}) {
// Rx
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "total_operations", "rx-frags")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.nr_frags)
),
})
{
if (register_copy_stats) {
//
// Non-zero-copy data bytes rate: DERIVE:0:u
//
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "if_octets", _queue_name + " Copy Bytes")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.copy_bytes)
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.good.copy_bytes)
));
//
// Non-zero-copy data fragments rate: DERIVE:0:u
//
// Tx
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "total_operations", "tx-frags-copy")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.tx.good.copy_frags)
));
// Rx
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id(
_stats_plugin_name
, scollectd::per_cpu_plugin_instance
, "total_operations", "rx-frags-copy")
, scollectd::make_typed(scollectd::data_type::DERIVE
, _stats.rx.good.copy_frags)
));
}
}
qp::~qp() {

View File

@@ -71,6 +71,8 @@ struct hw_features {
bool tx_csum_l4_offload = false;
// Enable rx checksum offload
bool rx_csum_offload = false;
// LRO is enabled
bool rx_lro = false;
// Enable tx TCP segment offload
bool tx_tso = false;
// Enable tx UDP fragmentation offload
@@ -131,6 +133,81 @@ public:
friend class l3_protocol;
};
struct qp_stats_good {
/**
* Update the packets bunch related statistics.
*
* Update the last packets bunch size and the total packets counter.
*
* @param count Number of packets in the last packets bunch.
*/
void update_pkts_bunch(uint64_t count) {
last_bunch = count;
packets += count;
}
/**
* Increment the appropriate counters when a few fragments have been
* processed in a copy-way.
*
* @param nr_frags Number of copied fragments
* @param bytes Number of copied bytes
*/
void update_copy_stats(uint64_t nr_frags, uint64_t bytes) {
copy_frags += nr_frags;
copy_bytes += bytes;
}
/**
* Increment total fragments and bytes statistics
*
* @param nfrags Number of processed fragments
* @param nbytes Number of bytes in the processed fragments
*/
void update_frags_stats(uint64_t nfrags, uint64_t nbytes) {
nr_frags += nfrags;
bytes += nbytes;
}
uint64_t bytes; // total number of bytes
uint64_t nr_frags; // total number of fragments
uint64_t copy_frags; // fragments that were copied on L2 level
uint64_t copy_bytes; // bytes that were copied on L2 level
uint64_t packets; // total number of packets
uint64_t last_bunch; // number of packets in the last sent/received bunch
};
struct qp_stats {
qp_stats() {
std::memset(&rx, 0, sizeof(rx));
std::memset(&tx, 0, sizeof(tx));
}
struct {
struct qp_stats_good good;
struct {
void inc_csum_err() {
++csum;
++total;
}
void inc_no_mem() {
++no_mem;
++total;
}
uint64_t no_mem; // Packets dropped due to allocation failure
uint64_t total; // total number of erroneous packets
uint64_t csum; // packets with bad checksum
} bad;
} rx;
struct {
struct qp_stats_good good;
} tx;
};
class qp {
using packet_provider_type = std::function<std::experimental::optional<packet> ()>;
std::vector<packet_provider_type> _pkt_providers;
@@ -139,18 +216,17 @@ class qp {
stream<packet> _rx_stream;
reactor::poller _tx_poller;
circular_buffer<packet> _tx_packetq;
uint64_t _packets_snt = 0;
uint64_t _packets_rcv = 0;
uint64_t _last_tx_bunch = 0;
uint64_t _last_rx_bunch = 0;
std::vector<scollectd::registration> _collectd_regs;
protected:
void update_rx_count(uint64_t count) {
_last_rx_bunch = count;
_packets_rcv += count;
}
const std::string _stats_plugin_name;
const std::string _queue_name;
std::vector<scollectd::registration> _collectd_regs;
qp_stats _stats;
public:
qp();
qp(bool register_copy_stats = false,
const std::string stats_plugin_name = std::string("network"),
uint8_t qid = 0);
virtual ~qp();
virtual future<> send(packet p) = 0;
virtual uint32_t send(circular_buffer<packet>& p) {

View File

@@ -39,6 +39,7 @@
#include <experimental/optional>
#include <random>
#include <stdexcept>
#include <system_error>
#define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
#include <cryptopp/md5.h>
@@ -49,24 +50,20 @@ namespace net {
class tcp_hdr;
class tcp_error : public std::runtime_error {
public:
tcp_error(const std::string& msg) : std::runtime_error(msg) {}
inline auto tcp_error(int err) {
return std::system_error(err, std::system_category());
}
inline auto tcp_reset_error() {
return tcp_error(ECONNRESET);
};
class tcp_reset_error : public tcp_error {
public:
tcp_reset_error() : tcp_error("connection is reset") {}
};
inline auto tcp_connect_error() {
return tcp_error(ECONNABORTED);
}
class tcp_connect_error : public tcp_error {
public:
tcp_connect_error() : tcp_error("fail to connect") {}
};
class tcp_refused_error : public tcp_error {
public:
tcp_refused_error() : tcp_error("connection refused") {}
inline auto tcp_refused_error() {
return tcp_error(ECONNREFUSED);
};
enum class tcp_state : uint16_t {
@@ -1481,9 +1478,7 @@ void tcp<InetTraits>::tcb::output_one(bool data_retransmit) {
// segment length set to 0. All the rest is the same as for a TCP Tx
// CSUM offload case.
//
if (_tcp.hw_features().tx_tso &&
p.len() > _snd.mss + sizeof(eth_hdr) + oi.ip_hdr_len +
oi.tcp_hdr_len) {
if (_tcp.hw_features().tx_tso && len > _snd.mss) {
oi.tso_seg_size = _snd.mss;
} else {
pseudo_hdr_seg_len = sizeof(*th) + options_size + len;

View File

@@ -87,11 +87,18 @@ private:
}
if (!(_opts.count("tso") && _opts["tso"].as<std::string>() == "off")) {
seastar_supported_features |= VIRTIO_NET_F_HOST_TSO4;
seastar_supported_features |= VIRTIO_NET_F_GUEST_TSO4;
_hw_features.tx_tso = true;
} else {
_hw_features.tx_tso = false;
}
if (!(_opts.count("lro") && _opts["lro"].as<std::string>() == "off")) {
seastar_supported_features |= VIRTIO_NET_F_GUEST_TSO4;
_hw_features.rx_lro = true;
} else {
_hw_features.rx_lro = false;
}
if (!(_opts.count("ufo") && _opts["ufo"].as<std::string>() == "off")) {
seastar_supported_features |= VIRTIO_NET_F_HOST_UFO;
seastar_supported_features |= VIRTIO_NET_F_GUEST_UFO;
@@ -546,7 +553,7 @@ protected:
_ring.wake_notifier_wait();
}
void update_rx_count(uint64_t c) {
_dev.update_rx_count(c);
_dev._stats.rx.good.update_pkts_bunch(c);
}
private:
future<> prepare_buffers();
@@ -581,11 +588,17 @@ qp::txq::txq(qp& dev, ring_config config)
uint32_t
qp::txq::post(circular_buffer<packet>& pb) {
uint64_t bytes = 0, nr_frags = 0;
_packets.clear();
while (!pb.empty() && pb.front().nr_frags() + 1 <= _ring.available_descriptors().current()) {
net_hdr_mrg vhdr = {};
auto p = std::move(pb.front());
bytes += p.len();
nr_frags += p.nr_frags();
pb.pop_front();
// Handle TCP checksum offload
auto oi = p.offload_info();
@@ -632,6 +645,9 @@ qp::txq::post(circular_buffer<packet>& pb) {
_packets.emplace_back(packet_as_buffer_chain{ std::move(q) });
}
_ring.post(_packets.begin(), _packets.end());
_dev._stats.tx.good.update_frags_stats(nr_frags, bytes);
return _packets.size();
}
@@ -713,7 +729,12 @@ qp::rxq::complete_buffer(single_buffer&& bc, size_t len) {
del = make_object_deleter(std::move(_buffers));
}
packet p(_fragments.begin(), _fragments.end(), std::move(del));
_dev._stats.rx.good.update_frags_stats(p.nr_frags(), p.len());
_dev._dev->l2receive(std::move(p));
_ring.available_descriptors().signal(_fragments.size());
}
}

View File

@@ -119,6 +119,7 @@ public:
~xenfront_qp();
virtual void rx_start() override;
virtual future<> send(packet p) override;
void inc_rx_error_count() { ++_stats.rx.bad.total; }
};
std::unordered_map<std::string, std::string>
@@ -149,6 +150,10 @@ xenfront_qp::send(packet _p) {
//
// In-kernel should be fine
if (_p.nr_frags() > 1) {
_stats.tx.good.update_copy_stats(_p.nr_frags(), _p.len());
}
// FIXME: negotiate and use scatter/gather
_p.linearize();
@@ -180,14 +185,15 @@ xenfront_qp::send(packet _p) {
_tx_ring.req_prod_pvt = idx;
_tx_ring._sring->req_prod = req_prod + 1;
_tx_ring._sring->req_event++;
if ((frag + 1) == p.nr_frags()) {
_tx_evtchn.notify();
return make_ready_future<>();
} else {
return make_ready_future<>();
}
_stats.tx.good.update_frags_stats(1, f.size);
return make_ready_future<>();
});
// FIXME: Don't forget to clear all grant refs when frontend closes. Or is it automatic?
@@ -212,7 +218,7 @@ unsigned front_ring<T>::entries::get_index() {
}
template <typename T>
future<> front_ring<T>::process_ring(std::function<bool (gntref &entry, T& el)> func, grant_head *refs)
void front_ring<T>::process_ring(std::function<bool (gntref &entry, T& el)> func, grant_head *refs)
{
auto prod = _sring->rsp_prod;
rmb();
@@ -222,6 +228,7 @@ future<> front_ring<T>::process_ring(std::function<bool (gntref &entry, T& el)>
if (el.rsp.status < 0) {
dump("Packet error", el.rsp);
_dev.inc_rx_error_count();
continue;
}
@@ -240,20 +247,32 @@ future<> front_ring<T>::process_ring(std::function<bool (gntref &entry, T& el)>
}
rsp_cons = prod;
_sring->rsp_event = prod + 1;
return make_ready_future<>();
}
future<> xenfront_qp::queue_rx_packet()
{
uint64_t bunch;
return _rx_ring.process_ring([this, &bunch] (gntref &entry, rx &rx) mutable {
uint64_t bunch = 0;
uint64_t bytes = 0;
_rx_ring.process_ring([this, &bunch, &bytes] (gntref &entry, rx &rx) mutable {
packet p(static_cast<char *>(entry.page) + rx.rsp.offset, rx.rsp.status);
_dev->l2receive(std::move(p));
bytes += rx.rsp.status;
bunch++;
return true;
}, _rx_refs);
update_rx_count(bunch);
_stats.rx.good.update_pkts_bunch(bunch);
//
// Our XEN implementation only supports packets with a single fragment
// at the moment.
//
_stats.rx.good.update_frags_stats(bunch, bytes);
return make_ready_future<>();
}
void xenfront_qp::alloc_one_rx_reference(unsigned index) {
@@ -283,7 +302,7 @@ future<> xenfront_qp::alloc_rx_references() {
future<> xenfront_qp::handle_tx_completions() {
return _tx_ring.process_ring([this] (gntref &entry, tx &tx) {
_tx_ring.process_ring([this] (gntref &entry, tx &tx) {
if (tx.rsp.status == 1) {
return false;
}
@@ -295,6 +314,8 @@ future<> xenfront_qp::handle_tx_completions() {
return true;
}, _tx_refs);
return make_ready_future<>();
}
port xenfront_qp::bind_tx_evtchn(bool split) {
@@ -310,13 +331,13 @@ port xenfront_qp::bind_rx_evtchn(bool split) {
}
xenfront_qp::xenfront_qp(xenfront_device* dev, boost::program_options::variables_map opts)
: _dev(dev)
: qp(true), _dev(dev)
, _otherend(_dev->_xenstore->read<int>(path("backend-id")))
, _backend(_dev->_xenstore->read(path("backend")))
, _gntalloc(gntalloc::instance(_dev->_userspace, _otherend))
, _evtchn(evtchn::instance(_dev->_userspace, _otherend))
, _tx_ring(_gntalloc->alloc_ref())
, _rx_ring(_gntalloc->alloc_ref())
, _tx_ring(_gntalloc->alloc_ref(), *this)
, _rx_ring(_gntalloc->alloc_ref(), *this)
, _tx_refs(_gntalloc->alloc_ref(front_ring<tx>::nr_ents))
, _rx_refs(_gntalloc->alloc_ref(front_ring<rx>::nr_ents)) {
@@ -360,6 +381,15 @@ xenfront_qp::xenfront_qp(xenfront_device* dev, boost::program_options::variables
_dev->_xenstore->write<int>(path("state"), 4, t);
}
// Register Rx error statistics
_collectd_regs.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("network"
, scollectd::per_cpu_plugin_instance
, "requests", "rx-errors")
, scollectd::make_typed(scollectd::data_type::GAUGE
, _stats.rx.bad.total)
));
keep_doing([this] {
return alloc_rx_references();
});

View File

@@ -90,6 +90,7 @@ public:
};
using phys = uint64_t;
class xenfront_qp;
template <typename T>
class front_ring {
@@ -118,14 +119,14 @@ public:
uint32_t rsp_cons = 0;
int32_t ref = -1;
front_ring(gntref r)
front_ring(gntref r, xenfront_qp& dev)
: ref(r.xen_id), entries(this)
, _sring(new (r.page) sring<T>()) {
, _sring(new (r.page) sring<T>()), _dev(dev) {
}
entries entries;
future<> process_ring(std::function<bool (gntref &entry, T& el)> func, grant_head *refs);
void process_ring(std::function<bool (gntref &entry, T& el)> func, grant_head *refs);
void dump() {
_sring->dump();
@@ -145,6 +146,8 @@ public:
sring<T> *_sring;
T& operator[](std::size_t i) { return _sring->_ring[idx(i)]; }
private:
xenfront_qp& _dev;
};
}

View File

@@ -85,6 +85,10 @@ BOOST_AUTO_TEST_CASE(test_decode_url) {
sstring url = http_server::connection::set_query_param(req);
BOOST_REQUIRE_EQUAL(url, "/a");
BOOST_REQUIRE_EQUAL(req.get_query_param("q"), "#$#");
req._url = "/a?a=%23%24%23&b=%22%26%22";
http_server::connection::set_query_param(req);
BOOST_REQUIRE_EQUAL(req.get_query_param("a"), "#$#");
BOOST_REQUIRE_EQUAL(req.get_query_param("b"), "\"&\"");
}
BOOST_AUTO_TEST_CASE(test_routes) {