commitlog: introduce entry_writer

Current commitlog interface requires writers to specify the size of a
new entry which cannot depend on the segment to which the entry is
written.
If column mappings are going to be stored in the commitlog that's not
enough since we don't know whether column mapping needs to be written
until we known in which segment the entry is going to be stored.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
Paweł Dziepak
2016-01-13 09:56:16 +01:00
parent 678bdd5c79
commit 9d74268234
2 changed files with 25 additions and 7 deletions

View File

@@ -587,8 +587,8 @@ public:
/**
* Add a "mutation" to the segment.
*/
future<replay_position> allocate(const cf_id_type& id, size_t size,
serializer_func func) {
future<replay_position> allocate(const cf_id_type& id, shared_ptr<entry_writer> writer) {
const auto size = writer->size(*this);
const auto s = size + entry_overhead_size; // total size
if (s > _segment_manager->max_mutation_size) {
return make_exception_future<replay_position>(
@@ -602,8 +602,8 @@ public:
if (position() + s > _segment_manager->max_size) {
// do this in next segment instead.
return finish_and_get_new().then(
[id, size, func = std::move(func)](auto new_seg) {
return new_seg->allocate(id, size, func);
[id, writer = std::move(writer)] (auto new_seg) mutable {
return new_seg->allocate(id, std::move(writer));
});
}
// enough data?
@@ -634,7 +634,7 @@ public:
out.write(crc.checksum());
// actual data
func(out);
writer->write(*this, out);
crc.process_bytes(p + 2 * sizeof(uint32_t), size);
@@ -1128,8 +1128,21 @@ void db::commitlog::segment_manager::release_buffer(buffer_type&& b) {
*/
future<db::replay_position> db::commitlog::add(const cf_id_type& id,
size_t size, serializer_func func) {
return _segment_manager->active_segment().then([=](auto s) {
return s->allocate(id, size, std::move(func));
class serializer_func_entry_writer final : public entry_writer {
serializer_func _func;
size_t _size;
public:
serializer_func_entry_writer(size_t sz, serializer_func func)
: _func(std::move(func)), _size(sz)
{ }
virtual size_t size(segment&) override { return _size; }
virtual void write(segment&, output& out) override {
_func(out);
}
};
auto writer = ::make_shared<serializer_func_entry_writer>(size, std::move(func));
return _segment_manager->active_segment().then([id, writer] (auto s) {
return s->allocate(id, writer);
});
}