diff --git a/net/net.cc b/net/net.cc index ff6f31b5e1..3de9e7d7da 100644 --- a/net/net.cc +++ b/net/net.cc @@ -11,6 +11,120 @@ using std::move; namespace net { +inline +bool qp::poll_tx() { + if (_tx_packetq.size() < 16) { + // refill send queue from upper layers + uint32_t work; + do { + work = 0; + for (auto&& pr : _pkt_providers) { + auto p = pr(); + if (p) { + work++; + _tx_packetq.push_back(std::move(p.value())); + if (_tx_packetq.size() == 128) { + break; + } + } + } + } while (work && _tx_packetq.size() < 128); + + } + if (!_tx_packetq.empty()) { + _last_tx_bunch = send(_tx_packetq); + _packets_snt += _last_tx_bunch; + return true; + } + + return false; +} + +qp::qp() + : _tx_poller([this] { return poll_tx(); }) + , _collectd_regs({ + // queue_length value:GAUGE:0:U + // Absolute value of num packets in last tx bunch. + scollectd::add_polled_metric(scollectd::type_instance_id("network" + , scollectd::per_cpu_plugin_instance + , "queue_length", "tx-packet-queue") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_tx_bunch) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("network" + , scollectd::per_cpu_plugin_instance + , "total_operations", "tx-packets") + , scollectd::make_typed(scollectd::data_type::DERIVE, _packets_snt) + ), + // queue_length value:GAUGE:0:U + // Absolute value of num packets in last rx bunch. + scollectd::add_polled_metric(scollectd::type_instance_id("network" + , scollectd::per_cpu_plugin_instance + , "queue_length", "rx-packet-queue") + , scollectd::make_typed(scollectd::data_type::GAUGE, _last_rx_bunch) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("network" + , scollectd::per_cpu_plugin_instance + , "total_operations", "rx-packets") + , scollectd::make_typed(scollectd::data_type::DERIVE, _packets_rcv) + ), + }) { +} + +qp::~qp() { +} + +void qp::configure_proxies(const std::map& cpu_weights) { + assert(!cpu_weights.empty()); + if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) { + // special case queue sending to self only, to avoid requiring a hash value + return; + } + register_packet_provider([this] { + std::experimental::optional p; + if (!_proxy_packetq.empty()) { + p = std::move(_proxy_packetq.front()); + _proxy_packetq.pop_front(); + } + return p; + }); + build_sw_reta(cpu_weights); +} + +void qp::build_sw_reta(const std::map& cpu_weights) { + float total_weight = 0; + for (auto&& x : cpu_weights) { + total_weight += x.second; + } + float accum = 0; + unsigned idx = 0; + std::array reta; + for (auto&& entry : cpu_weights) { + auto cpu = entry.first; + auto weight = entry.second; + accum += weight; + while (idx < (accum / total_weight * reta.size() - 0.5)) { + reta[idx++] = cpu; + } + } + _sw_reta = reta; +} + +subscription +device::receive(std::function (packet)> next_packet) { + auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet)); + _queues[engine.cpu_id()]->rx_start(); + return std::move(sub); +} + +void device::set_local_queue(std::unique_ptr dev) { + assert(!_queues[engine.cpu_id()]); + _queues[engine.cpu_id()] = dev.get(); + engine.at_destroy([dev = std::move(dev)] {}); +} + + l3_protocol::l3_protocol(interface* netif, eth_protocol_num proto_num, packet_provider_type func) : _netif(netif), _proto_num(proto_num) { _netif->register_packet_provider(std::move(func)); diff --git a/net/net.hh b/net/net.hh index e9b57d6c78..401c166361 100644 --- a/net/net.hh +++ b/net/net.hh @@ -133,35 +133,8 @@ protected: _packets_rcv += count; } public: - qp() : _tx_poller([this] { return poll_tx(); }), _collectd_regs({ - // queue_length value:GAUGE:0:U - // Absolute value of num packets in last tx bunch. - scollectd::add_polled_metric(scollectd::type_instance_id("network" - , scollectd::per_cpu_plugin_instance - , "queue_length", "tx-packet-queue") - , scollectd::make_typed(scollectd::data_type::GAUGE, _last_tx_bunch) - ), - // total_operations value:DERIVE:0:U - scollectd::add_polled_metric(scollectd::type_instance_id("network" - , scollectd::per_cpu_plugin_instance - , "total_operations", "tx-packets") - , scollectd::make_typed(scollectd::data_type::DERIVE, _packets_snt) - ), - // queue_length value:GAUGE:0:U - // Absolute value of num packets in last rx bunch. - scollectd::add_polled_metric(scollectd::type_instance_id("network" - , scollectd::per_cpu_plugin_instance - , "queue_length", "rx-packet-queue") - , scollectd::make_typed(scollectd::data_type::GAUGE, _last_rx_bunch) - ), - // total_operations value:DERIVE:0:U - scollectd::add_polled_metric(scollectd::type_instance_id("network" - , scollectd::per_cpu_plugin_instance - , "total_operations", "rx-packets") - , scollectd::make_typed(scollectd::data_type::DERIVE, _packets_rcv) - ), - }) {} - virtual ~qp() {} + qp(); + virtual ~qp(); virtual future<> send(packet p) = 0; virtual uint32_t send(circular_buffer& p) { uint32_t sent = 0; @@ -173,74 +146,16 @@ public: return sent; } virtual void rx_start() {}; - void configure_proxies(const std::map& cpu_weights) { - assert(!cpu_weights.empty()); - if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) { - // special case queue sending to self only, to avoid requiring a hash value - return; - } - register_packet_provider([this] { - std::experimental::optional p; - if (!_proxy_packetq.empty()) { - p = std::move(_proxy_packetq.front()); - _proxy_packetq.pop_front(); - } - return p; - }); - build_sw_reta(cpu_weights); - } + void configure_proxies(const std::map& cpu_weights); // build REdirection TAble for cpu_weights map: target cpu -> weight - void build_sw_reta(const std::map& cpu_weights) { - float total_weight = 0; - for (auto&& x : cpu_weights) { - total_weight += x.second; - } - float accum = 0; - unsigned idx = 0; - std::array reta; - for (auto&& entry : cpu_weights) { - auto cpu = entry.first; - auto weight = entry.second; - accum += weight; - while (idx < (accum / total_weight * reta.size() - 0.5)) { - reta[idx++] = cpu; - } - } - _sw_reta = reta; - } + void build_sw_reta(const std::map& cpu_weights); void proxy_send(packet p) { _proxy_packetq.push_back(std::move(p)); } void register_packet_provider(packet_provider_type func) { _pkt_providers.push_back(std::move(func)); } - bool poll_tx() { - if (_tx_packetq.size() < 16) { - // refill send queue from upper layers - uint32_t work; - do { - work = 0; - for (auto&& pr : _pkt_providers) { - auto p = pr(); - if (p) { - work++; - _tx_packetq.push_back(std::move(p.value())); - if (_tx_packetq.size() == 128) { - break; - } - } - } - } while (work && _tx_packetq.size() < 128); - - } - if (!_tx_packetq.empty()) { - _last_tx_bunch = send(_tx_packetq); - _packets_snt += _last_tx_bunch; - return true; - } - - return false; - } + bool poll_tx(); friend class device; }; @@ -256,11 +171,7 @@ public: qp& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; } qp& local_queue() { return queue_for_cpu(engine.cpu_id()); } void l2receive(packet p) { _queues[engine.cpu_id()]->_rx_stream.produce(std::move(p)); } - subscription receive(std::function (packet)> next_packet) { - auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet)); - _queues[engine.cpu_id()]->rx_start(); - return std::move(sub); - } + subscription receive(std::function (packet)> next_packet); virtual ethernet_address hw_address() = 0; virtual net::hw_features hw_features() = 0; virtual uint16_t hw_queues_count() { return 1; } @@ -269,11 +180,7 @@ public: virtual unsigned hash2qid(uint32_t hash) { return hash % hw_queues_count(); } - void set_local_queue(std::unique_ptr dev) { - assert(!_queues[engine.cpu_id()]); - _queues[engine.cpu_id()] = dev.get(); - engine.at_destroy([dev = std::move(dev)] {}); - } + void set_local_queue(std::unique_ptr dev); template unsigned forward_dst(unsigned src_cpuid, Func&& hashfn) { auto& qp = queue_for_cpu(src_cpuid);