diff --git a/core/distributed.hh b/core/distributed.hh index 1ce84847ff..23a4ae0ec1 100644 --- a/core/distributed.hh +++ b/core/distributed.hh @@ -111,6 +111,37 @@ public: }, std::forward(r)); } + /// Applies a map function to all shards, then reduces the output by calling a reducer function. + /// + /// \param mapper map function accepting a \c Service& and returning a value + /// used as the second input to \c reduce + /// \param initial initial value used as the first input to \c reduce . + /// \param reduce binary function used to left-fold the return values of \c map + /// into \c initial . + /// + /// Each \c map invocation runs on the shard associated with the service. + /// + /// Requirements: + /// Mapper: unary function taking \c Service@ and producing some result. + /// Initial: any value type + /// Reduce: a binary function taking two Initial values and returning an Initial + /// Returns: + /// future + template + inline + future + map_reduce0(Mapper map, Initial initial, Reduce reduce) { + auto wrapped_map = [this, map] (unsigned c) { + return smp::submit_to(c, [map, inst = _instances[c]] { + return map(*inst); + }); + }; + return ::map_reduce(smp::all_cpus().begin(), smp::all_cpus().end(), + std::move(wrapped_map), + std::move(initial), + std::move(reduce)); + } + // Invoke a method on a specific instance of @Service. // The return value (which must be a future) contains the future // returned by @Service. diff --git a/core/future-util.hh b/core/future-util.hh index bfc69ff87e..005597e6b5 100644 --- a/core/future-util.hh +++ b/core/future-util.hh @@ -19,6 +19,9 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ + +/** @file */ + #ifndef CORE_FUTURE_UTIL_HH_ #define CORE_FUTURE_UTIL_HH_ @@ -48,6 +51,28 @@ parallel_for_each(Iterator begin, Iterator end, Func&& func) { return ret; } +/// \brief parallel_for_each - run tasks in parallel +/// +/// Given a range [\c begin, \c end) of objects, run \c func(object) for +/// each object in the range, and return a future<> that resolves when all +/// the functions complete. @func should return a future<> that indicates +/// when it is complete. +/// +/// \param range A range of objects to iterate run \c func on +/// \param func A callable, accepting reference to the range's +/// \c value_type, and returning a \c future<>. +/// \return a \c future<> that becomes ready when the entire range +/// was processed. If one or more of the invocations of +/// \c func returned an exceptional future, then the return +/// value will contain one of those exceptions. +template +inline +future<> +parallel_for_each(Range&& range, Func&& func) { + return parallel_for_each(std::begin(range), std::end(range), + std::forward(func)); +} + // The AsyncAction concept represents an action which can complete later than // the actual function invocation. It is represented by a function which // returns a future which resolves when the action is done. diff --git a/tests/distributed_test.cc b/tests/distributed_test.cc index c80cb4941f..0f149267e5 100644 --- a/tests/distributed_test.cc +++ b/tests/distributed_test.cc @@ -28,6 +28,10 @@ struct X { sstring echo(sstring arg) { return arg; } + int cpu_id_squared() const { + auto id = engine().cpu_id(); + return id * id; + } future<> stop() { return make_ready_future<>(); } }; @@ -81,6 +85,21 @@ future<> test_constructor_argument_is_passed_to_each_core() { }); } +future<> test_map_reduce() { + return do_with_distributed([] (distributed& x) { + return x.start().then([&x] { + return x.map_reduce0(std::mem_fn(&X::cpu_id_squared), + 0, + std::plus()).then([] (int result) { + int n = smp::count - 1; + if (result != (n * (n + 1) * (2*n + 1)) / 6) { + throw std::runtime_error("map_reduce failed"); + } + }); + }); + }); +} + int main(int argc, char** argv) { app_template app; return app.run(argc, argv, [] { @@ -88,6 +107,8 @@ int main(int argc, char** argv) { return test_functor_version(); }).then([] { return test_constructor_argument_is_passed_to_each_core(); + }).then([] { + return test_map_reduce(); }).then([] { return engine().exit(0); }).or_terminate();