diff --git a/distributed_loader.hh b/distributed_loader.hh index 82871ef724..e6a8ae176d 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -58,32 +58,33 @@ class storage_service; } class distributed_loader { -public: + friend class distributed_loader_for_tests; + static future<> reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator); static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator); static future<> process_sstable_dir(sharded& dir, bool sort_sstables_according_to_owner = true); static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name); - - static future<> verify_owner_and_mode(std::filesystem::path path); - static future make_sstables_available(sstables::sstable_directory& dir, sharded& db, sharded& view_update_generator, std::filesystem::path datadir, sstring ks, sstring cf); - static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, - distributed& view_update_generator, sstring ks_name, sstring cf_name); + static future<> populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); + static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); + static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir); + static future<> handle_sstables_pending_delete(sstring pending_deletes_dir); + +public: + static future<> init_system_keyspace(distributed& db, distributed& ss); + static future<> init_non_system_keyspaces(distributed& db, distributed& proxy); + static future<> ensure_system_table_directories(distributed& db); + + static future<> verify_owner_and_mode(std::filesystem::path path); // Scan sstables under upload directory. Return a vector with smp::count entries. // Each entry with index of idx should be accessed on shard idx only. // Each entry contains a vector of sstables for this shard. // The table UUID is returned too. static future>>> get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf); - static future<> populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); - static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); - static future<> init_system_keyspace(distributed& db, distributed& ss); - static future<> ensure_system_table_directories(distributed& db); - static future<> init_non_system_keyspaces(distributed& db, distributed& proxy); -private: - static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir); - static future<> handle_sstables_pending_delete(sstring pending_deletes_dir); + static future<> process_upload_dir(distributed& db, distributed& sys_dist_ks, + distributed& view_update_generator, sstring ks_name, sstring cf_name); }; diff --git a/test/boost/sstable_directory_test.cc b/test/boost/sstable_directory_test.cc index c8e0b8d5c1..4824c80f66 100644 --- a/test/boost/sstable_directory_test.cc +++ b/test/boost/sstable_directory_test.cc @@ -32,6 +32,19 @@ #include "fmt/format.h" +class distributed_loader_for_tests { +public: + static future<> process_sstable_dir(sharded& dir, bool sort_sstables_according_to_owner = true) { + return distributed_loader::process_sstable_dir(dir, sort_sstables_according_to_owner); + } + static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name) { + return distributed_loader::lock_table(dir, db, std::move(ks_name), std::move(cf_name)); + } + static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator) { + return distributed_loader::reshard(dir, db, std::move(ks_name), std::move(table_name), std::move(creator)); + } +}; + schema_ptr test_table_schema() { static thread_local auto s = [] { schema_builder builder(make_shared_schema( @@ -171,7 +184,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir) { - distributed_loader::process_sstable_dir(sstdir).get(); + distributed_loader_for_tests::process_sstable_dir(sstdir).get(); int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); // No generation found on empty directory. BOOST_REQUIRE_EQUAL(max_generation_seen, 0); @@ -196,7 +209,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir) { - auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir); + auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir); BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); }); }); @@ -216,7 +229,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_temporary_toc) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir) { - auto expect_ok = distributed_loader::process_sstable_dir(sstdir); + auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir); BOOST_REQUIRE_NO_THROW(expect_ok.get()); }); }); @@ -236,7 +249,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir) { - auto expect_ok = distributed_loader::process_sstable_dir(sstdir); + auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir); BOOST_REQUIRE_NO_THROW(expect_ok.get()); }); }); @@ -257,7 +270,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir_fatal) { - auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal); + auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir_fatal); BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); }); @@ -268,7 +281,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir_ok) { - auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok); + auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir_ok); BOOST_REQUIRE_NO_THROW(expect_ok.get()); }); }); @@ -294,7 +307,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [&dir, &tempstat] (sharded& sstdir_ok) { - auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok); + auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir_ok); BOOST_REQUIRE_NO_THROW(expect_ok.get()); lister::scan_dir(dir.path(), { directory_entry_type::regular }, [tempstat] (fs::path parent_dir, directory_entry de) { BOOST_REQUIRE(fs::canonical(parent_dir / fs::path(de.name)) != tempstat); @@ -311,7 +324,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir_fatal) { - auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal); + auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir_fatal); BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); }); }).get(); @@ -332,7 +345,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) { sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir) { - distributed_loader::process_sstable_dir(sstdir).get(); + distributed_loader_for_tests::process_sstable_dir(sstdir).get(); int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); BOOST_REQUIRE_EQUAL(max_generation_seen, 3333); }); @@ -378,7 +391,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_gene sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir) { - distributed_loader::process_sstable_dir(sstdir).get(); + distributed_loader_for_tests::process_sstable_dir(sstdir).get(); verify_that_all_sstables_are_local(sstdir, smp::count).get(); }); }).get(); @@ -406,7 +419,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_ge sstable_directory::allow_loading_materialized_view::no, sstable_from_existing_file(env), [] (sharded& sstdir) { - distributed_loader::process_sstable_dir(sstdir).get(); + distributed_loader_for_tests::process_sstable_dir(sstdir).get(); verify_that_all_sstables_are_local(sstdir, smp::count).get(); }); }).get(); @@ -440,7 +453,7 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) { sstdir.stop().get(); }); - distributed_loader::lock_table(sstdir, e.db(), ks_name, cf_name).get(); + distributed_loader_for_tests::lock_table(sstdir, e.db(), ks_name, cf_name).get(); auto drop = e.execute_cql("drop table cf"); later().get(); @@ -496,14 +509,14 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) { return cf.make_sstable(dir.native(), gen, v, f); }, [&e, upload_path] (sharded& sstdir) { - distributed_loader::process_sstable_dir(sstdir).get(); + distributed_loader_for_tests::process_sstable_dir(sstdir).get(); verify_that_all_sstables_are_local(sstdir, 0).get(); int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); std::atomic generation_for_test = {}; generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed); - distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { + distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed); auto& cf = e.local_db().find_column_family("ks", "cf"); return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big); @@ -545,14 +558,14 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve return cf.make_sstable(dir.native(), gen, v, f); }, [&e, upload_path] (sharded& sstdir) { - distributed_loader::process_sstable_dir(sstdir).get(); + distributed_loader_for_tests::process_sstable_dir(sstdir).get(); verify_that_all_sstables_are_local(sstdir, 0).get(); int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); std::atomic generation_for_test = {}; generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed); - distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { + distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed); auto& cf = e.local_db().find_column_family("ks", "cf"); return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big); @@ -594,14 +607,14 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol return cf.make_sstable(dir.native(), gen, v, f); }, [&, upload_path] (sharded& sstdir) { - distributed_loader::process_sstable_dir(sstdir).get(); + distributed_loader_for_tests::process_sstable_dir(sstdir).get(); verify_that_all_sstables_are_local(sstdir, 0).get(); int64_t max_generation_seen = highest_generation_seen(sstdir).get0(); std::atomic generation_for_test = {}; generation_for_test.store(max_generation_seen + 1, std::memory_order_relaxed); - distributed_loader::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { + distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", [&e, upload_path, &generation_for_test] (shard_id id) { auto generation = generation_for_test.fetch_add(1, std::memory_order_relaxed); auto& cf = e.local_db().find_column_family("ks", "cf"); return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big);