diff --git a/configure.py b/configure.py index e20f365219..3b1eba9300 100755 --- a/configure.py +++ b/configure.py @@ -99,6 +99,12 @@ modes = { }, } +urchin_tests = [ + 'tests/urchin/mutation_test', + 'tests/urchin/types_test', + 'tests/perf/perf_mutation', +] + tests = [ 'tests/test-reactor', 'tests/fileiotest', @@ -121,7 +127,7 @@ tests = [ 'tests/output_stream_test', 'tests/udp_zero_copy', 'tests/test-serialization' - ] + ] + urchin_tests apps = [ 'apps/httpd/httpd', @@ -221,9 +227,7 @@ memcache = [ cassandra_interface = Thrift(source = 'interface/cassandra.thrift', service = 'Cassandra') -deps = { - 'seastar': (['main.cc', - 'database.cc', +urchin_core = (['database.cc', 'log.cc', 'cql/binary.rl', 'cql/server.cc', @@ -244,17 +248,20 @@ deps = { 'service/storage_proxy.cc', 'cql3/operator.cc', 'cql3/relation.cc', - 'db/db.cc', - 'io/io.cc', + 'cql3/column_identifier.cc', + 'db/db.cc', + 'io/io.cc', 'utils/utils.cc', 'utils/UUID_gen.cc', - 'gms/version_generator.cc', + 'gms/version_generator.cc', 'dht/dht.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] - + core - ), + + core) + +deps = { + 'seastar': ['main.cc'] + urchin_core, 'tests/test-reactor': ['tests/test-reactor.cc'] + core, 'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core, 'apps/memcached/memcached': ['apps/memcached/memcached.cc'] + memcache, @@ -281,6 +288,9 @@ deps = { 'tests/test-serialization': ['tests/test-serialization.cc'], } +for t in urchin_tests: + deps[t] = urchin_core + [t + '.cc'] + warnings = [ '-Wno-mismatched-tags', # clang-only ] diff --git a/cql3/cf_name.hh b/cql3/cf_name.hh index 0cb8227406..6b15e31074 100644 --- a/cql3/cf_name.hh +++ b/cql3/cf_name.hh @@ -52,11 +52,11 @@ public: return bool(_ks_name); } - sstring get_keyspace() const { + const sstring& get_keyspace() const { return *_ks_name; } - sstring get_column_family() const { + const sstring& get_column_family() const { return _cf_name; } diff --git a/cql3/column_identifier.cc b/cql3/column_identifier.cc new file mode 100644 index 0000000000..ba7c3c02fc --- /dev/null +++ b/cql3/column_identifier.cc @@ -0,0 +1,31 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include "cql3/column_identifier.hh" + +namespace cql3 { + +::shared_ptr +column_identifier::raw::prepare_column_identifier(schema_ptr schema) { + if (schema->regular_column_name_type == utf8_type) { + return ::make_shared(_text, true); + } + + // We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator + // to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to + // make a special check for those and treat them normally. See CASSANDRA-8178. + auto text_bytes = to_bytes(_text); + auto def = schema->get_column_definition(text_bytes); + if (def) { + return ::make_shared(std::move(text_bytes), _text); + } + + return ::make_shared(schema->regular_column_name_type->from_string(_raw_text), _text); +} + +std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id) { + return out << id._text; +} + +} diff --git a/cql3/column_identifier.hh b/cql3/column_identifier.hh index 28b643d466..6b976afb43 100644 --- a/cql3/column_identifier.hh +++ b/cql3/column_identifier.hh @@ -31,6 +31,7 @@ #include #include +#include namespace cql3 { @@ -137,7 +138,7 @@ public: * once the comparator is known with prepare(). This should only be used with identifiers that are actual * column names. See CASSANDRA-8178 for more background. */ - class raw : public selectable::raw { + class raw final : public selectable::raw { private: const sstring _raw_text; sstring _text; @@ -155,25 +156,7 @@ public: return prepare_column_identifier(s); } - ::shared_ptr prepare_column_identifier(schema_ptr s) { -#if 0 - AbstractType comparator = cfm.comparator.asAbstractType(); - if (cfm.getIsDense() || comparator instanceof CompositeType || comparator instanceof UTF8Type) - return new ColumnIdentifier(text, true); - - // We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator - // to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to - // make a special check for those and treat them normally. See CASSANDRA-8178. - ByteBuffer bufferName = ByteBufferUtil.bytes(text); - for (ColumnDefinition def : cfm.partitionKeyColumns()) - { - if (def.name.bytes.equals(bufferName)) - return new ColumnIdentifier(text, true); - } - return new ColumnIdentifier(comparator.fromString(rawText), text); -#endif - throw std::runtime_error("not implemented"); - } + ::shared_ptr prepare_column_identifier(schema_ptr s); virtual bool processes_selection() const override { return false; @@ -195,6 +178,7 @@ public: } friend std::hash; + friend std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id); }; }; diff --git a/cql3/constants.hh b/cql3/constants.hh index ac1913419c..d73dac31d5 100644 --- a/cql3/constants.hh +++ b/cql3/constants.hh @@ -336,9 +336,14 @@ public: public: using operation::operation; - virtual void execute(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override { + virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override { bytes_opt value = _t->bind_and_get(params._options); - m.set_cell(prefix, column.id, value ? params.make_cell(*value) : params.make_dead_cell()); + auto cell = value ? params.make_cell(*value) : params.make_dead_cell(); + if (column.is_static()) { + m.set_static_cell(column, std::move(cell)); + } else { + m.set_clustered_cell(prefix, column, std::move(cell)); + } } }; diff --git a/cql3/cql_statement.hh b/cql3/cql_statement.hh index 8ddde8477e..1c6de1a138 100644 --- a/cql3/cql_statement.hh +++ b/cql3/cql_statement.hh @@ -28,6 +28,7 @@ #include "transport/messages/result_message.hh" #include "service/client_state.hh" #include "service/query_state.hh" +#include "service/storage_proxy.hh" #include "cql3/query_options.hh" #include "database.hh" @@ -62,7 +63,7 @@ public: * @param options options for this query (consistency, variables, pageSize, ...) */ virtual future> - execute(service::query_state& state, const query_options& options) = 0; + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) = 0; /** * Variant of execute used for internal query against the system tables, and thus only query the local node = 0. @@ -70,7 +71,7 @@ public: * @param state the current query state */ virtual future> - execute_internal(service::query_state& state, const query_options& options) = 0; + execute_internal(database& db, service::query_state& state, const query_options& options) = 0; virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const = 0; }; diff --git a/cql3/operation.hh b/cql3/operation.hh index d9afc44466..16d5f36cd6 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -27,7 +27,7 @@ #include "core/shared_ptr.hh" -#include "db/api.hh" +#include "database.hh" #include @@ -104,7 +104,7 @@ public: /** * Execute the operation. */ - virtual void execute(api::mutation& m, const api::clustering_prefix& row_key, const update_parameters& params) = 0; + virtual void execute(mutation& m, const clustering_prefix& row_key, const update_parameters& params) = 0; /** * A parsed raw UPDATE operation. diff --git a/cql3/query_options.hh b/cql3/query_options.hh index 7bcb604ef3..01f2106628 100644 --- a/cql3/query_options.hh +++ b/cql3/query_options.hh @@ -25,7 +25,7 @@ #ifndef CQL3_CQL_QUERY_OPTIONS_HH #define CQL3_CQL_QUERY_OPTIONS_HH -#include "db/api.hh" +#include "database.hh" #include "db/consistency_level.hh" #include "service/query_state.hh" diff --git a/cql3/statements/alter_keyspace_statement.hh b/cql3/statements/alter_keyspace_statement.hh index 1a28ddb71c..182618f0ee 100644 --- a/cql3/statements/alter_keyspace_statement.hh +++ b/cql3/statements/alter_keyspace_statement.hh @@ -60,7 +60,7 @@ public: , _attrs{std::move(attrs)} { } - virtual sstring keyspace() const override { + virtual const sstring& keyspace() const override { return _name; } diff --git a/cql3/statements/cf_statement.hh b/cql3/statements/cf_statement.hh index 93836ef6f7..20321fc891 100644 --- a/cql3/statements/cf_statement.hh +++ b/cql3/statements/cf_statement.hh @@ -62,12 +62,12 @@ public: } } - virtual sstring keyspace() const { + virtual const sstring& keyspace() const { assert(_cf_name->has_keyspace()); // "The statement hasn't be prepared correctly"; return _cf_name->get_keyspace(); } - virtual sstring column_family() const { + virtual const sstring& column_family() const { return _cf_name->get_column_family(); } }; diff --git a/cql3/statements/delete_statement.cc b/cql3/statements/delete_statement.cc index 3a84877d2d..e5fa52024c 100644 --- a/cql3/statements/delete_statement.cc +++ b/cql3/statements/delete_statement.cc @@ -28,9 +28,9 @@ namespace cql3 { namespace statements { -void delete_statement::add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) { +void delete_statement::add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) { if (_column_operations.empty()) { - m.p.apply_delete(prefix, params.make_tombstone()); + m.p.apply_delete(s, prefix, params.make_tombstone()); return; } diff --git a/cql3/statements/delete_statement.hh b/cql3/statements/delete_statement.hh index d2f9a289cf..975bd0e75c 100644 --- a/cql3/statements/delete_statement.hh +++ b/cql3/statements/delete_statement.hh @@ -27,7 +27,7 @@ #include "cql3/statements/modification_statement.hh" #include "cql3/attributes.hh" #include "cql3/operation.hh" -#include "db/api.hh" +#include "database.hh" namespace cql3 { @@ -62,7 +62,7 @@ public: return false; } - virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override; + virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override; #if 0 protected void validateWhereClauseForConditions() throws InvalidRequestException diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 25a7607fd9..da880e7ae7 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -50,13 +50,13 @@ operator<<(std::ostream& out, modification_statement::statement_type t) { return out; } -future> +future> modification_statement::get_mutations(const query_options& options, bool local, int64_t now) { auto keys = make_lw_shared(build_partition_keys(options)); auto prefix = make_lw_shared(create_clustering_prefix(options)); return make_update_parameters(keys, prefix, options, local, now).then( [this, keys = std::move(keys), prefix = std::move(prefix), now] (auto params_ptr) { - std::vector mutations; + std::vector mutations; mutations.reserve(keys->size()); for (auto key : *keys) { validation::validate_cql_key(s, key); @@ -70,8 +70,8 @@ modification_statement::get_mutations(const query_options& options, bool local, future> modification_statement::make_update_parameters( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, const query_options& options, bool local, int64_t now) { @@ -87,8 +87,8 @@ modification_statement::make_update_parameters( future modification_statement::read_required_rows( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, bool local, db::consistency_level cl) { if (!requires_read()) { @@ -148,7 +148,7 @@ modification_statement::get_first_empty_key() { return {}; } -api::clustering_prefix +clustering_prefix modification_statement::create_clustering_prefix_internal(const query_options& options) { std::vector components; const column_definition* first_empty_key = nullptr; @@ -184,7 +184,7 @@ modification_statement::create_clustering_prefix_internal(const query_options& o return components; } -api::clustering_prefix +clustering_prefix modification_statement::create_clustering_prefix(const query_options& options) { // If the only updated/deleted columns are static, then we don't need clustering columns. // And in fact, unless it is an INSERT, we reject if clustering columns are provided as that @@ -222,9 +222,9 @@ modification_statement::create_clustering_prefix(const query_options& options) { return create_clustering_prefix_internal(options); } -std::vector +std::vector modification_statement::build_partition_keys(const query_options& options) { - std::vector result; + std::vector result; std::vector components; auto remaining = s->partition_key.size(); @@ -244,7 +244,7 @@ modification_statement::build_partition_keys(const query_options& options) { throw exceptions::invalid_request_exception(sprint("Invalid null value for partition key part %s", def.name_as_text())); } components.push_back(val); - api::partition_key key = serialize_value(*s->partition_key_type, components); + partition_key key = serialize_value(*s->partition_key_type, components); validation::validate_cql_key(s, key); result.push_back(key); } else { @@ -256,7 +256,7 @@ modification_statement::build_partition_keys(const query_options& options) { full_components.reserve(components.size() + 1); auto i = std::copy(components.begin(), components.end(), std::back_inserter(full_components)); *i = val; - api::partition_key key = serialize_value(*s->partition_key_type, full_components); + partition_key key = serialize_value(*s->partition_key_type, full_components); validation::validate_cql_key(s, key); result.push_back(key); } @@ -278,23 +278,23 @@ modification_statement::build_partition_keys(const query_options& options) { } future> -modification_statement::execute(service::query_state& qs, const query_options& options) { +modification_statement::execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) { if (has_conditions() && options.get_protocol_version() == 1) { throw new exceptions::invalid_request_exception("Conditional updates are not supported by the protocol version in use. You need to upgrade to a driver using the native protocol v2."); } if (has_conditions()) { - return execute_with_condition(qs, options); + return execute_with_condition(proxy, qs, options); } - return execute_without_condition(qs, options).then([] { + return execute_without_condition(proxy, qs, options).then([] { return make_ready_future>( std::experimental::optional{}); }); } future<> -modification_statement::execute_without_condition(service::query_state& qs, const query_options& options) { +modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) { auto cl = options.get_consistency(); if (is_counter()) { db::validate_counter_for_write(s, cl); @@ -302,16 +302,16 @@ modification_statement::execute_without_condition(service::query_state& qs, cons db::validate_for_write(s->ks_name, cl); } - return get_mutations(options, false, options.get_timestamp(qs)).then([cl] (auto mutations) { + return get_mutations(options, false, options.get_timestamp(qs)).then([cl, &proxy] (auto mutations) { if (mutations.empty()) { return now(); } - return service::storage_proxy::mutate_with_triggers(std::move(mutations), cl, false); + return proxy.mutate_with_triggers(std::move(mutations), cl, false); }); } future> -modification_statement::execute_with_condition(service::query_state& qs, const query_options& options) { +modification_statement::execute_with_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) { unimplemented::lwt(); #if 0 List keys = buildPartitionKeyNames(options); @@ -356,7 +356,7 @@ modification_statement::add_key_value(column_definition& def, ::shared_ptr } void -modification_statement::precess_where_clause(std::vector where_clause, ::shared_ptr names) { +modification_statement::process_where_clause(std::vector where_clause, ::shared_ptr names) { for (auto&& relation : where_clause) { if (relation->is_multi_column()) { throw exceptions::invalid_request_exception(sprint("Multi-column relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", relation->to_string())); @@ -387,6 +387,65 @@ modification_statement::precess_where_clause(std::vector where_cla } } +std::unique_ptr +modification_statement::parsed::prepare(database& db) { + auto bound_names = get_bound_variables(); + auto statement = prepare(db, bound_names); + return std::make_unique(std::move(statement), *bound_names); +} + +::shared_ptr +modification_statement::parsed::prepare(database& db, ::shared_ptr bound_names) { + schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family()); + + auto prepared_attributes = _attrs->prepare(keyspace(), column_family()); + prepared_attributes->collect_marker_specification(bound_names); + + ::shared_ptr stmt = prepare_internal(schema, bound_names, std::move(prepared_attributes)); + + if (_if_not_exists || _if_exists || !_conditions.empty()) { + if (stmt->is_counter()) { + throw exceptions::invalid_request_exception("Conditional updates are not supported on counter tables"); + } + if (_attrs->timestamp) { + throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates"); + } + + if (_if_not_exists) { + // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense. + // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes. + assert(_conditions.empty()); + assert(!_if_exists); + stmt->set_if_not_exist_condition(); + } else if (_if_exists) { + assert(_conditions.empty()); + assert(!_if_not_exists); + stmt->set_if_exist_condition(); + } else { + for (auto&& entry : _conditions) { + auto id = entry.first->prepare_column_identifier(schema); + column_definition* def = get_column_definition(schema, *id); + if (!def) { + throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *id)); + } + + auto condition = entry.second->prepare(keyspace(), *def); + condition->collect_marker_specificaton(bound_names); + + switch (def->kind) { + case column_definition::PARTITION: + case column_definition::CLUSTERING: + throw exceptions::invalid_request_exception(sprint("PRIMARY KEY column '%s' cannot have IF conditions", *id)); + default: + stmt->add_condition(condition); + } + } + } + stmt->validate_where_clause_for_conditions(); + } + return stmt; +} + } } diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index d24ae96111..dc01ea8be9 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -35,7 +35,6 @@ #include "cql3/operation.hh" #include "cql3/relation.hh" -#include "db/column_family.hh" #include "db/consistency_level.hh" #include "core/shared_ptr.hh" @@ -161,7 +160,7 @@ public: virtual bool require_full_clustering_key() const = 0; - virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) = 0; + virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) = 0; virtual int get_bound_terms() override { return _bound_terms; @@ -267,12 +266,12 @@ private: public: void add_key_value(column_definition& def, ::shared_ptr value); - void precess_where_clause(std::vector where_clause, ::shared_ptr names); - std::vector build_partition_keys(const query_options& options); + void process_where_clause(std::vector where_clause, ::shared_ptr names); + std::vector build_partition_keys(const query_options& options); private: - api::clustering_prefix create_clustering_prefix(const query_options& options); - api::clustering_prefix create_clustering_prefix_internal(const query_options& options); + clustering_prefix create_clustering_prefix(const query_options& options); + clustering_prefix create_clustering_prefix_internal(const query_options& options); protected: const column_definition* get_first_empty_key(); @@ -286,8 +285,8 @@ public: protected: future read_required_rows( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, bool local, db::consistency_level cl); @@ -297,19 +296,19 @@ public: } virtual future> - execute(service::query_state& qs, const query_options& options) override; + execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) override; virtual future> - execute_internal(service::query_state& qs, const query_options& options) override { + execute_internal(database& db, service::query_state& qs, const query_options& options) override { throw std::runtime_error("not implemented"); } private: future<> - execute_without_condition(service::query_state& qs, const query_options& options); + execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options); future> - execute_with_condition(service::query_state& qs, const query_options& options); + execute_with_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options); #if 0 public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException @@ -436,12 +435,12 @@ private: * @return vector of the mutations * @throws invalid_request_exception on invalid requests */ - future> get_mutations(const query_options& options, bool local, int64_t now); + future> get_mutations(const query_options& options, bool local, int64_t now); public: future> make_update_parameters( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, const query_options& options, bool local, int64_t now); @@ -476,62 +475,8 @@ public: { } public: - std::unique_ptr prepare(database& db) override { - auto bound_names = get_bound_variables(); - auto statement = prepare(db, bound_names); - return std::make_unique(std::move(statement), *bound_names); - } - - ::shared_ptr prepare(database& db, ::shared_ptr bound_names) { - schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family()); - - auto prepared_attributes = _attrs->prepare(keyspace(), column_family()); - prepared_attributes->collect_marker_specification(bound_names); - - ::shared_ptr stmt = prepare_internal(schema, bound_names, std::move(prepared_attributes)); - - if (_if_not_exists || _if_exists || !_conditions.empty()) { - if (stmt->is_counter()) { - throw exceptions::invalid_request_exception("Conditional updates are not supported on counter tables"); - } - if (_attrs->timestamp) { - throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates"); - } - - if (_if_not_exists) { - // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense. - // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes. - assert(_conditions.empty()); - assert(!_if_exists); - stmt->set_if_not_exist_condition(); - } else if (_if_exists) { - assert(_conditions.empty()); - assert(!_if_not_exists); - stmt->set_if_exist_condition(); - } else { - for (auto&& entry : _conditions) { - auto id = entry.first->prepare_column_identifier(schema); - column_definition* def = get_column_definition(schema, *id); - if (!def) { - throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *id)); - } - - auto condition = entry.second->prepare(keyspace(), *def); - condition->collect_marker_specificaton(bound_names); - - switch (def->kind) { - case column_definition::PARTITION: - case column_definition::CLUSTERING: - throw exceptions::invalid_request_exception(sprint("PRIMARY KEY column '%s' cannot have IF conditions", *id)); - default: - stmt->add_condition(condition); - } - } - } - stmt->validate_where_clause_for_conditions(); - } - return stmt; - }; + virtual std::unique_ptr prepare(database& db) override; + ::shared_ptr prepare(database& db, ::shared_ptr bound_names);; protected: virtual ::shared_ptr prepare_internal(schema_ptr schema, ::shared_ptr bound_names, std::unique_ptr attrs) = 0; diff --git a/cql3/statements/schema_altering_statement.hh b/cql3/statements/schema_altering_statement.hh index b3a2ea5c3d..6e5bcbdb5a 100644 --- a/cql3/statements/schema_altering_statement.hh +++ b/cql3/statements/schema_altering_statement.hh @@ -82,7 +82,7 @@ protected: #endif virtual future> - execute(service::query_state& state, const query_options& options) override { + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override { throw std::runtime_error("not implemented"); #if 0 // If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing @@ -97,7 +97,7 @@ protected: } virtual future> - execute_internal(service::query_state& state, const query_options& options) override { + execute_internal(database& db, service::query_state& state, const query_options& options) override { throw std::runtime_error("unsupported operation"); #if 0 try diff --git a/cql3/statements/truncate_statement.hh b/cql3/statements/truncate_statement.hh index b0130929d9..f376e281b1 100644 --- a/cql3/statements/truncate_statement.hh +++ b/cql3/statements/truncate_statement.hh @@ -63,7 +63,7 @@ public: } virtual future> - execute(service::query_state& state, const query_options& options) override { + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override { throw std::runtime_error("not implemented"); #if 0 try @@ -87,7 +87,7 @@ public: } virtual future> - execute_internal(service::query_state& state, const query_options& options) override { + execute_internal(database& db, service::query_state& state, const query_options& options) override { throw std::runtime_error("unsupported operation"); } }; diff --git a/cql3/statements/update_statement.cc b/cql3/statements/update_statement.cc index bb7899c913..4501ebd348 100644 --- a/cql3/statements/update_statement.cc +++ b/cql3/statements/update_statement.cc @@ -31,7 +31,7 @@ namespace cql3 { namespace statements { -void update_statement::add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) { +void update_statement::add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) { if (s->is_dense()) { throw std::runtime_error("Dense tables not supported yet"); #if 0 @@ -136,6 +136,36 @@ update_statement::parsed_insert::prepare_internal(schema_ptr schema, return stmt; } +::shared_ptr +update_statement::parsed_update::prepare_internal(schema_ptr schema, + ::shared_ptr bound_names, std::unique_ptr attrs) +{ + auto stmt = ::make_shared(statement_type::UPDATE, bound_names->size(), schema, std::move(attrs)); + + for (auto&& entry : _updates) { + auto id = entry.first->prepare_column_identifier(schema); + auto def = get_column_definition(schema, *id); + if (!def) { + throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *entry.first)); + } + + auto operation = entry.second->prepare(keyspace(), *def); + operation->collect_marker_specification(bound_names); + + switch (def->kind) { + case column_definition::column_kind::PARTITION: + case column_definition::column_kind::CLUSTERING: + throw exceptions::invalid_request_exception(sprint("PRIMARY KEY part %s found in SET part", *entry.first)); + default: + stmt->add_operation(std::move(operation)); + break; + } + } + + stmt->process_where_clause(_where_clause, bound_names); + return stmt; +} + } } diff --git a/cql3/statements/update_statement.hh b/cql3/statements/update_statement.hh index 8dee40cffe..d2cad8c09d 100644 --- a/cql3/statements/update_statement.hh +++ b/cql3/statements/update_statement.hh @@ -29,7 +29,7 @@ #include "cql3/column_identifier.hh" #include "cql3/term.hh" -#include "db/api.hh" +#include "database.hh" #include #include "unimplemented.hh" @@ -74,7 +74,7 @@ private: return true; } - virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override; + virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override; class parsed_insert : public modification_statement::parsed { private: @@ -89,7 +89,7 @@ private: * @param columnValues list of column values (corresponds to names) * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ - parsed_insert(std::experimental::optional&& name, + parsed_insert(std::experimental::optional name, ::shared_ptr attrs, std::vector<::shared_ptr> column_names, std::vector<::shared_ptr> column_values, @@ -104,13 +104,12 @@ private: }; -#if 0 - public static class ParsedUpdate extends ModificationStatement.Parsed - { + class parsed_update : public modification_statement::parsed { + private: // Provided for an UPDATE - private final List> updates; - private final List whereClause; - + std::vector, ::shared_ptr>> _updates; + std::vector _where_clause; + public: /** * Creates a new UpdateStatement from a column family name, columns map, consistency * level, and key term. @@ -120,46 +119,19 @@ private: * @param updates a map of column operations to perform * @param whereClause the where clause */ - public ParsedUpdate(CFName name, - Attributes.Raw attrs, - List> updates, - List whereClause, - List> conditions) - { - super(name, attrs, conditions, false, false); - this.updates = updates; - this.whereClause = whereClause; - } - - protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException - { - UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.UPDATE, boundNames.size(), cfm, attrs); - - for (Pair entry : updates) - { - ColumnDefinition def = cfm.getColumnDefinition(entry.left.prepare(cfm)); - if (def == null) - throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left)); - - Operation operation = entry.right.prepare(keyspace(), def); - operation.collectMarkerSpecification(boundNames); - - switch (def.kind) - { - case PARTITION_KEY: - case CLUSTERING_COLUMN: - throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left)); - default: - stmt.addOperation(operation); - break; - } - } - - stmt.processWhereClause(whereClause, boundNames); - return stmt; - } - } -#endif + parsed_update(std::experimental::optional name, + ::shared_ptr attrs, + std::vector, ::shared_ptr>> updates, + std::vector where_clause, + std::vector, ::shared_ptr>> conditions) + : modification_statement::parsed(std::move(name), std::move(attrs), std::move(conditions), false, false) + , _updates(std::move(updates)) + , _where_clause(std::move(where_clause)) + { } + protected: + virtual ::shared_ptr prepare_internal(schema_ptr schema, + ::shared_ptr bound_names, std::unique_ptr attrs); + }; }; } diff --git a/cql3/statements/use_statement.hh b/cql3/statements/use_statement.hh index ae44205ccd..c641fabcdb 100644 --- a/cql3/statements/use_statement.hh +++ b/cql3/statements/use_statement.hh @@ -61,7 +61,7 @@ public: } virtual future> - execute(service::query_state& state, const query_options& options) override { + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override { throw std::runtime_error("not implemented"); #if 0 state.getClientState().setKeyspace(keyspace); @@ -70,7 +70,7 @@ public: } virtual future> - execute_internal(service::query_state& state, const query_options& options) override { + execute_internal(database& db, service::query_state& state, const query_options& options) override { // Internal queries are exclusively on the system keyspace and 'use' is thus useless throw std::runtime_error("unsupported operation"); } diff --git a/cql3/update_parameters.hh b/cql3/update_parameters.hh index 876f946f9b..609e27f7cb 100644 --- a/cql3/update_parameters.hh +++ b/cql3/update_parameters.hh @@ -25,7 +25,7 @@ #ifndef CQL3_UPDATE_PARAMETERS_HH #define CQL3_UPDATE_PARAMETERS_HH -#include "db/api.hh" +#include "database.hh" #include "exceptions/exceptions.hh" namespace cql3 { @@ -36,8 +36,7 @@ namespace cql3 { class update_parameters final { public: using prefetched_rows_type = std::experimental::optional< - std::unordered_map>; + std::unordered_map>; private: const gc_clock::duration _ttl; const prefetched_rows_type _prefetched; // For operation that require a read-before-write @@ -64,21 +63,20 @@ public: } } - api::atomic_cell make_dead_cell() const { - return {make_tombstone()}; + atomic_cell make_dead_cell() const { + return atomic_cell{_timestamp, atomic_cell::dead{_local_deletion_time}}; } - api::atomic_cell make_cell(bytes value) const { + atomic_cell make_cell(bytes value) const { auto ttl = _ttl; - if (!ttl.count()) { + if (ttl.count() <= 0) { ttl = _schema->default_time_to_live; } - return api::atomic_cell(api::live_atomic_cell(_timestamp, - ttl.count() ? api::ttl_opt{_local_deletion_time + ttl} : api::ttl_opt{}, - std::move(value))); - } + return atomic_cell{_timestamp, + atomic_cell::live{ttl.count() > 0 ? ttl_opt{_local_deletion_time + ttl} : ttl_opt{}, std::move(value)}}; + }; #if 0 public Cell makeCounter(CellName name, long delta) throws InvalidRequestException @@ -88,7 +86,7 @@ public: } #endif - api::tombstone make_tombstone() const { + tombstone make_tombstone() const { return {_timestamp, _local_deletion_time}; } diff --git a/database.cc b/database.cc index 3a3c465b17..85185487d5 100644 --- a/database.cc +++ b/database.cc @@ -10,10 +10,6 @@ thread_local logging::logger dblog("database"); -partition::partition(column_family& cf) - : rows(key_compare(cf._schema->clustering_key_type)) { -} - template std::vector<::shared_ptr> get_column_types(const Sequence& column_definitions) { @@ -78,7 +74,7 @@ column_family::column_family(schema_ptr schema) , partitions(key_compare(_schema->thrift.partition_key_type)) { } -partition* +mutation_partition* column_family::find_partition(const bytes& key) { auto i = partitions.find(key); return i == partitions.end() ? nullptr : &i->second; @@ -86,33 +82,28 @@ column_family::find_partition(const bytes& key) { row* column_family::find_row(const bytes& partition_key, const bytes& clustering_key) { - partition* p = find_partition(partition_key); + mutation_partition* p = find_partition(partition_key); if (!p) { return nullptr; } - auto i = p->rows.find(clustering_key); - return i == p->rows.end() ? nullptr : &i->second; + return p->find_row(clustering_key); } -partition& +mutation_partition& column_family::find_or_create_partition(const bytes& key) { // call lower_bound so we have a hint for the insert, just in case. auto i = partitions.lower_bound(key); if (i == partitions.end() || key != i->first) { - i = partitions.emplace_hint(i, std::make_pair(std::move(key), partition(*this))); + i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema))); } return i->second; } row& column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) { - partition& p = find_or_create_partition(partition_key); + mutation_partition& p = find_or_create_partition(partition_key); // call lower_bound so we have a hint for the insert, just in case. - auto i = p.rows.lower_bound(clustering_key); - if (i == p.rows.end() || clustering_key != i->first) { - i = p.rows.emplace_hint(i, std::make_pair(std::move(clustering_key), row())); - } - return i->second; + return p.clustered_row(clustering_key); } sstring to_hex(const bytes& b) { @@ -237,13 +228,22 @@ column_definition::name() const { return _name; } -schema_ptr -keyspace::find_schema(sstring cf_name) { +column_family* +keyspace::find_column_family(sstring cf_name) { auto i = column_families.find(cf_name); if (i == column_families.end()) { + return nullptr; + } + return &i->second; +} + +schema_ptr +keyspace::find_schema(sstring cf_name) { + auto cf = find_column_family(cf_name); + if (!cf) { return {}; } - return i->second._schema; + return cf->_schema; } keyspace* @@ -254,3 +254,82 @@ database::find_keyspace(sstring name) { } return nullptr; } + +void +column_family::apply(mutation&& m) { + mutation_partition& p = find_or_create_partition(std::move(m.key)); + p.apply(_schema, std::move(m.p)); +} + +// Based on org.apache.cassandra.db.AbstractCell#reconcile() +static inline +int +compare_for_merge(const column_definition& def, const atomic_cell& left, const atomic_cell& right) { + if (left.timestamp != right.timestamp) { + return left.timestamp > right.timestamp ? 1 : -1; + } + if (left.value.which() != right.value.which()) { + return left.is_live() ? -1 : 1; + } + if (left.is_live()) { + return def.type->compare(left.as_live().value, right.as_live().value); + } else { + auto& c1 = left.as_dead(); + auto& c2 = right.as_dead(); + if (c1.ttl != c2.ttl) { + // Origin compares big-endian serialized TTL + return (uint32_t)c1.ttl.time_since_epoch().count() < (uint32_t)c2.ttl.time_since_epoch().count() ? -1 : 1; + } + return 0; + } +} + +static inline +int +compare_for_merge(const column_definition& def, + const std::pair& left, + const std::pair& right) { + if (def.is_atomic()) { + return compare_for_merge(def, boost::any_cast(left.second), + boost::any_cast(right.second)); + } else { + throw std::runtime_error("not implemented"); + } +} + +void mutation_partition::apply(schema_ptr schema, mutation_partition&& p) { + _tombstone.apply(p._tombstone); + + for (auto&& entry : p._row_tombstones) { + apply_row_tombstone(schema, std::move(entry)); + } + + auto merge_cells = [this, schema] (row& old_row, row&& new_row) { + for (auto&& new_column : new_row) { + auto col = new_column.first; + auto i = old_row.find(col); + if (i == old_row.end()) { + _static_row.emplace_hint(i, std::move(new_column)); + } else { + auto& old_column = *i; + auto& def = schema->regular_column_at(col); + if (compare_for_merge(def, old_column, new_column) < 0) { + old_column.second = std::move(new_column.second); + } + } + } + }; + + merge_cells(_static_row, std::move(p._static_row)); + + for (auto&& entry : p._rows) { + auto& key = entry.first; + auto i = _rows.find(key); + if (i == _rows.end()) { + _rows.emplace_hint(i, std::move(entry)); + } else { + i->second.t.apply(entry.second.t); + merge_cells(i->second.cells, std::move(entry.second.cells)); + } + } +} diff --git a/database.hh b/database.hh index b1249889e7..197c52a19d 100644 --- a/database.hh +++ b/database.hh @@ -27,134 +27,253 @@ #include "tuple.hh" #include "core/future.hh" #include "cql3/column_specification.hh" +#include +#include +#include "schema.hh" -struct row; -struct paritition; -struct column_family; +using partition_key_type = tuple_type<>; +using clustering_key_type = tuple_type<>; +using clustering_prefix_type = tuple_prefix; +using partition_key = bytes; +using clustering_key = bytes; +using clustering_prefix = clustering_prefix_type::value_type; -struct row { - std::vector cells; +namespace api { + +using timestamp_type = int64_t; +timestamp_type constexpr missing_timestamp = std::numeric_limits::min(); +timestamp_type constexpr min_timestamp = std::numeric_limits::min() + 1; +timestamp_type constexpr max_timestamp = std::numeric_limits::max(); + +} + +/** +* Represents deletion operation. Can be commuted with other tombstones via apply() method. +* Can be empty. +* +*/ +struct tombstone final { + api::timestamp_type timestamp; + gc_clock::time_point ttl; + + tombstone(api::timestamp_type timestamp, gc_clock::time_point ttl) + : timestamp(timestamp) + , ttl(ttl) + { } + + tombstone() + : tombstone(api::missing_timestamp, {}) + { } + + int compare(const tombstone& t) const { + if (timestamp < t.timestamp) { + return -1; + } else if (timestamp > t.timestamp) { + return 1; + } else if (ttl < t.ttl) { + return -1; + } else if (ttl > t.ttl) { + return 1; + } else { + return 0; + } + } + + bool operator<(const tombstone& t) const { + return compare(t) < 0; + } + + bool operator<=(const tombstone& t) const { + return compare(t) <= 0; + } + + bool operator>(const tombstone& t) const { + return compare(t) > 0; + } + + bool operator>=(const tombstone& t) const { + return compare(t) >= 0; + } + + bool operator==(const tombstone& t) const { + return compare(t) == 0; + } + + bool operator!=(const tombstone& t) const { + return compare(t) != 0; + } + + explicit operator bool() const { + return timestamp != api::missing_timestamp; + } + + void apply(const tombstone& t) { + if (*this < t) { + *this = t; + } + } + + friend std::ostream& operator<<(std::ostream& out, const tombstone& t) { + return out << "{timestamp=" << t.timestamp << ", ttl=" << t.ttl.time_since_epoch().count() << "}"; + } }; -struct partition { - explicit partition(column_family& cf); - row static_columns; - // row key within partition -> row - std::map rows; -}; +using ttl_opt = std::experimental::optional; -using column_id = uint32_t; - -class column_definition final { -private: - bytes _name; -public: - enum column_kind { PARTITION, CLUSTERING, REGULAR, STATIC }; - column_definition(bytes name, data_type type, column_id id, column_kind kind); - data_type type; - column_id id; // unique within (kind, schema instance) - column_kind kind; - ::shared_ptr column_specification; - bool is_static() const { return kind == column_kind::STATIC; } - bool is_partition_key() const { return kind == column_kind::PARTITION; } - const sstring& name_as_text() const; - const bytes& name() const; -}; - -struct thrift_schema { - shared_ptr partition_key_type; -}; - -/* - * Keep this effectively immutable. - */ -class schema final { -private: - std::unordered_map _columns_by_name; - std::map _regular_columns_by_name; -public: - struct column { - bytes name; - data_type type; - struct name_compare { - shared_ptr type; - name_compare(shared_ptr type) : type(type) {} - bool operator()(const column& cd1, const column& cd2) const { - return type->less(cd1.name, cd2.name); - } - }; +struct atomic_cell final { + struct dead { + gc_clock::time_point ttl; }; + struct live { + ttl_opt ttl; + bytes value; + }; + api::timestamp_type timestamp; + boost::variant value; + bool is_live() const { return value.which() == 1; } + // Call only when is_live() == true + const live& as_live() const { return boost::get(value); } + // Call only when is_live() == false + const dead& as_dead() const { return boost::get(value); } +}; + +using row = std::map; + +struct deletable_row final { + tombstone t; + row cells; +}; + +using row_tombstone_set = std::map; + +class mutation_partition final { private: - void build_columns(std::vector columns, column_definition::column_kind kind, std::vector& dst); - ::shared_ptr make_column_specification(column_definition& def); + tombstone _tombstone; + row _static_row; + std::map _rows; + row_tombstone_set _row_tombstones; public: - gc_clock::duration default_time_to_live = gc_clock::duration::zero(); - const sstring ks_name; - const sstring cf_name; - std::vector partition_key; - std::vector clustering_key; - std::vector regular_columns; // sorted by name - shared_ptr> partition_key_type; - shared_ptr> clustering_key_type; - shared_ptr clustering_key_prefix_type; - data_type regular_column_name_type; - thrift_schema thrift; -public: - schema(sstring ks_name, sstring cf_name, - std::vector partition_key, - std::vector clustering_key, - std::vector regular_columns, - shared_ptr regular_column_name_type); - bool is_dense() const { - return false; + mutation_partition(schema_ptr s) + : _rows(key_compare(s->clustering_key_type)) + , _row_tombstones(serialized_compare(s->clustering_key_prefix_type)) + { } + + void apply(tombstone t) { + _tombstone.apply(t); } - bool is_counter() const { - return false; - } - column_definition* get_column_definition(const bytes& name); - auto regular_begin() { - return regular_columns.begin(); - } - auto regular_end() { - return regular_columns.end(); - } - auto regular_lower_bound(const bytes& name) { - // TODO: use regular_columns and a version of std::lower_bound() with heterogeneous comparator - auto i = _regular_columns_by_name.lower_bound(name); - if (i == _regular_columns_by_name.end()) { - return regular_end(); + + void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t) { + if (prefix.empty()) { + apply(t); + } else if (prefix.size() == schema->clustering_key.size()) { + _rows[serialize_value(*schema->clustering_key_type, prefix)].t.apply(t); } else { - return regular_columns.begin() + i->second->id; + apply_row_tombstone(schema, {serialize_value(*schema->clustering_key_prefix_type, prefix), t}); } } - auto regular_upper_bound(const bytes& name) { - // TODO: use regular_columns and a version of std::upper_bound() with heterogeneous comparator - auto i = _regular_columns_by_name.upper_bound(name); - if (i == _regular_columns_by_name.end()) { - return regular_end(); - } else { - return regular_columns.begin() + i->second->id; + + void apply_row_tombstone(schema_ptr schema, bytes prefix, tombstone t) { + apply_row_tombstone(schema, {std::move(prefix), std::move(t)}); + } + + void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone) { + auto& prefix = row_tombstone.first; + auto i = _row_tombstones.lower_bound(prefix); + if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first)) { + _row_tombstones.emplace_hint(i, std::move(row_tombstone)); + } else if (row_tombstone.second > i->second) { + i->second = row_tombstone.second; } } - column_id get_regular_columns_count() { - return regular_columns.size(); + + void apply(schema_ptr schema, mutation_partition&& p); + + const row_tombstone_set& row_tombstones() { + return _row_tombstones; } - data_type column_name_type(column_definition& def) { - return def.kind == column_definition::REGULAR ? regular_column_name_type : utf8_type; + + row& static_row() { + return _static_row; + } + + row& clustered_row(const clustering_key& key) { + return _rows[key].cells; + } + + row& clustered_row(clustering_key&& key) { + return _rows[std::move(key)].cells; + } + + row* find_row(const clustering_key& key) { + auto i = _rows.find(key); + if (i == _rows.end()) { + return nullptr; + } + return &i->second.cells; + } + + /** + * Returns the base tombstone for all cells of given clustering row. Such tombstone + * holds all information necessary to decide whether cells in a row are deleted or not, + * in addition to any information inside individual cells. + */ + tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key) { + tombstone t = _tombstone; + + auto i = _row_tombstones.lower_bound(key); + if (i != _row_tombstones.end() && schema->clustering_key_prefix_type->is_prefix_of(i->first, key)) { + t.apply(i->second); + } + + auto j = _rows.find(key); + if (j != _rows.end()) { + t.apply(j->second.t); + } + + return t; } }; -using schema_ptr = lw_shared_ptr; +class mutation final { +public: + schema_ptr schema; + partition_key key; + mutation_partition p; +public: + mutation(partition_key key_, schema_ptr schema_) + : schema(std::move(schema_)) + , key(std::move(key_)) + , p(schema) + { } + + mutation(mutation&&) = default; + mutation(const mutation&) = delete; + + void set_static_cell(const column_definition& def, boost::any value) { + p.static_row()[def.id] = std::move(value); + } + + void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, boost::any value) { + auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); + row[def.id] = std::move(value); + } + + void set_clustered_cell(const clustering_key& key, const column_definition& def, boost::any value) { + auto& row = p.clustered_row(key); + row[def.id] = std::move(value); + } +}; struct column_family { column_family(schema_ptr schema); - partition& find_or_create_partition(const bytes& key); + mutation_partition& find_or_create_partition(const bytes& key); row& find_or_create_row(const bytes& partition_key, const bytes& clustering_key); - partition* find_partition(const bytes& key); + mutation_partition* find_partition(const bytes& key); row* find_row(const bytes& partition_key, const bytes& clustering_key); schema_ptr _schema; // partition key -> partition - std::map partitions; + std::map partitions; + void apply(mutation&& m); }; class keyspace { @@ -162,6 +281,7 @@ public: std::unordered_map column_families; static future populate(sstring datadir); schema_ptr find_schema(sstring cf_name); + column_family* find_column_family(sstring cf_name); }; class database { diff --git a/db/api.hh b/db/api.hh deleted file mode 100644 index 717de7d794..0000000000 --- a/db/api.hh +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Copyright 2015 Cloudius Systems - */ - -#pragma once - -#include -#include -#include - -#include "db_clock.hh" -#include "gc_clock.hh" -#include "database.hh" -#include "db/consistency_level.hh" - -namespace api { - -using partition_key_type = tuple_type<>; -using clustering_key_type = tuple_type<>; -using clustering_prefix_type = tuple_prefix; -using partition_key = bytes; -using clustering_key = bytes; -using clustering_prefix = clustering_prefix_type::value_type; - -using timestamp_type = int64_t; -timestamp_type constexpr missing_timestamp = std::numeric_limits::min(); -timestamp_type constexpr min_timestamp = std::numeric_limits::min() + 1; -timestamp_type constexpr max_timestamp = std::numeric_limits::max(); - -/** - * Represents deletion operation. Can be commuted with other tombstones via apply() method. - * Can be empty. - * - */ -struct tombstone final { - timestamp_type timestamp; - gc_clock::time_point ttl; - - tombstone(timestamp_type timestamp, gc_clock::time_point ttl) - : timestamp(timestamp) - , ttl(ttl) - { } - - tombstone() - : tombstone(missing_timestamp, {}) - { } - - bool operator<(const tombstone& t) const { - return timestamp < t.timestamp || ttl < t.ttl; - } - - bool operator==(const tombstone& t) const { - return timestamp == t.timestamp && ttl == t.ttl; - } - - operator bool() const { - return timestamp != missing_timestamp; - } - - void apply(const tombstone& t) { - if (*this < t) { - *this = t; - } - } -}; - -using ttl_opt = std::experimental::optional; - -class live_atomic_cell final { -private: - timestamp_type _timestamp; - ttl_opt _ttl; - bytes _value; -public: - live_atomic_cell(timestamp_type timestamp, ttl_opt ttl, bytes value) - : _timestamp(timestamp) - , _ttl(ttl) - , _value(value) { - } -}; - -using atomic_cell = boost::variant; - -using row = std::map; - -struct deletable_row final { - tombstone t; - row cells; -}; - -struct row_tombstone final { - tombstone t; - - /* - * Prefix can be shorter than the clustering key size, in which case it - * means that all rows whose keys have that prefix are removed. - * - * Empty prefix removes all rows, which is equivalent to removing the whole partition. - */ - bytes prefix; -}; - -struct row_tombstone_compare final { -private: - data_type _type; -public: - row_tombstone_compare(data_type type) : _type(type) {} - - bool operator()(const row_tombstone& t1, const row_tombstone& t2) { - return _type->less(t1.prefix, t2.prefix); - } -}; - -class partition final { -private: - schema_ptr _schema; - tombstone _tombstone; - - row _static_row; - std::map _rows; - std::set _row_tombstones; -public: - partition(schema_ptr s) - : _schema(std::move(s)) - , _rows(key_compare(_schema->clustering_key_type)) - , _row_tombstones(row_tombstone_compare(_schema->clustering_key_prefix_type)) - { } - - void apply(tombstone t) { - _tombstone.apply(t); - } - - void apply_delete(const clustering_prefix& prefix, tombstone t) { - if (prefix.empty()) { - apply(t); - } else if (prefix.size() == _schema->clustering_key.size()) { - _rows[serialize_value(*_schema->clustering_key_type, prefix)].t.apply(t); - } else { - apply(row_tombstone{t, serialize_value(*_schema->clustering_key_prefix_type, prefix)}); - } - } - - void apply(const row_tombstone& rt) { - auto i = _row_tombstones.lower_bound(rt); - if (i == _row_tombstones.end() || !_schema->clustering_key_prefix_type->equal(rt.prefix, i->prefix) || rt.t > i->t) { - _row_tombstones.insert(i, rt); - } - } - - row& static_row() { - return _static_row; - } - - row& clustered_row(const clustering_key& key) { - return _rows[key].cells; - } - - row& get_row(const clustering_prefix& prefix) { - if (prefix.empty()) { - return static_row(); - } - return clustered_row(serialize_value(*_schema->clustering_key_type, prefix)); - } -}; - -class mutation final { -public: - schema_ptr schema; - partition_key key; - partition p; -public: - mutation(partition_key key_, schema_ptr schema_) - : schema(std::move(schema_)) - , key(std::move(key_)) - , p(schema) - { } - - mutation(mutation&&) = default; - mutation(const mutation&) = delete; - - void set_cell(const clustering_prefix& prefix, column_id col, boost::any value) { - p.get_row(prefix)[col] = value; - } -}; - -} diff --git a/schema.hh b/schema.hh new file mode 100644 index 0000000000..1d949212da --- /dev/null +++ b/schema.hh @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include + +#include "cql3/column_specification.hh" +#include "core/shared_ptr.hh" +#include "types.hh" +#include "tuple.hh" +#include "gc_clock.hh" + +using column_id = uint32_t; + +class column_definition final { +private: + bytes _name; +public: + enum column_kind { PARTITION, CLUSTERING, REGULAR, STATIC }; + column_definition(bytes name, data_type type, column_id id, column_kind kind); + data_type type; + column_id id; // unique within (kind, schema instance) + column_kind kind; + ::shared_ptr column_specification; + bool is_static() const { return kind == column_kind::STATIC; } + bool is_partition_key() const { return kind == column_kind::PARTITION; } + bool is_atomic() const { return !type->is_multi_cell(); } + const sstring& name_as_text() const; + const bytes& name() const; +}; + +struct thrift_schema { + shared_ptr partition_key_type; +}; + +/* + * Keep this effectively immutable. + */ +class schema final { +private: + std::unordered_map _columns_by_name; + std::map _regular_columns_by_name; +public: + struct column { + bytes name; + data_type type; + struct name_compare { + shared_ptr type; + name_compare(shared_ptr type) : type(type) {} + bool operator()(const column& cd1, const column& cd2) const { + return type->less(cd1.name, cd2.name); + } + }; + }; +private: + void build_columns(std::vector columns, column_definition::column_kind kind, std::vector& dst); + ::shared_ptr make_column_specification(column_definition& def); +public: + gc_clock::duration default_time_to_live = gc_clock::duration::zero(); + const sstring ks_name; + const sstring cf_name; + std::vector partition_key; + std::vector clustering_key; + std::vector regular_columns; // sorted by name + shared_ptr> partition_key_type; + shared_ptr> clustering_key_type; + shared_ptr clustering_key_prefix_type; + data_type regular_column_name_type; + thrift_schema thrift; +public: + schema(sstring ks_name, sstring cf_name, + std::vector partition_key, + std::vector clustering_key, + std::vector regular_columns, + shared_ptr regular_column_name_type); + bool is_dense() const { + return false; + } + bool is_counter() const { + return false; + } + column_definition* get_column_definition(const bytes& name); + auto regular_begin() { + return regular_columns.begin(); + } + auto regular_end() { + return regular_columns.end(); + } + auto regular_lower_bound(const bytes& name) { + // TODO: use regular_columns and a version of std::lower_bound() with heterogeneous comparator + auto i = _regular_columns_by_name.lower_bound(name); + if (i == _regular_columns_by_name.end()) { + return regular_end(); + } else { + return regular_columns.begin() + i->second->id; + } + } + auto regular_upper_bound(const bytes& name) { + // TODO: use regular_columns and a version of std::upper_bound() with heterogeneous comparator + auto i = _regular_columns_by_name.upper_bound(name); + if (i == _regular_columns_by_name.end()) { + return regular_end(); + } else { + return regular_columns.begin() + i->second->id; + } + } + column_id get_regular_columns_count() { + return regular_columns.size(); + } + data_type column_name_type(column_definition& def) { + return def.kind == column_definition::REGULAR ? regular_column_name_type : utf8_type; + } + column_definition& regular_column_at(column_id id) { + return regular_columns[id]; + } +}; + +using schema_ptr = lw_shared_ptr; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 92d220187c..55c442e915 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -475,8 +475,20 @@ namespace service { * @param consistency_level the consistency level for the operation */ future<> -storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { - throw std::runtime_error("NOT IMPLEMENTED"); +storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { + // FIXME: send it to replicas instead of applying locally + for (auto&& m : mutations) { + // FIXME: lookup column_family by UUID + keyspace* ks = _db.find_keyspace(m.schema->ks_name); + assert(ks); // FIXME: load keyspace meta-data from storage + column_family* cf = ks->find_column_family(m.schema->cf_name); + if (cf) { + cf->apply(std::move(m)); + } else { + // TODO: log a warning + } + } + return make_ready_future<>(); #if 0 Tracing.trace("Determining replicas for mutation"); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); @@ -559,7 +571,7 @@ storage_proxy::mutate(std::vector mutations, db::consistency_leve } future<> -storage_proxy::mutate_with_triggers(std::vector mutations, db::consistency_level cl, +storage_proxy::mutate_with_triggers(std::vector mutations, db::consistency_level cl, bool should_mutate_atomically) { unimplemented::triggers(); #if 0 @@ -587,7 +599,7 @@ storage_proxy::mutate_with_triggers(std::vector mutations, db::co * @param consistency_level the consistency level for the operation */ future<> -storage_proxy::mutate_atomically(std::vector mutations, db::consistency_level cl) { +storage_proxy::mutate_atomically(std::vector mutations, db::consistency_level cl) { unimplemented::lwt(); #if 0 Tracing.trace("Determining replicas for atomic batch"); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index b04255e8bb..5b9f2e3eb2 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -24,12 +24,17 @@ #pragma once -#include "db/api.hh" +#include "database.hh" +#include "db/consistency_level.hh" namespace service { class storage_proxy /*implements StorageProxyMBean*/ { +private: + database& _db; public: + storage_proxy(database& db) : _db(db) {} + /** * Use this method to have these Mutations applied * across all replicas. This method will take care @@ -39,10 +44,10 @@ public: * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - static future<> mutate(std::vector mutations, db::consistency_level cl); + future<> mutate(std::vector mutations, db::consistency_level cl); - static future<> mutate_with_triggers(std::vector mutations, - db::consistency_level cl, bool should_mutate_atomically); + future<> mutate_with_triggers(std::vector mutations, db::consistency_level cl, + bool should_mutate_atomically); /** * See mutate. Adds additional steps before and after writing a batch. @@ -53,7 +58,7 @@ public: * @param mutations the Mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - static future<> mutate_atomically(std::vector mutations, db::consistency_level cl); + future<> mutate_atomically(std::vector mutations, db::consistency_level cl); }; } diff --git a/test.py b/test.py index 118793504a..7cdd4f93fc 100755 --- a/test.py +++ b/test.py @@ -10,6 +10,8 @@ all_tests = [ 'memcached/test_ascii_parser', 'sstring_test', 'output_stream_test', + 'urchin/types_test', + 'urchin/mutation_test', ] last_len = 0 diff --git a/tests/perf/perf.hh b/tests/perf/perf.hh new file mode 100644 index 0000000000..ecb57a31c5 --- /dev/null +++ b/tests/perf/perf.hh @@ -0,0 +1,30 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once + +#include +#include + +template +void time_it(Func func, int iterations = 5) { + using clk = std::chrono::high_resolution_clock; + + for (int i = 0; i < iterations; i++) { + auto start = clk::now(); + auto end_at = start + std::chrono::seconds(1); + uint64_t count = 0; + + while (clk::now() < end_at) { + for (int i = 0; i < 10000; i++) { // amortize clock reading cost + func(); + count++; + } + } + + auto end = clk::now(); + auto duration = std::chrono::duration(end - start).count(); + std::cout << sprint("%.2f", (double)count / duration) << " tps\n"; + } +} diff --git a/tests/perf/perf_mutation.cc b/tests/perf/perf_mutation.cc new file mode 100644 index 0000000000..af59a3434a --- /dev/null +++ b/tests/perf/perf_mutation.cc @@ -0,0 +1,30 @@ +#include "database.hh" +#include "perf.hh" + +static boost::any make_atomic_cell(bytes value) { + return atomic_cell{0, atomic_cell::live{ttl_opt{}, std::move(value)}}; +}; + +int main(int argc, char* argv[]) { + auto s = make_lw_shared("ks", "cf", + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + + column_family cf(s); + + std::cout << "Timing mutation of single column within one row...\n"; + + partition_key key = to_bytes("key1"); + clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + bytes value = int32_type->decompose(3); + + time_it([&] { + mutation m(key, s); + column_definition& col = *s->get_column_definition("r1"); + m.set_clustered_cell(c_key, col, make_atomic_cell(value)); + cf.apply(std::move(m)); + }); +} diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc new file mode 100644 index 0000000000..417397ffe4 --- /dev/null +++ b/tests/urchin/mutation_test.cc @@ -0,0 +1,75 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core + +#include +#include "core/sstring.hh" +#include "database.hh" + +static sstring some_keyspace("ks"); +static sstring some_column_family("cf"); + +static boost::any make_atomic_cell(bytes value) { + return atomic_cell{0, atomic_cell::live{ttl_opt{}, std::move(value)}}; +}; + +BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { + auto s = make_lw_shared(some_keyspace, some_column_family, + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + + column_family cf(s); + + column_definition& r1_col = *s->get_column_definition("r1"); + partition_key key = to_bytes("key1"); + clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3))); + cf.apply(std::move(m)); + + row& row = cf.find_or_create_row(key, c_key); + auto& cell = boost::any_cast(row[r1_col.id]); + BOOST_REQUIRE(cell.is_live()); + BOOST_REQUIRE(int32_type->equal(cell.as_live().value, int32_type->decompose(3))); +} + +BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) { + auto s = make_lw_shared(some_keyspace, some_column_family, + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + + column_family cf(s); + + column_definition& r1_col = *s->get_column_definition("r1"); + partition_key key = to_bytes("key1"); + + clustering_key c_key1 = s->clustering_key_type->decompose_value( + {int32_type->decompose(1)} + ); + + clustering_key c_key2 = s->clustering_key_type->decompose_value( + {int32_type->decompose(2)} + ); + + auto ttl = gc_clock::now() + std::chrono::seconds(1); + + mutation m(key, s); + m.p.apply_row_tombstone(s, c_key1, tombstone(1, ttl)); + m.p.apply_row_tombstone(s, c_key2, tombstone(0, ttl)); + + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key1), tombstone(1, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(0, ttl)); + + m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl)); +} diff --git a/tests/urchin/types_test.cc b/tests/urchin/types_test.cc new file mode 100644 index 0000000000..de0c63f398 --- /dev/null +++ b/tests/urchin/types_test.cc @@ -0,0 +1,73 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core + +#include +#include "types.hh" +#include "tuple.hh" + +BOOST_AUTO_TEST_CASE(test_int32_type_string_conversions) { + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("1234567890"), int32_type->decompose(1234567890))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(1234567890)), "1234567890"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("12"), int32_type->decompose(12))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("0012"), int32_type->decompose(12))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("+12"), int32_type->decompose(12))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(12)), "12"); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-12"), int32_type->decompose(-12))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(-12)), "-12"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("0"), int32_type->decompose(0))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-0"), int32_type->decompose(0))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("+0"), int32_type->decompose(0))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(0)), "0"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-2147483648"), int32_type->decompose((int32_t)-2147483648))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose((int32_t)-2147483648)), "-2147483648"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("2147483647"), int32_type->decompose((int32_t)2147483647))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose((int32_t)-2147483647)), "-2147483647"); + + auto test_parsing_fails = [] (sstring text) { + try { + int32_type->from_string(text); + BOOST_FAIL(sprint("Parsing of '%s' should have failed", text)); + } catch (const marshal_exception& e) { + // expected + } + }; + + test_parsing_fails("asd"); + test_parsing_fails("-2147483649"); + test_parsing_fails("2147483648"); + test_parsing_fails("2147483648123"); + + BOOST_REQUIRE_EQUAL(int32_type->to_string(bytes()), ""); +} + +BOOST_AUTO_TEST_CASE(test_tuple_is_prefix_of) { + tuple_type<> type({utf8_type, utf8_type, utf8_type}); + auto prefix_type = type.as_prefix(); + + auto val = type.serialize_value({{bytes("a")}, {bytes("b")}, {bytes("c")}}); + + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({}), val)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}}), val)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}}), val)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}, {bytes("c")}}), val)); + + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes()}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("b")}, {bytes("c")}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("c")}, {bytes("b")}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("abc")}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("ab")}}), val)); + + auto val2 = type.serialize_value({{bytes("a")}, {bytes("b")}, {}}); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}}), val2)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}, {}}), val2)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}, {bytes()}}), val2)); +} diff --git a/thrift/handler.cc b/thrift/handler.cc index 1dc32c17bc..2d650c54c5 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -91,12 +91,19 @@ public: // FIXME: force limit count? while (beg != end && count--) { column_definition& def = range.reversed ? *--end : *beg++; - Column col; - col.__set_name(def.name()); - col.__set_value(rw->cells[def.id]); - ColumnOrSuperColumn v; - v.__set_column(std::move(col)); - ret.push_back(std::move(v)); + if (def.is_atomic()) { + const auto& cell = boost::any_cast((*rw)[def.id]); + if (cell.is_live()) { // FIXME: we should actually use tombstone information from all levels + Column col; + col.__set_name(def.name()); + col.__set_value(cell.as_live().value); + col.__set_timestamp(cell.timestamp); + // FIXME: set ttl + ColumnOrSuperColumn v; + v.__set_column(std::move(col)); + ret.push_back(std::move(v)); + }; + } } } } else { @@ -184,21 +191,30 @@ public: sstring cf_name = cf_mutations.first; const std::vector& mutations = cf_mutations.second; auto& cf = lookup_column_family(cf_name); - auto& row = cf.find_or_create_row(key, null_clustering_key); + mutation m_to_apply(key, cf._schema); for (const Mutation& m : mutations) { if (m.__isset.column_or_supercolumn) { auto&& cosc = m.column_or_supercolumn; if (cosc.__isset.column) { auto&& col = cosc.column; bytes cname = to_bytes(col.name); - // FIXME: use a lookup map auto def = cf._schema->get_column_definition(cname); if (!def) { throw make_exception("column %s not found", col.name); } - auto& cells = row.cells; - cells.resize(cf._schema->get_regular_columns_count()); - cells[def->id] = to_bytes(col.value); + if (def->kind != column_definition::column_kind::REGULAR) { + throw make_exception("Column %s is not settable", col.name); + } + gc_clock::duration ttl; + if (col.__isset.ttl) { + ttl = std::chrono::duration_cast(std::chrono::seconds(col.ttl)); + } + if (ttl.count() <= 0) { + ttl = cf._schema->default_time_to_live; + } + auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); + m_to_apply.set_clustered_cell(null_clustering_key, *def, + atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}}); } else if (cosc.__isset.super_column) { // FIXME: implement } else if (cosc.__isset.counter_column) { @@ -215,6 +231,7 @@ public: throw make_exception("Mutation must have either column or deletion"); } } + cf.apply(std::move(m_to_apply)); } } } catch (std::exception& ex) { diff --git a/tuple.hh b/tuple.hh index 2ebbe9c39c..33e4c7444f 100644 --- a/tuple.hh +++ b/tuple.hh @@ -18,6 +18,7 @@ private: const std::vector> types; const bool _byte_order_equal; public: + using prefix_type = tuple_type; using value_type = std::vector; tuple_type(std::vector> types) @@ -27,6 +28,11 @@ public: return t->is_byte_order_equal(); })) { } + + prefix_type as_prefix() { + return prefix_type(types); + } + /* * Format: * ... @@ -50,6 +56,12 @@ public: } } } + bytes serialize_value(const value_type& values) { + return ::serialize_value(*this, values); + } + bytes decompose_value(const value_type& values) { + return ::serialize_value(*this, values); + } value_type deserialize_value(std::istream& in) { std::vector result; result.reserve(types.size()); @@ -150,6 +162,58 @@ public: // TODO: make the length byte-order comparable by adding numeric_limits::min() when serializing return false; } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } + /** + * Returns true iff all components of 'prefix' are equal to corresponding + * leading components of 'value'. + * + * The 'value' is assumed to be serialized using tuple_type + */ + bool is_prefix_of(const bytes& prefix, const bytes& value) const { + assert(AllowPrefixes); + + if (prefix.size() > value.size()) { + return false; + } + + bytes_view i1(prefix); + bytes_view i2(value); + + for (auto&& type : types) { + if (i1.empty()) { + return true; + } + if (i2.empty()) { + return false; + } + assert(i1.size() >= sizeof(int32_t)); + assert(i2.size() >= sizeof(int32_t)); + auto len1 = (int32_t) net::ntoh(*reinterpret_cast(i1.begin())); + auto len2 = (int32_t) net::ntoh(*reinterpret_cast(i2.begin())); + i1.remove_prefix(sizeof(int32_t)); + i2.remove_prefix(sizeof(int32_t)); + if ((len1 < 0) != (len2 < 0)) { + // one is empty and another one is not + return false; + } + if (len1 >= 0) { + // both are not empty + // TODO: make equal() accept bytes_view + if (!type->equal(bytes(i1.begin(), i1.begin() + len1), + bytes(i2.begin(), i2.begin() + len2))) { + return false; + } + i1.remove_prefix((uint32_t) len1); + i2.remove_prefix((uint32_t) len2); + } + } + return true; + } }; using tuple_prefix = tuple_type; diff --git a/types.cc b/types.cc index 53f77e07ff..82ad5fec09 100644 --- a/types.cc +++ b/types.cc @@ -2,6 +2,7 @@ * Copyright (C) 2015 Cloudius Systems, Ltd. */ +#include #include "types.hh" template @@ -54,6 +55,33 @@ struct int32_type_impl : simple_type_impl { auto v = int32_t(net::ntoh(u)); return boost::any(v); } + int32_t compose_value(const bytes& b) { + if (b.size() != 4) { + throw marshal_exception(); + } + return (int32_t)net::ntoh(*reinterpret_cast(b.begin())); + } + bytes decompose_value(int32_t v) { + bytes b(bytes::initialized_later(), sizeof(v)); + *reinterpret_cast(b.begin()) = (int32_t)net::hton((uint32_t)v); + return b; + } + int32_t parse_int(const sstring& s) { + try { + return boost::lexical_cast(s); + } catch (const boost::bad_lexical_cast& e) { + throw marshal_exception(sprint("Invalid number format '%s'", s)); + } + } + virtual bytes from_string(const sstring& s) override { + return decompose_value(parse_int(s)); + } + virtual sstring to_string(const bytes& b) override { + if (b.empty()) { + return {}; + } + return to_sstring(compose_value(b)); + } }; struct long_type_impl : simple_type_impl { @@ -75,6 +103,12 @@ struct long_type_impl : simple_type_impl { auto v = int64_t(net::ntoh(u)); return boost::any(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct string_type_impl : public abstract_type { @@ -101,6 +135,12 @@ struct string_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + return to_bytes(s); + } + virtual sstring to_string(const bytes& b) override { + return sstring(b); + } }; struct bytes_type_impl : public abstract_type { @@ -126,6 +166,12 @@ struct bytes_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct boolean_type_impl : public simple_type_impl { @@ -143,6 +189,12 @@ struct boolean_type_impl : public simple_type_impl { } return boost::any(tmp != 0); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct date_type_impl : public abstract_type { @@ -174,6 +226,12 @@ struct date_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct timeuuid_type_impl : public abstract_type { @@ -214,6 +272,12 @@ struct timeuuid_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } private: static int compare_bytes(const bytes& o1, const bytes& o2) { auto compare_pos = [&] (unsigned pos, int mask, int ifequal) { @@ -251,6 +315,12 @@ struct timestamp_type_impl : simple_type_impl { return boost::any(db_clock::time_point(db_clock::duration(net::ntoh(v)))); } // FIXME: isCompatibleWith(timestampuuid) + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct uuid_type_impl : abstract_type { @@ -302,6 +372,12 @@ struct uuid_type_impl : abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; thread_local shared_ptr int32_type(make_shared()); diff --git a/types.hh b/types.hh index 0303687ec7..839b2f0233 100644 --- a/types.hh +++ b/types.hh @@ -18,7 +18,7 @@ // FIXME: should be int8_t using bytes = basic_sstring; - +using bytes_view = std::experimental::string_view; using bytes_opt = std::experimental::optional; sstring to_hex(const bytes& b); @@ -112,11 +112,11 @@ public: validate(b); return to_string(b); } - virtual sstring to_string(const bytes& b) { - throw std::runtime_error("not implemented"); - } + virtual sstring to_string(const bytes& b) = 0; + virtual bytes from_string(const sstring& text) = 0; virtual bool is_counter() { return false; } virtual bool is_collection() { return false; } + virtual bool is_multi_cell() { return false; } protected: template > bool default_less(const bytes& b1, const bytes& b2, Compare compare = Compare()); diff --git a/validation.cc b/validation.cc index e7576ced1f..7963b3c2e6 100644 --- a/validation.cc +++ b/validation.cc @@ -23,6 +23,7 @@ */ #include "validation.hh" +#include "exceptions/exceptions.hh" namespace validation { @@ -30,7 +31,7 @@ namespace validation { * Based on org.apache.cassandra.thrift.ThriftValidation#validate_key() */ void -validate_cql_key(schema_ptr schema, const api::partition_key& key) { +validate_cql_key(schema_ptr schema, const partition_key& key) { if (key.empty()) { throw exceptions::invalid_request_exception("Key may not be empty"); } diff --git a/validation.hh b/validation.hh index 9d625efada..f19e7723f0 100644 --- a/validation.hh +++ b/validation.hh @@ -25,13 +25,13 @@ #pragma once #include "database.hh" -#include "db/api.hh" +#include "database.hh" namespace validation { constexpr size_t max_key_size = std::numeric_limits::max(); -void validate_cql_key(schema_ptr schema, const api::partition_key& key); +void validate_cql_key(schema_ptr schema, const partition_key& key); schema_ptr validate_column_family(database& db, const sstring& keyspace_name, const sstring& cf_name); }