diff --git a/db/size_estimates_virtual_reader.hh b/db/size_estimates_virtual_reader.hh new file mode 100644 index 0000000000..2d6e496981 --- /dev/null +++ b/db/size_estimates_virtual_reader.hh @@ -0,0 +1,281 @@ +/* + * Copyright (C) 2016 ScyllaDB + * + * Modified by ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include +#include +#include + +#include "clustering_bounds_comparator.hh" +#include "database.hh" +#include "db/system_keyspace.hh" +#include "dht/i_partitioner.hh" +#include "mutation_reader.hh" +#include "partition_range_compat.hh" +#include "range.hh" +#include "service/storage_service.hh" +#include "stdx.hh" +#include "streamed_mutation.hh" + +namespace db { + +namespace size_estimates { + +class size_estimates_mutation_reader final : public mutation_reader::impl { + struct token_range { + bytes start; + bytes end; + }; + schema_ptr _schema; + const query::partition_range& _prange; + const query::partition_slice& _slice; + using ks_range = std::vector; + stdx::optional _keyspaces; + ks_range::const_iterator _current_partition; +public: + size_estimates_mutation_reader(schema_ptr schema, const query::partition_range& prange, const query::partition_slice& slice) + : _schema(schema) + , _prange(prange) + , _slice(slice) + { } + + virtual future operator()() override { + // For each specified range, estimate (crudely) mean partition size and partitions count. + auto& db = service::get_local_storage_proxy().get_db().local(); + if (!_keyspaces) { + _keyspaces = get_keyspaces(*_schema, db, _prange); + _current_partition = _keyspaces->begin(); + } + if (_current_partition == _keyspaces->end()) { + return make_ready_future(); + } + return get_local_ranges().then([&db, this] (auto&& ranges) { + auto estimates = this->estimates_for_current_keyspace(db, std::move(ranges)); + auto mutations = db::system_keyspace::make_size_estimates_mutation(*_current_partition, std::move(estimates)); + ++_current_partition; + return streamed_mutation_opt(streamed_mutation_from_mutation(std::move(mutations))); + }); + } + /** + * Returns the primary ranges for the local node. + * Used for testing as well. + */ + static future> get_local_ranges() { + auto& ss = service::get_local_storage_service(); + return ss.get_local_tokens().then([&ss] (auto&& tokens) { + auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens)); + std::vector local_ranges; + auto to_bytes = [](const stdx::optional::bound>& b) { + assert(b); + return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value())); + }; + // We merge the ranges to be compatible with how Cassandra shows it's size estimates table. + // All queries will be on that table, where all entries are text and there's no notion of + // token ranges form the CQL point of view. + auto left_inf = boost::find_if(ranges, [] (auto&& r) { + return !r.start() || r.start()->value() == dht::minimum_token(); + }); + auto right_inf = boost::find_if(ranges, [] (auto&& r) { + return !r.end() || r.start()->value() == dht::maximum_token(); + }); + if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) { + local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())}); + ranges.erase(left_inf); + ranges.erase(right_inf); + } + for (auto&& r : ranges) { + local_ranges.push_back(token_range{to_bytes(r.start()), to_bytes(r.end())}); + } + boost::sort(local_ranges, [] (auto&& tr1, auto&& tr2) { + return utf8_type->less(tr1.start, tr2.start); + }); + return local_ranges; + }); + } +private: + struct virtual_row { + const bytes& cf_name; + const token_range& tokens; + clustering_key_prefix as_key() const { + return clustering_key_prefix::from_exploded(std::vector{cf_name, tokens.start, tokens.end}); + } + }; + struct virtual_row_comparator { + schema_ptr _schema; + virtual_row_comparator(schema_ptr schema) : _schema(schema) { } + bool operator()(const clustering_key_prefix& key1, const clustering_key_prefix& key2) { + return clustering_key_prefix::prefix_equality_less_compare(*_schema)(key1, key2); + } + bool operator()(const virtual_row& row, const clustering_key_prefix& key) { + return operator()(row.as_key(), key); + } + bool operator()(const clustering_key_prefix& key, const virtual_row& row) { + return operator()(key, row.as_key()); + } + }; + class virtual_row_iterator : public std::iterator { + std::reference_wrapper> _cf_names; + std::reference_wrapper> _ranges; + size_t _cf_names_idx = 0; + size_t _ranges_idx = 0; + public: + struct end_iterator_tag {}; + virtual_row_iterator(const std::vector& cf_names, const std::vector& ranges) + : _cf_names(ref(cf_names)) + , _ranges(ref(ranges)) + { } + virtual_row_iterator(const std::vector& cf_names, const std::vector& ranges, end_iterator_tag) + : _cf_names(ref(cf_names)) + , _ranges(ref(ranges)) + , _cf_names_idx(cf_names.size()) + , _ranges_idx(ranges.size()) + { } + virtual_row_iterator& operator++() { + if (++_ranges_idx == _ranges.get().size() && ++_cf_names_idx < _cf_names.get().size()) { + _ranges_idx = 0; + } + return *this; + } + virtual_row_iterator operator++(int) { + virtual_row_iterator i(*this); + ++(*this); + return i; + } + const value_type operator*() const { + return { _cf_names.get()[_cf_names_idx], _ranges.get()[_ranges_idx] }; + } + bool operator==(const virtual_row_iterator& i) const { + return _cf_names_idx == i._cf_names_idx + && _ranges_idx == i._ranges_idx; + } + bool operator!=(const virtual_row_iterator& i) const { + return !(*this == i); + } + }; + + std::vector + estimates_for_current_keyspace(const database& db, std::vector local_ranges) const { + auto pkey = partition_key::from_single_value(*_schema, utf8_type->decompose(*_current_partition)); + auto cfs = db.find_keyspace(*_current_partition).metadata()->cf_meta_data(); + auto cf_names = boost::copy_range>(cfs | boost::adaptors::transformed([] (auto&& cf) { + return utf8_type->decompose(cf.first); + })); + boost::sort(cf_names, [] (auto&& n1, auto&& n2) { + return utf8_type->less(n1, n2); + }); + std::vector estimates; + for (auto& range : _slice.row_ranges(*_schema, pkey)) { + auto rows = boost::make_iterator_range( + virtual_row_iterator(cf_names, local_ranges), + virtual_row_iterator(cf_names, local_ranges, virtual_row_iterator::end_iterator_tag())); + auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema)); + for (auto&& r : rows_to_estimate) { + auto& cf = db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name)); + estimates.push_back(estimate(cf, r.tokens)); + if (estimates.size() >= _slice.partition_row_limit()) { + return estimates; + } + } + } + return estimates; + } + + /** + * Returns the keyspaces, ordered by name, as selected by the partition_range. + */ + static ks_range get_keyspaces(const schema& s, const database& db, query::partition_range range) { + struct keyspace_less_comparator { + const schema& _s; + keyspace_less_comparator(const schema& s) : _s(s) { } + dht::ring_position as_ring_position(const sstring& ks) { + auto pkey = partition_key::from_single_value(_s, utf8_type->decompose(ks)); + return dht::global_partitioner().decorate_key(_s, std::move(pkey)); + } + bool operator()(const sstring& ks1, const sstring& ks2) { + return as_ring_position(ks1).less_compare(_s, as_ring_position(ks2)); + } + bool operator()(const sstring& ks, const dht::ring_position& rp) { + return as_ring_position(ks).less_compare(_s, rp); + } + bool operator()(const dht::ring_position& rp, const sstring& ks) { + return rp.less_compare(_s, as_ring_position(ks)); + } + }; + auto keyspaces = db.get_non_system_keyspaces(); + auto cmp = keyspace_less_comparator(s); + boost::sort(keyspaces, cmp); + return boost::copy_range(range.slice(keyspaces, std::move(cmp))); + } + + /** + * Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables. + */ + static nonwrapping_range as_ring_position_range(nonwrapping_range& r) { + stdx::optional::bound> start_bound, end_bound; + if (r.start()) { + start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }}; + } + if (r.end()) { + end_bound = {{ dht::ring_position(r.end()->value(), dht::ring_position::token_bound::end), r.end()->is_inclusive() }}; + } + return nonwrapping_range(std::move(start_bound), std::move(end_bound), r.is_singular()); + } + + /** + * Add a new range_estimates for the specified range, considering the sstables associated with `cf`. + */ + static system_keyspace::range_estimates estimate(const column_family& cf, const token_range& r) { + int64_t count{0}; + utils::estimated_histogram hist{0}; + auto from_bytes = [] (auto& b) { + return dht::global_partitioner().from_sstring(utf8_type->to_string(b)); + }; + std::vector> ranges; + compat::unwrap_into( + wrapping_range({{ from_bytes(r.start) }}, {{ from_bytes(r.end) }}), + dht::token_comparator(), + [&] (auto&& rng) { ranges.push_back(std::move(rng)); }); + for (auto&& r : ranges) { + auto rp_range = as_ring_position_range(r); + for (auto&& sstable : cf.select_sstables(rp_range)) { + count += sstable->estimated_keys_for_range(r); + hist.merge(sstable->get_stats_metadata().estimated_row_size); + } + } + return {cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0}; + } +}; + +struct virtual_reader { + mutation_reader operator()(schema_ptr schema, + const query::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state) { + return make_mutation_reader(schema, range, slice); + } +}; + +} // namespace size_estimates + +} // namespace db diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index f22820efa9..0a956cf203 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -70,6 +70,7 @@ #include "service/storage_proxy.hh" #include "message/messaging_service.hh" #include "mutation_query.hh" +#include "db/size_estimates_virtual_reader.hh" using days = std::chrono::duration>; @@ -1022,6 +1023,12 @@ std::vector all_tables() { return r; } +static void maybe_add_virtual_reader(schema_ptr s, database& db) { + if (s.get() == size_estimates().get()) { + db.find_column_family(s).set_virtual_reader(db::size_estimates::virtual_reader()); + } +} + void make(database& db, bool durable, bool volatile_testing_only) { auto ksm = make_lw_shared(NAME, "org.apache.cassandra.locator.LocalStrategy", @@ -1046,6 +1053,7 @@ void make(database& db, bool durable, bool volatile_testing_only) { auto& ks = db.find_keyspace(NAME); for (auto&& table : all_tables()) { db.add_column_family(table, ks.make_column_family_config(*table, db.get_config())); + maybe_add_virtual_reader(table, db); } }