bytes_ostream: base on managed_bytes

bytes_ostream is an incremental builder for a discontiguous byte container.
managed_bytes is a non-incremental (size must be known up front) byte
container, that is also compatible with LSA. So far, conversion between
them involves copying. This is unfortunate, since query_result is generated
as a bytes_ostream, but is later converted to managed_bytes (today, this
is done in cql3::expr::get_non_pk_values() and
compound_view_wrapper::explode()). If the two types could be made compatible,
we could use managed_bytes_view instead of creating new objects and avoid
a copy. It's also nicer to have one less vocabulary type.

This patch makes bytes_ostream use managed_bytes' internal representation
(blob_storage instead of bytes_ostream::chunk) and provides a conversion
to managed_bytes. All bytes_ostream users are left in place, but the goal
is to make bytes_ostream a write-only type whose only observer is a conversion
to managed_bytes.

It turns out to be relatively simple. The internal representations were
already similar. I made blob_storage::ref_type self-initializing to
reduce churn (good practice anyway) and added a private constructor
to managed_bytes for the conversion.

Note that bytes_ostream can only be used to construct a non-LSA managed_bytes,
but LSA uses of managed_bytes are very strictly controlled (the entry
points to memtable and cache) so that's not a problem.

A unit test is added.

Closes #10986
This commit is contained in:
Avi Kivity
2022-07-06 18:10:56 +03:00
committed by Nadav Har'El
parent 5526738794
commit 53e0dc7530
3 changed files with 96 additions and 66 deletions

View File

@@ -11,6 +11,7 @@
#include <boost/range/iterator_range.hpp>
#include "bytes.hh"
#include "utils/managed_bytes.hh"
#include "hashing.hh"
#include <seastar/core/simple-stream.hh>
#include <seastar/core/loop.hh>
@@ -31,26 +32,15 @@ public:
static constexpr size_type max_chunk_size() { return max_alloc_size() - sizeof(chunk); }
private:
static_assert(sizeof(value_type) == 1, "value_type is assumed to be one byte long");
struct chunk {
// FIXME: group fragment pointers to reduce pointer chasing when packetizing
std::unique_ptr<chunk> next;
~chunk() {
auto p = std::move(next);
while (p) {
// Avoid recursion when freeing chunks
auto p_next = std::move(p->next);
p = std::move(p_next);
}
}
size_type offset; // Also means "size" after chunk is closed
size_type size;
value_type data[0];
void operator delete(void* ptr) { free(ptr); }
};
// Note: while appending data, chunk::size refers to the allocated space in the chunk,
// and chunk::frag_size refers to the currently occupied space in the chunk.
// After building, the first chunk::size is the whole object size, and chunk::frag_size
// doesn't change. This fits with managed_bytes interpretation.
using chunk = blob_storage;
static constexpr size_type default_chunk_size{512};
static constexpr size_type max_alloc_size() { return 128 * 1024; }
private:
std::unique_ptr<chunk> _begin;
blob_storage::ref_type _begin;
chunk* _current;
size_type _size;
size_type _initial_chunk_size = default_chunk_size;
@@ -70,13 +60,13 @@ public:
fragment_iterator(const fragment_iterator&) = default;
fragment_iterator& operator=(const fragment_iterator&) = default;
bytes_view operator*() const {
return { _current->data, _current->offset };
return { _current->data, _current->frag_size };
}
bytes_view operator->() const {
return *(*this);
}
fragment_iterator& operator++() {
_current = _current->next.get();
_current = _current->next;
return *this;
}
fragment_iterator operator++(int) {
@@ -119,7 +109,7 @@ private:
if (!_current) {
return 0;
}
return _current->size - _current->offset;
return _current->size - _current->frag_size;
}
// Figure out next chunk size.
// - must be enough for data_size + sizeof(chunk)
@@ -139,8 +129,8 @@ private:
[[gnu::always_inline]]
value_type* alloc(size_type size) {
if (__builtin_expect(size <= current_space_left(), true)) {
auto ret = _current->data + _current->offset;
_current->offset += size;
auto ret = _current->data + _current->frag_size;
_current->frag_size += size;
_size += size;
return ret;
} else {
@@ -154,19 +144,21 @@ private:
if (!space) {
throw std::bad_alloc();
}
auto new_chunk = std::unique_ptr<chunk>(new (space) chunk());
new_chunk->offset = size;
new_chunk->size = alloc_size - sizeof(chunk);
if (_current) {
_current->next = std::move(new_chunk);
_current = _current->next.get();
} else {
_begin = std::move(new_chunk);
_current = _begin.get();
}
auto backref = _current ? &_current->next : &_begin;
auto new_chunk = new (space) chunk(backref, alloc_size - sizeof(chunk), size);
_current = new_chunk;
_size += size;
return _current->data;
}
[[gnu::noinline]]
// Frees a whole chain of chunks, iteratively rather than recursively so a
// long chain cannot overflow the stack (the old unique_ptr-based chunk did
// the same in its destructor). Chunks are created with placement-new into
// malloc'ed storage (see alloc_new()), so each one is destroyed explicitly
// and then released with ::free.
void free_chain(chunk* c) noexcept {
while (c) {
auto n = c->next;
c->~chunk();
::free(c);
c = n;
}
}
public:
explicit bytes_ostream(size_t initial_chunk_size) noexcept
: _begin()
@@ -178,7 +170,7 @@ public:
bytes_ostream() noexcept : bytes_ostream(default_chunk_size) {}
bytes_ostream(bytes_ostream&& o) noexcept
: _begin(std::move(o._begin))
: _begin(std::exchange(o._begin, {}))
, _current(o._current)
, _size(o._size)
, _initial_chunk_size(o._initial_chunk_size)
@@ -196,6 +188,10 @@ public:
append(o);
}
~bytes_ostream() {
free_chain(_begin.ptr);
}
bytes_ostream& operator=(const bytes_ostream& o) {
if (this != &o) {
auto x = bytes_ostream(o);
@@ -243,8 +239,8 @@ public:
auto this_size = std::min(v.size(), size_t(current_space_left()));
if (__builtin_expect(this_size, true)) {
memcpy(_current->data + _current->offset, v.begin(), this_size);
_current->offset += this_size;
memcpy(_current->data + _current->frag_size, v.begin(), this_size);
_current->frag_size += this_size;
_size += this_size;
v.remove_prefix(this_size);
}
@@ -287,19 +283,20 @@ public:
throw std::bad_alloc();
}
auto new_chunk = std::unique_ptr<chunk>(new (space) chunk());
new_chunk->offset = _size;
new_chunk->size = _size;
auto old_begin = _begin;
auto new_chunk = new (space) chunk(&_begin, _size, _size);
auto dst = new_chunk->data;
auto r = _begin.get();
auto r = old_begin.ptr;
while (r) {
auto next = r->next.get();
dst = std::copy_n(r->data, r->offset, dst);
auto next = r->next;
dst = std::copy_n(r->data, r->frag_size, dst);
r->~chunk();
::free(r);
r = next;
}
_current = new_chunk.get();
_current = new_chunk;
_begin = std::move(new_chunk);
return bytes_view(_current->data, _size);
}
@@ -333,22 +330,23 @@ public:
void remove_suffix(size_t n) {
_size -= n;
auto left = _size;
auto current = _begin.get();
auto current = _begin.ptr;
while (current) {
if (current->offset >= left) {
current->offset = left;
if (current->frag_size >= left) {
current->frag_size = left;
_current = current;
current->next.reset();
free_chain(current->next);
current->next = nullptr;
return;
}
left -= current->offset;
current = current->next.get();
left -= current->frag_size;
current = current->next;
}
}
// begin() and end() form an input range to bytes_view representing fragments.
// Any modification of this instance invalidates iterators.
fragment_iterator begin() const { return { _begin.get() }; }
fragment_iterator begin() const { return { _begin.ptr }; }
fragment_iterator end() const { return { nullptr }; }
output_iterator write_begin() { return output_iterator(*this); }
@@ -363,7 +361,7 @@ public:
};
position pos() const {
return { _current, _current ? _current->offset : 0 };
return { _current, _current ? _current->frag_size : 0 };
}
// Returns the amount of bytes written since given position.
@@ -373,11 +371,11 @@ public:
if (!c) {
return _size;
}
size_type total = c->offset - pos._offset;
c = c->next.get();
size_type total = c->frag_size - pos._offset;
c = c->next;
while (c) {
total += c->offset;
c = c->next.get();
total += c->frag_size;
c = c->next;
}
return total;
}
@@ -391,8 +389,9 @@ public:
}
_size -= written_since(pos);
_current = pos._chunk;
free_chain(_current->next);
_current->next = nullptr;
_current->offset = pos._offset;
_current->frag_size = pos._offset;
}
void reduce_chunk_count() {
@@ -441,11 +440,23 @@ public:
// the clear() calls then writes will not involve any memory allocations,
// except for the first write made on this instance.
void clear() {
if (_begin) {
_begin->offset = 0;
if (_begin.ptr) {
_begin.ptr->frag_size = 0;
_size = 0;
_current = _begin.get();
_begin->next.reset();
free_chain(_begin.ptr->next);
_begin.ptr->next = nullptr;
_current = _begin.ptr;
}
}
// Converts this stream into a managed_bytes without copying the payload,
// by handing the chunk chain over to managed_bytes (rvalue-qualified:
// consumes *this, leaving it empty).
managed_bytes to_managed_bytes() && {
if (_size) {
// managed_bytes expects the first fragment's `size` field to hold the
// total object size (see the representation note near the chunk alias).
_begin.ptr->size = _size;
_current = nullptr;
_size = 0;
// std::exchange clears _begin.ptr so our destructor won't free the
// chain we just transferred.
return managed_bytes(std::exchange(_begin.ptr, {}));
} else {
// No data written: return an empty (inline small-blob) managed_bytes.
return managed_bytes();
}
}
@@ -456,15 +467,17 @@ public:
// the clear() calls then writes will not involve any memory allocations,
// except for the first write made on this instance.
future<> clear_gently() noexcept {
if (!_begin) {
if (!_begin.ptr) {
return make_ready_future<>();
}
_begin->offset = 0;
_begin->frag_size = 0;
_current = _begin.ptr;
_size = 0;
return do_until([this] { return !_begin->next; }, [this] {
// move next->next first to avoid it being recursively destroyed
// in ~chunk when _begin->next is move-assigned.
auto next = std::move(_begin->next->next);
return do_until([this] { return !_begin.ptr->next; }, [this] {
auto second_chunk = _begin.ptr->next;
auto next = second_chunk->next;
second_chunk->~chunk();
::free(second_chunk);
_begin->next = std::move(next);
return make_ready_future<>();
});

View File

@@ -325,3 +325,12 @@ BOOST_AUTO_TEST_CASE(test_remove_suffix) {
test(std::max(a, b), std::min(a, b));
}
}
// Round-trip check for bytes_ostream::to_managed_bytes(): build a known
// byte sequence, convert it to managed_bytes, copy it back into a fresh
// bytes_ostream, and verify the sequence survived intact.
BOOST_AUTO_TEST_CASE(test_conversion_to_managed_bytes) {
bytes_ostream buf;
// 1024 bytes spans multiple chunks (default chunk size is 512), so the
// conversion path that adopts a multi-fragment chain is exercised.
append_sequence(buf, 1024);
auto mb = std::move(buf).to_managed_bytes();
bytes_ostream buf2;
buf2.write(to_bytes(mb));
assert_sequence(buf2, 1024);
}

View File

@@ -18,6 +18,8 @@
#include <unordered_map>
#include <type_traits>
class bytes_ostream;
template <mutable_view is_mutable_view>
class managed_bytes_basic_view;
using managed_bytes_view = managed_bytes_basic_view<mutable_view::no>;
@@ -25,7 +27,7 @@ using managed_bytes_mutable_view = managed_bytes_basic_view<mutable_view::yes>;
struct blob_storage {
struct [[gnu::packed]] ref_type {
blob_storage* ptr;
blob_storage* ptr = nullptr;
ref_type() {}
ref_type(blob_storage* ptr) : ptr(ptr) {}
@@ -72,6 +74,7 @@ struct blob_storage {
// A managed version of "bytes" (can be used with LSA).
class managed_bytes {
friend class bytes_ostream;
static constexpr size_t max_inline_size = 15;
struct small_blob {
bytes_view::value_type data[max_inline_size];
@@ -112,6 +115,11 @@ private:
}
std::unique_ptr<bytes_view::value_type[]> do_linearize_pure() const;
// Private constructor used by bytes_ostream::to_managed_bytes(): adopts an
// externally built blob_storage chain without copying.
explicit managed_bytes(blob_storage* data) {
// small.size == -1 marks the union as holding an external blob_storage
// pointer rather than the inline small-blob representation.
_u.small.size = -1;
_u.ptr.ptr = data;
// Re-point the chunk's backref at its new owner so LSA-style relocation
// bookkeeping stays consistent.
data->backref = &_u.ptr;
}
public:
using size_type = blob_storage::size_type;
struct initialized_later {};