diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index d9f42b0471..4dc4600057 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -70,8 +70,7 @@ sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s) sstable_set::sstable_set(const sstable_set& x) : _impl(x._impl->clone()) - , _schema(x._schema) - , _all_runs(x._all_runs) { + , _schema(x._schema) { } sstable_set::sstable_set(sstable_set&&) noexcept = default; @@ -95,6 +94,11 @@ sstable_set::select(const dht::partition_range& range) const { std::vector sstable_set::select_sstable_runs(const std::vector& sstables) const { + return _impl->select_sstable_runs(sstables); +} + +std::vector +partitioned_sstable_set::select_sstable_runs(const std::vector& sstables) const { auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { return _all_runs.at(run_id); @@ -113,18 +117,11 @@ void sstable_set::for_each_sstable(std::function fu void sstable_set::insert(shared_sstable sst) { _impl->insert(sst); - try { - _all_runs[sst->run_identifier()].insert(sst); - } catch (...) { - _impl->erase(sst); - throw; - } } void sstable_set::erase(shared_sstable sst) { _impl->erase(sst); - _all_runs[sst->run_identifier()].erase(sst); } sstable_set::~sstable_set() = default; @@ -261,11 +258,16 @@ void partitioned_sstable_set::for_each_sstable(std::functioninsert(sst); try { - if (store_as_unleveled(sst)) { - _unleveled_sstables.push_back(std::move(sst)); - } else { - _leveled_sstables_change_cnt++; - _leveled_sstables.add({make_interval(*sst), value_set({sst})}); + _all_runs[sst->run_identifier()].insert(sst); + try { + if (store_as_unleveled(sst)) { + _unleveled_sstables.push_back(sst); + } else { + _leveled_sstables_change_cnt++; + _leveled_sstables.add({make_interval(*sst), value_set({sst})}); + } + } catch (...) { + _all_runs[sst->run_identifier()].erase(sst); } } catch (...) { _all->erase(sst); @@ -273,6 +275,7 @@ void partitioned_sstable_set::insert(shared_sstable sst) { } void partitioned_sstable_set::erase(shared_sstable sst) { + _all_runs[sst->run_identifier()].erase(sst); _all->erase(sst); if (store_as_unleveled(sst)) { _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); @@ -683,6 +686,11 @@ filter_sstable_for_reader_by_ck(std::vector&& sstables, column_f return sstables; } +std::vector +sstable_set_impl::select_sstable_runs(const std::vector& sstables) const { + throw_with_backtrace(); +} + flat_mutation_reader sstable_set_impl::create_single_key_sstable_reader( column_family* cf, diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index a99b0f7458..1c951dc20e 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -52,7 +52,6 @@ public: class sstable_set : public enable_lw_shared_from_this { std::unique_ptr _impl; schema_ptr _schema; - std::unordered_map _all_runs; public: ~sstable_set(); sstable_set(std::unique_ptr impl, schema_ptr s); diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 01a5d5ba98..d14ada5f8c 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -39,6 +39,7 @@ public: virtual ~sstable_set_impl() {} virtual std::unique_ptr clone() const = 0; virtual std::vector select(const dht::partition_range& range) const = 0; + virtual std::vector select_sstable_runs(const std::vector& sstables) const; virtual lw_shared_ptr all() const = 0; virtual void for_each_sstable(std::function func) const = 0; virtual void insert(shared_sstable sst) = 0; @@ -70,6 +71,7 @@ private: std::vector _unleveled_sstables; interval_map_type _leveled_sstables; lw_shared_ptr _all; + std::unordered_map _all_runs; // Change counter on interval map for leveled sstables which is used by // incremental selector to determine whether or not to invalidate iterators. uint64_t _leveled_sstables_change_cnt = 0; @@ -91,6 +93,7 @@ public: virtual std::unique_ptr clone() const override; virtual std::vector select(const dht::partition_range& range) const override; + virtual std::vector select_sstable_runs(const std::vector& sstables) const override; virtual lw_shared_ptr all() const override; virtual void for_each_sstable(std::function func) const override; virtual void insert(shared_sstable sst) override;