Merge "delete_atomically recovery" from Benny

"
The delete_atomically function is required to delete a set of sstables
atomically, i.e. either delete all of them or none of them.  Deleting only
some sstables in the set might result in data resurrection, e.g. when
sstable A, which holds a tombstone that covers a mutation in sstable B,
is deleted while sstable B remains.

This patchset introduces a log file holding a list of SSTable TOC files
to delete for recovering a partial delete_atomically operation.

A new subdirectory called `pending_delete` is created in the sstables dir,
holding in-flight logs.

The logs are created with a temporary name (using a .tmp suffix)
and renamed to the final .log name once ready.  This indicates
the commit point for the operation.

When populating the column family, all files in the pending_delete
sub-directory are examined.  Temporary log files are just removed,
and committed log files are read, replayed, and deleted.

Fixes #4082

Tests: unit (dev), database_test (debug)
"

* 'projects/delete_atomically_recovery/v5' of https://github.com/bhalevy/scylla:
  tests: database_test: add test_distributed_loader_with_pending_delete
  distributed_loader: replay and cleanup pending_delete log files
  distributed_loader: populated_column_family: separate temp sst dirs cleanup phase
  docs: add sstables-directory-structure.md
  sstables: commit sstables to delete_atomically into a pending_delete log file
  sstables: delete_atomically: delete sstables in a thread
  sstables: component_basename: reuse with sstring component
  sstables: introduce component_basename
  database: maybe_delete_large_partitions_entry: do not access sstable and do not mask exceptions
  sstables: add delete_sstable_and_maybe_large_data_entries
  sstables: call remove_by_toc_name in dtor if marked_for_deletion
This commit is contained in:
Avi Kivity
2019-02-22 15:37:17 +02:00
8 changed files with 608 additions and 81 deletions

View File

@@ -37,19 +37,6 @@ future<> large_data_handler::maybe_update_large_partitions(const sstables::sstab
return make_ready_future<>();
}
// Starts deletion of the large-partitions entry for `sst` when the handler is
// not stopped and the sstable's data size exceeds _partition_threshold_bytes.
// Anything thrown synchronously while checking the condition or starting the
// deletion is swallowed; in that case, and when no deletion is needed, a ready
// future is returned.
future<> large_data_handler::maybe_delete_large_partitions_entry(const sstables::sstable& sst) const {
    try {
        const bool over_threshold = !_stopped && sst.data_size() > _partition_threshold_bytes;
        if (over_threshold) {
            const schema& sst_schema = *sst.get_schema();
            return delete_large_partitions_entry(sst_schema, sst.get_filename());
        }
    } catch (...) {
        // Deliberately ignored.
    }
    return make_ready_future<>();
}
logging::logger cql_table_large_data_handler::large_data_logger("large_data");
future<> cql_table_large_data_handler::update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& key, uint64_t partition_size) const {

View File

@@ -65,7 +65,13 @@ public:
}
future<> maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const;
future<> maybe_delete_large_partitions_entry(const sstables::sstable& sst) const;
// Starts deletion of the large-partitions entry for `filename` when the handler
// is not stopped and `data_size` exceeds _partition_threshold_bytes; otherwise
// returns a ready future.  Exceeding the threshold is hinted to the compiler as
// the unlikely case.
future<> maybe_delete_large_partitions_entry(const schema& s, const sstring& filename, uint64_t data_size) const {
    if (_stopped || !__builtin_expect(data_size > _partition_threshold_bytes, false)) {
        return make_ready_future<>();
    }
    return delete_large_partitions_entry(s, filename);
}
const large_data_handler::stats& stats() const { return _stats; }

View File

@@ -517,7 +517,75 @@ future<sstables::entry_descriptor> distributed_loader::probe_file(distributed<da
});
}
future<> distributed_loader::populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
// Waits for every future in `futures` to resolve.  The returned future fails
// with the first exception encountered (in vector order); the results of all
// later futures are ignored.  If none failed, a ready future is returned.
static future<> execute_futures(std::vector<future<>>& futures) {
    return seastar::when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> results) {
        std::exception_ptr first_error;
        for (auto& res : results) {
            if (first_error) {
                // An earlier future already failed; just consume this one.
                res.ignore_ready_future();
                continue;
            }
            try {
                res.get();
            } catch (...) {
                first_error = std::current_exception();
            }
        }
        return first_error ? make_exception_future<>(first_error) : make_ready_future<>();
    });
}
// Scans the sub-directories of `sstdir` and removes every one that
// sstables::sstable::is_temp_dir() recognizes as a temporary sstable directory.
// Resolves once the scan and all removals have completed.
future<> distributed_loader::cleanup_column_family_temp_sst_dirs(sstring sstdir) {
    return do_with(std::vector<future<>>(), [sstdir = std::move(sstdir)] (std::vector<future<>>& removals) {
        // The removal futures are collected rather than waited on inside the
        // walker, so that the walker does not block scan_dir() from reading
        // the next directory entry.
        auto on_entry = [&removals] (fs::path parent, directory_entry de) {
            fs::path candidate = parent / de.name;
            if (!sstables::sstable::is_temp_dir(candidate)) {
                return make_ready_future<>();
            }
            dblog.info("Found temporary sstable directory: {}, removing", candidate);
            removals.push_back(lister::rmdir(candidate));
            return make_ready_future<>();
        };
        return lister::scan_dir(sstdir, { directory_entry_type::directory }, std::move(on_entry)).then([&removals] {
            return execute_futures(removals);
        });
    });
}
// Processes the regular files found in `pending_delete_dir`:
//  - "*.tmp" files (uncommitted logs) are deleted;
//  - "*.log" files (committed logs) are replayed and then deleted;
//  - any other file is left untouched.
// Resolves once the scan and all the per-file operations have completed.
future<> distributed_loader::handle_sstables_pending_delete(sstring pending_delete_dir) {
    return do_with(std::vector<future<>>(), [dir = std::move(pending_delete_dir)] (std::vector<future<>>& pending) {
        // Per-file work is pushed into `pending` rather than waited on inside
        // the walker, so that the walker does not block scan_dir() from
        // reading the next directory entry.
        auto on_entry = [&pending] (fs::path dir, directory_entry de) {
            fs::path file_path = dir / de.name;
            const auto ext = file_path.extension();
            if (ext == ".tmp") {
                dblog.info("Found temporary pending_delete log file: {}, deleting", file_path);
                pending.push_back(remove_file(file_path.string()));
            } else if (ext == ".log") {
                dblog.info("Found pending_delete log file: {}, replaying", file_path);
                // Take the name before file_path is moved into the continuation.
                auto log_name = file_path.string();
                auto replayed = sstables::replay_pending_delete_log(std::move(log_name)).then([file_path = std::move(file_path)] {
                    dblog.debug("Replayed {}, removing", file_path);
                    return remove_file(file_path.string());
                });
                pending.push_back(std::move(replayed));
            } else {
                dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path);
            }
            return make_ready_future<>();
        };
        return lister::scan_dir(dir, { directory_entry_type::regular }, std::move(on_entry)).then([&pending] {
            return execute_futures(pending);
        });
    });
}
future<> distributed_loader::do_populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
// We can catch most errors when we try to load an sstable. But if the TOC
// file is the one missing, we won't try to load the sstable at all. This
// case is still an invalid case, but it is way easier for us to treat it
@@ -538,21 +606,12 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
auto verifier = make_lw_shared<std::unordered_map<unsigned long, sstable_descriptor>>();
return do_with(std::vector<future<>>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector<future<>>& futures) {
return lister::scan_dir(sstdir, { directory_entry_type::regular, directory_entry_type::directory }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) {
return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) {
// FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".")
// push future returned by probe_file/rmdir into an array of futures,
// push future returned by probe_file into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
if (de.type && *de.type == directory_entry_type::directory) {
fs::path dirpath = sstdir / de.name;
if (engine().cpu_id() == 0 && sstables::sstable::is_temp_dir(dirpath)) {
dblog.info("Found temporary sstable directory: {}, removing", dirpath);
futures.push_back(lister::rmdir(dirpath));
}
return make_ready_future<>();
}
auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) {
if (entry.component == component_type::TemporaryStatistics) {
return remove_file(sstables::sstable::filename(sstdir.native(), entry.ks, entry.cf, entry.version, entry.generation,
@@ -588,26 +647,7 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
return make_ready_future<>();
}, &column_family::manifest_json_filter).then([&futures] {
return when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> ret) {
std::exception_ptr eptr;
for (auto& f : ret) {
try {
if (eptr) {
f.ignore_ready_future();
} else {
f.get();
}
} catch(...) {
eptr = std::current_exception();
}
}
if (eptr) {
return make_exception_future<>(eptr);
}
return make_ready_future<>();
});
return execute_futures(futures);
}).then([verifier, sstdir, ks = std::move(ks), cf = std::move(cf)] {
return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), verifier] (auto v) {
if (v.second.status == component_status::has_temporary_toc_file) {
@@ -631,6 +671,22 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
}
// Populates a column family from `sstdir` in two passes, running in a seastar
// thread:
//  1. On shard 0 only: remove temporary sstable directories and, if the
//     pending_delete sub-directory exists, handle the log files in it.
//  2. On every calling shard: do_populate_column_family().
future<> distributed_loader::populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf) {
    return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] {
        // First pass, cleanup temporary sstable directories and sstables pending delete.
        const bool runs_cleanup = engine().cpu_id() == 0;
        if (runs_cleanup) {
            cleanup_column_family_temp_sst_dirs(sstdir).get();
            const auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename();
            if (file_exists(pending_delete_dir).get0()) {
                handle_sstables_pending_delete(pending_delete_dir).get();
            }
        }
        // Second pass, cleanup sstables with temporary TOCs and load the rest.
        // The captures are const inside this non-mutable lambda, so they are
        // passed by copy.
        do_populate_column_family(db, sstdir, ks, cf).get();
    });
}
future<> distributed_loader::populate_keyspace(distributed<database>& db, sstring datadir, sstring ks_name) {
auto ksdir = datadir + "/" + ks_name;
auto& keyspaces = db.local().get_keyspaces();

View File

@@ -68,4 +68,8 @@ public:
static future<> init_system_keyspace(distributed<database>& db);
static future<> ensure_system_table_directories(distributed<database>& db);
static future<> init_non_system_keyspaces(distributed<database>& db, distributed<service::storage_proxy>& proxy);
private:
static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir);
static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);
static future<> do_populate_column_family(distributed<database>& db, sstring sstdir, sstring ks, sstring cf);
};

View File

@@ -0,0 +1,196 @@
# Introduction
SSTables are stored as a set of regular files in the file system
in a common directory per-table (a.k.a. column family).
In addition to SSTable files, sub-directories of the table base directory
are used for additional features such as snapshots, and atomic deletions recovery.
This document summarizes the directory structure and file organization of SSTables.
# Directory Hierarchy
Scylla uses the following directory structure to store all its SSTables, for example:
```
/var/lib/scylla
└── data
├── ks
│   ├── cf-6749a080303111e9b2f8000000000000
│   │   ├── ...
│   │   ├── mc-2-big-TOC.txt
│   │   ├── snapshots
│   │   │   └── 1550133010687-cf
│   │   │   ├── manifest.json
│   │   │   ├── ...
│   │   │   └── mc-1-big-TOC.txt
│   │   ├── staging
│   │   └── upload
│   └── cf-7ec943202fc611e9a130000000000000
│   ├── snapshots
│   │   └── 1550132311207-cf
│   │   ├── ...
│   │   ├── ks-cf-ka-3-TOC.txt
│   │   └── manifest.json
│   ├── staging
│   └── upload
├── system
│   ├── schema_columnfamilies-45f5b36024bc3f83a3631034ea4fa697
│   │   ├── staging
│   │   └── upload
│   ├── ...
├── ...
```
Each keyspace, including system keyspaces has its own sub-directory
under the data directory (See `data_file_directories` in scylla.yaml).
The keyspace directory name is the keyspace name.
In the keyspace directory there is a sub-directory per table
which is named by the table name followed by a dash and a unique identifier
to distinguish between different incarnations of tables that are called with the same name.
In the table directory there are the SSTable files and additional
sub-directories as documented below.
# SSTable Files
SSTables are comprised of multiple component files.
The component file names are self-identifying and denote the component type, as well as per-sstable-format metadata.
Here are the different component types and their naming convention:
* Data (`Data.db`)
The SSTable data file, containing a part of the actual data stored in the database.
See [SSTables-Data-File](https://github.com/scylladb/scylla/wiki/SSTables-Data-File) for more information.
* Primary Index (`Index.db`)
Index of the row keys with pointers to their positions in the data file.
See [SSTables-Index-File](https://github.com/scylladb/scylla/wiki/SSTables-Index-File) for more information.
* Bloom filter (`Filter.db`)
A structure stored in memory that checks if row data exists in the memtable before accessing SSTables on disk.
* Compression Information (`CompressionInfo.db`)
A file holding information about uncompressed data length, chunk offsets and other compression information.
* Statistics (`Statistics.db`)
Statistical metadata about the content of the SSTable and encoding statistics for the data file, starting with the mc format.
* Digest (`Digest.crc32`, `Digest.adler32`, `Digest.sha1`)
A file holding checksum of the data file.
The method used for checksum is specific to the SSTable format version (See below).
* CRC (`CRC.db`)
A file holding the CRC32 for chunks in an uncompressed file.
* SSTable Index Summary (`Summary.db`)
A sample of the partition index stored in memory.
* SSTable Table of Contents (`TOC.txt`)
A file that stores the list of all components for the SSTable TOC.
See details below regarding the use of a temporary TOC name during creation and deletion of SSTables.
* Scylla (`Scylla.db`)
A file holding scylla-specific metadata about the SSTable, such as sharding information, extended features support, and sstable-run identifier.
## SSTable Format Version
SSTable's on-disk format has changed over time.
Three versions are currently supported by Scylla: `ka`, `la`, and `mc`.
Cassandra's convention is that the first letter determines
the major format version, in ascending order, and the second letter -
the minor version, starting from `a` onward.
The SSTable file names identify the SSTable format version.
In addition, they provide the SSTable generation number and other metadata.
The "encoding" of the above metadata into the file name changed over time
and it is version specific, as follows:
```
mc-<generation>-<big>-<component>
la-<generation>-<big>-<component>
<keyspace>-<column_family>-ka-<generation>-<component>
```
where:
* `<generation>` is the SSTable generation - a unique positive number identifying the SSTable.
* `<big>` is an archaic attribute that identifies the SSTable sub-format.
(Only `big` sub-format is supported by Scylla (and Cassandra) at this time.)
* `<component>` is the file's component type, as described above.
## Table Sub-directories
The per-table directory may contain several sub-directories, as listed below:
* Staging directory (`staging`)
A sub-directory holding materialized views SSTables during their update process.
* Snapshots directory (`snapshots`)
A sub-directory holding snapshots of SSTables, using hard links to the actual SSTable component files in the table base directory.
* Upload directory (`upload`)
Used for ingesting external SSTables into Scylla on startup.
* Temporary SSTable directory (`<generation>.sstable`)
A directory created when writing new SSTables.
Some file systems (e.g. linux XFS) base their locality-of-use heuristics on the directory in which files were created.
In this case, if all files are created in one (or a few) directories, block allocation can become very slow.
To overcome this issue, when a SSTable is created, the database creates a new sub-directory using the newly-created SSTable generation, named `<generation>.sstable`
and all SSTable component files are then created in this sub-directory and moved to the table base directory.
* Pending-delete directory (`pending_delete`)
A directory that may hold log files for replaying atomic deletion operations of SSTables.
## Temporary TOC Files
SSTables are immutable. I.e., once written and sealed, they are never re-written.
For data consistency reasons, it is important for the database to determine that a SSTable is complete and valid,
in contrast to a SSTable that might be in a transitional state while being created or while being deleted.
When created and initially written, the table of contents is stored in a TemporaryTOC file - `TOC.txt.tmp`.
It is renamed to `TOC.txt` when the SSTable is sealed and all components are flushed to stable storage and ready to be used.
When a SSTable is removed, `TOC.txt` is first renamed to `TOC.txt.tmp`, and that atomically marks the SSTable as deleted.
# Recovering from crashes
On startup, the database scans all table directories and cleans up all SSTables that are in a transitional state: either partially written or partially deleted.
These SSTables are identified by their TemporaryTOC component, and the loader simply removes them.
In addition, any existing temporary SSTable sub-directories are automatically removed.
## Atomic deletion of SSTables
In certain cases, the database is required to delete a number of SSTables in an atomic manner.
For example, one of the SSTables may hold a tombstone that deletes data that was inserted to a different SSTable, and both are to be deleted as part of compaction.
When such operation is initiated, `delete_atomically` creates a unique, temporary log file in the `pending_delete` sub-directory named:
`sstables-<min_generation>-<max_generation>.log.tmp`, based on the SSTables to-be-deleted minimum and maximum generation numbers.
The log file contains the list of SSTables' TOC filenames (basename only, with no leading path), one TOC per line.
After the temporary log file is written, flushed, and closed, it is renamed to its final name: `sstables-<min_generation>-<max_generation>.log`.
Finally, after the SSTables are removed, the log file is removed from the `pending_delete` sub-directory.
On startup, sealed `pending_delete` log files are replayed and, after all required SSTables are deleted successfully, the log file is deleted.
Any temporary `pending_delete` log files that are found during startup are simply removed, as this is an indication that:
- The atomic delete operation had not started to delete any SSTable, and
- The log file may be partially written.

View File

@@ -2549,35 +2549,36 @@ bool sstable::requires_view_building() const {
return boost::algorithm::ends_with(_dir, "staging") || boost::algorithm::ends_with(_dir, "upload");
}
// Builds the file name (without any directory part) of an sstable component
// whose component name is given as a string.  The layout depends on `version`:
//   ka:      <ks>-<cf>-<version>-<generation>-<component>
//   la, mc:  <version>-<generation>-<format>-<component>
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation,
                                    format_types format, sstring component) {
    const sstring version_str = _version_string.at(version);
    const sstring generation_str = to_sstring(generation);
    const sstring format_str = _format_string.at(format);
    switch (version) {
    case sstable::version_types::ka:
        return ks + "-" + cf + "-" + version_str + "-" + generation_str + "-" + component;
    case sstable::version_types::la:
    case sstable::version_types::mc:
        // Both versions share the same naming scheme.
        return version_str + "-" + generation_str + "-" + format_str + "-" + component;
    }
    assert(0 && "invalid version");
}
// Same as the string-component overload, with the component name looked up in
// the component map of `version`.
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation,
                                    format_types format, component_type component) {
    const auto& component_names = sstable_version_constants::get_component_map(version);
    return component_basename(ks, cf, version, generation, format, component_names.at(component));
}
sstring sstable::filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation,
format_types format, component_type component) {
sstring basename = [&] {
switch (version) {
case sstable::version_types::ka:
return ks + "-" + cf + "-" + _version_string.at(version) + "-" + to_sstring(generation) + "-" +
sstable_version_constants::get_component_map(version).at(component);
case sstable::version_types::la:
return _version_string.at(version) + "-" + to_sstring(generation) + "-" + _format_string.at(format) + "-" +
sstable_version_constants::get_component_map(version).at(component);
case sstable::version_types::mc:
return _version_string.at(version) + "-" + to_sstring(generation) + "-" + _format_string.at(format) + "-" +
sstable_version_constants::get_component_map(version).at(component);
}
assert(0 && "invalid version");
}();
return dir + "/" + basename;
return dir + "/" + component_basename(ks, cf, version, generation, format, component);
}
sstring sstable::filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation,
format_types format, sstring component) {
static std::unordered_map<version_types, const char*, enum_hash<version_types>> fmtmap = {
{ sstable::version_types::ka, "{0}-{1}-{2}-{3}-{5}" },
{ sstable::version_types::la, "{2}-{3}-{4}-{5}" },
{ sstable::version_types::mc, "{2}-{3}-{4}-{5}" }
};
return dir + "/" + seastar::format(fmtmap[version], ks, cf, _version_string.at(version), to_sstring(generation), _format_string.at(format), component);
return dir + "/" + component_basename(ks, cf, version, generation, format, component);
}
std::vector<std::pair<component_type, sstring>> sstable::all_components() const {
@@ -2856,9 +2857,6 @@ int sstable::compare_by_max_timestamp(const sstable& other) const {
return (ts1 > ts2 ? 1 : (ts1 == ts2 ? 0 : -1));
}
future<>
delete_sstables(std::vector<sstring> tocs);
sstable::~sstable() {
if (_index_file) {
_index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) {
@@ -2881,7 +2879,12 @@ sstable::~sstable() {
// clean up unused sstables, and because we'll never reuse the same
// generation number anyway.
try {
delete_sstables({filename(component_type::TOC)}).handle_exception(
// FIXME: need to call large_data_handler.maybe_delete_large_partitions_entry
// - Short term fix plan is passing large_data_handler upon construction and
// using it from this path to update large_data_handler.
// - Longer term fix is to hand off deletion of sstables to a manager that can
// deal with sstable marked to be deleted after the corresponding object is destructed.
remove_by_toc_name(toc_filename()).handle_exception(
[op = background_jobs().start()] (std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
@@ -2922,6 +2925,7 @@ remove_by_toc_name(sstring sstable_toc_name, const io_error_handler& error_handl
auto new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
sstring dir;
sstlog.debug("Removing by TOC name: {}", sstable_toc_name);
if (sstable_io_check(error_handler, file_exists, sstable_toc_name).get0()) {
dir = dirname(sstable_toc_name);
sstable_io_check(error_handler, rename_file, sstable_toc_name, new_toc_name).get();
@@ -3113,24 +3117,150 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
return utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(s, key)));
}
// FIXME: although this is unused at the moment
// keep it around to be used for replaying pending_delete logs
// Removes every sstable named by a TOC file name in `tocs`, running the
// removals in parallel.
future<>
delete_sstables(std::vector<sstring> tocs) {
    // FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
    return parallel_for_each(tocs.begin(), tocs.end(), [] (sstring toc_name) {
        return remove_by_toc_name(toc_name);
    });
}
future<>
delete_atomically(std::vector<shared_sstable> ssts, const db::large_data_handler& large_data_handler) {
future<> update = parallel_for_each(ssts, [&large_data_handler] (shared_sstable& sst) {
return large_data_handler.maybe_delete_large_partitions_entry(*sst);
sstable::unlink()
{
auto name = toc_filename();
return remove_by_toc_name(name).then_wrapped([name = std::move(name)] (future<> f) {
if (f.failed()) {
// Log and ignore the failure since there is nothing much we can do about it at this point.
// a. Compaction will retry deleting the sstable in the next pass, and
// b. in the future sstables_manager is planned to handle sstables deletion.
// c. Eventually we may want to record these failures in a system table
// and notify the administrator about that for manual handling (rather than aborting).
sstlog.warn("Failed to delete {}: {}. Ignoring.", name, f.get_exception());
}
return make_ready_future<>();
});
auto sstables_to_delete_atomically = boost::copy_range<std::vector<sstring>>(ssts
| boost::adaptors::transformed([] (auto&& sst) { return sst->toc_filename(); }));
}
future<> del = delete_sstables(std::move(sstables_to_delete_atomically));
return when_all(std::move(del), std::move(update)).discard_result();
// Asks `large_data_handler` to delete the large-partitions entry of `sst` if
// one is warranted.  A failure is logged and ignored, so the returned future
// never fails.
static future<>
maybe_delete_large_data_entry(shared_sstable sst, const db::large_data_handler& large_data_handler)
{
    auto sst_name = sst->get_filename();
    auto deletion = large_data_handler.maybe_delete_large_partitions_entry(*sst->get_schema(), sst_name, sst->data_size());
    return deletion.then_wrapped([sst_name = std::move(sst_name)] (future<> res) {
        if (!res.failed()) {
            return make_ready_future<>();
        }
        // Just log and ignore failures to delete large data entries.
        // They are not critical to the operation of the database.
        sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", sst_name, res.get_exception());
        return make_ready_future<>();
    });
}
// Unlinks `sst` and, concurrently, removes its large data entry if needed.
// Resolves once both operations have completed.
static future<>
delete_sstable_and_maybe_large_data_entries(shared_sstable sst, const db::large_data_handler& large_data_handler)
{
    auto unlinked = sst->unlink();
    auto entry_removed = maybe_delete_large_data_entry(sst, large_data_handler);
    return when_all(std::move(unlinked), std::move(entry_removed)).discard_result();
}
// Deletes all of `ssts` as a unit, recording the intent in a pending_delete
// log so that a partially completed deletion can be replayed on restart.
//
// Steps, executed in a seastar thread:
//  1. Write the TOC basename of every sstable, one per line, into
//     <sstdir>/pending_delete/sstables-<min_gen>-<max_gen>.log.tmp.
//  2. Rename the temporary log to its final ".log" name and flush the
//     pending_delete directory.
//  3. Delete all sstables (and their large data entries) in parallel.
//  4. Remove the log file.
//
// Failures to write or to remove the log file are logged and ignored.
// All sstables are asserted to live in the same directory.
// An empty `ssts` is a no-op.
future<>
delete_atomically(std::vector<shared_sstable> ssts, const db::large_data_handler& large_data_handler) {
    return seastar::async([ssts = std::move(ssts), &large_data_handler] {
        if (ssts.empty()) {
            // Nothing to delete.  Without this check sstdir would stay empty
            // and the log would be created under "/pending_delete".
            return;
        }

        sstring sstdir;
        min_max_tracker<int64_t> gen_tracker;

        for (const auto& sst : ssts) {
            gen_tracker.update(sst->generation());

            if (sstdir.empty()) {
                sstdir = sst->get_dir();
            } else {
                // All sstables are assumed to be in the same column_family, hence
                // sharing their base directory.
                assert (sstdir == sst->get_dir());
            }
        }

        sstring pending_delete_dir = sstdir + "/" + sstable::pending_delete_dir_basename();
        sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
        sstring tmp_pending_delete_log = pending_delete_log + ".tmp";
        sstlog.trace("Writing {}", tmp_pending_delete_log);
        try {
            touch_directory(pending_delete_dir).get();
            auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
            // Create temporary pending_delete log file.
            auto f = open_file_dma(tmp_pending_delete_log, oflags).get0();
            // Write all toc names into the log file.
            file_output_stream_options options;
            options.buffer_size = 4096;
            auto w = file_writer(std::move(f), options);

            for (const auto& sst : ssts) {
                auto toc = sst->component_basename(component_type::TOC);
                w.write(toc.c_str(), toc.size());
                w.write("\n", 1);
            }

            w.flush();
            w.close();

            auto dir_f = open_directory(pending_delete_dir).get0();
            // Once flushed and closed, the temporary log file can be renamed.
            rename_file(tmp_pending_delete_log, pending_delete_log).get();

            // Guarantee that the changes above reached the disk.
            dir_f.flush().get();
            dir_f.close().get();
            sstlog.debug("{} written successfully.", pending_delete_log);
        } catch (...) {
            // The exception must be an argument of warn(): with the closing
            // parenthesis placed before it, it became the right-hand side of a
            // comma expression and was never logged.
            sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
        }

        parallel_for_each(ssts, [&large_data_handler] (shared_sstable sst) {
            return delete_sstable_and_maybe_large_data_entries(sst, large_data_handler);
        }).get();

        // Once all sstables are deleted, the log file can be removed.
        // Note: the log file will be removed also if
        // delete_sstable_and_maybe_large_data_entries failed to remove
        // any sstable and ignored the error.
        try {
            remove_file(pending_delete_log).get();
            sstlog.debug("{} removed.", pending_delete_log);
        } catch (...) {
            sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
        }
    });
}
// FIXME: Go through maybe_delete_large_partitions_entry on recovery
// since this is an indication we crashed in the middle of delete_atomically
// Replays a committed pending_delete log: reads the whole file, treats every
// non-empty line as a TOC basename located in the parent of the pending_delete
// directory, and deletes the corresponding sstables.
// Any error raised while reading the log or deleting the sstables is logged
// and ignored, so the returned future does not fail because of it.
// The log file must reside in a pending_delete directory (asserted).
future<> replay_pending_delete_log(sstring pending_delete_log) {
    sstlog.debug("Reading pending_deletes log file {}", pending_delete_log);
    return seastar::async([log_path = std::move(pending_delete_log)] {
        const sstring log_dir = dirname(log_path);
        assert(sstable::is_pending_delete_dir(fs::path(log_dir)));
        try {
            // The sstables live one level above the pending_delete directory.
            const auto table_dir = dirname(log_dir);

            // Read the entire log into memory.
            auto log_file = open_file_dma(log_path, open_flags::ro).get0();
            const auto log_size = log_file.size().get0();
            auto stream = make_file_input_stream(log_file);
            auto contents = stream.read_exactly(log_size).get0();
            stream.close().get();
            log_file.close().get();

            // One TOC basename per line; empty tokens are skipped.
            sstring text(contents.begin(), contents.end());
            std::vector<sstring> lines;
            boost::split(lines, text, boost::is_any_of("\n"), boost::token_compress_on);
            std::vector<sstring> tocs;
            for (const auto& line : lines) {
                if (!line.empty()) {
                    tocs.push_back(table_dir + "/" + line);
                }
            }
            delete_sstables(tocs).get();
        } catch (...) {
            sstlog.warn("Error replaying {}: {}. Ignoring.", log_path, std::current_exception());
        }
    });
}
thread_local sstables_stats::stats sstables_stats::_shard_stats;

View File

@@ -174,6 +174,10 @@ public:
static component_type component_from_sstring(version_types version, sstring& s);
static version_types version_from_sstring(sstring& s);
static format_types format_from_sstring(sstring& s);
static sstring component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation,
format_types format, component_type component);
static sstring component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation,
format_types format, sstring component);
static sstring filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation,
format_types format, component_type component);
static sstring filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation,
@@ -354,6 +358,10 @@ public:
// Return values are those of a trichotomic comparison.
int compare_by_max_timestamp(const sstable& other) const;
// Returns the file name, without any directory part, of component `f` of this
// sstable, built from the sstable's own keyspace, table, version, generation
// and format.
sstring component_basename(component_type f) const {
return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
}
// Returns the file name of component `f` of this sstable as it would be
// named under directory `dir`.
sstring filename(const sstring& dir, component_type f) const {
return filename(dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
}
@@ -387,6 +395,15 @@ public:
return dirpath.extension().string() == ".sstable";
}
// Name of the sub-directory that holds the pending_delete log files.
static sstring pending_delete_dir_basename() {
return "pending_delete";
}
// Tells whether the last component of `dirpath` is the pending_delete
// directory name.
static bool is_pending_delete_dir(const fs::path& dirpath)
{
    const std::string last_component = dirpath.filename().string();
    return last_component == pending_delete_dir_basename().c_str();
}
// Returns the directory this sstable is associated with (_dir).
const sstring& get_dir() const {
return _dir;
}
@@ -409,6 +426,9 @@ public:
return create_links(dir, _generation);
}
// Delete the sstable by unlinking all sstable files
future<> unlink();
/**
* Note. This is using the Origin definition of
* max_data_age, which is load time. This could maybe
@@ -824,6 +844,11 @@ struct entry_descriptor {
int64_t generation, sstable::format_types format,
component_type component)
: sstdir(sstdir), ks(ks), cf(cf), version(version), generation(generation), format(format), component(component) {}
// Overload that takes no sstdir argument: sstdir is not set by this
// constructor.
entry_descriptor(sstring ks, sstring cf, sstable::version_types version,
int64_t generation, sstable::format_types format,
component_type component)
: ks(ks), cf(cf), version(version), generation(generation), format(format), component(component) {}
};
// Waits for all prior tasks started on current shard related to sstable management to finish.
@@ -849,6 +874,7 @@ future<> await_background_jobs_on_all_shards();
//
// This function only solves the second problem for now.
future<> delete_atomically(std::vector<shared_sstable> ssts, const db::large_data_handler& large_data_handler);
future<> replay_pending_delete_log(sstring log_file);
struct index_sampling_state {
static constexpr size_t default_summary_byte_cost = 2000;

View File

@@ -165,3 +165,125 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
return make_ready_future<>();
}, db_cfg).get();
}
// Checks how pending_delete logs found in an sstables directory are handled
// when the database starts up:
//  - temporary logs (".log.tmp") are expected to be removed;
//  - committed logs (".log") are expected to be removed together with the files
//    of the sstables they list, including sstables whose files are partially
//    or entirely missing.
SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
    using sst = sstables::sstable;

    tmpdir data_dir;
    db::config db_cfg;

    db_cfg.data_file_directories({data_dir.path().string()}, db::config::config_source::CommandLine);

    // Populate the table's pending_delete directory with log files, and the
    // table directory with the sstable files those logs refer to.
    sstring ks = "system";
    sstring cf = "local-7ad54392bcdd35a684174e047860b377";
    sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();
    sstring pending_delete_dir = sst_dir + "/" + sst::pending_delete_dir_basename();

    // Asserts that `name` exists (should_exist == true) or does not exist (false).
    auto require_exist = [] (const sstring& name, bool should_exist) {
        auto exists = file_exists(name).get0();
        if (should_exist) {
            BOOST_REQUIRE(exists);
        } else {
            BOOST_REQUIRE(!exists);
        }
    };

    // Creates `dir_name` (and any missing parents) and verifies it exists.
    auto touch_dir = [&require_exist] (const sstring& dir_name) {
        recursive_touch_directory(dir_name).get();
        require_exist(dir_name, true);
    };

    // Creates an empty file named `file_name` and verifies it exists.
    auto touch_file = [&require_exist] (const sstring& file_name) {
        auto f = open_file_dma(file_name, open_flags::create).get0();
        f.close().get();
        require_exist(file_name, true);
    };

    // Creates (or truncates) `file_name` so that it contains exactly `text`.
    auto write_file = [&require_exist] (const sstring& file_name, const sstring& text) {
        auto f = open_file_dma(file_name, open_flags::wo | open_flags::create | open_flags::truncate).get0();
        // A DMA write must cover a length aligned to the disk's DMA block size;
        // writing text.size() bytes directly may be rejected by the filesystem.
        // Write a zero-padded, aligned buffer and then cut the file back to
        // the real length.
        const size_t alignment = f.disk_write_dma_alignment();
        const size_t padded_size = (text.size() + alignment - 1) / alignment * alignment;
        auto buf = temporary_buffer<char>::aligned(f.memory_dma_alignment(), padded_size);
        ::memset(buf.get_write(), 0, padded_size);
        ::memcpy(buf.get_write(), text.c_str(), text.size());
        auto count = f.dma_write(0, buf.get(), padded_size).get0();
        BOOST_REQUIRE(count == padded_size);
        f.truncate(text.size()).get();
        f.close().get();
        require_exist(file_name, true);
    };

    // Base name (no directory) of component `ctype` of generation `gen`.
    auto component_basename = [&ks, &cf] (int64_t gen, component_type ctype) {
        return sst::component_basename(ks, cf, sst::version_types::mc, gen, sst::format_types::big, ctype);
    };

    // Name of component `ctype` of generation `gen` inside sst_dir.
    auto gen_filename = [&sst_dir, &ks, &cf] (int64_t gen, component_type ctype) {
        return sst::filename(sst_dir, ks, cf, sst::version_types::mc, gen, sst::format_types::big, ctype);
    };

    touch_dir(pending_delete_dir);

    // Empty log file
    touch_file(pending_delete_dir + "/sstables-0-0.log");

    // Empty temporary log file
    touch_file(pending_delete_dir + "/sstables-1-1.log.tmp");

    const sstring toc_text = "TOC.txt\nData.db\n";

    // Regular log file with single entry
    write_file(gen_filename(2, component_type::TOC), toc_text);
    touch_file(gen_filename(2, component_type::Data));
    write_file(pending_delete_dir + "/sstables-2-2.log",
               component_basename(2, component_type::TOC) + "\n");

    // Temporary log file with single entry
    write_file(pending_delete_dir + "/sstables-3-3.log.tmp",
               component_basename(3, component_type::TOC) + "\n");

    // Regular log file with multiple entries
    write_file(gen_filename(4, component_type::TOC), toc_text);
    touch_file(gen_filename(4, component_type::Data));
    write_file(gen_filename(5, component_type::TOC), toc_text);
    touch_file(gen_filename(5, component_type::Data));
    write_file(pending_delete_dir + "/sstables-4-5.log",
               component_basename(4, component_type::TOC) + "\n" +
               component_basename(5, component_type::TOC) + "\n");

    // Regular log file with multiple entries and some deleted sstables:
    // generation 6 has a TemporaryTOC and a Data file, generation 7 only a
    // TemporaryTOC, and generation 8 has no files at all.
    write_file(gen_filename(6, component_type::TemporaryTOC), toc_text);
    touch_file(gen_filename(6, component_type::Data));
    write_file(gen_filename(7, component_type::TemporaryTOC), toc_text);
    write_file(pending_delete_dir + "/sstables-6-8.log",
               component_basename(6, component_type::TOC) + "\n" +
               component_basename(7, component_type::TOC) + "\n" +
               component_basename(8, component_type::TOC) + "\n");

    // By the time the callback runs the environment has been started on
    // db_cfg's data directory; none of the logs, nor the sstable files listed
    // in committed logs, may remain.
    do_with_cql_env([&] (cql_test_env& e) {
        // Empty log file
        require_exist(pending_delete_dir + "/sstables-0-0.log", false);

        // Empty temporary log file
        require_exist(pending_delete_dir + "/sstables-1-1.log.tmp", false);

        // Regular log file with single entry
        require_exist(gen_filename(2, component_type::TOC), false);
        require_exist(gen_filename(2, component_type::Data), false);
        require_exist(pending_delete_dir + "/sstables-2-2.log", false);

        // Temporary log file with single entry
        require_exist(pending_delete_dir + "/sstables-3-3.log.tmp", false);

        // Regular log file with multiple entries
        require_exist(gen_filename(4, component_type::TOC), false);
        require_exist(gen_filename(4, component_type::Data), false);
        require_exist(gen_filename(5, component_type::TOC), false);
        require_exist(gen_filename(5, component_type::Data), false);
        require_exist(pending_delete_dir + "/sstables-4-5.log", false);

        // Regular log file with multiple entries and some deleted sstables
        require_exist(gen_filename(6, component_type::TemporaryTOC), false);
        require_exist(gen_filename(6, component_type::Data), false);
        require_exist(gen_filename(7, component_type::TemporaryTOC), false);
        require_exist(pending_delete_dir + "/sstables-6-8.log", false);

        return make_ready_future<>();
    }, db_cfg).get();
}