diff --git a/configure.py b/configure.py index b1207a7966..154b7c954b 100755 --- a/configure.py +++ b/configure.py @@ -154,7 +154,7 @@ tests = [ 'tests/shared_ptr_test', 'tests/slab_test', 'tests/fstream_test', - 'tests/map_reduce_test', + 'tests/distributed_test', 'tests/rpc', ] @@ -351,7 +351,7 @@ deps = { 'tests/shared_ptr_test': ['tests/shared_ptr_test.cc'] + core, 'tests/slab_test': ['tests/slab_test.cc'] + core, 'tests/fstream_test': ['tests/fstream_test.cc'] + core, - 'tests/map_reduce_test': ['tests/map_reduce_test.cc'] + core, + 'tests/distributed_test': ['tests/distributed_test.cc'] + core, 'tests/rpc': ['tests/rpc.cc'] + core + libnet, } diff --git a/core/distributed.hh b/core/distributed.hh index f9e5002292..e49a0e105a 100644 --- a/core/distributed.hh +++ b/core/distributed.hh @@ -174,7 +174,7 @@ distributed::start(Args&&... args) { unsigned c = 0; return parallel_for_each(_instances.begin(), _instances.end(), [this, &c, args = std::make_tuple(std::forward(args)...)] (Service*& inst) mutable { - return smp::submit_to(c++, [&inst, args = std::move(args)] () mutable { + return smp::submit_to(c++, [&inst, args] () mutable { inst = apply([] (Args&&... args) { return new Service(std::forward(args)...); }, std::move(args)); diff --git a/core/slab.hh b/core/slab.hh index e661ef5cfc..c3e9db21bc 100644 --- a/core/slab.hh +++ b/core/slab.hh @@ -222,7 +222,7 @@ public: auto objects = max_object_size / _size; desc = new slab_page_desc(slab_page, objects, _size, _slab_class_id, slab_page_index); } catch (const std::bad_alloc& e) { - // FIXME: Is there really a need to re-throw std::bad_alloc? + ::free(slab_page); throw std::bad_alloc{}; } @@ -328,7 +328,7 @@ private: auto objects = _max_object_size / object_size; for (auto i = 0u; i < objects; i++, object += object_size) { if (!desc.empty()) { - // if binary_search returns false, it means that object at the current + // if binary_search returns true, it means that object at the current // offset isn't an item. if (std::binary_search(free_objects.begin(), free_objects.end(), object)) { continue; diff --git a/test.py b/test.py index ead7219589..ef180db1a3 100755 --- a/test.py +++ b/test.py @@ -75,7 +75,7 @@ if __name__ == "__main__": for test in boost_tests: test_to_run.append((os.path.join(prefix, test),'boost')) test_to_run.append(('tests/memcached/test.py --mode ' + mode + (' --fast' if args.fast else ''),'other')) - test_to_run.append((os.path.join(prefix, 'map_reduce_test') + ' -c 2','other')) + test_to_run.append((os.path.join(prefix, 'distributed_test') + ' -c 2','other')) allocator_test_path = os.path.join(prefix, 'allocator_test') diff --git a/tests/distributed_test.cc b/tests/distributed_test.cc new file mode 100644 index 0000000000..c80cb4941f --- /dev/null +++ b/tests/distributed_test.cc @@ -0,0 +1,95 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "core/app-template.hh" +#include "core/distributed.hh" +#include "core/future-util.hh" + +struct X { + sstring echo(sstring arg) { + return arg; + } + future<> stop() { return make_ready_future<>(); } +}; + +template +future<> do_with_distributed(Func&& func) { + auto x = make_shared>(); + return func(*x).finally([x] { + return x->stop(); + }).finally([x]{}); +} + +future<> test_that_each_core_gets_the_arguments() { + return do_with_distributed([] (auto& x) { + return x.start().then([&x] { + return x.map_reduce([] (sstring msg){ + if (msg != "hello") { + throw std::runtime_error("wrong message"); + } + }, &X::echo, sstring("hello")); + }); + }); +} + +future<> test_functor_version() { + return do_with_distributed([] (auto& x) { + return x.start().then([&x] { + return x.map_reduce([] (sstring msg){ + if (msg != "hello") { + throw std::runtime_error("wrong message"); + } + }, [] (X& x) { return x.echo("hello"); }); + }); + }); +} + +struct Y { + sstring s; + Y(sstring s) : s(std::move(s)) {} + future<> stop() { return make_ready_future<>(); } +}; + +future<> test_constructor_argument_is_passed_to_each_core() { + return do_with_distributed([] (auto& y) { + return y.start(sstring("hello")).then([&y] { + return y.invoke_on_all([] (Y& y) { + if (y.s != "hello") { + throw std::runtime_error(sprint("expected message mismatch, is \"%s\"", y.s)); + } + }); + }); + }); +} + +int main(int argc, char** argv) { + app_template app; + return app.run(argc, argv, [] { + test_that_each_core_gets_the_arguments().then([] { + return test_functor_version(); + }).then([] { + return test_constructor_argument_is_passed_to_each_core(); + }).then([] { + return engine().exit(0); + }).or_terminate(); + }); +} diff --git a/tests/map_reduce_test.cc b/tests/map_reduce_test.cc deleted file mode 100644 index 80056bc74d..0000000000 --- a/tests/map_reduce_test.cc +++ /dev/null @@ -1,71 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Copyright (C) 2015 Cloudius Systems, Ltd. - */ - -#include "core/app-template.hh" -#include "core/distributed.hh" -#include "core/future-util.hh" - -struct X { - sstring echo(sstring arg) { - return arg; - } - future<> stop() { return make_ready_future<>(); } -}; - -template -future<> do_with_distributed(Func&& func) { - auto x = make_shared>(); - return func(*x).finally([x] { - return x->stop(); - }).finally([x]{}); -} - -future<> test_that_each_core_gets_the_arguments() { - return do_with_distributed([] (auto& x) { - return x.map_reduce([] (sstring msg){ - if (msg != "hello") { - throw std::runtime_error("wrong message"); - } - }, &X::echo, sstring("hello")); - }); -} - -future<> test_functor_version() { - return do_with_distributed([] (auto& x) { - return x.map_reduce([] (sstring msg){ - if (msg != "hello") { - throw std::runtime_error("wrong message"); - } - }, [] (X& x) { return x.echo("hello"); }); - }); -} - -int main(int argc, char** argv) { - app_template app; - return app.run(argc, argv, [] { - test_that_each_core_gets_the_arguments().then([] { - return test_functor_version(); - }).then([] { - return engine().exit(0); - }).or_terminate(); - }); -}