diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 489299ac85..4dd5591441 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -130,7 +130,7 @@ gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm, * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding * to the endpoint from the map that was initially constructed. */ -void gossiper::do_sort(utils::chunked_vector& g_digest_list) { +void gossiper::do_sort(utils::chunked_vector& g_digest_list) const { /* Construct a map of endpoint to GossipDigest. */ std::map ep_to_digest_map; for (auto g_digest : g_digest_list) { @@ -1224,7 +1224,7 @@ void gossiper::quarantine_endpoint(inet_address endpoint, clk::time_point quaran _just_removed_endpoints[endpoint] = quarantine_start; } -void gossiper::make_random_gossip_digest(utils::chunked_vector& g_digests) { +void gossiper::make_random_gossip_digest(utils::chunked_vector& g_digests) const { generation_type generation; version_type max_version; @@ -1243,17 +1243,6 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector& g } g_digests.push_back(gossip_digest(endpoint, generation, max_version)); } -#if 0 - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - for (GossipDigest g_digest : g_digests) - { - sb.append(g_digest); - sb.append(" "); - } - logger.trace("Gossip Digests are : {}", sb); - } -#endif } future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid) { @@ -1357,14 +1346,16 @@ future<> gossiper::assassinate_endpoint(sstring address) { }); } -future gossiper::get_current_generation_number(inet_address endpoint) { - return container().invoke_on(0, [endpoint] (auto&& gossiper) { +future gossiper::get_current_generation_number(inet_address endpoint) const { + // FIXME: const container() has no const invoke_on variant + return const_cast(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) { return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_generation(); }); } -future gossiper::get_current_heart_beat_version(inet_address endpoint) { - return container().invoke_on(0, [endpoint] (auto&& gossiper) { +future gossiper::get_current_heart_beat_version(inet_address endpoint) const { + // FIXME: const container() has no const invoke_on variant + return const_cast(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) { return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_heart_beat_version(); }); } @@ -1397,7 +1388,7 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) { return make_ready_future<>(); } -bool gossiper::is_gossip_only_member(inet_address endpoint) { +bool gossiper::is_gossip_only_member(inet_address endpoint) const { auto es = get_endpoint_state_ptr(endpoint); if (!es) { return false; @@ -1434,7 +1425,7 @@ const endpoint_state& gossiper::get_endpoint_state(inet_address ep) const { if (it == _endpoint_state_map.end()) { throw std::out_of_range(format("ep={}", ep)); } - return const_cast(*it->second); + return *it->second; } endpoint_state& gossiper::get_or_create_endpoint_state(inet_address ep) { @@ -1522,7 +1513,7 @@ std::set gossiper::get_nodes_with_host_id(locator::host_id ho return nodes; } -std::optional gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) { +std::optional gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) const { std::optional reqd_endpoint_state; auto es = get_endpoint_state_ptr(for_endpoint); if (es) { @@ -1556,7 +1547,7 @@ std::optional gossiper::get_state_for_version_bigger_than(inet_a return reqd_endpoint_state; } -generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) { +generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) const { auto ep1 = get_endpoint_state_ptr(addr1); auto ep2 = get_endpoint_state_ptr(addr2); if (!ep1 || !ep2) { @@ -1860,13 +1851,13 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state local_stat maybe_rethrow_exception(std::move(ep)); } -future<> gossiper::do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) { +future<> gossiper::do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) const { co_await _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { return subscriber->before_change(addr, ep_state, ap_state, new_value); }); } -future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id pid) { +future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id pid) const { co_await _subscribers.for_each([this, addr, state, value, pid] (shared_ptr subscriber) { // Once _abort_source is aborted, don't attempt to process any further notifications // because that would violate monotonicity due to partially failed notification. @@ -1875,14 +1866,14 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati }); } -future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) { +future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) const { co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr subscriber) { return subscriber->on_dead(addr, state, pid); }); } void gossiper::request_all(gossip_digest& g_digest, - utils::chunked_vector& delta_gossip_digest_list, generation_type remote_generation) { + utils::chunked_vector& delta_gossip_digest_list, generation_type remote_generation) const { /* We are here since we have no data for this endpoint locally so request everthing. */ delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation); logger.trace("request_all for {}", g_digest.get_endpoint()); @@ -1890,7 +1881,7 @@ void gossiper::request_all(gossip_digest& g_digest, void gossiper::send_all(gossip_digest& g_digest, std::map& delta_ep_state_map, - version_type max_remote_version) { + version_type max_remote_version) const { auto ep = g_digest.get_endpoint(); logger.trace("send_all(): ep={}, version > {}", ep, max_remote_version); auto local_ep_state_ptr = get_state_for_version_bigger_than(ep, max_remote_version); @@ -1901,7 +1892,7 @@ void gossiper::send_all(gossip_digest& g_digest, void gossiper::examine_gossiper(utils::chunked_vector& g_digest_list, utils::chunked_vector& delta_gossip_digest_list, - std::map& delta_ep_state_map) { + std::map& delta_ep_state_map) const { if (g_digest_list.size() == 0) { /* we've been sent a *completely* empty syn, which should normally * never happen since an endpoint will at least send a syn with @@ -2008,7 +1999,7 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map } future -gossiper::get_generation_for_nodes(std::unordered_set nodes) { +gossiper::get_generation_for_nodes(std::unordered_set nodes) const { generation_for_nodes ret; for (const auto& node : nodes) { auto es = get_endpoint_state_ptr(node); @@ -2348,14 +2339,6 @@ clk::time_point gossiper::compute_expire_time() { return now() + A_VERY_LONG_TIME; } -void gossiper::dump_endpoint_state_map() { - logger.info("=== endpoint_state_map dump starts == "); - for (auto& x : _endpoint_state_map) { - logger.info("endpoint={}, endpoint_state={}", x.first, x.second); - } - logger.info("=== endpoint_state_map dump ends ==="); -} - bool gossiper::is_alive(inet_address ep) const { if (ep == get_broadcast_address()) { return true; @@ -2478,7 +2461,7 @@ std::string_view gossiper::get_gossip_status(const inet_address& endpoint) const return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS)); } -future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std::optional force_after) { +future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std::optional force_after) const { static constexpr std::chrono::milliseconds GOSSIP_SETTLE_POLL_INTERVAL_MS{1000}; static constexpr int32_t GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3; @@ -2520,21 +2503,21 @@ future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std: } } -future<> gossiper::wait_for_gossip_to_settle() { +future<> gossiper::wait_for_gossip_to_settle() const { auto force_after = _gcfg.skip_wait_for_gossip_to_settle; if (force_after != 0) { co_await wait_for_gossip(GOSSIP_SETTLE_MIN_WAIT_MS, force_after); } } -future<> gossiper::wait_for_range_setup() { +future<> gossiper::wait_for_range_setup() const { logger.info("Waiting for pending range setup..."); auto ring_delay = std::chrono::milliseconds(_gcfg.ring_delay_ms); auto force_after = _gcfg.skip_wait_for_gossip_to_settle; return wait_for_gossip(ring_delay, force_after); } -bool gossiper::is_safe_for_bootstrap(inet_address endpoint) { +bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const { // We allow to bootstrap a new node in only two cases: // 1) The node is a completely new node and no state in gossip at all // 2) The node has state in gossip and it is already removed from the @@ -2555,7 +2538,7 @@ bool gossiper::is_safe_for_bootstrap(inet_address endpoint) { return allowed; } -bool gossiper::is_safe_for_restart(inet_address endpoint, locator::host_id host_id) { +bool gossiper::is_safe_for_restart(inet_address endpoint, locator::host_id host_id) const { // Reject to restart a node in case: // *) if the node has been removed from the cluster by nodetool decommission or // nodetool removenode diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 1a474bb84c..7f7fb43d94 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -114,7 +114,7 @@ private: future handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request); static constexpr uint32_t _default_cpuid = 0; msg_addr get_msg_addr(inet_address to) const noexcept; - void do_sort(utils::chunked_vector& g_digest_list); + void do_sort(utils::chunked_vector& g_digest_list) const; timer _scheduled_gossip_task; bool _enabled = false; semaphore _callback_running{1}; @@ -132,7 +132,7 @@ private: public: // Get current generation number for the given nodes future - get_generation_for_nodes(std::unordered_set nodes); + get_generation_for_nodes(std::unordered_set nodes) const; // Only respond echo message listed in nodes with the generation number future<> advertise_to_nodes(generation_for_nodes advertise_to_nodes = {}); const sstring& get_cluster_name() const noexcept; @@ -213,7 +213,7 @@ public: std::chrono::milliseconds quarantine_delay() const noexcept; private: - std::default_random_engine _random_engine{std::random_device{}()}; + mutable std::default_random_engine _random_engine{std::random_device{}()}; /** * subscribers for interest in EndpointState change @@ -247,9 +247,6 @@ private: bool _in_shadow_round = false; - uint64_t _shadow_live_endpoints_version = 0; - uint64_t _shadow_unreachable_endpoints_version = 0; - // Must be called on shard 0. future> lock_endpoint_update_semaphore(); @@ -360,7 +357,7 @@ private: * * @param g_digests list of Gossip Digests. */ - void make_random_gossip_digest(utils::chunked_vector& g_digests); + void make_random_gossip_digest(utils::chunked_vector& g_digests) const; public: /** @@ -385,12 +382,12 @@ public: future<> assassinate_endpoint(sstring address); public: - future get_current_generation_number(inet_address endpoint); - future get_current_heart_beat_version(inet_address endpoint); + future get_current_generation_number(inet_address endpoint) const; + future get_current_heart_beat_version(inet_address endpoint) const; - bool is_gossip_only_member(inet_address endpoint); - bool is_safe_for_bootstrap(inet_address endpoint); - bool is_safe_for_restart(inet_address endpoint, locator::host_id host_id); + bool is_gossip_only_member(inet_address endpoint) const; + bool is_safe_for_bootstrap(inet_address endpoint) const; + bool is_safe_for_restart(inet_address endpoint, locator::host_id host_id) const; private: /** * Returns true if the chosen target was also a seed. False otherwise @@ -453,12 +450,12 @@ public: std::set get_nodes_with_host_id(locator::host_id host_id) const; - std::optional get_state_for_version_bigger_than(inet_address for_endpoint, version_type version); + std::optional get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) const; /** * determine which endpoint started up earlier */ - generation_type::value_type compare_endpoint_startup(inet_address addr1, inet_address addr2); + generation_type::value_type compare_endpoint_startup(inet_address addr1, inet_address addr2) const; /** * Return the rpc address associated with an endpoint as a string. @@ -527,22 +524,22 @@ private: // notify that a local application state is going to change (doesn't get triggered for remote changes) // Must be called under lock_endpoint. - future<> do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value); + future<> do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) const; // notify that an application state has changed // Must be called under lock_endpoint. - future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id); + future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id) const; // notify that a node is DOWN (dead) // Must be called under lock_endpoint. - future<> do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id); + future<> do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id) const; /* Request all the state for the endpoint in the g_digest */ - void request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, generation_type remote_generation); + void request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, generation_type remote_generation) const; /* Send all the data with version greater than max_remote_version */ - void send_all(gossip_digest& g_digest, std::map& delta_ep_state_map, version_type max_remote_version); + void send_all(gossip_digest& g_digest, std::map& delta_ep_state_map, version_type max_remote_version) const; public: /* @@ -551,7 +548,7 @@ public: */ void examine_gossiper(utils::chunked_vector& g_digest_list, utils::chunked_vector& delta_gossip_digest_list, - std::map& delta_ep_state_map); + std::map& delta_ep_state_map) const; public: /** @@ -641,8 +638,6 @@ public: void add_expire_time_for_endpoint(inet_address endpoint, clk::time_point expire_time); static clk::time_point compute_expire_time(); -public: - void dump_endpoint_state_map(); public: bool is_seed(const inet_address& endpoint) const; bool is_shutdown(const inet_address& endpoint) const; @@ -659,10 +654,10 @@ public: std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept; std::string_view get_gossip_status(const inet_address& endpoint) const noexcept; public: - future<> wait_for_gossip_to_settle(); - future<> wait_for_range_setup(); + future<> wait_for_gossip_to_settle() const; + future<> wait_for_range_setup() const; private: - future<> wait_for_gossip(std::chrono::milliseconds, std::optional = {}); + future<> wait_for_gossip(std::chrono::milliseconds, std::optional = {}) const; uint64_t _nr_run = 0; uint64_t _msg_processing = 0; diff --git a/utils/atomic_vector.hh b/utils/atomic_vector.hh index 5ee801944f..4e8fe6b9f1 100644 --- a/utils/atomic_vector.hh +++ b/utils/atomic_vector.hh @@ -20,7 +20,7 @@ template class atomic_vector { std::vector _vec; - seastar::rwlock _vec_lock; + mutable seastar::rwlock _vec_lock; public: void add(const T& value) { @@ -38,7 +38,7 @@ public: // We would take callbacks that take a T&, but we had bugs in the // past with some of those callbacks holding that reference past a // preemption. - void thread_for_each(seastar::noncopyable_function func) { + void thread_for_each(seastar::noncopyable_function func) const { _vec_lock.for_read().lock().get(); auto unlock = seastar::defer([this] { _vec_lock.for_read().unlock(); @@ -56,7 +56,7 @@ public: // We would take callbacks that take a T&, but we had bugs in the // past with some of those callbacks holding that reference past a // preemption. - seastar::future<> for_each(seastar::noncopyable_function(T)> func) { + seastar::future<> for_each(seastar::noncopyable_function(T)> func) const { auto holder = co_await _vec_lock.hold_read_lock(); // We grab a lock in remove(), but not in add(), so we // iterate using indexes to guard against the vector being