Avoid duplicated read_keyspace_mutation calls

There were many calls to read_keyspace_mutation. One in each function
that prepares a mutation for some other schema change.

With this patch they are all moved to a single location.

Tests: unit (dev, debug)

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20190328024440.26201-1-espindola@scylladb.com>
This commit is contained in:
Rafael Ávila de Espíndola
2019-03-27 19:44:40 -07:00
committed by Avi Kivity
parent d180caea89
commit 6191fd7701
4 changed files with 72 additions and 97 deletions

View File

@@ -111,15 +111,6 @@ logging::logger slogger("schema_tables");
const sstring version = "3";
struct push_back_and_return {
std::vector<mutation> muts;
std::vector<mutation> operator()(mutation&& m) {
muts.emplace_back(std::move(m));
return std::move(muts);
}
};
struct qualified_name {
sstring keyspace_name;
sstring table_name;
@@ -1461,16 +1452,14 @@ void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp,
mutations.emplace_back(std::move(m));
}
future<std::vector<mutation>> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
std::vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
{
std::vector<mutation> mutations;
add_type_to_schema_mutation(type, timestamp, mutations);
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
future<std::vector<mutation>> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp)
{
std::vector<mutation> mutations;
schema_ptr s = types();
@@ -1480,21 +1469,19 @@ future<std::vector<mutation>> make_drop_type_mutations(lw_shared_ptr<keyspace_me
m.partition().apply_delete(*s, ckey, tombstone(timestamp, gc_clock::now()));
mutations.emplace_back(std::move(m));
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
/*
* Table metadata serialization/deserialization.
*/
future<std::vector<mutation>> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
{
std::vector<mutation> mutations;
add_table_or_view_to_schema_mutation(table, timestamp, true, mutations);
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
static void add_table_params_to_mutations(mutation& m, const clustering_key& ckey, schema_ptr table, api::timestamp_type timestamp) {
@@ -1764,7 +1751,7 @@ static void make_update_columns_mutations(schema_ptr old_table,
}
}
future<std::vector<mutation>> make_update_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
std::vector<mutation> make_update_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
schema_ptr old_table,
schema_ptr new_table,
api::timestamp_type timestamp,
@@ -1788,8 +1775,7 @@ future<std::vector<mutation>> make_update_table_mutations(lw_shared_ptr<keyspace
addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
#endif
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
static void make_drop_table_or_view_mutations(schema_ptr schema_table,
@@ -1818,7 +1804,7 @@ static void make_drop_table_or_view_mutations(schema_ptr schema_table,
}
}
future<std::vector<mutation>> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp)
{
std::vector<mutation> mutations;
make_drop_table_or_view_mutations(tables(), std::move(table), timestamp, mutations);
@@ -1832,8 +1818,7 @@ future<std::vector<mutation>> make_drop_table_mutations(lw_shared_ptr<keyspace_m
for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
#endif
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s)
@@ -2414,16 +2399,14 @@ schema_mutations make_schema_mutations(schema_ptr s, api::timestamp_type timesta
return s->is_view() ? make_view_mutations(view_ptr(s), timestamp, with_columns) : make_table_mutations(s, timestamp, with_columns);
}
future<std::vector<mutation>> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp)
std::vector<mutation> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp)
{
std::vector<mutation> mutations;
// And also the serialized base table.
auto base = keyspace->cf_meta_data().at(view->view_info()->base_name());
add_table_or_view_to_schema_mutation(base, timestamp, true, mutations);
add_table_or_view_to_schema_mutation(view, timestamp, true, mutations);
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
/**
@@ -2431,7 +2414,7 @@ future<std::vector<mutation>> make_create_view_mutations(lw_shared_ptr<keyspace_
* case, the new base schema isn't yet loaded, thus can't be accessed from this
* function.
*/
future<std::vector<mutation>> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace,
view_ptr old_view,
view_ptr new_view,
api::timestamp_type timestamp,
@@ -2445,16 +2428,13 @@ future<std::vector<mutation>> make_update_view_mutations(lw_shared_ptr<keyspace_
}
add_table_or_view_to_schema_mutation(new_view, timestamp, false, mutations);
make_update_columns_mutations(old_view, new_view, timestamp, false, mutations);
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
future<std::vector<mutation>> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp) {
std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp) {
std::vector<mutation> mutations;
make_drop_table_or_view_mutations(views(), view, timestamp, mutations);
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return read_keyspace_mutation(service::get_storage_proxy(), keyspace->name()).then(push_back_and_return{std::move(mutations)});
return mutations;
}
#if 0

View File

@@ -158,17 +158,17 @@ std::vector<mutation> make_drop_keyspace_mutations(lw_shared_ptr<keyspace_metada
lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const schema_result_value_type& partition);
future<std::vector<mutation>> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
std::vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
std::vector<user_type> create_types_from_schema_partition(const schema_result_value_type& result);
future<std::vector<mutation>> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp, std::vector<mutation>& mutations);
future<std::vector<mutation>> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
std::vector<mutation> make_create_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
future<std::vector<mutation>> make_update_table_mutations(
std::vector<mutation> make_update_table_mutations(
lw_shared_ptr<keyspace_metadata> keyspace,
schema_ptr old_table,
schema_ptr new_table,
@@ -177,7 +177,7 @@ future<std::vector<mutation>> make_update_table_mutations(
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result);
future<std::vector<mutation>> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, std::optional<table_schema_version> version = {});
@@ -190,11 +190,11 @@ mutation make_scylla_tables_mutation(schema_ptr, api::timestamp_type timestamp);
void add_table_or_view_to_schema_mutation(schema_ptr view, api::timestamp_type timestamp, bool with_columns, std::vector<mutation>& mutations);
future<std::vector<mutation>> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
std::vector<mutation> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
future<std::vector<mutation>> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr old_view, view_ptr new_view, api::timestamp_type timestamp, bool include_base);
std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr old_view, view_ptr new_view, api::timestamp_type timestamp, bool include_base);
future<std::vector<mutation>> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
sstring serialize_kind(column_kind kind);
column_kind deserialize_kind(sstring kind);

View File

@@ -525,6 +525,16 @@ future<> migration_manager::announce_new_column_family(schema_ptr cfm, bool anno
return announce_new_column_family(std::move(cfm), api::new_timestamp(), announce_locally);
}
static future<> include_keyspace_and_announce(
const keyspace_metadata& keyspace, std::vector<mutation> mutations, bool announce_locally) {
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
return db::schema_tables::read_keyspace_mutation(service::get_storage_proxy(), keyspace.name())
.then([announce_locally, mutations = std::move(mutations)] (mutation m) mutable {
mutations.push_back(std::move(m));
return migration_manager::announce(std::move(mutations), announce_locally);
});
}
future<> migration_manager::announce_new_column_family(schema_ptr cfm, api::timestamp_type timestamp, bool announce_locally) {
#if 0
cfm.validate();
@@ -540,10 +550,8 @@ future<> migration_manager::announce_new_column_family(schema_ptr cfm, api::time
}
mlogger.info("Create new ColumnFamily: {}", cfm);
return db::schema_tables::make_create_table_mutations(keyspace.metadata(), cfm, timestamp)
.then([announce_locally, this] (auto&& mutations) {
return announce(std::move(mutations), announce_locally);
});
auto mutations = db::schema_tables::make_create_table_mutations(keyspace.metadata(), cfm, timestamp);
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations), announce_locally);
} catch (const no_such_keyspace& e) {
throw exceptions::configuration_exception(format("Cannot add table '{}' to non existing keyspace '{}'.", cfm->cf_name(), cfm->ks_name()));
}
@@ -563,22 +571,21 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
#endif
mlogger.info("Update table '{}.{}' From {} To {}", cfm->ks_name(), cfm->cf_name(), *old_schema, *cfm);
auto&& keyspace = db.find_keyspace(cfm->ks_name()).metadata();
return db::schema_tables::make_update_table_mutations(keyspace, old_schema, cfm, ts, from_thrift)
.then([announce_locally, keyspace, ts, view_updates = std::move(view_updates)] (auto&& mutations) {
return map_reduce(view_updates,
[keyspace = std::move(keyspace), ts] (auto&& view) {
auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
return db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, false);
}, std::move(mutations),
[] (auto&& result, auto&& view_mutations) {
std::move(view_mutations.begin(), view_mutations.end(), std::back_inserter(result));
return std::move(result);
})
.then([announce_locally] (auto&& mutations) {
return announce(std::move(mutations), announce_locally);
});
});
auto mutations = db::schema_tables::make_update_table_mutations(keyspace, old_schema, cfm, ts, from_thrift);
return map_reduce(view_updates,
[keyspace, ts] (auto&& view) {
auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, false);
return make_ready_future<std::vector<mutation>>(std::move(mutations));
}, std::move(mutations),
[] (auto&& result, auto&& view_mutations) {
std::move(view_mutations.begin(), view_mutations.end(), std::back_inserter(result));
return std::move(result);
})
.then([keyspace, announce_locally] (auto&& mutations) {
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
});
} catch (const no_such_column_family& e) {
throw exceptions::configuration_exception(format("Cannot update non existing table '{}' in keyspace '{}'.",
cfm->cf_name(), cfm->ks_name()));
@@ -588,10 +595,8 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
static future<> do_announce_new_type(user_type new_type, bool announce_locally) {
auto& db = get_local_storage_proxy().get_db().local();
auto&& keyspace = db.find_keyspace(new_type->_keyspace);
return db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp())
.then([announce_locally] (auto&& mutations) {
return migration_manager::announce(std::move(mutations), announce_locally);
});
auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp());
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations), announce_locally);
}
future<> migration_manager::announce_new_type(user_type new_type, bool announce_locally) {
@@ -690,19 +695,16 @@ future<> migration_manager::announce_column_family_drop(const sstring& ks_name,
}
mlogger.info("Drop table '{}.{}'", schema->ks_name(), schema->cf_name());
auto maybe_drop_secondary_indexes = make_ready_future<std::vector<mutation>>();
std::vector<mutation> drop_si_mutations;
auto keyspace = db.find_keyspace(ks_name).metadata();
if (!schema->all_indices().empty()) {
auto builder = schema_builder(schema).without_indexes();
maybe_drop_secondary_indexes = db::schema_tables::make_update_table_mutations(db.find_keyspace(ks_name).metadata(), schema, builder.build(), api::new_timestamp(), false);
drop_si_mutations = db::schema_tables::make_update_table_mutations(keyspace, schema, builder.build(), api::new_timestamp(), false);
}
return maybe_drop_secondary_indexes.then([announce_locally, ks_name, schema, &db, &old_cfm] (auto&& drop_si_mutations) {
return db::schema_tables::make_drop_table_mutations(db.find_keyspace(ks_name).metadata(), schema, api::new_timestamp())
.then([drop_si_mutations = std::move(drop_si_mutations), announce_locally] (auto&& mutations) mutable {
mutations.insert(mutations.end(), std::make_move_iterator(drop_si_mutations.begin()), std::make_move_iterator(drop_si_mutations.end()));
return announce(std::move(mutations), announce_locally);
});
});
auto mutations = db::schema_tables::make_drop_table_mutations(keyspace, schema, api::new_timestamp());
mutations.insert(mutations.end(), std::make_move_iterator(drop_si_mutations.begin()), std::make_move_iterator(drop_si_mutations.end()));
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
} catch (const no_such_column_family& e) {
throw exceptions::configuration_exception(format("Cannot drop non existing table '{}' in keyspace '{}'.", cf_name, ks_name));
}
@@ -714,10 +716,9 @@ future<> migration_manager::announce_type_drop(user_type dropped_type, bool anno
auto& db = get_local_storage_proxy().get_db().local();
auto&& keyspace = db.find_keyspace(dropped_type->_keyspace);
mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string());
return db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, api::new_timestamp())
.then([announce_locally] (auto&& mutations) {
return announce(std::move(mutations), announce_locally);
});
auto mutations =
db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, api::new_timestamp());
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations), announce_locally);
}
future<> migration_manager::announce_new_view(view_ptr view, bool announce_locally)
@@ -732,10 +733,8 @@ future<> migration_manager::announce_new_view(view_ptr view, bool announce_local
throw exceptions::already_exists_exception(view->ks_name(), view->cf_name());
}
mlogger.info("Create new view: {}", view);
return db::schema_tables::make_create_view_mutations(keyspace, std::move(view), api::new_timestamp())
.then([announce_locally] (auto&& mutations) {
return announce(std::move(mutations), announce_locally);
});
auto mutations = db::schema_tables::make_create_view_mutations(keyspace, std::move(view), api::new_timestamp());
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
} catch (const no_such_keyspace& e) {
throw exceptions::configuration_exception(format("Cannot add view '{}' to non existing keyspace '{}'.", view->cf_name(), view->ks_name()));
}
@@ -757,10 +756,8 @@ future<> migration_manager::announce_view_update(view_ptr view, bool announce_lo
oldCfm.validateCompatility(cfm);
#endif
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
return db::schema_tables::make_update_view_mutations(std::move(keyspace), view_ptr(old_view), std::move(view), api::new_timestamp(), true)
.then([announce_locally] (auto&& mutations) {
return announce(std::move(mutations), announce_locally);
});
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), api::new_timestamp(), true);
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
} catch (const std::out_of_range& e) {
throw exceptions::configuration_exception(format("Cannot update non existing materialized view '{}' in keyspace '{}'.",
view->cf_name(), view->ks_name()));
@@ -782,10 +779,8 @@ future<> migration_manager::announce_view_drop(const sstring& ks_name,
}
auto keyspace = db.find_keyspace(ks_name).metadata();
mlogger.info("Drop view '{}.{}'", view->ks_name(), view->cf_name());
return db::schema_tables::make_drop_view_mutations(std::move(keyspace), view_ptr(std::move(view)), api::new_timestamp())
.then([announce_locally] (auto&& mutations) {
return announce(std::move(mutations), announce_locally);
});
auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(std::move(view)), api::new_timestamp());
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
} catch (const no_such_column_family& e) {
throw exceptions::configuration_exception(format("Cannot drop non existing materialized view '{}' in keyspace '{}'.",
cf_name, ks_name));

View File

@@ -158,7 +158,7 @@ SEASTAR_TEST_CASE(test_concurrent_column_addition) {
{
auto&& keyspace = e.db().local().find_keyspace(s0->ks_name()).metadata();
auto muts = db::schema_tables::make_update_table_mutations(keyspace, s0, s2,
api::new_timestamp(), false).get0();
api::new_timestamp(), false);
mm.announce(std::move(muts), true).get();
}
@@ -284,7 +284,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
// Drop v1
{
auto muts = db::schema_tables::make_update_table_mutations(keyspace, s1, s2,
api::new_timestamp(), false).get0();
api::new_timestamp(), false);
mm.announce(std::move(muts), true).get();
}
@@ -301,7 +301,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
.build();
auto muts = db::schema_tables::make_update_table_mutations(keyspace, s3, s4,
api::new_timestamp(), false).get0();
api::new_timestamp(), false);
mm.announce(std::move(muts), true).get();
}
@@ -329,7 +329,7 @@ SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
return e.db().local().find_column_family("ks", "table1");
};
auto muts1 = db::schema_tables::make_create_table_mutations(keyspace, s0, api::new_timestamp()).get0();
auto muts1 = db::schema_tables::make_create_table_mutations(keyspace, s0, api::new_timestamp());
mm.announce(muts1).get();
auto s1 = find_table().schema();