From a372a9170b7cafe9e86bfdc567db0cafcc6dc0fa Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 10 Mar 2015 15:29:45 +0200 Subject: [PATCH 01/14] config: Import UTMetaData.java Signed-off-by: Pekka Enberg --- config/UTMetaData.java | 76 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 config/UTMetaData.java diff --git a/config/UTMetaData.java b/config/UTMetaData.java new file mode 100644 index 0000000000..08cedee004 --- /dev/null +++ b/config/UTMetaData.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.config; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.db.marshal.*; + +/** + * Defined (and loaded) user types. + * + * In practice, because user types are global, we have only one instance of + * this class that retrieve through the Schema class. + */ +public final class UTMetaData +{ + private final Map userTypes; + + public UTMetaData() + { + this(new HashMap()); + } + + public UTMetaData(Map types) + { + this.userTypes = types; + } + + public UserType getType(ByteBuffer typeName) + { + return userTypes.get(typeName); + } + + public Map getAllTypes() + { + // Copy to avoid concurrent modification while iterating. Not intended to be called on a critical path anyway + return new HashMap<>(userTypes); + } + + // This is *not* thread safe but is only called in Schema that is synchronized. + public void addType(UserType type) + { + UserType old = userTypes.get(type.name); + assert old == null || type.isCompatibleWith(old); + userTypes.put(type.name, type); + } + + // Same remarks than for addType + public void removeType(UserType type) + { + userTypes.remove(type.name); + } + + public boolean equals(Object that) + { + if (!(that instanceof UTMetaData)) + return false; + return userTypes.equals(((UTMetaData) that).userTypes); + } +} From df5fe24ed7817c93f2b9b0cd017370172b84edeb Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 10 Mar 2015 15:30:03 +0200 Subject: [PATCH 02/14] config: Convert UTMetaData to C++ Signed-off-by: Pekka Enberg --- config/{UTMetaData.java => ut_meta_data.hh} | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) rename config/{UTMetaData.java => ut_meta_data.hh} (93%) diff --git a/config/UTMetaData.java b/config/ut_meta_data.hh similarity index 93% rename from config/UTMetaData.java rename to config/ut_meta_data.hh index 08cedee004..d62df9b386 100644 --- a/config/UTMetaData.java +++ b/config/ut_meta_data.hh @@ -15,12 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.cassandra.config; -import java.nio.ByteBuffer; -import java.util.*; +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ -import org.apache.cassandra.db.marshal.*; +#pragma once + +namespace config { /** * Defined (and loaded) user types. @@ -28,8 +32,8 @@ import org.apache.cassandra.db.marshal.*; * In practice, because user types are global, we have only one instance of * this class that retrieve through the Schema class. */ -public final class UTMetaData -{ +class ut_meta_data final { +#if 0 private final Map userTypes; public UTMetaData() @@ -73,4 +77,7 @@ public final class UTMetaData return false; return userTypes.equals(((UTMetaData) that).userTypes); } +#endif +}; + } From a5aefb5ef26e28a76b900ced5ad928699f18ced4 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 9 Mar 2015 11:53:26 +0200 Subject: [PATCH 03/14] config: Import KSMetaData.java Signed-off-by: Pekka Enberg --- config/KSMetaData.java | 184 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 config/KSMetaData.java diff --git a/config/KSMetaData.java b/config/KSMetaData.java new file mode 100644 index 0000000000..1537aae803 --- /dev/null +++ b/config/KSMetaData.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.config; + +import java.util.*; + +import com.google.common.base.Objects; + +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.*; +import org.apache.cassandra.service.StorageService; + +public final class KSMetaData +{ + public final String name; + public final Class strategyClass; + public final Map strategyOptions; + private final Map cfMetaData; + public final boolean durableWrites; + + public final UTMetaData userTypes; + + public KSMetaData(String name, + Class strategyClass, + Map strategyOptions, + boolean durableWrites) + { + this(name, strategyClass, strategyOptions, durableWrites, Collections.emptyList(), new UTMetaData()); + } + + public KSMetaData(String name, + Class strategyClass, + Map strategyOptions, + boolean durableWrites, + Iterable cfDefs) + { + this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData()); + } + + private KSMetaData(String name, + Class strategyClass, + Map strategyOptions, + boolean durableWrites, + Iterable cfDefs, + UTMetaData userTypes) + { + this.name = name; + this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass; + this.strategyOptions = strategyOptions; + Map cfmap = new HashMap<>(); + for (CFMetaData cfm : cfDefs) + cfmap.put(cfm.cfName, cfm); + this.cfMetaData = Collections.unmodifiableMap(cfmap); + this.durableWrites = durableWrites; + this.userTypes = userTypes; + } + + // For new user created keyspaces (through CQL) + public static KSMetaData newKeyspace(String name, String strategyName, Map options, boolean durableWrites) throws ConfigurationException + { + Class cls = AbstractReplicationStrategy.getClass(strategyName); + if (cls.equals(LocalStrategy.class)) + throw new ConfigurationException("Unable to use given strategy class: LocalStrategy is reserved for internal use."); + + return newKeyspace(name, cls, options, durableWrites, Collections.emptyList()); + } + + public static KSMetaData newKeyspace(String name, Class strategyClass, Map options, boolean durablesWrites, Iterable cfDefs) + { + return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData()); + } + + public KSMetaData cloneWithTableRemoved(CFMetaData table) + { + // clone ksm but do not include the new table + List newTables = new ArrayList<>(cfMetaData().values()); + newTables.remove(table); + assert newTables.size() == cfMetaData().size() - 1; + return cloneWith(newTables, userTypes); + } + + public KSMetaData cloneWithTableAdded(CFMetaData table) + { + // clone ksm but include the new table + List newTables = new ArrayList<>(cfMetaData().values()); + newTables.add(table); + assert newTables.size() == cfMetaData().size() + 1; + return cloneWith(newTables, userTypes); + } + + public KSMetaData cloneWith(Iterable tables, UTMetaData types) + { + return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types); + } + + public static KSMetaData testMetadata(String name, Class strategyClass, Map strategyOptions, CFMetaData... cfDefs) + { + return new KSMetaData(name, strategyClass, strategyOptions, true, Arrays.asList(cfDefs)); + } + + public static KSMetaData testMetadataNotDurable(String name, Class strategyClass, Map strategyOptions, CFMetaData... cfDefs) + { + return new KSMetaData(name, strategyClass, strategyOptions, false, Arrays.asList(cfDefs)); + } + + @Override + public int hashCode() + { + return Objects.hashCode(name, strategyClass, strategyOptions, cfMetaData, durableWrites, userTypes); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof KSMetaData)) + return false; + + KSMetaData other = (KSMetaData) o; + + return Objects.equal(name, other.name) + && Objects.equal(strategyClass, other.strategyClass) + && Objects.equal(strategyOptions, other.strategyOptions) + && Objects.equal(cfMetaData, other.cfMetaData) + && Objects.equal(durableWrites, other.durableWrites) + && Objects.equal(userTypes, other.userTypes); + } + + public Map cfMetaData() + { + return cfMetaData; + } + + @Override + public String toString() + { + return Objects.toStringHelper(this) + .add("name", name) + .add("strategyClass", strategyClass.getSimpleName()) + .add("strategyOptions", strategyOptions) + .add("cfMetaData", cfMetaData) + .add("durableWrites", durableWrites) + .add("userTypes", userTypes) + .toString(); + } + + public static Map optsWithRF(final Integer rf) + { + return Collections.singletonMap("replication_factor", rf.toString()); + } + + public KSMetaData validate() throws ConfigurationException + { + if (!CFMetaData.isNameValid(name)) + throw new ConfigurationException(String.format("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", Schema.NAME_LENGTH, name)); + + // Attempt to instantiate the ARS, which will throw a ConfigException if the strategy_options aren't fully formed + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch(); + AbstractReplicationStrategy.validateReplicationStrategy(name, strategyClass, tmd, eps, strategyOptions); + + for (CFMetaData cfm : cfMetaData.values()) + cfm.validate(); + + return this; + } +} From 75a35956916070122c316563ce03193f92658106 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 10 Mar 2015 09:37:10 +0200 Subject: [PATCH 04/14] config: Convert KSMetaData to C++ Signed-off-by: Pekka Enberg --- config/{KSMetaData.java => ks_meta_data.hh} | 55 +++++++++++++-------- 1 file changed, 34 insertions(+), 21 deletions(-) rename config/{KSMetaData.java => ks_meta_data.hh} (84%) diff --git a/config/KSMetaData.java b/config/ks_meta_data.hh similarity index 84% rename from config/KSMetaData.java rename to config/ks_meta_data.hh index 1537aae803..367b0a9eb5 100644 --- a/config/KSMetaData.java +++ b/config/ks_meta_data.hh @@ -15,18 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.cassandra.config; -import java.util.*; +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ -import com.google.common.base.Objects; +#pragma once -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.locator.*; -import org.apache.cassandra.service.StorageService; +#include "config/ut_meta_data.hh" +#include "schema.hh" -public final class KSMetaData -{ +#include "core/shared_ptr.hh" + +namespace config { + +class ks_meta_data final { +public: +#if 0 public final String name; public final Class strategyClass; public final Map strategyOptions; @@ -51,14 +58,15 @@ public final class KSMetaData { this(name, strategyClass, strategyOptions, durableWrites, cfDefs, new UTMetaData()); } - - private KSMetaData(String name, - Class strategyClass, - Map strategyOptions, - boolean durableWrites, - Iterable cfDefs, - UTMetaData userTypes) +#endif + ks_meta_data(sstring name, + sstring strategy_name, + std::unordered_map strategy_options, + bool durable_writes, + std::vector cf_defs, + shared_ptr user_types) { +#if 0 this.name = name; this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass; this.strategyOptions = strategyOptions; @@ -68,23 +76,25 @@ public final class KSMetaData this.cfMetaData = Collections.unmodifiableMap(cfmap); this.durableWrites = durableWrites; this.userTypes = userTypes; +#endif } // For new user created keyspaces (through CQL) - public static KSMetaData newKeyspace(String name, String strategyName, Map options, boolean durableWrites) throws ConfigurationException - { + static lw_shared_ptr new_keyspace(sstring name, sstring strategy_name, std::unordered_map options, bool durable_writes) { +#if 0 Class cls = AbstractReplicationStrategy.getClass(strategyName); if (cls.equals(LocalStrategy.class)) throw new ConfigurationException("Unable to use given strategy class: LocalStrategy is reserved for internal use."); - - return newKeyspace(name, cls, options, durableWrites, Collections.emptyList()); +#endif + return new_keyspace(name, strategy_name, options, durable_writes, std::vector{}); } - public static KSMetaData newKeyspace(String name, Class strategyClass, Map options, boolean durablesWrites, Iterable cfDefs) + static lw_shared_ptr new_keyspace(sstring name, sstring strategy_name, std::unordered_map options, bool durables_writes, std::vector cf_defs) { - return new KSMetaData(name, strategyClass, options, durablesWrites, cfDefs, new UTMetaData()); + return ::make_lw_shared(name, strategy_name, options, durables_writes, cf_defs, ::make_shared()); } +#if 0 public KSMetaData cloneWithTableRemoved(CFMetaData table) { // clone ksm but do not include the new table @@ -181,4 +191,7 @@ public final class KSMetaData return this; } +#endif +}; + } From 57e6c63b7a5709e9b70031e66ddcd40e641643da Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 10 Mar 2015 11:09:37 +0200 Subject: [PATCH 05/14] cql3: Convert statements.PropertyDefinitions to C++, take 3 Signed-off-by: Pekka Enberg --- cql3/statements/property_definitions.hh | 177 +++++++++++------------- 1 file changed, 82 insertions(+), 95 deletions(-) diff --git a/cql3/statements/property_definitions.hh b/cql3/statements/property_definitions.hh index a9e57b091c..5117adb9f5 100644 --- a/cql3/statements/property_definitions.hh +++ b/cql3/statements/property_definitions.hh @@ -22,15 +22,17 @@ * Modified by Cloudius Systems */ -#ifndef CQL3_STATEMENTS_PROPERTY_DEFINITIONS_HH -#define CQL3_STATEMENTS_PROPERTY_DEFINITIONS_HH +#pragma once #include "exceptions/exceptions.hh" #include "core/print.hh" #include "core/sstring.hh" +#include #include -#include +#include +#include +#include #include @@ -38,17 +40,6 @@ namespace cql3 { namespace statements { -#if 0 -package org.apache.cassandra.cql3.statements; - -import java.util.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.exceptions.SyntaxException; -#endif - class property_definitions { protected: #if 0 @@ -60,7 +51,7 @@ protected: : _properties{} { } public: - virtual void add_property(const sstring& name, sstring value) { + void add_property(const sstring& name, sstring value) { auto it = _properties.find(name); if (it != _properties.end()) { throw exceptions::syntax_exception(sprint("Multiple definition for property '%s'", name)); @@ -68,7 +59,7 @@ public: _properties.emplace(name, value); } - virtual void add_property(const sstring& name, const std::map& value) { + void add_property(const sstring& name, const std::unordered_map& value) { auto it = _properties.find(name); if (it != _properties.end()) { throw exceptions::syntax_exception(sprint("Multiple definition for property '%s'", name)); @@ -76,110 +67,106 @@ public: _properties.emplace(name, value); } -#if 0 - public void validate(Set keywords, Set obsolete) throws SyntaxException - { - for (String name : properties.keySet()) - { - if (keywords.contains(name)) + void validate(std::set keywords, std::set obsolete) { + for (auto&& kv : _properties) { + auto&& name = kv.first; + if (keywords.count(name)) { continue; - - if (obsolete.contains(name)) + } + if (obsolete.count(name)) { +#if 0 logger.warn("Ignoring obsolete property {}", name); - else - throw new SyntaxException(String.format("Unknown property '%s'", name)); +#endif + } else { + throw exceptions::syntax_exception(sprint("Unknown property '%s'", name)); + } + } + } +protected: + std::experimental::optional get_simple(const sstring& name) const { + auto it = _properties.find(name); + if (it == _properties.end()) { + return std::experimental::optional{}; + } + try { + return boost::any_cast(it->second); + } catch (const boost::bad_any_cast& e) { + throw exceptions::syntax_exception(sprint("Invalid value for property '%s'. It should be a string", name)); } } - protected String getSimple(String name) throws SyntaxException - { - Object val = properties.get(name); - if (val == null) - return null; - if (!(val instanceof String)) - throw new SyntaxException(String.format("Invalid value for property '%s'. It should be a string", name)); - return (String)val; + std::experimental::optional> get_map(const sstring& name) const { + auto it = _properties.find(name); + if (it == _properties.end()) { + return std::experimental::optional>{}; + } + try { + return boost::any_cast>(it->second); + } catch (const boost::bad_any_cast& e) { + throw exceptions::syntax_exception(sprint("Invalid value for property '%s'. It should be a map.", name)); + } + } +public: + bool has_property(const sstring& name) const { + return _properties.find(name) != _properties.end(); } - protected Map getMap(String name) throws SyntaxException - { - Object val = properties.get(name); - if (val == null) - return null; - if (!(val instanceof Map)) - throw new SyntaxException(String.format("Invalid value for property '%s'. It should be a map.", name)); - return (Map)val; - } - - public Boolean hasProperty(String name) - { - return properties.containsKey(name); - } - - public String getString(String key, String defaultValue) throws SyntaxException - { - String value = getSimple(key); - return value != null ? value : defaultValue; + sstring get_string(sstring key, sstring default_value) const { + auto value = get_simple(key); + if (value) { + return value.value(); + } else { + return default_value; + } } // Return a property value, typed as a Boolean - public Boolean getBoolean(String key, Boolean defaultValue) throws SyntaxException - { - String value = getSimple(key); - return (value == null) ? defaultValue : value.toLowerCase().matches("(1|true|yes)"); + bool get_boolean(sstring key, bool default_value) const { + auto value = get_simple(key); + if (value) { + std::string s{value.value()}; + std::transform(s.begin(), s.end(), s.begin(), ::tolower); + return s == "1" || s == "true" || s == "yes"; + } else { + return default_value; + } } // Return a property value, typed as a double - public double getDouble(String key, double defaultValue) throws SyntaxException - { - String value = getSimple(key); - if (value == null) - { - return defaultValue; - } - else - { - try - { - return Double.parseDouble(value); - } - catch (NumberFormatException e) - { - throw new SyntaxException(String.format("Invalid double value %s for '%s'", value, key)); + double get_double(sstring key, double default_value) const { + auto value = get_simple(key); + if (value) { + auto val = value.value(); + try { + return std::stod(val); + } catch (const std::exception& e) { + throw exceptions::syntax_exception(sprint("Invalid double value %s for '%s'", val, key)); } + } else { + return default_value; } } // Return a property value, typed as an Integer - public Integer getInt(String key, Integer defaultValue) throws SyntaxException - { - String value = getSimple(key); - return toInt(key, value, defaultValue); + int32_t get_int(sstring key, int32_t default_value) const { + auto value = get_simple(key); + return to_int(key, value, default_value); } - public static Integer toInt(String key, String value, Integer defaultValue) throws SyntaxException - { - if (value == null) - { - return defaultValue; - } - else - { - try - { - return Integer.valueOf(value); - } - catch (NumberFormatException e) - { - throw new SyntaxException(String.format("Invalid integer value %s for '%s'", value, key)); + static int32_t to_int(sstring key, std::experimental::optional value, int32_t default_value) { + if (value) { + auto val = value.value(); + try { + return std::stoi(val); + } catch (const std::exception& e) { + throw exceptions::syntax_exception(sprint("Invalid integer value %s for '%s'", val, key)); } + } else { + return default_value; } } -#endif }; } } - -#endif From 6f15c18f0ad1db7d3c6a0242f25d3e9a8cab4e5e Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 10 Mar 2015 09:40:52 +0200 Subject: [PATCH 06/14] cql3: Convert KSPropDefs to C++, take #2 Signed-off-by: Pekka Enberg --- cql3/statements/ks_prop_defs.hh | 74 ++++++++++++++++----------------- 1 file changed, 35 insertions(+), 39 deletions(-) diff --git a/cql3/statements/ks_prop_defs.hh b/cql3/statements/ks_prop_defs.hh index 4806368053..3d6ba6b9e3 100644 --- a/cql3/statements/ks_prop_defs.hh +++ b/cql3/statements/ks_prop_defs.hh @@ -26,27 +26,23 @@ #define CQL3_STATEMENTS_KS_PROP_DEFS_HH #include "cql3/statements/property_definitions.hh" +#include "config/ks_meta_data.hh" +#include "core/sstring.hh" + +#include namespace cql3 { namespace statements { -#if 0 -package org.apache.cassandra.cql3.statements; - -import java.util.*; - -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.exceptions.*; -#endif - class ks_prop_defs : public property_definitions { +public: + static constexpr auto KW_DURABLE_WRITES = "durable_writes"; + static constexpr auto KW_REPLICATION = "replication"; + + static constexpr auto REPLICATION_STRATEGY_CLASS_KEY = "class"; + #if 0 - public static final String KW_DURABLE_WRITES = "durable_writes"; - public static final String KW_REPLICATION = "replication"; - - public static final String REPLICATION_STRATEGY_CLASS_KEY = "class"; - public static final Set keywords = new HashSet<>(); public static final Set obsoleteKeywords = new HashSet<>(); @@ -55,44 +51,44 @@ class ks_prop_defs : public property_definitions { keywords.add(KW_DURABLE_WRITES); keywords.add(KW_REPLICATION); } - - private String strategyClass; - - public void validate() throws SyntaxException - { +#endif +private: + std::experimental::optional _strategy_class; +public: + void validate() { // Skip validation if the strategy class is already set as it means we've alreayd // prepared (and redoing it would set strategyClass back to null, which we don't want) - if (strategyClass != null) + if (_strategy_class) { return; - + } +#if 0 validate(keywords, obsoleteKeywords); - - Map replicationOptions = getReplicationOptions(); - if (!replicationOptions.isEmpty()) - { - strategyClass = replicationOptions.get(REPLICATION_STRATEGY_CLASS_KEY); - replicationOptions.remove(REPLICATION_STRATEGY_CLASS_KEY); +#endif + auto replication_options = get_replication_options(); + if (!replication_options.empty()) { + _strategy_class = replication_options[REPLICATION_STRATEGY_CLASS_KEY]; + // FIXME + //replication_options.remove(REPLICATION_STRATEGY_CLASS_KEY); } } - public Map getReplicationOptions() throws SyntaxException - { - Map replicationOptions = getMap(KW_REPLICATION); - if (replicationOptions == null) - return Collections.emptyMap(); - return replicationOptions; + std::unordered_map get_replication_options() const { + auto replication_options = get_map(KW_REPLICATION); + if (replication_options) { + return replication_options.value(); + } + return std::unordered_map{}; } - public String getReplicationStrategyClass() - { - return strategyClass; + std::experimental::optional get_replication_strategy_class() const { + return _strategy_class; } - public KSMetaData asKSMetadata(String ksName) throws RequestValidationException - { - return KSMetaData.newKeyspace(ksName, getReplicationStrategyClass(), getReplicationOptions(), getBoolean(KW_DURABLE_WRITES, true)); + lw_shared_ptr as_ks_metadata(sstring ks_name) { + return config::ks_meta_data::new_keyspace(ks_name, get_replication_strategy_class().value(), get_replication_options(), get_boolean(KW_DURABLE_WRITES, true)); } +#if 0 public KSMetaData asKSMetadataUpdate(KSMetaData old) throws RequestValidationException { String sClass = strategyClass; From 2bff7888b66a4fd058114d625f68950c28c9dcf8 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 9 Mar 2015 11:50:15 +0200 Subject: [PATCH 07/14] unimplemented: Add MIGRATIONS Signed-off-by: Pekka Enberg --- unimplemented.cc | 1 + unimplemented.hh | 1 + 2 files changed, 2 insertions(+) diff --git a/unimplemented.cc b/unimplemented.cc index e0da830cee..de091d012c 100644 --- a/unimplemented.cc +++ b/unimplemented.cc @@ -22,6 +22,7 @@ std::ostream& operator<<(std::ostream& out, cause c) { case cause::COLLECTIONS: return out << "COLLECTIONS"; case cause::COUNTERS: return out << "COUNTERS"; case cause::METRICS: return out << "METRICS"; + case cause::MIGRATIONS: return out << "MIGRATIONS"; case cause::COMPACT_TABLES: return out << "COMPACT_TABLES"; case cause::GOSSIP: return out << "GOSSIP"; case cause::TOKEN_RESTRICTION: return out << "TOKEN_RESTRICTION"; diff --git a/unimplemented.hh b/unimplemented.hh index 2b83a9d5e7..54bb552755 100644 --- a/unimplemented.hh +++ b/unimplemented.hh @@ -21,6 +21,7 @@ enum class cause { COLLECTIONS, COUNTERS, METRICS, + MIGRATIONS, COMPACT_TABLES, GOSSIP, TOKEN_RESTRICTION, From 5bdf0e4429baae91fcd9e003140305f6aae6a1fd Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 6 Mar 2015 10:36:22 +0200 Subject: [PATCH 08/14] service: Import MigrationManager.java Signed-off-by: Pekka Enberg --- service/MigrationManager.java | 540 ++++++++++++++++++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 service/MigrationManager.java diff --git a/service/MigrationManager.java b/service/MigrationManager.java new file mode 100644 index 0000000000..45162355ed --- /dev/null +++ b/service/MigrationManager.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.io.DataInput; +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.*; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; + +public class MigrationManager +{ + private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); + + public static final MigrationManager instance = new MigrationManager(); + + private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + + public static final int MIGRATION_DELAY_IN_MS = 60000; + + private final List listeners = new CopyOnWriteArrayList<>(); + + private MigrationManager() {} + + public void register(IMigrationListener listener) + { + listeners.add(listener); + } + + public void unregister(IMigrationListener listener) + { + listeners.remove(listener); + } + + public void scheduleSchemaPull(InetAddress endpoint, EndpointState state) + { + VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); + + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) + maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); + } + + /** + * If versions differ this node sends request with local migration list to the endpoint + * and expecting to receive a list of migrations to apply locally. + */ + private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) + { + if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) + { + logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); + return; + } + + if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) + { + // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately + logger.debug("Submitting migration task for {}", endpoint); + submitMigrationTask(endpoint); + } + else + { + // Include a delay to make sure we have a chance to apply any changes being + // pushed out simultaneously. See CASSANDRA-5025 + Runnable runnable = new Runnable() + { + public void run() + { + // grab the latest version of the schema since it may have changed again since the initial scheduling + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (epState == null) + { + logger.debug("epState vanished for {}, not submitting migration task", endpoint); + return; + } + VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); + UUID currentVersion = UUID.fromString(value.value); + if (Schema.instance.getVersion().equals(currentVersion)) + { + logger.debug("not submitting migration task for {} because our versions match", endpoint); + return; + } + logger.debug("submitting migration task for {}", endpoint); + submitMigrationTask(endpoint); + } + }; + ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); + } + } + + private static Future submitMigrationTask(InetAddress endpoint) + { + /* + * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are + * running in the gossip stage. + */ + return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); + } + + private static boolean shouldPullSchemaFrom(InetAddress endpoint) + { + /* + * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) + * Don't request schema from fat clients + */ + return MessagingService.instance().knowsVersion(endpoint) + && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version + && !Gossiper.instance.isGossipOnlyMember(endpoint); + } + + public static boolean isReadyForBootstrap() + { + return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; + } + + public void notifyCreateKeyspace(KSMetaData ksm) + { + for (IMigrationListener listener : listeners) + listener.onCreateKeyspace(ksm.name); + } + + public void notifyCreateColumnFamily(CFMetaData cfm) + { + for (IMigrationListener listener : listeners) + listener.onCreateColumnFamily(cfm.ksName, cfm.cfName); + } + + public void notifyCreateUserType(UserType ut) + { + for (IMigrationListener listener : listeners) + listener.onCreateUserType(ut.keyspace, ut.getNameAsString()); + } + + public void notifyCreateFunction(UDFunction udf) + { + for (IMigrationListener listener : listeners) + listener.onCreateFunction(udf.name().keyspace, udf.name().name); + } + + public void notifyCreateAggregate(UDAggregate udf) + { + for (IMigrationListener listener : listeners) + listener.onCreateAggregate(udf.name().keyspace, udf.name().name); + } + + public void notifyUpdateKeyspace(KSMetaData ksm) + { + for (IMigrationListener listener : listeners) + listener.onUpdateKeyspace(ksm.name); + } + + public void notifyUpdateColumnFamily(CFMetaData cfm) + { + for (IMigrationListener listener : listeners) + listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); + } + + public void notifyUpdateUserType(UserType ut) + { + for (IMigrationListener listener : listeners) + listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); + } + + public void notifyUpdateFunction(UDFunction udf) + { + for (IMigrationListener listener : listeners) + listener.onUpdateFunction(udf.name().keyspace, udf.name().name); + } + + public void notifyUpdateAggregate(UDAggregate udf) + { + for (IMigrationListener listener : listeners) + listener.onUpdateAggregate(udf.name().keyspace, udf.name().name); + } + + public void notifyDropKeyspace(KSMetaData ksm) + { + for (IMigrationListener listener : listeners) + listener.onDropKeyspace(ksm.name); + } + + public void notifyDropColumnFamily(CFMetaData cfm) + { + for (IMigrationListener listener : listeners) + listener.onDropColumnFamily(cfm.ksName, cfm.cfName); + } + + public void notifyDropUserType(UserType ut) + { + for (IMigrationListener listener : listeners) + listener.onDropUserType(ut.keyspace, ut.getNameAsString()); + } + + public void notifyDropFunction(UDFunction udf) + { + for (IMigrationListener listener : listeners) + listener.onDropFunction(udf.name().keyspace, udf.name().name); + } + + public void notifyDropAggregate(UDAggregate udf) + { + for (IMigrationListener listener : listeners) + listener.onDropAggregate(udf.name().keyspace, udf.name().name); + } + + public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException + { + announceNewKeyspace(ksm, false); + } + + public static void announceNewKeyspace(KSMetaData ksm, boolean announceLocally) throws ConfigurationException + { + announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally); + } + + public static void announceNewKeyspace(KSMetaData ksm, long timestamp, boolean announceLocally) throws ConfigurationException + { + ksm.validate(); + + if (Schema.instance.getKSMetaData(ksm.name) != null) + throw new AlreadyExistsException(ksm.name); + + logger.info(String.format("Create new Keyspace: %s", ksm)); + announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally); + } + + public static void announceNewColumnFamily(CFMetaData cfm) throws ConfigurationException + { + announceNewColumnFamily(cfm, false); + } + + public static void announceNewColumnFamily(CFMetaData cfm, boolean announceLocally) throws ConfigurationException + { + cfm.validate(); + + KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); + if (ksm == null) + throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName)); + else if (ksm.cfMetaData().containsKey(cfm.cfName)) + throw new AlreadyExistsException(cfm.ksName, cfm.cfName); + + logger.info(String.format("Create new table: %s", cfm)); + announce(LegacySchemaTables.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceNewType(UserType newType, boolean announceLocally) + { + KSMetaData ksm = Schema.instance.getKSMetaData(newType.keyspace); + announce(LegacySchemaTables.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceNewFunction(UDFunction udf, boolean announceLocally) + { + logger.info(String.format("Create scalar function '%s'", udf.name())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) + { + logger.info(String.format("Create aggregate function '%s'", udf.name())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException + { + announceKeyspaceUpdate(ksm, false); + } + + public static void announceKeyspaceUpdate(KSMetaData ksm, boolean announceLocally) throws ConfigurationException + { + ksm.validate(); + + KSMetaData oldKsm = Schema.instance.getKSMetaData(ksm.name); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); + + logger.info(String.format("Update Keyspace '%s' From %s To %s", ksm.name, oldKsm, ksm)); + announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift) throws ConfigurationException + { + announceColumnFamilyUpdate(cfm, fromThrift, false); + } + + public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift, boolean announceLocally) throws ConfigurationException + { + cfm.validate(); + + CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName); + if (oldCfm == null) + throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName)); + KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); + + oldCfm.validateCompatility(cfm); + + logger.info(String.format("Update table '%s/%s' From %s To %s", cfm.ksName, cfm.cfName, oldCfm, cfm)); + announce(LegacySchemaTables.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally); + } + + public static void announceTypeUpdate(UserType updatedType, boolean announceLocally) + { + announceNewType(updatedType, announceLocally); + } + + public static void announceKeyspaceDrop(String ksName) throws ConfigurationException + { + announceKeyspaceDrop(ksName, false); + } + + public static void announceKeyspaceDrop(String ksName, boolean announceLocally) throws ConfigurationException + { + KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); + + logger.info(String.format("Drop Keyspace '%s'", oldKsm.name)); + announce(LegacySchemaTables.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException + { + announceColumnFamilyDrop(ksName, cfName, false); + } + + public static void announceColumnFamilyDrop(String ksName, String cfName, boolean announceLocally) throws ConfigurationException + { + CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName); + if (oldCfm == null) + throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); + KSMetaData ksm = Schema.instance.getKSMetaData(ksName); + + logger.info(String.format("Drop table '%s/%s'", oldCfm.ksName, oldCfm.cfName)); + announce(LegacySchemaTables.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceTypeDrop(UserType droppedType) + { + announceTypeDrop(droppedType, false); + } + + public static void announceTypeDrop(UserType droppedType, boolean announceLocally) + { + KSMetaData ksm = Schema.instance.getKSMetaData(droppedType.keyspace); + announce(LegacySchemaTables.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceFunctionDrop(UDFunction udf, boolean announceLocally) + { + logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally) + { + logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + /** + * actively announce a new version to active hosts via rpc + * @param schema The schema mutation to be applied + */ + private static void announce(Mutation schema, boolean announceLocally) + { + if (announceLocally) + { + try + { + LegacySchemaTables.mergeSchema(Collections.singletonList(schema), false); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + else + { + FBUtilities.waitOnFuture(announce(Collections.singletonList(schema))); + } + } + + private static void pushSchemaMutation(InetAddress endpoint, Collection schema) + { + MessageOut> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE, + schema, + MigrationsSerializer.instance); + MessagingService.instance().sendOneWay(msg, endpoint); + } + + // Returns a future on the local application of the schema + private static Future announce(final Collection schema) + { + Future f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable() + { + protected void runMayThrow() throws IOException, ConfigurationException + { + LegacySchemaTables.mergeSchema(schema); + } + }); + + for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) + { + // only push schema to nodes with known and equal versions + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && + MessagingService.instance().knowsVersion(endpoint) && + MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) + pushSchemaMutation(endpoint, schema); + } + + return f; + } + + /** + * Announce my version passively over gossip. + * Used to notify nodes as they arrive in the cluster. + * + * @param version The schema version to announce + */ + public static void passiveAnnounce(UUID version) + { + Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); + logger.debug("Gossiping my schema version {}", version); + } + + /** + * Clear all locally stored schema information and reset schema to initial state. + * Called by user (via JMX) who wants to get rid of schema disagreement. + * + * @throws IOException if schema tables truncation fails + */ + public static void resetLocalSchema() throws IOException + { + logger.info("Starting local schema reset..."); + + logger.debug("Truncating schema tables..."); + + LegacySchemaTables.truncateSchemaTables(); + + logger.debug("Clearing local schema keyspace definitions..."); + + Schema.instance.clear(); + + Set liveEndpoints = Gossiper.instance.getLiveMembers(); + liveEndpoints.remove(FBUtilities.getBroadcastAddress()); + + // force migration if there are nodes around + for (InetAddress node : liveEndpoints) + { + if (shouldPullSchemaFrom(node)) + { + logger.debug("Requesting schema from {}", node); + FBUtilities.waitOnFuture(submitMigrationTask(node)); + break; + } + } + + logger.info("Local schema reset is complete."); + } + + public static class MigrationsSerializer implements IVersionedSerializer> + { + public static MigrationsSerializer instance = new MigrationsSerializer(); + + public void serialize(Collection schema, DataOutputPlus out, int version) throws IOException + { + out.writeInt(schema.size()); + for (Mutation mutation : schema) + Mutation.serializer.serialize(mutation, out, version); + } + + public Collection deserialize(DataInput in, int version) throws IOException + { + int count = in.readInt(); + Collection schema = new ArrayList<>(count); + + for (int i = 0; i < count; i++) + schema.add(Mutation.serializer.deserialize(in, version)); + + return schema; + } + + public long serializedSize(Collection schema, int version) + { + int size = TypeSizes.NATIVE.sizeof(schema.size()); + for (Mutation mutation : schema) + size += Mutation.serializer.serializedSize(mutation, version); + return size; + } + } +} From bcb6c1b9c32b2a9c8821f956bcd2c66c64313e54 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 9 Mar 2015 11:45:13 +0200 Subject: [PATCH 09/14] service: Convert MigrationManager to C++ Signed-off-by: Pekka Enberg --- ...ationManager.java => migration_manager.hh} | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) rename service/{MigrationManager.java => migration_manager.hh} (98%) diff --git a/service/MigrationManager.java b/service/migration_manager.hh similarity index 98% rename from service/MigrationManager.java rename to service/migration_manager.hh index 45162355ed..36454c636f 100644 --- a/service/MigrationManager.java +++ b/service/migration_manager.hh @@ -15,6 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include "config/ks_meta_data.hh" + +#if 0 package org.apache.cassandra.service; import java.io.DataInput; @@ -52,9 +64,12 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; +#endif -public class MigrationManager -{ +namespace service { + +class migration_manager { +#if 0 private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); public static final MigrationManager instance = new MigrationManager(); @@ -152,7 +167,9 @@ public class MigrationManager && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version && !Gossiper.instance.isGossipOnlyMember(endpoint); } - +#endif +public: +#if 0 public static boolean isReadyForBootstrap() { return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; @@ -252,12 +269,16 @@ public class MigrationManager { announceNewKeyspace(ksm, false); } +#endif - public static void announceNewKeyspace(KSMetaData ksm, boolean announceLocally) throws ConfigurationException - { + static void announce_new_keyspace(lw_shared_ptr ksm, bool announce_locally) { + warn(unimplemented::cause::MIGRATIONS); +#if 0 announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally); +#endif } +#if 0 public static void announceNewKeyspace(KSMetaData ksm, long timestamp, boolean announceLocally) throws ConfigurationException { ksm.validate(); @@ -537,4 +558,7 @@ public class MigrationManager return size; } } +#endif +}; + } From 07e73bdb26170f50bf64bc659a4e426f1cfa2ca9 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 10 Mar 2015 11:09:21 +0200 Subject: [PATCH 10/14] cql3/Cql.g: Implement convert_property_map() helper Signed-off-by: Pekka Enberg --- cql3/Cql.g | 51 +++++++++++++++++++++++---------------------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/cql3/Cql.g b/cql3/Cql.g index bad935ef0e..622783b0a6 100644 --- a/cql3/Cql.g +++ b/cql3/Cql.g @@ -153,44 +153,39 @@ using operations_type = std::vectorsyntax_error(*this, msg); } - std::map convert_property_map(shared_ptr map) { - throw std::runtime_error("not implemented"); -#if 0 - if (map == null || map.entries == null || map.entries.isEmpty()) - return Collections.emptyMap(); - - Map res = new HashMap(map.entries.size()); - - for (Pair entry : map.entries) - { + std::unordered_map convert_property_map(shared_ptr map) { + if (!map || map->entries.empty()) { + return std::unordered_map{}; + } + std::unordered_map res{map->entries.size()}; + for (auto&& entry : map->entries) { // Because the parser tries to be smart and recover on error (to // allow displaying more than one error I suppose), we have null // entries in there. Just skip those, a proper error will be thrown in the end. - if (entry.left == null || entry.right == null) - break; - - if (!(entry.left instanceof Constants.Literal)) - { - String msg = "Invalid property name: " + entry.left; - if (entry.left instanceof AbstractMarker.Raw) - msg += " (bind variables are not supported in DDL queries)"; - addRecognitionError(msg); + if (!entry.first || !entry.second) { break; } - if (!(entry.right instanceof Constants.Literal)) - { - String msg = "Invalid property value: " + entry.right + " for property: " + entry.left; - if (entry.right instanceof AbstractMarker.Raw) + auto left = dynamic_pointer_cast(entry.first); + if (!left) { + sstring msg = "Invalid property name: " + entry.first->to_string(); + if (dynamic_pointer_cast(entry.first)) { msg += " (bind variables are not supported in DDL queries)"; - addRecognitionError(msg); + } + add_recognition_error(msg); break; } - - res.put(((Constants.Literal)entry.left).getRawText(), ((Constants.Literal)entry.right).getRawText()); + auto right = dynamic_pointer_cast(entry.second); + if (!right) { + sstring msg = "Invalid property value: " + entry.first->to_string() + " for property: " + entry.second->to_string(); + if (dynamic_pointer_cast(entry.second)) { + msg += " (bind variables are not supported in DDL queries)"; + } + add_recognition_error(msg); + break; + } + res.emplace(left->get_raw_text(), right->get_raw_text()); } - return res; -#endif } void add_raw_update(std::vector,::shared_ptr>>& operations, From 5b3922f860762628cae482b15a79a360677a879a Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 9 Mar 2015 11:06:49 +0200 Subject: [PATCH 11/14] transport: Convert Event.SchemaChange constructors, take #2 Signed-off-by: Pekka Enberg --- transport/event.hh | 14 +++++++------- transport/server.cc | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/transport/event.hh b/transport/event.hh index d429c9ecb9..4fb8b7dd2e 100644 --- a/transport/event.hh +++ b/transport/event.hh @@ -26,6 +26,8 @@ #include "core/sstring.hh" +#include + namespace transport { class event { @@ -221,9 +223,9 @@ public: const change_type change; const target_type target; const sstring keyspace; - const sstring table_or_type_or_function; + const std::experimental::optional table_or_type_or_function; - schema_change(const change_type change_, const target_type target_, const sstring& keyspace_, const sstring& table_or_type_or_function_) + schema_change(const change_type change_, const target_type target_, const sstring& keyspace_, const std::experimental::optional& table_or_type_or_function_) : event{event_type::SCHEMA_CHANGE} , change{change_} , target{target_} @@ -236,12 +238,10 @@ public: #endif } + schema_change(const change_type change_, const sstring keyspace_) + : schema_change{change_, target_type::KEYSPACE, keyspace_, std::experimental::optional{}} + { } #if 0 - public SchemaChange(Change change, String keyspace) - { - this(change, Target.KEYSPACE, keyspace, null); - } - // Assumes the type has already been deserialized public static SchemaChange deserializeEvent(ByteBuf cb, int version) { diff --git a/transport/server.cc b/transport/server.cc index 9cf21165d2..08552eeb26 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -499,7 +499,7 @@ public: _response->write_string(to_string(sc->target)); _response->write_string(sc->keyspace); if (sc->target != transport::event::schema_change::target_type::KEYSPACE) { - _response->write_string(sc->table_or_type_or_function); + _response->write_string(sc->table_or_type_or_function.value()); } break; } From f2d0f325d457240445e6ad93dde86c191e2bf4c9 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 9 Mar 2015 11:19:31 +0200 Subject: [PATCH 12/14] exceptions: Add already_exists_exception class Signed-off-by: Pekka Enberg --- exceptions/exceptions.hh | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/exceptions/exceptions.hh b/exceptions/exceptions.hh index bc4eb85c98..bcc2c9b6e9 100644 --- a/exceptions/exceptions.hh +++ b/exceptions/exceptions.hh @@ -27,6 +27,7 @@ #include #include "core/sstring.hh" +#include "core/print.hh" namespace exceptions { @@ -106,6 +107,37 @@ public: { } }; +class configuration_exception : public request_validation_exception { +public: + configuration_exception(sstring msg) + : request_validation_exception{exception_code::CONFIG_ERROR, std::move(msg)} + { } + + configuration_exception(exception_code code, sstring msg) + : request_validation_exception{code, std::move(msg)} + { } +}; + +class already_exists_exception : public configuration_exception { +public: + const sstring ks_name; + const sstring cf_name; +private: + already_exists_exception(sstring ks_name_, sstring cf_name_, sstring msg) + : configuration_exception{exception_code::ALREADY_EXISTS, msg} + , ks_name{ks_name_} + , cf_name{cf_name_} + { } +public: + already_exists_exception(sstring ks_name_, sstring cf_name_) + : already_exists_exception{ks_name_, cf_name_, sprint("Cannot add already existing table \"%s\" to keyspace \"%s\"", cf_name_, ks_name_)} + { } + + already_exists_exception(sstring ks_name_) + : already_exists_exception{ks_name_, "", sprint("Cannot add existing keyspace \"%s\"", ks_name_)} + { } +}; + class recognition_exception : public std::runtime_error { public: recognition_exception(const std::string& msg) : std::runtime_error(msg) {}; @@ -117,5 +149,4 @@ public: }; } - #endif From e1948dcc0c8d1a8fd760ea4c1d1d51b90a9be07b Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 9 Mar 2015 11:05:31 +0200 Subject: [PATCH 13/14] cql3: Convert create_keyspace_statement to C++, take 2 Signed-off-by: Pekka Enberg --- cql3/statements/create_keyspace_statement.hh | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cql3/statements/create_keyspace_statement.hh b/cql3/statements/create_keyspace_statement.hh index 9b9c6071c4..96fa7305a9 100644 --- a/cql3/statements/create_keyspace_statement.hh +++ b/cql3/statements/create_keyspace_statement.hh @@ -26,6 +26,9 @@ #include "cql3/statements/schema_altering_statement.hh" #include "cql3/statements/ks_prop_defs.hh" +#include "service/migration_manager.hh" +#include "config/ks_meta_data.hh" +#include "transport/event.hh" #include "core/shared_ptr.hh" @@ -104,12 +107,14 @@ public: throw new InvalidRequestException(String.format("\"%s\" is not a valid keyspace name", name)); if (name.length() > Schema.NAME_LENGTH) throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, name)); +#endif - attrs.validate(); - - if (attrs.getReplicationStrategyClass() == null) - throw new ConfigurationException("Missing mandatory replication strategy class"); + _attrs->validate(); + if (!bool(_attrs->get_replication_strategy_class())) { + throw exceptions::configuration_exception("Missing mandatory replication strategy class"); + } +#if 0 // The strategy is validated through KSMetaData.validate() in announceNewKeyspace below. // However, for backward compatibility with thrift, this doesn't validate unexpected options yet, // so doing proper validation here. @@ -122,24 +127,19 @@ public: } virtual bool announce_migration(bool is_local_only) override { - throw std::runtime_error("not implemented"); -#if 0 try { - MigrationManager.announceNewKeyspace(attrs.asKSMetadata(name), isLocalOnly); + service::migration_manager::announce_new_keyspace(_attrs->as_ks_metadata(_name), is_local_only); return true; - } catch (AlreadyExistsException e) { - if (ifNotExists) + } catch (const exceptions::already_exists_exception& e) { + if (_if_not_exists) { return false; + } throw e; } -#endif } virtual shared_ptr change_event() override { - throw std::runtime_error("not implemented"); -#if 0 - return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, keyspace()); -#endif + return make_shared(transport::event::schema_change::change_type::CREATED, keyspace()); } }; From 3f019ee9c0488f7913ff9f505dd86d538dccb80a Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 4 Mar 2015 14:34:56 +0200 Subject: [PATCH 14/14] tests: create keyspace statement test case Signed-off-by: Pekka Enberg --- tests/urchin/cql_query_test.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index fa3f0e57ac..ccf545d8ac 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -72,6 +72,17 @@ static future<> require_column_has_value(distributed& ddb, const sstri }); } +SEASTAR_TEST_CASE(test_create_keyspace_statement) { + auto db = make_shared>(); + auto state = make_shared(*db, ks_name); + + return db->start().then([state] { + return state->execute_cql("create keyspace ks with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").discard_result(); + }).finally([db] { + return db->stop().finally([db] {}); + }); +} + SEASTAR_TEST_CASE(test_insert_statement) { auto db = make_shared>(); auto state = make_shared(*db, ks_name);