Merge branch 'master' of github.com:cloudius-systems/urchin into db

This commit is contained in:
Avi Kivity
2015-06-15 11:00:47 +03:00
2 changed files with 55 additions and 49 deletions

View File

@@ -130,7 +130,7 @@ std::basic_ostream<Args...> & operator<<(std::basic_ostream<Args...> & os, const
int n = 0;
for (auto& e : map) {
if (n > 0) {
os << ",";
os << ":";
}
os << e.first << "=" << e.second;
}
@@ -142,9 +142,9 @@ std::basic_istream<Args...> & operator>>(std::basic_istream<Args...> & is, db::c
std::string str;
is >> str;
std::regex comma(",");
std::regex colon(":");
std::sregex_token_iterator s(str.begin(), str.end(), comma, -1);
std::sregex_token_iterator s(str.begin(), str.end(), colon, -1);
std::sregex_token_iterator e;
while (s != e) {
sstring p = std::string(*s++);

98
main.cc
View File

@@ -36,8 +36,14 @@ future<> init_storage_service() {
future<> init_messaging_service(auto listen_address, auto seed_provider) {
const gms::inet_address listen(listen_address);
std::set<gms::inet_address> seeds;
for (auto& x : seed_provider.parameters) {
seeds.emplace(x.first);
if (seed_provider.parameters.count("seeds") > 0) {
size_t begin = 0;
size_t next = 0;
sstring& seeds_str = seed_provider.parameters.find("seeds")->second;
while (begin < seeds_str.length() && begin != (next=seeds_str.find(",",begin))) {
seeds.emplace(gms::inet_address(seeds_str.substr(begin,next-begin)));
begin = next+1;
}
}
if (seeds.empty()) {
seeds.emplace(gms::inet_address("127.0.0.1"));
@@ -75,50 +81,50 @@ int main(int ac, char** av) {
return app.run(ac, av, [&] {
auto&& opts = app.configuration();
uint16_t thrift_port = cfg->rpc_port();
uint16_t cql_port = cfg->native_transport_port();
uint16_t api_port = opts["api-port"].as<uint16_t>();
sstring listen_address = cfg->listen_address();
sstring rpc_address = cfg->rpc_address();
auto seed_provider= cfg->seed_provider();
return read_config(opts, *cfg).then([cfg, &db]() {
return db.start(std::move(*cfg));
}).then([&db] {
engine().at_exit([&db] { return db.stop(); });
return db.invoke_on_all(&database::init_from_data_directory);
}).then([] {
return init_storage_service();
}).then([listen_address, seed_provider] {
return init_messaging_service(listen_address, seed_provider);
}).then([&db, &proxy, &qp] {
return qp.start(std::ref(proxy), std::ref(db)).then([&qp] {
engine().at_exit([&qp] { return qp.stop(); });
});
}).then([rpc_address] {
return dns::gethostbyname(rpc_address);
}).then([&db, &proxy, &qp, cql_port, thrift_port] (dns::hostent e) {
auto rpc_address = e.addresses[0].in.s_addr;
auto cserver = new distributed<cql_server>;
cserver->start(std::ref(proxy), std::ref(qp)).then([server = std::move(cserver), cql_port, rpc_address] () mutable {
server->invoke_on_all(&cql_server::listen, ipv4_addr{rpc_address, cql_port});
}).then([cql_port] {
std::cout << "CQL server listening on port " << cql_port << " ...\n";
});
auto tserver = new distributed<thrift_server>;
tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port, rpc_address] () mutable {
server->invoke_on_all(&thrift_server::listen, ipv4_addr{rpc_address, thrift_port});
}).then([thrift_port] {
std::cout << "Thrift server listening on port " << thrift_port << " ...\n";
});
}).then([&db, api_port, &ctx]{
ctx.http_server.start().then([api_port, &ctx] {
return set_server(ctx);
}).then([&ctx, api_port] {
ctx.http_server.listen(api_port);
}).then([api_port] {
std::cout << "Seastar HTTP server listening on port " << api_port << " ...\n";
});
}).or_terminate();
return read_config(opts, *cfg).then([&cfg, &db, &qp, &proxy, &ctx, &server, &opts]() {
uint16_t thrift_port = cfg->rpc_port();
uint16_t cql_port = cfg->native_transport_port();
uint16_t api_port = opts["api-port"].as<uint16_t>();
sstring listen_address = cfg->listen_address();
sstring rpc_address = cfg->rpc_address();
auto seed_provider= cfg->seed_provider();
return db.start(std::move(*cfg)).then([&db, &qp, &proxy, &ctx, &server] {
engine().at_exit([&db] { return db.stop(); });
return db.invoke_on_all(&database::init_from_data_directory);
}).then([] {
return init_storage_service();
}).then([listen_address, seed_provider] {
return init_messaging_service(listen_address, seed_provider);
}).then([&db, &proxy, &qp] {
return qp.start(std::ref(proxy), std::ref(db)).then([&qp] {
engine().at_exit([&qp] { return qp.stop(); });
});
}).then([rpc_address] {
return dns::gethostbyname(rpc_address);
}).then([&db, &proxy, &qp, cql_port, thrift_port] (dns::hostent e) {
auto rpc_address = e.addresses[0].in.s_addr;
auto cserver = new distributed<cql_server>;
cserver->start(std::ref(proxy), std::ref(qp)).then([server = std::move(cserver), cql_port, rpc_address] () mutable {
server->invoke_on_all(&cql_server::listen, ipv4_addr{rpc_address, cql_port});
}).then([cql_port] {
std::cout << "CQL server listening on port " << cql_port << " ...\n";
});
auto tserver = new distributed<thrift_server>;
tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port, rpc_address] () mutable {
server->invoke_on_all(&thrift_server::listen, ipv4_addr{rpc_address, thrift_port});
}).then([thrift_port] {
std::cout << "Thrift server listening on port " << thrift_port << " ...\n";
});
}).then([&db, api_port, &ctx]{
ctx.http_server.start().then([api_port, &ctx] {
return set_server(ctx);
}).then([&ctx, api_port] {
ctx.http_server.listen(api_port);
}).then([api_port] {
std::cout << "Seastar HTTP server listening on port " << api_port << " ...\n";
});
}).or_terminate();
});
});
}