/* * Copyright (C) 2021 ScyllaDB */ /* * This file is part of Scylla. * * Scylla is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Scylla is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Scylla. If not, see . */ #include "generic_server.hh" namespace generic_server { connection::connection(server& server, connected_socket&& fd) : _server{server} , _fd{std::move(fd)} , _read_buf(_fd.input()) , _write_buf(_fd.output()) { ++_server._total_connections; ++_server._current_connections; _server._connections_list.push_back(*this); } connection::~connection() { --_server._current_connections; _server._connections_list.erase(_server._connections_list.iterator_to(*this)); _server.maybe_idle(); } static bool is_broken_pipe_or_connection_reset(std::exception_ptr ep) { try { std::rethrow_exception(ep); } catch (const std::system_error& e) { return e.code().category() == std::system_category() && (e.code().value() == EPIPE || e.code().value() == ECONNRESET); } catch (...) {} return false; } future<> connection::process() { return with_gate(_pending_requests_gate, [this] { return do_until([this] { return _read_buf.eof(); }, [this] { return process_request(); }).then_wrapped([this] (future<> f) { handle_error(std::move(f)); }); }).finally([this] { return _pending_requests_gate.close().then([this] { on_connection_close(); return _ready_to_respond.handle_exception([] (std::exception_ptr ep) { if (is_broken_pipe_or_connection_reset(ep)) { // expected if another side closes a connection or we're shutting down return; } std::rethrow_exception(ep); }).finally([this] { return _write_buf.close(); }); }); }); } void connection::on_connection_close() { } future<> connection::shutdown() { try { _fd.shutdown_input(); _fd.shutdown_output(); } catch (...) { } return make_ready_future<>(); } server::~server() { } void server::maybe_idle() { if (_stopping && !_connections_being_accepted && !_current_connections) { _all_connections_stopped.set_value(); } } }