db: create keyspace/column_family directory structure

This is slightly awkwards, since the directory structure is not sharded.
This requires some processing to occur outside the shard, while the rest
is sharded.
This commit is contained in:
Avi Kivity
2015-05-17 10:21:47 +03:00
parent 20775b9d5c
commit 875148dae6
3 changed files with 38 additions and 2 deletions

View File

@@ -15,6 +15,7 @@
#include "query-result-writer.hh"
#include "nway_merger.hh"
#include "cql3/column_identifier.hh"
#include "core/seastar.hh"
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include "sstables/sstables.hh"
@@ -427,6 +428,19 @@ void database::add_keyspace(sstring name, keyspace k) {
_keyspaces.emplace(std::move(name), std::move(k));
}
future<>
create_keyspace(distributed<database>& db, sstring name) {
return make_directory(db.local()._cfg->data_file_directories() + "/" + name).then([name, &db] {
return db.invoke_on_all([&name] (database& db) {
auto cfg = db.make_keyspace_config(name);
db.add_keyspace(name, keyspace(cfg));
});
});
// FIXME: rollback on error, or keyspace directory remains on disk, poisoning
// everything.
// FIXME: sync parent directory?
}
void database::update_keyspace(const sstring& name) {
throw std::runtime_error("not implemented");
}
@@ -552,6 +566,11 @@ keyspace::column_family_directory(const sstring& name, utils::UUID uuid) const {
return sprint("%s/%s-%s", _config.datadir, name, uuid);
}
future<>
keyspace::make_directory_for_column_family(const sstring& name, utils::UUID uuid) {
return make_directory(column_family_directory(name, uuid));
}
column_family& database::find_column_family(const schema_ptr& schema) throw (no_such_column_family) {
return find_column_family(schema->id());
}

View File

@@ -15,6 +15,7 @@
#include "utils/hash.hh"
#include "db_clock.hh"
#include "gc_clock.hh"
#include "core/distributed.hh"
#include <functional>
#include <cstdint>
#include <unordered_map>
@@ -168,6 +169,7 @@ public:
void create_replication_strategy(::config::ks_meta_data& ksm);
locator::abstract_replication_strategy& get_replication_strategy();
column_family::config make_column_family_config(const schema& s) const;
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
private:
sstring column_family_directory(const sstring& name, utils::UUID uuid) const;
};
@@ -209,6 +211,7 @@ public:
future<> init_from_data_directory();
// but see: create_keyspace(distributed<database>&, sstring)
void add_keyspace(sstring name, keyspace k);
/** Adds cf with auto-generated UUID. */
void add_column_family(column_family&&);
@@ -241,8 +244,13 @@ public:
future<> apply(const frozen_mutation&);
keyspace::config make_keyspace_config(sstring name) const;
friend std::ostream& operator<<(std::ostream& out, const database& db);
friend future<> create_keyspace(distributed<database>&, sstring);
};
// Creates a keyspace. Keyspaces have a non-sharded
// component (the directory), so a global function is needed.
future<> create_keyspace(distributed<database>& db, sstring name);
// FIXME: stub
class secondary_index_manager {};

View File

@@ -427,8 +427,16 @@ public:
boost::for_each(ks_def.cf_defs, [&cf_ids] (auto&&) {
cf_ids.push_back(utils::UUID_gen::get_time_UUID());
});
_db.invoke_on_all([this, ks_def = std::move(ks_def), cf_ids = std::move(cf_ids)] (database& db) {
db.add_keyspace(ks_def.name, keyspace(db.make_keyspace_config(ks_def.name)));
create_keyspace(_db, ks_def.name).then([this, ks_def, cf_ids] {
return parallel_for_each(boost::combine(ks_def.cf_defs, cf_ids), [this, ks_def, cf_ids] (auto&& cf_def_and_id) {
// We create the directory on the local shard, since the same directory is
// used for all shards.
auto&& name = boost::get<0>(cf_def_and_id).name;
auto&& uuid = boost::get<1>(cf_def_and_id);
return _db.local().find_keyspace(ks_def.name).make_directory_for_column_family(name, uuid);
});
}).then([this, ks_def, cf_ids] {
return _db.invoke_on_all([this, ks_def = std::move(ks_def), cf_ids = std::move(cf_ids)] (database& db) {
std::vector<schema_ptr> cf_defs;
cf_defs.reserve(ks_def.cf_defs.size());
auto id_iterator = cf_ids.begin();
@@ -462,6 +470,7 @@ public:
shared_ptr<config::ut_meta_data>());
auto& ks = db.find_keyspace(ks_def.name);
ks.create_replication_strategy(ksm);
});
}).then([schema_id = std::move(schema_id)] {
return make_ready_future<std::string>(std::move(schema_id));
}).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<std::string> result) {