From d0fef3e31b9f4e86245fa347a4ae678d3219dc6b Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 26 Mar 2015 10:22:42 +0200 Subject: [PATCH] db: Apply mutations in legacy_schema_tables::merge_schema() Pass a reference to storage_proxy and apply mutations in legacy_schema_tables::merge_schema(). Signed-off-by: Pekka Enberg --- cql3/statements/create_keyspace_statement.hh | 4 ++-- cql3/statements/create_table_statement.hh | 4 ++-- cql3/statements/schema_altering_statement.cc | 2 +- cql3/statements/schema_altering_statement.hh | 2 +- db/legacy_schema_tables.cc | 16 ++++++------- db/legacy_schema_tables.hh | 6 ++--- service/migration_manager.hh | 24 ++++++++++---------- 7 files changed, 29 insertions(+), 29 deletions(-) diff --git a/cql3/statements/create_keyspace_statement.hh b/cql3/statements/create_keyspace_statement.hh index a312c8632a..e7f60c37e1 100644 --- a/cql3/statements/create_keyspace_statement.hh +++ b/cql3/statements/create_keyspace_statement.hh @@ -122,8 +122,8 @@ public: #endif } - virtual future announce_migration(bool is_local_only) override { - return service::migration_manager::announce_new_keyspace(_attrs->as_ks_metadata(_name), is_local_only).then_wrapped([this] (auto&& f) { + virtual future announce_migration(service::storage_proxy& proxy, bool is_local_only) override { + return service::migration_manager::announce_new_keyspace(proxy, _attrs->as_ks_metadata(_name), is_local_only).then_wrapped([this] (auto&& f) { try { f.get(); return true; diff --git a/cql3/statements/create_table_statement.hh b/cql3/statements/create_table_statement.hh index f70733e7a7..3c08978852 100644 --- a/cql3/statements/create_table_statement.hh +++ b/cql3/statements/create_table_statement.hh @@ -144,8 +144,8 @@ public: } #endif - virtual future announce_migration(bool is_local_only) override { - return service::migration_manager::announce_new_column_family(get_cf_meta_data(), is_local_only).then_wrapped([this] (auto&& f) { + virtual future announce_migration(service::storage_proxy& proxy, bool is_local_only) override { + return service::migration_manager::announce_new_column_family(proxy, get_cf_meta_data(), is_local_only).then_wrapped([this] (auto&& f) { try { f.get(); return true; diff --git a/cql3/statements/schema_altering_statement.cc b/cql3/statements/schema_altering_statement.cc index 1de60f2fd7..1d81e6dcc2 100644 --- a/cql3/statements/schema_altering_statement.cc +++ b/cql3/statements/schema_altering_statement.cc @@ -34,7 +34,7 @@ future<::shared_ptr> schema_altering_statement::execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) { // If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing // extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600) - return announce_migration(false).then([this] (bool did_change_schema) { + return announce_migration(proxy, false).then([this] (bool did_change_schema) { if (!did_change_schema) { auto result = ::make_shared(); return make_ready_future<::shared_ptr>(result); diff --git a/cql3/statements/schema_altering_statement.hh b/cql3/statements/schema_altering_statement.hh index fd07eb7b33..e9c0ad70f9 100644 --- a/cql3/statements/schema_altering_statement.hh +++ b/cql3/statements/schema_altering_statement.hh @@ -93,7 +93,7 @@ protected: * is used, for example) * @throws RequestValidationException */ - virtual future announce_migration(bool is_local_only) = 0; + virtual future announce_migration(service::storage_proxy& proxy, bool is_local_only) = 0; virtual future<::shared_ptr> execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override; diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index fc4ebaf53b..bae4ad6357 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -449,9 +449,9 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE * @throws ConfigurationException If one of metadata attributes has invalid value * @throws IOException If data was corrupted during transportation or failed to apply fs operations */ - future<> merge_schema(std::vector mutations) + future<> merge_schema(service::storage_proxy& proxy, std::vector mutations) { - return merge_schema(std::move(mutations), true).then([] { + return merge_schema(proxy, std::move(mutations), true).then([] { #if 0 Schema.instance.updateVersionAndAnnounce(); #endif @@ -459,9 +459,8 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE }); } - future<> merge_schema(std::vector mutations, bool do_flush) + future<> merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush) { - return make_ready_future<>(); #if 0 // compare before/after schemas of the affected keyspaces only Set keyspaces = new HashSet<>(mutations.size()); @@ -474,10 +473,9 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE Map oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces); Map oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces); Map oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces); - - for (Mutation mutation : mutations) - mutation.apply(); - +#endif + return proxy.mutate_locally(std::move(mutations)).then([] { +#if 0 if (doFlush) flushSchemaTables(); @@ -498,6 +496,8 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE for (String keyspaceToDrop : keyspacesToDrop) Schema.instance.dropKeyspace(keyspaceToDrop); #endif + return make_ready_future<>(); + }); } #if 0 diff --git a/db/legacy_schema_tables.hh b/db/legacy_schema_tables.hh index 4adaefc0ba..59a7bc6f77 100644 --- a/db/legacy_schema_tables.hh +++ b/db/legacy_schema_tables.hh @@ -23,8 +23,8 @@ #pragma once +#include "service/storage_proxy.hh" #include "config/ks_meta_data.hh" - #include "database.hh" #include "schema.hh" @@ -46,9 +46,9 @@ extern std::vector ALL; std::vector all_tables(); -future<> merge_schema(std::vector mutations); +future<> merge_schema(service::storage_proxy& proxy, std::vector mutations); -future<> merge_schema(std::vector mutations, bool do_flush); +future<> merge_schema(service::storage_proxy& proxy, std::vector mutations, bool do_flush); mutation make_create_keyspace_mutation(lw_shared_ptr keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true); diff --git a/service/migration_manager.hh b/service/migration_manager.hh index cf18dcd1fb..dfd74ae5f1 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -267,17 +267,17 @@ public: } #endif - static future<> announce_new_keyspace(lw_shared_ptr ksm) + static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm) { - return announce_new_keyspace(ksm, false); + return announce_new_keyspace(proxy, ksm, false); } - static future<> announce_new_keyspace(lw_shared_ptr ksm, bool announce_locally) + static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, bool announce_locally) { - return announce_new_keyspace(ksm, db_clock::now_in_usecs(), announce_locally); + return announce_new_keyspace(proxy, ksm, db_clock::now_in_usecs(), announce_locally); } - static future<> announce_new_keyspace(lw_shared_ptr ksm, api::timestamp_type timestamp, bool announce_locally) + static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr ksm, api::timestamp_type timestamp, bool announce_locally) { #if 0 ksm.validate(); @@ -287,7 +287,7 @@ public: logger.info(String.format("Create new Keyspace: %s", ksm)); #endif - return announce(db::legacy_schema_tables::make_create_keyspace_mutation(ksm, timestamp), announce_locally); + return announce(proxy, db::legacy_schema_tables::make_create_keyspace_mutation(ksm, timestamp), announce_locally); } #if 0 @@ -297,7 +297,7 @@ public: } #endif - static future<> announce_new_column_family(schema_ptr cfm, bool announce_locally) { + static future<> announce_new_column_family(service::storage_proxy& proxy, schema_ptr cfm, bool announce_locally) { warn(unimplemented::cause::MIGRATIONS); return make_ready_future<>(); #if 0 @@ -438,14 +438,14 @@ public: * actively announce a new version to active hosts via rpc * @param schema The schema mutation to be applied */ - static future<> announce(mutation schema, bool announce_locally) + static future<> announce(service::storage_proxy& proxy, mutation schema, bool announce_locally) { std::vector mutations; mutations.emplace_back(std::move(schema)); if (announce_locally) { - return db::legacy_schema_tables::merge_schema(std::move(mutations), false); + return db::legacy_schema_tables::merge_schema(proxy, std::move(mutations), false); } else { - return announce(std::move(mutations)); + return announce(proxy, std::move(mutations)); } } @@ -460,9 +460,9 @@ public: #endif // Returns a future on the local application of the schema - static future<> announce(std::vector schema) + static future<> announce(service::storage_proxy& proxy, std::vector schema) { - auto f = db::legacy_schema_tables::merge_schema(std::move(schema)); + auto f = db::legacy_schema_tables::merge_schema(proxy, std::move(schema)); #if 0 for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) {