diff --git a/database.cc b/database.cc index 15d22b3e04..7037ace2c1 100644 --- a/database.cc +++ b/database.cc @@ -8,7 +8,7 @@ #include "core/future-util.hh" #include "db/system_keyspace.hh" #include "db/consistency_level.hh" - +#include "utils/UUID_gen.hh" #include "cql3/column_identifier.hh" #include #include @@ -304,30 +304,9 @@ future<> column_family::populate(sstring sstdir) { }); } -future<> keyspace::populate(sstring ksdir) { - return lister::scan_dir(ksdir, directory_entry_type::directory, [this, ksdir] (directory_entry de) { - auto comps = parse_fname(de.name); - if (comps.size() != 2) { - dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name); - return make_ready_future<>(); - } - sstring cfname = comps[0]; - - auto sstdir = ksdir + "/" + de.name; - if (column_families.count(cfname) != 0) { - dblog.info("Keyspace {}: Reading CF {} ", ksdir, comps[0]); - - // FIXME: Increase parallelism. - return column_families.at(cfname).populate(sstdir); - } else { - dblog.warn("{}, CF {}: schema not loaded!", ksdir, comps[0]); - return make_ready_future<>(); - } - }); -} database::database() { - keyspaces.emplace("system", db::system_keyspace::make()); + db::system_keyspace::make(*this); } future<> database::populate(sstring datadir) { @@ -335,12 +314,31 @@ future<> database::populate(sstring datadir) { auto& ks_name = de.name; auto ksdir = datadir + "/" + de.name; - auto i = keyspaces.find(ks_name); - if (i == keyspaces.end()) { + auto i = _keyspaces.find(ks_name); + if (i == _keyspaces.end()) { dblog.warn("Skipping undefined keyspace: {}", ks_name); } else { dblog.warn("Populating Keyspace {}", ks_name); - return i->second.populate(ksdir); + return lister::scan_dir(ksdir, directory_entry_type::directory, [this, ksdir, ks_name] (directory_entry de) { + auto comps = parse_fname(de.name); + if (comps.size() != 2) { + dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name); + return make_ready_future<>(); + } + sstring cfname = comps[0]; + + auto sstdir = ksdir + "/" + de.name; + + try { + auto& cf = find_column_family(ks_name, cfname); + dblog.info("Keyspace {}: Reading CF {} ", ksdir, cfname); + // FIXME: Increase parallelism. + return cf.populate(sstdir); + } catch (no_such_column_family&) { + dblog.warn("{}, CF {}: schema not loaded!", ksdir, comps[0]); + return make_ready_future<>(); + } + }); } return make_ready_future<>(); }); @@ -384,49 +382,115 @@ column_definition::name() const { return _name; } -column_family* -keyspace::find_column_family(const sstring& cf_name) { - auto i = column_families.find(cf_name); - if (i == column_families.end()) { - return nullptr; +void database::add_keyspace(sstring name, keyspace k) { + if (_keyspaces.count(name) != 0) { + throw std::invalid_argument("Keyspace " + name + " already exists"); } - return &i->second; + _keyspaces.emplace(std::move(name), std::move(k)); } -schema_ptr -keyspace::find_schema(const sstring& cf_name) { - auto cf = find_column_family(cf_name); - if (!cf) { - return {}; +void database::add_column_family(const utils::UUID& uuid, column_family cf) { + if (_keyspaces.count(cf._schema->ks_name) == 0) { + throw std::invalid_argument("Keyspace " + cf._schema->ks_name + " not defined"); } - return cf->_schema; + if (_column_families.count(uuid) != 0) { + throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped"); + } + auto kscf = std::make_pair(cf._schema->ks_name, cf._schema->cf_name); + if (_ks_cf_to_uuid.count(kscf) != 0) { + throw std::invalid_argument("Column family " + cf._schema->cf_name + " exists"); + } + _column_families.emplace(uuid, std::move(cf)); + _ks_cf_to_uuid.emplace(std::move(kscf), uuid); } -schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) { - auto ks = find_keyspace(ks_name); - if (!ks) { - return {}; - } - - return ks->find_schema(cf_name); +void database::add_column_family(column_family cf) { + add_column_family(utils::UUID_gen::get_time_UUID(), std::move(cf)); } -keyspace* -database::find_keyspace(const sstring& name) { - auto i = keyspaces.find(name); - if (i != keyspaces.end()) { - return &i->second; +const utils::UUID& database::find_uuid(sstring ks, sstring cf) const throw (std::out_of_range) { + return _ks_cf_to_uuid.at(std::make_pair(ks, cf)); +} + +const utils::UUID& database::find_uuid(schema_ptr schema) const throw (std::out_of_range) { + return find_uuid(schema->ks_name, schema->cf_name); +} + +keyspace& database::find_keyspace(const sstring& name) throw (no_such_keyspace) { + try { + return _keyspaces.at(name); + } catch (...) { + std::throw_with_nested(no_such_keyspace(name)); } - return nullptr; +} + +const keyspace& database::find_keyspace(const sstring& name) const throw (no_such_keyspace) { + try { + return _keyspaces.at(name); + } catch (...) { + std::throw_with_nested(no_such_keyspace(name)); + } +} + +bool database::has_keyspace(const sstring& name) const { + return _keyspaces.count(name) != 0; +} + +column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) throw (no_such_column_family) { + try { + return find_column_family(find_uuid(ks_name, cf_name)); + } catch (...) { + std::throw_with_nested(no_such_column_family(ks_name + ":" + cf_name)); + } +} + +const column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) { + try { + return find_column_family(find_uuid(ks_name, cf_name)); + } catch (...) { + std::throw_with_nested(no_such_column_family(ks_name + ":" + cf_name)); + } +} + +column_family& database::find_column_family(const utils::UUID& uuid) throw (no_such_column_family) { + try { + return _column_families.at(uuid); + } catch (...) { + std::throw_with_nested(no_such_column_family(uuid.to_sstring())); + } +} + +const column_family& database::find_column_family(const utils::UUID& uuid) const throw (no_such_column_family) { + try { + return _column_families.at(uuid); + } catch (...) { + std::throw_with_nested(no_such_column_family(uuid.to_sstring())); + } +} + +column_family& database::find_column_family(schema_ptr schema) throw (no_such_column_family) { + return find_column_family(schema->ks_name, schema->cf_name); +} + +const column_family& database::find_column_family(schema_ptr schema) const throw (no_such_column_family) { + return find_column_family(schema->ks_name, schema->cf_name); +} + +schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) { + return find_schema(find_uuid(ks_name, cf_name)); +} + +schema_ptr database::find_schema(const utils::UUID& uuid) const throw (no_such_column_family) { + return find_column_family(uuid)._schema; } keyspace& database::find_or_create_keyspace(const sstring& name) { - auto i = keyspaces.find(name); - if (i != keyspaces.end()) { + auto i = _keyspaces.find(name); + if (i != _keyspaces.end()) { return i->second; } - return keyspaces.emplace(name, keyspace()).first->second; + return _keyspaces.emplace(name, keyspace()).first->second; } void @@ -868,18 +932,13 @@ database::query(const query::read_command& cmd) { return make_ready_future>(make_lw_shared(query::result())); }; - auto ks = find_keyspace(cmd.keyspace); - if (!ks) { + try { + auto& cf = find_column_family(cmd.keyspace, cmd.column_family); + return cf.query(cmd); + } catch (...) { // FIXME: load from sstables return make_empty(); } - - auto cf = ks->find_column_family(cmd.column_family); - if (!cf) { - return make_empty(); - } - - return cf->query(cmd); } namespace db { diff --git a/database.hh b/database.hh index 773b10ca18..ebb435738c 100644 --- a/database.hh +++ b/database.hh @@ -10,6 +10,7 @@ #include "core/shared_ptr.hh" #include "net/byteorder.hh" #include "utils/UUID.hh" +#include "utils/hash.hh" #include "db_clock.hh" #include "gc_clock.hh" #include @@ -251,10 +252,17 @@ private: class keyspace { public: - std::unordered_map column_families; - future<> populate(sstring datadir); - schema_ptr find_schema(const sstring& cf_name); - column_family* find_column_family(const sstring& cf_name); + // empty right now. placeholder for metadata(?) +}; + +class no_such_keyspace : public std::runtime_error { +public: + using runtime_error::runtime_error; +}; + +class no_such_column_family : public std::runtime_error { +public: + using runtime_error::runtime_error; }; // Policy for distributed: @@ -263,14 +271,37 @@ public: // use shard_of() for data class database { + std::unordered_map _keyspaces; + std::unordered_map _column_families; + std::unordered_map, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid; public: database(); - std::unordered_map keyspaces; + future<> init_from_data_directory(sstring datadir); future<> populate(sstring datadir); - keyspace* find_keyspace(const sstring& name); + + void add_keyspace(sstring name, keyspace k); + /** Adds cf with auto-generated UUID. */ + void add_column_family(column_family); + void add_column_family(const utils::UUID&, column_family); + + /* throws std::out_of_range if missing */ + const utils::UUID& find_uuid(sstring ks, sstring cf) const throw (std::out_of_range); + const utils::UUID& find_uuid(schema_ptr) const throw (std::out_of_range); + + /* below, find* throws no_such_ on fail */ keyspace& find_or_create_keyspace(const sstring& name); - schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name); + keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace); + const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace); + bool has_keyspace(const sstring& name) const; + column_family& find_column_family(const sstring& ks, const sstring& name) throw (no_such_column_family); + const column_family& find_column_family(const sstring& ks, const sstring& name) const throw (no_such_column_family); + column_family& find_column_family(const utils::UUID&) throw (no_such_column_family); + const column_family& find_column_family(const utils::UUID&) const throw (no_such_column_family); + column_family& find_column_family(schema_ptr) throw (no_such_column_family); + const column_family& find_column_family(schema_ptr) const throw (no_such_column_family); + schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family); + schema_ptr find_schema(const utils::UUID&) const throw (no_such_column_family); future<> stop() { return make_ready_future<>(); } void assign(database&& db) { *this = std::move(db); diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index cc3fe4d009..f07badc17a 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1001,12 +1001,12 @@ std::vector all_tables() { return r; } -keyspace make() { +void make(database& db) { keyspace ks; + db.add_keyspace("system", std::move(ks)); for (auto&& table : all_tables()) { - ks.column_families.emplace(table->cf_name, column_family(table)); + db.add_column_family(column_family(table)); } - return ks; } } // namespace system_keyspace diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index ed87e946f1..c7fa6be4de 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -49,7 +49,7 @@ extern schema_ptr batchlog(); extern schema_ptr built_indexes(); // TODO (from Cassandra): make private std::vector all_tables(); -keyspace make(); +void make(database& db); #if 0 diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 163b79db50..e5d7eebfd1 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -474,13 +474,12 @@ storage_proxy::mutate_locally(std::vector mutations) { auto shard = _db.local().shard_of(dk._token); return _db.invoke_on(shard, [&m, pmut] (database& db) -> void { // FIXME: lookup column_family by UUID - keyspace* ks = db.find_keyspace(m.schema->ks_name); - assert(ks); // FIXME: load keyspace meta-data from storage - column_family* cf = ks->find_column_family(m.schema->cf_name); - if (cf) { - cf->apply(m); - } else { + try { + auto& cf = db.find_column_family(m.schema); + cf.apply(m); + } catch (no_such_column_family&) { // TODO: log a warning + // FIXME: load keyspace meta-data from storage } }); }); diff --git a/tests/urchin/cql_test_env.hh b/tests/urchin/cql_test_env.hh index 1f228fdb7d..6c298790a2 100644 --- a/tests/urchin/cql_test_env.hh +++ b/tests/urchin/cql_test_env.hh @@ -76,8 +76,8 @@ public: future<> create_table(SchemaMaker schema_maker) { return _db->invoke_on_all([schema_maker, this] (database& db) { auto cf_schema = make_lw_shared(schema_maker(ks_name)); - auto& ks = db.find_or_create_keyspace(ks_name); - ks.column_families.emplace(cf_schema->cf_name, column_family(cf_schema)); + db.find_or_create_keyspace(ks_name); + db.add_column_family(column_family(cf_schema)); }); } @@ -87,11 +87,8 @@ public: const sstring& column_name, boost::any expected) { auto& db = _db->local(); - auto ks = db.find_keyspace(ks_name); - assert(ks != nullptr); - auto cf = ks->find_column_family(table_name); - assert(cf != nullptr); - auto schema = cf->_schema; + auto& cf = db.find_column_family(ks_name, table_name); + auto schema = cf._schema; auto pkey = partition_key::from_deeply_exploded(*schema, pk); auto dk = dht::global_partitioner().decorate_key(pkey); auto shard = db.shard_of(dk._token); @@ -101,12 +98,9 @@ public: column_name = std::move(column_name), expected = std::move(expected), table_name = std::move(table_name)] (database& db) { - auto ks = db.find_keyspace(ks_name); - assert(ks != nullptr); - auto cf = ks->find_column_family(table_name); - assert(cf != nullptr); - auto schema = cf->_schema; - auto p = cf->find_partition(pkey); + auto& cf = db.find_column_family(ks_name, table_name); + auto schema = cf._schema; + auto p = cf.find_partition(pkey); assert(p != nullptr); auto row = p->find_row(clustering_key::from_deeply_exploded(*schema, ck)); assert(row != nullptr); diff --git a/thrift/handler.cc b/thrift/handler.cc index a01475705c..cffe84400a 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -79,7 +79,6 @@ void complete(future>>& fut, class CassandraAsyncHandler : public CassandraCobSvIf { distributed& _db; - keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection? sstring _ks_name; sstring _cql_version; public: @@ -91,7 +90,6 @@ public: void set_keyspace(tcxx::function cob, tcxx::function exn_cob, const std::string& keyspace) { try { - _ks = &_db.local().keyspaces.at(keyspace); _ks_name = keyspace; cob(); } catch (std::out_of_range& e) { @@ -107,9 +105,10 @@ public: } void get_slice(tcxx::function const& _return)> cob, tcxx::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { - auto& ks = lookup_keyspace(_db.local(), _ks_name); - auto schema = ks.find_schema(column_parent.column_family); - if (!_ks) { + schema_ptr schema; + try { + schema = _db.local().find_schema(_ks_name, column_parent.column_family); + } catch (...) { return complete_with_exception(std::move(exn_cob), "column family %s not found", column_parent.column_family); } auto pk = key_from_thrift(schema, to_bytes(key)); @@ -124,8 +123,7 @@ public: if (!column_parent.super_column.empty()) { throw unimplemented_exception(); } - auto& ks = lookup_keyspace(db, _ks_name); - auto& cf = lookup_column_family(ks, column_parent.column_family); + auto& cf = lookup_column_family(_db.local(), _ks_name, column_parent.column_family); if (predicate.__isset.column_names) { throw unimplemented_exception(); } else if (predicate.__isset.slice_range) { @@ -235,7 +233,7 @@ public: } void batch_mutate(tcxx::function cob, tcxx::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { - if (!_ks) { + if (_ks_name.empty()) { return complete_with_exception(std::move(exn_cob), "keyspace not set"); } // Would like to use move_iterator below, but Mutation is filled with some const stuff. @@ -249,7 +247,7 @@ public: [this, thrift_key] (std::pair> cf_mutations) { sstring cf_name = cf_mutations.first; const std::vector& mutations = cf_mutations.second; - auto& cf = lookup_column_family(*_ks, cf_name); + auto& cf = lookup_column_family(_db.local(), _ks_name, cf_name); mutation m_to_apply(key_from_thrift(cf._schema, thrift_key), cf._schema); auto empty_clustering_key = clustering_key::make_empty(*cf._schema); for (const Mutation& m : mutations) { @@ -294,8 +292,7 @@ public: auto dk = dht::global_partitioner().decorate_key(m_to_apply.key); auto shard = _db.local().shard_of(dk._token); return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) { - auto& ks = db.keyspaces.at(_ks_name); - auto& cf = ks.column_families.at(cf_name); + auto& cf = db.find_column_family(_ks_name, cf_name); cf.apply(m_to_apply); }); }); @@ -413,13 +410,13 @@ public: void system_add_keyspace(tcxx::function cob, tcxx::function exn_cob, const KsDef& ks_def) { std::string schema_id = "schema-id"; // FIXME: make meaningful - if (_db.local().keyspaces.count(ks_def.name)) { + if (_db.local().has_keyspace(ks_def.name)) { InvalidRequestException ire; ire.why = sprint("Keyspace %s already exists", ks_def.name); exn_cob(TDelayedException::delayException(ire)); } _db.invoke_on_all([this, ks_def = std::move(ks_def)] (database& db) { - keyspace& ks = db.keyspaces[ks_def.name]; + db.add_keyspace(ks_def.name, keyspace()); for (const CfDef& cf_def : ks_def.cf_defs) { std::vector partition_key; std::vector clustering_key; @@ -437,7 +434,7 @@ public: std::move(partition_key), std::move(clustering_key), std::move(regular_columns), std::vector(), column_name_type); column_family cf(s); - ks.column_families.emplace(cf_def.name, std::move(cf)); + db.add_column_family(std::move(cf)); } }).then([schema_id = std::move(schema_id)] { return make_ready_future(std::move(schema_id)); @@ -511,20 +508,13 @@ public: } private: - static column_family& lookup_column_family(keyspace& ks, const sstring& cf_name) { + static column_family& lookup_column_family(database& db, const sstring& ks_name, const sstring& cf_name) { try { - return ks.column_families.at(cf_name); + return db.find_column_family(ks_name, cf_name); } catch (std::out_of_range&) { throw make_exception("column family %s not found", cf_name); } } - static keyspace& lookup_keyspace(database& db, const sstring& ks_name) { - try { - return db.keyspaces.at(ks_name); - } catch (std::out_of_range&) { - throw make_exception("Keyspace %s not found", ks_name); - } - } static partition_key key_from_thrift(schema_ptr s, bytes k) { if (s->partition_key_size() != 1) { fail(unimplemented::cause::THRIFT); diff --git a/validation.cc b/validation.cc index 29b76446ad..b0c46b269e 100644 --- a/validation.cc +++ b/validation.cc @@ -58,8 +58,9 @@ validate_column_family(database& db, const sstring& keyspace_name, const sstring throw exceptions::invalid_request_exception("Keyspace not set"); } - keyspace* ks = db.find_keyspace(keyspace_name); - if (!ks) { + try { + db.find_keyspace(keyspace_name); + } catch (...) { throw exceptions::keyspace_not_defined_exception(sprint("Keyspace %s does not exist", keyspace_name)); } @@ -67,12 +68,11 @@ validate_column_family(database& db, const sstring& keyspace_name, const sstring throw exceptions::invalid_request_exception("non-empty table is required"); } - auto schema = ks->find_schema(cf_name); - if (!schema) { + try { + return db.find_schema(keyspace_name, cf_name); + } catch (...) { throw exceptions::invalid_request_exception(sprint("unconfigured table %s", cf_name)); } - - return schema; } }