Merge remote-tracking branch 'dev/penberg/create-keyspace-stmt'

From Pekka:

This series adds 'create keyspace' support to the CQL parser and AST
executor. As a side-effect, we pull metadata classes from config as well
as migration manager from services.

Please note that migration manager is a stub for now so no actual
keyspace is created in the database internals.
This commit is contained in:
Tomasz Grabiec
2015-03-12 10:31:32 +01:00
13 changed files with 1051 additions and 185 deletions

197
config/ks_meta_data.hh Normal file
View File

@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "config/ut_meta_data.hh"
#include "schema.hh"
#include "core/shared_ptr.hh"
namespace config {
class ks_meta_data final {
public:
#if 0
public final String name;
public final Class<? extends AbstractReplicationStrategy> strategyClass;
public final Map<String, String> strategyOptions;
private final Map<String, CFMetaData> cfMetaData;
public final boolean durableWrites;
public final UTMetaData userTypes;
public KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
boolean durableWrites)
{
this(name, strategyClass, strategyOptions, durableWrites, Collections.<CFMetaData>emptyList(), new UTMetaData());
}
public KSMetaData(String name,
Class<? extends AbstractReplicationStrategy> strategyClass,
Map<String, String> strategyOptions,
boolean durableWrites,
Iterable<CFMetaData> cfDefs)
{
this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData());
}
#endif
ks_meta_data(sstring name,
sstring strategy_name,
std::unordered_map<sstring, sstring> strategy_options,
bool durable_writes,
std::vector<schema_ptr> cf_defs,
shared_ptr<ut_meta_data> user_types)
{
#if 0
this.name = name;
this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass;
this.strategyOptions = strategyOptions;
Map<String, CFMetaData> cfmap = new HashMap<>();
for (CFMetaData cfm : cfDefs)
cfmap.put(cfm.cfName, cfm);
this.cfMetaData = Collections.unmodifiableMap(cfmap);
this.durableWrites = durableWrites;
this.userTypes = userTypes;
#endif
}
// For new user created keyspaces (through CQL)
static lw_shared_ptr<ks_meta_data> new_keyspace(sstring name, sstring strategy_name, std::unordered_map<sstring, sstring> options, bool durable_writes) {
#if 0
Class<? extends AbstractReplicationStrategy> cls = AbstractReplicationStrategy.getClass(strategyName);
if (cls.equals(LocalStrategy.class))
throw new ConfigurationException("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
#endif
return new_keyspace(name, strategy_name, options, durable_writes, std::vector<schema_ptr>{});
}
static lw_shared_ptr<ks_meta_data> new_keyspace(sstring name, sstring strategy_name, std::unordered_map<sstring, sstring> options, bool durables_writes, std::vector<schema_ptr> cf_defs)
{
return ::make_lw_shared<ks_meta_data>(name, strategy_name, options, durables_writes, cf_defs, ::make_shared<ut_meta_data>());
}
#if 0
public KSMetaData cloneWithTableRemoved(CFMetaData table)
{
// clone ksm but do not include the new table
List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
newTables.remove(table);
assert newTables.size() == cfMetaData().size() - 1;
return cloneWith(newTables, userTypes);
}
public KSMetaData cloneWithTableAdded(CFMetaData table)
{
// clone ksm but include the new table
List<CFMetaData> newTables = new ArrayList<>(cfMetaData().values());
newTables.add(table);
assert newTables.size() == cfMetaData().size() + 1;
return cloneWith(newTables, userTypes);
}
public KSMetaData cloneWith(Iterable<CFMetaData> tables, UTMetaData types)
{
return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types);
}
public static KSMetaData testMetadata(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs)
{
return new KSMetaData(name, strategyClass, strategyOptions, true, Arrays.asList(cfDefs));
}
public static KSMetaData testMetadataNotDurable(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs)
{
return new KSMetaData(name, strategyClass, strategyOptions, false, Arrays.asList(cfDefs));
}
@Override
public int hashCode()
{
return Objects.hashCode(name, strategyClass, strategyOptions, cfMetaData, durableWrites, userTypes);
}
@Override
public boolean equals(Object o)
{
if (this == o)
return true;
if (!(o instanceof KSMetaData))
return false;
KSMetaData other = (KSMetaData) o;
return Objects.equal(name, other.name)
&& Objects.equal(strategyClass, other.strategyClass)
&& Objects.equal(strategyOptions, other.strategyOptions)
&& Objects.equal(cfMetaData, other.cfMetaData)
&& Objects.equal(durableWrites, other.durableWrites)
&& Objects.equal(userTypes, other.userTypes);
}
public Map<String, CFMetaData> cfMetaData()
{
return cfMetaData;
}
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("name", name)
.add("strategyClass", strategyClass.getSimpleName())
.add("strategyOptions", strategyOptions)
.add("cfMetaData", cfMetaData)
.add("durableWrites", durableWrites)
.add("userTypes", userTypes)
.toString();
}
public static Map<String,String> optsWithRF(final Integer rf)
{
return Collections.singletonMap("replication_factor", rf.toString());
}
public KSMetaData validate() throws ConfigurationException
{
if (!CFMetaData.isNameValid(name))
throw new ConfigurationException(String.format("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", Schema.NAME_LENGTH, name));
// Attempt to instantiate the ARS, which will throw a ConfigException if the strategy_options aren't fully formed
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch();
AbstractReplicationStrategy.validateReplicationStrategy(name, strategyClass, tmd, eps, strategyOptions);
for (CFMetaData cfm : cfMetaData.values())
cfm.validate();
return this;
}
#endif
};
}

83
config/ut_meta_data.hh Normal file
View File

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
namespace config {
/**
* Defined (and loaded) user types.
*
* In practice, because user types are global, we have only one instance of
* this class that retrieve through the Schema class.
*/
class ut_meta_data final {
#if 0
private final Map<ByteBuffer, UserType> userTypes;
public UTMetaData()
{
this(new HashMap<ByteBuffer, UserType>());
}
public UTMetaData(Map<ByteBuffer, UserType> types)
{
this.userTypes = types;
}
public UserType getType(ByteBuffer typeName)
{
return userTypes.get(typeName);
}
public Map<ByteBuffer, UserType> getAllTypes()
{
// Copy to avoid concurrent modification while iterating. Not intended to be called on a critical path anyway
return new HashMap<>(userTypes);
}
// This is *not* thread safe but is only called in Schema that is synchronized.
public void addType(UserType type)
{
UserType old = userTypes.get(type.name);
assert old == null || type.isCompatibleWith(old);
userTypes.put(type.name, type);
}
// Same remarks than for addType
public void removeType(UserType type)
{
userTypes.remove(type.name);
}
public boolean equals(Object that)
{
if (!(that instanceof UTMetaData))
return false;
return userTypes.equals(((UTMetaData) that).userTypes);
}
#endif
};
}

View File

@@ -153,44 +153,39 @@ using operations_type = std::vector<std::pair<::shared_ptr<cql3::column_identifi
listener->syntax_error(*this, msg);
}
std::map<sstring, sstring> convert_property_map(shared_ptr<cql3::maps::literal> map) {
throw std::runtime_error("not implemented");
#if 0
if (map == null || map.entries == null || map.entries.isEmpty())
return Collections.<String, String>emptyMap();
Map<String, String> res = new HashMap<String, String>(map.entries.size());
for (Pair<Term.Raw, Term.Raw> entry : map.entries)
{
std::unordered_map<sstring, sstring> convert_property_map(shared_ptr<cql3::maps::literal> map) {
if (!map || map->entries.empty()) {
return std::unordered_map<sstring, sstring>{};
}
std::unordered_map<sstring, sstring> res{map->entries.size()};
for (auto&& entry : map->entries) {
// Because the parser tries to be smart and recover on error (to
// allow displaying more than one error I suppose), we have null
// entries in there. Just skip those, a proper error will be thrown in the end.
if (entry.left == null || entry.right == null)
break;
if (!(entry.left instanceof Constants.Literal))
{
String msg = "Invalid property name: " + entry.left;
if (entry.left instanceof AbstractMarker.Raw)
msg += " (bind variables are not supported in DDL queries)";
addRecognitionError(msg);
if (!entry.first || !entry.second) {
break;
}
if (!(entry.right instanceof Constants.Literal))
{
String msg = "Invalid property value: " + entry.right + " for property: " + entry.left;
if (entry.right instanceof AbstractMarker.Raw)
auto left = dynamic_pointer_cast<cql3::constants::literal>(entry.first);
if (!left) {
sstring msg = "Invalid property name: " + entry.first->to_string();
if (dynamic_pointer_cast<cql3::abstract_marker::raw>(entry.first)) {
msg += " (bind variables are not supported in DDL queries)";
addRecognitionError(msg);
}
add_recognition_error(msg);
break;
}
res.put(((Constants.Literal)entry.left).getRawText(), ((Constants.Literal)entry.right).getRawText());
auto right = dynamic_pointer_cast<cql3::constants::literal>(entry.second);
if (!right) {
sstring msg = "Invalid property value: " + entry.first->to_string() + " for property: " + entry.second->to_string();
if (dynamic_pointer_cast<cql3::abstract_marker::raw>(entry.second)) {
msg += " (bind variables are not supported in DDL queries)";
}
add_recognition_error(msg);
break;
}
res.emplace(left->get_raw_text(), right->get_raw_text());
}
return res;
#endif
}
void add_raw_update(std::vector<std::pair<::shared_ptr<cql3::column_identifier::raw>,::shared_ptr<cql3::operation::raw_update>>>& operations,

View File

@@ -26,6 +26,9 @@
#include "cql3/statements/schema_altering_statement.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "service/migration_manager.hh"
#include "config/ks_meta_data.hh"
#include "transport/event.hh"
#include "core/shared_ptr.hh"
@@ -104,12 +107,14 @@ public:
throw new InvalidRequestException(String.format("\"%s\" is not a valid keyspace name", name));
if (name.length() > Schema.NAME_LENGTH)
throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, name));
#endif
attrs.validate();
if (attrs.getReplicationStrategyClass() == null)
throw new ConfigurationException("Missing mandatory replication strategy class");
_attrs->validate();
if (!bool(_attrs->get_replication_strategy_class())) {
throw exceptions::configuration_exception("Missing mandatory replication strategy class");
}
#if 0
// The strategy is validated through KSMetaData.validate() in announceNewKeyspace below.
// However, for backward compatibility with thrift, this doesn't validate unexpected options yet,
// so doing proper validation here.
@@ -122,24 +127,19 @@ public:
}
virtual bool announce_migration(bool is_local_only) override {
throw std::runtime_error("not implemented");
#if 0
try {
MigrationManager.announceNewKeyspace(attrs.asKSMetadata(name), isLocalOnly);
service::migration_manager::announce_new_keyspace(_attrs->as_ks_metadata(_name), is_local_only);
return true;
} catch (AlreadyExistsException e) {
if (ifNotExists)
} catch (const exceptions::already_exists_exception& e) {
if (_if_not_exists) {
return false;
}
throw e;
}
#endif
}
virtual shared_ptr<transport::event::schema_change> change_event() override {
throw std::runtime_error("not implemented");
#if 0
return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, keyspace());
#endif
return make_shared<transport::event::schema_change>(transport::event::schema_change::change_type::CREATED, keyspace());
}
};

View File

@@ -26,27 +26,23 @@
#define CQL3_STATEMENTS_KS_PROP_DEFS_HH
#include "cql3/statements/property_definitions.hh"
#include "config/ks_meta_data.hh"
#include "core/sstring.hh"
#include <experimental/optional>
namespace cql3 {
namespace statements {
#if 0
package org.apache.cassandra.cql3.statements;
import java.util.*;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.exceptions.*;
#endif
class ks_prop_defs : public property_definitions {
public:
static constexpr auto KW_DURABLE_WRITES = "durable_writes";
static constexpr auto KW_REPLICATION = "replication";
static constexpr auto REPLICATION_STRATEGY_CLASS_KEY = "class";
#if 0
public static final String KW_DURABLE_WRITES = "durable_writes";
public static final String KW_REPLICATION = "replication";
public static final String REPLICATION_STRATEGY_CLASS_KEY = "class";
public static final Set<String> keywords = new HashSet<>();
public static final Set<String> obsoleteKeywords = new HashSet<>();
@@ -55,44 +51,44 @@ class ks_prop_defs : public property_definitions {
keywords.add(KW_DURABLE_WRITES);
keywords.add(KW_REPLICATION);
}
private String strategyClass;
public void validate() throws SyntaxException
{
#endif
private:
std::experimental::optional<sstring> _strategy_class;
public:
void validate() {
// Skip validation if the strategy class is already set as it means we've alreayd
// prepared (and redoing it would set strategyClass back to null, which we don't want)
if (strategyClass != null)
if (_strategy_class) {
return;
}
#if 0
validate(keywords, obsoleteKeywords);
Map<String, String> replicationOptions = getReplicationOptions();
if (!replicationOptions.isEmpty())
{
strategyClass = replicationOptions.get(REPLICATION_STRATEGY_CLASS_KEY);
replicationOptions.remove(REPLICATION_STRATEGY_CLASS_KEY);
#endif
auto replication_options = get_replication_options();
if (!replication_options.empty()) {
_strategy_class = replication_options[REPLICATION_STRATEGY_CLASS_KEY];
// FIXME
//replication_options.remove(REPLICATION_STRATEGY_CLASS_KEY);
}
}
public Map<String, String> getReplicationOptions() throws SyntaxException
{
Map<String, String> replicationOptions = getMap(KW_REPLICATION);
if (replicationOptions == null)
return Collections.emptyMap();
return replicationOptions;
std::unordered_map<sstring, sstring> get_replication_options() const {
auto replication_options = get_map(KW_REPLICATION);
if (replication_options) {
return replication_options.value();
}
return std::unordered_map<sstring, sstring>{};
}
public String getReplicationStrategyClass()
{
return strategyClass;
std::experimental::optional<sstring> get_replication_strategy_class() const {
return _strategy_class;
}
public KSMetaData asKSMetadata(String ksName) throws RequestValidationException
{
return KSMetaData.newKeyspace(ksName, getReplicationStrategyClass(), getReplicationOptions(), getBoolean(KW_DURABLE_WRITES, true));
lw_shared_ptr<config::ks_meta_data> as_ks_metadata(sstring ks_name) {
return config::ks_meta_data::new_keyspace(ks_name, get_replication_strategy_class().value(), get_replication_options(), get_boolean(KW_DURABLE_WRITES, true));
}
#if 0
public KSMetaData asKSMetadataUpdate(KSMetaData old) throws RequestValidationException
{
String sClass = strategyClass;

View File

@@ -22,15 +22,17 @@
* Modified by Cloudius Systems
*/
#ifndef CQL3_STATEMENTS_PROPERTY_DEFINITIONS_HH
#define CQL3_STATEMENTS_PROPERTY_DEFINITIONS_HH
#pragma once
#include "exceptions/exceptions.hh"
#include "core/print.hh"
#include "core/sstring.hh"
#include <experimental/optional>
#include <unordered_map>
#include <map>
#include <algorithm>
#include <cctype>
#include <string>
#include <boost/any.hpp>
@@ -38,17 +40,6 @@ namespace cql3 {
namespace statements {
#if 0
package org.apache.cassandra.cql3.statements;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.exceptions.SyntaxException;
#endif
class property_definitions {
protected:
#if 0
@@ -60,7 +51,7 @@ protected:
: _properties{}
{ }
public:
virtual void add_property(const sstring& name, sstring value) {
void add_property(const sstring& name, sstring value) {
auto it = _properties.find(name);
if (it != _properties.end()) {
throw exceptions::syntax_exception(sprint("Multiple definition for property '%s'", name));
@@ -68,7 +59,7 @@ public:
_properties.emplace(name, value);
}
virtual void add_property(const sstring& name, const std::map<sstring, sstring>& value) {
void add_property(const sstring& name, const std::unordered_map<sstring, sstring>& value) {
auto it = _properties.find(name);
if (it != _properties.end()) {
throw exceptions::syntax_exception(sprint("Multiple definition for property '%s'", name));
@@ -76,110 +67,106 @@ public:
_properties.emplace(name, value);
}
#if 0
public void validate(Set<String> keywords, Set<String> obsolete) throws SyntaxException
{
for (String name : properties.keySet())
{
if (keywords.contains(name))
void validate(std::set<sstring> keywords, std::set<sstring> obsolete) {
for (auto&& kv : _properties) {
auto&& name = kv.first;
if (keywords.count(name)) {
continue;
if (obsolete.contains(name))
}
if (obsolete.count(name)) {
#if 0
logger.warn("Ignoring obsolete property {}", name);
else
throw new SyntaxException(String.format("Unknown property '%s'", name));
#endif
} else {
throw exceptions::syntax_exception(sprint("Unknown property '%s'", name));
}
}
}
protected:
std::experimental::optional<sstring> get_simple(const sstring& name) const {
auto it = _properties.find(name);
if (it == _properties.end()) {
return std::experimental::optional<sstring>{};
}
try {
return boost::any_cast<sstring>(it->second);
} catch (const boost::bad_any_cast& e) {
throw exceptions::syntax_exception(sprint("Invalid value for property '%s'. It should be a string", name));
}
}
protected String getSimple(String name) throws SyntaxException
{
Object val = properties.get(name);
if (val == null)
return null;
if (!(val instanceof String))
throw new SyntaxException(String.format("Invalid value for property '%s'. It should be a string", name));
return (String)val;
std::experimental::optional<std::unordered_map<sstring, sstring>> get_map(const sstring& name) const {
auto it = _properties.find(name);
if (it == _properties.end()) {
return std::experimental::optional<std::unordered_map<sstring, sstring>>{};
}
try {
return boost::any_cast<std::unordered_map<sstring, sstring>>(it->second);
} catch (const boost::bad_any_cast& e) {
throw exceptions::syntax_exception(sprint("Invalid value for property '%s'. It should be a map.", name));
}
}
public:
bool has_property(const sstring& name) const {
return _properties.find(name) != _properties.end();
}
protected Map<String, String> getMap(String name) throws SyntaxException
{
Object val = properties.get(name);
if (val == null)
return null;
if (!(val instanceof Map))
throw new SyntaxException(String.format("Invalid value for property '%s'. It should be a map.", name));
return (Map<String, String>)val;
}
public Boolean hasProperty(String name)
{
return properties.containsKey(name);
}
public String getString(String key, String defaultValue) throws SyntaxException
{
String value = getSimple(key);
return value != null ? value : defaultValue;
sstring get_string(sstring key, sstring default_value) const {
auto value = get_simple(key);
if (value) {
return value.value();
} else {
return default_value;
}
}
// Return a property value, typed as a Boolean
public Boolean getBoolean(String key, Boolean defaultValue) throws SyntaxException
{
String value = getSimple(key);
return (value == null) ? defaultValue : value.toLowerCase().matches("(1|true|yes)");
bool get_boolean(sstring key, bool default_value) const {
auto value = get_simple(key);
if (value) {
std::string s{value.value()};
std::transform(s.begin(), s.end(), s.begin(), ::tolower);
return s == "1" || s == "true" || s == "yes";
} else {
return default_value;
}
}
// Return a property value, typed as a double
public double getDouble(String key, double defaultValue) throws SyntaxException
{
String value = getSimple(key);
if (value == null)
{
return defaultValue;
}
else
{
try
{
return Double.parseDouble(value);
}
catch (NumberFormatException e)
{
throw new SyntaxException(String.format("Invalid double value %s for '%s'", value, key));
double get_double(sstring key, double default_value) const {
auto value = get_simple(key);
if (value) {
auto val = value.value();
try {
return std::stod(val);
} catch (const std::exception& e) {
throw exceptions::syntax_exception(sprint("Invalid double value %s for '%s'", val, key));
}
} else {
return default_value;
}
}
// Return a property value, typed as an Integer
public Integer getInt(String key, Integer defaultValue) throws SyntaxException
{
String value = getSimple(key);
return toInt(key, value, defaultValue);
int32_t get_int(sstring key, int32_t default_value) const {
auto value = get_simple(key);
return to_int(key, value, default_value);
}
public static Integer toInt(String key, String value, Integer defaultValue) throws SyntaxException
{
if (value == null)
{
return defaultValue;
}
else
{
try
{
return Integer.valueOf(value);
}
catch (NumberFormatException e)
{
throw new SyntaxException(String.format("Invalid integer value %s for '%s'", value, key));
static int32_t to_int(sstring key, std::experimental::optional<sstring> value, int32_t default_value) {
if (value) {
auto val = value.value();
try {
return std::stoi(val);
} catch (const std::exception& e) {
throw exceptions::syntax_exception(sprint("Invalid integer value %s for '%s'", val, key));
}
} else {
return default_value;
}
}
#endif
};
}
}
#endif

View File

@@ -27,6 +27,7 @@
#include <stdexcept>
#include "core/sstring.hh"
#include "core/print.hh"
namespace exceptions {
@@ -106,6 +107,37 @@ public:
{ }
};
class configuration_exception : public request_validation_exception {
public:
configuration_exception(sstring msg)
: request_validation_exception{exception_code::CONFIG_ERROR, std::move(msg)}
{ }
configuration_exception(exception_code code, sstring msg)
: request_validation_exception{code, std::move(msg)}
{ }
};
class already_exists_exception : public configuration_exception {
public:
const sstring ks_name;
const sstring cf_name;
private:
already_exists_exception(sstring ks_name_, sstring cf_name_, sstring msg)
: configuration_exception{exception_code::ALREADY_EXISTS, msg}
, ks_name{ks_name_}
, cf_name{cf_name_}
{ }
public:
already_exists_exception(sstring ks_name_, sstring cf_name_)
: already_exists_exception{ks_name_, cf_name_, sprint("Cannot add already existing table \"%s\" to keyspace \"%s\"", cf_name_, ks_name_)}
{ }
already_exists_exception(sstring ks_name_)
: already_exists_exception{ks_name_, "", sprint("Cannot add existing keyspace \"%s\"", ks_name_)}
{ }
};
class recognition_exception : public std::runtime_error {
public:
recognition_exception(const std::string& msg) : std::runtime_error(msg) {};
@@ -117,5 +149,4 @@ public:
};
}
#endif

View File

@@ -0,0 +1,564 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "config/ks_meta_data.hh"
#if 0
package org.apache.cassandra.service;
import java.io.DataInput;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.cql3.functions.UDFunction;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.exceptions.AlreadyExistsException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.LegacySchemaTables;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
#endif
namespace service {
class migration_manager {
#if 0
private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
public static final MigrationManager instance = new MigrationManager();
private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
public static final int MIGRATION_DELAY_IN_MS = 60000;
private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<>();
private MigrationManager() {}
public void register(IMigrationListener listener)
{
listeners.add(listener);
}
public void unregister(IMigrationListener listener)
{
listeners.remove(listener);
}
public void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
{
VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
}
/**
* If versions differ this node sends request with local migration list to the endpoint
* and expecting to receive a list of migrations to apply locally.
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
{
logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return;
}
if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
logger.debug("Submitting migration task for {}", endpoint);
submitMigrationTask(endpoint);
}
else
{
// Include a delay to make sure we have a chance to apply any changes being
// pushed out simultaneously. See CASSANDRA-5025
Runnable runnable = new Runnable()
{
public void run()
{
// grab the latest version of the schema since it may have changed again since the initial scheduling
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (epState == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
return;
}
VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
UUID currentVersion = UUID.fromString(value.value);
if (Schema.instance.getVersion().equals(currentVersion))
{
logger.debug("not submitting migration task for {} because our versions match", endpoint);
return;
}
logger.debug("submitting migration task for {}", endpoint);
submitMigrationTask(endpoint);
}
};
ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
}
}
private static Future<?> submitMigrationTask(InetAddress endpoint)
{
/*
* Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
* running in the gossip stage.
*/
return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
}
private static boolean shouldPullSchemaFrom(InetAddress endpoint)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
* Don't request schema from fat clients
*/
return MessagingService.instance().knowsVersion(endpoint)
&& MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
&& !Gossiper.instance.isGossipOnlyMember(endpoint);
}
#endif
public:
#if 0
public static boolean isReadyForBootstrap()
{
return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
}
public void notifyCreateKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
listener.onCreateKeyspace(ksm.name);
}
public void notifyCreateColumnFamily(CFMetaData cfm)
{
for (IMigrationListener listener : listeners)
listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyCreateUserType(UserType ut)
{
for (IMigrationListener listener : listeners)
listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyCreateFunction(UDFunction udf)
{
for (IMigrationListener listener : listeners)
listener.onCreateFunction(udf.name().keyspace, udf.name().name);
}
public void notifyCreateAggregate(UDAggregate udf)
{
for (IMigrationListener listener : listeners)
listener.onCreateAggregate(udf.name().keyspace, udf.name().name);
}
public void notifyUpdateKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
listener.onUpdateKeyspace(ksm.name);
}
public void notifyUpdateColumnFamily(CFMetaData cfm)
{
for (IMigrationListener listener : listeners)
listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyUpdateUserType(UserType ut)
{
for (IMigrationListener listener : listeners)
listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyUpdateFunction(UDFunction udf)
{
for (IMigrationListener listener : listeners)
listener.onUpdateFunction(udf.name().keyspace, udf.name().name);
}
public void notifyUpdateAggregate(UDAggregate udf)
{
for (IMigrationListener listener : listeners)
listener.onUpdateAggregate(udf.name().keyspace, udf.name().name);
}
public void notifyDropKeyspace(KSMetaData ksm)
{
for (IMigrationListener listener : listeners)
listener.onDropKeyspace(ksm.name);
}
public void notifyDropColumnFamily(CFMetaData cfm)
{
for (IMigrationListener listener : listeners)
listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
}
public void notifyDropUserType(UserType ut)
{
for (IMigrationListener listener : listeners)
listener.onDropUserType(ut.keyspace, ut.getNameAsString());
}
public void notifyDropFunction(UDFunction udf)
{
for (IMigrationListener listener : listeners)
listener.onDropFunction(udf.name().keyspace, udf.name().name);
}
public void notifyDropAggregate(UDAggregate udf)
{
for (IMigrationListener listener : listeners)
listener.onDropAggregate(udf.name().keyspace, udf.name().name);
}
public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
{
announceNewKeyspace(ksm, false);
}
#endif
static void announce_new_keyspace(lw_shared_ptr<config::ks_meta_data> ksm, bool announce_locally) {
warn(unimplemented::cause::MIGRATIONS);
#if 0
announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally);
#endif
}
#if 0
public static void announceNewKeyspace(KSMetaData ksm, long timestamp, boolean announceLocally) throws ConfigurationException
{
ksm.validate();
if (Schema.instance.getKSMetaData(ksm.name) != null)
throw new AlreadyExistsException(ksm.name);
logger.info(String.format("Create new Keyspace: %s", ksm));
announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally);
}
public static void announceNewColumnFamily(CFMetaData cfm) throws ConfigurationException
{
announceNewColumnFamily(cfm, false);
}
public static void announceNewColumnFamily(CFMetaData cfm, boolean announceLocally) throws ConfigurationException
{
cfm.validate();
KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
if (ksm == null)
throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName));
else if (ksm.cfMetaData().containsKey(cfm.cfName))
throw new AlreadyExistsException(cfm.ksName, cfm.cfName);
logger.info(String.format("Create new table: %s", cfm));
announce(LegacySchemaTables.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceNewType(UserType newType, boolean announceLocally)
{
KSMetaData ksm = Schema.instance.getKSMetaData(newType.keyspace);
announce(LegacySchemaTables.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceNewFunction(UDFunction udf, boolean announceLocally)
{
logger.info(String.format("Create scalar function '%s'", udf.name()));
KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
announce(LegacySchemaTables.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceNewAggregate(UDAggregate udf, boolean announceLocally)
{
logger.info(String.format("Create aggregate function '%s'", udf.name()));
KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
announce(LegacySchemaTables.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException
{
announceKeyspaceUpdate(ksm, false);
}
public static void announceKeyspaceUpdate(KSMetaData ksm, boolean announceLocally) throws ConfigurationException
{
ksm.validate();
KSMetaData oldKsm = Schema.instance.getKSMetaData(ksm.name);
if (oldKsm == null)
throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name));
logger.info(String.format("Update Keyspace '%s' From %s To %s", ksm.name, oldKsm, ksm));
announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift) throws ConfigurationException
{
announceColumnFamilyUpdate(cfm, fromThrift, false);
}
public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift, boolean announceLocally) throws ConfigurationException
{
cfm.validate();
CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName);
if (oldCfm == null)
throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName));
KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName);
oldCfm.validateCompatility(cfm);
logger.info(String.format("Update table '%s/%s' From %s To %s", cfm.ksName, cfm.cfName, oldCfm, cfm));
announce(LegacySchemaTables.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally);
}
public static void announceTypeUpdate(UserType updatedType, boolean announceLocally)
{
announceNewType(updatedType, announceLocally);
}
public static void announceKeyspaceDrop(String ksName) throws ConfigurationException
{
announceKeyspaceDrop(ksName, false);
}
public static void announceKeyspaceDrop(String ksName, boolean announceLocally) throws ConfigurationException
{
KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName);
if (oldKsm == null)
throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName));
logger.info(String.format("Drop Keyspace '%s'", oldKsm.name));
announce(LegacySchemaTables.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException
{
announceColumnFamilyDrop(ksName, cfName, false);
}
public static void announceColumnFamilyDrop(String ksName, String cfName, boolean announceLocally) throws ConfigurationException
{
CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName);
if (oldCfm == null)
throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName));
KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
logger.info(String.format("Drop table '%s/%s'", oldCfm.ksName, oldCfm.cfName));
announce(LegacySchemaTables.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceTypeDrop(UserType droppedType)
{
announceTypeDrop(droppedType, false);
}
public static void announceTypeDrop(UserType droppedType, boolean announceLocally)
{
KSMetaData ksm = Schema.instance.getKSMetaData(droppedType.keyspace);
announce(LegacySchemaTables.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceFunctionDrop(UDFunction udf, boolean announceLocally)
{
logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes()));
KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
announce(LegacySchemaTables.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
}
public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally)
{
logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes()));
KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace);
announce(LegacySchemaTables.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally);
}
/**
* actively announce a new version to active hosts via rpc
* @param schema The schema mutation to be applied
*/
private static void announce(Mutation schema, boolean announceLocally)
{
if (announceLocally)
{
try
{
LegacySchemaTables.mergeSchema(Collections.singletonList(schema), false);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
else
{
FBUtilities.waitOnFuture(announce(Collections.singletonList(schema)));
}
}
private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema)
{
MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
schema,
MigrationsSerializer.instance);
MessagingService.instance().sendOneWay(msg, endpoint);
}
// Returns a future on the local application of the schema
private static Future<?> announce(final Collection<Mutation> schema)
{
Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
protected void runMayThrow() throws IOException, ConfigurationException
{
LegacySchemaTables.mergeSchema(schema);
}
});
for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
{
// only push schema to nodes with known and equal versions
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
MessagingService.instance().knowsVersion(endpoint) &&
MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
pushSchemaMutation(endpoint, schema);
}
return f;
}
/**
* Announce my version passively over gossip.
* Used to notify nodes as they arrive in the cluster.
*
* @param version The schema version to announce
*/
public static void passiveAnnounce(UUID version)
{
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
logger.debug("Gossiping my schema version {}", version);
}
/**
* Clear all locally stored schema information and reset schema to initial state.
* Called by user (via JMX) who wants to get rid of schema disagreement.
*
* @throws IOException if schema tables truncation fails
*/
public static void resetLocalSchema() throws IOException
{
logger.info("Starting local schema reset...");
logger.debug("Truncating schema tables...");
LegacySchemaTables.truncateSchemaTables();
logger.debug("Clearing local schema keyspace definitions...");
Schema.instance.clear();
Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
liveEndpoints.remove(FBUtilities.getBroadcastAddress());
// force migration if there are nodes around
for (InetAddress node : liveEndpoints)
{
if (shouldPullSchemaFrom(node))
{
logger.debug("Requesting schema from {}", node);
FBUtilities.waitOnFuture(submitMigrationTask(node));
break;
}
}
logger.info("Local schema reset is complete.");
}
public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>>
{
public static MigrationsSerializer instance = new MigrationsSerializer();
public void serialize(Collection<Mutation> schema, DataOutputPlus out, int version) throws IOException
{
out.writeInt(schema.size());
for (Mutation mutation : schema)
Mutation.serializer.serialize(mutation, out, version);
}
public Collection<Mutation> deserialize(DataInput in, int version) throws IOException
{
int count = in.readInt();
Collection<Mutation> schema = new ArrayList<>(count);
for (int i = 0; i < count; i++)
schema.add(Mutation.serializer.deserialize(in, version));
return schema;
}
public long serializedSize(Collection<Mutation> schema, int version)
{
int size = TypeSizes.NATIVE.sizeof(schema.size());
for (Mutation mutation : schema)
size += Mutation.serializer.serializedSize(mutation, version);
return size;
}
}
#endif
};
}

View File

@@ -72,6 +72,17 @@ static future<> require_column_has_value(distributed<database>& ddb, const sstri
});
}
SEASTAR_TEST_CASE(test_create_keyspace_statement) {
auto db = make_shared<distributed<database>>();
auto state = make_shared<conversation_state>(*db, ks_name);
return db->start().then([state] {
return state->execute_cql("create keyspace ks with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").discard_result();
}).finally([db] {
return db->stop().finally([db] {});
});
}
SEASTAR_TEST_CASE(test_insert_statement) {
auto db = make_shared<distributed<database>>();
auto state = make_shared<conversation_state>(*db, ks_name);

View File

@@ -26,6 +26,8 @@
#include "core/sstring.hh"
#include <experimental/optional>
namespace transport {
class event {
@@ -221,9 +223,9 @@ public:
const change_type change;
const target_type target;
const sstring keyspace;
const sstring table_or_type_or_function;
const std::experimental::optional<sstring> table_or_type_or_function;
schema_change(const change_type change_, const target_type target_, const sstring& keyspace_, const sstring& table_or_type_or_function_)
schema_change(const change_type change_, const target_type target_, const sstring& keyspace_, const std::experimental::optional<sstring>& table_or_type_or_function_)
: event{event_type::SCHEMA_CHANGE}
, change{change_}
, target{target_}
@@ -236,12 +238,10 @@ public:
#endif
}
schema_change(const change_type change_, const sstring keyspace_)
: schema_change{change_, target_type::KEYSPACE, keyspace_, std::experimental::optional<sstring>{}}
{ }
#if 0
public SchemaChange(Change change, String keyspace)
{
this(change, Target.KEYSPACE, keyspace, null);
}
// Assumes the type has already been deserialized
public static SchemaChange deserializeEvent(ByteBuf cb, int version)
{

View File

@@ -499,7 +499,7 @@ public:
_response->write_string(to_string(sc->target));
_response->write_string(sc->keyspace);
if (sc->target != transport::event::schema_change::target_type::KEYSPACE) {
_response->write_string(sc->table_or_type_or_function);
_response->write_string(sc->table_or_type_or_function.value());
}
break;
}

View File

@@ -22,6 +22,7 @@ std::ostream& operator<<(std::ostream& out, cause c) {
case cause::COLLECTIONS: return out << "COLLECTIONS";
case cause::COUNTERS: return out << "COUNTERS";
case cause::METRICS: return out << "METRICS";
case cause::MIGRATIONS: return out << "MIGRATIONS";
case cause::COMPACT_TABLES: return out << "COMPACT_TABLES";
case cause::GOSSIP: return out << "GOSSIP";
case cause::TOKEN_RESTRICTION: return out << "TOKEN_RESTRICTION";

View File

@@ -21,6 +21,7 @@ enum class cause {
COLLECTIONS,
COUNTERS,
METRICS,
MIGRATIONS,
COMPACT_TABLES,
GOSSIP,
TOKEN_RESTRICTION,