diff --git a/database.cc b/database.cc index b2c193c3b7..6c338916bd 100644 --- a/database.cc +++ b/database.cc @@ -127,8 +127,8 @@ column_family::make_partition_presence_checker(lw_shared_ptr old_s mutation_source column_family::sstables_as_mutation_source() { - return [this] (const query::partition_range& r) { - return make_sstable_reader(r); + return [this] (schema_ptr s, const query::partition_range& r) { + return make_sstable_reader(std::move(s), r); }; } @@ -207,16 +207,16 @@ public: }; mutation_reader -column_family::make_sstable_reader(const query::partition_range& pr) const { +column_family::make_sstable_reader(schema_ptr s, const query::partition_range& pr) const { if (pr.is_singular() && pr.start()->value().has_key()) { const dht::ring_position& pos = pr.start()->value(); if (dht::shard_of(pos.token()) != engine().cpu_id()) { return make_empty_reader(); // range doesn't belong to this shard } - return make_mutation_reader(_schema, _sstables, *pos.key()); + return make_mutation_reader(std::move(s), _sstables, *pos.key()); } else { // range_sstable_reader is not movable so we need to wrap it - return make_mutation_reader(_schema, _sstables, pr); + return make_mutation_reader(std::move(s), _sstables, pr); } } @@ -240,9 +240,9 @@ key_source column_family::sstables_as_key_source() const { // Exposed for testing, not performance critical. future -column_family::find_partition(const dht::decorated_key& key) const { - return do_with(query::partition_range::make_singular(key), [this] (auto& range) { - return do_with(this->make_reader(range), [] (mutation_reader& reader) { +column_family::find_partition(schema_ptr s, const dht::decorated_key& key) const { + return do_with(query::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) { + return do_with(this->make_reader(s, range), [] (mutation_reader& reader) { return reader().then([] (mutation_opt&& mo) -> std::unique_ptr { if (!mo) { return {}; @@ -254,13 +254,13 @@ column_family::find_partition(const dht::decorated_key& key) const { } future -column_family::find_partition_slow(const partition_key& key) const { - return find_partition(dht::global_partitioner().decorate_key(*_schema, key)); +column_family::find_partition_slow(schema_ptr s, const partition_key& key) const { + return find_partition(s, dht::global_partitioner().decorate_key(*s, key)); } future -column_family::find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const { - return find_partition(partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) { +column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, clustering_key clustering_key) const { + return find_partition(std::move(s), partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) { if (!p) { return make_ready_future(); } @@ -275,8 +275,8 @@ column_family::find_row(const dht::decorated_key& partition_key, clustering_key } mutation_reader -column_family::make_reader(const query::partition_range& range) const { - if (query::is_wrap_around(range, *_schema)) { +column_family::make_reader(schema_ptr s, const query::partition_range& range) const { + if (query::is_wrap_around(range, *s)) { // make_combined_reader() can't handle streams that wrap around yet. fail(unimplemented::cause::WRAP_AROUND); } @@ -305,13 +305,13 @@ column_family::make_reader(const query::partition_range& range) const { // https://github.com/scylladb/scylla/issues/185 for (auto&& mt : *_memtables) { - readers.emplace_back(mt->make_reader(range)); + readers.emplace_back(mt->make_reader(s, range)); } if (_config.enable_cache) { - readers.emplace_back(_cache.make_reader(range)); + readers.emplace_back(_cache.make_reader(s, range)); } else { - readers.emplace_back(make_sstable_reader(range)); + readers.emplace_back(make_sstable_reader(s, range)); } return make_combined_reader(std::move(readers)); @@ -319,7 +319,7 @@ column_family::make_reader(const query::partition_range& range) const { template future -column_family::for_all_partitions(Func&& func) const { +column_family::for_all_partitions(schema_ptr s, Func&& func) const { static_assert(std::is_same>::value, "bad Func signature"); @@ -330,13 +330,13 @@ column_family::for_all_partitions(Func&& func) const { bool empty = false; public: bool done() const { return !ok || empty; } - iteration_state(const column_family& cf, Func&& func) - : reader(cf.make_reader()) + iteration_state(schema_ptr s, const column_family& cf, Func&& func) + : reader(cf.make_reader(std::move(s))) , func(std::move(func)) { } }; - return do_with(iteration_state(*this, std::move(func)), [] (iteration_state& is) { + return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) { return do_until([&is] { return is.done(); }, [&is] { return is.reader().then([&is](mutation_opt&& mo) { if (!mo) { @@ -352,8 +352,8 @@ column_family::for_all_partitions(Func&& func) const { } future -column_family::for_all_partitions_slow(std::function func) const { - return for_all_partitions(std::move(func)); +column_family::for_all_partitions_slow(schema_ptr s, std::function func) const { + return for_all_partitions(std::move(s), std::move(func)); } class lister { @@ -1486,13 +1486,17 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { } struct query_state { - explicit query_state(const query::read_command& cmd, const std::vector& ranges) - : cmd(cmd) + explicit query_state(schema_ptr s, + const query::read_command& cmd, + const std::vector& ranges) + : schema(std::move(s)) + , cmd(cmd) , builder(cmd.slice) , limit(cmd.row_limit) , current_partition_range(ranges.begin()) , range_end(ranges.end()){ } + schema_ptr schema; const query::read_command& cmd; query::result::builder builder; uint32_t limit; @@ -1506,21 +1510,21 @@ struct query_state { }; future> -column_family::query(const query::read_command& cmd, const std::vector& partition_ranges) { +column_family::query(schema_ptr s, const query::read_command& cmd, const std::vector& partition_ranges) { utils::latency_counter lc; _stats.reads.set_latency(lc); - return do_with(query_state(cmd, partition_ranges), [this] (query_state& qs) { + return do_with(query_state(std::move(s), cmd, partition_ranges), [this] (query_state& qs) { return do_until(std::bind(&query_state::done, &qs), [this, &qs] { auto&& range = *qs.current_partition_range++; - qs.reader = make_reader(range); + qs.reader = make_reader(qs.schema, range); qs.range_empty = false; - return do_until([&qs] { return !qs.limit || qs.range_empty; }, [this, &qs] { - return qs.reader().then([this, &qs](mutation_opt mo) { + return do_until([&qs] { return !qs.limit || qs.range_empty; }, [&qs] { + return qs.reader().then([&qs](mutation_opt mo) { if (mo) { auto p_builder = qs.builder.add_partition(*mo->schema(), mo->key()); auto is_distinct = qs.cmd.slice.options.contains(query::partition_slice::option::distinct); auto limit = !is_distinct ? qs.limit : 1; - mo->partition().query(p_builder, *_schema, qs.cmd.timestamp, limit); + mo->partition().query(p_builder, *qs.schema, qs.cmd.timestamp, limit); qs.limit -= p_builder.row_count(); } else { qs.range_empty = true; @@ -1541,21 +1545,21 @@ column_family::query(const query::read_command& cmd, const std::vectormake_reader(range); + return [this] (schema_ptr s, const query::partition_range& range) { + return this->make_reader(std::move(s), range); }; } future> -database::query(const query::read_command& cmd, const std::vector& ranges) { +database::query(schema_ptr s, const query::read_command& cmd, const std::vector& ranges) { column_family& cf = find_column_family(cmd.cf_id); - return cf.query(cmd, ranges); + return cf.query(std::move(s), cmd, ranges); } future -database::query_mutations(const query::read_command& cmd, const query::partition_range& range) { +database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range) { column_family& cf = find_column_family(cmd.cf_id); - return mutation_query(cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp); + return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp); } std::unordered_set database::get_initial_tokens() { diff --git a/database.hh b/database.hh index b993b771dc..5ba9795ac3 100644 --- a/database.hh +++ b/database.hh @@ -189,7 +189,8 @@ private: // Creates a mutation reader which covers sstables. // Caller needs to ensure that column_family remains live (FIXME: relax this). // The 'range' parameter must be live as long as the reader is used. - mutation_reader make_sstable_reader(const query::partition_range& range) const; + // Mutations returned by the reader will all have given schema. + mutation_reader make_sstable_reader(schema_ptr schema, const query::partition_range& range) const; mutation_source sstables_as_mutation_source(); key_source sstables_as_key_source() const; @@ -200,7 +201,8 @@ public: // Caller needs to ensure that column_family remains live (FIXME: relax this). // Note: for data queries use query() instead. // The 'range' parameter must be live as long as the reader is used. - mutation_reader make_reader(const query::partition_range& range = query::full_partition_range) const; + // Mutations returned by the reader will all have given schema. + mutation_reader make_reader(schema_ptr schema, const query::partition_range& range = query::full_partition_range) const; mutation_source as_mutation_source() const; @@ -227,16 +229,18 @@ public: ~column_family(); schema_ptr schema() const { return _schema; } db::commitlog* commitlog() { return _commitlog; } - future find_partition(const dht::decorated_key& key) const; - future find_partition_slow(const partition_key& key) const; - future find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const; + future find_partition(schema_ptr, const dht::decorated_key& key) const; + future find_partition_slow(schema_ptr, const partition_key& key) const; + future find_row(schema_ptr, const dht::decorated_key& partition_key, clustering_key clustering_key) const; // Applies given mutation to this column family // The mutation is always upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position()); void apply(const mutation& m, const db::replay_position& = db::replay_position()); // Returns at most "cmd.limit" rows - future> query(const query::read_command& cmd, const std::vector& ranges); + future> query(schema_ptr, + const query::read_command& cmd, + const std::vector& ranges); future<> populate(sstring datadir); @@ -362,14 +366,14 @@ private: // so that iteration can be stopped by returning false. // Func signature: bool (const decorated_key& dk, const mutation_partition& mp) template - future for_all_partitions(Func&& func) const; + future for_all_partitions(schema_ptr, Func&& func) const; future probe_file(sstring sstdir, sstring fname); void seal_on_overflow(); void check_valid_rp(const db::replay_position&) const; public: // Iterate over all partitions. Protocol is the same as std::all_of(), // so that iteration can be stopped by returning false. - future for_all_partitions_slow(std::function func) const; + future for_all_partitions_slow(schema_ptr, std::function func) const; friend std::ostream& operator<<(std::ostream& out, const column_family& cf); // Testing purposes. @@ -621,8 +625,8 @@ public: unsigned shard_of(const dht::token& t); unsigned shard_of(const mutation& m); unsigned shard_of(const frozen_mutation& m); - future> query(const query::read_command& cmd, const std::vector& ranges); - future query_mutations(const query::read_command& cmd, const query::partition_range& range); + future> query(schema_ptr, const query::read_command& cmd, const std::vector& ranges); + future query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range); future<> apply(schema_ptr, const frozen_mutation&); keyspace::config make_keyspace_config(const keyspace_metadata& ksm); const sstring& get_snitch_name() const; diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 4c8a871efe..d6d9a8a4dc 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -423,7 +423,7 @@ future query_partition_mutation(service::storage_proxy& proxy, { auto dk = dht::global_partitioner().decorate_key(*s, pkey); return do_with(query::partition_range::make_singular(dk), [&proxy, dk, s = std::move(s), cmd = std::move(cmd)] (auto& range) { - return proxy.query_mutations_locally(std::move(cmd), range) + return proxy.query_mutations_locally(s, std::move(cmd), range) .then([dk = std::move(dk), s](foreign_ptr> res) { auto&& partitions = res->partitions(); if (partitions.size() == 0) { diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index b10a7044ac..9dd435c1a3 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1017,7 +1017,7 @@ query_mutations(distributed& proxy, const sstring& cf_na auto slice = partition_slice_builder(*schema).build(); auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), std::numeric_limits::max()); - return proxy.local().query_mutations_locally(cmd, query::full_partition_range); + return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range); } future> diff --git a/memtable.cc b/memtable.cc index 6e3990e83e..8cefdca883 100644 --- a/memtable.cc +++ b/memtable.cc @@ -103,6 +103,7 @@ memtable::slice(const query::partition_range& range) const { class scanning_reader final : public mutation_reader::impl { lw_shared_ptr _memtable; + schema_ptr _schema; const query::partition_range& _range; stdx::optional _last; memtable::partitions_type::iterator _i; @@ -144,8 +145,9 @@ private: _last_reclaim_counter = current_reclaim_counter; } public: - scanning_reader(lw_shared_ptr m, const query::partition_range& range) + scanning_reader(schema_ptr s, lw_shared_ptr m, const query::partition_range& range) : _memtable(std::move(m)) + , _schema(std::move(s)) , _range(range) { } @@ -159,7 +161,7 @@ public: // FIXME: Use cache. See column_family::make_reader(). _delegate_range = _last ? _range.split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : _range; _delegate = make_mutation_reader( - _memtable->_sstable, _memtable->_schema, *_delegate_range); + _memtable->_sstable, _schema, *_delegate_range); _memtable = {}; _last = {}; return _delegate(); @@ -174,13 +176,13 @@ public: ++_i; _last = e.key(); _memtable->upgrade_entry(e); - return make_ready_future(mutation(_memtable->_schema, e.key(), e.partition())); + return make_ready_future(e.read(_schema)); } }; mutation_reader -memtable::make_reader(const query::partition_range& range) { - if (query::is_wrap_around(range, *_schema)) { +memtable::make_reader(schema_ptr s, const query::partition_range& range) { + if (query::is_wrap_around(range, *s)) { fail(unimplemented::cause::WRAP_AROUND); } @@ -190,13 +192,13 @@ memtable::make_reader(const query::partition_range& range) { auto i = partitions.find(pos, partition_entry::compare(_schema)); if (i != partitions.end()) { upgrade_entry(*i); - return make_reader_returning(mutation(_schema, i->key(), i->partition())); + return make_reader_returning(i->read(s)); } else { return make_empty_reader(); } }); } else { - return make_mutation_reader(shared_from_this(), range); + return make_mutation_reader(std::move(s), shared_from_this(), range); } } @@ -209,7 +211,7 @@ memtable::update(const db::replay_position& rp) { future<> memtable::apply(memtable& mt) { - return do_with(mt.make_reader(), [this] (auto&& rd) mutable { + return do_with(mt.make_reader(_schema), [this] (auto&& rd) mutable { return consume(rd, [self = this->shared_from_this(), &rd] (mutation&& m) { self->apply(m); return stop_iteration::no; @@ -244,14 +246,14 @@ logalloc::occupancy_stats memtable::occupancy() const { } mutation_source memtable::as_data_source() { - return [mt = shared_from_this()] (const query::partition_range& range) { - return mt->make_reader(range); + return [mt = shared_from_this()] (schema_ptr s, const query::partition_range& range) { + return mt->make_reader(std::move(s), range); }; } key_source memtable::as_key_source() { return [mt = shared_from_this()] (const query::partition_range& range) { - return make_key_from_mutation_reader(mt->make_reader(range)); + return make_key_from_mutation_reader(mt->make_reader(mt->_schema, range)); }; } @@ -278,6 +280,12 @@ bool memtable::is_flushed() const { return bool(_sstable); } +mutation partition_entry::read(const schema_ptr& target_schema) { + auto m = mutation(_schema, _key, _p); + m.upgrade(target_schema); + return m; +} + void memtable::upgrade_entry(partition_entry& e) { if (e._schema != _schema) { assert(!_region.reclaiming_enabled()); diff --git a/memtable.hh b/memtable.hh index 1b988a7659..3f26a6d6c0 100644 --- a/memtable.hh +++ b/memtable.hh @@ -58,6 +58,7 @@ public: mutation_partition& partition() { return _p; } const schema_ptr& schema() const { return _schema; } schema_ptr& schema() { return _schema; } + mutation read(const schema_ptr&); struct compare { dht::decorated_key::less_comparator _c; @@ -132,7 +133,9 @@ public: // doesn't need to ensure that memtable remains live. // // The 'range' parameter must be live as long as the reader is being used - mutation_reader make_reader(const query::partition_range& range = query::full_partition_range); + // + // Mutations returned by the reader will all have given schema. + mutation_reader make_reader(schema_ptr, const query::partition_range& range = query::full_partition_range); mutation_source as_data_source(); key_source as_key_source(); diff --git a/mutation_query.cc b/mutation_query.cc index 567beef809..28fdb06da0 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -108,7 +108,8 @@ void db::serializer::read(reconcilable_result& v, input& in } future -mutation_query(const mutation_source& source, +mutation_query(schema_ptr s, + const mutation_source& source, const query::partition_range& range, const query::partition_slice& slice, uint32_t row_limit, @@ -141,8 +142,9 @@ mutation_query(const mutation_source& source, return make_ready_future(reconcilable_result()); } - return do_with(query_state(range, slice, row_limit, query_time), [&source] (query_state& state) -> future { - state.reader = source(state.range); + return do_with(query_state(range, slice, row_limit, query_time), + [&source, s = std::move(s)] (query_state& state) -> future { + state.reader = source(std::move(s), state.range); return consume(state.reader, [&state] (mutation&& m) { // FIXME: Make data sources respect row_ranges so that we don't have to filter them out here. auto is_distinct = state.slice.options.contains(query::partition_slice::option::distinct); diff --git a/mutation_query.hh b/mutation_query.hh index 3ffce507fd..98c9b39efe 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -114,6 +114,7 @@ extern template class serializer; // // 'source' doesn't have to survive deferring. future mutation_query( + schema_ptr, const mutation_source& source, const query::partition_range& range, const query::partition_slice& slice, diff --git a/mutation_reader.hh b/mutation_reader.hh index cabb38bc6f..09c05b4663 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -71,6 +71,9 @@ make_mutation_reader(Args&&... args) { return mutation_reader(std::make_unique(std::forward(args)...)); } +// Creates a mutation reader which combines data return by supplied readers. +// Returns mutation of the same schema only when all readers return mutations +// of the same schema. mutation_reader make_combined_reader(std::vector); mutation_reader make_combined_reader(mutation_reader&& a, mutation_reader&& b); // reads from the input readers, in order @@ -147,7 +150,9 @@ future<> consume(mutation_reader& reader, Consumer consumer) { // mutation_source represents source of data in mutation form. The data source // can be queried multiple times and in parallel. For each query it returns // independent mutation_reader. -using mutation_source = std::function; +// The reader returns mutations having all the same schema, the one passed +// when invoking the source. +using mutation_source = std::function; /// A partition_presence_checker quickly returns whether a key is known not to exist /// in a data source (it may return false positives, but not false negatives). diff --git a/query-request.hh b/query-request.hh index 23d61e01a0..de2207b6f1 100644 --- a/query-request.hh +++ b/query-request.hh @@ -57,6 +57,7 @@ typedef std::vector clustering_row_ranges; // Specifies subset of rows, columns and cell attributes to be returned in a query. // Can be accessed across cores. +// Schema-dependent. class partition_slice { public: enum class option { send_clustering_key, send_partition_key, send_timestamp_and_expiry, reversed, distinct }; diff --git a/repair/repair.cc b/repair/repair.cc index eeba127d95..99641f707d 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -299,7 +299,7 @@ static future checksum_range_shard(database &db, const ::range& range) { auto& cf = db.find_column_family(keyspace_name, cf_name); return do_with(query::to_partition_range(range), [&cf] (const auto& partition_range) { - return do_with(cf.make_reader(partition_range), partition_checksum(), + return do_with(cf.make_reader(cf.schema(), partition_range), partition_checksum(), [] (auto& reader, auto& checksum) { return repeat([&reader, &checksum] () { return reader().then([&checksum] (auto mopt) { diff --git a/row_cache.cc b/row_cache.cc index 7dddb8ec0b..91d32edf6e 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -155,11 +155,13 @@ const logalloc::region& cache_tracker::region() const { // Reader which populates the cache using data from the delegate. class populating_reader final : public mutation_reader::impl { + schema_ptr _schema; row_cache& _cache; mutation_reader _delegate; public: - populating_reader(row_cache& cache, mutation_reader delegate) - : _cache(cache) + populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate) + : _schema(std::move(s)) + , _cache(cache) , _delegate(std::move(delegate)) { } @@ -167,6 +169,7 @@ public: return _delegate().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) { if (mo) { _cache.populate(*mo); + mo->upgrade(_schema); } return std::move(mo); }); @@ -184,6 +187,7 @@ void row_cache::on_miss() { } class just_cache_scanning_reader final : public mutation_reader::impl { + schema_ptr _schema; row_cache& _cache; row_cache::partitions_type::iterator _it; row_cache::partitions_type::iterator _end; @@ -227,7 +231,9 @@ private: _last_modification_count = modification_count; } public: - just_cache_scanning_reader(row_cache& cache, const query::partition_range& range) : _cache(cache), _range(range) { } + just_cache_scanning_reader(schema_ptr s, row_cache& cache, const query::partition_range& range) + : _schema(std::move(s)), _cache(cache), _range(range) + { } virtual future operator()() override { return _cache._read_section(_cache._tracker.region(), [this] { update_iterators(); @@ -238,7 +244,7 @@ public: ++_it; _last = ce.key(); _cache.upgrade_entry(ce); - return make_ready_future(mutation(_cache._schema, ce.key(), ce.partition())); + return make_ready_future(ce.read(_schema)); }); } }; @@ -259,9 +265,9 @@ class scanning_and_populating_reader final : public mutation_reader::impl { dht::decorated_key_opt _next_key; dht::decorated_key_opt _last_secondary_key; public: - scanning_and_populating_reader(row_cache& cache, const query::partition_range& range) - : _cache(cache), _schema(cache._schema), - _primary(make_mutation_reader(cache, range)), + scanning_and_populating_reader(schema_ptr s, row_cache& cache, const query::partition_range& range) + : _cache(cache), _schema(s), + _primary(make_mutation_reader(s, cache, range)), _underlying(cache._underlying), _original_range(range), _underlying_keys(cache._underlying_keys), _keys(_underlying_keys(range)) { } @@ -298,7 +304,7 @@ public: _range = query::partition_range(query::partition_range::bound { std::move(*dk), true }, std::move(end)); _last_secondary_key = {}; _secondary_phase = _cache._populate_phaser.phase(); - _secondary = _underlying(_range); + _secondary = _underlying(_cache._schema, _range); _secondary_only = true; return next_secondary(); }); @@ -311,7 +317,7 @@ private: auto cmp = dht::ring_position_comparator(*_schema); _range = _range.split_after(*_last_secondary_key, cmp); _secondary_phase = _cache._populate_phaser.phase(); - _secondary = _underlying(_range); + _secondary = _underlying(_cache._schema, _range); } return _secondary().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) { if (!mo && _next_primary) { @@ -324,6 +330,7 @@ private: } if (mo) { _cache.populate(*mo); + mo->upgrade(_schema); _last_secondary_key = mo->decorated_key(); } _cache.on_miss(); @@ -339,17 +346,17 @@ private: }; mutation_reader -row_cache::make_scanning_reader(const query::partition_range& range) { - return make_mutation_reader(*this, range); +row_cache::make_scanning_reader(schema_ptr s, const query::partition_range& range) { + return make_mutation_reader(std::move(s), *this, range); } mutation_reader -row_cache::make_reader(const query::partition_range& range) { +row_cache::make_reader(schema_ptr s, const query::partition_range& range) { if (range.is_singular()) { const query::ring_position& pos = range.start()->value(); if (!pos.has_key()) { - return make_scanning_reader(range); + return make_scanning_reader(std::move(s), range); } return _read_section(_tracker.region(), [&] { @@ -360,15 +367,15 @@ row_cache::make_reader(const query::partition_range& range) { _tracker.touch(e); on_hit(); upgrade_entry(e); - return make_reader_returning(mutation(_schema, dk, e.partition())); + return make_reader_returning(e.read(s)); } else { on_miss(); - return make_mutation_reader(*this, _underlying(range)); + return make_mutation_reader(s, *this, _underlying(_schema, range)); } }); } - return make_scanning_reader(range); + return make_scanning_reader(std::move(s), range); } row_cache::~row_cache() { @@ -542,6 +549,14 @@ cache_entry::cache_entry(cache_entry&& o) noexcept } } +mutation cache_entry::read(const schema_ptr& s) { + auto m = mutation(_schema, _key, _p); + if (_schema != s) { + m.upgrade(s); + } + return m; +} + const schema_ptr& row_cache::schema() const { return _schema; } diff --git a/row_cache.hh b/row_cache.hh index b4a61bfbbc..730b5d2173 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -83,6 +83,7 @@ public: mutation_partition& partition() { return _p; } const schema_ptr& schema() const { return _schema; } schema_ptr& schema() { return _schema; } + mutation read(const schema_ptr&); struct compare { dht::decorated_key::less_comparator _c; @@ -194,7 +195,7 @@ private: logalloc::allocating_section _update_section; logalloc::allocating_section _populate_section; logalloc::allocating_section _read_section; - mutation_reader make_scanning_reader(const query::partition_range&); + mutation_reader make_scanning_reader(schema_ptr, const query::partition_range&); void on_hit(); void on_miss(); void upgrade_entry(cache_entry&); @@ -206,7 +207,10 @@ public: row_cache(const row_cache&) = delete; row_cache& operator=(row_cache&&) = default; public: - mutation_reader make_reader(const query::partition_range& = query::full_partition_range); + // Implements mutation_source for this cache, see mutation_reader.hh + // User needs to ensure that the row_cache object stays alive + // as long as the reader is used. + mutation_reader make_reader(schema_ptr, const query::partition_range& = query::full_partition_range); const stats& stats() const { return _stats; } public: // Populate cache from given mutation. The mutation must contain all diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 1939cf69a5..eb31943f93 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -71,6 +71,7 @@ #include #include "utils/latency.hh" #include "schema.hh" +#include "schema_registry.hh" namespace service { @@ -1643,6 +1644,7 @@ protected: using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; + schema_ptr _schema; shared_ptr _proxy; lw_shared_ptr _cmd; lw_shared_ptr _retry_cmd; @@ -1653,9 +1655,9 @@ protected: promise>> _result_promise; public: - abstract_read_executor(shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, + abstract_read_executor(schema_ptr s, shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector targets) : - _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) { + _schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) { _proxy->_stats.reads++; } virtual ~abstract_read_executor() { @@ -1665,7 +1667,7 @@ public: protected: future>> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep) { if (is_me(ep)) { - return _proxy->query_mutations_locally(cmd, _partition_range); + return _proxy->query_mutations_locally(_schema, cmd, _partition_range); } else { auto& ms = net::get_local_messaging_service(); return ms.send_read_mutation_data(net::messaging_service::msg_addr{ep, 0}, *cmd, _partition_range).then([this](reconcilable_result&& result) { @@ -1675,7 +1677,7 @@ protected: } future>> make_data_request(gms::inet_address ep) { if (is_me(ep)) { - return _proxy->query_singular_local(_cmd, _partition_range); + return _proxy->query_singular_local(_schema, _cmd, _partition_range); } else { auto& ms = net::get_local_messaging_service(); return ms.send_read_data(net::messaging_service::msg_addr{ep, 0}, *_cmd, _partition_range).then([this](query::result&& result) { @@ -1685,7 +1687,7 @@ protected: } future make_digest_request(gms::inet_address ep) { if (is_me(ep)) { - return _proxy->query_singular_local_digest(_cmd, _partition_range); + return _proxy->query_singular_local_digest(_schema, _cmd, _partition_range); } else { auto& ms = net::get_local_messaging_service(); return ms.send_read_digest(net::messaging_service::msg_addr{ep, 0}, *_cmd, _partition_range); @@ -1742,14 +1744,13 @@ protected: data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { try { f.get(); - schema_ptr s = _proxy->_db.local().find_schema(_cmd->cf_id); - auto rr = data_resolver->resolve(s); // reconciliation happens here + auto rr = data_resolver->resolve(_schema); // reconciliation happens here // We generate a retry if at least one node reply with count live columns but after merge we have less // than the total number of column we are interested in (which may be < count on a retry). // So in particular, if no host returned count live columns, we know it's not a short read. if (data_resolver->max_live_count() < cmd->row_limit || rr.row_count() >= original_row_limit()) { - auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(rr), std::move(s), _cmd->slice))); + auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(rr), _schema, _cmd->slice))); // wait for write to complete before returning result to prevent multiple concurrent read requests to // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum // another read had returned a newer value (but the newer value had not yet been sent to the other replicas) @@ -1890,8 +1891,8 @@ public: class range_slice_read_executor : public abstract_read_executor { public: - range_slice_read_executor(shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : - abstract_read_executor(std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {} + range_slice_read_executor(schema_ptr s, shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : + abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {} virtual future>> execute(std::chrono::steady_clock::time_point timeout) override { reconcile(_cl, timeout); return _result_promise.get_future(); @@ -1913,7 +1914,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s ::shared_ptr storage_proxy::get_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl) { const dht::token& token = pr.start()->value().token(); - schema_ptr schema = _db.local().find_schema(cmd->cf_id); + schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); std::vector all_replicas = get_live_sorted_endpoints(ks, token); @@ -1936,14 +1937,14 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s auto p = shared_from_this(); // Speculative retry is disabled *OR* there are simply no extra replicas to speculate. if (retry_type == speculative_retry::type::NONE || db::block_for(ks, cl) == all_replicas.size()) { - return ::make_shared(p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } if (target_replicas.size() == all_replicas.size()) { // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy // (same amount of requests in total, but we turn 1 digest request into a full blown data request). - return ::make_shared(/*cfs, */p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. @@ -1959,24 +1960,24 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s target_replicas.push_back(extra_replica); if (retry_type == speculative_retry::type::ALWAYS) { - return ::make_shared(/*cfs,*/p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } else {// PERCENTILE or CUSTOM. - return ::make_shared(/*cfs,*/p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } } future -storage_proxy::query_singular_local_digest(lw_shared_ptr cmd, const query::partition_range& pr) { - return query_singular_local(cmd, pr).then([] (foreign_ptr> result) { +storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr) { + return query_singular_local(std::move(s), std::move(cmd), pr).then([] (foreign_ptr> result) { return result->digest(); }); } future>> -storage_proxy::query_singular_local(lw_shared_ptr cmd, const query::partition_range& pr) { +storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr) { unsigned shard = _db.local().shard_of(pr.start()->value().token()); - return _db.invoke_on(shard, [prv = std::vector({pr}) /* FIXME: pr is copied */, cmd] (database& db) { - return db.query(*cmd, prv).then([](auto&& f) { + return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector({pr}) /* FIXME: pr is copied */, cmd] (database& db) { + return db.query(gs, *cmd, prv).then([](auto&& f) { return make_foreign(std::move(f)); }); }); @@ -2011,7 +2012,7 @@ future>>> storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, std::vector::iterator&& i, std::vector&& ranges, int concurrency_factor) { - schema_ptr schema = _db.local().find_schema(cmd->cf_id); + schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); std::vector<::shared_ptr> exec; auto concurrent_fetch_starting_index = i; @@ -2064,7 +2065,7 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t ++i; } db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints); - exec.push_back(::make_shared(p, cmd, std::move(range), cl, std::move(filtered_endpoints))); + exec.push_back(::make_shared(schema, p, cmd, std::move(range), cl, std::move(filtered_endpoints))); } query::result_merger merger; @@ -2087,7 +2088,7 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t future>> storage_proxy::query_partition_key_range(lw_shared_ptr cmd, query::partition_range&& range, db::consistency_level cl) { - schema_ptr schema = _db.local().find_schema(cmd->cf_id); + schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); std::vector ranges; auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); @@ -2712,17 +2713,23 @@ void storage_proxy::init_messaging_service() { }); ms.register_read_data([] (query::read_command cmd, query::partition_range pr) { return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared(std::move(cmd))] (const query::partition_range& pr, shared_ptr& p) { - return p->query_singular_local(cmd, pr); + warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME + schema_ptr s = local_schema_registry().get(cmd->schema_version); + return p->query_singular_local(std::move(s), cmd, pr); }); }); ms.register_read_mutation_data([] (query::read_command cmd, query::partition_range pr) { return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared(std::move(cmd))] (const query::partition_range& pr, shared_ptr& p) { - return p->query_mutations_locally(cmd, pr); + warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME + schema_ptr s = local_schema_registry().get(cmd->schema_version); + return p->query_mutations_locally(std::move(s), cmd, pr); }); }); ms.register_read_digest([] (query::read_command cmd, query::partition_range pr) { return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared(std::move(cmd))] (const query::partition_range& pr, shared_ptr& p) { - return p->query_singular_local_digest(cmd, pr); + warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME + schema_ptr s = local_schema_registry().get(cmd->schema_version); + return p->query_singular_local_digest(std::move(s), cmd, pr); }); }); ms.register_truncate([](sstring ksname, sstring cfname) { @@ -2840,18 +2847,17 @@ public: }; future>> -storage_proxy::query_mutations_locally(lw_shared_ptr cmd, const query::partition_range& pr) { +storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr) { if (pr.is_singular()) { unsigned shard = _db.local().shard_of(pr.start()->value().token()); - return _db.invoke_on(shard, [cmd, &pr] (database& db) { - return db.query_mutations(*cmd, pr).then([] (reconcilable_result&& result) { + return _db.invoke_on(shard, [cmd, &pr, gs = global_schema_ptr(s)] (database& db) { + return db.query_mutations(gs, *cmd, pr).then([] (reconcilable_result&& result) { return make_foreign(make_lw_shared(std::move(result))); }); }); } else { - auto schema = _db.local().find_schema(cmd->cf_id); - return _db.map_reduce(mutation_result_merger{cmd, schema}, [cmd, &pr] (database& db) { - return db.query_mutations(*cmd, pr).then([] (reconcilable_result&& result) { + return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s)] (database& db) { + return db.query_mutations(gs, *cmd, pr).then([] (reconcilable_result&& result) { return make_foreign(make_lw_shared(std::move(result))); }); }).then([] (reconcilable_result&& result) { @@ -2869,8 +2875,8 @@ storage_proxy::stop() { class shard_reader final : public mutation_reader::impl { distributed& _db; unsigned _shard; - utils::UUID _cf_id; const query::partition_range _range; + global_schema_ptr _schema; schema_ptr _local_schema; struct remote_state { mutation_reader reader; @@ -2879,20 +2885,24 @@ class shard_reader final : public mutation_reader::impl { foreign_ptr> _remote; private: future<> init() { - _local_schema = _db.local().find_column_family(_cf_id).schema(); return _db.invoke_on(_shard, [this] (database& db) { - column_family& cf = db.find_column_family(_cf_id); - return make_foreign(std::make_unique(remote_state{cf.make_reader(_range)})); + schema_ptr s = _schema; + column_family& cf = db.find_column_family(s->id()); + return make_foreign(std::make_unique(remote_state{cf.make_reader(std::move(s), _range)})); }).then([this] (auto&& ptr) { _remote = std::move(ptr); }); } public: - shard_reader(utils::UUID cf_id, distributed& db, unsigned shard, const query::partition_range& range) + shard_reader(schema_ptr s, + distributed& db, + unsigned shard, + const query::partition_range& range) : _db(db) , _shard(shard) - , _cf_id(cf_id) , _range(range) + , _schema(s) + , _local_schema(std::move(s)) { } virtual future operator()() override { @@ -2938,7 +2948,7 @@ storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range unsigned last_shard = range.end() ? dht::shard_of(range.end()->value().token()) : smp::count - 1; std::vector readers; for (auto cpu = first_shard; cpu <= last_shard; ++cpu) { - readers.emplace_back(make_mutation_reader(cf_id, _db, cpu, range)); + readers.emplace_back(make_mutation_reader(schema, _db, cpu, range)); } return make_joining_reader(std::move(readers)); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index ad7758297a..72172a3d79 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -143,8 +143,8 @@ private: std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl); - future>> query_singular_local(lw_shared_ptr cmd, const query::partition_range& pr); - future query_singular_local_digest(lw_shared_ptr cmd, const query::partition_range& pr); + future>> query_singular_local(schema_ptr, lw_shared_ptr cmd, const query::partition_range& pr); + future query_singular_local_digest(schema_ptr, lw_shared_ptr cmd, const query::partition_range& pr); future>> query_partition_key_range(lw_shared_ptr cmd, query::partition_range&& range, db::consistency_level cl); std::vector get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range); float estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks); @@ -222,11 +222,12 @@ public: db::consistency_level cl); future>> query_mutations_locally( - lw_shared_ptr cmd, const query::partition_range&); + schema_ptr, lw_shared_ptr cmd, const query::partition_range&); /* * Returns mutation_reader for given column family * which combines data from all shards. + * Uses schema current at the time of invocation. */ mutation_reader make_local_reader(utils::UUID cf_id, const query::partition_range&); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 841684668e..c57e25e867 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1354,7 +1354,7 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_ future<> sstable::write_components(memtable& mt) { _collector.set_replay_position(mt.replay_position()); - return write_components(mt.make_reader(), + return write_components(mt.make_reader(mt.schema()), mt.partition_count(), mt.schema(), std::numeric_limits::max()); } diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 13f81071f5..2aa48988b5 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -211,7 +211,8 @@ public: table_name = std::move(table_name)] (database& db) mutable { auto& cf = db.find_column_family(ks_name, table_name); auto schema = cf.schema(); - return cf.find_partition_slow(pkey).then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) { + return cf.find_partition_slow(schema, pkey) + .then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) { assert(p != nullptr); auto row = p->find_row(ckey); assert(row != nullptr); diff --git a/tests/mutation_query_test.cc b/tests/mutation_query_test.cc index c164bfdc51..d9c1e644b1 100644 --- a/tests/mutation_query_test.cc +++ b/tests/mutation_query_test.cc @@ -59,8 +59,11 @@ struct mutation_less_cmp { }; mutation_source make_source(std::vector mutations) { - return [mutations = std::move(mutations)] (const query::partition_range& range) { + return [mutations = std::move(mutations)] (schema_ptr s, const query::partition_range& range) { assert(range.is_full()); // slicing not implemented yet + for (auto&& m : mutations) { + assert(m.schema() == s); + } return make_reader_returning_many(mutations); }; } @@ -90,7 +93,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { { auto slice = make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 2, now).get0(); // FIXME: use mutation assertions @@ -113,7 +116,8 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { clustering_key_prefix::from_single_value(*s, bytes("B")))) .build(); - reconcilable_result result = mutation_query(src, query::full_partition_range, slice, query::max_rows, now).get0(); + reconcilable_result result = mutation_query(s, src, + query::full_partition_range, slice, query::max_rows, now).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -145,7 +149,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { { auto slice = make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 1, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -159,7 +163,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { { auto slice = make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 1, now + 2s).get0(); assert_that(to_result_set(result, s, slice)) @@ -191,7 +195,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -221,7 +225,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -249,7 +253,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); { - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 10, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -269,7 +273,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { } { - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 1, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -281,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { } { - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 2, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -308,7 +312,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 2, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -332,7 +336,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -354,7 +358,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -379,7 +383,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) { auto src = make_source({m1}); auto slice = make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, now).get0(); assert_that(to_result_set(result, s, slice)) diff --git a/tests/mutation_source_test.cc b/tests/mutation_source_test.cc index a740cdbf47..4a2702661c 100644 --- a/tests/mutation_source_test.cc +++ b/tests/mutation_source_test.cc @@ -70,7 +70,7 @@ static void test_range_queries(populate_fn populate) { auto test_slice = [&] (query::range r) { BOOST_MESSAGE(sprint("Testing range %s", r)); - assert_that(ds(r)) + assert_that(ds(s, r)) .produces(slice(partitions, r)) .produces_end_of_stream(); }; diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index a601015868..6392eb7677 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -57,7 +57,7 @@ static atomic_cell make_atomic_cell(bytes value) { static mutation_partition get_partition(memtable& mt, const partition_key& key) { auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key); - auto reader = mt.make_reader(query::partition_range::make_singular(dk)); + auto reader = mt.make_reader(mt.schema(), query::partition_range::make_singular(dk)); auto mo = reader().get0(); BOOST_REQUIRE(bool(mo)); return std::move(mo->partition()); @@ -296,7 +296,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { insert_row(1003, 2003)).discard_result().then([s, &r1_col, &cf, key] { auto verify_row = [&] (int32_t c1, int32_t r1) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); - return cf.find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) { + return cf.find_row(cf.schema(), dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) { BOOST_REQUIRE(r); auto i = r->find_cell(r1_col.id); BOOST_REQUIRE(i); @@ -353,13 +353,13 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) { std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator()); // Flush will happen in the middle of reading for this scanner - auto assert_that_scanner1 = assert_that(cf.make_reader(query::full_partition_range)); + auto assert_that_scanner1 = assert_that(cf.make_reader(s, query::full_partition_range)); // Flush will happen before it is invoked - auto assert_that_scanner2 = assert_that(cf.make_reader(query::full_partition_range)); + auto assert_that_scanner2 = assert_that(cf.make_reader(s, query::full_partition_range)); // Flush will happen after all data was read, but before EOS was consumed - auto assert_that_scanner3 = assert_that(cf.make_reader(query::full_partition_range)); + auto assert_that_scanner3 = assert_that(cf.make_reader(s, query::full_partition_range)); assert_that_scanner1.produces(mutations[0]); assert_that_scanner1.produces(mutations[1]); @@ -432,7 +432,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { } return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) { - return cf.for_all_partitions_slow([&, s] (const dht::decorated_key& pk, const mutation_partition& mp) { + return cf.for_all_partitions_slow(s, [&, s] (const dht::decorated_key& pk, const mutation_partition& mp) { auto p1 = value_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, query::range())) { auto c1 = value_cast(int32_type->deserialize(re.key().explode(*s)[0])); diff --git a/tests/perf_row_cache_update.cc b/tests/perf_row_cache_update.cc index d752a61d38..f883c66846 100644 --- a/tests/perf_row_cache_update.cc +++ b/tests/perf_row_cache_update.cc @@ -68,7 +68,7 @@ int main(int argc, char** argv) { .build(); cache_tracker tracker; - row_cache cache(s, [] (auto&&) { return make_empty_reader(); }, + row_cache cache(s, [] (schema_ptr, auto&&) { return make_empty_reader(); }, [] (auto&&) { return key_reader(); }, tracker); size_t partitions = app.configuration()["partitions"].as(); diff --git a/tests/row_cache_alloc_stress.cc b/tests/row_cache_alloc_stress.cc index 4b4dd5c02d..e999ac69a6 100644 --- a/tests/row_cache_alloc_stress.cc +++ b/tests/row_cache_alloc_stress.cc @@ -181,7 +181,7 @@ int main(int argc, char** argv) { // Verify that all mutations from memtable went through for (auto&& key : keys) { auto range = query::partition_range::make_singular(key); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(s, range); auto mo = reader().get0(); assert(mo); assert(mo->partition().live_row_count(*s) == @@ -198,7 +198,7 @@ int main(int argc, char** argv) { for (auto&& key : keys) { auto range = query::partition_range::make_singular(key); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(s, range); auto mo = reader().get0(); assert(mo); } @@ -235,7 +235,7 @@ int main(int argc, char** argv) { } try { - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(s, range); assert(!reader().get0()); auto evicted_from_cache = logalloc::segment_size + cell_size * row_count; new char[evicted_from_cache + logalloc::segment_size]; diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index 59ecda4e69..5f4b238622 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -69,13 +69,14 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying) { auto m = make_new_mutation(s); cache_tracker tracker; - row_cache cache(s, [m] (const query::partition_range&) { + row_cache cache(s, [m] (schema_ptr s, const query::partition_range&) { + assert(m.schema() == s); return make_reader_returning(m); }, [m] (auto&&) { return make_key_from_mutation_reader(make_reader_returning(m)); }, tracker); - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(m) .produces_end_of_stream(); }); @@ -87,19 +88,20 @@ SEASTAR_TEST_CASE(test_cache_works_after_clearing) { auto m = make_new_mutation(s); cache_tracker tracker; - row_cache cache(s, [m] (const query::partition_range&) { + row_cache cache(s, [m] (schema_ptr s, const query::partition_range&) { + assert(m.schema() == s); return make_reader_returning(m); }, [m] (auto&&) { return make_key_from_mutation_reader(make_reader_returning(m)); }, tracker); - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(m) .produces_end_of_stream(); tracker.clear(); - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(m) .produces_end_of_stream(); }); @@ -133,26 +135,26 @@ SEASTAR_TEST_CASE(test_query_of_incomplete_range_goes_to_underlying) { }; // Populate cache for first key - assert_that(cache.make_reader(get_partition_range(mutations[0]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[0]))) .produces(mutations[0]) .produces_end_of_stream(); // Populate cache for last key - assert_that(cache.make_reader(get_partition_range(mutations[2]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[2]))) .produces(mutations[2]) .produces_end_of_stream(); // Test single-key queries - assert_that(cache.make_reader(get_partition_range(mutations[0]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[0]))) .produces(mutations[0]) .produces_end_of_stream(); - assert_that(cache.make_reader(get_partition_range(mutations[2]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[2]))) .produces(mutations[2]) .produces_end_of_stream(); // Test range query - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(mutations[0]) .produces(mutations[1]) .produces(mutations[2]) @@ -180,15 +182,15 @@ SEASTAR_TEST_CASE(test_single_key_queries_after_population_in_reverse_order) { }; for (int i = 0; i < 2; ++i) { - assert_that(cache.make_reader(get_partition_range(mutations[2]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[2]))) .produces(mutations[2]) .produces_end_of_stream(); - assert_that(cache.make_reader(get_partition_range(mutations[1]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[1]))) .produces(mutations[1]) .produces_end_of_stream(); - assert_that(cache.make_reader(get_partition_range(mutations[0]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[0]))) .produces(mutations[0]) .produces_end_of_stream(); } @@ -207,8 +209,8 @@ SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) { } auto cache = make_lw_shared(s, mt->as_data_source(), mt->as_key_source(), tracker); - return [cache] (const query::partition_range& range) { - return cache->make_reader(range); + return [cache] (schema_ptr s, const query::partition_range& range) { + return cache->make_reader(s, range); }; }); }); @@ -232,7 +234,7 @@ SEASTAR_TEST_CASE(test_eviction) { std::random_shuffle(keys.begin(), keys.end()); for (auto&& key : keys) { - cache.make_reader(query::partition_range::make_singular(key)); + cache.make_reader(s, query::partition_range::make_singular(key)); } while (tracker.region().occupancy().used_space() > 0) { @@ -243,7 +245,7 @@ SEASTAR_TEST_CASE(test_eviction) { bool has_key(row_cache& cache, const dht::decorated_key& key) { auto range = query::partition_range::make_singular(key); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(cache.schema(), range); auto mo = reader().get0(); return bool(mo); } @@ -258,7 +260,7 @@ void verify_does_not_have(row_cache& cache, const dht::decorated_key& key) { void verify_has(row_cache& cache, const mutation& m) { auto range = query::partition_range::make_singular(m.decorated_key()); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(cache.schema(), range); auto mo = reader().get0(); BOOST_REQUIRE(bool(mo)); assert_that(*mo).is_equal_to(m); @@ -410,8 +412,8 @@ private: : _underlying(std::move(underlying)) { } - mutation_reader make_reader(const query::partition_range& pr) { - return make_mutation_reader(_throttle, _underlying(pr)); + mutation_reader make_reader(schema_ptr s, const query::partition_range& pr) { + return make_mutation_reader(_throttle, _underlying(s, pr)); } ::throttle& throttle() { return _throttle; } @@ -430,8 +432,8 @@ public: _impl->throttle().unblock(); } - mutation_reader operator()(const query::partition_range& pr) { - return _impl->make_reader(pr); + mutation_reader operator()(schema_ptr s, const query::partition_range& pr) { + return _impl->make_reader(s, pr); } }; @@ -447,10 +449,10 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { return seastar::async([] { auto s = make_schema(); std::vector> memtables; - auto memtables_data_source = [&] (const query::partition_range& pr) { + auto memtables_data_source = [&] (schema_ptr s, const query::partition_range& pr) { std::vector readers; for (auto&& mt : memtables) { - readers.emplace_back(mt->make_reader(pr)); + readers.emplace_back(mt->make_reader(s, pr)); } return make_combined_reader(std::move(readers)); }; @@ -481,10 +483,10 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { cache_source.block(); auto m0_range = query::partition_range::make_singular(ring[0].ring_position()); - auto rd1 = cache.make_reader(m0_range); + auto rd1 = cache.make_reader(s, m0_range); auto rd1_result = rd1(); - auto rd2 = cache.make_reader(); + auto rd2 = cache.make_reader(s); auto rd2_result = rd2(); sleep(10ms).get(); @@ -495,7 +497,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { // This update should miss on all partitions auto update_future = cache.update(*mt2, make_default_partition_presence_checker()); - auto rd3 = cache.make_reader(); + auto rd3 = cache.make_reader(s); // rd2, which is in progress, should not prevent forward progress of update() cache_source.unblock(); @@ -519,7 +521,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { .produces_end_of_stream(); // Reads started after flush should see new data - assert_that(cache.make_reader()) + assert_that(cache.make_reader(s)) .produces(ring2[0]) .produces(ring2[1]) .produces(ring2[2]) diff --git a/tests/sstable_mutation_test.cc b/tests/sstable_mutation_test.cc index f16d17e33c..5933b18902 100644 --- a/tests/sstable_mutation_test.cc +++ b/tests/sstable_mutation_test.cc @@ -345,8 +345,8 @@ SEASTAR_TEST_CASE(read_partial_range_2) { }); } -::mutation_source as_mutation_source(schema_ptr s, lw_shared_ptr sst) { - return [s, sst] (const query::partition_range& range) mutable { +::mutation_source as_mutation_source(lw_shared_ptr sst) { + return [sst] (schema_ptr s, const query::partition_range& range) mutable { return as_mutation_reader(sst, sst->read_range_rows(s, range)); }; } @@ -373,7 +373,7 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) { sst->write_components(*mt).get(); sst->load().get(); - return as_mutation_source(s, sst); + return as_mutation_source(sst); }); }); } diff --git a/thrift/handler.cc b/thrift/handler.cc index e57c66c3f4..6a15aaa4e2 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -195,16 +195,18 @@ public: throw unimplemented_exception(); } else if (predicate.__isset.slice_range) { auto&& range = predicate.slice_range; - return cf.find_row(dk, clustering_key::make_empty(*cf.schema())).then([&cf, range = std::move(range)] (column_family::const_row_ptr rw) { + auto s = cf.schema(); + return cf.find_row(s, dk, clustering_key::make_empty(*s)).then( + [s, &cf, range = std::move(range)] (column_family::const_row_ptr rw) { std::vector ret; if (rw) { - auto beg = cf.schema()->regular_begin(); + auto beg = s->regular_begin(); if (!range.start.empty()) { - beg = cf.schema()->regular_lower_bound(to_bytes(range.start)); + beg = s->regular_lower_bound(to_bytes(range.start)); } - auto end = cf.schema()->regular_end(); + auto end = s->regular_end(); if (!range.finish.empty()) { - end = cf.schema()->regular_upper_bound(to_bytes(range.finish)); + end = s->regular_upper_bound(to_bytes(range.finish)); } auto count = range.count; // FIXME: force limit count?