From f3c89945353bfff6abb6aacbd9ce817e8416978e Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 12 May 2015 16:57:50 +0300 Subject: [PATCH 1/2] distributed: add map_reduce() variant accepting an initial value Separating the initial value (and accumulator) from the reducer function can result in simpler invocations. Unfortunately, the name conflicts with another variant, so we have to name the method map_reduce0. --- core/distributed.hh | 31 +++++++++++++++++++++++++++++++ tests/distributed_test.cc | 21 +++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/core/distributed.hh b/core/distributed.hh index 1ce84847ff..23a4ae0ec1 100644 --- a/core/distributed.hh +++ b/core/distributed.hh @@ -111,6 +111,37 @@ public: }, std::forward(r)); } + /// Applies a map function to all shards, then reduces the output by calling a reducer function. + /// + /// \param mapper map function accepting a \c Service& and returning a value + /// used as the second input to \c reduce + /// \param initial initial value used as the first input to \c reduce . + /// \param reduce binary function used to left-fold the return values of \c map + /// into \c initial . + /// + /// Each \c map invocation runs on the shard associated with the service. + /// + /// Requirements: + /// Mapper: unary function taking \c Service@ and producing some result. + /// Initial: any value type + /// Reduce: a binary function taking two Initial values and returning an Initial + /// Returns: + /// future + template + inline + future + map_reduce0(Mapper map, Initial initial, Reduce reduce) { + auto wrapped_map = [this, map] (unsigned c) { + return smp::submit_to(c, [map, inst = _instances[c]] { + return map(*inst); + }); + }; + return ::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(), + std::move(wrapped_map), + std::move(initial), + std::move(reduce)); + } + // Invoke a method on a specific instance of @Service. // The return value (which must be a future) contains the future // returned by @Service. diff --git a/tests/distributed_test.cc b/tests/distributed_test.cc index c80cb4941f..0f149267e5 100644 --- a/tests/distributed_test.cc +++ b/tests/distributed_test.cc @@ -28,6 +28,10 @@ struct X { sstring echo(sstring arg) { return arg; } + int cpu_id_squared() const { + auto id = engine().cpu_id(); + return id * id; + } future<> stop() { return make_ready_future<>(); } }; @@ -81,6 +85,21 @@ future<> test_constructor_argument_is_passed_to_each_core() { }); } +future<> test_map_reduce() { + return do_with_distributed([] (distributed& x) { + return x.start().then([&x] { + return x.map_reduce0(std::mem_fn(&X::cpu_id_squared), + 0, + std::plus()).then([] (int result) { + int n = smp::count - 1; + if (result != (n * (n + 1) * (2*n + 1)) / 6) { + throw std::runtime_error("map_reduce failed"); + } + }); + }); + }); +} + int main(int argc, char** argv) { app_template app; return app.run(argc, argv, [] { @@ -88,6 +107,8 @@ int main(int argc, char** argv) { return test_functor_version(); }).then([] { return test_constructor_argument_is_passed_to_each_core(); + }).then([] { + return test_map_reduce(); }).then([] { return engine().exit(0); }).or_terminate(); From bf527c2842d4bd6392b783e3f15d96b29a61eeee Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 13 May 2015 10:53:13 +0300 Subject: [PATCH 2/2] future: add range-based parallel_for_each variant --- core/future-util.hh | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/core/future-util.hh b/core/future-util.hh index bfc69ff87e..005597e6b5 100644 --- a/core/future-util.hh +++ b/core/future-util.hh @@ -19,6 +19,9 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ + +/** @file */ + #ifndef CORE_FUTURE_UTIL_HH_ #define CORE_FUTURE_UTIL_HH_ @@ -48,6 +51,28 @@ parallel_for_each(Iterator begin, Iterator end, Func&& func) { return ret; } +/// \brief parallel_for_each - run tasks in parallel +/// +/// Given a range [\c begin, \c end) of objects, run \c func(object) for +/// each object in the range, and return a future<> that resolves when all +/// the functions complete. @func should return a future<> that indicates +/// when it is complete. +/// +/// \param range A range of objects to iterate run \c func on +/// \param func A callable, accepting reference to the range's +/// \c value_type, and returning a \c future<>. +/// \return a \c future<> that becomes ready when the entire range +/// was processed. If one or more of the invocations of +/// \c func returned an exceptional future, then the return +/// value will contain one of those exceptions. +template +inline +future<> +parallel_for_each(Range&& range, Func&& func) { + return parallel_for_each(std::begin(range), std::end(range), + std::forward(func)); +} + // The AsyncAction concept represents an action which can complete later than // the actual function invocation. It is represented by a function which // returns a future which resolves when the action is done.