diff --git a/main.cc b/main.cc index 664b572c13..1c78434426 100644 --- a/main.cc +++ b/main.cc @@ -140,12 +140,18 @@ int main(int ac, char** av) { auto rpc_address = e.addresses[0].in.s_addr; auto cserver = new distributed; cserver->start(std::ref(proxy), std::ref(qp)).then([server = std::move(cserver), cql_port, rpc_address] () mutable { + engine().at_exit([server] { + return server->stop(); + }); server->invoke_on_all(&cql_server::listen, ipv4_addr{rpc_address, cql_port}); }).then([cql_port] { std::cout << "CQL server listening on port " << cql_port << " ...\n"; }); auto tserver = new distributed; tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port, rpc_address] () mutable { + engine().at_exit([server] { + return server->stop(); + }); server->invoke_on_all(&thrift_server::listen, ipv4_addr{rpc_address, thrift_port}); }).then([thrift_port] { std::cout << "Thrift server listening on port " << thrift_port << " ...\n"; diff --git a/sstables/filter.cc b/sstables/filter.cc index 182d12490f..e750ffca20 100644 --- a/sstables/filter.cc +++ b/sstables/filter.cc @@ -16,6 +16,8 @@ namespace sstables { future<> sstable::read_filter() { auto ft = _filter_tracker; return _filter_tracker->start(std::move(ft)).then([this] { + // FIXME: should stop this service. This one is definitely wrong to stop at_exit. + // We should use a Deleter class in lw_shared_ptr if (!has_component(sstable::component_type::Filter)) { _filter = std::make_unique(); return make_ready_future<>(); diff --git a/sstables/filter.hh b/sstables/filter.hh index 8b6c952d4e..5f4db86f7e 100644 --- a/sstables/filter.hh +++ b/sstables/filter.hh @@ -36,6 +36,8 @@ class filter_tracker { public: filter_tracker(lw_shared_ptr>&& ptr) : _ptr(std::move(ptr)) {} + future<> stop() { return make_ready_future<>(); } + void add_false_positive() { false_positive++; } diff --git a/thrift/server.cc b/thrift/server.cc index 0188673c93..3d137aeb5f 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -43,6 +43,15 @@ thrift_server::thrift_server(distributed& db) , _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) { } +thrift_server::~thrift_server() { +} + +// FIXME: this is here because we must have a stop function. But we should actually +// do something useful - or be sure it is not needed +future<> thrift_server::stop() { + return make_ready_future<>(); +} + struct handler_deleter { CassandraCobSvIfFactory* hf; void operator()(CassandraCobSvIf* h) const { diff --git a/thrift/server.hh b/thrift/server.hh index 0a4c68da28..2586017c96 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -46,7 +46,9 @@ class thrift_server { uint64_t _requests_served = 0; public: thrift_server(distributed& db); + ~thrift_server(); future<> listen(ipv4_addr addr); + future<> stop(); void do_accepts(int which); class connection; uint64_t total_connections() const; diff --git a/transport/server.cc b/transport/server.cc index 08d55d876a..069047f6cb 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -241,6 +241,12 @@ cql_server::cql_server(distributed& proxy, distributed cql_server::stop() { + return make_ready_future<>(); +} + future<> cql_server::listen(ipv4_addr addr) { listen_options lo; diff --git a/transport/server.hh b/transport/server.hh index da7255b962..a373bbf0dc 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -20,6 +20,7 @@ public: cql_server(distributed& proxy, distributed& qp); future<> listen(ipv4_addr addr); void do_accepts(int which); + future<> stop(); private: class fmt_visitor; class connection;