mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 09:00:35 +00:00
net: move some device and qp methods out-of-line
This commit is contained in:
114
net/net.cc
114
net/net.cc
@@ -11,6 +11,120 @@ using std::move;
|
||||
|
||||
namespace net {
|
||||
|
||||
inline
|
||||
bool qp::poll_tx() {
|
||||
if (_tx_packetq.size() < 16) {
|
||||
// refill send queue from upper layers
|
||||
uint32_t work;
|
||||
do {
|
||||
work = 0;
|
||||
for (auto&& pr : _pkt_providers) {
|
||||
auto p = pr();
|
||||
if (p) {
|
||||
work++;
|
||||
_tx_packetq.push_back(std::move(p.value()));
|
||||
if (_tx_packetq.size() == 128) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (work && _tx_packetq.size() < 128);
|
||||
|
||||
}
|
||||
if (!_tx_packetq.empty()) {
|
||||
_last_tx_bunch = send(_tx_packetq);
|
||||
_packets_snt += _last_tx_bunch;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
qp::qp()
|
||||
: _tx_poller([this] { return poll_tx(); })
|
||||
, _collectd_regs({
|
||||
// queue_length value:GAUGE:0:U
|
||||
// Absolute value of num packets in last tx bunch.
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "tx-packet-queue")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_tx_bunch)
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "tx-packets")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_snt)
|
||||
),
|
||||
// queue_length value:GAUGE:0:U
|
||||
// Absolute value of num packets in last rx bunch.
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "rx-packet-queue")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_rx_bunch)
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "rx-packets")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_rcv)
|
||||
),
|
||||
}) {
|
||||
}
|
||||
|
||||
qp::~qp() {
|
||||
}
|
||||
|
||||
void qp::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
|
||||
assert(!cpu_weights.empty());
|
||||
if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) {
|
||||
// special case queue sending to self only, to avoid requiring a hash value
|
||||
return;
|
||||
}
|
||||
register_packet_provider([this] {
|
||||
std::experimental::optional<packet> p;
|
||||
if (!_proxy_packetq.empty()) {
|
||||
p = std::move(_proxy_packetq.front());
|
||||
_proxy_packetq.pop_front();
|
||||
}
|
||||
return p;
|
||||
});
|
||||
build_sw_reta(cpu_weights);
|
||||
}
|
||||
|
||||
void qp::build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
|
||||
float total_weight = 0;
|
||||
for (auto&& x : cpu_weights) {
|
||||
total_weight += x.second;
|
||||
}
|
||||
float accum = 0;
|
||||
unsigned idx = 0;
|
||||
std::array<uint8_t, 128> reta;
|
||||
for (auto&& entry : cpu_weights) {
|
||||
auto cpu = entry.first;
|
||||
auto weight = entry.second;
|
||||
accum += weight;
|
||||
while (idx < (accum / total_weight * reta.size() - 0.5)) {
|
||||
reta[idx++] = cpu;
|
||||
}
|
||||
}
|
||||
_sw_reta = reta;
|
||||
}
|
||||
|
||||
subscription<packet>
|
||||
device::receive(std::function<future<> (packet)> next_packet) {
|
||||
auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet));
|
||||
_queues[engine.cpu_id()]->rx_start();
|
||||
return std::move(sub);
|
||||
}
|
||||
|
||||
void device::set_local_queue(std::unique_ptr<qp> dev) {
|
||||
assert(!_queues[engine.cpu_id()]);
|
||||
_queues[engine.cpu_id()] = dev.get();
|
||||
engine.at_destroy([dev = std::move(dev)] {});
|
||||
}
|
||||
|
||||
|
||||
l3_protocol::l3_protocol(interface* netif, eth_protocol_num proto_num, packet_provider_type func)
|
||||
: _netif(netif), _proto_num(proto_num) {
|
||||
_netif->register_packet_provider(std::move(func));
|
||||
|
||||
107
net/net.hh
107
net/net.hh
@@ -133,35 +133,8 @@ protected:
|
||||
_packets_rcv += count;
|
||||
}
|
||||
public:
|
||||
qp() : _tx_poller([this] { return poll_tx(); }), _collectd_regs({
|
||||
// queue_length value:GAUGE:0:U
|
||||
// Absolute value of num packets in last tx bunch.
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "tx-packet-queue")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_tx_bunch)
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "tx-packets")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_snt)
|
||||
),
|
||||
// queue_length value:GAUGE:0:U
|
||||
// Absolute value of num packets in last rx bunch.
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "queue_length", "rx-packet-queue")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, _last_rx_bunch)
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("network"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "rx-packets")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _packets_rcv)
|
||||
),
|
||||
}) {}
|
||||
virtual ~qp() {}
|
||||
qp();
|
||||
virtual ~qp();
|
||||
virtual future<> send(packet p) = 0;
|
||||
virtual uint32_t send(circular_buffer<packet>& p) {
|
||||
uint32_t sent = 0;
|
||||
@@ -173,74 +146,16 @@ public:
|
||||
return sent;
|
||||
}
|
||||
virtual void rx_start() {};
|
||||
void configure_proxies(const std::map<unsigned, float>& cpu_weights) {
|
||||
assert(!cpu_weights.empty());
|
||||
if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) {
|
||||
// special case queue sending to self only, to avoid requiring a hash value
|
||||
return;
|
||||
}
|
||||
register_packet_provider([this] {
|
||||
std::experimental::optional<packet> p;
|
||||
if (!_proxy_packetq.empty()) {
|
||||
p = std::move(_proxy_packetq.front());
|
||||
_proxy_packetq.pop_front();
|
||||
}
|
||||
return p;
|
||||
});
|
||||
build_sw_reta(cpu_weights);
|
||||
}
|
||||
void configure_proxies(const std::map<unsigned, float>& cpu_weights);
|
||||
// build REdirection TAble for cpu_weights map: target cpu -> weight
|
||||
void build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
|
||||
float total_weight = 0;
|
||||
for (auto&& x : cpu_weights) {
|
||||
total_weight += x.second;
|
||||
}
|
||||
float accum = 0;
|
||||
unsigned idx = 0;
|
||||
std::array<uint8_t, 128> reta;
|
||||
for (auto&& entry : cpu_weights) {
|
||||
auto cpu = entry.first;
|
||||
auto weight = entry.second;
|
||||
accum += weight;
|
||||
while (idx < (accum / total_weight * reta.size() - 0.5)) {
|
||||
reta[idx++] = cpu;
|
||||
}
|
||||
}
|
||||
_sw_reta = reta;
|
||||
}
|
||||
void build_sw_reta(const std::map<unsigned, float>& cpu_weights);
|
||||
void proxy_send(packet p) {
|
||||
_proxy_packetq.push_back(std::move(p));
|
||||
}
|
||||
void register_packet_provider(packet_provider_type func) {
|
||||
_pkt_providers.push_back(std::move(func));
|
||||
}
|
||||
bool poll_tx() {
|
||||
if (_tx_packetq.size() < 16) {
|
||||
// refill send queue from upper layers
|
||||
uint32_t work;
|
||||
do {
|
||||
work = 0;
|
||||
for (auto&& pr : _pkt_providers) {
|
||||
auto p = pr();
|
||||
if (p) {
|
||||
work++;
|
||||
_tx_packetq.push_back(std::move(p.value()));
|
||||
if (_tx_packetq.size() == 128) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (work && _tx_packetq.size() < 128);
|
||||
|
||||
}
|
||||
if (!_tx_packetq.empty()) {
|
||||
_last_tx_bunch = send(_tx_packetq);
|
||||
_packets_snt += _last_tx_bunch;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
bool poll_tx();
|
||||
friend class device;
|
||||
};
|
||||
|
||||
@@ -256,11 +171,7 @@ public:
|
||||
qp& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; }
|
||||
qp& local_queue() { return queue_for_cpu(engine.cpu_id()); }
|
||||
void l2receive(packet p) { _queues[engine.cpu_id()]->_rx_stream.produce(std::move(p)); }
|
||||
subscription<packet> receive(std::function<future<> (packet)> next_packet) {
|
||||
auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet));
|
||||
_queues[engine.cpu_id()]->rx_start();
|
||||
return std::move(sub);
|
||||
}
|
||||
subscription<packet> receive(std::function<future<> (packet)> next_packet);
|
||||
virtual ethernet_address hw_address() = 0;
|
||||
virtual net::hw_features hw_features() = 0;
|
||||
virtual uint16_t hw_queues_count() { return 1; }
|
||||
@@ -269,11 +180,7 @@ public:
|
||||
virtual unsigned hash2qid(uint32_t hash) {
|
||||
return hash % hw_queues_count();
|
||||
}
|
||||
void set_local_queue(std::unique_ptr<qp> dev) {
|
||||
assert(!_queues[engine.cpu_id()]);
|
||||
_queues[engine.cpu_id()] = dev.get();
|
||||
engine.at_destroy([dev = std::move(dev)] {});
|
||||
}
|
||||
void set_local_queue(std::unique_ptr<qp> dev);
|
||||
template <typename Func>
|
||||
unsigned forward_dst(unsigned src_cpuid, Func&& hashfn) {
|
||||
auto& qp = queue_for_cpu(src_cpuid);
|
||||
|
||||
Reference in New Issue
Block a user