Merge tag 'avi/writetime/v1' of github.com:cloudius-systems/seastar-dev into db

This commit is contained in:
Avi Kivity
2015-04-07 17:57:26 +03:00
9 changed files with 271 additions and 171 deletions

View File

@@ -30,6 +30,7 @@ options {
}
@parser::includes {
#include "cql3/selection/writetime_or_ttl.hh"
#include "cql3/statements/create_keyspace_statement.hh"
#include "cql3/statements/create_table_statement.hh"
#include "cql3/statements/property_definitions.hh"
@@ -306,10 +307,8 @@ selector returns [shared_ptr<raw_selector> s]
unaliasedSelector returns [shared_ptr<selectable::raw> s]
@init { shared_ptr<selectable::raw> tmp; }
: ( c=cident { tmp = c; }
#if 0
| K_WRITETIME '(' c=cident ')' { tmp = new Selectable.WritetimeOrTTL.Raw(c, true); }
| K_TTL '(' c=cident ')' { tmp = new Selectable.WritetimeOrTTL.Raw(c, false); }
#endif
| K_WRITETIME '(' c=cident ')' { tmp = make_shared<selectable::writetime_or_ttl::raw>(c, true); }
| K_TTL '(' c=cident ')' { tmp = make_shared<selectable::writetime_or_ttl::raw>(c, false); }
| f=functionName args=selectionFunctionArgs { tmp = ::make_shared<selectable::with_function::raw>(std::move(f), std::move(args)); }
)
#if 0

View File

@@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.selection;
import java.nio.ByteBuffer;
import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.utils.ByteBufferUtil;
final class WritetimeOrTTLSelector extends Selector
{
private final String columnName;
private final int idx;
private final boolean isWritetime;
private ByteBuffer current;
public static Factory newFactory(final String columnName, final int idx, final boolean isWritetime)
{
return new Factory()
{
protected String getColumnName()
{
return String.format("%s(%s)", isWritetime ? "writetime" : "ttl", columnName);
}
protected AbstractType<?> getReturnType()
{
return isWritetime ? LongType.instance : Int32Type.instance;
}
public Selector newInstance()
{
return new WritetimeOrTTLSelector(columnName, idx, isWritetime);
}
public boolean isWritetimeSelectorFactory()
{
return isWritetime;
}
public boolean isTTLSelectorFactory()
{
return !isWritetime;
}
};
}
public void addInput(int protocolVersion, ResultSetBuilder rs)
{
if (isWritetime)
{
long ts = rs.timestamps[idx];
current = ts != Long.MIN_VALUE ? ByteBufferUtil.bytes(ts) : null;
}
else
{
int ttl = rs.ttls[idx];
current = ttl > 0 ? ByteBufferUtil.bytes(ttl) : null;
}
}
public ByteBuffer getOutput(int protocolVersion)
{
return current;
}
public void reset()
{
current = null;
}
public AbstractType<?> getType()
{
return isWritetime ? LongType.instance : Int32Type.instance;
}
@Override
public String toString()
{
return columnName;
}
private WritetimeOrTTLSelector(String columnName, int idx, boolean isWritetime)
{
this.columnName = columnName;
this.idx = idx;
this.isWritetime = isWritetime;
}
}

View File

@@ -3,14 +3,46 @@
*/
#include "selectable.hh"
#include "writetime_or_ttl.hh"
#include "selector_factories.hh"
#include "cql3/functions/functions.hh"
#include "abstract_function_selector.hh"
#include "writetime_or_ttl_selector.hh"
namespace cql3 {
namespace selection {
shared_ptr<selector::factory>
selectable::writetime_or_ttl::new_selector_factory(schema_ptr s, std::vector<const column_definition*>& defs) {
auto&& def = s->get_column_definition(_id->name());
if (!def) {
throw exceptions::invalid_request_exception(sprint("Undefined name %s in selection clause", _id));
}
if (def->is_primary_key()) {
throw exceptions::invalid_request_exception(
sprint("Cannot use selection function %s on PRIMARY KEY part %s",
_is_writetime ? "writeTime" : "ttl",
def->name()));
}
if (def->type->is_collection()) {
throw exceptions::invalid_request_exception(sprint("Cannot use selection function %s on collections",
_is_writetime ? "writeTime" : "ttl"));
}
return writetime_or_ttl_selector::new_factory(def->name_as_text(), add_and_get_index(*def, defs), _is_writetime);
}
shared_ptr<selectable>
selectable::writetime_or_ttl::raw::prepare(schema_ptr s) {
return make_shared<writetime_or_ttl>(_id->prepare_column_identifier(s), _is_writetime);
}
bool
selectable::writetime_or_ttl::raw::processes_selection() const {
return true;
}
shared_ptr<selector::factory>
selectable::with_function::new_selector_factory(schema_ptr s, std::vector<const column_definition*>& defs) {
auto&& factories = selector_factories::create_factories_and_collect_column_definitions(_args, s, defs);

View File

@@ -77,65 +77,7 @@ public:
virtual bool processes_selection() const = 0;
};
#if 0
public static class WritetimeOrTTL extends Selectable
{
public final ColumnIdentifier id;
public final boolean isWritetime;
public WritetimeOrTTL(ColumnIdentifier id, boolean isWritetime)
{
this.id = id;
this.isWritetime = isWritetime;
}
@Override
public String toString()
{
return (isWritetime ? "writetime" : "ttl") + "(" + id + ")";
}
public Selector.Factory newSelectorFactory(CFMetaData cfm,
List<ColumnDefinition> defs) throws InvalidRequestException
{
ColumnDefinition def = cfm.getColumnDefinition(id);
if (def == null)
throw new InvalidRequestException(String.format("Undefined name %s in selection clause", id));
if (def.isPrimaryKeyColumn())
throw new InvalidRequestException(
String.format("Cannot use selection function %s on PRIMARY KEY part %s",
isWritetime ? "writeTime" : "ttl",
def.name));
if (def.type.isCollection())
throw new InvalidRequestException(String.format("Cannot use selection function %s on collections",
isWritetime ? "writeTime" : "ttl"));
return WritetimeOrTTLSelector.newFactory(def.name.toString(), addAndGetIndex(def, defs), isWritetime);
}
public static class Raw implements Selectable.Raw
{
private final ColumnIdentifier.Raw id;
private final boolean isWritetime;
public Raw(ColumnIdentifier.Raw id, boolean isWritetime)
{
this.id = id;
this.isWritetime = isWritetime;
}
public WritetimeOrTTL prepare(CFMetaData cfm)
{
return new WritetimeOrTTL(id.prepare(cfm), isWritetime);
}
public boolean processesSelection()
{
return true;
}
}
}
#endif
class writetime_or_ttl;
class with_function;

View File

@@ -288,6 +288,13 @@ public:
return std::move(_result_set);
}
api::timestamp_type timestamp_of(size_t idx) {
return _timestamps[idx];
}
int32_t ttl_of(size_t idx) {
return _ttls[idx];
}
private:
bytes_opt get_value(data_type t, atomic_cell_view c) {
if (c.is_dead()) {

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "selectable.hh"
#include "cql3/column_identifier.hh"
namespace cql3 {
namespace selection {
class selectable::writetime_or_ttl : public selectable {
public:
shared_ptr<column_identifier> _id;
bool _is_writetime;
writetime_or_ttl(shared_ptr<column_identifier> id, bool is_writetime)
: _id(std::move(id)), _is_writetime(is_writetime) {
}
#if 0
@Override
public String toString()
{
return (isWritetime ? "writetime" : "ttl") + "(" + id + ")";
}
#endif
virtual shared_ptr<selector::factory> new_selector_factory(schema_ptr s, std::vector<const column_definition*>& defs) override;
class raw : public selectable::raw {
shared_ptr<column_identifier::raw> _id;
bool _is_writetime;
public:
raw(shared_ptr<column_identifier::raw> id, bool is_writetime)
: _id(std::move(id)), _is_writetime(is_writetime) {
}
virtual shared_ptr<selectable> prepare(schema_ptr s) override;
virtual bool processes_selection() const override;
};
};
}
}

View File

@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Modified by Cloudius Systems
*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include "selector.hh"
#include "selection.hh"
namespace cql3 {
namespace selection {
class writetime_or_ttl_selector : public selector {
sstring _column_name;
int _idx;
bool _is_writetime;
bytes_opt _current;
public:
static shared_ptr<selector::factory> new_factory(sstring column_name, int idx, bool is_writetime) {
class wtots_factory : public selector::factory {
sstring _column_name;
int _idx;
bool _is_writetime;
public:
wtots_factory(sstring column_name, int idx, bool is_writetime)
: _column_name(std::move(column_name)), _idx(idx), _is_writetime(is_writetime) {
}
virtual sstring column_name() override {
return sprint("%s(%s)", _is_writetime ? "writetime" : "ttl", _column_name);
}
virtual data_type get_return_type() override {
return _is_writetime ? long_type : int32_type;
}
virtual shared_ptr<selector> new_instance() override {
return make_shared<writetime_or_ttl_selector>(_column_name, _idx, _is_writetime);
}
virtual bool is_write_time_selector_factory() override {
return _is_writetime;
}
virtual bool is_ttl_selector_factory() override {
return !_is_writetime;
}
};
return make_shared<wtots_factory>(std::move(column_name), idx, is_writetime);
}
virtual void add_input(serialization_format sf, result_set_builder& rs) override {
if (_is_writetime) {
int64_t ts = rs.timestamp_of(_idx);
if (ts != api::missing_timestamp) {
_current = bytes(bytes::initialized_later(), 8);
auto i = _current->begin();
serialize_int64(i, ts);
} else {
_current = std::experimental::nullopt;
}
} else {
int ttl = rs.ttl_of(_idx);
if (ttl > 0) {
_current = bytes(bytes::initialized_later(), 4);
auto i = _current->begin();
serialize_int32(i, ttl);
} else {
_current = std::experimental::nullopt;
}
}
}
virtual bytes_opt get_output(serialization_format sf) override {
return _current;
}
virtual void reset() override {
_current = std::experimental::nullopt;
}
virtual data_type get_type() override {
return _is_writetime ? long_type : int32_type;
}
virtual sstring assignment_testable_source_context() const override {
return _column_name;
}
#if 0
@Override
public String toString()
{
return columnName;
}
#endif
writetime_or_ttl_selector(sstring column_name, int idx, bool is_writetime)
: _column_name(std::move(column_name)), _idx(idx), _is_writetime(is_writetime) {
}
};
}
}

View File

@@ -667,3 +667,33 @@ SEASTAR_TEST_CASE(test_functions) {
});
});
}
static const api::timestamp_type the_timestamp = 123456789;
SEASTAR_TEST_CASE(test_writetime_and_ttl) {
return do_with_cql_env([] (auto&& e) {
return e.create_table([](auto ks_name) {
// CQL: create table cf (p1 varchar primary key, u uuid, tu timeuuid);
return schema(ks_name, "cf",
{{"p1", utf8_type}}, {}, {{"i", int32_type}}, {}, utf8_type);
}).then([&e] {
auto qo = std::make_unique<cql3::default_query_options>(
db::consistency_level::ONE,
std::vector<bytes_opt>(),
false,
cql3::query_options::specific_options{100, make_shared<service::pager::paging_state>(),
{db::consistency_level::ONE}, the_timestamp},
3,
serialization_format::use_32_bit()
);
return e.execute_cql("insert into cf (p1, i) values ('key1', 1);", std::move(qo)).discard_result();
}).then([&e] {
return e.execute_cql("select writetime(i) from cf where p1 in ('key1');");
}).then([&e] (shared_ptr<transport::messages::result_message> msg) {
assert_that(msg).is_rows()
.with_size(1)
.with_row({
{long_type->decompose(int64_t(the_timestamp))},
});
});
});
}

View File

@@ -47,6 +47,12 @@ public:
return _qp->local().process(text, *qs, cql3::query_options::DEFAULT).finally([qs] {});
}
auto execute_cql(const sstring& text, std::unique_ptr<cql3::query_options> qo) {
auto qs = make_query_state();
auto& lqo = *qo;
return _qp->local().process(text, *qs, lqo).finally([qs, qo = std::move(qo)] {});
}
future<bytes> prepare(sstring query) {
return _qp->invoke_on_all([query, this] (auto& local_qp) {
auto qs = this->make_query_state();