sstables: Synchronize deletion of SSTables in resharding with other operations
Input SSTables of resharding are deleted at the coordinator shard, not at the shards they belong to. We're not acquiring the deletion semaphore before removing those input SSTables from the SSTable set, so it could happen that resharding deletes those SSTables while another operation like snapshot, which acquires the semaphore, finds them deleted. Let's acquire the deletion semaphore so that the input SSTables will only be removed from the set when we're certain that nobody is relying on their existence anymore. Now resharding will only delete input SSTables after they're safely removed from the SSTable set of all shards they belong to. unit: test(dev). Fixes #6328. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Message-Id: <20200507233636.92104-1-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
4d957eeda7
commit
88d2486fca
@@ -619,6 +619,9 @@ private:
|
||||
// Rebuilds existing sstable set with new sstables added to it and old sstables removed from it.
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables);
|
||||
// Rebuild sstable list with the deletion semaphore acquired.
|
||||
future<>
|
||||
rebuild_sstable_list_with_deletion_sem(std::vector<sstables::shared_sstable> new_ssts, std::vector<sstables::shared_sstable> old_ssts);
|
||||
|
||||
// Rebuild sstable set, delete input sstables right away, and update row cache and statistics.
|
||||
void on_compaction_completion(sstables::compaction_completion_desc& desc);
|
||||
@@ -626,8 +629,8 @@ private:
|
||||
void rebuild_statistics();
|
||||
|
||||
// This function replaces ancestors, which are sstables that needed resharding, with the given new sstables.
|
||||
void replace_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors, std::vector<sstables::shared_sstable> new_sstables);
|
||||
void remove_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors);
|
||||
future<> replace_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors, std::vector<sstables::shared_sstable> new_sstables);
|
||||
future<> remove_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors);
|
||||
private:
|
||||
mutation_source_opt _virtual_reader;
|
||||
// Creates a mutation reader which covers given sstables.
|
||||
|
||||
@@ -462,12 +462,11 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
|
||||
// handles case where sstable needing rewrite doesn't produce any sstable
|
||||
// for a shard it belongs to when resharded (the reason is explained above).
|
||||
return smp::submit_to(shard, [cf, ancestors = std::move(ancestors)] () mutable {
|
||||
cf->remove_ancestors_needed_rewrite(ancestors);
|
||||
return cf->remove_ancestors_needed_rewrite(ancestors);
|
||||
});
|
||||
} else {
|
||||
return forward_sstables_to(shard, directory, new_sstables_for_shard, cf, [cf, ancestors = std::move(ancestors)] (std::vector<sstables::shared_sstable> sstables) {
|
||||
cf->replace_ancestors_needed_rewrite(std::move(ancestors), std::move(sstables));
|
||||
return make_ready_future<>();
|
||||
return forward_sstables_to(shard, directory, new_sstables_for_shard, cf, [cf, ancestors = std::move(ancestors)] (std::vector<sstables::shared_sstable> sstables) mutable {
|
||||
return cf->replace_ancestors_needed_rewrite(std::move(ancestors), std::move(sstables));
|
||||
});
|
||||
}
|
||||
}).then([&cf, sstables] {
|
||||
|
||||
48
table.cc
48
table.cc
@@ -1158,6 +1158,33 @@ table::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sst
|
||||
_sstables = make_lw_shared(std::move(new_sstable_list));
|
||||
}
|
||||
|
||||
future<>
|
||||
table::rebuild_sstable_list_with_deletion_sem(std::vector<sstables::shared_sstable> new_ssts, std::vector<sstables::shared_sstable> old_ssts) {
|
||||
// Resharding deletes shared sstables in the coordinator, so it's important to only remove its
|
||||
// input SSTables from the SSTable set after acquiring deletion semaphore to synchronize with
|
||||
// other processes that also acquire that semaphore and only then iterate through the SSTable
|
||||
// set, expecting that all SSTables will still exist throughout the operation.
|
||||
//
|
||||
// FIXME: with_gate() returns a non-futurized exception if gate is closed, so let's futurize it.
|
||||
// This should be removed when with_gate() is made noexcept in newer API version of seastar.
|
||||
try {
|
||||
return seastar::with_gate(_sstable_deletion_gate, [this, new_ssts = std::move(new_ssts), old_ssts = std::move(old_ssts)] () mutable {
|
||||
return with_semaphore(_sstable_deletion_sem, 1, [this, new_ssts = std::move(new_ssts), old_ssts = std::move(old_ssts)] () mutable {
|
||||
rebuild_sstable_list(std::move(new_ssts), std::move(old_ssts));
|
||||
rebuild_statistics();
|
||||
trigger_compaction();
|
||||
});
|
||||
}).handle_exception([this] (std::exception_ptr eptr) {
|
||||
tlogger.error("Failed to update SSTable set on behalf of resharding for {}.{}: {}", _schema->ks_name(), _schema->cf_name(), eptr);
|
||||
return make_exception_future<>(std::move(eptr));
|
||||
});
|
||||
} catch (...) {
|
||||
auto eptr = std::current_exception();
|
||||
tlogger.error("Failed to update SSTable set on behalf of resharding for {}.{}: {}", _schema->ks_name(), _schema->cf_name(), eptr);
|
||||
return make_exception_future<>(std::move(eptr));
|
||||
}
|
||||
}
|
||||
|
||||
// Note: must run in a seastar thread
|
||||
void
|
||||
table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
|
||||
@@ -1231,7 +1258,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
|
||||
// For replace/remove_ancestors_needed_rewrite, note that we need to update the compaction backlog
|
||||
// manually. The new tables will be coming from a remote shard and thus unaccounted for in our
|
||||
// list so far, and the removed ones will no longer be needed by us.
|
||||
void table::replace_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors, std::vector<sstables::shared_sstable> new_sstables) {
|
||||
future<> table::replace_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors, std::vector<sstables::shared_sstable> new_sstables) {
|
||||
std::vector<sstables::shared_sstable> old_sstables;
|
||||
|
||||
for (auto& sst : new_sstables) {
|
||||
@@ -1246,24 +1273,11 @@ void table::replace_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancest
|
||||
_sstables_need_rewrite.erase(it);
|
||||
}
|
||||
}
|
||||
rebuild_sstable_list(new_sstables, old_sstables);
|
||||
rebuild_statistics();
|
||||
trigger_compaction();
|
||||
return rebuild_sstable_list_with_deletion_sem(std::move(new_sstables), std::move(old_sstables));
|
||||
}
|
||||
|
||||
void table::remove_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors) {
|
||||
std::vector<sstables::shared_sstable> old_sstables;
|
||||
for (auto& ancestor : ancestors) {
|
||||
auto it = _sstables_need_rewrite.find(ancestor);
|
||||
if (it != _sstables_need_rewrite.end()) {
|
||||
old_sstables.push_back(it->second);
|
||||
_compaction_strategy.get_backlog_tracker().remove_sstable(it->second);
|
||||
_sstables_need_rewrite.erase(it);
|
||||
}
|
||||
}
|
||||
rebuild_sstable_list({}, old_sstables);
|
||||
rebuild_statistics();
|
||||
trigger_compaction();
|
||||
future<> table::remove_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors) {
|
||||
return replace_ancestors_needed_rewrite(std::move(ancestors), {});
|
||||
}
|
||||
|
||||
future<>
|
||||
|
||||
Reference in New Issue
Block a user