diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index af873931d2..e23a6d624f 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -800,6 +800,8 @@ class db::commitlog::segment : public enable_shared_from_this, public c void end_flush() { _segment_manager->end_flush(); if (can_delete()) { + // #25709 - do this early if possible + _extended_segments.clear(); _segment_manager->discard_unused_segments(); } } @@ -875,6 +877,8 @@ public: void release_cf_count(const cf_id_type& cf) { mark_clean(cf, 1); if (can_delete()) { + // #25709 - do this early if possible + _extended_segments.clear(); _segment_manager->discard_unused_segments(); } } @@ -2576,20 +2580,24 @@ struct fmt::formatter { void db::commitlog::segment_manager::discard_unused_segments() noexcept { clogger.trace("Checking for unused segments ({} active)", _segments.size()); - std::erase_if(_segments, [=](sseg_ptr s) { - if (s->can_delete()) { - clogger.debug("Segment {} is unused", *s); - return true; - } - if (s->is_still_allocating()) { - clogger.debug("Not safe to delete segment {}; still allocating.", *s); - } else if (!s->is_clean()) { - clogger.debug("Not safe to delete segment {}; dirty is {}", *s, segment::cf_mark {*s}); - } else { - clogger.debug("Not safe to delete segment {}; disk ops pending", *s); - } - return false; - }); + // #25709 ensure we don't free any segment until after prune. + { + auto tmp = _segments; + std::erase_if(_segments, [=](sseg_ptr s) { + if (s->can_delete()) { + clogger.debug("Segment {} is unused", *s); + return true; + } + if (s->is_still_allocating()) { + clogger.debug("Not safe to delete segment {}; still allocating.", *s); + } else if (!s->is_clean()) { + clogger.debug("Not safe to delete segment {}; dirty is {}", *s, segment::cf_mark {*s}); + } else { + clogger.debug("Not safe to delete segment {}; disk ops pending", *s); + } + return false; + }); + } // launch in background, but guard with gate so this deletion is // sure to finish in shutdown, because at least through this path, @@ -2878,7 +2886,10 @@ future<> db::commitlog::segment_manager::do_pending_deletes() { } future<> db::commitlog::segment_manager::orphan_all() { - _segments.clear(); + // #25709. the actual process of destroying the elements here + // might cause a call into discard_unused_segments. + // ensure the target vector is empty when we get to destructors + auto tmp = std::exchange(_segments, {}); return clear_reserve_segments(); } diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 8a2276cff3..5d82ca6b46 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -2102,4 +2102,50 @@ SEASTAR_TEST_CASE(test_commitlog_handle_replayed_segments) { } } +// #25709 +// Test that creating large mutation entries (cross more than one +// segment) and releasing them the memtable way (ref count, not rp) +// does not crash. +// When releasing such a segment, we will get recursion in segment +// pruning, and need to handle this in both discard_unused_segments +// as well as (for tests anyway) orphan_all. +// Note: this test is not perfect. It does not really force a crash +// in actual discard_unused_segments per se, but the orphan_all call +// in cl_test will in fact cause a recursion that, without the fix, +// will do a double free -> boom. +SEASTAR_TEST_CASE(test_commitlog_release_large_mutation_segments) { + commitlog::config cfg; + + constexpr uint64_t max_size_mb = 8; + + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 8 * 4 * max_size_mb * smp::count; + cfg.allow_going_over_size_limit = false; + cfg.allow_fragmented_entries = true; + cfg.use_o_dsync = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + { + db::rp_set handles; + auto size = log.max_record_size() * 2 - log.max_record_size() / 2; + auto buf = fragmented_temporary_buffer::allocate_to_fit(size); + + for (size_t i = 0; i < 6; ++i) { + auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + for (auto& tmp : buf) { + dst.write(tmp.get(), tmp.size()); + } + }); + handles.put(std::move(h)); + } + + co_await log.force_new_active_segment(); + co_await log.sync_all_segments(); + } + // now handles will release + }); +} + + BOOST_AUTO_TEST_SUITE_END()