mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-29 20:57:00 +00:00
core: prefetch different amount of work items for different queues
Incoming item processing usually takes more work then completion item processing. Prefetch more completion items to make sure they are ready before access.
This commit is contained in:
@@ -976,11 +976,11 @@ void smp_message_queue::flush_response_batch() {
|
||||
_completed_fifo.clear();
|
||||
}
|
||||
|
||||
template<typename Func>
|
||||
template<size_t PrefetchCnt, typename Func>
|
||||
size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
|
||||
// copy batch to local memory in order to minimize
|
||||
// time in which cross-cpu data is accessed
|
||||
work_item* items[queue_length + prefetch_cnt];
|
||||
work_item* items[queue_length + PrefetchCnt];
|
||||
work_item* wi;
|
||||
if (!q.pop(wi))
|
||||
return 0;
|
||||
@@ -988,10 +988,10 @@ size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
|
||||
// access with potential cache miss the second pop may cause
|
||||
prefetch<2>(wi);
|
||||
auto nr = q.pop(items);
|
||||
std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nr ? items[nr - 1] : wi);
|
||||
std::fill(std::begin(items) + nr, std::begin(items) + nr + PrefetchCnt, nr ? items[nr - 1] : wi);
|
||||
unsigned i = 0;
|
||||
do {
|
||||
prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt);
|
||||
prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + PrefetchCnt);
|
||||
process(wi);
|
||||
wi = items[i++];
|
||||
} while(i <= nr);
|
||||
@@ -1000,7 +1000,7 @@ size_t smp_message_queue::process_queue(lf_queue& q, Func process) {
|
||||
}
|
||||
|
||||
size_t smp_message_queue::process_completions() {
|
||||
auto nr = process_queue(_completed, [] (work_item* wi) {
|
||||
auto nr = process_queue<prefetch_cnt*2>(_completed, [] (work_item* wi) {
|
||||
wi->complete();
|
||||
});
|
||||
_current_queue_length -= nr;
|
||||
@@ -1015,7 +1015,7 @@ void smp_message_queue::flush_request_batch() {
|
||||
}
|
||||
|
||||
size_t smp_message_queue::process_incoming() {
|
||||
auto nr = process_queue(_pending, [this] (work_item* wi) {
|
||||
auto nr = process_queue<prefetch_cnt>(_pending, [this] (work_item* wi) {
|
||||
wi->process().then([this, wi] {
|
||||
respond(wi);
|
||||
});
|
||||
|
||||
@@ -460,7 +460,7 @@ public:
|
||||
return fut;
|
||||
}
|
||||
void start(unsigned cpuid);
|
||||
template<typename Func>
|
||||
template<size_t PrefetchCnt, typename Func>
|
||||
size_t process_queue(lf_queue& q, Func process);
|
||||
size_t process_incoming();
|
||||
size_t process_completions();
|
||||
|
||||
Reference in New Issue
Block a user