mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
db/legacy_schema_tables: Merge schema locking
Add locking to merge_schema() to ensure only one CPU is able to fiddle with internals at a time. Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
@@ -479,6 +479,16 @@ future<> save_system_keyspace_schema() {
|
||||
}
|
||||
#endif
|
||||
|
||||
static semaphore the_merge_lock;
|
||||
|
||||
future<> merge_lock() {
|
||||
return smp::submit_to(0, [] { return the_merge_lock.wait(); });
|
||||
}
|
||||
|
||||
future<> merge_unlock() {
|
||||
return smp::submit_to(0, [] { the_merge_lock.signal(); });
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
|
||||
* (which also involves fs operations on add/drop ks/cf)
|
||||
@@ -490,12 +500,25 @@ future<> save_system_keyspace_schema() {
|
||||
*/
|
||||
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations)
|
||||
{
|
||||
return merge_schema(proxy, std::move(mutations), true).then([&proxy] {
|
||||
return update_schema_version_and_announce(proxy);
|
||||
return merge_lock().then([&proxy, mutations = std::move(mutations)] {
|
||||
return do_merge_schema(proxy, std::move(mutations), true).then([&proxy] {
|
||||
return update_schema_version_and_announce(proxy);
|
||||
});
|
||||
}).finally([] {
|
||||
return merge_unlock();
|
||||
});
|
||||
}
|
||||
|
||||
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
{
|
||||
return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] {
|
||||
return merge_schema(proxy, std::move(mutations), do_flush);
|
||||
}).finally([] {
|
||||
return merge_unlock();
|
||||
});
|
||||
}
|
||||
|
||||
future<> do_merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
{
|
||||
return seastar::async([&proxy, mutations = std::move(mutations), do_flush] {
|
||||
schema_ptr s = keyspaces();
|
||||
@@ -532,8 +555,6 @@ future<> save_system_keyspace_schema() {
|
||||
/*auto& new_functions = */read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
|
||||
/*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
|
||||
|
||||
// FIXME: Make the update atomic like in Origin.
|
||||
|
||||
std::set<sstring> keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0();
|
||||
merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)).get0();
|
||||
#if 0
|
||||
|
||||
@@ -65,6 +65,8 @@ future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutat
|
||||
|
||||
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush);
|
||||
|
||||
future<> do_merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush);
|
||||
|
||||
future<std::set<sstring>> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after);
|
||||
|
||||
std::vector<mutation> make_create_keyspace_mutations(lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
|
||||
|
||||
Reference in New Issue
Block a user