diff --git a/api/storage_service.cc b/api/storage_service.cc index f5cdbbc5c7..67d530f491 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -987,8 +987,8 @@ void set_storage_service(http_context& ctx, routes& r) { ss::table_sstables tst; tst.keyspace = schema->ks_name(); tst.table = schema->cf_name(); - - for (auto sstable : *t->get_sstables_including_compacted_undeleted()) { + auto sstables = t->get_sstables_including_compacted_undeleted(); + for (auto sstable : *sstables) { auto ts = db_clock::to_time_t(sstable->data_file_write_time()); ::tm t; ::gmtime_r(&ts, &t); diff --git a/configure.py b/configure.py index 3b79b6c42a..d26089fa7d 100755 --- a/configure.py +++ b/configure.py @@ -392,6 +392,7 @@ scylla_tests = set([ 'test/boost/sstable_conforms_to_mutation_source_test', 'test/boost/sstable_resharding_test', 'test/boost/sstable_directory_test', + 'test/boost/sstable_set_test', 'test/boost/sstable_test', 'test/boost/sstable_move_test', 'test/boost/storage_proxy_test', @@ -435,6 +436,7 @@ scylla_tests = set([ 'test/perf/perf_row_cache_reads', 'test/perf/perf_simple_query', 'test/perf/perf_sstable', + 'test/perf/perf_sstable_set', 'test/unit/lsa_async_eviction_test', 'test/unit/lsa_sync_eviction_test', 'test/unit/row_cache_alloc_stress_test', diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 703d8ce454..150b593965 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -67,7 +67,7 @@ future<> view_update_generator::start() { // Exploit the fact that sstables in the staging directory // are usually non-overlapping and use a partitioned set for // the read. - auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, make_lw_shared(sstable_list{}), false)); + auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, false)); for (auto& sst : sstables) { ssts->insert(sst); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 1da6d191a7..6f11969e9c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2606,8 +2606,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, size_t nr_sst_current = 0; while (!sstables.empty()) { auto ops_uuid = utils::make_random_uuid(); - auto sst_set = make_lw_shared(sstables::make_partitioned_sstable_set(s, - make_lw_shared(sstable_list{}), false)); + auto sst_set = make_lw_shared(sstables::make_partitioned_sstable_set(s, false)); size_t batch_sst_nr = 16; std::vector sst_names; std::vector sst_processed; diff --git a/sstables/shared_sstable.hh b/sstables/shared_sstable.hh index 736fa06244..7071db9f62 100644 --- a/sstables/shared_sstable.hh +++ b/sstables/shared_sstable.hh @@ -46,7 +46,6 @@ struct lw_shared_ptr_deleter { namespace sstables { using shared_sstable = seastar::lw_shared_ptr; -using sstable_list = std::unordered_set; } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 4b89b4a9ec..69124810db 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -63,93 +63,526 @@ std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { return os; } -sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) - : _impl(std::move(impl)) - , _schema(std::move(s)) - , _all(std::move(all)) { +sstable_set_data::sstable_set_data(std::unique_ptr impl) + : impl(std::move(impl)) { } -sstable_set::sstable_set(const sstable_set& x) - : _impl(x._impl->clone()) - , _schema(x._schema) - , _all(make_lw_shared(*x._all)) - , _all_runs(x._all_runs) { -} - -sstable_set::sstable_set(sstable_set&&) noexcept = default; - -sstable_set& -sstable_set::operator=(const sstable_set& x) { - if (this != &x) { - auto tmp = sstable_set(x); - *this = std::move(tmp); +void sstable_set_data::insert(shared_sstable sst) { + auto it = sstables_and_times_added.find(sst); + if (it != sstables_and_times_added.end()) { + // the sstable has already been added in some version + it->second++; + return; } - return *this; -} - -sstable_set& -sstable_set::operator=(sstable_set&&) noexcept = default; - -std::vector -sstable_set::select(const dht::partition_range& range) const { - return _impl->select(range); -} - -std::vector -sstable_set::select_sstable_runs(const std::vector& sstables) const { - auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); - return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { - return _all_runs.at(run_id); - })); -} - -void -sstable_set::insert(shared_sstable sst) { - _impl->insert(sst); try { - _all->insert(sst); - try { - _all_runs[sst->run_identifier()].insert(sst); - } catch (...) { - _all->erase(sst); - throw; - } + impl->insert(sst); + all_runs[sst->run_identifier()].insert(sst); + sstables_and_times_added.emplace(sst, 1); } catch (...) { - _impl->erase(sst); + impl->erase(sst); + auto runs_it = all_runs.find(sst->run_identifier()); + if (runs_it != all_runs.end()) { + runs_it->second.erase(sst); + if (runs_it->second.empty()) { + all_runs.erase(runs_it); + } + } throw; } } -void -sstable_set::erase(shared_sstable sst) { - _impl->erase(sst); - _all->erase(sst); - _all_runs[sst->run_identifier()].erase(sst); +std::vector sstable_set_data::select(const dht::partition_range& range) const { + return impl->select(range); } -sstable_set::~sstable_set() = default; +std::unordered_set sstable_set_data::select_by_run_id(utils::UUID run_id) const { + return all_runs.at(run_id); +} -sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) - : _impl(std::move(impl)) - , _cmp(s) { +// Called when a version that was adding an sstable was removed or when the sstable was later erased in that version. +void sstable_set_data::remove(shared_sstable sst) { + if (--sstables_and_times_added.at(sst) == 0) { + impl->erase(sst); + all_runs[sst->run_identifier()].erase(sst); + sstables_and_times_added.erase(sst); + } +} + +sstable_set_version_reference::~sstable_set_version_reference() { + if (_p) { + _p->remove_reference(); + if (_p->can_merge_with_next()) { + // merging will destroy the last reference to the version and the version will be deleted as a result + _p->merge_with_next(); + } else if (_p->can_delete()) { + delete _p; + _p = nullptr; + } + } +} + +sstable_set_version_reference::sstable_set_version_reference(sstable_set_version* p) : _p(p) { + if (_p) { + _p->add_reference(); + } +} + +sstable_set_version_reference::sstable_set_version_reference(const sstable_set_version_reference& ref) : _p(ref._p) { + if (_p) { + _p->add_reference(); + } +} + +sstable_set_version_reference::sstable_set_version_reference(sstable_set_version_reference&& ref) noexcept : _p(ref._p) { + ref._p = nullptr; +} + +sstable_set_version_reference& sstable_set_version_reference::operator=(const sstable_set_version_reference& ref) { + *this = sstable_set_version_reference(ref); + return *this; +} + +sstable_set_version_reference& sstable_set_version_reference::operator=(sstable_set_version_reference&& ref) noexcept { + if (this != &ref) { + // Destroying this reference may invalide other references, so we're taking over the pointer managed by + // the moved reference, and reassigning it after calling the destructor + auto ptr = ref._p; + ref._p = nullptr; + this->~sstable_set_version_reference(); + _p = ptr; + } + return *this; +} + +static sstable_set_version_reference make_sstable_set_version(std::unique_ptr impl, schema_ptr s) { + sstable_set_version* new_version = new sstable_set_version(std::move(impl), std::move(s)); + return new_version->get_reference_to_this(); +} + +sstable_list::sstable_list(std::unique_ptr impl, schema_ptr s) + : _version(make_sstable_set_version(std::move(impl), std::move(s))) { +} + +sstable_list::sstable_list(const sstable_list& sstl) + : _version(sstl._version->get_reference_to_new_copy()) { + // copying an sstable_list creates a new sstable_set_version +} + +sstable_list::sstable_list(sstable_list&& sstl) noexcept = default; + +sstable_list& sstable_list::operator=(const sstable_list& sstl) { + if (this != &sstl) { + *this = sstable_list(sstl); + } + return *this; +} + +sstable_list& sstable_list::operator=(sstable_list&& sstl) noexcept { + if (this != &sstl) { + this->~sstable_list(); + _version = std::move(sstl._version); + } + return *this; +} + +// Moves the iterator to the next sstable which is contained by the associated sstable_set, or to the end +// If the iterator already references a satisfying sstable, no changes are made. +void sstable_list::const_iterator::advance() { + while (_it != (*_ver)->all().end() && !(*_ver)->contains(_it->first)) { + _it++; + } +} + +sstable_list::const_iterator::const_iterator(std::map::const_iterator it, const sstable_set_version_reference* ver) + : _it(std::move(it)) + , _ver(ver) { + advance(); +} + +sstable_list::const_iterator& sstable_list::const_iterator::operator++() { + assert(_it != (*_ver)->all().end()); + _it++; + advance(); + return *this; +} + +sstable_list::const_iterator sstable_list::const_iterator::operator++(int) { + const_iterator it = *this; + operator++(); + return it; +} + +const shared_sstable& sstable_list::const_iterator::operator*() const { + return _it->first; +} + +bool sstable_list::const_iterator::operator==(const const_iterator& it) const { + assert(_ver == it._ver); + return _it == it._it; +} + +sstable_list::const_iterator sstable_list::begin() const { + return const_iterator(_version->all().begin(), &_version); +} + +sstable_list::const_iterator sstable_list::end() const { + return const_iterator(_version->all().end(), &_version); +} + +size_t sstable_list::size() const { + return _version->size(); +} + +void sstable_list::insert(shared_sstable sst) { + _version = _version->insert(sst); +} + +void sstable_list::erase(shared_sstable sst) { + _version = _version->erase(sst); +} + +bool sstable_list::contains(shared_sstable sst) const { + return _version->contains(sst); +} + +bool sstable_list::empty() const { + return _version->size() == 0; +} + +const sstable_set_version& sstable_list::version() const { + return *_version; +} + +sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s) { + if (!impl->empty()) { + throw std::logic_error("Can't create an sstable_set using a non-empty sstable_set_impl"); + } + _all = make_lw_shared(std::move(impl), std::move(s)); +} + +sstable_set::sstable_set(const sstable_set& x) + : _all(make_lw_shared(*x._all)) { +} + +sstable_set::sstable_set(sstable_set&& x) noexcept = default; + +sstable_set& sstable_set::operator=(const sstable_set& ssts) { + *this = sstable_set(ssts); + return *this; +} + +sstable_set& sstable_set::operator=(sstable_set&& ssts) noexcept = default; + + +std::vector sstable_set::select(const dht::partition_range& range) const { + return _all->version().select(range); +} + +// Return all runs which contain any of the input sstables. +std::vector sstable_set::select_sstable_runs(const std::vector& sstables) const { + return _all->version().select_sstable_runs(sstables); +} + +lw_shared_ptr sstable_set::all() const { + return _all; +} + +void sstable_set::insert(shared_sstable sst) { + _all->insert(sst); +} + +void sstable_set::erase(shared_sstable sst) { + _all->erase(sst); } sstable_set::incremental_selector::~incremental_selector() = default; sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; -sstable_set::incremental_selector::selection -sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s, lw_shared_ptr sstl) + : _impl(std::move(impl)) + , _cmp(s) + , _sstl(std::move(sstl)) { +} + +sstable_set::incremental_selector::selection sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { - std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); + std::vector current_versioned_sstables; + std::tie(_current_range, current_versioned_sstables, _current_next_position) = _impl->select(pos); + _current_sstables = boost::copy_range>(current_versioned_sstables + | boost::adaptors::filtered([this] (shared_sstable sst) { return _sstl->contains(sst); }) + | boost::adaptors::transformed([] (shared_sstable sst) { return sst; })); _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); } return {_current_sstables, _current_next_position}; } -sstable_set::incremental_selector -sstable_set::make_incremental_selector() const { - return incremental_selector(_impl->make_incremental_selector(), *_schema); +sstables::sstable_set::incremental_selector sstable_set::make_incremental_selector() const { + return incremental_selector(_all->version().make_incremental_selector(), *_all->version().get_schema(), _all); +} + +sstable_set_version::~sstable_set_version() { + for (auto& added_sst : _added) { + _base_set->remove(added_sst); + } + if (_prev) { + _prev->_next.erase(this); + } +} + +// the sstable_set_impl must be empty +sstable_set_version::sstable_set_version(std::unique_ptr impl, schema_ptr schema) + : _base_set(make_lw_shared(std::move(impl))) + , _schema(std::move(schema)) { +} + +// Creates a new version based on ver +sstable_set_version::sstable_set_version(sstable_set_version* ver) + : _base_set(ver->_base_set) + , _schema(ver->_schema) + , _prev(ver) { + _prev->_next.insert(this); +} + +// Merges changes made in this version into the next version (can be called only when there is a single next version, +// and no further changes can be made to this one, i.e. no sstable_list references this version). +void sstable_set_version::merge_with_next() noexcept { + auto next_version = *_next.begin(); + next_version->_added = std::move(_added); + next_version->_erased = std::move(_erased); + auto next_nh = _next.extract(*_next.begin()); + if (_prev) { + _prev->_next.erase(this); + _prev->_next.insert(std::move(next_nh)); + } + next_version->_prev = std::move(_prev); // destroys this by overwriting the last reference +} + +void sstable_set_version::propagate_inserted_sstable(const shared_sstable& sst) noexcept { + if (_reference_count > _next.size()) { + // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it + return; + } + for (auto& ver_chck : _next) { + if (!ver_chck->_added.contains(sst)) { + return; + } + } + // Remove the sstable from child versions and get a node handle to insert in this version + auto sst_nh = (*_next.begin())->_added.extract(sst); + for (auto& ver_chck : _next) { + auto nh = ver_chck->_added.extract(sst_nh.value()); + if (!nh.empty()) { + _base_set->remove(nh.value()); + } + } + auto it = _erased.find(sst_nh.value()); + if (it != _erased.end()) { + // If the sstable was erased in this version and added in all its children, its as if it weren't added or inserted in any of them + // because we won't read from this version anymore. + _erased.erase(it); + _base_set->remove(sst_nh.value()); + } else { + auto added_it = _added.insert(std::move(sst_nh)).position; + if (_prev) { + _prev->propagate_inserted_sstable(*added_it); + } + } +} + +void sstable_set_version::propagate_erased_sstable(const shared_sstable& sst) noexcept { + if (_reference_count > _next.size()) { + // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it + return; + } + for (auto& ver_chck : _next) { + if (!ver_chck->_erased.contains(sst)) { + return; + } + } + // Remove the sstable from child versions and get a node handle to insert in this version + auto sst_nh = (*_next.begin())->_erased.extract(sst); + for (auto& ver_chck : _next) { + ver_chck->_erased.extract(sst_nh.value()); + } + auto it = _added.find(sst_nh.value()); + if (it != _added.end()) { + // If the sstable was added in this version and erased in all its children, its as if it weren't added or inserted in any of them + // because we won't read from this version anymore. + _added.erase(it); + _base_set->remove(sst_nh.value()); + } else { + auto erased_it = _erased.insert(std::move(sst_nh)).position; + if (_prev) { + _prev->propagate_erased_sstable(*erased_it); + } + } +} + +// Called when a reference to the version gets removed - if the reference was from an sstable_list, it's the first time we can propagate any +// changes, and if the reference was from another sstable_set_version, we want to check if there were any changes that were present in all +// versions based on this one, but absent in the version that was just removed. +void sstable_set_version::propagate_changes_from_next_versions() noexcept { + if (_reference_count > _next.size() || _next.empty()) { + // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it + // Or there are no child versions so there is nothing to propagate + return; + } + sstable_set_version* next_ver = *_next.begin(); + // Propagate additions + for (auto ver : _next) { + if (ver->_added.size() < next_ver->_added.size()) { + next_ver = ver; + } + } + for (auto it = next_ver->_added.begin(); it != next_ver->_added.end();) { + auto& sst = *it; + it++; + propagate_inserted_sstable(sst); + } + + next_ver = *_next.begin(); + // Propagate erasures + for (auto ver : _next) { + if (ver->_erased.size() < next_ver->_erased.size()) { + next_ver = ver; + } + } + for (auto it = next_ver->_erased.begin(); it != next_ver->_erased.end();) { + auto& sst = *it; + it++; + propagate_erased_sstable(sst); + } +} + +const sstable_set_version* sstable_set_version::get_previous_version() const { + return _prev.get(); +} + +bool sstable_set_version::can_merge_with_next() const noexcept { + return _reference_count == 1 && _next.size() == 1; +} + +bool sstable_set_version::can_delete() const noexcept { + return _reference_count == 0; +} + +void sstable_set_version::add_reference() noexcept { + _reference_count++; +} + +void sstable_set_version::remove_reference() noexcept { + _reference_count--; + propagate_changes_from_next_versions(); +} + +schema_ptr sstable_set_version::get_schema() const { + return _schema; +} + +std::vector sstable_set_version::select(const dht::partition_range& range) const { + return boost::copy_range>(_base_set->select(range) + | boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); })); +} + +// Return all runs which contain any of the input sstables. +std::vector sstable_set_version::select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return sstable_run(boost::copy_range>(_base_set->select_by_run_id(run_id) + | boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); }))); + })); +} + +const std::map& sstable_set_version::all() const { + return _base_set->sstables_and_times_added; +} + +// Provides strong exception guarantee +sstable_set_version_reference sstable_set_version::insert(shared_sstable sst) { + if (this->contains(sst)) { + return get_reference_to_this(); + } + if (_next.size()) { + auto sstvr = get_reference_to_new_copy(); + // The new version has no copies based on it, so inserting into it doesn't create another version + return sstvr->insert(sst); + } + auto it = _erased.find(sst); + if (it != _erased.end()) { + _erased.erase(it); + } else { + _base_set->insert(sst); + try { + _added.insert(sst); + if (_prev) { + _prev->propagate_inserted_sstable(sst); + } + } catch (...) { + _base_set->remove(sst); + throw; + } + } + return get_reference_to_this(); +} + +// Provides strong exception guarantee +sstable_set_version_reference sstable_set_version::erase(shared_sstable sst) { + if (!this->contains(sst)) { + return get_reference_to_this(); + } + if (_next.size()) { + auto sstvr = get_reference_to_new_copy(); + // The new version has no copies based on it, so erasing from it doesn't create another version + return sstvr->erase(sst); + } + auto it = _added.find(sst); + if (it != _added.end()) { + _added.erase(it); + _base_set->remove(sst); + } else { + _erased.insert(sst); + if (_prev) { + _prev->propagate_erased_sstable(sst); + } + } + return get_reference_to_this(); +} + +bool sstable_set_version::contains(shared_sstable sst) const { + return _added.contains(sst) || (!_erased.contains(sst) && _prev && _prev->contains(sst)); +} + +size_t sstable_set_version::size() const { + return _added.size() - _erased.size() + (_prev ? _prev->size() : 0); +} + +std::unique_ptr sstable_set_version::make_incremental_selector() const { + return _base_set->impl->make_incremental_selector(); +} + +flat_mutation_reader +sstable_set_version::create_single_key_sstable_reader( + column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const { + return _base_set->impl->create_single_key_sstable_reader(cf, std::move(schema), + std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + +sstable_set_version_reference sstable_set_version::get_reference_to_this() { + return sstable_set_version_reference(this); +} + +sstable_set_version_reference sstable_set_version::get_reference_to_new_copy() { + return sstable_set_version_reference(new sstable_set_version(this)); } std::unique_ptr bag_sstable_set::clone() const { @@ -171,6 +604,10 @@ void bag_sstable_set::erase(shared_sstable sst) { } } +bool bag_sstable_set::empty() const { + return _sstables.empty(); +} + class bag_sstable_set::incremental_selector : public incremental_selector_impl { const std::vector& _sstables; public: @@ -299,6 +736,10 @@ void partitioned_sstable_set::erase(shared_sstable sst) { } } +bool partitioned_sstable_set::empty() const { + return _unleveled_sstables.empty() && _leveled_sstables.empty(); +} + class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { schema_ptr _schema; const std::vector& _unleveled_sstables; @@ -396,6 +837,10 @@ void time_series_sstable_set::erase(shared_sstable sst) { } } +bool time_series_sstable_set::empty() const { + return _sstables->empty(); +} + std::unique_ptr time_series_sstable_set::make_incremental_selector() const { struct selector : public incremental_selector_impl { const time_series_sstable_set& _set; @@ -535,16 +980,15 @@ std::unique_ptr time_window_compaction_strategy::make_sstable_ return std::make_unique(std::move(schema)); } -sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { - return sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); +sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) { + return sstable_set(std::make_unique(schema, use_level_metadata), schema); } sstable_set compaction_strategy::make_sstable_set(schema_ptr schema) const { return sstable_set( _compaction_strategy_impl->make_sstable_set(schema), - schema, - make_lw_shared()); + schema); } using sstable_reader_factory_type = std::function; @@ -830,7 +1274,7 @@ sstable_set::create_single_key_sstable_reader( streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { assert(pr.is_singular() && pr.start()->value().has_key()); - return _impl->create_single_key_sstable_reader(cf, std::move(schema), + return _all->version().create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); } diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 8fbcf4c13a..ebe1ae1214 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -24,6 +24,7 @@ #include "flat_mutation_reader.hh" #include "sstables/progress_monitor.hh" #include "shared_sstable.hh" +#include "sstables/sstables.hh" #include "dht/i_partitioner.hh" #include #include @@ -34,31 +35,158 @@ class estimated_histogram; namespace sstables { -class sstable_set_impl; +// This is an implementation of a set of sstables. The sstable_set allows: +// - selecting sstables from a given partition_range +// - selecting sstables with the same run identifiers +// - creating incremental_selectors, used to incrementally select sstables from the set using ring-position +// - many utilities of the std::set (inserting, erasing, checking membership, checking emptiness, iterating) +// - creating copies + +// The main use of the sstable_set is in the table class. The sstable_set that is stored there needs to be copied +// every time it is modified, to allow existing sstable readers to continue using the old version. For this reason +// the sstable_set is implemented in a way that allows fast copying. + +// This is achieved by associating each copy of an sstable_set with an sstable_set_version which contains all changes +// made to that copy. Each sstable_set_version has a reference to the sstable_set_version associated with the sstable_set +// that was copied. We say that the copied version is a parent, and that the copy version is a child, or that it is "based" +// on the copied version. +// This allows easy checking if an sstable is an element of an sstable_set copy - the answer is yes if the sstable was added +// in this copy or if it wasn't erased in it but it was an element of the parent version. +// It's worth adding that this solution makes a copied sstable_set immutable. To support modifying a copied sstable_set anyway, +// any modification applied to it replaces its associated sstable_set_version with a new one, based on the immutable version. +// The new version hasn't been copied, co it can be modified. + +// With the ability to check whether an sstable is an element of an sstable_set, all the methods that select sstables +// from the set can follow the same rule: get results that include all sstables from any copy, and filter out those sstables +// that aren't elements of current copy. These results are received from the class called sstable_set_data structure. + +// This solution requires a special way of finding out when an sstable should be removed from sstable_set_data. We achieve this +// by counting, for each sstable, in how many different versions has it been added. If that number is zero - the sstable should +// be completely removed. This number is decreased when deleting versions, when erasing in the same version it was added, and +// in one other case, as a result of propagating a change from a versions children to their parent, explained further in following +// paragraphs. + +// With this approach, the length of the longest chain of sstable_set_versions that are based on one another should be as +// short as possible. To achieve that, we are merging pairs of versions. If a version is not referenced by any sstable_sets or +// sstable_lists, we know that it won't be modified or read from, so we can merge its changes into versions that are based on it. +// We don't want to copy these changes though, so we wait with merging until there is only one child version. + +// This waiting causes one more problem: an sstable may have been added in a version that has no reference from an sstable_set or +// an sstable_list, and it may have been erased in all its child versions. In such scenario, the sstable can't be selected from +// any version, so it should be removed from the set completely. To handle such situations, if a change has been made to an +// sstable in all children of some version that has no referenece from an sstable_set or sstable_list, the change is removed +// in all children and added to the parent instead. +// This solution guarantees that when a version is ready for merging, the version it is being merged into contains no changes, +// because if it had any, they would be propagated to this version instead. As a result, merging versions is simply reassigning +// sets of changes. The number of times an sstable change gets propagated into a parent version is limited by the number of ancestors +// of a version. + class incremental_selector_impl; +class sstable_set_impl; // Structure holds all sstables (a.k.a. fragments) that belong to same run identifier, which is an UUID. // SStables in that same run will not overlap with one another. class sstable_run { - sstable_list _all; + std::unordered_set _all; public: + sstable_run() = default; + sstable_run(std::unordered_set all) : _all(std::move(all)) {} void insert(shared_sstable sst); void erase(shared_sstable sst); // Data size of the whole run, meaning it's a sum of the data size of all its fragments. uint64_t data_size() const; - const sstable_list& all() const { return _all; } + const std::unordered_set& all() const { return _all; } }; +// The very base class of an sstable_set. Stores all sstables that were added in +// any version of the set. Its methods return supersets of values returned by +// any single version. +struct sstable_set_data { + std::unique_ptr impl; + std::unordered_map> all_runs; + // For each sstable, stores in how many different versions has it been inserted + std::map sstables_and_times_added; + + sstable_set_data(std::unique_ptr impl); + void insert(shared_sstable sst); + void erase(shared_sstable sst); + std::vector select(const dht::partition_range& range) const; + std::unordered_set select_by_run_id(utils::UUID run_id) const; + void remove(shared_sstable sst); +}; + +class sstable_set_version; + +// Manages a pointer to an sstable_set_version - the when sstable_set_version gets removed when all +// sstable_set_version_references are removed, or when there is only one sstable_set_version_reference +// and that reference is owned by another sstable_set_version. +// In the second case the data from the managed version is merged into the other version before removal. +class sstable_set_version_reference { + mutable sstable_set_version* _p = nullptr; + + explicit sstable_set_version_reference(sstable_set_version* p); +public: + ~sstable_set_version_reference(); + sstable_set_version_reference() = default; + sstable_set_version_reference(const sstable_set_version_reference& ref); + sstable_set_version_reference(sstable_set_version_reference&&) noexcept; + sstable_set_version_reference& operator=(const sstable_set_version_reference& x); + sstable_set_version_reference& operator=(sstable_set_version_reference&&) noexcept; + sstable_set_version& operator*() const noexcept { return *_p; } + sstable_set_version* operator->() const noexcept { return _p; } + sstable_set_version* get() const noexcept { return _p; } + explicit operator bool() const noexcept { return bool(_p); } + friend class sstable_set_version; +}; + +// The data storage for an sstable_set. Can be used like a std::set (although with slightly +// costlier operations). +class sstable_list { + sstable_set_version_reference _version; +public: + sstable_list(std::unique_ptr impl, schema_ptr s); + sstable_list(const sstable_list& sstl); + sstable_list(sstable_list&& sstl) noexcept; + sstable_list& operator=(const sstable_list& sstl); + sstable_list& operator=(sstable_list&& sstl) noexcept; +public: + class const_iterator { + public: + using iterator_category = std::forward_iterator_tag; + using value_type = const shared_sstable; + using difference_type = std::ptrdiff_t; + using pointer = const shared_sstable*; + using reference = const shared_sstable&; + private: + std::map::const_iterator _it; + const sstable_set_version_reference* _ver; + void advance(); + public: + const_iterator(std::map::const_iterator it, const sstable_set_version_reference* ver); + const_iterator& operator++(); + const_iterator operator++(int); + const shared_sstable& operator*() const; + bool operator==(const const_iterator& it) const; + }; + using iterator = const_iterator; + const_iterator begin() const; + const_iterator end() const; + + size_t size() const; + void insert(shared_sstable sst); + void erase(shared_sstable sst); + bool contains(shared_sstable sst) const; + bool empty() const; + const sstable_set_version& version() const; +}; + +// A set of sstables associated with a table. class sstable_set : public enable_lw_shared_from_this { - std::unique_ptr _impl; - schema_ptr _schema; // used to support column_family::get_sstable(), which wants to return an sstable_list // that has a reference somewhere lw_shared_ptr _all; - std::unordered_map _all_runs; public: - ~sstable_set(); - sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all); + sstable_set(std::unique_ptr impl, schema_ptr s); sstable_set(const sstable_set&); sstable_set(sstable_set&&) noexcept; sstable_set& operator=(const sstable_set&); @@ -66,7 +194,7 @@ public: std::vector select(const dht::partition_range& range) const; // Return all runs which contain any of the input sstables. std::vector select_sstable_runs(const std::vector& sstables) const; - lw_shared_ptr all() const { return _all; } + lw_shared_ptr all() const; void insert(shared_sstable sst); void erase(shared_sstable sst); @@ -79,9 +207,10 @@ public: mutable std::optional> _current_range_view; mutable std::vector _current_sstables; mutable dht::ring_position_view _current_next_position = dht::ring_position_view::min(); + lw_shared_ptr _sstl; public: ~incremental_selector(); - incremental_selector(std::unique_ptr impl, const schema& s); + incremental_selector(std::unique_ptr impl, const schema& s, lw_shared_ptr sstl); incremental_selector(incremental_selector&&) noexcept; struct selection { @@ -173,7 +302,59 @@ flat_mutation_reader make_restricted_range_sstable_reader( mutation_reader::forwarding, read_monitor_generator& rmg = default_read_monitor_generator()); -sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata = true); +class sstable_set_version { + // shared by all sstable_set_versions that were based on the same original set + lw_shared_ptr _base_set; + schema_ptr _schema; + sstable_set_version_reference _prev; + mutable std::unordered_set _next; + std::unordered_set _added; + std::unordered_set _erased; + // is equal to the number of sstable_set_versions based on this version increased by one if there is + // an sstable_list that references this version + unsigned _reference_count = 0; +public: + ~sstable_set_version(); + sstable_set_version(std::unique_ptr impl, schema_ptr schema); + explicit sstable_set_version(sstable_set_version* ver); +private: + void propagate_inserted_sstable(const shared_sstable& sst) noexcept; + void propagate_erased_sstable(const shared_sstable& sst) noexcept; + void propagate_changes_from_next_versions() noexcept; +public: + const sstable_set_version* get_previous_version() const; + bool can_merge_with_next() const noexcept; + void merge_with_next() noexcept; + bool can_delete() const noexcept; + void add_reference() noexcept; + void remove_reference() noexcept; + schema_ptr get_schema() const; + std::vector select(const dht::partition_range& range) const; + // Return all runs which contain any of the input sstables. + std::vector select_sstable_runs(const std::vector& sstables) const; + const std::map& all() const; + sstable_set_version_reference insert(shared_sstable sst); + sstable_set_version_reference erase(shared_sstable sst); + bool contains(shared_sstable sst) const; + size_t size() const; + std::unique_ptr make_incremental_selector() const; + flat_mutation_reader create_single_key_sstable_reader( + column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const; +public: + sstable_set_version_reference get_reference_to_this(); + sstable_set_version_reference get_reference_to_new_copy(); +}; + +sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 28e3cf77fd..785e46cc77 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -41,6 +41,7 @@ public: virtual std::vector select(const dht::partition_range& range) const = 0; virtual void insert(shared_sstable sst) = 0; virtual void erase(shared_sstable sst) = 0; + virtual bool empty() const = 0; virtual std::unique_ptr make_incremental_selector() const = 0; virtual flat_mutation_reader create_single_key_sstable_reader( @@ -65,6 +66,7 @@ public: virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; + virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; class incremental_selector; }; @@ -103,6 +105,7 @@ public: virtual std::vector select(const dht::partition_range& range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; + virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; class incremental_selector; }; @@ -124,6 +127,7 @@ public: virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; + virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; std::unique_ptr make_min_position_reader_queue( diff --git a/table.cc b/table.cc index 416881a65e..ac9a3df716 100644 --- a/table.cc +++ b/table.cc @@ -718,21 +718,17 @@ void table::rebuild_statistics() { future> table::build_new_sstable_list(const std::vector& new_sstables, const std::vector& old_sstables) { - auto current_sstables = _sstables; - auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema); - std::unordered_set s(old_sstables.begin(), old_sstables.end()); - - // this might seem dangerous, but "move" here just avoids constness, - // making the two ranges compatible when compiling with boost 1.55. - // Noone is actually moving anything... - for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables->all()))) { - if (!s.contains(tab)) { - new_sstable_list.insert(tab); - } + auto new_sstable_set = *_sstables; + for (auto& tab : new_sstables) { + new_sstable_set.insert(tab); co_await make_ready_future<>(); // yield if needed. } - co_return make_lw_shared(std::move(new_sstable_list)); + for (auto& tab : old_sstables) { + new_sstable_set.erase(tab); + co_await make_ready_future<>(); // yield if needed. + } + co_return make_lw_shared(std::move(new_sstable_set)); } // Note: must run in a seastar thread diff --git a/test/boost/sstable_set_test.cc b/test/boost/sstable_set_test.cc new file mode 100644 index 0000000000..887bb9bb94 --- /dev/null +++ b/test/boost/sstable_set_test.cc @@ -0,0 +1,638 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include +#include +#include +#include +#include "sstables/compaction.hh" +#include +#include +#include "test/boost/sstable_test.hh" +#include +#include +#include "sstables/compaction_strategy_impl.hh" +#include "sstables/date_tiered_compaction_strategy.hh" +#include "sstables/time_window_compaction_strategy.hh" +#include "sstables/leveled_compaction_strategy.hh" +#include "sstables/sstable_set.hh" +#include "sstables/sstable_set_impl.hh" + +using namespace sstables; + +static const sstring some_keyspace("ks"); +static const sstring some_column_family("cf"); + +static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) { + auto sst = env.make_sstable(schema, "", gen, la, big); + sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key)); + return sst; +} + +SEASTAR_TEST_CASE(simple_versioned_sstable_set_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto key_and_token_pair = token_generation_for_current_shard(8); + auto decorated_keys = boost::copy_range>( + key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair& key_and_token) { + auto value = bytes(reinterpret_cast(key_and_token.first.data()), key_and_token.first.size()); + auto pk = sstables::key::from_bytes(value).to_partition_key(*s); + return dht::decorate_key(*s, std::move(pk)); + })); + struct snapshot_and_selections { + std::optional ssts; + std::vector> selections; + std::unordered_map> runs; + std::unordered_set all; + snapshot_and_selections(sstable_set&& ssts, const std::vector>& selections, + const std::unordered_map>& runs, const std::unordered_set& all) + : ssts(std::move(ssts)), selections(selections), runs(runs), all(all) { } + }; + auto check = [&decorated_keys] (snapshot_and_selections& version) { + for (int j = 0; j < 8; j++) { + auto sel = version.ssts->select(dht::partition_range::make_singular(decorated_keys[j])); + BOOST_REQUIRE_EQUAL(sel.size(), version.selections[j].size()); + for (auto& sst : sel) { + BOOST_REQUIRE(version.selections[j].contains(sst)); + } + } + for (auto& [uuid, run] : version.runs) { + if (run.empty()) { + continue; + } + std::vector runs = version.ssts->select_sstable_runs({*run.begin()}); + // only one sstable -> only one run + BOOST_REQUIRE_EQUAL(runs[0].all().size(), run.size()); + for (auto& sst : runs[0].all()) { + BOOST_REQUIRE(run.contains(sst)); + } + } + BOOST_REQUIRE_EQUAL(version.ssts->all()->size(), version.all.size()); + for (auto& sst : *version.ssts->all()) { + BOOST_REQUIRE(version.all.contains(sst)); + } + }; + // check that selecting from older snapshots of an sstable_set gives correct results. + { + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + std::optional set = cs.make_sstable_set(s); + std::vector> current_selections(8); + std::unordered_map> current_runs; + std::unordered_set current_all; + + std::vector versions; + versions.reserve(20); + int token = 0; + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1); + set->insert(sst); + current_selections[token].insert(sst); + current_runs[sst->run_identifier()].insert(sst); + current_all.insert(sst); + ++token %= 8; + } + for (int j = 0; j < 5; j++) { + auto sst = *set->all()->begin(); + set->erase(sst); + for (auto& sel : current_selections) { + // actually erases only from one + sel.erase(sst); + } + current_runs[sst->run_identifier()].erase(sst); + current_all.erase(sst); + } + versions.emplace_back(std::move(*set), current_selections, current_runs, current_all); + set = versions.back().ssts; + } + + for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) { + check(versions[i]); + // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots + versions[i].ssts.reset(); + } + } + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_list_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto key_and_token_pair = token_generation_for_current_shard(8); + auto decorated_keys = boost::copy_range>( + key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair& key_and_token) { + auto value = bytes(reinterpret_cast(key_and_token.first.data()), key_and_token.first.size()); + auto pk = sstables::key::from_bytes(value).to_partition_key(*s); + return dht::decorate_key(*s, std::move(pk)); + })); + + + struct list_and_sstables { + std::optional list; + std::unordered_set sstables_in_list; + + list_and_sstables(sstable_list&& sstl, std::unordered_set sstables_in_list) + : list(std::move(sstl)), sstables_in_list(sstables_in_list) { } + }; + auto check = [] (list_and_sstables& version) { + BOOST_REQUIRE_EQUAL(version.list->size(), version.sstables_in_list.size()); + for (auto& sst : *version.list) { + BOOST_REQUIRE(version.list->contains(sst)); + } + }; + + { + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + std::optional set = cs.make_sstable_set(s); + std::optional l = *set->all(); + std::unordered_set sstables_in_list; + + + std::vector versions; + versions.reserve(20); + int token = 0; + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1); + l->insert(sst); + sstables_in_list.insert(sst); + ++token %= 8; + } + for (int j = 0; j < 5; j++) { + auto sst = *l->begin(); + l->erase(sst); + sstables_in_list.erase(sst); + } + versions.emplace_back(std::move(*l), sstables_in_list); + l = versions.back().list; + } + + for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) { + check(versions[i]); + // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots + versions[i].list.reset(); + } + } + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_set_version_merging_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(2); + std::optional set = cs.make_sstable_set(s); + std::optional list = *set->all(); + // set -> list + BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version()); + std::optional list2 = *set->all(); + // set -> list + // -> list2 + BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(&set->all()->version(), list2->version().get_previous_version()); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + set->insert(sst); + // set' -> list + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version()); + BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version()); + sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[1].first, key_and_token_pair[1].first, 1); + set->insert(sst); + // set' -> list + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version()); + BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version()); + std::optional list3 = list; + // set' -> list -> list3 + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(&list->version(), list3->version().get_previous_version()); + list.reset(); + // set' -> list3 + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list3->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version()); + set.reset(); + // set' -> list3 + // -> list2 + BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version()); + std::optional list4 = list3; + std::optional list5 = list3; + // set' -> list3 -> list4 + // -> list5 + // -> list2 + BOOST_REQUIRE_EQUAL(&list3->version(), list4->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(&list3->version(), list5->version().get_previous_version()); + list3.reset(); + // set' -> list3' -> list4 + // -> list5 + // -> list2 + BOOST_REQUIRE_NE(list2->version().get_previous_version(), list4->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(list4->version().get_previous_version(), list5->version().get_previous_version()); + list4.reset(); + // set' -> list5 + // -> list2 + BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list5->version().get_previous_version()); + list2.reset(); + // list5 + BOOST_REQUIRE_EQUAL(nullptr, list5->version().get_previous_version()); + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_erased_in_last_descendant_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(1); + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + // set -> set2 + // -> set3 + set.reset(); + set2->erase(sst); + set3->erase(sst); + sst = nullptr; + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set; + // set -> set2 + // -> set3 + // -> set4 + set.reset(); + set2->erase(sst); + set3.reset(); + set4->erase(sst); + sst = nullptr; + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set3; + std::optional set5 = set3; + // set -> set2 + // -> set3 -> set4 + // -> set5 + set.reset(); + set2->erase(sst); + set3.reset(); + set4->erase(sst); + set5->erase(sst); + sst = nullptr; + BOOST_REQUIRE(is_sstable_removed); + } + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_ancestor_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(1); + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + // set -> set2 + set2->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + // set -> set2 + // -> set3 + set2->erase(sst); + set3->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set; + // set -> set2 + // -> set3 + // -> set4 + set2->erase(sst); + set3.reset(); + set4->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set3; + std::optional set5 = set3; + // set -> set2 + // -> set3 -> set4 + // -> set5 + set.reset(); + set2->erase(sst); + set4->erase(sst); + set5->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set3.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + return make_ready_future<>(); + }); +} + + +SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_descendant_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(1); + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + // set -> set2 + // -> set3 + set.reset(); + set2->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set3.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set; + // set -> set2 + // -> set3 + // -> set4 + set.reset(); + set2->erase(sst); + set3->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set4.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set3; + std::optional set5 = set3; + // set -> set2 + // -> set3 -> set4 + // -> set5 + set.reset(); + set2->erase(sst); + set3.reset(); + set4->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set5.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + return make_ready_future<>(); + }); +} + +class simple_sstable_set { + std::unique_ptr _impl; + schema_ptr _schema; + // used to support column_family::get_sstable(), which wants to return an sstable_list + // that has a reference somewhere + lw_shared_ptr> _all; + std::unordered_map _all_runs; +public: + ~simple_sstable_set() = default; + + simple_sstable_set(std::unique_ptr impl, schema_ptr schema) + : _impl(std::move(impl)) + , _schema(std::move(schema)) + , _all(make_lw_shared>()) { + } + + simple_sstable_set(const simple_sstable_set& x) + : _impl(x._impl->clone()) + , _schema(x._schema) + , _all(make_lw_shared>(*x._all)) + , _all_runs(x._all_runs) { + } + + simple_sstable_set(simple_sstable_set&& x) noexcept = default; + + simple_sstable_set& operator=(const simple_sstable_set& x) { + if (this != &x) { + auto tmp = simple_sstable_set(x); + *this = std::move(tmp); + } + return *this; + } + + simple_sstable_set& operator=(simple_sstable_set&&) noexcept = default; + + std::vector select(const dht::partition_range& range) const { + return _impl->select(range); + } + + // Return all runs which contain any of the input sstables. + std::vector select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return _all_runs.at(run_id); + })); + } + + lw_shared_ptr> all() const { return _all; } + + void insert(shared_sstable sst) { + _impl->insert(sst); + _all->insert(sst); + _all_runs[sst->run_identifier()].insert(sst); + } + + void erase(shared_sstable sst) { + _impl->erase(sst); + _all->erase(sst); + _all_runs[sst->run_identifier()].erase(sst); + } +}; + +SEASTAR_TEST_CASE(sstable_set_random_walk_test) { + return test_env::do_with([] (test_env& env) { + auto rand = std::default_random_engine(); + auto op_gen = std::uniform_int_distribution(0, 7); + auto nr_dist = std::geometric_distribution(0.7); + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto leveled_cs = leveled_compaction_strategy(s->compaction_strategy_options()); + std::vector sstable_sets; + std::vector simple_sstable_sets; + sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s); + simple_sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s); + auto key_and_token_pair = token_generation_for_current_shard(1000); + std::vector sstables(1000); + for (int i = 0; i < 1000; i++) { + sstables[i] = sstable_for_overlapping_test(env, s, i, key_and_token_pair[i].first, key_and_token_pair[i].first, i); + } + for (auto i = 0; i != 100000; i++) { + auto op = op_gen(rand); + auto u = std::uniform_int_distribution(0, sstable_sets.size() - 1); + auto idx = u(rand); + switch (op) { + case 0: { + // delete + if (sstable_sets.size() > 1) { + sstable_sets.erase(sstable_sets.begin() + idx); + simple_sstable_sets.erase(simple_sstable_sets.begin() + idx); + break; + } + // if we can't remove the version, let's create one + [[fallthrough]]; + } + case 1: { + // copy + if (sstable_sets.size() < 100) { + sstable_sets.emplace_back(sstable_sets[idx]); + simple_sstable_sets.emplace_back(simple_sstable_sets[idx]); + for (auto& sst : *simple_sstable_sets.back().all()) { + BOOST_REQUIRE(sstable_sets.back().all()->contains(sst)); + } + } + break; + } + default: + // modify + auto sst_u = std::uniform_int_distribution(0, 999); + auto sst_idx = sst_u(rand); + if (simple_sstable_sets[idx].all()->contains(sstables[sst_idx])) { + sstable_sets[idx].erase(sstables[sst_idx]); + simple_sstable_sets[idx].erase(sstables[sst_idx]); + BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx])); + BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx])); + } else { + sstable_sets[idx].insert(sstables[sst_idx]); + simple_sstable_sets[idx].insert(sstables[sst_idx]); + BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx])); + BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx])); + } + } + for (int j = 0; j < sstable_sets.size(); j++) { + BOOST_REQUIRE_EQUAL(sstable_sets[j].all()->size(), simple_sstable_sets[j].all()->size()); + } + } + return make_ready_future<>(); + }); +} diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index 30790ab29c..efc0eba8df 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -37,40 +37,6 @@ #include #include -constexpr auto la = sstables::sstable::version_types::la; -constexpr auto big = sstables::sstable::format_types::big; - -class column_family_test { - lw_shared_ptr _cf; -public: - column_family_test(lw_shared_ptr cf) : _cf(cf) {} - - void add_sstable(sstables::shared_sstable sstable) { - _cf->_sstables->insert(std::move(sstable)); - } - - // NOTE: must run in a thread - void rebuild_sstable_list(const std::vector& new_sstables, - const std::vector& sstables_to_remove) { - _cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0(); - } - - static void update_sstables_known_generation(column_family& cf, unsigned generation) { - cf.update_sstables_known_generation(generation); - } - - static uint64_t calculate_generation_for_new_table(column_family& cf) { - return cf.calculate_generation_for_new_table(); - } - - static int64_t calculate_shard_from_sstable_generation(int64_t generation) { - return column_family::calculate_shard_from_sstable_generation(generation); - } - - future try_flush_memtable_to_sstable(lw_shared_ptr mt) { - return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); - } -}; namespace sstables { diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 347ddb543a..e2a3a1a26a 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -31,6 +31,41 @@ #include "test/lib/test_services.hh" #include "test/lib/log.hh" +constexpr auto la = sstables::sstable::version_types::la; +constexpr auto big = sstables::sstable::format_types::big; + +class column_family_test { + lw_shared_ptr _cf; +public: + column_family_test(lw_shared_ptr cf) : _cf(cf) {} + + void add_sstable(sstables::shared_sstable sstable) { + _cf->_sstables->insert(std::move(sstable)); + } + + // NOTE: must run in a thread + void rebuild_sstable_list(const std::vector& new_sstables, + const std::vector& sstables_to_remove) { + _cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0(); + } + + static void update_sstables_known_generation(column_family& cf, unsigned generation) { + cf.update_sstables_known_generation(generation); + } + + static uint64_t calculate_generation_for_new_table(column_family& cf) { + return cf.calculate_generation_for_new_table(); + } + + static int64_t calculate_shard_from_sstable_generation(int64_t generation) { + return column_family::calculate_shard_from_sstable_generation(generation); + } + + future try_flush_memtable_to_sstable(lw_shared_ptr mt) { + return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); + } +}; + namespace sstables { class test_env_sstables_manager : public sstables_manager { diff --git a/test/perf/perf_sstable_set.cc b/test/perf/perf_sstable_set.cc new file mode 100644 index 0000000000..02cc128a9e --- /dev/null +++ b/test/perf/perf_sstable_set.cc @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "database.hh" +#include "test/lib/simple_schema.hh" +#include "test/perf/perf.hh" +#include +#include +#include "test/lib/sstable_test_env.hh" +#include + +static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) { + auto sst = env.make_sstable(schema, "", gen, la, big); + sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key)); + return sst; +} + +int main(int argc, char* argv[]) { + app_template app; + return app.run(argc, argv, [&app] { + return test_env::do_with_async([] (test_env& env) { + using namespace std::chrono; + using namespace std::chrono_literals; + auto start = high_resolution_clock::now(); + simple_schema ss; + auto s = ss.schema(); + auto cm = make_lw_shared(); + column_family::config cfg; + auto cl_stats = make_lw_shared(); + auto tracker = make_lw_shared(); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker); + cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled); + + constexpr int ssts_in_level_0 = 10; + std::function idx_to_level = [&] (int i) { + if (i < ssts_in_level_0) { + return 0; + } + return 1 + idx_to_level((i - ssts_in_level_0) / 10 + ssts_in_level_0 - 1); + }; + auto level_to_size = [] (int level) { + if (level == 0) { + return ssts_in_level_0; + } + return int(pow(10, level)); + }; + + auto kt_pair = token_generation_for_current_shard(1120); + auto min_max_keys = [&kt_pair, &level_to_size] (auto level, auto pos_in_level) -> std::pair { + auto last_key_idx = kt_pair.size() - 1; + if (level == 0) { + return { kt_pair[0].first, kt_pair[last_key_idx].first }; + } + auto total_ranges = kt_pair.size(); + auto level_size_in_ssts = level_to_size(level); + unsigned ranges_per_sst = std::max(1U, unsigned(floor(float(total_ranges) / level_size_in_ssts))); + sstring min_key = kt_pair.at(pos_in_level).first; + sstring max_key = kt_pair.at(std::min(pos_in_level + ranges_per_sst - 1, unsigned(last_key_idx))).first; + return {min_key, max_key}; + }; + + std::vector inputs[3], outputs[3]; + + std::array pos_in_levels{0}; + pos_in_levels.fill(0); + auto start2 = high_resolution_clock::now(); + for (auto i = 0; i < 1120; i++) { + auto level = idx_to_level(i); + auto [min, max] = min_max_keys(level, pos_in_levels[level]++); + auto sst = sstable_for_overlapping_test(env, s, i, min, max, uint32_t(level)); + column_family_test(cf).add_sstable(sst); + if (level >= 1 && pos_in_levels[level] < 10) { + inputs[level-1].push_back(sst); + } + seastar::thread::maybe_yield(); + } + + for (auto i = 0; i < 30; i++) { + auto [min, max] = min_max_keys(1 + i / 10, i % 10); + auto sst = sstable_for_overlapping_test(env, s, i, min, max, 1 + i / 10); + outputs[i / 10].push_back(sst); + seastar::thread::maybe_yield(); + } + for (auto i = 0; i < 3; i++) { + auto t1 = high_resolution_clock::now(); + column_family_test(cf).rebuild_sstable_list(outputs[i], inputs[i]); + auto t2 = high_resolution_clock::now(); + std::cout << "Replacing 10 L" << i + 1 <<" sstables took " + << std::chrono::duration_cast(t2 - t1).count() << "ms to complete\n"; + } + }); + }); +}