Merge branch 'tgrabiec/cql3' from git@github.com:cloudius-systems/seastar-dev.git

CQL3 Conversions, application of mutations into db.
This commit is contained in:
Tomasz Grabiec
2015-02-09 10:32:15 +01:00
37 changed files with 1092 additions and 539 deletions

View File

@@ -99,6 +99,12 @@ modes = {
},
}
urchin_tests = [
'tests/urchin/mutation_test',
'tests/urchin/types_test',
'tests/perf/perf_mutation',
]
tests = [
'tests/test-reactor',
'tests/fileiotest',
@@ -121,7 +127,7 @@ tests = [
'tests/output_stream_test',
'tests/udp_zero_copy',
'tests/test-serialization'
]
] + urchin_tests
apps = [
'apps/httpd/httpd',
@@ -221,9 +227,7 @@ memcache = [
cassandra_interface = Thrift(source = 'interface/cassandra.thrift', service = 'Cassandra')
deps = {
'seastar': (['main.cc',
'database.cc',
urchin_core = (['database.cc',
'log.cc',
'cql/binary.rl',
'cql/server.cc',
@@ -244,17 +248,20 @@ deps = {
'service/storage_proxy.cc',
'cql3/operator.cc',
'cql3/relation.cc',
'db/db.cc',
'io/io.cc',
'cql3/column_identifier.cc',
'db/db.cc',
'io/io.cc',
'utils/utils.cc',
'utils/UUID_gen.cc',
'gms/version_generator.cc',
'gms/version_generator.cc',
'dht/dht.cc',
]
+ [Antlr3Grammar('cql3/Cql.g')]
+ [Thrift('interface/cassandra.thrift', 'Cassandra')]
+ core
),
+ core)
deps = {
'seastar': ['main.cc'] + urchin_core,
'tests/test-reactor': ['tests/test-reactor.cc'] + core,
'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core,
'apps/memcached/memcached': ['apps/memcached/memcached.cc'] + memcache,
@@ -281,6 +288,9 @@ deps = {
'tests/test-serialization': ['tests/test-serialization.cc'],
}
for t in urchin_tests:
deps[t] = urchin_core + [t + '.cc']
warnings = [
'-Wno-mismatched-tags', # clang-only
]

View File

@@ -52,11 +52,11 @@ public:
return bool(_ks_name);
}
sstring get_keyspace() const {
const sstring& get_keyspace() const {
return *_ks_name;
}
sstring get_column_family() const {
const sstring& get_column_family() const {
return _cf_name;
}

31
cql3/column_identifier.cc Normal file
View File

@@ -0,0 +1,31 @@
/*
* Copyright 2015 Cloudius Systems
*/
#include "cql3/column_identifier.hh"
namespace cql3 {
// Resolve this raw identifier against a schema into the canonical
// column_identifier form used internally. For ordinary (UTF8-named)
// tables the text is taken verbatim; otherwise the name must be parsed
// with the table's comparator.
::shared_ptr<column_identifier>
column_identifier::raw::prepare_column_identifier(schema_ptr schema) {
    if (schema->regular_column_name_type == utf8_type) {
        return ::make_shared<column_identifier>(_text, true);
    }
    // We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator
    // to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to
    // make a special check for those and treat them normally. See CASSANDRA-8178.
    auto name_bytes = to_bytes(_text);
    auto def = schema->get_column_definition(name_bytes);
    if (def != nullptr) {
        return ::make_shared<column_identifier>(std::move(name_bytes), _text);
    }
    return ::make_shared<column_identifier>(schema->regular_column_name_type->from_string(_raw_text), _text);
}
// Stream the raw (pre-preparation) identifier text; used for diagnostics.
std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id) {
    out << id._text;
    return out;
}
}

View File

@@ -31,6 +31,7 @@
#include <algorithm>
#include <functional>
#include <iostream>
namespace cql3 {
@@ -137,7 +138,7 @@ public:
* once the comparator is known with prepare(). This should only be used with identifiers that are actual
* column names. See CASSANDRA-8178 for more background.
*/
class raw : public selectable::raw {
class raw final : public selectable::raw {
private:
const sstring _raw_text;
sstring _text;
@@ -155,25 +156,7 @@ public:
return prepare_column_identifier(s);
}
::shared_ptr<column_identifier> prepare_column_identifier(schema_ptr s) {
#if 0
AbstractType<?> comparator = cfm.comparator.asAbstractType();
if (cfm.getIsDense() || comparator instanceof CompositeType || comparator instanceof UTF8Type)
return new ColumnIdentifier(text, true);
// We have a Thrift-created table with a non-text comparator. We need to parse column names with the comparator
// to get the correct ByteBuffer representation. However, this doesn't apply to key aliases, so we need to
// make a special check for those and treat them normally. See CASSANDRA-8178.
ByteBuffer bufferName = ByteBufferUtil.bytes(text);
for (ColumnDefinition def : cfm.partitionKeyColumns())
{
if (def.name.bytes.equals(bufferName))
return new ColumnIdentifier(text, true);
}
return new ColumnIdentifier(comparator.fromString(rawText), text);
#endif
throw std::runtime_error("not implemented");
}
::shared_ptr<column_identifier> prepare_column_identifier(schema_ptr s);
virtual bool processes_selection() const override {
return false;
@@ -195,6 +178,7 @@ public:
}
friend std::hash<column_identifier::raw>;
friend std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id);
};
};

View File

@@ -336,9 +336,14 @@ public:
public:
using operation::operation;
virtual void execute(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override {
virtual void execute(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override {
bytes_opt value = _t->bind_and_get(params._options);
m.set_cell(prefix, column.id, value ? params.make_cell(*value) : params.make_dead_cell());
auto cell = value ? params.make_cell(*value) : params.make_dead_cell();
if (column.is_static()) {
m.set_static_cell(column, std::move(cell));
} else {
m.set_clustered_cell(prefix, column, std::move(cell));
}
}
};

View File

@@ -28,6 +28,7 @@
#include "transport/messages/result_message.hh"
#include "service/client_state.hh"
#include "service/query_state.hh"
#include "service/storage_proxy.hh"
#include "cql3/query_options.hh"
#include "database.hh"
@@ -62,7 +63,7 @@ public:
* @param options options for this query (consistency, variables, pageSize, ...)
*/
virtual future<std::experimental::optional<transport::messages::result_message>>
execute(service::query_state& state, const query_options& options) = 0;
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) = 0;
/**
* Variant of execute used for internal query against the system tables, and thus only queries the local node.
@@ -70,7 +71,7 @@ public:
* @param state the current query state
*/
virtual future<std::experimental::optional<transport::messages::result_message>>
execute_internal(service::query_state& state, const query_options& options) = 0;
execute_internal(database& db, service::query_state& state, const query_options& options) = 0;
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const = 0;
};

View File

@@ -27,7 +27,7 @@
#include "core/shared_ptr.hh"
#include "db/api.hh"
#include "database.hh"
#include <experimental/optional>
@@ -104,7 +104,7 @@ public:
/**
* Execute the operation.
*/
virtual void execute(api::mutation& m, const api::clustering_prefix& row_key, const update_parameters& params) = 0;
virtual void execute(mutation& m, const clustering_prefix& row_key, const update_parameters& params) = 0;
/**
* A parsed raw UPDATE operation.

View File

@@ -25,7 +25,7 @@
#ifndef CQL3_CQL_QUERY_OPTIONS_HH
#define CQL3_CQL_QUERY_OPTIONS_HH
#include "db/api.hh"
#include "database.hh"
#include "db/consistency_level.hh"
#include "service/query_state.hh"

View File

@@ -60,7 +60,7 @@ public:
, _attrs{std::move(attrs)}
{ }
virtual sstring keyspace() const override {
virtual const sstring& keyspace() const override {
return _name;
}

View File

@@ -62,12 +62,12 @@ public:
}
}
virtual sstring keyspace() const {
virtual const sstring& keyspace() const {
assert(_cf_name->has_keyspace()); // "The statement hasn't been prepared correctly";
return _cf_name->get_keyspace();
}
virtual sstring column_family() const {
virtual const sstring& column_family() const {
return _cf_name->get_column_family();
}
};

View File

@@ -28,9 +28,9 @@ namespace cql3 {
namespace statements {
void delete_statement::add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) {
void delete_statement::add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) {
if (_column_operations.empty()) {
m.p.apply_delete(prefix, params.make_tombstone());
m.p.apply_delete(s, prefix, params.make_tombstone());
return;
}

View File

@@ -27,7 +27,7 @@
#include "cql3/statements/modification_statement.hh"
#include "cql3/attributes.hh"
#include "cql3/operation.hh"
#include "db/api.hh"
#include "database.hh"
namespace cql3 {
@@ -62,7 +62,7 @@ public:
return false;
}
virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override;
virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override;
#if 0
protected void validateWhereClauseForConditions() throws InvalidRequestException

View File

@@ -50,13 +50,13 @@ operator<<(std::ostream& out, modification_statement::statement_type t) {
return out;
}
future<std::vector<api::mutation>>
future<std::vector<mutation>>
modification_statement::get_mutations(const query_options& options, bool local, int64_t now) {
auto keys = make_lw_shared(build_partition_keys(options));
auto prefix = make_lw_shared(create_clustering_prefix(options));
return make_update_parameters(keys, prefix, options, local, now).then(
[this, keys = std::move(keys), prefix = std::move(prefix), now] (auto params_ptr) {
std::vector<api::mutation> mutations;
std::vector<mutation> mutations;
mutations.reserve(keys->size());
for (auto key : *keys) {
validation::validate_cql_key(s, key);
@@ -70,8 +70,8 @@ modification_statement::get_mutations(const query_options& options, bool local,
future<std::unique_ptr<update_parameters>>
modification_statement::make_update_parameters(
lw_shared_ptr<std::vector<api::partition_key>> keys,
lw_shared_ptr<api::clustering_prefix> prefix,
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<clustering_prefix> prefix,
const query_options& options,
bool local,
int64_t now) {
@@ -87,8 +87,8 @@ modification_statement::make_update_parameters(
future<update_parameters::prefetched_rows_type>
modification_statement::read_required_rows(
lw_shared_ptr<std::vector<api::partition_key>> keys,
lw_shared_ptr<api::clustering_prefix> prefix,
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<clustering_prefix> prefix,
bool local,
db::consistency_level cl) {
if (!requires_read()) {
@@ -148,7 +148,7 @@ modification_statement::get_first_empty_key() {
return {};
}
api::clustering_prefix
clustering_prefix
modification_statement::create_clustering_prefix_internal(const query_options& options) {
std::vector<bytes_opt> components;
const column_definition* first_empty_key = nullptr;
@@ -184,7 +184,7 @@ modification_statement::create_clustering_prefix_internal(const query_options& o
return components;
}
api::clustering_prefix
clustering_prefix
modification_statement::create_clustering_prefix(const query_options& options) {
// If the only updated/deleted columns are static, then we don't need clustering columns.
// And in fact, unless it is an INSERT, we reject if clustering columns are provided as that
@@ -222,9 +222,9 @@ modification_statement::create_clustering_prefix(const query_options& options) {
return create_clustering_prefix_internal(options);
}
std::vector<api::partition_key>
std::vector<partition_key>
modification_statement::build_partition_keys(const query_options& options) {
std::vector<api::partition_key> result;
std::vector<partition_key> result;
std::vector<bytes_opt> components;
auto remaining = s->partition_key.size();
@@ -244,7 +244,7 @@ modification_statement::build_partition_keys(const query_options& options) {
throw exceptions::invalid_request_exception(sprint("Invalid null value for partition key part %s", def.name_as_text()));
}
components.push_back(val);
api::partition_key key = serialize_value(*s->partition_key_type, components);
partition_key key = serialize_value(*s->partition_key_type, components);
validation::validate_cql_key(s, key);
result.push_back(key);
} else {
@@ -256,7 +256,7 @@ modification_statement::build_partition_keys(const query_options& options) {
full_components.reserve(components.size() + 1);
auto i = std::copy(components.begin(), components.end(), std::back_inserter(full_components));
*i = val;
api::partition_key key = serialize_value(*s->partition_key_type, full_components);
partition_key key = serialize_value(*s->partition_key_type, full_components);
validation::validate_cql_key(s, key);
result.push_back(key);
}
@@ -278,23 +278,23 @@ modification_statement::build_partition_keys(const query_options& options) {
}
future<std::experimental::optional<transport::messages::result_message>>
modification_statement::execute(service::query_state& qs, const query_options& options) {
modification_statement::execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) {
if (has_conditions() && options.get_protocol_version() == 1) {
throw new exceptions::invalid_request_exception("Conditional updates are not supported by the protocol version in use. You need to upgrade to a driver using the native protocol v2.");
}
if (has_conditions()) {
return execute_with_condition(qs, options);
return execute_with_condition(proxy, qs, options);
}
return execute_without_condition(qs, options).then([] {
return execute_without_condition(proxy, qs, options).then([] {
return make_ready_future<std::experimental::optional<transport::messages::result_message>>(
std::experimental::optional<transport::messages::result_message>{});
});
}
future<>
modification_statement::execute_without_condition(service::query_state& qs, const query_options& options) {
modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) {
auto cl = options.get_consistency();
if (is_counter()) {
db::validate_counter_for_write(s, cl);
@@ -302,16 +302,16 @@ modification_statement::execute_without_condition(service::query_state& qs, cons
db::validate_for_write(s->ks_name, cl);
}
return get_mutations(options, false, options.get_timestamp(qs)).then([cl] (auto mutations) {
return get_mutations(options, false, options.get_timestamp(qs)).then([cl, &proxy] (auto mutations) {
if (mutations.empty()) {
return now();
}
return service::storage_proxy::mutate_with_triggers(std::move(mutations), cl, false);
return proxy.mutate_with_triggers(std::move(mutations), cl, false);
});
}
future<std::experimental::optional<transport::messages::result_message>>
modification_statement::execute_with_condition(service::query_state& qs, const query_options& options) {
modification_statement::execute_with_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) {
unimplemented::lwt();
#if 0
List<ByteBuffer> keys = buildPartitionKeyNames(options);
@@ -356,7 +356,7 @@ modification_statement::add_key_value(column_definition& def, ::shared_ptr<term>
}
void
modification_statement::precess_where_clause(std::vector<relation_ptr> where_clause, ::shared_ptr<variable_specifications> names) {
modification_statement::process_where_clause(std::vector<relation_ptr> where_clause, ::shared_ptr<variable_specifications> names) {
for (auto&& relation : where_clause) {
if (relation->is_multi_column()) {
throw exceptions::invalid_request_exception(sprint("Multi-column relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", relation->to_string()));
@@ -387,6 +387,65 @@ modification_statement::precess_where_clause(std::vector<relation_ptr> where_cla
}
}
// Top-level prepare entry point: collect the bind-variable specifications,
// prepare the statement against the schema, and package both into the
// generic parsed_statement::prepared result.
std::unique_ptr<parsed_statement::prepared>
modification_statement::parsed::prepare(database& db) {
    auto names = get_bound_variables();
    auto prepared_stmt = prepare(db, names);
    return std::make_unique<parsed_statement::prepared>(std::move(prepared_stmt), *names);
}
// Prepares this parsed modification statement against the given database:
// validates the target column family, prepares the statement attributes,
// builds the concrete statement via prepare_internal(), and — for
// conditional (IF ...) statements — validates and attaches the conditions.
//
// @param db          database used for schema lookup/validation
// @param bound_names collector for bind-marker specifications
// @throws exceptions::invalid_request_exception for conditional updates on
//         counter tables, custom timestamps on conditional updates, unknown
//         identifiers, or IF conditions on primary-key columns
::shared_ptr<modification_statement>
modification_statement::parsed::prepare(database& db, ::shared_ptr<variable_specifications> bound_names) {
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
auto prepared_attributes = _attrs->prepare(keyspace(), column_family());
prepared_attributes->collect_marker_specification(bound_names);
::shared_ptr<modification_statement> stmt = prepare_internal(schema, bound_names, std::move(prepared_attributes));
// Conditional (lightweight-transaction) statements need extra validation.
if (_if_not_exists || _if_exists || !_conditions.empty()) {
if (stmt->is_counter()) {
throw exceptions::invalid_request_exception("Conditional updates are not supported on counter tables");
}
if (_attrs->timestamp) {
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates");
}
if (_if_not_exists) {
// To have both 'IF NOT EXISTS' and some other conditions doesn't make sense.
// So far this is enforced by the parser, but let's assert it for sanity if ever the parser changes.
assert(_conditions.empty());
assert(!_if_exists);
stmt->set_if_not_exist_condition();
} else if (_if_exists) {
assert(_conditions.empty());
assert(!_if_not_exists);
stmt->set_if_exist_condition();
} else {
for (auto&& entry : _conditions) {
auto id = entry.first->prepare_column_identifier(schema);
column_definition* def = get_column_definition(schema, *id);
if (!def) {
throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *id));
}
auto condition = entry.second->prepare(keyspace(), *def);
// NOTE(review): spelled "specificaton" here but "specification" at other
// call sites — confirm which name column_condition actually declares.
condition->collect_marker_specificaton(bound_names);
// IF conditions are only allowed on regular/static columns, never on
// primary-key components.
switch (def->kind) {
// NOTE(review): unscoped PARTITION/CLUSTERING here vs the
// column_definition::column_kind::... qualification used elsewhere —
// confirm the enum's declaration and make the call sites consistent.
case column_definition::PARTITION:
case column_definition::CLUSTERING:
throw exceptions::invalid_request_exception(sprint("PRIMARY KEY column '%s' cannot have IF conditions", *id));
default:
stmt->add_condition(condition);
}
}
}
stmt->validate_where_clause_for_conditions();
}
return stmt;
}
}
}

View File

@@ -35,7 +35,6 @@
#include "cql3/operation.hh"
#include "cql3/relation.hh"
#include "db/column_family.hh"
#include "db/consistency_level.hh"
#include "core/shared_ptr.hh"
@@ -161,7 +160,7 @@ public:
virtual bool require_full_clustering_key() const = 0;
virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) = 0;
virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) = 0;
virtual int get_bound_terms() override {
return _bound_terms;
@@ -267,12 +266,12 @@ private:
public:
void add_key_value(column_definition& def, ::shared_ptr<term> value);
void precess_where_clause(std::vector<relation_ptr> where_clause, ::shared_ptr<variable_specifications> names);
std::vector<api::partition_key> build_partition_keys(const query_options& options);
void process_where_clause(std::vector<relation_ptr> where_clause, ::shared_ptr<variable_specifications> names);
std::vector<partition_key> build_partition_keys(const query_options& options);
private:
api::clustering_prefix create_clustering_prefix(const query_options& options);
api::clustering_prefix create_clustering_prefix_internal(const query_options& options);
clustering_prefix create_clustering_prefix(const query_options& options);
clustering_prefix create_clustering_prefix_internal(const query_options& options);
protected:
const column_definition* get_first_empty_key();
@@ -286,8 +285,8 @@ public:
protected:
future<update_parameters::prefetched_rows_type> read_required_rows(
lw_shared_ptr<std::vector<api::partition_key>> keys,
lw_shared_ptr<api::clustering_prefix> prefix,
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<clustering_prefix> prefix,
bool local,
db::consistency_level cl);
@@ -297,19 +296,19 @@ public:
}
virtual future<std::experimental::optional<transport::messages::result_message>>
execute(service::query_state& qs, const query_options& options) override;
execute(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) override;
virtual future<std::experimental::optional<transport::messages::result_message>>
execute_internal(service::query_state& qs, const query_options& options) override {
execute_internal(database& db, service::query_state& qs, const query_options& options) override {
throw std::runtime_error("not implemented");
}
private:
future<>
execute_without_condition(service::query_state& qs, const query_options& options);
execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options);
future<std::experimental::optional<transport::messages::result_message>>
execute_with_condition(service::query_state& qs, const query_options& options);
execute_with_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options);
#if 0
public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException
@@ -436,12 +435,12 @@ private:
* @return vector of the mutations
* @throws invalid_request_exception on invalid requests
*/
future<std::vector<api::mutation>> get_mutations(const query_options& options, bool local, int64_t now);
future<std::vector<mutation>> get_mutations(const query_options& options, bool local, int64_t now);
public:
future<std::unique_ptr<update_parameters>> make_update_parameters(
lw_shared_ptr<std::vector<api::partition_key>> keys,
lw_shared_ptr<api::clustering_prefix> prefix,
lw_shared_ptr<std::vector<partition_key>> keys,
lw_shared_ptr<clustering_prefix> prefix,
const query_options& options,
bool local,
int64_t now);
@@ -476,62 +475,8 @@ public:
{ }
public:
std::unique_ptr<parsed_statement::prepared> prepare(database& db) override {
auto bound_names = get_bound_variables();
auto statement = prepare(db, bound_names);
return std::make_unique<parsed_statement::prepared>(std::move(statement), *bound_names);
}
::shared_ptr<modification_statement> prepare(database& db, ::shared_ptr<variable_specifications> bound_names) {
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
auto prepared_attributes = _attrs->prepare(keyspace(), column_family());
prepared_attributes->collect_marker_specification(bound_names);
::shared_ptr<modification_statement> stmt = prepare_internal(schema, bound_names, std::move(prepared_attributes));
if (_if_not_exists || _if_exists || !_conditions.empty()) {
if (stmt->is_counter()) {
throw exceptions::invalid_request_exception("Conditional updates are not supported on counter tables");
}
if (_attrs->timestamp) {
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for conditional updates");
}
if (_if_not_exists) {
// To have both 'IF NOT EXISTS' and some other conditions doesn't make sense.
// So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes.
assert(_conditions.empty());
assert(!_if_exists);
stmt->set_if_not_exist_condition();
} else if (_if_exists) {
assert(_conditions.empty());
assert(!_if_not_exists);
stmt->set_if_exist_condition();
} else {
for (auto&& entry : _conditions) {
auto id = entry.first->prepare_column_identifier(schema);
column_definition* def = get_column_definition(schema, *id);
if (!def) {
throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *id));
}
auto condition = entry.second->prepare(keyspace(), *def);
condition->collect_marker_specificaton(bound_names);
switch (def->kind) {
case column_definition::PARTITION:
case column_definition::CLUSTERING:
throw exceptions::invalid_request_exception(sprint("PRIMARY KEY column '%s' cannot have IF conditions", *id));
default:
stmt->add_condition(condition);
}
}
}
stmt->validate_where_clause_for_conditions();
}
return stmt;
};
virtual std::unique_ptr<parsed_statement::prepared> prepare(database& db) override;
::shared_ptr<modification_statement> prepare(database& db, ::shared_ptr<variable_specifications> bound_names);;
protected:
virtual ::shared_ptr<modification_statement> prepare_internal(schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs) = 0;

View File

@@ -82,7 +82,7 @@ protected:
#endif
virtual future<std::experimental::optional<transport::messages::result_message>>
execute(service::query_state& state, const query_options& options) override {
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override {
throw std::runtime_error("not implemented");
#if 0
// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
@@ -97,7 +97,7 @@ protected:
}
virtual future<std::experimental::optional<transport::messages::result_message>>
execute_internal(service::query_state& state, const query_options& options) override {
execute_internal(database& db, service::query_state& state, const query_options& options) override {
throw std::runtime_error("unsupported operation");
#if 0
try

View File

@@ -63,7 +63,7 @@ public:
}
virtual future<std::experimental::optional<transport::messages::result_message>>
execute(service::query_state& state, const query_options& options) override {
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override {
throw std::runtime_error("not implemented");
#if 0
try
@@ -87,7 +87,7 @@ public:
}
virtual future<std::experimental::optional<transport::messages::result_message>>
execute_internal(service::query_state& state, const query_options& options) override {
execute_internal(database& db, service::query_state& state, const query_options& options) override {
throw std::runtime_error("unsupported operation");
}
};

View File

@@ -31,7 +31,7 @@ namespace cql3 {
namespace statements {
void update_statement::add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) {
void update_statement::add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) {
if (s->is_dense()) {
throw std::runtime_error("Dense tables not supported yet");
#if 0
@@ -136,6 +136,36 @@ update_statement::parsed_insert::prepare_internal(schema_ptr schema,
return stmt;
}
// Builds the concrete UPDATE modification_statement: resolves each
// SET-clause column against the schema, rejects primary-key columns,
// attaches the prepared operations, and processes the WHERE clause.
//
// @param schema      schema of the table being updated
// @param bound_names collector for bind-marker specifications
// @param attrs       prepared statement attributes (timestamp, TTL, ...)
// @throws exceptions::invalid_request_exception on unknown identifiers or
//         SET assignments targeting primary-key columns
::shared_ptr<modification_statement>
update_statement::parsed_update::prepare_internal(schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs)
{
auto stmt = ::make_shared<update_statement>(statement_type::UPDATE, bound_names->size(), schema, std::move(attrs));
for (auto&& entry : _updates) {
auto id = entry.first->prepare_column_identifier(schema);
auto def = get_column_definition(schema, *id);
if (!def) {
throw exceptions::invalid_request_exception(sprint("Unknown identifier %s", *entry.first));
}
// Prepare the assignment operation and record any bind markers it uses.
auto operation = entry.second->prepare(keyspace(), *def);
operation->collect_marker_specification(bound_names);
// SET may only target regular/static columns; primary-key parts are
// addressed through the WHERE clause instead.
switch (def->kind) {
case column_definition::column_kind::PARTITION:
case column_definition::column_kind::CLUSTERING:
throw exceptions::invalid_request_exception(sprint("PRIMARY KEY part %s found in SET part", *entry.first));
default:
stmt->add_operation(std::move(operation));
break;
}
}
stmt->process_where_clause(_where_clause, bound_names);
return stmt;
}
}
}

View File

@@ -29,7 +29,7 @@
#include "cql3/column_identifier.hh"
#include "cql3/term.hh"
#include "db/api.hh"
#include "database.hh"
#include <vector>
#include "unimplemented.hh"
@@ -74,7 +74,7 @@ private:
return true;
}
virtual void add_update_for_key(api::mutation& m, const api::clustering_prefix& prefix, const update_parameters& params) override;
virtual void add_update_for_key(mutation& m, const clustering_prefix& prefix, const update_parameters& params) override;
class parsed_insert : public modification_statement::parsed {
private:
@@ -89,7 +89,7 @@ private:
* @param columnValues list of column values (corresponds to names)
* @param attrs additional attributes for statement (CL, timestamp, timeToLive)
*/
parsed_insert(std::experimental::optional<cf_name>&& name,
parsed_insert(std::experimental::optional<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<::shared_ptr<column_identifier::raw>> column_names,
std::vector<::shared_ptr<term::raw>> column_values,
@@ -104,13 +104,12 @@ private:
};
#if 0
public static class ParsedUpdate extends ModificationStatement.Parsed
{
class parsed_update : public modification_statement::parsed {
private:
// Provided for an UPDATE
private final List<Pair<ColumnIdentifier.Raw, Operation.RawUpdate>> updates;
private final List<Relation> whereClause;
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<operation::raw_update>>> _updates;
std::vector<relation_ptr> _where_clause;
public:
/**
* Creates a new UpdateStatement from a column family name, columns map, consistency
* level, and key term.
@@ -120,46 +119,19 @@ private:
* @param updates a map of column operations to perform
* @param whereClause the where clause
*/
public ParsedUpdate(CFName name,
Attributes.Raw attrs,
List<Pair<ColumnIdentifier.Raw, Operation.RawUpdate>> updates,
List<Relation> whereClause,
List<Pair<ColumnIdentifier.Raw, ColumnCondition.Raw>> conditions)
{
super(name, attrs, conditions, false, false);
this.updates = updates;
this.whereClause = whereClause;
}
protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
{
UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.UPDATE, boundNames.size(), cfm, attrs);
for (Pair<ColumnIdentifier.Raw, Operation.RawUpdate> entry : updates)
{
ColumnDefinition def = cfm.getColumnDefinition(entry.left.prepare(cfm));
if (def == null)
throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
Operation operation = entry.right.prepare(keyspace(), def);
operation.collectMarkerSpecification(boundNames);
switch (def.kind)
{
case PARTITION_KEY:
case CLUSTERING_COLUMN:
throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
default:
stmt.addOperation(operation);
break;
}
}
stmt.processWhereClause(whereClause, boundNames);
return stmt;
}
}
#endif
parsed_update(std::experimental::optional<cf_name> name,
::shared_ptr<attributes::raw> attrs,
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<operation::raw_update>>> updates,
std::vector<relation_ptr> where_clause,
std::vector<std::pair<::shared_ptr<column_identifier::raw>, ::shared_ptr<column_condition::raw>>> conditions)
: modification_statement::parsed(std::move(name), std::move(attrs), std::move(conditions), false, false)
, _updates(std::move(updates))
, _where_clause(std::move(where_clause))
{ }
protected:
virtual ::shared_ptr<modification_statement> prepare_internal(schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs);
};
};
}

View File

@@ -61,7 +61,7 @@ public:
}
virtual future<std::experimental::optional<transport::messages::result_message>>
execute(service::query_state& state, const query_options& options) override {
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override {
throw std::runtime_error("not implemented");
#if 0
state.getClientState().setKeyspace(keyspace);
@@ -70,7 +70,7 @@ public:
}
virtual future<std::experimental::optional<transport::messages::result_message>>
execute_internal(service::query_state& state, const query_options& options) override {
execute_internal(database& db, service::query_state& state, const query_options& options) override {
// Internal queries are exclusively on the system keyspace and 'use' is thus useless
throw std::runtime_error("unsupported operation");
}

View File

@@ -25,7 +25,7 @@
#ifndef CQL3_UPDATE_PARAMETERS_HH
#define CQL3_UPDATE_PARAMETERS_HH
#include "db/api.hh"
#include "database.hh"
#include "exceptions/exceptions.hh"
namespace cql3 {
@@ -36,8 +36,7 @@ namespace cql3 {
class update_parameters final {
public:
using prefetched_rows_type = std::experimental::optional<
std::unordered_map<api::partition_key, api::row,
serialized_hash, serialized_equal>>;
std::unordered_map<partition_key, row, serialized_hash, serialized_equal>>;
private:
const gc_clock::duration _ttl;
const prefetched_rows_type _prefetched; // For operation that require a read-before-write
@@ -64,21 +63,20 @@ public:
}
}
api::atomic_cell make_dead_cell() const {
return {make_tombstone()};
atomic_cell make_dead_cell() const {
return atomic_cell{_timestamp, atomic_cell::dead{_local_deletion_time}};
}
api::atomic_cell make_cell(bytes value) const {
atomic_cell make_cell(bytes value) const {
auto ttl = _ttl;
if (!ttl.count()) {
if (ttl.count() <= 0) {
ttl = _schema->default_time_to_live;
}
return api::atomic_cell(api::live_atomic_cell(_timestamp,
ttl.count() ? api::ttl_opt{_local_deletion_time + ttl} : api::ttl_opt{},
std::move(value)));
}
return atomic_cell{_timestamp,
atomic_cell::live{ttl.count() > 0 ? ttl_opt{_local_deletion_time + ttl} : ttl_opt{}, std::move(value)}};
};
#if 0
public Cell makeCounter(CellName name, long delta) throws InvalidRequestException
@@ -88,7 +86,7 @@ public:
}
#endif
api::tombstone make_tombstone() const {
tombstone make_tombstone() const {
return {_timestamp, _local_deletion_time};
}

View File

@@ -10,10 +10,6 @@
thread_local logging::logger dblog("database");
partition::partition(column_family& cf)
: rows(key_compare(cf._schema->clustering_key_type)) {
}
template<typename Sequence>
std::vector<::shared_ptr<abstract_type>>
get_column_types(const Sequence& column_definitions) {
@@ -78,7 +74,7 @@ column_family::column_family(schema_ptr schema)
, partitions(key_compare(_schema->thrift.partition_key_type)) {
}
partition*
mutation_partition*
column_family::find_partition(const bytes& key) {
auto i = partitions.find(key);
return i == partitions.end() ? nullptr : &i->second;
@@ -86,33 +82,28 @@ column_family::find_partition(const bytes& key) {
row*
column_family::find_row(const bytes& partition_key, const bytes& clustering_key) {
partition* p = find_partition(partition_key);
mutation_partition* p = find_partition(partition_key);
if (!p) {
return nullptr;
}
auto i = p->rows.find(clustering_key);
return i == p->rows.end() ? nullptr : &i->second;
return p->find_row(clustering_key);
}
partition&
mutation_partition&
column_family::find_or_create_partition(const bytes& key) {
// call lower_bound so we have a hint for the insert, just in case.
auto i = partitions.lower_bound(key);
if (i == partitions.end() || key != i->first) {
i = partitions.emplace_hint(i, std::make_pair(std::move(key), partition(*this)));
i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema)));
}
return i->second;
}
row&
column_family::find_or_create_row(const bytes& partition_key, const bytes& clustering_key) {
partition& p = find_or_create_partition(partition_key);
mutation_partition& p = find_or_create_partition(partition_key);
// call lower_bound so we have a hint for the insert, just in case.
auto i = p.rows.lower_bound(clustering_key);
if (i == p.rows.end() || clustering_key != i->first) {
i = p.rows.emplace_hint(i, std::make_pair(std::move(clustering_key), row()));
}
return i->second;
return p.clustered_row(clustering_key);
}
sstring to_hex(const bytes& b) {
@@ -237,13 +228,22 @@ column_definition::name() const {
return _name;
}
schema_ptr
keyspace::find_schema(sstring cf_name) {
column_family*
keyspace::find_column_family(sstring cf_name) {
auto i = column_families.find(cf_name);
if (i == column_families.end()) {
return nullptr;
}
return &i->second;
}
schema_ptr
keyspace::find_schema(sstring cf_name) {
auto cf = find_column_family(cf_name);
if (!cf) {
return {};
}
return i->second._schema;
return cf->_schema;
}
keyspace*
@@ -254,3 +254,82 @@ database::find_keyspace(sstring name) {
}
return nullptr;
}
// Merges mutation m into this column family's in-memory partition map,
// creating the target partition if it does not exist yet.
void
column_family::apply(mutation&& m) {
    // find_or_create_partition() takes const bytes&; the previous
    // std::move(m.key) was a no-op cast that only suggested a transfer of
    // ownership which never actually happened. Pass the key plainly.
    mutation_partition& p = find_or_create_partition(m.key);
    p.apply(_schema, std::move(m.p));
}
// Based on org.apache.cassandra.db.AbstractCell#reconcile()
// Reconciles two versions of the same atomic cell, returning <0, 0 or >0
// when `left` loses, ties or wins against `right`.
static inline
int
compare_for_merge(const column_definition& def, const atomic_cell& left, const atomic_cell& right) {
    // Higher write timestamp wins outright.
    if (left.timestamp != right.timestamp) {
        return left.timestamp > right.timestamp ? 1 : -1;
    }
    // Timestamp tie between a live and a dead cell: the dead cell wins
    // (the live one compares as smaller).
    if (left.value.which() != right.value.which()) {
        return left.is_live() ? -1 : 1;
    }
    if (left.is_live()) {
        // Both live: break the tie on the serialized value, using the
        // column's type comparator.
        return def.type->compare(left.as_live().value, right.as_live().value);
    } else {
        // Both dead: break the tie on the deletion time.
        auto& c1 = left.as_dead();
        auto& c2 = right.as_dead();
        if (c1.ttl != c2.ttl) {
            // Origin compares big-endian serialized TTL
            return (uint32_t)c1.ttl.time_since_epoch().count() < (uint32_t)c2.ttl.time_since_epoch().count() ? -1 : 1;
        }
        return 0;
    }
}
// Reconciles two (column_id, cell) entries of a row. Cells are stored
// type-erased as boost::any; only atomic (non-collection) cells are
// supported so far.
static inline
int
compare_for_merge(const column_definition& def,
        const std::pair<column_id, boost::any>& left,
        const std::pair<column_id, boost::any>& right) {
    if (def.is_atomic()) {
        return compare_for_merge(def, boost::any_cast<const atomic_cell&>(left.second),
            boost::any_cast<const atomic_cell&>(right.second));
    } else {
        // Multi-cell (collection) merge not implemented yet.
        throw std::runtime_error("not implemented");
    }
}
// Merges partition state `p` into *this: tombstones are commuted, and
// cells are reconciled one by one via compare_for_merge() (newest
// timestamp wins).
void mutation_partition::apply(schema_ptr schema, mutation_partition&& p) {
    _tombstone.apply(p._tombstone);

    for (auto&& entry : p._row_tombstones) {
        apply_row_tombstone(schema, std::move(entry));
    }

    // Merges new_row's cells into old_row in place.
    auto merge_cells = [schema] (row& old_row, row&& new_row) {
        for (auto&& new_column : new_row) {
            auto col = new_column.first;
            auto i = old_row.find(col);
            if (i == old_row.end()) {
                // BUG FIX: this previously inserted into _static_row no
                // matter which row was being merged, so cells merged into
                // a clustered row leaked into the static row. Insert into
                // the row actually being merged.
                old_row.emplace_hint(i, std::move(new_column));
            } else {
                auto& old_column = *i;
                auto& def = schema->regular_column_at(col);
                // Keep the winner of the reconciliation in place.
                if (compare_for_merge(def, old_column, new_column) < 0) {
                    old_column.second = std::move(new_column.second);
                }
            }
        }
    };

    merge_cells(_static_row, std::move(p._static_row));

    for (auto&& entry : p._rows) {
        auto& key = entry.first;
        auto i = _rows.find(key);
        if (i == _rows.end()) {
            // New clustering key: adopt the whole row.
            _rows.emplace_hint(i, std::move(entry));
        } else {
            // Existing row: merge its tombstone and its cells.
            i->second.t.apply(entry.second.t);
            merge_cells(i->second.cells, std::move(entry.second.cells));
        }
    }
}

View File

@@ -27,134 +27,253 @@
#include "tuple.hh"
#include "core/future.hh"
#include "cql3/column_specification.hh"
#include <limits>
#include <cstddef>
#include "schema.hh"
struct row;
struct paritition;
struct column_family;
using partition_key_type = tuple_type<>;
using clustering_key_type = tuple_type<>;
using clustering_prefix_type = tuple_prefix;
using partition_key = bytes;
using clustering_key = bytes;
using clustering_prefix = clustering_prefix_type::value_type;
struct row {
std::vector<bytes> cells;
namespace api {
using timestamp_type = int64_t;
timestamp_type constexpr missing_timestamp = std::numeric_limits<timestamp_type>::min();
timestamp_type constexpr min_timestamp = std::numeric_limits<timestamp_type>::min() + 1;
timestamp_type constexpr max_timestamp = std::numeric_limits<timestamp_type>::max();
}
/**
* Represents deletion operation. Can be commuted with other tombstones via apply() method.
* Can be empty.
*
*/
struct tombstone final {
api::timestamp_type timestamp;
gc_clock::time_point ttl;
tombstone(api::timestamp_type timestamp, gc_clock::time_point ttl)
: timestamp(timestamp)
, ttl(ttl)
{ }
tombstone()
: tombstone(api::missing_timestamp, {})
{ }
int compare(const tombstone& t) const {
if (timestamp < t.timestamp) {
return -1;
} else if (timestamp > t.timestamp) {
return 1;
} else if (ttl < t.ttl) {
return -1;
} else if (ttl > t.ttl) {
return 1;
} else {
return 0;
}
}
bool operator<(const tombstone& t) const {
return compare(t) < 0;
}
bool operator<=(const tombstone& t) const {
return compare(t) <= 0;
}
bool operator>(const tombstone& t) const {
return compare(t) > 0;
}
bool operator>=(const tombstone& t) const {
return compare(t) >= 0;
}
bool operator==(const tombstone& t) const {
return compare(t) == 0;
}
bool operator!=(const tombstone& t) const {
return compare(t) != 0;
}
explicit operator bool() const {
return timestamp != api::missing_timestamp;
}
void apply(const tombstone& t) {
if (*this < t) {
*this = t;
}
}
friend std::ostream& operator<<(std::ostream& out, const tombstone& t) {
return out << "{timestamp=" << t.timestamp << ", ttl=" << t.ttl.time_since_epoch().count() << "}";
}
};
struct partition {
explicit partition(column_family& cf);
row static_columns;
// row key within partition -> row
std::map<bytes, row, key_compare> rows;
};
using ttl_opt = std::experimental::optional<gc_clock::time_point>;
using column_id = uint32_t;
class column_definition final {
private:
bytes _name;
public:
enum column_kind { PARTITION, CLUSTERING, REGULAR, STATIC };
column_definition(bytes name, data_type type, column_id id, column_kind kind);
data_type type;
column_id id; // unique within (kind, schema instance)
column_kind kind;
::shared_ptr<cql3::column_specification> column_specification;
bool is_static() const { return kind == column_kind::STATIC; }
bool is_partition_key() const { return kind == column_kind::PARTITION; }
const sstring& name_as_text() const;
const bytes& name() const;
};
struct thrift_schema {
shared_ptr<abstract_type> partition_key_type;
};
/*
* Keep this effectively immutable.
*/
class schema final {
private:
std::unordered_map<bytes, column_definition*> _columns_by_name;
std::map<bytes, column_definition*, serialized_compare> _regular_columns_by_name;
public:
struct column {
bytes name;
data_type type;
struct name_compare {
shared_ptr<abstract_type> type;
name_compare(shared_ptr<abstract_type> type) : type(type) {}
bool operator()(const column& cd1, const column& cd2) const {
return type->less(cd1.name, cd2.name);
}
};
struct atomic_cell final {
struct dead {
gc_clock::time_point ttl;
};
struct live {
ttl_opt ttl;
bytes value;
};
api::timestamp_type timestamp;
boost::variant<dead, live> value;
bool is_live() const { return value.which() == 1; }
// Call only when is_live() == true
const live& as_live() const { return boost::get<live>(value); }
// Call only when is_live() == false
const dead& as_dead() const { return boost::get<dead>(value); }
};
using row = std::map<column_id, boost::any>;
struct deletable_row final {
tombstone t;
row cells;
};
using row_tombstone_set = std::map<bytes, tombstone, serialized_compare>;
class mutation_partition final {
private:
void build_columns(std::vector<column> columns, column_definition::column_kind kind, std::vector<column_definition>& dst);
::shared_ptr<cql3::column_specification> make_column_specification(column_definition& def);
tombstone _tombstone;
row _static_row;
std::map<clustering_key, deletable_row, key_compare> _rows;
row_tombstone_set _row_tombstones;
public:
gc_clock::duration default_time_to_live = gc_clock::duration::zero();
const sstring ks_name;
const sstring cf_name;
std::vector<column_definition> partition_key;
std::vector<column_definition> clustering_key;
std::vector<column_definition> regular_columns; // sorted by name
shared_ptr<tuple_type<>> partition_key_type;
shared_ptr<tuple_type<>> clustering_key_type;
shared_ptr<tuple_prefix> clustering_key_prefix_type;
data_type regular_column_name_type;
thrift_schema thrift;
public:
schema(sstring ks_name, sstring cf_name,
std::vector<column> partition_key,
std::vector<column> clustering_key,
std::vector<column> regular_columns,
shared_ptr<abstract_type> regular_column_name_type);
bool is_dense() const {
return false;
mutation_partition(schema_ptr s)
: _rows(key_compare(s->clustering_key_type))
, _row_tombstones(serialized_compare(s->clustering_key_prefix_type))
{ }
void apply(tombstone t) {
_tombstone.apply(t);
}
bool is_counter() const {
return false;
}
column_definition* get_column_definition(const bytes& name);
auto regular_begin() {
return regular_columns.begin();
}
auto regular_end() {
return regular_columns.end();
}
auto regular_lower_bound(const bytes& name) {
// TODO: use regular_columns and a version of std::lower_bound() with heterogeneous comparator
auto i = _regular_columns_by_name.lower_bound(name);
if (i == _regular_columns_by_name.end()) {
return regular_end();
void apply_delete(schema_ptr schema, const clustering_prefix& prefix, tombstone t) {
if (prefix.empty()) {
apply(t);
} else if (prefix.size() == schema->clustering_key.size()) {
_rows[serialize_value(*schema->clustering_key_type, prefix)].t.apply(t);
} else {
return regular_columns.begin() + i->second->id;
apply_row_tombstone(schema, {serialize_value(*schema->clustering_key_prefix_type, prefix), t});
}
}
auto regular_upper_bound(const bytes& name) {
// TODO: use regular_columns and a version of std::upper_bound() with heterogeneous comparator
auto i = _regular_columns_by_name.upper_bound(name);
if (i == _regular_columns_by_name.end()) {
return regular_end();
} else {
return regular_columns.begin() + i->second->id;
void apply_row_tombstone(schema_ptr schema, bytes prefix, tombstone t) {
apply_row_tombstone(schema, {std::move(prefix), std::move(t)});
}
void apply_row_tombstone(schema_ptr schema, std::pair<bytes, tombstone> row_tombstone) {
auto& prefix = row_tombstone.first;
auto i = _row_tombstones.lower_bound(prefix);
if (i == _row_tombstones.end() || !schema->clustering_key_prefix_type->equal(prefix, i->first)) {
_row_tombstones.emplace_hint(i, std::move(row_tombstone));
} else if (row_tombstone.second > i->second) {
i->second = row_tombstone.second;
}
}
column_id get_regular_columns_count() {
return regular_columns.size();
void apply(schema_ptr schema, mutation_partition&& p);
const row_tombstone_set& row_tombstones() {
return _row_tombstones;
}
data_type column_name_type(column_definition& def) {
return def.kind == column_definition::REGULAR ? regular_column_name_type : utf8_type;
row& static_row() {
return _static_row;
}
row& clustered_row(const clustering_key& key) {
return _rows[key].cells;
}
row& clustered_row(clustering_key&& key) {
return _rows[std::move(key)].cells;
}
row* find_row(const clustering_key& key) {
auto i = _rows.find(key);
if (i == _rows.end()) {
return nullptr;
}
return &i->second.cells;
}
/**
* Returns the base tombstone for all cells of given clustering row. Such tombstone
* holds all information necessary to decide whether cells in a row are deleted or not,
* in addition to any information inside individual cells.
*/
tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key) {
tombstone t = _tombstone;
auto i = _row_tombstones.lower_bound(key);
if (i != _row_tombstones.end() && schema->clustering_key_prefix_type->is_prefix_of(i->first, key)) {
t.apply(i->second);
}
auto j = _rows.find(key);
if (j != _rows.end()) {
t.apply(j->second.t);
}
return t;
}
};
using schema_ptr = lw_shared_ptr<schema>;
class mutation final {
public:
schema_ptr schema;
partition_key key;
mutation_partition p;
public:
mutation(partition_key key_, schema_ptr schema_)
: schema(std::move(schema_))
, key(std::move(key_))
, p(schema)
{ }
mutation(mutation&&) = default;
mutation(const mutation&) = delete;
void set_static_cell(const column_definition& def, boost::any value) {
p.static_row()[def.id] = std::move(value);
}
void set_clustered_cell(const clustering_prefix& prefix, const column_definition& def, boost::any value) {
auto& row = p.clustered_row(serialize_value(*schema->clustering_key_type, prefix));
row[def.id] = std::move(value);
}
void set_clustered_cell(const clustering_key& key, const column_definition& def, boost::any value) {
auto& row = p.clustered_row(key);
row[def.id] = std::move(value);
}
};
struct column_family {
column_family(schema_ptr schema);
partition& find_or_create_partition(const bytes& key);
mutation_partition& find_or_create_partition(const bytes& key);
row& find_or_create_row(const bytes& partition_key, const bytes& clustering_key);
partition* find_partition(const bytes& key);
mutation_partition* find_partition(const bytes& key);
row* find_row(const bytes& partition_key, const bytes& clustering_key);
schema_ptr _schema;
// partition key -> partition
std::map<bytes, partition, key_compare> partitions;
std::map<bytes, mutation_partition, key_compare> partitions;
void apply(mutation&& m);
};
class keyspace {
@@ -162,6 +281,7 @@ public:
std::unordered_map<sstring, column_family> column_families;
static future<keyspace> populate(sstring datadir);
schema_ptr find_schema(sstring cf_name);
column_family* find_column_family(sstring cf_name);
};
class database {

186
db/api.hh
View File

@@ -1,186 +0,0 @@
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include <experimental/optional>
#include <limits>
#include <boost/variant/variant.hpp>
#include "db_clock.hh"
#include "gc_clock.hh"
#include "database.hh"
#include "db/consistency_level.hh"
namespace api {
using partition_key_type = tuple_type<>;
using clustering_key_type = tuple_type<>;
using clustering_prefix_type = tuple_prefix;
using partition_key = bytes;
using clustering_key = bytes;
using clustering_prefix = clustering_prefix_type::value_type;
using timestamp_type = int64_t;
timestamp_type constexpr missing_timestamp = std::numeric_limits<timestamp_type>::min();
timestamp_type constexpr min_timestamp = std::numeric_limits<timestamp_type>::min() + 1;
timestamp_type constexpr max_timestamp = std::numeric_limits<timestamp_type>::max();
/**
* Represents deletion operation. Can be commuted with other tombstones via apply() method.
* Can be empty.
*
*/
struct tombstone final {
    timestamp_type timestamp;      // write time of the deletion
    gc_clock::time_point ttl;      // when the tombstone may be purged

    tombstone(timestamp_type timestamp, gc_clock::time_point ttl)
        : timestamp(timestamp)
        , ttl(ttl)
    { }

    // An empty tombstone; deletes nothing (operator bool() is false).
    tombstone()
        : tombstone(missing_timestamp, {})
    { }

    // Lexicographic (timestamp, ttl) ordering. The previous form
    // "timestamp < t.timestamp || ttl < t.ttl" was not a strict weak
    // ordering: for {5,1} vs {3,2} both a<b and b<a held, which makes
    // apply() non-deterministic and breaks any ordered container using
    // this comparison.
    bool operator<(const tombstone& t) const {
        return timestamp < t.timestamp
            || (timestamp == t.timestamp && ttl < t.ttl);
    }

    bool operator==(const tombstone& t) const {
        return timestamp == t.timestamp && ttl == t.ttl;
    }

    // True iff this tombstone actually deletes something.
    operator bool() const {
        return timestamp != missing_timestamp;
    }

    // Commutes with t: keeps whichever tombstone is newer.
    void apply(const tombstone& t) {
        if (*this < t) {
            *this = t;
        }
    }
};
using ttl_opt = std::experimental::optional<gc_clock::time_point>;
// A live (non-deleted) regular cell: value plus write timestamp and an
// optional expiry point.
class live_atomic_cell final {
private:
    timestamp_type _timestamp; // write timestamp used for reconciliation
    ttl_opt _ttl;              // expiry time point, if the cell has a TTL
    bytes _value;
public:
    // NOTE(review): `value` is taken by value and then copied into
    // _value; consider std::move(value) to avoid the extra copy.
    live_atomic_cell(timestamp_type timestamp, ttl_opt ttl, bytes value)
        : _timestamp(timestamp)
        , _ttl(ttl)
        , _value(value) {
    }
};
using atomic_cell = boost::variant<tombstone, live_atomic_cell>;
using row = std::map<column_id, boost::any>;
// A clustered row's cells together with the row-level tombstone that
// governs them.
struct deletable_row final {
    tombstone t;
    row cells;
};
// A range deletion: removes every row whose clustering key starts with
// `prefix`, as of tombstone `t`.
struct row_tombstone final {
    tombstone t;
    /*
     * Prefix can be shorter than the clustering key size, in which case it
     * means that all rows whose keys have that prefix are removed.
     *
     * Empty prefix removes all rows, which is equivalent to removing the whole partition.
     */
    bytes prefix;
};
// Orders row tombstones by their clustering-key prefix, using the
// prefix type's comparator.
struct row_tombstone_compare final {
private:
    data_type _type;
public:
    row_tombstone_compare(data_type type) : _type(type) {}

    // Made const-callable: std::set invokes its stored comparator from
    // const member functions (find() const, count() const, ...), which
    // requires operator() to be const.
    bool operator()(const row_tombstone& t1, const row_tombstone& t2) const {
        return _type->less(t1.prefix, t2.prefix);
    }
};
class partition final {
private:
    schema_ptr _schema;
    tombstone _tombstone;  // partition-wide deletion
    row _static_row;
    std::map<clustering_key, deletable_row, key_compare> _rows;          // clustering key -> row
    std::set<row_tombstone, row_tombstone_compare> _row_tombstones;      // range deletions by prefix
public:
    partition(schema_ptr s)
        : _schema(std::move(s))
        , _rows(key_compare(_schema->clustering_key_type))
        , _row_tombstones(row_tombstone_compare(_schema->clustering_key_prefix_type))
    { }

    // Applies a partition-wide deletion.
    void apply(tombstone t) {
        _tombstone.apply(t);
    }

    // Deletes at the granularity implied by the prefix length: empty
    // prefix -> whole partition; full clustering key -> single row;
    // partial prefix -> range tombstone covering all rows with that
    // prefix.
    void apply_delete(const clustering_prefix& prefix, tombstone t) {
        if (prefix.empty()) {
            apply(t);
        } else if (prefix.size() == _schema->clustering_key.size()) {
            _rows[serialize_value(*_schema->clustering_key_type, prefix)].t.apply(t);
        } else {
            apply(row_tombstone{t, serialize_value(*_schema->clustering_key_prefix_type, prefix)});
        }
    }

    void apply(const row_tombstone& rt) {
        // NOTE(review): when a tombstone with an equal prefix already
        // exists and rt.t > i->t, std::set::insert with an equivalent key
        // is a no-op, so the newer tombstone is silently dropped instead
        // of replacing the old one — verify intended.
        auto i = _row_tombstones.lower_bound(rt);
        if (i == _row_tombstones.end() || !_schema->clustering_key_prefix_type->equal(rt.prefix, i->prefix) || rt.t > i->t) {
            _row_tombstones.insert(i, rt);
        }
    }

    row& static_row() {
        return _static_row;
    }

    // Returns the cell map of the row at `key`, default-creating it if
    // absent (operator[] inserts).
    row& clustered_row(const clustering_key& key) {
        return _rows[key].cells;
    }

    // Empty prefix addresses the static row; otherwise the clustered row
    // identified by the serialized prefix.
    row& get_row(const clustering_prefix& prefix) {
        if (prefix.empty()) {
            return static_row();
        }
        return clustered_row(serialize_value(*_schema->clustering_key_type, prefix));
    }
};
// A set of changes to a single partition of a single table.
class mutation final {
public:
    schema_ptr schema;
    partition_key key;  // serialized partition key this mutation targets
    partition p;        // the partition-local changes to apply
public:
    mutation(partition_key key_, schema_ptr schema_)
        : schema(std::move(schema_))
        , key(std::move(key_))
        , p(schema)
    { }

    // Move-only: mutations may be large, so copying is disallowed.
    mutation(mutation&&) = default;
    mutation(const mutation&) = delete;

    // Sets a single cell addressed by a clustering prefix.
    // NOTE(review): `value` is passed by value and then copied again into
    // the row; consider std::move(value) on the assignment.
    void set_cell(const clustering_prefix& prefix, column_id col, boost::any value) {
        p.get_row(prefix)[col] = value;
    }
};
}

120
schema.hh Normal file
View File

@@ -0,0 +1,120 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include <unordered_map>
#include "cql3/column_specification.hh"
#include "core/shared_ptr.hh"
#include "types.hh"
#include "tuple.hh"
#include "gc_clock.hh"
using column_id = uint32_t;
// Metadata for a single column of a schema: name, type, position within
// its kind, and the CQL3 column specification used by the query layer.
class column_definition final {
private:
    bytes _name;
public:
    // Which part of the table layout the column belongs to.
    enum column_kind { PARTITION, CLUSTERING, REGULAR, STATIC };
    column_definition(bytes name, data_type type, column_id id, column_kind kind);
    data_type type;
    column_id id; // unique within (kind, schema instance)
    column_kind kind;
    ::shared_ptr<cql3::column_specification> column_specification;
    bool is_static() const { return kind == column_kind::STATIC; }
    bool is_partition_key() const { return kind == column_kind::PARTITION; }
    // Atomic = stored as a single cell (not a multi-cell collection type).
    bool is_atomic() const { return !type->is_multi_cell(); }
    const sstring& name_as_text() const;
    const bytes& name() const;
};
// Thrift-facing portion of the schema; only the partition key type is
// tracked so far.
struct thrift_schema {
    shared_ptr<abstract_type> partition_key_type;
};
/*
* Keep this effectively immutable.
*/
class schema final {
private:
    // Secondary indexes over the column vectors below, keyed by raw name.
    std::unordered_map<bytes, column_definition*> _columns_by_name;
    std::map<bytes, column_definition*, serialized_compare> _regular_columns_by_name;
public:
    // Raw (name, type) pair used as constructor input before full
    // column_definition objects are built.
    struct column {
        bytes name;
        data_type type;

        // Orders columns by name using the given type's comparator.
        struct name_compare {
            shared_ptr<abstract_type> type;
            name_compare(shared_ptr<abstract_type> type) : type(type) {}
            bool operator()(const column& cd1, const column& cd2) const {
                return type->less(cd1.name, cd2.name);
            }
        };
    };
private:
    void build_columns(std::vector<column> columns, column_definition::column_kind kind, std::vector<column_definition>& dst);
    ::shared_ptr<cql3::column_specification> make_column_specification(column_definition& def);
public:
    gc_clock::duration default_time_to_live = gc_clock::duration::zero();
    const sstring ks_name;
    const sstring cf_name;
    std::vector<column_definition> partition_key;
    std::vector<column_definition> clustering_key;
    std::vector<column_definition> regular_columns; // sorted by name
    shared_ptr<tuple_type<>> partition_key_type;
    shared_ptr<tuple_type<>> clustering_key_type;
    shared_ptr<tuple_prefix> clustering_key_prefix_type;
    data_type regular_column_name_type;
    thrift_schema thrift;
public:
    schema(sstring ks_name, sstring cf_name,
        std::vector<column> partition_key,
        std::vector<column> clustering_key,
        std::vector<column> regular_columns,
        shared_ptr<abstract_type> regular_column_name_type);
    bool is_dense() const {
        return false;
    }
    bool is_counter() const {
        return false;
    }
    // Looks up a column of any kind by name; nullptr if absent.
    column_definition* get_column_definition(const bytes& name);
    auto regular_begin() {
        return regular_columns.begin();
    }
    auto regular_end() {
        return regular_columns.end();
    }
    auto regular_lower_bound(const bytes& name) {
        // TODO: use regular_columns and a version of std::lower_bound() with heterogeneous comparator
        auto i = _regular_columns_by_name.lower_bound(name);
        if (i == _regular_columns_by_name.end()) {
            return regular_end();
        } else {
            // A regular column's id is its index in regular_columns, so
            // the map entry translates directly into a vector iterator.
            return regular_columns.begin() + i->second->id;
        }
    }
    auto regular_upper_bound(const bytes& name) {
        // TODO: use regular_columns and a version of std::upper_bound() with heterogeneous comparator
        auto i = _regular_columns_by_name.upper_bound(name);
        if (i == _regular_columns_by_name.end()) {
            return regular_end();
        } else {
            return regular_columns.begin() + i->second->id;
        }
    }
    column_id get_regular_columns_count() {
        return regular_columns.size();
    }
    // Regular column names are compared/rendered with the configured name
    // type; all other kinds are UTF-8 identifiers.
    data_type column_name_type(column_definition& def) {
        return def.kind == column_definition::REGULAR ? regular_column_name_type : utf8_type;
    }
    column_definition& regular_column_at(column_id id) {
        return regular_columns[id];
    }
};
using schema_ptr = lw_shared_ptr<schema>;

View File

@@ -475,8 +475,20 @@ namespace service {
* @param consistency_level the consistency level for the operation
*/
future<>
storage_proxy::mutate(std::vector<api::mutation> mutations, db::consistency_level cl) {
throw std::runtime_error("NOT IMPLEMENTED");
storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl) {
// FIXME: send it to replicas instead of applying locally
for (auto&& m : mutations) {
// FIXME: lookup column_family by UUID
keyspace* ks = _db.find_keyspace(m.schema->ks_name);
assert(ks); // FIXME: load keyspace meta-data from storage
column_family* cf = ks->find_column_family(m.schema->cf_name);
if (cf) {
cf->apply(std::move(m));
} else {
// TODO: log a warning
}
}
return make_ready_future<>();
#if 0
Tracing.trace("Determining replicas for mutation");
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -559,7 +571,7 @@ storage_proxy::mutate(std::vector<api::mutation> mutations, db::consistency_leve
}
future<>
storage_proxy::mutate_with_triggers(std::vector<api::mutation> mutations, db::consistency_level cl,
storage_proxy::mutate_with_triggers(std::vector<mutation> mutations, db::consistency_level cl,
bool should_mutate_atomically) {
unimplemented::triggers();
#if 0
@@ -587,7 +599,7 @@ storage_proxy::mutate_with_triggers(std::vector<api::mutation> mutations, db::co
* @param consistency_level the consistency level for the operation
*/
future<>
storage_proxy::mutate_atomically(std::vector<api::mutation> mutations, db::consistency_level cl) {
storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl) {
unimplemented::lwt();
#if 0
Tracing.trace("Determining replicas for atomic batch");

View File

@@ -24,12 +24,17 @@
#pragma once
#include "db/api.hh"
#include "database.hh"
#include "db/consistency_level.hh"
namespace service {
class storage_proxy /*implements StorageProxyMBean*/ {
private:
database& _db;
public:
storage_proxy(database& db) : _db(db) {}
/**
* Use this method to have these Mutations applied
* across all replicas. This method will take care
@@ -39,10 +44,10 @@ public:
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
static future<> mutate(std::vector<api::mutation> mutations, db::consistency_level cl);
future<> mutate(std::vector<mutation> mutations, db::consistency_level cl);
static future<> mutate_with_triggers(std::vector<api::mutation> mutations,
db::consistency_level cl, bool should_mutate_atomically);
future<> mutate_with_triggers(std::vector<mutation> mutations, db::consistency_level cl,
bool should_mutate_atomically);
/**
* See mutate. Adds additional steps before and after writing a batch.
@@ -53,7 +58,7 @@ public:
* @param mutations the Mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
static future<> mutate_atomically(std::vector<api::mutation> mutations, db::consistency_level cl);
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl);
};
}

View File

@@ -10,6 +10,8 @@ all_tests = [
'memcached/test_ascii_parser',
'sstring_test',
'output_stream_test',
'urchin/types_test',
'urchin/mutation_test',
]
last_len = 0

30
tests/perf/perf.hh Normal file
View File

@@ -0,0 +1,30 @@
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include <chrono>
#include <iostream>
/// Micro-benchmark helper: runs `func` repeatedly for `run_length` of
/// wall-clock time per round and prints the achieved throughput
/// (calls/second) after each round.
///
/// @param func        nullary callable; invoked in batches of 10000
///                    between clock reads to amortize timing overhead.
/// @param iterations  number of independent rounds to measure and report.
/// @param run_length  wall-clock budget of one round (default 1 second);
///                    parameterized so short runs are possible in tests.
template<typename Func>
void time_it(Func func, int iterations = 5,
             std::chrono::high_resolution_clock::duration run_length = std::chrono::seconds(1)) {
    using clk = std::chrono::high_resolution_clock;

    for (int i = 0; i < iterations; i++) {
        auto start = clk::now();
        auto end_at = start + run_length;
        uint64_t count = 0;
        while (clk::now() < end_at) {
            // Renamed from `i`, which shadowed the outer round counter.
            for (int j = 0; j < 10000; j++) { // amortize clock reading cost
                func();
                count++;
            }
        }
        auto end = clk::now();
        auto duration = std::chrono::duration<double>(end - start).count();
        // Format with two decimals via <iomanip> (the previous sprint()
        // call came from a header this file never included), restoring
        // std::cout's state afterwards so callers are unaffected.
        auto old_flags = std::cout.flags();
        auto old_precision = std::cout.precision();
        std::cout << std::fixed << std::setprecision(2)
                  << (double)count / duration << " tps\n";
        std::cout.flags(old_flags);
        std::cout.precision(old_precision);
    }
}

View File

@@ -0,0 +1,30 @@
#include "database.hh"
#include "perf.hh"
// Wraps `value` in a live atomic_cell (timestamp 0, no TTL), boxed as
// boost::any as expected by mutation::set_clustered_cell().
static boost::any make_atomic_cell(bytes value) {
    return atomic_cell{0, atomic_cell::live{ttl_opt{}, std::move(value)}};
};
int main(int argc, char* argv[]) {
auto s = make_lw_shared<schema>("ks", "cf",
std::vector<schema::column>({{"p1", utf8_type}}),
std::vector<schema::column>({{"c1", int32_type}}),
std::vector<schema::column>({{"r1", int32_type}}),
utf8_type
);
column_family cf(s);
std::cout << "Timing mutation of single column within one row...\n";
partition_key key = to_bytes("key1");
clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)});
bytes value = int32_type->decompose(3);
time_it([&] {
mutation m(key, s);
column_definition& col = *s->get_column_definition("r1");
m.set_clustered_cell(c_key, col, make_atomic_cell(value));
cf.apply(std::move(m));
});
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2015 Cloudius Systems
*/
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE core
#include <boost/test/included/unit_test.hpp>
#include "core/sstring.hh"
#include "database.hh"
static sstring some_keyspace("ks");
static sstring some_column_family("cf");
// Wraps `value` in a live atomic_cell (timestamp 0, no TTL), boxed as
// boost::any as expected by mutation::set_clustered_cell().
static boost::any make_atomic_cell(bytes value) {
    return atomic_cell{0, atomic_cell::live{ttl_opt{}, std::move(value)}};
};
// Writes one cell through the mutation path and verifies it is visible
// in the column family afterwards.
BOOST_AUTO_TEST_CASE(test_mutation_is_applied) {
    // Schema: partition key p1 (text), clustering key c1 (int), one
    // regular int column r1.
    auto s = make_lw_shared<schema>(some_keyspace, some_column_family,
        std::vector<schema::column>({{"p1", utf8_type}}),
        std::vector<schema::column>({{"c1", int32_type}}),
        std::vector<schema::column>({{"r1", int32_type}}),
        utf8_type
    );

    column_family cf(s);
    column_definition& r1_col = *s->get_column_definition("r1");
    partition_key key = to_bytes("key1");
    clustering_key c_key = s->clustering_key_type->decompose_value({int32_type->decompose(2)});

    // Write r1 = 3 into row (key1, 2) and apply it to the column family.
    mutation m(key, s);
    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3)));
    cf.apply(std::move(m));

    // Renamed from `row` — the local previously shadowed the `row` type
    // itself, making the declaration needlessly confusing.
    row& r = cf.find_or_create_row(key, c_key);
    auto& cell = boost::any_cast<const atomic_cell&>(r[r1_col.id]);
    BOOST_REQUIRE(cell.is_live());
    BOOST_REQUIRE(int32_type->equal(cell.as_live().value, int32_type->decompose(3)));
}
// Verifies that a row tombstone with a higher timestamp supersedes an
// older one for the same clustering key, and that tombstones on distinct
// keys are tracked independently.
BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) {
    auto s = make_lw_shared<schema>(some_keyspace, some_column_family,
        std::vector<schema::column>({{"p1", utf8_type}}),
        std::vector<schema::column>({{"c1", int32_type}}),
        std::vector<schema::column>({{"r1", int32_type}}),
        utf8_type
    );

    column_family cf(s);
    column_definition& r1_col = *s->get_column_definition("r1");
    partition_key key = to_bytes("key1");
    clustering_key c_key1 = s->clustering_key_type->decompose_value(
        {int32_type->decompose(1)}
    );
    clustering_key c_key2 = s->clustering_key_type->decompose_value(
        {int32_type->decompose(2)}
    );

    auto ttl = gc_clock::now() + std::chrono::seconds(1);

    // Independent tombstones: timestamp 1 on key1, timestamp 0 on key2.
    mutation m(key, s);
    m.p.apply_row_tombstone(s, c_key1, tombstone(1, ttl));
    m.p.apply_row_tombstone(s, c_key2, tombstone(0, ttl));
    BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key1), tombstone(1, ttl));
    BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(0, ttl));

    // Re-applying with a newer timestamp must replace the older tombstone.
    m.p.apply_row_tombstone(s, c_key2, tombstone(1, ttl));
    BOOST_REQUIRE_EQUAL(m.p.tombstone_for_row(s, c_key2), tombstone(1, ttl));
}

View File

@@ -0,0 +1,73 @@
/*
* Copyright 2015 Cloudius Systems
*/
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE core
#include <boost/test/included/unit_test.hpp>
#include "types.hh"
#include "tuple.hh"
// Round-trips int32 values through from_string()/to_string()/decompose(),
// including signs, leading zeros, the INT32 extremes, and rejection of
// malformed or out-of-range input.
BOOST_AUTO_TEST_CASE(test_int32_type_string_conversions) {
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("1234567890"), int32_type->decompose(1234567890)));
    BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(1234567890)), "1234567890");
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("12"), int32_type->decompose(12)));
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("0012"), int32_type->decompose(12)));
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("+12"), int32_type->decompose(12)));
    BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(12)), "12");
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-12"), int32_type->decompose(-12)));
    BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(-12)), "-12");
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("0"), int32_type->decompose(0)));
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-0"), int32_type->decompose(0)));
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("+0"), int32_type->decompose(0)));
    BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose(0)), "0");

    // INT32_MIN / INT32_MAX boundaries.
    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("-2147483648"), int32_type->decompose((int32_t)-2147483648)));
    BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose((int32_t)-2147483648)), "-2147483648");

    BOOST_REQUIRE(int32_type->equal(int32_type->from_string("2147483647"), int32_type->decompose((int32_t)2147483647)));
    BOOST_REQUIRE_EQUAL(int32_type->to_string(int32_type->decompose((int32_t)-2147483647)), "-2147483647");

    // Invalid input must raise marshal_exception rather than wrap or
    // silently truncate.
    auto test_parsing_fails = [] (sstring text) {
        try {
            int32_type->from_string(text);
            BOOST_FAIL(sprint("Parsing of '%s' should have failed", text));
        } catch (const marshal_exception& e) {
            // expected
        }
    };

    test_parsing_fails("asd");
    test_parsing_fails("-2147483649");
    test_parsing_fails("2147483648");
    test_parsing_fails("2147483648123");

    // Empty bytes stringify to the empty string.
    BOOST_REQUIRE_EQUAL(int32_type->to_string(bytes()), "");
}
// Exercises tuple_prefix::is_prefix_of() against a fully-populated tuple
// value and against one whose last component is null.
BOOST_AUTO_TEST_CASE(test_tuple_is_prefix_of) {
    tuple_type<> type({utf8_type, utf8_type, utf8_type});
    auto prefix_type = type.as_prefix();
    // Serialize the candidate components as a prefix and test them against
    // a full value; keeps each assertion below on a single readable line.
    auto is_prefix = [&] (const std::vector<bytes_opt>& components, const bytes& full) {
        return prefix_type.is_prefix_of(prefix_type.serialize_value(components), full);
    };
    auto val = type.serialize_value({{bytes("a")}, {bytes("b")}, {bytes("c")}});
    // Every leading subsequence of ("a", "b", "c") is a prefix, including the empty one.
    BOOST_REQUIRE(is_prefix({}, val));
    BOOST_REQUIRE(is_prefix({{bytes("a")}}, val));
    BOOST_REQUIRE(is_prefix({{bytes("a")}, {bytes("b")}}, val));
    BOOST_REQUIRE(is_prefix({{bytes("a")}, {bytes("b")}, {bytes("c")}}, val));
    // A null component, an empty component, wrong values, wrong order, and
    // longer/shorter first components must all be rejected.
    BOOST_REQUIRE(!is_prefix({{}}, val));
    BOOST_REQUIRE(!is_prefix({{bytes()}}, val));
    BOOST_REQUIRE(!is_prefix({{bytes("b")}, {bytes("c")}}, val));
    BOOST_REQUIRE(!is_prefix({{bytes("a")}, {bytes("c")}, {bytes("b")}}, val));
    BOOST_REQUIRE(!is_prefix({{bytes("abc")}}, val));
    BOOST_REQUIRE(!is_prefix({{bytes("ab")}}, val));
    // With a null third component, only shorter prefixes or one ending in
    // null match; an explicit empty (non-null) component does not.
    auto val2 = type.serialize_value({{bytes("a")}, {bytes("b")}, {}});
    BOOST_REQUIRE(is_prefix({{bytes("a")}, {bytes("b")}}, val2));
    BOOST_REQUIRE(is_prefix({{bytes("a")}, {bytes("b")}, {}}, val2));
    BOOST_REQUIRE(!is_prefix({{bytes("a")}, {bytes("b")}, {bytes()}}, val2));
}

View File

@@ -91,12 +91,19 @@ public:
// FIXME: force limit count?
while (beg != end && count--) {
column_definition& def = range.reversed ? *--end : *beg++;
Column col;
col.__set_name(def.name());
col.__set_value(rw->cells[def.id]);
ColumnOrSuperColumn v;
v.__set_column(std::move(col));
ret.push_back(std::move(v));
if (def.is_atomic()) {
const auto& cell = boost::any_cast<atomic_cell&>((*rw)[def.id]);
if (cell.is_live()) { // FIXME: we should actually use tombstone information from all levels
Column col;
col.__set_name(def.name());
col.__set_value(cell.as_live().value);
col.__set_timestamp(cell.timestamp);
// FIXME: set ttl
ColumnOrSuperColumn v;
v.__set_column(std::move(col));
ret.push_back(std::move(v));
};
}
}
}
} else {
@@ -184,21 +191,30 @@ public:
sstring cf_name = cf_mutations.first;
const std::vector<Mutation>& mutations = cf_mutations.second;
auto& cf = lookup_column_family(cf_name);
auto& row = cf.find_or_create_row(key, null_clustering_key);
mutation m_to_apply(key, cf._schema);
for (const Mutation& m : mutations) {
if (m.__isset.column_or_supercolumn) {
auto&& cosc = m.column_or_supercolumn;
if (cosc.__isset.column) {
auto&& col = cosc.column;
bytes cname = to_bytes(col.name);
// FIXME: use a lookup map
auto def = cf._schema->get_column_definition(cname);
if (!def) {
throw make_exception<InvalidRequestException>("column %s not found", col.name);
}
auto& cells = row.cells;
cells.resize(cf._schema->get_regular_columns_count());
cells[def->id] = to_bytes(col.value);
if (def->kind != column_definition::column_kind::REGULAR) {
throw make_exception<InvalidRequestException>("Column %s is not settable", col.name);
}
gc_clock::duration ttl;
if (col.__isset.ttl) {
ttl = std::chrono::duration_cast<gc_clock::duration>(std::chrono::seconds(col.ttl));
}
if (ttl.count() <= 0) {
ttl = cf._schema->default_time_to_live;
}
auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt();
m_to_apply.set_clustered_cell(null_clustering_key, *def,
atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}});
} else if (cosc.__isset.super_column) {
// FIXME: implement
} else if (cosc.__isset.counter_column) {
@@ -215,6 +231,7 @@ public:
throw make_exception<InvalidRequestException>("Mutation must have either column or deletion");
}
}
cf.apply(std::move(m_to_apply));
}
}
} catch (std::exception& ex) {

View File

@@ -18,6 +18,7 @@ private:
const std::vector<shared_ptr<abstract_type>> types;
const bool _byte_order_equal;
public:
using prefix_type = tuple_type<true>;
using value_type = std::vector<bytes_opt>;
tuple_type(std::vector<shared_ptr<abstract_type>> types)
@@ -27,6 +28,11 @@ public:
return t->is_byte_order_equal();
}))
{ }
// Returns a tuple_type<true> over the same component types, so serialized
// values can be compared as prefixes (see is_prefix_of()).
prefix_type as_prefix() {
    return prefix_type(types);
}
/*
* Format:
* <len(value1)><value1><len(value2)><value2>...
@@ -50,6 +56,12 @@ public:
}
}
}
// Serializes the given components via the free ::serialize_value()
// (format: <len(value1)><value1><len(value2)><value2>..., per the class
// comment above).
bytes serialize_value(const value_type& values) {
    return ::serialize_value(*this, values);
}
// NOTE(review): byte-identical to serialize_value() above — presumably a
// separate name kept to mirror the abstract_type compose/decompose naming;
// confirm the duplication is intentional.
bytes decompose_value(const value_type& values) {
    return ::serialize_value(*this, values);
}
value_type deserialize_value(std::istream& in) {
std::vector<bytes_opt> result;
result.reserve(types.size());
@@ -150,6 +162,58 @@ public:
// TODO: make the length byte-order comparable by adding numeric_limits<int32_t>::min() when serializing
return false;
}
// String conversions are not yet implemented for tuple types.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
/**
 * Returns true iff all components of 'prefix' are equal to corresponding
 * leading components of 'value'.
 *
 * The 'value' is assumed to be serialized using tuple_type<AllowPrefixes=false>,
 * i.e. it carries a length field for every component.
 */
bool is_prefix_of(const bytes& prefix, const bytes& value) const {
    assert(AllowPrefixes);
    // A prefix can never be longer than the value it abbreviates.
    if (prefix.size() > value.size()) {
        return false;
    }
    bytes_view i1(prefix);
    bytes_view i2(value);
    for (auto&& type : types) {
        if (i1.empty()) {
            // Ran out of prefix components: everything so far matched.
            return true;
        }
        if (i2.empty()) {
            // Prefix carries more components than the value itself.
            return false;
        }
        // Each component is encoded as a signed 32-bit length in network
        // byte order followed by that many bytes; a negative length marks
        // an empty (null) component.
        assert(i1.size() >= sizeof(int32_t));
        assert(i2.size() >= sizeof(int32_t));
        auto len1 = (int32_t) net::ntoh(*reinterpret_cast<const uint32_t*>(i1.begin()));
        auto len2 = (int32_t) net::ntoh(*reinterpret_cast<const uint32_t*>(i2.begin()));
        i1.remove_prefix(sizeof(int32_t));
        i2.remove_prefix(sizeof(int32_t));
        if ((len1 < 0) != (len2 < 0)) {
            // one is empty and another one is not
            return false;
        }
        if (len1 >= 0) {
            // both are not empty
            // TODO: make equal() accept bytes_view
            if (!type->equal(bytes(i1.begin(), i1.begin() + len1),
                    bytes(i2.begin(), i2.begin() + len2))) {
                return false;
            }
            // Advance both cursors past this component's payload.
            i1.remove_prefix((uint32_t) len1);
            i2.remove_prefix((uint32_t) len2);
        }
    }
    return true;
}
};
using tuple_prefix = tuple_type<true>;

View File

@@ -2,6 +2,7 @@
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include <boost/lexical_cast.hpp>
#include "types.hh"
template <typename T, typename Compare>
@@ -54,6 +55,33 @@ struct int32_type_impl : simple_type_impl<int32_t> {
auto v = int32_t(net::ntoh(u));
return boost::any(v);
}
// Deserializes a 4-byte network-byte-order buffer into a host-order
// int32_t; throws marshal_exception if the buffer is not exactly 4 bytes.
int32_t compose_value(const bytes& b) {
    if (b.size() != sizeof(int32_t)) {
        throw marshal_exception();
    }
    auto nbo = *reinterpret_cast<const uint32_t*>(b.begin());
    return static_cast<int32_t>(net::ntoh(nbo));
}
// Serializes a host-order int32_t into a freshly allocated 4-byte buffer
// in network byte order.
bytes decompose_value(int32_t v) {
    auto nbo = static_cast<int32_t>(net::hton(static_cast<uint32_t>(v)));
    bytes out(bytes::initialized_later(), sizeof(v));
    *reinterpret_cast<int32_t*>(out.begin()) = nbo;
    return out;
}
int32_t parse_int(const sstring& s) {
try {
return boost::lexical_cast<int32_t>(s);
} catch (const boost::bad_lexical_cast& e) {
throw marshal_exception(sprint("Invalid number format '%s'", s));
}
}
// Parses decimal text and returns its serialized (network-byte-order) form.
virtual bytes from_string(const sstring& s) override {
    return decompose_value(parse_int(s));
}
// Renders a serialized int32 as decimal text; an empty buffer maps to the
// empty string.
virtual sstring to_string(const bytes& b) override {
    return b.empty() ? sstring() : to_sstring(compose_value(b));
}
};
struct long_type_impl : simple_type_impl<int64_t> {
@@ -75,6 +103,12 @@ struct long_type_impl : simple_type_impl<int64_t> {
auto v = int64_t(net::ntoh(u));
return boost::any(v);
}
// String conversions are not yet implemented for the long (int64) type.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
};
struct string_type_impl : public abstract_type {
@@ -101,6 +135,12 @@ struct string_type_impl : public abstract_type {
virtual size_t hash(const bytes& v) override {
return std::hash<bytes>()(v);
}
// Text values serialize as their raw bytes, so both conversions are plain
// copies.
virtual bytes from_string(const sstring& s) override {
    return to_bytes(s);
}
virtual sstring to_string(const bytes& b) override {
    return sstring(b);
}
};
struct bytes_type_impl : public abstract_type {
@@ -126,6 +166,12 @@ struct bytes_type_impl : public abstract_type {
virtual size_t hash(const bytes& v) override {
return std::hash<bytes>()(v);
}
// String conversions are not yet implemented for the bytes type.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
};
struct boolean_type_impl : public simple_type_impl<bool> {
@@ -143,6 +189,12 @@ struct boolean_type_impl : public simple_type_impl<bool> {
}
return boost::any(tmp != 0);
}
// String conversions are not yet implemented for the boolean type.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
};
struct date_type_impl : public abstract_type {
@@ -174,6 +226,12 @@ struct date_type_impl : public abstract_type {
virtual size_t hash(const bytes& v) override {
return std::hash<bytes>()(v);
}
// String conversions are not yet implemented for the date type.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
};
struct timeuuid_type_impl : public abstract_type {
@@ -214,6 +272,12 @@ struct timeuuid_type_impl : public abstract_type {
virtual size_t hash(const bytes& v) override {
return std::hash<bytes>()(v);
}
// String conversions are not yet implemented for the timeuuid type.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
private:
static int compare_bytes(const bytes& o1, const bytes& o2) {
auto compare_pos = [&] (unsigned pos, int mask, int ifequal) {
@@ -251,6 +315,12 @@ struct timestamp_type_impl : simple_type_impl<db_clock::time_point> {
return boost::any(db_clock::time_point(db_clock::duration(net::ntoh(v))));
}
// FIXME: isCompatibleWith(timestampuuid)
// String conversions are not yet implemented for the timestamp type.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
};
struct uuid_type_impl : abstract_type {
@@ -302,6 +372,12 @@ struct uuid_type_impl : abstract_type {
virtual size_t hash(const bytes& v) override {
return std::hash<bytes>()(v);
}
// String conversions are not yet implemented for the uuid type.
virtual bytes from_string(const sstring& s) override {
    throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) override {
    throw std::runtime_error("not implemented");
}
};
thread_local shared_ptr<abstract_type> int32_type(make_shared<int32_type_impl>());

View File

@@ -18,7 +18,7 @@
// FIXME: should be int8_t
using bytes = basic_sstring<char, uint32_t, 31>;
using bytes_view = std::experimental::string_view;
using bytes_opt = std::experimental::optional<bytes>;
sstring to_hex(const bytes& b);
@@ -112,11 +112,11 @@ public:
validate(b);
return to_string(b);
}
virtual sstring to_string(const bytes& b) {
throw std::runtime_error("not implemented");
}
virtual sstring to_string(const bytes& b) = 0;
virtual bytes from_string(const sstring& text) = 0;
virtual bool is_counter() { return false; }
virtual bool is_collection() { return false; }
virtual bool is_multi_cell() { return false; }
protected:
template <typename T, typename Compare = std::less<T>>
bool default_less(const bytes& b1, const bytes& b2, Compare compare = Compare());

View File

@@ -23,6 +23,7 @@
*/
#include "validation.hh"
#include "exceptions/exceptions.hh"
namespace validation {
@@ -30,7 +31,7 @@ namespace validation {
* Based on org.apache.cassandra.thrift.ThriftValidation#validate_key()
*/
void
validate_cql_key(schema_ptr schema, const api::partition_key& key) {
validate_cql_key(schema_ptr schema, const partition_key& key) {
if (key.empty()) {
throw exceptions::invalid_request_exception("Key may not be empty");
}

View File

@@ -25,13 +25,13 @@
#pragma once
#include "database.hh"
#include "db/api.hh"
#include "database.hh"
namespace validation {
constexpr size_t max_key_size = std::numeric_limits<uint16_t>::max();
void validate_cql_key(schema_ptr schema, const api::partition_key& key);
void validate_cql_key(schema_ptr schema, const partition_key& key);
schema_ptr validate_column_family(database& db, const sstring& keyspace_name, const sstring& cf_name);
}