diff --git a/core/reactor.cc b/core/reactor.cc index d7545fb272..5978726e8d 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -652,44 +652,58 @@ void smp_message_queue::complete_kick() { } void smp_message_queue::move_pending() { - bool kick = false; - - while (_current_queue_length < queue_length && !_pending_fifo.empty()) { - _pending.push(_pending_fifo.front()); - _pending_fifo.pop(); - _current_queue_length++; - kick = true; - } - - if (kick) { - submit_kick(); + auto queue_room = queue_length - _current_queue_length; + auto nr = std::min(queue_room, _tx.a.pending_fifo.size()); + if (!nr) { + return; } + auto begin = _tx.a.pending_fifo.begin(); + auto end = begin + nr; + _pending.push(begin, end); + _tx.a.pending_fifo.erase(begin, end); + _current_queue_length += nr; + submit_kick(); } void smp_message_queue::submit_item(smp_message_queue::work_item* item) { - _pending_fifo.push(item); - move_pending(); + _tx.a.pending_fifo.push_back(item); + if (_tx.a.pending_fifo.size() >= batch_size) { + move_pending(); + } } void smp_message_queue::respond(work_item* item) { - // FIXME: batcing - _completed.push(item); + _completed_fifo.push_back(item); + if (_completed_fifo.size() >= batch_size) { + flush_response_batch(); + } +} + +void smp_message_queue::flush_response_batch() { + _completed.push(_completed_fifo.begin(), _completed_fifo.end()); + _completed_fifo.clear(); complete_kick(); } size_t smp_message_queue::process_completions() { - auto nr = _completed.consume_all([this] (work_item* wi) { - wi->complete(); - delete wi; - }); + // copy batch to local memory in order to minimize + // time in which cross-cpu data is accessed + work_item* items[queue_length]; + auto nr = _completed.pop(items); + for (unsigned i = 0; i < nr; ++i) { + items[i]->complete(); + delete items[i]; + } _current_queue_length -= nr; - move_pending(); - return nr; } +void smp_message_queue::flush_request_batch() { + move_pending(); +} + void smp_message_queue::complete() { _complete_event->wait().then([this] { process_completions(); @@ -698,14 +712,19 @@ void smp_message_queue::complete() { } size_t smp_message_queue::process_incoming() { - return _pending.consume_all([this] (smp_message_queue::work_item* wi) { + work_item* items[queue_length]; + auto nr = _pending.pop(items); + for (unsigned i = 0; i < nr; ++i) { + auto wi = items[i]; wi->process().then([this, wi] { respond(wi); }); - }); + } + return nr; } void smp_message_queue::start() { + _tx.init(); _complete_peer = &engine; complete(); } diff --git a/core/reactor.hh b/core/reactor.hh index c4f7635464..f9d04a0807 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -347,6 +347,7 @@ private: class smp_message_queue { static constexpr size_t queue_length = 128; + static constexpr size_t batch_size = 16; struct work_item; using lf_queue = boost::lockfree::spsc_queue>; @@ -395,7 +396,15 @@ class smp_message_queue { } Future get_future() { return _promise.get_future(); } }; - std::queue> _pending_fifo; + union tx_side { + tx_side() {} + ~tx_side() {} + void init() { new (&a) aa; } + struct aa { + std::deque pending_fifo; + } a; + } _tx; + std::vector _completed_fifo; public: smp_message_queue(); template @@ -418,6 +427,8 @@ private: void submit_kick(); void complete_kick(); void move_pending(); + void flush_request_batch(); + void flush_response_batch(); friend class smp; }; @@ -749,8 +760,12 @@ public: size_t got = 0; for (unsigned i = 0; i < count; i++) { if (engine.cpu_id() != i) { - got += _qs[engine.cpu_id()][i].process_incoming(); - got += _qs[i][engine._id].process_completions(); + auto& rxq = _qs[engine.cpu_id()][i]; + rxq.flush_response_batch(); + got += rxq.process_incoming(); + auto& txq = _qs[i][engine._id]; + txq.flush_request_batch(); + got += txq.process_completions(); } } return got != 0; diff --git a/net/virtio.cc b/net/virtio.cc index 0d14ed5091..a57a667e41 100644 --- a/net/virtio.cc +++ b/net/virtio.cc @@ -270,9 +270,12 @@ private: semaphore _available_descriptors = { 0 }; int _free_head = -1; int _free_last = -1; + std::vector _batch; + std::experimental::optional _poller; + bool _poll_mode = false; public: - explicit vring(config conf); + explicit vring(config conf, bool poll_mode); void set_notifier(std::unique_ptr notifier) { _notifier = std::move(notifier); } @@ -298,17 +301,22 @@ public: template void post(Iterator begin, Iterator end); + void flush_batch(); + semaphore& available_descriptors() { return _available_descriptors; } private: // Let host know about interrupt delivery void disable_interrupts() { - if (!_config.event_index) { + if (!_poll_mode && !_config.event_index) { _avail._shared->_flags.store(VRING_AVAIL_F_NO_INTERRUPT, std::memory_order_relaxed); } } // Return "true" if there are pending buffers in the queue bool enable_interrupts() { + if (_poll_mode) { + return false; + } auto tail = _used._tail; if (!_config.event_index) { _avail._shared->_flags.store(0, std::memory_order_relaxed); @@ -380,7 +388,7 @@ unsigned vring::allocate_desc() { return desc; } -vring::vring(config conf) +vring::vring(config conf, bool poll_mode) : _config(conf) , _completions(new promise[_config.size]) , _descs(reinterpret_cast(conf.descs)) @@ -388,6 +396,7 @@ vring::vring(config conf) , _used(conf) , _avail_event(reinterpret_cast*>(&_used._shared->_used_elements[conf.size])) , _used_event(reinterpret_cast*>(&_avail._shared->_ring[conf.size])) + , _poll_mode(poll_mode) { setup(); } @@ -402,7 +411,27 @@ void vring::setup() { } void vring::run() { - complete(); + if (!_poll_mode) { + complete(); + } else { + _poller = reactor::poller([this] { + flush_batch(); + do_complete(); + return true; + }); + } +} + +void vring::flush_batch() { + if (_batch.empty()) { + return; + } + for (auto desc_head : _batch) { + _avail._shared->_ring[masked(_avail._head++)] = desc_head; + } + _batch.clear(); + _avail._shared->_idx.store(_avail._head, std::memory_order_release); + kick(); } template @@ -430,12 +459,20 @@ void vring::post(Iterator begin, Iterator end) { } auto desc_head = pseudo_head._next; _completions[desc_head] = std::move(bc.completed); - _avail._shared->_ring[masked(_avail._head++)] = desc_head; + if (!_poll_mode) { + _avail._shared->_ring[masked(_avail._head++)] = desc_head; + } else { + _batch.push_back(desc_head); + } _avail._avail_added_since_kick++; }); - _avail._shared->_idx.store(_avail._head, std::memory_order_release); - kick(); - do_complete(); + if (!_poll_mode) { + _avail._shared->_idx.store(_avail._head, std::memory_order_release); + kick(); + do_complete(); + } else if (_batch.size() >= 16) { + flush_batch(); + } } void vring::do_complete() { @@ -491,7 +528,7 @@ protected: virtio_qp& _dev; vring _ring; public: - txq(virtio_qp& dev, vring::config config); + txq(virtio_qp& dev, vring::config config, bool poll_mode); void set_notifier(std::unique_ptr notifier) { _ring.set_notifier(std::move(notifier)); } @@ -511,7 +548,7 @@ protected: std::vector _fragments; std::vector> _deleters; public: - rxq(virtio_qp& _if, vring::config config); + rxq(virtio_qp& _if, vring::config config, bool poll_mode); void set_notifier(std::unique_ptr notifier) { _ring.set_notifier(std::move(notifier)); } @@ -541,7 +578,7 @@ protected: void common_config(vring::config& r); size_t vring_storage_size(size_t ring_size); public: - explicit virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size); + explicit virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size, bool poll_mode); virtual future<> send(packet p) override; virtual void rx_start() override; virtual phys virt_to_phys(void* p) { @@ -549,8 +586,8 @@ public: } }; - virtio_qp::txq::txq(virtio_qp& dev, vring::config config) - : _dev(dev), _ring(config) { + virtio_qp::txq::txq(virtio_qp& dev, vring::config config, bool poll_mode) + : _dev(dev), _ring(config, poll_mode) { } future<> @@ -627,8 +664,8 @@ virtio_qp::txq::post(packet p) { }); } - virtio_qp::rxq::rxq(virtio_qp& dev, vring::config config) - : _dev(dev), _ring(config) { + virtio_qp::rxq::rxq(virtio_qp& dev, vring::config config, bool poll_mode) + : _dev(dev), _ring(config, poll_mode) { } future<> @@ -701,12 +738,12 @@ static std::unique_ptr virtio_buffer(size_t size) { return std::unique_ptr(reinterpret_cast(ret)); } -virtio_qp::virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size) +virtio_qp::virtio_qp(virtio_device* dev, size_t rx_ring_size, size_t tx_ring_size, bool poll_mode) : _dev(dev) , _txq_storage(virtio_buffer(vring_storage_size(tx_ring_size))) , _rxq_storage(virtio_buffer(vring_storage_size(rx_ring_size))) - , _txq(*this, txq_config(tx_ring_size)) - , _rxq(*this, rxq_config(rx_ring_size)) { + , _txq(*this, txq_config(tx_ring_size), poll_mode) + , _rxq(*this, rxq_config(rx_ring_size), poll_mode) { } size_t virtio_qp::vring_storage_size(size_t ring_size) { @@ -770,6 +807,9 @@ get_virtio_net_options_description() ("virtio-ring-size", boost::program_options::value()->default_value(256), "Virtio ring size (must be power-of-two)") + ("virtio-poll-mode", + boost::program_options::value()->default_value(false), + "Poll virtio rings instead of using interrupts") ; return opts; } @@ -792,7 +832,7 @@ static size_t config_ring_size(boost::program_options::variables_map &opts) { } virtio_qp_vhost::virtio_qp_vhost(virtio_device *dev, boost::program_options::variables_map opts) - : virtio_qp(dev, config_ring_size(opts), config_ring_size(opts)) + : virtio_qp(dev, config_ring_size(opts), config_ring_size(opts), opts["virtio-poll-mode"].as()) , _vhost_fd(file_desc::open("/dev/vhost-net", O_RDWR)) { auto tap_device = opts["tap-device"].as(); @@ -893,7 +933,7 @@ public: virtio_qp_osv::virtio_qp_osv(osv::assigned_virtio &virtio, boost::program_options::variables_map opts) - : virtio_qp(opts, virtio.queue_size(0), virtio.queue_size(1)) + : virtio_qp(opts, virtio.queue_size(0), virtio.queue_size(1), opts["virtio-poll-mode"].as()) , _virtio(virtio) { // Read the host's virtio supported feature bitmask, AND it with the