diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 6138223d2c..8aa2f529f1 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -119,7 +119,8 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm } } // namespace -class topology_coordinator : public endpoint_lifecycle_subscriber { +class topology_coordinator : public endpoint_lifecycle_subscriber + , public migration_listener::empty_listener { sharded& _sys_dist_ks; gms::gossiper& _gossiper; netw::messaging_service& _messaging; @@ -2296,6 +2297,30 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }); } + virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override { + // New tablets were allocated, we need per-tablet stats for them for tablet balancer to make progress. + trigger_load_stats_refresh(); + } + + virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override { + trigger_load_stats_refresh(); + } + + virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) override { + // Tablet hints may have changed. Wake up so that load balancer re-evaluates tablet distribution. + _topo_sm.event.broadcast(); + } + + virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override { + // Tablet distribution has changed. Wake up the load balancer. + _topo_sm.event.broadcast(); + } + + virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override { + // Tablet distribution has changed. Wake up the load balancer. + _topo_sm.event.broadcast(); + } + future<> cancel_all_requests(group0_guard guard, std::unordered_set dead_nodes) { utils::chunked_vector muts; std::vector reject_join; @@ -3630,15 +3655,17 @@ public: , _voter_handler(group0, topo_sm._topology, gossiper, feature_service) , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker) , _async_gate("topology_coordinator") - {} + { + _db.get_notifier().register_listener(this); + } // Returns true if the upgrade was done, returns false if upgrade was interrupted. future maybe_run_upgrade(); future<> run(); future<> stop(); - virtual void on_up(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); }; - virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) { _topo_sm.event.broadcast(); }; + virtual void on_up(const gms::inet_address& endpoint, locator::host_id hid) override { _topo_sm.event.broadcast(); }; + virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) override { _topo_sm.event.broadcast(); }; private: tablet_ops_metrics _tablet_ops_metrics; @@ -4225,6 +4252,8 @@ future<> topology_coordinator::run() { } future<> topology_coordinator::stop() { + co_await _db.get_notifier().unregister_listener(this); + // if topology_coordinator::run() is aborted either because we are not a // leader anymore, or we are shutting down as a leader, we have to handle // futures in _tablets in case any of them failed, before these failures