db::commitlog: Do segment delete async + force replay delete go via CL
Refs #2858 Push segement files to be deleted to a pending list, and process at intervals or flush-requests (or shutdown). Note that we do _not_ indescrimenately do deletes in non-anchored tasks, because we need to guarantee that finshed segments are fully deleted and gone on CL shutdown, not to be mistaken for replayables. Also make sure we delete segments replayed via commitlog call, so IFF we add metadata processing for CL, we can clear it out.
This commit is contained in:
@@ -198,6 +198,7 @@ public:
|
||||
request_controller_type _request_controller;
|
||||
|
||||
stdx::optional<shared_future<with_clock<db::timeout_clock>>> _segment_allocating;
|
||||
std::unordered_map<sstring, descriptor> _files_to_delete;
|
||||
|
||||
void account_memory_usage(size_t size) {
|
||||
_request_controller.consume(size);
|
||||
@@ -276,6 +277,11 @@ public:
|
||||
|
||||
future<> orphan_all();
|
||||
|
||||
void add_file_to_delete(sstring, descriptor);
|
||||
future<> do_pending_deletes();
|
||||
|
||||
future<> delete_segments(std::vector<sstring>);
|
||||
|
||||
void discard_unused_segments();
|
||||
void discard_completed_segments(const cf_id_type&);
|
||||
void discard_completed_segments(const cf_id_type&, const rp_set&);
|
||||
@@ -443,16 +449,11 @@ public:
|
||||
}
|
||||
~segment() {
|
||||
if (is_clean()) {
|
||||
clogger.debug("Segment {} is no longer active and will be deleted now", *this);
|
||||
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
|
||||
++_segment_manager->totals.segments_destroyed;
|
||||
_segment_manager->totals.total_size_on_disk -= size_on_disk();
|
||||
_segment_manager->totals.total_size -= (size_on_disk() + _buffer.size());
|
||||
try {
|
||||
commit_io_check([] (const char* fname) { ::unlink(fname); },
|
||||
_file_name.c_str());
|
||||
} catch (...) {
|
||||
clogger.error("Could not delete segment {}: {}", *this, std::current_exception());
|
||||
}
|
||||
_segment_manager->add_file_to_delete(_file_name, _desc);
|
||||
} else {
|
||||
clogger.warn("Segment {} is dirty and is left on disk.", *this);
|
||||
}
|
||||
@@ -1292,15 +1293,23 @@ void db::commitlog::segment_manager::discard_unused_segments() {
|
||||
if (i != _segments.end()) {
|
||||
_segments.erase(i, _segments.end());
|
||||
}
|
||||
|
||||
// launch in background, but guard with gate so this deletion is
|
||||
// sure to finish in shutdown, because at least through this path,
|
||||
// segments on deletion queue could be non-empty, and we don't want
|
||||
// those accidentally left around for replay.
|
||||
if (!_shutdown) {
|
||||
with_gate(_gate, [this] {
|
||||
return do_pending_deletes();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: pop() will call unlink -> sleeping in reactor thread.
|
||||
// Not urgent since mostly called during shutdown, but have to fix.
|
||||
future<> db::commitlog::segment_manager::clear_reserve_segments() {
|
||||
while (!_reserve_segments.empty()) {
|
||||
_reserve_segments.pop();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return do_pending_deletes();
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
|
||||
@@ -1345,6 +1354,32 @@ future<> db::commitlog::segment_manager::shutdown() {
|
||||
return _shutdown_promise->get_shared_future();
|
||||
}
|
||||
|
||||
void db::commitlog::segment_manager::add_file_to_delete(sstring filename, descriptor d) {
|
||||
assert(!_files_to_delete.count(filename));
|
||||
_files_to_delete.emplace(std::move(filename), std::move(d));
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> files) {
|
||||
auto i = files.begin();
|
||||
auto e = files.end();
|
||||
|
||||
return parallel_for_each(i, e, [this](auto& filename) {
|
||||
// doing this intentionally hung on a future to
|
||||
// allow adding extension processing here once they exist.
|
||||
auto f = make_ready_future();
|
||||
return f.finally([&] {
|
||||
clogger.debug("Deleting segment file {}", filename);
|
||||
return commit_io_check(&seastar::remove_file, filename);
|
||||
}).handle_exception([&filename](auto ep) {
|
||||
clogger.error("Could not delete segment {}: {}", filename, ep);
|
||||
});
|
||||
}).finally([files = std::move(files)] {});
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
return delete_segments(boost::copy_range<std::vector<sstring>>(std::exchange(_files_to_delete, {}) | boost::adaptors::map_keys));
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::orphan_all() {
|
||||
_segments.clear();
|
||||
return clear_reserve_segments();
|
||||
@@ -1393,7 +1428,7 @@ void db::commitlog::segment_manager::on_timer() {
|
||||
flush_segments();
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return do_pending_deletes();
|
||||
});
|
||||
arm();
|
||||
}
|
||||
@@ -1922,10 +1957,14 @@ future<std::vector<sstring>> db::commitlog::list_existing_segments(const sstring
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<sstring> db::commitlog::get_segments_to_replay() {
|
||||
std::vector<sstring> db::commitlog::get_segments_to_replay() const {
|
||||
return std::move(_segment_manager->_segments_to_replay);
|
||||
}
|
||||
|
||||
future<> db::commitlog::delete_segments(std::vector<sstring> files) const {
|
||||
return _segment_manager->delete_segments(std::move(files));
|
||||
}
|
||||
|
||||
db::rp_handle::rp_handle() noexcept
|
||||
{}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user