sstables/compaction: Introduce compaction_completion_desc

This descriptor contain all information needed for table to be properly
updated on compaction completion. A new member will be added to it soon,
which will store ranges to be invalidated in row cache on behalf of
cleanup compaction.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2020-02-19 17:43:27 -03:00
parent a46f235092
commit 65b4fc8bcd
6 changed files with 28 additions and 22 deletions

View File

@@ -120,6 +120,7 @@ namespace sstables {
class sstable;
class entry_descriptor;
class compaction_descriptor;
class compaction_completion_desc;
class foreign_sstable_open_info;
class sstables_manager;
@@ -608,8 +609,7 @@ private:
const std::vector<sstables::shared_sstable>& old_sstables);
// Rebuilds the sstable set right away and schedule deletion of old sstables.
void on_compaction_completion(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove);
void on_compaction_completion(sstables::compaction_completion_desc& desc);
void rebuild_statistics();

View File

@@ -829,7 +829,7 @@ private:
_compacting->erase(sst);
});
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
_replacer(exhausted_ssts, std::move(_new_unused_sstables));
_replacer(compaction_completion_desc{exhausted_ssts, std::move(_new_unused_sstables)});
_sstables.erase(exhausted, _sstables.end());
backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
}
@@ -839,7 +839,7 @@ private:
if (!_sstables.empty()) {
std::vector<shared_sstable> sstables_compacted;
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(sstables_compacted));
_replacer(std::move(sstables_compacted), std::move(_new_unused_sstables));
_replacer(compaction_completion_desc{std::move(sstables_compacted), std::move(_new_unused_sstables)});
}
}

View File

@@ -189,8 +189,13 @@ namespace sstables {
}
};
struct compaction_completion_desc {
std::vector<shared_sstable> input_sstables;
std::vector<shared_sstable> output_sstables;
};
// Replaces old sstable(s) by new one(s) which contain all non-expired data.
using replacer_fn = std::function<void(std::vector<shared_sstable> removed, std::vector<shared_sstable> added)>;
using replacer_fn = std::function<void(compaction_completion_desc)>;
// Compact a list of N sstables into M sstables.
// Returns info about the finished compaction, which includes vector to new sstables.

View File

@@ -1174,8 +1174,7 @@ table::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sst
// Note: must run in a seastar thread
void
table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
// Build a new list of _sstables: We remove from the existing list the
// tables we compacted (by now, there might be more sstables flushed
// later), and we add the new tables generated by the compaction.
@@ -1188,7 +1187,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
// unbounded time, because all shards must agree on the deletion).
// make sure all old sstables belong *ONLY* to current shard before we proceed to their deletion.
for (auto& sst : sstables_to_remove) {
for (auto& sst : desc.input_sstables) {
auto shards = sst->get_shards_for_this_sstable();
if (shards.size() > 1) {
throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used shared sstable {}. Only resharding work with those!",
@@ -1202,9 +1201,9 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
// rebuilding _sstables_compacted_but_not_deleted first to make the entire rebuild operation exception safe.
new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), sstables_to_remove.begin(), sstables_to_remove.end());
new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), desc.input_sstables.begin(), desc.input_sstables.end());
rebuild_sstable_list(new_sstables, sstables_to_remove);
rebuild_sstable_list(desc.output_sstables, desc.input_sstables);
// refresh underlying data source in row cache to prevent it from holding reference
// to sstables files that are about to be deleted.
@@ -1214,7 +1213,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
rebuild_statistics();
auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove = desc.input_sstables] {
return with_semaphore(_sstable_deletion_sem, 1, [sstables_to_remove = std::move(sstables_to_remove)] {
return sstables::delete_atomically(std::move(sstables_to_remove));
});
@@ -1232,7 +1231,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
// or they could stay forever in the set, resulting in deleted files remaining
// opened and disk space not being released until shutdown.
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
desc.input_sstables.begin(), desc.input_sstables.end());
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
return s.count(sst);
});
@@ -1289,13 +1288,12 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) {
sst->set_unshared();
return sst;
};
auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (std::vector<sstables::shared_sstable> old_ssts,
std::vector<sstables::shared_sstable> new_ssts) {
_compaction_strategy.notify_completion(old_ssts, new_ssts);
_compaction_manager.propagate_replacement(this, old_ssts, new_ssts);
this->on_compaction_completion(new_ssts, old_ssts);
auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) {
_compaction_strategy.notify_completion(desc.input_sstables, desc.output_sstables);
_compaction_manager.propagate_replacement(this, desc.input_sstables, desc.output_sstables);
this->on_compaction_completion(desc);
if (release_exhausted) {
release_exhausted(old_ssts);
release_exhausted(desc.input_sstables);
}
};

View File

@@ -5094,7 +5094,9 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
| boost::adaptors::transformed([] (auto& sst) { return sst->generation(); }));
auto expected_sst = sstable_run.begin();
auto closed_sstables_tracker = sstable_run.begin();
auto replacer = [&] (auto old_sstables, auto new_sstables) {
auto replacer = [&] (sstables::compaction_completion_desc desc) {
auto old_sstables = std::move(desc.input_sstables);
auto new_sstables = std::move(desc.output_sstables);
BOOST_REQUIRE(expected_sst != sstable_run.end());
if (incremental_enabled) {
do_incremental_replace(std::move(old_sstables), std::move(new_sstables), expected_sst, closed_sstables_tracker);
@@ -5627,8 +5629,9 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
cf->add_sstable_and_update_cache(expired_sst).get();
BOOST_REQUIRE(is_partition_dead(alpha));
auto replacer = [&] (std::vector<sstables::shared_sstable> old_sstables, std::vector<sstables::shared_sstable> new_sstables) {
auto replacer = [&] (sstables::compaction_completion_desc desc) {
auto old_sstables = std::move(desc.input_sstables);
auto new_sstables = std::move(desc.output_sstables);
// expired_sst is exhausted, and new sstable is written with mut 2.
BOOST_REQUIRE(old_sstables.size() == 1);
BOOST_REQUIRE(old_sstables.front() == expired_sst);

View File

@@ -226,7 +226,7 @@ public:
};
inline auto replacer_fn_no_op() {
return [](std::vector<shared_sstable> removed, std::vector<shared_sstable> added) -> void {};
return [](sstables::compaction_completion_desc desc) -> void {};
}
inline sstring get_test_dir(const sstring& name, const sstring& ks, const sstring& cf)