mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-25 11:00:35 +00:00
Merge branch 'tgrabiec/cql3' of github.com:cloudius-systems/seastar-dev into db
Wire up query_processor into the CQL server, from Tomasz.
This commit is contained in:
@@ -263,6 +263,7 @@ urchin_core = (['database.cc',
|
||||
'utils/UUID_gen.cc',
|
||||
'gms/version_generator.cc',
|
||||
'dht/dht.cc',
|
||||
'unimplemented.cc',
|
||||
]
|
||||
+ [Antlr3Grammar('cql3/Cql.g')]
|
||||
+ [Thrift('interface/cassandra.thrift', 'Cassandra')]
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "service/query_state.hh"
|
||||
#include "service/pager/paging_state.hh"
|
||||
#include "cql3/column_specification.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -44,7 +45,7 @@ public:
|
||||
struct specific_options final {
|
||||
static const specific_options DEFAULT;
|
||||
|
||||
const int page_size;
|
||||
const int32_t page_size;
|
||||
const ::shared_ptr<service::pager::paging_state> state;
|
||||
const std::experimental::optional<db::consistency_level> serial_consistency;
|
||||
const api::timestamp_type timestamp;
|
||||
@@ -120,169 +121,12 @@ public:
|
||||
// Mainly for the sake of BatchQueryOptions
|
||||
virtual const specific_options& get_specific_options() const = 0;
|
||||
|
||||
query_options& prepare(const std::vector<::shared_ptr<column_specification>>& specs) {
|
||||
return *this;
|
||||
virtual void prepare(const std::vector<::shared_ptr<column_specification>>& specs) {
|
||||
}
|
||||
|
||||
#if 0
|
||||
static abstract class QueryOptionsWrapper extends QueryOptions
|
||||
{
|
||||
protected final QueryOptions wrapped;
|
||||
|
||||
QueryOptionsWrapper(QueryOptions wrapped)
|
||||
{
|
||||
this.wrapped = wrapped;
|
||||
}
|
||||
|
||||
public ConsistencyLevel getConsistency()
|
||||
{
|
||||
return wrapped.getConsistency();
|
||||
}
|
||||
|
||||
public boolean skipMetadata()
|
||||
{
|
||||
return wrapped.skipMetadata();
|
||||
}
|
||||
|
||||
public int getProtocolVersion()
|
||||
{
|
||||
return wrapped.getProtocolVersion();
|
||||
}
|
||||
|
||||
SpecificOptions getSpecificOptions()
|
||||
{
|
||||
return wrapped.getSpecificOptions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryOptions prepare(List<ColumnSpecification> specs)
|
||||
{
|
||||
wrapped.prepare(specs);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
static class OptionsWithNames extends QueryOptionsWrapper
|
||||
{
|
||||
private final List<String> names;
|
||||
private List<ByteBuffer> orderedValues;
|
||||
|
||||
OptionsWithNames(DefaultQueryOptions wrapped, List<String> names)
|
||||
{
|
||||
super(wrapped);
|
||||
this.names = names;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryOptions prepare(List<ColumnSpecification> specs)
|
||||
{
|
||||
super.prepare(specs);
|
||||
|
||||
orderedValues = new ArrayList<ByteBuffer>(specs.size());
|
||||
for (int i = 0; i < specs.size(); i++)
|
||||
{
|
||||
String name = specs.get(i).name.toString();
|
||||
for (int j = 0; j < names.size(); j++)
|
||||
{
|
||||
if (name.equals(names.get(j)))
|
||||
{
|
||||
orderedValues.add(wrapped.getValues().get(j));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public List<ByteBuffer> getValues()
|
||||
{
|
||||
assert orderedValues != null; // We should have called prepare first!
|
||||
return orderedValues;
|
||||
}
|
||||
}
|
||||
|
||||
private static class Codec implements CBCodec<QueryOptions>
|
||||
{
|
||||
private static enum Flag
|
||||
{
|
||||
// The order of that enum matters!!
|
||||
VALUES,
|
||||
SKIP_METADATA,
|
||||
PAGE_SIZE,
|
||||
PAGING_STATE,
|
||||
SERIAL_CONSISTENCY,
|
||||
TIMESTAMP,
|
||||
NAMES_FOR_VALUES;
|
||||
|
||||
private static final Flag[] ALL_VALUES = values();
|
||||
|
||||
public static EnumSet<Flag> deserialize(int flags)
|
||||
{
|
||||
EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
|
||||
for (int n = 0; n < ALL_VALUES.length; n++)
|
||||
{
|
||||
if ((flags & (1 << n)) != 0)
|
||||
set.add(ALL_VALUES[n]);
|
||||
}
|
||||
return set;
|
||||
}
|
||||
|
||||
public static int serialize(EnumSet<Flag> flags)
|
||||
{
|
||||
int i = 0;
|
||||
for (Flag flag : flags)
|
||||
i |= 1 << flag.ordinal();
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
public QueryOptions decode(ByteBuf body, int version)
|
||||
{
|
||||
assert version >= 2;
|
||||
|
||||
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
|
||||
EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
|
||||
|
||||
List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
|
||||
List<String> names = null;
|
||||
if (flags.contains(Flag.VALUES))
|
||||
{
|
||||
if (flags.contains(Flag.NAMES_FOR_VALUES))
|
||||
{
|
||||
Pair<List<String>, List<ByteBuffer>> namesAndValues = CBUtil.readNameAndValueList(body);
|
||||
names = namesAndValues.left;
|
||||
values = namesAndValues.right;
|
||||
}
|
||||
else
|
||||
{
|
||||
values = CBUtil.readValueList(body);
|
||||
}
|
||||
}
|
||||
|
||||
boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
|
||||
flags.remove(Flag.VALUES);
|
||||
flags.remove(Flag.SKIP_METADATA);
|
||||
|
||||
SpecificOptions options = SpecificOptions.DEFAULT;
|
||||
if (!flags.isEmpty())
|
||||
{
|
||||
int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1;
|
||||
PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body)) : null;
|
||||
ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
|
||||
long timestamp = Long.MIN_VALUE;
|
||||
if (flags.contains(Flag.TIMESTAMP))
|
||||
{
|
||||
long ts = body.readLong();
|
||||
if (ts == Long.MIN_VALUE)
|
||||
throw new ProtocolException(String.format("Out of bound timestamp, must be in [%d, %d] (got %d)", Long.MIN_VALUE + 1, Long.MAX_VALUE, ts));
|
||||
timestamp = ts;
|
||||
}
|
||||
|
||||
options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp);
|
||||
}
|
||||
DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
|
||||
return names == null ? opts : new OptionsWithNames(opts, names);
|
||||
}
|
||||
|
||||
public void encode(QueryOptions options, ByteBuf dest, int version)
|
||||
{
|
||||
@@ -362,7 +206,7 @@ private:
|
||||
const int32_t _protocol_version; // transient
|
||||
public:
|
||||
default_query_options(db::consistency_level consistency, std::vector<bytes_opt> values, bool skip_metadata, specific_options options,
|
||||
int protocol_version)
|
||||
int32_t protocol_version)
|
||||
: _consistency(consistency)
|
||||
, _values(std::move(values))
|
||||
, _skip_metadata(skip_metadata)
|
||||
@@ -386,6 +230,69 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class query_options_wrapper : public query_options {
|
||||
protected:
|
||||
std::unique_ptr<query_options> _wrapped;
|
||||
public:
|
||||
query_options_wrapper(std::unique_ptr<query_options> wrapped) : _wrapped(std::move(wrapped)) {}
|
||||
|
||||
virtual db::consistency_level get_consistency() const override {
|
||||
return _wrapped->get_consistency();
|
||||
}
|
||||
|
||||
virtual const std::vector<bytes_opt>& get_values() const override {
|
||||
return _wrapped->get_values();
|
||||
}
|
||||
|
||||
virtual bool skip_metadata() const override {
|
||||
return _wrapped->skip_metadata();
|
||||
}
|
||||
|
||||
virtual int get_protocol_version() const override {
|
||||
return _wrapped->get_protocol_version();
|
||||
}
|
||||
|
||||
virtual const specific_options& get_specific_options() const override {
|
||||
return _wrapped->get_specific_options();
|
||||
}
|
||||
|
||||
virtual void prepare(const std::vector<::shared_ptr<column_specification>>& specs) override {
|
||||
_wrapped->prepare(specs);
|
||||
}
|
||||
};
|
||||
|
||||
class options_with_names : public query_options_wrapper {
|
||||
private:
|
||||
std::vector<sstring> _names;
|
||||
std::vector<bytes_opt> _ordered_values;
|
||||
public:
|
||||
options_with_names(std::unique_ptr<query_options> wrapped, std::vector<sstring> names)
|
||||
: query_options_wrapper(std::move(wrapped))
|
||||
, _names(std::move(names))
|
||||
{ }
|
||||
|
||||
void prepare(const std::vector<::shared_ptr<column_specification>>& specs) override {
|
||||
query_options::prepare(specs);
|
||||
|
||||
_ordered_values.resize(specs.size());
|
||||
auto& wrapped_values = _wrapped->get_values();
|
||||
|
||||
for (auto&& spec : specs) {
|
||||
auto& spec_name = spec->name->text();
|
||||
for (size_t j = 0; j < _names.size(); j++) {
|
||||
if (_names[j] == spec_name) {
|
||||
_ordered_values.emplace_back(wrapped_values[j]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
virtual const std::vector<bytes_opt>& get_values() const override {
|
||||
return _ordered_values;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <experimental/string_view>
|
||||
|
||||
#include "core/shared_ptr.hh"
|
||||
|
||||
@@ -265,7 +265,7 @@ column_family::apply(mutation&& m) {
|
||||
// Based on org.apache.cassandra.db.AbstractCell#reconcile()
|
||||
static inline
|
||||
int
|
||||
compare_for_merge(const column_definition& def, const atomic_cell& left, const atomic_cell& right) {
|
||||
compare_for_merge(const atomic_cell& left, const atomic_cell& right) {
|
||||
if (left.timestamp != right.timestamp) {
|
||||
return left.timestamp > right.timestamp ? 1 : -1;
|
||||
}
|
||||
@@ -273,7 +273,7 @@ compare_for_merge(const column_definition& def, const atomic_cell& left, const a
|
||||
return left.is_live() ? -1 : 1;
|
||||
}
|
||||
if (left.is_live()) {
|
||||
return def.type->compare(left.as_live().value, right.as_live().value);
|
||||
return compare_unsigned(left.as_live().value, right.as_live().value);
|
||||
} else {
|
||||
auto& c1 = left.as_dead();
|
||||
auto& c2 = right.as_dead();
|
||||
@@ -291,7 +291,7 @@ compare_for_merge(const column_definition& def,
|
||||
const std::pair<column_id, boost::any>& left,
|
||||
const std::pair<column_id, boost::any>& right) {
|
||||
if (def.is_atomic()) {
|
||||
return compare_for_merge(def, boost::any_cast<const atomic_cell&>(left.second),
|
||||
return compare_for_merge(boost::any_cast<const atomic_cell&>(left.second),
|
||||
boost::any_cast<const atomic_cell&>(right.second));
|
||||
} else {
|
||||
throw std::runtime_error("not implemented");
|
||||
|
||||
32
enum_set.hh
32
enum_set.hh
@@ -84,9 +84,17 @@ struct super_enum {
|
||||
};
|
||||
|
||||
template<typename Enum>
|
||||
struct enum_set {
|
||||
class enum_set {
|
||||
public:
|
||||
using mask_type = size_t; // TODO: use the smallest sufficient type
|
||||
using enum_type = typename Enum::enum_type;
|
||||
private:
|
||||
mask_type _mask;
|
||||
constexpr enum_set(mask_type mask) : _mask(mask) {}
|
||||
public:
|
||||
static constexpr enum_set from_mask(mask_type mask) {
|
||||
return enum_set(mask);
|
||||
}
|
||||
|
||||
static inline mask_type mask_for(enum_type e) {
|
||||
return mask_type(1) << Enum::sequence_for(e);
|
||||
@@ -116,6 +124,28 @@ struct enum_set {
|
||||
|
||||
static_assert(std::numeric_limits<mask_type>::max() >= ((size_t)1 << Enum::max_sequence), "mask type too small");
|
||||
|
||||
template<enum_type e>
|
||||
bool contains() const {
|
||||
return bool(_mask & mask_for<e>());
|
||||
}
|
||||
|
||||
bool contains(enum_type e) const {
|
||||
return bool(_mask & mask_for(e));
|
||||
}
|
||||
|
||||
template<enum_type e>
|
||||
void remove() {
|
||||
_mask &= ~mask_for<e>();
|
||||
}
|
||||
|
||||
void remove(enum_type e) {
|
||||
_mask &= ~mask_for(e);
|
||||
}
|
||||
|
||||
explicit operator bool() const {
|
||||
return bool(_mask);
|
||||
}
|
||||
|
||||
template<enum_type... items>
|
||||
struct frozen {
|
||||
template<enum_type first>
|
||||
|
||||
@@ -51,7 +51,7 @@ public:
|
||||
{ }
|
||||
};
|
||||
|
||||
enum exception_code {
|
||||
enum class exception_code : int32_t {
|
||||
SERVER_ERROR = 0x0000,
|
||||
PROTOCOL_ERROR = 0x000A,
|
||||
|
||||
|
||||
@@ -78,58 +78,49 @@ private:
|
||||
}
|
||||
cqlQueryHandler = handler;
|
||||
}
|
||||
#endif
|
||||
|
||||
public:
|
||||
struct internal_tag {};
|
||||
struct external_tag {};
|
||||
|
||||
// isInternal is used to mark ClientState as used by some internal component
|
||||
// that should have an ability to modify system keyspace.
|
||||
public final boolean isInternal;
|
||||
|
||||
// The remote address of the client - null for internal clients.
|
||||
private final SocketAddress remoteAddress;
|
||||
#endif
|
||||
const bool _is_internal;
|
||||
|
||||
// The biggest timestamp that was returned by getTimestamp/assigned to a query
|
||||
api::timestamp_type _last_timestamp_micros = 0;
|
||||
|
||||
// Note: Origin passes here a RemoteAddress parameter, but it doesn't seem to be used
|
||||
// anywhere so I didn't bother converting it.
|
||||
client_state(external_tag) : _is_internal(false) {
|
||||
unimplemented::auth();
|
||||
#if 0
|
||||
/**
|
||||
* Construct a new, empty ClientState for internal calls.
|
||||
*/
|
||||
private ClientState()
|
||||
{
|
||||
this.isInternal = true;
|
||||
this.remoteAddress = null;
|
||||
if (!DatabaseDescriptor.getAuthenticator().requireAuthentication())
|
||||
this.user = AuthenticatedUser.ANONYMOUS_USER;
|
||||
#endif
|
||||
}
|
||||
|
||||
protected ClientState(SocketAddress remoteAddress)
|
||||
{
|
||||
this.isInternal = false;
|
||||
this.remoteAddress = remoteAddress;
|
||||
if (!DatabaseDescriptor.getAuthenticator().requireAuthentication())
|
||||
this.user = AuthenticatedUser.ANONYMOUS_USER;
|
||||
}
|
||||
client_state(internal_tag) : _is_internal(true) {}
|
||||
|
||||
/**
|
||||
* @return a ClientState object for internal C* calls (not limited by any kind of auth).
|
||||
*/
|
||||
public static ClientState forInternalCalls()
|
||||
{
|
||||
return new ClientState();
|
||||
static client_state for_internal_calls() {
|
||||
return client_state(internal_tag());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a ClientState object for external clients (thrift/native protocol users).
|
||||
*/
|
||||
public static ClientState forExternalCalls(SocketAddress remoteAddress)
|
||||
{
|
||||
return new ClientState(remoteAddress);
|
||||
static client_state for_external_calls() {
|
||||
return client_state(external_tag());
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* This clock guarantees that updates for the same ClientState will be ordered
|
||||
* in the sequence seen, even if multiple updates happen in the same millisecond.
|
||||
*/
|
||||
public:
|
||||
api::timestamp_type get_timestamp() {
|
||||
auto current = db_clock::now().time_since_epoch().count() * 1000;
|
||||
auto last = _last_timestamp_micros;
|
||||
|
||||
@@ -18,6 +18,7 @@ struct conversation_state {
|
||||
conversation_state(database& db, const sstring& ks_name)
|
||||
: proxy(db)
|
||||
, qp(proxy, db)
|
||||
, client_state(service::client_state::for_internal_calls())
|
||||
, query_state(client_state)
|
||||
, options(cql3::query_options::DEFAULT)
|
||||
{
|
||||
|
||||
45
transport/protocol_exception.hh
Normal file
45
transport/protocol_exception.hh
Normal file
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*
|
||||
* Modified by Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
namespace transport {
|
||||
|
||||
class protocol_exception : public std::exception, public exceptions::transport_exception {
|
||||
private:
|
||||
exceptions::exception_code _code;
|
||||
sstring _msg;
|
||||
public:
|
||||
protocol_exception(sstring msg)
|
||||
: _code(exceptions::exception_code::PROTOCOL_ERROR)
|
||||
, _msg(std::move(msg))
|
||||
{ }
|
||||
virtual const char* what() const noexcept override { return _msg.begin(); }
|
||||
virtual exceptions::exception_code code() const override { return _code; }
|
||||
virtual sstring get_message() const override { return _msg; }
|
||||
};
|
||||
|
||||
}
|
||||
@@ -11,7 +11,10 @@
|
||||
#include "database.hh"
|
||||
#include "net/byteorder.hh"
|
||||
|
||||
#include "cql3/CqlParser.hpp"
|
||||
#include "enum_set.hh"
|
||||
#include "service/query_state.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "transport/protocol_exception.hh"
|
||||
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
@@ -128,18 +131,30 @@ inline int16_t consistency_to_wire(db::consistency_level c)
|
||||
}
|
||||
}
|
||||
|
||||
struct cql_query_state {
|
||||
service::query_state query_state;
|
||||
std::unique_ptr<cql3::query_options> options;
|
||||
|
||||
cql_query_state(service::client_state& client_state)
|
||||
: query_state(client_state)
|
||||
{ }
|
||||
};
|
||||
|
||||
class cql_server::connection {
|
||||
cql_server& _server;
|
||||
connected_socket _fd;
|
||||
input_stream<char> _read_buf;
|
||||
output_stream<char> _write_buf;
|
||||
uint8_t _version = 0;
|
||||
service::client_state _client_state;
|
||||
std::unordered_map<uint16_t, cql_query_state> _query_states;
|
||||
public:
|
||||
connection(cql_server& server, connected_socket&& fd, socket_address addr)
|
||||
: _server(server)
|
||||
, _fd(std::move(fd))
|
||||
, _read_buf(_fd.input())
|
||||
, _write_buf(_fd.output())
|
||||
, _client_state(service::client_state::for_external_calls())
|
||||
{ }
|
||||
future<> process() {
|
||||
return do_until([this] { return _read_buf.eof(); }, [this] { return process_request(); });
|
||||
@@ -163,14 +178,27 @@ private:
|
||||
future<> write_supported(int16_t stream);
|
||||
future<> write_response(shared_ptr<cql_server::response> response);
|
||||
|
||||
void check_room(temporary_buffer<char>& buf, size_t n) {
|
||||
if (buf.size() < n) {
|
||||
throw transport::protocol_exception("truncated frame");
|
||||
}
|
||||
}
|
||||
|
||||
int8_t read_byte(temporary_buffer<char>& buf);
|
||||
int32_t read_int(temporary_buffer<char>& buf);
|
||||
int64_t read_long(temporary_buffer<char>& buf);
|
||||
int16_t read_short(temporary_buffer<char>& buf);
|
||||
uint16_t read_unsigned_short(temporary_buffer<char>& buf);
|
||||
sstring read_string(temporary_buffer<char>& buf);
|
||||
sstring read_long_string(temporary_buffer<char>& buf);
|
||||
bytes_opt read_value(temporary_buffer<char>& buf);
|
||||
sstring_view read_long_string_view(temporary_buffer<char>& buf);
|
||||
void read_name_and_value_list(temporary_buffer<char>& buf, std::vector<sstring>& names, std::vector<bytes_opt>& values);
|
||||
void read_value_list(temporary_buffer<char>& buf, std::vector<bytes_opt>& values);
|
||||
db::consistency_level read_consistency(temporary_buffer<char>& buf);
|
||||
std::unordered_map<sstring, sstring> read_string_map(temporary_buffer<char>& buf);
|
||||
std::unique_ptr<cql3::query_options> read_options(temporary_buffer<char>& buf);
|
||||
|
||||
cql_query_state& get_query_state(uint16_t stream);
|
||||
};
|
||||
|
||||
class cql_server::response {
|
||||
@@ -203,6 +231,8 @@ private:
|
||||
};
|
||||
|
||||
cql_server::cql_server(database& db)
|
||||
: _proxy(db)
|
||||
, _query_processor(_proxy, db)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -354,21 +384,28 @@ future<> cql_server::connection::process_options(uint16_t stream, temporary_buff
|
||||
return write_supported(stream);
|
||||
}
|
||||
|
||||
cql_query_state& cql_server::connection::get_query_state(uint16_t stream)
|
||||
{
|
||||
auto i = _query_states.find(stream);
|
||||
if (i == _query_states.end()) {
|
||||
i = _query_states.emplace(stream, _client_state).first;
|
||||
}
|
||||
return i->second;
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_query(uint16_t stream, temporary_buffer<char> buf)
|
||||
{
|
||||
auto query = read_long_string(buf);
|
||||
auto query = read_long_string_view(buf);
|
||||
#if 0
|
||||
auto consistency = read_consistency(buf);
|
||||
auto flags = read_byte(buf);
|
||||
#endif
|
||||
print("processing query: '%s' ...\n", query);
|
||||
cql3_parser::CqlLexer::InputStreamType input{reinterpret_cast<const ANTLR_UINT8*>(query.begin()), ANTLR_ENC_UTF8, static_cast<ANTLR_UINT32>(query.size()), nullptr};
|
||||
cql3_parser::CqlLexer lexer{&input};
|
||||
cql3_parser::CqlParser::TokenStreamType tstream(ANTLR_SIZE_HINT, lexer.get_tokSource());
|
||||
cql3_parser::CqlParser parser{&tstream};
|
||||
auto stmt = parser.query();
|
||||
assert(stmt != nullptr);
|
||||
return make_ready_future<>();
|
||||
auto& q_state = get_query_state(stream);
|
||||
q_state.options = read_options(buf);
|
||||
return _server._query_processor.process(query, q_state.query_state, *q_state.options).then([] (auto msg) {
|
||||
// TODO: respond
|
||||
});
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_prepare(uint16_t stream, temporary_buffer<char> buf)
|
||||
@@ -430,6 +467,7 @@ future<> cql_server::connection::write_response(shared_ptr<cql_server::response>
|
||||
|
||||
int8_t cql_server::connection::read_byte(temporary_buffer<char>& buf)
|
||||
{
|
||||
check_room(buf, 1);
|
||||
int8_t n = buf[0];
|
||||
buf.trim_front(1);
|
||||
return n;
|
||||
@@ -437,6 +475,7 @@ int8_t cql_server::connection::read_byte(temporary_buffer<char>& buf)
|
||||
|
||||
int32_t cql_server::connection::read_int(temporary_buffer<char>& buf)
|
||||
{
|
||||
check_room(buf, sizeof(int32_t));
|
||||
auto p = reinterpret_cast<const uint8_t*>(buf.begin());
|
||||
uint32_t n = (static_cast<uint32_t>(p[0]) << 24)
|
||||
| (static_cast<uint32_t>(p[1]) << 16)
|
||||
@@ -448,6 +487,7 @@ int32_t cql_server::connection::read_int(temporary_buffer<char>& buf)
|
||||
|
||||
int64_t cql_server::connection::read_long(temporary_buffer<char>& buf)
|
||||
{
|
||||
check_room(buf, sizeof(int64_t));
|
||||
auto p = reinterpret_cast<const uint8_t*>(buf.begin());
|
||||
uint64_t n = (static_cast<uint64_t>(p[0]) << 56)
|
||||
| (static_cast<uint64_t>(p[1]) << 48)
|
||||
@@ -463,6 +503,12 @@ int64_t cql_server::connection::read_long(temporary_buffer<char>& buf)
|
||||
|
||||
int16_t cql_server::connection::read_short(temporary_buffer<char>& buf)
|
||||
{
|
||||
return static_cast<int16_t>(read_unsigned_short(buf));
|
||||
}
|
||||
|
||||
uint16_t cql_server::connection::read_unsigned_short(temporary_buffer<char>& buf)
|
||||
{
|
||||
check_room(buf, sizeof(uint16_t));
|
||||
auto p = reinterpret_cast<const uint8_t*>(buf.begin());
|
||||
uint16_t n = (static_cast<uint16_t>(p[0]) << 8)
|
||||
| (static_cast<uint16_t>(p[1]));
|
||||
@@ -473,16 +519,18 @@ int16_t cql_server::connection::read_short(temporary_buffer<char>& buf)
|
||||
sstring cql_server::connection::read_string(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto n = read_short(buf);
|
||||
check_room(buf, n);
|
||||
sstring s{buf.begin(), static_cast<size_t>(n)};
|
||||
assert(n >= 0);
|
||||
buf.trim_front(n);
|
||||
return s;
|
||||
}
|
||||
|
||||
sstring cql_server::connection::read_long_string(temporary_buffer<char>& buf)
|
||||
sstring_view cql_server::connection::read_long_string_view(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto n = read_int(buf);
|
||||
sstring s{buf.begin(), static_cast<size_t>(n)};
|
||||
check_room(buf, n);
|
||||
sstring_view s{buf.begin(), static_cast<size_t>(n)};
|
||||
buf.trim_front(n);
|
||||
return s;
|
||||
}
|
||||
@@ -506,6 +554,120 @@ std::unordered_map<sstring, sstring> cql_server::connection::read_string_map(tem
|
||||
return string_map;
|
||||
}
|
||||
|
||||
enum class options_flag {
|
||||
VALUES,
|
||||
SKIP_METADATA,
|
||||
PAGE_SIZE,
|
||||
PAGING_STATE,
|
||||
SERIAL_CONSISTENCY,
|
||||
TIMESTAMP,
|
||||
NAMES_FOR_VALUES
|
||||
};
|
||||
|
||||
using options_flag_enum = super_enum<options_flag,
|
||||
options_flag::VALUES,
|
||||
options_flag::SKIP_METADATA,
|
||||
options_flag::PAGE_SIZE,
|
||||
options_flag::PAGING_STATE,
|
||||
options_flag::SERIAL_CONSISTENCY,
|
||||
options_flag::TIMESTAMP,
|
||||
options_flag::NAMES_FOR_VALUES
|
||||
>;
|
||||
|
||||
std::unique_ptr<cql3::query_options> cql_server::connection::read_options(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto consistency = read_consistency(buf);
|
||||
if (_version == 1) {
|
||||
return std::make_unique<cql3::default_query_options>(consistency, std::vector<bytes_opt>{},
|
||||
false, cql3::query_options::specific_options::DEFAULT, 1);
|
||||
}
|
||||
|
||||
assert(_version >= 2);
|
||||
|
||||
auto flags = enum_set<options_flag_enum>::from_mask(read_byte(buf));
|
||||
std::vector<bytes_opt> values;
|
||||
std::vector<sstring> names;
|
||||
|
||||
if (flags.contains<options_flag::VALUES>()) {
|
||||
if (flags.contains<options_flag::NAMES_FOR_VALUES>()) {
|
||||
read_name_and_value_list(buf, names, values);
|
||||
} else {
|
||||
read_value_list(buf, values);
|
||||
}
|
||||
}
|
||||
|
||||
bool skip_metadata = flags.contains<options_flag::SKIP_METADATA>();
|
||||
flags.remove<options_flag::VALUES>();
|
||||
flags.remove<options_flag::SKIP_METADATA>();
|
||||
|
||||
std::unique_ptr<cql3::query_options> options;
|
||||
if (flags) {
|
||||
::shared_ptr<service::pager::paging_state> paging_state;
|
||||
int32_t page_size = flags.contains<options_flag::PAGE_SIZE>() ? read_int(buf) : -1;
|
||||
if (flags.contains<options_flag::PAGING_STATE>()) {
|
||||
unimplemented::paging();
|
||||
#if 0
|
||||
paging_state = PagingState.deserialize(CBUtil.readValue(body))
|
||||
#endif
|
||||
}
|
||||
|
||||
db::consistency_level serial_consistency = db::consistency_level::SERIAL;
|
||||
if (flags.contains<options_flag::SERIAL_CONSISTENCY>()) {
|
||||
serial_consistency = read_consistency(buf);
|
||||
}
|
||||
|
||||
api::timestamp_type ts = api::missing_timestamp;
|
||||
if (flags.contains<options_flag::TIMESTAMP>()) {
|
||||
ts = read_long(buf);
|
||||
if (ts < api::min_timestamp || ts > api::max_timestamp) {
|
||||
throw transport::protocol_exception(sprint("Out of bound timestamp, must be in [%d, %d] (got %d)",
|
||||
api::min_timestamp, api::max_timestamp, ts));
|
||||
}
|
||||
}
|
||||
|
||||
options = std::make_unique<cql3::default_query_options>(consistency, std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}, _version);
|
||||
} else {
|
||||
options = std::make_unique<cql3::default_query_options>(consistency, std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options::DEFAULT, _version);
|
||||
}
|
||||
|
||||
if (names.empty()) {
|
||||
return std::move(options);
|
||||
}
|
||||
|
||||
return std::make_unique<cql3::options_with_names>(std::move(options), std::move(names));
|
||||
}
|
||||
|
||||
void cql_server::connection::read_name_and_value_list(temporary_buffer<char>& buf, std::vector<sstring>& names, std::vector<bytes_opt>& values) {
|
||||
uint16_t size = read_unsigned_short(buf);
|
||||
names.reserve(size);
|
||||
values.reserve(size);
|
||||
for (uint16_t i = 0; i < size; i++) {
|
||||
names.emplace_back(read_string(buf));
|
||||
values.emplace_back(read_value(buf));
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::connection::read_value_list(temporary_buffer<char>& buf, std::vector<bytes_opt>& values) {
|
||||
uint16_t size = read_unsigned_short(buf);
|
||||
values.reserve(size);
|
||||
for (uint16_t i = 0; i < size; i++) {
|
||||
values.emplace_back(read_value(buf));
|
||||
}
|
||||
}
|
||||
|
||||
bytes_opt cql_server::connection::read_value(temporary_buffer<char>& buf) {
|
||||
auto len = read_int(buf);
|
||||
if (len < 0) {
|
||||
return {};
|
||||
}
|
||||
check_room(buf, len);
|
||||
bytes b(buf.begin(), buf.begin() + len);
|
||||
buf.trim_front(len);
|
||||
return {std::move(b)};
|
||||
}
|
||||
|
||||
scattered_message<char> cql_server::response::make_message(uint8_t version) {
|
||||
scattered_message<char> msg;
|
||||
sstring body = _body.str();
|
||||
|
||||
@@ -6,11 +6,15 @@
|
||||
#define CQL_SERVER_HH
|
||||
|
||||
#include "core/reactor.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
|
||||
class database;
|
||||
|
||||
class cql_server {
|
||||
std::vector<server_socket> _listeners;
|
||||
service::storage_proxy _proxy;
|
||||
cql3::query_processor _query_processor;
|
||||
public:
|
||||
cql_server(database& db);
|
||||
future<> listen(ipv4_addr addr);
|
||||
|
||||
47
unimplemented.cc
Normal file
47
unimplemented.cc
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#include "unimplemented.hh"
|
||||
#include "core/sstring.hh"
|
||||
|
||||
namespace unimplemented {
|
||||
|
||||
static inline
|
||||
void warn(sstring what) {
|
||||
std::cerr << "WARNING: Not implemented: " << what << std::endl;
|
||||
}
|
||||
|
||||
class warn_once {
|
||||
sstring _msg;
|
||||
public:
|
||||
warn_once(const char* msg) : _msg(msg) {}
|
||||
void operator()() {
|
||||
if (!_msg.empty()) {
|
||||
warn(_msg);
|
||||
_msg.reset();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void indexes() {
|
||||
static thread_local warn_once w("indexes");
|
||||
w();
|
||||
}
|
||||
|
||||
void auth() {
|
||||
static thread_local warn_once w("auth");
|
||||
w();
|
||||
}
|
||||
|
||||
void permissions() {
|
||||
static thread_local warn_once w("permissions");
|
||||
w();
|
||||
}
|
||||
|
||||
void triggers() {
|
||||
static thread_local warn_once w("triggers");
|
||||
w();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
#include <iostream>
|
||||
#include "core/print.hh"
|
||||
#include "core/sstring.hh"
|
||||
|
||||
namespace unimplemented {
|
||||
|
||||
@@ -17,15 +18,7 @@ void fail(sstring what) {
|
||||
throw std::runtime_error(sprint("not implemented: %s", what));
|
||||
}
|
||||
|
||||
static inline
|
||||
void warn(sstring what) {
|
||||
std::cerr << "WARNING: Not implemented: " << what << std::endl;
|
||||
}
|
||||
|
||||
static inline
|
||||
void indexes() {
|
||||
warn("indexes");
|
||||
}
|
||||
void indexes();
|
||||
|
||||
static inline
|
||||
void lwt() __attribute__((noreturn));
|
||||
@@ -36,19 +29,16 @@ void lwt() {
|
||||
}
|
||||
|
||||
static inline
|
||||
void auth() {
|
||||
warn("auth");
|
||||
}
|
||||
void paging() __attribute__((noreturn));
|
||||
|
||||
static inline
|
||||
void permissions() {
|
||||
warn("permissions");
|
||||
void paging() {
|
||||
fail("paging");
|
||||
}
|
||||
|
||||
static inline
|
||||
void triggers() {
|
||||
warn("triggers");
|
||||
}
|
||||
void auth();
|
||||
void permissions();
|
||||
void triggers();
|
||||
|
||||
static inline
|
||||
void collections() __attribute__((noreturn));
|
||||
|
||||
Reference in New Issue
Block a user