db: Apply mutations in legacy_schema_tables::merge_schema()

Pass a reference to storage_proxy and apply mutations in
legacy_schema_tables::merge_schema().

Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
Pekka Enberg
2015-03-26 10:22:42 +02:00
parent 5bab83b9e0
commit d0fef3e31b
7 changed files with 29 additions and 29 deletions

View File

@@ -122,8 +122,8 @@ public:
#endif
}
virtual future<bool> announce_migration(bool is_local_only) override {
return service::migration_manager::announce_new_keyspace(_attrs->as_ks_metadata(_name), is_local_only).then_wrapped([this] (auto&& f) {
virtual future<bool> announce_migration(service::storage_proxy& proxy, bool is_local_only) override {
return service::migration_manager::announce_new_keyspace(proxy, _attrs->as_ks_metadata(_name), is_local_only).then_wrapped([this] (auto&& f) {
try {
f.get();
return true;

View File

@@ -144,8 +144,8 @@ public:
}
#endif
virtual future<bool> announce_migration(bool is_local_only) override {
return service::migration_manager::announce_new_column_family(get_cf_meta_data(), is_local_only).then_wrapped([this] (auto&& f) {
virtual future<bool> announce_migration(service::storage_proxy& proxy, bool is_local_only) override {
return service::migration_manager::announce_new_column_family(proxy, get_cf_meta_data(), is_local_only).then_wrapped([this] (auto&& f) {
try {
f.get();
return true;

View File

@@ -34,7 +34,7 @@ future<::shared_ptr<messages::result_message>>
schema_altering_statement::execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) {
// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
return announce_migration(false).then([this] (bool did_change_schema) {
return announce_migration(proxy, false).then([this] (bool did_change_schema) {
if (!did_change_schema) {
auto result = ::make_shared<messages::result_message::void_message>();
return make_ready_future<::shared_ptr<messages::result_message>>(result);

View File

@@ -93,7 +93,7 @@ protected:
* is used, for example)
* @throws RequestValidationException
*/
virtual future<bool> announce_migration(bool is_local_only) = 0;
virtual future<bool> announce_migration(service::storage_proxy& proxy, bool is_local_only) = 0;
virtual future<::shared_ptr<messages::result_message>>
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) override;

View File

@@ -449,9 +449,9 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
* @throws ConfigurationException If one of metadata attributes has invalid value
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
*/
future<> merge_schema(std::vector<mutation> mutations)
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations)
{
return merge_schema(std::move(mutations), true).then([] {
return merge_schema(proxy, std::move(mutations), true).then([] {
#if 0
Schema.instance.updateVersionAndAnnounce();
#endif
@@ -459,9 +459,8 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
});
}
future<> merge_schema(std::vector<mutation> mutations, bool do_flush)
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush)
{
return make_ready_future<>();
#if 0
// compare before/after schemas of the affected keyspaces only
Set<String> keyspaces = new HashSet<>(mutations.size());
@@ -474,10 +473,9 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
Map<DecoratedKey, ColumnFamily> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
Map<DecoratedKey, ColumnFamily> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
Map<DecoratedKey, ColumnFamily> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
for (Mutation mutation : mutations)
mutation.apply();
#endif
return proxy.mutate_locally(std::move(mutations)).then([] {
#if 0
if (doFlush)
flushSchemaTables();
@@ -498,6 +496,8 @@ std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE
for (String keyspaceToDrop : keyspacesToDrop)
Schema.instance.dropKeyspace(keyspaceToDrop);
#endif
return make_ready_future<>();
});
}
#if 0

View File

@@ -23,8 +23,8 @@
#pragma once
#include "service/storage_proxy.hh"
#include "config/ks_meta_data.hh"
#include "database.hh"
#include "schema.hh"
@@ -46,9 +46,9 @@ extern std::vector<const char*> ALL;
std::vector<schema_ptr> all_tables();
future<> merge_schema(std::vector<mutation> mutations);
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations);
future<> merge_schema(std::vector<mutation> mutations, bool do_flush);
future<> merge_schema(service::storage_proxy& proxy, std::vector<mutation> mutations, bool do_flush);
mutation make_create_keyspace_mutation(lw_shared_ptr<config::ks_meta_data> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);

View File

@@ -267,17 +267,17 @@ public:
}
#endif
static future<> announce_new_keyspace(lw_shared_ptr<config::ks_meta_data> ksm)
static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr<config::ks_meta_data> ksm)
{
return announce_new_keyspace(ksm, false);
return announce_new_keyspace(proxy, ksm, false);
}
static future<> announce_new_keyspace(lw_shared_ptr<config::ks_meta_data> ksm, bool announce_locally)
static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr<config::ks_meta_data> ksm, bool announce_locally)
{
return announce_new_keyspace(ksm, db_clock::now_in_usecs(), announce_locally);
return announce_new_keyspace(proxy, ksm, db_clock::now_in_usecs(), announce_locally);
}
static future<> announce_new_keyspace(lw_shared_ptr<config::ks_meta_data> ksm, api::timestamp_type timestamp, bool announce_locally)
static future<> announce_new_keyspace(service::storage_proxy& proxy, lw_shared_ptr<config::ks_meta_data> ksm, api::timestamp_type timestamp, bool announce_locally)
{
#if 0
ksm.validate();
@@ -287,7 +287,7 @@ public:
logger.info(String.format("Create new Keyspace: %s", ksm));
#endif
return announce(db::legacy_schema_tables::make_create_keyspace_mutation(ksm, timestamp), announce_locally);
return announce(proxy, db::legacy_schema_tables::make_create_keyspace_mutation(ksm, timestamp), announce_locally);
}
#if 0
@@ -297,7 +297,7 @@ public:
}
#endif
static future<> announce_new_column_family(schema_ptr cfm, bool announce_locally) {
static future<> announce_new_column_family(service::storage_proxy& proxy, schema_ptr cfm, bool announce_locally) {
warn(unimplemented::cause::MIGRATIONS);
return make_ready_future<>();
#if 0
@@ -438,14 +438,14 @@ public:
* actively announce a new version to active hosts via rpc
* @param schema The schema mutation to be applied
*/
static future<> announce(mutation schema, bool announce_locally)
static future<> announce(service::storage_proxy& proxy, mutation schema, bool announce_locally)
{
std::vector<mutation> mutations;
mutations.emplace_back(std::move(schema));
if (announce_locally) {
return db::legacy_schema_tables::merge_schema(std::move(mutations), false);
return db::legacy_schema_tables::merge_schema(proxy, std::move(mutations), false);
} else {
return announce(std::move(mutations));
return announce(proxy, std::move(mutations));
}
}
@@ -460,9 +460,9 @@ public:
#endif
// Returns a future on the local application of the schema
static future<> announce(std::vector<mutation> schema)
static future<> announce(service::storage_proxy& proxy, std::vector<mutation> schema)
{
auto f = db::legacy_schema_tables::merge_schema(std::move(schema));
auto f = db::legacy_schema_tables::merge_schema(proxy, std::move(schema));
#if 0
for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
{