db: delete compacted sstables atomically

If sstables A, B are compacted, A and B must be deleted atomically.
Otherwise, if A holds data that is shadowed by a tombstone in B, and B is
deleted while A is not, the tombstone is lost and the shadowed data in A
is resurrected.

Fixes #1181.
This commit is contained in:
Avi Kivity
2016-04-13 17:24:27 +03:00
parent 3798d04ae8
commit a843aea547
3 changed files with 77 additions and 23 deletions

View File

@@ -45,7 +45,9 @@
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/function_output_iterator.hpp>
#include <boost/range/algorithm/heap_algorithm.hpp>
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/adaptor/map.hpp>
#include "frozen_mutation.hh"
#include "mutation_partition_applier.hh"
#include "core/do_with.hh"
@@ -770,6 +772,8 @@ column_family::stop() {
return _flush_queue->close().then([this] {
return _streaming_flush_gate.close();
});
}).then([this] {
return _sstable_deletion_gate.close();
});
}
@@ -832,6 +836,18 @@ column_family::reshuffle_sstables(std::set<int64_t> all_generations, int64_t sta
});
}
void column_family::rebuild_statistics() {
    // The sstable list was just re-created, so recompute
    // live_disk_space_used and live_sstable_count from scratch: reset both
    // counters and account for every sstable this column family still
    // references — both the live set and the compacted-but-not-yet-deleted
    // stragglers.
    _stats.live_disk_space_used = 0;
    _stats.live_sstable_count = 0;
    for (auto&& sst : _sstables_compacted_but_not_deleted) {
        update_stats_for_new_sstable(sst->data_size());
    }
    for (auto&& entry : *_sstables) {
        update_stats_for_new_sstable(entry.second->data_size());
    }
}
// NOTE(review): the span below is a unified-diff hunk whose +/- markers
// were stripped during extraction (see the embedded
// "@@ -840,37 +856,49 @@" header): it interleaves the PRE-change and
// POST-change bodies of rebuild_sstable_list and does not parse as
// sequential C++. Recover the two versions from the commit itself before
// editing; the inline notes below mark which lines belong to which side.
void
column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
@@ -840,37 +856,49 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
// later), and we add the new tables generated by the compaction.
// We create a new list rather than modifying it in-place, so that
// on-going reads can continue to use the old list.
//
// We only remove old sstables after they are successfully deleted,
// to avoid a new compaction from ignoring data in the old sstables
// if the deletion fails (note deletion of shared sstables can take
// unbounded time, because all shards must agree on the deletion).
auto current_sstables = _sstables;
auto new_sstable_list = make_lw_shared<sstable_list>();
// NEW side: carry forward the compacted-but-undeleted set; removed
// sstables are appended to it further down instead of being dropped.
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
// OLD side: statistics were zeroed inline here; the new code defers this
// to rebuild_statistics() after the list swap.
// zeroing live_disk_space_used and live_sstable_count because the
// sstable list is re-created below.
_stats.live_disk_space_used = 0;
_stats.live_sstable_count = 0;
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
// OLD side: loop over the live list only.
for (const auto& oldtab : *current_sstables) {
// First, add the new sstables.
// NEW side: single loop over new sstables joined with the live list.
for (auto&& tab : boost::range::join(new_sstables, *current_sstables | boost::adaptors::map_values)) {
// Checks if oldtab is a sstable not being compacted.
if (!s.count(oldtab.second)) {
update_stats_for_new_sstable(oldtab.second->data_size());
new_sstable_list->emplace(oldtab.first, oldtab.second);
if (!s.count(tab)) {
new_sstable_list->emplace(tab->generation(), tab);
} else {
// NEW side: a removed sstable stays tracked until its on-disk
// deletion completes, so tombstone GC keeps honoring it.
new_compacted_but_not_deleted.push_back(tab);
}
}
// OLD side: new sstables were added in a second loop and old ones only
// marked for deletion (non-atomically) — the behavior this commit fixes.
for (const auto& newtab : new_sstables) {
// FIXME: rename the new sstable(s). Verify a rename doesn't cause
// problems for the sstable object.
update_stats_for_new_sstable(newtab->data_size());
new_sstable_list->emplace(newtab->generation(), newtab);
}
for (const auto& oldtab : sstables_to_remove) {
oldtab->mark_for_deletion();
}
_sstables = std::move(new_sstable_list);
_sstables_compacted_but_not_deleted = std::move(new_compacted_but_not_deleted);
rebuild_statistics();
// Second, delete the old sstables. This is done in the background, so we can
// consider this compaction completed.
// NEW side: deletion runs in a background fiber tracked by
// _sstable_deletion_gate (closed in column_family::stop), and is atomic
// across all removed sstables via sstables::delete_atomically.
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
auto current_sstables = _sstables;
auto new_sstable_list = make_lw_shared<sstable_list>();
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
// Only after a successful delete do the sstables leave the
// compacted-but-not-deleted set.
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
return s.count(sst);
});
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
rebuild_statistics();
});
});
}
future<>
@@ -894,7 +922,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
};
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
});
});
}
@@ -1025,6 +1053,24 @@ lw_shared_ptr<sstable_list> column_family::get_sstables() {
return _sstables;
}
// Returns every sstable this column family owns: the ones serving active
// queries plus those already compacted away but still waiting for
// delete_atomically() to finish.
//
// Until those deletions complete, compaction must keep honoring the
// undeleted sstables: a tombstone that shadows their data cannot be
// garbage-collected, because the deletion may still fail.
lw_shared_ptr<sstable_list> column_family::get_sstables_including_compacted_undeleted() {
    // Common case: nothing is pending deletion — hand out the live list.
    if (_sstables_compacted_but_not_deleted.empty()) {
        return _sstables;
    }
    // Otherwise copy the live list and splice in the stragglers.
    auto all = make_lw_shared(*_sstables);
    for (auto&& sst : _sstables_compacted_but_not_deleted) {
        all->emplace(sst->generation(), sst);
    }
    return all;
}
inline bool column_family::manifest_json_filter(const sstring& fname) {
using namespace boost::filesystem;
@@ -2524,7 +2570,7 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
_cache.clear();
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return s->mark_for_deletion_on_disk();
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.

View File

@@ -267,6 +267,12 @@ private:
// generation -> sstable. Ordered by key so we can easily get the most recent.
lw_shared_ptr<sstable_list> _sstables;
// sstables that have been compacted (so don't look up in query) but
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
// Control background fibers waiting for sstables to be deleted
seastar::gate _sstable_deletion_gate;
// There are situations in which we need to stop writing sstables. Flushers will take
// the read lock, and the ones that wish to stop that process will take the write lock.
rwlock _sstables_lock;
@@ -319,6 +325,7 @@ private:
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove);
void rebuild_statistics();
private:
// Creates a mutation reader which covers sstables.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
@@ -485,6 +492,7 @@ public:
}
lw_shared_ptr<sstable_list> get_sstables();
lw_shared_ptr<sstable_list> get_sstables_including_compacted_undeleted();
size_t sstables_count();
int64_t get_unleveled_sstables() const;

View File

@@ -143,7 +143,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
db::replay_position rp;
auto all_sstables = cf.get_sstables();
auto all_sstables = cf.get_sstables_including_compacted_undeleted();
std::sort(sstables.begin(), sstables.end(), [] (const shared_sstable& x, const shared_sstable& y) {
return x->generation() < y->generation();
});