From 32378708d0dcd17f828baa3e3e70d45e9e2f02a0 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Jul 2015 09:36:36 +0300 Subject: [PATCH 1/6] db/consistency_level: Remove ifdef'd code Cleanup consistency_level.hh by removing untranslated code that's been sitting in the tree for a while. Signed-off-by: Pekka Enberg --- db/consistency_level.hh | 64 ----------------------------------------- 1 file changed, 64 deletions(-) diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 4e74769ede..0b5c1e5cf3 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -37,29 +37,6 @@ namespace db { -#if 0 -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.Iterables; -import net.nicoulaj.compilecommand.annotations.Inline; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.ReadRepairDecision; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.UnavailableException; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.NetworkTopologyStrategy; -import org.apache.cassandra.transport.ProtocolException; -#endif - enum class consistency_level { ANY, ONE, @@ -89,47 +66,6 @@ struct unavailable_exception : exceptions::cassandra_exception { {} }; -#if 0 - private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class); - - // Used by the binary protocol - public final int code; - private final boolean isDCLocal; - private static final ConsistencyLevel[] codeIdx; - static - { - int maxCode = -1; - for (ConsistencyLevel cl : ConsistencyLevel.values()) - maxCode = Math.max(maxCode, cl.code); - codeIdx = new ConsistencyLevel[maxCode + 1]; - for (ConsistencyLevel cl : ConsistencyLevel.values()) - { - if (codeIdx[cl.code] != null) - throw new IllegalStateException("Duplicate code"); - codeIdx[cl.code] = cl; - } - } - - private ConsistencyLevel(int code) - { - this(code, false); - } - - private ConsistencyLevel(int code, boolean isDCLocal) - { - this.code = code; - this.isDCLocal = isDCLocal; - } - - public static ConsistencyLevel fromCode(int code) - { - if (code < 0 || code >= codeIdx.length) - throw new ProtocolException(String.format("Unknown code %d for a consistency level", code)); - return codeIdx[code]; - } - -#endif - inline size_t quorum_for(keyspace& ks) { return (ks.get_replication_strategy().get_replication_factor() / 2) + 1; } From 7fc1311d4aebd2863ffad879a8577cb00bd740c7 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Jul 2015 09:46:23 +0300 Subject: [PATCH 2/6] db/consistency_level: Move implementation to .cc file Signed-off-by: Pekka Enberg --- configure.py | 1 + db/consistency_level.cc | 299 ++++++++++++++++++++++++++++++++++++++++ db/consistency_level.hh | 275 ++++-------------------------------- 3 files changed, 325 insertions(+), 250 deletions(-) create mode 100644 db/consistency_level.cc diff --git a/configure.py b/configure.py index 90776068fc..d63e797206 100755 --- a/configure.py +++ b/configure.py @@ -266,6 +266,7 @@ urchin_core = (['database.cc', 'cql3/selection/selection.cc', 'cql3/selection/selector.cc', 'cql3/restrictions/statement_restrictions.cc', + 'db/consistency_level.cc', 'db/system_keyspace.cc', 'db/legacy_schema_tables.cc', 'db/commitlog/commitlog.cc', diff --git a/db/consistency_level.cc b/db/consistency_level.cc new file mode 100644 index 0000000000..8d7f05b369 --- /dev/null +++ b/db/consistency_level.cc @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#include "db/consistency_level.hh" + +#include +#include "exceptions/exceptions.hh" +#include "core/sstring.hh" +#include "schema.hh" +#include "database.hh" +#include "unimplemented.hh" +#include "db/read_repair_decision.hh" +#include "locator/abstract_replication_strategy.hh" +#include "locator/network_topology_strategy.hh" +#include "utils/fb_utilities.hh" + +namespace db { + +size_t quorum_for(keyspace& ks) { + return (ks.get_replication_strategy().get_replication_factor() / 2) + 1; +} + +size_t local_quorum_for(keyspace& ks, const sstring& dc) { + using namespace locator; + + auto& rs = ks.get_replication_strategy(); + + if (rs.get_type() == replication_strategy_type::network_topology) { + network_topology_strategy* nrs = + static_cast(&rs); + + return (nrs->get_replication_factor(dc) / 2) + 1; + } + + return quorum_for(ks); +} + +size_t block_for_local_serial(keyspace& ks) { + using namespace locator; + + // + // TODO: Consider caching the final result in order to avoid all these + // useless dereferencing. Note however that this will introduce quite + // a lot of complications since both snitch output for a local host + // and the snitch itself (and thus its output) may change dynamically. + // + auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); + auto local_addr = utils::fb_utilities::get_broadcast_address(); + + return local_quorum_for(ks, snitch_ptr->get_datacenter(local_addr)); +} + +size_t block_for_each_quorum(keyspace& ks) { + using namespace locator; + + auto& rs = ks.get_replication_strategy(); + + if (rs.get_type() == replication_strategy_type::network_topology) { + network_topology_strategy* nrs = + static_cast(&rs); + size_t n = 0; + + for (auto& dc : nrs->get_datacenters()) { + n += local_quorum_for(ks, dc); + } + + return n; + } else { + return quorum_for(ks); + } +} + +size_t block_for(keyspace& ks, consistency_level cl) { + switch (cl) { + case consistency_level::ONE: + case consistency_level::LOCAL_ONE: + return 1; + case consistency_level::ANY: + return 1; + case consistency_level::TWO: + return 2; + case consistency_level::THREE: + return 3; + case consistency_level::QUORUM: + case consistency_level::SERIAL: + return quorum_for(ks); + case consistency_level::ALL: + return ks.get_replication_strategy().get_replication_factor(); + case consistency_level::LOCAL_QUORUM: + case consistency_level::LOCAL_SERIAL: + return block_for_local_serial(ks); + case consistency_level::EACH_QUORUM: + return block_for_each_quorum(ks); + default: + abort(); + } +} + +bool is_datacenter_local(consistency_level l) { + return l == consistency_level::LOCAL_ONE || l == consistency_level::LOCAL_QUORUM; +} + +bool is_local(gms::inet_address endpoint) { + using namespace locator; + + auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); + auto local_addr = utils::fb_utilities::get_broadcast_address(); + + return snitch_ptr->get_datacenter(local_addr) == + snitch_ptr->get_datacenter(endpoint); +} + +std::vector +filter_for_query_dc_local(consistency_level cl, + keyspace& ks, + const std::vector& live_endpoints) { + using namespace gms; + + std::vector local; + std::vector other; + local.reserve(live_endpoints.size()); + other.reserve(live_endpoints.size()); + + std::partition_copy(live_endpoints.begin(), live_endpoints.end(), + std::back_inserter(local), std::back_inserter(other), + is_local); + + // check if blockfor more than we have localep's + size_t bf = block_for(ks, cl); + if (local.size() < bf) { + size_t other_items_count = std::min(bf - local.size(), other.size()); + local.reserve(local.size() + other_items_count); + + std::move(other.begin(), other.begin() + other_items_count, + std::back_inserter(local)); + } + + return local; +} + +std::vector +filter_for_query(consistency_level cl, + keyspace& ks, + std::vector live_endpoints, + read_repair_decision read_repair) { + /* + * Endpoints are expected to be restricted to live replicas, sorted by + * snitch preference. For LOCAL_QUORUM, move local-DC replicas in front + * first as we need them there whether we do read repair (since the first + * replica gets the data read) or not (since we'll take the block_for first + * ones). + */ + if (is_datacenter_local(cl)) { + boost::range::partition(live_endpoints, is_local); + } + + switch (read_repair) { + case read_repair_decision::NONE: + { + size_t start_pos = std::min(live_endpoints.size(), block_for(ks, cl)); + + live_endpoints.erase(live_endpoints.begin() + start_pos, live_endpoints.end()); + } + // fall through + case read_repair_decision::GLOBAL: + return std::move(live_endpoints); + case read_repair_decision::DC_LOCAL: + return filter_for_query_dc_local(cl, ks, live_endpoints); + default: + throw std::runtime_error("Unknown read repair type"); + } +} + +std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints) { + return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE); +} + +bool +is_sufficient_live_nodes(consistency_level cl, + keyspace& ks, + const std::vector& live_endpoints) { + using namespace locator; + + switch (cl) { + case consistency_level::ANY: + // local hint is acceptable, and local node is always live + return true; + case consistency_level::LOCAL_ONE: + return count_local_endpoints(live_endpoints) >= 1; + case consistency_level::LOCAL_QUORUM: + return count_local_endpoints(live_endpoints) >= block_for(ks, cl); + case consistency_level::EACH_QUORUM: + { + auto& rs = ks.get_replication_strategy(); + + if (rs.get_type() == replication_strategy_type::network_topology) { + for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) { + if (entry.second < local_quorum_for(ks, entry.first)) { + return false; + } + } + + return true; + } + } + // Fallthough on purpose for SimpleStrategy + default: + return live_endpoints.size() >= block_for(ks, cl); + } +} + +void validate_for_read(const sstring& keyspace_name, consistency_level cl) { + switch (cl) { + case consistency_level::ANY: + throw exceptions::invalid_request_exception("ANY ConsistencyLevel is only supported for writes"); + case consistency_level::EACH_QUORUM: + throw exceptions::invalid_request_exception("EACH_QUORUM ConsistencyLevel is only supported for writes"); + default: + break; + } +} + +void validate_for_write(const sstring& keyspace_name, consistency_level cl) { + switch (cl) { + case consistency_level::SERIAL: + case consistency_level::LOCAL_SERIAL: + throw exceptions::invalid_request_exception("You must use conditional updates for serializable writes"); + default: + break; + } +} + +#if 0 + // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL + public void validateForCasCommit(String keyspaceName) throws InvalidRequestException + { + switch (this) + { + case EACH_QUORUM: + requireNetworkTopologyStrategy(keyspaceName); + break; + case SERIAL: + case LOCAL_SERIAL: + throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\""); + } + } + + public void validateForCas() throws InvalidRequestException + { + if (!isSerialConsistency()) + throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL"); + } +#endif + +bool is_serial_consistency(consistency_level cl) { + return cl == consistency_level::SERIAL || cl == consistency_level::LOCAL_SERIAL; +} + +void validate_counter_for_write(schema_ptr s, consistency_level cl) { + if (cl == consistency_level::ANY) { + throw exceptions::invalid_request_exception(sprint("Consistency level ANY is not yet supported for counter table %s", s->cf_name())); + } + + if (is_serial_consistency(cl)) { + throw exceptions::invalid_request_exception("Counter operations are inherently non-serializable"); + } +} + +#if 0 + private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException + { + AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy(); + if (!(strategy instanceof NetworkTopologyStrategy)) + throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName())); + } +#endif + +} diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 0b5c1e5cf3..9a14260473 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -24,16 +24,15 @@ #pragma once -#include -#include "exceptions/exceptions.hh" -#include "core/sstring.hh" -#include "schema.hh" -#include "database.hh" -#include "unimplemented.hh" -#include "db/read_repair_decision.hh" -#include "locator/abstract_replication_strategy.hh" #include "locator/network_topology_strategy.hh" +#include "db/read_repair_decision.hh" +#include "exceptions/exceptions.hh" #include "utils/fb_utilities.hh" +#include "gms/inet_address.hh" +#include "database.hh" + +#include +#include namespace db { @@ -66,170 +65,37 @@ struct unavailable_exception : exceptions::cassandra_exception { {} }; -inline size_t quorum_for(keyspace& ks) { - return (ks.get_replication_strategy().get_replication_factor() / 2) + 1; -} +size_t quorum_for(keyspace& ks); -inline size_t local_quorum_for(keyspace& ks, const sstring& dc) { - using namespace locator; +size_t local_quorum_for(keyspace& ks, const sstring& dc); - auto& rs = ks.get_replication_strategy(); +size_t block_for_local_serial(keyspace& ks); - if (rs.get_type() == replication_strategy_type::network_topology) { - network_topology_strategy* nrs = - static_cast(&rs); +size_t block_for_each_quorum(keyspace& ks); - return (nrs->get_replication_factor(dc) / 2) + 1; - } +size_t block_for(keyspace& ks, consistency_level cl); - return quorum_for(ks); -} +bool is_datacenter_local(consistency_level l); -inline size_t block_for_local_serial(keyspace& ks) { - using namespace locator; - - // - // TODO: Consider caching the final result in order to avoid all these - // useless dereferencing. Note however that this will introduce quite - // a lot of complications since both snitch output for a local host - // and the snitch itself (and thus its output) may change dynamically. - // - auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); - auto local_addr = utils::fb_utilities::get_broadcast_address(); - - return local_quorum_for(ks, snitch_ptr->get_datacenter(local_addr)); -} - -inline size_t block_for_each_quorum(keyspace& ks) { - using namespace locator; - - auto& rs = ks.get_replication_strategy(); - - if (rs.get_type() == replication_strategy_type::network_topology) { - network_topology_strategy* nrs = - static_cast(&rs); - size_t n = 0; - - for (auto& dc : nrs->get_datacenters()) { - n += local_quorum_for(ks, dc); - } - - return n; - } else { - return quorum_for(ks); - } -} - -inline size_t block_for(keyspace& ks, consistency_level cl) { - switch (cl) { - case consistency_level::ONE: - case consistency_level::LOCAL_ONE: - return 1; - case consistency_level::ANY: - return 1; - case consistency_level::TWO: - return 2; - case consistency_level::THREE: - return 3; - case consistency_level::QUORUM: - case consistency_level::SERIAL: - return quorum_for(ks); - case consistency_level::ALL: - return ks.get_replication_strategy().get_replication_factor(); - case consistency_level::LOCAL_QUORUM: - case consistency_level::LOCAL_SERIAL: - return block_for_local_serial(ks); - case consistency_level::EACH_QUORUM: - return block_for_each_quorum(ks); - default: - abort(); - } -} - -inline bool is_datacenter_local(consistency_level l) { - return l == consistency_level::LOCAL_ONE || l == consistency_level::LOCAL_QUORUM; -} - -inline bool is_local(gms::inet_address endpoint) { - using namespace locator; - - auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); - auto local_addr = utils::fb_utilities::get_broadcast_address(); - - return snitch_ptr->get_datacenter(local_addr) == - snitch_ptr->get_datacenter(endpoint); -} +bool is_local(gms::inet_address endpoint); template inline size_t count_local_endpoints(Range& live_endpoints) { return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local); } -inline std::vector +std::vector filter_for_query_dc_local(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints) { - using namespace gms; + const std::vector& live_endpoints); - std::vector local; - std::vector other; - local.reserve(live_endpoints.size()); - other.reserve(live_endpoints.size()); - - std::partition_copy(live_endpoints.begin(), live_endpoints.end(), - std::back_inserter(local), std::back_inserter(other), - is_local); - - // check if blockfor more than we have localep's - size_t bf = block_for(ks, cl); - if (local.size() < bf) { - size_t other_items_count = std::min(bf - local.size(), other.size()); - local.reserve(local.size() + other_items_count); - - std::move(other.begin(), other.begin() + other_items_count, - std::back_inserter(local)); - } - - return local; -} - -inline std::vector +std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector live_endpoints, - read_repair_decision read_repair) { - /* - * Endpoints are expected to be restricted to live replicas, sorted by - * snitch preference. For LOCAL_QUORUM, move local-DC replicas in front - * first as we need them there whether we do read repair (since the first - * replica gets the data read) or not (since we'll take the block_for first - * ones). - */ - if (is_datacenter_local(cl)) { - boost::range::partition(live_endpoints, is_local); - } + read_repair_decision read_repair); - switch (read_repair) { - case read_repair_decision::NONE: - { - size_t start_pos = std::min(live_endpoints.size(), block_for(ks, cl)); - - live_endpoints.erase(live_endpoints.begin() + start_pos, live_endpoints.end()); - } - // fall through - case read_repair_decision::GLOBAL: - return std::move(live_endpoints); - case read_repair_decision::DC_LOCAL: - return filter_for_query_dc_local(cl, ks, live_endpoints); - default: - throw std::runtime_error("Unknown read repair type"); - } -} - -inline -std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints) { - return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE); -} +std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints); template inline std::unordered_map count_per_dc_endpoints( @@ -260,39 +126,10 @@ inline std::unordered_map count_per_dc_endpoints( return dc_endpoints; } -inline bool +bool is_sufficient_live_nodes(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints) { - using namespace locator; - - switch (cl) { - case consistency_level::ANY: - // local hint is acceptable, and local node is always live - return true; - case consistency_level::LOCAL_ONE: - return count_local_endpoints(live_endpoints) >= 1; - case consistency_level::LOCAL_QUORUM: - return count_local_endpoints(live_endpoints) >= block_for(ks, cl); - case consistency_level::EACH_QUORUM: - { - auto& rs = ks.get_replication_strategy(); - - if (rs.get_type() == replication_strategy_type::network_topology) { - for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) { - if (entry.second < local_quorum_for(ks, entry.first)) { - return false; - } - } - - return true; - } - } - // Fallthough on purpose for SimpleStrategy - default: - return live_endpoints.size() >= block_for(ks, cl); - } -} + const std::vector& live_endpoints); template inline bool assure_sufficient_live_nodes_each_quorum( @@ -370,74 +207,12 @@ inline void assure_sufficient_live_nodes( } } -static inline -void validate_for_read(const sstring& keyspace_name, consistency_level cl) { - switch (cl) { - case consistency_level::ANY: - throw exceptions::invalid_request_exception("ANY ConsistencyLevel is only supported for writes"); - case consistency_level::EACH_QUORUM: - throw exceptions::invalid_request_exception("EACH_QUORUM ConsistencyLevel is only supported for writes"); - default: - break; - } -} +void validate_for_read(const sstring& keyspace_name, consistency_level cl); -static inline -void validate_for_write(const sstring& keyspace_name, consistency_level cl) { - switch (cl) { - case consistency_level::SERIAL: - case consistency_level::LOCAL_SERIAL: - throw exceptions::invalid_request_exception("You must use conditional updates for serializable writes"); - default: - break; - } -} +void validate_for_write(const sstring& keyspace_name, consistency_level cl); -#if 0 - // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL - public void validateForCasCommit(String keyspaceName) throws InvalidRequestException - { - switch (this) - { - case EACH_QUORUM: - requireNetworkTopologyStrategy(keyspaceName); - break; - case SERIAL: - case LOCAL_SERIAL: - throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\""); - } - } +bool is_serial_consistency(consistency_level cl); - public void validateForCas() throws InvalidRequestException - { - if (!isSerialConsistency()) - throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL"); - } -#endif - -static inline -bool is_serial_consistency(consistency_level cl) { - return cl == consistency_level::SERIAL || cl == consistency_level::LOCAL_SERIAL; -} - -static inline -void validate_counter_for_write(schema_ptr s, consistency_level cl) { - if (cl == consistency_level::ANY) { - throw exceptions::invalid_request_exception(sprint("Consistency level ANY is not yet supported for counter table %s", s->cf_name())); - } - - if (is_serial_consistency(cl)) { - throw exceptions::invalid_request_exception("Counter operations are inherently non-serializable"); - } -} - -#if 0 - private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException - { - AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy(); - if (!(strategy instanceof NetworkTopologyStrategy)) - throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName())); - } -#endif +void validate_counter_for_write(schema_ptr s, consistency_level cl); } From 055e25ed435321ed0475aab3c4b3d77bf3061b47 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Jul 2015 09:47:58 +0300 Subject: [PATCH 3/6] db/consistency_level: Move enum to separate header Move 'consistency_level' enumeration to a separate header file to fix dependency issues that arise when we move 'unavailable_exception' to exceptions.hh. Signed-off-by: Pekka Enberg --- db/consistency_level.hh | 17 +------------ db/consistency_level_type.hh | 47 ++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 16 deletions(-) create mode 100644 db/consistency_level_type.hh diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 9a14260473..23c7365f5e 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -25,6 +25,7 @@ #pragma once #include "locator/network_topology_strategy.hh" +#include "db/consistency_level_type.hh" #include "db/read_repair_decision.hh" #include "exceptions/exceptions.hh" #include "utils/fb_utilities.hh" @@ -36,22 +37,6 @@ namespace db { -enum class consistency_level { - ANY, - ONE, - TWO, - THREE, - QUORUM, - ALL, - LOCAL_QUORUM, - EACH_QUORUM, - SERIAL, - LOCAL_SERIAL, - LOCAL_ONE -}; - -std::ostream& operator<<(std::ostream& os, consistency_level cl); - struct unavailable_exception : exceptions::cassandra_exception { consistency_level consistency; int32_t required; diff --git a/db/consistency_level_type.hh b/db/consistency_level_type.hh new file mode 100644 index 0000000000..fe40d49e50 --- /dev/null +++ b/db/consistency_level_type.hh @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include + +namespace db { + +enum class consistency_level { + ANY, + ONE, + TWO, + THREE, + QUORUM, + ALL, + LOCAL_QUORUM, + EACH_QUORUM, + SERIAL, + LOCAL_SERIAL, + LOCAL_ONE +}; + +std::ostream& operator<<(std::ostream& os, consistency_level cl); + +} From 0b8c67ed793f36b05a26b608964df29563ae1474 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Jul 2015 09:52:01 +0300 Subject: [PATCH 4/6] exceptions: Move unavailable_exception to exceptions.hh Move unavailable_exception to exceptions.hh where other CQL transport level exceptions are defined in. Signed-off-by: Pekka Enberg --- db/consistency_level.hh | 21 ++++----------------- exceptions/exceptions.hh | 14 ++++++++++++++ service/storage_proxy.cc | 2 +- transport/server.cc | 2 +- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 23c7365f5e..ba84f8513f 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -37,19 +37,6 @@ namespace db { -struct unavailable_exception : exceptions::cassandra_exception { - consistency_level consistency; - int32_t required; - int32_t alive; - - unavailable_exception(consistency_level cl, int32_t required, int32_t alive) - : exceptions::cassandra_exception(exceptions::exception_code::UNAVAILABLE, sprint("Cannot achieve consistency level for cl %s. Requires %ld, alive %ld", cl, required, alive)) - , consistency(cl) - , required(required) - , alive(alive) - {} -}; - size_t quorum_for(keyspace& ks); size_t local_quorum_for(keyspace& ks, const sstring& dc); @@ -131,7 +118,7 @@ inline bool assure_sufficient_live_nodes_each_quorum( auto dc_live = entry.second; if (dc_live < dc_block_for) { - throw unavailable_exception(cl, dc_block_for, dc_live); + throw exceptions::unavailable_exception(cl, dc_block_for, dc_live); } } @@ -154,7 +141,7 @@ inline void assure_sufficient_live_nodes( break; case consistency_level::LOCAL_ONE: if (count_local_endpoints(live_endpoints) == 0) { - throw unavailable_exception(cl, 1, 0); + throw exceptions::unavailable_exception(cl, 1, 0); } break; case consistency_level::LOCAL_QUORUM: { @@ -173,7 +160,7 @@ inline void assure_sufficient_live_nodes( logger.debug(builder.toString()); } #endif - throw unavailable_exception(cl, need, local_live); + throw exceptions::unavailable_exception(cl, need, local_live); } break; } @@ -186,7 +173,7 @@ inline void assure_sufficient_live_nodes( size_t live = live_endpoints.size(); if (live < need) { // logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor); - throw unavailable_exception(cl, need, live); + throw exceptions::unavailable_exception(cl, need, live); } break; } diff --git a/exceptions/exceptions.hh b/exceptions/exceptions.hh index 28e318ac01..62046dde8b 100644 --- a/exceptions/exceptions.hh +++ b/exceptions/exceptions.hh @@ -25,6 +25,7 @@ #ifndef EXCEPTIONS_HH #define EXCEPTIONS_HH +#include "db/consistency_level_type.hh" #include #include "core/sstring.hh" #include "core/print.hh" @@ -69,6 +70,19 @@ public: sstring get_message() const { return what(); } }; +struct unavailable_exception : exceptions::cassandra_exception { + db::consistency_level consistency; + int32_t required; + int32_t alive; + + unavailable_exception(db::consistency_level cl, int32_t required, int32_t alive) + : exceptions::cassandra_exception(exceptions::exception_code::UNAVAILABLE, sprint("Cannot achieve consistency level for cl %s. Requires %ld, alive %ld", cl, required, alive)) + , consistency(cl) + , required(required) + , alive(alive) + {} +}; + class request_validation_exception : public cassandra_exception { public: using cassandra_exception::cassandra_exception; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index b5684b7aa2..876de9bfeb 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -796,7 +796,7 @@ storage_proxy::mutate(std::vector mutations, db::consistency_level cl) }); } catch (no_such_keyspace& ex) { return make_exception_future<>(std::current_exception()); - } catch(db::unavailable_exception& ex) { + } catch (exceptions::unavailable_exception& ex) { _stats.write_unavailables++; logger.trace("Unavailable"); return make_exception_future<>(std::current_exception()); diff --git a/transport/server.cc b/transport/server.cc index 48f6557b75..3bb8a12b7e 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -379,7 +379,7 @@ future<> cql_server::connection::process_request() { }).then_wrapped([stream = f.stream, this] (future<> f) { try { f.get(); - } catch (const db::unavailable_exception& ex) { + } catch (const exceptions::unavailable_exception& ex) { write_unavailable_error(stream, ex.code(), ex.consistency, ex.required, ex.alive); } catch (const exceptions::read_timeout_exception& ex) { write_read_timeout_error(stream, ex.code(), ex.consistency, ex.received, ex.block_for, ex.data_present); From 3803e0ed5b2182ad485a3777a0c35667a09db303 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Jul 2015 09:54:34 +0300 Subject: [PATCH 5/6] exceptions: Move request_timeout_exception to exceptions.hh Now that consistency level dependency issues are sorted out, move request_timeout_exception to exceptions.hh. Signed-off-by: Pekka Enberg --- exceptions/exceptions.hh | 24 +++++++++++ exceptions/request_timeout_exception.hh | 56 ------------------------- service/storage_proxy.cc | 2 +- transport/server.cc | 2 +- 4 files changed, 26 insertions(+), 58 deletions(-) delete mode 100644 exceptions/request_timeout_exception.hh diff --git a/exceptions/exceptions.hh b/exceptions/exceptions.hh index 62046dde8b..5fd95473ab 100644 --- a/exceptions/exceptions.hh +++ b/exceptions/exceptions.hh @@ -83,6 +83,30 @@ struct unavailable_exception : exceptions::cassandra_exception { {} }; +class request_timeout_exception : public cassandra_exception { +public: + db::consistency_level consistency; + int32_t received; + int32_t block_for; + + request_timeout_exception(exception_code code, db::consistency_level consistency, int32_t received, int32_t block_for) + : cassandra_exception{code, sprint("Operation timed out - received only %d responses.", received)} + , consistency{consistency} + , received{received} + , block_for{block_for} + { } +}; + +class read_timeout_exception : public request_timeout_exception { +public: + bool data_present; + + read_timeout_exception(db::consistency_level consistency, int32_t received, int32_t block_for, bool data_present) + : request_timeout_exception{exception_code::READ_TIMEOUT, consistency, received, block_for} + , data_present{data_present} + { } +}; + class request_validation_exception : public cassandra_exception { public: using cassandra_exception::cassandra_exception; diff --git a/exceptions/request_timeout_exception.hh b/exceptions/request_timeout_exception.hh deleted file mode 100644 index 9aa896fe31..0000000000 --- a/exceptions/request_timeout_exception.hh +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Copyright 2015 Cloudius Systems - * - * Modified by Cloudius Systems - */ - -#pragma once - -#include "exceptions/exceptions.hh" -#include "db/consistency_level.hh" - -namespace exceptions { - -class request_timeout_exception : public cassandra_exception { -public: - db::consistency_level consistency; - int32_t received; - int32_t block_for; - - request_timeout_exception(exception_code code, db::consistency_level consistency, int32_t received, int32_t block_for) - : cassandra_exception{code, sprint("Operation timed out - received only %d responses.", received)} - , consistency{consistency} - , received{received} - , block_for{block_for} - { } -}; - -class read_timeout_exception : public request_timeout_exception { -public: - bool data_present; - - read_timeout_exception(db::consistency_level consistency, int32_t received, int32_t block_for, bool data_present) - : request_timeout_exception{exception_code::READ_TIMEOUT, consistency, received, block_for} - , data_present{data_present} - { } -}; - -} diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 876de9bfeb..5b6a8e6776 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -38,7 +38,7 @@ #include "core/future-util.hh" #include "db/read_repair_decision.hh" #include "db/config.hh" -#include "exceptions/request_timeout_exception.hh" +#include "exceptions/exceptions.hh" #include #include #include diff --git a/transport/server.cc b/transport/server.cc index 3bb8a12b7e..91413bef24 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -22,7 +22,7 @@ #include "service/query_state.hh" #include "service/client_state.hh" #include "transport/protocol_exception.hh" -#include "exceptions/request_timeout_exception.hh" +#include "exceptions/exceptions.hh" #include #include From 1c039f985550d9dd0667c4f3e281ed5ab2b14753 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 28 Jul 2015 09:59:04 +0300 Subject: [PATCH 6/6] exceptions/exceptions.hh: Use 'pragma once' as include guard Signed-off-by: Pekka Enberg --- exceptions/exceptions.hh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/exceptions/exceptions.hh b/exceptions/exceptions.hh index 5fd95473ab..db1dd4009a 100644 --- a/exceptions/exceptions.hh +++ b/exceptions/exceptions.hh @@ -22,8 +22,7 @@ * Modified by Cloudius Systems */ -#ifndef EXCEPTIONS_HH -#define EXCEPTIONS_HH +#pragma once #include "db/consistency_level_type.hh" #include @@ -189,4 +188,3 @@ public: }; } -#endif