Merge "Fix cleanup of temporary sstable directories" from Benny

"
Cleanup of temporary sstable directories in distributed_loader::populate_column_family
is completely broken and untested. This code path was never executed since
populate_column_family doesn't currently list subdirectories at all.

This patchset fixes this code path and scans subdirectories in populate_column_family.
Also, a unit test is added for testing the cleanup of incomplete (unsealed) sstables.

Fixes: #4129
"

* 'projects/sst-temp-dir-cleanup/v3' of https://github.com/bhalevy/scylla:
  tests: add test_distributed_loader_with_incomplete_sstables
  tests: single_node_cql_env::do_with: use the provided data_file_directories path if available
  tests: single_node_cql_env::_data_dir is not used
  distributed_loader: populate_column_family should scan directories too
  sstables: fix is_temp_dir
  distributed_loader: populate_column_family: ignore directories other than sstable::is_temp_dir
  distributed_loader: remove temporary sstable directories only on shard 0
  distributed_loader: push future returned by rmdir into futures vector
This commit is contained in:
Paweł Dziepak
2019-01-28 12:23:00 +00:00
4 changed files with 78 additions and 14 deletions

View File

@@ -533,11 +533,19 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
auto verifier = make_lw_shared<std::unordered_map<unsigned long, sstable_descriptor>>();
return do_with(std::vector<future<>>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector<future<>>& futures) {
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) {
return lister::scan_dir(sstdir, { directory_entry_type::regular, directory_entry_type::directory }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) {
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
if (de.type && *de.type == directory_entry_type::directory && sstables::sstable::is_temp_dir(de.name)) {
return lister::rmdir(sstdir / de.name);
// push future returned by probe_file/rmdir into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
if (de.type && *de.type == directory_entry_type::directory) {
fs::path dirpath = sstdir / de.name;
if (engine().cpu_id() == 0 && sstables::sstable::is_temp_dir(dirpath)) {
dblog.info("Found temporary sstable directory: {}, removing", dirpath);
futures.push_back(lister::rmdir(dirpath));
}
return make_ready_future<>();
}
auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) {
@@ -571,9 +579,6 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
return make_ready_future<>();
});
// push future returned by probe_file into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
futures.push_back(std::move(f));
return make_ready_future<>();

View File

@@ -382,9 +382,9 @@ public:
return dir + "/" + sst_dir_basename(gen);
}
static bool is_temp_dir(const sstring& dirpath)
static bool is_temp_dir(const fs::path& dirpath)
{
return fs::canonical(fs::path(dirpath)).extension().string() == "sstable";
return dirpath.extension().string() == ".sstable";
}
const sstring& get_dir() const {

View File

@@ -99,7 +99,6 @@ private:
::shared_ptr<sharded<auth::service>> _auth_service;
::shared_ptr<sharded<db::view::view_builder>> _view_builder;
::shared_ptr<sharded<db::view::view_update_generator>> _view_update_generator;
lw_shared_ptr<tmpdir> _data_dir;
private:
struct core_local_state {
service::client_state client_state;
@@ -319,17 +318,20 @@ public:
auto db = ::make_shared<distributed<database>>();
auto cfg = make_lw_shared<db::config>(std::move(cfg_in));
tmpdir data_dir;
auto& data_dir_path = data_dir.path;
if (!cfg->data_file_directories.is_set()) {
cfg->data_file_directories() = {data_dir.path};
cfg->data_file_directories() = {data_dir_path};
} else {
data_dir_path = cfg->data_file_directories()[0];
}
cfg->commitlog_directory() = data_dir.path + "/commitlog.dir";
cfg->hints_directory() = data_dir.path + "/hints.dir";
cfg->view_hints_directory() = data_dir.path + "/view_hints.dir";
cfg->commitlog_directory() = data_dir_path + "/commitlog.dir";
cfg->hints_directory() = data_dir_path + "/hints.dir";
cfg->view_hints_directory() = data_dir_path + "/view_hints.dir";
cfg->num_tokens() = 256;
cfg->ring_delay_ms() = 500;
cfg->experimental() = true;
cfg->shutdown_announce_in_ms() = 0;
boost::filesystem::create_directories((data_dir.path + "/system").c_str());
boost::filesystem::create_directories((data_dir_path + "/system").c_str());
boost::filesystem::create_directories(cfg->commitlog_directory().c_str());
boost::filesystem::create_directories(cfg->hints_directory().c_str());
boost::filesystem::create_directories(cfg->view_hints_directory().c_str());

View File

@@ -20,6 +20,7 @@
*/
#include <seastar/core/reactor.hh>
#include <seastar/core/thread.hh>
#include <seastar/tests/test-utils.hh>
@@ -32,6 +33,9 @@
#include "mutation_source_test.hh"
#include "schema_registry.hh"
#include "service/migration_manager.hh"
#include "sstables/sstables.hh"
#include "db/config.hh"
#include "tmpdir.hh"
SEASTAR_TEST_CASE(test_querying_with_limits) {
return do_with_cql_env([](cql_test_env& e) {
@@ -107,3 +111,56 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc
return make_ready_future<>();
}).get();
}
// Prepares a data directory that contains leftovers of incomplete sstables
// (two temporary sstable directories, and an sstable in the table directory
// that has a TemporaryTOC next to its Data component), hands that directory
// to the cql test environment through the config, and requires that none of
// those leftovers exist by the time the environment callback runs.
SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
    using sst = sstables::sstable;

    tmpdir data_dir;
    db::config db_cfg;
    db_cfg.data_file_directories({data_dir.path}, db::config::config_source::CommandLine);

    // Location of the table directory the incomplete sstables are planted in.
    sstring ks = "system";
    sstring cf = "local-7ad54392bcdd35a684174e047860b377";
    sstring sst_dir = data_dir.path + "/" + ks + "/" + cf;

    // Requires that `path` exists exactly when `should_exist` says so.
    auto require_exist = [] (const sstring& path, bool should_exist) {
        auto exists = file_exists(path).get0();
        BOOST_REQUIRE(exists == should_exist);
    };

    // Creates a directory (and any missing parents), then checks it is there.
    auto make_dir = [&require_exist] (const sstring& path) {
        recursive_touch_directory(path).get();
        require_exist(path, true);
    };

    // Creates a file by opening it with the create flag and closing it again,
    // then checks it is there.
    auto make_empty_file = [&require_exist] (const sstring& path) {
        auto handle = open_file_dma(path, open_flags::create).get0();
        handle.close().get();
        require_exist(path, true);
    };

    // Builds the name of an "mc"/"big" sstable component of this table that
    // lives in `dir`.
    auto component_path = [&ks, &cf] (const sstring& dir, auto generation, auto component) {
        return sst::filename(dir, ks, cf, sst::version_types::mc, generation, sst::format_types::big, component);
    };

    // Generation 2: a temporary sstable directory with nothing inside.
    const auto empty_temp_dir = sst::temp_sst_dir(sst_dir, 2);
    make_dir(empty_temp_dir);

    // Generation 3: a temporary sstable directory holding a TemporaryTOC.
    const auto populated_temp_dir = sst::temp_sst_dir(sst_dir, 3);
    make_dir(populated_temp_dir);
    make_empty_file(component_path(populated_temp_dir, 3, component_type::TemporaryTOC));

    // Generation 4: TemporaryTOC and Data components directly in the table directory.
    make_empty_file(component_path(sst_dir, 4, component_type::TemporaryTOC));
    make_empty_file(component_path(sst_dir, 4, component_type::Data));

    do_with_cql_env([&sst_dir, &component_path, &require_exist] (cql_test_env& e) {
        // Everything planted above must be gone at this point.
        require_exist(sst::temp_sst_dir(sst_dir, 2), false);
        require_exist(sst::temp_sst_dir(sst_dir, 3), false);
        require_exist(component_path(sst_dir, 4, component_type::TemporaryTOC), false);
        require_exist(component_path(sst_dir, 4, component_type::Data), false);
        return make_ready_future<>();
    }, db_cfg).get();
}