Remove SSTable's atomic deletion manager

Not used anymore, can be deleted.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2018-02-07 22:38:45 -02:00
parent 1472cfcc19
commit 312bd9ce25
9 changed files with 0 additions and 458 deletions

View File

@@ -269,7 +269,6 @@ scylla_tests = [
'tests/database_test',
'tests/nonwrapping_range_test',
'tests/input_stream_test',
'tests/sstable_atomic_deletion_test',
'tests/virtual_reader_test',
'tests/view_schema_test',
'tests/counter_test',
@@ -393,7 +392,6 @@ scylla_core = (['database.cc',
'sstables/compaction.cc',
'sstables/compaction_strategy.cc',
'sstables/compaction_manager.cc',
'sstables/atomic_deletion.cc',
'sstables/integrity_checked_file_impl.cc',
'sstables/prepended_input_stream.cc',
'transport/event.cc',

View File

@@ -1335,12 +1335,6 @@ column_family::on_compaction_completion(const std::vector<sstables::shared_sstab
return make_exception_future<>(eptr);
}
return make_ready_future<>();
}).handle_exception([] (std::exception_ptr e) {
try {
std::rethrow_exception(e);
} catch (sstables::atomic_deletion_cancelled& adc) {
dblog.debug("Failed to delete sstables after compaction: {}", adc);
}
}).then([this] {
// refresh underlying data source in row cache to prevent it from holding reference
// to sstables files which were previously deleted.

View File

@@ -1,145 +0,0 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "atomic_deletion.hh"
#include "to_string.hh"
#include <seastar/core/shared_future.hh>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
namespace sstables {
logging::logger atomic_deletion_manager::_deletion_logger("sstable-deletion");
atomic_deletion_manager::atomic_deletion_manager(unsigned shard_count,
std::function<future<> (std::vector<sstring> sstables)> delete_sstables)
: _shard_count(shard_count)
, _delete_sstables(std::move(delete_sstables)) {
}
future<>
atomic_deletion_manager::delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
// runs on shard 0 only
_deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
if (_atomic_deletions_cancelled) {
_deletion_logger.debug("atomic deletions disabled, erroring out");
using boost::adaptors::transformed;
throw atomic_deletion_cancelled(atomic_deletion_set
| transformed(std::mem_fn(&sstable_to_delete::name)));
}
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
auto merged_set = make_lw_shared(pending_deletion());
for (auto&& sst_to_delete : atomic_deletion_set) {
merged_set->names.insert(sst_to_delete.name);
if (!sst_to_delete.shared) {
for (auto shard : boost::irange<shard_id>(0, _shard_count)) {
_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
}
}
}
auto pr = make_lw_shared<promise<>>();
merged_set->completions.insert(pr);
auto ret = pr->get_future();
for (auto&& sst_to_delete : atomic_deletion_set) {
auto i = _atomic_deletion_sets.find(sst_to_delete.name);
// merge from old deletion set to new deletion set
// i->second can be nullptr, see below why
if (i != _atomic_deletion_sets.end() && i->second) {
boost::copy(i->second->names, std::inserter(merged_set->names, merged_set->names.end()));
boost::copy(i->second->completions, std::inserter(merged_set->completions, merged_set->completions.end()));
}
}
_deletion_logger.debug("new atomic set: {}", merged_set->names);
// we need to merge new_atomic_deletion_sets into g_atomic_deletion_sets,
// but beware of exceptions. We do that with a first pass that inserts
// nullptr as the value, so the second pass only replaces, and does not allocate
for (auto&& sst_to_delete : atomic_deletion_set) {
_atomic_deletion_sets.emplace(sst_to_delete.name, nullptr);
}
// now, no allocations are involved, so this commits the operation atomically
for (auto&& n : merged_set->names) {
auto i = _atomic_deletion_sets.find(n);
i->second = merged_set;
}
// Mark each sstable as being deleted from deleting_shard. We have to do
// this in a separate pass, so the consideration whether we can delete or not
// sees all the data from this pass.
for (auto&& sst : atomic_deletion_set) {
_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
}
// Figure out if the (possibly merged) set can be deleted
for (auto&& sst : merged_set->names) {
if (_shards_agreeing_to_delete_sstable[sst].size() != _shard_count) {
// Not everyone agrees, leave the set pending
_deletion_logger.debug("deferring deletion until all shards agree");
return ret;
}
}
// Cannot recover from a failed deletion
for (auto&& name : merged_set->names) {
_atomic_deletion_sets.erase(name);
_shards_agreeing_to_delete_sstable.erase(name);
}
// Everyone agrees, let's delete
auto names = boost::copy_range<std::vector<sstring>>(merged_set->names);
_deletion_logger.debug("deleting {}", names);
// Run deletion in the background; all callers are waiting for it via merged_set->completions
_delete_sstables(names).then_wrapped([this, merged_set] (future<> result) {
_deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
shared_future<> sf(std::move(result));
for (auto&& comp : merged_set->completions) {
sf.get_future().forward_to(std::move(*comp));
}
});
return ret;
}
void
atomic_deletion_manager::cancel_atomic_deletions() {
_atomic_deletions_cancelled = true;
cancel_prior_atomic_deletions();
}
void
atomic_deletion_manager::cancel_prior_atomic_deletions() {
for (auto&& pd : _atomic_deletion_sets) {
if (!pd.second) {
// Could happen if a delete_atomically() failed
continue;
}
for (auto&& c : pd.second->completions) {
c->set_exception(atomic_deletion_cancelled(pd.second->names));
}
// since sets are shared, make sure we don't hit the same one again
pd.second->completions.clear();
}
_atomic_deletion_sets.clear();
_shards_agreeing_to_delete_sstable.clear();
}
}

View File

@@ -1,95 +0,0 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
// The atomic deletion manager solves the problem of orchestrating
// the deletion of files that must be deleted as a group, where each
// shard has different groups, and all shards delete a file for it to
// be deleted. For example,
//
// shard 0: delete "A"
// we can't delete anything because shard 1 hasn't agreed yet.
// shard 1: delete "A" and "B"
// shard 1 agrees to delete "A", but we can't delete it yet,
// because shard 1 requires that it be deleted together with "B",
// and shard 0 hasn't agreed to delete "B" yet.
// shard 0: delete "B" and "C"
// shards 0 and 1 now both agree to delete "A" and "B", but shard 0
// doesn't allow us to delete "B" without "C".
// shard 1: delete "C"
// finally, we can delete "A", "B", and "C".
#include "log.hh"
#include <seastar/core/future.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/reactor.hh> // for shard_id
#include <unordered_set>
#include <unordered_map>
#include <vector>
#include "seastarx.hh"
namespace sstables {
struct sstable_to_delete {
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
sstring name;
bool shared = false;
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
};
class atomic_deletion_cancelled : public std::exception {
std::string _msg;
public:
explicit atomic_deletion_cancelled(std::vector<sstring> names);
template <typename StringRange>
explicit atomic_deletion_cancelled(StringRange range)
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
}
const char* what() const noexcept override;
};
class atomic_deletion_manager {
static logging::logger _deletion_logger;
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
using sstables_to_delete_atomically_type = std::set<sstring>;
struct pending_deletion {
sstables_to_delete_atomically_type names;
std::unordered_set<lw_shared_ptr<promise<>>> completions;
};
bool _atomic_deletions_cancelled = false;
// map from sstable name to a set of sstables that must be deleted atomically, including itself
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> _atomic_deletion_sets;
std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> _shards_agreeing_to_delete_sstable;
unsigned _shard_count;
std::function<future<> (std::vector<sstring> sstables)> _delete_sstables;
public:
atomic_deletion_manager(unsigned shard_count,
std::function<future<> (std::vector<sstring> sstables)> delete_sstables);
future<> delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard);
void cancel_atomic_deletions();
void cancel_prior_atomic_deletions();
};
}

View File

@@ -3046,11 +3046,6 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
return utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(s, key)));
}
std::ostream&
operator<<(std::ostream& os, const sstable_to_delete& std) {
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
}
future<>
delete_sstables(std::vector<sstring> tocs) {
// FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
@@ -3059,16 +3054,6 @@ delete_sstables(std::vector<sstring> tocs) {
});
}
static thread_local atomic_deletion_manager g_atomic_deletion_manager(smp::count, delete_sstables);
future<>
delete_atomically(std::vector<sstable_to_delete> ssts) {
auto shard = engine().cpu_id();
return smp::submit_to(0, [=] {
return g_atomic_deletion_manager.delete_atomically(ssts, shard);
});
}
future<>
delete_atomically(std::vector<shared_sstable> ssts) {
auto sstables_to_delete_atomically = boost::copy_range<std::vector<sstring>>(ssts
@@ -3077,23 +3062,6 @@ delete_atomically(std::vector<shared_sstable> ssts) {
return delete_sstables(std::move(sstables_to_delete_atomically));
}
void cancel_prior_atomic_deletions() {
g_atomic_deletion_manager.cancel_prior_atomic_deletions();
}
void cancel_atomic_deletions() {
g_atomic_deletion_manager.cancel_atomic_deletions();
}
atomic_deletion_cancelled::atomic_deletion_cancelled(std::vector<sstring> names)
: _msg(sprint("atomic deletions cancelled; not deleting %s", names)) {
}
const char*
atomic_deletion_cancelled::what() const noexcept {
return _msg.c_str();
}
thread_local shared_index_lists::stats shared_index_lists::_shard_stats;
static thread_local seastar::metrics::metric_groups metrics;

View File

@@ -52,7 +52,6 @@
#include "query-request.hh"
#include "compound_compat.hh"
#include "disk-error-handler.hh"
#include "atomic_deletion.hh"
#include "sstables/shared_index_lists.hh"
#include "sstables/progress_monitor.hh"
#include "db/commitlog/replay_position.hh"
@@ -811,19 +810,8 @@ future<> await_background_jobs_on_all_shards();
// shared among shard, so actual on-disk deletion of an sstable is deferred
// until all shards agree it can be deleted.
//
// When shutting down, we will not be able to complete some deletions.
// In that case, an atomic_deletion_cancelled exception is returned instead.
//
// This function only solves the second problem for now.
future<> delete_atomically(std::vector<shared_sstable> ssts);
future<> delete_atomically(std::vector<sstable_to_delete> ssts);
// Cancel any deletions scheduled by delete_atomically() and make their
// futures complete (with an atomic_deletion_cancelled exception).
void cancel_prior_atomic_deletions();
// Like cancel_prior_atomic_deletions(), but will also cause any later deletion attempts to fail.
void cancel_atomic_deletions();
struct index_sampling_state {
static constexpr size_t default_summary_byte_cost = 2000;

View File

@@ -43,7 +43,6 @@ boost_tests = [
'storage_proxy_test',
'schema_change_test',
'sstable_mutation_test',
'sstable_atomic_deletion_test',
'sstable_resharding_test',
'commitlog_test',
'hash_test',

View File

@@ -312,7 +312,6 @@ public:
db->start(std::move(*cfg)).get();
auto stop_db = defer([db] {
db->stop().get();
sstables::cancel_prior_atomic_deletions();
});
// FIXME: split

View File

@@ -1,164 +0,0 @@
/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sstables/atomic_deletion.hh"
#include <seastar/tests/test-utils.hh>
#include <deque>
#include <boost/range/numeric.hpp>
#include <boost/range/adaptor/transformed.hpp>
using namespace sstables;
class atomic_deletion_test_env {
public:
using event = std::function<future<> (atomic_deletion_test_env& adm)>;
private:
struct a_hash {
size_t operator()(const std::unordered_set<sstring>& s) const {
auto h = std::hash<sstring>();
return boost::accumulate(s | boost::adaptors::transformed(h), size_t(0)); // sue me
}
};
atomic_deletion_manager _adm;
std::deque<event> _events;
std::unordered_set<std::unordered_set<sstring>, a_hash> _deletes;
semaphore _deletion_counter { 0 };
private:
future<> delete_sstables(std::vector<sstring> names) {
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
_deletes.insert(s1);
_deletion_counter.signal();
return make_ready_future<>();
}
public:
explicit atomic_deletion_test_env(unsigned shard_count, std::vector<event> events)
: _adm(shard_count, [this] (std::vector<sstring> names) {
return delete_sstables(names);
})
, _events(events.begin(), events.end()) {
}
void expect_no_deletion() {
BOOST_REQUIRE(_deletes.empty());
}
future<> schedule_delete(std::vector<sstable_to_delete> names, unsigned shard) {
_adm.delete_atomically(names, shard).discard_result();
return make_ready_future<>();
}
future<> expect_deletion(std::vector<sstring> names) {
return _deletion_counter.wait().then([this, names] {
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
auto erased = _deletes.erase(s1);
BOOST_REQUIRE_EQUAL(erased, 1);
});
}
future<> test() {
// run all _events sequentially
return repeat([this] {
if (_events.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto ev = std::move(_events.front());
_events.pop_front();
return ev(*this).then([] {
return stop_iteration::no;
});
});
}
};
future<> test_atomic_deletion_manager(unsigned shards, std::vector<atomic_deletion_test_env::event> events) {
auto env = make_lw_shared<atomic_deletion_test_env>(shards, events);
return env->test().finally([env] {});
}
atomic_deletion_test_env::event
delete_many(std::vector<sstable_to_delete> v, unsigned shard) {
return [v, shard] (atomic_deletion_test_env& env) {
// verify we didn't have an early delete from previous deletion
env.expect_no_deletion();
return env.schedule_delete(v, shard);
};
}
atomic_deletion_test_env::event
delete_one(sstable_to_delete s, unsigned shard) {
return delete_many({s}, shard);
}
atomic_deletion_test_env::event
expect_many(std::vector<sstring> names) {
return [names] (atomic_deletion_test_env& env) {
return env.expect_deletion(names);
};
}
atomic_deletion_test_env::event
expect_one(sstring name) {
return expect_many({name});
}
SEASTAR_TEST_CASE(test_single_shard_single_sstable) {
return test_atomic_deletion_manager(1, {
delete_one({"1", false}, 0),
expect_one("1"),
delete_one({"2", true}, 0),
expect_one("2"),
});
}
SEASTAR_TEST_CASE(test_multi_shard_single_sstable) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_one({"1", true}, 1),
delete_one({"1", true}, 2),
expect_one("1"),
delete_one({"2", false}, 1),
expect_one("2"),
});
}
SEASTAR_TEST_CASE(test_nonshared_compaction) {
return test_atomic_deletion_manager(5, {
delete_many({{"1", false}, {"2", false}, {"3", false}}, 2),
expect_many({"1", "2", "3"}),
});
}
SEASTAR_TEST_CASE(test_shared_compaction) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_many({{"1", true}, {"2", false}, {"3", false}}, 2),
delete_one({"1", true}, 1),
expect_many({"1", "2", "3"}),
});
}
SEASTAR_TEST_CASE(test_overlapping_compaction) {
return test_atomic_deletion_manager(3, {
delete_one({"1", true}, 0),
delete_one({"3", true}, 0),
delete_many({{"1", true}, {"2", false}, {"3", true}}, 2),
delete_one({"1", true}, 1),
delete_many({{"3", true}, {"4", false}}, 1),
expect_many({"1", "2", "3", "4"}),
});
}