diff --git a/configure.py b/configure.py index ccae7c4c53..0a2271f946 100755 --- a/configure.py +++ b/configure.py @@ -204,6 +204,7 @@ core = [ 'core/resource.cc', 'core/scollectd.cc', 'core/app-template.cc', + 'core/thread.cc', 'core/dpdk_rte.cc', 'util/conversions.cc', 'net/packet.cc', diff --git a/core/future.hh b/core/future.hh index 9f1ebb3de2..6794530374 100644 --- a/core/future.hh +++ b/core/future.hh @@ -28,6 +28,20 @@ #include #include +namespace seastar { + +class thread_context; + +namespace thread_impl { + +thread_context* get(); +void switch_in(thread_context* to); +void switch_out(thread_context* from); + +} + +} + template class promise; @@ -514,9 +528,22 @@ public: } } std::tuple get() { + if (!state()->available()) { + wait(); + } return state()->get(); } + void wait() { + auto thread = seastar::thread_impl::get(); + assert(thread); + schedule([this, thread] (future_state&& new_state) { + *state() = std::move(new_state); + seastar::thread_impl::switch_in(thread); + }); + seastar::thread_impl::switch_out(thread); + } + bool available() noexcept { return state()->available(); } diff --git a/core/reactor.cc b/core/reactor.cc index f49f29fef0..dea25becf7 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -29,6 +29,7 @@ #include "scollectd.hh" #include "util/conversions.hh" #include "core/future-util.hh" +#include "thread.hh" #include #include #include @@ -186,6 +187,7 @@ reactor::reactor() , _io_context_available(max_aio) , _reuseport(posix_reuseport_detect()) { + seastar::thread_impl::init(); auto r = ::io_setup(max_aio, &_io_context); assert(r >= 0); #ifdef HAVE_OSV diff --git a/core/thread.cc b/core/thread.cc new file mode 100644 index 0000000000..dd74a6d6cb --- /dev/null +++ b/core/thread.cc @@ -0,0 +1,107 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "thread.hh" +#include "posix.hh" + +namespace seastar { + +thread_local thread_context* g_current_thread; +thread_local ucontext_t g_unthreaded_context; +thread_local ucontext_t* g_current_context; + +thread_context::thread_context(std::function func) + : _func(std::move(func)) { + setup(); +} + +void +thread_context::setup() { + auto q = uint64_t(reinterpret_cast(this)); + auto main = reinterpret_cast(&thread_context::s_main); + auto r = getcontext(&_context); + throw_system_error_on(r == -1); + _context.uc_stack.ss_sp = _stack.get(); + _context.uc_stack.ss_size = _stack_size; + _context.uc_link = &g_unthreaded_context; + makecontext(&_context, main, 2, int(q), int(q >> 32)); +} + +void +thread_context::switch_in() { + // FIXME: use setjmp/longjmp after initial_switch_in, much faster + auto prev = g_current_context; + g_current_context = &_context; + g_current_thread = this; + swapcontext(prev, &_context); +} + +void +thread_context::switch_out() { + g_current_context = &g_unthreaded_context; + g_current_thread = nullptr; + swapcontext(&_context, &g_unthreaded_context); +} + +void +thread_context::s_main(unsigned int lo, unsigned int hi) { + uintptr_t q = lo | (uint64_t(hi) << 32); + reinterpret_cast(q)->main(); +} + +void +thread_context::main() { + try { + _func(); + _done.set_value(); + } catch (...) { + _done.set_exception(std::current_exception()); + } + g_current_context = &g_unthreaded_context; + g_current_thread = nullptr; + // returning here auto-switches to the "next context" link +} + +namespace thread_impl { + +thread_context* get() { + return g_current_thread; +} + +void switch_in(thread_context* to) { + to->switch_in(); +} + +void switch_out(thread_context* from) { + from->switch_out(); +} + +void init() { + auto r = getcontext(&g_unthreaded_context); + throw_system_error_on(r == -1); + g_current_context = &g_unthreaded_context; +} + +} + + +} diff --git a/core/thread.hh b/core/thread.hh new file mode 100644 index 0000000000..e27893425f --- /dev/null +++ b/core/thread.hh @@ -0,0 +1,116 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include "future.hh" +#include +#include + +namespace seastar { + +class thread; +class thread_context; + +namespace thread_impl { + +thread_context* get(); +void switch_in(thread_context* to); +void switch_out(thread_context* from); +void init(); + +} + +extern thread_local ucontext_t g_unthreaded_context; + +// Internal class holding thread state. We can't hold this in +// \c thread itself because \c thread is movable, and we want pointers +// to this state to be captured. +class thread_context { + static constexpr const size_t _stack_size = 128*1024; + std::unique_ptr _stack{new char[_stack_size]}; + std::function _func; + ucontext_t _context; + promise<> _done; + bool _joined = false; +private: + static void s_main(unsigned int lo, unsigned int hi); + void setup(); + void main(); +public: + thread_context(std::function func); + void switch_in(); + void switch_out(); + friend class thread; + friend void thread_impl::switch_in(thread_context*); + friend void thread_impl::switch_out(thread_context*); +}; + +/// \brief thread - stateful thread of execution +/// +/// Threads allow using seastar APIs in a blocking manner, +/// by calling future::get() on a non-ready future. When +/// this happens, the thread is put to sleep until the future +/// becomes ready. +class thread { + std::unique_ptr _context; + static thread_local thread* _current; +public: + /// \brief Constructs a \c thread object that does not represent a thread + /// of execution. + thread() = default; + /// \brief Constructs a \c thread object that represents a thread of execution + /// + /// \param func Callable object to execute in thread. The callable is + /// called immediately. + template + thread(Func func); + /// \brief Moves a thread object. + thread(thread&& x) noexcept = default; + /// \brief Move-assigns a thread object. + thread& operator=(thread&& x) noexcept = default; + /// \brief Destroys a \c thread object. + /// + /// The thread must not represent a running thread of execution (see join()). + ~thread() { assert(!_context || _context->_joined); } + /// \brief Waits for thread execution to terminate. + /// + /// Waits for thread execution to terminate, and marks the thread object as not + /// representing a running thread of execution. + future<> join(); +}; + +template +inline +thread::thread(Func func) + : _context(std::make_unique(func)) { + _context->switch_in(); +} + +inline +future<> +thread::join() { + _context->_joined = true; + return _context->_done.get_future(); +} + +}