From 4e653081a4b127e4aba6eaf1eea0572efd523559 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 4 Dec 2014 11:51:24 +0200 Subject: [PATCH 1/8] virtio: poll mode support With a new --virtio-poll-mode, poll queues instead of waiting for an interrupt. Increases httpd throughput by about 12%. --- net/virtio.cc | 48 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/net/virtio.cc b/net/virtio.cc index 0d14ed5091..c461af3edc 100644 --- a/net/virtio.cc +++ b/net/virtio.cc @@ -270,9 +270,11 @@ private: semaphore _available_descriptors = { 0 }; int _free_head = -1; int _free_last = -1; + std::experimental::optional _poller; + bool _poll_mode = false; public: - explicit vring(config conf); + explicit vring(config conf, bool poll_mode); void set_notifier(std::unique_ptr notifier) { _notifier = std::move(notifier); } @@ -302,13 +304,16 @@ public: private: // Let host know about interrupt delivery void disable_interrupts() { - if (!_config.event_index) { + if (!_poll_mode && !_config.event_index) { _avail._shared->_flags.store(VRING_AVAIL_F_NO_INTERRUPT, std::memory_order_relaxed); } } // Return "true" if there are pending buffers in the queue bool enable_interrupts() { + if (_poll_mode) { + return false; + } auto tail = _used._tail; if (!_config.event_index) { _avail._shared->_flags.store(0, std::memory_order_relaxed); @@ -380,7 +385,7 @@ unsigned vring::allocate_desc() { return desc; } -vring::vring(config conf) +vring::vring(config conf, bool poll_mode) : _config(conf) , _completions(new promise[_config.size]) , _descs(reinterpret_cast(conf.descs)) @@ -388,6 +393,7 @@ vring::vring(config conf) , _used(conf) , _avail_event(reinterpret_cast*>(&_used._shared->_used_elements[conf.size])) , _used_event(reinterpret_cast*>(&_avail._shared->_ring[conf.size])) + , _poll_mode(poll_mode) { setup(); } @@ -402,7 +408,14 @@ void vring::setup() { } void vring::run() { - complete(); + if (!_poll_mode) { + complete(); + } else { + _poller = reactor::poller([this] { + do_complete(); + return true; + }); + } } template @@ -491,7 +504,7 @@ protected: virtio_qp& _dev; vring _ring; public: - txq(virtio_qp& dev, vring::config config); + txq(virtio_qp& dev, vring::config config, bool poll_mode); void set_notifier(std::unique_ptr notifier) { _ring.set_notifier(std::move(notifier)); } @@ -511,7 +524,7 @@ protected: std::vector _fragments; std::vector> _deleters; public: - rxq(virtio_qp& _if, vring::config config); + rxq(virtio_qp& _if, vring::config config, bool poll_mode); void set_notifier(std::unique_ptr notifier) { _ring.set_notifier(std::move(notifier)); } @@ -541,7 +554,7 @@ protected: void common_config(vring::config& r); size_t vring_storage_size(size_t ring_size); public: - explicit virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size); + explicit virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size, bool poll_mode); virtual future<> send(packet p) override; virtual void rx_start() override; virtual phys virt_to_phys(void* p) { @@ -549,8 +562,8 @@ public: } }; - virtio_qp::txq::txq(virtio_qp& dev, vring::config config) - : _dev(dev), _ring(config) { + virtio_qp::txq::txq(virtio_qp& dev, vring::config config, bool poll_mode) + : _dev(dev), _ring(config, poll_mode) { } future<> @@ -627,8 +640,8 @@ virtio_qp::txq::post(packet p) { }); } - virtio_qp::rxq::rxq(virtio_qp& dev, vring::config config) - : _dev(dev), _ring(config) { + virtio_qp::rxq::rxq(virtio_qp& dev, vring::config config, bool poll_mode) + : _dev(dev), _ring(config, poll_mode) { } future<> @@ -701,12 +714,12 @@ static std::unique_ptr virtio_buffer(size_t size) { return std::unique_ptr(reinterpret_cast(ret)); } -virtio_qp::virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size) +virtio_qp::virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size, bool poll_mode) : _dev(dev) , _txq_storage(virtio_buffer(vring_storage_size(tx_ring_size))) , _rxq_storage(virtio_buffer(vring_storage_size(rx_ring_size))) - , _txq(*this, txq_config(tx_ring_size)) - , _rxq(*this, rxq_config(rx_ring_size)) { + , _txq(*this, txq_config(tx_ring_size), poll_mode) + , _rxq(*this, rxq_config(rx_ring_size), poll_mode) { } size_t virtio_qp::vring_storage_size(size_t ring_size) { @@ -770,6 +783,9 @@ get_virtio_net_options_description() ("virtio-ring-size", boost::program_options::value()->default_value(256), "Virtio ring size (must be power-of-two)") + ("virtio-poll-mode", + boost::program_options::value()->default_value(false), + "Poll virtio rings instead of using interrupts") ; return opts; } @@ -792,7 +808,7 @@ static size_t config_ring_size(boost::program_options::variables_map &opts) { } virtio_qp_vhost::virtio_qp_vhost(virtio_device *dev, boost::program_options::variables_map opts) - : virtio_qp(dev, config_ring_size(opts), config_ring_size(opts)) + : virtio_qp(dev, config_ring_size(opts), config_ring_size(opts), opts["virtio-poll-mode"].as()) , _vhost_fd(file_desc::open("/dev/vhost-net", O_RDWR)) { auto tap_device = opts["tap-device"].as(); @@ -893,7 +909,7 @@ public: virtio_qp_osv::virtio_qp_osv(osv::assigned_virtio &virtio, boost::program_options::variables_map opts) - : virtio_qp(opts, virtio.queue_size(0), virtio.queue_size(1)) + : virtio_qp(opts, virtio.queue_size(0), virtio.queue_size(1), opts["virtio-poll-mode"].as()) , _virtio(virtio) { // Read the host's virtio supported feature bitmask, AND it with the From 97dff834615364b430a9aa4eb3ce7c2652a5829e Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 4 Dec 2014 12:00:28 +0200 Subject: [PATCH 2/8] virtio: don't try to complete after posting a buffer, if in poll mode We will poll for it soon anyway, and completing too soon simply reduces batching. --- net/virtio.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/net/virtio.cc b/net/virtio.cc index c461af3edc..e8d7049348 100644 --- a/net/virtio.cc +++ b/net/virtio.cc @@ -448,7 +448,9 @@ void vring::post(Iterator begin, Iterator end) { }); _avail._shared->_idx.store(_avail._head, std::memory_order_release); kick(); - do_complete(); + if (!_poll_mode) { + do_complete(); + } } void vring::do_complete() { From 503f1bf4d049cfb5b5a8b9cae8a1200f0d5b5cc7 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 11 Dec 2014 19:19:09 +0200 Subject: [PATCH 3/8] virtio: batch transmitted packets Instead of placing packets directly into the virtio ring, add them to a temporary queue, and flush it when we are polled. This reduces cross-cpu writes and kicks. --- net/virtio.cc | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/net/virtio.cc b/net/virtio.cc index e8d7049348..a57a667e41 100644 --- a/net/virtio.cc +++ b/net/virtio.cc @@ -270,6 +270,7 @@ private: semaphore _available_descriptors = { 0 }; int _free_head = -1; int _free_last = -1; + std::vector _batch; std::experimental::optional _poller; bool _poll_mode = false; public: @@ -300,6 +301,8 @@ public: template void post(Iterator begin, Iterator end); + void flush_batch(); + semaphore& available_descriptors() { return _available_descriptors; } private: // Let host know about interrupt delivery @@ -412,12 +415,25 @@ void vring::run() { complete(); } else { _poller = reactor::poller([this] { + flush_batch(); do_complete(); return true; }); } } +void vring::flush_batch() { + if (_batch.empty()) { + return; + } + for (auto desc_head : _batch) { + _avail._shared->_ring[masked(_avail._head++)] = desc_head; + } + _batch.clear(); + _avail._shared->_idx.store(_avail._head, std::memory_order_release); + kick(); +} + template void vring::post(Iterator begin, Iterator end) { // Note: buffer_chain here is any container of buffer, not @@ -443,13 +459,19 @@ void vring::post(Iterator begin, Iterator end) { } auto desc_head = pseudo_head._next; _completions[desc_head] = std::move(bc.completed); - _avail._shared->_ring[masked(_avail._head++)] = desc_head; + if (!_poll_mode) { + _avail._shared->_ring[masked(_avail._head++)] = desc_head; + } else { + _batch.push_back(desc_head); + } _avail._avail_added_since_kick++; }); - _avail._shared->_idx.store(_avail._head, std::memory_order_release); - kick(); if (!_poll_mode) { + _avail._shared->_idx.store(_avail._head, std::memory_order_release); + kick(); do_complete(); + } else if (_batch.size() >= 16) { + flush_batch(); } } From b6485bcb7c8440f09884caf0d235bf965af1e6d0 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 7 Dec 2014 13:52:59 +0200 Subject: [PATCH 4/8] smp: initialize _pending_fifo on sending cpu If it needs to be resized, it will cause a deallocation on the wrong cpu, so initialize it on the sending cpu. Does not break with circular_buffer<>, but it's not going to be a circular_buffer<> for long. --- core/reactor.cc | 9 +++++---- core/reactor.hh | 9 ++++++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 31f84f2510..00a44af9f5 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -654,9 +654,9 @@ void smp_message_queue::complete_kick() { void smp_message_queue::move_pending() { bool kick = false; - while (_current_queue_length < queue_length && !_pending_fifo.empty()) { - _pending.push(_pending_fifo.front()); - _pending_fifo.pop(); + while (_current_queue_length < queue_length && !_tx.a.pending_fifo.empty()) { + _pending.push(_tx.a.pending_fifo.front()); + _tx.a.pending_fifo.pop(); _current_queue_length++; kick = true; } @@ -667,7 +667,7 @@ void smp_message_queue::move_pending() { } void smp_message_queue::submit_item(smp_message_queue::work_item* item) { - _pending_fifo.push(item); + _tx.a.pending_fifo.push(item); move_pending(); } @@ -706,6 +706,7 @@ size_t smp_message_queue::process_incoming() { } void smp_message_queue::start() { + _tx.init(); _complete_peer = &engine; complete(); } diff --git a/core/reactor.hh b/core/reactor.hh index c4f7635464..c84134c06f 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -395,7 +395,14 @@ class smp_message_queue { } Future get_future() { return _promise.get_future(); } }; - std::queue> _pending_fifo; + union tx_side { + tx_side() {} + ~tx_side() {} + void init() { new (&a) aa; } + struct aa { + std::queue> pending_fifo; + } a; + } _tx; public: smp_message_queue(); template From 2717ac3c373bd32aa0822f448a010ac94ca36e0f Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 7 Dec 2014 11:24:58 +0200 Subject: [PATCH 5/8] smp: improve _pending_fifo flushing Instead of flushing pending items one by one, flush them all at once, amortizing the write to the index. --- core/reactor.cc | 23 +++++++++++------------ core/reactor.hh | 2 +- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 00a44af9f5..b2dabe0fc0 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -652,22 +652,21 @@ void smp_message_queue::complete_kick() { } void smp_message_queue::move_pending() { - bool kick = false; - - while (_current_queue_length < queue_length && !_tx.a.pending_fifo.empty()) { - _pending.push(_tx.a.pending_fifo.front()); - _tx.a.pending_fifo.pop(); - _current_queue_length++; - kick = true; - } - - if (kick) { - submit_kick(); + auto queue_room = queue_length - _current_queue_length; + auto nr = std::min(queue_room, _tx.a.pending_fifo.size()); + if (!nr) { + return; } + auto begin = _tx.a.pending_fifo.begin(); + auto end = begin + nr; + _pending.push(begin, end); + _tx.a.pending_fifo.erase(begin, end); + _current_queue_length += nr; + submit_kick(); } void smp_message_queue::submit_item(smp_message_queue::work_item* item) { - _tx.a.pending_fifo.push(item); + _tx.a.pending_fifo.push_back(item); move_pending(); } diff --git a/core/reactor.hh b/core/reactor.hh index c84134c06f..3ee305f138 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -400,7 +400,7 @@ class smp_message_queue { ~tx_side() {} void init() { new (&a) aa; } struct aa { - std::queue> pending_fifo; + std::deque pending_fifo; } a; } _tx; public: From 04488eebea6542b1080ecd02b7a405c232964864 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 7 Dec 2014 12:26:08 +0200 Subject: [PATCH 6/8] smp: batch messages across smp request/response queues Instead of incurring the overhead of pushing a message down the queue (two cache line misses), amortize of over 16 messages (3/4 cache line misses per batch). Batch size is limited by poll frequency, so we should adjust that dynamically. --- core/reactor.cc | 21 ++++++++++++++++----- core/reactor.hh | 12 ++++++++++-- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index b2dabe0fc0..57ffb14e2c 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -667,12 +667,21 @@ void smp_message_queue::move_pending() { void smp_message_queue::submit_item(smp_message_queue::work_item* item) { _tx.a.pending_fifo.push_back(item); - move_pending(); + if (_tx.a.pending_fifo.size() >= batch_size) { + move_pending(); + } } void smp_message_queue::respond(work_item* item) { - // FIXME: batcing - _completed.push(item); + _completed_fifo.push_back(item); + if (_completed_fifo.size() >= batch_size) { + flush_response_batch(); + } +} + +void smp_message_queue::flush_response_batch() { + _completed.push(_completed_fifo.begin(), _completed_fifo.end()); + _completed_fifo.clear(); complete_kick(); } @@ -684,11 +693,13 @@ size_t smp_message_queue::process_completions() { _current_queue_length -= nr; - move_pending(); - return nr; } +void smp_message_queue::flush_request_batch() { + move_pending(); +} + void smp_message_queue::complete() { _complete_event->wait().then([this] { process_completions(); diff --git a/core/reactor.hh b/core/reactor.hh index 3ee305f138..f9d04a0807 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -347,6 +347,7 @@ private: class smp_message_queue { static constexpr size_t queue_length = 128; + static constexpr size_t batch_size = 16; struct work_item; using lf_queue = boost::lockfree::spsc_queue>; @@ -403,6 +404,7 @@ class smp_message_queue { std::deque pending_fifo; } a; } _tx; + std::vector _completed_fifo; public: smp_message_queue(); template @@ -425,6 +427,8 @@ private: void submit_kick(); void complete_kick(); void move_pending(); + void flush_request_batch(); + void flush_response_batch(); friend class smp; }; @@ -756,8 +760,12 @@ public: size_t got = 0; for (unsigned i = 0; i < count; i++) { if (engine.cpu_id() != i) { - got += _qs[engine.cpu_id()][i].process_incoming(); - got += _qs[i][engine._id].process_completions(); + auto& rxq = _qs[engine.cpu_id()][i]; + rxq.flush_response_batch(); + got += rxq.process_incoming(); + auto& txq = _qs[i][engine._id]; + txq.flush_request_batch(); + got += txq.process_completions(); } } return got != 0; From 5855f0c82aa7d0f0b93fe22a07e5104359e1dc5f Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 11 Dec 2014 15:05:05 +0200 Subject: [PATCH 7/8] smp: batch completion processing We're currently using boost::lockfree::consume_all() to consume smp completions, but this has two problems: 1. consume_all() calls consume_one() internally, which means it accesses the ring index once per message 2 we interleave calling the request function with accessing the ring, which allows the other side to access the ring again, bouncing ring cache lines. Fix by copying all available items in one show, using pop(array), and then processing them afterwards. --- core/reactor.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 57ffb14e2c..e3487b239d 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -686,10 +686,14 @@ void smp_message_queue::flush_response_batch() { } size_t smp_message_queue::process_completions() { - auto nr = _completed.consume_all([this] (work_item* wi) { - wi->complete(); - delete wi; - }); + // copy batch to local memory in order to minimize + // time in which cross-cpu data is accessed + work_item* items[queue_length]; + auto nr = _completed.pop(items); + for (unsigned i = 0; i < nr; ++i) { + items[i]->complete(); + delete items[i]; + } _current_queue_length -= nr; From d11803d1b998f2a4e90f0cd263baede0d637e528 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 11 Dec 2014 15:05:05 +0200 Subject: [PATCH 8/8] smp: batch request processing We're currently using boost::lockfree::consume_all() to consume smp requests, but this has two problems: 1. consume_all() calls consume_one() internally, which means it accesses the ring index once per message 2 we interleave calling the request function with accessing the ring, which allows the other side to access the ring again, bouncing ring cache lines. Fix by copying all available items in one show, using pop(array), and then processing them afterwards. --- core/reactor.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index e3487b239d..0abd918615 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -712,11 +712,15 @@ void smp_message_queue::complete() { } size_t smp_message_queue::process_incoming() { - return _pending.consume_all([this] (smp_message_queue::work_item* wi) { + work_item* items[queue_length]; + auto nr = _pending.pop(items); + for (unsigned i = 0; i < nr; ++i) { + auto wi = items[i]; wi->process().then([this, wi] { respond(wi); }); - }); + } + return nr; } void smp_message_queue::start() {