From c653cc1910fbce2be7d6e692ac1787afb61d7b85 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:35:22 +0300 Subject: [PATCH 01/13] posix: wire up shutdown() API --- core/posix.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/posix.hh b/core/posix.hh index 788b3835aa..d226592673 100644 --- a/core/posix.hh +++ b/core/posix.hh @@ -117,6 +117,10 @@ public: throw_system_error_on(ret == -1); return file_desc(ret); } + void shutdown(int how) { + auto ret = ::shutdown(_fd, how); + throw_system_error_on(ret == -1); + } void truncate(size_t size) { auto ret = ::ftruncate(_fd, size); throw_system_error_on(ret); From 1b99b1f170e89f1408e4c8bb067b99c278037e44 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:35:48 +0300 Subject: [PATCH 02/13] queue: add method to abort a queue Destroy all queued objects, blow waiting readers or writers with an exception. --- core/queue.hh | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/core/queue.hh b/core/queue.hh index d1f6d09670..c2c3739201 100644 --- a/core/queue.hh +++ b/core/queue.hh @@ -76,6 +76,22 @@ public: future<> push_eventually(T&& data); size_t size() const { return _q.size(); } + + // Destroy any items in the queue, and pass the provided exception to any + // waiting readers or writers. + void abort(std::exception_ptr ex) { + while (!_q.empty()) { + _q.pop(); + } + if (_not_full) { + _not_full->set_exception(ex); + _not_full= std::experimental::nullopt; + } + if (_not_empty) { + _not_empty->set_exception(std::move(ex)); + _not_empty = std::experimental::nullopt; + } + } }; template From 2a24e8f5caa0b873b6b127c03cc28f50ad7c87a2 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:37:43 +0300 Subject: [PATCH 03/13] reactor: add methods to abort futures waiting on fd event If they're waiting, blow them away with an exception, and unsubscribe the events from epoll. --- core/reactor.cc | 26 ++++++++++++++++++++++++++ core/reactor.hh | 24 ++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/core/reactor.cc b/core/reactor.cc index 4d0b084f67..f035caf64b 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -281,6 +281,24 @@ future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd, return (pfd.*pr).get_future(); } +void reactor_backend_epoll::abort_fd(pollable_fd_state& pfd, std::exception_ptr ex, + promise<> pollable_fd_state::* pr, int event) { + if (pfd.events_epoll & event) { + pfd.events_epoll &= ~event; + auto ctl = pfd.events_epoll ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + ::epoll_event eevt; + eevt.events = pfd.events_epoll; + eevt.data.ptr = &pfd; + int r = ::epoll_ctl(_epollfd.get(), ctl, pfd.fd.get(), &eevt); + assert(r == 0); + } + if (pfd.events_requested & event) { + pfd.events_requested &= ~event; + (pfd.*pr).set_exception(std::move(ex)); + } + pfd.events_known &= ~event; +} + future<> reactor_backend_epoll::readable(pollable_fd_state& fd) { return get_epoll_future(fd, &pollable_fd_state::pollin, EPOLLIN); } @@ -289,6 +307,14 @@ future<> reactor_backend_epoll::writeable(pollable_fd_state& fd) { return get_epoll_future(fd, &pollable_fd_state::pollout, EPOLLOUT); } +void reactor_backend_epoll::abort_reader(pollable_fd_state& fd, std::exception_ptr ex) { + abort_fd(fd, std::move(ex), &pollable_fd_state::pollin, EPOLLIN); +} + +void reactor_backend_epoll::abort_writer(pollable_fd_state& fd, std::exception_ptr ex) { + abort_fd(fd, std::move(ex), &pollable_fd_state::pollout, EPOLLOUT); +} + void reactor_backend_epoll::forget(pollable_fd_state& fd) { if (fd.events_epoll) { ::epoll_ctl(_epollfd.get(), EPOLL_CTL_DEL, fd.fd.get(), nullptr); diff --git a/core/reactor.hh b/core/reactor.hh index 0a76a2da9c..1fd58f025b 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -167,6 +167,8 @@ public: future<> write_all(net::packet& p); future<> readable(); future<> writeable(); + void abort_reader(std::exception_ptr ex); + void abort_writer(std::exception_ptr ex); future accept(); future sendmsg(struct msghdr *msg); future recvmsg(struct msghdr *msg); @@ -566,6 +568,8 @@ private: promise<> pollable_fd_state::* pr, int event); void complete_epoll_event(pollable_fd_state& fd, promise<> pollable_fd_state::* pr, int events, int event); + void abort_fd(pollable_fd_state& fd, std::exception_ptr ex, + promise<> pollable_fd_state::* pr, int event); public: reactor_backend_epoll(); virtual ~reactor_backend_epoll() override { } @@ -575,6 +579,8 @@ public: virtual void forget(pollable_fd_state& fd) override; virtual future<> notified(reactor_notifier *n) override; virtual std::unique_ptr make_reactor_notifier() override; + void abort_reader(pollable_fd_state& fd, std::exception_ptr ex); + void abort_writer(pollable_fd_state& fd, std::exception_ptr ex); }; #ifdef HAVE_OSV @@ -863,6 +869,12 @@ public: future<> notified(reactor_notifier *n) { return _backend.notified(n); } + void abort_reader(pollable_fd_state& fd, std::exception_ptr ex) { + return _backend.abort_reader(fd, std::move(ex)); + } + void abort_writer(pollable_fd_state& fd, std::exception_ptr ex) { + return _backend.abort_writer(fd, std::move(ex)); + } void enable_timer(clock_type::time_point when); std::unique_ptr make_reactor_notifier() { return _backend.make_reactor_notifier(); @@ -1151,6 +1163,18 @@ future<> pollable_fd::writeable() { return engine().writeable(*_s); } +inline +void +pollable_fd::abort_reader(std::exception_ptr ex) { + engine().abort_reader(*_s, std::move(ex)); +} + +inline +void +pollable_fd::abort_writer(std::exception_ptr ex) { + engine().abort_writer(*_s, std::move(ex)); +} + inline future pollable_fd::accept() { return engine().accept(*_s); From 31d38012153173e082fac2c287906974599aa3ec Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:38:52 +0300 Subject: [PATCH 04/13] reactor: wire up shutdown() on pollable_fd --- core/reactor.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/core/reactor.hh b/core/reactor.hh index 1fd58f025b..f0a4f9cace 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -174,6 +174,7 @@ public: future recvmsg(struct msghdr *msg); future sendto(socket_address addr, const void* buf, size_t len); file_desc& get_file_desc() const { return _s->fd; } + void shutdown(int how) { _s->fd.shutdown(how); } void close() { _s.reset(); } protected: int get_fd() const { return _s->fd.get(); } From 9f7e31494c4d47a39ee6383429897fde2be8f3cc Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:41:40 +0300 Subject: [PATCH 05/13] posix stack: add methods to shut down a listening socket Anyone waiting on accept() will receive an exception. --- net/posix-stack.cc | 20 ++++++++++++++++++++ net/posix-stack.hh | 3 +++ 2 files changed, 23 insertions(+) diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 38c1ab0600..057c27fd1b 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -59,6 +59,11 @@ posix_server_socket_impl::accept() { }); } +void +posix_server_socket_impl::abort_accept() { + _lfd.abort_reader(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); +} + future posix_ap_server_socket_impl::accept() { auto conni = conn_q.find(_sa.as_posix_sockaddr_in()); if (conni != conn_q.end()) { @@ -73,6 +78,16 @@ future posix_ap_server_socket_impl::accept() { } } +void +posix_ap_server_socket_impl::abort_accept() { + conn_q.erase(_sa.as_posix_sockaddr_in()); + auto i = sockets.find(_sa.as_posix_sockaddr_in()); + if (i != sockets.end()) { + i->second.set_exception(std::system_error(ECONNABORTED, std::system_category())); + sockets.erase(i); + } +} + future posix_reuseport_server_socket_impl::accept() { return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) { @@ -82,6 +97,11 @@ posix_reuseport_server_socket_impl::accept() { }); } +void +posix_reuseport_server_socket_impl::abort_accept() { + _lfd.abort_reader(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); +} + void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) { auto i = sockets.find(sa.as_posix_sockaddr_in()); if (i != sockets.end()) { diff --git a/net/posix-stack.hh b/net/posix-stack.hh index e100d9132d..12d35f41fb 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -65,6 +65,7 @@ class posix_ap_server_socket_impl : public server_socket_impl { public: explicit posix_ap_server_socket_impl(socket_address sa) : _sa(sa) {} virtual future accept(); + void abort_accept(); static void move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr); }; @@ -74,6 +75,7 @@ class posix_server_socket_impl : public server_socket_impl { public: explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} virtual future accept(); + void abort_accept(); }; class posix_reuseport_server_socket_impl : public server_socket_impl { @@ -82,6 +84,7 @@ class posix_reuseport_server_socket_impl : public server_socket_impl { public: explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} virtual future accept(); + void abort_accept(); }; class posix_network_stack : public network_stack { From 587013a61112c43703e37ebe6a5f98c96542c873 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:44:03 +0300 Subject: [PATCH 06/13] net: add shutdown() method to tcp listener --- net/native-stack-impl.hh | 7 +++++++ net/tcp.hh | 3 +++ 2 files changed, 10 insertions(+) diff --git a/net/native-stack-impl.hh b/net/native-stack-impl.hh index e09f90fdb5..20e99a96e8 100644 --- a/net/native-stack-impl.hh +++ b/net/native-stack-impl.hh @@ -41,6 +41,7 @@ class native_server_socket_impl : public server_socket_impl { public: native_server_socket_impl(Protocol& proto, uint16_t port, listen_options opt); virtual future accept() override; + void abort_accept(); }; template @@ -58,6 +59,12 @@ native_server_socket_impl::accept() { }); } +template +void +native_server_socket_impl::abort_accept() { + _listener.abort_accept(); +} + // native_connected_socket_impl template class native_connected_socket_impl : public connected_socket_impl { diff --git a/net/tcp.hh b/net/tcp.hh index 5c951b5a15..48284fc47d 100644 --- a/net/tcp.hh +++ b/net/tcp.hh @@ -583,6 +583,9 @@ public: return make_ready_future(_q.pop()); }); } + void abort_accept() { + _q.abort(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); + } friend class tcp; }; public: From 661d459fff3ae9b64b60a7357bf16a442e926dd0 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:45:07 +0300 Subject: [PATCH 07/13] net: wire up server_socket shutdown method and expose to callers --- core/reactor.hh | 9 +++++++++ net/native-stack-impl.hh | 2 +- net/posix-stack.hh | 6 +++--- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/reactor.hh b/core/reactor.hh index f0a4f9cace..69417085ac 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -228,6 +228,7 @@ class server_socket_impl { public: virtual ~server_socket_impl() {} virtual future accept() = 0; + virtual void abort_accept() = 0; }; /// \endcond @@ -266,6 +267,14 @@ public: future accept() { return _ssi->accept(); } + + /// Stops any \ref accept() in progress. + /// + /// Current and future \ref accept() calls will terminate immediately + /// with an error. + void abort_accept() { + return _ssi->abort_accept(); + } }; /// @} diff --git a/net/native-stack-impl.hh b/net/native-stack-impl.hh index 20e99a96e8..4d81e99e71 100644 --- a/net/native-stack-impl.hh +++ b/net/native-stack-impl.hh @@ -41,7 +41,7 @@ class native_server_socket_impl : public server_socket_impl { public: native_server_socket_impl(Protocol& proto, uint16_t port, listen_options opt); virtual future accept() override; - void abort_accept(); + virtual void abort_accept() override; }; template diff --git a/net/posix-stack.hh b/net/posix-stack.hh index 12d35f41fb..dc9ea9ea37 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -65,7 +65,7 @@ class posix_ap_server_socket_impl : public server_socket_impl { public: explicit posix_ap_server_socket_impl(socket_address sa) : _sa(sa) {} virtual future accept(); - void abort_accept(); + virtual void abort_accept() override; static void move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr); }; @@ -75,7 +75,7 @@ class posix_server_socket_impl : public server_socket_impl { public: explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} virtual future accept(); - void abort_accept(); + virtual void abort_accept() override; }; class posix_reuseport_server_socket_impl : public server_socket_impl { @@ -84,7 +84,7 @@ class posix_reuseport_server_socket_impl : public server_socket_impl { public: explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} virtual future accept(); - void abort_accept(); + virtual void abort_accept() override; }; class posix_network_stack : public network_stack { From 74e9a897b7e1489ca7ca0cb3c2cee06601fc6d32 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:46:21 +0300 Subject: [PATCH 08/13] net: add shutdown methods for posix connected_socket We can simply ask the kernel to shutdown for us, and it will propagate exceptions to callers. --- net/posix-stack.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 057c27fd1b..ac0036c6a3 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -33,6 +33,12 @@ private: public: virtual input_stream input() override { return input_stream(posix_data_source(_fd)); } virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192); } + void shutdown_input() { + _fd.shutdown(SHUT_RD); + } + void shutdown_output() { + _fd.shutdown(SHUT_WR); + } friend class posix_server_socket_impl; friend class posix_ap_server_socket_impl; friend class posix_reuseport_server_socket_impl; From e59c7d9f1f956edd4fdaee969ee3ee58101c31ad Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:48:04 +0300 Subject: [PATCH 09/13] net: provide shutdown methods for the native tcp stack --- net/native-stack-impl.hh | 15 +++++++++++++++ net/tcp.hh | 12 ++++++++++++ 2 files changed, 27 insertions(+) diff --git a/net/native-stack-impl.hh b/net/native-stack-impl.hh index 4d81e99e71..91e8111a64 100644 --- a/net/native-stack-impl.hh +++ b/net/native-stack-impl.hh @@ -76,6 +76,8 @@ public: : _conn(std::move(conn)) {} virtual input_stream input() override; virtual output_stream output() override; + void shutdown_input(); + void shutdown_output(); }; template @@ -137,6 +139,19 @@ native_connected_socket_impl::output() { return output_stream(std::move(ds), 8192); } +template +void +native_connected_socket_impl::shutdown_input() { + _conn.close_read(); +} + +template +void +native_connected_socket_impl::shutdown_output() { + _conn.close_write(); +} + + } diff --git a/net/tcp.hh b/net/tcp.hh index 48284fc47d..4bb9c62ae0 100644 --- a/net/tcp.hh +++ b/net/tcp.hh @@ -323,6 +323,7 @@ private: void input_handle_other_state(tcp_hdr* th, packet p); void output_one(bool data_retransmit = false); future<> wait_for_data(); + void abort_reader(); future<> wait_for_all_data_acked(); future<> send(packet p); void connect(); @@ -1529,6 +1530,16 @@ future<> tcp::tcb::wait_for_data() { return _rcv._data_received_promise->get_future(); } +template +void +tcp::tcb::abort_reader() { + if (_rcv._data_received_promise) { + _rcv._data_received_promise->set_exception( + std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); + _rcv._data_received_promise = std::experimental::nullopt; + } +} + template future<> tcp::tcb::wait_for_all_data_acked() { if (_snd.data.empty() && _snd.unsent_len == 0 && _snd.queued_len == 0) { @@ -1878,6 +1889,7 @@ std::experimental::optional tcp::tcb: template void tcp::connection::close_read() { + _tcb->abort_reader(); } template From 8dc71b7b8bcd6fc00c0cc7a17b249d9dcc000b32 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 16:48:32 +0300 Subject: [PATCH 10/13] net: wire up connected_socket shutdown methods and expose to user --- core/reactor.hh | 30 ++++++++++++++++++++++++++++++ net/native-stack-impl.hh | 4 ++-- net/posix-stack.cc | 4 ++-- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/core/reactor.hh b/core/reactor.hh index 69417085ac..3be99819e9 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -190,6 +190,8 @@ public: virtual ~connected_socket_impl() {} virtual input_stream input() = 0; virtual output_stream output() = 0; + virtual void shutdown_input() = 0; + virtual void shutdown_output() = 0; }; /// \addtogroup networking-module @@ -220,6 +222,22 @@ public: /// /// Gets an object that sends data to the remote endpoint. output_stream output(); + /// Disables output to the socket. + /// + /// Current or future writes that have not been successfully flushed + /// will immediately fail with an error. This is useful to abort + /// operations on a socket that is not making progress due to a + /// peer failure. + void shutdown_output(); + /// Disables input from the socket. + /// + /// Current or future reads will immediately fail with an error. + /// This is useful to abort operations on a socket that is not making + /// progress due to a peer failure. + void shutdown_input(); + /// Disables socket input and output. + /// + /// Equivalent to \ref shutdown_input() and \ref shutdown_output(). }; /// @} @@ -1338,4 +1356,16 @@ connected_socket::output() { return _csi->output(); } +inline +void +connected_socket::shutdown_input() { + return _csi->shutdown_input(); +} + +inline +void +connected_socket::shutdown_output() { + return _csi->shutdown_output(); +} + #endif /* REACTOR_HH_ */ diff --git a/net/native-stack-impl.hh b/net/native-stack-impl.hh index 91e8111a64..e3c0e1c2a6 100644 --- a/net/native-stack-impl.hh +++ b/net/native-stack-impl.hh @@ -76,8 +76,8 @@ public: : _conn(std::move(conn)) {} virtual input_stream input() override; virtual output_stream output() override; - void shutdown_input(); - void shutdown_output(); + virtual void shutdown_input() override; + virtual void shutdown_output() override; }; template diff --git a/net/posix-stack.cc b/net/posix-stack.cc index ac0036c6a3..eaaa9ce94c 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -33,10 +33,10 @@ private: public: virtual input_stream input() override { return input_stream(posix_data_source(_fd)); } virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192); } - void shutdown_input() { + virtual void shutdown_input() override { _fd.shutdown(SHUT_RD); } - void shutdown_output() { + virtual void shutdown_output() override { _fd.shutdown(SHUT_WR); } friend class posix_server_socket_impl; From 8f9d3dc9b22dff9bf5651468f9a22928303c03da Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 17:59:54 +0300 Subject: [PATCH 11/13] httpd: reformat main.cc --- apps/httpd/main.cc | 56 ++++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/apps/httpd/main.cc b/apps/httpd/main.cc index 273f747df5..fb7b3dec9c 100644 --- a/apps/httpd/main.cc +++ b/apps/httpd/main.cc @@ -45,45 +45,43 @@ void set_routes(routes& r) { return "hello"; }); function_handler* h2 = new function_handler([](std::unique_ptr req) { - return make_ready_future("json-future"); + return make_ready_future("json-future"); }); r.add(operation_type::GET, url("/"), h1); r.add(operation_type::GET, url("/jf"), h2); r.add(operation_type::GET, url("/file").remainder("path"), new directory_handler("/")); - demo_json::hello_world.set(r, - [](const_req req) { - demo_json::my_object obj; - obj.var1 = req.param.at("var1"); - obj.var2 = req.param.at("var2"); - demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum")); - // This demonstrate enum conversion - obj.enum_var = v; - return obj; - }); + demo_json::hello_world.set(r, [] (const_req req) { + demo_json::my_object obj; + obj.var1 = req.param.at("var1"); + obj.var2 = req.param.at("var2"); + demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum")); + // This demonstrate enum conversion + obj.enum_var = v; + return obj; + }); } int main(int ac, char** av) { app_template app; app.add_options()("port", bpo::value()->default_value(10000), "HTTP Server port"); - return app.run(ac, av, - [&] { - auto&& config = app.configuration(); - uint16_t port = config["port"].as(); - auto server = new http_server_control(); - auto rb= make_shared("apps/httpd/"); - server->start().then([server] { - return server->set_routes(set_routes); - }).then([server, rb]{ - return server->set_routes([rb](routes& r){rb->set_api_doc(r);}); - }).then([server, rb]{ - return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");}); - }).then([server, port] { - return server->listen(port); - }).then([port] { - std::cout << "Seastar HTTP server listening on port " << port << " ...\n"; - }); + return app.run(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as(); + auto server = new http_server_control(); + auto rb = make_shared("apps/httpd/"); + server->start().then([server] { + return server->set_routes(set_routes); + }).then([server, rb]{ + return server->set_routes([rb](routes& r){rb->set_api_doc(r);}); + }).then([server, rb]{ + return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");}); + }).then([server, port] { + return server->listen(port); + }).then([port] { + std::cout << "Seastar HTTP server listening on port " << port << " ...\n"; + }); - }); + }); } From 56abdc4632728bf42effd205c4a9e52dee54f944 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 15 Jun 2015 18:30:29 +0300 Subject: [PATCH 12/13] httpd: implement stop() for http_server Aborts active accept() calls and shuts down. Active connections are still leaked. --- apps/httpd/main.cc | 5 ++++- http/httpd.hh | 17 ++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/apps/httpd/main.cc b/apps/httpd/main.cc index fb7b3dec9c..27bebbcb40 100644 --- a/apps/httpd/main.cc +++ b/apps/httpd/main.cc @@ -79,8 +79,11 @@ int main(int ac, char** av) { return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");}); }).then([server, port] { return server->listen(port); - }).then([port] { + }).then([server, port] { std::cout << "Seastar HTTP server listening on port " << port << " ...\n"; + engine().at_exit([server] { + return server->stop(); + }); }); }); diff --git a/http/httpd.hh b/http/httpd.hh index 616f2f8e07..d3dbe7acfb 100644 --- a/http/httpd.hh +++ b/http/httpd.hh @@ -65,6 +65,7 @@ class http_server { uint64_t _requests_served = 0; sstring _date = http_date(); timer<> _date_format_timer { [this] {_date = http_date();} }; + future<> _stopped = make_ready_future(); public: routes _routes; @@ -75,11 +76,17 @@ public: listen_options lo; lo.reuse_address = true; _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); - do_accepts(_listeners.size() - 1); + _stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1)).discard_result(); return make_ready_future<>(); } - void do_accepts(int which) { - _listeners[which].accept().then( + future<> stop() { + for (auto&& l : _listeners) { + l.abort_accept(); + } + return std::move(_stopped); + } + future<> do_accepts(int which) { + return _listeners[which].accept().then( [this, which] (connected_socket fd, socket_address addr) mutable { auto conn = new connection(*this, std::move(fd), addr); conn->process().then_wrapped([this, conn] (auto&& f) { @@ -370,6 +377,10 @@ public: return _server_dist->start(); } + future<> stop() { + return _server_dist->stop(); + } + future<> set_routes(std::function fun) { return _server_dist->invoke_on_all([fun](http_server& server) { fun(server._routes); From 1e481f91b7cffed48ef998933c8ca8d067872aa9 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 16 Jun 2015 11:46:02 +0300 Subject: [PATCH 13/13] httpd: stop active connections on shutdown Keep a list of all active connections, and shutdown the socket when we're stopping. Wait for all connections to remove themselves before returning. --- http/httpd.hh | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/http/httpd.hh b/http/httpd.hh index d3dbe7acfb..8b3d227eaf 100644 --- a/http/httpd.hh +++ b/http/httpd.hh @@ -41,6 +41,7 @@ #include #include #include +#include #include "reply.hh" #include "http/routes.hh" @@ -63,9 +64,18 @@ class http_server { uint64_t _total_connections = 0; uint64_t _current_connections = 0; uint64_t _requests_served = 0; + uint64_t _connections_being_accepted = 0; sstring _date = http_date(); timer<> _date_format_timer { [this] {_date = http_date();} }; - future<> _stopped = make_ready_future(); + bool _stopping = false; + promise<> _all_connections_stopped; + future<> _stopped = _all_connections_stopped.get_future(); +private: + void maybe_idle() { + if (_stopping && !_connections_being_accepted && !_current_connections) { + _all_connections_stopped.set_value(); + } + } public: routes _routes; @@ -80,15 +90,26 @@ public: return make_ready_future<>(); } future<> stop() { + _stopping = true; for (auto&& l : _listeners) { l.abort_accept(); } + for (auto&& c : _connections) { + c.shutdown(); + } return std::move(_stopped); } future<> do_accepts(int which) { - return _listeners[which].accept().then( - [this, which] (connected_socket fd, socket_address addr) mutable { - auto conn = new connection(*this, std::move(fd), addr); + ++_connections_being_accepted; + return _listeners[which].accept().then_wrapped( + [this, which] (future f_cs_sa) mutable { + --_connections_being_accepted; + if (_stopping) { + maybe_idle(); + return; + } + auto cs_sa = f_cs_sa.get(); + auto conn = new connection(*this, std::get<0>(std::move(cs_sa)), std::get<1>(std::move(cs_sa))); conn->process().then_wrapped([this, conn] (auto&& f) { delete conn; try { @@ -106,7 +127,8 @@ public: } }); } - class connection { +private: + class connection : public boost::intrusive::list_base_hook<> { http_server& _server; connected_socket _fd; input_stream _read_buf; @@ -125,9 +147,12 @@ public: _fd.output()) { ++_server._total_connections; ++_server._current_connections; + _server._connections.push_back(*this); } ~connection() { --_server._current_connections; + _server._connections.erase(_server._connections.iterator_to(*this)); + _server.maybe_idle(); } future<> process() { // Launch read and write "threads" simultaneously: @@ -137,6 +162,10 @@ public: return make_ready_future<>(); }); } + void shutdown() { + _fd.shutdown_input(); + _fd.shutdown_output(); + } future<> read() { return do_until([this] {return _done;}, [this] { return read_one(); @@ -335,6 +364,7 @@ public: _resp->_content.size()); } }; +public: uint64_t total_connections() const { return _total_connections; } @@ -352,6 +382,8 @@ public: strftime(tmp, sizeof(tmp), "%d %b %Y %H:%M:%S GMT", &tm); return tmp; } +private: + boost::intrusive::list _connections; }; /*