diff --git a/database.cc b/database.cc index 8844d6f90d..7c11b273dd 100644 --- a/database.cc +++ b/database.cc @@ -211,6 +211,14 @@ database::init_from_data_directory(sstring datadir) { }); } +unsigned +database::shard_of(const dht::token& t) { + if (t._data.empty()) { + return 0; + } + return uint8_t(t._data[0]) % smp::count; +} + column_definition::column_definition(bytes name, data_type type, column_id id, column_kind kind) : _name(std::move(name)) , type(std::move(type)) diff --git a/database.hh b/database.hh index cbcd0279cf..0824843297 100644 --- a/database.hh +++ b/database.hh @@ -5,6 +5,7 @@ #ifndef DATABASE_HH_ #define DATABASE_HH_ +#include "dht/i_partitioner.hh" #include "core/sstring.hh" #include "core/shared_ptr.hh" #include "net/byteorder.hh" @@ -224,8 +225,9 @@ public: }; // Policy for distributed: -// broadcast writes -// local reads +// broadcast metadata writes +// local metadata reads +// use shard_of() for data class database { public: @@ -237,6 +239,7 @@ public: void assign(database&& db) { *this = std::move(db); } + unsigned shard_of(const dht::token& t); }; #endif /* DATABASE_HH_ */ diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index a079f12931..d89a4d816b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -478,8 +478,10 @@ future<> storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { // FIXME: send it to replicas instead of applying locally auto pmut = make_lw_shared(std::move(mutations)); - return _db.invoke_on_all([pmut, cl] (database& db) { - for (auto&& m : *pmut) { + return parallel_for_each(pmut->begin(), pmut->end(), [this, pmut] (const mutation& m) { + auto dk = dht::global_partitioner().decorate_key(m.key); + auto shard = _db.local().shard_of(dk._token); + return _db.invoke_on(shard, [&m, pmut] (database& db) -> void { // FIXME: lookup column_family by UUID keyspace* ks = db.find_keyspace(m.schema->ks_name); assert(ks); // FIXME: load keyspace meta-data from storage @@ -489,7 +491,7 @@ storage_proxy::mutate(std::vector mutations, db::consistency_level cl) } else { // TODO: log a warning } - } + }); }); #if 0 Tracing.trace("Determining replicas for mutation"); diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index 97e134043b..00bfff5488 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -32,7 +32,7 @@ struct conversation_state { static const sstring ks_name = "ks"; static const sstring table_name = "cf"; -static void require_column_has_value(distributed& ddb, const sstring& ks_name, const sstring& table_name, +static future<> require_column_has_value(distributed& ddb, const sstring& ks_name, const sstring& table_name, std::vector pk, std::vector ck, const sstring& column_name, boost::any expected) { auto& db = ddb.local(); @@ -41,19 +41,34 @@ static void require_column_has_value(distributed& ddb, const sstring& auto cf = ks->find_column_family(table_name); assert(cf != nullptr); auto schema = cf->_schema; - auto p = cf->find_partition(schema->partition_key_type->serialize_value_deep(pk)); - assert(p != nullptr); - auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck)); - assert(row != nullptr); - auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); - assert(col_def != nullptr); - auto i = row->find(col_def->id); - if (i == row->end()) { - assert(((void)"column not set", 0)); - } - auto& cell = boost::any_cast(i->second); - assert(cell.is_live()); - assert(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected))); + auto pkey = schema->partition_key_type->serialize_value_deep(pk); + auto dk = dht::global_partitioner().decorate_key(pkey); + auto shard = db.shard_of(dk._token); + return ddb.invoke_on(shard, [pkey = std::move(pkey), + ck = std::move(ck), + ks_name = std::move(ks_name), + column_name = std::move(column_name), + expected = std::move(expected), + table_name = std::move(table_name)] (database& db) { + auto ks = db.find_keyspace(ks_name); + assert(ks != nullptr); + auto cf = ks->find_column_family(table_name); + assert(cf != nullptr); + auto schema = cf->_schema; + auto p = cf->find_partition(pkey); + assert(p != nullptr); + auto row = p->find_row(schema->clustering_key_type->serialize_value_deep(ck)); + assert(row != nullptr); + auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); + assert(col_def != nullptr); + auto i = row->find(col_def->id); + if (i == row->end()) { + assert(((void)"column not set", 0)); + } + auto& cell = boost::any_cast(i->second); + assert(cell.is_live()); + assert(col_def->type->equal(cell.as_live().value, col_def->type->decompose(expected))); + }); } future<> test_insert_statement() { @@ -76,11 +91,11 @@ future<> test_insert_statement() { }).then([state, db] { return state->execute_cql("insert into cf (p1, c1, r1) values ('key1', 1, 100);"); }).then([state, db] { - require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 100); + return require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 100); }).then([state, db] { return state->execute_cql("update cf set r1 = 66 where p1 = 'key1' and c1 = 1;"); }).then([state, db] { - require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 66); + return require_column_has_value(*db, ks_name, table_name, {sstring("key1")}, {1}, "r1", 66); }).then([db] { return db->stop(); }).then([db] { diff --git a/thrift/handler.cc b/thrift/handler.cc index 6d20a544cf..38551dbc34 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -107,13 +107,18 @@ public: } void get_slice(tcxx::function const& _return)> cob, tcxx::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { - try { + auto keyb = to_bytes(key); + auto do_get = [this, + key = std::move(key), + column_parent = std::move(column_parent), + predicate = std::move(predicate)] (database& db) { std::vector ret; auto keyb = to_bytes(key); if (!column_parent.super_column.empty()) { throw unimplemented_exception(); } - auto& cf = lookup_column_family(column_parent.column_family); + auto& ks = lookup_keyspace(db, _ks_name); + auto& cf = lookup_column_family(ks, column_parent.column_family); if (predicate.__isset.column_names) { throw unimplemented_exception(); } else if (predicate.__isset.slice_range) { @@ -147,15 +152,20 @@ public: } } } + return make_foreign(make_lw_shared(std::move(ret))); } else { throw make_exception("empty SlicePredicate"); } - cob(std::move(ret)); - } catch (InvalidRequestException& ex) { - exn_cob(TDelayedException::delayException(ex)); - } catch (std::exception& ex) { - exn_cob(TDelayedException::delayException(ex)); - } + }; + auto dk = dht::global_partitioner().decorate_key(keyb); + auto shard = _db.local().shard_of(dk._token); + _db.invoke_on(shard, [do_get = std::move(do_get)] (database& db) { + return do_get(db); + }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] + (future>>> ret) { + complete(ret, cob, exn_cob); + return make_ready_future<>(); + }); } void get_count(tcxx::function cob, tcxx::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { @@ -235,7 +245,7 @@ public: [this, key] (std::pair> cf_mutations) { sstring cf_name = cf_mutations.first; const std::vector& mutations = cf_mutations.second; - auto& cf = lookup_column_family(cf_name); + auto& cf = lookup_column_family(*_ks, cf_name); mutation m_to_apply(key, cf._schema); for (const Mutation& m : mutations) { if (m.__isset.column_or_supercolumn) { @@ -276,7 +286,9 @@ public: throw make_exception("Mutation must have either column or deletion"); } } - return _db.invoke_on_all([this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { + auto dk = dht::global_partitioner().decorate_key(m_to_apply.key); + auto shard = _db.local().shard_of(dk._token); + return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { auto& ks = db.keyspaces.at(_ks_name); auto& cf = ks.column_families.at(cf_name); cf.apply(std::move(m_to_apply)); @@ -493,13 +505,20 @@ public: } private: - column_family& lookup_column_family(const sstring& cf_name) { + static column_family& lookup_column_family(keyspace& ks, const sstring& cf_name) { try { - return _ks->column_families.at(cf_name); + return ks.column_families.at(cf_name); } catch (std::out_of_range&) { throw make_exception("column family %s not found", cf_name); } } + keyspace& lookup_keyspace(database& db, const sstring ks_name) { + try { + return db.keyspaces.at(ks_name); + } catch (std::out_of_range&) { + throw make_exception("Keyspace %s not found", ks_name); + } + } }; class handler_factory : public CassandraCobSvIfFactory {