From ace070c8fc2e777a099eea70bb413269bae2adbe Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 27 Jul 2018 22:15:39 -0300 Subject: [PATCH] sstables: make partitioned sstable set's incremental selector resilient to changes in the set The motivation is that compaction may remove a sstable from the set while the incremental selector is alive, and for that to work, we need to invalidate the iterators stored by the selector. We could have added a method to notify it, but there will be a case where the one keeping the set cannot forward the notification to the selector. So it's better for the selector to take care of itself. Change counter approach is used which allows the selector to know when to invalidate the iterators. After invalidation, selector will move the iterator back into its right place by looking for lower bound for current pos. Signed-off-by: Raphael S. Carvalho --- sstables/compaction_strategy.cc | 30 ++++++++++++++++++++++++------ sstables/sstable_set.hh | 3 +-- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 90d718d37e..d8b5052a8f 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -202,6 +202,9 @@ private: schema_ptr _schema; std::vector _unleveled_sstables; interval_map_type _leveled_sstables; + // Change counter on interval map for leveled sstables which is used by + // incremental selector to determine whether or not to invalidate iterators. + uint64_t _leveled_sstables_change_cnt = 0; private: static interval_type make_interval(const schema& s, const dht::partition_range& range) { return interval_type::closed( @@ -283,6 +286,7 @@ public: if (sst->get_sstable_level() == 0) { _unleveled_sstables.push_back(std::move(sst)); } else { + _leveled_sstables_change_cnt++; _leveled_sstables.add({make_interval(*sst), value_set({sst})}); } } @@ -290,6 +294,7 @@ public: if (sst->get_sstable_level() == 0) { _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); } else { + _leveled_sstables_change_cnt++; _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); } } @@ -300,13 +305,15 @@ public: class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { schema_ptr _schema; const std::vector& _unleveled_sstables; + const interval_map_type& _leveled_sstables; + const uint64_t& _leveled_sstables_change_cnt; + uint64_t _last_known_leveled_sstables_change_cnt; map_iterator _it; - const map_iterator _end; // Only to back the dht::ring_position_view returned from select(). dht::ring_position _next_position; private: dht::ring_position_view next_position(map_iterator it) { - if (it == _end) { + if (it == _leveled_sstables.end()) { _next_position = dht::ring_position::max(); return dht::ring_position_view::max(); } else { @@ -321,12 +328,21 @@ private: return crpv <= interval.lower(); } } + void maybe_invalidate_iterator(const compatible_ring_position_view& crpv) { + if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { + _it = _leveled_sstables.lower_bound(interval_type::closed(crpv, crpv)); + _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; + } + } public: - incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables) + incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, + const uint64_t& leveled_sstables_change_cnt) : _schema(std::move(schema)) , _unleveled_sstables(unleveled_sstables) + , _leveled_sstables(leveled_sstables) + , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) , _it(leveled_sstables.begin()) - , _end(leveled_sstables.end()) , _next_position(dht::ring_position::min()) { } virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { @@ -334,7 +350,9 @@ public: auto ssts = _unleveled_sstables; using namespace dht; - while (_it != _end) { + maybe_invalidate_iterator(crpv); + + while (_it != _leveled_sstables.end()) { if (boost::icl::contains(_it->first, crpv)) { ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); @@ -350,7 +368,7 @@ public: }; std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { - return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables); + return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); } std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index ae0fe0017f..816cb8a529 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -50,8 +50,7 @@ public: void erase(shared_sstable sst); // Used to incrementally select sstables from sstable set using ring-position. - // sstable set must be alive and cannot be modified while incremental - // selector is used. + // sstable set must be alive during the lifetime of the selector. class incremental_selector { std::unique_ptr _impl; dht::ring_position_comparator _cmp;