Merge "CQL: Fix create table statement"

From Pekka:

"This series fixes create table statement to actually specify columns.
Note that clustering keys are not yet supported."
This commit is contained in:
Avi Kivity
2015-06-07 15:05:21 +03:00
6 changed files with 241 additions and 175 deletions

View File

@@ -383,6 +383,7 @@ urchin_core = (['database.cc',
'cql3/maps.cc',
'cql3/functions/functions.cc',
'cql3/statements/cf_prop_defs.cc',
'cql3/statements/create_table_statement.cc',
'cql3/statements/schema_altering_statement.cc',
'cql3/statements/modification_statement.cc',
'cql3/statements/update_statement.cc',

View File

@@ -24,10 +24,7 @@ public:
}
virtual bool is_counter() const {
throw std::runtime_error("not implemented");
#if 0
return type == Native.COUNTER;
#endif
return _type->is_counter();
}
virtual sstring to_string() const {

View File

@@ -43,6 +43,7 @@ public:
cql3_type(sstring name, data_type type, bool native = true)
: _name(std::move(name)), _type(std::move(type)), _native(native) {}
bool is_collection() const { return _type->is_collection(); }
bool is_counter() const { return _type->is_counter(); }
bool is_native() const { return _native; }
data_type get_type() const { return _type; }
sstring to_string() const { return _name; }

View File

@@ -25,8 +25,10 @@
#pragma once
#include "cql3/statements/property_definitions.hh"
#include "schema.hh"
#include "database.hh"
#include "schema_builder.hh"
namespace cql3 {
@@ -173,6 +175,14 @@ public:
}
#endif
// Keep this in sync with apply_to_schema().
void apply_to_builder(schema_builder& builder) {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));
}
}
// Keep this in sync with apply_to_builder().
void apply_to_schema(schema* s) {
if (has_property(KW_COMMENT)) {
s->set_comment(get_string(KW_COMMENT, ""));

View File

@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#include "cql3/statements/create_table_statement.hh"
#include "schema_builder.hh"
namespace cql3 {
namespace statements {
create_table_statement::create_table_statement(::shared_ptr<cf_name> name,
::shared_ptr<cf_prop_defs> properties,
bool if_not_exists,
std::set<::shared_ptr<column_identifier>> static_columns)
: schema_altering_statement{name}
, _static_columns{static_columns}
, _properties{properties}
, _if_not_exists{if_not_exists}
{
#if 0
try
{
if (!this.properties.hasProperty(CFPropDefs.KW_COMPRESSION) && CFMetaData.DEFAULT_COMPRESSOR != null)
this.properties.addProperty(CFPropDefs.KW_COMPRESSION,
new HashMap<String, String>()
{{
put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
}});
}
catch (SyntaxException e)
{
throw new AssertionError(e);
}
#endif
}
void create_table_statement::check_access(const service::client_state& state) {
warn(unimplemented::cause::PERMISSIONS);
#if 0
state.hasKeyspaceAccess(keyspace(), Permission.CREATE);
#endif
}
void create_table_statement::validate(service::storage_proxy&, const service::client_state& state) {
// validated in announceMigration()
}
// Column definitions
std::vector<column_definition> create_table_statement::get_columns()
{
std::vector<column_definition> column_defs;
for (auto&& col : _columns) {
column_kind kind = column_kind::regular_column;
if (_static_columns.count(col.first)) {
kind = column_kind::static_column;
}
column_defs.emplace_back(col.first->name(), col.second, kind);
}
return column_defs;
}
future<bool> create_table_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) {
return service::migration_manager::announce_new_column_family(proxy, get_cf_meta_data(), is_local_only).then_wrapped([this] (auto&& f) {
try {
f.get();
return true;
} catch (const exceptions::already_exists_exception& e) {
if (_if_not_exists) {
return false;
}
throw e;
}
});
}
shared_ptr<transport::event::schema_change> create_table_statement::change_event() {
return make_shared<transport::event::schema_change>(transport::event::schema_change::change_type::CREATED, transport::event::schema_change::target_type::TABLE, keyspace(), column_family());
}
/**
* Returns a CFMetaData instance based on the parameters parsed from this
* <code>CREATE</code> statement, or defaults where applicable.
*
* @return a CFMetaData instance corresponding to the values parsed from this statement
* @throws InvalidRequestException on failure to validate parsed parameters
*/
schema_ptr create_table_statement::get_cf_meta_data() {
schema_builder builder{keyspace(), column_family()};
apply_properties_to(builder);
return builder.build();
}
void create_table_statement::apply_properties_to(schema_builder& builder) {
auto&& columns = get_columns();
for (auto&& column : columns) {
builder.with_column(column);
}
#if 0
cfmd.defaultValidator(defaultValidator)
.keyValidator(keyValidator)
.addAllColumnDefinitions(getColumns(cfmd))
.isDense(isDense);
#endif
add_column_metadata_from_aliases(builder, _key_aliases, _key_validator, column_kind::partition_key);
#if 0
addColumnMetadataFromAliases(cfmd, columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
if (valueAlias != null)
addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
#endif
_properties->apply_to_builder(builder);
}
void create_table_statement::add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, data_type comparator, column_kind kind)
{
#if 0
if (comparator instanceof CompositeType)
{
CompositeType ct = (CompositeType)comparator;
for (int i = 0; i < aliases.size(); ++i)
if (aliases.get(i) != null)
cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(i), ct.types.get(i), i, kind));
}
else
{
#endif
assert(aliases.size() <= 1);
if (!aliases.empty())
builder.with_column(aliases[0], comparator, kind);
#if 0
}
#endif
}
}
}

View File

@@ -51,151 +51,45 @@ class create_table_statement : public schema_altering_statement {
private:
#if 0
private AbstractType<?> defaultValidator;
private AbstractType<?> keyValidator;
private final List<ByteBuffer> keyAliases = new ArrayList<ByteBuffer>();
#endif
data_type _key_validator;
std::vector<bytes> _key_aliases;
#if 0
private final List<ByteBuffer> columnAliases = new ArrayList<ByteBuffer>();
private ByteBuffer valueAlias;
private boolean isDense;
private final Map<ColumnIdentifier, AbstractType> columns = new HashMap<ColumnIdentifier, AbstractType>();
#endif
std::map<::shared_ptr<column_identifier>, data_type> _columns;
const std::set<::shared_ptr<column_identifier>> _static_columns;
const ::shared_ptr<cf_prop_defs> _properties;
const bool _if_not_exists;
public:
create_table_statement(::shared_ptr<cf_name> name, ::shared_ptr<cf_prop_defs> properties, bool if_not_exists, std::set<::shared_ptr<column_identifier>> static_columns)
: schema_altering_statement{name}
, _static_columns{static_columns}
, _properties{properties}
, _if_not_exists{if_not_exists}
{
#if 0
try
{
if (!this.properties.hasProperty(CFPropDefs.KW_COMPRESSION) && CFMetaData.DEFAULT_COMPRESSOR != null)
this.properties.addProperty(CFPropDefs.KW_COMPRESSION,
new HashMap<String, String>()
{{
put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
}});
}
catch (SyntaxException e)
{
throw new AssertionError(e);
}
#endif
}
create_table_statement(::shared_ptr<cf_name> name,
::shared_ptr<cf_prop_defs> properties,
bool if_not_exists,
std::set<::shared_ptr<column_identifier>> static_columns);
virtual void check_access(const service::client_state& state) override {
warn(unimplemented::cause::PERMISSIONS);
#if 0
state.hasKeyspaceAccess(keyspace(), Permission.CREATE);
#endif
}
virtual void check_access(const service::client_state& state) override;
virtual void validate(service::storage_proxy&, const service::client_state& state) override {
// validated in announceMigration()
}
virtual void validate(service::storage_proxy&, const service::client_state& state) override;
#if 0
// Column definitions
private List<ColumnDefinition> getColumns(CFMetaData cfm)
{
List<ColumnDefinition> columnDefs = new ArrayList<>(columns.size());
Integer componentIndex = comparator.isCompound() ? comparator.clusteringPrefixSize() : null;
for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
{
ColumnIdentifier id = col.getKey();
columnDefs.add(staticColumns.contains(id)
? ColumnDefinition.staticDef(cfm, col.getKey().bytes, col.getValue(), componentIndex)
: ColumnDefinition.regularDef(cfm, col.getKey().bytes, col.getValue(), componentIndex));
}
virtual future<bool> announce_migration(service::storage_proxy& proxy, bool is_local_only) override;
return columnDefs;
}
#endif
virtual shared_ptr<transport::event::schema_change> change_event() override;
virtual future<bool> announce_migration(service::storage_proxy& proxy, bool is_local_only) override {
return service::migration_manager::announce_new_column_family(proxy, get_cf_meta_data(), is_local_only).then_wrapped([this] (auto&& f) {
try {
f.get();
return true;
} catch (const exceptions::already_exists_exception& e) {
if (_if_not_exists) {
return false;
}
throw e;
}
});
}
schema_ptr get_cf_meta_data();
virtual shared_ptr<transport::event::schema_change> change_event() override {
return make_shared<transport::event::schema_change>(transport::event::schema_change::change_type::CREATED, transport::event::schema_change::target_type::TABLE, keyspace(), column_family());
}
/**
* Returns a CFMetaData instance based on the parameters parsed from this
* <code>CREATE</code> statement, or defaults where applicable.
*
* @return a CFMetaData instance corresponding to the values parsed from this statement
* @throws InvalidRequestException on failure to validate parsed parameters
*/
schema_ptr get_cf_meta_data() {
auto s = make_lw_shared(schema({}, keyspace(), column_family(),
// partition key
{},
// clustering key
{},
// regular columns
{},
// static columns
{},
// regular column name type
utf8_type,
// comment
""
));
apply_properties_to(s.get());
return s;
}
void apply_properties_to(schema* s) {
#if 0
cfmd.defaultValidator(defaultValidator)
.keyValidator(keyValidator)
.addAllColumnDefinitions(getColumns(cfmd))
.isDense(isDense);
addColumnMetadataFromAliases(cfmd, keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY);
addColumnMetadataFromAliases(cfmd, columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN);
if (valueAlias != null)
addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
#endif
_properties->apply_to_schema(s);
}
#if 0
private void addColumnMetadataFromAliases(CFMetaData cfm, List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind)
{
if (comparator instanceof CompositeType)
{
CompositeType ct = (CompositeType)comparator;
for (int i = 0; i < aliases.size(); ++i)
if (aliases.get(i) != null)
cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(i), ct.types.get(i), i, kind));
}
else
{
assert aliases.size() <= 1;
if (!aliases.isEmpty() && aliases.get(0) != null)
cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(0), comparator, null, kind));
}
}
#endif
class raw_statement;
friend raw_statement;
private:
std::vector<column_definition> get_columns();
void apply_properties_to(schema_builder& builder);
void add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, data_type comparator, column_kind kind);
};
class create_table_statement::raw_statement : public cf_statement {
@@ -235,39 +129,39 @@ public:
auto stmt = ::make_shared<create_table_statement>(_cf_name, properties, _if_not_exists, _static_columns);
#if 0
Map<ByteBuffer, CollectionType> definedMultiCellCollections = null;
for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
{
ColumnIdentifier id = entry.getKey();
CQL3Type pt = entry.getValue().prepare(keyspace());
if (pt.isCollection() && ((CollectionType) pt.getType()).isMultiCell())
{
if (definedMultiCellCollections == null)
definedMultiCellCollections = new HashMap<>();
definedMultiCellCollections.put(id.bytes, (CollectionType) pt.getType());
std::map<bytes, data_type> defined_multi_cell_collections;
for (auto&& entry : _definitions) {
::shared_ptr<column_identifier> id = entry.first;
::shared_ptr<cql3_type> pt = entry.second->prepare(db, keyspace());
if (pt->is_collection() && pt->get_type()->is_multi_cell()) {
defined_multi_cell_collections.emplace(id->name(), pt->get_type());
}
stmt.columns.put(id, pt.getType()); // we'll remove what is not a column below
stmt->_columns.emplace(id, pt->get_type()); // we'll remove what is not a column below
}
if (_key_aliases.empty()) {
throw exceptions::invalid_request_exception("No PRIMARY KEY specifed (exactly one required)");
} else if (_key_aliases.size() > 1) {
throw exceptions::invalid_request_exception("Multiple PRIMARY KEYs specifed (exactly one required)");
}
if (keyAliases.isEmpty())
throw new InvalidRequestException("No PRIMARY KEY specifed (exactly one required)");
else if (keyAliases.size() > 1)
throw new InvalidRequestException("Multiple PRIMARY KEYs specifed (exactly one required)");
List<ColumnIdentifier> kAliases = keyAliases.get(0);
List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(kAliases.size());
for (ColumnIdentifier alias : kAliases)
{
stmt.keyAliases.add(alias.bytes);
AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
if (t instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
if (staticColumns.contains(alias))
throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
keyTypes.add(t);
auto& key_aliases = _key_aliases[0];
std::vector<data_type> key_types;
for (auto&& alias : key_aliases) {
stmt->_key_aliases.emplace_back(alias->name());
auto t = get_type_and_remove(stmt->_columns, alias);
if (t->is_counter()) {
throw exceptions::invalid_request_exception(sprint("counter type is not supported for PRIMARY KEY part %s", alias->text()));
}
if (_static_columns.count(alias) > 0) {
throw exceptions::invalid_request_exception(sprint("Static column %s cannot be part of the PRIMARY KEY", alias->text()));
}
key_types.emplace_back(t);
}
if (key_types.size() > 1) {
throw std::runtime_error("compound key types are not supported");
}
stmt->_key_validator = key_types[0];
#if 0
stmt.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
// Dense means that no part of the comparator stores a CQL column name. This means
@@ -418,20 +312,23 @@ public:
return ::make_shared<parsed_statement::prepared>(stmt);
}
#if 0
private AbstractType<?> getTypeAndRemove(Map<ColumnIdentifier, AbstractType> columns, ColumnIdentifier t) throws InvalidRequestException
{
AbstractType type = columns.get(t);
if (type == null)
throw new InvalidRequestException(String.format("Unknown definition %s referenced in PRIMARY KEY", t));
if (type.isCollection() && type.isMultiCell())
throw new InvalidRequestException(String.format("Invalid collection type for PRIMARY KEY component %s", t));
columns.remove(t);
Boolean isReversed = definedOrdering.get(t);
return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
data_type get_type_and_remove(std::map<::shared_ptr<column_identifier>, data_type>& columns, ::shared_ptr<column_identifier> t)
{
if (columns.count(t) == 0) {
throw exceptions::invalid_request_exception(sprint("Unknown definition %s referenced in PRIMARY KEY", t->text()));
}
auto type = columns.at(t);
if (type->is_collection() && type->is_multi_cell()) {
throw exceptions::invalid_request_exception(sprint("Invalid collection type for PRIMARY KEY component %s", t->text()));
}
columns.erase(t);
#if 0
// FIXME: reversed types are not supported
Boolean isReversed = definedOrdering.get(t);
return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
#endif
return type;
}
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static) {
_defined_names.emplace(def);