Refactor db/keyspace/column_family toplogy
* database now holds all keyspace + column family object * column families are mapped by uuid, either generated or explicit * lookup by name tuples or uuid * finder functions now return refs + throws on missing obj
This commit is contained in:
185
database.cc
185
database.cc
@@ -8,7 +8,7 @@
|
||||
#include "core/future-util.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/consistency_level.hh"
|
||||
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
@@ -304,30 +304,9 @@ future<> column_family::populate(sstring sstdir) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> keyspace::populate(sstring ksdir) {
|
||||
return lister::scan_dir(ksdir, directory_entry_type::directory, [this, ksdir] (directory_entry de) {
|
||||
auto comps = parse_fname(de.name);
|
||||
if (comps.size() != 2) {
|
||||
dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
sstring cfname = comps[0];
|
||||
|
||||
auto sstdir = ksdir + "/" + de.name;
|
||||
if (column_families.count(cfname) != 0) {
|
||||
dblog.info("Keyspace {}: Reading CF {} ", ksdir, comps[0]);
|
||||
|
||||
// FIXME: Increase parallelism.
|
||||
return column_families.at(cfname).populate(sstdir);
|
||||
} else {
|
||||
dblog.warn("{}, CF {}: schema not loaded!", ksdir, comps[0]);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
database::database() {
|
||||
keyspaces.emplace("system", db::system_keyspace::make());
|
||||
db::system_keyspace::make(*this);
|
||||
}
|
||||
|
||||
future<> database::populate(sstring datadir) {
|
||||
@@ -335,12 +314,31 @@ future<> database::populate(sstring datadir) {
|
||||
auto& ks_name = de.name;
|
||||
auto ksdir = datadir + "/" + de.name;
|
||||
|
||||
auto i = keyspaces.find(ks_name);
|
||||
if (i == keyspaces.end()) {
|
||||
auto i = _keyspaces.find(ks_name);
|
||||
if (i == _keyspaces.end()) {
|
||||
dblog.warn("Skipping undefined keyspace: {}", ks_name);
|
||||
} else {
|
||||
dblog.warn("Populating Keyspace {}", ks_name);
|
||||
return i->second.populate(ksdir);
|
||||
return lister::scan_dir(ksdir, directory_entry_type::directory, [this, ksdir, ks_name] (directory_entry de) {
|
||||
auto comps = parse_fname(de.name);
|
||||
if (comps.size() != 2) {
|
||||
dblog.error("Keyspace {}: Skipping malformed CF {} ", ksdir, de.name);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
sstring cfname = comps[0];
|
||||
|
||||
auto sstdir = ksdir + "/" + de.name;
|
||||
|
||||
try {
|
||||
auto& cf = find_column_family(ks_name, cfname);
|
||||
dblog.info("Keyspace {}: Reading CF {} ", ksdir, cfname);
|
||||
// FIXME: Increase parallelism.
|
||||
return cf.populate(sstdir);
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.warn("{}, CF {}: schema not loaded!", ksdir, comps[0]);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -384,49 +382,115 @@ column_definition::name() const {
|
||||
return _name;
|
||||
}
|
||||
|
||||
column_family*
|
||||
keyspace::find_column_family(const sstring& cf_name) {
|
||||
auto i = column_families.find(cf_name);
|
||||
if (i == column_families.end()) {
|
||||
return nullptr;
|
||||
void database::add_keyspace(sstring name, keyspace k) {
|
||||
if (_keyspaces.count(name) != 0) {
|
||||
throw std::invalid_argument("Keyspace " + name + " already exists");
|
||||
}
|
||||
return &i->second;
|
||||
_keyspaces.emplace(std::move(name), std::move(k));
|
||||
}
|
||||
|
||||
schema_ptr
|
||||
keyspace::find_schema(const sstring& cf_name) {
|
||||
auto cf = find_column_family(cf_name);
|
||||
if (!cf) {
|
||||
return {};
|
||||
void database::add_column_family(const utils::UUID& uuid, column_family cf) {
|
||||
if (_keyspaces.count(cf._schema->ks_name) == 0) {
|
||||
throw std::invalid_argument("Keyspace " + cf._schema->ks_name + " not defined");
|
||||
}
|
||||
return cf->_schema;
|
||||
if (_column_families.count(uuid) != 0) {
|
||||
throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped");
|
||||
}
|
||||
auto kscf = std::make_pair(cf._schema->ks_name, cf._schema->cf_name);
|
||||
if (_ks_cf_to_uuid.count(kscf) != 0) {
|
||||
throw std::invalid_argument("Column family " + cf._schema->cf_name + " exists");
|
||||
}
|
||||
_column_families.emplace(uuid, std::move(cf));
|
||||
_ks_cf_to_uuid.emplace(std::move(kscf), uuid);
|
||||
}
|
||||
|
||||
schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) {
|
||||
auto ks = find_keyspace(ks_name);
|
||||
if (!ks) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return ks->find_schema(cf_name);
|
||||
void database::add_column_family(column_family cf) {
|
||||
add_column_family(utils::UUID_gen::get_time_UUID(), std::move(cf));
|
||||
}
|
||||
|
||||
keyspace*
|
||||
database::find_keyspace(const sstring& name) {
|
||||
auto i = keyspaces.find(name);
|
||||
if (i != keyspaces.end()) {
|
||||
return &i->second;
|
||||
const utils::UUID& database::find_uuid(sstring ks, sstring cf) const throw (std::out_of_range) {
|
||||
return _ks_cf_to_uuid.at(std::make_pair(ks, cf));
|
||||
}
|
||||
|
||||
const utils::UUID& database::find_uuid(schema_ptr schema) const throw (std::out_of_range) {
|
||||
return find_uuid(schema->ks_name, schema->cf_name);
|
||||
}
|
||||
|
||||
keyspace& database::find_keyspace(const sstring& name) throw (no_such_keyspace) {
|
||||
try {
|
||||
return _keyspaces.at(name);
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_keyspace(name));
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const keyspace& database::find_keyspace(const sstring& name) const throw (no_such_keyspace) {
|
||||
try {
|
||||
return _keyspaces.at(name);
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_keyspace(name));
|
||||
}
|
||||
}
|
||||
|
||||
bool database::has_keyspace(const sstring& name) const {
|
||||
return _keyspaces.count(name) != 0;
|
||||
}
|
||||
|
||||
column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) throw (no_such_column_family) {
|
||||
try {
|
||||
return find_column_family(find_uuid(ks_name, cf_name));
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_column_family(ks_name + ":" + cf_name));
|
||||
}
|
||||
}
|
||||
|
||||
const column_family& database::find_column_family(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) {
|
||||
try {
|
||||
return find_column_family(find_uuid(ks_name, cf_name));
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_column_family(ks_name + ":" + cf_name));
|
||||
}
|
||||
}
|
||||
|
||||
column_family& database::find_column_family(const utils::UUID& uuid) throw (no_such_column_family) {
|
||||
try {
|
||||
return _column_families.at(uuid);
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_column_family(uuid.to_sstring()));
|
||||
}
|
||||
}
|
||||
|
||||
const column_family& database::find_column_family(const utils::UUID& uuid) const throw (no_such_column_family) {
|
||||
try {
|
||||
return _column_families.at(uuid);
|
||||
} catch (...) {
|
||||
std::throw_with_nested(no_such_column_family(uuid.to_sstring()));
|
||||
}
|
||||
}
|
||||
|
||||
column_family& database::find_column_family(schema_ptr schema) throw (no_such_column_family) {
|
||||
return find_column_family(schema->ks_name, schema->cf_name);
|
||||
}
|
||||
|
||||
const column_family& database::find_column_family(schema_ptr schema) const throw (no_such_column_family) {
|
||||
return find_column_family(schema->ks_name, schema->cf_name);
|
||||
}
|
||||
|
||||
schema_ptr database::find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family) {
|
||||
return find_schema(find_uuid(ks_name, cf_name));
|
||||
}
|
||||
|
||||
schema_ptr database::find_schema(const utils::UUID& uuid) const throw (no_such_column_family) {
|
||||
return find_column_family(uuid)._schema;
|
||||
}
|
||||
|
||||
keyspace&
|
||||
database::find_or_create_keyspace(const sstring& name) {
|
||||
auto i = keyspaces.find(name);
|
||||
if (i != keyspaces.end()) {
|
||||
auto i = _keyspaces.find(name);
|
||||
if (i != _keyspaces.end()) {
|
||||
return i->second;
|
||||
}
|
||||
return keyspaces.emplace(name, keyspace()).first->second;
|
||||
return _keyspaces.emplace(name, keyspace()).first->second;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -868,18 +932,13 @@ database::query(const query::read_command& cmd) {
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(make_lw_shared(query::result()));
|
||||
};
|
||||
|
||||
auto ks = find_keyspace(cmd.keyspace);
|
||||
if (!ks) {
|
||||
try {
|
||||
auto& cf = find_column_family(cmd.keyspace, cmd.column_family);
|
||||
return cf.query(cmd);
|
||||
} catch (...) {
|
||||
// FIXME: load from sstables
|
||||
return make_empty();
|
||||
}
|
||||
|
||||
auto cf = ks->find_column_family(cmd.column_family);
|
||||
if (!cf) {
|
||||
return make_empty();
|
||||
}
|
||||
|
||||
return cf->query(cmd);
|
||||
}
|
||||
|
||||
namespace db {
|
||||
|
||||
45
database.hh
45
database.hh
@@ -10,6 +10,7 @@
|
||||
#include "core/shared_ptr.hh"
|
||||
#include "net/byteorder.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include <functional>
|
||||
@@ -251,10 +252,17 @@ private:
|
||||
|
||||
class keyspace {
|
||||
public:
|
||||
std::unordered_map<sstring, column_family> column_families;
|
||||
future<> populate(sstring datadir);
|
||||
schema_ptr find_schema(const sstring& cf_name);
|
||||
column_family* find_column_family(const sstring& cf_name);
|
||||
// empty right now. placeholder for metadata(?)
|
||||
};
|
||||
|
||||
class no_such_keyspace : public std::runtime_error {
|
||||
public:
|
||||
using runtime_error::runtime_error;
|
||||
};
|
||||
|
||||
class no_such_column_family : public std::runtime_error {
|
||||
public:
|
||||
using runtime_error::runtime_error;
|
||||
};
|
||||
|
||||
// Policy for distributed<database>:
|
||||
@@ -263,14 +271,37 @@ public:
|
||||
// use shard_of() for data
|
||||
|
||||
class database {
|
||||
std::unordered_map<sstring, keyspace> _keyspaces;
|
||||
std::unordered_map<utils::UUID, column_family> _column_families;
|
||||
std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid;
|
||||
public:
|
||||
database();
|
||||
std::unordered_map<sstring, keyspace> keyspaces;
|
||||
|
||||
future<> init_from_data_directory(sstring datadir);
|
||||
future<> populate(sstring datadir);
|
||||
keyspace* find_keyspace(const sstring& name);
|
||||
|
||||
void add_keyspace(sstring name, keyspace k);
|
||||
/** Adds cf with auto-generated UUID. */
|
||||
void add_column_family(column_family);
|
||||
void add_column_family(const utils::UUID&, column_family);
|
||||
|
||||
/* throws std::out_of_range if missing */
|
||||
const utils::UUID& find_uuid(sstring ks, sstring cf) const throw (std::out_of_range);
|
||||
const utils::UUID& find_uuid(schema_ptr) const throw (std::out_of_range);
|
||||
|
||||
/* below, find* throws no_such_<type> on fail */
|
||||
keyspace& find_or_create_keyspace(const sstring& name);
|
||||
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name);
|
||||
keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace);
|
||||
const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace);
|
||||
bool has_keyspace(const sstring& name) const;
|
||||
column_family& find_column_family(const sstring& ks, const sstring& name) throw (no_such_column_family);
|
||||
const column_family& find_column_family(const sstring& ks, const sstring& name) const throw (no_such_column_family);
|
||||
column_family& find_column_family(const utils::UUID&) throw (no_such_column_family);
|
||||
const column_family& find_column_family(const utils::UUID&) const throw (no_such_column_family);
|
||||
column_family& find_column_family(schema_ptr) throw (no_such_column_family);
|
||||
const column_family& find_column_family(schema_ptr) const throw (no_such_column_family);
|
||||
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family);
|
||||
schema_ptr find_schema(const utils::UUID&) const throw (no_such_column_family);
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
void assign(database&& db) {
|
||||
*this = std::move(db);
|
||||
|
||||
@@ -1001,12 +1001,12 @@ std::vector<schema_ptr> all_tables() {
|
||||
return r;
|
||||
}
|
||||
|
||||
keyspace make() {
|
||||
void make(database& db) {
|
||||
keyspace ks;
|
||||
db.add_keyspace("system", std::move(ks));
|
||||
for (auto&& table : all_tables()) {
|
||||
ks.column_families.emplace(table->cf_name, column_family(table));
|
||||
db.add_column_family(column_family(table));
|
||||
}
|
||||
return ks;
|
||||
}
|
||||
|
||||
} // namespace system_keyspace
|
||||
|
||||
@@ -49,7 +49,7 @@ extern schema_ptr batchlog();
|
||||
extern schema_ptr built_indexes(); // TODO (from Cassandra): make private
|
||||
|
||||
std::vector<schema_ptr> all_tables();
|
||||
keyspace make();
|
||||
void make(database& db);
|
||||
|
||||
#if 0
|
||||
|
||||
|
||||
@@ -474,13 +474,12 @@ storage_proxy::mutate_locally(std::vector<mutation> mutations) {
|
||||
auto shard = _db.local().shard_of(dk._token);
|
||||
return _db.invoke_on(shard, [&m, pmut] (database& db) -> void {
|
||||
// FIXME: lookup column_family by UUID
|
||||
keyspace* ks = db.find_keyspace(m.schema->ks_name);
|
||||
assert(ks); // FIXME: load keyspace meta-data from storage
|
||||
column_family* cf = ks->find_column_family(m.schema->cf_name);
|
||||
if (cf) {
|
||||
cf->apply(m);
|
||||
} else {
|
||||
try {
|
||||
auto& cf = db.find_column_family(m.schema);
|
||||
cf.apply(m);
|
||||
} catch (no_such_column_family&) {
|
||||
// TODO: log a warning
|
||||
// FIXME: load keyspace meta-data from storage
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -76,8 +76,8 @@ public:
|
||||
future<> create_table(SchemaMaker schema_maker) {
|
||||
return _db->invoke_on_all([schema_maker, this] (database& db) {
|
||||
auto cf_schema = make_lw_shared(schema_maker(ks_name));
|
||||
auto& ks = db.find_or_create_keyspace(ks_name);
|
||||
ks.column_families.emplace(cf_schema->cf_name, column_family(cf_schema));
|
||||
db.find_or_create_keyspace(ks_name);
|
||||
db.add_column_family(column_family(cf_schema));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -87,11 +87,8 @@ public:
|
||||
const sstring& column_name,
|
||||
boost::any expected) {
|
||||
auto& db = _db->local();
|
||||
auto ks = db.find_keyspace(ks_name);
|
||||
assert(ks != nullptr);
|
||||
auto cf = ks->find_column_family(table_name);
|
||||
assert(cf != nullptr);
|
||||
auto schema = cf->_schema;
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf._schema;
|
||||
auto pkey = partition_key::from_deeply_exploded(*schema, pk);
|
||||
auto dk = dht::global_partitioner().decorate_key(pkey);
|
||||
auto shard = db.shard_of(dk._token);
|
||||
@@ -101,12 +98,9 @@ public:
|
||||
column_name = std::move(column_name),
|
||||
expected = std::move(expected),
|
||||
table_name = std::move(table_name)] (database& db) {
|
||||
auto ks = db.find_keyspace(ks_name);
|
||||
assert(ks != nullptr);
|
||||
auto cf = ks->find_column_family(table_name);
|
||||
assert(cf != nullptr);
|
||||
auto schema = cf->_schema;
|
||||
auto p = cf->find_partition(pkey);
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf._schema;
|
||||
auto p = cf.find_partition(pkey);
|
||||
assert(p != nullptr);
|
||||
auto row = p->find_row(clustering_key::from_deeply_exploded(*schema, ck));
|
||||
assert(row != nullptr);
|
||||
|
||||
@@ -79,7 +79,6 @@ void complete(future<foreign_ptr<lw_shared_ptr<T>>>& fut,
|
||||
|
||||
class CassandraAsyncHandler : public CassandraCobSvIf {
|
||||
distributed<database>& _db;
|
||||
keyspace* _ks = nullptr; // FIXME: reference counting for in-use detection?
|
||||
sstring _ks_name;
|
||||
sstring _cql_version;
|
||||
public:
|
||||
@@ -91,7 +90,6 @@ public:
|
||||
|
||||
void set_keyspace(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
|
||||
try {
|
||||
_ks = &_db.local().keyspaces.at(keyspace);
|
||||
_ks_name = keyspace;
|
||||
cob();
|
||||
} catch (std::out_of_range& e) {
|
||||
@@ -107,9 +105,10 @@ public:
|
||||
}
|
||||
|
||||
void get_slice(tcxx::function<void(std::vector<ColumnOrSuperColumn> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
|
||||
auto& ks = lookup_keyspace(_db.local(), _ks_name);
|
||||
auto schema = ks.find_schema(column_parent.column_family);
|
||||
if (!_ks) {
|
||||
schema_ptr schema;
|
||||
try {
|
||||
schema = _db.local().find_schema(_ks_name, column_parent.column_family);
|
||||
} catch (...) {
|
||||
return complete_with_exception<InvalidRequestException>(std::move(exn_cob), "column family %s not found", column_parent.column_family);
|
||||
}
|
||||
auto pk = key_from_thrift(schema, to_bytes(key));
|
||||
@@ -124,8 +123,7 @@ public:
|
||||
if (!column_parent.super_column.empty()) {
|
||||
throw unimplemented_exception();
|
||||
}
|
||||
auto& ks = lookup_keyspace(db, _ks_name);
|
||||
auto& cf = lookup_column_family(ks, column_parent.column_family);
|
||||
auto& cf = lookup_column_family(_db.local(), _ks_name, column_parent.column_family);
|
||||
if (predicate.__isset.column_names) {
|
||||
throw unimplemented_exception();
|
||||
} else if (predicate.__isset.slice_range) {
|
||||
@@ -235,7 +233,7 @@ public:
|
||||
}
|
||||
|
||||
void batch_mutate(tcxx::function<void()> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
|
||||
if (!_ks) {
|
||||
if (_ks_name.empty()) {
|
||||
return complete_with_exception<InvalidRequestException>(std::move(exn_cob), "keyspace not set");
|
||||
}
|
||||
// Would like to use move_iterator below, but Mutation is filled with some const stuff.
|
||||
@@ -249,7 +247,7 @@ public:
|
||||
[this, thrift_key] (std::pair<std::string, std::vector<Mutation>> cf_mutations) {
|
||||
sstring cf_name = cf_mutations.first;
|
||||
const std::vector<Mutation>& mutations = cf_mutations.second;
|
||||
auto& cf = lookup_column_family(*_ks, cf_name);
|
||||
auto& cf = lookup_column_family(_db.local(), _ks_name, cf_name);
|
||||
mutation m_to_apply(key_from_thrift(cf._schema, thrift_key), cf._schema);
|
||||
auto empty_clustering_key = clustering_key::make_empty(*cf._schema);
|
||||
for (const Mutation& m : mutations) {
|
||||
@@ -294,8 +292,7 @@ public:
|
||||
auto dk = dht::global_partitioner().decorate_key(m_to_apply.key);
|
||||
auto shard = _db.local().shard_of(dk._token);
|
||||
return _db.invoke_on(shard, [this, cf_name, m_to_apply = std::move(m_to_apply)] (database& db) {
|
||||
auto& ks = db.keyspaces.at(_ks_name);
|
||||
auto& cf = ks.column_families.at(cf_name);
|
||||
auto& cf = db.find_column_family(_ks_name, cf_name);
|
||||
cf.apply(m_to_apply);
|
||||
});
|
||||
});
|
||||
@@ -413,13 +410,13 @@ public:
|
||||
|
||||
void system_add_keyspace(tcxx::function<void(std::string const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
|
||||
std::string schema_id = "schema-id"; // FIXME: make meaningful
|
||||
if (_db.local().keyspaces.count(ks_def.name)) {
|
||||
if (_db.local().has_keyspace(ks_def.name)) {
|
||||
InvalidRequestException ire;
|
||||
ire.why = sprint("Keyspace %s already exists", ks_def.name);
|
||||
exn_cob(TDelayedException::delayException(ire));
|
||||
}
|
||||
_db.invoke_on_all([this, ks_def = std::move(ks_def)] (database& db) {
|
||||
keyspace& ks = db.keyspaces[ks_def.name];
|
||||
db.add_keyspace(ks_def.name, keyspace());
|
||||
for (const CfDef& cf_def : ks_def.cf_defs) {
|
||||
std::vector<schema::column> partition_key;
|
||||
std::vector<schema::column> clustering_key;
|
||||
@@ -437,7 +434,7 @@ public:
|
||||
std::move(partition_key), std::move(clustering_key), std::move(regular_columns),
|
||||
std::vector<schema::column>(), column_name_type);
|
||||
column_family cf(s);
|
||||
ks.column_families.emplace(cf_def.name, std::move(cf));
|
||||
db.add_column_family(std::move(cf));
|
||||
}
|
||||
}).then([schema_id = std::move(schema_id)] {
|
||||
return make_ready_future<std::string>(std::move(schema_id));
|
||||
@@ -511,20 +508,13 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
static column_family& lookup_column_family(keyspace& ks, const sstring& cf_name) {
|
||||
static column_family& lookup_column_family(database& db, const sstring& ks_name, const sstring& cf_name) {
|
||||
try {
|
||||
return ks.column_families.at(cf_name);
|
||||
return db.find_column_family(ks_name, cf_name);
|
||||
} catch (std::out_of_range&) {
|
||||
throw make_exception<InvalidRequestException>("column family %s not found", cf_name);
|
||||
}
|
||||
}
|
||||
static keyspace& lookup_keyspace(database& db, const sstring& ks_name) {
|
||||
try {
|
||||
return db.keyspaces.at(ks_name);
|
||||
} catch (std::out_of_range&) {
|
||||
throw make_exception<InvalidRequestException>("Keyspace %s not found", ks_name);
|
||||
}
|
||||
}
|
||||
static partition_key key_from_thrift(schema_ptr s, bytes k) {
|
||||
if (s->partition_key_size() != 1) {
|
||||
fail(unimplemented::cause::THRIFT);
|
||||
|
||||
@@ -58,8 +58,9 @@ validate_column_family(database& db, const sstring& keyspace_name, const sstring
|
||||
throw exceptions::invalid_request_exception("Keyspace not set");
|
||||
}
|
||||
|
||||
keyspace* ks = db.find_keyspace(keyspace_name);
|
||||
if (!ks) {
|
||||
try {
|
||||
db.find_keyspace(keyspace_name);
|
||||
} catch (...) {
|
||||
throw exceptions::keyspace_not_defined_exception(sprint("Keyspace %s does not exist", keyspace_name));
|
||||
}
|
||||
|
||||
@@ -67,12 +68,11 @@ validate_column_family(database& db, const sstring& keyspace_name, const sstring
|
||||
throw exceptions::invalid_request_exception("non-empty table is required");
|
||||
}
|
||||
|
||||
auto schema = ks->find_schema(cf_name);
|
||||
if (!schema) {
|
||||
try {
|
||||
return db.find_schema(keyspace_name, cf_name);
|
||||
} catch (...) {
|
||||
throw exceptions::invalid_request_exception(sprint("unconfigured table %s", cf_name));
|
||||
}
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user