From 2bc98aebafb481dfef39684d2e2a6f917e5bd448 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 13 Mar 2018 12:22:36 +0000 Subject: [PATCH] db::commitlog: Do segment delete async + force replay delete go via CL Refs #2858 Push segement files to be deleted to a pending list, and process at intervals or flush-requests (or shutdown). Note that we do _not_ indescrimenately do deletes in non-anchored tasks, because we need to guarantee that finshed segments are fully deleted and gone on CL shutdown, not to be mistaken for replayables. Also make sure we delete segments replayed via commitlog call, so IFF we add metadata processing for CL, we can clear it out. --- db/commitlog/commitlog.cc | 63 +++++++++++++++++++++++++++++++-------- db/commitlog/commitlog.hh | 8 ++++- db/hints/manager.cc | 5 ++-- main.cc | 4 +-- 4 files changed, 62 insertions(+), 18 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index c3eb1daf24..0b7af31772 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -198,6 +198,7 @@ public: request_controller_type _request_controller; stdx::optional>> _segment_allocating; + std::unordered_map _files_to_delete; void account_memory_usage(size_t size) { _request_controller.consume(size); @@ -276,6 +277,11 @@ public: future<> orphan_all(); + void add_file_to_delete(sstring, descriptor); + future<> do_pending_deletes(); + + future<> delete_segments(std::vector); + void discard_unused_segments(); void discard_completed_segments(const cf_id_type&); void discard_completed_segments(const cf_id_type&, const rp_set&); @@ -443,16 +449,11 @@ public: } ~segment() { if (is_clean()) { - clogger.debug("Segment {} is no longer active and will be deleted now", *this); + clogger.debug("Segment {} is no longer active and will submitted for delete now", *this); ++_segment_manager->totals.segments_destroyed; _segment_manager->totals.total_size_on_disk -= size_on_disk(); _segment_manager->totals.total_size -= (size_on_disk() + _buffer.size()); - try { - commit_io_check([] (const char* fname) { ::unlink(fname); }, - _file_name.c_str()); - } catch (...) { - clogger.error("Could not delete segment {}: {}", *this, std::current_exception()); - } + _segment_manager->add_file_to_delete(_file_name, _desc); } else { clogger.warn("Segment {} is dirty and is left on disk.", *this); } @@ -1292,15 +1293,23 @@ void db::commitlog::segment_manager::discard_unused_segments() { if (i != _segments.end()) { _segments.erase(i, _segments.end()); } + + // launch in background, but guard with gate so this deletion is + // sure to finish in shutdown, because at least through this path, + // segments on deletion queue could be non-empty, and we don't want + // those accidentally left around for replay. + if (!_shutdown) { + with_gate(_gate, [this] { + return do_pending_deletes(); + }); + } } -// FIXME: pop() will call unlink -> sleeping in reactor thread. -// Not urgent since mostly called during shutdown, but have to fix. future<> db::commitlog::segment_manager::clear_reserve_segments() { while (!_reserve_segments.empty()) { _reserve_segments.pop(); } - return make_ready_future<>(); + return do_pending_deletes(); } future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) { @@ -1345,6 +1354,32 @@ future<> db::commitlog::segment_manager::shutdown() { return _shutdown_promise->get_shared_future(); } +void db::commitlog::segment_manager::add_file_to_delete(sstring filename, descriptor d) { + assert(!_files_to_delete.count(filename)); + _files_to_delete.emplace(std::move(filename), std::move(d)); +} + +future<> db::commitlog::segment_manager::delete_segments(std::vector files) { + auto i = files.begin(); + auto e = files.end(); + + return parallel_for_each(i, e, [this](auto& filename) { + // doing this intentionally hung on a future to + // allow adding extension processing here once they exist. + auto f = make_ready_future(); + return f.finally([&] { + clogger.debug("Deleting segment file {}", filename); + return commit_io_check(&seastar::remove_file, filename); + }).handle_exception([&filename](auto ep) { + clogger.error("Could not delete segment {}: {}", filename, ep); + }); + }).finally([files = std::move(files)] {}); +} + +future<> db::commitlog::segment_manager::do_pending_deletes() { + return delete_segments(boost::copy_range>(std::exchange(_files_to_delete, {}) | boost::adaptors::map_keys)); +} + future<> db::commitlog::segment_manager::orphan_all() { _segments.clear(); return clear_reserve_segments(); @@ -1393,7 +1428,7 @@ void db::commitlog::segment_manager::on_timer() { flush_segments(); } } - return make_ready_future<>(); + return do_pending_deletes(); }); arm(); } @@ -1922,10 +1957,14 @@ future> db::commitlog::list_existing_segments(const sstring }); } -std::vector db::commitlog::get_segments_to_replay() { +std::vector db::commitlog::get_segments_to_replay() const { return std::move(_segment_manager->_segments_to_replay); } +future<> db::commitlog::delete_segments(std::vector files) const { + return _segment_manager->delete_segments(std::move(files)); +} + db::rp_handle::rp_handle() noexcept {} diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 29741d501d..1eeac4f6c8 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -268,7 +268,13 @@ public: * * The list will be empty when called for the second time. */ - std::vector get_segments_to_replay(); + std::vector get_segments_to_replay() const; + + /** + * Delete aforementioned segments, and possible metadata + * associated with them + */ + future<> delete_segments(std::vector) const; uint64_t get_total_size() const; uint64_t get_completed_tasks() const; diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 9ecb687e15..cfa62d9852 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -733,8 +733,9 @@ bool manager::end_point_hints_manager::sender::send_one_file(const sstring& fnam } // If we got here we are done with the current segment and we can remove it. - with_shared(_file_update_mutex, [&fname] { - return io_check(remove_file, fname); + with_shared(_file_update_mutex, [&fname, this] { + auto p = _ep_manager.get_or_load().get0(); + return p->delete_segments({ fname }); }).get(); // clear the replay position - we are going to send the next segment... diff --git a/main.cc b/main.cc index 8fa6f5fb84..0d49201495 100644 --- a/main.cc +++ b/main.cc @@ -610,9 +610,7 @@ int main(int ac, char** av) { return db.flush_all_memtables(); }).get(); supervisor::notify("replaying commit log - removing old commitlog segments"); - for (auto& path : paths) { - ::unlink(path.c_str()); - } + cl->delete_segments(std::move(paths)); } } // If the same sstable is shared by several shards, it cannot be