Commitlog: Make file reader provide replay_position for entries

This commit is contained in:
Calle Wilund
2015-08-18 17:46:59 +02:00
parent db6370ad87
commit fcb87471b9
3 changed files with 14 additions and 9 deletions

View File

@@ -861,16 +861,18 @@ const db::commitlog::config& db::commitlog::active_config() const {
return _segment_manager->cfg;
}
future<subscription<temporary_buffer<char>>> db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next) {
future<subscription<temporary_buffer<char>, db::replay_position>>
db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next) {
return engine().open_file_dma(filename, open_flags::ro).then([next = std::move(next)](file f) {
return read_log_file(std::move(f), std::move(next));
});
}
subscription<temporary_buffer<char>> db::commitlog::read_log_file(file f, commit_load_reader_func next) {
subscription<temporary_buffer<char>, db::replay_position>
db::commitlog::read_log_file(file f, commit_load_reader_func next) {
struct work {
file f;
stream<temporary_buffer<char>> s;
stream<temporary_buffer<char>, replay_position> s;
input_stream<char> fin;
input_stream<char> r;
uint64_t id = 0;
@@ -967,8 +969,11 @@ subscription<temporary_buffer<char>> db::commitlog::read_log_file(file f, commit
if (crc.checksum() != checksum) {
throw std::runtime_error("Checksum error in data entry");
}
replay_position rp(w->id, position_type(w->pos));
w->pos += size;
return w->s.produce(buf.share(0, data_size));
return w->s.produce(buf.share(0, data_size), rp);
});
});
});