mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-09 00:13:31 +00:00
sstables: make partitioned sstable set's incremental selector resilient to changes in the set
The motivation is that compaction may remove a sstable from the set while the incremental selector is alive, and for that to work, we need to invalidate the iterators stored by the selector. We could have added a method to notify it, but there will be a case where the one keeping the set cannot forward the notification to the selector. So it's better for the selector to take care of itself. Change counter approach is used which allows the selector to know when to invalidate the iterators. After invalidation, selector will move the iterator back into its right place by looking for lower bound for current pos. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -202,6 +202,9 @@ private:
|
||||
schema_ptr _schema;
|
||||
std::vector<shared_sstable> _unleveled_sstables;
|
||||
interval_map_type _leveled_sstables;
|
||||
// Change counter on interval map for leveled sstables which is used by
|
||||
// incremental selector to determine whether or not to invalidate iterators.
|
||||
uint64_t _leveled_sstables_change_cnt = 0;
|
||||
private:
|
||||
static interval_type make_interval(const schema& s, const dht::partition_range& range) {
|
||||
return interval_type::closed(
|
||||
@@ -283,6 +286,7 @@ public:
|
||||
if (sst->get_sstable_level() == 0) {
|
||||
_unleveled_sstables.push_back(std::move(sst));
|
||||
} else {
|
||||
_leveled_sstables_change_cnt++;
|
||||
_leveled_sstables.add({make_interval(*sst), value_set({sst})});
|
||||
}
|
||||
}
|
||||
@@ -290,6 +294,7 @@ public:
|
||||
if (sst->get_sstable_level() == 0) {
|
||||
_unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end());
|
||||
} else {
|
||||
_leveled_sstables_change_cnt++;
|
||||
_leveled_sstables.subtract({make_interval(*sst), value_set({sst})});
|
||||
}
|
||||
}
|
||||
@@ -300,13 +305,15 @@ public:
|
||||
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
schema_ptr _schema;
|
||||
const std::vector<shared_sstable>& _unleveled_sstables;
|
||||
const interval_map_type& _leveled_sstables;
|
||||
const uint64_t& _leveled_sstables_change_cnt;
|
||||
uint64_t _last_known_leveled_sstables_change_cnt;
|
||||
map_iterator _it;
|
||||
const map_iterator _end;
|
||||
// Only to back the dht::ring_position_view returned from select().
|
||||
dht::ring_position _next_position;
|
||||
private:
|
||||
dht::ring_position_view next_position(map_iterator it) {
|
||||
if (it == _end) {
|
||||
if (it == _leveled_sstables.end()) {
|
||||
_next_position = dht::ring_position::max();
|
||||
return dht::ring_position_view::max();
|
||||
} else {
|
||||
@@ -321,12 +328,21 @@ private:
|
||||
return crpv <= interval.lower();
|
||||
}
|
||||
}
|
||||
void maybe_invalidate_iterator(const compatible_ring_position_view& crpv) {
|
||||
if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) {
|
||||
_it = _leveled_sstables.lower_bound(interval_type::closed(crpv, crpv));
|
||||
_last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt;
|
||||
}
|
||||
}
|
||||
public:
|
||||
incremental_selector(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables)
|
||||
incremental_selector(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables,
|
||||
const uint64_t& leveled_sstables_change_cnt)
|
||||
: _schema(std::move(schema))
|
||||
, _unleveled_sstables(unleveled_sstables)
|
||||
, _leveled_sstables(leveled_sstables)
|
||||
, _leveled_sstables_change_cnt(leveled_sstables_change_cnt)
|
||||
, _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt)
|
||||
, _it(leveled_sstables.begin())
|
||||
, _end(leveled_sstables.end())
|
||||
, _next_position(dht::ring_position::min()) {
|
||||
}
|
||||
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view& pos) override {
|
||||
@@ -334,7 +350,9 @@ public:
|
||||
auto ssts = _unleveled_sstables;
|
||||
using namespace dht;
|
||||
|
||||
while (_it != _end) {
|
||||
maybe_invalidate_iterator(crpv);
|
||||
|
||||
while (_it != _leveled_sstables.end()) {
|
||||
if (boost::icl::contains(_it->first, crpv)) {
|
||||
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
|
||||
return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it)));
|
||||
@@ -350,7 +368,7 @@ public:
|
||||
};
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> partitioned_sstable_set::make_incremental_selector() const {
|
||||
return std::make_unique<incremental_selector>(_schema, _unleveled_sstables, _leveled_sstables);
|
||||
return std::make_unique<incremental_selector>(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt);
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> compaction_strategy_impl::make_sstable_set(schema_ptr schema) const {
|
||||
|
||||
@@ -50,8 +50,7 @@ public:
|
||||
void erase(shared_sstable sst);
|
||||
|
||||
// Used to incrementally select sstables from sstable set using ring-position.
|
||||
// sstable set must be alive and cannot be modified while incremental
|
||||
// selector is used.
|
||||
// sstable set must be alive during the lifetime of the selector.
|
||||
class incremental_selector {
|
||||
std::unique_ptr<incremental_selector_impl> _impl;
|
||||
dht::ring_position_comparator _cmp;
|
||||
|
||||
Reference in New Issue
Block a user