diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index f2592ced6e..e00bf5a2d9 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -88,7 +88,7 @@ query_processor::get_statement(const sstring_view& query, service::client_state& #if 0 Tracing.trace("Preparing statement"); #endif - return statement->prepare(_db); + return statement->prepare(_db.local()); } ::shared_ptr diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 7826cb676e..6e6084f8b0 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -32,15 +32,16 @@ #include "cql3/statements/cf_statement.hh" #include "service/query_state.hh" #include "log.hh" +#include "core/distributed.hh" namespace cql3 { class query_processor { private: service::storage_proxy& _proxy; - database& _db; + distributed& _db; public: - query_processor(service::storage_proxy& proxy, database& db) : _proxy(proxy), _db(db) {} + query_processor(service::storage_proxy& proxy, distributed& db) : _proxy(proxy), _db(db) {} #if 0 public static final SemanticVersion CQL_VERSION = new SemanticVersion("3.2.0"); diff --git a/database.cc b/database.cc index 881f608c3b..8844d6f90d 100644 --- a/database.cc +++ b/database.cc @@ -204,6 +204,13 @@ future database::populate(sstring datadir) { }); } +future<> +database::init_from_data_directory(sstring datadir) { + return populate(datadir).then([this] (database&& db) { + *this = db; + }); +} + column_definition::column_definition(bytes name, data_type type, column_id id, column_kind kind) : _name(std::move(name)) , type(std::move(type)) @@ -257,9 +264,9 @@ database::find_keyspace(const sstring& name) { } void -column_family::apply(mutation&& m) { - mutation_partition& p = find_or_create_partition(std::move(m.key)); - p.apply(_schema, std::move(m.p)); +column_family::apply(const mutation& m) { + mutation_partition& p = find_or_create_partition(m.key); + p.apply(_schema, m.p); } // Based on org.apache.cassandra.db.AbstractCell#reconcile() @@ -299,39 +306,39 @@ compare_for_merge(const column_definition& def, } void -mutation_partition::apply(schema_ptr schema, mutation_partition&& p) { +mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { _tombstone.apply(p._tombstone); for (auto&& entry : p._row_tombstones) { - apply_row_tombstone(schema, std::move(entry)); + apply_row_tombstone(schema, entry); } - auto merge_cells = [this, schema] (row& old_row, row&& new_row) { + auto merge_cells = [this, schema] (row& old_row, const row& new_row) { for (auto&& new_column : new_row) { auto col = new_column.first; auto i = old_row.find(col); if (i == old_row.end()) { - _static_row.emplace_hint(i, std::move(new_column)); + _static_row.emplace_hint(i, new_column); } else { auto& old_column = *i; auto& def = schema->regular_column_at(col); if (compare_for_merge(def, old_column, new_column) < 0) { - old_column.second = std::move(new_column.second); + old_column.second = new_column.second; } } } }; - merge_cells(_static_row, std::move(p._static_row)); + merge_cells(_static_row, p._static_row); for (auto&& entry : p._rows) { auto& key = entry.first; auto i = _rows.find(key); if (i == _rows.end()) { - _rows.emplace_hint(i, std::move(entry)); + _rows.emplace_hint(i, entry); } else { i->second.t.apply(entry.second.t); - merge_cells(i->second.cells, std::move(entry.second.cells)); + merge_cells(i->second.cells, entry.second.cells); } } } @@ -388,3 +395,11 @@ bool column_definition::is_compact_value() const { unimplemented::compact_tables(); return false; } + +std::ostream& operator<<(std::ostream& os, const mutation& m) { + return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), m.key, m.p); +} + +std::ostream& operator<<(std::ostream& os, const mutation_partition& mp) { + return fprint(os, "{mutation_partition: ...}"); +} diff --git a/database.hh b/database.hh index 04395a8ccd..cbcd0279cf 100644 --- a/database.hh +++ b/database.hh @@ -162,13 +162,14 @@ public: apply_row_tombstone(schema, {std::move(prefix), std::move(t)}); } void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone); - void apply(schema_ptr schema, mutation_partition&& p); + void apply(schema_ptr schema, const mutation_partition& p); const row_tombstone_set& row_tombstones() const { return _row_tombstones; } row& static_row() { return _static_row; } row& clustered_row(const clustering_key& key) { return _rows[key].cells; } row& clustered_row(clustering_key&& key) { return _rows[std::move(key)].cells; } row* find_row(const clustering_key& key); tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key); + friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); }; class mutation final { @@ -184,7 +185,7 @@ public: { } mutation(mutation&&) = default; - mutation(const mutation&) = delete; + mutation(const mutation&) = default; void set_static_cell(const column_definition& def, boost::any value) { p.static_row()[def.id] = std::move(value); @@ -199,6 +200,7 @@ public: auto& row = p.clustered_row(key); row[def.id] = std::move(value); } + friend std::ostream& operator<<(std::ostream& os, const mutation& m); }; struct column_family { @@ -210,7 +212,7 @@ struct column_family { schema_ptr _schema; // partition key -> partition std::map partitions; - void apply(mutation&& m); + void apply(const mutation& m); }; class keyspace { @@ -221,12 +223,20 @@ public: column_family* find_column_family(const sstring& cf_name); }; +// Policy for distributed: +// broadcast writes +// local reads + class database { public: std::unordered_map keyspaces; + future<> init_from_data_directory(sstring datadir); static future populate(sstring datadir); keyspace* find_keyspace(const sstring& name); + future<> stop() { return make_ready_future<>(); } + void assign(database&& db) { + *this = std::move(db); + } }; - #endif /* DATABASE_HH_ */ diff --git a/main.cc b/main.cc index 1dbef7b0c4..e86e40a16c 100644 --- a/main.cc +++ b/main.cc @@ -19,6 +19,7 @@ int main(int ac, char** av) { ("datadir", bpo::value()->default_value("/var/lib/cassandra/data"), "data directory"); auto server = std::make_unique>();; + distributed db; return app.run(ac, av, [&] { auto&& config = app.configuration(); @@ -26,16 +27,17 @@ int main(int ac, char** av) { uint16_t cql_port = config["cql-port"].as(); sstring datadir = config["datadir"].as(); - return database::populate(datadir).then([cql_port, thrift_port] (database db) { - auto pdb = new database(std::move(db)); + return db.start().then([datadir, &db] { + return db.invoke_on_all(&database::init_from_data_directory, datadir); + }).then([&db, cql_port, thrift_port] { auto cserver = new distributed; - cserver->start(std::ref(*pdb)).then([server = std::move(cserver), cql_port] () mutable { + cserver->start(std::ref(db)).then([server = std::move(cserver), cql_port] () mutable { server->invoke_on_all(&cql_server::listen, ipv4_addr{cql_port}); }).then([cql_port] { std::cout << "CQL server listening on port " << cql_port << " ...\n"; }); auto tserver = new distributed; - tserver->start(std::ref(*pdb)).then([server = std::move(tserver), thrift_port] () mutable { + tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port] () mutable { server->invoke_on_all(&thrift_server::listen, ipv4_addr{thrift_port}); }).then([thrift_port] { std::cout << "Thrift server listening on port " << thrift_port << " ...\n"; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 55c442e915..a079f12931 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -477,18 +477,20 @@ namespace service { future<> storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { // FIXME: send it to replicas instead of applying locally - for (auto&& m : mutations) { - // FIXME: lookup column_family by UUID - keyspace* ks = _db.find_keyspace(m.schema->ks_name); - assert(ks); // FIXME: load keyspace meta-data from storage - column_family* cf = ks->find_column_family(m.schema->cf_name); - if (cf) { - cf->apply(std::move(m)); - } else { - // TODO: log a warning + auto pmut = make_lw_shared(std::move(mutations)); + return _db.invoke_on_all([pmut, cl] (database& db) { + for (auto&& m : *pmut) { + // FIXME: lookup column_family by UUID + keyspace* ks = db.find_keyspace(m.schema->ks_name); + assert(ks); // FIXME: load keyspace meta-data from storage + column_family* cf = ks->find_column_family(m.schema->cf_name); + if (cf) { + cf->apply(m); + } else { + // TODO: log a warning + } } - } - return make_ready_future<>(); + }); #if 0 Tracing.trace("Determining replicas for mutation"); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 5b9f2e3eb2..3008c17a92 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -25,15 +25,16 @@ #pragma once #include "database.hh" +#include "core/distributed.hh" #include "db/consistency_level.hh" namespace service { class storage_proxy /*implements StorageProxyMBean*/ { private: - database& _db; + distributed& _db; public: - storage_proxy(database& db) : _db(db) {} + storage_proxy(distributed& db) : _db(db) {} /** * Use this method to have these Mutations applied diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index cdb7f8ff8e..97e134043b 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -2,11 +2,10 @@ * Copyright 2015 Cloudius Systems */ -#include #include "cql3/query_processor.hh" #include "cql3/query_options.hh" -#include "tests/test-utils.hh" -#include "core/future-util.hh" +#include "core/distributed.hh" +#include "core/app-template.hh" struct conversation_state { service::storage_proxy proxy; @@ -15,7 +14,7 @@ struct conversation_state { service::query_state query_state; cql3::query_options& options; - conversation_state(database& db, const sstring& ks_name) + conversation_state(distributed& db, const sstring& ks_name) : proxy(db) , qp(proxy, db) , client_state(service::client_state::for_internal_calls()) @@ -33,46 +32,48 @@ struct conversation_state { static const sstring ks_name = "ks"; static const sstring table_name = "cf"; -static void require_column_has_value(database& db, const sstring& ks_name, const sstring& table_name, +static void require_column_has_value(distributed& ddb, const sstring& ks_name, const sstring& table_name, std::vector pk, std::vector ck, const sstring& column_name, boost::any expected) { + auto& db = ddb.local(); auto ks = db.find_keyspace(ks_name); - BOOST_REQUIRE(ks != nullptr); + assert(ks != nullptr); auto cf = ks->find_column_family(table_name); - BOOST_REQUIRE(cf != nullptr); + assert(cf != nullptr); auto schema = cf->_schema; auto p = cf->find_partition(schema->partition_key_type->serialize_value_deep(pk)); - BOOST_REQUIRE(p != nullptr); + assert(p != nullptr); auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck)); - BOOST_REQUIRE(row != nullptr); + assert(row != nullptr); auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); - BOOST_REQUIRE(col_def != nullptr); + assert(col_def != nullptr); auto i = row->find(col_def->id); if (i == row->end()) { - BOOST_FAIL("column not set"); + assert(((void)"column not set", 0)); } auto& cell = boost::any_cast(i->second); - BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected))); + assert(cell.is_live()); + assert(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected))); } -SEASTAR_TEST_CASE(test_insert_statement) { - auto db = make_shared(); - - // CQL: create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1)); - keyspace ks; - auto cf_schema = make_lw_shared(ks_name, table_name, - std::vector({{"p1", utf8_type}}), - std::vector({{"c1", int32_type}}), - std::vector({{"r1", int32_type}}), - utf8_type - ); - ks.column_families.emplace(table_name, column_family(cf_schema)); - db->keyspaces.emplace(ks_name, std::move(ks)); - +future<> test_insert_statement() { + auto db = make_shared>(); auto state = make_shared(*db, ks_name); - return now().then([state, db] { + // CQL: create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1)); + return db->start().then([db] { + return db->invoke_on_all([] (database& db) { + keyspace ks; + auto cf_schema = make_lw_shared(ks_name, table_name, + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + ks.column_families.emplace(table_name, column_family(cf_schema)); + db.keyspaces.emplace(ks_name, std::move(ks)); + }); + }).then([state, db] { return state->execute_cql("insert into cf (p1, c1, r1) values ('key1', 1, 100);"); }).then([state, db] { require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 100); @@ -80,5 +81,14 @@ SEASTAR_TEST_CASE(test_insert_statement) { return state->execute_cql("update cf set r1 = 66 where p1 = 'key1' and c1 = 1;"); }).then([state, db] { require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 66); - }); + }).then([db] { + return db->stop(); + }).then([db] { + // keep db alive until stop(), above + engine().exit(0); + }); +} + +int main(int ac, char** av) { + return app_template().run(ac, av, test_insert_statement); } diff --git a/thrift/handler.cc b/thrift/handler.cc index 2d650c54c5..fa9ce8ae2e 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -2,7 +2,13 @@ * Copyright 2014 Cloudius Systems */ +// Some thrift headers include other files from within namespaces, +// which is totally broken. Include those files here to avoid +// breakage: +#include +// end thrift workaround #include "Cassandra.h" +#include "core/distributed.hh" #include "database.hh" #include "core/sstring.hh" #include "core/print.hh" @@ -38,12 +44,34 @@ void complete_with_exception(tcxx::function(fmt, std::forward(args)...))); } +class delayed_exception_wrapper : public ::apache::thrift::TDelayedException { + std::exception_ptr _ex; +public: + delayed_exception_wrapper(std::exception_ptr ex) : _ex(std::move(ex)) {} + virtual void throw_it() override { + std::rethrow_exception(std::move(_ex)); + } +}; + +template +void complete(future& fut, + const tcxx::function& cob, + const tcxx::function& exn_cob) { + try { + apply(std::move(cob), fut.get()); + } catch (...) { + delayed_exception_wrapper dew(std::current_exception()); + exn_cob(&dew); + } +} + class CassandraAsyncHandler : public CassandraCobSvIf { - database& _db; + distributed& _db; keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection? + sstring _ks_name; sstring _cql_version; public: - explicit CassandraAsyncHandler(database& db) : _db(db) {} + explicit CassandraAsyncHandler(distributed& db) : _db(db) {} void login(tcxx::function cob, tcxx::function exn_cob, const AuthenticationRequest& auth_request) { // FIXME: implement return unimplemented(exn_cob); @@ -51,7 +79,8 @@ public: void set_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { try { - _ks = &_db.keyspaces.at(keyspace); + _ks = &_db.local().keyspaces.at(keyspace); + _ks_name = keyspace; cob(); } catch (std::out_of_range& e) { return complete_with_exception(std::move(exn_cob), @@ -183,61 +212,68 @@ public: return complete_with_exception(std::move(exn_cob), "keyspace not set"); } static bytes null_clustering_key = to_bytes(""); - try { - for (auto&& key_cf : mutation_map) { - bytes key = to_bytes(key_cf.first); - const std::map>& cf_mutations_map = key_cf.second; - for (auto&& cf_mutations : cf_mutations_map) { - sstring cf_name = cf_mutations.first; - const std::vector& mutations = cf_mutations.second; - auto& cf = lookup_column_family(cf_name); - mutation m_to_apply(key, cf._schema); - for (const Mutation& m : mutations) { - if (m.__isset.column_or_supercolumn) { - auto&& cosc = m.column_or_supercolumn; - if (cosc.__isset.column) { - auto&& col = cosc.column; - bytes cname = to_bytes(col.name); - auto def = cf._schema->get_column_definition(cname); - if (!def) { - throw make_exception("column %s not found", col.name); - } - if (def->kind != column_definition::column_kind::REGULAR) { - throw make_exception("Column %s is not settable", col.name); - } - gc_clock::duration ttl; - if (col.__isset.ttl) { - ttl = std::chrono::duration_cast(std::chrono::seconds(col.ttl)); - } - if (ttl.count() <= 0) { - ttl = cf._schema->default_time_to_live; - } - auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); - m_to_apply.set_clustered_cell(null_clustering_key, *def, - atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}}); - } else if (cosc.__isset.super_column) { - // FIXME: implement - } else if (cosc.__isset.counter_column) { - // FIXME: implement - } else if (cosc.__isset.counter_super_column) { - // FIXME: implement - } else { - throw make_exception("Empty ColumnOrSuperColumn"); + // Would like to use move_iterator below, but Mutation is filled with some const stuff. + parallel_for_each(mutation_map.begin(), mutation_map.end(), + [this] (std::pair>> key_cf) { + bytes key = to_bytes(key_cf.first); + std::map>& cf_mutations_map = key_cf.second; + return parallel_for_each( + boost::make_move_iterator(cf_mutations_map.begin()), + boost::make_move_iterator(cf_mutations_map.end()), + [this, key] (std::pair> cf_mutations) { + sstring cf_name = cf_mutations.first; + const std::vector& mutations = cf_mutations.second; + auto& cf = lookup_column_family(cf_name); + mutation m_to_apply(key, cf._schema); + for (const Mutation& m : mutations) { + if (m.__isset.column_or_supercolumn) { + auto&& cosc = m.column_or_supercolumn; + if (cosc.__isset.column) { + auto&& col = cosc.column; + bytes cname = to_bytes(col.name); + auto def = cf._schema->get_column_definition(cname); + if (!def) { + throw make_exception("column %s not found", col.name); } - } else if (m.__isset.deletion) { + if (def->kind != column_definition::column_kind::REGULAR) { + throw make_exception("Column %s is not settable", col.name); + } + gc_clock::duration ttl; + if (col.__isset.ttl) { + ttl = std::chrono::duration_cast(std::chrono::seconds(col.ttl)); + } + if (ttl.count() <= 0) { + ttl = cf._schema->default_time_to_live; + } + auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); + m_to_apply.set_clustered_cell(null_clustering_key, *def, + atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}}); + } else if (cosc.__isset.super_column) { + // FIXME: implement + } else if (cosc.__isset.counter_column) { + // FIXME: implement + } else if (cosc.__isset.counter_super_column) { // FIXME: implement - abort(); } else { - throw make_exception("Mutation must have either column or deletion"); + throw make_exception("Empty ColumnOrSuperColumn"); } + } else if (m.__isset.deletion) { + // FIXME: implement + abort(); + } else { + throw make_exception("Mutation must have either column or deletion"); } - cf.apply(std::move(m_to_apply)); } - } - } catch (std::exception& ex) { - return exn_cob(TDelayedException::delayException(ex)); - } - cob(); + return _db.invoke_on_all([this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { + auto& ks = db.keyspaces.at(_ks_name); + auto& cf = ks.column_families.at(cf_name); + cf.apply(std::move(m_to_apply)); + }); + }); + }).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) { + complete(ret, cob, exn_cob); + return make_ready_future<>(); + }); } void atomic_batch_mutate(tcxx::function cob, tcxx::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { @@ -348,31 +384,37 @@ public: void system_add_keyspace(tcxx::function cob, tcxx::function exn_cob, const KsDef& ks_def) { std::string schema_id = "schema-id"; // FIXME: make meaningful - if (_db.keyspaces.count(ks_def.name)) { + if (_db.local().keyspaces.count(ks_def.name)) { InvalidRequestException ire; ire.why = sprint("Keyspace %s already exists", ks_def.name); exn_cob(TDelayedException::delayException(ire)); } - keyspace& ks = _db.keyspaces[ks_def.name]; - for (const CfDef& cf_def : ks_def.cf_defs) { - std::vector partition_key; - std::vector clustering_key; - std::vector regular_columns; - // FIXME: get this from comparator - auto column_name_type = utf8_type; - // FIXME: look at key_alias and key_validator first - partition_key.push_back({"key", bytes_type}); - // FIXME: guess clustering keys - for (const ColumnDef& col_def : cf_def.column_metadata) { - // FIXME: look at all fields, not just name - regular_columns.push_back({to_bytes(col_def.name), bytes_type}); + _db.invoke_on_all([this, ks_def = std::move(ks_def)] (database& db) { + keyspace& ks = db.keyspaces[ks_def.name]; + for (const CfDef& cf_def : ks_def.cf_defs) { + std::vector partition_key; + std::vector clustering_key; + std::vector regular_columns; + // FIXME: get this from comparator + auto column_name_type = utf8_type; + // FIXME: look at key_alias and key_validator first + partition_key.push_back({"key", bytes_type}); + // FIXME: guess clustering keys + for (const ColumnDef& col_def : cf_def.column_metadata) { + // FIXME: look at all fields, not just name + regular_columns.push_back({to_bytes(col_def.name), bytes_type}); + } + auto s = make_lw_shared(ks_def.name, cf_def.name, + std::move(partition_key), std::move(clustering_key), std::move(regular_columns), column_name_type); + column_family cf(s); + ks.column_families.emplace(cf_def.name, std::move(cf)); } - auto s = make_lw_shared(ks_def.name, cf_def.name, - std::move(partition_key), std::move(clustering_key), std::move(regular_columns), column_name_type); - column_family cf(s); - ks.column_families.emplace(cf_def.name, std::move(cf)); - } - cob(schema_id); + }).then([schema_id = std::move(schema_id)] { + return make_ready_future(std::move(schema_id)); + }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future result) { + complete(result, cob, exn_cob); + return make_ready_future<>(); + }); } void system_drop_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { @@ -449,9 +491,9 @@ private: }; class handler_factory : public CassandraCobSvIfFactory { - database& _db; + distributed& _db; public: - explicit handler_factory(database& db) : _db(db) {} + explicit handler_factory(distributed& db) : _db(db) {} typedef CassandraCobSvIf Handler; virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { return new CassandraAsyncHandler(_db); @@ -462,6 +504,6 @@ public: }; std::unique_ptr -create_handler_factory(database& db) { +create_handler_factory(distributed& db) { return std::make_unique(db); } diff --git a/thrift/handler.hh b/thrift/handler.hh index d17df1364e..bb9971cffd 100644 --- a/thrift/handler.hh +++ b/thrift/handler.hh @@ -7,8 +7,9 @@ #include "Cassandra.h" #include "database.hh" +#include "core/distributed.hh" #include -std::unique_ptr create_handler_factory(database& db); +std::unique_ptr create_handler_factory(distributed& db); #endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */ diff --git a/thrift/server.cc b/thrift/server.cc index ed8651e35b..1a6f5f4dbd 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -36,7 +36,7 @@ public: thrift_stats(thrift_server& server); }; -thrift_server::thrift_server(database& db) +thrift_server::thrift_server(distributed& db) : _stats(new thrift_stats(*this)) , _handler_factory(create_handler_factory(db).release()) , _protocol_factory(new TBinaryProtocolFactoryT()) diff --git a/thrift/server.hh b/thrift/server.hh index 583a1ce5d8..c5632a452e 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -6,6 +6,7 @@ #define APPS_SEASTAR_THRIFT_SERVER_HH_ #include "core/reactor.hh" +#include "core/distributed.hh" #include #include @@ -42,7 +43,7 @@ class thrift_server { uint64_t _current_connections = 0; uint64_t _requests_served = 0; public: - thrift_server(database& db); + thrift_server(distributed& db); future<> listen(ipv4_addr addr); void do_accepts(int which); class connection; diff --git a/transport/server.cc b/transport/server.cc index 604887b588..004b323164 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -230,7 +230,7 @@ private: sstring make_frame(uint8_t version, size_t length); }; -cql_server::cql_server(database& db) +cql_server::cql_server(distributed& db) : _proxy(db) , _query_processor(_proxy, db) { diff --git a/transport/server.hh b/transport/server.hh index 38a5bde512..d820f8db27 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -8,6 +8,7 @@ #include "core/reactor.hh" #include "service/storage_proxy.hh" #include "cql3/query_processor.hh" +#include "core/distributed.hh" class database; @@ -16,7 +17,7 @@ class cql_server { service::storage_proxy _proxy; cql3::query_processor _query_processor; public: - cql_server(database& db); + cql_server(distributed& db); future<> listen(ipv4_addr addr); void do_accepts(int which); private: diff --git a/types.cc b/types.cc index 6fc68eeb97..3b55bb038d 100644 --- a/types.cc +++ b/types.cc @@ -5,6 +5,7 @@ #include #include "cql3/cql3_type.hh" #include "types.hh" +#include "core/print.hh" template struct simple_type_traits { @@ -396,6 +397,10 @@ struct uuid_type_impl : abstract_type { } }; +std::ostream& operator<<(std::ostream& os, const bytes& b) { + return os << to_hex(b); +} + thread_local shared_ptr int32_type(make_shared()); thread_local shared_ptr long_type(make_shared()); thread_local shared_ptr ascii_type(make_shared("ascii", cql3::native_cql3_type::ascii)); diff --git a/types.hh b/types.hh index 3606fbc768..ab0186e55e 100644 --- a/types.hh +++ b/types.hh @@ -31,6 +31,8 @@ using sstring_view = std::experimental::string_view; sstring to_hex(const bytes& b); sstring to_hex(const bytes_opt& b); +std::ostream& operator<<(std::ostream& os, const bytes& b); + using object_opt = std::experimental::optional; class marshal_exception : public std::exception {