table: clone staging sstables into table dir

clone staging sstables so their content may be compacted while
views are built.  When done, the hard-linked copy in the staging
subdirectory will be simply unlinked.

Fixes #9559

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-05-23 08:53:41 +03:00
parent ce5bd505dc
commit 597e415c38
5 changed files with 51 additions and 14 deletions

View File

@@ -113,7 +113,7 @@ future<> view_update_generator::start() {
auto& [t, sstables] = *it;
try {
inject_failure("view_update_generator_move_staging_sstable");
t->move_sstables_from_staging(sstables).get();
t->remove_sstables_from_staging(sstables).get();
} catch (...) {
// Move from staging will be retried upon restart.
vug_logger.warn("Moving some sstable from staging failed: {}. Ignoring...", std::current_exception());

View File

@@ -502,7 +502,7 @@ public:
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
future<> remove_sstables_from_staging(std::vector<sstables::shared_sstable>);
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
io_error_handler_gen error_handler_gen);
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f);

View File

@@ -392,12 +392,13 @@ table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::s
if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
}
if (sstable->requires_view_building()) {
on_internal_error(tlogger, format("Attempted to load staging SSTable {} at table", sstable->get_filename()));
}
// allow in-progress reads to continue using old list
auto new_sstables = make_lw_shared<sstables::sstable_set>(*sstables);
new_sstables->insert(sstable);
if (sstable->requires_view_building()) {
_sstables_staging.emplace(sstable->generation(), sstable);
} else if (backlog_tracker) {
if (backlog_tracker) {
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sstable);
}
// update sstable set last in case either updating
@@ -438,6 +439,28 @@ void table::enable_off_strategy_trigger() {
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
if (sst->requires_view_building()) {
auto [it, inserted] = _sstables_staging.emplace(sst->generation(), sst);
if (!inserted) {
on_internal_error(tlogger, format("could not add staging sstable: generation {} already exists", sst->generation()));
}
// clone staging sstables so their content may be compacted while
// views are built. When done, the hard-linked copy in the staging
// subsirectory will be simply unlinked.
//
// Note that after restart, we don't know whether we already cloned
// the staging sstable or we might have restarted right after sealing it
// and before cloning here, so we might be resurrecting an sstable
// in the base directory in this rare corner case.
try {
sst = co_await sst->clone_at(dir(), calculate_generation_for_new_table());
} catch (...) {
_sstables_staging.erase(it);
throw;
}
}
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([this, sst, offstrategy] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
@@ -2276,21 +2299,18 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}
future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
future<> table::remove_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
auto units = co_await get_units(_sstable_deletion_sem, 1);
auto dirs_to_sync = std::set<sstring>({dir()});
auto main_sstables = _main_sstables->all();
std::set<sstring> dirs_to_sync;
for (auto sst : sstables) {
dirs_to_sync.emplace(sst->get_dir());
tlogger.debug("Removing sstable {} from staging", sst->get_filename());
try {
co_await sst->move_to_new_dir(dir(), sst->generation(), false);
co_await sst->unlink();
_sstables_staging.erase(sst->generation());
// Maintenance SSTables being moved from staging shouldn't be added to tracker because they're off-strategy
if (main_sstables->contains(sst)) {
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
}
} catch (...) {
tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception());
tlogger.warn("Failed to remove sstable {} from staging: {}", sst->get_filename(), std::current_exception());
throw;
}
}

View File

@@ -2143,6 +2143,17 @@ future<> sstable::move_to_quarantine(bool do_sync_dirs) {
co_await move_to_new_dir(std::move(new_dir), generation(), do_sync_dirs);
}
future<shared_sstable> sstable::clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen) {
if (fs::canonical(fs::path(new_dir)) == fs::canonical(fs::path(_dir))) {
on_internal_error(sstlog, format("Cannot clone sstable {} into same dir", get_filename()));
}
auto gen = opt_gen.value_or(_generation);
co_await create_links(new_dir, gen);
auto cloned_sst = _manager.make_sstable(_schema, new_dir, gen, _version, _format);
co_await cloned_sst->load(co_await get_open_info());
co_return cloned_sst;
}
flat_mutation_reader_v2
sstable::make_reader(
schema_ptr schema,

View File

@@ -201,6 +201,12 @@ public:
// will move it into a quarantine_dir subdirectory of its current directory.
future<> move_to_quarantine(bool do_sync_dirs = true);
// Clone the sstable at a new directory.
// hardlink all sstable components in the new dir
// with the same generation and return a new, shared
// sstable object for the clone.
future<shared_sstable> clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen);
generation_type generation() const {
return _generation;
}