From bbf82e80d02b97ea5d3f9b7fdfe2f65d4b1a4e36 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 19 Aug 2015 16:11:26 +0200 Subject: [PATCH] Commitlog: Allow skipping X bytes in commit log reader Also refactor reader into named methods for debugging sanity. --- db/commitlog/commitlog.cc | 247 ++++++++++++++++++++++---------------- db/commitlog/commitlog.hh | 4 +- 2 files changed, 147 insertions(+), 104 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index f504265ca6..5483246fd9 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -862,14 +862,14 @@ const db::commitlog::config& db::commitlog::active_config() const { } future, db::replay_position>> -db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next) { - return engine().open_file_dma(filename, open_flags::ro).then([next = std::move(next)](file f) { - return read_log_file(std::move(f), std::move(next)); +db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func next, position_type off) { + return engine().open_file_dma(filename, open_flags::ro).then([next = std::move(next), off](file f) { + return read_log_file(std::move(f), std::move(next), off); }); } subscription, db::replay_position> -db::commitlog::read_log_file(file f, commit_load_reader_func next) { +db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type off) { struct work { file f; stream, replay_position> s; @@ -878,111 +878,154 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next) { uint64_t id = 0; size_t pos = 0; size_t next = 0; + size_t start_off = 0; + size_t skip_to = 0; bool eof = false; bool header = true; - work(file f) : f(f), fin(make_file_input_stream(f)) {} + + work(file f, position_type o = 0) + : f(f), fin(make_file_input_stream(f)), start_off(o) { + } + work(work&&) = default; + + bool advance(const temporary_buffer& buf) { + pos += buf.size(); + if (buf.size() == 0) { + eof = true; + } + return !eof; + } + bool end_of_file() const { + return eof; + } + bool end_of_chunk() const { + return eof || next == pos; + } + future<> skip(size_t bytes) { + skip_to = pos + bytes; + return do_until([this] { return pos == skip_to || eof; }, [this, bytes] { + auto s = std::min(4096, skip_to - pos); + // should eof be an error here? + return fin.read_exactly(s).then([this](auto buf) { + this->advance(buf); + }); + }); + } + future<> read_header() { + return fin.read_exactly(segment::descriptor_header_size).then([this](temporary_buffer buf) { + advance(buf); + // Will throw if we got eof + data_input in(buf); + auto ver = in.read(); + auto id = in.read(); + auto checksum = in.read(); + + crc32_nbo crc; + crc.process(ver); + crc.process(id & 0xffffffff); + crc.process(id >> 32); + + auto cs = crc.checksum(); + if (cs != checksum) { + throw std::runtime_error("Checksum error in file header"); + } + + this->id = id; + this->next = 0; + + if (start_off > pos) { + return skip(start_off - pos); + } + return make_ready_future<>(); + }); + } + future<> read_chunk() { + return fin.read_exactly(segment::segment_overhead_size).then([this](temporary_buffer buf) { + auto start = pos; + + if (!advance(buf)) { + return make_ready_future<>(); + } + + data_input in(buf); + auto next = in.read(); + auto checksum = in.read(); + + crc32_nbo crc; + crc.process(id & 0xffffffff); + crc.process(id >> 32); + crc.process(start); + + auto cs = crc.checksum(); + if (cs != checksum) { + throw std::runtime_error("Checksum error in chunk header"); + } + + this->next = next; + + return do_until(std::bind(&work::end_of_chunk, this), std::bind(&work::read_entry, this)); + }); + } + future<> read_entry() { + static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t); + return fin.read_exactly(entry_header_size).then([this](temporary_buffer buf) { + replay_position rp(id, position_type(pos)); + + if (!advance(buf)) { + return make_ready_future<>(); + } + + data_input in(buf); + + auto size = in.read(); + auto checksum = in.read(); + + if (size == 0) { + // special urchin case: zero padding due to dma blocks + auto slack = next - pos; + return skip(slack); + } + + if (size < 3 * sizeof(uint32_t)) { + throw std::runtime_error("Invalid entry size"); + } + + return fin.read_exactly(size - entry_header_size).then([this, size, checksum, rp](temporary_buffer buf) { + advance(buf); + + data_input in(buf); + + auto data_size = size - segment::entry_overhead_size; + in.skip(data_size); + auto checksum = in.read(); + + crc32_nbo crc; + crc.process(size); + crc.process_bytes(buf.get(), data_size); + + if (crc.checksum() != checksum) { + throw std::runtime_error("Checksum error in data entry"); + } + + return s.produce(buf.share(0, data_size), rp); + }); + }); + } + future<> read_file() { + return read_header().then( + [this] { + return do_until(std::bind(&work::end_of_file, this), std::bind(&work::read_chunk, this)); + }); + } }; - auto w = make_lw_shared(std::move(f)); - + auto w = make_lw_shared(std::move(f), off); auto ret = w->s.listen(std::move(next)); - w->s.started().then([w] { - return w->fin.read_exactly(segment::descriptor_header_size).then([w](temporary_buffer buf) { - // Will throw if we got eof - data_input in(buf); - auto ver = in.read(); - auto id = in.read(); - auto checksum = in.read(); + w->s.started().then(std::bind(&work::read_file, w.get())).finally([w] { + w->s.close(); + }); - crc32_nbo crc; - crc.process(ver); - crc.process(id & 0xffffffff); - crc.process(id >> 32); - - auto cs = crc.checksum(); - if (cs != checksum) { - throw std::runtime_error("Checksum error in file header"); - } - - w->id = id; - w->next = 0; - w->pos = buf.size(); - - auto eofcond = [w] { return w->eof; }; - return do_until(eofcond, [w] { - assert(w->pos == w->next || w->next == 0); - return w->fin.read_exactly(segment::segment_overhead_size).then([w](temporary_buffer buf) { - if (buf.size() == 0) { - w->eof = true; - return make_ready_future<>(); - } - - data_input in(buf); - auto next = in.read(); - auto checksum = in.read(); - - crc32_nbo crc; - crc.process(w->id & 0xffffffff); - crc.process(w->id >> 32); - crc.process(w->pos); - - auto cs = crc.checksum(); - if (cs != checksum) { - throw std::runtime_error("Checksum error in chunk header"); - } - - w->pos += 8; - w->next = next; - - auto eoccond = [w] { return w->pos == w->next; }; - return do_until(eoccond, [w] { - static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t); - return w->fin.read_exactly(entry_header_size).then([w](temporary_buffer buf) { - data_input in(buf); - - auto size = in.read(); - auto checksum = in.read(); - - if (size == 0) { - // special urchin case: zero padding due to dma blocks - auto slack = w->next - entry_header_size - w->pos; - return w->fin.read_exactly(slack).then([w, slack](temporary_buffer buf) { - // should eof be an error here? - w->pos += slack + entry_header_size; - }); - } - if (size < 3 * sizeof(uint32_t)) { - throw std::runtime_error("Invalid entry size"); - } - return w->fin.read_exactly(size - entry_header_size).then([w, size, checksum](temporary_buffer buf) { - data_input in(buf); - - auto data_size = size - segment::entry_overhead_size; - in.skip(data_size); - auto checksum = in.read(); - - crc32_nbo crc; - crc.process(size); - crc.process_bytes(buf.get(), data_size); - - if (crc.checksum() != checksum) { - throw std::runtime_error("Checksum error in data entry"); - } - - replay_position rp(w->id, position_type(w->pos)); - - w->pos += size; - return w->s.produce(buf.share(0, data_size), rp); - }); - }); - }); - }); - }); - }); - }).then([w] { - return w->s.close(); - }).finally([w] {}); // keep w alive for last close return ret; } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 9f775c55c5..5ca5a30024 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -192,8 +192,8 @@ public: typedef std::function(temporary_buffer, replay_position)> commit_load_reader_func; - static subscription, replay_position> read_log_file(file, commit_load_reader_func); - static future, replay_position>> read_log_file(const sstring&, commit_load_reader_func); + static subscription, replay_position> read_log_file(file, commit_load_reader_func, position_type = 0); + static future, replay_position>> read_log_file(const sstring&, commit_load_reader_func, position_type = 0); private: commitlog(config); };