/*
 * Copyright (C) 2020-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

// NOTE(review): in this copy of the file every angle-bracket span appears to
// have been stripped: the system/library header names of the bare #include
// lines below, and template arguments throughout (e.g. `sharded& dir`).
// Restore them from version control before building.
#include
#include
#include
#include
#include "sstables/generation_type.hh"
#undef SEASTAR_TESTING_MAIN
#include
#include
#include "sstables/shared_sstable.hh"
#include "sstables/sstable_directory.hh"
#include "replica/distributed_loader.hh"
#include "replica/global_table_ptr.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/key_utils.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/cql_assertions.hh"
#include "utils/lister.hh"
#include "db/config.hh"
#include
#include
#include

/// Thin test-side shim over replica::distributed_loader.
/// Each static member forwards to the distributed_loader function of the same
/// name, so the tests below can drive the loader's steps one at a time.
class distributed_loader_for_tests {
public:
    /// Forwards \p dir and \p flags to replica::distributed_loader::process_sstable_dir().
    static future<> process_sstable_dir(sharded& dir, sstable_directory::process_flags flags) {
        return replica::distributed_loader::process_sstable_dir(dir, flags);
    }
    /// Resolves \p ks_name.\p cf_name on all shards (replica::get_table_on_all_shards)
    /// and hands it, together with \p dir, to replica::distributed_loader::lock_table().
    static future<> lock_table(sharded& dir, sharded& db, sstring ks_name, sstring cf_name) {
        auto gtable = co_await replica::get_table_on_all_shards(db, ks_name, cf_name);
        co_await replica::distributed_loader::lock_table(gtable, dir);
    }
    /// Forwards to replica::distributed_loader::reshard(). \p creator builds the
    /// output sstables; \p owned_ranges_ptr defaults to nullptr (no owned-ranges filter
    /// is passed).
    static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr) {
        return replica::distributed_loader::reshard(dir, db, std::move(ks_name), std::move(table_name), std::move(creator), std::move(owned_ranges_ptr));
    }
};

/// Schema "ks"."cf" used by the test_env-based tests: partition key "p" of
/// bytes_type and a column "c" of int32_type, built without compact storage.
/// The schema is constructed once per thread (static thread_local) and the
/// same pointer is returned on every later call.
schema_ptr test_table_schema() {
    static thread_local auto s = [] {
        schema_builder builder("ks", "cf", generate_legacy_id("ks", "cf"), bytes_type);
        builder.with_column("p", bytes_type, column_kind::partition_key);
        builder.with_column("c", int32_type);
        return builder.build(schema_builder::compact_storage::no);
    }();
    return s;
}

using namespace sstables;

// Must be called from a seastar thread.
/// Write an sstable holding a single partition of test_table_schema().
///
/// \p sst_factory creates the (still empty) sstable object that is written,
/// e.g. a bound new_sstable() or new_env_sstable(). The partition key comes
/// from tests::generate_partition_key(); given this helper's name it is
/// presumably a key owned by the calling shard (not verified here). The only
/// cell written sets column "c" to 0 at timestamp 0.
sstables::shared_sstable make_sstable_for_this_shard(std::function<sstables::shared_sstable()> sst_factory) {
    auto s = test_table_schema();
    auto key = tests::generate_partition_key(s);
    mutation m(s, key);
    m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0));
    return make_sstable_containing(std::move(sst_factory), {m});
}

/// Write one sstable of \p table containing one partition per shard, then strip
/// its Scylla component so that it looks like a Cassandra-written sstable.
///
/// The sstable object is created with
/// table.get_sstables_manager().make_sstable(schema, storage options, \p generation, \p state).
/// Rows set column "c", so \p table must have such an int column. The tests
/// use "create table cf (p text PRIMARY KEY, c int)".
///
/// Must be called from a seastar thread: it blocks on .get(), and
/// rewrite_toc_without_component() requires a thread.
sstables::shared_sstable make_sstable_for_all_shards(replica::table& table, sstables::sstable_state state, sstables::generation_type generation) {
    auto s = table.schema();
    auto mt = make_lw_shared<replica::memtable>(s);
    // One partition per shard, keyed via generate_partition_key(s, shard).
    // Presumably each key is owned by that shard, so the sstable spans all shards.
    for (shard_id shard = 0; shard < smp::count; ++shard) {
        auto key = tests::generate_partition_key(s, shard);
        mutation m(s, key);
        m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0));
        mt->apply(std::move(m));
    }
    auto sst = table.get_sstables_manager().make_sstable(s, table.get_storage_options(), generation, state);
    write_memtable_to_sstable(*mt, sst).get();
    mt->clear_gently().get();
    // We can't write an SSTable with bad sharding, so pretend
    // it came from Cassandra: drop the Scylla component and rewrite the TOC without it.
    testlog.debug("make_sstable_for_all_shards: {}: rewriting TOC", sst->get_filename());
    sstables::test(sst).remove_component(sstables::component_type::Scylla).get();
    sstables::test(sst).rewrite_toc_without_component(sstables::component_type::Scylla);
    return sst;
}

/// Create an sstable object for test_table_schema() in \p env's temporary
/// directory with the explicit generation \p gen.
sstables::shared_sstable new_sstable(sstables::test_env& env, sstables::generation_type gen) {
    testlog.debug("new_sstable: dir={} gen={}", env.tempdir().path(), gen);
    return env.make_sstable(test_table_schema(), gen);
}
sstables::shared_sstable new_env_sstable(sstables::test_env& env) { testlog.debug("new_env_sstable: dir={}", env.tempdir().path()); return env.make_sstable(test_table_schema()); } class wrapped_test_env { std::function _get_mgr; std::optional tmpdir_opt; fs::path _tmpdir_path; public: wrapped_test_env(sstables::test_env& env) : _get_mgr([m = &env.manager()] { return m; }) , _tmpdir_path(env.tempdir().path()) {} // This variant this transportable across shards wrapped_test_env(sharded& env) : _get_mgr([s = &env] { return &s->local().manager(); }) , _tmpdir_path(env.local().tempdir().path()) {} sstables_manager& get_manager() { return *_get_mgr(); } const fs::path& tmpdir_path() noexcept { return _tmpdir_path; } }; // Called from a seastar thread static void with_sstable_directory( fs::path path, sstables::sstable_state state, wrapped_test_env env_wrap, noncopyable_function&)> func) { testlog.debug("with_sstable_directory: {}/{}", path, state); sharded sstdir; auto stop_sstdir = defer([&sstdir] { // The func is allowed to stop sstdir, and some tests actually do it if (sstdir.local_is_initialized()) { sstdir.stop().get(); } }); sstdir.start(seastar::sharded_parameter([&env_wrap] { return std::ref(env_wrap.get_manager()); }), seastar::sharded_parameter([] { return test_table_schema(); }), seastar::sharded_parameter([] { return std::ref(test_table_schema()->get_sharder()); }), path.native(), state, default_io_error_handler_gen()).get(); func(sstdir); } static void with_sstable_directory( wrapped_test_env env_wrap, noncopyable_function&)> func) { with_sstable_directory(env_wrap.tmpdir_path(), sstables::sstable_state::normal, std::move(env_wrap), std::move(func)); } static void with_sstable_directory(sharded& db, sstring ks, sstring cf, sstables::sstable_state state, noncopyable_function&)> func) { sharded sstdir; auto gtable = replica::get_table_on_all_shards(db, ks, cf).get(); sstdir.start(gtable.as_sharded_parameter(), state, default_io_error_handler_gen()).get(); auto 
stop_sstdir = defer([&sstdir] { sstdir.stop().get(); }); func(sstdir); } BOOST_AUTO_TEST_SUITE(sstable_directory_test) SEASTAR_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) { return sstables::test_env::do_with_async([] (test_env& env) { auto& dir = env.tempdir(); // Write a manifest file to make sure it's ignored auto manifest = dir.path() / "manifest.json"; tests::touch_file(manifest.native()).get(); with_sstable_directory(env, [] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, {}).get(); sstdir.invoke_on_all([] (sstables::sstable_directory& d) { BOOST_REQUIRE(d.empty()); }).get(); }); }); } // Test unrecoverable SSTable: missing a file that is expected in the TOC. SEASTAR_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables) { return sstables::test_env::do_with_async([] (test_env& env) { auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), generation_type(this_shard_id()))); // Now there is one sstable to the upload directory, but it is incomplete and one component is missing. 
// We should fail validation and leave the directory untouched remove_file(test(sst).filename(sstables::component_type::Statistics).native()).get(); with_sstable_directory(env, [] (sharded& sstdir) { auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir, {}); BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); }); }); } // Test scanning a directory with unrecognized file // reproducing https://github.com/scylladb/scylla/issues/10697 SEASTAR_TEST_CASE(sstable_directory_test_table_scan_invalid_file) { return sstables::test_env::do_with_async([] (test_env& env) { auto& dir = env.tempdir(); auto sst = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env))); // Add a bogus file in the sstables directory auto name = dir.path() / "bogus"; tests::touch_file(name.native()).get(); with_sstable_directory(env, [] (sharded& sstdir) { auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir, {}); BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); }); }); } // Test always-benign incomplete SSTable: temporaryTOC found SEASTAR_TEST_CASE(sstable_directory_test_table_temporary_toc) { return sstables::test_env::do_with_async([] (test_env& env) { auto sst = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env))); rename_file(test(sst).filename(sstables::component_type::TOC).native(), test(sst).filename(sstables::component_type::TemporaryTOC).native()).get(); with_sstable_directory(env, [] (sharded& sstdir) { auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }); BOOST_REQUIRE_NO_THROW(expect_ok.get()); }); }); } // Test always-benign incomplete SSTable: with extraneous temporaryTOC found SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) { return sstables::test_env::do_with_async([] (test_env& env) { auto sst = 
make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env))); link_file(test(sst).filename(sstables::component_type::TOC).native(), test(sst).filename(sstables::component_type::TemporaryTOC).native()).get(); with_sstable_directory(env, [] (sharded& sstdir) { auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }); BOOST_REQUIRE_NO_THROW(expect_ok.get()); }); }); } // Test the absence of TOC. Behavior is controllable by a flag SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) { return sstables::test_env::do_with_async([] (test_env& env) { auto sst = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env))); remove_file(test(sst).filename(sstables::component_type::TOC).native()).get(); with_sstable_directory(env, [] (sharded& sstdir_fatal) { auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir_fatal, { .throw_on_missing_toc = true }); BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); }); with_sstable_directory(env, [] (sharded& sstdir_ok) { auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir_ok, {}); BOOST_REQUIRE_NO_THROW(expect_ok.get()); }); }); } // Test the presence of TemporaryStatistics. If the old Statistics file is around // this is benign and we'll just delete it and move on. If the old Statistics file // is not around (but mentioned in the TOC), then this is an error. 
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) { sstables::test_env::do_with_sharded_async([] (sharded& env) { auto sst = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local()))); auto tempstr = test(sst).filename(component_type::TemporaryStatistics); tests::touch_file(tempstr.native()).get(); auto statstr = test(sst).filename(component_type::Statistics); with_sstable_directory(env, [&] (sharded& sstdir_ok) { auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir_ok, {}); BOOST_REQUIRE_NO_THROW(expect_ok.get()); BOOST_REQUIRE(!file_exists(tempstr.native()).get()); BOOST_REQUIRE(file_exists(statstr.native()).get()); // sanity check that we didn't miss the directory itself }); remove_file(statstr.native()).get(); with_sstable_directory(env, [] (sharded& sstdir_fatal) { auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir_fatal, {}); BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception); }); }).get(); } // Test that we see the right generation during the scan. 
Temporary files are skipped SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) { sstables::test_env::do_with_sharded_async([] (sharded& env) { auto sst1 = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local()))); auto sst2 = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local()))); rename_file(test(sst2).filename(sstables::component_type::TOC).native(), test(sst2).filename(sstables::component_type::TemporaryTOC).native()).get(); std::vector gen1_seen; gen1_seen.resize(smp::count); with_sstable_directory(env, [&] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); sstdir.invoke_on_all([&] (sstables::sstable_directory& sstdir) { return seastar::async([&] { sstdir.do_for_each_sstable([&] (const shared_sstable& sst) { THREADSAFE_BOOST_REQUIRE_EQUAL(sst->generation(), sst1->generation()); THREADSAFE_BOOST_REQUIRE(!gen1_seen[this_shard_id()]); gen1_seen[this_shard_id()] = true; return make_ready_future<>(); }).get(); }); }).get(); }); BOOST_REQUIRE_EQUAL(std::count(gen1_seen.begin(), gen1_seen.end(), true), 1); }).get(); } future<> verify_that_all_sstables_are_local(sharded& sstdir, unsigned expected_sstables) { unsigned count = co_await sstdir.map_reduce0([] (sstable_directory& d) -> future { unsigned ret = 0; co_await d.do_for_each_sstable([&ret] (sstables::shared_sstable sst) { ret++; auto shards = sst->get_shards_for_this_sstable(); THREADSAFE_BOOST_REQUIRE_EQUAL(shards.size(), 1); THREADSAFE_BOOST_REQUIRE_EQUAL(shards[0], this_shard_id()); return make_ready_future<>(); }); co_return ret; }, 0, std::plus()); THREADSAFE_BOOST_REQUIRE_EQUAL(count, expected_sstables); } // Test that all SSTables are seen as unshared, if the generation numbers match what their // shard-assignments expect SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_generations) { sstables::test_env::do_with_sharded_async([] (sharded& env) { sharded 
sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); for (shard_id i = 0; i < smp::count; ++i) { env.invoke_on(i, [&sharded_gen] (sstables::test_env& env) { auto generation = std::invoke(sharded_gen.local()); // this is why it is annoying for the internal functions in the test infrastructure to // assume threaded execution return seastar::async([generation, &env] { make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), generation)); }); }).get(); } with_sstable_directory(env, [] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); verify_that_all_sstables_are_local(sstdir, smp::count).get(); }); }).get(); } // Test that all SSTables are seen as unshared, even if the generation numbers do not match what their // shard-assignments expect SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_generations) { sstables::test_env::do_with_sharded_async([] (sharded& env) { sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); for (shard_id i = 0; i < smp::count; ++i) { env.invoke_on(i, [&sharded_gen] (sstables::test_env& env) -> future<> { // intentionally generate the generation on a different shard auto generation = co_await sharded_gen.invoke_on((this_shard_id() + 1) % smp::count, [] (auto& gen) { return gen(); }); // this is why it is annoying for the internal functions in the test infrastructure to // assume threaded execution co_return co_await seastar::async([generation, &env] { make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), generation)); }); }).get(); } with_sstable_directory(env, [] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); verify_that_all_sstables_are_local(sstdir, smp::count).get(); }); }).get(); } SEASTAR_THREAD_TEST_CASE(sstable_directory_foreign_sstable_should_not_load_locally) { if 
(smp::count == 1) { fmt::print("Skipping sstable_directory_shared_sstables_reshard_correctly, smp == 1\n"); return; } sstables::test_env::do_with_sharded_async([] (sharded& env) { auto sstables_opened_for_reading = [&env] () { return env.map_reduce0([] (sstables::test_env& env) { return sstables_stats::get_shard_stats().open_for_reading; }, 0, [] (auto res, auto gen) {return res + gen;}).get(); }; for (shard_id i = 0; i < smp::count; ++i) { env.invoke_on(i, [] (sstables::test_env& env) -> future<> { co_return co_await seastar::async([&env] { make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), generation_type((this_shard_id() + 1) % smp::count))); }); }).get(); } auto sstables_open_before_process = sstables_opened_for_reading(); with_sstable_directory(env, [](sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); }); // verify that all the sstables were loaded only once BOOST_REQUIRE_EQUAL(sstables_opened_for_reading(), sstables_open_before_process + smp::count); }).get(); } // Test that the sstable_dir object can keep the table alive against a drop SEASTAR_TEST_CASE(sstable_directory_test_table_lock_works) { return do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); auto ks_name = "ks"; auto cf_name = "cf"; std::unordered_map> sstables; testlog.debug("Inserting into cf"); e.execute_cql("insert into cf (p, c) values ('one', 1)").get(); testlog.debug("Flushing cf"); e.db().invoke_on_all([&] (replica::database& db) { auto& cf = db.find_column_family(ks_name, cf_name); return cf.flush(); }).get(); with_sstable_directory(e.db(), "ks", "cf", sstables::sstable_state::normal, [&] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, {}).get(); // Collect all sstable file names sstdir.invoke_on_all([&] (sstable_directory& d) { return d.do_for_each_sstable([&] (sstables::shared_sstable sst) { 
sstables[this_shard_id()].push_back(test(sst).filename(sstables::component_type::Data).native()); return make_ready_future<>(); }); }).get(); BOOST_REQUIRE_NE(sstables.size(), 0); distributed_loader_for_tests::lock_table(sstdir, e.db(), ks_name, cf_name).get(); auto drop = e.execute_cql("drop table cf"); auto table_exists = [&] () { try { e.db().invoke_on_all([ks_name, cf_name] (replica::database& db) { db.find_column_family(ks_name, cf_name); }).get(); return true; } catch (replica::no_such_column_family&) { return false; } }; testlog.debug("Waiting until {}.{} is unlisted from the database", ks_name, cf_name); while (table_exists()) { yield().get(); } auto all_sstables_exist = [&] () { std::unordered_map res; for (const auto& [shard, files] : sstables) { for (const auto& f : files) { res[file_exists(f).get()]++; } } return res; }; auto res = all_sstables_exist(); BOOST_REQUIRE_EQUAL(res[false], 0); BOOST_REQUIRE_EQUAL(res[true], sstables.size()); // Stop manually now, to allow for the object to be destroyed and take the // phaser with it. 
sstdir.stop().get(); drop.get(); BOOST_REQUIRE(!table_exists()); res = all_sstables_exist(); BOOST_REQUIRE_EQUAL(res[false], sstables.size()); BOOST_REQUIRE_EQUAL(res[true], 0); }); }); } SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) { if (smp::count == 1) { fmt::print("Skipping sstable_directory_shared_sstables_reshard_correctly, smp == 1\n"); return make_ready_future<>(); } return do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); auto& cf = e.local_db().find_column_family("ks", "cf"); e.db().invoke_on_all([] (replica::database& db) { auto& cf = db.find_column_family("ks", "cf"); return cf.disable_auto_compaction(); }).get(); unsigned num_sstables = 10 * smp::count; sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); for (unsigned nr = 0; nr < num_sstables; ++nr) { auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) { return gen(); }).get(); make_sstable_for_all_shards(cf, sstables::sstable_state::upload, generation); } with_sstable_directory(e.db(), "ks", "cf", sstables::sstable_state::upload, [&] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); verify_that_all_sstables_are_local(sstdir, 0).get(); sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); auto make_sstable = [&e, &sharded_gen] (shard_id shard) { auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) { return gen(); }).get(); auto& cf = e.local_db().find_column_family("ks", "cf"); return cf.get_sstables_manager().make_sstable(cf.schema(), cf.get_storage_options(), generation, sstables::sstable_state::upload); }; distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable)).get(); verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get(); }); }); } // Regression test for #14618 - 
resharding with non-empty owned_ranges_ptr. SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly_with_owned_ranges) { if (smp::count == 1) { fmt::print("Skipping sstable_directory_shared_sstables_reshard_correctly, smp == 1\n"); return make_ready_future<>(); } return do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); auto& cf = e.local_db().find_column_family("ks", "cf"); e.db().invoke_on_all([] (replica::database& db) { auto& cf = db.find_column_family("ks", "cf"); return cf.disable_auto_compaction(); }).get(); unsigned num_sstables = 10 * smp::count; sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); for (unsigned nr = 0; nr < num_sstables; ++nr) { auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) { return gen(); }).get(); make_sstable_for_all_shards(cf, sstables::sstable_state::upload, generation); } with_sstable_directory(e.db(), "ks", "cf", sstables::sstable_state::upload, [&] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); verify_that_all_sstables_are_local(sstdir, 0).get(); sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); auto make_sstable = [&e, &sharded_gen] (shard_id shard) { auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) { return gen(); }).get(); auto& cf = e.local_db().find_column_family("ks", "cf"); return cf.get_sstables_manager().make_sstable(cf.schema(), cf.get_storage_options(), generation, sstables::sstable_state::upload); }; const auto& erm = e.db().local().find_keyspace("ks").get_static_effective_replication_map(); auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(e.db().local().get_keyspace_local_ranges(erm).get()); distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable), std::move(owned_ranges_ptr)).get(); 
verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get(); }); }); } SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_even_if_files_are_not_well_distributed) { if (smp::count == 1) { fmt::print("Skipping sstable_directory_shared_sstables_reshard_correctly, smp == 1\n"); return make_ready_future<>(); } return do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); auto& cf = e.local_db().find_column_family("ks", "cf"); e.db().invoke_on_all([] (replica::database& db) { auto& cf = db.find_column_family("ks", "cf"); return cf.disable_auto_compaction(); }).get(); unsigned num_sstables = 10 * smp::count; sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); for (unsigned nr = 0; nr < num_sstables; ++nr) { // always generate the generation on shard#0 auto generation = sharded_gen.invoke_on(0, [] (auto& gen) { return gen(); }).get(); make_sstable_for_all_shards(cf, sstables::sstable_state::upload, generation); } with_sstable_directory(e.db(), "ks", "cf", sstables::sstable_state::upload, [&e] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); verify_that_all_sstables_are_local(sstdir, 0).get(); sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); auto make_sstable = [&e, &sharded_gen] (shard_id shard) { auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) { return gen(); }).get(); auto& cf = e.local_db().find_column_family("ks", "cf"); return cf.get_sstables_manager().make_sstable(cf.schema(), cf.get_storage_options(), generation, sstables::sstable_state::upload); }; distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable)).get(); verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get(); }); }); } 
SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshold) { if (smp::count == 1) { fmt::print("Skipping sstable_directory_shared_sstables_reshard_correctly, smp == 1\n"); return make_ready_future<>(); } return do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); auto& cf = e.local_db().find_column_family("ks", "cf"); e.db().invoke_on_all([] (replica::database& db) { auto& cf = db.find_column_family("ks", "cf"); return cf.disable_auto_compaction(); }).get(); unsigned num_sstables = (cf.schema()->max_compaction_threshold() + 1) * smp::count; sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); for (unsigned nr = 0; nr < num_sstables; ++nr) { auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) { return gen(); }).get(); make_sstable_for_all_shards(cf, sstables::sstable_state::upload, generation); } with_sstable_directory(e.db(), "ks", "cf", sstables::sstable_state::upload, [&] (sharded& sstdir) { distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); verify_that_all_sstables_are_local(sstdir, 0).get(); sharded sharded_gen; sharded_gen.start().get(); auto stop_generator = deferred_stop(sharded_gen); auto make_sstable = [&e, &sharded_gen] (shard_id shard) { auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) { return gen(); }).get(); auto& cf = e.local_db().find_column_family("ks", "cf"); return cf.get_sstables_manager().make_sstable(cf.schema(), cf.get_storage_options(), generation, sstables::sstable_state::upload); }; distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable)).get(); verify_that_all_sstables_are_local(sstdir, 2 * smp::count * smp::count).get(); }); }); } SEASTAR_THREAD_TEST_CASE(test_multiple_data_dirs) { std::vector data_dirs; data_dirs.resize(2); sstring ks_name = "ks"; sstring tbl_name = "test"; sstring uuid_sstring; 
cql_test_config cfg; cfg.db_config->data_file_directories({ data_dirs[0].path().native(), data_dirs[1].path().native() }, db::config::config_source::CommandLine); do_with_cql_env_thread([&] (cql_test_env& e) { e.execute_cql(format("create table {}.{} (p text PRIMARY KEY, c int)", ks_name, tbl_name)).get(); auto id = e.local_db().find_uuid(ks_name, tbl_name); uuid_sstring = id.to_sstring(); boost::erase_all(uuid_sstring, "-"); e.execute_cql(format("insert into {}.{} (p, c) values ('one', 1)", ks_name, tbl_name)).get(); e.execute_cql(format("insert into {}.{} (p, c) values ('two', 2)", ks_name, tbl_name)).get(); }, cfg).get(); sstring tbl_dirname = tbl_name + "-" + uuid_sstring; BOOST_REQUIRE(file_exists((data_dirs[0].path() / ks_name / tbl_dirname).native()).get()); BOOST_REQUIRE(file_exists((data_dirs[1].path() / ks_name / tbl_dirname).native()).get()); // All sstables are now in dir-0, because tables flush their sstables // into this dir only. Move all sstables from dir-0 to dir-1 to check // how populator picks sstables from it, not only from "default" dir-0 std::vector filenames; lister::scan_dir(data_dirs[0].path() / ks_name / tbl_dirname, lister::dir_entry_types::of(), [&filenames] (fs::path dir, directory_entry de) { filenames.push_back(de.name); return make_ready_future<>(); }).get(); for (auto&& name : filenames) { rename_file((data_dirs[0].path() / ks_name / tbl_dirname / name).native(), (data_dirs[1].path() / ks_name / tbl_dirname / name).native()).get(); } do_with_cql_env_thread([&] (cql_test_env& e) { auto res = e.execute_cql(format("select * from {}.{}", ks_name, tbl_name)).get(); assert_that(res).is_rows().with_size(2).with_rows({ { utf8_type->decompose("one"), int32_type->decompose(1) }, { utf8_type->decompose("two"), int32_type->decompose(2) } }); }, cfg).get(); } fs::path table_dirname(cql_test_env& e, const fs::path& root, sstring ks, sstring cf) { auto id = e.local_db().find_uuid(ks, cf); sstring uuid_sstring = id.to_sstring(); 
boost::erase_all(uuid_sstring, "-"); return root / ks / (cf + "-" + uuid_sstring); } SEASTAR_THREAD_TEST_CASE(test_user_datadir_layout) { sstring ks = "ks"; sstring cf = "test"; tmpdir data_dir; fs::path tbl_dirname; cql_test_config cfg; cfg.db_config->data_file_directories({ data_dir.path().native(), }, db::config::config_source::CommandLine); do_with_cql_env_thread([&] (cql_test_env& e) { e.execute_cql(format("create table {}.{} (p text PRIMARY KEY, c int)", ks, cf)).get(); tbl_dirname = table_dirname(e, data_dir.path(), ks, cf); testlog.info("Checking {}.{}: {}", ks, cf, tbl_dirname); }, cfg).get(); BOOST_REQUIRE(file_exists(tbl_dirname.native()).get()); BOOST_REQUIRE(file_exists((tbl_dirname / sstables::upload_dir).native()).get()); BOOST_REQUIRE(file_exists((tbl_dirname / sstables::staging_dir).native()).get()); seastar::recursive_remove_directory(tbl_dirname.native()).get(); do_with_cql_env_thread([&] (cql_test_env& e) { /* nothing -- just populate */ }, cfg).get(); BOOST_REQUIRE(file_exists(tbl_dirname.native()).get()); BOOST_REQUIRE(file_exists((tbl_dirname / sstables::upload_dir).native()).get()); BOOST_REQUIRE(file_exists((tbl_dirname / sstables::staging_dir).native()).get()); } SEASTAR_THREAD_TEST_CASE(test_system_datadir_layout) { tmpdir data_dir; cql_test_config cfg; cfg.db_config->data_file_directories({ data_dir.path().native(), }, db::config::config_source::CommandLine); do_with_cql_env_thread([&] (cql_test_env& e) { auto tbl_dirname = table_dirname(e, data_dir.path(), "system", "local"); testlog.info("Checking system.local: {}", tbl_dirname); BOOST_REQUIRE(file_exists(tbl_dirname.native()).get()); BOOST_REQUIRE(file_exists((tbl_dirname / sstables::upload_dir).native()).get()); BOOST_REQUIRE(file_exists((tbl_dirname / sstables::staging_dir).native()).get()); tbl_dirname = table_dirname(e, data_dir.path(), "system", "config"); testlog.info("Checking system.config: {}", tbl_dirname); BOOST_REQUIRE(!file_exists(tbl_dirname.native()).get()); }, 
cfg).get(); } SEASTAR_TEST_CASE(test_pending_log_garbage_collection) { return sstables::test_env::do_with_sharded_async([] (auto& env) { for (auto state : {sstables::sstable_state::normal, sstables::sstable_state::staging}) { auto base = env.local().tempdir().path() / fmt::to_string(table_id::create_random_id()); auto dir = base / fmt::to_string(state); recursive_touch_directory(dir.native()).get(); auto new_sstable = [&] { return env.local().make_sstable(test_table_schema(), dir.native()); }; std::vector ssts_to_keep; for (int i = 0; i < 2; i++) { ssts_to_keep.emplace_back(make_sstable_for_this_shard(new_sstable)); } testlog.debug("SSTables to keep: {}", ssts_to_keep); std::vector ssts_to_remove; for (int i = 0; i < 3; i++) { ssts_to_remove.emplace_back(make_sstable_for_this_shard(new_sstable)); } testlog.debug("SSTables to remove: {}", ssts_to_remove); // Now start atomic deletion -- create the pending deletion log for all // three sstables, move TOC file for one of them into temporary-TOC, and // partially delete another auto base_opened_dir = opened_directory(base); sstable_directory::create_pending_deletion_log(base_opened_dir, ssts_to_remove).get(); rename_file(test(ssts_to_remove[1]).filename(sstables::component_type::TOC).native(), test(ssts_to_remove[1]).filename(sstables::component_type::TemporaryTOC).native()).get(); rename_file(test(ssts_to_remove[2]).filename(sstables::component_type::TOC).native(), test(ssts_to_remove[2]).filename(sstables::component_type::TemporaryTOC).native()).get(); remove_file(test(ssts_to_remove[2]).filename(sstables::component_type::Data).native()).get(); // mimic distributed_loader table_populator::start order // as the pending_delete_dir is now shared, at the table base directory if (state != sstables::sstable_state::normal) { with_sstable_directory(base, sstables::sstable_state::normal, env, [&] (sharded& sstdir) { auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true, 
.garbage_collect = true }); BOOST_REQUIRE_NO_THROW(expect_ok.get()); }); } with_sstable_directory(base, state, env, [&] (sharded& sstdir) { auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true, .garbage_collect = true }); BOOST_REQUIRE_NO_THROW(expect_ok.get()); auto collected = sstdir.map_reduce0( [] (auto& sstdir) { return do_with(std::set(), [&sstdir] (auto& gens) { return sstdir.do_for_each_sstable([&] (const shared_sstable& sst) { gens.emplace(sst->generation()); return make_ready_future<>(); }).then([&gens] () mutable -> future> { return make_ready_future>(std::move(gens));; }); }); }, std::set(), [] (auto&& res, auto&& gens) { res.merge(gens); return std::move(res); } ).get(); std::set expected; for (auto& sst : ssts_to_keep) { expected.insert(sst->generation()); } BOOST_REQUIRE_EQUAL(expected, collected); }); } }); } BOOST_AUTO_TEST_SUITE_END()