Merge branch 'tgrabiec/select' of github.com:cloudius-systems/seastar-dev into db

Preparation for range queries, from Tomasz:

"This series adds static typing for different key variants.

It also changes clustered row map to boost implementation which allows to use
heterogeneous keys, so that we can look up a row by a full prefix without
reserializing it.

Similar change is made to row prefix tombstones."
This commit is contained in:
Avi Kivity
2015-03-18 12:58:01 +02:00
30 changed files with 893 additions and 332 deletions

View File

@@ -109,7 +109,7 @@ public:
/**
* Execute the operation.
*/
virtual void execute(mutation& m, const clustering_prefix& row_key, const update_parameters& params) = 0;
virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) = 0;
/**
* A parsed raw UPDATE operation.

View File

@@ -31,16 +31,17 @@ namespace cql3 {
namespace restrictions {
/**
* A <code>primary_key_restrictions</code> which forwards all its method calls to another
* <code>primary_key_restrictions</code>. Subclasses should override one or more methods to modify the behavior
* of the backing <code>primary_key_restrictions</code> as desired per the decorator pattern.
* A <code>primary_key_restrictions</code> which forwards all its method calls to another
* <code>primary_key_restrictions</code>. Subclasses should override one or more methods to modify the behavior
* of the backing <code>primary_key_restrictions</code> as desired per the decorator pattern.
*/
class forwarding_primary_key_restrictions : public primary_key_restrictions {
template <typename ValueType>
class forwarding_primary_key_restrictions : public primary_key_restrictions<ValueType> {
protected:
/**
* Returns the backing delegate instance that methods are forwarded to.
*/
virtual ::shared_ptr<primary_key_restrictions> get_delegate() = 0;
virtual ::shared_ptr<primary_key_restrictions<ValueType>> get_delegate() = 0;
public:
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) override {
@@ -61,11 +62,11 @@ public:
}
#endif
virtual std::vector<bytes> values_as_serialized_tuples(const query_options& options) override {
return get_delegate()->values_as_serialized_tuples(options);
virtual std::vector<ValueType> values(const query_options& options) override {
return get_delegate()->values(options);
}
virtual std::vector<query::range> bounds(const query_options& options) override {
virtual std::vector<query::range<ValueType>> bounds(const query_options& options) override {
return get_delegate()->bounds(options);
}
@@ -101,7 +102,7 @@ public:
virtual void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) {
get_delegate()->addIndexExpressionTo(expressions, options);
}
#endif
#endif
};
}

View File

@@ -47,13 +47,14 @@ namespace restrictions {
* What was in AbstractPrimaryKeyRestrictions was moved here (In pre 1.8 Java interfaces could not have default
* implementations of methods).
*/
template<typename ValueType>
class primary_key_restrictions : public restrictions {
public:
virtual void merge_with(::shared_ptr<restriction> restriction) = 0;
virtual std::vector<bytes> values_as_serialized_tuples(const query_options& options) = 0;
virtual std::vector<ValueType> values(const query_options& options) = 0;
virtual std::vector<query::range> bounds(const query_options& options) = 0;
virtual std::vector<query::range<ValueType>> bounds(const query_options& options) = 0;
virtual bool is_inclusive(statements::bound b) { return true; }

View File

@@ -33,19 +33,20 @@ namespace restrictions {
/**
* <code>PrimaryKeyRestrictions</code> decorator that reverse the slices.
*/
class reversed_primary_key_restrictions : public forwarding_primary_key_restrictions {
template <typename ValueType>
class reversed_primary_key_restrictions : public forwarding_primary_key_restrictions<ValueType> {
private:
::shared_ptr<primary_key_restrictions> _restrictions;
::shared_ptr<primary_key_restrictions<ValueType>> _restrictions;
protected:
virtual ::shared_ptr<primary_key_restrictions> get_delegate() override {
virtual ::shared_ptr<primary_key_restrictions<ValueType>> get_delegate() override {
return _restrictions;
}
public:
reversed_primary_key_restrictions(shared_ptr<primary_key_restrictions> restrictions)
reversed_primary_key_restrictions(shared_ptr<primary_key_restrictions<ValueType>> restrictions)
: _restrictions(std::move(restrictions))
{ }
virtual std::vector<query::range> bounds(const query_options& options) override {
virtual std::vector<query::range<ValueType>> bounds(const query_options& options) override {
auto ranges = _restrictions->bounds(options);
for (auto&& range : ranges) {
range.reverse();

View File

@@ -37,19 +37,20 @@ namespace restrictions {
/**
* A set of single column restrictions on a primary key part (partition key or clustering key).
*/
class single_column_primary_key_restrictions : public primary_key_restrictions {
template<typename ValueType>
class single_column_primary_key_restrictions : public primary_key_restrictions<ValueType> {
using range_type = query::range<ValueType>;
using range_bound = typename range_type::bound;
private:
schema_ptr _schema;
::shared_ptr<single_column_restrictions> _restrictions;
::shared_ptr<tuple_type<true>> _tuple;
bool _slice;
bool _contains;
bool _in;
public:
single_column_primary_key_restrictions(schema_ptr schema, ::shared_ptr<tuple_type<true>> tuple)
single_column_primary_key_restrictions(schema_ptr schema)
: _schema(schema)
, _restrictions(::make_shared<single_column_restrictions>(schema))
, _tuple(std::move(tuple))
, _slice(false)
, _contains(false)
, _in(false)
@@ -121,7 +122,7 @@ public:
do_merge_with(::static_pointer_cast<single_column_restriction>(restriction));
}
virtual std::vector<bytes> values_as_serialized_tuples(const query_options& options) override {
virtual std::vector<ValueType> values(const query_options& options) override {
std::vector<std::vector<bytes_opt>> value_vector;
value_vector.reserve(_restrictions->size());
for (auto def : _restrictions->get_column_defs()) {
@@ -140,16 +141,16 @@ public:
value_vector.emplace_back(std::move(values));
}
std::vector<bytes> result;
std::vector<ValueType> result;
result.reserve(cartesian_product_size(value_vector));
for (auto&& v : make_cartesian_product(value_vector)) {
result.emplace_back(_tuple->serialize_value(v));
result.emplace_back(ValueType::from_exploded(*_schema, v));
}
return result;
}
virtual std::vector<query::range> bounds(const query_options& options) override {
std::vector<query::range> ranges;
virtual std::vector<range_type> bounds(const query_options& options) override {
std::vector<range_type> ranges;
std::vector<std::vector<bytes_opt>> vec_of_values;
// TODO: optimize for all EQ case
@@ -164,26 +165,20 @@ public:
}
if (r->is_slice()) {
// TODO: make restriction::bounds() return query::range to simplify all this
if (cartesian_product_is_empty(vec_of_values)) {
auto read_value = [r, &options] (statements::bound b) {
auto read_bound = [r, &options, this] (statements::bound b) -> std::experimental::optional<range_bound> {
if (!r->has_bound(b)) {
return {};
}
auto value = r->bounds(b, options)[0];
if (!value) {
throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string()));
}
return *value;
return {range_bound(ValueType::from_exploded(*_schema, {*value}), r->is_inclusive(b))};
};
if (r->has_bound(statements::bound::START) && r->has_bound(statements::bound::END)) {
ranges.emplace_back(query::range(read_value(statements::bound::START), read_value(statements::bound::END),
r->is_inclusive(statements::bound::START), r->is_inclusive(statements::bound::END)));
} else if (r->has_bound(statements::bound::START)) {
ranges.emplace_back(query::range::make_starting_with(read_value(statements::bound::START),
r->is_inclusive(statements::bound::START)));
} else {
assert(r->has_bound(statements::bound::END));
ranges.emplace_back(query::range::make_ending_with(read_value(statements::bound::END),
r->is_inclusive(statements::bound::END)));
}
ranges.emplace_back(range_type(
read_bound(statements::bound::START),
read_bound(statements::bound::END)));
if (def->type->is_reversed()) {
ranges.back().reverse();
}
@@ -192,31 +187,25 @@ public:
ranges.reserve(cartesian_product_size(vec_of_values));
for (auto&& prefix : make_cartesian_product(vec_of_values)) {
auto read_bounds = [r, &prefix, &options, this](bytes& value_holder, bool& inclusive_holder, statements::bound bound) {
auto read_bound = [r, &prefix, &options, this](statements::bound bound) -> range_bound {
if (r->has_bound(bound)) {
auto value = std::move(r->bounds(bound, options)[0]);
if (!value) {
throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string()));
}
prefix.emplace_back(std::move(value));
value_holder = _tuple->serialize_value(prefix);
auto val = ValueType::from_exploded(*_schema, prefix);
prefix.pop_back();
inclusive_holder = r->is_inclusive(bound);
return range_bound(std::move(val), r->is_inclusive(bound));
} else {
value_holder = _tuple->serialize_value(prefix);
inclusive_holder = true;
return range_bound(ValueType::from_exploded(*_schema, prefix));
}
};
bytes start_tuple;
bytes end_tuple;
bool start_inclusive;
bool end_inclusive;
ranges.emplace_back(range_type(
read_bound(statements::bound::START),
read_bound(statements::bound::END)));
read_bounds(start_tuple, start_inclusive, statements::bound::START);
read_bounds(end_tuple, end_inclusive, statements::bound::END);
ranges.emplace_back(query::range(std::move(start_tuple), std::move(end_tuple),
start_inclusive, end_inclusive));
if (def->type->is_reversed()) {
ranges.back().reverse();
}
@@ -239,7 +228,7 @@ public:
ranges.reserve(cartesian_product_size(vec_of_values));
for (auto&& prefix : make_cartesian_product(vec_of_values)) {
ranges.emplace_back(query::range::make_singular(_tuple->serialize_value(prefix)));
ranges.emplace_back(range_type::make_singular(ValueType::from_exploded(*_schema, prefix)));
}
return std::move(ranges);

View File

@@ -50,12 +50,12 @@ private:
/**
* Restrictions on partitioning columns
*/
::shared_ptr<primary_key_restrictions> _partition_key_restrictions;
::shared_ptr<primary_key_restrictions<partition_key::one>> _partition_key_restrictions;
/**
* Restrictions on clustering columns
*/
::shared_ptr<primary_key_restrictions> _clustering_columns_restrictions;
::shared_ptr<primary_key_restrictions<clustering_key::prefix::one>> _clustering_columns_restrictions;
/**
* Restriction on non-primary key columns (i.e. secondary index restrictions)
@@ -212,14 +212,12 @@ private:
auto& def = restriction->get_column_def();
if (def.is_partition_key()) {
if (!_partition_key_restrictions) {
_partition_key_restrictions = ::make_shared<single_column_primary_key_restrictions>(_schema,
_schema->partition_key_prefix_type);
_partition_key_restrictions = ::make_shared<single_column_primary_key_restrictions<partition_key::one>>(_schema);
}
_partition_key_restrictions->merge_with(restriction);
} else if (def.is_clustering_key()) {
if (!_clustering_columns_restrictions) {
_clustering_columns_restrictions = ::make_shared<single_column_primary_key_restrictions>(_schema,
_schema->clustering_key_prefix_type);
_clustering_columns_restrictions = ::make_shared<single_column_primary_key_restrictions<clustering_key::prefix::one>>(_schema);
}
_clustering_columns_restrictions->merge_with(restriction);
} else {
@@ -383,9 +381,9 @@ public:
* @return the specified bound of the partition key
* @throws InvalidRequestException if the boundary cannot be retrieved
*/
std::vector<query::range> get_partition_key_ranges(const query_options& options) const {
std::vector<query::partition_range> get_partition_key_ranges(const query_options& options) const {
if (!_partition_key_restrictions) {
return {query::range::make_open_ended_both_sides()};
return {query::partition_range::make_open_ended_both_sides()};
}
return _partition_key_restrictions->bounds(options);
}
@@ -510,9 +508,9 @@ public:
#endif
public:
std::vector<query::range> get_clustering_bounds(const query_options& options) const {
std::vector<query::clustering_range> get_clustering_bounds(const query_options& options) const {
if (!_clustering_columns_restrictions) {
return {query::range::make_open_ended_both_sides()};
return {query::clustering_range::make_open_ended_both_sides()};
}
return _clustering_columns_restrictions->bounds(options);
}
@@ -559,7 +557,8 @@ public:
void reverse() {
if (_clustering_columns_restrictions) {
_clustering_columns_restrictions = ::make_shared<reversed_primary_key_restrictions>(_clustering_columns_restrictions);
_clustering_columns_restrictions = ::make_shared<reversed_primary_key_restrictions<clustering_key::prefix::one>>(
_clustering_columns_restrictions);
}
}
};

View File

@@ -59,7 +59,6 @@ modification_statement::get_mutations(const query_options& options, bool local,
std::vector<mutation> mutations;
mutations.reserve(keys->size());
for (auto key : *keys) {
validation::validate_cql_key(s, key);
mutations.emplace_back(std::move(key), s);
auto& m = mutations.back();
this->add_update_for_key(m, *prefix, *params_ptr);
@@ -70,7 +69,7 @@ modification_statement::get_mutations(const query_options& options, bool local,
future<std::unique_ptr<update_parameters>>
modification_statement::make_update_parameters(
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<std::vector<partition_key::one>> keys,
lw_shared_ptr<clustering_prefix> prefix,
const query_options& options,
bool local,
@@ -87,7 +86,7 @@ modification_statement::make_update_parameters(
future<update_parameters::prefetched_rows_type>
modification_statement::read_required_rows(
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<std::vector<partition_key::one>> keys,
lw_shared_ptr<clustering_prefix> prefix,
bool local,
db::consistency_level cl) {
@@ -181,7 +180,7 @@ modification_statement::create_clustering_prefix_internal(const query_options& o
components.push_back(val);
}
}
return components;
return clustering_prefix(std::move(components));
}
clustering_prefix
@@ -222,9 +221,9 @@ modification_statement::create_clustering_prefix(const query_options& options) {
return create_clustering_prefix_internal(options);
}
std::vector<partition_key>
std::vector<partition_key::one>
modification_statement::build_partition_keys(const query_options& options) {
std::vector<partition_key> result;
std::vector<partition_key::one> result;
std::vector<bytes_opt> components;
auto remaining = s->partition_key_size();
@@ -244,9 +243,9 @@ modification_statement::build_partition_keys(const query_options& options) {
throw exceptions::invalid_request_exception(sprint("Invalid null value for partition key part %s", def.name_as_text()));
}
components.push_back(val);
partition_key key = serialize_value(*s->partition_key_type, components);
auto key = partition_key::one::from_exploded(*s, components);
validation::validate_cql_key(s, key);
result.push_back(key);
result.emplace_back(std::move(key));
} else {
for (auto&& val : values) {
if (!val) {
@@ -256,9 +255,9 @@ modification_statement::build_partition_keys(const query_options& options) {
full_components.reserve(components.size() + 1);
auto i = std::copy(components.begin(), components.end(), std::back_inserter(full_components));
*i = val;
partition_key key = serialize_value(*s->partition_key_type, full_components);
auto key = partition_key::one::from_exploded(*s, full_components);
validation::validate_cql_key(s, key);
result.push_back(key);
result.emplace_back(std::move(key));
}
}
} else {

View File

@@ -255,7 +255,7 @@ private:
public:
void add_key_value(column_definition& def, ::shared_ptr<term> value);
void process_where_clause(std::vector<relation_ptr> where_clause, ::shared_ptr<variable_specifications> names);
std::vector<partition_key> build_partition_keys(const query_options& options);
std::vector<partition_key::one> build_partition_keys(const query_options& options);
private:
clustering_prefix create_clustering_prefix(const query_options& options);
@@ -273,7 +273,7 @@ public:
protected:
future<update_parameters::prefetched_rows_type> read_required_rows(
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<std::vector<partition_key::one>> keys,
lw_shared_ptr<clustering_prefix> prefix,
bool local,
db::consistency_level cl);
@@ -427,7 +427,7 @@ private:
public:
future<std::unique_ptr<update_parameters>> make_update_parameters(
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<std::vector<partition_key::one>> keys,
lw_shared_ptr<clustering_prefix> prefix,
const query_options& options,
bool local,

View File

@@ -126,7 +126,7 @@ select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> resu
for (auto&& e : results->partitions) {
// FIXME: deserialize into views
auto key = _schema->partition_key_type->deserialize_value(e.first);
auto key = e.first.explode(*_schema);
auto& partition = e.second;
if (!partition.static_row.empty() && partition.rows.empty()
@@ -145,7 +145,7 @@ select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> resu
}
} else {
for (auto&& e : partition.rows) {
auto c_key = _schema->clustering_key_type->deserialize_value(e.first);
auto c_key = e.first.explode(*_schema);
auto& cells = e.second.cells;
uint32_t static_id = 0;
uint32_t regular_id = 0;

View File

@@ -36,7 +36,7 @@ namespace cql3 {
class update_parameters final {
public:
using prefetched_rows_type = std::experimental::optional<
std::unordered_map<partition_key, row, serialized_hash, serialized_equal>>;
std::unordered_map<partition_key::one, row, partition_key::one::hashing, partition_key::one::equality>>;
private:
const gc_clock::duration _ttl;
const prefetched_rows_type _prefetched; // For operation that require a read-before-write

View File

@@ -82,17 +82,17 @@ schema::schema(sstring ks_name, sstring cf_name, std::vector<column> partition_k
column_family::column_family(schema_ptr schema)
: _schema(std::move(schema))
, partitions(key_compare(_schema->thrift.partition_key_type)) {
, partitions(partition_key::one::less_compare(*_schema)) {
}
mutation_partition*
column_family::find_partition(const bytes& key) {
column_family::find_partition(const partition_key::one& key) {
auto i = partitions.find(key);
return i == partitions.end() ? nullptr : &i->second;
}
row*
column_family::find_row(const bytes& partition_key, const bytes& clustering_key) {
column_family::find_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key) {
mutation_partition* p = find_partition(partition_key);
if (!p) {
return nullptr;
@@ -101,19 +101,18 @@ column_family::find_row(const bytes& partition_key, const bytes& clustering_key)
}
mutation_partition&
column_family::find_or_create_partition(const bytes& key) {
column_family::find_or_create_partition(const partition_key::one& key) {
// call lower_bound so we have a hint for the insert, just in case.
auto i = partitions.lower_bound(key);
if (i == partitions.end() || key != i->first) {
if (i == partitions.end() || !key.equal(*_schema, i->first)) {
i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema)));
}
return i->second;
}
row&
column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) {
column_family::find_or_create_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key) {
mutation_partition& p = find_or_create_partition(partition_key);
// call lower_bound so we have a hint for the insert, just in case.
return p.clustered_row(clustering_key);
}
@@ -313,6 +312,15 @@ keyspace::find_schema(const sstring& cf_name) {
return cf->_schema;
}
schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) {
auto ks = find_keyspace(ks_name);
if (!ks) {
return {};
}
return ks->find_schema(cf_name);
}
keyspace*
database::find_keyspace(const sstring& name) {
auto i = keyspaces.find(name);
@@ -364,12 +372,17 @@ merge_column(const column_definition& def,
}
}
mutation_partition::~mutation_partition() {
_rows.clear_and_dispose(std::default_delete<rows_entry>());
_row_tombstones.clear_and_dispose(std::default_delete<row_tombstones_entry>());
}
void
mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
_tombstone.apply(p._tombstone);
for (auto&& entry : p._row_tombstones) {
apply_row_tombstone(schema, entry);
for (auto&& e : p._row_tombstones) {
apply_row_tombstone(schema, e.prefix(), e.t());
}
auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) {
@@ -392,63 +405,103 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
merge_cells(_static_row, p._static_row, find_static_column_def);
for (auto&& entry : p._rows) {
auto& key = entry.first;
auto i = _rows.find(key);
auto& key = entry.key();
auto i = _rows.find(key, rows_entry::compare(*schema));
if (i == _rows.end()) {
_rows.emplace_hint(i, entry);
auto e = new rows_entry(entry);
_rows.insert(i, *e);
} else {
i->second.t.apply(entry.second.t);
merge_cells(i->second.cells, entry.second.cells, find_regular_column_def);
i->apply(entry.row().t);
merge_cells(i->row().cells, entry.row().cells, find_regular_column_def);
}
}
}
tombstone
mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key& key) {
mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::one& key) {
tombstone t = _tombstone;
auto i = _row_tombstones.lower_bound(key);
if (i != _row_tombstones.end() && schema->clustering_key_prefix_type->is_prefix_of(i->first, key)) {
t.apply(i->second);
auto c = row_tombstones_entry::key_comparator(
clustering_key::one::prefix_view_type::less_compare_with_prefix(*schema));
// _row_tombstones contains only strict prefixes
for (unsigned prefix_len = 1; prefix_len < schema->clustering_key_size(); ++prefix_len) {
auto i = _row_tombstones.find(key.prefix_view(*schema, prefix_len), c);
if (i != _row_tombstones.end()) {
t.apply(i->t());
}
}
auto j = _rows.find(key);
auto j = _rows.find(key, rows_entry::compare(*schema));
if (j != _rows.end()) {
t.apply(j->second.t);
t.apply(j->row().t);
}
return t;
}
void
mutation_partition::apply_row_tombstone(schema_ptr schema, std::pair<bytes, tombstone> row_tombstone) {
auto& prefix = row_tombstone.first;
auto i = _row_tombstones.lower_bound(prefix);
if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first)) {
_row_tombstones.emplace_hint(i, std::move(row_tombstone));
} else if (row_tombstone.second > i->second) {
i->second = row_tombstone.second;
mutation_partition::apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t) {
assert(!prefix.is_full(*schema));
auto i = _row_tombstones.lower_bound(prefix, row_tombstones_entry::compare(*schema));
if (i == _row_tombstones.end() || !prefix.equal(*schema, i->prefix())) {
auto e = new row_tombstones_entry(std::move(prefix), t);
_row_tombstones.insert(i, *e);
} else {
i->apply(t);
}
}
void
mutation_partition::apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t) {
if (prefix.empty()) {
if (!prefix) {
apply(t);
} else if (prefix.size() == schema->clustering_key_size()) {
_rows[schema->clustering_key_type->serialize_value(prefix)].t.apply(t);
} else if (prefix.is_full(*schema)) {
apply_delete(schema, clustering_key::one::from_clustering_prefix(*schema, prefix), t);
} else {
apply_row_tombstone(schema, {schema->clustering_key_prefix_type->serialize_value(prefix), t});
apply_row_tombstone(schema, clustering_key::prefix::one::from_clustering_prefix(*schema, prefix), t);
}
}
void
mutation_partition::apply_delete(schema_ptr schema, clustering_key::one&& key, tombstone t) {
auto i = _rows.lower_bound(key, rows_entry::compare(*schema));
if (i == _rows.end() || !i->key().equal(*schema, key)) {
auto e = new rows_entry(std::move(key));
e->row().apply(t);
_rows.insert(i, *e);
} else {
i->row().apply(t);
}
}
rows_entry*
mutation_partition::find_entry(schema_ptr schema, const clustering_key::prefix::one& key) {
auto i = _rows.find(key, rows_entry::compare_prefix(*schema));
if (i == _rows.end()) {
return nullptr;
}
return &*i;
}
row*
mutation_partition::find_row(const clustering_key& key) {
mutation_partition::find_row(const clustering_key::one& key) {
auto i = _rows.find(key);
if (i == _rows.end()) {
return nullptr;
}
return &i->second.cells;
return &i->row().cells;
}
row&
mutation_partition::clustered_row(const clustering_key::one& key) {
auto i = _rows.find(key);
if (i == _rows.end()) {
auto e = new rows_entry(key);
_rows.insert(i, *e);
return e->row().cells;
}
return i->row().cells;
}
bool column_definition::is_compact_value() const {
@@ -457,7 +510,7 @@ bool column_definition::is_compact_value() const {
}
std::ostream& operator<<(std::ostream& os, const mutation& m) {
return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), m.key, m.p);
return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), static_cast<bytes_view>(m.key), m.p);
}
std::ostream& operator<<(std::ostream& os, const mutation_partition& mp) {
@@ -475,24 +528,27 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p
if (!range.is_singular()) {
fail(unimplemented::cause::RANGE_QUERIES);
}
auto& key = range.start();
if (!_schema->clustering_key_prefix_type->is_full(key)) {
auto& key = range.start_value();
if (!key.is_full(*_schema)) {
fail(unimplemented::cause::RANGE_QUERIES);
}
auto row = partition.find_row(key);
rows_entry* row = partition.find_entry(_schema, key);
if (!row) {
continue;
}
auto&& cells = &row->row().cells;
// FIXME: handle removed rows properly. In CQL rows are separate entities (can be live or dead).
auto row_tombstone = partition.tombstone_for_row(_schema, key);
auto row_tombstone = partition.tombstone_for_row(_schema, row->key());
query::result::row result_row;
result_row.cells.reserve(slice.regular_columns.size());
for (auto id : slice.regular_columns) {
auto i = row->find(id);
if (i == row->end()) {
auto i = cells->find(id);
if (i == cells->end()) {
result_row.cells.emplace_back();
} else {
auto def = _schema->regular_column_at(id);
@@ -510,7 +566,7 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p
}
}
}
result.rows.emplace_back(key, std::move(result_row));
result.rows.emplace_back(row->key(), std::move(result_row));
--limit;
}
@@ -532,10 +588,7 @@ column_family::query(const query::read_command& cmd) {
uint32_t limit = cmd.row_limit;
for (auto&& range : cmd.partition_ranges) {
if (range.is_singular()) {
auto& key = range.start();
if (!_schema->partition_key_prefix_type->is_full(key)) {
fail(unimplemented::cause::RANGE_QUERIES);
}
auto& key = range.start_value();
auto partition = find_partition(key);
if (!partition) {
return make_ready_future<lw_shared_ptr<query::result>>(result);

View File

@@ -19,7 +19,6 @@
#include <unordered_map>
#include <map>
#include <set>
#include <vector>
#include <iostream>
#include <boost/functional/hash.hpp>
#include <experimental/optional>
@@ -34,59 +33,168 @@
#include "timestamp.hh"
#include "tombstone.hh"
#include "atomic_cell.hh"
#include "bytes.hh"
#include "query.hh"
using partition_key_type = tuple_type<>;
using clustering_key_type = tuple_type<>;
using clustering_prefix_type = tuple_prefix;
using partition_key = bytes;
using clustering_key = bytes;
using clustering_prefix = clustering_prefix_type::value_type;
#include "keys.hh"
#include <boost/intrusive/set.hpp>
using row = std::map<column_id, atomic_cell_or_collection>;
struct deletable_row final {
tombstone t;
row cells;
void apply(tombstone t_) {
t.apply(t_);
}
};
using row_tombstone_set = std::map<bytes, tombstone, serialized_compare>;
class row_tombstones_entry : public boost::intrusive::set_base_hook<> {
clustering_key::prefix::one _prefix;
tombstone _t;
public:
row_tombstones_entry(clustering_key::prefix::one&& prefix, tombstone t)
: _prefix(std::move(prefix))
, _t(std::move(t))
{ }
clustering_key::prefix::one& prefix() {
return _prefix;
}
const clustering_key::prefix::one& prefix() const {
return _prefix;
}
tombstone& t() {
return _t;
}
const tombstone& t() const {
return _t;
}
void apply(tombstone t) {
_t.apply(t);
}
struct compare {
clustering_key::prefix::one::less_compare _c;
compare(const schema& s) : _c(s) {}
bool operator()(const row_tombstones_entry& e1, const row_tombstones_entry& e2) const {
return _c(e1._prefix, e2._prefix);
}
bool operator()(const clustering_key::prefix::one& prefix, const row_tombstones_entry& e) const {
return _c(prefix, e._prefix);
}
bool operator()(const row_tombstones_entry& e, const clustering_key::prefix::one& prefix) const {
return _c(e._prefix, prefix);
}
};
template <typename Comparator>
struct delegating_compare {
Comparator _c;
delegating_compare(Comparator&& c) : _c(std::move(c)) {}
template <typename Comparable>
bool operator()(const Comparable& prefix, const row_tombstones_entry& e) const {
return _c(prefix, e._prefix);
}
template <typename Comparable>
bool operator()(const row_tombstones_entry& e, const Comparable& prefix) const {
return _c(e._prefix, prefix);
}
};
template <typename Comparator>
static auto key_comparator(Comparator&& c) {
return delegating_compare<Comparator>(std::move(c));
}
};
class rows_entry : public boost::intrusive::set_base_hook<> {
clustering_key::one _key;
deletable_row _row;
public:
rows_entry(clustering_key::one&& key)
: _key(std::move(key))
{ }
rows_entry(const clustering_key::one& key)
: _key(key)
{ }
rows_entry(const rows_entry& e)
: _key(e._key)
, _row(e._row)
{ }
clustering_key::one& key() {
return _key;
}
const clustering_key::one& key() const {
return _key;
}
deletable_row& row() {
return _row;
}
const deletable_row& row() const {
return _row;
}
void apply(tombstone t) {
_row.apply(t);
}
struct compare {
clustering_key::one::less_compare _c;
compare(const schema& s) : _c(s) {}
bool operator()(const rows_entry& e1, const rows_entry& e2) const {
return _c(e1._key, e2._key);
}
bool operator()(const clustering_key::one& key, const rows_entry& e) const {
return _c(key, e._key);
}
bool operator()(const rows_entry& e, const clustering_key::one& key) const {
return _c(e._key, key);
}
};
struct compare_prefix {
clustering_key::one::less_compare_with_prefix _c;
compare_prefix(const schema& s) : _c(s) {}
bool operator()(const clustering_key::prefix::one& prefix, const rows_entry& e) const {
return _c(prefix, e._key);
}
bool operator()(const rows_entry& e, const clustering_key::prefix::one& prefix) const {
return _c(e._key, prefix);
}
};
};
class mutation_partition final {
private:
tombstone _tombstone;
row _static_row;
std::map<clustering_key, deletable_row, key_compare> _rows;
row_tombstone_set _row_tombstones;
boost::intrusive::set<rows_entry, boost::intrusive::compare<rows_entry::compare>> _rows;
// Contains only strict prefixes so that we don't have to lookup full keys
// in both _row_tombstones and _rows.
boost::intrusive::set<row_tombstones_entry, boost::intrusive::compare<row_tombstones_entry::compare>> _row_tombstones;
public:
mutation_partition(schema_ptr s)
: _rows(key_compare(s->clustering_key_type))
, _row_tombstones(serialized_compare(s->clustering_key_prefix_type))
: _rows(rows_entry::compare(*s))
, _row_tombstones(row_tombstones_entry::compare(*s))
{ }
mutation_partition(mutation_partition&&) = default;
~mutation_partition();
void apply(tombstone t) { _tombstone.apply(t); }
void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t);
void apply_row_tombstone(schema_ptr schema, bytes prefix, tombstone t) {
apply_row_tombstone(schema, {std::move(prefix), std::move(t)});
}
void apply_row_tombstone(schema_ptr schema, std::pair<bytes, tombstone> row_tombstone);
void apply_delete(schema_ptr schema, clustering_key::one&& key, tombstone t);
// prefix must not be full
void apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t);
void apply(schema_ptr schema, const mutation_partition& p);
const row_tombstone_set& row_tombstones() const { return _row_tombstones; }
row& static_row() { return _static_row; }
row& clustered_row(const clustering_key& key) { return _rows[key].cells; }
row& clustered_row(clustering_key&& key) { return _rows[std::move(key)].cells; }
row* find_row(const clustering_key& key);
tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key);
row& clustered_row(const clustering_key::one& key);
row* find_row(const clustering_key::one& key);
row* find_row(schema_ptr schema, const clustering_key::prefix::one& key);
rows_entry* find_entry(schema_ptr schema, const clustering_key::prefix::one& key);
tombstone tombstone_for_row(schema_ptr schema, const clustering_key::one& key);
tombstone tombstone_for_row(schema_ptr schema, const clustering_key::prefix::one& key);
friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp);
};
class mutation final {
public:
schema_ptr schema;
partition_key key;
partition_key::one key;
mutation_partition p;
public:
mutation(partition_key key_, schema_ptr schema_)
mutation(partition_key::one key_, schema_ptr schema_)
: schema(std::move(schema_))
, key(std::move(key_))
, p(schema)
@@ -100,11 +208,11 @@ public:
}
void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) {
auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix));
auto& row = p.clustered_row(clustering_key::one::from_clustering_prefix(*schema, prefix));
update_column(row, def, std::move(value));
}
void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) {
void set_clustered_cell(const clustering_key::one& key, const column_definition& def, atomic_cell_or_collection value) {
auto& row = p.clustered_row(key);
update_column(row, def, std::move(value));
}
@@ -133,13 +241,14 @@ private:
struct column_family {
column_family(schema_ptr schema);
mutation_partition& find_or_create_partition(const bytes& key);
row& find_or_create_row(const bytes& partition_key, const bytes& clustering_key);
mutation_partition* find_partition(const bytes& key);
row* find_row(const bytes& partition_key, const bytes& clustering_key);
column_family(column_family&&) = default;
mutation_partition& find_or_create_partition(const partition_key::one& key);
row& find_or_create_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key);
mutation_partition* find_partition(const partition_key::one& key);
row* find_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key);
schema_ptr _schema;
// partition key -> partition
std::map<bytes, mutation_partition, key_compare> partitions;
std::map<partition_key::one, mutation_partition, partition_key::one::less_compare> partitions;
void apply(const mutation& m);
// Returns at most "cmd.limit" rows
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd);
@@ -168,6 +277,7 @@ public:
future<> init_from_data_directory(sstring datadir);
future<> populate(sstring datadir);
keyspace* find_keyspace(const sstring& name);
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name);
future<> stop() { return make_ready_future<>(); }
void assign(database&& db) {
*this = std::move(db);

View File

@@ -25,6 +25,7 @@
#include "core/shared_ptr.hh"
#include "types.hh"
#include "keys.hh"
#include <memory>
namespace dht {
@@ -66,7 +67,7 @@ token minimum_token();
class decorated_key {
public:
token _token;
bytes _key;
partition_key::one _key;
};
class i_partitioner {
@@ -78,10 +79,21 @@ public:
* @param key the raw, client-facing key
* @return decorated version of key
*/
decorated_key decorate_key(const bytes& key) {
decorated_key decorate_key(const partition_key::one& key) {
return { get_token(key), key };
}
/**
* Transform key to object representation of the on-disk format.
*
* @param key the raw, client-facing key
* @return decorated version of key
*/
decorated_key decorate_key(partition_key::one&& key) {
auto token = get_token(key);
return { std::move(token), std::move(key) };
}
/**
* Calculate a token representing the approximate "middle" of the given
* range.
@@ -105,7 +117,7 @@ public:
* (This is NOT a method to create a token from its string representation;
* for that, use tokenFactory.fromString.)
*/
virtual token get_token(const bytes& key) = 0;
virtual token get_token(const partition_key::one& key) = 0;
/**
* @return a randomly generated token

View File

@@ -16,12 +16,13 @@ murmur3_partitioner::normalize(int64_t in) {
}
token
murmur3_partitioner::get_token(const bytes& key) {
murmur3_partitioner::get_token(const partition_key::one& key_) {
bytes_view key(key_);
if (key.empty()) {
return minimum_token();
}
std::array<uint64_t, 2> hash;
utils::murmur_hash::hash3_x64_128(key, 0, key.size(), 0, hash);
utils::murmur_hash::hash3_x64_128(key, 0, hash);
// We don't normalize() the value, since token includes an is-before-everything
// indicator.
// FIXME: will this require a repair when importing a database?

View File

@@ -10,7 +10,7 @@ namespace dht {
class murmur3_partitioner final : public i_partitioner {
public:
virtual token get_token(const bytes& key) override;
virtual token get_token(const partition_key::one& key) override;
virtual bool preserves_order() override { return false; }
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens);
virtual data_type get_token_validator();

287
keys.hh Normal file
View File

@@ -0,0 +1,287 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include "schema.hh"
#include "bytes.hh"
#include "types.hh"
//
// This header defines the type system for primary key holders.
//
// We distinguish partition keys and clustering keys. API-wise they are almost
// the same, but they're separate type hierarchies.
//
// Clustering keys are further divided into prefixed and non-prefixed (full).
// Non-prefixed keys always have full component set, as defined by schema.
// Prefixed ones can have any number of trailing components missing. They may
// differ in underlying representation.
//
// The main classes are:
//
// partition_key::one - full partition key
// clustering_key::one - full clustering key
// clustering_key::prefix::one - clustering key prefix
//
// These classes wrap only the minimum information required to store the key
// (the key value itself). Any information which can be inferred from schema
// is not stored. Therefore accessors need to be provided with a pointer to
// schema, from which information about structure is extracted.
// FIXME: Keys can't contain nulls, so we could get rid of optionals

// CRTP base for key holders. Wraps the serialized form of a tuple whose
// structure is described by TopLevel::tuple_type(schema); only the raw
// bytes are stored here, everything schema-derived is looked up on demand.
template <typename TopLevel>
class tuple_wrapper {
protected:
    bytes _bytes; // serialized tuple, in the format managed by tuple_type
protected:
    tuple_wrapper(bytes&& b) : _bytes(std::move(b)) {}

    // Shorthand for the tuple_type describing this key kind in schema s.
    static inline auto type(const schema& s) {
        return TopLevel::tuple_type(s);
    }
public:
    // Returns a key whose components are all null (disengaged optionals).
    static TopLevel make_empty(const schema& s) {
        std::vector<bytes_opt> v;
        v.resize(type(s)->types().size());
        return from_exploded(s, v);
    }

    // Builds a key from individually serialized component values.
    static TopLevel from_exploded(const schema& s, const std::vector<bytes_opt>& v) {
        return TopLevel::from_bytes(type(s)->serialize_value(v));
    }

    // Builds a key from unserialized (boost::any) component values.
    static TopLevel from_deeply_exploded(const schema& s, const std::vector<boost::any>& v) {
        return TopLevel::from_bytes(type(s)->serialize_value_deep(v));
    }

    // Builds a single-component key from that component's serialized value.
    static TopLevel from_single_value(const schema& s, bytes v) {
        // FIXME: optimize
        std::vector<bytes_opt> values;
        values.emplace_back(bytes_opt(std::move(v)));
        return from_exploded(s, values);
    }

    // Splits the key back into its individually serialized components.
    // FIXME: get rid of optional<> and return views
    std::vector<bytes_opt> explode(const schema& s) const {
        return type(s)->deserialize_value(_bytes);
    }

    // Strict-weak-ordering comparator for ordered containers/algorithms.
    struct less_compare {
        data_type _t;
        less_compare(const schema& s) : _t(type(s)) {}
        bool operator()(const TopLevel& k1, const TopLevel& k2) const {
            return _t->less(k1, k2);
        }
    };

    // Hash functor for unordered containers.
    struct hashing {
        data_type _t;
        hashing(const schema& s) : _t(type(s)) {}
        size_t operator()(const TopLevel& o) const {
            return _t->hash(o);
        }
    };

    // Equality functor for unordered containers.
    struct equality {
        data_type _t;
        equality(const schema& s) : _t(type(s)) {}
        bool operator()(const TopLevel& o1, const TopLevel& o2) const {
            return _t->equal(o1, o2);
        }
    };

    bool equal(const schema& s, const TopLevel& other) const {
        return type(s)->equal(*this, other);
    }

    // Keys implicitly convert to a view over their serialized form; the
    // functors above rely on this conversion when calling into data_type.
    operator bytes_view() const {
        return _bytes;
    }
};
// A lazy view over the first prefix_len components of a full (non-prefix)
// key, allowing it to be compared against PrefixTopLevel keys without
// re-serializing the prefix.
template <typename TopLevel, typename PrefixTopLevel>
class prefix_view_on_full_tuple {
public:
    using iterator = typename tuple_type<false>::iterator;
private:
    bytes_view _b;        // serialized full key this view refers to (not owned)
    unsigned _prefix_len; // number of leading components covered by the view
    iterator _begin;      // first component of _b
    iterator _end;        // one past the prefix_len-th component
public:
    prefix_view_on_full_tuple(const schema& s, bytes_view b, unsigned prefix_len)
        : _b(b)
        , _prefix_len(prefix_len)
        , _begin(TopLevel::tuple_type(s)->begin(_b))
        , _end(_begin)
    {
        // Walk forward so _end stops after prefix_len components.
        std::advance(_end, prefix_len);
    }

    iterator begin() const { return _begin; }
    iterator end() const { return _end; }

    // Orders views against PrefixTopLevel keys component-wise using the
    // prefix type's component types, so both heterogeneous key forms can
    // share one sorted container. Argument order of the two overloads
    // mirrors the (left, right) order of the comparison.
    struct less_compare_with_prefix {
        shared_ptr<tuple_type<true>> prefix_type;
        less_compare_with_prefix(const schema& s)
            : prefix_type(PrefixTopLevel::tuple_type(s))
        { }
        bool operator()(const prefix_view_on_full_tuple& k1, const PrefixTopLevel& k2) const {
            return lexicographical_compare(prefix_type->types().begin(),
                k1.begin(), k1.end(),
                prefix_type->begin(k2), prefix_type->end(k2),
                optional_less_compare);
        }
        bool operator()(const PrefixTopLevel& k1, const prefix_view_on_full_tuple& k2) const {
            return lexicographical_compare(prefix_type->types().begin(),
                prefix_type->begin(k1), prefix_type->end(k1),
                k2.begin(), k2.end(),
                optional_less_compare);
        }
    };
};
// A full key which can be matched against and ordered relative to prefix
// keys (PrefixTopLevel) without converting either side.
template <typename TopLevel, typename PrefixTopLevel>
class prefixable_full_tuple : public tuple_wrapper<TopLevel> {
    using base = tuple_wrapper<TopLevel>;
protected:
    prefixable_full_tuple(bytes&& b) : base(std::move(b)) {}
public:
    using prefix_view_type = prefix_view_on_full_tuple<TopLevel, PrefixTopLevel>;

    // True iff every component present in prefix equals the corresponding
    // leading component of this key.
    bool is_prefixed_by(const schema& s, const PrefixTopLevel& prefix) const {
        auto t = base::type(s);
        auto prefix_type = PrefixTopLevel::tuple_type(s);
        return ::is_prefixed_by(t->types().begin(),
            t->begin(*this), t->end(*this),
            prefix_type->begin(prefix), prefix_type->end(prefix),
            optional_equal);
    }

    // Orders full keys against prefix keys component-wise; only components
    // present in the prefix take part in the comparison. Argument order of
    // the two overloads mirrors the (left, right) order of the comparison.
    struct less_compare_with_prefix {
        shared_ptr<tuple_type<true>> prefix_type;
        shared_ptr<tuple_type<false>> full_type;
        less_compare_with_prefix(const schema& s)
            : prefix_type(PrefixTopLevel::tuple_type(s))
            , full_type(TopLevel::tuple_type(s))
        { }
        bool operator()(const TopLevel& k1, const PrefixTopLevel& k2) const {
            return lexicographical_compare(prefix_type->types().begin(),
                full_type->begin(k1), full_type->end(k1),
                prefix_type->begin(k2), prefix_type->end(k2),
                optional_less_compare);
        }
        bool operator()(const PrefixTopLevel& k1, const TopLevel& k2) const {
            return lexicographical_compare(prefix_type->types().begin(),
                prefix_type->begin(k1), prefix_type->end(k1),
                full_type->begin(k2), full_type->end(k2),
                optional_less_compare);
        }
    };

    // Returns a lazy view over the first prefix_len components of this key.
    auto prefix_view(const schema& s, unsigned prefix_len) const {
        return prefix_view_type(s, *this, prefix_len);
    }
};
// Base for prefix keys: like tuple_wrapper, but aware of the corresponding
// full key type (FullTopLevel), so a prefix that happens to name every
// component can be converted to a full key.
template <typename TopLevel, typename FullTopLevel>
class prefix_tuple_wrapper : public tuple_wrapper<TopLevel> {
    using base = tuple_wrapper<TopLevel>;
protected:
    prefix_tuple_wrapper(bytes&& b) : base(std::move(b)) {}
public:
    // True when no trailing components are missing (per schema s).
    bool is_full(const schema& s) const {
        return TopLevel::tuple_type(s)->is_full(base::_bytes);
    }

    // Converts to the full key type by re-serializing the components.
    // Can be called only if is_full()
    FullTopLevel to_full(const schema& s) const {
        return FullTopLevel::from_exploded(s, base::explode(s));
    }

    // True iff every component present in prefix equals the corresponding
    // leading component of this prefix.
    bool is_prefixed_by(const schema& s, const TopLevel& prefix) const {
        auto t = base::type(s);
        return ::is_prefixed_by(t->types().begin(),
            t->begin(*this), t->end(*this),
            t->begin(prefix), t->end(prefix),
            optional_equal);
    }
};
// Namespace-like holder for the partition key type hierarchy.
class partition_key {
public:
    class one;
    using full_base = tuple_wrapper<partition_key::one>;

    // Full partition key, held in serialized form.
    class one : public full_base {
        one(bytes&& b) : full_base(std::move(b)) {} // private: construct via named factories
    public:
        static one from_bytes(bytes b) { return one(std::move(b)); }
        static auto tuple_type(const schema& s) { return s.partition_key_type; }
    };
};
class clustering_prefix {
std::vector<bytes_opt> _v;
public:
clustering_prefix(std::vector<bytes_opt>&& v) : _v(std::move(v)) {}
clustering_prefix() {}
size_t size() const {
return _v.size();
}
auto const& components() const {
return _v;
}
explicit operator bool() const {
return !_v.empty();
}
bool is_full(const schema& s) const {
return _v.size() == s.clustering_key_size();
}
};
// Namespace-like holder for the clustering key type hierarchy:
// clustering_key::one (full) and clustering_key::prefix::one (prefix).
class clustering_key {
public:
    class one;
    struct prefix {
        class one;
    };
    using full_base = prefixable_full_tuple<clustering_key::one, clustering_key::prefix::one>;
    using prefix_base = prefix_tuple_wrapper<clustering_key::prefix::one, clustering_key::one>;

    // Clustering key prefix: any number of trailing components may be missing.
    class prefix::one : public prefix_base {
        one(bytes&& b) : prefix_base(std::move(b)) {} // private: construct via named factories
    public:
        static one from_bytes(bytes b) { return one(std::move(b)); }
        static auto tuple_type(const schema& s) { return s.clustering_key_prefix_type; }
        // Converts from the exploded clustering_prefix representation.
        static one from_clustering_prefix(const schema& s, const clustering_prefix& prefix) {
            return from_exploded(s, prefix.components());
        }
    };

    // Full clustering key: all components defined by schema are present.
    class one : public full_base {
        one(bytes&& b) : full_base(std::move(b)) {} // private: construct via named factories
    public:
        static one from_bytes(bytes b) { return one(std::move(b)); }
        static auto tuple_type(const schema& s) { return s.clustering_key_type; }
        // Converts from the exploded representation; prefix must be full.
        static one from_clustering_prefix(const schema& s, const clustering_prefix& prefix) {
            assert(prefix.is_full(s));
            return from_exploded(s, prefix.components());
        }
    };
};

108
query.hh
View File

@@ -1,79 +1,83 @@
#pragma once
#include <experimental/optional>
#include "schema.hh"
#include "types.hh"
#include "atomic_cell.hh"
#include "keys.hh"
namespace query {
// A range which can have inclusive, exclusive or open-ended bounds on each end.
template<typename T>
class range {
bytes _start; // empty if range is open on this end
bytes _end; // empty if range is open on this end (_end_inclusive == true) or same as start (_end_inclusive == false)
bool _start_inclusive;
bool _end_inclusive;
template <typename U>
using optional = std::experimental::optional<U>;
public:
range(bytes start, bytes end, bool start_inclusive, bool end_inclusive)
class bound {
T _value;
bool _inclusive;
public:
bound(T value, bool inclusive = true)
: _value(std::move(value))
, _inclusive(inclusive)
{ }
const T& value() const { return _value; }
bool is_inclusive() const { return _inclusive; }
};
private:
optional<bound> _start;
optional<bound> _end;
bool _singular;
public:
range(optional<bound> start, optional<bound> end)
: _start(std::move(start))
, _end(std::move(end))
, _start_inclusive(start_inclusive)
, _end_inclusive(end_inclusive)
, _singular(false)
{ }
range(T value)
: _start(bound(std::move(value), true))
, _end()
, _singular(true)
{ }
public:
static range make(bound start, bound end) {
return range({std::move(start)}, {std::move(end)});
}
static range make_open_ended_both_sides() {
return {{}, {}, true, true};
return {{}, {}};
}
static range make_singular(bytes key) {
return {std::move(key), {}, true, false};
static range make_singular(T value) {
return {std::move(value)};
}
static range make_starting_with(bytes key, bool inclusive = true) {
assert(!key.empty());
return {std::move(key), {}, inclusive, true};
static range make_starting_with(bound b) {
return {{std::move(b)}, {}};
}
static range make_ending_with(bytes key, bool inclusive = true) {
assert(!key.empty());
return {{}, std::move(key), true, inclusive};
}
static range make_both_inclusive(bytes start, bytes end) {
assert(!start.empty());
assert(!end.empty());
return {std::move(start), std::move(end), true, true};
}
static range make_inclusive_exclusive(bytes start, bytes end) {
assert(!start.empty());
assert(!end.empty());
return {std::move(start), std::move(end), true, false};
}
static range make_exclusive_inclusive(bytes start, bytes end) {
assert(!start.empty());
assert(!end.empty());
return {std::move(start), std::move(end), false, true};
}
static range make_both_exclusive(bytes start, bytes end) {
assert(!start.empty());
assert(!end.empty());
return {std::move(start), std::move(end), false, false};
static range make_ending_with(bound b) {
return {{}, {std::move(b)}};
}
bool is_singular() const {
return _end.empty() && !_end_inclusive;
return _singular;
}
bool is_full() const {
return _start.empty() && _end.empty();
return !_start && !_end;
}
void reverse() {
if (!is_singular()) {
if (!_singular) {
std::swap(_start, _end);
std::swap(_end_inclusive, _start_inclusive);
}
}
const bytes& start() const {
return _start;
const T& start_value() const {
return _start->value();
}
const bytes& end() const {
return _end;
const T& end_value() const {
return _end->value();
}
};
using partition_range = range<partition_key::one>;
using clustering_range = range<clustering_key::prefix::one>;
class result {
public:
class partition;
@@ -81,9 +85,7 @@ public:
// TODO: Optimize for singular partition range. In such case the caller
// knows the partition key, no need to send it back.
// std::pair::first is a serialized partition key.
std::vector<std::pair<bytes, partition>> partitions;
std::vector<std::pair<partition_key::one, partition>> partitions;
};
class result::row {
@@ -101,9 +103,7 @@ public:
// TODO: for some queries we could avoid sending keys back, because the client knows
// what the key is (single row query for instance).
//
// std::pair::first is a serialized clustering row key.
std::vector<std::pair<bytes, row>> rows;
std::vector<std::pair<clustering_key::one, row>> rows;
public:
// Returns row count in this result. If there is a static row and no clustering rows, that counts as one row.
// Otherwise, if there are some clustering rows, the static row doesn't count.
@@ -114,11 +114,11 @@ public:
class partition_slice {
public:
std::vector<range> row_ranges;
std::vector<clustering_range> row_ranges;
std::vector<column_id> static_columns; // TODO: consider using bitmap
std::vector<column_id> regular_columns; // TODO: consider using bitmap
public:
partition_slice(std::vector<range> row_ranges, std::vector<column_id> static_columns,
partition_slice(std::vector<clustering_range> row_ranges, std::vector<column_id> static_columns,
std::vector<column_id> regular_columns)
: row_ranges(std::move(row_ranges))
, static_columns(std::move(static_columns))
@@ -130,11 +130,11 @@ class read_command {
public:
sstring keyspace;
sstring column_family;
std::vector<range> partition_ranges; // ranges must be non-overlapping
std::vector<partition_range> partition_ranges; // ranges must be non-overlapping
partition_slice slice;
uint32_t row_limit;
public:
read_command(const sstring& keyspace, const sstring& column_family, std::vector<range> partition_ranges,
read_command(const sstring& keyspace, const sstring& column_family, std::vector<partition_range> partition_ranges,
partition_slice slice, uint32_t row_limit)
: keyspace(keyspace)
, column_family(column_family)

View File

@@ -1155,8 +1155,12 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query(lw_shared_ptr<query::read_command> cmd, db::consistency_level cl) {
if (cmd->partition_ranges.empty()) {
static auto make_empty = [] {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(make_foreign(make_lw_shared<query::result>()));
};
if (cmd->partition_ranges.empty()) {
return make_empty();
}
if (cmd->partition_ranges.size() != 1) {
@@ -1167,7 +1171,7 @@ storage_proxy::query(lw_shared_ptr<query::read_command> cmd, db::consistency_lev
auto& range = cmd->partition_ranges[0];
if (range.is_singular()) {
auto& key = range.start();
auto& key = range.start_value();
auto dk = dht::global_partitioner().decorate_key(key);
auto shard = _db.local().shard_of(dk._token);
return _db.invoke_on(shard, [cmd] (database& db) {
@@ -2128,7 +2132,7 @@ storage_proxy::query(lw_shared_ptr<query::read_command> cmd, db::consistency_lev
}
Set<InetAddress> allEndpoints = Gossiper.instance.getLiveTokenOwners();
int blockFor = allEndpoints.size();
final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
@@ -2159,7 +2163,7 @@ storage_proxy::query(lw_shared_ptr<query::read_command> cmd, db::consistency_lev
{
return !Gossiper.instance.getUnreachableTokenOwners().isEmpty();
}
public interface WritePerformer
{
public void apply(IMutation mutation,
@@ -2320,15 +2324,15 @@ storage_proxy::query(lw_shared_ptr<query::read_command> cmd, db::consistency_lev
public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); }
public void reloadTriggerClasses() { TriggerExecutor.instance.reloadClasses(); }
public long getReadRepairAttempted() {
return ReadRepairMetrics.attempted.count();
}
public long getReadRepairRepairedBlocking() {
return ReadRepairMetrics.repairedBlocking.count();
}
public long getReadRepairRepairedBackground() {
return ReadRepairMetrics.repairedBackground.count();
}

View File

@@ -13,8 +13,8 @@ int main(int argc, char* argv[]) {
std::cout << "Timing mutation of single column within one row...\n";
partition_key key = to_bytes("key1");
clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)});
auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key::one::from_exploded(*s, {int32_type->decompose(2)});
bytes value = int32_type->decompose(3);
time_it([&] {

View File

@@ -41,7 +41,7 @@ static future<> require_column_has_value(distributed<database>& ddb, const sstri
auto cf = ks->find_column_family(table_name);
assert(cf != nullptr);
auto schema = cf->_schema;
auto pkey = schema->partition_key_type->serialize_value_deep(pk);
auto pkey = partition_key::one::from_deeply_exploded(*schema, pk);
auto dk = dht::global_partitioner().decorate_key(pkey);
auto shard = db.shard_of(dk._token);
return ddb.invoke_on(shard, [pkey = std::move(pkey),
@@ -57,7 +57,7 @@ static future<> require_column_has_value(distributed<database>& ddb, const sstri
auto schema = cf->_schema;
auto p = cf->find_partition(pkey);
assert(p != nullptr);
auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck));
auto row = p->find_row(clustering_key::one::from_deeply_exploded(*schema, ck));
assert(row != nullptr);
auto col_def = schema->get_column_definition(utf8_type->decompose(column_name));
assert(col_def != nullptr);

View File

@@ -24,8 +24,8 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) {
column_family cf(s);
column_definition& r1_col = *s->get_column_definition("r1");
partition_key key = to_bytes("key1");
clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)});
auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key::one::from_exploded(*s, {int32_type->decompose(2)});
mutation m(key, s);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3)));
@@ -39,32 +39,62 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) {
BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3)));
}
BOOST_AUTO_TEST_CASE(test_multi_level_row_tombstones) {
auto s = make_lw_shared(schema(some_keyspace, some_column_family,
{{"p1", utf8_type}},
{{"c1", int32_type}, {"c2", int32_type}, {"c3", int32_type}},
{{"r1", int32_type}}, {}, utf8_type));
auto ttl = gc_clock::now() + std::chrono::seconds(1);
mutation m(partition_key::one::from_exploded(*s, {to_bytes("key1")}), s);
auto make_prefix = [s] (const std::vector<boost::any>& v) {
return clustering_key::prefix::one::from_deeply_exploded(*s, v);
};
auto make_key = [s] (const std::vector<boost::any>& v) {
return clustering_key::one::from_deeply_exploded(*s, v);
};
m.p.apply_row_tombstone(s, make_prefix({1, 2}), tombstone(9, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 3})), tombstone(9, ttl));
m.p.apply_row_tombstone(s, make_prefix({1, 3}), tombstone(8, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(9, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(8, ttl));
m.p.apply_row_tombstone(s, make_prefix({1}), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(11, ttl));
m.p.apply_row_tombstone(s, make_prefix({1, 4}), tombstone(6, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(11, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 4, 0})), tombstone(11, ttl));
}
BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) {
auto s = make_lw_shared(schema(some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
{{"p1", utf8_type}}, {{"c1", int32_type}, {"c2", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
column_family cf(s);
partition_key key = to_bytes("key1");
clustering_key c_key1 = s->clustering_key_type->decompose_value(
{int32_type->decompose(1)}
);
clustering_key c_key2 = s->clustering_key_type->decompose_value(
{int32_type->decompose(2)}
);
auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")});
auto c_key1 = clustering_key::one::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(0)});
auto c_key1_prefix = clustering_key::prefix::one::from_exploded(*s, {int32_type->decompose(1)});
auto c_key2 = clustering_key::one::from_exploded(*s, {int32_type->decompose(2), int32_type->decompose(0)});
auto c_key2_prefix = clustering_key::prefix::one::from_exploded(*s, {int32_type->decompose(2)});
auto ttl = gc_clock::now() + std::chrono::seconds(1);
mutation m(key, s);
m.p.apply_row_tombstone(s, c_key1, tombstone(1, ttl));
m.p.apply_row_tombstone(s, c_key2, tombstone(0, ttl));
m.p.apply_row_tombstone(s, c_key1_prefix, tombstone(1, ttl));
m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(0, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key1), tombstone(1, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(0, ttl));
m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl));
m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(1, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl));
}
@@ -73,7 +103,7 @@ BOOST_AUTO_TEST_CASE(test_map_mutations) {
auto s = make_lw_shared(schema(some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_map_type}}, utf8_type));
column_family cf(s);
partition_key key = to_bytes("key1");
auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")});
auto& column = *s->get_column_definition("s1");
map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell(utf8_type->decompose(sstring("101")))}};
mutation m1(key, s);
@@ -106,7 +136,7 @@ BOOST_AUTO_TEST_CASE(test_set_mutations) {
auto s = make_lw_shared(schema(some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_set_type}}, utf8_type));
column_family cf(s);
partition_key key = to_bytes("key1");
auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")});
auto& column = *s->get_column_definition("s1");
map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell({})}};
mutation m1(key, s);
@@ -139,8 +169,7 @@ BOOST_AUTO_TEST_CASE(test_list_mutations) {
auto s = make_lw_shared(schema(some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_list_type}}, utf8_type));
column_family cf(s);
partition_key key = to_bytes("key1");
auto key_type = timeuuid_type;
auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")});
auto& column = *s->get_column_definition("s1");
auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); };
collection_type_impl::mutation mmut1{{make_key(), make_atomic_cell(int32_type->decompose(101))}};

View File

@@ -107,13 +107,20 @@ public:
}
void get_slice(tcxx::function<void(std::vector<ColumnOrSuperColumn> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
auto keyb = to_bytes(key);
auto& ks = lookup_keyspace(_db.local(), _ks_name);
auto schema = ks.find_schema(column_parent.column_family);
if (!_ks) {
return complete_with_exception<InvalidRequestException>(std::move(exn_cob), "column family %s not found", column_parent.column_family);
}
auto pk = key_from_thrift(schema, to_bytes(key));
auto dk = dht::global_partitioner().decorate_key(pk);
auto shard = _db.local().shard_of(dk._token);
auto do_get = [this,
key = std::move(key),
pk = std::move(pk),
column_parent = std::move(column_parent),
predicate = std::move(predicate)] (database& db) {
std::vector<ColumnOrSuperColumn> ret;
auto keyb = to_bytes(key);
if (!column_parent.super_column.empty()) {
throw unimplemented_exception();
}
@@ -123,7 +130,7 @@ public:
throw unimplemented_exception();
} else if (predicate.__isset.slice_range) {
auto&& range = predicate.slice_range;
row* rw = cf.find_row(keyb, bytes());
row* rw = cf.find_row(pk, clustering_key::one::make_empty(*cf._schema));
if (rw) {
auto beg = cf._schema->regular_begin();
if (!range.start.empty()) {
@@ -157,8 +164,6 @@ public:
throw make_exception<InvalidRequestException>("empty SlicePredicate");
}
};
auto dk = dht::global_partitioner().decorate_key(keyb);
auto shard = _db.local().shard_of(dk._token);
_db.invoke_on(shard, [do_get = std::move(do_get)] (database& db) {
return do_get(db);
}).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)]
@@ -233,20 +238,20 @@ public:
if (!_ks) {
return complete_with_exception<InvalidRequestException>(std::move(exn_cob), "keyspace not set");
}
static bytes null_clustering_key = to_bytes("");
// Would like to use move_iterator below, but Mutation is filled with some const stuff.
parallel_for_each(mutation_map.begin(), mutation_map.end(),
[this] (std::pair<std::string, std::map<std::string, std::vector<Mutation>>> key_cf) {
bytes key = to_bytes(key_cf.first);
bytes thrift_key = to_bytes(key_cf.first);
std::map<std::string, std::vector<Mutation>>& cf_mutations_map = key_cf.second;
return parallel_for_each(
boost::make_move_iterator(cf_mutations_map.begin()),
boost::make_move_iterator(cf_mutations_map.end()),
[this, key] (std::pair<std::string, std::vector<Mutation>> cf_mutations) {
[this, thrift_key] (std::pair<std::string, std::vector<Mutation>> cf_mutations) {
sstring cf_name = cf_mutations.first;
const std::vector<Mutation>& mutations = cf_mutations.second;
auto& cf = lookup_column_family(*_ks, cf_name);
mutation m_to_apply(key, cf._schema);
mutation m_to_apply(key_from_thrift(cf._schema, thrift_key), cf._schema);
auto empty_clustering_key = clustering_key::one::make_empty(*cf._schema);
for (const Mutation& m : mutations) {
if (m.__isset.column_or_supercolumn) {
auto&& cosc = m.column_or_supercolumn;
@@ -268,7 +273,7 @@ public:
ttl = cf._schema->default_time_to_live;
}
auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt();
m_to_apply.set_clustered_cell(null_clustering_key, *def,
m_to_apply.set_clustered_cell(empty_clustering_key, *def,
atomic_cell::one::make_live(col.timestamp, ttl_option, to_bytes(col.value)));
} else if (cosc.__isset.super_column) {
// FIXME: implement
@@ -291,7 +296,7 @@ public:
return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) {
auto& ks = db.keyspaces.at(_ks_name);
auto& cf = ks.column_families.at(cf_name);
cf.apply(std::move(m_to_apply));
cf.apply(m_to_apply);
});
});
}).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) {
@@ -513,13 +518,19 @@ private:
throw make_exception<InvalidRequestException>("column family %s not found", cf_name);
}
}
keyspace& lookup_keyspace(database& db, const sstring ks_name) {
static keyspace& lookup_keyspace(database& db, const sstring& ks_name) {
try {
return db.keyspaces.at(ks_name);
} catch (std::out_of_range&) {
throw make_exception<InvalidRequestException>("Keyspace %s not found", ks_name);
}
}
// Converts a raw Thrift key blob into a typed partition key.
// Only single-component partition keys are supported so far; compound
// partition keys coming in over Thrift hit the unimplemented path.
static partition_key::one key_from_thrift(schema_ptr s, bytes k) {
    if (s->partition_key_size() == 1) {
        return partition_key::one::from_single_value(*s, std::move(k));
    }
    fail(unimplemented::cause::THRIFT);
}
};
class handler_factory : public CassandraCobSvIfFactory {

View File

@@ -32,6 +32,10 @@ public:
tuple_type(tuple_type&&) = default;
auto const& types() {
return _types;
}
prefix_type as_prefix() {
return prefix_type(_types);
}
@@ -75,7 +79,7 @@ public:
bytes decompose_value(const value_type& values) {
return ::serialize_value(*this, values);
}
class component_iterator : public std::iterator<std::forward_iterator_tag, std::experimental::optional<bytes_view>> {
class iterator : public std::iterator<std::forward_iterator_tag, std::experimental::optional<bytes_view>> {
private:
ssize_t _types_left;
bytes_view _v;
@@ -111,27 +115,27 @@ public:
}
public:
struct end_iterator_tag {};
component_iterator(const tuple_type& t, const bytes_view& v) : _types_left(t._types.size()), _v(v) {
iterator(const tuple_type& t, const bytes_view& v) : _types_left(t._types.size()), _v(v) {
read_current();
}
component_iterator(end_iterator_tag, const bytes_view& v) : _v(nullptr, 0) {}
component_iterator& operator++() {
iterator(end_iterator_tag, const bytes_view& v) : _v(nullptr, 0) {}
iterator& operator++() {
--_types_left;
read_current();
return *this;
}
const value_type& operator*() const { return _current; }
bool operator!=(const component_iterator& i) const { return _v.begin() != i._v.begin(); }
bool operator==(const component_iterator& i) const { return _v.begin() == i._v.begin(); }
bool operator!=(const iterator& i) const { return _v.begin() != i._v.begin(); }
bool operator==(const iterator& i) const { return _v.begin() == i._v.begin(); }
};
component_iterator begin(const bytes_view& v) const {
return component_iterator(*this, v);
iterator begin(const bytes_view& v) const {
return iterator(*this, v);
}
component_iterator end(const bytes_view& v) const {
return component_iterator(typename component_iterator::end_iterator_tag(), v);
iterator end(const bytes_view& v) const {
return iterator(typename iterator::end_iterator_tag(), v);
}
auto iter_items(const bytes_view& v) {
return boost::iterator_range<component_iterator>(begin(v), end(v));
return boost::iterator_range<iterator>(begin(v), end(v));
}
value_type deserialize_value(bytes_view v) {
std::vector<bytes_opt> result;

View File

@@ -27,6 +27,41 @@ class column_specification;
}
// Like std::lexicographical_compare but injects values from shared sequence (types) to the comparator
// Compare is an abstract_type-aware less comparator, which takes the type as first argument.
// Three-way walks both sequences in lock-step, feeding each pair of elements
// (together with the matching type from the shared `types` sequence) to the
// type-aware comparator. Returns true iff [first1, last1) orders strictly
// before [first2, last2).
template <typename TypesIterator, typename InputIt1, typename InputIt2, typename Compare>
bool lexicographical_compare(TypesIterator types, InputIt1 first1, InputIt1 last1,
        InputIt2 first2, InputIt2 last2, Compare comp) {
    for (; first1 != last1 && first2 != last2; ++first1, ++first2, ++types) {
        if (comp(*types, *first1, *first2)) {
            return true;
        }
        if (comp(*types, *first2, *first1)) {
            return false;
        }
    }
    // Every compared element was equal: the first sequence is smaller exactly
    // when it is a proper prefix of the second.
    return first1 == last1 && first2 != last2;
}
// Returns true iff the second sequence is a prefix of the first sequence
// Equality is an abstract_type-aware equality checker which takes the type as first argument.
// Checks whether [first2, last2) is a prefix of [first1, last1), comparing
// element pairs with the type-aware `equality` predicate and advancing
// through the shared `types` sequence in lock-step.
template <typename TypesIterator, typename InputIt1, typename InputIt2, typename Equality>
bool is_prefixed_by(TypesIterator types, InputIt1 first1, InputIt1 last1,
        InputIt2 first2, InputIt2 last2, Equality equality) {
    for (; first2 != last2; ++first1, ++first2, ++types) {
        // The candidate prefix is longer than the sequence, or an element
        // differs: either way it is not a prefix.
        if (first1 == last1 || !equality(*types, *first1, *first2)) {
            return false;
        }
    }
    return true;
}
using object_opt = std::experimental::optional<boost::any>;
class marshal_exception : public std::exception {
@@ -81,10 +116,10 @@ public:
}
}
virtual object_opt deserialize(bytes_view v) = 0;
virtual void validate(const bytes& v) {
virtual void validate(bytes_view v) {
// FIXME
}
virtual void validate_collection_member(const bytes& v, const bytes& collection_name) {
virtual void validate_collection_member(bytes_view v, const bytes& collection_name) {
validate(v);
}
virtual bool is_compatible_with(abstract_type& previous) {
@@ -164,6 +199,29 @@ public:
};
using data_type = shared_ptr<abstract_type>;
using bytes_view_opt = std::experimental::optional<bytes_view>;
static inline
// Less-than over optional serialized values: a disengaged optional orders
// before any engaged one; two engaged values are compared via the type.
bool optional_less_compare(data_type t, bytes_view_opt e1, bytes_view_opt e2) {
    if (!e1 || !e2) {
        // Smaller only when e1 is missing and e2 is present.
        return bool(e2) && !e1;
    }
    return t->less(*e1, *e2);
}
static inline
// Equality over optional serialized values: two disengaged optionals are
// equal, mixed engagement is unequal, and two engaged values are compared
// via the type's equality.
bool optional_equal(data_type t, bytes_view_opt e1, bytes_view_opt e2) {
    if (!e1 && !e2) {
        return true;
    }
    if (!e1 || !e2) {
        return false;
    }
    return t->equal(*e1, *e2);
}
class collection_type_impl : public abstract_type {
static thread_local logging::logger _logger;

View File

@@ -29,6 +29,7 @@ std::ostream& operator<<(std::ostream& out, cause c) {
case cause::LEGACY_COMPOSITE_KEYS: return out << "LEGACY_COMPOSITE_KEYS";
case cause::COLLECTION_RANGE_TOMBSTONES: return out << "COLLECTION_RANGE_TOMBSTONES";
case cause::RANGE_QUERIES: return out << "RANGE_QUERIES";
case cause::THRIFT: return out << "THRIFT";
}
assert(0);
}

View File

@@ -27,7 +27,8 @@ enum class cause {
TOKEN_RESTRICTION,
LEGACY_COMPOSITE_KEYS,
COLLECTION_RANGE_TOMBSTONES,
RANGE_QUERIES
RANGE_QUERIES,
THRIFT
};
void fail(cause what) __attribute__((noreturn));

View File

@@ -26,8 +26,9 @@ namespace utils {
namespace murmur_hash {
uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t seed)
uint32_t hash32(bytes_view data, uint32_t seed)
{
uint32_t length = data.size();
uint32_t m = 0x5bd1e995;
uint32_t r = 24;
@@ -38,13 +39,13 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se
for (uint32_t i = 0; i < len_4; i++)
{
uint32_t i_4 = i << 2;
uint32_t k = data[offset + i_4 + 3];
uint32_t k = data[i_4 + 3];
k = k << 8;
k = k | (data[offset + i_4 + 2] & 0xff);
k = k | (data[i_4 + 2] & 0xff);
k = k << 8;
k = k | (data[offset + i_4 + 1] & 0xff);
k = k | (data[i_4 + 1] & 0xff);
k = k << 8;
k = k | (data[offset + i_4 + 0] & 0xff);
k = k | (data[i_4 + 0] & 0xff);
k *= m;
k ^= (uint32_t)k >> r;
k *= m;
@@ -60,15 +61,15 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se
{
if (left >= 3)
{
h ^= (uint32_t) data[offset + length - 3] << 16;
h ^= (uint32_t) data[length - 3] << 16;
}
if (left >= 2)
{
h ^= (uint32_t) data[offset + length - 2] << 8;
h ^= (uint32_t) data[length - 2] << 8;
}
if (left >= 1)
{
h ^= (uint32_t) data[offset + length - 1];
h ^= (uint32_t) data[length - 1];
}
h *= m;
@@ -81,8 +82,9 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se
return h;
}
uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t seed)
uint64_t hash2_64(bytes_view key, uint64_t seed)
{
uint32_t length = key.size();
uint64_t m64 = 0xc6a4a7935bd1e995L;
uint32_t r64 = 47;
@@ -94,10 +96,10 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s
{
uint32_t i_8 = i << 3;
uint64_t k64 = ((uint64_t) key[offset+i_8+0] & 0xff) + (((uint64_t) key[offset+i_8+1] & 0xff)<<8) +
(((uint64_t) key[offset+i_8+2] & 0xff)<<16) + (((uint64_t) key[offset+i_8+3] & 0xff)<<24) +
(((uint64_t) key[offset+i_8+4] & 0xff)<<32) + (((uint64_t) key[offset+i_8+5] & 0xff)<<40) +
(((uint64_t) key[offset+i_8+6] & 0xff)<<48) + (((uint64_t) key[offset+i_8+7] & 0xff)<<56);
uint64_t k64 = ((uint64_t) key[i_8+0] & 0xff) + (((uint64_t) key[i_8+1] & 0xff)<<8) +
(((uint64_t) key[i_8+2] & 0xff)<<16) + (((uint64_t) key[i_8+3] & 0xff)<<24) +
(((uint64_t) key[i_8+4] & 0xff)<<32) + (((uint64_t) key[i_8+5] & 0xff)<<40) +
(((uint64_t) key[i_8+6] & 0xff)<<48) + (((uint64_t) key[i_8+7] & 0xff)<<56);
k64 *= m64;
k64 ^= k64 >> r64;
@@ -114,19 +116,19 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s
case 0:
break;
case 7:
h64 ^= (uint64_t) key[offset + length - rem + 6] << 48;
h64 ^= (uint64_t) key[length - rem + 6] << 48;
case 6:
h64 ^= (uint64_t) key[offset + length - rem + 5] << 40;
h64 ^= (uint64_t) key[length - rem + 5] << 40;
case 5:
h64 ^= (uint64_t) key[offset + length - rem + 4] << 32;
h64 ^= (uint64_t) key[length - rem + 4] << 32;
case 4:
h64 ^= (uint64_t) key[offset + length - rem + 3] << 24;
h64 ^= (uint64_t) key[length - rem + 3] << 24;
case 3:
h64 ^= (uint64_t) key[offset + length - rem + 2] << 16;
h64 ^= (uint64_t) key[length - rem + 2] << 16;
case 2:
h64 ^= (uint64_t) key[offset + length - rem + 1] << 8;
h64 ^= (uint64_t) key[length - rem + 1] << 8;
case 1:
h64 ^= (uint64_t) key[offset + length - rem];
h64 ^= (uint64_t) key[length - rem];
h64 *= m64;
}
@@ -137,14 +139,13 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s
return h64;
}
static uint64_t getblock(const bytes &key, uint32_t offset, uint32_t index)
static uint64_t getblock(bytes_view key, uint32_t index)
{
uint32_t i_8 = index << 3;
uint32_t blockOffset = offset + i_8;
return ((uint64_t) key[blockOffset + 0] & 0xff) + (((uint64_t) key[blockOffset + 1] & 0xff) << 8) +
(((uint64_t) key[blockOffset + 2] & 0xff) << 16) + (((uint64_t) key[blockOffset + 3] & 0xff) << 24) +
(((uint64_t) key[blockOffset + 4] & 0xff) << 32) + (((uint64_t) key[blockOffset + 5] & 0xff) << 40) +
(((uint64_t) key[blockOffset + 6] & 0xff) << 48) + (((uint64_t) key[blockOffset + 7] & 0xff) << 56);
return ((uint64_t) key[i_8 + 0] & 0xff) + (((uint64_t) key[i_8 + 1] & 0xff) << 8) +
(((uint64_t) key[i_8 + 2] & 0xff) << 16) + (((uint64_t) key[i_8 + 3] & 0xff) << 24) +
(((uint64_t) key[i_8 + 4] & 0xff) << 32) + (((uint64_t) key[i_8 + 5] & 0xff) << 40) +
(((uint64_t) key[i_8 + 6] & 0xff) << 48) + (((uint64_t) key[i_8 + 7] & 0xff) << 56);
}
static uint64_t rotl64(uint64_t v, uint32_t n)
@@ -163,8 +164,9 @@ static uint64_t fmix(uint64_t k)
return k;
}
void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t seed, std::array<uint64_t,2> &result)
void hash3_x64_128(bytes_view key, uint64_t seed, std::array<uint64_t,2> &result)
{
uint32_t length = key.size();
const uint32_t nblocks = length >> 4; // Process as 128-bit blocks.
uint64_t h1 = seed;
@@ -178,8 +180,8 @@ void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t
for(uint32_t i = 0; i < nblocks; i++)
{
uint64_t k1 = getblock(key, offset, i*2+0);
uint64_t k2 = getblock(key, offset, i*2+1);
uint64_t k1 = getblock(key, i*2+0);
uint64_t k2 = getblock(key, i*2+1);
k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1;
@@ -194,29 +196,29 @@ void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t
// tail
// Advance offset to the unprocessed tail of the data.
offset += nblocks * 16;
key.remove_prefix(nblocks * 16);
uint64_t k1 = 0;
uint64_t k2 = 0;
switch(length & 15)
{
case 15: k2 ^= ((uint64_t) key[offset+14]) << 48;
case 14: k2 ^= ((uint64_t) key[offset+13]) << 40;
case 13: k2 ^= ((uint64_t) key[offset+12]) << 32;
case 12: k2 ^= ((uint64_t) key[offset+11]) << 24;
case 11: k2 ^= ((uint64_t) key[offset+10]) << 16;
case 10: k2 ^= ((uint64_t) key[offset+9]) << 8;
case 9: k2 ^= ((uint64_t) key[offset+8]) << 0;
case 15: k2 ^= ((uint64_t) key[14]) << 48;
case 14: k2 ^= ((uint64_t) key[13]) << 40;
case 13: k2 ^= ((uint64_t) key[12]) << 32;
case 12: k2 ^= ((uint64_t) key[11]) << 24;
case 11: k2 ^= ((uint64_t) key[10]) << 16;
case 10: k2 ^= ((uint64_t) key[9]) << 8;
case 9: k2 ^= ((uint64_t) key[8]) << 0;
k2 *= c2; k2 = rotl64(k2,33); k2 *= c1; h2 ^= k2;
case 8: k1 ^= ((uint64_t) key[offset+7]) << 56;
case 7: k1 ^= ((uint64_t) key[offset+6]) << 48;
case 6: k1 ^= ((uint64_t) key[offset+5]) << 40;
case 5: k1 ^= ((uint64_t) key[offset+4]) << 32;
case 4: k1 ^= ((uint64_t) key[offset+3]) << 24;
case 3: k1 ^= ((uint64_t) key[offset+2]) << 16;
case 2: k1 ^= ((uint64_t) key[offset+1]) << 8;
case 1: k1 ^= ((uint64_t) key[offset]);
case 8: k1 ^= ((uint64_t) key[7]) << 56;
case 7: k1 ^= ((uint64_t) key[6]) << 48;
case 6: k1 ^= ((uint64_t) key[5]) << 40;
case 5: k1 ^= ((uint64_t) key[4]) << 32;
case 4: k1 ^= ((uint64_t) key[3]) << 24;
case 3: k1 ^= ((uint64_t) key[2]) << 16;
case 2: k1 ^= ((uint64_t) key[1]) << 8;
case 1: k1 ^= ((uint64_t) key[0]);
k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1;
};

View File

@@ -39,12 +39,9 @@ namespace utils {
namespace murmur_hash
{
uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length,
int32_t seed);
uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length,
uint64_t seed);
void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length,
uint64_t seed, std::array<uint64_t,2> &result);
uint32_t hash32(bytes_view data, int32_t seed);
uint64_t hash2_64(bytes_view key, uint64_t seed);
void hash3_x64_128(bytes_view key, uint64_t seed, std::array<uint64_t,2> &result);
};
} // namespace utils

View File

@@ -31,18 +31,19 @@ namespace validation {
* Based on org.apache.cassandra.thrift.ThriftValidation#validate_key()
*/
void
validate_cql_key(schema_ptr schema, const partition_key& key) {
if (key.empty()) {
validate_cql_key(schema_ptr schema, const partition_key::one& key) {
bytes_view b(key);
if (b.empty()) {
throw exceptions::invalid_request_exception("Key may not be empty");
}
// check that key can be handled by FBUtilities.writeShortByteArray
if (key.size() > max_key_size) {
throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", key.size(), max_key_size));
if (b.size() > max_key_size) {
throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", b.size(), max_key_size));
}
try {
schema->partition_key_type->validate(key);
schema->partition_key_type->validate(b);
} catch (const marshal_exception& e) {
throw exceptions::invalid_request_exception(e.why());
}

View File

@@ -31,7 +31,7 @@ namespace validation {
constexpr size_t max_key_size = std::numeric_limits<uint16_t>::max();
void validate_cql_key(schema_ptr schema, const partition_key& key);
void validate_cql_key(schema_ptr schema, const partition_key::one& key);
schema_ptr validate_column_family(database& db, const sstring& keyspace_name, const sstring& cf_name);
}