smp: fix completion handling in inter_thread_work_queue
Currently completion processing start during object creation, but since all object are created by main thread they all run on the same cpu which is incorrect. This patch starts completion processing on correct cpu.
This commit is contained in:
@@ -347,7 +347,6 @@ inter_thread_work_queue::inter_thread_work_queue()
|
||||
, _completed(queue_length)
|
||||
, _start_eventfd(0)
|
||||
, _complete_eventfd(0) {
|
||||
complete();
|
||||
}
|
||||
|
||||
void inter_thread_work_queue::submit_item(inter_thread_work_queue::work_item* item) {
|
||||
@@ -572,6 +571,14 @@ void smp::listen_all(inter_thread_work_queue* qs)
|
||||
}
|
||||
}
|
||||
|
||||
void smp::start_all_queues()
|
||||
{
|
||||
for (unsigned c = 0; c < count; c++) {
|
||||
_qs[c][engine._id].start();
|
||||
}
|
||||
listen_all(_qs[engine._id]);
|
||||
}
|
||||
|
||||
void smp::configure(boost::program_options::variables_map configuration)
|
||||
{
|
||||
smp::count = 1;
|
||||
@@ -593,12 +600,12 @@ void smp::configure(boost::program_options::variables_map configuration)
|
||||
engine._id = i;
|
||||
engine.configure(configuration);
|
||||
engine.when_started().then([i] {
|
||||
listen_all(_qs[i]);
|
||||
start_all_queues();
|
||||
});
|
||||
engine.run();
|
||||
}, i);
|
||||
}
|
||||
listen_all(_qs[0]);
|
||||
start_all_queues();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -369,6 +369,7 @@ public:
|
||||
submit_item(wi);
|
||||
return fut;
|
||||
}
|
||||
void start() { complete(); }
|
||||
private:
|
||||
void work();
|
||||
void complete();
|
||||
@@ -383,7 +384,7 @@ class thread_pool {
|
||||
std::thread _worker_thread;
|
||||
std::atomic<bool> _stopped = { false };
|
||||
public:
|
||||
thread_pool() : _worker_thread([this] { work(); }) {}
|
||||
thread_pool() : _worker_thread([this] { work(); }) { inter_thread_wq.start(); }
|
||||
~thread_pool();
|
||||
template <typename T, typename Func>
|
||||
future<T> submit(Func func) {return inter_thread_wq.submit<T>(std::move(func));}
|
||||
@@ -496,6 +497,7 @@ public:
|
||||
private:
|
||||
static void listen_all(inter_thread_work_queue* qs);
|
||||
static void listen_one(inter_thread_work_queue& q, std::unique_ptr<readable_eventfd>&& rfd, std::unique_ptr<writeable_eventfd>&& wfd);
|
||||
static void start_all_queues();
|
||||
public:
|
||||
static unsigned count;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user