Merge branch 'shutdown'

Shutdown support for sockets, and httpd integration.
This commit is contained in:
Avi Kivity
2015-06-16 12:59:22 +03:00
10 changed files with 255 additions and 35 deletions

View File

@@ -45,45 +45,46 @@ void set_routes(routes& r) {
return "hello";
});
function_handler* h2 = new function_handler([](std::unique_ptr<request> req) {
return make_ready_future<json::json_return_type>("json-future");
return make_ready_future<json::json_return_type>("json-future");
});
r.add(operation_type::GET, url("/"), h1);
r.add(operation_type::GET, url("/jf"), h2);
r.add(operation_type::GET, url("/file").remainder("path"),
new directory_handler("/"));
demo_json::hello_world.set(r,
[](const_req req) {
demo_json::my_object obj;
obj.var1 = req.param.at("var1");
obj.var2 = req.param.at("var2");
demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum"));
// This demonstrate enum conversion
obj.enum_var = v;
return obj;
});
demo_json::hello_world.set(r, [] (const_req req) {
demo_json::my_object obj;
obj.var1 = req.param.at("var1");
obj.var2 = req.param.at("var2");
demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum"));
// This demonstrate enum conversion
obj.enum_var = v;
return obj;
});
}
int main(int ac, char** av) {
app_template app;
app.add_options()("port", bpo::value<uint16_t>()->default_value(10000),
"HTTP Server port");
return app.run(ac, av,
[&] {
auto&& config = app.configuration();
uint16_t port = config["port"].as<uint16_t>();
auto server = new http_server_control();
auto rb= make_shared<api_registry_builder>("apps/httpd/");
server->start().then([server] {
return server->set_routes(set_routes);
}).then([server, rb]{
return server->set_routes([rb](routes& r){rb->set_api_doc(r);});
}).then([server, rb]{
return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");});
}).then([server, port] {
return server->listen(port);
}).then([port] {
std::cout << "Seastar HTTP server listening on port " << port << " ...\n";
});
return app.run(ac, av, [&] {
auto&& config = app.configuration();
uint16_t port = config["port"].as<uint16_t>();
auto server = new http_server_control();
auto rb = make_shared<api_registry_builder>("apps/httpd/");
server->start().then([server] {
return server->set_routes(set_routes);
}).then([server, rb]{
return server->set_routes([rb](routes& r){rb->set_api_doc(r);});
}).then([server, rb]{
return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");});
}).then([server, port] {
return server->listen(port);
}).then([server, port] {
std::cout << "Seastar HTTP server listening on port " << port << " ...\n";
engine().at_exit([server] {
return server->stop();
});
});
});
}

View File

@@ -117,6 +117,10 @@ public:
throw_system_error_on(ret == -1);
return file_desc(ret);
}
void shutdown(int how) {
auto ret = ::shutdown(_fd, how);
throw_system_error_on(ret == -1);
}
void truncate(size_t size) {
auto ret = ::ftruncate(_fd, size);
throw_system_error_on(ret);

View File

@@ -76,6 +76,22 @@ public:
future<> push_eventually(T&& data);
size_t size() const { return _q.size(); }
// Destroy any items in the queue, and pass the provided exception to any
// waiting readers or writers.
void abort(std::exception_ptr ex) {
while (!_q.empty()) {
_q.pop();
}
if (_not_full) {
_not_full->set_exception(ex);
_not_full= std::experimental::nullopt;
}
if (_not_empty) {
_not_empty->set_exception(std::move(ex));
_not_empty = std::experimental::nullopt;
}
}
};
template <typename T>

View File

@@ -281,6 +281,24 @@ future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd,
return (pfd.*pr).get_future();
}
void reactor_backend_epoll::abort_fd(pollable_fd_state& pfd, std::exception_ptr ex,
promise<> pollable_fd_state::* pr, int event) {
if (pfd.events_epoll & event) {
pfd.events_epoll &= ~event;
auto ctl = pfd.events_epoll ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
::epoll_event eevt;
eevt.events = pfd.events_epoll;
eevt.data.ptr = &pfd;
int r = ::epoll_ctl(_epollfd.get(), ctl, pfd.fd.get(), &eevt);
assert(r == 0);
}
if (pfd.events_requested & event) {
pfd.events_requested &= ~event;
(pfd.*pr).set_exception(std::move(ex));
}
pfd.events_known &= ~event;
}
future<> reactor_backend_epoll::readable(pollable_fd_state& fd) {
return get_epoll_future(fd, &pollable_fd_state::pollin, EPOLLIN);
}
@@ -289,6 +307,14 @@ future<> reactor_backend_epoll::writeable(pollable_fd_state& fd) {
return get_epoll_future(fd, &pollable_fd_state::pollout, EPOLLOUT);
}
void reactor_backend_epoll::abort_reader(pollable_fd_state& fd, std::exception_ptr ex) {
abort_fd(fd, std::move(ex), &pollable_fd_state::pollin, EPOLLIN);
}
void reactor_backend_epoll::abort_writer(pollable_fd_state& fd, std::exception_ptr ex) {
abort_fd(fd, std::move(ex), &pollable_fd_state::pollout, EPOLLOUT);
}
void reactor_backend_epoll::forget(pollable_fd_state& fd) {
if (fd.events_epoll) {
::epoll_ctl(_epollfd.get(), EPOLL_CTL_DEL, fd.fd.get(), nullptr);

View File

@@ -167,11 +167,14 @@ public:
future<> write_all(net::packet& p);
future<> readable();
future<> writeable();
void abort_reader(std::exception_ptr ex);
void abort_writer(std::exception_ptr ex);
future<pollable_fd, socket_address> accept();
future<size_t> sendmsg(struct msghdr *msg);
future<size_t> recvmsg(struct msghdr *msg);
future<size_t> sendto(socket_address addr, const void* buf, size_t len);
file_desc& get_file_desc() const { return _s->fd; }
void shutdown(int how) { _s->fd.shutdown(how); }
void close() { _s.reset(); }
protected:
int get_fd() const { return _s->fd.get(); }
@@ -187,6 +190,8 @@ public:
virtual ~connected_socket_impl() {}
virtual input_stream<char> input() = 0;
virtual output_stream<char> output() = 0;
virtual void shutdown_input() = 0;
virtual void shutdown_output() = 0;
};
/// \addtogroup networking-module
@@ -217,6 +222,22 @@ public:
///
/// Gets an object that sends data to the remote endpoint.
output_stream<char> output();
/// Disables output to the socket.
///
/// Current or future writes that have not been successfully flushed
/// will immediately fail with an error. This is useful to abort
/// operations on a socket that is not making progress due to a
/// peer failure.
void shutdown_output();
/// Disables input from the socket.
///
/// Current or future reads will immediately fail with an error.
/// This is useful to abort operations on a socket that is not making
/// progress due to a peer failure.
void shutdown_input();
/// Disables socket input and output.
///
/// Equivalent to \ref shutdown_input() and \ref shutdown_output().
};
/// @}
@@ -225,6 +246,7 @@ class server_socket_impl {
public:
virtual ~server_socket_impl() {}
virtual future<connected_socket, socket_address> accept() = 0;
virtual void abort_accept() = 0;
};
/// \endcond
@@ -263,6 +285,14 @@ public:
future<connected_socket, socket_address> accept() {
return _ssi->accept();
}
/// Stops any \ref accept() in progress.
///
/// Current and future \ref accept() calls will terminate immediately
/// with an error.
void abort_accept() {
return _ssi->abort_accept();
}
};
/// @}
@@ -566,6 +596,8 @@ private:
promise<> pollable_fd_state::* pr, int event);
void complete_epoll_event(pollable_fd_state& fd,
promise<> pollable_fd_state::* pr, int events, int event);
void abort_fd(pollable_fd_state& fd, std::exception_ptr ex,
promise<> pollable_fd_state::* pr, int event);
public:
reactor_backend_epoll();
virtual ~reactor_backend_epoll() override { }
@@ -575,6 +607,8 @@ public:
virtual void forget(pollable_fd_state& fd) override;
virtual future<> notified(reactor_notifier *n) override;
virtual std::unique_ptr<reactor_notifier> make_reactor_notifier() override;
void abort_reader(pollable_fd_state& fd, std::exception_ptr ex);
void abort_writer(pollable_fd_state& fd, std::exception_ptr ex);
};
#ifdef HAVE_OSV
@@ -863,6 +897,12 @@ public:
future<> notified(reactor_notifier *n) {
return _backend.notified(n);
}
void abort_reader(pollable_fd_state& fd, std::exception_ptr ex) {
return _backend.abort_reader(fd, std::move(ex));
}
void abort_writer(pollable_fd_state& fd, std::exception_ptr ex) {
return _backend.abort_writer(fd, std::move(ex));
}
void enable_timer(clock_type::time_point when);
std::unique_ptr<reactor_notifier> make_reactor_notifier() {
return _backend.make_reactor_notifier();
@@ -1151,6 +1191,18 @@ future<> pollable_fd::writeable() {
return engine().writeable(*_s);
}
inline
void
pollable_fd::abort_reader(std::exception_ptr ex) {
engine().abort_reader(*_s, std::move(ex));
}
inline
void
pollable_fd::abort_writer(std::exception_ptr ex) {
engine().abort_writer(*_s, std::move(ex));
}
inline
future<pollable_fd, socket_address> pollable_fd::accept() {
return engine().accept(*_s);
@@ -1304,4 +1356,16 @@ connected_socket::output() {
return _csi->output();
}
inline
void
connected_socket::shutdown_input() {
return _csi->shutdown_input();
}
inline
void
connected_socket::shutdown_output() {
return _csi->shutdown_output();
}
#endif /* REACTOR_HH_ */

View File

@@ -41,6 +41,7 @@
#include <limits>
#include <cctype>
#include <vector>
#include <boost/intrusive/list.hpp>
#include "reply.hh"
#include "http/routes.hh"
@@ -63,8 +64,18 @@ class http_server {
uint64_t _total_connections = 0;
uint64_t _current_connections = 0;
uint64_t _requests_served = 0;
uint64_t _connections_being_accepted = 0;
sstring _date = http_date();
timer<> _date_format_timer { [this] {_date = http_date();} };
bool _stopping = false;
promise<> _all_connections_stopped;
future<> _stopped = _all_connections_stopped.get_future();
private:
void maybe_idle() {
if (_stopping && !_connections_being_accepted && !_current_connections) {
_all_connections_stopped.set_value();
}
}
public:
routes _routes;
@@ -75,13 +86,30 @@ public:
listen_options lo;
lo.reuse_address = true;
_listeners.push_back(engine().listen(make_ipv4_address(addr), lo));
do_accepts(_listeners.size() - 1);
_stopped = when_all(std::move(_stopped), do_accepts(_listeners.size() - 1)).discard_result();
return make_ready_future<>();
}
void do_accepts(int which) {
_listeners[which].accept().then(
[this, which] (connected_socket fd, socket_address addr) mutable {
auto conn = new connection(*this, std::move(fd), addr);
future<> stop() {
_stopping = true;
for (auto&& l : _listeners) {
l.abort_accept();
}
for (auto&& c : _connections) {
c.shutdown();
}
return std::move(_stopped);
}
future<> do_accepts(int which) {
++_connections_being_accepted;
return _listeners[which].accept().then_wrapped(
[this, which] (future<connected_socket, socket_address> f_cs_sa) mutable {
--_connections_being_accepted;
if (_stopping) {
maybe_idle();
return;
}
auto cs_sa = f_cs_sa.get();
auto conn = new connection(*this, std::get<0>(std::move(cs_sa)), std::get<1>(std::move(cs_sa)));
conn->process().then_wrapped([this, conn] (auto&& f) {
delete conn;
try {
@@ -99,7 +127,8 @@ public:
}
});
}
class connection {
private:
class connection : public boost::intrusive::list_base_hook<> {
http_server& _server;
connected_socket _fd;
input_stream<char> _read_buf;
@@ -118,9 +147,12 @@ public:
_fd.output()) {
++_server._total_connections;
++_server._current_connections;
_server._connections.push_back(*this);
}
~connection() {
--_server._current_connections;
_server._connections.erase(_server._connections.iterator_to(*this));
_server.maybe_idle();
}
future<> process() {
// Launch read and write "threads" simultaneously:
@@ -130,6 +162,10 @@ public:
return make_ready_future<>();
});
}
void shutdown() {
_fd.shutdown_input();
_fd.shutdown_output();
}
future<> read() {
return do_until([this] {return _done;}, [this] {
return read_one();
@@ -328,6 +364,7 @@ public:
_resp->_content.size());
}
};
public:
uint64_t total_connections() const {
return _total_connections;
}
@@ -345,6 +382,8 @@ public:
strftime(tmp, sizeof(tmp), "%d %b %Y %H:%M:%S GMT", &tm);
return tmp;
}
private:
boost::intrusive::list<connection> _connections;
};
/*
@@ -370,6 +409,10 @@ public:
return _server_dist->start();
}
future<> stop() {
return _server_dist->stop();
}
future<> set_routes(std::function<void(routes& r)> fun) {
return _server_dist->invoke_on_all([fun](http_server& server) {
fun(server._routes);

View File

@@ -41,6 +41,7 @@ class native_server_socket_impl : public server_socket_impl {
public:
native_server_socket_impl(Protocol& proto, uint16_t port, listen_options opt);
virtual future<connected_socket, socket_address> accept() override;
virtual void abort_accept() override;
};
template <typename Protocol>
@@ -58,6 +59,12 @@ native_server_socket_impl<Protocol>::accept() {
});
}
template <typename Protocol>
void
native_server_socket_impl<Protocol>::abort_accept() {
_listener.abort_accept();
}
// native_connected_socket_impl
template <typename Protocol>
class native_connected_socket_impl : public connected_socket_impl {
@@ -69,6 +76,8 @@ public:
: _conn(std::move(conn)) {}
virtual input_stream<char> input() override;
virtual output_stream<char> output() override;
virtual void shutdown_input() override;
virtual void shutdown_output() override;
};
template <typename Protocol>
@@ -130,6 +139,19 @@ native_connected_socket_impl<Protocol>::output() {
return output_stream<char>(std::move(ds), 8192);
}
template <typename Protocol>
void
native_connected_socket_impl<Protocol>::shutdown_input() {
_conn.close_read();
}
template <typename Protocol>
void
native_connected_socket_impl<Protocol>::shutdown_output() {
_conn.close_write();
}
}

View File

@@ -33,6 +33,12 @@ private:
public:
virtual input_stream<char> input() override { return input_stream<char>(posix_data_source(_fd)); }
virtual output_stream<char> output() override { return output_stream<char>(posix_data_sink(_fd), 8192); }
virtual void shutdown_input() override {
_fd.shutdown(SHUT_RD);
}
virtual void shutdown_output() override {
_fd.shutdown(SHUT_WR);
}
friend class posix_server_socket_impl;
friend class posix_ap_server_socket_impl;
friend class posix_reuseport_server_socket_impl;
@@ -59,6 +65,11 @@ posix_server_socket_impl::accept() {
});
}
void
posix_server_socket_impl::abort_accept() {
_lfd.abort_reader(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
}
future<connected_socket, socket_address> posix_ap_server_socket_impl::accept() {
auto conni = conn_q.find(_sa.as_posix_sockaddr_in());
if (conni != conn_q.end()) {
@@ -73,6 +84,16 @@ future<connected_socket, socket_address> posix_ap_server_socket_impl::accept() {
}
}
void
posix_ap_server_socket_impl::abort_accept() {
conn_q.erase(_sa.as_posix_sockaddr_in());
auto i = sockets.find(_sa.as_posix_sockaddr_in());
if (i != sockets.end()) {
i->second.set_exception(std::system_error(ECONNABORTED, std::system_category()));
sockets.erase(i);
}
}
future<connected_socket, socket_address>
posix_reuseport_server_socket_impl::accept() {
return _lfd.accept().then([this] (pollable_fd fd, socket_address sa) {
@@ -82,6 +103,11 @@ posix_reuseport_server_socket_impl::accept() {
});
}
void
posix_reuseport_server_socket_impl::abort_accept() {
_lfd.abort_reader(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
}
void posix_ap_server_socket_impl::move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr) {
auto i = sockets.find(sa.as_posix_sockaddr_in());
if (i != sockets.end()) {

View File

@@ -65,6 +65,7 @@ class posix_ap_server_socket_impl : public server_socket_impl {
public:
explicit posix_ap_server_socket_impl(socket_address sa) : _sa(sa) {}
virtual future<connected_socket, socket_address> accept();
virtual void abort_accept() override;
static void move_connected_socket(socket_address sa, pollable_fd fd, socket_address addr);
};
@@ -74,6 +75,7 @@ class posix_server_socket_impl : public server_socket_impl {
public:
explicit posix_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
virtual future<connected_socket, socket_address> accept();
virtual void abort_accept() override;
};
class posix_reuseport_server_socket_impl : public server_socket_impl {
@@ -82,6 +84,7 @@ class posix_reuseport_server_socket_impl : public server_socket_impl {
public:
explicit posix_reuseport_server_socket_impl(socket_address sa, pollable_fd lfd) : _sa(sa), _lfd(std::move(lfd)) {}
virtual future<connected_socket, socket_address> accept();
virtual void abort_accept() override;
};
class posix_network_stack : public network_stack {

View File

@@ -323,6 +323,7 @@ private:
void input_handle_other_state(tcp_hdr* th, packet p);
void output_one(bool data_retransmit = false);
future<> wait_for_data();
void abort_reader();
future<> wait_for_all_data_acked();
future<> send(packet p);
void connect();
@@ -583,6 +584,9 @@ public:
return make_ready_future<connection>(_q.pop());
});
}
void abort_accept() {
_q.abort(std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
}
friend class tcp;
};
public:
@@ -1526,6 +1530,16 @@ future<> tcp<InetTraits>::tcb::wait_for_data() {
return _rcv._data_received_promise->get_future();
}
template <typename InetTraits>
void
tcp<InetTraits>::tcb::abort_reader() {
if (_rcv._data_received_promise) {
_rcv._data_received_promise->set_exception(
std::make_exception_ptr(std::system_error(ECONNABORTED, std::system_category())));
_rcv._data_received_promise = std::experimental::nullopt;
}
}
template <typename InetTraits>
future<> tcp<InetTraits>::tcb::wait_for_all_data_acked() {
if (_snd.data.empty() && _snd.unsent_len == 0 && _snd.queued_len == 0) {
@@ -1875,6 +1889,7 @@ std::experimental::optional<typename InetTraits::l4packet> tcp<InetTraits>::tcb:
template <typename InetTraits>
void tcp<InetTraits>::connection::close_read() {
_tcb->abort_reader();
}
template <typename InetTraits>