/*
|
|
* Copyright (C) 2020-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <algorithm>
|
|
|
|
#include "utils/assert.hh"
|
|
#include <seastar/util/defer.hh>
|
|
|
|
#include <boost/icl/interval_map.hpp>
|
|
|
|
#include "sstables.hh"
|
|
|
|
#include "dht/ring_position.hh"
|
|
|
|
#include "sstable_set_impl.hh"
|
|
|
|
#include "replica/database.hh"
|
|
#include "readers/from_mutations.hh"
|
|
#include "readers/empty.hh"
|
|
#include "readers/combined.hh"
|
|
|
|
namespace sstables {
|
|
|
|
extern logging::logger sstlog;
|
|
|
|
// Strict-weak ordering of sstables by their first partition key,
// breaking ties by the clustering position of the first row in that
// partition. Used to keep run fragments sorted by start bound.
bool
sstable_first_key_less_comparator::operator()(const shared_sstable& s1, const shared_sstable& s2) const {
    auto r = s1->compare_by_first_key(*s2);
    if (r == 0) {
        // Same first partition key: order by the clustering position at
        // which each sstable's first partition begins.
        position_in_partition::less_compare less_cmp(*s1->get_schema());
        return less_cmp(s1->first_partition_first_position(), s2->first_partition_first_position());
    }
    return r < 0;
}
|
|
|
|
// Returns true if inserting `sst` into this run would violate the run's
// invariant that fragments are totally ordered and non-overlapping.
// Relies on _all being kept sorted (see sstable_first_key_less_comparator)
// so that equal_range can locate any fragment that overlaps the candidate.
bool sstable_run::will_introduce_overlapping(const shared_sstable& sst) const {
    // checks if s1 is *all* before s2, meaning their bounds don't overlap.
    auto completely_ordered_before = [] (const shared_sstable& s1, const shared_sstable& s2) {
        auto pkey_tri_cmp = [s = s1->get_schema()] (const dht::decorated_key& k1, const dht::decorated_key& k2) {
            return k1.tri_compare(*s, k2);
        };
        // Compare s1's end bound against s2's start bound at partition
        // granularity first; only fall through to clustering positions
        // when the boundary partitions coincide.
        auto r = pkey_tri_cmp(s1->get_last_decorated_key(), s2->get_first_decorated_key());
        if (r == 0) {
            position_in_partition::tri_compare ckey_tri_cmp(*s1->get_schema());
            const auto& s1_last_position = s1->last_partition_last_position();
            const auto& s2_first_position = s2->first_partition_first_position();
            auto r2 = ckey_tri_cmp(s1_last_position, s2_first_position);
            // Forgive overlapping if s1's last position and s2's first position are both after key.
            // That still produces correct results because the writer translates after_all_prefixed
            // for s1's end bound into bound_kind::incl_end, and s2's start bound into bound_kind::excl_start,
            // meaning they don't actually overlap.
            if (r2 == 0 && s1_last_position.get_bound_weight() == bound_weight::after_all_prefixed) {
                return true;
            }
            return r2 < 0;
        }
        return r < 0;
    };
    // lower bound will be the 1st element which is not *all* before the candidate sstable.
    // upper bound will be the 1st element which the candidate sstable is *all* before.
    // if there's overlapping, lower bound will be 1st element which overlaps, whereas upper bound the 1st one which doesn't (or end iterator)
    // if there's not overlapping, lower and upper bound will both point to 1st element which the candidate sstable is *all* before (or end iterator).
    auto p = std::equal_range(_all.begin(), _all.end(), sst, completely_ordered_before);
    return p.first != p.second;
};
|
|
|
|
// Constructs a run holding a single fragment; a one-element run trivially
// satisfies the non-overlapping invariant.
sstable_run::sstable_run(shared_sstable sst)
    : _all({std::move(sst)}) {
}
|
|
|
|
// Adds a fragment to the run, preserving the invariant that fragments in
// a run never overlap. Returns false (and leaves the run unchanged) when
// the candidate overlaps an existing fragment; true otherwise.
bool sstable_run::insert(shared_sstable sst) {
    const bool ok = !will_introduce_overlapping(sst);
    if (ok) {
        _all.insert(std::move(sst));
    }
    return ok;
}
|
|
|
|
// Removes a fragment from the run; a no-op if the fragment isn't present.
void sstable_run::erase(shared_sstable sst) {
    _all.erase(sst);
}
|
|
|
|
// Total on-disk data size of the run: the sum of the data component
// sizes of all its fragments.
uint64_t sstable_run::data_size() const {
    uint64_t total = 0;
    for (const auto& sst : _all) {
        total += sst->data_size();
    }
    return total;
}
|
|
|
|
// Estimates the ratio of droppable (expired and GC-able) tombstones in
// the run as the arithmetic mean of the per-fragment estimates.
// Returns 0 for an empty run.
//
// Rewritten from std::bind + fold_left: std::bind decay-copies
// `gc_state` and the schema pointer into the bound callable, which is
// both wasteful and obscures intent; a plain accumulation loop does
// neither.
double sstable_run::estimate_droppable_tombstone_ratio(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const {
    if (_all.empty()) {
        return double(0);
    }
    double estimate_sum = 0.0;
    for (const auto& sst : _all) {
        estimate_sum += sst->estimate_droppable_tombstone_ratio(compaction_time, gc_state, s);
    }
    return estimate_sum / _all.size();
}
|
|
|
|
// Returns the run identifier shared by all fragments in this run, taken
// from an arbitrary fragment; a default-constructed run_id is returned
// for an empty run.
sstables::run_id sstable_run::run_identifier() const {
    if (_all.empty()) {
        return run_id();
    }
    return (*_all.begin())->run_identifier();
}
|
|
|
|
// Debug/log formatter for a run: prints the run identifier followed by
// each fragment's generation and token span, sorted by first token.
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) {
    os << "Run = {\n";
    if (run.all().empty()) {
        os << " Identifier: not found\n";
    } else {
        os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier());
    }

    // Copy the fragment set so it can be sorted by first token for output.
    auto frags = run.all() | std::ranges::to<std::vector<shared_sstable>>();
    std::ranges::sort(frags, std::ranges::less(), [] (const shared_sstable& x) {
        return x->get_first_decorated_key().token();
    });
    os << " Fragments = {\n";
    for (auto& frag : frags) {
        os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token());
    }
    os << " }\n}\n";
    return os;
}
|
|
|
|
// Takes ownership of a concrete set implementation (pimpl).
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl)
    : _impl(std::move(impl))
{}

// Deep copy: clones the implementation so the copies evolve independently.
sstable_set::sstable_set(const sstable_set& x)
    : enable_lw_shared_from_this<sstable_set>()
    , _impl(x._impl->clone())
{}

sstable_set::sstable_set(sstable_set&&) noexcept = default;

// Copy-and-move assignment: clone into a temporary first so that a
// throwing clone leaves *this untouched (strong exception guarantee).
sstable_set&
sstable_set::operator=(const sstable_set& x) {
    if (this != &x) {
        auto tmp = sstable_set(x);
        *this = std::move(tmp);
    }
    return *this;
}

sstable_set&
sstable_set::operator=(sstable_set&&) noexcept = default;
|
|
|
|
// Returns the sstables that may contain data for the given partition range.
std::vector<shared_sstable>
sstable_set::select(const dht::partition_range& range) const {
    return _impl->select(range);
}

// Returns immutable views of all sstable runs in the set.
std::vector<frozen_sstable_run>
sstable_set::all_sstable_runs() const {
    return _impl->all_sstable_runs();
}
|
|
|
|
// Returns a snapshot of all runs in the set as frozen (immutable) views.
std::vector<frozen_sstable_run>
partitioned_sstable_set::all_sstable_runs() const {
    std::vector<frozen_sstable_run> runs;
    runs.reserve(_all_runs.size());
    for (const auto& [id, run] : _all_runs) {
        runs.push_back(run);
    }
    return runs;
}
|
|
|
|
// Returns the full list of sstables in the set (shared, read-only).
lw_shared_ptr<const sstable_list>
sstable_set::all() const {
    return _impl->all();
}
|
|
|
|
// Invokes func on every sstable in the set. Implemented on top of the
// early-exit variant with a callback that never stops.
void sstable_set::for_each_sstable(std::function<void(const shared_sstable&)> func) const {
    _impl->for_each_sstable_until([func = std::move(func)] (const shared_sstable& sst) {
        func(sst);
        return stop_iteration::no;
    });
}

// Invokes func on every sstable until it returns stop_iteration::yes;
// returns whether iteration was stopped early.
stop_iteration sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
    return _impl->for_each_sstable_until(std::move(func));
}
|
|
|
|
// Adds an sstable to the set; returns false if it was already present.
bool
sstable_set::insert(shared_sstable sst) {
    return _impl->insert(sst);
}

// Removes an sstable from the set; returns false if it wasn't present.
bool
sstable_set::erase(shared_sstable sst) {
    return _impl->erase(sst);
}

// Number of sstables in the set.
size_t
sstable_set::size() const noexcept {
    return _impl->size();
}

// Total on-disk footprint of all sstables in the set.
uint64_t
sstable_set::bytes_on_disk() const noexcept {
    return _impl->bytes_on_disk();
}

// Aggregated file-size statistics for the set.
file_size_stats
sstable_set::get_file_size_stats() const noexcept {
    return _impl->get_file_size_stats();
}

sstable_set::~sstable_set() = default;
|
|
|
|
// Wraps a concrete selector implementation together with a ring-position
// comparator for the schema the selection is performed against.
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s)
    : _impl(std::move(impl))
    , _cmp(s) {
}

sstable_set::incremental_selector::~incremental_selector() = default;

sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
|
|
|
// Returns the sstables relevant for position s.pos, caching the last
// selection: the underlying impl is only re-queried when the position
// leaves the currently cached range.
sstable_set::incremental_selector::selection
sstable_set::incremental_selector::select(selector_pos s) const {
    if (!_current_range_view || !_current_range_view->contains(s.pos, _cmp)) {
        std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(s);
        // Keep a view over the owned range for cheap containment checks.
        _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); });
    }
    return {_current_sstables, _current_next_position};
}
|
|
|
|
// Builds an incremental selector over this set.
//
// Rewritten with structured bindings: the previous single-expression form
// mixed std::get<0>(std::move(selector)) with std::get<1>(selector) as
// unsequenced function arguments, which — while not undefined (distinct
// tuple elements) — reads as use-after-move and is easy to get wrong when
// modified. Unpacking first makes the move explicit and obviously safe.
sstable_set::incremental_selector
sstable_set::make_incremental_selector() const {
    auto [impl, s] = _impl->make_incremental_selector();
    return incremental_selector(std::move(impl), s);
}
|
|
|
|
// Builds a closed ICL interval covering a partition range.
// NOTE(review): dereferences range.start()/end() unconditionally — callers
// must pass a range bounded on both sides (query() guarantees this).
partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema& s, const dht::partition_range& range) {
    return interval_type::closed(
        dht::compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())),
        dht::compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value())));
}

partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const dht::partition_range& range) const {
    return make_interval(*_schema, range);
}

// Builds a closed interval spanning an sstable's first..last partition keys.
partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema_ptr& s, const sstable& sst) {
    return interval_type::closed(
        dht::compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())),
        dht::compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key())));
}

partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const sstable& sst) {
    return make_interval(_schema, sst);
}

// Single-point interval around a ring position, used for one-sided queries.
partitioned_sstable_set::interval_type partitioned_sstable_set::singular(const dht::ring_position& rp) const {
    // We should use the view here, since this is used for queries.
    auto rpv = dht::ring_position_view(rp);
    auto crp = dht::compatible_ring_position_or_view(*_schema, std::move(rpv));
    return interval_type::closed(crp, crp);
}
|
|
|
|
// Maps a partition range onto an iterator range over the leveled interval
// map, handling each combination of present/absent bounds.
std::pair<partitioned_sstable_set::map_iterator, partitioned_sstable_set::map_iterator>
partitioned_sstable_set::query(const dht::partition_range& range) const {
    const bool has_start = bool(range.start());
    const bool has_end = bool(range.end());
    if (has_start && has_end) {
        // Fully bounded: delegate to the interval map's equal_range.
        return _leveled_sstables.equal_range(make_interval(range));
    }
    if (has_start) {
        // Open-ended to the right: everything from the start position on.
        auto start = singular(range.start()->value());
        return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() };
    }
    if (has_end) {
        // Open-ended to the left: everything up to the end position.
        auto end = singular(range.end()->value());
        return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) };
    }
    // Unbounded on both sides: the whole map.
    return { _leveled_sstables.begin(), _leveled_sstables.end() };
}
|
|
|
|
// Decides whether an sstable should be kept in the flat "unleveled"
// vector rather than the interval map.
bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) const {
    // When a sstable spans most of the entire token range, we'll store it in a
    // vector, to avoid triggering quadratic space complexity in the interval map,
    // since many of such sstables would have presence on almost all intervals.
    static constexpr float unleveled_threshold = 0.85f;
    auto sst_tr = dht::token_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
    bool as_unleveled = dht::overlap_ratio(_token_range, sst_tr) >= unleveled_threshold;

    // Test-only hook: asserts the leveled/unleveled decision matches the
    // expectation injected for a specific table.
    utils::get_local_injector().inject("sstable_set_insertion_verification", [&] () {
        auto& i = utils::get_local_injector();
        auto table_name = i.inject_parameter<std::string_view>("sstable_set_insertion_verification", "table").value();
        bool expect_unleveled = i.inject_parameter<int>("sstable_set_insertion_verification", "expect_unleveled").value();
        if (_schema->cf_name() != table_name) {
            return;
        }
        sstlog.info("SSTable {}, as_unleveled={}, expect_unleveled={}, sst_tr={}, overlap_ratio={}",
            sst->generation(), as_unleveled, expect_unleveled, sst_tr, dht::overlap_ratio(_token_range, sst_tr));
        scylla_assert(as_unleveled == expect_unleveled);
    });

    return as_unleveled;
}

// Converts an interval bound back to an owned ring position.
dht::ring_position partitioned_sstable_set::to_ring_position(const dht::compatible_ring_position_or_view& crp) {
    // Ring position views, representing bounds of sstable intervals are
    // guaranteed to have key() != nullptr;
    const auto& pos = crp.position();
    return dht::ring_position(pos.token(), *pos.key());
}

// Converts an interval map interval back into a partition range,
// preserving the open/closed nature of each bound.
dht::partition_range partitioned_sstable_set::to_partition_range(const interval_type& i) {
    return dht::partition_range::make(
        {to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())},
        {to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())});
}

// Builds the "gap" range from a query position up to the start of the next
// interval `i` — note the upper bound is intentionally i.lower(): the range
// ends where interval i begins.
dht::partition_range partitioned_sstable_set::to_partition_range(const dht::ring_position_view& pos, const interval_type& i) {
    auto lower_bound = [&] {
        if (pos.key()) {
            return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()),
                pos.is_after_key() == dht::ring_position_view::after_key::no);
        } else {
            return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true);
        }
    }();
    auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds()));
    return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound));
}
|
|
|
|
// Creates an empty set for the given schema; token_range is the local
// ownership range used by store_as_unleveled() to compute overlap ratios.
partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, dht::token_range token_range)
    : _schema(std::move(schema))
    , _all(make_lw_shared<sstable_list>())
    , _token_range(std::move(token_range)) {
}
|
|
|
|
// Deep-copies a run map: each sstable_run object is cloned so the copy's
// runs can be mutated independently of the original's.
static std::unordered_map<run_id, shared_sstable_run> clone_runs(const std::unordered_map<run_id, shared_sstable_run>& runs) {
    std::unordered_map<run_id, shared_sstable_run> cloned;
    cloned.reserve(runs.size());
    for (const auto& [id, run] : runs) {
        cloned.emplace(id, make_lw_shared<sstable_run>(*run));
    }
    return cloned;
}
|
|
|
|
// Deep-copy constructor used by clone(): the sstable list and run map are
// copied so that the clone can diverge from the original.
partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables,
        const lw_shared_ptr<sstable_list>& all, const std::unordered_map<run_id, shared_sstable_run>& all_runs, dht::token_range token_range, file_size_stats bytes_on_disk)
    : sstable_set_impl(bytes_on_disk)
    , _schema(schema)
    , _unleveled_sstables(unleveled_sstables)
    , _leveled_sstables(leveled_sstables)
    , _all(make_lw_shared<sstable_list>(*all))
    , _all_runs(clone_runs(all_runs))
    , _token_range(std::move(token_range)) {
}

std::unique_ptr<sstable_set_impl> partitioned_sstable_set::clone() const {
    return std::make_unique<partitioned_sstable_set>(_schema, _unleveled_sstables, _leveled_sstables, _all, _all_runs, _token_range, _file_size_stats);
}
|
|
|
|
// Returns sstables that may intersect the given range: all intervals that
// overlap it, plus every unleveled sstable (those span most of the ring
// and are always candidates).
std::vector<shared_sstable> partitioned_sstable_set::select(const dht::partition_range& range) const {
    // value_set deduplicates sstables that appear in multiple intervals.
    value_set result;
    for (auto [b, e] = query(range); b != e; ++b) {
        std::ranges::copy(b->second, std::inserter(result, result.end()));
    }
    auto r = _unleveled_sstables;
    r.insert(r.end(), result.begin(), result.end());
    return r;
}

lw_shared_ptr<const sstable_list> partitioned_sstable_set::all() const {
    return _all;
}
|
|
|
|
// Applies func to each sstable, stopping early when it requests so;
// returns whether iteration was cut short.
stop_iteration partitioned_sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
    for (const auto& sst : *_all) {
        if (func(sst) == stop_iteration::yes) {
            return stop_iteration::yes;
        }
    }
    return stop_iteration::no;
}
|
|
|
|
// Coroutine variant of for_each_sstable_until: yields to the reactor
// between elements via the co_awaited callback.
future<stop_iteration> partitioned_sstable_set::for_each_sstable_gently_until(std::function<future<stop_iteration>(const shared_sstable&)> func) const {
    for (auto& sst : *_all) {
        auto stop = co_await func(sst);
        if (stop) {
            co_return stop_iteration::yes;
        }
    }
    co_return stop_iteration::no;
}
|
|
|
|
// Inserts an sstable into the set, updating _all, the run map and either
// the unleveled vector or the leveled interval map. Uses deferred undo
// actions so that an exception from any later step rolls back the earlier
// ones, keeping the set consistent. Returns false if already present.
bool partitioned_sstable_set::insert(shared_sstable sst) {
    auto [_, inserted] = _all->insert(sst);
    if (!inserted) {
        // sst is already in the set, no further handling is required
        return false;
    }
    auto size_stats = sst->get_file_size_stats();
    add_file_size_stats(size_stats);
    // Roll back the _all insertion and stats if a later step throws.
    auto undo_all_insert = defer([&] () {
        _all->erase(sst);
        sub_file_size_stats(size_stats);
    });

    // Tries to add sst to its run; creates the run if it doesn't exist yet.
    // Returns false when sst would overlap existing fragments of the run.
    auto maybe_insert_run_fragment = [this] (const shared_sstable& sst) mutable {
        auto it = _all_runs.find(sst->run_identifier());
        if (it == _all_runs.end()) {
            auto new_run = make_lw_shared<sstable_run>(sst);
            return _all_runs.emplace(sst->run_identifier(), std::move(new_run)).second;
        }
        return it->second->insert(sst);
    };

    // If sstable doesn't satisfy disjoint invariant, then place it in a new sstable run.
    while (!maybe_insert_run_fragment(sst)) {
        sstlog.warn("Generating a new run identifier for SSTable {} as overlapping was detected when inserting it into SSTable run {}",
                    sst->get_filename(), sst->run_identifier());
        sst->generate_new_run_identifier();
    }
    auto undo_all_runs_insert = defer([&] () { _all_runs[sst->run_identifier()]->erase(sst); });

    if (store_as_unleveled(sst)) {
        _unleveled_sstables.push_back(sst);
    } else {
        // Bump the change counter so live incremental selectors know to
        // re-seek their iterators (see maybe_invalidate_iterator).
        _leveled_sstables_change_cnt++;
        _leveled_sstables.add({make_interval(*sst), value_set({sst})});
    }
    // All steps succeeded; disarm the rollbacks.
    undo_all_insert.cancel();
    undo_all_runs_insert.cancel();
    return true;
}
|
|
|
|
// Removes an sstable from the set (run map, _all list, and the unleveled
// vector or leveled interval map). Returns whether it was present in _all.
// NOTE(review): the run-map and leveled/unleveled removals run even when
// sst was not in _all; both are no-ops in that case.
bool partitioned_sstable_set::erase(shared_sstable sst) {
    if (auto it = _all_runs.find(sst->run_identifier()); it != _all_runs.end()) {
        it->second->erase(sst);
        // Drop the run entirely once its last fragment is gone.
        if (it->second->empty()) {
            _all_runs.erase(it);
        }
    }
    auto ret = _all->erase(sst) != 0;
    if (ret) {
        sub_file_size_stats(sst->get_file_size_stats());
    }
    if (store_as_unleveled(sst)) {
        _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end());
    } else {
        // Invalidate live incremental selectors' iterators.
        _leveled_sstables_change_cnt++;
        _leveled_sstables.subtract({make_interval(*sst), value_set({sst})});
    }
    return ret;
}

size_t
partitioned_sstable_set::size() const noexcept {
    return _all->size();
}
|
|
|
|
// Incremental selector over a partitioned set: walks the leveled interval
// map in ring order, returning for each queried position the overlapping
// interval's sstables plus all unleveled sstables. Holds references into
// the owning set; the change counter detects concurrent modifications of
// the interval map so the cached iterator can be re-seeked.
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
    schema_ptr _schema;
    const std::vector<shared_sstable>& _unleveled_sstables;
    const interval_map_type& _leveled_sstables;
    const uint64_t& _leveled_sstables_change_cnt;
    uint64_t _last_known_leveled_sstables_change_cnt;
    map_iterator _it;
private:
    // Position at which the next interval after `it` starts, or max() if
    // there is none; callers use it to know when to re-select.
    dht::ring_position_ext next_position(map_iterator it) {
        if (it == _leveled_sstables.end()) {
            return dht::ring_position_view::max();
        } else {
            auto&& next_position = partitioned_sstable_set::to_ring_position(it->first.lower());
            // An open (left-open) interval start means the position itself
            // is excluded, so advance "after key".
            return dht::ring_position_ext(next_position, dht::ring_position_ext::after_key(!boost::icl::is_left_closed(it->first.bounds())));
        }
    }
    // True when crp lies strictly before the interval's start bound,
    // accounting for open/closed left bounds.
    static bool is_before_interval(const dht::compatible_ring_position_or_view& crp, const interval_type& interval) {
        if (boost::icl::is_left_closed(interval.bounds())) {
            return crp < interval.lower();
        } else {
            return crp <= interval.lower();
        }
    }
    // Re-seek the cached iterator if the interval map changed since the
    // last call (insert/erase bumps the change counter).
    void maybe_invalidate_iterator(const dht::compatible_ring_position_or_view& crp) {
        if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) {
            _it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp));
            _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt;
        }
    }
public:
    incremental_selector(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables,
            const uint64_t& leveled_sstables_change_cnt)
        : _schema(std::move(schema))
        , _unleveled_sstables(unleveled_sstables)
        , _leveled_sstables(leveled_sstables)
        , _leveled_sstables_change_cnt(leveled_sstables_change_cnt)
        , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt)
        , _it(leveled_sstables.begin()) {
    }
    // Returns the range covering s.pos, the sstables for it (unleveled +
    // any interval containing the position), and the next position at
    // which the selection changes.
    virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_ext> select(const selector_pos& s) override {
        const dht::ring_position_view& pos = s.pos;
        auto crp = dht::compatible_ring_position_or_view(*_schema, pos);
        auto ssts = _unleveled_sstables;
        using namespace dht;

        maybe_invalidate_iterator(crp);

        while (_it != _leveled_sstables.end()) {
            if (boost::icl::contains(_it->first, crp)) {
                // pos falls inside this interval: return its sstables and
                // the start of the following interval as the next position.
                ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
                return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it)));
            }
            // We don't want to skip current interval if pos lies before it.
            if (is_before_interval(crp, _it->first)) {
                // pos is in a gap before this interval: only unleveled
                // sstables apply until the interval starts.
                return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it));
            }
            _it++;
        }
        // Past the last interval: only unleveled sstables remain.
        return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max());
    }
};
|
|
|
|
// TWCS-oriented set: keeps sstables in two multimaps keyed by their
// clustering lower bound — one in schema order (keyed by min_position())
// and one in reversed-schema order (keyed by max_position().reversed())
// to serve reversed reads.
time_series_sstable_set::time_series_sstable_set(schema_ptr schema, bool enable_optimized_twcs_queries)
    : _schema(std::move(schema))
    , _reversed_schema(_schema->make_reversed())
    , _enable_optimized_twcs_queries(enable_optimized_twcs_queries)
    , _sstables(make_lw_shared<container_t>(position_in_partition::less_compare(*_schema)))
    , _sstables_reversed(make_lw_shared<container_t>(position_in_partition::less_compare(*_reversed_schema)))
{}

// Deep copy: both containers are copied so the clone is independent.
time_series_sstable_set::time_series_sstable_set(const time_series_sstable_set& s)
    : sstable_set_impl(s)
    , _schema(s._schema)
    , _reversed_schema(s._reversed_schema)
    , _enable_optimized_twcs_queries(s._enable_optimized_twcs_queries)
    , _sstables(make_lw_shared(*s._sstables))
    , _sstables_reversed(make_lw_shared(*s._sstables_reversed))
{}

std::unique_ptr<sstable_set_impl> time_series_sstable_set::clone() const {
    return std::make_unique<time_series_sstable_set>(*this);
}
|
|
|
|
// Returns all sstables regardless of the requested range — this set is
// not partitioned by token, so range-based pruning does not apply here.
std::vector<shared_sstable> time_series_sstable_set::select(const dht::partition_range& range) const {
    return *_sstables | std::views::values | std::ranges::to<std::vector>();
}

// Materializes a fresh sstable_list snapshot from the container values.
lw_shared_ptr<const sstable_list> time_series_sstable_set::all() const {
    return make_lw_shared<const sstable_list>(*_sstables | std::views::values | std::ranges::to<sstable_list>());
}

size_t
time_series_sstable_set::size() const noexcept {
    return _sstables->size();
}
|
|
|
|
// Applies func to each sstable (in lower-bound order), stopping early
// when it requests so; returns whether iteration was cut short.
stop_iteration time_series_sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
    for (const auto& [pos, sst] : *_sstables) {
        if (func(sst) == stop_iteration::yes) {
            return stop_iteration::yes;
        }
    }
    return stop_iteration::no;
}
|
|
|
|
// Coroutine variant of for_each_sstable_until: yields to the reactor
// between elements via the co_awaited callback.
future<stop_iteration> time_series_sstable_set::for_each_sstable_gently_until(std::function<future<stop_iteration>(const shared_sstable&)> func) const {
    for (const auto& [pos, sst] : *_sstables) {
        auto stop = co_await func(sst);
        if (stop) {
            co_return stop_iteration::yes;
        }
    }
    co_return stop_iteration::no;
}
|
|
|
|
// O(log n)
// Inserts sst into both the forward and reversed containers. On failure
// part-way through, erase() rolls back whatever succeeded so the two
// containers stay in sync. Always returns true (duplicates are allowed
// by the multimap containers).
// NOTE(review): sst is moved into the second emplace; this is safe only
// as long as that emplace cannot throw after moving its arguments
// (node allocation happens first) — otherwise erase(sst) would act on a
// moved-from pointer.
bool time_series_sstable_set::insert(shared_sstable sst) {
    try {
        auto min_pos = sst->min_position();
        auto max_pos_reversed = sst->max_position().reversed();
        _sstables->emplace(std::move(min_pos), sst);
        add_file_size_stats(sst->get_file_size_stats());
        _sstables_reversed->emplace(std::move(max_pos_reversed), std::move(sst));
    } catch (...) {
        erase(sst);
        throw;
    }
    return true;
}

// O(n) worst case, but should be close to O(log n) most of the time
// Removes sst from both containers; equal_range narrows the search to
// entries sharing sst's key, then a linear scan finds the exact pointer.
bool time_series_sstable_set::erase(shared_sstable sst) {
    bool found;
    {
        auto [first, last] = _sstables->equal_range(sst->min_position());
        auto it = std::find_if(first, last,
            [&sst] (const std::pair<position_in_partition, shared_sstable>& p) { return sst == p.second; });
        found = it != last;
        if (found) {
            _sstables->erase(it);
            sub_file_size_stats(sst->get_file_size_stats());
        }
    }

    // The reversed container is cleaned up independently so that a
    // partially-completed insert() can still be rolled back.
    auto [first, last] = _sstables_reversed->equal_range(sst->max_position().reversed());
    auto it = std::find_if(first, last,
        [&sst] (const std::pair<position_in_partition, shared_sstable>& p) { return sst == p.second; });
    if (it != last) {
        _sstables_reversed->erase(it);
    }
    return found;
}
|
|
|
|
// Trivial selector for this set: since sstables aren't partitioned by
// token, every position selects all sstables over the full ring.
sstable_set_impl::selector_and_schema_t time_series_sstable_set::make_incremental_selector() const {
    struct selector : public incremental_selector_impl {
        const time_series_sstable_set& _set;

        selector(const time_series_sstable_set& set) : _set(set) {}

        virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_ext>
        select(const selector_pos&) override {
            // Full range, everything, never changes — so next position is max().
            return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _set.select(), dht::ring_position_view::max());
        }
    };

    return std::make_tuple(std::make_unique<selector>(*this), std::cref(*_schema));
}
|
|
|
|
// Queue of readers of sstables in a time_series_sstable_set,
|
|
// returning readers in order of the sstables' clustering key lower bounds.
|
|
//
|
|
// For sstable `s` we take `s.min_position()` as the lower bound for non-reversed reads,
|
|
// and `s.max_position().reversed()` for reversed reads (in reversed reads comparisons
|
|
// are performed using a reversed schema). Let `lower_bound(s)` denote this lower bound
|
|
// in the comments below.
|
|
//
|
|
// Skips sstables that don't pass the supplied filter.
|
|
// Guarantees that the filter will be called at most once for each sstable;
|
|
// exactly once after all sstables are iterated over.
|
|
//
|
|
// The readers are created lazily on-demand using the supplied factory function.
|
|
//
|
|
// Additionally to the sstable readers, the queue always returns one ``dummy reader''
|
|
// that contains only the partition_start/end markers. This dummy reader is always
|
|
// returned as the first on the first `pop(b)` call for any `b`. Its upper bound
|
|
// is `before_all_clustered_rows`.
|
|
class sstable_position_reader_queue : public position_reader_queue {
    using container_t = time_series_sstable_set::container_t;
    using value_t = container_t::value_type;

    schema_ptr _query_schema;
    // Either set._sstables or set._sstables_reversed, depending on read
    // direction; keeps the container alive for the queue's lifetime.
    lw_shared_ptr<const container_t> _sstables;

    // Iterates over sstables in order of their lower bounds.
    // Invariant: _it == _end or filter(it->second) == true
    container_t::const_iterator _it;
    const container_t::const_iterator _end;

    position_in_partition::tri_compare _cmp;

    std::function<mutation_reader(sstable&)> _create_reader;
    std::function<bool(const sstable&)> _filter;

    // After construction contains a reader which returns only the partition
    // start (and end, if not in forwarding mode) markers. This is the first
    // returned reader.
    std::optional<mutation_reader> _dummy_reader;

    bool _reversed;

    mutation_reader create_reader(sstable& sst) {
        return _create_reader(sst);
    }

    bool filter(const sstable& sst) const {
        return _filter(sst);
    }

public:
    // Assumes that `create_reader` returns readers that emit only fragments from partition `pk`.
    //
    // For reversed reads `query_schema` must be reversed (see docs/dev/reverse-reads.md).
    sstable_position_reader_queue(const time_series_sstable_set& set,
            schema_ptr query_schema,
            std::function<mutation_reader(sstable&)> create_reader,
            std::function<bool(const sstable&)> filter,
            partition_key pk,
            reader_permit permit,
            streamed_mutation::forwarding fwd_sm,
            bool reversed)
        : _query_schema(std::move(query_schema))
        , _sstables(reversed ? set._sstables_reversed : set._sstables)
        , _it(_sstables->begin())
        , _end(_sstables->end())
        , _cmp(*_query_schema)
        , _create_reader(std::move(create_reader))
        , _filter(std::move(filter))
        , _dummy_reader(make_mutation_reader_from_mutations(_query_schema,
            std::move(permit), mutation(_query_schema, std::move(pk)), _query_schema->full_slice(), fwd_sm))
        , _reversed(reversed)
    {
        // Establish the invariant: _it points at the first sstable that
        // passes the filter (or _end).
        while (_it != _end && !this->filter(*_it->second)) {
            ++_it;
        }
    }

    virtual ~sstable_position_reader_queue() override = default;

    // If the dummy reader was not yet returned, return the dummy reader.
    // Otherwise, open sstable readers to all sstables with smallest lower_bound() from the set
    // {S: filter(S) and prev_min_pos < lower_bound(S) <= bound}, where `prev_min_pos` is the lower_bound()
    // of the sstables returned from last non-empty pop() or -infinity if no sstables were previously returned,
    // and `filter` is the filtering function provided when creating the queue.
    //
    // Note that there may be multiple returned sstables (all with the same position) or none.
    //
    // Note that lower_bound(S) is global for sstable S; if the readers are used to inspect specific partitions,
    // the minimal positions in these partitions might actually all be greater than lower_bound(S).
    virtual std::vector<reader_and_upper_bound> pop(position_in_partition_view bound) override {
        if (empty(bound)) {
            return {};
        }

        if (_dummy_reader) {
            std::vector<reader_and_upper_bound> ret;
            ret.emplace_back(*std::exchange(_dummy_reader, std::nullopt), position_in_partition::before_all_clustered_rows());
            return ret;
        }

        // by !empty(bound) and `_it` invariant:
        // _it != _end, _it->first <= bound, and filter(*_it->second) == true
        scylla_assert(_cmp(_it->first, bound) <= 0);
        // we don't scylla_assert(filter(*_it->second)) due to the requirement that `filter` is called at most once for each sstable

        // Find all sstables with the same position as `_it` (they form a contiguous range in the container).
        auto next = std::find_if(std::next(_it), _end, [this] (const value_t& v) { return _cmp(v.first, _it->first) != 0; });

        // We'll return all sstables in the range [_it, next) which pass the filter
        std::vector<reader_and_upper_bound> ret;
        do {
            // loop invariant: filter(*_it->second) == true
            // The upper bound mirrors the lower bound: max_position for
            // forward reads, reversed min_position for reversed reads.
            auto upper_bound = _reversed ? _it->second->min_position().reversed() : _it->second->max_position();
            ret.emplace_back(create_reader(*_it->second), std::move(upper_bound));
            // restore loop invariant
            do {
                ++_it;
            } while (_it != next && !filter(*_it->second));
        } while (_it != next);

        // filter(*_it->second) wasn't called yet since the inner `do..while` above checks _it != next first
        // restore the `_it` invariant before returning
        while (_it != _end && !filter(*_it->second)) {
            ++_it;
        }

        return ret;
    }

    // If the dummy reader was not returned yet, returns false.
    // Otherwise checks if the set of sstables {S: filter(S) and prev_min_pos < lower_bound(S) <= bound}
    // is empty (see pop() for definition of `prev_min_pos`).
    virtual bool empty(position_in_partition_view bound) const override {
        return !_dummy_reader && (_it == _end || _cmp(_it->first, bound) > 0);
    }

    virtual future<> close() noexcept override {
        // Nothing async to wind down; just stop yielding readers.
        _it = _end;
        return make_ready_future<>();
    }
};
|
|
|
|
// Factory for the position-ordered reader queue over this set (used by
// the optimized TWCS single-partition read path).
std::unique_ptr<position_reader_queue> time_series_sstable_set::make_position_reader_queue(
        std::function<mutation_reader(sstable&)> create_reader,
        std::function<bool(const sstable&)> filter,
        partition_key pk, schema_ptr query_schema, reader_permit permit,
        streamed_mutation::forwarding fwd_sm, bool reversed) const {
    return std::make_unique<sstable_position_reader_queue>(*this,
        std::move(query_schema), std::move(create_reader), std::move(filter),
        std::move(pk), std::move(permit), fwd_sm, reversed);
}

// Builds the interval-walking selector defined above, paired with the
// schema it compares positions against.
sstable_set_impl::selector_and_schema_t partitioned_sstable_set::make_incremental_selector() const {
    return std::make_tuple(std::make_unique<incremental_selector>(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt), std::cref(*_schema));
}

// Convenience factory wrapping a partitioned_sstable_set in an sstable_set.
sstable_set make_partitioned_sstable_set(schema_ptr schema, dht::token_range token_range) {
    return sstable_set(std::make_unique<partitioned_sstable_set>(schema, std::move(token_range)));
}
|
|
|
|
using sstable_reader_factory_type = std::function<mutation_reader(shared_sstable&, const dht::partition_range& pr)>;
|
|
|
|
static logging::logger irclogger("incremental_reader_selector");
|
|
|
|
// Incremental selector implementation for combined_mutation_reader that
// selects readers on-demand as the read progresses through the token
// range.
class incremental_reader_selector : public reader_selector {
    // Range being read; replaced when fast_forward_to() is called.
    const dht::partition_range* _pr;
    lw_shared_ptr<const sstable_set> _sstables;
    tracing::trace_state_ptr _trace_state;
    // Engaged while there may still be sstables to select; reset once the
    // selector position reaches max so it stops holding sstable references
    // (see create_new_readers()).
    std::optional<sstable_set::incremental_selector> _selector;
    // Generations for which a reader was already created, so the same
    // sstable is never opened twice even if selected again.
    std::unordered_set<generation_type> _read_sstable_gens;
    // Factory producing a reader for a given sstable over the current range.
    sstable_reader_factory_type _fn;

    mutation_reader create_reader(shared_sstable sst) {
        tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
        return _fn(sst, *_pr);
    }

    // End bound of the current range as a ring position view.
    dht::ring_position_view pr_end() const {
        return dht::ring_position_view::for_range_end(*_pr);
    }

    // True once the selector has advanced past the end of the range.
    bool end_of_stream() const {
        return _selector_position.is_max() || dht::ring_position_tri_compare(*_s, _selector_position, pr_end()) > 0;
    }
public:
    explicit incremental_reader_selector(schema_ptr s,
            lw_shared_ptr<const sstable_set> sstables,
            const dht::partition_range& pr,
            tracing::trace_state_ptr trace_state,
            sstable_reader_factory_type fn)
        : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min(), sstables->size())
        , _pr(&pr)
        , _sstables(std::move(sstables))
        , _trace_state(std::move(trace_state))
        , _selector(_sstables->make_incremental_selector())
        , _fn(std::move(fn)) {

        irclogger.trace("{}: created for range: {} with {} sstables",
                fmt::ptr(this),
                *_pr,
                _sstables->size());
    }

    incremental_reader_selector(const incremental_reader_selector&) = delete;
    incremental_reader_selector& operator=(const incremental_reader_selector&) = delete;

    incremental_reader_selector(incremental_reader_selector&&) = delete;
    incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;

    // Selects the sstables at the current selector position and opens readers
    // for those not read yet. Keeps advancing until at least one new reader is
    // produced, the stream ends, or the selector moves past `pos` (if given).
    virtual std::vector<mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
        irclogger.trace("{}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos));

        auto readers = std::vector<mutation_reader>();

        do {
            auto selection = _selector->select({_selector_position, _pr});
            _selector_position = selection.next_position;

            irclogger.trace("{}: {} sstables to consider, advancing selector to {}, eos={}", fmt::ptr(this), selection.sstables.size(),
                    _selector_position, end_of_stream());

            // `readers` is empty whenever the loop repeats, so this only
            // resets state on the first iteration.
            readers.clear();
            for (auto& sst : selection.sstables) {
                // emplace().second is true only for generations not seen before.
                if (_read_sstable_gens.emplace(sst->generation()).second) {
                    readers.push_back(create_reader(sst));
                }
            }
        } while (!end_of_stream() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));

        irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size());

        // prevents sstable_set::incremental_selector::_current_sstables from holding reference to
        // sstables when done selecting.
        if (_selector_position.is_max()) {
            _selector.reset();
        }

        return readers;
    }

    virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) override {
        _pr = &pr;

        auto pos = dht::ring_position_view::for_range_start(*_pr);

        // Common case: the new range starts at or after the selector position.
        if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) {
            return create_new_readers(pos);
        }
        // If selector position Y is contained in new range [X, Z], then we should try selecting new
        // sstables since it might have sstables that overlap with that range.
        if (!_selector_position.is_max() && dht::ring_position_tri_compare(*_s, _selector_position, pr_end()) <= 0) {
            return create_new_readers(std::nullopt);
        }

        return {};
    }
};
|
|
|
|
// The returned function uses the bloom filter to check whether the given sstable
|
|
// may have a partition given by the ring position `pos`.
|
|
//
|
|
// Returning `false` means the sstable doesn't have such a partition.
|
|
// Returning `true` means it may, i.e. we don't know whether or not it does.
|
|
//
|
|
// Assumes the given `pos` and `schema` are alive during the function's lifetime.
|
|
static std::predicate<const sstable&> auto
|
|
make_pk_filter(const dht::ring_position& pos, const utils::hashed_key& hash, const schema& schema) {
|
|
return [&pos, hash, cmp = dht::ring_position_comparator(schema)] (const sstable& sst) {
|
|
return cmp(pos, sst.get_first_decorated_key()) >= 0 &&
|
|
cmp(pos, sst.get_last_decorated_key()) <= 0 &&
|
|
sst.filter_has_key(hash);
|
|
};
|
|
}
|
|
|
|
// Returns the default predicate, which accepts every sstable.
// The predicate is a function-local static, so it lives for the duration of
// the program and callers may safely keep a reference to it.
const sstable_predicate& default_sstable_predicate() {
    static const sstable_predicate predicate = [] (const sstable&) { return true; };
    return predicate;
}
|
|
|
|
// Combines the caller-supplied predicate with the partition-key filter
// (see make_pk_filter). The predicate is evaluated first; the bloom-filter
// based check only runs when the predicate accepts the sstable.
// Assumes `pos`, `schema` and `predicate` outlive the returned function.
static std::predicate<const sstable&> auto
make_sstable_filter(const dht::ring_position& pos, const utils::hashed_key& hash, const schema& schema, const sstable_predicate& predicate) {
    return [pk_filter = make_pk_filter(pos, hash, schema), &predicate] (const sstable& sst) {
        if (!predicate(sst)) {
            return false;
        }
        return pk_filter(sst);
    };
}
|
|
|
|
// Filter out sstables for reader using bloom filter and supplied predicate
|
|
static std::vector<shared_sstable>
|
|
filter_sstable_for_reader(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos, const utils::hashed_key& hash, const sstable_predicate& predicate) {
|
|
auto filter = [_filter = make_sstable_filter(pos, hash, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); };
|
|
std::erase_if(sstables, filter);
|
|
return std::move(sstables);
|
|
}
|
|
|
|
// Filter out sstables for reader using sstable metadata that keeps track
// of a range for each clustering component.
//
// Updates the column family's clustering-filter statistics as a side effect.
static std::vector<shared_sstable>
filter_sstable_for_reader_by_ck(std::vector<shared_sstable>&& sstables, replica::column_family& cf, const schema_ptr& schema,
        const query::partition_slice& slice) {
    // no clustering filtering is applied if schema defines no clustering key or
    // compaction strategy thinks it will not benefit from such an optimization,
    // or the partition_slice includes static columns.
    if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) {
        return std::move(sstables);
    }

    replica::cf_stats* stats = cf.cf_stats();
    stats->clustering_filter_count++;
    stats->sstables_checked_by_clustering_filter += sstables.size();

    auto ck_filtering_all_ranges = slice.get_all_ranges();
    // fast path to include all sstables if only one full range was specified.
    // For example, this happens if query only specifies a partition key.
    if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) {
        stats->clustering_filter_fast_path_count++;
        stats->surviving_sstables_after_clustering_filter += sstables.size();
        return std::move(sstables);
    }

    // Keep only sstables whose clustering metadata says they may contain rows
    // in the queried ranges. Note std::partition does not preserve the
    // relative order of the surviving sstables.
    auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const shared_sstable& sst) {
        return sst->may_contain_rows(ranges);
    });
    sstables.erase(skipped, sstables.end());
    stats->surviving_sstables_after_clustering_filter += sstables.size();

    return std::move(sstables);
}
|
|
|
|
// Groups all sstables of the set into runs keyed by run id. An sstable whose
// insertion into its run would introduce an overlap is instead emitted as a
// stand-alone single-fragment run, so every returned run stays overlap-free.
std::vector<frozen_sstable_run>
sstable_set_impl::all_sstable_runs() const {
    auto all_sstables = all();
    std::unordered_map<sstables::run_id, sstable_run> runs_m;
    std::vector<frozen_sstable_run> all_runs;

    for (auto&& sst : *all_sstables) {
        // When a run cannot accept sstable due to overlapping, treat the rejected sstable
        // as a single-fragment run.
        // Note: operator[] default-constructs an empty run the first time a
        // run_id is seen; insert() returns false on rejection.
        if (!runs_m[sst->run_identifier()].insert(sst)) {
            all_runs.push_back(make_lw_shared<const sstable_run>(sst));
        }
    }
    // Freeze and emit the accumulated runs.
    for (auto&& r : runs_m | std::views::values) {
        all_runs.push_back(make_lw_shared<const sstable_run>(std::move(r)));
    }
    return all_runs;
}
|
|
|
|
// Builds a reader for a single-partition query. `pr` is expected to be a
// singular range with a concrete key (asserted by the public entry point in
// sstable_set::create_single_key_sstable_reader).
//
// Selection proceeds in two stages:
//  1. key range + bloom filter + caller predicate (filter_sstable_for_reader),
//  2. clustering-range metadata (filter_sstable_for_reader_by_ck).
// If stage 2 drops any sstable, a dummy reader over an empty mutation is
// appended so partition_start/end are still emitted (see comment below).
mutation_reader
sstable_set_impl::create_single_key_sstable_reader(
        replica::column_family* cf,
        schema_ptr schema,
        reader_permit permit,
        utils::estimated_histogram& sstable_histogram,
        const dht::partition_range& pr,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        const sstable_predicate& predicate,
        sstables::integrity_check integrity) const
{
    const auto& pos = pr.start()->value();
    // Hash the partition key once; reused for every bloom filter lookup.
    auto hash = utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(*schema, *pos.key())));
    auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, hash, predicate);
    auto num_sstables = selected_sstables.size();
    if (!num_sstables) {
        return make_empty_mutation_reader(schema, permit);
    }
    auto readers = filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
        | std::views::transform([&] (const shared_sstable& sstable) {
            tracing::trace(trace_state, "Reading key {} from sstable {}", pos, seastar::value_of([&sstable] { return sstable->get_filename(); }));
            return sstable->make_reader(schema, permit, pr, slice, trace_state, fwd, mutation_reader::forwarding::yes,
                    default_read_monitor(), integrity, &hash);
        })
        | std::ranges::to<std::vector<mutation_reader>>();

    // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition
    // we want to emit partition_start/end if no rows were found,
    // to prevent https://github.com/scylladb/scylla/issues/3552.
    //
    // Use `make_mutation_reader_from_mutations` with an empty mutation to emit
    // the partition_start/end pair and append it to the list of readers passed
    // to make_combined_reader to ensure partition_start/end are emitted even if
    // all sstables actually containing the partition were filtered.
    auto num_readers = readers.size();
    if (num_readers != num_sstables) {
        readers.push_back(make_mutation_reader_from_mutations(schema, permit, mutation(schema, *pos.key()), slice, fwd));
    }
    // Record how many sstables actually contributed readers.
    sstable_histogram.add(num_readers);
    return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
}
|
|
|
|
// Single-partition reader for the time-series (TWCS) set.
//
// When the preconditions below hold, uses an optimized path that opens
// sstable readers lazily in timestamp order via a position reader queue;
// otherwise falls back to the generic sstable_set_impl implementation.
mutation_reader
time_series_sstable_set::create_single_key_sstable_reader(
    replica::column_family* cf,
    schema_ptr schema,
    reader_permit permit,
    utils::estimated_histogram& sstable_histogram,
    const dht::partition_range& pr,
    const query::partition_slice& slice,
    tracing::trace_state_ptr trace_state,
    streamed_mutation::forwarding fwd_sm,
    mutation_reader::forwarding fwd_mr,
    const sstable_predicate& predicate,
    sstables::integrity_check integrity) const {
    const auto& pos = pr.start()->value();
    // First check if the optimized algorithm for TWCS single partition queries can be applied.
    // Multiple conditions must be satisfied:
    // 1. The sstables must be sufficiently modern so they contain the min/max column metadata.
    // 2. The schema cannot have static columns, since we're going to be opening new readers
    //    into new sstables in the middle of the partition query. TWCS sstables will usually pass
    //    this condition.
    // 3. The sstables cannot have partition tombstones for the same reason as above.
    //    TWCS sstables will usually pass this condition.
    // 4. The optimized query path must be enabled.
    using sst_entry = std::pair<position_in_partition, shared_sstable>;
    if (!_enable_optimized_twcs_queries
            || schema->has_static_columns()
            || std::any_of(_sstables->begin(), _sstables->end(),
                [] (const sst_entry& e) {
                    return e.second->get_version() < sstable_version_types::md
                        || e.second->may_have_partition_tombstones();
                })) {
        // Some of the conditions were not satisfied so we use the standard query path.
        return sstable_set_impl::create_single_key_sstable_reader(
                cf, std::move(schema), std::move(permit), sstable_histogram,
                pr, slice, std::move(trace_state), fwd_sm, fwd_mr, predicate, integrity);
    }

    // Hash the partition key once; reused for every bloom filter lookup below.
    auto hash = utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(*schema, *pos.key())));
    auto sst_filter = make_sstable_filter(pos, hash, *schema, predicate);
    auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); });
    if (it == _sstables->end()) {
        // No sstables contain data for the queried partition.
        return make_empty_mutation_reader(std::move(schema), std::move(permit));
    }

    auto& stats = *cf->cf_stats();
    stats.clustering_filter_count++;

    // Fix: propagate the caller-requested `integrity` mode instead of
    // hard-coding integrity_check::no — the fallback path above honors it,
    // so the optimized path must as well.
    auto create_reader = [schema, permit, &pr, &slice, trace_state, fwd_sm, hash, integrity] (sstable& sst) {
        return sst.make_reader(schema, permit, pr, slice, trace_state, fwd_sm, mutation_reader::forwarding::yes,
                default_read_monitor(), integrity, &hash);
    };

    auto pk_filter = make_pk_filter(pos, hash, *schema);
    auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); };

    // We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that
    // the filter is going to be called at most once for each sstable and exactly once after
    // the queue is exhausted. We use that fact to gather statistics.
    auto filter = [pk_filter = std::move(pk_filter), ck_filter = std::move(ck_filter), &stats]
            (const sstable& sst) {
        if (!pk_filter(sst)) {
            return false;
        }

        ++stats.sstables_checked_by_clustering_filter;
        if (ck_filter(sst)) {
            ++stats.surviving_sstables_after_clustering_filter;
            return true;
        }

        return false;
    };

    auto reversed = slice.is_reversed();
    // Note that `sstable_position_reader_queue` always includes a reader which emits a `partition_start` fragment,
    // guaranteeing that the reader we return emits it as well; this helps us avoid the problem from #3552.
    return make_clustering_combined_reader(
            schema, permit, fwd_sm,
            make_position_reader_queue(
                std::move(create_reader), std::move(filter), *pos.key(), schema, permit, fwd_sm, reversed));
}
|
|
|
|
// Composes several sstable sets behind the sstable_set_impl interface.
// The compound set does not own new sstables itself; it only aggregates the
// given `sets` (which must contain at least one set — see
// make_incremental_selector()).
compound_sstable_set::compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets)
    : _schema(std::move(schema))
    , _sets(std::move(sets)) {
}
|
|
|
|
std::unique_ptr<sstable_set_impl> compound_sstable_set::clone() const {
|
|
std::vector<lw_shared_ptr<sstable_set>> cloned_sets;
|
|
cloned_sets.reserve(_sets.size());
|
|
for (const auto& set : _sets) {
|
|
// implicit clone by using sstable_set's copy ctor.
|
|
auto cloned_set = make_lw_shared(*set);
|
|
cloned_sets.push_back(std::move(cloned_set));
|
|
}
|
|
return std::make_unique<compound_sstable_set>(_schema, std::move(cloned_sets));
|
|
}
|
|
|
|
// Returns the union of the selections of all underlying sets for `range`.
std::vector<shared_sstable> compound_sstable_set::select(const dht::partition_range& range) const {
    std::vector<shared_sstable> result;
    for (const auto& set : _sets) {
        auto selected = set->select(range);
        if (result.empty()) {
            // Steal the first non-empty selection instead of copying it.
            result = std::move(selected);
            continue;
        }
        result.reserve(result.size() + selected.size());
        std::move(selected.begin(), selected.end(), std::back_inserter(result));
    }
    return result;
}
|
|
|
|
std::vector<frozen_sstable_run> compound_sstable_set::all_sstable_runs() const {
|
|
std::vector<frozen_sstable_run> ret;
|
|
for (auto& set : _sets) {
|
|
auto runs = set->all_sstable_runs();
|
|
if (ret.empty()) {
|
|
ret = std::move(runs);
|
|
} else {
|
|
ret.reserve(ret.size() + runs.size());
|
|
std::move(runs.begin(), runs.end(), std::back_inserter(ret));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Returns the union of all sstables from every underlying set.
lw_shared_ptr<const sstable_list> compound_sstable_set::all() const {
    // Work on a copy so partitioning doesn't disturb _sets.
    auto sets = _sets;
    auto non_empty_end = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->size() > 0; });
    auto non_empty_count = std::distance(sets.begin(), non_empty_end);

    if (non_empty_count == 0) {
        return make_lw_shared<sstable_list>();
    }
    // optimize for common case where primary set contains sstables, but secondary one is empty for most of the time.
    // After the partition above the non-empty set sits at the front.
    if (non_empty_count == 1) {
        return sets.front()->all();
    }

    // General case: merge the listings of every non-empty set.
    auto merged = make_lw_shared<sstable_list>();
    for (const auto& set : std::ranges::subrange(sets.begin(), non_empty_end)) {
        auto ssts = set->all();
        merged->reserve(merged->size() + ssts->size());
        merged->insert(ssts->begin(), ssts->end());
    }
    return merged;
}
|
|
|
|
// Visits every sstable of every underlying set, in set order, until the
// callback asks to stop; propagates that request to the caller.
stop_iteration compound_sstable_set::for_each_sstable_until(std::function<stop_iteration(const shared_sstable&)> func) const {
    auto stop = stop_iteration::no;
    for (const auto& set : _sets) {
        stop = set->for_each_sstable_until([&func] (const shared_sstable& sst) { return func(sst); });
        if (stop) {
            break;
        }
    }
    return stop;
}
|
|
|
|
// Coroutine counterpart of for_each_sstable_until(): visits sets in order,
// yielding between sstables, and stops as soon as the callback requests it.
future<stop_iteration> compound_sstable_set::for_each_sstable_gently_until(std::function<future<stop_iteration>(const shared_sstable&)> func) const {
    for (const auto& set : _sets) {
        if (co_await set->for_each_sstable_gently_until([&func] (const shared_sstable& sst) { return func(sst); })) {
            co_return stop_iteration::yes;
        }
    }
    co_return stop_iteration::no;
}
|
|
|
|
// compound_sstable_set is a read-only composition of other sets; mutating it
// directly is a programming error, so this unconditionally throws.
bool compound_sstable_set::insert(shared_sstable sst) {
    throw_with_backtrace<std::bad_function_call>();
}
|
|
// See insert(): direct mutation of a compound set is not supported.
bool compound_sstable_set::erase(shared_sstable sst) {
    throw_with_backtrace<std::bad_function_call>();
}
|
|
|
|
size_t
|
|
compound_sstable_set::size() const noexcept {
|
|
return std::ranges::fold_left(_sets | std::views::transform(std::mem_fn(&sstable_set::size)), size_t(0), std::plus{});
|
|
}
|
|
|
|
// Accumulated file-size statistics across all underlying sets.
file_size_stats
compound_sstable_set::get_file_size_stats() const noexcept {
    file_size_stats total{};
    for (const auto& set : _sets) {
        total = total + set->get_file_size_stats();
    }
    return total;
}
|
|
|
|
// Incremental selector that interposes the selectors of all underlying sets:
// each select() call merges the per-set selections and reports the lowest
// next position among them.
class compound_sstable_set::incremental_selector : public incremental_selector_impl {
    const schema& _schema;
    const std::vector<lw_shared_ptr<sstable_set>>& _sets;
    std::vector<sstable_set::incremental_selector> _selectors;
private:
    // Builds one selector per underlying set. Note: reads _sets (initialized
    // before _selectors in declaration order) rather than the parameter.
    std::vector<sstable_set::incremental_selector> make_selectors(const std::vector<lw_shared_ptr<sstable_set>>& sets) {
        return _sets | std::views::transform([] (const auto& set) {
            return set->make_incremental_selector();
        }) | std::ranges::to<std::vector<sstable_set::incremental_selector>>();
    }
public:
    incremental_selector(const schema& schema, const std::vector<lw_shared_ptr<sstable_set>>& sets)
            : _schema(schema)
            , _sets(sets)
            , _selectors(make_selectors(sets)) {
    }

    virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_ext> select(const selector_pos& pos) override {
        // Return all sstables selected on the requested position from all selectors.
        std::vector<shared_sstable> sstables;
        // Return the lowest next position from all selectors, such that this function will be called again to select the
        // lowest next position from the selector which previously returned it.
        dht::ring_position_view lowest_next_position = dht::ring_position_view::max();
        // Always return minimum singular range, such that incremental_selector::select() will always call this function,
        // which in turn will call the selectors to decide on whether or not any select should be actually performed.
        const dht::partition_range current_range = dht::partition_range::make_singular(dht::ring_position::min());
        auto cmp = dht::ring_position_comparator(_schema);

        for (auto& selector : _selectors) {
            auto ret = selector.select(pos);
            sstables.reserve(sstables.size() + ret.sstables.size());
            std::copy(ret.sstables.begin(), ret.sstables.end(), std::back_inserter(sstables));
            if (cmp(ret.next_position, lowest_next_position) < 0) {
                lowest_next_position = ret.next_position;
            }
        }

        return std::make_tuple(std::move(current_range), std::move(sstables), dht::ring_position_ext(lowest_next_position));
    }
};
|
|
|
|
// Builds the selector for a compound set. When at most one underlying set is
// populated, returns that set's own selector directly, avoiding the compound
// interposer.
sstable_set_impl::selector_and_schema_t compound_sstable_set::make_incremental_selector() const {
    if (_sets.empty()) {
        // compound_sstable_set must manage one sstable set at least.
        abort();
    }
    auto sets = _sets;
    auto it = std::partition(sets.begin(), sets.end(), [] (const lw_shared_ptr<sstable_set>& set) { return set->size() > 0; });
    auto non_empty_set_count = std::distance(sets.begin(), it);

    // optimize for common case where only primary set contains sstables, so its selector can be built without an interposer.
    // optimization also applies when no set contains sstable, so any set can be picked as selection will be a no-op anyway.
    if (non_empty_set_count <= 1) {
        // After the partition above, a non-empty set (if any) sits at the front.
        const auto& set = sets.front();
        return set->_impl->make_incremental_selector();
    }
    // More than one populated set: compose all selectors.
    return std::make_tuple(std::make_unique<incremental_selector>(*_schema, _sets), std::cref(*_schema));
}
|
|
|
|
// Factory for a compound sstable set composed of `sets`.
//
// Moves the schema_ptr into the set (its constructor takes it by value)
// instead of copying it, avoiding a redundant refcount round-trip.
sstable_set make_compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets) {
    return sstable_set(std::make_unique<compound_sstable_set>(std::move(schema), std::move(sets)));
}
|
|
|
|
// Single-partition reader over a compound set: dispatches to the underlying
// sets, skipping empty ones, and combines the per-set readers.
mutation_reader
compound_sstable_set::create_single_key_sstable_reader(
        replica::column_family* cf,
        schema_ptr schema,
        reader_permit permit,
        utils::estimated_histogram& sstable_histogram,
        const dht::partition_range& pr,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        const sstable_predicate& predicate,
        sstables::integrity_check integrity) const {
    // Partition a copy of _sets so the non-empty sets come first.
    auto sets = _sets;
    auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->size() > 0; });
    auto non_empty_set_count = std::distance(sets.begin(), it);

    if (!non_empty_set_count) {
        return make_empty_mutation_reader(schema, permit);
    }
    // optimize for common case where only 1 set is populated, avoiding the expensive combined reader
    if (non_empty_set_count == 1) {
        const auto& non_empty_set = *std::begin(sets);
        return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate, integrity);
    }

    // General case: one reader per non-empty set, merged by a combined reader.
    auto readers = std::ranges::subrange(sets.begin(), it)
        | std::views::transform([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
            return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate, integrity);
        })
        | std::ranges::to<std::vector<mutation_reader>>();
    return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
}
|
|
|
|
// Public entry point for single-partition reads: validates that the range is
// singular with a concrete key, then forwards to the implementation.
mutation_reader
sstable_set::create_single_key_sstable_reader(
        replica::column_family* cf,
        schema_ptr schema,
        reader_permit permit,
        utils::estimated_histogram& sstable_histogram,
        const dht::partition_range& pr,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        const sstable_predicate& predicate,
        sstables::integrity_check integrity) const {
    // The single-key path only makes sense for a singular range with a key.
    scylla_assert(pr.is_singular() && pr.start()->value().has_key());
    return _impl->create_single_key_sstable_reader(cf, std::move(schema),
            std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity);
}
|
|
|
|
// Reader wrapper which closes its underlying sstable reader as soon as a
// fast-forward moves past the sstable's last partition, replacing it with an
// empty reader so the sstable's resources are released early.
class auto_closed_sstable_reader final : public mutation_reader::impl {
    shared_sstable _sst;        // reset to nullptr once the underlying reader was auto-closed
    mutation_reader_opt _reader; // the sstable reader, or an empty reader after auto-close
private:
    // Closes the wrapped reader if `pr` starts past the sstable's last key.
    future<> maybe_auto_close_sstable_reader(const dht::partition_range& pr) {
        if (!_sst) {
            // Already auto-closed on an earlier fast-forward.
            co_return;
        }

        auto pos = dht::ring_position_view::for_range_start(pr);
        auto last_pos_in_reader = dht::ring_position_view(_sst->get_last_decorated_key());

        // If we're fast forwarding past the underlying reader, let's close it
        // and replace it by an empty reader.
        if (dht::ring_position_tri_compare(*_schema, pos, last_pos_in_reader) > 0) {
            co_await _reader->close();
            _reader = make_empty_mutation_reader(_schema, _permit);
            _sst = nullptr;
        }
    }
public:
    auto_closed_sstable_reader(shared_sstable sst,
                               mutation_reader sst_reader,
                               reader_permit permit)
            : impl(sst_reader.schema(), std::move(permit))
            , _sst(std::move(sst))
            , _reader(std::move(sst_reader)) {
    }
    virtual future<> fill_buffer() override {
        // Delegate buffering to the wrapped reader, then adopt its buffer
        // and end-of-stream state.
        return _reader->fill_buffer().then([this] {
            _reader->move_buffer_content_to(*this);
            _end_of_stream = _reader->is_end_of_stream();
        });
    }
    future<> fast_forward_to(const dht::partition_range& pr) override {
        clear_buffer();

        co_await maybe_auto_close_sstable_reader(pr);

        _end_of_stream = false;
        co_await _reader->fast_forward_to(pr);
    }
    // Clustering-range fast-forward is not supported by this wrapper.
    virtual future<> fast_forward_to(position_range pr) override {
        return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
    }
    virtual future<> next_partition() override {
        clear_buffer_to_next_partition();
        // Only forward the call if the buffer didn't already contain the
        // start of the next partition.
        if (is_buffer_empty() && !is_end_of_stream()) {
            return _reader->next_partition();
        }
        return make_ready_future<>();
    }
    virtual future<> close() noexcept override {
        return _reader->close();
    }
};
|
|
|
|
// Wraps `sst_reader` so it is closed automatically (and replaced by an empty
// reader) once a fast-forward moves past `sst`'s last partition.
mutation_reader make_auto_closed_sstable_reader(shared_sstable sst, mutation_reader sst_reader, reader_permit permit) {
    return make_mutation_reader<auto_closed_sstable_reader>(std::move(sst), std::move(sst_reader), std::move(permit));
}
|
|
|
|
// Builds a reader over an arbitrary partition range. Readers into individual
// sstables are created on demand by incremental_reader_selector as the read
// progresses through the range.
mutation_reader
sstable_set::make_range_sstable_reader(
        schema_ptr s,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        read_monitor_generator& monitor_generator,
        integrity_check integrity) const
{
    // Invoked lazily by the selector for each sstable it picks.
    auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, integrity]
            (shared_sstable& sst, const dht::partition_range& pr) mutable {
        return sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst), integrity);
    };
    return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
                    shared_from_this(),
                    pr,
                    std::move(trace_state),
                    std::move(reader_factory_fn)),
            fwd,
            fwd_mr);
}
|
|
|
|
// Like make_range_sstable_reader(), but for sstables wholly owned by this
// shard: asserts each sstable is not shared, applies `predicate` per sstable,
// and may wrap readers so they auto-close when fast-forwarded past their data.
mutation_reader
sstable_set::make_local_shard_sstable_reader(
        schema_ptr s,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        read_monitor_generator& monitor_generator,
        const sstable_predicate& predicate,
        combined_reader_statistics* statistics,
        integrity_check integrity) const
{
    auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, &predicate, integrity]
            (shared_sstable& sst, const dht::partition_range& pr) mutable {
        scylla_assert(!sst->is_shared());
        // Sstables rejected by the predicate contribute an empty reader.
        if (!predicate(*sst)) {
            return make_empty_mutation_reader(s, permit);
        }
        auto reader = sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst), integrity);
        // Auto-closed sstable reader is only enabled in the context of fast-forward to partition ranges
        if (!fwd && fwd_mr) {
            return make_auto_closed_sstable_reader(sst, std::move(reader), permit);
        }
        return reader;
    };
    // Fast path: a single sstable needs no combined reader at all.
    // NOTE(review): `statistics` is not fed on this path — presumably
    // acceptable since there is nothing to combine; confirm with callers.
    if (_impl->size() == 1) [[unlikely]] {
        auto sstables = _impl->all();
        auto sst = *sstables->begin();
        return reader_factory_fn(sst, pr);
    }
    return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
                    shared_from_this(),
                    pr,
                    std::move(trace_state),
                    std::move(reader_factory_fn)),
            fwd,
            fwd_mr,
            statistics);
}
|
|
|
|
// Opens one full-scan reader per sstable in the set and merges them all into
// a single combined reader (no forwarding in either dimension).
mutation_reader sstable_set::make_full_scan_reader(
        schema_ptr schema,
        reader_permit permit,
        tracing::trace_state_ptr trace_ptr,
        read_monitor_generator& monitor_generator,
        integrity_check integrity) const {
    std::vector<mutation_reader> readers;
    readers.reserve(size());
    for_each_sstable([&readers, &schema, &permit, &trace_ptr, &monitor_generator, integrity] (const shared_sstable& sst) {
        readers.push_back(sst->make_full_scan_reader(schema, permit, trace_ptr, monitor_generator(sst), integrity));
    });
    return make_combined_reader(schema, std::move(permit), std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
}
|
|
|
|
unsigned sstable_set_overlapping_count(const schema_ptr& schema, const std::vector<shared_sstable>& sstables) {
|
|
unsigned overlapping_sstables = 0;
|
|
auto prev_last = dht::ring_position::min();
|
|
for (auto& sst : sstables) {
|
|
if (dht::ring_position(sst->get_first_decorated_key()).tri_compare(*schema, prev_last) <= 0) {
|
|
overlapping_sstables++;
|
|
}
|
|
prev_last = dht::ring_position(sst->get_last_decorated_key());
|
|
}
|
|
return overlapping_sstables;
|
|
}
|
|
|
|
} // namespace sstables
|