diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index d7b7e5ae1d..34b340b70f 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -2449,6 +2449,14 @@ std::vector db::commitlog::get_active_segment_names() const { return _segment_manager->get_active_names(); } +uint64_t db::commitlog::disk_limit() const { + return _segment_manager->max_disk_size; +} + +uint64_t db::commitlog::disk_footprint() const { + return _segment_manager->totals.total_size_on_disk; +} + uint64_t db::commitlog::get_total_size() const { return _segment_manager->totals.active_size_on_disk + _segment_manager->totals.buffer_list_bytes; } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index d502ebd8a7..f309d2d371 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -336,6 +336,16 @@ public: */ uint64_t max_active_flushes() const; + /** + * Return disk footprint + */ + uint64_t disk_footprint() const; + + /** + * Return configured disk footprint limit + */ + uint64_t disk_limit() const; + future<> clear(); const config& active_config() const; diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 5069c36ed6..f86a992b18 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -748,3 +749,75 @@ SEASTAR_TEST_CASE(test_commitlog_new_segment_odsync){ } }); } + +// Test for #8363 +// try to provoke edge case where we race segment deletion +// and waiting for recycled to be replenished. +SEASTAR_TEST_CASE(test_commitlog_deadlock_in_recycle) { + commitlog::config cfg; + + constexpr auto max_size_mb = 2; + cfg.commitlog_segment_size_in_mb = max_size_mb; + // ensure total size per shard is not multiple of segment size. + cfg.commitlog_total_space_in_mb = 5 * smp::count; + cfg.commitlog_sync_period_in_ms = 10; + cfg.reuse_segments = true; + cfg.allow_going_over_size_limit = false; + cfg.use_o_dsync = true; // make sure we pre-allocate. + + // not using cl_test, because we need to be able to abandon + // the log. + + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + auto log = co_await commitlog::create_commitlog(cfg); + + rp_set rps; + std::deque queue; + size_t n = 0; + + // uncomment for verbosity + // logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug); + + auto uuid = utils::UUID_gen::get_time_UUID(); + auto size = log.max_record_size() / 2; + + timer<> t; + t.set_callback([&] { + while (!queue.empty()) { + auto flush = std::move(queue.front()); + queue.pop_front(); + log.discard_completed_segments(uuid, flush); + ++n; + }; + }); + + // add a flush handler that delays releasing things until disk threshold is reached. + auto r = log.add_flush_handler([&](cf_id_type, replay_position pos) { + auto old = std::exchange(rps, rp_set{}); + queue.emplace_back(std::move(old)); + if (log.disk_footprint() >= log.disk_limit() && !t.armed()) { + t.arm(5s); + } + }); + + bool release = true; + + try { + while (n < 10) { + auto now = timeout_clock::now(); + rp_handle h = co_await with_timeout(now + 30s, log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.fill('1', size); + })); + rps.put(std::move(h)); + } + } catch (timed_out_error&) { + BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow..."); + release = false; + } + + if (release) { + co_await log.shutdown(); + co_await log.clear(); + } +}