Merge 'Commitlog: Handle disk usage and disk footprint discrepancies, ensuring we flush when needed (#8695)​ (v3)' from Calle Wilund

Fixes #8270

If we have an allocation pattern where we leave large parts of segments "wasted" (typically because the segment has empty space, but cannot hold the mutation being added), we can have a disk usage that is below threshold, yet still get a disk footprint that is over limit causing new segment allocation to stall.

We need to take a few things into account:
1.) Need to include wasted space in the threshold check. Whether or not disk is actually used does not matter here.
2.) If we stall a segment alloc, we should just flush immediately. No point in waiting for the timer task.
3.) Need to adjust the thresholds a bit. Depending on sizes, we should probably consider start flushing once we've used up space enough to be in the last available segment, so a new one is hopefully available by the time we hit the limit.
4.) (v2) Must ensure discard/delete routines are executed. Because we can race with background disk syncs, we may need to
    issue segment prunes from end_flush() so we wake up actual file deletion/recycling
5.) (v2) Shutdown must ensure discard/delete is run after we've disabled background task etc, otherwise we might fail waking up replenish and get stuck in gate
6.) (v2) Recycling or deleting segments must be consistent, regardless of shutdown. For same reason as above.
7.) (v3) Signal recycle/delete queues/promise on shutdown (with recognized marker) to handle edge case where we only have a single (allocating) segment in the list, and cannot wake up replenisher in any more civilized way.

Also fix edge case (for tests), when we have too few segment to have an active one (i.e. need flush everything).

New attempt at this, should fix intermittent shutdown deadlocks in commitlog_test.

Closes #8764

* github.com:scylladb/scylla:
  commitlog_test: Add test case for usage/disk size threshold mismatch
  commitlog_test: Improve test assertion
  commitlog: Add waitable future for background sync/flush
  commitlog: abort queues on shutdown
  commitlog: break out "abort" calls into member functions
  commitlog: Do explicit discard+delete in shutdown
  commitlog: Recycle or not should not depend on shutdown state
  commitlog: Issue discard_unused_segments on segment::flush end IFF deletable
  commitlog: Flush all segments if we only have one.
  commitlog: Always force flush if segment allocation is waiting
  commitlog: Include segment wasted (slack) size in footprint check
  commitlog: Adjust (lower) usage threshold

(cherry picked from commit 14252c8b71)
This commit is contained in:
Avi Kivity
2021-06-23 15:43:57 +03:00
parent 7146646bf4
commit f89f4e69a0
2 changed files with 145 additions and 18 deletions

View File

@@ -316,6 +316,7 @@ public:
uint64_t buffer_list_bytes = 0;
// size on disk, actually used - i.e. containing data (allocate+cycle)
uint64_t active_size_on_disk = 0;
uint64_t wasted_size_on_disk = 0;
// size allocated on disk - i.e. files created (new, reserve, recycled)
uint64_t total_size_on_disk = 0;
uint64_t requests_blocked_memory = 0;
@@ -419,7 +420,11 @@ public:
void flush_segments(uint64_t size_to_remove);
private:
class shutdown_marker{};
future<> clear_reserve_segments();
void abort_recycled_list(std::exception_ptr);
void abort_deletion_promise(std::exception_ptr);
future<> rename_file(sstring, sstring) const;
size_t max_request_controller_units() const;
@@ -433,6 +438,7 @@ private:
timer<clock_type> _timer;
future<> replenish_reserve();
future<> _reserve_replenisher;
future<> _background_sync;
seastar::gate _gate;
uint64_t _new_counter = 0;
};
@@ -541,6 +547,9 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
void end_flush() {
_segment_manager->end_flush();
if (can_delete()) {
_segment_manager->discard_unused_segments();
}
}
public:
@@ -584,6 +593,7 @@ public:
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.active_size_on_disk -= file_position();
_segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position());
_segment_manager->add_file_to_delete(_file_name, _desc);
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
clogger.warn("Segment {} is dirty and is left on disk.", *this);
@@ -695,7 +705,14 @@ public:
}
future<sseg_ptr> close() {
_closed = true;
return sync().then([] (sseg_ptr s) { return s->flush(); }).then([] (sseg_ptr s) { return s->terminate(); });
return sync().then([] (sseg_ptr s) {
return s->flush();
}).then([](sseg_ptr s) {
return s->terminate();
}).then([](sseg_ptr s) {
s->_segment_manager->totals.wasted_size_on_disk += (s->_size_on_disk - s->file_position());
return s;
});
}
future<sseg_ptr> do_flush(uint64_t pos) {
auto me = shared_from_this();
@@ -1143,7 +1160,9 @@ db::commitlog::segment_manager::segment_manager(config c)
// our threshold for trying to force a flush. needs heristics, for now max - segment_size/2.
, disk_usage_threshold(cfg.commitlog_flush_threshold_in_mb.has_value()
? size_t(std::ceil(*cfg.commitlog_flush_threshold_in_mb / double(smp::count))) * 1024 * 1024
: (max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0)))
: (max_disk_size -
(max_disk_size >= (max_size*2) ? max_size
: (max_disk_size > (max_size/2) ? (max_size/2) : max_disk_size/3))))
, _flush_semaphore(cfg.max_active_flushes)
// That is enough concurrency to allow for our largest mutation (max_mutation_size), plus
// an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger
@@ -1153,6 +1172,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _reserve_segments(1)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
@@ -1190,6 +1210,12 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
return make_ready_future<>();
});
}).handle_exception([](std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (shutdown_marker&) {
return make_ready_future<>();
} catch (...) {
}
clogger.warn("Exception in segment reservation: {}", ep);
return sleep(100ms);
});
@@ -1334,6 +1360,10 @@ void db::commitlog::segment_manager::create_counters(const sstring& metrics_cate
sm::description("Holds a size of disk space in bytes used for data so far. "
"A too high value indicates that we have some bottleneck in the writing to sstables path.")),
sm::make_gauge("disk_slack_end_bytes", totals.wasted_size_on_disk,
sm::description("Holds a size of disk space in bytes unused because of segment switching (end slack). "
"A too high value indicates that we do not write enough data to each segment.")),
sm::make_gauge("memory_buffer_bytes", totals.buffer_list_bytes,
sm::description("Holds the total number of bytes in internal memory buffers.")),
});
@@ -1370,7 +1400,8 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
// Now get a set of used CF ids:
std::unordered_set<cf_id_type> ids;
std::for_each(_segments.begin(), _segments.end() - 1, [&ids](sseg_ptr& s) {
auto e = std::find_if(_segments.begin(), _segments.end(), std::mem_fn(&segment::is_still_allocating));
std::for_each(_segments.begin(), e, [&ids](sseg_ptr& s) {
for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) {
ids.insert(id);
}
@@ -1516,7 +1547,17 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
if (!cfg.allow_going_over_size_limit && max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
if (!f.available()) {
_new_counter = 0; // zero this so timer task does not duplicate the below flush
flush_segments(0); // force memtable flush already
}
return f.handle_exception([this](auto ep) {
try {
std::rethrow_exception(ep);
} catch (shutdown_marker&) {
throw;
} catch (...) {
}
clogger.warn("Exception while waiting for segments {}. Will retry allocation...", ep);
}).then([this] {
return allocate_segment();
@@ -1540,7 +1581,8 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
}
// if we have no reserve and we're above/at limits, make background task a little more eager.
if (!_shutdown && totals.total_size_on_disk >= disk_usage_threshold) {
auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk;
if (!_shutdown && cur >= disk_usage_threshold) {
_timer.cancel();
_timer.arm(std::chrono::milliseconds(0));
}
@@ -1708,14 +1750,28 @@ future<> db::commitlog::segment_manager::shutdown() {
return std::move(block_new_requests).then([this] (auto permits) {
_timer.cancel(); // no more timer calls
_shutdown = true; // no re-arm, no create new segments.
// do a discard + delete sweep to force
// gate holder (i.e. replenish) to wake up
discard_unused_segments();
auto f = do_pending_deletes().then([this] {
auto ep = std::make_exception_ptr(shutdown_marker{});
if (_recycled_segments.empty()) {
abort_recycled_list(ep);
}
abort_deletion_promise(ep);
return std::exchange(_background_sync, make_ready_future<>());
});
// Now first wait for periodic task to finish, then sync and close all
// segments, flushing out any remaining data.
return _gate.close().finally([this, permits = std::move(permits)] () mutable {
return shutdown_all_segments().handle_exception([permits = std::move(permits)] (std::exception_ptr ex) {
return _gate.close().then([this, f = std::move(f)]() mutable {
return std::move(f).then(std::bind(&segment_manager::shutdown_all_segments, this)).handle_exception([](std::exception_ptr ex) {
clogger.error("Shutting down all segments failed during shutdown: {}. Aborting.", ex);
abort();
});
});
}).finally([permits = std::move(permits)] { });
});
}).finally([this] {
discard_unused_segments();
@@ -1778,7 +1834,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
}
// We allow reuse of the segment if the current disk size is less than shard max.
if (!_shutdown && cfg.reuse_segments) {
if (cfg.reuse_segments) {
auto usage = totals.total_size_on_disk;
auto recycle = usage <= max_disk_size;
@@ -1826,12 +1882,22 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
// for new allocs at least. Or more likely, everything is broken, but
// we will at least make more noise.
if (recycle_error && _recycled_segments.empty()) {
_recycled_segments.abort(recycle_error);
// and ensure next lap(s) still has a queue
_recycled_segments = queue<sstring>(std::numeric_limits<size_t>::max());
abort_recycled_list(recycle_error);
}
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
// may not call here with elements in list. that would leak files.
assert(_recycled_segments.empty());
_recycled_segments.abort(ep);
// and ensure next lap(s) still has a queue
_recycled_segments = queue<sstring>(std::numeric_limits<size_t>::max());
}
void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr ep) {
std::exchange(_disk_deletions, {}).set_exception(ep);
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftc = std::exchange(_files_to_close, {});
auto ftd = std::exchange(_files_to_delete, {});
@@ -1868,9 +1934,12 @@ future<> db::commitlog::segment_manager::clear() {
* Called by timer in periodic mode.
*/
void db::commitlog::segment_manager::sync() {
for (auto s : _segments) {
(void)s->sync(); // we do not care about waiting...
}
auto f = std::exchange(_background_sync, make_ready_future<>());
_background_sync = parallel_for_each(_segments, [](sseg_ptr s) {
return s->sync().discard_result();
}).then([f = std::move(f)]() mutable {
return std::move(f);
});
}
void db::commitlog::segment_manager::on_timer() {
@@ -1885,10 +1954,11 @@ void db::commitlog::segment_manager::on_timer() {
// above threshold, request flush.
if (_new_counter > 0) {
auto max = disk_usage_threshold;
auto cur = totals.active_size_on_disk;
auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk;
if (max != 0 && cur >= max) {
_new_counter = 0;
clogger.debug("Used size on disk {} MB exceeds local threshold {} MB", cur / (1024 * 1024), max / (1024 * 1024));
_new_counter = 0;
flush_segments(cur - max);
}
}
@@ -2512,7 +2582,10 @@ uint64_t db::commitlog::disk_footprint() const {
}
uint64_t db::commitlog::get_total_size() const {
return _segment_manager->totals.active_size_on_disk + _segment_manager->totals.buffer_list_bytes;
return _segment_manager->totals.active_size_on_disk
+ _segment_manager->totals.wasted_size_on_disk
+ _segment_manager->totals.buffer_list_bytes
;
}
uint64_t db::commitlog::get_completed_tasks() const {