From 69cc7d89c806c48238edc616c1906b779835b1bc Mon Sep 17 00:00:00 2001
From: "Raphael S. Carvalho" <raphaelsc@scylladb.com>
Date: Mon, 6 May 2019 10:47:23 -0300
Subject: [PATCH] compaction: do not unconditionally delete a new sstable in
 interrupted compaction

After incremental compaction, new sstables may have already replaced
old sstables at any point. Meaning that a new sstable is in-use by
table and a old sstable is already deleted when compaction itself is
UNFINISHED.

Therefore, we should *NEVER* delete a new sstable unconditionally for
an interrupted compaction, or data loss could happen.

To fix it, we'll only delete new sstables that didn't replace anything
in the table, meaning they are unused.

Found the problem while auditting the code.

Fixes #4479.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20190506134723.16639-1-raphaelsc@scylladb.com>
(cherry picked from commit ef5681486f75360af7c8ce573956e841b77e3314)
---
 sstables/compaction.cc | 33 +++++++++++++++++----------------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/sstables/compaction.cc b/sstables/compaction.cc
index 61a3003e8e..c2bce1ca84 100644
--- a/sstables/compaction.cc
+++ b/sstables/compaction.cc
@@ -104,16 +104,6 @@ static bool belongs_to_current_node(const dht::token& t, const dht::token_range_vector& sorted_owned_ranges) {
     return false;
 }
 
-static void delete_sstables_for_interrupted_compaction(std::vector<shared_sstable>& new_sstables, sstring& ks, sstring& cf) {
-    // Delete either partially or fully written sstables of a compaction that
-    // was either stopped abruptly (e.g. out of disk space) or deliberately
-    // (e.g. nodetool stop COMPACTION).
-    for (auto& sst : new_sstables) {
-        clogger.debug("Deleting sstable {} of interrupted compaction for {}.{}", sst->get_filename(), ks, cf);
-        sst->mark_for_deletion();
-    }
-}
-
 static std::vector<shared_sstable> get_uncompacting_sstables(column_family& cf, std::vector<shared_sstable> sstables) {
     auto all_sstables = boost::copy_range<std::vector<shared_sstable>>(*cf.get_sstables_including_compacted_undeleted());
     boost::sort(all_sstables, [] (const shared_sstable& x, const shared_sstable& y) {
@@ -317,6 +307,9 @@ protected:
     column_family& _cf;
     schema_ptr _schema;
     std::vector<shared_sstable> _sstables;
+    // Unused sstables are tracked because if compaction is interrupted we can only delete them.
+    // Deleting used sstables could potentially result in data loss.
+    std::vector<shared_sstable> _new_unused_sstables;
     lw_shared_ptr<sstable_set> _compacting;
     uint64_t _max_sstable_size;
     uint32_t _sstable_level;
@@ -347,6 +340,7 @@ protected:
 
     void setup_new_sstable(shared_sstable& sst) {
         _info->new_sstables.push_back(sst);
+        _new_unused_sstables.push_back(sst);
         sst->get_metadata_collector().set_replay_position(_rp);
         sst->get_metadata_collector().sstable_level(_sstable_level);
         for (auto ancestor : _ancestors) {
@@ -488,6 +482,16 @@ private:
     const schema_ptr& schema() const {
         return _schema;
     }
+
+    void delete_sstables_for_interrupted_compaction() {
+        // Delete either partially or fully written sstables of a compaction that
+        // was either stopped abruptly (e.g. out of disk space) or deliberately
+        // (e.g. nodetool stop COMPACTION).
+        for (auto& sst : _new_unused_sstables) {
+            clogger.debug("Deleting sstable {} of interrupted compaction for {}.{}", sst->get_filename(), _info->ks_name, _info->cf_name);
+            sst->mark_for_deletion();
+        }
+    }
 public:
     static future<std::vector<shared_sstable>> run(std::unique_ptr<compaction> c);
 
@@ -521,7 +525,6 @@ void compacting_sstable_writer::consume_end_of_stream() {
 class regular_compaction : public compaction {
     std::function<shared_sstable()> _creator;
     replacer_fn _replacer;
-    std::vector<shared_sstable> _unreplaced_new_tables;
     std::unordered_set<shared_sstable> _compacting_for_max_purgeable_func;
     // store a clone of sstable set for column family, which needs to be alive for incremental selector.
     sstable_set _set;
@@ -625,8 +628,6 @@ private:
     }
 
     void maybe_replace_exhausted_sstables() {
-        _unreplaced_new_tables.push_back(_sst);
-
         // Replace exhausted sstable(s), if any, by new one(s) in the column family.
         auto not_exhausted = [s = _schema, &dk = _sst->get_last_decorated_key()] (shared_sstable& sst) {
             return sst->get_last_decorated_key().tri_compare(*s, dk) > 0;
@@ -668,7 +669,7 @@ private:
                 _compacting->erase(sst);
                 _monitor_generator.remove_sstable(_info->tracking, sst);
             });
-            _replacer(std::vector<shared_sstable>(exhausted, _sstables.end()), std::move(_unreplaced_new_tables));
+            _replacer(std::vector<shared_sstable>(exhausted, _sstables.end()), std::move(_new_unused_sstables));
             _sstables.erase(exhausted, _sstables.end());
         }
     }
@@ -677,7 +678,7 @@ private:
         if (!_sstables.empty()) {
             std::vector<shared_sstable> sstables_compacted;
             std::move(_sstables.begin(), _sstables.end(), std::back_inserter(sstables_compacted));
-            _replacer(std::move(sstables_compacted), std::move(_new_unused_sstables));
+            _replacer(std::move(sstables_compacted), std::move(_new_unused_sstables));
         }
     }
 
@@ -877,7 +878,7 @@ future<std::vector<shared_sstable>> compaction::run(std::unique_ptr<compaction> c) {
             auto r = std::move(reader);
             r.consume_in_thread(std::move(cfc), c->filter_func(), db::no_timeout);
         } catch (...) {
-            delete_sstables_for_interrupted_compaction(c->_info->new_sstables, c->_info->ks_name, c->_info->cf_name);
+            c->delete_sstables_for_interrupted_compaction();
            c = nullptr; // make sure writers are stopped while running in thread context
             throw;
         }