|
|
|
|
@@ -284,6 +284,7 @@ public:
|
|
|
|
|
using request_controller_type = basic_semaphore<timeout_exception_factory, db::timeout_clock>;
|
|
|
|
|
using request_controller_units = semaphore_units<timeout_exception_factory, db::timeout_clock>;
|
|
|
|
|
request_controller_type _request_controller;
|
|
|
|
|
shared_promise<> _disk_deletions;
|
|
|
|
|
|
|
|
|
|
std::optional<shared_future<with_clock<db::timeout_clock>>> _segment_allocating;
|
|
|
|
|
std::unordered_map<sstring, descriptor> _files_to_delete;
|
|
|
|
|
@@ -413,7 +414,7 @@ public:
|
|
|
|
|
_flush_handlers.erase(id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void flush_segments(bool = false);
|
|
|
|
|
void flush_segments(uint64_t size_to_remove);
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
future<> clear_reserve_segments();
|
|
|
|
|
@@ -423,7 +424,7 @@ private:
|
|
|
|
|
segment_id_type _ids = 0;
|
|
|
|
|
std::vector<sseg_ptr> _segments;
|
|
|
|
|
queue<sseg_ptr> _reserve_segments;
|
|
|
|
|
std::deque<sstring> _recycled_segments;
|
|
|
|
|
queue<sstring> _recycled_segments;
|
|
|
|
|
std::unordered_map<flush_handler_id, flush_handler> _flush_handlers;
|
|
|
|
|
flush_handler_id _flush_ids = 0;
|
|
|
|
|
replay_position _flush_position;
|
|
|
|
|
@@ -1138,7 +1139,9 @@ db::commitlog::segment_manager::segment_manager(config c)
|
|
|
|
|
, max_mutation_size(max_size >> 1)
|
|
|
|
|
, max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024)
|
|
|
|
|
// our threshold for trying to force a flush. needs heristics, for now max - segment_size/2.
|
|
|
|
|
, disk_usage_threshold(max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0))
|
|
|
|
|
, disk_usage_threshold(cfg.commitlog_flush_threshold_in_mb.has_value()
|
|
|
|
|
? size_t(std::ceil(*cfg.commitlog_flush_threshold_in_mb / double(smp::count))) * 1024 * 1024
|
|
|
|
|
: (max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0)))
|
|
|
|
|
, _flush_semaphore(cfg.max_active_flushes)
|
|
|
|
|
// That is enough concurrency to allow for our largest mutation (max_mutation_size), plus
|
|
|
|
|
// an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger
|
|
|
|
|
@@ -1146,6 +1149,7 @@ db::commitlog::segment_manager::segment_manager(config c)
|
|
|
|
|
// always be admitted for processing.
|
|
|
|
|
, _request_controller(max_request_controller_units(), request_controller_timeout_exception_factory{})
|
|
|
|
|
, _reserve_segments(1)
|
|
|
|
|
, _recycled_segments(std::numeric_limits<size_t>::max())
|
|
|
|
|
, _reserve_replenisher(make_ready_future<>())
|
|
|
|
|
{
|
|
|
|
|
assert(max_size > 0);
|
|
|
|
|
@@ -1333,7 +1337,7 @@ void db::commitlog::segment_manager::create_counters(const sstring& metrics_cate
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void db::commitlog::segment_manager::flush_segments(bool force) {
|
|
|
|
|
void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
|
|
|
|
|
if (_segments.empty()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
@@ -1346,10 +1350,22 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
|
|
|
|
|
|
|
|
|
|
// But if all segments are closed or we force-flush,
|
|
|
|
|
// include all.
|
|
|
|
|
if (force || !active->is_still_allocating()) {
|
|
|
|
|
if (!active->is_still_allocating()) {
|
|
|
|
|
high = replay_position(high.id + 1, 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto n = size_to_remove;
|
|
|
|
|
|
|
|
|
|
if (size_to_remove != 0) {
|
|
|
|
|
for (auto& s : _segments) {
|
|
|
|
|
if (n <= s->_size_on_disk) {
|
|
|
|
|
high = replay_position(s->_desc.id, db::position_type(s->_size_on_disk));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
n -= s->_size_on_disk;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Now get a set of used CF ids:
|
|
|
|
|
std::unordered_set<cf_id_type> ids;
|
|
|
|
|
std::for_each(_segments.begin(), _segments.end() - 1, [&ids](sseg_ptr& s) {
|
|
|
|
|
@@ -1358,7 +1374,7 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
clogger.debug("Flushing ({}) to {}", force, high);
|
|
|
|
|
clogger.debug("Flushing ({} MB) to {}", size_to_remove/(1024*1024), high);
|
|
|
|
|
|
|
|
|
|
// For each CF id: for each callback c: call c(id, high)
|
|
|
|
|
for (auto& f : callbacks) {
|
|
|
|
|
@@ -1471,8 +1487,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!_recycled_segments.empty()) {
|
|
|
|
|
auto src = std::move(_recycled_segments.front());
|
|
|
|
|
_recycled_segments.pop_front();
|
|
|
|
|
auto src = _recycled_segments.pop();
|
|
|
|
|
// Note: we have to do the rename here to ensure
|
|
|
|
|
// proper descriptor id order. If we renamed in the delete call
|
|
|
|
|
// that recycled the file we could potentially have
|
|
|
|
|
@@ -1483,6 +1498,14 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
|
|
|
|
|
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
|
|
|
|
|
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
|
|
|
|
|
return f.then([this] {
|
|
|
|
|
return allocate_segment();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return allocate_segment_ex(std::move(d), std::move(dst), flags|open_flags::create);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1493,11 +1516,19 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
|
|
|
|
|
|
|
|
|
++_new_counter;
|
|
|
|
|
|
|
|
|
|
// don't increase reserve count if we are at max, or we would go over disk limit.
|
|
|
|
|
if (_reserve_segments.empty() && (_reserve_segments.max_size() < cfg.max_reserve_segments) && (totals.total_size_on_disk + max_size) <= max_disk_size) {
|
|
|
|
|
_reserve_segments.set_max_size(_reserve_segments.max_size() + 1);
|
|
|
|
|
clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
|
|
|
|
|
if (_reserve_segments.empty()) {
|
|
|
|
|
// don't increase reserve count if we are at max, or we would go over disk limit.
|
|
|
|
|
if (_reserve_segments.max_size() < cfg.max_reserve_segments && (totals.total_size_on_disk + max_size) <= max_disk_size) {
|
|
|
|
|
_reserve_segments.set_max_size(_reserve_segments.max_size() + 1);
|
|
|
|
|
clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
|
|
|
|
|
}
|
|
|
|
|
// if we have no reserve and we're above/at limits, make background task a little more eager.
|
|
|
|
|
if (!_shutdown && totals.total_size_on_disk >= disk_usage_threshold) {
|
|
|
|
|
_timer.cancel();
|
|
|
|
|
_timer.arm(std::chrono::milliseconds(0));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return _reserve_segments.pop_eventually().then([this] (auto s) {
|
|
|
|
|
_segments.push_back(std::move(s));
|
|
|
|
|
_segments.back()->reset_sync_time();
|
|
|
|
|
@@ -1608,14 +1639,21 @@ future<> db::commitlog::segment_manager::clear_reserve_segments() {
|
|
|
|
|
_reserve_segments.pop();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto re = std::exchange(_recycled_segments, {});
|
|
|
|
|
auto i = re.begin();
|
|
|
|
|
auto e = re.end();
|
|
|
|
|
std::vector<sstring> tmp;
|
|
|
|
|
tmp.reserve(_recycled_segments.size());
|
|
|
|
|
|
|
|
|
|
_recycled_segments.consume([&](sstring s) {
|
|
|
|
|
tmp.emplace_back(std::move(s));
|
|
|
|
|
return true;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
auto i = tmp.begin();
|
|
|
|
|
auto e = tmp.end();
|
|
|
|
|
|
|
|
|
|
return parallel_for_each(i, e, [this](const sstring& filename) {
|
|
|
|
|
clogger.debug("Deleting recycled segment file {}", filename);
|
|
|
|
|
return delete_file(filename);
|
|
|
|
|
}).finally([this, re = std::move(re)] {
|
|
|
|
|
}).finally([this, tmp = std::move(tmp)] {
|
|
|
|
|
return do_pending_deletes();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
@@ -1684,7 +1722,10 @@ future<> db::commitlog::segment_manager::delete_file(const sstring& filename) {
|
|
|
|
|
return seastar::file_size(filename).then([this, filename](uint64_t size) {
|
|
|
|
|
clogger.debug("Deleting segment file {}", filename);
|
|
|
|
|
return commit_io_check(&seastar::remove_file, filename).then([this, size] {
|
|
|
|
|
totals.total_size_on_disk -= size;
|
|
|
|
|
clogger.trace("Reclaimed {} MB", size/(1024*1024));
|
|
|
|
|
totals.total_size_on_disk -= size;
|
|
|
|
|
auto p = std::exchange(_disk_deletions, {});
|
|
|
|
|
p.set_value();
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
@@ -1704,7 +1745,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
|
|
|
|
return f.finally([&] {
|
|
|
|
|
// We allow reuse of the segment if the current disk size is less than shard max.
|
|
|
|
|
auto usage = totals.total_size_on_disk;
|
|
|
|
|
if (!_shutdown && cfg.reuse_segments && usage <= max_disk_size) {
|
|
|
|
|
if (!_shutdown && cfg.reuse_segments) {
|
|
|
|
|
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
|
|
|
|
|
auto dst = this->filename(d);
|
|
|
|
|
|
|
|
|
|
@@ -1712,8 +1753,9 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
|
|
|
|
|
// must rename the file since we must ensure the
|
|
|
|
|
// data is not replayed. Changing the name will
|
|
|
|
|
// cause header ID to be invalid in the file -> ignored
|
|
|
|
|
return rename_file(filename, dst).then([this, dst] {
|
|
|
|
|
_recycled_segments.emplace_back(dst);
|
|
|
|
|
return rename_file(filename, dst).then([this, dst]() mutable {
|
|
|
|
|
auto b = _recycled_segments.push(std::move(dst));
|
|
|
|
|
assert(b); // we set this to max_size_t so...
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}).handle_exception([this, filename](auto&&) {
|
|
|
|
|
return delete_file(filename);
|
|
|
|
|
@@ -1782,8 +1824,8 @@ void db::commitlog::segment_manager::on_timer() {
|
|
|
|
|
auto cur = totals.active_size_on_disk;
|
|
|
|
|
if (max != 0 && cur >= max) {
|
|
|
|
|
_new_counter = 0;
|
|
|
|
|
clogger.debug("Used size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024));
|
|
|
|
|
flush_segments();
|
|
|
|
|
clogger.debug("Used size on disk {} MB exceeds local threshold {} MB", cur / (1024 * 1024), max / (1024 * 1024));
|
|
|
|
|
flush_segments(cur - max);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return do_pending_deletes();
|
|
|
|
|
|