table: clone staging sstables into table dir
clone staging sstables so their content may be compacted while views are built. When done, the hard-linked copy in the staging subdirectory will be simply unlinked. Fixes #9559 Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -113,7 +113,7 @@ future<> view_update_generator::start() {
|
||||
auto& [t, sstables] = *it;
|
||||
try {
|
||||
inject_failure("view_update_generator_move_staging_sstable");
|
||||
t->move_sstables_from_staging(sstables).get();
|
||||
t->remove_sstables_from_staging(sstables).get();
|
||||
} catch (...) {
|
||||
// Move from staging will be retried upon restart.
|
||||
vug_logger.warn("Moving some sstable from staging failed: {}. Ignoring...", std::current_exception());
|
||||
|
||||
@@ -502,7 +502,7 @@ public:
|
||||
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
|
||||
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
||||
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
|
||||
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
|
||||
future<> remove_sstables_from_staging(std::vector<sstables::shared_sstable>);
|
||||
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
|
||||
io_error_handler_gen error_handler_gen);
|
||||
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f);
|
||||
|
||||
@@ -392,12 +392,13 @@ table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::s
|
||||
if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
|
||||
on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
|
||||
}
|
||||
if (sstable->requires_view_building()) {
|
||||
on_internal_error(tlogger, format("Attempted to load staging SSTable {} at table", sstable->get_filename()));
|
||||
}
|
||||
// allow in-progress reads to continue using old list
|
||||
auto new_sstables = make_lw_shared<sstables::sstable_set>(*sstables);
|
||||
new_sstables->insert(sstable);
|
||||
if (sstable->requires_view_building()) {
|
||||
_sstables_staging.emplace(sstable->generation(), sstable);
|
||||
} else if (backlog_tracker) {
|
||||
if (backlog_tracker) {
|
||||
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sstable);
|
||||
}
|
||||
// update sstable set last in case either updating
|
||||
@@ -438,6 +439,28 @@ void table::enable_off_strategy_trigger() {
|
||||
|
||||
future<>
|
||||
table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
|
||||
if (sst->requires_view_building()) {
|
||||
auto [it, inserted] = _sstables_staging.emplace(sst->generation(), sst);
|
||||
if (!inserted) {
|
||||
on_internal_error(tlogger, format("could not add staging sstable: generation {} already exists", sst->generation()));
|
||||
}
|
||||
|
||||
// clone staging sstables so their content may be compacted while
|
||||
// views are built. When done, the hard-linked copy in the staging
|
||||
// subsirectory will be simply unlinked.
|
||||
//
|
||||
// Note that after restart, we don't know whether we already cloned
|
||||
// the staging sstable or we might have restarted right after sealing it
|
||||
// and before cloning here, so we might be resurrecting an sstable
|
||||
// in the base directory in this rare corner case.
|
||||
try {
|
||||
sst = co_await sst->clone_at(dir(), calculate_generation_for_new_table());
|
||||
} catch (...) {
|
||||
_sstables_staging.erase(it);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
||||
co_return co_await get_row_cache().invalidate(row_cache::external_updater([this, sst, offstrategy] () noexcept {
|
||||
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
|
||||
@@ -2276,21 +2299,18 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
|
||||
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
|
||||
future<> table::remove_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
|
||||
auto units = co_await get_units(_sstable_deletion_sem, 1);
|
||||
auto dirs_to_sync = std::set<sstring>({dir()});
|
||||
auto main_sstables = _main_sstables->all();
|
||||
std::set<sstring> dirs_to_sync;
|
||||
|
||||
for (auto sst : sstables) {
|
||||
dirs_to_sync.emplace(sst->get_dir());
|
||||
tlogger.debug("Removing sstable {} from staging", sst->get_filename());
|
||||
try {
|
||||
co_await sst->move_to_new_dir(dir(), sst->generation(), false);
|
||||
co_await sst->unlink();
|
||||
_sstables_staging.erase(sst->generation());
|
||||
// Maintenance SSTables being moved from staging shouldn't be added to tracker because they're off-strategy
|
||||
if (main_sstables->contains(sst)) {
|
||||
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
|
||||
}
|
||||
} catch (...) {
|
||||
tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception());
|
||||
tlogger.warn("Failed to remove sstable {} from staging: {}", sst->get_filename(), std::current_exception());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2143,6 +2143,17 @@ future<> sstable::move_to_quarantine(bool do_sync_dirs) {
|
||||
co_await move_to_new_dir(std::move(new_dir), generation(), do_sync_dirs);
|
||||
}
|
||||
|
||||
future<shared_sstable> sstable::clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen) {
|
||||
if (fs::canonical(fs::path(new_dir)) == fs::canonical(fs::path(_dir))) {
|
||||
on_internal_error(sstlog, format("Cannot clone sstable {} into same dir", get_filename()));
|
||||
}
|
||||
auto gen = opt_gen.value_or(_generation);
|
||||
co_await create_links(new_dir, gen);
|
||||
auto cloned_sst = _manager.make_sstable(_schema, new_dir, gen, _version, _format);
|
||||
co_await cloned_sst->load(co_await get_open_info());
|
||||
co_return cloned_sst;
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2
|
||||
sstable::make_reader(
|
||||
schema_ptr schema,
|
||||
|
||||
@@ -201,6 +201,12 @@ public:
|
||||
// will move it into a quarantine_dir subdirectory of its current directory.
|
||||
future<> move_to_quarantine(bool do_sync_dirs = true);
|
||||
|
||||
// Clone the sstable at a new directory.
|
||||
// hardlink all sstable components in the new dir
|
||||
// with the same generation and return a new, shared
|
||||
// sstable object for the clone.
|
||||
future<shared_sstable> clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen);
|
||||
|
||||
generation_type generation() const {
|
||||
return _generation;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user