transport: response: use bytes_ostream

std::vector<char> is not a very good container for incrementally
building a response. It may cause excessive copies and allocations. If
the response is large it will put more pressure on the memory allocator
by requiring the buffer to be contiguous.

We already have bytes_ostream which avoids all of these problems, so
let's use it.
This commit is contained in:
Paweł Dziepak
2018-05-31 13:07:32 +01:00
parent c04d38b76b
commit a7c4d407ce
2 changed files with 35 additions and 29 deletions

View File

@@ -49,15 +49,14 @@ class cql_server::response {
int16_t _stream;
cql_binary_opcode _opcode;
uint8_t _flags = 0; // a bitwise OR mask of zero or more cql_frame_flags values
std::vector<char> _body;
bytes_ostream _body;
public:
response(int16_t stream, cql_binary_opcode opcode, const tracing::trace_state_ptr& tr_state_ptr)
: _stream{stream}
, _opcode{opcode}
, _body(tracing::should_return_id_in_response(tr_state_ptr) ? utils::UUID::serialized_size() : 0)
{
if (tracing::should_return_id_in_response(tr_state_ptr)) {
auto i = _body.begin();
auto i = _body.write_place_holder(utils::UUID::serialized_size());
tr_state_ptr->session_id().serialize(i);
set_frame_flag(cql_frame_flags::tracing);
}
@@ -92,8 +91,8 @@ public:
}
private:
void compress(cql_compression compression);
std::vector<char> compress_lz4(const std::vector<char>& body);
std::vector<char> compress_snappy(const std::vector<char>& body);
void compress_lz4();
void compress_snappy();
template <typename CqlFrameHeaderType>
sstring make_frame_one(uint8_t version, size_t length) {

View File

@@ -1521,7 +1521,9 @@ cql_server::response::output(output_stream<char>& out, uint8_t version, cql_comp
std::copy_n(frame.begin(), frame.size(), tmp.get_write());
auto f = out.write(tmp.get(), tmp.size());
return f.then([this, &out, tmp = std::move(tmp)] {
return out.write(_body.data(), _body.size());
return seastar::do_for_each(_body.begin(), _body.end(), [&out] (bytes_view fragment) {
return out.write(reinterpret_cast<const char*>(fragment.data()), fragment.size());
});
});
}
@@ -1529,10 +1531,10 @@ void cql_server::response::compress(cql_compression compression)
{
switch (compression) {
case cql_compression::lz4:
_body = compress_lz4(_body);
compress_lz4();
break;
case cql_compression::snappy:
_body = compress_snappy(_body);
compress_snappy();
break;
default:
throw std::invalid_argument("Invalid CQL compression algorithm");
@@ -1540,10 +1542,11 @@ void cql_server::response::compress(cql_compression compression)
set_frame_flag(cql_frame_flags::compression);
}
std::vector<char> cql_server::response::compress_lz4(const std::vector<char>& body)
void cql_server::response::compress_lz4()
{
const char* input = body.data();
size_t input_len = body.size();
auto view = _body.linearize();
const char* input = reinterpret_cast<const char*>(view.data());
size_t input_len = view.size();
std::vector<char> comp;
comp.resize(LZ4_COMPRESSBOUND(input_len) + 4);
char *output = comp.data();
@@ -1561,13 +1564,15 @@ std::vector<char> cql_server::response::compress_lz4(const std::vector<char>& bo
}
size_t output_len = ret + 4;
comp.resize(output_len);
return comp;
_body = { };
_body.write(comp.data(), comp.size());
}
std::vector<char> cql_server::response::compress_snappy(const std::vector<char>& body)
void cql_server::response::compress_snappy()
{
const char* input = body.data();
size_t input_len = body.size();
auto view = _body.linearize();
const char* input = reinterpret_cast<const char*>(view.data());
size_t input_len = view.size();
std::vector<char> comp;
size_t output_len = snappy_max_compressed_length(input_len);
comp.resize(output_len);
@@ -1576,7 +1581,8 @@ std::vector<char> cql_server::response::compress_snappy(const std::vector<char>&
throw std::runtime_error("CQL frame Snappy compression failure");
}
comp.resize(output_len);
return comp;
_body = { };
_body.write(comp.data(), comp.size());
}
void cql_server::response::serialize(const event::schema_change& event, uint8_t version)
@@ -1609,28 +1615,29 @@ void cql_server::response::serialize(const event::schema_change& event, uint8_t
void cql_server::response::write_byte(uint8_t b)
{
_body.insert(_body.end(), b);
auto s = reinterpret_cast<const int8_t*>(&b);
_body.write(bytes_view(s, sizeof(b)));
}
void cql_server::response::write_int(int32_t n)
{
auto u = htonl(n);
auto *s = reinterpret_cast<const char*>(&u);
_body.insert(_body.end(), s, s+sizeof(u));
auto *s = reinterpret_cast<const int8_t*>(&u);
_body.write(bytes_view(s, sizeof(u)));
}
void cql_server::response::write_long(int64_t n)
{
auto u = htonq(n);
auto *s = reinterpret_cast<const char*>(&u);
_body.insert(_body.end(), s, s+sizeof(u));
auto *s = reinterpret_cast<const int8_t*>(&u);
_body.write(bytes_view(s, sizeof(u)));
}
void cql_server::response::write_short(uint16_t n)
{
auto u = htons(n);
auto *s = reinterpret_cast<const char*>(&u);
_body.insert(_body.end(), s, s+sizeof(u));
auto *s = reinterpret_cast<const int8_t*>(&u);
_body.write(bytes_view(s, sizeof(u)));
}
template<typename T>
@@ -1646,19 +1653,19 @@ T cast_if_fits(size_t v) {
void cql_server::response::write_string(const sstring& s)
{
write_short(cast_if_fits<uint16_t>(s.size()));
_body.insert(_body.end(), s.begin(), s.end());
_body.write(bytes_view(reinterpret_cast<const int8_t*>(s.data()), s.size()));
}
void cql_server::response::write_bytes_as_string(bytes_view s)
{
write_short(cast_if_fits<uint16_t>(s.size()));
_body.insert(_body.end(), s.begin(), s.end());
_body.write(s);
}
void cql_server::response::write_long_string(const sstring& s)
{
write_int(cast_if_fits<int32_t>(s.size()));
_body.insert(_body.end(), s.begin(), s.end());
_body.write(bytes_view(reinterpret_cast<const int8_t*>(s.data()), s.size()));
}
void cql_server::response::write_string_list(std::vector<sstring> string_list)
@@ -1672,13 +1679,13 @@ void cql_server::response::write_string_list(std::vector<sstring> string_list)
void cql_server::response::write_bytes(bytes b)
{
write_int(cast_if_fits<int32_t>(b.size()));
_body.insert(_body.end(), b.begin(), b.end());
_body.write(b);
}
void cql_server::response::write_short_bytes(bytes b)
{
write_short(cast_if_fits<uint16_t>(b.size()));
_body.insert(_body.end(), b.begin(), b.end());
_body.write(b);
}
void cql_server::response::write_inet(ipv4_addr inet)
@@ -1731,7 +1738,7 @@ void cql_server::response::write_value(bytes_opt value)
}
write_int(value->size());
_body.insert(_body.end(), value->begin(), value->end());
_body.write(*value);
}
class type_codec {