diff --git a/main.cc b/main.cc index c19be7d635..2ecbb1feae 100644 --- a/main.cc +++ b/main.cc @@ -1185,6 +1185,9 @@ int main(int ac, char** av) { return ss.local().init_server(qp.local()); }).get(); + auto schema_change_announce = db.local().observable_schema_version().observe([&mm] (utils::UUID schema_version) mutable { + mm.local().passive_announce(std::move(schema_version)); + }); gossiper.local().wait_for_gossip_to_settle().get(); sst_format_selector.sync(); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index ce8a035b73..752804f2a2 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -70,6 +70,7 @@ static future get_schema_definition(table_schema_version v, netw::me migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, gms::gossiper& gossiper) : _notifier(notifier), _feat(feat), _messaging(ms), _gossiper(gossiper) + , _schema_push([this] { return passive_announce(); }) { } @@ -77,6 +78,11 @@ future<> migration_manager::stop() { if (!_as.abort_requested()) { co_await drain(); } + try { + co_await _schema_push.join(); + } catch (...) { + mlogger.error("schema_push failed: {}", std::current_exception()); + } } future<> migration_manager::drain() @@ -980,11 +986,15 @@ future<> migration_manager::announce(std::vector schema) { * * @param version The schema version to announce */ -future<> migration_manager::passive_announce(utils::UUID version) { - return _gossiper.container().invoke_on(0, [version] (auto&& gossiper) { - mlogger.debug("Gossiping my schema version {}", version); - return gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(version)); - }); +void migration_manager::passive_announce(utils::UUID version) { + _schema_version_to_publish = std::move(version); + (void)_schema_push.trigger(); +} + +future<> migration_manager::passive_announce() { + assert(this_shard_id() == 0); + mlogger.debug("Gossiping my schema version {}", _schema_version_to_publish); + return _gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(_schema_version_to_publish)); } #if 0 diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 3d48016e03..f99b6091db 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -88,6 +88,8 @@ private: netw::messaging_service& _messaging; gms::gossiper& _gossiper; seastar::abort_source _as; + serialized_action _schema_push; + utils::UUID _schema_version_to_publish; public: migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, gms::gossiper& gossiper); @@ -169,7 +171,7 @@ public: // Returns a future on the local application of the schema future<> announce(std::vector schema); - future<> passive_announce(utils::UUID version); + void passive_announce(utils::UUID version); future<> drain(); future<> stop(); @@ -190,6 +192,8 @@ private: future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector& schema); + future<> passive_announce(); + void schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state); future<> maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint); diff --git a/service/storage_service.cc b/service/storage_service.cc index 8cf50b1800..5beb9e574b 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -138,7 +138,7 @@ storage_service::storage_service(abort_source& abort_source, , _batchlog_manager(bm) , _sys_dist_ks(sys_dist_ks) , _snitch_reconfigure([this] { return snitch_reconfigured(); }) - , _schema_version_publisher([this] { return publish_schema_version(); }) { +{ register_metrics(); _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_read_error.connect([this] { do_isolate_on_error(disk_error::regular); })))); @@ -221,16 +221,6 @@ bool storage_service::should_bootstrap() { return !db::system_keyspace::bootstrap_complete() && !is_first_node(); } -void storage_service::install_schema_version_change_listener() { - _listeners.emplace_back(make_lw_shared(_db.local().observable_schema_version().observe([this] (utils::UUID schema_version) { - (void)_schema_version_publisher.trigger(); - }))); -} - -future<> storage_service::publish_schema_version() { - return _migration_manager.local().passive_announce(_db.local().get_version()); -} - future<> storage_service::snitch_reconfigured() { return update_topology(utils::fb_utilities::get_broadcast_address()); } @@ -359,8 +349,6 @@ void storage_service::prepare_to_join( auto generation_number = db::system_keyspace::increment_and_get_generation().get0(); auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip); _gossiper.start_gossiping(generation_number, app_states, advertise).get(); - - install_schema_version_change_listener(); } void storage_service::maybe_start_sys_dist_ks() { @@ -1473,11 +1461,6 @@ future<> storage_service::stop() { } catch (...) { slogger.error("failed to stop Raft Group 0: {}", std::current_exception()); } - try { - co_await _schema_version_publisher.join(); - } catch (...) { - slogger.error("schema_version_publisher failed: {}", std::current_exception()); - } co_await std::move(_node_ops_abort_thread); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 23bab00c2e..ac003e4679 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -249,8 +249,6 @@ private: void register_metrics(); future<> snitch_reconfigured(); future<> update_topology(inet_address endpoint); - future<> publish_schema_version(); - void install_schema_version_change_listener(); future get_mutable_token_metadata_ptr() noexcept { return get_token_metadata_ptr()->clone_async().then([] (token_metadata tm) { @@ -572,7 +570,6 @@ private: future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept; sharded& _sys_dist_ks; locator::snitch_signal_slot_t _snitch_reconfigure; - serialized_action _schema_version_publisher; std::unordered_set _replacing_nodes_pending_ranges_updater; private: /**