From bb87afcd1f849ebe749fe4a3328a9fd761cc9bae Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 14 May 2015 15:07:44 +0800 Subject: [PATCH] main: Wire up gossip and friends Integrate messaging_service and failure_detector and gossiper into urchin. To start a 3 nodes cluster on a single host: ./seastar --listen-address 127.0.0.1 --seed-provider-parameters 127.0.0.1 ./seastar --listen-address 127.0.0.2 --seed-provider-parameters 127.0.0.1 ./seastar --listen-address 127.0.0.3 --seed-provider-parameters 127.0.0.1 --- main.cc | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/main.cc b/main.cc index ba1fa107f9..1cca9c304e 100644 --- a/main.cc +++ b/main.cc @@ -11,6 +11,9 @@ #include "http/httpd.hh" #include "api/api.hh" #include "db/config.hh" +#include "message/messaging_service.hh" +#include "gms/failure_detector.hh" +#include "gms/gossiper.hh" namespace bpo = boost::program_options; @@ -44,6 +47,8 @@ int main(int ac, char** av) { uint16_t thrift_port = cfg->rpc_port(); uint16_t cql_port = cfg->native_transport_port(); uint16_t api_port = opts["api-port"].as(); + sstring listen_address = cfg->listen_address(); + auto seed_provider= cfg->seed_provider(); return read_config(opts, *cfg).then([cfg, &db]() { return db.start(std::move(*cfg)); @@ -67,6 +72,28 @@ int main(int ac, char** av) { }).then([thrift_port] { std::cout << "Thrift server listening on port " << thrift_port << " ...\n"; }); + }).then([listen_address, seed_provider] { + const gms::inet_address listen(listen_address); + std::set seeds; + for (auto& x : seed_provider.parameters) { + seeds.emplace(x.first); + } + net::get_messaging_service().start(listen).then([seeds] { + auto& ms = net::get_local_messaging_service(); + print("Messaging server listening on ip %s port %d ...\n", ms.listen_address(), ms.port()); + gms::get_failure_detector().start_single().then([seeds] { + gms::get_gossiper().start_single().then([seeds] { + auto& gossiper = gms::get_local_gossiper(); + gossiper.set_seeds(seeds); + using namespace std::chrono; + auto now = high_resolution_clock::now().time_since_epoch(); + int generation_number = duration_cast(now).count(); + gossiper.start(generation_number).then([] { + print("Start gossiper service ...\n"); + }); + }); + }); + }); }).then([&db, api_port, &ctx]{ ctx.http_server.start().then([api_port, &ctx] { return set_server(ctx);