Merge "Delete compacted sstables atomically" from Avi
"If we compact sstables A, B into a new sstable C we must either delete both A and B, or none of them. This is because a tombstone in B may delete data in A, and during compaction, both the tombstone and the data are removed. If only B is deleted, then the data gets resurrected. Non-atomic deletion occurs because the filesystem does not support atomic deletion of multiple files; but the window for that is small and is not addressed in this patchset. Another case is when A is shared across multiple shards (as is the case when changing shard count, or migrating from existing Cassandra sstables). This case is covered by this patchset. Fixes #1181."
This commit is contained in:
90
database.cc
90
database.cc
@@ -45,7 +45,9 @@
|
||||
#include <boost/algorithm/cxx11/all_of.hpp>
|
||||
#include <boost/function_output_iterator.hpp>
|
||||
#include <boost/range/algorithm/heap_algorithm.hpp>
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
#include <boost/range/algorithm/find.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "frozen_mutation.hh"
|
||||
#include "mutation_partition_applier.hh"
|
||||
#include "core/do_with.hh"
|
||||
@@ -770,6 +772,8 @@ column_family::stop() {
|
||||
return _flush_queue->close().then([this] {
|
||||
return _streaming_flush_gate.close();
|
||||
});
|
||||
}).then([this] {
|
||||
return _sstable_deletion_gate.close();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -832,6 +836,18 @@ column_family::reshuffle_sstables(std::set<int64_t> all_generations, int64_t sta
|
||||
});
|
||||
}
|
||||
|
||||
void column_family::rebuild_statistics() {
|
||||
// zeroing live_disk_space_used and live_sstable_count because the
|
||||
// sstable list was re-created
|
||||
_stats.live_disk_space_used = 0;
|
||||
_stats.live_sstable_count = 0;
|
||||
|
||||
for (auto&& tab : boost::range::join(_sstables_compacted_but_not_deleted,
|
||||
*_sstables | boost::adaptors::map_values)) {
|
||||
update_stats_for_new_sstable(tab->data_size());
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
@@ -840,37 +856,49 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
// later), and we add the new tables generated by the compaction.
|
||||
// We create a new list rather than modifying it in-place, so that
|
||||
// on-going reads can continue to use the old list.
|
||||
//
|
||||
// We only remove old sstables after they are successfully deleted,
|
||||
// to avoid a new compaction from ignoring data in the old sstables
|
||||
// if the deletion fails (note deletion of shared sstables can take
|
||||
// unbounded time, because all shards must agree on the deletion).
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = make_lw_shared<sstable_list>();
|
||||
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
|
||||
|
||||
// zeroing live_disk_space_used and live_sstable_count because the
|
||||
// sstable list is re-created below.
|
||||
_stats.live_disk_space_used = 0;
|
||||
_stats.live_sstable_count = 0;
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> s(
|
||||
sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
|
||||
for (const auto& oldtab : *current_sstables) {
|
||||
// First, add the new sstables.
|
||||
for (auto&& tab : boost::range::join(new_sstables, *current_sstables | boost::adaptors::map_values)) {
|
||||
// Checks if oldtab is a sstable not being compacted.
|
||||
if (!s.count(oldtab.second)) {
|
||||
update_stats_for_new_sstable(oldtab.second->data_size());
|
||||
new_sstable_list->emplace(oldtab.first, oldtab.second);
|
||||
if (!s.count(tab)) {
|
||||
new_sstable_list->emplace(tab->generation(), tab);
|
||||
} else {
|
||||
new_compacted_but_not_deleted.push_back(tab);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& newtab : new_sstables) {
|
||||
// FIXME: rename the new sstable(s). Verify a rename doesn't cause
|
||||
// problems for the sstable object.
|
||||
update_stats_for_new_sstable(newtab->data_size());
|
||||
new_sstable_list->emplace(newtab->generation(), newtab);
|
||||
}
|
||||
|
||||
for (const auto& oldtab : sstables_to_remove) {
|
||||
oldtab->mark_for_deletion();
|
||||
}
|
||||
|
||||
_sstables = std::move(new_sstable_list);
|
||||
_sstables_compacted_but_not_deleted = std::move(new_compacted_but_not_deleted);
|
||||
|
||||
rebuild_statistics();
|
||||
|
||||
// Second, delete the old sstables. This is done in the background, so we can
|
||||
// consider this compaction completed.
|
||||
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
|
||||
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = make_lw_shared<sstable_list>();
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> s(
|
||||
sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
|
||||
return s.count(sst);
|
||||
});
|
||||
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
|
||||
rebuild_statistics();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -894,7 +922,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
|
||||
};
|
||||
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
|
||||
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
|
||||
this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
|
||||
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1025,6 +1053,24 @@ lw_shared_ptr<sstable_list> column_family::get_sstables() {
|
||||
return _sstables;
|
||||
}
|
||||
|
||||
// Gets the list of all sstables in the column family, including ones that are
|
||||
// not used for active queries because they have already been compacted, but are
|
||||
// waiting for delete_atomically() to return.
|
||||
//
|
||||
// As long as we haven't deleted them, compaction needs to ensure it doesn't
|
||||
// garbage-collect a tombstone that covers data in an sstable that may not be
|
||||
// successfully deleted.
|
||||
lw_shared_ptr<sstable_list> column_family::get_sstables_including_compacted_undeleted() {
|
||||
if (_sstables_compacted_but_not_deleted.empty()) {
|
||||
return _sstables;
|
||||
}
|
||||
auto ret = make_lw_shared(*_sstables);
|
||||
for (auto&& s : _sstables_compacted_but_not_deleted) {
|
||||
ret->insert(std::make_pair(s->generation(), s));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline bool column_family::manifest_json_filter(const sstring& fname) {
|
||||
using namespace boost::filesystem;
|
||||
|
||||
@@ -2524,7 +2570,7 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
_cache.clear();
|
||||
|
||||
return parallel_for_each(remove, [](sstables::shared_sstable s) {
|
||||
return s->mark_for_deletion_on_disk();
|
||||
return sstables::delete_atomically({s});
|
||||
}).then([rp] {
|
||||
return make_ready_future<db::replay_position>(rp);
|
||||
}).finally([remove] {}); // keep the objects alive until here.
|
||||
|
||||
@@ -267,6 +267,12 @@ private:
|
||||
|
||||
// generation -> sstable. Ordered by key so we can easily get the most recent.
|
||||
lw_shared_ptr<sstable_list> _sstables;
|
||||
// sstables that have been compacted (so don't look up in query) but
|
||||
// have not been deleted yet, so must not GC any tombstones in other sstables
|
||||
// that may delete data in these sstables:
|
||||
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
|
||||
// Control background fibers waiting for sstables to be deleted
|
||||
seastar::gate _sstable_deletion_gate;
|
||||
// There are situations in which we need to stop writing sstables. Flushers will take
|
||||
// the read lock, and the ones that wish to stop that process will take the write lock.
|
||||
rwlock _sstables_lock;
|
||||
@@ -319,6 +325,7 @@ private:
|
||||
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove);
|
||||
void rebuild_statistics();
|
||||
private:
|
||||
// Creates a mutation reader which covers sstables.
|
||||
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
||||
@@ -485,6 +492,7 @@ public:
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstable_list> get_sstables();
|
||||
lw_shared_ptr<sstable_list> get_sstables_including_compacted_undeleted();
|
||||
size_t sstables_count();
|
||||
int64_t get_unleveled_sstables() const;
|
||||
|
||||
|
||||
4
main.cc
4
main.cc
@@ -396,6 +396,10 @@ int main(int ac, char** av) {
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
db.start(std::ref(*cfg)).get();
|
||||
engine().at_exit([&db] {
|
||||
// A shared sstable must be compacted by all shards before it can be deleted.
|
||||
// Since we're stopping, that's not going to happen. Cancel those pending
|
||||
// deletions to let anyone waiting on them continue.
|
||||
sstables::cancel_atomic_deletions();
|
||||
// #293 - do not stop anything - not even db (for real)
|
||||
//return db.stop();
|
||||
// call stop on each db instance, but leave the shareded<database> pointers alive.
|
||||
|
||||
@@ -143,7 +143,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
|
||||
|
||||
db::replay_position rp;
|
||||
|
||||
auto all_sstables = cf.get_sstables();
|
||||
auto all_sstables = cf.get_sstables_including_compacted_undeleted();
|
||||
std::sort(sstables.begin(), sstables.end(), [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "core/shared_ptr.hh"
|
||||
#include "core/do_with.hh"
|
||||
#include "core/thread.hh"
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <iterator>
|
||||
|
||||
#include "types.hh"
|
||||
@@ -44,6 +45,9 @@
|
||||
#include <boost/filesystem/operations.hpp>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/algorithm_ext/insert.hpp>
|
||||
#include <boost/range/algorithm_ext/push_back.hpp>
|
||||
#include <boost/range/algorithm/set_algorithm.hpp>
|
||||
#include <regex>
|
||||
#include <core/align.hh>
|
||||
#include "utils/phased_barrier.hh"
|
||||
@@ -72,8 +76,6 @@ future<file> new_sstable_component_file(disk_error_signal_type& signal, sstring
|
||||
});
|
||||
}
|
||||
|
||||
thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> sstable::_shards_agreeing_to_remove_sstable;
|
||||
|
||||
static utils::phased_barrier& background_jobs() {
|
||||
static thread_local utils::phased_barrier gate;
|
||||
return gate;
|
||||
@@ -1853,7 +1855,7 @@ sstable::~sstable() {
|
||||
// clean up unused sstables, and because we'll never reuse the same
|
||||
// generation number anyway.
|
||||
try {
|
||||
shared_remove_by_toc_name(filename(component_type::TOC), _shared).handle_exception(
|
||||
delete_atomically({sstable_to_delete(filename(component_type::TOC), _shared)}).handle_exception(
|
||||
[op = background_jobs().start()] (std::exception_ptr eptr) {
|
||||
sstlog.warn("Exception when deleting sstable file: {}", eptr);
|
||||
});
|
||||
@@ -1869,26 +1871,6 @@ dirname(sstring fname) {
|
||||
return boost::filesystem::canonical(std::string(fname)).parent_path().string();
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable::shared_remove_by_toc_name(sstring toc_name, bool shared) {
|
||||
if (!shared) {
|
||||
return remove_by_toc_name(toc_name);
|
||||
} else {
|
||||
auto shard = std::hash<sstring>()(toc_name) % smp::count;
|
||||
return smp::submit_to(shard, [toc_name, src_shard = engine().cpu_id()] {
|
||||
auto& remove_set = _shards_agreeing_to_remove_sstable[toc_name];
|
||||
remove_set.insert(src_shard);
|
||||
auto counter = remove_set.size();
|
||||
if (counter == smp::count) {
|
||||
_shards_agreeing_to_remove_sstable.erase(toc_name);
|
||||
return remove_by_toc_name(toc_name);
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<>
|
||||
fsync_directory(sstring fname) {
|
||||
return sstable_write_io_check([&] {
|
||||
@@ -2040,4 +2022,132 @@ void sstable::mark_sstable_for_deletion(sstring ks, sstring cf, sstring dir, int
|
||||
sst.mark_for_deletion();
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const sstable_to_delete& std) {
|
||||
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
|
||||
}
|
||||
|
||||
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
|
||||
using sstables_to_delete_atomically_type = std::set<sstring>;
|
||||
struct pending_deletion {
|
||||
sstables_to_delete_atomically_type names;
|
||||
std::vector<lw_shared_ptr<promise<>>> completions;
|
||||
};
|
||||
|
||||
static thread_local bool g_atomic_deletions_cancelled = false;
|
||||
static thread_local std::list<lw_shared_ptr<pending_deletion>> g_atomic_deletion_sets;
|
||||
static thread_local std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> g_shards_agreeing_to_delete_sstable;
|
||||
|
||||
static logging::logger deletion_logger("sstable-deletion");
|
||||
|
||||
static
|
||||
future<>
|
||||
do_delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
|
||||
// runs on shard 0 only
|
||||
deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
|
||||
|
||||
if (g_atomic_deletions_cancelled) {
|
||||
deletion_logger.debug("atomic deletions disabled, erroring out");
|
||||
throw std::runtime_error(sprint("atomic deletions disabled; not deleting %s", atomic_deletion_set));
|
||||
}
|
||||
|
||||
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
|
||||
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
|
||||
std::list<lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
|
||||
auto merged_set = make_lw_shared(pending_deletion());
|
||||
for (auto&& sst_to_delete : atomic_deletion_set) {
|
||||
merged_set->names.insert(sst_to_delete.name);
|
||||
if (!sst_to_delete.shared) {
|
||||
for (auto shard : boost::irange<shard_id>(0, smp::count)) {
|
||||
g_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
merged_set->completions.push_back(make_lw_shared<promise<>>());
|
||||
auto ret = merged_set->completions.back()->get_future();
|
||||
for (auto&& old_set : g_atomic_deletion_sets) {
|
||||
auto intersection = sstables_to_delete_atomically_type();
|
||||
boost::set_intersection(merged_set->names, old_set->names, std::inserter(intersection, intersection.end()));
|
||||
if (intersection.empty()) {
|
||||
// We copy old_set to avoid corrupting g_atomic_deletion_sets if we fail
|
||||
// further on.
|
||||
new_atomic_deletion_sets.push_back(old_set);
|
||||
} else {
|
||||
deletion_logger.debug("merging with {}", old_set->names);
|
||||
boost::insert(merged_set->names, old_set->names);
|
||||
boost::push_back(merged_set->completions, old_set->completions);
|
||||
}
|
||||
}
|
||||
deletion_logger.debug("new atomic set: {}", merged_set->names);
|
||||
new_atomic_deletion_sets.push_back(merged_set);
|
||||
// can now exception-safely commit:
|
||||
g_atomic_deletion_sets = std::move(new_atomic_deletion_sets);
|
||||
|
||||
// Mark each sstable as being deleted from deleting_shard. We have to do
|
||||
// this in a separate pass, so the consideration whether we can delete or not
|
||||
// sees all the data from this pass.
|
||||
for (auto&& sst : atomic_deletion_set) {
|
||||
g_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
|
||||
}
|
||||
|
||||
// Figure out if the (possibly merged) set can be deleted
|
||||
for (auto&& sst : merged_set->names) {
|
||||
if (g_shards_agreeing_to_delete_sstable[sst].size() != smp::count) {
|
||||
// Not everyone agrees, leave the set pending
|
||||
deletion_logger.debug("deferring deletion until all shards agree");
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot recover from a failed deletion
|
||||
g_atomic_deletion_sets.pop_back();
|
||||
for (auto&& name : merged_set->names) {
|
||||
g_shards_agreeing_to_delete_sstable.erase(name);
|
||||
}
|
||||
|
||||
// Everyone agrees, let's delete
|
||||
// FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
|
||||
parallel_for_each(merged_set->names, [] (sstring name) {
|
||||
deletion_logger.debug("deleting {}", name);
|
||||
return remove_by_toc_name(name);
|
||||
}).then_wrapped([merged_set] (future<> result) {
|
||||
deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
|
||||
shared_future<> sf(std::move(result));
|
||||
for (auto&& comp : merged_set->completions) {
|
||||
sf.get_future().forward_to(std::move(*comp));
|
||||
}
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
future<>
|
||||
delete_atomically(std::vector<sstable_to_delete> ssts) {
|
||||
auto shard = engine().cpu_id();
|
||||
return smp::submit_to(0, [=] {
|
||||
return do_delete_atomically(ssts, shard);
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
delete_atomically(std::vector<shared_sstable> ssts) {
|
||||
std::vector<sstable_to_delete> sstables_to_delete_atomically;
|
||||
for (auto&& sst : ssts) {
|
||||
sstables_to_delete_atomically.push_back({sst->toc_filename(), sst->is_shared()});
|
||||
}
|
||||
return delete_atomically(std::move(sstables_to_delete_atomically));
|
||||
}
|
||||
|
||||
void
|
||||
cancel_atomic_deletions() {
|
||||
g_atomic_deletions_cancelled = true;
|
||||
for (auto&& pd : g_atomic_deletion_sets) {
|
||||
for (auto&& c : pd->completions) {
|
||||
c->set_exception(std::runtime_error(sprint("Atomic sstable deletions cancelled; not deleting %s", pd->names)));
|
||||
}
|
||||
}
|
||||
g_atomic_deletion_sets.clear();
|
||||
g_shards_agreeing_to_delete_sstable.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -341,11 +341,9 @@ private:
|
||||
void prepare_write_components(::mutation_reader mr,
|
||||
uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size,
|
||||
const io_priority_class& pc);
|
||||
static future<> shared_remove_by_toc_name(sstring toc_name, bool shared);
|
||||
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
|
||||
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
|
||||
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
|
||||
static thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> _shards_agreeing_to_remove_sstable;
|
||||
|
||||
std::unordered_set<component_type, enum_hash<component_type>> _components;
|
||||
|
||||
@@ -588,4 +586,31 @@ future<> await_background_jobs();
|
||||
// Invokes await_background_jobs() on all shards
|
||||
future<> await_background_jobs_on_all_shards();
|
||||
|
||||
struct sstable_to_delete {
|
||||
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
|
||||
sstring name;
|
||||
bool shared = false;
|
||||
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
|
||||
};
|
||||
|
||||
|
||||
// When we compact sstables, we have to atomically instantiate the new
|
||||
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
|
||||
// and if A contained some data that was tombstoned by B, and if B was
|
||||
// deleted but A survived, then data from A will be resurrected.
|
||||
//
|
||||
// There are two violators of the requirement to atomically delete
|
||||
// sstables: first sstable instantiation and deletion on disk is atomic
|
||||
// only wrt. itself, not other sstables, and second when an sstable is
|
||||
// shared among shards, so actual on-disk deletion of an sstable is deferred
|
||||
// until all shards agree it can be deleted.
|
||||
//
|
||||
// This function only solves the second problem for now.
|
||||
future<> delete_atomically(std::vector<shared_sstable> ssts);
|
||||
future<> delete_atomically(std::vector<sstable_to_delete> ssts);
|
||||
|
||||
// Cancel any deletions scheduled by delete_atomically() and make their
|
||||
// futures complete
|
||||
void cancel_atomic_deletions();
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user