Merge branch seastar-dev.git 'collections'

Collections support from Avi.
This commit is contained in:
Tomasz Grabiec
2015-03-05 20:19:04 +01:00
13 changed files with 708 additions and 122 deletions

View File

@@ -6,6 +6,7 @@
#include "bytes.hh"
#include "timestamp.hh"
#include "gc_clock.hh"
#include <cstdint>
template<typename T>
@@ -122,6 +123,9 @@ public:
ttl_opt ttl() const {
return atomic_cell::ttl(_data);
}
bytes_view serialize() const {
return _data;
}
friend class atomic_cell::one;
};
@@ -171,6 +175,22 @@ public:
friend class atomic_cell_or_collection;
};
// Represents a mutation of a collection. Actual format is determined by collection type,
// and is:
// set: list of atomic_cell
// map: list of pair<atomic_cell, bytes> (for key/value)
// list: tbd, probably ugly
class collection_mutation {
public:
struct view {
bytes_view data;
};
struct one {
bytes data;
operator view() const { return { data }; }
};
};
// A variant type that can hold either an atomic_cell, or a serialized collection.
// Which type is stored is determinied by the schema.
class atomic_cell_or_collection final {
@@ -178,8 +198,21 @@ class atomic_cell_or_collection final {
private:
atomic_cell_or_collection(bytes&& data) : _data(std::move(data)) {}
public:
atomic_cell_or_collection(atomic_cell::one ac) : _data(std::move(ac._data)) {}
static atomic_cell_or_collection from_atomic_cell(atomic_cell::one data) { return { std::move(data._data) }; }
atomic_cell::view as_atomic_cell() const { return atomic_cell::view::from_bytes(_data); }
// FIXME: insert collection variant here
atomic_cell_or_collection(collection_mutation::one cm) : _data(std::move(cm.data)) {}
static atomic_cell_or_collection from_collection_mutation(collection_mutation::one data) {
return std::move(data.data);
}
collection_mutation::view as_collection_mutation() const {
return collection_mutation::view{_data};
}
};
class column_definition;
int compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right);
void merge_column(const column_definition& def,
atomic_cell_or_collection& old,
const atomic_cell_or_collection& neww);

41
combine.hh Normal file
View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
// combine two sorted uniqued sequences into a single sorted sequence
// unique elements are copied, duplicate elements are merged with a
// binary function.
template <typename InputIterator1,
typename InputIterator2,
typename OutputIterator,
typename Compare,
typename Merge>
OutputIterator
combine(InputIterator1 begin1, InputIterator1 end1,
InputIterator2 begin2, InputIterator2 end2,
OutputIterator out,
Compare compare,
Merge merge) {
while (begin1 != end1 && begin2 != end2) {
auto& e1 = *begin1;
auto& e2 = *begin2;
if (compare(e1, e2)) {
*out++ = e1;
++begin1;
} else if (compare(e2, e1)) {
*out++ = e2;
++begin2;
} else {
*out++ = merge(e1, e2);
++begin1;
++begin2;
}
}
out = std::copy(begin1, end1, out);
out = std::copy(begin2, end2, out);
return out;
}

View File

@@ -255,6 +255,7 @@ urchin_core = (['database.cc',
'cql3/abstract_marker.cc',
'cql3/cql3.cc',
'cql3/cql3_type.cc',
'cql3/operation.cc',
'cql3/functions/functions.cc',
'cql3/statements/modification_statement.cc',
'cql3/statements/update_statement.cc',

View File

@@ -1158,9 +1158,7 @@ columnOperation[operations_type& operations]
columnOperationDifferentiator[operations_type& operations, ::shared_ptr<cql3::column_identifier::raw> key]
: '=' normalColumnOperation[operations, key]
#if 0
| '[' k=term ']' specializedColumnOperation[operations, key, k]
#endif
;
normalColumnOperation[operations_type& operations, ::shared_ptr<cql3::column_identifier::raw> key]
@@ -1196,14 +1194,16 @@ normalColumnOperation[operations_type& operations, ::shared_ptr<cql3::column_ide
#endif
;
#if 0
specializedColumnOperation[List<Pair<ColumnIdentifier.Raw, Operation.RawUpdate>> operations, ColumnIdentifier.Raw key, Term.Raw k]
specializedColumnOperation[std::vector<std::pair<shared_ptr<cql3::column_identifier::raw>,
shared_ptr<cql3::operation::raw_update>>> operations,
shared_ptr<cql3::column_identifier::raw> key,
shared_ptr<cql3::term::raw> k]
: '=' t=term
{
addRawUpdate(operations, key, new Operation.SetElement(k, t));
add_raw_update(operations, key, make_shared<cql3::operation::set_element>(k, t));
}
;
#endif
columnCondition[conditions_type& conditions]
// Note: we'll reject duplicates later

View File

@@ -27,6 +27,8 @@
#include "cql3/abstract_marker.hh"
#include "cql3/term.hh"
#include "operation.hh"
#include "update_parameters.hh"
namespace cql3 {
@@ -60,20 +62,20 @@ import org.apache.cassandra.utils.Pair;
* Static helper methods and classes for maps.
*/
class maps {
private:
maps() = delete;
public:
#if 0
private Maps() {}
public static ColumnSpecification keySpecOf(ColumnSpecification column)
{
return new ColumnSpecification(column.ksName, column.cfName, new ColumnIdentifier("key(" + column.name + ")", true), ((MapType)column.type).getKeysType());
static shared_ptr<column_specification> key_spec_of(column_specification& column) {
return ::make_shared<column_specification>(column.ks_name, column.cf_name,
::make_shared<column_identifier>(sprint("key(%s)", *column.name), true),
dynamic_pointer_cast<map_type_impl>(column.type)->get_keys_type());
}
public static ColumnSpecification valueSpecOf(ColumnSpecification column)
{
return new ColumnSpecification(column.ksName, column.cfName, new ColumnIdentifier("value(" + column.name + ")", true), ((MapType)column.type).getValuesType());
static shared_ptr<column_specification> value_spec_of(column_specification& column) {
return ::make_shared<column_specification>(column.ks_name, column.cf_name,
::make_shared<column_identifier>(sprint("value(%s)", *column.name), true),
dynamic_pointer_cast<map_type_impl>(column.type)->get_values_type());
}
#endif
class literal : public term::raw {
public:
@@ -329,51 +331,47 @@ public:
Putter.doPut(t, cf, prefix, column, params);
}
}
#endif
public static class SetterByKey extends Operation
{
private final Term k;
public SetterByKey(ColumnDefinition column, Term k, Term t)
{
super(column, t);
this.k = k;
class setter_by_key : public operation {
const shared_ptr<term> _k;
public:
setter_by_key(column_definition& column, shared_ptr<term> k, shared_ptr<term> t)
: operation(column, std::move(t)), _k(std::move(k)) {
}
@Override
public void collectMarkerSpecification(VariableSpecifications boundNames)
{
super.collectMarkerSpecification(boundNames);
k.collectMarkerSpecification(boundNames);
virtual void collect_marker_specification(shared_ptr<variable_specifications> bound_names) override {
operation::collect_marker_specification(bound_names);
_k->collect_marker_specification(bound_names);
}
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
assert column.type.isMultiCell() : "Attempted to set a value for a single key on a frozen map";
ByteBuffer key = k.bindAndGet(params.options);
ByteBuffer value = t.bindAndGet(params.options);
if (key == null)
throw new InvalidRequestException("Invalid null map key");
CellName cellName = cf.getComparator().create(prefix, column, key);
if (value == null)
{
cf.addColumn(params.makeTombstone(cellName));
virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override {
using exceptions::invalid_request_exception;
assert(column.type->is_multi_cell()); // "Attempted to set a value for a single key on a frozen map"m
bytes_opt key = _k->bind_and_get(params._options);
bytes_opt value = _t->bind_and_get(params._options);
if (!key) {
throw invalid_request_exception("Invalid null map key");
}
else
{
// We don't support value > 64K because the serialization format encode the length as an unsigned short.
if (value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
throw new InvalidRequestException(String.format("Map value is too long. Map values are limited to %d bytes but %d bytes value provided",
FBUtilities.MAX_UNSIGNED_SHORT,
value.remaining()));
cf.addColumn(params.makeColumn(cellName, value));
if (value && value->size() >= std::numeric_limits<uint16_t>::max()) {
throw invalid_request_exception(
sprint("Map value is too long. Map values are limited to %d bytes but %d bytes value provided",
std::numeric_limits<uint16_t>::max(),
value->size()));
}
auto avalue = value ? params.make_cell(*value) : params.make_dead_cell();
map_type_impl::mutation update = { { std::move(*key), std::move(avalue) } };
// should have been verified as map earlier?
auto ctype = static_pointer_cast<map_type_impl>(column.type);
auto col_mut = ctype->serialize_mutation_form(std::move(update));
if (column.is_static()) {
m.set_static_cell(column, std::move(col_mut));
} else {
m.set_clustered_cell(prefix, column, std::move(col_mut));
}
}
}
};
#if 0
public static class Putter extends Operation
{
public Putter(ColumnDefinition column, Term t)

65
cql3/operation.cc Normal file
View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "operation.hh"
#include "operation_impl.hh"
#include "maps.hh"
namespace cql3 {
shared_ptr<operation>
operation::set_element::prepare(const sstring& keyspace, column_definition& receiver) {
using exceptions::invalid_request_exception;
auto rtype = dynamic_pointer_cast<collection_type_impl>(receiver.type);
if (!rtype) {
throw invalid_request_exception(sprint("Invalid operation (%s) for non collection column %s", receiver, receiver.name()));
} else if (!rtype->is_multi_cell()) {
throw invalid_request_exception(sprint("Invalid operation (%s) for frozen collection column %s", receiver, receiver.name()));
}
if (&rtype->_kind == &collection_type_impl::kind::list) {
abort();
// FIXME:
#if 0
Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver));
Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver));
return new Lists.SetterByIndex(receiver, idx, lval);
#endif
} else if (&rtype->_kind == &collection_type_impl::kind::set) {
throw invalid_request_exception(sprint("Invalid operation (%s) for set column %s", receiver, receiver.name()));
} else if (&rtype->_kind == &collection_type_impl::kind::map) {
auto key = _selector->prepare(keyspace, maps::key_spec_of(*receiver.column_specification));
auto mval = _value->prepare(keyspace, maps::value_spec_of(*receiver.column_specification));
return make_shared<maps::setter_by_key>(receiver, key, mval);
}
abort();
}
bool
operation::set_element::is_compatible_with(shared_ptr<raw_update> other) {
// TODO: we could check that the other operation is not setting the same element
// too (but since the index/key set may be a bind variables we can't always do it at this point)
return !dynamic_pointer_cast<set_value>(std::move(other));
}
}

View File

@@ -26,13 +26,16 @@
#define CQL3_OPERATION_HH
#include "core/shared_ptr.hh"
#include "exceptions/exceptions.hh"
#include "database.hh"
#include "term.hh"
#include <experimental/optional>
namespace cql3 {
class update_parameters;
#if 0
package org.apache.cassandra.cql3;
@@ -77,6 +80,8 @@ public:
, _t{t}
{ }
virtual ~operation() {}
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const {
return _t && _t->uses_function(ks_name, function_name);
}
@@ -172,54 +177,26 @@ public:
class set_value;
class set_element : public raw_update {
const shared_ptr<term::raw> _selector;
const shared_ptr<term::raw> _value;
public:
set_element(shared_ptr<term::raw> selector, shared_ptr<term::raw> value)
: _selector(std::move(selector)), _value(std::move(value)) {
}
virtual shared_ptr<operation> prepare(const sstring& keyspace, column_definition& receiver);
#if 0
public static class SetElement implements RawUpdate
{
private final Term.Raw selector;
private final Term.Raw value;
public SetElement(Term.Raw selector, Term.Raw value)
{
this.selector = selector;
this.value = value;
}
public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException
{
if (!(receiver.type instanceof CollectionType))
throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver.name));
else if (!(receiver.type.isMultiCell()))
throw new InvalidRequestException(String.format("Invalid operation (%s) for frozen collection column %s", toString(receiver), receiver.name));
switch (((CollectionType)receiver.type).kind)
{
case LIST:
Term idx = selector.prepare(keyspace, Lists.indexSpecOf(receiver));
Term lval = value.prepare(keyspace, Lists.valueSpecOf(receiver));
return new Lists.SetterByIndex(receiver, idx, lval);
case SET:
throw new InvalidRequestException(String.format("Invalid operation (%s) for set column %s", toString(receiver), receiver.name));
case MAP:
Term key = selector.prepare(keyspace, Maps.keySpecOf(receiver));
Term mval = value.prepare(keyspace, Maps.valueSpecOf(receiver));
return new Maps.SetterByKey(receiver, key, mval);
}
throw new AssertionError();
}
protected String toString(ColumnSpecification column)
{
return String.format("%s[%s] = %s", column.name, selector, value);
}
public boolean isCompatibleWith(RawUpdate other)
{
// TODO: we could check that the other operation is not setting the same element
// too (but since the index/key set may be a bind variables we can't always do it at this point)
return !(other instanceof SetValue);
}
}
#endif
virtual bool is_compatible_with(shared_ptr<raw_update> other) override;
};
#if 0
public static class Addition implements RawUpdate
{
private final Term.Raw value;

View File

@@ -326,7 +326,6 @@ column_family::apply(const mutation& m) {
}
// Based on org.apache.cassandra.db.AbstractCell#reconcile()
static inline
int
compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right) {
if (left.timestamp() != right.timestamp()) {
@@ -347,15 +346,18 @@ compare_atomic_cell_for_merge(atomic_cell::view left, atomic_cell::view right) {
}
}
static inline
int
compare_for_merge(const column_definition& def,
const std::pair<column_id, atomic_cell_or_collection>& left,
const std::pair<column_id, atomic_cell_or_collection>& right) {
void
merge_column(const column_definition& def,
atomic_cell_or_collection& old,
const atomic_cell_or_collection& neww) {
if (def.is_atomic()) {
return compare_atomic_cell_for_merge(left.second.as_atomic_cell(), right.second.as_atomic_cell());
if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) {
// FIXME: move()?
old = neww;
}
} else {
fail(unimplemented::cause::COLLECTIONS);
auto ct = static_pointer_cast<collection_type_impl>(def.type);
old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation());
}
}
@@ -367,23 +369,24 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
apply_row_tombstone(schema, entry);
}
auto merge_cells = [this, schema] (row& old_row, const row& new_row) {
auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) {
for (auto&& new_column : new_row) {
auto col = new_column.first;
auto i = old_row.find(col);
if (i == old_row.end()) {
_static_row.emplace_hint(i, new_column);
old_row.emplace_hint(i, new_column);
} else {
auto& old_column = *i;
auto& def = schema->regular_column_at(col);
if (compare_for_merge(def, old_column, new_column) < 0) {
old_column.second = new_column.second;
}
auto& def = find_column_def(col);
merge_column(def, old_column.second, new_column.second);
}
}
};
merge_cells(_static_row, p._static_row);
auto find_static_column_def = [schema] (auto col) -> column_definition& { return schema->static_column_at(col); };
auto find_regular_column_def = [schema] (auto col) -> column_definition& { return schema->regular_column_at(col); };
merge_cells(_static_row, p._static_row, find_static_column_def);
for (auto&& entry : p._rows) {
auto& key = entry.first;
@@ -392,7 +395,7 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
_rows.emplace_hint(i, entry);
} else {
i->second.t.apply(entry.second.t);
merge_cells(i->second.cells, entry.second.cells);
merge_cells(i->second.cells, entry.second.cells, find_regular_column_def);
}
}
}

View File

@@ -94,27 +94,28 @@ public:
mutation(mutation&&) = default;
mutation(const mutation&) = default;
void set_static_cell(const column_definition& def, atomic_cell::one value) {
emplace_or_insert(p.static_row(), def.id, std::move(value));
void set_static_cell(const column_definition& def, atomic_cell_or_collection value) {
update_column(p.static_row(), def, std::move(value));
}
void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell::one value) {
void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) {
auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix));
emplace_or_insert(row, def.id, std::move(value));
update_column(row, def, std::move(value));
}
void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell::one value) {
void set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) {
auto& row = p.clustered_row(key);
emplace_or_insert(row, def.id, std::move(value));
update_column(row, def, std::move(value));
}
private:
static void emplace_or_insert(row& row, column_id id, atomic_cell::one&& value) {
static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value) {
// our mutations are not yet immutable
auto id = def.id;
auto i = row.lower_bound(id);
if (i == row.end() || i->first != id) {
row.emplace_hint(i, id, atomic_cell_or_collection::from_atomic_cell(std::move(value)));
row.emplace_hint(i, id, std::move(value));
} else {
i->second = atomic_cell_or_collection::from_atomic_cell(std::move(value));
merge_column(def, i->second, value);
}
}
friend std::ostream& operator<<(std::ostream& os, const mutation& m);

View File

@@ -34,6 +34,9 @@ public:
bool is_compact_value() const;
const sstring& name_as_text() const;
const bytes& name() const;
friend std::ostream& operator<<(std::ostream& os, const column_definition& cd) {
return os << cd.name_as_text();
}
};
struct thrift_schema {
@@ -121,6 +124,9 @@ public:
column_definition& regular_column_at(column_id id) {
return _regular_columns[id];
}
column_definition& static_column_at(column_id id) {
return _static_columns[id];
}
bool is_last_partition_key(column_definition& def) {
return &_partition_key[_partition_key.size() - 1] == &def;
}

View File

@@ -66,3 +66,36 @@ BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) {
m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl));
BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl));
}
BOOST_AUTO_TEST_CASE(test_map_mutations) {
auto my_map_type = map_type_impl::get_instance(int32_type, utf8_type, true);
auto s = make_lw_shared(schema(some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", int32_type}}, {}, {{"s1", my_map_type}}, utf8_type));
column_family cf(s);
partition_key key = to_bytes("key1");
auto& column = *s->get_column_definition("s1");
map_type_impl::mutation mmut1{{int32_type->decompose(101), make_atomic_cell(utf8_type->decompose(sstring("101")))}};
mutation m1(key, s);
m1.set_static_cell(column, my_map_type->serialize_mutation_form(mmut1));
cf.apply(m1);
map_type_impl::mutation mmut2{{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102")))}};
mutation m2(key, s);
m2.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2));
cf.apply(m2);
map_type_impl::mutation mmut3{{int32_type->decompose(103), make_atomic_cell(utf8_type->decompose(sstring("103")))}};
mutation m3(key, s);
m3.set_static_cell(column, my_map_type->serialize_mutation_form(mmut3));
cf.apply(m3);
map_type_impl::mutation mmut2o{{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102 override")))}};
mutation m2o(key, s);
m2o.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2o));
cf.apply(m2o);
row& r = cf.find_or_create_partition(key).static_row();
auto i = r.find(column.id);
BOOST_REQUIRE(i != r.end());
auto cell = i->second.as_collection_mutation();
auto muts = my_map_type->deserialize_mutation_form(cell.data);
BOOST_REQUIRE(muts.size() == 3);
// FIXME: more strict tests
}

330
types.cc
View File

@@ -7,7 +7,11 @@
#include "types.hh"
#include "core/print.hh"
#include "net/ip.hh"
#include "database.hh"
#include "util/serialization.hh"
#include "combine.hh"
#include <cmath>
#include <sstream>
template<typename T>
struct simple_type_traits {
@@ -541,6 +545,332 @@ struct float_type_impl : floating_type_impl<float> {
};
thread_local logging::logger collection_type_impl::_logger("collection_type_impl");
const size_t collection_type_impl::max_elements;
const collection_type_impl::kind collection_type_impl::kind::map(
[] (shared_ptr<cql3::column_specification> collection, bool is_key) -> shared_ptr<cql3::column_specification> {
// FIXME: implement
// return isKey ? Maps.keySpecOf(collection) : Maps.valueSpecOf(collection);
abort();
});
const collection_type_impl::kind collection_type_impl::kind::set(
[] (shared_ptr<cql3::column_specification> collection, bool is_key) -> shared_ptr<cql3::column_specification> {
// FIXME: implement
// return Sets.valueSpecOf(collection);
abort();
});
const collection_type_impl::kind collection_type_impl::kind::list(
[] (shared_ptr<cql3::column_specification> collection, bool is_key) -> shared_ptr<cql3::column_specification> {
// FIXME: implement
// return Lists.valueSpecOf(collection);
abort();
});
shared_ptr<cql3::column_specification>
collection_type_impl::kind::make_collection_receiver(shared_ptr<cql3::column_specification> collection, bool is_key) const {
return _impl(std::move(collection), is_key);
}
shared_ptr<cql3::column_specification>
collection_type_impl::make_collection_receiver(shared_ptr<cql3::column_specification> collection, bool is_key) {
return _kind.make_collection_receiver(std::move(collection), is_key);
}
std::vector<atomic_cell::one>
collection_type_impl::enforce_limit(std::vector<atomic_cell::one> cells, int version) {
assert(is_multi_cell());
if (version >= 3 || cells.size() <= max_elements) {
return cells;
}
_logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. "
"Please see http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.", cells.size(), max_elements, max_elements);
cells.erase(cells.begin() + max_elements, cells.end());
return cells;
}
bytes
collection_type_impl::serialize_for_native_protocol(std::vector<atomic_cell::one> cells, int version) {
assert(is_multi_cell());
cells = enforce_limit(std::move(cells), version);
std::vector<bytes> values = serialized_values(std::move(cells));
// FIXME: implement
abort();
// return CollectionSerializer.pack(values, cells.size(), version);
}
bool
collection_type_impl::is_compatible_with(abstract_type& previous) {
// FIXME: implement
abort();
}
shared_ptr<cql3::cql3_type>
collection_type_impl::as_cql3_type() {
// FIXME: implement
abort();
}
int read_collection_size(bytes_view& in, int version) {
if (version >= 3) {
return read_simple<int32_t>(in);
} else {
return read_simple<uint16_t>(in);
}
}
void write_collection_size(std::ostream& out, int size, int version) {
if (version >= 3) {
serialize_int32(out, size);
} else {
serialize_int16(out, uint16_t(size));
}
}
bytes read_collection_value(bytes_view& in, int version) {
auto size = version >= 3 ? read_simple<int32_t>(in) : read_simple<uint16_t>(in);
auto v = read_simple_bytes(in, size);
return bytes(v.begin(), v.end());
}
void write_collection_value(std::ostream& out, int version, data_type type, const boost::any& value) {
// We have to copy here, because we can't guess the size.
// FIXME: somehow.
std::ostringstream tmp;
type->serialize(value, tmp);
auto val_bytes = tmp.str();
if (version >= 3) {
serialize_int32(out, int32_t(val_bytes.size()));
} else {
serialize_int16(out, uint16_t(val_bytes.size()));
}
out.rdbuf()->sputn(val_bytes.data(), val_bytes.size());
}
namespace std {
template <>
struct hash<pair<data_type, data_type>> : private std::hash<data_type> {
size_t operator()(const pair<data_type, data_type>& p) const {
// don't simply xor, it will generate the same result for sequential
// pointers
auto f = hash<data_type>::operator()(p.first);
auto s = hash<data_type>::operator()(p.second);
return f ^ ((s << 7) | s >> (std::numeric_limits<decltype(s)>::digits - 7));
}
};
}
shared_ptr<map_type_impl>
map_type_impl::get_instance(data_type keys, data_type values, bool is_multi_cell) {
auto& map = is_multi_cell ? _instances : _frozen_instances;
auto p = std::make_pair(keys, values);
auto i = map.find(p);
if (i == map.end()) {
auto t = make_shared<map_type_impl>(keys, values, is_multi_cell);
i = map.insert(std::make_pair(std::move(p), std::move(t))).first;
}
return i->second;
}
map_type_impl::map_type_impl(data_type keys, data_type values, bool is_multi_cell)
: collection_type_impl("map<" + keys->name() + ", " + values->name() + ">", kind::map)
, _keys(std::move(keys))
, _values(std::move(values))
, _is_multi_cell(is_multi_cell) {
}
data_type
map_type_impl::freeze() {
if (_is_multi_cell) {
return get_instance(_keys, _values, false);
} else {
return shared_from_this();
}
}
bool
map_type_impl::is_compatible_with_frozen(collection_type_impl& previous) {
assert(!_is_multi_cell);
auto* p = dynamic_cast<map_type_impl*>(&previous);
if (!p) {
return false;
}
return _keys->is_compatible_with(*p->_keys)
&& _values->is_compatible_with(*p->_values);
}
bool
map_type_impl::is_value_compatible_with_frozen(collection_type_impl& previous) {
assert(!_is_multi_cell);
auto* p = dynamic_cast<map_type_impl*>(&previous);
if (!p) {
return false;
}
return _keys->is_compatible_with(*p->_keys)
&& _values->is_value_compatible_with(*p->_values);
}
bool
map_type_impl::less(bytes_view o1, bytes_view o2) {
return compare_maps(_keys, _values, o1, o2) < 0;
}
int32_t
map_type_impl::compare_maps(data_type keys, data_type values, bytes_view o1, bytes_view o2) {
if (o1.empty()) {
return o2.empty() ? 0 : -1;
} else if (o2.empty()) {
return 1;
}
int protocol_version = 3;
int size1 = read_collection_size(o1, protocol_version);
int size2 = read_collection_size(o2, protocol_version);
// FIXME: use std::lexicographical_compare()
for (int i = 0; i < std::min(size1, size2); ++i) {
auto k1 = read_collection_value(o1, protocol_version);
auto k2 = read_collection_value(o2, protocol_version);
auto cmp = keys->compare(k1, k2);
if (cmp != 0) {
return cmp;
}
auto v1 = read_collection_value(o1, protocol_version);
auto v2 = read_collection_value(o2, protocol_version);
cmp = values->compare(v1, v2);
if (cmp != 0) {
return cmp;
}
}
return size1 == size2 ? 0 : (size1 < size2 ? -1 : 1);
}
void
map_type_impl::serialize(const boost::any& value, std::ostream& out) {
return serialize(value, out, 3);
}
void
map_type_impl::serialize(const boost::any& value, std::ostream& out, int protocol_version) {
auto& m = boost::any_cast<const native_type&>(value);
write_collection_size(out, m.size(), protocol_version);
for (auto&& kv : m) {
write_collection_value(out, protocol_version, _keys, kv.first);
write_collection_value(out, protocol_version, _values, kv.second);
}
}
object_opt
map_type_impl::deserialize(bytes_view v) {
return deserialize(v, 3);
}
object_opt
map_type_impl::deserialize(bytes_view in, int protocol_version) {
if (in.empty()) {
return {};
}
native_type m;
auto size = read_collection_size(in, protocol_version);
for (int i = 0; i < size; ++i) {
bytes kb = read_collection_value(in, protocol_version);
auto k = _keys->deserialize(kb);
bytes vb = read_collection_value(in, protocol_version);
auto v = _values->deserialize(vb);
m.insert(m.end(), std::make_pair(std::move(k), std::move(v)));
}
return { std::move(m) };
}
sstring
map_type_impl::to_string(const bytes& b) {
// FIXME:
abort();
}
size_t
map_type_impl::hash(bytes_view v) {
// FIXME:
abort();
}
bytes
map_type_impl::from_string(sstring_view text) {
// FIXME:
abort();
}
std::vector<bytes>
map_type_impl::serialized_values(std::vector<atomic_cell::one> cells) {
// FIXME:
abort();
}
auto map_type_impl::deserialize_mutation_form(bytes_view in) -> mutation {
auto nr = read_simple<uint32_t>(in);
mutation ret;
ret.reserve(nr);
for (uint32_t i = 0; i != nr; ++i) {
// FIXME: we could probably avoid the need for size
auto ksize = read_simple<uint32_t>(in);
auto key = read_simple_bytes(in, ksize);
auto vsize = read_simple<uint32_t>(in);
auto value = atomic_cell::view::from_bytes(read_simple_bytes(in, vsize));
ret.emplace_back(key, value);
}
assert(in.empty());
return ret;
}
collection_mutation::one
map_type_impl::serialize_mutation_form(mutation mut) {
std::ostringstream out;
auto write32 = [&out] (uint32_t v) {
v = net::hton(v);
out.write(reinterpret_cast<char*>(&v), sizeof(v));
};
auto writeb = [&out, write32] (bytes_view v) {
write32(v.size());
out.write(v.begin(), v.size());
};
// FIXME: overflow?
write32(mut.size());
for (auto&& kv : mut) {
auto&& k = kv.first;
auto&& v = kv.second;
writeb(k);
writeb(v.serialize());
}
auto s = out.str();
return collection_mutation::one{bytes(s.data(), s.size())};
}
collection_mutation::one
map_type_impl::merge(collection_mutation::view a, collection_mutation::view b) {
auto aa = deserialize_mutation_form(a.data);
auto bb = deserialize_mutation_form(b.data);
mutation merged;
merged.reserve(aa.size() + bb.size());
using element_type = std::pair<bytes_view, atomic_cell::view>;
auto compare = [this] (const element_type& e1, const element_type& e2) {
return _keys->less(e1.first, e2.first);
};
auto merge = [this] (const element_type& e1, const element_type& e2) {
// FIXME: use std::max()?
return std::make_pair(e1.first, compare_atomic_cell_for_merge(e1.second, e2.second) < 0 ? e1.second : e2.second);
};
combine(aa.begin(), aa.end(),
bb.begin(), bb.end(),
std::back_inserter(merged),
compare,
merge);
return serialize_mutation_form(merged);
}
thread_local std::unordered_map<std::pair<data_type, data_type>, map_type_impl::map_type> map_type_impl::_instances;
thread_local std::unordered_map<std::pair<data_type, data_type>, map_type_impl::map_type> map_type_impl::_frozen_instances;
thread_local shared_ptr<abstract_type> int32_type(make_shared<int32_type_impl>());
thread_local shared_ptr<abstract_type> long_type(make_shared<long_type_impl>());
thread_local shared_ptr<abstract_type> ascii_type(make_shared<string_type_impl>("ascii", cql3::native_cql3_type::ascii));

100
types.hh
View File

@@ -16,10 +16,13 @@
#include "net/byteorder.hh"
#include "db_clock.hh"
#include "bytes.hh"
#include "log.hh"
#include "atomic_cell.hh"
namespace cql3 {
class cql3_type;
class column_specification;
}
@@ -48,7 +51,7 @@ inline int32_t compare_unsigned(bytes_view v1, bytes_view v2) {
return (int32_t) (v1.size() - v2.size());
}
class abstract_type {
class abstract_type : public enable_shared_from_this<abstract_type> {
sstring _name;
public:
abstract_type(sstring name) : _name(name) {}
@@ -150,10 +153,94 @@ public:
virtual bool is_collection() { return false; }
virtual bool is_multi_cell() { return false; }
virtual ::shared_ptr<cql3::cql3_type> as_cql3_type() = 0;
virtual shared_ptr<abstract_type> freeze() { return shared_from_this(); }
};
using data_type = shared_ptr<abstract_type>;
class collection_type_impl : public abstract_type {
static thread_local logging::logger _logger;
public:
static constexpr const size_t max_elements = 65535;
class kind {
std::function<shared_ptr<cql3::column_specification> (shared_ptr<cql3::column_specification> collection, bool is_key)> _impl;
public:
kind(std::function<shared_ptr<cql3::column_specification> (shared_ptr<cql3::column_specification> collection, bool is_key)> impl)
: _impl(std::move(impl)) {}
shared_ptr<cql3::column_specification> make_collection_receiver(shared_ptr<cql3::column_specification> collection, bool is_key) const;
static const kind map;
static const kind set;
static const kind list;
};
const kind& _kind;
protected:
explicit collection_type_impl(sstring name, const kind& k)
: abstract_type(std::move(name)), _kind(k) {}
public:
virtual data_type name_comparator() = 0;
virtual data_type value_comparator() = 0;
shared_ptr<cql3::column_specification> make_collection_receiver(shared_ptr<cql3::column_specification> collection, bool is_key);
virtual bool is_collection() override { return true; }
bool is_map() const { return &_kind == &kind::map; }
std::vector<atomic_cell::one> enforce_limit(std::vector<atomic_cell::one>, int version);
virtual std::vector<bytes> serialized_values(std::vector<atomic_cell::one> cells) = 0;
bytes serialize_for_native_protocol(std::vector<atomic_cell::one> cells, int version);
virtual bool is_compatible_with(abstract_type& previous) override;
virtual bool is_compatible_with_frozen(collection_type_impl& previous) = 0;
virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) = 0;
virtual shared_ptr<cql3::cql3_type> as_cql3_type() override;
virtual collection_mutation::one merge(collection_mutation::view a, collection_mutation::view b) = 0;
};
using collection_type = shared_ptr<collection_type_impl>;
class map_type_impl final : public collection_type_impl {
using map_type = shared_ptr<map_type_impl>;
static thread_local std::unordered_map<std::pair<data_type, data_type>, map_type> _instances;
static thread_local std::unordered_map<std::pair<data_type, data_type>, map_type> _frozen_instances;
data_type _keys;
data_type _values;
data_type _key_value_pair_type;
bool _is_multi_cell;
public:
// type returned by deserialize() and expected by serialize
// does not support mutations/ttl/tombstone - purely for I/O.
using native_type = std::vector<std::pair<boost::any, boost::any>>;
// representation of a map mutation, key/value pairs, value is a mutation itself
using mutation = std::vector<std::pair<bytes_view, atomic_cell::view>>;
static shared_ptr<map_type_impl> get_instance(data_type keys, data_type values, bool is_multi_cell);
map_type_impl(data_type keys, data_type values, bool is_multi_cell);
data_type get_keys_type() const { return _keys; }
data_type get_values_type() const { return _values; }
virtual data_type name_comparator() override { return _keys; }
virtual data_type value_comparator() override { return _values; }
virtual bool is_multi_cell() override { return _is_multi_cell; }
virtual data_type freeze() override;
virtual bool is_compatible_with_frozen(collection_type_impl& previous) override;
virtual bool is_value_compatible_with_frozen(collection_type_impl& previous) override;
virtual bool less(bytes_view o1, bytes_view o2) override;
static int32_t compare_maps(data_type keys_comparator, data_type values_comparator,
bytes_view o1, bytes_view o2);
virtual bool is_byte_order_comparable() const override { return false; }
virtual void serialize(const boost::any& value, std::ostream& out) override;
void serialize(const boost::any& value, std::ostream& out, int protocol_version);
virtual object_opt deserialize(bytes_view v) override;
object_opt deserialize(bytes_view v, int protocol_version);
virtual sstring to_string(const bytes& b) override;
virtual size_t hash(bytes_view v) override;
virtual bytes from_string(sstring_view text) override;
virtual std::vector<bytes> serialized_values(std::vector<atomic_cell::one> cells) override;
mutation deserialize_mutation_form(bytes_view in);
// FIXME: use iterators?
collection_mutation::one serialize_mutation_form(mutation mut);
virtual collection_mutation::one merge(collection_mutation::view a, collection_mutation::view b) override;
};
using map_type = shared_ptr<map_type_impl>;
inline
size_t hash_value(const shared_ptr<abstract_type>& x) {
return std::hash<abstract_type*>()(x.get());
@@ -323,6 +410,17 @@ T read_simple_exactly(bytes_view& v) {
return net::ntoh(*reinterpret_cast<const net::packed<T>*>(p));
}
inline
bytes_view
read_simple_bytes(bytes_view& v, size_t n) {
if (v.size() < n) {
throw marshal_exception();
}
bytes_view ret(v.begin(), n);
v.remove_prefix(n);
return ret;
}
template<typename T>
object_opt read_simple_opt(bytes_view& v) {
if (v.empty()) {