commitlog_test: Add test for deadlock in recycle waiter

Not a very good test, mind you. Nothing to verify, just see if
the test times out. But try to make it at least complete for
failure report.
This commit is contained in:
Calle Wilund
2021-04-06 13:10:21 +00:00
parent e991e01f2e
commit 813694b617
3 changed files with 91 additions and 0 deletions

View File

@@ -2449,6 +2449,14 @@ std::vector<sstring> db::commitlog::get_active_segment_names() const {
return _segment_manager->get_active_names();
}
uint64_t db::commitlog::disk_limit() const {
return _segment_manager->max_disk_size;
}
uint64_t db::commitlog::disk_footprint() const {
return _segment_manager->totals.total_size_on_disk;
}
uint64_t db::commitlog::get_total_size() const {
return _segment_manager->totals.active_size_on_disk + _segment_manager->totals.buffer_list_bytes;
}

View File

@@ -336,6 +336,16 @@ public:
*/
uint64_t max_active_flushes() const;
/**
* Return disk footprint
*/
uint64_t disk_footprint() const;
/**
* Return configured disk footprint limit
*/
uint64_t disk_limit() const;
future<> clear();
const config& active_config() const;

View File

@@ -28,6 +28,7 @@
#include <unordered_map>
#include <unordered_set>
#include <set>
#include <deque>
#include <seastar/testing/test_case.hh>
#include <seastar/core/coroutine.hh>
@@ -748,3 +749,75 @@ SEASTAR_TEST_CASE(test_commitlog_new_segment_odsync){
}
});
}
// Test for #8363
// try to provoke edge case where we race segment deletion
// and waiting for recycled to be replenished.
SEASTAR_TEST_CASE(test_commitlog_deadlock_in_recycle) {
commitlog::config cfg;
constexpr auto max_size_mb = 2;
cfg.commitlog_segment_size_in_mb = max_size_mb;
// ensure total size per shard is not multiple of segment size.
cfg.commitlog_total_space_in_mb = 5 * smp::count;
cfg.commitlog_sync_period_in_ms = 10;
cfg.reuse_segments = true;
cfg.allow_going_over_size_limit = false;
cfg.use_o_dsync = true; // make sure we pre-allocate.
// not using cl_test, because we need to be able to abandon
// the log.
tmpdir tmp;
cfg.commit_log_location = tmp.path().string();
auto log = co_await commitlog::create_commitlog(cfg);
rp_set rps;
std::deque<rp_set> queue;
size_t n = 0;
// uncomment for verbosity
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
auto uuid = utils::UUID_gen::get_time_UUID();
auto size = log.max_record_size() / 2;
timer<> t;
t.set_callback([&] {
while (!queue.empty()) {
auto flush = std::move(queue.front());
queue.pop_front();
log.discard_completed_segments(uuid, flush);
++n;
};
});
// add a flush handler that delays releasing things until disk threshold is reached.
auto r = log.add_flush_handler([&](cf_id_type, replay_position pos) {
auto old = std::exchange(rps, rp_set{});
queue.emplace_back(std::move(old));
if (log.disk_footprint() >= log.disk_limit() && !t.armed()) {
t.arm(5s);
}
});
bool release = true;
try {
while (n < 10) {
auto now = timeout_clock::now();
rp_handle h = co_await with_timeout(now + 30s, log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.fill('1', size);
}));
rps.put(std::move(h));
}
} catch (timed_out_error&) {
BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
release = false;
}
if (release) {
co_await log.shutdown();
co_await log.clear();
}
}