From fcb87471b97fa29c1e92be781bfcfb64847a54dc Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 18 Aug 2015 17:46:59 +0200 Subject: [PATCH] Commitlog: Make file reader provide replay_position for entries --- db/commitlog/commitlog.cc | 13 +++++++++---- db/commitlog/commitlog.hh | 6 +++--- tests/commitlog_test.cc | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 3293725ccf..72e6ee4849 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -861,16 +861,18 @@ const db::commitlog::config& db::commitlog::active_config() const { return _segment_manager->cfg; } -future>> db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next) { +future, db::replay_position>> +db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next) { return engine().open_file_dma(filename, open_flags::ro).then([next = std::move(next)](file f) { return read_log_file(std::move(f), std::move(next)); }); } -subscription> db::commitlog::read_log_file(file f, commit_load_reader_func next) { +subscription, db::replay_position> +db::commitlog::read_log_file(file f, commit_load_reader_func next) { struct work { file f; - stream> s; + stream, replay_position> s; input_stream fin; input_stream r; uint64_t id = 0; @@ -967,8 +969,11 @@ subscription> db::commitlog::read_log_file(file f, commit if (crc.checksum() != checksum) { throw std::runtime_error("Checksum error in data entry"); } + + replay_position rp(w->id, position_type(w->pos)); + w->pos += size; - return w->s.produce(buf.share(0, data_size)); + return w->s.produce(buf.share(0, data_size), rp); }); }); }); diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 54ee5360e9..986e606557 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -184,10 +184,10 @@ public: const config& active_config() const; - typedef std::function(temporary_buffer)> commit_load_reader_func; + typedef std::function(temporary_buffer, replay_position)> commit_load_reader_func; - static subscription> read_log_file(file, commit_load_reader_func); - static future>> read_log_file(const sstring&, commit_load_reader_func); + static subscription, replay_position> read_log_file(file, commit_load_reader_func); + static future, replay_position>> read_log_file(const sstring&, commit_load_reader_func); private: commitlog(config); }; diff --git a/tests/commitlog_test.cc b/tests/commitlog_test.cc index bf34088356..582d81c6ac 100644 --- a/tests/commitlog_test.cc +++ b/tests/commitlog_test.cc @@ -348,12 +348,12 @@ SEASTAR_TEST_CASE(test_commitlog_reader){ for (auto & de : l->contents()) { if (de.name == findme) { auto path = log->first.path + "/" + de.name; - return db::commitlog::read_log_file(path, [count2](temporary_buffer buf) { + return db::commitlog::read_log_file(path, [count2](temporary_buffer buf, db::replay_position rp) { sstring str(buf.get(), buf.size()); BOOST_CHECK_EQUAL(str, "hej bubba cow"); (*count2)++; return make_ready_future<>(); - }).then([log](subscription> s) { + }).then([log](auto s) { auto ss = make_lw_shared(std::move(s)); return ss->done().then([ss] {}); });