diff --git a/configure.py b/configure.py index 90776068fc..d63e797206 100755 --- a/configure.py +++ b/configure.py @@ -266,6 +266,7 @@ urchin_core = (['database.cc', 'cql3/selection/selection.cc', 'cql3/selection/selector.cc', 'cql3/restrictions/statement_restrictions.cc', + 'db/consistency_level.cc', 'db/system_keyspace.cc', 'db/legacy_schema_tables.cc', 'db/commitlog/commitlog.cc', diff --git a/db/consistency_level.cc b/db/consistency_level.cc new file mode 100644 index 0000000000..8d7f05b369 --- /dev/null +++ b/db/consistency_level.cc @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#include "db/consistency_level.hh" + +#include +#include "exceptions/exceptions.hh" +#include "core/sstring.hh" +#include "schema.hh" +#include "database.hh" +#include "unimplemented.hh" +#include "db/read_repair_decision.hh" +#include "locator/abstract_replication_strategy.hh" +#include "locator/network_topology_strategy.hh" +#include "utils/fb_utilities.hh" + +namespace db { + +size_t quorum_for(keyspace& ks) { + return (ks.get_replication_strategy().get_replication_factor() / 2) + 1; +} + +size_t local_quorum_for(keyspace& ks, const sstring& dc) { + using namespace locator; + + auto& rs = ks.get_replication_strategy(); + + if (rs.get_type() == replication_strategy_type::network_topology) { + network_topology_strategy* nrs = + static_cast(&rs); + + return (nrs->get_replication_factor(dc) / 2) + 1; + } + + return quorum_for(ks); +} + +size_t block_for_local_serial(keyspace& ks) { + using namespace locator; + + // + // TODO: Consider caching the final result in order to avoid all these + // useless dereferencing. Note however that this will introduce quite + // a lot of complications since both snitch output for a local host + // and the snitch itself (and thus its output) may change dynamically. + // + auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); + auto local_addr = utils::fb_utilities::get_broadcast_address(); + + return local_quorum_for(ks, snitch_ptr->get_datacenter(local_addr)); +} + +size_t block_for_each_quorum(keyspace& ks) { + using namespace locator; + + auto& rs = ks.get_replication_strategy(); + + if (rs.get_type() == replication_strategy_type::network_topology) { + network_topology_strategy* nrs = + static_cast(&rs); + size_t n = 0; + + for (auto& dc : nrs->get_datacenters()) { + n += local_quorum_for(ks, dc); + } + + return n; + } else { + return quorum_for(ks); + } +} + +size_t block_for(keyspace& ks, consistency_level cl) { + switch (cl) { + case consistency_level::ONE: + case consistency_level::LOCAL_ONE: + return 1; + case consistency_level::ANY: + return 1; + case consistency_level::TWO: + return 2; + case consistency_level::THREE: + return 3; + case consistency_level::QUORUM: + case consistency_level::SERIAL: + return quorum_for(ks); + case consistency_level::ALL: + return ks.get_replication_strategy().get_replication_factor(); + case consistency_level::LOCAL_QUORUM: + case consistency_level::LOCAL_SERIAL: + return block_for_local_serial(ks); + case consistency_level::EACH_QUORUM: + return block_for_each_quorum(ks); + default: + abort(); + } +} + +bool is_datacenter_local(consistency_level l) { + return l == consistency_level::LOCAL_ONE || l == consistency_level::LOCAL_QUORUM; +} + +bool is_local(gms::inet_address endpoint) { + using namespace locator; + + auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); + auto local_addr = utils::fb_utilities::get_broadcast_address(); + + return snitch_ptr->get_datacenter(local_addr) == + snitch_ptr->get_datacenter(endpoint); +} + +std::vector +filter_for_query_dc_local(consistency_level cl, + keyspace& ks, + const std::vector& live_endpoints) { + using namespace gms; + + std::vector local; + std::vector other; + local.reserve(live_endpoints.size()); + other.reserve(live_endpoints.size()); + + std::partition_copy(live_endpoints.begin(), live_endpoints.end(), + std::back_inserter(local), std::back_inserter(other), + is_local); + + // check if blockfor more than we have localep's + size_t bf = block_for(ks, cl); + if (local.size() < bf) { + size_t other_items_count = std::min(bf - local.size(), other.size()); + local.reserve(local.size() + other_items_count); + + std::move(other.begin(), other.begin() + other_items_count, + std::back_inserter(local)); + } + + return local; +} + +std::vector +filter_for_query(consistency_level cl, + keyspace& ks, + std::vector live_endpoints, + read_repair_decision read_repair) { + /* + * Endpoints are expected to be restricted to live replicas, sorted by + * snitch preference. For LOCAL_QUORUM, move local-DC replicas in front + * first as we need them there whether we do read repair (since the first + * replica gets the data read) or not (since we'll take the block_for first + * ones). + */ + if (is_datacenter_local(cl)) { + boost::range::partition(live_endpoints, is_local); + } + + switch (read_repair) { + case read_repair_decision::NONE: + { + size_t start_pos = std::min(live_endpoints.size(), block_for(ks, cl)); + + live_endpoints.erase(live_endpoints.begin() + start_pos, live_endpoints.end()); + } + // fall through + case read_repair_decision::GLOBAL: + return std::move(live_endpoints); + case read_repair_decision::DC_LOCAL: + return filter_for_query_dc_local(cl, ks, live_endpoints); + default: + throw std::runtime_error("Unknown read repair type"); + } +} + +std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints) { + return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE); +} + +bool +is_sufficient_live_nodes(consistency_level cl, + keyspace& ks, + const std::vector& live_endpoints) { + using namespace locator; + + switch (cl) { + case consistency_level::ANY: + // local hint is acceptable, and local node is always live + return true; + case consistency_level::LOCAL_ONE: + return count_local_endpoints(live_endpoints) >= 1; + case consistency_level::LOCAL_QUORUM: + return count_local_endpoints(live_endpoints) >= block_for(ks, cl); + case consistency_level::EACH_QUORUM: + { + auto& rs = ks.get_replication_strategy(); + + if (rs.get_type() == replication_strategy_type::network_topology) { + for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) { + if (entry.second < local_quorum_for(ks, entry.first)) { + return false; + } + } + + return true; + } + } + // Fallthough on purpose for SimpleStrategy + default: + return live_endpoints.size() >= block_for(ks, cl); + } +} + +void validate_for_read(const sstring& keyspace_name, consistency_level cl) { + switch (cl) { + case consistency_level::ANY: + throw exceptions::invalid_request_exception("ANY ConsistencyLevel is only supported for writes"); + case consistency_level::EACH_QUORUM: + throw exceptions::invalid_request_exception("EACH_QUORUM ConsistencyLevel is only supported for writes"); + default: + break; + } +} + +void validate_for_write(const sstring& keyspace_name, consistency_level cl) { + switch (cl) { + case consistency_level::SERIAL: + case consistency_level::LOCAL_SERIAL: + throw exceptions::invalid_request_exception("You must use conditional updates for serializable writes"); + default: + break; + } +} + +#if 0 + // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL + public void validateForCasCommit(String keyspaceName) throws InvalidRequestException + { + switch (this) + { + case EACH_QUORUM: + requireNetworkTopologyStrategy(keyspaceName); + break; + case SERIAL: + case LOCAL_SERIAL: + throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\""); + } + } + + public void validateForCas() throws InvalidRequestException + { + if (!isSerialConsistency()) + throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL"); + } +#endif + +bool is_serial_consistency(consistency_level cl) { + return cl == consistency_level::SERIAL || cl == consistency_level::LOCAL_SERIAL; +} + +void validate_counter_for_write(schema_ptr s, consistency_level cl) { + if (cl == consistency_level::ANY) { + throw exceptions::invalid_request_exception(sprint("Consistency level ANY is not yet supported for counter table %s", s->cf_name())); + } + + if (is_serial_consistency(cl)) { + throw exceptions::invalid_request_exception("Counter operations are inherently non-serializable"); + } +} + +#if 0 + private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException + { + AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy(); + if (!(strategy instanceof NetworkTopologyStrategy)) + throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName())); + } +#endif + +} diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 4e74769ede..ba84f8513f 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -24,276 +24,50 @@ #pragma once -#include -#include "exceptions/exceptions.hh" -#include "core/sstring.hh" -#include "schema.hh" -#include "database.hh" -#include "unimplemented.hh" -#include "db/read_repair_decision.hh" -#include "locator/abstract_replication_strategy.hh" #include "locator/network_topology_strategy.hh" +#include "db/consistency_level_type.hh" +#include "db/read_repair_decision.hh" +#include "exceptions/exceptions.hh" #include "utils/fb_utilities.hh" +#include "gms/inet_address.hh" +#include "database.hh" + +#include +#include namespace db { -#if 0 -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +size_t quorum_for(keyspace& ks); -import com.google.common.collect.Iterables; -import net.nicoulaj.compilecommand.annotations.Inline; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +size_t local_quorum_for(keyspace& ks, const sstring& dc); -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.ReadRepairDecision; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.UnavailableException; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.NetworkTopologyStrategy; -import org.apache.cassandra.transport.ProtocolException; -#endif +size_t block_for_local_serial(keyspace& ks); -enum class consistency_level { - ANY, - ONE, - TWO, - THREE, - QUORUM, - ALL, - LOCAL_QUORUM, - EACH_QUORUM, - SERIAL, - LOCAL_SERIAL, - LOCAL_ONE -}; +size_t block_for_each_quorum(keyspace& ks); -std::ostream& operator<<(std::ostream& os, consistency_level cl); +size_t block_for(keyspace& ks, consistency_level cl); -struct unavailable_exception : exceptions::cassandra_exception { - consistency_level consistency; - int32_t required; - int32_t alive; +bool is_datacenter_local(consistency_level l); - unavailable_exception(consistency_level cl, int32_t required, int32_t alive) - : exceptions::cassandra_exception(exceptions::exception_code::UNAVAILABLE, sprint("Cannot achieve consistency level for cl %s. Requires %ld, alive %ld", cl, required, alive)) - , consistency(cl) - , required(required) - , alive(alive) - {} -}; - -#if 0 - private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class); - - // Used by the binary protocol - public final int code; - private final boolean isDCLocal; - private static final ConsistencyLevel[] codeIdx; - static - { - int maxCode = -1; - for (ConsistencyLevel cl : ConsistencyLevel.values()) - maxCode = Math.max(maxCode, cl.code); - codeIdx = new ConsistencyLevel[maxCode + 1]; - for (ConsistencyLevel cl : ConsistencyLevel.values()) - { - if (codeIdx[cl.code] != null) - throw new IllegalStateException("Duplicate code"); - codeIdx[cl.code] = cl; - } - } - - private ConsistencyLevel(int code) - { - this(code, false); - } - - private ConsistencyLevel(int code, boolean isDCLocal) - { - this.code = code; - this.isDCLocal = isDCLocal; - } - - public static ConsistencyLevel fromCode(int code) - { - if (code < 0 || code >= codeIdx.length) - throw new ProtocolException(String.format("Unknown code %d for a consistency level", code)); - return codeIdx[code]; - } - -#endif - -inline size_t quorum_for(keyspace& ks) { - return (ks.get_replication_strategy().get_replication_factor() / 2) + 1; -} - -inline size_t local_quorum_for(keyspace& ks, const sstring& dc) { - using namespace locator; - - auto& rs = ks.get_replication_strategy(); - - if (rs.get_type() == replication_strategy_type::network_topology) { - network_topology_strategy* nrs = - static_cast(&rs); - - return (nrs->get_replication_factor(dc) / 2) + 1; - } - - return quorum_for(ks); -} - -inline size_t block_for_local_serial(keyspace& ks) { - using namespace locator; - - // - // TODO: Consider caching the final result in order to avoid all these - // useless dereferencing. Note however that this will introduce quite - // a lot of complications since both snitch output for a local host - // and the snitch itself (and thus its output) may change dynamically. - // - auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); - auto local_addr = utils::fb_utilities::get_broadcast_address(); - - return local_quorum_for(ks, snitch_ptr->get_datacenter(local_addr)); -} - -inline size_t block_for_each_quorum(keyspace& ks) { - using namespace locator; - - auto& rs = ks.get_replication_strategy(); - - if (rs.get_type() == replication_strategy_type::network_topology) { - network_topology_strategy* nrs = - static_cast(&rs); - size_t n = 0; - - for (auto& dc : nrs->get_datacenters()) { - n += local_quorum_for(ks, dc); - } - - return n; - } else { - return quorum_for(ks); - } -} - -inline size_t block_for(keyspace& ks, consistency_level cl) { - switch (cl) { - case consistency_level::ONE: - case consistency_level::LOCAL_ONE: - return 1; - case consistency_level::ANY: - return 1; - case consistency_level::TWO: - return 2; - case consistency_level::THREE: - return 3; - case consistency_level::QUORUM: - case consistency_level::SERIAL: - return quorum_for(ks); - case consistency_level::ALL: - return ks.get_replication_strategy().get_replication_factor(); - case consistency_level::LOCAL_QUORUM: - case consistency_level::LOCAL_SERIAL: - return block_for_local_serial(ks); - case consistency_level::EACH_QUORUM: - return block_for_each_quorum(ks); - default: - abort(); - } -} - -inline bool is_datacenter_local(consistency_level l) { - return l == consistency_level::LOCAL_ONE || l == consistency_level::LOCAL_QUORUM; -} - -inline bool is_local(gms::inet_address endpoint) { - using namespace locator; - - auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr(); - auto local_addr = utils::fb_utilities::get_broadcast_address(); - - return snitch_ptr->get_datacenter(local_addr) == - snitch_ptr->get_datacenter(endpoint); -} +bool is_local(gms::inet_address endpoint); template inline size_t count_local_endpoints(Range& live_endpoints) { return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local); } -inline std::vector +std::vector filter_for_query_dc_local(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints) { - using namespace gms; + const std::vector& live_endpoints); - std::vector local; - std::vector other; - local.reserve(live_endpoints.size()); - other.reserve(live_endpoints.size()); - - std::partition_copy(live_endpoints.begin(), live_endpoints.end(), - std::back_inserter(local), std::back_inserter(other), - is_local); - - // check if blockfor more than we have localep's - size_t bf = block_for(ks, cl); - if (local.size() < bf) { - size_t other_items_count = std::min(bf - local.size(), other.size()); - local.reserve(local.size() + other_items_count); - - std::move(other.begin(), other.begin() + other_items_count, - std::back_inserter(local)); - } - - return local; -} - -inline std::vector +std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector live_endpoints, - read_repair_decision read_repair) { - /* - * Endpoints are expected to be restricted to live replicas, sorted by - * snitch preference. For LOCAL_QUORUM, move local-DC replicas in front - * first as we need them there whether we do read repair (since the first - * replica gets the data read) or not (since we'll take the block_for first - * ones). - */ - if (is_datacenter_local(cl)) { - boost::range::partition(live_endpoints, is_local); - } + read_repair_decision read_repair); - switch (read_repair) { - case read_repair_decision::NONE: - { - size_t start_pos = std::min(live_endpoints.size(), block_for(ks, cl)); - - live_endpoints.erase(live_endpoints.begin() + start_pos, live_endpoints.end()); - } - // fall through - case read_repair_decision::GLOBAL: - return std::move(live_endpoints); - case read_repair_decision::DC_LOCAL: - return filter_for_query_dc_local(cl, ks, live_endpoints); - default: - throw std::runtime_error("Unknown read repair type"); - } -} - -inline -std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints) { - return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE); -} +std::vector filter_for_query(consistency_level cl, keyspace& ks, std::vector& live_endpoints); template inline std::unordered_map count_per_dc_endpoints( @@ -324,39 +98,10 @@ inline std::unordered_map count_per_dc_endpoints( return dc_endpoints; } -inline bool +bool is_sufficient_live_nodes(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints) { - using namespace locator; - - switch (cl) { - case consistency_level::ANY: - // local hint is acceptable, and local node is always live - return true; - case consistency_level::LOCAL_ONE: - return count_local_endpoints(live_endpoints) >= 1; - case consistency_level::LOCAL_QUORUM: - return count_local_endpoints(live_endpoints) >= block_for(ks, cl); - case consistency_level::EACH_QUORUM: - { - auto& rs = ks.get_replication_strategy(); - - if (rs.get_type() == replication_strategy_type::network_topology) { - for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) { - if (entry.second < local_quorum_for(ks, entry.first)) { - return false; - } - } - - return true; - } - } - // Fallthough on purpose for SimpleStrategy - default: - return live_endpoints.size() >= block_for(ks, cl); - } -} + const std::vector& live_endpoints); template inline bool assure_sufficient_live_nodes_each_quorum( @@ -373,7 +118,7 @@ inline bool assure_sufficient_live_nodes_each_quorum( auto dc_live = entry.second; if (dc_live < dc_block_for) { - throw unavailable_exception(cl, dc_block_for, dc_live); + throw exceptions::unavailable_exception(cl, dc_block_for, dc_live); } } @@ -396,7 +141,7 @@ inline void assure_sufficient_live_nodes( break; case consistency_level::LOCAL_ONE: if (count_local_endpoints(live_endpoints) == 0) { - throw unavailable_exception(cl, 1, 0); + throw exceptions::unavailable_exception(cl, 1, 0); } break; case consistency_level::LOCAL_QUORUM: { @@ -415,7 +160,7 @@ inline void assure_sufficient_live_nodes( logger.debug(builder.toString()); } #endif - throw unavailable_exception(cl, need, local_live); + throw exceptions::unavailable_exception(cl, need, local_live); } break; } @@ -428,80 +173,18 @@ inline void assure_sufficient_live_nodes( size_t live = live_endpoints.size(); if (live < need) { // logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor); - throw unavailable_exception(cl, need, live); + throw exceptions::unavailable_exception(cl, need, live); } break; } } -static inline -void validate_for_read(const sstring& keyspace_name, consistency_level cl) { - switch (cl) { - case consistency_level::ANY: - throw exceptions::invalid_request_exception("ANY ConsistencyLevel is only supported for writes"); - case consistency_level::EACH_QUORUM: - throw exceptions::invalid_request_exception("EACH_QUORUM ConsistencyLevel is only supported for writes"); - default: - break; - } -} +void validate_for_read(const sstring& keyspace_name, consistency_level cl); -static inline -void validate_for_write(const sstring& keyspace_name, consistency_level cl) { - switch (cl) { - case consistency_level::SERIAL: - case consistency_level::LOCAL_SERIAL: - throw exceptions::invalid_request_exception("You must use conditional updates for serializable writes"); - default: - break; - } -} +void validate_for_write(const sstring& keyspace_name, consistency_level cl); -#if 0 - // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL - public void validateForCasCommit(String keyspaceName) throws InvalidRequestException - { - switch (this) - { - case EACH_QUORUM: - requireNetworkTopologyStrategy(keyspaceName); - break; - case SERIAL: - case LOCAL_SERIAL: - throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\""); - } - } +bool is_serial_consistency(consistency_level cl); - public void validateForCas() throws InvalidRequestException - { - if (!isSerialConsistency()) - throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL"); - } -#endif - -static inline -bool is_serial_consistency(consistency_level cl) { - return cl == consistency_level::SERIAL || cl == consistency_level::LOCAL_SERIAL; -} - -static inline -void validate_counter_for_write(schema_ptr s, consistency_level cl) { - if (cl == consistency_level::ANY) { - throw exceptions::invalid_request_exception(sprint("Consistency level ANY is not yet supported for counter table %s", s->cf_name())); - } - - if (is_serial_consistency(cl)) { - throw exceptions::invalid_request_exception("Counter operations are inherently non-serializable"); - } -} - -#if 0 - private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException - { - AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy(); - if (!(strategy instanceof NetworkTopologyStrategy)) - throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName())); - } -#endif +void validate_counter_for_write(schema_ptr s, consistency_level cl); } diff --git a/db/consistency_level_type.hh b/db/consistency_level_type.hh new file mode 100644 index 0000000000..fe40d49e50 --- /dev/null +++ b/db/consistency_level_type.hh @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#pragma once + +#include + +namespace db { + +enum class consistency_level { + ANY, + ONE, + TWO, + THREE, + QUORUM, + ALL, + LOCAL_QUORUM, + EACH_QUORUM, + SERIAL, + LOCAL_SERIAL, + LOCAL_ONE +}; + +std::ostream& operator<<(std::ostream& os, consistency_level cl); + +} diff --git a/exceptions/exceptions.hh b/exceptions/exceptions.hh index 28e318ac01..db1dd4009a 100644 --- a/exceptions/exceptions.hh +++ b/exceptions/exceptions.hh @@ -22,9 +22,9 @@ * Modified by Cloudius Systems */ -#ifndef EXCEPTIONS_HH -#define EXCEPTIONS_HH +#pragma once +#include "db/consistency_level_type.hh" #include #include "core/sstring.hh" #include "core/print.hh" @@ -69,6 +69,43 @@ public: sstring get_message() const { return what(); } }; +struct unavailable_exception : exceptions::cassandra_exception { + db::consistency_level consistency; + int32_t required; + int32_t alive; + + unavailable_exception(db::consistency_level cl, int32_t required, int32_t alive) + : exceptions::cassandra_exception(exceptions::exception_code::UNAVAILABLE, sprint("Cannot achieve consistency level for cl %s. Requires %ld, alive %ld", cl, required, alive)) + , consistency(cl) + , required(required) + , alive(alive) + {} +}; + +class request_timeout_exception : public cassandra_exception { +public: + db::consistency_level consistency; + int32_t received; + int32_t block_for; + + request_timeout_exception(exception_code code, db::consistency_level consistency, int32_t received, int32_t block_for) + : cassandra_exception{code, sprint("Operation timed out - received only %d responses.", received)} + , consistency{consistency} + , received{received} + , block_for{block_for} + { } +}; + +class read_timeout_exception : public request_timeout_exception { +public: + bool data_present; + + read_timeout_exception(db::consistency_level consistency, int32_t received, int32_t block_for, bool data_present) + : request_timeout_exception{exception_code::READ_TIMEOUT, consistency, received, block_for} + , data_present{data_present} + { } +}; + class request_validation_exception : public cassandra_exception { public: using cassandra_exception::cassandra_exception; @@ -151,4 +188,3 @@ public: }; } -#endif diff --git a/exceptions/request_timeout_exception.hh b/exceptions/request_timeout_exception.hh deleted file mode 100644 index 9aa896fe31..0000000000 --- a/exceptions/request_timeout_exception.hh +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Copyright 2015 Cloudius Systems - * - * Modified by Cloudius Systems - */ - -#pragma once - -#include "exceptions/exceptions.hh" -#include "db/consistency_level.hh" - -namespace exceptions { - -class request_timeout_exception : public cassandra_exception { -public: - db::consistency_level consistency; - int32_t received; - int32_t block_for; - - request_timeout_exception(exception_code code, db::consistency_level consistency, int32_t received, int32_t block_for) - : cassandra_exception{code, sprint("Operation timed out - received only %d responses.", received)} - , consistency{consistency} - , received{received} - , block_for{block_for} - { } -}; - -class read_timeout_exception : public request_timeout_exception { -public: - bool data_present; - - read_timeout_exception(db::consistency_level consistency, int32_t received, int32_t block_for, bool data_present) - : request_timeout_exception{exception_code::READ_TIMEOUT, consistency, received, block_for} - , data_present{data_present} - { } -}; - -} diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index b5684b7aa2..5b6a8e6776 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -38,7 +38,7 @@ #include "core/future-util.hh" #include "db/read_repair_decision.hh" #include "db/config.hh" -#include "exceptions/request_timeout_exception.hh" +#include "exceptions/exceptions.hh" #include #include #include @@ -796,7 +796,7 @@ storage_proxy::mutate(std::vector mutations, db::consistency_level cl) }); } catch (no_such_keyspace& ex) { return make_exception_future<>(std::current_exception()); - } catch(db::unavailable_exception& ex) { + } catch (exceptions::unavailable_exception& ex) { _stats.write_unavailables++; logger.trace("Unavailable"); return make_exception_future<>(std::current_exception()); diff --git a/transport/server.cc b/transport/server.cc index 80987ae549..ac17c9115b 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -23,7 +23,7 @@ #include "service/query_state.hh" #include "service/client_state.hh" #include "transport/protocol_exception.hh" -#include "exceptions/request_timeout_exception.hh" +#include "exceptions/exceptions.hh" #include #include @@ -409,7 +409,7 @@ future<> cql_server::connection::process_request() { --_server._requests_serving; try { f.get(); - } catch (const db::unavailable_exception& ex) { + } catch (const exceptions::unavailable_exception& ex) { write_unavailable_error(stream, ex.code(), ex.consistency, ex.required, ex.alive); } catch (const exceptions::read_timeout_exception& ex) { write_read_timeout_error(stream, ex.code(), ex.consistency, ex.received, ex.block_for, ex.data_present);