diff --git a/atomic_cell.hh b/atomic_cell.hh index 9ee671007f..a04fed2edb 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -6,6 +6,7 @@ #include "bytes.hh" #include "timestamp.hh" +#include "gc_clock.hh" #include template @@ -122,6 +123,9 @@ public: ttl_opt ttl() const { return atomic_cell::ttl(_data); } + bytes_view serialize() const { + return _data; + } friend class atomic_cell::one; }; @@ -171,6 +175,22 @@ public: friend class atomic_cell_or_collection; }; +// Represents a mutation of a collection. Actual format is determined by collection type, +// and is: +// set: list of atomic_cell +// map: list of pair (for key/value) +// list: tbd, probably ugly +class collection_mutation { +public: + struct view { + bytes_view data; + }; + struct one { + bytes data; + operator view() const { return { data }; } + }; +}; + // A variant type that can hold either an atomic_cell, or a serialized collection. // Which type is stored is determinied by the schema. class atomic_cell_or_collection final { @@ -178,8 +198,21 @@ class atomic_cell_or_collection final { private: atomic_cell_or_collection(bytes&& data) : _data(std::move(data)) {} public: + atomic_cell_or_collection(atomic_cell::one ac) : _data(std::move(ac._data)) {} static atomic_cell_or_collection from_atomic_cell(atomic_cell::one data) { return { std::move(data._data) }; } atomic_cell::view as_atomic_cell() const { return atomic_cell::view::from_bytes(_data); } - // FIXME: insert collection variant here + atomic_cell_or_collection(collection_mutation::one cm) : _data(std::move(cm.data)) {} + static atomic_cell_or_collection from_collection_mutation(collection_mutation::one data) { + return std::move(data.data); + } + collection_mutation::view as_collection_mutation() const { + return collection_mutation::view{_data}; + } }; +class column_definition; + +int compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right); +void merge_column(const column_definition& def, + atomic_cell_or_collection& old, + const atomic_cell_or_collection& neww); diff --git a/combine.hh b/combine.hh new file mode 100644 index 0000000000..71738d2f2b --- /dev/null +++ b/combine.hh @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +// combine two sorted uniqued sequences into a single sorted sequence +// unique elements are copied, duplicate elements are merged with a +// binary function. +template +OutputIterator +combine(InputIterator1 begin1, InputIterator1 end1, + InputIterator2 begin2, InputIterator2 end2, + OutputIterator out, + Compare compare, + Merge merge) { + while (begin1 != end1 && begin2 != end2) { + auto& e1 = *begin1; + auto& e2 = *begin2; + if (compare(e1, e2)) { + *out++ = e1; + ++begin1; + } else if (compare(e2, e1)) { + *out++ = e2; + ++begin2; + } else { + *out++ = merge(e1, e2); + ++begin1; + ++begin2; + } + } + out = std::copy(begin1, end1, out); + out = std::copy(begin2, end2, out); + return out; +} + + diff --git a/configure.py b/configure.py index 306ebe6f35..ca88d094ce 100755 --- a/configure.py +++ b/configure.py @@ -255,6 +255,7 @@ urchin_core = (['database.cc', 'cql3/abstract_marker.cc', 'cql3/cql3.cc', 'cql3/cql3_type.cc', + 'cql3/operation.cc', 'cql3/functions/functions.cc', 'cql3/statements/modification_statement.cc', 'cql3/statements/update_statement.cc', diff --git a/cql3/Cql.g b/cql3/Cql.g index 6797179171..8cb4444ae3 100644 --- a/cql3/Cql.g +++ b/cql3/Cql.g @@ -1158,9 +1158,7 @@ columnOperation[operations_type& operations] columnOperationDifferentiator[operations_type& operations, ::shared_ptr key] : '=' normalColumnOperation[operations, key] -#if 0 | '[' k=term ']' specializedColumnOperation[operations, key, k] -#endif ; normalColumnOperation[operations_type& operations, ::shared_ptr key] @@ -1196,14 +1194,16 @@ normalColumnOperation[operations_type& operations, ::shared_ptr> operations, ColumnIdentifier.Raw key, Term.Raw k] +specializedColumnOperation[std::vector, + shared_ptr>> operations, + shared_ptr key, + shared_ptr k] + : '=' t=term { - addRawUpdate(operations, key, new Operation.SetElement(k, t)); + add_raw_update(operations, key, make_shared(k, t)); } ; -#endif columnCondition[conditions_type& conditions] // Note: we'll reject duplicates later diff --git a/cql3/maps.hh b/cql3/maps.hh index 03ea3f27b6..a648540bf0 100644 --- a/cql3/maps.hh +++ b/cql3/maps.hh @@ -27,6 +27,8 @@ #include "cql3/abstract_marker.hh" #include "cql3/term.hh" +#include "operation.hh" +#include "update_parameters.hh" namespace cql3 { @@ -60,20 +62,20 @@ import org.apache.cassandra.utils.Pair; * Static helper methods and classes for maps. */ class maps { +private: + maps() = delete; public: -#if 0 - private Maps() {} - - public static ColumnSpecification keySpecOf(ColumnSpecification column) - { - return new ColumnSpecification(column.ksName, column.cfName, new ColumnIdentifier("key(" + column.name + ")", true), ((MapType)column.type).getKeysType()); + static shared_ptr key_spec_of(column_specification& column) { + return ::make_shared(column.ks_name, column.cf_name, + ::make_shared(sprint("key(%s)", *column.name), true), + dynamic_pointer_cast(column.type)->get_keys_type()); } - public static ColumnSpecification valueSpecOf(ColumnSpecification column) - { - return new ColumnSpecification(column.ksName, column.cfName, new ColumnIdentifier("value(" + column.name + ")", true), ((MapType)column.type).getValuesType()); + static shared_ptr value_spec_of(column_specification& column) { + return ::make_shared(column.ks_name, column.cf_name, + ::make_shared(sprint("value(%s)", *column.name), true), + dynamic_pointer_cast(column.type)->get_values_type()); } -#endif class literal : public term::raw { public: @@ -329,51 +331,47 @@ public: Putter.doPut(t, cf, prefix, column, params); } } +#endif - public static class SetterByKey extends Operation - { - private final Term k; - - public SetterByKey(ColumnDefinition column, Term k, Term t) - { - super(column, t); - this.k = k; + class setter_by_key : public operation { + const shared_ptr _k; + public: + setter_by_key(column_definition& column, shared_ptr k, shared_ptr t) + : operation(column, std::move(t)), _k(std::move(k)) { } - @Override - public void collectMarkerSpecification(VariableSpecifications boundNames) - { - super.collectMarkerSpecification(boundNames); - k.collectMarkerSpecification(boundNames); + virtual void collect_marker_specification(shared_ptr bound_names) override { + operation::collect_marker_specification(bound_names); + _k->collect_marker_specification(bound_names); } - public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException - { - assert column.type.isMultiCell() : "Attempted to set a value for a single key on a frozen map"; - ByteBuffer key = k.bindAndGet(params.options); - ByteBuffer value = t.bindAndGet(params.options); - if (key == null) - throw new InvalidRequestException("Invalid null map key"); - - CellName cellName = cf.getComparator().create(prefix, column, key); - - if (value == null) - { - cf.addColumn(params.makeTombstone(cellName)); + virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override { + using exceptions::invalid_request_exception; + assert(column.type->is_multi_cell()); // "Attempted to set a value for a single key on a frozen map"m + bytes_opt key = _k->bind_and_get(params._options); + bytes_opt value = _t->bind_and_get(params._options); + if (!key) { + throw invalid_request_exception("Invalid null map key"); } - else - { - // We don't support value > 64K because the serialization format encode the length as an unsigned short. - if (value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT) - throw new InvalidRequestException(String.format("Map value is too long. Map values are limited to %d bytes but %d bytes value provided", - FBUtilities.MAX_UNSIGNED_SHORT, - value.remaining())); - - cf.addColumn(params.makeColumn(cellName, value)); + if (value && value->size() >= std::numeric_limits::max()) { + throw invalid_request_exception( + sprint("Map value is too long. Map values are limited to %d bytes but %d bytes value provided", + std::numeric_limits::max(), + value->size())); + } + auto avalue = value ? params.make_cell(*value) : params.make_dead_cell(); + map_type_impl::mutation update = { { std::move(*key), std::move(avalue) } }; + // should have been verified as map earlier? + auto ctype = static_pointer_cast(column.type); + auto col_mut = ctype->serialize_mutation_form(std::move(update)); + if (column.is_static()) { + m.set_static_cell(column, std::move(col_mut)); + } else { + m.set_clustered_cell(prefix, column, std::move(col_mut)); } } - } - + }; +#if 0 public static class Putter extends Operation { public Putter(ColumnDefinition column, Term t) diff --git a/cql3/operation.cc b/cql3/operation.cc new file mode 100644 index 0000000000..6f826e0920 --- /dev/null +++ b/cql3/operation.cc @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "operation.hh" +#include "operation_impl.hh" +#include "maps.hh" + +namespace cql3 { + + +shared_ptr +operation::set_element::prepare(const sstring& keyspace, column_definition& receiver) { + using exceptions::invalid_request_exception; + auto rtype = dynamic_pointer_cast(receiver.type); + if (!rtype) { + throw invalid_request_exception(sprint("Invalid operation (%s) for non collection column %s", receiver, receiver.name())); + } else if (!rtype->is_multi_cell()) { + throw invalid_request_exception(sprint("Invalid operation (%s) for frozen collection column %s", receiver, receiver.name())); + } + + if (&rtype->_kind == &collection_type_impl::kind::list) { + abort(); + // FIXME: +#if 0 + Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver)); + Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver)); + return new Lists.SetterByIndex(receiver, idx, lval); +#endif + } else if (&rtype->_kind == &collection_type_impl::kind::set) { + throw invalid_request_exception(sprint("Invalid operation (%s) for set column %s", receiver, receiver.name())); + } else if (&rtype->_kind == &collection_type_impl::kind::map) { + auto key = _selector->prepare(keyspace, maps::key_spec_of(*receiver.column_specification)); + auto mval = _value->prepare(keyspace, maps::value_spec_of(*receiver.column_specification)); + return make_shared(receiver, key, mval); + } + abort(); +} + +bool +operation::set_element::is_compatible_with(shared_ptr other) { + // TODO: we could check that the other operation is not setting the same element + // too (but since the index/key set may be a bind variables we can't always do it at this point) + return !dynamic_pointer_cast(std::move(other)); +} + +} diff --git a/cql3/operation.hh b/cql3/operation.hh index 16d5f36cd6..d3698d8520 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -26,13 +26,16 @@ #define CQL3_OPERATION_HH #include "core/shared_ptr.hh" - +#include "exceptions/exceptions.hh" #include "database.hh" +#include "term.hh" #include namespace cql3 { +class update_parameters; + #if 0 package org.apache.cassandra.cql3; @@ -77,6 +80,8 @@ public: , _t{t} { } + virtual ~operation() {} + virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const { return _t && _t->uses_function(ks_name, function_name); } @@ -172,54 +177,26 @@ public: class set_value; + class set_element : public raw_update { + const shared_ptr _selector; + const shared_ptr _value; + public: + set_element(shared_ptr selector, shared_ptr value) + : _selector(std::move(selector)), _value(std::move(value)) { + } + + virtual shared_ptr prepare(const sstring& keyspace, column_definition& receiver); #if 0 - public static class SetElement implements RawUpdate - { - private final Term.Raw selector; - private final Term.Raw value; - - public SetElement(Term.Raw selector, Term.Raw value) - { - this.selector = selector; - this.value = value; - } - - public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException - { - if (!(receiver.type instanceof CollectionType)) - throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver.name)); - else if (!(receiver.type.isMultiCell())) - throw new InvalidRequestException(String.format("Invalid operation (%s) for frozen collection column %s", toString(receiver), receiver.name)); - - switch (((CollectionType)receiver.type).kind) - { - case LIST: - Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver)); - Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver)); - return new Lists.SetterByIndex(receiver, idx, lval); - case SET: - throw new InvalidRequestException(String.format("Invalid operation (%s) for set column %s", toString(receiver), receiver.name)); - case MAP: - Term key = selector.prepare(keyspace, Maps.keySpecOf(receiver)); - Term mval = value.prepare(keyspace, Maps.valueSpecOf(receiver)); - return new Maps.SetterByKey(receiver, key, mval); - } - throw new AssertionError(); - } - protected String toString(ColumnSpecification column) { return String.format("%s[%s] = %s", column.name, selector, value); } - public boolean isCompatibleWith(RawUpdate other) - { - // TODO: we could check that the other operation is not setting the same element - // too (but since the index/key set may be a bind variables we can't always do it at this point) - return !(other instanceof SetValue); - } - } +#endif + virtual bool is_compatible_with(shared_ptr other) override; + }; +#if 0 public static class Addition implements RawUpdate { private final Term.Raw value; diff --git a/database.cc b/database.cc index 5fd0ca07de..f3334d6a8d 100644 --- a/database.cc +++ b/database.cc @@ -326,7 +326,6 @@ column_family::apply(const mutation& m) { } // Based on org.apache.cassandra.db.AbstractCell#reconcile() -static inline int compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right) { if (left.timestamp() != right.timestamp()) { @@ -347,15 +346,18 @@ compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right) { } } -static inline -int -compare_for_merge(const column_definition& def, - const std::pair& left, - const std::pair& right) { +void +merge_column(const column_definition& def, + atomic_cell_or_collection& old, + const atomic_cell_or_collection& neww) { if (def.is_atomic()) { - return compare_atomic_cell_for_merge(left.second.as_atomic_cell(), right.second.as_atomic_cell()); + if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) { + // FIXME: move()? + old = neww; + } } else { - fail(unimplemented::cause::COLLECTIONS); + auto ct = static_pointer_cast(def.type); + old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation()); } } @@ -367,23 +369,24 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { apply_row_tombstone(schema, entry); } - auto merge_cells = [this, schema] (row& old_row, const row& new_row) { + auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) { for (auto&& new_column : new_row) { auto col = new_column.first; auto i = old_row.find(col); if (i == old_row.end()) { - _static_row.emplace_hint(i, new_column); + old_row.emplace_hint(i, new_column); } else { auto& old_column = *i; - auto& def = schema->regular_column_at(col); - if (compare_for_merge(def, old_column, new_column) < 0) { - old_column.second = new_column.second; - } + auto& def = find_column_def(col); + merge_column(def, old_column.second, new_column.second); } } }; - merge_cells(_static_row, p._static_row); + auto find_static_column_def = [schema] (auto col) -> column_definition& { return schema->static_column_at(col); }; + auto find_regular_column_def = [schema] (auto col) -> column_definition& { return schema->regular_column_at(col); }; + + merge_cells(_static_row, p._static_row, find_static_column_def); for (auto&& entry : p._rows) { auto& key = entry.first; @@ -392,7 +395,7 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { _rows.emplace_hint(i, entry); } else { i->second.t.apply(entry.second.t); - merge_cells(i->second.cells, entry.second.cells); + merge_cells(i->second.cells, entry.second.cells, find_regular_column_def); } } } diff --git a/database.hh b/database.hh index 785a35ac21..ac65c77ddf 100644 --- a/database.hh +++ b/database.hh @@ -94,27 +94,28 @@ public: mutation(mutation&&) = default; mutation(const mutation&) = default; - void set_static_cell(const column_definition& def, atomic_cell::one value) { - emplace_or_insert(p.static_row(), def.id, std::move(value)); + void set_static_cell(const column_definition& def, atomic_cell_or_collection value) { + update_column(p.static_row(), def, std::move(value)); } - void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell::one value) { + void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); - emplace_or_insert(row, def.id, std::move(value)); + update_column(row, def, std::move(value)); } - void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell::one value) { + void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(key); - emplace_or_insert(row, def.id, std::move(value)); + update_column(row, def, std::move(value)); } private: - static void emplace_or_insert(row& row, column_id id, atomic_cell::one&& value) { + static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value) { // our mutations are not yet immutable + auto id = def.id; auto i = row.lower_bound(id); if (i == row.end() || i->first != id) { - row.emplace_hint(i, id, atomic_cell_or_collection::from_atomic_cell(std::move(value))); + row.emplace_hint(i, id, std::move(value)); } else { - i->second = atomic_cell_or_collection::from_atomic_cell(std::move(value)); + merge_column(def, i->second, value); } } friend std::ostream& operator<<(std::ostream& os, const mutation& m); diff --git a/schema.hh b/schema.hh index 39f58d2b46..d9d169dbb8 100644 --- a/schema.hh +++ b/schema.hh @@ -34,6 +34,9 @@ public: bool is_compact_value() const; const sstring& name_as_text() const; const bytes& name() const; + friend std::ostream& operator<<(std::ostream& os, const column_definition& cd) { + return os << cd.name_as_text(); + } }; struct thrift_schema { @@ -121,6 +124,9 @@ public: column_definition& regular_column_at(column_id id) { return _regular_columns[id]; } + column_definition& static_column_at(column_id id) { + return _static_columns[id]; + } bool is_last_partition_key(column_definition& def) { return &_partition_key[_partition_key.size() - 1] == &def; } diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 2689181d8c..e23b833617 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -66,3 +66,36 @@ BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) { m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl)); } + +BOOST_AUTO_TEST_CASE(test_map_mutations) { + auto my_map_type = map_type_impl::get_instance(int32_type, utf8_type, true); + auto s = make_lw_shared(schema(some_keyspace, some_column_family, + {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_map_type}}, utf8_type)); + column_family cf(s); + partition_key key = to_bytes("key1"); + auto& column = *s->get_column_definition("s1"); + map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell(utf8_type->decompose(sstring("101")))}}; + mutation m1(key, s); + m1.set_static_cell(column, my_map_type->serialize_mutation_form(mmut1)); + cf.apply(m1); + map_type_impl::mutation mmut2{{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102")))}}; + mutation m2(key, s); + m2.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2)); + cf.apply(m2); + map_type_impl::mutation mmut3{{int32_type->decompose(103), make_atomic_cell(utf8_type->decompose(sstring("103")))}}; + mutation m3(key, s); + m3.set_static_cell(column, my_map_type->serialize_mutation_form(mmut3)); + cf.apply(m3); + map_type_impl::mutation mmut2o{{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102 override")))}}; + mutation m2o(key, s); + m2o.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2o)); + cf.apply(m2o); + + row& r = cf.find_or_create_partition(key).static_row(); + auto i = r.find(column.id); + BOOST_REQUIRE(i != r.end()); + auto cell = i->second.as_collection_mutation(); + auto muts = my_map_type->deserialize_mutation_form(cell.data); + BOOST_REQUIRE(muts.size() == 3); + // FIXME: more strict tests +} diff --git a/types.cc b/types.cc index cb92f18505..16825987c0 100644 --- a/types.cc +++ b/types.cc @@ -7,7 +7,11 @@ #include "types.hh" #include "core/print.hh" #include "net/ip.hh" +#include "database.hh" +#include "util/serialization.hh" +#include "combine.hh" #include +#include template struct simple_type_traits { @@ -541,6 +545,332 @@ struct float_type_impl : floating_type_impl { }; +thread_local logging::logger collection_type_impl::_logger("collection_type_impl"); +const size_t collection_type_impl::max_elements; + +const collection_type_impl::kind collection_type_impl::kind::map( + [] (shared_ptr collection, bool is_key) -> shared_ptr { + // FIXME: implement + // return isKey ? Maps.keySpecOf(collection) : Maps.valueSpecOf(collection); + abort(); + }); +const collection_type_impl::kind collection_type_impl::kind::set( + [] (shared_ptr collection, bool is_key) -> shared_ptr { + // FIXME: implement + // return Sets.valueSpecOf(collection); + abort(); + }); +const collection_type_impl::kind collection_type_impl::kind::list( + [] (shared_ptr collection, bool is_key) -> shared_ptr { + // FIXME: implement + // return Lists.valueSpecOf(collection); + abort(); + }); + +shared_ptr +collection_type_impl::kind::make_collection_receiver(shared_ptr collection, bool is_key) const { + return _impl(std::move(collection), is_key); +} + +shared_ptr +collection_type_impl::make_collection_receiver(shared_ptr collection, bool is_key) { + return _kind.make_collection_receiver(std::move(collection), is_key); +} + +std::vector +collection_type_impl::enforce_limit(std::vector cells, int version) { + assert(is_multi_cell()); + if (version >= 3 || cells.size() <= max_elements) { + return cells; + } + _logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. " + "Please see http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.", cells.size(), max_elements, max_elements); + cells.erase(cells.begin() + max_elements, cells.end()); + return cells; +} + +bytes +collection_type_impl::serialize_for_native_protocol(std::vector cells, int version) { + assert(is_multi_cell()); + cells = enforce_limit(std::move(cells), version); + std::vector values = serialized_values(std::move(cells)); + // FIXME: implement + abort(); + // return CollectionSerializer.pack(values, cells.size(), version); +} + +bool +collection_type_impl::is_compatible_with(abstract_type& previous) { + // FIXME: implement + abort(); +} + +shared_ptr +collection_type_impl::as_cql3_type() { + // FIXME: implement + abort(); +} + +int read_collection_size(bytes_view& in, int version) { + if (version >= 3) { + return read_simple(in); + } else { + return read_simple(in); + } +} + +void write_collection_size(std::ostream& out, int size, int version) { + if (version >= 3) { + serialize_int32(out, size); + } else { + serialize_int16(out, uint16_t(size)); + } +} + +bytes read_collection_value(bytes_view& in, int version) { + auto size = version >= 3 ? read_simple(in) : read_simple(in); + auto v = read_simple_bytes(in, size); + return bytes(v.begin(), v.end()); +} + +void write_collection_value(std::ostream& out, int version, data_type type, const boost::any& value) { + // We have to copy here, because we can't guess the size. + // FIXME: somehow. + std::ostringstream tmp; + type->serialize(value, tmp); + auto val_bytes = tmp.str(); + if (version >= 3) { + serialize_int32(out, int32_t(val_bytes.size())); + } else { + serialize_int16(out, uint16_t(val_bytes.size())); + } + out.rdbuf()->sputn(val_bytes.data(), val_bytes.size()); +} + +namespace std { + +template <> +struct hash> : private std::hash { + size_t operator()(const pair& p) const { + // don't simply xor, it will generate the same result for sequential + // pointers + auto f = hash::operator()(p.first); + auto s = hash::operator()(p.second); + return f ^ ((s << 7) | s >> (std::numeric_limits::digits - 7)); + } +}; + +} + +shared_ptr +map_type_impl::get_instance(data_type keys, data_type values, bool is_multi_cell) { + auto& map = is_multi_cell ? _instances : _frozen_instances; + auto p = std::make_pair(keys, values); + auto i = map.find(p); + if (i == map.end()) { + auto t = make_shared(keys, values, is_multi_cell); + i = map.insert(std::make_pair(std::move(p), std::move(t))).first; + } + return i->second; +} + +map_type_impl::map_type_impl(data_type keys, data_type values, bool is_multi_cell) + : collection_type_impl("map<" + keys->name() + ", " + values->name() + ">", kind::map) + , _keys(std::move(keys)) + , _values(std::move(values)) + , _is_multi_cell(is_multi_cell) { +} + +data_type +map_type_impl::freeze() { + if (_is_multi_cell) { + return get_instance(_keys, _values, false); + } else { + return shared_from_this(); + } +} + +bool +map_type_impl::is_compatible_with_frozen(collection_type_impl& previous) { + assert(!_is_multi_cell); + auto* p = dynamic_cast(&previous); + if (!p) { + return false; + } + return _keys->is_compatible_with(*p->_keys) + && _values->is_compatible_with(*p->_values); +} + +bool +map_type_impl::is_value_compatible_with_frozen(collection_type_impl& previous) { + assert(!_is_multi_cell); + auto* p = dynamic_cast(&previous); + if (!p) { + return false; + } + return _keys->is_compatible_with(*p->_keys) + && _values->is_value_compatible_with(*p->_values); +} + +bool +map_type_impl::less(bytes_view o1, bytes_view o2) { + return compare_maps(_keys, _values, o1, o2) < 0; +} + +int32_t +map_type_impl::compare_maps(data_type keys, data_type values, bytes_view o1, bytes_view o2) { + if (o1.empty()) { + return o2.empty() ? 0 : -1; + } else if (o2.empty()) { + return 1; + } + int protocol_version = 3; + int size1 = read_collection_size(o1, protocol_version); + int size2 = read_collection_size(o2, protocol_version); + // FIXME: use std::lexicographical_compare() + for (int i = 0; i < std::min(size1, size2); ++i) { + auto k1 = read_collection_value(o1, protocol_version); + auto k2 = read_collection_value(o2, protocol_version); + auto cmp = keys->compare(k1, k2); + if (cmp != 0) { + return cmp; + } + auto v1 = read_collection_value(o1, protocol_version); + auto v2 = read_collection_value(o2, protocol_version); + cmp = values->compare(v1, v2); + if (cmp != 0) { + return cmp; + } + } + return size1 == size2 ? 0 : (size1 < size2 ? -1 : 1); +} + +void +map_type_impl::serialize(const boost::any& value, std::ostream& out) { + return serialize(value, out, 3); +} + +void +map_type_impl::serialize(const boost::any& value, std::ostream& out, int protocol_version) { + auto& m = boost::any_cast(value); + write_collection_size(out, m.size(), protocol_version); + for (auto&& kv : m) { + write_collection_value(out, protocol_version, _keys, kv.first); + write_collection_value(out, protocol_version, _values, kv.second); + } +} + +object_opt +map_type_impl::deserialize(bytes_view v) { + return deserialize(v, 3); +} + +object_opt +map_type_impl::deserialize(bytes_view in, int protocol_version) { + if (in.empty()) { + return {}; + } + native_type m; + auto size = read_collection_size(in, protocol_version); + for (int i = 0; i < size; ++i) { + bytes kb = read_collection_value(in, protocol_version); + auto k = _keys->deserialize(kb); + bytes vb = read_collection_value(in, protocol_version); + auto v = _values->deserialize(vb); + m.insert(m.end(), std::make_pair(std::move(k), std::move(v))); + } + return { std::move(m) }; +} + +sstring +map_type_impl::to_string(const bytes& b) { + // FIXME: + abort(); +} + +size_t +map_type_impl::hash(bytes_view v) { + // FIXME: + abort(); +} + +bytes +map_type_impl::from_string(sstring_view text) { + // FIXME: + abort(); +} + +std::vector +map_type_impl::serialized_values(std::vector cells) { + // FIXME: + abort(); +} + +auto map_type_impl::deserialize_mutation_form(bytes_view in) -> mutation { + auto nr = read_simple(in); + mutation ret; + ret.reserve(nr); + for (uint32_t i = 0; i != nr; ++i) { + // FIXME: we could probably avoid the need for size + auto ksize = read_simple(in); + auto key = read_simple_bytes(in, ksize); + auto vsize = read_simple(in); + auto value = atomic_cell::view::from_bytes(read_simple_bytes(in, vsize)); + ret.emplace_back(key, value); + } + assert(in.empty()); + return ret; +} + +collection_mutation::one +map_type_impl::serialize_mutation_form(mutation mut) { + std::ostringstream out; + auto write32 = [&out] (uint32_t v) { + v = net::hton(v); + out.write(reinterpret_cast(&v), sizeof(v)); + }; + auto writeb = [&out, write32] (bytes_view v) { + write32(v.size()); + out.write(v.begin(), v.size()); + }; + // FIXME: overflow? + write32(mut.size()); + for (auto&& kv : mut) { + auto&& k = kv.first; + auto&& v = kv.second; + writeb(k); + writeb(v.serialize()); + } + auto s = out.str(); + return collection_mutation::one{bytes(s.data(), s.size())}; +} + +collection_mutation::one +map_type_impl::merge(collection_mutation::view a, collection_mutation::view b) { + auto aa = deserialize_mutation_form(a.data); + auto bb = deserialize_mutation_form(b.data); + mutation merged; + merged.reserve(aa.size() + bb.size()); + using element_type = std::pair; + auto compare = [this] (const element_type& e1, const element_type& e2) { + return _keys->less(e1.first, e2.first); + }; + auto merge = [this] (const element_type& e1, const element_type& e2) { + // FIXME: use std::max()? + return std::make_pair(e1.first, compare_atomic_cell_for_merge(e1.second, e2.second) < 0 ? e1.second : e2.second); + }; + combine(aa.begin(), aa.end(), + bb.begin(), bb.end(), + std::back_inserter(merged), + compare, + merge); + return serialize_mutation_form(merged); +} + +thread_local std::unordered_map, map_type_impl::map_type> map_type_impl::_instances; +thread_local std::unordered_map, map_type_impl::map_type> map_type_impl::_frozen_instances; + + thread_local shared_ptr int32_type(make_shared()); thread_local shared_ptr long_type(make_shared()); thread_local shared_ptr ascii_type(make_shared("ascii", cql3::native_cql3_type::ascii)); diff --git a/types.hh b/types.hh index 89b9d32c9d..1c44fad9d5 100644 --- a/types.hh +++ b/types.hh @@ -16,10 +16,13 @@ #include "net/byteorder.hh" #include "db_clock.hh" #include "bytes.hh" +#include "log.hh" +#include "atomic_cell.hh" namespace cql3 { class cql3_type; +class column_specification; } @@ -48,7 +51,7 @@ inline int32_t compare_unsigned(bytes_view v1, bytes_view v2) { return (int32_t) (v1.size() - v2.size()); } -class abstract_type { +class abstract_type : public enable_shared_from_this { sstring _name; public: abstract_type(sstring name) : _name(name) {} @@ -150,10 +153,94 @@ public: virtual bool is_collection() { return false; } virtual bool is_multi_cell() { return false; } virtual ::shared_ptr as_cql3_type() = 0; + virtual shared_ptr freeze() { return shared_from_this(); } }; using data_type = shared_ptr; +class collection_type_impl : public abstract_type { + static thread_local logging::logger _logger; +public: + static constexpr const size_t max_elements = 65535; + + class kind { + std::function (shared_ptr collection, bool is_key)> _impl; + public: + kind(std::function (shared_ptr collection, bool is_key)> impl) + : _impl(std::move(impl)) {} + shared_ptr make_collection_receiver(shared_ptr collection, bool is_key) const; + static const kind map; + static const kind set; + static const kind list; + }; + + const kind& _kind; + +protected: + explicit collection_type_impl(sstring name, const kind& k) + : abstract_type(std::move(name)), _kind(k) {} +public: + virtual data_type name_comparator() = 0; + virtual data_type value_comparator() = 0; + shared_ptr make_collection_receiver(shared_ptr collection, bool is_key); + virtual bool is_collection() override { return true; } + bool is_map() const { return &_kind == &kind::map; } + std::vector enforce_limit(std::vector, int version); + virtual std::vector serialized_values(std::vector cells) = 0; + bytes serialize_for_native_protocol(std::vector cells, int version); + virtual bool is_compatible_with(abstract_type& previous) override; + virtual bool is_compatible_with_frozen(collection_type_impl& previous) = 0; + virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) = 0; + virtual shared_ptr as_cql3_type() override; + virtual collection_mutation::one merge(collection_mutation::view a, collection_mutation::view b) = 0; +}; + +using collection_type = shared_ptr; + +class map_type_impl final : public collection_type_impl { + using map_type = shared_ptr; + static thread_local std::unordered_map, map_type> _instances; + static thread_local std::unordered_map, map_type> _frozen_instances; + data_type _keys; + data_type _values; + data_type _key_value_pair_type; + bool _is_multi_cell; +public: + // type returned by deserialize() and expected by serialize + // does not support mutations/ttl/tombstone - purely for I/O. + using native_type = std::vector>; + // representation of a map mutation, key/value pairs, value is a mutation itself + using mutation = std::vector>; + static shared_ptr get_instance(data_type keys, data_type values, bool is_multi_cell); + map_type_impl(data_type keys, data_type values, bool is_multi_cell); + data_type get_keys_type() const { return _keys; } + data_type get_values_type() const { return _values; } + virtual data_type name_comparator() override { return _keys; } + virtual data_type value_comparator() override { return _values; } + virtual bool is_multi_cell() override { return _is_multi_cell; } + virtual data_type freeze() override; + virtual bool is_compatible_with_frozen(collection_type_impl& previous) override; + virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) override; + virtual bool less(bytes_view o1, bytes_view o2) override; + static int32_t compare_maps(data_type keys_comparator, data_type values_comparator, + bytes_view o1, bytes_view o2); + virtual bool is_byte_order_comparable() const override { return false; } + virtual void serialize(const boost::any& value, std::ostream& out) override; + void serialize(const boost::any& value, std::ostream& out, int protocol_version); + virtual object_opt deserialize(bytes_view v) override; + object_opt deserialize(bytes_view v, int protocol_version); + virtual sstring to_string(const bytes& b) override; + virtual size_t hash(bytes_view v) override; + virtual bytes from_string(sstring_view text) override; + virtual std::vector serialized_values(std::vector cells) override; + mutation deserialize_mutation_form(bytes_view in); + // FIXME: use iterators? + collection_mutation::one serialize_mutation_form(mutation mut); + virtual collection_mutation::one merge(collection_mutation::view a, collection_mutation::view b) override; +}; + +using map_type = shared_ptr; + inline size_t hash_value(const shared_ptr& x) { return std::hash()(x.get()); @@ -323,6 +410,17 @@ T read_simple_exactly(bytes_view& v) { return net::ntoh(*reinterpret_cast*>(p)); } +inline +bytes_view +read_simple_bytes(bytes_view& v, size_t n) { + if (v.size() < n) { + throw marshal_exception(); + } + bytes_view ret(v.begin(), n); + v.remove_prefix(n); + return ret; +} + template object_opt read_simple_opt(bytes_view& v) { if (v.empty()) {