From f7aa2a58bd74eb61ed247ffbfcb61b6d08175a9f Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 10 Mar 2015 09:45:24 +0100 Subject: [PATCH] cql3: Convert SelectStatement and SelectStatement.RawStatement --- configure.py | 1 + cql3/statements/select_statement.cc | 186 +++++ cql3/statements/select_statement.hh | 1097 ++++++++++----------------- 3 files changed, 575 insertions(+), 709 deletions(-) create mode 100644 cql3/statements/select_statement.cc diff --git a/configure.py b/configure.py index 375c107c90..dcda0438d0 100755 --- a/configure.py +++ b/configure.py @@ -264,6 +264,7 @@ urchin_core = (['database.cc', 'cql3/statements/modification_statement.cc', 'cql3/statements/update_statement.cc', 'cql3/statements/delete_statement.cc', + 'cql3/statements/select_statement.cc', 'thrift/handler.cc', 'thrift/server.cc', 'thrift/thrift_validation.cc', diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc new file mode 100644 index 0000000000..3375ddd00a --- /dev/null +++ b/cql3/statements/select_statement.cc @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +#include "cql3/statements/select_statement.hh" +#include "cql3/selection/selection.hh" +#include "core/shared_ptr.hh" +#include "query.hh" + +namespace cql3 { + +namespace statements { + +const shared_ptr select_statement::_default_parameters = ::make_shared(); + +future> +select_statement::execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) { + auto cl = options.get_consistency(); + + validate_for_read(_schema->ks_name, cl); + + int32_t limit = get_limit(options); + auto now = db_clock::now(); + + auto command = ::make_lw_shared(_schema->ks_name, _schema->cf_name, + _restrictions->get_partition_key_ranges(options), make_partition_slice(options), limit); + + int32_t page_size = options.get_page_size(); + + // An aggregation query will never be paged for the user, but we always page it internally to avoid OOM. + // If we user provided a page_size we'll use that to page internally (because why not), otherwise we use our default + // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707). 
+ if (_selection->is_aggregate() && page_size <= 0) { + page_size = DEFAULT_COUNT_PAGE_SIZE; + } + + warn(unimplemented::cause::PAGING); + return execute(proxy, command, state, options, now); + +#if 0 + if (page_size <= 0 || !command || !query_pagers::may_need_paging(command, page_size)) { + return execute(proxy, command, state, options, now); + } + + auto pager = query_pagers::pager(command, cl, state.get_client_state(), options.get_paging_state()); + + if (selection->isAggregate()) { + return page_aggregate_query(pager, options, page_size, now); + } + + // We can't properly do post-query ordering if we page (see #6722) + if (needs_post_query_ordering()) { + throw exceptions::invalid_request_exception( + "Cannot page queries with both ORDER BY and a IN restriction on the partition key;" + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"); + } + + return pager->fetch_page(page_size).then([this, pager, &options, limit, now] (auto page) { + auto msg = process_results(page, options, limit, now); + + if (!pager->is_exhausted()) { + msg->result->metadata->set_has_more_pages(pager->state()); + } + + return msg; + }); +#endif +} + +future> +select_statement::execute(service::storage_proxy& proxy, lw_shared_ptr cmd, + service::query_state& state, const query_options& options, db_clock::time_point now) { + return proxy.query(std::move(cmd), options.get_consistency()) + .then([this, &options, now, cmd] (auto result) { + return this->process_results(std::move(result), cmd, options, now); + }); +} + +shared_ptr +select_statement::process_results(foreign_ptr> results, lw_shared_ptr cmd, + const query_options& options, db_clock::time_point now) { + auto builder = _selection->make_result_set_builder(now, options.get_protocol_version()); + + auto add_value = [builder] (const column_definition& def, std::experimental::optional& cell) { + if (!cell) { + builder->add_empty(); + return; + } + + if (def.type->is_multi_cell()) { + 
fail(unimplemented::cause::COLLECTIONS); +#if 0 + List cells = row.getMultiCellColumn(def.name); + ByteBuffer buffer = cells == null + ? null + : ((CollectionType)def.type).serializeForNativeProtocol(cells, options.getProtocolVersion()); + result.add(buffer); + return; +#endif + } + + builder->add(def, cell->as_atomic_cell()); + }; + + for (auto&& e : results->partitions) { + // FIXME: deserialize into views + auto key = _schema->partition_key_type->deserialize_value(e.first); + auto& partition = e.second; + + if (!partition.static_row.empty() && partition.rows.empty() + && !_restrictions->uses_secondary_indexing() + && _restrictions->has_no_clustering_columns_restriction()) { + builder->new_row(); + uint32_t static_id = 0; + for (auto&& def : _selection->get_columns()) { + if (def->is_partition_key()) { + builder->add(key[def->component_index()]); + } else if (def->is_static()) { + add_value(*def, partition.static_row.cells[static_id++]); + } else { + builder->add_empty(); + } + } + } else { + for (auto&& e : partition.rows) { + auto c_key = _schema->clustering_key_type->deserialize_value(e.first); + auto& cells = e.second.cells; + uint32_t static_id = 0; + uint32_t regular_id = 0; + + builder->new_row(); + for (auto&& def : _selection->get_columns()) { + switch (def->kind) { + case column_definition::column_kind::PARTITION: + builder->add(key[def->component_index()]); + break; + case column_definition::column_kind::CLUSTERING: + builder->add(c_key[def->component_index()]); + break; + case column_definition::column_kind::REGULAR: + add_value(*def, cells[regular_id++]); + break; + case column_definition::column_kind::STATIC: + add_value(*def, partition.static_row.cells[static_id++]); + break; + default: + assert(0); + } + } + } + } + } + + auto rs = builder->build(); + if (needs_post_query_ordering()) { + rs->sort(_ordering_comparator); + } + if (_is_reversed) { + rs->reverse(); + } + rs->trim(cmd->row_limit); + return ::make_shared(std::move(rs)); +} + +} +} diff 
--git a/cql3/statements/select_statement.hh b/cql3/statements/select_statement.hh index 283f620170..26be9a9ee4 100644 --- a/cql3/statements/select_statement.hh +++ b/cql3/statements/select_statement.hh @@ -25,59 +25,17 @@ #ifndef CQL3_STATEMENTS_SELECT_STATEMENT_HH #define CQL3_STATEMENTS_SELECT_STATEMENT_HH -#if 0 -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.base.Objects; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; - -import org.apache.cassandra.auth.Permission; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.cql3.restrictions.StatementRestrictions; -import org.apache.cassandra.cql3.selection.RawSelector; -import org.apache.cassandra.cql3.selection.Selection; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.composites.Composites; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.db.marshal.CollectionType; -import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.db.marshal.Int32Type; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.pager.Pageable; -import 
org.apache.cassandra.service.pager.QueryPager; -import org.apache.cassandra.service.pager.QueryPagers; -import org.apache.cassandra.thrift.ThriftValidation; -import org.apache.cassandra.transport.messages.ResultMessage; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; - -import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; -import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; -import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; -import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; -#endif - #include "cql3/statements/cf_statement.hh" #include "cql3/cql_statement.hh" - +#include "cql3/selection/selection.hh" +#include "cql3/selection/raw_selector.hh" +#include "cql3/restrictions/statement_restrictions.hh" +#include "cql3/result_set.hh" +#include "exceptions/unrecognized_entity_exception.hh" +#include "service/client_state.hh" #include "core/shared_ptr.hh" +#include "core/distributed.hh" +#include "validation.hh" namespace cql3 { @@ -89,172 +47,130 @@ namespace statements { * */ class select_statement : public cql_statement { -private: -#if 0 - private static final int DEFAULT_COUNT_PAGE_SIZE = 10000; - - private final int boundTerms; -#endif public: -#if 0 - public final CFMetaData cfm; - public final Parameters parameters; - private final Selection selection; - private final Term limit; + class parameters final { + public: + using orderings_type = std::unordered_map, bool, + shared_ptr_value_hash, shared_ptr_equal_by_value>; + private: + const orderings_type _orderings; + const bool _is_distinct; + const bool _allow_filtering; + public: + parameters() + : _is_distinct{false} + , _allow_filtering{false} + { } + parameters(orderings_type orderings, + bool is_distinct, + bool allow_filtering) + : _orderings{std::move(orderings)} + , _is_distinct{is_distinct} + , _allow_filtering{allow_filtering} + { 
} + bool is_distinct() { return _is_distinct; } + bool allow_filtering() { return _allow_filtering; } + orderings_type const& orderings() { return _orderings; } + }; +private: + static constexpr int DEFAULT_COUNT_PAGE_SIZE = 10000; + static const ::shared_ptr _default_parameters; + schema_ptr _schema; + uint32_t _bound_terms; + ::shared_ptr _parameters; + ::shared_ptr _selection; + ::shared_ptr _restrictions; + bool _is_reversed; + ::shared_ptr _limit; - private final StatementRestrictions restrictions; + template + using compare_fn = std::function; - private final boolean isReversed; + using result_row_type = std::vector; + using ordering_comparator_type = compare_fn; /** * The comparator used to orders results when multiple keys are selected (using IN). */ - private final Comparator> orderingComparator; + ordering_comparator_type _ordering_comparator; +public: + select_statement(schema_ptr schema, + uint32_t bound_terms, + ::shared_ptr parameters, + ::shared_ptr selection, + ::shared_ptr restrictions, + bool is_reversed, + ordering_comparator_type ordering_comparator, + ::shared_ptr limit) + : _schema(schema) + , _bound_terms(bound_terms) + , _parameters(std::move(parameters)) + , _selection(std::move(selection)) + , _restrictions(std::move(restrictions)) + , _is_reversed(is_reversed) + , _limit(std::move(limit)) + , _ordering_comparator(std::move(ordering_comparator)) + { } - // Used by forSelection below - private static final Parameters defaultParameters = new Parameters(Collections.emptyMap(), false, false); - - public SelectStatement(CFMetaData cfm, - int boundTerms, - Parameters parameters, - Selection selection, - StatementRestrictions restrictions, - boolean isReversed, - Comparator> orderingComparator, - Term limit) - { - this.cfm = cfm; - this.boundTerms = boundTerms; - this.selection = selection; - this.restrictions = restrictions; - this.isReversed = isReversed; - this.orderingComparator = orderingComparator; - this.parameters = parameters; - 
this.limit = limit; + virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const override { + return _selection->uses_function(ks_name, function_name) + || _restrictions->uses_function(ks_name, function_name) + || (_limit && _limit->uses_function(ks_name, function_name)); } - @Override - public boolean usesFunction(String ksName, String functionName) - { - return selection.usesFunction(ksName, functionName) - || restrictions.usesFunction(ksName, functionName) - || (limit != null && limit.usesFunction(ksName, functionName)); - } - - // Creates a simple select based on the given selection. + // Creates a simple select based on the given selection // Note that the results select statement should not be used for actual queries, but only for processing already // queried data through processColumnFamily. - static SelectStatement forSelection(CFMetaData cfm, Selection selection) - { - return new SelectStatement(cfm, - 0, - defaultParameters, - selection, - StatementRestrictions.empty(cfm), - false, - null, - null); + static ::shared_ptr for_selection(schema_ptr schema, + ::shared_ptr selection) { + return ::make_shared(schema, + 0, + _default_parameters, + selection, + ::make_shared(schema), + false, + ordering_comparator_type{}, + ::shared_ptr{}); } - public ResultSet.Metadata getResultMetadata() - { - return selection.getResultMetadata(); + ::shared_ptr get_result_metadata() { + return _selection->get_result_metadata(); } - public int getBoundTerms() - { - return boundTerms; + virtual uint32_t get_bound_terms() override { + return _bound_terms; } - public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException - { + virtual void check_access(const service::client_state& state) override { + warn(unimplemented::cause::PERMISSIONS); +#if 0 state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT); +#endif } - public void validate(ClientState state) throws InvalidRequestException - { - // 
Nothing to do, all validation has been done by RawStatement.prepare() + virtual void validate(const service::client_state& state) override { + // Nothing to do, all validation has been done by raw_statemet::prepare() } - public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException - { - ConsistencyLevel cl = options.getConsistency(); - checkNotNull(cl, "Invalid empty consistency level"); + virtual future<::shared_ptr> execute(service::storage_proxy& proxy, + service::query_state& state, const query_options& options) override; - cl.validateForRead(keyspace()); - - int limit = getLimit(options); - long now = System.currentTimeMillis(); - Pageable command = getPageableCommand(options, limit, now); - - int pageSize = options.getPageSize(); - - // An aggregation query will never be paged for the user, but we always page it internally to avoid OOM. - // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default - // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707). 
- if (selection.isAggregate() && pageSize <= 0) - pageSize = DEFAULT_COUNT_PAGE_SIZE; - - if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) - { - return execute(command, options, limit, now, state); - } - - QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState()); - - if (selection.isAggregate()) - return pageAggregateQuery(pager, options, pageSize, now); - - // We can't properly do post-query ordering if we page (see #6722) - checkFalse(needsPostQueryOrdering(), - "Cannot page queries with both ORDER BY and a IN restriction on the partition key;" - + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"); - - List page = pager.fetchPage(pageSize); - ResultMessage.Rows msg = processResults(page, options, limit, now); - - if (!pager.isExhausted()) - msg.result.metadata.setHasMorePages(pager.state()); - - return msg; + virtual future<::shared_ptr> execute_internal(database& db, + service::query_state& state, const query_options& options) { + throw std::runtime_error("not implemented"); } - private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException - { - int limitForQuery = updateLimitForQuery(limit); - if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) - return getRangeCommand(options, limitForQuery, now); - - List commands = getSliceCommands(options, limitForQuery, now); - return commands == null ? 
null : new Pageable.ReadCommands(commands, limitForQuery); - } - - public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException - { - return getPageableCommand(options, getLimit(options), System.currentTimeMillis()); - } - - private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) throws RequestValidationException, RequestExecutionException - { - List rows; - if (command == null) - { - rows = Collections.emptyList(); - } - else - { - rows = command instanceof Pageable.ReadCommands - ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState()) - : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency()); - } - - return processResults(rows, options, limit, now); - } + future<::shared_ptr> execute(service::storage_proxy& proxy, + lw_shared_ptr cmd, service::query_state& state, + const query_options& options, db_clock::time_point now); + shared_ptr process_results(foreign_ptr> results, + lw_shared_ptr cmd, const query_options& options, db_clock::time_point now); +#if 0 private ResultMessage.Rows pageAggregateQuery(QueryPager pager, QueryOptions options, int pageSize, long now) throws RequestValidationException, RequestExecutionException { - Selection.ResultSetBuilder result = selection.resultSetBuilder(now); + Selection.ResultSetBuilder result = _selection->resultSetBuilder(now); while (!pager.isExhausted()) { for (org.apache.cassandra.db.Row row : pager.fetchPage(pageSize)) @@ -269,12 +185,6 @@ public: return new ResultMessage.Rows(result.build(options.getProtocolVersion())); } - public ResultMessage.Rows processResults(List rows, QueryOptions options, int limit, long now) throws RequestValidationException - { - ResultSet rset = process(rows, options, limit, now); - return new ResultMessage.Rows(rset); - } - static List readLocally(String keyspaceName, List cmds) { Keyspace keyspace = 
Keyspace.open(keyspaceName); @@ -306,164 +216,42 @@ public: public String keyspace() { - return cfm.ksName; + return _schema.ks_name; } public String columnFamily() { - return cfm.cfName; + return _schema.cfName; } +#endif - private List getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException - { - Collection keys = restrictions.getPartitionKeys(options); + query::partition_slice make_partition_slice(const query_options& options) { + std::vector static_columns; + std::vector regular_columns; - List commands = new ArrayList<>(keys.size()); - - IDiskAtomFilter filter = makeFilter(options, limit); - if (filter == null) - return null; - - // Note that we use the total limit for every key, which is potentially inefficient. - // However, IN + LIMIT is not a very sensible choice. - for (ByteBuffer key : keys) - { - QueryProcessor.validateKey(key); - // We should not share the slice filter amongst the commands (hence the cloneShallow), due to - // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method - // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly) - commands.add(ReadCommand.create(keyspace(), ByteBufferUtil.clone(key), columnFamily(), now, filter.cloneShallow())); + if (_selection->contains_static_columns()) { + static_columns.reserve(_selection->get_column_count()); } - return commands; - } + regular_columns.reserve(_selection->get_column_count()); - private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException - { - IDiskAtomFilter filter = makeFilter(options, limit); - if (filter == null) - return null; - - List expressions = getValidatedIndexExpressions(options); - // The LIMIT provided by the user is the number of CQL row he wants returned. - // We want to have getRangeSlice to count the number of columns, not the number of keys. 
- AbstractBounds keyBounds = restrictions.getPartitionKeyBounds(options); - return keyBounds == null - ? null - : new RangeSliceCommand(keyspace(), columnFamily(), now, filter, keyBounds, expressions, limit, !parameters.isDistinct, false); - } - - private ColumnSlice makeStaticSlice() - { - // Note: we could use staticPrefix.start() for the start bound, but EMPTY gives us the - // same effect while saving a few CPU cycles. - return isReversed - ? new ColumnSlice(cfm.comparator.staticPrefix().end(), Composites.EMPTY) - : new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()); - } - - private IDiskAtomFilter makeFilter(QueryOptions options, int limit) - throws InvalidRequestException - { - int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size(); - if (parameters.isDistinct) - { - // For distinct, we only care about fetching the beginning of each partition. If we don't have - // static columns, we in fact only care about the first cell, so we query only that (we don't "group"). - // If we do have static columns, we do need to fetch the first full group (to have the static columns values). - return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, selection.containsStaticColumns() ? toGroup : -1); + for (auto&& col : _selection->get_columns()) { + if (col->is_static()) { + static_columns.push_back(col->id); + } else if (col->is_regular()) { + regular_columns.push_back(col->id); + } } - else if (restrictions.isColumnRange()) - { - List startBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.START, options); - List endBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.END, options); - assert startBounds.size() == endBounds.size(); - // Handles fetching static columns. Note that for 2i, the filter is just used to restrict - // the part of the index to query so adding the static slice would be useless and confusing. 
- // For 2i, static columns are retrieve in CompositesSearcher with each index hit. - ColumnSlice staticSlice = selection.containsStaticColumns() && !restrictions.usesSecondaryIndexing() - ? makeStaticSlice() - : null; - - // The case where startBounds == 1 is common enough that it's worth optimizing - if (startBounds.size() == 1) - { - ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0)); - if (slice.isAlwaysEmpty(cfm.comparator, isReversed)) - return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup); - - if (staticSlice == null) - return sliceFilter(slice, limit, toGroup); - - if (isReversed) - return slice.includes(cfm.comparator.reverseComparator(), staticSlice.start) - ? sliceFilter(new ColumnSlice(slice.start, staticSlice.finish), limit, toGroup) - : sliceFilter(new ColumnSlice[]{ slice, staticSlice }, limit, toGroup); - else - return slice.includes(cfm.comparator, staticSlice.finish) - ? sliceFilter(new ColumnSlice(staticSlice.start, slice.finish), limit, toGroup) - : sliceFilter(new ColumnSlice[]{ staticSlice, slice }, limit, toGroup); - } - - List l = new ArrayList(startBounds.size()); - for (int i = 0; i < startBounds.size(); i++) - { - ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i)); - if (!slice.isAlwaysEmpty(cfm.comparator, isReversed)) - l.add(slice); - } - - if (l.isEmpty()) - return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup); - if (staticSlice == null) - return sliceFilter(l.toArray(new ColumnSlice[l.size()]), limit, toGroup); - - // The slices should not overlap. We know the slices built from startBounds/endBounds don't, but if there is - // a static slice, it could overlap with the 2nd slice. 
Check for it and correct if that's the case - ColumnSlice[] slices; - if (isReversed) - { - if (l.get(l.size() - 1).includes(cfm.comparator.reverseComparator(), staticSlice.start)) - { - slices = l.toArray(new ColumnSlice[l.size()]); - slices[slices.length-1] = new ColumnSlice(slices[slices.length-1].start, Composites.EMPTY); - } - else - { - slices = l.toArray(new ColumnSlice[l.size()+1]); - slices[slices.length-1] = staticSlice; - } - } - else - { - if (l.get(0).includes(cfm.comparator, staticSlice.finish)) - { - slices = new ColumnSlice[l.size()]; - slices[0] = new ColumnSlice(Composites.EMPTY, l.get(0).finish); - for (int i = 1; i < l.size(); i++) - slices[i] = l.get(i); - } - else - { - slices = new ColumnSlice[l.size()+1]; - slices[0] = staticSlice; - for (int i = 0; i < l.size(); i++) - slices[i+1] = l.get(i); - } - } - return sliceFilter(slices, limit, toGroup); - } - else - { - SortedSet cellNames = getRequestedColumns(options); - if (cellNames == null) // in case of IN () for the last column of the key - return null; - QueryProcessor.validateCellNames(cellNames, cfm.comparator); - return new NamesQueryFilter(cellNames, true); + if (_parameters->is_distinct()) { + return query::partition_slice({}, std::move(static_columns), {}); } + + return query::partition_slice(_restrictions->get_clustering_bounds(options), + std::move(static_columns), std::move(regular_columns)); } +#if 0 private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup) { return sliceFilter(new ColumnSlice[]{ slice }, limit, toGroup); @@ -471,31 +259,35 @@ public: private SliceQueryFilter sliceFilter(ColumnSlice[] slices, int limit, int toGroup) { - assert ColumnSlice.validateSlices(slices, cfm.comparator, isReversed) : String.format("Invalid slices: " + Arrays.toString(slices) + (isReversed ? 
" (reversed)" : "")); - return new SliceQueryFilter(slices, isReversed, limit, toGroup); + assert ColumnSlice.validateSlices(slices, _schema.comparator, _is_reversed) : String.format("Invalid slices: " + Arrays.toString(slices) + (_is_reversed ? " (reversed)" : "")); + return new SliceQueryFilter(slices, _is_reversed, limit, toGroup); } +#endif - private int getLimit(QueryOptions options) throws InvalidRequestException - { - if (limit != null) - { - ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit"); - - try - { - Int32Type.instance.validate(b); - int l = Int32Type.instance.compose(b); - checkTrue(l > 0, "LIMIT must be strictly positive"); - return l; - } - catch (MarshalException e) - { - throw new InvalidRequestException("Invalid limit value"); - } +private: + int32_t get_limit(const query_options& options) const { + if (!_limit) { + return std::numeric_limits::max(); + } + + auto val = _limit->bind_and_get(options); + if (!val) { + throw exceptions::invalid_request_exception("Invalid null value of limit"); + } + + try { + int32_type->validate(*val); + auto l = *boost::any_cast>(int32_type->deserialize(*val)); + if (l <= 0) { + throw exceptions::invalid_request_exception("LIMIT must be strictly positive"); + } + return l; + } catch (const marshal_exception& e) { + throw exceptions::invalid_request_exception("Invalid limit value"); } - return Integer.MAX_VALUE; } +#if 0 private int updateLimitForQuery(int limit) { // Internally, we don't support exclusive bounds for slices. Instead, we query one more element if necessary @@ -712,393 +504,280 @@ public: result.add(row.getColumn(def.name)); } +#endif - private boolean needsPostQueryOrdering() - { +private: + bool needs_post_query_ordering() const { // We need post-query ordering only for queries with IN on the partition key and an ORDER BY. 
- return restrictions.keyIsInRelation() && !parameters.orderings.isEmpty(); + return _restrictions->key_is_in_relation() && !_parameters->orderings().empty(); } - /** - * Orders results when multiple keys are selected (using IN) - */ - private void orderResults(ResultSet cqlRows) - { - if (cqlRows.size() == 0 || !needsPostQueryOrdering()) +public: + class raw_statement; +}; + +class select_statement::raw_statement : public cf_statement +{ +private: + ::shared_ptr _parameters; + std::vector<::shared_ptr> _select_clause; + std::vector<::shared_ptr> _where_clause; + ::shared_ptr _limit; +public: + raw_statement(::shared_ptr cf_name, + ::shared_ptr parameters, + std::vector<::shared_ptr> select_clause, + std::vector<::shared_ptr> where_clause, + ::shared_ptr limit) + : cf_statement(std::move(cf_name)) + , _parameters(std::move(parameters)) + , _select_clause(std::move(select_clause)) + , _where_clause(std::move(where_clause)) + , _limit(std::move(limit)) + { } + + virtual std::unique_ptr prepare(database& db) override { + schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family()); + auto bound_names = get_bound_variables(); + + auto selection = _select_clause.empty() + ? 
selection::selection::wildcard(schema) + : selection::selection::from_selectors(schema, _select_clause); + + auto restrictions = prepare_restrictions(schema, bound_names, selection); + + if (_parameters->is_distinct()) { + validate_distinct_selection(schema, selection, restrictions); + } + + select_statement::ordering_comparator_type ordering_comparator; + bool is_reversed_ = false; + + if (!_parameters->orderings().empty()) { + verify_ordering_is_allowed(restrictions); + ordering_comparator = get_ordering_comparator(schema, selection, restrictions); + is_reversed_ = is_reversed(schema); + } + + if (is_reversed_) { + restrictions->reverse(); + } + + check_needs_filtering(restrictions); + + auto stmt = ::make_shared(schema, + bound_names->size(), + _parameters, + std::move(selection), + std::move(restrictions), + is_reversed_, + std::move(ordering_comparator), + prepare_limit(bound_names)); + + return std::make_unique(std::move(stmt), std::move(*bound_names)); + } + +private: + ::shared_ptr prepare_restrictions(schema_ptr schema, + ::shared_ptr bound_names, ::shared_ptr selection) { + try { + return ::make_shared(schema, std::move(_where_clause), bound_names, + selection->contains_only_static_columns(), selection->contains_a_collection()); + } catch (const exceptions::unrecognized_entity_exception& e) { + if (contains_alias(e.entity)) { + throw exceptions::invalid_request_exception(sprint("Aliases aren't allowed in the where clause ('%s')", e.relation->to_string())); + } + throw; + } + } + + /** Returns a ::shared_ptr for the limit or null if no limit is set */ + ::shared_ptr prepare_limit(::shared_ptr bound_names) { + if (!_limit) { + return {}; + } + + auto prep_limit = _limit->prepare(keyspace(), limit_receiver()); + prep_limit->collect_marker_specification(bound_names); + return prep_limit; + } + + static void verify_ordering_is_allowed(::shared_ptr restrictions) { + if (restrictions->uses_secondary_indexing()) { + throw 
exceptions::invalid_request_exception("ORDER BY with 2ndary indexes is not supported."); + } + if (restrictions->is_key_range()) { + throw exceptions::invalid_request_exception("ORDER BY is only supported when the partition key is restricted by an EQ or an IN."); + } + } + + static void validate_distinct_selection(schema_ptr schema, ::shared_ptr selection, + ::shared_ptr restrictions) { + for (auto&& def : selection->get_columns()) { + if (!def->is_partition_key() && !def->is_static()) { + throw exceptions::invalid_request_exception(sprint( + "SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)", + def->name_as_text())); + } + } + + // If it's a key range, we require that all partition key columns are selected so we don't have to bother + // with post-query grouping. + if (!restrictions->is_key_range()) { return; + } - Collections.sort(cqlRows.rows, orderingComparator); + for (auto&& def : schema->partition_key_columns()) { + if (!selection->has_column(def)) { + throw exceptions::invalid_request_exception(sprint( + "SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name_as_text())); + } + } } -#endif - class raw_statement : public cf_statement - { -#if 0 - private final Parameters parameters; - private final List selectClause; - private final List whereClause; - private final Term.Raw limit; + void handle_unrecognized_ordering_column(::shared_ptr column) { + if (contains_alias(column)) { + throw exceptions::invalid_request_exception(sprint("Aliases are not allowed in order by clause ('%s')", *column)); + } + throw exceptions::invalid_request_exception(sprint("Order by on unknown column %s", *column)); + } - public RawStatement(CFName cfName, Parameters parameters, List selectClause, List whereClause, Term.Raw limit) - { - super(cfName); - this.parameters = parameters; - this.selectClause = selectClause; - this.whereClause = whereClause == null ? 
Collections.emptyList() : whereClause; - this.limit = limit; + select_statement::ordering_comparator_type get_ordering_comparator(schema_ptr schema, ::shared_ptr selection, + ::shared_ptr restrictions) { + if (!restrictions->key_is_in_relation()) { + return {}; } - public ParsedStatement.Prepared prepare() throws InvalidRequestException - { - CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); - VariableSpecifications boundNames = getBoundVariables(); + std::vector> sorters; + sorters.reserve(_parameters->orderings().size()); - Selection selection = selectClause.isEmpty() - ? Selection.wildcard(cfm) - : Selection.fromSelectors(cfm, selectClause); - - StatementRestrictions restrictions = prepareRestrictions(cfm, boundNames, selection); - - if (parameters.isDistinct) - validateDistinctSelection(cfm, selection, restrictions); - - Comparator> orderingComparator = null; - boolean isReversed = false; - - if (!parameters.orderings.isEmpty()) - { - verifyOrderingIsAllowed(restrictions); - orderingComparator = getOrderingComparator(cfm, selection, restrictions); - isReversed = isReversed(cfm); + // If we order post-query (see orderResults), the sorted column needs to be in the ResultSet for sorting, + // even if we don't + // ultimately ship them to the client (CASSANDRA-4911). 
+ for (auto&& e : _parameters->orderings()) { + auto&& raw = e.first; + ::shared_ptr column = raw->prepare_column_identifier(schema); + const column_definition* def = schema->get_column_definition(column->name()); + if (!def) { + handle_unrecognized_ordering_column(column); + } + auto index = selection->index_of(*def); + if (index < 0) { + index = selection->add_column_for_ordering(*def); } - if (isReversed) - restrictions.reverse(); - - checkNeedsFiltering(restrictions); - - SelectStatement stmt = new SelectStatement(cfm, - boundNames.size(), - parameters, - selection, - restrictions, - isReversed, - orderingComparator, - prepareLimit(boundNames)); - - return new ParsedStatement.Prepared(stmt, boundNames); + sorters.emplace_back(index, def->type); } - /** - * Prepares the restrictions. - * - * @param cfm the column family meta data - * @param boundNames the variable specifications - * @param selection the selection - * @return the restrictions - * @throws InvalidRequestException if a problem occurs while building the restrictions - */ - private StatementRestrictions prepareRestrictions(CFMetaData cfm, - VariableSpecifications boundNames, - Selection selection) throws InvalidRequestException - { - try - { - return new StatementRestrictions(cfm, - whereClause, - boundNames, - selection.containsOnlyStaticColumns(), - selection.containsACollection()); - } - catch (UnrecognizedEntityException e) - { - if (containsAlias(e.entity)) - throw invalidRequest("Aliases aren't allowed in the where clause ('%s')", e.relation); - throw e; - } - } + return [sorters = std::move(sorters)] (const result_row_type& r1, const result_row_type& r2) mutable { + for (auto&& e : sorters) { + auto& c1 = r1[e.first]; + auto& c2 = r2[e.first]; + auto type = e.second; - /** Returns a Term for the limit or null if no limit is set */ - private Term prepareLimit(VariableSpecifications boundNames) throws InvalidRequestException - { - if (limit == null) - return null; - - Term prepLimit = 
limit.prepare(keyspace(), limitReceiver()); - prepLimit.collectMarkerSpecification(boundNames); - return prepLimit; - } - - private static void verifyOrderingIsAllowed(StatementRestrictions restrictions) throws InvalidRequestException - { - checkFalse(restrictions.usesSecondaryIndexing(), "ORDER BY with 2ndary indexes is not supported."); - checkFalse(restrictions.isKeyRange(), "ORDER BY is only supported when the partition key is restricted by an EQ or an IN."); - } - - private static void validateDistinctSelection(CFMetaData cfm, - Selection selection, - StatementRestrictions restrictions) - throws InvalidRequestException - { - Collection requestedColumns = selection.getColumns(); - for (ColumnDefinition def : requestedColumns) - checkFalse(!def.isPartitionKey() && !def.isStatic(), - "SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)", - def.name); - - // If it's a key range, we require that all partition key columns are selected so we don't have to bother - // with post-query grouping. 
- if (!restrictions.isKeyRange()) - return; - - for (ColumnDefinition def : cfm.partitionKeyColumns()) - checkTrue(requestedColumns.contains(def), - "SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name); - } - - private void handleUnrecognizedOrderingColumn(ColumnIdentifier column) throws InvalidRequestException - { - checkFalse(containsAlias(column), "Aliases are not allowed in order by clause ('%s')", column); - checkFalse(true, "Order by on unknown column %s", column); - } - - private Comparator> getOrderingComparator(CFMetaData cfm, - Selection selection, - StatementRestrictions restrictions) - throws InvalidRequestException - { - if (!restrictions.keyIsInRelation()) - return null; - - Map orderingIndexes = getOrderingIndex(cfm, selection); - - List idToSort = new ArrayList(); - List> sorters = new ArrayList>(); - - for (ColumnIdentifier.Raw raw : parameters.orderings.keySet()) - { - ColumnIdentifier identifier = raw.prepare(cfm); - ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier); - idToSort.add(orderingIndexes.get(orderingColumn.name)); - sorters.add(orderingColumn.type); - } - return idToSort.size() == 1 ? new SingleColumnComparator(idToSort.get(0), sorters.get(0)) - : new CompositeComparator(sorters, idToSort); - } - - private Map getOrderingIndex(CFMetaData cfm, Selection selection) - throws InvalidRequestException - { - // If we order post-query (see orderResults), the sorted column needs to be in the ResultSet for sorting, - // even if we don't - // ultimately ship them to the client (CASSANDRA-4911). 
- Map orderingIndexes = new HashMap<>(); - for (ColumnIdentifier.Raw raw : parameters.orderings.keySet()) - { - ColumnIdentifier column = raw.prepare(cfm); - final ColumnDefinition def = cfm.getColumnDefinition(column); - if (def == null) - handleUnrecognizedOrderingColumn(column); - int index = selection.indexOf(def); - if (index < 0) - index = selection.addColumnForOrdering(def); - orderingIndexes.put(def.name, index); - } - return orderingIndexes; - } - - private boolean isReversed(CFMetaData cfm) throws InvalidRequestException - { - Boolean[] reversedMap = new Boolean[cfm.clusteringColumns().size()]; - int i = 0; - for (Map.Entry entry : parameters.orderings.entrySet()) - { - ColumnIdentifier column = entry.getKey().prepare(cfm); - boolean reversed = entry.getValue(); - - ColumnDefinition def = cfm.getColumnDefinition(column); - if (def == null) - handleUnrecognizedOrderingColumn(column); - - checkTrue(def.isClusteringColumn(), - "Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", column); - - checkTrue(i++ == def.position(), - "Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY"); - - reversedMap[def.position()] = (reversed != def.isReversedType()); - } - - // Check that all boolean in reversedMap, if set, agrees - Boolean isReversed = null; - for (Boolean b : reversedMap) - { - // Column on which order is specified can be in any order - if (b == null) - continue; - - if (isReversed == null) - { - isReversed = b; - continue; + if (bool(c1) != bool(c2)) { + return bool(c2); + } + if (c1) { + int result = type->compare(*c1, *c2); + if (result != 0) { + return result < 0; + } } - checkTrue(isReversed.equals(b), "Unsupported order by relation"); } - assert isReversed != null; - return isReversed; + return false; + }; + } + + bool is_reversed(schema_ptr schema) { + std::experimental::optional reversed_map[schema->clustering_key_size()]; + + uint32_t i = 0; + for (auto&& e 
: _parameters->orderings()) { + ::shared_ptr column = e.first->prepare_column_identifier(schema); + bool reversed = e.second; + + auto def = schema->get_column_definition(column->name()); + if (!def) { + handle_unrecognized_ordering_column(column); + } + + if (!def->is_clustering_key()) { + throw exceptions::invalid_request_exception(sprint( + "Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", *column)); + } + + if (i != def->component_index()) { + throw exceptions::invalid_request_exception( + "Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY"); + } + + reversed_map[i] = std::experimental::make_optional(reversed != def->type->is_reversed()); + ++i; } - /** If ALLOW FILTERING was not specified, this verifies that it is not needed */ - private void checkNeedsFiltering(StatementRestrictions restrictions) throws InvalidRequestException - { - // non-key-range non-indexed queries cannot involve filtering underneath - if (!parameters.allowFiltering && (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())) - { - // We will potentially filter data if either: - // - Have more than one IndexExpression - // - Have no index expression and the column filter is not the identity - checkFalse(restrictions.needFiltering(), - "Cannot execute this query as it might involve data filtering and " + - "thus may have unpredictable performance. If you want to execute " + - "this query despite the performance unpredictability, use ALLOW FILTERING"); - } + // GCC incorrectly complains about "*is_reversed_" below + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" - // We don't internally support exclusive slice bounds on non-composite tables. To deal with it we do an - // inclusive slice and remove post-query the value that shouldn't be returned. 
One problem however is that - // if there is a user limit, that limit may make the query return before the end of the slice is reached, - // in which case, once we'll have removed bound post-query, we might end up with less results than - // requested which would be incorrect. For single-partition query, this is not a problem, we just ask for - // one more result (see updateLimitForQuery()) since that's enough to compensate for that problem. For key - // range however, each returned row may include one result that will have to be trimmed, so we would have - // to bump the query limit by N where N is the number of rows we will return, but we don't know that in - // advance. So, since we currently don't have a good way to handle such query, we refuse it (#7059) rather - // than answering with something that is wrong. - if (restrictions.isNonCompositeSliceWithExclusiveBounds() && restrictions.isKeyRange() && limit != null) - { - SingleColumnRelation rel = findInclusiveClusteringRelationForCompact(restrictions.cfm); - throw invalidRequest("The query requests a restriction of rows with a strict bound (%s) over a range of partitions. " - + "This is not supported by the underlying storage engine for COMPACT tables if a LIMIT is provided. " - + "Please either make the condition non strict (%s) or remove the user LIMIT", rel, rel.withNonStrictOperator()); + // Check that all bool in reversedMap, if set, agrees + std::experimental::optional is_reversed_{}; + for (auto&& b : reversed_map) { + if (b) { + if (!is_reversed_) { + is_reversed_ = b; + } else { + if ((*is_reversed_) != *b) { + throw exceptions::invalid_request_exception("Unsupported order by relation"); + } + } } } - private SingleColumnRelation findInclusiveClusteringRelationForCompact(CFMetaData cfm) - { - for (Relation r : whereClause) - { - // We only call this when sliceRestriction != null, i.e. for compact table with non composite comparator, - // so it can't be a MultiColumnRelation. 
- SingleColumnRelation rel = (SingleColumnRelation)r; + assert(is_reversed_); + return *is_reversed_; - if (cfm.getColumnDefinition(rel.getEntity().prepare(cfm)).isClusteringColumn() - && (rel.operator() == Operator.GT || rel.operator() == Operator.LT)) - return rel; + #pragma GCC diagnostic pop + } + + /** If ALLOW FILTERING was not specified, this verifies that it is not needed */ + void check_needs_filtering(::shared_ptr restrictions) { + // non-key-range non-indexed queries cannot involve filtering underneath + if (!_parameters->allow_filtering() && (restrictions->is_key_range() || restrictions->uses_secondary_indexing())) { + // We will potentially filter data if either: + // - Have more than one IndexExpression + // - Have no index expression and the column filter is not the identity + if (restrictions->need_filtering()) { + throw exceptions::invalid_request_exception( + "Cannot execute this query as it might involve data filtering and " + "thus may have unpredictable performance. 
If you want to execute " + "this query despite the performance unpredictability, use ALLOW FILTERING"); } - - // We're not supposed to call this method unless we know this can't happen - throw new AssertionError(); } + } - private boolean containsAlias(final ColumnIdentifier name) - { - return Iterables.any(selectClause, new Predicate() - { - public boolean apply(RawSelector raw) - { - return name.equals(raw.alias); - } - }); - } + // Returns true if `name` equals the alias of any selector in the select clause. + // A selector's alias pointer may be null (presumably when no alias was given); such a + // selector never matches, mirroring the null-safe name.equals(raw.alias) this replaces. + bool contains_alias(::shared_ptr name) { + return std::any_of(_select_clause.begin(), _select_clause.end(), [&name] (const auto& raw) { + return raw->alias && *name == *raw->alias; + }); + } - private ColumnSpecification limitReceiver() - { - return new ColumnSpecification(keyspace(), columnFamily(), new ColumnIdentifier("[limit]", true), Int32Type.instance); - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("name", cfName) - .add("selectClause", selectClause) - .add("whereClause", whereClause) - .add("isDistinct", parameters.isDistinct) - .toString(); - } -#endif - }; - - class parameters final { - public: - using orderings_type = std::unordered_map, bool, shared_ptr_value_hash, shared_ptr_equal_by_value>; - private: - const orderings_type _orderings; - const bool _is_distinct; - const bool _allow_filtering; - public: - parameters(const orderings_type& orderings, - bool is_distinct, - bool allow_filtering) - : _orderings{orderings} - , _is_distinct{is_distinct} - , _allow_filtering{allow_filtering} - { } - }; + ::shared_ptr limit_receiver() { + return ::make_shared(keyspace(), column_family(), ::make_shared("[limit]", true), + int32_type); + } #if 0 - /** - * Used in orderResults(...) 
method when single 'ORDER BY' condition where given - */ - private static class SingleColumnComparator implements Comparator> - { - private final int index; - private final Comparator comparator; - - public SingleColumnComparator(int columnIndex, Comparator orderer) - { - index = columnIndex; - comparator = orderer; + public: + virtual sstring to_string() override { + return sstring("raw_statement(") + + "name=" + cf_name->to_string() + + ", selectClause=" + to_string(_select_clause) + + ", whereClause=" + to_string(_where_clause) + + ", isDistinct=" + to_string(_parameters->is_distinct()) + + ")"; } - - public int compare(List a, List b) - { - return comparator.compare(a.get(index), b.get(index)); - } - } - - /** - * Used in orderResults(...) method when multiple 'ORDER BY' conditions where given - */ - private static class CompositeComparator implements Comparator> - { - private final List> orderTypes; - private final List positions; - - private CompositeComparator(List> orderTypes, List positions) - { - this.orderTypes = orderTypes; - this.positions = positions; - } - - public int compare(List a, List b) - { - for (int i = 0; i < positions.size(); i++) - { - Comparator type = orderTypes.get(i); - int columnPos = positions.get(i); - - ByteBuffer aValue = a.get(columnPos); - ByteBuffer bValue = b.get(columnPos); - - int comparison = type.compare(aValue, bValue); - - if (comparison != 0) - return comparison; - } - - return 0; - } - } + }; #endif };