table: tablet_storage_group_manager: make tablet_sstable_set
Make a specialized sstable_set for tablets via tablet_storage_group_manager::make_sstable_set. This sstable set takes a snapshot of the storage_groups (compound) sstable_sets and maps the selected tokens directly into the tablet compound_sstable_set. Refs #16876 Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -1240,6 +1240,8 @@ public:
|
||||
friend class compaction_group;
|
||||
};
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&);
|
||||
|
||||
using user_types_metadata = data_dictionary::user_types_metadata;
|
||||
|
||||
using keyspace_metadata = data_dictionary::keyspace_metadata;
|
||||
|
||||
@@ -688,11 +688,8 @@ public:
|
||||
future<> maybe_split_compaction_group_of(size_t idx) override;
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
||||
// TODO: switch to a specialized set for groups which assumes disjointness across compound sets and incrementally read from them.
|
||||
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
|
||||
auto sstable_sets = boost::copy_range<std::vector<lw_shared_ptr<sstables::sstable_set>>>(compaction_groups()
|
||||
| boost::adaptors::transformed(std::mem_fn(&compaction_group::make_compound_sstable_set)));
|
||||
return make_lw_shared(sstables::make_compound_sstable_set(schema(), std::move(sstable_sets)));
|
||||
return make_tablet_sstable_set(schema(), *this, *_tablet_map);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
#include "replica/database.hh"
|
||||
#include "replica/tablets.hh"
|
||||
#include "replica/tablet_mutation_builder.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "dht/token.hh"
|
||||
|
||||
namespace replica {
|
||||
|
||||
@@ -308,4 +310,289 @@ future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<r
|
||||
co_return std::move(result);
|
||||
}
|
||||
|
||||
// This sstable set provides access to all the stables in the table, using a snapshot of all
|
||||
// its tablets/storage_groups compound_sstable_set:s.
|
||||
// The managed sets cannot be modified through tablet_sstable_set, but only jointly read from, so insert() and erase() are disabled.
|
||||
class tablet_sstable_set : public sstables::sstable_set_impl {
|
||||
schema_ptr _schema;
|
||||
locator::tablet_map _tablet_map;
|
||||
// Keep a single (compound) sstable_set per tablet/storage_group
|
||||
std::vector<lw_shared_ptr<sstables::sstable_set>> _sstable_sets;
|
||||
size_t _size = 0;
|
||||
uint64_t _bytes_on_disk = 0;
|
||||
|
||||
public:
|
||||
tablet_sstable_set(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap)
|
||||
: _schema(std::move(s))
|
||||
, _tablet_map(tmap.tablet_count())
|
||||
{
|
||||
_sstable_sets.reserve(sgm.storage_groups().size());
|
||||
for (const auto& sg : sgm.storage_groups()) {
|
||||
if (sg) {
|
||||
auto set = sg->make_sstable_set();
|
||||
_size += set->size();
|
||||
_bytes_on_disk += set->bytes_on_disk();
|
||||
_sstable_sets.emplace_back(std::move(set));
|
||||
} else {
|
||||
_sstable_sets.emplace_back();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tablet_sstable_set(const tablet_sstable_set& o)
|
||||
: _schema(o._schema)
|
||||
, _tablet_map(o._tablet_map.tablet_count())
|
||||
, _sstable_sets(o._sstable_sets)
|
||||
, _size(o._size)
|
||||
, _bytes_on_disk(o._bytes_on_disk)
|
||||
{}
|
||||
|
||||
static lw_shared_ptr<sstables::sstable_set> make(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) {
|
||||
return make_lw_shared<sstables::sstable_set>(std::make_unique<tablet_sstable_set>(std::move(s), sgm, tmap));
|
||||
}
|
||||
|
||||
const schema_ptr& schema() const noexcept {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const override {
|
||||
return std::make_unique<tablet_sstable_set>(*this);
|
||||
}
|
||||
|
||||
virtual std::vector<sstables::shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
|
||||
virtual lw_shared_ptr<const sstable_list> all() const override;
|
||||
virtual stop_iteration for_each_sstable_until(std::function<stop_iteration(const sstables::shared_sstable&)> func) const override;
|
||||
virtual future<stop_iteration> for_each_sstable_gently_until(std::function<future<stop_iteration>(const sstables::shared_sstable&)> func) const override;
|
||||
virtual bool insert(sstables::shared_sstable sst) override;
|
||||
virtual bool erase(sstables::shared_sstable sst) override;
|
||||
virtual size_t size() const noexcept override {
|
||||
return _size;
|
||||
}
|
||||
virtual uint64_t bytes_on_disk() const noexcept override {
|
||||
return _bytes_on_disk;
|
||||
}
|
||||
virtual selector_and_schema_t make_incremental_selector() const override;
|
||||
|
||||
virtual flat_mutation_reader_v2 create_single_key_sstable_reader(
|
||||
replica::column_family*,
|
||||
schema_ptr,
|
||||
reader_permit,
|
||||
utils::estimated_histogram&,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding,
|
||||
const sstables::sstable_predicate&) const override;
|
||||
|
||||
private:
|
||||
size_t group_of(const dht::token& t) const noexcept {
|
||||
return _tablet_map.get_tablet_id(t).id;
|
||||
}
|
||||
dht::token first_token_of(size_t idx) const noexcept {
|
||||
#ifndef SCYLLA_BUILD_MODE_RELEASE
|
||||
if (idx >= _tablet_map.tablet_count()) {
|
||||
on_fatal_internal_error(tablet_logger, format("first_token_of: idx={} out of range", idx));
|
||||
}
|
||||
#endif
|
||||
return _tablet_map.get_first_token(tablet_id(idx));
|
||||
}
|
||||
dht::token last_token_of(size_t idx) const noexcept {
|
||||
#ifndef SCYLLA_BUILD_MODE_RELEASE
|
||||
if (idx >= _tablet_map.tablet_count()) {
|
||||
on_fatal_internal_error(tablet_logger, format("last_token_of: idx={} out of range", idx));
|
||||
}
|
||||
#endif
|
||||
return _tablet_map.get_last_token(tablet_id(idx));
|
||||
}
|
||||
stop_iteration for_each_sstable_set_until(const dht::partition_range&, std::function<stop_iteration(lw_shared_ptr<sstables::sstable_set>)>) const;
|
||||
future<stop_iteration> for_each_sstable_set_gently_until(const dht::partition_range&, std::function<future<stop_iteration>(lw_shared_ptr<sstables::sstable_set>)>) const;
|
||||
|
||||
friend class tablet_incremental_selector;
|
||||
};
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) {
|
||||
return tablet_sstable_set::make(std::move(s), sgm, tmap);
|
||||
}
|
||||
|
||||
stop_iteration tablet_sstable_set::for_each_sstable_set_until(const dht::partition_range& pr, std::function<stop_iteration(lw_shared_ptr<sstables::sstable_set>)> func) const {
|
||||
size_t candidate_start = pr.start() ? group_of(pr.start()->value().token()) : size_t(0);
|
||||
size_t candidate_end = pr.end() ? group_of(pr.end()->value().token()) : (_sstable_sets.size() - 1);
|
||||
for (auto i = candidate_start; i <= candidate_end; i++) {
|
||||
if (const auto& set = _sstable_sets[i]) {
|
||||
if (func(set) == stop_iteration::yes) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
}
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
future<stop_iteration> tablet_sstable_set::for_each_sstable_set_gently_until(const dht::partition_range& pr, std::function<future<stop_iteration>(lw_shared_ptr<sstables::sstable_set>)> func) const {
|
||||
size_t candidate_start = pr.start() ? group_of(pr.start()->value().token()) : size_t(0);
|
||||
size_t candidate_end = pr.end() ? group_of(pr.end()->value().token()) : (_sstable_sets.size() - 1);
|
||||
for (auto i = candidate_start; i <= candidate_end; i++) {
|
||||
if (const auto& set = _sstable_sets[i]) {
|
||||
if (co_await func(set) == stop_iteration::yes) {
|
||||
co_return stop_iteration::yes;
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> tablet_sstable_set::select(const dht::partition_range& range) const {
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
for_each_sstable_set_until(range, [&] (lw_shared_ptr<sstables::sstable_set> set) {
|
||||
auto ssts = set->select(range);
|
||||
if (ret.empty()) {
|
||||
ret = std::move(ssts);
|
||||
} else {
|
||||
std::move(ssts.begin(), ssts.end(), std::back_inserter(ret));
|
||||
}
|
||||
return stop_iteration::no;
|
||||
});
|
||||
tablet_logger.debug("tablet_sstable_set::select: range={} ret={}", range, ret.size());
|
||||
return ret;
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstable_list> tablet_sstable_set::all() const {
|
||||
auto ret = make_lw_shared<sstable_list>();
|
||||
for_each_sstable_set_until(query::full_partition_range, [&] (lw_shared_ptr<sstables::sstable_set> set) {
|
||||
set->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
||||
ret->insert(sst);
|
||||
});
|
||||
return stop_iteration::no;
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
stop_iteration tablet_sstable_set::for_each_sstable_until(std::function<stop_iteration(const sstables::shared_sstable&)> func) const {
|
||||
return for_each_sstable_set_until(query::full_partition_range, [func = std::move(func)] (lw_shared_ptr<sstables::sstable_set> set) {
|
||||
return set->for_each_sstable_until(func);
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> tablet_sstable_set::for_each_sstable_gently_until(std::function<future<stop_iteration>(const sstables::shared_sstable&)> func) const {
|
||||
return for_each_sstable_set_gently_until(query::full_partition_range, [func = std::move(func)] (lw_shared_ptr<sstables::sstable_set> set) {
|
||||
return set->for_each_sstable_gently_until(func);
|
||||
});
|
||||
}
|
||||
|
||||
bool tablet_sstable_set::insert(sstables::shared_sstable sst) {
|
||||
throw_with_backtrace<std::bad_function_call>();
|
||||
}
|
||||
bool tablet_sstable_set::erase(sstables::shared_sstable sst) {
|
||||
throw_with_backtrace<std::bad_function_call>();
|
||||
}
|
||||
|
||||
class tablet_incremental_selector : public sstables::incremental_selector_impl {
|
||||
const tablet_sstable_set& _tset;
|
||||
|
||||
// _cur_set and _cur_selector contain a snapshot
|
||||
// for the currently selected compaction_group.
|
||||
lw_shared_ptr<sstables::sstable_set> _cur_set;
|
||||
std::optional<sstables::sstable_set::incremental_selector> _cur_selector;
|
||||
dht::token _lowest_next_token = dht::maximum_token();
|
||||
|
||||
public:
|
||||
tablet_incremental_selector(const tablet_sstable_set& tset)
|
||||
: _tset(tset)
|
||||
{}
|
||||
|
||||
virtual std::tuple<dht::partition_range, std::vector<sstables::shared_sstable>, dht::ring_position_ext> select(const dht::ring_position_view& pos) override {
|
||||
// Always return minimum singular range, such that incremental_selector::select() will always call this function,
|
||||
// which in turn will find the next sstable set to select sstables from.
|
||||
const dht::partition_range current_range = dht::partition_range::make_singular(dht::ring_position::min());
|
||||
|
||||
// pos must be monotonically increasing in the weak sense
|
||||
// but caller can skip to a position outside the current set
|
||||
auto token = pos.token();
|
||||
if (!_cur_set || pos.token() >= _lowest_next_token) {
|
||||
auto idx = _tset.group_of(token);
|
||||
if (!token.is_maximum()) {
|
||||
_cur_set = _tset._sstable_sets[idx];
|
||||
}
|
||||
// Set the next token to point to the next engaged storage group.
|
||||
// It will be considered later on when the _cur_set is exhausted
|
||||
_lowest_next_token = find_lowest_next_token(idx+1);
|
||||
}
|
||||
|
||||
if (!_cur_set) {
|
||||
auto lowest_next_position = _lowest_next_token.is_maximum()
|
||||
? dht::ring_position_ext::max()
|
||||
: dht::ring_position_ext::starting_at(_lowest_next_token);
|
||||
tablet_logger.debug("tablet_incremental_selector {}.{}: select pos={}: returning 0 sstables, next_pos={}",
|
||||
_tset.schema()->ks_name(), _tset.schema()->cf_name(), pos, lowest_next_position);
|
||||
return std::make_tuple(std::move(current_range), std::vector<sstables::shared_sstable>{}, lowest_next_position);
|
||||
}
|
||||
|
||||
_cur_selector.emplace(_cur_set->make_incremental_selector());
|
||||
|
||||
auto res = _cur_selector->select(pos);
|
||||
// Return all sstables selected on the requested position from the first matching sstable set.
|
||||
// This assumes that the underlying sstable sets are disjoint in their token ranges so
|
||||
// only one of them contain any given token.
|
||||
auto sstables = std::move(res.sstables);
|
||||
// Return the lowest next position, such that this function will be called again to select the
|
||||
// lowest next position from the selector which previously returned it.
|
||||
// Until the current selector is exhausted. In that case,
|
||||
// jump to the next compaction_group sstable set.
|
||||
dht::ring_position_ext next_position = res.next_position;
|
||||
if (next_position.is_max()) {
|
||||
// _cur_selector is exhausted.
|
||||
// Return a position starting at `_lowest_next_token`
|
||||
// that was calculated for the _cur_set
|
||||
// (unless it's already maximum_token in which case we just return next_position == ring_position::max()).
|
||||
_cur_set = {};
|
||||
_cur_selector.reset();
|
||||
if (!_lowest_next_token.is_maximum()) {
|
||||
next_position = dht::ring_position_ext::starting_at(_lowest_next_token);
|
||||
}
|
||||
}
|
||||
|
||||
tablet_logger.debug("tablet_incremental_selector {}.{}: select pos={}: returning {} sstables, next_pos={}",
|
||||
_tset.schema()->ks_name(), _tset.schema()->cf_name(), pos, sstables.size(), next_position);
|
||||
return std::make_tuple(std::move(current_range), std::move(sstables), std::move(next_position));
|
||||
}
|
||||
|
||||
private:
|
||||
// Find the start token of the first engaged sstable_set
|
||||
// starting the search from `idx`.
|
||||
dht::token find_lowest_next_token(size_t idx) {
|
||||
while (idx < _tset._sstable_sets.size()) {
|
||||
if (_tset._sstable_sets[idx]) {
|
||||
return _tset.first_token_of(idx);
|
||||
}
|
||||
++idx;
|
||||
}
|
||||
return dht::maximum_token();
|
||||
}
|
||||
};
|
||||
|
||||
flat_mutation_reader_v2
|
||||
tablet_sstable_set::create_single_key_sstable_reader(
|
||||
replica::column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
const sstables::sstable_predicate& predicate) const {
|
||||
// The singular partition_range start bound must be engaged.
|
||||
auto idx = group_of(pr.start()->value().token());
|
||||
const auto& set = _sstable_sets[idx];
|
||||
if (!set) {
|
||||
return make_empty_flat_reader_v2(cf->schema(), std::move(permit));
|
||||
}
|
||||
return set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate);
|
||||
}
|
||||
|
||||
sstables::sstable_set_impl::selector_and_schema_t tablet_sstable_set::make_incremental_selector() const {
|
||||
return std::make_tuple(std::make_unique<tablet_incremental_selector>(*this), *_schema);
|
||||
}
|
||||
|
||||
} // namespace replica
|
||||
|
||||
Reference in New Issue
Block a user