db: Make read interface schema version aware

The intent is to make data returned by queries always conform to a
single schema version, which is requested by the client. For CQL
queries, for example, we want to use the same schema which was used to
compile the query. The other node expects to receive data conforming
to the requested schema.

The shard-level interface accepts a schema_ptr; across nodes we use a
table_schema_version UUID. To transfer a schema_ptr across shards, we
use global_schema_ptr.

Because the schema is identified by UUID across nodes, requestors must
be prepared to be queried for the definition of the schema. They
must hold a live schema_ptr around the request. This guarantees that
schema_registry will always know about the requested version. This is
not an issue because for queries the requestor needs to hold on to the
schema anyway to be able to interpret the results. But care must be
taken to always use the same schema version for making the request and
parsing the results.

Requesting a schema definition from another node is currently stubbed
out (it throws a runtime exception).
This commit is contained in:
Tomasz Grabiec
2015-12-07 17:56:20 +01:00
parent 036974e19b
commit 4e5a52d6fa
25 changed files with 256 additions and 189 deletions

View File

@@ -127,8 +127,8 @@ column_family::make_partition_presence_checker(lw_shared_ptr<sstable_list> old_s
mutation_source
column_family::sstables_as_mutation_source() {
return [this] (const query::partition_range& r) {
return make_sstable_reader(r);
return [this] (schema_ptr s, const query::partition_range& r) {
return make_sstable_reader(std::move(s), r);
};
}
@@ -207,16 +207,16 @@ public:
};
mutation_reader
column_family::make_sstable_reader(const query::partition_range& pr) const {
column_family::make_sstable_reader(schema_ptr s, const query::partition_range& pr) const {
if (pr.is_singular() && pr.start()->value().has_key()) {
const dht::ring_position& pos = pr.start()->value();
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
return make_empty_reader(); // range doesn't belong to this shard
}
return make_mutation_reader<single_key_sstable_reader>(_schema, _sstables, *pos.key());
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key());
} else {
// range_sstable_reader is not movable so we need to wrap it
return make_mutation_reader<range_sstable_reader>(_schema, _sstables, pr);
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr);
}
}
@@ -240,9 +240,9 @@ key_source column_family::sstables_as_key_source() const {
// Exposed for testing, not performance critical.
future<column_family::const_mutation_partition_ptr>
column_family::find_partition(const dht::decorated_key& key) const {
return do_with(query::partition_range::make_singular(key), [this] (auto& range) {
return do_with(this->make_reader(range), [] (mutation_reader& reader) {
column_family::find_partition(schema_ptr s, const dht::decorated_key& key) const {
return do_with(query::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) {
return do_with(this->make_reader(s, range), [] (mutation_reader& reader) {
return reader().then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
if (!mo) {
return {};
@@ -254,13 +254,13 @@ column_family::find_partition(const dht::decorated_key& key) const {
}
future<column_family::const_mutation_partition_ptr>
column_family::find_partition_slow(const partition_key& key) const {
return find_partition(dht::global_partitioner().decorate_key(*_schema, key));
column_family::find_partition_slow(schema_ptr s, const partition_key& key) const {
return find_partition(s, dht::global_partitioner().decorate_key(*s, key));
}
future<column_family::const_row_ptr>
column_family::find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const {
return find_partition(partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) {
column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
return find_partition(std::move(s), partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) {
if (!p) {
return make_ready_future<const_row_ptr>();
}
@@ -275,8 +275,8 @@ column_family::find_row(const dht::decorated_key& partition_key, clustering_key
}
mutation_reader
column_family::make_reader(const query::partition_range& range) const {
if (query::is_wrap_around(range, *_schema)) {
column_family::make_reader(schema_ptr s, const query::partition_range& range) const {
if (query::is_wrap_around(range, *s)) {
// make_combined_reader() can't handle streams that wrap around yet.
fail(unimplemented::cause::WRAP_AROUND);
}
@@ -305,13 +305,13 @@ column_family::make_reader(const query::partition_range& range) const {
// https://github.com/scylladb/scylla/issues/185
for (auto&& mt : *_memtables) {
readers.emplace_back(mt->make_reader(range));
readers.emplace_back(mt->make_reader(s, range));
}
if (_config.enable_cache) {
readers.emplace_back(_cache.make_reader(range));
readers.emplace_back(_cache.make_reader(s, range));
} else {
readers.emplace_back(make_sstable_reader(range));
readers.emplace_back(make_sstable_reader(s, range));
}
return make_combined_reader(std::move(readers));
@@ -319,7 +319,7 @@ column_family::make_reader(const query::partition_range& range) const {
template <typename Func>
future<bool>
column_family::for_all_partitions(Func&& func) const {
column_family::for_all_partitions(schema_ptr s, Func&& func) const {
static_assert(std::is_same<bool, std::result_of_t<Func(const dht::decorated_key&, const mutation_partition&)>>::value,
"bad Func signature");
@@ -330,13 +330,13 @@ column_family::for_all_partitions(Func&& func) const {
bool empty = false;
public:
bool done() const { return !ok || empty; }
iteration_state(const column_family& cf, Func&& func)
: reader(cf.make_reader())
iteration_state(schema_ptr s, const column_family& cf, Func&& func)
: reader(cf.make_reader(std::move(s)))
, func(std::move(func))
{ }
};
return do_with(iteration_state(*this, std::move(func)), [] (iteration_state& is) {
return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) {
return do_until([&is] { return is.done(); }, [&is] {
return is.reader().then([&is](mutation_opt&& mo) {
if (!mo) {
@@ -352,8 +352,8 @@ column_family::for_all_partitions(Func&& func) const {
}
future<bool>
column_family::for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
return for_all_partitions(std::move(func));
column_family::for_all_partitions_slow(schema_ptr s, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
return for_all_partitions(std::move(s), std::move(func));
}
class lister {
@@ -1486,13 +1486,17 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
}
struct query_state {
explicit query_state(const query::read_command& cmd, const std::vector<query::partition_range>& ranges)
: cmd(cmd)
explicit query_state(schema_ptr s,
const query::read_command& cmd,
const std::vector<query::partition_range>& ranges)
: schema(std::move(s))
, cmd(cmd)
, builder(cmd.slice)
, limit(cmd.row_limit)
, current_partition_range(ranges.begin())
, range_end(ranges.end()){
}
schema_ptr schema;
const query::read_command& cmd;
query::result::builder builder;
uint32_t limit;
@@ -1506,21 +1510,21 @@ struct query_state {
};
future<lw_shared_ptr<query::result>>
column_family::query(const query::read_command& cmd, const std::vector<query::partition_range>& partition_ranges) {
column_family::query(schema_ptr s, const query::read_command& cmd, const std::vector<query::partition_range>& partition_ranges) {
utils::latency_counter lc;
_stats.reads.set_latency(lc);
return do_with(query_state(cmd, partition_ranges), [this] (query_state& qs) {
return do_with(query_state(std::move(s), cmd, partition_ranges), [this] (query_state& qs) {
return do_until(std::bind(&query_state::done, &qs), [this, &qs] {
auto&& range = *qs.current_partition_range++;
qs.reader = make_reader(range);
qs.reader = make_reader(qs.schema, range);
qs.range_empty = false;
return do_until([&qs] { return !qs.limit || qs.range_empty; }, [this, &qs] {
return qs.reader().then([this, &qs](mutation_opt mo) {
return do_until([&qs] { return !qs.limit || qs.range_empty; }, [&qs] {
return qs.reader().then([&qs](mutation_opt mo) {
if (mo) {
auto p_builder = qs.builder.add_partition(*mo->schema(), mo->key());
auto is_distinct = qs.cmd.slice.options.contains(query::partition_slice::option::distinct);
auto limit = !is_distinct ? qs.limit : 1;
mo->partition().query(p_builder, *_schema, qs.cmd.timestamp, limit);
mo->partition().query(p_builder, *qs.schema, qs.cmd.timestamp, limit);
qs.limit -= p_builder.row_count();
} else {
qs.range_empty = true;
@@ -1541,21 +1545,21 @@ column_family::query(const query::read_command& cmd, const std::vector<query::pa
mutation_source
column_family::as_mutation_source() const {
return [this] (const query::partition_range& range) {
return this->make_reader(range);
return [this] (schema_ptr s, const query::partition_range& range) {
return this->make_reader(std::move(s), range);
};
}
future<lw_shared_ptr<query::result>>
database::query(const query::read_command& cmd, const std::vector<query::partition_range>& ranges) {
database::query(schema_ptr s, const query::read_command& cmd, const std::vector<query::partition_range>& ranges) {
column_family& cf = find_column_family(cmd.cf_id);
return cf.query(cmd, ranges);
return cf.query(std::move(s), cmd, ranges);
}
future<reconcilable_result>
database::query_mutations(const query::read_command& cmd, const query::partition_range& range) {
database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range) {
column_family& cf = find_column_family(cmd.cf_id);
return mutation_query(cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp);
return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp);
}
std::unordered_set<sstring> database::get_initial_tokens() {

View File

@@ -189,7 +189,8 @@ private:
// Creates a mutation reader which covers sstables.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// The 'range' parameter must be live as long as the reader is used.
mutation_reader make_sstable_reader(const query::partition_range& range) const;
// Mutations returned by the reader will all have given schema.
mutation_reader make_sstable_reader(schema_ptr schema, const query::partition_range& range) const;
mutation_source sstables_as_mutation_source();
key_source sstables_as_key_source() const;
@@ -200,7 +201,8 @@ public:
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// Note: for data queries use query() instead.
// The 'range' parameter must be live as long as the reader is used.
mutation_reader make_reader(const query::partition_range& range = query::full_partition_range) const;
// Mutations returned by the reader will all have given schema.
mutation_reader make_reader(schema_ptr schema, const query::partition_range& range = query::full_partition_range) const;
mutation_source as_mutation_source() const;
@@ -227,16 +229,18 @@ public:
~column_family();
schema_ptr schema() const { return _schema; }
db::commitlog* commitlog() { return _commitlog; }
future<const_mutation_partition_ptr> find_partition(const dht::decorated_key& key) const;
future<const_mutation_partition_ptr> find_partition_slow(const partition_key& key) const;
future<const_row_ptr> find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const;
future<const_mutation_partition_ptr> find_partition(schema_ptr, const dht::decorated_key& key) const;
future<const_mutation_partition_ptr> find_partition_slow(schema_ptr, const partition_key& key) const;
future<const_row_ptr> find_row(schema_ptr, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
// Applies given mutation to this column family
// The mutation is always upgraded to current schema.
void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position());
void apply(const mutation& m, const db::replay_position& = db::replay_position());
// Returns at most "cmd.limit" rows
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd, const std::vector<query::partition_range>& ranges);
future<lw_shared_ptr<query::result>> query(schema_ptr,
const query::read_command& cmd,
const std::vector<query::partition_range>& ranges);
future<> populate(sstring datadir);
@@ -362,14 +366,14 @@ private:
// so that iteration can be stopped by returning false.
// Func signature: bool (const decorated_key& dk, const mutation_partition& mp)
template <typename Func>
future<bool> for_all_partitions(Func&& func) const;
future<bool> for_all_partitions(schema_ptr, Func&& func) const;
future<sstables::entry_descriptor> probe_file(sstring sstdir, sstring fname);
void seal_on_overflow();
void check_valid_rp(const db::replay_position&) const;
public:
// Iterate over all partitions. Protocol is the same as std::all_of(),
// so that iteration can be stopped by returning false.
future<bool> for_all_partitions_slow(std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
friend std::ostream& operator<<(std::ostream& out, const column_family& cf);
// Testing purposes.
@@ -621,8 +625,8 @@ public:
unsigned shard_of(const dht::token& t);
unsigned shard_of(const mutation& m);
unsigned shard_of(const frozen_mutation& m);
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd, const std::vector<query::partition_range>& ranges);
future<reconcilable_result> query_mutations(const query::read_command& cmd, const query::partition_range& range);
future<lw_shared_ptr<query::result>> query(schema_ptr, const query::read_command& cmd, const std::vector<query::partition_range>& ranges);
future<reconcilable_result> query_mutations(schema_ptr, const query::read_command& cmd, const query::partition_range& range);
future<> apply(schema_ptr, const frozen_mutation&);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);
const sstring& get_snitch_name() const;

View File

@@ -423,7 +423,7 @@ future<mutation> query_partition_mutation(service::storage_proxy& proxy,
{
auto dk = dht::global_partitioner().decorate_key(*s, pkey);
return do_with(query::partition_range::make_singular(dk), [&proxy, dk, s = std::move(s), cmd = std::move(cmd)] (auto& range) {
return proxy.query_mutations_locally(std::move(cmd), range)
return proxy.query_mutations_locally(s, std::move(cmd), range)
.then([dk = std::move(dk), s](foreign_ptr<lw_shared_ptr<reconcilable_result>> res) {
auto&& partitions = res->partitions();
if (partitions.size() == 0) {

View File

@@ -1017,7 +1017,7 @@ query_mutations(distributed<service::storage_proxy>& proxy, const sstring& cf_na
auto slice = partition_slice_builder(*schema).build();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(),
std::move(slice), std::numeric_limits<uint32_t>::max());
return proxy.local().query_mutations_locally(cmd, query::full_partition_range);
return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range);
}
future<lw_shared_ptr<query::result_set>>

View File

@@ -103,6 +103,7 @@ memtable::slice(const query::partition_range& range) const {
class scanning_reader final : public mutation_reader::impl {
lw_shared_ptr<memtable> _memtable;
schema_ptr _schema;
const query::partition_range& _range;
stdx::optional<dht::decorated_key> _last;
memtable::partitions_type::iterator _i;
@@ -144,8 +145,9 @@ private:
_last_reclaim_counter = current_reclaim_counter;
}
public:
scanning_reader(lw_shared_ptr<memtable> m, const query::partition_range& range)
scanning_reader(schema_ptr s, lw_shared_ptr<memtable> m, const query::partition_range& range)
: _memtable(std::move(m))
, _schema(std::move(s))
, _range(range)
{ }
@@ -159,7 +161,7 @@ public:
// FIXME: Use cache. See column_family::make_reader().
_delegate_range = _last ? _range.split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : _range;
_delegate = make_mutation_reader<sstable_range_wrapping_reader>(
_memtable->_sstable, _memtable->_schema, *_delegate_range);
_memtable->_sstable, _schema, *_delegate_range);
_memtable = {};
_last = {};
return _delegate();
@@ -174,13 +176,13 @@ public:
++_i;
_last = e.key();
_memtable->upgrade_entry(e);
return make_ready_future<mutation_opt>(mutation(_memtable->_schema, e.key(), e.partition()));
return make_ready_future<mutation_opt>(e.read(_schema));
}
};
mutation_reader
memtable::make_reader(const query::partition_range& range) {
if (query::is_wrap_around(range, *_schema)) {
memtable::make_reader(schema_ptr s, const query::partition_range& range) {
if (query::is_wrap_around(range, *s)) {
fail(unimplemented::cause::WRAP_AROUND);
}
@@ -190,13 +192,13 @@ memtable::make_reader(const query::partition_range& range) {
auto i = partitions.find(pos, partition_entry::compare(_schema));
if (i != partitions.end()) {
upgrade_entry(*i);
return make_reader_returning(mutation(_schema, i->key(), i->partition()));
return make_reader_returning(i->read(s));
} else {
return make_empty_reader();
}
});
} else {
return make_mutation_reader<scanning_reader>(shared_from_this(), range);
return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), range);
}
}
@@ -209,7 +211,7 @@ memtable::update(const db::replay_position& rp) {
future<>
memtable::apply(memtable& mt) {
return do_with(mt.make_reader(), [this] (auto&& rd) mutable {
return do_with(mt.make_reader(_schema), [this] (auto&& rd) mutable {
return consume(rd, [self = this->shared_from_this(), &rd] (mutation&& m) {
self->apply(m);
return stop_iteration::no;
@@ -244,14 +246,14 @@ logalloc::occupancy_stats memtable::occupancy() const {
}
mutation_source memtable::as_data_source() {
return [mt = shared_from_this()] (const query::partition_range& range) {
return mt->make_reader(range);
return [mt = shared_from_this()] (schema_ptr s, const query::partition_range& range) {
return mt->make_reader(std::move(s), range);
};
}
key_source memtable::as_key_source() {
return [mt = shared_from_this()] (const query::partition_range& range) {
return make_key_from_mutation_reader(mt->make_reader(range));
return make_key_from_mutation_reader(mt->make_reader(mt->_schema, range));
};
}
@@ -278,6 +280,12 @@ bool memtable::is_flushed() const {
return bool(_sstable);
}
mutation partition_entry::read(const schema_ptr& target_schema) {
auto m = mutation(_schema, _key, _p);
m.upgrade(target_schema);
return m;
}
void memtable::upgrade_entry(partition_entry& e) {
if (e._schema != _schema) {
assert(!_region.reclaiming_enabled());

View File

@@ -58,6 +58,7 @@ public:
mutation_partition& partition() { return _p; }
const schema_ptr& schema() const { return _schema; }
schema_ptr& schema() { return _schema; }
mutation read(const schema_ptr&);
struct compare {
dht::decorated_key::less_comparator _c;
@@ -132,7 +133,9 @@ public:
// doesn't need to ensure that memtable remains live.
//
// The 'range' parameter must be live as long as the reader is being used
mutation_reader make_reader(const query::partition_range& range = query::full_partition_range);
//
// Mutations returned by the reader will all have given schema.
mutation_reader make_reader(schema_ptr, const query::partition_range& range = query::full_partition_range);
mutation_source as_data_source();
key_source as_key_source();

View File

@@ -108,7 +108,8 @@ void db::serializer<reconcilable_result>::read(reconcilable_result& v, input& in
}
future<reconcilable_result>
mutation_query(const mutation_source& source,
mutation_query(schema_ptr s,
const mutation_source& source,
const query::partition_range& range,
const query::partition_slice& slice,
uint32_t row_limit,
@@ -141,8 +142,9 @@ mutation_query(const mutation_source& source,
return make_ready_future<reconcilable_result>(reconcilable_result());
}
return do_with(query_state(range, slice, row_limit, query_time), [&source] (query_state& state) -> future<reconcilable_result> {
state.reader = source(state.range);
return do_with(query_state(range, slice, row_limit, query_time),
[&source, s = std::move(s)] (query_state& state) -> future<reconcilable_result> {
state.reader = source(std::move(s), state.range);
return consume(state.reader, [&state] (mutation&& m) {
// FIXME: Make data sources respect row_ranges so that we don't have to filter them out here.
auto is_distinct = state.slice.options.contains(query::partition_slice::option::distinct);

View File

@@ -114,6 +114,7 @@ extern template class serializer<reconcilable_result>;
//
// 'source' doesn't have to survive deferring.
future<reconcilable_result> mutation_query(
schema_ptr,
const mutation_source& source,
const query::partition_range& range,
const query::partition_slice& slice,

View File

@@ -71,6 +71,9 @@ make_mutation_reader(Args&&... args) {
return mutation_reader(std::make_unique<Impl>(std::forward<Args>(args)...));
}
// Creates a mutation reader which combines data return by supplied readers.
// Returns mutation of the same schema only when all readers return mutations
// of the same schema.
mutation_reader make_combined_reader(std::vector<mutation_reader>);
mutation_reader make_combined_reader(mutation_reader&& a, mutation_reader&& b);
// reads from the input readers, in order
@@ -147,7 +150,9 @@ future<> consume(mutation_reader& reader, Consumer consumer) {
// mutation_source represents source of data in mutation form. The data source
// can be queried multiple times and in parallel. For each query it returns
// independent mutation_reader.
using mutation_source = std::function<mutation_reader(const query::partition_range& range)>;
// The reader returns mutations having all the same schema, the one passed
// when invoking the source.
using mutation_source = std::function<mutation_reader(schema_ptr, const query::partition_range& range)>;
/// A partition_presence_checker quickly returns whether a key is known not to exist
/// in a data source (it may return false positives, but not false negatives).

View File

@@ -57,6 +57,7 @@ typedef std::vector<clustering_range> clustering_row_ranges;
// Specifies subset of rows, columns and cell attributes to be returned in a query.
// Can be accessed across cores.
// Schema-dependent.
class partition_slice {
public:
enum class option { send_clustering_key, send_partition_key, send_timestamp_and_expiry, reversed, distinct };

View File

@@ -299,7 +299,7 @@ static future<partition_checksum> checksum_range_shard(database &db,
const ::range<dht::token>& range) {
auto& cf = db.find_column_family(keyspace_name, cf_name);
return do_with(query::to_partition_range(range), [&cf] (const auto& partition_range) {
return do_with(cf.make_reader(partition_range), partition_checksum(),
return do_with(cf.make_reader(cf.schema(), partition_range), partition_checksum(),
[] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum] () {
return reader().then([&checksum] (auto mopt) {

View File

@@ -155,11 +155,13 @@ const logalloc::region& cache_tracker::region() const {
// Reader which populates the cache using data from the delegate.
class populating_reader final : public mutation_reader::impl {
schema_ptr _schema;
row_cache& _cache;
mutation_reader _delegate;
public:
populating_reader(row_cache& cache, mutation_reader delegate)
: _cache(cache)
populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate)
: _schema(std::move(s))
, _cache(cache)
, _delegate(std::move(delegate))
{ }
@@ -167,6 +169,7 @@ public:
return _delegate().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
}
return std::move(mo);
});
@@ -184,6 +187,7 @@ void row_cache::on_miss() {
}
class just_cache_scanning_reader final : public mutation_reader::impl {
schema_ptr _schema;
row_cache& _cache;
row_cache::partitions_type::iterator _it;
row_cache::partitions_type::iterator _end;
@@ -227,7 +231,9 @@ private:
_last_modification_count = modification_count;
}
public:
just_cache_scanning_reader(row_cache& cache, const query::partition_range& range) : _cache(cache), _range(range) { }
just_cache_scanning_reader(schema_ptr s, row_cache& cache, const query::partition_range& range)
: _schema(std::move(s)), _cache(cache), _range(range)
{ }
virtual future<mutation_opt> operator()() override {
return _cache._read_section(_cache._tracker.region(), [this] {
update_iterators();
@@ -238,7 +244,7 @@ public:
++_it;
_last = ce.key();
_cache.upgrade_entry(ce);
return make_ready_future<mutation_opt>(mutation(_cache._schema, ce.key(), ce.partition()));
return make_ready_future<mutation_opt>(ce.read(_schema));
});
}
};
@@ -259,9 +265,9 @@ class scanning_and_populating_reader final : public mutation_reader::impl {
dht::decorated_key_opt _next_key;
dht::decorated_key_opt _last_secondary_key;
public:
scanning_and_populating_reader(row_cache& cache, const query::partition_range& range)
: _cache(cache), _schema(cache._schema),
_primary(make_mutation_reader<just_cache_scanning_reader>(cache, range)),
scanning_and_populating_reader(schema_ptr s, row_cache& cache, const query::partition_range& range)
: _cache(cache), _schema(s),
_primary(make_mutation_reader<just_cache_scanning_reader>(s, cache, range)),
_underlying(cache._underlying), _original_range(range), _underlying_keys(cache._underlying_keys),
_keys(_underlying_keys(range))
{ }
@@ -298,7 +304,7 @@ public:
_range = query::partition_range(query::partition_range::bound { std::move(*dk), true }, std::move(end));
_last_secondary_key = {};
_secondary_phase = _cache._populate_phaser.phase();
_secondary = _underlying(_range);
_secondary = _underlying(_cache._schema, _range);
_secondary_only = true;
return next_secondary();
});
@@ -311,7 +317,7 @@ private:
auto cmp = dht::ring_position_comparator(*_schema);
_range = _range.split_after(*_last_secondary_key, cmp);
_secondary_phase = _cache._populate_phaser.phase();
_secondary = _underlying(_range);
_secondary = _underlying(_cache._schema, _range);
}
return _secondary().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) {
if (!mo && _next_primary) {
@@ -324,6 +330,7 @@ private:
}
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
_last_secondary_key = mo->decorated_key();
}
_cache.on_miss();
@@ -339,17 +346,17 @@ private:
};
mutation_reader
row_cache::make_scanning_reader(const query::partition_range& range) {
return make_mutation_reader<scanning_and_populating_reader>(*this, range);
row_cache::make_scanning_reader(schema_ptr s, const query::partition_range& range) {
return make_mutation_reader<scanning_and_populating_reader>(std::move(s), *this, range);
}
mutation_reader
row_cache::make_reader(const query::partition_range& range) {
row_cache::make_reader(schema_ptr s, const query::partition_range& range) {
if (range.is_singular()) {
const query::ring_position& pos = range.start()->value();
if (!pos.has_key()) {
return make_scanning_reader(range);
return make_scanning_reader(std::move(s), range);
}
return _read_section(_tracker.region(), [&] {
@@ -360,15 +367,15 @@ row_cache::make_reader(const query::partition_range& range) {
_tracker.touch(e);
on_hit();
upgrade_entry(e);
return make_reader_returning(mutation(_schema, dk, e.partition()));
return make_reader_returning(e.read(s));
} else {
on_miss();
return make_mutation_reader<populating_reader>(*this, _underlying(range));
return make_mutation_reader<populating_reader>(s, *this, _underlying(_schema, range));
}
});
}
return make_scanning_reader(range);
return make_scanning_reader(std::move(s), range);
}
row_cache::~row_cache() {
@@ -542,6 +549,14 @@ cache_entry::cache_entry(cache_entry&& o) noexcept
}
}
mutation cache_entry::read(const schema_ptr& s) {
auto m = mutation(_schema, _key, _p);
if (_schema != s) {
m.upgrade(s);
}
return m;
}
const schema_ptr& row_cache::schema() const {
return _schema;
}

View File

@@ -83,6 +83,7 @@ public:
mutation_partition& partition() { return _p; }
const schema_ptr& schema() const { return _schema; }
schema_ptr& schema() { return _schema; }
mutation read(const schema_ptr&);
struct compare {
dht::decorated_key::less_comparator _c;
@@ -194,7 +195,7 @@ private:
logalloc::allocating_section _update_section;
logalloc::allocating_section _populate_section;
logalloc::allocating_section _read_section;
mutation_reader make_scanning_reader(const query::partition_range&);
mutation_reader make_scanning_reader(schema_ptr, const query::partition_range&);
void on_hit();
void on_miss();
void upgrade_entry(cache_entry&);
@@ -206,7 +207,10 @@ public:
row_cache(const row_cache&) = delete;
row_cache& operator=(row_cache&&) = default;
public:
mutation_reader make_reader(const query::partition_range& = query::full_partition_range);
// Implements mutation_source for this cache, see mutation_reader.hh
// User needs to ensure that the row_cache object stays alive
// as long as the reader is used.
mutation_reader make_reader(schema_ptr, const query::partition_range& = query::full_partition_range);
const stats& stats() const { return _stats; }
public:
// Populate cache from given mutation. The mutation must contain all

View File

@@ -71,6 +71,7 @@
#include <boost/range/algorithm/sort.hpp>
#include "utils/latency.hh"
#include "schema.hh"
#include "schema_registry.hh"
namespace service {
@@ -1643,6 +1644,7 @@ protected:
using digest_resolver_ptr = ::shared_ptr<digest_read_resolver>;
using data_resolver_ptr = ::shared_ptr<data_read_resolver>;
schema_ptr _schema;
shared_ptr<storage_proxy> _proxy;
lw_shared_ptr<query::read_command> _cmd;
lw_shared_ptr<query::read_command> _retry_cmd;
@@ -1653,9 +1655,9 @@ protected:
promise<foreign_ptr<lw_shared_ptr<query::result>>> _result_promise;
public:
abstract_read_executor(shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, size_t block_for,
abstract_read_executor(schema_ptr s, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, size_t block_for,
std::vector<gms::inet_address> targets) :
_proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) {
_schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) {
_proxy->_stats.reads++;
}
virtual ~abstract_read_executor() {
@@ -1665,7 +1667,7 @@ public:
protected:
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> make_mutation_data_request(lw_shared_ptr<query::read_command> cmd, gms::inet_address ep) {
if (is_me(ep)) {
return _proxy->query_mutations_locally(cmd, _partition_range);
return _proxy->query_mutations_locally(_schema, cmd, _partition_range);
} else {
auto& ms = net::get_local_messaging_service();
return ms.send_read_mutation_data(net::messaging_service::msg_addr{ep, 0}, *cmd, _partition_range).then([this](reconcilable_result&& result) {
@@ -1675,7 +1677,7 @@ protected:
}
future<foreign_ptr<lw_shared_ptr<query::result>>> make_data_request(gms::inet_address ep) {
if (is_me(ep)) {
return _proxy->query_singular_local(_cmd, _partition_range);
return _proxy->query_singular_local(_schema, _cmd, _partition_range);
} else {
auto& ms = net::get_local_messaging_service();
return ms.send_read_data(net::messaging_service::msg_addr{ep, 0}, *_cmd, _partition_range).then([this](query::result&& result) {
@@ -1685,7 +1687,7 @@ protected:
}
future<query::result_digest> make_digest_request(gms::inet_address ep) {
if (is_me(ep)) {
return _proxy->query_singular_local_digest(_cmd, _partition_range);
return _proxy->query_singular_local_digest(_schema, _cmd, _partition_range);
} else {
auto& ms = net::get_local_messaging_service();
return ms.send_read_digest(net::messaging_service::msg_addr{ep, 0}, *_cmd, _partition_range);
@@ -1742,14 +1744,13 @@ protected:
data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) {
try {
f.get();
schema_ptr s = _proxy->_db.local().find_schema(_cmd->cf_id);
auto rr = data_resolver->resolve(s); // reconciliation happens here
auto rr = data_resolver->resolve(_schema); // reconciliation happens here
// We generate a retry if at least one node reply with count live columns but after merge we have less
// than the total number of column we are interested in (which may be < count on a retry).
// So in particular, if no host returned count live columns, we know it's not a short read.
if (data_resolver->max_live_count() < cmd->row_limit || rr.row_count() >= original_row_limit()) {
auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(rr), std::move(s), _cmd->slice)));
auto result = ::make_foreign(::make_lw_shared(to_data_query_result(std::move(rr), _schema, _cmd->slice)));
// wait for write to complete before returning result to prevent multiple concurrent read requests to
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
@@ -1890,8 +1891,8 @@ public:
class range_slice_read_executor : public abstract_read_executor {
public:
range_slice_read_executor(shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {}
range_slice_read_executor(schema_ptr s, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(std::move(s), std::move(proxy), std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {}
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(std::chrono::steady_clock::time_point timeout) override {
reconcile(_cl, timeout);
return _result_promise.get_future();
@@ -1913,7 +1914,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl) {
const dht::token& token = pr.start()->value().token();
schema_ptr schema = _db.local().find_schema(cmd->cf_id);
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<gms::inet_address> all_replicas = get_live_sorted_endpoints(ks, token);
@@ -1936,14 +1937,14 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
auto p = shared_from_this();
// Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
if (retry_type == speculative_retry::type::NONE || db::block_for(ks, cl) == all_replicas.size()) {
return ::make_shared<never_speculating_read_executor>(p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
return ::make_shared<never_speculating_read_executor>(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
}
if (target_replicas.size() == all_replicas.size()) {
// CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
// We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
// (same amount of requests in total, but we turn 1 digest request into a full blown data request).
return ::make_shared<always_speculating_read_executor>(/*cfs, */p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
return ::make_shared<always_speculating_read_executor>(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
}
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
@@ -1959,24 +1960,24 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
target_replicas.push_back(extra_replica);
if (retry_type == speculative_retry::type::ALWAYS) {
return ::make_shared<always_speculating_read_executor>(/*cfs,*/p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
return ::make_shared<always_speculating_read_executor>(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
} else {// PERCENTILE or CUSTOM.
return ::make_shared<speculating_read_executor>(/*cfs,*/p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
return ::make_shared<speculating_read_executor>(schema, p, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
}
}
future<query::result_digest>
storage_proxy::query_singular_local_digest(lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr) {
return query_singular_local(cmd, pr).then([] (foreign_ptr<lw_shared_ptr<query::result>> result) {
storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr) {
return query_singular_local(std::move(s), std::move(cmd), pr).then([] (foreign_ptr<lw_shared_ptr<query::result>> result) {
return result->digest();
});
}
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_singular_local(lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr) {
storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
return _db.invoke_on(shard, [prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd] (database& db) {
return db.query(*cmd, prv).then([](auto&& f) {
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd] (database& db) {
return db.query(gs, *cmd, prv).then([](auto&& f) {
return make_foreign(std::move(f));
});
});
@@ -2011,7 +2012,7 @@ future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
schema_ptr schema = _db.local().find_schema(cmd->cf_id);
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<::shared_ptr<abstract_read_executor>> exec;
auto concurrent_fetch_starting_index = i;
@@ -2064,7 +2065,7 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
++i;
}
db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints);
exec.push_back(::make_shared<range_slice_read_executor>(p, cmd, std::move(range), cl, std::move(filtered_endpoints)));
exec.push_back(::make_shared<range_slice_read_executor>(schema, p, cmd, std::move(range), cl, std::move(filtered_endpoints)));
}
query::result_merger merger;
@@ -2087,7 +2088,7 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd, query::partition_range&& range, db::consistency_level cl) {
schema_ptr schema = _db.local().find_schema(cmd->cf_id);
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<query::partition_range> ranges;
auto timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(_db.local().get_config().read_request_timeout_in_ms());
@@ -2712,17 +2713,23 @@ void storage_proxy::init_messaging_service() {
});
ms.register_read_data([] (query::read_command cmd, query::partition_range pr) {
return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared<query::read_command>(std::move(cmd))] (const query::partition_range& pr, shared_ptr<storage_proxy>& p) {
return p->query_singular_local(cmd, pr);
warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME
schema_ptr s = local_schema_registry().get(cmd->schema_version);
return p->query_singular_local(std::move(s), cmd, pr);
});
});
ms.register_read_mutation_data([] (query::read_command cmd, query::partition_range pr) {
return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared<query::read_command>(std::move(cmd))] (const query::partition_range& pr, shared_ptr<storage_proxy>& p) {
return p->query_mutations_locally(cmd, pr);
warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME
schema_ptr s = local_schema_registry().get(cmd->schema_version);
return p->query_mutations_locally(std::move(s), cmd, pr);
});
});
ms.register_read_digest([] (query::read_command cmd, query::partition_range pr) {
return do_with(std::move(pr), get_local_shared_storage_proxy(), [cmd = make_lw_shared<query::read_command>(std::move(cmd))] (const query::partition_range& pr, shared_ptr<storage_proxy>& p) {
return p->query_singular_local_digest(cmd, pr);
warn(unimplemented::cause::SCHEMA_CHANGE); // FIXME
schema_ptr s = local_schema_registry().get(cmd->schema_version);
return p->query_singular_local_digest(std::move(s), cmd, pr);
});
});
ms.register_truncate([](sstring ksname, sstring cfname) {
@@ -2840,18 +2847,17 @@ public:
};
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
storage_proxy::query_mutations_locally(lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr) {
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr) {
if (pr.is_singular()) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
return _db.invoke_on(shard, [cmd, &pr] (database& db) {
return db.query_mutations(*cmd, pr).then([] (reconcilable_result&& result) {
return _db.invoke_on(shard, [cmd, &pr, gs = global_schema_ptr(s)] (database& db) {
return db.query_mutations(gs, *cmd, pr).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
});
} else {
auto schema = _db.local().find_schema(cmd->cf_id);
return _db.map_reduce(mutation_result_merger{cmd, schema}, [cmd, &pr] (database& db) {
return db.query_mutations(*cmd, pr).then([] (reconcilable_result&& result) {
return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s)] (database& db) {
return db.query_mutations(gs, *cmd, pr).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
}).then([] (reconcilable_result&& result) {
@@ -2869,8 +2875,8 @@ storage_proxy::stop() {
class shard_reader final : public mutation_reader::impl {
distributed<database>& _db;
unsigned _shard;
utils::UUID _cf_id;
const query::partition_range _range;
global_schema_ptr _schema;
schema_ptr _local_schema;
struct remote_state {
mutation_reader reader;
@@ -2879,20 +2885,24 @@ class shard_reader final : public mutation_reader::impl {
foreign_ptr<std::unique_ptr<remote_state>> _remote;
private:
future<> init() {
_local_schema = _db.local().find_column_family(_cf_id).schema();
return _db.invoke_on(_shard, [this] (database& db) {
column_family& cf = db.find_column_family(_cf_id);
return make_foreign(std::make_unique<remote_state>(remote_state{cf.make_reader(_range)}));
schema_ptr s = _schema;
column_family& cf = db.find_column_family(s->id());
return make_foreign(std::make_unique<remote_state>(remote_state{cf.make_reader(std::move(s), _range)}));
}).then([this] (auto&& ptr) {
_remote = std::move(ptr);
});
}
public:
shard_reader(utils::UUID cf_id, distributed<database>& db, unsigned shard, const query::partition_range& range)
shard_reader(schema_ptr s,
distributed<database>& db,
unsigned shard,
const query::partition_range& range)
: _db(db)
, _shard(shard)
, _cf_id(cf_id)
, _range(range)
, _schema(s)
, _local_schema(std::move(s))
{ }
virtual future<mutation_opt> operator()() override {
@@ -2938,7 +2948,7 @@ storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range
unsigned last_shard = range.end() ? dht::shard_of(range.end()->value().token()) : smp::count - 1;
std::vector<mutation_reader> readers;
for (auto cpu = first_shard; cpu <= last_shard; ++cpu) {
readers.emplace_back(make_mutation_reader<shard_reader>(cf_id, _db, cpu, range));
readers.emplace_back(make_mutation_reader<shard_reader>(schema, _db, cpu, range));
}
return make_joining_reader(std::move(readers));
}

View File

@@ -143,8 +143,8 @@ private:
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token);
db::read_repair_decision new_read_repair_decision(const schema& s);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl);
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular_local(lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr);
future<query::result_digest> query_singular_local_digest(lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr);
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr);
future<query::result_digest> query_singular_local_digest(schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr);
future<foreign_ptr<lw_shared_ptr<query::result>>> query_partition_key_range(lw_shared_ptr<query::read_command> cmd, query::partition_range&& range, db::consistency_level cl);
std::vector<query::partition_range> get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range);
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
@@ -222,11 +222,12 @@ public:
db::consistency_level cl);
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations_locally(
lw_shared_ptr<query::read_command> cmd, const query::partition_range&);
schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range&);
/*
* Returns mutation_reader for given column family
* which combines data from all shards.
* Uses schema current at the time of invocation.
*/
mutation_reader make_local_reader(utils::UUID cf_id, const query::partition_range&);

View File

@@ -1354,7 +1354,7 @@ void sstable::prepare_write_components(::mutation_reader mr, uint64_t estimated_
future<> sstable::write_components(memtable& mt) {
_collector.set_replay_position(mt.replay_position());
return write_components(mt.make_reader(),
return write_components(mt.make_reader(mt.schema()),
mt.partition_count(), mt.schema(), std::numeric_limits<uint64_t>::max());
}

View File

@@ -211,7 +211,8 @@ public:
table_name = std::move(table_name)] (database& db) mutable {
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf.schema();
return cf.find_partition_slow(pkey).then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
return cf.find_partition_slow(schema, pkey)
.then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
assert(p != nullptr);
auto row = p->find_row(ckey);
assert(row != nullptr);

View File

@@ -59,8 +59,11 @@ struct mutation_less_cmp {
};
mutation_source make_source(std::vector<mutation> mutations) {
return [mutations = std::move(mutations)] (const query::partition_range& range) {
return [mutations = std::move(mutations)] (schema_ptr s, const query::partition_range& range) {
assert(range.is_full()); // slicing not implemented yet
for (auto&& m : mutations) {
assert(m.schema() == s);
}
return make_reader_returning_many(mutations);
};
}
@@ -90,7 +93,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
{
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 2, now).get0();
// FIXME: use mutation assertions
@@ -113,7 +116,8 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
clustering_key_prefix::from_single_value(*s, bytes("B"))))
.build();
reconcilable_result result = mutation_query(src, query::full_partition_range, slice, query::max_rows, now).get0();
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, query::max_rows, now).get0();
assert_that(to_result_set(result, s, slice))
.has_only(a_row()
@@ -145,7 +149,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
{
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 1, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -159,7 +163,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
{
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 1, now + 2s).get0();
assert_that(to_result_set(result, s, slice))
@@ -191,7 +195,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.reversed()
.build();
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -221,7 +225,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.reversed()
.build();
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -249,7 +253,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.build();
{
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 10, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -269,7 +273,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 1, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -281,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 2, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -308,7 +312,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.reversed()
.build();
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 2, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -332,7 +336,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.reversed()
.build();
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -354,7 +358,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.reversed()
.build();
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
@@ -379,7 +383,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
auto src = make_source({m1});
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(src,
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, query::max_rows, now).get0();
assert_that(to_result_set(result, s, slice))

View File

@@ -70,7 +70,7 @@ static void test_range_queries(populate_fn populate) {
auto test_slice = [&] (query::range<dht::ring_position> r) {
BOOST_MESSAGE(sprint("Testing range %s", r));
assert_that(ds(r))
assert_that(ds(s, r))
.produces(slice(partitions, r))
.produces_end_of_stream();
};

View File

@@ -57,7 +57,7 @@ static atomic_cell make_atomic_cell(bytes value) {
static mutation_partition get_partition(memtable& mt, const partition_key& key) {
auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key);
auto reader = mt.make_reader(query::partition_range::make_singular(dk));
auto reader = mt.make_reader(mt.schema(), query::partition_range::make_singular(dk));
auto mo = reader().get0();
BOOST_REQUIRE(bool(mo));
return std::move(mo->partition());
@@ -296,7 +296,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
insert_row(1003, 2003)).discard_result().then([s, &r1_col, &cf, key] {
auto verify_row = [&] (int32_t c1, int32_t r1) {
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
return cf.find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) {
return cf.find_row(cf.schema(), dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) {
BOOST_REQUIRE(r);
auto i = r->find_cell(r1_col.id);
BOOST_REQUIRE(i);
@@ -353,13 +353,13 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator());
// Flush will happen in the middle of reading for this scanner
auto assert_that_scanner1 = assert_that(cf.make_reader(query::full_partition_range));
auto assert_that_scanner1 = assert_that(cf.make_reader(s, query::full_partition_range));
// Flush will happen before it is invoked
auto assert_that_scanner2 = assert_that(cf.make_reader(query::full_partition_range));
auto assert_that_scanner2 = assert_that(cf.make_reader(s, query::full_partition_range));
// Flush will happen after all data was read, but before EOS was consumed
auto assert_that_scanner3 = assert_that(cf.make_reader(query::full_partition_range));
auto assert_that_scanner3 = assert_that(cf.make_reader(s, query::full_partition_range));
assert_that_scanner1.produces(mutations[0]);
assert_that_scanner1.produces(mutations[1]);
@@ -432,7 +432,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
}
return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) {
return cf.for_all_partitions_slow([&, s] (const dht::decorated_key& pk, const mutation_partition& mp) {
return cf.for_all_partitions_slow(s, [&, s] (const dht::decorated_key& pk, const mutation_partition& mp) {
auto p1 = value_cast<int32_t>(int32_type->deserialize(pk._key.explode(*s)[0]));
for (const rows_entry& re : mp.range(*s, query::range<clustering_key_prefix>())) {
auto c1 = value_cast<int32_t>(int32_type->deserialize(re.key().explode(*s)[0]));

View File

@@ -68,7 +68,7 @@ int main(int argc, char** argv) {
.build();
cache_tracker tracker;
row_cache cache(s, [] (auto&&) { return make_empty_reader(); },
row_cache cache(s, [] (schema_ptr, auto&&) { return make_empty_reader(); },
[] (auto&&) { return key_reader(); }, tracker);
size_t partitions = app.configuration()["partitions"].as<unsigned>();

View File

@@ -181,7 +181,7 @@ int main(int argc, char** argv) {
// Verify that all mutations from memtable went through
for (auto&& key : keys) {
auto range = query::partition_range::make_singular(key);
auto reader = cache.make_reader(range);
auto reader = cache.make_reader(s, range);
auto mo = reader().get0();
assert(mo);
assert(mo->partition().live_row_count(*s) ==
@@ -198,7 +198,7 @@ int main(int argc, char** argv) {
for (auto&& key : keys) {
auto range = query::partition_range::make_singular(key);
auto reader = cache.make_reader(range);
auto reader = cache.make_reader(s, range);
auto mo = reader().get0();
assert(mo);
}
@@ -235,7 +235,7 @@ int main(int argc, char** argv) {
}
try {
auto reader = cache.make_reader(range);
auto reader = cache.make_reader(s, range);
assert(!reader().get0());
auto evicted_from_cache = logalloc::segment_size + cell_size * row_count;
new char[evicted_from_cache + logalloc::segment_size];

View File

@@ -69,13 +69,14 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying) {
auto m = make_new_mutation(s);
cache_tracker tracker;
row_cache cache(s, [m] (const query::partition_range&) {
row_cache cache(s, [m] (schema_ptr s, const query::partition_range&) {
assert(m.schema() == s);
return make_reader_returning(m);
}, [m] (auto&&) {
return make_key_from_mutation_reader(make_reader_returning(m));
}, tracker);
assert_that(cache.make_reader(query::full_partition_range))
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(m)
.produces_end_of_stream();
});
@@ -87,19 +88,20 @@ SEASTAR_TEST_CASE(test_cache_works_after_clearing) {
auto m = make_new_mutation(s);
cache_tracker tracker;
row_cache cache(s, [m] (const query::partition_range&) {
row_cache cache(s, [m] (schema_ptr s, const query::partition_range&) {
assert(m.schema() == s);
return make_reader_returning(m);
}, [m] (auto&&) {
return make_key_from_mutation_reader(make_reader_returning(m));
}, tracker);
assert_that(cache.make_reader(query::full_partition_range))
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(m)
.produces_end_of_stream();
tracker.clear();
assert_that(cache.make_reader(query::full_partition_range))
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(m)
.produces_end_of_stream();
});
@@ -133,26 +135,26 @@ SEASTAR_TEST_CASE(test_query_of_incomplete_range_goes_to_underlying) {
};
// Populate cache for first key
assert_that(cache.make_reader(get_partition_range(mutations[0])))
assert_that(cache.make_reader(s, get_partition_range(mutations[0])))
.produces(mutations[0])
.produces_end_of_stream();
// Populate cache for last key
assert_that(cache.make_reader(get_partition_range(mutations[2])))
assert_that(cache.make_reader(s, get_partition_range(mutations[2])))
.produces(mutations[2])
.produces_end_of_stream();
// Test single-key queries
assert_that(cache.make_reader(get_partition_range(mutations[0])))
assert_that(cache.make_reader(s, get_partition_range(mutations[0])))
.produces(mutations[0])
.produces_end_of_stream();
assert_that(cache.make_reader(get_partition_range(mutations[2])))
assert_that(cache.make_reader(s, get_partition_range(mutations[2])))
.produces(mutations[2])
.produces_end_of_stream();
// Test range query
assert_that(cache.make_reader(query::full_partition_range))
assert_that(cache.make_reader(s, query::full_partition_range))
.produces(mutations[0])
.produces(mutations[1])
.produces(mutations[2])
@@ -180,15 +182,15 @@ SEASTAR_TEST_CASE(test_single_key_queries_after_population_in_reverse_order) {
};
for (int i = 0; i < 2; ++i) {
assert_that(cache.make_reader(get_partition_range(mutations[2])))
assert_that(cache.make_reader(s, get_partition_range(mutations[2])))
.produces(mutations[2])
.produces_end_of_stream();
assert_that(cache.make_reader(get_partition_range(mutations[1])))
assert_that(cache.make_reader(s, get_partition_range(mutations[1])))
.produces(mutations[1])
.produces_end_of_stream();
assert_that(cache.make_reader(get_partition_range(mutations[0])))
assert_that(cache.make_reader(s, get_partition_range(mutations[0])))
.produces(mutations[0])
.produces_end_of_stream();
}
@@ -207,8 +209,8 @@ SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) {
}
auto cache = make_lw_shared<row_cache>(s, mt->as_data_source(), mt->as_key_source(), tracker);
return [cache] (const query::partition_range& range) {
return cache->make_reader(range);
return [cache] (schema_ptr s, const query::partition_range& range) {
return cache->make_reader(s, range);
};
});
});
@@ -232,7 +234,7 @@ SEASTAR_TEST_CASE(test_eviction) {
std::random_shuffle(keys.begin(), keys.end());
for (auto&& key : keys) {
cache.make_reader(query::partition_range::make_singular(key));
cache.make_reader(s, query::partition_range::make_singular(key));
}
while (tracker.region().occupancy().used_space() > 0) {
@@ -243,7 +245,7 @@ SEASTAR_TEST_CASE(test_eviction) {
bool has_key(row_cache& cache, const dht::decorated_key& key) {
auto range = query::partition_range::make_singular(key);
auto reader = cache.make_reader(range);
auto reader = cache.make_reader(cache.schema(), range);
auto mo = reader().get0();
return bool(mo);
}
@@ -258,7 +260,7 @@ void verify_does_not_have(row_cache& cache, const dht::decorated_key& key) {
void verify_has(row_cache& cache, const mutation& m) {
auto range = query::partition_range::make_singular(m.decorated_key());
auto reader = cache.make_reader(range);
auto reader = cache.make_reader(cache.schema(), range);
auto mo = reader().get0();
BOOST_REQUIRE(bool(mo));
assert_that(*mo).is_equal_to(m);
@@ -410,8 +412,8 @@ private:
: _underlying(std::move(underlying))
{ }
mutation_reader make_reader(const query::partition_range& pr) {
return make_mutation_reader<reader>(_throttle, _underlying(pr));
mutation_reader make_reader(schema_ptr s, const query::partition_range& pr) {
return make_mutation_reader<reader>(_throttle, _underlying(s, pr));
}
::throttle& throttle() { return _throttle; }
@@ -430,8 +432,8 @@ public:
_impl->throttle().unblock();
}
mutation_reader operator()(const query::partition_range& pr) {
return _impl->make_reader(pr);
mutation_reader operator()(schema_ptr s, const query::partition_range& pr) {
return _impl->make_reader(s, pr);
}
};
@@ -447,10 +449,10 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
auto memtables_data_source = [&] (const query::partition_range& pr) {
auto memtables_data_source = [&] (schema_ptr s, const query::partition_range& pr) {
std::vector<mutation_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->make_reader(pr));
readers.emplace_back(mt->make_reader(s, pr));
}
return make_combined_reader(std::move(readers));
};
@@ -481,10 +483,10 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
cache_source.block();
auto m0_range = query::partition_range::make_singular(ring[0].ring_position());
auto rd1 = cache.make_reader(m0_range);
auto rd1 = cache.make_reader(s, m0_range);
auto rd1_result = rd1();
auto rd2 = cache.make_reader();
auto rd2 = cache.make_reader(s);
auto rd2_result = rd2();
sleep(10ms).get();
@@ -495,7 +497,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
// This update should miss on all partitions
auto update_future = cache.update(*mt2, make_default_partition_presence_checker());
auto rd3 = cache.make_reader();
auto rd3 = cache.make_reader(s);
// rd2, which is in progress, should not prevent forward progress of update()
cache_source.unblock();
@@ -519,7 +521,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
.produces_end_of_stream();
// Reads started after flush should see new data
assert_that(cache.make_reader())
assert_that(cache.make_reader(s))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])

View File

@@ -345,8 +345,8 @@ SEASTAR_TEST_CASE(read_partial_range_2) {
});
}
::mutation_source as_mutation_source(schema_ptr s, lw_shared_ptr<sstables::sstable> sst) {
return [s, sst] (const query::partition_range& range) mutable {
::mutation_source as_mutation_source(lw_shared_ptr<sstables::sstable> sst) {
return [sst] (schema_ptr s, const query::partition_range& range) mutable {
return as_mutation_reader(sst, sst->read_range_rows(s, range));
};
}
@@ -373,7 +373,7 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
sst->write_components(*mt).get();
sst->load().get();
return as_mutation_source(s, sst);
return as_mutation_source(sst);
});
});
}

View File

@@ -195,16 +195,18 @@ public:
throw unimplemented_exception();
} else if (predicate.__isset.slice_range) {
auto&& range = predicate.slice_range;
return cf.find_row(dk, clustering_key::make_empty(*cf.schema())).then([&cf, range = std::move(range)] (column_family::const_row_ptr rw) {
auto s = cf.schema();
return cf.find_row(s, dk, clustering_key::make_empty(*s)).then(
[s, &cf, range = std::move(range)] (column_family::const_row_ptr rw) {
std::vector<ColumnOrSuperColumn> ret;
if (rw) {
auto beg = cf.schema()->regular_begin();
auto beg = s->regular_begin();
if (!range.start.empty()) {
beg = cf.schema()->regular_lower_bound(to_bytes(range.start));
beg = s->regular_lower_bound(to_bytes(range.start));
}
auto end = cf.schema()->regular_end();
auto end = s->regular_end();
if (!range.finish.empty()) {
end = cf.schema()->regular_upper_bound(to_bytes(range.finish));
end = s->regular_upper_bound(to_bytes(range.finish));
}
auto count = range.count;
// FIXME: force limit count?