diff --git a/transport/server.cc b/transport/server.cc index deff5b2fdb..454d5c21e6 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -37,32 +37,6 @@ struct cql_frame_error : std::exception { } }; -struct [[gnu::packed]] cql_binary_frame_v1 { - uint8_t version; - uint8_t flags; - uint8_t stream; - uint8_t opcode; - net::packed length; - - template - void adjust_endianness(Adjuster a) { - return a(length); - } -}; - -struct [[gnu::packed]] cql_binary_frame_v3 { - uint8_t version; - uint8_t flags; - net::packed stream; - uint8_t opcode; - net::packed length; - - template - void adjust_endianness(Adjuster a) { - return a(stream, length); - } -}; - enum class cql_binary_opcode : uint8_t { ERROR = 0, STARTUP = 1, @@ -169,85 +143,6 @@ transport::event::event_type parse_event_type(const sstring& value) } } -struct cql_query_state { - service::query_state query_state; - std::unique_ptr options; - - cql_query_state(service::client_state& client_state) - : query_state(client_state) - { } -}; - -class cql_server::connection { - cql_server& _server; - connected_socket _fd; - input_stream _read_buf; - output_stream _write_buf; - seastar::gate _pending_requests_gate; - future<> _ready_to_respond = make_ready_future<>(); - uint8_t _version = 0; - serialization_format _serialization_format = serialization_format::use_16_bit(); - service::client_state _client_state; - std::unordered_map _query_states; -public: - connection(cql_server& server, connected_socket&& fd, socket_address addr); - ~connection(); - future<> process(); - future<> process_request(); -private: - - future<> process_request_one(temporary_buffer buf, - uint8_t op, - uint16_t stream); - unsigned frame_size() const; - cql_binary_frame_v3 parse_frame(temporary_buffer buf); - future> read_frame(); - future<> process_startup(uint16_t stream, temporary_buffer buf); - future<> process_auth_response(uint16_t stream, temporary_buffer buf); - future<> process_options(uint16_t stream, temporary_buffer buf); - future<> process_query(uint16_t stream, temporary_buffer buf); - future<> process_prepare(uint16_t stream, temporary_buffer buf); - future<> process_execute(uint16_t stream, temporary_buffer buf); - future<> process_batch(uint16_t stream, temporary_buffer buf); - future<> process_register(uint16_t stream, temporary_buffer buf); - - future<> write_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive); - future<> write_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present); - future<> write_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name); - future<> write_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id); - future<> write_error(int16_t stream, exceptions::exception_code err, sstring msg); - future<> write_ready(int16_t stream); - future<> write_supported(int16_t stream); - future<> write_result(int16_t stream, shared_ptr msg); - future<> write_topology_change_event(const transport::event::topology_change& event); - future<> write_status_change_event(const transport::event::status_change& event); - future<> write_schema_change_event(const transport::event::schema_change& event); - future<> write_response(shared_ptr response); - - void check_room(temporary_buffer& buf, size_t n); - void validate_utf8(sstring_view s); - int8_t read_byte(temporary_buffer& buf); - int32_t read_int(temporary_buffer& buf); - int64_t read_long(temporary_buffer& buf); - int16_t read_short(temporary_buffer& buf); - uint16_t read_unsigned_short(temporary_buffer& buf); - sstring read_string(temporary_buffer& buf); - bytes read_short_bytes(temporary_buffer& buf); - bytes_opt read_value(temporary_buffer& buf); - sstring_view read_long_string_view(temporary_buffer& buf); - void read_name_and_value_list(temporary_buffer& buf, std::vector& names, std::vector& values); - void read_string_list(temporary_buffer& buf, std::vector& strings); - void read_value_list(temporary_buffer& buf, std::vector& values); - db::consistency_level read_consistency(temporary_buffer& buf); - std::unordered_map read_string_map(temporary_buffer& buf); - std::unique_ptr read_options(temporary_buffer& buf); - - cql_query_state& get_query_state(uint16_t stream); - void init_serialization_format(); - - friend event_notifier; -}; - cql_server::event_notifier::event_notifier(uint16_t port) : _port{port} { diff --git a/transport/server.hh b/transport/server.hh index 7e1c283afb..86a2fe02a3 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -21,6 +21,32 @@ class registrations; class database; +struct [[gnu::packed]] cql_binary_frame_v1 { + uint8_t version; + uint8_t flags; + uint8_t stream; + uint8_t opcode; + net::packed length; + + template + void adjust_endianness(Adjuster a) { + return a(length); + } +}; + +struct [[gnu::packed]] cql_binary_frame_v3 { + uint8_t version; + uint8_t flags; + net::packed stream; + uint8_t opcode; + net::packed length; + + template + void adjust_endianness(Adjuster a) { + return a(stream, length); + } +}; + class cql_server { class event_notifier; @@ -86,4 +112,83 @@ public: virtual void on_move(const gms::inet_address& endpoint) override; }; +struct cql_query_state { + service::query_state query_state; + std::unique_ptr options; + + cql_query_state(service::client_state& client_state) + : query_state(client_state) + { } +}; + +class cql_server::connection { + cql_server& _server; + connected_socket _fd; + input_stream _read_buf; + output_stream _write_buf; + seastar::gate _pending_requests_gate; + future<> _ready_to_respond = make_ready_future<>(); + uint8_t _version = 0; + serialization_format _serialization_format = serialization_format::use_16_bit(); + service::client_state _client_state; + std::unordered_map _query_states; +public: + connection(cql_server& server, connected_socket&& fd, socket_address addr); + ~connection(); + future<> process(); + future<> process_request(); +private: + + future<> process_request_one(temporary_buffer buf, + uint8_t op, + uint16_t stream); + unsigned frame_size() const; + cql_binary_frame_v3 parse_frame(temporary_buffer buf); + future> read_frame(); + future<> process_startup(uint16_t stream, temporary_buffer buf); + future<> process_auth_response(uint16_t stream, temporary_buffer buf); + future<> process_options(uint16_t stream, temporary_buffer buf); + future<> process_query(uint16_t stream, temporary_buffer buf); + future<> process_prepare(uint16_t stream, temporary_buffer buf); + future<> process_execute(uint16_t stream, temporary_buffer buf); + future<> process_batch(uint16_t stream, temporary_buffer buf); + future<> process_register(uint16_t stream, temporary_buffer buf); + + future<> write_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive); + future<> write_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present); + future<> write_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name); + future<> write_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id); + future<> write_error(int16_t stream, exceptions::exception_code err, sstring msg); + future<> write_ready(int16_t stream); + future<> write_supported(int16_t stream); + future<> write_result(int16_t stream, shared_ptr msg); + future<> write_topology_change_event(const transport::event::topology_change& event); + future<> write_status_change_event(const transport::event::status_change& event); + future<> write_schema_change_event(const transport::event::schema_change& event); + future<> write_response(shared_ptr response); + + void check_room(temporary_buffer& buf, size_t n); + void validate_utf8(sstring_view s); + int8_t read_byte(temporary_buffer& buf); + int32_t read_int(temporary_buffer& buf); + int64_t read_long(temporary_buffer& buf); + int16_t read_short(temporary_buffer& buf); + uint16_t read_unsigned_short(temporary_buffer& buf); + sstring read_string(temporary_buffer& buf); + bytes read_short_bytes(temporary_buffer& buf); + bytes_opt read_value(temporary_buffer& buf); + sstring_view read_long_string_view(temporary_buffer& buf); + void read_name_and_value_list(temporary_buffer& buf, std::vector& names, std::vector& values); + void read_string_list(temporary_buffer& buf, std::vector& strings); + void read_value_list(temporary_buffer& buf, std::vector& values); + db::consistency_level read_consistency(temporary_buffer& buf); + std::unordered_map read_string_map(temporary_buffer& buf); + std::unique_ptr read_options(temporary_buffer& buf); + + cql_query_state& get_query_state(uint16_t stream); + void init_serialization_format(); + + friend event_notifier; +}; + #endif