From 4e5a52d6fae1410f65d518e5604643dfd89d5b03 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 7 Dec 2015 17:56:20 +0100 Subject: [PATCH] db: Make read interface schema version aware The intent is to make data returned by queries always conform to a single schema version, which is requested by the client. For CQL queries, for example, we want to use the same schema which was used to compile the query. The other node expects to receive data conforming to the requested schema. The interface at the shard level accepts a schema_ptr; across nodes we use a table_schema_version UUID. To transfer schema_ptr across shards, we use global_schema_ptr. Because the schema is identified by a UUID across nodes, requestors must be prepared to be queried for the schema definition. They must hold a live schema_ptr around the request. This guarantees that schema_registry will always know about the requested version. This is not an issue because for queries the requestor needs to hold on to the schema anyway to be able to interpret the results. But care must be taken to always use the same schema version for making the request and parsing the results. Schema requesting across nodes is currently stubbed (throws a runtime exception). 
--- database.cc | 78 +++++++++++++++-------------- database.hh | 24 +++++---- db/schema_tables.cc | 2 +- db/system_keyspace.cc | 2 +- memtable.cc | 30 ++++++----- memtable.hh | 5 +- mutation_query.cc | 8 +-- mutation_query.hh | 1 + mutation_reader.hh | 7 ++- query-request.hh | 1 + repair/repair.cc | 2 +- row_cache.cc | 47 ++++++++++++------ row_cache.hh | 8 ++- service/storage_proxy.cc | 88 ++++++++++++++++++--------------- service/storage_proxy.hh | 7 +-- sstables/sstables.cc | 2 +- tests/cql_test_env.cc | 3 +- tests/mutation_query_test.cc | 32 ++++++------ tests/mutation_source_test.cc | 2 +- tests/mutation_test.cc | 12 ++--- tests/perf_row_cache_update.cc | 2 +- tests/row_cache_alloc_stress.cc | 6 +-- tests/row_cache_test.cc | 58 +++++++++++----------- tests/sstable_mutation_test.cc | 6 +-- thrift/handler.cc | 12 +++-- 25 files changed, 256 insertions(+), 189 deletions(-) diff --git a/database.cc b/database.cc index b2c193c3b7..6c338916bd 100644 --- a/database.cc +++ b/database.cc @@ -127,8 +127,8 @@ column_family::make_partition_presence_checker(lw_shared_ptr old_s mutation_source column_family::sstables_as_mutation_source() { - return [this] (const query::partition_range& r) { - return make_sstable_reader(r); + return [this] (schema_ptr s, const query::partition_range& r) { + return make_sstable_reader(std::move(s), r); }; } @@ -207,16 +207,16 @@ public: }; mutation_reader -column_family::make_sstable_reader(const query::partition_range& pr) const { +column_family::make_sstable_reader(schema_ptr s, const query::partition_range& pr) const { if (pr.is_singular() && pr.start()->value().has_key()) { const dht::ring_position& pos = pr.start()->value(); if (dht::shard_of(pos.token()) != engine().cpu_id()) { return make_empty_reader(); // range doesn't belong to this shard } - return make_mutation_reader(_schema, _sstables, *pos.key()); + return make_mutation_reader(std::move(s), _sstables, *pos.key()); } else { // range_sstable_reader is not movable so we need to 
wrap it - return make_mutation_reader(_schema, _sstables, pr); + return make_mutation_reader(std::move(s), _sstables, pr); } } @@ -240,9 +240,9 @@ key_source column_family::sstables_as_key_source() const { // Exposed for testing, not performance critical. future -column_family::find_partition(const dht::decorated_key& key) const { - return do_with(query::partition_range::make_singular(key), [this] (auto& range) { - return do_with(this->make_reader(range), [] (mutation_reader& reader) { +column_family::find_partition(schema_ptr s, const dht::decorated_key& key) const { + return do_with(query::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) { + return do_with(this->make_reader(s, range), [] (mutation_reader& reader) { return reader().then([] (mutation_opt&& mo) -> std::unique_ptr { if (!mo) { return {}; @@ -254,13 +254,13 @@ column_family::find_partition(const dht::decorated_key& key) const { } future -column_family::find_partition_slow(const partition_key& key) const { - return find_partition(dht::global_partitioner().decorate_key(*_schema, key)); +column_family::find_partition_slow(schema_ptr s, const partition_key& key) const { + return find_partition(s, dht::global_partitioner().decorate_key(*s, key)); } future -column_family::find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const { - return find_partition(partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) { +column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, clustering_key clustering_key) const { + return find_partition(std::move(s), partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) { if (!p) { return make_ready_future(); } @@ -275,8 +275,8 @@ column_family::find_row(const dht::decorated_key& partition_key, clustering_key } mutation_reader -column_family::make_reader(const query::partition_range& range) const { - if 
(query::is_wrap_around(range, *_schema)) { +column_family::make_reader(schema_ptr s, const query::partition_range& range) const { + if (query::is_wrap_around(range, *s)) { // make_combined_reader() can't handle streams that wrap around yet. fail(unimplemented::cause::WRAP_AROUND); } @@ -305,13 +305,13 @@ column_family::make_reader(const query::partition_range& range) const { // https://github.com/scylladb/scylla/issues/185 for (auto&& mt : *_memtables) { - readers.emplace_back(mt->make_reader(range)); + readers.emplace_back(mt->make_reader(s, range)); } if (_config.enable_cache) { - readers.emplace_back(_cache.make_reader(range)); + readers.emplace_back(_cache.make_reader(s, range)); } else { - readers.emplace_back(make_sstable_reader(range)); + readers.emplace_back(make_sstable_reader(s, range)); } return make_combined_reader(std::move(readers)); @@ -319,7 +319,7 @@ column_family::make_reader(const query::partition_range& range) const { template future -column_family::for_all_partitions(Func&& func) const { +column_family::for_all_partitions(schema_ptr s, Func&& func) const { static_assert(std::is_same>::value, "bad Func signature"); @@ -330,13 +330,13 @@ column_family::for_all_partitions(Func&& func) const { bool empty = false; public: bool done() const { return !ok || empty; } - iteration_state(const column_family& cf, Func&& func) - : reader(cf.make_reader()) + iteration_state(schema_ptr s, const column_family& cf, Func&& func) + : reader(cf.make_reader(std::move(s))) , func(std::move(func)) { } }; - return do_with(iteration_state(*this, std::move(func)), [] (iteration_state& is) { + return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) { return do_until([&is] { return is.done(); }, [&is] { return is.reader().then([&is](mutation_opt&& mo) { if (!mo) { @@ -352,8 +352,8 @@ column_family::for_all_partitions(Func&& func) const { } future -column_family::for_all_partitions_slow(std::function func) const { - return 
for_all_partitions(std::move(func)); +column_family::for_all_partitions_slow(schema_ptr s, std::function func) const { + return for_all_partitions(std::move(s), std::move(func)); } class lister { @@ -1486,13 +1486,17 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { } struct query_state { - explicit query_state(const query::read_command& cmd, const std::vector& ranges) - : cmd(cmd) + explicit query_state(schema_ptr s, + const query::read_command& cmd, + const std::vector& ranges) + : schema(std::move(s)) + , cmd(cmd) , builder(cmd.slice) , limit(cmd.row_limit) , current_partition_range(ranges.begin()) , range_end(ranges.end()){ } + schema_ptr schema; const query::read_command& cmd; query::result::builder builder; uint32_t limit; @@ -1506,21 +1510,21 @@ struct query_state { }; future> -column_family::query(const query::read_command& cmd, const std::vector& partition_ranges) { +column_family::query(schema_ptr s, const query::read_command& cmd, const std::vector& partition_ranges) { utils::latency_counter lc; _stats.reads.set_latency(lc); - return do_with(query_state(cmd, partition_ranges), [this] (query_state& qs) { + return do_with(query_state(std::move(s), cmd, partition_ranges), [this] (query_state& qs) { return do_until(std::bind(&query_state::done, &qs), [this, &qs] { auto&& range = *qs.current_partition_range++; - qs.reader = make_reader(range); + qs.reader = make_reader(qs.schema, range); qs.range_empty = false; - return do_until([&qs] { return !qs.limit || qs.range_empty; }, [this, &qs] { - return qs.reader().then([this, &qs](mutation_opt mo) { + return do_until([&qs] { return !qs.limit || qs.range_empty; }, [&qs] { + return qs.reader().then([&qs](mutation_opt mo) { if (mo) { auto p_builder = qs.builder.add_partition(*mo->schema(), mo->key()); auto is_distinct = qs.cmd.slice.options.contains(query::partition_slice::option::distinct); auto limit = !is_distinct ? 
qs.limit : 1; - mo->partition().query(p_builder, *_schema, qs.cmd.timestamp, limit); + mo->partition().query(p_builder, *qs.schema, qs.cmd.timestamp, limit); qs.limit -= p_builder.row_count(); } else { qs.range_empty = true; @@ -1541,21 +1545,21 @@ column_family::query(const query::read_command& cmd, const std::vectormake_reader(range); + return [this] (schema_ptr s, const query::partition_range& range) { + return this->make_reader(std::move(s), range); }; } future> -database::query(const query::read_command& cmd, const std::vector& ranges) { +database::query(schema_ptr s, const query::read_command& cmd, const std::vector& ranges) { column_family& cf = find_column_family(cmd.cf_id); - return cf.query(cmd, ranges); + return cf.query(std::move(s), cmd, ranges); } future -database::query_mutations(const query::read_command& cmd, const query::partition_range& range) { +database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range) { column_family& cf = find_column_family(cmd.cf_id); - return mutation_query(cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp); + return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp); } std::unordered_set database::get_initial_tokens() { diff --git a/database.hh b/database.hh index b993b771dc..5ba9795ac3 100644 --- a/database.hh +++ b/database.hh @@ -189,7 +189,8 @@ private: // Creates a mutation reader which covers sstables. // Caller needs to ensure that column_family remains live (FIXME: relax this). // The 'range' parameter must be live as long as the reader is used. - mutation_reader make_sstable_reader(const query::partition_range& range) const; + // Mutations returned by the reader will all have given schema. 
+ mutation_reader make_sstable_reader(schema_ptr schema, const query::partition_range& range) const; mutation_source sstables_as_mutation_source(); key_source sstables_as_key_source() const; @@ -200,7 +201,8 @@ public: // Caller needs to ensure that column_family remains live (FIXME: relax this). // Note: for data queries use query() instead. // The 'range' parameter must be live as long as the reader is used. - mutation_reader make_reader(const query::partition_range& range = query::full_partition_range) const; + // Mutations returned by the reader will all have given schema. + mutation_reader make_reader(schema_ptr schema, const query::partition_range& range = query::full_partition_range) const; mutation_source as_mutation_source() const; @@ -227,16 +229,18 @@ public: ~column_family(); schema_ptr schema() const { return _schema; } db::commitlog* commitlog() { return _commitlog; } - future find_partition(const dht::decorated_key& key) const; - future find_partition_slow(const partition_key& key) const; - future find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const; + future find_partition(schema_ptr, const dht::decorated_key& key) const; + future find_partition_slow(schema_ptr, const partition_key& key) const; + future find_row(schema_ptr, const dht::decorated_key& partition_key, clustering_key clustering_key) const; // Applies given mutation to this column family // The mutation is always upgraded to current schema. 
void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position()); void apply(const mutation& m, const db::replay_position& = db::replay_position()); // Returns at most "cmd.limit" rows - future> query(const query::read_command& cmd, const std::vector& ranges); + future> query(schema_ptr, + const query::read_command& cmd, + const std::vector& ranges); future<> populate(sstring datadir); @@ -362,14 +366,14 @@ private: // so that iteration can be stopped by returning false. // Func signature: bool (const decorated_key& dk, const mutation_partition& mp) template - future for_all_partitions(Func&& func) const; + future for_all_partitions(schema_ptr, Func&& func) const; future probe_file(sstring sstdir, sstring fname); void seal_on_overflow(); void check_valid_rp(const db::replay_position&) const; public: // Iterate over all partitions. Protocol is the same as std::all_of(), // so that iteration can be stopped by returning false. - future for_all_partitions_slow(std::function func) const; + future for_all_partitions_slow(schema_ptr, std::function func) const; friend std::ostream& operator<<(std::ostream& out, const column_family& cf); // Testing purposes. 
@@ -621,8 +625,8 @@ public: unsigned shard_of(const dht::token& t); unsigned shard_of(const mutation& m); unsigned shard_of(const frozen_mutation& m); - future> query(const query::read_command& cmd, const std::vector& ranges); - future query_mutations(const query::read_command& cmd, const query::partition_range& range); + future> query(schema_ptr, const query::read_command& cmd, const std::vector& ranges); + future query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range); future<> apply(schema_ptr, const frozen_mutation&); keyspace::config make_keyspace_config(const keyspace_metadata& ksm); const sstring& get_snitch_name() const; diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 4c8a871efe..d6d9a8a4dc 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -423,7 +423,7 @@ future query_partition_mutation(service::storage_proxy& proxy, { auto dk = dht::global_partitioner().decorate_key(*s, pkey); return do_with(query::partition_range::make_singular(dk), [&proxy, dk, s = std::move(s), cmd = std::move(cmd)] (auto& range) { - return proxy.query_mutations_locally(std::move(cmd), range) + return proxy.query_mutations_locally(s, std::move(cmd), range) .then([dk = std::move(dk), s](foreign_ptr> res) { auto&& partitions = res->partitions(); if (partitions.size() == 0) { diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index b10a7044ac..9dd435c1a3 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1017,7 +1017,7 @@ query_mutations(distributed& proxy, const sstring& cf_na auto slice = partition_slice_builder(*schema).build(); auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), std::numeric_limits::max()); - return proxy.local().query_mutations_locally(cmd, query::full_partition_range); + return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range); } future> diff --git a/memtable.cc b/memtable.cc index 
6e3990e83e..8cefdca883 100644 --- a/memtable.cc +++ b/memtable.cc @@ -103,6 +103,7 @@ memtable::slice(const query::partition_range& range) const { class scanning_reader final : public mutation_reader::impl { lw_shared_ptr _memtable; + schema_ptr _schema; const query::partition_range& _range; stdx::optional _last; memtable::partitions_type::iterator _i; @@ -144,8 +145,9 @@ private: _last_reclaim_counter = current_reclaim_counter; } public: - scanning_reader(lw_shared_ptr m, const query::partition_range& range) + scanning_reader(schema_ptr s, lw_shared_ptr m, const query::partition_range& range) : _memtable(std::move(m)) + , _schema(std::move(s)) , _range(range) { } @@ -159,7 +161,7 @@ public: // FIXME: Use cache. See column_family::make_reader(). _delegate_range = _last ? _range.split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : _range; _delegate = make_mutation_reader( - _memtable->_sstable, _memtable->_schema, *_delegate_range); + _memtable->_sstable, _schema, *_delegate_range); _memtable = {}; _last = {}; return _delegate(); @@ -174,13 +176,13 @@ public: ++_i; _last = e.key(); _memtable->upgrade_entry(e); - return make_ready_future(mutation(_memtable->_schema, e.key(), e.partition())); + return make_ready_future(e.read(_schema)); } }; mutation_reader -memtable::make_reader(const query::partition_range& range) { - if (query::is_wrap_around(range, *_schema)) { +memtable::make_reader(schema_ptr s, const query::partition_range& range) { + if (query::is_wrap_around(range, *s)) { fail(unimplemented::cause::WRAP_AROUND); } @@ -190,13 +192,13 @@ memtable::make_reader(const query::partition_range& range) { auto i = partitions.find(pos, partition_entry::compare(_schema)); if (i != partitions.end()) { upgrade_entry(*i); - return make_reader_returning(mutation(_schema, i->key(), i->partition())); + return make_reader_returning(i->read(s)); } else { return make_empty_reader(); } }); } else { - return make_mutation_reader(shared_from_this(), range); + 
return make_mutation_reader(std::move(s), shared_from_this(), range); } } @@ -209,7 +211,7 @@ memtable::update(const db::replay_position& rp) { future<> memtable::apply(memtable& mt) { - return do_with(mt.make_reader(), [this] (auto&& rd) mutable { + return do_with(mt.make_reader(_schema), [this] (auto&& rd) mutable { return consume(rd, [self = this->shared_from_this(), &rd] (mutation&& m) { self->apply(m); return stop_iteration::no; @@ -244,14 +246,14 @@ logalloc::occupancy_stats memtable::occupancy() const { } mutation_source memtable::as_data_source() { - return [mt = shared_from_this()] (const query::partition_range& range) { - return mt->make_reader(range); + return [mt = shared_from_this()] (schema_ptr s, const query::partition_range& range) { + return mt->make_reader(std::move(s), range); }; } key_source memtable::as_key_source() { return [mt = shared_from_this()] (const query::partition_range& range) { - return make_key_from_mutation_reader(mt->make_reader(range)); + return make_key_from_mutation_reader(mt->make_reader(mt->_schema, range)); }; } @@ -278,6 +280,12 @@ bool memtable::is_flushed() const { return bool(_sstable); } +mutation partition_entry::read(const schema_ptr& target_schema) { + auto m = mutation(_schema, _key, _p); + m.upgrade(target_schema); + return m; +} + void memtable::upgrade_entry(partition_entry& e) { if (e._schema != _schema) { assert(!_region.reclaiming_enabled()); diff --git a/memtable.hh b/memtable.hh index 1b988a7659..3f26a6d6c0 100644 --- a/memtable.hh +++ b/memtable.hh @@ -58,6 +58,7 @@ public: mutation_partition& partition() { return _p; } const schema_ptr& schema() const { return _schema; } schema_ptr& schema() { return _schema; } + mutation read(const schema_ptr&); struct compare { dht::decorated_key::less_comparator _c; @@ -132,7 +133,9 @@ public: // doesn't need to ensure that memtable remains live. 
// // The 'range' parameter must be live as long as the reader is being used - mutation_reader make_reader(const query::partition_range& range = query::full_partition_range); + // + // Mutations returned by the reader will all have given schema. + mutation_reader make_reader(schema_ptr, const query::partition_range& range = query::full_partition_range); mutation_source as_data_source(); key_source as_key_source(); diff --git a/mutation_query.cc b/mutation_query.cc index 567beef809..28fdb06da0 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -108,7 +108,8 @@ void db::serializer::read(reconcilable_result& v, input& in } future -mutation_query(const mutation_source& source, +mutation_query(schema_ptr s, + const mutation_source& source, const query::partition_range& range, const query::partition_slice& slice, uint32_t row_limit, @@ -141,8 +142,9 @@ mutation_query(const mutation_source& source, return make_ready_future(reconcilable_result()); } - return do_with(query_state(range, slice, row_limit, query_time), [&source] (query_state& state) -> future { - state.reader = source(state.range); + return do_with(query_state(range, slice, row_limit, query_time), + [&source, s = std::move(s)] (query_state& state) -> future { + state.reader = source(std::move(s), state.range); return consume(state.reader, [&state] (mutation&& m) { // FIXME: Make data sources respect row_ranges so that we don't have to filter them out here. auto is_distinct = state.slice.options.contains(query::partition_slice::option::distinct); diff --git a/mutation_query.hh b/mutation_query.hh index 3ffce507fd..98c9b39efe 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -114,6 +114,7 @@ extern template class serializer; // // 'source' doesn't have to survive deferring. 
future mutation_query( + schema_ptr, const mutation_source& source, const query::partition_range& range, const query::partition_slice& slice, diff --git a/mutation_reader.hh b/mutation_reader.hh index cabb38bc6f..09c05b4663 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -71,6 +71,9 @@ make_mutation_reader(Args&&... args) { return mutation_reader(std::make_unique(std::forward(args)...)); } +// Creates a mutation reader which combines data return by supplied readers. +// Returns mutation of the same schema only when all readers return mutations +// of the same schema. mutation_reader make_combined_reader(std::vector); mutation_reader make_combined_reader(mutation_reader&& a, mutation_reader&& b); // reads from the input readers, in order @@ -147,7 +150,9 @@ future<> consume(mutation_reader& reader, Consumer consumer) { // mutation_source represents source of data in mutation form. The data source // can be queried multiple times and in parallel. For each query it returns // independent mutation_reader. -using mutation_source = std::function; +// The reader returns mutations having all the same schema, the one passed +// when invoking the source. +using mutation_source = std::function; /// A partition_presence_checker quickly returns whether a key is known not to exist /// in a data source (it may return false positives, but not false negatives). diff --git a/query-request.hh b/query-request.hh index 23d61e01a0..de2207b6f1 100644 --- a/query-request.hh +++ b/query-request.hh @@ -57,6 +57,7 @@ typedef std::vector clustering_row_ranges; // Specifies subset of rows, columns and cell attributes to be returned in a query. // Can be accessed across cores. +// Schema-dependent. 
class partition_slice { public: enum class option { send_clustering_key, send_partition_key, send_timestamp_and_expiry, reversed, distinct }; diff --git a/repair/repair.cc b/repair/repair.cc index eeba127d95..99641f707d 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -299,7 +299,7 @@ static future checksum_range_shard(database &db, const ::range& range) { auto& cf = db.find_column_family(keyspace_name, cf_name); return do_with(query::to_partition_range(range), [&cf] (const auto& partition_range) { - return do_with(cf.make_reader(partition_range), partition_checksum(), + return do_with(cf.make_reader(cf.schema(), partition_range), partition_checksum(), [] (auto& reader, auto& checksum) { return repeat([&reader, &checksum] () { return reader().then([&checksum] (auto mopt) { diff --git a/row_cache.cc b/row_cache.cc index 7dddb8ec0b..91d32edf6e 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -155,11 +155,13 @@ const logalloc::region& cache_tracker::region() const { // Reader which populates the cache using data from the delegate. 
class populating_reader final : public mutation_reader::impl { + schema_ptr _schema; row_cache& _cache; mutation_reader _delegate; public: - populating_reader(row_cache& cache, mutation_reader delegate) - : _cache(cache) + populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate) + : _schema(std::move(s)) + , _cache(cache) , _delegate(std::move(delegate)) { } @@ -167,6 +169,7 @@ public: return _delegate().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) { if (mo) { _cache.populate(*mo); + mo->upgrade(_schema); } return std::move(mo); }); @@ -184,6 +187,7 @@ void row_cache::on_miss() { } class just_cache_scanning_reader final : public mutation_reader::impl { + schema_ptr _schema; row_cache& _cache; row_cache::partitions_type::iterator _it; row_cache::partitions_type::iterator _end; @@ -227,7 +231,9 @@ private: _last_modification_count = modification_count; } public: - just_cache_scanning_reader(row_cache& cache, const query::partition_range& range) : _cache(cache), _range(range) { } + just_cache_scanning_reader(schema_ptr s, row_cache& cache, const query::partition_range& range) + : _schema(std::move(s)), _cache(cache), _range(range) + { } virtual future operator()() override { return _cache._read_section(_cache._tracker.region(), [this] { update_iterators(); @@ -238,7 +244,7 @@ public: ++_it; _last = ce.key(); _cache.upgrade_entry(ce); - return make_ready_future(mutation(_cache._schema, ce.key(), ce.partition())); + return make_ready_future(ce.read(_schema)); }); } }; @@ -259,9 +265,9 @@ class scanning_and_populating_reader final : public mutation_reader::impl { dht::decorated_key_opt _next_key; dht::decorated_key_opt _last_secondary_key; public: - scanning_and_populating_reader(row_cache& cache, const query::partition_range& range) - : _cache(cache), _schema(cache._schema), - _primary(make_mutation_reader(cache, range)), + scanning_and_populating_reader(schema_ptr s, row_cache& cache, const query::partition_range& range) + : 
_cache(cache), _schema(s), + _primary(make_mutation_reader(s, cache, range)), _underlying(cache._underlying), _original_range(range), _underlying_keys(cache._underlying_keys), _keys(_underlying_keys(range)) { } @@ -298,7 +304,7 @@ public: _range = query::partition_range(query::partition_range::bound { std::move(*dk), true }, std::move(end)); _last_secondary_key = {}; _secondary_phase = _cache._populate_phaser.phase(); - _secondary = _underlying(_range); + _secondary = _underlying(_cache._schema, _range); _secondary_only = true; return next_secondary(); }); @@ -311,7 +317,7 @@ private: auto cmp = dht::ring_position_comparator(*_schema); _range = _range.split_after(*_last_secondary_key, cmp); _secondary_phase = _cache._populate_phaser.phase(); - _secondary = _underlying(_range); + _secondary = _underlying(_cache._schema, _range); } return _secondary().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) { if (!mo && _next_primary) { @@ -324,6 +330,7 @@ private: } if (mo) { _cache.populate(*mo); + mo->upgrade(_schema); _last_secondary_key = mo->decorated_key(); } _cache.on_miss(); @@ -339,17 +346,17 @@ private: }; mutation_reader -row_cache::make_scanning_reader(const query::partition_range& range) { - return make_mutation_reader(*this, range); +row_cache::make_scanning_reader(schema_ptr s, const query::partition_range& range) { + return make_mutation_reader(std::move(s), *this, range); } mutation_reader -row_cache::make_reader(const query::partition_range& range) { +row_cache::make_reader(schema_ptr s, const query::partition_range& range) { if (range.is_singular()) { const query::ring_position& pos = range.start()->value(); if (!pos.has_key()) { - return make_scanning_reader(range); + return make_scanning_reader(std::move(s), range); } return _read_section(_tracker.region(), [&] { @@ -360,15 +367,15 @@ row_cache::make_reader(const query::partition_range& range) { _tracker.touch(e); on_hit(); upgrade_entry(e); - return 
make_reader_returning(mutation(_schema, dk, e.partition())); + return make_reader_returning(e.read(s)); } else { on_miss(); - return make_mutation_reader(*this, _underlying(range)); + return make_mutation_reader(s, *this, _underlying(_schema, range)); } }); } - return make_scanning_reader(range); + return make_scanning_reader(std::move(s), range); } row_cache::~row_cache() { @@ -542,6 +549,14 @@ cache_entry::cache_entry(cache_entry&& o) noexcept } } +mutation cache_entry::read(const schema_ptr& s) { + auto m = mutation(_schema, _key, _p); + if (_schema != s) { + m.upgrade(s); + } + return m; +} + const schema_ptr& row_cache::schema() const { return _schema; } diff --git a/row_cache.hh b/row_cache.hh index b4a61bfbbc..730b5d2173 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -83,6 +83,7 @@ public: mutation_partition& partition() { return _p; } const schema_ptr& schema() const { return _schema; } schema_ptr& schema() { return _schema; } + mutation read(const schema_ptr&); struct compare { dht::decorated_key::less_comparator _c; @@ -194,7 +195,7 @@ private: logalloc::allocating_section _update_section; logalloc::allocating_section _populate_section; logalloc::allocating_section _read_section; - mutation_reader make_scanning_reader(const query::partition_range&); + mutation_reader make_scanning_reader(schema_ptr, const query::partition_range&); void on_hit(); void on_miss(); void upgrade_entry(cache_entry&); @@ -206,7 +207,10 @@ public: row_cache(const row_cache&) = delete; row_cache& operator=(row_cache&&) = default; public: - mutation_reader make_reader(const query::partition_range& = query::full_partition_range); + // Implements mutation_source for this cache, see mutation_reader.hh + // User needs to ensure that the row_cache object stays alive + // as long as the reader is used. 
+ mutation_reader make_reader(schema_ptr, const query::partition_range& = query::full_partition_range); const stats& stats() const { return _stats; } public: // Populate cache from given mutation. The mutation must contain all diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 1939cf69a5..eb31943f93 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -71,6 +71,7 @@ #include #include "utils/latency.hh" #include "schema.hh" +#include "schema_registry.hh" namespace service { @@ -1643,6 +1644,7 @@ protected: using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; + schema_ptr _schema; shared_ptr _proxy; lw_shared_ptr _cmd; lw_shared_ptr _retry_cmd; @@ -1653,9 +1655,9 @@ protected: promise>> _result_promise; public: - abstract_read_executor(shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, + abstract_read_executor(schema_ptr s, shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector targets) : - _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) { + _schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) { _proxy->_stats.reads++; } virtual ~abstract_read_executor() { @@ -1665,7 +1667,7 @@ public: protected: future>> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep) { if (is_me(ep)) { - return _proxy->query_mutations_locally(cmd, _partition_range); + return _proxy->query_mutations_locally(_schema, cmd, _partition_range); } else { auto& ms = net::get_local_messaging_service(); return ms.send_read_mutation_data(net::messaging_service::msg_addr{ep, 0}, *cmd, _partition_range).then([this](reconcilable_result&& result) { @@ -1675,7 +1677,7 @@ protected: } future>> 
make_data_request(gms::inet_address ep) { if (is_me(ep)) { - return _proxy->query_singular_local(_cmd, _partition_range); + return _proxy->query_singular_local(_schema, _cmd, _partition_range); } else { auto& ms = net::get_local_messaging_service(); return ms.send_read_data(net::messaging_service::msg_addr{ep, 0}, *_cmd, _partition_range).then([this](query::result&& result) { @@ -1685,7 +1687,7 @@ protected: } future make_digest_request(gms::inet_address ep) { if (is_me(ep)) { - return _proxy->query_singular_local_digest(_cmd, _partition_range); + return _proxy->query_singular_local_digest(_schema, _cmd, _partition_range); } else { auto& ms = net::get_local_messaging_service(); return ms.send_read_digest(net::messaging_service::msg_addr{ep, 0}, *_cmd, _partition_range); @@ -1742,14 +1744,13 @@ protected: data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) { try { f.get(); - schema_ptr s = _proxy->_db.local().find_schema(_cmd->cf_id); - auto rr = data_resolver->resolve(s); // reconciliation happens here + auto rr = data_resolver->resolve(_schema); // reconciliation happens here // We generate a retry if at least one node reply with count live columns but after merge we have less // than the total number of column we are interested in (which may be < count on a retry). // So in particular, if no host returned count live columns, we know it's not a short read. 
if (data_resolver->max_live_count() < cmd->row_limit || rr.row_count() >= original_row_limit()) { - auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(rr), std::move(s), _cmd->slice))); + auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(rr), _schema, _cmd->slice))); // wait for write to complete before returning result to prevent multiple concurrent read requests to // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum // another read had returned a newer value (but the newer value had not yet been sent to the other replicas) @@ -1890,8 +1891,8 @@ public: class range_slice_read_executor : public abstract_read_executor { public: - range_slice_read_executor(shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : - abstract_read_executor(std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {} + range_slice_read_executor(schema_ptr s, shared_ptr proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : + abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {} virtual future>> execute(std::chrono::steady_clock::time_point timeout) override { reconcile(_cl, timeout); return _result_promise.get_future(); @@ -1913,7 +1914,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s ::shared_ptr storage_proxy::get_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl) { const dht::token& token = pr.start()->value().token(); - schema_ptr schema = _db.local().find_schema(cmd->cf_id); + schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); std::vector all_replicas = get_live_sorted_endpoints(ks, token); @@ -1936,14 +1937,14 @@ 
db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s auto p = shared_from_this(); // Speculative retry is disabled *OR* there are simply no extra replicas to speculate. if (retry_type == speculative_retry::type::NONE || db::block_for(ks, cl) == all_replicas.size()) { - return ::make_shared(p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } if (target_replicas.size() == all_replicas.size()) { // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy // (same amount of requests in total, but we turn 1 digest request into a full blown data request). - return ::make_shared(/*cfs, */p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. @@ -1959,24 +1960,24 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s target_replicas.push_back(extra_replica); if (retry_type == speculative_retry::type::ALWAYS) { - return ::make_shared(/*cfs,*/p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } else {// PERCENTILE or CUSTOM. 
- return ::make_shared(/*cfs,*/p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); + return ::make_shared(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } } future -storage_proxy::query_singular_local_digest(lw_shared_ptr cmd, const query::partition_range& pr) { - return query_singular_local(cmd, pr).then([] (foreign_ptr> result) { +storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr) { + return query_singular_local(std::move(s), std::move(cmd), pr).then([] (foreign_ptr> result) { return result->digest(); }); } future>> -storage_proxy::query_singular_local(lw_shared_ptr cmd, const query::partition_range& pr) { +storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr) { unsigned shard = _db.local().shard_of(pr.start()->value().token()); - return _db.invoke_on(shard, [prv = std::vector({pr}) /* FIXME: pr is copied */, cmd] (database& db) { - return db.query(*cmd, prv).then([](auto&& f) { + return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector({pr}) /* FIXME: pr is copied */, cmd] (database& db) { + return db.query(gs, *cmd, prv).then([](auto&& f) { return make_foreign(std::move(f)); }); }); @@ -2011,7 +2012,7 @@ future>>> storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, std::vector::iterator&& i, std::vector&& ranges, int concurrency_factor) { - schema_ptr schema = _db.local().find_schema(cmd->cf_id); + schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); std::vector<::shared_ptr> exec; auto concurrent_fetch_starting_index = i; @@ -2064,7 +2065,7 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t ++i; } db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints); - 
exec.push_back(::make_shared(p, cmd, std::move(range), cl, std::move(filtered_endpoints))); + exec.push_back(::make_shared(schema, p, cmd, std::move(range), cl, std::move(filtered_endpoints))); } query::result_merger merger; @@ -2087,7 +2088,7 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t future>> storage_proxy::query_partition_key_range(lw_shared_ptr cmd, query::partition_range&& range, db::consistency_level cl) { - schema_ptr schema = _db.local().find_schema(cmd->cf_id); + schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); std::vector ranges; auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms()); @@ -2712,17 +2713,23 @@ void storage_proxy::init_messaging_service() { }); ms.register_read_data([] (query::read_command cmd, query::partition_range pr) { return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared(std::move(cmd))] (const query::partition_range& pr, shared_ptr& p) { - return p->query_singular_local(cmd, pr); + warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME + schema_ptr s = local_schema_registry().get(cmd->schema_version); + return p->query_singular_local(std::move(s), cmd, pr); }); }); ms.register_read_mutation_data([] (query::read_command cmd, query::partition_range pr) { return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared(std::move(cmd))] (const query::partition_range& pr, shared_ptr& p) { - return p->query_mutations_locally(cmd, pr); + warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME + schema_ptr s = local_schema_registry().get(cmd->schema_version); + return p->query_mutations_locally(std::move(s), cmd, pr); }); }); ms.register_read_digest([] (query::read_command cmd, query::partition_range pr) { return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = 
make_lw_shared(std::move(cmd))] (const query::partition_range& pr, shared_ptr& p) { - return p->query_singular_local_digest(cmd, pr); + warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME + schema_ptr s = local_schema_registry().get(cmd->schema_version); + return p->query_singular_local_digest(std::move(s), cmd, pr); }); }); ms.register_truncate([](sstring ksname, sstring cfname) { @@ -2840,18 +2847,17 @@ public: }; future>> -storage_proxy::query_mutations_locally(lw_shared_ptr cmd, const query::partition_range& pr) { +storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr) { if (pr.is_singular()) { unsigned shard = _db.local().shard_of(pr.start()->value().token()); - return _db.invoke_on(shard, [cmd, &pr] (database& db) { - return db.query_mutations(*cmd, pr).then([] (reconcilable_result&& result) { + return _db.invoke_on(shard, [cmd, &pr, gs = global_schema_ptr(s)] (database& db) { + return db.query_mutations(gs, *cmd, pr).then([] (reconcilable_result&& result) { return make_foreign(make_lw_shared(std::move(result))); }); }); } else { - auto schema = _db.local().find_schema(cmd->cf_id); - return _db.map_reduce(mutation_result_merger{cmd, schema}, [cmd, &pr] (database& db) { - return db.query_mutations(*cmd, pr).then([] (reconcilable_result&& result) { + return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s)] (database& db) { + return db.query_mutations(gs, *cmd, pr).then([] (reconcilable_result&& result) { return make_foreign(make_lw_shared(std::move(result))); }); }).then([] (reconcilable_result&& result) { @@ -2869,8 +2875,8 @@ storage_proxy::stop() { class shard_reader final : public mutation_reader::impl { distributed& _db; unsigned _shard; - utils::UUID _cf_id; const query::partition_range _range; + global_schema_ptr _schema; schema_ptr _local_schema; struct remote_state { mutation_reader reader; @@ -2879,20 +2885,24 @@ class shard_reader final : public 
mutation_reader::impl { foreign_ptr> _remote; private: future<> init() { - _local_schema = _db.local().find_column_family(_cf_id).schema(); return _db.invoke_on(_shard, [this] (database& db) { - column_family& cf = db.find_column_family(_cf_id); - return make_foreign(std::make_unique(remote_state{cf.make_reader(_range)})); + schema_ptr s = _schema; + column_family& cf = db.find_column_family(s->id()); + return make_foreign(std::make_unique(remote_state{cf.make_reader(std::move(s), _range)})); }).then([this] (auto&& ptr) { _remote = std::move(ptr); }); } public: - shard_reader(utils::UUID cf_id, distributed& db, unsigned shard, const query::partition_range& range) + shard_reader(schema_ptr s, + distributed& db, + unsigned shard, + const query::partition_range& range) : _db(db) , _shard(shard) - , _cf_id(cf_id) , _range(range) + , _schema(s) + , _local_schema(std::move(s)) { } virtual future operator()() override { @@ -2938,7 +2948,7 @@ storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range unsigned last_shard = range.end() ? 
dht::shard_of(range.end()->value().token()) : smp::count - 1; std::vector readers; for (auto cpu = first_shard; cpu <= last_shard; ++cpu) { - readers.emplace_back(make_mutation_reader(cf_id, _db, cpu, range)); + readers.emplace_back(make_mutation_reader(schema, _db, cpu, range)); } return make_joining_reader(std::move(readers)); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index ad7758297a..72172a3d79 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -143,8 +143,8 @@ private: std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl); - future>> query_singular_local(lw_shared_ptr cmd, const query::partition_range& pr); - future query_singular_local_digest(lw_shared_ptr cmd, const query::partition_range& pr); + future>> query_singular_local(schema_ptr, lw_shared_ptr cmd, const query::partition_range& pr); + future query_singular_local_digest(schema_ptr, lw_shared_ptr cmd, const query::partition_range& pr); future>> query_partition_key_range(lw_shared_ptr cmd, query::partition_range&& range, db::consistency_level cl); std::vector get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range); float estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks); @@ -222,11 +222,12 @@ public: db::consistency_level cl); future>> query_mutations_locally( - lw_shared_ptr cmd, const query::partition_range&); + schema_ptr, lw_shared_ptr cmd, const query::partition_range&); /* * Returns mutation_reader for given column family * which combines data from all shards. + * Uses schema current at the time of invocation. 
*/ mutation_reader make_local_reader(utils::UUID cf_id, const query::partition_range&); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 841684668e..c57e25e867 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1354,7 +1354,7 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_ future<> sstable::write_components(memtable& mt) { _collector.set_replay_position(mt.replay_position()); - return write_components(mt.make_reader(), + return write_components(mt.make_reader(mt.schema()), mt.partition_count(), mt.schema(), std::numeric_limits::max()); } diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 13f81071f5..2aa48988b5 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -211,7 +211,8 @@ public: table_name = std::move(table_name)] (database& db) mutable { auto& cf = db.find_column_family(ks_name, table_name); auto schema = cf.schema(); - return cf.find_partition_slow(pkey).then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) { + return cf.find_partition_slow(schema, pkey) + .then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) { assert(p != nullptr); auto row = p->find_row(ckey); assert(row != nullptr); diff --git a/tests/mutation_query_test.cc b/tests/mutation_query_test.cc index c164bfdc51..d9c1e644b1 100644 --- a/tests/mutation_query_test.cc +++ b/tests/mutation_query_test.cc @@ -59,8 +59,11 @@ struct mutation_less_cmp { }; mutation_source make_source(std::vector mutations) { - return [mutations = std::move(mutations)] (const query::partition_range& range) { + return [mutations = std::move(mutations)] (schema_ptr s, const query::partition_range& range) { assert(range.is_full()); // slicing not implemented yet + for (auto&& m : mutations) { + assert(m.schema() == s); + } return make_reader_returning_many(mutations); }; } @@ -90,7 +93,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { { auto slice = 
make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 2, now).get0(); // FIXME: use mutation assertions @@ -113,7 +116,8 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { clustering_key_prefix::from_single_value(*s, bytes("B")))) .build(); - reconcilable_result result = mutation_query(src, query::full_partition_range, slice, query::max_rows, now).get0(); + reconcilable_result result = mutation_query(s, src, + query::full_partition_range, slice, query::max_rows, now).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -145,7 +149,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { { auto slice = make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 1, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -159,7 +163,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { { auto slice = make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 1, now + 2s).get0(); assert_that(to_result_set(result, s, slice)) @@ -191,7 +195,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -221,7 +225,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -249,7 +253,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) 
{ .build(); { - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 10, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -269,7 +273,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { } { - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 1, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -281,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { } { - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 2, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -308,7 +312,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 2, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -332,7 +336,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -354,7 +358,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .reversed() .build(); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, 3, now).get0(); assert_that(to_result_set(result, s, slice)) @@ -379,7 +383,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) { auto src = make_source({m1}); auto slice = make_full_slice(*s); - reconcilable_result result = mutation_query(src, + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, 
now).get0(); assert_that(to_result_set(result, s, slice)) diff --git a/tests/mutation_source_test.cc b/tests/mutation_source_test.cc index a740cdbf47..4a2702661c 100644 --- a/tests/mutation_source_test.cc +++ b/tests/mutation_source_test.cc @@ -70,7 +70,7 @@ static void test_range_queries(populate_fn populate) { auto test_slice = [&] (query::range r) { BOOST_MESSAGE(sprint("Testing range %s", r)); - assert_that(ds(r)) + assert_that(ds(s, r)) .produces(slice(partitions, r)) .produces_end_of_stream(); }; diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index a601015868..6392eb7677 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -57,7 +57,7 @@ static atomic_cell make_atomic_cell(bytes value) { static mutation_partition get_partition(memtable& mt, const partition_key& key) { auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key); - auto reader = mt.make_reader(query::partition_range::make_singular(dk)); + auto reader = mt.make_reader(mt.schema(), query::partition_range::make_singular(dk)); auto mo = reader().get0(); BOOST_REQUIRE(bool(mo)); return std::move(mo->partition()); @@ -296,7 +296,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { insert_row(1003, 2003)).discard_result().then([s, &r1_col, &cf, key] { auto verify_row = [&] (int32_t c1, int32_t r1) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); - return cf.find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) { + return cf.find_row(cf.schema(), dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) { BOOST_REQUIRE(r); auto i = r->find_cell(r1_col.id); BOOST_REQUIRE(i); @@ -353,13 +353,13 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) { std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator()); // Flush will happen in the middle of reading for this scanner - auto assert_that_scanner1 = 
assert_that(cf.make_reader(query::full_partition_range)); + auto assert_that_scanner1 = assert_that(cf.make_reader(s, query::full_partition_range)); // Flush will happen before it is invoked - auto assert_that_scanner2 = assert_that(cf.make_reader(query::full_partition_range)); + auto assert_that_scanner2 = assert_that(cf.make_reader(s, query::full_partition_range)); // Flush will happen after all data was read, but before EOS was consumed - auto assert_that_scanner3 = assert_that(cf.make_reader(query::full_partition_range)); + auto assert_that_scanner3 = assert_that(cf.make_reader(s, query::full_partition_range)); assert_that_scanner1.produces(mutations[0]); assert_that_scanner1.produces(mutations[1]); @@ -432,7 +432,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { } return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) { - return cf.for_all_partitions_slow([&, s] (const dht::decorated_key& pk, const mutation_partition& mp) { + return cf.for_all_partitions_slow(s, [&, s] (const dht::decorated_key& pk, const mutation_partition& mp) { auto p1 = value_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, query::range())) { auto c1 = value_cast(int32_type->deserialize(re.key().explode(*s)[0])); diff --git a/tests/perf_row_cache_update.cc b/tests/perf_row_cache_update.cc index d752a61d38..f883c66846 100644 --- a/tests/perf_row_cache_update.cc +++ b/tests/perf_row_cache_update.cc @@ -68,7 +68,7 @@ int main(int argc, char** argv) { .build(); cache_tracker tracker; - row_cache cache(s, [] (auto&&) { return make_empty_reader(); }, + row_cache cache(s, [] (schema_ptr, auto&&) { return make_empty_reader(); }, [] (auto&&) { return key_reader(); }, tracker); size_t partitions = app.configuration()["partitions"].as(); diff --git a/tests/row_cache_alloc_stress.cc b/tests/row_cache_alloc_stress.cc index 4b4dd5c02d..e999ac69a6 100644 --- a/tests/row_cache_alloc_stress.cc +++ 
b/tests/row_cache_alloc_stress.cc @@ -181,7 +181,7 @@ int main(int argc, char** argv) { // Verify that all mutations from memtable went through for (auto&& key : keys) { auto range = query::partition_range::make_singular(key); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(s, range); auto mo = reader().get0(); assert(mo); assert(mo->partition().live_row_count(*s) == @@ -198,7 +198,7 @@ int main(int argc, char** argv) { for (auto&& key : keys) { auto range = query::partition_range::make_singular(key); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(s, range); auto mo = reader().get0(); assert(mo); } @@ -235,7 +235,7 @@ int main(int argc, char** argv) { } try { - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(s, range); assert(!reader().get0()); auto evicted_from_cache = logalloc::segment_size + cell_size * row_count; new char[evicted_from_cache + logalloc::segment_size]; diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index 59ecda4e69..5f4b238622 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -69,13 +69,14 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying) { auto m = make_new_mutation(s); cache_tracker tracker; - row_cache cache(s, [m] (const query::partition_range&) { + row_cache cache(s, [m] (schema_ptr s, const query::partition_range&) { + assert(m.schema() == s); return make_reader_returning(m); }, [m] (auto&&) { return make_key_from_mutation_reader(make_reader_returning(m)); }, tracker); - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(m) .produces_end_of_stream(); }); @@ -87,19 +88,20 @@ SEASTAR_TEST_CASE(test_cache_works_after_clearing) { auto m = make_new_mutation(s); cache_tracker tracker; - row_cache cache(s, [m] (const query::partition_range&) { + row_cache cache(s, [m] (schema_ptr s, const query::partition_range&) { + assert(m.schema() == s); 
return make_reader_returning(m); }, [m] (auto&&) { return make_key_from_mutation_reader(make_reader_returning(m)); }, tracker); - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(m) .produces_end_of_stream(); tracker.clear(); - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(m) .produces_end_of_stream(); }); @@ -133,26 +135,26 @@ SEASTAR_TEST_CASE(test_query_of_incomplete_range_goes_to_underlying) { }; // Populate cache for first key - assert_that(cache.make_reader(get_partition_range(mutations[0]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[0]))) .produces(mutations[0]) .produces_end_of_stream(); // Populate cache for last key - assert_that(cache.make_reader(get_partition_range(mutations[2]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[2]))) .produces(mutations[2]) .produces_end_of_stream(); // Test single-key queries - assert_that(cache.make_reader(get_partition_range(mutations[0]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[0]))) .produces(mutations[0]) .produces_end_of_stream(); - assert_that(cache.make_reader(get_partition_range(mutations[2]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[2]))) .produces(mutations[2]) .produces_end_of_stream(); // Test range query - assert_that(cache.make_reader(query::full_partition_range)) + assert_that(cache.make_reader(s, query::full_partition_range)) .produces(mutations[0]) .produces(mutations[1]) .produces(mutations[2]) @@ -180,15 +182,15 @@ SEASTAR_TEST_CASE(test_single_key_queries_after_population_in_reverse_order) { }; for (int i = 0; i < 2; ++i) { - assert_that(cache.make_reader(get_partition_range(mutations[2]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[2]))) .produces(mutations[2]) .produces_end_of_stream(); - 
assert_that(cache.make_reader(get_partition_range(mutations[1]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[1]))) .produces(mutations[1]) .produces_end_of_stream(); - assert_that(cache.make_reader(get_partition_range(mutations[0]))) + assert_that(cache.make_reader(s, get_partition_range(mutations[0]))) .produces(mutations[0]) .produces_end_of_stream(); } @@ -207,8 +209,8 @@ SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) { } auto cache = make_lw_shared(s, mt->as_data_source(), mt->as_key_source(), tracker); - return [cache] (const query::partition_range& range) { - return cache->make_reader(range); + return [cache] (schema_ptr s, const query::partition_range& range) { + return cache->make_reader(s, range); }; }); }); @@ -232,7 +234,7 @@ SEASTAR_TEST_CASE(test_eviction) { std::random_shuffle(keys.begin(), keys.end()); for (auto&& key : keys) { - cache.make_reader(query::partition_range::make_singular(key)); + cache.make_reader(s, query::partition_range::make_singular(key)); } while (tracker.region().occupancy().used_space() > 0) { @@ -243,7 +245,7 @@ SEASTAR_TEST_CASE(test_eviction) { bool has_key(row_cache& cache, const dht::decorated_key& key) { auto range = query::partition_range::make_singular(key); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(cache.schema(), range); auto mo = reader().get0(); return bool(mo); } @@ -258,7 +260,7 @@ void verify_does_not_have(row_cache& cache, const dht::decorated_key& key) { void verify_has(row_cache& cache, const mutation& m) { auto range = query::partition_range::make_singular(m.decorated_key()); - auto reader = cache.make_reader(range); + auto reader = cache.make_reader(cache.schema(), range); auto mo = reader().get0(); BOOST_REQUIRE(bool(mo)); assert_that(*mo).is_equal_to(m); @@ -410,8 +412,8 @@ private: : _underlying(std::move(underlying)) { } - mutation_reader make_reader(const query::partition_range& pr) { - return make_mutation_reader(_throttle, 
_underlying(pr)); + mutation_reader make_reader(schema_ptr s, const query::partition_range& pr) { + return make_mutation_reader(_throttle, _underlying(s, pr)); } ::throttle& throttle() { return _throttle; } @@ -430,8 +432,8 @@ public: _impl->throttle().unblock(); } - mutation_reader operator()(const query::partition_range& pr) { - return _impl->make_reader(pr); + mutation_reader operator()(schema_ptr s, const query::partition_range& pr) { + return _impl->make_reader(s, pr); } }; @@ -447,10 +449,10 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { return seastar::async([] { auto s = make_schema(); std::vector> memtables; - auto memtables_data_source = [&] (const query::partition_range& pr) { + auto memtables_data_source = [&] (schema_ptr s, const query::partition_range& pr) { std::vector readers; for (auto&& mt : memtables) { - readers.emplace_back(mt->make_reader(pr)); + readers.emplace_back(mt->make_reader(s, pr)); } return make_combined_reader(std::move(readers)); }; @@ -481,10 +483,10 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { cache_source.block(); auto m0_range = query::partition_range::make_singular(ring[0].ring_position()); - auto rd1 = cache.make_reader(m0_range); + auto rd1 = cache.make_reader(s, m0_range); auto rd1_result = rd1(); - auto rd2 = cache.make_reader(); + auto rd2 = cache.make_reader(s); auto rd2_result = rd2(); sleep(10ms).get(); @@ -495,7 +497,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { // This update should miss on all partitions auto update_future = cache.update(*mt2, make_default_partition_presence_checker()); - auto rd3 = cache.make_reader(); + auto rd3 = cache.make_reader(s); // rd2, which is in progress, should not prevent forward progress of update() cache_source.unblock(); @@ -519,7 +521,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { .produces_end_of_stream(); // Reads started after flush should see new data - assert_that(cache.make_reader()) + 
assert_that(cache.make_reader(s)) .produces(ring2[0]) .produces(ring2[1]) .produces(ring2[2]) diff --git a/tests/sstable_mutation_test.cc b/tests/sstable_mutation_test.cc index f16d17e33c..5933b18902 100644 --- a/tests/sstable_mutation_test.cc +++ b/tests/sstable_mutation_test.cc @@ -345,8 +345,8 @@ SEASTAR_TEST_CASE(read_partial_range_2) { }); } -::mutation_source as_mutation_source(schema_ptr s, lw_shared_ptr sst) { - return [s, sst] (const query::partition_range& range) mutable { +::mutation_source as_mutation_source(lw_shared_ptr sst) { + return [sst] (schema_ptr s, const query::partition_range& range) mutable { return as_mutation_reader(sst, sst->read_range_rows(s, range)); }; } @@ -373,7 +373,7 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) { sst->write_components(*mt).get(); sst->load().get(); - return as_mutation_source(s, sst); + return as_mutation_source(sst); }); }); } diff --git a/thrift/handler.cc b/thrift/handler.cc index e57c66c3f4..6a15aaa4e2 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -195,16 +195,18 @@ public: throw unimplemented_exception(); } else if (predicate.__isset.slice_range) { auto&& range = predicate.slice_range; - return cf.find_row(dk, clustering_key::make_empty(*cf.schema())).then([&cf, range = std::move(range)] (column_family::const_row_ptr rw) { + auto s = cf.schema(); + return cf.find_row(s, dk, clustering_key::make_empty(*s)).then( + [s, &cf, range = std::move(range)] (column_family::const_row_ptr rw) { std::vector ret; if (rw) { - auto beg = cf.schema()->regular_begin(); + auto beg = s->regular_begin(); if (!range.start.empty()) { - beg = cf.schema()->regular_lower_bound(to_bytes(range.start)); + beg = s->regular_lower_bound(to_bytes(range.start)); } - auto end = cf.schema()->regular_end(); + auto end = s->regular_end(); if (!range.finish.empty()) { - end = cf.schema()->regular_upper_bound(to_bytes(range.finish)); + end = s->regular_upper_bound(to_bytes(range.finish)); } auto count = range.count; // 
FIXME: force limit count?