migration_manager: drop announce_locally flag
It looks like the history of the flag begins in Cassandra's https://issues.apache.org/jira/browse/CASSANDRA-7327 where it is introduced to speedup tests by not needing to start the gossiper. The thing is we always start gossiper in our cql tests, so the flag only introduce noise. And, of course, since we want to move schema to use raft it goes against the nature of the raft to be able to apply modification only locally, so we better get rid of the capability ASAP. Tests: units(dev, debug) Message-Id: <20201230111101.4037543-2-gleb@scylladb.com>
This commit is contained in:
@@ -476,8 +476,8 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
return make_ready_future<request_return_type>(api_error::resource_not_found(
|
||||
format("Requested resource not found: Table: {} not found", table_name)));
|
||||
}
|
||||
return _mm.announce_column_family_drop(keyspace_name, table_name, false, service::migration_manager::drop_views::yes).then([this, keyspace_name] {
|
||||
return _mm.announce_keyspace_drop(keyspace_name, false);
|
||||
return _mm.announce_column_family_drop(keyspace_name, table_name, service::migration_manager::drop_views::yes).then([this, keyspace_name] {
|
||||
return _mm.announce_keyspace_drop(keyspace_name);
|
||||
}).then([table_name = std::move(table_name)] {
|
||||
// FIXME: need more attributes?
|
||||
rjson::value table_description = rjson::empty_object();
|
||||
@@ -704,7 +704,7 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
|
||||
static future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
|
||||
schema_builder builder(schema);
|
||||
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(std::move(tags_map)));
|
||||
return mm.announce_column_family_update(builder.build(), false, std::vector<view_ptr>(), false);
|
||||
return mm.announce_column_family_update(builder.build(), false, std::vector<view_ptr>());
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
@@ -981,7 +981,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
return create_keyspace(keyspace_name).handle_exception_type([] (exceptions::already_exists_exception&) {
|
||||
// Ignore the fact that the keyspace may already exist. See discussion in #6340
|
||||
}).then([this, table_name, request = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable {
|
||||
return futurize_invoke([&] { return _mm.announce_new_column_family(schema, false); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable {
|
||||
return futurize_invoke([&] { return _mm.announce_new_column_family(schema); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable {
|
||||
return parallel_for_each(std::move(view_builders), [this, schema] (schema_builder builder) {
|
||||
return _mm.announce_new_view(view_ptr(builder.build()));
|
||||
}).then([this, table_info = std::move(table_info), schema, tags_map = std::move(tags_map)] () mutable {
|
||||
@@ -3549,7 +3549,7 @@ future<> executor::create_keyspace(std::string_view keyspace_name) {
|
||||
}
|
||||
auto opts = get_network_topology_options(rf);
|
||||
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
|
||||
return _mm.announce_new_keyspace(ksm, api::new_timestamp(), false);
|
||||
return _mm.announce_new_keyspace(ksm, api::new_timestamp());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ static future<> create_metadata_table_if_missing_impl(
|
||||
b.set_uuid(uuid);
|
||||
schema_ptr table = b.build();
|
||||
return ignore_existing([&mm, table = std::move(table)] () {
|
||||
return mm.announce_new_column_family(table, false);
|
||||
return mm.announce_new_column_family(table);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -154,7 +154,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c
|
||||
|
||||
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments.
|
||||
// See issue #2129.
|
||||
return mm.announce_new_keyspace(ksm, api::min_timestamp, false);
|
||||
return mm.announce_new_keyspace(ksm, api::min_timestamp);
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -91,10 +91,10 @@ void cql3::statements::alter_keyspace_statement::validate(service::storage_proxy
|
||||
}
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> cql3::statements::alter_keyspace_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const {
|
||||
future<shared_ptr<cql_transport::event::schema_change>> cql3::statements::alter_keyspace_statement::announce_migration(service::storage_proxy& proxy) const {
|
||||
auto old_ksm = proxy.get_db().local().find_keyspace(_name).metadata();
|
||||
const auto& tm = *proxy.get_token_metadata_ptr();
|
||||
return service::get_local_migration_manager().announce_keyspace_update(_attrs->as_ks_metadata_update(old_ksm, tm), is_local_only).then([this] {
|
||||
return service::get_local_migration_manager().announce_keyspace_update(_attrs->as_ks_metadata_update(old_ksm, tm)).then([this] {
|
||||
using namespace cql_transport;
|
||||
return ::make_shared<event::schema_change>(
|
||||
event::schema_change::change_type::UPDATED,
|
||||
|
||||
@@ -61,7 +61,7 @@ public:
|
||||
|
||||
future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
void validate(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
};
|
||||
|
||||
|
||||
@@ -288,7 +288,7 @@ void alter_table_statement::drop_column(const schema& schema, const table& cf, s
|
||||
}
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
auto& db = proxy.get_db().local();
|
||||
auto s = validation::validate_column_family(db, keyspace(), column_family());
|
||||
@@ -396,7 +396,7 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
break;
|
||||
}
|
||||
|
||||
return service::get_local_migration_manager().announce_column_family_update(cfm.build(), false, std::move(view_updates), is_local_only)
|
||||
return service::get_local_migration_manager().announce_column_family_update(cfm.build(), false, std::move(view_updates))
|
||||
.then([this] {
|
||||
using namespace cql_transport;
|
||||
return ::make_shared<event::schema_change>(
|
||||
|
||||
@@ -80,7 +80,7 @@ public:
|
||||
|
||||
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
virtual void validate(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
private:
|
||||
void add_column(const schema& schema, const table& cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;
|
||||
|
||||
@@ -78,7 +78,7 @@ const sstring& alter_type_statement::keyspace() const
|
||||
return _name.get_keyspace();
|
||||
}
|
||||
|
||||
void alter_type_statement::do_announce_migration(database& db, ::keyspace& ks, bool is_local_only) const
|
||||
void alter_type_statement::do_announce_migration(database& db, ::keyspace& ks) const
|
||||
{
|
||||
auto&& all_types = ks.metadata()->user_types().get_all_types();
|
||||
auto to_update = all_types.find(_name.get_user_type_name());
|
||||
@@ -100,7 +100,7 @@ void alter_type_statement::do_announce_migration(database& db, ::keyspace& ks, b
|
||||
|
||||
// Now, we need to announce the type update to basically change it for new tables using this type,
|
||||
// but we also need to find all existing user types and CF using it and change them.
|
||||
service::get_local_migration_manager().announce_type_update(updated, is_local_only).get();
|
||||
service::get_local_migration_manager().announce_type_update(updated).get();
|
||||
|
||||
for (auto&& schema : ks.metadata()->cf_meta_data() | boost::adaptors::map_values) {
|
||||
auto cfm = schema_builder(schema);
|
||||
@@ -115,21 +115,21 @@ void alter_type_statement::do_announce_migration(database& db, ::keyspace& ks, b
|
||||
}
|
||||
if (modified) {
|
||||
if (schema->is_view()) {
|
||||
service::get_local_migration_manager().announce_view_update(view_ptr(cfm.build()), is_local_only).get();
|
||||
service::get_local_migration_manager().announce_view_update(view_ptr(cfm.build())).get();
|
||||
} else {
|
||||
service::get_local_migration_manager().announce_column_family_update(cfm.build(), false, {}, is_local_only).get();
|
||||
service::get_local_migration_manager().announce_column_family_update(cfm.build(), false, {}).get();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> alter_type_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> alter_type_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
return seastar::async([this, &proxy, is_local_only] {
|
||||
return seastar::async([this, &proxy] {
|
||||
auto&& db = proxy.get_db().local();
|
||||
try {
|
||||
auto&& ks = db.find_keyspace(keyspace());
|
||||
do_announce_migration(db, ks, is_local_only);
|
||||
do_announce_migration(db, ks);
|
||||
using namespace cql_transport;
|
||||
return ::make_shared<event::schema_change>(
|
||||
event::schema_change::change_type::UPDATED,
|
||||
|
||||
@@ -63,14 +63,14 @@ public:
|
||||
|
||||
virtual const sstring& keyspace() const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
class add_or_alter;
|
||||
class renames;
|
||||
protected:
|
||||
virtual user_type make_updated_type(database& db, user_type to_update) const = 0;
|
||||
private:
|
||||
void do_announce_migration(database& db, ::keyspace& ks, bool is_local_only) const;
|
||||
void do_announce_migration(database& db, ::keyspace& ks) const;
|
||||
};
|
||||
|
||||
class alter_type_statement::add_or_alter : public alter_type_statement {
|
||||
|
||||
@@ -76,7 +76,7 @@ void alter_view_statement::validate(service::storage_proxy&, const service::clie
|
||||
// validated in announce_migration()
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> alter_view_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> alter_view_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
auto&& db = proxy.get_db().local();
|
||||
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
|
||||
@@ -108,7 +108,7 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_view_statement::an
|
||||
"the corresponding data in the parent table.");
|
||||
}
|
||||
|
||||
return service::get_local_migration_manager().announce_view_update(view_ptr(builder.build()), is_local_only).then([this] {
|
||||
return service::get_local_migration_manager().announce_view_update(view_ptr(builder.build())).then([this] {
|
||||
using namespace cql_transport;
|
||||
|
||||
return ::make_shared<event::schema_change>(
|
||||
|
||||
@@ -63,7 +63,7 @@ public:
|
||||
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
};
|
||||
|
||||
@@ -59,11 +59,11 @@ std::unique_ptr<prepared_statement> create_function_statement::prepare(database&
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_function_statement::announce_migration(
|
||||
service::storage_proxy& proxy, bool is_local_only) const {
|
||||
service::storage_proxy& proxy) const {
|
||||
if (!_func) {
|
||||
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>();
|
||||
}
|
||||
return service::get_local_migration_manager().announce_new_function(_func, is_local_only).then([this] {
|
||||
return service::get_local_migration_manager().announce_new_function(_func).then([this] {
|
||||
return create_schema_change(*_func, true);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ namespace statements {
|
||||
class create_function_statement final : public create_function_statement_base {
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(
|
||||
service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
service::storage_proxy& proxy) const override;
|
||||
virtual void create(service::storage_proxy& proxy, functions::function* old) const override;
|
||||
sstring _language;
|
||||
sstring _body;
|
||||
|
||||
@@ -271,7 +271,7 @@ void create_index_statement::validate_targets_for_multi_column_index(std::vector
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::event::schema_change>>
|
||||
create_index_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const {
|
||||
create_index_statement::announce_migration(service::storage_proxy& proxy) const {
|
||||
auto& db = proxy.get_db().local();
|
||||
auto schema = db.find_schema(keyspace(), column_family());
|
||||
std::vector<::shared_ptr<index_target>> targets;
|
||||
@@ -310,7 +310,7 @@ create_index_statement::announce_migration(service::storage_proxy& proxy, bool i
|
||||
schema_builder builder{schema};
|
||||
builder.with_index(index);
|
||||
return service::get_local_migration_manager().announce_column_family_update(
|
||||
builder.build(), false, {}, is_local_only).then([this]() {
|
||||
builder.build(), false, {}).then([this]() {
|
||||
using namespace cql_transport;
|
||||
return ::make_shared<event::schema_change>(
|
||||
event::schema_change::change_type::UPDATED,
|
||||
|
||||
@@ -79,7 +79,7 @@ public:
|
||||
|
||||
future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
future<::shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy&, bool is_local_only) const override;
|
||||
future<::shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy&) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
private:
|
||||
|
||||
@@ -106,11 +106,11 @@ void create_keyspace_statement::validate(service::storage_proxy&, const service:
|
||||
#endif
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_keyspace_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_keyspace_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
return make_ready_future<>().then([this, p = proxy.shared_from_this(), is_local_only] {
|
||||
return make_ready_future<>().then([this, p = proxy.shared_from_this()] {
|
||||
const auto& tm = *p->get_token_metadata_ptr();
|
||||
return service::get_local_migration_manager().announce_new_keyspace(_attrs->as_ks_metadata(_name, tm), is_local_only);
|
||||
return service::get_local_migration_manager().announce_new_keyspace(_attrs->as_ks_metadata(_name, tm));
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
|
||||
@@ -84,7 +84,7 @@ public:
|
||||
*/
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
|
||||
|
||||
@@ -97,10 +97,10 @@ std::vector<column_definition> create_table_statement::get_columns() const
|
||||
return column_defs;
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_table_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const {
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_table_statement::announce_migration(service::storage_proxy& proxy) const {
|
||||
auto schema = get_cf_meta_data(proxy.get_db().local());
|
||||
return make_ready_future<>().then([this, is_local_only, schema = std::move(schema)] {
|
||||
return service::get_local_migration_manager().announce_new_column_family(std::move(schema), is_local_only);
|
||||
return make_ready_future<>().then([this, schema = std::move(schema)] {
|
||||
return service::get_local_migration_manager().announce_new_column_family(std::move(schema));
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
|
||||
@@ -102,7 +102,7 @@ public:
|
||||
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
|
||||
|
||||
@@ -138,7 +138,7 @@ inline user_type create_type_statement::create_type(database& db) const
|
||||
std::move(field_names), std::move(field_types), true /* multi cell */);
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_type_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_type_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
auto&& db = proxy.get_db().local();
|
||||
|
||||
@@ -152,7 +152,7 @@ future<shared_ptr<cql_transport::event::schema_change>> create_type_statement::a
|
||||
|
||||
auto type = create_type(db);
|
||||
check_for_duplicate_names(type);
|
||||
return service::get_local_migration_manager().announce_new_type(type, is_local_only).then([this] {
|
||||
return service::get_local_migration_manager().announce_new_type(type).then([this] {
|
||||
using namespace cql_transport;
|
||||
|
||||
return ::make_shared<event::schema_change>(
|
||||
|
||||
@@ -65,7 +65,7 @@ public:
|
||||
|
||||
virtual const sstring& keyspace() const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
|
||||
|
||||
@@ -140,7 +140,7 @@ static bool validate_primary_key(
|
||||
return new_non_pk_column;
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_view_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const {
|
||||
future<shared_ptr<cql_transport::event::schema_change>> create_view_statement::announce_migration(service::storage_proxy& proxy) const {
|
||||
// We need to make sure that:
|
||||
// - primary key includes all columns in base table's primary key
|
||||
// - make sure that the select statement does not have anything other than columns
|
||||
@@ -350,8 +350,8 @@ future<shared_ptr<cql_transport::event::schema_change>> create_view_statement::a
|
||||
auto where_clause_text = util::relations_to_where_clause(_where_clause);
|
||||
builder.with_view_info(schema->id(), schema->cf_name(), included.empty(), std::move(where_clause_text));
|
||||
|
||||
return make_ready_future<>().then([definition = view_ptr(builder.build()), is_local_only]() mutable {
|
||||
return service::get_local_migration_manager().announce_new_view(definition, is_local_only);
|
||||
return make_ready_future<>().then([definition = view_ptr(builder.build())]() mutable {
|
||||
return service::get_local_migration_manager().announce_new_view(definition);
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
|
||||
@@ -68,7 +68,7 @@ public:
|
||||
// Functions we need to override to subclass schema_altering_statement
|
||||
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
|
||||
// FIXME: continue here. See create_table_statement.hh and CreateViewStatement.java
|
||||
|
||||
@@ -33,7 +33,7 @@ std::unique_ptr<prepared_statement> drop_function_statement::prepare(database& d
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_function_statement::announce_migration(
|
||||
service::storage_proxy& proxy, bool is_local_only) const {
|
||||
service::storage_proxy& proxy) const {
|
||||
if (!_func) {
|
||||
return make_ready_future<shared_ptr<cql_transport::event::schema_change>>();
|
||||
}
|
||||
@@ -41,7 +41,7 @@ future<shared_ptr<cql_transport::event::schema_change>> drop_function_statement:
|
||||
if (!user_func) {
|
||||
throw exceptions::invalid_request_exception(format("'{}' is not a user defined function", _func));
|
||||
}
|
||||
return service::get_local_migration_manager().announce_function_drop(user_func, is_local_only).then([this] {
|
||||
return service::get_local_migration_manager().announce_function_drop(user_func).then([this] {
|
||||
return create_schema_change(*_func, false);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ namespace statements {
|
||||
class drop_function_statement final : public drop_function_statement_base {
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(
|
||||
service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
service::storage_proxy& proxy) const override;
|
||||
|
||||
public:
|
||||
drop_function_statement(functions::function_name name, std::vector<shared_ptr<cql3_type::raw>> arg_types,
|
||||
|
||||
@@ -86,7 +86,7 @@ void drop_index_statement::validate(service::storage_proxy& proxy, const service
|
||||
}
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_index_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_index_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
auto cfm = lookup_indexed_table(proxy);
|
||||
if (!cfm) {
|
||||
@@ -95,7 +95,7 @@ future<shared_ptr<cql_transport::event::schema_change>> drop_index_statement::an
|
||||
++_cql_stats->secondary_index_drops;
|
||||
auto builder = schema_builder(cfm);
|
||||
builder.without_index(_index_name);
|
||||
return service::get_local_migration_manager().announce_column_family_update(builder.build(), false, {}, is_local_only).then([cfm] {
|
||||
return service::get_local_migration_manager().announce_column_family_update(builder.build(), false, {}).then([cfm] {
|
||||
// Dropping an index is akin to updating the CF
|
||||
// Note that we shouldn't call columnFamily() at this point because the index has been dropped and the call to lookupIndexedTable()
|
||||
// in that method would now throw.
|
||||
|
||||
@@ -72,7 +72,7 @@ public:
|
||||
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
private:
|
||||
|
||||
@@ -74,10 +74,10 @@ const sstring& drop_keyspace_statement::keyspace() const
|
||||
return _keyspace;
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_keyspace_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_keyspace_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
return make_ready_future<>().then([this, is_local_only] {
|
||||
return service::get_local_migration_manager().announce_keyspace_drop(_keyspace, is_local_only);
|
||||
return make_ready_future<>().then([this] {
|
||||
return service::get_local_migration_manager().announce_keyspace_drop(_keyspace);
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
|
||||
@@ -59,7 +59,7 @@ public:
|
||||
|
||||
virtual const sstring& keyspace() const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
};
|
||||
|
||||
@@ -72,10 +72,10 @@ void drop_table_statement::validate(service::storage_proxy&, const service::clie
|
||||
// validated in announce_migration()
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_table_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_table_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
return make_ready_future<>().then([this, is_local_only] {
|
||||
return service::get_local_migration_manager().announce_column_family_drop(keyspace(), column_family(), is_local_only);
|
||||
return make_ready_future<>().then([this] {
|
||||
return service::get_local_migration_manager().announce_column_family_drop(keyspace(), column_family());
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
|
||||
@@ -58,7 +58,7 @@ public:
|
||||
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
};
|
||||
|
||||
@@ -142,7 +142,7 @@ const sstring& drop_type_statement::keyspace() const
|
||||
return _name.get_keyspace();
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_type_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_type_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
auto&& db = proxy.get_db().local();
|
||||
|
||||
@@ -157,7 +157,7 @@ future<shared_ptr<cql_transport::event::schema_change>> drop_type_statement::ann
|
||||
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>();
|
||||
}
|
||||
|
||||
return service::get_local_migration_manager().announce_type_drop(to_drop->second, is_local_only).then([this] {
|
||||
return service::get_local_migration_manager().announce_type_drop(to_drop->second).then([this] {
|
||||
using namespace cql_transport;
|
||||
|
||||
return ::make_shared<event::schema_change>(
|
||||
|
||||
@@ -61,7 +61,7 @@ public:
|
||||
|
||||
virtual const sstring& keyspace() const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
};
|
||||
|
||||
@@ -74,10 +74,10 @@ void drop_view_statement::validate(service::storage_proxy&, const service::clien
|
||||
// validated in migration_manager::announce_view_drop()
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_view_statement::announce_migration(service::storage_proxy& proxy, bool is_local_only) const
|
||||
future<shared_ptr<cql_transport::event::schema_change>> drop_view_statement::announce_migration(service::storage_proxy& proxy) const
|
||||
{
|
||||
return make_ready_future<>().then([this, is_local_only] {
|
||||
return service::get_local_migration_manager().announce_view_drop(keyspace(), column_family(), is_local_only);
|
||||
return make_ready_future<>().then([this] {
|
||||
return service::get_local_migration_manager().announce_view_drop(keyspace(), column_family());
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
|
||||
@@ -63,7 +63,7 @@ public:
|
||||
|
||||
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
|
||||
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const override;
|
||||
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const override;
|
||||
|
||||
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
|
||||
};
|
||||
|
||||
@@ -90,10 +90,10 @@ void schema_altering_statement::prepare_keyspace(const service::client_state& st
|
||||
}
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
schema_altering_statement::execute0(service::storage_proxy& proxy, service::query_state& state, const query_options& options, bool is_local_only) const {
|
||||
schema_altering_statement::execute0(service::storage_proxy& proxy, service::query_state& state, const query_options& options) const {
|
||||
// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
|
||||
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
|
||||
return announce_migration(proxy, is_local_only).then([this] (auto ce) {
|
||||
return announce_migration(proxy).then([this] (auto ce) {
|
||||
::shared_ptr<messages::result_message> result;
|
||||
if (!ce) {
|
||||
result = ::make_shared<messages::result_message::void_message>();
|
||||
@@ -120,7 +120,7 @@ schema_altering_statement::execute(service::storage_proxy& proxy, service::query
|
||||
}
|
||||
}
|
||||
|
||||
return execute0(proxy, state, options, internal).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
|
||||
return execute0(proxy, state, options).then([this, &state, internal](::shared_ptr<messages::result_message> result) {
|
||||
auto permissions_granted_fut = internal
|
||||
? make_ready_future<>()
|
||||
: grant_permissions_to_creator(state.get_client_state());
|
||||
|
||||
@@ -65,7 +65,7 @@ private:
|
||||
const bool _is_column_family_level;
|
||||
|
||||
future<::shared_ptr<messages::result_message>>
|
||||
execute0(service::storage_proxy& proxy, service::query_state& state, const query_options& options, bool) const;
|
||||
execute0(service::storage_proxy& proxy, service::query_state& state, const query_options& options) const;
|
||||
protected:
|
||||
explicit schema_altering_statement(timeout_config_selector timeout_selector = &timeout_config::other_timeout);
|
||||
|
||||
@@ -87,7 +87,7 @@ protected:
|
||||
|
||||
virtual void prepare_keyspace(const service::client_state& state) override;
|
||||
|
||||
virtual future<::shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy, bool is_local_only) const = 0;
|
||||
virtual future<::shared_ptr<cql_transport::event::schema_change>> announce_migration(service::storage_proxy& proxy) const = 0;
|
||||
|
||||
virtual future<::shared_ptr<messages::result_message>>
|
||||
execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) const override;
|
||||
|
||||
@@ -3060,7 +3060,7 @@ future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manage
|
||||
if (!base_schema->columns_by_name().contains(first_view_ck.name())) {
|
||||
schema_builder builder{schema_ptr(v)};
|
||||
builder.mark_column_computed(first_view_ck.name(), std::make_unique<legacy_token_column_computation>());
|
||||
return mm.announce_view_update(view_ptr(builder.build()), false);
|
||||
return mm.announce_view_update(view_ptr(builder.build()));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -139,12 +139,12 @@ future<> system_distributed_keyspace::start() {
|
||||
"org.apache.cassandra.locator.SimpleStrategy",
|
||||
{{"replication_factor", "3"}},
|
||||
true);
|
||||
return _mm.announce_new_keyspace(ksm, api::min_timestamp, false);
|
||||
return _mm.announce_new_keyspace(ksm, api::min_timestamp);
|
||||
}).then([this] {
|
||||
return do_with(all_tables(), [this] (std::vector<schema_ptr>& tables) {
|
||||
return do_for_each(tables, [this] (schema_ptr table) {
|
||||
return ignore_existing([this, table = std::move(table)] {
|
||||
return _mm.announce_new_column_family(std::move(table), api::min_timestamp, false);
|
||||
return _mm.announce_new_column_family(std::move(table), api::min_timestamp);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -168,7 +168,7 @@ future<> create_keyspace_if_not_exists_impl(db::config& config, int default_repl
|
||||
attrs->add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
|
||||
attrs->validate();
|
||||
const auto& tm = *proxy.get_token_metadata_ptr();
|
||||
return service::get_local_migration_manager().announce_new_keyspace(attrs->as_ks_metadata(name, tm), false);
|
||||
return service::get_local_migration_manager().announce_new_keyspace(attrs->as_ks_metadata(name, tm));
|
||||
};
|
||||
auto table_gen = [] (sstring ks_name, sstring cf_name, schema_ptr schema) {
|
||||
auto& proxy = service::get_local_storage_proxy();
|
||||
@@ -176,7 +176,7 @@ future<> create_keyspace_if_not_exists_impl(db::config& config, int default_repl
|
||||
return make_ready_future<>();
|
||||
}
|
||||
logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name);
|
||||
return service::get_local_migration_manager().announce_new_column_family(schema, false);
|
||||
return service::get_local_migration_manager().announce_new_column_family(schema);
|
||||
};
|
||||
// create default databases for redis.
|
||||
return parallel_for_each(boost::irange<unsigned>(0, config.redis_database_count()), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) {
|
||||
|
||||
@@ -582,26 +582,22 @@ public void notifyDropAggregate(UDAggregate udf)
|
||||
}
|
||||
#endif
|
||||
|
||||
future<> migration_manager::announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm, bool announce_locally) {
|
||||
return announce_keyspace_update(ksm, api::new_timestamp(), announce_locally);
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp, bool announce_locally) {
|
||||
future<> migration_manager::announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm) {
|
||||
auto& proxy = get_local_storage_proxy();
|
||||
auto& db = proxy.get_db().local();
|
||||
|
||||
db.validate_keyspace_update(*ksm);
|
||||
mlogger.info("Update Keyspace: {}", ksm);
|
||||
auto mutations = db::schema_tables::make_create_keyspace_mutations(ksm, timestamp);
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
auto mutations = db::schema_tables::make_create_keyspace_mutations(ksm, api::new_timestamp());
|
||||
return announce(std::move(mutations));
|
||||
}
|
||||
|
||||
future<>migration_manager::announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, bool announce_locally)
|
||||
future<>migration_manager::announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm)
|
||||
{
|
||||
return announce_new_keyspace(ksm, api::new_timestamp(), announce_locally);
|
||||
return announce_new_keyspace(ksm, api::new_timestamp());
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp, bool announce_locally)
|
||||
future<> migration_manager::announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp)
|
||||
{
|
||||
auto& proxy = get_local_storage_proxy();
|
||||
auto& db = proxy.get_db().local();
|
||||
@@ -609,25 +605,25 @@ future<> migration_manager::announce_new_keyspace(lw_shared_ptr<keyspace_metadat
|
||||
db.validate_new_keyspace(*ksm);
|
||||
mlogger.info("Create new Keyspace: {}", ksm);
|
||||
auto mutations = db::schema_tables::make_create_keyspace_mutations(ksm, timestamp);
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
return announce(std::move(mutations));
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_new_column_family(schema_ptr cfm, bool announce_locally)
|
||||
future<> migration_manager::announce_new_column_family(schema_ptr cfm)
|
||||
{
|
||||
return announce_new_column_family(std::move(cfm), api::new_timestamp(), announce_locally);
|
||||
return announce_new_column_family(std::move(cfm), api::new_timestamp());
|
||||
}
|
||||
|
||||
future<> migration_manager::include_keyspace_and_announce(
|
||||
const keyspace_metadata& keyspace, std::vector<mutation> mutations, bool announce_locally) {
|
||||
const keyspace_metadata& keyspace, std::vector<mutation> mutations) {
|
||||
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
|
||||
return db::schema_tables::read_keyspace_mutation(service::get_storage_proxy(), keyspace.name())
|
||||
.then([announce_locally, mutations = std::move(mutations)] (mutation m) mutable {
|
||||
.then([mutations = std::move(mutations)] (mutation m) mutable {
|
||||
mutations.push_back(std::move(m));
|
||||
return migration_manager::announce(std::move(mutations), announce_locally);
|
||||
return migration_manager::announce(std::move(mutations));
|
||||
});
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_new_column_family(schema_ptr cfm, api::timestamp_type timestamp, bool announce_locally) {
|
||||
future<> migration_manager::announce_new_column_family(schema_ptr cfm, api::timestamp_type timestamp) {
|
||||
#if 0
|
||||
cfm.validate();
|
||||
#endif
|
||||
@@ -648,15 +644,15 @@ future<> migration_manager::announce_new_column_family(schema_ptr cfm, api::time
|
||||
auto mutations = db::schema_tables::make_create_table_mutations(ksm, cfm, timestamp);
|
||||
get_notifier().before_create_column_family(*cfm, mutations, timestamp);
|
||||
return mutations;
|
||||
}).then([announce_locally, ksm](std::vector<mutation> mutations) {
|
||||
return include_keyspace_and_announce(*ksm, std::move(mutations), announce_locally);
|
||||
}).then([ksm](std::vector<mutation> mutations) {
|
||||
return include_keyspace_and_announce(*ksm, std::move(mutations));
|
||||
});
|
||||
} catch (const no_such_keyspace& e) {
|
||||
throw exceptions::configuration_exception(format("Cannot add table '{}' to non existing keyspace '{}'.", cfm->cf_name(), cfm->ks_name()));
|
||||
}
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool from_thrift, std::vector<view_ptr>&& view_updates, bool announce_locally) {
|
||||
future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool from_thrift, std::vector<view_ptr>&& view_updates) {
|
||||
warn(unimplemented::cause::VALIDATION);
|
||||
#if 0
|
||||
cfm.validate();
|
||||
@@ -686,8 +682,8 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
|
||||
|
||||
get_notifier().before_update_column_family(*cfm, *old_schema, mutations, ts);
|
||||
return mutations;
|
||||
}).then([keyspace, announce_locally] (auto&& mutations) {
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
|
||||
}).then([keyspace] (auto&& mutations) {
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations));
|
||||
});
|
||||
} catch (const no_such_column_family& e) {
|
||||
throw exceptions::configuration_exception(format("Cannot update non existing table '{}' in keyspace '{}'.",
|
||||
@@ -695,36 +691,36 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f
|
||||
}
|
||||
}
|
||||
|
||||
future<> migration_manager::do_announce_new_type(user_type new_type, bool announce_locally) {
|
||||
future<> migration_manager::do_announce_new_type(user_type new_type) {
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
auto&& keyspace = db.find_keyspace(new_type->_keyspace);
|
||||
auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp());
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations), announce_locally);
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations));
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_new_type(user_type new_type, bool announce_locally) {
|
||||
future<> migration_manager::announce_new_type(user_type new_type) {
|
||||
mlogger.info("Create new User Type: {}", new_type->get_name_as_string());
|
||||
return do_announce_new_type(new_type, announce_locally);
|
||||
return do_announce_new_type(new_type);
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_type_update(user_type updated_type, bool announce_locally) {
|
||||
future<> migration_manager::announce_type_update(user_type updated_type) {
|
||||
mlogger.info("Update User Type: {}", updated_type->get_name_as_string());
|
||||
return do_announce_new_type(updated_type, announce_locally);
|
||||
return do_announce_new_type(updated_type);
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_new_function(shared_ptr<cql3::functions::user_function> func, bool announce_locally) {
|
||||
future<> migration_manager::announce_new_function(shared_ptr<cql3::functions::user_function> func) {
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
auto&& keyspace = db.find_keyspace(func->name().keyspace);
|
||||
auto mutations = db::schema_tables::make_create_function_mutations(func, api::new_timestamp());
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations), announce_locally);
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations));
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_function_drop(
|
||||
shared_ptr<cql3::functions::user_function> func, bool announce_locally) {
|
||||
shared_ptr<cql3::functions::user_function> func) {
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
auto&& keyspace = db.find_keyspace(func->name().keyspace);
|
||||
auto mutations = db::schema_tables::make_drop_function_mutations(func, api::new_timestamp());
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations), announce_locally);
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations));
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -773,7 +769,7 @@ public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift
|
||||
}
|
||||
#endif
|
||||
|
||||
future<> migration_manager::announce_keyspace_drop(const sstring& ks_name, bool announce_locally)
|
||||
future<> migration_manager::announce_keyspace_drop(const sstring& ks_name)
|
||||
{
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
if (!db.has_keyspace(ks_name)) {
|
||||
@@ -782,12 +778,11 @@ future<> migration_manager::announce_keyspace_drop(const sstring& ks_name, bool
|
||||
auto& keyspace = db.find_keyspace(ks_name);
|
||||
mlogger.info("Drop Keyspace '{}'", ks_name);
|
||||
auto&& mutations = db::schema_tables::make_drop_keyspace_mutations(keyspace.metadata(), api::new_timestamp());
|
||||
return announce(std::move(mutations), announce_locally);
|
||||
return announce(std::move(mutations));
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_column_family_drop(const sstring& ks_name,
|
||||
const sstring& cf_name,
|
||||
bool announce_locally,
|
||||
drop_views drop_views)
|
||||
{
|
||||
try {
|
||||
@@ -831,8 +826,8 @@ future<> migration_manager::announce_column_family_drop(const sstring& ks_name,
|
||||
|
||||
get_notifier().before_drop_column_family(*schema, mutations, ts);
|
||||
return mutations;
|
||||
}).then([this, keyspace, announce_locally](std::vector<mutation> mutations) {
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
|
||||
}).then([this, keyspace](std::vector<mutation> mutations) {
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations));
|
||||
});
|
||||
} catch (const no_such_column_family& e) {
|
||||
throw exceptions::configuration_exception(format("Cannot drop non existing table '{}' in keyspace '{}'.", cf_name, ks_name));
|
||||
@@ -840,17 +835,17 @@ future<> migration_manager::announce_column_family_drop(const sstring& ks_name,
|
||||
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_type_drop(user_type dropped_type, bool announce_locally)
|
||||
future<> migration_manager::announce_type_drop(user_type dropped_type)
|
||||
{
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
auto&& keyspace = db.find_keyspace(dropped_type->_keyspace);
|
||||
mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string());
|
||||
auto mutations =
|
||||
db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, api::new_timestamp());
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations), announce_locally);
|
||||
return include_keyspace_and_announce(*keyspace.metadata(), std::move(mutations));
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_new_view(view_ptr view, bool announce_locally)
|
||||
future<> migration_manager::announce_new_view(view_ptr view)
|
||||
{
|
||||
#if 0
|
||||
view.metadata.validate();
|
||||
@@ -863,13 +858,13 @@ future<> migration_manager::announce_new_view(view_ptr view, bool announce_local
|
||||
}
|
||||
mlogger.info("Create new view: {}", view);
|
||||
auto mutations = db::schema_tables::make_create_view_mutations(keyspace, std::move(view), api::new_timestamp());
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations));
|
||||
} catch (const no_such_keyspace& e) {
|
||||
throw exceptions::configuration_exception(format("Cannot add view '{}' to non existing keyspace '{}'.", view->cf_name(), view->ks_name()));
|
||||
}
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_view_update(view_ptr view, bool announce_locally)
|
||||
future<> migration_manager::announce_view_update(view_ptr view)
|
||||
{
|
||||
#if 0
|
||||
view.metadata.validate();
|
||||
@@ -886,7 +881,7 @@ future<> migration_manager::announce_view_update(view_ptr view, bool announce_lo
|
||||
#endif
|
||||
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
|
||||
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), api::new_timestamp(), true);
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations));
|
||||
} catch (const std::out_of_range& e) {
|
||||
throw exceptions::configuration_exception(format("Cannot update non existing materialized view '{}' in keyspace '{}'.",
|
||||
view->cf_name(), view->ks_name()));
|
||||
@@ -894,8 +889,7 @@ future<> migration_manager::announce_view_update(view_ptr view, bool announce_lo
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_view_drop(const sstring& ks_name,
|
||||
const sstring& cf_name,
|
||||
bool announce_locally)
|
||||
const sstring& cf_name)
|
||||
{
|
||||
auto& db = get_local_storage_proxy().get_db().local();
|
||||
try {
|
||||
@@ -909,7 +903,7 @@ future<> migration_manager::announce_view_drop(const sstring& ks_name,
|
||||
auto keyspace = db.find_keyspace(ks_name).metadata();
|
||||
mlogger.info("Drop view '{}.{}'", view->ks_name(), view->cf_name());
|
||||
auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(std::move(view)), api::new_timestamp());
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations), announce_locally);
|
||||
return include_keyspace_and_announce(*keyspace, std::move(mutations));
|
||||
} catch (const no_such_column_family& e) {
|
||||
throw exceptions::configuration_exception(format("Cannot drop non existing materialized view '{}' in keyspace '{}'.",
|
||||
cf_name, ks_name));
|
||||
@@ -925,15 +919,6 @@ public static void announceAggregateDrop(UDAggregate udf, boolean announceLocall
|
||||
}
|
||||
#endif
|
||||
|
||||
future<> migration_manager::announce(std::vector<mutation> mutations, bool announce_locally)
|
||||
{
|
||||
if (announce_locally) {
|
||||
return db::schema_tables::merge_schema(get_storage_proxy(), std::move(mutations), false);
|
||||
} else {
|
||||
return announce(std::move(mutations));
|
||||
}
|
||||
}
|
||||
|
||||
future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema)
|
||||
{
|
||||
netw::messaging_service::msg_addr id{endpoint, 0};
|
||||
|
||||
@@ -114,48 +114,44 @@ public:
|
||||
bool should_pull_schema_from(const gms::inet_address& endpoint);
|
||||
bool has_compatible_schema_tables_version(const gms::inet_address& endpoint);
|
||||
|
||||
future<> announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm, bool announce_locally = false);
|
||||
future<> announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm);
|
||||
|
||||
future<> announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp, bool announce_locally);
|
||||
future<> announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm);
|
||||
|
||||
future<> announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, bool announce_locally = false);
|
||||
future<> announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp);
|
||||
|
||||
future<> announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp, bool announce_locally);
|
||||
future<> announce_column_family_update(schema_ptr cfm, bool from_thrift, std::vector<view_ptr>&& view_updates);
|
||||
|
||||
future<> announce_column_family_update(schema_ptr cfm, bool from_thrift, std::vector<view_ptr>&& view_updates, bool announce_locally = false);
|
||||
future<> announce_new_column_family(schema_ptr cfm);
|
||||
|
||||
future<> announce_new_column_family(schema_ptr cfm, bool announce_locally = false);
|
||||
future<> announce_new_column_family(schema_ptr cfm, api::timestamp_type timestamp);
|
||||
|
||||
future<> announce_new_column_family(schema_ptr cfm, api::timestamp_type timestamp, bool announce_locally = false);
|
||||
future<> announce_new_type(user_type new_type);
|
||||
|
||||
future<> announce_new_type(user_type new_type, bool announce_locally = false);
|
||||
future<> announce_new_function(shared_ptr<cql3::functions::user_function> func);
|
||||
|
||||
future<> announce_new_function(shared_ptr<cql3::functions::user_function> func, bool announce_locally);
|
||||
future<> announce_function_drop(shared_ptr<cql3::functions::user_function> func);
|
||||
|
||||
future<> announce_function_drop(shared_ptr<cql3::functions::user_function> func, bool announce_locally);
|
||||
future<> announce_type_update(user_type updated_type);
|
||||
|
||||
future<> announce_type_update(user_type updated_type, bool announce_locally = false);
|
||||
|
||||
future<> announce_keyspace_drop(const sstring& ks_name, bool announce_locally = false);
|
||||
future<> announce_keyspace_drop(const sstring& ks_name);
|
||||
|
||||
class drop_views_tag;
|
||||
using drop_views = bool_class<drop_views_tag>;
|
||||
future<> announce_column_family_drop(const sstring& ks_name, const sstring& cf_name, bool announce_locally = false, drop_views drop_views = drop_views::no);
|
||||
future<> announce_column_family_drop(const sstring& ks_name, const sstring& cf_name, drop_views drop_views = drop_views::no);
|
||||
|
||||
future<> announce_type_drop(user_type dropped_type, bool announce_locally = false);
|
||||
future<> announce_type_drop(user_type dropped_type);
|
||||
|
||||
future<> announce_new_view(view_ptr view, bool announce_locally = false);
|
||||
future<> announce_new_view(view_ptr view);
|
||||
|
||||
future<> announce_view_update(view_ptr view, bool announce_locally = false);
|
||||
future<> announce_view_update(view_ptr view);
|
||||
|
||||
future<> announce_view_drop(const sstring& ks_name, const sstring& cf_name, bool announce_locally = false);
|
||||
future<> announce_view_drop(const sstring& ks_name, const sstring& cf_name);
|
||||
|
||||
/**
|
||||
* actively announce a new version to active hosts via rpc
|
||||
* @param schema The schema mutation to be applied
|
||||
*/
|
||||
static future<> announce(std::vector<mutation> mutations, bool announce_locally);
|
||||
|
||||
// Returns a future on the local application of the schema
|
||||
static future<> announce(std::vector<mutation> schema);
|
||||
|
||||
@@ -173,9 +169,9 @@ private:
|
||||
future<> uninit_messaging_service();
|
||||
|
||||
static future<> include_keyspace_and_announce(
|
||||
const keyspace_metadata& keyspace, std::vector<mutation> mutations, bool announce_locally);
|
||||
const keyspace_metadata& keyspace, std::vector<mutation> mutations);
|
||||
|
||||
static future<> do_announce_new_type(user_type new_type, bool announce_locally);
|
||||
static future<> do_announce_new_type(user_type new_type);
|
||||
|
||||
future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema);
|
||||
};
|
||||
|
||||
@@ -54,7 +54,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp) const {
|
||||
// "CREATE TABLE" invocation on different Nodes.
|
||||
// The important thing is that it will converge eventually (some traces may
|
||||
// be lost in a process but that's ok).
|
||||
return service::get_local_migration_manager().announce_new_column_family(b.build(), false).discard_result().handle_exception([this] (auto ep) {});;
|
||||
return service::get_local_migration_manager().announce_new_column_family(b.build()).discard_result().handle_exception([this] (auto ep) {});;
|
||||
}
|
||||
|
||||
future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) {
|
||||
@@ -131,7 +131,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, const sstring&
|
||||
opts["replication_factor"] = replication_factor;
|
||||
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
|
||||
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments. See issue #2129.
|
||||
service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false).get();
|
||||
service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp).get();
|
||||
}
|
||||
|
||||
qs.get_client_state().set_keyspace(db, keyspace_name);
|
||||
|
||||
@@ -134,11 +134,11 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc
|
||||
run_mutation_source_tests([&] (schema_ptr s, const std::vector<mutation>& partitions) -> mutation_source {
|
||||
try {
|
||||
e.local_db().find_column_family(s->ks_name(), s->cf_name());
|
||||
service::get_local_migration_manager().announce_column_family_drop(s->ks_name(), s->cf_name(), true).get();
|
||||
service::get_local_migration_manager().announce_column_family_drop(s->ks_name(), s->cf_name()).get();
|
||||
} catch (const no_such_column_family&) {
|
||||
// expected
|
||||
}
|
||||
service::get_local_migration_manager().announce_new_column_family(s, true).get();
|
||||
service::get_local_migration_manager().announce_new_column_family(s).get();
|
||||
column_family& cf = e.local_db().find_column_family(s);
|
||||
for (auto&& m : partitions) {
|
||||
e.local_db().apply(cf.schema(), freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get();
|
||||
|
||||
@@ -53,7 +53,7 @@ SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) {
|
||||
|
||||
auto old_schema = partial.build();
|
||||
|
||||
service::get_local_migration_manager().announce_new_column_family(old_schema, false).get();
|
||||
service::get_local_migration_manager().announce_new_column_family(old_schema).get();
|
||||
|
||||
auto old_table_version = e.db().local().find_schema(old_schema->id())->version();
|
||||
auto old_node_version = e.db().local().get_version();
|
||||
@@ -80,7 +80,7 @@ SEASTAR_TEST_CASE(test_schema_is_updated_in_keyspace) {
|
||||
|
||||
auto old_schema = builder.build();
|
||||
|
||||
service::get_local_migration_manager().announce_new_column_family(old_schema, false).get();
|
||||
service::get_local_migration_manager().announce_new_column_family(old_schema).get();
|
||||
|
||||
auto s = e.local_db().find_schema(old_schema->id());
|
||||
BOOST_REQUIRE_EQUAL(*old_schema, *s);
|
||||
@@ -111,7 +111,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_ignored_in_version_calculation) {
|
||||
.with_column("v1", bytes_type)
|
||||
.build();
|
||||
|
||||
service::get_local_migration_manager().announce_new_column_family(table_schema, false).get();
|
||||
service::get_local_migration_manager().announce_new_column_family(table_schema).get();
|
||||
|
||||
auto old_table_version = e.db().local().find_schema(table_schema->id())->version();
|
||||
auto old_node_version = e.db().local().get_version();
|
||||
@@ -123,7 +123,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_ignored_in_version_calculation) {
|
||||
mutation m(s, pkey);
|
||||
auto ckey = clustering_key::from_exploded(*s, {utf8_type->decompose(table_schema->cf_name()), "v1"});
|
||||
m.partition().apply_delete(*s, ckey, tombstone(api::min_timestamp, gc_clock::now()));
|
||||
service::get_local_migration_manager().announce(std::vector<mutation>({m}), true).get();
|
||||
service::get_local_migration_manager().announce(std::vector<mutation>({m})).get();
|
||||
}
|
||||
|
||||
auto new_table_version = e.db().local().find_schema(table_schema->id())->version();
|
||||
@@ -159,7 +159,7 @@ SEASTAR_TEST_CASE(test_concurrent_column_addition) {
|
||||
.with_column("v2", bytes_type)
|
||||
.build();
|
||||
|
||||
mm.announce_new_column_family(s1, false).get();
|
||||
mm.announce_new_column_family(s1).get();
|
||||
auto old_version = e.db().local().find_schema(s1->id())->version();
|
||||
|
||||
// Apply s0 -> s2 change.
|
||||
@@ -167,7 +167,7 @@ SEASTAR_TEST_CASE(test_concurrent_column_addition) {
|
||||
auto&& keyspace = e.db().local().find_keyspace(s0->ks_name()).metadata();
|
||||
auto muts = db::schema_tables::make_update_table_mutations(e.db().local(), keyspace, s0, s2,
|
||||
api::new_timestamp(), false);
|
||||
mm.announce(std::move(muts), true).get();
|
||||
mm.announce(std::move(muts)).get();
|
||||
}
|
||||
|
||||
auto new_schema = e.db().local().find_schema(s1->id());
|
||||
@@ -200,7 +200,7 @@ SEASTAR_TEST_CASE(test_sort_type_in_update) {
|
||||
auto muts = muts2;
|
||||
muts.insert(muts.end(), muts1.begin(), muts1.end());
|
||||
muts.insert(muts.end(), muts3.begin(), muts3.end());
|
||||
mm.announce(std::move(muts), false).get();
|
||||
mm.announce(std::move(muts)).get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -302,7 +302,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
|
||||
.with_column("v1", bytes_type)
|
||||
.build();
|
||||
|
||||
mm.announce_new_column_family(s1, false).get();
|
||||
mm.announce_new_column_family(s1).get();
|
||||
|
||||
auto&& keyspace = e.db().local().find_keyspace(s1->ks_name()).metadata();
|
||||
|
||||
@@ -315,7 +315,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
|
||||
{
|
||||
auto muts = db::schema_tables::make_update_table_mutations(e.db().local(), keyspace, s1, s2,
|
||||
api::new_timestamp(), false);
|
||||
mm.announce(std::move(muts), true).get();
|
||||
mm.announce(std::move(muts)).get();
|
||||
}
|
||||
|
||||
// Add a new v1 and drop it
|
||||
@@ -332,7 +332,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
|
||||
|
||||
auto muts = db::schema_tables::make_update_table_mutations(e.db().local(), keyspace, s3, s4,
|
||||
api::new_timestamp(), false);
|
||||
mm.announce(std::move(muts), true).get();
|
||||
mm.announce(std::move(muts)).get();
|
||||
}
|
||||
|
||||
auto new_schema = e.db().local().find_schema(s1->id());
|
||||
@@ -460,7 +460,7 @@ SEASTAR_TEST_CASE(test_nested_type_mutation_in_update) {
|
||||
|
||||
auto muts = muts1;
|
||||
muts.insert(muts.end(), muts2.begin(), muts2.end());
|
||||
mm.announce(std::move(muts), false).get();
|
||||
mm.announce(std::move(muts)).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(listener.create_user_type_count, 2);
|
||||
BOOST_REQUIRE_EQUAL(listener.update_user_type_count, 2);
|
||||
|
||||
@@ -237,7 +237,7 @@ public:
|
||||
schema_builder builder(make_lw_shared<schema>(schema_maker(ks_name)));
|
||||
builder.set_uuid(id);
|
||||
auto s = builder.build(schema_builder::compact_storage::no);
|
||||
return service::get_local_migration_manager().announce_new_column_family(s, true);
|
||||
return service::get_local_migration_manager().announce_new_column_family(s);
|
||||
}
|
||||
|
||||
virtual future<> require_keyspace_exists(const sstring& ks_name) override {
|
||||
|
||||
@@ -837,7 +837,7 @@ public:
|
||||
|
||||
auto s = schema_from_thrift(cf_def, cf_def.keyspace);
|
||||
return _query_state.get_client_state().has_keyspace_access(cf_def.keyspace, auth::permission::CREATE).then([this, s = std::move(s)] {
|
||||
return service::get_local_migration_manager().announce_new_column_family(std::move(s), false).then([this] {
|
||||
return service::get_local_migration_manager().announce_new_column_family(std::move(s)).then([this] {
|
||||
return std::string(_db.local().get_version().to_sstring());
|
||||
});
|
||||
});
|
||||
@@ -853,7 +853,7 @@ public:
|
||||
if (!cf.views().empty()) {
|
||||
throw make_exception<InvalidRequestException>("Cannot drop table with Materialized Views %s", column_family);
|
||||
}
|
||||
return service::get_local_migration_manager().announce_column_family_drop(current_keyspace(), column_family, false).then([this] {
|
||||
return service::get_local_migration_manager().announce_column_family_drop(current_keyspace(), column_family).then([this] {
|
||||
return std::string(_db.local().get_version().to_sstring());
|
||||
});
|
||||
});
|
||||
@@ -864,7 +864,7 @@ public:
|
||||
with_cob(std::move(cob), std::move(exn_cob), [&] {
|
||||
auto ksm = keyspace_from_thrift(ks_def);
|
||||
return _query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE).then([this, ksm = std::move(ksm)] {
|
||||
return service::get_local_migration_manager().announce_new_keyspace(std::move(ksm), false).then([this] {
|
||||
return service::get_local_migration_manager().announce_new_keyspace(std::move(ksm)).then([this] {
|
||||
return std::string(_db.local().get_version().to_sstring());
|
||||
});
|
||||
});
|
||||
@@ -879,7 +879,7 @@ public:
|
||||
}
|
||||
|
||||
return _query_state.get_client_state().has_keyspace_access(keyspace, auth::permission::DROP).then([this, keyspace] {
|
||||
return service::get_local_migration_manager().announce_keyspace_drop(keyspace, false).then([this] {
|
||||
return service::get_local_migration_manager().announce_keyspace_drop(keyspace).then([this] {
|
||||
return std::string(_db.local().get_version().to_sstring());
|
||||
});
|
||||
});
|
||||
@@ -899,7 +899,7 @@ public:
|
||||
|
||||
auto ksm = keyspace_from_thrift(ks_def);
|
||||
return _query_state.get_client_state().has_keyspace_access(ks_def.name, auth::permission::ALTER).then([this, ksm = std::move(ksm)] {
|
||||
return service::get_local_migration_manager().announce_keyspace_update(std::move(ksm), false).then([this] {
|
||||
return service::get_local_migration_manager().announce_keyspace_update(std::move(ksm)).then([this] {
|
||||
return std::string(_db.local().get_version().to_sstring());
|
||||
});
|
||||
});
|
||||
@@ -930,7 +930,7 @@ public:
|
||||
fail(unimplemented::cause::MIXED_CF);
|
||||
}
|
||||
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::ALTER).then([this, s = std::move(s)] {
|
||||
return service::get_local_migration_manager().announce_column_family_update(std::move(s), true, {}, false).then([this] {
|
||||
return service::get_local_migration_manager().announce_column_family_update(std::move(s), true, {}).then([this] {
|
||||
return std::string(_db.local().get_version().to_sstring());
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user