diff --git a/configure.py b/configure.py index 3bd12f1952..8d465273c0 100755 --- a/configure.py +++ b/configure.py @@ -328,6 +328,7 @@ urchin_core = (['database.cc', 'streaming/messages/incoming_file_message.cc', 'gc_clock.cc', 'partition_slice_builder.cc', + 'init.cc' ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] diff --git a/init.cc b/init.cc new file mode 100644 index 0000000000..84b93a1601 --- /dev/null +++ b/init.cc @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "init.hh" +#include "message/messaging_service.hh" +#include "gms/failure_detector.hh" +#include "gms/gossiper.hh" + +future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider) { + const gms::inet_address listen(listen_address); + // Init messaging_service + return net::get_messaging_service().start(listen).then([]{ + engine().at_exit([] { return net::get_messaging_service().stop(); }); + }).then([] { + // Init failure_detector + return gms::get_failure_detector().start().then([] { + engine().at_exit([]{ return gms::get_failure_detector().stop(); }); + }); + }).then([listen_address, seed_provider] { + // Init gossiper + std::set seeds; + if (seed_provider.parameters.count("seeds") > 0) { + size_t begin = 0; + size_t next = 0; + sstring seeds_str = seed_provider.parameters.find("seeds")->second; + while (begin < seeds_str.length() && begin != (next=seeds_str.find(",",begin))) { + seeds.emplace(gms::inet_address(seeds_str.substr(begin,next-begin))); + begin = next+1; + } + } + if (seeds.empty()) { + seeds.emplace(gms::inet_address("127.0.0.1")); + } + return gms::get_gossiper().start().then([seeds] { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_seeds(seeds); + engine().at_exit([]{ return gms::get_gossiper().stop(); }); + }); + }); +} diff --git a/init.hh b/init.hh new file mode 100644 index 0000000000..9506ab53f0 --- /dev/null +++ b/init.hh @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ +#pragma once + +#include +#include +#include + +future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider); diff --git a/main.cc b/main.cc index f7ef5ae1c0..e86a0d9b7f 100644 --- a/main.cc +++ b/main.cc @@ -11,7 +11,6 @@ #include "http/httpd.hh" #include "api/api.hh" #include "db/config.hh" -#include "message/messaging_service.hh" #include "service/storage_service.hh" #include "streaming/stream_session.hh" #include "db/system_keyspace.hh" @@ -19,6 +18,7 @@ #include "dns.hh" #include "log.hh" #include "debug.hh" +#include "init.hh" #include namespace bpo = boost::program_options; @@ -105,7 +105,7 @@ int main(int ac, char** av) { }); }); }).then([listen_address, seed_provider] { - return net::init_messaging_service(listen_address, seed_provider); + return init_ms_fd_gossiper(listen_address, seed_provider); }).then([&db] { return streaming::stream_session::init_streaming_service(db); }).then([&proxy, &db] { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index e1693c73ca..94eef27c7c 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -135,53 +135,10 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc struct messaging_service::rpc_protocol_client_wrapper : public rpc_protocol::client { using rpc_protocol::client::client; }; struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; }; - constexpr int32_t messaging_service::current_version; distributed _the_messaging_service; -future<> deinit_messaging_service() { - return gms::get_gossiper().stop().then([] { - return gms::get_failure_detector().stop(); - }).then([] { - return net::get_messaging_service().stop(); - }).then([]{ - return service::deinit_storage_service(); - }); -} - -future<> init_messaging_service(sstring listen_address, db::seed_provider_type seed_provider) { - const gms::inet_address listen(listen_address); - std::set seeds; - if (seed_provider.parameters.count("seeds") > 0) { - size_t begin = 0; - size_t next = 0; - sstring& seeds_str = seed_provider.parameters.find("seeds")->second; - while (begin < seeds_str.length() && begin != (next=seeds_str.find(",",begin))) { - seeds.emplace(gms::inet_address(seeds_str.substr(begin,next-begin))); - begin = next+1; - } - } - if (seeds.empty()) { - seeds.emplace(gms::inet_address("127.0.0.1")); - } - - engine().at_exit([]{ - return deinit_messaging_service(); - }); - - return net::get_messaging_service().start(listen).then([seeds] { - auto& ms = net::get_local_messaging_service(); - print("Messaging server listening on ip %s port %d ...\n", ms.listen_address(), ms.port()); - return gms::get_failure_detector().start().then([seeds] { - return gms::get_gossiper().start().then([seeds] { - auto& gossiper = gms::get_local_gossiper(); - gossiper.set_seeds(seeds); - }); - }); - }); -} - bool operator==(const shard_id& x, const shard_id& y) { return x.addr == y.addr && x.cpu_id == y.cpu_id ; } diff --git a/tests/urchin/cql_test_env.cc b/tests/urchin/cql_test_env.cc index 28afb908b6..83bb901d1e 100644 --- a/tests/urchin/cql_test_env.cc +++ b/tests/urchin/cql_test_env.cc @@ -13,6 +13,7 @@ #include "service/storage_service.hh" #include "db/config.hh" #include "schema_builder.hh" +#include "init.hh" class in_memory_cql_env : public cql_test_env { public: @@ -192,8 +193,7 @@ future<> init_once() { if (!done) { done = true; return service::init_storage_service().then([] { - return net::init_messaging_service("127.0.0.1", db::config::seed_provider_type()).then([] { - }); + return init_ms_fd_gossiper("127.0.0.1", db::config::seed_provider_type()); }); } else { return make_ready_future();