Merge remote-tracking branch 'dev/penberg/create-table-stmt/v2'

From Pekka:

This patch series adds support for _parsing_ CQL create table
statements. Changes to underlying schema are done by the migration
manager which is not implemented yet.
This commit is contained in:
Tomasz Grabiec
2015-03-24 14:12:02 +01:00
32 changed files with 1346 additions and 1038 deletions

View File

@@ -266,7 +266,10 @@ urchin_core = (['database.cc',
'cql3/cql3.cc',
'cql3/cql3_type.cc',
'cql3/operation.cc',
'cql3/lists.cc',
'cql3/sets.cc',
'cql3/functions/functions.cc',
'cql3/statements/cf_prop_defs.cc',
'cql3/statements/schema_altering_statement.cc',
'cql3/statements/modification_statement.cc',
'cql3/statements/update_statement.cc',

View File

@@ -31,6 +31,7 @@ options {
@parser::includes {
#include "cql3/statements/create_keyspace_statement.hh"
#include "cql3/statements/create_table_statement.hh"
#include "cql3/statements/property_definitions.hh"
#include "cql3/statements/select_statement.hh"
#include "cql3/statements/update_statement.hh"
@@ -280,8 +281,8 @@ cqlStatement returns [shared_ptr<parsed_statement> stmt]
| st7= truncateStatement { $stmt = st7; }
#endif
| st8= createKeyspaceStatement { $stmt = st8; }
#if 0
| st9= createTableStatement { $stmt = st9; }
#if 0
| st10=createIndexStatement { $stmt = st10; }
| st11=dropKeyspaceStatement { $stmt = st11; }
| st12=dropTableStatement { $stmt = st12; }
@@ -667,7 +668,6 @@ createKeyspaceStatement returns [shared_ptr<cql3::statements::create_keyspace_st
K_WITH properties[attrs] { $expr = make_shared<cql3::statements::create_keyspace_statement>(ks, attrs, if_not_exists); }
;
#if 0
/**
* CREATE COLUMNFAMILY [IF NOT EXISTS] <CF> (
* <name1> <type>,
@@ -675,41 +675,44 @@ createKeyspaceStatement returns [shared_ptr<cql3::statements::create_keyspace_st
* <name3> <type>
* ) WITH <property> = <value> AND ...;
*/
createTableStatement returns [CreateTableStatement.RawStatement expr]
@init { boolean ifNotExists = false; }
: K_CREATE K_COLUMNFAMILY (K_IF K_NOT K_EXISTS { ifNotExists = true; } )?
cf=columnFamilyName { $expr = new CreateTableStatement.RawStatement(cf, ifNotExists); }
createTableStatement returns [shared_ptr<cql3::statements::create_table_statement::raw_statement> expr]
@init { bool if_not_exists = false; }
: K_CREATE K_COLUMNFAMILY (K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
cf=columnFamilyName { $expr = make_shared<cql3::statements::create_table_statement::raw_statement>(cf, if_not_exists); }
cfamDefinition[expr]
;
cfamDefinition[CreateTableStatement.RawStatement expr]
cfamDefinition[shared_ptr<cql3::statements::create_table_statement::raw_statement> expr]
: '(' cfamColumns[expr] ( ',' cfamColumns[expr]? )* ')'
( K_WITH cfamProperty[expr] ( K_AND cfamProperty[expr] )*)?
;
cfamColumns[CreateTableStatement.RawStatement expr]
: k=ident v=comparatorType { boolean isStatic=false; } (K_STATIC {isStatic = true;})? { $expr.addDefinition(k, v, isStatic); }
(K_PRIMARY K_KEY { $expr.addKeyAliases(Collections.singletonList(k)); })?
| K_PRIMARY K_KEY '(' pkDef[expr] (',' c=ident { $expr.addColumnAlias(c); } )* ')'
cfamColumns[shared_ptr<cql3::statements::create_table_statement::raw_statement> expr]
@init { bool is_static=false; }
: k=ident v=comparatorType (K_STATIC {is_static = true;})? { $expr->add_definition(k, v, is_static); }
(K_PRIMARY K_KEY { $expr->add_key_aliases(std::vector<shared_ptr<cql3::column_identifier>>{k}); })?
| K_PRIMARY K_KEY '(' pkDef[expr] (',' c=ident { $expr->add_column_alias(c); } )* ')'
;
pkDef[CreateTableStatement.RawStatement expr]
: k=ident { $expr.addKeyAliases(Collections.singletonList(k)); }
| '(' { List<ColumnIdentifier> l = new ArrayList<ColumnIdentifier>(); } k1=ident { l.add(k1); } ( ',' kn=ident { l.add(kn); } )* ')' { $expr.addKeyAliases(l); }
pkDef[shared_ptr<cql3::statements::create_table_statement::raw_statement> expr]
@init { std::vector<shared_ptr<cql3::column_identifier>> l; }
: k=ident { $expr->add_key_aliases(std::vector<shared_ptr<cql3::column_identifier>>{k}); }
| '(' k1=ident { l.push_back(k1); } ( ',' kn=ident { l.push_back(kn); } )* ')' { $expr->add_key_aliases(l); }
;
cfamProperty[CreateTableStatement.RawStatement expr]
: property[expr.properties]
| K_COMPACT K_STORAGE { $expr.setCompactStorage(); }
cfamProperty[shared_ptr<cql3::statements::create_table_statement::raw_statement> expr]
: property[expr->properties]
| K_COMPACT K_STORAGE { $expr->set_compact_storage(); }
| K_CLUSTERING K_ORDER K_BY '(' cfamOrdering[expr] (',' cfamOrdering[expr])* ')'
;
cfamOrdering[CreateTableStatement.RawStatement expr]
@init{ boolean reversed=false; }
: k=ident (K_ASC | K_DESC { reversed=true;} ) { $expr.setOrdering(k, reversed); }
cfamOrdering[shared_ptr<cql3::statements::create_table_statement::raw_statement> expr]
@init{ bool reversed=false; }
: k=ident (K_ASC | K_DESC { reversed=true;} ) { $expr->set_ordering(k, reversed); }
;
#if 0
/**
* CREATE TYPE foo (
* <name1> <type1>,

View File

@@ -58,7 +58,7 @@ void column_condition::collect_marker_specificaton(::shared_ptr<variable_specifi
}
::shared_ptr<column_condition>
column_condition::raw::prepare(const sstring& keyspace, column_definition& receiver) {
column_condition::raw::prepare(const sstring& keyspace, const column_definition& receiver) {
if (receiver.type->is_counter()) {
throw exceptions::invalid_request_exception("Conditions on counters are not supported");
}

View File

@@ -56,7 +56,7 @@ namespace cql3 {
*/
class column_condition final {
public:
column_definition& column;
const column_definition& column;
private:
// For collection, when testing the equality of a specific element, nullptr otherwise.
::shared_ptr<term> _collection_element;
@@ -64,7 +64,7 @@ private:
std::vector<::shared_ptr<term>> _in_values;
const operator_type& _op;
public:
column_condition(column_definition& column, ::shared_ptr<term> collection_element,
column_condition(const column_definition& column, ::shared_ptr<term> collection_element,
::shared_ptr<term> value, std::vector<::shared_ptr<term>> in_values, const operator_type& op)
: column(column)
, _collection_element(std::move(collection_element))
@@ -77,33 +77,33 @@ public:
}
}
static ::shared_ptr<column_condition> condition(column_definition& def, ::shared_ptr<term> value, const operator_type& op) {
static ::shared_ptr<column_condition> condition(const column_definition& def, ::shared_ptr<term> value, const operator_type& op) {
return ::make_shared<column_condition>(def, ::shared_ptr<term>{}, std::move(value), std::vector<::shared_ptr<term>>{}, op);
}
static ::shared_ptr<column_condition> condition(column_definition& def, ::shared_ptr<term> collection_element,
static ::shared_ptr<column_condition> condition(const column_definition& def, ::shared_ptr<term> collection_element,
::shared_ptr<term> value, const operator_type& op) {
return ::make_shared<column_condition>(def, std::move(collection_element), std::move(value),
std::vector<::shared_ptr<term>>{}, op);
}
static ::shared_ptr<column_condition> in_condition(column_definition& def, std::vector<::shared_ptr<term>> in_values) {
static ::shared_ptr<column_condition> in_condition(const column_definition& def, std::vector<::shared_ptr<term>> in_values) {
return ::make_shared<column_condition>(def, ::shared_ptr<term>{}, ::shared_ptr<term>{},
std::move(in_values), operator_type::IN);
}
static ::shared_ptr<column_condition> in_condition(column_definition& def, ::shared_ptr<term> collection_element,
static ::shared_ptr<column_condition> in_condition(const column_definition& def, ::shared_ptr<term> collection_element,
std::vector<::shared_ptr<term>> in_values) {
return ::make_shared<column_condition>(def, std::move(collection_element), ::shared_ptr<term>{},
std::move(in_values), operator_type::IN);
}
static ::shared_ptr<column_condition> in_condition(column_definition& def, ::shared_ptr<term> in_marker) {
static ::shared_ptr<column_condition> in_condition(const column_definition& def, ::shared_ptr<term> in_marker) {
return ::make_shared<column_condition>(def, ::shared_ptr<term>{}, std::move(in_marker),
std::vector<::shared_ptr<term>>{}, operator_type::IN);
}
static ::shared_ptr<column_condition> in_condition(column_definition& def, ::shared_ptr<term> collection_element,
static ::shared_ptr<column_condition> in_condition(const column_definition& def, ::shared_ptr<term> collection_element,
::shared_ptr<term> in_marker) {
return ::make_shared<column_condition>(def, std::move(collection_element), std::move(in_marker),
std::vector<::shared_ptr<term>>{}, operator_type::IN);
@@ -758,7 +758,7 @@ public:
std::move(collection_element), operator_type::IN);
}
::shared_ptr<column_condition> prepare(const sstring& keyspace, column_definition& receiver);
::shared_ptr<column_condition> prepare(const sstring& keyspace, const column_definition& receiver);
};
};

View File

@@ -178,7 +178,7 @@ public:
};
static inline
column_definition* get_column_definition(schema_ptr schema, column_identifier& id) {
const column_definition* get_column_definition(schema_ptr schema, const column_identifier& id) {
return schema->get_column_definition(id.bytes_);
}

298
cql3/lists.cc Normal file
View File

@@ -0,0 +1,298 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "lists.hh"
#include "update_parameters.hh"
#include "column_identifier.hh"
#include "cql3_type.hh"
namespace cql3 {
shared_ptr<column_specification>
lists::index_spec_of(shared_ptr<column_specification> column) {
return make_shared<column_specification>(column->ks_name, column->cf_name,
::make_shared<column_identifier>(sprint("idx(%s)", *column->name), true), int32_type);
}
shared_ptr<column_specification>
lists::value_spec_of(shared_ptr<column_specification> column) {
return make_shared<column_specification>(column->ks_name, column->cf_name,
::make_shared<column_identifier>(sprint("value(%s)", *column->name), true),
dynamic_pointer_cast<list_type_impl>(column->type)->get_elements_type());
}
shared_ptr<term>
lists::literal::prepare(const sstring& keyspace, shared_ptr<column_specification> receiver) {
validate_assignable_to(keyspace, receiver);
auto&& value_spec = value_spec_of(receiver);
std::vector<shared_ptr<term>> values;
values.reserve(_elements.size());
bool all_terminal = true;
for (auto rt : _elements) {
auto&& t = rt->prepare(keyspace, value_spec);
if (t->contains_bind_marker()) {
throw exceptions::invalid_request_exception(sprint("Invalid list literal for %s: bind variables are not supported inside collection literals", *receiver->name));
}
if (dynamic_pointer_cast<non_terminal>(t)) {
all_terminal = false;
}
values.push_back(std::move(t));
}
delayed_value value(values);
if (all_terminal) {
return value.bind(query_options::DEFAULT);
} else {
return make_shared(std::move(value));
}
}
void
lists::literal::validate_assignable_to(const sstring keyspace, shared_ptr<column_specification> receiver) {
if (!dynamic_pointer_cast<list_type_impl>(receiver->type)) {
throw exceptions::invalid_request_exception(sprint("Invalid list literal for %s of type %s",
*receiver->name, *receiver->type->as_cql3_type()));
}
auto&& value_spec = value_spec_of(receiver);
for (auto rt : _elements) {
if (!is_assignable(rt->test_assignment(keyspace, value_spec))) {
throw exceptions::invalid_request_exception(sprint("Invalid list literal for %s: value %s is not of type %s",
*receiver->name, *rt, *value_spec->type->as_cql3_type()));
}
}
}
assignment_testable::test_result
lists::literal::test_assignment(const sstring& keyspace, shared_ptr<column_specification> receiver) {
if (!dynamic_pointer_cast<list_type_impl>(receiver->type)) {
return assignment_testable::test_result::NOT_ASSIGNABLE;
}
// If there is no elements, we can't say it's an exact match (an empty list if fundamentally polymorphic).
if (_elements.empty()) {
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
}
auto&& value_spec = value_spec_of(receiver);
std::vector<shared_ptr<assignment_testable>> to_test;
to_test.reserve(_elements.size());
std::copy(_elements.begin(), _elements.end(), std::back_inserter(to_test));
return assignment_testable::test_all(keyspace, value_spec, to_test);
}
sstring
lists::literal::to_string() const {
return ::to_string(_elements);
}
lists::value
lists::value::from_serialized(bytes_view v, shared_ptr<list_type_impl> type, serialization_format sf) {
try {
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
// FIXME: deserializeForNativeProtocol()?!
auto&& l = boost::any_cast<list_type_impl::native_type>(type->deserialize(v, sf));
std::vector<bytes> elements;
elements.reserve(l.size());
for (auto&& element : l) {
// elements can be null in lists that represent a set of IN values
// FIXME: assumes that empty bytes is equivalent to null element
elements.push_back(element.empty() ? bytes() : type->get_elements_type()->decompose(element));
}
return value(std::move(elements));
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception(e.what());
}
}
bytes_opt
lists::value::get(const query_options& options) {
return get_with_protocol_version(options.get_serialization_format());
}
bytes
lists::value::get_with_protocol_version(serialization_format sf) {
return collection_type_impl::pack(_elements.begin(), _elements.end(), _elements.size(), sf);
}
bool
lists::value::equals(shared_ptr<list_type_impl> lt, const value& v) {
if (_elements.size() != v._elements.size()) {
return false;
}
return std::equal(_elements.begin(), _elements.end(),
v._elements.begin(),
[t = lt->get_elements_type()] (bytes_view e1, bytes_view e2) { return t->equal(e1, e2); });
}
std::vector<bytes>
lists::value::get_elements() {
return _elements;
}
sstring
lists::value::to_string() const {
std::ostringstream os;
os << "[";
bool is_first = true;
for (auto&& e : _elements) {
if (!is_first) {
os << ", ";
}
is_first = false;
os << to_hex(e);
}
os << "]";
return os.str();
}
bool
lists::delayed_value::contains_bind_marker() const {
// False since we don't support them in collection
return false;
}
void
lists::delayed_value::collect_marker_specification(shared_ptr<variable_specifications> bound_names) {
}
shared_ptr<terminal>
lists::delayed_value::bind(const query_options& options) {
std::vector<bytes> buffers;
buffers.reserve(_elements.size());
for (auto&& t : _elements) {
bytes_opt bo = t->bind_and_get(options);
if (!bo) {
throw exceptions::invalid_request_exception("null is not supported inside collections");
}
// We don't support value > 64K because the serialization format encode the length as an unsigned short.
if (bo->size() > std::numeric_limits<uint16_t>::max()) {
throw exceptions::invalid_request_exception(sprint("List value is too long. List values are limited to %d bytes but %d bytes value provided",
std::numeric_limits<uint16_t>::max(),
bo->size()));
}
buffers.push_back(std::move(*bo));
}
return ::make_shared<value>(buffers);
}
::shared_ptr<terminal>
lists::marker::bind(const query_options& options) {
throw std::runtime_error("");
}
#if 0
public Value bind(QueryOptions options) throws InvalidRequestException
{
ByteBuffer value = options.getValues().get(bindIndex);
return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type, options.getProtocolVersion());
}
#endif
void
lists::setter::execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) {
tombstone ts;
if (column.type->is_multi_cell()) {
// delete + append
ts = params.make_tombstone_just_before();
}
do_append(_t, m, prefix, column, params, ts);
}
bool
lists::setter_by_index::requires_read() {
return true;
}
void
lists::setter_by_index::collect_marker_specification(shared_ptr<variable_specifications> bound_names) {
operation::collect_marker_specification(bound_names);
_idx->collect_marker_specification(std::move(bound_names));
}
void
lists::setter_by_index::execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) {
// we should not get here for frozen lists
assert(column.type->is_multi_cell()); // "Attempted to set an individual element on a frozen list";
auto row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
bytes_opt index = _idx->bind_and_get(params._options);
bytes_opt value = _t->bind_and_get(params._options);
if (!index) {
throw exceptions::invalid_request_exception("Invalid null value for list index");
}
collection_mutation::view existing_list_ser = params.get_prefetched_list(m.key, row_key, column);
auto ltype = dynamic_pointer_cast<list_type_impl>(column.type);
collection_type_impl::mutation_view existing_list = ltype->deserialize_mutation_form(existing_list_ser.data);
// we verified that index is an int32_type
auto idx = net::ntoh(int32_t(*unaligned_cast<int32_t>(index->begin())));
if (idx < 0 || size_t(idx) >= existing_list.cells.size()) {
throw exceptions::invalid_request_exception(sprint("List index %d out of bound, list has size %d",
idx, existing_list.cells.size()));
}
bytes_view eidx = existing_list.cells[idx].first;
list_type_impl::mutation mut;
mut.cells.reserve(1);
if (!value) {
mut.cells.emplace_back(to_bytes(eidx), params.make_dead_cell());
} else {
if (value->size() > std::numeric_limits<uint16_t>::max()) {
throw exceptions::invalid_request_exception(
sprint("List value is too long. List values are limited to %d bytes but %d bytes value provided",
std::numeric_limits<uint16_t>::max(), value->size()));
}
mut.cells.emplace_back(to_bytes(eidx), params.make_cell(*value));
}
auto smut = ltype->serialize_mutation_form(mut);
m.set_cell(prefix, column, atomic_cell_or_collection::from_collection_mutation(std::move(smut)));
}
void
lists::do_append(shared_ptr<term> t,
mutation& m,
const exploded_clustering_prefix& prefix,
const column_definition& column,
const update_parameters& params,
tombstone ts) {
auto&& value = t->bind(params._options);
auto&& list_value = dynamic_pointer_cast<lists::value>(value);
auto&& ltype = dynamic_pointer_cast<list_type_impl>(column.type);
if (column.type->is_multi_cell()) {
// If we append null, do nothing. Note that for Setter, we've
// already removed the previous value so we're good here too
if (!value) {
return;
}
auto&& to_add = list_value->_elements;
collection_type_impl::mutation appended;
appended.tomb = ts;
appended.cells.reserve(to_add.size());
for (auto&& e : to_add) {
auto uuid1 = utils::UUID_gen::get_time_UUID_bytes();
auto uuid = bytes(reinterpret_cast<const char*>(uuid1.data()), uuid1.size());
appended.cells.emplace_back(std::move(uuid), params.make_cell(e));
}
m.set_cell(prefix, column, ltype->serialize_mutation_form(appended));
} else {
// for frozen lists, we're overwriting the whole cell value
if (!value) {
m.set_cell(prefix, column, params.make_dead_cell());
} else {
auto&& to_add = list_value->_elements;
auto&& newv = collection_mutation::one{list_type_impl::pack(to_add.begin(), to_add.end(), to_add.size(),
serialization_format::internal())};
m.set_cell(prefix, column, atomic_cell_or_collection::from_collection_mutation(std::move(newv)));
}
}
}
}

View File

@@ -28,6 +28,7 @@
#include "cql3/abstract_marker.hh"
#include "to_string.hh"
#include "utils/UUID_gen.hh"
#include "operation.hh"
namespace cql3 {
@@ -63,16 +64,8 @@ import org.slf4j.LoggerFactory;
class lists {
lists() = delete;
public:
static shared_ptr<column_specification> index_spec_of(shared_ptr<column_specification> column) {
return make_shared<column_specification>(column->ks_name, column->cf_name,
::make_shared<column_identifier>(sprint("idx(%s)", *column->name), true), int32_type);
}
static shared_ptr<column_specification> value_spec_of(shared_ptr<column_specification> column) {
return make_shared<column_specification>(column->ks_name, column->cf_name,
::make_shared<column_identifier>(sprint("value(%s)", *column->name), true),
dynamic_pointer_cast<list_type_impl>(column->type)->get_elements_type());
}
static shared_ptr<column_specification> index_spec_of(shared_ptr<column_specification> column);
static shared_ptr<column_specification> value_spec_of(shared_ptr<column_specification> column);
class literal : public term::raw {
const std::vector<shared_ptr<term::raw>> _elements;
@@ -80,67 +73,12 @@ public:
explicit literal(std::vector<shared_ptr<term::raw>> elements)
: _elements(std::move(elements)) {
}
shared_ptr<term> prepare(const sstring& keyspace, shared_ptr<column_specification> receiver) {
validate_assignable_to(keyspace, receiver);
auto&& value_spec = value_spec_of(receiver);
std::vector<shared_ptr<term>> values;
values.reserve(_elements.size());
bool all_terminal = true;
for (auto rt : _elements) {
auto&& t = rt->prepare(keyspace, value_spec);
if (t->contains_bind_marker()) {
throw exceptions::invalid_request_exception(sprint("Invalid list literal for %s: bind variables are not supported inside collection literals", *receiver->name));
}
if (dynamic_pointer_cast<non_terminal>(t)) {
all_terminal = false;
}
values.push_back(std::move(t));
}
delayed_value value(values);
if (all_terminal) {
return value.bind(query_options::DEFAULT);
} else {
return make_shared(std::move(value));
}
}
shared_ptr<term> prepare(const sstring& keyspace, shared_ptr<column_specification> receiver);
private:
void validate_assignable_to(const sstring keyspace, shared_ptr<column_specification> receiver) {
if (!dynamic_pointer_cast<list_type_impl>(receiver->type)) {
throw exceptions::invalid_request_exception(sprint("Invalid list literal for %s of type %s",
*receiver->name, *receiver->type->as_cql3_type()));
}
auto&& value_spec = value_spec_of(receiver);
for (auto rt : _elements) {
if (!is_assignable(rt->test_assignment(keyspace, value_spec))) {
throw exceptions::invalid_request_exception(sprint("Invalid list literal for %s: value %s is not of type %s",
*receiver->name, *rt, *value_spec->type->as_cql3_type()));
}
}
}
void validate_assignable_to(const sstring keyspace, shared_ptr<column_specification> receiver);
public:
virtual assignment_testable::test_result test_assignment(const sstring& keyspace, shared_ptr<column_specification> receiver) override {
if (!dynamic_pointer_cast<list_type_impl>(receiver->type)) {
return assignment_testable::test_result::NOT_ASSIGNABLE;
}
// If there is no elements, we can't say it's an exact match (an empty list if fundamentally polymorphic).
if (_elements.empty()) {
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
}
auto&& value_spec = value_spec_of(receiver);
std::vector<shared_ptr<assignment_testable>> to_test;
to_test.reserve(_elements.size());
std::copy(_elements.begin(), _elements.end(), std::back_inserter(to_test));
return assignment_testable::test_all(keyspace, value_spec, to_test);
}
virtual sstring to_string() const override {
return ::to_string(_elements);
}
virtual assignment_testable::test_result test_assignment(const sstring& keyspace, shared_ptr<column_specification> receiver) override;
virtual sstring to_string() const override;
};
class value : public multi_item_terminal, collection_terminal {
@@ -149,61 +87,12 @@ public:
explicit value(std::vector<bytes> elements)
: _elements(std::move(elements)) {
}
static value from_serialized(bytes_view v, shared_ptr<list_type_impl> type, serialization_format sf) {
try {
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
// FIXME: deserializeForNativeProtocol()?!
auto&& l = boost::any_cast<list_type_impl::native_type>(type->deserialize(v, sf));
std::vector<bytes> elements;
elements.reserve(l.size());
for (auto&& element : l) {
// elements can be null in lists that represent a set of IN values
// FIXME: assumes that empty bytes is equivalent to null element
elements.push_back(element.empty() ? bytes() : type->get_elements_type()->decompose(element));
}
return value(std::move(elements));
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception(e.what());
}
}
virtual bytes_opt get(const query_options& options) override {
return get_with_protocol_version(options.get_serialization_format());
}
virtual bytes get_with_protocol_version(serialization_format sf) override {
return collection_type_impl::pack(_elements.begin(), _elements.end(), _elements.size(), sf);
}
bool equals(shared_ptr<list_type_impl> lt, const value& v) {
if (_elements.size() != v._elements.size()) {
return false;
}
return std::equal(_elements.begin(), _elements.end(),
v._elements.begin(),
[t = lt->get_elements_type()] (bytes_view e1, bytes_view e2) { return t->equal(e1, e2); });
}
virtual std::vector<bytes> get_elements() override {
return _elements;
}
virtual sstring to_string() const {
std::ostringstream os;
os << "[";
bool is_first = true;
for (auto&& e : _elements) {
if (!is_first) {
os << ", ";
}
is_first = false;
os << to_hex(e);
}
os << "]";
return os.str();
}
static value from_serialized(bytes_view v, shared_ptr<list_type_impl> type, serialization_format sf);
virtual bytes_opt get(const query_options& options) override;
virtual bytes get_with_protocol_version(serialization_format sf) override;
bool equals(shared_ptr<list_type_impl> lt, const value& v);
virtual std::vector<bytes> get_elements() override;
virtual sstring to_string() const;
friend class lists;
};
/**
@@ -221,36 +110,9 @@ public:
explicit delayed_value(std::vector<shared_ptr<term>> elements)
: _elements(std::move(elements)) {
}
virtual bool contains_bind_marker() const override {
// False since we don't support them in collection
return false;
}
virtual void collect_marker_specification(shared_ptr<variable_specifications> bound_names) {
}
virtual shared_ptr<terminal> bind(const query_options& options) override {
std::vector<bytes> buffers;
buffers.reserve(_elements.size());
for (auto&& t : _elements) {
bytes_opt bo = t->bind_and_get(options);
if (!bo) {
throw exceptions::invalid_request_exception("null is not supported inside collections");
}
// We don't support value > 64K because the serialization format encode the length as an unsigned short.
if (bo->size() > std::numeric_limits<uint16_t>::max()) {
throw exceptions::invalid_request_exception(sprint("List value is too long. List values are limited to %d bytes but %d bytes value provided",
std::numeric_limits<uint16_t>::max(),
bo->size()));
}
buffers.push_back(std::move(*bo));
}
return ::make_shared<value>(buffers);
}
virtual bool contains_bind_marker() const override;
virtual void collect_marker_specification(shared_ptr<variable_specifications> bound_names);
virtual shared_ptr<terminal> bind(const query_options& options) override;
};
/**
@@ -268,9 +130,7 @@ public:
assert receiver.type instanceof ListType;
}
#endif
virtual ::shared_ptr<terminal> bind(const query_options& options) override {
throw std::runtime_error("");
}
virtual ::shared_ptr<terminal> bind(const query_options& options) override;
#if 0
public Value bind(QueryOptions options) throws InvalidRequestException
{
@@ -323,75 +183,21 @@ public:
class setter : public operation {
public:
setter(column_definition& column, shared_ptr<term> t)
setter(const column_definition& column, shared_ptr<term> t)
: operation(column, std::move(t)) {
}
virtual void execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) override {
tombstone ts;
if (column.type->is_multi_cell()) {
// delete + append
ts = params.make_tombstone_just_before();
}
do_append(_t, m, prefix, column, params, ts);
}
virtual void execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) override;
};
class setter_by_index : public operation {
shared_ptr<term> _idx;
public:
setter_by_index(column_definition& column, shared_ptr<term> idx, shared_ptr<term> t)
setter_by_index(const column_definition& column, shared_ptr<term> idx, shared_ptr<term> t)
: operation(column, std::move(t)), _idx(std::move(idx)) {
}
virtual bool requires_read() override {
return true;
}
virtual void collect_marker_specification(shared_ptr<variable_specifications> bound_names) override {
operation::collect_marker_specification(bound_names);
_idx->collect_marker_specification(std::move(bound_names));
}
virtual void execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) override {
// we should not get here for frozen lists
assert(column.type->is_multi_cell()); // "Attempted to set an individual element on a frozen list";
auto row_key = clustering_key::from_clustering_prefix(*params._schema, prefix);
bytes_opt index = _idx->bind_and_get(params._options);
bytes_opt value = _t->bind_and_get(params._options);
if (!index) {
throw exceptions::invalid_request_exception("Invalid null value for list index");
}
collection_mutation::view existing_list_ser = params.get_prefetched_list(m.key, row_key, column);
auto ltype = dynamic_pointer_cast<list_type_impl>(column.type);
collection_type_impl::mutation_view existing_list = ltype->deserialize_mutation_form(existing_list_ser.data);
// we verified that index is an int32_type
auto idx = net::ntoh(int32_t(*unaligned_cast<int32_t>(index->begin())));
if (idx < 0 || size_t(idx) >= existing_list.cells.size()) {
throw exceptions::invalid_request_exception(sprint("List index %d out of bound, list has size %d",
idx, existing_list.cells.size()));
}
bytes_view eidx = existing_list.cells[idx].first;
list_type_impl::mutation mut;
mut.cells.reserve(1);
if (!value) {
mut.cells.emplace_back(to_bytes(eidx), params.make_dead_cell());
} else {
if (value->size() > std::numeric_limits<uint16_t>::max()) {
throw exceptions::invalid_request_exception(
sprint("List value is too long. List values are limited to %d bytes but %d bytes value provided",
std::numeric_limits<uint16_t>::max(), value->size()));
}
mut.cells.emplace_back(to_bytes(eidx), params.make_cell(*value));
}
auto smut = ltype->serialize_mutation_form(mut);
m.set_cell(prefix, column, atomic_cell_or_collection::from_collection_mutation(std::move(smut)));
}
virtual bool requires_read() override;
virtual void collect_marker_specification(shared_ptr<variable_specifications> bound_names);
virtual void execute(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) override;
};
#if 0
@@ -415,39 +221,7 @@ public:
const exploded_clustering_prefix& prefix,
const column_definition& column,
const update_parameters& params,
tombstone ts = {}) {
auto&& value = t->bind(params._options);
auto&& list_value = dynamic_pointer_cast<lists::value>(value);
auto&& ltype = dynamic_pointer_cast<list_type_impl>(column.type);
if (column.type->is_multi_cell()) {
// If we append null, do nothing. Note that for Setter, we've
// already removed the previous value so we're good here too
if (!value) {
return;
}
auto&& to_add = list_value->_elements;
collection_type_impl::mutation appended;
appended.tomb = ts;
appended.cells.reserve(to_add.size());
for (auto&& e : to_add) {
auto uuid1 = utils::UUID_gen::get_time_UUID_bytes();
auto uuid = bytes(reinterpret_cast<const char*>(uuid1.data()), uuid1.size());
appended.cells.emplace_back(std::move(uuid), params.make_cell(e));
}
m.set_cell(prefix, column, ltype->serialize_mutation_form(appended));
} else {
// for frozen lists, we're overwriting the whole cell value
if (!value) {
m.set_cell(prefix, column, params.make_dead_cell());
} else {
auto&& to_add = list_value->_elements;
auto&& newv = collection_mutation::one{list_type_impl::pack(to_add.begin(), to_add.end(), to_add.size(),
serialization_format::internal())};
m.set_cell(prefix, column, atomic_cell_or_collection::from_collection_mutation(std::move(newv)));
}
}
}
tombstone ts = {});
#if 0
public static class Prepender extends Operation

View File

@@ -288,7 +288,7 @@ public:
class setter : public operation {
public:
setter(column_definition& column, shared_ptr<term> t)
setter(const column_definition& column, shared_ptr<term> t)
: operation(column, std::move(t)) {
}
@@ -308,7 +308,7 @@ public:
class setter_by_key : public operation {
const shared_ptr<term> _k;
public:
setter_by_key(column_definition& column, shared_ptr<term> k, shared_ptr<term> t)
setter_by_key(const column_definition& column, shared_ptr<term> k, shared_ptr<term> t)
: operation(column, std::move(t)), _k(std::move(k)) {
}
@@ -342,7 +342,7 @@ public:
class putter : public operation {
public:
putter(column_definition& column, shared_ptr<term> t)
putter(const column_definition& column, shared_ptr<term> t)
: operation(column, std::move(t)) {
}

View File

@@ -30,7 +30,7 @@ namespace cql3 {
shared_ptr<operation>
operation::set_element::prepare(const sstring& keyspace, column_definition& receiver) {
operation::set_element::prepare(const sstring& keyspace, const column_definition& receiver) {
using exceptions::invalid_request_exception;
auto rtype = dynamic_pointer_cast<collection_type_impl>(receiver.type);
if (!rtype) {
@@ -61,7 +61,7 @@ operation::set_element::is_compatible_with(shared_ptr<raw_update> other) {
}
shared_ptr<operation>
operation::addition::prepare(const sstring& keyspace, column_definition& receiver) {
operation::addition::prepare(const sstring& keyspace, const column_definition& receiver) {
auto v = _value->prepare(keyspace, receiver.column_specification);
auto ctype = dynamic_pointer_cast<collection_type_impl>(receiver.type);
@@ -97,7 +97,7 @@ operation::addition::is_compatible_with(shared_ptr<raw_update> other) {
}
shared_ptr<operation>
operation::subtraction::prepare(const sstring& keyspace, column_definition& receiver) {
operation::subtraction::prepare(const sstring& keyspace, const column_definition& receiver) {
warn(unimplemented::cause::COLLECTIONS);
throw exceptions::invalid_request_exception("unimplemented, go away");
// FIXME:
@@ -135,7 +135,7 @@ operation::subtraction::is_compatible_with(shared_ptr<raw_update> other) {
}
shared_ptr<operation>
operation::prepend::prepare(const sstring& keyspace, column_definition& receiver) {
operation::prepend::prepare(const sstring& keyspace, const column_definition& receiver) {
warn(unimplemented::cause::COLLECTIONS);
throw exceptions::invalid_request_exception("unimplemented, go away");
// FIXME:
@@ -158,7 +158,7 @@ operation::prepend::is_compatible_with(shared_ptr<raw_update> other) {
::shared_ptr <operation>
operation::set_value::prepare(const sstring& keyspace, column_definition& receiver) {
operation::set_value::prepare(const sstring& keyspace, const column_definition& receiver) {
auto v = _value->prepare(keyspace, receiver.column_specification);
if (receiver.type->is_counter()) {

View File

@@ -75,7 +75,7 @@ protected:
const ::shared_ptr<term> _t;
public:
operation(column_definition& column_, ::shared_ptr<term> t)
operation(const column_definition& column_, ::shared_ptr<term> t)
: column{column_}
, _t{t}
{ }
@@ -136,7 +136,7 @@ public:
* be a true column.
* @return the prepared update operation.
*/
virtual ::shared_ptr<operation> prepare(const sstring& keyspace, column_definition& receiver) = 0;
virtual ::shared_ptr<operation> prepare(const sstring& keyspace, const column_definition& receiver) = 0;
/**
* @return whether this operation can be applied alongside the {@code
@@ -172,7 +172,7 @@ public:
* @param receiver the "column" this operation applies to.
* @return the prepared delete operation.
*/
virtual ::shared_ptr<operation> prepare(const sstring& keyspace, column_definition& receiver) = 0;
virtual ::shared_ptr<operation> prepare(const sstring& keyspace, const column_definition& receiver) = 0;
};
class set_value;
@@ -185,7 +185,7 @@ public:
: _selector(std::move(selector)), _value(std::move(value)) {
}
virtual shared_ptr<operation> prepare(const sstring& keyspace, column_definition& receiver);
virtual shared_ptr<operation> prepare(const sstring& keyspace, const column_definition& receiver);
#if 0
protected String toString(ColumnSpecification column)
{
@@ -203,7 +203,7 @@ public:
: _value(value) {
}
virtual shared_ptr<operation> prepare(const sstring& keyspace, column_definition& receiver) override;
virtual shared_ptr<operation> prepare(const sstring& keyspace, const column_definition& receiver) override;
#if 0
protected String toString(ColumnSpecification column)
@@ -222,7 +222,7 @@ public:
: _value(value) {
}
virtual shared_ptr<operation> prepare(const sstring& keyspace, column_definition& receiver) override;
virtual shared_ptr<operation> prepare(const sstring& keyspace, const column_definition& receiver) override;
#if 0
protected String toString(ColumnSpecification column)
@@ -241,7 +241,7 @@ public:
: _value(std::move(value)) {
}
virtual shared_ptr<operation> prepare(const sstring& keyspace, column_definition& receiver) override;
virtual shared_ptr<operation> prepare(const sstring& keyspace, const column_definition& receiver) override;
#if 0
protected String toString(ColumnSpecification column)

View File

@@ -38,7 +38,7 @@ private:
public:
set_value(::shared_ptr<term::raw> value) : _value(std::move(value)) {}
virtual ::shared_ptr <operation> prepare(const sstring& keyspace, column_definition& receiver) override;
virtual ::shared_ptr <operation> prepare(const sstring& keyspace, const column_definition& receiver) override;
#if 0
protected String toString(ColumnSpecification column)

View File

@@ -27,7 +27,7 @@
namespace cql3 {
column_definition&
const column_definition&
relation::to_column_definition(schema_ptr schema, ::shared_ptr<column_identifier::raw> entity) {
auto id = entity->prepare_column_identifier(schema);
auto def = get_column_definition(schema, *id);

View File

@@ -252,7 +252,7 @@ protected:
* @return the column definition corresponding to the specified entity
* @throws InvalidRequestException if the entity cannot be recognized
*/
virtual column_definition& to_column_definition(schema_ptr schema, ::shared_ptr<column_identifier::raw> entity) final;
virtual const column_definition& to_column_definition(schema_ptr schema, ::shared_ptr<column_identifier::raw> entity) final;
};
using relation_ptr = ::shared_ptr<relation>;

View File

@@ -180,7 +180,7 @@ class single_column_restriction::EQ final : public single_column_restriction {
private:
::shared_ptr<term> _value;
public:
EQ(column_definition& column_def, ::shared_ptr<term> value)
EQ(const column_definition& column_def, ::shared_ptr<term> value)
: single_column_restriction(column_def)
, _value(std::move(value))
{ }
@@ -310,7 +310,7 @@ private:
std::vector<::shared_ptr<term>> _entry_keys;
std::vector<::shared_ptr<term>> _entry_values;
public:
contains(column_definition& column_def, ::shared_ptr<term> t, bool is_key)
contains(const column_definition& column_def, ::shared_ptr<term> t, bool is_key)
: single_column_restriction(column_def) {
if (is_key) {
_keys.emplace_back(std::move(t));
@@ -319,7 +319,7 @@ public:
}
}
contains(column_definition& column_def, ::shared_ptr<term> map_key, ::shared_ptr<term> map_value)
contains(const column_definition& column_def, ::shared_ptr<term> map_key, ::shared_ptr<term> map_value)
: single_column_restriction(column_def) {
_entry_keys.emplace_back(std::move(map_key));
_entry_values.emplace_back(std::move(map_value));

256
cql3/sets.cc Normal file
View File

@@ -0,0 +1,256 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "sets.hh"
namespace cql3 {
shared_ptr<column_specification>
sets::value_spec_of(shared_ptr<column_specification> column) {
return make_shared<column_specification>(column->ks_name, column->cf_name,
::make_shared<column_identifier>(sprint("value(%s)", *column->name), true),
dynamic_pointer_cast<set_type_impl>(column->type)->get_elements_type());
}
shared_ptr<term>
sets::literal::prepare(const sstring& keyspace, shared_ptr<column_specification> receiver) {
validate_assignable_to(keyspace, receiver);
// We've parsed empty maps as a set literal to break the ambiguity so
// handle that case now
if (_elements.empty() && dynamic_pointer_cast<map_type_impl>(receiver->type)) {
// use empty_type for comparator, set is empty anyway.
std::map<bytes, bytes, serialized_compare> m(empty_type->as_less_comparator());
return ::make_shared<maps::value>(std::move(m));
}
auto value_spec = value_spec_of(receiver);
std::vector<shared_ptr<term>> values;
values.reserve(_elements.size());
bool all_terminal = true;
for (shared_ptr<term::raw> rt : _elements)
{
auto t = rt->prepare(keyspace, value_spec);
if (t->contains_bind_marker()) {
throw exceptions::invalid_request_exception(sprint("Invalid set literal for %s: bind variables are not supported inside collection literals", *receiver->name));
}
if (dynamic_pointer_cast<non_terminal>(t)) {
all_terminal = false;
}
values.push_back(std::move(t));
}
auto compare = dynamic_pointer_cast<set_type_impl>(receiver->type)->get_elements_type()->as_less_comparator();
auto value = ::make_shared<delayed_value>(compare, std::move(values));
if (all_terminal) {
return value->bind(query_options::DEFAULT);
} else {
return value;
}
}
void
sets::literal::validate_assignable_to(const sstring& keyspace, shared_ptr<column_specification> receiver) {
if (!dynamic_pointer_cast<set_type_impl>(receiver->type)) {
// We've parsed empty maps as a set literal to break the ambiguity so
// handle that case now
if (dynamic_pointer_cast<map_type_impl>(receiver->type) && _elements.empty()) {
return;
}
throw exceptions::invalid_request_exception(sprint("Invalid set literal for %s of type %s", *receiver->name, *receiver->type->as_cql3_type()));
}
auto&& value_spec = value_spec_of(receiver);
for (shared_ptr<term::raw> rt : _elements) {
if (!is_assignable(rt->test_assignment(keyspace, value_spec))) {
throw exceptions::invalid_request_exception(sprint("Invalid set literal for %s: value %s is not of type %s", *receiver->name, *rt, *value_spec->type->as_cql3_type()));
}
}
}
assignment_testable::test_result
sets::literal::test_assignment(const sstring& keyspace, shared_ptr<column_specification> receiver) {
if (!dynamic_pointer_cast<set_type_impl>(receiver->type)) {
// We've parsed empty maps as a set literal to break the ambiguity so handle that case now
if (dynamic_pointer_cast<map_type_impl>(receiver->type) && _elements.empty()) {
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
}
return assignment_testable::test_result::NOT_ASSIGNABLE;
}
// If there is no elements, we can't say it's an exact match (an empty set if fundamentally polymorphic).
if (_elements.empty()) {
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
}
auto&& value_spec = value_spec_of(receiver);
// FIXME: make assignment_testable::test_all() accept ranges
std::vector<shared_ptr<assignment_testable>> to_test(_elements.begin(), _elements.end());
return assignment_testable::test_all(keyspace, value_spec, to_test);
}
sstring
sets::literal::to_string() const {
return "{" + join(", ", _elements) + "}";
}
sets::value
sets::value::from_serialized(bytes_view v, set_type type, serialization_format sf) {
try {
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
// FIXME: deserializeForNativeProtocol?!
auto s = boost::any_cast<set_type_impl::native_type>(type->deserialize(v, sf));
std::set<bytes, serialized_compare> elements(type->as_less_comparator());
for (auto&& element : s) {
elements.insert(elements.end(), type->get_elements_type()->decompose(element));
}
return value(std::move(elements));
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception(e.why());
}
}
bytes_opt
sets::value::get(const query_options& options) {
return get_with_protocol_version(options.get_serialization_format());
}
bytes
sets::value::get_with_protocol_version(serialization_format sf) {
return collection_type_impl::pack(_elements.begin(), _elements.end(),
_elements.size(), sf);
}
bool
sets::value::equals(set_type st, const value& v) {
if (_elements.size() != v._elements.size()) {
return false;
}
auto&& elements_type = st->get_elements_type();
return std::equal(_elements.begin(), _elements.end(),
v._elements.begin(),
[elements_type] (bytes_view v1, bytes_view v2) {
return elements_type->equal(v1, v2);
});
}
sstring
sets::value::to_string() const {
sstring result = "{";
bool first = true;
for (auto&& e : _elements) {
if (!first) {
result += ", ";
}
first = true;
result += to_hex(e);
}
result += "}";
return result;
}
bool
sets::delayed_value::contains_bind_marker() const {
// False since we don't support them in collection
return false;
}
void
sets::delayed_value::collect_marker_specification(shared_ptr<variable_specifications> bound_names) {
}
shared_ptr<terminal>
sets::delayed_value::bind(const query_options& options) {
std::set<bytes, serialized_compare> buffers(_comparator);
for (auto&& t : _elements) {
bytes_opt b = t->bind_and_get(options);
if (!b) {
throw exceptions::invalid_request_exception("null is not supported inside collections");
}
// We don't support value > 64K because the serialization format encode the length as an unsigned short.
if (b->size() > std::numeric_limits<uint16_t>::max()) {
throw exceptions::invalid_request_exception(sprint("Set value is too long. Set values are limited to %d bytes but %d bytes value provided",
std::numeric_limits<uint16_t>::max(),
b->size()));
}
buffers.insert(buffers.end(), std::move(*b));
}
return ::make_shared<value>(std::move(buffers));
}
::shared_ptr<terminal>
sets::marker::bind(const query_options& options) {
throw std::runtime_error("");
}
#if 0
public Value bind(QueryOptions options) throws InvalidRequestException
{
ByteBuffer value = options.getValues().get(bindIndex);
return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type, options.getProtocolVersion());
}
#endif
void
sets::setter::execute(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params) {
if (column.type->is_multi_cell()) {
unimplemented::warn(unimplemented::cause::COLLECTION_RANGE_TOMBSTONES);
// FIXME: implement
// delete + add
#if 0
CellName name = cf.getComparator().create(prefix, column);
cf.addAtom(params.makeTombstoneForOverwrite(name.slice()));
#endif
}
adder::do_add(m, row_key, params, _t, column);
}
void
sets::adder::execute(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params) {
assert(column.type->is_multi_cell()); // "Attempted to add items to a frozen set";
do_add(m, row_key, params, _t, column);
}
void
sets::adder::do_add(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params,
shared_ptr<term> t, const column_definition& column) {
auto&& value = t->bind(params._options);
auto set_value = dynamic_pointer_cast<sets::value>(std::move(value));
auto set_type = dynamic_pointer_cast<set_type_impl>(column.type);
if (column.type->is_multi_cell()) {
if (!set_value || set_value->_elements.empty()) {
return;
}
// FIXME: mutation_view? not compatible with params.make_cell().
collection_type_impl::mutation mut;
for (auto&& e : set_value->_elements) {
mut.cells.emplace_back(e, params.make_cell({}));
}
auto smut = set_type->serialize_mutation_form(mut);
m.set_cell(row_key, column, std::move(smut));
} else {
// for frozen sets, we're overwriting the whole cell
auto v = set_type->serialize_partially_deserialized_form(
{set_value->_elements.begin(), set_value->_elements.end()},
serialization_format::internal());
if (set_value->_elements.empty()) {
m.set_cell(row_key, column, params.make_dead_cell());
} else {
m.set_cell(row_key, column, params.make_cell(std::move(v)));
}
}
}
}

View File

@@ -63,11 +63,7 @@ import org.apache.cassandra.utils.FBUtilities;
class sets {
sets() = delete;
public:
static shared_ptr<column_specification> value_spec_of(shared_ptr<column_specification> column) {
return make_shared<column_specification>(column->ks_name, column->cf_name,
::make_shared<column_identifier>(sprint("value(%s)", *column->name), true),
dynamic_pointer_cast<set_type_impl>(column->type)->get_elements_type());
}
static shared_ptr<column_specification> value_spec_of(shared_ptr<column_specification> column);
class literal : public term::raw {
std::vector<shared_ptr<term::raw>> _elements;
@@ -75,90 +71,11 @@ public:
explicit literal(std::vector<shared_ptr<term::raw>> elements)
: _elements(std::move(elements)) {
}
shared_ptr<term> prepare(const sstring& keyspace, shared_ptr<column_specification> receiver) {
validate_assignable_to(keyspace, receiver);
// We've parsed empty maps as a set literal to break the ambiguity so
// handle that case now
if (_elements.empty() && dynamic_pointer_cast<map_type_impl>(receiver->type)) {
// use empty_type for comparator, set is empty anyway.
std::map<bytes, bytes, serialized_compare> m(empty_type->as_less_comparator());
return ::make_shared<maps::value>(std::move(m));
}
auto value_spec = value_spec_of(receiver);
std::vector<shared_ptr<term>> values;
values.reserve(_elements.size());
bool all_terminal = true;
for (shared_ptr<term::raw> rt : _elements)
{
auto t = rt->prepare(keyspace, value_spec);
if (t->contains_bind_marker()) {
throw exceptions::invalid_request_exception(sprint("Invalid set literal for %s: bind variables are not supported inside collection literals", *receiver->name));
}
if (dynamic_pointer_cast<non_terminal>(t)) {
all_terminal = false;
}
values.push_back(std::move(t));
}
auto compare = dynamic_pointer_cast<set_type_impl>(receiver->type)->get_elements_type()->as_less_comparator();
auto value = ::make_shared<delayed_value>(compare, std::move(values));
if (all_terminal) {
return value->bind(query_options::DEFAULT);
} else {
return value;
}
}
void validate_assignable_to(const sstring& keyspace, shared_ptr<column_specification> receiver) {
if (!dynamic_pointer_cast<set_type_impl>(receiver->type)) {
// We've parsed empty maps as a set literal to break the ambiguity so
// handle that case now
if (dynamic_pointer_cast<map_type_impl>(receiver->type) && _elements.empty()) {
return;
}
throw exceptions::invalid_request_exception(sprint("Invalid set literal for %s of type %s", *receiver->name, *receiver->type->as_cql3_type()));
}
auto&& value_spec = value_spec_of(receiver);
for (shared_ptr<term::raw> rt : _elements) {
if (!is_assignable(rt->test_assignment(keyspace, value_spec))) {
throw exceptions::invalid_request_exception(sprint("Invalid set literal for %s: value %s is not of type %s", *receiver->name, *rt, *value_spec->type->as_cql3_type()));
}
}
}
shared_ptr<term> prepare(const sstring& keyspace, shared_ptr<column_specification> receiver);
void validate_assignable_to(const sstring& keyspace, shared_ptr<column_specification> receiver);
assignment_testable::test_result
test_assignment(const sstring& keyspace, shared_ptr<column_specification> receiver) {
if (!dynamic_pointer_cast<set_type_impl>(receiver->type)) {
// We've parsed empty maps as a set literal to break the ambiguity so handle that case now
if (dynamic_pointer_cast<map_type_impl>(receiver->type) && _elements.empty()) {
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
}
return assignment_testable::test_result::NOT_ASSIGNABLE;
}
// If there is no elements, we can't say it's an exact match (an empty set if fundamentally polymorphic).
if (_elements.empty()) {
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
}
auto&& value_spec = value_spec_of(receiver);
// FIXME: make assignment_testable::test_all() accept ranges
std::vector<shared_ptr<assignment_testable>> to_test(_elements.begin(), _elements.end());
return assignment_testable::test_all(keyspace, value_spec, to_test);
}
virtual sstring to_string() const override {
return "{" + join(", ", _elements) + "}";
}
test_assignment(const sstring& keyspace, shared_ptr<column_specification> receiver);
virtual sstring to_string() const override;
};
class value : public terminal, collection_terminal {
@@ -168,57 +85,11 @@ public:
value(std::set<bytes, serialized_compare> elements)
: _elements(std::move(elements)) {
}
static value from_serialized(bytes_view v, set_type type, serialization_format sf) {
try {
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
// FIXME: deserializeForNativeProtocol?!
auto s = boost::any_cast<set_type_impl::native_type>(type->deserialize(v, sf));
std::set<bytes, serialized_compare> elements(type->as_less_comparator());
for (auto&& element : s) {
elements.insert(elements.end(), type->get_elements_type()->decompose(element));
}
return value(std::move(elements));
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception(e.why());
}
}
virtual bytes_opt get(const query_options& options) override {
return get_with_protocol_version(options.get_serialization_format());
}
virtual bytes get_with_protocol_version(serialization_format sf) override {
return collection_type_impl::pack(_elements.begin(), _elements.end(),
_elements.size(), sf);
}
bool equals(set_type st, const value& v) {
if (_elements.size() != v._elements.size()) {
return false;
}
auto&& elements_type = st->get_elements_type();
return std::equal(_elements.begin(), _elements.end(),
v._elements.begin(),
[elements_type] (bytes_view v1, bytes_view v2) {
return elements_type->equal(v1, v2);
});
}
virtual sstring to_string() const override {
sstring result = "{";
bool first = true;
for (auto&& e : _elements) {
if (!first) {
result += ", ";
}
first = true;
result += to_hex(e);
}
result += "}";
return result;
}
static value from_serialized(bytes_view v, set_type type, serialization_format sf);
virtual bytes_opt get(const query_options& options) override;
virtual bytes get_with_protocol_version(serialization_format sf) override;
bool equals(set_type st, const value& v);
virtual sstring to_string() const override;
};
// See Lists.DelayedValue
@@ -229,35 +100,9 @@ public:
delayed_value(serialized_compare comparator, std::vector<shared_ptr<term>> elements)
: _comparator(std::move(comparator)), _elements(std::move(elements)) {
}
virtual bool contains_bind_marker() const override {
// False since we don't support them in collection
return false;
}
virtual void collect_marker_specification(shared_ptr<variable_specifications> bound_names) override {
}
virtual shared_ptr<terminal> bind(const query_options& options) {
std::set<bytes, serialized_compare> buffers(_comparator);
for (auto&& t : _elements) {
bytes_opt b = t->bind_and_get(options);
if (!b) {
throw exceptions::invalid_request_exception("null is not supported inside collections");
}
// We don't support value > 64K because the serialization format encode the length as an unsigned short.
if (b->size() > std::numeric_limits<uint16_t>::max()) {
throw exceptions::invalid_request_exception(sprint("Set value is too long. Set values are limited to %d bytes but %d bytes value provided",
std::numeric_limits<uint16_t>::max(),
b->size()));
}
buffers.insert(buffers.end(), std::move(*b));
}
return ::make_shared<value>(std::move(buffers));
}
virtual bool contains_bind_marker() const override;
virtual void collect_marker_specification(shared_ptr<variable_specifications> bound_names) override;
virtual shared_ptr<terminal> bind(const query_options& options);
};
class marker : public abstract_marker {
@@ -273,9 +118,7 @@ public:
}
#endif
virtual ::shared_ptr<terminal> bind(const query_options& options) override {
throw std::runtime_error("");
}
virtual ::shared_ptr<terminal> bind(const query_options& options) override;
#if 0
public Value bind(QueryOptions options) throws InvalidRequestException
{
@@ -287,65 +130,20 @@ public:
class setter : public operation {
public:
setter(column_definition& column, shared_ptr<term> t)
setter(const column_definition& column, shared_ptr<term> t)
: operation(column, std::move(t)) {
}
virtual void execute(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params) override {
if (column.type->is_multi_cell()) {
unimplemented::warn(unimplemented::cause::COLLECTION_RANGE_TOMBSTONES);
// FIXME: implement
// delete + add
#if 0
CellName name = cf.getComparator().create(prefix, column);
cf.addAtom(params.makeTombstoneForOverwrite(name.slice()));
#endif
}
adder::do_add(m, row_key, params, _t, column);
}
virtual void execute(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params) override;
};
class adder : public operation {
public:
adder(column_definition& column, shared_ptr<term> t)
adder(const column_definition& column, shared_ptr<term> t)
: operation(column, std::move(t)) {
}
virtual void execute(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params) override {
assert(column.type->is_multi_cell()); // "Attempted to add items to a frozen set";
do_add(m, row_key, params, _t, column);
}
virtual void execute(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params) override;
static void do_add(mutation& m, const exploded_clustering_prefix& row_key, const update_parameters& params,
shared_ptr<term> t, const column_definition& column) {
auto&& value = t->bind(params._options);
auto set_value = dynamic_pointer_cast<sets::value>(std::move(value));
auto set_type = dynamic_pointer_cast<set_type_impl>(column.type);
if (column.type->is_multi_cell()) {
if (!set_value || set_value->_elements.empty()) {
return;
}
// FIXME: mutation_view? not compatible with params.make_cell().
collection_type_impl::mutation mut;
for (auto&& e : set_value->_elements) {
mut.cells.emplace_back(e, params.make_cell({}));
}
auto smut = set_type->serialize_mutation_form(mut);
m.set_cell(row_key, column, std::move(smut));
} else {
// for frozen sets, we're overwriting the whole cell
auto v = set_type->serialize_partially_deserialized_form(
{set_value->_elements.begin(), set_value->_elements.end()},
serialization_format::internal());
if (set_value->_elements.empty()) {
m.set_cell(row_key, column, params.make_dead_cell());
} else {
m.set_cell(row_key, column, params.make_cell(std::move(v)));
}
}
}
shared_ptr<term> t, const column_definition& column);
};
#if 0

View File

@@ -46,7 +46,7 @@ single_column_relation::to_term(std::vector<::shared_ptr<column_specification>>
::shared_ptr<restrictions::restriction>
single_column_relation::new_EQ_restriction(schema_ptr schema, ::shared_ptr<variable_specifications> bound_names)
{
column_definition& column_def = to_column_definition(schema, _entity);
const column_definition& column_def = to_column_definition(schema, _entity);
if (!_map_key) {
auto term = to_term(to_receivers(schema, column_def), _value, schema->ks_name, bound_names);
return ::make_shared<single_column_restriction::EQ>(column_def, std::move(term));
@@ -61,7 +61,7 @@ single_column_relation::new_EQ_restriction(schema_ptr schema, ::shared_ptr<varia
}
std::vector<::shared_ptr<column_specification>>
single_column_relation::to_receivers(schema_ptr schema, column_definition& column_def)
single_column_relation::to_receivers(schema_ptr schema, const column_definition& column_def)
{
auto receiver = column_def.column_specification;

View File

@@ -175,7 +175,7 @@ private:
* @return the receivers for the specified relation.
* @throws exceptions::invalid_request_exception if the relation is invalid
*/
std::vector<::shared_ptr<column_specification>> to_receivers(schema_ptr schema, column_definition& column_def);
std::vector<::shared_ptr<column_specification>> to_receivers(schema_ptr schema, const column_definition& column_def);
#if 0
private ColumnSpecification makeCollectionReceiver(ColumnSpecification receiver, bool forKey)

View File

@@ -1,442 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.exceptions.*;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.AlreadyExistsException;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.utils.ByteBufferUtil;
/** A <code>CREATE TABLE</code> parsed from a CQL query statement. */
public class CreateTableStatement extends SchemaAlteringStatement
{
public CellNameType comparator;
private AbstractType<?> defaultValidator;
private AbstractType<?> keyValidator;
private final List<ByteBuffer> keyAliases = new ArrayList<ByteBuffer>();
private final List<ByteBuffer> columnAliases = new ArrayList<ByteBuffer>();
private ByteBuffer valueAlias;
private boolean isDense;
private final Map<ColumnIdentifier, AbstractType> columns = new HashMap<ColumnIdentifier, AbstractType>();
private final Set<ColumnIdentifier> staticColumns;
private final CFPropDefs properties;
private final boolean ifNotExists;
public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists, Set<ColumnIdentifier> staticColumns)
{
super(name);
this.properties = properties;
this.ifNotExists = ifNotExists;
this.staticColumns = staticColumns;
try
{
if (!this.properties.hasProperty(CFPropDefs.KW_COMPRESSION) && CFMetaData.DEFAULT_COMPRESSOR != null)
this.properties.addProperty(CFPropDefs.KW_COMPRESSION,
new HashMap<String, String>()
{{
put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
}});
}
catch (SyntaxException e)
{
throw new AssertionError(e);
}
}
public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
{
state.hasKeyspaceAccess(keyspace(), Permission.CREATE);
}
public void validate(ClientState state)
{
// validated in announceMigration()
}
// Column definitions
private List<ColumnDefinition> getColumns(CFMetaData cfm)
{
List<ColumnDefinition> columnDefs = new ArrayList<>(columns.size());
Integer componentIndex = comparator.isCompound() ? comparator.clusteringPrefixSize() : null;
for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
{
ColumnIdentifier id = col.getKey();
columnDefs.add(staticColumns.contains(id)
? ColumnDefinition.staticDef(cfm, col.getKey().bytes, col.getValue(), componentIndex)
: ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
}
return columnDefs;
}
public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
{
try
{
MigrationManager.announceNewColumnFamily(getCFMetaData(), isLocalOnly);
return true;
}
catch (AlreadyExistsException e)
{
if (ifNotExists)
return false;
throw e;
}
}
public Event.SchemaChange changeEvent()
{
return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
}
/**
* Returns a CFMetaData instance based on the parameters parsed from this
* <code>CREATE</code> statement, or defaults where applicable.
*
* @return a CFMetaData instance corresponding to the values parsed from this statement
* @throws InvalidRequestException on failure to validate parsed parameters
*/
public CFMetaData getCFMetaData() throws RequestValidationException
{
CFMetaData newCFMD;
newCFMD = new CFMetaData(keyspace(),
columnFamily(),
ColumnFamilyType.Standard,
comparator);
applyPropertiesTo(newCFMD);
return newCFMD;
}
public void applyPropertiesTo(CFMetaData cfmd) throws RequestValidationException
{
cfmd.defaultValidator(defaultValidator)
.keyValidator(keyValidator)
.addAllColumnDefinitions(getColumns(cfmd))
.isDense(isDense);
addColumnMetadataFromAliases(cfmd, keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
addColumnMetadataFromAliases(cfmd, columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
if (valueAlias != null)
addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
properties.applyToCFMetadata(cfmd);
}
private void addColumnMetadataFromAliases(CFMetaData cfm, List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
{
if (comparator instanceof CompositeType)
{
CompositeType ct = (CompositeType)comparator;
for (int i = 0; i < aliases.size(); ++i)
if (aliases.get(i) != null)
cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(i), ct.types.get(i), i, kind));
}
else
{
assert aliases.size() <= 1;
if (!aliases.isEmpty() && aliases.get(0) != null)
cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(0), comparator, null, kind));
}
}
public static class RawStatement extends CFStatement
{
private final Map<ColumnIdentifier, CQL3Type.Raw> definitions = new HashMap<>();
public final CFPropDefs properties = new CFPropDefs();
private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<List<ColumnIdentifier>>();
private final List<ColumnIdentifier> columnAliases = new ArrayList<ColumnIdentifier>();
private final Map<ColumnIdentifier, Boolean> definedOrdering = new LinkedHashMap<ColumnIdentifier, Boolean>(); // Insertion ordering is important
private final Set<ColumnIdentifier> staticColumns = new HashSet<ColumnIdentifier>();
private boolean useCompactStorage;
private final Multiset<ColumnIdentifier> definedNames = HashMultiset.create(1);
private final boolean ifNotExists;
public RawStatement(CFName name, boolean ifNotExists)
{
super(name);
this.ifNotExists = ifNotExists;
}
/**
* Transform this raw statement into a CreateTableStatement.
*/
public ParsedStatement.Prepared prepare() throws RequestValidationException
{
// Column family name
if (!columnFamily().matches("\\w+"))
throw new InvalidRequestException(String.format("\"%s\" is not a valid table name (must be alphanumeric character only: [0-9A-Za-z]+)", columnFamily()));
if (columnFamily().length() > Schema.NAME_LENGTH)
throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, columnFamily()));
for (Multiset.Entry<ColumnIdentifier> entry : definedNames.entrySet())
if (entry.getCount() > 1)
throw new InvalidRequestException(String.format("Multiple definition of identifier %s", entry.getElement()));
properties.validate();
CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists, staticColumns);
Map<ByteBuffer, CollectionType> definedMultiCellCollections = null;
for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
{
ColumnIdentifier id = entry.getKey();
CQL3Type pt = entry.getValue().prepare(keyspace());
if (pt.isCollection() && ((CollectionType) pt.getType()).isMultiCell())
{
if (definedMultiCellCollections == null)
definedMultiCellCollections = new HashMap<>();
definedMultiCellCollections.put(id.bytes, (CollectionType) pt.getType());
}
stmt.columns.put(id, pt.getType()); // we'll remove what is not a column below
}
if (keyAliases.isEmpty())
throw new InvalidRequestException("No PRIMARY KEY specifed (exactly one required)");
else if (keyAliases.size() > 1)
throw new InvalidRequestException("Multiple PRIMARY KEYs specifed (exactly one required)");
List<ColumnIdentifier> kAliases = keyAliases.get(0);
List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(kAliases.size());
for (ColumnIdentifier alias : kAliases)
{
stmt.keyAliases.add(alias.bytes);
AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
if (t instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
if (staticColumns.contains(alias))
throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
keyTypes.add(t);
}
stmt.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
// Dense means that no part of the comparator stores a CQL column name. This means
// COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
stmt.isDense = useCompactStorage && !columnAliases.isEmpty();
// Handle column aliases
if (columnAliases.isEmpty())
{
if (useCompactStorage)
{
// There should remain some column definition since it is a non-composite "static" CF
if (stmt.columns.isEmpty())
throw new InvalidRequestException("No definition found that is not part of the PRIMARY KEY");
if (definedMultiCellCollections != null)
throw new InvalidRequestException("Non-frozen collection types are not supported with COMPACT STORAGE");
stmt.comparator = new SimpleSparseCellNameType(UTF8Type.instance);
}
else
{
stmt.comparator = definedMultiCellCollections == null
? new CompoundSparseCellNameType(Collections.<AbstractType<?>>emptyList())
: new CompoundSparseCellNameType.WithCollection(Collections.<AbstractType<?>>emptyList(), ColumnToCollectionType.getInstance(definedMultiCellCollections));
}
}
else
{
// If we use compact storage and have only one alias, it is a
// standard "dynamic" CF, otherwise it's a composite
if (useCompactStorage && columnAliases.size() == 1)
{
if (definedMultiCellCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
ColumnIdentifier alias = columnAliases.get(0);
if (staticColumns.contains(alias))
throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
stmt.columnAliases.add(alias.bytes);
AbstractType<?> at = getTypeAndRemove(stmt.columns, alias);
if (at instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
stmt.comparator = new SimpleDenseCellNameType(at);
}
else
{
List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(columnAliases.size() + 1);
for (ColumnIdentifier t : columnAliases)
{
stmt.columnAliases.add(t.bytes);
AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
if (type instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t));
if (staticColumns.contains(t))
throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", t));
types.add(type);
}
if (useCompactStorage)
{
if (definedMultiCellCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
stmt.comparator = new CompoundDenseCellNameType(types);
}
else
{
stmt.comparator = definedMultiCellCollections == null
? new CompoundSparseCellNameType(types)
: new CompoundSparseCellNameType.WithCollection(types, ColumnToCollectionType.getInstance(definedMultiCellCollections));
}
}
}
if (!staticColumns.isEmpty())
{
// Only CQL3 tables can have static columns
if (useCompactStorage)
throw new InvalidRequestException("Static columns are not supported in COMPACT STORAGE tables");
// Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
if (columnAliases.isEmpty())
throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
}
if (useCompactStorage && !stmt.columnAliases.isEmpty())
{
if (stmt.columns.isEmpty())
{
// The only value we'll insert will be the empty one, so the default validator don't matter
stmt.defaultValidator = BytesType.instance;
// We need to distinguish between
// * I'm upgrading from thrift so the valueAlias is null
// * I've defined my table with only a PK (and the column value will be empty)
// So, we use an empty valueAlias (rather than null) for the second case
stmt.valueAlias = ByteBufferUtil.EMPTY_BYTE_BUFFER;
}
else
{
if (stmt.columns.size() > 1)
throw new InvalidRequestException(String.format("COMPACT STORAGE with composite PRIMARY KEY allows no more than one column not part of the PRIMARY KEY (got: %s)", StringUtils.join(stmt.columns.keySet(), ", ")));
Map.Entry<ColumnIdentifier, AbstractType> lastEntry = stmt.columns.entrySet().iterator().next();
stmt.defaultValidator = lastEntry.getValue();
stmt.valueAlias = lastEntry.getKey().bytes;
stmt.columns.remove(lastEntry.getKey());
}
}
else
{
// For compact, we are in the "static" case, so we need at least one column defined. For non-compact however, having
// just the PK is fine since we have CQL3 row marker.
if (useCompactStorage && stmt.columns.isEmpty())
throw new InvalidRequestException("COMPACT STORAGE with non-composite PRIMARY KEY require one column not part of the PRIMARY KEY, none given");
// There is no way to insert/access a column that is not defined for non-compact storage, so
// the actual validator don't matter much (except that we want to recognize counter CF as limitation apply to them).
stmt.defaultValidator = !stmt.columns.isEmpty() && (stmt.columns.values().iterator().next() instanceof CounterColumnType)
? CounterColumnType.instance
: BytesType.instance;
}
// If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK
if (!definedOrdering.isEmpty())
{
if (definedOrdering.size() > columnAliases.size())
throw new InvalidRequestException("Only clustering key columns can be defined in CLUSTERING ORDER directive");
int i = 0;
for (ColumnIdentifier id : definedOrdering.keySet())
{
ColumnIdentifier c = columnAliases.get(i);
if (!id.equals(c))
{
if (definedOrdering.containsKey(c))
throw new InvalidRequestException(String.format("The order of columns in the CLUSTERING ORDER directive must be the one of the clustering key (%s must appear before %s)", c, id));
else
throw new InvalidRequestException(String.format("Missing CLUSTERING ORDER for column %s", c));
}
++i;
}
}
return new ParsedStatement.Prepared(stmt);
}
private AbstractType<?> getTypeAndRemove(Map<ColumnIdentifier, AbstractType> columns, ColumnIdentifier t) throws InvalidRequestException
{
AbstractType type = columns.get(t);
if (type == null)
throw new InvalidRequestException(String.format("Unknown definition %s referenced in PRIMARY KEY", t));
if (type.isCollection() && type.isMultiCell())
throw new InvalidRequestException(String.format("Invalid collection type for PRIMARY KEY component %s", t));
columns.remove(t);
Boolean isReversed = definedOrdering.get(t);
return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
}
public void addDefinition(ColumnIdentifier def, CQL3Type.Raw type, boolean isStatic)
{
definedNames.add(def);
definitions.put(def, type);
if (isStatic)
staticColumns.add(def);
}
public void addKeyAliases(List<ColumnIdentifier> aliases)
{
keyAliases.add(aliases);
}
public void addColumnAlias(ColumnIdentifier alias)
{
columnAliases.add(alias);
}
public void setOrdering(ColumnIdentifier alias, boolean reversed)
{
definedOrdering.put(alias, reversed);
}
public void setCompactStorage()
{
useCompactStorage = true;
}
}
}

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#include "cql3/statements/cf_prop_defs.hh"
namespace cql3 {
namespace statements {
const sstring cf_prop_defs::KW_COMMENT = "comment";
const sstring cf_prop_defs::KW_READREPAIRCHANCE = "read_repair_chance";
const sstring cf_prop_defs::KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance";
const sstring cf_prop_defs::KW_GCGRACESECONDS = "gc_grace_seconds";
const sstring cf_prop_defs::KW_MINCOMPACTIONTHRESHOLD = "min_threshold";
const sstring cf_prop_defs::KW_MAXCOMPACTIONTHRESHOLD = "max_threshold";
const sstring cf_prop_defs::KW_CACHING = "caching";
const sstring cf_prop_defs::KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
const sstring cf_prop_defs::KW_MIN_INDEX_INTERVAL = "min_index_interval";
const sstring cf_prop_defs::KW_MAX_INDEX_INTERVAL = "max_index_interval";
const sstring cf_prop_defs::KW_SPECULATIVE_RETRY = "speculative_retry";
const sstring cf_prop_defs::KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
const sstring cf_prop_defs::KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
const sstring cf_prop_defs::KW_COMPACTION = "compaction";
const sstring cf_prop_defs::KW_COMPRESSION = "compression";
const sstring cf_prop_defs::COMPACTION_STRATEGY_CLASS_KEY = "class";
}
}

View File

@@ -15,6 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "cql3/statements/property_definitions.hh"
#include "schema.hh"
#if 0
package org.apache.cassandra.cql3.statements;
import java.util.Collections;
@@ -30,28 +44,34 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.compress.CompressionParameters;
#endif
public class CFPropDefs extends PropertyDefinitions
{
public static final String KW_COMMENT = "comment";
public static final String KW_READREPAIRCHANCE = "read_repair_chance";
public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance";
public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
public static final String KW_MINCOMPACTIONTHRESHOLD = "min_threshold";
public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_threshold";
public static final String KW_CACHING = "caching";
public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
public static final String KW_MIN_INDEX_INTERVAL = "min_index_interval";
public static final String KW_MAX_INDEX_INTERVAL = "max_index_interval";
public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
namespace cql3 {
public static final String KW_COMPACTION = "compaction";
public static final String KW_COMPRESSION = "compression";
namespace statements {
public static final String COMPACTION_STRATEGY_CLASS_KEY = "class";
class cf_prop_defs : public property_definitions {
public:
static const sstring KW_COMMENT;
static const sstring KW_READREPAIRCHANCE;
static const sstring KW_DCLOCALREADREPAIRCHANCE;
static const sstring KW_GCGRACESECONDS;
static const sstring KW_MINCOMPACTIONTHRESHOLD;
static const sstring KW_MAXCOMPACTIONTHRESHOLD;
static const sstring KW_CACHING;
static const sstring KW_DEFAULT_TIME_TO_LIVE;
static const sstring KW_MIN_INDEX_INTERVAL;
static const sstring KW_MAX_INDEX_INTERVAL;
static const sstring KW_SPECULATIVE_RETRY;
static const sstring KW_BF_FP_CHANCE;
static const sstring KW_MEMTABLE_FLUSH_PERIOD;
static const sstring KW_COMPACTION;
static const sstring KW_COMPRESSION;
static const sstring COMPACTION_STRATEGY_CLASS_KEY;
#if 0
public static final Set<String> keywords = new HashSet<>();
public static final Set<String> obsoleteKeywords = new HashSet<>();
@@ -77,9 +97,11 @@ public class CFPropDefs extends PropertyDefinitions
}
private Class<? extends AbstractCompactionStrategy> compactionStrategyClass = null;
public void validate() throws ConfigurationException, SyntaxException
{
#endif
public:
void validate() const {
// FIXME
#if 0
// Skip validation if the comapction strategy class is already set as it means we've alreayd
// prepared (and redoing it would set strategyClass back to null, which we don't want)
if (compactionStrategyClass != null)
@@ -128,8 +150,10 @@ public class CFPropDefs extends PropertyDefinitions
throw new ConfigurationException(KW_MAX_INDEX_INTERVAL + " must be greater than " + KW_MIN_INDEX_INTERVAL);
SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, SpeculativeRetry.RetryType.NONE.name()));
#endif
}
#if 0
public Class<? extends AbstractCompactionStrategy> getCompactionStrategy()
{
return compactionStrategyClass;
@@ -165,12 +189,14 @@ public class CFPropDefs extends PropertyDefinitions
}
return options;
}
#endif
public void applyToCFMetadata(CFMetaData cfm) throws ConfigurationException, SyntaxException
{
if (hasProperty(KW_COMMENT))
cfm.comment(getString(KW_COMMENT, ""));
void apply_to_schema(schema_ptr s) {
if (has_property(KW_COMMENT)) {
s->set_comment(get_string(KW_COMMENT, ""));
}
#if 0
cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepairChance()));
cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
@@ -199,8 +225,10 @@ public class CFPropDefs extends PropertyDefinitions
CachingOptions cachingOptions = getCachingOptions();
if (cachingOptions != null)
cfm.caching(cachingOptions);
#endif
}
#if 0
@Override
public String toString()
{
@@ -215,4 +243,9 @@ public class CFPropDefs extends PropertyDefinitions
field, minimumValue, defaultValue));
}
#endif
};
}
}

View File

@@ -0,0 +1,488 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "cql3/statements/schema_altering_statement.hh"
#include "cql3/statements/cf_prop_defs.hh"
#include "cql3/statements/cf_statement.hh"
#include "cql3/cql3_type.hh"
#include "service/migration_manager.hh"
#include "schema.hh"
#include "core/shared_ptr.hh"
#include <unordered_map>
#include <utility>
#include <vector>
#include <set>
#if 0
package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.exceptions.*;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.AlreadyExistsException;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.utils.ByteBufferUtil;
#endif
namespace cql3 {
namespace statements {
/** A <code>CREATE TABLE</code> parsed from a CQL query statement. */
class create_table_statement : public schema_altering_statement {
#if 0
public CellNameType comparator;
#endif
private:
#if 0
private AbstractType<?> defaultValidator;
private AbstractType<?> keyValidator;
private final List<ByteBuffer> keyAliases = new ArrayList<ByteBuffer>();
private final List<ByteBuffer> columnAliases = new ArrayList<ByteBuffer>();
private ByteBuffer valueAlias;
private boolean isDense;
private final Map<ColumnIdentifier, AbstractType> columns = new HashMap<ColumnIdentifier, AbstractType>();
#endif
const std::set<::shared_ptr<column_identifier>> _static_columns;
const ::shared_ptr<cf_prop_defs> _properties;
const bool _if_not_exists;
public:
create_table_statement(::shared_ptr<cf_name> name, ::shared_ptr<cf_prop_defs> properties, bool if_not_exists, std::set<::shared_ptr<column_identifier>> static_columns)
: schema_altering_statement{name}
, _static_columns{static_columns}
, _properties{properties}
, _if_not_exists{if_not_exists}
{
#if 0
try
{
if (!this.properties.hasProperty(CFPropDefs.KW_COMPRESSION) && CFMetaData.DEFAULT_COMPRESSOR != null)
this.properties.addProperty(CFPropDefs.KW_COMPRESSION,
new HashMap<String, String>()
{{
put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
}});
}
catch (SyntaxException e)
{
throw new AssertionError(e);
}
#endif
}
virtual void check_access(const service::client_state& state) override {
warn(unimplemented::cause::PERMISSIONS);
#if 0
state.hasKeyspaceAccess(keyspace(), Permission.CREATE);
#endif
}
virtual void validate(const service::client_state& state) override {
// validated in announceMigration()
}
#if 0
// Column definitions
private List<ColumnDefinition> getColumns(CFMetaData cfm)
{
List<ColumnDefinition> columnDefs = new ArrayList<>(columns.size());
Integer componentIndex = comparator.isCompound() ? comparator.clusteringPrefixSize() : null;
for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
{
ColumnIdentifier id = col.getKey();
columnDefs.add(staticColumns.contains(id)
? ColumnDefinition.staticDef(cfm, col.getKey().bytes, col.getValue(), componentIndex)
: ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
}
return columnDefs;
}
#endif
virtual bool announce_migration(bool is_local_only) override {
try {
service::migration_manager::announce_new_column_family(get_cf_meta_data(), is_local_only);
return true;
} catch (const exceptions::already_exists_exception& e) {
if (_if_not_exists) {
return false;
}
throw e;
}
}
virtual shared_ptr<transport::event::schema_change> change_event() override {
return make_shared<transport::event::schema_change>(transport::event::schema_change::change_type::CREATED, transport::event::schema_change::target_type::TABLE, keyspace(), column_family());
}
/**
* Returns a CFMetaData instance based on the parameters parsed from this
* <code>CREATE</code> statement, or defaults where applicable.
*
* @return a CFMetaData instance corresponding to the values parsed from this statement
* @throws InvalidRequestException on failure to validate parsed parameters
*/
schema_ptr get_cf_meta_data() {
auto s = make_lw_shared(schema(keyspace(), column_family(),
// partition key
{},
// clustering key
{},
// regular columns
{},
// static columns
{},
// regular column name type
utf8_type,
// comment
""
));
apply_properties_to(s);
return s;
}
void apply_properties_to(schema_ptr s) {
#if 0
cfmd.defaultValidator(defaultValidator)
.keyValidator(keyValidator)
.addAllColumnDefinitions(getColumns(cfmd))
.isDense(isDense);
addColumnMetadataFromAliases(cfmd, keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
addColumnMetadataFromAliases(cfmd, columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
if (valueAlias != null)
addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
#endif
_properties->apply_to_schema(s);
}
#if 0
private void addColumnMetadataFromAliases(CFMetaData cfm, List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
{
if (comparator instanceof CompositeType)
{
CompositeType ct = (CompositeType)comparator;
for (int i = 0; i < aliases.size(); ++i)
if (aliases.get(i) != null)
cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(i), ct.types.get(i), i, kind));
}
else
{
assert aliases.size() <= 1;
if (!aliases.isEmpty() && aliases.get(0) != null)
cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(0), comparator, null, kind));
}
}
#endif
class raw_statement;
};
class create_table_statement::raw_statement : public cf_statement {
private:
std::unordered_map<::shared_ptr<column_identifier>, ::shared_ptr<cql3_type::raw>> _definitions;
public:
const ::shared_ptr<cf_prop_defs> properties = ::make_shared<cf_prop_defs>();
private:
std::vector<std::vector<::shared_ptr<column_identifier>>> _key_aliases;
std::vector<::shared_ptr<column_identifier>> _column_aliases;
std::vector<std::pair<::shared_ptr<column_identifier>, bool>> defined_ordering; // Insertion ordering is important
std::set<::shared_ptr<column_identifier>> _static_columns;
bool _use_compact_storage = false;
std::multiset<::shared_ptr<column_identifier>> _defined_names;
bool _if_not_exists;
public:
raw_statement(::shared_ptr<cf_name> name, bool if_not_exists)
: cf_statement{std::move(name)}
, _if_not_exists{if_not_exists}
{ }
virtual ::shared_ptr<prepared> prepare(database& db) override {
#if 0
// Column family name
if (!columnFamily().matches("\\w+"))
throw new InvalidRequestException(String.format("\"%s\" is not a valid table name (must be alphanumeric character only: [0-9A-Za-z]+)", columnFamily()));
if (columnFamily().length() > Schema.NAME_LENGTH)
throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, columnFamily()));
for (Multiset.Entry<ColumnIdentifier> entry : definedNames.entrySet())
if (entry.getCount() > 1)
throw new InvalidRequestException(String.format("Multiple definition of identifier %s", entry.getElement()));
#endif
properties->validate();
auto stmt = ::make_shared<create_table_statement>(_cf_name, properties, _if_not_exists, _static_columns);
#if 0
Map<ByteBuffer, CollectionType> definedMultiCellCollections = null;
for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
{
ColumnIdentifier id = entry.getKey();
CQL3Type pt = entry.getValue().prepare(keyspace());
if (pt.isCollection() && ((CollectionType) pt.getType()).isMultiCell())
{
if (definedMultiCellCollections == null)
definedMultiCellCollections = new HashMap<>();
definedMultiCellCollections.put(id.bytes, (CollectionType) pt.getType());
}
stmt.columns.put(id, pt.getType()); // we'll remove what is not a column below
}
if (keyAliases.isEmpty())
throw new InvalidRequestException("No PRIMARY KEY specifed (exactly one required)");
else if (keyAliases.size() > 1)
throw new InvalidRequestException("Multiple PRIMARY KEYs specifed (exactly one required)");
List<ColumnIdentifier> kAliases = keyAliases.get(0);
List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(kAliases.size());
for (ColumnIdentifier alias : kAliases)
{
stmt.keyAliases.add(alias.bytes);
AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
if (t instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
if (staticColumns.contains(alias))
throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
keyTypes.add(t);
}
stmt.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
// Dense means that no part of the comparator stores a CQL column name. This means
// COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
stmt.isDense = useCompactStorage && !columnAliases.isEmpty();
// Handle column aliases
if (columnAliases.isEmpty())
{
if (useCompactStorage)
{
// There should remain some column definition since it is a non-composite "static" CF
if (stmt.columns.isEmpty())
throw new InvalidRequestException("No definition found that is not part of the PRIMARY KEY");
if (definedMultiCellCollections != null)
throw new InvalidRequestException("Non-frozen collection types are not supported with COMPACT STORAGE");
stmt.comparator = new SimpleSparseCellNameType(UTF8Type.instance);
}
else
{
stmt.comparator = definedMultiCellCollections == null
? new CompoundSparseCellNameType(Collections.<AbstractType<?>>emptyList())
: new CompoundSparseCellNameType.WithCollection(Collections.<AbstractType<?>>emptyList(), ColumnToCollectionType.getInstance(definedMultiCellCollections));
}
}
else
{
// If we use compact storage and have only one alias, it is a
// standard "dynamic" CF, otherwise it's a composite
if (useCompactStorage && columnAliases.size() == 1)
{
if (definedMultiCellCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
ColumnIdentifier alias = columnAliases.get(0);
if (staticColumns.contains(alias))
throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
stmt.columnAliases.add(alias.bytes);
AbstractType<?> at = getTypeAndRemove(stmt.columns, alias);
if (at instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
stmt.comparator = new SimpleDenseCellNameType(at);
}
else
{
List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(columnAliases.size() + 1);
for (ColumnIdentifier t : columnAliases)
{
stmt.columnAliases.add(t.bytes);
AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
if (type instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t));
if (staticColumns.contains(t))
throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", t));
types.add(type);
}
if (useCompactStorage)
{
if (definedMultiCellCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
stmt.comparator = new CompoundDenseCellNameType(types);
}
else
{
stmt.comparator = definedMultiCellCollections == null
? new CompoundSparseCellNameType(types)
: new CompoundSparseCellNameType.WithCollection(types, ColumnToCollectionType.getInstance(definedMultiCellCollections));
}
}
}
if (!staticColumns.isEmpty())
{
// Only CQL3 tables can have static columns
if (useCompactStorage)
throw new InvalidRequestException("Static columns are not supported in COMPACT STORAGE tables");
// Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
if (columnAliases.isEmpty())
throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
}
if (useCompactStorage && !stmt.columnAliases.isEmpty())
{
if (stmt.columns.isEmpty())
{
// The only value we'll insert will be the empty one, so the default validator don't matter
stmt.defaultValidator = BytesType.instance;
// We need to distinguish between
// * I'm upgrading from thrift so the valueAlias is null
// * I've defined my table with only a PK (and the column value will be empty)
// So, we use an empty valueAlias (rather than null) for the second case
stmt.valueAlias = ByteBufferUtil.EMPTY_BYTE_BUFFER;
}
else
{
if (stmt.columns.size() > 1)
throw new InvalidRequestException(String.format("COMPACT STORAGE with composite PRIMARY KEY allows no more than one column not part of the PRIMARY KEY (got: %s)", StringUtils.join(stmt.columns.keySet(), ", ")));
Map.Entry<ColumnIdentifier, AbstractType> lastEntry = stmt.columns.entrySet().iterator().next();
stmt.defaultValidator = lastEntry.getValue();
stmt.valueAlias = lastEntry.getKey().bytes;
stmt.columns.remove(lastEntry.getKey());
}
}
else
{
// For compact, we are in the "static" case, so we need at least one column defined. For non-compact however, having
// just the PK is fine since we have CQL3 row marker.
if (useCompactStorage && stmt.columns.isEmpty())
throw new InvalidRequestException("COMPACT STORAGE with non-composite PRIMARY KEY require one column not part of the PRIMARY KEY, none given");
// There is no way to insert/access a column that is not defined for non-compact storage, so
// the actual validator don't matter much (except that we want to recognize counter CF as limitation apply to them).
stmt.defaultValidator = !stmt.columns.isEmpty() && (stmt.columns.values().iterator().next() instanceof CounterColumnType)
? CounterColumnType.instance
: BytesType.instance;
}
// If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK
if (!definedOrdering.isEmpty())
{
if (definedOrdering.size() > columnAliases.size())
throw new InvalidRequestException("Only clustering key columns can be defined in CLUSTERING ORDER directive");
int i = 0;
for (ColumnIdentifier id : definedOrdering.keySet())
{
ColumnIdentifier c = columnAliases.get(i);
if (!id.equals(c))
{
if (definedOrdering.containsKey(c))
throw new InvalidRequestException(String.format("The order of columns in the CLUSTERING ORDER directive must be the one of the clustering key (%s must appear before %s)", c, id));
else
throw new InvalidRequestException(String.format("Missing CLUSTERING ORDER for column %s", c));
}
++i;
}
}
#endif
return ::make_shared<parsed_statement::prepared>(stmt);
}
#if 0
private AbstractType<?> getTypeAndRemove(Map<ColumnIdentifier, AbstractType> columns, ColumnIdentifier t) throws InvalidRequestException
{
AbstractType type = columns.get(t);
if (type == null)
throw new InvalidRequestException(String.format("Unknown definition %s referenced in PRIMARY KEY", t));
if (type.isCollection() && type.isMultiCell())
throw new InvalidRequestException(String.format("Invalid collection type for PRIMARY KEY component %s", t));
columns.remove(t);
Boolean isReversed = definedOrdering.get(t);
return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
}
#endif
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static) {
_defined_names.emplace(def);
_definitions.emplace(def, type);
if (is_static) {
_static_columns.emplace(def);
}
}
void add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases) {
_key_aliases.emplace_back(aliases);
}
void add_column_alias(::shared_ptr<column_identifier> alias) {
_column_aliases.emplace_back(alias);
}
void set_ordering(::shared_ptr<column_identifier> alias, bool reversed) {
defined_ordering.emplace_back(alias, reversed);
}
void set_compact_storage() {
_use_compact_storage = true;
}
};
}
}

View File

@@ -338,7 +338,7 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
}
void
modification_statement::add_key_values(column_definition& def, ::shared_ptr<restrictions::restriction> values) {
modification_statement::add_key_values(const column_definition& def, ::shared_ptr<restrictions::restriction> values) {
if (def.is_clustering_key()) {
_has_no_clustering_columns = false;
}
@@ -350,7 +350,7 @@ modification_statement::add_key_values(column_definition& def, ::shared_ptr<rest
}
void
modification_statement::add_key_value(column_definition& def, ::shared_ptr<term> value) {
modification_statement::add_key_value(const column_definition& def, ::shared_ptr<term> value) {
add_key_values(def, ::make_shared<restrictions::single_column_restriction::EQ>(def, value));
}
@@ -421,7 +421,7 @@ modification_statement::parsed::prepare(database& db, ::shared_ptr<variable_spec
} else {
for (auto&& entry : _conditions) {
auto id = entry.first->prepare_column_identifier(schema);
column_definition* def = get_column_definition(schema, *id);
const column_definition* def = get_column_definition(schema, *id);
if (!def) {
throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *id));
}

View File

@@ -116,8 +116,8 @@ private:
bool _sets_static_columns = false;
bool _sets_regular_columns = false;
const std::function<column_definition&(::shared_ptr<column_condition>)> get_column_for_condition =
[](::shared_ptr<column_condition> cond) -> column_definition& {
const std::function<const column_definition&(::shared_ptr<column_condition>)> get_column_for_condition =
[](::shared_ptr<column_condition> cond) -> const column_definition& {
return cond->column;
};
@@ -250,10 +250,10 @@ public:
}
private:
void add_key_values(column_definition& def, ::shared_ptr<restrictions::restriction> values);
void add_key_values(const column_definition& def, ::shared_ptr<restrictions::restriction> values);
public:
void add_key_value(column_definition& def, ::shared_ptr<term> value);
void add_key_value(const column_definition& def, ::shared_ptr<term> value);
void process_where_clause(std::vector<relation_ptr> where_clause, ::shared_ptr<variable_specifications> names);
std::vector<partition_key> build_partition_keys(const query_options& options);

View File

@@ -33,6 +33,7 @@
#include <algorithm>
#include <cctype>
#include <string>
#include <set>
#include <boost/any.hpp>

View File

@@ -52,7 +52,7 @@ namespace messages = transport::messages;
/**
* Abstract class for statements that alter the schema.
*/
class schema_altering_statement : public cf_statement, public virtual cql_statement, public ::enable_shared_from_this<schema_altering_statement> {
class schema_altering_statement : public cf_statement, public cql_statement, public ::enable_shared_from_this<schema_altering_statement> {
private:
const bool _is_column_family_level;

View File

@@ -25,7 +25,7 @@ get_column_types(const Sequence& column_definitions) {
}
::shared_ptr<cql3::column_specification>
schema::make_column_specification(column_definition& def) {
schema::make_column_specification(const column_definition& def) {
auto id = ::make_shared<cql3::column_identifier>(def.name(), column_name_type(def));
return ::make_shared<cql3::column_specification>(ks_name, cf_name, std::move(id), def.type);
}
@@ -39,11 +39,23 @@ schema::build_columns(const std::vector<column>& columns, column_definition::col
auto& col = columns[i];
dst.emplace_back(std::move(col.name), std::move(col.type), i, kind);
column_definition& def = dst.back();
_columns_by_name[def.name()] = &def;
def.column_specification = make_column_specification(def);
}
}
void schema::rehash_columns() {
_columns_by_name.clear();
_regular_columns_by_name.clear();
for (const column_definition& def : all_columns_in_select_order()) {
_columns_by_name[def.name()] = &def;
}
for (const column_definition& def : _regular_columns) {
_regular_columns_by_name[def.name()] = &def;
}
}
schema::schema(sstring ks_name, sstring cf_name, std::vector<column> partition_key,
std::vector<column> clustering_key,
std::vector<column> regular_columns,
@@ -51,14 +63,15 @@ schema::schema(sstring ks_name, sstring cf_name, std::vector<column> partition_k
data_type regular_column_name_type,
sstring comment)
: _regular_columns_by_name(serialized_compare(regular_column_name_type))
, _comment(comment)
, ks_name(std::move(ks_name))
, cf_name(std::move(cf_name))
, partition_key_type(::make_lw_shared<tuple_type<>>(get_column_types(partition_key)))
, clustering_key_type(::make_lw_shared<tuple_type<>>(get_column_types(clustering_key)))
, clustering_key_prefix_type(::make_lw_shared(clustering_key_type->as_prefix()))
, regular_column_name_type(regular_column_name_type)
{
this->_comment = std::move(comment);
this->ks_name = std::move(ks_name);
this->cf_name = std::move(cf_name);
this->partition_key_type = ::make_lw_shared<tuple_type<>>(get_column_types(partition_key));
this->clustering_key_type = ::make_lw_shared<tuple_type<>>(get_column_types(clustering_key));
this->clustering_key_prefix_type = ::make_lw_shared(clustering_key_type->as_prefix());
this->regular_column_name_type = regular_column_name_type;
if (partition_key.size() == 1) {
thrift.partition_key_type = partition_key[0].type;
} else {
@@ -71,12 +84,17 @@ schema::schema(sstring ks_name, sstring cf_name, std::vector<column> partition_k
std::sort(regular_columns.begin(), regular_columns.end(), column::name_compare(regular_column_name_type));
build_columns(regular_columns, column_definition::column_kind::REGULAR, _regular_columns);
for (column_definition& def : _regular_columns) {
_regular_columns_by_name[def.name()] = &def;
}
std::sort(static_columns.begin(), static_columns.end(), column::name_compare(utf8_type));
build_columns(static_columns, column_definition::column_kind::STATIC, _static_columns);
rehash_columns();
}
schema::schema(const schema& o)
: raw_schema(o)
, _regular_columns_by_name(serialized_compare(regular_column_name_type)) {
rehash_columns();
}
column_family::column_family(schema_ptr schema)
@@ -275,7 +293,7 @@ column_definition::column_definition(bytes name, data_type type, column_id id, c
, kind(kind)
{ }
column_definition* schema::get_column_definition(const bytes& name) {
const column_definition* schema::get_column_definition(const bytes& name) {
auto i = _columns_by_name.find(name);
if (i == _columns_by_name.end()) {
return nullptr;

View File

@@ -62,18 +62,33 @@ struct thrift_schema {
shared_ptr<abstract_type> partition_key_type;
};
/*
* Keep this effectively immutable.
*/
class schema final {
private:
std::unordered_map<bytes, column_definition*> _columns_by_name;
std::map<bytes, column_definition*, serialized_compare> _regular_columns_by_name;
// Schema fields which can be safely default-copied
// FIXME: encapsulate public fields so that we can make this a private inner structure of schema
class raw_schema {
protected:
std::vector<column_definition> _partition_key;
std::vector<column_definition> _clustering_key;
std::vector<column_definition> _regular_columns; // sorted by name
std::vector<column_definition> _static_columns; // sorted by name, present only when there's any clustering column
sstring _comment;
public:
gc_clock::duration default_time_to_live = gc_clock::duration::zero();
sstring ks_name;
sstring cf_name;
lw_shared_ptr<tuple_type<>> partition_key_type;
lw_shared_ptr<tuple_type<>> clustering_key_type;
lw_shared_ptr<tuple_prefix> clustering_key_prefix_type;
data_type regular_column_name_type;
thrift_schema thrift;
};
/*
* Keep this effectively immutable.
*/
class schema final : public raw_schema {
private:
std::unordered_map<bytes, const column_definition*> _columns_by_name;
std::map<bytes, const column_definition*, serialized_compare> _regular_columns_by_name;
public:
struct column {
bytes name;
@@ -88,16 +103,8 @@ public:
};
private:
void build_columns(const std::vector<column>& columns, column_definition::column_kind kind, std::vector<column_definition>& dst);
::shared_ptr<cql3::column_specification> make_column_specification(column_definition& def);
public:
gc_clock::duration default_time_to_live = gc_clock::duration::zero();
const sstring ks_name;
const sstring cf_name;
lw_shared_ptr<tuple_type<>> partition_key_type;
lw_shared_ptr<tuple_type<>> clustering_key_type;
lw_shared_ptr<tuple_prefix> clustering_key_prefix_type;
data_type regular_column_name_type;
thrift_schema thrift;
::shared_ptr<cql3::column_specification> make_column_specification(const column_definition& def);
void rehash_columns();
public:
schema(sstring ks_name, sstring cf_name,
std::vector<column> partition_key,
@@ -106,13 +113,17 @@ public:
std::vector<column> static_columns,
shared_ptr<abstract_type> regular_column_name_type,
sstring comment = {});
schema(const schema&);
void set_comment(const sstring& comment) {
_comment = comment;
}
bool is_dense() const {
return false;
}
bool is_counter() const {
return false;
}
column_definition* get_column_definition(const bytes& name);
const column_definition* get_column_definition(const bytes& name);
auto regular_begin() {
return _regular_columns.begin();
}
@@ -137,7 +148,7 @@ public:
return _regular_columns.begin() + i->second->id;
}
}
data_type column_name_type(column_definition& def) {
data_type column_name_type(const column_definition& def) {
return def.kind == column_definition::column_kind::REGULAR ? regular_column_name_type : utf8_type;
}
column_definition& regular_column_at(column_id id) {
@@ -146,7 +157,7 @@ public:
column_definition& static_column_at(column_id id) {
return _static_columns[id];
}
bool is_last_partition_key(column_definition& def) {
bool is_last_partition_key(const column_definition& def) {
return &_partition_key[_partition_key.size() - 1] == &def;
}
bool has_collections() {

View File

@@ -294,9 +294,11 @@ public:
{
announceNewColumnFamily(cfm, false);
}
#endif
public static void announceNewColumnFamily(CFMetaData cfm, boolean announceLocally) throws ConfigurationException
{
static void announce_new_column_family(schema_ptr cfm, bool announce_locally) {
warn(unimplemented::cause::MIGRATIONS);
#if 0
cfm.validate();
KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
@@ -307,8 +309,10 @@ public:
logger.info(String.format("Create new table: %s", cfm));
announce(LegacySchemaTables.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally);
#endif
}
#if 0
public static void announceNewType(UserType newType, boolean announceLocally)
{
KSMetaData ksm = Schema.instance.getKSMetaData(newType.keyspace);

View File

@@ -19,7 +19,7 @@ int main(int argc, char* argv[]) {
time_it([&] {
mutation m(key, s);
column_definition& col = *s->get_column_definition("r1");
const column_definition& col = *s->get_column_definition("r1");
m.set_clustered_cell(c_key, col, make_atomic_cell(value));
cf.apply(std::move(m));
});

View File

@@ -93,6 +93,17 @@ SEASTAR_TEST_CASE(test_create_keyspace_statement) {
});
}
SEASTAR_TEST_CASE(test_create_table_statement) {
auto db = make_shared<distributed<database>>();
auto state = make_shared<conversation_state>(*db, ks_name);
return db->start().then([state] {
return state->execute_cql("create table users (user_name varchar PRIMARY KEY, birth_year bigint);").discard_result();
}).finally([db] {
return db->stop().finally([db] {});
});
}
SEASTAR_TEST_CASE(test_insert_statement) {
auto db = make_shared<distributed<database>>();
auto state = make_shared<conversation_state>(*db, ks_name);

View File

@@ -23,7 +23,7 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) {
column_family cf(s);
column_definition& r1_col = *s->get_column_definition("r1");
const column_definition& r1_col = *s->get_column_definition("r1");
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(2)});