Merge 'Remove datadir string from table::config' from Pavel Emelyanov
The datadir keeps path to directory where local sstables can be. The very same information is now kept in table's storage options (#20542). This set fixes the remaining places that still use table::config::datadir and table::dir() and removes the datadir field. Closes scylladb/scylladb#20675 * github.com:scylladb/scylladb: treewide: Remove table::config::datadir distributed_loader: Print storage options, not datadir data_dictionary: Add formatter for storage_options test: Construct table_for_tests with table storage options test: Generalize pair of make_table_for_tests helpers tests: Add helper to get snapshot directory from storage options table: snapshot_exists: Get directory from storage options table: snapshot_on_all_shards: Get directory from storage options
This commit is contained in:
@@ -415,3 +415,14 @@ auto fmt::formatter<data_dictionary::keyspace_metadata>::format(const data_dicti
|
||||
}
|
||||
return fmt::format_to(ctx.out(), ", userTypes={}}}", m.user_types());
|
||||
}
|
||||
|
||||
auto fmt::formatter<data_dictionary::storage_options>::format(const data_dictionary::storage_options& so, fmt::format_context& ctx) const -> decltype(ctx.out()) {
|
||||
return std::visit(overloaded_functor {
|
||||
[&ctx] (const data_dictionary::storage_options::local& so) -> decltype(ctx.out()) {
|
||||
return fmt::format_to(ctx.out(), "{}", so.dir);
|
||||
},
|
||||
[&ctx] (const data_dictionary::storage_options::s3& so) -> decltype(ctx.out()) {
|
||||
return fmt::format_to(ctx.out(), "s3://{}/{}", so.bucket, so.prefix);
|
||||
}
|
||||
}, so.value);
|
||||
}
|
||||
|
||||
@@ -56,3 +56,9 @@ inline storage_options make_local_options(std::filesystem::path dir) {
|
||||
}
|
||||
|
||||
} // namespace data_dictionary
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<data_dictionary::storage_options> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const data_dictionary::storage_options&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
@@ -1233,7 +1233,6 @@ keyspace::make_column_family_config(const schema& s, const database& db) const {
|
||||
for (auto& extra : db_config.data_file_directories()) {
|
||||
cfg.all_datadirs.push_back(format("{}/{}/{}", extra, s.ks_name(), format_table_directory_name(s.cf_name(), s.id())));
|
||||
}
|
||||
cfg.datadir = cfg.all_datadirs[0];
|
||||
cfg.enable_disk_reads = _config.enable_disk_reads;
|
||||
cfg.enable_disk_writes = _config.enable_disk_writes;
|
||||
cfg.enable_commitlog = _config.enable_commitlog;
|
||||
|
||||
@@ -394,7 +394,6 @@ class table : public enable_lw_shared_from_this<table>
|
||||
public:
|
||||
struct config {
|
||||
std::vector<sstring> all_datadirs;
|
||||
sstring datadir;
|
||||
bool enable_disk_writes = true;
|
||||
bool enable_disk_reads = true;
|
||||
bool enable_cache = true;
|
||||
@@ -686,10 +685,6 @@ private:
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
std::function<void(size_t)> reserve_fn) const;
|
||||
public:
|
||||
sstring dir() const {
|
||||
return _config.datadir;
|
||||
}
|
||||
|
||||
const storage_options& get_storage_options() const noexcept { return *_storage_opts; }
|
||||
lw_shared_ptr<const storage_options> get_storage_options_ptr() const noexcept { return _storage_opts; }
|
||||
future<> init_storage();
|
||||
|
||||
@@ -367,7 +367,7 @@ sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_s
|
||||
}
|
||||
|
||||
future<> table_populator::populate_subdir(sstables::sstable_state state, allow_offstrategy_compaction do_allow_offstrategy_compaction) {
|
||||
dblog.debug("Populating {}/{}/{} state={} allow_offstrategy_compaction={}", _ks, _cf, _global_table->dir(), state, do_allow_offstrategy_compaction);
|
||||
dblog.debug("Populating {}/{}/{} state={} allow_offstrategy_compaction={}", _ks, _cf, _global_table->get_storage_options(), state, do_allow_offstrategy_compaction);
|
||||
|
||||
if (!_sstable_directories.contains(state)) {
|
||||
co_return;
|
||||
|
||||
@@ -2601,7 +2601,15 @@ future<> table::write_schema_as_cql(database& db, sstring dir) const {
|
||||
|
||||
// Runs the orchestration code on an arbitrary shard to balance the load.
|
||||
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
|
||||
auto jsondir = table_shards->_config.datadir + "/snapshots/" + name;
|
||||
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
|
||||
if (so == nullptr) {
|
||||
throw std::runtime_error("Snapshotting non-local tables is not implemented");
|
||||
}
|
||||
if (so->dir.empty()) { // virtual tables don't have initialized local storage
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto jsondir = (so->dir / sstables::snapshots_dir / name).native();
|
||||
auto orchestrator = std::hash<sstring>()(jsondir) % smp::count;
|
||||
|
||||
co_await smp::submit_to(orchestrator, [&] () -> future<> {
|
||||
@@ -2661,7 +2669,12 @@ future<> table::finalize_snapshot(database& db, sstring jsondir, std::vector<sna
|
||||
}
|
||||
|
||||
future<bool> table::snapshot_exists(sstring tag) {
|
||||
sstring jsondir = _config.datadir + "/snapshots/" + tag;
|
||||
auto* so = std::get_if<storage_options::local>(&_storage_opts->value);
|
||||
if (so == nullptr || so->dir.empty()) {
|
||||
co_return false; // Technically it doesn't as snapshots only work for local storage
|
||||
}
|
||||
|
||||
sstring jsondir = (so->dir / sstables::snapshots_dir / tag).native();
|
||||
bool exists = false;
|
||||
try {
|
||||
auto sd = co_await io_check(file_stat, jsondir, follow_symlink::no);
|
||||
|
||||
@@ -645,7 +645,8 @@ future<> s3_storage::remove_by_registry_entry(entry_descriptor desc) {
|
||||
}
|
||||
|
||||
future<> s3_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
|
||||
co_await coroutine::return_exception(std::runtime_error("Snapshotting S3 objects not implemented"));
|
||||
on_internal_error(sstlog, "Snapshotting S3 objects not implemented");
|
||||
co_return;
|
||||
}
|
||||
|
||||
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstable_state state) {
|
||||
|
||||
@@ -576,6 +576,12 @@ future<> take_snapshot(cql_test_env& e, sstring ks_name, sstring cf_name, sstrin
|
||||
return take_snapshot(e.db(), false /* skip_flush */, std::move(ks_name), std::move(cf_name), std::move(snapshot_name));
|
||||
}
|
||||
|
||||
// Helper to get directory a table keeps its data in.
|
||||
// Only suitable for tests, that work with local storage type.
|
||||
fs::path table_dir(const replica::column_family& cf) {
|
||||
return std::get<data_dictionary::storage_options::local>(cf.get_storage_options().value).dir;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(snapshot_works) {
|
||||
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
|
||||
take_snapshot(e).get();
|
||||
@@ -585,7 +591,7 @@ SEASTAR_TEST_CASE(snapshot_works) {
|
||||
};
|
||||
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
lister::scan_dir(fs::path(cf.dir()), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
expected.insert(de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -593,7 +599,7 @@ SEASTAR_TEST_CASE(snapshot_works) {
|
||||
BOOST_REQUIRE_GT(expected.size(), 1);
|
||||
|
||||
// all files were copied and manifest was generated
|
||||
lister::scan_dir((fs::path(cf.dir()) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir((table_dir(cf) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
expected.erase(de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -612,7 +618,7 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
|
||||
};
|
||||
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
lister::scan_dir(fs::path(cf.dir()), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
expected.insert(de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -621,7 +627,7 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
|
||||
BOOST_REQUIRE_EQUAL(expected.size(), 1);
|
||||
|
||||
// all files were copied and manifest was generated
|
||||
lister::scan_dir((fs::path(cf.dir()) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir((table_dir(cf) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
expected.erase(de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -643,7 +649,7 @@ SEASTAR_TEST_CASE(snapshot_list_okay) {
|
||||
BOOST_REQUIRE_EQUAL(sd.live, 0);
|
||||
BOOST_REQUIRE_GT(sd.total, 0);
|
||||
|
||||
lister::scan_dir(fs::path(cf.dir()), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
|
||||
fs::remove(parent_dir / de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -714,7 +720,7 @@ SEASTAR_TEST_CASE(clear_snapshot) {
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
|
||||
unsigned count = 0;
|
||||
lister::scan_dir((fs::path(cf.dir()) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&count] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir((table_dir(cf) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&count] (fs::path parent_dir, directory_entry de) {
|
||||
count++;
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -723,7 +729,7 @@ SEASTAR_TEST_CASE(clear_snapshot) {
|
||||
e.local_db().clear_snapshot("test", {"ks"}, "").get();
|
||||
count = 0;
|
||||
|
||||
BOOST_REQUIRE_EQUAL(fs::exists(fs::path(cf.dir()) / sstables::snapshots_dir / "test"), false);
|
||||
BOOST_REQUIRE_EQUAL(fs::exists(table_dir(cf) / sstables::snapshots_dir / "test"), false);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
@@ -739,8 +745,8 @@ SEASTAR_TEST_CASE(clear_multiple_snapshots) {
|
||||
|
||||
co_await do_with_some_data({table_name}, [&] (cql_test_env& e) {
|
||||
auto& t = e.local_db().find_column_family(ks_name, table_name);
|
||||
auto table_dir = fs::path(t.dir());
|
||||
auto snapshots_dir = table_dir / sstables::snapshots_dir;
|
||||
auto tdir = table_dir(t);
|
||||
auto snapshots_dir = tdir / sstables::snapshots_dir;
|
||||
|
||||
for (auto i = 0; i < num_snapshots; i++) {
|
||||
testlog.debug("Taking snapshot {} on {}.{}", snapshot_name(i), ks_name, table_name);
|
||||
@@ -791,11 +797,11 @@ SEASTAR_TEST_CASE(clear_multiple_snapshots) {
|
||||
testlog.debug("Clearing all snapshots in {}.{} after it had been dropped", ks_name, table_name);
|
||||
e.local_db().clear_snapshot("", {ks_name}, table_name).get();
|
||||
|
||||
SCYLLA_ASSERT(!fs::exists(table_dir));
|
||||
SCYLLA_ASSERT(!fs::exists(tdir));
|
||||
|
||||
// after all snapshots had been cleared,
|
||||
// the dropped table directory is expected to be removed.
|
||||
BOOST_REQUIRE_EQUAL(fs::exists(table_dir), false);
|
||||
BOOST_REQUIRE_EQUAL(fs::exists(tdir), false);
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -836,7 +842,7 @@ SEASTAR_TEST_CASE(test_snapshot_ctl_details) {
|
||||
BOOST_REQUIRE_EQUAL(sc_sd.details.live, sd.live);
|
||||
BOOST_REQUIRE_EQUAL(sc_sd.details.total, sd.total);
|
||||
|
||||
lister::scan_dir(fs::path(cf.dir()), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
|
||||
fs::remove(parent_dir / de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -878,7 +884,7 @@ SEASTAR_TEST_CASE(test_snapshot_ctl_true_snapshots_size) {
|
||||
auto sc_live_size = sc.local().true_snapshots_size().get();
|
||||
BOOST_REQUIRE_EQUAL(sc_live_size, sd.live);
|
||||
|
||||
lister::scan_dir(fs::path(cf.dir()), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
|
||||
lister::scan_dir(table_dir(cf), lister::dir_entry_types::of<directory_entry_type::regular>(), [] (fs::path parent_dir, directory_entry de) {
|
||||
fs::remove(parent_dir / de.name);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
@@ -1334,7 +1340,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
|
||||
auto& cf = db.local().find_column_family("ks", "cf");
|
||||
|
||||
// all files were copied and manifest was generated
|
||||
co_await lister::scan_dir((fs::path(cf.dir()) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
co_await lister::scan_dir((table_dir(cf) / sstables::snapshots_dir / "test"), lister::dir_entry_types::of<directory_entry_type::regular>(), [&expected] (fs::path parent_dir, directory_entry de) {
|
||||
testlog.debug("Found in snapshots: {}", de.name);
|
||||
expected.erase(de.name);
|
||||
return make_ready_future<>();
|
||||
@@ -1388,7 +1394,7 @@ static future<> test_drop_table_with_auto_snapshot(bool auto_snapshot) {
|
||||
db_cfg_ptr->auto_snapshot(auto_snapshot);
|
||||
|
||||
co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
|
||||
auto cf_dir = e.local_db().find_column_family(ks_name, table_name).dir();
|
||||
auto cf_dir = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();
|
||||
|
||||
// Pass `with_snapshot=true` to drop_table_on_all
|
||||
// to allow auto_snapshot (based on the configuration above).
|
||||
@@ -1413,7 +1419,7 @@ SEASTAR_TEST_CASE(drop_table_with_no_snapshot) {
|
||||
sstring table_name = "table_with_no_snapshot";
|
||||
|
||||
co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
|
||||
auto cf_dir = e.local_db().find_column_family(ks_name, table_name).dir();
|
||||
auto cf_dir = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();
|
||||
|
||||
// Pass `with_snapshot=false` to drop_table_on_all
|
||||
// to disallow auto_snapshot.
|
||||
@@ -1432,7 +1438,7 @@ SEASTAR_TEST_CASE(drop_table_with_explicit_snapshot) {
|
||||
co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
|
||||
auto snapshot_tag = format("test-{}", db_clock::now().time_since_epoch().count());
|
||||
co_await replica::database::snapshot_table_on_all_shards(e.db(), ks_name, table_name, snapshot_tag, db::snapshot_ctl::snap_views::no, false);
|
||||
auto cf_dir = e.local_db().find_column_family(ks_name, table_name).dir();
|
||||
auto cf_dir = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();
|
||||
|
||||
// With explicit snapshot and with_snapshot=false
|
||||
// dir should still be kept, regardless of the
|
||||
|
||||
@@ -103,12 +103,11 @@ with_column_family(schema_ptr s, replica::column_family::config cfg, sstables::s
|
||||
for (auto x_log2_compaction_groups : x_log2_compaction_group_values) {
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto dir = tmpdir();
|
||||
cfg.datadir = dir.path().string();
|
||||
cfg.x_log2_compaction_groups = x_log2_compaction_groups;
|
||||
tasks::task_manager tm;
|
||||
auto cm = make_lw_shared<compaction_manager>(tm, compaction_manager::for_testing_tag{});
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto s_opts = make_lw_shared<replica::storage_options>(data_dictionary::make_local_options(fs::path(cfg.datadir)));
|
||||
auto s_opts = make_lw_shared<replica::storage_options>(data_dictionary::make_local_options(dir.path()));
|
||||
auto cf = make_lw_shared<replica::column_family>(s, cfg, s_opts, *cm, sm, *cl_stats, *tracker, nullptr);
|
||||
cf->mark_ready_for_writes(nullptr);
|
||||
co_await func(*cf);
|
||||
|
||||
@@ -137,12 +137,6 @@ schema_ptr table_for_tests::make_default_schema() {
|
||||
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage)
|
||||
: _data(make_lw_shared<data>())
|
||||
{
|
||||
// FIXME -- the storage options here are from sstables::test_env that should have
|
||||
// initialized it properly
|
||||
std::visit(overloaded_functor {
|
||||
[&cfg] (data_dictionary::storage_options::local& o) { o.dir = cfg.datadir; },
|
||||
[&cfg] (data_dictionary::storage_options::s3& o) { o.prefix = cfg.datadir; },
|
||||
}, storage.value);
|
||||
cfg.cf_stats = &_data->cf_stats;
|
||||
_data->s = s ? s : make_default_schema();
|
||||
_data->cf = make_lw_shared<replica::column_family>(_data->s, std::move(cfg), make_lw_shared<replica::storage_options>(storage), cm, sstables_manager, _data->cl_stats, sstables_manager.get_cache_tracker(), nullptr);
|
||||
@@ -485,18 +479,18 @@ table_for_tests
|
||||
test_env::make_table_for_tests(schema_ptr s, sstring dir) {
|
||||
maybe_start_compaction_manager();
|
||||
auto cfg = make_table_config();
|
||||
cfg.datadir = dir;
|
||||
cfg.enable_commitlog = false;
|
||||
return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), _impl->storage);
|
||||
auto storage = _impl->storage;
|
||||
std::visit(overloaded_functor {
|
||||
[&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; },
|
||||
[&dir] (data_dictionary::storage_options::s3& o) { o.prefix = dir; },
|
||||
}, storage.value);
|
||||
return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), std::move(storage));
|
||||
}
|
||||
|
||||
table_for_tests
|
||||
test_env::make_table_for_tests(schema_ptr s) {
|
||||
maybe_start_compaction_manager();
|
||||
auto cfg = make_table_config();
|
||||
cfg.datadir = _impl->dir.path().native();
|
||||
cfg.enable_commitlog = false;
|
||||
return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), _impl->storage);
|
||||
return make_table_for_tests(std::move(s), _impl->dir.path().native());
|
||||
}
|
||||
|
||||
void test_env::request_abort() {
|
||||
|
||||
Reference in New Issue
Block a user