diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index 3d0b58a4c4..c6ee9220e2 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -479,6 +479,16 @@ future<> save_system_keyspace_schema() { } #endif + static semaphore the_merge_lock; + + future<> merge_lock() { + return smp::submit_to(0, [] { return the_merge_lock.wait(); }); + } + + future<> merge_unlock() { + return smp::submit_to(0, [] { the_merge_lock.signal(); }); + } + /** * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects * (which also involves fs operations on add/drop ks/cf) @@ -490,12 +500,25 @@ future<> save_system_keyspace_schema() { */ future<> merge_schema(service::storage_proxy& proxy, std::vector mutations) { - return merge_schema(proxy, std::move(mutations), true).then([&proxy] { - return update_schema_version_and_announce(proxy); + return merge_lock().then([&proxy, mutations = std::move(mutations)] { + return do_merge_schema(proxy, std::move(mutations), true).then([&proxy] { + return update_schema_version_and_announce(proxy); + }); + }).finally([] { + return merge_unlock(); }); } future<> merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush) + { + return merge_lock().then([&proxy, mutations = std::move(mutations), do_flush] { + return merge_schema(proxy, std::move(mutations), do_flush); + }).finally([] { + return merge_unlock(); + }); + } + + future<> do_merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush) { return seastar::async([&proxy, mutations = std::move(mutations), do_flush] { schema_ptr s = keyspaces(); @@ -532,8 +555,6 @@ future<> save_system_keyspace_schema() { /*auto& new_functions = */read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0(); /*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0(); - // FIXME: Make the update atomic like in Origin. - std::set keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0(); merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)).get0(); #if 0 diff --git a/db/legacy_schema_tables.hh b/db/legacy_schema_tables.hh index 3bd0305e8c..092d64ec43 100644 --- a/db/legacy_schema_tables.hh +++ b/db/legacy_schema_tables.hh @@ -65,6 +65,8 @@ future<> merge_schema(service::storage_proxy& proxy, std::vector mutat future<> merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush); +future<> do_merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush); + future> merge_keyspaces(service::storage_proxy& proxy, schema_result&& before, schema_result&& after); std::vector make_create_keyspace_mutations(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);