mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge 'sstables: add versioning to the sstable_set ' from Wojciech Mitros
Currently, the sstable_set in a table is copied before every change to allow accessing the unchanged version by existing sstable readers. This patch changes the sstable_set to a structure that keeps all its versions that are referenced somewhere and provides a way of getting a reference to an immutable version of the set. Each sstable in the set is associated with the versions it is alive in, and is removed when all such versions don't have references anymore. To avoid copying, the object holding all sstables in the set version is changed to a new structure, sstable_list, which was previously an alias for std::unordered_set<shared_sstable>, and which implements most of the methods of an unordered_set, but its iterator uses the actual set with all sstables from all referenced versions and iterates over those sstables that belong to the captured version. The methods that modify the set's contents give the strong exception guarantee by trying to insert new sstables to its containers, and erasing them if an exception is caught. To release shared_sstables as soon as possible (i.e. when all references to versions that contain them die), each time a version is removed, all sstables that were referenced exclusively by this version are erased. We are able to find these sstables efficiently by storing, for each version, all sstables that were added and erased in it, and, when a version is removed, merging it with the next one. When a version that adds an sstable gets merged with a version that removes it, this sstable is erased.
Fixes #2622 Signed-off-by: Wojciech Mitros wojciech.mitros@scylladb.com Closes #8111 * github.com:scylladb/scylla: sstables: add test for checking the latency of updating the sstable_set in a table sstables: move column_family_test class from test/boost to test/lib sstables: use fast copying of the sstable_set instead of rebuilding it sstables: replace the sstable_set with a versioned structure sstables: remove potential ub sstables: make sstable_set constructor less error-prone
This commit is contained in:
@@ -987,8 +987,8 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
ss::table_sstables tst;
|
||||
tst.keyspace = schema->ks_name();
|
||||
tst.table = schema->cf_name();
|
||||
|
||||
for (auto sstable : *t->get_sstables_including_compacted_undeleted()) {
|
||||
auto sstables = t->get_sstables_including_compacted_undeleted();
|
||||
for (auto sstable : *sstables) {
|
||||
auto ts = db_clock::to_time_t(sstable->data_file_write_time());
|
||||
::tm t;
|
||||
::gmtime_r(&ts, &t);
|
||||
|
||||
@@ -392,6 +392,7 @@ scylla_tests = set([
|
||||
'test/boost/sstable_conforms_to_mutation_source_test',
|
||||
'test/boost/sstable_resharding_test',
|
||||
'test/boost/sstable_directory_test',
|
||||
'test/boost/sstable_set_test',
|
||||
'test/boost/sstable_test',
|
||||
'test/boost/sstable_move_test',
|
||||
'test/boost/storage_proxy_test',
|
||||
@@ -435,6 +436,7 @@ scylla_tests = set([
|
||||
'test/perf/perf_row_cache_reads',
|
||||
'test/perf/perf_simple_query',
|
||||
'test/perf/perf_sstable',
|
||||
'test/perf/perf_sstable_set',
|
||||
'test/unit/lsa_async_eviction_test',
|
||||
'test/unit/lsa_sync_eviction_test',
|
||||
'test/unit/row_cache_alloc_stress_test',
|
||||
|
||||
@@ -67,7 +67,7 @@ future<> view_update_generator::start() {
|
||||
// Exploit the fact that sstables in the staging directory
|
||||
// are usually non-overlapping and use a partitioned set for
|
||||
// the read.
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, make_lw_shared<sstable_list>(sstable_list{}), false));
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
for (auto& sst : sstables) {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
|
||||
@@ -2606,8 +2606,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
size_t nr_sst_current = 0;
|
||||
while (!sstables.empty()) {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s,
|
||||
make_lw_shared<sstable_list>(sstable_list{}), false));
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
size_t batch_sst_nr = 16;
|
||||
std::vector<sstring> sst_names;
|
||||
std::vector<sstables::shared_sstable> sst_processed;
|
||||
|
||||
@@ -46,7 +46,6 @@ struct lw_shared_ptr_deleter<sstables::sstable> {
|
||||
namespace sstables {
|
||||
|
||||
using shared_sstable = seastar::lw_shared_ptr<sstable>;
|
||||
using sstable_list = std::unordered_set<shared_sstable>;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -63,93 +63,526 @@ std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) {
|
||||
return os;
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all)
|
||||
: _impl(std::move(impl))
|
||||
, _schema(std::move(s))
|
||||
, _all(std::move(all)) {
|
||||
sstable_set_data::sstable_set_data(std::unique_ptr<sstable_set_impl> impl)
|
||||
: impl(std::move(impl)) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(const sstable_set& x)
|
||||
: _impl(x._impl->clone())
|
||||
, _schema(x._schema)
|
||||
, _all(make_lw_shared<sstable_list>(*x._all))
|
||||
, _all_runs(x._all_runs) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(sstable_set&&) noexcept = default;
|
||||
|
||||
sstable_set&
|
||||
sstable_set::operator=(const sstable_set& x) {
|
||||
if (this != &x) {
|
||||
auto tmp = sstable_set(x);
|
||||
*this = std::move(tmp);
|
||||
void sstable_set_data::insert(shared_sstable sst) {
|
||||
auto it = sstables_and_times_added.find(sst);
|
||||
if (it != sstables_and_times_added.end()) {
|
||||
// the sstable has already been added in some version
|
||||
it->second++;
|
||||
return;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
sstable_set&
|
||||
sstable_set::operator=(sstable_set&&) noexcept = default;
|
||||
|
||||
std::vector<shared_sstable>
|
||||
sstable_set::select(const dht::partition_range& range) const {
|
||||
return _impl->select(range);
|
||||
}
|
||||
|
||||
std::vector<sstable_run>
|
||||
sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return _all_runs.at(run_id);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
sstable_set::insert(shared_sstable sst) {
|
||||
_impl->insert(sst);
|
||||
try {
|
||||
_all->insert(sst);
|
||||
try {
|
||||
_all_runs[sst->run_identifier()].insert(sst);
|
||||
} catch (...) {
|
||||
_all->erase(sst);
|
||||
throw;
|
||||
}
|
||||
impl->insert(sst);
|
||||
all_runs[sst->run_identifier()].insert(sst);
|
||||
sstables_and_times_added.emplace(sst, 1);
|
||||
} catch (...) {
|
||||
_impl->erase(sst);
|
||||
impl->erase(sst);
|
||||
auto runs_it = all_runs.find(sst->run_identifier());
|
||||
if (runs_it != all_runs.end()) {
|
||||
runs_it->second.erase(sst);
|
||||
if (runs_it->second.empty()) {
|
||||
all_runs.erase(runs_it);
|
||||
}
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
sstable_set::erase(shared_sstable sst) {
|
||||
_impl->erase(sst);
|
||||
_all->erase(sst);
|
||||
_all_runs[sst->run_identifier()].erase(sst);
|
||||
std::vector<shared_sstable> sstable_set_data::select(const dht::partition_range& range) const {
|
||||
return impl->select(range);
|
||||
}
|
||||
|
||||
sstable_set::~sstable_set() = default;
|
||||
std::unordered_set<shared_sstable> sstable_set_data::select_by_run_id(utils::UUID run_id) const {
|
||||
return all_runs.at(run_id);
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s)
|
||||
: _impl(std::move(impl))
|
||||
, _cmp(s) {
|
||||
// Called when a version that was adding an sstable was removed or when the sstable was later erased in that version.
|
||||
void sstable_set_data::remove(shared_sstable sst) {
|
||||
if (--sstables_and_times_added.at(sst) == 0) {
|
||||
impl->erase(sst);
|
||||
all_runs[sst->run_identifier()].erase(sst);
|
||||
sstables_and_times_added.erase(sst);
|
||||
}
|
||||
}
|
||||
|
||||
// Gives up this reference's share of the version it points to. After the count is decremented the
// version is either merged into its single successor (which deletes it as a side effect), deleted
// directly when nothing references it anymore, or left alone because other references remain.
sstable_set_version_reference::~sstable_set_version_reference() {
    if (!_p) {
        // empty (e.g. moved-from) reference: nothing to release
        return;
    }
    _p->remove_reference();
    if (_p->can_merge_with_next()) {
        // merging will destroy the last reference to the version and the version will be deleted as a result
        _p->merge_with_next();
        return;
    }
    if (_p->can_delete()) {
        delete _p;
        _p = nullptr;
    }
}
|
||||
|
||||
sstable_set_version_reference::sstable_set_version_reference(sstable_set_version* p) : _p(p) {
|
||||
if (_p) {
|
||||
_p->add_reference();
|
||||
}
|
||||
}
|
||||
|
||||
sstable_set_version_reference::sstable_set_version_reference(const sstable_set_version_reference& ref) : _p(ref._p) {
|
||||
if (_p) {
|
||||
_p->add_reference();
|
||||
}
|
||||
}
|
||||
|
||||
sstable_set_version_reference::sstable_set_version_reference(sstable_set_version_reference&& ref) noexcept : _p(ref._p) {
|
||||
ref._p = nullptr;
|
||||
}
|
||||
|
||||
sstable_set_version_reference& sstable_set_version_reference::operator=(const sstable_set_version_reference& ref) {
|
||||
*this = sstable_set_version_reference(ref);
|
||||
return *this;
|
||||
}
|
||||
|
||||
// Move assignment: takes over the version held by `ref` and releases the version this reference
// held before. `ref` is left empty.
//
// The previous implementation ran `this->~sstable_set_version_reference()` and then kept using
// the object. Accessing members of an object after its destructor has run is undefined behaviour,
// so the old pointer is now handed to a temporary whose (regular) destructor performs the release.
sstable_set_version_reference& sstable_set_version_reference::operator=(sstable_set_version_reference&& ref) noexcept {
    if (this != &ref) {
        // Releasing the old version may invalidate other references (possibly including `ref`
        // itself), so both pointers are detached first: ours moves into `released`, and the
        // pointer managed by `ref` is taken over before anything is destroyed.
        sstable_set_version_reference released(std::move(*this));
        _p = ref._p;
        ref._p = nullptr;
        // `released` goes out of scope here and gives back the old version; `ref` is not touched afterwards.
    }
    return *this;
}
|
||||
|
||||
static sstable_set_version_reference make_sstable_set_version(std::unique_ptr<sstable_set_impl> impl, schema_ptr s) {
|
||||
sstable_set_version* new_version = new sstable_set_version(std::move(impl), std::move(s));
|
||||
return new_version->get_reference_to_this();
|
||||
}
|
||||
|
||||
sstable_list::sstable_list(std::unique_ptr<sstable_set_impl> impl, schema_ptr s)
|
||||
: _version(make_sstable_set_version(std::move(impl), std::move(s))) {
|
||||
}
|
||||
|
||||
sstable_list::sstable_list(const sstable_list& sstl)
|
||||
: _version(sstl._version->get_reference_to_new_copy()) {
|
||||
// copying an sstable_list creates a new sstable_set_version
|
||||
}
|
||||
|
||||
sstable_list::sstable_list(sstable_list&& sstl) noexcept = default;
|
||||
|
||||
sstable_list& sstable_list::operator=(const sstable_list& sstl) {
|
||||
if (this != &sstl) {
|
||||
*this = sstable_list(sstl);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
// Move assignment: takes over the version referenced by `sstl`; the version referenced so far is
// released by the move assignment of `_version`.
//
// No explicit `this->~sstable_list()` here: destroying the list destroys `_version` (the only
// member initialised by the constructors in this file; NOTE(review): assumes the class has no
// user-provided destructor doing other work - confirm against the header). Assigning to the
// destroyed member afterwards would release the same version a second time.
sstable_list& sstable_list::operator=(sstable_list&& sstl) noexcept {
    if (this != &sstl) {
        _version = std::move(sstl._version);
    }
    return *this;
}
|
||||
|
||||
// Moves the iterator to the next sstable which is contained by the associated sstable_set, or to the end
|
||||
// If the iterator already references a satisfying sstable, no changes are made.
|
||||
void sstable_list::const_iterator::advance() {
|
||||
while (_it != (*_ver)->all().end() && !(*_ver)->contains(_it->first)) {
|
||||
_it++;
|
||||
}
|
||||
}
|
||||
|
||||
sstable_list::const_iterator::const_iterator(std::map<shared_sstable, unsigned>::const_iterator it, const sstable_set_version_reference* ver)
|
||||
: _it(std::move(it))
|
||||
, _ver(ver) {
|
||||
advance();
|
||||
}
|
||||
|
||||
sstable_list::const_iterator& sstable_list::const_iterator::operator++() {
|
||||
assert(_it != (*_ver)->all().end());
|
||||
_it++;
|
||||
advance();
|
||||
return *this;
|
||||
}
|
||||
|
||||
sstable_list::const_iterator sstable_list::const_iterator::operator++(int) {
|
||||
const_iterator it = *this;
|
||||
operator++();
|
||||
return it;
|
||||
}
|
||||
|
||||
const shared_sstable& sstable_list::const_iterator::operator*() const {
|
||||
return _it->first;
|
||||
}
|
||||
|
||||
bool sstable_list::const_iterator::operator==(const const_iterator& it) const {
|
||||
assert(_ver == it._ver);
|
||||
return _it == it._it;
|
||||
}
|
||||
|
||||
sstable_list::const_iterator sstable_list::begin() const {
|
||||
return const_iterator(_version->all().begin(), &_version);
|
||||
}
|
||||
|
||||
sstable_list::const_iterator sstable_list::end() const {
|
||||
return const_iterator(_version->all().end(), &_version);
|
||||
}
|
||||
|
||||
size_t sstable_list::size() const {
|
||||
return _version->size();
|
||||
}
|
||||
|
||||
void sstable_list::insert(shared_sstable sst) {
|
||||
_version = _version->insert(sst);
|
||||
}
|
||||
|
||||
void sstable_list::erase(shared_sstable sst) {
|
||||
_version = _version->erase(sst);
|
||||
}
|
||||
|
||||
bool sstable_list::contains(shared_sstable sst) const {
|
||||
return _version->contains(sst);
|
||||
}
|
||||
|
||||
bool sstable_list::empty() const {
|
||||
return _version->size() == 0;
|
||||
}
|
||||
|
||||
const sstable_set_version& sstable_list::version() const {
|
||||
return *_version;
|
||||
}
|
||||
|
||||
// Builds a set on top of `impl`, which must not contain any sstables yet; a fresh sstable_list
// takes ownership of `impl` and `s`.
// Throws std::logic_error when `impl` is not empty.
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s) {
    const bool starts_empty = impl->empty();
    if (!starts_empty) {
        throw std::logic_error("Can't create an sstable_set using a non-empty sstable_set_impl");
    }
    _all = make_lw_shared<sstable_list>(std::move(impl), std::move(s));
}
|
||||
|
||||
sstable_set::sstable_set(const sstable_set& x)
|
||||
: _all(make_lw_shared<sstable_list>(*x._all)) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(sstable_set&& x) noexcept = default;
|
||||
|
||||
// Copy assignment: replaces the contents with a copy of `ssts` (copying an sstable_set creates a
// new sstable_list based on the source one).
// Self-assignment is a no-op, matching sstable_list::operator=; without the guard `x = x` would
// create and install a needless extra copy.
sstable_set& sstable_set::operator=(const sstable_set& ssts) {
    if (this != &ssts) {
        *this = sstable_set(ssts);
    }
    return *this;
}
|
||||
|
||||
sstable_set& sstable_set::operator=(sstable_set&& ssts) noexcept = default;
|
||||
|
||||
|
||||
std::vector<shared_sstable> sstable_set::select(const dht::partition_range& range) const {
|
||||
return _all->version().select(range);
|
||||
}
|
||||
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
return _all->version().select_sstable_runs(sstables);
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstable_list> sstable_set::all() const {
|
||||
return _all;
|
||||
}
|
||||
|
||||
void sstable_set::insert(shared_sstable sst) {
|
||||
_all->insert(sst);
|
||||
}
|
||||
|
||||
void sstable_set::erase(shared_sstable sst) {
|
||||
_all->erase(sst);
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector::~incremental_selector() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
|
||||
|
||||
sstable_set::incremental_selector::selection
|
||||
sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const {
|
||||
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s, lw_shared_ptr<sstable_list> sstl)
|
||||
: _impl(std::move(impl))
|
||||
, _cmp(s)
|
||||
, _sstl(std::move(sstl)) {
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector::selection sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const {
|
||||
if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) {
|
||||
std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos);
|
||||
std::vector<shared_sstable> current_versioned_sstables;
|
||||
std::tie(_current_range, current_versioned_sstables, _current_next_position) = _impl->select(pos);
|
||||
_current_sstables = boost::copy_range<std::vector<shared_sstable>>(current_versioned_sstables
|
||||
| boost::adaptors::filtered([this] (shared_sstable sst) { return _sstl->contains(sst); })
|
||||
| boost::adaptors::transformed([] (shared_sstable sst) { return sst; }));
|
||||
_current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); });
|
||||
}
|
||||
return {_current_sstables, _current_next_position};
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector
|
||||
sstable_set::make_incremental_selector() const {
|
||||
return incremental_selector(_impl->make_incremental_selector(), *_schema);
|
||||
sstables::sstable_set::incremental_selector sstable_set::make_incremental_selector() const {
|
||||
return incremental_selector(_all->version().make_incremental_selector(), *_all->version().get_schema(), _all);
|
||||
}
|
||||
|
||||
sstable_set_version::~sstable_set_version() {
|
||||
for (auto& added_sst : _added) {
|
||||
_base_set->remove(added_sst);
|
||||
}
|
||||
if (_prev) {
|
||||
_prev->_next.erase(this);
|
||||
}
|
||||
}
|
||||
|
||||
// the sstable_set_impl must be empty
|
||||
sstable_set_version::sstable_set_version(std::unique_ptr<sstable_set_impl> impl, schema_ptr schema)
|
||||
: _base_set(make_lw_shared<sstable_set_data>(std::move(impl)))
|
||||
, _schema(std::move(schema)) {
|
||||
}
|
||||
|
||||
// Creates a new version based on ver
|
||||
sstable_set_version::sstable_set_version(sstable_set_version* ver)
|
||||
: _base_set(ver->_base_set)
|
||||
, _schema(ver->_schema)
|
||||
, _prev(ver) {
|
||||
_prev->_next.insert(this);
|
||||
}
|
||||
|
||||
// Merges changes made in this version into the next version (can be called only when there is a single next version,
|
||||
// and no further changes can be made to this one, i.e. no sstable_list references this version).
|
||||
void sstable_set_version::merge_with_next() noexcept {
|
||||
auto next_version = *_next.begin();
|
||||
next_version->_added = std::move(_added);
|
||||
next_version->_erased = std::move(_erased);
|
||||
auto next_nh = _next.extract(*_next.begin());
|
||||
if (_prev) {
|
||||
_prev->_next.erase(this);
|
||||
_prev->_next.insert(std::move(next_nh));
|
||||
}
|
||||
next_version->_prev = std::move(_prev); // destroys this by overwriting the last reference
|
||||
}
|
||||
|
||||
void sstable_set_version::propagate_inserted_sstable(const shared_sstable& sst) noexcept {
|
||||
if (_reference_count > _next.size()) {
|
||||
// If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it
|
||||
return;
|
||||
}
|
||||
for (auto& ver_chck : _next) {
|
||||
if (!ver_chck->_added.contains(sst)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Remove the sstable from child versions and get a node handle to insert in this version
|
||||
auto sst_nh = (*_next.begin())->_added.extract(sst);
|
||||
for (auto& ver_chck : _next) {
|
||||
auto nh = ver_chck->_added.extract(sst_nh.value());
|
||||
if (!nh.empty()) {
|
||||
_base_set->remove(nh.value());
|
||||
}
|
||||
}
|
||||
auto it = _erased.find(sst_nh.value());
|
||||
if (it != _erased.end()) {
|
||||
// If the sstable was erased in this version and added in all its children, its as if it weren't added or inserted in any of them
|
||||
// because we won't read from this version anymore.
|
||||
_erased.erase(it);
|
||||
_base_set->remove(sst_nh.value());
|
||||
} else {
|
||||
auto added_it = _added.insert(std::move(sst_nh)).position;
|
||||
if (_prev) {
|
||||
_prev->propagate_inserted_sstable(*added_it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void sstable_set_version::propagate_erased_sstable(const shared_sstable& sst) noexcept {
|
||||
if (_reference_count > _next.size()) {
|
||||
// If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it
|
||||
return;
|
||||
}
|
||||
for (auto& ver_chck : _next) {
|
||||
if (!ver_chck->_erased.contains(sst)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Remove the sstable from child versions and get a node handle to insert in this version
|
||||
auto sst_nh = (*_next.begin())->_erased.extract(sst);
|
||||
for (auto& ver_chck : _next) {
|
||||
ver_chck->_erased.extract(sst_nh.value());
|
||||
}
|
||||
auto it = _added.find(sst_nh.value());
|
||||
if (it != _added.end()) {
|
||||
// If the sstable was added in this version and erased in all its children, its as if it weren't added or inserted in any of them
|
||||
// because we won't read from this version anymore.
|
||||
_added.erase(it);
|
||||
_base_set->remove(sst_nh.value());
|
||||
} else {
|
||||
auto erased_it = _erased.insert(std::move(sst_nh)).position;
|
||||
if (_prev) {
|
||||
_prev->propagate_erased_sstable(*erased_it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Called when a reference to the version gets removed - if the reference was from an sstable_list, it's the first time we can propagate any
|
||||
// changes, and if the reference was from another sstable_set_version, we want to check if there were any changes that were present in all
|
||||
// versions based on this one, but absent in the version that was just removed.
|
||||
void sstable_set_version::propagate_changes_from_next_versions() noexcept {
|
||||
if (_reference_count > _next.size() || _next.empty()) {
|
||||
// If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it
|
||||
// Or there are no child versions so there is nothing to propagate
|
||||
return;
|
||||
}
|
||||
sstable_set_version* next_ver = *_next.begin();
|
||||
// Propagate additions
|
||||
for (auto ver : _next) {
|
||||
if (ver->_added.size() < next_ver->_added.size()) {
|
||||
next_ver = ver;
|
||||
}
|
||||
}
|
||||
for (auto it = next_ver->_added.begin(); it != next_ver->_added.end();) {
|
||||
auto& sst = *it;
|
||||
it++;
|
||||
propagate_inserted_sstable(sst);
|
||||
}
|
||||
|
||||
next_ver = *_next.begin();
|
||||
// Propagate erasures
|
||||
for (auto ver : _next) {
|
||||
if (ver->_erased.size() < next_ver->_erased.size()) {
|
||||
next_ver = ver;
|
||||
}
|
||||
}
|
||||
for (auto it = next_ver->_erased.begin(); it != next_ver->_erased.end();) {
|
||||
auto& sst = *it;
|
||||
it++;
|
||||
propagate_erased_sstable(sst);
|
||||
}
|
||||
}
|
||||
|
||||
const sstable_set_version* sstable_set_version::get_previous_version() const {
|
||||
return _prev.get();
|
||||
}
|
||||
|
||||
bool sstable_set_version::can_merge_with_next() const noexcept {
|
||||
return _reference_count == 1 && _next.size() == 1;
|
||||
}
|
||||
|
||||
bool sstable_set_version::can_delete() const noexcept {
|
||||
return _reference_count == 0;
|
||||
}
|
||||
|
||||
void sstable_set_version::add_reference() noexcept {
|
||||
_reference_count++;
|
||||
}
|
||||
|
||||
void sstable_set_version::remove_reference() noexcept {
|
||||
_reference_count--;
|
||||
propagate_changes_from_next_versions();
|
||||
}
|
||||
|
||||
schema_ptr sstable_set_version::get_schema() const {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
std::vector<shared_sstable> sstable_set_version::select(const dht::partition_range& range) const {
|
||||
return boost::copy_range<std::vector<shared_sstable>>(_base_set->select(range)
|
||||
| boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); }));
|
||||
}
|
||||
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> sstable_set_version::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return sstable_run(boost::copy_range<std::unordered_set<shared_sstable>>(_base_set->select_by_run_id(run_id)
|
||||
| boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); })));
|
||||
}));
|
||||
}
|
||||
|
||||
const std::map<shared_sstable, unsigned>& sstable_set_version::all() const {
|
||||
return _base_set->sstables_and_times_added;
|
||||
}
|
||||
|
||||
// Provides strong exception guarantee
|
||||
sstable_set_version_reference sstable_set_version::insert(shared_sstable sst) {
|
||||
if (this->contains(sst)) {
|
||||
return get_reference_to_this();
|
||||
}
|
||||
if (_next.size()) {
|
||||
auto sstvr = get_reference_to_new_copy();
|
||||
// The new version has no copies based on it, so inserting into it doesn't create another version
|
||||
return sstvr->insert(sst);
|
||||
}
|
||||
auto it = _erased.find(sst);
|
||||
if (it != _erased.end()) {
|
||||
_erased.erase(it);
|
||||
} else {
|
||||
_base_set->insert(sst);
|
||||
try {
|
||||
_added.insert(sst);
|
||||
if (_prev) {
|
||||
_prev->propagate_inserted_sstable(sst);
|
||||
}
|
||||
} catch (...) {
|
||||
_base_set->remove(sst);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
return get_reference_to_this();
|
||||
}
|
||||
|
||||
// Provides strong exception guarantee
|
||||
sstable_set_version_reference sstable_set_version::erase(shared_sstable sst) {
|
||||
if (!this->contains(sst)) {
|
||||
return get_reference_to_this();
|
||||
}
|
||||
if (_next.size()) {
|
||||
auto sstvr = get_reference_to_new_copy();
|
||||
// The new version has no copies based on it, so erasing from it doesn't create another version
|
||||
return sstvr->erase(sst);
|
||||
}
|
||||
auto it = _added.find(sst);
|
||||
if (it != _added.end()) {
|
||||
_added.erase(it);
|
||||
_base_set->remove(sst);
|
||||
} else {
|
||||
_erased.insert(sst);
|
||||
if (_prev) {
|
||||
_prev->propagate_erased_sstable(sst);
|
||||
}
|
||||
}
|
||||
return get_reference_to_this();
|
||||
}
|
||||
|
||||
bool sstable_set_version::contains(shared_sstable sst) const {
|
||||
return _added.contains(sst) || (!_erased.contains(sst) && _prev && _prev->contains(sst));
|
||||
}
|
||||
|
||||
size_t sstable_set_version::size() const {
|
||||
return _added.size() - _erased.size() + (_prev ? _prev->size() : 0);
|
||||
}
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> sstable_set_version::make_incremental_selector() const {
|
||||
return _base_set->impl->make_incremental_selector();
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
sstable_set_version::create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
return _base_set->impl->create_single_key_sstable_reader(cf, std::move(schema),
|
||||
std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
sstable_set_version_reference sstable_set_version::get_reference_to_this() {
|
||||
return sstable_set_version_reference(this);
|
||||
}
|
||||
|
||||
sstable_set_version_reference sstable_set_version::get_reference_to_new_copy() {
|
||||
return sstable_set_version_reference(new sstable_set_version(this));
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> bag_sstable_set::clone() const {
|
||||
@@ -171,6 +604,10 @@ void bag_sstable_set::erase(shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool bag_sstable_set::empty() const {
|
||||
return _sstables.empty();
|
||||
}
|
||||
|
||||
class bag_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
const std::vector<shared_sstable>& _sstables;
|
||||
public:
|
||||
@@ -299,6 +736,10 @@ void partitioned_sstable_set::erase(shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool partitioned_sstable_set::empty() const {
|
||||
return _unleveled_sstables.empty() && _leveled_sstables.empty();
|
||||
}
|
||||
|
||||
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
schema_ptr _schema;
|
||||
const std::vector<shared_sstable>& _unleveled_sstables;
|
||||
@@ -396,6 +837,10 @@ void time_series_sstable_set::erase(shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool time_series_sstable_set::empty() const {
|
||||
return _sstables->empty();
|
||||
}
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> time_series_sstable_set::make_incremental_selector() const {
|
||||
struct selector : public incremental_selector_impl {
|
||||
const time_series_sstable_set& _set;
|
||||
@@ -535,16 +980,15 @@ std::unique_ptr<sstable_set_impl> time_window_compaction_strategy::make_sstable_
|
||||
return std::make_unique<time_series_sstable_set>(std::move(schema));
|
||||
}
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata) {
|
||||
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) {
|
||||
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema);
|
||||
}
|
||||
|
||||
sstable_set
|
||||
compaction_strategy::make_sstable_set(schema_ptr schema) const {
|
||||
return sstable_set(
|
||||
_compaction_strategy_impl->make_sstable_set(schema),
|
||||
schema,
|
||||
make_lw_shared<sstable_list>());
|
||||
schema);
|
||||
}
|
||||
|
||||
using sstable_reader_factory_type = std::function<flat_mutation_reader(shared_sstable&, const dht::partition_range& pr)>;
|
||||
@@ -830,7 +1274,7 @@ sstable_set::create_single_key_sstable_reader(
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
assert(pr.is_singular() && pr.start()->value().has_key());
|
||||
return _impl->create_single_key_sstable_reader(cf, std::move(schema),
|
||||
return _all->version().create_single_key_sstable_reader(cf, std::move(schema),
|
||||
std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "shared_sstable.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <vector>
|
||||
@@ -34,31 +35,158 @@ class estimated_histogram;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class sstable_set_impl;
|
||||
// This is an implementation of a set of sstables. The sstable_set allows:
|
||||
// - selecting sstables from a given partition_range
|
||||
// - selecting sstables with the same run identifiers
|
||||
// - creating incremental_selectors, used to incrementally select sstables from the set using ring-position
|
||||
// - many utilities of the std::set (inserting, erasing, checking membership, checking emptiness, iterating)
|
||||
// - creating copies
|
||||
|
||||
// The main use of the sstable_set is in the table class. The sstable_set that is stored there needs to be copied
|
||||
// every time it is modified, to allow existing sstable readers to continue using the old version. For this reason
|
||||
// the sstable_set is implemented in a way that allows fast copying.
|
||||
|
||||
// This is achieved by associating each copy of an sstable_set with an sstable_set_version which contains all changes
|
||||
// made to that copy. Each sstable_set_version has a reference to the sstable_set_version associated with the sstable_set
|
||||
// that was copied. We say that the copied version is a parent, and that the copy version is a child, or that it is "based"
|
||||
// on the copied version.
|
||||
// This allows easy checking if an sstable is an element of an sstable_set copy - the answer is yes if the sstable was added
|
||||
// in this copy or if it wasn't erased in it but it was an element of the parent version.
|
||||
// It's worth adding that this solution makes a copied sstable_set immutable. To support modifying a copied sstable_set anyway,
|
||||
// any modification applied to it replaces its associated sstable_set_version with a new one, based on the immutable version.
|
||||
// The new version hasn't been copied, so it can be modified.
|
||||
|
||||
// With the ability to check whether an sstable is an element of an sstable_set, all the methods that select sstables
|
||||
// from the set can follow the same rule: get results that include all sstables from any copy, and filter out those sstables
|
||||
// that aren't elements of the current copy. These results are obtained from the sstable_set_data structure.
|
||||
|
||||
// This solution requires a special way of finding out when an sstable should be removed from sstable_set_data. We achieve this
|
||||
// by counting, for each sstable, how many different versions it has been added in. If that number is zero - the sstable should
|
||||
// be completely removed. This number is decreased when deleting versions, when erasing in the same version it was added, and
|
||||
// in one other case, as a result of propagating a change from a version's children to their parent, explained further in the following
|
||||
// paragraphs.
|
||||
|
||||
// With this approach, the length of the longest chain of sstable_set_versions that are based on one another should be as
|
||||
// short as possible. To achieve that, we are merging pairs of versions. If a version is not referenced by any sstable_sets or
|
||||
// sstable_lists, we know that it won't be modified or read from, so we can merge its changes into versions that are based on it.
|
||||
// We don't want to copy these changes though, so we wait with merging until there is only one child version.
|
||||
|
||||
// This waiting causes one more problem: an sstable may have been added in a version that has no reference from an sstable_set or
|
||||
// an sstable_list, and it may have been erased in all its child versions. In such scenario, the sstable can't be selected from
|
||||
// any version, so it should be removed from the set completely. To handle such situations, if a change has been made to an
|
||||
// sstable in all children of some version that has no reference from an sstable_set or sstable_list, the change is removed
|
||||
// in all children and added to the parent instead.
|
||||
// This solution guarantees that when a version is ready for merging, the version it is being merged into contains no changes,
|
||||
// because if it had any, they would be propagated to this version instead. As a result, merging versions is simply reassigning
|
||||
// sets of changes. The number of times an sstable change gets propagated into a parent version is limited by the number of ancestors
|
||||
// of a version.
|
||||
|
||||
class incremental_selector_impl;
|
||||
class sstable_set_impl;
|
||||
|
||||
// Structure holds all sstables (a.k.a. fragments) that belong to same run identifier, which is an UUID.
|
||||
// SStables in that same run will not overlap with one another.
|
||||
class sstable_run {
|
||||
sstable_list _all;
|
||||
std::unordered_set<shared_sstable> _all;
|
||||
public:
|
||||
sstable_run() = default;
|
||||
sstable_run(std::unordered_set<shared_sstable> all) : _all(std::move(all)) {}
|
||||
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
// Data size of the whole run, meaning it's a sum of the data size of all its fragments.
|
||||
uint64_t data_size() const;
|
||||
const sstable_list& all() const { return _all; }
|
||||
const std::unordered_set<shared_sstable>& all() const { return _all; }
|
||||
};
|
||||
|
||||
// The very base class of an sstable_set. Stores all sstables that were added in
|
||||
// any version of the set. Its methods return supersets of values returned by
|
||||
// any single version.
|
||||
struct sstable_set_data {
|
||||
// Owned sstable_set_impl instance.
// NOTE(review): presumably the container that insert/erase/select delegate to - confirm in the implementation file.
std::unique_ptr<sstable_set_impl> impl;
|
||||
// Sstables grouped under a UUID key.
// NOTE(review): presumably keyed by run identifier, given select_by_run_id() below - confirm.
std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>> all_runs;
|
||||
// For each sstable, stores the number of different versions it has been inserted in
|
||||
std::map<shared_sstable, unsigned> sstables_and_times_added;
|
||||
|
||||
// Takes ownership of the given sstable_set_impl.
sstable_set_data(std::unique_ptr<sstable_set_impl> impl);
|
||||
// NOTE(review): this header declares insert(), erase() and remove() without showing how they differ.
// Per the design comment at the top of this header, an sstable is dropped completely once its
// "times added" counter reaches zero - confirm which of erase()/remove() performs that step.
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
// Per the comment above this struct, the results are a superset of what any single version would return;
// callers are expected to filter by version.
std::vector<shared_sstable> select(const dht::partition_range& range) const;
|
||||
std::unordered_set<shared_sstable> select_by_run_id(utils::UUID run_id) const;
|
||||
void remove(shared_sstable sst);
|
||||
};
|
||||
|
||||
class sstable_set_version;
|
||||
|
||||
// Manages a pointer to an sstable_set_version - the sstable_set_version gets removed when all
|
||||
// sstable_set_version_references are removed, or when there is only one sstable_set_version_reference
|
||||
// and that reference is owned by another sstable_set_version.
|
||||
// In the second case the data from the managed version is merged into the other version before removal.
|
||||
class sstable_set_version_reference {
|
||||
// Raw pointer to the managed version; nullptr for a default-constructed reference.
// Declared mutable, so const member functions may change it.
mutable sstable_set_version* _p = nullptr;
|
||||
|
||||
// Private: only this class and its friend sstable_set_version can wrap a raw pointer.
explicit sstable_set_version_reference(sstable_set_version* p);
|
||||
public:
|
||||
~sstable_set_version_reference();
|
||||
// Creates an empty reference (_p == nullptr).
sstable_set_version_reference() = default;
|
||||
sstable_set_version_reference(const sstable_set_version_reference& ref);
|
||||
sstable_set_version_reference(sstable_set_version_reference&&) noexcept;
|
||||
sstable_set_version_reference& operator=(const sstable_set_version_reference& x);
|
||||
sstable_set_version_reference& operator=(sstable_set_version_reference&&) noexcept;
|
||||
// Dereferences _p without a null check; callers must ensure the reference is non-empty.
sstable_set_version& operator*() const noexcept { return *_p; }
|
||||
sstable_set_version* operator->() const noexcept { return _p; }
|
||||
// Returns the raw pointer (may be nullptr).
sstable_set_version* get() const noexcept { return _p; }
|
||||
// True when a version is referenced.
explicit operator bool() const noexcept { return bool(_p); }
|
||||
friend class sstable_set_version;
|
||||
};
|
||||
|
||||
// The data storage for an sstable_set. Can be used like a std::set<shared_sstable> (although with slightly
|
||||
// costlier operations).
|
||||
class sstable_list {
|
||||
// The version of the set that this list exposes.
sstable_set_version_reference _version;
|
||||
public:
|
||||
sstable_list(std::unique_ptr<sstable_set_impl> impl, schema_ptr s);
|
||||
// NOTE(review): the tests in this change expect a copy to get its own version whose previous
// version is the source's version - confirm against the implementation.
sstable_list(const sstable_list& sstl);
|
||||
sstable_list(sstable_list&& sstl) noexcept;
|
||||
sstable_list& operator=(const sstable_list& sstl);
|
||||
sstable_list& operator=(sstable_list&& sstl) noexcept;
|
||||
public:
|
||||
// Read-only forward iterator over the sstables of this list.
class const_iterator {
|
||||
public:
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using value_type = const shared_sstable;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using pointer = const shared_sstable*;
|
||||
using reference = const shared_sstable&;
|
||||
private:
|
||||
// Position in a std::map<shared_sstable, unsigned>, the same type that sstable_set_version::all() returns.
std::map<shared_sstable, unsigned>::const_iterator _it;
|
||||
// Non-owning pointer to the version reference used while iterating.
const sstable_set_version_reference* _ver;
|
||||
// NOTE(review): presumably skips map entries that are not elements of *_ver - confirm in the .cc.
void advance();
|
||||
public:
|
||||
const_iterator(std::map<shared_sstable, unsigned>::const_iterator it, const sstable_set_version_reference* ver);
|
||||
const_iterator& operator++();
|
||||
const_iterator operator++(int);
|
||||
const shared_sstable& operator*() const;
|
||||
bool operator==(const const_iterator& it) const;
|
||||
};
|
||||
// Only const iteration is offered; `iterator` is an alias of const_iterator.
using iterator = const_iterator;
|
||||
const_iterator begin() const;
|
||||
const_iterator end() const;
|
||||
|
||||
size_t size() const;
|
||||
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
bool contains(shared_sstable sst) const;
|
||||
bool empty() const;
|
||||
// Returns the version currently referenced by this list.
const sstable_set_version& version() const;
|
||||
};
|
||||
|
||||
// A set of sstables associated with a table.
|
||||
class sstable_set : public enable_lw_shared_from_this<sstable_set> {
|
||||
std::unique_ptr<sstable_set_impl> _impl;
|
||||
schema_ptr _schema;
|
||||
// used to support column_family::get_sstable(), which wants to return an sstable_list
|
||||
// that has a reference somewhere
|
||||
lw_shared_ptr<sstable_list> _all;
|
||||
std::unordered_map<utils::UUID, sstable_run> _all_runs;
|
||||
public:
|
||||
~sstable_set();
|
||||
sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all);
|
||||
sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s);
|
||||
sstable_set(const sstable_set&);
|
||||
sstable_set(sstable_set&&) noexcept;
|
||||
sstable_set& operator=(const sstable_set&);
|
||||
@@ -66,7 +194,7 @@ public:
|
||||
std::vector<shared_sstable> select(const dht::partition_range& range) const;
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const;
|
||||
lw_shared_ptr<sstable_list> all() const { return _all; }
|
||||
lw_shared_ptr<const sstable_list> all() const;
|
||||
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
|
||||
@@ -79,9 +207,10 @@ public:
|
||||
mutable std::optional<nonwrapping_range<dht::ring_position_view>> _current_range_view;
|
||||
mutable std::vector<shared_sstable> _current_sstables;
|
||||
mutable dht::ring_position_view _current_next_position = dht::ring_position_view::min();
|
||||
lw_shared_ptr<sstable_list> _sstl;
|
||||
public:
|
||||
~incremental_selector();
|
||||
incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s);
|
||||
incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s, lw_shared_ptr<sstable_list> sstl);
|
||||
incremental_selector(incremental_selector&&) noexcept;
|
||||
|
||||
struct selection {
|
||||
@@ -173,7 +302,59 @@ flat_mutation_reader make_restricted_range_sstable_reader(
|
||||
mutation_reader::forwarding,
|
||||
read_monitor_generator& rmg = default_read_monitor_generator());
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata = true);
|
||||
class sstable_set_version {
|
||||
// shared by all sstable_set_versions that were based on the same original set
|
||||
lw_shared_ptr<sstable_set_data> _base_set;
|
||||
schema_ptr _schema;
|
||||
// NOTE(review): presumably the parent version this one is based on (what get_previous_version() exposes) - confirm.
sstable_set_version_reference _prev;
|
||||
// Raw pointers to other versions; mutable, so it can be updated through const access.
// NOTE(review): presumably the child versions based on this one - confirm.
mutable std::unordered_set<sstable_set_version*> _next;
|
||||
// NOTE(review): per the design comment at the top of this header each version records the changes made
// to its copy; presumably _added/_erased are the sstables added/erased in this version - confirm.
std::unordered_set<shared_sstable> _added;
|
||||
std::unordered_set<shared_sstable> _erased;
|
||||
// is equal to the number of sstable_set_versions based on this version increased by one if there is
|
||||
// an sstable_list that references this version
|
||||
unsigned _reference_count = 0;
|
||||
public:
|
||||
~sstable_set_version();
|
||||
// Builds a version from an sstable_set_impl and a schema.
// NOTE(review): presumably the root version that creates the shared sstable_set_data - confirm.
sstable_set_version(std::unique_ptr<sstable_set_impl> impl, schema_ptr schema);
|
||||
// Builds a version from a pointer to another version.
// NOTE(review): presumably a child based on `ver` - confirm.
explicit sstable_set_version(sstable_set_version* ver);
|
||||
private:
|
||||
// Propagation helpers; all declared noexcept. See the design comment at the top of this header for
// the rule under which a change made in all children is moved to their parent.
void propagate_inserted_sstable(const shared_sstable& sst) noexcept;
|
||||
void propagate_erased_sstable(const shared_sstable& sst) noexcept;
|
||||
void propagate_changes_from_next_versions() noexcept;
|
||||
public:
|
||||
// Returns a pointer to the previous version; the tests in this change expect nullptr when there is none.
const sstable_set_version* get_previous_version() const;
|
||||
// Version life-cycle management; all declared noexcept.
bool can_merge_with_next() const noexcept;
|
||||
void merge_with_next() noexcept;
|
||||
bool can_delete() const noexcept;
|
||||
void add_reference() noexcept;
|
||||
void remove_reference() noexcept;
|
||||
schema_ptr get_schema() const;
|
||||
std::vector<shared_sstable> select(const dht::partition_range& range) const;
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const;
|
||||
// Same map type as sstable_set_data::sstables_and_times_added.
// NOTE(review): presumably that very map, i.e. sstables of all versions, not only this one - confirm.
const std::map<shared_sstable, unsigned>& all() const;
|
||||
// Both modifiers return a version reference.
// NOTE(review): per the design comment, modifying a copied (immutable) version replaces it with a new one;
// presumably the returned reference is the version that now holds the change - confirm.
sstable_set_version_reference insert(shared_sstable sst);
|
||||
sstable_set_version_reference erase(shared_sstable sst);
|
||||
bool contains(shared_sstable sst) const;
|
||||
size_t size() const;
|
||||
std::unique_ptr<incremental_selector_impl> make_incremental_selector() const;
|
||||
flat_mutation_reader create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const;
|
||||
public:
|
||||
// Factory helpers returning references to this version or to a new copy of it.
sstable_set_version_reference get_reference_to_this();
|
||||
sstable_set_version_reference get_reference_to_new_copy();
|
||||
};
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true);
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run);
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const = 0;
|
||||
virtual void insert(shared_sstable sst) = 0;
|
||||
virtual void erase(shared_sstable sst) = 0;
|
||||
virtual bool empty() const = 0;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const = 0;
|
||||
|
||||
virtual flat_mutation_reader create_single_key_sstable_reader(
|
||||
@@ -65,6 +66,7 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual bool empty() const override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
@@ -103,6 +105,7 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual bool empty() const override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
@@ -124,6 +127,7 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual bool empty() const override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
|
||||
std::unique_ptr<position_reader_queue> make_min_position_reader_queue(
|
||||
|
||||
20
table.cc
20
table.cc
@@ -718,21 +718,17 @@ void table::rebuild_statistics() {
|
||||
future<lw_shared_ptr<sstables::sstable_set>>
|
||||
table::build_new_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables) {
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema);
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
|
||||
|
||||
// this might seem dangerous, but "move" here just avoids constness,
|
||||
// making the two ranges compatible when compiling with boost 1.55.
|
||||
// Noone is actually moving anything...
|
||||
for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables->all()))) {
|
||||
if (!s.contains(tab)) {
|
||||
new_sstable_list.insert(tab);
|
||||
}
|
||||
auto new_sstable_set = *_sstables;
|
||||
for (auto& tab : new_sstables) {
|
||||
new_sstable_set.insert(tab);
|
||||
co_await make_ready_future<>(); // yield if needed.
|
||||
}
|
||||
co_return make_lw_shared<sstables::sstable_set>(std::move(new_sstable_list));
|
||||
for (auto& tab : old_sstables) {
|
||||
new_sstable_set.erase(tab);
|
||||
co_await make_ready_future<>(); // yield if needed.
|
||||
}
|
||||
co_return make_lw_shared<sstables::sstable_set>(std::move(new_sstable_set));
|
||||
}
|
||||
|
||||
// Note: must run in a seastar thread
|
||||
|
||||
638
test/boost/sstable_set_test.cc
Normal file
638
test/boost/sstable_set_test.cc
Normal file
@@ -0,0 +1,638 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <random>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/align.hh>
|
||||
#include <seastar/core/aligned_buffer.hh>
|
||||
#include "sstables/compaction.hh"
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "test/boost/sstable_test.hh"
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include "sstables/compaction_strategy_impl.hh"
|
||||
#include "sstables/date_tiered_compaction_strategy.hh"
|
||||
#include "sstables/time_window_compaction_strategy.hh"
|
||||
#include "sstables/leveled_compaction_strategy.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "sstables/sstable_set_impl.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
static const sstring some_keyspace("ks");
|
||||
static const sstring some_column_family("cf");
|
||||
|
||||
// Test helper: creates an sstable of generation `gen` through the test environment and stamps it with
// the given first/last keys and level via the sstables::test accessor.
static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) {
    auto result = env.make_sstable(schema, "", gen, la, big);
    auto accessor = sstables::test(result);
    accessor.set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key));
    return result;
}
|
||||
|
||||
// Builds 20 successive snapshots of an sstable_set (each one: insert 10 sstables, erase 5, then copy),
// remembering for every snapshot what select(), select_sstable_runs() and all() are expected to return.
// The snapshots are then verified and released in a shuffled order, to check that dropping one
// snapshot does not affect the results of the others.
SEASTAR_TEST_CASE(simple_versioned_sstable_set_test) {
    return test_env::do_with([] (test_env& env) {
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto key_and_token_pair = token_generation_for_current_shard(8);
        auto decorated_keys = boost::copy_range<std::vector<dht::decorated_key>>(
            key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair<sstring, dht::token>& key_and_token) {
                auto value = bytes(reinterpret_cast<const signed char*>(key_and_token.first.data()), key_and_token.first.size());
                auto pk = sstables::key::from_bytes(value).to_partition_key(*s);
                return dht::decorate_key(*s, std::move(pk));
            }));
        // A snapshot of the set together with the results expected from it.
        struct snapshot_and_selections {
            std::optional<sstable_set> ssts;
            std::vector<std::unordered_set<shared_sstable>> selections;
            std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>> runs;
            std::unordered_set<shared_sstable> all;
            snapshot_and_selections(sstable_set&& ssts, const std::vector<std::unordered_set<shared_sstable>>& selections,
                    const std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>>& runs, const std::unordered_set<shared_sstable>& all)
                : ssts(std::move(ssts)), selections(selections), runs(runs), all(all) { }
        };
        auto check = [&decorated_keys] (snapshot_and_selections& version) {
            // Single-key selections: same size and every selected sstable is expected.
            for (int j = 0; j < 8; j++) {
                auto sel = version.ssts->select(dht::partition_range::make_singular(decorated_keys[j]));
                BOOST_REQUIRE_EQUAL(sel.size(), version.selections[j].size());
                for (auto& sst : sel) {
                    BOOST_REQUIRE(version.selections[j].contains(sst));
                }
            }
            for (auto& [uuid, run] : version.runs) {
                if (run.empty()) {
                    continue;
                }
                std::vector<sstable_run> runs = version.ssts->select_sstable_runs({*run.begin()});
                // only one sstable -> only one run
                // Guard the runs[0] accesses below: indexing an empty vector would be undefined behaviour
                // instead of a test failure.
                BOOST_REQUIRE(!runs.empty());
                BOOST_REQUIRE_EQUAL(runs[0].all().size(), run.size());
                for (auto& sst : runs[0].all()) {
                    BOOST_REQUIRE(run.contains(sst));
                }
            }
            BOOST_REQUIRE_EQUAL(version.ssts->all()->size(), version.all.size());
            for (auto& sst : *version.ssts->all()) {
                BOOST_REQUIRE(version.all.contains(sst));
            }
        };
        // check that selecting from older snapshots of an sstable_set gives correct results.
        {
            auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
            std::optional<sstable_set> set = cs.make_sstable_set(s);
            std::vector<std::unordered_set<shared_sstable>> current_selections(8);
            std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>> current_runs;
            std::unordered_set<shared_sstable> current_all;

            std::vector<snapshot_and_selections> versions;
            versions.reserve(20);
            int token = 0;
            for (int i = 0; i < 20; i++) {
                for (int j = 0; j < 10; j++) {
                    auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1);
                    set->insert(sst);
                    current_selections[token].insert(sst);
                    current_runs[sst->run_identifier()].insert(sst);
                    current_all.insert(sst);
                    // cycle through the 8 generated keys
                    token = (token + 1) % 8;
                }
                for (int j = 0; j < 5; j++) {
                    auto sst = *set->all()->begin();
                    set->erase(sst);
                    for (auto& sel : current_selections) {
                        // actually erases only from one
                        sel.erase(sst);
                    }
                    current_runs[sst->run_identifier()].erase(sst);
                    current_all.erase(sst);
                }
                // Store the snapshot, then continue working on a copy of it.
                versions.emplace_back(std::move(*set), current_selections, current_runs, current_all);
                set = versions.back().ssts;
            }

            for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) {
                check(versions[i]);
                // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots
                versions[i].ssts.reset();
            }
        }
        return make_ready_future<>();
    });
}
|
||||
|
||||
// Builds 20 successive snapshots of an sstable_list (each one: insert 10 sstables, erase 5, then copy),
// remembering the expected contents of every snapshot. The snapshots are then verified and released
// in a shuffled order, to check that dropping one snapshot does not affect the others.
SEASTAR_TEST_CASE(sstable_list_test) {
    return test_env::do_with([] (test_env& env) {
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto key_and_token_pair = token_generation_for_current_shard(8);

        // A snapshot of the list together with the sstables it is expected to contain.
        struct list_and_sstables {
            std::optional<sstable_list> list;
            std::unordered_set<shared_sstable> sstables_in_list;

            list_and_sstables(sstable_list&& sstl, std::unordered_set<shared_sstable> sstables_in_list)
                : list(std::move(sstl)), sstables_in_list(std::move(sstables_in_list)) { }
        };
        auto check = [] (list_and_sstables& version) {
            BOOST_REQUIRE_EQUAL(version.list->size(), version.sstables_in_list.size());
            for (auto& sst : *version.list) {
                // Compare against the independently tracked expectation. Asking the list whether it
                // contains an element obtained by iterating that same list could never fail.
                BOOST_REQUIRE(version.sstables_in_list.contains(sst));
            }
        };

        {
            auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
            std::optional<sstable_set> set = cs.make_sstable_set(s);
            std::optional<sstable_list> l = *set->all();
            std::unordered_set<shared_sstable> sstables_in_list;

            std::vector<list_and_sstables> versions;
            versions.reserve(20);
            int token = 0;
            for (int i = 0; i < 20; i++) {
                for (int j = 0; j < 10; j++) {
                    auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1);
                    l->insert(sst);
                    sstables_in_list.insert(sst);
                    // cycle through the 8 generated keys
                    token = (token + 1) % 8;
                }
                for (int j = 0; j < 5; j++) {
                    auto sst = *l->begin();
                    l->erase(sst);
                    sstables_in_list.erase(sst);
                }
                // Store the snapshot, then continue working on a copy of it.
                versions.emplace_back(std::move(*l), sstables_in_list);
                l = versions.back().list;
            }

            for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) {
                check(versions[i]);
                // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots
                versions[i].list.reset();
            }
        }
        return make_ready_future<>();
    });
}
|
||||
|
||||
// Follows the chain of versions behind one sstable_set and several sstable_list copies while copies
// are created, modified and destroyed. After every step the expected parent/child layout (drawn in
// the comments) is verified through get_previous_version().
SEASTAR_TEST_CASE(sstable_set_version_merging_test) {
    return test_env::do_with([] (test_env& env) {
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto strategy = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
        auto keys = token_generation_for_current_shard(2);
        // Shorthand: the version that the given list's version is based on.
        auto parent_of = [] (const std::optional<sstable_list>& l) {
            return l->version().get_previous_version();
        };

        std::optional<sstable_set> set = strategy.make_sstable_set(s);
        std::optional<sstable_list> list = *set->all();
        // set -> list
        BOOST_REQUIRE_EQUAL(&set->all()->version(), parent_of(list));

        std::optional<sstable_list> list2 = *set->all();
        // set -> list
        //     -> list2
        BOOST_REQUIRE_EQUAL(&set->all()->version(), parent_of(list));
        BOOST_REQUIRE_EQUAL(&set->all()->version(), parent_of(list2));

        auto sst = sstable_for_overlapping_test(env, s, 1, keys[0].first, keys[0].first, 1);
        set->insert(sst);
        // set' -> list
        //      -> list2
        //      -> set
        BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), parent_of(list));
        BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), parent_of(list2));
        BOOST_REQUIRE_NE(&set->all()->version(), parent_of(list));

        sst = sstable_for_overlapping_test(env, s, 1, keys[1].first, keys[1].first, 1);
        set->insert(sst);
        // set' -> list
        //      -> list2
        //      -> set
        BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), parent_of(list));
        BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), parent_of(list2));
        BOOST_REQUIRE_NE(&set->all()->version(), parent_of(list));

        std::optional<sstable_list> list3 = list;
        // set' -> list -> list3
        //      -> list2
        //      -> set
        BOOST_REQUIRE_EQUAL(&list->version(), parent_of(list3));

        list.reset();
        // set' -> list3
        //      -> list2
        //      -> set
        BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), parent_of(list3));
        BOOST_REQUIRE_EQUAL(parent_of(list2), parent_of(list3));

        set.reset();
        // set' -> list3
        //      -> list2
        BOOST_REQUIRE_EQUAL(parent_of(list2), parent_of(list3));

        std::optional<sstable_list> list4 = list3;
        std::optional<sstable_list> list5 = list3;
        // set' -> list3 -> list4
        //               -> list5
        //      -> list2
        BOOST_REQUIRE_EQUAL(&list3->version(), parent_of(list4));
        BOOST_REQUIRE_EQUAL(&list3->version(), parent_of(list5));

        list3.reset();
        // set' -> list3' -> list4
        //                -> list5
        //      -> list2
        BOOST_REQUIRE_NE(parent_of(list2), parent_of(list4));
        BOOST_REQUIRE_EQUAL(parent_of(list4), parent_of(list5));

        list4.reset();
        // set' -> list5
        //      -> list2
        BOOST_REQUIRE_EQUAL(parent_of(list2), parent_of(list5));

        list2.reset();
        // list5
        BOOST_REQUIRE_EQUAL(nullptr, parent_of(list5));
        return make_ready_future<>();
    });
}
|
||||
|
||||
// Three scenarios in which an sstable is inserted into a set, the set is copied into several
// descendants, the original is dropped, and the sstable is then erased from every remaining
// descendant. In each scenario the sstable's on-closed handler must have fired once the test's own
// pointer to it is cleared.
SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_erased_in_last_descendant_test) {
    return test_env::do_with([] (test_env& env) {
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto strategy = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
        auto keys = token_generation_for_current_shard(1);
        {
            std::optional<sstable_set> set = strategy.make_sstable_set(s);
            auto sst = sstable_for_overlapping_test(env, s, 1, keys[0].first, keys[0].first, 1);
            bool closed = false;
            // The observer object is kept in scope for the whole scenario.
            utils::observer<sstable&> close_watch = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set->insert(sst);
            std::optional<sstable_set> set2 = set;
            std::optional<sstable_set> set3 = set;
            // set -> set2
            //     -> set3
            set.reset();
            set2->erase(sst);
            set3->erase(sst);
            sst = nullptr;
            BOOST_REQUIRE(closed);
        }
        {
            std::optional<sstable_set> set = strategy.make_sstable_set(s);
            auto sst = sstable_for_overlapping_test(env, s, 1, keys[0].first, keys[0].first, 1);
            bool closed = false;
            utils::observer<sstable&> close_watch = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set->insert(sst);
            std::optional<sstable_set> set2 = set;
            std::optional<sstable_set> set3 = set;
            std::optional<sstable_set> set4 = set;
            // set -> set2
            //     -> set3
            //     -> set4
            set.reset();
            set2->erase(sst);
            set3.reset();
            set4->erase(sst);
            sst = nullptr;
            BOOST_REQUIRE(closed);
        }
        {
            std::optional<sstable_set> set = strategy.make_sstable_set(s);
            auto sst = sstable_for_overlapping_test(env, s, 1, keys[0].first, keys[0].first, 1);
            bool closed = false;
            utils::observer<sstable&> close_watch = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set->insert(sst);
            std::optional<sstable_set> set2 = set;
            std::optional<sstable_set> set3 = set;
            std::optional<sstable_set> set4 = set3;
            std::optional<sstable_set> set5 = set3;
            // set -> set2
            //     -> set3 -> set4
            //             -> set5
            set.reset();
            set2->erase(sst);
            set3.reset();
            set4->erase(sst);
            set5->erase(sst);
            sst = nullptr;
            BOOST_REQUIRE(closed);
        }
        return make_ready_future<>();
    });
}
|
||||
|
||||
// Each scenario inserts one sstable into a set, copies the set into a small tree
// of versions and erases the sstable from the leaf copies. The on-closed handler
// of the sstable must not fire while an ancestor version that still contains the
// sstable is alive, and must have fired once that ancestor is reset.
SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_ancestor_test) {
    return test_env::do_with([] (test_env& env) {
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
        auto key_and_token_pair = token_generation_for_current_shard(1);
        // Every scenario uses an sstable built from the same arguments.
        auto make_sst = [&] {
            return sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1);
        };
        {
            std::optional<sstable_set> v1 = cs.make_sstable_set(s);
            auto sst = make_sst();
            bool closed = false;
            utils::observer<sstable&> observer = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            v1->insert(sst);
            auto v2 = v1;
            // v1 -> v2
            v2->erase(sst);
            sst = nullptr;
            // v1 still holds the sstable
            BOOST_REQUIRE(!closed);
            v1.reset();
            BOOST_REQUIRE(closed);
        }
        {
            std::optional<sstable_set> v1 = cs.make_sstable_set(s);
            auto sst = make_sst();
            bool closed = false;
            utils::observer<sstable&> observer = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            v1->insert(sst);
            auto v2 = v1;
            auto v3 = v1;
            // v1 -> v2
            //    -> v3
            v2->erase(sst);
            v3->erase(sst);
            sst = nullptr;
            BOOST_REQUIRE(!closed);
            v1.reset();
            BOOST_REQUIRE(closed);
        }
        {
            std::optional<sstable_set> v1 = cs.make_sstable_set(s);
            auto sst = make_sst();
            bool closed = false;
            utils::observer<sstable&> observer = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            v1->insert(sst);
            auto v2 = v1;
            auto v3 = v1;
            auto v4 = v1;
            // v1 -> v2
            //    -> v3
            //    -> v4
            v2->erase(sst);
            v3.reset();
            v4->erase(sst);
            sst = nullptr;
            BOOST_REQUIRE(!closed);
            v1.reset();
            BOOST_REQUIRE(closed);
        }
        {
            std::optional<sstable_set> v1 = cs.make_sstable_set(s);
            auto sst = make_sst();
            bool closed = false;
            utils::observer<sstable&> observer = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            v1->insert(sst);
            auto v2 = v1;
            auto v3 = v1;
            auto v4 = v3;
            auto v5 = v3;
            // v1 -> v2
            //    -> v3 -> v4
            //          -> v5
            v1.reset();
            v2->erase(sst);
            v4->erase(sst);
            v5->erase(sst);
            sst = nullptr;
            // v3 is the last version that still contains the sstable
            BOOST_REQUIRE(!closed);
            v3.reset();
            BOOST_REQUIRE(closed);
        }
        return make_ready_future<>();
    });
}
|
||||
|
||||
|
||||
// Each scenario inserts one sstable into a set, copies the set into a small tree
// of versions, drops the original and erases the sstable from all but one of the
// remaining copies. The on-closed handler of the sstable must not fire while that
// last copy is alive, and must have fired once it is reset.
SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_descendant_test) {
    return test_env::do_with([] (test_env& env) {
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
        auto key_and_token_pair = token_generation_for_current_shard(1);
        // Every scenario uses an sstable built from the same arguments.
        auto make_sst = [&] {
            return sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1);
        };
        {
            std::optional<sstable_set> v1 = cs.make_sstable_set(s);
            auto sst = make_sst();
            bool closed = false;
            utils::observer<sstable&> observer = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            v1->insert(sst);
            auto v2 = v1;
            auto v3 = v1;
            // v1 -> v2
            //    -> v3
            v1.reset();
            v2->erase(sst);
            sst = nullptr;
            // v3 still holds the sstable
            BOOST_REQUIRE(!closed);
            v3.reset();
            BOOST_REQUIRE(closed);
        }
        {
            std::optional<sstable_set> v1 = cs.make_sstable_set(s);
            auto sst = make_sst();
            bool closed = false;
            utils::observer<sstable&> observer = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            v1->insert(sst);
            auto v2 = v1;
            auto v3 = v1;
            auto v4 = v1;
            // v1 -> v2
            //    -> v3
            //    -> v4
            v1.reset();
            v2->erase(sst);
            v3->erase(sst);
            sst = nullptr;
            // v4 still holds the sstable
            BOOST_REQUIRE(!closed);
            v4.reset();
            BOOST_REQUIRE(closed);
        }
        {
            std::optional<sstable_set> v1 = cs.make_sstable_set(s);
            auto sst = make_sst();
            bool closed = false;
            utils::observer<sstable&> observer = sst->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            v1->insert(sst);
            auto v2 = v1;
            auto v3 = v1;
            auto v4 = v3;
            auto v5 = v3;
            // v1 -> v2
            //    -> v3 -> v4
            //          -> v5
            v1.reset();
            v2->erase(sst);
            v3.reset();
            v4->erase(sst);
            sst = nullptr;
            // v5 still holds the sstable
            BOOST_REQUIRE(!closed);
            v5.reset();
            BOOST_REQUIRE(closed);
        }
        return make_ready_future<>();
    });
}
|
||||
|
||||
class simple_sstable_set {
|
||||
std::unique_ptr<sstable_set_impl> _impl;
|
||||
schema_ptr _schema;
|
||||
// used to support column_family::get_sstable(), which wants to return an sstable_list
|
||||
// that has a reference somewhere
|
||||
lw_shared_ptr<std::unordered_set<shared_sstable>> _all;
|
||||
std::unordered_map<utils::UUID, sstable_run> _all_runs;
|
||||
public:
|
||||
~simple_sstable_set() = default;
|
||||
|
||||
simple_sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr schema)
|
||||
: _impl(std::move(impl))
|
||||
, _schema(std::move(schema))
|
||||
, _all(make_lw_shared<std::unordered_set<shared_sstable>>()) {
|
||||
}
|
||||
|
||||
simple_sstable_set(const simple_sstable_set& x)
|
||||
: _impl(x._impl->clone())
|
||||
, _schema(x._schema)
|
||||
, _all(make_lw_shared<std::unordered_set<shared_sstable>>(*x._all))
|
||||
, _all_runs(x._all_runs) {
|
||||
}
|
||||
|
||||
simple_sstable_set(simple_sstable_set&& x) noexcept = default;
|
||||
|
||||
simple_sstable_set& operator=(const simple_sstable_set& x) {
|
||||
if (this != &x) {
|
||||
auto tmp = simple_sstable_set(x);
|
||||
*this = std::move(tmp);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
simple_sstable_set& operator=(simple_sstable_set&&) noexcept = default;
|
||||
|
||||
std::vector<shared_sstable> select(const dht::partition_range& range) const {
|
||||
return _impl->select(range);
|
||||
}
|
||||
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return _all_runs.at(run_id);
|
||||
}));
|
||||
}
|
||||
|
||||
lw_shared_ptr<std::unordered_set<shared_sstable>> all() const { return _all; }
|
||||
|
||||
void insert(shared_sstable sst) {
|
||||
_impl->insert(sst);
|
||||
_all->insert(sst);
|
||||
_all_runs[sst->run_identifier()].insert(sst);
|
||||
}
|
||||
|
||||
void erase(shared_sstable sst) {
|
||||
_impl->erase(sst);
|
||||
_all->erase(sst);
|
||||
_all_runs[sst->run_identifier()].erase(sst);
|
||||
}
|
||||
};
|
||||
|
||||
// Randomized differential test. A vector of sstable_set objects and a vector of
// simple_sstable_set reference models are driven through the same random
// sequence of operations (drop a set, copy a set, insert or erase an sstable),
// and after every step all pairs must hold the same number of sstables.
// The random engine is default-seeded, so the sequence is the same on every run.
SEASTAR_TEST_CASE(sstable_set_random_walk_test) {
    return test_env::do_with([] (test_env& env) {
        constexpr int nr_sstables = 1000;
        auto rand = std::default_random_engine();
        // 0 -> delete, 1 -> copy, 2..7 -> modify
        auto op_gen = std::uniform_int_distribution<unsigned>(0, 7);
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto leveled_cs = leveled_compaction_strategy(s->compaction_strategy_options());
        std::vector<sstable_set> sstable_sets;
        std::vector<simple_sstable_set> simple_sstable_sets;
        sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s);
        simple_sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s);
        auto key_and_token_pair = token_generation_for_current_shard(nr_sstables);
        std::vector<shared_sstable> sstables(nr_sstables);
        for (int i = 0; i < nr_sstables; i++) {
            sstables[i] = sstable_for_overlapping_test(env, s, i, key_and_token_pair[i].first, key_and_token_pair[i].first, i);
        }
        for (auto i = 0; i != 100000; i++) {
            auto op = op_gen(rand);
            auto u = std::uniform_int_distribution<size_t>(0, sstable_sets.size() - 1);
            auto idx = u(rand);
            switch (op) {
            case 0: {
                // delete
                if (sstable_sets.size() > 1) {
                    sstable_sets.erase(sstable_sets.begin() + idx);
                    simple_sstable_sets.erase(simple_sstable_sets.begin() + idx);
                    break;
                }
                // if we can't remove the version, let's create one
                [[fallthrough]];
            }
            case 1: {
                // copy
                if (sstable_sets.size() < 100) {
                    sstable_sets.emplace_back(sstable_sets[idx]);
                    simple_sstable_sets.emplace_back(simple_sstable_sets[idx]);
                    for (auto& sst : *simple_sstable_sets.back().all()) {
                        BOOST_REQUIRE(sstable_sets.back().all()->contains(sst));
                    }
                }
                break;
            }
            default: {
                // modify: toggle membership of a random sstable in the chosen set
                auto sst_u = std::uniform_int_distribution<size_t>(0, nr_sstables - 1);
                auto sst_idx = sst_u(rand);
                if (simple_sstable_sets[idx].all()->contains(sstables[sst_idx])) {
                    sstable_sets[idx].erase(sstables[sst_idx]);
                    simple_sstable_sets[idx].erase(sstables[sst_idx]);
                    // Both the set under test and the reference model must have dropped it.
                    BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx]));
                    BOOST_REQUIRE(!simple_sstable_sets[idx].all()->contains(sstables[sst_idx]));
                } else {
                    sstable_sets[idx].insert(sstables[sst_idx]);
                    simple_sstable_sets[idx].insert(sstables[sst_idx]);
                    // Both the set under test and the reference model must now contain it.
                    BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx]));
                    BOOST_REQUIRE(simple_sstable_sets[idx].all()->contains(sstables[sst_idx]));
                }
            }
            }
            for (size_t j = 0; j < sstable_sets.size(); j++) {
                BOOST_REQUIRE_EQUAL(sstable_sets[j].all()->size(), simple_sstable_sets[j].all()->size());
            }
        }
        return make_ready_future<>();
    });
}
|
||||
@@ -37,40 +37,6 @@
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <array>
|
||||
|
||||
constexpr auto la = sstables::sstable::version_types::la;
|
||||
constexpr auto big = sstables::sstable::format_types::big;
|
||||
|
||||
class column_family_test {
|
||||
lw_shared_ptr<column_family> _cf;
|
||||
public:
|
||||
column_family_test(lw_shared_ptr<column_family> cf) : _cf(cf) {}
|
||||
|
||||
void add_sstable(sstables::shared_sstable sstable) {
|
||||
_cf->_sstables->insert(std::move(sstable));
|
||||
}
|
||||
|
||||
// NOTE: must run in a thread
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
_cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
|
||||
}
|
||||
|
||||
static void update_sstables_known_generation(column_family& cf, unsigned generation) {
|
||||
cf.update_sstables_known_generation(generation);
|
||||
}
|
||||
|
||||
static uint64_t calculate_generation_for_new_table(column_family& cf) {
|
||||
return cf.calculate_generation_for_new_table();
|
||||
}
|
||||
|
||||
static int64_t calculate_shard_from_sstable_generation(int64_t generation) {
|
||||
return column_family::calculate_shard_from_sstable_generation(generation);
|
||||
}
|
||||
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> mt) {
|
||||
return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional());
|
||||
}
|
||||
};
|
||||
|
||||
namespace sstables {
|
||||
|
||||
|
||||
@@ -31,6 +31,41 @@
|
||||
#include "test/lib/test_services.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
constexpr auto la = sstables::sstable::version_types::la;
|
||||
constexpr auto big = sstables::sstable::format_types::big;
|
||||
|
||||
class column_family_test {
|
||||
lw_shared_ptr<column_family> _cf;
|
||||
public:
|
||||
column_family_test(lw_shared_ptr<column_family> cf) : _cf(cf) {}
|
||||
|
||||
void add_sstable(sstables::shared_sstable sstable) {
|
||||
_cf->_sstables->insert(std::move(sstable));
|
||||
}
|
||||
|
||||
// NOTE: must run in a thread
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
_cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
|
||||
}
|
||||
|
||||
static void update_sstables_known_generation(column_family& cf, unsigned generation) {
|
||||
cf.update_sstables_known_generation(generation);
|
||||
}
|
||||
|
||||
static uint64_t calculate_generation_for_new_table(column_family& cf) {
|
||||
return cf.calculate_generation_for_new_table();
|
||||
}
|
||||
|
||||
static int64_t calculate_shard_from_sstable_generation(int64_t generation) {
|
||||
return column_family::calculate_shard_from_sstable_generation(generation);
|
||||
}
|
||||
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> mt) {
|
||||
return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional());
|
||||
}
|
||||
};
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class test_env_sstables_manager : public sstables_manager {
|
||||
|
||||
111
test/perf/perf_sstable_set.cc
Normal file
111
test/perf/perf_sstable_set.cc
Normal file
@@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "database.hh"
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/perf/perf.hh"
|
||||
#include <seastar/core/app-template.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include "test/lib/sstable_test_env.hh"
|
||||
#include <iostream>
|
||||
|
||||
// Creates an sstable with generation `gen` through the test environment and
// stamps it with the given level and first/last keys, so it can be placed in a
// leveled sstable set.
// NOTE(review): the two literal 0 arguments to set_values_for_leveled_strategy()
// are passed through unchanged; their meaning is not visible from here.
static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) {
    auto created = env.make_sstable(schema, "", gen, la, big);
    sstables::test(created).set_values_for_leveled_strategy(
            0, level, 0, std::move(first_key), std::move(last_key));
    return created;
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
app_template app;
|
||||
return app.run(argc, argv, [&app] {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
using namespace std::chrono;
|
||||
using namespace std::chrono_literals;
|
||||
auto start = high_resolution_clock::now();
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg;
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled);
|
||||
|
||||
constexpr int ssts_in_level_0 = 10;
|
||||
std::function<int(int)> idx_to_level = [&] (int i) {
|
||||
if (i < ssts_in_level_0) {
|
||||
return 0;
|
||||
}
|
||||
return 1 + idx_to_level((i - ssts_in_level_0) / 10 + ssts_in_level_0 - 1);
|
||||
};
|
||||
auto level_to_size = [] (int level) {
|
||||
if (level == 0) {
|
||||
return ssts_in_level_0;
|
||||
}
|
||||
return int(pow(10, level));
|
||||
};
|
||||
|
||||
auto kt_pair = token_generation_for_current_shard(1120);
|
||||
auto min_max_keys = [&kt_pair, &level_to_size] (auto level, auto pos_in_level) -> std::pair<sstring, sstring> {
|
||||
auto last_key_idx = kt_pair.size() - 1;
|
||||
if (level == 0) {
|
||||
return { kt_pair[0].first, kt_pair[last_key_idx].first };
|
||||
}
|
||||
auto total_ranges = kt_pair.size();
|
||||
auto level_size_in_ssts = level_to_size(level);
|
||||
unsigned ranges_per_sst = std::max(1U, unsigned(floor(float(total_ranges) / level_size_in_ssts)));
|
||||
sstring min_key = kt_pair.at(pos_in_level).first;
|
||||
sstring max_key = kt_pair.at(std::min(pos_in_level + ranges_per_sst - 1, unsigned(last_key_idx))).first;
|
||||
return {min_key, max_key};
|
||||
};
|
||||
|
||||
std::vector<shared_sstable> inputs[3], outputs[3];
|
||||
|
||||
std::array<unsigned, 9> pos_in_levels{0};
|
||||
pos_in_levels.fill(0);
|
||||
auto start2 = high_resolution_clock::now();
|
||||
for (auto i = 0; i < 1120; i++) {
|
||||
auto level = idx_to_level(i);
|
||||
auto [min, max] = min_max_keys(level, pos_in_levels[level]++);
|
||||
auto sst = sstable_for_overlapping_test(env, s, i, min, max, uint32_t(level));
|
||||
column_family_test(cf).add_sstable(sst);
|
||||
if (level >= 1 && pos_in_levels[level] < 10) {
|
||||
inputs[level-1].push_back(sst);
|
||||
}
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
|
||||
for (auto i = 0; i < 30; i++) {
|
||||
auto [min, max] = min_max_keys(1 + i / 10, i % 10);
|
||||
auto sst = sstable_for_overlapping_test(env, s, i, min, max, 1 + i / 10);
|
||||
outputs[i / 10].push_back(sst);
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
for (auto i = 0; i < 3; i++) {
|
||||
auto t1 = high_resolution_clock::now();
|
||||
column_family_test(cf).rebuild_sstable_list(outputs[i], inputs[i]);
|
||||
auto t2 = high_resolution_clock::now();
|
||||
std::cout << "Replacing 10 L" << i + 1 <<" sstables took "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count() << "ms to complete\n";
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user