Merge "Optimize combined_mutation_reader for disjoint sstable ranges" from Botond

"sstables will sometimes have narrow/disjoint ranges (e.g. LCS L1+).
This can be exploited when reading from a range of sstables by opening
sstables on-demand thus saving memory, processing and potentially I/O.

To achieve this combined_mutation_reader is refactored such that the
reader selection logic is moved-out into a reader_selector class.
combined_mutation_reader now takes a reader_selector instance in its
constructor and asks it for new readers for the current ring position
on every call to operator()().

At the moment two specializations of reader_selector are provided:
* list_reader_selector which implements the current logic, that is using
    a provided mutation_reader list, and
* incremental_reader_selector which implements the on-demand opening
    logic discussed above.

Fixes #1935"

* 'bdenes/optimize_combined_reader-v6' of https://github.com/denesb/scylla:
  Add combined_mutation_reader_test unit test
  Remove range_sstable_reader
  Add incremental_reader_selector
  Add reader_selector to combined_mutation_reader
  sstable_set::incremental_selector: select() now returns a selection
This commit is contained in:
Avi Kivity
2017-08-10 15:16:30 +03:00
13 changed files with 535 additions and 189 deletions

View File

@@ -200,6 +200,7 @@ scylla_tests = [
'tests/sstable_test',
'tests/sstable_mutation_test',
'tests/sstable_resharding_test',
'tests/combined_mutation_reader_test',
'tests/memtable_test',
'tests/commitlog_test',
'tests/cartesian_product_test',
@@ -654,7 +655,8 @@ for t in scylla_tests:
else:
deps[t] += scylla_core + api + idls + ['tests/cql_test_env.cc']
deps['tests/sstable_test'] += ['tests/sstable_datafile_test.cc']
deps['tests/sstable_test'] += ['tests/sstable_datafile_test.cc', 'tests/sstable_utils.cc']
deps['tests/combined_mutation_reader_test'] += ['tests/sstable_utils.cc']
deps['tests/bytes_ostream_test'] = ['tests/bytes_ostream_test.cc', 'utils/managed_bytes.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc']
deps['tests/input_stream_test'] = ['tests/input_stream_test.cc']

View File

@@ -150,7 +150,7 @@ partition_presence_checker
column_family::make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set> sstables) {
auto sel = make_lw_shared(sstables->make_incremental_selector());
return [this, sstables = std::move(sstables), sel = std::move(sel)] (const dht::decorated_key& key) {
auto& sst = sel->select(key.token());
auto& sst = sel->select(key.token()).sstables;
if (sst.empty()) {
return partition_presence_checker_result::definitely_doesnt_exist;
}
@@ -351,46 +351,22 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
return sstables;
}
class range_sstable_reader final : public combined_mutation_reader {
// Incremental selector implementation for combined_mutation_reader that
// selects readers on-demand as the read progresses through the token
// range.
class incremental_reader_selector : public reader_selector {
schema_ptr _s;
const dht::partition_range* _pr;
lw_shared_ptr<sstables::sstable_set> _sstables;
struct sstable_and_reader {
sstables::shared_sstable _sstable;
// This indirection is sad, but we need stable pointers to mutation
// readers. If this ever becomes a performance issue we could store
// mutation readers in an object pool (we don't need to preserve order
// and can have holes left in the container when elements are removed).
std::unique_ptr<mutation_reader> _reader;
bool operator<(const sstable_and_reader& other) const {
return _sstable < other._sstable;
}
struct less_compare {
bool operator()(const sstable_and_reader& a, const sstable_and_reader& b) {
return a < b;
}
bool operator()(const sstable_and_reader& a, const sstables::shared_sstable& b) {
return a._sstable < b;
}
bool operator()(const sstables::shared_sstable& a, const sstable_and_reader& b) {
return a < b._sstable;
}
};
};
std::vector<sstable_and_reader> _current_readers;
// Use a pointer instead of copying, so we don't need to regenerate the reader if
// the priority changes.
const io_priority_class& _pc;
tracing::trace_state_ptr _trace_state;
const query::partition_slice& _slice;
tracing::trace_state_ptr _trace_state;
streamed_mutation::forwarding _fwd;
mutation_reader::forwarding _fwd_mr;
private:
std::unique_ptr<mutation_reader> create_reader(sstables::shared_sstable sst) {
sstables::sstable_set::incremental_selector _selector;
std::unordered_set<sstables::shared_sstable> _read_sstables;
mutation_reader create_reader(sstables::shared_sstable sst) {
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
// FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper.
mutation_reader reader =
@@ -398,77 +374,87 @@ private:
if (sst->is_shared()) {
reader = make_filtering_reader(std::move(reader), belongs_to_current_shard);
}
return std::make_unique<mutation_reader>(std::move(reader));
return std::move(reader);
}
public:
range_sstable_reader(schema_ptr s,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
explicit incremental_reader_selector(schema_ptr s,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
: _s(s)
, _pr(&pr)
, _sstables(std::move(sstables))
, _pc(pc)
, _trace_state(std::move(trace_state))
, _slice(slice)
, _trace_state(std::move(trace_state))
, _fwd(fwd)
, _fwd_mr(fwd_mr)
{
auto ssts = _sstables->select(pr);
std::vector<mutation_reader*> readers;
readers.reserve(ssts.size());
_current_readers.reserve(ssts.size());
for (auto& sst : ssts) {
auto reader = create_reader(sst);
readers.emplace_back(reader.get());
_current_readers.emplace_back(sstable_and_reader { sst, std::move(reader) });
}
init_mutation_reader_set(std::move(readers));
, _selector(_sstables->make_incremental_selector()) {
_selector_position = _pr->start()->value().token();
dblog.trace("incremental_reader_selector {}: created for range: ({},{}) with {} sstables",
this,
_pr->start()->value().token(),
_pr->end()->value().token(),
_sstables->all()->size());
}
range_sstable_reader(range_sstable_reader&&) = delete; // reader takes reference to member fields
incremental_reader_selector(const incremental_reader_selector&) = delete;
incremental_reader_selector& operator=(const incremental_reader_selector&) = delete;
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
incremental_reader_selector(incremental_reader_selector&&) = delete;
incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const t) override {
//TODO: fix after lazy_deref() is available
dblog.trace("incremental_reader_selector {}: {}({})", this, __FUNCTION__, t ? sprint("{}", *t) : "null");
const auto& position = (t ? *t : _selector_position);
auto selection = _selector.select(position);
if (selection.sstables.empty()) {
// For the lower bound of the token range the _selector
// might not return any sstables, in this case try again
// with next_token unless it's maximum token.
if (position == _pr->start()->value().token() && !selection.next_token.is_maximum()) {
dblog.trace("incremental_reader_selector {}: no sstables intersect with the lower bound, retrying", this);
_selector_position = std::move(selection.next_token);
return create_new_readers(nullptr);
}
_selector_position = dht::maximum_token();
return {};
}
dblog.trace("incremental_reader_selector {}: {} new sstables to consider", this, selection.sstables.size());
if (selection.next_token == _selector_position) {
_selector_position = dht::maximum_token();
dblog.trace(
"incremental_reader_selector {}: selector ({}) is the same as next_token, setting it to max",
this,
selection.next_token);
} else {
_selector_position = std::move(selection.next_token);
dblog.trace("incremental_reader_selector {}: advancing selector to {}", this, _selector_position);
}
return boost::copy_range<std::vector<mutation_reader>>(selection.sstables
| boost::adaptors::filtered([this] (auto& sst) { return _read_sstables.emplace(sst).second; })
| boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); }));
}
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) override {
_pr = &pr;
_selector_position = _pr->start()->value().token();
_read_sstables.clear();
auto new_sstables = _sstables->select(pr);
boost::range::sort(new_sstables);
boost::range::sort(_current_readers);
std::vector<sstables::shared_sstable> to_add;
std::vector<sstable_and_reader> to_remove, unchanged;
sstable_and_reader::less_compare cmp;
boost::set_difference(new_sstables, _current_readers, std::back_inserter(to_add), cmp);
std::set_difference(_current_readers.begin(), _current_readers.end(), new_sstables.begin(), new_sstables.end(),
boost::back_move_inserter(to_remove), cmp);
std::set_intersection(_current_readers.begin(), _current_readers.end(), new_sstables.begin(), new_sstables.end(),
boost::back_move_inserter(unchanged), cmp);
std::vector<sstable_and_reader> to_add_sar;
boost::transform(to_add, std::back_inserter(to_add_sar), [&] (const sstables::shared_sstable& sst) {
return sstable_and_reader { sst, create_reader(sst) };
});
auto get_mutation_readers = [] (std::vector<sstable_and_reader>& ssts) {
std::vector<mutation_reader*> mrs;
mrs.reserve(ssts.size());
boost::range::transform(ssts, std::back_inserter(mrs), [] (const sstable_and_reader& s_a_r) {
return s_a_r._reader.get();
});
return mrs;
};
auto to_add_mrs = get_mutation_readers(to_add_sar);
auto to_remove_mrs = get_mutation_readers(to_remove);
unchanged.insert(unchanged.end(), std::make_move_iterator(to_add_sar.begin()), std::make_move_iterator(to_add_sar.end()));
return combined_mutation_reader::fast_forward_to(std::move(to_add_mrs), std::move(to_remove_mrs), pr).then([this, new_readers = std::move(unchanged)] () mutable {
_current_readers = std::move(new_readers);
});
return create_new_readers(nullptr);
}
};
@@ -569,8 +555,15 @@ column_family::make_sstable_reader(schema_ptr s,
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), std::move(sstables),
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd));
} else {
// range_sstable_reader is not movable so we need to wrap it
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), std::move(sstables), pr, slice, pc, std::move(trace_state), fwd, fwd_mr));
return restrict_reader(make_mutation_reader<combined_mutation_reader>(
std::make_unique<incremental_reader_selector>(std::move(s),
std::move(sstables),
pr,
slice,
pc,
std::move(trace_state),
fwd,
fwd_mr)));
}
}
@@ -4157,6 +4150,25 @@ void column_family::drop_hit_rate(gms::inet_address addr) {
_cluster_cache_hit_rates.erase(addr);
}
mutation_reader make_range_sstable_reader(schema_ptr s,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
return make_mutation_reader<combined_mutation_reader>(std::make_unique<incremental_reader_selector>(std::move(s),
std::move(sstables),
pr,
slice,
pc,
std::move(trace_state),
fwd,
fwd_mr));
}
future<>
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstable_write_permit&& permit, bool backup, const io_priority_class& pc, bool leave_unsealed, seastar::thread_scheduling_group *tsg) {
class permit_monitor final : public sstables::write_monitor {

View File

@@ -819,6 +819,15 @@ public:
friend class distributed_loader;
};
mutation_reader make_range_sstable_reader(schema_ptr s,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr);
class user_types_metadata {
std::unordered_map<bytes, user_type> _user_types;
public:

View File

@@ -28,12 +28,75 @@
#include "utils/move.hh"
#include "stdx.hh"
// Dumb selector implementation for combined_mutation_reader that simply
// forwards its list of readers.
// All readers are handed over on the first create_new_readers() call;
// after that the selector is exhausted (_selector_position is set to the
// maximum token so has_new_readers() returns false).
class list_reader_selector : public reader_selector {
std::vector<mutation_reader> _readers;
public:
explicit list_reader_selector(std::vector<mutation_reader> readers)
: _readers(std::move(readers)) {
// minimum_token(): readers are available regardless of position.
_selector_position = dht::minimum_token();
}
list_reader_selector(const list_reader_selector&) = delete;
list_reader_selector& operator=(const list_reader_selector&) = delete;
list_reader_selector(list_reader_selector&&) = default;
list_reader_selector& operator=(list_reader_selector&&) = default;
// Hands over the entire reader list on the first call; subsequent calls
// return an empty vector.
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const) override {
if (_readers.empty()) {
return {};
}
// Mark the selector as exhausted.
_selector_position = dht::maximum_token();
return std::exchange(_readers, {});
}
// A static list never yields additional readers on fast-forward.
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range&) override {
return {};
}
};
// Ask the selector whether it has readers relevant at token t (nullptr
// meaning the start of the read) and, if so, incorporate them.
void combined_mutation_reader::maybe_add_readers(const dht::token* const t) {
    // Inverted guard: only consult the selector when it may have readers.
    if (_selector->has_new_readers(t)) {
        add_readers(_selector->create_new_readers(t));
    }
}
// Take ownership of new_readers and register each one for the next
// round of reading (_next) as well as in the live set (_all_readers).
void combined_mutation_reader::add_readers(std::vector<mutation_reader> new_readers) {
for (auto&& new_reader : new_readers) {
// _readers is a std::list, so the address of the emplaced element
// stays valid for the lifetime of the element -- the raw pointers
// stored below rely on that stability.
_readers.emplace_back(std::move(new_reader));
auto* r = &_readers.back();
_all_readers.emplace_back(r);
_next.emplace_back(r);
}
}
// Returns the token of the next mutation to be emitted (the top of the
// merge heap), or nullptr if no mutations are currently buffered.
const dht::token* combined_mutation_reader::current_position() const {
    return _ptables.empty() ? nullptr : &_ptables.front().m.decorated_key().token();
}
future<> combined_mutation_reader::prepare_next() {
maybe_add_readers(current_position());
return parallel_for_each(_next, [this] (mutation_reader* mr) {
return (*mr)().then([this, mr] (streamed_mutation_opt next) {
if (next) {
_ptables.emplace_back(mutation_and_reader { std::move(*next), mr });
boost::range::push_heap(_ptables, &heap_compare);
} else {
auto it = std::remove(_all_readers.begin(), _all_readers.end(), mr);
_all_readers.erase(it);
_readers.remove_if([mr](auto& r) { return &r == mr; });
}
});
}).then([this] {
@@ -42,12 +105,12 @@ future<> combined_mutation_reader::prepare_next() {
}
future<streamed_mutation_opt> combined_mutation_reader::next() {
if (_current.empty() && !_next.empty()) {
if ((_current.empty() && !_next.empty()) || _selector->has_new_readers(current_position())) {
return prepare_next().then([this] { return next(); });
}
if (_ptables.empty()) {
return make_ready_future<streamed_mutation_opt>();
};
}
while (!_ptables.empty()) {
boost::range::pop_heap(_ptables, &heap_compare);
@@ -71,48 +134,19 @@ future<streamed_mutation_opt> combined_mutation_reader::next() {
return make_ready_future<streamed_mutation_opt>(merge_mutations(std::exchange(_current, {})));
}
void combined_mutation_reader::init_mutation_reader_set(std::vector<mutation_reader*> readers)
combined_mutation_reader::combined_mutation_reader(std::unique_ptr<reader_selector> selector)
: _selector(std::move(selector))
{
_all_readers = std::move(readers);
_next.assign(_all_readers.begin(), _all_readers.end());
_ptables.reserve(_all_readers.size());
}
future<> combined_mutation_reader::fast_forward_to(std::vector<mutation_reader*> to_add, std::vector<mutation_reader*> to_remove, const dht::partition_range& pr)
{
_ptables.clear();
std::vector<mutation_reader*> new_readers;
boost::range::sort(_all_readers);
boost::range::sort(to_remove);
boost::range::set_difference(_all_readers, to_remove, std::back_inserter(new_readers));
_all_readers = std::move(new_readers);
return parallel_for_each(_all_readers, [this, &pr] (mutation_reader* mr) {
return mr->fast_forward_to(pr);
}).then([this, to_add = std::move(to_add)] {
_all_readers.insert(_all_readers.end(), to_add.begin(), to_add.end());
_next.assign(_all_readers.begin(), _all_readers.end());
});
}
combined_mutation_reader::combined_mutation_reader(std::vector<mutation_reader> readers)
: _readers(std::move(readers))
{
_next.reserve(_readers.size());
_current.reserve(_readers.size());
_ptables.reserve(_readers.size());
for (auto&& r : _readers) {
_next.emplace_back(&r);
}
_all_readers.assign(_next.begin(), _next.end());
}
future<> combined_mutation_reader::fast_forward_to(const dht::partition_range& pr) {
_ptables.clear();
_next.assign(_all_readers.begin(), _all_readers.end());
return parallel_for_each(_next, [this, &pr] (mutation_reader* mr) {
return mr->fast_forward_to(pr);
}).then([this, pr] {
add_readers(_selector->fast_forward_to(pr));
});
}
@@ -122,7 +156,7 @@ future<streamed_mutation_opt> combined_mutation_reader::operator()() {
mutation_reader
make_combined_reader(std::vector<mutation_reader> readers) {
return make_mutation_reader<combined_mutation_reader>(std::move(readers));
return make_mutation_reader<combined_mutation_reader>(std::make_unique<list_reader_selector>(std::move(readers)));
}
mutation_reader

View File

@@ -105,9 +105,25 @@ make_mutation_reader(Args&&... args) {
return mutation_reader(std::make_unique<Impl>(std::forward<Args>(args)...));
}
// Decides which mutation_readers a combined_mutation_reader should consult
// at a given position in the token ring. Implementations may hand over a
// fixed list up-front (list_reader_selector) or open readers on-demand as
// the read progresses (incremental_reader_selector).
class reader_selector {
protected:
// Next token at which this selector may have new readers to offer.
// Set to dht::maximum_token() once the selector is exhausted.
dht::token _selector_position;
public:
virtual ~reader_selector() = default;
// Returns readers that become relevant at token t (nullptr means the
// lower bound of the read). Call only if has_new_readers() returned true.
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const t) = 0;
// Restarts selection for the new range pr, returning the readers
// relevant at its lower bound.
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) = 0;
// Whether the selector may have new readers at token t (nullptr means
// "at the start of the read").
// Can be false-positive but never false-negative: a true result only
// means create_new_readers() is worth calling.
bool has_new_readers(const dht::token* const t) const noexcept {
return !_selector_position.is_maximum() && (!t || *t >= _selector_position);
}
};
// Combines multiple mutation_readers into one.
class combined_mutation_reader : public mutation_reader::impl {
std::vector<mutation_reader> _readers;
std::unique_ptr<reader_selector> _selector;
std::list<mutation_reader> _readers;
std::vector<mutation_reader*> _all_readers;
struct mutation_and_reader {
@@ -140,15 +156,14 @@ class combined_mutation_reader : public mutation_reader::impl {
std::vector<streamed_mutation> _current;
std::vector<mutation_reader*> _next;
private:
const dht::token* current_position() const;
void maybe_add_readers(const dht::token* const t);
void add_readers(std::vector<mutation_reader> new_readers);
future<> prepare_next();
// Produces next mutation or disengaged optional if there are no more.
future<streamed_mutation_opt> next();
protected:
combined_mutation_reader() = default;
void init_mutation_reader_set(std::vector<mutation_reader*>);
future<> fast_forward_to(std::vector<mutation_reader*> to_add, std::vector<mutation_reader*> to_remove, const dht::partition_range& pr);
public:
combined_mutation_reader(std::vector<mutation_reader> readers);
combined_mutation_reader(std::unique_ptr<reader_selector> selector);
virtual future<streamed_mutation_opt> operator()() override;
virtual future<> fast_forward_to(const dht::partition_range& pr) override;
};

View File

@@ -90,7 +90,7 @@ static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf,
const std::unordered_set<shared_sstable>& compacting_set, const dht::decorated_key& dk) {
auto timestamp = api::max_timestamp;
stdx::optional<utils::hashed_key> hk;
for (auto&& sst : boost::range::join(selector.select(dk.token()), cf.compacted_undeleted_sstables())) {
for (auto&& sst : boost::range::join(selector.select(dk.token()).sstables, cf.compacted_undeleted_sstables())) {
if (compacting_set.count(sst)) {
continue;
}

View File

@@ -67,7 +67,7 @@ extern logging::logger clogger;
class incremental_selector_impl {
public:
virtual ~incremental_selector_impl() {}
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) = 0;
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) = 0;
};
class sstable_set_impl {
@@ -136,14 +136,12 @@ sstable_set::incremental_selector::~incremental_selector() = default;
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
const std::vector<shared_sstable>&
sstable_set::incremental_selector::selection
sstable_set::incremental_selector::select(const dht::token& t) const {
if (!_current_token_range || !_current_token_range->contains(t, dht::token_comparator())) {
auto&& x = _impl->select(t);
_current_token_range = std::move(std::get<0>(x));
_current_sstables = std::move(std::get<1>(x));
std::tie(_current_token_range, _current_sstables, _current_next_token) = _impl->select(t);
}
return _current_sstables;
return {_current_sstables, _current_next_token};
}
sstable_set::incremental_selector
@@ -178,8 +176,8 @@ public:
incremental_selector(const std::vector<shared_sstable>& sstables)
: _sstables(sstables) {
}
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) override {
return std::make_pair(dht::token_range::make_open_ended_both_sides(), _sstables);
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) override {
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), _sstables, dht::maximum_token());
}
};
@@ -295,24 +293,30 @@ public:
, _it(leveled_sstables.begin())
, _end(leveled_sstables.end()) {
}
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) override {
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) override {
auto pr = dht::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token));
auto interval = make_interval(*_schema, std::move(pr));
auto ssts = _unleveled_sstables;
const auto next_token = [&] {
const auto next = std::next(_it);
return next == _end ? dht::maximum_token() : next->first.lower().token();
};
while (_it != _end) {
if (boost::icl::contains(_it->first, interval)) {
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
return std::make_pair(to_token_range(_it->first), std::move(ssts));
return std::make_tuple(to_token_range(_it->first), std::move(ssts), next_token());
}
// we don't want to skip current interval if token lies before it.
if (boost::icl::lower_less(interval, _it->first)) {
return std::make_pair(dht::token_range::make({token, true}, {_it->first.lower().token(), false}),
std::move(ssts));
return std::make_tuple(dht::token_range::make({token, true}, {_it->first.lower().token(), false}),
std::move(ssts),
next_token());
}
_it++;
}
return std::make_pair(dht::token_range::make_open_ended_both_sides(), std::move(ssts));
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), std::move(ssts), dht::maximum_token());
}
};

View File

@@ -55,11 +55,24 @@ public:
std::unique_ptr<incremental_selector_impl> _impl;
mutable stdx::optional<dht::token_range> _current_token_range;
mutable std::vector<shared_sstable> _current_sstables;
mutable dht::token _current_next_token;
public:
~incremental_selector();
incremental_selector(std::unique_ptr<incremental_selector_impl> impl);
incremental_selector(incremental_selector&&) noexcept;
const std::vector<shared_sstable>& select(const dht::token& t) const;
struct selection {
const std::vector<shared_sstable>& sstables;
dht::token next_token;
};
// Return the sstables that intersect with t and the best next
// token (inclusive) to call select() with so that the least
// amount of sstables will be returned (without skipping any).
// NOTE: selection.sstables is a reference to an internal cache
// and can be invalidated by another call to select().
// If you need it long-term copy it!
selection select(const dht::token& t) const;
};
incremental_selector make_incremental_selector() const;
};

View File

@@ -0,0 +1,199 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include "core/sstring.hh"
#include "core/thread.hh"
#include "database.hh"
#include "mutation_reader.hh"
#include "schema_builder.hh"
#include "partition_slice_builder.hh"
#include "tmpdir.hh"
#include "sstable_mutation_readers.hh"
#include "cell_locking.hh"
#include "tests/test-utils.hh"
#include "tests/mutation_assertions.hh"
#include "tests/mutation_reader_assertions.hh"
#include "tests/result_set_assertions.hh"
#include "tests/simple_schema.hh"
#include "tests/sstable_utils.hh"
#include "tests/sstable_test.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
// Factory functor handed to make_sstable_containing(): each invocation
// creates a fresh, unshared big-format (la version) sstable object for the
// stored schema/path/generation.
struct sst_factory {
schema_ptr s;
sstring path;
unsigned gen;
// Desired sstable level; currently unused (see TODO below).
int level;
sst_factory(schema_ptr s, const sstring& path, unsigned gen, int level)
: s(s)
, path(path)
, gen(gen)
, level(level)
{}
sstables::shared_sstable operator()() {
auto sst = make_lw_shared<sstables::sstable>(s, path, gen, sstables::sstable::version_types::la, sstables::sstable::format_types::big);
sst->set_unshared();
//TODO set sstable level, to make the test more interesting
return sst;
}
};
// Verifies that a combined_mutation_reader driven by the on-demand
// incremental_reader_selector produces exactly the same merged stream as
// one driven by an up-front list of readers (list_reader_selector), over
// four sstables with partially overlapping partition ranges.
SEASTAR_TEST_CASE(combined_mutation_reader_test) {
    return seastar::async([] {
        //logging::logger_registry().set_logger_level("database", logging::log_level::trace);
        simple_schema s;
        const auto pkeys = s.make_pkeys(4);
        const auto ckeys = s.make_ckeys(4);
        // One empty base mutation per partition key; copies of these are
        // populated below to build each sstable's mutation set.
        std::vector<mutation> base_mutations = boost::copy_range<std::vector<mutation>>(
            pkeys | boost::adaptors::transformed([&s](const auto& k) { return mutation(k, s.schema()); }));
        // Data layout:
        //   d[xx]
        //   b[xx][xx]c
        //   a[x x]
        int i{0};
        // sstable d
        std::vector<mutation> table_d_mutations;
        i = 1;
        table_d_mutations.emplace_back(base_mutations[i]);
        s.add_row(table_d_mutations.back(), ckeys[i], sprint("val_d_%i", i));
        i = 2;
        table_d_mutations.emplace_back(base_mutations[i]);
        s.add_row(table_d_mutations.back(), ckeys[i], sprint("val_d_%i", i));
        // Keep the timestamp so the expected merged mutation can be built
        // with the exact same one below.
        const auto t_static_row = s.add_static_row(table_d_mutations.back(), sprint("%i_static_val", i));
        // sstable b
        std::vector<mutation> table_b_mutations;
        i = 0;
        table_b_mutations.emplace_back(base_mutations[i]);
        s.add_row(table_b_mutations.back(), ckeys[i], sprint("val_b_%i", i));
        i = 1;
        table_b_mutations.emplace_back(base_mutations[i]);
        s.add_row(table_b_mutations.back(), ckeys[i], sprint("val_b_%i", i));
        // sstable c
        std::vector<mutation> table_c_mutations;
        i = 2;
        table_c_mutations.emplace_back(base_mutations[i]);
        const auto t_row = s.add_row(table_c_mutations.back(), ckeys[i], sprint("val_c_%i", i));
        i = 3;
        table_c_mutations.emplace_back(base_mutations[i]);
        s.add_row(table_c_mutations.back(), ckeys[i], sprint("val_c_%i", i));
        // sstable a
        std::vector<mutation> table_a_mutations;
        i = 0;
        table_a_mutations.emplace_back(base_mutations[i]);
        s.add_row(table_a_mutations.back(), ckeys[i], sprint("val_a_%i", i));
        i = 3;
        table_a_mutations.emplace_back(base_mutations[i]);
        s.add_row(table_a_mutations.back(), ckeys[i], sprint("val_a_%i", i));
        auto tmp = make_lw_shared<tmpdir>();
        unsigned gen{0};
        std::vector<sstables::shared_sstable> tables = {
            make_sstable_containing(sst_factory(s.schema(), tmp->path, gen++, 0), table_a_mutations),
            make_sstable_containing(sst_factory(s.schema(), tmp->path, gen++, 1), table_b_mutations),
            make_sstable_containing(sst_factory(s.schema(), tmp->path, gen++, 1), table_c_mutations),
            make_sstable_containing(sst_factory(s.schema(), tmp->path, gen++, 2), table_d_mutations)
        };
        auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, {});
        auto sstables = make_lw_shared<sstables::sstable_set>(cs.make_sstable_set(s.schema()));
        std::vector<mutation_reader> sstable_mutation_readers;
        for (auto table : tables) {
            sstables->insert(table);
            sstable_mutation_readers.emplace_back(make_mutation_reader<sstable_range_wrapping_reader>(
                table,
                s.schema(),
                dht::partition_range{dht::ring_position::min(), dht::ring_position::max()},
                query::full_slice,
                seastar::default_priority_class(),
                streamed_mutation::forwarding::no,
                ::mutation_reader::forwarding::yes));
        }
        // Reference reader: all sstable readers handed over up-front.
        auto list_reader = make_combined_reader(std::move(sstable_mutation_readers));
        const auto pr = dht::partition_range{dht::ring_position::min(), dht::ring_position::max()};
        const auto qs = query::full_slice;
        const auto pc = seastar::default_priority_class();
        // Reader under test: sstables opened on-demand via the
        // incremental_reader_selector.
        auto incremental_reader = make_range_sstable_reader(
            s.schema(),
            sstables,
            pr,
            qs,
            pc,
            nullptr,
            streamed_mutation::forwarding::no,
            ::mutation_reader::forwarding::yes);
        // merge c[0] with d[1]
        i = 2;
        auto c_d_merged = mutation(pkeys[i], s.schema());
        s.add_row(c_d_merged, ckeys[i], sprint("val_c_%i", i), t_row);
        s.add_static_row(c_d_merged, sprint("%i_static_val", i), t_static_row);
        // Both readers must produce the identical merged stream.
        assert_that(std::move(list_reader))
            .produces(table_a_mutations.front())
            .produces(table_b_mutations[1])
            .produces(c_d_merged)
            .produces(table_a_mutations.back());
        assert_that(std::move(incremental_reader))
            .produces(table_a_mutations.front())
            .produces(table_b_mutations[1])
            .produces(c_d_merged)
            .produces(table_a_mutations.back());
    });
};

View File

@@ -78,11 +78,12 @@ public:
return dht::global_partitioner().decorate_key(*_s, key);
}
void add_row(mutation& m, const clustering_key& key, const sstring& v, api::timestamp_type t = api::missing_timestamp) {
api::timestamp_type add_row(mutation& m, const clustering_key& key, const sstring& v, api::timestamp_type t = api::missing_timestamp) {
if (t == api::missing_timestamp) {
t = new_timestamp();
}
m.set_clustered_cell(key, _v_def, atomic_cell::make_live(t, data_value(v).serialize()));
return t;
}
std::pair<sstring, api::timestamp_type> get_value(const clustering_row& row) {
@@ -104,8 +105,13 @@ public:
return mutation_fragment(std::move(row));
}
void add_static_row(mutation& m, sstring s1) {
m.set_static_cell(to_bytes("s1"), data_value(s1), new_timestamp());
api::timestamp_type add_static_row(mutation& m, sstring s1, api::timestamp_type t = api::missing_timestamp) {
if (t == api::missing_timestamp) {
t = new_timestamp();
}
m.set_static_cell(to_bytes("s1"), data_value(s1), t);
return t;
}
range_tombstone delete_range(mutation& m, const query::clustering_range& range) {

View File

@@ -56,6 +56,8 @@
#include <boost/range/algorithm/find_if.hpp>
#include <boost/algorithm/cxx11/all_of.hpp>
#include "sstable_utils.hh"
using namespace sstables;
static sstring some_keyspace("ks");
@@ -70,8 +72,6 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t ttl = 0, uint32_t expira
}
}
static shared_sstable make_sstable_containing(std::function<shared_sstable()> sst_factory, std::vector<mutation> muts);
SEASTAR_TEST_CASE(datafile_generation_01) {
// Data file with clustering key
//
@@ -2287,37 +2287,6 @@ SEASTAR_TEST_CASE(check_read_indexes) {
});
}
// Must run in a seastar thread
static shared_sstable make_sstable_containing(std::function<shared_sstable()> sst_factory, std::vector<mutation> muts) {
auto sst = sst_factory();
schema_ptr s = muts[0].schema();
auto mt = make_lw_shared<memtable>(s);
for (auto&& m : muts) {
mt->apply(m);
}
write_memtable_to_sstable(*mt, sst).get();
sst->open_data().get();
std::set<mutation, mutation_decorated_key_less_comparator> merged;
for (auto&& m : muts) {
auto result = merged.insert(m);
if (!result.second) {
auto old = *result.first;
merged.erase(result.first);
merged.insert(old + m);
}
}
// validate the sstable
auto rd = assert_that(sstable_reader(sst, s));
for (auto&& m : merged) {
rd.produces(m);
}
rd.produces_end_of_stream();
return sst;
}
SEASTAR_TEST_CASE(tombstone_purge_test) {
BOOST_REQUIRE(smp::count == 1);
return seastar::async([] {
@@ -3840,7 +3809,7 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
auto key_and_token_pair = token_generation_for_current_shard(8);
auto check = [] (sstable_set::incremental_selector& selector, const dht::token& token, std::unordered_set<int64_t> expected_gens) {
auto sstables = selector.select(token);
auto sstables = selector.select(token).sstables;
BOOST_REQUIRE(sstables.size() == expected_gens.size());
for (auto& sst : sstables) {
BOOST_REQUIRE(expected_gens.count(sst->generation()) == 1);

59
tests/sstable_utils.cc Normal file
View File

@@ -0,0 +1,59 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sstable_utils.hh"
#include <tests/test-utils.hh>
#include "database.hh"
#include "memtable-sstable.hh"
#include "mutation_reader_assertions.hh"
// Builds an sstable from the given mutations: applies them to a fresh
// memtable, flushes it into the sstable produced by sst_factory, then
// validates that reading the sstable back yields the merged mutations.
// NOTE(review): assumes muts is non-empty -- muts[0] is dereferenced for
// the schema; calling with an empty vector is undefined behavior.
// Must run inside a seastar thread (uses future::get()).
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, std::vector<mutation> muts) {
auto sst = sst_factory();
schema_ptr s = muts[0].schema();
auto mt = make_lw_shared<memtable>(s);
for (auto&& m : muts) {
mt->apply(m);
}
write_memtable_to_sstable(*mt, sst).get();
sst->open_data().get();
// Merge mutations that share a decorated key, since the memtable (and
// hence the sstable) stores one combined mutation per partition.
std::set<mutation, mutation_decorated_key_less_comparator> merged;
for (auto&& m : muts) {
auto result = merged.insert(m);
if (!result.second) {
auto old = *result.first;
merged.erase(result.first);
merged.insert(old + m);
}
}
// validate the sstable: it must produce exactly the merged mutations.
auto rd = assert_that(sst->as_mutation_source()(s));
for (auto&& m : merged) {
rd.produces(m);
}
rd.produces_end_of_stream();
return sst;
}

24
tests/sstable_utils.hh Normal file
View File

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once

#include "sstables/sstables.hh"

#include <functional>
#include <vector>

// Builds an sstable from the given mutations: sst_factory supplies the
// empty sstable object, which is filled via a memtable flush and then
// validated by reading it back. Assumes muts is non-empty; the
// implementation calls future::get(), so it must run in a seastar thread.
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, std::vector<mutation> muts);