diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index 29f018165c..015047423b 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -37,19 +37,6 @@ future<> large_data_handler::maybe_update_large_partitions(const sstables::sstab return make_ready_future<>(); } -future<> large_data_handler::maybe_delete_large_partitions_entry(const sstables::sstable& sst) const { - try { - if (!_stopped && sst.data_size() > _partition_threshold_bytes) { - const schema& s = *sst.get_schema(); - return delete_large_partitions_entry(s, sst.get_filename()); - } - } catch (...) { - // no-op - } - - return make_ready_future<>(); -} - logging::logger cql_table_large_data_handler::large_data_logger("large_data"); future<> cql_table_large_data_handler::update_large_partitions(const schema& s, const sstring& sstable_name, const sstables::key& key, uint64_t partition_size) const { diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 65774f3605..af7811e890 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -65,7 +65,13 @@ public: } future<> maybe_update_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) const; - future<> maybe_delete_large_partitions_entry(const sstables::sstable& sst) const; + + future<> maybe_delete_large_partitions_entry(const schema& s, const sstring& filename, uint64_t data_size) const { + if (!_stopped && __builtin_expect(data_size > _partition_threshold_bytes, false)) { + return delete_large_partitions_entry(s, filename); + } + return make_ready_future<>(); + } const large_data_handler::stats& stats() const { return _stats; } diff --git a/distributed_loader.cc b/distributed_loader.cc index db6e525d19..f1bfef23e6 100644 --- a/distributed_loader.cc +++ b/distributed_loader.cc @@ -517,7 +517,75 @@ future distributed_loader::probe_file(distributed distributed_loader::populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { +static future<> execute_futures(std::vector>& futures) { + return seastar::when_all(futures.begin(), futures.end()).then([] (std::vector> ret) { + std::exception_ptr eptr; + + for (auto& f : ret) { + try { + if (eptr) { + f.ignore_ready_future(); + } else { + f.get(); + } + } catch(...) { + eptr = std::current_exception(); + } + } + + if (eptr) { + return make_exception_future<>(eptr); + } + return make_ready_future<>(); + }); +} + +future<> distributed_loader::cleanup_column_family_temp_sst_dirs(sstring sstdir) { + return do_with(std::vector>(), [sstdir = std::move(sstdir)] (std::vector>& futures) { + return lister::scan_dir(sstdir, { directory_entry_type::directory }, [&futures] (fs::path sstdir, directory_entry de) { + // push futures that remove files/directories into an array of futures, + // so that the supplied callback will not block scan_dir() from + // reading the next entry in the directory. + fs::path dirpath = sstdir / de.name; + if (sstables::sstable::is_temp_dir(dirpath)) { + dblog.info("Found temporary sstable directory: {}, removing", dirpath); + futures.push_back(lister::rmdir(dirpath)); + } + return make_ready_future<>(); + }).then([&futures] { + return execute_futures(futures); + }); + }); +} + +future<> distributed_loader::handle_sstables_pending_delete(sstring pending_delete_dir) { + return do_with(std::vector>(), [dir = std::move(pending_delete_dir)] (std::vector>& futures) { + return lister::scan_dir(dir, { directory_entry_type::regular }, [&futures] (fs::path dir, directory_entry de) { + // push nested futures that remove files/directories into an array of futures, + // so that the supplied callback will not block scan_dir() from + // reading the next entry in the directory. + fs::path file_path = dir / de.name; + if (file_path.extension() == ".tmp") { + dblog.info("Found temporary pending_delete log file: {}, deleting", file_path); + futures.push_back(remove_file(file_path.string())); + } else if (file_path.extension() == ".log") { + dblog.info("Found pending_delete log file: {}, replaying", file_path); + auto f = sstables::replay_pending_delete_log(file_path.string()).then([file_path = std::move(file_path)] { + dblog.debug("Replayed {}, removing", file_path); + return remove_file(file_path.string()); + }); + futures.push_back(std::move(f)); + } else { + dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path); + } + return make_ready_future<>(); + }).then([&futures] { + return execute_futures(futures); + }); + }); +} + +future<> distributed_loader::do_populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { // We can catch most errors when we try to load an sstable. But if the TOC // file is the one missing, we won't try to load the sstable at all. This // case is still an invalid case, but it is way easier for us to treat it @@ -538,21 +606,12 @@ future<> distributed_loader::populate_column_family(distributed& db, s auto verifier = make_lw_shared>(); return do_with(std::vector>(), [&db, sstdir = std::move(sstdir), verifier, ks, cf] (std::vector>& futures) { - return lister::scan_dir(sstdir, { directory_entry_type::regular, directory_entry_type::directory }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) { + return lister::scan_dir(sstdir, { directory_entry_type::regular }, [&db, verifier, &futures] (fs::path sstdir, directory_entry de) { // FIXME: The secondary indexes are in this level, but with a directory type, (starting with ".") - // push future returned by probe_file/rmdir into an array of futures, + // push future returned by probe_file into an array of futures, // so that the supplied callback will not block scan_dir() from // reading the next entry in the directory. - if (de.type && *de.type == directory_entry_type::directory) { - fs::path dirpath = sstdir / de.name; - if (engine().cpu_id() == 0 && sstables::sstable::is_temp_dir(dirpath)) { - dblog.info("Found temporary sstable directory: {}, removing", dirpath); - futures.push_back(lister::rmdir(dirpath)); - } - return make_ready_future<>(); - } - auto f = distributed_loader::probe_file(db, sstdir.native(), de.name).then([verifier, sstdir, de] (auto entry) { if (entry.component == component_type::TemporaryStatistics) { return remove_file(sstables::sstable::filename(sstdir.native(), entry.ks, entry.cf, entry.version, entry.generation, @@ -588,26 +647,7 @@ future<> distributed_loader::populate_column_family(distributed& db, s return make_ready_future<>(); }, &column_family::manifest_json_filter).then([&futures] { - return when_all(futures.begin(), futures.end()).then([] (std::vector> ret) { - std::exception_ptr eptr; - - for (auto& f : ret) { - try { - if (eptr) { - f.ignore_ready_future(); - } else { - f.get(); - } - } catch(...) { - eptr = std::current_exception(); - } - } - - if (eptr) { - return make_exception_future<>(eptr); - } - return make_ready_future<>(); - }); + return execute_futures(futures); }).then([verifier, sstdir, ks = std::move(ks), cf = std::move(cf)] { return do_for_each(*verifier, [sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), verifier] (auto v) { if (v.second.status == component_status::has_temporary_toc_file) { @@ -631,6 +671,22 @@ future<> distributed_loader::populate_column_family(distributed& db, s } +future<> distributed_loader::populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf) { + return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf)] { + // First pass, cleanup temporary sstable directories and sstables pending delete. + if (engine().cpu_id() == 0) { + cleanup_column_family_temp_sst_dirs(sstdir).get(); + auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename(); + auto exists = file_exists(pending_delete_dir).get0(); + if (exists) { + handle_sstables_pending_delete(pending_delete_dir).get(); + } + } + // Second pass, cleanup sstables with temporary TOCs and load the rest. + do_populate_column_family(db, std::move(sstdir), std::move(ks), std::move(cf)).get(); + }); +} + future<> distributed_loader::populate_keyspace(distributed& db, sstring datadir, sstring ks_name) { auto ksdir = datadir + "/" + ks_name; auto& keyspaces = db.local().get_keyspaces(); diff --git a/distributed_loader.hh b/distributed_loader.hh index e2bc64ce25..5544d13f71 100644 --- a/distributed_loader.hh +++ b/distributed_loader.hh @@ -68,4 +68,8 @@ public: static future<> init_system_keyspace(distributed& db); static future<> ensure_system_table_directories(distributed& db); static future<> init_non_system_keyspaces(distributed& db, distributed& proxy); +private: + static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir); + static future<> handle_sstables_pending_delete(sstring pending_deletes_dir); + static future<> do_populate_column_family(distributed& db, sstring sstdir, sstring ks, sstring cf); }; diff --git a/docs/sstables-directory-structure.md b/docs/sstables-directory-structure.md new file mode 100644 index 0000000000..a26db2ee9e --- /dev/null +++ b/docs/sstables-directory-structure.md @@ -0,0 +1,196 @@ + +# Introduction + +SSTables are stored as a set of regular files in the file system +in a common directory per-table (a.k.a. column family). + +In addition to SSTable files, sub-directories of the table base directory +are used for additional features such as snapshots, and atomic deletions recovery. + +This document summarizes the directory structure and file organization of SSTables. + +# Directory Hierarchy + +Scylla uses the following directory structure to store all its SSTables, for example: + +``` +/var/lib/scylla +└── data + ├── ks + │   ├── cf-6749a080303111e9b2f8000000000000 + │   │   ├── ... + │   │   ├── mc-2-big-TOC.txt + │   │   ├── snapshots + │   │   │   └── 1550133010687-cf + │   │   │   ├── manifest.json + │   │   │   ├── ... + │   │   │   └── mc-1-big-TOC.txt + │   │   ├── staging + │   │   └── upload + │   └── cf-7ec943202fc611e9a130000000000000 + │   ├── snapshots + │   │   └── 1550132311207-cf + │   │   ├── ... + │   │   ├── ks-cf-ka-3-TOC.txt + │   │   └── manifest.json + │   ├── staging + │   └── upload + ├── system + │   ├── schema_columnfamilies-45f5b36024bc3f83a3631034ea4fa697 + │   │   ├── staging + │   │   └── upload + │   ├── ... + ├── ... +``` + +Each keyspace, including system keyspaces has its own sub-directory +under the data directory (See `data_file_directories` in scylla.yaml). +The keyspace directory name is the keyspace name. + +In the keyspace directory there is a sub-directory per table +which is named by the table name followed by a dash and a unique identifier +to distinguish between different incarnations of tables that are called with the same name. + +In the table directory there are the SSTable files and additional +sub-directories as documented below. + +# SSTable Files + +SSTables are comprised of multiple component files. +The component file names are self-identifying and denote the component type, as well as per-sstable-format metadata. + +Here are the different component types and their naming convention: + +* Data (`Data.db`) + The SSTable data file, containing a part of the actual data stored in the database. + + See [SSTables-Data-File](https://github.com/scylladb/scylla/wiki/SSTables-Data-File) for more information. + +* Primary Index (`Index.db`) + Index of the row keys with pointers to their positions in the data file. + + See [SSTables-Index-File](https://github.com/scylladb/scylla/wiki/SSTables-Index-File) for more information. + + +* Bloom filter (`Filter.db`) + A structure stored in memory that checks if row data exists in the memtable before accessing SSTables on disk. + + +* Compression Information (`CompressionInfo.db`) + A file holding information about uncompressed data length, chunk offsets and other compression information. + + +* Statistics (`Statistics.db`) + Statistical metadata about the content of the SSTable and encoding statistics for the data file, starting with the mc format. + + +* Digest (`Digest.crc32`, `Digest.adler32`, `Digest.sha1`) + A file holding checksum of the data file. + The method used for checksum is specific to the SSTable format version (See below). + + +* CRC (`CRC.db`) + A file holding the CRC32 for chunks in an uncompressed file. + + +* SSTable Index Summary (`Summary.db`) + A sample of the partition index stored in memory. + + +* SSTable Table of Contents (`TOC.txt`) + A file that stores the list of all components for the SSTable TOC. + See details below regarding the use of a temporary TOC name during creation and deletion of SSTables. + + +* Scylla (`Scylla.db`) + A file holding scylla-specific metadata about the SSTable, such as sharding information, extended features support, and sstabe-run identifier. + +## SSTable Format Version + +SSTable's on-disk format has changed over time. +Three versions are currently supported by Scylla: `ka`, `la`, and `mc`. +Cassandra's convention is that the first letter determines +the major format version, in ascending order, and the second letter - +the minor version, starting from `a` onward. + +The SSTable file names identify the SSTable format version. +In addition, they provide the SSTable generation number and other metadata. + +The "encoding" of the above metadata into the file name changed over time +and it is version specific, as follows: + +``` + mc--- + la--- + --ka-- +``` + +where: +* `` is the SSTable generation - a unique positive number identifying the SSTable. +* `` is an archaic attribute that identifies the SSTable sub-format. + (Only `big` sub-format is supported by Scylla (and Cassandra) at this time.) +* `` is the file's component type, as described above. + +## Table Sub-directories + +The per-table directory may contain several sub-directories, as listed below: + +* Staging directory (`staging`) + A sub-directory holding materialized views SSTables during their update process. + + +* Snapshots directory (`snapshots`) + A sub-directory holding snapshots of SSTables, using hard links to the actual SSTable component files in the table base directory. + + +* Upload directory (`upload`) + Used for ingesting external SSTables into Scylla on startup. + + +* Temporary SSTable directory (`.sstable`) + A directory created when writing new SSTables. + + Some file systems (e.g. linux XFS) base their locality-of-use heuristics based on the directory in which files were created. + In this case, if all files are created in one (or a few) directories, block allocation can become very slow. + To overcome this issue, when a SSTable is created, the database creates a new sub-directory using the newly-created SSTable generation, named `.sstable` + and all SSTable component files are then created in this sub-directory and moved to the table base directory. + +* Pending-delete directory (`pending_delete`) + A directory that may hold log files for replaying atomic deletion operations of SSTables. + +## Temporary TOC Files + +SSTables are immutable. I.e., once written and sealed, they are never re-written. +For data consistency reasons, it is important for the database to determine that a SSTable is complete and valid, +in contrast to a SSTable that might be in a transitional state while being created or while being deleted. + +When created and initially written, the table of contents is stored in a TemporaryTOC file - `TOC.txt.tmp`. +It is renamed to `TOC.txt` when the SSTable is sealed and all components are flushed to stable storage and ready to be used. + +When a SSTable is removed, `TOC.txt` is first renamed to `TOC.txt.tmp`, and that atomically marks the SSTable as deleted. + +# Recovering from crashes + +On startup, the database scans all table directories and cleans up all SSTables that are in a transitional state: either partially written or partially deleted. +These SSTables are identified by their TemporaryTOC component, and the loader simply removes them. + +In addition, any existing temporary SSTable sub-directories are automatically removed. + +## Atomic deletion of SSTables + +In certain cases, the database is required to delete a number of SSTable in an atomic manner. +For example, one of the SSTables may hold a tombstone that deletes data that was inserted to a different SSTable, and both are to be deleted as part of compaction. + +When such operation is initiated, `delete_atomically` creates a unique, temporary log file in the `pending_delete` sub-directory named: +`sstables--.log.tmp`, based on the SSTables to-be-deleted minimum and maximum generation numbers. + +The log file contains the list of SSTables' TOC filenames (basename only, with no leading path), one TOC per line. +After the temporary log file if written, flushed, and closed; it is renamed to its final name: `sstables--.log`. + +Finally, after the SSTables are removed, the log file is removed from the `pending_delete` sub-directory. + +On startup, sealed `pending_delete` log files are replayed and after all requires SSTables are deleted successfully, the log file is deleted. + +Any temporary `pending_delete` log files that are found during startup are simply removed, as this is an indication that: +- The atomic delete operation had not started to delete any SSTable, and +- The log file may be partially written. diff --git a/sstables/sstables.cc b/sstables/sstables.cc index ea7f7927e8..dd2272b61e 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2549,35 +2549,36 @@ bool sstable::requires_view_building() const { return boost::algorithm::ends_with(_dir, "staging") || boost::algorithm::ends_with(_dir, "upload"); } +sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation, + format_types format, sstring component) { + sstring v = _version_string.at(version); + sstring g = to_sstring(generation); + sstring f = _format_string.at(format); + switch (version) { + case sstable::version_types::ka: + return ks + "-" + cf + "-" + v + "-" + g + "-" + component; + case sstable::version_types::la: + return v + "-" + g + "-" + f + "-" + component; + case sstable::version_types::mc: + return v + "-" + g + "-" + f + "-" + component; + } + assert(0 && "invalid version"); +} + +sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation, + format_types format, component_type component) { + return component_basename(ks, cf, version, generation, format, + sstable_version_constants::get_component_map(version).at(component)); +} + sstring sstable::filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation, format_types format, component_type component) { - sstring basename = [&] { - switch (version) { - case sstable::version_types::ka: - return ks + "-" + cf + "-" + _version_string.at(version) + "-" + to_sstring(generation) + "-" + - sstable_version_constants::get_component_map(version).at(component); - case sstable::version_types::la: - return _version_string.at(version) + "-" + to_sstring(generation) + "-" + _format_string.at(format) + "-" + - sstable_version_constants::get_component_map(version).at(component); - case sstable::version_types::mc: - return _version_string.at(version) + "-" + to_sstring(generation) + "-" + _format_string.at(format) + "-" + - sstable_version_constants::get_component_map(version).at(component); - } - assert(0 && "invalid version"); - }(); - - return dir + "/" + basename; + return dir + "/" + component_basename(ks, cf, version, generation, format, component); } sstring sstable::filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation, format_types format, sstring component) { - static std::unordered_map> fmtmap = { - { sstable::version_types::ka, "{0}-{1}-{2}-{3}-{5}" }, - { sstable::version_types::la, "{2}-{3}-{4}-{5}" }, - { sstable::version_types::mc, "{2}-{3}-{4}-{5}" } - }; - - return dir + "/" + seastar::format(fmtmap[version], ks, cf, _version_string.at(version), to_sstring(generation), _format_string.at(format), component); + return dir + "/" + component_basename(ks, cf, version, generation, format, component); } std::vector> sstable::all_components() const { @@ -2856,9 +2857,6 @@ int sstable::compare_by_max_timestamp(const sstable& other) const { return (ts1 > ts2 ? 1 : (ts1 == ts2 ? 0 : -1)); } -future<> -delete_sstables(std::vector tocs); - sstable::~sstable() { if (_index_file) { _index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) { @@ -2881,7 +2879,12 @@ sstable::~sstable() { // clean up unused sstables, and because we'll never reuse the same // generation number anyway. try { - delete_sstables({filename(component_type::TOC)}).handle_exception( + // FIXME: need to call large_data_handler.maybe_delete_large_partitions_entry + // - Short term fix plan is passing large_data_handler upon construction and + // using it from this path to update large_data_handler. + // - Longer term fix is to hand off deletion of sstables to a manager that can + // deal with sstable marked to be deleted after the corresponding object is destructed. + remove_by_toc_name(toc_filename()).handle_exception( [op = background_jobs().start()] (std::exception_ptr eptr) { try { std::rethrow_exception(eptr); @@ -2922,6 +2925,7 @@ remove_by_toc_name(sstring sstable_toc_name, const io_error_handler& error_handl auto new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX; sstring dir; + sstlog.debug("Removing by TOC name: {}", sstable_toc_name); if (sstable_io_check(error_handler, file_exists, sstable_toc_name).get0()) { dir = dirname(sstable_toc_name); sstable_io_check(error_handler, rename_file, sstable_toc_name, new_toc_name).get(); @@ -3113,24 +3117,150 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key& return utils::make_hashed_key(static_cast(key::from_partition_key(s, key))); } +// FIXME: although this is unused at the moment +// keep it around to be used for replaying pending_delete logs future<> delete_sstables(std::vector tocs) { - // FIXME: this needs to be done atomically (using a log file of sstables we intend to delete) return parallel_for_each(tocs, [] (sstring name) { return remove_by_toc_name(name); }); } future<> -delete_atomically(std::vector ssts, const db::large_data_handler& large_data_handler) { - future<> update = parallel_for_each(ssts, [&large_data_handler] (shared_sstable& sst) { - return large_data_handler.maybe_delete_large_partitions_entry(*sst); +sstable::unlink() +{ + auto name = toc_filename(); + return remove_by_toc_name(name).then_wrapped([name = std::move(name)] (future<> f) { + if (f.failed()) { + // Log and ignore the failure since there is nothing much we can do about it at this point. + // a. Compaction will retry deleting the sstable in the next pass, and + // b. in the future sstables_manager is planned to handle sstables deletion. + // c. Eventually we may want to record these failures in a system table + // and notify the administrator about that for manual handling (rather than aborting). + sstlog.warn("Failed to delete {}: {}. Ignoring.", name, f.get_exception()); + } + return make_ready_future<>(); }); - auto sstables_to_delete_atomically = boost::copy_range>(ssts - | boost::adaptors::transformed([] (auto&& sst) { return sst->toc_filename(); })); +} - future<> del = delete_sstables(std::move(sstables_to_delete_atomically)); - return when_all(std::move(del), std::move(update)).discard_result(); +static future<> +maybe_delete_large_data_entry(shared_sstable sst, const db::large_data_handler& large_data_handler) +{ + auto name = sst->get_filename(); + return large_data_handler.maybe_delete_large_partitions_entry(*sst->get_schema(), name, sst->data_size()) + .then_wrapped([name = std::move(name)] (future<> f) { + if (f.failed()) { + // Just log and ignore failures to delete large data entries. + // They are not critical to the operation of the database. + sstlog.warn("Failed to delete large data entry for {}: {}. Ignoring.", name, f.get_exception()); + } + return make_ready_future<>(); + }); +} + +static future<> +delete_sstable_and_maybe_large_data_entries(shared_sstable sst, const db::large_data_handler& large_data_handler) +{ + return when_all(sst->unlink(), maybe_delete_large_data_entry(sst, large_data_handler)).discard_result(); +} + +future<> +delete_atomically(std::vector ssts, const db::large_data_handler& large_data_handler) { + return seastar::async([ssts = std::move(ssts), &large_data_handler] { + sstring sstdir; + min_max_tracker gen_tracker; + + for (const auto& sst : ssts) { + gen_tracker.update(sst->generation()); + + if (sstdir.empty()) { + sstdir = sst->get_dir(); + } else { + // All sstables are assumed to be in the same column_family, hence + // sharing their base directory. + assert (sstdir == sst->get_dir()); + } + } + + sstring pending_delete_dir = sstdir + "/" + sstable::pending_delete_dir_basename(); + sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max()); + sstring tmp_pending_delete_log = pending_delete_log + ".tmp"; + sstlog.trace("Writing {}", tmp_pending_delete_log); + try { + touch_directory(pending_delete_dir).get(); + auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive; + // Create temporary pending_delete log file. + auto f = open_file_dma(tmp_pending_delete_log, oflags).get0(); + // Write all toc names into the log file. + file_output_stream_options options; + options.buffer_size = 4096; + auto w = file_writer(std::move(f), options); + + for (const auto& sst : ssts) { + auto toc = sst->component_basename(component_type::TOC); + w.write(toc.c_str(), toc.size()); + w.write("\n", 1); + } + + w.flush(); + w.close(); + + auto dir_f = open_directory(pending_delete_dir).get0(); + // Once flushed and closed, the temporary log file can be renamed. + rename_file(tmp_pending_delete_log, pending_delete_log).get(); + + // Guarantee that the changes above reached the disk. + dir_f.flush().get(); + dir_f.close().get(); + sstlog.debug("{} written successfully.", pending_delete_log); + } catch (...) { + sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log), std::current_exception(); + } + + parallel_for_each(ssts, [&large_data_handler] (shared_sstable sst) { + return delete_sstable_and_maybe_large_data_entries(sst, large_data_handler); + }).get(); + + // Once all sstables are deleted, the log file can be removed. + // Note: the log file will be removed also if + // delete_sstable_and_maybe_large_data_entries failed to remove + // any sstable and ignored the error. + try { + remove_file(pending_delete_log).get(); + sstlog.debug("{} removed.", pending_delete_log); + } catch (...) { + sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception()); + } + }); +} + +// FIXME: Go through maybe_delete_large_partitions_entry on recovery +// since this is an indication we crashed in the middle of delete_atomically +future<> replay_pending_delete_log(sstring pending_delete_log) { + sstlog.debug("Reading pending_deletes log file {}", pending_delete_log); + return seastar::async([pending_delete_log = std::move(pending_delete_log)] { + sstring pending_delete_dir = dirname(pending_delete_log); + assert(sstable::is_pending_delete_dir(fs::path(pending_delete_dir))); + try { + auto sstdir = dirname(pending_delete_dir); + auto f = open_file_dma(pending_delete_log, open_flags::ro).get0(); + auto size = f.size().get0(); + auto in = make_file_input_stream(f); + auto text = in.read_exactly(size).get0(); + in.close().get(); + f.close().get(); + + sstring all(text.begin(), text.end()); + std::vector basenames; + boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on); + auto tocs = boost::copy_range>(basenames + | boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); }) + | boost::adaptors::transformed([&sstdir] (auto&& basename) { return sstdir + "/" + basename; })); + delete_sstables(tocs).get(); + } catch (...) { + sstlog.warn("Error replaying {}: {}. Ignoring.", pending_delete_log, std::current_exception()); + } + }); } thread_local sstables_stats::stats sstables_stats::_shard_stats; diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 24e846aaf0..7aff9026f7 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -174,6 +174,10 @@ public: static component_type component_from_sstring(version_types version, sstring& s); static version_types version_from_sstring(sstring& s); static format_types format_from_sstring(sstring& s); + static sstring component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation, + format_types format, component_type component); + static sstring component_basename(const sstring& ks, const sstring& cf, version_types version, int64_t generation, + format_types format, sstring component); static sstring filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation, format_types format, component_type component); static sstring filename(const sstring& dir, const sstring& ks, const sstring& cf, version_types version, int64_t generation, @@ -354,6 +358,10 @@ public: // Return values are those of a trichotomic comparison. int compare_by_max_timestamp(const sstable& other) const; + sstring component_basename(component_type f) const { + return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f); + } + sstring filename(const sstring& dir, component_type f) const { return filename(dir, _schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f); } @@ -387,6 +395,15 @@ public: return dirpath.extension().string() == ".sstable"; } + static sstring pending_delete_dir_basename() { + return "pending_delete"; + } + + static bool is_pending_delete_dir(const fs::path& dirpath) + { + return dirpath.filename().string() == pending_delete_dir_basename().c_str(); + } + const sstring& get_dir() const { return _dir; } @@ -409,6 +426,9 @@ public: return create_links(dir, _generation); } + // Delete the sstable by unlinking all sstable files + future<> unlink(); + /** * Note. This is using the Origin definition of * max_data_age, which is load time. This could maybe @@ -824,6 +844,11 @@ struct entry_descriptor { int64_t generation, sstable::format_types format, component_type component) : sstdir(sstdir), ks(ks), cf(cf), version(version), generation(generation), format(format), component(component) {} + + entry_descriptor(sstring ks, sstring cf, sstable::version_types version, + int64_t generation, sstable::format_types format, + component_type component) + : ks(ks), cf(cf), version(version), generation(generation), format(format), component(component) {} }; // Waits for all prior tasks started on current shard related to sstable management to finish. @@ -849,6 +874,7 @@ future<> await_background_jobs_on_all_shards(); // // This function only solves the second problem for now. future<> delete_atomically(std::vector ssts, const db::large_data_handler& large_data_handler); +future<> replay_pending_delete_log(sstring log_file); struct index_sampling_state { static constexpr size_t default_summary_byte_cost = 2000; diff --git a/tests/database_test.cc b/tests/database_test.cc index ed6804baec..ee2b75b42a 100644 --- a/tests/database_test.cc +++ b/tests/database_test.cc @@ -165,3 +165,125 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) { return make_ready_future<>(); }, db_cfg).get(); } + +SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) { + using sst = sstables::sstable; + + tmpdir data_dir; + db::config db_cfg; + + db_cfg.data_file_directories({data_dir.path().string()}, db::config::config_source::CommandLine); + + // Create incomplete sstables in test data directory + sstring ks = "system"; + sstring cf = "local-7ad54392bcdd35a684174e047860b377"; + sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string(); + sstring pending_delete_dir = sst_dir + "/" + sst::pending_delete_dir_basename(); + + auto require_exist = [] (const sstring& name, bool should_exist) { + auto exists = file_exists(name).get0(); + if (should_exist) { + BOOST_REQUIRE(exists); + } else { + BOOST_REQUIRE(!exists); + } + }; + + auto touch_dir = [&require_exist] (const sstring& dir_name) { + recursive_touch_directory(dir_name).get(); + require_exist(dir_name, true); + }; + + auto touch_file = [&require_exist] (const sstring& file_name) { + auto f = open_file_dma(file_name, open_flags::create).get0(); + f.close().get(); + require_exist(file_name, true); + }; + + auto write_file = [&require_exist] (const sstring& file_name, const sstring& text) { + auto f = open_file_dma(file_name, open_flags::wo | open_flags::create | open_flags::truncate).get0(); + auto buf = temporary_buffer::aligned(f.memory_dma_alignment(), text.size()); + ::memcpy(buf.get_write(), text.c_str(), text.size()); + auto count = f.dma_write(0, buf.get(), text.size()).get0(); + BOOST_REQUIRE(count == text.size()); + f.close().get(); + require_exist(file_name, true); + }; + + auto component_basename = [&ks, &cf] (int64_t gen, component_type ctype) { + return sst::component_basename(ks, cf, sst::version_types::mc, gen, sst::format_types::big, ctype); + }; + + auto gen_filename = [&sst_dir, &ks, &cf] (int64_t gen, component_type ctype) { + return sst::filename(sst_dir, ks, cf, sst::version_types::mc, gen, sst::format_types::big, ctype); + }; + + touch_dir(pending_delete_dir); + + // Empty log file + touch_file(pending_delete_dir + "/sstables-0-0.log"); + + // Empty temporary log file + touch_file(pending_delete_dir + "/sstables-1-1.log.tmp"); + + const sstring toc_text = "TOC.txt\nData.db\n"; + + // Regular log file with single entry + write_file(gen_filename(2, component_type::TOC), toc_text); + touch_file(gen_filename(2, component_type::Data)); + write_file(pending_delete_dir + "/sstables-2-2.log", + component_basename(2, component_type::TOC) + "\n"); + + // Temporary log file with single entry + write_file(pending_delete_dir + "/sstables-3-3.log.tmp", + component_basename(3, component_type::TOC) + "\n"); + + // Regular log file with multiple entries + write_file(gen_filename(4, component_type::TOC), toc_text); + touch_file(gen_filename(4, component_type::Data)); + write_file(gen_filename(5, component_type::TOC), toc_text); + touch_file(gen_filename(5, component_type::Data)); + write_file(pending_delete_dir + "/sstables-4-5.log", + component_basename(4, component_type::TOC) + "\n" + + component_basename(5, component_type::TOC) + "\n"); + + // Regular log file with multiple entries and some deleted sstables + write_file(gen_filename(6, component_type::TemporaryTOC), toc_text); + touch_file(gen_filename(6, component_type::Data)); + write_file(gen_filename(7, component_type::TemporaryTOC), toc_text); + write_file(pending_delete_dir + "/sstables-6-8.log", + component_basename(6, component_type::TOC) + "\n" + + component_basename(7, component_type::TOC) + "\n" + + component_basename(8, component_type::TOC) + "\n"); + + do_with_cql_env([&] (cql_test_env& e) { + // Empty log file + require_exist(pending_delete_dir + "/sstables-0-0.log", false); + + // Empty temporary log file + require_exist(pending_delete_dir + "/sstables-1-1.log.tmp", false); + + // Regular log file with single entry + require_exist(gen_filename(2, component_type::TOC), false); + require_exist(gen_filename(2, component_type::Data), false); + require_exist(pending_delete_dir + "/sstables-2-2.log", false); + + // Temporary log file with single entry + require_exist(pending_delete_dir + "/sstables-3-3.log.tmp", false); + + // Regular log file with multiple entries + require_exist(gen_filename(4, component_type::TOC), false); + require_exist(gen_filename(4, component_type::Data), false); + require_exist(gen_filename(5, component_type::TOC), false); + require_exist(gen_filename(5, component_type::Data), false); + require_exist(pending_delete_dir + "/sstables-4-5.log", false); + + // Regular log file with multiple entries and some deleted sstables + require_exist(gen_filename(6, component_type::TemporaryTOC), false); + require_exist(gen_filename(6, component_type::Data), false); + require_exist(gen_filename(7, component_type::TemporaryTOC), false); + require_exist(pending_delete_dir + "/sstables-6-8.log", false); + + return make_ready_future<>(); + }, db_cfg).get(); +}