commitlog: coroutinize segment_manager::allocate_segment
This commit is contained in:
@@ -1522,46 +1522,44 @@ future<> db::commitlog::segment_manager::rename_file(sstring from, sstring to) c
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment() {
|
||||
descriptor d(next_id(), cfg.fname_prefix);
|
||||
auto dst = filename(d);
|
||||
auto flags = open_flags::wo;
|
||||
if (cfg.use_o_dsync) {
|
||||
flags |= open_flags::dsync;
|
||||
}
|
||||
|
||||
if (!_recycled_segments.empty()) {
|
||||
auto src = _recycled_segments.pop();
|
||||
// Note: we have to do the rename here to ensure
|
||||
// proper descriptor id order. If we renamed in the delete call
|
||||
// that recycled the file we could potentially have
|
||||
// out-of-order files. (Sort does not help).
|
||||
clogger.debug("Using recycled segment file {} -> {}", src, dst);
|
||||
return rename_file(std::move(src), dst).then([this, d = std::move(d), dst = std::move(dst), flags] () mutable {
|
||||
return allocate_segment_ex(std::move(d), std::move(dst), flags);
|
||||
});
|
||||
}
|
||||
|
||||
if (!cfg.allow_going_over_size_limit && max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
|
||||
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
|
||||
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
|
||||
if (!f.available()) {
|
||||
_new_counter = 0; // zero this so timer task does not duplicate the below flush
|
||||
flush_segments(0); // force memtable flush already
|
||||
for (;;) {
|
||||
descriptor d(next_id(), cfg.fname_prefix);
|
||||
auto dst = filename(d);
|
||||
auto flags = open_flags::wo;
|
||||
if (cfg.use_o_dsync) {
|
||||
flags |= open_flags::dsync;
|
||||
}
|
||||
return f.handle_exception([this](auto ep) {
|
||||
|
||||
if (!_recycled_segments.empty()) {
|
||||
auto src = _recycled_segments.pop();
|
||||
// Note: we have to do the rename here to ensure
|
||||
// proper descriptor id order. If we renamed in the delete call
|
||||
// that recycled the file we could potentially have
|
||||
// out-of-order files. (Sort does not help).
|
||||
clogger.debug("Using recycled segment file {} -> {}", src, dst);
|
||||
co_await rename_file(src, dst);
|
||||
co_return co_await allocate_segment_ex(std::move(d), std::move(dst), flags);
|
||||
}
|
||||
|
||||
if (!cfg.allow_going_over_size_limit && max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
|
||||
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
|
||||
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
|
||||
if (!f.available()) {
|
||||
_new_counter = 0; // zero this so timer task does not duplicate the below flush
|
||||
flush_segments(0); // force memtable flush already
|
||||
}
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
co_await std::move(f);
|
||||
} catch (shutdown_marker&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
clogger.warn("Exception while waiting for segments {}. Will retry allocation...", std::current_exception());
|
||||
}
|
||||
clogger.warn("Exception while waiting for segments {}. Will retry allocation...", ep);
|
||||
}).then([this] {
|
||||
return allocate_segment();
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
return allocate_segment_ex(std::move(d), std::move(dst), flags|open_flags::create);
|
||||
co_return co_await allocate_segment_ex(std::move(d), std::move(dst), flags|open_flags::create);
|
||||
}
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::new_segment() {
|
||||
|
||||
Reference in New Issue
Block a user