Separate inter_thread_work_queue from thread_pool

Currently thread_pool implements cross-thread communication channel
 internally. Separate communication logic into its own class to
 reuse it for smp communication in later patches.
This commit is contained in:
Gleb Natapov
2014-09-24 12:29:14 +03:00
committed by Avi Kivity
parent 236418d262
commit cfbfce3d6d
2 changed files with 45 additions and 32 deletions

View File

@@ -323,19 +323,30 @@ reactor::signal_handler::signal_handler(int signo)
throw_system_error_on(r == -1);
}
thread_pool::thread_pool()
inter_thread_work_queue::inter_thread_work_queue()
: _pending(queue_length)
, _completed(queue_length)
, _start_eventfd(0)
, _complete_eventfd(0)
, _worker_thread([this] { work(); }) {
, _complete_eventfd(0) {
complete();
}
thread_pool::~thread_pool() {
_stopped.store(true, std::memory_order_relaxed);
_start_eventfd.signal(1);
_worker_thread.join();
void inter_thread_work_queue::submit_item(inter_thread_work_queue::work_item* item) {
_queue_has_room.wait().then([this, item] {
_pending.push(item);
_start_eventfd.signal(1);
});
}
void inter_thread_work_queue::complete() {
_complete_eventfd.wait().then([this] (size_t count) {
auto nr = _completed.consume_all([this] (work_item* wi) {
wi->complete();
delete wi;
});
_queue_has_room.signal(nr);
complete();
});
}
void thread_pool::work() {
@@ -345,37 +356,25 @@ void thread_pool::work() {
throw_system_error_on(r == -1);
while (true) {
uint64_t count;
auto r = ::read(_start_eventfd.get_read_fd(), &count, sizeof(count));
auto r = ::read(inter_thread_wq._start_eventfd.get_read_fd(), &count, sizeof(count));
assert(r == sizeof(count));
if (_stopped.load(std::memory_order_relaxed)) {
break;
}
auto nr = _pending.consume_all([this] (work_item* wi) {
auto nr = inter_thread_wq._pending.consume_all([this] (inter_thread_work_queue::work_item* wi) {
wi->process();
_completed.push(wi);
inter_thread_wq._completed.push(wi);
});
count = nr;
r = ::write(_complete_eventfd.get_write_fd(), &count, sizeof(count));
r = ::write(inter_thread_wq._complete_eventfd.get_write_fd(), &count, sizeof(count));
assert(r == sizeof(count));
}
}
void thread_pool::submit_item(thread_pool::work_item* item) {
_queue_has_room.wait().then([this, item] {
_pending.push(item);
_start_eventfd.signal(1);
});
}
void thread_pool::complete() {
_complete_eventfd.wait().then([this] (size_t count) {
auto nr = _completed.consume_all([this] (work_item* wi) {
wi->complete();
delete wi;
});
_queue_has_room.signal(nr);
complete();
});
thread_pool::~thread_pool() {
_stopped.store(true, std::memory_order_relaxed);
inter_thread_wq._start_eventfd.signal(1);
_worker_thread.join();
}
file_desc writeable_eventfd::try_create_eventfd(size_t initial) {

View File

@@ -300,16 +300,16 @@ private:
static file_desc try_create_eventfd(size_t initial);
};
class thread_pool {
class thread_pool;
class inter_thread_work_queue {
static constexpr size_t queue_length = 128;
struct work_item;
std::atomic<bool> _stopped = { false };
boost::lockfree::queue<work_item*> _pending;
boost::lockfree::queue<work_item*> _completed;
writeable_eventfd _start_eventfd;
readable_eventfd _complete_eventfd;
semaphore _queue_has_room = { queue_length };
std::thread _worker_thread;
struct work_item {
virtual ~work_item() {}
virtual void process() = 0;
@@ -326,8 +326,7 @@ class thread_pool {
future<T> get_future() { return _promise.get_future(); }
};
public:
thread_pool();
~thread_pool();
inter_thread_work_queue();
template <typename T, typename Func>
future<T> submit(Func func) {
auto wi = new work_item_returning<T, Func>(std::move(func));
@@ -339,6 +338,21 @@ private:
void work();
void complete();
void submit_item(work_item* wi);
friend class thread_pool;
};
class thread_pool {
inter_thread_work_queue inter_thread_wq;
std::thread _worker_thread;
std::atomic<bool> _stopped = { false };
public:
thread_pool() : _worker_thread([this] { work(); }) {}
~thread_pool();
template <typename T, typename Func>
future<T> submit(Func func) {return inter_thread_wq.submit<T>(std::move(func));}
private:
void work();
};
class reactor {