migration_manager: Eliminate storage service from passive announcing
Currently the storage service acts as glue between the database schema value and the migration manager's "passive_announce" call. This interposing is not required: the migration manager can do all the management itself, and the linkage can be done in main.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
a751a1117a
commit
e4f35e2139
3
main.cc
3
main.cc
@@ -1185,6 +1185,9 @@ int main(int ac, char** av) {
|
||||
return ss.local().init_server(qp.local());
|
||||
}).get();
|
||||
|
||||
auto schema_change_announce = db.local().observable_schema_version().observe([&mm] (utils::UUID schema_version) mutable {
|
||||
mm.local().passive_announce(std::move(schema_version));
|
||||
});
|
||||
gossiper.local().wait_for_gossip_to_settle().get();
|
||||
sst_format_selector.sync();
|
||||
|
||||
|
||||
@@ -70,6 +70,7 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, netw::me
|
||||
|
||||
// Construct the migration manager, wiring in the services it coordinates
// with (schema-change notifier, feature service, messaging, gossiper).
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, gms::gossiper& gossiper) :
    _notifier(notifier), _feat(feat), _messaging(ms), _gossiper(gossiper)
    // _schema_push serializes schema-version publications: concurrent
    // trigger() calls collapse into a single pending run of
    // passive_announce(), which gossips _schema_version_to_publish.
    , _schema_push([this] { return passive_announce(); })
{
}
|
||||
|
||||
// Shut down the migration manager.  Drains in-flight work unless an abort
// was already requested (presumably drain() then already ran via the abort
// path -- TODO confirm), then waits for any outstanding schema push.  A
// failed push is logged rather than propagated so shutdown always completes.
future<> migration_manager::stop() {
    if (!_as.abort_requested()) {
        co_await drain();
    }
    try {
        // Wait for the last passive_announce() scheduled through _schema_push.
        co_await _schema_push.join();
    } catch (...) {
        mlogger.error("schema_push failed: {}", std::current_exception());
    }
}
|
||||
|
||||
future<> migration_manager::drain()
|
||||
@@ -980,11 +986,15 @@ future<> migration_manager::announce(std::vector<mutation> schema) {
|
||||
*
|
||||
* @param version The schema version to announce
|
||||
*/
|
||||
future<> migration_manager::passive_announce(utils::UUID version) {
|
||||
return _gossiper.container().invoke_on(0, [version] (auto&& gossiper) {
|
||||
mlogger.debug("Gossiping my schema version {}", version);
|
||||
return gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(version));
|
||||
});
|
||||
// Record the schema version to publish and schedule its announcement.
// The future returned by trigger() is deliberately discarded: _schema_push
// coalesces concurrent triggers, and its completion is awaited in stop()
// via _schema_push.join().
void migration_manager::passive_announce(utils::UUID version) {
    _schema_version_to_publish = std::move(version);
    (void)_schema_push.trigger();
}
|
||||
|
||||
// Publish the recorded schema version to the cluster through gossip.
// Must run on shard 0 (asserted); callers reach it via _schema_push.
future<> migration_manager::passive_announce() {
    assert(this_shard_id() == 0);
    mlogger.debug("Gossiping my schema version {}", _schema_version_to_publish);
    return _gossiper.add_local_application_state(gms::application_state::SCHEMA, gms::versioned_value::schema(_schema_version_to_publish));
}
|
||||
|
||||
#if 0
|
||||
|
||||
@@ -88,6 +88,8 @@ private:
|
||||
netw::messaging_service& _messaging;
|
||||
gms::gossiper& _gossiper;
|
||||
seastar::abort_source _as;
|
||||
serialized_action _schema_push;
|
||||
utils::UUID _schema_version_to_publish;
|
||||
public:
|
||||
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, gms::gossiper& gossiper);
|
||||
|
||||
@@ -169,7 +171,7 @@ public:
|
||||
// Returns a future on the local application of the schema
|
||||
future<> announce(std::vector<mutation> schema);
|
||||
|
||||
future<> passive_announce(utils::UUID version);
|
||||
void passive_announce(utils::UUID version);
|
||||
|
||||
future<> drain();
|
||||
future<> stop();
|
||||
@@ -190,6 +192,8 @@ private:
|
||||
|
||||
future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema);
|
||||
|
||||
future<> passive_announce();
|
||||
|
||||
void schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);
|
||||
|
||||
future<> maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint);
|
||||
|
||||
@@ -138,7 +138,7 @@ storage_service::storage_service(abort_source& abort_source,
|
||||
, _batchlog_manager(bm)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _snitch_reconfigure([this] { return snitch_reconfigured(); })
|
||||
, _schema_version_publisher([this] { return publish_schema_version(); }) {
|
||||
{
|
||||
register_metrics();
|
||||
|
||||
_listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_read_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
|
||||
@@ -221,16 +221,6 @@ bool storage_service::should_bootstrap() {
|
||||
return !db::system_keyspace::bootstrap_complete() && !is_first_node();
|
||||
}
|
||||
|
||||
// Subscribe to local schema-version changes and schedule publication of
// the new version.  The trigger() future is discarded; completion is
// awaited in stop() via _schema_version_publisher.join().  The observed
// schema_version argument is unused because publish_schema_version()
// re-reads the current version from _db when it runs.
void storage_service::install_schema_version_change_listener() {
    _listeners.emplace_back(make_lw_shared(_db.local().observable_schema_version().observe([this] (utils::UUID schema_version) {
        (void)_schema_version_publisher.trigger();
    })));
}
|
||||
|
||||
// Hand the node's current schema version to the migration manager, which
// announces it passively (via gossip).
future<> storage_service::publish_schema_version() {
    auto version = _db.local().get_version();
    return _migration_manager.local().passive_announce(std::move(version));
}
|
||||
|
||||
// Snitch reconfiguration callback: re-publish this node's topology entry
// for its broadcast address.
future<> storage_service::snitch_reconfigured() {
    const auto self_addr = utils::fb_utilities::get_broadcast_address();
    return update_topology(self_addr);
}
|
||||
@@ -359,8 +349,6 @@ void storage_service::prepare_to_join(
|
||||
auto generation_number = db::system_keyspace::increment_and_get_generation().get0();
|
||||
auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
|
||||
_gossiper.start_gossiping(generation_number, app_states, advertise).get();
|
||||
|
||||
install_schema_version_change_listener();
|
||||
}
|
||||
|
||||
void storage_service::maybe_start_sys_dist_ks() {
|
||||
@@ -1473,11 +1461,6 @@ future<> storage_service::stop() {
|
||||
} catch (...) {
|
||||
slogger.error("failed to stop Raft Group 0: {}", std::current_exception());
|
||||
}
|
||||
try {
|
||||
co_await _schema_version_publisher.join();
|
||||
} catch (...) {
|
||||
slogger.error("schema_version_publisher failed: {}", std::current_exception());
|
||||
}
|
||||
co_await std::move(_node_ops_abort_thread);
|
||||
}
|
||||
|
||||
|
||||
@@ -249,8 +249,6 @@ private:
|
||||
void register_metrics();
|
||||
future<> snitch_reconfigured();
|
||||
future<> update_topology(inet_address endpoint);
|
||||
future<> publish_schema_version();
|
||||
void install_schema_version_change_listener();
|
||||
|
||||
future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
|
||||
return get_token_metadata_ptr()->clone_async().then([] (token_metadata tm) {
|
||||
@@ -572,7 +570,6 @@ private:
|
||||
future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept;
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
locator::snitch_signal_slot_t _snitch_reconfigure;
|
||||
serialized_action _schema_version_publisher;
|
||||
std::unordered_set<gms::inet_address> _replacing_nodes_pending_ranges_updater;
|
||||
private:
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user