Merge branch 'master' of github.com:cloudius-systems/seastar into db

This commit is contained in:
Avi Kivity
2015-01-13 10:03:49 +02:00
13 changed files with 172 additions and 168 deletions

View File

@@ -46,11 +46,11 @@ public:
void push_front(const T& data);
void push_front(T&& data);
template <typename... A>
void emplace_front(A... args);
void emplace_front(A&&... args);
void push_back(const T& data);
void push_back(T&& data);
template <typename... A>
void emplace_back(A... args);
void emplace_back(A&&... args);
T& front();
T& back();
void pop_front();
@@ -187,7 +187,7 @@ template <typename T, typename Alloc>
template <typename... Args>
inline
void
circular_buffer<T, Alloc>::emplace_front(Args... args) {
circular_buffer<T, Alloc>::emplace_front(Args&&... args) {
maybe_expand();
auto p = &_impl.storage[mask(_impl.begin - 1)];
_impl.construct(p, std::forward<Args>(args)...);
@@ -218,7 +218,7 @@ template <typename T, typename Alloc>
template <typename... Args>
inline
void
circular_buffer<T, Alloc>::emplace_back(Args... args) {
circular_buffer<T, Alloc>::emplace_back(Args&&... args) {
maybe_expand();
auto p = &_impl.storage[mask(_impl.end)];
_impl.construct(p, std::forward<Args>(args)...);

View File

@@ -11,6 +11,7 @@
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <linux/fs.h>
#include <sys/uio.h>
enum class directory_entry_type {
block_device,

View File

@@ -123,25 +123,25 @@ template <typename T>
class lw_shared_ptr {
mutable shared_ptr_impl<T>* _p = nullptr;
private:
lw_shared_ptr(shared_ptr_impl<T>* p) : _p(p) {
lw_shared_ptr(shared_ptr_impl<T>* p) noexcept : _p(p) {
if (_p) {
++_p->_count;
}
}
template <typename... A>
static lw_shared_ptr make(A&&... a) {
static lw_shared_ptr make(A&&... a) noexcept {
return lw_shared_ptr(new typename shared_ptr_impl<T>::ctor(std::forward<A>(a)...));
}
public:
using element_type = T;
lw_shared_ptr() = default;
lw_shared_ptr(const lw_shared_ptr& x) : _p(x._p) {
lw_shared_ptr() noexcept = default;
lw_shared_ptr(const lw_shared_ptr& x) noexcept : _p(x._p) {
if (_p) {
++_p->_count;
}
}
lw_shared_ptr(lw_shared_ptr&& x) : _p(x._p) {
lw_shared_ptr(lw_shared_ptr&& x) noexcept : _p(x._p) {
x._p = nullptr;
}
~lw_shared_ptr() {
@@ -149,31 +149,31 @@ public:
delete _p->to_internal_object();
}
}
lw_shared_ptr& operator=(const lw_shared_ptr& x) {
lw_shared_ptr& operator=(const lw_shared_ptr& x) noexcept {
if (_p != x._p) {
this->~lw_shared_ptr();
new (this) lw_shared_ptr(x);
}
return *this;
}
lw_shared_ptr& operator=(lw_shared_ptr&& x) {
lw_shared_ptr& operator=(lw_shared_ptr&& x) noexcept {
if (_p != x._p) {
this->~lw_shared_ptr();
new (this) lw_shared_ptr(std::move(x));
}
return *this;
}
lw_shared_ptr& operator=(T&& x) {
lw_shared_ptr& operator=(T&& x) noexcept {
this->~lw_shared_ptr();
new (this) lw_shared_ptr(make_lw_shared<T>(std::move(x)));
return *this;
}
T& operator*() const { return *_p->to_value(); }
T* operator->() const { return _p->to_value(); }
T* get() const { return _p->to_value(); }
T& operator*() const noexcept { return *_p->to_value(); }
T* operator->() const noexcept { return _p->to_value(); }
T* get() const noexcept { return _p->to_value(); }
long int use_count() {
long int use_count() noexcept {
if (_p) {
return _p->_count;
} else {
@@ -181,15 +181,15 @@ public:
}
}
operator lw_shared_ptr<const T>() const {
operator lw_shared_ptr<const T>() const noexcept {
return lw_shared_ptr<const T>(_p);
}
explicit operator bool() const {
explicit operator bool() const noexcept {
return _p;
}
bool owned() const {
bool owned() const noexcept {
return _p->_count == 1;
}
@@ -260,37 +260,37 @@ class shared_ptr {
mutable shared_ptr_count_base* _b = nullptr;
mutable T* _p = nullptr;
private:
explicit shared_ptr(shared_ptr_count_for<T>* b) : _b(b), _p(&b->data) {
explicit shared_ptr(shared_ptr_count_for<T>* b) noexcept : _b(b), _p(&b->data) {
++_b->count;
}
shared_ptr(shared_ptr_count_base* b, T* p) : _b(b), _p(p) {
shared_ptr(shared_ptr_count_base* b, T* p) noexcept : _b(b), _p(p) {
// test _p, not _b, since dynamic_pointer_cast<>() can zero p but not b
if (_p) {
++_b->count;
}
}
explicit shared_ptr(enable_shared_from_this<T>* p) : _b(p), _p(static_cast<T*>(p)) {
explicit shared_ptr(enable_shared_from_this<T>* p) noexcept : _b(p), _p(static_cast<T*>(p)) {
if (_b) {
++_b->count;
}
}
public:
shared_ptr() = default;
shared_ptr(const shared_ptr& x)
shared_ptr() noexcept = default;
shared_ptr(const shared_ptr& x) noexcept
: _b(x._b)
, _p(x._p) {
if (_b) {
++_b->count;
}
}
shared_ptr(shared_ptr&& x)
shared_ptr(shared_ptr&& x) noexcept
: _b(x._b)
, _p(x._p) {
x._b = nullptr;
x._p = nullptr;
}
template <typename U, typename = std::enable_if_t<std::is_base_of<T, U>::value>>
shared_ptr(const shared_ptr<U>& x)
shared_ptr(const shared_ptr<U>& x) noexcept
: _b(x._b)
, _p(x._p) {
if (_b) {
@@ -298,7 +298,7 @@ public:
}
}
template <typename U, typename = std::enable_if_t<std::is_base_of<T, U>::value>>
shared_ptr(shared_ptr<U>&& x)
shared_ptr(shared_ptr<U>&& x) noexcept
: _b(x._b)
, _p(x._p) {
x._b = nullptr;
@@ -309,14 +309,14 @@ public:
delete _b;
}
}
shared_ptr& operator=(const shared_ptr& x) {
shared_ptr& operator=(const shared_ptr& x) noexcept {
if (this != &x) {
this->~shared_ptr();
new (this) shared_ptr(x);
}
return *this;
}
shared_ptr& operator=(shared_ptr&& x) {
shared_ptr& operator=(shared_ptr&& x) noexcept {
if (this != &x) {
this->~shared_ptr();
new (this) shared_ptr(std::move(x));
@@ -324,7 +324,7 @@ public:
return *this;
}
template <typename U, typename = std::enable_if_t<std::is_base_of<T, U>::value>>
shared_ptr& operator=(const shared_ptr<U>& x) {
shared_ptr& operator=(const shared_ptr<U>& x) noexcept {
if (this != &x) {
this->~shared_ptr();
new (this) shared_ptr(x);
@@ -332,23 +332,23 @@ public:
return *this;
}
template <typename U, typename = std::enable_if_t<std::is_base_of<T, U>::value>>
shared_ptr& operator=(shared_ptr<U>&& x) {
shared_ptr& operator=(shared_ptr<U>&& x) noexcept {
if (this != &x) {
this->~shared_ptr();
new (this) shared_ptr(std::move(x));
}
return *this;
}
explicit operator bool() const {
explicit operator bool() const noexcept {
return _p;
}
T& operator*() const{
T& operator*() const noexcept {
return *_p;
}
T* operator->() const {
T* operator->() const noexcept {
return _p;
}
T* get() const {
T* get() const noexcept {
return _p;
}

View File

@@ -17,12 +17,12 @@
#include <experimental/string_view>
#include "core/temporary_buffer.hh"
template <typename char_type, typename size_type, size_type max_size>
template <typename char_type, typename Size, Size max_size>
class basic_sstring {
union contents {
struct external_type {
char_type* str;
size_type size;
Size size;
int8_t pad;
} external;
struct internal_type {
@@ -44,6 +44,19 @@ class basic_sstring {
char_type* str() {
return is_internal() ? u.internal.str : u.external.str;
}
public:
using value_type = char_type;
using traits_type = std::char_traits<char_type>;
using allocator_type = std::allocator<char_type>;
using reference = char_type&;
using const_reference = const char_type&;
using pointer = char_type*;
using const_pointer = const char_type*;
using iterator = char_type*;
using const_iterator = const char_type*;
// FIXME: add reverse_iterator and friend
using difference_type = ssize_t; // std::make_signed_t<Size> can be too small
using size_type = Size;
public:
struct initialized_later {};

View File

@@ -101,7 +101,9 @@ private:
void send(l2addr to, packet p);
public:
future<> send_query(const l3addr& paddr);
explicit arp_for(arp& a) : arp_for_protocol(a, L3::arp_protocol_type()) {}
explicit arp_for(arp& a) : arp_for_protocol(a, L3::arp_protocol_type()) {
_table[L3::broadcast_address()] = ethernet::broadcast_address();
}
future<ethernet_address> lookup(const l3addr& addr);
void learn(l2addr l2, l3addr l3);
void run();

View File

@@ -155,14 +155,11 @@ public:
} __attribute__((packed));
struct dhcp_packet_base {
ip_hdr ip;
udp_hdr udp;
dhcp_payload dhp;
template <typename Adjuster>
auto adjust_endianness(Adjuster a) {
return a(ip, udp, dhp);
return a(dhp);
}
} __attribute__((packed));
@@ -255,7 +252,9 @@ public:
impl(ipv4 & stack)
: _stack(stack)
{}
{
_sock = _stack.get_udp().make_channel({0, client_port});
}
future<> process_packet(packet p, dhcp_payload* dhp, size_t opt_off) {
_retry_timer.cancel();
@@ -350,30 +349,7 @@ public:
}
template<typename T>
future<> build_ip_headers_and_send(T && pkt) {
auto size = sizeof(pkt);
auto & ip = pkt.ip;
ip.ihl = sizeof(ip) / 4;
ip.ver = 4;
ip.dscp = 0;
ip.ecn = 0;
ip.len = uint16_t(size);
ip.id = 0;
ip.frag = 0;
ip.ttl = 64;
ip.csum = 0;
ip.ip_proto = uint8_t(ip_protocol_num::udp);
ip.dst_ip = ipv4_address(0xffffffff);
auto & udp = pkt.udp;
udp.src_port = client_port;
udp.dst_port = server_port;
udp.len = uint16_t(size - sizeof(ip));
udp.cksum = 0; // TODO etc.
future<> send(T && pkt) {
pkt.dhp.bootp.xid = _xid;
auto ipf = _stack.netif();
auto mac = ipf->hw_address().mac;
@@ -381,13 +357,8 @@ public:
pkt = hton(pkt);
checksummer csum;
csum.sum(reinterpret_cast<char*>(&ip), sizeof(ip));
ip.csum = csum.get();
_sock.send({0xffffffff, server_port}, packet(reinterpret_cast<char *>(&pkt), sizeof(pkt)));
packet p(reinterpret_cast<char *>(&pkt), sizeof(pkt));
_stack.send_raw(ethernet::broadcast_address(), std::move(p));
return make_ready_future<>();
}
@@ -411,7 +382,7 @@ public:
_xid = xid_dist(e1);
_state = state::DISCOVER;
return build_ip_headers_and_send(d);
return send(d);
}
future<> send_request(const lease & info) {
@@ -430,7 +401,7 @@ public:
log() << "sending request for " << info.ip << std::endl;
_state = state::REQUEST;
return build_ip_headers_and_send(d);
return send(d);
}
private:
@@ -439,6 +410,7 @@ private:
timer<> _timer;
timer<> _retry_timer;
ipv4 & _stack;
udp_channel _sock;
uint32_t _xid = 0;
};

View File

@@ -181,6 +181,7 @@ public:
virtual future<> link_ready() { return _link_ready_promise.get_future(); }
virtual std::unique_ptr<qp> init_local_queue(boost::program_options::variables_map opts, uint16_t qid) override;
virtual unsigned hash2qid(uint32_t hash) override {
assert(_redir_table.size());
return _redir_table[hash & (_redir_table.size() - 1)];
}
uint8_t port_idx() { return _port_idx; }
@@ -301,17 +302,16 @@ int dpdk_device::init_port_start()
// This comes from the ETH_RSS_RETA_NUM_ENTRIES being 128
_rss_table_bits = 7;
#else
// Check that the returned RETA size is sane:
// greater than 0 and is a power of 2.
assert(_dev_info.reta_size &&
(_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
// Set the RSS table to the correct size
_redir_table.resize(_dev_info.reta_size);
_rss_table_bits = std::lround(std::log2(_dev_info.reta_size));
printf("Port %d: RSS table size is %d\n",
_port_idx, _dev_info.reta_size);
if (_dev_info.reta_size) {
// RETA size should be a power of 2
assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
// Set the RSS table to the correct size
_redir_table.resize(_dev_info.reta_size);
_rss_table_bits = std::lround(std::log2(_dev_info.reta_size));
printf("Port %d: RSS table size is %d\n",
_port_idx, _dev_info.reta_size);
}
#endif
}
@@ -709,7 +709,8 @@ void dpdk_device::get_rss_table()
#else
void dpdk_device::get_rss_table()
{
assert(_dev_info.reta_size);
if (_dev_info.reta_size == 0)
return;
int i, reta_conf_size =
std::max(1, _dev_info.reta_size / RTE_RETA_GROUP_SIZE);

View File

@@ -28,6 +28,9 @@ ipv4::ipv4(interface* netif)
: _netif(netif)
, _global_arp(netif)
, _arp(_global_arp)
, _host_address(0)
, _gw_address(0)
, _netmask(0)
, _l3(netif, eth_protocol_num::ipv4, [this] { return get_packet(); })
, _rx_packets(_l3.receive([this] (packet p, ethernet_address ea) {
return handle_received_packet(std::move(p), ea); },
@@ -202,10 +205,10 @@ future<ethernet_address> ipv4::get_l2_dst_address(ipv4_address to) {
return _arp.lookup(dst);
}
void ipv4::send(ipv4_address to, ip_protocol_num proto_num, packet p, l4send_completion complete, std::experimental::optional<ethernet_address> e_dst) {
void ipv4::send(ipv4_address to, ip_protocol_num proto_num, packet p, ethernet_address e_dst) {
auto needs_frag = this->needs_frag(p, proto_num, hw_features());
auto send_pkt = [this, to, proto_num, needs_frag, complete = std::move(complete), e_dst = std::move(e_dst)] (packet& pkt, uint16_t remaining, uint16_t offset) mutable {
auto send_pkt = [this, to, proto_num, needs_frag, e_dst] (packet& pkt, uint16_t remaining, uint16_t offset) mutable {
auto iph = pkt.prepend_header<ip_hdr>();
iph->ihl = sizeof(*iph) / 4;
iph->ver = 4;
@@ -238,14 +241,7 @@ void ipv4::send(ipv4_address to, ip_protocol_num proto_num, packet p, l4send_com
iph->csum = csum.get();
}
auto&& send_complete = remaining ? l4send_completion() : std::move(complete);
if (!e_dst) {
get_l2_dst_address(to).then([this, pkt = std::move(pkt), send_complete = std::move(send_complete)] (ethernet_address e_dst) mutable {
send_raw(e_dst, std::move(pkt), std::move(send_complete));
});
} else {
send_raw(e_dst.value(), std::move(pkt), std::move(send_complete));
}
_packetq.push_back(l3_protocol::l3packet{eth_protocol_num::ipv4, e_dst, std::move(pkt)});
};
if (needs_frag) {
@@ -266,29 +262,27 @@ void ipv4::send(ipv4_address to, ip_protocol_num proto_num, packet p, l4send_com
}
}
void ipv4::send_raw(ethernet_address dst, packet p, l4send_completion complete) {
_packetq.push_back(ipv4packet{l3_protocol::l3packet{eth_protocol_num::ipv4, dst, std::move(p)}, std::move(complete)});
}
std::experimental::optional<l3_protocol::l3packet> ipv4::get_packet() {
for (size_t i = 0; i < _pkt_providers.size(); i++) {
auto l4p = _pkt_providers[_pkt_provider_idx++]();
if (_pkt_provider_idx == _pkt_providers.size()) {
_pkt_provider_idx = 0;
}
if (l4p) {
auto l4pv = std::move(l4p.value());
send(l4pv.to, l4pv.proto_num, std::move(l4pv.p), l4send_completion(), l4pv.e_dst);
break;
// _packetq will be mostly empty here unless it hold remnants of previously
// fragmented packet
if (_packetq.empty()) {
for (size_t i = 0; i < _pkt_providers.size(); i++) {
auto l4p = _pkt_providers[_pkt_provider_idx++]();
if (_pkt_provider_idx == _pkt_providers.size()) {
_pkt_provider_idx = 0;
}
if (l4p) {
auto l4pv = std::move(l4p.value());
send(l4pv.to, l4pv.proto_num, std::move(l4pv.p), l4pv.e_dst);
break;
}
}
}
std::experimental::optional<l3_protocol::l3packet> p;
if (!_packetq.empty()) {
auto ipv4p = std::move(_packetq.front());
p = std::move(_packetq.front());
_packetq.pop_front();
p = std::move(ipv4p.l3packet);
ipv4p.complete();
}
return p;
}
@@ -442,7 +436,12 @@ void icmp::received(packet p, ipaddr from, ipaddr to) {
checksummer csum;
csum.sum(reinterpret_cast<char*>(hdr), p.len());
hdr->csum = csum.get();
_inet.send(to, from, std::move(p));
if (_queue_space.try_wait(p.len())) { // drop packets that do not fit the queue
_inet.get_l2_dst_address(from).then([this, from, p = std::move(p)] (ethernet_address e_dst) mutable {
_packetq.emplace_back(ipv4_traits::l4packet{from, std::move(p), e_dst, ip_protocol_num::icmp});
});
}
}
}

View File

@@ -99,7 +99,6 @@ public:
ipv4& _inet;
public:
ipv4_l4(ipv4& inet) : _inet(inet) {}
void send(ipv4_address from, ipv4_address to, packet p);
void register_packet_provider(ipv4_traits::packet_provider_type func);
future<ethernet_address> get_l2_dst_address(ipv4_address to);
};
@@ -139,21 +138,6 @@ struct l4connid {
}
};
class l4send_completion {
lw_shared_ptr<semaphore> _stream;
size_t _len = 0;
public:
l4send_completion() = default;
l4send_completion(lw_shared_ptr<semaphore> s, size_t l) : _stream(std::move(s)), _len(l) {}
l4send_completion(l4send_completion&) = delete;
l4send_completion(l4send_completion&& v) : _stream(std::move(v._stream)), _len(v._len) {}
void operator()() {
if (_len) {
_stream->signal(_len);
}
}
};
class ipv4_tcp final : public ip_protocol {
ipv4_l4<ip_protocol_num::tcp> _inet_l4;
std::unique_ptr<tcp<ipv4_traits>> _tcp;
@@ -185,10 +169,22 @@ class icmp {
public:
using ipaddr = ipv4_address;
using inet_type = ipv4_l4<ip_protocol_num::icmp>;
explicit icmp(inet_type& inet) : _inet(inet) {}
explicit icmp(inet_type& inet) : _inet(inet) {
_inet.register_packet_provider([this] {
std::experimental::optional<ipv4_traits::l4packet> l4p;
if (!_packetq.empty()) {
l4p = std::move(_packetq.front());
_packetq.pop_front();
_queue_space.signal(l4p.value().p.len());
}
return l4p;
});
}
void received(packet p, ipaddr from, ipaddr to);
private:
inet_type& _inet;
circular_buffer<ipv4_traits::l4packet> _packetq;
semaphore _queue_space = {212992};
};
class ipv4_icmp final : public ip_protocol {
@@ -214,6 +210,7 @@ private:
std::unordered_map<uint16_t, lw_shared_ptr<udp_channel_state>> _channels;
int _queue_size = default_queue_size;
uint16_t _next_anonymous_port = min_anonymous_port;
circular_buffer<std::tuple<ipv4_traits::l4packet, lw_shared_ptr<udp_channel_state>, size_t>> _packetq;
private:
uint16_t next_port(uint16_t port);
public:
@@ -236,7 +233,7 @@ public:
ipv4_udp(ipv4& inet);
udp_channel make_channel(ipv4_addr addr);
virtual void received(packet p, ipv4_address from, ipv4_address to) override;
void send(uint16_t src_port, ipv4_addr dst, packet &&p, l4send_completion completion);
void send(uint16_t src_port, ipv4_addr dst, packet &&p, lw_shared_ptr<udp_channel_state> channel);
bool forward(forward_hash& out_hash_data, packet& p, size_t off) override;
void set_queue_size(int size) { _queue_size = size; }
};
@@ -316,13 +313,7 @@ private:
static constexpr uint32_t _frag_high_thresh{4 * 1024 * 1024};
uint32_t _frag_mem{0};
timer<lowres_clock> _frag_timer;
struct ipv4packet {
l3_protocol::l3packet l3packet;
l4send_completion complete;
ipv4packet(ipv4packet&& v) noexcept : l3packet(std::move(v.l3packet)), complete(std::move(v.complete)) {}
ipv4packet(l3_protocol::l3packet&& p, l4send_completion&& c) : l3packet(std::move(p)), complete(std::move(c)) {}
};
circular_buffer<ipv4packet> _packetq;
circular_buffer<l3_protocol::l3packet> _packetq;
unsigned _pkt_provider_idx = 0;
private:
future<> handle_received_packet(packet p, ethernet_address from);
@@ -356,9 +347,7 @@ public:
// But for now, a simple single raw pointer suffices
void set_packet_filter(ip_packet_filter *);
ip_packet_filter * packet_filter() const;
void send(ipv4_address to, ip_protocol_num proto_num, packet p, l4send_completion complete = l4send_completion(),
std::experimental::optional<ethernet_address> e_dst = std::experimental::optional<ethernet_address>());
void send_raw(ethernet_address, packet, l4send_completion completion = l4send_completion());
void send(ipv4_address to, ip_protocol_num proto_num, packet p, ethernet_address e_dst);
tcp<ipv4_traits>& get_tcp() { return *_tcp._tcp; }
ipv4_udp& get_udp() { return _udp; }
void register_l4(proto_type id, ip_protocol* handler);
@@ -373,12 +362,6 @@ public:
future<ethernet_address> get_l2_dst_address(ipv4_address to);
};
template <ip_protocol_num ProtoNum>
inline
void ipv4_l4<ProtoNum>::send(ipv4_address from, ipv4_address to, packet p) {
_inet.send(/* from, */ to, ProtoNum, std::move(p));
}
template <ip_protocol_num ProtoNum>
inline
void ipv4_l4<ProtoNum>::register_packet_provider(ipv4_traits::packet_provider_type func) {

View File

@@ -165,13 +165,15 @@ add_native_net_options_description(boost::program_options::options_description &
native_network_stack::native_network_stack(boost::program_options::variables_map opts, std::shared_ptr<device> dev)
: _netif(std::move(dev))
, _inet(&_netif) {
_inet.set_host_address(ipv4_address(opts["host-ipv4-addr"].as<std::string>()));
_inet.set_gw_address(ipv4_address(opts["gw-ipv4-addr"].as<std::string>()));
_inet.set_netmask_address(ipv4_address(opts["netmask-ipv4-addr"].as<std::string>()));
_inet.get_udp().set_queue_size(opts["udpv4-queue-size"].as<int>());
_dhcp = opts["host-ipv4-addr"].defaulted()
&& opts["gw-ipv4-addr"].defaulted()
&& opts["netmask-ipv4-addr"].defaulted() && opts["dhcp"].as<bool>();
if (!_dhcp) {
_inet.set_host_address(ipv4_address(_dhcp ? 0 : opts["host-ipv4-addr"].as<std::string>()));
_inet.set_gw_address(ipv4_address(opts["gw-ipv4-addr"].as<std::string>()));
_inet.set_netmask_address(ipv4_address(opts["netmask-ipv4-addr"].as<std::string>()));
}
}
server_socket

View File

@@ -317,6 +317,9 @@ private:
std::default_random_engine _e;
std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535};
circular_buffer<std::pair<lw_shared_ptr<tcb>, ethernet_address>> _poll_tcbs;
// queue for packets that do not belong to any tcb
circular_buffer<ipv4_traits::l4packet> _packetq;
semaphore _queue_space = {212992};
public:
class connection {
lw_shared_ptr<tcb> _tcb;
@@ -390,18 +393,25 @@ private:
template <typename InetTraits>
tcp<InetTraits>::tcp(inet_type& inet) : _inet(inet), _e(_rd()) {
_inet.register_packet_provider([this] {
_inet.register_packet_provider([this, tcb_polled = 0u] () mutable {
std::experimental::optional<typename InetTraits::l4packet> l4p;
auto c = _poll_tcbs.size();
while (c--) {
lw_shared_ptr<tcb> tcb;
ethernet_address dst;
std::tie(tcb, dst) = std::move(_poll_tcbs.front());
_poll_tcbs.pop_front();
l4p = tcb->get_packet();
if (l4p) {
l4p.value().e_dst = dst;
break;
if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
l4p = std::move(_packetq.front());
_packetq.pop_front();
_queue_space.signal(l4p.value().p.len());
} else {
while (c--) {
tcb_polled++;
lw_shared_ptr<tcb> tcb;
ethernet_address dst;
std::tie(tcb, dst) = std::move(_poll_tcbs.front());
_poll_tcbs.pop_front();
l4p = tcb->get_packet();
if (l4p) {
l4p.value().e_dst = dst;
break;
}
}
}
return l4p;
@@ -496,7 +506,11 @@ void tcp<InetTraits>::received(packet p, ipaddr from, ipaddr to) {
template <typename InetTraits>
void tcp<InetTraits>::send(ipaddr from, ipaddr to, packet p) {
_inet.send(from, to, std::move(p));
if (_queue_space.try_wait(p.len())) { // drop packets that do not fit the queue
_inet.get_l2_dst_address(to).then([this, to, p = std::move(p)] (ethernet_address e_dst) mutable {
_packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp});
});
}
}
template <typename InetTraits>

View File

@@ -53,8 +53,6 @@ private:
ipv4_udp::registration _reg;
bool _closed;
lw_shared_ptr<udp_channel_state> _state;
// Limit number of data queued into send queue
lw_shared_ptr<semaphore> _user_queue_space;
public:
native_channel(ipv4_udp &proto, ipv4_udp::registration reg, lw_shared_ptr<udp_channel_state> state)
@@ -63,7 +61,6 @@ public:
, _closed(false)
, _state(state)
{
_user_queue_space = make_lw_shared<semaphore>(212992);
}
virtual future<udp_datagram> receive() override {
@@ -76,8 +73,8 @@ public:
virtual future<> send(ipv4_addr dst, packet p) override {
auto len = p.len();
return _user_queue_space->wait(len).then([this, dst, p = std::move(p), len] () mutable {
_proto.send(_reg.port(), dst, std::move(p), l4send_completion(_user_queue_space, len));
return _state->wait_for_send_buffer(len).then([this, dst, p = std::move(p), len] () mutable {
_proto.send(_reg.port(), dst, std::move(p), _state);
});
}
@@ -100,6 +97,19 @@ const int ipv4_udp::default_queue_size = 1024;
ipv4_udp::ipv4_udp(ipv4& inet)
: _inet(inet)
{
_inet.register_packet_provider([this] {
std::experimental::optional<ipv4_traits::l4packet> l4p;
if (!_packetq.empty()) {
ipv4_traits::l4packet p;
lw_shared_ptr<udp_channel_state> channel;
size_t len;
std::tie(p, channel, len) = std::move(_packetq.front());
_packetq.pop_front();
l4p = std::move(p);
channel->complete_send(len);
}
return l4p;
});
}
bool ipv4_udp::forward(forward_hash& out_hash_data, packet& p, size_t off)
@@ -124,8 +134,9 @@ void ipv4_udp::received(packet p, ipv4_address from, ipv4_address to)
}
}
void ipv4_udp::send(uint16_t src_port, ipv4_addr dst, packet &&p, l4send_completion completion)
void ipv4_udp::send(uint16_t src_port, ipv4_addr dst, packet &&p, lw_shared_ptr<udp_channel_state> channel)
{
size_t len = p.len();
auto src = _inet.host_address();
auto hdr = p.prepend_header<udp_hdr>();
hdr->src_port = src_port;
@@ -148,7 +159,9 @@ void ipv4_udp::send(uint16_t src_port, ipv4_addr dst, packet &&p, l4send_complet
oi.protocol = ip_protocol_num::udp;
p.set_offload_info(oi);
_inet.send(dst, ip_protocol_num::udp, std::move(p), std::move(completion));
_inet.get_l2_dst_address(dst).then([this, dst, p = std::move(p), channel = std::move(channel), len] (ethernet_address e_dst) mutable {
_packetq.emplace_back(std::make_tuple(ipv4_traits::l4packet{dst, std::move(p), e_dst, ip_protocol_num::udp}, std::move(channel), len));
});
}
uint16_t ipv4_udp::next_port(uint16_t port) {

View File

@@ -30,7 +30,11 @@ struct udp_hdr {
struct udp_channel_state {
queue<udp_datagram> _queue;
// Limit number of data queued into send queue
semaphore _user_queue_space = {212992};
udp_channel_state(size_t queue_size) : _queue(queue_size) {}
future<> wait_for_send_buffer(size_t len) { return _user_queue_space.wait(len); }
void complete_send(size_t len) { _user_queue_space.signal(len); }
};
}