Merge branch 'master' of github.com:cloudius-systems/seastar into db

This commit is contained in:
Avi Kivity
2015-05-13 10:57:53 +03:00
3 changed files with 77 additions and 0 deletions

View File

@@ -111,6 +111,37 @@ public:
}, std::forward<Reducer>(r));
}
/// Applies a map function to all shards, then reduces the output by calling a reducer function.
///
/// \param mapper map function accepting a \c Service& and returning a value
/// used as the second input to \c reduce
/// \param initial initial value used as the first input to \c reduce .
/// \param reduce binary function used to left-fold the return values of \c map
/// into \c initial .
///
/// Each \c map invocation runs on the shard associated with the service.
///
/// Requirements:
/// Mapper: unary function taking \c Service@ and producing some result.
/// Initial: any value type
/// Reduce: a binary function taking two Initial values and returning an Initial
/// Returns:
/// future<Initial>
template <typename Mapper, typename Initial, typename Reduce>
inline
future<Initial>
map_reduce0(Mapper map, Initial initial, Reduce reduce) {
auto wrapped_map = [this, map] (unsigned c) {
return smp::submit_to(c, [map, inst = _instances[c]] {
return map(*inst);
});
};
return ::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(),
std::move(wrapped_map),
std::move(initial),
std::move(reduce));
}
// Invoke a method on a specific instance of @Service.
// The return value (which must be a future) contains the future
// returned by @Service.

View File

@@ -19,6 +19,9 @@
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
/** @file */
#ifndef CORE_FUTURE_UTIL_HH_
#define CORE_FUTURE_UTIL_HH_
@@ -48,6 +51,28 @@ parallel_for_each(Iterator begin, Iterator end, Func&& func) {
return ret;
}
/// \brief parallel_for_each - run tasks in parallel
///
/// Given a range [\c begin, \c end) of objects, run \c func(object) for
/// each object in the range, and return a future<> that resolves when all
/// the functions complete. @func should return a future<> that indicates
/// when it is complete.
///
/// \param range A range of objects to iterate run \c func on
/// \param func A callable, accepting reference to the range's
/// \c value_type, and returning a \c future<>.
/// \return a \c future<> that becomes ready when the entire range
/// was processed. If one or more of the invocations of
/// \c func returned an exceptional future, then the return
/// value will contain one of those exceptions.
template <typename Range, typename Func>
inline
future<>
parallel_for_each(Range&& range, Func&& func) {
return parallel_for_each(std::begin(range), std::end(range),
std::forward<Func>(func));
}
// The AsyncAction concept represents an action which can complete later than
// the actual function invocation. It is represented by a function which
// returns a future which resolves when the action is done.

View File

@@ -28,6 +28,10 @@ struct X {
sstring echo(sstring arg) {
return arg;
}
int cpu_id_squared() const {
auto id = engine().cpu_id();
return id * id;
}
future<> stop() { return make_ready_future<>(); }
};
@@ -81,6 +85,21 @@ future<> test_constructor_argument_is_passed_to_each_core() {
});
}
future<> test_map_reduce() {
return do_with_distributed<X>([] (distributed<X>& x) {
return x.start().then([&x] {
return x.map_reduce0(std::mem_fn(&X::cpu_id_squared),
0,
std::plus<int>()).then([] (int result) {
int n = smp::count - 1;
if (result != (n * (n + 1) * (2*n + 1)) / 6) {
throw std::runtime_error("map_reduce failed");
}
});
});
});
}
int main(int argc, char** argv) {
app_template app;
return app.run(argc, argv, [] {
@@ -88,6 +107,8 @@ int main(int argc, char** argv) {
return test_functor_version();
}).then([] {
return test_constructor_argument_is_passed_to_each_core();
}).then([] {
return test_map_reduce();
}).then([] {
return engine().exit(0);
}).or_terminate();