From 7eba98f18c9d0fee988f53791b9dd034b6880476 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 4 Mar 2015 16:49:16 +0200 Subject: [PATCH 01/24] atomic_cell: add missing include --- atomic_cell.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/atomic_cell.hh b/atomic_cell.hh index d47862409a..f645161403 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -6,6 +6,7 @@ #include "bytes.hh" #include "timestamp.hh" +#include "gc_clock.hh" #include template From 853d0b2ca0d0270368b6865fb89084ba2adeaf31 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 2 Mar 2015 16:36:14 +0200 Subject: [PATCH 02/24] abstract_type: implement freeze() --- types.hh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/types.hh b/types.hh index 5198e13cd4..949af3fe6d 100644 --- a/types.hh +++ b/types.hh @@ -48,7 +48,7 @@ inline int32_t compare_unsigned(bytes_view v1, bytes_view v2) { return (int32_t) (v1.size() - v2.size()); } -class abstract_type { +class abstract_type : public enable_shared_from_this { sstring _name; public: abstract_type(sstring name) : _name(name) {} @@ -150,6 +150,7 @@ public: virtual bool is_collection() { return false; } virtual bool is_multi_cell() { return false; } virtual ::shared_ptr as_cql3_type() = 0; + virtual shared_ptr freeze() { return shared_from_this(); } }; using data_type = shared_ptr; From 74beb625271b4accd8b7106d8aca558018b9b193 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:29:09 +0200 Subject: [PATCH 03/24] schema: add stringify operation for column_definition --- schema.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/schema.hh b/schema.hh index 39f58d2b46..b7cb3b69d7 100644 --- a/schema.hh +++ b/schema.hh @@ -34,6 +34,9 @@ public: bool is_compact_value() const; const sstring& name_as_text() const; const bytes& name() const; + friend std::ostream& operator<<(std::ostream& os, const column_definition& cd) { + return os << cd.name_as_text(); + } }; struct thrift_schema { From bb58deadc072767d9df21ed6fe7dd172bedf8413 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 3 Mar 2015 13:10:59 +0200 Subject: [PATCH 04/24] cql3: add virtual destructor to class operation --- cql3/operation.hh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cql3/operation.hh b/cql3/operation.hh index 16d5f36cd6..97f390cebe 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -77,6 +77,8 @@ public: , _t{t} { } + virtual ~operation() {} + virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const { return _t && _t->uses_function(ks_name, function_name); } From 76d1256565eb49a10a6acfdd8f2a8fdec72c728c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:09:54 +0200 Subject: [PATCH 05/24] atomic_cell: add serialize() method Since atomic_cell is already in serialized form, the implementation is simple. --- atomic_cell.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/atomic_cell.hh b/atomic_cell.hh index f645161403..ff3013640f 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -123,6 +123,9 @@ public: ttl_opt ttl() const { return atomic_cell::ttl(_data); } + bytes_view serialize() const { + return _data; + } friend class atomic_cell::one; }; From 2f8be37fcab2f9b67ca2ebdad4356e9b33d4f79d Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:10:32 +0200 Subject: [PATCH 06/24] atomic_cell_or_collection: allow construction from atomic_cell Since an atomic_cell has an is-a relationship to atomic_cell_or_collection, we can allow direct conversion. Type information is only lost, not added. --- atomic_cell.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/atomic_cell.hh b/atomic_cell.hh index ff3013640f..ca5668cce8 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -182,6 +182,7 @@ class atomic_cell_or_collection final { private: atomic_cell_or_collection(bytes&& data) : _data(std::move(data)) {} public: + atomic_cell_or_collection(atomic_cell::one ac) : _data(std::move(ac._data)) {} static atomic_cell_or_collection from_atomic_cell(atomic_cell::one data) { return { std::move(data._data) }; } atomic_cell::view as_atomic_cell() const { return atomic_cell::view::from_bytes(_data); } // FIXME: insert collection variant here From 57b6d4ada5f852ab7092228484fb1dbffd72ad03 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:19:31 +0200 Subject: [PATCH 07/24] atomic_cell: add collection support We leave interpretation to the backing type (map/set/list), so there is not much code here. --- atomic_cell.hh | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/atomic_cell.hh b/atomic_cell.hh index ca5668cce8..0d17f42038 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -175,6 +175,22 @@ public: friend class atomic_cell_or_collection; }; +// Represents a mutation of a collection. Actual format is determined by collection type, +// and is: +// set: list of atomic_cell +// map: list of pair (for key/value) +// list: tbd, probably ugly +class collection_mutation { +public: + struct view { + bytes_view data; + }; + struct one { + bytes data; + operator view() const { return { data }; } + }; +}; + // A variant type that can hold either an atomic_cell, or a serialized collection. // Which type is stored is determinied by the schema. class atomic_cell_or_collection final { @@ -185,6 +201,12 @@ public: atomic_cell_or_collection(atomic_cell::one ac) : _data(std::move(ac._data)) {} static atomic_cell_or_collection from_atomic_cell(atomic_cell::one data) { return { std::move(data._data) }; } atomic_cell::view as_atomic_cell() const { return atomic_cell::view::from_bytes(_data); } - // FIXME: insert collection variant here + atomic_cell_or_collection(collection_mutation::one cm) : _data(std::move(cm.data)) {} + static atomic_cell_or_collection from_collection_mutation(collection_mutation::one data) { + return std::move(data.data); + } + collection_mutation::view as_collection_mutation() const { + return collection_mutation::view{_data}; + } }; From b14d9f1f0264391143b79e1f41d63ccc67fcb690 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:21:24 +0200 Subject: [PATCH 08/24] mutation: support for collections We simply store the collection mutation as we do atomic cells -- merging will be done by the consumer. --- database.hh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/database.hh b/database.hh index 785a35ac21..cbcf96affc 100644 --- a/database.hh +++ b/database.hh @@ -94,27 +94,27 @@ public: mutation(mutation&&) = default; mutation(const mutation&) = default; - void set_static_cell(const column_definition& def, atomic_cell::one value) { + void set_static_cell(const column_definition& def, atomic_cell_or_collection value) { emplace_or_insert(p.static_row(), def.id, std::move(value)); } - void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell::one value) { + void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); emplace_or_insert(row, def.id, std::move(value)); } - void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell::one value) { + void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(key); emplace_or_insert(row, def.id, std::move(value)); } private: - static void emplace_or_insert(row& row, column_id id, atomic_cell::one&& value) { + static void emplace_or_insert(row& row, column_id id, atomic_cell_or_collection&& value) { // our mutations are not yet immutable auto i = row.lower_bound(id); if (i == row.end() || i->first != id) { - row.emplace_hint(i, id, atomic_cell_or_collection::from_atomic_cell(std::move(value))); + row.emplace_hint(i, id, std::move(value)); } else { - i->second = atomic_cell_or_collection::from_atomic_cell(std::move(value)); + i->second = std::move(value); } } friend std::ostream& operator<<(std::ostream& os, const mutation& m); From 56d5c24a6aa9a0a5756e885049853e1954ab883d Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 26 Feb 2015 15:27:18 +0200 Subject: [PATCH 09/24] db: implement CollectionType (partial) --- types.cc | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ types.hh | 41 ++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/types.cc b/types.cc index cb92f18505..98e0a672bd 100644 --- a/types.cc +++ b/types.cc @@ -7,6 +7,7 @@ #include "types.hh" #include "core/print.hh" #include "net/ip.hh" +#include "database.hh" #include template @@ -541,6 +542,72 @@ struct float_type_impl : floating_type_impl { }; +thread_local logging::logger collection_type_impl::_logger("collection_type_impl"); +const size_t collection_type_impl::max_elements; + +const collection_type_impl::kind collection_type_impl::kind::map( + [] (shared_ptr collection, bool is_key) -> shared_ptr { + // FIXME: implement + // return isKey ? Maps.keySpecOf(collection) : Maps.valueSpecOf(collection); + abort(); + }); +const collection_type_impl::kind collection_type_impl::kind::set( + [] (shared_ptr collection, bool is_key) -> shared_ptr { + // FIXME: implement + // return Sets.valueSpecOf(collection); + abort(); + }); +const collection_type_impl::kind collection_type_impl::kind::list( + [] (shared_ptr collection, bool is_key) -> shared_ptr { + // FIXME: implement + // return Lists.valueSpecOf(collection); + abort(); + }); + +shared_ptr +collection_type_impl::kind::make_collection_receiver(shared_ptr collection, bool is_key) const { + return _impl(std::move(collection), is_key); +} + +shared_ptr +collection_type_impl::make_collection_receiver(shared_ptr collection, bool is_key) { + return _kind.make_collection_receiver(std::move(collection), is_key); +} + +std::vector +collection_type_impl::enforce_limit(std::vector cells, int version) { + assert(is_multi_cell()); + if (version >= 3 || cells.size() <= max_elements) { + return cells; + } + _logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. " + "Please see http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.", cells.size(), max_elements, max_elements); + cells.erase(cells.begin() + max_elements, cells.end()); + return cells; +} + +bytes +collection_type_impl::serialize_for_native_protocol(std::vector cells, int version) { + assert(is_multi_cell()); + cells = enforce_limit(std::move(cells), version); + std::vector values = serialized_values(std::move(cells)); + // FIXME: implement + abort(); + // return CollectionSerializer.pack(values, cells.size(), version); +} + +bool +collection_type_impl::is_compatible_with(abstract_type& previous) { + // FIXME: implement + abort(); +} + +shared_ptr +collection_type_impl::as_cql3_type() { + // FIXME: implement + abort(); +} + thread_local shared_ptr int32_type(make_shared()); thread_local shared_ptr long_type(make_shared()); thread_local shared_ptr ascii_type(make_shared("ascii", cql3::native_cql3_type::ascii)); diff --git a/types.hh b/types.hh index 949af3fe6d..fc2a1f9ddc 100644 --- a/types.hh +++ b/types.hh @@ -16,10 +16,13 @@ #include "net/byteorder.hh" #include "db_clock.hh" #include "bytes.hh" +#include "log.hh" +#include "atomic_cell.hh" namespace cql3 { class cql3_type; +class column_specification; } @@ -155,6 +158,44 @@ public: using data_type = shared_ptr; +class collection_type_impl : public abstract_type { + static thread_local logging::logger _logger; +public: + static constexpr const size_t max_elements = 65535; + + class kind { + std::function (shared_ptr collection, bool is_key)> _impl; + public: + kind(std::function (shared_ptr collection, bool is_key)> impl) + : _impl(std::move(impl)) {} + shared_ptr make_collection_receiver(shared_ptr collection, bool is_key) const; + static const kind map; + static const kind set; + static const kind list; + }; + + const kind& _kind; + +protected: + explicit collection_type_impl(sstring name, const kind& k) + : abstract_type(std::move(name)), _kind(k) {} +public: + virtual data_type name_comparator() = 0; + virtual data_type value_comparator() = 0; + shared_ptr make_collection_receiver(shared_ptr collection, bool is_key); + virtual bool is_collection() override { return true; } + bool is_map() const { return &_kind == &kind::map; } + std::vector enforce_limit(std::vector, int version); + virtual std::vector serialized_values(std::vector cells) = 0; + bytes serialize_for_native_protocol(std::vector cells, int version); + virtual bool is_compatible_with(abstract_type& previous) override; + virtual bool is_compatible_with_frozen(collection_type_impl& previous) = 0; + virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) = 0; + virtual shared_ptr as_cql3_type() override; +}; + +using collection_type = shared_ptr; + inline size_t hash_value(const shared_ptr& x) { return std::hash()(x.get()); From b5571e4c1820e984c93d54275815c6d143f771a5 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 2 Mar 2015 16:37:34 +0200 Subject: [PATCH 10/24] db: implement map_type (partial) --- types.cc | 242 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ types.hh | 54 +++++++++++++ 2 files changed, 296 insertions(+) diff --git a/types.cc b/types.cc index 98e0a672bd..1d22c58fa9 100644 --- a/types.cc +++ b/types.cc @@ -8,7 +8,9 @@ #include "core/print.hh" #include "net/ip.hh" #include "database.hh" +#include "util/serialization.hh" #include +#include template struct simple_type_traits { @@ -608,6 +610,246 @@ collection_type_impl::as_cql3_type() { abort(); } +int read_collection_size(bytes_view& in, int version) { + if (version >= 3) { + return read_simple(in); + } else { + return read_simple(in); + } +} + +void write_collection_size(std::ostream& out, int size, int version) { + if (version >= 3) { + serialize_int32(out, size); + } else { + serialize_int16(out, uint16_t(size)); + } +} + +bytes read_collection_value(bytes_view& in, int version) { + auto size = version >= 3 ? read_simple(in) : read_simple(in); + auto v = read_simple_bytes(in, size); + return bytes(v.begin(), v.end()); +} + +void write_collection_value(std::ostream& out, int version, data_type type, const boost::any& value) { + // We have to copy here, because we can't guess the size. + // FIXME: somehow. + std::ostringstream tmp; + type->serialize(value, tmp); + auto val_bytes = tmp.str(); + if (version >= 3) { + serialize_int32(out, int32_t(val_bytes.size())); + } else { + serialize_int16(out, uint16_t(val_bytes.size())); + } + out.rdbuf()->sputn(val_bytes.data(), val_bytes.size()); +} + +namespace std { + +template <> +struct hash> : private std::hash { + size_t operator()(const pair& p) const { + // don't simply xor, it will generate the same result for sequential + // pointers + auto f = hash::operator()(p.first); + auto s = hash::operator()(p.second); + return f ^ ((s << 7) | s >> (std::numeric_limits::digits - 7)); + } +}; + +} + +shared_ptr +map_type_impl::get_instance(data_type keys, data_type values, bool is_multi_cell) { + auto& map = is_multi_cell ? _instances : _frozen_instances; + auto p = std::make_pair(keys, values); + auto i = map.find(p); + if (i == map.end()) { + auto t = make_shared(keys, values, is_multi_cell); + i = map.insert(std::make_pair(std::move(p), std::move(t))).first; + } + return i->second; +} + +map_type_impl::map_type_impl(data_type keys, data_type values, bool is_multi_cell) + : collection_type_impl("map<" + keys->name() + ", " + values->name() + ">", kind::map) + , _keys(std::move(keys)) + , _values(std::move(values)) + , _is_multi_cell(is_multi_cell) { +} + +data_type +map_type_impl::freeze() { + if (_is_multi_cell) { + return get_instance(_keys, _values, false); + } else { + return shared_from_this(); + } +} + +bool +map_type_impl::is_compatible_with_frozen(collection_type_impl& previous) { + assert(!_is_multi_cell); + auto* p = dynamic_cast(&previous); + if (!p) { + return false; + } + return _keys->is_compatible_with(*p->_keys) + && _values->is_compatible_with(*p->_values); +} + +bool +map_type_impl::is_value_compatible_with_frozen(collection_type_impl& previous) { + assert(!_is_multi_cell); + auto* p = dynamic_cast(&previous); + if (!p) { + return false; + } + return _keys->is_compatible_with(*p->_keys) + && _values->is_value_compatible_with(*p->_values); +} + +bool +map_type_impl::less(bytes_view o1, bytes_view o2) { + return compare_maps(_keys, _values, o1, o2) < 0; +} + +int32_t +map_type_impl::compare_maps(data_type keys, data_type values, bytes_view o1, bytes_view o2) { + if (o1.empty()) { + return o2.empty() ? 0 : -1; + } else if (o2.empty()) { + return 1; + } + int protocol_version = 3; + int size1 = read_collection_size(o1, protocol_version); + int size2 = read_collection_size(o2, protocol_version); + // FIXME: use std::lexicographical_compare() + for (int i = 0; i < std::min(size1, size2); ++i) { + auto k1 = read_collection_value(o1, protocol_version); + auto k2 = read_collection_value(o2, protocol_version); + auto cmp = keys->compare(k1, k2); + if (cmp != 0) { + return cmp; + } + auto v1 = read_collection_value(o1, protocol_version); + auto v2 = read_collection_value(o2, protocol_version); + cmp = values->compare(v1, v2); + if (cmp != 0) { + return cmp; + } + } + return size1 == size2 ? 0 : (size1 < size2 ? -1 : 1); +} + +void +map_type_impl::serialize(const boost::any& value, std::ostream& out) { + return serialize(value, out, 3); +} + +void +map_type_impl::serialize(const boost::any& value, std::ostream& out, int protocol_version) { + auto& m = boost::any_cast(value); + write_collection_size(out, m.size(), protocol_version); + for (auto&& kv : m) { + write_collection_value(out, protocol_version, _keys, kv.first); + write_collection_value(out, protocol_version, _values, kv.second); + } +} + +object_opt +map_type_impl::deserialize(bytes_view v) { + return deserialize(v, 3); +} + +object_opt +map_type_impl::deserialize(bytes_view in, int protocol_version) { + if (in.empty()) { + return {}; + } + native_type m; + auto size = read_collection_size(in, protocol_version); + for (int i = 0; i < size; ++i) { + bytes kb = read_collection_value(in, protocol_version); + auto k = _keys->deserialize(kb); + bytes vb = read_collection_value(in, protocol_version); + auto v = _values->deserialize(vb); + m.insert(m.end(), std::make_pair(std::move(k), std::move(v))); + } + return { std::move(m) }; +} + +sstring +map_type_impl::to_string(const bytes& b) { + // FIXME: + abort(); +} + +size_t +map_type_impl::hash(bytes_view v) { + // FIXME: + abort(); +} + +bytes +map_type_impl::from_string(sstring_view text) { + // FIXME: + abort(); +} + +std::vector +map_type_impl::serialized_values(std::vector cells) { + // FIXME: + abort(); +} + +auto map_type_impl::deserialize_mutation_form(bytes_view in) -> mutation { + auto nr = read_simple(in); + mutation ret; + ret.reserve(nr); + for (uint32_t i = 0; i != nr; ++i) { + // FIXME: we could probably avoid the need for size + auto ksize = read_simple(in); + auto key = read_simple_bytes(in, ksize); + auto vsize = read_simple(in); + auto value = atomic_cell::view::from_bytes(read_simple_bytes(in, vsize)); + ret.emplace_back(key, value); + } + assert(in.empty()); + return ret; +} + +collection_mutation::one +map_type_impl::serialize_mutation_form(mutation mut) { + std::ostringstream out; + auto write32 = [&out] (uint32_t v) { + v = net::hton(v); + out.write(reinterpret_cast(&v), sizeof(v)); + }; + auto writeb = [&out, write32] (bytes_view v) { + write32(v.size()); + out.write(v.begin(), v.size()); + }; + // FIXME: overflow? + write32(mut.size()); + for (auto&& kv : mut) { + auto&& k = kv.first; + auto&& v = kv.second; + writeb(k); + writeb(v.serialize()); + } + auto s = out.str(); + return collection_mutation::one{bytes(s.data(), s.size())}; +} + + + +thread_local std::unordered_map, map_type_impl::map_type> map_type_impl::_instances; +thread_local std::unordered_map, map_type_impl::map_type> map_type_impl::_frozen_instances; + + thread_local shared_ptr int32_type(make_shared()); thread_local shared_ptr long_type(make_shared()); thread_local shared_ptr ascii_type(make_shared("ascii", cql3::native_cql3_type::ascii)); diff --git a/types.hh b/types.hh index fc2a1f9ddc..e111def4bd 100644 --- a/types.hh +++ b/types.hh @@ -196,6 +196,49 @@ public: using collection_type = shared_ptr; +class map_type_impl final : public collection_type_impl { + using map_type = shared_ptr; + static thread_local std::unordered_map, map_type> _instances; + static thread_local std::unordered_map, map_type> _frozen_instances; + data_type _keys; + data_type _values; + data_type _key_value_pair_type; + bool _is_multi_cell; +public: + // type returned by deserialize() and expected by serialize + // does not support mutations/ttl/tombstone - purely for I/O. + using native_type = std::vector>; + // representation of a map mutation, key/value pairs, value is a mutation itself + using mutation = std::vector>; + static shared_ptr get_instance(data_type keys, data_type values, bool is_multi_cell); + map_type_impl(data_type keys, data_type values, bool is_multi_cell); + data_type get_keys_type() const { return _keys; } + data_type get_values_type() const { return _values; } + virtual data_type name_comparator() override { return _keys; } + virtual data_type value_comparator() override { return _values; } + virtual bool is_multi_cell() override { return _is_multi_cell; } + virtual data_type freeze() override; + virtual bool is_compatible_with_frozen(collection_type_impl& previous) override; + virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) override; + virtual bool less(bytes_view o1, bytes_view o2) override; + static int32_t compare_maps(data_type keys_comparator, data_type values_comparator, + bytes_view o1, bytes_view o2); + virtual bool is_byte_order_comparable() const override { return false; } + virtual void serialize(const boost::any& value, std::ostream& out) override; + void serialize(const boost::any& value, std::ostream& out, int protocol_version); + virtual object_opt deserialize(bytes_view v) override; + object_opt deserialize(bytes_view v, int protocol_version); + virtual sstring to_string(const bytes& b) override; + virtual size_t hash(bytes_view v) override; + virtual bytes from_string(sstring_view text) override; + virtual std::vector serialized_values(std::vector cells) override; + mutation deserialize_mutation_form(bytes_view in); + // FIXME: use iterators? + collection_mutation::one serialize_mutation_form(mutation mut); +}; + +using map_type = shared_ptr; + inline size_t hash_value(const shared_ptr& x) { return std::hash()(x.get()); @@ -365,6 +408,17 @@ T read_simple_exactly(bytes_view& v) { return net::ntoh(*reinterpret_cast*>(p)); } +inline +bytes_view +read_simple_bytes(bytes_view& v, size_t n) { + if (v.size() < n) { + throw marshal_exception(); + } + bytes_view ret(v.begin(), n); + v.remove_prefix(n); + return ret; +} + template object_opt read_simple_opt(bytes_view& v) { if (v.empty()) { From 4602d86dc93ed454a490bf8761f1ed5441e8bee4 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:28:40 +0200 Subject: [PATCH 11/24] cql3: convert maps::key_spec_of/maps::value_spec_of --- cql3/maps.hh | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cql3/maps.hh b/cql3/maps.hh index 03ea3f27b6..f5135a2b77 100644 --- a/cql3/maps.hh +++ b/cql3/maps.hh @@ -60,20 +60,20 @@ import org.apache.cassandra.utils.Pair; * Static helper methods and classes for maps. */ class maps { +private: + maps() = delete; public: -#if 0 - private Maps() {} - - public static ColumnSpecification keySpecOf(ColumnSpecification column) - { - return new ColumnSpecification(column.ksName, column.cfName, new ColumnIdentifier("key(" + column.name + ")", true), ((MapType)column.type).getKeysType()); + static shared_ptr key_spec_of(column_specification& column) { + return ::make_shared(column.ks_name, column.cf_name, + ::make_shared(sprint("key(%s)", *column.name), true), + dynamic_pointer_cast(column.type)->get_keys_type()); } - public static ColumnSpecification valueSpecOf(ColumnSpecification column) - { - return new ColumnSpecification(column.ksName, column.cfName, new ColumnIdentifier("value(" + column.name + ")", true), ((MapType)column.type).getValuesType()); + static shared_ptr value_spec_of(column_specification& column) { + return ::make_shared(column.ks_name, column.cf_name, + ::make_shared(sprint("value(%s)", *column.name), true), + dynamic_pointer_cast(column.type)->get_values_type()); } -#endif class literal : public term::raw { public: From 7a94b0c0a3bbe32fded8950c8f8814285ee2248b Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:33:26 +0200 Subject: [PATCH 12/24] cql3: convert maps::setter_by_key Basic operation to set a single map element to a value. --- cql3/maps.hh | 72 +++++++++++++++++++++++++--------------------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/cql3/maps.hh b/cql3/maps.hh index f5135a2b77..a648540bf0 100644 --- a/cql3/maps.hh +++ b/cql3/maps.hh @@ -27,6 +27,8 @@ #include "cql3/abstract_marker.hh" #include "cql3/term.hh" +#include "operation.hh" +#include "update_parameters.hh" namespace cql3 { @@ -329,51 +331,47 @@ public: Putter.doPut(t, cf, prefix, column, params); } } +#endif - public static class SetterByKey extends Operation - { - private final Term k; - - public SetterByKey(ColumnDefinition column, Term k, Term t) - { - super(column, t); - this.k = k; + class setter_by_key : public operation { + const shared_ptr _k; + public: + setter_by_key(column_definition& column, shared_ptr k, shared_ptr t) + : operation(column, std::move(t)), _k(std::move(k)) { } - @Override - public void collectMarkerSpecification(VariableSpecifications boundNames) - { - super.collectMarkerSpecification(boundNames); - k.collectMarkerSpecification(boundNames); + virtual void collect_marker_specification(shared_ptr bound_names) override { + operation::collect_marker_specification(bound_names); + _k->collect_marker_specification(bound_names); } - public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException - { - assert column.type.isMultiCell() : "Attempted to set a value for a single key on a frozen map"; - ByteBuffer key = k.bindAndGet(params.options); - ByteBuffer value = t.bindAndGet(params.options); - if (key == null) - throw new InvalidRequestException("Invalid null map key"); - - CellName cellName = cf.getComparator().create(prefix, column, key); - - if (value == null) - { - cf.addColumn(params.makeTombstone(cellName)); + virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override { + using exceptions::invalid_request_exception; + assert(column.type->is_multi_cell()); // "Attempted to set a value for a single key on a frozen map"m + bytes_opt key = _k->bind_and_get(params._options); + bytes_opt value = _t->bind_and_get(params._options); + if (!key) { + throw invalid_request_exception("Invalid null map key"); } - else - { - // We don't support value > 64K because the serialization format encode the length as an unsigned short. - if (value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT) - throw new InvalidRequestException(String.format("Map value is too long. Map values are limited to %d bytes but %d bytes value provided", - FBUtilities.MAX_UNSIGNED_SHORT, - value.remaining())); - - cf.addColumn(params.makeColumn(cellName, value)); + if (value && value->size() >= std::numeric_limits::max()) { + throw invalid_request_exception( + sprint("Map value is too long. Map values are limited to %d bytes but %d bytes value provided", + std::numeric_limits::max(), + value->size())); + } + auto avalue = value ? params.make_cell(*value) : params.make_dead_cell(); + map_type_impl::mutation update = { { std::move(*key), std::move(avalue) } }; + // should have been verified as map earlier? + auto ctype = static_pointer_cast(column.type); + auto col_mut = ctype->serialize_mutation_form(std::move(update)); + if (column.is_static()) { + m.set_static_cell(column, std::move(col_mut)); + } else { + m.set_clustered_cell(prefix, column, std::move(col_mut)); } } - } - + }; +#if 0 public static class Putter extends Operation { public Putter(ColumnDefinition column, Term t) From 1fcce7cdcb9bbf69cf4330decd193d94ed1059af Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:34:40 +0200 Subject: [PATCH 13/24] cql3: convert operation::set_element --- configure.py | 1 + cql3/operation.cc | 65 +++++++++++++++++++++++++++++++++++++++++++++++ cql3/operation.hh | 59 +++++++++++++----------------------------- 3 files changed, 83 insertions(+), 42 deletions(-) create mode 100644 cql3/operation.cc diff --git a/configure.py b/configure.py index a210fe0545..84bcee2ff1 100755 --- a/configure.py +++ b/configure.py @@ -254,6 +254,7 @@ urchin_core = (['database.cc', 'cql3/abstract_marker.cc', 'cql3/cql3.cc', 'cql3/cql3_type.cc', + 'cql3/operation.cc', 'cql3/functions/functions.cc', 'cql3/statements/modification_statement.cc', 'cql3/statements/update_statement.cc', diff --git a/cql3/operation.cc b/cql3/operation.cc new file mode 100644 index 0000000000..6f826e0920 --- /dev/null +++ b/cql3/operation.cc @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "operation.hh" +#include "operation_impl.hh" +#include "maps.hh" + +namespace cql3 { + + +shared_ptr +operation::set_element::prepare(const sstring& keyspace, column_definition& receiver) { + using exceptions::invalid_request_exception; + auto rtype = dynamic_pointer_cast(receiver.type); + if (!rtype) { + throw invalid_request_exception(sprint("Invalid operation (%s) for non collection column %s", receiver, receiver.name())); + } else if (!rtype->is_multi_cell()) { + throw invalid_request_exception(sprint("Invalid operation (%s) for frozen collection column %s", receiver, receiver.name())); + } + + if (&rtype->_kind == &collection_type_impl::kind::list) { + abort(); + // FIXME: +#if 0 + Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver)); + Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver)); + return new Lists.SetterByIndex(receiver, idx, lval); +#endif + } else if (&rtype->_kind == &collection_type_impl::kind::set) { + throw invalid_request_exception(sprint("Invalid operation (%s) for set column %s", receiver, receiver.name())); + } else if (&rtype->_kind == &collection_type_impl::kind::map) { + auto key = _selector->prepare(keyspace, maps::key_spec_of(*receiver.column_specification)); + auto mval = _value->prepare(keyspace, maps::value_spec_of(*receiver.column_specification)); + return make_shared(receiver, key, mval); + } + abort(); +} + +bool +operation::set_element::is_compatible_with(shared_ptr other) { + // TODO: we could check that the other operation is not setting the same element + // too (but since the index/key set may be a bind variables we can't always do it at this point) + return !dynamic_pointer_cast(std::move(other)); +} + +} diff --git a/cql3/operation.hh b/cql3/operation.hh index 97f390cebe..d3698d8520 100644 --- a/cql3/operation.hh +++ b/cql3/operation.hh @@ -26,13 +26,16 @@ #define CQL3_OPERATION_HH #include "core/shared_ptr.hh" - +#include "exceptions/exceptions.hh" #include "database.hh" +#include "term.hh" #include namespace cql3 { +class update_parameters; + #if 0 package org.apache.cassandra.cql3; @@ -174,54 +177,26 @@ public: class set_value; + class set_element : public raw_update { + const shared_ptr _selector; + const shared_ptr _value; + public: + set_element(shared_ptr selector, shared_ptr value) + : _selector(std::move(selector)), _value(std::move(value)) { + } + + virtual shared_ptr prepare(const sstring& keyspace, column_definition& receiver); #if 0 - public static class SetElement implements RawUpdate - { - private final Term.Raw selector; - private final Term.Raw value; - - public SetElement(Term.Raw selector, Term.Raw value) - { - this.selector = selector; - this.value = value; - } - - public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException - { - if (!(receiver.type instanceof CollectionType)) - throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver.name)); - else if (!(receiver.type.isMultiCell())) - throw new InvalidRequestException(String.format("Invalid operation (%s) for frozen collection column %s", toString(receiver), receiver.name)); - - switch (((CollectionType)receiver.type).kind) - { - case LIST: - Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver)); - Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver)); - return new Lists.SetterByIndex(receiver, idx, lval); - case SET: - throw new InvalidRequestException(String.format("Invalid operation (%s) for set column %s", toString(receiver), receiver.name)); - case MAP: - Term key = selector.prepare(keyspace, Maps.keySpecOf(receiver)); - Term mval = value.prepare(keyspace, Maps.valueSpecOf(receiver)); - return new Maps.SetterByKey(receiver, key, mval); - } - throw new AssertionError(); - } - protected String toString(ColumnSpecification column) { return String.format("%s[%s] = %s", column.name, selector, value); } - public boolean isCompatibleWith(RawUpdate other) - { - // TODO: we could check that the other operation is not setting the same element - // too (but since the index/key set may be a bind variables we can't always do it at this point) - return !(other instanceof SetValue); - } - } +#endif + virtual bool is_compatible_with(shared_ptr other) override; + }; +#if 0 public static class Addition implements RawUpdate { private final Term.Raw value; From a6b692612aeea371eb3939e6609e7ef1874c629a Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 13:35:31 +0200 Subject: [PATCH 14/24] cql3: convert grammer for setting a collection element --- cql3/Cql.g | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cql3/Cql.g b/cql3/Cql.g index 6797179171..8cb4444ae3 100644 --- a/cql3/Cql.g +++ b/cql3/Cql.g @@ -1158,9 +1158,7 @@ columnOperation[operations_type& operations] columnOperationDifferentiator[operations_type& operations, ::shared_ptr key] : '=' normalColumnOperation[operations, key] -#if 0 | '[' k=term ']' specializedColumnOperation[operations, key, k] -#endif ; normalColumnOperation[operations_type& operations, ::shared_ptr key] @@ -1196,14 +1194,16 @@ normalColumnOperation[operations_type& operations, ::shared_ptr> operations, ColumnIdentifier.Raw key, Term.Raw k] +specializedColumnOperation[std::vector, + shared_ptr>> operations, + shared_ptr key, + shared_ptr k] + : '=' t=term { - addRawUpdate(operations, key, new Operation.SetElement(k, t)); + add_raw_update(operations, key, make_shared(k, t)); } ; -#endif columnCondition[conditions_type& conditions] // Note: we'll reject duplicates later From ded878212c7c6e7726b3c2f91a7724bed7670bc6 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 16:27:02 +0200 Subject: [PATCH 15/24] db: simplify mutation_partition::apply() Since merging cells is a different operation for atomic cells and collections, move it into compare_for_merge(), which is where we check the column type. Rename compare_for_merge to merge_column(), since it now does more than compares. --- database.cc | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/database.cc b/database.cc index 5fd0ca07de..4c2aeede2b 100644 --- a/database.cc +++ b/database.cc @@ -348,12 +348,15 @@ compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right) { } static inline -int -compare_for_merge(const column_definition& def, - const std::pair& left, - const std::pair& right) { +void +merge_column(const column_definition& def, + atomic_cell_or_collection& old, + const atomic_cell_or_collection& neww) { if (def.is_atomic()) { - return compare_atomic_cell_for_merge(left.second.as_atomic_cell(), right.second.as_atomic_cell()); + if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) { + // FIXME: move()? + old = neww; + } } else { fail(unimplemented::cause::COLLECTIONS); } @@ -376,9 +379,7 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { } else { auto& old_column = *i; auto& def = schema->regular_column_at(col); - if (compare_for_merge(def, old_column, new_column) < 0) { - old_column.second = new_column.second; - } + merge_column(def, old_column.second, new_column.second); } } }; From df22293baf261be4ff0cc72875ccaa3264a6f730 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 17:02:50 +0200 Subject: [PATCH 16/24] atomic_cell: export compare_atomic_cell_for_merge Will be used for merging maps. --- atomic_cell.hh | 1 + database.cc | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/atomic_cell.hh b/atomic_cell.hh index 0d17f42038..ccb452aeb8 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -210,3 +210,4 @@ public: } }; +int compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right); diff --git a/database.cc b/database.cc index 4c2aeede2b..548d9275d7 100644 --- a/database.cc +++ b/database.cc @@ -326,7 +326,6 @@ column_family::apply(const mutation& m) { } // Based on org.apache.cassandra.db.AbstractCell#reconcile() -static inline int compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right) { if (left.timestamp() != right.timestamp()) { From 5dbfe3939916cf797b129a06d0c93c52c377853b Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 17:03:29 +0200 Subject: [PATCH 17/24] Add combine() template Similar to std::merge(), merges two sorted sequences. Duplicates, however, are merged using a binary function parameter rather than duplicated. --- combine.hh | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 combine.hh diff --git a/combine.hh b/combine.hh new file mode 100644 index 0000000000..71738d2f2b --- /dev/null +++ b/combine.hh @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +// combine two sorted uniqued sequences into a single sorted sequence +// unique elements are copied, duplicate elements are merged with a +// binary function. +template +OutputIterator +combine(InputIterator1 begin1, InputIterator1 end1, + InputIterator2 begin2, InputIterator2 end2, + OutputIterator out, + Compare compare, + Merge merge) { + while (begin1 != end1 && begin2 != end2) { + auto& e1 = *begin1; + auto& e2 = *begin2; + if (compare(e1, e2)) { + *out++ = e1; + ++begin1; + } else if (compare(e2, e1)) { + *out++ = e2; + ++begin2; + } else { + *out++ = merge(e1, e2); + ++begin1; + ++begin2; + } + } + out = std::copy(begin1, end1, out); + out = std::copy(begin2, end2, out); + return out; +} + + From 98f2a51df9b781a0ea30ec18cdf9476641639b76 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 17:04:40 +0200 Subject: [PATCH 18/24] db: implement collection mutation merging Only for maps, as they are the only collection implemented at present. --- database.cc | 3 ++- types.cc | 23 ++++++++++++++++++++++- types.hh | 2 ++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/database.cc b/database.cc index 548d9275d7..9cdfc49b1d 100644 --- a/database.cc +++ b/database.cc @@ -357,7 +357,8 @@ merge_column(const column_definition& def, old = neww; } } else { - fail(unimplemented::cause::COLLECTIONS); + auto ct = static_pointer_cast(def.type); + old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation()); } } diff --git a/types.cc b/types.cc index 1d22c58fa9..16825987c0 100644 --- a/types.cc +++ b/types.cc @@ -9,6 +9,7 @@ #include "net/ip.hh" #include "database.hh" #include "util/serialization.hh" +#include "combine.hh" #include #include @@ -844,7 +845,27 @@ map_type_impl::serialize_mutation_form(mutation mut) { return collection_mutation::one{bytes(s.data(), s.size())}; } - +collection_mutation::one +map_type_impl::merge(collection_mutation::view a, collection_mutation::view b) { + auto aa = deserialize_mutation_form(a.data); + auto bb = deserialize_mutation_form(b.data); + mutation merged; + merged.reserve(aa.size() + bb.size()); + using element_type = std::pair; + auto compare = [this] (const element_type& e1, const element_type& e2) { + return _keys->less(e1.first, e2.first); + }; + auto merge = [this] (const element_type& e1, const element_type& e2) { + // FIXME: use std::max()? + return std::make_pair(e1.first, compare_atomic_cell_for_merge(e1.second, e2.second) < 0 ? e1.second : e2.second); + }; + combine(aa.begin(), aa.end(), + bb.begin(), bb.end(), + std::back_inserter(merged), + compare, + merge); + return serialize_mutation_form(merged); +} thread_local std::unordered_map, map_type_impl::map_type> map_type_impl::_instances; thread_local std::unordered_map, map_type_impl::map_type> map_type_impl::_frozen_instances; diff --git a/types.hh b/types.hh index e111def4bd..b18caac4c1 100644 --- a/types.hh +++ b/types.hh @@ -192,6 +192,7 @@ public: virtual bool is_compatible_with_frozen(collection_type_impl& previous) = 0; virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) = 0; virtual shared_ptr as_cql3_type() override; + virtual collection_mutation::one merge(collection_mutation::view a, collection_mutation::view b) = 0; }; using collection_type = shared_ptr; @@ -235,6 +236,7 @@ public: mutation deserialize_mutation_form(bytes_view in); // FIXME: use iterators? collection_mutation::one serialize_mutation_form(mutation mut); + virtual collection_mutation::one merge(collection_mutation::view a, collection_mutation::view b) override; }; using map_type = shared_ptr; From 42a9c0f7d3cb45d7261a423b64e61132ef9d9222 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 19:03:29 +0200 Subject: [PATCH 19/24] atomic_cell: export merge_column --- atomic_cell.hh | 5 +++++ database.cc | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/atomic_cell.hh b/atomic_cell.hh index ccb452aeb8..548c0b25a9 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -210,4 +210,9 @@ public: } }; +class column_definition; + int compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right); +void merge_column(const column_definition& def, + atomic_cell_or_collection& old, + const atomic_cell_or_collection& neww); diff --git a/database.cc b/database.cc index 9cdfc49b1d..971f357381 100644 --- a/database.cc +++ b/database.cc @@ -346,7 +346,6 @@ compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right) { } } -static inline void merge_column(const column_definition& def, atomic_cell_or_collection& old, From bb0d2a4f03e1546955f66406fd9482cb1ae4ea99 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 19:04:02 +0200 Subject: [PATCH 20/24] db: fix mutation::set_*_cell() applied twice to same column With a collection, setting two separate elements in a collection would cause the second to override the first. This also applies, with much smaller effect, to normal cells (for example, updating the same counter twice, or issuing two updates to the same cell but with different timestamps, via thrift). Fix by merging the two values rather than replacing the old one. --- database.hh | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/database.hh b/database.hh index cbcf96affc..ac65c77ddf 100644 --- a/database.hh +++ b/database.hh @@ -95,26 +95,27 @@ public: mutation(const mutation&) = default; void set_static_cell(const column_definition& def, atomic_cell_or_collection value) { - emplace_or_insert(p.static_row(), def.id, std::move(value)); + update_column(p.static_row(), def, std::move(value)); } void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix)); - emplace_or_insert(row, def.id, std::move(value)); + update_column(row, def, std::move(value)); } void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { auto& row = p.clustered_row(key); - emplace_or_insert(row, def.id, std::move(value)); + update_column(row, def, std::move(value)); } private: - static void emplace_or_insert(row& row, column_id id, atomic_cell_or_collection&& value) { + static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value) { // our mutations are not yet immutable + auto id = def.id; auto i = row.lower_bound(id); if (i == row.end() || i->first != id) { row.emplace_hint(i, id, std::move(value)); } else { - i->second = std::move(value); + merge_column(def, i->second, value); } } friend std::ostream& operator<<(std::ostream& os, const mutation& m); From de2e9f9eeac4e70d5a311028a7787878e9121485 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 19:57:34 +0200 Subject: [PATCH 21/24] db: fix wrong row updated by merge_cells() merge_cells() is called for both static and clustered rows, yet it always updates the static row. Fix by updating the row passed by the caller. --- database.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database.cc b/database.cc index 971f357381..d6ed462ae0 100644 --- a/database.cc +++ b/database.cc @@ -374,7 +374,7 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { auto col = new_column.first; auto i = old_row.find(col); if (i == old_row.end()) { - _static_row.emplace_hint(i, new_column); + old_row.emplace_hint(i, new_column); } else { auto& old_column = *i; auto& def = schema->regular_column_at(col); From 6f6b1fdb009868be73b1db756045669ef1eae54b Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 19:59:18 +0200 Subject: [PATCH 22/24] schema: add static_column_at() Parallels regular_column_at(). --- schema.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/schema.hh b/schema.hh index b7cb3b69d7..d9d169dbb8 100644 --- a/schema.hh +++ b/schema.hh @@ -124,6 +124,9 @@ public: column_definition& regular_column_at(column_id id) { return _regular_columns[id]; } + column_definition& static_column_at(column_id id) { + return _static_columns[id]; + } bool is_last_partition_key(column_definition& def) { return &_partition_key[_partition_key.size() - 1] == &def; } From b77a52398f3d44908a0a40a16aab6da340cbf908 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 19:59:59 +0200 Subject: [PATCH 23/24] db: fix merge_cells using wrong column_definition merge_cells() always used the regular column_definition, even when called for a static row. Fix by parametrizing it with a method to get the column_definition. --- database.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/database.cc b/database.cc index d6ed462ae0..f3334d6a8d 100644 --- a/database.cc +++ b/database.cc @@ -369,7 +369,7 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { apply_row_tombstone(schema, entry); } - auto merge_cells = [this, schema] (row& old_row, const row& new_row) { + auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) { for (auto&& new_column : new_row) { auto col = new_column.first; auto i = old_row.find(col); @@ -377,13 +377,16 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { old_row.emplace_hint(i, new_column); } else { auto& old_column = *i; - auto& def = schema->regular_column_at(col); + auto& def = find_column_def(col); merge_column(def, old_column.second, new_column.second); } } }; - merge_cells(_static_row, p._static_row); + auto find_static_column_def = [schema] (auto col) -> column_definition& { return schema->static_column_at(col); }; + auto find_regular_column_def = [schema] (auto col) -> column_definition& { return schema->regular_column_at(col); }; + + merge_cells(_static_row, p._static_row, find_static_column_def); for (auto&& entry : p._rows) { auto& key = entry.first; @@ -392,7 +395,7 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { _rows.emplace_hint(i, entry); } else { i->second.t.apply(entry.second.t); - merge_cells(i->second.cells, entry.second.cells); + merge_cells(i->second.cells, entry.second.cells, find_regular_column_def); } } } From 6126670078c5c23b02853cb402e1e5be3cede39c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 5 Mar 2015 20:03:57 +0200 Subject: [PATCH 24/24] mutation_test: test maps --- tests/urchin/mutation_test.cc | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 2689181d8c..e23b833617 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -66,3 +66,36 @@ BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) { m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl)); BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl)); } + +BOOST_AUTO_TEST_CASE(test_map_mutations) { + auto my_map_type = map_type_impl::get_instance(int32_type, utf8_type, true); + auto s = make_lw_shared(schema(some_keyspace, some_column_family, + {{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_map_type}}, utf8_type)); + column_family cf(s); + partition_key key = to_bytes("key1"); + auto& column = *s->get_column_definition("s1"); + map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell(utf8_type->decompose(sstring("101")))}}; + mutation m1(key, s); + m1.set_static_cell(column, my_map_type->serialize_mutation_form(mmut1)); + cf.apply(m1); + map_type_impl::mutation mmut2{{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102")))}}; + mutation m2(key, s); + m2.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2)); + cf.apply(m2); + map_type_impl::mutation mmut3{{int32_type->decompose(103), make_atomic_cell(utf8_type->decompose(sstring("103")))}}; + mutation m3(key, s); + m3.set_static_cell(column, my_map_type->serialize_mutation_form(mmut3)); + cf.apply(m3); + map_type_impl::mutation mmut2o{{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102 override")))}}; + mutation m2o(key, s); + m2o.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2o)); + cf.apply(m2o); + + row& r = cf.find_or_create_partition(key).static_row(); + auto i = r.find(column.id); + BOOST_REQUIRE(i != r.end()); + auto cell = i->second.as_collection_mutation(); + auto muts = my_map_type->deserialize_mutation_form(cell.data); + BOOST_REQUIRE(muts.size() == 3); + // FIXME: more strict tests +}