Merge branch 'nettx'
More virtio and smp batching.
This commit is contained in:
@@ -652,44 +652,58 @@ void smp_message_queue::complete_kick() {
|
||||
}
|
||||
|
||||
void smp_message_queue::move_pending() {
|
||||
bool kick = false;
|
||||
|
||||
while (_current_queue_length < queue_length && !_pending_fifo.empty()) {
|
||||
_pending.push(_pending_fifo.front());
|
||||
_pending_fifo.pop();
|
||||
_current_queue_length++;
|
||||
kick = true;
|
||||
}
|
||||
|
||||
if (kick) {
|
||||
submit_kick();
|
||||
auto queue_room = queue_length - _current_queue_length;
|
||||
auto nr = std::min(queue_room, _tx.a.pending_fifo.size());
|
||||
if (!nr) {
|
||||
return;
|
||||
}
|
||||
auto begin = _tx.a.pending_fifo.begin();
|
||||
auto end = begin + nr;
|
||||
_pending.push(begin, end);
|
||||
_tx.a.pending_fifo.erase(begin, end);
|
||||
_current_queue_length += nr;
|
||||
submit_kick();
|
||||
}
|
||||
|
||||
void smp_message_queue::submit_item(smp_message_queue::work_item* item) {
|
||||
_pending_fifo.push(item);
|
||||
move_pending();
|
||||
_tx.a.pending_fifo.push_back(item);
|
||||
if (_tx.a.pending_fifo.size() >= batch_size) {
|
||||
move_pending();
|
||||
}
|
||||
}
|
||||
|
||||
void smp_message_queue::respond(work_item* item) {
|
||||
// FIXME: batcing
|
||||
_completed.push(item);
|
||||
_completed_fifo.push_back(item);
|
||||
if (_completed_fifo.size() >= batch_size) {
|
||||
flush_response_batch();
|
||||
}
|
||||
}
|
||||
|
||||
void smp_message_queue::flush_response_batch() {
|
||||
_completed.push(_completed_fifo.begin(), _completed_fifo.end());
|
||||
_completed_fifo.clear();
|
||||
complete_kick();
|
||||
}
|
||||
|
||||
size_t smp_message_queue::process_completions() {
|
||||
auto nr = _completed.consume_all([this] (work_item* wi) {
|
||||
wi->complete();
|
||||
delete wi;
|
||||
});
|
||||
// copy batch to local memory in order to minimize
|
||||
// time in which cross-cpu data is accessed
|
||||
work_item* items[queue_length];
|
||||
auto nr = _completed.pop(items);
|
||||
for (unsigned i = 0; i < nr; ++i) {
|
||||
items[i]->complete();
|
||||
delete items[i];
|
||||
}
|
||||
|
||||
_current_queue_length -= nr;
|
||||
|
||||
move_pending();
|
||||
|
||||
return nr;
|
||||
}
|
||||
|
||||
void smp_message_queue::flush_request_batch() {
|
||||
move_pending();
|
||||
}
|
||||
|
||||
void smp_message_queue::complete() {
|
||||
_complete_event->wait().then([this] {
|
||||
process_completions();
|
||||
@@ -698,14 +712,19 @@ void smp_message_queue::complete() {
|
||||
}
|
||||
|
||||
size_t smp_message_queue::process_incoming() {
|
||||
return _pending.consume_all([this] (smp_message_queue::work_item* wi) {
|
||||
work_item* items[queue_length];
|
||||
auto nr = _pending.pop(items);
|
||||
for (unsigned i = 0; i < nr; ++i) {
|
||||
auto wi = items[i];
|
||||
wi->process().then([this, wi] {
|
||||
respond(wi);
|
||||
});
|
||||
});
|
||||
}
|
||||
return nr;
|
||||
}
|
||||
|
||||
void smp_message_queue::start() {
|
||||
_tx.init();
|
||||
_complete_peer = &engine;
|
||||
complete();
|
||||
}
|
||||
|
||||
@@ -347,6 +347,7 @@ private:
|
||||
|
||||
class smp_message_queue {
|
||||
static constexpr size_t queue_length = 128;
|
||||
static constexpr size_t batch_size = 16;
|
||||
struct work_item;
|
||||
using lf_queue = boost::lockfree::spsc_queue<work_item*,
|
||||
boost::lockfree::capacity<queue_length>>;
|
||||
@@ -395,7 +396,15 @@ class smp_message_queue {
|
||||
}
|
||||
Future get_future() { return _promise.get_future(); }
|
||||
};
|
||||
std::queue<work_item*, circular_buffer<work_item*>> _pending_fifo;
|
||||
union tx_side {
|
||||
tx_side() {}
|
||||
~tx_side() {}
|
||||
void init() { new (&a) aa; }
|
||||
struct aa {
|
||||
std::deque<work_item*> pending_fifo;
|
||||
} a;
|
||||
} _tx;
|
||||
std::vector<work_item*> _completed_fifo;
|
||||
public:
|
||||
smp_message_queue();
|
||||
template <typename Func>
|
||||
@@ -418,6 +427,8 @@ private:
|
||||
void submit_kick();
|
||||
void complete_kick();
|
||||
void move_pending();
|
||||
void flush_request_batch();
|
||||
void flush_response_batch();
|
||||
|
||||
friend class smp;
|
||||
};
|
||||
@@ -749,8 +760,12 @@ public:
|
||||
size_t got = 0;
|
||||
for (unsigned i = 0; i < count; i++) {
|
||||
if (engine.cpu_id() != i) {
|
||||
got += _qs[engine.cpu_id()][i].process_incoming();
|
||||
got += _qs[i][engine._id].process_completions();
|
||||
auto& rxq = _qs[engine.cpu_id()][i];
|
||||
rxq.flush_response_batch();
|
||||
got += rxq.process_incoming();
|
||||
auto& txq = _qs[i][engine._id];
|
||||
txq.flush_request_batch();
|
||||
got += txq.process_completions();
|
||||
}
|
||||
}
|
||||
return got != 0;
|
||||
|
||||
@@ -270,9 +270,12 @@ private:
|
||||
semaphore _available_descriptors = { 0 };
|
||||
int _free_head = -1;
|
||||
int _free_last = -1;
|
||||
std::vector<uint16_t> _batch;
|
||||
std::experimental::optional<reactor::poller> _poller;
|
||||
bool _poll_mode = false;
|
||||
public:
|
||||
|
||||
explicit vring(config conf);
|
||||
explicit vring(config conf, bool poll_mode);
|
||||
void set_notifier(std::unique_ptr<virtio_notifier> notifier) {
|
||||
_notifier = std::move(notifier);
|
||||
}
|
||||
@@ -298,17 +301,22 @@ public:
|
||||
template <typename Iterator>
|
||||
void post(Iterator begin, Iterator end);
|
||||
|
||||
void flush_batch();
|
||||
|
||||
semaphore& available_descriptors() { return _available_descriptors; }
|
||||
private:
|
||||
// Let host know about interrupt delivery
|
||||
void disable_interrupts() {
|
||||
if (!_config.event_index) {
|
||||
if (!_poll_mode && !_config.event_index) {
|
||||
_avail._shared->_flags.store(VRING_AVAIL_F_NO_INTERRUPT, std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
// Return "true" if there are pending buffers in the queue
|
||||
bool enable_interrupts() {
|
||||
if (_poll_mode) {
|
||||
return false;
|
||||
}
|
||||
auto tail = _used._tail;
|
||||
if (!_config.event_index) {
|
||||
_avail._shared->_flags.store(0, std::memory_order_relaxed);
|
||||
@@ -380,7 +388,7 @@ unsigned vring::allocate_desc() {
|
||||
return desc;
|
||||
}
|
||||
|
||||
vring::vring(config conf)
|
||||
vring::vring(config conf, bool poll_mode)
|
||||
: _config(conf)
|
||||
, _completions(new promise<size_t>[_config.size])
|
||||
, _descs(reinterpret_cast<desc*>(conf.descs))
|
||||
@@ -388,6 +396,7 @@ vring::vring(config conf)
|
||||
, _used(conf)
|
||||
, _avail_event(reinterpret_cast<std::atomic<uint16_t>*>(&_used._shared->_used_elements[conf.size]))
|
||||
, _used_event(reinterpret_cast<std::atomic<uint16_t>*>(&_avail._shared->_ring[conf.size]))
|
||||
, _poll_mode(poll_mode)
|
||||
{
|
||||
setup();
|
||||
}
|
||||
@@ -402,7 +411,27 @@ void vring::setup() {
|
||||
}
|
||||
|
||||
void vring::run() {
|
||||
complete();
|
||||
if (!_poll_mode) {
|
||||
complete();
|
||||
} else {
|
||||
_poller = reactor::poller([this] {
|
||||
flush_batch();
|
||||
do_complete();
|
||||
return true;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void vring::flush_batch() {
|
||||
if (_batch.empty()) {
|
||||
return;
|
||||
}
|
||||
for (auto desc_head : _batch) {
|
||||
_avail._shared->_ring[masked(_avail._head++)] = desc_head;
|
||||
}
|
||||
_batch.clear();
|
||||
_avail._shared->_idx.store(_avail._head, std::memory_order_release);
|
||||
kick();
|
||||
}
|
||||
|
||||
template <typename Iterator>
|
||||
@@ -430,12 +459,20 @@ void vring::post(Iterator begin, Iterator end) {
|
||||
}
|
||||
auto desc_head = pseudo_head._next;
|
||||
_completions[desc_head] = std::move(bc.completed);
|
||||
_avail._shared->_ring[masked(_avail._head++)] = desc_head;
|
||||
if (!_poll_mode) {
|
||||
_avail._shared->_ring[masked(_avail._head++)] = desc_head;
|
||||
} else {
|
||||
_batch.push_back(desc_head);
|
||||
}
|
||||
_avail._avail_added_since_kick++;
|
||||
});
|
||||
_avail._shared->_idx.store(_avail._head, std::memory_order_release);
|
||||
kick();
|
||||
do_complete();
|
||||
if (!_poll_mode) {
|
||||
_avail._shared->_idx.store(_avail._head, std::memory_order_release);
|
||||
kick();
|
||||
do_complete();
|
||||
} else if (_batch.size() >= 16) {
|
||||
flush_batch();
|
||||
}
|
||||
}
|
||||
|
||||
void vring::do_complete() {
|
||||
@@ -491,7 +528,7 @@ protected:
|
||||
virtio_qp& _dev;
|
||||
vring _ring;
|
||||
public:
|
||||
txq(virtio_qp& dev, vring::config config);
|
||||
txq(virtio_qp& dev, vring::config config, bool poll_mode);
|
||||
void set_notifier(std::unique_ptr<virtio_notifier> notifier) {
|
||||
_ring.set_notifier(std::move(notifier));
|
||||
}
|
||||
@@ -511,7 +548,7 @@ protected:
|
||||
std::vector<fragment> _fragments;
|
||||
std::vector<std::unique_ptr<char[], free_deleter>> _deleters;
|
||||
public:
|
||||
rxq(virtio_qp& _if, vring::config config);
|
||||
rxq(virtio_qp& _if, vring::config config, bool poll_mode);
|
||||
void set_notifier(std::unique_ptr<virtio_notifier> notifier) {
|
||||
_ring.set_notifier(std::move(notifier));
|
||||
}
|
||||
@@ -541,7 +578,7 @@ protected:
|
||||
void common_config(vring::config& r);
|
||||
size_t vring_storage_size(size_t ring_size);
|
||||
public:
|
||||
explicit virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size);
|
||||
explicit virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size, bool poll_mode);
|
||||
virtual future<> send(packet p) override;
|
||||
virtual void rx_start() override;
|
||||
virtual phys virt_to_phys(void* p) {
|
||||
@@ -549,8 +586,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
virtio_qp::txq::txq(virtio_qp& dev, vring::config config)
|
||||
: _dev(dev), _ring(config) {
|
||||
virtio_qp::txq::txq(virtio_qp& dev, vring::config config, bool poll_mode)
|
||||
: _dev(dev), _ring(config, poll_mode) {
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -627,8 +664,8 @@ virtio_qp::txq::post(packet p) {
|
||||
});
|
||||
}
|
||||
|
||||
virtio_qp::rxq::rxq(virtio_qp& dev, vring::config config)
|
||||
: _dev(dev), _ring(config) {
|
||||
virtio_qp::rxq::rxq(virtio_qp& dev, vring::config config, bool poll_mode)
|
||||
: _dev(dev), _ring(config, poll_mode) {
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -701,12 +738,12 @@ static std::unique_ptr<char[], free_deleter> virtio_buffer(size_t size) {
|
||||
return std::unique_ptr<char[], free_deleter>(reinterpret_cast<char*>(ret));
|
||||
}
|
||||
|
||||
virtio_qp::virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size)
|
||||
virtio_qp::virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size, bool poll_mode)
|
||||
: _dev(dev)
|
||||
, _txq_storage(virtio_buffer(vring_storage_size(tx_ring_size)))
|
||||
, _rxq_storage(virtio_buffer(vring_storage_size(rx_ring_size)))
|
||||
, _txq(*this, txq_config(tx_ring_size))
|
||||
, _rxq(*this, rxq_config(rx_ring_size)) {
|
||||
, _txq(*this, txq_config(tx_ring_size), poll_mode)
|
||||
, _rxq(*this, rxq_config(rx_ring_size), poll_mode) {
|
||||
}
|
||||
|
||||
size_t virtio_qp::vring_storage_size(size_t ring_size) {
|
||||
@@ -770,6 +807,9 @@ get_virtio_net_options_description()
|
||||
("virtio-ring-size",
|
||||
boost::program_options::value<unsigned>()->default_value(256),
|
||||
"Virtio ring size (must be power-of-two)")
|
||||
("virtio-poll-mode",
|
||||
boost::program_options::value<bool>()->default_value(false),
|
||||
"Poll virtio rings instead of using interrupts")
|
||||
;
|
||||
return opts;
|
||||
}
|
||||
@@ -792,7 +832,7 @@ static size_t config_ring_size(boost::program_options::variables_map &opts) {
|
||||
}
|
||||
|
||||
virtio_qp_vhost::virtio_qp_vhost(virtio_device *dev, boost::program_options::variables_map opts)
|
||||
: virtio_qp(dev, config_ring_size(opts), config_ring_size(opts))
|
||||
: virtio_qp(dev, config_ring_size(opts), config_ring_size(opts), opts["virtio-poll-mode"].as<bool>())
|
||||
, _vhost_fd(file_desc::open("/dev/vhost-net", O_RDWR))
|
||||
{
|
||||
auto tap_device = opts["tap-device"].as<std::string>();
|
||||
@@ -893,7 +933,7 @@ public:
|
||||
|
||||
virtio_qp_osv::virtio_qp_osv(osv::assigned_virtio &virtio,
|
||||
boost::program_options::variables_map opts)
|
||||
: virtio_qp(opts, virtio.queue_size(0), virtio.queue_size(1))
|
||||
: virtio_qp(opts, virtio.queue_size(0), virtio.queue_size(1), opts["virtio-poll-mode"].as<bool>())
|
||||
, _virtio(virtio)
|
||||
{
|
||||
// Read the host's virtio supported feature bitmask, AND it with the
|
||||
|
||||
Reference in New Issue
Block a user