test: sstable_directory_test: extract sstable_directory creation into with_sstable_directory

Use common code to create, start, and stop the sharded<sstable_directory>
for each test.

This will be used in the next patch for creating a sharded semaphore
and passing it to the sstable_directory.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2020-10-07 19:32:17 +03:00
parent f4269e3a04
commit dc46aaa3fd

View File

@@ -123,6 +123,31 @@ highest_generation_seen(sharded<sstables::sstable_directory>& dir) {
});
}
// Called from a seastar thread
static void with_sstable_directory(
std::filesystem::path path,
unsigned load_parallelism,
sstable_directory::need_mutate_level need_mutate,
sstable_directory::lack_of_toc_fatal fatal_nontoc,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters eddiocc,
sstable_directory::allow_loading_materialized_view almv,
sstable_directory::sstable_object_from_existing_fn sstable_from_existing,
noncopyable_function<void (sharded<sstable_directory>&)> func) {
sharded<sstable_directory> sstdir;
auto stop_sstdir = defer([&sstdir] {
sstdir.stop().get();
});
auto wrapped_sfe = [&sstable_from_existing] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return sstable_from_existing(std::move(dir), gen, v, f);
};
sstdir.start(std::move(path), load_parallelism, need_mutate, fatal_nontoc, eddiocc, almv, std::move(wrapped_sfe)).get();
func(sstdir);
}
SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_simple_empty_directory_scan) {
sstables::test_env::do_with_async([] (test_env& env) {
auto dir = tmpdir();
@@ -132,22 +157,18 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_simple_empty_directory_sca
auto f = open_file_dma(manifest.native(), open_flags::wo | open_flags::create | open_flags::truncate).get0();
f.close().get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader::process_sstable_dir(sstdir).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
// No generation found on empty directory.
BOOST_REQUIRE_EQUAL(max_generation_seen, 0);
});
}).get();
}
@@ -161,20 +182,16 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_scan_incomplete_sstables)
// We should fail validation and leave the directory untouched
remove_file(sst->filename(sstables::component_type::Statistics)).get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir) {
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
});
}).get();
}
@@ -185,20 +202,16 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_temporary_toc) {
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir) {
auto expect_ok = distributed_loader::process_sstable_dir(sstdir);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
});
}).get();
}
@@ -210,35 +223,27 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_table_missing_toc) {
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env), dir.path(), 1));
remove_file(sst->filename(sstables::component_type::TOC)).get();
sharded<sstable_directory> sstdir_fatal;
sstdir_fatal.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop_fatal = defer([&sstdir_fatal] {
sstdir_fatal.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir_fatal) {
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
});
sharded<sstable_directory> sstdir_ok;
sstdir_ok.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop_ok = defer([&sstdir_ok] {
sstdir_ok.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir_ok) {
auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
});
}).get();
}
@@ -255,41 +260,33 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
f.close().get();
auto tempstat = fs::canonical(fs::path(tempstr));
sharded<sstable_directory> sstdir_ok;
sstdir_ok.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop_ok= defer([&sstdir_ok] {
sstdir_ok.stop().get();
});
sstable_from_existing_file(env),
[&dir, &tempstat] (sharded<sstables::sstable_directory>& sstdir_ok) {
auto expect_ok = distributed_loader::process_sstable_dir(sstdir_ok);
BOOST_REQUIRE_NO_THROW(expect_ok.get());
lister::scan_dir(dir.path(), { directory_entry_type::regular }, [tempstat] (fs::path parent_dir, directory_entry de) {
BOOST_REQUIRE(fs::canonical(parent_dir / fs::path(de.name)) != tempstat);
return make_ready_future<>();
}).get();
});
remove_file(sst->filename(sstables::component_type::Statistics)).get();
sharded<sstable_directory> sstdir_fatal;
sstdir_fatal.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::no,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop_fatal = defer([&sstdir_fatal] {
sstdir_fatal.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir_fatal) {
auto expect_malformed_sstable = distributed_loader::process_sstable_dir(sstdir_fatal);
BOOST_REQUIRE_THROW(expect_malformed_sstable.get(), sstables::malformed_sstable_exception);
});
}).get();
}
@@ -301,21 +298,17 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_generation_sanity) {
auto sst = make_sstable_for_this_shard(std::bind(new_sstable, std::ref(env.local()), dir.path(), 6666));
rename_file(sst->filename(sstables::component_type::TOC), sst->filename(sstables::component_type::TemporaryTOC)).get();
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader::process_sstable_dir(sstdir).get();
int64_t max_generation_seen = highest_generation_seen(sstdir).get0();
BOOST_REQUIRE_EQUAL(max_generation_seen, 3333);
});
}).get();
}
@@ -351,20 +344,16 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_gene
}).get();
}
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
});
}).get();
}
@@ -383,20 +372,16 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_unmatched_ge
}).get();
}
sharded<sstable_directory> sstdir;
sstdir.start(dir.path(), 1,
with_sstable_directory(dir.path(), 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
sstable_directory::allow_loading_materialized_view::no,
sstable_from_existing_file(env)).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
sstable_from_existing_file(env),
[] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, smp::count).get();
});
}).get();
}
@@ -468,8 +453,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big);
}
sharded<sstable_directory> sstdir;
sstdir.start(upload_path, 1,
with_sstable_directory(upload_path, 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
@@ -477,12 +461,8 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
[&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
},
[&e, upload_path] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
@@ -496,6 +476,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_correctly) {
return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big);
}).get();
verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get();
});
});
}
@@ -521,8 +502,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++ * smp::count, sstables::sstable_version_types::mc, sstables::sstable::format_types::big);
}
sharded<sstable_directory> sstdir;
sstdir.start(upload_path, 1,
with_sstable_directory(upload_path, 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
@@ -530,12 +510,8 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
[&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
},
[&e, upload_path] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
@@ -549,6 +525,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_distributes_well_eve
return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big);
}).get();
verify_that_all_sstables_are_local(sstdir, smp::count * smp::count).get();
});
});
}
@@ -574,8 +551,7 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
make_sstable_for_all_shards(e.db().local(), cf, upload_path.native(), generation++, sstables::sstable_version_types::mc, sstables::sstable::format_types::big);
}
sharded<sstable_directory> sstdir;
sstdir.start(upload_path, 1,
with_sstable_directory(upload_path, 1,
sstable_directory::need_mutate_level::no,
sstable_directory::lack_of_toc_fatal::yes,
sstable_directory::enable_dangerous_direct_import_of_cassandra_counters::no,
@@ -583,12 +559,8 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
[&e] (fs::path dir, int64_t gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.make_sstable(dir.native(), gen, v, f);
}).get();
auto stop = defer([&sstdir] {
sstdir.stop().get();
});
},
[&, upload_path] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader::process_sstable_dir(sstdir).get();
verify_that_all_sstables_are_local(sstdir, 0).get();
@@ -602,5 +574,6 @@ SEASTAR_TEST_CASE(sstable_directory_shared_sstables_reshard_respect_max_threshol
return cf.make_sstable(upload_path.native(), generation, sstables::sstable::version_types::mc, sstables::sstable::format_types::big);
}).get();
verify_that_all_sstables_are_local(sstdir, 2 * smp::count * smp::count).get();
});
});
}