From a2b54fc757e87b2f94792b7936c6b4e8f3b4720a Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 27 Jul 2015 17:31:23 +0800 Subject: [PATCH] main: Introduce init.cc to cleanup service startup code This patch introduce init.cc file which hosts all the initialization code. The benefits are 1) we can share initialization code with tests code. 2) all the service startup dependency / order code is in one single place instead of everywhere. --- configure.py | 1 + init.cc | 41 ++++++++++++++++++++++++++++++++++ init.hh | 10 +++++++++ main.cc | 4 ++-- message/messaging_service.cc | 43 ------------------------------------ tests/urchin/cql_test_env.cc | 4 ++-- 6 files changed, 56 insertions(+), 47 deletions(-) create mode 100644 init.cc create mode 100644 init.hh diff --git a/configure.py b/configure.py index 3bd12f1952..8d465273c0 100755 --- a/configure.py +++ b/configure.py @@ -328,6 +328,7 @@ urchin_core = (['database.cc', 'streaming/messages/incoming_file_message.cc', 'gc_clock.cc', 'partition_slice_builder.cc', + 'init.cc' ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] diff --git a/init.cc b/init.cc new file mode 100644 index 0000000000..84b93a1601 --- /dev/null +++ b/init.cc @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "init.hh" +#include "message/messaging_service.hh" +#include "gms/failure_detector.hh" +#include "gms/gossiper.hh" + +future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider) { + const gms::inet_address listen(listen_address); + // Init messaging_service + return net::get_messaging_service().start(listen).then([]{ + engine().at_exit([] { return net::get_messaging_service().stop(); }); + }).then([] { + // Init failure_detector + return gms::get_failure_detector().start().then([] { + engine().at_exit([]{ return gms::get_failure_detector().stop(); }); + }); + }).then([listen_address, seed_provider] { + // Init gossiper + std::set seeds; + if (seed_provider.parameters.count("seeds") > 0) { + size_t begin = 0; + size_t next = 0; + sstring seeds_str = seed_provider.parameters.find("seeds")->second; + while (begin < seeds_str.length() && begin != (next=seeds_str.find(",",begin))) { + seeds.emplace(gms::inet_address(seeds_str.substr(begin,next-begin))); + begin = next+1; + } + } + if (seeds.empty()) { + seeds.emplace(gms::inet_address("127.0.0.1")); + } + return gms::get_gossiper().start().then([seeds] { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_seeds(seeds); + engine().at_exit([]{ return gms::get_gossiper().stop(); }); + }); + }); +} diff --git a/init.hh b/init.hh new file mode 100644 index 0000000000..9506ab53f0 --- /dev/null +++ b/init.hh @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ +#pragma once + +#include +#include +#include + +future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider); diff --git a/main.cc b/main.cc index f7ef5ae1c0..e86a0d9b7f 100644 --- a/main.cc +++ b/main.cc @@ -11,7 +11,6 @@ #include "http/httpd.hh" #include "api/api.hh" #include "db/config.hh" -#include "message/messaging_service.hh" #include "service/storage_service.hh" #include "streaming/stream_session.hh" #include "db/system_keyspace.hh" @@ -19,6 +18,7 @@ #include "dns.hh" #include "log.hh" #include "debug.hh" +#include "init.hh" #include namespace bpo = boost::program_options; @@ -105,7 +105,7 @@ int main(int ac, char** av) { }); }); }).then([listen_address, seed_provider] { - return net::init_messaging_service(listen_address, seed_provider); + return init_ms_fd_gossiper(listen_address, seed_provider); }).then([&db] { return streaming::stream_session::init_streaming_service(db); }).then([&proxy, &db] { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index e1693c73ca..94eef27c7c 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -135,53 +135,10 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc struct messaging_service::rpc_protocol_client_wrapper : public rpc_protocol::client { using rpc_protocol::client::client; }; struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; }; - constexpr int32_t messaging_service::current_version; distributed _the_messaging_service; -future<> deinit_messaging_service() { - return gms::get_gossiper().stop().then([] { - return gms::get_failure_detector().stop(); - }).then([] { - return net::get_messaging_service().stop(); - }).then([]{ - return service::deinit_storage_service(); - }); -} - -future<> init_messaging_service(sstring listen_address, db::seed_provider_type seed_provider) { - const gms::inet_address listen(listen_address); - std::set seeds; - if (seed_provider.parameters.count("seeds") > 0) { - size_t begin = 0; - size_t next = 0; - sstring& seeds_str = seed_provider.parameters.find("seeds")->second; - while (begin < seeds_str.length() && begin != (next=seeds_str.find(",",begin))) { - seeds.emplace(gms::inet_address(seeds_str.substr(begin,next-begin))); - begin = next+1; - } - } - if (seeds.empty()) { - seeds.emplace(gms::inet_address("127.0.0.1")); - } - - engine().at_exit([]{ - return deinit_messaging_service(); - }); - - return net::get_messaging_service().start(listen).then([seeds] { - auto& ms = net::get_local_messaging_service(); - print("Messaging server listening on ip %s port %d ...\n", ms.listen_address(), ms.port()); - return gms::get_failure_detector().start().then([seeds] { - return gms::get_gossiper().start().then([seeds] { - auto& gossiper = gms::get_local_gossiper(); - gossiper.set_seeds(seeds); - }); - }); - }); -} - bool operator==(const shard_id& x, const shard_id& y) { return x.addr == y.addr && x.cpu_id == y.cpu_id ; } diff --git a/tests/urchin/cql_test_env.cc b/tests/urchin/cql_test_env.cc index 28afb908b6..83bb901d1e 100644 --- a/tests/urchin/cql_test_env.cc +++ b/tests/urchin/cql_test_env.cc @@ -13,6 +13,7 @@ #include "service/storage_service.hh" #include "db/config.hh" #include "schema_builder.hh" +#include "init.hh" class in_memory_cql_env : public cql_test_env { public: @@ -192,8 +193,7 @@ future<> init_once() { if (!done) { done = true; return service::init_storage_service().then([] { - return net::init_messaging_service("127.0.0.1", db::config::seed_provider_type()).then([] { - }); + return init_ms_fd_gossiper("127.0.0.1", db::config::seed_provider_type()); }); } else { return make_ready_future();