migration_manager: Eliminate storage service from passive announcing

Currently storage service acts as a glue between database schema value
and the migration manager "passive_announce" call. This interposing is
not required, migration manager can do all the management itself, and
the linkage can be done in main.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-11-18 14:11:13 +03:00
committed by Avi Kivity
parent a751a1117a
commit e4f35e2139
5 changed files with 24 additions and 27 deletions

View File

@@ -1185,6 +1185,9 @@ int main(int ac, char** av) {
return ss.local().init_server(qp.local());
}).get();
auto schema_change_announce = db.local().observable_schema_version().observe([&mm] (utils::UUID schema_version) mutable {
mm.local().passive_announce(std::move(schema_version));
});
gossiper.local().wait_for_gossip_to_settle().get();
sst_format_selector.sync();

View File

@@ -70,6 +70,7 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, netw::me
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, gms::gossiper& gossiper) :
_notifier(notifier), _feat(feat), _messaging(ms), _gossiper(gossiper)
, _schema_push([this] { return passive_announce(); })
{
}
@@ -77,6 +78,11 @@ future<> migration_manager::stop() {
if (!_as.abort_requested()) {
co_await drain();
}
try {
co_await _schema_push.join();
} catch (...) {
mlogger.error("schema_push failed: {}", std::current_exception());
}
}
future<> migration_manager::drain()
@@ -980,11 +986,15 @@ future<> migration_manager::announce(std::vector<mutation> schema) {
*
* @param version The schema version to announce
*/
future<> migration_manager::passive_announce(utils::UUID version) {
return _gossiper.container().invoke_on(0, [version] (auto&& gossiper) {
mlogger.debug("Gossiping my schema version {}", version);
return gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(version));
});
void migration_manager::passive_announce(utils::UUID version) {
_schema_version_to_publish = std::move(version);
(void)_schema_push.trigger();
}
future<> migration_manager::passive_announce() {
assert(this_shard_id() == 0);
mlogger.debug("Gossiping my schema version {}", _schema_version_to_publish);
return _gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(_schema_version_to_publish));
}
#if 0

View File

@@ -88,6 +88,8 @@ private:
netw::messaging_service& _messaging;
gms::gossiper& _gossiper;
seastar::abort_source _as;
serialized_action _schema_push;
utils::UUID _schema_version_to_publish;
public:
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, gms::gossiper& gossiper);
@@ -169,7 +171,7 @@ public:
// Returns a future on the local application of the schema
future<> announce(std::vector<mutation> schema);
future<> passive_announce(utils::UUID version);
void passive_announce(utils::UUID version);
future<> drain();
future<> stop();
@@ -190,6 +192,8 @@ private:
future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema);
future<> passive_announce();
void schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);
future<> maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint);

View File

@@ -138,7 +138,7 @@ storage_service::storage_service(abort_source& abort_source,
, _batchlog_manager(bm)
, _sys_dist_ks(sys_dist_ks)
, _snitch_reconfigure([this] { return snitch_reconfigured(); })
, _schema_version_publisher([this] { return publish_schema_version(); }) {
{
register_metrics();
_listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_read_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
@@ -221,16 +221,6 @@ bool storage_service::should_bootstrap() {
return !db::system_keyspace::bootstrap_complete() && !is_first_node();
}
void storage_service::install_schema_version_change_listener() {
_listeners.emplace_back(make_lw_shared(_db.local().observable_schema_version().observe([this] (utils::UUID schema_version) {
(void)_schema_version_publisher.trigger();
})));
}
future<> storage_service::publish_schema_version() {
return _migration_manager.local().passive_announce(_db.local().get_version());
}
future<> storage_service::snitch_reconfigured() {
return update_topology(utils::fb_utilities::get_broadcast_address());
}
@@ -359,8 +349,6 @@ void storage_service::prepare_to_join(
auto generation_number = db::system_keyspace::increment_and_get_generation().get0();
auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
_gossiper.start_gossiping(generation_number, app_states, advertise).get();
install_schema_version_change_listener();
}
void storage_service::maybe_start_sys_dist_ks() {
@@ -1473,11 +1461,6 @@ future<> storage_service::stop() {
} catch (...) {
slogger.error("failed to stop Raft Group 0: {}", std::current_exception());
}
try {
co_await _schema_version_publisher.join();
} catch (...) {
slogger.error("schema_version_publisher failed: {}", std::current_exception());
}
co_await std::move(_node_ops_abort_thread);
}

View File

@@ -249,8 +249,6 @@ private:
void register_metrics();
future<> snitch_reconfigured();
future<> update_topology(inet_address endpoint);
future<> publish_schema_version();
void install_schema_version_change_listener();
future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
return get_token_metadata_ptr()->clone_async().then([] (token_metadata tm) {
@@ -572,7 +570,6 @@ private:
future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
locator::snitch_signal_slot_t _snitch_reconfigure;
serialized_action _schema_version_publisher;
std::unordered_set<gms::inet_address> _replacing_nodes_pending_ranges_updater;
private:
/**