diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index cd52151098..85388a94a8 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -94,7 +94,7 @@ public: class db::cf_holder { public: virtual ~cf_holder() {}; - virtual void release_cf_count(const cf_id_type&) = 0; + virtual void release_cf_count(const cf_id_type&, const replay_position&) = 0; }; db::commitlog::config db::commitlog::config::from_db_config(const db::config& cfg, seastar::scheduling_group sg, size_t shard_available_memory) { @@ -186,6 +186,7 @@ db::commitlog::descriptor::operator db::replay_position() const { struct db::commitlog::entry_writer { force_sync sync; size_t num_entries; + bool fragmented = false; explicit entry_writer(force_sync fs, size_t ne = 1) : sync(fs) @@ -218,6 +219,19 @@ struct db::commitlog::entry_writer { */ virtual size_t size(segment&, size_t) = 0; + /** return the entry fragment offset for n:th entry (only called if `fragmented` is set) */ + virtual size_t frag_offset(size_t) const { + return 0; + } + /** return the entry fragment remaining for n:th entry (only called if `fragmented` is set) */ + virtual size_t frag_remaining(size_t) const { + return 0; + } + /** return the entry fragment ID for n:th entry (only called if `fragmented` is set) */ + virtual uint32_t frag_sequence_id(size_t) const { + return 0; + } + /* write nth entry */ virtual void write(segment&, output&, size_t) const = 0; @@ -327,6 +341,8 @@ public: requires std::derived_from && std::same_as().result())> future allocate_when_possible(T writer, db::timeout_clock::time_point timeout); + future<> oversized_allocation(entry_writer&, db::timeout_clock::time_point timeout); + replay_position min_position(); template @@ -422,15 +438,6 @@ public: return ++_ids; } - void sanity_check_size(size_t size) { - if (size > max_mutation_size) { - throw std::invalid_argument( - "Mutation of " + std::to_string(size) - + " bytes is too large for the maximum size of " - + 
std::to_string(max_mutation_size)); - } - } - future<> init(); future new_segment(); future active_segment(db::timeout_clock::time_point timeout); @@ -516,6 +523,7 @@ private: future<> _background_sync; seastar::gate _gate; uint64_t _new_counter = 0; + uint32_t _frag_id_counter = 0; std::optional _disk_write_alignment; future<> _pending_deletes = make_ready_future<>(); }; @@ -640,10 +648,14 @@ detail::sector_split_iterator::sector_split_iterator() {} detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size) + : sector_split_iterator(i, e, sector_size, detail::sector_overhead_size) +{} + +detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size, size_t overhead) : _iter(i) , _end(e) , _ptr(i != e ? const_cast(i->get()) : nullptr) - , _size(i != e ? sector_size - sector_overhead_size : 0) + , _size(i != e ? sector_size - overhead : 0) , _sector_size(sector_size) {} @@ -747,6 +759,7 @@ class db::commitlog::segment : public enable_shared_from_this, public c size_t _buffer_ostream_size = 0; std::unordered_map _cf_dirty; std::unordered_map _cf_min_time; + std::unordered_multimap _extended_segments; time_point _sync_time; utils::flush_queue, clock_type> _pending_ops; @@ -802,10 +815,12 @@ public: // The commit log entry overhead in bytes (int: length + int: head checksum) static constexpr size_t entry_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t); + static constexpr size_t fragmented_entry_overhead_size = 4 * sizeof(uint32_t); static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t); static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C'; static constexpr uint32_t multi_entry_size_magic = 0xffffffff; + static constexpr uint32_t fragmented_entry_size_magic = 0xfffffffe; // The commit 
log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position]) static constexpr size_t sync_marker_size = 2 * sizeof(uint32_t); @@ -833,6 +848,9 @@ public: mode = dispose_mode::Delete; } else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) { clogger.warn("Segment {} is dirty and is left on disk.", *this); + for (auto& [rp, h] : _extended_segments) { + h.release(); // do not clear out sequential seqments either. + } } _segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes(); @@ -856,12 +874,16 @@ public: _known_schema_versions.clear(); } - void release_cf_count(const cf_id_type& cf) override { + void release_cf_count(const cf_id_type& cf) { mark_clean(cf, 1); if (can_delete()) { _segment_manager->discard_unused_segments(); } } + void release_cf_count(const cf_id_type& cf, const replay_position& rp) override { + _extended_segments.erase(rp); + release_cf_count(cf); + } bool must_sync() { if (_segment_manager->cfg.mode == sync_mode::BATCH) { @@ -961,12 +983,15 @@ public: return make_ready_future(shared_from_this()); } future close() { - _closed = true; + auto closing = !std::exchange(_closed, true); auto s = co_await sync(); co_await flush(); co_await terminate(); - _waste = _file.known_size() - file_position(); - _segment_manager->totals.wasted_size_on_disk += _waste; + if (closing) { + // only update this if we are the closers. 
+ _waste = _file.known_size() - file_position(); + _segment_manager->totals.wasted_size_on_disk += _waste; + } co_return s; } future do_flush(uint64_t pos) { @@ -1144,10 +1169,11 @@ public: co_return; } + auto file_size = _file.known_size(); auto finally = defer([&] () noexcept { _segment_manager->notify_memory_written(size); _segment_manager->totals.buffer_list_bytes -= buf.size_bytes(); - if (_file.known_size() < _file_pos) { + if (file_size < _file_pos) { _segment_manager->totals.total_size_on_disk += (_file_pos - _file.known_size()); } }); @@ -1244,8 +1270,16 @@ public: must_sync, no_space, ok_need_batch_sync, + too_large, }; + size_t writer_size(entry_writer& writer, size_t size) const { + return size + writer.num_entries * entry_overhead_size + + (writer.num_entries > 1 ? multi_entry_overhead_size : 0u) + + (writer.fragmented ? writer.num_entries * fragmented_entry_overhead_size : 0u) + ; // total size + } + /** * Add a "mutation" to the segment. * Should only be called from "allocate_when_possible". "this" must be secure in a shared_ptr that will not @@ -1257,9 +1291,11 @@ public: } const auto size = writer.size(*this); - const auto s = size + writer.num_entries * entry_overhead_size + (writer.num_entries > 1 ? multi_entry_overhead_size : 0u); // total size + const auto s = writer_size(writer, size); // total size - _segment_manager->sanity_check_size(s); + if (s > _segment_manager->max_mutation_size) { + return write_result::too_large; + } if (!is_still_allocating() || next_position(s) > _segment_manager->max_size) { // would we make the file too big? 
return write_result::no_space; @@ -1316,8 +1352,26 @@ public: crc32_nbo crc; - write(out, es); - crc.process(uint32_t(es)); + if (writer.fragmented) { + auto off = uint32_t(writer.frag_offset(entry)); + auto rem = uint32_t(writer.frag_remaining(entry)); + auto id = writer.frag_sequence_id(entry); + es += fragmented_entry_overhead_size; + write(out, fragmented_entry_size_magic); + write(out, es); + write(out, id); + write(out, off); + write(out, rem); + crc.process(uint32_t(fragmented_entry_size_magic)); + crc.process(uint32_t(es)); + crc.process(uint32_t(id)); + crc.process(uint32_t(off)); + crc.process(uint32_t(rem)); + } else { + write(out, es); + crc.process(uint32_t(es)); + } + write(out, crc.checksum()); // actual data @@ -1354,6 +1408,11 @@ public: position_type position() const { return position_type(_file_pos + buffer_position()); } + position_type available() const { + auto pos = position(); + auto lim = _segment_manager->cfg.commitlog_segment_size_in_mb*1024*1024; + return pos < lim ? lim - pos : 0; + } position_type next_position(size_t size) const { auto used = _buffer_ostream_size - _buffer_ostream.size(); @@ -1365,6 +1424,15 @@ public: return _file_pos; } + void reset_file_position(size_t file_pos) { + clogger.trace("{}: set file position to {}", fmt::streamed(*this), file_pos); + assert(_flush_pos >= file_pos); + _file_pos = file_pos; + _flush_pos = file_pos; + _buffer = {}; + _closed = false; + } + // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded // a.k.a. zero the tail. size_t clear_buffer_slack() { @@ -1435,12 +1503,86 @@ template requires std::derived_from && std::same_as().result())> future db::commitlog::segment_manager::allocate_when_possible(T writer, db::timeout_clock::time_point timeout) { auto size = writer.size(); - // If this is already too big now, we should throw early. 
It's also a correctness issue, since - // if we are too big at this moment we'll never reach allocate() to actually throw at that - // point. - sanity_check_size(size); + // If this is already too big now, we should fall back early. This measurement does not count + // overhead into the estimate, i.e. it might be worse. + if (size < max_mutation_size) { + auto fut = get_units(_request_controller, size, timeout); + if (_request_controller.waiters()) { + totals.requests_blocked_memory++; + } - auto fut = get_units(_request_controller, size, timeout); + scope_increment_counter allocating(totals.active_allocations); + + auto permit = co_await std::move(fut); + sseg_ptr s; + + if (!_segments.empty() && _segments.back()->is_still_allocating()) { + s = _segments.back(); + } else { + s = co_await active_segment(timeout); + } + + bool retry = true; + + while (retry) { + using write_result = segment::write_result; + + switch (s->allocate(writer, permit, timeout)) { + case write_result::ok: + co_return writer.result(); + case write_result::must_sync: + s = co_await with_timeout(timeout, s->sync()); + continue; + case write_result::no_space: + s = co_await s->finish_and_get_new(timeout); + continue; + case write_result::ok_need_batch_sync: + s = co_await s->batch_cycle(timeout); + co_return writer.result(); + case write_result::too_large: + retry = false; // retry oversized + break; + } + } + } + + if (!cfg.allow_fragmented_entries) { + throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for the maximum size of {}", size, max_mutation_size)); + } + + // really slow path, trying to fit huge thingamabobs... 
+ co_await oversized_allocation(writer, timeout); + co_return writer.result(); +} + +future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writer, db::timeout_clock::time_point timeout) { + clogger.debug("Attempting oversized alloc of {} entry writer", writer.num_entries); + + auto size = writer.size(); + auto max_file_size = cfg.commitlog_segment_size_in_mb * 1024 * 1024; + + // check if this cannot be written at all... + if (!cfg.allow_going_over_size_limit) { + auto sector_size = _segments.empty() ? 512 /* worst case */ : _segments.front()->_alignment; + auto size_with_sector_overhead = size + (1 + size/sector_size) * detail::sector_overhead_size; + // more worst case + auto size_with_meta_overhead = size_with_sector_overhead + + (1 + size_with_sector_overhead/max_mutation_size) * (segment::entry_overhead_size + segment::fragmented_entry_overhead_size + segment::segment_overhead_size) + * (1 + size_with_sector_overhead/max_file_size) * segment::descriptor_header_size + ; + // this is not really true. We could have some space in current segment, + // but again, lets be conservative. 
+ auto max_file_size_avail = max_disk_size - max_file_size; + + if (size_with_meta_overhead > max_file_size_avail) { + throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for potentially available disk space of {}", size, max_file_size_avail)); + } + } + + std::vector> maybe_clear; + + assert(_request_controller.available_units() <= ssize_t(max_request_controller_units())); + auto fut = get_units(_request_controller, max_request_controller_units(), timeout); if (_request_controller.waiters()) { totals.requests_blocked_memory++; } @@ -1448,31 +1590,280 @@ future db::commitlog::segment_manager::allocate_when_possible(T writer, db::t scope_increment_counter allocating(totals.active_allocations); auto permit = co_await std::move(fut); - sseg_ptr s; + assert(_request_controller.available_units() == 0); - if (!_segments.empty() && _segments.back()->is_still_allocating()) { - s = _segments.back(); - } else { - s = co_await active_segment(timeout); + decltype(permit) fake_permit; // can't have allocate+sync release semaphore. 
+ bool failed = false; + std::exception_ptr e; + + try { + for (size_t i = 0; i < writer.num_entries && !failed; ++i) { + using frag_ostream_type = segment::frag_ostream_type; + using base_ostream_type = segment::base_ostream_type; + + struct partial_writer : public entry_writer { + entry_writer& _writer; + size_t _index; + size_t _size; + mutable typename buffer_type::view _buffer; + size_t _rem; + size_t _off; + uint32_t _id; + replay_position _rp; + rp_handle _h; + + partial_writer(entry_writer& w, size_t i, size_t size) + : entry_writer(w.sync) + , _writer(w) + , _index(i) + , _size(size) + , _rem(0) + , _off(0) + , _id(0) + {} + partial_writer(entry_writer& w, size_t i, size_t size, typename buffer_type::view buffer, size_t rem, size_t off, uint32_t id) + : entry_writer(w.sync) + , _writer(w) + , _index(i) + , _size(size) + , _buffer(buffer) + , _rem(rem) + , _off(off) + , _id(id) + { + this->fragmented = true; + } + const cf_id_type& id(size_t) const override { + return _writer.id(_index); + } + size_t size() const override { + return _size; + } + size_t size(segment&) override { + return _size; + } + size_t size(segment&, size_t) override { + return _size; + } + void write(segment& seg, output& out, size_t) const override { + if (_id == 0) { + _writer.write(seg, out, _index); + return; + } + while (!_buffer.empty()) { + auto buf = _buffer.current_fragment(); + out.write(reinterpret_cast(buf.data()), buf.size()); + _buffer.remove_current(); + } + } + size_t frag_remaining(size_t) const override { + return _rem; + } + size_t frag_offset(size_t) const override { + return _off; + } + uint32_t frag_sequence_id(size_t) const override { + return _id; + } + + void result(size_t, rp_handle h) override { + if (_off == 0) { + _rp = h; + _writer.result(_index, std::move(h)); + } else { + _h = std::move(h); + } + } + }; + + auto get_segment = [&]() -> future { + sseg_ptr s = co_await active_segment(timeout); + if (maybe_clear.empty() || maybe_clear.back().first.get() != 
s.get()) { + if (s->position() > (segment::segment_overhead_size + segment::descriptor_header_size)) { + co_await s->sync(); // ensure file pos == restartable. + } + maybe_clear.emplace_back(s, s->file_position()); + } + co_return s; + }; + + sseg_ptr s = co_await get_segment(); + + clogger.trace("Writing entry {} of {}", i, writer.num_entries); + + using write_result = segment::write_result; + + size_t data_size; + bool wrote_entry = false; + + // if we are a multi-entry write, parts of it might be + // small enough for "normal" write path. + for (;;) { + data_size = writer.size(*s, i); + + if (s->writer_size(writer, data_size) >= max_mutation_size) { + break; + } + + partial_writer pw(writer, i, data_size); + + switch (s->allocate(pw, fake_permit, timeout)) { + case write_result::ok_need_batch_sync: + s = co_await s->batch_cycle(timeout); + [[fallthrough]]; + case write_result::ok: + wrote_entry = true; + break; + case write_result::must_sync: + s = co_await with_timeout(timeout, s->sync()); + continue; + case write_result::no_space: + co_await s->close(); + s = co_await get_segment(); + continue; + case write_result::too_large: + break; + } + + break; + + } + + if (wrote_entry) { + clogger.debug("oversized wrote sub-entry {}, {} bytes", i, data_size); + continue; + } + + auto align = s->_alignment; + auto sector_size = align - detail::sector_overhead_size; + + auto buffer = acquire_buffer(data_size, align); + { + base_ostream_type buffer_ostream = frag_ostream_type(detail::sector_split_iterator(buffer.begin(), buffer.end(), align, 0), buffer.size_bytes()); + writer.write(*s, buffer_ostream, i); + } + auto strm = buffer.get_istream(); + size_t off = 0; + auto id = ++_frag_id_counter; + sseg_ptr seg_ptr = nullptr; + while (off < data_size && !failed) { + // do this each lap, since we might fill a segment up. 
+ if (!s->is_still_allocating()) { + co_await s->close(); + s = co_await get_segment(); + } + // bytes not counting overhead + auto buf_rem = std::min(max_size - s->position(), s->_buffer_ostream.size()); + + size_t avail; + if (buf_rem > align) { + auto rem2 = buf_rem - (1 + buf_rem/sector_size) * detail::sector_overhead_size; + avail = std::min(rem2, max_mutation_size) + - segment::entry_overhead_size + - segment::fragmented_entry_overhead_size + ; + assert(avail < buf_rem); + } else { + co_await s->cycle(); + auto pos = s->position(); + auto max = std::max(pos, max_file_size); + auto file_rem = max - pos; + + if (file_rem < align) { + co_await s->close(); + continue; + } + + auto rem2 = file_rem - (1 + file_rem/sector_size) * detail::sector_overhead_size; + avail = std::min(rem2, max_mutation_size) + - segment::entry_overhead_size + - segment::fragmented_entry_overhead_size + - (pos == 0 ? segment::descriptor_header_size : 0) + - segment::segment_overhead_size + ; + } + if (!seg_ptr) { + seg_ptr = s; + } + + auto max_write = data_size - off; + auto to_write = std::min(avail, max_write); + auto rem = max_write - to_write; + partial_writer pw(writer, i, to_write, strm.read_view(to_write), rem, off, id); + + switch (s->allocate(pw, fake_permit, timeout)) { + case write_result::ok_need_batch_sync: + s = co_await s->batch_cycle(timeout); + [[fallthrough]]; + case write_result::ok: + break; + case write_result::must_sync: + s = co_await with_timeout(timeout, s->sync()); + continue; + case write_result::no_space: + [[fallthrough]]; + case write_result::too_large: + assert(0); // should not reach + failed = true; + break; + } + + if (!failed) { + clogger.debug("oversized wrote sub-entry {} fragment, id={} off={}, size={}, {} of {} bytes", i, id, off, to_write, off + to_write, data_size); + + off += to_write; + if (s != seg_ptr) { + // make first segment keep track of dependent data in + // latter segments. 
Note, if we fail, all primary + // rp_handles will expire, and we will free + // the extended data as well. See release_cf_count. + // Note also: In "normal" usage, we will not release + // the references to other segments until the _whole_ + // segment is clean. This is intentional, because if + // something survives, the segment survives and might + // be replayed, in which case we want to be able to + // reconstruct a fragmented entry, if for no other + // reason to be able to clear its state (see replay_state). + seg_ptr->_extended_segments.emplace(pw._rp, std::move(pw._h)); + } + } + } + } + } catch (...) { + e = std::current_exception(); + failed = true; + } + // ensure all segments we used are fully flushed. + // both to be able to undo, but also to restore all + // byte usage counts. + for (auto [s, fp] : maybe_clear) { + co_await s->sync(); } - for (;;) { - using write_result = segment::write_result; - - switch (s->allocate(writer, permit, timeout)) { - case write_result::ok: - co_return writer.result(); - case write_result::must_sync: - s = co_await with_timeout(timeout, s->sync()); - continue; - case write_result::no_space: - s = co_await s->finish_and_get_new(timeout); - continue; - case write_result::ok_need_batch_sync: - s = co_await s->batch_cycle(timeout); - co_return writer.result(); + if (failed) { + clogger.debug("Oversized allocation failed. Rolling back..."); + // reset file positions. 
+ for (auto [s, fp] : maybe_clear) { + s->reset_file_position(fp); + if (fp == 0) { + s->mark_clean(); + _segments.erase(std::remove(_segments.begin(), _segments.end(), s), _segments.end()); + } } } + assert(_request_controller.available_units() == 0); + + permit.return_all(); + assert(_request_controller.available_units() == ssize_t(max_request_controller_units())); + + if (!failed) { + clogger.trace("Oversized allocation succeeded."); + co_return; + } + if (e) { + std::rethrow_exception(e); + } + throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for the maximum size of {}", size, max_mutation_size)); } const size_t db::commitlog::segment::default_size; @@ -1498,7 +1889,7 @@ db::commitlog::segment_manager::segment_manager(config c) return cfg; }()) , max_size(std::min(std::numeric_limits::max() / (1024 * 1024), std::max(cfg.commitlog_segment_size_in_mb, 1)) * 1024 * 1024) - , max_mutation_size(max_size >> 1) + , max_mutation_size(max_size >> 1) // note: can't up this by much, because we don't know the CRC sector overhead addition before we've actually opened each segment. , max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024) // our threshold for trying to force a flush. needs heristics, for now max - segment_size/2. , disk_usage_threshold([&] { @@ -1558,6 +1949,7 @@ future<> db::commitlog::segment_manager::replenish_reserve() { } continue; } catch (shutdown_marker&) { + _reserve_segments.abort(std::current_exception()); break; } catch (...) { clogger.warn("Exception in segment reservation: {}", std::current_exception()); @@ -2073,9 +2465,13 @@ future db::commitlog::segment_manager: if (!_segment_allocating) { auto f = new_segment(); // must check that we are not already done. 
- if (f.available()) { - f.get(); // maybe force exception - continue; + try { + if (f.available()) { + f.get(); // maybe force exception + continue; + } + } catch (shutdown_marker&) { + continue; // force new exception } _segment_allocating.emplace(f.discard_result().finally([this] { // clear the shared_future _before_ resolving its contents @@ -2675,6 +3071,7 @@ db::commitlog::add_entries(std::vector entry_writers, db class cl_entries_writer final : public entry_writer { std::vector _writers; std::unordered_set _known; + const segment* _sizes_computed = nullptr; public: std::vector res; @@ -2699,10 +3096,15 @@ db::commitlog::add_entries(std::vector entry_writers, db i->set_with_schema(!known); res += i->size(); } + _sizes_computed = &seg; return res; } size_t size(segment& seg, size_t i) override { - return _writers.at(i).size(); // we have already set schema known/unknown + auto& w = _writers.at(i); + if (_sizes_computed != &seg) { + w.set_with_schema(seg.is_schema_version_known(w.schema())); + } + return w.size(); } size_t size() const override { return std::accumulate(_writers.begin(), _writers.end(), size_t(0), [](size_t acc, const commitlog_entry_writer& w) { @@ -2842,10 +3244,36 @@ const char* db::commitlog::segment_truncation::what() const noexcept { return _msg.c_str(); } +class db::commitlog::replay_state::impl { +public: + struct entry_fragment { + size_t offset; + size_t rem; + size_t end; + buffer_and_replay_position rpbuf; + }; + + // mapping fragment ID -> state (i.e. 
data collected so far) + std::unordered_map> + fragment_state; +}; + +db::commitlog::replay_state::replay_state() + : _impl(std::make_unique()) +{} + +db::commitlog::replay_state::~replay_state() +{} + +future<> +db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) { + co_await read_log_file(replay_state{}, std::move(filename), std::move(pfx), std::move(next), off, exts); +} + // No commit_io_check needed in the log reader since the database will fail // on error at startup if required future<> -db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) { +db::commitlog::read_log_file(const replay_state& state, sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) { struct work { private: file_input_stream_options make_file_input_stream_options() { @@ -2859,6 +3287,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f descriptor d; commit_load_reader_func func; input_stream fin; + replay_state::impl& state; input_stream r; uint64_t id = 0; size_t pos = 0; @@ -2873,8 +3302,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f fragmented_temporary_buffer::reader frag_reader; fragmented_temporary_buffer buffer, initial; - work(file f, descriptor din, commit_load_reader_func fn, position_type o = 0) - : f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), start_off(o) { + work(file f, descriptor din, commit_load_reader_func fn, replay_state::impl& sn, position_type o = 0) + : f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), state(sn), start_off(o) { } work(work&&) = default; @@ -3181,7 +3610,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f // check for 
multi-entry if (size == segment::multi_entry_size_magic) { auto actual_size = checksum; - auto end = pos + actual_size - entry_header_size - sizeof(uint32_t); + auto end = next_pos(actual_size - entry_header_size - sizeof(uint32_t)); SCYLLA_ASSERT(end <= next); // really small read... @@ -3211,6 +3640,84 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f } } + co_return; + } else if (size == segment::fragmented_entry_size_magic) { + auto actual_size = checksum; + auto end = next_pos(actual_size - entry_header_size); + + clogger.debug("read_entry (fragmented) size = {}", actual_size); + + assert(end <= next); + + assert(end <= next); + // really small read... + buf = co_await read_data(segment::fragmented_entry_overhead_size); + in = buf.get_istream(); + + auto id = read(in); + auto off = read(in); + auto rem = read(in); + checksum = read(in); + + crc.process(actual_size); + crc.process(id); + crc.process(off); + crc.process(rem); + + if (crc.checksum() != checksum) { + auto slack = next - pos; + if (size != 0) { + clogger.debug("Fractured segment entry at {} has broken header. 
Skipping to next chunk ({} bytes)", rp, slack); + corrupt_size += slack; + } + co_await skip_to_chunk(next); + co_return; + } + + buf = co_await read_data(actual_size - entry_header_size - segment::fragmented_entry_overhead_size); + + using entry_fragment = db::commitlog::replay_state::impl::entry_fragment; + entry_fragment frag; + frag.offset = off; + frag.rem = rem; + frag.end = off + buf.size_bytes(); + frag.rpbuf = buffer_and_replay_position{std::move(buf), rp}; + + clogger.debug("fragment id={} off={}, end={}, rem={} ", id, off, frag.end, rem); + + auto& frag_states = state.fragment_state[id]; + + auto join = [](entry_fragment& f1, entry_fragment& f2) { + auto size1 = f1.rpbuf.buffer.size_bytes(); + auto size2 = f2.rpbuf.buffer.size_bytes(); + auto data1 = std::move(f1.rpbuf.buffer).release(); + auto data2 = std::move(f2.rpbuf.buffer).release(); + for (auto&& bb : data2) { + data1.emplace_back(std::move(bb)); + } + f1.rem = f2.rem; + f1.end = f2.end; + f1.rpbuf.buffer = fragmented_temporary_buffer(std::move(data1), size1+size2); + }; + + frag_states.emplace_back(std::move(frag)); + + std::ranges::sort(frag_states, std::less(), std::mem_fn(&entry_fragment::offset)); + + while (frag_states.size() > 1) { + auto& f1 = frag_states[frag_states.size() - 2]; + auto& f2 = frag_states.back(); + if (f1.end != f2.offset) { + break; + } + join(f1, f2); + frag_states.pop_back(); + } + if (frag_states.size() == 1 && frag_states.front().rem == 0 && frag_states.front().offset == 0) { + co_await func(std::move(frag_states.front().rpbuf)); + state.fragment_state.erase(id); + } + co_return; } @@ -3276,7 +3783,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f f = make_checked_file(commit_error_handler, std::move(f)); descriptor d(filename, pfx); - work w(std::move(f), d, std::move(next), off); + work w(std::move(f), d, std::move(next), *state._impl, off); co_await w.read_file(); } @@ -3412,7 +3919,7 @@ db::rp_handle& 
db::rp_handle::operator=(rp_handle&& v) noexcept { db::rp_handle::~rp_handle() { if (_rp != replay_position() && _h) { - _h->release_cf_count(_cf); + _h->release_cf_count(_cf, _rp); } } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 58348fb58e..ce7cb6fbdb 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -111,6 +111,7 @@ public: bool use_o_dsync = false; bool warn_about_segments_left_on_disk_after_shutdown = true; bool allow_going_over_size_limit = true; + bool allow_fragmented_entries = false; // The base segment ID to use. // The segment IDs of newly allocated segments will be issued sequentially @@ -136,7 +137,8 @@ public: static inline constexpr uint32_t segment_version_1 = 1u; static inline constexpr uint32_t segment_version_2 = 2u; static inline constexpr uint32_t segment_version_3 = 3u; - static inline constexpr uint32_t current_version = segment_version_3; + static inline constexpr uint32_t segment_version_4 = 4u; + static inline constexpr uint32_t current_version = segment_version_4; descriptor(descriptor&&) noexcept = default; descriptor(const descriptor&) = default; @@ -378,7 +380,7 @@ public: // (Re-)set data mix lifetime. 
void update_max_data_lifetime(std::optional commitlog_data_max_lifetime_in_seconds); - typedef std::function(buffer_and_replay_position)> commit_load_reader_func; + using commit_load_reader_func = std::function(buffer_and_replay_position)>; class segment_error : public std::exception {}; @@ -424,7 +426,18 @@ public: const char* what() const noexcept override; }; + class replay_state { + public: + replay_state(); + ~replay_state(); + private: + friend class commitlog; + class impl; + std::unique_ptr _impl; + }; + static future<> read_log_file(sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr); + static future<> read_log_file(const replay_state&, sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr); private: commitlog(config); diff --git a/db/commitlog/commitlog_entry.hh b/db/commitlog/commitlog_entry.hh index 7943457ede..58e61ef760 100644 --- a/db/commitlog/commitlog_entry.hh +++ b/db/commitlog/commitlog_entry.hh @@ -34,6 +34,7 @@ namespace detail { public: sector_split_iterator(const sector_split_iterator&) noexcept; sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size); + sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size, size_t overhead); sector_split_iterator(); char* get_write() const { @@ -101,8 +102,9 @@ public: {} void set_with_schema(bool value) { - _with_schema = value; - compute_size(); + if (std::exchange(_with_schema, value) != value || _size == std::numeric_limits::max()) { + compute_size(); + } } bool with_schema() const { return _with_schema; diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 0a6344198c..4f12a56e38 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -1822,3 +1822,187 @@ SEASTAR_TEST_CASE(test_commitlog_update_max_data_lifetime) { co_await log.shutdown(); co_await log.clear(); } + +/** + * Test allocating oversized 
multi-entry +*/ +static future<> do_test_oversized_entry(size_t max_size_mb) { + commitlog::config cfg; + + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 8 * max_size_mb * smp::count; + cfg.allow_going_over_size_limit = false; + cfg.allow_fragmented_entries = true; + cfg.use_o_dsync = false; + + // not using cl_test, because we need to be able to abandon + // the log. + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + std::unordered_map rp2mut; + random_mutation_generator gen(random_mutation_generator::generate_counters(false)); + + { + auto log = co_await commitlog::create_commitlog(cfg); + auto size = log.max_record_size() * 2; + + std::vector writers; + std::vector mutations; + + size_t tot = 0; + // generate mutations until their combined size exceeds twice the maximum record size. + while (tot <= size) { + mutations.emplace_back(gen(1).front()); + tot += mutations.back().representation().size(); + } + for (auto& fm : mutations) { + writers.emplace_back(gen.schema(), fm, commitlog::force_sync::no); + } + + // this will create an oversized entry set: the batch holds more than twice the maximum record size of data. + auto res = co_await log.add_entries(writers, db::timeout_clock::now() + 60s); + + auto i = mutations.begin(); + for (auto& h : res) { + rp2mut.emplace(h.release(), *i++); + } + co_await log.sync_all_segments(); + // as if we crashed -> segment left on disk + co_await log.release(); + co_await log.shutdown(); + } + + // new log instance on the same directory, for replay. + auto log = co_await commitlog::create_commitlog(cfg); + std::exception_ptr e; + size_t n = 0; + + auto replay_set = co_await log.get_segments_to_replay(); + // Now replay the old commitlog and ensure every replayed entry matches the mutation recorded for its replay position. 
+ + commitlog::replay_state state; + for (auto& f : replay_set) { + try { + co_await commitlog::read_log_file(state, f, cfg.fname_prefix, [&](commitlog::buffer_and_replay_position buf_rp) -> future<> { + auto&& buf = buf_rp.buffer; + auto&& rp = buf_rp.position; + + BOOST_CHECK(rp2mut.count(rp)); + commitlog_entry_reader cer(buf); + auto& fm = cer.mutation(); + auto m1 = fm.unfreeze(gen.schema()); + auto m2 = rp2mut.at(rp).unfreeze(gen.schema()); + + BOOST_CHECK_EQUAL(m1, m2); + ++n; + co_return; + }); + } catch (commitlog::segment_truncation&) { + e = std::current_exception(); + } + } + + BOOST_CHECK_EQUAL(n, rp2mut.size()); + + co_await log.shutdown(); + co_await log.clear(); + + if (n != rp2mut.size() && e) { + std::rethrow_exception(e); + } +} + +SEASTAR_TEST_CASE(test_oversized_entry_small) { + co_await do_test_oversized_entry(1); // small segments (1 MB) +} + +SEASTAR_TEST_CASE(test_oversized_entry_normal) { + co_await do_test_oversized_entry(32); // normal segments (32 MB) +} + +SEASTAR_TEST_CASE(test_oversized_entry_large) { + co_await do_test_oversized_entry(32*3); // bigger segments (96 MB) +} + +SEASTAR_TEST_CASE(test_oversized_single_entry) { + commitlog::config cfg; + + constexpr auto max_size_mb = 1; + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 8 * max_size_mb * smp::count; + cfg.allow_going_over_size_limit = false; + cfg.allow_fragmented_entries = true; + cfg.use_o_dsync = false; + + // not using cl_test, because we need to be able to abandon + // the log. 
+ tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + + fragmented_temporary_buffer buf; + auto uuid = make_table_id(); + replay_position rp; + { + auto log = co_await commitlog::create_commitlog(cfg); + auto size = log.max_record_size() * 2; + + buf = fragmented_temporary_buffer::allocate_to_fit(size); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist; + + auto out = buf.get_ostream(); + for (size_t i = 0; i < size; ++i) { + auto c = dist(gen); + out.write(&c, 1); + } + + auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + for (auto& tmp : buf) { + dst.write(tmp.get(), tmp.size()); + } + }); + rp = h.release(); // keep only the replay position -- no freeing for you + + co_await log.sync_all_segments(); + // as if we crashed -> segment left on disk + co_await log.release(); + co_await log.shutdown(); + } + + // new log instance on the same directory, for replay. + auto log = co_await commitlog::create_commitlog(cfg); + auto replay_set = co_await log.get_segments_to_replay(); + bool found = false; + std::exception_ptr e; + commitlog::replay_state state; + + // Now replay the old commitlog and check the replay position and size of the entry (NOTE(review): the payload bytes themselves are never compared). + for (auto& f : replay_set) { + try { + co_await commitlog::read_log_file(state, f, cfg.fname_prefix, [&](commitlog::buffer_and_replay_position buf_rp) -> future<> { + auto&& buf_in = buf_rp.buffer; + auto&& rp_in = buf_rp.position; + + BOOST_CHECK_EQUAL(rp, rp_in); + BOOST_CHECK_EQUAL(buf.size_bytes(), buf_in.size_bytes()); + fragmented_temporary_buffer::view v1(buf); + fragmented_temporary_buffer::view v2(buf_in); + BOOST_CHECK_EQUAL(v1.size_bytes(), v2.size_bytes()); + found = true; + co_return; + }); + } catch (commitlog::segment_truncation&) { + e = std::current_exception(); + } + } + + BOOST_CHECK(found); + + co_await log.shutdown(); + co_await log.clear(); + + if (!found && e) { + std::rethrow_exception(e); + } +}