mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 17:40:34 +00:00
Merge seastar upstream
This commit is contained in:
@@ -142,7 +142,7 @@ modes = {
|
||||
'debug': {
|
||||
'sanitize': '-fsanitize=address -fsanitize=leak -fsanitize=undefined',
|
||||
'sanitize_libs': '-lubsan -lasan',
|
||||
'opt': '-O0 -DDEBUG -DDEFAULT_ALLOCATOR',
|
||||
'opt': '-O0 -DDEBUG -DDEBUG_SHARED_PTR -DDEFAULT_ALLOCATOR',
|
||||
'libs': '',
|
||||
},
|
||||
'release': {
|
||||
|
||||
@@ -25,20 +25,35 @@
|
||||
#ifndef CORE_FUTURE_UTIL_HH_
|
||||
#define CORE_FUTURE_UTIL_HH_
|
||||
|
||||
#include "task.hh"
|
||||
#include "future.hh"
|
||||
#include "shared_ptr.hh"
|
||||
#include <tuple>
|
||||
#include <iterator>
|
||||
#include <vector>
|
||||
|
||||
/// \cond internal
|
||||
extern __thread size_t task_quota;
|
||||
/// \endcond
|
||||
|
||||
// parallel_for_each - run tasks in parallel
|
||||
//
|
||||
// Given a range [@begin, @end) of objects, run func(*i) for each i in
|
||||
// the range, and return a future<> that resolves when all the functions
|
||||
// complete. @func should return a future<> that indicates when it is
|
||||
// complete.
|
||||
|
||||
/// \addtogroup future-util
|
||||
/// @{
|
||||
|
||||
/// Run tasks in parallel (iterator version).
|
||||
///
|
||||
/// Given a range [\c begin, \c end) of objects, run \c func on each \c *i in
|
||||
/// the range, and return a future<> that resolves when all the functions
|
||||
/// complete. \c func should return a future<> that indicates when it is
|
||||
/// complete. All invocations are performed in parallel.
|
||||
///
|
||||
/// \param begin an \c InputIterator designating the beginning of the range
|
||||
/// \param end an \c InputIterator designating the end of the range
|
||||
/// \param func Function to apply to each element in the range (returning
|
||||
/// a \c future<>)
|
||||
/// \return a \c future<> that resolves when all the function invocations
|
||||
/// complete. If one or more return an exception, the return value
|
||||
/// contains one of the exceptions.
|
||||
template <typename Iterator, typename Func>
|
||||
inline
|
||||
future<>
|
||||
@@ -53,12 +68,12 @@ parallel_for_each(Iterator begin, Iterator end, Func&& func) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// \brief parallel_for_each - run tasks in parallel
|
||||
/// Run tasks in parallel (range version).
|
||||
///
|
||||
/// Given a range [\c begin, \c end) of objects, run \c func(object) for
|
||||
/// each object in the range, and return a future<> that resolves when all
|
||||
/// the functions complete. @func should return a future<> that indicates
|
||||
/// when it is complete.
|
||||
/// Given a \c range of objects, apply \c func to each object
|
||||
/// in the range, and return a future<> that resolves when all
|
||||
/// the functions complete. \c func should return a future<> that indicates
|
||||
/// when it is complete. All invocations are performed in parallel.
|
||||
///
|
||||
/// \param range A range of objects to iterate run \c func on
|
||||
/// \param func A callable, accepting reference to the range's
|
||||
@@ -79,6 +94,7 @@ parallel_for_each(Range&& range, Func&& func) {
|
||||
// the actual function invocation. It is represented by a function which
|
||||
// returns a future which resolves when the action is done.
|
||||
|
||||
/// \cond internal
|
||||
template<typename AsyncAction, typename StopCondition>
|
||||
static inline
|
||||
void do_until_continued(StopCondition&& stop_cond, AsyncAction&& action, promise<> p) {
|
||||
@@ -110,8 +126,18 @@ void do_until_continued(StopCondition&& stop_cond, AsyncAction&& action, promise
|
||||
|
||||
p.set_value();
|
||||
}
|
||||
/// \endcond
|
||||
|
||||
// Invokes given action until it fails or given condition evaluates to true.
|
||||
/// Invokes given action until it fails or given condition evaluates to true.
|
||||
///
|
||||
/// \param stop_cond a callable taking no arguments, returning a boolean that
|
||||
/// evalutes to true when you don't want to call \c action
|
||||
/// any longer
|
||||
/// \param action a callable taking no arguments, returning a future<>. Will
|
||||
/// be called again as soon as the future resolves, unless the
|
||||
/// future fails, or \c stop_cond returns \c true.
|
||||
/// \return a ready future if we stopped successfully, or a failed future if
|
||||
/// a call to to \c action failed.
|
||||
template<typename AsyncAction, typename StopCondition>
|
||||
static inline
|
||||
future<> do_until(StopCondition&& stop_cond, AsyncAction&& action) {
|
||||
@@ -122,7 +148,13 @@ future<> do_until(StopCondition&& stop_cond, AsyncAction&& action) {
|
||||
return f;
|
||||
}
|
||||
|
||||
// Invoke given action until it fails.
|
||||
/// Invoke given action until it fails.
|
||||
///
|
||||
/// Calls \c action repeatedly until it returns a failed future.
|
||||
///
|
||||
/// \param action a callable taking no arguments, returning a \c future<>
|
||||
/// that becomes ready when you wish it to be called again.
|
||||
/// \return a future<> that will resolve to the first failure of \c action
|
||||
template<typename AsyncAction>
|
||||
static inline
|
||||
future<> keep_doing(AsyncAction&& action) {
|
||||
@@ -150,6 +182,18 @@ future<> keep_doing(AsyncAction&& action) {
|
||||
return f;
|
||||
}
|
||||
|
||||
/// Call a function for each item in a range, sequentially (iterator version).
|
||||
///
|
||||
/// For each item in a range, call a function, waiting for the previous
|
||||
/// invocation to complete before calling the next one.
|
||||
///
|
||||
/// \param begin an \c InputIterator designating the beginning of the range
|
||||
/// \param end an \c InputIterator designating the endof the range
|
||||
/// \param action a callable, taking a reference to objects from the range
|
||||
/// as a parameter, and returning a \c future<> that resolves
|
||||
/// when it is acceptable to process the next item.
|
||||
/// \return a ready future on success, or the first failed future if
|
||||
/// \c action failed.
|
||||
template<typename Iterator, typename AsyncAction>
|
||||
static inline
|
||||
future<> do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
|
||||
@@ -173,12 +217,24 @@ future<> do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Call a function for each item in a range, sequentially (range version).
|
||||
///
|
||||
/// For each item in a range, call a function, waiting for the previous
|
||||
/// invocation to complete before calling the next one.
|
||||
///
|
||||
/// \param range an \c Range object designating input values
|
||||
/// \param action a callable, taking a reference to objects from the range
|
||||
/// as a parameter, and returning a \c future<> that resolves
|
||||
/// when it is acceptable to process the next item.
|
||||
/// \return a ready future on success, or the first failed future if
|
||||
/// \c action failed.
|
||||
template<typename Container, typename AsyncAction>
|
||||
static inline
|
||||
future<> do_for_each(Container& c, AsyncAction&& action) {
|
||||
return do_for_each(std::begin(c), std::end(c), std::forward<AsyncAction>(action));
|
||||
}
|
||||
|
||||
/// \cond internal
|
||||
inline
|
||||
future<std::tuple<>>
|
||||
when_all() {
|
||||
@@ -194,7 +250,18 @@ struct do_when_all {
|
||||
return when_all(std::move(fut)...);
|
||||
}
|
||||
};
|
||||
/// \endcond
|
||||
|
||||
/// Wait for many futures to complete, capturing possible errors (variadic version).
|
||||
///
|
||||
/// Given a variable number of futures as input, wait for all of them
|
||||
/// to resolve (either successfully or with an exception), and return
|
||||
/// them as a tuple so individual values or exceptions can be examined.
|
||||
///
|
||||
/// \param fut the first future to wait for
|
||||
/// \param rest more futures to wait for
|
||||
/// \return an \c std::tuple<> of all the futures in the input; when
|
||||
/// ready, all contained futures will be ready as well.
|
||||
template <typename... FutureArgs, typename... Rest>
|
||||
inline
|
||||
future<std::tuple<future<FutureArgs...>, Rest...>>
|
||||
@@ -210,6 +277,7 @@ when_all(future<FutureArgs...>&& fut, Rest&&... rest) {
|
||||
});
|
||||
}
|
||||
|
||||
/// \cond internal
|
||||
template <typename Iterator, typename IteratorCategory>
|
||||
inline
|
||||
size_t
|
||||
@@ -245,11 +313,18 @@ complete_when_all(std::vector<Future>&& futures, typename std::vector<Future>::i
|
||||
return complete_when_all(std::move(futures), pos);
|
||||
});
|
||||
}
|
||||
/// \endcond
|
||||
|
||||
// Given a range of futures (denoted by an iterator pair), wait for
|
||||
// all of them to be available, and return a future containing a
|
||||
// vector of each input future (in available state, with either a
|
||||
// value or exception stored).
|
||||
/// Wait for many futures to complete, capturing possible errors (iterator version).
|
||||
///
|
||||
/// Given a range of futures as input, wait for all of them
|
||||
/// to resolve (either successfully or with an exception), and return
|
||||
/// them as a \c std::vector so individual values or exceptions can be examined.
|
||||
///
|
||||
/// \param begin an \c InputIterator designating the beginning of the range of futures
|
||||
/// \param end an \c InputIterator designating the end of the range of futures
|
||||
/// \return an \c std::vector<> of all the futures in the input; when
|
||||
/// ready, all contained futures will be ready as well.
|
||||
template <typename FutureIterator>
|
||||
inline
|
||||
future<std::vector<typename std::iterator_traits<FutureIterator>::value_type>>
|
||||
@@ -364,4 +439,6 @@ future<> now() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
/// @}
|
||||
|
||||
#endif /* CORE_FUTURE_UTIL_HH_ */
|
||||
|
||||
252
core/future.hh
252
core/future.hh
@@ -16,18 +16,50 @@
|
||||
* under the License.
|
||||
*/
|
||||
/*
|
||||
* Copyright (C) 2014 Cloudius Systems, Ltd.
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#ifndef FUTURE_HH_
|
||||
#define FUTURE_HH_
|
||||
|
||||
#include "apply.hh"
|
||||
#include "task.hh"
|
||||
#include <stdexcept>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <assert.h>
|
||||
|
||||
|
||||
/// \defgroup future-module Futures and Promises
|
||||
///
|
||||
/// \brief
|
||||
/// Futures and promises are the basic tools for asynchronous
|
||||
/// programming in seastar. A future represents a result that
|
||||
/// may not have been computed yet, for example a buffer that
|
||||
/// is being read from the disk, or the result of a function
|
||||
/// that is executed on another cpu. A promise object allows
|
||||
/// the future to be eventually resolved by assigning it a value.
|
||||
///
|
||||
/// \brief
|
||||
/// Another way to look at futures and promises are as the reader
|
||||
/// and writer sides, respectively, of a single-item, single use
|
||||
/// queue. You read from the future, and write to the promise,
|
||||
/// and the system takes care that it works no matter what the
|
||||
/// order of operations is.
|
||||
///
|
||||
/// \brief
|
||||
/// The normal way of working with futures is to chain continuations
|
||||
/// to them. A continuation is a block of code (usually a lamdba)
|
||||
/// that is called when the future is assigned a value (the future
|
||||
/// is resolved); the continuation can then access the actual value.
|
||||
///
|
||||
|
||||
/// \defgroup future-util Future Utilities
|
||||
///
|
||||
/// \brief
|
||||
/// These utilities are provided to help perform operations on futures.
|
||||
|
||||
|
||||
namespace seastar {
|
||||
|
||||
class thread_context;
|
||||
@@ -42,44 +74,39 @@ void switch_out(thread_context* from);
|
||||
|
||||
}
|
||||
|
||||
|
||||
/// \addtogroup future-module
|
||||
/// @{
|
||||
|
||||
template <class... T>
|
||||
class promise;
|
||||
|
||||
template <class... T>
|
||||
class future;
|
||||
|
||||
/// \brief Creates a \ref future in an available, value state.
|
||||
///
|
||||
/// Creates a \ref future object that is already resolved. This
|
||||
/// is useful when it is determined that no I/O needs to be performed
|
||||
/// to perform a computation (for example, because the data is cached
|
||||
/// in some buffer).
|
||||
template <typename... T, typename... A>
|
||||
future<T...> make_ready_future(A&&... value);
|
||||
|
||||
/// \brief Creates a \ref future in an available, failed state.
|
||||
///
|
||||
/// Creates a \ref future object that is already resolved in a failed
|
||||
/// state. This is useful when no I/O needs to be performed to perform
|
||||
/// a computation (for example, because the connection is closed and
|
||||
/// we cannot read from it).
|
||||
template <typename... T>
|
||||
future<T...> make_exception_future(std::exception_ptr value) noexcept;
|
||||
|
||||
class task {
|
||||
public:
|
||||
virtual ~task() noexcept {}
|
||||
virtual void run() noexcept = 0;
|
||||
};
|
||||
|
||||
void schedule(std::unique_ptr<task> t);
|
||||
/// \cond internal
|
||||
void engine_exit(std::exception_ptr eptr = {});
|
||||
|
||||
template <typename Func>
|
||||
class lambda_task final : public task {
|
||||
Func _func;
|
||||
public:
|
||||
lambda_task(const Func& func) : _func(func) {}
|
||||
lambda_task(Func&& func) : _func(std::move(func)) {}
|
||||
virtual void run() noexcept override { _func(); }
|
||||
};
|
||||
|
||||
template <typename Func>
|
||||
inline
|
||||
std::unique_ptr<task>
|
||||
make_task(Func&& func) {
|
||||
return std::make_unique<lambda_task<Func>>(std::forward<Func>(func));
|
||||
}
|
||||
|
||||
void report_failed_future(std::exception_ptr ex);
|
||||
/// \endcond
|
||||
|
||||
//
|
||||
// A future/promise pair maintain one logical value (a future_state).
|
||||
@@ -99,6 +126,7 @@ void report_failed_future(std::exception_ptr ex);
|
||||
// called) or due to the promise or future being mobved around.
|
||||
//
|
||||
|
||||
/// \cond internal
|
||||
template <typename... T>
|
||||
struct future_state {
|
||||
static constexpr bool move_noexcept = std::is_nothrow_move_constructible<std::tuple<T...>>::value;
|
||||
@@ -299,7 +327,11 @@ struct continuation final : task {
|
||||
future_state<T...> _state;
|
||||
Func _func;
|
||||
};
|
||||
/// \endcond
|
||||
|
||||
/// \brief promise - allows a future value to be made available at a later time.
|
||||
///
|
||||
///
|
||||
template <typename... T>
|
||||
class promise {
|
||||
future<T...>* _future = nullptr;
|
||||
@@ -309,7 +341,12 @@ class promise {
|
||||
static constexpr bool move_noexcept = future_state<T...>::move_noexcept;
|
||||
static constexpr bool copy_noexcept = future_state<T...>::copy_noexcept;
|
||||
public:
|
||||
/// \brief Constructs an empty \c promise.
|
||||
///
|
||||
/// Creates promise with no associated future yet (see get_future()).
|
||||
promise() noexcept : _state(&_local_state) {}
|
||||
|
||||
/// \brief Moves a \c promise object.
|
||||
promise(promise&& x) noexcept(move_noexcept) : _future(x._future), _state(x._state), _task(std::move(x._task)) {
|
||||
if (_state == &x._local_state) {
|
||||
_state = &_local_state;
|
||||
@@ -332,24 +369,56 @@ public:
|
||||
return *this;
|
||||
}
|
||||
void operator=(const promise&) = delete;
|
||||
|
||||
/// \brief Gets the promise's associated future.
|
||||
///
|
||||
/// The future and promise will be remember each other, even if either or
|
||||
/// both are moved. When \c set_value() or \c set_exception() are called
|
||||
/// on the promise, the future will be become ready, and if a continuation
|
||||
/// was attached to the future, it will run.
|
||||
future<T...> get_future() noexcept;
|
||||
|
||||
/// \brief Sets the promise's value (as tuple; by copying)
|
||||
///
|
||||
/// Copies the tuple argument and makes it available to the associated
|
||||
/// future. May be called either before or after \c get_future().
|
||||
void set_value(const std::tuple<T...>& result) noexcept(copy_noexcept) {
|
||||
_state->set(result);
|
||||
make_ready();
|
||||
}
|
||||
|
||||
/// \brief Sets the promises value (as tuple; by moving)
|
||||
///
|
||||
/// Moves the tuple argument and makes it available to the associated
|
||||
/// future. May be called either before or after \c get_future().
|
||||
void set_value(std::tuple<T...>&& result) noexcept(move_noexcept) {
|
||||
_state->set(std::move(result));
|
||||
make_ready();
|
||||
}
|
||||
|
||||
/// \brief Sets the promises value (variadic)
|
||||
///
|
||||
/// Forwards the arguments and makes them available to the associated
|
||||
/// future. May be called either before or after \c get_future().
|
||||
template <typename... A>
|
||||
void set_value(A&&... a) noexcept {
|
||||
_state->set(std::forward<A>(a)...);
|
||||
make_ready();
|
||||
}
|
||||
|
||||
/// \brief Marks the promise as failed
|
||||
///
|
||||
/// Forwards the exception argument to the future and makes it
|
||||
/// available. May be called either before or after \c get_future().
|
||||
void set_exception(std::exception_ptr ex) noexcept {
|
||||
_state->set_exception(std::move(ex));
|
||||
make_ready();
|
||||
}
|
||||
|
||||
/// \brief Marks the promise as failed
|
||||
///
|
||||
/// Forwards the exception argument to the future and makes it
|
||||
/// available. May be called either before or after \c get_future().
|
||||
template<typename Exception>
|
||||
void set_exception(Exception&& e) noexcept {
|
||||
set_exception(make_exception_ptr(std::forward<Exception>(e)));
|
||||
@@ -370,10 +439,29 @@ private:
|
||||
friend class future;
|
||||
};
|
||||
|
||||
/// \brief Specialization of \c promise<void>
|
||||
///
|
||||
/// This is an alias for \c promise<>, for generic programming purposes.
|
||||
/// For example, You may have a \c promise<T> where \c T can legally be
|
||||
/// \c void.
|
||||
template<>
|
||||
class promise<void> : public promise<> {};
|
||||
|
||||
/// @}
|
||||
|
||||
/// \addtogroup future-util
|
||||
/// @{
|
||||
|
||||
|
||||
/// \brief Check whether a type is a future
|
||||
///
|
||||
/// This is a type trait evaluating to \c true if the given type is a
|
||||
/// future.
|
||||
///
|
||||
template <typename... T> struct is_future : std::false_type {};
|
||||
|
||||
/// \cond internal
|
||||
/// \addtogroup future-util
|
||||
template <typename... T> struct is_future<future<T...>> : std::true_type {};
|
||||
|
||||
struct ready_future_marker {};
|
||||
@@ -381,25 +469,34 @@ struct ready_future_from_tuple_marker {};
|
||||
struct exception_future_marker {};
|
||||
|
||||
extern __thread size_t future_avail_count;
|
||||
/// \endcond
|
||||
|
||||
// Converts a type to a future type, if it isn't already.
|
||||
//
|
||||
// Result in member type 'type'.
|
||||
|
||||
/// \brief Converts a type to a future type, if it isn't already.
|
||||
///
|
||||
/// \return Result in member type 'type'.
|
||||
template <typename T>
|
||||
struct futurize;
|
||||
|
||||
template <typename T>
|
||||
struct futurize {
|
||||
/// If \c T is a future, \c T; otherwise \c future<T>
|
||||
using type = future<T>;
|
||||
/// The promise type associated with \c type.
|
||||
using promise_type = promise<T>;
|
||||
|
||||
/// Apply a function to an argument list (expressed as a tuple)
|
||||
/// and return the result, as a future (if it wasn't already).
|
||||
template<typename Func, typename... FuncArgs>
|
||||
static inline type apply(Func&& func, std::tuple<FuncArgs...>&& args);
|
||||
|
||||
/// Apply a function to an argument list
|
||||
/// and return the result, as a future (if it wasn't already).
|
||||
template<typename Func, typename... FuncArgs>
|
||||
static inline type apply(Func&& func, FuncArgs&&... args);
|
||||
};
|
||||
|
||||
/// \cond internal
|
||||
template <>
|
||||
struct futurize<void> {
|
||||
using type = future<>;
|
||||
@@ -423,11 +520,30 @@ struct futurize<future<Args...>> {
|
||||
template<typename Func, typename... FuncArgs>
|
||||
static inline type apply(Func&& func, FuncArgs&&... args);
|
||||
};
|
||||
/// \endcond
|
||||
|
||||
// Converts a type to a future type, if it isn't already.
|
||||
template <typename T>
|
||||
using futurize_t = typename futurize<T>::type;
|
||||
|
||||
/// @}
|
||||
|
||||
/// \addtogroup future-module
|
||||
/// @{
|
||||
|
||||
/// \brief A representation of a possibly not-yet-computed value.
|
||||
///
|
||||
/// A \c future represents a value that has not yet been computed
|
||||
/// (an asynchronous computation). It can be in one of several
|
||||
/// states:
|
||||
/// - unavailable: the computation has not been completed yet
|
||||
/// - value: the computation has been completed successfully and a
|
||||
/// value is available.
|
||||
/// - failed: the computation completed with an exception.
|
||||
///
|
||||
/// methods in \c future allow querying the state and, most importantly,
|
||||
/// scheduling a \c continuation to be executed when the future becomes
|
||||
/// available. Only one such continuation may be scheduled.
|
||||
template <typename... T>
|
||||
class future {
|
||||
promise<T...>* _promise;
|
||||
@@ -492,8 +608,11 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
/// \brief The data type carried by the future.
|
||||
using value_type = std::tuple<T...>;
|
||||
/// \brief The data type carried by the future.
|
||||
using promise_type = promise<T...>;
|
||||
/// \brief Moves the future into a new object.
|
||||
future(future&& x) noexcept(move_noexcept) : _promise(x._promise) {
|
||||
if (!_promise) {
|
||||
_local_state = std::move(x._local_state);
|
||||
@@ -521,6 +640,15 @@ public:
|
||||
report_failed_future(state()->get_exception());
|
||||
}
|
||||
}
|
||||
/// \brief gets the value returned by the computation
|
||||
///
|
||||
/// Requires that the future be available. If the value
|
||||
/// was computed successfully, it is returned (as an
|
||||
/// \c std::tuple). Otherwise, an exception is thrown.
|
||||
///
|
||||
/// If get() is called in a \ref seastar::thread context,
|
||||
/// then it need not be available; instead, the thread will
|
||||
/// be paused until the future becomes available.
|
||||
std::tuple<T...> get() {
|
||||
if (!state()->available()) {
|
||||
wait();
|
||||
@@ -528,6 +656,7 @@ public:
|
||||
return state()->get();
|
||||
}
|
||||
|
||||
/// \cond internal
|
||||
void wait() {
|
||||
auto thread = seastar::thread_impl::get();
|
||||
assert(thread);
|
||||
@@ -537,26 +666,73 @@ public:
|
||||
});
|
||||
seastar::thread_impl::switch_out(thread);
|
||||
}
|
||||
/// \endcond
|
||||
|
||||
/// \brief Checks whether the future is available.
|
||||
///
|
||||
/// \return \c true if the future has a value, or has failed.
|
||||
bool available() noexcept {
|
||||
return state()->available();
|
||||
}
|
||||
|
||||
/// \brief Checks whether the future has failed.
|
||||
///
|
||||
/// \return \c true if the future is availble and has failed.
|
||||
bool failed() noexcept {
|
||||
return state()->failed();
|
||||
}
|
||||
|
||||
/// \brief Schedule a block of code to run when the future is ready.
|
||||
///
|
||||
/// Schedules a function (often a lambda) to run when the future becomes
|
||||
/// available. The function is called with the result of this future's
|
||||
/// computation as parameters. The return value of the function becomes
|
||||
/// the return value of then(), itself as a future; this allows then()
|
||||
/// calls to be chained.
|
||||
///
|
||||
/// If the future failed, the function is not called, and the exception
|
||||
/// is propagated into the return value of then().
|
||||
///
|
||||
/// \param func - function to be called when the future becomes available,
|
||||
/// unless it has failed.
|
||||
/// \return a \c future representing the return value of \c func, applied
|
||||
/// to the eventual value of this future.
|
||||
template <typename Func>
|
||||
futurize_t<std::result_of_t<Func(T&&...)>> then(Func&& func) noexcept {
|
||||
return then<std::result_of_t<Func(T&&...)>>(std::forward<Func>(func), [] (future_state<T...>&& state) { return state.get(); });
|
||||
}
|
||||
|
||||
/// \brief Schedule a block of code to run when the future is ready, allowing
|
||||
/// for exception handling.
|
||||
///
|
||||
/// Schedules a function (often a lambda) to run when the future becomes
|
||||
/// available. The function is called with the this future as a parameter;
|
||||
/// it will be in an available state. The return value of the function becomes
|
||||
/// the return value of then_wrapped(), itself as a future; this allows
|
||||
/// then_wrapped() calls to be chained.
|
||||
///
|
||||
/// Unlike then(), the function will be called for both value and exceptional
|
||||
/// futures.
|
||||
///
|
||||
/// \param func - function to be called when the future becomes available,
|
||||
/// \return a \c future representing the return value of \c func, applied
|
||||
/// to the eventual value of this future.
|
||||
template <typename Func>
|
||||
futurize_t<std::result_of_t<Func(future<T...>)>>
|
||||
then_wrapped(Func&& func) noexcept {
|
||||
return then<std::result_of_t<Func(future<T...>)>>(std::forward<Func>(func), [] (future_state<T...>&& state) { return future(std::move(state)); });
|
||||
}
|
||||
|
||||
/// \brief Satisfy some \ref promise object with this future as a result.
|
||||
///
|
||||
/// Arranges so that when this future is resolve, it will be used to
|
||||
/// satisfy an unrelated promise. This is similar to scheduling a
|
||||
/// continuation that moves the result of this future into the promise
|
||||
/// (using promise::set_value() or promise::set_exception(), except
|
||||
/// that it is more efficient.
|
||||
///
|
||||
/// \param pr a promise that will be fulfilled with the results of this
|
||||
/// future.
|
||||
void forward_to(promise<T...>&& pr) noexcept {
|
||||
if (state()->available()) {
|
||||
state()->forward_to(pr);
|
||||
@@ -588,6 +764,10 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
/// \brief Terminate the program if this future fails.
|
||||
///
|
||||
/// Terminates the entire program is this future resolves
|
||||
/// to an exception. Use with caution.
|
||||
future<> or_terminate() noexcept {
|
||||
return then_wrapped([] (auto&& f) {
|
||||
try {
|
||||
@@ -598,10 +778,15 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
/// \brief Discards the value carried by this future.
|
||||
///
|
||||
/// Converts the future into a no-value \c future<>, by
|
||||
/// ignorting any result. Exceptions are propagated unchanged.
|
||||
future<> discard_result() noexcept {
|
||||
return then([] (T&&...) {});
|
||||
}
|
||||
|
||||
/// \cond internal
|
||||
template <typename... U>
|
||||
friend class promise;
|
||||
template <typename... U, typename... A>
|
||||
@@ -610,6 +795,7 @@ public:
|
||||
friend future<U...> make_exception_future(std::exception_ptr ex) noexcept;
|
||||
template <typename... U, typename Exception>
|
||||
friend future<U...> make_exception_future(Exception&& ex) noexcept;
|
||||
/// \endcond
|
||||
};
|
||||
|
||||
inline
|
||||
@@ -670,12 +856,22 @@ future<T...> make_exception_future(std::exception_ptr ex) noexcept {
|
||||
return future<T...>(exception_future_marker(), std::move(ex));
|
||||
}
|
||||
|
||||
/// \brief Creates a \ref future in an available, failed state.
|
||||
///
|
||||
/// Creates a \ref future object that is already resolved in a failed
|
||||
/// state. This no I/O needs to be performed to perform a computation
|
||||
/// (for example, because the connection is closed and we cannot read
|
||||
/// from it).
|
||||
template <typename... T, typename Exception>
|
||||
inline
|
||||
future<T...> make_exception_future(Exception&& ex) noexcept {
|
||||
return make_exception_future<T...>(std::make_exception_ptr(std::forward<Exception>(ex)));
|
||||
}
|
||||
|
||||
/// @}
|
||||
|
||||
/// \cond internal
|
||||
|
||||
template<typename T>
|
||||
template<typename Func, typename... FuncArgs>
|
||||
typename futurize<T>::type futurize<T>::apply(Func&& func, std::tuple<FuncArgs...>&& args) {
|
||||
@@ -712,4 +908,6 @@ typename futurize<future<Args...>>::type futurize<future<Args...>>::apply(Func&&
|
||||
return func(std::forward<FuncArgs>(args)...);
|
||||
}
|
||||
|
||||
/// \endcond
|
||||
|
||||
#endif /* FUTURE_HH_ */
|
||||
|
||||
@@ -19,6 +19,9 @@
|
||||
* Copyright (C) 2014 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
|
||||
/// \cond internal
|
||||
|
||||
//
|
||||
// Seastar memory allocator
|
||||
//
|
||||
@@ -1198,3 +1201,5 @@ void operator delete[](void* ptr, with_alignment wa) {
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/// \endcond
|
||||
|
||||
@@ -27,8 +27,31 @@
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
|
||||
|
||||
/// \defgroup memory-module Memory management
|
||||
///
|
||||
/// Functions and classes for managing memory.
|
||||
///
|
||||
/// Memory management in seastar consists of the following:
|
||||
///
|
||||
/// - Low-level memory management in the \ref memory namespace.
|
||||
/// - Various smart pointers: \ref shared_ptr, \ref lw_shared_ptr,
|
||||
/// and \ref foreign_ptr.
|
||||
/// - zero-copy support: \ref temporary_buffer and \ref deleter.
|
||||
|
||||
/// Low-level memory management support
|
||||
///
|
||||
/// The \c memory namespace provides functions and classes for interfacing
|
||||
/// with the seastar memory allocator.
|
||||
///
|
||||
/// The seastar memory allocator splits system memory into a pool per
|
||||
/// logical core (lcore). Memory allocated one an lcore should be freed
|
||||
/// on the same lcore; failing to do so carries a severe performance
|
||||
/// penalty. It is possible to share memory with another core, but this
|
||||
/// should be limited to avoid cache coherency traffic.
|
||||
namespace memory {
|
||||
|
||||
/// \cond internal
|
||||
// TODO: Use getpagesize() in order to learn a size of a system PAGE.
|
||||
static constexpr size_t page_bits = 12;
|
||||
static constexpr size_t page_size = 1 << page_bits; // 4K
|
||||
@@ -53,11 +76,12 @@ public:
|
||||
// Returns @true if any work was actually performed.
|
||||
bool drain_cross_cpu_freelist();
|
||||
|
||||
|
||||
// We don't want the memory code calling back into the rest of
|
||||
// the system, so allow the rest of the system to tell the memory
|
||||
// code how to initiate reclaim.
|
||||
//
|
||||
// When memory is low, calling hook(fn) will result in fn being called
|
||||
// When memory is low, calling \c hook(fn) will result in fn being called
|
||||
// in a safe place wrt. allocations.
|
||||
void set_reclaim_hook(
|
||||
std::function<void (std::function<void ()>)> hook);
|
||||
@@ -78,9 +102,14 @@ struct translation {
|
||||
// translation is not known.
|
||||
translation translate(const void* addr, size_t size);
|
||||
|
||||
/// \endcond
|
||||
|
||||
class statistics;
|
||||
|
||||
/// Capture a snapshot of memory allocation statistics for this lcore.
|
||||
statistics stats();
|
||||
|
||||
/// Memory allocation statistics.
|
||||
class statistics {
|
||||
uint64_t _mallocs;
|
||||
uint64_t _frees;
|
||||
@@ -89,9 +118,14 @@ private:
|
||||
statistics(uint64_t mallocs, uint64_t frees, uint64_t cross_cpu_frees)
|
||||
: _mallocs(mallocs), _frees(frees), _cross_cpu_frees(cross_cpu_frees) {}
|
||||
public:
|
||||
/// Total number of memory allocations calls since the system was started.
|
||||
uint64_t mallocs() const { return _mallocs; }
|
||||
/// Total number of memory deallocations calls since the system was started.
|
||||
uint64_t frees() const { return _frees; }
|
||||
/// Total number of memory deallocations that occured on a different lcore
|
||||
/// than the one on which they were allocated.
|
||||
uint64_t cross_cpu_frees() const { return _cross_cpu_frees; }
|
||||
/// Total number of objects which were allocated but not freed.
|
||||
size_t live_objects() const { return mallocs() - frees(); }
|
||||
friend statistics stats();
|
||||
};
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
*/
|
||||
|
||||
#include <sys/syscall.h>
|
||||
#include "task.hh"
|
||||
#include "reactor.hh"
|
||||
#include "memory.hh"
|
||||
#include "core/posix.hh"
|
||||
|
||||
@@ -22,6 +22,18 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
/// \mainpage
|
||||
///
|
||||
/// Seastar is a high performance C++ application framework for high
|
||||
/// concurrency server applications.
|
||||
///
|
||||
/// Please see:
|
||||
/// - \ref future-module Documentation on futures and promises, which are
|
||||
/// the seastar building blocks.
|
||||
/// - \ref future-util Utililty functions for working with futures
|
||||
/// - \ref memory-module Memory management
|
||||
|
||||
|
||||
#include "sstring.hh"
|
||||
#include "future.hh"
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#ifndef SHARED_PTR_HH_
|
||||
#define SHARED_PTR_HH_
|
||||
|
||||
#include "shared_ptr_debug_helper.hh"
|
||||
#include <utility>
|
||||
#include <type_traits>
|
||||
#include <functional>
|
||||
@@ -46,6 +47,12 @@
|
||||
// and lw_enable_shared_from_this<>().
|
||||
//
|
||||
|
||||
#ifndef DEBUG_SHARED_PTR
|
||||
using shared_ptr_counter_type = long;
|
||||
#else
|
||||
using shared_ptr_counter_type = debug_shared_ptr_counter_type;
|
||||
#endif
|
||||
|
||||
template <typename T>
|
||||
class lw_shared_ptr;
|
||||
|
||||
@@ -103,7 +110,7 @@ shared_ptr<T> const_pointer_cast(const shared_ptr<U>& p);
|
||||
// CRTP from this to enable shared_from_this:
|
||||
template <typename T>
|
||||
class enable_lw_shared_from_this {
|
||||
long _count = 0;
|
||||
shared_ptr_counter_type _count = 0;
|
||||
using ctor = T;
|
||||
T* to_value() { return static_cast<T*>(this); }
|
||||
T* to_internal_object() { return static_cast<T*>(this); }
|
||||
@@ -119,7 +126,7 @@ public:
|
||||
|
||||
template <typename T>
|
||||
struct shared_ptr_no_esft {
|
||||
long _count = 0;
|
||||
shared_ptr_counter_type _count = 0;
|
||||
T _value;
|
||||
using ctor = shared_ptr_no_esft;
|
||||
|
||||
@@ -298,7 +305,7 @@ std::ostream& operator<<(std::ostream& out, const lw_shared_ptr<T>& p) {
|
||||
struct shared_ptr_count_base {
|
||||
// destructor is responsible for fully-typed deletion
|
||||
virtual ~shared_ptr_count_base() {}
|
||||
long count = 0;
|
||||
shared_ptr_counter_type count = 0;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
|
||||
66
core/shared_ptr_debug_helper.hh
Normal file
66
core/shared_ptr_debug_helper.hh
Normal file
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* This file is open source software, licensed to you under the terms
|
||||
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
|
||||
* distributed with this work for additional information regarding copyright
|
||||
* ownership. You may not use this file except in compliance with the License.
|
||||
*
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
/*
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef DEBUG_SHARED_PTR
|
||||
|
||||
#include <thread>
|
||||
#include <cassert>
|
||||
|
||||
// A counter that is only comfortable being incremented on the cpu
|
||||
// it was created on. Useful for verifying that a shared_ptr
|
||||
// or lw_shared_ptr isn't misued across cores.
|
||||
class debug_shared_ptr_counter_type {
|
||||
long _counter = 0;
|
||||
std::thread::id _cpu = std::this_thread::get_id();
|
||||
public:
|
||||
debug_shared_ptr_counter_type(long x) : _counter(x) {}
|
||||
operator long() const {
|
||||
check();
|
||||
return _counter;
|
||||
}
|
||||
debug_shared_ptr_counter_type& operator++() {
|
||||
check();
|
||||
++_counter;
|
||||
return *this;
|
||||
}
|
||||
long operator++(int) {
|
||||
check();
|
||||
return _counter++;
|
||||
}
|
||||
debug_shared_ptr_counter_type& operator--() {
|
||||
check();
|
||||
--_counter;
|
||||
return *this;
|
||||
}
|
||||
long operator--(int) {
|
||||
check();
|
||||
return _counter--;
|
||||
}
|
||||
private:
|
||||
void check() const {
|
||||
assert(_cpu == std::this_thread::get_id());
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
48
core/task.hh
Normal file
48
core/task.hh
Normal file
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* This file is open source software, licensed to you under the terms
|
||||
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
|
||||
* distributed with this work for additional information regarding copyright
|
||||
* ownership. You may not use this file except in compliance with the License.
|
||||
*
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
/*
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
class task {
|
||||
public:
|
||||
virtual ~task() noexcept {}
|
||||
virtual void run() noexcept = 0;
|
||||
};
|
||||
|
||||
void schedule(std::unique_ptr<task> t);
|
||||
|
||||
template <typename Func>
|
||||
class lambda_task final : public task {
|
||||
Func _func;
|
||||
public:
|
||||
lambda_task(const Func& func) : _func(func) {}
|
||||
lambda_task(Func&& func) : _func(std::move(func)) {}
|
||||
virtual void run() noexcept override { _func(); }
|
||||
};
|
||||
|
||||
template <typename Func>
|
||||
inline
|
||||
std::unique_ptr<task>
|
||||
make_task(Func&& func) {
|
||||
return std::make_unique<lambda_task<Func>>(std::forward<Func>(func));
|
||||
}
|
||||
@@ -98,10 +98,13 @@ future<std::unique_ptr<reply>> file_interaction_handler::read(
|
||||
sstring extension = get_extension(file_name);
|
||||
rep->set_content_type(extension);
|
||||
return engine().open_file_dma(file_name, open_flags::ro).then(
|
||||
[rep = std::move(rep)](file f) mutable {
|
||||
[rep = std::move(rep), extension, this, req = std::move(req)](file f) mutable {
|
||||
std::shared_ptr<reader> r = std::make_shared<reader>(std::move(f), std::move(rep));
|
||||
|
||||
return r->is.consume(*r).then([r]() {
|
||||
return r->is.consume(*r).then([r, extension, this, req = std::move(req)]() {
|
||||
if (transformer != nullptr) {
|
||||
transformer->transform(r->_rep->_content, *req, extension);
|
||||
}
|
||||
r->_rep->done();
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(r->_rep));
|
||||
});
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#include <experimental/optional>
|
||||
#include "core/thread.hh"
|
||||
#include "core/semaphore.hh"
|
||||
#include "core/app-template.hh"
|
||||
|
||||
Reference in New Issue
Block a user