From 96a93a2d8cc1dac7569cdcb40695b71d5484e48c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 18 Feb 2015 13:12:49 +0200 Subject: [PATCH 1/8] thrift: add workaround for compile breakage due to thrift code generator --- thrift/handler.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/thrift/handler.cc b/thrift/handler.cc index 2d650c54c5..8e095ac175 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -2,6 +2,11 @@ * Copyright 2014 Cloudius Systems */ +// Some thrift headers include other files from within namespaces, +// which is totally broken. Include those files here to avoid +// breakage: +#include +// end thrift workaround #include "Cassandra.h" #include "database.hh" #include "core/sstring.hh" From 93818692e1d1c83f8174d2ad068a8ffe31da8c8c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 18 Feb 2015 13:14:01 +0200 Subject: [PATCH 2/8] thrift: add adapter from futures to thrift completion objects Futures hold either a value or an exception; thrift uses two separate function objects to signal completion, one for success, the other for an exception. Add a helper to pass the result of a future to either of these. --- thrift/handler.cc | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/thrift/handler.cc b/thrift/handler.cc index 8e095ac175..a26f668f2a 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -43,6 +43,27 @@ void complete_with_exception(tcxx::function(fmt, std::forward(args)...))); } +class delayed_exception_wrapper : public ::apache::thrift::TDelayedException { + std::exception_ptr _ex; +public: + delayed_exception_wrapper(std::exception_ptr ex) : _ex(std::move(ex)) {} + virtual void throw_it() override { + std::rethrow_exception(std::move(_ex)); + } +}; + +template +void complete(future& fut, + const tcxx::function& cob, + const tcxx::function& exn_cob) { + try { + apply(std::move(cob), fut.get()); + } catch (...) { + delayed_exception_wrapper dew(std::current_exception()); + exn_cob(&dew); + } +} + class CassandraAsyncHandler : public CassandraCobSvIf { database& _db; keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection? From 3ec83658f3e8c9c039ae47fe94565a6257ea1e91 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 19 Feb 2015 15:36:10 +0200 Subject: [PATCH 3/8] thrift: store the keyspace name in set_keyspace() The keyspace pointer is only valid for the local shard. --- thrift/handler.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/thrift/handler.cc b/thrift/handler.cc index a26f668f2a..4d7724feac 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -67,6 +67,7 @@ void complete(future& fut, class CassandraAsyncHandler : public CassandraCobSvIf { database& _db; keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection? + sstring _ks_name; sstring _cql_version; public: explicit CassandraAsyncHandler(database& db) : _db(db) {} @@ -78,6 +79,7 @@ public: void set_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { try { _ks = &_db.keyspaces.at(keyspace); + _ks_name = keyspace; cob(); } catch (std::out_of_range& e) { return complete_with_exception(std::move(exn_cob), From a2519926a618e2919f718ee89345b68f1673dddf Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 19 Feb 2015 15:26:10 +0200 Subject: [PATCH 4/8] db: add some iostream output operators Helps debugging --- database.cc | 8 ++++++++ database.hh | 2 ++ types.cc | 5 +++++ types.hh | 2 ++ 4 files changed, 17 insertions(+) diff --git a/database.cc b/database.cc index 881f608c3b..a62d8ed10a 100644 --- a/database.cc +++ b/database.cc @@ -388,3 +388,11 @@ bool column_definition::is_compact_value() const { unimplemented::compact_tables(); return false; } + +std::ostream& operator<<(std::ostream& os, const mutation& m) { + return fprint(os, "{mutation: schema %p key %s data %s}", m.schema.get(), m.key, m.p); +} + +std::ostream& operator<<(std::ostream& os, const mutation_partition& mp) { + return fprint(os, "{mutation_partition: ...}"); +} diff --git a/database.hh b/database.hh index 04395a8ccd..e613bbb9ff 100644 --- a/database.hh +++ b/database.hh @@ -169,6 +169,7 @@ public: row& clustered_row(clustering_key&& key) { return _rows[std::move(key)].cells; } row* find_row(const clustering_key& key); tombstone tombstone_for_row(schema_ptr schema, const clustering_key& key); + friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); }; class mutation final { @@ -199,6 +200,7 @@ public: auto& row = p.clustered_row(key); row[def.id] = std::move(value); } + friend std::ostream& operator<<(std::ostream& os, const mutation& m); }; struct column_family { diff --git a/types.cc b/types.cc index 6fc68eeb97..3b55bb038d 100644 --- a/types.cc +++ b/types.cc @@ -5,6 +5,7 @@ #include #include "cql3/cql3_type.hh" #include "types.hh" +#include "core/print.hh" template struct simple_type_traits { @@ -396,6 +397,10 @@ struct uuid_type_impl : abstract_type { } }; +std::ostream& operator<<(std::ostream& os, const bytes& b) { + return os << to_hex(b); +} + thread_local shared_ptr int32_type(make_shared()); thread_local shared_ptr long_type(make_shared()); thread_local shared_ptr ascii_type(make_shared("ascii", cql3::native_cql3_type::ascii)); diff --git a/types.hh b/types.hh index 3606fbc768..ab0186e55e 100644 --- a/types.hh +++ b/types.hh @@ -31,6 +31,8 @@ using sstring_view = std::experimental::string_view; sstring to_hex(const bytes& b); sstring to_hex(const bytes_opt& b); +std::ostream& operator<<(std::ostream& os, const bytes& b); + using object_opt = std::experimental::optional; class marshal_exception : public std::exception { From 8f9f794a730f870abeecef5930c0586d1ca6de41 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 19 Feb 2015 15:28:30 +0200 Subject: [PATCH 5/8] db: make column_family::apply(mutation) not steal the contents With replication, we want the contents of the mutation to be available to multiple replicas. (In this context, we will replicate the mutation to all shards in the same node, as a temporary step in sharding a node; but the issue also occurs when replicating to other nodes). --- database.cc | 22 +++++++++++----------- database.hh | 6 +++--- service/storage_proxy.cc | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/database.cc b/database.cc index a62d8ed10a..4dfd610e98 100644 --- a/database.cc +++ b/database.cc @@ -257,9 +257,9 @@ database::find_keyspace(const sstring& name) { } void -column_family::apply(mutation&& m) { - mutation_partition& p = find_or_create_partition(std::move(m.key)); - p.apply(_schema, std::move(m.p)); +column_family::apply(const mutation& m) { + mutation_partition& p = find_or_create_partition(m.key); + p.apply(_schema, m.p); } // Based on org.apache.cassandra.db.AbstractCell#reconcile() @@ -299,39 +299,39 @@ compare_for_merge(const column_definition& def, } void -mutation_partition::apply(schema_ptr schema, mutation_partition&& p) { +mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { _tombstone.apply(p._tombstone); for (auto&& entry : p._row_tombstones) { - apply_row_tombstone(schema, std::move(entry)); + apply_row_tombstone(schema, entry); } - auto merge_cells = [this, schema] (row& old_row, row&& new_row) { + auto merge_cells = [this, schema] (row& old_row, const row& new_row) { for (auto&& new_column : new_row) { auto col = new_column.first; auto i = old_row.find(col); if (i == old_row.end()) { - _static_row.emplace_hint(i, std::move(new_column)); + _static_row.emplace_hint(i, new_column); } else { auto& old_column = *i; auto& def = schema->regular_column_at(col); if (compare_for_merge(def, old_column, new_column) < 0) { - old_column.second = std::move(new_column.second); + old_column.second = new_column.second; } } } }; - merge_cells(_static_row, std::move(p._static_row)); + merge_cells(_static_row, p._static_row); for (auto&& entry : p._rows) { auto& key = entry.first; auto i = _rows.find(key); if (i == _rows.end()) { - _rows.emplace_hint(i, std::move(entry)); + _rows.emplace_hint(i, entry); } else { i->second.t.apply(entry.second.t); - merge_cells(i->second.cells, std::move(entry.second.cells)); + merge_cells(i->second.cells, entry.second.cells); } } } diff --git a/database.hh b/database.hh index e613bbb9ff..fb874372d6 100644 --- a/database.hh +++ b/database.hh @@ -162,7 +162,7 @@ public: apply_row_tombstone(schema, {std::move(prefix), std::move(t)}); } void apply_row_tombstone(schema_ptr schema, std::pair row_tombstone); - void apply(schema_ptr schema, mutation_partition&& p); + void apply(schema_ptr schema, const mutation_partition& p); const row_tombstone_set& row_tombstones() const { return _row_tombstones; } row& static_row() { return _static_row; } row& clustered_row(const clustering_key& key) { return _rows[key].cells; } @@ -185,7 +185,7 @@ public: { } mutation(mutation&&) = default; - mutation(const mutation&) = delete; + mutation(const mutation&) = default; void set_static_cell(const column_definition& def, boost::any value) { p.static_row()[def.id] = std::move(value); @@ -212,7 +212,7 @@ struct column_family { schema_ptr _schema; // partition key -> partition std::map partitions; - void apply(mutation&& m); + void apply(const mutation& m); }; class keyspace { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 55c442e915..78069b1aed 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -483,7 +483,7 @@ storage_proxy::mutate(std::vector mutations, db::consistency_level cl) assert(ks); // FIXME: load keyspace meta-data from storage column_family* cf = ks->find_column_family(m.schema->cf_name); if (cf) { - cf->apply(std::move(m)); + cf->apply(m); } else { // TODO: log a warning } From e8096ff2bb2f86225df8f13386d38aee80e94332 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 19 Feb 2015 15:34:05 +0200 Subject: [PATCH 6/8] db: add database::stop() Required by distributed<>'s contract. --- database.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database.hh b/database.hh index fb874372d6..f4b1381f41 100644 --- a/database.hh +++ b/database.hh @@ -228,7 +228,7 @@ public: std::unordered_map keyspaces; static future populate(sstring datadir); keyspace* find_keyspace(const sstring& name); + future<> stop() { return make_ready_future<>(); } }; - #endif /* DATABASE_HH_ */ From 70381a6da5895799e145f1b7b1cf52d6845d2f59 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 18 Feb 2015 13:16:14 +0200 Subject: [PATCH 7/8] db: distribute database object s/database/distributed/ everywhere. Use simple distribution rules: writes are broadcast, reads are local. This causes tremendous data duplication, but will change soon. --- cql3/query_processor.cc | 2 +- cql3/query_processor.hh | 5 +- database.cc | 7 +++ database.hh | 8 ++++ main.cc | 10 ++-- service/storage_proxy.cc | 24 +++++----- service/storage_proxy.hh | 5 +- tests/urchin/cql_query_test.cc | 68 +++++++++++++++------------ thrift/handler.cc | 86 ++++++++++++++++++++-------------- thrift/handler.hh | 3 +- thrift/server.cc | 2 +- thrift/server.hh | 3 +- transport/server.cc | 2 +- transport/server.hh | 3 +- 14 files changed, 140 insertions(+), 88 deletions(-) diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index f2592ced6e..e00bf5a2d9 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -88,7 +88,7 @@ query_processor::get_statement(const sstring_view& query, service::client_state& #if 0 Tracing.trace("Preparing statement"); #endif - return statement->prepare(_db); + return statement->prepare(_db.local()); } ::shared_ptr diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 7826cb676e..6e6084f8b0 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -32,15 +32,16 @@ #include "cql3/statements/cf_statement.hh" #include "service/query_state.hh" #include "log.hh" +#include "core/distributed.hh" namespace cql3 { class query_processor { private: service::storage_proxy& _proxy; - database& _db; + distributed& _db; public: - query_processor(service::storage_proxy& proxy, database& db) : _proxy(proxy), _db(db) {} + query_processor(service::storage_proxy& proxy, distributed& db) : _proxy(proxy), _db(db) {} #if 0 public static final SemanticVersion CQL_VERSION = new SemanticVersion("3.2.0"); diff --git a/database.cc b/database.cc index 4dfd610e98..8844d6f90d 100644 --- a/database.cc +++ b/database.cc @@ -204,6 +204,13 @@ future database::populate(sstring datadir) { }); } +future<> +database::init_from_data_directory(sstring datadir) { + return populate(datadir).then([this] (database&& db) { + *this = db; + }); +} + column_definition::column_definition(bytes name, data_type type, column_id id, column_kind kind) : _name(std::move(name)) , type(std::move(type)) diff --git a/database.hh b/database.hh index f4b1381f41..cbcd0279cf 100644 --- a/database.hh +++ b/database.hh @@ -223,12 +223,20 @@ public: column_family* find_column_family(const sstring& cf_name); }; +// Policy for distributed: +// broadcast writes +// local reads + class database { public: std::unordered_map keyspaces; + future<> init_from_data_directory(sstring datadir); static future populate(sstring datadir); keyspace* find_keyspace(const sstring& name); future<> stop() { return make_ready_future<>(); } + void assign(database&& db) { + *this = std::move(db); + } }; #endif /* DATABASE_HH_ */ diff --git a/main.cc b/main.cc index 1dbef7b0c4..e86e40a16c 100644 --- a/main.cc +++ b/main.cc @@ -19,6 +19,7 @@ int main(int ac, char** av) { ("datadir", bpo::value()->default_value("/var/lib/cassandra/data"), "data directory"); auto server = std::make_unique>();; + distributed db; return app.run(ac, av, [&] { auto&& config = app.configuration(); @@ -26,16 +27,17 @@ int main(int ac, char** av) { uint16_t cql_port = config["cql-port"].as(); sstring datadir = config["datadir"].as(); - return database::populate(datadir).then([cql_port, thrift_port] (database db) { - auto pdb = new database(std::move(db)); + return db.start().then([datadir, &db] { + return db.invoke_on_all(&database::init_from_data_directory, datadir); + }).then([&db, cql_port, thrift_port] { auto cserver = new distributed; - cserver->start(std::ref(*pdb)).then([server = std::move(cserver), cql_port] () mutable { + cserver->start(std::ref(db)).then([server = std::move(cserver), cql_port] () mutable { server->invoke_on_all(&cql_server::listen, ipv4_addr{cql_port}); }).then([cql_port] { std::cout << "CQL server listening on port " << cql_port << " ...\n"; }); auto tserver = new distributed; - tserver->start(std::ref(*pdb)).then([server = std::move(tserver), thrift_port] () mutable { + tserver->start(std::ref(db)).then([server = std::move(tserver), thrift_port] () mutable { server->invoke_on_all(&thrift_server::listen, ipv4_addr{thrift_port}); }).then([thrift_port] { std::cout << "Thrift server listening on port " << thrift_port << " ...\n"; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 78069b1aed..a079f12931 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -477,18 +477,20 @@ namespace service { future<> storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { // FIXME: send it to replicas instead of applying locally - for (auto&& m : mutations) { - // FIXME: lookup column_family by UUID - keyspace* ks = _db.find_keyspace(m.schema->ks_name); - assert(ks); // FIXME: load keyspace meta-data from storage - column_family* cf = ks->find_column_family(m.schema->cf_name); - if (cf) { - cf->apply(m); - } else { - // TODO: log a warning + auto pmut = make_lw_shared(std::move(mutations)); + return _db.invoke_on_all([pmut, cl] (database& db) { + for (auto&& m : *pmut) { + // FIXME: lookup column_family by UUID + keyspace* ks = db.find_keyspace(m.schema->ks_name); + assert(ks); // FIXME: load keyspace meta-data from storage + column_family* cf = ks->find_column_family(m.schema->cf_name); + if (cf) { + cf->apply(m); + } else { + // TODO: log a warning + } } - } - return make_ready_future<>(); + }); #if 0 Tracing.trace("Determining replicas for mutation"); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 5b9f2e3eb2..3008c17a92 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -25,15 +25,16 @@ #pragma once #include "database.hh" +#include "core/distributed.hh" #include "db/consistency_level.hh" namespace service { class storage_proxy /*implements StorageProxyMBean*/ { private: - database& _db; + distributed& _db; public: - storage_proxy(database& db) : _db(db) {} + storage_proxy(distributed& db) : _db(db) {} /** * Use this method to have these Mutations applied diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index cdb7f8ff8e..97e134043b 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -2,11 +2,10 @@ * Copyright 2015 Cloudius Systems */ -#include #include "cql3/query_processor.hh" #include "cql3/query_options.hh" -#include "tests/test-utils.hh" -#include "core/future-util.hh" +#include "core/distributed.hh" +#include "core/app-template.hh" struct conversation_state { service::storage_proxy proxy; @@ -15,7 +14,7 @@ struct conversation_state { service::query_state query_state; cql3::query_options& options; - conversation_state(database& db, const sstring& ks_name) + conversation_state(distributed& db, const sstring& ks_name) : proxy(db) , qp(proxy, db) , client_state(service::client_state::for_internal_calls()) @@ -33,46 +32,48 @@ struct conversation_state { static const sstring ks_name = "ks"; static const sstring table_name = "cf"; -static void require_column_has_value(database& db, const sstring& ks_name, const sstring& table_name, +static void require_column_has_value(distributed& ddb, const sstring& ks_name, const sstring& table_name, std::vector pk, std::vector ck, const sstring& column_name, boost::any expected) { + auto& db = ddb.local(); auto ks = db.find_keyspace(ks_name); - BOOST_REQUIRE(ks != nullptr); + assert(ks != nullptr); auto cf = ks->find_column_family(table_name); - BOOST_REQUIRE(cf != nullptr); + assert(cf != nullptr); auto schema = cf->_schema; auto p = cf->find_partition(schema->partition_key_type->serialize_value_deep(pk)); - BOOST_REQUIRE(p != nullptr); + assert(p != nullptr); auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck)); - BOOST_REQUIRE(row != nullptr); + assert(row != nullptr); auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); - BOOST_REQUIRE(col_def != nullptr); + assert(col_def != nullptr); auto i = row->find(col_def->id); if (i == row->end()) { - BOOST_FAIL("column not set"); + assert(((void)"column not set", 0)); } auto& cell = boost::any_cast(i->second); - BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected))); + assert(cell.is_live()); + assert(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected))); } -SEASTAR_TEST_CASE(test_insert_statement) { - auto db = make_shared(); - - // CQL: create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1)); - keyspace ks; - auto cf_schema = make_lw_shared(ks_name, table_name, - std::vector({{"p1", utf8_type}}), - std::vector({{"c1", int32_type}}), - std::vector({{"r1", int32_type}}), - utf8_type - ); - ks.column_families.emplace(table_name, column_family(cf_schema)); - db->keyspaces.emplace(ks_name, std::move(ks)); - +future<> test_insert_statement() { + auto db = make_shared>(); auto state = make_shared(*db, ks_name); - return now().then([state, db] { + // CQL: create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1)); + return db->start().then([db] { + return db->invoke_on_all([] (database& db) { + keyspace ks; + auto cf_schema = make_lw_shared(ks_name, table_name, + std::vector({{"p1", utf8_type}}), + std::vector({{"c1", int32_type}}), + std::vector({{"r1", int32_type}}), + utf8_type + ); + ks.column_families.emplace(table_name, column_family(cf_schema)); + db.keyspaces.emplace(ks_name, std::move(ks)); + }); + }).then([state, db] { return state->execute_cql("insert into cf (p1, c1, r1) values ('key1', 1, 100);"); }).then([state, db] { require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 100); @@ -80,5 +81,14 @@ SEASTAR_TEST_CASE(test_insert_statement) { return state->execute_cql("update cf set r1 = 66 where p1 = 'key1' and c1 = 1;"); }).then([state, db] { require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 66); - }); + }).then([db] { + return db->stop(); + }).then([db] { + // keep db alive until stop(), above + engine().exit(0); + }); +} + +int main(int ac, char** av) { + return app_template().run(ac, av, test_insert_statement); } diff --git a/thrift/handler.cc b/thrift/handler.cc index 4d7724feac..9aae8d71a2 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -8,6 +8,7 @@ #include // end thrift workaround #include "Cassandra.h" +#include "core/distributed.hh" #include "database.hh" #include "core/sstring.hh" #include "core/print.hh" @@ -65,12 +66,12 @@ void complete(future& fut, } class CassandraAsyncHandler : public CassandraCobSvIf { - database& _db; + distributed& _db; keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection? sstring _ks_name; sstring _cql_version; public: - explicit CassandraAsyncHandler(database& db) : _db(db) {} + explicit CassandraAsyncHandler(distributed& db) : _db(db) {} void login(tcxx::function cob, tcxx::function exn_cob, const AuthenticationRequest& auth_request) { // FIXME: implement return unimplemented(exn_cob); @@ -78,7 +79,7 @@ public: void set_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { try { - _ks = &_db.keyspaces.at(keyspace); + _ks = &_db.local().keyspaces.at(keyspace); _ks_name = keyspace; cob(); } catch (std::out_of_range& e) { @@ -212,10 +213,15 @@ public: } static bytes null_clustering_key = to_bytes(""); try { - for (auto&& key_cf : mutation_map) { + // Would like to use move_iterator below, but Mutation is filled with some const stuff. + parallel_for_each(mutation_map.begin(), mutation_map.end(), + [this] (std::pair>> key_cf) { bytes key = to_bytes(key_cf.first); - const std::map>& cf_mutations_map = key_cf.second; - for (auto&& cf_mutations : cf_mutations_map) { + std::map>& cf_mutations_map = key_cf.second; + return parallel_for_each( + boost::make_move_iterator(cf_mutations_map.begin()), + boost::make_move_iterator(cf_mutations_map.end()), + [this, key] (std::pair> cf_mutations) { sstring cf_name = cf_mutations.first; const std::vector& mutations = cf_mutations.second; auto& cf = lookup_column_family(cf_name); @@ -259,13 +265,19 @@ public: throw make_exception("Mutation must have either column or deletion"); } } - cf.apply(std::move(m_to_apply)); - } - } + return _db.invoke_on_all([this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { + auto& ks = db.keyspaces.at(_ks_name); + auto& cf = ks.column_families.at(cf_name); + cf.apply(std::move(m_to_apply)); + }); + }); + }).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) { + complete(ret, cob, exn_cob); + return make_ready_future<>(); + }); } catch (std::exception& ex) { - return exn_cob(TDelayedException::delayException(ex)); + abort(); } - cob(); } void atomic_batch_mutate(tcxx::function cob, tcxx::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { @@ -376,31 +388,37 @@ public: void system_add_keyspace(tcxx::function cob, tcxx::function exn_cob, const KsDef& ks_def) { std::string schema_id = "schema-id"; // FIXME: make meaningful - if (_db.keyspaces.count(ks_def.name)) { + if (_db.local().keyspaces.count(ks_def.name)) { InvalidRequestException ire; ire.why = sprint("Keyspace %s already exists", ks_def.name); exn_cob(TDelayedException::delayException(ire)); } - keyspace& ks = _db.keyspaces[ks_def.name]; - for (const CfDef& cf_def : ks_def.cf_defs) { - std::vector partition_key; - std::vector clustering_key; - std::vector regular_columns; - // FIXME: get this from comparator - auto column_name_type = utf8_type; - // FIXME: look at key_alias and key_validator first - partition_key.push_back({"key", bytes_type}); - // FIXME: guess clustering keys - for (const ColumnDef& col_def : cf_def.column_metadata) { - // FIXME: look at all fields, not just name - regular_columns.push_back({to_bytes(col_def.name), bytes_type}); + _db.invoke_on_all([this, ks_def = std::move(ks_def)] (database& db) { + keyspace& ks = db.keyspaces[ks_def.name]; + for (const CfDef& cf_def : ks_def.cf_defs) { + std::vector partition_key; + std::vector clustering_key; + std::vector regular_columns; + // FIXME: get this from comparator + auto column_name_type = utf8_type; + // FIXME: look at key_alias and key_validator first + partition_key.push_back({"key", bytes_type}); + // FIXME: guess clustering keys + for (const ColumnDef& col_def : cf_def.column_metadata) { + // FIXME: look at all fields, not just name + regular_columns.push_back({to_bytes(col_def.name), bytes_type}); + } + auto s = make_lw_shared(ks_def.name, cf_def.name, + std::move(partition_key), std::move(clustering_key), std::move(regular_columns), column_name_type); + column_family cf(s); + ks.column_families.emplace(cf_def.name, std::move(cf)); } - auto s = make_lw_shared(ks_def.name, cf_def.name, - std::move(partition_key), std::move(clustering_key), std::move(regular_columns), column_name_type); - column_family cf(s); - ks.column_families.emplace(cf_def.name, std::move(cf)); - } - cob(schema_id); + }).then([schema_id = std::move(schema_id)] { + return make_ready_future(std::move(schema_id)); + }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future result) { + complete(result, cob, exn_cob); + return make_ready_future<>(); + }); } void system_drop_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { @@ -477,9 +495,9 @@ private: }; class handler_factory : public CassandraCobSvIfFactory { - database& _db; + distributed& _db; public: - explicit handler_factory(database& db) : _db(db) {} + explicit handler_factory(distributed& db) : _db(db) {} typedef CassandraCobSvIf Handler; virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { return new CassandraAsyncHandler(_db); @@ -490,6 +508,6 @@ public: }; std::unique_ptr -create_handler_factory(database& db) { +create_handler_factory(distributed& db) { return std::make_unique(db); } diff --git a/thrift/handler.hh b/thrift/handler.hh index d17df1364e..bb9971cffd 100644 --- a/thrift/handler.hh +++ b/thrift/handler.hh @@ -7,8 +7,9 @@ #include "Cassandra.h" #include "database.hh" +#include "core/distributed.hh" #include -std::unique_ptr create_handler_factory(database& db); +std::unique_ptr create_handler_factory(distributed& db); #endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */ diff --git a/thrift/server.cc b/thrift/server.cc index ed8651e35b..1a6f5f4dbd 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -36,7 +36,7 @@ public: thrift_stats(thrift_server& server); }; -thrift_server::thrift_server(database& db) +thrift_server::thrift_server(distributed& db) : _stats(new thrift_stats(*this)) , _handler_factory(create_handler_factory(db).release()) , _protocol_factory(new TBinaryProtocolFactoryT()) diff --git a/thrift/server.hh b/thrift/server.hh index 583a1ce5d8..c5632a452e 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -6,6 +6,7 @@ #define APPS_SEASTAR_THRIFT_SERVER_HH_ #include "core/reactor.hh" +#include "core/distributed.hh" #include #include @@ -42,7 +43,7 @@ class thrift_server { uint64_t _current_connections = 0; uint64_t _requests_served = 0; public: - thrift_server(database& db); + thrift_server(distributed& db); future<> listen(ipv4_addr addr); void do_accepts(int which); class connection; diff --git a/transport/server.cc b/transport/server.cc index 604887b588..004b323164 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -230,7 +230,7 @@ private: sstring make_frame(uint8_t version, size_t length); }; -cql_server::cql_server(database& db) +cql_server::cql_server(distributed& db) : _proxy(db) , _query_processor(_proxy, db) { diff --git a/transport/server.hh b/transport/server.hh index 38a5bde512..d820f8db27 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -8,6 +8,7 @@ #include "core/reactor.hh" #include "service/storage_proxy.hh" #include "cql3/query_processor.hh" +#include "core/distributed.hh" class database; @@ -16,7 +17,7 @@ class cql_server { service::storage_proxy _proxy; cql3::query_processor _query_processor; public: - cql_server(database& db); + cql_server(distributed& db); future<> listen(ipv4_addr addr); void do_accepts(int which); private: From cb63d16b400703f2adc9b40a4b7226491e537208 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 19 Feb 2015 18:00:03 +0200 Subject: [PATCH 8/8] thrift: get rid of useless try/catch Exceptions are now handled with then_wrapped(), nothing is left to catch. --- thrift/handler.cc | 114 ++++++++++++++++++++++------------------------ 1 file changed, 55 insertions(+), 59 deletions(-) diff --git a/thrift/handler.cc b/thrift/handler.cc index 9aae8d71a2..fa9ce8ae2e 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -212,72 +212,68 @@ public: return complete_with_exception(std::move(exn_cob), "keyspace not set"); } static bytes null_clustering_key = to_bytes(""); - try { - // Would like to use move_iterator below, but Mutation is filled with some const stuff. - parallel_for_each(mutation_map.begin(), mutation_map.end(), - [this] (std::pair>> key_cf) { - bytes key = to_bytes(key_cf.first); - std::map>& cf_mutations_map = key_cf.second; - return parallel_for_each( - boost::make_move_iterator(cf_mutations_map.begin()), - boost::make_move_iterator(cf_mutations_map.end()), - [this, key] (std::pair> cf_mutations) { - sstring cf_name = cf_mutations.first; - const std::vector& mutations = cf_mutations.second; - auto& cf = lookup_column_family(cf_name); - mutation m_to_apply(key, cf._schema); - for (const Mutation& m : mutations) { - if (m.__isset.column_or_supercolumn) { - auto&& cosc = m.column_or_supercolumn; - if (cosc.__isset.column) { - auto&& col = cosc.column; - bytes cname = to_bytes(col.name); - auto def = cf._schema->get_column_definition(cname); - if (!def) { - throw make_exception("column %s not found", col.name); - } - if (def->kind != column_definition::column_kind::REGULAR) { - throw make_exception("Column %s is not settable", col.name); - } - gc_clock::duration ttl; - if (col.__isset.ttl) { - ttl = std::chrono::duration_cast(std::chrono::seconds(col.ttl)); - } - if (ttl.count() <= 0) { - ttl = cf._schema->default_time_to_live; - } - auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); - m_to_apply.set_clustered_cell(null_clustering_key, *def, - atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}}); - } else if (cosc.__isset.super_column) { - // FIXME: implement - } else if (cosc.__isset.counter_column) { - // FIXME: implement - } else if (cosc.__isset.counter_super_column) { - // FIXME: implement - } else { - throw make_exception("Empty ColumnOrSuperColumn"); + // Would like to use move_iterator below, but Mutation is filled with some const stuff. + parallel_for_each(mutation_map.begin(), mutation_map.end(), + [this] (std::pair>> key_cf) { + bytes key = to_bytes(key_cf.first); + std::map>& cf_mutations_map = key_cf.second; + return parallel_for_each( + boost::make_move_iterator(cf_mutations_map.begin()), + boost::make_move_iterator(cf_mutations_map.end()), + [this, key] (std::pair> cf_mutations) { + sstring cf_name = cf_mutations.first; + const std::vector& mutations = cf_mutations.second; + auto& cf = lookup_column_family(cf_name); + mutation m_to_apply(key, cf._schema); + for (const Mutation& m : mutations) { + if (m.__isset.column_or_supercolumn) { + auto&& cosc = m.column_or_supercolumn; + if (cosc.__isset.column) { + auto&& col = cosc.column; + bytes cname = to_bytes(col.name); + auto def = cf._schema->get_column_definition(cname); + if (!def) { + throw make_exception("column %s not found", col.name); } - } else if (m.__isset.deletion) { + if (def->kind != column_definition::column_kind::REGULAR) { + throw make_exception("Column %s is not settable", col.name); + } + gc_clock::duration ttl; + if (col.__isset.ttl) { + ttl = std::chrono::duration_cast(std::chrono::seconds(col.ttl)); + } + if (ttl.count() <= 0) { + ttl = cf._schema->default_time_to_live; + } + auto ttl_option = ttl.count() > 0 ? ttl_opt(gc_clock::now() + ttl) : ttl_opt(); + m_to_apply.set_clustered_cell(null_clustering_key, *def, + atomic_cell{col.timestamp, atomic_cell::live{ttl_option, to_bytes(col.value)}}); + } else if (cosc.__isset.super_column) { + // FIXME: implement + } else if (cosc.__isset.counter_column) { + // FIXME: implement + } else if (cosc.__isset.counter_super_column) { // FIXME: implement - abort(); } else { - throw make_exception("Mutation must have either column or deletion"); + throw make_exception("Empty ColumnOrSuperColumn"); } + } else if (m.__isset.deletion) { + // FIXME: implement + abort(); + } else { + throw make_exception("Mutation must have either column or deletion"); } - return _db.invoke_on_all([this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { - auto& ks = db.keyspaces.at(_ks_name); - auto& cf = ks.column_families.at(cf_name); - cf.apply(std::move(m_to_apply)); - }); + } + return _db.invoke_on_all([this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { + auto& ks = db.keyspaces.at(_ks_name); + auto& cf = ks.column_families.at(cf_name); + cf.apply(std::move(m_to_apply)); }); - }).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) { - complete(ret, cob, exn_cob); - return make_ready_future<>(); }); - } catch (std::exception& ex) { - abort(); - } + }).then_wrapped([this, cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> ret) { + complete(ret, cob, exn_cob); + return make_ready_future<>(); + }); } void atomic_batch_mutate(tcxx::function cob, tcxx::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) {