From 0a3df514cb7d88d4c17697479210d96ab12a95de Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 2 Oct 2014 11:11:41 +0300 Subject: [PATCH] smp: fix completion handling in inter_thread_work_queue Currently completion processing start during object creation, but since all object are created by main thread they all run on the same cpu which is incorrect. This patch starts completion processing on correct cpu. --- core/reactor.cc | 13 ++++++++++--- core/reactor.hh | 4 +++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 1292e6c5ac..7ee97c47c6 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -347,7 +347,6 @@ inter_thread_work_queue::inter_thread_work_queue() , _completed(queue_length) , _start_eventfd(0) , _complete_eventfd(0) { - complete(); } void inter_thread_work_queue::submit_item(inter_thread_work_queue::work_item* item) { @@ -572,6 +571,14 @@ void smp::listen_all(inter_thread_work_queue* qs) } } +void smp::start_all_queues() +{ + for (unsigned c = 0; c < count; c++) { + _qs[c][engine._id].start(); + } + listen_all(_qs[engine._id]); +} + void smp::configure(boost::program_options::variables_map configuration) { smp::count = 1; @@ -593,12 +600,12 @@ void smp::configure(boost::program_options::variables_map configuration) engine._id = i; engine.configure(configuration); engine.when_started().then([i] { - listen_all(_qs[i]); + start_all_queues(); }); engine.run(); }, i); } - listen_all(_qs[0]); + start_all_queues(); } } diff --git a/core/reactor.hh b/core/reactor.hh index 93a62e9791..f8195e252b 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -369,6 +369,7 @@ public: submit_item(wi); return fut; } + void start() { complete(); } private: void work(); void complete(); @@ -383,7 +384,7 @@ class thread_pool { std::thread _worker_thread; std::atomic _stopped = { false }; public: - thread_pool() : _worker_thread([this] { work(); }) {} + thread_pool() : _worker_thread([this] { work(); }) { inter_thread_wq.start(); } ~thread_pool(); template future submit(Func func) {return inter_thread_wq.submit(std::move(func));} @@ -496,6 +497,7 @@ public: private: static void listen_all(inter_thread_work_queue* qs); static void listen_one(inter_thread_work_queue& q, std::unique_ptr&& rfd, std::unique_ptr&& wfd); + static void start_all_queues(); public: static unsigned count; };