bytes_ostream: base on managed_bytes
bytes_ostream is an incremental builder for a discontiguous byte container. managed_bytes is a non-incremental (size must be known up front) byte container, that is also compatible with LSA. So far, conversion between them involves copying. This is unfortunate, since query_result is generated as a bytes_ostream, but is later converted to managed_bytes (today, this is done in cql3::expr::get_non_pk_values() and compound_view_wrapper::explode()). If the two types could be made compatible, we could use managed_bytes_view instead of creating new objects and avoid a copy. It's also nicer to have one less vocabulary type. This patch makes bytes_ostream use managed_bytes' internal representation (blob_storage instead of bytes_ostream::chunk) and provides a conversion to managed_bytes. All bytes_ostream users are left in place, but the goal is to make bytes_ostream a write-only type whose only observer is a conversion to managed_bytes. It turns out to be relatively simple: the internal representations were already similar. I made blob_storage::ref_type self-initializing to reduce churn (good practice anyway) and added a private constructor to managed_bytes for the conversion. Note that bytes_ostream can only be used to construct a non-LSA managed_bytes, but LSA uses of managed_bytes are very strictly controlled (the entry points are memtable and cache), so that's not a problem. A unit test is added. Closes #10986
This commit is contained in:
143
bytes_ostream.hh
143
bytes_ostream.hh
@@ -11,6 +11,7 @@
|
||||
#include <boost/range/iterator_range.hpp>
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "utils/managed_bytes.hh"
|
||||
#include "hashing.hh"
|
||||
#include <seastar/core/simple-stream.hh>
|
||||
#include <seastar/core/loop.hh>
|
||||
@@ -31,26 +32,15 @@ public:
|
||||
static constexpr size_type max_chunk_size() { return max_alloc_size() - sizeof(chunk); }
|
||||
private:
|
||||
static_assert(sizeof(value_type) == 1, "value_type is assumed to be one byte long");
|
||||
struct chunk {
|
||||
// FIXME: group fragment pointers to reduce pointer chasing when packetizing
|
||||
std::unique_ptr<chunk> next;
|
||||
~chunk() {
|
||||
auto p = std::move(next);
|
||||
while (p) {
|
||||
// Avoid recursion when freeing chunks
|
||||
auto p_next = std::move(p->next);
|
||||
p = std::move(p_next);
|
||||
}
|
||||
}
|
||||
size_type offset; // Also means "size" after chunk is closed
|
||||
size_type size;
|
||||
value_type data[0];
|
||||
void operator delete(void* ptr) { free(ptr); }
|
||||
};
|
||||
// Note: while appending data, chunk::size refers to the allocated space in the chunk,
|
||||
// and chunk::frag_size refers to the currently occupied space in the chunk.
|
||||
// After building, the first chunk::size is the whole object size, and chunk::frag_size
|
||||
// doesn't change. This fits with managed_bytes interpretation.
|
||||
using chunk = blob_storage;
|
||||
static constexpr size_type default_chunk_size{512};
|
||||
static constexpr size_type max_alloc_size() { return 128 * 1024; }
|
||||
private:
|
||||
std::unique_ptr<chunk> _begin;
|
||||
blob_storage::ref_type _begin;
|
||||
chunk* _current;
|
||||
size_type _size;
|
||||
size_type _initial_chunk_size = default_chunk_size;
|
||||
@@ -70,13 +60,13 @@ public:
|
||||
fragment_iterator(const fragment_iterator&) = default;
|
||||
fragment_iterator& operator=(const fragment_iterator&) = default;
|
||||
bytes_view operator*() const {
|
||||
return { _current->data, _current->offset };
|
||||
return { _current->data, _current->frag_size };
|
||||
}
|
||||
bytes_view operator->() const {
|
||||
return *(*this);
|
||||
}
|
||||
fragment_iterator& operator++() {
|
||||
_current = _current->next.get();
|
||||
_current = _current->next;
|
||||
return *this;
|
||||
}
|
||||
fragment_iterator operator++(int) {
|
||||
@@ -119,7 +109,7 @@ private:
|
||||
if (!_current) {
|
||||
return 0;
|
||||
}
|
||||
return _current->size - _current->offset;
|
||||
return _current->size - _current->frag_size;
|
||||
}
|
||||
// Figure out next chunk size.
|
||||
// - must be enough for data_size + sizeof(chunk)
|
||||
@@ -139,8 +129,8 @@ private:
|
||||
[[gnu::always_inline]]
|
||||
value_type* alloc(size_type size) {
|
||||
if (__builtin_expect(size <= current_space_left(), true)) {
|
||||
auto ret = _current->data + _current->offset;
|
||||
_current->offset += size;
|
||||
auto ret = _current->data + _current->frag_size;
|
||||
_current->frag_size += size;
|
||||
_size += size;
|
||||
return ret;
|
||||
} else {
|
||||
@@ -154,19 +144,21 @@ private:
|
||||
if (!space) {
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
auto new_chunk = std::unique_ptr<chunk>(new (space) chunk());
|
||||
new_chunk->offset = size;
|
||||
new_chunk->size = alloc_size - sizeof(chunk);
|
||||
if (_current) {
|
||||
_current->next = std::move(new_chunk);
|
||||
_current = _current->next.get();
|
||||
} else {
|
||||
_begin = std::move(new_chunk);
|
||||
_current = _begin.get();
|
||||
}
|
||||
auto backref = _current ? &_current->next : &_begin;
|
||||
auto new_chunk = new (space) chunk(backref, alloc_size - sizeof(chunk), size);
|
||||
_current = new_chunk;
|
||||
_size += size;
|
||||
return _current->data;
|
||||
}
|
||||
[[gnu::noinline]]
|
||||
void free_chain(chunk* c) noexcept {
|
||||
while (c) {
|
||||
auto n = c->next;
|
||||
c->~chunk();
|
||||
::free(c);
|
||||
c = n;
|
||||
}
|
||||
}
|
||||
public:
|
||||
explicit bytes_ostream(size_t initial_chunk_size) noexcept
|
||||
: _begin()
|
||||
@@ -178,7 +170,7 @@ public:
|
||||
bytes_ostream() noexcept : bytes_ostream(default_chunk_size) {}
|
||||
|
||||
bytes_ostream(bytes_ostream&& o) noexcept
|
||||
: _begin(std::move(o._begin))
|
||||
: _begin(std::exchange(o._begin, {}))
|
||||
, _current(o._current)
|
||||
, _size(o._size)
|
||||
, _initial_chunk_size(o._initial_chunk_size)
|
||||
@@ -196,6 +188,10 @@ public:
|
||||
append(o);
|
||||
}
|
||||
|
||||
~bytes_ostream() {
|
||||
free_chain(_begin.ptr);
|
||||
}
|
||||
|
||||
bytes_ostream& operator=(const bytes_ostream& o) {
|
||||
if (this != &o) {
|
||||
auto x = bytes_ostream(o);
|
||||
@@ -243,8 +239,8 @@ public:
|
||||
|
||||
auto this_size = std::min(v.size(), size_t(current_space_left()));
|
||||
if (__builtin_expect(this_size, true)) {
|
||||
memcpy(_current->data + _current->offset, v.begin(), this_size);
|
||||
_current->offset += this_size;
|
||||
memcpy(_current->data + _current->frag_size, v.begin(), this_size);
|
||||
_current->frag_size += this_size;
|
||||
_size += this_size;
|
||||
v.remove_prefix(this_size);
|
||||
}
|
||||
@@ -287,19 +283,20 @@ public:
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
|
||||
auto new_chunk = std::unique_ptr<chunk>(new (space) chunk());
|
||||
new_chunk->offset = _size;
|
||||
new_chunk->size = _size;
|
||||
auto old_begin = _begin;
|
||||
auto new_chunk = new (space) chunk(&_begin, _size, _size);
|
||||
|
||||
auto dst = new_chunk->data;
|
||||
auto r = _begin.get();
|
||||
auto r = old_begin.ptr;
|
||||
while (r) {
|
||||
auto next = r->next.get();
|
||||
dst = std::copy_n(r->data, r->offset, dst);
|
||||
auto next = r->next;
|
||||
dst = std::copy_n(r->data, r->frag_size, dst);
|
||||
r->~chunk();
|
||||
::free(r);
|
||||
r = next;
|
||||
}
|
||||
|
||||
_current = new_chunk.get();
|
||||
_current = new_chunk;
|
||||
_begin = std::move(new_chunk);
|
||||
return bytes_view(_current->data, _size);
|
||||
}
|
||||
@@ -333,22 +330,23 @@ public:
|
||||
void remove_suffix(size_t n) {
|
||||
_size -= n;
|
||||
auto left = _size;
|
||||
auto current = _begin.get();
|
||||
auto current = _begin.ptr;
|
||||
while (current) {
|
||||
if (current->offset >= left) {
|
||||
current->offset = left;
|
||||
if (current->frag_size >= left) {
|
||||
current->frag_size = left;
|
||||
_current = current;
|
||||
current->next.reset();
|
||||
free_chain(current->next);
|
||||
current->next = nullptr;
|
||||
return;
|
||||
}
|
||||
left -= current->offset;
|
||||
current = current->next.get();
|
||||
left -= current->frag_size;
|
||||
current = current->next;
|
||||
}
|
||||
}
|
||||
|
||||
// begin() and end() form an input range to bytes_view representing fragments.
|
||||
// Any modification of this instance invalidates iterators.
|
||||
fragment_iterator begin() const { return { _begin.get() }; }
|
||||
fragment_iterator begin() const { return { _begin.ptr }; }
|
||||
fragment_iterator end() const { return { nullptr }; }
|
||||
|
||||
output_iterator write_begin() { return output_iterator(*this); }
|
||||
@@ -363,7 +361,7 @@ public:
|
||||
};
|
||||
|
||||
position pos() const {
|
||||
return { _current, _current ? _current->offset : 0 };
|
||||
return { _current, _current ? _current->frag_size : 0 };
|
||||
}
|
||||
|
||||
// Returns the amount of bytes written since given position.
|
||||
@@ -373,11 +371,11 @@ public:
|
||||
if (!c) {
|
||||
return _size;
|
||||
}
|
||||
size_type total = c->offset - pos._offset;
|
||||
c = c->next.get();
|
||||
size_type total = c->frag_size - pos._offset;
|
||||
c = c->next;
|
||||
while (c) {
|
||||
total += c->offset;
|
||||
c = c->next.get();
|
||||
total += c->frag_size;
|
||||
c = c->next;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
@@ -391,8 +389,9 @@ public:
|
||||
}
|
||||
_size -= written_since(pos);
|
||||
_current = pos._chunk;
|
||||
free_chain(_current->next);
|
||||
_current->next = nullptr;
|
||||
_current->offset = pos._offset;
|
||||
_current->frag_size = pos._offset;
|
||||
}
|
||||
|
||||
void reduce_chunk_count() {
|
||||
@@ -441,11 +440,23 @@ public:
|
||||
// the clear() calls then writes will not involve any memory allocations,
|
||||
// except for the first write made on this instance.
|
||||
void clear() {
|
||||
if (_begin) {
|
||||
_begin->offset = 0;
|
||||
if (_begin.ptr) {
|
||||
_begin.ptr->frag_size = 0;
|
||||
_size = 0;
|
||||
_current = _begin.get();
|
||||
_begin->next.reset();
|
||||
free_chain(_begin.ptr->next);
|
||||
_begin.ptr->next = nullptr;
|
||||
_current = _begin.ptr;
|
||||
}
|
||||
}
|
||||
|
||||
managed_bytes to_managed_bytes() && {
|
||||
if (_size) {
|
||||
_begin.ptr->size = _size;
|
||||
_current = nullptr;
|
||||
_size = 0;
|
||||
return managed_bytes(std::exchange(_begin.ptr, {}));
|
||||
} else {
|
||||
return managed_bytes();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -456,15 +467,17 @@ public:
|
||||
// the clear() calls then writes will not involve any memory allocations,
|
||||
// except for the first write made on this instance.
|
||||
future<> clear_gently() noexcept {
|
||||
if (!_begin) {
|
||||
if (!_begin.ptr) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
_begin->offset = 0;
|
||||
_begin->frag_size = 0;
|
||||
_current = _begin.ptr;
|
||||
_size = 0;
|
||||
return do_until([this] { return !_begin->next; }, [this] {
|
||||
// move next->next first to avoid it being recursively destroyed
|
||||
// in ~chunk when _begin->next is move-assigned.
|
||||
auto next = std::move(_begin->next->next);
|
||||
return do_until([this] { return !_begin.ptr->next; }, [this] {
|
||||
auto second_chunk = _begin.ptr->next;
|
||||
auto next = second_chunk->next;
|
||||
second_chunk->~chunk();
|
||||
::free(second_chunk);
|
||||
_begin->next = std::move(next);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
@@ -325,3 +325,12 @@ BOOST_AUTO_TEST_CASE(test_remove_suffix) {
|
||||
test(std::max(a, b), std::min(a, b));
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_conversion_to_managed_bytes) {
|
||||
bytes_ostream buf;
|
||||
append_sequence(buf, 1024);
|
||||
auto mb = std::move(buf).to_managed_bytes();
|
||||
bytes_ostream buf2;
|
||||
buf2.write(to_bytes(mb));
|
||||
assert_sequence(buf2, 1024);
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
#include <unordered_map>
|
||||
#include <type_traits>
|
||||
|
||||
class bytes_ostream;
|
||||
|
||||
template <mutable_view is_mutable_view>
|
||||
class managed_bytes_basic_view;
|
||||
using managed_bytes_view = managed_bytes_basic_view<mutable_view::no>;
|
||||
@@ -25,7 +27,7 @@ using managed_bytes_mutable_view = managed_bytes_basic_view<mutable_view::yes>;
|
||||
|
||||
struct blob_storage {
|
||||
struct [[gnu::packed]] ref_type {
|
||||
blob_storage* ptr;
|
||||
blob_storage* ptr = nullptr;
|
||||
|
||||
ref_type() {}
|
||||
ref_type(blob_storage* ptr) : ptr(ptr) {}
|
||||
@@ -72,6 +74,7 @@ struct blob_storage {
|
||||
|
||||
// A managed version of "bytes" (can be used with LSA).
|
||||
class managed_bytes {
|
||||
friend class bytes_ostream;
|
||||
static constexpr size_t max_inline_size = 15;
|
||||
struct small_blob {
|
||||
bytes_view::value_type data[max_inline_size];
|
||||
@@ -112,6 +115,11 @@ private:
|
||||
}
|
||||
std::unique_ptr<bytes_view::value_type[]> do_linearize_pure() const;
|
||||
|
||||
explicit managed_bytes(blob_storage* data) {
|
||||
_u.small.size = -1;
|
||||
_u.ptr.ptr = data;
|
||||
data->backref = &_u.ptr;
|
||||
}
|
||||
public:
|
||||
using size_type = blob_storage::size_type;
|
||||
struct initialized_later {};
|
||||
|
||||
Reference in New Issue
Block a user