From 9d742682346bd0ea4bee797d5a46421d7771df1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Wed, 13 Jan 2016 09:56:16 +0100 Subject: [PATCH] commitlog: introduce entry_writer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Current commitlog interface requires writers to specify the size of a new entry which cannot depend on the segment to which the entry is written. If column mappings are going to be stored in the commitlog that's not enough since we don't know whether column mapping needs to be written until we known in which segment the entry is going to be stored. Signed-off-by: Paweł Dziepak --- db/commitlog/commitlog.cc | 27 ++++++++++++++++++++------- db/commitlog/commitlog.hh | 5 +++++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 9da70efaf7..c30e7033e4 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -587,8 +587,8 @@ public: /** * Add a "mutation" to the segment. */ - future allocate(const cf_id_type& id, size_t size, - serializer_func func) { + future allocate(const cf_id_type& id, shared_ptr writer) { + const auto size = writer->size(*this); const auto s = size + entry_overhead_size; // total size if (s > _segment_manager->max_mutation_size) { return make_exception_future( @@ -602,8 +602,8 @@ public: if (position() + s > _segment_manager->max_size) { // do this in next segment instead. return finish_and_get_new().then( - [id, size, func = std::move(func)](auto new_seg) { - return new_seg->allocate(id, size, func); + [id, writer = std::move(writer)] (auto new_seg) mutable { + return new_seg->allocate(id, std::move(writer)); }); } // enough data? @@ -634,7 +634,7 @@ public: out.write(crc.checksum()); // actual data - func(out); + writer->write(*this, out); crc.process_bytes(p + 2 * sizeof(uint32_t), size); @@ -1128,8 +1128,21 @@ void db::commitlog::segment_manager::release_buffer(buffer_type&& b) { */ future db::commitlog::add(const cf_id_type& id, size_t size, serializer_func func) { - return _segment_manager->active_segment().then([=](auto s) { - return s->allocate(id, size, std::move(func)); + class serializer_func_entry_writer final : public entry_writer { + serializer_func _func; + size_t _size; + public: + serializer_func_entry_writer(size_t sz, serializer_func func) + : _func(std::move(func)), _size(sz) + { } + virtual size_t size(segment&) override { return _size; } + virtual void write(segment&, output& out) override { + _func(out); + } + }; + auto writer = ::make_shared(size, std::move(func)); + return _segment_manager->active_segment().then([id, writer] (auto s) { + return s->allocate(id, writer); }); } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 5bd048d6a5..b277e2f4d5 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -283,6 +283,11 @@ public: const sstring&, commit_load_reader_func, position_type = 0); private: commitlog(config); + + struct entry_writer { + virtual size_t size(segment&) = 0; + virtual void write(segment&, output&) = 0; + }; }; }