cql3: Convert SingleColumnPrimaryKeyRestrictions

This commit is contained in:
Tomasz Grabiec
2015-03-04 21:26:24 +01:00
parent f2b9bbe74a
commit e202a13fff
4 changed files with 275 additions and 313 deletions

View File

@@ -1,312 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.restrictions;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.db.IndexExpression;
import org.apache.cassandra.db.composites.CBuilder;
import org.apache.cassandra.db.composites.CType;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.composites.Composite.EOC;
import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.composites.CompositesBuilder;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.exceptions.InvalidRequestException;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
/**
* A set of single column restrictions on a primary key part (partition key or clustering key).
*/
final class SingleColumnPrimaryKeyRestrictions extends AbstractPrimaryKeyRestrictions
{
/**
* The composite type.
*/
private final CType ctype;
/**
* The restrictions.
*/
private final SingleColumnRestrictions restrictions;
/**
* <code>true</code> if the restrictions are corresponding to an EQ, <code>false</code> otherwise.
*/
private boolean eq;
/**
* <code>true</code> if the restrictions are corresponding to an IN, <code>false</code> otherwise.
*/
private boolean in;
/**
* <code>true</code> if the restrictions are corresponding to a Slice, <code>false</code> otherwise.
*/
private boolean slice;
/**
* <code>true</code> if the restrictions are corresponding to a Contains, <code>false</code> otherwise.
*/
private boolean contains;
public SingleColumnPrimaryKeyRestrictions(CType ctype)
{
this.ctype = ctype;
this.restrictions = new SingleColumnRestrictions();
this.eq = true;
}
private SingleColumnPrimaryKeyRestrictions(SingleColumnPrimaryKeyRestrictions primaryKeyRestrictions,
SingleColumnRestriction restriction) throws InvalidRequestException
{
this.restrictions = primaryKeyRestrictions.restrictions.addRestriction(restriction);
this.ctype = primaryKeyRestrictions.ctype;
if (!primaryKeyRestrictions.isEmpty())
{
ColumnDefinition lastColumn = primaryKeyRestrictions.restrictions.lastColumn();
ColumnDefinition newColumn = restriction.getColumnDef();
checkFalse(primaryKeyRestrictions.isSlice() && newColumn.position() > lastColumn.position(),
"Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
newColumn.name,
lastColumn.name);
if (newColumn.position() < lastColumn.position())
checkFalse(restriction.isSlice(),
"PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
restrictions.nextColumn(newColumn).name,
newColumn.name);
}
if (restriction.isSlice() || primaryKeyRestrictions.isSlice())
this.slice = true;
else if (restriction.isContains() || primaryKeyRestrictions.isContains())
this.contains = true;
else if (restriction.isIN())
this.in = true;
else
this.eq = true;
}
@Override
public boolean isSlice()
{
return slice;
}
@Override
public boolean isEQ()
{
return eq;
}
@Override
public boolean isIN()
{
return in;
}
@Override
public boolean isOnToken()
{
return false;
}
@Override
public boolean isContains()
{
return contains;
}
@Override
public boolean isMultiColumn()
{
return false;
}
@Override
public boolean usesFunction(String ksName, String functionName)
{
return restrictions.usesFunction(ksName, functionName);
}
@Override
public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
{
if (restriction.isMultiColumn())
{
checkTrue(isEmpty(),
"Mixing single column relations and multi column relations on clustering columns is not allowed");
return (PrimaryKeyRestrictions) restriction;
}
if (restriction.isOnToken())
{
checkTrue(isEmpty(), "Columns \"%s\" cannot be restricted by both a normal relation and a token relation",
((TokenRestriction) restriction).getColumnNamesAsString());
return (PrimaryKeyRestrictions) restriction;
}
return new SingleColumnPrimaryKeyRestrictions(this, (SingleColumnRestriction) restriction);
}
@Override
public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
{
CompositesBuilder builder = new CompositesBuilder(ctype.builder(), ctype);
for (ColumnDefinition def : restrictions.getColumnDefs())
{
Restriction r = restrictions.getRestriction(def);
assert !r.isSlice();
List<ByteBuffer> values = r.values(options);
if (values.isEmpty())
return Collections.emptyList();
builder.addEachElementToAll(values);
checkFalse(builder.containsNull(), "Invalid null value for column %s", def.name);
}
return builder.build();
}
@Override
public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
{
CBuilder builder = ctype.builder();
List<ColumnDefinition> defs = new ArrayList<>(restrictions.getColumnDefs());
CompositesBuilder compositeBuilder = new CompositesBuilder(builder, ctype);
// The end-of-component of composite doesn't depend on whether the
// component type is reversed or not (i.e. the ReversedType is applied
// to the component comparator but not to the end-of-component itself),
// it only depends on whether the slice is reversed
int keyPosition = 0;
for (ColumnDefinition def : defs)
{
// In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
// So if we're doing a reverse slice, we must inverse the bounds when giving them as start and end of the slice filter.
// But if the actual comparator itself is reversed, we must inversed the bounds too.
Bound b = !def.isReversedType() ? bound : bound.reverse();
Restriction r = restrictions.getRestriction(def);
if (keyPosition != def.position() || r.isContains())
{
EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
return compositeBuilder.buildWithEOC(eoc);
}
if (r.isSlice())
{
if (!r.hasBound(b))
{
// There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
// For composites, if there was preceding component and we're computing the end, we must change the last component
// End-Of-Component, otherwise we would be selecting only one record.
EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
return compositeBuilder.buildWithEOC(eoc);
}
ByteBuffer value = checkNotNull(r.bounds(b, options).get(0), "Invalid null clustering key part %s", r);
compositeBuilder.addElementToAll(value);
Composite.EOC eoc = eocFor(r, bound, b);
return compositeBuilder.buildWithEOC(eoc);
}
List<ByteBuffer> values = r.values(options);
if (values.isEmpty())
return Collections.emptyList();
compositeBuilder.addEachElementToAll(values);
checkFalse(compositeBuilder.containsNull(), "Invalid null clustering key part %s", def.name);
keyPosition++;
}
// Means no relation at all or everything was an equal
// Note: if the builder is "full", there is no need to use the end-of-component bit. For columns selection,
// it would be harmless to do it. However, we use this method got the partition key too. And when a query
// with 2ndary index is done, and with the the partition provided with an EQ, we'll end up here, and in that
// case using the eoc would be bad, since for the random partitioner we have no guarantee that
// prefix.end() will sort after prefix (see #5240).
EOC eoc = bound.isEnd() && compositeBuilder.hasRemaining() ? EOC.END : EOC.NONE;
return compositeBuilder.buildWithEOC(eoc);
}
@Override
public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
return Composites.toByteBuffers(valuesAsComposites(options));
}
@Override
public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
{
return Composites.toByteBuffers(boundsAsComposites(b, options));
}
private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
{
if (eocBound.isStart())
return r.isInclusive(inclusiveBound) ? Composite.EOC.NONE : Composite.EOC.END;
return r.isInclusive(inclusiveBound) ? Composite.EOC.END : Composite.EOC.START;
}
@Override
public boolean hasBound(Bound b)
{
if (isEmpty())
return false;
return restrictions.lastRestriction().hasBound(b);
}
@Override
public boolean isInclusive(Bound b)
{
if (isEmpty())
return false;
return restrictions.lastRestriction().isInclusive(b);
}
@Override
public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
{
return restrictions.hasSupportingIndex(indexManager);
}
@Override
public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) throws InvalidRequestException
{
restrictions.addIndexExpressionTo(expressions, options);
}
@Override
public Collection<ColumnDefinition> getColumnDefs()
{
return restrictions.getColumnDefs();
}
}

View File

@@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include <vector>
#include "schema.hh"
#include "cartesian_product.hh"
#include "cql3/restrictions/primary_key_restrictions.hh"
#include "cql3/restrictions/single_column_restrictions.hh"
namespace cql3 {
namespace restrictions {
/**
* A set of single column restrictions on a primary key part (partition key or clustering key).
*/
class single_column_primary_key_restrictions : public primary_key_restrictions {
private:
schema_ptr _schema;
::shared_ptr<single_column_restrictions> _restrictions;
::shared_ptr<tuple_type<true>> _tuple;
bool _slice;
bool _contains;
bool _in;
public:
single_column_primary_key_restrictions(schema_ptr schema, ::shared_ptr<tuple_type<true>> tuple)
: _schema(schema)
, _restrictions(::make_shared<single_column_restrictions>(schema))
, _tuple(std::move(tuple))
, _slice(false)
, _contains(false)
, _in(false)
{ }
virtual bool is_on_token() override {
return false;
}
virtual bool is_multi_column() override {
return false;
}
virtual bool is_slice() override {
return _slice;
}
virtual bool is_contains() override {
return _contains;
}
virtual bool is_IN() override {
return _in;
}
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) override {
return _restrictions->uses_function(ks_name, function_name);
}
void do_merge_with(::shared_ptr<single_column_restriction> restriction) {
if (!_restrictions->empty()) {
auto last_column = *_restrictions->last_column();
auto new_column = restriction->get_column_def();
if (_slice && _schema->position(new_column) > _schema->position(last_column)) {
throw exceptions::invalid_request_exception(sprint(
"Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
new_column.name_as_text(), last_column.name_as_text()));
}
if (_schema->position(new_column) < _schema->position(last_column)) {
if (restriction->is_slice()) {
throw exceptions::invalid_request_exception(sprint(
"PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
_restrictions->next_column(new_column)->name_as_text(), new_column.name_as_text()));
}
}
}
_slice |= restriction->is_slice();
_in |= restriction->is_IN();
_contains |= restriction->is_contains();
_restrictions->add_restriction(restriction);
}
virtual void merge_with(::shared_ptr<restriction> restriction) override {
if (restriction->is_multi_column()) {
throw exceptions::invalid_request_exception(
"Mixing single column relations and multi column relations on clustering columns is not allowed");
}
if (restriction->is_on_token()) {
fail(unimplemented::cause::TOKEN_RESTRICTION);
#if 0
throw exceptions::invalid_request_exception("Columns \"%s\" cannot be restricted by both a normal relation and a token relation",
((TokenRestriction) restriction).getColumnNamesAsString());
#endif
}
do_merge_with(::static_pointer_cast<single_column_restriction>(restriction));
}
virtual std::vector<bytes> values_as_serialized_tuples(const query_options& options) override {
std::vector<std::vector<bytes_opt>> value_vector;
value_vector.reserve(_restrictions->size());
for (auto def : _restrictions->get_column_defs()) {
auto r = _restrictions->get_restriction(*def);
assert(!r->is_slice());
std::vector<bytes_opt> values = r->values(options);
for (auto&& val : values) {
if (!val) {
throw exceptions::invalid_request_exception(sprint("Invalid null value for column %s", def->name_as_text()));
}
}
if (values.empty()) {
return {};
}
value_vector.emplace_back(std::move(values));
}
std::vector<bytes> result;
result.reserve(cartesian_product_size(value_vector));
for (auto&& v : make_cartesian_product(value_vector)) {
result.emplace_back(_tuple->serialize_value(v));
}
return result;
}
virtual std::vector<query::range> bounds(const query_options& options) override {
std::vector<query::range> ranges;
std::vector<std::vector<bytes_opt>> vec_of_values;
// TODO: optimize for all EQ case
for (auto def : _restrictions->get_column_defs()) {
auto r = _restrictions->get_restriction(*def);
if (vec_of_values.size() != _schema->position(*def) || r->is_contains()) {
// The prefixes built so far are the longest we can build,
// the rest of the constraints will have to be applied using filtering.
break;
}
if (r->is_slice()) {
// TODO: make restriction::bounds() return query::range to simplify all this
if (cartesian_product_is_empty(vec_of_values)) {
auto read_value = [r, &options] (statements::bound b) {
auto value = r->bounds(b, options)[0];
if (!value) {
throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string()));
}
return *value;
};
if (r->has_bound(statements::bound::START) && r->has_bound(statements::bound::END)) {
ranges.emplace_back(query::range(read_value(statements::bound::START), read_value(statements::bound::END),
r->is_inclusive(statements::bound::START), r->is_inclusive(statements::bound::END)));
} else if (r->has_bound(statements::bound::START)) {
ranges.emplace_back(query::range::make_starting_with(read_value(statements::bound::START),
r->is_inclusive(statements::bound::START)));
} else {
assert(r->has_bound(statements::bound::END));
ranges.emplace_back(query::range::make_ending_with(read_value(statements::bound::END),
r->is_inclusive(statements::bound::END)));
}
if (def->type->is_reversed()) {
ranges.back().reverse();
}
return std::move(ranges);
}
ranges.reserve(cartesian_product_size(vec_of_values));
for (auto&& prefix : make_cartesian_product(vec_of_values)) {
auto read_bounds = [r, &prefix, &options, this](bytes& value_holder, bool& inclusive_holder, statements::bound bound) {
if (r->has_bound(bound)) {
auto value = std::move(r->bounds(bound, options)[0]);
if (!value) {
throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", r->to_string()));
}
prefix.emplace_back(std::move(value));
value_holder = _tuple->serialize_value(prefix);
prefix.pop_back();
inclusive_holder = r->is_inclusive(bound);
} else {
value_holder = _tuple->serialize_value(prefix);
inclusive_holder = true;
}
};
bytes start_tuple;
bytes end_tuple;
bool start_inclusive;
bool end_inclusive;
read_bounds(start_tuple, start_inclusive, statements::bound::START);
read_bounds(end_tuple, end_inclusive, statements::bound::END);
ranges.emplace_back(query::range(std::move(start_tuple), std::move(end_tuple),
start_inclusive, end_inclusive));
if (def->type->is_reversed()) {
ranges.back().reverse();
}
}
return std::move(ranges);
}
auto values = r->values(options);
for (auto&& val : values) {
if (!val) {
throw exceptions::invalid_request_exception(sprint("Invalid null clustering key part %s", def->name_as_text()));
}
}
if (values.empty()) {
return {};
}
vec_of_values.emplace_back(std::move(values));
}
ranges.reserve(cartesian_product_size(vec_of_values));
for (auto&& prefix : make_cartesian_product(vec_of_values)) {
ranges.emplace_back(query::range::make_singular(_tuple->serialize_value(prefix)));
}
return std::move(ranges);
}
#if 0
virtual bool hasSupportingIndex(SecondaryIndexManager indexManager) override {
return restrictions.hasSupportingIndex(indexManager);
}
virtual void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) override {
restrictions.addIndexExpressionTo(expressions, options);
}
#endif
virtual std::vector<const column_definition*> get_column_defs() override {
return _restrictions->get_column_defs();
}
virtual bool empty() override {
return _restrictions->empty();
}
virtual uint32_t size() override {
return _restrictions->size();
}
};
}
}

View File

@@ -24,6 +24,7 @@ std::ostream& operator<<(std::ostream& out, cause c) {
case cause::METRICS: return out << "METRICS";
case cause::COMPACT_TABLES: return out << "COMPACT_TABLES";
case cause::GOSSIP: return out << "GOSSIP";
case cause::TOKEN_RESTRICTION: return out << "TOKEN_RESTRICTION";
}
assert(0);
}

View File

@@ -22,7 +22,8 @@ enum class cause {
COUNTERS,
METRICS,
COMPACT_TABLES,
GOSSIP
GOSSIP,
TOKEN_RESTRICTION,
};
void fail(cause what) __attribute__((noreturn));