storage_service: Implement start_native_transport and start_rpc_server

They are used for APIs. Share the code in main.cc as well.
This commit is contained in:
Asias He
2015-10-26 20:37:19 +08:00
parent 3c47844e8c
commit 8218ab7922
3 changed files with 63 additions and 53 deletions

34
main.cc
View File

@@ -196,10 +196,7 @@ int main(int ac, char** av) {
cfg->log_to_stdout(), cfg->log_to_syslog());
dht::set_global_partitioner(cfg->partitioner());
auto start_thrift = cfg->start_rpc();
uint16_t thrift_port = cfg->rpc_port();
uint16_t cql_port = cfg->native_transport_port();
uint16_t api_port = cfg->api_port();
transport::cql_load_balance lb = transport::parse_load_balance(cfg->load_balance());
ctx.api_dir = cfg->api_ui_dir();
ctx.api_doc = cfg->api_doc_dir();
sstring cluster_name = cfg->cluster_name();
@@ -329,32 +326,13 @@ int main(int ac, char** av) {
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
}).then([rpc_address] {
return dns::gethostbyname(rpc_address);
}).then([&db, &proxy, &qp, rpc_address, cql_port, thrift_port, start_thrift, lb] (dns::hostent e) {
auto ip = e.addresses[0].in.s_addr;
auto cserver = new distributed<transport::cql_server>;
cserver->start(std::ref(proxy), std::ref(qp), lb).then([server = std::move(cserver), cql_port, rpc_address, ip] () mutable {
// #293 - do not stop anything
//engine().at_exit([server] {
// return server->stop();
//});
server->invoke_on_all(&transport::cql_server::listen, ipv4_addr{ip, cql_port});
}).then([rpc_address, cql_port] {
print("Starting listening for CQL clients on %s:%s...\n", rpc_address, cql_port);
}).then([start_thrift] () {
return service::get_local_storage_service().start_native_transport().then([start_thrift] () {
if (start_thrift) {
return service::get_local_storage_service().start_rpc_server();
}
return make_ready_future<>();
});
if (start_thrift) {
auto tserver = new distributed<thrift_server>;
tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port, rpc_address, ip] () mutable {
// #293 - do not stop anything
//engine().at_exit([server] {
// return server->stop();
//});
server->invoke_on_all(&thrift_server::listen, ipv4_addr{ip, thrift_port});
}).then([rpc_address, thrift_port] {
print("Thrift server listening on %s:%s ...\n", rpc_address, thrift_port);
});
}
}).then([api_address] {
return dns::gethostbyname(api_address);
}).then([&db, api_address, api_port, &ctx] (dns::hostent e){

View File

@@ -61,6 +61,9 @@
#include <boost/range/adaptors.hpp>
#include <boost/range/algorithm.hpp>
#include "service/load_broadcaster.hh"
#include "thrift/server.hh"
#include "transport/server.hh"
#include "dns.hh"
using token = dht::token;
using UUID = utils::UUID;
@@ -1438,15 +1441,30 @@ future<int64_t> storage_service::true_snapshots_size() {
}
future<> storage_service::start_rpc_server() {
fail(unimplemented::cause::STORAGE_SERVICE);
#if 0
if (daemon == null)
{
throw new IllegalStateException("No configured daemon");
}
daemon.thriftServer.start();
#endif
return make_ready_future<>();
return get_storage_service().invoke_on(0, [] (storage_service& ss) {
if (ss._thrift_server) {
return make_ready_future<>();
}
auto tserver = make_shared<distributed<thrift_server>>();
ss._thrift_server = tserver;
auto& cfg = ss._db.local().get_config();
auto port = cfg.rpc_port();
auto addr = cfg.rpc_address();
return dns::gethostbyname(addr).then([&ss, tserver, addr, port] (dns::hostent e) {
auto ip = e.addresses[0].in.s_addr;
return tserver->start(std::ref(ss._db)).then([tserver, port, addr, ip] {
// #293 - do not stop anything
//engine().at_exit([tserver] {
// return tserver->stop();
//});
return tserver->invoke_on_all(&thrift_server::listen, ipv4_addr{ip, port});
});
}).then([addr, port] {
print("Thrift server listening on %s:%s ...\n", addr, port);
});
});
}
future<> storage_service::stop_rpc_server() {
@@ -1478,23 +1496,30 @@ bool storage_service::is_rpc_server_running() {
}
future<> storage_service::start_native_transport() {
fail(unimplemented::cause::STORAGE_SERVICE);
#if 0
if (daemon == null)
{
throw new IllegalStateException("No configured daemon");
}
return get_storage_service().invoke_on(0, [] (storage_service& ss) {
if (ss._cql_server) {
return make_ready_future<>();
}
auto cserver = make_shared<distributed<transport::cql_server>>();
ss._cql_server = cserver;
try
{
daemon.nativeServer.start();
}
catch (Exception e)
{
throw new RuntimeException("Error starting native transport: " + e.getMessage());
}
#endif
return make_ready_future<>();
auto& cfg = ss._db.local().get_config();
auto port = cfg.native_transport_port();
auto addr = cfg.rpc_address();
transport::cql_load_balance lb = transport::parse_load_balance(cfg.load_balance());
return dns::gethostbyname(addr).then([cserver, addr, port, lb] (dns::hostent e) {
auto ip = e.addresses[0].in.s_addr;
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), lb).then([cserver, port, addr, ip] {
// #293 - do not stop anything
//engine().at_exit([cserver] {
// return cserver->stop();
//});
return cserver->invoke_on_all(&transport::cql_server::listen, ipv4_addr{ip, port});
});
}).then([addr, port] {
print("Starting listening for CQL clients on %s:%s...\n", addr, port);
});
});
}
future<> storage_service::stop_native_transport() {

View File

@@ -56,6 +56,11 @@
#include <seastar/core/distributed.hh>
#include "streaming/stream_state.hh"
namespace transport {
class cql_server;
}
class thrift_server;
namespace service {
class load_broadcaster;
@@ -96,6 +101,8 @@ private:
// ever arise.
bool _loading_new_sstables = false;
shared_ptr<load_broadcaster> _lb;
shared_ptr<distributed<transport::cql_server>> _cql_server;
shared_ptr<distributed<thrift_server>> _thrift_server;
public:
storage_service(distributed<database>& db)
: _db(db) {