diff --git a/cql3/operation.hh b/cql3/operation.hh index 58e44c535e..b0d8e4ee8d 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -109,7 +109,7 @@ public: /** * Execute the operation. */ - virtual void execute(mutation& m, const clustering_prefix& row_key, const update_parameters& params) = 0; + virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) = 0; /** * A parsed raw UPDATE operation. diff --git a/cql3/restrictions/forwarding_primary_key_restrictions.hh b/cql3/restrictions/forwarding_primary_key_restrictions.hh index e9b5fb770e..c7853b87e2 100644 --- a/cql3/restrictions/forwarding_primary_key_restrictions.hh +++ b/cql3/restrictions/forwarding_primary_key_restrictions.hh @@ -31,16 +31,17 @@ namespace cql3 { namespace restrictions { /** - * A primary_key_restrictions which forwards all its method calls to another - * primary_key_restrictions. Subclasses should override one or more methods to modify the behavior - * of the backing primary_key_restrictions as desired per the decorator pattern. + * A primary_key_restrictions which forwards all its method calls to another + * primary_key_restrictions. Subclasses should override one or more methods to modify the behavior + * of the backing primary_key_restrictions as desired per the decorator pattern. */ -class forwarding_primary_key_restrictions : public primary_key_restrictions { +template +class forwarding_primary_key_restrictions : public primary_key_restrictions { protected: /** * Returns the backing delegate instance that methods are forwarded to. */ - virtual ::shared_ptr get_delegate() = 0; + virtual ::shared_ptr> get_delegate() = 0; public: virtual bool uses_function(const sstring& ks_name, const sstring& function_name) override { @@ -61,11 +62,11 @@ public: } #endif - virtual std::vector values_as_serialized_tuples(const query_options& options) override { - return get_delegate()->values_as_serialized_tuples(options); + virtual std::vector values(const query_options& options) override { + return get_delegate()->values(options); } - virtual std::vector bounds(const query_options& options) override { + virtual std::vector> bounds(const query_options& options) override { return get_delegate()->bounds(options); } @@ -101,7 +102,7 @@ public: virtual void addIndexExpressionTo(List expressions, QueryOptions options) { get_delegate()->addIndexExpressionTo(expressions, options); } -#endif +#endif }; } diff --git a/cql3/restrictions/primary_key_restrictions.hh b/cql3/restrictions/primary_key_restrictions.hh index 6312458878..4684081238 100644 --- a/cql3/restrictions/primary_key_restrictions.hh +++ b/cql3/restrictions/primary_key_restrictions.hh @@ -47,13 +47,14 @@ namespace restrictions { * What was in AbstractPrimaryKeyRestrictions was moved here (In pre 1.8 Java interfaces could not have default * implementations of methods). */ +template class primary_key_restrictions : public restrictions { public: virtual void merge_with(::shared_ptr restriction) = 0; - virtual std::vector values_as_serialized_tuples(const query_options& options) = 0; + virtual std::vector values(const query_options& options) = 0; - virtual std::vector bounds(const query_options& options) = 0; + virtual std::vector> bounds(const query_options& options) = 0; virtual bool is_inclusive(statements::bound b) { return true; } diff --git a/cql3/restrictions/reverse_primary_key_restrictions.hh b/cql3/restrictions/reverse_primary_key_restrictions.hh index 2629d14ab9..3b40456fb0 100644 --- a/cql3/restrictions/reverse_primary_key_restrictions.hh +++ b/cql3/restrictions/reverse_primary_key_restrictions.hh @@ -33,19 +33,20 @@ namespace restrictions { /** * PrimaryKeyRestrictions decorator that reverse the slices. */ -class reversed_primary_key_restrictions : public forwarding_primary_key_restrictions { +template +class reversed_primary_key_restrictions : public forwarding_primary_key_restrictions { private: - ::shared_ptr _restrictions; + ::shared_ptr> _restrictions; protected: - virtual ::shared_ptr get_delegate() override { + virtual ::shared_ptr> get_delegate() override { return _restrictions; } public: - reversed_primary_key_restrictions(shared_ptr restrictions) + reversed_primary_key_restrictions(shared_ptr> restrictions) : _restrictions(std::move(restrictions)) { } - virtual std::vector bounds(const query_options& options) override { + virtual std::vector> bounds(const query_options& options) override { auto ranges = _restrictions->bounds(options); for (auto&& range : ranges) { range.reverse(); diff --git a/cql3/restrictions/single_column_primary_key_restrictions.hh b/cql3/restrictions/single_column_primary_key_restrictions.hh index 2125d8b49f..93f39a9e8e 100644 --- a/cql3/restrictions/single_column_primary_key_restrictions.hh +++ b/cql3/restrictions/single_column_primary_key_restrictions.hh @@ -37,19 +37,20 @@ namespace restrictions { /** * A set of single column restrictions on a primary key part (partition key or clustering key). */ -class single_column_primary_key_restrictions : public primary_key_restrictions { +template +class single_column_primary_key_restrictions : public primary_key_restrictions { + using range_type = query::range; + using range_bound = typename range_type::bound; private: schema_ptr _schema; ::shared_ptr _restrictions; - ::shared_ptr> _tuple; bool _slice; bool _contains; bool _in; public: - single_column_primary_key_restrictions(schema_ptr schema, ::shared_ptr> tuple) + single_column_primary_key_restrictions(schema_ptr schema) : _schema(schema) , _restrictions(::make_shared(schema)) - , _tuple(std::move(tuple)) , _slice(false) , _contains(false) , _in(false) @@ -121,7 +122,7 @@ public: do_merge_with(::static_pointer_cast(restriction)); } - virtual std::vector values_as_serialized_tuples(const query_options& options) override { + virtual std::vector values(const query_options& options) override { std::vector> value_vector; value_vector.reserve(_restrictions->size()); for (auto def : _restrictions->get_column_defs()) { @@ -140,16 +141,16 @@ public: value_vector.emplace_back(std::move(values)); } - std::vector result; + std::vector result; result.reserve(cartesian_product_size(value_vector)); for (auto&& v : make_cartesian_product(value_vector)) { - result.emplace_back(_tuple->serialize_value(v)); + result.emplace_back(ValueType::from_exploded(*_schema, v)); } return result; } - virtual std::vector bounds(const query_options& options) override { - std::vector ranges; + virtual std::vector bounds(const query_options& options) override { + std::vector ranges; std::vector> vec_of_values; // TODO: optimize for all EQ case @@ -164,26 +165,20 @@ public: } if (r->is_slice()) { - // TODO: make restriction::bounds() return query::range to simplify all this if (cartesian_product_is_empty(vec_of_values)) { - auto read_value = [r, &options] (statements::bound b) { + auto read_bound = [r, &options, this] (statements::bound b) -> std::experimental::optional { + if (!r->has_bound(b)) { + return {}; + } auto value = r->bounds(b, options)[0]; if (!value) { throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string())); } - return *value; + return {range_bound(ValueType::from_exploded(*_schema, {*value}), r->is_inclusive(b))}; }; - if (r->has_bound(statements::bound::START) && r->has_bound(statements::bound::END)) { - ranges.emplace_back(query::range(read_value(statements::bound::START), read_value(statements::bound::END), - r->is_inclusive(statements::bound::START), r->is_inclusive(statements::bound::END))); - } else if (r->has_bound(statements::bound::START)) { - ranges.emplace_back(query::range::make_starting_with(read_value(statements::bound::START), - r->is_inclusive(statements::bound::START))); - } else { - assert(r->has_bound(statements::bound::END)); - ranges.emplace_back(query::range::make_ending_with(read_value(statements::bound::END), - r->is_inclusive(statements::bound::END))); - } + ranges.emplace_back(range_type( + read_bound(statements::bound::START), + read_bound(statements::bound::END))); if (def->type->is_reversed()) { ranges.back().reverse(); } @@ -192,31 +187,25 @@ public: ranges.reserve(cartesian_product_size(vec_of_values)); for (auto&& prefix : make_cartesian_product(vec_of_values)) { - auto read_bounds = [r, &prefix, &options, this](bytes& value_holder, bool& inclusive_holder, statements::bound bound) { + auto read_bound = [r, &prefix, &options, this](statements::bound bound) -> range_bound { if (r->has_bound(bound)) { auto value = std::move(r->bounds(bound, options)[0]); if (!value) { throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string())); } prefix.emplace_back(std::move(value)); - value_holder = _tuple->serialize_value(prefix); + auto val = ValueType::from_exploded(*_schema, prefix); prefix.pop_back(); - inclusive_holder = r->is_inclusive(bound); + return range_bound(std::move(val), r->is_inclusive(bound)); } else { - value_holder = _tuple->serialize_value(prefix); - inclusive_holder = true; + return range_bound(ValueType::from_exploded(*_schema, prefix)); } }; - bytes start_tuple; - bytes end_tuple; - bool start_inclusive; - bool end_inclusive; + ranges.emplace_back(range_type( + read_bound(statements::bound::START), + read_bound(statements::bound::END))); - read_bounds(start_tuple, start_inclusive, statements::bound::START); - read_bounds(end_tuple, end_inclusive, statements::bound::END); - ranges.emplace_back(query::range(std::move(start_tuple), std::move(end_tuple), - start_inclusive, end_inclusive)); if (def->type->is_reversed()) { ranges.back().reverse(); } @@ -239,7 +228,7 @@ public: ranges.reserve(cartesian_product_size(vec_of_values)); for (auto&& prefix : make_cartesian_product(vec_of_values)) { - ranges.emplace_back(query::range::make_singular(_tuple->serialize_value(prefix))); + ranges.emplace_back(range_type::make_singular(ValueType::from_exploded(*_schema, prefix))); } return std::move(ranges); diff --git a/cql3/restrictions/statement_restrictions.hh b/cql3/restrictions/statement_restrictions.hh index 7b9df469de..272c960f51 100644 --- a/cql3/restrictions/statement_restrictions.hh +++ b/cql3/restrictions/statement_restrictions.hh @@ -50,12 +50,12 @@ private: /** * Restrictions on partitioning columns */ - ::shared_ptr _partition_key_restrictions; + ::shared_ptr> _partition_key_restrictions; /** * Restrictions on clustering columns */ - ::shared_ptr _clustering_columns_restrictions; + ::shared_ptr> _clustering_columns_restrictions; /** * Restriction on non-primary key columns (i.e. secondary index restrictions) @@ -212,14 +212,12 @@ private: auto& def = restriction->get_column_def(); if (def.is_partition_key()) { if (!_partition_key_restrictions) { - _partition_key_restrictions = ::make_shared(_schema, - _schema->partition_key_prefix_type); + _partition_key_restrictions = ::make_shared>(_schema); } _partition_key_restrictions->merge_with(restriction); } else if (def.is_clustering_key()) { if (!_clustering_columns_restrictions) { - _clustering_columns_restrictions = ::make_shared(_schema, - _schema->clustering_key_prefix_type); + _clustering_columns_restrictions = ::make_shared>(_schema); } _clustering_columns_restrictions->merge_with(restriction); } else { @@ -383,9 +381,9 @@ public: * @return the specified bound of the partition key * @throws InvalidRequestException if the boundary cannot be retrieved */ - std::vector get_partition_key_ranges(const query_options& options) const { + std::vector get_partition_key_ranges(const query_options& options) const { if (!_partition_key_restrictions) { - return {query::range::make_open_ended_both_sides()}; + return {query::partition_range::make_open_ended_both_sides()}; } return _partition_key_restrictions->bounds(options); } @@ -510,9 +508,9 @@ public: #endif public: - std::vector get_clustering_bounds(const query_options& options) const { + std::vector get_clustering_bounds(const query_options& options) const { if (!_clustering_columns_restrictions) { - return {query::range::make_open_ended_both_sides()}; + return {query::clustering_range::make_open_ended_both_sides()}; } return _clustering_columns_restrictions->bounds(options); } @@ -559,7 +557,8 @@ public: void reverse() { if (_clustering_columns_restrictions) { - _clustering_columns_restrictions = ::make_shared(_clustering_columns_restrictions); + _clustering_columns_restrictions = ::make_shared>( + _clustering_columns_restrictions); } } }; diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 36477325bd..62b3a32307 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -59,7 +59,6 @@ modification_statement::get_mutations(const query_options& options, bool local, std::vector mutations; mutations.reserve(keys->size()); for (auto key : *keys) { - validation::validate_cql_key(s, key); mutations.emplace_back(std::move(key), s); auto& m = mutations.back(); this->add_update_for_key(m, *prefix, *params_ptr); @@ -70,7 +69,7 @@ modification_statement::get_mutations(const query_options& options, bool local, future> modification_statement::make_update_parameters( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, const query_options& options, bool local, @@ -87,7 +86,7 @@ modification_statement::make_update_parameters( future modification_statement::read_required_rows( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, bool local, db::consistency_level cl) { @@ -181,7 +180,7 @@ modification_statement::create_clustering_prefix_internal(const query_options& o components.push_back(val); } } - return components; + return clustering_prefix(std::move(components)); } clustering_prefix @@ -222,9 +221,9 @@ modification_statement::create_clustering_prefix(const query_options& options) { return create_clustering_prefix_internal(options); } -std::vector +std::vector modification_statement::build_partition_keys(const query_options& options) { - std::vector result; + std::vector result; std::vector components; auto remaining = s->partition_key_size(); @@ -244,9 +243,9 @@ modification_statement::build_partition_keys(const query_options& options) { throw exceptions::invalid_request_exception(sprint("Invalid null value for partition key part %s", def.name_as_text())); } components.push_back(val); - partition_key key = serialize_value(*s->partition_key_type, components); + auto key = partition_key::one::from_exploded(*s, components); validation::validate_cql_key(s, key); - result.push_back(key); + result.emplace_back(std::move(key)); } else { for (auto&& val : values) { if (!val) { @@ -256,9 +255,9 @@ modification_statement::build_partition_keys(const query_options& options) { full_components.reserve(components.size() + 1); auto i = std::copy(components.begin(), components.end(), std::back_inserter(full_components)); *i = val; - partition_key key = serialize_value(*s->partition_key_type, full_components); + auto key = partition_key::one::from_exploded(*s, full_components); validation::validate_cql_key(s, key); - result.push_back(key); + result.emplace_back(std::move(key)); } } } else { diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index f573c61c74..2dd1a2288a 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -255,7 +255,7 @@ private: public: void add_key_value(column_definition& def, ::shared_ptr value); void process_where_clause(std::vector where_clause, ::shared_ptr names); - std::vector build_partition_keys(const query_options& options); + std::vector build_partition_keys(const query_options& options); private: clustering_prefix create_clustering_prefix(const query_options& options); @@ -273,7 +273,7 @@ public: protected: future read_required_rows( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, bool local, db::consistency_level cl); @@ -427,7 +427,7 @@ private: public: future> make_update_parameters( - lw_shared_ptr> keys, + lw_shared_ptr> keys, lw_shared_ptr prefix, const query_options& options, bool local, diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index 61c20c763d..1c16aa5c77 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -126,7 +126,7 @@ select_statement::process_results(foreign_ptr> resu for (auto&& e : results->partitions) { // FIXME: deserialize into views - auto key = _schema->partition_key_type->deserialize_value(e.first); + auto key = e.first.explode(*_schema); auto& partition = e.second; if (!partition.static_row.empty() && partition.rows.empty() @@ -145,7 +145,7 @@ select_statement::process_results(foreign_ptr> resu } } else { for (auto&& e : partition.rows) { - auto c_key = _schema->clustering_key_type->deserialize_value(e.first); + auto c_key = e.first.explode(*_schema); auto& cells = e.second.cells; uint32_t static_id = 0; uint32_t regular_id = 0; diff --git a/cql3/update_parameters.hh b/cql3/update_parameters.hh index 91a8a636fc..1c685964eb 100644 --- a/cql3/update_parameters.hh +++ b/cql3/update_parameters.hh @@ -36,7 +36,7 @@ namespace cql3 { class update_parameters final { public: using prefetched_rows_type = std::experimental::optional< - std::unordered_map>; + std::unordered_map>; private: const gc_clock::duration _ttl; const prefetched_rows_type _prefetched; // For operation that require a read-before-write diff --git a/database.cc b/database.cc index 3c531f3bd9..e79646bc66 100644 --- a/database.cc +++ b/database.cc @@ -82,17 +82,17 @@ schema::schema(sstring ks_name, sstring cf_name, std::vector partition_k column_family::column_family(schema_ptr schema) : _schema(std::move(schema)) - , partitions(key_compare(_schema->thrift.partition_key_type)) { + , partitions(partition_key::one::less_compare(*_schema)) { } mutation_partition* -column_family::find_partition(const bytes& key) { +column_family::find_partition(const partition_key::one& key) { auto i = partitions.find(key); return i == partitions.end() ? nullptr : &i->second; } row* -column_family::find_row(const bytes& partition_key, const bytes& clustering_key) { +column_family::find_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key) { mutation_partition* p = find_partition(partition_key); if (!p) { return nullptr; @@ -101,19 +101,18 @@ column_family::find_row(const bytes& partition_key, const bytes& clustering_key) } mutation_partition& -column_family::find_or_create_partition(const bytes& key) { +column_family::find_or_create_partition(const partition_key::one& key) { // call lower_bound so we have a hint for the insert, just in case. auto i = partitions.lower_bound(key); - if (i == partitions.end() || key != i->first) { + if (i == partitions.end() || !key.equal(*_schema, i->first)) { i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema))); } return i->second; } row& -column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) { +column_family::find_or_create_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key) { mutation_partition& p = find_or_create_partition(partition_key); - // call lower_bound so we have a hint for the insert, just in case. return p.clustered_row(clustering_key); } @@ -313,6 +312,15 @@ keyspace::find_schema(const sstring& cf_name) { return cf->_schema; } +schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) { + auto ks = find_keyspace(ks_name); + if (!ks) { + return {}; + } + + return ks->find_schema(cf_name); +} + keyspace* database::find_keyspace(const sstring& name) { auto i = keyspaces.find(name); @@ -364,12 +372,17 @@ merge_column(const column_definition& def, } } +mutation_partition::~mutation_partition() { + _rows.clear_and_dispose(std::default_delete()); + _row_tombstones.clear_and_dispose(std::default_delete()); +} + void mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { _tombstone.apply(p._tombstone); - for (auto&& entry : p._row_tombstones) { - apply_row_tombstone(schema, entry); + for (auto&& e : p._row_tombstones) { + apply_row_tombstone(schema, e.prefix(), e.t()); } auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) { @@ -392,63 +405,103 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { merge_cells(_static_row, p._static_row, find_static_column_def); for (auto&& entry : p._rows) { - auto& key = entry.first; - auto i = _rows.find(key); + auto& key = entry.key(); + auto i = _rows.find(key, rows_entry::compare(*schema)); if (i == _rows.end()) { - _rows.emplace_hint(i, entry); + auto e = new rows_entry(entry); + _rows.insert(i, *e); } else { - i->second.t.apply(entry.second.t); - merge_cells(i->second.cells, entry.second.cells, find_regular_column_def); + i->apply(entry.row().t); + merge_cells(i->row().cells, entry.row().cells, find_regular_column_def); } } } tombstone -mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key& key) { +mutation_partition::tombstone_for_row(schema_ptr schema, const clustering_key::one& key) { tombstone t = _tombstone; - auto i = _row_tombstones.lower_bound(key); - if (i != _row_tombstones.end() && schema->clustering_key_prefix_type->is_prefix_of(i->first, key)) { - t.apply(i->second); + auto c = row_tombstones_entry::key_comparator( + clustering_key::one::prefix_view_type::less_compare_with_prefix(*schema)); + + // _row_tombstones contains only strict prefixes + for (unsigned prefix_len = 1; prefix_len < schema->clustering_key_size(); ++prefix_len) { + auto i = _row_tombstones.find(key.prefix_view(*schema, prefix_len), c); + if (i != _row_tombstones.end()) { + t.apply(i->t()); + } } - auto j = _rows.find(key); + auto j = _rows.find(key, rows_entry::compare(*schema)); if (j != _rows.end()) { - t.apply(j->second.t); + t.apply(j->row().t); } return t; } void -mutation_partition::apply_row_tombstone(schema_ptr schema, std::pair row_tombstone) { - auto& prefix = row_tombstone.first; - auto i = _row_tombstones.lower_bound(prefix); - if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first)) { - _row_tombstones.emplace_hint(i, std::move(row_tombstone)); - } else if (row_tombstone.second > i->second) { - i->second = row_tombstone.second; +mutation_partition::apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t) { + assert(!prefix.is_full(*schema)); + auto i = _row_tombstones.lower_bound(prefix, row_tombstones_entry::compare(*schema)); + if (i == _row_tombstones.end() || !prefix.equal(*schema, i->prefix())) { + auto e = new row_tombstones_entry(std::move(prefix), t); + _row_tombstones.insert(i, *e); + } else { + i->apply(t); } } void mutation_partition::apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t) { - if (prefix.empty()) { + if (!prefix) { apply(t); - } else if (prefix.size() == schema->clustering_key_size()) { - _rows[schema->clustering_key_type->serialize_value(prefix)].t.apply(t); + } else if (prefix.is_full(*schema)) { + apply_delete(schema, clustering_key::one::from_clustering_prefix(*schema, prefix), t); } else { - apply_row_tombstone(schema, {schema->clustering_key_prefix_type->serialize_value(prefix), t}); + apply_row_tombstone(schema, clustering_key::prefix::one::from_clustering_prefix(*schema, prefix), t); } } +void +mutation_partition::apply_delete(schema_ptr schema, clustering_key::one&& key, tombstone t) { + auto i = _rows.lower_bound(key, rows_entry::compare(*schema)); + if (i == _rows.end() || !i->key().equal(*schema, key)) { + auto e = new rows_entry(std::move(key)); + e->row().apply(t); + _rows.insert(i, *e); + } else { + i->row().apply(t); + } +} + +rows_entry* +mutation_partition::find_entry(schema_ptr schema, const clustering_key::prefix::one& key) { + auto i = _rows.find(key, rows_entry::compare_prefix(*schema)); + if (i == _rows.end()) { + return nullptr; + } + return &*i; +} + row* -mutation_partition::find_row(const clustering_key& key) { +mutation_partition::find_row(const clustering_key::one& key) { auto i = _rows.find(key); if (i == _rows.end()) { return nullptr; } - return &i->second.cells; + return &i->row().cells; +} + +row& +mutation_partition::clustered_row(const clustering_key::one& key) { + auto i = _rows.find(key); + if (i == _rows.end()) { + auto e = new rows_entry(key); + _rows.insert(i, *e); + return e->row().cells; + } + return i->row().cells; } bool column_definition::is_compact_value() const { @@ -457,7 +510,7 @@ bool column_definition::is_compact_value() const { } std::ostream& operator<<(std::ostream& os, const mutation& m) { - return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), m.key, m.p); + return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), static_cast(m.key), m.p); } std::ostream& operator<<(std::ostream& os, const mutation_partition& mp) { @@ -475,24 +528,27 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p if (!range.is_singular()) { fail(unimplemented::cause::RANGE_QUERIES); } - auto& key = range.start(); - if (!_schema->clustering_key_prefix_type->is_full(key)) { + auto& key = range.start_value(); + if (!key.is_full(*_schema)) { fail(unimplemented::cause::RANGE_QUERIES); } - auto row = partition.find_row(key); + + rows_entry* row = partition.find_entry(_schema, key); if (!row) { continue; } + auto&& cells = &row->row().cells; + // FIXME: handle removed rows properly. In CQL rows are separate entities (can be live or dead). - auto row_tombstone = partition.tombstone_for_row(_schema, key); + auto row_tombstone = partition.tombstone_for_row(_schema, row->key()); query::result::row result_row; result_row.cells.reserve(slice.regular_columns.size()); for (auto id : slice.regular_columns) { - auto i = row->find(id); - if (i == row->end()) { + auto i = cells->find(id); + if (i == cells->end()) { result_row.cells.emplace_back(); } else { auto def = _schema->regular_column_at(id); @@ -510,7 +566,7 @@ column_family::get_partition_slice(mutation_partition& partition, const query::p } } } - result.rows.emplace_back(key, std::move(result_row)); + result.rows.emplace_back(row->key(), std::move(result_row)); --limit; } @@ -532,10 +588,7 @@ column_family::query(const query::read_command& cmd) { uint32_t limit = cmd.row_limit; for (auto&& range : cmd.partition_ranges) { if (range.is_singular()) { - auto& key = range.start(); - if (!_schema->partition_key_prefix_type->is_full(key)) { - fail(unimplemented::cause::RANGE_QUERIES); - } + auto& key = range.start_value(); auto partition = find_partition(key); if (!partition) { return make_ready_future>(result); diff --git a/database.hh b/database.hh index ebc650b473..486ca42788 100644 --- a/database.hh +++ b/database.hh @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -34,59 +33,168 @@ #include "timestamp.hh" #include "tombstone.hh" #include "atomic_cell.hh" -#include "bytes.hh" #include "query.hh" - -using partition_key_type = tuple_type<>; -using clustering_key_type = tuple_type<>; -using clustering_prefix_type = tuple_prefix; -using partition_key = bytes; -using clustering_key = bytes; -using clustering_prefix = clustering_prefix_type::value_type; +#include "keys.hh" +#include using row = std::map; struct deletable_row final { tombstone t; row cells; + + void apply(tombstone t_) { + t.apply(t_); + } }; -using row_tombstone_set = std::map; +class row_tombstones_entry : public boost::intrusive::set_base_hook<> { + clustering_key::prefix::one _prefix; + tombstone _t; +public: + row_tombstones_entry(clustering_key::prefix::one&& prefix, tombstone t) + : _prefix(std::move(prefix)) + , _t(std::move(t)) + { } + clustering_key::prefix::one& prefix() { + return _prefix; + } + const clustering_key::prefix::one& prefix() const { + return _prefix; + } + tombstone& t() { + return _t; + } + const tombstone& t() const { + return _t; + } + void apply(tombstone t) { + _t.apply(t); + } + struct compare { + clustering_key::prefix::one::less_compare _c; + compare(const schema& s) : _c(s) {} + bool operator()(const row_tombstones_entry& e1, const row_tombstones_entry& e2) const { + return _c(e1._prefix, e2._prefix); + } + bool operator()(const clustering_key::prefix::one& prefix, const row_tombstones_entry& e) const { + return _c(prefix, e._prefix); + } + bool operator()(const row_tombstones_entry& e, const clustering_key::prefix::one& prefix) const { + return _c(e._prefix, prefix); + } + }; + template + struct delegating_compare { + Comparator _c; + delegating_compare(Comparator&& c) : _c(std::move(c)) {} + template + bool operator()(const Comparable& prefix, const row_tombstones_entry& e) const { + return _c(prefix, e._prefix); + } + template + bool operator()(const row_tombstones_entry& e, const Comparable& prefix) const { + return _c(e._prefix, prefix); + } + }; + template + static auto key_comparator(Comparator&& c) { + return delegating_compare(std::move(c)); + } +}; + +class rows_entry : public boost::intrusive::set_base_hook<> { + clustering_key::one _key; + deletable_row _row; +public: + rows_entry(clustering_key::one&& key) + : _key(std::move(key)) + { } + rows_entry(const clustering_key::one& key) + : _key(key) + { } + rows_entry(const rows_entry& e) + : _key(e._key) + , _row(e._row) + { } + clustering_key::one& key() { + return _key; + } + const clustering_key::one& key() const { + return _key; + } + deletable_row& row() { + return _row; + } + const deletable_row& row() const { + return _row; + } + void apply(tombstone t) { + _row.apply(t); + } + struct compare { + clustering_key::one::less_compare _c; + compare(const schema& s) : _c(s) {} + bool operator()(const rows_entry& e1, const rows_entry& e2) const { + return _c(e1._key, e2._key); + } + bool operator()(const clustering_key::one& key, const rows_entry& e) const { + return _c(key, e._key); + } + bool operator()(const rows_entry& e, const clustering_key::one& key) const { + return _c(e._key, key); + } + }; + struct compare_prefix { + clustering_key::one::less_compare_with_prefix _c; + compare_prefix(const schema& s) : _c(s) {} + bool operator()(const clustering_key::prefix::one& prefix, const rows_entry& e) const { + return _c(prefix, e._key); + } + bool operator()(const rows_entry& e, const clustering_key::prefix::one& prefix) const { + return _c(e._key, prefix); + } + }; +}; class mutation_partition final { private: tombstone _tombstone; row _static_row; - std::map _rows; - row_tombstone_set _row_tombstones; + boost::intrusive::set> _rows; + // Contains only strict prefixes so that we don't have to lookup full keys + // in both _row_tombstones and _rows. + boost::intrusive::set> _row_tombstones; public: mutation_partition(schema_ptr s) - : _rows(key_compare(s->clustering_key_type)) - , _row_tombstones(serialized_compare(s->clustering_key_prefix_type)) + : _rows(rows_entry::compare(*s)) + , _row_tombstones(row_tombstones_entry::compare(*s)) { } + mutation_partition(mutation_partition&&) = default; + ~mutation_partition(); void apply(tombstone t) { _tombstone.apply(t); } void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t); - void apply_row_tombstone(schema_ptr schema, bytes prefix, tombstone t) { - apply_row_tombstone(schema, {std::move(prefix), std::move(t)}); - } - void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone); + void apply_delete(schema_ptr schema, clustering_key::one&& key, tombstone t); + // prefix must not be full + void apply_row_tombstone(schema_ptr schema, clustering_key::prefix::one prefix, tombstone t); void apply(schema_ptr schema, const mutation_partition& p); - const row_tombstone_set& row_tombstones() const { return _row_tombstones; } row& static_row() { return _static_row; } - row& clustered_row(const clustering_key& key) { return _rows[key].cells; } - row& clustered_row(clustering_key&& key) { return _rows[std::move(key)].cells; } - row* find_row(const clustering_key& key); - tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key); + row& clustered_row(const clustering_key::one& key); + row* find_row(const clustering_key::one& key); + row* find_row(schema_ptr schema, const clustering_key::prefix::one& key); + rows_entry* find_entry(schema_ptr schema, const clustering_key::prefix::one& key); + tombstone tombstone_for_row(schema_ptr schema, const clustering_key::one& key); + tombstone tombstone_for_row(schema_ptr schema, const clustering_key::prefix::one& key); friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); }; class mutation final { public: schema_ptr schema; - partition_key key; + partition_key::one key; mutation_partition p; public: - mutation(partition_key key_, schema_ptr schema_) + mutation(partition_key::one key_, schema_ptr schema_) : schema(std::move(schema_)) , key(std::move(key_)) , p(schema) @@ -100,11 +208,11 @@ public: } void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) { - auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); + auto& row = p.clustered_row(clustering_key::one::from_clustering_prefix(*schema, prefix)); update_column(row, def, std::move(value)); } - void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { + void set_clustered_cell(const clustering_key::one& key, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(key); update_column(row, def, std::move(value)); } @@ -133,13 +241,14 @@ private: struct column_family { column_family(schema_ptr schema); - mutation_partition& find_or_create_partition(const bytes& key); - row& find_or_create_row(const bytes& partition_key, const bytes& clustering_key); - mutation_partition* find_partition(const bytes& key); - row* find_row(const bytes& partition_key, const bytes& clustering_key); + column_family(column_family&&) = default; + mutation_partition& find_or_create_partition(const partition_key::one& key); + row& find_or_create_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key); + mutation_partition* find_partition(const partition_key::one& key); + row* find_row(const partition_key::one& partition_key, const clustering_key::one& clustering_key); schema_ptr _schema; // partition key -> partition - std::map partitions; + std::map partitions; void apply(const mutation& m); // Returns at most "cmd.limit" rows future> query(const query::read_command& cmd); @@ -168,6 +277,7 @@ public: future<> init_from_data_directory(sstring datadir); future<> populate(sstring datadir); keyspace* find_keyspace(const sstring& name); + schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name); future<> stop() { return make_ready_future<>(); } void assign(database&& db) { *this = std::move(db); diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index 65c6a9c528..16a725ba39 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -25,6 +25,7 @@ #include "core/shared_ptr.hh" #include "types.hh" +#include "keys.hh" #include namespace dht { @@ -66,7 +67,7 @@ token minimum_token(); class decorated_key { public: token _token; - bytes _key; + partition_key::one _key; }; class i_partitioner { @@ -78,10 +79,21 @@ public: * @param key the raw, client-facing key * @return decorated version of key */ - decorated_key decorate_key(const bytes& key) { + decorated_key decorate_key(const partition_key::one& key) { return { get_token(key), key }; } + /** + * Transform key to object representation of the on-disk format. + * + * @param key the raw, client-facing key + * @return decorated version of key + */ + decorated_key decorate_key(partition_key::one&& key) { + auto token = get_token(key); + return { std::move(token), std::move(key) }; + } + /** * Calculate a token representing the approximate "middle" of the given * range. @@ -105,7 +117,7 @@ public: * (This is NOT a method to create a token from its string representation; * for that, use tokenFactory.fromString.) */ - virtual token get_token(const bytes& key) = 0; + virtual token get_token(const partition_key::one& key) = 0; /** * @return a randomly generated token diff --git a/dht/murmur3_partitioner.cc b/dht/murmur3_partitioner.cc index 19ef36eda1..d55ef11258 100644 --- a/dht/murmur3_partitioner.cc +++ b/dht/murmur3_partitioner.cc @@ -16,12 +16,13 @@ murmur3_partitioner::normalize(int64_t in) { } token -murmur3_partitioner::get_token(const bytes& key) { +murmur3_partitioner::get_token(const partition_key::one& key_) { + bytes_view key(key_); if (key.empty()) { return minimum_token(); } std::array hash; - utils::murmur_hash::hash3_x64_128(key, 0, key.size(), 0, hash); + utils::murmur_hash::hash3_x64_128(key, 0, hash); // We don't normalize() the value, since token includes an is-before-everything // indicator. // FIXME: will this require a repair when importing a database? diff --git a/dht/murmur3_partitioner.hh b/dht/murmur3_partitioner.hh index 2255ab36b3..12e2ffd6b0 100644 --- a/dht/murmur3_partitioner.hh +++ b/dht/murmur3_partitioner.hh @@ -10,7 +10,7 @@ namespace dht { class murmur3_partitioner final : public i_partitioner { public: - virtual token get_token(const bytes& key) override; + virtual token get_token(const partition_key::one& key) override; virtual bool preserves_order() override { return false; } virtual std::map describe_ownership(const std::vector& sorted_tokens); virtual data_type get_token_validator(); diff --git a/keys.hh b/keys.hh new file mode 100644 index 0000000000..d241a94a7f --- /dev/null +++ b/keys.hh @@ -0,0 +1,287 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include "schema.hh" +#include "bytes.hh" +#include "types.hh" + +// +// This header defines type system for primary key holders. +// +// We distinguish partition keys and clustering keys. API-wise they are almost +// the same, but they're separate type hierarchies. +// +// Clustering keys are further divided into prefixed and non-prefixed (full). +// Non-prefixed keys always have full component set, as defined by schema. +// Prefixed ones can have any number of trailing components missing. They may +// differ in underlying representation. +// +// The main classes are: +// +// partition_key::one - full partition key +// clustering_key::one - full clustering key +// clustering_key::prefix::one - clustering key prefix +// +// These classes wrap only the minimum information required to store the key +// (the key value itself). Any information which can be inferred from schema +// is not stored. Therefore accessors need to be provided with a pointer to +// schema, from which information about structure is extracted. + +// FIXME: Keys can't contain nulls, so we could get rid of optionals + +// Abstracts serialized tuple, managed by tuple_type. +template +class tuple_wrapper { +protected: + bytes _bytes; +protected: + tuple_wrapper(bytes&& b) : _bytes(std::move(b)) {} + + static inline auto type(const schema& s) { + return TopLevel::tuple_type(s); + } +public: + static TopLevel make_empty(const schema& s) { + std::vector v; + v.resize(type(s)->types().size()); + return from_exploded(s, v); + } + + static TopLevel from_exploded(const schema& s, const std::vector& v) { + return TopLevel::from_bytes(type(s)->serialize_value(v)); + } + + static TopLevel from_deeply_exploded(const schema& s, const std::vector& v) { + return TopLevel::from_bytes(type(s)->serialize_value_deep(v)); + } + + static TopLevel from_single_value(const schema& s, bytes v) { + // FIXME: optimize + std::vector values; + values.emplace_back(bytes_opt(std::move(v))); + return from_exploded(s, values); + } + + // FIXME: get rid of optional<> and return views + std::vector explode(const schema& s) const { + return type(s)->deserialize_value(_bytes); + } + + struct less_compare { + data_type _t; + less_compare(const schema& s) : _t(type(s)) {} + bool operator()(const TopLevel& k1, const TopLevel& k2) const { + return _t->less(k1, k2); + } + }; + + struct hashing { + data_type _t; + hashing(const schema& s) : _t(type(s)) {} + size_t operator()(const TopLevel& o) const { + return _t->hash(o); + } + }; + + struct equality { + data_type _t; + equality(const schema& s) : _t(type(s)) {} + bool operator()(const TopLevel& o1, const TopLevel& o2) const { + return _t->equal(o1, o2); + } + }; + + bool equal(const schema& s, const TopLevel& other) const { + return type(s)->equal(*this, other); + } + + operator bytes_view() const { + return _bytes; + } +}; + +template +class prefix_view_on_full_tuple { +public: + using iterator = typename tuple_type::iterator; +private: + bytes_view _b; + unsigned _prefix_len; + iterator _begin; + iterator _end; +public: + prefix_view_on_full_tuple(const schema& s, bytes_view b, unsigned prefix_len) + : _b(b) + , _prefix_len(prefix_len) + , _begin(TopLevel::tuple_type(s)->begin(_b)) + , _end(_begin) + { + std::advance(_end, prefix_len); + } + + iterator begin() const { return _begin; } + iterator end() const { return _end; } + + struct less_compare_with_prefix { + shared_ptr> prefix_type; + + less_compare_with_prefix(const schema& s) + : prefix_type(PrefixTopLevel::tuple_type(s)) + { } + + bool operator()(const prefix_view_on_full_tuple& k1, const PrefixTopLevel& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + k1.begin(), k1.end(), + prefix_type->begin(k2), prefix_type->end(k2), + optional_less_compare); + } + + bool operator()(const PrefixTopLevel& k1, const prefix_view_on_full_tuple& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + prefix_type->begin(k1), prefix_type->end(k1), + k2.begin(), k2.end(), + optional_less_compare); + } + }; +}; + +template +class prefixable_full_tuple : public tuple_wrapper { + using base = tuple_wrapper; +protected: + prefixable_full_tuple(bytes&& b) : base(std::move(b)) {} +public: + using prefix_view_type = prefix_view_on_full_tuple; + + bool is_prefixed_by(const schema& s, const PrefixTopLevel& prefix) const { + auto t = base::type(s); + auto prefix_type = PrefixTopLevel::tuple_type(s); + return ::is_prefixed_by(t->types().begin(), + t->begin(*this), t->end(*this), + prefix_type->begin(prefix), prefix_type->end(prefix), + optional_equal); + } + + struct less_compare_with_prefix { + shared_ptr> prefix_type; + shared_ptr> full_type; + + less_compare_with_prefix(const schema& s) + : prefix_type(PrefixTopLevel::tuple_type(s)) + , full_type(TopLevel::tuple_type(s)) + { } + + bool operator()(const TopLevel& k1, const PrefixTopLevel& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + full_type->begin(k1), full_type->end(k1), + prefix_type->begin(k2), prefix_type->end(k2), + optional_less_compare); + } + + bool operator()(const PrefixTopLevel& k1, const TopLevel& k2) const { + return lexicographical_compare(prefix_type->types().begin(), + prefix_type->begin(k1), prefix_type->end(k1), + full_type->begin(k2), full_type->end(k2), + optional_less_compare); + } + }; + + auto prefix_view(const schema& s, unsigned prefix_len) const { + return prefix_view_type(s, *this, prefix_len); + } +}; + +template +class prefix_tuple_wrapper : public tuple_wrapper { + using base = tuple_wrapper; +protected: + prefix_tuple_wrapper(bytes&& b) : base(std::move(b)) {} +public: + bool is_full(const schema& s) const { + return TopLevel::tuple_type(s)->is_full(base::_bytes); + } + + // Can be called only if is_full() + FullTopLevel to_full(const schema& s) const { + return FullTopLevel::from_exploded(s, base::explode(s)); + } + + bool is_prefixed_by(const schema& s, const TopLevel& prefix) const { + auto t = base::type(s); + return ::is_prefixed_by(t->types().begin(), + t->begin(*this), t->end(*this), + t->begin(prefix), t->end(prefix), + optional_equal); + } +}; + +class partition_key { +public: + class one; + + using full_base = tuple_wrapper; + + class one : public full_base { + one(bytes&& b) : full_base(std::move(b)) {} + public: + static one from_bytes(bytes b) { return one(std::move(b)); } + static auto tuple_type(const schema& s) { return s.partition_key_type; } + }; +}; + +class clustering_prefix { + std::vector _v; +public: + clustering_prefix(std::vector&& v) : _v(std::move(v)) {} + clustering_prefix() {} + size_t size() const { + return _v.size(); + } + auto const& components() const { + return _v; + } + explicit operator bool() const { + return !_v.empty(); + } + bool is_full(const schema& s) const { + return _v.size() == s.clustering_key_size(); + } +}; + +class clustering_key { +public: + class one; + + struct prefix { + class one; + }; + + using full_base = prefixable_full_tuple; + using prefix_base = prefix_tuple_wrapper; + + class prefix::one : public prefix_base { + one(bytes&& b) : prefix_base(std::move(b)) {} + public: + static one from_bytes(bytes b) { return one(std::move(b)); } + static auto tuple_type(const schema& s) { return s.clustering_key_prefix_type; } + + static one from_clustering_prefix(const schema& s, const clustering_prefix& prefix) { + return from_exploded(s, prefix.components()); + } + }; + + class one : public full_base { + one(bytes&& b) : full_base(std::move(b)) {} + public: + static one from_bytes(bytes b) { return one(std::move(b)); } + static auto tuple_type(const schema& s) { return s.clustering_key_type; } + + static one from_clustering_prefix(const schema& s, const clustering_prefix& prefix) { + assert(prefix.is_full(s)); + return from_exploded(s, prefix.components()); + } + }; +}; diff --git a/query.hh b/query.hh index ae006323a2..a690308502 100644 --- a/query.hh +++ b/query.hh @@ -1,79 +1,83 @@ #pragma once +#include #include "schema.hh" #include "types.hh" #include "atomic_cell.hh" +#include "keys.hh" namespace query { // A range which can have inclusive, exclusive or open-ended bounds on each end. +template class range { - bytes _start; // empty if range is open on this end - bytes _end; // empty if range is open on this end (_end_inclusive == true) or same as start (_end_inclusive == false) - bool _start_inclusive; - bool _end_inclusive; + template + using optional = std::experimental::optional; public: - range(bytes start, bytes end, bool start_inclusive, bool end_inclusive) + class bound { + T _value; + bool _inclusive; + public: + bound(T value, bool inclusive = true) + : _value(std::move(value)) + , _inclusive(inclusive) + { } + const T& value() const { return _value; } + bool is_inclusive() const { return _inclusive; } + }; +private: + optional _start; + optional _end; + bool _singular; +public: + range(optional start, optional end) : _start(std::move(start)) , _end(std::move(end)) - , _start_inclusive(start_inclusive) - , _end_inclusive(end_inclusive) + , _singular(false) + { } + range(T value) + : _start(bound(std::move(value), true)) + , _end() + , _singular(true) { } public: + static range make(bound start, bound end) { + return range({std::move(start)}, {std::move(end)}); + } static range make_open_ended_both_sides() { - return {{}, {}, true, true}; + return {{}, {}}; } - static range make_singular(bytes key) { - return {std::move(key), {}, true, false}; + static range make_singular(T value) { + return {std::move(value)}; } - static range make_starting_with(bytes key, bool inclusive = true) { - assert(!key.empty()); - return {std::move(key), {}, inclusive, true}; + static range make_starting_with(bound b) { + return {{std::move(b)}, {}}; } - static range make_ending_with(bytes key, bool inclusive = true) { - assert(!key.empty()); - return {{}, std::move(key), true, inclusive}; - } - static range make_both_inclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), true, true}; - } - static range make_inclusive_exclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), true, false}; - } - static range make_exclusive_inclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), false, true}; - } - static range make_both_exclusive(bytes start, bytes end) { - assert(!start.empty()); - assert(!end.empty()); - return {std::move(start), std::move(end), false, false}; + static range make_ending_with(bound b) { + return {{}, {std::move(b)}}; } bool is_singular() const { - return _end.empty() && !_end_inclusive; + return _singular; } bool is_full() const { - return _start.empty() && _end.empty(); + return !_start && !_end; } void reverse() { - if (!is_singular()) { + if (!_singular) { std::swap(_start, _end); - std::swap(_end_inclusive, _start_inclusive); } } - const bytes& start() const { - return _start; + const T& start_value() const { + return _start->value(); } - const bytes& end() const { - return _end; + const T& end_value() const { + return _end->value(); } }; +using partition_range = range; +using clustering_range = range; + class result { public: class partition; @@ -81,9 +85,7 @@ public: // TODO: Optimize for singular partition range. In such case the caller // knows the partition key, no need to send it back. - - // std::pair::first is a serialized partition key. - std::vector> partitions; + std::vector> partitions; }; class result::row { @@ -101,9 +103,7 @@ public: // TODO: for some queries we could avoid sending keys back, because the client knows // what the key is (single row query for instance). - // - // std::pair::first is a serialized clustering row key. - std::vector> rows; + std::vector> rows; public: // Returns row count in this result. If there is a static row and no clustering rows, that counts as one row. // Otherwise, if there are some clustering rows, the static row doesn't count. @@ -114,11 +114,11 @@ public: class partition_slice { public: - std::vector row_ranges; + std::vector row_ranges; std::vector static_columns; // TODO: consider using bitmap std::vector regular_columns; // TODO: consider using bitmap public: - partition_slice(std::vector row_ranges, std::vector static_columns, + partition_slice(std::vector row_ranges, std::vector static_columns, std::vector regular_columns) : row_ranges(std::move(row_ranges)) , static_columns(std::move(static_columns)) @@ -130,11 +130,11 @@ class read_command { public: sstring keyspace; sstring column_family; - std::vector partition_ranges; // ranges must be non-overlapping + std::vector partition_ranges; // ranges must be non-overlapping partition_slice slice; uint32_t row_limit; public: - read_command(const sstring& keyspace, const sstring& column_family, std::vector partition_ranges, + read_command(const sstring& keyspace, const sstring& column_family, std::vector partition_ranges, partition_slice slice, uint32_t row_limit) : keyspace(keyspace) , column_family(column_family) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3fe8600faf..0590f2c010 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1155,8 +1155,12 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc future>> storage_proxy::query(lw_shared_ptr cmd, db::consistency_level cl) { - if (cmd->partition_ranges.empty()) { + static auto make_empty = [] { return make_ready_future>>(make_foreign(make_lw_shared())); + }; + + if (cmd->partition_ranges.empty()) { + return make_empty(); } if (cmd->partition_ranges.size() != 1) { @@ -1167,7 +1171,7 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev auto& range = cmd->partition_ranges[0]; if (range.is_singular()) { - auto& key = range.start(); + auto& key = range.start_value(); auto dk = dht::global_partitioner().decorate_key(key); auto shard = _db.local().shard_of(dk._token); return _db.invoke_on(shard, [cmd] (database& db) { @@ -2128,7 +2132,7 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev } Set allEndpoints = Gossiper.instance.getLiveTokenOwners(); - + int blockFor = allEndpoints.size(); final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor); @@ -2159,7 +2163,7 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev { return !Gossiper.instance.getUnreachableTokenOwners().isEmpty(); } - + public interface WritePerformer { public void apply(IMutation mutation, @@ -2320,15 +2324,15 @@ storage_proxy::query(lw_shared_ptr cmd, db::consistency_lev public void setTruncateRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setTruncateRpcTimeout(timeoutInMillis); } public void reloadTriggerClasses() { TriggerExecutor.instance.reloadClasses(); } - + public long getReadRepairAttempted() { return ReadRepairMetrics.attempted.count(); } - + public long getReadRepairRepairedBlocking() { return ReadRepairMetrics.repairedBlocking.count(); } - + public long getReadRepairRepairedBackground() { return ReadRepairMetrics.repairedBackground.count(); } diff --git a/tests/perf/perf_mutation.cc b/tests/perf/perf_mutation.cc index 08a83755d4..4231e86376 100644 --- a/tests/perf/perf_mutation.cc +++ b/tests/perf/perf_mutation.cc @@ -13,8 +13,8 @@ int main(int argc, char* argv[]) { std::cout << "Timing mutation of single column within one row...\n"; - partition_key key = to_bytes("key1"); - clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); + auto c_key = clustering_key::one::from_exploded(*s, {int32_type->decompose(2)}); bytes value = int32_type->decompose(3); time_it([&] { diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index f6649f45d3..d3894b1669 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -41,7 +41,7 @@ static future<> require_column_has_value(distributed& ddb, const sstri auto cf = ks->find_column_family(table_name); assert(cf != nullptr); auto schema = cf->_schema; - auto pkey = schema->partition_key_type->serialize_value_deep(pk); + auto pkey = partition_key::one::from_deeply_exploded(*schema, pk); auto dk = dht::global_partitioner().decorate_key(pkey); auto shard = db.shard_of(dk._token); return ddb.invoke_on(shard, [pkey = std::move(pkey), @@ -57,7 +57,7 @@ static future<> require_column_has_value(distributed& ddb, const sstri auto schema = cf->_schema; auto p = cf->find_partition(pkey); assert(p != nullptr); - auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck)); + auto row = p->find_row(clustering_key::one::from_deeply_exploded(*schema, ck)); assert(row != nullptr); auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); assert(col_def != nullptr); diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 41a1188ff9..edf184411d 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -24,8 +24,8 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { column_family cf(s); column_definition& r1_col = *s->get_column_definition("r1"); - partition_key key = to_bytes("key1"); - clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); + auto c_key = clustering_key::one::from_exploded(*s, {int32_type->decompose(2)}); mutation m(key, s); m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3))); @@ -39,32 +39,62 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3))); } +BOOST_AUTO_TEST_CASE(test_multi_level_row_tombstones) { + auto s = make_lw_shared(schema(some_keyspace, some_column_family, + {{"p1", utf8_type}}, + {{"c1", int32_type}, {"c2", int32_type}, {"c3", int32_type}}, + {{"r1", int32_type}}, {}, utf8_type)); + + auto ttl = gc_clock::now() + std::chrono::seconds(1); + + mutation m(partition_key::one::from_exploded(*s, {to_bytes("key1")}), s); + + auto make_prefix = [s] (const std::vector& v) { + return clustering_key::prefix::one::from_deeply_exploded(*s, v); + }; + auto make_key = [s] (const std::vector& v) { + return clustering_key::one::from_deeply_exploded(*s, v); + }; + + m.p.apply_row_tombstone(s, make_prefix({1, 2}), tombstone(9, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 3})), tombstone(9, ttl)); + + m.p.apply_row_tombstone(s, make_prefix({1, 3}), tombstone(8, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(9, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(8, ttl)); + + m.p.apply_row_tombstone(s, make_prefix({1}), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(11, ttl)); + + m.p.apply_row_tombstone(s, make_prefix({1, 4}), tombstone(6, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 2, 0})), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 3, 0})), tombstone(11, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, make_key({1, 4, 0})), tombstone(11, ttl)); +} + BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, - {{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type)); + {{"p1", utf8_type}}, {{"c1", int32_type}, {"c2", int32_type}}, {{"r1", int32_type}}, {}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); - - clustering_key c_key1 = s->clustering_key_type->decompose_value( - {int32_type->decompose(1)} - ); - - clustering_key c_key2 = s->clustering_key_type->decompose_value( - {int32_type->decompose(2)} - ); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); + auto c_key1 = clustering_key::one::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(0)}); + auto c_key1_prefix = clustering_key::prefix::one::from_exploded(*s, {int32_type->decompose(1)}); + auto c_key2 = clustering_key::one::from_exploded(*s, {int32_type->decompose(2), int32_type->decompose(0)}); + auto c_key2_prefix = clustering_key::prefix::one::from_exploded(*s, {int32_type->decompose(2)}); auto ttl = gc_clock::now() + std::chrono::seconds(1); mutation m(key, s); - m.p.apply_row_tombstone(s, c_key1, tombstone(1, ttl)); - m.p.apply_row_tombstone(s, c_key2, tombstone(0, ttl)); + m.p.apply_row_tombstone(s, c_key1_prefix, tombstone(1, ttl)); + m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(0, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key1), tombstone(1, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(0, ttl)); - m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl)); + m.p.apply_row_tombstone(s, c_key2_prefix, tombstone(1, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl)); } @@ -73,7 +103,7 @@ BOOST_AUTO_TEST_CASE(test_map_mutations) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_map_type}}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell(utf8_type->decompose(sstring("101")))}}; mutation m1(key, s); @@ -106,7 +136,7 @@ BOOST_AUTO_TEST_CASE(test_set_mutations) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_set_type}}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell({})}}; mutation m1(key, s); @@ -139,8 +169,7 @@ BOOST_AUTO_TEST_CASE(test_list_mutations) { auto s = make_lw_shared(schema(some_keyspace, some_column_family, {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_list_type}}, utf8_type)); column_family cf(s); - partition_key key = to_bytes("key1"); - auto key_type = timeuuid_type; + auto key = partition_key::one::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); }; collection_type_impl::mutation mmut1{{make_key(), make_atomic_cell(int32_type->decompose(101))}}; diff --git a/thrift/handler.cc b/thrift/handler.cc index a937187a3a..9c64d952a4 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -107,13 +107,20 @@ public: } void get_slice(tcxx::function const& _return)> cob, tcxx::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { - auto keyb = to_bytes(key); + auto& ks = lookup_keyspace(_db.local(), _ks_name); + auto schema = ks.find_schema(column_parent.column_family); + if (!_ks) { + return complete_with_exception(std::move(exn_cob), "column family %s not found", column_parent.column_family); + } + auto pk = key_from_thrift(schema, to_bytes(key)); + auto dk = dht::global_partitioner().decorate_key(pk); + auto shard = _db.local().shard_of(dk._token); + auto do_get = [this, - key = std::move(key), + pk = std::move(pk), column_parent = std::move(column_parent), predicate = std::move(predicate)] (database& db) { std::vector ret; - auto keyb = to_bytes(key); if (!column_parent.super_column.empty()) { throw unimplemented_exception(); } @@ -123,7 +130,7 @@ public: throw unimplemented_exception(); } else if (predicate.__isset.slice_range) { auto&& range = predicate.slice_range; - row* rw = cf.find_row(keyb, bytes()); + row* rw = cf.find_row(pk, clustering_key::one::make_empty(*cf._schema)); if (rw) { auto beg = cf._schema->regular_begin(); if (!range.start.empty()) { @@ -157,8 +164,6 @@ public: throw make_exception("empty SlicePredicate"); } }; - auto dk = dht::global_partitioner().decorate_key(keyb); - auto shard = _db.local().shard_of(dk._token); _db.invoke_on(shard, [do_get = std::move(do_get)] (database& db) { return do_get(db); }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] @@ -233,20 +238,20 @@ public: if (!_ks) { return complete_with_exception(std::move(exn_cob), "keyspace not set"); } - static bytes null_clustering_key = to_bytes(""); // Would like to use move_iterator below, but Mutation is filled with some const stuff. parallel_for_each(mutation_map.begin(), mutation_map.end(), [this] (std::pair>> key_cf) { - bytes key = to_bytes(key_cf.first); + bytes thrift_key = to_bytes(key_cf.first); std::map>& cf_mutations_map = key_cf.second; return parallel_for_each( boost::make_move_iterator(cf_mutations_map.begin()), boost::make_move_iterator(cf_mutations_map.end()), - [this, key] (std::pair> cf_mutations) { + [this, thrift_key] (std::pair> cf_mutations) { sstring cf_name = cf_mutations.first; const std::vector& mutations = cf_mutations.second; auto& cf = lookup_column_family(*_ks, cf_name); - mutation m_to_apply(key, cf._schema); + mutation m_to_apply(key_from_thrift(cf._schema, thrift_key), cf._schema); + auto empty_clustering_key = clustering_key::one::make_empty(*cf._schema); for (const Mutation& m : mutations) { if (m.__isset.column_or_supercolumn) { auto&& cosc = m.column_or_supercolumn; @@ -268,7 +273,7 @@ public: ttl = cf._schema->default_time_to_live; } auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); - m_to_apply.set_clustered_cell(null_clustering_key, *def, + m_to_apply.set_clustered_cell(empty_clustering_key, *def, atomic_cell::one::make_live(col.timestamp, ttl_option, to_bytes(col.value))); } else if (cosc.__isset.super_column) { // FIXME: implement @@ -291,7 +296,7 @@ public: return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { auto& ks = db.keyspaces.at(_ks_name); auto& cf = ks.column_families.at(cf_name); - cf.apply(std::move(m_to_apply)); + cf.apply(m_to_apply); }); }); }).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) { @@ -513,13 +518,19 @@ private: throw make_exception("column family %s not found", cf_name); } } - keyspace& lookup_keyspace(database& db, const sstring ks_name) { + static keyspace& lookup_keyspace(database& db, const sstring& ks_name) { try { return db.keyspaces.at(ks_name); } catch (std::out_of_range&) { throw make_exception("Keyspace %s not found", ks_name); } } + static partition_key::one key_from_thrift(schema_ptr s, bytes k) { + if (s->partition_key_size() != 1) { + fail(unimplemented::cause::THRIFT); + } + return partition_key::one::from_single_value(*s, std::move(k)); + } }; class handler_factory : public CassandraCobSvIfFactory { diff --git a/tuple.hh b/tuple.hh index 9433297221..5064877433 100644 --- a/tuple.hh +++ b/tuple.hh @@ -32,6 +32,10 @@ public: tuple_type(tuple_type&&) = default; + auto const& types() { + return _types; + } + prefix_type as_prefix() { return prefix_type(_types); } @@ -75,7 +79,7 @@ public: bytes decompose_value(const value_type& values) { return ::serialize_value(*this, values); } - class component_iterator : public std::iterator> { + class iterator : public std::iterator> { private: ssize_t _types_left; bytes_view _v; @@ -111,27 +115,27 @@ public: } public: struct end_iterator_tag {}; - component_iterator(const tuple_type& t, const bytes_view& v) : _types_left(t._types.size()), _v(v) { + iterator(const tuple_type& t, const bytes_view& v) : _types_left(t._types.size()), _v(v) { read_current(); } - component_iterator(end_iterator_tag, const bytes_view& v) : _v(nullptr, 0) {} - component_iterator& operator++() { + iterator(end_iterator_tag, const bytes_view& v) : _v(nullptr, 0) {} + iterator& operator++() { --_types_left; read_current(); return *this; } const value_type& operator*() const { return _current; } - bool operator!=(const component_iterator& i) const { return _v.begin() != i._v.begin(); } - bool operator==(const component_iterator& i) const { return _v.begin() == i._v.begin(); } + bool operator!=(const iterator& i) const { return _v.begin() != i._v.begin(); } + bool operator==(const iterator& i) const { return _v.begin() == i._v.begin(); } }; - component_iterator begin(const bytes_view& v) const { - return component_iterator(*this, v); + iterator begin(const bytes_view& v) const { + return iterator(*this, v); } - component_iterator end(const bytes_view& v) const { - return component_iterator(typename component_iterator::end_iterator_tag(), v); + iterator end(const bytes_view& v) const { + return iterator(typename iterator::end_iterator_tag(), v); } auto iter_items(const bytes_view& v) { - return boost::iterator_range(begin(v), end(v)); + return boost::iterator_range(begin(v), end(v)); } value_type deserialize_value(bytes_view v) { std::vector result; diff --git a/types.hh b/types.hh index 993741cdc0..d28f0040bd 100644 --- a/types.hh +++ b/types.hh @@ -27,6 +27,41 @@ class column_specification; } +// Like std::lexicographical_compare but injects values from shared sequence (types) to the comparator +// Compare is an abstract_type-aware less comparator, which takes the type as first argument. +template +bool lexicographical_compare(TypesIterator types, InputIt1 first1, InputIt1 last1, + InputIt2 first2, InputIt2 last2, Compare comp) { + while (first1 != last1 && first2 != last2) { + if (comp(*types, *first1, *first2)) { + return true; + } + if (comp(*types, *first2, *first1)) { + return false; + } + ++first1; + ++first2; + ++types; + } + return (first1 == last1) && (first2 != last2); +} + +// Returns true iff the second sequence is a prefix of the first sequence +// Equality is an abstract_type-aware equality checker which takes the type as first argument. +template +bool is_prefixed_by(TypesIterator types, InputIt1 first1, InputIt1 last1, + InputIt2 first2, InputIt2 last2, Equality equality) { + while (first1 != last1 && first2 != last2) { + if (!equality(*types, *first1, *first2)) { + return false; + } + ++first1; + ++first2; + ++types; + } + return first2 == last2; +} + using object_opt = std::experimental::optional; class marshal_exception : public std::exception { @@ -81,10 +116,10 @@ public: } } virtual object_opt deserialize(bytes_view v) = 0; - virtual void validate(const bytes& v) { + virtual void validate(bytes_view v) { // FIXME } - virtual void validate_collection_member(const bytes& v, const bytes& collection_name) { + virtual void validate_collection_member(bytes_view v, const bytes& collection_name) { validate(v); } virtual bool is_compatible_with(abstract_type& previous) { @@ -164,6 +199,29 @@ public: }; using data_type = shared_ptr; +using bytes_view_opt = std::experimental::optional; + +static inline +bool optional_less_compare(data_type t, bytes_view_opt e1, bytes_view_opt e2) { + if (bool(e1) != bool(e2)) { + return bool(e2); + } + if (!e1) { + return false; + } + return t->less(*e1, *e2); +} + +static inline +bool optional_equal(data_type t, bytes_view_opt e1, bytes_view_opt e2) { + if (bool(e1) != bool(e2)) { + return false; + } + if (!e1) { + return true; + } + return t->equal(*e1, *e2); +} class collection_type_impl : public abstract_type { static thread_local logging::logger _logger; diff --git a/unimplemented.cc b/unimplemented.cc index 0b9aa3c3c0..b3b9a86306 100644 --- a/unimplemented.cc +++ b/unimplemented.cc @@ -29,6 +29,7 @@ std::ostream& operator<<(std::ostream& out, cause c) { case cause::LEGACY_COMPOSITE_KEYS: return out << "LEGACY_COMPOSITE_KEYS"; case cause::COLLECTION_RANGE_TOMBSTONES: return out << "COLLECTION_RANGE_TOMBSTONES"; case cause::RANGE_QUERIES: return out << "RANGE_QUERIES"; + case cause::THRIFT: return out << "THRIFT"; } assert(0); } diff --git a/unimplemented.hh b/unimplemented.hh index fb4a182c16..42b6f1194c 100644 --- a/unimplemented.hh +++ b/unimplemented.hh @@ -27,7 +27,8 @@ enum class cause { TOKEN_RESTRICTION, LEGACY_COMPOSITE_KEYS, COLLECTION_RANGE_TOMBSTONES, - RANGE_QUERIES + RANGE_QUERIES, + THRIFT }; void fail(cause what) __attribute__((noreturn)); diff --git a/utils/murmur_hash.cc b/utils/murmur_hash.cc index a5e6357494..ec013aa2b5 100644 --- a/utils/murmur_hash.cc +++ b/utils/murmur_hash.cc @@ -26,8 +26,9 @@ namespace utils { namespace murmur_hash { -uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t seed) +uint32_t hash32(bytes_view data, uint32_t seed) { + uint32_t length = data.size(); uint32_t m = 0x5bd1e995; uint32_t r = 24; @@ -38,13 +39,13 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se for (uint32_t i = 0; i < len_4; i++) { uint32_t i_4 = i << 2; - uint32_t k = data[offset + i_4 + 3]; + uint32_t k = data[i_4 + 3]; k = k << 8; - k = k | (data[offset + i_4 + 2] & 0xff); + k = k | (data[i_4 + 2] & 0xff); k = k << 8; - k = k | (data[offset + i_4 + 1] & 0xff); + k = k | (data[i_4 + 1] & 0xff); k = k << 8; - k = k | (data[offset + i_4 + 0] & 0xff); + k = k | (data[i_4 + 0] & 0xff); k *= m; k ^= (uint32_t)k >> r; k *= m; @@ -60,15 +61,15 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se { if (left >= 3) { - h ^= (uint32_t) data[offset + length - 3] << 16; + h ^= (uint32_t) data[length - 3] << 16; } if (left >= 2) { - h ^= (uint32_t) data[offset + length - 2] << 8; + h ^= (uint32_t) data[length - 2] << 8; } if (left >= 1) { - h ^= (uint32_t) data[offset + length - 1]; + h ^= (uint32_t) data[length - 1]; } h *= m; @@ -81,8 +82,9 @@ uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, uint32_t se return h; } -uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t seed) +uint64_t hash2_64(bytes_view key, uint64_t seed) { + uint32_t length = key.size(); uint64_t m64 = 0xc6a4a7935bd1e995L; uint32_t r64 = 47; @@ -94,10 +96,10 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s { uint32_t i_8 = i << 3; - uint64_t k64 = ((uint64_t) key[offset+i_8+0] & 0xff) + (((uint64_t) key[offset+i_8+1] & 0xff)<<8) + - (((uint64_t) key[offset+i_8+2] & 0xff)<<16) + (((uint64_t) key[offset+i_8+3] & 0xff)<<24) + - (((uint64_t) key[offset+i_8+4] & 0xff)<<32) + (((uint64_t) key[offset+i_8+5] & 0xff)<<40) + - (((uint64_t) key[offset+i_8+6] & 0xff)<<48) + (((uint64_t) key[offset+i_8+7] & 0xff)<<56); + uint64_t k64 = ((uint64_t) key[i_8+0] & 0xff) + (((uint64_t) key[i_8+1] & 0xff)<<8) + + (((uint64_t) key[i_8+2] & 0xff)<<16) + (((uint64_t) key[i_8+3] & 0xff)<<24) + + (((uint64_t) key[i_8+4] & 0xff)<<32) + (((uint64_t) key[i_8+5] & 0xff)<<40) + + (((uint64_t) key[i_8+6] & 0xff)<<48) + (((uint64_t) key[i_8+7] & 0xff)<<56); k64 *= m64; k64 ^= k64 >> r64; @@ -114,19 +116,19 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s case 0: break; case 7: - h64 ^= (uint64_t) key[offset + length - rem + 6] << 48; + h64 ^= (uint64_t) key[length - rem + 6] << 48; case 6: - h64 ^= (uint64_t) key[offset + length - rem + 5] << 40; + h64 ^= (uint64_t) key[length - rem + 5] << 40; case 5: - h64 ^= (uint64_t) key[offset + length - rem + 4] << 32; + h64 ^= (uint64_t) key[length - rem + 4] << 32; case 4: - h64 ^= (uint64_t) key[offset + length - rem + 3] << 24; + h64 ^= (uint64_t) key[length - rem + 3] << 24; case 3: - h64 ^= (uint64_t) key[offset + length - rem + 2] << 16; + h64 ^= (uint64_t) key[length - rem + 2] << 16; case 2: - h64 ^= (uint64_t) key[offset + length - rem + 1] << 8; + h64 ^= (uint64_t) key[length - rem + 1] << 8; case 1: - h64 ^= (uint64_t) key[offset + length - rem]; + h64 ^= (uint64_t) key[length - rem]; h64 *= m64; } @@ -137,14 +139,13 @@ uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, uint64_t s return h64; } -static uint64_t getblock(const bytes &key, uint32_t offset, uint32_t index) +static uint64_t getblock(bytes_view key, uint32_t index) { uint32_t i_8 = index << 3; - uint32_t blockOffset = offset + i_8; - return ((uint64_t) key[blockOffset + 0] & 0xff) + (((uint64_t) key[blockOffset + 1] & 0xff) << 8) + - (((uint64_t) key[blockOffset + 2] & 0xff) << 16) + (((uint64_t) key[blockOffset + 3] & 0xff) << 24) + - (((uint64_t) key[blockOffset + 4] & 0xff) << 32) + (((uint64_t) key[blockOffset + 5] & 0xff) << 40) + - (((uint64_t) key[blockOffset + 6] & 0xff) << 48) + (((uint64_t) key[blockOffset + 7] & 0xff) << 56); + return ((uint64_t) key[i_8 + 0] & 0xff) + (((uint64_t) key[i_8 + 1] & 0xff) << 8) + + (((uint64_t) key[i_8 + 2] & 0xff) << 16) + (((uint64_t) key[i_8 + 3] & 0xff) << 24) + + (((uint64_t) key[i_8 + 4] & 0xff) << 32) + (((uint64_t) key[i_8 + 5] & 0xff) << 40) + + (((uint64_t) key[i_8 + 6] & 0xff) << 48) + (((uint64_t) key[i_8 + 7] & 0xff) << 56); } static uint64_t rotl64(uint64_t v, uint32_t n) @@ -163,8 +164,9 @@ static uint64_t fmix(uint64_t k) return k; } -void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t seed, std::array &result) +void hash3_x64_128(bytes_view key, uint64_t seed, std::array &result) { + uint32_t length = key.size(); const uint32_t nblocks = length >> 4; // Process as 128-bit blocks. uint64_t h1 = seed; @@ -178,8 +180,8 @@ void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t for(uint32_t i = 0; i < nblocks; i++) { - uint64_t k1 = getblock(key, offset, i*2+0); - uint64_t k2 = getblock(key, offset, i*2+1); + uint64_t k1 = getblock(key, i*2+0); + uint64_t k2 = getblock(key, i*2+1); k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1; @@ -194,29 +196,29 @@ void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, uint64_t // tail // Advance offset to the unprocessed tail of the data. - offset += nblocks * 16; + key.remove_prefix(nblocks * 16); uint64_t k1 = 0; uint64_t k2 = 0; switch(length & 15) { - case 15: k2 ^= ((uint64_t) key[offset+14]) << 48; - case 14: k2 ^= ((uint64_t) key[offset+13]) << 40; - case 13: k2 ^= ((uint64_t) key[offset+12]) << 32; - case 12: k2 ^= ((uint64_t) key[offset+11]) << 24; - case 11: k2 ^= ((uint64_t) key[offset+10]) << 16; - case 10: k2 ^= ((uint64_t) key[offset+9]) << 8; - case 9: k2 ^= ((uint64_t) key[offset+8]) << 0; + case 15: k2 ^= ((uint64_t) key[14]) << 48; + case 14: k2 ^= ((uint64_t) key[13]) << 40; + case 13: k2 ^= ((uint64_t) key[12]) << 32; + case 12: k2 ^= ((uint64_t) key[11]) << 24; + case 11: k2 ^= ((uint64_t) key[10]) << 16; + case 10: k2 ^= ((uint64_t) key[9]) << 8; + case 9: k2 ^= ((uint64_t) key[8]) << 0; k2 *= c2; k2 = rotl64(k2,33); k2 *= c1; h2 ^= k2; - case 8: k1 ^= ((uint64_t) key[offset+7]) << 56; - case 7: k1 ^= ((uint64_t) key[offset+6]) << 48; - case 6: k1 ^= ((uint64_t) key[offset+5]) << 40; - case 5: k1 ^= ((uint64_t) key[offset+4]) << 32; - case 4: k1 ^= ((uint64_t) key[offset+3]) << 24; - case 3: k1 ^= ((uint64_t) key[offset+2]) << 16; - case 2: k1 ^= ((uint64_t) key[offset+1]) << 8; - case 1: k1 ^= ((uint64_t) key[offset]); + case 8: k1 ^= ((uint64_t) key[7]) << 56; + case 7: k1 ^= ((uint64_t) key[6]) << 48; + case 6: k1 ^= ((uint64_t) key[5]) << 40; + case 5: k1 ^= ((uint64_t) key[4]) << 32; + case 4: k1 ^= ((uint64_t) key[3]) << 24; + case 3: k1 ^= ((uint64_t) key[2]) << 16; + case 2: k1 ^= ((uint64_t) key[1]) << 8; + case 1: k1 ^= ((uint64_t) key[0]); k1 *= c1; k1 = rotl64(k1,31); k1 *= c2; h1 ^= k1; }; diff --git a/utils/murmur_hash.hh b/utils/murmur_hash.hh index fa20c5f601..28b73fd96d 100644 --- a/utils/murmur_hash.hh +++ b/utils/murmur_hash.hh @@ -39,12 +39,9 @@ namespace utils { namespace murmur_hash { - uint32_t hash32(const bytes &data, uint32_t offset, uint32_t length, - int32_t seed); - uint64_t hash2_64(const bytes &key, uint32_t offset, uint32_t length, - uint64_t seed); - void hash3_x64_128(const bytes &key, uint32_t offset, uint32_t length, - uint64_t seed, std::array &result); + uint32_t hash32(bytes_view data, int32_t seed); + uint64_t hash2_64(bytes_view key, uint64_t seed); + void hash3_x64_128(bytes_view key, uint64_t seed, std::array &result); }; } // namespace utils diff --git a/validation.cc b/validation.cc index 7963b3c2e6..34d20e0420 100644 --- a/validation.cc +++ b/validation.cc @@ -31,18 +31,19 @@ namespace validation { * Based on org.apache.cassandra.thrift.ThriftValidation#validate_key() */ void -validate_cql_key(schema_ptr schema, const partition_key& key) { - if (key.empty()) { +validate_cql_key(schema_ptr schema, const partition_key::one& key) { + bytes_view b(key); + if (b.empty()) { throw exceptions::invalid_request_exception("Key may not be empty"); } // check that key can be handled by FBUtilities.writeShortByteArray - if (key.size() > max_key_size) { - throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", key.size(), max_key_size)); + if (b.size() > max_key_size) { + throw exceptions::invalid_request_exception(sprint("Key length of %d is longer than maximum of %d", b.size(), max_key_size)); } try { - schema->partition_key_type->validate(key); + schema->partition_key_type->validate(b); } catch (const marshal_exception& e) { throw exceptions::invalid_request_exception(e.why()); } diff --git a/validation.hh b/validation.hh index f19e7723f0..5ff3fe2291 100644 --- a/validation.hh +++ b/validation.hh @@ -31,7 +31,7 @@ namespace validation { constexpr size_t max_key_size = std::numeric_limits::max(); -void validate_cql_key(schema_ptr schema, const partition_key& key); +void validate_cql_key(schema_ptr schema, const partition_key::one& key); schema_ptr validate_column_family(database& db, const sstring& keyspace_name, const sstring& cf_name); }