This commit is contained in:
Avi Kivity
2016-06-01 18:29:23 +03:00
18 changed files with 549 additions and 173 deletions

View File

@@ -46,11 +46,12 @@ options {
#include "cql3/statements/property_definitions.hh"
#include "cql3/statements/drop_table_statement.hh"
#include "cql3/statements/truncate_statement.hh"
#include "cql3/statements/update_statement.hh"
#include "cql3/statements/delete_statement.hh"
#include "cql3/statements/raw/update_statement.hh"
#include "cql3/statements/raw/insert_statement.hh"
#include "cql3/statements/raw/delete_statement.hh"
#include "cql3/statements/index_prop_defs.hh"
#include "cql3/statements/use_statement.hh"
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/raw/batch_statement.hh"
#include "cql3/statements/create_user_statement.hh"
#include "cql3/statements/alter_user_statement.hh"
#include "cql3/statements/drop_user_statement.hh"
@@ -439,7 +440,7 @@ orderByClause[raw::select_statement::parameters::orderings_type& orderings]
* USING TIMESTAMP <long>;
*
*/
insertStatement returns [::shared_ptr<update_statement::parsed_insert> expr]
insertStatement returns [::shared_ptr<raw::insert_statement> expr]
@init {
auto attrs = ::make_shared<cql3::attributes::raw>();
std::vector<::shared_ptr<cql3::column_identifier::raw>> column_names;
@@ -454,7 +455,7 @@ insertStatement returns [::shared_ptr<update_statement::parsed_insert> expr]
( K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
( usingClause[attrs] )?
{
$expr = ::make_shared<update_statement::parsed_insert>(std::move(cf),
$expr = ::make_shared<raw::insert_statement>(std::move(cf),
std::move(attrs),
std::move(column_names),
std::move(values),
@@ -477,7 +478,7 @@ usingClauseObjective[::shared_ptr<cql3::attributes::raw> attrs]
* SET name1 = value1, name2 = value2
* WHERE key = value;
*/
updateStatement returns [::shared_ptr<update_statement::parsed_update> expr]
updateStatement returns [::shared_ptr<raw::update_statement> expr]
@init {
auto attrs = ::make_shared<cql3::attributes::raw>();
std::vector<std::pair<::shared_ptr<cql3::column_identifier::raw>, ::shared_ptr<cql3::operation::raw_update>>> operations;
@@ -488,7 +489,7 @@ updateStatement returns [::shared_ptr<update_statement::parsed_update> expr]
K_WHERE wclause=whereClause
( K_IF conditions=updateConditions )?
{
return ::make_shared<update_statement::parsed_update>(std::move(cf),
return ::make_shared<raw::update_statement>(std::move(cf),
std::move(attrs),
std::move(operations),
std::move(wclause),
@@ -507,7 +508,7 @@ updateConditions returns [conditions_type conditions]
* WHERE KEY = keyname
[IF (EXISTS | name = value, ...)];
*/
deleteStatement returns [::shared_ptr<delete_statement::parsed> expr]
deleteStatement returns [::shared_ptr<raw::delete_statement> expr]
@init {
auto attrs = ::make_shared<cql3::attributes::raw>();
std::vector<::shared_ptr<cql3::operation::raw_deletion>> column_deletions;
@@ -519,7 +520,7 @@ deleteStatement returns [::shared_ptr<delete_statement::parsed> expr]
K_WHERE wclause=whereClause
( K_IF ( K_EXISTS { if_exists = true; } | conditions=updateConditions ))?
{
return ::make_shared<delete_statement::parsed>(cf,
return ::make_shared<raw::delete_statement>(cf,
std::move(attrs),
std::move(column_deletions),
std::move(wclause),
@@ -566,11 +567,11 @@ usingClauseDelete[::shared_ptr<cql3::attributes::raw> attrs]
* ...
* APPLY BATCH
*/
batchStatement returns [shared_ptr<cql3::statements::batch_statement::parsed> expr]
batchStatement returns [shared_ptr<cql3::statements::raw::batch_statement> expr]
@init {
using btype = cql3::statements::batch_statement::type;
using btype = cql3::statements::raw::batch_statement::type;
btype type = btype::LOGGED;
std::vector<shared_ptr<cql3::statements::modification_statement::parsed>> statements;
std::vector<shared_ptr<cql3::statements::raw::modification_statement>> statements;
auto attrs = make_shared<cql3::attributes::raw>();
}
: K_BEGIN
@@ -579,11 +580,11 @@ batchStatement returns [shared_ptr<cql3::statements::batch_statement::parsed> ex
( s=batchStatementObjective ';'? { statements.push_back(std::move(s)); } )*
K_APPLY K_BATCH
{
$expr = ::make_shared<cql3::statements::batch_statement::parsed>(type, std::move(attrs), std::move(statements));
$expr = ::make_shared<cql3::statements::raw::batch_statement>(type, std::move(attrs), std::move(statements));
}
;
batchStatementObjective returns [shared_ptr<cql3::statements::modification_statement::parsed> statement]
batchStatementObjective returns [shared_ptr<cql3::statements::raw::modification_statement> statement]
: i=insertStatement { $statement = i; }
| u=updateStatement { $statement = u; }
| d=deleteStatement { $statement = d; }

View File

@@ -38,6 +38,7 @@
*/
#include "batch_statement.hh"
#include "raw/batch_statement.hh"
#include "db/config.hh"
namespace cql3 {
@@ -100,6 +101,30 @@ void batch_statement::verify_batch_size(const std::vector<mutation>& mutations)
}
}
namespace raw {
shared_ptr<prepared_statement>
batch_statement::prepare(database& db) {
auto&& bound_names = get_bound_variables();
std::vector<shared_ptr<cql3::statements::modification_statement>> statements;
for (auto&& parsed : _parsed_statements) {
statements.push_back(parsed->prepare(db, bound_names));
}
auto&& prep_attrs = _attrs->prepare(db, "[batch]", "[batch]");
prep_attrs->collect_marker_specification(bound_names);
cql3::statements::batch_statement batch_statement_(bound_names->size(), _type, std::move(statements), std::move(prep_attrs));
batch_statement_.validate();
return ::make_shared<prepared>(make_shared(std::move(batch_statement_)),
bound_names->get_specifications());
}
}
}
}

View File

@@ -39,6 +39,8 @@
#include "cql3/cql_statement.hh"
#include "modification_statement.hh"
#include "raw/modification_statement.hh"
#include "raw/batch_statement.hh"
#include "service/storage_proxy.hh"
#include "transport/messages/result_message.hh"
#include "timestamp.hh"
@@ -63,9 +65,7 @@ namespace statements {
class batch_statement : public cql_statement_no_metadata {
static logging::logger _logger;
public:
enum class type {
LOGGED, UNLOGGED, COUNTER
};
using type = raw::batch_statement::type;
private:
int _bound_terms;
public:
@@ -317,46 +317,6 @@ public:
return sprint("BatchStatement(type=%s, statements=%s)", _type, join(", ", _statements));
}
#endif
class parsed : public raw::cf_statement {
type _type;
shared_ptr<attributes::raw> _attrs;
std::vector<shared_ptr<modification_statement::parsed>> _parsed_statements;
public:
parsed(
type type_,
shared_ptr<attributes::raw> attrs,
std::vector<shared_ptr<modification_statement::parsed>> parsed_statements)
: cf_statement(nullptr)
, _type(type_)
, _attrs(std::move(attrs))
, _parsed_statements(std::move(parsed_statements)) {
}
virtual void prepare_keyspace(const service::client_state& state) override {
for (auto&& s : _parsed_statements) {
s->prepare_keyspace(state);
}
}
virtual shared_ptr<prepared> prepare(database& db) override {
auto&& bound_names = get_bound_variables();
std::vector<shared_ptr<modification_statement>> statements;
for (auto&& parsed : _parsed_statements) {
statements.push_back(parsed->prepare(db, bound_names));
}
auto&& prep_attrs = _attrs->prepare(db, "[batch]", "[batch]");
prep_attrs->collect_marker_specification(bound_names);
batch_statement batch_statement_(bound_names->size(), _type, std::move(statements), std::move(prep_attrs));
batch_statement_.validate();
return ::make_shared<prepared>(make_shared(std::move(batch_statement_)),
bound_names->get_specifications());
}
};
};
}

View File

@@ -40,6 +40,7 @@
*/
#include "delete_statement.hh"
#include "raw/delete_statement.hh"
namespace cql3 {
@@ -76,11 +77,13 @@ void delete_statement::add_update_for_key(mutation& m, const exploded_clustering
}
}
::shared_ptr<modification_statement>
delete_statement::parsed::prepare_internal(database& db, schema_ptr schema, ::shared_ptr<variable_specifications> bound_names,
std::unique_ptr<attributes> attrs) {
namespace raw {
auto stmt = ::make_shared<delete_statement>(statement_type::DELETE, bound_names->size(), schema, std::move(attrs));
::shared_ptr<cql3::statements::modification_statement>
delete_statement::prepare_internal(database& db, schema_ptr schema, ::shared_ptr<variable_specifications> bound_names,
std::unique_ptr<attributes> attrs) {
using statement_type = cql3::statements::modification_statement::statement_type;
auto stmt = ::make_shared<cql3::statements::delete_statement>(statement_type::DELETE, bound_names->size(), schema, std::move(attrs));
for (auto&& deletion : _deletions) {
auto&& id = deletion->affected_column()->prepare_column_identifier(schema);
@@ -104,13 +107,13 @@ delete_statement::parsed::prepare_internal(database& db, schema_ptr schema, ::sh
return stmt;
}
delete_statement::parsed::parsed(::shared_ptr<cf_name> name,
delete_statement::delete_statement(::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<::shared_ptr<operation::raw_deletion>> deletions,
std::vector<::shared_ptr<relation>> where_clause,
conditions_vector conditions,
bool if_exists)
: modification_statement::parsed(std::move(name), std::move(attrs), std::move(conditions), false, if_exists)
: raw::modification_statement(std::move(name), std::move(attrs), std::move(conditions), false, if_exists)
, _deletions(std::move(deletions))
, _where_clause(std::move(where_clause))
{ }
@@ -118,3 +121,5 @@ delete_statement::parsed::parsed(::shared_ptr<cf_name> name,
}
}
}

View File

@@ -42,6 +42,7 @@
#pragma once
#include "cql3/statements/modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh"
#include "cql3/attributes.hh"
#include "cql3/operation.hh"
#include "database_fwd.hh"
@@ -79,22 +80,6 @@ public:
}
#endif
class parsed : public modification_statement::parsed {
private:
std::vector<::shared_ptr<operation::raw_deletion>> _deletions;
std::vector<::shared_ptr<relation>> _where_clause;
public:
parsed(::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<::shared_ptr<operation::raw_deletion>> deletions,
std::vector<::shared_ptr<relation>> where_clause,
conditions_vector conditions,
bool if_exists);
protected:
virtual ::shared_ptr<modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs);
};
};
}

View File

@@ -40,6 +40,7 @@
*/
#include "cql3/statements/modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/restrictions/single_column_restriction.hh"
#include "cql3/single_column_relation.hh"
@@ -561,21 +562,23 @@ modification_statement::process_where_clause(database& db, std::vector<relation_
}
}
namespace raw {
::shared_ptr<prepared_statement>
modification_statement::parsed::prepare(database& db) {
modification_statement::modification_statement::prepare(database& db) {
auto bound_names = get_bound_variables();
auto statement = prepare(db, bound_names);
return ::make_shared<prepared>(std::move(statement), *bound_names);
}
::shared_ptr<modification_statement>
modification_statement::parsed::prepare(database& db, ::shared_ptr<variable_specifications> bound_names) {
::shared_ptr<cql3::statements::modification_statement>
modification_statement::prepare(database& db, ::shared_ptr<variable_specifications> bound_names) {
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
auto prepared_attributes = _attrs->prepare(db, keyspace(), column_family());
prepared_attributes->collect_marker_specification(bound_names);
::shared_ptr<modification_statement> stmt = prepare_internal(db, schema, bound_names, std::move(prepared_attributes));
::shared_ptr<cql3::statements::modification_statement> stmt = prepare_internal(db, schema, bound_names, std::move(prepared_attributes));
if (_if_not_exists || _if_exists || !_conditions.empty()) {
if (stmt->is_counter()) {
@@ -617,6 +620,8 @@ modification_statement::parsed::prepare(database& db, ::shared_ptr<variable_spec
return stmt;
}
}
void
modification_statement::validate(distributed<service::storage_proxy>&, const service::client_state& state) {
if (has_conditions() && attrs->is_timestamp_set()) {
@@ -689,7 +694,9 @@ void modification_statement::validate_where_clause_for_conditions() {
// no-op by default
}
modification_statement::parsed::parsed(::shared_ptr<cf_name> name, ::shared_ptr<attributes::raw> attrs, conditions_vector conditions, bool if_not_exists, bool if_exists)
namespace raw {
modification_statement::modification_statement(::shared_ptr<cf_name> name, ::shared_ptr<attributes::raw> attrs, conditions_vector conditions, bool if_not_exists, bool if_exists)
: cf_statement{std::move(name)}
, _attrs{std::move(attrs)}
, _conditions{std::move(conditions)}
@@ -700,3 +707,5 @@ modification_statement::parsed::parsed(::shared_ptr<cf_name> name, ::shared_ptr<
}
}
}

View File

@@ -66,6 +66,9 @@ namespace cql3 {
namespace statements {
namespace raw { class modification_statement; }
/*
* Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
*/
@@ -345,27 +348,7 @@ protected:
* @throws InvalidRequestException
*/
virtual void validate_where_clause_for_conditions();
public:
class parsed : public raw::cf_statement {
public:
using conditions_vector = std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<column_condition::raw>>>;
protected:
const ::shared_ptr<attributes::raw> _attrs;
const std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<column_condition::raw>>> _conditions;
private:
const bool _if_not_exists;
const bool _if_exists;
protected:
parsed(::shared_ptr<cf_name> name, ::shared_ptr<attributes::raw> attrs, conditions_vector conditions, bool if_not_exists, bool if_exists);
public:
virtual ::shared_ptr<prepared> prepare(database& db) override;
::shared_ptr<modification_statement> prepare(database& db, ::shared_ptr<variable_specifications> bound_names);;
protected:
virtual ::shared_ptr<modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs) = 0;
};
friend class raw::modification_statement;
};
std::ostream& operator<<(std::ostream& out, modification_statement::statement_type t);

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Modified by ScyllaDB
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cql3/cql_statement.hh"
#include "modification_statement.hh"
#include "service/storage_proxy.hh"
#include "transport/messages/result_message.hh"
#include "timestamp.hh"
#include "log.hh"
#include "to_string.hh"
#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/uniqued.hpp>
#include <boost/iterator/counting_iterator.hpp>
#pragma once
namespace cql3 {
namespace statements {
namespace raw {
class batch_statement : public raw::cf_statement {
public:
enum class type {
LOGGED, UNLOGGED, COUNTER
};
private:
type _type;
shared_ptr<attributes::raw> _attrs;
std::vector<shared_ptr<raw::modification_statement>> _parsed_statements;
public:
batch_statement(
type type_,
shared_ptr<attributes::raw> attrs,
std::vector<shared_ptr<raw::modification_statement>> parsed_statements)
: cf_statement(nullptr)
, _type(type_)
, _attrs(std::move(attrs))
, _parsed_statements(std::move(parsed_statements)) {
}
virtual void prepare_keyspace(const service::client_state& state) override {
for (auto&& s : _parsed_statements) {
s->prepare_keyspace(state);
}
}
virtual shared_ptr<prepared> prepare(database& db) override;
};
}
}
}

View File

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "cql3/statements/modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh"
#include "cql3/attributes.hh"
#include "cql3/operation.hh"
#include "database_fwd.hh"
namespace cql3 {
namespace statements {
namespace raw {
class delete_statement : public modification_statement {
private:
std::vector<::shared_ptr<operation::raw_deletion>> _deletions;
std::vector<::shared_ptr<relation>> _where_clause;
public:
delete_statement(::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<::shared_ptr<operation::raw_deletion>> deletions,
std::vector<::shared_ptr<relation>> where_clause,
conditions_vector conditions,
bool if_exists);
protected:
virtual ::shared_ptr<cql3::statements::modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs);
};
}
}
}

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "cql3/statements/modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh"
#include "cql3/column_identifier.hh"
#include "cql3/term.hh"
#include "database_fwd.hh"
#include <vector>
#include "unimplemented.hh"
namespace cql3 {
namespace statements {
namespace raw {
class insert_statement : public raw::modification_statement {
private:
const std::vector<::shared_ptr<column_identifier::raw>> _column_names;
const std::vector<::shared_ptr<term::raw>> _column_values;
public:
/**
* A parsed <code>INSERT</code> statement.
*
* @param name column family being operated on
* @param columnNames list of column names
* @param columnValues list of column values (corresponds to names)
* @param attrs additional attributes for statement (CL, timestamp, timeToLive)
*/
insert_statement(::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<::shared_ptr<column_identifier::raw>> column_names,
std::vector<::shared_ptr<term::raw>> column_values,
bool if_not_exists);
virtual ::shared_ptr<cql3::statements::modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs) override;
};
}
}
}

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "cql3/restrictions/restriction.hh"
#include "cql3/statements/raw/cf_statement.hh"
#include "cql3/column_identifier.hh"
#include "cql3/update_parameters.hh"
#include "cql3/column_condition.hh"
#include "cql3/cql_statement.hh"
#include "cql3/attributes.hh"
#include "cql3/operation.hh"
#include "cql3/relation.hh"
#include "db/consistency_level.hh"
#include "core/shared_ptr.hh"
#include "core/future-util.hh"
#include "unimplemented.hh"
#include "validation.hh"
#include "service/storage_proxy.hh"
#include <memory>
namespace cql3 {
namespace statements {
class modification_statement;
namespace raw {
class modification_statement : public cf_statement {
public:
using conditions_vector = std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<column_condition::raw>>>;
protected:
const ::shared_ptr<attributes::raw> _attrs;
const std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<column_condition::raw>>> _conditions;
private:
const bool _if_not_exists;
const bool _if_exists;
protected:
modification_statement(::shared_ptr<cf_name> name, ::shared_ptr<attributes::raw> attrs, conditions_vector conditions, bool if_not_exists, bool if_exists);
public:
virtual ::shared_ptr<prepared> prepare(database& db) override;
::shared_ptr<cql3::statements::modification_statement> prepare(database& db, ::shared_ptr<variable_specifications> bound_names);;
protected:
virtual ::shared_ptr<cql3::statements::modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs) = 0;
};
}
}
}

View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2015 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "cql3/statements/modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh"
#include "cql3/column_identifier.hh"
#include "cql3/term.hh"
#include "database_fwd.hh"
#include <vector>
#include "unimplemented.hh"
namespace cql3 {
namespace statements {
class update_statement;
namespace raw {
class update_statement : public raw::modification_statement {
private:
// Provided for an UPDATE
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<operation::raw_update>>> _updates;
std::vector<relation_ptr> _where_clause;
public:
/**
* Creates a new UpdateStatement from a column family name, columns map, consistency
* level, and key term.
*
* @param name column family being operated on
* @param attrs additional attributes for statement (timestamp, timeToLive)
* @param updates a map of column operations to perform
* @param whereClause the where clause
*/
update_statement(::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<operation::raw_update>>> updates,
std::vector<relation_ptr> where_clause,
conditions_vector conditions);
protected:
virtual ::shared_ptr<cql3::statements::modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs);
};
}
}
}

View File

@@ -40,6 +40,8 @@
*/
#include "update_statement.hh"
#include "raw/update_statement.hh"
#include "raw/insert_statement.hh"
#include "unimplemented.hh"
#include "cql3/operation_impl.hh"
@@ -108,21 +110,24 @@ void update_statement::add_update_for_key(mutation& m, const exploded_clustering
#endif
}
update_statement::parsed_insert::parsed_insert(::shared_ptr<cf_name> name,
namespace raw {
insert_statement::insert_statement( ::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<::shared_ptr<column_identifier::raw>> column_names,
std::vector<::shared_ptr<term::raw>> column_values,
bool if_not_exists)
: modification_statement::parsed{std::move(name), std::move(attrs), conditions_vector{}, if_not_exists, false}
: raw::modification_statement{std::move(name), std::move(attrs), conditions_vector{}, if_not_exists, false}
, _column_names{std::move(column_names)}
, _column_values{std::move(column_values)}
{ }
::shared_ptr<modification_statement>
update_statement::parsed_insert::prepare_internal(database& db, schema_ptr schema,
::shared_ptr<cql3::statements::modification_statement>
insert_statement::prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs)
{
auto stmt = ::make_shared<update_statement>(statement_type::INSERT, bound_names->size(), schema, std::move(attrs));
using statement_type = cql3::statements::modification_statement::statement_type;
auto stmt = ::make_shared<cql3::statements::update_statement>(statement_type::INSERT, bound_names->size(), schema, std::move(attrs));
// Created from an INSERT
if (stmt->is_counter()) {
@@ -164,21 +169,22 @@ update_statement::parsed_insert::prepare_internal(database& db, schema_ptr schem
return stmt;
}
update_statement::parsed_update::parsed_update(::shared_ptr<cf_name> name,
update_statement::update_statement( ::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<operation::raw_update>>> updates,
std::vector<relation_ptr> where_clause,
conditions_vector conditions)
: modification_statement::parsed(std::move(name), std::move(attrs), std::move(conditions), false, false)
: raw::modification_statement(std::move(name), std::move(attrs), std::move(conditions), false, false)
, _updates(std::move(updates))
, _where_clause(std::move(where_clause))
{ }
::shared_ptr<modification_statement>
update_statement::parsed_update::prepare_internal(database& db, schema_ptr schema,
::shared_ptr<cql3::statements::modification_statement>
update_statement::prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs)
{
auto stmt = ::make_shared<update_statement>(statement_type::UPDATE, bound_names->size(), schema, std::move(attrs));
using statement_type = cql3::statements::modification_statement::statement_type;
auto stmt = ::make_shared<cql3::statements::update_statement>(statement_type::UPDATE, bound_names->size(), schema, std::move(attrs));
for (auto&& entry : _updates) {
auto id = entry.first->prepare_column_identifier(schema);
@@ -203,3 +209,5 @@ update_statement::parsed_update::prepare_internal(database& db, schema_ptr schem
}
}
}

View File

@@ -42,6 +42,7 @@
#pragma once
#include "cql3/statements/modification_statement.hh"
#include "cql3/statements/raw/modification_statement.hh"
#include "cql3/column_identifier.hh"
#include "cql3/term.hh"
@@ -69,55 +70,6 @@ private:
virtual bool require_full_clustering_key() const override;
virtual void add_update_for_key(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) override;
public:
class parsed_insert : public modification_statement::parsed {
private:
const std::vector<::shared_ptr<column_identifier::raw>> _column_names;
const std::vector<::shared_ptr<term::raw>> _column_values;
public:
/**
* A parsed <code>INSERT</code> statement.
*
* @param name column family being operated on
* @param columnNames list of column names
* @param columnValues list of column values (corresponds to names)
* @param attrs additional attributes for statement (CL, timestamp, timeToLive)
*/
parsed_insert(::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<::shared_ptr<column_identifier::raw>> column_names,
std::vector<::shared_ptr<term::raw>> column_values,
bool if_not_exists);
virtual ::shared_ptr<modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs) override;
};
class parsed_update : public modification_statement::parsed {
private:
// Provided for an UPDATE
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<operation::raw_update>>> _updates;
std::vector<relation_ptr> _where_clause;
public:
/**
* Creates a new UpdateStatement from a column family name, columns map, consistency
* level, and key term.
*
* @param name column family being operated on
* @param attrs additional attributes for statement (timestamp, timeToLive)
* @param updates a map of column operations to perform
* @param whereClause the where clause
*/
parsed_update(::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<operation::raw_update>>> updates,
std::vector<relation_ptr> where_clause,
conditions_vector conditions);
protected:
virtual ::shared_ptr<modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs);
};
};
}

View File

@@ -1177,7 +1177,8 @@ future<> column_family::populate(sstring sstdir) {
return do_with(std::vector<future<>>(), [this, sstdir, verifier, descriptor] (std::vector<future<>>& futures) {
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [this, sstdir, verifier, descriptor, &futures] (directory_entry de) {
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
auto f = probe_file(sstdir, de.name).then([verifier, descriptor, sstdir] (auto entry) {
auto f = probe_file(sstdir, de.name).then([verifier, descriptor, sstdir, de] (auto entry) {
auto filename = sstdir + "/" + de.name;
if (entry.component == sstables::sstable::component_type::TemporaryStatistics) {
return remove_file(sstables::sstable::filename(sstdir, entry.ks, entry.cf, entry.version, entry.generation,
entry.format, sstables::sstable::component_type::TemporaryStatistics));
@@ -1186,9 +1187,9 @@ future<> column_family::populate(sstring sstdir) {
if (verifier->count(entry.generation)) {
if (verifier->at(entry.generation) == status::has_toc_file) {
if (entry.component == sstables::sstable::component_type::TOC) {
throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed");
throw sstables::malformed_sstable_exception("Invalid State encountered. TOC file already processed", filename);
} else if (entry.component == sstables::sstable::component_type::TemporaryTOC) {
throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed");
throw sstables::malformed_sstable_exception("Invalid State encountered. Temporary TOC file found after TOC file was processed", filename);
}
} else if (entry.component == sstables::sstable::component_type::TOC) {
verifier->at(entry.generation) = status::has_toc_file;

View File

@@ -2058,7 +2058,6 @@ public:
auto write_timeout = exec->_proxy->_db.local().get_config().write_request_timeout_in_ms() * 1000;
auto delta = __int128_t(digest_resolver->last_modified()) - __int128_t(exec->_cmd->read_timestamp);
if (std::abs(delta) <= write_timeout) {
print("HERE %d\n", int64_t(delta));
exec->_proxy->_stats.global_read_repairs_canceled_due_to_concurrent_write++;
// if CL is local and non matching data is modified less then write_timeout ms ago do only local repair
auto i = boost::range::remove_if(exec->_targets, std::not1(std::cref(db::is_local)));

View File

@@ -25,6 +25,9 @@ namespace sstables {
class malformed_sstable_exception : public std::exception {
sstring _msg;
public:
malformed_sstable_exception(sstring msg, sstring filename)
: malformed_sstable_exception{sprint("%s in sstable %s", msg, filename)}
{}
malformed_sstable_exception(sstring s) : _msg(s) {}
const char *what() const noexcept {
return _msg.c_str();

View File

@@ -708,17 +708,17 @@ future<> sstable::read_toc() {
sstlog.debug("Reading TOC file {} ", file_path);
return open_checked_file_dma(sstable_read_error, file_path, open_flags::ro).then([this] (file f) {
return open_checked_file_dma(sstable_read_error, file_path, open_flags::ro).then([this, file_path] (file f) {
auto bufptr = allocate_aligned_buffer<char>(4096, 4096);
auto buf = bufptr.get();
auto fut = f.dma_read(0, buf, 4096);
return std::move(fut).then([this, f = std::move(f), bufptr = std::move(bufptr)] (size_t size) mutable {
return std::move(fut).then([this, f = std::move(f), bufptr = std::move(bufptr), file_path] (size_t size) mutable {
// This file is supposed to be very small. Theoretically we should check its size,
// but if we so much as read a whole page from it, there is definitely something fishy
// going on - and this simplifies the code.
if (size >= 4096) {
throw malformed_sstable_exception("SSTable too big: " + to_sstring(size) + " bytes.");
throw malformed_sstable_exception("SSTable too big: " + to_sstring(size) + " bytes", file_path);
}
std::experimental::string_view buf(bufptr.get(), size);
@@ -735,11 +735,11 @@ future<> sstable::read_toc() {
_components.insert(reverse_map(c, _component_map));
} catch (std::out_of_range& oor) {
_components.clear(); // so subsequent read_toc will be forced to fail again
throw malformed_sstable_exception("Unrecognized TOC component: " + c);
throw malformed_sstable_exception("Unrecognized TOC component: " + c, file_path);
}
}
if (!_components.size()) {
throw malformed_sstable_exception("Empty TOC");
throw malformed_sstable_exception("Empty TOC", file_path);
}
return f.close().finally([f] {});
});