Merge "Optimize combined_mutation_reader for disjoint sstable ranges" from Botond
"sstables will sometimes have narrow/disjoint ranges (e.g. LCS L1+).
This can be exploited when reading from a range of sstables by opening
sstables on-demand thus saving memory, processing and potentially I/O.
To achieve this combined_mutation_reader is refactored such that the
reader selection logic is moved out into a reader_selector class.
combined_mutation_reader now takes a reader_selector instance in its
constructor and asks it for new readers for the current ring position
on every call to operator()().
At the moment two specializations of reader_selector are provided:
* list_reader_selector which implements the current logic, that is using
a provided mutation_reader list, and
* incremental_reader_selector which implements the on-demand opening
logic discussed above.
Fixes #1935"
* 'bdenes/optimize_combined_reader-v6' of https://github.com/denesb/scylla:
Add combined_mutation_reader_test unit test
Remove range_sstable_reader
Add incremental_reader_selector
Add reader_selector to combined_mutation_reader
sstable_set::incremental_selector: select() now returns a selection
This commit is contained in:
@@ -200,6 +200,7 @@ scylla_tests = [
|
||||
'tests/sstable_test',
|
||||
'tests/sstable_mutation_test',
|
||||
'tests/sstable_resharding_test',
|
||||
'tests/combined_mutation_reader_test',
|
||||
'tests/memtable_test',
|
||||
'tests/commitlog_test',
|
||||
'tests/cartesian_product_test',
|
||||
@@ -654,7 +655,8 @@ for t in scylla_tests:
|
||||
else:
|
||||
deps[t] += scylla_core + api + idls + ['tests/cql_test_env.cc']
|
||||
|
||||
deps['tests/sstable_test'] += ['tests/sstable_datafile_test.cc']
|
||||
deps['tests/sstable_test'] += ['tests/sstable_datafile_test.cc', 'tests/sstable_utils.cc']
|
||||
deps['tests/combined_mutation_reader_test'] += ['tests/sstable_utils.cc']
|
||||
|
||||
deps['tests/bytes_ostream_test'] = ['tests/bytes_ostream_test.cc', 'utils/managed_bytes.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc']
|
||||
deps['tests/input_stream_test'] = ['tests/input_stream_test.cc']
|
||||
|
||||
198
database.cc
198
database.cc
@@ -150,7 +150,7 @@ partition_presence_checker
|
||||
column_family::make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set> sstables) {
|
||||
auto sel = make_lw_shared(sstables->make_incremental_selector());
|
||||
return [this, sstables = std::move(sstables), sel = std::move(sel)] (const dht::decorated_key& key) {
|
||||
auto& sst = sel->select(key.token());
|
||||
auto& sst = sel->select(key.token()).sstables;
|
||||
if (sst.empty()) {
|
||||
return partition_presence_checker_result::definitely_doesnt_exist;
|
||||
}
|
||||
@@ -351,46 +351,22 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
|
||||
return sstables;
|
||||
}
|
||||
|
||||
class range_sstable_reader final : public combined_mutation_reader {
|
||||
// Incremental selector implementation for combined_mutation_reader that
|
||||
// selects readers on-demand as the read progresses through the token
|
||||
// range.
|
||||
class incremental_reader_selector : public reader_selector {
|
||||
schema_ptr _s;
|
||||
const dht::partition_range* _pr;
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
|
||||
struct sstable_and_reader {
|
||||
sstables::shared_sstable _sstable;
|
||||
// This indirection is sad, but we need stable pointers to mutation
|
||||
// readers. If this ever becomes a performance issue we could store
|
||||
// mutation readers in an object pool (we don't need to preserve order
|
||||
// and can have holes left in the container when elements are removed).
|
||||
std::unique_ptr<mutation_reader> _reader;
|
||||
|
||||
bool operator<(const sstable_and_reader& other) const {
|
||||
return _sstable < other._sstable;
|
||||
}
|
||||
|
||||
struct less_compare {
|
||||
bool operator()(const sstable_and_reader& a, const sstable_and_reader& b) {
|
||||
return a < b;
|
||||
}
|
||||
bool operator()(const sstable_and_reader& a, const sstables::shared_sstable& b) {
|
||||
return a._sstable < b;
|
||||
}
|
||||
bool operator()(const sstables::shared_sstable& a, const sstable_and_reader& b) {
|
||||
return a < b._sstable;
|
||||
}
|
||||
};
|
||||
};
|
||||
std::vector<sstable_and_reader> _current_readers;
|
||||
|
||||
// Use a pointer instead of copying, so we don't need to regenerate the reader if
|
||||
// the priority changes.
|
||||
const io_priority_class& _pc;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
const query::partition_slice& _slice;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
streamed_mutation::forwarding _fwd;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
private:
|
||||
std::unique_ptr<mutation_reader> create_reader(sstables::shared_sstable sst) {
|
||||
sstables::sstable_set::incremental_selector _selector;
|
||||
std::unordered_set<sstables::shared_sstable> _read_sstables;
|
||||
|
||||
mutation_reader create_reader(sstables::shared_sstable sst) {
|
||||
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
|
||||
// FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper.
|
||||
mutation_reader reader =
|
||||
@@ -398,77 +374,87 @@ private:
|
||||
if (sst->is_shared()) {
|
||||
reader = make_filtering_reader(std::move(reader), belongs_to_current_shard);
|
||||
}
|
||||
return std::make_unique<mutation_reader>(std::move(reader));
|
||||
return std::move(reader);
|
||||
}
|
||||
|
||||
public:
|
||||
range_sstable_reader(schema_ptr s,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
explicit incremental_reader_selector(schema_ptr s,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _s(s)
|
||||
, _pr(&pr)
|
||||
, _sstables(std::move(sstables))
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _slice(slice)
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd(fwd)
|
||||
, _fwd_mr(fwd_mr)
|
||||
{
|
||||
auto ssts = _sstables->select(pr);
|
||||
std::vector<mutation_reader*> readers;
|
||||
readers.reserve(ssts.size());
|
||||
_current_readers.reserve(ssts.size());
|
||||
for (auto& sst : ssts) {
|
||||
auto reader = create_reader(sst);
|
||||
readers.emplace_back(reader.get());
|
||||
_current_readers.emplace_back(sstable_and_reader { sst, std::move(reader) });
|
||||
}
|
||||
init_mutation_reader_set(std::move(readers));
|
||||
, _selector(_sstables->make_incremental_selector()) {
|
||||
_selector_position = _pr->start()->value().token();
|
||||
|
||||
dblog.trace("incremental_reader_selector {}: created for range: ({},{}) with {} sstables",
|
||||
this,
|
||||
_pr->start()->value().token(),
|
||||
_pr->end()->value().token(),
|
||||
_sstables->all()->size());
|
||||
}
|
||||
|
||||
range_sstable_reader(range_sstable_reader&&) = delete; // reader takes reference to member fields
|
||||
incremental_reader_selector(const incremental_reader_selector&) = delete;
|
||||
incremental_reader_selector& operator=(const incremental_reader_selector&) = delete;
|
||||
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
|
||||
incremental_reader_selector(incremental_reader_selector&&) = delete;
|
||||
incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;
|
||||
|
||||
// Returns readers for the sstables selected for the current position that
// have not been handed out before (tracked in _read_sstables).
//
// t is the token to select sstables for; when it is null the selector's own
// position (_selector_position) is used instead.
//
// On return _selector_position has been advanced to the next token reported
// by the sstable-set selector, or set to the maximum token when there is
// nothing further to select.
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const t) override {
    //TODO: fix after lazy_deref() is available
    // sprint() takes a printf-style format string, so "%s" is required here;
    // a "{}" placeholder would be emitted verbatim instead of the token.
    dblog.trace("incremental_reader_selector {}: {}({})", this, __FUNCTION__, t ? sprint("%s", *t) : "null");

    const auto& position = (t ? *t : _selector_position);
    auto selection = _selector.select(position);

    if (selection.sstables.empty()) {
        // For the lower bound of the token range the _selector
        // might not return any sstables, in this case try again
        // with next_token unless it's maximum token.
        // NOTE(review): _pr->start() is dereferenced unconditionally here --
        // confirm that callers never pass a range without a lower bound.
        if (position == _pr->start()->value().token() && !selection.next_token.is_maximum()) {
            dblog.trace("incremental_reader_selector {}: no sstables intersect with the lower bound, retrying", this);
            _selector_position = std::move(selection.next_token);
            return create_new_readers(nullptr);
        }

        // Nothing to read and nowhere to retry from: mark the selector as exhausted.
        _selector_position = dht::maximum_token();
        return {};
    }

    dblog.trace("incremental_reader_selector {}: {} new sstables to consider", this, selection.sstables.size());

    if (selection.next_token == _selector_position) {
        // The selector did not move forward; jump to the maximum token so
        // that no further selection is attempted from the same position.
        _selector_position = dht::maximum_token();
        dblog.trace(
                "incremental_reader_selector {}: selector ({}) is the same as next_token, setting it to max",
                this,
                selection.next_token);
    } else {
        _selector_position = std::move(selection.next_token);
        dblog.trace("incremental_reader_selector {}: advancing selector to {}", this, _selector_position);
    }

    // Only sstables seen for the first time get a reader; emplace() reports
    // via .second whether the sstable was newly inserted.
    return boost::copy_range<std::vector<mutation_reader>>(selection.sstables
            | boost::adaptors::filtered([this] (auto& sst) { return _read_sstables.emplace(sst).second; })
            | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); }));
}
|
||||
|
||||
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) override {
|
||||
_pr = &pr;
|
||||
_selector_position = _pr->start()->value().token();
|
||||
_read_sstables.clear();
|
||||
|
||||
auto new_sstables = _sstables->select(pr);
|
||||
boost::range::sort(new_sstables);
|
||||
boost::range::sort(_current_readers);
|
||||
|
||||
std::vector<sstables::shared_sstable> to_add;
|
||||
std::vector<sstable_and_reader> to_remove, unchanged;
|
||||
sstable_and_reader::less_compare cmp;
|
||||
boost::set_difference(new_sstables, _current_readers, std::back_inserter(to_add), cmp);
|
||||
std::set_difference(_current_readers.begin(), _current_readers.end(), new_sstables.begin(), new_sstables.end(),
|
||||
boost::back_move_inserter(to_remove), cmp);
|
||||
std::set_intersection(_current_readers.begin(), _current_readers.end(), new_sstables.begin(), new_sstables.end(),
|
||||
boost::back_move_inserter(unchanged), cmp);
|
||||
|
||||
std::vector<sstable_and_reader> to_add_sar;
|
||||
boost::transform(to_add, std::back_inserter(to_add_sar), [&] (const sstables::shared_sstable& sst) {
|
||||
return sstable_and_reader { sst, create_reader(sst) };
|
||||
});
|
||||
|
||||
auto get_mutation_readers = [] (std::vector<sstable_and_reader>& ssts) {
|
||||
std::vector<mutation_reader*> mrs;
|
||||
mrs.reserve(ssts.size());
|
||||
boost::range::transform(ssts, std::back_inserter(mrs), [] (const sstable_and_reader& s_a_r) {
|
||||
return s_a_r._reader.get();
|
||||
});
|
||||
return mrs;
|
||||
};
|
||||
|
||||
auto to_add_mrs = get_mutation_readers(to_add_sar);
|
||||
auto to_remove_mrs = get_mutation_readers(to_remove);
|
||||
|
||||
unchanged.insert(unchanged.end(), std::make_move_iterator(to_add_sar.begin()), std::make_move_iterator(to_add_sar.end()));
|
||||
return combined_mutation_reader::fast_forward_to(std::move(to_add_mrs), std::move(to_remove_mrs), pr).then([this, new_readers = std::move(unchanged)] () mutable {
|
||||
_current_readers = std::move(new_readers);
|
||||
});
|
||||
return create_new_readers(nullptr);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -569,8 +555,15 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), std::move(sstables),
|
||||
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd));
|
||||
} else {
|
||||
// range_sstable_reader is not movable so we need to wrap it
|
||||
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), std::move(sstables), pr, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
return restrict_reader(make_mutation_reader<combined_mutation_reader>(
|
||||
std::make_unique<incremental_reader_selector>(std::move(s),
|
||||
std::move(sstables),
|
||||
pr,
|
||||
slice,
|
||||
pc,
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4157,6 +4150,25 @@ void column_family::drop_hit_rate(gms::inet_address addr) {
|
||||
_cluster_cache_hit_rates.erase(addr);
|
||||
}
|
||||
|
||||
// Creates a combined_mutation_reader over the given sstable set whose
// underlying readers are supplied by an incremental_reader_selector.
// All arguments are passed through to the selector's constructor.
mutation_reader make_range_sstable_reader(schema_ptr s,
        lw_shared_ptr<sstables::sstable_set> sstables,
        const dht::partition_range& pr,
        const query::partition_slice& slice,
        const io_priority_class& pc,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr)
{
    auto selector = std::make_unique<incremental_reader_selector>(
            std::move(s),
            std::move(sstables),
            pr,
            slice,
            pc,
            std::move(trace_state),
            fwd,
            fwd_mr);
    return make_mutation_reader<combined_mutation_reader>(std::move(selector));
}
|
||||
|
||||
future<>
|
||||
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstable_write_permit&& permit, bool backup, const io_priority_class& pc, bool leave_unsealed, seastar::thread_scheduling_group *tsg) {
|
||||
class permit_monitor final : public sstables::write_monitor {
|
||||
|
||||
@@ -819,6 +819,15 @@ public:
|
||||
friend class distributed_loader;
|
||||
};
|
||||
|
||||
mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
|
||||
class user_types_metadata {
|
||||
std::unordered_map<bytes, user_type> _user_types;
|
||||
public:
|
||||
|
||||
@@ -28,12 +28,75 @@
|
||||
#include "utils/move.hh"
|
||||
#include "stdx.hh"
|
||||
|
||||
// Dumb selector implementation for combined_mutation_reader that simply
|
||||
// forwards it's list of readers.
|
||||
class list_reader_selector : public reader_selector {
|
||||
std::vector<mutation_reader> _readers;
|
||||
|
||||
public:
|
||||
explicit list_reader_selector(std::vector<mutation_reader> readers)
|
||||
: _readers(std::move(readers)) {
|
||||
_selector_position = dht::minimum_token();
|
||||
}
|
||||
|
||||
list_reader_selector(const list_reader_selector&) = delete;
|
||||
list_reader_selector& operator=(const list_reader_selector&) = delete;
|
||||
|
||||
list_reader_selector(list_reader_selector&&) = default;
|
||||
list_reader_selector& operator=(list_reader_selector&&) = default;
|
||||
|
||||
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const) override {
|
||||
if (_readers.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
_selector_position = dht::maximum_token();
|
||||
return std::exchange(_readers, {});
|
||||
}
|
||||
|
||||
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range&) override {
|
||||
return {};
|
||||
}
|
||||
};
|
||||
|
||||
// Asks the selector whether it may have readers for position t and, if so,
// adds whatever readers it creates to this combined reader.
void combined_mutation_reader::maybe_add_readers(const dht::token* const t) {
    if (_selector->has_new_readers(t)) {
        add_readers(_selector->create_new_readers(t));
    }
}
|
||||
|
||||
void combined_mutation_reader::add_readers(std::vector<mutation_reader> new_readers) {
|
||||
for (auto&& new_reader : new_readers) {
|
||||
_readers.emplace_back(std::move(new_reader));
|
||||
|
||||
auto* r = &_readers.back();
|
||||
_all_readers.emplace_back(r);
|
||||
_next.emplace_back(r);
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a pointer to the token of the mutation at the front of _ptables,
// or nullptr when _ptables is empty.
const dht::token* combined_mutation_reader::current_position() const {
    return _ptables.empty()
            ? nullptr
            : &_ptables.front().m.decorated_key().token();
}
|
||||
|
||||
future<> combined_mutation_reader::prepare_next() {
|
||||
maybe_add_readers(current_position());
|
||||
|
||||
return parallel_for_each(_next, [this] (mutation_reader* mr) {
|
||||
return (*mr)().then([this, mr] (streamed_mutation_opt next) {
|
||||
if (next) {
|
||||
_ptables.emplace_back(mutation_and_reader { std::move(*next), mr });
|
||||
boost::range::push_heap(_ptables, &heap_compare);
|
||||
} else {
|
||||
auto it = std::remove(_all_readers.begin(), _all_readers.end(), mr);
|
||||
_all_readers.erase(it);
|
||||
_readers.remove_if([mr](auto& r) { return &r == mr; });
|
||||
}
|
||||
});
|
||||
}).then([this] {
|
||||
@@ -42,12 +105,12 @@ future<> combined_mutation_reader::prepare_next() {
|
||||
}
|
||||
|
||||
future<streamed_mutation_opt> combined_mutation_reader::next() {
|
||||
if (_current.empty() && !_next.empty()) {
|
||||
if ((_current.empty() && !_next.empty()) || _selector->has_new_readers(current_position())) {
|
||||
return prepare_next().then([this] { return next(); });
|
||||
}
|
||||
if (_ptables.empty()) {
|
||||
return make_ready_future<streamed_mutation_opt>();
|
||||
};
|
||||
}
|
||||
|
||||
while (!_ptables.empty()) {
|
||||
boost::range::pop_heap(_ptables, &heap_compare);
|
||||
@@ -71,48 +134,19 @@ future<streamed_mutation_opt> combined_mutation_reader::next() {
|
||||
return make_ready_future<streamed_mutation_opt>(merge_mutations(std::exchange(_current, {})));
|
||||
}
|
||||
|
||||
void combined_mutation_reader::init_mutation_reader_set(std::vector<mutation_reader*> readers)
|
||||
combined_mutation_reader::combined_mutation_reader(std::unique_ptr<reader_selector> selector)
|
||||
: _selector(std::move(selector))
|
||||
{
|
||||
_all_readers = std::move(readers);
|
||||
_next.assign(_all_readers.begin(), _all_readers.end());
|
||||
_ptables.reserve(_all_readers.size());
|
||||
}
|
||||
|
||||
future<> combined_mutation_reader::fast_forward_to(std::vector<mutation_reader*> to_add, std::vector<mutation_reader*> to_remove, const dht::partition_range& pr)
|
||||
{
|
||||
_ptables.clear();
|
||||
|
||||
std::vector<mutation_reader*> new_readers;
|
||||
boost::range::sort(_all_readers);
|
||||
boost::range::sort(to_remove);
|
||||
boost::range::set_difference(_all_readers, to_remove, std::back_inserter(new_readers));
|
||||
_all_readers = std::move(new_readers);
|
||||
return parallel_for_each(_all_readers, [this, &pr] (mutation_reader* mr) {
|
||||
return mr->fast_forward_to(pr);
|
||||
}).then([this, to_add = std::move(to_add)] {
|
||||
_all_readers.insert(_all_readers.end(), to_add.begin(), to_add.end());
|
||||
_next.assign(_all_readers.begin(), _all_readers.end());
|
||||
});
|
||||
}
|
||||
|
||||
combined_mutation_reader::combined_mutation_reader(std::vector<mutation_reader> readers)
|
||||
: _readers(std::move(readers))
|
||||
{
|
||||
_next.reserve(_readers.size());
|
||||
_current.reserve(_readers.size());
|
||||
_ptables.reserve(_readers.size());
|
||||
|
||||
for (auto&& r : _readers) {
|
||||
_next.emplace_back(&r);
|
||||
}
|
||||
_all_readers.assign(_next.begin(), _next.end());
|
||||
}
|
||||
|
||||
// Fast-forwards every reader currently known to this combined reader to the
// partition range pr and, once all of them are done, asks the selector for
// the readers needed at the new range.
//
// pr is captured by reference in both continuations: the selector may keep
// the address of the range it is given, so handing it a continuation-local
// copy would leave it with a dangling pointer once the continuation is
// destroyed. The caller has to keep pr alive accordingly.
future<> combined_mutation_reader::fast_forward_to(const dht::partition_range& pr) {
    // Drop buffered mutations and schedule all readers to be read from again.
    _ptables.clear();
    _next.assign(_all_readers.begin(), _all_readers.end());

    return parallel_for_each(_next, [this, &pr] (mutation_reader* mr) {
        return mr->fast_forward_to(pr);
    }).then([this, &pr] {
        add_readers(_selector->fast_forward_to(pr));
    });
}
|
||||
|
||||
@@ -122,7 +156,7 @@ future<streamed_mutation_opt> combined_mutation_reader::operator()() {
|
||||
|
||||
mutation_reader
|
||||
make_combined_reader(std::vector<mutation_reader> readers) {
|
||||
return make_mutation_reader<combined_mutation_reader>(std::move(readers));
|
||||
return make_mutation_reader<combined_mutation_reader>(std::make_unique<list_reader_selector>(std::move(readers)));
|
||||
}
|
||||
|
||||
mutation_reader
|
||||
|
||||
@@ -105,9 +105,25 @@ make_mutation_reader(Args&&... args) {
|
||||
return mutation_reader(std::make_unique<Impl>(std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
class reader_selector {
|
||||
protected:
|
||||
dht::token _selector_position;
|
||||
public:
|
||||
virtual ~reader_selector() = default;
|
||||
// Call only if has_new_readers() returned true.
|
||||
virtual std::vector<mutation_reader> create_new_readers(const dht::token* const t) = 0;
|
||||
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) = 0;
|
||||
|
||||
// Can be false-positive but never false-negative!
|
||||
bool has_new_readers(const dht::token* const t) const noexcept {
|
||||
return !_selector_position.is_maximum() && (!t || *t >= _selector_position);
|
||||
}
|
||||
};
|
||||
|
||||
// Combines multiple mutation_readers into one.
|
||||
class combined_mutation_reader : public mutation_reader::impl {
|
||||
std::vector<mutation_reader> _readers;
|
||||
std::unique_ptr<reader_selector> _selector;
|
||||
std::list<mutation_reader> _readers;
|
||||
std::vector<mutation_reader*> _all_readers;
|
||||
|
||||
struct mutation_and_reader {
|
||||
@@ -140,15 +156,14 @@ class combined_mutation_reader : public mutation_reader::impl {
|
||||
std::vector<streamed_mutation> _current;
|
||||
std::vector<mutation_reader*> _next;
|
||||
private:
|
||||
const dht::token* current_position() const;
|
||||
void maybe_add_readers(const dht::token* const t);
|
||||
void add_readers(std::vector<mutation_reader> new_readers);
|
||||
future<> prepare_next();
|
||||
// Produces next mutation or disengaged optional if there are no more.
|
||||
future<streamed_mutation_opt> next();
|
||||
protected:
|
||||
combined_mutation_reader() = default;
|
||||
void init_mutation_reader_set(std::vector<mutation_reader*>);
|
||||
future<> fast_forward_to(std::vector<mutation_reader*> to_add, std::vector<mutation_reader*> to_remove, const dht::partition_range& pr);
|
||||
public:
|
||||
combined_mutation_reader(std::vector<mutation_reader> readers);
|
||||
combined_mutation_reader(std::unique_ptr<reader_selector> selector);
|
||||
virtual future<streamed_mutation_opt> operator()() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override;
|
||||
};
|
||||
|
||||
@@ -90,7 +90,7 @@ static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf,
|
||||
const std::unordered_set<shared_sstable>& compacting_set, const dht::decorated_key& dk) {
|
||||
auto timestamp = api::max_timestamp;
|
||||
stdx::optional<utils::hashed_key> hk;
|
||||
for (auto&& sst : boost::range::join(selector.select(dk.token()), cf.compacted_undeleted_sstables())) {
|
||||
for (auto&& sst : boost::range::join(selector.select(dk.token()).sstables, cf.compacted_undeleted_sstables())) {
|
||||
if (compacting_set.count(sst)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ extern logging::logger clogger;
|
||||
class incremental_selector_impl {
|
||||
public:
|
||||
virtual ~incremental_selector_impl() {}
|
||||
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) = 0;
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) = 0;
|
||||
};
|
||||
|
||||
class sstable_set_impl {
|
||||
@@ -136,14 +136,12 @@ sstable_set::incremental_selector::~incremental_selector() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
|
||||
|
||||
const std::vector<shared_sstable>&
|
||||
sstable_set::incremental_selector::selection
|
||||
sstable_set::incremental_selector::select(const dht::token& t) const {
|
||||
if (!_current_token_range || !_current_token_range->contains(t, dht::token_comparator())) {
|
||||
auto&& x = _impl->select(t);
|
||||
_current_token_range = std::move(std::get<0>(x));
|
||||
_current_sstables = std::move(std::get<1>(x));
|
||||
std::tie(_current_token_range, _current_sstables, _current_next_token) = _impl->select(t);
|
||||
}
|
||||
return _current_sstables;
|
||||
return {_current_sstables, _current_next_token};
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector
|
||||
@@ -178,8 +176,8 @@ public:
|
||||
incremental_selector(const std::vector<shared_sstable>& sstables)
|
||||
: _sstables(sstables) {
|
||||
}
|
||||
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) override {
|
||||
return std::make_pair(dht::token_range::make_open_ended_both_sides(), _sstables);
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) override {
|
||||
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), _sstables, dht::maximum_token());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -295,24 +293,30 @@ public:
|
||||
, _it(leveled_sstables.begin())
|
||||
, _end(leveled_sstables.end()) {
|
||||
}
|
||||
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) override {
|
||||
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::token> select(const dht::token& token) override {
|
||||
auto pr = dht::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token));
|
||||
auto interval = make_interval(*_schema, std::move(pr));
|
||||
auto ssts = _unleveled_sstables;
|
||||
|
||||
const auto next_token = [&] {
|
||||
const auto next = std::next(_it);
|
||||
return next == _end ? dht::maximum_token() : next->first.lower().token();
|
||||
};
|
||||
|
||||
while (_it != _end) {
|
||||
if (boost::icl::contains(_it->first, interval)) {
|
||||
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
|
||||
return std::make_pair(to_token_range(_it->first), std::move(ssts));
|
||||
return std::make_tuple(to_token_range(_it->first), std::move(ssts), next_token());
|
||||
}
|
||||
// we don't want to skip current interval if token lies before it.
|
||||
if (boost::icl::lower_less(interval, _it->first)) {
|
||||
return std::make_pair(dht::token_range::make({token, true}, {_it->first.lower().token(), false}),
|
||||
std::move(ssts));
|
||||
return std::make_tuple(dht::token_range::make({token, true}, {_it->first.lower().token(), false}),
|
||||
std::move(ssts),
|
||||
next_token());
|
||||
}
|
||||
_it++;
|
||||
}
|
||||
return std::make_pair(dht::token_range::make_open_ended_both_sides(), std::move(ssts));
|
||||
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), std::move(ssts), dht::maximum_token());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -55,11 +55,24 @@ public:
|
||||
std::unique_ptr<incremental_selector_impl> _impl;
|
||||
mutable stdx::optional<dht::token_range> _current_token_range;
|
||||
mutable std::vector<shared_sstable> _current_sstables;
|
||||
mutable dht::token _current_next_token;
|
||||
public:
|
||||
~incremental_selector();
|
||||
incremental_selector(std::unique_ptr<incremental_selector_impl> impl);
|
||||
incremental_selector(incremental_selector&&) noexcept;
|
||||
const std::vector<shared_sstable>& select(const dht::token& t) const;
|
||||
|
||||
struct selection {
|
||||
const std::vector<shared_sstable>& sstables;
|
||||
dht::token next_token;
|
||||
};
|
||||
|
||||
// Return the sstables that intersect with t and the best next
|
||||
// token (inclusive) to call select() with so that the least
|
||||
// amount of sstables will be returned (without skipping any).
|
||||
// NOTE: selection.sstables is a reference to an internal cache
|
||||
// and can be invalidated by another call to select().
|
||||
// If you need it long-term copy it!
|
||||
selection select(const dht::token& t) const;
|
||||
};
|
||||
incremental_selector make_incremental_selector() const;
|
||||
};
|
||||
|
||||
199
tests/combined_mutation_reader_test.cc
Normal file
199
tests/combined_mutation_reader_test.cc
Normal file
@@ -0,0 +1,199 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
|
||||
#include "core/sstring.hh"
|
||||
#include "core/thread.hh"
|
||||
|
||||
#include "database.hh"
|
||||
#include "mutation_reader.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "tmpdir.hh"
|
||||
#include "sstable_mutation_readers.hh"
|
||||
#include "cell_locking.hh"
|
||||
|
||||
#include "tests/test-utils.hh"
|
||||
#include "tests/mutation_assertions.hh"
|
||||
#include "tests/mutation_reader_assertions.hh"
|
||||
#include "tests/result_set_assertions.hh"
|
||||
#include "tests/simple_schema.hh"
|
||||
#include "tests/sstable_utils.hh"
|
||||
#include "tests/sstable_test.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
struct sst_factory {
|
||||
schema_ptr s;
|
||||
sstring path;
|
||||
unsigned gen;
|
||||
int level;
|
||||
|
||||
sst_factory(schema_ptr s, const sstring& path, unsigned gen, int level)
|
||||
: s(s)
|
||||
, path(path)
|
||||
, gen(gen)
|
||||
, level(level)
|
||||
{}
|
||||
|
||||
sstables::shared_sstable operator()() {
|
||||
auto sst = make_lw_shared<sstables::sstable>(s, path, gen, sstables::sstable::version_types::la, sstables::sstable::format_types::big);
|
||||
sst->set_unshared();
|
||||
|
||||
//TODO set sstable level, to make the test more interesting
|
||||
|
||||
return sst;
|
||||
}
|
||||
};
|
||||
|
||||
// Checks that a combined reader built from an explicit list of per-sstable
// readers and the reader returned by make_range_sstable_reader() over a
// leveled sstable_set both produce the same four merged partitions.
//
// The statement order below is significant: every add_row()/add_static_row()
// call without an explicit timestamp draws a new timestamp, and the expected
// results rely on later writes to the same cell using later timestamps.
SEASTAR_TEST_CASE(combined_mutation_reader_test) {
    return seastar::async([] {
        //logging::logger_registry().set_logger_level("database", logging::log_level::trace);

        simple_schema s;

        const auto pkeys = s.make_pkeys(4);
        const auto ckeys = s.make_ckeys(4);

        // One empty mutation per partition key; the sstable contents start from copies of these.
        std::vector<mutation> base_mutations;
        for (const auto& k : pkeys) {
            base_mutations.emplace_back(k, s.schema());
        }

        // Appends a copy of base_mutations[idx] to `table`, adds a row under ckeys[idx]
        // whose value is sprint(value_fmt, idx), and returns the timestamp used.
        auto add_keyed_row = [&] (std::vector<mutation>& table, int idx, const char* value_fmt) {
            table.emplace_back(base_mutations[idx]);
            return s.add_row(table.back(), ckeys[idx], sprint(value_fmt, idx));
        };

        // Data layout (partitions 0..3 held by each sstable):
        //   d: 1, 2
        //   b: 0, 1    c: 2, 3
        //   a: 0, 3

        // sstable d
        std::vector<mutation> table_d_mutations;
        add_keyed_row(table_d_mutations, 1, "val_d_%i");
        add_keyed_row(table_d_mutations, 2, "val_d_%i");
        const auto t_static_row = s.add_static_row(table_d_mutations.back(), sprint("%i_static_val", 2));

        // sstable b
        std::vector<mutation> table_b_mutations;
        add_keyed_row(table_b_mutations, 0, "val_b_%i");
        add_keyed_row(table_b_mutations, 1, "val_b_%i");

        // sstable c
        std::vector<mutation> table_c_mutations;
        const auto t_row = add_keyed_row(table_c_mutations, 2, "val_c_%i");
        add_keyed_row(table_c_mutations, 3, "val_c_%i");

        // sstable a
        std::vector<mutation> table_a_mutations;
        add_keyed_row(table_a_mutations, 0, "val_a_%i");
        add_keyed_row(table_a_mutations, 3, "val_a_%i");

        auto tmp = make_lw_shared<tmpdir>();

        std::cout << tmp->path << std::endl;

        unsigned gen{0};

        // Writes `muts` into a new sstable in the temporary directory, consuming one generation number.
        auto write_table = [&] (int level, const std::vector<mutation>& muts) {
            return make_sstable_containing(sst_factory(s.schema(), tmp->path, gen++, level), muts);
        };

        std::vector<sstables::shared_sstable> tables;
        tables.push_back(write_table(0, table_a_mutations));
        tables.push_back(write_table(1, table_b_mutations));
        tables.push_back(write_table(1, table_c_mutations));
        tables.push_back(write_table(2, table_d_mutations));

        auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, {});
        auto sst_set = make_lw_shared<sstables::sstable_set>(cs.make_sstable_set(s.schema()));

        // Register every sstable in the set and, in parallel, create one
        // full-range reader per sstable for the list-based combined reader.
        std::vector<mutation_reader> sstable_mutation_readers;
        for (auto sst : tables) {
            sst_set->insert(sst);

            sstable_mutation_readers.emplace_back(make_mutation_reader<sstable_range_wrapping_reader>(
                    sst,
                    s.schema(),
                    dht::partition_range{dht::ring_position::min(), dht::ring_position::max()},
                    query::full_slice,
                    seastar::default_priority_class(),
                    streamed_mutation::forwarding::no,
                    ::mutation_reader::forwarding::yes));
        }

        auto list_reader = make_combined_reader(std::move(sstable_mutation_readers));

        // Named locals so the arguments handed to make_range_sstable_reader() stay alive
        // for the rest of the test body.
        const auto full_range = dht::partition_range{dht::ring_position::min(), dht::ring_position::max()};
        const auto slice = query::full_slice;
        const auto prio = seastar::default_priority_class();

        auto incremental_reader = make_range_sstable_reader(
                s.schema(),
                sst_set,
                full_range,
                slice,
                prio,
                nullptr,
                streamed_mutation::forwarding::no,
                ::mutation_reader::forwarding::yes);

        // Expected content of partition 2: the row written for sstable c plus
        // the static row written for sstable d, with their original timestamps.
        const int merged_idx = 2;
        mutation c_d_merged(pkeys[merged_idx], s.schema());
        s.add_row(c_d_merged, ckeys[merged_idx], sprint("val_c_%i", merged_idx), t_row);
        s.add_static_row(c_d_merged, sprint("%i_static_val", merged_idx), t_static_row);

        // Both readers are expected to yield the same sequence of partitions.
        auto expect_merged_stream = [&] (auto rd) {
            assert_that(std::move(rd))
                .produces(table_a_mutations.front())
                .produces(table_b_mutations[1])
                .produces(c_d_merged)
                .produces(table_a_mutations.back());
        };

        expect_merged_stream(std::move(list_reader));
        expect_merged_stream(std::move(incremental_reader));
    });
}
|
||||
@@ -78,11 +78,12 @@ public:
|
||||
return dht::global_partitioner().decorate_key(*_s, key);
|
||||
}
|
||||
|
||||
void add_row(mutation& m, const clustering_key& key, const sstring& v, api::timestamp_type t = api::missing_timestamp) {
|
||||
// Writes a live cell holding `v` for column `_v_def` into the clustering row
// `key` of `m`. If `t` is api::missing_timestamp, a timestamp obtained from
// new_timestamp() is used instead. Returns the timestamp that was written.
api::timestamp_type add_row(mutation& m, const clustering_key& key, const sstring& v, api::timestamp_type t = api::missing_timestamp) {
    const api::timestamp_type write_ts = (t == api::missing_timestamp) ? new_timestamp() : t;
    m.set_clustered_cell(key, _v_def, atomic_cell::make_live(write_ts, data_value(v).serialize()));
    return write_ts;
}
|
||||
|
||||
std::pair<sstring, api::timestamp_type> get_value(const clustering_row& row) {
|
||||
@@ -104,8 +105,13 @@ public:
|
||||
return mutation_fragment(std::move(row));
|
||||
}
|
||||
|
||||
void add_static_row(mutation& m, sstring s1) {
|
||||
m.set_static_cell(to_bytes("s1"), data_value(s1), new_timestamp());
|
||||
// Sets the static cell "s1" of `m` to the value `s1`. If `t` is
// api::missing_timestamp, a timestamp obtained from new_timestamp() is used
// instead. Returns the timestamp that was written.
api::timestamp_type add_static_row(mutation& m, sstring s1, api::timestamp_type t = api::missing_timestamp) {
    const api::timestamp_type write_ts = (t == api::missing_timestamp) ? new_timestamp() : t;
    m.set_static_cell(to_bytes("s1"), data_value(s1), write_ts);
    return write_ts;
}
|
||||
|
||||
range_tombstone delete_range(mutation& m, const query::clustering_range& range) {
|
||||
|
||||
@@ -56,6 +56,8 @@
|
||||
#include <boost/range/algorithm/find_if.hpp>
|
||||
#include <boost/algorithm/cxx11/all_of.hpp>
|
||||
|
||||
#include "sstable_utils.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
static sstring some_keyspace("ks");
|
||||
@@ -70,8 +72,6 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t ttl = 0, uint32_t expira
|
||||
}
|
||||
}
|
||||
|
||||
static shared_sstable make_sstable_containing(std::function<shared_sstable()> sst_factory, std::vector<mutation> muts);
|
||||
|
||||
SEASTAR_TEST_CASE(datafile_generation_01) {
|
||||
// Data file with clustering key
|
||||
//
|
||||
@@ -2287,37 +2287,6 @@ SEASTAR_TEST_CASE(check_read_indexes) {
|
||||
});
|
||||
}
|
||||
|
||||
// Must run in a seastar thread
|
||||
static shared_sstable make_sstable_containing(std::function<shared_sstable()> sst_factory, std::vector<mutation> muts) {
|
||||
auto sst = sst_factory();
|
||||
schema_ptr s = muts[0].schema();
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
for (auto&& m : muts) {
|
||||
mt->apply(m);
|
||||
}
|
||||
write_memtable_to_sstable(*mt, sst).get();
|
||||
sst->open_data().get();
|
||||
|
||||
std::set<mutation, mutation_decorated_key_less_comparator> merged;
|
||||
for (auto&& m : muts) {
|
||||
auto result = merged.insert(m);
|
||||
if (!result.second) {
|
||||
auto old = *result.first;
|
||||
merged.erase(result.first);
|
||||
merged.insert(old + m);
|
||||
}
|
||||
}
|
||||
|
||||
// validate the sstable
|
||||
auto rd = assert_that(sstable_reader(sst, s));
|
||||
for (auto&& m : merged) {
|
||||
rd.produces(m);
|
||||
}
|
||||
rd.produces_end_of_stream();
|
||||
|
||||
return sst;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(tombstone_purge_test) {
|
||||
BOOST_REQUIRE(smp::count == 1);
|
||||
return seastar::async([] {
|
||||
@@ -3840,7 +3809,7 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
|
||||
auto key_and_token_pair = token_generation_for_current_shard(8);
|
||||
|
||||
auto check = [] (sstable_set::incremental_selector& selector, const dht::token& token, std::unordered_set<int64_t> expected_gens) {
|
||||
auto sstables = selector.select(token);
|
||||
auto sstables = selector.select(token).sstables;
|
||||
BOOST_REQUIRE(sstables.size() == expected_gens.size());
|
||||
for (auto& sst : sstables) {
|
||||
BOOST_REQUIRE(expected_gens.count(sst->generation()) == 1);
|
||||
|
||||
59
tests/sstable_utils.cc
Normal file
59
tests/sstable_utils.cc
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sstable_utils.hh"
|
||||
|
||||
#include <tests/test-utils.hh>
|
||||
|
||||
#include "database.hh"
|
||||
#include "memtable-sstable.hh"
|
||||
#include "mutation_reader_assertions.hh"
|
||||
|
||||
|
||||
// Writes `muts` (through a memtable) into the sstable obtained from
// `sst_factory`, opens it, and verifies that reading it back through
// as_mutation_source() yields exactly the per-partition merge of `muts`.
// Returns the opened sstable.
//
// `muts` must not be empty: the schema is taken from its first element.
// Waits on futures with get(), so it must run in a seastar thread.
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, std::vector<mutation> muts) {
    auto sst = sst_factory();
    schema_ptr s = muts[0].schema();

    // Populate a memtable and flush it into the sstable.
    auto mt = make_lw_shared<memtable>(s);
    for (auto&& mut : muts) {
        mt->apply(mut);
    }
    write_memtable_to_sstable(*mt, sst).get();
    sst->open_data().get();

    // Expected content: one mutation per decorated key, with mutations that
    // share a key combined via operator+.
    std::set<mutation, mutation_decorated_key_less_comparator> expected;
    for (auto&& mut : muts) {
        const auto existing = expected.find(mut);
        if (existing == expected.end()) {
            expected.insert(mut);
            continue;
        }
        auto combined = *existing;
        expected.erase(existing);
        expected.insert(combined + mut);
    }

    // validate the sstable
    auto checker = assert_that(sst->as_mutation_source()(s));
    for (auto&& want : expected) {
        checker.produces(want);
    }
    checker.produces_end_of_stream();

    return sst;
}
|
||||
24
tests/sstable_utils.hh
Normal file
24
tests/sstable_utils.hh
Normal file
@@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sstables/sstables.hh"
|
||||
|
||||
// Test helper: writes `muts` into the sstable returned by `sst_factory`,
// opens it, asserts that reading it back produces the merged mutations
// followed by end of stream, and returns the sstable.
//
// `muts` must not be empty (the schema is taken from muts[0]).
// Must run in a seastar thread, since the implementation waits on futures with get().
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, std::vector<mutation> muts);
|
||||
Reference in New Issue
Block a user