diff --git a/transport/server.cc b/transport/server.cc index a9f8516817..8e8aca4565 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -135,6 +135,7 @@ class cql_server::connection { connected_socket _fd; input_stream _read_buf; output_stream _write_buf; + seastar::gate _pending_requests_gate; future<> _ready_to_respond = make_ready_future<>(); uint8_t _version = 0; serialization_format _serialization_format = serialization_format::use_16_bit(); @@ -150,10 +151,18 @@ public: { } future<> process() { return do_until([this] { return _read_buf.eof(); }, [this] { return process_request(); }) - .finally([this] { return std::move(_ready_to_respond); }); + .finally([this] { + return _pending_requests_gate.close().then([this] { + std::move(_ready_to_respond); + }); + }); } future<> process_request(); private: + + future<> process_request_one(temporary_buffer buf, + uint8_t op, + uint16_t stream); unsigned frame_size() const; cql_binary_frame_v3 parse_frame(temporary_buffer buf); future> read_frame(); @@ -383,6 +392,44 @@ cql_server::connection::read_frame() { } } +future<> cql_server::connection::process_request_one(temporary_buffer buf, + uint8_t op, + uint16_t stream) { + return make_ready_future<>().then([this, op, stream, buf = std::move(buf)] () mutable { + switch (static_cast(op)) { + case cql_binary_opcode::STARTUP: return process_startup(stream, std::move(buf)); + case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(stream, std::move(buf)); + case cql_binary_opcode::OPTIONS: return process_options(stream, std::move(buf)); + case cql_binary_opcode::QUERY: return process_query(stream, std::move(buf)); + case cql_binary_opcode::PREPARE: return process_prepare(stream, std::move(buf)); + case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(buf)); + case cql_binary_opcode::BATCH: return process_batch(stream, std::move(buf)); + case cql_binary_opcode::REGISTER: return process_register(stream, std::move(buf)); + default: throw exceptions::protocol_exception(sprint("Unknown opcode %d", op)); + } + }).then_wrapped([stream, this] (future<> f) { + --_server._requests_serving; + try { + f.get(); + return make_ready_future<>(); + } catch (const exceptions::unavailable_exception& ex) { + return write_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive); + } catch (const exceptions::read_timeout_exception& ex) { + return write_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present); + } catch (const exceptions::already_exists_exception& ex) { + return write_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name); + } catch (const exceptions::prepared_query_not_found_exception& ex) { + return write_unprepared_error(stream, ex.code(), ex.what(), ex.id); + } catch (const exceptions::cassandra_exception& ex) { + return write_error(stream, ex.code(), ex.what()); + } catch (std::exception& ex) { + return write_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what()); + } catch (...) { + return write_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error"); + } + }); +} + future<> cql_server::connection::process_request() { return read_frame().then_wrapped([this] (future>&& v) { auto maybe_frame = std::get<0>(v.get()); @@ -390,44 +437,30 @@ future<> cql_server::connection::process_request() { // eof return make_ready_future<>(); } + auto& f = *maybe_frame; - return _read_buf.read_exactly(f.length).then([this, f] (temporary_buffer buf) { - // FIXME: compression - if (f.flags & 0x01) { - throw std::runtime_error("CQL frame compression is not supported"); - } + + // FIXME: compression + if (f.flags & 0x01) { + throw std::runtime_error("CQL frame compression is not supported"); + } + + auto op = f.opcode; + auto stream = f.stream; + + return _read_buf.read_exactly(f.length).then([this, op, stream] (temporary_buffer buf) { + ++_server._requests_served; ++_server._requests_serving; - switch (static_cast(f.opcode)) { - case cql_binary_opcode::STARTUP: return process_startup(f.stream, std::move(buf)); - case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(f.stream, std::move(buf)); - case cql_binary_opcode::OPTIONS: return process_options(f.stream, std::move(buf)); - case cql_binary_opcode::QUERY: return process_query(f.stream, std::move(buf)); - case cql_binary_opcode::PREPARE: return process_prepare(f.stream, std::move(buf)); - case cql_binary_opcode::EXECUTE: return process_execute(f.stream, std::move(buf)); - case cql_binary_opcode::BATCH: return process_batch(f.stream, std::move(buf)); - case cql_binary_opcode::REGISTER: return process_register(f.stream, std::move(buf)); - default: throw exceptions::protocol_exception(sprint("Unknown opcode %d", f.opcode)); - }; - }).then_wrapped([stream = f.stream, this] (future<> f) { - --_server._requests_serving; - try { - f.get(); - } catch (const exceptions::unavailable_exception& ex) { - write_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive); - } catch (const exceptions::read_timeout_exception& ex) { - write_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present); - } catch (const exceptions::already_exists_exception& ex) { - write_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name); - } catch (const exceptions::prepared_query_not_found_exception& ex) { - write_unprepared_error(stream, ex.code(), ex.what(), ex.id); - } catch (const exceptions::cassandra_exception& ex) { - write_error(stream, ex.code(), ex.what()); - } catch (std::exception& ex) { - write_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what()); - } catch (...) { - write_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error"); - } + + with_gate( + _pending_requests_gate, + [this, op, stream, buf = std::move(buf)] () mutable { + return process_request_one(std::move(buf), op, stream); + } + ); + + return make_ready_future<>(); }); }); }