commitlog: Recalculate footprint on delete_segment exceptions

Fixes #9348

If we get exceptions in delete_segments, we can, and probably will, loose
track of footprint counters. We need to recompute the used disk footprint,
otherwise we will flush too often, and even block indefinately on new_seg
iff using hard limits.
This commit is contained in:
Calle Wilund
2021-09-15 08:04:21 +00:00
parent 810e410c5d
commit 11bb03e46d
2 changed files with 74 additions and 2 deletions

View File

@@ -428,6 +428,8 @@ private:
void abort_recycled_list(std::exception_ptr);
void abort_deletion_promise(std::exception_ptr);
future<> recalculate_footprint();
future<> rename_file(sstring, sstring) const;
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
@@ -444,6 +446,7 @@ private:
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
seastar::semaphore _reserve_recalculation_guard;
};
template<typename T>
@@ -1225,6 +1228,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
, _reserve_recalculation_guard(1)
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
@@ -1250,6 +1254,11 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
}
try {
gate::holder g(_gate);
auto guard = get_units(_reserve_recalculation_guard, 1);
if (_reserve_segments.full()) {
// can happen if we recalculate
continue;
}
// note: if we were strict with disk size, we would refuse to do this
// unless disk footprint is lower than threshold. but we cannot (yet?)
// trust that flush logic will absolutely free up an existing
@@ -1872,7 +1881,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
std::exception_ptr recycle_error;
size_t num_deleted = 0;
bool except = false;
while (!files.empty()) {
auto filename = std::move(files.back());
files.pop_back();
@@ -1925,6 +1934,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
++num_deleted;
} catch (...) {
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
except = true;
}
}
@@ -1942,6 +1952,11 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
if (!recycle_error && num_deleted && cfg.reuse_segments && _recycled_segments.empty()) {
abort_recycled_list(std::make_exception_ptr(std::runtime_error("deleted files")));
}
// #9348 - if we had an exception, we can't trust our bookeep any more. recalculate.
if (except) {
co_await recalculate_footprint();
}
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
@@ -1956,6 +1971,63 @@ void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr e
std::exchange(_disk_deletions, {}).set_exception(ep);
}
future<> db::commitlog::segment_manager::recalculate_footprint() {
try {
co_await do_pending_deletes();
auto guard = get_units(_reserve_recalculation_guard, 1);
auto segments_copy = _segments;
std::vector<sseg_ptr> reserves;
std::vector<sstring> recycles;
// this causes haywire things while we steal stuff, but...
while (!_reserve_segments.empty()) {
reserves.push_back(_reserve_segments.pop());
}
while (!_recycled_segments.empty()) {
recycles.push_back(_recycled_segments.pop());
}
// first, guesstimate sizes
uint64_t recycle_size = recycles.size() * max_size;
auto old = totals.total_size_on_disk;
totals.total_size_on_disk = recycle_size;
for (auto& s : _segments) {
totals.total_size_on_disk += s->_size_on_disk;
}
for (auto& s : reserves) {
totals.total_size_on_disk += s->_size_on_disk;
}
// now we need to adjust the actual sizes of recycled files
uint64_t actual_recycled_size = 0;
try {
for (auto& filename : recycles) {
auto s = co_await seastar::file_size(filename);
actual_recycled_size += s;
}
} catch (...) {
clogger.error("Exception reading disk footprint ({}).", std::current_exception());
actual_recycled_size = recycle_size; // best we got
}
for (auto&& filename : recycles) {
_recycled_segments.push(std::move(filename));
}
for (auto&& s : reserves) {
_reserve_segments.push(std::move(s)); // you can have it back now.
}
totals.total_size_on_disk += actual_recycled_size - recycle_size;
// pushing things to reserve/recycled queues will have resumed any
// waiters, so we should be done.
} catch (...) {
clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
}
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftc = std::exchange(_files_to_close, {});
auto ftd = std::exchange(_files_to_delete, {});