commitlog: use memory_output_stream

memory_output_stream deals with all required pointer arithmetic and
allows easy transition to fragmented buffers.
Author: Paweł Dziepak
Date:   2018-07-31 13:35:25 +01:00
parent b9ab058834
commit fe48aaae46

6 changed files with 66 additions and 59 deletions
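The change below replaces manual cursor bookkeeping (the _buf_pos field plus hand-computed pointers) with seastar's simple_memory_output_stream (from core/simple-stream.hh), which tracks the write position itself. A minimal sketch of the two styles, with hypothetical `buffer`, `buf_pos` and `value` (illustrative only, not code from this diff):

    // before: the caller does the pointer arithmetic and advances the offset
    auto v = net::hton(value);
    std::copy_n(reinterpret_cast<const char*>(&v), sizeof(v), buffer.get_write() + buf_pos);
    buf_pos += sizeof(v);

    // after: the stream owns the cursor and tracks remaining capacity
    seastar::simple_memory_output_stream out(buffer.get_write(), buffer.size());
    out.write(reinterpret_cast<const char*>(&v), sizeof(v));

Since seastar provides a fragmented counterpart to this stream interface, the switch also prepares the ground for the "transition to fragmented buffers" mentioned above.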


@@ -107,6 +107,11 @@ public:
     void process_bytes(const char* data, size_t size) {
         return _c.process(reinterpret_cast<const uint8_t*>(data), size);
     }
+    template<typename FragmentedBuffer>
+    GCC6_CONCEPT(requires FragmentRange<FragmentedBuffer>)
+    void process_fragmented(const FragmentedBuffer& buffer) {
+        return _c.process_fragmented(buffer);
+    }
 };
 class db::cf_holder {
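This first hunk gives the crc32_nbo wrapper an overload that consumes a FragmentRange, so a checksum can be computed over non-contiguous memory without first copying it into one buffer. It is exercised further down in allocate(), where the entry bytes are fed to the CRC as a fragmented view:

    crc32_nbo crc;
    crc.process_fragmented(ser::buffer_view<std::vector<temporary_buffer<char>>::iterator>(entry_data));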
@@ -344,6 +349,12 @@ private:
     uint64_t _new_counter = 0;
 };
+template<typename T>
+static void write(seastar::simple_memory_output_stream& out, T value) {
+    auto v = net::hton(value);
+    out.write(reinterpret_cast<const char*>(&v), sizeof(v));
+}
 /*
  * A single commit log file on disk. Manages creation of the file and writing mutations to disk,
  * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
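The free write() helper replaces data_output's typed writes: net::hton converts the value to network (big-endian) byte order before the raw bytes go into the stream, so on-disk integers stay independent of host endianness. For example (hypothetical local buffer, sketch only):

    char buf[4];
    seastar::simple_memory_output_stream out(buf, sizeof(buf));
    write(out, 0xAABBCCDDu);    // buf now holds the bytes aa bb cc dd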
@@ -398,7 +409,6 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public cf_holder {
     uint64_t _file_pos = 0;
     uint64_t _flush_pos = 0;
-    uint64_t _buf_pos = 0;
     bool _closed = false;
     using buffer_type = segment_manager::buffer_type;
@@ -407,6 +417,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public cf_holder {
     using time_point = segment_manager::time_point;
     buffer_type _buffer;
+    simple_memory_output_stream _buffer_ostream;
     std::unordered_map<cf_id_type, uint64_t> _cf_dirty;
     time_point _sync_time;
     seastar::gate _gate;
@@ -420,6 +431,10 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public cf_holder {
     friend std::ostream& operator<<(std::ostream&, const segment&);
     friend class segment_manager;
+    size_t buffer_position() const {
+        return _buffer.size() - _buffer_ostream.size();
+    }
     future<> begin_flush() {
         // This is maintaining the semantics of only using the write-lock
         // as a gate for flushing, i.e. once we've begun a flush for position X
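buffer_position() is what makes dropping _buf_pos possible: the ostream is created over the whole buffer, and its size() reports the remaining capacity, so the bytes written so far are simply capacity minus remainder. A sketch of the invariant (sizes illustrative):

    // suppose _buffer.size() == 4096 and _buffer_ostream was just created over it,
    // so _buffer_ostream.size() == 4096 and buffer_position() == 0
    auto out = _buffer_ostream.write_substream(16);   // consumes 16 bytes of capacity
    // now _buffer_ostream.size() == 4080 and buffer_position() == 16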
@@ -621,15 +636,15 @@ public:
                 throw;
             }
         }
-        _buf_pos = overhead;
-        auto * p = reinterpret_cast<uint32_t *>(_buffer.get_write());
-        std::fill(p, p + overhead, 0);
+        _buffer_ostream = seastar::simple_memory_output_stream(_buffer.get_write(), _buffer.size());
+        auto out = _buffer_ostream.write_substream(overhead);
+        out.fill('\0', overhead);
         _segment_manager->totals.total_size += k;
     }
     bool buffer_is_empty() const {
-        return _buf_pos <= segment_overhead_size
-            || (_file_pos == 0 && _buf_pos <= (segment_overhead_size + descriptor_header_size));
+        return buffer_position() <= segment_overhead_size
+            || (_file_pos == 0 && buffer_position() <= (segment_overhead_size + descriptor_header_size));
     }
     /**
      * Send any buffer contents to disk and get a new tmp buffer
@@ -647,29 +662,26 @@ public:
         auto num = _num_allocs;
         _file_pos = top;
-        _buf_pos = 0;
+        _buffer_ostream = { };
         _num_allocs = 0;
         auto me = shared_from_this();
         assert(me.use_count() > 1);
-        auto * p = buf.get_write();
-        assert(std::count(p, p + 2 * sizeof(uint32_t), 0) == 2 * sizeof(uint32_t));
-        data_output out(p, p + buf.size());
+        auto out = seastar::simple_memory_output_stream(buf.get_write(), buf.size());
         auto header_size = 0;
         if (off == 0) {
             // first block. write file header.
-            out.write(segment_magic);
-            out.write(_desc.ver);
-            out.write(_desc.id);
+            write(out, segment_magic);
+            write(out, _desc.ver);
+            write(out, _desc.id);
             crc32_nbo crc;
             crc.process(_desc.ver);
             crc.process<int32_t>(_desc.id & 0xffffffff);
             crc.process<int32_t>(_desc.id >> 32);
-            out.write(crc.checksum());
+            write(out, crc.checksum());
             header_size = descriptor_header_size;
         }
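For orientation, the file header written in this branch lays out as below; the field widths are inferred from the checksum computation above, which folds the 64-bit id in as two 32-bit halves, and every field is big-endian via write()/net::hton (a reading of this hunk, not a quote from the source):

    // [ segment_magic : u32 ][ ver : u32 ][ id : u64 ][ crc32(ver, id) : u32 ]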
@@ -679,8 +691,8 @@ public:
         crc.process<int32_t>(_desc.id >> 32);
         crc.process(uint32_t(off + header_size));
-        out.write(uint32_t(_file_pos));
-        out.write(crc.checksum());
+        write(out, uint32_t(_file_pos));
+        write(out, crc.checksum());
         forget_schema_versions();
@@ -690,7 +702,7 @@ public:
         // The write will be allowed to start now, but flush (below) must wait for not only this,
         // but all previous write/flush pairs.
-        return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable {
+        return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable { ///////////////////////////////////////////////////
             auto written = make_lw_shared<size_t>(0);
             auto p = buf.get();
             return repeat([this, size, off, written, p]() mutable {
@@ -786,7 +798,7 @@ public:
             return finish_and_get_new(timeout).then([id, writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable {
                 return new_seg->allocate(id, std::move(writer), std::move(permit), timeout);
             });
-        } else if (!_buffer.empty() && (s > (_buffer.size() - _buf_pos))) { // enough data?
+        } else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data?
             if (_segment_manager->cfg.mode == sync_mode::BATCH) {
                 // TODO: this could cause starvation if we're really unlucky.
                 // If we run batch mode and find ourselves not fit in a non-empty
@@ -805,7 +817,7 @@ public:
         size_t buf_memory = s;
         if (_buffer.empty()) {
             new_buffer(s);
-            buf_memory += _buf_pos;
+            buf_memory += buffer_position();
         }
         _gate.enter(); // this might throw. I guess we accept this?
@@ -813,29 +825,24 @@ public:
         _segment_manager->account_memory_usage(buf_memory);
         replay_position rp(_desc.id, position());
-        auto pos = _buf_pos;
-        _buf_pos += s;
         _cf_dirty[id]++; // increase use count for cf.
         rp_handle h(static_pointer_cast<cf_holder>(shared_from_this()), std::move(id), rp);
-        auto * p = _buffer.get_write() + pos;
-        auto * e = _buffer.get_write() + pos + s - sizeof(uint32_t);
-        data_output out(p, e);
+        auto out = _buffer_ostream.write_substream(s);
         crc32_nbo crc;
-        out.write(uint32_t(s));
+        write<uint32_t>(out, s);
         crc.process(uint32_t(s));
-        out.write(crc.checksum());
+        write<uint32_t>(out, crc.checksum());
         // actual data
-        writer->write(*this, out);
-        crc.process_bytes(p + 2 * sizeof(uint32_t), size);
-        out = data_output(e, sizeof(uint32_t));
-        out.write(crc.checksum());
+        auto entry_out = out.write_substream(size);
+        auto entry_data = entry_out.to_input_stream();
+        writer->write(*this, entry_out);
+        crc.process_fragmented(ser::buffer_view<std::vector<temporary_buffer<char>>::iterator>(entry_data));
+        write<uint32_t>(out, crc.checksum());
         ++_segment_manager->totals.allocation_count;
         ++_num_allocs;
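This is the heart of the change: allocate() now carves a substream for the whole entry, then a nested substream that windows just the payload, and turns that window into an input stream so the CRC can read back exactly the bytes the writer produced, with no raw pointer arithmetic and no extra copy. Condensed, with annotations mine (s is the payload size plus the three u32 framing words, per the old pointer math above):

    auto out = _buffer_ostream.write_substream(s);    // reserve the whole entry
    write<uint32_t>(out, s);                          // entry length
    write<uint32_t>(out, crc.checksum());             // checksum of the length field
    auto entry_out = out.write_substream(size);       // window over the payload bytes
    auto entry_data = entry_out.to_input_stream();    // read-back view of the same bytes
    writer->write(*this, entry_out);                  // serialize the mutation into the window
    crc.process_fragmented(ser::buffer_view<std::vector<temporary_buffer<char>>::iterator>(entry_data));
    write<uint32_t>(out, crc.checksum());             // trailing checksum

Note that the view is taken before the writer runs; it aliases the buffer memory rather than copying it, so the checksum sees the bytes the writer just wrote.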
@@ -850,7 +857,7 @@ public:
             // If this buffer alone is too big, potentially bigger than the maximum allowed size,
             // then no other request will be allowed in to force the cycle()ing of this buffer. We
             // have to do it ourselves.
-            if ((_buf_pos >= (db::commitlog::segment::default_size))) {
+            if ((buffer_position() >= (db::commitlog::segment::default_size))) {
                 cycle().discard_result().handle_exception([] (auto ex) {
                     clogger.error("Failed to flush commits to disk: {}", ex);
                 });
@@ -860,7 +867,7 @@ public:
     }
     position_type position() const {
-        return position_type(_file_pos + _buf_pos);
+        return position_type(_file_pos + buffer_position());
     }
     size_t size_on_disk() const {
@@ -870,11 +877,12 @@ public:
     // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
     // a.k.a. zero the tail.
     size_t clear_buffer_slack() {
-        auto size = align_up(_buf_pos, alignment);
-        std::fill(_buffer.get_write() + _buf_pos, _buffer.get_write() + size,
-                0);
-        _segment_manager->totals.bytes_slack += (size - _buf_pos);
-        _segment_manager->account_memory_usage(size - _buf_pos);
+        auto buf_pos = buffer_position();
+        auto size = align_up(buf_pos, alignment);
+        auto fill_size = size - buf_pos;
+        _buffer_ostream.fill('\0', fill_size);
+        _segment_manager->totals.bytes_slack += fill_size;
+        _segment_manager->account_memory_usage(fill_size);
         return size;
     }
     void mark_clean(const cf_id_type& id, uint64_t count) {
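As a worked example of the slack computation, assuming a 4096-byte alignment (an illustrative value): with buffer_position() == 5000, align_up(5000, 4096) == 8192, so fill_size == 3192, and those 3192 tail bytes are zeroed through _buffer_ostream.fill('\0', 3192) before the buffer goes to disk, exactly the region std::fill used to blank by pointer.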