443 lines
17 KiB
C++
443 lines
17 KiB
C++
/*
|
|
* This file is open source software, licensed to you under the terms
|
|
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
|
|
* distributed with this work for additional information regarding copyright
|
|
* ownership. You may not use this file except in compliance with the License.
|
|
*
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
/*
|
|
* Copyright (C) 2014 Cloudius Systems, Ltd.
|
|
*/
|
|
|
|
#ifndef DISTRIBUTED_HH_
|
|
#define DISTRIBUTED_HH_
|
|
|
|
#include "reactor.hh"
|
|
#include "future-util.hh"
|
|
|
|
/// \defgroup smp-module Multicore
|
|
///
|
|
/// \brief Support for exploiting multiple cores on a server.
|
|
///
|
|
/// Seastar supports multicore servers by using \e sharding. Each logical
|
|
/// core (lcore) runs a separate event loop, with its own memory allocator,
|
|
/// TCP/IP stack, and other services. Shards communicate by explicit message
|
|
/// passing, rather than using locks and condition variables as with traditional
|
|
/// threaded programming.
|
|
|
|
/// \addtogroup smp-module
|
|
/// @{
|
|
|
|
/// Template helper to distribute a service across all logical cores.
|
|
///
|
|
/// The \c distributed template manages a sharded service, by creating
|
|
/// a copy of the service on each shard, providing mechanisms to communicate
|
|
/// with each shard's copy, and a way to stop the service.
|
|
///
|
|
/// \tparam Service a class to be instantiated on each core. Should expose
|
|
/// a \c stop() method that returns a \c future<>, to be called when
|
|
/// the service is stopped.
|
|
template <typename Service>
|
|
class distributed {
|
|
std::vector<Service*> _instances;
|
|
public:
|
|
/// Constructs an empty \c distributed object. No instances of the service are
|
|
/// created.
|
|
distributed() {}
|
|
distributed(const distributed& other) = delete;
|
|
/// Moves a \c distributed object.
|
|
distributed(distributed&& other) = default;
|
|
distributed& operator=(const distributed& other) = delete;
|
|
/// Moves a \c distributed object.
|
|
distributed& operator=(distributed& other) = default;
|
|
/// Destroyes a \c distributed object. Must not be in a started state.
|
|
~distributed();
|
|
|
|
/// Starts \c Service by constructing an instance on every logical core
|
|
/// with a copy of \c args passed to the constructor.
|
|
///
|
|
/// \param args Arguments to be forwarded to \c Service constructor
|
|
/// \return a \ref future<> that becomes ready when all instances have been
|
|
/// constructed.
|
|
template <typename... Args>
|
|
future<> start(Args&&... args);
|
|
|
|
/// Starts \c Service by constructing an instance on a single logical core
|
|
/// with a copy of \c args passed to the constructor.
|
|
///
|
|
/// \param args Arguments to be forwarded to \c Service constructor
|
|
/// \return a \ref future<> that becomes ready when the instance has been
|
|
/// constructed.
|
|
template <typename... Args>
|
|
future<> start_single(Args&&... args);
|
|
|
|
/// Stops all started instances and destroys them.
|
|
///
|
|
/// For every started instance, its \c stop() method is called, and then
|
|
/// it is destroyed.
|
|
future<> stop();
|
|
|
|
// Invoke a method on all instances of @Service.
|
|
// The return value becomes ready when all instances have processed
|
|
// the message.
|
|
template <typename... Args>
|
|
future<> invoke_on_all(future<> (Service::*func)(Args...), Args... args);
|
|
|
|
/// Invoke a method on all \c Service instances in parallel.
|
|
///
|
|
/// \param func member function to be called. Must return \c void or
|
|
/// \c future<>.
|
|
/// \param args arguments to be passed to \c func.
|
|
/// \return future that becomes ready when the method has been invoked
|
|
/// on all instances.
|
|
template <typename... Args>
|
|
future<> invoke_on_all(void (Service::*func)(Args...), Args... args);
|
|
|
|
/// Invoke a callable on all instances of \c Service.
|
|
///
|
|
/// \param func a callable with the signature `void (Service&)`
|
|
/// or `future<> (Service&)`, to be called on each core
|
|
/// with the local instance as an argument.
|
|
/// \return a `future<>` that becomes ready when all cores have
|
|
/// processed the message.
|
|
template <typename Func>
|
|
future<> invoke_on_all(Func&& func);
|
|
|
|
/// Invoke a method on all instances of `Service` and reduce the results using
|
|
/// `Reducer`.
|
|
///
|
|
/// \see map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
|
|
template <typename Reducer, typename Ret, typename... FuncArgs, typename... Args>
|
|
inline
|
|
auto
|
|
map_reduce(Reducer&& r, Ret (Service::*func)(FuncArgs...), Args&&... args)
|
|
-> typename reducer_traits<Reducer>::future_type
|
|
{
|
|
unsigned c = 0;
|
|
return ::map_reduce(_instances.begin(), _instances.end(),
|
|
[&c, func, args = std::make_tuple(std::forward<Args>(args)...)] (Service* inst) mutable {
|
|
return smp::submit_to(c++, [inst, func, args] () mutable {
|
|
return apply([inst, func] (Args&&... args) mutable {
|
|
return (inst->*func)(std::forward<Args>(args)...);
|
|
}, std::move(args));
|
|
});
|
|
}, std::forward<Reducer>(r));
|
|
}
|
|
|
|
/// Invoke a callable on all instances of `Service` and reduce the results using
|
|
/// `Reducer`.
|
|
///
|
|
/// \see map_reduce(Iterator begin, Iterator end, Mapper&& mapper, Reducer&& r)
|
|
template <typename Reducer, typename Func>
|
|
inline
|
|
auto map_reduce(Reducer&& r, Func&& func) -> typename reducer_traits<Reducer>::future_type
|
|
{
|
|
unsigned c = 0;
|
|
return ::map_reduce(_instances.begin(), _instances.end(),
|
|
[&c, &func] (Service* inst) mutable {
|
|
return smp::submit_to(c++, [inst, func] () mutable {
|
|
return func(*inst);
|
|
});
|
|
}, std::forward<Reducer>(r));
|
|
}
|
|
|
|
/// Applies a map function to all shards, then reduces the output by calling a reducer function.
|
|
///
|
|
/// \param map callable with the signature `Value (Service&)` or
|
|
/// `future<Value> (Service&)` (for some `Value` type).
|
|
/// used as the second input to \c reduce
|
|
/// \param initial initial value used as the first input to \c reduce.
|
|
/// \param reduce binary function used to left-fold the return values of \c map
|
|
/// into \c initial .
|
|
///
|
|
/// Each \c map invocation runs on the shard associated with the service.
|
|
///
|
|
/// \tparam Mapper unary function taking `Service&` and producing some result.
|
|
/// \tparam Initial any value type
|
|
/// \tparam Reduce a binary function taking two Initial values and returning an Initial
|
|
/// \return Result of applying `map` to each instance in parallel, reduced by calling
|
|
/// `reduce()` on each adjacent pair of results.
|
|
template <typename Mapper, typename Initial, typename Reduce>
|
|
inline
|
|
future<Initial>
|
|
map_reduce0(Mapper map, Initial initial, Reduce reduce) {
|
|
auto wrapped_map = [this, map] (unsigned c) {
|
|
return smp::submit_to(c, [map, inst = _instances[c]] {
|
|
return map(*inst);
|
|
});
|
|
};
|
|
return ::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
|
|
std::move(wrapped_map),
|
|
std::move(initial),
|
|
std::move(reduce));
|
|
}
|
|
|
|
/// Invoke a method on a specific instance of `Service`.
|
|
///
|
|
/// \param id shard id to call
|
|
/// \param func a method of `Service`
|
|
/// \param args arguments to be passed to `func`
|
|
/// \return result of calling `func(args)` on the designated instance
|
|
template <typename Ret, typename... FuncArgs, typename... Args>
|
|
std::enable_if_t<is_future<Ret>::value, Ret>
|
|
invoke_on(unsigned id, Ret (Service::*func)(FuncArgs...), Args&&... args) {
|
|
auto inst = _instances[id];
|
|
return smp::submit_to(id, [inst, func, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
|
|
return apply([inst, func] (Args&&... args) mutable {
|
|
return (inst->*func)(std::forward<Args>(args)...);
|
|
}, std::move(args));
|
|
});
|
|
}
|
|
|
|
/// Invoke a method on a specific instance of `Service`.
|
|
///
|
|
/// \param id shard id to call
|
|
/// \param func a method of `Service`
|
|
/// \param args arguments to be passed to `func`
|
|
/// \return result of calling `func(args)` on the designated instance
|
|
template <typename Ret, typename... FuncArgs, typename... Args>
|
|
std::enable_if_t<!is_future<Ret>::value && !std::is_same<Ret, void>::value, future<Ret>>
|
|
invoke_on(unsigned id, Ret (Service::*func)(FuncArgs...), Args&&... args) {
|
|
auto inst = _instances[id];
|
|
return smp::submit_to(id, [inst, func, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
|
|
return apply([inst, func] (Args&&... args) mutable {
|
|
return make_ready_future<Ret>((inst->*func)(std::forward<Args>(args)...));
|
|
}, std::move(args));
|
|
});
|
|
}
|
|
|
|
template <typename... FuncArgs, typename... Args>
|
|
void invoke_on(unsigned id, void (Service::*func)(FuncArgs...), Args&&... args) {
|
|
auto inst = _instances[id];
|
|
smp::submit_to(id, [inst, func, args...] () mutable {
|
|
(inst->*func)(std::forward<Args>(args)...);
|
|
});
|
|
}
|
|
|
|
/// Invoke a callable on a specific instance of `Service`.
|
|
///
|
|
/// \param id shard id to call
|
|
/// \param func a callable with signature `Value (Service&)` or
|
|
/// `future<Value> (Service&)` (for some `Value` type)
|
|
/// \return result of calling `func(instance)` on the designated instance
|
|
template <typename Func>
|
|
futurize_t<std::result_of_t<Func(Service&)>>
|
|
invoke_on(unsigned id, Func&& func) {
|
|
auto inst = _instances[id];
|
|
return smp::submit_to(id, [inst, func = std::forward<Func>(func)] () mutable {
|
|
return func(*inst);
|
|
});
|
|
}
|
|
|
|
/// Gets a reference to the local instance.
|
|
Service& local();
|
|
|
|
/// Checks whether the local instance has been initialized.
|
|
bool local_is_initialized();
|
|
};
|
|
|
|
template <typename Service>
|
|
distributed<Service>::~distributed() {
|
|
assert(_instances.empty());
|
|
}
|
|
|
|
template <typename Service>
|
|
template <typename... Args>
|
|
future<>
|
|
distributed<Service>::start(Args&&... args) {
|
|
_instances.resize(smp::count);
|
|
unsigned c = 0;
|
|
return parallel_for_each(_instances.begin(), _instances.end(),
|
|
[this, &c, args = std::make_tuple(std::forward<Args>(args)...)] (Service*& inst) mutable {
|
|
return smp::submit_to(c++, [&inst, args] () mutable {
|
|
inst = apply([] (Args&&... args) {
|
|
return new Service(std::forward<Args>(args)...);
|
|
}, std::move(args));
|
|
});
|
|
});
|
|
}
|
|
|
|
template <typename Service>
|
|
template <typename... Args>
|
|
future<>
|
|
distributed<Service>::start_single(Args&&... args) {
|
|
assert(_instances.empty());
|
|
_instances.resize(1);
|
|
return smp::submit_to(0, [this, args = std::make_tuple(std::forward<Args>(args)...)] () mutable {
|
|
_instances[0] = apply([] (Args&&... args) {
|
|
return new Service(std::forward<Args>(args)...);
|
|
}, std::move(args));
|
|
});
|
|
}
|
|
|
|
template <typename Service>
|
|
future<>
|
|
distributed<Service>::stop() {
|
|
unsigned c = 0;
|
|
return parallel_for_each(_instances.begin(), _instances.end(), [&c] (Service*& inst) mutable {
|
|
return smp::submit_to(c++, [inst] () mutable {
|
|
return inst->stop().then([inst] () mutable {
|
|
delete inst;
|
|
});
|
|
});
|
|
}).then([this] {
|
|
_instances.clear();
|
|
});
|
|
}
|
|
|
|
template <typename Service>
|
|
template <typename... Args>
|
|
inline
|
|
future<>
|
|
distributed<Service>::invoke_on_all(future<> (Service::*func)(Args...), Args... args) {
|
|
unsigned c = 0;
|
|
return parallel_for_each(_instances.begin(), _instances.end(), [&c, func, args...] (Service* inst) {
|
|
return smp::submit_to(c++, [inst, func, args...] {
|
|
return (inst->*func)(args...);
|
|
});
|
|
});
|
|
}
|
|
|
|
template <typename Service>
|
|
template <typename... Args>
|
|
inline
|
|
future<>
|
|
distributed<Service>::invoke_on_all(void (Service::*func)(Args...), Args... args) {
|
|
unsigned c = 0;
|
|
return parallel_for_each(_instances.begin(), _instances.end(), [&c, func, args...] (Service* inst) {
|
|
return smp::submit_to(c++, [inst, func, args...] {
|
|
(inst->*func)(args...);
|
|
});
|
|
});
|
|
}
|
|
|
|
template <typename Service>
|
|
template <typename Func>
|
|
inline
|
|
future<>
|
|
distributed<Service>::invoke_on_all(Func&& func) {
|
|
static_assert(std::is_same<futurize_t<std::result_of_t<Func(Service&)>>, future<>>::value,
|
|
"invoke_on_all()'s func must return void or future<>");
|
|
unsigned c = 0;
|
|
return parallel_for_each(_instances.begin(), _instances.end(), [&c, &func] (Service* inst) {
|
|
return smp::submit_to(c++, [inst, func] {
|
|
return func(*inst);
|
|
});
|
|
});
|
|
}
|
|
|
|
template <typename Service>
|
|
Service& distributed<Service>::local() {
|
|
assert(local_is_initialized());
|
|
return *_instances[engine().cpu_id()];
|
|
}
|
|
|
|
template <typename Service>
|
|
inline bool distributed<Service>::local_is_initialized() {
|
|
return _instances.size() > engine().cpu_id() &&
|
|
_instances[engine().cpu_id()];
|
|
}
|
|
|
|
/// Smart pointer wrapper which makes it safe to move across CPUs.
|
|
///
|
|
/// \c foreign_ptr<> is a smart pointer wrapper which, unlike
|
|
/// \ref shared_ptr and \ref lw_shared_ptr, is safe to move to a
|
|
/// different core.
|
|
///
|
|
/// As seastar avoids locking, any but the most trivial objects must
|
|
/// be destroyed on the same core they were created on, so that,
|
|
/// for example, their destructors can unlink references to the
|
|
/// object from various containers. In addition, for performance
|
|
/// reasons, the shared pointer types do not use atomic operations
|
|
/// to manage their reference counts. As a result they cannot be
|
|
/// used on multiple cores in parallel.
|
|
///
|
|
/// \c foreign_ptr<> provides a solution to that problem.
|
|
/// \c foreign_ptr<> wraps any pointer type -- raw pointer,
|
|
/// \ref shared_ptr<>, or similar, and remembers on what core this
|
|
/// happened. When the \c foreign_ptr<> object is destroyed, it
|
|
/// sends a message to the original core so that the wrapped object
|
|
/// can be safely destroyed.
|
|
///
|
|
/// \c foreign_ptr<> is a move-only object; it cannot be copied.
|
|
///
|
|
template <typename PtrType>
|
|
class foreign_ptr {
|
|
private:
|
|
PtrType _value;
|
|
unsigned _cpu;
|
|
private:
|
|
bool on_origin() {
|
|
return engine().cpu_id() == _cpu;
|
|
}
|
|
public:
|
|
using element_type = typename std::pointer_traits<PtrType>::element_type;
|
|
|
|
/// Constructs a null \c foreign_ptr<>.
|
|
foreign_ptr()
|
|
: _value(PtrType())
|
|
, _cpu(engine().cpu_id()) {
|
|
}
|
|
/// Constructs a null \c foreign_ptr<>.
|
|
foreign_ptr(std::nullptr_t) : foreign_ptr() {}
|
|
/// Wraps a pointer object and remembers the current core.
|
|
foreign_ptr(PtrType value)
|
|
: _value(std::move(value))
|
|
, _cpu(engine().cpu_id()) {
|
|
}
|
|
// The type is intentionally non-copyable because copies
|
|
// are expensive because each copy requires across-CPU call.
|
|
foreign_ptr(const foreign_ptr&) = delete;
|
|
/// Moves a \c foreign_ptr<> to another object.
|
|
foreign_ptr(foreign_ptr&& other) = default;
|
|
/// Destroys the wrapped object on its original cpu.
|
|
~foreign_ptr() {
|
|
if (_value && !on_origin()) {
|
|
smp::submit_to(_cpu, [v = std::move(_value)] () mutable {
|
|
auto local(std::move(v));
|
|
});
|
|
}
|
|
}
|
|
/// release the wrapped object on a local cpu. If executed on cpu
|
|
/// other than the one object was created on object will be copied
|
|
/// to local memory.
|
|
element_type make_local_and_release() {
|
|
if (on_origin()) {
|
|
return std::move(*_value);
|
|
} else {
|
|
// copied to caller's cpu here
|
|
return *_value;
|
|
}
|
|
}
|
|
/// Accesses the wrapped object.
|
|
element_type& operator*() const { return *_value; }
|
|
/// Accesses the wrapped object.
|
|
element_type* operator->() const { return &*_value; }
|
|
/// Checks whether the wrapped pointer is non-null.
|
|
operator bool() const { return static_cast<bool>(_value); }
|
|
/// Move-assigns a \c foreign_ptr<>.
|
|
foreign_ptr& operator=(foreign_ptr&& other) = default;
|
|
};
|
|
|
|
/// Wraps a raw or smart pointer object in a \ref foreign_ptr<>.
|
|
///
|
|
/// \relates foreign_ptr
|
|
template <typename T>
|
|
foreign_ptr<T> make_foreign(T ptr) {
|
|
return foreign_ptr<T>(std::move(ptr));
|
|
}
|
|
|
|
/// @}
|
|
|
|
#endif /* DISTRIBUTED_HH_ */
|