mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 17:40:34 +00:00
Merge branch 'asias/net' of github.com:cloudius-systems/seastar-dev
Net queue from Asias.
This commit is contained in:
@@ -30,6 +30,7 @@ libnet = [
|
||||
'net/stack.cc',
|
||||
'net/ip_checksum.cc',
|
||||
'net/udp.cc',
|
||||
'net/posix-stack.cc',
|
||||
]
|
||||
|
||||
core = [
|
||||
|
||||
242
core/reactor.cc
242
core/reactor.cc
@@ -115,61 +115,6 @@ reactor::listen(socket_address sa, listen_options opt) {
|
||||
return _network_stack->listen(sa, opt);
|
||||
}
|
||||
|
||||
class posix_connected_socket_impl final : public connected_socket_impl {
|
||||
pollable_fd _fd;
|
||||
private:
|
||||
explicit posix_connected_socket_impl(pollable_fd fd) : _fd(std::move(fd)) {}
|
||||
public:
|
||||
virtual input_stream<char> input() override { return input_stream<char>(posix_data_source(_fd)); }
|
||||
virtual output_stream<char> output() override { return output_stream<char>(posix_data_sink(_fd), 8192); }
|
||||
friend class posix_server_socket_impl;
|
||||
friend class posix_ap_server_socket_impl;
|
||||
};
|
||||
|
||||
future<connected_socket, socket_address>
|
||||
posix_server_socket_impl::accept() {
|
||||
return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) {
|
||||
static unsigned balance = 0;
|
||||
auto cpu = balance++ % smp::count;
|
||||
|
||||
if (cpu == engine._id) {
|
||||
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
|
||||
return make_ready_future<connected_socket, socket_address>(
|
||||
connected_socket(std::move(csi)), sa);
|
||||
} else {
|
||||
smp::submit_to(cpu, [this, fd = std::move(fd.get_file_desc()), sa] () mutable {
|
||||
posix_ap_server_socket_impl::move_connected_socket(_sa, pollable_fd(std::move(fd)), sa);
|
||||
});
|
||||
return accept();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<connected_socket, socket_address> posix_ap_server_socket_impl::accept() {
|
||||
auto conni = conn_q.find(_sa.as_posix_sockaddr_in());
|
||||
if (conni != conn_q.end()) {
|
||||
connection c = std::move(conni->second);
|
||||
conn_q.erase(conni);
|
||||
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(c.fd)));
|
||||
return make_ready_future<connected_socket, socket_address>(connected_socket(std::move(csi)), std::move(c.addr));
|
||||
} else {
|
||||
auto i = sockets.emplace(std::piecewise_construct, std::make_tuple(_sa.as_posix_sockaddr_in()), std::make_tuple());
|
||||
assert(i.second);
|
||||
return i.first->second.get_future();
|
||||
}
|
||||
}
|
||||
|
||||
void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) {
|
||||
auto i = sockets.find(sa.as_posix_sockaddr_in());
|
||||
if (i != sockets.end()) {
|
||||
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
|
||||
i->second.set_value(connected_socket(std::move(csi)), std::move(addr));
|
||||
sockets.erase(i);
|
||||
} else {
|
||||
conn_q.emplace(std::piecewise_construct, std::make_tuple(sa.as_posix_sockaddr_in()), std::make_tuple(std::move(fd), std::move(addr)));
|
||||
}
|
||||
}
|
||||
|
||||
void reactor::complete_epoll_event(pollable_fd_state& pfd, promise<> pollable_fd_state::*pr,
|
||||
int events, int event) {
|
||||
if (pfd.events_requested & events & event) {
|
||||
@@ -497,191 +442,12 @@ void schedule(std::unique_ptr<task> t) {
|
||||
engine.add_task(std::move(t));
|
||||
}
|
||||
|
||||
data_source posix_data_source(pollable_fd& fd) {
|
||||
return data_source(std::make_unique<posix_data_source_impl>(fd));
|
||||
}
|
||||
|
||||
future<temporary_buffer<char>>
|
||||
posix_data_source_impl::get() {
|
||||
return _fd.read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
|
||||
_buf.trim(size);
|
||||
auto ret = std::move(_buf);
|
||||
_buf = temporary_buffer<char>(_buf_size);
|
||||
return make_ready_future<temporary_buffer<char>>(std::move(ret));
|
||||
});
|
||||
}
|
||||
|
||||
data_sink posix_data_sink(pollable_fd& fd) {
|
||||
return data_sink(std::make_unique<posix_data_sink_impl>(fd));
|
||||
}
|
||||
|
||||
future<>
|
||||
posix_data_sink_impl::put(std::vector<temporary_buffer<char>> data) {
|
||||
std::swap(data, _data);
|
||||
return do_write(0);
|
||||
}
|
||||
|
||||
future<>
|
||||
posix_data_sink_impl::do_write(size_t idx) {
|
||||
// FIXME: use writev
|
||||
return _fd.write_all(_data[idx].get(), _data[idx].size()).then([this, idx] (size_t size) mutable {
|
||||
assert(size == _data[idx].size()); // FIXME: exception? short write?
|
||||
if (++idx == _data.size()) {
|
||||
_data.clear();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_write(idx);
|
||||
});
|
||||
}
|
||||
|
||||
server_socket
|
||||
posix_network_stack::listen(socket_address sa, listen_options opt) {
|
||||
return server_socket(std::make_unique<posix_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
|
||||
}
|
||||
|
||||
thread_local std::unordered_map<::sockaddr_in, promise<connected_socket, socket_address>> posix_ap_server_socket_impl::sockets;
|
||||
thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl::connection> posix_ap_server_socket_impl::conn_q;
|
||||
|
||||
namespace std {
|
||||
bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) {
|
||||
return (a.sin_addr.s_addr == b.sin_addr.s_addr) && (a.sin_port == b.sin_port);
|
||||
}
|
||||
};
|
||||
|
||||
server_socket
|
||||
posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
|
||||
return server_socket(std::make_unique<posix_ap_server_socket_impl>(sa));
|
||||
}
|
||||
|
||||
struct cmsg_with_pktinfo {
|
||||
struct cmsghdrcmh;
|
||||
struct in_pktinfo pktinfo;
|
||||
};
|
||||
|
||||
std::vector<struct iovec> to_iovec(const packet& p) {
|
||||
std::vector<struct iovec> v;
|
||||
v.reserve(p.nr_frags());
|
||||
for (auto&& f : p.fragments()) {
|
||||
v.push_back({.iov_base = f.base, .iov_len = f.size});
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
class posix_udp_channel : public udp_channel_impl {
|
||||
private:
|
||||
static constexpr int MAX_DATAGRAM_SIZE = 65507;
|
||||
struct recv_ctx {
|
||||
struct msghdr _hdr;
|
||||
struct iovec _iov;
|
||||
socket_address _src_addr;
|
||||
char* _buffer;
|
||||
cmsg_with_pktinfo _cmsg;
|
||||
|
||||
recv_ctx() {
|
||||
memset(&_hdr, 0, sizeof(_hdr));
|
||||
_hdr.msg_iov = &_iov;
|
||||
_hdr.msg_iovlen = 1;
|
||||
_hdr.msg_name = &_src_addr.u.sa;
|
||||
_hdr.msg_namelen = sizeof(_src_addr.u.sas);
|
||||
memset(&_cmsg, 0, sizeof(_cmsg));
|
||||
_hdr.msg_control = &_cmsg;
|
||||
_hdr.msg_controllen = sizeof(_cmsg);
|
||||
}
|
||||
|
||||
void prepare() {
|
||||
_buffer = new char[MAX_DATAGRAM_SIZE];
|
||||
_iov.iov_base = _buffer;
|
||||
_iov.iov_len = MAX_DATAGRAM_SIZE;
|
||||
}
|
||||
};
|
||||
struct send_ctx {
|
||||
struct msghdr _hdr;
|
||||
std::vector<struct iovec> _iovecs;
|
||||
socket_address _dst;
|
||||
packet _p;
|
||||
|
||||
send_ctx() {
|
||||
memset(&_hdr, 0, sizeof(_hdr));
|
||||
_hdr.msg_name = &_dst.u.sa;
|
||||
_hdr.msg_namelen = sizeof(_dst.u.sas);
|
||||
}
|
||||
|
||||
void prepare(ipv4_addr dst, packet p) {
|
||||
_dst = make_ipv4_address(dst);
|
||||
_p = std::move(p);
|
||||
_iovecs = std::move(to_iovec(_p));
|
||||
_hdr.msg_iov = _iovecs.data();
|
||||
_hdr.msg_iovlen = _iovecs.size();
|
||||
}
|
||||
};
|
||||
std::unique_ptr<pollable_fd> _fd;
|
||||
ipv4_addr _address;
|
||||
recv_ctx _recv;
|
||||
send_ctx _send;
|
||||
bool _closed;
|
||||
public:
|
||||
posix_udp_channel(ipv4_addr bind_address)
|
||||
: _closed(false) {
|
||||
auto sa = make_ipv4_address(bind_address);
|
||||
file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
|
||||
bool pktinfo_flag = true;
|
||||
::setsockopt(fd.get(), SOL_IP, IP_PKTINFO, &pktinfo_flag, sizeof(pktinfo_flag));
|
||||
fd.bind(sa.u.sa, sizeof(sa.u.sas));
|
||||
_address = ipv4_addr(fd.get_address());
|
||||
_fd = std::make_unique<pollable_fd>(std::move(fd));
|
||||
}
|
||||
virtual ~posix_udp_channel() {};
|
||||
virtual future<udp_datagram> receive() override;
|
||||
virtual future<> send(ipv4_addr dst, const char *msg);
|
||||
virtual future<> send(ipv4_addr dst, packet p);
|
||||
virtual void close() override {
|
||||
_closed = true;
|
||||
_fd.reset();
|
||||
}
|
||||
virtual bool is_closed() const override { return _closed; }
|
||||
};
|
||||
|
||||
future<> posix_udp_channel::send(ipv4_addr dst, const char *message) {
|
||||
auto len = strlen(message);
|
||||
return _fd->sendto(make_ipv4_address(dst), message, len)
|
||||
.then([len] (size_t size) { assert(size == len); });
|
||||
}
|
||||
|
||||
future<> posix_udp_channel::send(ipv4_addr dst, packet p) {
|
||||
auto len = p.len();
|
||||
_send.prepare(dst, std::move(p));
|
||||
return _fd->sendmsg(&_send._hdr)
|
||||
.then([len] (size_t size) { assert(size == len); });
|
||||
}
|
||||
|
||||
udp_channel
|
||||
posix_network_stack::make_udp_channel(ipv4_addr addr) {
|
||||
return udp_channel(std::make_unique<posix_udp_channel>(addr));
|
||||
}
|
||||
|
||||
class posix_datagram : public udp_datagram_impl {
|
||||
private:
|
||||
ipv4_addr _src;
|
||||
ipv4_addr _dst;
|
||||
packet _p;
|
||||
public:
|
||||
posix_datagram(ipv4_addr src, ipv4_addr dst, packet p) : _src(src), _dst(dst), _p(std::move(p)) {}
|
||||
virtual ipv4_addr get_src() override { return _src; }
|
||||
virtual ipv4_addr get_dst() override { return _dst; }
|
||||
virtual uint16_t get_dst_port() override { return _dst.port; }
|
||||
virtual packet& get_data() override { return _p; }
|
||||
};
|
||||
|
||||
future<udp_datagram>
|
||||
posix_udp_channel::receive() {
|
||||
_recv.prepare();
|
||||
return _fd->recvmsg(&_recv._hdr).then([this] (size_t size) {
|
||||
auto dst = ipv4_addr(_recv._cmsg.pktinfo.ipi_addr.s_addr, _address.port);
|
||||
return make_ready_future<udp_datagram>(udp_datagram(std::make_unique<posix_datagram>(
|
||||
_recv._src_addr, dst, packet(fragment{_recv._buffer, size}, [buf = _recv._buffer] { delete[] buf; }))));
|
||||
});
|
||||
}
|
||||
|
||||
void network_stack_registry::register_stack(sstring name,
|
||||
boost::program_options::options_description opts,
|
||||
std::function<std::unique_ptr<network_stack> (options opts)> create, bool make_default) {
|
||||
@@ -729,14 +495,6 @@ reactor::get_options_description() {
|
||||
return opts;
|
||||
}
|
||||
|
||||
network_stack_registrator nsr_posix{"posix",
|
||||
boost::program_options::options_description(),
|
||||
[](boost::program_options::variables_map ops) {
|
||||
return smp::main_thread() ? posix_network_stack::create(ops) : posix_ap_network_stack::create(ops);
|
||||
},
|
||||
true
|
||||
};
|
||||
|
||||
boost::program_options::options_description
|
||||
smp::get_options_description()
|
||||
{
|
||||
|
||||
@@ -164,13 +164,12 @@ public:
|
||||
future<size_t> sendmsg(struct msghdr *msg);
|
||||
future<size_t> recvmsg(struct msghdr *msg);
|
||||
future<size_t> sendto(socket_address addr, const void* buf, size_t len);
|
||||
file_desc& get_file_desc() const { return _s->fd; }
|
||||
protected:
|
||||
int get_fd() const { return _s->fd.get(); }
|
||||
file_desc& get_file_desc() const { return _s->fd; }
|
||||
friend class reactor;
|
||||
friend class readable_eventfd;
|
||||
friend class writeable_eventfd;
|
||||
friend class posix_server_socket_impl; // need to access get_file_desc
|
||||
};
|
||||
|
||||
class connected_socket_impl {
|
||||
@@ -195,13 +194,6 @@ public:
|
||||
virtual future<connected_socket, socket_address> accept() = 0;
|
||||
};
|
||||
|
||||
class posix_server_socket_impl : public server_socket_impl {
|
||||
socket_address _sa;
|
||||
pollable_fd _lfd;
|
||||
public:
|
||||
explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
|
||||
virtual future<connected_socket, socket_address> accept();
|
||||
};
|
||||
|
||||
namespace std {
|
||||
|
||||
@@ -214,21 +206,6 @@ struct hash<::sockaddr_in> {
|
||||
bool operator==(const ::sockaddr_in a, const ::sockaddr_in b);
|
||||
}
|
||||
|
||||
class posix_ap_server_socket_impl : public server_socket_impl {
|
||||
struct connection {
|
||||
pollable_fd fd;
|
||||
socket_address addr;
|
||||
connection(pollable_fd xfd, socket_address xaddr) : fd(std::move(xfd)), addr(xaddr) {}
|
||||
};
|
||||
static thread_local std::unordered_map<::sockaddr_in, promise<connected_socket, socket_address>> sockets;
|
||||
static thread_local std::unordered_multimap<::sockaddr_in, connection> conn_q;
|
||||
socket_address _sa;
|
||||
public:
|
||||
explicit posix_ap_server_socket_impl(socket_address sa) : _sa(sa) {}
|
||||
virtual future<connected_socket, socket_address> accept();
|
||||
static void move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr);
|
||||
};
|
||||
|
||||
class server_socket {
|
||||
std::unique_ptr<server_socket_impl> _ssi;
|
||||
public:
|
||||
@@ -286,25 +263,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class posix_network_stack : public network_stack {
|
||||
public:
|
||||
posix_network_stack(boost::program_options::variables_map opts) {}
|
||||
virtual server_socket listen(socket_address sa, listen_options opts) override;
|
||||
virtual net::udp_channel make_udp_channel(ipv4_addr addr) override;
|
||||
static std::unique_ptr<network_stack> create(boost::program_options::variables_map opts) {
|
||||
return std::unique_ptr<network_stack>(new posix_network_stack(opts));
|
||||
}
|
||||
};
|
||||
|
||||
class posix_ap_network_stack : public posix_network_stack {
|
||||
public:
|
||||
posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)) {}
|
||||
virtual server_socket listen(socket_address sa, listen_options opts) override;
|
||||
static std::unique_ptr<network_stack> create(boost::program_options::variables_map opts) {
|
||||
return std::unique_ptr<network_stack>(new posix_ap_network_stack(opts));
|
||||
}
|
||||
};
|
||||
|
||||
class writeable_eventfd;
|
||||
|
||||
class readable_eventfd {
|
||||
@@ -574,29 +532,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class posix_data_source_impl final : public data_source_impl {
|
||||
pollable_fd& _fd;
|
||||
temporary_buffer<char> _buf;
|
||||
size_t _buf_size;
|
||||
public:
|
||||
explicit posix_data_source_impl(pollable_fd& fd, size_t buf_size = 8192)
|
||||
: _fd(fd), _buf(buf_size), _buf_size(buf_size) {}
|
||||
virtual future<temporary_buffer<char>> get() override;
|
||||
};
|
||||
|
||||
class posix_data_sink_impl : public data_sink_impl {
|
||||
pollable_fd& _fd;
|
||||
std::vector<temporary_buffer<char>> _data;
|
||||
private:
|
||||
future<> do_write(size_t idx);
|
||||
public:
|
||||
explicit posix_data_sink_impl(pollable_fd& fd) : _fd(fd) {}
|
||||
future<> put(std::vector<temporary_buffer<char>> data) override;
|
||||
};
|
||||
|
||||
data_source posix_data_source(pollable_fd& fd);
|
||||
data_sink posix_data_sink(pollable_fd& fd);
|
||||
|
||||
template <typename CharType>
|
||||
class input_stream {
|
||||
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
|
||||
|
||||
@@ -15,7 +15,7 @@ arp_for_protocol::~arp_for_protocol() {
|
||||
_arp.del(_proto_num);
|
||||
}
|
||||
|
||||
arp::arp(interface* netif) : _netif(netif), _proto(netif, 0x0806)
|
||||
arp::arp(interface* netif) : _netif(netif), _proto(netif, eth_protocol_num::arp)
|
||||
, _rx_packets(_proto.receive([this] (packet p, ethernet_address ea) {
|
||||
return process_packet(std::move(p), ea);
|
||||
},
|
||||
|
||||
18
net/const.hh
Normal file
18
net/const.hh
Normal file
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
* Copyright (C) 2014 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#ifndef CONST_HH_
|
||||
#define CONST_HH_
|
||||
namespace net {
|
||||
|
||||
enum class ip_protocol_num : uint8_t {
|
||||
icmp = 1, tcp = 6, udp = 17, unused = 255
|
||||
};
|
||||
|
||||
enum class eth_protocol_num : uint16_t {
|
||||
ipv4 = 0x0800, arp = 0x0806, ipv6 = 0x86dd
|
||||
};
|
||||
|
||||
}
|
||||
#endif
|
||||
12
net/ip.cc
12
net/ip.cc
@@ -21,14 +21,14 @@ ipv4::ipv4(interface* netif)
|
||||
: _netif(netif)
|
||||
, _global_arp(netif)
|
||||
, _arp(_global_arp)
|
||||
, _l3(netif, 0x0800)
|
||||
, _l3(netif, eth_protocol_num::ipv4)
|
||||
, _rx_packets(_l3.receive([this] (packet p, ethernet_address ea) {
|
||||
return handle_received_packet(std::move(p), ea); },
|
||||
[this] (packet& p, size_t off) {
|
||||
return handle_on_cpu(p, off);}))
|
||||
, _tcp(*this)
|
||||
, _icmp(*this)
|
||||
, _l4({ { 6, &_tcp }, { 1, &_icmp }}) {
|
||||
, _l4({ { uint8_t(ip_protocol_num::tcp), &_tcp }, { uint8_t(ip_protocol_num::icmp), &_icmp }}) {
|
||||
}
|
||||
|
||||
unsigned ipv4::handle_on_cpu(packet& p, size_t off)
|
||||
@@ -77,7 +77,7 @@ ipv4::handle_received_packet(packet p, ethernet_address from) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> ipv4::send(ipv4_address to, uint8_t proto_num, packet p) {
|
||||
future<> ipv4::send(ipv4_address to, ip_protocol_num proto_num, packet p) {
|
||||
// FIXME: fragment
|
||||
auto iph = p.prepend_header<ip_hdr>();
|
||||
iph->ihl = sizeof(*iph) / 4;
|
||||
@@ -88,7 +88,7 @@ future<> ipv4::send(ipv4_address to, uint8_t proto_num, packet p) {
|
||||
iph->id = 0;
|
||||
iph->frag = 0;
|
||||
iph->ttl = 64;
|
||||
iph->ip_proto = proto_num;
|
||||
iph->ip_proto = (uint8_t)proto_num;
|
||||
iph->csum = 0;
|
||||
iph->src_ip = _host_address;
|
||||
|
||||
@@ -115,6 +115,10 @@ void ipv4::set_host_address(ipv4_address ip) {
|
||||
_arp.set_self_addr(ip);
|
||||
}
|
||||
|
||||
ipv4_address ipv4::host_address() {
|
||||
return _host_address;
|
||||
}
|
||||
|
||||
void ipv4::set_gw_address(ipv4_address ip) {
|
||||
_gw_address = ip;
|
||||
}
|
||||
|
||||
27
net/ip.hh
27
net/ip.hh
@@ -15,11 +15,12 @@
|
||||
#include "arp.hh"
|
||||
#include "tcp.hh"
|
||||
#include "ip_checksum.hh"
|
||||
#include "const.hh"
|
||||
|
||||
namespace net {
|
||||
|
||||
class ipv4;
|
||||
template <uint8_t ProtoNum>
|
||||
template <ip_protocol_num ProtoNum>
|
||||
class ipv4_l4;
|
||||
struct ipv4_address;
|
||||
|
||||
@@ -65,13 +66,16 @@ namespace net {
|
||||
|
||||
struct ipv4_traits {
|
||||
using address_type = ipv4_address;
|
||||
using inet_type = ipv4_l4<6>;
|
||||
static void pseudo_header_checksum(checksummer& csum, ipv4_address src, ipv4_address dst, uint16_t len) {
|
||||
csum.sum_many(src.ip.raw, dst.ip.raw, uint8_t(0), uint8_t(6), len);
|
||||
using inet_type = ipv4_l4<ip_protocol_num::tcp>;
|
||||
static void tcp_pseudo_header_checksum(checksummer& csum, ipv4_address src, ipv4_address dst, uint16_t len) {
|
||||
csum.sum_many(src.ip.raw, dst.ip.raw, uint8_t(0), uint8_t(ip_protocol_num::tcp), len);
|
||||
}
|
||||
static void udp_pseudo_header_checksum(checksummer& csum, ipv4_address src, ipv4_address dst, uint16_t len) {
|
||||
csum.sum_many(src.ip.raw, dst.ip.raw, uint8_t(0), uint8_t(ip_protocol_num::udp), len);
|
||||
}
|
||||
};
|
||||
|
||||
template <uint8_t ProtoNum>
|
||||
template <ip_protocol_num ProtoNum>
|
||||
class ipv4_l4 {
|
||||
public:
|
||||
ipv4& _inet;
|
||||
@@ -88,7 +92,7 @@ public:
|
||||
};
|
||||
|
||||
class ipv4_tcp final : public ip_protocol {
|
||||
ipv4_l4<6> _inet_l4;
|
||||
ipv4_l4<ip_protocol_num::tcp> _inet_l4;
|
||||
tcp<ipv4_traits> _tcp;
|
||||
public:
|
||||
ipv4_tcp(ipv4& inet) : _inet_l4(inet), _tcp(_inet_l4) {}
|
||||
@@ -120,7 +124,7 @@ struct icmp_hdr {
|
||||
class icmp {
|
||||
public:
|
||||
using ipaddr = ipv4_address;
|
||||
using inet_type = ipv4_l4<1>;
|
||||
using inet_type = ipv4_l4<ip_protocol_num::icmp>;
|
||||
explicit icmp(inet_type& inet) : _inet(inet) {}
|
||||
void received(packet p, ipaddr from, ipaddr to);
|
||||
private:
|
||||
@@ -128,7 +132,7 @@ private:
|
||||
};
|
||||
|
||||
class ipv4_icmp final : public ip_protocol {
|
||||
ipv4_l4<1> _inet_l4;
|
||||
ipv4_l4<ip_protocol_num::icmp> _inet_l4;
|
||||
icmp _icmp;
|
||||
public:
|
||||
ipv4_icmp(ipv4& inet) : _inet_l4(inet), _icmp(_inet_l4) {}
|
||||
@@ -143,7 +147,7 @@ public:
|
||||
using address_type = ipv4_address;
|
||||
using proto_type = uint16_t;
|
||||
static address_type broadcast_address() { return ipv4_address(0xffffffff); }
|
||||
static proto_type arp_protocol_type() { return 0x0800; }
|
||||
static proto_type arp_protocol_type() { return proto_type(eth_protocol_num::ipv4); }
|
||||
private:
|
||||
interface* _netif;
|
||||
arp _global_arp;
|
||||
@@ -163,15 +167,16 @@ private:
|
||||
public:
|
||||
explicit ipv4(interface* netif);
|
||||
void set_host_address(ipv4_address ip);
|
||||
ipv4_address host_address();
|
||||
void set_gw_address(ipv4_address ip);
|
||||
void set_netmask_address(ipv4_address ip);
|
||||
future<> send(ipv4_address to, uint8_t proto_num, packet p);
|
||||
future<> send(ipv4_address to, ip_protocol_num proto_num, packet p);
|
||||
tcp<ipv4_traits>& get_tcp() { return _tcp._tcp; }
|
||||
void register_l4(proto_type id, ip_protocol* handler);
|
||||
net::hw_features hw_features() { return _netif->hw_features(); }
|
||||
};
|
||||
|
||||
template <uint8_t ProtoNum>
|
||||
template <ip_protocol_num ProtoNum>
|
||||
inline
|
||||
future<> ipv4_l4<ProtoNum>::send(ipv4_address from, ipv4_address to, packet p) {
|
||||
return _inet.send(/* from, */ to, ProtoNum, std::move(p));
|
||||
|
||||
10
net/net.cc
10
net/net.cc
@@ -12,7 +12,7 @@ namespace net {
|
||||
|
||||
__thread device *dev;
|
||||
|
||||
l3_protocol::l3_protocol(interface* netif, uint16_t proto_num)
|
||||
l3_protocol::l3_protocol(interface* netif, eth_protocol_num proto_num)
|
||||
: _netif(netif), _proto_num(proto_num) {
|
||||
}
|
||||
|
||||
@@ -34,10 +34,10 @@ interface::interface(std::unique_ptr<device> dev)
|
||||
}
|
||||
|
||||
subscription<packet, ethernet_address>
|
||||
interface::register_l3(uint16_t proto_num,
|
||||
interface::register_l3(eth_protocol_num proto_num,
|
||||
std::function<future<> (packet p, ethernet_address from)> next,
|
||||
std::function<unsigned (packet&, size_t)> forward) {
|
||||
auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(proto_num), std::forward_as_tuple(std::move(forward)));
|
||||
auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(uint16_t(proto_num)), std::forward_as_tuple(std::move(forward)));
|
||||
assert(i.second);
|
||||
l3_rx_stream& l3_rx = i.first->second;
|
||||
return l3_rx.packet_stream.listen(std::move(next));
|
||||
@@ -76,11 +76,11 @@ future<> interface::dispatch_packet(packet p) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> interface::send(uint16_t proto_num, ethernet_address to, packet p) {
|
||||
future<> interface::send(eth_protocol_num proto_num, ethernet_address to, packet p) {
|
||||
auto eh = p.prepend_header<eth_hdr>();
|
||||
eh->dst_mac = to;
|
||||
eh->src_mac = _hw_address;
|
||||
eh->eth_proto = proto_num;
|
||||
eh->eth_proto = uint16_t(proto_num);
|
||||
hton(*eh);
|
||||
return _dev->send(std::move(p));
|
||||
}
|
||||
|
||||
@@ -37,9 +37,9 @@ struct hw_features {
|
||||
|
||||
class l3_protocol {
|
||||
interface* _netif;
|
||||
uint16_t _proto_num;
|
||||
eth_protocol_num _proto_num;
|
||||
public:
|
||||
explicit l3_protocol(interface* netif, uint16_t proto_num);
|
||||
explicit l3_protocol(interface* netif, eth_protocol_num proto_num);
|
||||
subscription<packet, ethernet_address> receive(
|
||||
std::function<future<> (packet, ethernet_address)> rx_fn,
|
||||
std::function<unsigned (packet&, size_t)> forward);
|
||||
@@ -62,12 +62,12 @@ class interface {
|
||||
net::hw_features _hw_features;
|
||||
private:
|
||||
future<> dispatch_packet(packet p);
|
||||
future<> send(uint16_t proto_num, ethernet_address to, packet p);
|
||||
future<> send(eth_protocol_num proto_num, ethernet_address to, packet p);
|
||||
public:
|
||||
explicit interface(std::unique_ptr<device> dev);
|
||||
ethernet_address hw_address() { return _hw_address; }
|
||||
net::hw_features hw_features() { return _hw_features; }
|
||||
subscription<packet, ethernet_address> register_l3(uint16_t proto_num,
|
||||
subscription<packet, ethernet_address> register_l3(eth_protocol_num proto_num,
|
||||
std::function<future<> (packet p, ethernet_address from)> next,
|
||||
std::function<unsigned (packet&, size_t)> forward);
|
||||
friend class l3_protocol;
|
||||
|
||||
@@ -6,8 +6,10 @@
|
||||
#define PACKET_HH_
|
||||
|
||||
#include "core/deleter.hh"
|
||||
#include "const.hh"
|
||||
#include <vector>
|
||||
#include <cassert>
|
||||
#include <algorithm>
|
||||
|
||||
namespace net {
|
||||
|
||||
@@ -17,10 +19,10 @@ struct fragment {
|
||||
};
|
||||
|
||||
struct offload_info {
|
||||
enum class protocol_type : uint8_t { tcp = 6, udp = 17, unused = 255 };
|
||||
protocol_type protocol = protocol_type::unused;
|
||||
ip_protocol_num protocol = ip_protocol_num::unused;
|
||||
uint8_t ip_hdr_len = 20;
|
||||
uint8_t tcp_hdr_len = 20;
|
||||
uint8_t udp_hdr_len = 8;
|
||||
};
|
||||
|
||||
// Zero-copy friendly packet class
|
||||
@@ -170,6 +172,9 @@ public:
|
||||
// zero-copy multiple fragment
|
||||
template <typename Deleter>
|
||||
packet(std::vector<fragment> frag, Deleter deleter);
|
||||
// build packet with iterator
|
||||
template <typename Iterator, typename Deleter>
|
||||
packet(Iterator begin, Iterator end, Deleter del);
|
||||
// append fragment (copying new fragment)
|
||||
packet(packet&& x, fragment frag);
|
||||
// prepend fragment (copying new fragment, with header optimization)
|
||||
@@ -299,6 +304,20 @@ packet::packet(std::vector<fragment> frag, Deleter d)
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Iterator, typename Deleter>
|
||||
inline
|
||||
packet::packet(Iterator begin, Iterator end, Deleter del) {
|
||||
unsigned nr_frags = 0, len = 0;
|
||||
nr_frags = std::distance(begin, end);
|
||||
std::for_each(begin, end, [&] (fragment& frag) { len += frag.size; });
|
||||
_impl = impl::allocate(nr_frags);
|
||||
_impl->_deleter = make_deleter(deleter(), std::move(del));
|
||||
_impl->_len = len;
|
||||
_impl->_nr_frags = nr_frags;
|
||||
std::copy(begin, end, _impl->_frags);
|
||||
}
|
||||
|
||||
|
||||
inline
|
||||
packet::packet(packet&& x, fragment frag)
|
||||
: _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
|
||||
|
||||
251
net/posix-stack.cc
Normal file
251
net/posix-stack.cc
Normal file
@@ -0,0 +1,251 @@
|
||||
#include "stack.hh"
|
||||
#include "net.hh"
|
||||
#include "packet.hh"
|
||||
#include "api.hh"
|
||||
#include "posix-stack.hh"
|
||||
|
||||
namespace net {
|
||||
|
||||
class posix_connected_socket_impl final : public connected_socket_impl {
|
||||
pollable_fd _fd;
|
||||
private:
|
||||
explicit posix_connected_socket_impl(pollable_fd fd) : _fd(std::move(fd)) {}
|
||||
public:
|
||||
virtual input_stream<char> input() override { return input_stream<char>(posix_data_source(_fd)); }
|
||||
virtual output_stream<char> output() override { return output_stream<char>(posix_data_sink(_fd), 8192); }
|
||||
friend class posix_server_socket_impl;
|
||||
friend class posix_ap_server_socket_impl;
|
||||
};
|
||||
|
||||
future<connected_socket, socket_address>
|
||||
posix_server_socket_impl::accept() {
|
||||
return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) {
|
||||
static unsigned balance = 0;
|
||||
auto cpu = balance++ % smp::count;
|
||||
|
||||
if (cpu == engine._id) {
|
||||
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
|
||||
return make_ready_future<connected_socket, socket_address>(
|
||||
connected_socket(std::move(csi)), sa);
|
||||
} else {
|
||||
smp::submit_to(cpu, [this, fd = std::move(fd.get_file_desc()), sa] () mutable {
|
||||
posix_ap_server_socket_impl::move_connected_socket(_sa, pollable_fd(std::move(fd)), sa);
|
||||
});
|
||||
return accept();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<connected_socket, socket_address> posix_ap_server_socket_impl::accept() {
|
||||
auto conni = conn_q.find(_sa.as_posix_sockaddr_in());
|
||||
if (conni != conn_q.end()) {
|
||||
connection c = std::move(conni->second);
|
||||
conn_q.erase(conni);
|
||||
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(c.fd)));
|
||||
return make_ready_future<connected_socket, socket_address>(connected_socket(std::move(csi)), std::move(c.addr));
|
||||
} else {
|
||||
auto i = sockets.emplace(std::piecewise_construct, std::make_tuple(_sa.as_posix_sockaddr_in()), std::make_tuple());
|
||||
assert(i.second);
|
||||
return i.first->second.get_future();
|
||||
}
|
||||
}
|
||||
|
||||
void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) {
|
||||
auto i = sockets.find(sa.as_posix_sockaddr_in());
|
||||
if (i != sockets.end()) {
|
||||
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
|
||||
i->second.set_value(connected_socket(std::move(csi)), std::move(addr));
|
||||
sockets.erase(i);
|
||||
} else {
|
||||
conn_q.emplace(std::piecewise_construct, std::make_tuple(sa.as_posix_sockaddr_in()), std::make_tuple(std::move(fd), std::move(addr)));
|
||||
}
|
||||
}
|
||||
|
||||
data_source posix_data_source(pollable_fd& fd) {
|
||||
return data_source(std::make_unique<posix_data_source_impl>(fd));
|
||||
}
|
||||
|
||||
future<temporary_buffer<char>>
|
||||
posix_data_source_impl::get() {
|
||||
return _fd.read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
|
||||
_buf.trim(size);
|
||||
auto ret = std::move(_buf);
|
||||
_buf = temporary_buffer<char>(_buf_size);
|
||||
return make_ready_future<temporary_buffer<char>>(std::move(ret));
|
||||
});
|
||||
}
|
||||
|
||||
data_sink posix_data_sink(pollable_fd& fd) {
|
||||
return data_sink(std::make_unique<posix_data_sink_impl>(fd));
|
||||
}
|
||||
|
||||
future<>
|
||||
posix_data_sink_impl::put(std::vector<temporary_buffer<char>> data) {
|
||||
std::swap(data, _data);
|
||||
return do_write(0);
|
||||
}
|
||||
|
||||
future<>
|
||||
posix_data_sink_impl::do_write(size_t idx) {
|
||||
// FIXME: use writev
|
||||
return _fd.write_all(_data[idx].get(), _data[idx].size()).then([this, idx] (size_t size) mutable {
|
||||
assert(size == _data[idx].size()); // FIXME: exception? short write?
|
||||
if (++idx == _data.size()) {
|
||||
_data.clear();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_write(idx);
|
||||
});
|
||||
}
|
||||
|
||||
server_socket
|
||||
posix_network_stack::listen(socket_address sa, listen_options opt) {
|
||||
return server_socket(std::make_unique<posix_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
|
||||
}
|
||||
|
||||
thread_local std::unordered_map<::sockaddr_in, promise<connected_socket, socket_address>> posix_ap_server_socket_impl::sockets;
|
||||
thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl::connection> posix_ap_server_socket_impl::conn_q;
|
||||
|
||||
server_socket
|
||||
posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
|
||||
return server_socket(std::make_unique<posix_ap_server_socket_impl>(sa));
|
||||
}
|
||||
|
||||
struct cmsg_with_pktinfo {
|
||||
struct cmsghdrcmh;
|
||||
struct in_pktinfo pktinfo;
|
||||
};
|
||||
|
||||
std::vector<struct iovec> to_iovec(const packet& p) {
|
||||
std::vector<struct iovec> v;
|
||||
v.reserve(p.nr_frags());
|
||||
for (auto&& f : p.fragments()) {
|
||||
v.push_back({.iov_base = f.base, .iov_len = f.size});
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
class posix_udp_channel : public udp_channel_impl {
|
||||
private:
|
||||
static constexpr int MAX_DATAGRAM_SIZE = 65507;
|
||||
struct recv_ctx {
|
||||
struct msghdr _hdr;
|
||||
struct iovec _iov;
|
||||
socket_address _src_addr;
|
||||
char* _buffer;
|
||||
cmsg_with_pktinfo _cmsg;
|
||||
|
||||
recv_ctx() {
|
||||
memset(&_hdr, 0, sizeof(_hdr));
|
||||
_hdr.msg_iov = &_iov;
|
||||
_hdr.msg_iovlen = 1;
|
||||
_hdr.msg_name = &_src_addr.u.sa;
|
||||
_hdr.msg_namelen = sizeof(_src_addr.u.sas);
|
||||
memset(&_cmsg, 0, sizeof(_cmsg));
|
||||
_hdr.msg_control = &_cmsg;
|
||||
_hdr.msg_controllen = sizeof(_cmsg);
|
||||
}
|
||||
|
||||
void prepare() {
|
||||
_buffer = new char[MAX_DATAGRAM_SIZE];
|
||||
_iov.iov_base = _buffer;
|
||||
_iov.iov_len = MAX_DATAGRAM_SIZE;
|
||||
}
|
||||
};
|
||||
struct send_ctx {
|
||||
struct msghdr _hdr;
|
||||
std::vector<struct iovec> _iovecs;
|
||||
socket_address _dst;
|
||||
packet _p;
|
||||
|
||||
send_ctx() {
|
||||
memset(&_hdr, 0, sizeof(_hdr));
|
||||
_hdr.msg_name = &_dst.u.sa;
|
||||
_hdr.msg_namelen = sizeof(_dst.u.sas);
|
||||
}
|
||||
|
||||
void prepare(ipv4_addr dst, packet p) {
|
||||
_dst = make_ipv4_address(dst);
|
||||
_p = std::move(p);
|
||||
_iovecs = std::move(to_iovec(_p));
|
||||
_hdr.msg_iov = _iovecs.data();
|
||||
_hdr.msg_iovlen = _iovecs.size();
|
||||
}
|
||||
};
|
||||
std::unique_ptr<pollable_fd> _fd;
|
||||
ipv4_addr _address;
|
||||
recv_ctx _recv;
|
||||
send_ctx _send;
|
||||
bool _closed;
|
||||
public:
|
||||
posix_udp_channel(ipv4_addr bind_address)
|
||||
: _closed(false) {
|
||||
auto sa = make_ipv4_address(bind_address);
|
||||
file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
|
||||
bool pktinfo_flag = true;
|
||||
::setsockopt(fd.get(), SOL_IP, IP_PKTINFO, &pktinfo_flag, sizeof(pktinfo_flag));
|
||||
fd.bind(sa.u.sa, sizeof(sa.u.sas));
|
||||
_address = ipv4_addr(fd.get_address());
|
||||
_fd = std::make_unique<pollable_fd>(std::move(fd));
|
||||
}
|
||||
virtual ~posix_udp_channel() {};
|
||||
virtual future<udp_datagram> receive() override;
|
||||
virtual future<> send(ipv4_addr dst, const char *msg);
|
||||
virtual future<> send(ipv4_addr dst, packet p);
|
||||
virtual void close() override {
|
||||
_closed = true;
|
||||
_fd.reset();
|
||||
}
|
||||
virtual bool is_closed() const override { return _closed; }
|
||||
};
|
||||
|
||||
future<> posix_udp_channel::send(ipv4_addr dst, const char *message) {
|
||||
auto len = strlen(message);
|
||||
return _fd->sendto(make_ipv4_address(dst), message, len)
|
||||
.then([len] (size_t size) { assert(size == len); });
|
||||
}
|
||||
|
||||
future<> posix_udp_channel::send(ipv4_addr dst, packet p) {
|
||||
auto len = p.len();
|
||||
_send.prepare(dst, std::move(p));
|
||||
return _fd->sendmsg(&_send._hdr)
|
||||
.then([len] (size_t size) { assert(size == len); });
|
||||
}
|
||||
|
||||
udp_channel
|
||||
posix_network_stack::make_udp_channel(ipv4_addr addr) {
|
||||
return udp_channel(std::make_unique<posix_udp_channel>(addr));
|
||||
}
|
||||
|
||||
class posix_datagram : public udp_datagram_impl {
|
||||
private:
|
||||
ipv4_addr _src;
|
||||
ipv4_addr _dst;
|
||||
packet _p;
|
||||
public:
|
||||
posix_datagram(ipv4_addr src, ipv4_addr dst, packet p) : _src(src), _dst(dst), _p(std::move(p)) {}
|
||||
virtual ipv4_addr get_src() override { return _src; }
|
||||
virtual ipv4_addr get_dst() override { return _dst; }
|
||||
virtual uint16_t get_dst_port() override { return _dst.port; }
|
||||
virtual packet& get_data() override { return _p; }
|
||||
};
|
||||
|
||||
future<udp_datagram>
|
||||
posix_udp_channel::receive() {
|
||||
_recv.prepare();
|
||||
return _fd->recvmsg(&_recv._hdr).then([this] (size_t size) {
|
||||
auto dst = ipv4_addr(_recv._cmsg.pktinfo.ipi_addr.s_addr, _address.port);
|
||||
return make_ready_future<udp_datagram>(udp_datagram(std::make_unique<posix_datagram>(
|
||||
_recv._src_addr, dst, packet(fragment{_recv._buffer, size}, [buf = _recv._buffer] { delete[] buf; }))));
|
||||
});
|
||||
}
|
||||
|
||||
network_stack_registrator nsr_posix{"posix",
|
||||
boost::program_options::options_description(),
|
||||
[](boost::program_options::variables_map ops) {
|
||||
return smp::main_thread() ? posix_network_stack::create(ops) : posix_ap_network_stack::create(ops);
|
||||
},
|
||||
true
|
||||
};
|
||||
|
||||
}
|
||||
80
net/posix-stack.hh
Normal file
80
net/posix-stack.hh
Normal file
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright (C) 2014 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#ifndef POSIX_STACK_HH_
|
||||
#define POSIX_STACK_HH_
|
||||
|
||||
#include "core/reactor.hh"
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
namespace net {
|
||||
|
||||
data_source posix_data_source(pollable_fd& fd);
|
||||
data_sink posix_data_sink(pollable_fd& fd);
|
||||
|
||||
class posix_data_source_impl final : public data_source_impl {
|
||||
pollable_fd& _fd;
|
||||
temporary_buffer<char> _buf;
|
||||
size_t _buf_size;
|
||||
public:
|
||||
explicit posix_data_source_impl(pollable_fd& fd, size_t buf_size = 8192)
|
||||
: _fd(fd), _buf(buf_size), _buf_size(buf_size) {}
|
||||
virtual future<temporary_buffer<char>> get() override;
|
||||
};
|
||||
|
||||
class posix_data_sink_impl : public data_sink_impl {
|
||||
pollable_fd& _fd;
|
||||
std::vector<temporary_buffer<char>> _data;
|
||||
private:
|
||||
future<> do_write(size_t idx);
|
||||
public:
|
||||
explicit posix_data_sink_impl(pollable_fd& fd) : _fd(fd) {}
|
||||
future<> put(std::vector<temporary_buffer<char>> data) override;
|
||||
};
|
||||
|
||||
class posix_ap_server_socket_impl : public server_socket_impl {
|
||||
struct connection {
|
||||
pollable_fd fd;
|
||||
socket_address addr;
|
||||
connection(pollable_fd xfd, socket_address xaddr) : fd(std::move(xfd)), addr(xaddr) {}
|
||||
};
|
||||
static thread_local std::unordered_map<::sockaddr_in, promise<connected_socket, socket_address>> sockets;
|
||||
static thread_local std::unordered_multimap<::sockaddr_in, connection> conn_q;
|
||||
socket_address _sa;
|
||||
public:
|
||||
explicit posix_ap_server_socket_impl(socket_address sa) : _sa(sa) {}
|
||||
virtual future<connected_socket, socket_address> accept();
|
||||
static void move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr);
|
||||
};
|
||||
|
||||
class posix_server_socket_impl : public server_socket_impl {
|
||||
socket_address _sa;
|
||||
pollable_fd _lfd;
|
||||
public:
|
||||
explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
|
||||
virtual future<connected_socket, socket_address> accept();
|
||||
};
|
||||
|
||||
class posix_network_stack : public network_stack {
|
||||
public:
|
||||
posix_network_stack(boost::program_options::variables_map opts) {}
|
||||
virtual server_socket listen(socket_address sa, listen_options opts) override;
|
||||
virtual net::udp_channel make_udp_channel(ipv4_addr addr) override;
|
||||
static std::unique_ptr<network_stack> create(boost::program_options::variables_map opts) {
|
||||
return std::unique_ptr<network_stack>(new posix_network_stack(opts));
|
||||
}
|
||||
};
|
||||
|
||||
class posix_ap_network_stack : public posix_network_stack {
|
||||
public:
|
||||
posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)) {}
|
||||
virtual server_socket listen(socket_address sa, listen_options opts) override;
|
||||
static std::unique_ptr<network_stack> create(boost::program_options::variables_map opts) {
|
||||
return std::unique_ptr<network_stack>(new posix_ap_network_stack(opts));
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
104
net/stack.cc
104
net/stack.cc
@@ -14,26 +14,15 @@
|
||||
|
||||
namespace net {
|
||||
|
||||
class native_network_stack;
|
||||
|
||||
template <typename Protocol>
|
||||
class native_server_socket_impl;
|
||||
|
||||
template <typename Protocol>
|
||||
class native_connected_socket_impl;
|
||||
|
||||
template <typename Protocol>
|
||||
class native_connected_socket_impl : public connected_socket_impl {
|
||||
typename Protocol::connection _conn;
|
||||
class native_data_source_impl;
|
||||
class native_data_sink_impl;
|
||||
public:
|
||||
explicit native_connected_socket_impl(typename Protocol::connection conn)
|
||||
: _conn(std::move(conn)) {}
|
||||
virtual input_stream<char> input() override;
|
||||
virtual output_stream<char> output() override;
|
||||
};
|
||||
class native_network_stack;
|
||||
|
||||
// native_server_socket_impl
|
||||
template <typename Protocol>
|
||||
class native_server_socket_impl : public server_socket_impl {
|
||||
typename Protocol::listener _listener;
|
||||
@@ -42,39 +31,6 @@ public:
|
||||
virtual future<connected_socket, socket_address> accept() override;
|
||||
};
|
||||
|
||||
|
||||
|
||||
class native_network_stack : public network_stack {
|
||||
static std::unique_ptr<native_network_stack> _s;
|
||||
interface _netif;
|
||||
ipv4 _inet;
|
||||
udp_v4 _udp;
|
||||
using tcp4 = tcp<ipv4_traits>;
|
||||
public:
|
||||
explicit native_network_stack(boost::program_options::variables_map opts);
|
||||
virtual server_socket listen(socket_address sa, listen_options opt) override;
|
||||
virtual udp_channel make_udp_channel(ipv4_addr addr) override;
|
||||
static std::unique_ptr<network_stack> create(boost::program_options::variables_map opts) {
|
||||
return std::make_unique<native_network_stack>(opts);
|
||||
}
|
||||
friend class native_server_socket_impl<tcp4>;
|
||||
};
|
||||
|
||||
udp_channel
|
||||
native_network_stack::make_udp_channel(ipv4_addr addr) {
|
||||
return _udp.make_channel(addr);
|
||||
}
|
||||
|
||||
native_network_stack::native_network_stack(boost::program_options::variables_map opts)
|
||||
: _netif(smp::main_thread() ? create_virtio_net_device(opts["tap-device"].as<std::string>(), opts) : create_proxy_net_device(opts))
|
||||
, _inet(&_netif)
|
||||
, _udp(_inet) {
|
||||
_inet.set_host_address(ipv4_address(opts["host-ipv4-addr"].as<std::string>()));
|
||||
_inet.set_gw_address(ipv4_address(opts["gw-ipv4-addr"].as<std::string>()));
|
||||
_inet.set_netmask_address(ipv4_address(opts["netmask-ipv4-addr"].as<std::string>()));
|
||||
_udp.set_queue_size(opts["udpv4-queue-size"].as<int>());
|
||||
}
|
||||
|
||||
template <typename Protocol>
|
||||
native_server_socket_impl<Protocol>::native_server_socket_impl(Protocol& proto, uint16_t port, listen_options opt)
|
||||
: _listener(proto.listen(port)) {
|
||||
@@ -90,12 +46,18 @@ native_server_socket_impl<Protocol>::accept() {
|
||||
});
|
||||
}
|
||||
|
||||
server_socket
|
||||
native_network_stack::listen(socket_address sa, listen_options opts) {
|
||||
assert(sa.as_posix_sockaddr().sa_family == AF_INET);
|
||||
return server_socket(std::make_unique<native_server_socket_impl<tcp4>>(
|
||||
_inet.get_tcp(), ntohs(sa.as_posix_sockaddr_in().sin_port), opts));
|
||||
}
|
||||
// native_connected_socket_impl
|
||||
template <typename Protocol>
|
||||
class native_connected_socket_impl : public connected_socket_impl {
|
||||
typename Protocol::connection _conn;
|
||||
class native_data_source_impl;
|
||||
class native_data_sink_impl;
|
||||
public:
|
||||
explicit native_connected_socket_impl(typename Protocol::connection conn)
|
||||
: _conn(std::move(conn)) {}
|
||||
virtual input_stream<char> input() override;
|
||||
virtual output_stream<char> output() override;
|
||||
};
|
||||
|
||||
template <typename Protocol>
|
||||
class native_connected_socket_impl<Protocol>::native_data_source_impl final
|
||||
@@ -160,6 +122,44 @@ native_connected_socket_impl<Protocol>::output() {
|
||||
return output_stream<char>(std::move(ds), 8192);
|
||||
}
|
||||
|
||||
// native_network_stack
|
||||
class native_network_stack : public network_stack {
|
||||
static std::unique_ptr<native_network_stack> _s;
|
||||
interface _netif;
|
||||
ipv4 _inet;
|
||||
udp_v4 _udp;
|
||||
using tcp4 = tcp<ipv4_traits>;
|
||||
public:
|
||||
explicit native_network_stack(boost::program_options::variables_map opts);
|
||||
virtual server_socket listen(socket_address sa, listen_options opt) override;
|
||||
virtual udp_channel make_udp_channel(ipv4_addr addr) override;
|
||||
static std::unique_ptr<network_stack> create(boost::program_options::variables_map opts) {
|
||||
return std::make_unique<native_network_stack>(opts);
|
||||
}
|
||||
friend class native_server_socket_impl<tcp4>;
|
||||
};
|
||||
|
||||
udp_channel
|
||||
native_network_stack::make_udp_channel(ipv4_addr addr) {
|
||||
return _udp.make_channel(addr);
|
||||
}
|
||||
|
||||
native_network_stack::native_network_stack(boost::program_options::variables_map opts)
|
||||
: _netif(smp::main_thread() ? create_virtio_net_device(opts["tap-device"].as<std::string>(), opts) : create_proxy_net_device(opts))
|
||||
, _inet(&_netif)
|
||||
, _udp(_inet) {
|
||||
_inet.set_host_address(ipv4_address(opts["host-ipv4-addr"].as<std::string>()));
|
||||
_inet.set_gw_address(ipv4_address(opts["gw-ipv4-addr"].as<std::string>()));
|
||||
_inet.set_netmask_address(ipv4_address(opts["netmask-ipv4-addr"].as<std::string>()));
|
||||
_udp.set_queue_size(opts["udpv4-queue-size"].as<int>());
|
||||
}
|
||||
|
||||
server_socket
|
||||
native_network_stack::listen(socket_address sa, listen_options opts) {
|
||||
assert(sa.as_posix_sockaddr().sa_family == AF_INET);
|
||||
return server_socket(std::make_unique<native_server_socket_impl<tcp4>>(
|
||||
_inet.get_tcp(), ntohs(sa.as_posix_sockaddr_in().sin_port), opts));
|
||||
}
|
||||
|
||||
std::unique_ptr<native_network_stack> native_network_stack::_s;
|
||||
|
||||
|
||||
15
net/tcp.hh
15
net/tcp.hh
@@ -9,6 +9,7 @@
|
||||
#include "core/queue.hh"
|
||||
#include "net.hh"
|
||||
#include "ip_checksum.hh"
|
||||
#include "const.hh"
|
||||
#include <unordered_map>
|
||||
#include <map>
|
||||
#include <functional>
|
||||
@@ -49,6 +50,11 @@ inline bool operator>(tcp_seq s, tcp_seq q) { return q < s; }
|
||||
inline bool operator<=(tcp_seq s, tcp_seq q) { return !(s > q); }
|
||||
inline bool operator>=(tcp_seq s, tcp_seq q) { return !(s < q); }
|
||||
|
||||
inline tcp_seq get_tcp_isn() {
|
||||
// FIXME: should increase every 4ms
|
||||
return make_seq(1000000);
|
||||
}
|
||||
|
||||
struct tcp_hdr {
|
||||
packed<uint16_t> src_port;
|
||||
packed<uint16_t> dst_port;
|
||||
@@ -268,7 +274,7 @@ void tcp<InetTraits>::received(packet p, ipaddr from, ipaddr to) {
|
||||
|
||||
if (!hw_features().rx_csum_offload) {
|
||||
checksummer csum;
|
||||
InetTraits::pseudo_header_checksum(csum, from, to, p.len());
|
||||
InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len());
|
||||
csum.sum(p);
|
||||
if (csum.get() != 0) {
|
||||
return;
|
||||
@@ -346,7 +352,7 @@ void tcp<InetTraits>::respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr f
|
||||
hton(*th);
|
||||
|
||||
checksummer csum;
|
||||
InetTraits::pseudo_header_checksum(csum, local_ip, foreign_ip, sizeof(*th));
|
||||
InetTraits::tcp_pseudo_header_checksum(csum, local_ip, foreign_ip, sizeof(*th));
|
||||
if (hw_features().tx_csum_offload) {
|
||||
th->checksum = ~csum.get();
|
||||
} else {
|
||||
@@ -372,6 +378,7 @@ void tcp<InetTraits>::tcb::input(tcp_hdr* th, packet p) {
|
||||
_rcv.window = 4500; // FIXME: what?
|
||||
_rcv.urgent = _rcv.next;
|
||||
_snd.wl1 = th->seq;
|
||||
_snd.next = _snd.initial = get_tcp_isn();
|
||||
} else {
|
||||
if (seg_seq != _rcv.initial) {
|
||||
return respond_with_reset(th);
|
||||
@@ -562,7 +569,7 @@ void tcp<InetTraits>::tcb::output() {
|
||||
hton(*th);
|
||||
|
||||
checksummer csum;
|
||||
InetTraits::pseudo_header_checksum(csum, _local_ip, _foreign_ip, sizeof(*th) + len);
|
||||
InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip, sizeof(*th) + len);
|
||||
if (_tcp.hw_features().tx_csum_offload) {
|
||||
// virtio-net's VIRTIO_NET_F_CSUM feature requires th->checksum to be
|
||||
// initialized to ones' complement sum of the pseudo header.
|
||||
@@ -574,7 +581,7 @@ void tcp<InetTraits>::tcb::output() {
|
||||
|
||||
offload_info oi;
|
||||
// TCP protocol
|
||||
oi.protocol = offload_info::protocol_type::tcp;
|
||||
oi.protocol = ip_protocol_num::tcp;
|
||||
// TCP hdr len
|
||||
oi.tcp_hdr_len = 20;
|
||||
p.set_offload_info(oi);
|
||||
|
||||
20
net/udp.cc
20
net/udp.cc
@@ -87,7 +87,7 @@ const int udp_v4::default_queue_size = 1024;
|
||||
udp_v4::udp_v4(ipv4& inet)
|
||||
: _inet(inet)
|
||||
{
|
||||
_inet.register_l4(protocol_number, this);
|
||||
_inet.register_l4(uint8_t(ip_protocol_num::udp), this);
|
||||
}
|
||||
|
||||
unsigned udp_v4::forward(packet& p, size_t off, ipv4_address from, ipv4_address to)
|
||||
@@ -116,13 +116,27 @@ void udp_v4::received(packet p, ipv4_address from, ipv4_address to)
|
||||
|
||||
future<> udp_v4::send(uint16_t src_port, ipv4_addr dst, packet &&p)
|
||||
{
|
||||
auto src = _inet.host_address();
|
||||
auto hdr = p.prepend_header<udp_hdr>();
|
||||
hdr->src_port = src_port;
|
||||
hdr->dst_port = dst.port;
|
||||
hdr->len = p.len();
|
||||
hdr->cksum = 0; // TODO: calculate checksum
|
||||
hton(*hdr);
|
||||
return _inet.send(dst, protocol_number, std::move(p));
|
||||
|
||||
checksummer csum;
|
||||
ipv4_traits::udp_pseudo_header_checksum(csum, src, dst, p.len());
|
||||
if (hw_features().tx_csum_offload) {
|
||||
hdr->cksum = ~csum.get();
|
||||
} else {
|
||||
csum.sum(p);
|
||||
hdr->cksum = csum.get();
|
||||
}
|
||||
|
||||
offload_info oi;
|
||||
oi.protocol = ip_protocol_num::udp;
|
||||
p.set_offload_info(oi);
|
||||
|
||||
return _inet.send(dst, ip_protocol_num::udp, std::move(p));
|
||||
}
|
||||
|
||||
uint16_t udp_v4::next_port(uint16_t port) {
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <unordered_map>
|
||||
#include <assert.h>
|
||||
#include "net/api.hh"
|
||||
#include "const.hh"
|
||||
|
||||
namespace net {
|
||||
|
||||
@@ -42,7 +43,6 @@ class udp_v4 : ip_protocol {
|
||||
public:
|
||||
static const int default_queue_size;
|
||||
private:
|
||||
static const int protocol_number = 17;
|
||||
static const uint16_t min_anonymous_port = 32768;
|
||||
ipv4 &_inet;
|
||||
std::unordered_map<uint16_t, shared_ptr<udp_channel_state>> _channels;
|
||||
@@ -73,6 +73,7 @@ public:
|
||||
future<> send(uint16_t src_port, ipv4_addr dst, packet &&p);
|
||||
unsigned forward(packet& p, size_t off, ipv4_address from, ipv4_address to) override;
|
||||
void set_queue_size(int size) { _queue_size = size; }
|
||||
net::hw_features hw_features() { return _inet.hw_features(); }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include <linux/vhost.h>
|
||||
#include <linux/if_tun.h>
|
||||
#include "ip.hh"
|
||||
#include "const.hh"
|
||||
|
||||
using namespace net;
|
||||
|
||||
@@ -362,8 +363,9 @@ class virtio_net_device : public net::device {
|
||||
class rxq {
|
||||
virtio_net_device& _dev;
|
||||
vring _ring;
|
||||
packet _building;
|
||||
unsigned _remaining_buffers = 0;
|
||||
std::vector<fragment> _fragments;
|
||||
std::vector<std::unique_ptr<char[]>> _deleters;
|
||||
public:
|
||||
rxq(virtio_net_device& _if,
|
||||
vring::config config, readable_eventfd notified, writeable_eventfd kicked);
|
||||
@@ -412,22 +414,35 @@ virtio_net_device::txq::post(packet p) {
|
||||
|
||||
// Handle TCP checksum offload
|
||||
auto oi = p.offload_info();
|
||||
if (_dev.hw_features().tx_csum_offload && oi.protocol == offload_info::protocol_type::tcp) {
|
||||
if (_dev.hw_features().tx_csum_offload) {
|
||||
auto eth_hdr_len = sizeof(eth_hdr);
|
||||
auto ip_hdr_len = oi.ip_hdr_len;
|
||||
auto tcp_hdr_len = oi.tcp_hdr_len;
|
||||
vhdr.needs_csum = 1;
|
||||
vhdr.csum_start = eth_hdr_len + ip_hdr_len;
|
||||
// TCP checksum filed's offset within the TCP header is 16 bytes
|
||||
vhdr.csum_offset = 16;
|
||||
auto mtu = _dev.hw_features().mtu;
|
||||
if (_dev.hw_features().tx_tso && p.len() > mtu + eth_hdr_len) {
|
||||
// IPv4 TCP TSO
|
||||
vhdr.gso_type = net_hdr::gso_tcpv4;
|
||||
// Sum of Ethernet, IP and TCP header size
|
||||
vhdr.hdr_len = eth_hdr_len + ip_hdr_len + tcp_hdr_len;
|
||||
// Maximum segment size of packet after the offload
|
||||
vhdr.gso_size = mtu - ip_hdr_len - tcp_hdr_len;
|
||||
if (oi.protocol == ip_protocol_num::tcp) {
|
||||
auto tcp_hdr_len = oi.tcp_hdr_len;
|
||||
vhdr.needs_csum = 1;
|
||||
vhdr.csum_start = eth_hdr_len + ip_hdr_len;
|
||||
// TCP checksum filed's offset within the TCP header is 16 bytes
|
||||
vhdr.csum_offset = 16;
|
||||
if (_dev.hw_features().tx_tso && p.len() > mtu + eth_hdr_len) {
|
||||
// IPv4 TCP TSO
|
||||
vhdr.gso_type = net_hdr::gso_tcpv4;
|
||||
// Sum of Ethernet, IP and TCP header size
|
||||
vhdr.hdr_len = eth_hdr_len + ip_hdr_len + tcp_hdr_len;
|
||||
// Maximum segment size of packet after the offload
|
||||
vhdr.gso_size = mtu - ip_hdr_len - tcp_hdr_len;
|
||||
}
|
||||
} else if (oi.protocol == ip_protocol_num::udp) {
|
||||
auto udp_hdr_len = oi.udp_hdr_len;
|
||||
vhdr.needs_csum = 1;
|
||||
vhdr.csum_start = eth_hdr_len + ip_hdr_len;
|
||||
// UDP checksum filed's offset within the UDP header is 6 bytes
|
||||
vhdr.csum_offset = 6;
|
||||
if (_dev.hw_features().tx_ufo && p.len() > mtu + eth_hdr_len) {
|
||||
vhdr.gso_type = net_hdr::gso_udp;
|
||||
vhdr.hdr_len = eth_hdr_len + ip_hdr_len + udp_hdr_len;
|
||||
vhdr.gso_size = mtu - ip_hdr_len - udp_hdr_len;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,33 +493,35 @@ virtio_net_device::rxq::prepare_buffers() {
|
||||
b.addr = virt_to_phys(buf.get());
|
||||
b.len = 4096;
|
||||
b.writeable = true;
|
||||
b.completed.get_future().then([this, buf = buf.get()] (size_t len) {
|
||||
auto frag_buf = buf;
|
||||
b.completed.get_future().then([this, buf = std::move(buf)] (size_t len) mutable {
|
||||
auto frag_buf = buf.get();
|
||||
auto frag_len = len;
|
||||
// First buffer
|
||||
if (_remaining_buffers == 0) {
|
||||
auto hdr = reinterpret_cast<net_hdr_mrg*>(buf);
|
||||
auto hdr = reinterpret_cast<net_hdr_mrg*>(frag_buf);
|
||||
assert(hdr->num_buffers >= 1);
|
||||
// TODO: special-case for num_buffers == 1
|
||||
_remaining_buffers = hdr->num_buffers;
|
||||
_building = std::move(packet(_remaining_buffers));
|
||||
frag_buf += _dev._header_len;
|
||||
frag_len -= _dev._header_len;
|
||||
_fragments.clear();
|
||||
_deleters.clear();
|
||||
};
|
||||
|
||||
// Append current buffer
|
||||
packet p(fragment{frag_buf, frag_len}, [buf] { delete[] buf; });
|
||||
_building.append(std::move(p));
|
||||
_fragments.emplace_back(fragment{frag_buf, frag_len});
|
||||
_deleters.emplace_back(buf.release());
|
||||
_remaining_buffers--;
|
||||
|
||||
// Last buffer
|
||||
if (_remaining_buffers == 0) {
|
||||
_dev._rx_ready = _dev._rx_ready.then([this] () mutable {
|
||||
return _dev.queue_rx_packet(std::move(_building));
|
||||
packet p(_fragments.begin(), _fragments.end(), [deleters = std::move(_deleters)] () mutable { deleters.clear(); });
|
||||
_dev._rx_ready = _dev._rx_ready.then([this, p = std::move(p)] () mutable {
|
||||
return _dev.queue_rx_packet(std::move(p));
|
||||
});
|
||||
}
|
||||
});
|
||||
bc.push_back(std::move(b));
|
||||
buf.release();
|
||||
vbc.push_back(std::move(bc));
|
||||
}
|
||||
_ring.post(std::move(vbc));
|
||||
@@ -591,12 +608,14 @@ uint64_t virtio_net_device::setup_features() {
|
||||
}
|
||||
if (!(_opts.count("tso") && _opts["tso"].as<std::string>() == "off")) {
|
||||
seastar_supported_features |= VIRTIO_NET_F_HOST_TSO4;
|
||||
seastar_supported_features |= VIRTIO_NET_F_GUEST_TSO4;
|
||||
_hw_features.tx_tso = true;
|
||||
} else {
|
||||
_hw_features.tx_tso = false;
|
||||
}
|
||||
if (!(_opts.count("ufo") && _opts["ufo"].as<std::string>() == "off")) {
|
||||
seastar_supported_features |= VIRTIO_NET_F_HOST_UFO;
|
||||
seastar_supported_features |= VIRTIO_NET_F_GUEST_UFO;
|
||||
_hw_features.tx_ufo = true;
|
||||
} else {
|
||||
_hw_features.tx_ufo = false;
|
||||
|
||||
@@ -18,7 +18,7 @@ void dump_arp_packets(l3_protocol& proto) {
|
||||
int main(int ac, char** av) {
|
||||
auto vnet = create_virtio_net_device("tap0");
|
||||
interface netif(std::move(vnet));
|
||||
l3_protocol arp(&netif, 0x0806);
|
||||
l3_protocol arp(&netif, eth_protocol_num::arp);
|
||||
dump_arp_packets(arp);
|
||||
engine.run();
|
||||
return 0;
|
||||
|
||||
Reference in New Issue
Block a user