Merge "Fix potential infinite recursion in leveled compaction" from Raphael
'"The issue is triggered by compaction of sstables of level higher than 0. The problem happens when interval map of partitioned sstable set stores intervals such as follow: [-9223362900961284625 : -3695961740249769322 ] (-3695961740249769322 : -3695961103022958562 ] When selector is called for first interval above, the exclusive lower bound of the second interval is returned as next token, but the inclusivess info is not returned. So reader_selector was returning that there *were* new readers when the current token was -3695961740249769322 because it was stored in selector position field as inclusive, but it's actually exclusive. This false positive was leading to infinite recursion in combined reader because sstable set's incremental selector itself knew that there were actually *no* new readers, and therefore *no* progress could be made." Fixes #2908.' * 'high_level_compaction_infinite_recursion_fix_v4' of github.com:raphaelsc/scylla: tests: test for infinite recursion bug when doing high-level compaction Fix potential infinite recursion when combining mutations for leveled compaction dht: make it easier to create ring_position_view from token dht: introduce is_min/max for ring_position
This commit is contained in:
20
database.cc
20
database.cc
@@ -375,7 +375,6 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
|
||||
// selects readers on-demand as the read progresses through the token
|
||||
// range.
|
||||
class incremental_reader_selector : public reader_selector {
|
||||
schema_ptr _s;
|
||||
const dht::partition_range* _pr;
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
const io_priority_class& _pc;
|
||||
@@ -404,7 +403,7 @@ public:
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstable_reader_factory_type fn)
|
||||
: _s(s)
|
||||
: reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position::min())
|
||||
, _pr(&pr)
|
||||
, _sstables(std::move(sstables))
|
||||
, _pc(pc)
|
||||
@@ -415,7 +414,6 @@ public:
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _selector(_sstables->make_incremental_selector())
|
||||
, _fn(std::move(fn)) {
|
||||
_selector_position = _pr->start() ? _pr->start()->value().token() : dht::minimum_token();
|
||||
|
||||
dblog.trace("incremental_reader_selector {}: created for range: {} with {} sstables",
|
||||
this,
|
||||
@@ -432,25 +430,28 @@ public:
|
||||
virtual std::vector<flat_mutation_reader> create_new_readers(const dht::token* const t) override {
|
||||
dblog.trace("incremental_reader_selector {}: {}({})", this, __FUNCTION__, seastar::lazy_deref(t));
|
||||
|
||||
const auto& position = (t ? *t : _selector_position);
|
||||
const auto& position = (t ? *t : _selector_position.token());
|
||||
// we only pass _selector_position's token to _selector::select() when T is nullptr
|
||||
// because it means gap between sstables, and the lower bound of the first interval
|
||||
// after the gap is guaranteed to be inclusive.
|
||||
auto selection = _selector.select(position);
|
||||
|
||||
if (selection.sstables.empty()) {
|
||||
// For the lower bound of the token range the _selector
|
||||
// might not return any sstables, in this case try again
|
||||
// with next_token unless it's maximum token.
|
||||
if (!selection.next_token.is_maximum()
|
||||
if (!selection.next_position.is_max()
|
||||
&& position == (_pr->start() ? _pr->start()->value().token() : dht::minimum_token())) {
|
||||
dblog.trace("incremental_reader_selector {}: no sstables intersect with the lower bound, retrying", this);
|
||||
_selector_position = std::move(selection.next_token);
|
||||
_selector_position = std::move(selection.next_position);
|
||||
return create_new_readers(nullptr);
|
||||
}
|
||||
|
||||
_selector_position = dht::maximum_token();
|
||||
_selector_position = dht::ring_position::max();
|
||||
return {};
|
||||
}
|
||||
|
||||
_selector_position = std::move(selection.next_token);
|
||||
_selector_position = std::move(selection.next_position);
|
||||
|
||||
dblog.trace("incremental_reader_selector {}: {} new sstables to consider, advancing selector to {}", this, selection.sstables.size(), _selector_position);
|
||||
|
||||
@@ -464,7 +465,8 @@ public:
|
||||
virtual std::vector<flat_mutation_reader> fast_forward_to(const dht::partition_range& pr) override {
|
||||
_pr = ≺
|
||||
|
||||
if (_pr->start()->value().token() >= _selector_position) {
|
||||
dht::ring_position_comparator cmp(*_s);
|
||||
if (cmp(dht::ring_position_view::for_range_start(*_pr), _selector_position) >= 0) {
|
||||
return create_new_readers(&_pr->start()->value().token());
|
||||
}
|
||||
|
||||
|
||||
@@ -399,6 +399,14 @@ public:
|
||||
return { maximum_token(), token_bound::end };
|
||||
}
|
||||
|
||||
bool is_min() const {
|
||||
return _token.is_minimum();
|
||||
}
|
||||
|
||||
bool is_max() const {
|
||||
return _token.is_maximum();
|
||||
}
|
||||
|
||||
static ring_position starting_at(dht::token token) {
|
||||
return { std::move(token), token_bound::start };
|
||||
}
|
||||
@@ -559,6 +567,12 @@ public:
|
||||
, _weight(weight)
|
||||
{ }
|
||||
|
||||
explicit ring_position_view(const dht::token& token, int8_t weight = -1)
|
||||
: _token(&token)
|
||||
, _key(nullptr)
|
||||
, _weight(weight)
|
||||
{ }
|
||||
|
||||
const partition_key* key() const { return _key; }
|
||||
|
||||
friend std::ostream& operator<<(std::ostream&, ring_position_view);
|
||||
|
||||
@@ -219,9 +219,9 @@ class list_reader_selector : public reader_selector {
|
||||
std::vector<flat_mutation_reader> _readers;
|
||||
|
||||
public:
|
||||
explicit list_reader_selector(std::vector<flat_mutation_reader> readers)
|
||||
: _readers(std::move(readers)) {
|
||||
_selector_position = dht::minimum_token();
|
||||
explicit list_reader_selector(schema_ptr s, std::vector<flat_mutation_reader> readers)
|
||||
: reader_selector(s, dht::ring_position::min())
|
||||
, _readers(std::move(readers)) {
|
||||
}
|
||||
|
||||
list_reader_selector(const list_reader_selector&) = delete;
|
||||
@@ -231,7 +231,7 @@ public:
|
||||
list_reader_selector& operator=(list_reader_selector&&) = default;
|
||||
|
||||
virtual std::vector<flat_mutation_reader> create_new_readers(const dht::token* const) override {
|
||||
_selector_position = dht::maximum_token();
|
||||
_selector_position = dht::ring_position::max();
|
||||
return std::exchange(_readers, {});
|
||||
}
|
||||
|
||||
@@ -497,7 +497,7 @@ flat_mutation_reader make_combined_reader(schema_ptr schema,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_flat_mutation_reader<combined_mutation_reader>(schema,
|
||||
std::make_unique<list_reader_selector>(std::move(readers)),
|
||||
std::make_unique<list_reader_selector>(schema, std::move(readers)),
|
||||
fwd_sm,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
@@ -107,8 +107,11 @@ make_mutation_reader(Args&&... args) {
|
||||
|
||||
class reader_selector {
|
||||
protected:
|
||||
dht::token _selector_position;
|
||||
schema_ptr _s;
|
||||
dht::ring_position _selector_position;
|
||||
public:
|
||||
reader_selector(schema_ptr s, dht::ring_position rp) noexcept : _s(std::move(s)), _selector_position(std::move(rp)) {}
|
||||
|
||||
virtual ~reader_selector() = default;
|
||||
// Call only if has_new_readers() returned true.
|
||||
virtual std::vector<flat_mutation_reader> create_new_readers(const dht::token* const t) = 0;
|
||||
@@ -116,7 +119,8 @@ public:
|
||||
|
||||
// Can be false-positive but never false-negative!
|
||||
bool has_new_readers(const dht::token* const t) const noexcept {
|
||||
return !_selector_position.is_maximum() && (!t || *t >= _selector_position);
|
||||
dht::ring_position_comparator cmp(*_s);
|
||||
return !_selector_position.is_max() && (!t || cmp(dht::ring_position_view(*t), _selector_position) >= 0);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ extern logging::logger clogger;
|
||||
class incremental_selector_impl {
|
||||
public:
|
||||
virtual ~incremental_selector_impl() {}
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) = 0;
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::ring_position> select(const dht::token& token) = 0;
|
||||
};
|
||||
|
||||
class sstable_set_impl {
|
||||
@@ -139,9 +139,9 @@ sstable_set::incremental_selector::incremental_selector(sstable_set::incremental
|
||||
sstable_set::incremental_selector::selection
|
||||
sstable_set::incremental_selector::select(const dht::token& t) const {
|
||||
if (!_current_token_range || !_current_token_range->contains(t, dht::token_comparator())) {
|
||||
std::tie(_current_token_range, _current_sstables, _current_next_token) = _impl->select(t);
|
||||
std::tie(_current_token_range, _current_sstables, _current_next_position) = _impl->select(t);
|
||||
}
|
||||
return {_current_sstables, _current_next_token};
|
||||
return {_current_sstables, _current_next_position};
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector
|
||||
@@ -176,8 +176,8 @@ public:
|
||||
incremental_selector(const std::vector<shared_sstable>& sstables)
|
||||
: _sstables(sstables) {
|
||||
}
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) override {
|
||||
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), _sstables, dht::maximum_token());
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::ring_position> select(const dht::token& token) override {
|
||||
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), _sstables, dht::ring_position::max());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -293,34 +293,41 @@ public:
|
||||
, _it(leveled_sstables.begin())
|
||||
, _end(leveled_sstables.end()) {
|
||||
}
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) override {
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::ring_position> select(const dht::token& token) override {
|
||||
auto pr = dht::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token));
|
||||
auto interval = make_interval(*_schema, std::move(pr));
|
||||
auto ssts = _unleveled_sstables;
|
||||
using namespace dht;
|
||||
|
||||
const auto next_token = [&] {
|
||||
const auto next = std::next(_it);
|
||||
return next == _end ? dht::maximum_token() : next->first.lower().token();
|
||||
auto inclusiveness = [] (auto& interval) {
|
||||
return boost::icl::is_left_closed(interval.bounds()) ? ring_position::token_bound::start : ring_position::token_bound::end;
|
||||
};
|
||||
|
||||
const auto current_token = [&] {
|
||||
return _it == _end ? dht::maximum_token() : _it->first.lower().token();
|
||||
const auto next_pos = [&] {
|
||||
const auto next = std::next(_it);
|
||||
auto& interval = next->first;
|
||||
return next == _end ? ring_position::max() : ring_position(interval.lower().token(), inclusiveness(interval));
|
||||
};
|
||||
|
||||
const auto current_pos = [&] {
|
||||
auto& interval = _it->first;
|
||||
return _it == _end ? ring_position::max() : ring_position(interval.lower().token(), inclusiveness(interval));
|
||||
};
|
||||
|
||||
while (_it != _end) {
|
||||
if (boost::icl::contains(_it->first, interval)) {
|
||||
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
|
||||
return std::make_tuple(to_token_range(_it->first), std::move(ssts), next_token());
|
||||
return std::make_tuple(to_token_range(_it->first), std::move(ssts), next_pos());
|
||||
}
|
||||
// we don't want to skip current interval if token lies before it.
|
||||
if (boost::icl::lower_less(interval, _it->first)) {
|
||||
return std::make_tuple(dht::token_range::make({token, true}, {_it->first.lower().token(), false}),
|
||||
std::move(ssts),
|
||||
current_token());
|
||||
current_pos());
|
||||
}
|
||||
_it++;
|
||||
}
|
||||
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), std::move(ssts), dht::maximum_token());
|
||||
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), std::move(ssts), ring_position::max());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ public:
|
||||
std::unique_ptr<incremental_selector_impl> _impl;
|
||||
mutable stdx::optional<dht::token_range> _current_token_range;
|
||||
mutable std::vector<shared_sstable> _current_sstables;
|
||||
mutable dht::token _current_next_token;
|
||||
mutable dht::ring_position _current_next_position = dht::ring_position::min();
|
||||
public:
|
||||
~incremental_selector();
|
||||
incremental_selector(std::unique_ptr<incremental_selector_impl> impl);
|
||||
@@ -63,7 +63,7 @@ public:
|
||||
|
||||
struct selection {
|
||||
const std::vector<shared_sstable>& sstables;
|
||||
dht::token next_token;
|
||||
dht::ring_position next_position;
|
||||
};
|
||||
|
||||
// Return the sstables that intersect with t and the best next
|
||||
|
||||
@@ -549,7 +549,7 @@ class dummy_incremental_selector : public reader_selector {
|
||||
flat_mutation_reader pop_reader() {
|
||||
auto muts = std::move(_readers_mutations.back());
|
||||
_readers_mutations.pop_back();
|
||||
_selector_position = _readers_mutations.empty() ? dht::maximum_token() : position();
|
||||
_selector_position = _readers_mutations.empty() ? dht::ring_position::max() : dht::ring_position::starting_at(position());
|
||||
return flat_mutation_reader_from_mutation_reader(_s, make_reader_returning_many(std::move(muts), _pr), _fwd);
|
||||
}
|
||||
public:
|
||||
@@ -561,13 +561,13 @@ public:
|
||||
std::vector<std::vector<mutation>> reader_mutations,
|
||||
dht::partition_range pr = query::full_partition_range,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no)
|
||||
: _s(std::move(s))
|
||||
: reader_selector(s, dht::ring_position::min())
|
||||
, _readers_mutations(std::move(reader_mutations))
|
||||
, _fwd(fwd)
|
||||
, _pr(std::move(pr)) {
|
||||
// So we can pop the next reader off the back
|
||||
boost::reverse(_readers_mutations);
|
||||
_selector_position = position();
|
||||
_selector_position = dht::ring_position::starting_at(position());
|
||||
}
|
||||
virtual std::vector<flat_mutation_reader> create_new_readers(const dht::token* const t) override {
|
||||
if (_readers_mutations.empty()) {
|
||||
@@ -581,7 +581,7 @@ public:
|
||||
return readers;
|
||||
}
|
||||
|
||||
while (!_readers_mutations.empty() && *t >= _selector_position) {
|
||||
while (!_readers_mutations.empty() && *t >= _selector_position.token()) {
|
||||
readers.emplace_back(pop_reader());
|
||||
}
|
||||
return readers;
|
||||
|
||||
@@ -4346,11 +4346,12 @@ SEASTAR_TEST_CASE(compaction_correctness_with_partitioned_sstable_set) {
|
||||
auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
auto sst = make_sstable(s, tmp->path, (*gen)++, la, big);
|
||||
sst->set_unshared();
|
||||
sst->set_sstable_level(1); // NEEDED for partitioned_sstable_set to actually have an effect
|
||||
return sst;
|
||||
};
|
||||
|
||||
auto compact = [&, s] (std::vector<shared_sstable> all) -> std::vector<shared_sstable> {
|
||||
// NEEDED for partitioned_sstable_set to actually have an effect
|
||||
std::for_each(all.begin(), all.end(), [] (auto& sst) { sst->set_sstable_level(1); });
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
auto cf = make_lw_shared<column_family>(s, column_family::config(), column_family::no_commitlog(), *cm, cl_stats);
|
||||
cf->mark_ready_for_writes();
|
||||
@@ -4371,26 +4372,77 @@ SEASTAR_TEST_CASE(compaction_correctness_with_partitioned_sstable_set) {
|
||||
auto mut2 = make_insert(tokens[1]);
|
||||
auto mut3 = make_insert(tokens[2]);
|
||||
auto mut4 = make_insert(tokens[3]);
|
||||
std::vector<shared_sstable> sstables = {
|
||||
make_sstable_containing(sst_gen, {mut1, mut2}),
|
||||
make_sstable_containing(sst_gen, {mut3, mut4})
|
||||
};
|
||||
|
||||
auto result = compact(std::move(sstables));
|
||||
BOOST_REQUIRE_EQUAL(4, result.size());
|
||||
{
|
||||
std::vector<shared_sstable> sstables = {
|
||||
make_sstable_containing(sst_gen, {mut1, mut2}),
|
||||
make_sstable_containing(sst_gen, {mut3, mut4})
|
||||
};
|
||||
|
||||
assert_that(sstable_reader(result[0], s))
|
||||
.produces(mut1)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[1], s))
|
||||
.produces(mut2)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[2], s))
|
||||
.produces(mut3)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[3], s))
|
||||
.produces(mut4)
|
||||
.produces_end_of_stream();
|
||||
auto result = compact(std::move(sstables));
|
||||
BOOST_REQUIRE_EQUAL(4, result.size());
|
||||
|
||||
assert_that(sstable_reader(result[0], s))
|
||||
.produces(mut1)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[1], s))
|
||||
.produces(mut2)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[2], s))
|
||||
.produces(mut3)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[3], s))
|
||||
.produces(mut4)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
{
|
||||
// with partitioned_sstable_set having an interval with exclusive lower boundary, example:
|
||||
// [mut1, mut2]
|
||||
// (mut2, mut3]
|
||||
std::vector<shared_sstable> sstables = {
|
||||
make_sstable_containing(sst_gen, {mut1, mut2}),
|
||||
make_sstable_containing(sst_gen, {mut2, mut3}),
|
||||
make_sstable_containing(sst_gen, {mut3, mut4})
|
||||
};
|
||||
|
||||
auto result = compact(std::move(sstables));
|
||||
BOOST_REQUIRE_EQUAL(4, result.size());
|
||||
|
||||
assert_that(sstable_reader(result[0], s))
|
||||
.produces(mut1)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[1], s))
|
||||
.produces(mut2)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[2], s))
|
||||
.produces(mut3)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[3], s))
|
||||
.produces(mut4)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
{
|
||||
// with gap between tables
|
||||
std::vector<shared_sstable> sstables = {
|
||||
make_sstable_containing(sst_gen, {mut1, mut2}),
|
||||
make_sstable_containing(sst_gen, {mut4, mut4})
|
||||
};
|
||||
|
||||
auto result = compact(std::move(sstables));
|
||||
BOOST_REQUIRE_EQUAL(3, result.size());
|
||||
|
||||
assert_that(sstable_reader(result[0], s))
|
||||
.produces(mut1)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[1], s))
|
||||
.produces(mut2)
|
||||
.produces_end_of_stream();
|
||||
assert_that(sstable_reader(result[2], s))
|
||||
.produces(mut4)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user