From 05bf2ae5d77dfea148802b6bfb22e14820098050 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 30 Apr 2024 14:22:09 +0000 Subject: [PATCH] commitlog: Handle oversized entries Refs #18161 Yet another approach to dealing with large commitlog submissions. We handle oversize single mutation by adding yet another entry type: fragmented. In this case we only add a fragment (aha) of the data that needs storing into each entry, along with metadata to correlate and reconstruct the full entry on replay. Because these fragmented entries are spread over N segments, we also need to add references from the first segment in a chain to the subsequent ones. These are released once we clear the relevant cf_id count in the base. * This approach has the downside that due to how serialization etc works w.r.t. mutations, we need to create an intermediate buffer to hold the full serialized target entry. This is then incrementally written into entries of < max_mutation_size, successively requesting more segments. On replay, when encountering a fragment chain, the fragment is added to a "state", i.e. a mapping of currently processing frag chains. Once we've found all fragments and concatenated the buffers into a single fragmented one, we can issue a replay callback as usual. Note that a replay caller will need to create and provide such a state object. Old signature replay function remains for tests and such. This approach bumps the file format (docs to come). To ensure "atomicity" we both force syncronization, and should the whole op fail, we restore segment state (rewinding), thus discarding data all we wrote. v2: * Improve some bookeep, ensure we keep track of segments and flush properly, to get counter correct --- db/commitlog/commitlog.cc | 619 +++++++++++++++++++++++++++++--- db/commitlog/commitlog.hh | 17 +- db/commitlog/commitlog_entry.hh | 6 +- test/boost/commitlog_test.cc | 184 ++++++++++ 4 files changed, 766 insertions(+), 60 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index cd52151098..85388a94a8 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -94,7 +94,7 @@ public: class db::cf_holder { public: virtual ~cf_holder() {}; - virtual void release_cf_count(const cf_id_type&) = 0; + virtual void release_cf_count(const cf_id_type&, const replay_position&) = 0; }; db::commitlog::config db::commitlog::config::from_db_config(const db::config& cfg, seastar::scheduling_group sg, size_t shard_available_memory) { @@ -186,6 +186,7 @@ db::commitlog::descriptor::operator db::replay_position() const { struct db::commitlog::entry_writer { force_sync sync; size_t num_entries; + bool fragmented = false; explicit entry_writer(force_sync fs, size_t ne = 1) : sync(fs) @@ -218,6 +219,19 @@ struct db::commitlog::entry_writer { */ virtual size_t size(segment&, size_t) = 0; + /** return the entry fragment offset for n:th entry (only called if `fragmented` is set) */ + virtual size_t frag_offset(size_t) const { + return 0; + } + /** return the entry fragment remaining for n:th entry (only called if `fragmented` is set) */ + virtual size_t frag_remaining(size_t) const { + return 0; + } + /** return the entry fragment ID for n:th entry (only called if `fragmented` is set) */ + virtual uint32_t frag_sequence_id(size_t) const { + return 0; + } + /* write nth entry */ virtual void write(segment&, output&, size_t) const = 0; @@ -327,6 +341,8 @@ public: requires std::derived_from && std::same_as().result())> future allocate_when_possible(T writer, db::timeout_clock::time_point timeout); + future<> oversized_allocation(entry_writer&, db::timeout_clock::time_point timeout); + replay_position min_position(); template @@ -422,15 +438,6 @@ public: return ++_ids; } - void sanity_check_size(size_t size) { - if (size > max_mutation_size) { - throw std::invalid_argument( - "Mutation of " + std::to_string(size) - + " bytes is too large for the maximum size of " - + std::to_string(max_mutation_size)); - } - } - future<> init(); future new_segment(); future active_segment(db::timeout_clock::time_point timeout); @@ -516,6 +523,7 @@ private: future<> _background_sync; seastar::gate _gate; uint64_t _new_counter = 0; + uint32_t _frag_id_counter = 0; std::optional _disk_write_alignment; future<> _pending_deletes = make_ready_future<>(); }; @@ -640,10 +648,14 @@ detail::sector_split_iterator::sector_split_iterator() {} detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size) + : sector_split_iterator(i, e, sector_size, detail::sector_overhead_size) +{} + +detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size, size_t overhead) : _iter(i) , _end(e) , _ptr(i != e ? const_cast(i->get()) : nullptr) - , _size(i != e ? sector_size - sector_overhead_size : 0) + , _size(i != e ? sector_size - overhead : 0) , _sector_size(sector_size) {} @@ -747,6 +759,7 @@ class db::commitlog::segment : public enable_shared_from_this, public c size_t _buffer_ostream_size = 0; std::unordered_map _cf_dirty; std::unordered_map _cf_min_time; + std::unordered_multimap _extended_segments; time_point _sync_time; utils::flush_queue, clock_type> _pending_ops; @@ -802,10 +815,12 @@ public: // The commit log entry overhead in bytes (int: length + int: head checksum) static constexpr size_t entry_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t); + static constexpr size_t fragmented_entry_overhead_size = 4 * sizeof(uint32_t); static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t); static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C'; static constexpr uint32_t multi_entry_size_magic = 0xffffffff; + static constexpr uint32_t fragmented_entry_size_magic = 0xfffffffe; // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position]) static constexpr size_t sync_marker_size = 2 * sizeof(uint32_t); @@ -833,6 +848,9 @@ public: mode = dispose_mode::Delete; } else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) { clogger.warn("Segment {} is dirty and is left on disk.", *this); + for (auto& [rp, h] : _extended_segments) { + h.release(); // do not clear out sequential seqments either. + } } _segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes(); @@ -856,12 +874,16 @@ public: _known_schema_versions.clear(); } - void release_cf_count(const cf_id_type& cf) override { + void release_cf_count(const cf_id_type& cf) { mark_clean(cf, 1); if (can_delete()) { _segment_manager->discard_unused_segments(); } } + void release_cf_count(const cf_id_type& cf, const replay_position& rp) override { + _extended_segments.erase(rp); + release_cf_count(cf); + } bool must_sync() { if (_segment_manager->cfg.mode == sync_mode::BATCH) { @@ -961,12 +983,15 @@ public: return make_ready_future(shared_from_this()); } future close() { - _closed = true; + auto closing = !std::exchange(_closed, true); auto s = co_await sync(); co_await flush(); co_await terminate(); - _waste = _file.known_size() - file_position(); - _segment_manager->totals.wasted_size_on_disk += _waste; + if (closing) { + // only update this if we are the closers. + _waste = _file.known_size() - file_position(); + _segment_manager->totals.wasted_size_on_disk += _waste; + } co_return s; } future do_flush(uint64_t pos) { @@ -1144,10 +1169,11 @@ public: co_return; } + auto file_size = _file.known_size(); auto finally = defer([&] () noexcept { _segment_manager->notify_memory_written(size); _segment_manager->totals.buffer_list_bytes -= buf.size_bytes(); - if (_file.known_size() < _file_pos) { + if (file_size < _file_pos) { _segment_manager->totals.total_size_on_disk += (_file_pos - _file.known_size()); } }); @@ -1244,8 +1270,16 @@ public: must_sync, no_space, ok_need_batch_sync, + too_large, }; + size_t writer_size(entry_writer& writer, size_t size) const { + return size + writer.num_entries * entry_overhead_size + + (writer.num_entries > 1 ? multi_entry_overhead_size : 0u) + + (writer.fragmented ? writer.num_entries * fragmented_entry_overhead_size : 0u) + ; // total size + } + /** * Add a "mutation" to the segment. * Should only be called from "allocate_when_possible". "this" must be secure in a shared_ptr that will not @@ -1257,9 +1291,11 @@ public: } const auto size = writer.size(*this); - const auto s = size + writer.num_entries * entry_overhead_size + (writer.num_entries > 1 ? multi_entry_overhead_size : 0u); // total size + const auto s = writer_size(writer, size); // total size - _segment_manager->sanity_check_size(s); + if (s > _segment_manager->max_mutation_size) { + return write_result::too_large; + } if (!is_still_allocating() || next_position(s) > _segment_manager->max_size) { // would we make the file too big? return write_result::no_space; @@ -1316,8 +1352,26 @@ public: crc32_nbo crc; - write(out, es); - crc.process(uint32_t(es)); + if (writer.fragmented) { + auto off = uint32_t(writer.frag_offset(entry)); + auto rem = uint32_t(writer.frag_remaining(entry)); + auto id = writer.frag_sequence_id(entry); + es += fragmented_entry_overhead_size; + write(out, fragmented_entry_size_magic); + write(out, es); + write(out, id); + write(out, off); + write(out, rem); + crc.process(uint32_t(fragmented_entry_size_magic)); + crc.process(uint32_t(es)); + crc.process(uint32_t(id)); + crc.process(uint32_t(off)); + crc.process(uint32_t(rem)); + } else { + write(out, es); + crc.process(uint32_t(es)); + } + write(out, crc.checksum()); // actual data @@ -1354,6 +1408,11 @@ public: position_type position() const { return position_type(_file_pos + buffer_position()); } + position_type available() const { + auto pos = position(); + auto lim = _segment_manager->cfg.commitlog_segment_size_in_mb*1024*1024; + return pos < lim ? lim - pos : 0; + } position_type next_position(size_t size) const { auto used = _buffer_ostream_size - _buffer_ostream.size(); @@ -1365,6 +1424,15 @@ public: return _file_pos; } + void reset_file_position(size_t file_pos) { + clogger.trace("{}: set file position to {}", fmt::streamed(*this), file_pos); + assert(_flush_pos >= file_pos); + _file_pos = file_pos; + _flush_pos = file_pos; + _buffer = {}; + _closed = false; + } + // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded // a.k.a. zero the tail. size_t clear_buffer_slack() { @@ -1435,12 +1503,86 @@ template requires std::derived_from && std::same_as().result())> future db::commitlog::segment_manager::allocate_when_possible(T writer, db::timeout_clock::time_point timeout) { auto size = writer.size(); - // If this is already too big now, we should throw early. It's also a correctness issue, since - // if we are too big at this moment we'll never reach allocate() to actually throw at that - // point. - sanity_check_size(size); + // If this is already too big now, we should fall back early. This measurement does not count + // overhead into the estimate, i.e. it might be worse. + if (size < max_mutation_size) { + auto fut = get_units(_request_controller, size, timeout); + if (_request_controller.waiters()) { + totals.requests_blocked_memory++; + } - auto fut = get_units(_request_controller, size, timeout); + scope_increment_counter allocating(totals.active_allocations); + + auto permit = co_await std::move(fut); + sseg_ptr s; + + if (!_segments.empty() && _segments.back()->is_still_allocating()) { + s = _segments.back(); + } else { + s = co_await active_segment(timeout); + } + + bool retry = true; + + while (retry) { + using write_result = segment::write_result; + + switch (s->allocate(writer, permit, timeout)) { + case write_result::ok: + co_return writer.result(); + case write_result::must_sync: + s = co_await with_timeout(timeout, s->sync()); + continue; + case write_result::no_space: + s = co_await s->finish_and_get_new(timeout); + continue; + case write_result::ok_need_batch_sync: + s = co_await s->batch_cycle(timeout); + co_return writer.result(); + case write_result::too_large: + retry = false; // retry oversized + break; + } + } + } + + if (!cfg.allow_fragmented_entries) { + throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for the maximum size of {}", size, max_mutation_size)); + } + + // really slow path, trying to fit huge thingamabobs... + co_await oversized_allocation(writer, timeout); + co_return writer.result(); +} + +future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writer, db::timeout_clock::time_point timeout) { + clogger.debug("Attempting oversized alloc of {} entry writer", writer.num_entries); + + auto size = writer.size(); + auto max_file_size = cfg.commitlog_segment_size_in_mb * 1024 * 1024; + + // check if this cannot be written at all... + if (!cfg.allow_going_over_size_limit) { + auto sector_size = _segments.empty() ? 512 /* worst case */ : _segments.front()->_alignment; + auto size_with_sector_overhead = size + (1 + size/sector_size) * detail::sector_overhead_size; + // more worst case + auto size_with_meta_overhead = size_with_sector_overhead + + (1 + size_with_sector_overhead/max_mutation_size) * (segment::entry_overhead_size + segment::fragmented_entry_overhead_size + segment::segment_overhead_size) + * (1 + size_with_sector_overhead/max_file_size) * segment::descriptor_header_size + ; + // this is not really true. We could have some space in current segment, + // but again, lets be conservative. + auto max_file_size_avail = max_disk_size - max_file_size; + + if (size_with_meta_overhead > max_file_size_avail) { + throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for potentially available disk space of {}", size, max_file_size_avail)); + } + } + + std::vector> maybe_clear; + + assert(_request_controller.available_units() <= ssize_t(max_request_controller_units())); + auto fut = get_units(_request_controller, max_request_controller_units(), timeout); if (_request_controller.waiters()) { totals.requests_blocked_memory++; } @@ -1448,31 +1590,280 @@ future db::commitlog::segment_manager::allocate_when_possible(T writer, db::t scope_increment_counter allocating(totals.active_allocations); auto permit = co_await std::move(fut); - sseg_ptr s; + assert(_request_controller.available_units() == 0); - if (!_segments.empty() && _segments.back()->is_still_allocating()) { - s = _segments.back(); - } else { - s = co_await active_segment(timeout); + decltype(permit) fake_permit; // can't have allocate+sync release semaphore. + bool failed = false; + std::exception_ptr e; + + try { + for (size_t i = 0; i < writer.num_entries && !failed; ++i) { + using frag_ostream_type = segment::frag_ostream_type; + using base_ostream_type = segment::base_ostream_type; + + struct partial_writer : public entry_writer { + entry_writer& _writer; + size_t _index; + size_t _size; + mutable typename buffer_type::view _buffer; + size_t _rem; + size_t _off; + uint32_t _id; + replay_position _rp; + rp_handle _h; + + partial_writer(entry_writer& w, size_t i, size_t size) + : entry_writer(w.sync) + , _writer(w) + , _index(i) + , _size(size) + , _rem(0) + , _off(0) + , _id(0) + {} + partial_writer(entry_writer& w, size_t i, size_t size, typename buffer_type::view buffer, size_t rem, size_t off, uint32_t id) + : entry_writer(w.sync) + , _writer(w) + , _index(i) + , _size(size) + , _buffer(buffer) + , _rem(rem) + , _off(off) + , _id(id) + { + this->fragmented = true; + } + const cf_id_type& id(size_t) const override { + return _writer.id(_index); + } + size_t size() const override { + return _size; + } + size_t size(segment&) override { + return _size; + } + size_t size(segment&, size_t) override { + return _size; + } + void write(segment& seg, output& out, size_t) const override { + if (_id == 0) { + _writer.write(seg, out, _index); + return; + } + while (!_buffer.empty()) { + auto buf = _buffer.current_fragment(); + out.write(reinterpret_cast(buf.data()), buf.size()); + _buffer.remove_current(); + } + } + size_t frag_remaining(size_t) const override { + return _rem; + } + size_t frag_offset(size_t) const override { + return _off; + } + uint32_t frag_sequence_id(size_t) const override { + return _id; + } + + void result(size_t, rp_handle h) override { + if (_off == 0) { + _rp = h; + _writer.result(_index, std::move(h)); + } else { + _h = std::move(h); + } + } + }; + + auto get_segment = [&]() -> future { + sseg_ptr s = co_await active_segment(timeout); + if (maybe_clear.empty() || maybe_clear.back().first.get() != s.get()) { + if (s->position() > (segment::segment_overhead_size + segment::descriptor_header_size)) { + co_await s->sync(); // ensure file pos == restartable. + } + maybe_clear.emplace_back(s, s->file_position()); + } + co_return s; + }; + + sseg_ptr s = co_await get_segment(); + + clogger.trace("Writing entry {} of {}", i, writer.num_entries); + + using write_result = segment::write_result; + + size_t data_size; + bool wrote_entry = false; + + // if we are a multi-entry write, parts of it might be + // small enough for "normal" write path. + for (;;) { + data_size = writer.size(*s, i); + + if (s->writer_size(writer, data_size) >= max_mutation_size) { + break; + } + + partial_writer pw(writer, i, data_size); + + switch (s->allocate(pw, fake_permit, timeout)) { + case write_result::ok_need_batch_sync: + s = co_await s->batch_cycle(timeout); + [[fallthrough]]; + case write_result::ok: + wrote_entry = true; + break; + case write_result::must_sync: + s = co_await with_timeout(timeout, s->sync()); + continue; + case write_result::no_space: + co_await s->close(); + s = co_await get_segment(); + continue; + case write_result::too_large: + break; + } + + break; + + } + + if (wrote_entry) { + clogger.debug("oversized wrote sub-entry {}, {} bytes", i, data_size); + continue; + } + + auto align = s->_alignment; + auto sector_size = align - detail::sector_overhead_size; + + auto buffer = acquire_buffer(data_size, align); + { + base_ostream_type buffer_ostream = frag_ostream_type(detail::sector_split_iterator(buffer.begin(), buffer.end(), align, 0), buffer.size_bytes()); + writer.write(*s, buffer_ostream, i); + } + auto strm = buffer.get_istream(); + size_t off = 0; + auto id = ++_frag_id_counter; + sseg_ptr seg_ptr = nullptr; + while (off < data_size && !failed) { + // do this each lap, since we might fill a segment up. + if (!s->is_still_allocating()) { + co_await s->close(); + s = co_await get_segment(); + } + // bytes not counting overhead + auto buf_rem = std::min(max_size - s->position(), s->_buffer_ostream.size()); + + size_t avail; + if (buf_rem > align) { + auto rem2 = buf_rem - (1 + buf_rem/sector_size) * detail::sector_overhead_size; + avail = std::min(rem2, max_mutation_size) + - segment::entry_overhead_size + - segment::fragmented_entry_overhead_size + ; + assert(avail < buf_rem); + } else { + co_await s->cycle(); + auto pos = s->position(); + auto max = std::max(pos, max_file_size); + auto file_rem = max - pos; + + if (file_rem < align) { + co_await s->close(); + continue; + } + + auto rem2 = file_rem - (1 + file_rem/sector_size) * detail::sector_overhead_size; + avail = std::min(rem2, max_mutation_size) + - segment::entry_overhead_size + - segment::fragmented_entry_overhead_size + - (pos == 0 ? segment::descriptor_header_size : 0) + - segment::segment_overhead_size + ; + } + if (!seg_ptr) { + seg_ptr = s; + } + + auto max_write = data_size - off; + auto to_write = std::min(avail, max_write); + auto rem = max_write - to_write; + partial_writer pw(writer, i, to_write, strm.read_view(to_write), rem, off, id); + + switch (s->allocate(pw, fake_permit, timeout)) { + case write_result::ok_need_batch_sync: + s = co_await s->batch_cycle(timeout); + [[fallthrough]]; + case write_result::ok: + break; + case write_result::must_sync: + s = co_await with_timeout(timeout, s->sync()); + continue; + case write_result::no_space: + [[fallthrough]]; + case write_result::too_large: + assert(0); // should not reach + failed = true; + break; + } + + if (!failed) { + clogger.debug("oversized wrote sub-entry {} fragment, id={} off={}, size={}, {} of {} bytes", i, id, off, to_write, off + to_write, data_size); + + off += to_write; + if (s != seg_ptr) { + // make first segment keep track of dependent data in + // latter segments. Note, if we fail, all primary + // rp_handles will expire, and we will free + // the extended data as well. See release_cf_count. + // Note also: In "normal" usage, we will not release + // the references to other segments until the _whole_ + // segment is clean. This is intentional, because if + // something survives, the segment survives and might + // be replayed, in which case we want to be able to + // reconstruct a fragmented entry, if for no other + // reason to be able to clear its state (see replay_state). + seg_ptr->_extended_segments.emplace(pw._rp, std::move(pw._h)); + } + } + } + } + } catch (...) { + e = std::current_exception(); + failed = true; + } + // ensure all segments we used are fully flushed. + // both to be able to undo, but also to restore all + // byte usage counts. + for (auto [s, fp] : maybe_clear) { + co_await s->sync(); } - for (;;) { - using write_result = segment::write_result; - - switch (s->allocate(writer, permit, timeout)) { - case write_result::ok: - co_return writer.result(); - case write_result::must_sync: - s = co_await with_timeout(timeout, s->sync()); - continue; - case write_result::no_space: - s = co_await s->finish_and_get_new(timeout); - continue; - case write_result::ok_need_batch_sync: - s = co_await s->batch_cycle(timeout); - co_return writer.result(); + if (failed) { + clogger.debug("Oversized allocation failed. Rolling back..."); + // reset file positions. + for (auto [s, fp] : maybe_clear) { + s->reset_file_position(fp); + if (fp == 0) { + s->mark_clean(); + _segments.erase(std::remove(_segments.begin(), _segments.end(), s), _segments.end()); + } } } + assert(_request_controller.available_units() == 0); + + permit.return_all(); + assert(_request_controller.available_units() == ssize_t(max_request_controller_units())); + + if (!failed) { + clogger.trace("Oversized allocation succeeded."); + co_return; + } + if (e) { + std::rethrow_exception(e); + } + throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for the maximum size of {}", size, max_mutation_size)); } const size_t db::commitlog::segment::default_size; @@ -1498,7 +1889,7 @@ db::commitlog::segment_manager::segment_manager(config c) return cfg; }()) , max_size(std::min(std::numeric_limits::max() / (1024 * 1024), std::max(cfg.commitlog_segment_size_in_mb, 1)) * 1024 * 1024) - , max_mutation_size(max_size >> 1) + , max_mutation_size(max_size >> 1) // note: can't up this by much, because we don't know the CRC sector overhead addition before we've actually opened each segment. , max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024) // our threshold for trying to force a flush. needs heristics, for now max - segment_size/2. , disk_usage_threshold([&] { @@ -1558,6 +1949,7 @@ future<> db::commitlog::segment_manager::replenish_reserve() { } continue; } catch (shutdown_marker&) { + _reserve_segments.abort(std::current_exception()); break; } catch (...) { clogger.warn("Exception in segment reservation: {}", std::current_exception()); @@ -2073,9 +2465,13 @@ future db::commitlog::segment_manager: if (!_segment_allocating) { auto f = new_segment(); // must check that we are not already done. - if (f.available()) { - f.get(); // maybe force exception - continue; + try { + if (f.available()) { + f.get(); // maybe force exception + continue; + } + } catch (shutdown_marker&) { + continue; // force new exception } _segment_allocating.emplace(f.discard_result().finally([this] { // clear the shared_future _before_ resolving its contents @@ -2675,6 +3071,7 @@ db::commitlog::add_entries(std::vector entry_writers, db class cl_entries_writer final : public entry_writer { std::vector _writers; std::unordered_set _known; + const segment* _sizes_computed = nullptr; public: std::vector res; @@ -2699,10 +3096,15 @@ db::commitlog::add_entries(std::vector entry_writers, db i->set_with_schema(!known); res += i->size(); } + _sizes_computed = &seg; return res; } size_t size(segment& seg, size_t i) override { - return _writers.at(i).size(); // we have already set schema known/unknown + auto& w = _writers.at(i); + if (_sizes_computed != &seg) { + w.set_with_schema(seg.is_schema_version_known(w.schema())); + } + return w.size(); } size_t size() const override { return std::accumulate(_writers.begin(), _writers.end(), size_t(0), [](size_t acc, const commitlog_entry_writer& w) { @@ -2842,10 +3244,36 @@ const char* db::commitlog::segment_truncation::what() const noexcept { return _msg.c_str(); } +class db::commitlog::replay_state::impl { +public: + struct entry_fragment { + size_t offset; + size_t rem; + size_t end; + buffer_and_replay_position rpbuf; + }; + + // mapping fragment ID -> state (i.e. data collected so far) + std::unordered_map> + fragment_state; +}; + +db::commitlog::replay_state::replay_state() + : _impl(std::make_unique()) +{} + +db::commitlog::replay_state::~replay_state() +{} + +future<> +db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) { + co_await read_log_file(replay_state{}, std::move(filename), std::move(pfx), std::move(next), off, exts); +} + // No commit_io_check needed in the log reader since the database will fail // on error at startup if required future<> -db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) { +db::commitlog::read_log_file(const replay_state& state, sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) { struct work { private: file_input_stream_options make_file_input_stream_options() { @@ -2859,6 +3287,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f descriptor d; commit_load_reader_func func; input_stream fin; + replay_state::impl& state; input_stream r; uint64_t id = 0; size_t pos = 0; @@ -2873,8 +3302,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f fragmented_temporary_buffer::reader frag_reader; fragmented_temporary_buffer buffer, initial; - work(file f, descriptor din, commit_load_reader_func fn, position_type o = 0) - : f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), start_off(o) { + work(file f, descriptor din, commit_load_reader_func fn, replay_state::impl& sn, position_type o = 0) + : f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), state(sn), start_off(o) { } work(work&&) = default; @@ -3181,7 +3610,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f // check for multi-entry if (size == segment::multi_entry_size_magic) { auto actual_size = checksum; - auto end = pos + actual_size - entry_header_size - sizeof(uint32_t); + auto end = next_pos(actual_size - entry_header_size - sizeof(uint32_t)); SCYLLA_ASSERT(end <= next); // really small read... @@ -3211,6 +3640,84 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f } } + co_return; + } else if (size == segment::fragmented_entry_size_magic) { + auto actual_size = checksum; + auto end = next_pos(actual_size - entry_header_size); + + clogger.debug("read_entry (fragmented) size = {}", actual_size); + + assert(end <= next); + + assert(end <= next); + // really small read... + buf = co_await read_data(segment::fragmented_entry_overhead_size); + in = buf.get_istream(); + + auto id = read(in); + auto off = read(in); + auto rem = read(in); + checksum = read(in); + + crc.process(actual_size); + crc.process(id); + crc.process(off); + crc.process(rem); + + if (crc.checksum() != checksum) { + auto slack = next - pos; + if (size != 0) { + clogger.debug("Fractured segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack); + corrupt_size += slack; + } + co_await skip_to_chunk(next); + co_return; + } + + buf = co_await read_data(actual_size - entry_header_size - segment::fragmented_entry_overhead_size); + + using entry_fragment = db::commitlog::replay_state::impl::entry_fragment; + entry_fragment frag; + frag.offset = off; + frag.rem = rem; + frag.end = off + buf.size_bytes(); + frag.rpbuf = buffer_and_replay_position{std::move(buf), rp}; + + clogger.debug("fragment id={} off={}, end={}, rem={} ", id, off, frag.end, rem); + + auto& frag_states = state.fragment_state[id]; + + auto join = [](entry_fragment& f1, entry_fragment& f2) { + auto size1 = f1.rpbuf.buffer.size_bytes(); + auto size2 = f2.rpbuf.buffer.size_bytes(); + auto data1 = std::move(f1.rpbuf.buffer).release(); + auto data2 = std::move(f2.rpbuf.buffer).release(); + for (auto&& bb : data2) { + data1.emplace_back(std::move(bb)); + } + f1.rem = f2.rem; + f1.end = f2.end; + f1.rpbuf.buffer = fragmented_temporary_buffer(std::move(data1), size1+size2); + }; + + frag_states.emplace_back(std::move(frag)); + + std::ranges::sort(frag_states, std::less(), std::mem_fn(&entry_fragment::offset)); + + while (frag_states.size() > 1) { + auto& f1 = frag_states[frag_states.size() - 2]; + auto& f2 = frag_states.back(); + if (f1.end != f2.offset) { + break; + } + join(f1, f2); + frag_states.pop_back(); + } + if (frag_states.size() == 1 && frag_states.front().rem == 0 && frag_states.front().offset == 0) { + co_await func(std::move(frag_states.front().rpbuf)); + state.fragment_state.erase(id); + } + co_return; } @@ -3276,7 +3783,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f f = make_checked_file(commit_error_handler, std::move(f)); descriptor d(filename, pfx); - work w(std::move(f), d, std::move(next), off); + work w(std::move(f), d, std::move(next), *state._impl, off); co_await w.read_file(); } @@ -3412,7 +3919,7 @@ db::rp_handle& db::rp_handle::operator=(rp_handle&& v) noexcept { db::rp_handle::~rp_handle() { if (_rp != replay_position() && _h) { - _h->release_cf_count(_cf); + _h->release_cf_count(_cf, _rp); } } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 58348fb58e..ce7cb6fbdb 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -111,6 +111,7 @@ public: bool use_o_dsync = false; bool warn_about_segments_left_on_disk_after_shutdown = true; bool allow_going_over_size_limit = true; + bool allow_fragmented_entries = false; // The base segment ID to use. // The segment IDs of newly allocated segments will be issued sequentially @@ -136,7 +137,8 @@ public: static inline constexpr uint32_t segment_version_1 = 1u; static inline constexpr uint32_t segment_version_2 = 2u; static inline constexpr uint32_t segment_version_3 = 3u; - static inline constexpr uint32_t current_version = segment_version_3; + static inline constexpr uint32_t segment_version_4 = 4u; + static inline constexpr uint32_t current_version = segment_version_4; descriptor(descriptor&&) noexcept = default; descriptor(const descriptor&) = default; @@ -378,7 +380,7 @@ public: // (Re-)set data mix lifetime. void update_max_data_lifetime(std::optional commitlog_data_max_lifetime_in_seconds); - typedef std::function(buffer_and_replay_position)> commit_load_reader_func; + using commit_load_reader_func = std::function(buffer_and_replay_position)>; class segment_error : public std::exception {}; @@ -424,7 +426,18 @@ public: const char* what() const noexcept override; }; + class replay_state { + public: + replay_state(); + ~replay_state(); + private: + friend class commitlog; + class impl; + std::unique_ptr _impl; + }; + static future<> read_log_file(sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr); + static future<> read_log_file(const replay_state&, sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr); private: commitlog(config); diff --git a/db/commitlog/commitlog_entry.hh b/db/commitlog/commitlog_entry.hh index 7943457ede..58e61ef760 100644 --- a/db/commitlog/commitlog_entry.hh +++ b/db/commitlog/commitlog_entry.hh @@ -34,6 +34,7 @@ namespace detail { public: sector_split_iterator(const sector_split_iterator&) noexcept; sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size); + sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size, size_t overhead); sector_split_iterator(); char* get_write() const { @@ -101,8 +102,9 @@ public: {} void set_with_schema(bool value) { - _with_schema = value; - compute_size(); + if (std::exchange(_with_schema, value) != value || _size == std::numeric_limits::max()) { + compute_size(); + } } bool with_schema() const { return _with_schema; diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 0a6344198c..4f12a56e38 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -1822,3 +1822,187 @@ SEASTAR_TEST_CASE(test_commitlog_update_max_data_lifetime) { co_await log.shutdown(); co_await log.clear(); } + +/** + * Test allocating oversized multi-entry +*/ +static future<> do_test_oversized_entry(size_t max_size_mb) { + commitlog::config cfg; + + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 8 * max_size_mb * smp::count; + cfg.allow_going_over_size_limit = false; + cfg.allow_fragmented_entries = true; + cfg.use_o_dsync = false; + + // not using cl_test, because we need to be able to abandon + // the log. + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + std::unordered_map rp2mut; + random_mutation_generator gen(random_mutation_generator::generate_counters(false)); + + { + auto log = co_await commitlog::create_commitlog(cfg); + auto size = log.max_record_size() * 2; + + std::vector writers; + std::vector mutations; + + size_t tot = 0; + // generate a bunch of mutation until we have more data than allowed. + while (tot <= size) { + mutations.emplace_back(gen(1).front()); + tot += mutations.back().representation().size(); + } + for (auto& fm : mutations) { + writers.emplace_back(gen.schema(), fm, commitlog::force_sync::no); + } + + // this will create an oversized entry set. + auto res = co_await log.add_entries(writers, db::timeout_clock::now() + 60s); + + auto i = mutations.begin(); + for (auto& h : res) { + rp2mut.emplace(h.release(), *i++); + } + co_await log.sync_all_segments(); + // as if we crashed -> segment left on disk + co_await log.release(); + co_await log.shutdown(); + } + + // new log, for replay. + auto log = co_await commitlog::create_commitlog(cfg); + std::exception_ptr e; + size_t n = 0; + + auto replay_set = co_await log.get_segments_to_replay(); + // Now replay the old commitlog and ensure we match all data. + + commitlog::replay_state state; + for (auto& f : replay_set) { + try { + co_await commitlog::read_log_file(state, f, cfg.fname_prefix, [&](commitlog::buffer_and_replay_position buf_rp) -> future<> { + auto&& buf = buf_rp.buffer; + auto&& rp = buf_rp.position; + + BOOST_CHECK(rp2mut.count(rp)); + commitlog_entry_reader cer(buf); + auto& fm = cer.mutation(); + auto m1 = fm.unfreeze(gen.schema()); + auto m2 = rp2mut.at(rp).unfreeze(gen.schema()); + + BOOST_CHECK_EQUAL(m1, m2); + ++n; + co_return; + }); + } catch (commitlog::segment_truncation&) { + e = std::current_exception(); + } + } + + BOOST_CHECK_EQUAL(n, rp2mut.size()); + + co_await log.shutdown(); + co_await log.clear(); + + if (n != rp2mut.size() && e) { + std::rethrow_exception(e); + } +} + +SEASTAR_TEST_CASE(test_oversized_entry_small) { + co_await do_test_oversized_entry(1); // small segments +} + +SEASTAR_TEST_CASE(test_oversized_entry_normal) { + co_await do_test_oversized_entry(32); // normal segments +} + +SEASTAR_TEST_CASE(test_oversized_entry_large) { + co_await do_test_oversized_entry(32*3); // bigger segments +} + +SEASTAR_TEST_CASE(test_oversized_single_entry) { + commitlog::config cfg; + + constexpr auto max_size_mb = 1; + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 8 * max_size_mb * smp::count; + cfg.allow_going_over_size_limit = false; + cfg.allow_fragmented_entries = true; + cfg.use_o_dsync = false; + + // not using cl_test, because we need to be able to abandon + // the log. + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + + fragmented_temporary_buffer buf; + auto uuid = make_table_id(); + replay_position rp; + { + auto log = co_await commitlog::create_commitlog(cfg); + auto size = log.max_record_size() * 2; + + buf = fragmented_temporary_buffer::allocate_to_fit(size); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist; + + auto out = buf.get_ostream(); + for (size_t i = 0; i < size; ++i) { + auto c = dist(gen); + out.write(&c, 1); + } + + auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + for (auto& tmp : buf) { + dst.write(tmp.get(), tmp.size()); + } + }); + rp = h.release(); // no freeing for you + + co_await log.sync_all_segments(); + // as if we crashed -> segment left on disk + co_await log.release(); + co_await log.shutdown(); + } + + // new log, for replay. + auto log = co_await commitlog::create_commitlog(cfg); + auto replay_set = co_await log.get_segments_to_replay(); + bool found = false; + std::exception_ptr e; + commitlog::replay_state state; + + // Now replay the old commitlog and ensure we match all data. + for (auto& f : replay_set) { + try { + co_await commitlog::read_log_file(state, f, cfg.fname_prefix, [&](commitlog::buffer_and_replay_position buf_rp) -> future<> { + auto&& buf_in = buf_rp.buffer; + auto&& rp_in = buf_rp.position; + + BOOST_CHECK_EQUAL(rp, rp_in); + BOOST_CHECK_EQUAL(buf.size_bytes(), buf_in.size_bytes()); + fragmented_temporary_buffer::view v1(buf); + fragmented_temporary_buffer::view v2(buf_in); + BOOST_CHECK_EQUAL(v1.size_bytes(), v2.size_bytes()); + found = true; + co_return; + }); + } catch (commitlog::segment_truncation&) { + e = std::current_exception(); + } + } + + BOOST_CHECK(found); + + co_await log.shutdown(); + co_await log.clear(); + + if (!found && e) { + std::rethrow_exception(e); + } +}