commitlog: queue segment files for asynchronous deletion

Clean segments no longer unlink() their file from the destructor. They
register it with the segment_manager (add_file_to_delete), and the queue
is drained with seastar::remove_file by do_pending_deletes(), which is
called from on_timer(), from clear_reserve_segments() and, while not shut
down, in the background under the gate from discard_unused_segments().
commitlog::delete_segments() exposes the same deletion path; the hints
manager and the post-replay cleanup in main.cc use it. The post-replay
cleanup waits for the deletion to finish, as the unlink() loop it
replaces did.

diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc
index c3eb1daf24..0b7af31772 100644
--- a/db/commitlog/commitlog.cc
+++ b/db/commitlog/commitlog.cc
@@ -198,6 +198,7 @@ public:
     request_controller_type _request_controller;
 
     stdx::optional<shared_future<with_clock<clock_type>>> _segment_allocating;
+    std::unordered_map<sstring, descriptor> _files_to_delete;
 
     void account_memory_usage(size_t size) {
         _request_controller.consume(size);
@@ -276,6 +277,11 @@ public:
 
     future<> orphan_all();
 
+    void add_file_to_delete(sstring, descriptor);
+    future<> do_pending_deletes();
+
+    future<> delete_segments(std::vector<sstring>);
+
     void discard_unused_segments();
     void discard_completed_segments(const cf_id_type&);
     void discard_completed_segments(const cf_id_type&, const rp_set&);
@@ -443,16 +449,11 @@ public:
     }
     ~segment() {
         if (is_clean()) {
-            clogger.debug("Segment {} is no longer active and will be deleted now", *this);
+            clogger.debug("Segment {} is no longer active and will be submitted for delete now", *this);
             ++_segment_manager->totals.segments_destroyed;
             _segment_manager->totals.total_size_on_disk -= size_on_disk();
             _segment_manager->totals.total_size -= (size_on_disk() + _buffer.size());
-            try {
-                commit_io_check([] (const char* fname) { ::unlink(fname); },
-                        _file_name.c_str());
-            } catch (...) {
-                clogger.error("Could not delete segment {}: {}", *this, std::current_exception());
-            }
+            _segment_manager->add_file_to_delete(_file_name, _desc);
         } else {
             clogger.warn("Segment {} is dirty and is left on disk.", *this);
         }
@@ -1292,15 +1293,23 @@ void db::commitlog::segment_manager::discard_unused_segments() {
     if (i != _segments.end()) {
         _segments.erase(i, _segments.end());
     }
+
+    // launch in background, but guard with gate so this deletion is
+    // sure to finish in shutdown, because at least through this path,
+    // segments on deletion queue could be non-empty, and we don't want
+    // those accidentally left around for replay.
+    if (!_shutdown) {
+        with_gate(_gate, [this] {
+            return do_pending_deletes();
+        });
+    }
 }
 
-// FIXME: pop() will call unlink -> sleeping in reactor thread.
-// Not urgent since mostly called during shutdown, but have to fix.
 future<> db::commitlog::segment_manager::clear_reserve_segments() {
     while (!_reserve_segments.empty()) {
         _reserve_segments.pop();
     }
-    return make_ready_future<>();
+    return do_pending_deletes();
 }
 
 future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
@@ -1345,6 +1354,32 @@ future<> db::commitlog::segment_manager::shutdown() {
     return _shutdown_promise->get_shared_future();
 }
 
+void db::commitlog::segment_manager::add_file_to_delete(sstring filename, descriptor d) {
+    assert(!_files_to_delete.count(filename));
+    _files_to_delete.emplace(std::move(filename), std::move(d));
+}
+
+future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> files) {
+    auto i = files.begin();
+    auto e = files.end();
+
+    return parallel_for_each(i, e, [this](auto& filename) {
+        // doing this intentionally hung on a future to
+        // allow adding extension processing here once they exist.
+        auto f = make_ready_future();
+        return f.finally([&] {
+            clogger.debug("Deleting segment file {}", filename);
+            return commit_io_check(&seastar::remove_file, filename);
+        }).handle_exception([&filename](auto ep) {
+            clogger.error("Could not delete segment {}: {}", filename, ep);
+        });
+    }).finally([files = std::move(files)] {});
+}
+
+future<> db::commitlog::segment_manager::do_pending_deletes() {
+    return delete_segments(boost::copy_range<std::vector<sstring>>(std::exchange(_files_to_delete, {}) | boost::adaptors::map_keys));
+}
+
 future<> db::commitlog::segment_manager::orphan_all() {
     _segments.clear();
     return clear_reserve_segments();
@@ -1393,7 +1428,7 @@ void db::commitlog::segment_manager::on_timer() {
                 flush_segments();
             }
         }
-        return make_ready_future<>();
+        return do_pending_deletes();
     });
     arm();
 }
@@ -1922,10 +1957,14 @@ future<std::vector<sstring>> db::commitlog::list_existing_segments(const sstring
     });
 }
 
-std::vector<sstring> db::commitlog::get_segments_to_replay() {
+std::vector<sstring> db::commitlog::get_segments_to_replay() const {
     return std::move(_segment_manager->_segments_to_replay);
 }
 
+future<> db::commitlog::delete_segments(std::vector<sstring> files) const {
+    return _segment_manager->delete_segments(std::move(files));
+}
+
 db::rp_handle::rp_handle() noexcept
 {}
 
diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh
index 29741d501d..1eeac4f6c8 100644
--- a/db/commitlog/commitlog.hh
+++ b/db/commitlog/commitlog.hh
@@ -268,7 +268,13 @@ public:
      *
      * The list will be empty when called for the second time.
      */
-    std::vector<sstring> get_segments_to_replay();
+    std::vector<sstring> get_segments_to_replay() const;
+
+    /**
+     * Delete aforementioned segments, and any metadata
+     * associated with them
+     */
+    future<> delete_segments(std::vector<sstring>) const;
 
     uint64_t get_total_size() const;
     uint64_t get_completed_tasks() const;
diff --git a/db/hints/manager.cc b/db/hints/manager.cc
index 9ecb687e15..cfa62d9852 100644
--- a/db/hints/manager.cc
+++ b/db/hints/manager.cc
@@ -733,8 +733,9 @@ bool manager::end_point_hints_manager::sender::send_one_file(const sstring& fnam
     }
 
     // If we got here we are done with the current segment and we can remove it.
-    with_shared(_file_update_mutex, [&fname] {
-        return io_check(remove_file, fname);
+    with_shared(_file_update_mutex, [&fname, this] {
+        auto p = _ep_manager.get_or_load().get0();
+        return p->delete_segments({ fname });
     }).get();
 
     // clear the replay position - we are going to send the next segment...
diff --git a/main.cc b/main.cc
index 8fa6f5fb84..0d49201495 100644
--- a/main.cc
+++ b/main.cc
@@ -610,9 +610,7 @@ int main(int ac, char** av) {
                         return db.flush_all_memtables();
                     }).get();
                     supervisor::notify("replaying commit log - removing old commitlog segments");
-                    for (auto& path : paths) {
-                        ::unlink(path.c_str());
-                    }
+                    cl->delete_segments(std::move(paths)).get();
                 }
             }
             // If the same sstable is shared by several shards, it cannot be