From fe48aaae4626c2fa1282d0ca0d9d03dbe866aafe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 31 Jul 2018 13:35:25 +0100 Subject: [PATCH] commitlog: use memory_output_stream memory_output_stream deals with all required pointer arithmetic and allows easy transition to fragmented buffers. --- db/commitlog/commitlog.cc | 88 ++++++++++++++++++--------------- db/commitlog/commitlog.hh | 2 +- db/commitlog/commitlog_entry.cc | 5 +- db/commitlog/commitlog_entry.hh | 2 +- tests/commitlog_test.cc | 26 +++++----- tests/hint_test.cc | 2 +- 6 files changed, 66 insertions(+), 59 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index ad7346652b..1ce2b6b29d 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -107,6 +107,11 @@ public: void process_bytes(const char* data, size_t size) { return _c.process(reinterpret_cast(data), size); } + template + GCC6_CONCEPT(requires FragmentRange) + void process_fragmented(const FragmentedBuffer& buffer) { + return _c.process_fragmented(buffer); + } }; class db::cf_holder { @@ -344,6 +349,12 @@ private: uint64_t _new_counter = 0; }; +template +static void write(seastar::simple_memory_output_stream& out, T value) { + auto v = net::hton(value); + out.write(reinterpret_cast(&v), sizeof(v)); +} + /* * A single commit log file on disk. Manages creation of the file and writing mutations to disk, * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment @@ -398,7 +409,6 @@ class db::commitlog::segment : public enable_shared_from_this, public c uint64_t _file_pos = 0; uint64_t _flush_pos = 0; - uint64_t _buf_pos = 0; bool _closed = false; using buffer_type = segment_manager::buffer_type; @@ -407,6 +417,7 @@ class db::commitlog::segment : public enable_shared_from_this, public c using time_point = segment_manager::time_point; buffer_type _buffer; + simple_memory_output_stream _buffer_ostream; std::unordered_map _cf_dirty; time_point _sync_time; seastar::gate _gate; @@ -420,6 +431,10 @@ class db::commitlog::segment : public enable_shared_from_this, public c friend std::ostream& operator<<(std::ostream&, const segment&); friend class segment_manager; + size_t buffer_position() const { + return _buffer.size() - _buffer_ostream.size(); + } + future<> begin_flush() { // This is maintaining the semantica of only using the write-lock // as a gate for flushing, i.e. once we've begun a flush for position X @@ -621,15 +636,15 @@ public: throw; } } - _buf_pos = overhead; - auto * p = reinterpret_cast(_buffer.get_write()); - std::fill(p, p + overhead, 0); + _buffer_ostream = seastar::simple_memory_output_stream(_buffer.get_write(), _buffer.size()); + auto out = _buffer_ostream.write_substream(overhead); + out.fill('\0', overhead); _segment_manager->totals.total_size += k; } bool buffer_is_empty() const { - return _buf_pos <= segment_overhead_size - || (_file_pos == 0 && _buf_pos <= (segment_overhead_size + descriptor_header_size)); + return buffer_position() <= segment_overhead_size + || (_file_pos == 0 && buffer_position() <= (segment_overhead_size + descriptor_header_size)); } /** * Send any buffer contents to disk and get a new tmp buffer @@ -647,29 +662,26 @@ public: auto num = _num_allocs; _file_pos = top; - _buf_pos = 0; + _buffer_ostream = { }; _num_allocs = 0; auto me = shared_from_this(); assert(me.use_count() > 1); - auto * p = buf.get_write(); - assert(std::count(p, p + 2 * sizeof(uint32_t), 0) == 2 * sizeof(uint32_t)); - - data_output out(p, p + buf.size()); + auto out = seastar::simple_memory_output_stream(buf.get_write(), buf.size()); auto header_size = 0; if (off == 0) { // first block. write file header. - out.write(segment_magic); - out.write(_desc.ver); - out.write(_desc.id); + write(out, segment_magic); + write(out, _desc.ver); + write(out, _desc.id); crc32_nbo crc; crc.process(_desc.ver); crc.process(_desc.id & 0xffffffff); crc.process(_desc.id >> 32); - out.write(crc.checksum()); + write(out, crc.checksum()); header_size = descriptor_header_size; } @@ -679,8 +691,8 @@ public: crc.process(_desc.id >> 32); crc.process(uint32_t(off + header_size)); - out.write(uint32_t(_file_pos)); - out.write(crc.checksum()); + write(out, uint32_t(_file_pos)); + write(out, crc.checksum()); forget_schema_versions(); @@ -690,7 +702,7 @@ public: // The write will be allowed to start now, but flush (below) must wait for not only this, // but all previous write/flush pairs. - return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable { + return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable { /////////////////////////////////////////////////// auto written = make_lw_shared(0); auto p = buf.get(); return repeat([this, size, off, written, p]() mutable { @@ -786,7 +798,7 @@ public: return finish_and_get_new(timeout).then([id, writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable { return new_seg->allocate(id, std::move(writer), std::move(permit), timeout); }); - } else if (!_buffer.empty() && (s > (_buffer.size() - _buf_pos))) { // enough data? + } else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data? if (_segment_manager->cfg.mode == sync_mode::BATCH) { // TODO: this could cause starvation if we're really unlucky. // If we run batch mode and find ourselves not fit in a non-empty @@ -805,7 +817,7 @@ public: size_t buf_memory = s; if (_buffer.empty()) { new_buffer(s); - buf_memory += _buf_pos; + buf_memory += buffer_position(); } _gate.enter(); // this might throw. I guess we accept this? @@ -813,29 +825,24 @@ public: _segment_manager->account_memory_usage(buf_memory); replay_position rp(_desc.id, position()); - auto pos = _buf_pos; - _buf_pos += s; _cf_dirty[id]++; // increase use count for cf. rp_handle h(static_pointer_cast(shared_from_this()), std::move(id), rp); - auto * p = _buffer.get_write() + pos; - auto * e = _buffer.get_write() + pos + s - sizeof(uint32_t); - - data_output out(p, e); + auto out = _buffer_ostream.write_substream(s); crc32_nbo crc; - out.write(uint32_t(s)); + write(out, s); crc.process(uint32_t(s)); - out.write(crc.checksum()); + write(out, crc.checksum()); // actual data - writer->write(*this, out); + auto entry_out = out.write_substream(size); + auto entry_data = entry_out.to_input_stream(); + writer->write(*this, entry_out); + crc.process_fragmented(ser::buffer_view>::iterator>(entry_data)); - crc.process_bytes(p + 2 * sizeof(uint32_t), size); - - out = data_output(e, sizeof(uint32_t)); - out.write(crc.checksum()); + write(out, crc.checksum()); ++_segment_manager->totals.allocation_count; ++_num_allocs; @@ -850,7 +857,7 @@ public: // If this buffer alone is too big, potentially bigger than the maximum allowed size, // then no other request will be allowed in to force the cycle()ing of this buffer. We // have to do it ourselves. - if ((_buf_pos >= (db::commitlog::segment::default_size))) { + if ((buffer_position() >= (db::commitlog::segment::default_size))) { cycle().discard_result().handle_exception([] (auto ex) { clogger.error("Failed to flush commits to disk: {}", ex); }); @@ -860,7 +867,7 @@ public: } position_type position() const { - return position_type(_file_pos + _buf_pos); + return position_type(_file_pos + buffer_position()); } size_t size_on_disk() const { @@ -870,11 +877,12 @@ public: // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded // a.k.a. zero the tail. size_t clear_buffer_slack() { - auto size = align_up(_buf_pos, alignment); - std::fill(_buffer.get_write() + _buf_pos, _buffer.get_write() + size, - 0); - _segment_manager->totals.bytes_slack += (size - _buf_pos); - _segment_manager->account_memory_usage(size - _buf_pos); + auto buf_pos = buffer_position(); + auto size = align_up(buf_pos, alignment); + auto fill_size = size - buf_pos; + _buffer_ostream.fill('\0', fill_size); + _segment_manager->totals.bytes_slack += fill_size; + _segment_manager->account_memory_usage(fill_size); return size; } void mark_clean(const cf_id_type& id, uint64_t count) { diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index cac58b7092..aac06ef73d 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -176,7 +176,7 @@ public: * of data to be written. (See add). * Don't write less, absolutely don't write more... */ - using output = data_output; + using output = typename seastar::memory_output_stream>::const_iterator>::simple; using serializer_func = std::function; /** diff --git a/db/commitlog/commitlog_entry.cc b/db/commitlog/commitlog_entry.cc index a7423709f2..ddd6f0a678 100644 --- a/db/commitlog/commitlog_entry.cc +++ b/db/commitlog/commitlog_entry.cc @@ -51,9 +51,8 @@ void commitlog_entry_writer::compute_size() { _size = ms.size(); } -void commitlog_entry_writer::write(data_output& out) const { - seastar::simple_output_stream str(out.reserve(size()), size()); - serialize(str); +void commitlog_entry_writer::write(typename seastar::memory_output_stream>::const_iterator>::simple& out) const { + serialize(out); } commitlog_entry_reader::commitlog_entry_reader(const temporary_buffer& buffer) diff --git a/db/commitlog/commitlog_entry.hh b/db/commitlog/commitlog_entry.hh index fc23f25805..71af077409 100644 --- a/db/commitlog/commitlog_entry.hh +++ b/db/commitlog/commitlog_entry.hh @@ -72,7 +72,7 @@ public: return _mutation.representation().size(); } - void write(data_output& out) const; + void write(typename seastar::memory_output_stream>::const_iterator>::simple& out) const; }; class commitlog_entry_reader { diff --git a/tests/commitlog_test.cc b/tests/commitlog_test.cc index 7ba3fd2bd5..4d2b3e9cea 100644 --- a/tests/commitlog_test.cc +++ b/tests/commitlog_test.cc @@ -78,7 +78,7 @@ SEASTAR_TEST_CASE(test_create_commitlog){ return cl_test([](commitlog& log) { sstring tmp = "hej bubba cow"; return log.add_mutation(utils::UUID_gen::get_time_UUID(), tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([](db::replay_position rp) { BOOST_CHECK_NE(rp, db::replay_position()); }); @@ -92,7 +92,7 @@ SEASTAR_TEST_CASE(test_commitlog_written_to_disk_batch){ return cl_test(cfg, [](commitlog& log) { sstring tmp = "hej bubba cow"; return log.add_mutation(utils::UUID_gen::get_time_UUID(), tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&log](replay_position rp) { BOOST_CHECK_NE(rp, db::replay_position()); auto n = log.get_flush_count(); @@ -109,7 +109,7 @@ SEASTAR_TEST_CASE(test_commitlog_written_to_disk_periodic){ [&log, state, uuid]() { sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&log, state](replay_position rp) { BOOST_CHECK_NE(rp, db::replay_position()); auto n = log.get_flush_count(); @@ -129,7 +129,7 @@ SEASTAR_TEST_CASE(test_commitlog_new_segment){ return do_until([&set]() { return set.size() > 1; }, [&log, &set, uuid]() { sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&set](rp_handle h) { BOOST_CHECK_NE(h.rp(), db::replay_position()); set.put(std::move(h)); @@ -187,7 +187,7 @@ SEASTAR_TEST_CASE(test_commitlog_discard_completed_segments){ sstring tmp = "hej bubba cow"; auto uuid = state->next_uuid(); return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([state, uuid](db::rp_handle h) { state->rps[uuid].put(std::move(h)); }); @@ -223,7 +223,7 @@ SEASTAR_TEST_CASE(test_equal_record_limit){ return cl_test([](commitlog& log) { auto size = log.max_record_size(); return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, [size](db::commitlog::output& dst) { - dst.write(char(1), size); + dst.fill(char(1), size); }).then([](db::replay_position rp) { BOOST_CHECK_NE(rp, db::replay_position()); }); @@ -234,7 +234,7 @@ SEASTAR_TEST_CASE(test_exceed_record_limit){ return cl_test([](commitlog& log) { auto size = log.max_record_size() + 1; return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, [size](db::commitlog::output& dst) { - dst.write(char(1), size); + dst.fill(char(1), size); }).then_wrapped([](future f) { try { f.get(); @@ -269,7 +269,7 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) { [&log, set, uuid]() { sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([set](rp_handle h) { BOOST_CHECK_NE(h.rp(), db::replay_position()); set->insert(h.release().id); @@ -314,7 +314,7 @@ SEASTAR_TEST_CASE(test_commitlog_reader){ [&log, uuid, count, set]() { sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&log, set, count](auto h) { BOOST_CHECK_NE(db::replay_position(), h.rp()); set->put(std::move(h)); @@ -380,7 +380,7 @@ SEASTAR_TEST_CASE(test_commitlog_entry_corruption){ auto uuid = utils::UUID_gen::get_time_UUID(); sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&log, rps](rp_handle h) { BOOST_CHECK_NE(h.rp(), db::replay_position()); rps->push_back(h.release()); @@ -423,7 +423,7 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){ auto uuid = utils::UUID_gen::get_time_UUID(); sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&log, rps](rp_handle h) { BOOST_CHECK_NE(h.rp(), db::replay_position()); rps->push_back(h.release()); @@ -466,7 +466,7 @@ SEASTAR_TEST_CASE(test_commitlog_reader_produce_exception){ auto uuid = utils::UUID_gen::get_time_UUID(); sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&log, rps](rp_handle h) { BOOST_CHECK_NE(h.rp(), db::replay_position()); rps->push_back(h.release()); @@ -529,7 +529,7 @@ SEASTAR_TEST_CASE(test_allocation_failure){ } catch (std::bad_alloc&) { } return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, [size](db::commitlog::output& dst) { - dst.write(char(1), size); + dst.fill(char(1), size); }).then_wrapped([junk](future f) { try { f.get(); diff --git a/tests/hint_test.cc b/tests/hint_test.cc index a6bc1a5bb7..3183ed806d 100644 --- a/tests/hint_test.cc +++ b/tests/hint_test.cc @@ -73,7 +73,7 @@ SEASTAR_TEST_CASE(test_commitlog_new_segment_custom_prefix){ return do_until([&set]() { return set.size() > 1; }, [&log, &set, uuid]() { sstring tmp = "hej bubba cow"; return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) { - dst.write(tmp.begin(), tmp.end()); + dst.write(tmp.data(), tmp.size()); }).then([&set](rp_handle h) { BOOST_CHECK_NE(h.rp(), db::replay_position()); set.put(std::move(h));