mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-25 19:10:42 +00:00
Merge branch 'penberg/consistency-level-cleanups/v1' from seastar-dev.git
From Pekka: This series cleans up dependency issues in consistency_level.hh so that 'enum consistency_level' can be used cleanly in the CQL transport layer exceptions in exceptions.hh. No functional changes.
This commit is contained in:
@@ -266,6 +266,7 @@ urchin_core = (['database.cc',
|
||||
'cql3/selection/selection.cc',
|
||||
'cql3/selection/selector.cc',
|
||||
'cql3/restrictions/statement_restrictions.cc',
|
||||
'db/consistency_level.cc',
|
||||
'db/system_keyspace.cc',
|
||||
'db/legacy_schema_tables.cc',
|
||||
'db/commitlog/commitlog.cc',
|
||||
|
||||
299
db/consistency_level.cc
Normal file
299
db/consistency_level.cc
Normal file
@@ -0,0 +1,299 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#include "db/consistency_level.hh"
|
||||
|
||||
#include <boost/range/algorithm/partition.hpp>
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "core/sstring.hh"
|
||||
#include "schema.hh"
|
||||
#include "database.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "db/read_repair_decision.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
// Quorum size for a keyspace: the replication factor reported by the
// keyspace's replication strategy, halved, plus one.
size_t quorum_for(keyspace& ks) {
    const auto replication_factor = ks.get_replication_strategy().get_replication_factor();
    return replication_factor / 2 + 1;
}
|
||||
|
||||
// Quorum size inside a single datacenter.
//
// For a keyspace using the network-topology strategy the quorum is derived
// from the replication factor the strategy reports for `dc`. For every other
// strategy type the keyspace-wide quorum_for() value is returned.
size_t local_quorum_for(keyspace& ks, const sstring& dc) {
    using namespace locator;

    auto& strategy = ks.get_replication_strategy();
    if (strategy.get_type() != replication_strategy_type::network_topology) {
        return quorum_for(ks);
    }

    // The downcast relies on get_type() identifying the concrete class.
    auto* nts = static_cast<network_topology_strategy*>(&strategy);
    const auto dc_replication_factor = nts->get_replication_factor(dc);
    return dc_replication_factor / 2 + 1;
}
|
||||
|
||||
// Local quorum for the datacenter that the local snitch reports for this
// node's broadcast address.
size_t block_for_local_serial(keyspace& ks) {
    using namespace locator;

    //
    // TODO: Consider caching the final result in order to avoid all these
    // useless dereferencing. Note however that this will introduce quite
    // a lot of complications since both snitch output for a local host
    // and the snitch itself (and thus its output) may change dynamically.
    //
    auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr();
    const auto own_address = utils::fb_utilities::get_broadcast_address();
    const auto& own_dc = snitch_ptr->get_datacenter(own_address);

    return local_quorum_for(ks, own_dc);
}
|
||||
|
||||
// Number of replicas to wait for at EACH_QUORUM.
//
// With the network-topology strategy this is the sum of local_quorum_for()
// over every datacenter the strategy lists; with any other strategy it is
// the plain keyspace quorum.
size_t block_for_each_quorum(keyspace& ks) {
    using namespace locator;

    auto& strategy = ks.get_replication_strategy();
    if (strategy.get_type() != replication_strategy_type::network_topology) {
        return quorum_for(ks);
    }

    // The downcast relies on get_type() identifying the concrete class.
    auto* nts = static_cast<network_topology_strategy*>(&strategy);

    size_t total = 0;
    for (auto& dc : nts->get_datacenters()) {
        total += local_quorum_for(ks, dc);
    }
    return total;
}
|
||||
|
||||
// Number of replica responses an operation at consistency level `cl` must
// wait for in keyspace `ks`.
//
// Aborts the process if `cl` is not one of the known enumerators.
size_t block_for(keyspace& ks, consistency_level cl) {
    switch (cl) {
    // Single-replica levels.
    case consistency_level::ANY:
    case consistency_level::ONE:
    case consistency_level::LOCAL_ONE:
        return 1;

    // Fixed replica counts.
    case consistency_level::TWO:
        return 2;
    case consistency_level::THREE:
        return 3;

    // Keyspace-wide quorum.
    case consistency_level::SERIAL:
    case consistency_level::QUORUM:
        return quorum_for(ks);

    // Every replica.
    case consistency_level::ALL:
        return ks.get_replication_strategy().get_replication_factor();

    // Quorum of the local datacenter only.
    case consistency_level::LOCAL_SERIAL:
    case consistency_level::LOCAL_QUORUM:
        return block_for_local_serial(ks);

    // Quorum in each datacenter.
    case consistency_level::EACH_QUORUM:
        return block_for_each_quorum(ks);

    default:
        abort();
    }
}
|
||||
|
||||
// True for LOCAL_ONE and LOCAL_QUORUM, false for every other level
// (including LOCAL_SERIAL).
bool is_datacenter_local(consistency_level l) {
    switch (l) {
    case consistency_level::LOCAL_ONE:
    case consistency_level::LOCAL_QUORUM:
        return true;
    default:
        return false;
    }
}
|
||||
|
||||
// True when the local snitch places `endpoint` in the same datacenter as
// this node's broadcast address.
bool is_local(gms::inet_address endpoint) {
    using namespace locator;

    auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr();
    const auto own_address = utils::fb_utilities::get_broadcast_address();

    const bool same_datacenter =
            snitch_ptr->get_datacenter(own_address) == snitch_ptr->get_datacenter(endpoint);
    return same_datacenter;
}
|
||||
|
||||
std::vector<gms::inet_address>
|
||||
filter_for_query_dc_local(consistency_level cl,
|
||||
keyspace& ks,
|
||||
const std::vector<gms::inet_address>& live_endpoints) {
|
||||
using namespace gms;
|
||||
|
||||
std::vector<inet_address> local;
|
||||
std::vector<inet_address> other;
|
||||
local.reserve(live_endpoints.size());
|
||||
other.reserve(live_endpoints.size());
|
||||
|
||||
std::partition_copy(live_endpoints.begin(), live_endpoints.end(),
|
||||
std::back_inserter(local), std::back_inserter(other),
|
||||
is_local);
|
||||
|
||||
// check if blockfor more than we have localep's
|
||||
size_t bf = block_for(ks, cl);
|
||||
if (local.size() < bf) {
|
||||
size_t other_items_count = std::min(bf - local.size(), other.size());
|
||||
local.reserve(local.size() + other_items_count);
|
||||
|
||||
std::move(other.begin(), other.begin() + other_items_count,
|
||||
std::back_inserter(local));
|
||||
}
|
||||
|
||||
return local;
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address>
|
||||
filter_for_query(consistency_level cl,
|
||||
keyspace& ks,
|
||||
std::vector<gms::inet_address> live_endpoints,
|
||||
read_repair_decision read_repair) {
|
||||
/*
|
||||
* Endpoints are expected to be restricted to live replicas, sorted by
|
||||
* snitch preference. For LOCAL_QUORUM, move local-DC replicas in front
|
||||
* first as we need them there whether we do read repair (since the first
|
||||
* replica gets the data read) or not (since we'll take the block_for first
|
||||
* ones).
|
||||
*/
|
||||
if (is_datacenter_local(cl)) {
|
||||
boost::range::partition(live_endpoints, is_local);
|
||||
}
|
||||
|
||||
switch (read_repair) {
|
||||
case read_repair_decision::NONE:
|
||||
{
|
||||
size_t start_pos = std::min(live_endpoints.size(), block_for(ks, cl));
|
||||
|
||||
live_endpoints.erase(live_endpoints.begin() + start_pos, live_endpoints.end());
|
||||
}
|
||||
// fall through
|
||||
case read_repair_decision::GLOBAL:
|
||||
return std::move(live_endpoints);
|
||||
case read_repair_decision::DC_LOCAL:
|
||||
return filter_for_query_dc_local(cl, ks, live_endpoints);
|
||||
default:
|
||||
throw std::runtime_error("Unknown read repair type");
|
||||
}
|
||||
}
|
||||
|
||||
// Convenience overload: filters `live_endpoints` with no read repair.
// The caller's vector is copied, not modified.
std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address>& live_endpoints) {
    const auto no_read_repair = read_repair_decision::NONE;

    return filter_for_query(cl, ks, live_endpoints, no_read_repair);
}
|
||||
|
||||
bool
|
||||
is_sufficient_live_nodes(consistency_level cl,
|
||||
keyspace& ks,
|
||||
const std::vector<gms::inet_address>& live_endpoints) {
|
||||
using namespace locator;
|
||||
|
||||
switch (cl) {
|
||||
case consistency_level::ANY:
|
||||
// local hint is acceptable, and local node is always live
|
||||
return true;
|
||||
case consistency_level::LOCAL_ONE:
|
||||
return count_local_endpoints(live_endpoints) >= 1;
|
||||
case consistency_level::LOCAL_QUORUM:
|
||||
return count_local_endpoints(live_endpoints) >= block_for(ks, cl);
|
||||
case consistency_level::EACH_QUORUM:
|
||||
{
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
|
||||
if (rs.get_type() == replication_strategy_type::network_topology) {
|
||||
for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) {
|
||||
if (entry.second < local_quorum_for(ks, entry.first)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// Fallthough on purpose for SimpleStrategy
|
||||
default:
|
||||
return live_endpoints.size() >= block_for(ks, cl);
|
||||
}
|
||||
}
|
||||
|
||||
void validate_for_read(const sstring& keyspace_name, consistency_level cl) {
|
||||
switch (cl) {
|
||||
case consistency_level::ANY:
|
||||
throw exceptions::invalid_request_exception("ANY ConsistencyLevel is only supported for writes");
|
||||
case consistency_level::EACH_QUORUM:
|
||||
throw exceptions::invalid_request_exception("EACH_QUORUM ConsistencyLevel is only supported for writes");
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void validate_for_write(const sstring& keyspace_name, consistency_level cl) {
|
||||
switch (cl) {
|
||||
case consistency_level::SERIAL:
|
||||
case consistency_level::LOCAL_SERIAL:
|
||||
throw exceptions::invalid_request_exception("You must use conditional updates for serializable writes");
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
// This is really the same as validateForWrite, but we include a slightly different error message for SERIAL/LOCAL_SERIAL
|
||||
public void validateForCasCommit(String keyspaceName) throws InvalidRequestException
|
||||
{
|
||||
switch (this)
|
||||
{
|
||||
case EACH_QUORUM:
|
||||
requireNetworkTopologyStrategy(keyspaceName);
|
||||
break;
|
||||
case SERIAL:
|
||||
case LOCAL_SERIAL:
|
||||
throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\"");
|
||||
}
|
||||
}
|
||||
|
||||
public void validateForCas() throws InvalidRequestException
|
||||
{
|
||||
if (!isSerialConsistency())
|
||||
throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL");
|
||||
}
|
||||
#endif
|
||||
|
||||
// True for SERIAL and LOCAL_SERIAL, false for every other level.
bool is_serial_consistency(consistency_level cl) {
    switch (cl) {
    case consistency_level::SERIAL:
    case consistency_level::LOCAL_SERIAL:
        return true;
    default:
        return false;
    }
}
|
||||
|
||||
// Validates the consistency level of a counter write against table `s`.
// Throws exceptions::invalid_request_exception when `cl` is ANY (the message
// names the table) or when it is one of the serial levels.
void validate_counter_for_write(schema_ptr s, consistency_level cl) {
    const bool is_any = (cl == consistency_level::ANY);
    if (is_any) {
        throw exceptions::invalid_request_exception(
                sprint("Consistency level ANY is not yet supported for counter table %s", s->cf_name()));
    }

    if (!is_serial_consistency(cl)) {
        return;
    }

    throw exceptions::invalid_request_exception("Counter operations are inherently non-serializable");
}
|
||||
|
||||
#if 0
|
||||
private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException
|
||||
{
|
||||
AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
|
||||
if (!(strategy instanceof NetworkTopologyStrategy))
|
||||
throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName()));
|
||||
}
|
||||
#endif
|
||||
|
||||
}
|
||||
@@ -24,276 +24,50 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/range/algorithm/partition.hpp>
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "core/sstring.hh"
|
||||
#include "schema.hh"
|
||||
#include "database.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "db/read_repair_decision.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/read_repair_decision.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "database.hh"
|
||||
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
|
||||
namespace db {
|
||||
|
||||
#if 0
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
size_t quorum_for(keyspace& ks);
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
import net.nicoulaj.compilecommand.annotations.Inline;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
size_t local_quorum_for(keyspace& ks, const sstring& dc);
|
||||
|
||||
import org.apache.cassandra.config.CFMetaData;
|
||||
import org.apache.cassandra.config.DatabaseDescriptor;
|
||||
import org.apache.cassandra.config.ReadRepairDecision;
|
||||
import org.apache.cassandra.exceptions.InvalidRequestException;
|
||||
import org.apache.cassandra.exceptions.UnavailableException;
|
||||
import org.apache.cassandra.locator.AbstractReplicationStrategy;
|
||||
import org.apache.cassandra.locator.NetworkTopologyStrategy;
|
||||
import org.apache.cassandra.transport.ProtocolException;
|
||||
#endif
|
||||
size_t block_for_local_serial(keyspace& ks);
|
||||
|
||||
enum class consistency_level {
|
||||
ANY,
|
||||
ONE,
|
||||
TWO,
|
||||
THREE,
|
||||
QUORUM,
|
||||
ALL,
|
||||
LOCAL_QUORUM,
|
||||
EACH_QUORUM,
|
||||
SERIAL,
|
||||
LOCAL_SERIAL,
|
||||
LOCAL_ONE
|
||||
};
|
||||
size_t block_for_each_quorum(keyspace& ks);
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, consistency_level cl);
|
||||
size_t block_for(keyspace& ks, consistency_level cl);
|
||||
|
||||
struct unavailable_exception : exceptions::cassandra_exception {
|
||||
consistency_level consistency;
|
||||
int32_t required;
|
||||
int32_t alive;
|
||||
bool is_datacenter_local(consistency_level l);
|
||||
|
||||
unavailable_exception(consistency_level cl, int32_t required, int32_t alive)
|
||||
: exceptions::cassandra_exception(exceptions::exception_code::UNAVAILABLE, sprint("Cannot achieve consistency level for cl %s. Requires %ld, alive %ld", cl, required, alive))
|
||||
, consistency(cl)
|
||||
, required(required)
|
||||
, alive(alive)
|
||||
{}
|
||||
};
|
||||
|
||||
#if 0
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class);
|
||||
|
||||
// Used by the binary protocol
|
||||
public final int code;
|
||||
private final boolean isDCLocal;
|
||||
private static final ConsistencyLevel[] codeIdx;
|
||||
static
|
||||
{
|
||||
int maxCode = -1;
|
||||
for (ConsistencyLevel cl : ConsistencyLevel.values())
|
||||
maxCode = Math.max(maxCode, cl.code);
|
||||
codeIdx = new ConsistencyLevel[maxCode + 1];
|
||||
for (ConsistencyLevel cl : ConsistencyLevel.values())
|
||||
{
|
||||
if (codeIdx[cl.code] != null)
|
||||
throw new IllegalStateException("Duplicate code");
|
||||
codeIdx[cl.code] = cl;
|
||||
}
|
||||
}
|
||||
|
||||
private ConsistencyLevel(int code)
|
||||
{
|
||||
this(code, false);
|
||||
}
|
||||
|
||||
private ConsistencyLevel(int code, boolean isDCLocal)
|
||||
{
|
||||
this.code = code;
|
||||
this.isDCLocal = isDCLocal;
|
||||
}
|
||||
|
||||
public static ConsistencyLevel fromCode(int code)
|
||||
{
|
||||
if (code < 0 || code >= codeIdx.length)
|
||||
throw new ProtocolException(String.format("Unknown code %d for a consistency level", code));
|
||||
return codeIdx[code];
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
inline size_t quorum_for(keyspace& ks) {
|
||||
return (ks.get_replication_strategy().get_replication_factor() / 2) + 1;
|
||||
}
|
||||
|
||||
inline size_t local_quorum_for(keyspace& ks, const sstring& dc) {
|
||||
using namespace locator;
|
||||
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
|
||||
if (rs.get_type() == replication_strategy_type::network_topology) {
|
||||
network_topology_strategy* nrs =
|
||||
static_cast<network_topology_strategy*>(&rs);
|
||||
|
||||
return (nrs->get_replication_factor(dc) / 2) + 1;
|
||||
}
|
||||
|
||||
return quorum_for(ks);
|
||||
}
|
||||
|
||||
inline size_t block_for_local_serial(keyspace& ks) {
|
||||
using namespace locator;
|
||||
|
||||
//
|
||||
// TODO: Consider caching the final result in order to avoid all these
|
||||
// useless dereferencing. Note however that this will introduce quite
|
||||
// a lot of complications since both snitch output for a local host
|
||||
// and the snitch itself (and thus its output) may change dynamically.
|
||||
//
|
||||
auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr();
|
||||
auto local_addr = utils::fb_utilities::get_broadcast_address();
|
||||
|
||||
return local_quorum_for(ks, snitch_ptr->get_datacenter(local_addr));
|
||||
}
|
||||
|
||||
inline size_t block_for_each_quorum(keyspace& ks) {
|
||||
using namespace locator;
|
||||
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
|
||||
if (rs.get_type() == replication_strategy_type::network_topology) {
|
||||
network_topology_strategy* nrs =
|
||||
static_cast<network_topology_strategy*>(&rs);
|
||||
size_t n = 0;
|
||||
|
||||
for (auto& dc : nrs->get_datacenters()) {
|
||||
n += local_quorum_for(ks, dc);
|
||||
}
|
||||
|
||||
return n;
|
||||
} else {
|
||||
return quorum_for(ks);
|
||||
}
|
||||
}
|
||||
|
||||
inline size_t block_for(keyspace& ks, consistency_level cl) {
|
||||
switch (cl) {
|
||||
case consistency_level::ONE:
|
||||
case consistency_level::LOCAL_ONE:
|
||||
return 1;
|
||||
case consistency_level::ANY:
|
||||
return 1;
|
||||
case consistency_level::TWO:
|
||||
return 2;
|
||||
case consistency_level::THREE:
|
||||
return 3;
|
||||
case consistency_level::QUORUM:
|
||||
case consistency_level::SERIAL:
|
||||
return quorum_for(ks);
|
||||
case consistency_level::ALL:
|
||||
return ks.get_replication_strategy().get_replication_factor();
|
||||
case consistency_level::LOCAL_QUORUM:
|
||||
case consistency_level::LOCAL_SERIAL:
|
||||
return block_for_local_serial(ks);
|
||||
case consistency_level::EACH_QUORUM:
|
||||
return block_for_each_quorum(ks);
|
||||
default:
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
inline bool is_datacenter_local(consistency_level l) {
|
||||
return l == consistency_level::LOCAL_ONE || l == consistency_level::LOCAL_QUORUM;
|
||||
}
|
||||
|
||||
inline bool is_local(gms::inet_address endpoint) {
|
||||
using namespace locator;
|
||||
|
||||
auto& snitch_ptr = i_endpoint_snitch::get_local_snitch_ptr();
|
||||
auto local_addr = utils::fb_utilities::get_broadcast_address();
|
||||
|
||||
return snitch_ptr->get_datacenter(local_addr) ==
|
||||
snitch_ptr->get_datacenter(endpoint);
|
||||
}
|
||||
bool is_local(gms::inet_address endpoint);
|
||||
|
||||
template<typename Range>
|
||||
inline size_t count_local_endpoints(Range& live_endpoints) {
|
||||
return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local);
|
||||
}
|
||||
|
||||
inline std::vector<gms::inet_address>
|
||||
std::vector<gms::inet_address>
|
||||
filter_for_query_dc_local(consistency_level cl,
|
||||
keyspace& ks,
|
||||
const std::vector<gms::inet_address>& live_endpoints) {
|
||||
using namespace gms;
|
||||
const std::vector<gms::inet_address>& live_endpoints);
|
||||
|
||||
std::vector<inet_address> local;
|
||||
std::vector<inet_address> other;
|
||||
local.reserve(live_endpoints.size());
|
||||
other.reserve(live_endpoints.size());
|
||||
|
||||
std::partition_copy(live_endpoints.begin(), live_endpoints.end(),
|
||||
std::back_inserter(local), std::back_inserter(other),
|
||||
is_local);
|
||||
|
||||
// check if blockfor more than we have localep's
|
||||
size_t bf = block_for(ks, cl);
|
||||
if (local.size() < bf) {
|
||||
size_t other_items_count = std::min(bf - local.size(), other.size());
|
||||
local.reserve(local.size() + other_items_count);
|
||||
|
||||
std::move(other.begin(), other.begin() + other_items_count,
|
||||
std::back_inserter(local));
|
||||
}
|
||||
|
||||
return local;
|
||||
}
|
||||
|
||||
inline std::vector<gms::inet_address>
|
||||
std::vector<gms::inet_address>
|
||||
filter_for_query(consistency_level cl,
|
||||
keyspace& ks,
|
||||
std::vector<gms::inet_address> live_endpoints,
|
||||
read_repair_decision read_repair) {
|
||||
/*
|
||||
* Endpoints are expected to be restricted to live replicas, sorted by
|
||||
* snitch preference. For LOCAL_QUORUM, move local-DC replicas in front
|
||||
* first as we need them there whether we do read repair (since the first
|
||||
* replica gets the data read) or not (since we'll take the block_for first
|
||||
* ones).
|
||||
*/
|
||||
if (is_datacenter_local(cl)) {
|
||||
boost::range::partition(live_endpoints, is_local);
|
||||
}
|
||||
read_repair_decision read_repair);
|
||||
|
||||
switch (read_repair) {
|
||||
case read_repair_decision::NONE:
|
||||
{
|
||||
size_t start_pos = std::min(live_endpoints.size(), block_for(ks, cl));
|
||||
|
||||
live_endpoints.erase(live_endpoints.begin() + start_pos, live_endpoints.end());
|
||||
}
|
||||
// fall through
|
||||
case read_repair_decision::GLOBAL:
|
||||
return std::move(live_endpoints);
|
||||
case read_repair_decision::DC_LOCAL:
|
||||
return filter_for_query_dc_local(cl, ks, live_endpoints);
|
||||
default:
|
||||
throw std::runtime_error("Unknown read repair type");
|
||||
}
|
||||
}
|
||||
|
||||
inline
|
||||
std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address>& live_endpoints) {
|
||||
return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE);
|
||||
}
|
||||
std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address>& live_endpoints);
|
||||
|
||||
template <typename Range>
|
||||
inline std::unordered_map<sstring, size_t> count_per_dc_endpoints(
|
||||
@@ -324,39 +98,10 @@ inline std::unordered_map<sstring, size_t> count_per_dc_endpoints(
|
||||
return dc_endpoints;
|
||||
}
|
||||
|
||||
inline bool
|
||||
bool
|
||||
is_sufficient_live_nodes(consistency_level cl,
|
||||
keyspace& ks,
|
||||
const std::vector<gms::inet_address>& live_endpoints) {
|
||||
using namespace locator;
|
||||
|
||||
switch (cl) {
|
||||
case consistency_level::ANY:
|
||||
// local hint is acceptable, and local node is always live
|
||||
return true;
|
||||
case consistency_level::LOCAL_ONE:
|
||||
return count_local_endpoints(live_endpoints) >= 1;
|
||||
case consistency_level::LOCAL_QUORUM:
|
||||
return count_local_endpoints(live_endpoints) >= block_for(ks, cl);
|
||||
case consistency_level::EACH_QUORUM:
|
||||
{
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
|
||||
if (rs.get_type() == replication_strategy_type::network_topology) {
|
||||
for (auto& entry : count_per_dc_endpoints(ks, live_endpoints)) {
|
||||
if (entry.second < local_quorum_for(ks, entry.first)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// Fallthough on purpose for SimpleStrategy
|
||||
default:
|
||||
return live_endpoints.size() >= block_for(ks, cl);
|
||||
}
|
||||
}
|
||||
const std::vector<gms::inet_address>& live_endpoints);
|
||||
|
||||
template<typename Range>
|
||||
inline bool assure_sufficient_live_nodes_each_quorum(
|
||||
@@ -373,7 +118,7 @@ inline bool assure_sufficient_live_nodes_each_quorum(
|
||||
auto dc_live = entry.second;
|
||||
|
||||
if (dc_live < dc_block_for) {
|
||||
throw unavailable_exception(cl, dc_block_for, dc_live);
|
||||
throw exceptions::unavailable_exception(cl, dc_block_for, dc_live);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -396,7 +141,7 @@ inline void assure_sufficient_live_nodes(
|
||||
break;
|
||||
case consistency_level::LOCAL_ONE:
|
||||
if (count_local_endpoints(live_endpoints) == 0) {
|
||||
throw unavailable_exception(cl, 1, 0);
|
||||
throw exceptions::unavailable_exception(cl, 1, 0);
|
||||
}
|
||||
break;
|
||||
case consistency_level::LOCAL_QUORUM: {
|
||||
@@ -415,7 +160,7 @@ inline void assure_sufficient_live_nodes(
|
||||
logger.debug(builder.toString());
|
||||
}
|
||||
#endif
|
||||
throw unavailable_exception(cl, need, local_live);
|
||||
throw exceptions::unavailable_exception(cl, need, local_live);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -428,80 +173,18 @@ inline void assure_sufficient_live_nodes(
|
||||
size_t live = live_endpoints.size();
|
||||
if (live < need) {
|
||||
// logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor);
|
||||
throw unavailable_exception(cl, need, live);
|
||||
throw exceptions::unavailable_exception(cl, need, live);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static inline
|
||||
void validate_for_read(const sstring& keyspace_name, consistency_level cl) {
|
||||
switch (cl) {
|
||||
case consistency_level::ANY:
|
||||
throw exceptions::invalid_request_exception("ANY ConsistencyLevel is only supported for writes");
|
||||
case consistency_level::EACH_QUORUM:
|
||||
throw exceptions::invalid_request_exception("EACH_QUORUM ConsistencyLevel is only supported for writes");
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
void validate_for_read(const sstring& keyspace_name, consistency_level cl);
|
||||
|
||||
static inline
|
||||
void validate_for_write(const sstring& keyspace_name, consistency_level cl) {
|
||||
switch (cl) {
|
||||
case consistency_level::SERIAL:
|
||||
case consistency_level::LOCAL_SERIAL:
|
||||
throw exceptions::invalid_request_exception("You must use conditional updates for serializable writes");
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
void validate_for_write(const sstring& keyspace_name, consistency_level cl);
|
||||
|
||||
#if 0
|
||||
// This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL
|
||||
public void validateForCasCommit(String keyspaceName) throws InvalidRequestException
|
||||
{
|
||||
switch (this)
|
||||
{
|
||||
case EACH_QUORUM:
|
||||
requireNetworkTopologyStrategy(keyspaceName);
|
||||
break;
|
||||
case SERIAL:
|
||||
case LOCAL_SERIAL:
|
||||
throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\"");
|
||||
}
|
||||
}
|
||||
bool is_serial_consistency(consistency_level cl);
|
||||
|
||||
public void validateForCas() throws InvalidRequestException
|
||||
{
|
||||
if (!isSerialConsistency())
|
||||
throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL");
|
||||
}
|
||||
#endif
|
||||
|
||||
static inline
|
||||
bool is_serial_consistency(consistency_level cl) {
|
||||
return cl == consistency_level::SERIAL || cl == consistency_level::LOCAL_SERIAL;
|
||||
}
|
||||
|
||||
static inline
|
||||
void validate_counter_for_write(schema_ptr s, consistency_level cl) {
|
||||
if (cl == consistency_level::ANY) {
|
||||
throw exceptions::invalid_request_exception(sprint("Consistency level ANY is not yet supported for counter table %s", s->cf_name()));
|
||||
}
|
||||
|
||||
if (is_serial_consistency(cl)) {
|
||||
throw exceptions::invalid_request_exception("Counter operations are inherently non-serializable");
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException
|
||||
{
|
||||
AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
|
||||
if (!(strategy instanceof NetworkTopologyStrategy))
|
||||
throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName()));
|
||||
}
|
||||
#endif
|
||||
void validate_counter_for_write(schema_ptr s, consistency_level cl);
|
||||
|
||||
}
|
||||
|
||||
47
db/consistency_level_type.hh
Normal file
47
db/consistency_level_type.hh
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace db {
|
||||
|
||||
// Consistency levels used by the database layer. The number of replicas each
// level waits for is computed by block_for() in db/consistency_level.cc.
// NOTE(review): the enumerator order presumably matters to code that maps
// these values to or from protocol codes - confirm before reordering.
enum class consistency_level {
|
||||
ANY,
|
||||
ONE,
|
||||
TWO,
|
||||
THREE,
|
||||
QUORUM,
|
||||
ALL,
|
||||
LOCAL_QUORUM,
|
||||
EACH_QUORUM,
|
||||
SERIAL,
|
||||
LOCAL_SERIAL,
|
||||
LOCAL_ONE
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, consistency_level cl);
|
||||
|
||||
}
|
||||
@@ -22,9 +22,9 @@
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#ifndef EXCEPTIONS_HH
|
||||
#define EXCEPTIONS_HH
|
||||
#pragma once
|
||||
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include <stdexcept>
|
||||
#include "core/sstring.hh"
|
||||
#include "core/print.hh"
|
||||
@@ -69,6 +69,43 @@ public:
|
||||
sstring get_message() const { return what(); }
|
||||
};
|
||||
|
||||
struct unavailable_exception : exceptions::cassandra_exception {
|
||||
db::consistency_level consistency;
|
||||
int32_t required;
|
||||
int32_t alive;
|
||||
|
||||
unavailable_exception(db::consistency_level cl, int32_t required, int32_t alive)
|
||||
: exceptions::cassandra_exception(exceptions::exception_code::UNAVAILABLE, sprint("Cannot achieve consistency level for cl %s. Requires %ld, alive %ld", cl, required, alive))
|
||||
, consistency(cl)
|
||||
, required(required)
|
||||
, alive(alive)
|
||||
{}
|
||||
};
|
||||
|
||||
class request_timeout_exception : public cassandra_exception {
|
||||
public:
|
||||
db::consistency_level consistency;
|
||||
int32_t received;
|
||||
int32_t block_for;
|
||||
|
||||
request_timeout_exception(exception_code code, db::consistency_level consistency, int32_t received, int32_t block_for)
|
||||
: cassandra_exception{code, sprint("Operation timed out - received only %d responses.", received)}
|
||||
, consistency{consistency}
|
||||
, received{received}
|
||||
, block_for{block_for}
|
||||
{ }
|
||||
};
|
||||
|
||||
class read_timeout_exception : public request_timeout_exception {
|
||||
public:
|
||||
bool data_present;
|
||||
|
||||
read_timeout_exception(db::consistency_level consistency, int32_t received, int32_t block_for, bool data_present)
|
||||
: request_timeout_exception{exception_code::READ_TIMEOUT, consistency, received, block_for}
|
||||
, data_present{data_present}
|
||||
{ }
|
||||
};
|
||||
|
||||
class request_validation_exception : public cassandra_exception {
|
||||
public:
|
||||
using cassandra_exception::cassandra_exception;
|
||||
@@ -151,4 +188,3 @@ public:
|
||||
};
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "db/consistency_level.hh"
|
||||
|
||||
namespace exceptions {
|
||||
|
||||
class request_timeout_exception : public cassandra_exception {
|
||||
public:
|
||||
db::consistency_level consistency;
|
||||
int32_t received;
|
||||
int32_t block_for;
|
||||
|
||||
request_timeout_exception(exception_code code, db::consistency_level consistency, int32_t received, int32_t block_for)
|
||||
: cassandra_exception{code, sprint("Operation timed out - received only %d responses.", received)}
|
||||
, consistency{consistency}
|
||||
, received{received}
|
||||
, block_for{block_for}
|
||||
{ }
|
||||
};
|
||||
|
||||
class read_timeout_exception : public request_timeout_exception {
|
||||
public:
|
||||
bool data_present;
|
||||
|
||||
read_timeout_exception(db::consistency_level consistency, int32_t received, int32_t block_for, bool data_present)
|
||||
: request_timeout_exception{exception_code::READ_TIMEOUT, consistency, received, block_for}
|
||||
, data_present{data_present}
|
||||
{ }
|
||||
};
|
||||
|
||||
}
|
||||
@@ -38,7 +38,7 @@
|
||||
#include "core/future-util.hh"
|
||||
#include "db/read_repair_decision.hh"
|
||||
#include "db/config.hh"
|
||||
#include "exceptions/request_timeout_exception.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include <boost/range/algorithm_ext/push_back.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/iterator/counting_iterator.hpp>
|
||||
@@ -796,7 +796,7 @@ storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl)
|
||||
});
|
||||
} catch (no_such_keyspace& ex) {
|
||||
return make_exception_future<>(std::current_exception());
|
||||
} catch(db::unavailable_exception& ex) {
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.write_unavailables++;
|
||||
logger.trace("Unavailable");
|
||||
return make_exception_future<>(std::current_exception());
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
#include "service/query_state.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "transport/protocol_exception.hh"
|
||||
#include "exceptions/request_timeout_exception.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
@@ -409,7 +409,7 @@ future<> cql_server::connection::process_request() {
|
||||
--_server._requests_serving;
|
||||
try {
|
||||
f.get();
|
||||
} catch (const db::unavailable_exception& ex) {
|
||||
} catch (const exceptions::unavailable_exception& ex) {
|
||||
write_unavailable_error(stream, ex.code(), ex.consistency, ex.required, ex.alive);
|
||||
} catch (const exceptions::read_timeout_exception& ex) {
|
||||
write_read_timeout_error(stream, ex.code(), ex.consistency, ex.received, ex.block_for, ex.data_present);
|
||||
|
||||
Reference in New Issue
Block a user