core: prefetch work items before processing

This commit is contained in:
Gleb Natapov
2015-01-25 14:35:28 +02:00
committed by Avi Kivity
parent b9554219dc
commit ff4aca2ee0
2 changed files with 30 additions and 12 deletions

View File

@@ -24,6 +24,7 @@
#include <rte_lcore.h>
#include <rte_launch.h>
#endif
#include "prefetch.hh"
#ifdef HAVE_OSV
#include <osv/newpoll.hh>
@@ -975,16 +976,33 @@ void smp_message_queue::flush_response_batch() {
_completed_fifo.clear();
}
size_t smp_message_queue::process_completions() {
template<typename Func>
size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
// copy batch to local memory in order to minimize
// time in which cross-cpu data is accessed
work_item* items[queue_length];
auto nr = _completed.pop(items);
for (unsigned i = 0; i < nr; ++i) {
items[i]->complete();
delete items[i];
}
work_item* items[queue_length + prefetch_cnt];
work_item* wi;
if (!q.pop(wi))
return 0;
// start prefecthing first item before popping the rest to overlap memory
// access with potential cache miss the second pop may cause
prefetch<2>(wi);
auto nr = q.pop(items);
std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nullptr);
unsigned i = 0;
do {
prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt);
process(wi);
wi = items[i++];
} while(i <= nr);
return nr + 1;
}
size_t smp_message_queue::process_completions() {
auto nr = process_queue(_completed, [] (work_item* wi) {
wi->complete();
});
_current_queue_length -= nr;
_compl += nr;
_last_cmpl_batch = nr;
@@ -997,14 +1015,11 @@ void smp_message_queue::flush_request_batch() {
}
size_t smp_message_queue::process_incoming() {
work_item* items[queue_length];
auto nr = _pending.pop(items);
for (unsigned i = 0; i < nr; ++i) {
auto wi = items[i];
auto nr = process_queue(_pending, [this] (work_item* wi) {
wi->process().then([this, wi] {
respond(wi);
});
}
});
_received += nr;
_last_rcv_batch = nr;
return nr;

View File

@@ -385,6 +385,7 @@ private:
class smp_message_queue {
static constexpr size_t queue_length = 128;
static constexpr size_t batch_size = 16;
static constexpr size_t prefetch_cnt = 2;
struct work_item;
using lf_queue = boost::lockfree::spsc_queue<work_item*,
boost::lockfree::capacity<queue_length>>;
@@ -459,6 +460,8 @@ public:
return fut;
}
void start(unsigned cpuid);
template<typename Func>
size_t process_queue(lf_queue& q, Func process);
size_t process_incoming();
size_t process_completions();
private: