/*
 * Copyright 2014 Cloudius Systems
 */

#include "database.hh"
#include "core/app-template.hh"
#include "core/distributed.hh"
#include "thrift/server.hh"
#include "transport/server.hh"
#include "http/httpd.hh"
#include "api/api.hh"
#include "db/config.hh"
#include "message/messaging_service.hh"
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "service/storage_service.hh"
#include "dns.hh"

#include <functional>
#include <iostream>
#include <set>
#include <utility>

namespace bpo = boost::program_options;

// Loads settings into `cfg` from the file named by the "options-file"
// command line option. When the option was not supplied, nothing is read and
// an already-resolved future is returned.
static future<>
read_config(bpo::variables_map& opts, db::config& cfg) {
    if (opts.count("options-file") == 0) {
        return make_ready_future<>();
    }
    return cfg.read_from_file(opts["options-file"].as<sstring>());
}

// Starts the storage service and prints a progress message once the start
// future has resolved.
future<> init_storage_service() {
    return service::get_storage_service().start().then([] {
        print("Start Storage service ...\n");
    });
}

// Splits a comma separated list of addresses into a set of
// gms::inet_address values.
//
// Empty tokens (leading, trailing or doubled commas) are skipped rather than
// ending the parse, so one stray comma no longer discards the seeds after it.
// Any find() result at or beyond the string length is treated as "no more
// commas", so termination does not depend on the numeric value of the
// string type's not-found sentinel.
static std::set<gms::inet_address> parse_seed_list(const sstring& seeds_str) {
    std::set<gms::inet_address> seeds;
    const size_t len = seeds_str.length();
    size_t begin = 0;
    while (begin < len) {
        const size_t comma = seeds_str.find(",", begin);
        const size_t end = comma < len ? comma : len;
        if (end > begin) {
            seeds.emplace(gms::inet_address(seeds_str.substr(begin, end - begin)));
        }
        begin = end + 1;
    }
    return seeds;
}

// Starts the messaging service on `listen_address`, then the failure detector
// and the gossiper, hands the seed addresses to the local gossiper and
// finally calls init_server() on the local storage service.
//
// listen_address: value from which a gms::inet_address can be constructed.
// seed_provider:  object whose `parameters` map may hold a "seeds" entry with
//                 a comma separated address list. When that yields no
//                 address, 127.0.0.1 is used as the only seed.
//
// Returns the future produced by storage_service::init_server().
future<> init_messaging_service(auto listen_address, auto seed_provider) {
    const gms::inet_address listen(listen_address);

    std::set<gms::inet_address> seeds;
    auto seeds_it = seed_provider.parameters.find("seeds");
    if (seeds_it != seed_provider.parameters.end()) {
        seeds = parse_seed_list(seeds_it->second);
    }
    if (seeds.empty()) {
        seeds.emplace(gms::inet_address("127.0.0.1"));
    }

    return net::get_messaging_service().start(listen).then([] {
        auto& ms = net::get_local_messaging_service();
        print("Messaging server listening on ip %s port %d ...\n", ms.listen_address(), ms.port());
        return gms::get_failure_detector().start();
    }).then([] {
        return gms::get_gossiper().start();
    }).then([seeds = std::move(seeds)] {
        auto& gossiper = gms::get_local_gossiper();
        gossiper.set_seeds(seeds);
        auto& ss = service::get_local_storage_service();
        return ss.init_server();
    });
}

// Program entry point: registers the command line options, reads the
// configuration, then brings the services up in order: database, storage
// service, messaging/gossip, query processor, CQL server, Thrift server and
// the HTTP REST API. Every step is part of one future chain that ends in
// or_terminate(), so a failure in any of them reaches or_terminate().
int main(int ac, char** av) {
    app_template app;
    auto opt_add = app.add_options();

    auto cfg = make_lw_shared<db::config>();
    cfg->add_options(opt_add)
        ("api-port", bpo::value<uint16_t>()->default_value(10000), "Http Rest API port")
        // TODO : default, always read?
        ("options-file", bpo::value<sstring>(), "cassandra.yaml file to read options from")
        ;

    distributed<database> db;
    // NOTE(review): element type assumed to be cql3::query_processor — confirm.
    distributed<cql3::query_processor> qp;
    service::storage_proxy proxy{db};
    api::http_context ctx(db);

    return app.run(ac, av, [&] {
        auto&& opts = app.configuration();

        return read_config(opts, *cfg).then([&cfg, &db, &qp, &proxy, &ctx, &opts] {
            // Copy everything needed later out of the configuration now:
            // *cfg is moved from by db.start() below.
            uint16_t thrift_port = cfg->rpc_port();
            uint16_t cql_port = cfg->native_transport_port();
            uint16_t api_port = opts["api-port"].as<uint16_t>();
            sstring listen_address = cfg->listen_address();
            sstring rpc_address = cfg->rpc_address();
            auto seed_provider = cfg->seed_provider();

            return db.start(std::move(*cfg)).then([&db] {
                engine().at_exit([&db] { return db.stop(); });
                return db.invoke_on_all(&database::init_from_data_directory);
            }).then([] {
                return init_storage_service();
            }).then([listen_address, seed_provider] {
                return init_messaging_service(listen_address, seed_provider);
            }).then([&db, &proxy, &qp] {
                return qp.start(std::ref(proxy), std::ref(db)).then([&qp] {
                    engine().at_exit([&qp] { return qp.stop(); });
                });
            }).then([rpc_address] {
                return dns::gethostbyname(rpc_address);
            }).then([&db, &proxy, &qp, cql_port, thrift_port] (dns::hostent e) {
                // NOTE(review): assumes the lookup produced at least one address — confirm.
                auto rpc_address = e.addresses[0].in.s_addr;

                // The server objects are allocated with new and never
                // deleted in this file.
                auto cserver = new distributed<cql_server>;
                // Each future is returned so that the "listening" message is
                // printed only after listen() has resolved and so that a
                // failure propagates down the chain.
                return cserver->start(std::ref(proxy), std::ref(qp)).then([cserver, cql_port, rpc_address] {
                    return cserver->invoke_on_all(&cql_server::listen, ipv4_addr{rpc_address, cql_port});
                }).then([cql_port] {
                    std::cout << "CQL server listening on port " << cql_port << " ...\n";
                }).then([&db, thrift_port, rpc_address] {
                    auto tserver = new distributed<thrift_server>;
                    return tserver->start(std::ref(db)).then([tserver, thrift_port, rpc_address] {
                        return tserver->invoke_on_all(&thrift_server::listen, ipv4_addr{rpc_address, thrift_port});
                    });
                }).then([thrift_port] {
                    std::cout << "Thrift server listening on port " << thrift_port << " ...\n";
                });
            }).then([api_port, &ctx] {
                return ctx.http_server.start().then([&ctx] {
                    return set_server(ctx);
                }).then([&ctx, api_port] {
                    return ctx.http_server.listen(api_port);
                }).then([api_port] {
                    std::cout << "Seastar HTTP server listening on port " << api_port << " ...\n";
                });
            }).or_terminate();
        });
    });
}