From 852afdfb3c6a2b6ab37fbe12ebdcf38914386565 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 19:25:42 +0100 Subject: [PATCH 01/36] validation: Add missing include --- validation.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/validation.cc b/validation.cc index e7576ced1f..38de99703b 100644 --- a/validation.cc +++ b/validation.cc @@ -23,6 +23,7 @@ */ #include "validation.hh" +#include "exceptions/exceptions.hh" namespace validation { From 208fdfab458d66606f8e52d231361255a95006c8 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 12:32:09 +0100 Subject: [PATCH 02/36] cql3: Move methods from header to source file --- cql3/statements/modification_statement.cc | 59 +++++++++++++++++++++++ cql3/statements/modification_statement.hh | 58 +--------------------- 2 files changed, 61 insertions(+), 56 deletions(-) diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 25a7607fd9..620f75e38c 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -387,6 +387,65 @@ modification_statement::precess_where_clause(std::vector where_cla } } +std::unique_ptr +modification_statement::parsed::prepare(database& db) { + auto bound_names = get_bound_variables(); + auto statement = prepare(db, bound_names); + return std::make_unique(std::move(statement), *bound_names); +} + +::shared_ptr +modification_statement::parsed::prepare(database& db, ::shared_ptr bound_names) { + schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family()); + + auto prepared_attributes = _attrs->prepare(keyspace(), column_family()); + prepared_attributes->collect_marker_specification(bound_names); + + ::shared_ptr stmt = prepare_internal(schema, bound_names, std::move(prepared_attributes)); + + if (_if_not_exists || _if_exists || !_conditions.empty()) { + if (stmt->is_counter()) { + throw exceptions::invalid_request_exception("Conditional updates are not supported on counter tables"); + } + if (_attrs->timestamp) { + throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates"); + } + + if (_if_not_exists) { + // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense. + // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes. + assert(_conditions.empty()); + assert(!_if_exists); + stmt->set_if_not_exist_condition(); + } else if (_if_exists) { + assert(_conditions.empty()); + assert(!_if_not_exists); + stmt->set_if_exist_condition(); + } else { + for (auto&& entry : _conditions) { + auto id = entry.first->prepare_column_identifier(schema); + column_definition* def = get_column_definition(schema, *id); + if (!def) { + throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *id)); + } + + auto condition = entry.second->prepare(keyspace(), *def); + condition->collect_marker_specificaton(bound_names); + + switch (def->kind) { + case column_definition::PARTITION: + case column_definition::CLUSTERING: + throw exceptions::invalid_request_exception(sprint("PRIMARY KEY column '%s' cannot have IF conditions", *id)); + default: + stmt->add_condition(condition); + } + } + } + stmt->validate_where_clause_for_conditions(); + } + return stmt; +} + } } diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index d24ae96111..eeb5a6b54b 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -476,62 +476,8 @@ public: { } public: - std::unique_ptr prepare(database& db) override { - auto bound_names = get_bound_variables(); - auto statement = prepare(db, bound_names); - return std::make_unique(std::move(statement), *bound_names); - } - - ::shared_ptr prepare(database& db, ::shared_ptr bound_names) { - schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family()); - - auto prepared_attributes = _attrs->prepare(keyspace(), column_family()); - prepared_attributes->collect_marker_specification(bound_names); - - ::shared_ptr stmt = prepare_internal(schema, bound_names, std::move(prepared_attributes)); - - if (_if_not_exists || _if_exists || !_conditions.empty()) { - if (stmt->is_counter()) { - throw exceptions::invalid_request_exception("Conditional updates are not supported on counter tables"); - } - if (_attrs->timestamp) { - throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates"); - } - - if (_if_not_exists) { - // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense. - // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes. - assert(_conditions.empty()); - assert(!_if_exists); - stmt->set_if_not_exist_condition(); - } else if (_if_exists) { - assert(_conditions.empty()); - assert(!_if_not_exists); - stmt->set_if_exist_condition(); - } else { - for (auto&& entry : _conditions) { - auto id = entry.first->prepare_column_identifier(schema); - column_definition* def = get_column_definition(schema, *id); - if (!def) { - throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *id)); - } - - auto condition = entry.second->prepare(keyspace(), *def); - condition->collect_marker_specificaton(bound_names); - - switch (def->kind) { - case column_definition::PARTITION: - case column_definition::CLUSTERING: - throw exceptions::invalid_request_exception(sprint("PRIMARY KEY column '%s' cannot have IF conditions", *id)); - default: - stmt->add_condition(condition); - } - } - } - stmt->validate_where_clause_for_conditions(); - } - return stmt; - }; + std::unique_ptr prepare(database& db); + ::shared_ptr prepare(database& db, ::shared_ptr bound_names);; protected: virtual ::shared_ptr prepare_internal(schema_ptr schema, ::shared_ptr bound_names, std::unique_ptr attrs) = 0; From 12af20721922ba36b3b0096bce851ab611dc014c Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 12:33:24 +0100 Subject: [PATCH 03/36] cql3: Add missing virtual/override --- cql3/statements/modification_statement.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index eeb5a6b54b..d8ea40be99 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -476,7 +476,7 @@ public: { } public: - std::unique_ptr prepare(database& db); + virtual std::unique_ptr prepare(database& db) override; ::shared_ptr prepare(database& db, ::shared_ptr bound_names);; protected: virtual ::shared_ptr prepare_internal(schema_ptr schema, From 987cdaa6cb61103eaeb4e58df48bef7dd9a720be Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 13:12:54 +0100 Subject: [PATCH 04/36] cql3: Optimise includes --- cql3/statements/modification_statement.hh | 1 - 1 file changed, 1 deletion(-) diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index d8ea40be99..936e9a06c4 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -35,7 +35,6 @@ #include "cql3/operation.hh" #include "cql3/relation.hh" -#include "db/column_family.hh" #include "db/consistency_level.hh" #include "core/shared_ptr.hh" From 73f1c6cb445513a0d0c21d7d72ed246572580e6a Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 13:13:33 +0100 Subject: [PATCH 05/36] cql3: Make name accessors return sstring ref instead of value To avoid unnecessary copies. --- cql3/cf_name.hh | 4 ++-- cql3/statements/alter_keyspace_statement.hh | 2 +- cql3/statements/cf_statement.hh | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cql3/cf_name.hh b/cql3/cf_name.hh index 0cb8227406..6b15e31074 100644 --- a/cql3/cf_name.hh +++ b/cql3/cf_name.hh @@ -52,11 +52,11 @@ public: return bool(_ks_name); } - sstring get_keyspace() const { + const sstring& get_keyspace() const { return *_ks_name; } - sstring get_column_family() const { + const sstring& get_column_family() const { return _cf_name; } diff --git a/cql3/statements/alter_keyspace_statement.hh b/cql3/statements/alter_keyspace_statement.hh index 1a28ddb71c..182618f0ee 100644 --- a/cql3/statements/alter_keyspace_statement.hh +++ b/cql3/statements/alter_keyspace_statement.hh @@ -60,7 +60,7 @@ public: , _attrs{std::move(attrs)} { } - virtual sstring keyspace() const override { + virtual const sstring& keyspace() const override { return _name; } diff --git a/cql3/statements/cf_statement.hh b/cql3/statements/cf_statement.hh index 93836ef6f7..20321fc891 100644 --- a/cql3/statements/cf_statement.hh +++ b/cql3/statements/cf_statement.hh @@ -62,12 +62,12 @@ public: } } - virtual sstring keyspace() const { + virtual const sstring& keyspace() const { assert(_cf_name->has_keyspace()); // "The statement hasn't be prepared correctly"; return _cf_name->get_keyspace(); } - virtual sstring column_family() const { + virtual const sstring& column_family() const { return _cf_name->get_column_family(); } }; From ef7ce1406f9dd73b87610a771d4da0ead8998653 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 14:16:04 +0100 Subject: [PATCH 06/36] cql3: Make column_identifier ostream-printable --- configure.py | 1 + cql3/column_identifier.cc | 13 +++++++++++++ cql3/column_identifier.hh | 4 +++- 3 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 cql3/column_identifier.cc diff --git a/configure.py b/configure.py index e20f365219..e6873d1fd3 100755 --- a/configure.py +++ b/configure.py @@ -244,6 +244,7 @@ deps = { 'service/storage_proxy.cc', 'cql3/operator.cc', 'cql3/relation.cc', + 'cql3/column_identifier.cc', 'db/db.cc', 'io/io.cc', 'utils/utils.cc', diff --git a/cql3/column_identifier.cc b/cql3/column_identifier.cc new file mode 100644 index 0000000000..0e36985f51 --- /dev/null +++ b/cql3/column_identifier.cc @@ -0,0 +1,13 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include "cql3/column_identifier.hh" + +namespace cql3 { + +std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id) { + return out << id._text; +} + +} diff --git a/cql3/column_identifier.hh b/cql3/column_identifier.hh index 28b643d466..4f50182330 100644 --- a/cql3/column_identifier.hh +++ b/cql3/column_identifier.hh @@ -31,6 +31,7 @@ #include #include +#include namespace cql3 { @@ -137,7 +138,7 @@ public: * once the comparator is known with prepare(). This should only be used with identifiers that are actual * column names. See CASSANDRA-8178 for more background. */ - class raw : public selectable::raw { + class raw final : public selectable::raw { private: const sstring _raw_text; sstring _text; @@ -195,6 +196,7 @@ public: } friend std::hash; + friend std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id); }; }; From 87597ba3a44e7ee3f55714be27e66b48347899e0 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 14:17:18 +0100 Subject: [PATCH 07/36] cql3: Fix typo in process_where_clause() --- cql3/statements/modification_statement.cc | 2 +- cql3/statements/modification_statement.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 620f75e38c..178ea865a9 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -356,7 +356,7 @@ modification_statement::add_key_value(column_definition& def, ::shared_ptr } void -modification_statement::precess_where_clause(std::vector where_clause, ::shared_ptr names) { +modification_statement::process_where_clause(std::vector where_clause, ::shared_ptr names) { for (auto&& relation : where_clause) { if (relation->is_multi_column()) { throw exceptions::invalid_request_exception(sprint("Multi-column relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", relation->to_string())); diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index 936e9a06c4..d788f01dc4 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -266,7 +266,7 @@ private: public: void add_key_value(column_definition& def, ::shared_ptr value); - void precess_where_clause(std::vector where_clause, ::shared_ptr names); + void process_where_clause(std::vector where_clause, ::shared_ptr names); std::vector build_partition_keys(const query_options& options); private: From 754976bbecc1b9dd98eef74385a38b1a86b74804 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 14:17:39 +0100 Subject: [PATCH 08/36] cql3: Convert ParsedUpdate --- cql3/statements/update_statement.cc | 30 +++++++++++++ cql3/statements/update_statement.hh | 66 +++++++++-------------------- 2 files changed, 49 insertions(+), 47 deletions(-) diff --git a/cql3/statements/update_statement.cc b/cql3/statements/update_statement.cc index bb7899c913..189509b0d0 100644 --- a/cql3/statements/update_statement.cc +++ b/cql3/statements/update_statement.cc @@ -136,6 +136,36 @@ update_statement::parsed_insert::prepare_internal(schema_ptr schema, return stmt; } +::shared_ptr +update_statement::parsed_update::prepare_internal(schema_ptr schema, + ::shared_ptr bound_names, std::unique_ptr attrs) +{ + auto stmt = ::make_shared(statement_type::UPDATE, bound_names->size(), schema, std::move(attrs)); + + for (auto&& entry : _updates) { + auto id = entry.first->prepare_column_identifier(schema); + auto def = get_column_definition(schema, *id); + if (!def) { + throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *entry.first)); + } + + auto operation = entry.second->prepare(keyspace(), *def); + operation->collect_marker_specification(bound_names); + + switch (def->kind) { + case column_definition::column_kind::PARTITION: + case column_definition::column_kind::CLUSTERING: + throw exceptions::invalid_request_exception(sprint("PRIMARY KEY part %s found in SET part", *entry.first)); + default: + stmt->add_operation(std::move(operation)); + break; + } + } + + stmt->process_where_clause(_where_clause, bound_names); + return stmt; +} + } } diff --git a/cql3/statements/update_statement.hh b/cql3/statements/update_statement.hh index 8dee40cffe..c9a37b24c6 100644 --- a/cql3/statements/update_statement.hh +++ b/cql3/statements/update_statement.hh @@ -89,7 +89,7 @@ private: * @param columnValues list of column values (corresponds to names) * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ - parsed_insert(std::experimental::optional&& name, + parsed_insert(std::experimental::optional name, ::shared_ptr attrs, std::vector<::shared_ptr> column_names, std::vector<::shared_ptr> column_values, @@ -104,13 +104,12 @@ private: }; -#if 0 - public static class ParsedUpdate extends ModificationStatement.Parsed - { + class parsed_update : public modification_statement::parsed { + private: // Provided for an UPDATE - private final List> updates; - private final List whereClause; - + std::vector, ::shared_ptr>> _updates; + std::vector _where_clause; + public: /** * Creates a new UpdateStatement from a column family name, columns map, consistency * level, and key term. @@ -120,46 +119,19 @@ private: * @param updates a map of column operations to perform * @param whereClause the where clause */ - public ParsedUpdate(CFName name, - Attributes.Raw attrs, - List> updates, - List whereClause, - List> conditions) - { - super(name, attrs, conditions, false, false); - this.updates = updates; - this.whereClause = whereClause; - } - - protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException - { - UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.UPDATE, boundNames.size(), cfm, attrs); - - for (Pair entry : updates) - { - ColumnDefinition def = cfm.getColumnDefinition(entry.left.prepare(cfm)); - if (def == null) - throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left)); - - Operation operation = entry.right.prepare(keyspace(), def); - operation.collectMarkerSpecification(boundNames); - - switch (def.kind) - { - case PARTITION_KEY: - case CLUSTERING_COLUMN: - throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left)); - default: - stmt.addOperation(operation); - break; - } - } - - stmt.processWhereClause(whereClause, boundNames); - return stmt; - } - } -#endif + parsed_update(std::experimental::optional name, + ::shared_ptr attrs, + std::vector, ::shared_ptr>> updates, + std::vector where_clause, + std::vector, ::shared_ptr>> conditions) + : modification_statement::parsed(std::move(name), std::move(attrs), std::move(conditions), false, false) + , _updates(std::move(updates)) + , _where_clause(std::move(where_clause)) + { } + protected: + virtual ::shared_ptr prepare_internal(schema_ptr schema, + ::shared_ptr bound_names, std::unique_ptr attrs); + }; }; } From 90604df3761146cf458c7a10e2f4bb7fb4e59999 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 14:19:42 +0100 Subject: [PATCH 09/36] cql3: Move method definition to the source file --- cql3/column_identifier.cc | 20 ++++++++++++++++++++ cql3/column_identifier.hh | 20 +------------------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/cql3/column_identifier.cc b/cql3/column_identifier.cc index 0e36985f51..f501681e85 100644 --- a/cql3/column_identifier.cc +++ b/cql3/column_identifier.cc @@ -6,6 +6,26 @@ namespace cql3 { +::shared_ptr prepare_column_identifier(schema_ptr s) { +#if 0 + AbstractType comparator = cfm.comparator.asAbstractType(); + if (cfm.getIsDense() || comparator instanceof CompositeType || comparator instanceof UTF8Type) + return new ColumnIdentifier(text, true); + + // We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator + // to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to + // make a special check for those and treat them normally. See CASSANDRA-8178. + ByteBuffer bufferName = ByteBufferUtil.bytes(text); + for (ColumnDefinition def : cfm.partitionKeyColumns()) + { + if (def.name.bytes.equals(bufferName)) + return new ColumnIdentifier(text, true); + } + return new ColumnIdentifier(comparator.fromString(rawText), text); +#endif + throw std::runtime_error("not implemented"); +} + std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id) { return out << id._text; } diff --git a/cql3/column_identifier.hh b/cql3/column_identifier.hh index 4f50182330..6b976afb43 100644 --- a/cql3/column_identifier.hh +++ b/cql3/column_identifier.hh @@ -156,25 +156,7 @@ public: return prepare_column_identifier(s); } - ::shared_ptr prepare_column_identifier(schema_ptr s) { -#if 0 - AbstractType comparator = cfm.comparator.asAbstractType(); - if (cfm.getIsDense() || comparator instanceof CompositeType || comparator instanceof UTF8Type) - return new ColumnIdentifier(text, true); - - // We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator - // to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to - // make a special check for those and treat them normally. See CASSANDRA-8178. - ByteBuffer bufferName = ByteBufferUtil.bytes(text); - for (ColumnDefinition def : cfm.partitionKeyColumns()) - { - if (def.name.bytes.equals(bufferName)) - return new ColumnIdentifier(text, true); - } - return new ColumnIdentifier(comparator.fromString(rawText), text); -#endif - throw std::runtime_error("not implemented"); - } + ::shared_ptr prepare_column_identifier(schema_ptr s); virtual bool processes_selection() const override { return false; From 87a38b52ef7e8649ec886d3f1a953634b7bb86ae Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 17:13:15 +0100 Subject: [PATCH 10/36] types: Add from_string() stub --- types.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/types.hh b/types.hh index 0303687ec7..44e780fd90 100644 --- a/types.hh +++ b/types.hh @@ -115,6 +115,9 @@ public: virtual sstring to_string(const bytes& b) { throw std::runtime_error("not implemented"); } + virtual bytes from_string(const sstring& text) { + throw std::runtime_error("not implemented"); + } virtual bool is_counter() { return false; } virtual bool is_collection() { return false; } protected: From 9f8ac4ab2a9f1c91c83aca72957aacd2492a655d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 4 Feb 2015 17:13:29 +0100 Subject: [PATCH 11/36] cql3: Implement column_identifier::raw::prepare() --- cql3/column_identifier.cc | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/cql3/column_identifier.cc b/cql3/column_identifier.cc index f501681e85..ba7c3c02fc 100644 --- a/cql3/column_identifier.cc +++ b/cql3/column_identifier.cc @@ -6,24 +6,22 @@ namespace cql3 { -::shared_ptr prepare_column_identifier(schema_ptr s) { -#if 0 - AbstractType comparator = cfm.comparator.asAbstractType(); - if (cfm.getIsDense() || comparator instanceof CompositeType || comparator instanceof UTF8Type) - return new ColumnIdentifier(text, true); +::shared_ptr +column_identifier::raw::prepare_column_identifier(schema_ptr schema) { + if (schema->regular_column_name_type == utf8_type) { + return ::make_shared(_text, true); + } - // We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator - // to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to - // make a special check for those and treat them normally. See CASSANDRA-8178. - ByteBuffer bufferName = ByteBufferUtil.bytes(text); - for (ColumnDefinition def : cfm.partitionKeyColumns()) - { - if (def.name.bytes.equals(bufferName)) - return new ColumnIdentifier(text, true); - } - return new ColumnIdentifier(comparator.fromString(rawText), text); -#endif - throw std::runtime_error("not implemented"); + // We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator + // to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to + // make a special check for those and treat them normally. See CASSANDRA-8178. + auto text_bytes = to_bytes(_text); + auto def = schema->get_column_definition(text_bytes); + if (def) { + return ::make_shared(std::move(text_bytes), _text); + } + + return ::make_shared(schema->regular_column_name_type->from_string(_raw_text), _text); } std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id) { From 211f52e40a557178187492b386240067231a505d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 18:43:23 +0100 Subject: [PATCH 12/36] schema: Move to schema.hh --- database.hh | 104 +--------------------------------------------- schema.hh | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 103 deletions(-) create mode 100644 schema.hh diff --git a/database.hh b/database.hh index b1249889e7..3d1b59f298 100644 --- a/database.hh +++ b/database.hh @@ -27,6 +27,7 @@ #include "tuple.hh" #include "core/future.hh" #include "cql3/column_specification.hh" +#include "schema.hh" struct row; struct paritition; @@ -43,109 +44,6 @@ struct partition { std::map rows; }; -using column_id = uint32_t; - -class column_definition final { -private: - bytes _name; -public: - enum column_kind { PARTITION, CLUSTERING, REGULAR, STATIC }; - column_definition(bytes name, data_type type, column_id id, column_kind kind); - data_type type; - column_id id; // unique within (kind, schema instance) - column_kind kind; - ::shared_ptr column_specification; - bool is_static() const { return kind == column_kind::STATIC; } - bool is_partition_key() const { return kind == column_kind::PARTITION; } - const sstring& name_as_text() const; - const bytes& name() const; -}; - -struct thrift_schema { - shared_ptr partition_key_type; -}; - -/* - * Keep this effectively immutable. - */ -class schema final { -private: - std::unordered_map _columns_by_name; - std::map _regular_columns_by_name; -public: - struct column { - bytes name; - data_type type; - struct name_compare { - shared_ptr type; - name_compare(shared_ptr type) : type(type) {} - bool operator()(const column& cd1, const column& cd2) const { - return type->less(cd1.name, cd2.name); - } - }; - }; -private: - void build_columns(std::vector columns, column_definition::column_kind kind, std::vector& dst); - ::shared_ptr make_column_specification(column_definition& def); -public: - gc_clock::duration default_time_to_live = gc_clock::duration::zero(); - const sstring ks_name; - const sstring cf_name; - std::vector partition_key; - std::vector clustering_key; - std::vector regular_columns; // sorted by name - shared_ptr> partition_key_type; - shared_ptr> clustering_key_type; - shared_ptr clustering_key_prefix_type; - data_type regular_column_name_type; - thrift_schema thrift; -public: - schema(sstring ks_name, sstring cf_name, - std::vector partition_key, - std::vector clustering_key, - std::vector regular_columns, - shared_ptr regular_column_name_type); - bool is_dense() const { - return false; - } - bool is_counter() const { - return false; - } - column_definition* get_column_definition(const bytes& name); - auto regular_begin() { - return regular_columns.begin(); - } - auto regular_end() { - return regular_columns.end(); - } - auto regular_lower_bound(const bytes& name) { - // TODO: use regular_columns and a version of std::lower_bound() with heterogeneous comparator - auto i = _regular_columns_by_name.lower_bound(name); - if (i == _regular_columns_by_name.end()) { - return regular_end(); - } else { - return regular_columns.begin() + i->second->id; - } - } - auto regular_upper_bound(const bytes& name) { - // TODO: use regular_columns and a version of std::upper_bound() with heterogeneous comparator - auto i = _regular_columns_by_name.upper_bound(name); - if (i == _regular_columns_by_name.end()) { - return regular_end(); - } else { - return regular_columns.begin() + i->second->id; - } - } - column_id get_regular_columns_count() { - return regular_columns.size(); - } - data_type column_name_type(column_definition& def) { - return def.kind == column_definition::REGULAR ? regular_column_name_type : utf8_type; - } -}; - -using schema_ptr = lw_shared_ptr; - struct column_family { column_family(schema_ptr schema); partition& find_or_create_partition(const bytes& key); diff --git a/schema.hh b/schema.hh new file mode 100644 index 0000000000..2482e9d08c --- /dev/null +++ b/schema.hh @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include + +#include "cql3/column_specification.hh" +#include "core/shared_ptr.hh" +#include "types.hh" +#include "tuple.hh" +#include "gc_clock.hh" + +using column_id = uint32_t; + +class column_definition final { +private: + bytes _name; +public: + enum column_kind { PARTITION, CLUSTERING, REGULAR, STATIC }; + column_definition(bytes name, data_type type, column_id id, column_kind kind); + data_type type; + column_id id; // unique within (kind, schema instance) + column_kind kind; + ::shared_ptr column_specification; + bool is_static() const { return kind == column_kind::STATIC; } + bool is_partition_key() const { return kind == column_kind::PARTITION; } + const sstring& name_as_text() const; + const bytes& name() const; +}; + +struct thrift_schema { + shared_ptr partition_key_type; +}; + +/* + * Keep this effectively immutable. + */ +class schema final { +private: + std::unordered_map _columns_by_name; + std::map _regular_columns_by_name; +public: + struct column { + bytes name; + data_type type; + struct name_compare { + shared_ptr type; + name_compare(shared_ptr type) : type(type) {} + bool operator()(const column& cd1, const column& cd2) const { + return type->less(cd1.name, cd2.name); + } + }; + }; +private: + void build_columns(std::vector columns, column_definition::column_kind kind, std::vector& dst); + ::shared_ptr make_column_specification(column_definition& def); +public: + gc_clock::duration default_time_to_live = gc_clock::duration::zero(); + const sstring ks_name; + const sstring cf_name; + std::vector partition_key; + std::vector clustering_key; + std::vector regular_columns; // sorted by name + shared_ptr> partition_key_type; + shared_ptr> clustering_key_type; + shared_ptr clustering_key_prefix_type; + data_type regular_column_name_type; + thrift_schema thrift; +public: + schema(sstring ks_name, sstring cf_name, + std::vector partition_key, + std::vector clustering_key, + std::vector regular_columns, + shared_ptr regular_column_name_type); + bool is_dense() const { + return false; + } + bool is_counter() const { + return false; + } + column_definition* get_column_definition(const bytes& name); + auto regular_begin() { + return regular_columns.begin(); + } + auto regular_end() { + return regular_columns.end(); + } + auto regular_lower_bound(const bytes& name) { + // TODO: use regular_columns and a version of std::lower_bound() with heterogeneous comparator + auto i = _regular_columns_by_name.lower_bound(name); + if (i == _regular_columns_by_name.end()) { + return regular_end(); + } else { + return regular_columns.begin() + i->second->id; + } + } + auto regular_upper_bound(const bytes& name) { + // TODO: use regular_columns and a version of std::upper_bound() with heterogeneous comparator + auto i = _regular_columns_by_name.upper_bound(name); + if (i == _regular_columns_by_name.end()) { + return regular_end(); + } else { + return regular_columns.begin() + i->second->id; + } + } + column_id get_regular_columns_count() { + return regular_columns.size(); + } + data_type column_name_type(column_definition& def) { + return def.kind == column_definition::REGULAR ? regular_column_name_type : utf8_type; + } +}; + +using schema_ptr = lw_shared_ptr; From df8f7279b3266619ac9179007bc16a2feb50bdc8 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 19:03:29 +0100 Subject: [PATCH 13/36] db: Simplify row tombstone management --- db/api.hh | 38 ++++++++------------------------------ 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/db/api.hh b/db/api.hh index 717de7d794..3a6cfc57fc 100644 --- a/db/api.hh +++ b/db/api.hh @@ -88,42 +88,20 @@ struct deletable_row final { row cells; }; -struct row_tombstone final { - tombstone t; - - /* - * Prefix can be shorter than the clustering key size, in which case it - * means that all rows whose keys have that prefix are removed. - * - * Empty prefix removes all rows, which is equivalent to removing the whole partition. - */ - bytes prefix; -}; - -struct row_tombstone_compare final { -private: - data_type _type; -public: - row_tombstone_compare(data_type type) : _type(type) {} - - bool operator()(const row_tombstone& t1, const row_tombstone& t2) { - return _type->less(t1.prefix, t2.prefix); - } -}; +using row_tombstone_set = std::map; class partition final { private: schema_ptr _schema; tombstone _tombstone; - row _static_row; std::map _rows; - std::set _row_tombstones; + row_tombstone_set _row_tombstones; public: partition(schema_ptr s) : _schema(std::move(s)) , _rows(key_compare(_schema->clustering_key_type)) - , _row_tombstones(row_tombstone_compare(_schema->clustering_key_prefix_type)) + , _row_tombstones(serialized_compare(_schema->clustering_key_prefix_type)) { } void apply(tombstone t) { @@ -136,14 +114,14 @@ public: } else if (prefix.size() == _schema->clustering_key.size()) { _rows[serialize_value(*_schema->clustering_key_type, prefix)].t.apply(t); } else { - apply(row_tombstone{t, serialize_value(*_schema->clustering_key_prefix_type, prefix)}); + apply_row_tombstone(serialize_value(*_schema->clustering_key_prefix_type, prefix), t); } } - void apply(const row_tombstone& rt) { - auto i = _row_tombstones.lower_bound(rt); - if (i == _row_tombstones.end() || !_schema->clustering_key_prefix_type->equal(rt.prefix, i->prefix) || rt.t > i->t) { - _row_tombstones.insert(i, rt); + void apply_row_tombstone(const bytes& prefix, const tombstone& t) { + auto i = _row_tombstones.lower_bound(prefix); + if (i == _row_tombstones.end() || !_schema->clustering_key_prefix_type->equal(prefix, i->first) || t > i->second) { + _row_tombstones.insert(i, {prefix, t}); } } From 1212ad18d094d670181a06ad3d751750632b9e93 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 19:05:40 +0100 Subject: [PATCH 14/36] db: Make setting static vs. clustered cell explicit --- cql3/constants.hh | 7 ++++++- db/api.hh | 21 ++++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/cql3/constants.hh b/cql3/constants.hh index ac1913419c..199c95df8e 100644 --- a/cql3/constants.hh +++ b/cql3/constants.hh @@ -338,7 +338,12 @@ public: virtual void execute(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override { bytes_opt value = _t->bind_and_get(params._options); - m.set_cell(prefix, column.id, value ? params.make_cell(*value) : params.make_dead_cell()); + auto cell = value ? params.make_cell(*value) : params.make_dead_cell(); + if (column.is_static()) { + m.set_static_cell(column, std::move(cell)); + } else { + m.set_clustered_cell(prefix, column, std::move(cell)); + } } }; diff --git a/db/api.hh b/db/api.hh index 3a6cfc57fc..f16c4d4bb0 100644 --- a/db/api.hh +++ b/db/api.hh @@ -133,11 +133,8 @@ public: return _rows[key].cells; } - row& get_row(const clustering_prefix& prefix) { - if (prefix.empty()) { - return static_row(); - } - return clustered_row(serialize_value(*_schema->clustering_key_type, prefix)); + row& clustered_row(clustering_key&& key) { + return _rows[std::move(key)].cells; } }; @@ -156,8 +153,18 @@ public: mutation(mutation&&) = default; mutation(const mutation&) = delete; - void set_cell(const clustering_prefix& prefix, column_id col, boost::any value) { - p.get_row(prefix)[col] = value; + void set_static_cell(const column_definition& def, boost::any value) { + p.static_row()[def.id] = std::move(value); + } + + void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, boost::any value) { + auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); + row[def.id] = std::move(value); + } + + void set_clustered_cell(const clustering_key& key, const column_definition& def, boost::any value) { + auto& row = p.clustered_row(key); + row[def.id] = std::move(value); } }; From 3645e983c0d788a4b66a352403f5dfdc343f4715 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 19:22:01 +0100 Subject: [PATCH 15/36] db: Refactor atomic_cell structure The new structure has common timestamp field extracted, it has the same meaning for both live and dead cells. It will make it easier to merge cells this way. --- cql3/update_parameters.hh | 11 +++++----- db/api.hh | 42 ++++++++++++++++++++++++--------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/cql3/update_parameters.hh b/cql3/update_parameters.hh index 876f946f9b..a641a8567b 100644 --- a/cql3/update_parameters.hh +++ b/cql3/update_parameters.hh @@ -65,20 +65,19 @@ public: } api::atomic_cell make_dead_cell() const { - return {make_tombstone()}; + return api::atomic_cell{_timestamp, api::atomic_cell::dead{_local_deletion_time}}; } api::atomic_cell make_cell(bytes value) const { auto ttl = _ttl; - if (!ttl.count()) { + if (ttl.count() <= 0) { ttl = _schema->default_time_to_live; } - return api::atomic_cell(api::live_atomic_cell(_timestamp, - ttl.count() ? api::ttl_opt{_local_deletion_time + ttl} : api::ttl_opt{}, - std::move(value))); - } + return api::atomic_cell{_timestamp, + api::atomic_cell::live{ttl.count() > 0 ? ttl_opt{_local_deletion_time + ttl} : ttl_opt{}, std::move(value)}}; + }; #if 0 public Cell makeCounter(CellName name, long delta) throws InvalidRequestException diff --git a/db/api.hh b/db/api.hh index f16c4d4bb0..daf1e2c5cf 100644 --- a/db/api.hh +++ b/db/api.hh @@ -66,21 +66,23 @@ struct tombstone final { using ttl_opt = std::experimental::optional; -class live_atomic_cell final { -private: - timestamp_type _timestamp; - ttl_opt _ttl; - bytes _value; -public: - live_atomic_cell(timestamp_type timestamp, ttl_opt ttl, bytes value) - : _timestamp(timestamp) - , _ttl(ttl) - , _value(value) { - } +struct atomic_cell final { + struct dead { + gc_clock::time_point ttl; + }; + struct live { + ttl_opt ttl; + bytes value; + }; + api::timestamp_type timestamp; + boost::variant value; + bool is_live() const { return value.which() == 1; } + // Call only when is_live() == true + const live& as_live() const { return boost::get(value); } + // Call only when is_live() == false + const dead& as_dead() const { return boost::get(value); } }; -using atomic_cell = boost::variant; - using row = std::map; struct deletable_row final { @@ -90,7 +92,7 @@ struct deletable_row final { using row_tombstone_set = std::map; -class partition final { +class mutation_partition final { private: schema_ptr _schema; tombstone _tombstone; @@ -98,7 +100,7 @@ private: std::map _rows; row_tombstone_set _row_tombstones; public: - partition(schema_ptr s) + mutation_partition(schema_ptr s) : _schema(std::move(s)) , _rows(key_compare(_schema->clustering_key_type)) , _row_tombstones(serialized_compare(_schema->clustering_key_prefix_type)) @@ -136,13 +138,21 @@ public: row& clustered_row(clustering_key&& key) { return _rows[std::move(key)].cells; } + + row* find_row(const clustering_key& key) { + auto i = _rows.find(key); + if (i == _rows.end()) { + return nullptr; + } + return &i->second.cells; + } }; class mutation final { public: schema_ptr schema; partition_key key; - partition p; + mutation_partition p; public: mutation(partition_key key_, schema_ptr schema_) : schema(std::move(schema_)) From 19e89a60579f1bf303247157eac6dc39089a73b0 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 19:23:45 +0100 Subject: [PATCH 16/36] db: Introduce mutation_partition::apply(const mutation_partition&) It merges two partition mutations together. It is assumed that the first one (invocation target) is much larger. --- database.cc | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++ db/api.hh | 2 ++ schema.hh | 4 +++ types.hh | 1 + 4 files changed, 80 insertions(+) diff --git a/database.cc b/database.cc index 3a3c465b17..90b073fd6b 100644 --- a/database.cc +++ b/database.cc @@ -254,3 +254,76 @@ database::find_keyspace(sstring name) { } return nullptr; } + +// Based on org.apache.cassandra.db.AbstractCell#reconcile() +static inline +int +compare_for_merge(const column_definition& def, const atomic_cell& left, const atomic_cell& right) { + if (left.timestamp != right.timestamp) { + return left.timestamp > right.timestamp ? 1 : -1; + } + if (left.value.which() != right.value.which()) { + return left.is_live() ? -1 : 1; + } + if (left.is_live()) { + return def.type->compare(left.as_live().value, right.as_live().value); + } else { + auto& c1 = left.as_dead(); + auto& c2 = right.as_dead(); + if (c1.ttl != c2.ttl) { + // Origin compares big-endian serialized TTL + return (uint32_t)c1.ttl.time_since_epoch().count() < (uint32_t)c2.ttl.time_since_epoch().count() ? -1 : 1; + } + return 0; + } +} + +static inline +int +compare_for_merge(const column_definition& def, + const std::pair& left, + const std::pair& right) { + if (def.is_atomic()) { + return compare_for_merge(def, boost::any_cast(left.second), + boost::any_cast(right.second)); + } else { + throw std::runtime_error("not implemented"); + } +} + +void mutation_partition::apply(const mutation_partition& p) { + _tombstone.apply(p._tombstone); + + for (auto&& entry : p._row_tombstones) { + apply_row_tombstone(entry.first, entry.second); + } + + auto merge_cells = [this] (row& old_row, const row& new_row) { + for (auto&& new_column : new_row) { + auto col = new_column.first; + auto i = old_row.find(col); + if (i == old_row.end()) { + _static_row.emplace_hint(i, new_column); + } else { + auto& old_column = *i; + auto& def = _schema->regular_column_at(col); + if (compare_for_merge(def, old_column, new_column) < 0) { + old_column.second = new_column.second; + } + } + } + }; + + merge_cells(_static_row, p._static_row); + + for (auto&& entry : p._rows) { + auto& key = entry.first; + auto i = _rows.find(key); + if (i == _rows.end()) { + _rows.emplace_hint(i, entry); + } else { + i->second.t.apply(entry.second.t); + merge_cells(i->second.cells, entry.second.cells); + } + } +} diff --git a/db/api.hh b/db/api.hh index daf1e2c5cf..a81eae1cee 100644 --- a/db/api.hh +++ b/db/api.hh @@ -127,6 +127,8 @@ public: } } + void apply(const mutation_partition& p); + row& static_row() { return _static_row; } diff --git a/schema.hh b/schema.hh index 2482e9d08c..1d949212da 100644 --- a/schema.hh +++ b/schema.hh @@ -26,6 +26,7 @@ public: ::shared_ptr column_specification; bool is_static() const { return kind == column_kind::STATIC; } bool is_partition_key() const { return kind == column_kind::PARTITION; } + bool is_atomic() const { return !type->is_multi_cell(); } const sstring& name_as_text() const; const bytes& name() const; }; @@ -111,6 +112,9 @@ public: data_type column_name_type(column_definition& def) { return def.kind == column_definition::REGULAR ? regular_column_name_type : utf8_type; } + column_definition& regular_column_at(column_id id) { + return regular_columns[id]; + } }; using schema_ptr = lw_shared_ptr; diff --git a/types.hh b/types.hh index 44e780fd90..c7297abb83 100644 --- a/types.hh +++ b/types.hh @@ -120,6 +120,7 @@ public: } virtual bool is_counter() { return false; } virtual bool is_collection() { return false; } + virtual bool is_multi_cell() { return false; } protected: template > bool default_less(const bytes& b1, const bytes& b2, Compare compare = Compare()); From 800ba79efa5783a376569ea5010cb9cb3c3d29fb Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 19:28:06 +0100 Subject: [PATCH 17/36] db: Drop api:: namespace from mutation model classes In preparation for merging into database.hh --- cql3/constants.hh | 2 +- cql3/operation.hh | 2 +- cql3/statements/delete_statement.cc | 2 +- cql3/statements/delete_statement.hh | 2 +- cql3/statements/modification_statement.cc | 24 +++++++++++------------ cql3/statements/modification_statement.hh | 18 ++++++++--------- cql3/statements/update_statement.cc | 2 +- cql3/statements/update_statement.hh | 2 +- cql3/update_parameters.hh | 15 +++++++------- db/api.hh | 16 +++++++-------- service/storage_proxy.cc | 6 +++--- service/storage_proxy.hh | 6 +++--- validation.cc | 2 +- validation.hh | 2 +- 14 files changed, 50 insertions(+), 51 deletions(-) diff --git a/cql3/constants.hh b/cql3/constants.hh index 199c95df8e..d73dac31d5 100644 --- a/cql3/constants.hh +++ b/cql3/constants.hh @@ -336,7 +336,7 @@ public: public: using operation::operation; - virtual void execute(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override { + virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override { bytes_opt value = _t->bind_and_get(params._options); auto cell = value ? params.make_cell(*value) : params.make_dead_cell(); if (column.is_static()) { diff --git a/cql3/operation.hh b/cql3/operation.hh index d9afc44466..fca7989cbc 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -104,7 +104,7 @@ public: /** * Execute the operation. */ - virtual void execute(api::mutation& m, const api::clustering_prefix& row_key, const update_parameters& params) = 0; + virtual void execute(mutation& m, const clustering_prefix& row_key, const update_parameters& params) = 0; /** * A parsed raw UPDATE operation. diff --git a/cql3/statements/delete_statement.cc b/cql3/statements/delete_statement.cc index 3a84877d2d..ed9250c203 100644 --- a/cql3/statements/delete_statement.cc +++ b/cql3/statements/delete_statement.cc @@ -28,7 +28,7 @@ namespace cql3 { namespace statements { -void delete_statement::add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) { +void delete_statement::add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) { if (_column_operations.empty()) { m.p.apply_delete(prefix, params.make_tombstone()); return; diff --git a/cql3/statements/delete_statement.hh b/cql3/statements/delete_statement.hh index d2f9a289cf..219d6ad0a6 100644 --- a/cql3/statements/delete_statement.hh +++ b/cql3/statements/delete_statement.hh @@ -62,7 +62,7 @@ public: return false; } - virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override; + virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override; #if 0 protected void validateWhereClauseForConditions() throws InvalidRequestException diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 178ea865a9..608c96ce81 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -50,13 +50,13 @@ operator<<(std::ostream& out, modification_statement::statement_type t) { return out; } -future> +future> modification_statement::get_mutations(const query_options& options, bool local, int64_t now) { auto keys = make_lw_shared(build_partition_keys(options)); auto prefix = make_lw_shared(create_clustering_prefix(options)); return make_update_parameters(keys, prefix, options, local, now).then( [this, keys = std::move(keys), prefix = std::move(prefix), now] (auto params_ptr) { - std::vector mutations; + std::vector mutations; mutations.reserve(keys->size()); for (auto key : *keys) { validation::validate_cql_key(s, key); @@ -70,8 +70,8 @@ modification_statement::get_mutations(const query_options& options, bool local, future> modification_statement::make_update_parameters( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, const query_options& options, bool local, int64_t now) { @@ -87,8 +87,8 @@ modification_statement::make_update_parameters( future modification_statement::read_required_rows( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, bool local, db::consistency_level cl) { if (!requires_read()) { @@ -148,7 +148,7 @@ modification_statement::get_first_empty_key() { return {}; } -api::clustering_prefix +clustering_prefix modification_statement::create_clustering_prefix_internal(const query_options& options) { std::vector components; const column_definition* first_empty_key = nullptr; @@ -184,7 +184,7 @@ modification_statement::create_clustering_prefix_internal(const query_options& o return components; } -api::clustering_prefix +clustering_prefix modification_statement::create_clustering_prefix(const query_options& options) { // If the only updated/deleted columns are static, then we don't need clustering columns. // And in fact, unless it is an INSERT, we reject if clustering columns are provided as that @@ -222,9 +222,9 @@ modification_statement::create_clustering_prefix(const query_options& options) { return create_clustering_prefix_internal(options); } -std::vector +std::vector modification_statement::build_partition_keys(const query_options& options) { - std::vector result; + std::vector result; std::vector components; auto remaining = s->partition_key.size(); @@ -244,7 +244,7 @@ modification_statement::build_partition_keys(const query_options& options) { throw exceptions::invalid_request_exception(sprint("Invalid null value for partition key part %s", def.name_as_text())); } components.push_back(val); - api::partition_key key = serialize_value(*s->partition_key_type, components); + partition_key key = serialize_value(*s->partition_key_type, components); validation::validate_cql_key(s, key); result.push_back(key); } else { @@ -256,7 +256,7 @@ modification_statement::build_partition_keys(const query_options& options) { full_components.reserve(components.size() + 1); auto i = std::copy(components.begin(), components.end(), std::back_inserter(full_components)); *i = val; - api::partition_key key = serialize_value(*s->partition_key_type, full_components); + partition_key key = serialize_value(*s->partition_key_type, full_components); validation::validate_cql_key(s, key); result.push_back(key); } diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index d788f01dc4..e9f65ace5c 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -160,7 +160,7 @@ public: virtual bool require_full_clustering_key() const = 0; - virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) = 0; + virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) = 0; virtual int get_bound_terms() override { return _bound_terms; @@ -267,11 +267,11 @@ private: public: void add_key_value(column_definition& def, ::shared_ptr value); void process_where_clause(std::vector where_clause, ::shared_ptr names); - std::vector build_partition_keys(const query_options& options); + std::vector build_partition_keys(const query_options& options); private: - api::clustering_prefix create_clustering_prefix(const query_options& options); - api::clustering_prefix create_clustering_prefix_internal(const query_options& options); + clustering_prefix create_clustering_prefix(const query_options& options); + clustering_prefix create_clustering_prefix_internal(const query_options& options); protected: const column_definition* get_first_empty_key(); @@ -285,8 +285,8 @@ public: protected: future read_required_rows( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, bool local, db::consistency_level cl); @@ -435,12 +435,12 @@ private: * @return vector of the mutations * @throws invalid_request_exception on invalid requests */ - future> get_mutations(const query_options& options, bool local, int64_t now); + future> get_mutations(const query_options& options, bool local, int64_t now); public: future> make_update_parameters( - lw_shared_ptr> keys, - lw_shared_ptr prefix, + lw_shared_ptr> keys, + lw_shared_ptr prefix, const query_options& options, bool local, int64_t now); diff --git a/cql3/statements/update_statement.cc b/cql3/statements/update_statement.cc index 189509b0d0..4501ebd348 100644 --- a/cql3/statements/update_statement.cc +++ b/cql3/statements/update_statement.cc @@ -31,7 +31,7 @@ namespace cql3 { namespace statements { -void update_statement::add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) { +void update_statement::add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) { if (s->is_dense()) { throw std::runtime_error("Dense tables not supported yet"); #if 0 diff --git a/cql3/statements/update_statement.hh b/cql3/statements/update_statement.hh index c9a37b24c6..9519603474 100644 --- a/cql3/statements/update_statement.hh +++ b/cql3/statements/update_statement.hh @@ -74,7 +74,7 @@ private: return true; } - virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override; + virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override; class parsed_insert : public modification_statement::parsed { private: diff --git a/cql3/update_parameters.hh b/cql3/update_parameters.hh index a641a8567b..ab6aa808ac 100644 --- a/cql3/update_parameters.hh +++ b/cql3/update_parameters.hh @@ -36,8 +36,7 @@ namespace cql3 { class update_parameters final { public: using prefetched_rows_type = std::experimental::optional< - std::unordered_map>; + std::unordered_map>; private: const gc_clock::duration _ttl; const prefetched_rows_type _prefetched; // For operation that require a read-before-write @@ -64,19 +63,19 @@ public: } } - api::atomic_cell make_dead_cell() const { - return api::atomic_cell{_timestamp, api::atomic_cell::dead{_local_deletion_time}}; + atomic_cell make_dead_cell() const { + return atomic_cell{_timestamp, atomic_cell::dead{_local_deletion_time}}; } - api::atomic_cell make_cell(bytes value) const { + atomic_cell make_cell(bytes value) const { auto ttl = _ttl; if (ttl.count() <= 0) { ttl = _schema->default_time_to_live; } - return api::atomic_cell{_timestamp, - api::atomic_cell::live{ttl.count() > 0 ? ttl_opt{_local_deletion_time + ttl} : ttl_opt{}, std::move(value)}}; + return atomic_cell{_timestamp, + atomic_cell::live{ttl.count() > 0 ? ttl_opt{_local_deletion_time + ttl} : ttl_opt{}, std::move(value)}}; }; #if 0 @@ -87,7 +86,7 @@ public: } #endif - api::tombstone make_tombstone() const { + tombstone make_tombstone() const { return {_timestamp, _local_deletion_time}; } diff --git a/db/api.hh b/db/api.hh index a81eae1cee..a067f0b1ab 100644 --- a/db/api.hh +++ b/db/api.hh @@ -13,8 +13,6 @@ #include "database.hh" #include "db/consistency_level.hh" -namespace api { - using partition_key_type = tuple_type<>; using clustering_key_type = tuple_type<>; using clustering_prefix_type = tuple_prefix; @@ -22,27 +20,31 @@ using partition_key = bytes; using clustering_key = bytes; using clustering_prefix = clustering_prefix_type::value_type; +namespace api { + using timestamp_type = int64_t; timestamp_type constexpr missing_timestamp = std::numeric_limits::min(); timestamp_type constexpr min_timestamp = std::numeric_limits::min() + 1; timestamp_type constexpr max_timestamp = std::numeric_limits::max(); +} + /** * Represents deletion operation. Can be commuted with other tombstones via apply() method. * Can be empty. * */ struct tombstone final { - timestamp_type timestamp; + api::timestamp_type timestamp; gc_clock::time_point ttl; - tombstone(timestamp_type timestamp, gc_clock::time_point ttl) + tombstone(api::timestamp_type timestamp, gc_clock::time_point ttl) : timestamp(timestamp) , ttl(ttl) { } tombstone() - : tombstone(missing_timestamp, {}) + : tombstone(api::missing_timestamp, {}) { } bool operator<(const tombstone& t) const { @@ -54,7 +56,7 @@ struct tombstone final { } operator bool() const { - return timestamp != missing_timestamp; + return timestamp != api::missing_timestamp; } void apply(const tombstone& t) { @@ -179,5 +181,3 @@ public: row[def.id] = std::move(value); } }; - -} diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 92d220187c..f2f8bc1ae0 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -475,7 +475,7 @@ namespace service { * @param consistency_level the consistency level for the operation */ future<> -storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { +storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { throw std::runtime_error("NOT IMPLEMENTED"); #if 0 Tracing.trace("Determining replicas for mutation"); @@ -559,7 +559,7 @@ storage_proxy::mutate(std::vector mutations, db::consistency_leve } future<> -storage_proxy::mutate_with_triggers(std::vector mutations, db::consistency_level cl, +storage_proxy::mutate_with_triggers(std::vector mutations, db::consistency_level cl, bool should_mutate_atomically) { unimplemented::triggers(); #if 0 @@ -587,7 +587,7 @@ storage_proxy::mutate_with_triggers(std::vector mutations, db::co * @param consistency_level the consistency level for the operation */ future<> -storage_proxy::mutate_atomically(std::vector mutations, db::consistency_level cl) { +storage_proxy::mutate_atomically(std::vector mutations, db::consistency_level cl) { unimplemented::lwt(); #if 0 Tracing.trace("Determining replicas for atomic batch"); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index b04255e8bb..b56ad6f105 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -39,9 +39,9 @@ public: * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - static future<> mutate(std::vector mutations, db::consistency_level cl); + static future<> mutate(std::vector mutations, db::consistency_level cl); - static future<> mutate_with_triggers(std::vector mutations, + static future<> mutate_with_triggers(std::vector mutations, db::consistency_level cl, bool should_mutate_atomically); /** @@ -53,7 +53,7 @@ public: * @param mutations the Mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - static future<> mutate_atomically(std::vector mutations, db::consistency_level cl); + static future<> mutate_atomically(std::vector mutations, db::consistency_level cl); }; } diff --git a/validation.cc b/validation.cc index 38de99703b..7963b3c2e6 100644 --- a/validation.cc +++ b/validation.cc @@ -31,7 +31,7 @@ namespace validation { * Based on org.apache.cassandra.thrift.ThriftValidation#validate_key() */ void -validate_cql_key(schema_ptr schema, const api::partition_key& key) { +validate_cql_key(schema_ptr schema, const partition_key& key) { if (key.empty()) { throw exceptions::invalid_request_exception("Key may not be empty"); } diff --git a/validation.hh b/validation.hh index 9d625efada..078f0b0d56 100644 --- a/validation.hh +++ b/validation.hh @@ -31,7 +31,7 @@ namespace validation { constexpr size_t max_key_size = std::numeric_limits::max(); -void validate_cql_key(schema_ptr schema, const api::partition_key& key); +void validate_cql_key(schema_ptr schema, const partition_key& key); schema_ptr validate_column_family(database& db, const sstring& keyspace_name, const sstring& cf_name); } From 48c11a01db3415e90954db19684e399271093c1d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 5 Feb 2015 19:29:26 +0100 Subject: [PATCH 18/36] db: Add ability to apply mutations into the database For simplicity partition data is stored using the same object which is used for mutations: mutation_partition. Later we can introduce a more efficient version. --- database.cc | 28 ++++++++++++---------------- database.hh | 23 +++++++---------------- db/api.hh | 8 +++----- thrift/handler.cc | 39 ++++++++++++++++++++++++++++----------- 4 files changed, 50 insertions(+), 48 deletions(-) diff --git a/database.cc b/database.cc index 90b073fd6b..e4d2f8aa93 100644 --- a/database.cc +++ b/database.cc @@ -10,10 +10,6 @@ thread_local logging::logger dblog("database"); -partition::partition(column_family& cf) - : rows(key_compare(cf._schema->clustering_key_type)) { -} - template std::vector<::shared_ptr> get_column_types(const Sequence& column_definitions) { @@ -78,7 +74,7 @@ column_family::column_family(schema_ptr schema) , partitions(key_compare(_schema->thrift.partition_key_type)) { } -partition* +mutation_partition* column_family::find_partition(const bytes& key) { auto i = partitions.find(key); return i == partitions.end() ? nullptr : &i->second; @@ -86,33 +82,28 @@ column_family::find_partition(const bytes& key) { row* column_family::find_row(const bytes& partition_key, const bytes& clustering_key) { - partition* p = find_partition(partition_key); + mutation_partition* p = find_partition(partition_key); if (!p) { return nullptr; } - auto i = p->rows.find(clustering_key); - return i == p->rows.end() ? nullptr : &i->second; + return p->find_row(clustering_key); } -partition& +mutation_partition& column_family::find_or_create_partition(const bytes& key) { // call lower_bound so we have a hint for the insert, just in case. auto i = partitions.lower_bound(key); if (i == partitions.end() || key != i->first) { - i = partitions.emplace_hint(i, std::make_pair(std::move(key), partition(*this))); + i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema))); } return i->second; } row& column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) { - partition& p = find_or_create_partition(partition_key); + mutation_partition& p = find_or_create_partition(partition_key); // call lower_bound so we have a hint for the insert, just in case. - auto i = p.rows.lower_bound(clustering_key); - if (i == p.rows.end() || clustering_key != i->first) { - i = p.rows.emplace_hint(i, std::make_pair(std::move(clustering_key), row())); - } - return i->second; + return p.clustered_row(clustering_key); } sstring to_hex(const bytes& b) { @@ -255,6 +246,11 @@ database::find_keyspace(sstring name) { return nullptr; } +void column_family::apply(const mutation& m) { + mutation_partition& p = find_or_create_partition(m.key); + p.apply(m.p); +} + // Based on org.apache.cassandra.db.AbstractCell#reconcile() static inline int diff --git a/database.hh b/database.hh index 3d1b59f298..6cab430c82 100644 --- a/database.hh +++ b/database.hh @@ -27,32 +27,23 @@ #include "tuple.hh" #include "core/future.hh" #include "cql3/column_specification.hh" +#include +#include +#include "db/api.hh" #include "schema.hh" -struct row; -struct paritition; struct column_family; -struct row { - std::vector cells; -}; - -struct partition { - explicit partition(column_family& cf); - row static_columns; - // row key within partition -> row - std::map rows; -}; - struct column_family { column_family(schema_ptr schema); - partition& find_or_create_partition(const bytes& key); + mutation_partition& find_or_create_partition(const bytes& key); row& find_or_create_row(const bytes& partition_key, const bytes& clustering_key); - partition* find_partition(const bytes& key); + mutation_partition* find_partition(const bytes& key); row* find_row(const bytes& partition_key, const bytes& clustering_key); schema_ptr _schema; // partition key -> partition - std::map partitions; + std::map partitions; + void apply(const mutation& m); }; class keyspace { diff --git a/db/api.hh b/db/api.hh index a067f0b1ab..b62365d59b 100644 --- a/db/api.hh +++ b/db/api.hh @@ -4,14 +4,12 @@ #pragma once -#include -#include -#include +#include +#include +#include "schema.hh" #include "db_clock.hh" #include "gc_clock.hh" -#include "database.hh" -#include "db/consistency_level.hh" using partition_key_type = tuple_type<>; using clustering_key_type = tuple_type<>; diff --git a/thrift/handler.cc b/thrift/handler.cc index 1dc32c17bc..6085b992a5 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -91,12 +91,19 @@ public: // FIXME: force limit count? while (beg != end && count--) { column_definition& def = range.reversed ? *--end : *beg++; - Column col; - col.__set_name(def.name()); - col.__set_value(rw->cells[def.id]); - ColumnOrSuperColumn v; - v.__set_column(std::move(col)); - ret.push_back(std::move(v)); + if (def.is_atomic()) { + const auto& cell = boost::any_cast((*rw)[def.id]); + if (cell.is_live()) { // FIXME: we should actually use tombstone information from all levels + Column col; + col.__set_name(def.name()); + col.__set_value(cell.as_live().value); + col.__set_timestamp(cell.timestamp); + // FIXME: set ttl + ColumnOrSuperColumn v; + v.__set_column(std::move(col)); + ret.push_back(std::move(v)); + }; + } } } } else { @@ -184,21 +191,30 @@ public: sstring cf_name = cf_mutations.first; const std::vector& mutations = cf_mutations.second; auto& cf = lookup_column_family(cf_name); - auto& row = cf.find_or_create_row(key, null_clustering_key); + mutation m_to_apply(key, cf._schema); for (const Mutation& m : mutations) { if (m.__isset.column_or_supercolumn) { auto&& cosc = m.column_or_supercolumn; if (cosc.__isset.column) { auto&& col = cosc.column; bytes cname = to_bytes(col.name); - // FIXME: use a lookup map auto def = cf._schema->get_column_definition(cname); if (!def) { throw make_exception("column %s not found", col.name); } - auto& cells = row.cells; - cells.resize(cf._schema->get_regular_columns_count()); - cells[def->id] = to_bytes(col.value); + if (def->kind != column_definition::column_kind::REGULAR) { + throw make_exception("Column %s is not settable", col.name); + } + gc_clock::duration ttl; + if (col.__isset.ttl) { + ttl = std::chrono::duration_cast(std::chrono::seconds(col.ttl)); + } + if (ttl.count() <= 0) { + ttl = cf._schema->default_time_to_live; + } + auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); + m_to_apply.set_clustered_cell(null_clustering_key, *def, + atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}}); } else if (cosc.__isset.super_column) { // FIXME: implement } else if (cosc.__isset.counter_column) { @@ -215,6 +231,7 @@ public: throw make_exception("Mutation must have either column or deletion"); } } + cf.apply(m_to_apply); } } } catch (std::exception& ex) { From e20cc1c1f9f93c656f1bb4bd37b3a305f12de04a Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 11:51:25 +0100 Subject: [PATCH 19/36] db: Avoid storing schema pointer with each partition --- cql3/statements/delete_statement.cc | 2 +- database.cc | 10 +++++----- db/api.hh | 20 +++++++++----------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/cql3/statements/delete_statement.cc b/cql3/statements/delete_statement.cc index ed9250c203..e5fa52024c 100644 --- a/cql3/statements/delete_statement.cc +++ b/cql3/statements/delete_statement.cc @@ -30,7 +30,7 @@ namespace statements { void delete_statement::add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) { if (_column_operations.empty()) { - m.p.apply_delete(prefix, params.make_tombstone()); + m.p.apply_delete(s, prefix, params.make_tombstone()); return; } diff --git a/database.cc b/database.cc index e4d2f8aa93..5a75c423d2 100644 --- a/database.cc +++ b/database.cc @@ -248,7 +248,7 @@ database::find_keyspace(sstring name) { void column_family::apply(const mutation& m) { mutation_partition& p = find_or_create_partition(m.key); - p.apply(m.p); + p.apply(_schema, m.p); } // Based on org.apache.cassandra.db.AbstractCell#reconcile() @@ -287,14 +287,14 @@ compare_for_merge(const column_definition& def, } } -void mutation_partition::apply(const mutation_partition& p) { +void mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { _tombstone.apply(p._tombstone); for (auto&& entry : p._row_tombstones) { - apply_row_tombstone(entry.first, entry.second); + apply_row_tombstone(schema, entry.first, entry.second); } - auto merge_cells = [this] (row& old_row, const row& new_row) { + auto merge_cells = [this, schema] (row& old_row, const row& new_row) { for (auto&& new_column : new_row) { auto col = new_column.first; auto i = old_row.find(col); @@ -302,7 +302,7 @@ void mutation_partition::apply(const mutation_partition& p) { _static_row.emplace_hint(i, new_column); } else { auto& old_column = *i; - auto& def = _schema->regular_column_at(col); + auto& def = schema->regular_column_at(col); if (compare_for_merge(def, old_column, new_column) < 0) { old_column.second = new_column.second; } diff --git a/db/api.hh b/db/api.hh index b62365d59b..4da7ebabf5 100644 --- a/db/api.hh +++ b/db/api.hh @@ -94,40 +94,38 @@ using row_tombstone_set = std::map; class mutation_partition final { private: - schema_ptr _schema; tombstone _tombstone; row _static_row; std::map _rows; row_tombstone_set _row_tombstones; public: mutation_partition(schema_ptr s) - : _schema(std::move(s)) - , _rows(key_compare(_schema->clustering_key_type)) - , _row_tombstones(serialized_compare(_schema->clustering_key_prefix_type)) + : _rows(key_compare(s->clustering_key_type)) + , _row_tombstones(serialized_compare(s->clustering_key_prefix_type)) { } void apply(tombstone t) { _tombstone.apply(t); } - void apply_delete(const clustering_prefix& prefix, tombstone t) { + void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t) { if (prefix.empty()) { apply(t); - } else if (prefix.size() == _schema->clustering_key.size()) { - _rows[serialize_value(*_schema->clustering_key_type, prefix)].t.apply(t); + } else if (prefix.size() == schema->clustering_key.size()) { + _rows[serialize_value(*schema->clustering_key_type, prefix)].t.apply(t); } else { - apply_row_tombstone(serialize_value(*_schema->clustering_key_prefix_type, prefix), t); + apply_row_tombstone(schema, serialize_value(*schema->clustering_key_prefix_type, prefix), t); } } - void apply_row_tombstone(const bytes& prefix, const tombstone& t) { + void apply_row_tombstone(schema_ptr schema, const bytes& prefix, const tombstone& t) { auto i = _row_tombstones.lower_bound(prefix); - if (i == _row_tombstones.end() || !_schema->clustering_key_prefix_type->equal(prefix, i->first) || t > i->second) { + if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first) || t > i->second) { _row_tombstones.insert(i, {prefix, t}); } } - void apply(const mutation_partition& p); + void apply(schema_ptr schema, const mutation_partition& p); row& static_row() { return _static_row; From d5a7f37c450567ff12dc402e087f9f917dbf2a68 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 12:36:53 +0100 Subject: [PATCH 20/36] db: Merge api.hh into database.hh --- cql3/operation.hh | 2 +- cql3/query_options.hh | 2 +- cql3/statements/delete_statement.hh | 2 +- cql3/statements/update_statement.hh | 2 +- cql3/update_parameters.hh | 2 +- database.hh | 168 +++++++++++++++++++++++++- db/api.hh | 179 ---------------------------- service/storage_proxy.hh | 2 +- validation.hh | 2 +- 9 files changed, 173 insertions(+), 188 deletions(-) delete mode 100644 db/api.hh diff --git a/cql3/operation.hh b/cql3/operation.hh index fca7989cbc..16d5f36cd6 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -27,7 +27,7 @@ #include "core/shared_ptr.hh" -#include "db/api.hh" +#include "database.hh" #include diff --git a/cql3/query_options.hh b/cql3/query_options.hh index 7bcb604ef3..01f2106628 100644 --- a/cql3/query_options.hh +++ b/cql3/query_options.hh @@ -25,7 +25,7 @@ #ifndef CQL3_CQL_QUERY_OPTIONS_HH #define CQL3_CQL_QUERY_OPTIONS_HH -#include "db/api.hh" +#include "database.hh" #include "db/consistency_level.hh" #include "service/query_state.hh" diff --git a/cql3/statements/delete_statement.hh b/cql3/statements/delete_statement.hh index 219d6ad0a6..975bd0e75c 100644 --- a/cql3/statements/delete_statement.hh +++ b/cql3/statements/delete_statement.hh @@ -27,7 +27,7 @@ #include "cql3/statements/modification_statement.hh" #include "cql3/attributes.hh" #include "cql3/operation.hh" -#include "db/api.hh" +#include "database.hh" namespace cql3 { diff --git a/cql3/statements/update_statement.hh b/cql3/statements/update_statement.hh index 9519603474..d2cad8c09d 100644 --- a/cql3/statements/update_statement.hh +++ b/cql3/statements/update_statement.hh @@ -29,7 +29,7 @@ #include "cql3/column_identifier.hh" #include "cql3/term.hh" -#include "db/api.hh" +#include "database.hh" #include #include "unimplemented.hh" diff --git a/cql3/update_parameters.hh b/cql3/update_parameters.hh index ab6aa808ac..609e27f7cb 100644 --- a/cql3/update_parameters.hh +++ b/cql3/update_parameters.hh @@ -25,7 +25,7 @@ #ifndef CQL3_UPDATE_PARAMETERS_HH #define CQL3_UPDATE_PARAMETERS_HH -#include "db/api.hh" +#include "database.hh" #include "exceptions/exceptions.hh" namespace cql3 { diff --git a/database.hh b/database.hh index 6cab430c82..a2fc65ffd0 100644 --- a/database.hh +++ b/database.hh @@ -29,10 +29,174 @@ #include "cql3/column_specification.hh" #include #include -#include "db/api.hh" #include "schema.hh" -struct column_family; +using partition_key_type = tuple_type<>; +using clustering_key_type = tuple_type<>; +using clustering_prefix_type = tuple_prefix; +using partition_key = bytes; +using clustering_key = bytes; +using clustering_prefix = clustering_prefix_type::value_type; + +namespace api { + +using timestamp_type = int64_t; +timestamp_type constexpr missing_timestamp = std::numeric_limits::min(); +timestamp_type constexpr min_timestamp = std::numeric_limits::min() + 1; +timestamp_type constexpr max_timestamp = std::numeric_limits::max(); + +} + +/** +* Represents deletion operation. Can be commuted with other tombstones via apply() method. +* Can be empty. +* +*/ +struct tombstone final { + api::timestamp_type timestamp; + gc_clock::time_point ttl; + + tombstone(api::timestamp_type timestamp, gc_clock::time_point ttl) + : timestamp(timestamp) + , ttl(ttl) + { } + + tombstone() + : tombstone(api::missing_timestamp, {}) + { } + + bool operator<(const tombstone& t) const { + return timestamp < t.timestamp || ttl < t.ttl; + } + + bool operator==(const tombstone& t) const { + return timestamp == t.timestamp && ttl == t.ttl; + } + + operator bool() const { + return timestamp != api::missing_timestamp; + } + + void apply(const tombstone& t) { + if (*this < t) { + *this = t; + } + } +}; + +using ttl_opt = std::experimental::optional; + +struct atomic_cell final { + struct dead { + gc_clock::time_point ttl; + }; + struct live { + ttl_opt ttl; + bytes value; + }; + api::timestamp_type timestamp; + boost::variant value; + bool is_live() const { return value.which() == 1; } + // Call only when is_live() == true + const live& as_live() const { return boost::get(value); } + // Call only when is_live() == false + const dead& as_dead() const { return boost::get(value); } +}; + +using row = std::map; + +struct deletable_row final { + tombstone t; + row cells; +}; + +using row_tombstone_set = std::map; + +class mutation_partition final { +private: + tombstone _tombstone; + row _static_row; + std::map _rows; + row_tombstone_set _row_tombstones; +public: + mutation_partition(schema_ptr s) + : _rows(key_compare(s->clustering_key_type)) + , _row_tombstones(serialized_compare(s->clustering_key_prefix_type)) + { } + + void apply(tombstone t) { + _tombstone.apply(t); + } + + void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t) { + if (prefix.empty()) { + apply(t); + } else if (prefix.size() == schema->clustering_key.size()) { + _rows[serialize_value(*schema->clustering_key_type, prefix)].t.apply(t); + } else { + apply_row_tombstone(schema, serialize_value(*schema->clustering_key_prefix_type, prefix), t); + } + } + + void apply_row_tombstone(schema_ptr schema, const bytes& prefix, const tombstone& t) { + auto i = _row_tombstones.lower_bound(prefix); + if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first) || t > i->second) { + _row_tombstones.insert(i, {prefix, t}); + } + } + + void apply(schema_ptr schema, const mutation_partition& p); + + row& static_row() { + return _static_row; + } + + row& clustered_row(const clustering_key& key) { + return _rows[key].cells; + } + + row& clustered_row(clustering_key&& key) { + return _rows[std::move(key)].cells; + } + + row* find_row(const clustering_key& key) { + auto i = _rows.find(key); + if (i == _rows.end()) { + return nullptr; + } + return &i->second.cells; + } +}; + +class mutation final { +public: + schema_ptr schema; + partition_key key; + mutation_partition p; +public: + mutation(partition_key key_, schema_ptr schema_) + : schema(std::move(schema_)) + , key(std::move(key_)) + , p(schema) + { } + + mutation(mutation&&) = default; + mutation(const mutation&) = delete; + + void set_static_cell(const column_definition& def, boost::any value) { + p.static_row()[def.id] = std::move(value); + } + + void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, boost::any value) { + auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); + row[def.id] = std::move(value); + } + + void set_clustered_cell(const clustering_key& key, const column_definition& def, boost::any value) { + auto& row = p.clustered_row(key); + row[def.id] = std::move(value); + } +}; struct column_family { column_family(schema_ptr schema); diff --git a/db/api.hh b/db/api.hh deleted file mode 100644 index 4da7ebabf5..0000000000 --- a/db/api.hh +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2015 Cloudius Systems - */ - -#pragma once - -#include -#include - -#include "schema.hh" -#include "db_clock.hh" -#include "gc_clock.hh" - -using partition_key_type = tuple_type<>; -using clustering_key_type = tuple_type<>; -using clustering_prefix_type = tuple_prefix; -using partition_key = bytes; -using clustering_key = bytes; -using clustering_prefix = clustering_prefix_type::value_type; - -namespace api { - -using timestamp_type = int64_t; -timestamp_type constexpr missing_timestamp = std::numeric_limits::min(); -timestamp_type constexpr min_timestamp = std::numeric_limits::min() + 1; -timestamp_type constexpr max_timestamp = std::numeric_limits::max(); - -} - -/** - * Represents deletion operation. Can be commuted with other tombstones via apply() method. - * Can be empty. - * - */ -struct tombstone final { - api::timestamp_type timestamp; - gc_clock::time_point ttl; - - tombstone(api::timestamp_type timestamp, gc_clock::time_point ttl) - : timestamp(timestamp) - , ttl(ttl) - { } - - tombstone() - : tombstone(api::missing_timestamp, {}) - { } - - bool operator<(const tombstone& t) const { - return timestamp < t.timestamp || ttl < t.ttl; - } - - bool operator==(const tombstone& t) const { - return timestamp == t.timestamp && ttl == t.ttl; - } - - operator bool() const { - return timestamp != api::missing_timestamp; - } - - void apply(const tombstone& t) { - if (*this < t) { - *this = t; - } - } -}; - -using ttl_opt = std::experimental::optional; - -struct atomic_cell final { - struct dead { - gc_clock::time_point ttl; - }; - struct live { - ttl_opt ttl; - bytes value; - }; - api::timestamp_type timestamp; - boost::variant value; - bool is_live() const { return value.which() == 1; } - // Call only when is_live() == true - const live& as_live() const { return boost::get(value); } - // Call only when is_live() == false - const dead& as_dead() const { return boost::get(value); } -}; - -using row = std::map; - -struct deletable_row final { - tombstone t; - row cells; -}; - -using row_tombstone_set = std::map; - -class mutation_partition final { -private: - tombstone _tombstone; - row _static_row; - std::map _rows; - row_tombstone_set _row_tombstones; -public: - mutation_partition(schema_ptr s) - : _rows(key_compare(s->clustering_key_type)) - , _row_tombstones(serialized_compare(s->clustering_key_prefix_type)) - { } - - void apply(tombstone t) { - _tombstone.apply(t); - } - - void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t) { - if (prefix.empty()) { - apply(t); - } else if (prefix.size() == schema->clustering_key.size()) { - _rows[serialize_value(*schema->clustering_key_type, prefix)].t.apply(t); - } else { - apply_row_tombstone(schema, serialize_value(*schema->clustering_key_prefix_type, prefix), t); - } - } - - void apply_row_tombstone(schema_ptr schema, const bytes& prefix, const tombstone& t) { - auto i = _row_tombstones.lower_bound(prefix); - if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first) || t > i->second) { - _row_tombstones.insert(i, {prefix, t}); - } - } - - void apply(schema_ptr schema, const mutation_partition& p); - - row& static_row() { - return _static_row; - } - - row& clustered_row(const clustering_key& key) { - return _rows[key].cells; - } - - row& clustered_row(clustering_key&& key) { - return _rows[std::move(key)].cells; - } - - row* find_row(const clustering_key& key) { - auto i = _rows.find(key); - if (i == _rows.end()) { - return nullptr; - } - return &i->second.cells; - } -}; - -class mutation final { -public: - schema_ptr schema; - partition_key key; - mutation_partition p; -public: - mutation(partition_key key_, schema_ptr schema_) - : schema(std::move(schema_)) - , key(std::move(key_)) - , p(schema) - { } - - mutation(mutation&&) = default; - mutation(const mutation&) = delete; - - void set_static_cell(const column_definition& def, boost::any value) { - p.static_row()[def.id] = std::move(value); - } - - void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, boost::any value) { - auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); - row[def.id] = std::move(value); - } - - void set_clustered_cell(const clustering_key& key, const column_definition& def, boost::any value) { - auto& row = p.clustered_row(key); - row[def.id] = std::move(value); - } -}; diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index b56ad6f105..5b620c5b2d 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -24,7 +24,7 @@ #pragma once -#include "db/api.hh" +#include "database.hh" namespace service { diff --git a/validation.hh b/validation.hh index 078f0b0d56..f19e7723f0 100644 --- a/validation.hh +++ b/validation.hh @@ -25,7 +25,7 @@ #pragma once #include "database.hh" -#include "db/api.hh" +#include "database.hh" namespace validation { From 92c95c6c816419e633fe5b24ba9c2882c34386b2 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 11:59:29 +0100 Subject: [PATCH 21/36] types: Add tuple_type::decompose_value() shorthand method --- tuple.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tuple.hh b/tuple.hh index 2ebbe9c39c..79f7e4fa86 100644 --- a/tuple.hh +++ b/tuple.hh @@ -50,6 +50,9 @@ public: } } } + bytes decompose_value(const value_type& values) { + return ::serialize_value(*this, values); + } value_type deserialize_value(std::istream& in) { std::vector result; result.reserve(types.size()); From f1c9f64a25c0e66b90c358537b78d841b737a575 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 12:23:59 +0100 Subject: [PATCH 22/36] types: Implement string conversions for int32_type and string_type --- tuple.hh | 6 +++++ types.cc | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ types.hh | 8 ++---- 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/tuple.hh b/tuple.hh index 79f7e4fa86..6377062853 100644 --- a/tuple.hh +++ b/tuple.hh @@ -153,6 +153,12 @@ public: // TODO: make the length byte-order comparable by adding numeric_limits::min() when serializing return false; } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; using tuple_prefix = tuple_type; diff --git a/types.cc b/types.cc index 53f77e07ff..82ad5fec09 100644 --- a/types.cc +++ b/types.cc @@ -2,6 +2,7 @@ * Copyright (C) 2015 Cloudius Systems, Ltd. */ +#include #include "types.hh" template @@ -54,6 +55,33 @@ struct int32_type_impl : simple_type_impl { auto v = int32_t(net::ntoh(u)); return boost::any(v); } + int32_t compose_value(const bytes& b) { + if (b.size() != 4) { + throw marshal_exception(); + } + return (int32_t)net::ntoh(*reinterpret_cast(b.begin())); + } + bytes decompose_value(int32_t v) { + bytes b(bytes::initialized_later(), sizeof(v)); + *reinterpret_cast(b.begin()) = (int32_t)net::hton((uint32_t)v); + return b; + } + int32_t parse_int(const sstring& s) { + try { + return boost::lexical_cast(s); + } catch (const boost::bad_lexical_cast& e) { + throw marshal_exception(sprint("Invalid number format '%s'", s)); + } + } + virtual bytes from_string(const sstring& s) override { + return decompose_value(parse_int(s)); + } + virtual sstring to_string(const bytes& b) override { + if (b.empty()) { + return {}; + } + return to_sstring(compose_value(b)); + } }; struct long_type_impl : simple_type_impl { @@ -75,6 +103,12 @@ struct long_type_impl : simple_type_impl { auto v = int64_t(net::ntoh(u)); return boost::any(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct string_type_impl : public abstract_type { @@ -101,6 +135,12 @@ struct string_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + return to_bytes(s); + } + virtual sstring to_string(const bytes& b) override { + return sstring(b); + } }; struct bytes_type_impl : public abstract_type { @@ -126,6 +166,12 @@ struct bytes_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct boolean_type_impl : public simple_type_impl { @@ -143,6 +189,12 @@ struct boolean_type_impl : public simple_type_impl { } return boost::any(tmp != 0); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct date_type_impl : public abstract_type { @@ -174,6 +226,12 @@ struct date_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct timeuuid_type_impl : public abstract_type { @@ -214,6 +272,12 @@ struct timeuuid_type_impl : public abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } private: static int compare_bytes(const bytes& o1, const bytes& o2) { auto compare_pos = [&] (unsigned pos, int mask, int ifequal) { @@ -251,6 +315,12 @@ struct timestamp_type_impl : simple_type_impl { return boost::any(db_clock::time_point(db_clock::duration(net::ntoh(v)))); } // FIXME: isCompatibleWith(timestampuuid) + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; struct uuid_type_impl : abstract_type { @@ -302,6 +372,12 @@ struct uuid_type_impl : abstract_type { virtual size_t hash(const bytes& v) override { return std::hash()(v); } + virtual bytes from_string(const sstring& s) override { + throw std::runtime_error("not implemented"); + } + virtual sstring to_string(const bytes& b) override { + throw std::runtime_error("not implemented"); + } }; thread_local shared_ptr int32_type(make_shared()); diff --git a/types.hh b/types.hh index c7297abb83..3a11bb3d2c 100644 --- a/types.hh +++ b/types.hh @@ -112,12 +112,8 @@ public: validate(b); return to_string(b); } - virtual sstring to_string(const bytes& b) { - throw std::runtime_error("not implemented"); - } - virtual bytes from_string(const sstring& text) { - throw std::runtime_error("not implemented"); - } + virtual sstring to_string(const bytes& b) = 0; + virtual bytes from_string(const sstring& text) = 0; virtual bool is_counter() { return false; } virtual bool is_collection() { return false; } virtual bool is_multi_cell() { return false; } From ee10d9b437766a3e19f3542a2934e662ee1da26e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 12:15:51 +0100 Subject: [PATCH 23/36] tests: Add test for int32_type string conversions --- configure.py | 25 +++++++++++++------- test.py | 1 + tests/urchin/types_test.cc | 48 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 tests/urchin/types_test.cc diff --git a/configure.py b/configure.py index e6873d1fd3..f6e1ffd749 100755 --- a/configure.py +++ b/configure.py @@ -99,6 +99,10 @@ modes = { }, } +urchin_tests = [ + 'tests/urchin/types_test', +] + tests = [ 'tests/test-reactor', 'tests/fileiotest', @@ -121,7 +125,7 @@ tests = [ 'tests/output_stream_test', 'tests/udp_zero_copy', 'tests/test-serialization' - ] + ] + urchin_tests apps = [ 'apps/httpd/httpd', @@ -221,9 +225,7 @@ memcache = [ cassandra_interface = Thrift(source = 'interface/cassandra.thrift', service = 'Cassandra') -deps = { - 'seastar': (['main.cc', - 'database.cc', +urchin_core = (['database.cc', 'log.cc', 'cql/binary.rl', 'cql/server.cc', @@ -245,17 +247,19 @@ deps = { 'cql3/operator.cc', 'cql3/relation.cc', 'cql3/column_identifier.cc', - 'db/db.cc', - 'io/io.cc', + 'db/db.cc', + 'io/io.cc', 'utils/utils.cc', 'utils/UUID_gen.cc', - 'gms/version_generator.cc', + 'gms/version_generator.cc', 'dht/dht.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] - + core - ), + + core) + +deps = { + 'seastar': ['main.cc'] + urchin_core, 'tests/test-reactor': ['tests/test-reactor.cc'] + core, 'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core, 'apps/memcached/memcached': ['apps/memcached/memcached.cc'] + memcache, @@ -282,6 +286,9 @@ deps = { 'tests/test-serialization': ['tests/test-serialization.cc'], } +for t in urchin_tests: + deps[t] = urchin_core + [t + '.cc'] + warnings = [ '-Wno-mismatched-tags', # clang-only ] diff --git a/test.py b/test.py index 118793504a..ee6201c7d6 100755 --- a/test.py +++ b/test.py @@ -10,6 +10,7 @@ all_tests = [ 'memcached/test_ascii_parser', 'sstring_test', 'output_stream_test', + 'urchin/types_test', ] last_len = 0 diff --git a/tests/urchin/types_test.cc b/tests/urchin/types_test.cc new file mode 100644 index 0000000000..fa94ddce3e --- /dev/null +++ b/tests/urchin/types_test.cc @@ -0,0 +1,48 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core + +#include +#include "types.hh" + +BOOST_AUTO_TEST_CASE(test_int32_type_string_conversions) { + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("1234567890"), int32_type->decompose(1234567890))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(1234567890)), "1234567890"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("12"), int32_type->decompose(12))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("0012"), int32_type->decompose(12))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("+12"), int32_type->decompose(12))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(12)), "12"); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-12"), int32_type->decompose(-12))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(-12)), "-12"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("0"), int32_type->decompose(0))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-0"), int32_type->decompose(0))); + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("+0"), int32_type->decompose(0))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(0)), "0"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-2147483648"), int32_type->decompose((int32_t)-2147483648))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose((int32_t)-2147483648)), "-2147483648"); + + BOOST_REQUIRE(int32_type->equal(int32_type->from_string("2147483647"), int32_type->decompose((int32_t)2147483647))); + BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose((int32_t)-2147483647)), "-2147483647"); + + auto test_parsing_fails = [] (sstring text) { + try { + int32_type->from_string(text); + BOOST_FAIL(sprint("Parsing of '%s' should have failed", text)); + } catch (const marshal_exception& e) { + // expected + } + }; + + test_parsing_fails("asd"); + test_parsing_fails("-2147483649"); + test_parsing_fails("2147483648"); + test_parsing_fails("2147483648123"); + + BOOST_REQUIRE_EQUAL(int32_type->to_string(bytes()), ""); +} From 28915bf67385cabe2df3a6e8dd1c8aeea11527dd Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 12:16:36 +0100 Subject: [PATCH 24/36] tests: Add test for applying mutations --- configure.py | 1 + test.py | 1 + tests/urchin/mutation_test.cc | 41 +++++++++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+) create mode 100644 tests/urchin/mutation_test.cc diff --git a/configure.py b/configure.py index f6e1ffd749..f3b1f8cbfa 100755 --- a/configure.py +++ b/configure.py @@ -100,6 +100,7 @@ modes = { } urchin_tests = [ + 'tests/urchin/mutation_test', 'tests/urchin/types_test', ] diff --git a/test.py b/test.py index ee6201c7d6..7cdd4f93fc 100755 --- a/test.py +++ b/test.py @@ -11,6 +11,7 @@ all_tests = [ 'sstring_test', 'output_stream_test', 'urchin/types_test', + 'urchin/mutation_test', ] last_len = 0 diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc new file mode 100644 index 0000000000..6d4e84b50d --- /dev/null +++ b/tests/urchin/mutation_test.cc @@ -0,0 +1,41 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core + +#include +#include "core/sstring.hh" +#include "database.hh" + +static sstring some_keyspace("ks"); +static sstring some_column_family("cf"); + +static boost::any make_atomic_cell(bytes value) { + return atomic_cell{0, atomic_cell::live{ttl_opt{}, std::move(value)}}; +}; + +BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { + auto s = make_lw_shared(some_keyspace, some_column_family, + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + + column_family cf(s); + + column_definition& r1_col = *s->get_column_definition("r1"); + partition_key key = to_bytes("key1"); + clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3))); + cf.apply(m); + + row& row = cf.find_or_create_row(key, c_key); + auto& cell = boost::any_cast(row[r1_col.id]); + BOOST_REQUIRE(cell.is_live()); + BOOST_REQUIRE(int32_type->equal(cell.as_live().value, int32_type->decompose(3))); +} From 9530a372ccaf21345bbf47a4be6dfd22275d07b6 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 12:56:56 +0100 Subject: [PATCH 25/36] cql3: Take reference to storage_proxy and call instance methods --- cql3/cql_statement.hh | 5 +++-- cql3/statements/modification_statement.cc | 14 +++++++------- cql3/statements/modification_statement.hh | 8 ++++---- cql3/statements/schema_altering_statement.hh | 4 ++-- cql3/statements/truncate_statement.hh | 4 ++-- cql3/statements/use_statement.hh | 4 ++-- service/storage_proxy.hh | 9 +++++---- 7 files changed, 25 insertions(+), 23 deletions(-) diff --git a/cql3/cql_statement.hh b/cql3/cql_statement.hh index 8ddde8477e..1c6de1a138 100644 --- a/cql3/cql_statement.hh +++ b/cql3/cql_statement.hh @@ -28,6 +28,7 @@ #include "transport/messages/result_message.hh" #include "service/client_state.hh" #include "service/query_state.hh" +#include "service/storage_proxy.hh" #include "cql3/query_options.hh" #include "database.hh" @@ -62,7 +63,7 @@ public: * @param options options for this query (consistency, variables, pageSize, ...) */ virtual future> - execute(service::query_state& state, const query_options& options) = 0; + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) = 0; /** * Variant of execute used for internal query against the system tables, and thus only query the local node = 0. @@ -70,7 +71,7 @@ public: * @param state the current query state */ virtual future> - execute_internal(service::query_state& state, const query_options& options) = 0; + execute_internal(database& db, service::query_state& state, const query_options& options) = 0; virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const = 0; }; diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index 608c96ce81..da880e7ae7 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -278,23 +278,23 @@ modification_statement::build_partition_keys(const query_options& options) { } future> -modification_statement::execute(service::query_state& qs, const query_options& options) { +modification_statement::execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) { if (has_conditions() && options.get_protocol_version() == 1) { throw new exceptions::invalid_request_exception("Conditional updates are not supported by the protocol version in use. You need to upgrade to a driver using the native protocol v2."); } if (has_conditions()) { - return execute_with_condition(qs, options); + return execute_with_condition(proxy, qs, options); } - return execute_without_condition(qs, options).then([] { + return execute_without_condition(proxy, qs, options).then([] { return make_ready_future>( std::experimental::optional{}); }); } future<> -modification_statement::execute_without_condition(service::query_state& qs, const query_options& options) { +modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) { auto cl = options.get_consistency(); if (is_counter()) { db::validate_counter_for_write(s, cl); @@ -302,16 +302,16 @@ modification_statement::execute_without_condition(service::query_state& qs, cons db::validate_for_write(s->ks_name, cl); } - return get_mutations(options, false, options.get_timestamp(qs)).then([cl] (auto mutations) { + return get_mutations(options, false, options.get_timestamp(qs)).then([cl, &proxy] (auto mutations) { if (mutations.empty()) { return now(); } - return service::storage_proxy::mutate_with_triggers(std::move(mutations), cl, false); + return proxy.mutate_with_triggers(std::move(mutations), cl, false); }); } future> -modification_statement::execute_with_condition(service::query_state& qs, const query_options& options) { +modification_statement::execute_with_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) { unimplemented::lwt(); #if 0 List keys = buildPartitionKeyNames(options); diff --git a/cql3/statements/modification_statement.hh b/cql3/statements/modification_statement.hh index e9f65ace5c..dc01ea8be9 100644 --- a/cql3/statements/modification_statement.hh +++ b/cql3/statements/modification_statement.hh @@ -296,19 +296,19 @@ public: } virtual future> - execute(service::query_state& qs, const query_options& options) override; + execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) override; virtual future> - execute_internal(service::query_state& qs, const query_options& options) override { + execute_internal(database& db, service::query_state& qs, const query_options& options) override { throw std::runtime_error("not implemented"); } private: future<> - execute_without_condition(service::query_state& qs, const query_options& options); + execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options); future> - execute_with_condition(service::query_state& qs, const query_options& options); + execute_with_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options); #if 0 public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException diff --git a/cql3/statements/schema_altering_statement.hh b/cql3/statements/schema_altering_statement.hh index b3a2ea5c3d..6e5bcbdb5a 100644 --- a/cql3/statements/schema_altering_statement.hh +++ b/cql3/statements/schema_altering_statement.hh @@ -82,7 +82,7 @@ protected: #endif virtual future> - execute(service::query_state& state, const query_options& options) override { + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override { throw std::runtime_error("not implemented"); #if 0 // If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing @@ -97,7 +97,7 @@ protected: } virtual future> - execute_internal(service::query_state& state, const query_options& options) override { + execute_internal(database& db, service::query_state& state, const query_options& options) override { throw std::runtime_error("unsupported operation"); #if 0 try diff --git a/cql3/statements/truncate_statement.hh b/cql3/statements/truncate_statement.hh index b0130929d9..f376e281b1 100644 --- a/cql3/statements/truncate_statement.hh +++ b/cql3/statements/truncate_statement.hh @@ -63,7 +63,7 @@ public: } virtual future> - execute(service::query_state& state, const query_options& options) override { + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override { throw std::runtime_error("not implemented"); #if 0 try @@ -87,7 +87,7 @@ public: } virtual future> - execute_internal(service::query_state& state, const query_options& options) override { + execute_internal(database& db, service::query_state& state, const query_options& options) override { throw std::runtime_error("unsupported operation"); } }; diff --git a/cql3/statements/use_statement.hh b/cql3/statements/use_statement.hh index ae44205ccd..c641fabcdb 100644 --- a/cql3/statements/use_statement.hh +++ b/cql3/statements/use_statement.hh @@ -61,7 +61,7 @@ public: } virtual future> - execute(service::query_state& state, const query_options& options) override { + execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override { throw std::runtime_error("not implemented"); #if 0 state.getClientState().setKeyspace(keyspace); @@ -70,7 +70,7 @@ public: } virtual future> - execute_internal(service::query_state& state, const query_options& options) override { + execute_internal(database& db, service::query_state& state, const query_options& options) override { // Internal queries are exclusively on the system keyspace and 'use' is thus useless throw std::runtime_error("unsupported operation"); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 5b620c5b2d..e334efa83a 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -25,6 +25,7 @@ #pragma once #include "database.hh" +#include "db/consistency_level.hh" namespace service { @@ -39,10 +40,10 @@ public: * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - static future<> mutate(std::vector mutations, db::consistency_level cl); + future<> mutate(std::vector mutations, db::consistency_level cl); - static future<> mutate_with_triggers(std::vector mutations, - db::consistency_level cl, bool should_mutate_atomically); + future<> mutate_with_triggers(std::vector mutations, db::consistency_level cl, + bool should_mutate_atomically); /** * See mutate. Adds additional steps before and after writing a batch. @@ -53,7 +54,7 @@ public: * @param mutations the Mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - static future<> mutate_atomically(std::vector mutations, db::consistency_level cl); + future<> mutate_atomically(std::vector mutations, db::consistency_level cl); }; } From 2244eab6c12701646f103e6b356e407d0c405a97 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 16:40:25 +0100 Subject: [PATCH 26/36] db: Steal data from mutations when applying Taking mutations by r-value reference allows us to avoid copies. --- database.cc | 23 ++++++++++++----------- database.hh | 18 ++++++++++++------ tests/urchin/mutation_test.cc | 2 +- thrift/handler.cc | 2 +- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/database.cc b/database.cc index 5a75c423d2..d5049f97a0 100644 --- a/database.cc +++ b/database.cc @@ -246,9 +246,10 @@ database::find_keyspace(sstring name) { return nullptr; } -void column_family::apply(const mutation& m) { - mutation_partition& p = find_or_create_partition(m.key); - p.apply(_schema, m.p); +void +column_family::apply(mutation&& m) { + mutation_partition& p = find_or_create_partition(std::move(m.key)); + p.apply(_schema, std::move(m.p)); } // Based on org.apache.cassandra.db.AbstractCell#reconcile() @@ -287,39 +288,39 @@ compare_for_merge(const column_definition& def, } } -void mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { +void mutation_partition::apply(schema_ptr schema, mutation_partition&& p) { _tombstone.apply(p._tombstone); for (auto&& entry : p._row_tombstones) { - apply_row_tombstone(schema, entry.first, entry.second); + apply_row_tombstone(schema, std::move(entry)); } - auto merge_cells = [this, schema] (row& old_row, const row& new_row) { + auto merge_cells = [this, schema] (row& old_row, row&& new_row) { for (auto&& new_column : new_row) { auto col = new_column.first; auto i = old_row.find(col); if (i == old_row.end()) { - _static_row.emplace_hint(i, new_column); + _static_row.emplace_hint(i, std::move(new_column)); } else { auto& old_column = *i; auto& def = schema->regular_column_at(col); if (compare_for_merge(def, old_column, new_column) < 0) { - old_column.second = new_column.second; + old_column.second = std::move(new_column.second); } } } }; - merge_cells(_static_row, p._static_row); + merge_cells(_static_row, std::move(p._static_row)); for (auto&& entry : p._rows) { auto& key = entry.first; auto i = _rows.find(key); if (i == _rows.end()) { - _rows.emplace_hint(i, entry); + _rows.emplace_hint(i, std::move(entry)); } else { i->second.t.apply(entry.second.t); - merge_cells(i->second.cells, entry.second.cells); + merge_cells(i->second.cells, std::move(entry.second.cells)); } } } diff --git a/database.hh b/database.hh index a2fc65ffd0..e241d2ce00 100644 --- a/database.hh +++ b/database.hh @@ -134,18 +134,24 @@ public: } else if (prefix.size() == schema->clustering_key.size()) { _rows[serialize_value(*schema->clustering_key_type, prefix)].t.apply(t); } else { - apply_row_tombstone(schema, serialize_value(*schema->clustering_key_prefix_type, prefix), t); + apply_row_tombstone(schema, {serialize_value(*schema->clustering_key_prefix_type, prefix), t}); } } - void apply_row_tombstone(schema_ptr schema, const bytes& prefix, const tombstone& t) { + void apply_row_tombstone(schema_ptr schema, bytes prefix, tombstone t) { + apply_row_tombstone(schema, {std::move(prefix), std::move(t)}); + } + + void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone) { + auto& prefix = row_tombstone.first; auto i = _row_tombstones.lower_bound(prefix); - if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first) || t > i->second) { - _row_tombstones.insert(i, {prefix, t}); + if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first) + || row_tombstone.second > i->second) { + _row_tombstones.insert(i, std::move(row_tombstone)); } } - void apply(schema_ptr schema, const mutation_partition& p); + void apply(schema_ptr schema, mutation_partition&& p); row& static_row() { return _static_row; @@ -207,7 +213,7 @@ struct column_family { schema_ptr _schema; // partition key -> partition std::map partitions; - void apply(const mutation& m); + void apply(mutation&& m); }; class keyspace { diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 6d4e84b50d..f792dfeb61 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -32,7 +32,7 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { mutation m(key, s); m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3))); - cf.apply(m); + cf.apply(std::move(m)); row& row = cf.find_or_create_row(key, c_key); auto& cell = boost::any_cast(row[r1_col.id]); diff --git a/thrift/handler.cc b/thrift/handler.cc index 6085b992a5..2d650c54c5 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -231,7 +231,7 @@ public: throw make_exception("Mutation must have either column or deletion"); } } - cf.apply(m_to_apply); + cf.apply(std::move(m_to_apply)); } } } catch (std::exception& ex) { From 1b66f334555e3116f04094346abc4f0a637aa069 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 16:42:34 +0100 Subject: [PATCH 27/36] db: Apply mutations locally from storage_proxy Eventually we should rather send them to replicas, but for now we just apply locally. --- database.cc | 15 ++++++++++++--- database.hh | 1 + service/storage_proxy.cc | 14 +++++++++++++- service/storage_proxy.hh | 4 ++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/database.cc b/database.cc index d5049f97a0..85185487d5 100644 --- a/database.cc +++ b/database.cc @@ -228,13 +228,22 @@ column_definition::name() const { return _name; } -schema_ptr -keyspace::find_schema(sstring cf_name) { +column_family* +keyspace::find_column_family(sstring cf_name) { auto i = column_families.find(cf_name); if (i == column_families.end()) { + return nullptr; + } + return &i->second; +} + +schema_ptr +keyspace::find_schema(sstring cf_name) { + auto cf = find_column_family(cf_name); + if (!cf) { return {}; } - return i->second._schema; + return cf->_schema; } keyspace* diff --git a/database.hh b/database.hh index e241d2ce00..fe77136dc1 100644 --- a/database.hh +++ b/database.hh @@ -221,6 +221,7 @@ public: std::unordered_map column_families; static future populate(sstring datadir); schema_ptr find_schema(sstring cf_name); + column_family* find_column_family(sstring cf_name); }; class database { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f2f8bc1ae0..55c442e915 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -476,7 +476,19 @@ namespace service { */ future<> storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { - throw std::runtime_error("NOT IMPLEMENTED"); + // FIXME: send it to replicas instead of applying locally + for (auto&& m : mutations) { + // FIXME: lookup column_family by UUID + keyspace* ks = _db.find_keyspace(m.schema->ks_name); + assert(ks); // FIXME: load keyspace meta-data from storage + column_family* cf = ks->find_column_family(m.schema->cf_name); + if (cf) { + cf->apply(std::move(m)); + } else { + // TODO: log a warning + } + } + return make_ready_future<>(); #if 0 Tracing.trace("Determining replicas for mutation"); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index e334efa83a..5b9f2e3eb2 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -30,7 +30,11 @@ namespace service { class storage_proxy /*implements StorageProxyMBean*/ { +private: + database& _db; public: + storage_proxy(database& db) : _db(db) {} + /** * Use this method to have these Mutations applied * across all replicas. This method will take care From 543cc40d1a3b39421258d511e93d5171539bc560 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 6 Feb 2015 20:44:04 +0100 Subject: [PATCH 28/36] tests: Add performance test for in-memory row mutation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It shows that our serialization code, which is using ostringstream under the hood, is really dragging us down. In the perf profile we can see dynamic casts high called from iostream classes, called from abstract_type::decompose() $ taskset -c 0 build/release/tests/perf/perf_mutation Timing mutation of single column within once row 83536.54 tps 115157.06 tps 85059.52 tps 60443.80 tps 112878.47 tps $ perf report - 11,66% perf_mutation libstdc++.so.6.0.20 [.] __cxxabiv1::__si_class_type_info::__do_dyncast(long, __cxxabiv1::__class_type_info::__sub_kind, __cxxabiv1:▒ + __cxxabiv1::__si_class_type_info::__do_dyncast(long, __cxxabiv1::__class_type_info::__sub_kind, __cxxabiv1::__class_type_info const*, void const*, __cxxabiv1::__c◆ - __dynamic_cast ▒ - 43,58% std::num_get > > const& std::use_facet >::_M_cache_locale(std::locale const&) ▒ - std::basic_ios >::init(std::basic_streambuf >*) ▒ 92,70% std::basic_istringstream, std::allocator >::basic_istringstream(std::string const&, std::_Ios_Openmode) ▒ + 7,30% abstract_type::decompose(boost::any const&) ▒ - 27,36% bool std::has_facet > > >(std::locale const&) ▒ std::basic_ios >::_M_cache_locale(std::locale const&) ▒ - std::basic_ios >::init(std::basic_streambuf >*) ▒ 93,14% std::basic_istringstream, std::allocator >::basic_istringstream(std::string const&, std::_Ios_Openmode) ▒ + 6,86% abstract_type::decompose(boost::any const&) ▒ + 14,54% bool std::has_facet > > >(std::locale const&) ▒ + 14,52% std::num_put > > const& std::use_facet > > const& std::use_facet > > const& std::use_facet > > >(std::locale const&) ▒ + 15,16% bool std::has_facet > > >(std::locale const&) ▒ + 15,04% bool std::has_facet >(std::locale const&) ▒ + 14,74% std::ctype const& std::use_facet >(std::locale const&) ▒ - 7,98% perf_mutation libstdc++.so.6.0.20 [.] __cxxabiv1::__vmi_class_type_info::__do_dyncast(long, __cxxabiv1::__class_type_info::__sub_kind, __cxxabiv1▒ - __cxxabiv1::__vmi_class_type_info::__do_dyncast(long, __cxxabiv1::__class_type_info::__sub_kind, __cxxabiv1::__class_type_info const*, void const*, __cxxabiv1::__▒ - __dynamic_cast ▒ - 77,13% std::ctype const& std::use_facet >(std::locale const&) ▒ std::basic_ios >::_M_cache_locale(std::locale const&) ▒ + std::basic_ios >::init(std::basic_streambuf >*) ▒ + 22,87% bool std::has_facet >(std::locale const&) ▒ + 6,45% perf_mutation libstdc++.so.6.0.20 [.] std::locale::locale() ▒ + 6,40% perf_mutation libstdc++.so.6.0.20 [.] std::locale::~locale() ▒ + 5,02% perf_mutation libstdc++.so.6.0.20 [.] std::locale::operator=(std::locale const&) ▒ + 4,18% perf_mutation libc-2.19.so [.] __GI___strcmp_ssse3 ▒ --- configure.py | 1 + tests/perf/perf.hh | 30 ++++++++++++++++++++++++++++++ tests/perf/perf_mutation.cc | 30 ++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+) create mode 100644 tests/perf/perf.hh create mode 100644 tests/perf/perf_mutation.cc diff --git a/configure.py b/configure.py index f3b1f8cbfa..3b1eba9300 100755 --- a/configure.py +++ b/configure.py @@ -102,6 +102,7 @@ modes = { urchin_tests = [ 'tests/urchin/mutation_test', 'tests/urchin/types_test', + 'tests/perf/perf_mutation', ] tests = [ diff --git a/tests/perf/perf.hh b/tests/perf/perf.hh new file mode 100644 index 0000000000..ecb57a31c5 --- /dev/null +++ b/tests/perf/perf.hh @@ -0,0 +1,30 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once + +#include +#include + +template +void time_it(Func func, int iterations = 5) { + using clk = std::chrono::high_resolution_clock; + + for (int i = 0; i < iterations; i++) { + auto start = clk::now(); + auto end_at = start + std::chrono::seconds(1); + uint64_t count = 0; + + while (clk::now() < end_at) { + for (int i = 0; i < 10000; i++) { // amortize clock reading cost + func(); + count++; + } + } + + auto end = clk::now(); + auto duration = std::chrono::duration(end - start).count(); + std::cout << sprint("%.2f", (double)count / duration) << " tps\n"; + } +} diff --git a/tests/perf/perf_mutation.cc b/tests/perf/perf_mutation.cc new file mode 100644 index 0000000000..af59a3434a --- /dev/null +++ b/tests/perf/perf_mutation.cc @@ -0,0 +1,30 @@ +#include "database.hh" +#include "perf.hh" + +static boost::any make_atomic_cell(bytes value) { + return atomic_cell{0, atomic_cell::live{ttl_opt{}, std::move(value)}}; +}; + +int main(int argc, char* argv[]) { + auto s = make_lw_shared("ks", "cf", + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + + column_family cf(s); + + std::cout << "Timing mutation of single column within one row...\n"; + + partition_key key = to_bytes("key1"); + clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)}); + bytes value = int32_type->decompose(3); + + time_it([&] { + mutation m(key, s); + column_definition& col = *s->get_column_definition("r1"); + m.set_clustered_cell(c_key, col, make_atomic_cell(value)); + cf.apply(std::move(m)); + }); +} From 0bde5f74e77e983c92f7abd17f50cdf400bcecb2 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:04:07 +0100 Subject: [PATCH 29/36] db: Introduce tombstone::compare() and express operators using it --- database.hh | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/database.hh b/database.hh index fe77136dc1..5b1f7303e1 100644 --- a/database.hh +++ b/database.hh @@ -65,12 +65,26 @@ struct tombstone final { : tombstone(api::missing_timestamp, {}) { } + int compare(const tombstone& t) const { + if (timestamp < t.timestamp) { + return -1; + } else if (timestamp > t.timestamp) { + return 1; + } else if (ttl < t.ttl) { + return -1; + } else if (ttl > t.ttl) { + return 1; + } else { + return 0; + } + } + bool operator<(const tombstone& t) const { - return timestamp < t.timestamp || ttl < t.ttl; + return compare(t) < 0; } bool operator==(const tombstone& t) const { - return timestamp == t.timestamp && ttl == t.ttl; + return compare(t) == 0; } operator bool() const { From 37599d5d10f66c71874e4490034eda9f99dbc8dd Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:05:19 +0100 Subject: [PATCH 30/36] db: Define the rest of comparison operators in tombstone --- database.hh | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/database.hh b/database.hh index 5b1f7303e1..8d41e495aa 100644 --- a/database.hh +++ b/database.hh @@ -83,10 +83,26 @@ struct tombstone final { return compare(t) < 0; } + bool operator<=(const tombstone& t) const { + return compare(t) <= 0; + } + + bool operator>(const tombstone& t) const { + return compare(t) > 0; + } + + bool operator>=(const tombstone& t) const { + return compare(t) >= 0; + } + bool operator==(const tombstone& t) const { return compare(t) == 0; } + bool operator!=(const tombstone& t) const { + return compare(t) != 0; + } + operator bool() const { return timestamp != api::missing_timestamp; } From 7b7f63645bdbfd03dc039a75cd64a6d8ce8b29a8 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:04:58 +0100 Subject: [PATCH 31/36] db: Make tombstone::operator bool() explicit --- database.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database.hh b/database.hh index 8d41e495aa..bd71d4d7f5 100644 --- a/database.hh +++ b/database.hh @@ -103,7 +103,7 @@ struct tombstone final { return compare(t) != 0; } - operator bool() const { + explicit operator bool() const { return timestamp != api::missing_timestamp; } From 0a5bf555ea05d2cdc02a7e1815025b9c58a88585 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:07:12 +0100 Subject: [PATCH 32/36] types: Introduce tuple_type::is_prefix_of() --- database.hh | 4 ++++ tuple.hh | 46 ++++++++++++++++++++++++++++++++++++++++++++++ types.hh | 2 +- 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/database.hh b/database.hh index bd71d4d7f5..1093c372d6 100644 --- a/database.hh +++ b/database.hh @@ -112,6 +112,10 @@ struct tombstone final { *this = t; } } + + friend std::ostream& operator<<(std::ostream& out, const tombstone& t) { + return out << "{timestamp=" << t.timestamp << ", ttl=" << t.ttl.time_since_epoch().count() << "}"; + } }; using ttl_opt = std::experimental::optional; diff --git a/tuple.hh b/tuple.hh index 6377062853..5eecd238c1 100644 --- a/tuple.hh +++ b/tuple.hh @@ -159,6 +159,52 @@ public: virtual sstring to_string(const bytes& b) override { throw std::runtime_error("not implemented"); } + /** + * Returns true iff all components of 'prefix' are equal to corresponding + * leading components of 'value'. + * + * The 'value' is assumed to be serialized using tuple_type + */ + bool is_prefix_of(const bytes& prefix, const bytes& value) const { + assert(AllowPrefixes); + + if (prefix.size() > value.size()) { + return false; + } + + bytes_view i1(prefix); + bytes_view i2(value); + + for (auto&& type : types) { + if (i1.empty()) { + return true; + } + if (i2.empty()) { + return false; + } + assert(i1.size() >= sizeof(int32_t)); + assert(i2.size() >= sizeof(int32_t)); + auto len1 = (int32_t) net::ntoh(*reinterpret_cast(i1.begin())); + auto len2 = (int32_t) net::ntoh(*reinterpret_cast(i2.begin())); + i1.remove_prefix(sizeof(int32_t)); + i2.remove_prefix(sizeof(int32_t)); + if ((len1 < 0) != (len2 < 0)) { + // one is empty and another one is not + return false; + } + if (len1 >= 0) { + // both are not empty + // TODO: make equal() accept bytes_view + if (!type->equal(bytes(i1.begin(), i1.begin() + len1), + bytes(i2.begin(), i2.begin() + len2))) { + return false; + } + i1.remove_prefix((uint32_t) len1); + i2.remove_prefix((uint32_t) len2); + } + } + return true; + } }; using tuple_prefix = tuple_type; diff --git a/types.hh b/types.hh index 3a11bb3d2c..839b2f0233 100644 --- a/types.hh +++ b/types.hh @@ -18,7 +18,7 @@ // FIXME: should be int8_t using bytes = basic_sstring; - +using bytes_view = std::experimental::string_view; using bytes_opt = std::experimental::optional; sstring to_hex(const bytes& b); From 138ed6faac5e7f35bb8654dff5f3c7aefadeb01e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:39:41 +0100 Subject: [PATCH 33/36] tests: Add test for tuple_type::is_prefix_of() --- tests/urchin/types_test.cc | 25 +++++++++++++++++++++++++ tuple.hh | 9 +++++++++ 2 files changed, 34 insertions(+) diff --git a/tests/urchin/types_test.cc b/tests/urchin/types_test.cc index fa94ddce3e..de0c63f398 100644 --- a/tests/urchin/types_test.cc +++ b/tests/urchin/types_test.cc @@ -7,6 +7,7 @@ #include #include "types.hh" +#include "tuple.hh" BOOST_AUTO_TEST_CASE(test_int32_type_string_conversions) { BOOST_REQUIRE(int32_type->equal(int32_type->from_string("1234567890"), int32_type->decompose(1234567890))); @@ -46,3 +47,27 @@ BOOST_AUTO_TEST_CASE(test_int32_type_string_conversions) { BOOST_REQUIRE_EQUAL(int32_type->to_string(bytes()), ""); } + +BOOST_AUTO_TEST_CASE(test_tuple_is_prefix_of) { + tuple_type<> type({utf8_type, utf8_type, utf8_type}); + auto prefix_type = type.as_prefix(); + + auto val = type.serialize_value({{bytes("a")}, {bytes("b")}, {bytes("c")}}); + + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({}), val)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}}), val)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}}), val)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}, {bytes("c")}}), val)); + + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes()}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("b")}, {bytes("c")}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("c")}, {bytes("b")}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("abc")}}), val)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("ab")}}), val)); + + auto val2 = type.serialize_value({{bytes("a")}, {bytes("b")}, {}}); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}}), val2)); + BOOST_REQUIRE(prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}, {}}), val2)); + BOOST_REQUIRE(!prefix_type.is_prefix_of(prefix_type.serialize_value({{bytes("a")}, {bytes("b")}, {bytes()}}), val2)); +} diff --git a/tuple.hh b/tuple.hh index 5eecd238c1..33e4c7444f 100644 --- a/tuple.hh +++ b/tuple.hh @@ -18,6 +18,7 @@ private: const std::vector> types; const bool _byte_order_equal; public: + using prefix_type = tuple_type; using value_type = std::vector; tuple_type(std::vector> types) @@ -27,6 +28,11 @@ public: return t->is_byte_order_equal(); })) { } + + prefix_type as_prefix() { + return prefix_type(types); + } + /* * Format: * ... @@ -50,6 +56,9 @@ public: } } } + bytes serialize_value(const value_type& values) { + return ::serialize_value(*this, values); + } bytes decompose_value(const value_type& values) { return ::serialize_value(*this, values); } From a6f4f2d6aa3b76cbe61bb63a8a4f00971db8e7ae Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:10:35 +0100 Subject: [PATCH 34/36] db: Add mutation_partition::tombstone_for_row() --- database.hh | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/database.hh b/database.hh index 1093c372d6..b0edcfeac6 100644 --- a/database.hh +++ b/database.hh @@ -206,6 +206,27 @@ public: } return &i->second.cells; } + + /** + * Returns the base tombstone for all cells of given clustering row. Such tombstone + * holds all information necessary to decide whether cells in a row are deleted or not, + * in addition to any information inside individual cells. + */ + tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key) { + tombstone t = _tombstone; + + auto i = _row_tombstones.lower_bound(key); + if (i != _row_tombstones.end() && schema->clustering_key_prefix_type->is_prefix_of(i->first, key)) { + t.apply(i->second); + } + + auto j = _rows.find(key); + if (j != _rows.end()) { + t.apply(j->second.t); + } + + return t; + } }; class mutation final { From 7ea7a6822b2b5953315cf280af7a1a9b36f1333a Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:12:09 +0100 Subject: [PATCH 35/36] db: Fix mutation_partition::apply_row_tombstone() It was not updating already inserted tombstones. --- database.hh | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/database.hh b/database.hh index b0edcfeac6..43e2694a7f 100644 --- a/database.hh +++ b/database.hh @@ -179,9 +179,10 @@ public: void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone) { auto& prefix = row_tombstone.first; auto i = _row_tombstones.lower_bound(prefix); - if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first) - || row_tombstone.second > i->second) { - _row_tombstones.insert(i, std::move(row_tombstone)); + if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first)) { + _row_tombstones.emplace_hint(i, std::move(row_tombstone)); + } else if (row_tombstone.second > i->second) { + i->second = row_tombstone.second; } } From fb6941bbee3b23a478b440e963d8cf4b4abda293 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 7 Feb 2015 22:13:30 +0100 Subject: [PATCH 36/36] tests: Add test for row tombstones --- database.hh | 4 ++++ tests/urchin/mutation_test.cc | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/database.hh b/database.hh index 43e2694a7f..197c52a19d 100644 --- a/database.hh +++ b/database.hh @@ -188,6 +188,10 @@ public: void apply(schema_ptr schema, mutation_partition&& p); + const row_tombstone_set& row_tombstones() { + return _row_tombstones; + } + row& static_row() { return _static_row; } diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index f792dfeb61..417397ffe4 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -39,3 +39,37 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(int32_type->equal(cell.as_live().value, int32_type->decompose(3))); } + +BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) { + auto s = make_lw_shared(some_keyspace, some_column_family, + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + + column_family cf(s); + + column_definition& r1_col = *s->get_column_definition("r1"); + partition_key key = to_bytes("key1"); + + clustering_key c_key1 = s->clustering_key_type->decompose_value( + {int32_type->decompose(1)} + ); + + clustering_key c_key2 = s->clustering_key_type->decompose_value( + {int32_type->decompose(2)} + ); + + auto ttl = gc_clock::now() + std::chrono::seconds(1); + + mutation m(key, s); + m.p.apply_row_tombstone(s, c_key1, tombstone(1, ttl)); + m.p.apply_row_tombstone(s, c_key2, tombstone(0, ttl)); + + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key1), tombstone(1, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(0, ttl)); + + m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl)); + BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl)); +}