logstor: tablet split

Implement tablet split for logstor.

Flush the separator and then perform the split as a new type of
compaction: take a batch of segments from the source compaction group,
read them, and write all live records into left/right write buffers
according to the split classifier, flush the buffers back into the
source compaction group, and free the old segments. Segments that fit
entirely within a single target compaction group are instead removed
from the source and added directly to the correct target group.
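
In sketch form, the per-record routing the split performs (simplified from the
implementation below; the classifier maps a token to the index of the child
group, and bufs holds the left/right compaction buffers):

    // Route each live record to the left (0) or right (1) write buffer
    // according to its token, then rewrite it there and update the index.
    auto& cb = bufs[classifier(record.key.dk.token())];
    co_await cb.rewrite_record(index, read_location, std::move(record),
                               records_rewritten, records_skipped);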
Michael Litvak
2026-03-19 15:32:35 +01:00
parent 5de39afc24
commit 9fd6dace72
4 changed files with 204 additions and 84 deletions

View File

@@ -12,6 +12,11 @@
#include "write_buffer.hh"
#include "utils/log_heap.hh"
#include <seastar/coroutine/maybe_yield.hh>
#include "mutation_writer/token_group_based_splitting_writer.hh"
namespace replica {
class table;
} // namespace replica
namespace replica::logstor {
@@ -218,6 +223,8 @@ public:
virtual future<compaction_reenabler> disable_compaction(replica::compaction_group&) = 0;
virtual compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) = 0;
virtual future<> split_compaction(replica::table&, replica::compaction_group&, mutation_writer::classify_by_token_group) = 0;
};
}
} // namespace replica::logstor
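
The split relies on the compaction_reenabler guard declared above. A minimal
usage sketch, assuming cm is the compaction-manager interface extended here
and that the guard re-enables compaction on destruction (an assumption based
on its name and on the call site in split_compaction below); do_exclusive_work
is a hypothetical stand-in:

    // Hold the guard for the duration of work that must be mutually
    // exclusive with compaction on this group.
    auto guard = co_await cm.disable_compaction(cg);  // waits for any running compaction
    co_await do_exclusive_work(cg);                   // hypothetical stand-in
    // when `guard` goes out of scope, compaction on `cg` is re-enabled
    // (assumption based on the compaction_reenabler name)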

View File

@@ -415,12 +415,13 @@ public:
separator_buffer allocate_separator_buffer() override;
future<> flush_separator_buffer(separator_buffer buf, compaction_group&) override;
private:
void submit(compaction_group&) override;
future<> stop_ongoing_compactions(compaction_group&) override;
future<compaction_reenabler> disable_compaction(replica::compaction_group&) override;
compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) override;
future<> split_compaction(replica::table&, compaction_group&, mutation_writer::classify_by_token_group) override;
private:
std::vector<log_segment_id> select_segments_for_compaction(const segment_descriptor_hist&);
future<> do_compact(compaction_group&, abort_source&);
@@ -770,6 +771,7 @@ private:
}
friend class compaction_manager_impl;
friend struct compaction_buffer;
};
segment_manager_impl::segment_manager_impl(segment_manager_config config)
@@ -791,8 +793,9 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config)
_free_segments.reserve(_max_segments);
// pre-allocate write buffers for compaction
// currently there is only a single compaction running at a time
size_t compaction_buffer_count = 1;
// at most a single compaction/split runs at a time,
// and split uses at most two buffers at a time.
size_t compaction_buffer_count = 2;
_available_compaction_buffers.reserve(compaction_buffer_count);
_compaction_buffer_pool.reserve(compaction_buffer_count);
for (size_t i = 0; i < compaction_buffer_count; ++i) {
@@ -1427,54 +1430,96 @@ future<> compaction_manager_impl::do_compact(compaction_group& cg, abort_source&
});
}
// A single buffer used by compaction for rewriting records into new segments in a single compaction group.
// `rewrite_record` appends a record to the buffer given its current location, and
// when the buffer is flushed it updates the index with the new location.
// The buffer is flushed when the next record doesn't fit, and on close().
struct compaction_buffer {
segment_manager_impl& sm;
write_buffer* buf = nullptr;
compaction_group& cg;
std::vector<future<>> pending_updates;
size_t flush_count{0};
explicit compaction_buffer(segment_manager_impl& sm, compaction_group& cg)
: sm(sm), cg(cg)
{
if (sm._available_compaction_buffers.empty()) {
throw std::runtime_error("No available compaction buffers");
}
buf = sm._available_compaction_buffers.back();
sm._available_compaction_buffers.pop_back();
}
compaction_buffer(compaction_buffer&& o) noexcept
: sm(o.sm), buf(std::exchange(o.buf, nullptr)), cg(o.cg)
, pending_updates(std::move(o.pending_updates)), flush_count(o.flush_count) {}
~compaction_buffer() {
if (buf) {
(void)buf->close().then([sm = &this->sm, buf = this->buf] {
buf->reset();
sm->_available_compaction_buffers.push_back(buf);
});
}
}
future<> flush() {
if (buf->has_data()) {
flush_count++;
co_await sm.write_full_segment(*buf, cg, write_source::compaction);
logstor_logger.trace("Compaction buffer flushed with {} bytes", buf->get_net_data_size());
}
co_await when_all_succeed(pending_updates.begin(), pending_updates.end());
co_await buf->close();
buf->reset();
pending_updates.clear();
}
future<> close() {
co_await flush();
sm._available_compaction_buffers.push_back(buf);
buf = nullptr;
}
// Rewrite a single live record into this buffer, updating the index atomically.
// Returns immediately after queuing the write; caller must co_await close()/flush()
// to ensure all pending updates complete.
future<> rewrite_record(primary_index& index, log_location read_location, log_record record,
size_t& records_rewritten, size_t& records_skipped) {
if (!index.is_record_alive(record.key, read_location)) {
records_skipped++;
co_return;
}
auto key = record.key;
log_record_writer writer(std::move(record));
if (!buf->can_fit(writer)) {
co_await flush();
}
auto write_and_update_index = buf->write(std::move(writer)).then_unpack(
[this, &index, key = std::move(key), read_location, &records_rewritten, &records_skipped]
(log_location new_location, seastar::gate::holder op) {
if (index.update_record_location(key, read_location, new_location)) {
sm.free_record(read_location);
records_rewritten++;
} else {
// another write updated this key
sm.free_record(new_location);
records_skipped++;
}
});
pending_updates.push_back(std::move(write_and_update_index));
}
};
future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::vector<log_segment_id> segments) {
logstor_logger.trace("Starting compaction of segments {} in compaction group {}:{}", segments, cg.schema()->id(), cg.group_id());
struct compaction_buffer {
segment_manager_impl& sm;
write_buffer* buf = nullptr;
compaction_group& cg;
std::vector<future<>> pending_updates;
size_t flush_count{0};
explicit compaction_buffer(segment_manager_impl& sm, compaction_group& cg)
: sm(sm), cg(cg)
{
if (sm._available_compaction_buffers.empty()) {
throw std::runtime_error("No available compaction buffers");
}
buf = sm._available_compaction_buffers.back();
sm._available_compaction_buffers.pop_back();
}
~compaction_buffer() {
if (buf) {
(void)buf->close().then([sm = &this->sm, buf = this->buf] {
buf->reset();
sm->_available_compaction_buffers.push_back(buf);
});
}
}
future<> flush() {
if (buf->has_data()) {
flush_count++;
co_await sm.write_full_segment(*buf, cg, write_source::compaction);
logstor_logger.trace("Compaction buffer flushed with {} bytes", buf->get_net_data_size());
}
co_await when_all_succeed(pending_updates.begin(), pending_updates.end());
co_await buf->close();
buf->reset();
pending_updates.clear();
}
future<> close() {
co_await flush();
sm._available_compaction_buffers.push_back(buf);
buf = nullptr;
}
};
compaction_buffer cb(_sm, cg);
size_t records_rewritten = 0;
@@ -1483,38 +1528,8 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve
auto& index = cg.get_logstor_index();
co_await _sm.for_each_record(segments,
[this, &index, &records_rewritten, &records_skipped, &cb]
(log_location read_location, log_record record) -> future<> {
if (!index.is_record_alive(record.key, read_location)) {
records_skipped++;
_stats.compaction_records_skipped++;
co_return;
}
auto key = record.key;
log_record_writer writer(std::move(record));
if (!cb.buf->can_fit(writer)) {
co_await cb.flush();
}
// write the record and then update the index with the new location
auto write_and_update_index = cb.buf->write(std::move(writer)).then_unpack(
[this, &index, key = std::move(key), read_location, &records_rewritten, &records_skipped]
(log_location new_location, seastar::gate::holder op) {
if (index.update_record_location(key, read_location, new_location)) {
_sm.free_record(read_location);
records_rewritten++;
} else {
// another write updated this key
_sm.free_record(new_location);
records_skipped++;
}
});
cb.pending_updates.push_back(std::move(write_and_update_index));
[&index, &records_rewritten, &records_skipped, &cb] (log_location read_location, log_record record) -> future<> {
co_await cb.rewrite_record(index, read_location, std::move(record), records_rewritten, records_skipped);
});
co_await cb.close();
@@ -1523,13 +1538,14 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve
records_rewritten, records_skipped, segments.size(), cb.flush_count);
// wait for read operations that use the old locations
co_await cg.get_logstor_index().await_pending_reads();
co_await index.await_pending_reads();
// Free the compacted segments
auto& ss = cg.logstor_segments();
for (auto seg_id : segments) {
logstor_logger.trace("Free segment {} by compaction", seg_id);
ss.remove_segment(_sm.get_segment_descriptor(seg_id));
auto& desc = _sm.get_segment_descriptor(seg_id);
ss.remove_segment(desc);
_sm.free_segment(seg_id);
}
@@ -1547,6 +1563,96 @@ void compaction_manager_impl::controller::update(size_t segment_write_count, siz
_compaction_overhead = 0.8 * _compaction_overhead + 0.2 * new_overhead;
}
future<> compaction_manager_impl::split_compaction(replica::table& t, compaction_group& src, mutation_writer::classify_by_token_group classifier) {
static constexpr size_t batch_size = 32;
// Disable compaction on src for the duration of the split.
// This waits for any running compaction on src to finish first.
auto compaction_disable_guard = co_await disable_compaction(src);
auto& src_segments = src.logstor_segments();
auto& index = t.logstor_index();
while (!src_segments._segments.empty()) {
// Collect candidate IDs without yielding to avoid iterator invalidation.
std::vector<log_segment_id> candidates;
candidates.reserve(batch_size);
for (const auto& cand_desc : src_segments._segments) {
candidates.push_back(_sm.desc_to_segment_id(cand_desc));
if (candidates.size() == batch_size) {
break;
}
}
// For each candidate, check whether it already belongs to a single group (fast path)
// or straddles the split boundary (slow path).
// Fast-path segments are moved to the correct child group immediately.
// Slow-path segments are collected into a batch for rewriting.
std::vector<log_segment_id> batch;
batch.reserve(candidates.size());
for (auto cand_seg_id : candidates) {
auto cand_hdr = co_await _sm.read_segment_header(cand_seg_id);
if (!cand_hdr || !std::holds_alternative<segment_header::full>(cand_hdr->v)) {
on_internal_error(logstor_logger, format("Invalid segment header for segment {} during split compaction", cand_seg_id));
}
auto& cand_seg_hdr = std::get<segment_header::full>(cand_hdr->v);
if (classifier(cand_seg_hdr.first_token) == classifier(cand_seg_hdr.last_token)) {
// Fast path: segment already belongs to a single group.
// Remove from src and add to the correct child group.
logstor_logger.trace("Fast path split segment {} with token range [{}, {}]", cand_seg_id, cand_seg_hdr.first_token, cand_seg_hdr.last_token);
auto& cand_desc = _sm.get_segment_descriptor(cand_seg_id);
src_segments.remove_segment(cand_desc);
if (!t.add_logstor_segment(cand_seg_id, cand_desc, cand_seg_hdr.first_token, cand_seg_hdr.last_token) || cand_desc.owner == &src_segments) {
on_internal_error(logstor_logger, format("Failed to add segment {} to table {} during split", cand_seg_id, t.schema()->id()));
}
} else {
batch.push_back(cand_seg_id);
}
}
if (batch.empty()) {
continue;
}
// Slow path: rewrite live records from straddling segments into two compaction_buffers
// (one per target group). Both buffers write back into src; the next outer loop
// iteration will fast-path the resulting single-group segments to the correct child group.
// Acquire _compaction_sem to be mutually exclusive with background compaction.
auto sem_units = co_await get_units(_compaction_sem, 1);
std::array<compaction_buffer, 2> bufs{compaction_buffer{_sm, src}, compaction_buffer{_sm, src}};
size_t records_rewritten = 0;
size_t records_skipped = 0;
co_await _sm.for_each_record(batch,
[&index, &classifier, &bufs, &records_rewritten, &records_skipped] (log_location read_location, log_record record) -> future<> {
auto& cb = bufs[classifier(record.key.dk.token())];
co_await cb.rewrite_record(index, read_location, std::move(record), records_rewritten, records_skipped);
});
for (auto& cb : bufs) {
co_await cb.close();
}
logstor_logger.debug("Split compaction: {} records rewritten, {} skipped from {} segments",
records_rewritten, records_skipped, batch.size());
// All records are safely written to new segments in src.
// Await pending reads before freeing the source segments.
co_await index.await_pending_reads();
// Remove and free the source segments of this batch.
for (auto seg_id : batch) {
logstor_logger.trace("Free segment {} by split", seg_id);
auto& desc = _sm.get_segment_descriptor(seg_id);
src_segments.remove_segment(desc);
_sm.free_segment(seg_id);
}
}
}
separator_buffer compaction_manager_impl::allocate_separator_buffer() {
if (_sm._available_separator_buffers.empty()) {
throw std::runtime_error("No available separator buffers");
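
Condensed, the per-segment routing decision implemented above (names taken
from the diff; move_to_child_group is a hypothetical shorthand for the
remove/add pair on the fast path):

    // Fast path: a segment whose first and last tokens classify to the
    // same child group is moved wholesale. Slow path: a segment that
    // straddles the split boundary has its live records rewritten.
    auto& hdr = std::get<segment_header::full>(cand_hdr->v);
    if (classifier(hdr.first_token) == classifier(hdr.last_token)) {
        move_to_child_group(cand_seg_id);
    } else {
        batch.push_back(cand_seg_id);
    }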

View File

@@ -16,6 +16,7 @@
#include <seastar/core/queue.hh>
#include <seastar/core/shared_ptr.hh>
#include "bytes_fwd.hh"
#include "mutation_writer/token_group_based_splitting_writer.hh"
#include "replica/logstor/write_buffer.hh"
#include "types.hh"
#include "utils/updateable_value.hh"
@@ -23,6 +24,7 @@
namespace replica {
class database;
class table;
namespace logstor {

View File

@@ -1086,6 +1086,10 @@ future<> compaction_group::split(compaction::compaction_type_options::split opt,
co_await cm.perform_offstrategy(*view, tablet_split_task_info);
co_await cm.perform_split_compaction(*view, opt, tablet_split_task_info);
}
if (_t.uses_logstor()) {
co_await get_logstor_compaction_manager().split_compaction(_t, *this, opt.classifier);
}
}
future<> compaction_group::discard_logstor_segments() {
@@ -1135,6 +1139,7 @@ future<> storage_group::split(compaction::compaction_type_options::split opt, ta
}
auto holder = cg->async_gate().hold();
co_await cg->flush();
co_await cg->flush_separator();
co_await cg->split(opt, tablet_split_task_info);
}
}
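
For reference, the resulting per-group ordering during a tablet split, as
wired up above; per the commit message, the separator is flushed first so
that all records live in segments the split's scan can read and rewrite:

    co_await cg->flush();            // persist memtable contents
    co_await cg->flush_separator();  // flush the separator (new in this commit)
    co_await cg->split(opt, tablet_split_task_info);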