diff --git a/apps/httpd/main.cc b/apps/httpd/main.cc index 273f747df5..27bebbcb40 100644 --- a/apps/httpd/main.cc +++ b/apps/httpd/main.cc @@ -45,45 +45,46 @@ void set_routes(routes& r) { return "hello"; }); function_handler* h2 = new function_handler([](std::unique_ptr req) { - return make_ready_future("json-future"); + return make_ready_future("json-future"); }); r.add(operation_type::GET, url("/"), h1); r.add(operation_type::GET, url("/jf"), h2); r.add(operation_type::GET, url("/file").remainder("path"), new directory_handler("/")); - demo_json::hello_world.set(r, - [](const_req req) { - demo_json::my_object obj; - obj.var1 = req.param.at("var1"); - obj.var2 = req.param.at("var2"); - demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum")); - // This demonstrate enum conversion - obj.enum_var = v; - return obj; - }); + demo_json::hello_world.set(r, [] (const_req req) { + demo_json::my_object obj; + obj.var1 = req.param.at("var1"); + obj.var2 = req.param.at("var2"); + demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum")); + // This demonstrate enum conversion + obj.enum_var = v; + return obj; + }); } int main(int ac, char** av) { app_template app; app.add_options()("port", bpo::value()->default_value(10000), "HTTP Server port"); - return app.run(ac, av, - [&] { - auto&& config = app.configuration(); - uint16_t port = config["port"].as(); - auto server = new http_server_control(); - auto rb= make_shared("apps/httpd/"); - server->start().then([server] { - return server->set_routes(set_routes); - }).then([server, rb]{ - return server->set_routes([rb](routes& r){rb->set_api_doc(r);}); - }).then([server, rb]{ - return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");}); - }).then([server, port] { - return server->listen(port); - }).then([port] { - std::cout << "Seastar HTTP server listening on port " << port << " ...\n"; - }); - + return app.run(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as(); + auto server = new http_server_control(); + auto rb = make_shared("apps/httpd/"); + server->start().then([server] { + return server->set_routes(set_routes); + }).then([server, rb]{ + return server->set_routes([rb](routes& r){rb->set_api_doc(r);}); + }).then([server, rb]{ + return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");}); + }).then([server, port] { + return server->listen(port); + }).then([server, port] { + std::cout << "Seastar HTTP server listening on port " << port << " ...\n"; + engine().at_exit([server] { + return server->stop(); }); + }); + + }); } diff --git a/core/posix.hh b/core/posix.hh index 788b3835aa..d226592673 100644 --- a/core/posix.hh +++ b/core/posix.hh @@ -117,6 +117,10 @@ public: throw_system_error_on(ret == -1); return file_desc(ret); } + void shutdown(int how) { + auto ret = ::shutdown(_fd, how); + throw_system_error_on(ret == -1); + } void truncate(size_t size) { auto ret = ::ftruncate(_fd, size); throw_system_error_on(ret); diff --git a/core/queue.hh b/core/queue.hh index d1f6d09670..c2c3739201 100644 --- a/core/queue.hh +++ b/core/queue.hh @@ -76,6 +76,22 @@ public: future<> push_eventually(T&& data); size_t size() const { return _q.size(); } + + // Destroy any items in the queue, and pass the provided exception to any + // waiting readers or writers. + void abort(std::exception_ptr ex) { + while (!_q.empty()) { + _q.pop(); + } + if (_not_full) { + _not_full->set_exception(ex); + _not_full= std::experimental::nullopt; + } + if (_not_empty) { + _not_empty->set_exception(std::move(ex)); + _not_empty = std::experimental::nullopt; + } + } }; template diff --git a/core/reactor.cc b/core/reactor.cc index 4d0b084f67..f035caf64b 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -281,6 +281,24 @@ future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd, return (pfd.*pr).get_future(); } +void reactor_backend_epoll::abort_fd(pollable_fd_state& pfd, std::exception_ptr ex, + promise<> pollable_fd_state::* pr, int event) { + if (pfd.events_epoll & event) { + pfd.events_epoll &= ~event; + auto ctl = pfd.events_epoll ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + ::epoll_event eevt; + eevt.events = pfd.events_epoll; + eevt.data.ptr = &pfd; + int r = ::epoll_ctl(_epollfd.get(), ctl, pfd.fd.get(), &eevt); + assert(r == 0); + } + if (pfd.events_requested & event) { + pfd.events_requested &= ~event; + (pfd.*pr).set_exception(std::move(ex)); + } + pfd.events_known &= ~event; +} + future<> reactor_backend_epoll::readable(pollable_fd_state& fd) { return get_epoll_future(fd, &pollable_fd_state::pollin, EPOLLIN); } @@ -289,6 +307,14 @@ future<> reactor_backend_epoll::writeable(pollable_fd_state& fd) { return get_epoll_future(fd, &pollable_fd_state::pollout, EPOLLOUT); } +void reactor_backend_epoll::abort_reader(pollable_fd_state& fd, std::exception_ptr ex) { + abort_fd(fd, std::move(ex), &pollable_fd_state::pollin, EPOLLIN); +} + +void reactor_backend_epoll::abort_writer(pollable_fd_state& fd, std::exception_ptr ex) { + abort_fd(fd, std::move(ex), &pollable_fd_state::pollout, EPOLLOUT); +} + void reactor_backend_epoll::forget(pollable_fd_state& fd) { if (fd.events_epoll) { ::epoll_ctl(_epollfd.get(), EPOLL_CTL_DEL, fd.fd.get(), nullptr); diff --git a/core/reactor.hh b/core/reactor.hh index 0a76a2da9c..3be99819e9 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -167,11 +167,14 @@ public: future<> write_all(net::packet& p); future<> readable(); future<> writeable(); + void abort_reader(std::exception_ptr ex); + void abort_writer(std::exception_ptr ex); future accept(); future sendmsg(struct msghdr *msg); future recvmsg(struct msghdr *msg); future sendto(socket_address addr, const void* buf, size_t len); file_desc& get_file_desc() const { return _s->fd; } + void shutdown(int how) { _s->fd.shutdown(how); } void close() { _s.reset(); } protected: int get_fd() const { return _s->fd.get(); } @@ -187,6 +190,8 @@ public: virtual ~connected_socket_impl() {} virtual input_stream input() = 0; virtual output_stream output() = 0; + virtual void shutdown_input() = 0; + virtual void shutdown_output() = 0; }; /// \addtogroup networking-module @@ -217,6 +222,22 @@ public: /// /// Gets an object that sends data to the remote endpoint. output_stream output(); + /// Disables output to the socket. + /// + /// Current or future writes that have not been successfully flushed + /// will immediately fail with an error. This is useful to abort + /// operations on a socket that is not making progress due to a + /// peer failure. + void shutdown_output(); + /// Disables input from the socket. + /// + /// Current or future reads will immediately fail with an error. + /// This is useful to abort operations on a socket that is not making + /// progress due to a peer failure. + void shutdown_input(); + /// Disables socket input and output. + /// + /// Equivalent to \ref shutdown_input() and \ref shutdown_output(). }; /// @} @@ -225,6 +246,7 @@ class server_socket_impl { public: virtual ~server_socket_impl() {} virtual future accept() = 0; + virtual void abort_accept() = 0; }; /// \endcond @@ -263,6 +285,14 @@ public: future accept() { return _ssi->accept(); } + + /// Stops any \ref accept() in progress. + /// + /// Current and future \ref accept() calls will terminate immediately + /// with an error. + void abort_accept() { + return _ssi->abort_accept(); + } }; /// @} @@ -566,6 +596,8 @@ private: promise<> pollable_fd_state::* pr, int event); void complete_epoll_event(pollable_fd_state& fd, promise<> pollable_fd_state::* pr, int events, int event); + void abort_fd(pollable_fd_state& fd, std::exception_ptr ex, + promise<> pollable_fd_state::* pr, int event); public: reactor_backend_epoll(); virtual ~reactor_backend_epoll() override { } @@ -575,6 +607,8 @@ public: virtual void forget(pollable_fd_state& fd) override; virtual future<> notified(reactor_notifier *n) override; virtual std::unique_ptr make_reactor_notifier() override; + void abort_reader(pollable_fd_state& fd, std::exception_ptr ex); + void abort_writer(pollable_fd_state& fd, std::exception_ptr ex); }; #ifdef HAVE_OSV @@ -863,6 +897,12 @@ public: future<> notified(reactor_notifier *n) { return _backend.notified(n); } + void abort_reader(pollable_fd_state& fd, std::exception_ptr ex) { + return _backend.abort_reader(fd, std::move(ex)); + } + void abort_writer(pollable_fd_state& fd, std::exception_ptr ex) { + return _backend.abort_writer(fd, std::move(ex)); + } void enable_timer(clock_type::time_point when); std::unique_ptr make_reactor_notifier() { return _backend.make_reactor_notifier(); @@ -1151,6 +1191,18 @@ future<> pollable_fd::writeable() { return engine().writeable(*_s); } +inline +void +pollable_fd::abort_reader(std::exception_ptr ex) { + engine().abort_reader(*_s, std::move(ex)); +} + +inline +void +pollable_fd::abort_writer(std::exception_ptr ex) { + engine().abort_writer(*_s, std::move(ex)); +} + inline future pollable_fd::accept() { return engine().accept(*_s); @@ -1304,4 +1356,16 @@ connected_socket::output() { return _csi->output(); } +inline +void +connected_socket::shutdown_input() { + return _csi->shutdown_input(); +} + +inline +void +connected_socket::shutdown_output() { + return _csi->shutdown_output(); +} + #endif /* REACTOR_HH_ */ diff --git a/http/httpd.hh b/http/httpd.hh index 616f2f8e07..8b3d227eaf 100644 --- a/http/httpd.hh +++ b/http/httpd.hh @@ -41,6 +41,7 @@ #include #include #include +#include #include "reply.hh" #include "http/routes.hh" @@ -63,8 +64,18 @@ class http_server { uint64_t _total_connections = 0; uint64_t _current_connections = 0; uint64_t _requests_served = 0; + uint64_t _connections_being_accepted = 0; sstring _date = http_date(); timer<> _date_format_timer { [this] {_date = http_date();} }; + bool _stopping = false; + promise<> _all_connections_stopped; + future<> _stopped = _all_connections_stopped.get_future(); +private: + void maybe_idle() { + if (_stopping && !_connections_being_accepted && !_current_connections) { + _all_connections_stopped.set_value(); + } + } public: routes _routes; @@ -75,13 +86,30 @@ public: listen_options lo; lo.reuse_address = true; _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); - do_accepts(_listeners.size() - 1); + _stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1)).discard_result(); return make_ready_future<>(); } - void do_accepts(int which) { - _listeners[which].accept().then( - [this, which] (connected_socket fd, socket_address addr) mutable { - auto conn = new connection(*this, std::move(fd), addr); + future<> stop() { + _stopping = true; + for (auto&& l : _listeners) { + l.abort_accept(); + } + for (auto&& c : _connections) { + c.shutdown(); + } + return std::move(_stopped); + } + future<> do_accepts(int which) { + ++_connections_being_accepted; + return _listeners[which].accept().then_wrapped( + [this, which] (future f_cs_sa) mutable { + --_connections_being_accepted; + if (_stopping) { + maybe_idle(); + return; + } + auto cs_sa = f_cs_sa.get(); + auto conn = new connection(*this, std::get<0>(std::move(cs_sa)), std::get<1>(std::move(cs_sa))); conn->process().then_wrapped([this, conn] (auto&& f) { delete conn; try { @@ -99,7 +127,8 @@ public: } }); } - class connection { +private: + class connection : public boost::intrusive::list_base_hook<> { http_server& _server; connected_socket _fd; input_stream _read_buf; @@ -118,9 +147,12 @@ public: _fd.output()) { ++_server._total_connections; ++_server._current_connections; + _server._connections.push_back(*this); } ~connection() { --_server._current_connections; + _server._connections.erase(_server._connections.iterator_to(*this)); + _server.maybe_idle(); } future<> process() { // Launch read and write "threads" simultaneously: @@ -130,6 +162,10 @@ public: return make_ready_future<>(); }); } + void shutdown() { + _fd.shutdown_input(); + _fd.shutdown_output(); + } future<> read() { return do_until([this] {return _done;}, [this] { return read_one(); @@ -328,6 +364,7 @@ public: _resp->_content.size()); } }; +public: uint64_t total_connections() const { return _total_connections; } @@ -345,6 +382,8 @@ public: strftime(tmp, sizeof(tmp), "%d %b %Y %H:%M:%S GMT", &tm); return tmp; } +private: + boost::intrusive::list _connections; }; /* @@ -370,6 +409,10 @@ public: return _server_dist->start(); } + future<> stop() { + return _server_dist->stop(); + } + future<> set_routes(std::function fun) { return _server_dist->invoke_on_all([fun](http_server& server) { fun(server._routes); diff --git a/net/native-stack-impl.hh b/net/native-stack-impl.hh index e09f90fdb5..e3c0e1c2a6 100644 --- a/net/native-stack-impl.hh +++ b/net/native-stack-impl.hh @@ -41,6 +41,7 @@ class native_server_socket_impl : public server_socket_impl { public: native_server_socket_impl(Protocol& proto, uint16_t port, listen_options opt); virtual future accept() override; + virtual void abort_accept() override; }; template @@ -58,6 +59,12 @@ native_server_socket_impl::accept() { }); } +template +void +native_server_socket_impl::abort_accept() { + _listener.abort_accept(); +} + // native_connected_socket_impl template class native_connected_socket_impl : public connected_socket_impl { @@ -69,6 +76,8 @@ public: : _conn(std::move(conn)) {} virtual input_stream input() override; virtual output_stream output() override; + virtual void shutdown_input() override; + virtual void shutdown_output() override; }; template @@ -130,6 +139,19 @@ native_connected_socket_impl::output() { return output_stream(std::move(ds), 8192); } +template +void +native_connected_socket_impl::shutdown_input() { + _conn.close_read(); +} + +template +void +native_connected_socket_impl::shutdown_output() { + _conn.close_write(); +} + + } diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 38c1ab0600..eaaa9ce94c 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -33,6 +33,12 @@ private: public: virtual input_stream input() override { return input_stream(posix_data_source(_fd)); } virtual output_stream output() override { return output_stream(posix_data_sink(_fd), 8192); } + virtual void shutdown_input() override { + _fd.shutdown(SHUT_RD); + } + virtual void shutdown_output() override { + _fd.shutdown(SHUT_WR); + } friend class posix_server_socket_impl; friend class posix_ap_server_socket_impl; friend class posix_reuseport_server_socket_impl; @@ -59,6 +65,11 @@ posix_server_socket_impl::accept() { }); } +void +posix_server_socket_impl::abort_accept() { + _lfd.abort_reader(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); +} + future posix_ap_server_socket_impl::accept() { auto conni = conn_q.find(_sa.as_posix_sockaddr_in()); if (conni != conn_q.end()) { @@ -73,6 +84,16 @@ future posix_ap_server_socket_impl::accept() { } } +void +posix_ap_server_socket_impl::abort_accept() { + conn_q.erase(_sa.as_posix_sockaddr_in()); + auto i = sockets.find(_sa.as_posix_sockaddr_in()); + if (i != sockets.end()) { + i->second.set_exception(std::system_error(ECONNABORTED, std::system_category())); + sockets.erase(i); + } +} + future posix_reuseport_server_socket_impl::accept() { return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) { @@ -82,6 +103,11 @@ posix_reuseport_server_socket_impl::accept() { }); } +void +posix_reuseport_server_socket_impl::abort_accept() { + _lfd.abort_reader(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); +} + void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) { auto i = sockets.find(sa.as_posix_sockaddr_in()); if (i != sockets.end()) { diff --git a/net/posix-stack.hh b/net/posix-stack.hh index e100d9132d..dc9ea9ea37 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -65,6 +65,7 @@ class posix_ap_server_socket_impl : public server_socket_impl { public: explicit posix_ap_server_socket_impl(socket_address sa) : _sa(sa) {} virtual future accept(); + virtual void abort_accept() override; static void move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr); }; @@ -74,6 +75,7 @@ class posix_server_socket_impl : public server_socket_impl { public: explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} virtual future accept(); + virtual void abort_accept() override; }; class posix_reuseport_server_socket_impl : public server_socket_impl { @@ -82,6 +84,7 @@ class posix_reuseport_server_socket_impl : public server_socket_impl { public: explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {} virtual future accept(); + virtual void abort_accept() override; }; class posix_network_stack : public network_stack { diff --git a/net/tcp.hh b/net/tcp.hh index 5c951b5a15..4bb9c62ae0 100644 --- a/net/tcp.hh +++ b/net/tcp.hh @@ -323,6 +323,7 @@ private: void input_handle_other_state(tcp_hdr* th, packet p); void output_one(bool data_retransmit = false); future<> wait_for_data(); + void abort_reader(); future<> wait_for_all_data_acked(); future<> send(packet p); void connect(); @@ -583,6 +584,9 @@ public: return make_ready_future(_q.pop()); }); } + void abort_accept() { + _q.abort(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); + } friend class tcp; }; public: @@ -1526,6 +1530,16 @@ future<> tcp::tcb::wait_for_data() { return _rcv._data_received_promise->get_future(); } +template +void +tcp::tcb::abort_reader() { + if (_rcv._data_received_promise) { + _rcv._data_received_promise->set_exception( + std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category()))); + _rcv._data_received_promise = std::experimental::nullopt; + } +} + template future<> tcp::tcb::wait_for_all_data_acked() { if (_snd.data.empty() && _snd.unsent_len == 0 && _snd.queued_len == 0) { @@ -1875,6 +1889,7 @@ std::experimental::optional tcp::tcb: template void tcp::connection::close_read() { + _tcb->abort_reader(); } template