diff --git a/replica/logstor/compaction.hh b/replica/logstor/compaction.hh index 60540d2fc0..78564863ad 100644 --- a/replica/logstor/compaction.hh +++ b/replica/logstor/compaction.hh @@ -12,6 +12,11 @@ #include "write_buffer.hh" #include "utils/log_heap.hh" #include +#include "mutation_writer/token_group_based_splitting_writer.hh" + +namespace replica { +class table; +} // namespace replica namespace replica::logstor { @@ -218,6 +223,8 @@ public: virtual future disable_compaction(replica::compaction_group&) = 0; virtual compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) = 0; + + virtual future<> split_compaction(replica::table&, replica::compaction_group&, mutation_writer::classify_by_token_group) = 0; }; -} +} // namespace replica::logstor diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index c3165572a4..e04684d31c 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -415,12 +415,13 @@ public: separator_buffer allocate_separator_buffer() override; future<> flush_separator_buffer(separator_buffer buf, compaction_group&) override; -private: - void submit(compaction_group&) override; future<> stop_ongoing_compactions(compaction_group&) override; future disable_compaction(replica::compaction_group&) override; compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) override; + future<> split_compaction(replica::table&, compaction_group&, mutation_writer::classify_by_token_group) override; + +private: std::vector select_segments_for_compaction(const segment_descriptor_hist&); future<> do_compact(compaction_group&, abort_source&); @@ -770,6 +771,7 @@ private: } friend class compaction_manager_impl; + friend struct compaction_buffer; }; segment_manager_impl::segment_manager_impl(segment_manager_config config) @@ -791,8 +793,9 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config) _free_segments.reserve(_max_segments); // 
pre-allocate write buffers for compaction - // currently there is only a single compaction running at a time - size_t compaction_buffer_count = 1; + // at most a single compaction/split running at a time + // and at most two buffers used at a time by split. + size_t compaction_buffer_count = 2; _available_compaction_buffers.reserve(compaction_buffer_count); _compaction_buffer_pool.reserve(compaction_buffer_count); for (size_t i = 0; i < compaction_buffer_count; ++i) { @@ -1427,54 +1430,96 @@ future<> compaction_manager_impl::do_compact(compaction_group& cg, abort_source& }); } +// A single buffer used by compaction for rewriting records into new segments in a single compaction group. +// `rewrite_record` appends a record to the buffer, given its current location, and +// when the buffer is flushed it updates the index with the new location. +// The buffer is flushed when the next record doesn't fit and on close(). The constructor takes a write buffer from sm._available_compaction_buffers (throwing std::runtime_error when none is left); close() or the destructor gives it back. +struct compaction_buffer { + segment_manager_impl& sm; + write_buffer* buf = nullptr; + compaction_group& cg; + std::vector> pending_updates; + size_t flush_count{0}; + + explicit compaction_buffer(segment_manager_impl& sm, compaction_group& cg) + : sm(sm), cg(cg) + { + if (sm._available_compaction_buffers.empty()) { + throw std::runtime_error("No available compaction buffers"); + } + buf = sm._available_compaction_buffers.back(); + sm._available_compaction_buffers.pop_back(); + } + + compaction_buffer(compaction_buffer&& o) noexcept + : sm(o.sm), buf(std::exchange(o.buf, nullptr)), cg(o.cg) + , pending_updates(std::move(o.pending_updates)), flush_count(o.flush_count) {} + + ~compaction_buffer() { // buf is null after close() or after being moved from; otherwise close and reset the buffer without waiting, then return it to the pool + if (buf) { + (void)buf->close().then([sm = &this->sm, buf = this->buf] { + buf->reset(); + sm->_available_compaction_buffers.push_back(buf); + }); + } + } + + future<> flush() { // writes the buffer out as a full segment if it holds data, waits for all pending index updates, then closes and resets the buffer for reuse + if (buf->has_data()) { + flush_count++; + co_await sm.write_full_segment(*buf, cg, write_source::compaction); + logstor_logger.trace("Compaction buffer flushed with {} bytes",
buf->get_net_data_size()); + } + co_await when_all_succeed(pending_updates.begin(), pending_updates.end()); + co_await buf->close(); + buf->reset(); + pending_updates.clear(); + } + + future<> close() { // flushes, then returns the write buffer to the pool; buf is null afterwards + co_await flush(); + sm._available_compaction_buffers.push_back(buf); + buf = nullptr; + } + + // Rewrite a single record into this buffer if the index still considers it alive at read_location; otherwise only records_skipped is incremented. + // May first co_await flush() when the record doesn't fit. Returns after queuing the write; caller must co_await close()/flush() + // to ensure all pending updates complete. NOTE(review): the inline lambda this replaces in compact_segments also incremented _stats.compaction_records_skipped for dead records; nothing here does; confirm the stat is still maintained. + future<> rewrite_record(primary_index& index, log_location read_location, log_record record, + size_t& records_rewritten, size_t& records_skipped) { + if (!index.is_record_alive(record.key, read_location)) { + records_skipped++; + co_return; + } + + auto key = record.key; + log_record_writer writer(std::move(record)); + + if (!buf->can_fit(writer)) { + co_await flush(); + } + + auto write_and_update_index = buf->write(std::move(writer)).then_unpack( + [this, &index, key = std::move(key), read_location, &records_rewritten, &records_skipped] + (log_location new_location, seastar::gate::holder op) { + + if (index.update_record_location(key, read_location, new_location)) { + sm.free_record(read_location); + records_rewritten++; + } else { + // another write updated this key, so the copy just written is freed and the record counts as skipped + sm.free_record(new_location); + records_skipped++; + } + }); + + pending_updates.push_back(std::move(write_and_update_index)); + } +}; + future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::vector segments) { logstor_logger.trace("Starting compaction of segments {} in compaction group {}:{}", segments, cg.schema()->id(), cg.group_id()); - struct compaction_buffer { - segment_manager_impl& sm; - write_buffer* buf = nullptr; - compaction_group& cg; - std::vector> pending_updates; - size_t flush_count{0}; - - explicit compaction_buffer(segment_manager_impl& sm, compaction_group& cg) - : sm(sm), cg(cg) - { - if (sm._available_compaction_buffers.empty()) { - throw
std::runtime_error("No available compaction buffers"); - } - buf = sm._available_compaction_buffers.back(); - sm._available_compaction_buffers.pop_back(); - } - - ~compaction_buffer() { - if (buf) { - (void)buf->close().then([sm = &this->sm, buf = this->buf] { - buf->reset(); - sm->_available_compaction_buffers.push_back(buf); - }); - } - } - - future<> flush() { - if (buf->has_data()) { - flush_count++; - co_await sm.write_full_segment(*buf, cg, write_source::compaction); - logstor_logger.trace("Compaction buffer flushed with {} bytes", buf->get_net_data_size()); - } - co_await when_all_succeed(pending_updates.begin(), pending_updates.end()); - co_await buf->close(); - buf->reset(); - pending_updates.clear(); - } - - future<> close() { - co_await flush(); - sm._available_compaction_buffers.push_back(buf); - buf = nullptr; - } - }; - compaction_buffer cb(_sm, cg); size_t records_rewritten = 0; @@ -1483,38 +1528,8 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve auto& index = cg.get_logstor_index(); co_await _sm.for_each_record(segments, - [this, &index, &records_rewritten, &records_skipped, &cb] - (log_location read_location, log_record record) -> future<> { - - if (!index.is_record_alive(record.key, read_location)) { - records_skipped++; - _stats.compaction_records_skipped++; - co_return; - } - - auto key = record.key; - log_record_writer writer(std::move(record)); - - if (!cb.buf->can_fit(writer)) { - co_await cb.flush(); - } - - // write the record and then update the index with the new location - auto write_and_update_index = cb.buf->write(std::move(writer)).then_unpack( - [this, &index, key = std::move(key), read_location, &records_rewritten, &records_skipped] - (log_location new_location, seastar::gate::holder op) { - - if (index.update_record_location(key, read_location, new_location)) { - _sm.free_record(read_location); - records_rewritten++; - } else { - // another write updated this key - _sm.free_record(new_location); - 
records_skipped++; - } - }); - - cb.pending_updates.push_back(std::move(write_and_update_index)); + [&index, &records_rewritten, &records_skipped, &cb] (log_location read_location, log_record record) -> future<> { + co_await cb.rewrite_record(index, read_location, std::move(record), records_rewritten, records_skipped); }); co_await cb.close(); @@ -1523,13 +1538,14 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve records_rewritten, records_skipped, segments.size(), cb.flush_count); // wait for read operations that use the old locations - co_await cg.get_logstor_index().await_pending_reads(); + co_await index.await_pending_reads(); // Free the compacted segments auto& ss = cg.logstor_segments(); for (auto seg_id : segments) { logstor_logger.trace("Free segment {} by compaction", seg_id); - ss.remove_segment(_sm.get_segment_descriptor(seg_id)); + auto& desc = _sm.get_segment_descriptor(seg_id); + ss.remove_segment(desc); _sm.free_segment(seg_id); } @@ -1547,6 +1563,96 @@ void compaction_manager_impl::controller::update(size_t segment_write_count, siz _compaction_overhead = 0.8 * _compaction_overhead + 0.2 * new_overhead; } +future<> compaction_manager_impl::split_compaction(replica::table& t, compaction_group& src, mutation_writer::classify_by_token_group classifier) { // Drains src's logstor segments: a segment whose first and last token classify to the same group is handed to t.add_logstor_segment(); the others have their live records rewritten into new per-group segments in src, which later iterations hand over the same way. + static constexpr size_t batch_size = 32; // upper bound on the number of src segments examined per outer-loop iteration + + // Disable compaction on src for the duration of the split. + // This waits for any running compaction on src to finish first. + auto compaction_disable_guard = co_await disable_compaction(src); + + auto& src_segments = src.logstor_segments(); + auto& index = t.logstor_index(); + + while (!src_segments._segments.empty()) { + // Collect candidate IDs without yielding to avoid iterator invalidation.
+ std::vector candidates; + candidates.reserve(batch_size); + for (const auto& cand_desc : src_segments._segments) { + candidates.push_back(_sm.desc_to_segment_id(cand_desc)); + if (candidates.size() == batch_size) { + break; + } + } + + // For each candidate, check whether it already belongs to a single group (fast path) + // or straddles the split boundary (slow path). + // Fast-path segments are moved to the correct child group immediately. + // Slow-path segments are collected into a batch for rewriting. + std::vector batch; + batch.reserve(candidates.size()); + for (auto cand_seg_id : candidates) { + auto cand_hdr = co_await _sm.read_segment_header(cand_seg_id); + if (!cand_hdr || !std::holds_alternative(cand_hdr->v)) { + on_internal_error(logstor_logger, format("Invalid segment header for segment {} during split compaction", cand_seg_id)); + } + auto& cand_seg_hdr = std::get(cand_hdr->v); + if (classifier(cand_seg_hdr.first_token) == classifier(cand_seg_hdr.last_token)) { + // Fast path: segment already belongs to a single group. + // Remove from src and add to the correct child group; it is an internal error if the add fails or if src still owns the segment afterwards. + logstor_logger.trace("Fast path split segment {} with token range [{}, {}]", cand_seg_id, cand_seg_hdr.first_token, cand_seg_hdr.last_token); + auto& cand_desc = _sm.get_segment_descriptor(cand_seg_id); + src_segments.remove_segment(cand_desc); + if (!t.add_logstor_segment(cand_seg_id, cand_desc, cand_seg_hdr.first_token, cand_seg_hdr.last_token) || cand_desc.owner == &src_segments) { + on_internal_error(logstor_logger, format("Failed to add segment {} to table {} during split", cand_seg_id, t.schema()->id())); + } + } else { + batch.push_back(cand_seg_id); + } + } + + if (batch.empty()) { + continue; + } + + // Slow path: rewrite live records from straddling segments into two compaction_buffers + // (one per target group). Both buffers write back into src; the next outer loop + // iteration will fast-path the resulting single-group segments to the correct child group.
+ + // Acquire _compaction_sem to be mutually exclusive with background compaction. + auto sem_units = co_await get_units(_compaction_sem, 1); + + std::array bufs{compaction_buffer{_sm, src}, compaction_buffer{_sm, src}}; + + size_t records_rewritten = 0; + size_t records_skipped = 0; + + co_await _sm.for_each_record(batch, + [&index, &classifier, &bufs, &records_rewritten, &records_skipped] (log_location read_location, log_record record) -> future<> { + auto& cb = bufs[classifier(record.key.dk.token())]; // NOTE(review): unchecked index into a two-element array; assumes classifier() only yields 0 or 1 during a split; confirm + co_await cb.rewrite_record(index, read_location, std::move(record), records_rewritten, records_skipped); + }); + + for (auto& cb : bufs) { + co_await cb.close(); + } + + logstor_logger.debug("Split compaction: {} records rewritten, {} skipped from {} segments", + records_rewritten, records_skipped, batch.size()); + + // All records are safely written to new segments in src. + // Await pending reads before freeing the source segments. + co_await index.await_pending_reads(); + + // Remove and free the source segments of this batch.
+ for (auto seg_id : batch) { + logstor_logger.trace("Free segment {} by split", seg_id); + auto& desc = _sm.get_segment_descriptor(seg_id); + src_segments.remove_segment(desc); + _sm.free_segment(seg_id); + } + } +} + separator_buffer compaction_manager_impl::allocate_separator_buffer() { if (_sm._available_separator_buffers.empty()) { throw std::runtime_error("No available separator buffers"); diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 2c2a62bf6f..7b35264c50 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -16,6 +16,7 @@ #include #include #include "bytes_fwd.hh" +#include "mutation_writer/token_group_based_splitting_writer.hh" #include "replica/logstor/write_buffer.hh" #include "types.hh" #include "utils/updateable_value.hh" @@ -23,6 +24,7 @@ namespace replica { class database; +class table; namespace logstor { diff --git a/replica/table.cc b/replica/table.cc index e4253d7719..0516e88f06 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1086,6 +1086,10 @@ future<> compaction_group::split(compaction::compaction_type_options::split opt, co_await cm.perform_offstrategy(*view, tablet_split_task_info); co_await cm.perform_split_compaction(*view, opt, tablet_split_task_info); } + + if (_t.uses_logstor()) { + co_await get_logstor_compaction_manager().split_compaction(_t, *this, opt.classifier); + } } future<> compaction_group::discard_logstor_segments() { @@ -1135,6 +1139,7 @@ future<> storage_group::split(compaction::compaction_type_options::split opt, ta } auto holder = cg->async_gate().hold(); co_await cg->flush(); + co_await cg->flush_separator(); co_await cg->split(opt, tablet_split_task_info); } }