diff --git a/transport/response.hh b/transport/response.hh index 5844e1646f..7c6e7cb1f7 100644 --- a/transport/response.hh +++ b/transport/response.hh @@ -49,15 +49,14 @@ class cql_server::response { int16_t _stream; cql_binary_opcode _opcode; uint8_t _flags = 0; // a bitwise OR mask of zero or more cql_frame_flags values - std::vector _body; + bytes_ostream _body; public: response(int16_t stream, cql_binary_opcode opcode, const tracing::trace_state_ptr& tr_state_ptr) : _stream{stream} , _opcode{opcode} - , _body(tracing::should_return_id_in_response(tr_state_ptr) ? utils::UUID::serialized_size() : 0) { if (tracing::should_return_id_in_response(tr_state_ptr)) { - auto i = _body.begin(); + auto i = _body.write_place_holder(utils::UUID::serialized_size()); tr_state_ptr->session_id().serialize(i); set_frame_flag(cql_frame_flags::tracing); } @@ -92,8 +91,8 @@ public: } private: void compress(cql_compression compression); - std::vector compress_lz4(const std::vector& body); - std::vector compress_snappy(const std::vector& body); + void compress_lz4(); + void compress_snappy(); template sstring make_frame_one(uint8_t version, size_t length) { diff --git a/transport/server.cc b/transport/server.cc index c44184fdb3..8cb42bc869 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -1521,7 +1521,9 @@ cql_server::response::output(output_stream& out, uint8_t version, cql_comp std::copy_n(frame.begin(), frame.size(), tmp.get_write()); auto f = out.write(tmp.get(), tmp.size()); return f.then([this, &out, tmp = std::move(tmp)] { - return out.write(_body.data(), _body.size()); + return seastar::do_for_each(_body.begin(), _body.end(), [&out] (bytes_view fragment) { + return out.write(reinterpret_cast(fragment.data()), fragment.size()); + }); }); } @@ -1529,10 +1531,10 @@ void cql_server::response::compress(cql_compression compression) { switch (compression) { case cql_compression::lz4: - _body = compress_lz4(_body); + compress_lz4(); break; case cql_compression::snappy: - _body = compress_snappy(_body); + compress_snappy(); break; default: throw std::invalid_argument("Invalid CQL compression algorithm"); @@ -1540,10 +1542,11 @@ void cql_server::response::compress(cql_compression compression) set_frame_flag(cql_frame_flags::compression); } -std::vector cql_server::response::compress_lz4(const std::vector& body) +void cql_server::response::compress_lz4() { - const char* input = body.data(); - size_t input_len = body.size(); + auto view = _body.linearize(); + const char* input = reinterpret_cast(view.data()); + size_t input_len = view.size(); std::vector comp; comp.resize(LZ4_COMPRESSBOUND(input_len) + 4); char *output = comp.data(); @@ -1561,13 +1564,15 @@ std::vector cql_server::response::compress_lz4(const std::vector& bo } size_t output_len = ret + 4; comp.resize(output_len); - return comp; + _body = { }; + _body.write(comp.data(), comp.size()); } -std::vector cql_server::response::compress_snappy(const std::vector& body) +void cql_server::response::compress_snappy() { - const char* input = body.data(); - size_t input_len = body.size(); + auto view = _body.linearize(); + const char* input = reinterpret_cast(view.data()); + size_t input_len = view.size(); std::vector comp; size_t output_len = snappy_max_compressed_length(input_len); comp.resize(output_len); @@ -1576,7 +1581,8 @@ std::vector cql_server::response::compress_snappy(const std::vector& throw std::runtime_error("CQL frame Snappy compression failure"); } comp.resize(output_len); - return comp; + _body = { }; + _body.write(comp.data(), comp.size()); } void cql_server::response::serialize(const event::schema_change& event, uint8_t version) @@ -1609,28 +1615,29 @@ void cql_server::response::serialize(const event::schema_change& event, uint8_t void cql_server::response::write_byte(uint8_t b) { - _body.insert(_body.end(), b); + auto s = reinterpret_cast(&b); + _body.write(bytes_view(s, sizeof(b))); } void cql_server::response::write_int(int32_t n) { auto u = htonl(n); - auto *s = reinterpret_cast(&u); - _body.insert(_body.end(), s, s+sizeof(u)); + auto *s = reinterpret_cast(&u); + _body.write(bytes_view(s, sizeof(u))); } void cql_server::response::write_long(int64_t n) { auto u = htonq(n); - auto *s = reinterpret_cast(&u); - _body.insert(_body.end(), s, s+sizeof(u)); + auto *s = reinterpret_cast(&u); + _body.write(bytes_view(s, sizeof(u))); } void cql_server::response::write_short(uint16_t n) { auto u = htons(n); - auto *s = reinterpret_cast(&u); - _body.insert(_body.end(), s, s+sizeof(u)); + auto *s = reinterpret_cast(&u); + _body.write(bytes_view(s, sizeof(u))); } template @@ -1646,19 +1653,19 @@ T cast_if_fits(size_t v) { void cql_server::response::write_string(const sstring& s) { write_short(cast_if_fits(s.size())); - _body.insert(_body.end(), s.begin(), s.end()); + _body.write(bytes_view(reinterpret_cast(s.data()), s.size())); } void cql_server::response::write_bytes_as_string(bytes_view s) { write_short(cast_if_fits(s.size())); - _body.insert(_body.end(), s.begin(), s.end()); + _body.write(s); } void cql_server::response::write_long_string(const sstring& s) { write_int(cast_if_fits(s.size())); - _body.insert(_body.end(), s.begin(), s.end()); + _body.write(bytes_view(reinterpret_cast(s.data()), s.size())); } void cql_server::response::write_string_list(std::vector string_list) @@ -1672,13 +1679,13 @@ void cql_server::response::write_string_list(std::vector string_list) void cql_server::response::write_bytes(bytes b) { write_int(cast_if_fits(b.size())); - _body.insert(_body.end(), b.begin(), b.end()); + _body.write(b); } void cql_server::response::write_short_bytes(bytes b) { write_short(cast_if_fits(b.size())); - _body.insert(_body.end(), b.begin(), b.end()); + _body.write(b); } void cql_server::response::write_inet(ipv4_addr inet) @@ -1731,7 +1738,7 @@ void cql_server::response::write_value(bytes_opt value) } write_int(value->size()); - _body.insert(_body.end(), value->begin(), value->end()); + _body.write(*value); } class type_codec {