Merge branch 'thrift' into db

This patchset adds a simple metadata and data store, and wires the
system_add_keyspace RPC to set it up.

With this, cassandra-stress is able to initialize the database (but not
store anything in it).
This commit is contained in:
Avi Kivity
2014-12-24 09:41:50 +02:00
8 changed files with 204 additions and 15 deletions

View File

@@ -201,6 +201,7 @@ cassandra_interface = Thrift(source = 'interface/cassandra.thrift', service = 'C
deps = {
'seastar': (['main.cc',
'database.cc',
'thrift/handler.cc',
'thrift/server.cc',
]

57
database.cc Normal file
View File

@@ -0,0 +1,57 @@
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#include "database.hh"
#include "net/byteorder.hh"
struct int_type_impl : public data_type::impl {
int_type_impl() : impl("int") {}
virtual void serialize(const boost::any& value, std::ostream& out) override {
auto v = boost::any_cast<const int32_t&>(value);
auto u = net::hton(uint32_t(v));
out.write(reinterpret_cast<const char*>(&u), sizeof(u));
}
virtual boost::any deserialize(std::istream& in) {
uint32_t u;
in.read(reinterpret_cast<char*>(&u), sizeof(u));
auto v = int32_t(net::ntoh(u));
return boost::any(v);
}
};
struct bigint_type_impl : public data_type::impl {
bigint_type_impl() : impl("bigint") {}
virtual void serialize(const boost::any& value, std::ostream& out) override {
auto v = boost::any_cast<const int64_t&>(value);
auto u = net::hton(uint64_t(v));
out.write(reinterpret_cast<const char*>(&u), sizeof(u));
}
virtual boost::any deserialize(std::istream& in) {
uint64_t u;
in.read(reinterpret_cast<char*>(&u), sizeof(u));
auto v = int64_t(net::ntoh(u));
return boost::any(v);
}
};
struct string_type_impl : public data_type::impl {
string_type_impl(sstring name) : impl(name) {}
virtual void serialize(const boost::any& value, std::ostream& out) override {
auto& v = boost::any_cast<const sstring&>(value);
out.write(v.c_str(), v.size());
}
virtual boost::any deserialize(std::istream& in) {
std::vector<char> tmp(std::istreambuf_iterator<char>(in.rdbuf()),
std::istreambuf_iterator<char>());
// FIXME: validation?
return boost::any(sstring(tmp.data(), tmp.size()));
}
};
data_type int_type(new int_type_impl);
data_type bigint_type(new bigint_type_impl);
data_type ascii_type(new string_type_impl("ascii"));
data_type blob_type(new string_type_impl("blob"));
data_type varchar_type(new string_type_impl("varchar"));
data_type text_type(new string_type_impl("text"));

94
database.hh Normal file
View File

@@ -0,0 +1,94 @@
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#ifndef DATABASE_HH_
#define DATABASE_HH_
#include "core/sstring.hh"
#include <functional>
#include <boost/any.hpp>
#include <cstdint>
#include <boost/variant.hpp>
#include <unordered_map>
#include <map>
#include <set>
#include <vector>
#include <iostream>
struct row {
std::vector<boost::any> cells;
};
using key_compare = std::function<bool (const boost::any&, const boost::any&)>;
struct partition {
row static_columns;
// row key within partition -> row
std::map<boost::any, row, key_compare> rows;
};
class data_type {
public:
// Hide the virtual stuff behind an impl class. This allows us to treat
// data_type as a normal value - we can copy, assign, and destroy it
// without worrying about the destructor.
struct impl {
sstring name;
impl(sstring name) : name(name) {}
virtual ~impl() {}
virtual void serialize(const boost::any& value, std::ostream& out) = 0;
virtual boost::any deserialize(std::istream& in) = 0;
};
private:
impl* _impl;
public:
explicit data_type(impl* impl) : _impl(impl) {}
static data_type find(const sstring& name);
const sstring& name() const { return _impl->name; }
void serialize(const boost::any& value, std::ostream& out) {
return _impl->serialize(value, out);
}
boost::any deserialize(std::istream& in) {
return _impl->deserialize(in);
}
bool operator==(const data_type& x) const {
return _impl == x._impl;
}
bool operator!=(const data_type& x) const {
return _impl != x._impl;
}
};
// FIXME: add missing types
extern data_type int_type;
extern data_type bigint_type;
extern data_type ascii_type;
extern data_type blob_type;
extern data_type varchar_type;
extern data_type text_type;
struct column_definition {
sstring name;
data_type type;
};
struct column_family {
// primary key = paritition key + clustering_key
std::vector<column_definition> partition_key;
std::vector<column_definition> clustering_key;
std::vector<column_definition> column_defs;
std::unordered_map<sstring, unsigned> column_names;
// partition key -> partition
std::map<boost::any, partition, key_compare> partitions;
};
struct keyspace {
std::unordered_map<sstring, column_family> column_families;
};
struct database {
std::unordered_map<sstring, keyspace> keyspaces;
};
#endif /* DATABASE_HH_ */

View File

@@ -3,6 +3,7 @@
*/
#include "database.hh"
#include "core/app-template.hh"
#include "core/smp.hh"
#include "thrift/server.hh"
@@ -14,11 +15,12 @@ int main(int ac, char** av) {
app.add_options()
("thrift-port", bpo::value<uint16_t>()->default_value(9160), "Thrift port") ;
auto server = std::make_unique<distributed<thrift_server>>();;
database db;
return app.run(ac, av, [&] {
auto&& config = app.configuration();
uint16_t port = config["thrift-port"].as<uint16_t>();
auto server = new distributed<thrift_server>;
server->start().then([server = std::move(server), port] () mutable {
server->start(std::ref(db)).then([server = std::move(server), port] () mutable {
server->invoke_on_all(&thrift_server::listen, ipv4_addr{port});
}).then([port] {
std::cout << "Thrift server listening on port " << port << " ...\n";

View File

@@ -3,6 +3,9 @@
*/
#include "Cassandra.h"
#include "database.hh"
#include "core/sstring.hh"
#include "core/print.hh"
#include <thrift/protocol/TBinaryProtocol.h>
using namespace ::apache::thrift;
@@ -24,15 +27,25 @@ void unimplemented(const tcxx::function<void(::apache::thrift::TDelayedException
}
class CassandraAsyncHandler : public CassandraCobSvIf {
database& _db;
keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection?
sstring _cql_version;
public:
explicit CassandraAsyncHandler(database& db) : _db(db) {}
void login(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const AuthenticationRequest& auth_request) {
// FIXME: implement
return unimplemented(exn_cob);
}
void set_keyspace(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
// FIXME: implement
return unimplemented(exn_cob);
try {
_ks = &_db.keyspaces.at(keyspace);
cob();
} catch (std::out_of_range& e) {
InvalidRequestException ire;
ire.why = sprint("keyspace %s does not exist", keyspace);
exn_cob(TDelayedException::delayException(ire));
}
}
void get(tcxx::function<void(ColumnOrSuperColumn const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) {
@@ -220,9 +233,27 @@ public:
}
void system_add_keyspace(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
std::string _return;
// FIXME: implement
return unimplemented(exn_cob);
std::string schema_id = "schema-id"; // FIXME: make meaningful
if (_db.keyspaces.count(ks_def.name)) {
InvalidRequestException ire;
ire.why = sprint("Keyspace %s already exists", ks_def.name);
exn_cob(TDelayedException::delayException(ire));
}
keyspace& ks = _db.keyspaces[ks_def.name];
for (const CfDef& cf_def : ks_def.cf_defs) {
column_family& cf = ks.column_families[cf_def.name];
// FIXME: look at key_alias and key_validator first
cf.partition_key.push_back(column_definition{"key", blob_type});
// FIXME: guess clustering keys
for (const ColumnDef& col_def : cf_def.column_metadata) {
// FIXME: look at all fields, not just name
cf.column_defs.push_back(column_definition{
col_def.name,
blob_type,
});
}
}
cob(schema_id);
}
void system_drop_keyspace(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
@@ -280,16 +311,18 @@ public:
}
void set_cql_version(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& version) {
// FIXME: implement
return unimplemented(exn_cob);
_cql_version = version;
cob();
}
};
class handler_factory : public CassandraCobSvIfFactory {
database& _db;
public:
explicit handler_factory(database& db) : _db(db) {}
typedef CassandraCobSvIf Handler;
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
return new CassandraAsyncHandler;
return new CassandraAsyncHandler(_db);
}
virtual void releaseHandler(CassandraCobSvIf* handler) {
delete handler;
@@ -297,6 +330,6 @@ public:
};
std::unique_ptr<CassandraCobSvIfFactory>
create_handler_factory() {
return std::make_unique<handler_factory>();
create_handler_factory(database& db) {
return std::make_unique<handler_factory>(db);
}

View File

@@ -6,8 +6,9 @@
#define APPS_SEASTAR_THRIFT_HANDLER_HH_
#include "Cassandra.h"
#include "database.hh"
#include <memory>
std::unique_ptr<org::apache::cassandra::CassandraCobSvIfFactory> create_handler_factory();
std::unique_ptr<org::apache::cassandra::CassandraCobSvIfFactory> create_handler_factory(database& db);
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */

View File

@@ -36,9 +36,9 @@ public:
thrift_stats(thrift_server& server);
};
thrift_server::thrift_server()
thrift_server::thrift_server(database& db)
: _stats(new thrift_stats(*this))
, _handler_factory(create_handler_factory().release())
, _handler_factory(create_handler_factory(db).release())
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) {
}

View File

@@ -11,6 +11,7 @@
class thrift_server;
class thrift_stats;
struct database;
namespace org { namespace apache { namespace cassandra {
@@ -41,7 +42,7 @@ class thrift_server {
uint64_t _current_connections = 0;
uint64_t _requests_served = 0;
public:
thrift_server();
thrift_server(database& db);
future<> listen(ipv4_addr addr);
void do_accepts(int which);
class connection;