From 4ed95b7020bc57eec9685009e67ccd83af5d41e9 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 7 Sep 2015 15:29:05 +0200 Subject: [PATCH 1/3] Commitlog: Add sync_all_segments() For #293 - allows explicit flush to disk (not close!) of all active segments --- db/commitlog/commitlog.cc | 18 ++++++++++++++---- db/commitlog/commitlog.hh | 8 ++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 1abe4cf3bb..32bd68946f 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -184,6 +184,7 @@ public: future new_segment(); future active_segment(); future<> clear(); + future<> sync_all_segments(); scollectd::registrations create_counters(); @@ -824,17 +825,22 @@ void db::commitlog::segment_manager::discard_unused_segments() { } } +future<> db::commitlog::segment_manager::sync_all_segments() { + return parallel_for_each(_segments, [this](sseg_ptr s) { + return s->sync().then([](sseg_ptr) {}); + }); +} + /* * Sync all segments, then clear them out. To ensure all ops are done. * (Assumes you have barriered adding ops!) + * Only use from tests. */ future<> db::commitlog::segment_manager::clear() { logger.debug("Clearing all segments"); flush_segments(true); - return do_until([this]() {return _segments.empty();}, [this]() { - auto s = _segments.front(); - _segments.erase(_segments.begin()); - return s->sync().then([](sseg_ptr) {}); + return sync_all_segments().then([this] { + _segments.clear(); }); } /** @@ -964,6 +970,10 @@ void db::commitlog::discard_completed_segments(const cf_id_type& id, _segment_manager->discard_completed_segments(id, pos); } +future<> db::commitlog::sync_all_segments() { + return _segment_manager->sync_all_segments(); +} + size_t db::commitlog::max_record_size() const { return _segment_manager->max_mutation_size - segment::entry_overhead_size; } diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 1fad3811e6..ed3350e29f 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -222,6 +222,14 @@ public: const config& active_config() const; + /** + * Issues disk sync on all (allocating) segments. I.e. ensures that + * all data written up until this call is indeed on disk. + * _However_, if you issue new "add" ops while this is executing, + * those can/will be missed. + */ + future<> sync_all_segments(); + future> list_existing_descriptors() const; future> list_existing_descriptors(const sstring& dir) const; From 256c0550bfe747113cce05828c83c1621a51c57a Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 7 Sep 2015 15:40:56 +0200 Subject: [PATCH 2/3] Commitlog: Only delete segments on disk if they are marked clean For #293 - i.e. allow more or less coherent shutdown/destruction of the commitlog while retaining disk data. (tests still clear stuff explicitly). --- db/commitlog/commitlog.cc | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 32bd68946f..8023df590d 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -279,13 +279,16 @@ public: ++_segment_manager->totals.segments_created; } ~segment() { - logger.debug("Segment {} is no longer active and will be deleted now", *this); - ++_segment_manager->totals.segments_destroyed; - _segment_manager->totals.total_size_on_disk -= size_on_disk(); - _segment_manager->totals.total_size -= - (size_on_disk() + _buffer.size()); - ::unlink( - (_segment_manager->cfg.commit_log_location + "/" + _desc.filename()).c_str()); + if (is_clean()) { + logger.debug("Segment {} is no longer active and will be deleted now", *this); + ++_segment_manager->totals.segments_destroyed; + _segment_manager->totals.total_size_on_disk -= size_on_disk(); + _segment_manager->totals.total_size -= (size_on_disk() + _buffer.size()); + ::unlink( + (_segment_manager->cfg.commit_log_location + "/" + _desc.filename()).c_str()); + } else { + logger.warn("Segment {} is dirty and is left on disk.", *this); + } } bool must_sync() { @@ -840,6 +843,9 @@ future<> db::commitlog::segment_manager::clear() { logger.debug("Clearing all segments"); flush_segments(true); return sync_all_segments().then([this] { + for (auto& s : _segments) { + s->mark_clean(); + } _segments.clear(); }); } From 6b818450415d69b5247b3bbc2dcc94dfa9266ce9 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 7 Sep 2015 18:00:02 +0200 Subject: [PATCH 3/3] Database: Do a commitlog::sync_all first on stop. Refs #293 IFF one desires to _not_ shutdown stuff cleanly, still running this first in database::stop will at least ensure that mutations already in CL transit will end up on disk and be replayable --- database.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/database.cc b/database.cc index f39f3bb32d..a86b018835 100644 --- a/database.cc +++ b/database.cc @@ -1417,6 +1417,12 @@ operator<<(std::ostream& os, const atomic_cell& ac) { future<> database::stop() { return _compaction_manager.stop().then([this] { + // try to ensure that CL has done disk flushing + if (_commitlog != nullptr) { + return _commitlog->sync_all_segments(); + } + return make_ready_future<>(); + }).then([this] { return parallel_for_each(_column_families, [this] (auto& val_pair) { return val_pair.second->stop(); });