commitlog: Handle oversized entries
Refs #18161 Yet another approach to dealing with large commitlog submissions. We handle oversize single mutation by adding yet another entry type: fragmented. In this case we only add a fragment (aha) of the data that needs storing into each entry, along with metadata to correlate and reconstruct the full entry on replay. Because these fragmented entries are spread over N segments, we also need to add references from the first segment in a chain to the subsequent ones. These are released once we clear the relevant cf_id count in the base. * This approach has the downside that due to how serialization etc works w.r.t. mutations, we need to create an intermediate buffer to hold the full serialized target entry. This is then incrementally written into entries of < max_mutation_size, successively requesting more segments. On replay, when encountering a fragment chain, the fragment is added to a "state", i.e. a mapping of currently processing frag chains. Once we've found all fragments and concatenated the buffers into a single fragmented one, we can issue a replay callback as usual. Note that a replay caller will need to create and provide such a state object. Old signature replay function remains for tests and such. This approach bumps the file format (docs to come). To ensure "atomicity" we both force syncronization, and should the whole op fail, we restore segment state (rewinding), thus discarding data all we wrote. v2: * Improve some bookeep, ensure we keep track of segments and flush properly, to get counter correct
This commit is contained in:
@@ -94,7 +94,7 @@ public:
|
||||
class db::cf_holder {
|
||||
public:
|
||||
virtual ~cf_holder() {};
|
||||
virtual void release_cf_count(const cf_id_type&) = 0;
|
||||
virtual void release_cf_count(const cf_id_type&, const replay_position&) = 0;
|
||||
};
|
||||
|
||||
db::commitlog::config db::commitlog::config::from_db_config(const db::config& cfg, seastar::scheduling_group sg, size_t shard_available_memory) {
|
||||
@@ -186,6 +186,7 @@ db::commitlog::descriptor::operator db::replay_position() const {
|
||||
struct db::commitlog::entry_writer {
|
||||
force_sync sync;
|
||||
size_t num_entries;
|
||||
bool fragmented = false;
|
||||
|
||||
explicit entry_writer(force_sync fs, size_t ne = 1)
|
||||
: sync(fs)
|
||||
@@ -218,6 +219,19 @@ struct db::commitlog::entry_writer {
|
||||
*/
|
||||
virtual size_t size(segment&, size_t) = 0;
|
||||
|
||||
/** return the entry fragment offset for n:th entry (only called if `fragmented` is set) */
|
||||
virtual size_t frag_offset(size_t) const {
|
||||
return 0;
|
||||
}
|
||||
/** return the entry fragment remaining for n:th entry (only called if `fragmented` is set) */
|
||||
virtual size_t frag_remaining(size_t) const {
|
||||
return 0;
|
||||
}
|
||||
/** return the entry fragment ID for n:th entry (only called if `fragmented` is set) */
|
||||
virtual uint32_t frag_sequence_id(size_t) const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* write nth entry */
|
||||
virtual void write(segment&, output&, size_t) const = 0;
|
||||
|
||||
@@ -327,6 +341,8 @@ public:
|
||||
requires std::derived_from<T, db::commitlog::entry_writer> && std::same_as<R, decltype(std::declval<T>().result())>
|
||||
future<R> allocate_when_possible(T writer, db::timeout_clock::time_point timeout);
|
||||
|
||||
future<> oversized_allocation(entry_writer&, db::timeout_clock::time_point timeout);
|
||||
|
||||
replay_position min_position();
|
||||
|
||||
template<typename T>
|
||||
@@ -422,15 +438,6 @@ public:
|
||||
return ++_ids;
|
||||
}
|
||||
|
||||
void sanity_check_size(size_t size) {
|
||||
if (size > max_mutation_size) {
|
||||
throw std::invalid_argument(
|
||||
"Mutation of " + std::to_string(size)
|
||||
+ " bytes is too large for the maximum size of "
|
||||
+ std::to_string(max_mutation_size));
|
||||
}
|
||||
}
|
||||
|
||||
future<> init();
|
||||
future<sseg_ptr> new_segment();
|
||||
future<sseg_ptr> active_segment(db::timeout_clock::time_point timeout);
|
||||
@@ -516,6 +523,7 @@ private:
|
||||
future<> _background_sync;
|
||||
seastar::gate _gate;
|
||||
uint64_t _new_counter = 0;
|
||||
uint32_t _frag_id_counter = 0;
|
||||
std::optional<size_t> _disk_write_alignment;
|
||||
future<> _pending_deletes = make_ready_future<>();
|
||||
};
|
||||
@@ -640,10 +648,14 @@ detail::sector_split_iterator::sector_split_iterator()
|
||||
{}
|
||||
|
||||
detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size)
|
||||
: sector_split_iterator(i, e, sector_size, detail::sector_overhead_size)
|
||||
{}
|
||||
|
||||
detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size, size_t overhead)
|
||||
: _iter(i)
|
||||
, _end(e)
|
||||
, _ptr(i != e ? const_cast<char*>(i->get()) : nullptr)
|
||||
, _size(i != e ? sector_size - sector_overhead_size : 0)
|
||||
, _size(i != e ? sector_size - overhead : 0)
|
||||
, _sector_size(sector_size)
|
||||
{}
|
||||
|
||||
@@ -747,6 +759,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
|
||||
size_t _buffer_ostream_size = 0;
|
||||
std::unordered_map<cf_id_type, uint64_t> _cf_dirty;
|
||||
std::unordered_map<cf_id_type, gc_clock::time_point> _cf_min_time;
|
||||
std::unordered_multimap<replay_position, rp_handle> _extended_segments;
|
||||
time_point _sync_time;
|
||||
utils::flush_queue<replay_position, std::less<replay_position>, clock_type> _pending_ops;
|
||||
|
||||
@@ -802,10 +815,12 @@ public:
|
||||
// The commit log entry overhead in bytes (int: length + int: head checksum)
|
||||
static constexpr size_t entry_overhead_size = 2 * sizeof(uint32_t);
|
||||
static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t);
|
||||
static constexpr size_t fragmented_entry_overhead_size = 4 * sizeof(uint32_t);
|
||||
static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t);
|
||||
static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t);
|
||||
static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C';
|
||||
static constexpr uint32_t multi_entry_size_magic = 0xffffffff;
|
||||
static constexpr uint32_t fragmented_entry_size_magic = 0xfffffffe;
|
||||
|
||||
// The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
|
||||
static constexpr size_t sync_marker_size = 2 * sizeof(uint32_t);
|
||||
@@ -833,6 +848,9 @@ public:
|
||||
mode = dispose_mode::Delete;
|
||||
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
|
||||
clogger.warn("Segment {} is dirty and is left on disk.", *this);
|
||||
for (auto& [rp, h] : _extended_segments) {
|
||||
h.release(); // do not clear out sequential seqments either.
|
||||
}
|
||||
}
|
||||
|
||||
_segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes();
|
||||
@@ -856,12 +874,16 @@ public:
|
||||
_known_schema_versions.clear();
|
||||
}
|
||||
|
||||
void release_cf_count(const cf_id_type& cf) override {
|
||||
void release_cf_count(const cf_id_type& cf) {
|
||||
mark_clean(cf, 1);
|
||||
if (can_delete()) {
|
||||
_segment_manager->discard_unused_segments();
|
||||
}
|
||||
}
|
||||
void release_cf_count(const cf_id_type& cf, const replay_position& rp) override {
|
||||
_extended_segments.erase(rp);
|
||||
release_cf_count(cf);
|
||||
}
|
||||
|
||||
bool must_sync() {
|
||||
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
|
||||
@@ -961,12 +983,15 @@ public:
|
||||
return make_ready_future<sseg_ptr>(shared_from_this());
|
||||
}
|
||||
future<sseg_ptr> close() {
|
||||
_closed = true;
|
||||
auto closing = !std::exchange(_closed, true);
|
||||
auto s = co_await sync();
|
||||
co_await flush();
|
||||
co_await terminate();
|
||||
_waste = _file.known_size() - file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk += _waste;
|
||||
if (closing) {
|
||||
// only update this if we are the closers.
|
||||
_waste = _file.known_size() - file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk += _waste;
|
||||
}
|
||||
co_return s;
|
||||
}
|
||||
future<sseg_ptr> do_flush(uint64_t pos) {
|
||||
@@ -1144,10 +1169,11 @@ public:
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto file_size = _file.known_size();
|
||||
auto finally = defer([&] () noexcept {
|
||||
_segment_manager->notify_memory_written(size);
|
||||
_segment_manager->totals.buffer_list_bytes -= buf.size_bytes();
|
||||
if (_file.known_size() < _file_pos) {
|
||||
if (file_size < _file_pos) {
|
||||
_segment_manager->totals.total_size_on_disk += (_file_pos - _file.known_size());
|
||||
}
|
||||
});
|
||||
@@ -1244,8 +1270,16 @@ public:
|
||||
must_sync,
|
||||
no_space,
|
||||
ok_need_batch_sync,
|
||||
too_large,
|
||||
};
|
||||
|
||||
size_t writer_size(entry_writer& writer, size_t size) const {
|
||||
return size + writer.num_entries * entry_overhead_size
|
||||
+ (writer.num_entries > 1 ? multi_entry_overhead_size : 0u)
|
||||
+ (writer.fragmented ? writer.num_entries * fragmented_entry_overhead_size : 0u)
|
||||
; // total size
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a "mutation" to the segment.
|
||||
* Should only be called from "allocate_when_possible". "this" must be secure in a shared_ptr that will not
|
||||
@@ -1257,9 +1291,11 @@ public:
|
||||
}
|
||||
|
||||
const auto size = writer.size(*this);
|
||||
const auto s = size + writer.num_entries * entry_overhead_size + (writer.num_entries > 1 ? multi_entry_overhead_size : 0u); // total size
|
||||
const auto s = writer_size(writer, size); // total size
|
||||
|
||||
_segment_manager->sanity_check_size(s);
|
||||
if (s > _segment_manager->max_mutation_size) {
|
||||
return write_result::too_large;
|
||||
}
|
||||
|
||||
if (!is_still_allocating() || next_position(s) > _segment_manager->max_size) { // would we make the file too big?
|
||||
return write_result::no_space;
|
||||
@@ -1316,8 +1352,26 @@ public:
|
||||
|
||||
crc32_nbo crc;
|
||||
|
||||
write<uint32_t>(out, es);
|
||||
crc.process(uint32_t(es));
|
||||
if (writer.fragmented) {
|
||||
auto off = uint32_t(writer.frag_offset(entry));
|
||||
auto rem = uint32_t(writer.frag_remaining(entry));
|
||||
auto id = writer.frag_sequence_id(entry);
|
||||
es += fragmented_entry_overhead_size;
|
||||
write<uint32_t>(out, fragmented_entry_size_magic);
|
||||
write<uint32_t>(out, es);
|
||||
write<uint32_t>(out, id);
|
||||
write<uint32_t>(out, off);
|
||||
write<uint32_t>(out, rem);
|
||||
crc.process(uint32_t(fragmented_entry_size_magic));
|
||||
crc.process(uint32_t(es));
|
||||
crc.process(uint32_t(id));
|
||||
crc.process(uint32_t(off));
|
||||
crc.process(uint32_t(rem));
|
||||
} else {
|
||||
write<uint32_t>(out, es);
|
||||
crc.process(uint32_t(es));
|
||||
}
|
||||
|
||||
write<uint32_t>(out, crc.checksum());
|
||||
|
||||
// actual data
|
||||
@@ -1354,6 +1408,11 @@ public:
|
||||
position_type position() const {
|
||||
return position_type(_file_pos + buffer_position());
|
||||
}
|
||||
position_type available() const {
|
||||
auto pos = position();
|
||||
auto lim = _segment_manager->cfg.commitlog_segment_size_in_mb*1024*1024;
|
||||
return pos < lim ? lim - pos : 0;
|
||||
}
|
||||
|
||||
position_type next_position(size_t size) const {
|
||||
auto used = _buffer_ostream_size - _buffer_ostream.size();
|
||||
@@ -1365,6 +1424,15 @@ public:
|
||||
return _file_pos;
|
||||
}
|
||||
|
||||
void reset_file_position(size_t file_pos) {
|
||||
clogger.trace("{}: set file position to {}", fmt::streamed(*this), file_pos);
|
||||
assert(_flush_pos >= file_pos);
|
||||
_file_pos = file_pos;
|
||||
_flush_pos = file_pos;
|
||||
_buffer = {};
|
||||
_closed = false;
|
||||
}
|
||||
|
||||
// ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
|
||||
// a.k.a. zero the tail.
|
||||
size_t clear_buffer_slack() {
|
||||
@@ -1435,12 +1503,86 @@ template<typename T, typename R>
|
||||
requires std::derived_from<T, db::commitlog::entry_writer> && std::same_as<R, decltype(std::declval<T>().result())>
|
||||
future<R> db::commitlog::segment_manager::allocate_when_possible(T writer, db::timeout_clock::time_point timeout) {
|
||||
auto size = writer.size();
|
||||
// If this is already too big now, we should throw early. It's also a correctness issue, since
|
||||
// if we are too big at this moment we'll never reach allocate() to actually throw at that
|
||||
// point.
|
||||
sanity_check_size(size);
|
||||
// If this is already too big now, we should fall back early. This measurement does not count
|
||||
// overhead into the estimate, i.e. it might be worse.
|
||||
if (size < max_mutation_size) {
|
||||
auto fut = get_units(_request_controller, size, timeout);
|
||||
if (_request_controller.waiters()) {
|
||||
totals.requests_blocked_memory++;
|
||||
}
|
||||
|
||||
auto fut = get_units(_request_controller, size, timeout);
|
||||
scope_increment_counter allocating(totals.active_allocations);
|
||||
|
||||
auto permit = co_await std::move(fut);
|
||||
sseg_ptr s;
|
||||
|
||||
if (!_segments.empty() && _segments.back()->is_still_allocating()) {
|
||||
s = _segments.back();
|
||||
} else {
|
||||
s = co_await active_segment(timeout);
|
||||
}
|
||||
|
||||
bool retry = true;
|
||||
|
||||
while (retry) {
|
||||
using write_result = segment::write_result;
|
||||
|
||||
switch (s->allocate(writer, permit, timeout)) {
|
||||
case write_result::ok:
|
||||
co_return writer.result();
|
||||
case write_result::must_sync:
|
||||
s = co_await with_timeout(timeout, s->sync());
|
||||
continue;
|
||||
case write_result::no_space:
|
||||
s = co_await s->finish_and_get_new(timeout);
|
||||
continue;
|
||||
case write_result::ok_need_batch_sync:
|
||||
s = co_await s->batch_cycle(timeout);
|
||||
co_return writer.result();
|
||||
case write_result::too_large:
|
||||
retry = false; // retry oversized
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!cfg.allow_fragmented_entries) {
|
||||
throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for the maximum size of {}", size, max_mutation_size));
|
||||
}
|
||||
|
||||
// really slow path, trying to fit huge thingamabobs...
|
||||
co_await oversized_allocation(writer, timeout);
|
||||
co_return writer.result();
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writer, db::timeout_clock::time_point timeout) {
|
||||
clogger.debug("Attempting oversized alloc of {} entry writer", writer.num_entries);
|
||||
|
||||
auto size = writer.size();
|
||||
auto max_file_size = cfg.commitlog_segment_size_in_mb * 1024 * 1024;
|
||||
|
||||
// check if this cannot be written at all...
|
||||
if (!cfg.allow_going_over_size_limit) {
|
||||
auto sector_size = _segments.empty() ? 512 /* worst case */ : _segments.front()->_alignment;
|
||||
auto size_with_sector_overhead = size + (1 + size/sector_size) * detail::sector_overhead_size;
|
||||
// more worst case
|
||||
auto size_with_meta_overhead = size_with_sector_overhead
|
||||
+ (1 + size_with_sector_overhead/max_mutation_size) * (segment::entry_overhead_size + segment::fragmented_entry_overhead_size + segment::segment_overhead_size)
|
||||
* (1 + size_with_sector_overhead/max_file_size) * segment::descriptor_header_size
|
||||
;
|
||||
// this is not really true. We could have some space in current segment,
|
||||
// but again, lets be conservative.
|
||||
auto max_file_size_avail = max_disk_size - max_file_size;
|
||||
|
||||
if (size_with_meta_overhead > max_file_size_avail) {
|
||||
throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for potentially available disk space of {}", size, max_file_size_avail));
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::pair<sseg_ptr, uint64_t>> maybe_clear;
|
||||
|
||||
assert(_request_controller.available_units() <= ssize_t(max_request_controller_units()));
|
||||
auto fut = get_units(_request_controller, max_request_controller_units(), timeout);
|
||||
if (_request_controller.waiters()) {
|
||||
totals.requests_blocked_memory++;
|
||||
}
|
||||
@@ -1448,31 +1590,280 @@ future<R> db::commitlog::segment_manager::allocate_when_possible(T writer, db::t
|
||||
scope_increment_counter allocating(totals.active_allocations);
|
||||
|
||||
auto permit = co_await std::move(fut);
|
||||
sseg_ptr s;
|
||||
assert(_request_controller.available_units() == 0);
|
||||
|
||||
if (!_segments.empty() && _segments.back()->is_still_allocating()) {
|
||||
s = _segments.back();
|
||||
} else {
|
||||
s = co_await active_segment(timeout);
|
||||
decltype(permit) fake_permit; // can't have allocate+sync release semaphore.
|
||||
bool failed = false;
|
||||
std::exception_ptr e;
|
||||
|
||||
try {
|
||||
for (size_t i = 0; i < writer.num_entries && !failed; ++i) {
|
||||
using frag_ostream_type = segment::frag_ostream_type;
|
||||
using base_ostream_type = segment::base_ostream_type;
|
||||
|
||||
struct partial_writer : public entry_writer {
|
||||
entry_writer& _writer;
|
||||
size_t _index;
|
||||
size_t _size;
|
||||
mutable typename buffer_type::view _buffer;
|
||||
size_t _rem;
|
||||
size_t _off;
|
||||
uint32_t _id;
|
||||
replay_position _rp;
|
||||
rp_handle _h;
|
||||
|
||||
partial_writer(entry_writer& w, size_t i, size_t size)
|
||||
: entry_writer(w.sync)
|
||||
, _writer(w)
|
||||
, _index(i)
|
||||
, _size(size)
|
||||
, _rem(0)
|
||||
, _off(0)
|
||||
, _id(0)
|
||||
{}
|
||||
partial_writer(entry_writer& w, size_t i, size_t size, typename buffer_type::view buffer, size_t rem, size_t off, uint32_t id)
|
||||
: entry_writer(w.sync)
|
||||
, _writer(w)
|
||||
, _index(i)
|
||||
, _size(size)
|
||||
, _buffer(buffer)
|
||||
, _rem(rem)
|
||||
, _off(off)
|
||||
, _id(id)
|
||||
{
|
||||
this->fragmented = true;
|
||||
}
|
||||
const cf_id_type& id(size_t) const override {
|
||||
return _writer.id(_index);
|
||||
}
|
||||
size_t size() const override {
|
||||
return _size;
|
||||
}
|
||||
size_t size(segment&) override {
|
||||
return _size;
|
||||
}
|
||||
size_t size(segment&, size_t) override {
|
||||
return _size;
|
||||
}
|
||||
void write(segment& seg, output& out, size_t) const override {
|
||||
if (_id == 0) {
|
||||
_writer.write(seg, out, _index);
|
||||
return;
|
||||
}
|
||||
while (!_buffer.empty()) {
|
||||
auto buf = _buffer.current_fragment();
|
||||
out.write(reinterpret_cast<const char*>(buf.data()), buf.size());
|
||||
_buffer.remove_current();
|
||||
}
|
||||
}
|
||||
size_t frag_remaining(size_t) const override {
|
||||
return _rem;
|
||||
}
|
||||
size_t frag_offset(size_t) const override {
|
||||
return _off;
|
||||
}
|
||||
uint32_t frag_sequence_id(size_t) const override {
|
||||
return _id;
|
||||
}
|
||||
|
||||
void result(size_t, rp_handle h) override {
|
||||
if (_off == 0) {
|
||||
_rp = h;
|
||||
_writer.result(_index, std::move(h));
|
||||
} else {
|
||||
_h = std::move(h);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto get_segment = [&]() -> future<sseg_ptr> {
|
||||
sseg_ptr s = co_await active_segment(timeout);
|
||||
if (maybe_clear.empty() || maybe_clear.back().first.get() != s.get()) {
|
||||
if (s->position() > (segment::segment_overhead_size + segment::descriptor_header_size)) {
|
||||
co_await s->sync(); // ensure file pos == restartable.
|
||||
}
|
||||
maybe_clear.emplace_back(s, s->file_position());
|
||||
}
|
||||
co_return s;
|
||||
};
|
||||
|
||||
sseg_ptr s = co_await get_segment();
|
||||
|
||||
clogger.trace("Writing entry {} of {}", i, writer.num_entries);
|
||||
|
||||
using write_result = segment::write_result;
|
||||
|
||||
size_t data_size;
|
||||
bool wrote_entry = false;
|
||||
|
||||
// if we are a multi-entry write, parts of it might be
|
||||
// small enough for "normal" write path.
|
||||
for (;;) {
|
||||
data_size = writer.size(*s, i);
|
||||
|
||||
if (s->writer_size(writer, data_size) >= max_mutation_size) {
|
||||
break;
|
||||
}
|
||||
|
||||
partial_writer pw(writer, i, data_size);
|
||||
|
||||
switch (s->allocate(pw, fake_permit, timeout)) {
|
||||
case write_result::ok_need_batch_sync:
|
||||
s = co_await s->batch_cycle(timeout);
|
||||
[[fallthrough]];
|
||||
case write_result::ok:
|
||||
wrote_entry = true;
|
||||
break;
|
||||
case write_result::must_sync:
|
||||
s = co_await with_timeout(timeout, s->sync());
|
||||
continue;
|
||||
case write_result::no_space:
|
||||
co_await s->close();
|
||||
s = co_await get_segment();
|
||||
continue;
|
||||
case write_result::too_large:
|
||||
break;
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
}
|
||||
|
||||
if (wrote_entry) {
|
||||
clogger.debug("oversized wrote sub-entry {}, {} bytes", i, data_size);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto align = s->_alignment;
|
||||
auto sector_size = align - detail::sector_overhead_size;
|
||||
|
||||
auto buffer = acquire_buffer(data_size, align);
|
||||
{
|
||||
base_ostream_type buffer_ostream = frag_ostream_type(detail::sector_split_iterator(buffer.begin(), buffer.end(), align, 0), buffer.size_bytes());
|
||||
writer.write(*s, buffer_ostream, i);
|
||||
}
|
||||
auto strm = buffer.get_istream();
|
||||
size_t off = 0;
|
||||
auto id = ++_frag_id_counter;
|
||||
sseg_ptr seg_ptr = nullptr;
|
||||
while (off < data_size && !failed) {
|
||||
// do this each lap, since we might fill a segment up.
|
||||
if (!s->is_still_allocating()) {
|
||||
co_await s->close();
|
||||
s = co_await get_segment();
|
||||
}
|
||||
// bytes not counting overhead
|
||||
auto buf_rem = std::min(max_size - s->position(), s->_buffer_ostream.size());
|
||||
|
||||
size_t avail;
|
||||
if (buf_rem > align) {
|
||||
auto rem2 = buf_rem - (1 + buf_rem/sector_size) * detail::sector_overhead_size;
|
||||
avail = std::min(rem2, max_mutation_size)
|
||||
- segment::entry_overhead_size
|
||||
- segment::fragmented_entry_overhead_size
|
||||
;
|
||||
assert(avail < buf_rem);
|
||||
} else {
|
||||
co_await s->cycle();
|
||||
auto pos = s->position();
|
||||
auto max = std::max<size_t>(pos, max_file_size);
|
||||
auto file_rem = max - pos;
|
||||
|
||||
if (file_rem < align) {
|
||||
co_await s->close();
|
||||
continue;
|
||||
}
|
||||
|
||||
auto rem2 = file_rem - (1 + file_rem/sector_size) * detail::sector_overhead_size;
|
||||
avail = std::min(rem2, max_mutation_size)
|
||||
- segment::entry_overhead_size
|
||||
- segment::fragmented_entry_overhead_size
|
||||
- (pos == 0 ? segment::descriptor_header_size : 0)
|
||||
- segment::segment_overhead_size
|
||||
;
|
||||
}
|
||||
if (!seg_ptr) {
|
||||
seg_ptr = s;
|
||||
}
|
||||
|
||||
auto max_write = data_size - off;
|
||||
auto to_write = std::min(avail, max_write);
|
||||
auto rem = max_write - to_write;
|
||||
partial_writer pw(writer, i, to_write, strm.read_view(to_write), rem, off, id);
|
||||
|
||||
switch (s->allocate(pw, fake_permit, timeout)) {
|
||||
case write_result::ok_need_batch_sync:
|
||||
s = co_await s->batch_cycle(timeout);
|
||||
[[fallthrough]];
|
||||
case write_result::ok:
|
||||
break;
|
||||
case write_result::must_sync:
|
||||
s = co_await with_timeout(timeout, s->sync());
|
||||
continue;
|
||||
case write_result::no_space:
|
||||
[[fallthrough]];
|
||||
case write_result::too_large:
|
||||
assert(0); // should not reach
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!failed) {
|
||||
clogger.debug("oversized wrote sub-entry {} fragment, id={} off={}, size={}, {} of {} bytes", i, id, off, to_write, off + to_write, data_size);
|
||||
|
||||
off += to_write;
|
||||
if (s != seg_ptr) {
|
||||
// make first segment keep track of dependent data in
|
||||
// latter segments. Note, if we fail, all primary
|
||||
// rp_handles will expire, and we will free
|
||||
// the extended data as well. See release_cf_count.
|
||||
// Note also: In "normal" usage, we will not release
|
||||
// the references to other segments until the _whole_
|
||||
// segment is clean. This is intentional, because if
|
||||
// something survives, the segment survives and might
|
||||
// be replayed, in which case we want to be able to
|
||||
// reconstruct a fragmented entry, if for no other
|
||||
// reason to be able to clear its state (see replay_state).
|
||||
seg_ptr->_extended_segments.emplace(pw._rp, std::move(pw._h));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
e = std::current_exception();
|
||||
failed = true;
|
||||
}
|
||||
// ensure all segments we used are fully flushed.
|
||||
// both to be able to undo, but also to restore all
|
||||
// byte usage counts.
|
||||
for (auto [s, fp] : maybe_clear) {
|
||||
co_await s->sync();
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
using write_result = segment::write_result;
|
||||
|
||||
switch (s->allocate(writer, permit, timeout)) {
|
||||
case write_result::ok:
|
||||
co_return writer.result();
|
||||
case write_result::must_sync:
|
||||
s = co_await with_timeout(timeout, s->sync());
|
||||
continue;
|
||||
case write_result::no_space:
|
||||
s = co_await s->finish_and_get_new(timeout);
|
||||
continue;
|
||||
case write_result::ok_need_batch_sync:
|
||||
s = co_await s->batch_cycle(timeout);
|
||||
co_return writer.result();
|
||||
if (failed) {
|
||||
clogger.debug("Oversized allocation failed. Rolling back...");
|
||||
// reset file positions.
|
||||
for (auto [s, fp] : maybe_clear) {
|
||||
s->reset_file_position(fp);
|
||||
if (fp == 0) {
|
||||
s->mark_clean();
|
||||
_segments.erase(std::remove(_segments.begin(), _segments.end(), s), _segments.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
assert(_request_controller.available_units() == 0);
|
||||
|
||||
permit.return_all();
|
||||
assert(_request_controller.available_units() == ssize_t(max_request_controller_units()));
|
||||
|
||||
if (!failed) {
|
||||
clogger.trace("Oversized allocation succeeded.");
|
||||
co_return;
|
||||
}
|
||||
if (e) {
|
||||
std::rethrow_exception(e);
|
||||
}
|
||||
throw std::invalid_argument(fmt::format("Mutation of {} bytes is too large for the maximum size of {}", size, max_mutation_size));
|
||||
}
|
||||
|
||||
const size_t db::commitlog::segment::default_size;
|
||||
@@ -1498,7 +1889,7 @@ db::commitlog::segment_manager::segment_manager(config c)
|
||||
return cfg;
|
||||
}())
|
||||
, max_size(std::min<size_t>(std::numeric_limits<position_type>::max() / (1024 * 1024), std::max<size_t>(cfg.commitlog_segment_size_in_mb, 1)) * 1024 * 1024)
|
||||
, max_mutation_size(max_size >> 1)
|
||||
, max_mutation_size(max_size >> 1) // note: can't up this by much, because we don't know the CRC sector overhead addition before we've actually opened each segment.
|
||||
, max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024)
|
||||
// our threshold for trying to force a flush. needs heristics, for now max - segment_size/2.
|
||||
, disk_usage_threshold([&] {
|
||||
@@ -1558,6 +1949,7 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
|
||||
}
|
||||
continue;
|
||||
} catch (shutdown_marker&) {
|
||||
_reserve_segments.abort(std::current_exception());
|
||||
break;
|
||||
} catch (...) {
|
||||
clogger.warn("Exception in segment reservation: {}", std::current_exception());
|
||||
@@ -2073,9 +2465,13 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
if (!_segment_allocating) {
|
||||
auto f = new_segment();
|
||||
// must check that we are not already done.
|
||||
if (f.available()) {
|
||||
f.get(); // maybe force exception
|
||||
continue;
|
||||
try {
|
||||
if (f.available()) {
|
||||
f.get(); // maybe force exception
|
||||
continue;
|
||||
}
|
||||
} catch (shutdown_marker&) {
|
||||
continue; // force new exception
|
||||
}
|
||||
_segment_allocating.emplace(f.discard_result().finally([this] {
|
||||
// clear the shared_future _before_ resolving its contents
|
||||
@@ -2675,6 +3071,7 @@ db::commitlog::add_entries(std::vector<commitlog_entry_writer> entry_writers, db
|
||||
class cl_entries_writer final : public entry_writer {
|
||||
std::vector<commitlog_entry_writer> _writers;
|
||||
std::unordered_set<table_schema_version> _known;
|
||||
const segment* _sizes_computed = nullptr;
|
||||
public:
|
||||
std::vector<rp_handle> res;
|
||||
|
||||
@@ -2699,10 +3096,15 @@ db::commitlog::add_entries(std::vector<commitlog_entry_writer> entry_writers, db
|
||||
i->set_with_schema(!known);
|
||||
res += i->size();
|
||||
}
|
||||
_sizes_computed = &seg;
|
||||
return res;
|
||||
}
|
||||
size_t size(segment& seg, size_t i) override {
|
||||
return _writers.at(i).size(); // we have already set schema known/unknown
|
||||
auto& w = _writers.at(i);
|
||||
if (_sizes_computed != &seg) {
|
||||
w.set_with_schema(seg.is_schema_version_known(w.schema()));
|
||||
}
|
||||
return w.size();
|
||||
}
|
||||
size_t size() const override {
|
||||
return std::accumulate(_writers.begin(), _writers.end(), size_t(0), [](size_t acc, const commitlog_entry_writer& w) {
|
||||
@@ -2842,10 +3244,36 @@ const char* db::commitlog::segment_truncation::what() const noexcept {
|
||||
return _msg.c_str();
|
||||
}
|
||||
|
||||
class db::commitlog::replay_state::impl {
|
||||
public:
|
||||
struct entry_fragment {
|
||||
size_t offset;
|
||||
size_t rem;
|
||||
size_t end;
|
||||
buffer_and_replay_position rpbuf;
|
||||
};
|
||||
|
||||
// mapping fragment ID -> state (i.e. data collected so far)
|
||||
std::unordered_map<uint32_t, std::vector<entry_fragment>>
|
||||
fragment_state;
|
||||
};
|
||||
|
||||
db::commitlog::replay_state::replay_state()
|
||||
: _impl(std::make_unique<impl>())
|
||||
{}
|
||||
|
||||
db::commitlog::replay_state::~replay_state()
|
||||
{}
|
||||
|
||||
future<>
|
||||
db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) {
|
||||
co_await read_log_file(replay_state{}, std::move(filename), std::move(pfx), std::move(next), off, exts);
|
||||
}
|
||||
|
||||
// No commit_io_check needed in the log reader since the database will fail
|
||||
// on error at startup if required
|
||||
future<>
|
||||
db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) {
|
||||
db::commitlog::read_log_file(const replay_state& state, sstring filename, sstring pfx, commit_load_reader_func next, position_type off, const db::extensions* exts) {
|
||||
struct work {
|
||||
private:
|
||||
file_input_stream_options make_file_input_stream_options() {
|
||||
@@ -2859,6 +3287,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
descriptor d;
|
||||
commit_load_reader_func func;
|
||||
input_stream<char> fin;
|
||||
replay_state::impl& state;
|
||||
input_stream<char> r;
|
||||
uint64_t id = 0;
|
||||
size_t pos = 0;
|
||||
@@ -2873,8 +3302,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
fragmented_temporary_buffer::reader frag_reader;
|
||||
fragmented_temporary_buffer buffer, initial;
|
||||
|
||||
work(file f, descriptor din, commit_load_reader_func fn, position_type o = 0)
|
||||
: f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), start_off(o) {
|
||||
work(file f, descriptor din, commit_load_reader_func fn, replay_state::impl& sn, position_type o = 0)
|
||||
: f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), state(sn), start_off(o) {
|
||||
}
|
||||
work(work&&) = default;
|
||||
|
||||
@@ -3181,7 +3610,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
// check for multi-entry
|
||||
if (size == segment::multi_entry_size_magic) {
|
||||
auto actual_size = checksum;
|
||||
auto end = pos + actual_size - entry_header_size - sizeof(uint32_t);
|
||||
auto end = next_pos(actual_size - entry_header_size - sizeof(uint32_t));
|
||||
|
||||
SCYLLA_ASSERT(end <= next);
|
||||
// really small read...
|
||||
@@ -3211,6 +3640,84 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
}
|
||||
}
|
||||
|
||||
co_return;
|
||||
} else if (size == segment::fragmented_entry_size_magic) {
|
||||
auto actual_size = checksum;
|
||||
auto end = next_pos(actual_size - entry_header_size);
|
||||
|
||||
clogger.debug("read_entry (fragmented) size = {}", actual_size);
|
||||
|
||||
assert(end <= next);
|
||||
|
||||
assert(end <= next);
|
||||
// really small read...
|
||||
buf = co_await read_data(segment::fragmented_entry_overhead_size);
|
||||
in = buf.get_istream();
|
||||
|
||||
auto id = read<uint32_t>(in);
|
||||
auto off = read<uint32_t>(in);
|
||||
auto rem = read<uint32_t>(in);
|
||||
checksum = read<uint32_t>(in);
|
||||
|
||||
crc.process(actual_size);
|
||||
crc.process(id);
|
||||
crc.process(off);
|
||||
crc.process(rem);
|
||||
|
||||
if (crc.checksum() != checksum) {
|
||||
auto slack = next - pos;
|
||||
if (size != 0) {
|
||||
clogger.debug("Fractured segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
|
||||
corrupt_size += slack;
|
||||
}
|
||||
co_await skip_to_chunk(next);
|
||||
co_return;
|
||||
}
|
||||
|
||||
buf = co_await read_data(actual_size - entry_header_size - segment::fragmented_entry_overhead_size);
|
||||
|
||||
using entry_fragment = db::commitlog::replay_state::impl::entry_fragment;
|
||||
entry_fragment frag;
|
||||
frag.offset = off;
|
||||
frag.rem = rem;
|
||||
frag.end = off + buf.size_bytes();
|
||||
frag.rpbuf = buffer_and_replay_position{std::move(buf), rp};
|
||||
|
||||
clogger.debug("fragment id={} off={}, end={}, rem={} ", id, off, frag.end, rem);
|
||||
|
||||
auto& frag_states = state.fragment_state[id];
|
||||
|
||||
auto join = [](entry_fragment& f1, entry_fragment& f2) {
|
||||
auto size1 = f1.rpbuf.buffer.size_bytes();
|
||||
auto size2 = f2.rpbuf.buffer.size_bytes();
|
||||
auto data1 = std::move(f1.rpbuf.buffer).release();
|
||||
auto data2 = std::move(f2.rpbuf.buffer).release();
|
||||
for (auto&& bb : data2) {
|
||||
data1.emplace_back(std::move(bb));
|
||||
}
|
||||
f1.rem = f2.rem;
|
||||
f1.end = f2.end;
|
||||
f1.rpbuf.buffer = fragmented_temporary_buffer(std::move(data1), size1+size2);
|
||||
};
|
||||
|
||||
frag_states.emplace_back(std::move(frag));
|
||||
|
||||
std::ranges::sort(frag_states, std::less(), std::mem_fn(&entry_fragment::offset));
|
||||
|
||||
while (frag_states.size() > 1) {
|
||||
auto& f1 = frag_states[frag_states.size() - 2];
|
||||
auto& f2 = frag_states.back();
|
||||
if (f1.end != f2.offset) {
|
||||
break;
|
||||
}
|
||||
join(f1, f2);
|
||||
frag_states.pop_back();
|
||||
}
|
||||
if (frag_states.size() == 1 && frag_states.front().rem == 0 && frag_states.front().offset == 0) {
|
||||
co_await func(std::move(frag_states.front().rpbuf));
|
||||
state.fragment_state.erase(id);
|
||||
}
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -3276,7 +3783,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
f = make_checked_file(commit_error_handler, std::move(f));
|
||||
|
||||
descriptor d(filename, pfx);
|
||||
work w(std::move(f), d, std::move(next), off);
|
||||
work w(std::move(f), d, std::move(next), *state._impl, off);
|
||||
|
||||
co_await w.read_file();
|
||||
}
|
||||
@@ -3412,7 +3919,7 @@ db::rp_handle& db::rp_handle::operator=(rp_handle&& v) noexcept {
|
||||
|
||||
db::rp_handle::~rp_handle() {
|
||||
if (_rp != replay_position() && _h) {
|
||||
_h->release_cf_count(_cf);
|
||||
_h->release_cf_count(_cf, _rp);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user