diff --git a/distributed_loader.cc b/distributed_loader.cc index b6fe385b0f..3ec418ef2f 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -533,11 +533,19 @@ future<> distributed_loader::populate_column_family(distributed& db, s auto verifier = make_lw_shared>(); return do_with(std::vector>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector>& futures) { - return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) { + return lister::scan_dir(sstdir, { directory_entry_type::regular, directory_entry_type::directory }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) { // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") - if (de.type && *de.type == directory_entry_type::directory && sstables::sstable::is_temp_dir(de.name)) { - return lister::rmdir(sstdir / de.name); + // push future returned by probe_file/rmdir into an array of futures, + // so that the supplied callback will not block scan_dir() from + // reading the next entry in the directory. + if (de.type && *de.type == directory_entry_type::directory) { + fs::path dirpath = sstdir / de.name; + if (engine().cpu_id() == 0 && sstables::sstable::is_temp_dir(dirpath)) { + dblog.info("Found temporary sstable directory: {}, removing", dirpath); + futures.push_back(lister::rmdir(dirpath)); + } + return make_ready_future<>(); } auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) { @@ -571,9 +579,6 @@ future<> distributed_loader::populate_column_family(distributed& db, s return make_ready_future<>(); }); - // push future returned by probe_file into an array of futures, - // so that the supplied callback will not block scan_dir() from - // reading the next entry in the directory. futures.push_back(std::move(f)); return make_ready_future<>(); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 9fc89e4b7c..6731bdb690 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -382,9 +382,9 @@ public: return dir + "/" + sst_dir_basename(gen); } - static bool is_temp_dir(const sstring& dirpath) + static bool is_temp_dir(const fs::path& dirpath) { - return fs::canonical(fs::path(dirpath)).extension().string() == "sstable"; + return dirpath.extension().string() == ".sstable"; } const sstring& get_dir() const { diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index bbceefef2f..d581ea5e7f 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -99,7 +99,6 @@ private: ::shared_ptr> _auth_service; ::shared_ptr> _view_builder; ::shared_ptr> _view_update_generator; - lw_shared_ptr _data_dir; private: struct core_local_state { service::client_state client_state; @@ -319,17 +318,20 @@ public: auto db = ::make_shared>(); auto cfg = make_lw_shared(std::move(cfg_in)); tmpdir data_dir; + auto& data_dir_path = data_dir.path; if (!cfg->data_file_directories.is_set()) { - cfg->data_file_directories() = {data_dir.path}; + cfg->data_file_directories() = {data_dir_path}; + } else { + data_dir_path = cfg->data_file_directories()[0]; } - cfg->commitlog_directory() = data_dir.path + "/commitlog.dir"; - cfg->hints_directory() = data_dir.path + "/hints.dir"; - cfg->view_hints_directory() = data_dir.path + "/view_hints.dir"; + cfg->commitlog_directory() = data_dir_path + "/commitlog.dir"; + cfg->hints_directory() = data_dir_path + "/hints.dir"; + cfg->view_hints_directory() = data_dir_path + "/view_hints.dir"; cfg->num_tokens() = 256; cfg->ring_delay_ms() = 500; cfg->experimental() = true; cfg->shutdown_announce_in_ms() = 0; - boost::filesystem::create_directories((data_dir.path + "/system").c_str()); + boost::filesystem::create_directories((data_dir_path + "/system").c_str()); boost::filesystem::create_directories(cfg->commitlog_directory().c_str()); boost::filesystem::create_directories(cfg->hints_directory().c_str()); boost::filesystem::create_directories(cfg->view_hints_directory().c_str()); diff --git a/tests/database_test.cc b/tests/database_test.cc index c4f9e17279..6fbcaab78c 100644 --- a/tests/database_test.cc +++ b/tests/database_test.cc @@ -20,6 +20,7 @@ */ +#include #include #include @@ -32,6 +33,9 @@ #include "mutation_source_test.hh" #include "schema_registry.hh" #include "service/migration_manager.hh" +#include "sstables/sstables.hh" +#include "db/config.hh" +#include "tmpdir.hh" SEASTAR_TEST_CASE(test_querying_with_limits) { return do_with_cql_env([](cql_test_env& e) { @@ -107,3 +111,56 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc return make_ready_future<>(); }).get(); } + +SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) { + using sst = sstables::sstable; + + tmpdir data_dir; + db::config db_cfg; + + db_cfg.data_file_directories({data_dir.path}, db::config::config_source::CommandLine); + + // Create incomplete sstables in test data directory + sstring ks = "system"; + sstring cf = "local-7ad54392bcdd35a684174e047860b377"; + sstring sst_dir = data_dir.path + "/" + ks + "/" + cf; + + auto require_exist = [] (const sstring& name, bool should_exist) { + auto exists = file_exists(name).get0(); + BOOST_REQUIRE(exists == should_exist); + }; + + auto touch_dir = [&require_exist] (const sstring& dir_name) { + recursive_touch_directory(dir_name).get(); + require_exist(dir_name, true); + }; + + auto touch_file = [&require_exist] (const sstring& file_name) { + auto f = open_file_dma(file_name, open_flags::create).get0(); + f.close().get(); + require_exist(file_name, true); + }; + + auto temp_sst_dir = sst::temp_sst_dir(sst_dir, 2); + touch_dir(temp_sst_dir); + + temp_sst_dir = sst::temp_sst_dir(sst_dir, 3); + touch_dir(temp_sst_dir); + auto temp_file_name = sst::filename(temp_sst_dir, ks, cf, sst::version_types::mc, 3, sst::format_types::big, component_type::TemporaryTOC); + touch_file(temp_file_name); + + temp_file_name = sst::filename(sst_dir, ks, cf, sst::version_types::mc, 4, sst::format_types::big, component_type::TemporaryTOC); + touch_file(temp_file_name); + temp_file_name = sst::filename(sst_dir, ks, cf, sst::version_types::mc, 4, sst::format_types::big, component_type::Data); + touch_file(temp_file_name); + + do_with_cql_env([&sst_dir, &ks, &cf, &require_exist] (cql_test_env& e) { + require_exist(sst::temp_sst_dir(sst_dir, 2), false); + require_exist(sst::temp_sst_dir(sst_dir, 3), false); + + require_exist(sst::filename(sst_dir, ks, cf, sst::version_types::mc, 4, sst::format_types::big, component_type::TemporaryTOC), false); + require_exist(sst::filename(sst_dir, ks, cf, sst::version_types::mc, 4, sst::format_types::big, component_type::Data), false); + + return make_ready_future<>(); + }, db_cfg).get(); +}