diff --git a/core/reactor.cc b/core/reactor.cc index c2401e6af2..8ca454c70f 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -24,6 +24,7 @@ #include #include #endif +#include "prefetch.hh" #ifdef HAVE_OSV #include @@ -975,16 +976,33 @@ void smp_message_queue::flush_response_batch() { _completed_fifo.clear(); } -size_t smp_message_queue::process_completions() { +template +size_t smp_message_queue::process_queue(lf_queue& q, Func process) { // copy batch to local memory in order to minimize // time in which cross-cpu data is accessed - work_item* items[queue_length]; - auto nr = _completed.pop(items); - for (unsigned i = 0; i < nr; ++i) { - items[i]->complete(); - delete items[i]; - } + work_item* items[queue_length + prefetch_cnt]; + work_item* wi; + if (!q.pop(wi)) + return 0; + // start prefecthing first item before popping the rest to overlap memory + // access with potential cache miss the second pop may cause + prefetch<2>(wi); + auto nr = q.pop(items); + std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nullptr); + unsigned i = 0; + do { + prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt); + process(wi); + wi = items[i++]; + } while(i <= nr); + return nr + 1; +} + +size_t smp_message_queue::process_completions() { + auto nr = process_queue(_completed, [] (work_item* wi) { + wi->complete(); + }); _current_queue_length -= nr; _compl += nr; _last_cmpl_batch = nr; @@ -997,14 +1015,11 @@ void smp_message_queue::flush_request_batch() { } size_t smp_message_queue::process_incoming() { - work_item* items[queue_length]; - auto nr = _pending.pop(items); - for (unsigned i = 0; i < nr; ++i) { - auto wi = items[i]; + auto nr = process_queue(_pending, [this] (work_item* wi) { wi->process().then([this, wi] { respond(wi); }); - } + }); _received += nr; _last_rcv_batch = nr; return nr; diff --git a/core/reactor.hh b/core/reactor.hh index 4163f1deab..25477b97a8 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -385,6 +385,7 @@ private: class smp_message_queue { static constexpr size_t queue_length = 128; static constexpr size_t batch_size = 16; + static constexpr size_t prefetch_cnt = 2; struct work_item; using lf_queue = boost::lockfree::spsc_queue>; @@ -459,6 +460,8 @@ public: return fut; } void start(unsigned cpuid); + template + size_t process_queue(lf_queue& q, Func process); size_t process_incoming(); size_t process_completions(); private: