Refactor db/keyspace/column_family toplogy

* database now holds all keyspace + column family object
* column families are mapped by uuid, either generated or explicit
* lookup by name tuples or uuid
* finder functions now return refs + throws on missing obj
This commit is contained in:
Calle Wilund
2015-03-24 17:06:38 +01:00
parent 70936e5391
commit d3fe0c5182
8 changed files with 195 additions and 122 deletions

View File

@@ -8,7 +8,7 @@
#include "core/future-util.hh"
#include "db/system_keyspace.hh"
#include "db/consistency_level.hh"
#include "utils/UUID_gen.hh"
#include "cql3/column_identifier.hh"
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
@@ -304,30 +304,9 @@ future<> column_family::populate(sstring sstdir) {
});
}
future<> keyspace::populate(sstring ksdir) {
return lister::scan_dir(ksdir, directory_entry_type::directory, [this, ksdir] (directory_entry de) {
auto comps = parse_fname(de.name);
if (comps.size() != 2) {
dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name);
return make_ready_future<>();
}
sstring cfname = comps[0];
auto sstdir = ksdir + "/" + de.name;
if (column_families.count(cfname) != 0) {
dblog.info("Keyspace {}: Reading CF {} ", ksdir, comps[0]);
// FIXME: Increase parallelism.
return column_families.at(cfname).populate(sstdir);
} else {
dblog.warn("{}, CF {}: schema not loaded!", ksdir, comps[0]);
return make_ready_future<>();
}
});
}
database::database() {
keyspaces.emplace("system", db::system_keyspace::make());
db::system_keyspace::make(*this);
}
future<> database::populate(sstring datadir) {
@@ -335,12 +314,31 @@ future<> database::populate(sstring datadir) {
auto& ks_name = de.name;
auto ksdir = datadir + "/" + de.name;
auto i = keyspaces.find(ks_name);
if (i == keyspaces.end()) {
auto i = _keyspaces.find(ks_name);
if (i == _keyspaces.end()) {
dblog.warn("Skipping undefined keyspace: {}", ks_name);
} else {
dblog.warn("Populating Keyspace {}", ks_name);
return i->second.populate(ksdir);
return lister::scan_dir(ksdir, directory_entry_type::directory, [this, ksdir, ks_name] (directory_entry de) {
auto comps = parse_fname(de.name);
if (comps.size() != 2) {
dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name);
return make_ready_future<>();
}
sstring cfname = comps[0];
auto sstdir = ksdir + "/" + de.name;
try {
auto& cf = find_column_family(ks_name, cfname);
dblog.info("Keyspace {}: Reading CF {} ", ksdir, cfname);
// FIXME: Increase parallelism.
return cf.populate(sstdir);
} catch (no_such_column_family&) {
dblog.warn("{}, CF {}: schema not loaded!", ksdir, comps[0]);
return make_ready_future<>();
}
});
}
return make_ready_future<>();
});
@@ -384,49 +382,115 @@ column_definition::name() const {
return _name;
}
column_family*
keyspace::find_column_family(const sstring& cf_name) {
auto i = column_families.find(cf_name);
if (i == column_families.end()) {
return nullptr;
void database::add_keyspace(sstring name, keyspace k) {
if (_keyspaces.count(name) != 0) {
throw std::invalid_argument("Keyspace " + name + " already exists");
}
return &i->second;
_keyspaces.emplace(std::move(name), std::move(k));
}
schema_ptr
keyspace::find_schema(const sstring& cf_name) {
auto cf = find_column_family(cf_name);
if (!cf) {
return {};
void database::add_column_family(const utils::UUID& uuid, column_family cf) {
if (_keyspaces.count(cf._schema->ks_name) == 0) {
throw std::invalid_argument("Keyspace " + cf._schema->ks_name + " not defined");
}
return cf->_schema;
if (_column_families.count(uuid) != 0) {
throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped");
}
auto kscf = std::make_pair(cf._schema->ks_name, cf._schema->cf_name);
if (_ks_cf_to_uuid.count(kscf) != 0) {
throw std::invalid_argument("Column family " + cf._schema->cf_name + " exists");
}
_column_families.emplace(uuid, std::move(cf));
_ks_cf_to_uuid.emplace(std::move(kscf), uuid);
}
schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) {
auto ks = find_keyspace(ks_name);
if (!ks) {
return {};
}
return ks->find_schema(cf_name);
void database::add_column_family(column_family cf) {
add_column_family(utils::UUID_gen::get_time_UUID(), std::move(cf));
}
keyspace*
database::find_keyspace(const sstring& name) {
auto i = keyspaces.find(name);
if (i != keyspaces.end()) {
return &i->second;
const utils::UUID& database::find_uuid(sstring ks, sstring cf) const throw (std::out_of_range) {
return _ks_cf_to_uuid.at(std::make_pair(ks, cf));
}
const utils::UUID& database::find_uuid(schema_ptr schema) const throw (std::out_of_range) {
return find_uuid(schema->ks_name, schema->cf_name);
}
keyspace& database::find_keyspace(const sstring& name) throw (no_such_keyspace) {
try {
return _keyspaces.at(name);
} catch (...) {
std::throw_with_nested(no_such_keyspace(name));
}
return nullptr;
}
const keyspace& database::find_keyspace(const sstring& name) const throw (no_such_keyspace) {
try {
return _keyspaces.at(name);
} catch (...) {
std::throw_with_nested(no_such_keyspace(name));
}
}
bool database::has_keyspace(const sstring& name) const {
return _keyspaces.count(name) != 0;
}
column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) throw (no_such_column_family) {
try {
return find_column_family(find_uuid(ks_name, cf_name));
} catch (...) {
std::throw_with_nested(no_such_column_family(ks_name + ":" + cf_name));
}
}
const column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) {
try {
return find_column_family(find_uuid(ks_name, cf_name));
} catch (...) {
std::throw_with_nested(no_such_column_family(ks_name + ":" + cf_name));
}
}
column_family& database::find_column_family(const utils::UUID& uuid) throw (no_such_column_family) {
try {
return _column_families.at(uuid);
} catch (...) {
std::throw_with_nested(no_such_column_family(uuid.to_sstring()));
}
}
const column_family& database::find_column_family(const utils::UUID& uuid) const throw (no_such_column_family) {
try {
return _column_families.at(uuid);
} catch (...) {
std::throw_with_nested(no_such_column_family(uuid.to_sstring()));
}
}
column_family& database::find_column_family(schema_ptr schema) throw (no_such_column_family) {
return find_column_family(schema->ks_name, schema->cf_name);
}
const column_family& database::find_column_family(schema_ptr schema) const throw (no_such_column_family) {
return find_column_family(schema->ks_name, schema->cf_name);
}
schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) {
return find_schema(find_uuid(ks_name, cf_name));
}
schema_ptr database::find_schema(const utils::UUID& uuid) const throw (no_such_column_family) {
return find_column_family(uuid)._schema;
}
keyspace&
database::find_or_create_keyspace(const sstring& name) {
auto i = keyspaces.find(name);
if (i != keyspaces.end()) {
auto i = _keyspaces.find(name);
if (i != _keyspaces.end()) {
return i->second;
}
return keyspaces.emplace(name, keyspace()).first->second;
return _keyspaces.emplace(name, keyspace()).first->second;
}
void
@@ -868,18 +932,13 @@ database::query(const query::read_command& cmd) {
return make_ready_future<lw_shared_ptr<query::result>>(make_lw_shared(query::result()));
};
auto ks = find_keyspace(cmd.keyspace);
if (!ks) {
try {
auto& cf = find_column_family(cmd.keyspace, cmd.column_family);
return cf.query(cmd);
} catch (...) {
// FIXME: load from sstables
return make_empty();
}
auto cf = ks->find_column_family(cmd.column_family);
if (!cf) {
return make_empty();
}
return cf->query(cmd);
}
namespace db {

View File

@@ -10,6 +10,7 @@
#include "core/shared_ptr.hh"
#include "net/byteorder.hh"
#include "utils/UUID.hh"
#include "utils/hash.hh"
#include "db_clock.hh"
#include "gc_clock.hh"
#include <functional>
@@ -251,10 +252,17 @@ private:
class keyspace {
public:
std::unordered_map<sstring, column_family> column_families;
future<> populate(sstring datadir);
schema_ptr find_schema(const sstring& cf_name);
column_family* find_column_family(const sstring& cf_name);
// empty right now. placeholder for metadata(?)
};
class no_such_keyspace : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
class no_such_column_family : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
// Policy for distributed<database>:
@@ -263,14 +271,37 @@ public:
// use shard_of() for data
class database {
std::unordered_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, column_family> _column_families;
std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid;
public:
database();
std::unordered_map<sstring, keyspace> keyspaces;
future<> init_from_data_directory(sstring datadir);
future<> populate(sstring datadir);
keyspace* find_keyspace(const sstring& name);
void add_keyspace(sstring name, keyspace k);
/** Adds cf with auto-generated UUID. */
void add_column_family(column_family);
void add_column_family(const utils::UUID&, column_family);
/* throws std::out_of_range if missing */
const utils::UUID& find_uuid(sstring ks, sstring cf) const throw (std::out_of_range);
const utils::UUID& find_uuid(schema_ptr) const throw (std::out_of_range);
/* below, find* throws no_such_<type> on fail */
keyspace& find_or_create_keyspace(const sstring& name);
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name);
keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace);
const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace);
bool has_keyspace(const sstring& name) const;
column_family& find_column_family(const sstring& ks, const sstring& name) throw (no_such_column_family);
const column_family& find_column_family(const sstring& ks, const sstring& name) const throw (no_such_column_family);
column_family& find_column_family(const utils::UUID&) throw (no_such_column_family);
const column_family& find_column_family(const utils::UUID&) const throw (no_such_column_family);
column_family& find_column_family(schema_ptr) throw (no_such_column_family);
const column_family& find_column_family(schema_ptr) const throw (no_such_column_family);
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family);
schema_ptr find_schema(const utils::UUID&) const throw (no_such_column_family);
future<> stop() { return make_ready_future<>(); }
void assign(database&& db) {
*this = std::move(db);

View File

@@ -1001,12 +1001,12 @@ std::vector<schema_ptr> all_tables() {
return r;
}
keyspace make() {
void make(database& db) {
keyspace ks;
db.add_keyspace("system", std::move(ks));
for (auto&& table : all_tables()) {
ks.column_families.emplace(table->cf_name, column_family(table));
db.add_column_family(column_family(table));
}
return ks;
}
} // namespace system_keyspace

View File

@@ -49,7 +49,7 @@ extern schema_ptr batchlog();
extern schema_ptr built_indexes(); // TODO (from Cassandra): make private
std::vector<schema_ptr> all_tables();
keyspace make();
void make(database& db);
#if 0

View File

@@ -474,13 +474,12 @@ storage_proxy::mutate_locally(std::vector<mutation> mutations) {
auto shard = _db.local().shard_of(dk._token);
return _db.invoke_on(shard, [&m, pmut] (database& db) -> void {
// FIXME: lookup column_family by UUID
keyspace* ks = db.find_keyspace(m.schema->ks_name);
assert(ks); // FIXME: load keyspace meta-data from storage
column_family* cf = ks->find_column_family(m.schema->cf_name);
if (cf) {
cf->apply(m);
} else {
try {
auto& cf = db.find_column_family(m.schema);
cf.apply(m);
} catch (no_such_column_family&) {
// TODO: log a warning
// FIXME: load keyspace meta-data from storage
}
});
});

View File

@@ -76,8 +76,8 @@ public:
future<> create_table(SchemaMaker schema_maker) {
return _db->invoke_on_all([schema_maker, this] (database& db) {
auto cf_schema = make_lw_shared(schema_maker(ks_name));
auto& ks = db.find_or_create_keyspace(ks_name);
ks.column_families.emplace(cf_schema->cf_name, column_family(cf_schema));
db.find_or_create_keyspace(ks_name);
db.add_column_family(column_family(cf_schema));
});
}
@@ -87,11 +87,8 @@ public:
const sstring& column_name,
boost::any expected) {
auto& db = _db->local();
auto ks = db.find_keyspace(ks_name);
assert(ks != nullptr);
auto cf = ks->find_column_family(table_name);
assert(cf != nullptr);
auto schema = cf->_schema;
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf._schema;
auto pkey = partition_key::from_deeply_exploded(*schema, pk);
auto dk = dht::global_partitioner().decorate_key(pkey);
auto shard = db.shard_of(dk._token);
@@ -101,12 +98,9 @@ public:
column_name = std::move(column_name),
expected = std::move(expected),
table_name = std::move(table_name)] (database& db) {
auto ks = db.find_keyspace(ks_name);
assert(ks != nullptr);
auto cf = ks->find_column_family(table_name);
assert(cf != nullptr);
auto schema = cf->_schema;
auto p = cf->find_partition(pkey);
auto& cf = db.find_column_family(ks_name, table_name);
auto schema = cf._schema;
auto p = cf.find_partition(pkey);
assert(p != nullptr);
auto row = p->find_row(clustering_key::from_deeply_exploded(*schema, ck));
assert(row != nullptr);

View File

@@ -79,7 +79,6 @@ void complete(future<foreign_ptr<lw_shared_ptr<T>>>& fut,
class CassandraAsyncHandler : public CassandraCobSvIf {
distributed<database>& _db;
keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection?
sstring _ks_name;
sstring _cql_version;
public:
@@ -91,7 +90,6 @@ public:
void set_keyspace(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
try {
_ks = &_db.local().keyspaces.at(keyspace);
_ks_name = keyspace;
cob();
} catch (std::out_of_range& e) {
@@ -107,9 +105,10 @@ public:
}
void get_slice(tcxx::function<void(std::vector<ColumnOrSuperColumn> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
auto& ks = lookup_keyspace(_db.local(), _ks_name);
auto schema = ks.find_schema(column_parent.column_family);
if (!_ks) {
schema_ptr schema;
try {
schema = _db.local().find_schema(_ks_name, column_parent.column_family);
} catch (...) {
return complete_with_exception<InvalidRequestException>(std::move(exn_cob), "column family %s not found", column_parent.column_family);
}
auto pk = key_from_thrift(schema, to_bytes(key));
@@ -124,8 +123,7 @@ public:
if (!column_parent.super_column.empty()) {
throw unimplemented_exception();
}
auto& ks = lookup_keyspace(db, _ks_name);
auto& cf = lookup_column_family(ks, column_parent.column_family);
auto& cf = lookup_column_family(_db.local(), _ks_name, column_parent.column_family);
if (predicate.__isset.column_names) {
throw unimplemented_exception();
} else if (predicate.__isset.slice_range) {
@@ -235,7 +233,7 @@ public:
}
void batch_mutate(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
if (!_ks) {
if (_ks_name.empty()) {
return complete_with_exception<InvalidRequestException>(std::move(exn_cob), "keyspace not set");
}
// Would like to use move_iterator below, but Mutation is filled with some const stuff.
@@ -249,7 +247,7 @@ public:
[this, thrift_key] (std::pair<std::string, std::vector<Mutation>> cf_mutations) {
sstring cf_name = cf_mutations.first;
const std::vector<Mutation>& mutations = cf_mutations.second;
auto& cf = lookup_column_family(*_ks, cf_name);
auto& cf = lookup_column_family(_db.local(), _ks_name, cf_name);
mutation m_to_apply(key_from_thrift(cf._schema, thrift_key), cf._schema);
auto empty_clustering_key = clustering_key::make_empty(*cf._schema);
for (const Mutation& m : mutations) {
@@ -294,8 +292,7 @@ public:
auto dk = dht::global_partitioner().decorate_key(m_to_apply.key);
auto shard = _db.local().shard_of(dk._token);
return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) {
auto& ks = db.keyspaces.at(_ks_name);
auto& cf = ks.column_families.at(cf_name);
auto& cf = db.find_column_family(_ks_name, cf_name);
cf.apply(m_to_apply);
});
});
@@ -413,13 +410,13 @@ public:
void system_add_keyspace(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
std::string schema_id = "schema-id"; // FIXME: make meaningful
if (_db.local().keyspaces.count(ks_def.name)) {
if (_db.local().has_keyspace(ks_def.name)) {
InvalidRequestException ire;
ire.why = sprint("Keyspace %s already exists", ks_def.name);
exn_cob(TDelayedException::delayException(ire));
}
_db.invoke_on_all([this, ks_def = std::move(ks_def)] (database& db) {
keyspace& ks = db.keyspaces[ks_def.name];
db.add_keyspace(ks_def.name, keyspace());
for (const CfDef& cf_def : ks_def.cf_defs) {
std::vector<schema::column> partition_key;
std::vector<schema::column> clustering_key;
@@ -437,7 +434,7 @@ public:
std::move(partition_key), std::move(clustering_key), std::move(regular_columns),
std::vector<schema::column>(), column_name_type);
column_family cf(s);
ks.column_families.emplace(cf_def.name, std::move(cf));
db.add_column_family(std::move(cf));
}
}).then([schema_id = std::move(schema_id)] {
return make_ready_future<std::string>(std::move(schema_id));
@@ -511,20 +508,13 @@ public:
}
private:
static column_family& lookup_column_family(keyspace& ks, const sstring& cf_name) {
static column_family& lookup_column_family(database& db, const sstring& ks_name, const sstring& cf_name) {
try {
return ks.column_families.at(cf_name);
return db.find_column_family(ks_name, cf_name);
} catch (std::out_of_range&) {
throw make_exception<InvalidRequestException>("column family %s not found", cf_name);
}
}
static keyspace& lookup_keyspace(database& db, const sstring& ks_name) {
try {
return db.keyspaces.at(ks_name);
} catch (std::out_of_range&) {
throw make_exception<InvalidRequestException>("Keyspace %s not found", ks_name);
}
}
static partition_key key_from_thrift(schema_ptr s, bytes k) {
if (s->partition_key_size() != 1) {
fail(unimplemented::cause::THRIFT);

View File

@@ -58,8 +58,9 @@ validate_column_family(database& db, const sstring& keyspace_name, const sstring
throw exceptions::invalid_request_exception("Keyspace not set");
}
keyspace* ks = db.find_keyspace(keyspace_name);
if (!ks) {
try {
db.find_keyspace(keyspace_name);
} catch (...) {
throw exceptions::keyspace_not_defined_exception(sprint("Keyspace %s does not exist", keyspace_name));
}
@@ -67,12 +68,11 @@ validate_column_family(database& db, const sstring& keyspace_name, const sstring
throw exceptions::invalid_request_exception("non-empty table is required");
}
auto schema = ks->find_schema(cf_name);
if (!schema) {
try {
return db.find_schema(keyspace_name, cf_name);
} catch (...) {
throw exceptions::invalid_request_exception(sprint("unconfigured table %s", cf_name));
}
return schema;
}
}