Merge 'Move deletion log code to sstable_directory.cc' from Pavel Emelyanov

In order to support different storage kinds for sstable files (e.g. S3), all the places that manipulate files on a POSIX filesystem need to be localized, so that a custom storage can implement them in its own way. This set moves the deletion log manipulations to sstable_directory.cc, which already "knows" that it works over a directory.

Closes #12020

* github.com:scylladb/scylladb:
  sstables: Delete log file in replay_pending_delete_log()
  sstables: Move deletion log manipulations to sstable_directory.cc
  sstables: Open-code delete_sstables() call
  sstables: Use fs::path in replay_pending_delete_log()
  sstables: Indentation fix after previous patch
  sstables: Coroutinize replay_pending_delete_log
  sstables: Read pending delete log with one line helper
  sstables: Dont write pending log with file_writer
This commit is contained in:
Avi Kivity
2022-11-21 21:22:59 +02:00
7 changed files with 120 additions and 132 deletions

View File

@@ -434,10 +434,7 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
futures.push_back(remove_file(file_path.string()));
} else if (file_path.extension() == ".log") {
dblog.info("Found pending_delete log file: {}, replaying", file_path);
auto f = sstables::replay_pending_delete_log(file_path.string()).then([file_path = std::move(file_path)] {
dblog.debug("Replayed {}, removing", file_path);
return remove_file(file_path.string());
});
auto f = sstables::sstable_directory::replay_pending_delete_log(std::move(file_path));
futures.push_back(std::move(f));
} else {
dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path);

View File

@@ -1086,7 +1086,7 @@ compaction_group::update_main_sstable_list_on_compaction_completion(sstables::co
auto f = seastar::try_with_gate(_t._sstable_deletion_gate, [this, sstables_to_remove = desc.old_sstables] {
return with_semaphore(_t._sstable_deletion_sem, 1, [sstables_to_remove = std::move(sstables_to_remove)] {
return sstables::delete_atomically(std::move(sstables_to_remove));
return sstables::sstable_directory::delete_atomically(std::move(sstables_to_remove));
});
});
@@ -1699,7 +1699,7 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
if (r.enable_backlog_tracker) {
remove_sstable_from_backlog_tracker(p->cg.get_backlog_tracker(), r.sst);
}
return sstables::delete_atomically({r.sst});
return sstables::sstable_directory::delete_atomically({r.sst});
}).then([p] {
return make_ready_future<db::replay_position>(p->rp);
});

View File

@@ -7,7 +7,9 @@
*/
#include <seastar/core/coroutine.hh>
#include <seastar/util/file.hh>
#include <boost/range/adaptor/map.hpp>
#include <boost/algorithm/string.hpp>
#include "sstables/sstable_directory.hh"
#include "sstables/sstables.hh"
#include "compaction/compaction_manager.hh"
@@ -463,4 +465,98 @@ sstable_directory::retrieve_shared_sstables() {
return std::exchange(_shared_sstable_info, {});
}
// Deletes a set of sstables, recording the intent in a "pending delete" log
// file before any of them is unlinked.
//
// An empty `ssts` resolves immediately. Otherwise the TOC basename of every
// sstable is written, one per line, to
//   <sstdir>/<pending_delete_dir_basename()>/sstables-<min gen>-<max gen>.log
// (via a ".tmp" file that is renamed into place), then every sstable is
// unlinked, and finally the log file is removed.
//
// All sstables must live in the same directory (asserted).
// Failures to write or to remove the log are logged as warnings and ignored;
// the sstables are unlinked whether or not the log was written.
future<> sstable_directory::delete_atomically(std::vector<shared_sstable> ssts) {
if (ssts.empty()) {
return make_ready_future<>();
}
// The body waits on futures with .get()/.get0(), hence seastar::async.
return seastar::async([ssts = std::move(ssts)] {
sstring sstdir;
// Tracks the lowest and highest generation; both end up in the log file name.
min_max_tracker<generation_type> gen_tracker;
for (const auto& sst : ssts) {
gen_tracker.update(sst->generation());
if (sstdir.empty()) {
sstdir = sst->get_dir();
} else {
// All sstables are assumed to be in the same column_family, hence
// sharing their base directory.
assert (sstdir == sst->get_dir());
}
}
sstring pending_delete_dir = sstdir + "/" + sstable::pending_delete_dir_basename();
sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
sstring tmp_pending_delete_log = pending_delete_log + ".tmp";
sstlog.trace("Writing {}", tmp_pending_delete_log);
try {
touch_directory(pending_delete_dir).get();
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
// Create temporary pending_delete log file.
auto f = open_file_dma(tmp_pending_delete_log, oflags).get0();
// Write all toc names into the log file.
auto out = make_file_output_stream(std::move(f), 4096).get0();
// Closes the stream if an exception escapes before close_now() below.
auto close_out = deferred_close(out);
for (const auto& sst : ssts) {
auto toc = sst->component_basename(component_type::TOC);
out.write(toc).get();
out.write("\n").get();
}
out.flush().get();
close_out.close_now();
// The directory is opened before the rename so it can be flushed right after it.
auto dir_f = open_directory(pending_delete_dir).get0();
// Once flushed and closed, the temporary log file can be renamed.
rename_file(tmp_pending_delete_log, pending_delete_log).get();
// Guarantee that the changes above reached the disk.
dir_f.flush().get();
dir_f.close().get();
sstlog.debug("{} written successfully.", pending_delete_log);
} catch (...) {
// Best effort: a failure to write the log does not stop the deletion below.
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
}
// Unlink all sstables in parallel and wait for all of them to finish.
parallel_for_each(ssts, [] (shared_sstable sst) {
return sst->unlink();
}).get();
// Once all sstables are deleted, the log file can be removed.
// Note: the log file will be removed also if unlink failed to remove
// any sstable and ignored the error.
try {
remove_file(pending_delete_log).get();
sstlog.debug("{} removed.", pending_delete_log);
} catch (...) {
sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
}
});
}
// FIXME: Go through maybe_delete_large_partitions_entry on recovery
// since this is an indication we crashed in the middle of delete_atomically
// Replays a pending-delete log, i.e. finishes the deletion of the sstables
// listed in it, and then removes the log file itself.
//
// `pending_delete_log` must reside directly inside a pending-delete directory
// (asserted). Every non-empty line of the log is taken as a TOC basename,
// resolved against the parent of that directory and handed to
// remove_by_toc_name().
//
// Any error (reading the log, removing an sstable, removing the log) is logged
// as a warning and swallowed; the returned future never carries the failure.
// If removing an sstable fails the log file is left in place.
future<> sstable_directory::replay_pending_delete_log(fs::path pending_delete_log) {
    sstlog.debug("Reading pending_deletes log file {}", pending_delete_log);
    fs::path pending_delete_dir = pending_delete_log.parent_path();
    assert(sstable::is_pending_delete_dir(pending_delete_dir));
    try {
        // The sstables live one level above the pending-delete directory.
        sstring sstdir = pending_delete_dir.parent_path().native();
        // The file contents arrive as one contiguous string, so they are
        // split directly without taking another copy first.
        auto text = co_await seastar::util::read_entire_file_contiguous(pending_delete_log);
        std::vector<sstring> basenames;
        boost::split(basenames, text, boost::is_any_of("\n"), boost::token_compress_on);
        // A trailing newline leaves an empty token behind; drop those.
        auto tocs = boost::copy_range<std::vector<sstring>>(basenames | boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); }));
        // `tocs` and `sstdir` live in the coroutine frame, so referencing them
        // from the lambda across the co_await is safe.
        co_await parallel_for_each(tocs, [&sstdir] (const sstring& name) {
            return remove_by_toc_name(sstdir + "/" + name);
        });
        sstlog.debug("Replayed {}, removing", pending_delete_log);
        co_await remove_file(pending_delete_log.native());
    } catch (...) {
        sstlog.warn("Error replaying {}: {}. Ignoring.", pending_delete_log, std::current_exception());
    }
}
}

View File

@@ -207,6 +207,21 @@ public:
std::filesystem::path sstable_dir() const noexcept {
return _sstable_dir;
}
// When we compact sstables, we have to atomically instantiate the new
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
// and if A contained some data that was tombstoned by B, and if B was
// deleted but A survived, then data from A will be resurrected.
//
// There are two violators of the requirement to atomically delete
// sstables: first sstable instantiation and deletion on disk is atomic
// only wrt. itself, not other sstables, and second when an sstable is
// shared among shards, so actual on-disk deletion of an sstable is deferred
// until all shards agree it can be deleted.
//
// This function only solves the second problem for now.
static future<> delete_atomically(std::vector<shared_sstable> ssts);
static future<> replay_pending_delete_log(std::filesystem::path log_file);
};
}

View File

@@ -2816,8 +2816,7 @@ fsync_directory(const io_error_handler& error_handler, sstring dirname) {
});
}
static future<>
remove_by_toc_name(sstring sstable_toc_name) {
future<> remove_by_toc_name(sstring sstable_toc_name) {
sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
@@ -3037,13 +3036,6 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
return utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(s, key)));
}
// Removes every sstable named by a TOC file path in `tocs`, running the
// removals in parallel. The returned future resolves once all of them are done.
future<>
delete_sstables(std::vector<sstring> tocs) {
    auto remove_one = [] (const sstring& toc_path) {
        return remove_by_toc_name(toc_path);
    };
    return parallel_for_each(tocs, remove_one);
}
future<>
sstable::unlink() noexcept {
// We must be able to generate toc_filename()
@@ -3081,107 +3073,6 @@ sstable::unlink() noexcept {
_stats.on_delete();
}
// Deletes a set of sstables, recording the intent in a "pending delete" log
// file before any of them is unlinked.
//
// An empty `ssts` resolves immediately. Otherwise the TOC basename of every
// sstable is written, one per line, to
//   <sstdir>/<pending_delete_dir_basename()>/sstables-<min gen>-<max gen>.log
// (via a ".tmp" file that is renamed into place), then every sstable is
// unlinked, and finally the log file is removed.
//
// All sstables must live in the same directory (asserted).
// Failures to write or to remove the log are logged as warnings and ignored;
// the sstables are unlinked whether or not the log was written.
future<>
delete_atomically(std::vector<shared_sstable> ssts) {
if (ssts.empty()) {
return make_ready_future<>();
}
// The body waits on futures with .get()/.get0(), hence seastar::async.
return seastar::async([ssts = std::move(ssts)] {
sstring sstdir;
// Tracks the lowest and highest generation; both end up in the log file name.
min_max_tracker<generation_type> gen_tracker;
for (const auto& sst : ssts) {
gen_tracker.update(sst->generation());
if (sstdir.empty()) {
sstdir = sst->get_dir();
} else {
// All sstables are assumed to be in the same column_family, hence
// sharing their base directory.
assert (sstdir == sst->get_dir());
}
}
sstring pending_delete_dir = sstdir + "/" + sstable::pending_delete_dir_basename();
sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
sstring tmp_pending_delete_log = pending_delete_log + ".tmp";
sstlog.trace("Writing {}", tmp_pending_delete_log);
try {
touch_directory(pending_delete_dir).get();
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
// Create temporary pending_delete log file.
auto f = open_file_dma(tmp_pending_delete_log, oflags).get0();
// Write all toc names into the log file.
file_output_stream_options options;
options.buffer_size = 4096;
auto w = file_writer::make(std::move(f), options, tmp_pending_delete_log).get0();
for (const auto& sst : ssts) {
auto toc = sst->component_basename(component_type::TOC);
w.write(toc.c_str(), toc.size());
w.write("\n", 1);
}
w.flush();
w.close();
// The directory is opened before the rename so it can be flushed right after it.
auto dir_f = open_directory(pending_delete_dir).get0();
// Once flushed and closed, the temporary log file can be renamed.
rename_file(tmp_pending_delete_log, pending_delete_log).get();
// Guarantee that the changes above reached the disk.
dir_f.flush().get();
dir_f.close().get();
sstlog.debug("{} written successfully.", pending_delete_log);
} catch (...) {
// Best effort: a failure to write the log does not stop the deletion below.
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
}
// Unlink all sstables in parallel and wait for all of them to finish.
parallel_for_each(ssts, [] (shared_sstable sst) {
return sst->unlink();
}).get();
// Once all sstables are deleted, the log file can be removed.
// Note: the log file will be removed also if unlink failed to remove
// any sstable and ignored the error.
try {
remove_file(pending_delete_log).get();
sstlog.debug("{} removed.", pending_delete_log);
} catch (...) {
sstlog.warn("Error removing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
}
});
}
// FIXME: Go through maybe_delete_large_partitions_entry on recovery
// since this is an indication we crashed in the middle of delete_atomically
// Replays a pending-delete log: removes every sstable whose TOC basename is
// listed in the file at `pending_delete_log`.
//
// The log must reside directly inside a pending-delete directory (asserted);
// the listed basenames are resolved against the parent of that directory.
// Errors are logged as warnings and swallowed. The log file itself is not
// removed here.
future<> replay_pending_delete_log(sstring pending_delete_log) {
sstlog.debug("Reading pending_deletes log file {}", pending_delete_log);
// The body waits on futures with .get()/.get0(), hence seastar::async.
return seastar::async([pending_delete_log = std::move(pending_delete_log)] {
sstring pending_delete_dir = parent_path(pending_delete_log);
assert(sstable::is_pending_delete_dir(fs::path(pending_delete_dir)));
try {
// The sstables live one level above the pending-delete directory.
auto sstdir = parent_path(pending_delete_dir);
// Read the whole log file into memory, then close the stream and the file.
auto f = open_file_dma(pending_delete_log, open_flags::ro).get0();
auto size = f.size().get0();
auto in = make_file_input_stream(f);
auto text = in.read_exactly(size).get0();
in.close().get();
f.close().get();
sstring all(text.begin(), text.end());
std::vector<sstring> basenames;
boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on);
// Drop empty tokens and turn each basename into a path under sstdir.
auto tocs = boost::copy_range<std::vector<sstring>>(basenames
| boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); })
| boost::adaptors::transformed([&sstdir] (auto&& basename) { return sstdir + "/" + basename; }));
delete_sstables(tocs).get();
} catch (...) {
sstlog.warn("Error replaying {}: {}. Ignoring.", pending_delete_log, std::current_exception());
}
});
}
thread_local sstables_stats::stats sstables_stats::_shard_stats;
thread_local partition_index_cache::stats partition_index_cache::_shard_stats;
thread_local cached_file::metrics index_page_cache_metrics;

View File

@@ -907,21 +907,6 @@ public:
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state) const;
};
// When we compact sstables, we have to atomically instantiate the new
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
// and if A contained some data that was tombstoned by B, and if B was
// deleted but A survived, then data from A will be resurrected.
//
// There are two violators of the requirement to atomically delete
// sstables: first sstable instantiation and deletion on disk is atomic
// only wrt. itself, not other sstables, and second when an sstable is
// shared among shards, so actual on-disk deletion of an sstable is deferred
// until all shards agree it can be deleted.
//
// This function only solves the second problem for now.
future<> delete_atomically(std::vector<shared_sstable> ssts);
future<> replay_pending_delete_log(sstring log_file);
// Validate checksums
//
// Sstables have two kind of checksums: per-chunk checksums and a
@@ -977,4 +962,7 @@ public:
// swallows all errors and just reports them to the log.
future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir);
// similar to sstable::unlink, but works on a TOC file name
future<> remove_by_toc_name(sstring sstable_toc_name);
} // namespace sstables

View File

@@ -22,6 +22,7 @@
#include "replica/database.hh"
#include "sstables/metadata_collector.hh"
#include "sstables/sstable_writer.hh"
#include "sstables/sstable_directory.hh"
#include <memory>
#include "test/boost/sstable_test.hh"
#include <seastar/core/seastar.hh>
@@ -2103,7 +2104,7 @@ SEASTAR_TEST_CASE(test_unknown_component) {
BOOST_REQUIRE(!file_exists(tmp.path().string() + "/la-1-big-UNKNOWN.txt").get0());
BOOST_REQUIRE(file_exists(tmp.path().string() + "/la-2-big-UNKNOWN.txt").get0());
sstables::delete_atomically({sstp}).get();
sstables::sstable_directory::delete_atomically({sstp}).get();
// assure unknown component is deleted
BOOST_REQUIRE(!file_exists(tmp.path().string() + "/la-2-big-UNKNOWN.txt").get0());
});