diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 7f34b639e3..2b952e66d3 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -434,10 +434,7 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele futures.push_back(remove_file(file_path.string())); } else if (file_path.extension() == ".log") { dblog.info("Found pending_delete log file: {}, replaying", file_path); - auto f = sstables::replay_pending_delete_log(file_path.string()).then([file_path = std::move(file_path)] { - dblog.debug("Replayed {}, removing", file_path); - return remove_file(file_path.string()); - }); + auto f = sstables::sstable_directory::replay_pending_delete_log(std::move(file_path)); futures.push_back(std::move(f)); } else { dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path); diff --git a/replica/table.cc b/replica/table.cc index e5d6af1a4d..5850359bde 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1086,7 +1086,7 @@ compaction_group::update_main_sstable_list_on_compaction_completion(sstables::co auto f = seastar::try_with_gate(_t._sstable_deletion_gate, [this, sstables_to_remove = desc.old_sstables] { return with_semaphore(_t._sstable_deletion_sem, 1, [sstables_to_remove = std::move(sstables_to_remove)] { - return sstables::delete_atomically(std::move(sstables_to_remove)); + return sstables::sstable_directory::delete_atomically(std::move(sstables_to_remove)); }); }); @@ -1699,7 +1699,7 @@ future table::discard_sstables(db_clock::time_point truncat if (r.enable_backlog_tracker) { remove_sstable_from_backlog_tracker(p->cg.get_backlog_tracker(), r.sst); } - return sstables::delete_atomically({r.sst}); + return sstables::sstable_directory::delete_atomically({r.sst}); }).then([p] { return make_ready_future(p->rp); }); diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index aeab920671..c7aa9acb82 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -7,7 +7,9 @@ */ #include +#include #include +#include #include "sstables/sstable_directory.hh" #include "sstables/sstables.hh" #include "compaction/compaction_manager.hh" @@ -463,4 +465,98 @@ sstable_directory::retrieve_shared_sstables() { return std::exchange(_shared_sstable_info, {}); } +future<> sstable_directory::delete_atomically(std::vector ssts) { + if (ssts.empty()) { + return make_ready_future<>(); + } + return seastar::async([ssts = std::move(ssts)] { + sstring sstdir; + min_max_tracker gen_tracker; + + for (const auto& sst : ssts) { + gen_tracker.update(sst->generation()); + + if (sstdir.empty()) { + sstdir = sst->get_dir(); + } else { + // All sstables are assumed to be in the same column_family, hence + // sharing their base directory. + assert (sstdir == sst->get_dir()); + } + } + + sstring pending_delete_dir = sstdir + "/" + sstable::pending_delete_dir_basename(); + sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max()); + sstring tmp_pending_delete_log = pending_delete_log + ".tmp"; + sstlog.trace("Writing {}", tmp_pending_delete_log); + try { + touch_directory(pending_delete_dir).get(); + auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive; + // Create temporary pending_delete log file. + auto f = open_file_dma(tmp_pending_delete_log, oflags).get0(); + // Write all toc names into the log file. + auto out = make_file_output_stream(std::move(f), 4096).get0(); + auto close_out = deferred_close(out); + + for (const auto& sst : ssts) { + auto toc = sst->component_basename(component_type::TOC); + out.write(toc).get(); + out.write("\n").get(); + } + + out.flush().get(); + close_out.close_now(); + + auto dir_f = open_directory(pending_delete_dir).get0(); + // Once flushed and closed, the temporary log file can be renamed. + rename_file(tmp_pending_delete_log, pending_delete_log).get(); + + // Guarantee that the changes above reached the disk. + dir_f.flush().get(); + dir_f.close().get(); + sstlog.debug("{} written successfully.", pending_delete_log); + } catch (...) { + sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception()); + } + + parallel_for_each(ssts, [] (shared_sstable sst) { + return sst->unlink(); + }).get(); + + // Once all sstables are deleted, the log file can be removed. + // Note: the log file will be removed also if unlink failed to remove + // any sstable and ignored the error. + try { + remove_file(pending_delete_log).get(); + sstlog.debug("{} removed.", pending_delete_log); + } catch (...) { + sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception()); + } + }); +} + +// FIXME: Go through maybe_delete_large_partitions_entry on recovery +// since this is an indication we crashed in the middle of delete_atomically +future<> sstable_directory::replay_pending_delete_log(fs::path pending_delete_log) { + sstlog.debug("Reading pending_deletes log file {}", pending_delete_log); + fs::path pending_delete_dir = pending_delete_log.parent_path(); + assert(sstable::is_pending_delete_dir(pending_delete_dir)); + try { + sstring sstdir = pending_delete_dir.parent_path().native(); + auto text = co_await seastar::util::read_entire_file_contiguous(pending_delete_log); + + sstring all(text.begin(), text.end()); + std::vector basenames; + boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on); + auto tocs = boost::copy_range>(basenames | boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); })); + co_await parallel_for_each(tocs, [&sstdir] (const sstring& name) { + return remove_by_toc_name(sstdir + "/" + name); + }); + sstlog.debug("Replayed {}, removing", pending_delete_log); + co_await remove_file(pending_delete_log.native()); + } catch (...) { + sstlog.warn("Error replaying {}: {}. Ignoring.", pending_delete_log, std::current_exception()); + } +} + } diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 881f672c45..1c80bded55 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -207,6 +207,21 @@ public: std::filesystem::path sstable_dir() const noexcept { return _sstable_dir; } + + // When we compact sstables, we have to atomically instantiate the new + // sstable and delete the old ones. Otherwise, if we compact A+B into C, + // and if A contained some data that was tombstoned by B, and if B was + // deleted but A survived, then data from A will be resurrected. + // + // There are two violators of the requirement to atomically delete + // sstables: first sstable instantiation and deletion on disk is atomic + // only wrt. itself, not other sstables, and second when an sstable is + // shared among shard, so actual on-disk deletion of an sstable is deferred + // until all shards agree it can be deleted. + // + // This function only solves the second problem for now. + static future<> delete_atomically(std::vector ssts); + static future<> replay_pending_delete_log(std::filesystem::path log_file); }; } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 45ae4d833b..e66044cdb2 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2816,8 +2816,7 @@ fsync_directory(const io_error_handler& error_handler, sstring dirname) { }); } -static future<> -remove_by_toc_name(sstring sstable_toc_name) { +future<> remove_by_toc_name(sstring sstable_toc_name) { sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size()); sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX; @@ -3037,13 +3036,6 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key& return utils::make_hashed_key(static_cast(key::from_partition_key(s, key))); } -future<> -delete_sstables(std::vector tocs) { - return parallel_for_each(tocs, [] (const sstring& name) { - return remove_by_toc_name(name); - }); -} - future<> sstable::unlink() noexcept { // We must be able to generate toc_filename() @@ -3081,107 +3073,6 @@ sstable::unlink() noexcept { _stats.on_delete(); } -future<> -delete_atomically(std::vector ssts) { - if (ssts.empty()) { - return make_ready_future<>(); - } - return seastar::async([ssts = std::move(ssts)] { - sstring sstdir; - min_max_tracker gen_tracker; - - for (const auto& sst : ssts) { - gen_tracker.update(sst->generation()); - - if (sstdir.empty()) { - sstdir = sst->get_dir(); - } else { - // All sstables are assumed to be in the same column_family, hence - // sharing their base directory. - assert (sstdir == sst->get_dir()); - } - } - - sstring pending_delete_dir = sstdir + "/" + sstable::pending_delete_dir_basename(); - sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max()); - sstring tmp_pending_delete_log = pending_delete_log + ".tmp"; - sstlog.trace("Writing {}", tmp_pending_delete_log); - try { - touch_directory(pending_delete_dir).get(); - auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive; - // Create temporary pending_delete log file. - auto f = open_file_dma(tmp_pending_delete_log, oflags).get0(); - // Write all toc names into the log file. - file_output_stream_options options; - options.buffer_size = 4096; - auto w = file_writer::make(std::move(f), options, tmp_pending_delete_log).get0(); - - for (const auto& sst : ssts) { - auto toc = sst->component_basename(component_type::TOC); - w.write(toc.c_str(), toc.size()); - w.write("\n", 1); - } - - w.flush(); - w.close(); - - auto dir_f = open_directory(pending_delete_dir).get0(); - // Once flushed and closed, the temporary log file can be renamed. - rename_file(tmp_pending_delete_log, pending_delete_log).get(); - - // Guarantee that the changes above reached the disk. - dir_f.flush().get(); - dir_f.close().get(); - sstlog.debug("{} written successfully.", pending_delete_log); - } catch (...) { - sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception()); - } - - parallel_for_each(ssts, [] (shared_sstable sst) { - return sst->unlink(); - }).get(); - - // Once all sstables are deleted, the log file can be removed. - // Note: the log file will be removed also if unlink failed to remove - // any sstable and ignored the error. - try { - remove_file(pending_delete_log).get(); - sstlog.debug("{} removed.", pending_delete_log); - } catch (...) { - sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception()); - } - }); -} - -// FIXME: Go through maybe_delete_large_partitions_entry on recovery -// since this is an indication we crashed in the middle of delete_atomically -future<> replay_pending_delete_log(sstring pending_delete_log) { - sstlog.debug("Reading pending_deletes log file {}", pending_delete_log); - return seastar::async([pending_delete_log = std::move(pending_delete_log)] { - sstring pending_delete_dir = parent_path(pending_delete_log); - assert(sstable::is_pending_delete_dir(fs::path(pending_delete_dir))); - try { - auto sstdir = parent_path(pending_delete_dir); - auto f = open_file_dma(pending_delete_log, open_flags::ro).get0(); - auto size = f.size().get0(); - auto in = make_file_input_stream(f); - auto text = in.read_exactly(size).get0(); - in.close().get(); - f.close().get(); - - sstring all(text.begin(), text.end()); - std::vector basenames; - boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on); - auto tocs = boost::copy_range>(basenames - | boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); }) - | boost::adaptors::transformed([&sstdir] (auto&& basename) { return sstdir + "/" + basename; })); - delete_sstables(tocs).get(); - } catch (...) { - sstlog.warn("Error replaying {}: {}. Ignoring.", pending_delete_log, std::current_exception()); - } - }); -} - thread_local sstables_stats::stats sstables_stats::_shard_stats; thread_local partition_index_cache::stats partition_index_cache::_shard_stats; thread_local cached_file::metrics index_page_cache_metrics; diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 451a552f8b..ecca5cb3c5 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -907,21 +907,6 @@ public: gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state) const; }; -// When we compact sstables, we have to atomically instantiate the new -// sstable and delete the old ones. Otherwise, if we compact A+B into C, -// and if A contained some data that was tombstoned by B, and if B was -// deleted but A survived, then data from A will be resurrected. -// -// There are two violators of the requirement to atomically delete -// sstables: first sstable instantiation and deletion on disk is atomic -// only wrt. itself, not other sstables, and second when an sstable is -// shared among shard, so actual on-disk deletion of an sstable is deferred -// until all shards agree it can be deleted. -// -// This function only solves the second problem for now. -future<> delete_atomically(std::vector ssts); -future<> replay_pending_delete_log(sstring log_file); - // Validate checksums // // Sstables have two kind of checksums: per-chunk checksums and a @@ -977,4 +962,7 @@ public: // swallows all errors and just reports them to the log. future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir); +// similar to sstable::unlink, but works on a TOC file name +future<> remove_by_toc_name(sstring sstable_toc_name); + } // namespace sstables diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 1f08b82c7b..54ada96fef 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -22,6 +22,7 @@ #include "replica/database.hh" #include "sstables/metadata_collector.hh" #include "sstables/sstable_writer.hh" +#include "sstables/sstable_directory.hh" #include #include "test/boost/sstable_test.hh" #include @@ -2103,7 +2104,7 @@ SEASTAR_TEST_CASE(test_unknown_component) { BOOST_REQUIRE(!file_exists(tmp.path().string() + "/la-1-big-UNKNOWN.txt").get0()); BOOST_REQUIRE(file_exists(tmp.path().string() + "/la-2-big-UNKNOWN.txt").get0()); - sstables::delete_atomically({sstp}).get(); + sstables::sstable_directory::delete_atomically({sstp}).get(); // assure unknown component is deleted BOOST_REQUIRE(!file_exists(tmp.path().string() + "/la-2-big-UNKNOWN.txt").get0()); });