db: delete compacted sstables atomically
If sstables A and B are compacted together, A and B must be deleted atomically. Otherwise, if A has data that is covered by a tombstone in B, that tombstone is garbage-collected by the compaction, and B is then deleted while A is not, the data in A is resurrected. Fixes #1181.
This commit is contained in:
90
database.cc
90
database.cc
@@ -45,7 +45,9 @@
|
||||
#include <boost/algorithm/cxx11/all_of.hpp>
|
||||
#include <boost/function_output_iterator.hpp>
|
||||
#include <boost/range/algorithm/heap_algorithm.hpp>
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
#include <boost/range/algorithm/find.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "frozen_mutation.hh"
|
||||
#include "mutation_partition_applier.hh"
|
||||
#include "core/do_with.hh"
|
||||
@@ -770,6 +772,8 @@ column_family::stop() {
|
||||
return _flush_queue->close().then([this] {
|
||||
return _streaming_flush_gate.close();
|
||||
});
|
||||
}).then([this] {
|
||||
return _sstable_deletion_gate.close();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -832,6 +836,18 @@ column_family::reshuffle_sstables(std::set<int64_t> all_generations, int64_t sta
|
||||
});
|
||||
}
|
||||
|
||||
// Recomputes the live-sstable statistics from scratch: resets both counters,
// then feeds the data size of every sstable in
// _sstables_compacted_but_not_deleted and in *_sstables to
// update_stats_for_new_sstable().
// NOTE(review): update_stats_for_new_sstable() is not visible here; presumably
// it adds to live_disk_space_used and bumps live_sstable_count -- confirm.
void column_family::rebuild_statistics() {
|
||||
// zeroing live_disk_space_used and live_sstable_count because the
|
||||
// sstable list was re-created
|
||||
_stats.live_disk_space_used = 0;
|
||||
_stats.live_sstable_count = 0;
|
||||
|
||||
// Walk the compacted-but-not-yet-deleted sstables together with the values
// of the generation -> sstable map, so both sets are counted.
for (auto&& tab : boost::range::join(_sstables_compacted_but_not_deleted,
|
||||
*_sstables | boost::adaptors::map_values)) {
|
||||
update_stats_for_new_sstable(tab->data_size());
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
@@ -840,37 +856,49 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
// later), and we add the new tables generated by the compaction.
|
||||
// We create a new list rather than modifying it in-place, so that
|
||||
// on-going reads can continue to use the old list.
|
||||
//
|
||||
// We only remove old sstables after they are successfully deleted,
|
||||
// to avoid a new compaction from ignoring data in the old sstables
|
||||
// if the deletion fails (note deletion of shared sstables can take
|
||||
// unbounded time, because all shards must agree on the deletion).
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = make_lw_shared<sstable_list>();
|
||||
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
|
||||
|
||||
// zeroing live_disk_space_used and live_sstable_count because the
|
||||
// sstable list is re-created below.
|
||||
_stats.live_disk_space_used = 0;
|
||||
_stats.live_sstable_count = 0;
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> s(
|
||||
sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
|
||||
for (const auto& oldtab : *current_sstables) {
|
||||
// First, add the new sstables.
|
||||
for (auto&& tab : boost::range::join(new_sstables, *current_sstables | boost::adaptors::map_values)) {
|
||||
// Checks if oldtab is a sstable not being compacted.
|
||||
if (!s.count(oldtab.second)) {
|
||||
update_stats_for_new_sstable(oldtab.second->data_size());
|
||||
new_sstable_list->emplace(oldtab.first, oldtab.second);
|
||||
if (!s.count(tab)) {
|
||||
new_sstable_list->emplace(tab->generation(), tab);
|
||||
} else {
|
||||
new_compacted_but_not_deleted.push_back(tab);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& newtab : new_sstables) {
|
||||
// FIXME: rename the new sstable(s). Verify a rename doesn't cause
|
||||
// problems for the sstable object.
|
||||
update_stats_for_new_sstable(newtab->data_size());
|
||||
new_sstable_list->emplace(newtab->generation(), newtab);
|
||||
}
|
||||
|
||||
for (const auto& oldtab : sstables_to_remove) {
|
||||
oldtab->mark_for_deletion();
|
||||
}
|
||||
|
||||
_sstables = std::move(new_sstable_list);
|
||||
_sstables_compacted_but_not_deleted = std::move(new_compacted_but_not_deleted);
|
||||
|
||||
rebuild_statistics();
|
||||
|
||||
// Second, delete the old sstables. This is done in the background, so we can
|
||||
// consider this compaction completed.
|
||||
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
|
||||
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = make_lw_shared<sstable_list>();
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> s(
|
||||
sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
|
||||
return s.count(sst);
|
||||
});
|
||||
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
|
||||
rebuild_statistics();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -894,7 +922,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
|
||||
};
|
||||
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
|
||||
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
|
||||
this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
|
||||
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1025,6 +1053,24 @@ lw_shared_ptr<sstable_list> column_family::get_sstables() {
|
||||
return _sstables;
|
||||
}
|
||||
|
||||
// Gets the list of all sstables in the column family, including ones that are
|
||||
// not used for active queries because they have already been compacted, but are
|
||||
// waiting for delete_atomically() to return.
|
||||
//
|
||||
// As long as we haven't deleted them, compaction needs to ensure it doesn't
|
||||
// garbage-collect a tombstone that covers data in an sstable that may not be
|
||||
// successfully deleted.
|
||||
lw_shared_ptr<sstable_list> column_family::get_sstables_including_compacted_undeleted() {
|
||||
// Nothing is waiting for deletion: hand back the current list itself
// instead of building a new one.
if (_sstables_compacted_but_not_deleted.empty()) {
|
||||
return _sstables;
|
||||
}
|
||||
// Build a separate list initialized from *_sstables, so that _sstables
// itself is left unmodified by the insertions below.
auto ret = make_lw_shared(*_sstables);
|
||||
// Add every compacted-but-undeleted sstable, keyed by its generation.
for (auto&& s : _sstables_compacted_but_not_deleted) {
|
||||
ret->insert(std::make_pair(s->generation(), s));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline bool column_family::manifest_json_filter(const sstring& fname) {
|
||||
using namespace boost::filesystem;
|
||||
|
||||
@@ -2524,7 +2570,7 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
_cache.clear();
|
||||
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return s->mark_for_deletion_on_disk();
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
|
||||
@@ -267,6 +267,12 @@ private:
|
||||
|
||||
// generation -> sstable. Ordered by key so we can easily get the most recent.
|
||||
lw_shared_ptr<sstable_list> _sstables;
|
||||
// sstables that have been compacted (so don't look up in query) but
|
||||
// have not been deleted yet, so must not GC any tombstones in other sstables
|
||||
// that may delete data in these sstables:
|
||||
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
|
||||
// Control background fibers waiting for sstables to be deleted
|
||||
seastar::gate _sstable_deletion_gate;
|
||||
// There are situations in which we need to stop writing sstables. Flushers will take
|
||||
// the read lock, and the ones that wish to stop that process will take the write lock.
|
||||
rwlock _sstables_lock;
|
||||
@@ -319,6 +325,7 @@ private:
|
||||
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove);
|
||||
void rebuild_statistics();
|
||||
private:
|
||||
// Creates a mutation reader which covers sstables.
|
||||
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
||||
@@ -485,6 +492,7 @@ public:
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstable_list> get_sstables();
|
||||
lw_shared_ptr<sstable_list> get_sstables_including_compacted_undeleted();
|
||||
size_t sstables_count();
|
||||
int64_t get_unleveled_sstables() const;
|
||||
|
||||
|
||||
@@ -143,7 +143,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
|
||||
|
||||
db::replay_position rp;
|
||||
|
||||
auto all_sstables = cf.get_sstables();
|
||||
auto all_sstables = cf.get_sstables_including_compacted_undeleted();
|
||||
std::sort(sstables.begin(), sstables.end(), [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user