The API to put scattered_message into output_stream() is gone in seastar API level 9, and transport is the only place in Scylla that still uses it. The change is to put the response as a sequence of temporary_buffer-s. This preserves the zero-copy-ness of the reply, but needs a few things to be taken care of. First, the response header frame needs to be put as a zero-copy buffer too. Although output_stream() supports a semi-mixed mode, where zero-copy buffers can follow the buffered writes, it won't apply here. The socket is flush()-ed in batched mode, so even if the first reply populates the stream with data and flushes it, the next response may happen to start putting the header frame before the delayed flush took place. Second, because the socket is flushed in the batch-flush poller, the temporary buffers that are put into it must hold the foreign_ptr with the response object. With scattered message this was implemented with the help of a deleter that was attached to the message; now the deleter is shared between all buffers. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
130 lines
4.1 KiB
C++
130 lines
4.1 KiB
C++
/*
|
|
* Copyright (C) 2018-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "server.hh"
|
|
|
|
namespace cql_transport {
|
|
|
|
enum class cql_binary_opcode : uint8_t {
|
|
ERROR = 0,
|
|
STARTUP = 1,
|
|
READY = 2,
|
|
AUTHENTICATE = 3,
|
|
CREDENTIALS = 4,
|
|
OPTIONS = 5,
|
|
SUPPORTED = 6,
|
|
QUERY = 7,
|
|
RESULT = 8,
|
|
PREPARE = 9,
|
|
EXECUTE = 10,
|
|
REGISTER = 11,
|
|
EVENT = 12,
|
|
BATCH = 13,
|
|
AUTH_CHALLENGE = 14,
|
|
AUTH_RESPONSE = 15,
|
|
AUTH_SUCCESS = 16,
|
|
OPCODES_COUNT
|
|
};
|
|
|
|
class response {
|
|
int16_t _stream;
|
|
cql_binary_opcode _opcode;
|
|
uint8_t _flags = 0; // a bitwise OR mask of zero or more cql_frame_flags values
|
|
bytes_ostream _body;
|
|
public:
|
|
template<typename T>
|
|
class placeholder;
|
|
|
|
response(int16_t stream, cql_binary_opcode opcode, const tracing::trace_state_ptr& tr_state_ptr)
|
|
: _stream{stream}
|
|
, _opcode{opcode}
|
|
{
|
|
if (tracing::should_return_id_in_response(tr_state_ptr)) {
|
|
auto i = _body.write_place_holder(utils::UUID::serialized_size());
|
|
tr_state_ptr->session_id().serialize(i);
|
|
set_frame_flag(cql_frame_flags::tracing);
|
|
}
|
|
}
|
|
|
|
void set_frame_flag(cql_frame_flags flag) noexcept {
|
|
_flags |= flag;
|
|
}
|
|
|
|
void serialize(const event::schema_change& event, uint8_t version);
|
|
void write_byte(uint8_t b);
|
|
void write_int(int32_t n);
|
|
placeholder<int32_t> write_int_placeholder();
|
|
void write_long(int64_t n);
|
|
void write_short(uint16_t n);
|
|
void write_string(std::string_view s);
|
|
void write_bytes_as_string(bytes_view s);
|
|
void write_long_string(const sstring& s);
|
|
void write_string_list(std::vector<sstring> string_list);
|
|
void write_bytes(bytes b);
|
|
void write_short_bytes(bytes b);
|
|
void write_inet(socket_address inet);
|
|
void write_consistency(db::consistency_level c);
|
|
void write_string_map(std::map<sstring, sstring> string_map);
|
|
void write_string_multimap(std::multimap<sstring, sstring> string_map);
|
|
void write_string_bytes_map(const std::unordered_map<sstring, bytes>& map);
|
|
void write_value(bytes_opt value);
|
|
void write_value(std::optional<managed_bytes_view> value);
|
|
void write(const cql3::metadata& m, const cql_metadata_id_wrapper& request_metadata_id, bool no_metadata = false);
|
|
void write(const cql3::prepared_metadata& m, uint8_t version);
|
|
|
|
future<> write_message(output_stream<char>& out, uint8_t version, cql_compression compression, seastar::deleter);
|
|
|
|
cql_binary_opcode opcode() const {
|
|
return _opcode;
|
|
}
|
|
size_t size() const {
|
|
return _body.size();
|
|
}
|
|
private:
|
|
void compress(cql_compression compression);
|
|
void compress_lz4();
|
|
void compress_snappy();
|
|
|
|
template <typename CqlFrameHeaderType>
|
|
temporary_buffer<char> make_frame_one(uint8_t version, size_t length) {
|
|
temporary_buffer<char> frame_buf(sizeof(CqlFrameHeaderType));
|
|
auto* frame = reinterpret_cast<CqlFrameHeaderType*>(frame_buf.get_write());
|
|
frame->version = version | 0x80;
|
|
frame->flags = _flags;
|
|
frame->opcode = static_cast<uint8_t>(_opcode);
|
|
frame->length = htonl(length);
|
|
frame->stream = net::hton((decltype(frame->stream))_stream);
|
|
|
|
return frame_buf;
|
|
}
|
|
|
|
utils::result_with_exception_ptr<temporary_buffer<char>> make_frame(uint8_t version, size_t length) {
|
|
if (version > 0x04) {
|
|
return bo::failure(std::make_exception_ptr(exceptions::protocol_exception(format("Invalid or unsupported protocol version: {:d}", version))));
|
|
}
|
|
|
|
return make_frame_one<cql_binary_frame_v3>(version, length);
|
|
}
|
|
};
|
|
|
|
template<>
|
|
class response::placeholder<int32_t> {
|
|
int8_t* _pointer;
|
|
public:
|
|
explicit placeholder(int8_t* ptr) : _pointer(ptr) { }
|
|
void write(int32_t n) {
|
|
auto u = htonl(n);
|
|
auto* s = reinterpret_cast<const int8_t*>(&u);
|
|
std::copy_n(s, sizeof(u), _pointer);
|
|
}
|
|
};
|
|
|
|
}
|