diff --git a/database.cc b/database.cc index 1b2c3e971b..6b9b05f630 100644 --- a/database.cc +++ b/database.cc @@ -15,6 +15,7 @@ #include "query-result-writer.hh" #include "nway_merger.hh" #include "cql3/column_identifier.hh" +#include "core/seastar.hh" #include #include #include "sstables/sstables.hh" @@ -427,6 +428,19 @@ void database::add_keyspace(sstring name, keyspace k) { _keyspaces.emplace(std::move(name), std::move(k)); } +future<> +create_keyspace(distributed& db, sstring name) { + return make_directory(db.local()._cfg->data_file_directories() + "/" + name).then([name, &db] { + return db.invoke_on_all([&name] (database& db) { + auto cfg = db.make_keyspace_config(name); + db.add_keyspace(name, keyspace(cfg)); + }); + }); + // FIXME: rollback on error, or keyspace directory remains on disk, poisoning + // everything. + // FIXME: sync parent directory? +} + void database::update_keyspace(const sstring& name) { throw std::runtime_error("not implemented"); } @@ -552,6 +566,11 @@ keyspace::column_family_directory(const sstring& name, utils::UUID uuid) const { return sprint("%s/%s-%s", _config.datadir, name, uuid); } +future<> +keyspace::make_directory_for_column_family(const sstring& name, utils::UUID uuid) { + return make_directory(column_family_directory(name, uuid)); +} + column_family& database::find_column_family(const schema_ptr& schema) throw (no_such_column_family) { return find_column_family(schema->id()); } diff --git a/database.hh b/database.hh index bf348c0e83..2542214b4c 100644 --- a/database.hh +++ b/database.hh @@ -15,6 +15,7 @@ #include "utils/hash.hh" #include "db_clock.hh" #include "gc_clock.hh" +#include "core/distributed.hh" #include #include #include @@ -168,6 +169,7 @@ public: void create_replication_strategy(::config::ks_meta_data& ksm); locator::abstract_replication_strategy& get_replication_strategy(); column_family::config make_column_family_config(const schema& s) const; + future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid); private: sstring column_family_directory(const sstring& name, utils::UUID uuid) const; }; @@ -209,6 +211,7 @@ public: future<> init_from_data_directory(); + // but see: create_keyspace(distributed&, sstring) void add_keyspace(sstring name, keyspace k); /** Adds cf with auto-generated UUID. */ void add_column_family(column_family&&); @@ -241,8 +244,13 @@ public: future<> apply(const frozen_mutation&); keyspace::config make_keyspace_config(sstring name) const; friend std::ostream& operator<<(std::ostream& out, const database& db); + friend future<> create_keyspace(distributed&, sstring); }; +// Creates a keyspace. Keyspaces have a non-sharded +// component (the directory), so a global function is needed. +future<> create_keyspace(distributed& db, sstring name); + // FIXME: stub class secondary_index_manager {}; diff --git a/thrift/handler.cc b/thrift/handler.cc index fba25426b6..cdae2304e0 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -427,8 +427,16 @@ public: boost::for_each(ks_def.cf_defs, [&cf_ids] (auto&&) { cf_ids.push_back(utils::UUID_gen::get_time_UUID()); }); - _db.invoke_on_all([this, ks_def = std::move(ks_def), cf_ids = std::move(cf_ids)] (database& db) { - db.add_keyspace(ks_def.name, keyspace(db.make_keyspace_config(ks_def.name))); + create_keyspace(_db, ks_def.name).then([this, ks_def, cf_ids] { + return parallel_for_each(boost::combine(ks_def.cf_defs, cf_ids), [this, ks_def, cf_ids] (auto&& cf_def_and_id) { + // We create the directory on the local shard, since the same directory is + // used for all shards. + auto&& name = boost::get<0>(cf_def_and_id).name; + auto&& uuid = boost::get<1>(cf_def_and_id); + return _db.local().find_keyspace(ks_def.name).make_directory_for_column_family(name, uuid); + }); + }).then([this, ks_def, cf_ids] { + return _db.invoke_on_all([this, ks_def = std::move(ks_def), cf_ids = std::move(cf_ids)] (database& db) { std::vector cf_defs; cf_defs.reserve(ks_def.cf_defs.size()); auto id_iterator = cf_ids.begin(); @@ -462,6 +470,7 @@ public: shared_ptr()); auto& ks = db.find_keyspace(ks_def.name); ks.create_replication_strategy(ksm); + }); }).then([schema_id = std::move(schema_id)] { return make_ready_future(std::move(schema_id)); }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future result) {