Merge "Delete compacted sstables atomically" from Avi

"If we compact sstables A, B into a new sstable C we must either delete both
A and B, or none of them.  This is because a tombstone in B may delete data
in A, and during compaction, both the tombstone and the data are removed.
If only B is deleted, then the data gets resurrected.

Non-atomic deletion occurs because the filesystem does not support atomic
deletion of multiple files; but the window for that is small and is not
addressed in this patchset.  Another case is when A is shared across
multiple shards (as is the case when changing shard count, or migrating
from existing Cassandra sstables).  This case is covered by this patchset.

Fixes #1181."
This commit is contained in:
Pekka Enberg
2016-04-14 22:04:15 +03:00
6 changed files with 241 additions and 48 deletions

View File

@@ -45,7 +45,9 @@
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/function_output_iterator.hpp>
#include <boost/range/algorithm/heap_algorithm.hpp>
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/adaptor/map.hpp>
#include "frozen_mutation.hh"
#include "mutation_partition_applier.hh"
#include "core/do_with.hh"
@@ -770,6 +772,8 @@ column_family::stop() {
return _flush_queue->close().then([this] {
return _streaming_flush_gate.close();
});
}).then([this] {
return _sstable_deletion_gate.close();
});
}
@@ -832,6 +836,18 @@ column_family::reshuffle_sstables(std::set<int64_t> all_generations, int64_t sta
});
}
// Recomputes live_disk_space_used and live_sstable_count from scratch.
// Called after the sstable list has been re-created, since the counters
// track the list and cannot be patched incrementally across a rebuild.
void column_family::rebuild_statistics() {
    // Reset; the totals are re-accumulated below.
    _stats.live_disk_space_used = 0;
    _stats.live_sstable_count = 0;

    // Sstables already compacted but not yet deleted still occupy disk
    // space, so they count towards the statistics...
    for (auto&& sst : _sstables_compacted_but_not_deleted) {
        update_stats_for_new_sstable(sst->data_size());
    }
    // ...as do all sstables in the live list.
    for (auto&& entry : *_sstables) {
        update_stats_for_new_sstable(entry.second->data_size());
    }
}
void
column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
@@ -840,37 +856,49 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
// later), and we add the new tables generated by the compaction.
// We create a new list rather than modifying it in-place, so that
// on-going reads can continue to use the old list.
//
// We only remove old sstables after they are successfully deleted,
// to avoid a new compaction from ignoring data in the old sstables
// if the deletion fails (note deletion of shared sstables can take
// unbounded time, because all shards must agree on the deletion).
auto current_sstables = _sstables;
auto new_sstable_list = make_lw_shared<sstable_list>();
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
// zeroing live_disk_space_used and live_sstable_count because the
// sstable list is re-created below.
_stats.live_disk_space_used = 0;
_stats.live_sstable_count = 0;
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
for (const auto& oldtab : *current_sstables) {
// First, add the new sstables.
for (auto&& tab : boost::range::join(new_sstables, *current_sstables | boost::adaptors::map_values)) {
// Checks if oldtab is a sstable not being compacted.
if (!s.count(oldtab.second)) {
update_stats_for_new_sstable(oldtab.second->data_size());
new_sstable_list->emplace(oldtab.first, oldtab.second);
if (!s.count(tab)) {
new_sstable_list->emplace(tab->generation(), tab);
} else {
new_compacted_but_not_deleted.push_back(tab);
}
}
for (const auto& newtab : new_sstables) {
// FIXME: rename the new sstable(s). Verify a rename doesn't cause
// problems for the sstable object.
update_stats_for_new_sstable(newtab->data_size());
new_sstable_list->emplace(newtab->generation(), newtab);
}
for (const auto& oldtab : sstables_to_remove) {
oldtab->mark_for_deletion();
}
_sstables = std::move(new_sstable_list);
_sstables_compacted_but_not_deleted = std::move(new_compacted_but_not_deleted);
rebuild_statistics();
// Second, delete the old sstables. This is done in the background, so we can
// consider this compaction completed.
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
auto current_sstables = _sstables;
auto new_sstable_list = make_lw_shared<sstable_list>();
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
return s.count(sst);
});
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
rebuild_statistics();
});
});
}
future<>
@@ -894,7 +922,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
};
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
});
});
}
@@ -1025,6 +1053,24 @@ lw_shared_ptr<sstable_list> column_family::get_sstables() {
return _sstables;
}
// Returns every sstable in the column family, including those that have
// already been compacted away (and so are invisible to active queries) but
// are still waiting for delete_atomically() to complete.
//
// Compaction must consult this full list: until the old files are actually
// gone, a tombstone covering data in a not-yet-deleted sstable must not be
// garbage-collected, or that data could be resurrected should the deletion
// fail.
lw_shared_ptr<sstable_list> column_family::get_sstables_including_compacted_undeleted() {
    // Fast path: nothing pending deletion, the live list is the answer.
    if (_sstables_compacted_but_not_deleted.empty()) {
        return _sstables;
    }
    // Copy the live list and fold in the pending-deletion sstables,
    // keyed by generation just like the live entries.
    auto combined = make_lw_shared(*_sstables);
    for (auto&& sst : _sstables_compacted_but_not_deleted) {
        combined->insert(std::make_pair(sst->generation(), sst));
    }
    return combined;
}
inline bool column_family::manifest_json_filter(const sstring& fname) {
using namespace boost::filesystem;
@@ -2524,7 +2570,7 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
_cache.clear();
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return s->mark_for_deletion_on_disk();
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.

View File

@@ -267,6 +267,12 @@ private:
// generation -> sstable. Ordered by key so we can easily get the most recent.
lw_shared_ptr<sstable_list> _sstables;
// sstables that have been compacted (so don't look up in query) but
// have not been deleted yet, so must not GC any tombstones in other sstables
// that may delete data in these sstables:
std::vector<sstables::shared_sstable> _sstables_compacted_but_not_deleted;
// Control background fibers waiting for sstables to be deleted
seastar::gate _sstable_deletion_gate;
// There are situations in which we need to stop writing sstables. Flushers will take
// the read lock, and the ones that wish to stop that process will take the write lock.
rwlock _sstables_lock;
@@ -319,6 +325,7 @@ private:
// Rebuild existing _sstables with new_sstables added to it and sstables_to_remove removed from it.
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove);
void rebuild_statistics();
private:
// Creates a mutation reader which covers sstables.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
@@ -485,6 +492,7 @@ public:
}
lw_shared_ptr<sstable_list> get_sstables();
lw_shared_ptr<sstable_list> get_sstables_including_compacted_undeleted();
size_t sstables_count();
int64_t get_unleveled_sstables() const;

View File

@@ -396,6 +396,10 @@ int main(int ac, char** av) {
// Note: changed from using a move here, because we want the config object intact.
db.start(std::ref(*cfg)).get();
engine().at_exit([&db] {
// A shared sstable must be compacted by all shards before it can be deleted.
// Since we're stopping, that's not going to happen. Cancel those pending
// deletions to let anyone waiting on them to continue.
sstables::cancel_atomic_deletions();
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the shareded<database> pointers alive.

View File

@@ -143,7 +143,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
db::replay_position rp;
auto all_sstables = cf.get_sstables();
auto all_sstables = cf.get_sstables_including_compacted_undeleted();
std::sort(sstables.begin(), sstables.end(), [] (const shared_sstable& x, const shared_sstable& y) {
return x->generation() < y->generation();
});

View File

@@ -30,6 +30,7 @@
#include "core/shared_ptr.hh"
#include "core/do_with.hh"
#include "core/thread.hh"
#include <seastar/core/shared_future.hh>
#include <iterator>
#include "types.hh"
@@ -44,6 +45,9 @@
#include <boost/filesystem/operations.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm_ext/insert.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/algorithm/set_algorithm.hpp>
#include <regex>
#include <core/align.hh>
#include "utils/phased_barrier.hh"
@@ -72,8 +76,6 @@ future<file> new_sstable_component_file(disk_error_signal_type& signal, sstring
});
}
thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> sstable::_shards_agreeing_to_remove_sstable;
static utils::phased_barrier& background_jobs() {
static thread_local utils::phased_barrier gate;
return gate;
@@ -1853,7 +1855,7 @@ sstable::~sstable() {
// clean up unused sstables, and because we'll never reuse the same
// generation number anyway.
try {
shared_remove_by_toc_name(filename(component_type::TOC), _shared).handle_exception(
delete_atomically({sstable_to_delete(filename(component_type::TOC), _shared)}).handle_exception(
[op = background_jobs().start()] (std::exception_ptr eptr) {
sstlog.warn("Exception when deleting sstable file: {}", eptr);
});
@@ -1869,26 +1871,6 @@ dirname(sstring fname) {
return boost::filesystem::canonical(std::string(fname)).parent_path().string();
}
// Removes an sstable's files on disk, given its TOC file name.
//
// For an unshared sstable the removal happens immediately. For a shared
// sstable, the request is routed to a "home" shard chosen by hashing the
// TOC name; that shard records which shards have asked for the removal and
// only performs it once all smp::count shards agree, so the files outlive
// any shard that may still be reading them.
future<>
sstable::shared_remove_by_toc_name(sstring toc_name, bool shared) {
    if (!shared) {
        return remove_by_toc_name(toc_name);
    } else {
        // Deterministically pick the coordinating shard for this sstable.
        auto shard = std::hash<sstring>()(toc_name) % smp::count;
        return smp::submit_to(shard, [toc_name, src_shard = engine().cpu_id()] {
            // Record the requesting shard's agreement to remove.
            auto& remove_set = _shards_agreeing_to_remove_sstable[toc_name];
            remove_set.insert(src_shard);
            auto counter = remove_set.size();
            if (counter == smp::count) {
                // Last shard to agree performs the actual removal and
                // drops the bookkeeping entry.
                _shards_agreeing_to_remove_sstable.erase(toc_name);
                return remove_by_toc_name(toc_name);
            } else {
                // Not everyone has agreed yet; just register and return.
                return make_ready_future<>();
            }
        });
    }
}
future<>
fsync_directory(sstring fname) {
return sstable_write_io_check([&] {
@@ -2040,4 +2022,132 @@ void sstable::mark_sstable_for_deletion(sstring ks, sstring cf, sstring dir, int
sst.mark_for_deletion();
}
std::ostream&
operator<<(std::ostream& os, const sstable_to_delete& std) {
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
}
// Set of shards that have so far agreed to delete a given sstable.
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
// TOC file names of sstables that must be deleted as one atomic unit.
using sstables_to_delete_atomically_type = std::set<sstring>;

// One pending atomic-deletion unit: the sstable names it covers, plus the
// promises of every delete_atomically() caller waiting on the unit.
struct pending_deletion {
    sstables_to_delete_atomically_type names;
    std::vector<lw_shared_ptr<promise<>>> completions;
};

// The state below is thread_local and is only mutated via shard 0
// (delete_atomically() submits all work there).
// Set by cancel_atomic_deletions(); once true, new deletions are refused.
static thread_local bool g_atomic_deletions_cancelled = false;
// Pending deletion sets; kept mutually disjoint by the merging logic in
// do_delete_atomically().
static thread_local std::list<lw_shared_ptr<pending_deletion>> g_atomic_deletion_sets;
// Per-sstable record (by TOC name) of which shards have agreed to delete it.
static thread_local std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> g_shards_agreeing_to_delete_sstable;
static logging::logger deletion_logger("sstable-deletion");
// Implements the cross-shard agreement protocol for deleting a set of
// sstables atomically; runs on shard 0 only (see delete_atomically()).
//
// deleting_shard is the shard that requested this deletion. The set is
// only removed from disk once all smp::count shards have agreed on every
// sstable in the (possibly merged) set; until then the request stays
// pending in g_atomic_deletion_sets and the returned future is unresolved.
//
// Throws if cancel_atomic_deletions() has been called.
static
future<>
do_delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
    // runs on shard 0 only
    deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
    if (g_atomic_deletions_cancelled) {
        deletion_logger.debug("atomic deletions disabled, erroring out");
        throw std::runtime_error(sprint("atomic deletions disabled; not deleting %s", atomic_deletion_set));
    }
    // Insert atomic_deletion_set into the list of sets pending deletion. If the new set
    // overlaps with an existing set, merge them (the merged set will be deleted atomically).
    std::list<lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
    auto merged_set = make_lw_shared(pending_deletion());
    for (auto&& sst_to_delete : atomic_deletion_set) {
        merged_set->names.insert(sst_to_delete.name);
        if (!sst_to_delete.shared) {
            // An unshared sstable needs no cross-shard agreement; mark
            // every shard as having agreed up front.
            for (auto shard : boost::irange<shard_id>(0, smp::count)) {
                g_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
            }
        }
    }
    // The caller's own completion promise; its future is what we return.
    merged_set->completions.push_back(make_lw_shared<promise<>>());
    auto ret = merged_set->completions.back()->get_future();
    // Pending sets are mutually disjoint, so a single pass suffices: each
    // old set either stays untouched or is absorbed into merged_set.
    for (auto&& old_set : g_atomic_deletion_sets) {
        auto intersection = sstables_to_delete_atomically_type();
        boost::set_intersection(merged_set->names, old_set->names, std::inserter(intersection, intersection.end()));
        if (intersection.empty()) {
            // We copy old_set to avoid corrupting g_atomic_deletion_sets if we fail
            // further on.
            new_atomic_deletion_sets.push_back(old_set);
        } else {
            deletion_logger.debug("merging with {}", old_set->names);
            boost::insert(merged_set->names, old_set->names);
            boost::push_back(merged_set->completions, old_set->completions);
        }
    }
    deletion_logger.debug("new atomic set: {}", merged_set->names);
    new_atomic_deletion_sets.push_back(merged_set);
    // can now exception-safely commit:
    g_atomic_deletion_sets = std::move(new_atomic_deletion_sets);
    // Mark each sstable as being deleted from deleting_shard. We have to do
    // this in a separate pass, so the consideration whether we can delete or not
    // sees all the data from this pass.
    for (auto&& sst : atomic_deletion_set) {
        g_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
    }
    // Figure out if the (possibly merged) set can be deleted
    for (auto&& sst : merged_set->names) {
        if (g_shards_agreeing_to_delete_sstable[sst].size() != smp::count) {
            // Not everyone agrees, leave the set pending
            deletion_logger.debug("deferring deletion until all shards agree");
            return ret;
        }
    }
    // Cannot recover from a failed deletion
    // merged_set was appended last above, so pop_back removes exactly it.
    g_atomic_deletion_sets.pop_back();
    for (auto&& name : merged_set->names) {
        g_shards_agreeing_to_delete_sstable.erase(name);
    }
    // Everyone agrees, let's delete
    // FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
    parallel_for_each(merged_set->names, [] (sstring name) {
        deletion_logger.debug("deleting {}", name);
        return remove_by_toc_name(name);
    }).then_wrapped([merged_set] (future<> result) {
        deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
        // Fan the single deletion result out to every waiter via a
        // shared_future (a plain future has only one consumer).
        shared_future<> sf(std::move(result));
        for (auto&& comp : merged_set->completions) {
            sf.get_future().forward_to(std::move(*comp));
        }
    });
    return ret;
}
// Atomically deletes a set of sstables identified by TOC name.
//
// The work always runs on shard 0, which coordinates agreement between all
// shards (see do_delete_atomically). The returned future resolves once the
// whole (possibly merged) set has been deleted, or fails if deletion is
// cancelled or errors out.
future<>
delete_atomically(std::vector<sstable_to_delete> ssts) {
    auto shard = engine().cpu_id();
    // Move the vector into the lambda rather than copying it ([=] would
    // copy); we own this by-value parameter so the move is safe.
    return smp::submit_to(0, [ssts = std::move(ssts), shard] {
        return do_delete_atomically(ssts, shard);
    });
}
// Convenience overload: atomically deletes the given sstable objects by
// translating each into a {TOC file name, is-shared} descriptor and
// forwarding to delete_atomically(std::vector<sstable_to_delete>).
future<>
delete_atomically(std::vector<shared_sstable> ssts) {
    std::vector<sstable_to_delete> sstables_to_delete_atomically;
    // Reserve up front and construct in place to avoid reallocations and
    // temporary sstable_to_delete copies.
    sstables_to_delete_atomically.reserve(ssts.size());
    for (auto&& sst : ssts) {
        sstables_to_delete_atomically.emplace_back(sst->toc_filename(), sst->is_shared());
    }
    return delete_atomically(std::move(sstables_to_delete_atomically));
}
// Fails every in-flight atomic deletion and refuses all future ones.
//
// Used at shutdown: a shared sstable is only deleted once every shard has
// compacted it, which will never happen while stopping, so without this
// anyone waiting on delete_atomically() would hang forever.
void
cancel_atomic_deletions() {
    // From now on do_delete_atomically() errors out on new requests.
    g_atomic_deletions_cancelled = true;
    // Wake every waiter of every pending set with an error.
    for (auto&& pending : g_atomic_deletion_sets) {
        for (auto&& waiter : pending->completions) {
            waiter->set_exception(std::runtime_error(sprint("Atomic sstable deletions cancelled; not deleting %s", pending->names)));
        }
    }
    // Drop all bookkeeping; the sstable files themselves stay on disk.
    g_atomic_deletion_sets.clear();
    g_shards_agreeing_to_delete_sstable.clear();
}
}

View File

@@ -341,11 +341,9 @@ private:
void prepare_write_components(::mutation_reader mr,
uint64_t estimated_partitions, schema_ptr schema, uint64_t max_sstable_size,
const io_priority_class& pc);
static future<> shared_remove_by_toc_name(sstring toc_name, bool shared);
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
static thread_local std::unordered_map<sstring, std::unordered_set<unsigned>> _shards_agreeing_to_remove_sstable;
std::unordered_set<component_type, enum_hash<component_type>> _components;
@@ -588,4 +586,31 @@ future<> await_background_jobs();
// Invokes await_background_jobs() on all shards
future<> await_background_jobs_on_all_shards();
// Identifies one sstable scheduled for deletion: its TOC file name, plus
// whether it is shared across shards (a shared sstable is only removed
// from disk once every shard has agreed to delete it).
struct sstable_to_delete {
    sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
    sstring name;
    bool shared = false;
    friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
};
// When we compact sstables, we have to atomically instantiate the new
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
// and if A contained some data that was tombstoned by B, and if B was
// deleted but A survived, then data from A will be resurrected.
//
// There are two violators of the requirement to atomically delete
// sstables: first sstable instantiation and deletion on disk is atomic
// only wrt. itself, not other sstables, and second when an sstable is
// shared among shards, so actual on-disk deletion of an sstable is deferred
// until all shards agree it can be deleted.
//
// This function only solves the second problem for now.
future<> delete_atomically(std::vector<shared_sstable> ssts);
future<> delete_atomically(std::vector<sstable_to_delete> ssts);
// Cancel any deletions scheduled by delete_atomically() and make their
// futures complete
void cancel_atomic_deletions();
}