table: Make correctness of concurrent sstable list update robust
Today, table relies on row_cache::invalidate() serialization for concurrent sstable list updates to produce correct results. That's very error prone because table is relying on an implementation detail of invalidate() to get things right. Instead, let's make table itself take care of serialization on concurrent updates. To achieve that, sstable_list_builder is introduced. Only one builder can be alive for a given table, so serialization is guaranteed as long as the builder is kept alive throughout the update procedure. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Message-Id: <20210721001716.210281-1-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
84c9bf2b63
commit
e4eb7df1a1
31
database.hh
31
database.hh
@@ -428,6 +428,8 @@ private:
|
||||
// This semaphore ensures that off-strategy compaction will be serialized and also
|
||||
// protects against candidates being picked more than once.
|
||||
seastar::named_semaphore _off_strategy_sem = {1, named_semaphore_exception_factory{"off-strategy compaction"}};
|
||||
// Ensures that concurrent updates to sstable set will work correctly
|
||||
seastar::named_semaphore _sstable_set_mutation_sem = {1, named_semaphore_exception_factory{"sstable set mutation"}};
|
||||
mutable row_cache _cache; // Cache covers only sstables.
|
||||
std::optional<int64_t> _sstable_generation = {};
|
||||
|
||||
@@ -509,6 +511,25 @@ public:
|
||||
void notify_bootstrap_or_replace_start();
|
||||
|
||||
void notify_bootstrap_or_replace_end();
|
||||
|
||||
// Ensures that concurrent preemptible mutations to sstable lists will produce correct results.
|
||||
// User will hold this permit until done with all updates. As soon as it's released, another concurrent
|
||||
// attempt to update the lists will be able to proceed.
|
||||
struct sstable_list_builder {
|
||||
using permit_t = semaphore_units<seastar::named_semaphore_exception_factory>;
|
||||
permit_t permit;
|
||||
|
||||
explicit sstable_list_builder(permit_t p) : permit(std::move(p)) {}
|
||||
sstable_list_builder& operator=(const sstable_list_builder&) = delete;
|
||||
sstable_list_builder(const sstable_list_builder&) = delete;
|
||||
|
||||
// Builds new sstable set from existing one, with new sstables added to it and old sstables removed from it.
|
||||
future<lw_shared_ptr<sstables::sstable_set>>
|
||||
build_new_list(const sstables::sstable_set& current_sstables,
|
||||
sstables::sstable_set new_sstable_list,
|
||||
const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables);
|
||||
};
|
||||
private:
|
||||
bool cache_enabled() const {
|
||||
return _config.enable_cache && _schema->caching_options().enabled();
|
||||
@@ -554,16 +575,6 @@ private:
|
||||
return sstable_generation % smp::count;
|
||||
}
|
||||
|
||||
// Builds new sstable set from existing one, with new sstables added to it and old sstables removed from it.
|
||||
// As this function may preempt, its caller must guarantee that concurrent calls will be serialized in order
|
||||
// to avoid corruption of the sstable set.
|
||||
// Today, the serialization is always performed through row_cache's update semaphore.
|
||||
future<lw_shared_ptr<sstables::sstable_set>>
|
||||
build_new_sstable_list(const sstables::sstable_set& current_sstables,
|
||||
sstables::sstable_set new_sstable_list,
|
||||
const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables);
|
||||
|
||||
future<>
|
||||
update_sstable_lists_on_off_strategy_completion(const std::vector<sstables::shared_sstable>& old_maintenance_sstables,
|
||||
const std::vector<sstables::shared_sstable>& new_main_sstables);
|
||||
|
||||
31
table.cc
31
table.cc
@@ -734,7 +734,7 @@ void table::rebuild_statistics() {
|
||||
}
|
||||
|
||||
future<lw_shared_ptr<sstables::sstable_set>>
|
||||
table::build_new_sstable_list(const sstables::sstable_set& current_sstables,
|
||||
table::sstable_list_builder::build_new_list(const sstables::sstable_set& current_sstables,
|
||||
sstables::sstable_set new_sstable_list,
|
||||
const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables) {
|
||||
@@ -758,31 +758,33 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector<sstable
|
||||
class sstable_lists_updater : public row_cache::external_updater_impl {
|
||||
using sstables_t = std::vector<sstables::shared_sstable>;
|
||||
table& _t;
|
||||
sstable_list_builder _builder;
|
||||
const sstables_t& _old_maintenance;
|
||||
const sstables_t& _new_main;
|
||||
lw_shared_ptr<sstables::sstable_set> _new_maintenance_list;
|
||||
lw_shared_ptr<sstables::sstable_set> _new_main_list;
|
||||
public:
|
||||
explicit sstable_lists_updater(table& t, const sstables_t& old_maintenance, const sstables_t& new_main)
|
||||
: _t(t), _old_maintenance(old_maintenance), _new_main(new_main) {
|
||||
explicit sstable_lists_updater(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main)
|
||||
: _t(t), _builder(std::move(permit)), _old_maintenance(old_maintenance), _new_main(new_main) {
|
||||
}
|
||||
virtual future<> prepare() override {
|
||||
sstables_t empty;
|
||||
// adding new sstables, created by off-strategy operation, to main set
|
||||
_new_main_list = co_await _t.build_new_sstable_list(*_t._main_sstables, _t._compaction_strategy.make_sstable_set(_t._schema), _new_main, empty);
|
||||
_new_main_list = co_await _builder.build_new_list(*_t._main_sstables, _t._compaction_strategy.make_sstable_set(_t._schema), _new_main, empty);
|
||||
// removing old sstables, used as input by off-strategy, from the maintenance set
|
||||
_new_maintenance_list = co_await _t.build_new_sstable_list(*_t._maintenance_sstables, std::move(*_t.make_maintenance_sstable_set()), empty, _old_maintenance);
|
||||
_new_maintenance_list = co_await _builder.build_new_list(*_t._maintenance_sstables, std::move(*_t.make_maintenance_sstable_set()), empty, _old_maintenance);
|
||||
}
|
||||
virtual void execute() override {
|
||||
_t._main_sstables = std::move(_new_main_list);
|
||||
_t._maintenance_sstables = std::move(_new_maintenance_list);
|
||||
_t.refresh_compound_sstable_set();
|
||||
}
|
||||
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, const sstables_t& old_maintenance, const sstables_t& new_main) {
|
||||
return std::make_unique<sstable_lists_updater>(t, old_maintenance, new_main);
|
||||
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) {
|
||||
return std::make_unique<sstable_lists_updater>(t, std::move(permit), old_maintenance, new_main);
|
||||
}
|
||||
};
|
||||
auto updater = row_cache::external_updater(sstable_lists_updater::make(*this, old_maintenance_sstables, new_main_sstables));
|
||||
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
||||
auto updater = row_cache::external_updater(sstable_lists_updater::make(*this, std::move(permit), old_maintenance_sstables, new_main_sstables));
|
||||
|
||||
// row_cache::invalidate() is only used to synchronize sstable list updates, to prevent race conditions from occurring,
|
||||
// meaning nothing is actually invalidated.
|
||||
@@ -836,24 +838,25 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
|
||||
|
||||
class sstable_list_updater : public row_cache::external_updater_impl {
|
||||
table& _t;
|
||||
sstable_list_builder _builder;
|
||||
const sstables::compaction_completion_desc& _desc;
|
||||
lw_shared_ptr<sstables::sstable_set> _new_sstables;
|
||||
public:
|
||||
explicit sstable_list_updater(table& t, sstables::compaction_completion_desc& d) : _t(t), _desc(d) {}
|
||||
explicit sstable_list_updater(table& t, sstable_list_builder::permit_t permit, sstables::compaction_completion_desc& d) : _t(t), _builder(std::move(permit)), _desc(d) {}
|
||||
virtual future<> prepare() override {
|
||||
_new_sstables = co_await _t.build_new_sstable_list(*_t._main_sstables, _t._compaction_strategy.make_sstable_set(_t._schema), _desc.new_sstables, _desc.old_sstables);
|
||||
_new_sstables = co_await _builder.build_new_list(*_t._main_sstables, _t._compaction_strategy.make_sstable_set(_t._schema), _desc.new_sstables, _desc.old_sstables);
|
||||
}
|
||||
virtual void execute() override {
|
||||
_t._main_sstables = std::move(_new_sstables);
|
||||
_t.refresh_compound_sstable_set();
|
||||
}
|
||||
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstables::compaction_completion_desc& d) {
|
||||
return std::make_unique<sstable_list_updater>(t, d);
|
||||
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstable_list_builder::permit_t permit, sstables::compaction_completion_desc& d) {
|
||||
return std::make_unique<sstable_list_updater>(t, std::move(permit), d);
|
||||
}
|
||||
};
|
||||
auto updater = row_cache::external_updater(sstable_list_updater::make(*this, desc));
|
||||
auto permit = seastar::get_units(_sstable_set_mutation_sem, 1).get0();
|
||||
auto updater = row_cache::external_updater(sstable_list_updater::make(*this, std::move(permit), desc));
|
||||
|
||||
// row_cache's invalidate() guarantees that updates are serialized, so concurrent updates will work as intended.
|
||||
_cache.invalidate(std::move(updater), std::move(desc.ranges_for_cache_invalidation)).get();
|
||||
|
||||
// refresh underlying data source in row cache to prevent it from holding reference
|
||||
|
||||
@@ -53,7 +53,9 @@ public:
|
||||
// NOTE: must run in a thread
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
_cf->_main_sstables = _cf->build_new_sstable_list(*_cf->_main_sstables, _cf->_compaction_strategy.make_sstable_set(_cf->schema()), new_sstables, sstables_to_remove).get0();
|
||||
auto permit = seastar::get_units(_cf->_sstable_set_mutation_sem, 1).get0();
|
||||
auto builder = table::sstable_list_builder(std::move(permit));
|
||||
_cf->_main_sstables = builder.build_new_list(*_cf->_main_sstables, _cf->_compaction_strategy.make_sstable_set(_cf->schema()), new_sstables, sstables_to_remove).get0();
|
||||
_cf->refresh_compound_sstable_set();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user