mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-20 16:40:35 +00:00
core: prefetch work items before processing
This commit is contained in:
@@ -24,6 +24,7 @@
|
||||
#include <rte_lcore.h>
|
||||
#include <rte_launch.h>
|
||||
#endif
|
||||
#include "prefetch.hh"
|
||||
|
||||
#ifdef HAVE_OSV
|
||||
#include <osv/newpoll.hh>
|
||||
@@ -975,16 +976,33 @@ void smp_message_queue::flush_response_batch() {
|
||||
_completed_fifo.clear();
|
||||
}
|
||||
|
||||
size_t smp_message_queue::process_completions() {
|
||||
template<typename Func>
|
||||
size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
|
||||
// copy batch to local memory in order to minimize
|
||||
// time in which cross-cpu data is accessed
|
||||
work_item* items[queue_length];
|
||||
auto nr = _completed.pop(items);
|
||||
for (unsigned i = 0; i < nr; ++i) {
|
||||
items[i]->complete();
|
||||
delete items[i];
|
||||
}
|
||||
work_item* items[queue_length + prefetch_cnt];
|
||||
work_item* wi;
|
||||
if (!q.pop(wi))
|
||||
return 0;
|
||||
// start prefetching first item before popping the rest to overlap memory
|
||||
// access with potential cache miss the second pop may cause
|
||||
prefetch<2>(wi);
|
||||
auto nr = q.pop(items);
|
||||
std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nullptr);
|
||||
unsigned i = 0;
|
||||
do {
|
||||
prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt);
|
||||
process(wi);
|
||||
wi = items[i++];
|
||||
} while(i <= nr);
|
||||
|
||||
return nr + 1;
|
||||
}
|
||||
|
||||
size_t smp_message_queue::process_completions() {
|
||||
auto nr = process_queue(_completed, [] (work_item* wi) {
|
||||
wi->complete();
|
||||
});
|
||||
_current_queue_length -= nr;
|
||||
_compl += nr;
|
||||
_last_cmpl_batch = nr;
|
||||
@@ -997,14 +1015,11 @@ void smp_message_queue::flush_request_batch() {
|
||||
}
|
||||
|
||||
size_t smp_message_queue::process_incoming() {
|
||||
work_item* items[queue_length];
|
||||
auto nr = _pending.pop(items);
|
||||
for (unsigned i = 0; i < nr; ++i) {
|
||||
auto wi = items[i];
|
||||
auto nr = process_queue(_pending, [this] (work_item* wi) {
|
||||
wi->process().then([this, wi] {
|
||||
respond(wi);
|
||||
});
|
||||
}
|
||||
});
|
||||
_received += nr;
|
||||
_last_rcv_batch = nr;
|
||||
return nr;
|
||||
|
||||
@@ -385,6 +385,7 @@ private:
|
||||
class smp_message_queue {
|
||||
static constexpr size_t queue_length = 128;
|
||||
static constexpr size_t batch_size = 16;
|
||||
static constexpr size_t prefetch_cnt = 2;
|
||||
struct work_item;
|
||||
using lf_queue = boost::lockfree::spsc_queue<work_item*,
|
||||
boost::lockfree::capacity<queue_length>>;
|
||||
@@ -459,6 +460,8 @@ public:
|
||||
return fut;
|
||||
}
|
||||
void start(unsigned cpuid);
|
||||
template<typename Func>
|
||||
size_t process_queue(lf_queue& q, Func process);
|
||||
size_t process_incoming();
|
||||
size_t process_completions();
|
||||
private:
|
||||
|
||||
Reference in New Issue
Block a user