diff --git a/core/reactor.cc b/core/reactor.cc index 1292e6c5ac..7ee97c47c6 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -347,7 +347,6 @@ inter_thread_work_queue::inter_thread_work_queue() , _completed(queue_length) , _start_eventfd(0) , _complete_eventfd(0) { - complete(); } void inter_thread_work_queue::submit_item(inter_thread_work_queue::work_item* item) { @@ -572,6 +571,14 @@ void smp::listen_all(inter_thread_work_queue* qs) } } +void smp::start_all_queues() +{ + for (unsigned c = 0; c < count; c++) { + _qs[c][engine._id].start(); + } + listen_all(_qs[engine._id]); +} + void smp::configure(boost::program_options::variables_map configuration) { smp::count = 1; @@ -593,12 +600,12 @@ void smp::configure(boost::program_options::variables_map configuration) engine._id = i; engine.configure(configuration); engine.when_started().then([i] { - listen_all(_qs[i]); + start_all_queues(); }); engine.run(); }, i); } - listen_all(_qs[0]); + start_all_queues(); } } diff --git a/core/reactor.hh b/core/reactor.hh index 93a62e9791..f8195e252b 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -369,6 +369,7 @@ public: submit_item(wi); return fut; } + void start() { complete(); } private: void work(); void complete(); @@ -383,7 +384,7 @@ class thread_pool { std::thread _worker_thread; std::atomic _stopped = { false }; public: - thread_pool() : _worker_thread([this] { work(); }) {} + thread_pool() : _worker_thread([this] { work(); }) { inter_thread_wq.start(); } ~thread_pool(); template future submit(Func func) {return inter_thread_wq.submit(std::move(func));} @@ -496,6 +497,7 @@ public: private: static void listen_all(inter_thread_work_queue* qs); static void listen_one(inter_thread_work_queue& q, std::unique_ptr&& rfd, std::unique_ptr&& wfd); + static void start_all_queues(); public: static unsigned count; };