Merge 'Initialize/destroy ks/cf directories with explicit class methods' from Pavel Emelyanov

This set encapsulates ks/cf directories creation and deletion into keyspace and table classes methods. This is needed to facilitate making the storage initialization storage-type aware in the future. Also this makes the replica/ code less involved in formatting sstables' directory path by hand.

refs: #13020
refs: #12707

Closes #14048

* github.com:scylladb/scylladb:
  keyspace: Introduce init_storage()
  keyspace: Remove column_family_directory()
  table: Introduce destroy_storage()
  table: Simplify init_storage()
  table: Coroutinize init_storage()
  table: Relocate ks.make_directory_for_column_family()
  distributed_loader: Use cf.dir() instead of ks.column_family_directory()
  test: Don't create directory for system tables in cql_test_env
This commit is contained in:
Avi Kivity
2023-05-28 18:50:38 +03:00
4 changed files with 28 additions and 39 deletions

View File

@@ -993,8 +993,9 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family:
future<> database::add_column_family_and_make_directory(schema_ptr schema) {
auto& ks = find_keyspace(schema->ks_name());
add_column_family(ks, schema, ks.make_column_family_config(*schema, *this));
find_column_family(schema).get_index_manager().reload();
return ks.make_directory_for_column_family(schema->cf_name(), schema->id());
auto& cf = find_column_family(schema);
cf.get_index_manager().reload();
return cf.init_storage();
}
bool database::update_column_family(schema_ptr new_schema) {
@@ -1081,7 +1082,6 @@ future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstri
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
auto table_dir = fs::path(table_shards->dir());
std::optional<sstring> snapshot_name_opt;
if (with_snapshot) {
snapshot_name_opt = format("pre-drop-{}", db_clock::now().time_since_epoch().count());
@@ -1098,7 +1098,7 @@ future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstri
return table_shards->stop();
});
f.get(); // re-throw exception from truncate() if any
co_await sstables::remove_table_directory_if_has_no_snapshots(table_dir);
co_await table_shards->destroy_storage();
}
const table_id& database::find_uuid(std::string_view ks, std::string_view cf) const {
@@ -1277,7 +1277,9 @@ keyspace::make_column_family_config(const schema& s, const database& db) const {
const db::config& db_config = db.get_config();
for (auto& extra : _config.all_datadirs) {
cfg.all_datadirs.push_back(column_family_directory(extra, s.cf_name(), s.id()));
auto uuid_sstring = s.id().to_sstring();
boost::erase_all(uuid_sstring, "-");
cfg.all_datadirs.push_back(format("{}/{}-{}", extra, s.cf_name(), uuid_sstring));
}
cfg.datadir = cfg.all_datadirs[0];
cfg.enable_disk_reads = _config.enable_disk_reads;
@@ -1309,26 +1311,22 @@ keyspace::make_column_family_config(const schema& s, const database& db) const {
return cfg;
}
sstring
keyspace::column_family_directory(const sstring& base_path, const sstring& name, table_id uuid) const {
auto uuid_sstring = uuid.to_sstring();
boost::erase_all(uuid_sstring, "-");
return format("{}/{}-{}", base_path, name, uuid_sstring);
future<> table::init_storage() {
co_await coroutine::parallel_for_each(_config.all_datadirs, [] (sstring cfdir) {
return io_check([cfdir] { return recursive_touch_directory(cfdir); });
});
co_await io_check([this] { return touch_directory(_config.datadir + "/upload"); });
co_await io_check([this] { return touch_directory(_config.datadir + "/staging"); });
}
future<>
keyspace::make_directory_for_column_family(const sstring& name, table_id uuid) {
std::vector<sstring> cfdirs;
for (auto& extra : _config.all_datadirs) {
cfdirs.push_back(column_family_directory(extra, name, uuid));
future<> table::destroy_storage() {
return sstables::remove_table_directory_if_has_no_snapshots(fs::path(_config.datadir));
}
future<> keyspace::init_storage() {
if (_config.datadir != "") {
co_await io_check([this] { return touch_directory(_config.datadir); });
}
return parallel_for_each(cfdirs, [] (sstring cfdir) {
return io_check([cfdir] { return recursive_touch_directory(cfdir); });
}).then([cfdirs0 = cfdirs[0]] {
return io_check([cfdirs0] { return touch_directory(cfdirs0 + "/upload"); });
}).then([cfdirs0 = cfdirs[0]] {
return io_check([cfdirs0] { return touch_directory(cfdirs0 + "/staging"); });
});
}
column_family& database::find_column_family(const schema_ptr& schema) {
@@ -1402,11 +1400,7 @@ database::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::
co_await create_in_memory_keyspace(ksm, erm_factory, system);
auto& ks = _keyspaces.at(ksm->name());
auto& datadir = ks.datadir();
if (datadir != "") {
co_await io_check([&datadir] { return touch_directory(datadir); });
}
co_await ks.init_storage();
}
future<>

View File

@@ -626,6 +626,8 @@ public:
const storage_options& get_storage_options() const noexcept { return *_storage_opts; }
lw_shared_ptr<const storage_options> get_storage_options_ptr() const noexcept { return _storage_opts; }
future<> init_storage();
future<> destroy_storage();
seastar::gate& async_gate() { return _async_gate; }
@@ -1202,6 +1204,8 @@ public:
future<> update_from(const locator::shared_token_metadata& stm, lw_shared_ptr<keyspace_metadata>);
future<> init_storage();
/** Note: return by shared pointer value, since the meta data is
* semi-volatile. I.e. we could do alter keyspace at any time, and
* boom, it is replaced.
@@ -1225,7 +1229,6 @@ public:
locator::vnode_effective_replication_map_ptr get_effective_replication_map() const;
column_family::config make_column_family_config(const schema& s, const database& db) const;
future<> make_directory_for_column_family(const sstring& name, table_id uuid);
void add_or_update_column_family(const schema_ptr& s);
void add_user_type(const user_type ut);
void remove_user_type(const user_type ut);
@@ -1241,8 +1244,6 @@ public:
const sstring& datadir() const {
return _config.datadir;
}
sstring column_family_directory(const sstring& base_path, const sstring& name, table_id uuid) const;
};
using no_such_keyspace = data_dictionary::no_such_keyspace;

View File

@@ -723,7 +723,6 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
}
sstring cfname = cf->schema()->cf_name();
auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf->get_storage_options().type_string());
auto gtable = co_await get_table_on_all_shards(db, ks_name, cfname);
@@ -731,16 +730,16 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
std::exception_ptr ex;
try {
co_await ks.make_directory_for_column_family(cfname, uuid);
co_await cf->init_storage();
co_await metadata.start();
} catch (...) {
std::exception_ptr eptr = std::current_exception();
std::string msg =
format("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
ks_name, cfname, sstdir, eptr);
ks_name, cfname, cf->dir(), eptr);
dblog.error("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",
ks_name, cfname, sstdir, eptr);
ks_name, cfname, cf->dir(), eptr);
try {
std::rethrow_exception(eptr);
} catch (sstables::compaction_stopped_exception& e) {

View File

@@ -843,11 +843,6 @@ public:
replica::distributed_loader::init_system_keyspace(sys_ks, db, ss, gossiper, raft_gr, *cfg, p).get();
}
auto& ks = db.local().find_keyspace(db::system_keyspace::NAME);
parallel_for_each(ks.metadata()->cf_meta_data(), [&ks] (auto& pair) {
auto cfm = pair.second;
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
}).get();
replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get();
db.invoke_on_all([] (replica::database& db) {