diff --git a/replica/database.hh b/replica/database.hh index f764e60d6d..4203d0242a 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -882,14 +882,6 @@ public: return make_mutation_reader(std::move(schema), std::move(permit), range, full_slice); } - mutation_reader make_logstor_mutation_reader(schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const; - // The streaming mutation reader differs from the regular mutation reader in that: // - Reflects all writes accepted by replica prior to creation of the // reader and a _bounded_ amount of writes which arrive later. diff --git a/replica/logstor/compaction.hh b/replica/logstor/compaction.hh index c42638cb21..9c3f63fee8 100644 --- a/replica/logstor/compaction.hh +++ b/replica/logstor/compaction.hh @@ -14,6 +14,8 @@ namespace replica::logstor { +extern seastar::logger logstor_logger; + constexpr log_heap_options segment_descriptor_hist_options(4 * 1024, 3, 128 * 1024); struct segment_set; @@ -67,6 +69,9 @@ struct segment_set { size_t _segment_count{0}; void add_segment(segment_descriptor& desc) { + if (desc.owner) { + on_internal_error(logstor_logger, "add_segment called for segment that has an owner"); + } desc.owner = this; _segments.push(desc); ++_segment_count; @@ -77,6 +82,9 @@ struct segment_set { } void remove_segment(segment_descriptor& desc) { + if (desc.owner != this) { + on_internal_error(logstor_logger, "remove_segment called not from the owner"); + } _segments.erase(desc); desc.owner = nullptr; --_segment_count; diff --git a/replica/logstor/index.hh b/replica/logstor/index.hh index 5f9073e300..d7301f34e0 100644 --- a/replica/logstor/index.hh +++ b/replica/logstor/index.hh @@ -90,6 +90,15 @@ public: return std::nullopt; } + bool is_record_alive(const primary_index_key& key, log_location location) { + auto it = _partitions.find(key.dk, dht::ring_position_comparator(*_schema)); + if (it != _partitions.end()) { + return it->_e.location == location; + } else { + return false; + } + } + std::optional exchange(const primary_index_key& key, index_entry new_entry) { partitions_type::bound_hint hint; auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint); diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 4542885d6c..566dbadecb 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -52,6 +52,22 @@ size_t logstor::get_memory_usage() const { return _segment_manager.get_memory_usage(); } +segment_manager& logstor::get_segment_manager() noexcept { + return _segment_manager; +} + +const segment_manager& logstor::get_segment_manager() const noexcept { + return _segment_manager; +} + +compaction_manager& logstor::get_compaction_manager() noexcept { + return _segment_manager.get_compaction_manager(); +} + +const compaction_manager& logstor::get_compaction_manager() const noexcept { + return _segment_manager.get_compaction_manager(); +} + future<> logstor::write(const mutation& m, compaction_group& cg, seastar::gate::holder cg_holder) { primary_index_key key(m.decorated_key()); table_id table = m.schema()->id(); @@ -127,22 +143,6 @@ future> logstor::read(const schema& s, const p }); } -segment_manager& logstor::get_segment_manager() noexcept { - return _segment_manager; -} - -const segment_manager& logstor::get_segment_manager() const noexcept { - return _segment_manager; -} - -compaction_manager& logstor::get_compaction_manager() noexcept { - return _segment_manager.get_compaction_manager(); -} - -const compaction_manager& logstor::get_compaction_manager() const noexcept { - return _segment_manager.get_compaction_manager(); -} - mutation_reader logstor::make_reader(schema_ptr schema, const primary_index& index, reader_permit permit, diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 1e8f84919d..bc34abecde 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -115,8 +115,7 @@ future segment::read(log_location loc) { // Read the serialized record return _file.dma_read_exactly(absolute_offset(loc.offset), loc.size).then([] (seastar::temporary_buffer buf) { - seastar::simple_input_stream in(buf.begin(), buf.size()); - return ser::deserialize(in, std::type_identity{}); + return ser::deserialize_from_buffer(buf, std::type_identity{}); }); } @@ -129,7 +128,6 @@ future<> writeable_segment::stop() { co_return; } co_await _write_gate.close(); - _seg_ref = {}; } log_location writeable_segment::allocate(size_t data_size) { @@ -408,16 +406,11 @@ public: const stats& get_stats() const noexcept { return _stats; } - future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num); - separator_buffer allocate_separator_buffer() override; future<> flush_separator_buffer(separator_buffer buf, compaction_group&) override; private: - bool is_record_alive(primary_index&, const primary_index_key&, log_location); - bool update_record_location(primary_index&, const primary_index_key&, log_location old_loc, log_location new_loc); - void submit(compaction_group&) override; future<> stop_ongoing_compactions(compaction_group&) override; @@ -462,14 +455,13 @@ enum class write_source { separator, }; +static constexpr size_t write_source_count = 3; + static sstring write_source_to_string(write_source src) { switch (src) { - case write_source::normal_write: - return "normal_write"; - case write_source::compaction: - return "compaction"; - case write_source::separator: - return "separator"; + case write_source::normal_write: return "normal_write"; + case write_source::compaction: return "compaction"; + case write_source::separator: return "separator"; } return "unknown"; } @@ -484,11 +476,8 @@ public: struct stats { uint64_t segments_put{0}; - uint64_t segments_get{0}; - uint64_t compaction_segments_get{0}; - uint64_t normal_segments_get{0}; + std::array segments_get{0}; uint64_t normal_segments_wait{0}; - uint64_t separator_segments_get{0}; } _stats; segment_pool(size_t pool_size, size_t reserved_for_compaction) @@ -514,12 +503,12 @@ public: future get_segment(write_source src) { seg_ptr seg; - switch (src) { - case write_source::compaction: - _stats.compaction_segments_get++; + if (src == write_source::compaction) { + _stats.segments_get[static_cast(src)]++; co_return co_await _segments.pop_eventually(); - case write_source::normal_write: - if (_segments.size() <= _reserved_for_compaction) { + } + if (_segments.size() <= _reserved_for_compaction) { + if (src == write_source::normal_write) { _stats.normal_segments_wait++; } while (_segments.size() <= _reserved_for_compaction) { @@ -527,17 +516,9 @@ public: return _segments.size() > _reserved_for_compaction; }); } - _stats.normal_segments_get++; - co_return _segments.pop(); - case write_source::separator: - while (_segments.size() <= _reserved_for_compaction) { - co_await _segment_available.wait([this] { - return _segments.size() > _reserved_for_compaction; - }); - } - _stats.separator_segments_get++; - co_return _segments.pop(); } + _stats.segments_get[static_cast(src)]++; + co_return _segments.pop(); } size_t size() const noexcept { @@ -553,8 +534,8 @@ class segment_manager_impl { struct stats { uint64_t segments_in_use{0}; - uint64_t bytes_written{0}; - uint64_t data_bytes_written{0}; + std::array bytes_written{0}; + std::array data_bytes_written{0}; uint64_t bytes_read{0}; uint64_t bytes_freed{0}; uint64_t segments_allocated{0}; @@ -617,8 +598,8 @@ public: future<> start(); future<> stop(); - future write(write_buffer&); - future write_full_segment(write_buffer&, compaction_group&, write_source); + future<> write(write_buffer&); + future<> write_full_segment(write_buffer&, compaction_group&, write_source); future read(log_location); @@ -669,7 +650,7 @@ public: future<> discard_segments(segment_set&); size_t get_memory_usage() const { - return _cfg.max_separator_memory; + return _cfg.max_separator_memory + sizeof(_segment_descs); } future<> await_pending_writes() { @@ -678,44 +659,15 @@ public: private: - struct segment_allocation_guard { - segment_manager_impl& sm; - std::optional segment_id; - - segment_allocation_guard(segment_manager_impl& sm, log_segment_id seg_id) noexcept - : sm(sm) - , segment_id(seg_id) - {} - - segment_allocation_guard(segment_allocation_guard&& other) noexcept - : sm(other.sm) - , segment_id(std::exchange(other.segment_id, std::nullopt)) - {} - - segment_allocation_guard& operator=(segment_allocation_guard&& other) = delete; - segment_allocation_guard(const segment_allocation_guard&) = delete; - segment_allocation_guard& operator=(const segment_allocation_guard&) = delete; - - ~segment_allocation_guard() { - if (segment_id) { - sm._free_segments.push_back(*segment_id); - sm._segment_freed_cv.signal(); - } - } - - void release() noexcept { - segment_id.reset(); - } - }; - future<> replenish_reserve(); + future allocate_segment(); future<> request_segment_switch(); future<> switch_active_segment(); std::chrono::microseconds calculate_separator_delay() const; - future> allocate_and_create_new_segment(); - future allocate_segment(); + future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num); + void write_to_separator(table&, log_location prev_loc, log_record, segment_ref); segment_ref make_segment_ref(log_segment_id seg_id) { return segment_ref(seg_id, @@ -820,17 +772,17 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config) sm::description("Counts number of segments in the segment pool.")), sm::make_counter("segment_pool_segments_put", _segment_pool.get_stats().segments_put, sm::description("Counts number of segments returned to the segment pool.")), - sm::make_counter("segment_pool_normal_segments_get", _segment_pool.get_stats().normal_segments_get, + sm::make_counter("segment_pool_normal_segments_get", _segment_pool.get_stats().segments_get[static_cast(write_source::normal_write)], sm::description("Counts number of segments taken from the segment pool for normal writes.")), - sm::make_counter("segment_pool_compaction_segments_get", _segment_pool.get_stats().compaction_segments_get, + sm::make_counter("segment_pool_compaction_segments_get", _segment_pool.get_stats().segments_get[static_cast(write_source::compaction)], sm::description("Counts number of segments taken from the segment pool for compaction.")), - sm::make_counter("segment_pool_separator_segments_get", _segment_pool.get_stats().separator_segments_get, + sm::make_counter("segment_pool_separator_segments_get", _segment_pool.get_stats().segments_get[static_cast(write_source::separator)], sm::description("Counts number of segments taken from the segment pool for separator writes.")), sm::make_counter("segment_pool_normal_segments_wait", _segment_pool.get_stats().normal_segments_wait, sm::description("Counts number of times normal writes had to wait for a segment to become available in the segment pool.")), - sm::make_counter("bytes_written", _stats.bytes_written, + sm::make_counter("bytes_written", _stats.bytes_written[static_cast(write_source::normal_write)], sm::description("Counts number of bytes written to the disk.")), - sm::make_counter("data_bytes_written", _stats.data_bytes_written, + sm::make_counter("data_bytes_written", _stats.data_bytes_written[static_cast(write_source::normal_write)], sm::description("Counts number of data bytes written to the disk.")), sm::make_counter("bytes_read", _stats.bytes_read, sm::description("Counts number of bytes read from the disk.")), @@ -842,9 +794,9 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config) sm::description("Counts number of segments freed.")), sm::make_gauge("disk_usage", [this] { return _file_mgr.allocated_file_count() * _cfg.file_size; }, sm::description("Total disk usage.")), - sm::make_counter("compaction_bytes_written", _stats.compaction_bytes_written, + sm::make_counter("compaction_bytes_written", _stats.bytes_written[static_cast(write_source::compaction)], sm::description("Counts number of bytes written to the disk by compaction.")), - sm::make_counter("compaction_data_bytes_written", _stats.compaction_data_bytes_written, + sm::make_counter("compaction_data_bytes_written", _stats.data_bytes_written[static_cast(write_source::compaction)], sm::description("Counts number of data bytes written to the disk by compaction.")), sm::make_counter("segments_compacted", _compaction_mgr.get_stats().segments_compacted, sm::description("Counts number of segments compacted.")), @@ -854,9 +806,9 @@ segment_manager_impl::segment_manager_impl(segment_manager_config config) sm::description("Counts number of records skipped during compaction.")), sm::make_counter("compaction_records_rewritten", _compaction_mgr.get_stats().compaction_records_rewritten, sm::description("Counts number of records rewritten during compaction.")), - sm::make_counter("separator_bytes_written", _stats.separator_bytes_written, + sm::make_counter("separator_bytes_written", _stats.bytes_written[static_cast(write_source::separator)], sm::description("Counts number of bytes written to the separator.")), - sm::make_counter("separator_data_bytes_written", _stats.separator_data_bytes_written, + sm::make_counter("separator_data_bytes_written", _stats.data_bytes_written[static_cast(write_source::separator)], sm::description("Counts number of data bytes written to the separator.")), sm::make_counter("separator_buffer_flushed", _compaction_mgr.get_stats().separator_buffer_flushed, sm::description("Counts number of times the separator buffer has been flushed.")), @@ -912,9 +864,9 @@ future<> segment_manager_impl::stop() { logstor_logger.info("Segment manager stopped"); } -future segment_manager_impl::write(write_buffer& wb) { +future<> segment_manager_impl::write(write_buffer& wb) { + write_source source = write_source::normal_write; auto holder = _async_gate.hold(); - auto write_op = _writes_phaser.start(); wb.finalize(block_alignment); @@ -928,13 +880,17 @@ future segment_manager_impl::write(write_buffer& wb) { co_await request_segment_switch(); } - log_location loc; - { seg_ptr seg = _active_segment; auto seg_holder = seg->hold(); auto seg_ref = seg->ref(); auto segment_seq_num = _segment_seq_num; + auto write_op = _writes_phaser.start(); + + // if we wrote a record to the segment but failed to write it to the separator, the segment should not be freed. + auto write_to_separator_failed = defer([seg_ref] mutable { + seg_ref.set_flush_failure(); + }); auto loc = seg->allocate(data.size()); auto& desc = get_segment_descriptor(loc); @@ -945,27 +901,26 @@ future segment_manager_impl::write(write_buffer& wb) { desc.on_write(wb.get_net_data_size(), wb.get_record_count()); - _stats.bytes_written += data.size(); - _stats.data_bytes_written += wb.get_net_data_size(); + _stats.bytes_written[static_cast(source)] += data.size(); + _stats.data_bytes_written[static_cast(source)] += wb.get_net_data_size(); // complete all buffered writes with their individual locations and wait // for them to be updated in the index. co_await wb.complete_writes(loc); co_await with_scheduling_group(_cfg.separator_sg, [&] { - return _compaction_mgr.write_to_separator(wb, std::move(seg_ref), segment_seq_num); + return write_to_separator(wb, std::move(seg_ref), segment_seq_num); }); + write_to_separator_failed.cancel(); } // flow control for separator debt if (auto separator_delay = calculate_separator_delay(); separator_delay.count() > 0) { co_await seastar::sleep(separator_delay); } - - co_return loc; } -future segment_manager_impl::write_full_segment(write_buffer& wb, compaction_group& cg, write_source source) { +future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_group& cg, write_source source) { auto holder = _async_gate.hold(); wb.finalize(block_alignment); @@ -973,7 +928,7 @@ future segment_manager_impl::write_full_segment(write_buffer& wb, bytes_view data(reinterpret_cast(wb.data()), wb.offset_in_buffer()); if (data.size() > _cfg.segment_size) { - throw std::runtime_error(fmt::format( "Write size {} exceeds segment size {}", data.size(), _cfg.segment_size)); + throw std::runtime_error(fmt::format("Write size {} exceeds segment size {}", data.size(), _cfg.segment_size)); } seg_ptr seg = co_await _segment_pool.get_segment(source); @@ -989,26 +944,13 @@ future segment_manager_impl::write_full_segment(write_buffer& wb, desc.on_write(wb.get_net_data_size(), wb.get_record_count()); - switch (source) { - case write_source::separator: - _stats.separator_bytes_written += data.size(); - _stats.separator_data_bytes_written += wb.get_net_data_size(); - break; - case write_source::compaction: - _stats.compaction_bytes_written += data.size(); - _stats.compaction_data_bytes_written += wb.get_net_data_size(); - break; - default: - _stats.bytes_written += data.size(); - _stats.data_bytes_written += wb.get_net_data_size(); - break; - } + _stats.bytes_written[static_cast(source)] += data.size(); + _stats.data_bytes_written[static_cast(source)] += wb.get_net_data_size(); co_await wb.complete_writes(loc); co_await seg->stop(); - cg.add_logstor_segment(desc); - co_return loc; + cg.add_logstor_segment(desc); } void segment_manager_impl::free_record(log_location location) { @@ -1074,9 +1016,8 @@ future<> segment_manager_impl::replenish_reserve() { while (true) { bool retry = false; try { - auto [seg_guard, seg] = co_await allocate_and_create_new_segment(); + auto seg = co_await allocate_segment(); co_await _segment_pool.put(std::move(seg)); - seg_guard.release(); } catch (abort_requested_exception&) { logstor_logger.debug("Reserve replenisher stopping due to abort"); break; @@ -1097,27 +1038,27 @@ future<> segment_manager_impl::replenish_reserve() { logstor_logger.debug("Reserve replenisher stopped"); } -future> -segment_manager_impl::allocate_and_create_new_segment() { - auto seg_guard = co_await allocate_segment(); +future segment_manager_impl::allocate_segment() { + auto make_segment = [this] (log_segment_id seg_id) -> future { + try { + auto seg_loc = segment_id_to_file_location(seg_id); + auto file = co_await _file_mgr.get_file_for_write(seg_loc.file_id); + auto seg = make_lw_shared(seg_id, std::move(file), seg_loc.file_offset, _cfg.segment_size); + get_segment_descriptor(seg_id).reset(_cfg.segment_size); + _stats.segments_allocated++; + co_return std::move(seg); + } catch (...) { + _free_segments.push_back(seg_id); + _segment_freed_cv.signal(); + throw; + } + }; - auto seg_id = *seg_guard.segment_id; - auto seg_loc = segment_id_to_file_location(seg_id); - auto file = co_await _file_mgr.get_file_for_write(seg_loc.file_id); - auto seg = make_lw_shared(seg_id, std::move(file), seg_loc.file_offset, _cfg.segment_size); - get_segment_descriptor(seg_id).reset(_cfg.segment_size); - _stats.segments_allocated++; - - co_return std::make_pair(std::move(seg_guard), std::move(seg)); -} - -future -segment_manager_impl::allocate_segment() { while (true) { // first, allocate all new segments sequentially if (_next_new_segment_id < _max_segments) { auto seg_id = log_segment_id(_next_new_segment_id++); - co_return segment_allocation_guard(*this, seg_id); + co_return co_await make_segment(seg_id); } if (_free_segments.size() < _max_segments * trigger_compaction_threshold / 100) { @@ -1128,7 +1069,7 @@ segment_manager_impl::allocate_segment() { if (!_free_segments.empty()) { auto seg_id = _free_segments.front(); _free_segments.pop_front(); - co_return segment_allocation_guard(*this, seg_id); + co_return co_await make_segment(seg_id); } // no free segments - wait for a segment to be freed. @@ -1145,7 +1086,7 @@ void segment_manager_impl::free_segment(log_segment_id segment_id) noexcept { auto& desc = get_segment_descriptor(segment_id); if (desc.net_data_size(_cfg.segment_size) != 0) { - logstor_logger.error("Freeing segment {} that has data", segment_id); + on_internal_error(logstor_logger, format("Freeing segment {} that has data", segment_id)); } desc.on_free_segment(); @@ -1209,8 +1150,7 @@ future<> segment_manager_impl::for_each_record(log_segment_id segment_id, if (buffer_header_buf.size() < write_buffer::buffer_header_size) { break; } - seastar::simple_memory_input_stream buffer_header_stream(buffer_header_buf.get(), buffer_header_buf.size()); - auto bh = ser::deserialize(buffer_header_stream, std::type_identity{}); + auto bh = ser::deserialize_from_buffer(buffer_header_buf, std::type_identity{}); // if buffer header is not valid, skip to next block if (bh.magic != write_buffer::buffer_header_magic) { @@ -1231,8 +1171,7 @@ future<> segment_manager_impl::for_each_record(log_segment_id segment_id, if (size_buf.size() < write_buffer::record_header_size) { break; } - seastar::simple_memory_input_stream size_stream(size_buf.get(), size_buf.size()); - auto rh = ser::deserialize(size_stream, std::type_identity{}); + auto rh = ser::deserialize_from_buffer(size_buf, std::type_identity{}); if (rh.data_size == 0) { // End of records in this block break; @@ -1248,8 +1187,7 @@ future<> segment_manager_impl::for_each_record(log_segment_id segment_id, break; } - seastar::simple_memory_input_stream record_stream(record_buf.get(), record_buf.size()); - auto record = ser::deserialize(record_stream, std::type_identity{}); + auto record = ser::deserialize_from_buffer(record_buf, std::type_identity{}); log_location loc { .segment = segment_id, @@ -1285,16 +1223,6 @@ future<> segment_manager_impl::for_each_record(const std::vector } } -bool compaction_manager_impl::is_record_alive(primary_index& index, const primary_index_key& key, log_location loc) { - return index.get(key) - .transform([loc] (const index_entry& e) { return e.location == loc; }) - .value_or(false); -} - -bool compaction_manager_impl::update_record_location(primary_index& index, const primary_index_key& key, log_location old_loc, log_location new_loc) { - return index.update_record_location(key, old_loc, new_loc); -} - void compaction_manager_impl::submit(compaction_group& cg) { if (_async_gate.is_closed() || !_cfg.compaction_enabled) { return; @@ -1419,10 +1347,10 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve future<> flush() { if (buf->has_data()) { flush_count++; - auto base_location = co_await sm.write_full_segment(*buf, cg, write_source::compaction); - co_await when_all_succeed(pending_updates.begin(), pending_updates.end()); - logstor_logger.trace("Compaction buffer flushed to {} with {} bytes", base_location, buf->get_net_data_size()); + co_await sm.write_full_segment(*buf, cg, write_source::compaction); + logstor_logger.trace("Compaction buffer flushed with {} bytes", buf->get_net_data_size()); } + co_await when_all_succeed(pending_updates.begin(), pending_updates.end()); co_await buf->close(); buf->reset(); pending_updates.clear(); @@ -1440,11 +1368,13 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve size_t records_rewritten = 0; size_t records_skipped = 0; + auto& index = cg.get_logstor_index(); + co_await _sm.for_each_record(segments, - [this, &cg, &records_rewritten, &records_skipped, &cb] + [this, &index, &records_rewritten, &records_skipped, &cb] (log_location read_location, log_record record) -> future<> { - if (!is_record_alive(cg.get_logstor_index(), record.key, read_location)) { + if (!index.is_record_alive(record.key, read_location)) { records_skipped++; _stats.compaction_records_skipped++; co_return; @@ -1459,10 +1389,10 @@ future<> compaction_manager_impl::compact_segments(compaction_group& cg, std::ve // write the record and then update the index with the new location auto write_and_update_index = cb.buf->write(std::move(writer)).then_unpack( - [this, &cg, key = std::move(key), read_location, &records_rewritten, &records_skipped] + [this, &index, key = std::move(key), read_location, &records_rewritten, &records_skipped] (log_location new_location, seastar::gate::holder op) { - if (update_record_location(cg.get_logstor_index(), key, read_location, new_location)) { + if (index.update_record_location(key, read_location, new_location)) { _sm.free_record(read_location); records_rewritten++; } else { @@ -1506,7 +1436,6 @@ void compaction_manager_impl::controller::update(size_t segment_write_count, siz } separator_buffer compaction_manager_impl::allocate_separator_buffer() { - // TODO really ensure we have enough buffers if (_sm._available_separator_buffers.empty()) { throw std::runtime_error("No available separator buffers"); } @@ -1516,7 +1445,7 @@ separator_buffer compaction_manager_impl::allocate_separator_buffer() { return separator_buffer(wb); } -future<> compaction_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) { +future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) { for (auto&& w : wb.records()) { co_await coroutine::maybe_yield(); @@ -1538,17 +1467,38 @@ future<> compaction_manager_impl::write_to_separator(write_buffer& wb, segment_r buf.pending_updates.push_back( buf.write(std::move(w.writer)).then_unpack([this, &index, key = std::move(key), prev_loc] (log_location new_loc, seastar::gate::holder op) { - if (update_record_location(index, key, prev_loc, new_loc)) { - _sm.free_record(prev_loc); + if (index.update_record_location(key, prev_loc, new_loc)) { + free_record(prev_loc); } else { - _sm.free_record(new_loc); + free_record(new_loc); } - return make_ready_future<>(); } )); } } +void segment_manager_impl::write_to_separator(table& t, log_location prev_loc, log_record record, segment_ref seg_ref) { + auto key = record.key; + auto& index = t.logstor_index(); + log_record_writer writer(std::move(record)); + + auto& buf = t.get_logstor_separator_buffer(key.dk.token(), writer.size()); + + if (buf.held_segments.empty() || buf.held_segments.back().id() != seg_ref.id()) { + buf.held_segments.push_back(seg_ref); + } + + buf.pending_updates.push_back( + buf.write(std::move(writer)).then_unpack([this, &index, key = std::move(key), prev_loc] (log_location new_loc, seastar::gate::holder op) { + if (index.update_record_location(key, prev_loc, new_loc)) { + free_record(prev_loc); + } else { + free_record(new_loc); + } + }) + ); +} + future<> compaction_manager_impl::flush_separator_buffer(separator_buffer buf, compaction_group& cg) { logstor_logger.trace("Flushing separator buffer with {} bytes", buf.buf->offset_in_buffer()); @@ -1645,6 +1595,7 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { if (!tp->uses_logstor()) { co_return; } + logstor_logger.info("Table {}.{} has {} entries in logstor index", tp->schema()->ks_name(), tp->schema()->cf_name(), tp->logstor_index().get_key_count()); for (const auto& entry : tp->logstor_index()) { used_segments.set(entry.entry().location.segment.value); co_await coroutine::maybe_yield(); @@ -1743,11 +1694,7 @@ future> segment_manager_impl::recover_segment_ continue; } - seastar::simple_memory_input_stream buffer_header_stream( - buffer_header_buf.begin(), buffer_header_buf.size()); - - auto bh = ser::deserialize(buffer_header_stream, - std::type_identity{}); + auto bh = ser::deserialize_from_buffer(buffer_header_buf, std::type_identity{}); if (bh.magic != write_buffer::buffer_header_magic) { continue; @@ -1771,7 +1718,7 @@ future<> segment_manager_impl::add_segment_to_compaction_group(replica::database if (!t.uses_logstor()) { co_return; } - if (t.logstor_index().get(record.key).transform([](auto&& entry) { return entry.location; }) == loc) { + if (t.logstor_index().is_record_alive(record.key, loc)) { co_await callback(loc, std::move(record), t); } } catch(const replica::no_such_column_family&) { @@ -1828,30 +1775,14 @@ future<> segment_manager_impl::add_segment_to_compaction_group(replica::database if (need_separator) { auto seg_ref = make_segment_ref(seg_id); + auto write_to_separator_failed = defer([seg_ref] mutable { + seg_ref.set_flush_failure(); + }); co_await for_each_live_record([this, seg_ref] (log_location prev_loc, log_record record, table& t) -> future<> { - auto key = record.key; - - auto& index = t.logstor_index(); - - log_record_writer writer(std::move(record)); - auto& buf = t.get_logstor_separator_buffer(key.dk.token(), writer.size()); - - if (buf.held_segments.empty() || buf.held_segments.back().id() != seg_ref.id()) { - buf.held_segments.push_back(seg_ref); - } - - buf.pending_updates.push_back( - buf.write(std::move(writer)).then_unpack([this, &index, key = std::move(key), prev_loc] (log_location new_loc, seastar::gate::holder op) { - if (index.update_record_location(key, prev_loc, new_loc)) { - free_record(prev_loc); - } else { - free_record(new_loc); - } - return make_ready_future<>(); - } - )); + write_to_separator(t, prev_loc, std::move(record), seg_ref); return make_ready_future<>(); }); + write_to_separator_failed.cancel(); } } @@ -1883,7 +1814,7 @@ future<> segment_manager::stop() { return _impl->stop(); } -future segment_manager::write(write_buffer& wb) { +future<> segment_manager::write(write_buffer& wb) { return _impl->write(wb); } @@ -1895,11 +1826,6 @@ void segment_manager::free_record(log_location location) { _impl->free_record(location); } -future<> segment_manager::for_each_record(const std::vector& segments, - std::function(log_location, log_record)> callback) { - return _impl->for_each_record(std::move(segments), std::move(callback)); -} - void segment_manager::set_trigger_compaction_hook(std::function fn) { _impl->set_trigger_compaction_hook(std::move(fn)); } diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 1d1dd54218..2c2a62bf6f 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -97,15 +97,12 @@ public: future<> start(); future<> stop(); - future write(write_buffer& wb); + future<> write(write_buffer& wb); future read(log_location location); void free_record(log_location location); - future<> for_each_record(const std::vector& segments, - std::function(log_location, log_record)> callback); - compaction_manager& get_compaction_manager() noexcept; const compaction_manager& get_compaction_manager() const noexcept; diff --git a/replica/logstor/write_buffer.cc b/replica/logstor/write_buffer.cc index fee058fe0c..ba616f190d 100644 --- a/replica/logstor/write_buffer.cc +++ b/replica/logstor/write_buffer.cc @@ -128,14 +128,6 @@ future write_buffer::write(log_record_writer writer, c }); } -future write_buffer::write_no_holder(log_record_writer writer) { - // write and leave the gate immediately after the write. - // use carefully when the gate it not needed. - return write(std::move(writer)).then_unpack([] (log_location loc, seastar::gate::holder op) { - return loc; - }); -} - void write_buffer::pad_to_alignment(size_t alignment) { auto current_pos = offset_in_buffer(); auto next_pos = align_up(current_pos, alignment); diff --git a/replica/logstor/write_buffer.hh b/replica/logstor/write_buffer.hh index 8744182318..d3a8dd71c6 100644 --- a/replica/logstor/write_buffer.hh +++ b/replica/logstor/write_buffer.hh @@ -174,12 +174,6 @@ public: return write(std::move(writer), nullptr, {}); } - // Write a record to the buffer. - // Returns a future that will be resolved with the log location once flushed. - // If there are follow-up operations to the write such as index updates then consider - // using write_with_holder instead to keep the write buffer open until those operations are complete. - future write_no_holder(log_record_writer); - static size_t estimate_required_segments(size_t net_data_size, size_t record_count, size_t segment_size); private: diff --git a/replica/table.cc b/replica/table.cc index 6d7b68dc55..70287bbf54 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -217,17 +217,6 @@ table::add_memtables_to_reader_list(std::vector& readers, } } -mutation_reader -table::make_logstor_mutation_reader(schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { - return _logstor->make_reader(std::move(s), logstor_index(), std::move(permit), pr, slice, std::move(trace_state)); -} - mutation_reader table::make_mutation_reader(schema_ptr s, reader_permit permit, @@ -241,7 +230,7 @@ table::make_mutation_reader(schema_ptr s, } if (_logstor) [[unlikely]] { - return make_logstor_mutation_reader(s, std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr); + return _logstor->make_reader(s, logstor_index(), std::move(permit), range, slice, std::move(trace_state)); } std::vector readers; diff --git a/test/cluster/test_logstor.py b/test/cluster/test_logstor.py index b9f4aaa833..5608d3e015 100644 --- a/test/cluster/test_logstor.py +++ b/test/cluster/test_logstor.py @@ -370,7 +370,7 @@ async def test_compaction(manager: ManagerClient): @pytest.mark.asyncio async def test_drop_table(manager: ManagerClient): """ - Test log compaction by creating dead data and verifying space reclamation. + Test that DROP TABLE works properly with logstor tables. """ cmdline = ['--logger-log-level', 'logstor=trace'] cfg = {'enable_logstor': True, 'experimental_features': ['logstor']}