From e1b494633b0f5a8575d5d0a2e9a07fd00894359c Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Fri, 25 Sep 2020 16:03:25 +0200 Subject: [PATCH 1/6] sstables: make sstable_set constructor less error-prone Adding an non-empty set of sstables as the set of all sstables in an sstable_set could cause inconsistencies with the values returned by select_sstable_runs because the _all_runs map would still be initialized empty. For similar reasons, the provided sstable_set_impl should also be empty. Dispel doubts by removing the unordered_set from the constructor, and adding a check of emptiness of the sstable_set_impl. Signed-off-by: Wojciech Mitros --- db/view/view_update_generator.cc | 2 +- service/storage_service.cc | 3 +-- sstables/sstable_set.cc | 26 ++++++++++++++++++++------ sstables/sstable_set.hh | 4 ++-- sstables/sstable_set_impl.hh | 4 ++++ 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 703d8ce454..150b593965 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -67,7 +67,7 @@ future<> view_update_generator::start() { // Exploit the fact that sstables in the staging directory // are usually non-overlapping and use a partitioned set for // the read. - auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, make_lw_shared(sstable_list{}), false)); + auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, false)); for (auto& sst : sstables) { ssts->insert(sst); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 115a8d4965..317dc48b31 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2872,8 +2872,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, size_t nr_sst_current = 0; while (!sstables.empty()) { auto ops_uuid = utils::make_random_uuid(); - auto sst_set = make_lw_shared(sstables::make_partitioned_sstable_set(s, - make_lw_shared(sstable_list{}), false)); + auto sst_set = make_lw_shared(sstables::make_partitioned_sstable_set(s, false)); size_t batch_sst_nr = 16; std::vector sst_names; std::vector sst_processed; diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index e26525c12e..0e8d9fd1cf 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -63,10 +63,13 @@ std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { return os; } -sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) +sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s) : _impl(std::move(impl)) , _schema(std::move(s)) - , _all(std::move(all)) { + , _all(make_lw_shared(sstable_list())) { + if (!_impl->empty()) { + throw std::logic_error("Can't create an sstable_set using a non-empty sstable_set_impl"); + } } sstable_set::sstable_set(const sstable_set& x) @@ -171,6 +174,10 @@ void bag_sstable_set::erase(shared_sstable sst) { } } +bool bag_sstable_set::empty() const { + return _sstables.empty(); +} + class bag_sstable_set::incremental_selector : public incremental_selector_impl { const std::vector& _sstables; public: @@ -299,6 +306,10 @@ void partitioned_sstable_set::erase(shared_sstable sst) { } } +bool partitioned_sstable_set::empty() const { + return _unleveled_sstables.empty() && _leveled_sstables.empty(); +} + class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { schema_ptr _schema; const std::vector& _unleveled_sstables; @@ -396,6 +407,10 @@ void time_series_sstable_set::erase(shared_sstable sst) { } } +bool time_series_sstable_set::empty() const { + return _sstables->empty(); +} + std::unique_ptr time_series_sstable_set::make_incremental_selector() const { struct selector : public incremental_selector_impl { const time_series_sstable_set& _set; @@ -535,16 +550,15 @@ std::unique_ptr time_window_compaction_strategy::make_sstable_ return std::make_unique(std::move(schema)); } -sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { - return sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); +sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) { + return sstable_set(std::make_unique(schema, use_level_metadata), schema); } sstable_set compaction_strategy::make_sstable_set(schema_ptr schema) const { return sstable_set( _compaction_strategy_impl->make_sstable_set(schema), - schema, - make_lw_shared()); + schema); } using sstable_reader_factory_type = std::function; diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 8fbcf4c13a..a8d7975b00 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -58,7 +58,7 @@ class sstable_set : public enable_lw_shared_from_this { std::unordered_map _all_runs; public: ~sstable_set(); - sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all); + sstable_set(std::unique_ptr impl, schema_ptr s); sstable_set(const sstable_set&); sstable_set(sstable_set&&) noexcept; sstable_set& operator=(const sstable_set&); @@ -173,7 +173,7 @@ flat_mutation_reader make_restricted_range_sstable_reader( mutation_reader::forwarding, read_monitor_generator& rmg = default_read_monitor_generator()); -sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata = true); +sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 28e3cf77fd..785e46cc77 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -41,6 +41,7 @@ public: virtual std::vector select(const dht::partition_range& range) const = 0; virtual void insert(shared_sstable sst) = 0; virtual void erase(shared_sstable sst) = 0; + virtual bool empty() const = 0; virtual std::unique_ptr make_incremental_selector() const = 0; virtual flat_mutation_reader create_single_key_sstable_reader( @@ -65,6 +66,7 @@ public: virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; + virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; class incremental_selector; }; @@ -103,6 +105,7 @@ public: virtual std::vector select(const dht::partition_range& range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; + virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; class incremental_selector; }; @@ -124,6 +127,7 @@ public: virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; + virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; std::unique_ptr make_min_position_reader_queue( From 48153a1e2c2052247b15676a7097cde141bfb4e9 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Mon, 28 Sep 2020 14:10:29 +0200 Subject: [PATCH 2/6] sstables: remove potential ub If the range expression in a range based for loop returns a temporary, its lifetime is extended until the end of the loop. The same can't be said about temporaries created within the range expression. In our case, *t->get_sstables_including_compacted_undeleted() returns a reference to a const sstable_list, but the t->get_sstables_including_compacted_undeleted() is a temporary lw_shared_ptr, so its lifetime may not be prolonged until the end of the loop, and it may be the sole owner of the referenced sstable_list, so the referenced sstable_list may be already deleted inside the loop too. Fix by creating a local copy of the lw_shared_ptr, and get reference from it in the loop. Fixes #7605 Signed-off-by: Wojciech Mitros --- api/storage_service.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index cb9232f9d4..5b049f6efe 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -986,8 +986,8 @@ void set_storage_service(http_context& ctx, routes& r) { ss::table_sstables tst; tst.keyspace = schema->ks_name(); tst.table = schema->cf_name(); - - for (auto sstable : *t->get_sstables_including_compacted_undeleted()) { + auto sstables = t->get_sstables_including_compacted_undeleted(); + for (auto sstable : *sstables) { auto ts = db_clock::to_time_t(sstable->data_file_write_time()); ::tm t; ::gmtime_r(&ts, &t); From aa0cd940d67e0f70cbbd2704c54fac42f32eb1b1 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Wed, 4 Nov 2020 14:34:43 +0100 Subject: [PATCH 3/6] sstables: replace the sstable_set with a versioned structure Currently, the sstable_set in a table is copied before every change to allow accessing the unchanged version by existing sstable readers. This patch changes the sstable_set to a structure that allows copying without actually copying all the sstables in the set, while providing the same methods(and some extra) without majorly decreasing their speed. This is achieved by associating all copies with sstable_set versions which hold the changes that were performed in them, and references to the versions that were copied, a.k.a. their parents. The set represented by a version is the result of combining all changes of its ancestors. This causes most methods of the version to have a time complexity dependent on the number of its ancestors. To limit this number, versions that represent copies that have already been deleted are merged with its descendants. The strategy used for deciding when and with which of its children should a version be merged heavily depends on the use case of sstable_sets: there is a main copy of the set in a table class which undergoes many insertions and deletions, and there are copies of it in compaction or mutation readers which are further copied or edited few or zero times. It's worth to mention, that when a copy is made, the copied set should not be modified anymore, because it would also modify the results given by the copy. In order to still allow modifying the copied set, if a change is to be performed on it, the version assiociated with this set is replaced with a new version depending on the previous one. As we can see, in our use case there is a main chain of versions(with changes from the table), and smaller branches of versions that start from a version from this chain, but are deleted soon after. In such case we can merge a version when it has exactly one descendant, as this limits the number of concurrent ancestors of a version to the number of copies of its ancestors are concurrently used. During each such merge, the parent version is removed and the child version is modified so that all operations on it give the same results. In order to preserve the same interface, the sstable_set still contains a lw_shared_ptr, but sstable_list (previously an alias for unordered_set) is now a new structure. Each sstable_set contains a sstable_list but not every sstable_list has to be contained by a sstable_set, and we also want to allow fast copying of sstable_lists, so the reference to the sstable_set_version is kept by the sstable_lists and the sstable_set can access the sstable_set_version it's associated with through its sstable_list. Accessing sstables that are elements of a certain sstable_set copy(so the select, select_sstable_runs and sstable_list's iterator) get results from containers that hold all sstables from all versions(which are stored in a single, shared "versioned_sstable_set_data" structure), and then filter out these sstables that aren't present in the version in question. This version of the sstable_set allows adding and erasing the same sstable repeatedly. Inserting and erasing from the set modifies the containers in a version only when it has an actual effect: if an sstable has been added in the parent version, and hasn't been erased in the child version, adding it again will have no effect. This ensures that when merging versions, the versions have disjoint sets of added, and erased sstables (an sstable can still be added in one and erased in the second). It's worth noting hat if an sstable has been added in one of the merged sets and erased in the second, the version that remains after merging doesn't need to have any info about the sstable's inclusion in the set - it can be inferred from the changes in previous versions (and it doesn't matter if the sstable has been erased before or after being added). To release pointers to sstables as soon as possible (i.e. when all references to versions that contain them die), if an sstable is added/erased in all child versions that are based on a version which has no external references, this change gets removed from these versions and added to the parent version. If an sstable's insertion gets overwritten as a result, we might be able to remove the sstable completely from the set. We know how many times this needs to happen by counting, for each sstable, in how many different verisions has it been added. When a change that adds an sstable gets merged with a change that removes it, or when a such a change simply gets deleted alongside its associated version, this count is reduced, and when an sstable gets added to a version that doesn't already contain it, this count is increased. The methods that modify the sets contents give strong exception guarantee by trying to insert new sstables to its containers, and erasing them in the case of an caught exception. Fixes #2622 Signed-off-by: Wojciech Mitros --- configure.py | 1 + sstables/shared_sstable.hh | 1 - sstables/sstable_set.cc | 568 +++++++++++++++++++++++++---- sstables/sstable_set.hh | 199 +++++++++- test/boost/sstable_set_test.cc | 638 +++++++++++++++++++++++++++++++++ 5 files changed, 1328 insertions(+), 79 deletions(-) create mode 100644 test/boost/sstable_set_test.cc diff --git a/configure.py b/configure.py index 029b837b1c..e9b8fd73be 100755 --- a/configure.py +++ b/configure.py @@ -389,6 +389,7 @@ scylla_tests = set([ 'test/boost/sstable_conforms_to_mutation_source_test', 'test/boost/sstable_resharding_test', 'test/boost/sstable_directory_test', + 'test/boost/sstable_set_test', 'test/boost/sstable_test', 'test/boost/sstable_move_test', 'test/boost/storage_proxy_test', diff --git a/sstables/shared_sstable.hh b/sstables/shared_sstable.hh index 736fa06244..7071db9f62 100644 --- a/sstables/shared_sstable.hh +++ b/sstables/shared_sstable.hh @@ -46,7 +46,6 @@ struct lw_shared_ptr_deleter { namespace sstables { using shared_sstable = seastar::lw_shared_ptr; -using sstable_list = std::unordered_set; } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 0e8d9fd1cf..6d305b64be 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -63,96 +63,526 @@ std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { return os; } -sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s) - : _impl(std::move(impl)) - , _schema(std::move(s)) - , _all(make_lw_shared(sstable_list())) { - if (!_impl->empty()) { - throw std::logic_error("Can't create an sstable_set using a non-empty sstable_set_impl"); +sstable_set_data::sstable_set_data(std::unique_ptr impl) + : impl(std::move(impl)) { +} + +void sstable_set_data::insert(shared_sstable sst) { + auto it = sstables_and_times_added.find(sst); + if (it != sstables_and_times_added.end()) { + // the sstable has already been added in some version + it->second++; + return; } -} - -sstable_set::sstable_set(const sstable_set& x) - : _impl(x._impl->clone()) - , _schema(x._schema) - , _all(make_lw_shared(*x._all)) - , _all_runs(x._all_runs) { -} - -sstable_set::sstable_set(sstable_set&&) noexcept = default; - -sstable_set& -sstable_set::operator=(const sstable_set& x) { - if (this != &x) { - auto tmp = sstable_set(x); - *this = std::move(tmp); - } - return *this; -} - -sstable_set& -sstable_set::operator=(sstable_set&&) noexcept = default; - -std::vector -sstable_set::select(const dht::partition_range& range) const { - return _impl->select(range); -} - -std::vector -sstable_set::select_sstable_runs(const std::vector& sstables) const { - auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); - return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { - return _all_runs.at(run_id); - })); -} - -void -sstable_set::insert(shared_sstable sst) { - _impl->insert(sst); try { - _all->insert(sst); - try { - _all_runs[sst->run_identifier()].insert(sst); - } catch (...) { - _all->erase(sst); - throw; - } + impl->insert(sst); + all_runs[sst->run_identifier()].insert(sst); + sstables_and_times_added.emplace(sst, 1); } catch (...) { - _impl->erase(sst); + impl->erase(sst); + auto runs_it = all_runs.find(sst->run_identifier()); + if (runs_it != all_runs.end()) { + runs_it->second.erase(sst); + if (runs_it->second.empty()) { + all_runs.erase(runs_it); + } + } throw; } } -void -sstable_set::erase(shared_sstable sst) { - _impl->erase(sst); - _all->erase(sst); - _all_runs[sst->run_identifier()].erase(sst); +std::vector sstable_set_data::select(const dht::partition_range& range) const { + return impl->select(range); } -sstable_set::~sstable_set() = default; +std::unordered_set sstable_set_data::select_by_run_id(utils::UUID run_id) const { + return all_runs.at(run_id); +} -sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) - : _impl(std::move(impl)) - , _cmp(s) { +// Called when a version that was adding an sstable was removed or when the sstable was later erased in that version. +void sstable_set_data::remove(shared_sstable sst) { + if (--sstables_and_times_added.at(sst) == 0) { + impl->erase(sst); + all_runs[sst->run_identifier()].erase(sst); + sstables_and_times_added.erase(sst); + } +} + +sstable_set_version_reference::~sstable_set_version_reference() { + if (_p) { + _p->remove_reference(); + if (_p->can_merge_with_next()) { + // merging will destroy the last reference to the version and the version will be deleted as a result + _p->merge_with_next(); + } else if (_p->can_delete()) { + delete _p; + _p = nullptr; + } + } +} + +sstable_set_version_reference::sstable_set_version_reference(sstable_set_version* p) : _p(p) { + if (_p) { + _p->add_reference(); + } +} + +sstable_set_version_reference::sstable_set_version_reference(const sstable_set_version_reference& ref) : _p(ref._p) { + if (_p) { + _p->add_reference(); + } +} + +sstable_set_version_reference::sstable_set_version_reference(sstable_set_version_reference&& ref) noexcept : _p(ref._p) { + ref._p = nullptr; +} + +sstable_set_version_reference& sstable_set_version_reference::operator=(const sstable_set_version_reference& ref) { + *this = sstable_set_version_reference(ref); + return *this; +} + +sstable_set_version_reference& sstable_set_version_reference::operator=(sstable_set_version_reference&& ref) noexcept { + if (this != &ref) { + // Destroying this reference may invalide other references, so we're taking over the pointer managed by + // the moved reference, and reassigning it after calling the destructor + auto ptr = ref._p; + ref._p = nullptr; + this->~sstable_set_version_reference(); + _p = ptr; + } + return *this; +} + +static sstable_set_version_reference make_sstable_set_version(std::unique_ptr impl, schema_ptr s) { + sstable_set_version* new_version = new sstable_set_version(std::move(impl), std::move(s)); + return new_version->get_reference_to_this(); +} + +sstable_list::sstable_list(std::unique_ptr impl, schema_ptr s) + : _version(make_sstable_set_version(std::move(impl), std::move(s))) { +} + +sstable_list::sstable_list(const sstable_list& sstl) + : _version(sstl._version->get_reference_to_new_copy()) { + // copying an sstable_list creates a new sstable_set_version +} + +sstable_list::sstable_list(sstable_list&& sstl) noexcept = default; + +sstable_list& sstable_list::operator=(const sstable_list& sstl) { + if (this != &sstl) { + *this = sstable_list(sstl); + } + return *this; +} + +sstable_list& sstable_list::operator=(sstable_list&& sstl) noexcept { + if (this != &sstl) { + this->~sstable_list(); + _version = std::move(sstl._version); + } + return *this; +} + +// Moves the iterator to the next sstable which is contained by the associated sstable_set, or to the end +// If the iterator already references a satisfying sstable, no changes are made. +void sstable_list::const_iterator::advance() { + while (_it != (*_ver)->all().end() && !(*_ver)->contains(_it->first)) { + _it++; + } +} + +sstable_list::const_iterator::const_iterator(std::map::const_iterator it, const sstable_set_version_reference* ver) + : _it(std::move(it)) + , _ver(ver) { + advance(); +} + +sstable_list::const_iterator& sstable_list::const_iterator::operator++() { + assert(_it != (*_ver)->all().end()); + _it++; + advance(); + return *this; +} + +sstable_list::const_iterator sstable_list::const_iterator::operator++(int) { + const_iterator it = *this; + operator++(); + return it; +} + +const shared_sstable& sstable_list::const_iterator::operator*() const { + return _it->first; +} + +bool sstable_list::const_iterator::operator==(const const_iterator& it) const { + assert(_ver == it._ver); + return _it == it._it; +} + +sstable_list::const_iterator sstable_list::begin() const { + return const_iterator(_version->all().begin(), &_version); +} + +sstable_list::const_iterator sstable_list::end() const { + return const_iterator(_version->all().end(), &_version); +} + +size_t sstable_list::size() const { + return _version->size(); +} + +void sstable_list::insert(shared_sstable sst) { + _version = _version->insert(sst); +} + +void sstable_list::erase(shared_sstable sst) { + _version = _version->erase(sst); +} + +bool sstable_list::contains(shared_sstable sst) const { + return _version->contains(sst); +} + +bool sstable_list::empty() const { + return _version->size() == 0; +} + +const sstable_set_version& sstable_list::version() const { + return *_version; +} + +sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s) { + if (!impl->empty()) { + throw std::logic_error("Can't create an sstable_set using a non-empty sstable_set_impl"); + } + _all = make_lw_shared(std::move(impl), std::move(s)); +} + +sstable_set::sstable_set(const sstable_set& x) + : _all(make_lw_shared(*x._all)) { +} + +sstable_set::sstable_set(sstable_set&& x) noexcept = default; + +sstable_set& sstable_set::operator=(const sstable_set& ssts) { + *this = sstable_set(ssts); + return *this; +} + +sstable_set& sstable_set::operator=(sstable_set&& ssts) noexcept = default; + + +std::vector sstable_set::select(const dht::partition_range& range) const { + return _all->version().select(range); +} + +// Return all runs which contain any of the input sstables. +std::vector sstable_set::select_sstable_runs(const std::vector& sstables) const { + return _all->version().select_sstable_runs(sstables); +} + +lw_shared_ptr sstable_set::all() const { + return _all; +} + +void sstable_set::insert(shared_sstable sst) { + _all->insert(sst); +} + +void sstable_set::erase(shared_sstable sst) { + _all->erase(sst); } sstable_set::incremental_selector::~incremental_selector() = default; sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; -sstable_set::incremental_selector::selection -sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s, lw_shared_ptr sstl) + : _impl(std::move(impl)) + , _cmp(s) + , _sstl(std::move(sstl)) { +} + +sstable_set::incremental_selector::selection sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { - std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); + std::vector current_versioned_sstables; + std::tie(_current_range, current_versioned_sstables, _current_next_position) = _impl->select(pos); + _current_sstables = boost::copy_range>(current_versioned_sstables + | boost::adaptors::filtered([this] (shared_sstable sst) { return _sstl->contains(sst); }) + | boost::adaptors::transformed([] (shared_sstable sst) { return sst; })); _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); } return {_current_sstables, _current_next_position}; } -sstable_set::incremental_selector -sstable_set::make_incremental_selector() const { - return incremental_selector(_impl->make_incremental_selector(), *_schema); +sstables::sstable_set::incremental_selector sstable_set::make_incremental_selector() const { + return incremental_selector(_all->version().make_incremental_selector(), *_all->version().get_schema(), _all); +} + +sstable_set_version::~sstable_set_version() { + for (auto& added_sst : _added) { + _base_set->remove(added_sst); + } + if (_prev) { + _prev->_next.erase(this); + } +} + +// the sstable_set_impl must be empty +sstable_set_version::sstable_set_version(std::unique_ptr impl, schema_ptr schema) + : _base_set(make_lw_shared(std::move(impl))) + , _schema(std::move(schema)) { +} + +// Creates a new version based on ver +sstable_set_version::sstable_set_version(sstable_set_version* ver) + : _base_set(ver->_base_set) + , _schema(ver->_schema) + , _prev(ver) { + _prev->_next.insert(this); +} + +// Merges changes made in this version into the next version (can be called only when there is a single next version, +// and no further changes can be made to this one, i.e. no sstable_list references this version). +void sstable_set_version::merge_with_next() noexcept { + auto next_version = *_next.begin(); + next_version->_added = std::move(_added); + next_version->_erased = std::move(_erased); + auto next_nh = _next.extract(*_next.begin()); + if (_prev) { + _prev->_next.erase(this); + _prev->_next.insert(std::move(next_nh)); + } + next_version->_prev = std::move(_prev); // destroys this by overwriting the last reference +} + +void sstable_set_version::propagate_inserted_sstable(const shared_sstable& sst) noexcept { + if (_reference_count > _next.size()) { + // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it + return; + } + for (auto& ver_chck : _next) { + if (!ver_chck->_added.contains(sst)) { + return; + } + } + // Remove the sstable from child versions and get a node handle to insert in this version + auto sst_nh = (*_next.begin())->_added.extract(sst); + for (auto& ver_chck : _next) { + auto nh = ver_chck->_added.extract(sst_nh.value()); + if (!nh.empty()) { + _base_set->remove(nh.value()); + } + } + auto it = _erased.find(sst_nh.value()); + if (it != _erased.end()) { + // If the sstable was erased in this version and added in all its children, its as if it weren't added or inserted in any of them + // because we won't read from this version anymore. + _erased.erase(it); + _base_set->remove(sst_nh.value()); + } else { + auto added_it = _added.insert(std::move(sst_nh)).position; + if (_prev) { + _prev->propagate_inserted_sstable(*added_it); + } + } +} + +void sstable_set_version::propagate_erased_sstable(const shared_sstable& sst) noexcept { + if (_reference_count > _next.size()) { + // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it + return; + } + for (auto& ver_chck : _next) { + if (!ver_chck->_erased.contains(sst)) { + return; + } + } + // Remove the sstable from child versions and get a node handle to insert in this version + auto sst_nh = (*_next.begin())->_erased.extract(sst); + for (auto& ver_chck : _next) { + ver_chck->_erased.extract(sst_nh.value()); + } + auto it = _added.find(sst_nh.value()); + if (it != _added.end()) { + // If the sstable was added in this version and erased in all its children, its as if it weren't added or inserted in any of them + // because we won't read from this version anymore. + _added.erase(it); + _base_set->remove(sst_nh.value()); + } else { + auto erased_it = _erased.insert(std::move(sst_nh)).position; + if (_prev) { + _prev->propagate_erased_sstable(*erased_it); + } + } +} + +// Called when a reference to the version gets removed - if the reference was from an sstable_list, it's the first time we can propagate any +// changes, and if the reference was from another sstable_set_version, we want to check if there were any changes that were present in all +// versions based on this one, but absent in the version that was just removed. +void sstable_set_version::propagate_changes_from_next_versions() noexcept { + if (_reference_count > _next.size() || _next.empty()) { + // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it + // Or there are no child versions so there is nothing to propagate + return; + } + sstable_set_version* next_ver = *_next.begin(); + // Propagate additions + for (auto ver : _next) { + if (ver->_added.size() < next_ver->_added.size()) { + next_ver = ver; + } + } + for (auto it = next_ver->_added.begin(); it != next_ver->_added.end();) { + auto& sst = *it; + it++; + propagate_inserted_sstable(sst); + } + + next_ver = *_next.begin(); + // Propagate erasures + for (auto ver : _next) { + if (ver->_erased.size() < next_ver->_erased.size()) { + next_ver = ver; + } + } + for (auto it = next_ver->_erased.begin(); it != next_ver->_erased.end();) { + auto& sst = *it; + it++; + propagate_erased_sstable(sst); + } +} + +const sstable_set_version* sstable_set_version::get_previous_version() const { + return _prev.get(); +} + +bool sstable_set_version::can_merge_with_next() const noexcept { + return _reference_count == 1 && _next.size() == 1; +} + +bool sstable_set_version::can_delete() const noexcept { + return _reference_count == 0; +} + +void sstable_set_version::add_reference() noexcept { + _reference_count++; +} + +void sstable_set_version::remove_reference() noexcept { + _reference_count--; + propagate_changes_from_next_versions(); +} + +schema_ptr sstable_set_version::get_schema() const { + return _schema; +} + +std::vector sstable_set_version::select(const dht::partition_range& range) const { + return boost::copy_range>(_base_set->select(range) + | boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); })); +} + +// Return all runs which contain any of the input sstables. +std::vector sstable_set_version::select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return sstable_run(boost::copy_range>(_base_set->select_by_run_id(run_id) + | boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); }))); + })); +} + +const std::map& sstable_set_version::all() const { + return _base_set->sstables_and_times_added; +} + +// Provides strong exception guarantee +sstable_set_version_reference sstable_set_version::insert(shared_sstable sst) { + if (this->contains(sst)) { + return get_reference_to_this(); + } + if (_next.size()) { + auto sstvr = get_reference_to_new_copy(); + // The new version has no copies based on it, so inserting into it doesn't create another version + return sstvr->insert(sst); + } + auto it = _erased.find(sst); + if (it != _erased.end()) { + _erased.erase(it); + } else { + _base_set->insert(sst); + try { + _added.insert(sst); + if (_prev) { + _prev->propagate_inserted_sstable(sst); + } + } catch (...) { + _base_set->remove(sst); + throw; + } + } + return get_reference_to_this(); +} + +// Provides strong exception guarantee +sstable_set_version_reference sstable_set_version::erase(shared_sstable sst) { + if (!this->contains(sst)) { + return get_reference_to_this(); + } + if (_next.size()) { + auto sstvr = get_reference_to_new_copy(); + // The new version has no copies based on it, so erasing from it doesn't create another version + return sstvr->erase(sst); + } + auto it = _added.find(sst); + if (it != _added.end()) { + _added.erase(it); + _base_set->remove(sst); + } else { + _erased.insert(sst); + if (_prev) { + _prev->propagate_erased_sstable(sst); + } + } + return get_reference_to_this(); +} + +bool sstable_set_version::contains(shared_sstable sst) const { + return _added.contains(sst) || (!_erased.contains(sst) && _prev && _prev->contains(sst)); +} + +size_t sstable_set_version::size() const { + return _added.size() - _erased.size() + (_prev ? _prev->size() : 0); +} + +std::unique_ptr sstable_set_version::make_incremental_selector() const { + return _base_set->impl->make_incremental_selector(); +} + +flat_mutation_reader +sstable_set_version::create_single_key_sstable_reader( + column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const { + return _base_set->impl->create_single_key_sstable_reader(cf, std::move(schema), + std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + +sstable_set_version_reference sstable_set_version::get_reference_to_this() { + return sstable_set_version_reference(this); +} + +sstable_set_version_reference sstable_set_version::get_reference_to_new_copy() { + return sstable_set_version_reference(new sstable_set_version(this)); } std::unique_ptr bag_sstable_set::clone() const { @@ -844,7 +1274,7 @@ sstable_set::create_single_key_sstable_reader( streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { assert(pr.is_singular() && pr.start()->value().has_key()); - return _impl->create_single_key_sstable_reader(cf, std::move(schema), + return _all->version().create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); } diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index a8d7975b00..ebe1ae1214 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -24,6 +24,7 @@ #include "flat_mutation_reader.hh" #include "sstables/progress_monitor.hh" #include "shared_sstable.hh" +#include "sstables/sstables.hh" #include "dht/i_partitioner.hh" #include #include @@ -34,30 +35,157 @@ class estimated_histogram; namespace sstables { -class sstable_set_impl; +// This is an implementation of a set of sstables. The sstable_set allows: +// - selecting sstables from a given partition_range +// - selecting sstables with the same run identifiers +// - creating incremental_selectors, used to incrementally select sstables from the set using ring-position +// - many utilities of the std::set (inserting, erasing, checking membership, checking emptiness, iterating) +// - creating copies + +// The main use of the sstable_set is in the table class. The sstable_set that is stored there needs to be copied +// every time it is modified, to allow existing sstable readers to continue using the old version. For this reason +// the sstable_set is implemented in a way that allows fast copying. + +// This is achieved by associating each copy of an sstable_set with an sstable_set_version which contains all changes +// made to that copy. Each sstable_set_version has a reference to the sstable_set_version associated with the sstable_set +// that was copied. We say that the copied version is a parent, and that the copy version is a child, or that it is "based" +// on the copied version. +// This allows easy checking if an sstable is an element of an sstable_set copy - the answer is yes if the sstable was added +// in this copy or if it wasn't erased in it but it was an element of the parent version. +// It's worth adding that this solution makes a copied sstable_set immutable. To support modifying a copied sstable_set anyway, +// any modification applied to it replaces its associated sstable_set_version with a new one, based on the immutable version. +// The new version hasn't been copied, co it can be modified. + +// With the ability to check whether an sstable is an element of an sstable_set, all the methods that select sstables +// from the set can follow the same rule: get results that include all sstables from any copy, and filter out those sstables +// that aren't elements of current copy. These results are received from the class called sstable_set_data structure. + +// This solution requires a special way of finding out when an sstable should be removed from sstable_set_data. We achieve this +// by counting, for each sstable, in how many different versions has it been added. If that number is zero - the sstable should +// be completely removed. This number is decreased when deleting versions, when erasing in the same version it was added, and +// in one other case, as a result of propagating a change from a versions children to their parent, explained further in following +// paragraphs. + +// With this approach, the length of the longest chain of sstable_set_versions that are based on one another should be as +// short as possible. To achieve that, we are merging pairs of versions. If a version is not referenced by any sstable_sets or +// sstable_lists, we know that it won't be modified or read from, so we can merge its changes into versions that are based on it. +// We don't want to copy these changes though, so we wait with merging until there is only one child version. + +// This waiting causes one more problem: an sstable may have been added in a version that has no reference from an sstable_set or +// an sstable_list, and it may have been erased in all its child versions. In such scenario, the sstable can't be selected from +// any version, so it should be removed from the set completely. To handle such situations, if a change has been made to an +// sstable in all children of some version that has no referenece from an sstable_set or sstable_list, the change is removed +// in all children and added to the parent instead. +// This solution guarantees that when a version is ready for merging, the version it is being merged into contains no changes, +// because if it had any, they would be propagated to this version instead. As a result, merging versions is simply reassigning +// sets of changes. The number of times an sstable change gets propagated into a parent version is limited by the number of ancestors +// of a version. + class incremental_selector_impl; +class sstable_set_impl; // Structure holds all sstables (a.k.a. fragments) that belong to same run identifier, which is an UUID. // SStables in that same run will not overlap with one another. class sstable_run { - sstable_list _all; + std::unordered_set _all; public: + sstable_run() = default; + sstable_run(std::unordered_set all) : _all(std::move(all)) {} void insert(shared_sstable sst); void erase(shared_sstable sst); // Data size of the whole run, meaning it's a sum of the data size of all its fragments. uint64_t data_size() const; - const sstable_list& all() const { return _all; } + const std::unordered_set& all() const { return _all; } }; +// The very base class of an sstable_set. Stores all sstables that were added in +// any version of the set. Its methods return supersets of values returned by +// any single version. +struct sstable_set_data { + std::unique_ptr impl; + std::unordered_map> all_runs; + // For each sstable, stores in how many different versions has it been inserted + std::map sstables_and_times_added; + + sstable_set_data(std::unique_ptr impl); + void insert(shared_sstable sst); + void erase(shared_sstable sst); + std::vector select(const dht::partition_range& range) const; + std::unordered_set select_by_run_id(utils::UUID run_id) const; + void remove(shared_sstable sst); +}; + +class sstable_set_version; + +// Manages a pointer to an sstable_set_version - the when sstable_set_version gets removed when all +// sstable_set_version_references are removed, or when there is only one sstable_set_version_reference +// and that reference is owned by another sstable_set_version. +// In the second case the data from the managed version is merged into the other version before removal. +class sstable_set_version_reference { + mutable sstable_set_version* _p = nullptr; + + explicit sstable_set_version_reference(sstable_set_version* p); +public: + ~sstable_set_version_reference(); + sstable_set_version_reference() = default; + sstable_set_version_reference(const sstable_set_version_reference& ref); + sstable_set_version_reference(sstable_set_version_reference&&) noexcept; + sstable_set_version_reference& operator=(const sstable_set_version_reference& x); + sstable_set_version_reference& operator=(sstable_set_version_reference&&) noexcept; + sstable_set_version& operator*() const noexcept { return *_p; } + sstable_set_version* operator->() const noexcept { return _p; } + sstable_set_version* get() const noexcept { return _p; } + explicit operator bool() const noexcept { return bool(_p); } + friend class sstable_set_version; +}; + +// The data storage for an sstable_set. Can be used like a std::set (although with slightly +// costlier operations). +class sstable_list { + sstable_set_version_reference _version; +public: + sstable_list(std::unique_ptr impl, schema_ptr s); + sstable_list(const sstable_list& sstl); + sstable_list(sstable_list&& sstl) noexcept; + sstable_list& operator=(const sstable_list& sstl); + sstable_list& operator=(sstable_list&& sstl) noexcept; +public: + class const_iterator { + public: + using iterator_category = std::forward_iterator_tag; + using value_type = const shared_sstable; + using difference_type = std::ptrdiff_t; + using pointer = const shared_sstable*; + using reference = const shared_sstable&; + private: + std::map::const_iterator _it; + const sstable_set_version_reference* _ver; + void advance(); + public: + const_iterator(std::map::const_iterator it, const sstable_set_version_reference* ver); + const_iterator& operator++(); + const_iterator operator++(int); + const shared_sstable& operator*() const; + bool operator==(const const_iterator& it) const; + }; + using iterator = const_iterator; + const_iterator begin() const; + const_iterator end() const; + + size_t size() const; + void insert(shared_sstable sst); + void erase(shared_sstable sst); + bool contains(shared_sstable sst) const; + bool empty() const; + const sstable_set_version& version() const; +}; + +// A set of sstables associated with a table. class sstable_set : public enable_lw_shared_from_this { - std::unique_ptr _impl; - schema_ptr _schema; // used to support column_family::get_sstable(), which wants to return an sstable_list // that has a reference somewhere lw_shared_ptr _all; - std::unordered_map _all_runs; public: - ~sstable_set(); sstable_set(std::unique_ptr impl, schema_ptr s); sstable_set(const sstable_set&); sstable_set(sstable_set&&) noexcept; @@ -66,7 +194,7 @@ public: std::vector select(const dht::partition_range& range) const; // Return all runs which contain any of the input sstables. std::vector select_sstable_runs(const std::vector& sstables) const; - lw_shared_ptr all() const { return _all; } + lw_shared_ptr all() const; void insert(shared_sstable sst); void erase(shared_sstable sst); @@ -79,9 +207,10 @@ public: mutable std::optional> _current_range_view; mutable std::vector _current_sstables; mutable dht::ring_position_view _current_next_position = dht::ring_position_view::min(); + lw_shared_ptr _sstl; public: ~incremental_selector(); - incremental_selector(std::unique_ptr impl, const schema& s); + incremental_selector(std::unique_ptr impl, const schema& s, lw_shared_ptr sstl); incremental_selector(incremental_selector&&) noexcept; struct selection { @@ -173,6 +302,58 @@ flat_mutation_reader make_restricted_range_sstable_reader( mutation_reader::forwarding, read_monitor_generator& rmg = default_read_monitor_generator()); +class sstable_set_version { + // shared by all sstable_set_versions that were based on the same original set + lw_shared_ptr _base_set; + schema_ptr _schema; + sstable_set_version_reference _prev; + mutable std::unordered_set _next; + std::unordered_set _added; + std::unordered_set _erased; + // is equal to the number of sstable_set_versions based on this version increased by one if there is + // an sstable_list that references this version + unsigned _reference_count = 0; +public: + ~sstable_set_version(); + sstable_set_version(std::unique_ptr impl, schema_ptr schema); + explicit sstable_set_version(sstable_set_version* ver); +private: + void propagate_inserted_sstable(const shared_sstable& sst) noexcept; + void propagate_erased_sstable(const shared_sstable& sst) noexcept; + void propagate_changes_from_next_versions() noexcept; +public: + const sstable_set_version* get_previous_version() const; + bool can_merge_with_next() const noexcept; + void merge_with_next() noexcept; + bool can_delete() const noexcept; + void add_reference() noexcept; + void remove_reference() noexcept; + schema_ptr get_schema() const; + std::vector select(const dht::partition_range& range) const; + // Return all runs which contain any of the input sstables. + std::vector select_sstable_runs(const std::vector& sstables) const; + const std::map& all() const; + sstable_set_version_reference insert(shared_sstable sst); + sstable_set_version_reference erase(shared_sstable sst); + bool contains(shared_sstable sst) const; + size_t size() const; + std::unique_ptr make_incremental_selector() const; + flat_mutation_reader create_single_key_sstable_reader( + column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const; +public: + sstable_set_version_reference get_reference_to_this(); + sstable_set_version_reference get_reference_to_new_copy(); +}; + sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/test/boost/sstable_set_test.cc b/test/boost/sstable_set_test.cc new file mode 100644 index 0000000000..887bb9bb94 --- /dev/null +++ b/test/boost/sstable_set_test.cc @@ -0,0 +1,638 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include +#include +#include +#include +#include "sstables/compaction.hh" +#include +#include +#include "test/boost/sstable_test.hh" +#include +#include +#include "sstables/compaction_strategy_impl.hh" +#include "sstables/date_tiered_compaction_strategy.hh" +#include "sstables/time_window_compaction_strategy.hh" +#include "sstables/leveled_compaction_strategy.hh" +#include "sstables/sstable_set.hh" +#include "sstables/sstable_set_impl.hh" + +using namespace sstables; + +static const sstring some_keyspace("ks"); +static const sstring some_column_family("cf"); + +static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) { + auto sst = env.make_sstable(schema, "", gen, la, big); + sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key)); + return sst; +} + +SEASTAR_TEST_CASE(simple_versioned_sstable_set_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto key_and_token_pair = token_generation_for_current_shard(8); + auto decorated_keys = boost::copy_range>( + key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair& key_and_token) { + auto value = bytes(reinterpret_cast(key_and_token.first.data()), key_and_token.first.size()); + auto pk = sstables::key::from_bytes(value).to_partition_key(*s); + return dht::decorate_key(*s, std::move(pk)); + })); + struct snapshot_and_selections { + std::optional ssts; + std::vector> selections; + std::unordered_map> runs; + std::unordered_set all; + snapshot_and_selections(sstable_set&& ssts, const std::vector>& selections, + const std::unordered_map>& runs, const std::unordered_set& all) + : ssts(std::move(ssts)), selections(selections), runs(runs), all(all) { } + }; + auto check = [&decorated_keys] (snapshot_and_selections& version) { + for (int j = 0; j < 8; j++) { + auto sel = version.ssts->select(dht::partition_range::make_singular(decorated_keys[j])); + BOOST_REQUIRE_EQUAL(sel.size(), version.selections[j].size()); + for (auto& sst : sel) { + BOOST_REQUIRE(version.selections[j].contains(sst)); + } + } + for (auto& [uuid, run] : version.runs) { + if (run.empty()) { + continue; + } + std::vector runs = version.ssts->select_sstable_runs({*run.begin()}); + // only one sstable -> only one run + BOOST_REQUIRE_EQUAL(runs[0].all().size(), run.size()); + for (auto& sst : runs[0].all()) { + BOOST_REQUIRE(run.contains(sst)); + } + } + BOOST_REQUIRE_EQUAL(version.ssts->all()->size(), version.all.size()); + for (auto& sst : *version.ssts->all()) { + BOOST_REQUIRE(version.all.contains(sst)); + } + }; + // check that selecting from older snapshots of an sstable_set gives correct results. + { + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + std::optional set = cs.make_sstable_set(s); + std::vector> current_selections(8); + std::unordered_map> current_runs; + std::unordered_set current_all; + + std::vector versions; + versions.reserve(20); + int token = 0; + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1); + set->insert(sst); + current_selections[token].insert(sst); + current_runs[sst->run_identifier()].insert(sst); + current_all.insert(sst); + ++token %= 8; + } + for (int j = 0; j < 5; j++) { + auto sst = *set->all()->begin(); + set->erase(sst); + for (auto& sel : current_selections) { + // actually erases only from one + sel.erase(sst); + } + current_runs[sst->run_identifier()].erase(sst); + current_all.erase(sst); + } + versions.emplace_back(std::move(*set), current_selections, current_runs, current_all); + set = versions.back().ssts; + } + + for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) { + check(versions[i]); + // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots + versions[i].ssts.reset(); + } + } + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_list_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto key_and_token_pair = token_generation_for_current_shard(8); + auto decorated_keys = boost::copy_range>( + key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair& key_and_token) { + auto value = bytes(reinterpret_cast(key_and_token.first.data()), key_and_token.first.size()); + auto pk = sstables::key::from_bytes(value).to_partition_key(*s); + return dht::decorate_key(*s, std::move(pk)); + })); + + + struct list_and_sstables { + std::optional list; + std::unordered_set sstables_in_list; + + list_and_sstables(sstable_list&& sstl, std::unordered_set sstables_in_list) + : list(std::move(sstl)), sstables_in_list(sstables_in_list) { } + }; + auto check = [] (list_and_sstables& version) { + BOOST_REQUIRE_EQUAL(version.list->size(), version.sstables_in_list.size()); + for (auto& sst : *version.list) { + BOOST_REQUIRE(version.list->contains(sst)); + } + }; + + { + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + std::optional set = cs.make_sstable_set(s); + std::optional l = *set->all(); + std::unordered_set sstables_in_list; + + + std::vector versions; + versions.reserve(20); + int token = 0; + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1); + l->insert(sst); + sstables_in_list.insert(sst); + ++token %= 8; + } + for (int j = 0; j < 5; j++) { + auto sst = *l->begin(); + l->erase(sst); + sstables_in_list.erase(sst); + } + versions.emplace_back(std::move(*l), sstables_in_list); + l = versions.back().list; + } + + for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) { + check(versions[i]); + // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots + versions[i].list.reset(); + } + } + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_set_version_merging_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(2); + std::optional set = cs.make_sstable_set(s); + std::optional list = *set->all(); + // set -> list + BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version()); + std::optional list2 = *set->all(); + // set -> list + // -> list2 + BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(&set->all()->version(), list2->version().get_previous_version()); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + set->insert(sst); + // set' -> list + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version()); + BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version()); + sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[1].first, key_and_token_pair[1].first, 1); + set->insert(sst); + // set' -> list + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version()); + BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version()); + std::optional list3 = list; + // set' -> list -> list3 + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(&list->version(), list3->version().get_previous_version()); + list.reset(); + // set' -> list3 + // -> list2 + // -> set + BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list3->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version()); + set.reset(); + // set' -> list3 + // -> list2 + BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version()); + std::optional list4 = list3; + std::optional list5 = list3; + // set' -> list3 -> list4 + // -> list5 + // -> list2 + BOOST_REQUIRE_EQUAL(&list3->version(), list4->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(&list3->version(), list5->version().get_previous_version()); + list3.reset(); + // set' -> list3' -> list4 + // -> list5 + // -> list2 + BOOST_REQUIRE_NE(list2->version().get_previous_version(), list4->version().get_previous_version()); + BOOST_REQUIRE_EQUAL(list4->version().get_previous_version(), list5->version().get_previous_version()); + list4.reset(); + // set' -> list5 + // -> list2 + BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list5->version().get_previous_version()); + list2.reset(); + // list5 + BOOST_REQUIRE_EQUAL(nullptr, list5->version().get_previous_version()); + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_erased_in_last_descendant_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(1); + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + // set -> set2 + // -> set3 + set.reset(); + set2->erase(sst); + set3->erase(sst); + sst = nullptr; + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set; + // set -> set2 + // -> set3 + // -> set4 + set.reset(); + set2->erase(sst); + set3.reset(); + set4->erase(sst); + sst = nullptr; + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set3; + std::optional set5 = set3; + // set -> set2 + // -> set3 -> set4 + // -> set5 + set.reset(); + set2->erase(sst); + set3.reset(); + set4->erase(sst); + set5->erase(sst); + sst = nullptr; + BOOST_REQUIRE(is_sstable_removed); + } + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_ancestor_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(1); + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + // set -> set2 + set2->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + // set -> set2 + // -> set3 + set2->erase(sst); + set3->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set; + // set -> set2 + // -> set3 + // -> set4 + set2->erase(sst); + set3.reset(); + set4->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set3; + std::optional set5 = set3; + // set -> set2 + // -> set3 -> set4 + // -> set5 + set.reset(); + set2->erase(sst); + set4->erase(sst); + set5->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set3.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + return make_ready_future<>(); + }); +} + + +SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_descendant_test) { + return test_env::do_with([] (test_env& env) { + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + auto key_and_token_pair = token_generation_for_current_shard(1); + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + // set -> set2 + // -> set3 + set.reset(); + set2->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set3.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set; + // set -> set2 + // -> set3 + // -> set4 + set.reset(); + set2->erase(sst); + set3->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set4.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + { + std::optional set = cs.make_sstable_set(s); + auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); + bool is_sstable_removed = false; + utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { + is_sstable_removed = true; + }); + set->insert(sst); + std::optional set2 = set; + std::optional set3 = set; + std::optional set4 = set3; + std::optional set5 = set3; + // set -> set2 + // -> set3 -> set4 + // -> set5 + set.reset(); + set2->erase(sst); + set3.reset(); + set4->erase(sst); + sst = nullptr; + BOOST_REQUIRE(!is_sstable_removed); + set5.reset(); + BOOST_REQUIRE(is_sstable_removed); + } + return make_ready_future<>(); + }); +} + +class simple_sstable_set { + std::unique_ptr _impl; + schema_ptr _schema; + // used to support column_family::get_sstable(), which wants to return an sstable_list + // that has a reference somewhere + lw_shared_ptr> _all; + std::unordered_map _all_runs; +public: + ~simple_sstable_set() = default; + + simple_sstable_set(std::unique_ptr impl, schema_ptr schema) + : _impl(std::move(impl)) + , _schema(std::move(schema)) + , _all(make_lw_shared>()) { + } + + simple_sstable_set(const simple_sstable_set& x) + : _impl(x._impl->clone()) + , _schema(x._schema) + , _all(make_lw_shared>(*x._all)) + , _all_runs(x._all_runs) { + } + + simple_sstable_set(simple_sstable_set&& x) noexcept = default; + + simple_sstable_set& operator=(const simple_sstable_set& x) { + if (this != &x) { + auto tmp = simple_sstable_set(x); + *this = std::move(tmp); + } + return *this; + } + + simple_sstable_set& operator=(simple_sstable_set&&) noexcept = default; + + std::vector select(const dht::partition_range& range) const { + return _impl->select(range); + } + + // Return all runs which contain any of the input sstables. + std::vector select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return _all_runs.at(run_id); + })); + } + + lw_shared_ptr> all() const { return _all; } + + void insert(shared_sstable sst) { + _impl->insert(sst); + _all->insert(sst); + _all_runs[sst->run_identifier()].insert(sst); + } + + void erase(shared_sstable sst) { + _impl->erase(sst); + _all->erase(sst); + _all_runs[sst->run_identifier()].erase(sst); + } +}; + +SEASTAR_TEST_CASE(sstable_set_random_walk_test) { + return test_env::do_with([] (test_env& env) { + auto rand = std::default_random_engine(); + auto op_gen = std::uniform_int_distribution(0, 7); + auto nr_dist = std::geometric_distribution(0.7); + auto s = make_shared_schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type); + auto leveled_cs = leveled_compaction_strategy(s->compaction_strategy_options()); + std::vector sstable_sets; + std::vector simple_sstable_sets; + sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s); + simple_sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s); + auto key_and_token_pair = token_generation_for_current_shard(1000); + std::vector sstables(1000); + for (int i = 0; i < 1000; i++) { + sstables[i] = sstable_for_overlapping_test(env, s, i, key_and_token_pair[i].first, key_and_token_pair[i].first, i); + } + for (auto i = 0; i != 100000; i++) { + auto op = op_gen(rand); + auto u = std::uniform_int_distribution(0, sstable_sets.size() - 1); + auto idx = u(rand); + switch (op) { + case 0: { + // delete + if (sstable_sets.size() > 1) { + sstable_sets.erase(sstable_sets.begin() + idx); + simple_sstable_sets.erase(simple_sstable_sets.begin() + idx); + break; + } + // if we can't remove the version, let's create one + [[fallthrough]]; + } + case 1: { + // copy + if (sstable_sets.size() < 100) { + sstable_sets.emplace_back(sstable_sets[idx]); + simple_sstable_sets.emplace_back(simple_sstable_sets[idx]); + for (auto& sst : *simple_sstable_sets.back().all()) { + BOOST_REQUIRE(sstable_sets.back().all()->contains(sst)); + } + } + break; + } + default: + // modify + auto sst_u = std::uniform_int_distribution(0, 999); + auto sst_idx = sst_u(rand); + if (simple_sstable_sets[idx].all()->contains(sstables[sst_idx])) { + sstable_sets[idx].erase(sstables[sst_idx]); + simple_sstable_sets[idx].erase(sstables[sst_idx]); + BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx])); + BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx])); + } else { + sstable_sets[idx].insert(sstables[sst_idx]); + simple_sstable_sets[idx].insert(sstables[sst_idx]); + BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx])); + BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx])); + } + } + for (int j = 0; j < sstable_sets.size(); j++) { + BOOST_REQUIRE_EQUAL(sstable_sets[j].all()->size(), simple_sstable_sets[j].all()->size()); + } + } + return make_ready_future<>(); + }); +} From 0feff8712e11c743217414293d7ec559f9604451 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Thu, 5 Nov 2020 18:04:21 +0100 Subject: [PATCH 4/6] sstables: use fast copying of the sstable_set instead of rebuilding it The sstable_set enables copying without iterating over all its elements, so it's faster to copy a set and modify it than copy all its elements while filtering the ones that were erased. The modifications are done on a temporary version of the set, so that if an operation fails the base version remains unchanged Signed-off-by: Wojciech Mitros --- table.cc | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/table.cc b/table.cc index a76b48fa47..8f20a3a046 100644 --- a/table.cc +++ b/table.cc @@ -718,21 +718,17 @@ void table::rebuild_statistics() { future> table::build_new_sstable_list(const std::vector& new_sstables, const std::vector& old_sstables) { - auto current_sstables = _sstables; - auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema); - std::unordered_set s(old_sstables.begin(), old_sstables.end()); - - // this might seem dangerous, but "move" here just avoids constness, - // making the two ranges compatible when compiling with boost 1.55. - // Noone is actually moving anything... - for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables->all()))) { - if (!s.contains(tab)) { - new_sstable_list.insert(tab); - } + auto new_sstable_set = *_sstables; + for (auto& tab : new_sstables) { + new_sstable_set.insert(tab); co_await make_ready_future<>(); // yield if needed. } - co_return make_lw_shared(std::move(new_sstable_list)); + for (auto& tab : old_sstables) { + new_sstable_set.erase(tab); + co_await make_ready_future<>(); // yield if needed. + } + co_return make_lw_shared(std::move(new_sstable_set)); } // Note: must run in a seastar thread From 693b4e0fcd9a14472340739cdbee1e6d4451cdfd Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Fri, 11 Dec 2020 00:33:18 +0100 Subject: [PATCH 5/6] sstables: move column_family_test class from test/boost to test/lib Column_family_test allows performing private methods on column_family's sstable_set. It may be useful not only in the boost tests, so it's moved from test/boost/sstable_test.hh to test/lib/sstable_test_env.hh. sstable_test.hh includes sstable_test_env.hh, so no includes need to be changed. Signed-off-by: Wojciech Mitros --- test/boost/sstable_test.hh | 34 ---------------------------------- test/lib/sstable_test_env.hh | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index 30790ab29c..efc0eba8df 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -37,40 +37,6 @@ #include #include -constexpr auto la = sstables::sstable::version_types::la; -constexpr auto big = sstables::sstable::format_types::big; - -class column_family_test { - lw_shared_ptr _cf; -public: - column_family_test(lw_shared_ptr cf) : _cf(cf) {} - - void add_sstable(sstables::shared_sstable sstable) { - _cf->_sstables->insert(std::move(sstable)); - } - - // NOTE: must run in a thread - void rebuild_sstable_list(const std::vector& new_sstables, - const std::vector& sstables_to_remove) { - _cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0(); - } - - static void update_sstables_known_generation(column_family& cf, unsigned generation) { - cf.update_sstables_known_generation(generation); - } - - static uint64_t calculate_generation_for_new_table(column_family& cf) { - return cf.calculate_generation_for_new_table(); - } - - static int64_t calculate_shard_from_sstable_generation(int64_t generation) { - return column_family::calculate_shard_from_sstable_generation(generation); - } - - future try_flush_memtable_to_sstable(lw_shared_ptr mt) { - return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); - } -}; namespace sstables { diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index e0ac3234a3..e126386562 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -31,6 +31,41 @@ #include "test/lib/test_services.hh" #include "test/lib/log.hh" +constexpr auto la = sstables::sstable::version_types::la; +constexpr auto big = sstables::sstable::format_types::big; + +class column_family_test { + lw_shared_ptr _cf; +public: + column_family_test(lw_shared_ptr cf) : _cf(cf) {} + + void add_sstable(sstables::shared_sstable sstable) { + _cf->_sstables->insert(std::move(sstable)); + } + + // NOTE: must run in a thread + void rebuild_sstable_list(const std::vector& new_sstables, + const std::vector& sstables_to_remove) { + _cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0(); + } + + static void update_sstables_known_generation(column_family& cf, unsigned generation) { + cf.update_sstables_known_generation(generation); + } + + static uint64_t calculate_generation_for_new_table(column_family& cf) { + return cf.calculate_generation_for_new_table(); + } + + static int64_t calculate_shard_from_sstable_generation(int64_t generation) { + return column_family::calculate_shard_from_sstable_generation(generation); + } + + future try_flush_memtable_to_sstable(lw_shared_ptr mt) { + return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); + } +}; + namespace sstables { class test_env_sstables_manager : public sstables_manager { From 17634d141b70228e89310c4a72a63a66eaab680c Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Fri, 11 Dec 2020 00:38:32 +0100 Subject: [PATCH 6/6] sstables: add test for checking the latency of updating the sstable_set in a table Added a test which measures the time it takes to replace sstables in a table's sstable_set, using the leveled compaction strategy. Signed-off-by: Wojciech Mitros --- configure.py | 1 + test/perf/perf_sstable_set.cc | 111 ++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 test/perf/perf_sstable_set.cc diff --git a/configure.py b/configure.py index e9b8fd73be..a58e83a57f 100755 --- a/configure.py +++ b/configure.py @@ -432,6 +432,7 @@ scylla_tests = set([ 'test/perf/perf_row_cache_update', 'test/perf/perf_simple_query', 'test/perf/perf_sstable', + 'test/perf/perf_sstable_set', 'test/unit/lsa_async_eviction_test', 'test/unit/lsa_sync_eviction_test', 'test/unit/row_cache_alloc_stress_test', diff --git a/test/perf/perf_sstable_set.cc b/test/perf/perf_sstable_set.cc new file mode 100644 index 0000000000..02cc128a9e --- /dev/null +++ b/test/perf/perf_sstable_set.cc @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "database.hh" +#include "test/lib/simple_schema.hh" +#include "test/perf/perf.hh" +#include +#include +#include "test/lib/sstable_test_env.hh" +#include + +static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) { + auto sst = env.make_sstable(schema, "", gen, la, big); + sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key)); + return sst; +} + +int main(int argc, char* argv[]) { + app_template app; + return app.run(argc, argv, [&app] { + return test_env::do_with_async([] (test_env& env) { + using namespace std::chrono; + using namespace std::chrono_literals; + auto start = high_resolution_clock::now(); + simple_schema ss; + auto s = ss.schema(); + auto cm = make_lw_shared(); + column_family::config cfg; + auto cl_stats = make_lw_shared(); + auto tracker = make_lw_shared(); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker); + cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled); + + constexpr int ssts_in_level_0 = 10; + std::function idx_to_level = [&] (int i) { + if (i < ssts_in_level_0) { + return 0; + } + return 1 + idx_to_level((i - ssts_in_level_0) / 10 + ssts_in_level_0 - 1); + }; + auto level_to_size = [] (int level) { + if (level == 0) { + return ssts_in_level_0; + } + return int(pow(10, level)); + }; + + auto kt_pair = token_generation_for_current_shard(1120); + auto min_max_keys = [&kt_pair, &level_to_size] (auto level, auto pos_in_level) -> std::pair { + auto last_key_idx = kt_pair.size() - 1; + if (level == 0) { + return { kt_pair[0].first, kt_pair[last_key_idx].first }; + } + auto total_ranges = kt_pair.size(); + auto level_size_in_ssts = level_to_size(level); + unsigned ranges_per_sst = std::max(1U, unsigned(floor(float(total_ranges) / level_size_in_ssts))); + sstring min_key = kt_pair.at(pos_in_level).first; + sstring max_key = kt_pair.at(std::min(pos_in_level + ranges_per_sst - 1, unsigned(last_key_idx))).first; + return {min_key, max_key}; + }; + + std::vector inputs[3], outputs[3]; + + std::array pos_in_levels{0}; + pos_in_levels.fill(0); + auto start2 = high_resolution_clock::now(); + for (auto i = 0; i < 1120; i++) { + auto level = idx_to_level(i); + auto [min, max] = min_max_keys(level, pos_in_levels[level]++); + auto sst = sstable_for_overlapping_test(env, s, i, min, max, uint32_t(level)); + column_family_test(cf).add_sstable(sst); + if (level >= 1 && pos_in_levels[level] < 10) { + inputs[level-1].push_back(sst); + } + seastar::thread::maybe_yield(); + } + + for (auto i = 0; i < 30; i++) { + auto [min, max] = min_max_keys(1 + i / 10, i % 10); + auto sst = sstable_for_overlapping_test(env, s, i, min, max, 1 + i / 10); + outputs[i / 10].push_back(sst); + seastar::thread::maybe_yield(); + } + for (auto i = 0; i < 3; i++) { + auto t1 = high_resolution_clock::now(); + column_family_test(cf).rebuild_sstable_list(outputs[i], inputs[i]); + auto t2 = high_resolution_clock::now(); + std::cout << "Replacing 10 L" << i + 1 <<" sstables took " + << std::chrono::duration_cast(t2 - t1).count() << "ms to complete\n"; + } + }); + }); +}