diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 90d718d37e..d8b5052a8f 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -202,6 +202,9 @@ private: schema_ptr _schema; std::vector _unleveled_sstables; interval_map_type _leveled_sstables; + // Change counter on interval map for leveled sstables which is used by + // incremental selector to determine whether or not to invalidate iterators. + uint64_t _leveled_sstables_change_cnt = 0; private: static interval_type make_interval(const schema& s, const dht::partition_range& range) { return interval_type::closed( @@ -283,6 +286,7 @@ public: if (sst->get_sstable_level() == 0) { _unleveled_sstables.push_back(std::move(sst)); } else { + _leveled_sstables_change_cnt++; _leveled_sstables.add({make_interval(*sst), value_set({sst})}); } } @@ -290,6 +294,7 @@ public: if (sst->get_sstable_level() == 0) { _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); } else { + _leveled_sstables_change_cnt++; _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); } } @@ -300,13 +305,15 @@ public: class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { schema_ptr _schema; const std::vector& _unleveled_sstables; + const interval_map_type& _leveled_sstables; + const uint64_t& _leveled_sstables_change_cnt; + uint64_t _last_known_leveled_sstables_change_cnt; map_iterator _it; - const map_iterator _end; // Only to back the dht::ring_position_view returned from select(). dht::ring_position _next_position; private: dht::ring_position_view next_position(map_iterator it) { - if (it == _end) { + if (it == _leveled_sstables.end()) { _next_position = dht::ring_position::max(); return dht::ring_position_view::max(); } else { @@ -321,12 +328,21 @@ private: return crpv <= interval.lower(); } } + void maybe_invalidate_iterator(const compatible_ring_position_view& crpv) { + if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { + _it = _leveled_sstables.lower_bound(interval_type::closed(crpv, crpv)); + _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; + } + } public: - incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables) + incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, + const uint64_t& leveled_sstables_change_cnt) : _schema(std::move(schema)) , _unleveled_sstables(unleveled_sstables) + , _leveled_sstables(leveled_sstables) + , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) , _it(leveled_sstables.begin()) - , _end(leveled_sstables.end()) , _next_position(dht::ring_position::min()) { } virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { @@ -334,7 +350,9 @@ public: auto ssts = _unleveled_sstables; using namespace dht; - while (_it != _end) { + maybe_invalidate_iterator(crpv); + + while (_it != _leveled_sstables.end()) { if (boost::icl::contains(_it->first, crpv)) { ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); @@ -350,7 +368,7 @@ public: }; std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { - return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables); + return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); } std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index ae0fe0017f..816cb8a529 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -50,8 +50,7 @@ public: void erase(shared_sstable sst); // Used to incrementally select sstables from sstable set using ring-position. - // sstable set must be alive and cannot be modified while incremental - // selector is used. + // sstable set must be alive during the lifetime of the selector. class incremental_selector { std::unique_ptr _impl; dht::ring_position_comparator _cmp;