diff --git a/configure.py b/configure.py index 5b9ea74b83..fb5fb9173a 100755 --- a/configure.py +++ b/configure.py @@ -201,6 +201,7 @@ cassandra_interface = Thrift(source = 'interface/cassandra.thrift', service = 'C deps = { 'seastar': (['main.cc', + 'database.cc', 'thrift/handler.cc', 'thrift/server.cc', ] diff --git a/database.cc b/database.cc new file mode 100644 index 0000000000..ae56388124 --- /dev/null +++ b/database.cc @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include "database.hh" +#include "net/byteorder.hh" + +struct int_type_impl : public data_type::impl { + int_type_impl() : impl("int") {} + virtual void serialize(const boost::any& value, std::ostream& out) override { + auto v = boost::any_cast(value); + auto u = net::hton(uint32_t(v)); + out.write(reinterpret_cast(&u), sizeof(u)); + } + virtual boost::any deserialize(std::istream& in) { + uint32_t u; + in.read(reinterpret_cast(&u), sizeof(u)); + auto v = int32_t(net::ntoh(u)); + return boost::any(v); + } +}; + +struct bigint_type_impl : public data_type::impl { + bigint_type_impl() : impl("bigint") {} + virtual void serialize(const boost::any& value, std::ostream& out) override { + auto v = boost::any_cast(value); + auto u = net::hton(uint64_t(v)); + out.write(reinterpret_cast(&u), sizeof(u)); + } + virtual boost::any deserialize(std::istream& in) { + uint64_t u; + in.read(reinterpret_cast(&u), sizeof(u)); + auto v = int64_t(net::ntoh(u)); + return boost::any(v); + } +}; + +struct string_type_impl : public data_type::impl { + string_type_impl(sstring name) : impl(name) {} + virtual void serialize(const boost::any& value, std::ostream& out) override { + auto& v = boost::any_cast(value); + out.write(v.c_str(), v.size()); + } + virtual boost::any deserialize(std::istream& in) { + std::vector tmp(std::istreambuf_iterator(in.rdbuf()), + std::istreambuf_iterator()); + // FIXME: validation? + return boost::any(sstring(tmp.data(), tmp.size())); + } +}; + +data_type int_type(new int_type_impl); +data_type bigint_type(new bigint_type_impl); +data_type ascii_type(new string_type_impl("ascii")); +data_type blob_type(new string_type_impl("blob")); +data_type varchar_type(new string_type_impl("varchar")); +data_type text_type(new string_type_impl("text")); diff --git a/database.hh b/database.hh new file mode 100644 index 0000000000..7fc28836bb --- /dev/null +++ b/database.hh @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#ifndef DATABASE_HH_ +#define DATABASE_HH_ + +#include "core/sstring.hh" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct row { + std::vector cells; +}; + +using key_compare = std::function; + +struct partition { + row static_columns; + // row key within partition -> row + std::map rows; +}; + +class data_type { +public: + // Hide the virtual stuff behind an impl class. This allows us to treat + // data_type as a normal value - we can copy, assign, and destroy it + // without worrying about the destructor. + struct impl { + sstring name; + impl(sstring name) : name(name) {} + virtual ~impl() {} + virtual void serialize(const boost::any& value, std::ostream& out) = 0; + virtual boost::any deserialize(std::istream& in) = 0; + }; +private: + impl* _impl; +public: + explicit data_type(impl* impl) : _impl(impl) {} + static data_type find(const sstring& name); + const sstring& name() const { return _impl->name; } + void serialize(const boost::any& value, std::ostream& out) { + return _impl->serialize(value, out); + } + boost::any deserialize(std::istream& in) { + return _impl->deserialize(in); + } + bool operator==(const data_type& x) const { + return _impl == x._impl; + } + bool operator!=(const data_type& x) const { + return _impl != x._impl; + } +}; + +// FIXME: add missing types +extern data_type int_type; +extern data_type bigint_type; +extern data_type ascii_type; +extern data_type blob_type; +extern data_type varchar_type; +extern data_type text_type; + +struct column_definition { + sstring name; + data_type type; +}; + +struct column_family { + // primary key = paritition key + clustering_key + std::vector partition_key; + std::vector clustering_key; + std::vector column_defs; + std::unordered_map column_names; + // partition key -> partition + std::map partitions; +}; + +struct keyspace { + std::unordered_map column_families; +}; + +struct database { + std::unordered_map keyspaces; +}; + +#endif /* DATABASE_HH_ */ diff --git a/main.cc b/main.cc index 1836ae3e63..23fa7009c7 100644 --- a/main.cc +++ b/main.cc @@ -3,6 +3,7 @@ */ +#include "database.hh" #include "core/app-template.hh" #include "core/smp.hh" #include "thrift/server.hh" @@ -14,11 +15,12 @@ int main(int ac, char** av) { app.add_options() ("thrift-port", bpo::value()->default_value(9160), "Thrift port") ; auto server = std::make_unique>();; + database db; return app.run(ac, av, [&] { auto&& config = app.configuration(); uint16_t port = config["thrift-port"].as(); auto server = new distributed; - server->start().then([server = std::move(server), port] () mutable { + server->start(std::ref(db)).then([server = std::move(server), port] () mutable { server->invoke_on_all(&thrift_server::listen, ipv4_addr{port}); }).then([port] { std::cout << "Thrift server listening on port " << port << " ...\n"; diff --git a/thrift/handler.cc b/thrift/handler.cc index 626fe03864..029c354fe7 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -3,6 +3,9 @@ */ #include "Cassandra.h" +#include "database.hh" +#include "core/sstring.hh" +#include "core/print.hh" #include using namespace ::apache::thrift; @@ -24,15 +27,25 @@ void unimplemented(const tcxx::function cob, tcxx::function exn_cob, const AuthenticationRequest& auth_request) { // FIXME: implement return unimplemented(exn_cob); } void set_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { - // FIXME: implement - return unimplemented(exn_cob); + try { + _ks = &_db.keyspaces.at(keyspace); + cob(); + } catch (std::out_of_range& e) { + InvalidRequestException ire; + ire.why = sprint("keyspace %s does not exist", keyspace); + exn_cob(TDelayedException::delayException(ire)); + } } void get(tcxx::function cob, tcxx::function exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) { @@ -220,9 +233,27 @@ public: } void system_add_keyspace(tcxx::function cob, tcxx::function exn_cob, const KsDef& ks_def) { - std::string _return; - // FIXME: implement - return unimplemented(exn_cob); + std::string schema_id = "schema-id"; // FIXME: make meaningful + if (_db.keyspaces.count(ks_def.name)) { + InvalidRequestException ire; + ire.why = sprint("Keyspace %s already exists", ks_def.name); + exn_cob(TDelayedException::delayException(ire)); + } + keyspace& ks = _db.keyspaces[ks_def.name]; + for (const CfDef& cf_def : ks_def.cf_defs) { + column_family& cf = ks.column_families[cf_def.name]; + // FIXME: look at key_alias and key_validator first + cf.partition_key.push_back(column_definition{"key", blob_type}); + // FIXME: guess clustering keys + for (const ColumnDef& col_def : cf_def.column_metadata) { + // FIXME: look at all fields, not just name + cf.column_defs.push_back(column_definition{ + col_def.name, + blob_type, + }); + } + } + cob(schema_id); } void system_drop_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { @@ -280,16 +311,18 @@ public: } void set_cql_version(tcxx::function cob, tcxx::function exn_cob, const std::string& version) { - // FIXME: implement - return unimplemented(exn_cob); + _cql_version = version; + cob(); } }; class handler_factory : public CassandraCobSvIfFactory { + database& _db; public: + explicit handler_factory(database& db) : _db(db) {} typedef CassandraCobSvIf Handler; virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { - return new CassandraAsyncHandler; + return new CassandraAsyncHandler(_db); } virtual void releaseHandler(CassandraCobSvIf* handler) { delete handler; @@ -297,6 +330,6 @@ public: }; std::unique_ptr -create_handler_factory() { - return std::make_unique(); +create_handler_factory(database& db) { + return std::make_unique(db); } diff --git a/thrift/handler.hh b/thrift/handler.hh index fc5701dc7f..d17df1364e 100644 --- a/thrift/handler.hh +++ b/thrift/handler.hh @@ -6,8 +6,9 @@ #define APPS_SEASTAR_THRIFT_HANDLER_HH_ #include "Cassandra.h" +#include "database.hh" #include -std::unique_ptr create_handler_factory(); +std::unique_ptr create_handler_factory(database& db); #endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */ diff --git a/thrift/server.cc b/thrift/server.cc index eba062246d..edf2b64a17 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -36,9 +36,9 @@ public: thrift_stats(thrift_server& server); }; -thrift_server::thrift_server() +thrift_server::thrift_server(database& db) : _stats(new thrift_stats(*this)) - , _handler_factory(create_handler_factory().release()) + , _handler_factory(create_handler_factory(db).release()) , _protocol_factory(new TBinaryProtocolFactoryT()) , _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) { } diff --git a/thrift/server.hh b/thrift/server.hh index 458fd3e66a..217a70c6e7 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -11,6 +11,7 @@ class thrift_server; class thrift_stats; +struct database; namespace org { namespace apache { namespace cassandra { @@ -41,7 +42,7 @@ class thrift_server { uint64_t _current_connections = 0; uint64_t _requests_served = 0; public: - thrift_server(); + thrift_server(database& db); future<> listen(ipv4_addr addr); void do_accepts(int which); class connection;