diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 2963e94d59..0d19bbf095 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1521,7 +1521,7 @@ future db::commitlog::segment_manager: if (cfg.extensions && !cfg.extensions->commitlog_file_extensions().empty()) { for (auto * ext : cfg.extensions->commitlog_file_extensions()) { - auto nf = co_await ext->wrap_file(std::move(filename), f, flags); + auto nf = co_await ext->wrap_file(filename, f, flags); if (nf) { f = std::move(nf); align = is_overwrite ? f.disk_overwrite_dma_alignment() : f.disk_write_dma_alignment(); @@ -1541,6 +1541,7 @@ future db::commitlog::segment_manager: co_await f.close(); } if (ep) { + add_file_to_delete(filename, d); co_return coroutine::exception(std::move(ep)); } @@ -1870,6 +1871,8 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi std::exception_ptr recycle_error; + size_t num_deleted = 0; + while (!files.empty()) { auto filename = std::move(files.back()); files.pop_back(); @@ -1919,6 +1922,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi } } co_await delete_file(filename); + ++num_deleted; } catch (...) { clogger.error("Could not delete segment {}: {}", filename, std::current_exception()); } @@ -1933,6 +1937,11 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi if (recycle_error && _recycled_segments.empty()) { abort_recycled_list(recycle_error); } + // If recycle failed and turned into a delete, we should fake-wakeup waiters + // since we might still have cleaned up disk space. + if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) { + abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files"))); + } } void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) { diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 87e0c3e8fc..2329ee3ac8 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -44,7 +44,9 @@ #include "test/lib/tmpdir.hh" #include "db/commitlog/commitlog.hh" #include "db/commitlog/commitlog_replayer.hh" +#include "db/commitlog/commitlog_extensions.hh" #include "db/commitlog/rp_set.hh" +#include "db/extensions.hh" #include "log.hh" #include "service/priority_manager.hh" #include "test/lib/exception_utils.hh" @@ -947,3 +949,80 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) { co_await log.clear(); } } + +SEASTAR_TEST_CASE(test_commitlog_exceptions_in_allocate_ex) { + commitlog::config cfg; + + constexpr auto max_size_mb = 1; + + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count; + cfg.commitlog_sync_period_in_ms = 10; + cfg.reuse_segments = true; + cfg.allow_going_over_size_limit = false; + cfg.use_o_dsync = true; // make sure we pre-allocate. + + // not using cl_test, because we need to be able to abandon + // the log. + + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + + class myfail : public std::exception { + public: + using std::exception::exception; + }; + + struct myext: public db::commitlog_file_extension { + public: + bool fail = false; + bool thrown = false; + + seastar::future wrap_file(const seastar::sstring& filename, seastar::file f, seastar::open_flags flags) override { + if (fail && !thrown) { + thrown = true; + throw myfail{}; + } + co_return f; + } + seastar::future<> before_delete(const seastar::sstring&) override { + co_return; + } + }; + + auto ep = std::make_unique(); + auto& mx = *ep; + + db::extensions myexts; + myexts.add_commitlog_file_extension("hufflepuff", std::move(ep)); + + cfg.extensions = &myexts; + + auto log = co_await commitlog::create_commitlog(cfg); + + rp_set rps; + // uncomment for verbosity + // logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug); + + auto uuid = utils::UUID_gen::get_time_UUID(); + auto size = log.max_record_size(); + + auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) { + log.discard_completed_segments(id, rps); + mx.fail = true; + }); + + try { + while (!mx.thrown) { + rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.fill('1', size); + }); + rps.put(std::move(h)); + } + } catch (...) { + BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow..."); + } + + co_await log.shutdown(); + co_await log.clear(); +}