mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 04:06:59 +00:00
core: thread support
Add a thread class that can be used to launch a blockable thread of execution. Within a thread, future<>::get() can be called on an unavailable future, in which case it blocks until the future is made ready.
This commit is contained in:
@@ -204,6 +204,7 @@ core = [
|
||||
'core/resource.cc',
|
||||
'core/scollectd.cc',
|
||||
'core/app-template.cc',
|
||||
'core/thread.cc',
|
||||
'core/dpdk_rte.cc',
|
||||
'util/conversions.cc',
|
||||
'net/packet.cc',
|
||||
|
||||
@@ -28,6 +28,20 @@
|
||||
#include <type_traits>
|
||||
#include <assert.h>
|
||||
|
||||
namespace seastar {
|
||||
|
||||
class thread_context;
|
||||
|
||||
namespace thread_impl {
|
||||
|
||||
thread_context* get();
|
||||
void switch_in(thread_context* to);
|
||||
void switch_out(thread_context* from);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
template <class... T>
|
||||
class promise;
|
||||
|
||||
@@ -514,9 +528,22 @@ public:
|
||||
}
|
||||
}
|
||||
std::tuple<T...> get() {
|
||||
if (!state()->available()) {
|
||||
wait();
|
||||
}
|
||||
return state()->get();
|
||||
}
|
||||
|
||||
void wait() {
|
||||
auto thread = seastar::thread_impl::get();
|
||||
assert(thread);
|
||||
schedule([this, thread] (future_state<T...>&& new_state) {
|
||||
*state() = std::move(new_state);
|
||||
seastar::thread_impl::switch_in(thread);
|
||||
});
|
||||
seastar::thread_impl::switch_out(thread);
|
||||
}
|
||||
|
||||
bool available() noexcept {
|
||||
return state()->available();
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "scollectd.hh"
|
||||
#include "util/conversions.hh"
|
||||
#include "core/future-util.hh"
|
||||
#include "thread.hh"
|
||||
#include <cassert>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
@@ -186,6 +187,7 @@ reactor::reactor()
|
||||
, _io_context_available(max_aio)
|
||||
, _reuseport(posix_reuseport_detect()) {
|
||||
|
||||
seastar::thread_impl::init();
|
||||
auto r = ::io_setup(max_aio, &_io_context);
|
||||
assert(r >= 0);
|
||||
#ifdef HAVE_OSV
|
||||
|
||||
107
core/thread.cc
Normal file
107
core/thread.cc
Normal file
@@ -0,0 +1,107 @@
|
||||
/*
|
||||
* This file is open source software, licensed to you under the terms
|
||||
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
|
||||
* distributed with this work for additional information regarding copyright
|
||||
* ownership. You may not use this file except in compliance with the License.
|
||||
*
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#include "thread.hh"
|
||||
#include "posix.hh"
|
||||
|
||||
namespace seastar {
|
||||
|
||||
thread_local thread_context* g_current_thread;
|
||||
thread_local ucontext_t g_unthreaded_context;
|
||||
thread_local ucontext_t* g_current_context;
|
||||
|
||||
thread_context::thread_context(std::function<void ()> func)
|
||||
: _func(std::move(func)) {
|
||||
setup();
|
||||
}
|
||||
|
||||
void
|
||||
thread_context::setup() {
|
||||
auto q = uint64_t(reinterpret_cast<uintptr_t>(this));
|
||||
auto main = reinterpret_cast<void (*)()>(&thread_context::s_main);
|
||||
auto r = getcontext(&_context);
|
||||
throw_system_error_on(r == -1);
|
||||
_context.uc_stack.ss_sp = _stack.get();
|
||||
_context.uc_stack.ss_size = _stack_size;
|
||||
_context.uc_link = &g_unthreaded_context;
|
||||
makecontext(&_context, main, 2, int(q), int(q >> 32));
|
||||
}
|
||||
|
||||
void
|
||||
thread_context::switch_in() {
|
||||
// FIXME: use setjmp/longjmp after initial_switch_in, much faster
|
||||
auto prev = g_current_context;
|
||||
g_current_context = &_context;
|
||||
g_current_thread = this;
|
||||
swapcontext(prev, &_context);
|
||||
}
|
||||
|
||||
void
|
||||
thread_context::switch_out() {
|
||||
g_current_context = &g_unthreaded_context;
|
||||
g_current_thread = nullptr;
|
||||
swapcontext(&_context, &g_unthreaded_context);
|
||||
}
|
||||
|
||||
void
|
||||
thread_context::s_main(unsigned int lo, unsigned int hi) {
|
||||
uintptr_t q = lo | (uint64_t(hi) << 32);
|
||||
reinterpret_cast<thread_context*>(q)->main();
|
||||
}
|
||||
|
||||
void
|
||||
thread_context::main() {
|
||||
try {
|
||||
_func();
|
||||
_done.set_value();
|
||||
} catch (...) {
|
||||
_done.set_exception(std::current_exception());
|
||||
}
|
||||
g_current_context = &g_unthreaded_context;
|
||||
g_current_thread = nullptr;
|
||||
// returning here auto-switches to the "next context" link
|
||||
}
|
||||
|
||||
namespace thread_impl {
|
||||
|
||||
thread_context* get() {
|
||||
return g_current_thread;
|
||||
}
|
||||
|
||||
void switch_in(thread_context* to) {
|
||||
to->switch_in();
|
||||
}
|
||||
|
||||
void switch_out(thread_context* from) {
|
||||
from->switch_out();
|
||||
}
|
||||
|
||||
void init() {
|
||||
auto r = getcontext(&g_unthreaded_context);
|
||||
throw_system_error_on(r == -1);
|
||||
g_current_context = &g_unthreaded_context;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
116
core/thread.hh
Normal file
116
core/thread.hh
Normal file
@@ -0,0 +1,116 @@
|
||||
/*
|
||||
* This file is open source software, licensed to you under the terms
|
||||
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
|
||||
* distributed with this work for additional information regarding copyright
|
||||
* ownership. You may not use this file except in compliance with the License.
|
||||
*
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "future.hh"
|
||||
#include <memory>
|
||||
#include <ucontext.h>
|
||||
|
||||
namespace seastar {
|
||||
|
||||
class thread;
|
||||
class thread_context;
|
||||
|
||||
namespace thread_impl {
|
||||
|
||||
thread_context* get();
|
||||
void switch_in(thread_context* to);
|
||||
void switch_out(thread_context* from);
|
||||
void init();
|
||||
|
||||
}
|
||||
|
||||
extern thread_local ucontext_t g_unthreaded_context;
|
||||
|
||||
// Internal class holding thread state. We can't hold this in
|
||||
// \c thread itself because \c thread is movable, and we want pointers
|
||||
// to this state to be captured.
|
||||
class thread_context {
|
||||
static constexpr const size_t _stack_size = 128*1024;
|
||||
std::unique_ptr<char[]> _stack{new char[_stack_size]};
|
||||
std::function<void ()> _func;
|
||||
ucontext_t _context;
|
||||
promise<> _done;
|
||||
bool _joined = false;
|
||||
private:
|
||||
static void s_main(unsigned int lo, unsigned int hi);
|
||||
void setup();
|
||||
void main();
|
||||
public:
|
||||
thread_context(std::function<void ()> func);
|
||||
void switch_in();
|
||||
void switch_out();
|
||||
friend class thread;
|
||||
friend void thread_impl::switch_in(thread_context*);
|
||||
friend void thread_impl::switch_out(thread_context*);
|
||||
};
|
||||
|
||||
/// \brief thread - stateful thread of execution
|
||||
///
|
||||
/// Threads allow using seastar APIs in a blocking manner,
|
||||
/// by calling future::get() on a non-ready future. When
|
||||
/// this happens, the thread is put to sleep until the future
|
||||
/// becomes ready.
|
||||
class thread {
|
||||
std::unique_ptr<thread_context> _context;
|
||||
static thread_local thread* _current;
|
||||
public:
|
||||
/// \brief Constructs a \c thread object that does not represent a thread
|
||||
/// of execution.
|
||||
thread() = default;
|
||||
/// \brief Constructs a \c thread object that represents a thread of execution
|
||||
///
|
||||
/// \param func Callable object to execute in thread. The callable is
|
||||
/// called immediately.
|
||||
template <typename Func>
|
||||
thread(Func func);
|
||||
/// \brief Moves a thread object.
|
||||
thread(thread&& x) noexcept = default;
|
||||
/// \brief Move-assigns a thread object.
|
||||
thread& operator=(thread&& x) noexcept = default;
|
||||
/// \brief Destroys a \c thread object.
|
||||
///
|
||||
/// The thread must not represent a running thread of execution (see join()).
|
||||
~thread() { assert(!_context || _context->_joined); }
|
||||
/// \brief Waits for thread execution to terminate.
|
||||
///
|
||||
/// Waits for thread execution to terminate, and marks the thread object as not
|
||||
/// representing a running thread of execution.
|
||||
future<> join();
|
||||
};
|
||||
|
||||
template <typename Func>
|
||||
inline
|
||||
thread::thread(Func func)
|
||||
: _context(std::make_unique<thread_context>(func)) {
|
||||
_context->switch_in();
|
||||
}
|
||||
|
||||
inline
|
||||
future<>
|
||||
thread::join() {
|
||||
_context->_joined = true;
|
||||
return _context->_done.get_future();
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user