From 8218ab792268ea1bea596353362dffcee31ad76a Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 26 Oct 2015 20:37:19 +0800 Subject: [PATCH] storage_service: Implement start_native_transport and start_rpc_server They are used for APIs. Share the code in main.cc as well. --- main.cc | 34 +++-------------- service/storage_service.cc | 75 +++++++++++++++++++++++++------------- service/storage_service.hh | 7 ++++ 3 files changed, 63 insertions(+), 53 deletions(-) diff --git a/main.cc b/main.cc index 7d20227500..118eff2e63 100644 --- a/main.cc +++ b/main.cc @@ -196,10 +196,7 @@ int main(int ac, char** av) { cfg->log_to_stdout(), cfg->log_to_syslog()); dht::set_global_partitioner(cfg->partitioner()); auto start_thrift = cfg->start_rpc(); - uint16_t thrift_port = cfg->rpc_port(); - uint16_t cql_port = cfg->native_transport_port(); uint16_t api_port = cfg->api_port(); - transport::cql_load_balance lb = transport::parse_load_balance(cfg->load_balance()); ctx.api_dir = cfg->api_ui_dir(); ctx.api_doc = cfg->api_doc_dir(); sstring cluster_name = cfg->cluster_name(); @@ -329,32 +326,13 @@ int main(int ac, char** av) { lb->start_broadcasting(); service::get_local_storage_service().set_load_broadcaster(lb); engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); }); - }).then([rpc_address] { - return dns::gethostbyname(rpc_address); - }).then([&db, &proxy, &qp, rpc_address, cql_port, thrift_port, start_thrift, lb] (dns::hostent e) { - auto ip = e.addresses[0].in.s_addr; - auto cserver = new distributed; - cserver->start(std::ref(proxy), std::ref(qp), lb).then([server = std::move(cserver), cql_port, rpc_address, ip] () mutable { - // #293 - do not stop anything - //engine().at_exit([server] { - // return server->stop(); - //}); - server->invoke_on_all(&transport::cql_server::listen, ipv4_addr{ip, cql_port}); - }).then([rpc_address, cql_port] { - print("Starting listening for CQL clients on %s:%s...\n", rpc_address, cql_port); + }).then([start_thrift] () { + return service::get_local_storage_service().start_native_transport().then([start_thrift] () { + if (start_thrift) { + return service::get_local_storage_service().start_rpc_server(); + } + return make_ready_future<>(); }); - if (start_thrift) { - auto tserver = new distributed; - tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port, rpc_address, ip] () mutable { - // #293 - do not stop anything - //engine().at_exit([server] { - // return server->stop(); - //}); - server->invoke_on_all(&thrift_server::listen, ipv4_addr{ip, thrift_port}); - }).then([rpc_address, thrift_port] { - print("Thrift server listening on %s:%s ...\n", rpc_address, thrift_port); - }); - } }).then([api_address] { return dns::gethostbyname(api_address); }).then([&db, api_address, api_port, &ctx] (dns::hostent e){ diff --git a/service/storage_service.cc b/service/storage_service.cc index 3849e2f10e..09171e8a17 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -61,6 +61,9 @@ #include #include #include "service/load_broadcaster.hh" +#include "thrift/server.hh" +#include "transport/server.hh" +#include "dns.hh" using token = dht::token; using UUID = utils::UUID; @@ -1438,15 +1441,30 @@ future storage_service::true_snapshots_size() { } future<> storage_service::start_rpc_server() { - fail(unimplemented::cause::STORAGE_SERVICE); -#if 0 - if (daemon == null) - { - throw new IllegalStateException("No configured daemon"); - } - daemon.thriftServer.start(); -#endif - return make_ready_future<>(); + return get_storage_service().invoke_on(0, [] (storage_service& ss) { + if (ss._thrift_server) { + return make_ready_future<>(); + } + + auto tserver = make_shared>(); + ss._thrift_server = tserver; + + auto& cfg = ss._db.local().get_config(); + auto port = cfg.rpc_port(); + auto addr = cfg.rpc_address(); + return dns::gethostbyname(addr).then([&ss, tserver, addr, port] (dns::hostent e) { + auto ip = e.addresses[0].in.s_addr; + return tserver->start(std::ref(ss._db)).then([tserver, port, addr, ip] { + // #293 - do not stop anything + //engine().at_exit([tserver] { + // return tserver->stop(); + //}); + return tserver->invoke_on_all(&thrift_server::listen, ipv4_addr{ip, port}); + }); + }).then([addr, port] { + print("Thrift server listening on %s:%s ...\n", addr, port); + }); + }); } future<> storage_service::stop_rpc_server() { @@ -1478,23 +1496,30 @@ bool storage_service::is_rpc_server_running() { } future<> storage_service::start_native_transport() { - fail(unimplemented::cause::STORAGE_SERVICE); -#if 0 - if (daemon == null) - { - throw new IllegalStateException("No configured daemon"); - } + return get_storage_service().invoke_on(0, [] (storage_service& ss) { + if (ss._cql_server) { + return make_ready_future<>(); + } + auto cserver = make_shared>(); + ss._cql_server = cserver; - try - { - daemon.nativeServer.start(); - } - catch (Exception e) - { - throw new RuntimeException("Error starting native transport: " + e.getMessage()); - } -#endif - return make_ready_future<>(); + auto& cfg = ss._db.local().get_config(); + auto port = cfg.native_transport_port(); + auto addr = cfg.rpc_address(); + transport::cql_load_balance lb = transport::parse_load_balance(cfg.load_balance()); + return dns::gethostbyname(addr).then([cserver, addr, port, lb] (dns::hostent e) { + auto ip = e.addresses[0].in.s_addr; + return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb).then([cserver, port, addr, ip] { + // #293 - do not stop anything + //engine().at_exit([cserver] { + // return cserver->stop(); + //}); + return cserver->invoke_on_all(&transport::cql_server::listen, ipv4_addr{ip, port}); + }); + }).then([addr, port] { + print("Starting listening for CQL clients on %s:%s...\n", addr, port); + }); + }); } future<> storage_service::stop_native_transport() { diff --git a/service/storage_service.hh b/service/storage_service.hh index ae44e0d455..8a2e8f7836 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -56,6 +56,11 @@ #include #include "streaming/stream_state.hh" +namespace transport { + class cql_server; +} +class thrift_server; + namespace service { class load_broadcaster; @@ -96,6 +101,8 @@ private: // ever arise. bool _loading_new_sstables = false; shared_ptr _lb; + shared_ptr> _cql_server; + shared_ptr> _thrift_server; public: storage_service(distributed& db) : _db(db) {