Merge "One stop per service" from Glauber

"Not all services define a stop method. All of them should."
This commit is contained in:
Avi Kivity
2015-07-23 18:55:44 +03:00
7 changed files with 28 additions and 0 deletions

View File

@@ -140,12 +140,18 @@ int main(int ac, char** av) {
auto rpc_address = e.addresses[0].in.s_addr;
auto cserver = new distributed<cql_server>;
cserver->start(std::ref(proxy), std::ref(qp)).then([server = std::move(cserver), cql_port, rpc_address] () mutable {
engine().at_exit([server] {
return server->stop();
});
server->invoke_on_all(&cql_server::listen, ipv4_addr{rpc_address, cql_port});
}).then([cql_port] {
std::cout << "CQL server listening on port " << cql_port << " ...\n";
});
auto tserver = new distributed<thrift_server>;
tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port, rpc_address] () mutable {
engine().at_exit([server] {
return server->stop();
});
server->invoke_on_all(&thrift_server::listen, ipv4_addr{rpc_address, thrift_port});
}).then([thrift_port] {
std::cout << "Thrift server listening on port " << thrift_port << " ...\n";

View File

@@ -16,6 +16,8 @@ namespace sstables {
future<> sstable::read_filter() {
auto ft = _filter_tracker;
return _filter_tracker->start(std::move(ft)).then([this] {
// FIXME: should stop this service. This one is definitely wrong to stop at_exit.
// We should use a Deleter class in lw_shared_ptr
if (!has_component(sstable::component_type::Filter)) {
_filter = std::make_unique<utils::filter::always_present_filter>();
return make_ready_future<>();

View File

@@ -36,6 +36,8 @@ class filter_tracker {
public:
filter_tracker(lw_shared_ptr<distributed<filter_tracker>>&& ptr) : _ptr(std::move(ptr)) {}
future<> stop() { return make_ready_future<>(); }
void add_false_positive() {
false_positive++;
}

View File

@@ -43,6 +43,15 @@ thrift_server::thrift_server(distributed<database>& db)
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) {
}
thrift_server::~thrift_server() {
}
// FIXME: this is here because we must have a stop function. But we should actually
// do something useful - or be sure it is not needed
future<> thrift_server::stop() {
return make_ready_future<>();
}
struct handler_deleter {
CassandraCobSvIfFactory* hf;
void operator()(CassandraCobSvIf* h) const {

View File

@@ -46,7 +46,9 @@ class thrift_server {
uint64_t _requests_served = 0;
public:
thrift_server(distributed<database>& db);
~thrift_server();
future<> listen(ipv4_addr addr);
future<> stop();
void do_accepts(int which);
class connection;
uint64_t total_connections() const;

View File

@@ -241,6 +241,12 @@ cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<c
{
}
// FIXME: this is here because we must have a stop function. But we should actually
// do something useful - or be sure it is not needed
future<> cql_server::stop() {
return make_ready_future<>();
}
future<>
cql_server::listen(ipv4_addr addr) {
listen_options lo;

View File

@@ -20,6 +20,7 @@ public:
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp);
future<> listen(ipv4_addr addr);
void do_accepts(int which);
future<> stop();
private:
class fmt_visitor;
class connection;