logstor: code cleanup

Miscellaneous code cleanup and small refactoring changes.
This commit is contained in:
Michael Litvak
2026-03-22 19:11:54 +01:00
parent 216d39883a
commit bf7bc5b410
10 changed files with 150 additions and 243 deletions

View File

@@ -882,14 +882,6 @@ public:
return make_mutation_reader(std::move(schema), std::move(permit), range, full_slice);
}
mutation_reader make_logstor_mutation_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const;
// The streaming mutation reader differs from the regular mutation reader in that:
// - Reflects all writes accepted by replica prior to creation of the
// reader and a _bounded_ amount of writes which arrive later.

View File

@@ -14,6 +14,8 @@
namespace replica::logstor {
extern seastar::logger logstor_logger;
constexpr log_heap_options segment_descriptor_hist_options(4 * 1024, 3, 128 * 1024);
struct segment_set;
@@ -67,6 +69,9 @@ struct segment_set {
size_t _segment_count{0};
void add_segment(segment_descriptor& desc) {
if (desc.owner) {
on_internal_error(logstor_logger, "add_segment called for segment that has an owner");
}
desc.owner = this;
_segments.push(desc);
++_segment_count;
@@ -77,6 +82,9 @@ struct segment_set {
}
void remove_segment(segment_descriptor& desc) {
if (desc.owner != this) {
on_internal_error(logstor_logger, "remove_segment called not from the owner");
}
_segments.erase(desc);
desc.owner = nullptr;
--_segment_count;

View File

@@ -90,6 +90,15 @@ public:
return std::nullopt;
}
// A record is "alive" iff the primary index still maps its key to exactly
// this log location (i.e. it has not been superseded or removed).
bool is_record_alive(const primary_index_key& key, log_location location) {
    const auto it = _partitions.find(key.dk, dht::ring_position_comparator(*_schema));
    return it != _partitions.end() && it->_e.location == location;
}
std::optional<index_entry> exchange(const primary_index_key& key, index_entry new_entry) {
partitions_type::bound_hint hint;
auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint);

View File

@@ -52,6 +52,22 @@ size_t logstor::get_memory_usage() const {
return _segment_manager.get_memory_usage();
}
// Accessor for the segment manager owned by this logstor instance.
segment_manager& logstor::get_segment_manager() noexcept {
return _segment_manager;
}
// Const overload of the segment manager accessor.
const segment_manager& logstor::get_segment_manager() const noexcept {
return _segment_manager;
}
// The compaction manager is owned by the segment manager; forward to it.
compaction_manager& logstor::get_compaction_manager() noexcept {
return _segment_manager.get_compaction_manager();
}
// Const overload of the compaction manager accessor.
const compaction_manager& logstor::get_compaction_manager() const noexcept {
return _segment_manager.get_compaction_manager();
}
future<> logstor::write(const mutation& m, compaction_group& cg, seastar::gate::holder cg_holder) {
primary_index_key key(m.decorated_key());
table_id table = m.schema()->id();
@@ -127,22 +143,6 @@ future<std::optional<canonical_mutation>> logstor::read(const schema& s, const p
});
}
segment_manager& logstor::get_segment_manager() noexcept {
return _segment_manager;
}
const segment_manager& logstor::get_segment_manager() const noexcept {
return _segment_manager;
}
compaction_manager& logstor::get_compaction_manager() noexcept {
return _segment_manager.get_compaction_manager();
}
const compaction_manager& logstor::get_compaction_manager() const noexcept {
return _segment_manager.get_compaction_manager();
}
mutation_reader logstor::make_reader(schema_ptr schema,
const primary_index& index,
reader_permit permit,

View File

@@ -115,8 +115,7 @@ future<log_record> segment::read(log_location loc) {
// Read the serialized record
return _file.dma_read_exactly<char>(absolute_offset(loc.offset), loc.size).then([] (seastar::temporary_buffer<char> buf) {
seastar::simple_input_stream in(buf.begin(), buf.size());
return ser::deserialize(in, std::type_identity<log_record>{});
return ser::deserialize_from_buffer(buf, std::type_identity<log_record>{});
});
}
@@ -129,7 +128,6 @@ future<> writeable_segment::stop() {
co_return;
}
co_await _write_gate.close();
_seg_ref = {};
}
log_location writeable_segment::allocate(size_t data_size) {
@@ -408,16 +406,11 @@ public:
const stats& get_stats() const noexcept { return _stats; }
future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num);
separator_buffer allocate_separator_buffer() override;
future<> flush_separator_buffer(separator_buffer buf, compaction_group&) override;
private:
bool is_record_alive(primary_index&, const primary_index_key&, log_location);
bool update_record_location(primary_index&, const primary_index_key&, log_location old_loc, log_location new_loc);
void submit(compaction_group&) override;
future<> stop_ongoing_compactions(compaction_group&) override;
@@ -462,14 +455,13 @@ enum class write_source {
separator,
};
static constexpr size_t write_source_count = 3;
// Human-readable name for a write_source value, used in stats and logging.
// Note: the span previously contained both the old long-form cases and the
// new compact ones (diff residue), yielding duplicate case labels; keep the
// single, deduplicated switch.
static sstring write_source_to_string(write_source src) {
    switch (src) {
    case write_source::normal_write: return "normal_write";
    case write_source::compaction: return "compaction";
    case write_source::separator: return "separator";
    }
    // Defensive fallback for out-of-range values; all enumerators are handled above.
    return "unknown";
}
@@ -484,11 +476,8 @@ public:
struct stats {
uint64_t segments_put{0};
uint64_t segments_get{0};
uint64_t compaction_segments_get{0};
uint64_t normal_segments_get{0};
std::array<uint64_t, write_source_count> segments_get{0};
uint64_t normal_segments_wait{0};
uint64_t separator_segments_get{0};
} _stats;
segment_pool(size_t pool_size, size_t reserved_for_compaction)
@@ -514,12 +503,12 @@ public:
future<seg_ptr> get_segment(write_source src) {
seg_ptr seg;
switch (src) {
case write_source::compaction:
_stats.compaction_segments_get++;
if (src == write_source::compaction) {
_stats.segments_get[static_cast<size_t>(src)]++;
co_return co_await _segments.pop_eventually();
case write_source::normal_write:
if (_segments.size() <= _reserved_for_compaction) {
}
if (_segments.size() <= _reserved_for_compaction) {
if (src == write_source::normal_write) {
_stats.normal_segments_wait++;
}
while (_segments.size() <= _reserved_for_compaction) {
@@ -527,17 +516,9 @@ public:
return _segments.size() > _reserved_for_compaction;
});
}
_stats.normal_segments_get++;
co_return _segments.pop();
case write_source::separator:
while (_segments.size() <= _reserved_for_compaction) {
co_await _segment_available.wait([this] {
return _segments.size() > _reserved_for_compaction;
});
}
_stats.separator_segments_get++;
co_return _segments.pop();
}
_stats.segments_get[static_cast<size_t>(src)]++;
co_return _segments.pop();
}
size_t size() const noexcept {
@@ -553,8 +534,8 @@ class segment_manager_impl {
struct stats {
uint64_t segments_in_use{0};
uint64_t bytes_written{0};
uint64_t data_bytes_written{0};
std::array<uint64_t, write_source_count> bytes_written{0};
std::array<uint64_t, write_source_count> data_bytes_written{0};
uint64_t bytes_read{0};
uint64_t bytes_freed{0};
uint64_t segments_allocated{0};
@@ -617,8 +598,8 @@ public:
future<> start();
future<> stop();
future<log_location> write(write_buffer&);
future<log_location> write_full_segment(write_buffer&, compaction_group&, write_source);
future<> write(write_buffer&);
future<> write_full_segment(write_buffer&, compaction_group&, write_source);
future<log_record> read(log_location);
@@ -669,7 +650,7 @@ public:
future<> discard_segments(segment_set&);
size_t get_memory_usage() const {
return _cfg.max_separator_memory;
return _cfg.max_separator_memory + sizeof(_segment_descs);
}
future<> await_pending_writes() {
@@ -678,44 +659,15 @@ public:
private:
// RAII guard for a segment id taken from the free list: if the guard is
// destroyed without release(), the id is pushed back onto sm._free_segments
// and waiters are signalled via sm._segment_freed_cv. Move-only; moving
// transfers ownership of the id to the destination guard.
struct segment_allocation_guard {
segment_manager_impl& sm;
// Engaged while this guard owns the segment id; disengaged after move/release.
std::optional<log_segment_id> segment_id;
segment_allocation_guard(segment_manager_impl& sm, log_segment_id seg_id) noexcept
: sm(sm)
, segment_id(seg_id)
{}
// Move constructor: the source guard gives up its id (left disengaged).
segment_allocation_guard(segment_allocation_guard&& other) noexcept
: sm(other.sm)
, segment_id(std::exchange(other.segment_id, std::nullopt))
{}
segment_allocation_guard& operator=(segment_allocation_guard&& other) = delete;
segment_allocation_guard(const segment_allocation_guard&) = delete;
segment_allocation_guard& operator=(const segment_allocation_guard&) = delete;
// Return an unreleased id to the free list and wake anyone waiting for one.
~segment_allocation_guard() {
if (segment_id) {
sm._free_segments.push_back(*segment_id);
sm._segment_freed_cv.signal();
}
}
// Commit: the segment was successfully consumed; do not recycle the id.
void release() noexcept {
segment_id.reset();
}
};
future<> replenish_reserve();
future<seg_ptr> allocate_segment();
future<> request_segment_switch();
future<> switch_active_segment();
std::chrono::microseconds calculate_separator_delay() const;
future<std::pair<segment_allocation_guard, seg_ptr>> allocate_and_create_new_segment();
future<segment_allocation_guard> allocate_segment();
future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num);
void write_to_separator(table&, log_location prev_loc, log_record, segment_ref);
segment_ref make_segment_ref(log_segment_id seg_id) {
return segment_ref(seg_id,
@@ -820,17 +772,17 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config)
sm::description("Counts number of segments in the segment pool.")),
sm::make_counter("segment_pool_segments_put", _segment_pool.get_stats().segments_put,
sm::description("Counts number of segments returned to the segment pool.")),
sm::make_counter("segment_pool_normal_segments_get", _segment_pool.get_stats().normal_segments_get,
sm::make_counter("segment_pool_normal_segments_get", _segment_pool.get_stats().segments_get[static_cast<size_t>(write_source::normal_write)],
sm::description("Counts number of segments taken from the segment pool for normal writes.")),
sm::make_counter("segment_pool_compaction_segments_get", _segment_pool.get_stats().compaction_segments_get,
sm::make_counter("segment_pool_compaction_segments_get", _segment_pool.get_stats().segments_get[static_cast<size_t>(write_source::compaction)],
sm::description("Counts number of segments taken from the segment pool for compaction.")),
sm::make_counter("segment_pool_separator_segments_get", _segment_pool.get_stats().separator_segments_get,
sm::make_counter("segment_pool_separator_segments_get", _segment_pool.get_stats().segments_get[static_cast<size_t>(write_source::separator)],
sm::description("Counts number of segments taken from the segment pool for separator writes.")),
sm::make_counter("segment_pool_normal_segments_wait", _segment_pool.get_stats().normal_segments_wait,
sm::description("Counts number of times normal writes had to wait for a segment to become available in the segment pool.")),
sm::make_counter("bytes_written", _stats.bytes_written,
sm::make_counter("bytes_written", _stats.bytes_written[static_cast<size_t>(write_source::normal_write)],
sm::description("Counts number of bytes written to the disk.")),
sm::make_counter("data_bytes_written", _stats.data_bytes_written,
sm::make_counter("data_bytes_written", _stats.data_bytes_written[static_cast<size_t>(write_source::normal_write)],
sm::description("Counts number of data bytes written to the disk.")),
sm::make_counter("bytes_read", _stats.bytes_read,
sm::description("Counts number of bytes read from the disk.")),
@@ -842,9 +794,9 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config)
sm::description("Counts number of segments freed.")),
sm::make_gauge("disk_usage", [this] { return _file_mgr.allocated_file_count() * _cfg.file_size; },
sm::description("Total disk usage.")),
sm::make_counter("compaction_bytes_written", _stats.compaction_bytes_written,
sm::make_counter("compaction_bytes_written", _stats.bytes_written[static_cast<size_t>(write_source::compaction)],
sm::description("Counts number of bytes written to the disk by compaction.")),
sm::make_counter("compaction_data_bytes_written", _stats.compaction_data_bytes_written,
sm::make_counter("compaction_data_bytes_written", _stats.data_bytes_written[static_cast<size_t>(write_source::compaction)],
sm::description("Counts number of data bytes written to the disk by compaction.")),
sm::make_counter("segments_compacted", _compaction_mgr.get_stats().segments_compacted,
sm::description("Counts number of segments compacted.")),
@@ -854,9 +806,9 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config)
sm::description("Counts number of records skipped during compaction.")),
sm::make_counter("compaction_records_rewritten", _compaction_mgr.get_stats().compaction_records_rewritten,
sm::description("Counts number of records rewritten during compaction.")),
sm::make_counter("separator_bytes_written", _stats.separator_bytes_written,
sm::make_counter("separator_bytes_written", _stats.bytes_written[static_cast<size_t>(write_source::separator)],
sm::description("Counts number of bytes written to the separator.")),
sm::make_counter("separator_data_bytes_written", _stats.separator_data_bytes_written,
sm::make_counter("separator_data_bytes_written", _stats.data_bytes_written[static_cast<size_t>(write_source::separator)],
sm::description("Counts number of data bytes written to the separator.")),
sm::make_counter("separator_buffer_flushed", _compaction_mgr.get_stats().separator_buffer_flushed,
sm::description("Counts number of times the separator buffer has been flushed.")),
@@ -912,9 +864,9 @@ future<> segment_manager_impl::stop() {
logstor_logger.info("Segment manager stopped");
}
future<log_location> segment_manager_impl::write(write_buffer& wb) {
future<> segment_manager_impl::write(write_buffer& wb) {
write_source source = write_source::normal_write;
auto holder = _async_gate.hold();
auto write_op = _writes_phaser.start();
wb.finalize(block_alignment);
@@ -928,13 +880,17 @@ future<log_location> segment_manager_impl::write(write_buffer& wb) {
co_await request_segment_switch();
}
log_location loc;
{
seg_ptr seg = _active_segment;
auto seg_holder = seg->hold();
auto seg_ref = seg->ref();
auto segment_seq_num = _segment_seq_num;
auto write_op = _writes_phaser.start();
// if we wrote a record to the segment but failed to write it to the separator, the segment should not be freed.
auto write_to_separator_failed = defer([seg_ref] mutable {
seg_ref.set_flush_failure();
});
auto loc = seg->allocate(data.size());
auto& desc = get_segment_descriptor(loc);
@@ -945,27 +901,26 @@ future<log_location> segment_manager_impl::write(write_buffer& wb) {
desc.on_write(wb.get_net_data_size(), wb.get_record_count());
_stats.bytes_written += data.size();
_stats.data_bytes_written += wb.get_net_data_size();
_stats.bytes_written[static_cast<size_t>(source)] += data.size();
_stats.data_bytes_written[static_cast<size_t>(source)] += wb.get_net_data_size();
// complete all buffered writes with their individual locations and wait
// for them to be updated in the index.
co_await wb.complete_writes(loc);
co_await with_scheduling_group(_cfg.separator_sg, [&] {
return _compaction_mgr.write_to_separator(wb, std::move(seg_ref), segment_seq_num);
return write_to_separator(wb, std::move(seg_ref), segment_seq_num);
});
write_to_separator_failed.cancel();
}
// flow control for separator debt
if (auto separator_delay = calculate_separator_delay(); separator_delay.count() > 0) {
co_await seastar::sleep(separator_delay);
}
co_return loc;
}
future<log_location> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_group& cg, write_source source) {
future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_group& cg, write_source source) {
auto holder = _async_gate.hold();
wb.finalize(block_alignment);
@@ -973,7 +928,7 @@ future<log_location> segment_manager_impl::write_full_segment(write_buffer& wb,
bytes_view data(reinterpret_cast<const int8_t*>(wb.data()), wb.offset_in_buffer());
if (data.size() > _cfg.segment_size) {
throw std::runtime_error(fmt::format( "Write size {} exceeds segment size {}", data.size(), _cfg.segment_size));
throw std::runtime_error(fmt::format("Write size {} exceeds segment size {}", data.size(), _cfg.segment_size));
}
seg_ptr seg = co_await _segment_pool.get_segment(source);
@@ -989,26 +944,13 @@ future<log_location> segment_manager_impl::write_full_segment(write_buffer& wb,
desc.on_write(wb.get_net_data_size(), wb.get_record_count());
switch (source) {
case write_source::separator:
_stats.separator_bytes_written += data.size();
_stats.separator_data_bytes_written += wb.get_net_data_size();
break;
case write_source::compaction:
_stats.compaction_bytes_written += data.size();
_stats.compaction_data_bytes_written += wb.get_net_data_size();
break;
default:
_stats.bytes_written += data.size();
_stats.data_bytes_written += wb.get_net_data_size();
break;
}
_stats.bytes_written[static_cast<size_t>(source)] += data.size();
_stats.data_bytes_written[static_cast<size_t>(source)] += wb.get_net_data_size();
co_await wb.complete_writes(loc);
co_await seg->stop();
cg.add_logstor_segment(desc);
co_return loc;
cg.add_logstor_segment(desc);
}
void segment_manager_impl::free_record(log_location location) {
@@ -1074,9 +1016,8 @@ future<> segment_manager_impl::replenish_reserve() {
while (true) {
bool retry = false;
try {
auto [seg_guard, seg] = co_await allocate_and_create_new_segment();
auto seg = co_await allocate_segment();
co_await _segment_pool.put(std::move(seg));
seg_guard.release();
} catch (abort_requested_exception&) {
logstor_logger.debug("Reserve replenisher stopping due to abort");
break;
@@ -1097,27 +1038,27 @@ future<> segment_manager_impl::replenish_reserve() {
logstor_logger.debug("Reserve replenisher stopped");
}
future<std::pair<segment_manager_impl::segment_allocation_guard, seg_ptr>>
segment_manager_impl::allocate_and_create_new_segment() {
auto seg_guard = co_await allocate_segment();
future<seg_ptr> segment_manager_impl::allocate_segment() {
auto make_segment = [this] (log_segment_id seg_id) -> future<seg_ptr> {
try {
auto seg_loc = segment_id_to_file_location(seg_id);
auto file = co_await _file_mgr.get_file_for_write(seg_loc.file_id);
auto seg = make_lw_shared<writeable_segment>(seg_id, std::move(file), seg_loc.file_offset, _cfg.segment_size);
get_segment_descriptor(seg_id).reset(_cfg.segment_size);
_stats.segments_allocated++;
co_return std::move(seg);
} catch (...) {
_free_segments.push_back(seg_id);
_segment_freed_cv.signal();
throw;
}
};
auto seg_id = *seg_guard.segment_id;
auto seg_loc = segment_id_to_file_location(seg_id);
auto file = co_await _file_mgr.get_file_for_write(seg_loc.file_id);
auto seg = make_lw_shared<writeable_segment>(seg_id, std::move(file), seg_loc.file_offset, _cfg.segment_size);
get_segment_descriptor(seg_id).reset(_cfg.segment_size);
_stats.segments_allocated++;
co_return std::make_pair(std::move(seg_guard), std::move(seg));
}
future<segment_manager_impl::segment_allocation_guard>
segment_manager_impl::allocate_segment() {
while (true) {
// first, allocate all new segments sequentially
if (_next_new_segment_id < _max_segments) {
auto seg_id = log_segment_id(_next_new_segment_id++);
co_return segment_allocation_guard(*this, seg_id);
co_return co_await make_segment(seg_id);
}
if (_free_segments.size() < _max_segments * trigger_compaction_threshold / 100) {
@@ -1128,7 +1069,7 @@ segment_manager_impl::allocate_segment() {
if (!_free_segments.empty()) {
auto seg_id = _free_segments.front();
_free_segments.pop_front();
co_return segment_allocation_guard(*this, seg_id);
co_return co_await make_segment(seg_id);
}
// no free segments - wait for a segment to be freed.
@@ -1145,7 +1086,7 @@ void segment_manager_impl::free_segment(log_segment_id segment_id) noexcept {
auto& desc = get_segment_descriptor(segment_id);
if (desc.net_data_size(_cfg.segment_size) != 0) {
logstor_logger.error("Freeing segment {} that has data", segment_id);
on_internal_error(logstor_logger, format("Freeing segment {} that has data", segment_id));
}
desc.on_free_segment();
@@ -1209,8 +1150,7 @@ future<> segment_manager_impl::for_each_record(log_segment_id segment_id,
if (buffer_header_buf.size() < write_buffer::buffer_header_size) {
break;
}
seastar::simple_memory_input_stream buffer_header_stream(buffer_header_buf.get(), buffer_header_buf.size());
auto bh = ser::deserialize(buffer_header_stream, std::type_identity<write_buffer::buffer_header>{});
auto bh = ser::deserialize_from_buffer(buffer_header_buf, std::type_identity<write_buffer::buffer_header>{});
// if buffer header is not valid, skip to next block
if (bh.magic != write_buffer::buffer_header_magic) {
@@ -1231,8 +1171,7 @@ future<> segment_manager_impl::for_each_record(log_segment_id segment_id,
if (size_buf.size() < write_buffer::record_header_size) {
break;
}
seastar::simple_memory_input_stream size_stream(size_buf.get(), size_buf.size());
auto rh = ser::deserialize(size_stream, std::type_identity<write_buffer::record_header>{});
auto rh = ser::deserialize_from_buffer(size_buf, std::type_identity<write_buffer::record_header>{});
if (rh.data_size == 0) {
// End of records in this block
break;
@@ -1248,8 +1187,7 @@ future<> segment_manager_impl::for_each_record(log_segment_id segment_id,
break;
}
seastar::simple_memory_input_stream record_stream(record_buf.get(), record_buf.size());
auto record = ser::deserialize(record_stream, std::type_identity<log_record>{});
auto record = ser::deserialize_from_buffer(record_buf, std::type_identity<log_record>{});
log_location loc {
.segment = segment_id,
@@ -1285,16 +1223,6 @@ future<> segment_manager_impl::for_each_record(const std::vector<log_segment_id>
}
}
// A record is alive when the primary index still resolves its key to the
// given location; a missing key or a different location means it is dead.
bool compaction_manager_impl::is_record_alive(primary_index& index, const primary_index_key& key, log_location loc) {
    if (const auto entry = index.get(key)) {
        return entry->location == loc;
    }
    return false;
}
// Thin forwarder to the index: attempt to relocate key from old_loc to
// new_loc; the return value reports whether the index accepted the update
// (callers free the old copy on success, the new copy otherwise).
bool compaction_manager_impl::update_record_location(primary_index& index, const primary_index_key& key, log_location old_loc, log_location new_loc) {
return index.update_record_location(key, old_loc, new_loc);
}
void compaction_manager_impl::submit(compaction_group& cg) {
if (_async_gate.is_closed() || !_cfg.compaction_enabled) {
return;
@@ -1419,10 +1347,10 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve
future<> flush() {
if (buf->has_data()) {
flush_count++;
auto base_location = co_await sm.write_full_segment(*buf, cg, write_source::compaction);
co_await when_all_succeed(pending_updates.begin(), pending_updates.end());
logstor_logger.trace("Compaction buffer flushed to {} with {} bytes", base_location, buf->get_net_data_size());
co_await sm.write_full_segment(*buf, cg, write_source::compaction);
logstor_logger.trace("Compaction buffer flushed with {} bytes", buf->get_net_data_size());
}
co_await when_all_succeed(pending_updates.begin(), pending_updates.end());
co_await buf->close();
buf->reset();
pending_updates.clear();
@@ -1440,11 +1368,13 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve
size_t records_rewritten = 0;
size_t records_skipped = 0;
auto& index = cg.get_logstor_index();
co_await _sm.for_each_record(segments,
[this, &cg, &records_rewritten, &records_skipped, &cb]
[this, &index, &records_rewritten, &records_skipped, &cb]
(log_location read_location, log_record record) -> future<> {
if (!is_record_alive(cg.get_logstor_index(), record.key, read_location)) {
if (!index.is_record_alive(record.key, read_location)) {
records_skipped++;
_stats.compaction_records_skipped++;
co_return;
@@ -1459,10 +1389,10 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve
// write the record and then update the index with the new location
auto write_and_update_index = cb.buf->write(std::move(writer)).then_unpack(
[this, &cg, key = std::move(key), read_location, &records_rewritten, &records_skipped]
[this, &index, key = std::move(key), read_location, &records_rewritten, &records_skipped]
(log_location new_location, seastar::gate::holder op) {
if (update_record_location(cg.get_logstor_index(), key, read_location, new_location)) {
if (index.update_record_location(key, read_location, new_location)) {
_sm.free_record(read_location);
records_rewritten++;
} else {
@@ -1506,7 +1436,6 @@ void compaction_manager_impl::controller::update(size_t segment_write_count, siz
}
separator_buffer compaction_manager_impl::allocate_separator_buffer() {
// TODO really ensure we have enough buffers
if (_sm._available_separator_buffers.empty()) {
throw std::runtime_error("No available separator buffers");
}
@@ -1516,7 +1445,7 @@ separator_buffer compaction_manager_impl::allocate_separator_buffer() {
return separator_buffer(wb);
}
future<> compaction_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) {
future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) {
for (auto&& w : wb.records()) {
co_await coroutine::maybe_yield();
@@ -1538,17 +1467,38 @@ future<> compaction_manager_impl::write_to_separator(write_buffer& wb, segment_r
buf.pending_updates.push_back(
buf.write(std::move(w.writer)).then_unpack([this, &index, key = std::move(key), prev_loc] (log_location new_loc, seastar::gate::holder op) {
if (update_record_location(index, key, prev_loc, new_loc)) {
_sm.free_record(prev_loc);
if (index.update_record_location(key, prev_loc, new_loc)) {
free_record(prev_loc);
} else {
_sm.free_record(new_loc);
free_record(new_loc);
}
return make_ready_future<>();
}
));
}
}
// Queue a single live record for rewrite into the table's separator buffer,
// then update the primary index to the record's new location once the buffer
// write resolves.
void segment_manager_impl::write_to_separator(table& t, log_location prev_loc, log_record record, segment_ref seg_ref) {
auto key = record.key;
auto& index = t.logstor_index();
log_record_writer writer(std::move(record));
auto& buf = t.get_logstor_separator_buffer(key.dk.token(), writer.size());
// Keep the source segment referenced until the buffer is flushed; skip the
// push when consecutive records come from the same segment.
if (buf.held_segments.empty() || buf.held_segments.back().id() != seg_ref.id()) {
buf.held_segments.push_back(seg_ref);
}
buf.pending_updates.push_back(
buf.write(std::move(writer)).then_unpack([this, &index, key = std::move(key), prev_loc] (log_location new_loc, seastar::gate::holder op) {
// If the index still pointed at prev_loc the move succeeded: free the old
// copy. Otherwise the record was superseded meanwhile: free the copy we
// just wrote instead.
if (index.update_record_location(key, prev_loc, new_loc)) {
free_record(prev_loc);
} else {
free_record(new_loc);
}
})
);
}
future<> compaction_manager_impl::flush_separator_buffer(separator_buffer buf, compaction_group& cg) {
logstor_logger.trace("Flushing separator buffer with {} bytes", buf.buf->offset_in_buffer());
@@ -1645,6 +1595,7 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
if (!tp->uses_logstor()) {
co_return;
}
logstor_logger.info("Table {}.{} has {} entries in logstor index", tp->schema()->ks_name(), tp->schema()->cf_name(), tp->logstor_index().get_key_count());
for (const auto& entry : tp->logstor_index()) {
used_segments.set(entry.entry().location.segment.value);
co_await coroutine::maybe_yield();
@@ -1743,11 +1694,7 @@ future<std::optional<segment_generation>> segment_manager_impl::recover_segment_
continue;
}
seastar::simple_memory_input_stream buffer_header_stream(
buffer_header_buf.begin(), buffer_header_buf.size());
auto bh = ser::deserialize(buffer_header_stream,
std::type_identity<write_buffer::buffer_header>{});
auto bh = ser::deserialize_from_buffer(buffer_header_buf, std::type_identity<write_buffer::buffer_header>{});
if (bh.magic != write_buffer::buffer_header_magic) {
continue;
@@ -1771,7 +1718,7 @@ future<> segment_manager_impl::add_segment_to_compaction_group(replica::database
if (!t.uses_logstor()) {
co_return;
}
if (t.logstor_index().get(record.key).transform([](auto&& entry) { return entry.location; }) == loc) {
if (t.logstor_index().is_record_alive(record.key, loc)) {
co_await callback(loc, std::move(record), t);
}
} catch(const replica::no_such_column_family&) {
@@ -1828,30 +1775,14 @@ future<> segment_manager_impl::add_segment_to_compaction_group(replica::database
if (need_separator) {
auto seg_ref = make_segment_ref(seg_id);
auto write_to_separator_failed = defer([seg_ref] mutable {
seg_ref.set_flush_failure();
});
co_await for_each_live_record([this, seg_ref] (log_location prev_loc, log_record record, table& t) -> future<> {
auto key = record.key;
auto& index = t.logstor_index();
log_record_writer writer(std::move(record));
auto& buf = t.get_logstor_separator_buffer(key.dk.token(), writer.size());
if (buf.held_segments.empty() || buf.held_segments.back().id() != seg_ref.id()) {
buf.held_segments.push_back(seg_ref);
}
buf.pending_updates.push_back(
buf.write(std::move(writer)).then_unpack([this, &index, key = std::move(key), prev_loc] (log_location new_loc, seastar::gate::holder op) {
if (index.update_record_location(key, prev_loc, new_loc)) {
free_record(prev_loc);
} else {
free_record(new_loc);
}
return make_ready_future<>();
}
));
write_to_separator(t, prev_loc, std::move(record), seg_ref);
return make_ready_future<>();
});
write_to_separator_failed.cancel();
}
}
@@ -1883,7 +1814,7 @@ future<> segment_manager::stop() {
return _impl->stop();
}
future<log_location> segment_manager::write(write_buffer& wb) {
future<> segment_manager::write(write_buffer& wb) {
return _impl->write(wb);
}
@@ -1895,11 +1826,6 @@ void segment_manager::free_record(log_location location) {
_impl->free_record(location);
}
future<> segment_manager::for_each_record(const std::vector<log_segment_id>& segments,
std::function<future<>(log_location, log_record)> callback) {
return _impl->for_each_record(std::move(segments), std::move(callback));
}
void segment_manager::set_trigger_compaction_hook(std::function<void()> fn) {
_impl->set_trigger_compaction_hook(std::move(fn));
}

View File

@@ -97,15 +97,12 @@ public:
future<> start();
future<> stop();
future<log_location> write(write_buffer& wb);
future<> write(write_buffer& wb);
future<log_record> read(log_location location);
void free_record(log_location location);
future<> for_each_record(const std::vector<log_segment_id>& segments,
std::function<future<>(log_location, log_record)> callback);
compaction_manager& get_compaction_manager() noexcept;
const compaction_manager& get_compaction_manager() const noexcept;

View File

@@ -128,14 +128,6 @@ future<log_location_with_holder> write_buffer::write(log_record_writer writer, c
});
}
// Write a record and immediately drop the returned gate holder, resolving to
// the record's log location only.
// Use with care: only when follow-up operations (e.g. index updates) do not
// need the write buffer kept open via the gate.
future<log_location> write_buffer::write_no_holder(log_record_writer writer) {
return write(std::move(writer)).then_unpack([] (log_location loc, seastar::gate::holder op) {
return loc;
});
}
void write_buffer::pad_to_alignment(size_t alignment) {
auto current_pos = offset_in_buffer();
auto next_pos = align_up(current_pos, alignment);

View File

@@ -174,12 +174,6 @@ public:
return write(std::move(writer), nullptr, {});
}
// Write a record to the buffer.
// Returns a future that will be resolved with the log location once flushed.
// If there are follow-up operations to the write such as index updates then consider
// using write_with_holder instead to keep the write buffer open until those operations are complete.
future<log_location> write_no_holder(log_record_writer);
static size_t estimate_required_segments(size_t net_data_size, size_t record_count, size_t segment_size);
private:

View File

@@ -217,17 +217,6 @@ table::add_memtables_to_reader_list(std::vector<mutation_reader>& readers,
}
}
// Build a mutation reader over this table's logstor storage by delegating to
// the logstor engine with this table's primary index.
// NOTE(review): fwd and fwd_mr are accepted but not passed on to
// logstor::make_reader — confirm forwarding semantics are not required here.
mutation_reader
table::make_logstor_mutation_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
return _logstor->make_reader(std::move(s), logstor_index(), std::move(permit), pr, slice, std::move(trace_state));
}
mutation_reader
table::make_mutation_reader(schema_ptr s,
reader_permit permit,
@@ -241,7 +230,7 @@ table::make_mutation_reader(schema_ptr s,
}
if (_logstor) [[unlikely]] {
return make_logstor_mutation_reader(s, std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
return _logstor->make_reader(s, logstor_index(), std::move(permit), range, slice, std::move(trace_state));
}
std::vector<mutation_reader> readers;

View File

@@ -370,7 +370,7 @@ async def test_compaction(manager: ManagerClient):
@pytest.mark.asyncio
async def test_drop_table(manager: ManagerClient):
"""
Test log compaction by creating dead data and verifying space reclamation.
Test that DROP TABLE works properly with logstor tables.
"""
cmdline = ['--logger-log-level', 'logstor=trace']
cfg = {'enable_logstor': True, 'experimental_features': ['logstor']}