sstables: Enforce disjoint invariant in sstable_run

We know that sstable_run is supposed to contain disjoint files only,
but this assumption can temporarily break when switching strategies,
as TWCS, for example, can incorrectly pick the same run id for
sstables in different windows during segregation. So when switching
from TWCS to ICS, it could happen that an sstable_run won't contain
disjoint files. We should definitely fix TWCS and any other strategy
doing that, but sstable_run should enforce disjointness as an actual
invariant, not relax it. Otherwise, we cannot build readers on this
assumption, and more complicated logic would have to be added to merge
overlapping files.
After this patch, sstable_run will reject the insertion of a file that
would cause the invariant to break, so the caller will have to check
for that and push the file into a different sstable run.

Closes #11116
This commit is contained in:
Raphael S. Carvalho
2022-07-24 15:02:53 -03:00
committed by Avi Kivity
parent 81e20ceaab
commit 0796b8c97a
4 changed files with 94 additions and 9 deletions

View File

@@ -27,8 +27,36 @@
namespace sstables {
void sstable_run::insert(shared_sstable sst) {
extern logging::logger sstlog;
// Strict weak ordering over sstable fragments: an sstable compares less
// than another when its first key precedes the other's first key.
bool
sstable_first_key_less_comparator::operator()(const shared_sstable& s1, const shared_sstable& s2) const {
    const auto first_key_order = s1->compare_by_first_key(*s2);
    return first_key_order < 0;
}
// Returns true if inserting `sst` into this run would break the run's
// disjointness invariant, i.e. sst's [first, last] key range intersects the
// range of at least one fragment already stored in _all.
// _all is kept ordered by first key and (by the invariant) its fragments are
// pairwise disjoint, so the set is partitioned with respect to the
// completely_ordered_before predicate, which is what makes std::equal_range
// applicable here.
bool sstable_run::will_introduce_overlapping(const shared_sstable& sst) const {
    // checks if s1 is *all* before s2, meaning their bounds don't overlap.
    auto completely_ordered_before = [] (const shared_sstable& s1, const shared_sstable& s2) {
        // Compare decorated keys under the fragment's schema.
        auto less_cmp = [s = s1->get_schema()] (const dht::decorated_key& k1, const dht::decorated_key& k2) {
            return k1.less_compare(*s, k2);
        };
        return less_cmp(s1->get_first_decorated_key(), s2->get_first_decorated_key()) &&
            less_cmp(s1->get_last_decorated_key(), s2->get_first_decorated_key());
    };
    // lower bound will be the 1st element which is not *all* before the candidate sstable.
    // upper bound will be the 1st element which the candidate sstable is *all* before.
    // if there's overlapping, lower bound will be 1st element which overlaps, whereas upper bound the 1st one which doesn't (or end iterator)
    // if there's not overlapping, lower and upper bound will both point to 1st element which the candidate sstable is *all* before (or end iterator).
    auto p = std::equal_range(_all.begin(), _all.end(), sst, completely_ordered_before);
    // A non-empty [lower, upper) range means at least one existing fragment overlaps sst.
    return p.first != p.second;
};
// Adds a fragment to the run. Returns false — leaving the run untouched —
// when the fragment overlaps an existing one, since accepting it would break
// the disjointness invariant; the caller must then pick another run for it.
bool sstable_run::insert(shared_sstable sst) {
    const bool overlaps = will_introduce_overlapping(sst);
    if (!overlaps) {
        _all.insert(std::move(sst));
    }
    return !overlaps;
}
void sstable_run::erase(shared_sstable sst) {
@@ -271,7 +299,12 @@ void partitioned_sstable_set::insert(shared_sstable sst) {
_all->insert(sst);
auto undo_all_insert = defer([&] () { _all->erase(sst); });
_all_runs[sst->run_identifier()].insert(sst);
// If sstable doesn't satisfy disjoint invariant, then place it in a new sstable run.
while (!_all_runs[sst->run_identifier()].insert(sst)) {
sstlog.warn("Generating a new run identifier for SSTable {} as overlapping was detected when inserting it into SSTable run {}",
sst->get_filename(), sst->run_identifier());
sst->generate_new_run_identifier();
}
auto undo_all_runs_insert = defer([&] () { _all_runs[sst->run_identifier()].erase(sst); });
if (store_as_unleveled(sst)) {

View File

@@ -26,16 +26,26 @@ namespace sstables {
class sstable_set_impl;
class incremental_selector_impl;
// Orders sstables by their first decorated key (via compare_by_first_key);
// defined out of line in the .cc file.
struct sstable_first_key_less_comparator {
    bool operator()(const shared_sstable& s1, const shared_sstable& s2) const;
};
// Structure holds all sstables (a.k.a. fragments) that belong to same run identifier, which is an UUID.
// SStables in that same run will not overlap with one another.
class sstable_run {
sstable_list _all;
public:
void insert(shared_sstable sst);
using sstable_set = std::set<shared_sstable, sstable_first_key_less_comparator>;
private:
sstable_set _all;
private:
bool will_introduce_overlapping(const shared_sstable& sst) const;
public:
// Returns false if sstable being inserted cannot satisfy the disjoint invariant. Then caller should pick another run for it.
bool insert(shared_sstable sst);
void erase(shared_sstable sst);
// Data size of the whole run, meaning it's a sum of the data size of all its fragments.
uint64_t data_size() const;
const sstable_list& all() const { return _all; }
const sstable_set& all() const { return _all; }
double estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const;
};

View File

@@ -811,6 +811,10 @@ public:
// This will change sstable level only in memory.
void set_sstable_level(uint32_t);
// Assigns a fresh random UUID as this sstable's run identifier, detaching
// the sstable from whatever run it previously belonged to.
// NOTE(review): this only updates the in-memory field — presumably callers
// that need it persisted must rewrite the metadata; confirm at call sites.
void generate_new_run_identifier() {
    _run_identifier = utils::make_random_uuid();
}
double get_compression_ratio() const;
const sstables::compression& get_compression() const {

View File

@@ -1056,6 +1056,42 @@ SEASTAR_TEST_CASE(check_overlapping) {
});
}
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
    return test_env::do_with([] (test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        auto keys = token_generation_for_current_shard(6);
        auto next_gen = [gen = make_lw_shared<unsigned>(1)] { return (*gen)++; };

        sstables::sstable_run run;
        // Tries to add a fragment spanning keys [first_idx, last_idx];
        // returns whether the run accepted it.
        auto try_insert = [&] (int first_idx, int last_idx) {
            auto sst = sstable_for_overlapping_test(env, s, next_gen(),
                keys[first_idx].first, keys[last_idx].first);
            return run.insert(sst);
        };

        // insert ranges [0, 0], [1, 1], [3, 4]
        BOOST_REQUIRE(try_insert(0, 0));
        BOOST_REQUIRE(try_insert(1, 1));
        BOOST_REQUIRE(try_insert(3, 4));
        BOOST_REQUIRE(run.all().size() == 3);

        // check overlapping candidates won't be inserted
        BOOST_REQUIRE(!try_insert(0, 4));
        BOOST_REQUIRE(!try_insert(4, 5));
        BOOST_REQUIRE(run.all().size() == 3);

        // check non-overlapping candidates will be inserted
        BOOST_REQUIRE(try_insert(2, 2));
        BOOST_REQUIRE(try_insert(5, 5));
        BOOST_REQUIRE(run.all().size() == 5);

        return make_ready_future<>();
    });
}
SEASTAR_TEST_CASE(tombstone_purge_test) {
BOOST_REQUIRE(smp::count == 1);
return test_env::do_with_async([] (test_env& env) {
@@ -3514,10 +3550,6 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
auto non_expired_sst = make_sstable_containing(sst_gen, {mut1, mut2, mut3});
auto expired_sst = make_sstable_containing(sst_gen, {mut1_deletion});
// make ssts belong to same run for compaction to enable incremental approach
utils::UUID run_id = utils::make_random_uuid();
sstables::test(non_expired_sst).set_run_identifier(run_id);
sstables::test(expired_sst).set_run_identifier(run_id);
std::vector<shared_sstable> sstables = {
non_expired_sst,
@@ -3571,6 +3603,12 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
throw std::runtime_error("forcing compaction failure on early replacement");
};
// make ssts belong to same run for compaction to enable incremental approach.
// That needs to happen after fragments were inserted into sstable_set, as they'll placed into different runs due to detected overlapping.
utils::UUID run_id = utils::make_random_uuid();
sstables::test(non_expired_sst).set_run_identifier(run_id);
sstables::test(expired_sst).set_run_identifier(run_id);
bool swallowed = false;
try {
// The goal is to have one sstable generated for each mutation to trigger the issue.