mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
compaction: do not unconditionally delete a new sstable in interrupted compaction
After incremental compaction, new sstables may have already replaced old
sstables at any point. Meaning that a new sstable is in-use by table and
a old sstable is already deleted when compaction itself is UNFINISHED.
Therefore, we should *NEVER* delete a new sstable unconditionally for an
interrupted compaction, or data loss could happen.
To fix it, we'll only delete new sstables that didn't replace anything
in the table, meaning they are unused.
Found the problem while auditting the code.
Fixes #4479.
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20190506134723.16639-1-raphaelsc@scylladb.com>
(cherry picked from commit ef5681486f)
This commit is contained in:
committed by
Avi Kivity
parent
5f6c5d566a
commit
69cc7d89c8
@@ -104,16 +104,6 @@ static bool belongs_to_current_node(const dht::token& t, const dht::token_range_
|
||||
return false;
|
||||
}
|
||||
|
||||
static void delete_sstables_for_interrupted_compaction(std::vector<shared_sstable>& new_sstables, sstring& ks, sstring& cf) {
|
||||
// Delete either partially or fully written sstables of a compaction that
|
||||
// was either stopped abruptly (e.g. out of disk space) or deliberately
|
||||
// (e.g. nodetool stop COMPACTION).
|
||||
for (auto& sst : new_sstables) {
|
||||
clogger.debug("Deleting sstable {} of interrupted compaction for {}.{}", sst->get_filename(), ks, cf);
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
}
|
||||
|
||||
static std::vector<shared_sstable> get_uncompacting_sstables(column_family& cf, std::vector<shared_sstable> sstables) {
|
||||
auto all_sstables = boost::copy_range<std::vector<shared_sstable>>(*cf.get_sstables_including_compacted_undeleted());
|
||||
boost::sort(all_sstables, [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
@@ -317,6 +307,9 @@ protected:
|
||||
column_family& _cf;
|
||||
schema_ptr _schema;
|
||||
std::vector<shared_sstable> _sstables;
|
||||
// Unused sstables are tracked because if compaction is interrupted we can only delete them.
|
||||
// Deleting used sstables could potentially result in data loss.
|
||||
std::vector<shared_sstable> _new_unused_sstables;
|
||||
lw_shared_ptr<sstable_set> _compacting;
|
||||
uint64_t _max_sstable_size;
|
||||
uint32_t _sstable_level;
|
||||
@@ -347,6 +340,7 @@ protected:
|
||||
|
||||
void setup_new_sstable(shared_sstable& sst) {
|
||||
_info->new_sstables.push_back(sst);
|
||||
_new_unused_sstables.push_back(sst);
|
||||
sst->get_metadata_collector().set_replay_position(_rp);
|
||||
sst->get_metadata_collector().sstable_level(_sstable_level);
|
||||
for (auto ancestor : _ancestors) {
|
||||
@@ -488,6 +482,16 @@ private:
|
||||
const schema_ptr& schema() const {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
void delete_sstables_for_interrupted_compaction() {
|
||||
// Delete either partially or fully written sstables of a compaction that
|
||||
// was either stopped abruptly (e.g. out of disk space) or deliberately
|
||||
// (e.g. nodetool stop COMPACTION).
|
||||
for (auto& sst : _new_unused_sstables) {
|
||||
clogger.debug("Deleting sstable {} of interrupted compaction for {}.{}", sst->get_filename(), _info->ks_name, _info->cf_name);
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
}
|
||||
public:
|
||||
static future<compaction_info> run(std::unique_ptr<compaction> c);
|
||||
|
||||
@@ -521,7 +525,6 @@ void compacting_sstable_writer::consume_end_of_stream() {
|
||||
class regular_compaction : public compaction {
|
||||
std::function<shared_sstable()> _creator;
|
||||
replacer_fn _replacer;
|
||||
std::vector<shared_sstable> _unreplaced_new_tables;
|
||||
std::unordered_set<shared_sstable> _compacting_for_max_purgeable_func;
|
||||
// store a clone of sstable set for column family, which needs to be alive for incremental selector.
|
||||
sstable_set _set;
|
||||
@@ -625,8 +628,6 @@ private:
|
||||
}
|
||||
|
||||
void maybe_replace_exhausted_sstables() {
|
||||
_unreplaced_new_tables.push_back(_sst);
|
||||
|
||||
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
|
||||
auto not_exhausted = [s = _schema, &dk = _sst->get_last_decorated_key()] (shared_sstable& sst) {
|
||||
return sst->get_last_decorated_key().tri_compare(*s, dk) > 0;
|
||||
@@ -668,7 +669,7 @@ private:
|
||||
_compacting->erase(sst);
|
||||
_monitor_generator.remove_sstable(_info->tracking, sst);
|
||||
});
|
||||
_replacer(std::vector<shared_sstable>(exhausted, _sstables.end()), std::move(_unreplaced_new_tables));
|
||||
_replacer(std::vector<shared_sstable>(exhausted, _sstables.end()), std::move(_new_unused_sstables));
|
||||
_sstables.erase(exhausted, _sstables.end());
|
||||
}
|
||||
}
|
||||
@@ -677,7 +678,7 @@ private:
|
||||
if (!_sstables.empty()) {
|
||||
std::vector<shared_sstable> sstables_compacted;
|
||||
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(sstables_compacted));
|
||||
_replacer(std::move(sstables_compacted), std::move(_unreplaced_new_tables));
|
||||
_replacer(std::move(sstables_compacted), std::move(_new_unused_sstables));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -877,7 +878,7 @@ future<compaction_info> compaction::run(std::unique_ptr<compaction> c) {
|
||||
auto r = std::move(reader);
|
||||
r.consume_in_thread(std::move(cfc), c->filter_func(), db::no_timeout);
|
||||
} catch (...) {
|
||||
delete_sstables_for_interrupted_compaction(c->_info->new_sstables, c->_info->ks_name, c->_info->cf_name);
|
||||
c->delete_sstables_for_interrupted_compaction();
|
||||
c = nullptr; // make sure writers are stopped while running in thread context
|
||||
throw;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user