Merge 'Gossiper: mark const methods and remove dead code' from Benny Halevy

This series cleans up gossiper.
Methods that do not change the gossiper object are marked as const.
Dead code is removed.

Closes #15272

* github.com:scylladb/scylladb:
  gossiper: get_current* methods: mark as const
  gossiper: get_generation_for_nodes: mark as const
  gossiper: examine_gossiper: mark as const
  gossiper: request_all, send_all: mark as const
  gossiper: do_on_*notifications: mark as const
  utils: atomic_vector: mark for_each functions as const
  gossiper: compare_endpoint_startup: mark as const
  gossiper: get_state_for_version_bigger_than: mark as const
  gossiper: make_random_gossip_digest: delete dead legacy code
  gossiper: make_random_gossip_digest: mark as const
  gossiper: do_sort: mark as const
  gossiper: is* methods: mark as const
  gossiper: wait_for_gossip and friends: mark as const
  gossiper: drop unused dump_endpoint_state_map
  gossiper: remove unused shadow version members
This commit is contained in:
Pavel Emelyanov
2023-09-05 13:47:29 +03:00
3 changed files with 47 additions and 69 deletions

View File

@@ -130,7 +130,7 @@ gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm,
* Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
* to the endpoint from the map that was initially constructed.
*/
void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) {
void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) const {
/* Construct a map of endpoint to GossipDigest. */
std::map<inet_address, gossip_digest> ep_to_digest_map;
for (auto g_digest : g_digest_list) {
@@ -1224,7 +1224,7 @@ void gossiper::quarantine_endpoint(inet_address endpoint, clk::time_point quaran
_just_removed_endpoints[endpoint] = quarantine_start;
}
void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g_digests) {
void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g_digests) const {
generation_type generation;
version_type max_version;
@@ -1243,17 +1243,6 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
}
g_digests.push_back(gossip_digest(endpoint, generation, max_version));
}
#if 0
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
for (GossipDigest g_digest : g_digests)
{
sb.append(g_digest);
sb.append(" ");
}
logger.trace("Gossip Digests are : {}", sb);
}
#endif
}
future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid) {
@@ -1357,14 +1346,16 @@ future<> gossiper::assassinate_endpoint(sstring address) {
});
}
future<generation_type> gossiper::get_current_generation_number(inet_address endpoint) {
return container().invoke_on(0, [endpoint] (auto&& gossiper) {
future<generation_type> gossiper::get_current_generation_number(inet_address endpoint) const {
// FIXME: const container() has no const invoke_on variant
return const_cast<gossiper*>(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_generation();
});
}
future<version_type> gossiper::get_current_heart_beat_version(inet_address endpoint) {
return container().invoke_on(0, [endpoint] (auto&& gossiper) {
future<version_type> gossiper::get_current_heart_beat_version(inet_address endpoint) const {
// FIXME: const container() has no const invoke_on variant
return const_cast<gossiper*>(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_heart_beat_version();
});
}
@@ -1397,7 +1388,7 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
return make_ready_future<>();
}
bool gossiper::is_gossip_only_member(inet_address endpoint) {
bool gossiper::is_gossip_only_member(inet_address endpoint) const {
auto es = get_endpoint_state_ptr(endpoint);
if (!es) {
return false;
@@ -1434,7 +1425,7 @@ const endpoint_state& gossiper::get_endpoint_state(inet_address ep) const {
if (it == _endpoint_state_map.end()) {
throw std::out_of_range(format("ep={}", ep));
}
return const_cast<endpoint_state&>(*it->second);
return *it->second;
}
endpoint_state& gossiper::get_or_create_endpoint_state(inet_address ep) {
@@ -1522,7 +1513,7 @@ std::set<gms::inet_address> gossiper::get_nodes_with_host_id(locator::host_id ho
return nodes;
}
std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) {
std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) const {
std::optional<endpoint_state> reqd_endpoint_state;
auto es = get_endpoint_state_ptr(for_endpoint);
if (es) {
@@ -1556,7 +1547,7 @@ std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_a
return reqd_endpoint_state;
}
generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) {
generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) const {
auto ep1 = get_endpoint_state_ptr(addr1);
auto ep2 = get_endpoint_state_ptr(addr2);
if (!ep1 || !ep2) {
@@ -1860,13 +1851,13 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state local_stat
maybe_rethrow_exception(std::move(ep));
}
future<> gossiper::do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) {
future<> gossiper::do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) const {
co_await _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->before_change(addr, ep_state, ap_state, new_value);
});
}
future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id pid) {
future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id pid) const {
co_await _subscribers.for_each([this, addr, state, value, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
// Once _abort_source is aborted, don't attempt to process any further notifications
// because that would violate monotonicity due to partially failed notification.
@@ -1875,14 +1866,14 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati
});
}
future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) {
future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) const {
co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_dead(addr, state, pid);
});
}
void gossiper::request_all(gossip_digest& g_digest,
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation) {
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation) const {
/* We are here since we have no data for this endpoint locally so request everthing. */
delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation);
logger.trace("request_all for {}", g_digest.get_endpoint());
@@ -1890,7 +1881,7 @@ void gossiper::request_all(gossip_digest& g_digest,
void gossiper::send_all(gossip_digest& g_digest,
std::map<inet_address, endpoint_state>& delta_ep_state_map,
version_type max_remote_version) {
version_type max_remote_version) const {
auto ep = g_digest.get_endpoint();
logger.trace("send_all(): ep={}, version > {}", ep, max_remote_version);
auto local_ep_state_ptr = get_state_for_version_bigger_than(ep, max_remote_version);
@@ -1901,7 +1892,7 @@ void gossiper::send_all(gossip_digest& g_digest,
void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_list,
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list,
std::map<inet_address, endpoint_state>& delta_ep_state_map) {
std::map<inet_address, endpoint_state>& delta_ep_state_map) const {
if (g_digest_list.size() == 0) {
/* we've been sent a *completely* empty syn, which should normally
* never happen since an endpoint will at least send a syn with
@@ -2008,7 +1999,7 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map
}
future<gossiper::generation_for_nodes>
gossiper::get_generation_for_nodes(std::unordered_set<gms::inet_address> nodes) {
gossiper::get_generation_for_nodes(std::unordered_set<gms::inet_address> nodes) const {
generation_for_nodes ret;
for (const auto& node : nodes) {
auto es = get_endpoint_state_ptr(node);
@@ -2348,14 +2339,6 @@ clk::time_point gossiper::compute_expire_time() {
return now() + A_VERY_LONG_TIME;
}
void gossiper::dump_endpoint_state_map() {
logger.info("=== endpoint_state_map dump starts == ");
for (auto& x : _endpoint_state_map) {
logger.info("endpoint={}, endpoint_state={}", x.first, x.second);
}
logger.info("=== endpoint_state_map dump ends ===");
}
bool gossiper::is_alive(inet_address ep) const {
if (ep == get_broadcast_address()) {
return true;
@@ -2478,7 +2461,7 @@ std::string_view gossiper::get_gossip_status(const inet_address& endpoint) const
return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS));
}
future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std::optional<int32_t> force_after) {
future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std::optional<int32_t> force_after) const {
static constexpr std::chrono::milliseconds GOSSIP_SETTLE_POLL_INTERVAL_MS{1000};
static constexpr int32_t GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
@@ -2520,21 +2503,21 @@ future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std:
}
}
future<> gossiper::wait_for_gossip_to_settle() {
future<> gossiper::wait_for_gossip_to_settle() const {
auto force_after = _gcfg.skip_wait_for_gossip_to_settle;
if (force_after != 0) {
co_await wait_for_gossip(GOSSIP_SETTLE_MIN_WAIT_MS, force_after);
}
}
future<> gossiper::wait_for_range_setup() {
future<> gossiper::wait_for_range_setup() const {
logger.info("Waiting for pending range setup...");
auto ring_delay = std::chrono::milliseconds(_gcfg.ring_delay_ms);
auto force_after = _gcfg.skip_wait_for_gossip_to_settle;
return wait_for_gossip(ring_delay, force_after);
}
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) {
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
// We allow to bootstrap a new node in only two cases:
// 1) The node is a completely new node and no state in gossip at all
// 2) The node has state in gossip and it is already removed from the
@@ -2555,7 +2538,7 @@ bool gossiper::is_safe_for_bootstrap(inet_address endpoint) {
return allowed;
}
bool gossiper::is_safe_for_restart(inet_address endpoint, locator::host_id host_id) {
bool gossiper::is_safe_for_restart(inet_address endpoint, locator::host_id host_id) const {
// Reject to restart a node in case:
// *) if the node has been removed from the cluster by nodetool decommission or
// nodetool removenode

View File

@@ -114,7 +114,7 @@ private:
future<gossip_get_endpoint_states_response> handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request);
static constexpr uint32_t _default_cpuid = 0;
msg_addr get_msg_addr(inet_address to) const noexcept;
void do_sort(utils::chunked_vector<gossip_digest>& g_digest_list);
void do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) const;
timer<lowres_clock> _scheduled_gossip_task;
bool _enabled = false;
semaphore _callback_running{1};
@@ -132,7 +132,7 @@ private:
public:
// Get current generation number for the given nodes
future<generation_for_nodes>
get_generation_for_nodes(std::unordered_set<gms::inet_address> nodes);
get_generation_for_nodes(std::unordered_set<gms::inet_address> nodes) const;
// Only respond echo message listed in nodes with the generation number
future<> advertise_to_nodes(generation_for_nodes advertise_to_nodes = {});
const sstring& get_cluster_name() const noexcept;
@@ -213,7 +213,7 @@ public:
std::chrono::milliseconds quarantine_delay() const noexcept;
private:
std::default_random_engine _random_engine{std::random_device{}()};
mutable std::default_random_engine _random_engine{std::random_device{}()};
/**
* subscribers for interest in EndpointState change
@@ -247,9 +247,6 @@ private:
bool _in_shadow_round = false;
uint64_t _shadow_live_endpoints_version = 0;
uint64_t _shadow_unreachable_endpoints_version = 0;
// Must be called on shard 0.
future<semaphore_units<>> lock_endpoint_update_semaphore();
@@ -360,7 +357,7 @@ private:
*
* @param g_digests list of Gossip Digests.
*/
void make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g_digests);
void make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g_digests) const;
public:
/**
@@ -385,12 +382,12 @@ public:
future<> assassinate_endpoint(sstring address);
public:
future<generation_type> get_current_generation_number(inet_address endpoint);
future<version_type> get_current_heart_beat_version(inet_address endpoint);
future<generation_type> get_current_generation_number(inet_address endpoint) const;
future<version_type> get_current_heart_beat_version(inet_address endpoint) const;
bool is_gossip_only_member(inet_address endpoint);
bool is_safe_for_bootstrap(inet_address endpoint);
bool is_safe_for_restart(inet_address endpoint, locator::host_id host_id);
bool is_gossip_only_member(inet_address endpoint) const;
bool is_safe_for_bootstrap(inet_address endpoint) const;
bool is_safe_for_restart(inet_address endpoint, locator::host_id host_id) const;
private:
/**
* Returns true if the chosen target was also a seed. False otherwise
@@ -453,12 +450,12 @@ public:
std::set<gms::inet_address> get_nodes_with_host_id(locator::host_id host_id) const;
std::optional<endpoint_state> get_state_for_version_bigger_than(inet_address for_endpoint, version_type version);
std::optional<endpoint_state> get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) const;
/**
* determine which endpoint started up earlier
*/
generation_type::value_type compare_endpoint_startup(inet_address addr1, inet_address addr2);
generation_type::value_type compare_endpoint_startup(inet_address addr1, inet_address addr2) const;
/**
* Return the rpc address associated with an endpoint as a string.
@@ -527,22 +524,22 @@ private:
// notify that a local application state is going to change (doesn't get triggered for remote changes)
// Must be called under lock_endpoint.
future<> do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value);
future<> do_before_change_notifications(inet_address addr, endpoint_state_ptr ep_state, const application_state& ap_state, const versioned_value& new_value) const;
// notify that an application state has changed
// Must be called under lock_endpoint.
future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id);
future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id) const;
// notify that a node is DOWN (dead)
// Must be called under lock_endpoint.
future<> do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id);
future<> do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id) const;
/* Request all the state for the endpoint in the g_digest */
void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation);
void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation) const;
/* Send all the data with version greater than max_remote_version */
void send_all(gossip_digest& g_digest, std::map<inet_address, endpoint_state>& delta_ep_state_map, version_type max_remote_version);
void send_all(gossip_digest& g_digest, std::map<inet_address, endpoint_state>& delta_ep_state_map, version_type max_remote_version) const;
public:
/*
@@ -551,7 +548,7 @@ public:
*/
void examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_list,
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list,
std::map<inet_address, endpoint_state>& delta_ep_state_map);
std::map<inet_address, endpoint_state>& delta_ep_state_map) const;
public:
/**
@@ -641,8 +638,6 @@ public:
void add_expire_time_for_endpoint(inet_address endpoint, clk::time_point expire_time);
static clk::time_point compute_expire_time();
public:
void dump_endpoint_state_map();
public:
bool is_seed(const inet_address& endpoint) const;
bool is_shutdown(const inet_address& endpoint) const;
@@ -659,10 +654,10 @@ public:
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
std::string_view get_gossip_status(const inet_address& endpoint) const noexcept;
public:
future<> wait_for_gossip_to_settle();
future<> wait_for_range_setup();
future<> wait_for_gossip_to_settle() const;
future<> wait_for_range_setup() const;
private:
future<> wait_for_gossip(std::chrono::milliseconds, std::optional<int32_t> = {});
future<> wait_for_gossip(std::chrono::milliseconds, std::optional<int32_t> = {}) const;
uint64_t _nr_run = 0;
uint64_t _msg_processing = 0;

View File

@@ -20,7 +20,7 @@
template <typename T>
class atomic_vector {
std::vector<T> _vec;
seastar::rwlock _vec_lock;
mutable seastar::rwlock _vec_lock;
public:
void add(const T& value) {
@@ -38,7 +38,7 @@ public:
// We would take callbacks that take a T&, but we had bugs in the
// past with some of those callbacks holding that reference past a
// preemption.
void thread_for_each(seastar::noncopyable_function<void(T)> func) {
void thread_for_each(seastar::noncopyable_function<void(T)> func) const {
_vec_lock.for_read().lock().get();
auto unlock = seastar::defer([this] {
_vec_lock.for_read().unlock();
@@ -56,7 +56,7 @@ public:
// We would take callbacks that take a T&, but we had bugs in the
// past with some of those callbacks holding that reference past a
// preemption.
seastar::future<> for_each(seastar::noncopyable_function<seastar::future<>(T)> func) {
seastar::future<> for_each(seastar::noncopyable_function<seastar::future<>(T)> func) const {
auto holder = co_await _vec_lock.hold_read_lock();
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being