Merge 'Compact staging sstables' from Benny Halevy
This series decouples the staging sstables from the table's sstable set. The current behavior keeps sstables in the staging directory until view building is done: they are readable like any other sstable, but fenced off from compaction so they don't go away in the meantime. When the views are built, the sstables are moved into the main table directory, where they are then compacted normally.

The problem with this design is that staging sstables are never compacted; in particular, they are never cleaned up or scrubbed. The cleanup scenario opens a backdoor for data resurrection: staging sstables moved after view building may still contain stale partitions (#9559), which will not be cleaned up until the next time cleanup compaction is performed.

With this series, sstables that are created in or moved to the staging subdirectory are "cloned" into the base table directory by hard-linking their components there and creating a new sstable object that loads the cloned files. The former, in the staging directory, is used solely for view building and is not added to the table's sstable set; the latter, its clone, behaves like any other sstable: it is added to either the regular or the maintenance set and is read and compacted normally.

When view building is done, instead of moving the staging sstable into the table's base directory, it is simply unlinked. If its clone wasn't compacted away yet, the clone just remains where it is, exactly as if it had been moved there under the present state of things. If the clone was already compacted and no longer exists, unlinking frees its storage.

Note that snapshots are based on the sstables listed by the table, which do not include the staging sstables with this change.
But that shouldn't matter: even today, the sstables in a snapshot have no notion of a "staging" directory, and it is expected that materialized views are either updated via `nodetool refresh` when restoring sstables from a snapshot using the uploads dir, or, when restoring the whole table from backup, effectively rebuilt from scratch (they are not included in automatic snapshots anyway, since we don't have snapshot coherency across tables).

A fundamental infrastructure change was made to achieve this: sstable_list was changed from a std::unordered_set<shared_sstable> into a std::unordered_map<generation_type, shared_sstable> that keeps the shared_sstable objects indexed by generation number (which must be unique). With this model, sstables are looked up by generation number, not by pointer, since when a staging sstable is cloned there are two shared_sstable objects with the same generation (and different `dir()`) and we must distinguish between them. Special care was taken to throw a runtime_error when a lookup by shared_sstable finds a different sstable with the same generation, since the two must never exist in the same sstable_map.

Fixes #9559
Closes #10657

* github.com:scylladb/scylla:
  table: clone staging sstables into table dir
  view_update_generator: discover_staging_sstables: reindent
  table: add get_staging_sstables
  view_update_generator: discover_staging_sstables: get shared table ptr earlier
  distributed_loader: populate table directory first
  sstables: time_series_sstable_set: insert: make exception safe
  sstables: move_to_new_dir: fix debug log message
@@ -113,7 +113,7 @@ future<> view_update_generator::start() {
         auto& [t, sstables] = *it;
         try {
             inject_failure("view_update_generator_move_staging_sstable");
-            t->move_sstables_from_staging(sstables).get();
+            t->remove_sstables_from_staging(sstables).get();
         } catch (...) {
             // Move from staging will be retried upon restart.
             vug_logger.warn("Moving some sstable from staging failed: {}. Ignoring...", std::current_exception());
@@ -172,14 +172,14 @@ void view_update_generator::setup_metrics() {
 
 void view_update_generator::discover_staging_sstables() {
     for (auto& x : _db.get_column_families()) {
-        replica::table& t = *(x.second);
-        for (auto sstables = t.get_sstables(); sstables::shared_sstable sst : *sstables) {
-            if (sst->requires_view_building()) {
-                _sstables_with_tables[t.shared_from_this()].push_back(std::move(sst));
-                // we're at early stage here, no need to kick _pending_sstables (the
-                // building fiber is not running), nor can we wait on the semaphore
-                _registration_sem.consume(1);
-            }
+        auto t = x.second->shared_from_this();
+        const auto& sstables = t->get_staging_sstables();
+        _sstables_with_tables[t].reserve(_sstables_with_tables[t].size() + sstables.size());
+        for (auto& sst : sstables | boost::adaptors::map_values) {
+            _sstables_with_tables[t].push_back(sst);
+            // we're at early stage here, no need to kick _pending_sstables (the
+            // building fiber is not running), nor can we wait on the semaphore
+            _registration_sem.consume(1);
         }
     }
 }
@@ -510,7 +510,7 @@ public:
     future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
             sstables::offstrategy offstrategy = sstables::offstrategy::no);
     future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
-    future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
+    future<> remove_sstables_from_staging(std::vector<sstables::shared_sstable>);
     sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
             io_error_handler_gen error_handler_gen);
     sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f);
@@ -909,6 +909,9 @@ public:
     size_t sstables_count() const;
     std::vector<uint64_t> sstable_count_per_level() const;
     int64_t get_unleveled_sstables() const;
+    const auto& get_staging_sstables() const {
+        return _sstables_staging;
+    }
 
     void start_compaction();
     void trigger_compaction();
@@ -546,9 +546,12 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
 
         try {
             co_await ks.make_directory_for_column_family(cfname, uuid);
+            // Populate the table base directory first so we can clone
+            // staging sstables into it later when populating the table
+            // from the staging_dir.
+            co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
             co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
             co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
-            co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
         } catch (...) {
             std::exception_ptr eptr = std::current_exception();
             std::string msg =
@@ -392,12 +392,13 @@ table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::s
     if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
         on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
     }
+    if (sstable->requires_view_building()) {
+        on_internal_error(tlogger, format("Attempted to load staging SSTable {} at table", sstable->get_filename()));
+    }
     // allow in-progress reads to continue using old list
     auto new_sstables = make_lw_shared<sstables::sstable_set>(*sstables);
     new_sstables->insert(sstable);
-    if (sstable->requires_view_building()) {
-        _sstables_staging.emplace(sstable->generation(), sstable);
-    } else if (backlog_tracker) {
+    if (backlog_tracker) {
         add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sstable);
     }
     // update sstable set last in case either updating
@@ -438,6 +439,28 @@ void table::enable_off_strategy_trigger() {
 
 future<>
 table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
+    if (sst->requires_view_building()) {
+        auto [it, inserted] = _sstables_staging.emplace(sst->generation(), sst);
+        if (!inserted) {
+            on_internal_error(tlogger, format("could not add staging sstable: generation {} already exists", sst->generation()));
+        }
+
+        // clone staging sstables so their content may be compacted while
+        // views are built. When done, the hard-linked copy in the staging
+        // subdirectory will simply be unlinked.
+        //
+        // Note that after restart, we don't know whether we already cloned
+        // the staging sstable, or we might have restarted right after sealing it
+        // and before cloning it here, so we might be resurrecting an sstable
+        // in the base directory in this rare corner case.
+        try {
+            sst = co_await sst->clone_at(dir(), calculate_generation_for_new_table());
+        } catch (...) {
+            _sstables_staging.erase(it);
+            throw;
+        }
+    }
+
     auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
     co_return co_await get_row_cache().invalidate(row_cache::external_updater([this, sst, offstrategy] () noexcept {
         // FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
@@ -2272,21 +2295,18 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
     return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
 }
 
-future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
+future<> table::remove_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
     auto units = co_await get_units(_sstable_deletion_sem, 1);
-    auto dirs_to_sync = std::set<sstring>({dir()});
-    auto main_sstables = _main_sstables->all();
+    std::set<sstring> dirs_to_sync;
 
     for (auto sst : sstables) {
         dirs_to_sync.emplace(sst->get_dir());
+        tlogger.debug("Removing sstable {} from staging", sst->get_filename());
         try {
-            co_await sst->move_to_new_dir(dir(), sst->generation(), false);
+            co_await sst->unlink();
             _sstables_staging.erase(sst->generation());
-            // Maintenance SSTables being moved from staging shouldn't be added to tracker because they're off-strategy
-            if (main_sstables->contains(sst)) {
-                add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
-            }
         } catch (...) {
-            tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception());
+            tlogger.warn("Failed to remove sstable {} from staging: {}", sst->get_filename(), std::current_exception());
             throw;
         }
     }
@@ -390,10 +390,15 @@ void time_series_sstable_set::for_each_sstable(std::function<void(const shared_s
 
 // O(log n)
 void time_series_sstable_set::insert(shared_sstable sst) {
-    auto min_pos = sst->min_position();
-    auto max_pos_reversed = sst->max_position().reversed();
-    _sstables->emplace(std::move(min_pos), sst);
-    _sstables_reversed->emplace(std::move(max_pos_reversed), std::move(sst));
+    try {
+        auto min_pos = sst->min_position();
+        auto max_pos_reversed = sst->max_position().reversed();
+        _sstables->emplace(std::move(min_pos), sst);
+        _sstables_reversed->emplace(std::move(max_pos_reversed), std::move(sst));
+    } catch (...) {
+        erase(sst);
+        throw;
+    }
 }
 
 // O(n) worst case, but should be close to O(log n) most of the time
@@ -2113,7 +2113,7 @@ future<> sstable::set_generation(generation_type new_generation) {
 future<> sstable::move_to_new_dir(sstring new_dir, generation_type new_generation, bool do_sync_dirs) {
     sstring old_dir = get_dir();
     sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
-            get_filename(), old_dir, _generation, new_dir, new_generation, do_sync_dirs);
+            get_filename(), _generation, new_dir, new_generation, do_sync_dirs);
     co_await create_links_and_mark_for_removal(new_dir, new_generation);
     _dir = new_dir;
     generation_type old_generation = std::exchange(_generation, new_generation);
@@ -2143,6 +2143,17 @@ future<> sstable::move_to_quarantine(bool do_sync_dirs) {
     co_await move_to_new_dir(std::move(new_dir), generation(), do_sync_dirs);
 }
 
+future<shared_sstable> sstable::clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen) {
+    if (fs::canonical(fs::path(new_dir)) == fs::canonical(fs::path(_dir))) {
+        on_internal_error(sstlog, format("Cannot clone sstable {} into same dir", get_filename()));
+    }
+    auto gen = opt_gen.value_or(_generation);
+    co_await create_links(new_dir, gen);
+    auto cloned_sst = _manager.make_sstable(_schema, new_dir, gen, _version, _format);
+    co_await cloned_sst->load(co_await get_open_info());
+    co_return cloned_sst;
+}
+
 flat_mutation_reader_v2
 sstable::make_reader(
         schema_ptr schema,
@@ -201,6 +201,12 @@ public:
     // will move it into a quarantine_dir subdirectory of its current directory.
     future<> move_to_quarantine(bool do_sync_dirs = true);
 
+    // Clone the sstable into a new directory:
+    // hard-link all sstable components into new_dir
+    // with the same generation and return a new, shared
+    // sstable object for the clone.
+    future<shared_sstable> clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen);
+
     generation_type generation() const {
         return _generation;
     }