Merge 'Make commitlog disk limit a hard limit.' from Calle Wilund

Refs #6148

Commitlog disk limit was previously a "soft" limit, in that we allowed allocating new segments, even if we were over
disk usage max. This would also cause us sometimes to create new segments and delete old ones, if badly timed in
needing and releasing segments, in turn causing useless disk IO for pre-allocation/zeroing.

This patch set does:
* Make limit a hard limit. If we have disk usage > max, we wait for delete or recycle.
* Make flush threshold configurable. Default is ask for flush when over 50% usage. (We do not wait for results)
* Make flush "partial". We flush X% of the used space (used - thres/2), and make the rp limit accordingly. This means we will try to clear the N oldest segments, not all. I.e. "lighter" flush. Of course, if the CL is wholly dominated by a single CF, this will not really help much. But when > 1 cf is used, it means we can skip those not having unflushed data < req rp.
* Force more eager flush/recycle if we're out of segments

Note: flush threshold is not exposed in scylla config (yet). Because I am unsure of wording, and even if it should.
Note: testing is sparse, esp. in regard to latency/timeouts added in high usage scenarios. While I can fairly easily provoke "stalls" (i.e. forced waiting for segments to free up) with simple C-S, it is hard to say exactly where in a more sane config (I set my limits looow) latencies will start accumulating.

Closes #7879

* github.com:scylladb/scylla:
  commitlog: Force earlier cycle/flush iff segment reserve is empty
  commitlog: Make segment allocation wait iff disk usage > max
  commitlog: Do partial (memtable) flushing based on threshold
  commitlog: Make flush threshold configurable
  table: Add a flush RP mark to table, and shortcut if not above
This commit is contained in:
Avi Kivity
2021-02-08 16:44:05 +02:00
5 changed files with 75 additions and 26 deletions

View File

@@ -867,7 +867,7 @@ database::init_commitlog() {
return;
}
// Initiate a background flush. Waited upon in `stop()`.
(void)_column_families[id]->flush();
(void)_column_families[id]->flush(pos);
}).release(); // we have longer life time than CL. Ignore reg anchor
});
}

View File

@@ -448,6 +448,7 @@ private:
std::optional<int64_t> _sstable_generation = {};
db::replay_position _highest_rp;
db::replay_position _flush_rp;
db::replay_position _lowest_allowed_rp;
// Provided by the database that owns this commitlog
@@ -750,7 +751,7 @@ public:
void start();
future<> stop();
future<> flush();
future<> flush(std::optional<db::replay_position> = {});
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);

View File

@@ -284,6 +284,7 @@ public:
using request_controller_type = basic_semaphore<timeout_exception_factory, db::timeout_clock>;
using request_controller_units = semaphore_units<timeout_exception_factory, db::timeout_clock>;
request_controller_type _request_controller;
shared_promise<> _disk_deletions;
std::optional<shared_future<with_clock<db::timeout_clock>>> _segment_allocating;
std::unordered_map<sstring, descriptor> _files_to_delete;
@@ -413,7 +414,7 @@ public:
_flush_handlers.erase(id);
}
void flush_segments(bool = false);
void flush_segments(uint64_t size_to_remove);
private:
future<> clear_reserve_segments();
@@ -423,7 +424,7 @@ private:
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
queue<sseg_ptr> _reserve_segments;
std::deque<sstring> _recycled_segments;
queue<sstring> _recycled_segments;
std::unordered_map<flush_handler_id, flush_handler> _flush_handlers;
flush_handler_id _flush_ids = 0;
replay_position _flush_position;
@@ -1138,7 +1139,9 @@ db::commitlog::segment_manager::segment_manager(config c)
, max_mutation_size(max_size >> 1)
, max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024)
// our threshold for trying to force a flush. needs heristics, for now max - segment_size/2.
, disk_usage_threshold(max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0))
, disk_usage_threshold(cfg.commitlog_flush_threshold_in_mb.has_value()
? size_t(std::ceil(*cfg.commitlog_flush_threshold_in_mb / double(smp::count))) * 1024 * 1024
: (max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0)))
, _flush_semaphore(cfg.max_active_flushes)
// That is enough concurrency to allow for our largest mutation (max_mutation_size), plus
// an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger
@@ -1146,6 +1149,7 @@ db::commitlog::segment_manager::segment_manager(config c)
// always be admitted for processing.
, _request_controller(max_request_controller_units(), request_controller_timeout_exception_factory{})
, _reserve_segments(1)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
{
assert(max_size > 0);
@@ -1333,7 +1337,7 @@ void db::commitlog::segment_manager::create_counters(const sstring& metrics_cate
});
}
void db::commitlog::segment_manager::flush_segments(bool force) {
void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
if (_segments.empty()) {
return;
}
@@ -1346,10 +1350,22 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
// But if all segments are closed or we force-flush,
// include all.
if (force || !active->is_still_allocating()) {
if (!active->is_still_allocating()) {
high = replay_position(high.id + 1, 0);
}
auto n = size_to_remove;
if (size_to_remove != 0) {
for (auto& s : _segments) {
if (n <= s->_size_on_disk) {
high = replay_position(s->_desc.id, db::position_type(s->_size_on_disk));
break;
}
n -= s->_size_on_disk;
}
}
// Now get a set of used CF ids:
std::unordered_set<cf_id_type> ids;
std::for_each(_segments.begin(), _segments.end() - 1, [&ids](sseg_ptr& s) {
@@ -1358,7 +1374,7 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
}
});
clogger.debug("Flushing ({}) to {}", force, high);
clogger.debug("Flushing ({} MB) to {}", size_to_remove/(1024*1024), high);
// For each CF id: for each callback c: call c(id, high)
for (auto& f : callbacks) {
@@ -1471,8 +1487,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
}
if (!_recycled_segments.empty()) {
auto src = std::move(_recycled_segments.front());
_recycled_segments.pop_front();
auto src = _recycled_segments.pop();
// Note: we have to do the rename here to ensure
// proper descriptor id order. If we renamed in the delete call
// that recycled the file we could potentially have
@@ -1483,6 +1498,14 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
});
}
if (max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
return f.then([this] {
return allocate_segment();
});
}
return allocate_segment_ex(std::move(d), std::move(dst), flags|open_flags::create);
}
@@ -1493,11 +1516,19 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
++_new_counter;
// don't increase reserve count if we are at max, or we would go over disk limit.
if (_reserve_segments.empty() && (_reserve_segments.max_size() < cfg.max_reserve_segments) && (totals.total_size_on_disk + max_size) <= max_disk_size) {
_reserve_segments.set_max_size(_reserve_segments.max_size() + 1);
clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
if (_reserve_segments.empty()) {
// don't increase reserve count if we are at max, or we would go over disk limit.
if (_reserve_segments.max_size() < cfg.max_reserve_segments && (totals.total_size_on_disk + max_size) <= max_disk_size) {
_reserve_segments.set_max_size(_reserve_segments.max_size() + 1);
clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
}
// if we have no reserve and we're above/at limits, make background task a little more eager.
if (!_shutdown && totals.total_size_on_disk >= disk_usage_threshold) {
_timer.cancel();
_timer.arm(std::chrono::milliseconds(0));
}
}
return _reserve_segments.pop_eventually().then([this] (auto s) {
_segments.push_back(std::move(s));
_segments.back()->reset_sync_time();
@@ -1608,14 +1639,21 @@ future<> db::commitlog::segment_manager::clear_reserve_segments() {
_reserve_segments.pop();
}
auto re = std::exchange(_recycled_segments, {});
auto i = re.begin();
auto e = re.end();
std::vector<sstring> tmp;
tmp.reserve(_recycled_segments.size());
_recycled_segments.consume([&](sstring s) {
tmp.emplace_back(std::move(s));
return true;
});
auto i = tmp.begin();
auto e = tmp.end();
return parallel_for_each(i, e, [this](const sstring& filename) {
clogger.debug("Deleting recycled segment file {}", filename);
return delete_file(filename);
}).finally([this, re = std::move(re)] {
}).finally([this, tmp = std::move(tmp)] {
return do_pending_deletes();
});
}
@@ -1684,7 +1722,10 @@ future<> db::commitlog::segment_manager::delete_file(const sstring& filename) {
return seastar::file_size(filename).then([this, filename](uint64_t size) {
clogger.debug("Deleting segment file {}", filename);
return commit_io_check(&seastar::remove_file, filename).then([this, size] {
totals.total_size_on_disk -= size;
clogger.trace("Reclaimed {} MB", size/(1024*1024));
totals.total_size_on_disk -= size;
auto p = std::exchange(_disk_deletions, {});
p.set_value();
});
});
}
@@ -1704,7 +1745,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
return f.finally([&] {
// We allow reuse of the segment if the current disk size is less than shard max.
auto usage = totals.total_size_on_disk;
if (!_shutdown && cfg.reuse_segments && usage <= max_disk_size) {
if (!_shutdown && cfg.reuse_segments) {
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
auto dst = this->filename(d);
@@ -1712,8 +1753,9 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
// must rename the file since we must ensure the
// data is not replayed. Changing the name will
// cause header ID to be invalid in the file -> ignored
return rename_file(filename, dst).then([this, dst] {
_recycled_segments.emplace_back(dst);
return rename_file(filename, dst).then([this, dst]() mutable {
auto b = _recycled_segments.push(std::move(dst));
assert(b); // we set this to max_size_t so...
return make_ready_future<>();
}).handle_exception([this, filename](auto&&) {
return delete_file(filename);
@@ -1782,8 +1824,8 @@ void db::commitlog::segment_manager::on_timer() {
auto cur = totals.active_size_on_disk;
if (max != 0 && cur >= max) {
_new_counter = 0;
clogger.debug("Used size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024));
flush_segments();
clogger.debug("Used size on disk {} MB exceeds local threshold {} MB", cur / (1024 * 1024), max / (1024 * 1024));
flush_segments(cur - max);
}
}
return do_pending_deletes();

View File

@@ -123,6 +123,7 @@ public:
sstring commit_log_location;
sstring metrics_category_name;
uint64_t commitlog_total_space_in_mb = 0;
std::optional<uint64_t> commitlog_flush_threshold_in_mb = {};
uint64_t commitlog_segment_size_in_mb = 32;
uint64_t commitlog_sync_period_in_ms = 10 * 1000; //TODO: verify default!
// Max number of segments to keep in pre-alloc reserve.

View File

@@ -1316,9 +1316,14 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
});
}
future<> table::flush() {
future<> table::flush(std::optional<db::replay_position> pos) {
if (pos && *pos < _flush_rp) {
return make_ready_future<>();
}
auto op = _pending_flushes_phaser.start();
return _memtables->request_flush().then([op = std::move(op)] {});
return _memtables->request_flush().then([this, op = std::move(op), fp = _highest_rp] {
_flush_rp = std::max(_flush_rp, fp);
});
}
bool table::can_flush() const {