main: Introduce init.cc to cleanup service startup code

This patch introduce init.cc file which hosts all the initialization
code. The benefits are 1) we can share initialization code with tests
code. 2) all the service startup dependency / order code is in one
single place instead of everywhere.
This commit is contained in:
Asias He
2015-07-27 17:31:23 +08:00
parent 9990191666
commit a2b54fc757
6 changed files with 56 additions and 47 deletions

View File

@@ -328,6 +328,7 @@ urchin_core = (['database.cc',
'streaming/messages/incoming_file_message.cc',
'gc_clock.cc',
'partition_slice_builder.cc',
'init.cc'
]
+ [Antlr3Grammar('cql3/Cql.g')]
+ [Thrift('interface/cassandra.thrift', 'Cassandra')]

41
init.cc Normal file
View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "init.hh"
#include "message/messaging_service.hh"
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider) {
const gms::inet_address listen(listen_address);
// Init messaging_service
return net::get_messaging_service().start(listen).then([]{
engine().at_exit([] { return net::get_messaging_service().stop(); });
}).then([] {
// Init failure_detector
return gms::get_failure_detector().start().then([] {
engine().at_exit([]{ return gms::get_failure_detector().stop(); });
});
}).then([listen_address, seed_provider] {
// Init gossiper
std::set<gms::inet_address> seeds;
if (seed_provider.parameters.count("seeds") > 0) {
size_t begin = 0;
size_t next = 0;
sstring seeds_str = seed_provider.parameters.find("seeds")->second;
while (begin < seeds_str.length() && begin != (next=seeds_str.find(",",begin))) {
seeds.emplace(gms::inet_address(seeds_str.substr(begin,next-begin)));
begin = next+1;
}
}
if (seeds.empty()) {
seeds.emplace(gms::inet_address("127.0.0.1"));
}
return gms::get_gossiper().start().then([seeds] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_seeds(seeds);
engine().at_exit([]{ return gms::get_gossiper().stop(); });
});
});
}

10
init.hh Normal file
View File

@@ -0,0 +1,10 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <db/config.hh>
future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider);

View File

@@ -11,7 +11,6 @@
#include "http/httpd.hh"
#include "api/api.hh"
#include "db/config.hh"
#include "message/messaging_service.hh"
#include "service/storage_service.hh"
#include "streaming/stream_session.hh"
#include "db/system_keyspace.hh"
@@ -19,6 +18,7 @@
#include "dns.hh"
#include "log.hh"
#include "debug.hh"
#include "init.hh"
#include <cstdio>
namespace bpo = boost::program_options;
@@ -105,7 +105,7 @@ int main(int ac, char** av) {
});
});
}).then([listen_address, seed_provider] {
return net::init_messaging_service(listen_address, seed_provider);
return init_ms_fd_gossiper(listen_address, seed_provider);
}).then([&db] {
return streaming::stream_session::init_streaming_service(db);
}).then([&proxy, &db] {

View File

@@ -135,53 +135,10 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
struct messaging_service::rpc_protocol_client_wrapper : public rpc_protocol::client { using rpc_protocol::client::client; };
struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; };
constexpr int32_t messaging_service::current_version;
distributed<messaging_service> _the_messaging_service;
future<> deinit_messaging_service() {
return gms::get_gossiper().stop().then([] {
return gms::get_failure_detector().stop();
}).then([] {
return net::get_messaging_service().stop();
}).then([]{
return service::deinit_storage_service();
});
}
future<> init_messaging_service(sstring listen_address, db::seed_provider_type seed_provider) {
const gms::inet_address listen(listen_address);
std::set<gms::inet_address> seeds;
if (seed_provider.parameters.count("seeds") > 0) {
size_t begin = 0;
size_t next = 0;
sstring& seeds_str = seed_provider.parameters.find("seeds")->second;
while (begin < seeds_str.length() && begin != (next=seeds_str.find(",",begin))) {
seeds.emplace(gms::inet_address(seeds_str.substr(begin,next-begin)));
begin = next+1;
}
}
if (seeds.empty()) {
seeds.emplace(gms::inet_address("127.0.0.1"));
}
engine().at_exit([]{
return deinit_messaging_service();
});
return net::get_messaging_service().start(listen).then([seeds] {
auto& ms = net::get_local_messaging_service();
print("Messaging server listening on ip %s port %d ...\n", ms.listen_address(), ms.port());
return gms::get_failure_detector().start().then([seeds] {
return gms::get_gossiper().start().then([seeds] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_seeds(seeds);
});
});
});
}
bool operator==(const shard_id& x, const shard_id& y) {
return x.addr == y.addr && x.cpu_id == y.cpu_id ;
}

View File

@@ -13,6 +13,7 @@
#include "service/storage_service.hh"
#include "db/config.hh"
#include "schema_builder.hh"
#include "init.hh"
class in_memory_cql_env : public cql_test_env {
public:
@@ -192,8 +193,7 @@ future<> init_once() {
if (!done) {
done = true;
return service::init_storage_service().then([] {
return net::init_messaging_service("127.0.0.1", db::config::seed_provider_type()).then([] {
});
return init_ms_fd_gossiper("127.0.0.1", db::config::seed_provider_type());
});
} else {
return make_ready_future();