CQL binary protocol
This patch implement initial support for CQL binary protocol versions 1,
2, 3, and 4. The CQL server is able to handshake with cqlsh and
cassandra-stress from Cassandra 2.1 which uses the STARTUP and OPTIONS
messages. Queries or other functionality is not supported.
To try it out, start Urchin:
$ build/release/seastar --smp 1 --datadir data
CQL server listening on port 9042 ...
Thrift server listening on port 9160 ...
Then try to login with cqlsh:
$ ./bin/cqlsh 127.0.0.1
Urchin side will fail with:
CQL_VERSION => 3.2.0
warning: ignoring event registration
warning: ignoring query SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers
seastar: cql/server.cc:222: future<> cql_server::connection::process_query(uint16_t, temporary_buffer<char>&): Assertion `0' failed.
Aborted (core dumped)
TODO:
- Compression is not supported.
- Authentication is not supported.
- Supported options are defined to make cqlsh and cassandra-stress
happy. We really need to decide which CQL versions and compression
algorithms we want to support.
- std::string is used everywhere because sstring does not work with
std::map, std::multimap, and others.
Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
Signed-off-by: Tomasz Grabiec <tgrabiec@cloudius-systems.com>
This commit is contained in:
committed by
Tomasz Grabiec
parent
5049ed8ae6
commit
2a7da21481
@@ -225,6 +225,8 @@ deps = {
|
||||
'seastar': (['main.cc',
|
||||
'database.cc',
|
||||
'log.cc',
|
||||
'cql/binary.rl',
|
||||
'cql/server.cc',
|
||||
'cql3/abstract_marker.cc',
|
||||
'cql3/cql3.cc',
|
||||
'cql3/cql3_type.cc',
|
||||
|
||||
148
cql/binary.rl
Normal file
148
cql/binary.rl
Normal file
@@ -0,0 +1,148 @@
|
||||
/*
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#include "core/ragel.hh"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
/*
|
||||
* CQL binary protocol versions [1234]
|
||||
*/
|
||||
|
||||
%%{
|
||||
|
||||
machine cql_binary_protocol;
|
||||
|
||||
access _fsm_;
|
||||
|
||||
action mark {
|
||||
_g = p;
|
||||
}
|
||||
|
||||
v1 = 0x01 >mark @{ _version = read_u8(); };
|
||||
|
||||
v2 = 0x02 >mark @{ _version = read_u8(); };
|
||||
|
||||
v3 = 0x03 >mark @{ _version = read_u8(); };
|
||||
|
||||
v4 = 0x04 >mark @{ _version = read_u8(); };
|
||||
|
||||
v1_v2 = v1|v2;
|
||||
|
||||
v3_v4 = v3|v4;
|
||||
|
||||
flags = any{1} >mark @{ _flags = read_u8(); };
|
||||
|
||||
stream8 = any{1} >mark @{ _stream = read_u8(); };
|
||||
|
||||
stream16 = any{2} >mark @{ _stream = read_be16(); };
|
||||
|
||||
length = any{4} >mark @{ _length = read_be32(); };
|
||||
|
||||
startup_v1_v2 = v1_v2 flags stream8 0x01 length @{ _state = state::req_startup; };
|
||||
|
||||
credentials_v1 = v1 flags stream8 0x04 length @{ _state = state::req_credentials; };
|
||||
|
||||
auth_response_v2 = v2 flags stream8 0x0f length @{ _state = state::req_auth_response; };
|
||||
|
||||
options_v1_v2 = v1_v2 flags stream8 0x05 length @{ _state = state::req_options; };
|
||||
|
||||
query_v1_v2 = v1_v2 flags stream8 0x07 length @{ _state = state::req_query; };
|
||||
|
||||
prepare_v1_v2 = v1_v2 flags stream8 0x09 length @{ _state = state::req_prepare; };
|
||||
|
||||
execute_v1_v2 = v1_v2 flags stream8 0x0a length @{ _state = state::req_execute; };
|
||||
|
||||
batch_v1_v2 = v1_v2 flags stream8 0x0d length @{ _state = state::req_batch; };
|
||||
|
||||
register_v1_v2 = v1_v2 flags stream8 0x0b length @{ _state = state::req_register; };
|
||||
|
||||
request_v1_v2 = startup_v1_v2|credentials_v1|auth_response_v2|options_v1_v2|query_v1_v2|prepare_v1_v2|execute_v1_v2|batch_v1_v2|register_v1_v2;
|
||||
|
||||
startup_v3_v4 = v3_v4 flags stream16 0x01 length @{ _state = state::req_startup; };
|
||||
|
||||
auth_response_v3_v4 = v3_v4 flags stream16 0x0f length @{ _state = state::req_auth_response; };
|
||||
|
||||
options_v3_v4 = v3_v4 flags stream16 0x05 length @{ _state = state::req_options; };
|
||||
|
||||
query_v3_v4 = v3_v4 flags stream16 0x07 length @{ _state = state::req_query; };
|
||||
|
||||
prepare_v3_v4 = v3_v4 flags stream16 0x09 length @{ _state = state::req_prepare; };
|
||||
|
||||
execute_v3_v4 = v3_v4 flags stream16 0x0a length @{ _state = state::req_execute; };
|
||||
|
||||
batch_v3_v4 = v3_v4 flags stream16 0x0d length @{ _state = state::req_batch; };
|
||||
|
||||
register_v3_v4 = v3_v4 flags stream16 0x0b length @{ _state = state::req_register; };
|
||||
|
||||
request_v3_v4 = startup_v3_v4|auth_response_v3_v4|options_v3_v4|query_v3_v4|prepare_v3_v4|execute_v3_v4|batch_v3_v4|register_v3_v4;
|
||||
|
||||
main := request_v1_v2|request_v3_v4 @eof{ _state = state::eof; };
|
||||
|
||||
prepush {
|
||||
prepush();
|
||||
}
|
||||
|
||||
postpop {
|
||||
postpop();
|
||||
}
|
||||
|
||||
}%%
|
||||
|
||||
class cql_binary_parser : public ragel_parser_base<cql_binary_parser> {
|
||||
%% write data nofinal noprefix;
|
||||
public:
|
||||
enum class state {
|
||||
error,
|
||||
eof,
|
||||
req_startup,
|
||||
req_credentials,
|
||||
req_auth_response,
|
||||
req_options,
|
||||
req_query,
|
||||
req_prepare,
|
||||
req_execute,
|
||||
req_batch,
|
||||
req_register,
|
||||
};
|
||||
state _state;
|
||||
int8_t _version;
|
||||
int8_t _flags;
|
||||
int16_t _stream;
|
||||
int32_t _length;
|
||||
char* _g;
|
||||
public:
|
||||
void init() {
|
||||
init_base();
|
||||
_state = state::error;
|
||||
%% write init;
|
||||
}
|
||||
char* parse(char* p, char* pe, char* eof) {
|
||||
%% write exec;
|
||||
if (_state != state::error) {
|
||||
return p;
|
||||
}
|
||||
if (p != pe) {
|
||||
p = pe;
|
||||
return p;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
uint8_t read_u8() {
|
||||
return _g[0];
|
||||
}
|
||||
uint16_t read_be16() {
|
||||
return (static_cast<uint8_t>(_g[0]) << 8)
|
||||
| (static_cast<uint8_t>(_g[1]));
|
||||
}
|
||||
uint32_t read_be32() {
|
||||
return (static_cast<uint8_t>(_g[0]) << 24)
|
||||
| (static_cast<uint8_t>(_g[1]) << 16)
|
||||
| (static_cast<uint8_t>(_g[2]) << 8)
|
||||
| (static_cast<uint8_t>(_g[3]));
|
||||
}
|
||||
bool eof() const {
|
||||
return _state == state::eof;
|
||||
}
|
||||
};
|
||||
557
cql/server.cc
Normal file
557
cql/server.cc
Normal file
@@ -0,0 +1,557 @@
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*/
|
||||
|
||||
#include "cql/server.hh"
|
||||
|
||||
#include "db/consistency_level.hh"
|
||||
#include "core/future-util.hh"
|
||||
#include "core/reactor.hh"
|
||||
#include "cql/binary.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "database.hh"
|
||||
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
|
||||
struct [[gnu::packed]] cql_binary_frame_v1 {
|
||||
uint8_t version;
|
||||
uint8_t flags;
|
||||
uint8_t stream;
|
||||
uint8_t opcode;
|
||||
uint32_t length;
|
||||
};
|
||||
|
||||
struct [[gnu::packed]] cql_binary_frame_v3 {
|
||||
uint8_t version;
|
||||
uint8_t flags;
|
||||
int16_t stream;
|
||||
uint8_t opcode;
|
||||
uint32_t length;
|
||||
};
|
||||
|
||||
enum class cql_binary_opcode : uint8_t {
|
||||
ERROR = 0,
|
||||
STARTUP = 1,
|
||||
READY = 2,
|
||||
AUTHENTICATE = 3,
|
||||
CREDENTIALS = 4,
|
||||
OPTIONS = 5,
|
||||
SUPPORTED = 6,
|
||||
QUERY = 7,
|
||||
RESULT = 8,
|
||||
PREPARE = 9,
|
||||
EXECUTE = 10,
|
||||
REGISTER = 11,
|
||||
EVENT = 12,
|
||||
BATCH = 13,
|
||||
AUTH_CHALLENGE = 14,
|
||||
AUTH_RESPONSE = 15,
|
||||
AUTH_SUCCESS = 16,
|
||||
};
|
||||
|
||||
enum class cql_binary_error {
|
||||
SERVER_ERROR = 0x0000,
|
||||
PROTOCOL_ERROR = 0x000A,
|
||||
BAD_CREDENTIALS = 0x0100,
|
||||
UNAVAILABLE = 0x1000,
|
||||
OVERLOADED = 0x1001,
|
||||
IS_BOOTSTRAPPING = 0x1002,
|
||||
TRUNCATE_ERROR = 0x1003,
|
||||
WRITE_TIMEOUT = 0x1100,
|
||||
READ_TIMEOUT = 0x1200,
|
||||
SYNTAX_ERROR = 0x2000,
|
||||
UNAUTHORIZED = 0x2100,
|
||||
INVALID = 0x2200,
|
||||
CONFIG_ERROR = 0x2300,
|
||||
ALREADY_EXISTS = 0x2400,
|
||||
UNPREPARED = 0x2500,
|
||||
};
|
||||
|
||||
inline db::consistency_level wire_to_consistency(int16_t v)
|
||||
{
|
||||
switch (v) {
|
||||
case 0x0000: return db::consistency_level::ANY;
|
||||
case 0x0001: return db::consistency_level::ONE;
|
||||
case 0x0002: return db::consistency_level::TWO;
|
||||
case 0x0003: return db::consistency_level::THREE;
|
||||
case 0x0004: return db::consistency_level::QUORUM;
|
||||
case 0x0005: return db::consistency_level::ALL;
|
||||
case 0x0006: return db::consistency_level::LOCAL_QUORUM;
|
||||
case 0x0007: return db::consistency_level::EACH_QUORUM;
|
||||
case 0x0008: return db::consistency_level::SERIAL;
|
||||
case 0x0009: return db::consistency_level::LOCAL_SERIAL;
|
||||
case 0x000A: return db::consistency_level::LOCAL_ONE;
|
||||
default: assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
inline int16_t consistency_to_wire(db::consistency_level c)
|
||||
{
|
||||
switch (c) {
|
||||
case db::consistency_level::ANY: return 0x0000;
|
||||
case db::consistency_level::ONE: return 0x0001;
|
||||
case db::consistency_level::TWO: return 0x0002;
|
||||
case db::consistency_level::THREE: return 0x0003;
|
||||
case db::consistency_level::QUORUM: return 0x0004;
|
||||
case db::consistency_level::ALL: return 0x0005;
|
||||
case db::consistency_level::LOCAL_QUORUM: return 0x0006;
|
||||
case db::consistency_level::EACH_QUORUM: return 0x0007;
|
||||
case db::consistency_level::SERIAL: return 0x0008;
|
||||
case db::consistency_level::LOCAL_SERIAL: return 0x0009;
|
||||
case db::consistency_level::LOCAL_ONE: return 0x000A;
|
||||
default: assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
class cql_server::connection {
|
||||
cql_server& _server;
|
||||
connected_socket _fd;
|
||||
input_stream<char> _read_buf;
|
||||
output_stream<char> _write_buf;
|
||||
cql_binary_parser _parser;
|
||||
uint8_t _version = 0;
|
||||
public:
|
||||
connection(cql_server& server, connected_socket&& fd, socket_address addr)
|
||||
: _server(server)
|
||||
, _fd(std::move(fd))
|
||||
, _read_buf(_fd.input())
|
||||
, _write_buf(_fd.output())
|
||||
{ }
|
||||
future<> process() {
|
||||
return do_until([this] { return _read_buf.eof(); }, [this] { return process_request(); });
|
||||
}
|
||||
future<> process_request();
|
||||
private:
|
||||
future<> process_startup(uint8_t version, int16_t stream, temporary_buffer<char>& buf);
|
||||
future<> process_auth_response(int16_t stream, temporary_buffer<char>& buf);
|
||||
future<> process_options(uint8_t version, int16_t stream, temporary_buffer<char>& buf);
|
||||
future<> process_query(int16_t stream, temporary_buffer<char>& buf);
|
||||
future<> process_prepare(int16_t stream, temporary_buffer<char>& buf);
|
||||
future<> process_execute(int16_t stream, temporary_buffer<char>& buf);
|
||||
future<> process_batch(int16_t stream, temporary_buffer<char>& buf);
|
||||
future<> process_register(int16_t stream, temporary_buffer<char>& buf);
|
||||
|
||||
future<> write_error(int16_t stream, cql_binary_error err, sstring msg);
|
||||
future<> write_ready(int16_t stream);
|
||||
future<> write_supported(int16_t stream);
|
||||
future<> write_response(shared_ptr<cql_server::response> response);
|
||||
|
||||
int8_t read_byte(temporary_buffer<char>& buf);
|
||||
int32_t read_int(temporary_buffer<char>& buf);
|
||||
int64_t read_long(temporary_buffer<char>& buf);
|
||||
int16_t read_short(temporary_buffer<char>& buf);
|
||||
sstring read_string(temporary_buffer<char>& buf);
|
||||
sstring read_long_string(temporary_buffer<char>& buf);
|
||||
db::consistency_level read_consistency(temporary_buffer<char>& buf);
|
||||
std::unordered_map<sstring, sstring> read_string_map(temporary_buffer<char>& buf);
|
||||
};
|
||||
|
||||
class cql_server::response {
|
||||
int16_t _stream;
|
||||
cql_binary_opcode _opcode;
|
||||
std::stringstream _body;
|
||||
public:
|
||||
response(int16_t stream, cql_binary_opcode opcode)
|
||||
: _stream{stream}
|
||||
, _opcode{opcode}
|
||||
{ }
|
||||
scattered_message<char> make_message(uint8_t version);
|
||||
void write_int(int32_t n);
|
||||
void write_long(int64_t n);
|
||||
void write_short(int16_t n);
|
||||
void write_string(const sstring& s);
|
||||
void write_long_string(const sstring& s);
|
||||
void write_uuid(utils::UUID uuid);
|
||||
void write_string_list(std::vector<std::string> string_list);
|
||||
void write_bytes(bytes b);
|
||||
void write_short_bytes(bytes b);
|
||||
void write_option(std::pair<int16_t, boost::any> opt);
|
||||
void write_option_list(std::vector<std::pair<int16_t, boost::any>> opt_list);
|
||||
void write_inet(ipv4_addr inet);
|
||||
void write_consistency(db::consistency_level c);
|
||||
void write_string_map(std::map<std::string, std::string> string_map);
|
||||
void write_string_multimap(std::multimap<std::string, std::string> string_map);
|
||||
private:
|
||||
sstring make_frame(uint8_t version, size_t length);
|
||||
};
|
||||
|
||||
cql_server::cql_server(database& db)
|
||||
{
|
||||
}
|
||||
|
||||
future<>
|
||||
cql_server::listen(ipv4_addr addr) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
_listeners.push_back(engine().listen(make_ipv4_address(addr), lo));
|
||||
do_accepts(_listeners.size() - 1);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void
|
||||
cql_server::do_accepts(int which) {
|
||||
_listeners[which].accept().then([this, which] (connected_socket fd, socket_address addr) mutable {
|
||||
auto conn = make_shared<connection>(*this, std::move(fd), addr);
|
||||
conn->process().rescue([this, conn] (auto&& get_ex) {
|
||||
try {
|
||||
get_ex();
|
||||
} catch (std::exception& ex) {
|
||||
std::cout << "request error " << ex.what() << "\n";
|
||||
}
|
||||
});
|
||||
do_accepts(which);
|
||||
}).rescue([] (auto get_ex) {
|
||||
try {
|
||||
get_ex();
|
||||
} catch (std::exception& ex) {
|
||||
std::cout << "accept failed: " << ex.what() << "\n";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_request() {
|
||||
_parser.init();
|
||||
return _read_buf.consume(_parser).then([this] () -> future<> {
|
||||
switch (_parser._state) {
|
||||
case cql_binary_parser::state::eof: return make_ready_future<>();
|
||||
case cql_binary_parser::state::error: return write_error(_parser._stream, cql_binary_error::PROTOCOL_ERROR, sstring{"parse error"});
|
||||
default: break;
|
||||
}
|
||||
return _read_buf.read_exactly(_parser._length).then([this] (temporary_buffer<char> buf) {
|
||||
assert(!(_parser._flags & 0x01)); // FIXME: compression
|
||||
switch (_parser._state) {
|
||||
case cql_binary_parser::state::req_startup: return process_startup(_parser._version, _parser._stream, buf);
|
||||
case cql_binary_parser::state::req_auth_response: return process_auth_response(_parser._stream, buf);
|
||||
case cql_binary_parser::state::req_options: return process_options(_parser._version, _parser._stream, buf);
|
||||
case cql_binary_parser::state::req_query: return process_query(_parser._stream, buf);
|
||||
case cql_binary_parser::state::req_prepare: return process_prepare(_parser._stream, buf);
|
||||
case cql_binary_parser::state::req_execute: return process_execute(_parser._stream, buf);
|
||||
case cql_binary_parser::state::req_batch: return process_batch(_parser._stream, buf);
|
||||
case cql_binary_parser::state::req_register: return process_register(_parser._stream, buf);
|
||||
default: assert(0);
|
||||
};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_startup(uint8_t version, int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
_version = version;
|
||||
auto string_map = read_string_map(buf);
|
||||
for (auto&& s : string_map) {
|
||||
print("%s => %s\n", s.first, s.second);
|
||||
}
|
||||
return write_ready(stream);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_auth_response(int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
assert(0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_options(uint8_t version, int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
_version = version;
|
||||
return write_supported(stream);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_query(int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
auto query = read_long_string(buf);
|
||||
#if 0
|
||||
auto consistency = read_consistency(buf);
|
||||
auto flags = read_byte(buf);
|
||||
#endif
|
||||
print("warning: ignoring query %s\n", query);
|
||||
assert(0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_prepare(int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
assert(0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_execute(int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
assert(0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_batch(int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
assert(0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> cql_server::connection::process_register(int16_t stream, temporary_buffer<char>& buf)
|
||||
{
|
||||
print("warning: ignoring event registration\n");
|
||||
return write_ready(stream);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::write_error(int16_t stream, cql_binary_error err, sstring msg)
|
||||
{
|
||||
auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::ERROR);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
response->write_string(msg);
|
||||
return write_response(response);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::write_ready(int16_t stream)
|
||||
{
|
||||
auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::READY);
|
||||
return write_response(response);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::write_supported(int16_t stream)
|
||||
{
|
||||
std::multimap<std::string, std::string> opts;
|
||||
opts.insert({"CQL_VERSION", "3.0.0"});
|
||||
opts.insert({"CQL_VERSION", "3.2.0"});
|
||||
opts.insert({"COMPRESSION", "snappy"});
|
||||
auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::SUPPORTED);
|
||||
response->write_string_multimap(opts);
|
||||
return write_response(response);
|
||||
}
|
||||
|
||||
future<> cql_server::connection::write_response(shared_ptr<cql_server::response> response)
|
||||
{
|
||||
auto msg = response->make_message(_version);
|
||||
return _write_buf.write(std::move(msg)).then([this] {
|
||||
return _write_buf.flush();
|
||||
});
|
||||
}
|
||||
|
||||
int8_t cql_server::connection::read_byte(temporary_buffer<char>& buf)
|
||||
{
|
||||
int8_t n = buf[0];
|
||||
buf.trim_front(1);
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t cql_server::connection::read_int(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto p = reinterpret_cast<const uint8_t*>(buf.begin());
|
||||
uint32_t n = (static_cast<uint32_t>(p[0]) << 24)
|
||||
| (static_cast<uint32_t>(p[1]) << 16)
|
||||
| (static_cast<uint32_t>(p[2]) << 8)
|
||||
| (static_cast<uint32_t>(p[3]));
|
||||
buf.trim_front(4);
|
||||
return n;
|
||||
}
|
||||
|
||||
int64_t cql_server::connection::read_long(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto p = reinterpret_cast<const uint8_t*>(buf.begin());
|
||||
uint64_t n = (static_cast<uint64_t>(p[0]) << 56)
|
||||
| (static_cast<uint64_t>(p[1]) << 48)
|
||||
| (static_cast<uint64_t>(p[2]) << 40)
|
||||
| (static_cast<uint64_t>(p[3]) << 32)
|
||||
| (static_cast<uint64_t>(p[4]) << 24)
|
||||
| (static_cast<uint64_t>(p[5]) << 16)
|
||||
| (static_cast<uint64_t>(p[6]) << 8)
|
||||
| (static_cast<uint64_t>(p[7]));
|
||||
buf.trim_front(8);
|
||||
return n;
|
||||
}
|
||||
|
||||
int16_t cql_server::connection::read_short(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto p = reinterpret_cast<const uint8_t*>(buf.begin());
|
||||
uint16_t n = (static_cast<uint16_t>(p[0]) << 8)
|
||||
| (static_cast<uint16_t>(p[1]));
|
||||
buf.trim_front(2);
|
||||
return n;
|
||||
}
|
||||
|
||||
sstring cql_server::connection::read_string(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto n = read_short(buf);
|
||||
sstring s{buf.begin(), static_cast<size_t>(n)};
|
||||
assert(n >= 0);
|
||||
buf.trim_front(n);
|
||||
return s;
|
||||
}
|
||||
|
||||
sstring cql_server::connection::read_long_string(temporary_buffer<char>& buf)
|
||||
{
|
||||
auto n = read_int(buf);
|
||||
sstring s{buf.begin(), static_cast<size_t>(n)};
|
||||
buf.trim_front(n);
|
||||
return s;
|
||||
}
|
||||
|
||||
db::consistency_level cql_server::connection::read_consistency(temporary_buffer<char>& buf)
|
||||
{
|
||||
return wire_to_consistency(read_short(buf));
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, sstring> cql_server::connection::read_string_map(temporary_buffer<char>& buf)
|
||||
{
|
||||
std::unordered_map<sstring, sstring> string_map;
|
||||
auto n = read_short(buf);
|
||||
for (auto i = 0; i < n; i++) {
|
||||
auto key = read_string(buf);
|
||||
auto val = read_string(buf);
|
||||
string_map.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(std::move(key)),
|
||||
std::forward_as_tuple(std::move(val)));
|
||||
}
|
||||
return string_map;
|
||||
}
|
||||
|
||||
scattered_message<char> cql_server::response::make_message(uint8_t version) {
|
||||
scattered_message<char> msg;
|
||||
sstring body = _body.str();
|
||||
sstring frame = make_frame(version, body.size());
|
||||
msg.append(std::move(frame));
|
||||
msg.append(std::move(body));
|
||||
return msg;
|
||||
}
|
||||
|
||||
sstring cql_server::response::make_frame(uint8_t version, size_t length)
|
||||
{
|
||||
switch (version) {
|
||||
case 0x01:
|
||||
case 0x02: {
|
||||
sstring frame_buf(sstring::initialized_later(), sizeof(cql_binary_frame_v1));
|
||||
auto* frame = reinterpret_cast<cql_binary_frame_v1*>(frame_buf.begin());
|
||||
frame->version = version | 0x80;
|
||||
frame->flags = 0x00;
|
||||
frame->stream = _stream;
|
||||
frame->opcode = static_cast<uint8_t>(_opcode);
|
||||
frame->length = htonl(length);
|
||||
return frame_buf;
|
||||
}
|
||||
case 0x03:
|
||||
case 0x04: {
|
||||
sstring frame_buf(sstring::initialized_later(), sizeof(cql_binary_frame_v3));
|
||||
auto* frame = reinterpret_cast<cql_binary_frame_v3*>(frame_buf.begin());
|
||||
frame->version = version | 0x80;
|
||||
frame->flags = 0x00;
|
||||
frame->stream = htons(_stream);
|
||||
frame->opcode = static_cast<uint8_t>(_opcode);
|
||||
frame->length = htonl(length);
|
||||
return frame_buf;
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::response::write_int(int32_t n)
|
||||
{
|
||||
auto u = htonl(n);
|
||||
_body.write(reinterpret_cast<const char*>(&u), sizeof(u));
|
||||
}
|
||||
|
||||
void cql_server::response::write_long(int64_t n)
|
||||
{
|
||||
auto u = htonq(n);
|
||||
_body.write(reinterpret_cast<const char*>(&u), sizeof(u));
|
||||
}
|
||||
|
||||
void cql_server::response::write_short(int16_t n)
|
||||
{
|
||||
auto u = htons(n);
|
||||
_body.write(reinterpret_cast<const char*>(&u), sizeof(u));
|
||||
}
|
||||
|
||||
void cql_server::response::write_string(const sstring& s)
|
||||
{
|
||||
assert(s.size() < std::numeric_limits<int16_t>::max());
|
||||
write_short(s.size());
|
||||
_body << s;
|
||||
}
|
||||
|
||||
void cql_server::response::write_long_string(const sstring& s)
|
||||
{
|
||||
assert(s.size() < std::numeric_limits<int32_t>::max());
|
||||
write_int(s.size());
|
||||
_body << s;
|
||||
}
|
||||
|
||||
void cql_server::response::write_uuid(utils::UUID uuid)
|
||||
{
|
||||
// FIXME
|
||||
assert(0);
|
||||
}
|
||||
|
||||
void cql_server::response::write_string_list(std::vector<std::string> string_list)
|
||||
{
|
||||
assert(string_list.size() < std::numeric_limits<int16_t>::max());
|
||||
write_short(string_list.size());
|
||||
for (auto&& s : string_list) {
|
||||
write_string(s);
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::response::write_bytes(bytes b)
|
||||
{
|
||||
assert(b.size() < std::numeric_limits<int32_t>::max());
|
||||
write_int(b.size());
|
||||
_body << b;
|
||||
}
|
||||
|
||||
void cql_server::response::write_short_bytes(bytes b)
|
||||
{
|
||||
assert(b.size() < std::numeric_limits<int16_t>::max());
|
||||
write_short(b.size());
|
||||
_body << b;
|
||||
}
|
||||
|
||||
void cql_server::response::write_option(std::pair<int16_t, boost::any> opt)
|
||||
{
|
||||
// FIXME
|
||||
assert(0);
|
||||
}
|
||||
|
||||
void cql_server::response::write_option_list(std::vector<std::pair<int16_t, boost::any>> opt_list)
|
||||
{
|
||||
// FIXME
|
||||
assert(0);
|
||||
}
|
||||
|
||||
void cql_server::response::write_inet(ipv4_addr inet)
|
||||
{
|
||||
// FIXME
|
||||
assert(0);
|
||||
}
|
||||
|
||||
void cql_server::response::write_consistency(db::consistency_level c)
|
||||
{
|
||||
write_short(consistency_to_wire(c));
|
||||
}
|
||||
|
||||
void cql_server::response::write_string_map(std::map<std::string, std::string> string_map)
|
||||
{
|
||||
assert(string_map.size() < std::numeric_limits<int16_t>::max());
|
||||
write_short(string_map.size());
|
||||
for (auto&& s : string_map) {
|
||||
write_string(s.first);
|
||||
write_string(s.second);
|
||||
}
|
||||
}
|
||||
|
||||
void cql_server::response::write_string_multimap(std::multimap<std::string, std::string> string_map)
|
||||
{
|
||||
std::vector<std::string> keys;
|
||||
for (auto it = string_map.begin(), end = string_map.end(); it != end; it = string_map.upper_bound(it->first)) {
|
||||
keys.push_back(it->first);
|
||||
}
|
||||
assert(keys.size() < std::numeric_limits<int16_t>::max());
|
||||
write_short(keys.size());
|
||||
for (auto&& key : keys) {
|
||||
std::vector<std::string> values;
|
||||
auto range = string_map.equal_range(key);
|
||||
for (auto it = range.first; it != range.second; ++it) {
|
||||
values.push_back(it->second);
|
||||
}
|
||||
write_string(key);
|
||||
write_string_list(values);
|
||||
}
|
||||
}
|
||||
23
cql/server.hh
Normal file
23
cql/server.hh
Normal file
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
||||
*/
|
||||
|
||||
#ifndef CQL_SERVER_HH
|
||||
#define CQL_SERVER_HH
|
||||
|
||||
#include "core/reactor.hh"
|
||||
|
||||
class database;
|
||||
|
||||
class cql_server {
|
||||
std::vector<server_socket> _listeners;
|
||||
public:
|
||||
cql_server(database& db);
|
||||
future<> listen(ipv4_addr addr);
|
||||
void do_accepts(int which);
|
||||
private:
|
||||
class connection;
|
||||
class response;
|
||||
};
|
||||
|
||||
#endif
|
||||
23
main.cc
23
main.cc
@@ -7,12 +7,14 @@
|
||||
#include "core/app-template.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "thrift/server.hh"
|
||||
#include "cql/server.hh"
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
int main(int ac, char** av) {
|
||||
app_template app;
|
||||
app.add_options()
|
||||
("cql-port", bpo::value<uint16_t>()->default_value(9042), "CQL port")
|
||||
("thrift-port", bpo::value<uint16_t>()->default_value(9160), "Thrift port")
|
||||
("datadir", bpo::value<std::string>()->default_value("/var/lib/cassandra/data"), "data directory");
|
||||
|
||||
@@ -20,16 +22,23 @@ int main(int ac, char** av) {
|
||||
|
||||
return app.run(ac, av, [&] {
|
||||
auto&& config = app.configuration();
|
||||
uint16_t port = config["thrift-port"].as<uint16_t>();
|
||||
uint16_t thrift_port = config["thrift-port"].as<uint16_t>();
|
||||
uint16_t cql_port = config["cql-port"].as<uint16_t>();
|
||||
sstring datadir = config["datadir"].as<std::string>();
|
||||
|
||||
return database::populate(datadir).then([port] (database db) {
|
||||
return database::populate(datadir).then([cql_port, thrift_port] (database db) {
|
||||
auto pdb = new database(std::move(db));
|
||||
auto server = new distributed<thrift_server>;
|
||||
server->start(std::ref(*pdb)).then([server = std::move(server), port] () mutable {
|
||||
server->invoke_on_all(&thrift_server::listen, ipv4_addr{port});
|
||||
}).then([port] {
|
||||
std::cout << "Thrift server listening on port " << port << " ...\n";
|
||||
auto cserver = new distributed<cql_server>;
|
||||
cserver->start(std::ref(*pdb)).then([server = std::move(cserver), cql_port] () mutable {
|
||||
server->invoke_on_all(&cql_server::listen, ipv4_addr{cql_port});
|
||||
}).then([cql_port] {
|
||||
std::cout << "CQL server listening on port " << cql_port << " ...\n";
|
||||
});
|
||||
auto tserver = new distributed<thrift_server>;
|
||||
tserver->start(std::ref(*pdb)).then([server = std::move(tserver), thrift_port] () mutable {
|
||||
server->invoke_on_all(&thrift_server::listen, ipv4_addr{thrift_port});
|
||||
}).then([thrift_port] {
|
||||
std::cout << "Thrift server listening on port " << thrift_port << " ...\n";
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user