mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-05 14:33:08 +00:00
gossip: Protect gossiper from been destroyed while in use
There are three places where async operations can be scheduled - gossiper timer handler - API called by user - messaging service handler Use reference tracking infrastructure to protect. Fixes #268
This commit is contained in:
@@ -310,7 +310,7 @@ void gossiper::notify_failure_detector(inet_address endpoint, endpoint_state rem
|
||||
}
|
||||
|
||||
future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state>& map) {
|
||||
return seastar::async([this, map = std::move(map)] () mutable {
|
||||
return seastar::async([this, g = this->shared_from_this(), map = std::move(map)] () mutable {
|
||||
for (auto& entry : map) {
|
||||
auto& ep = entry.first;
|
||||
if (ep == get_broadcast_address() && !is_in_shadow_round()) {
|
||||
@@ -435,7 +435,7 @@ void gossiper::do_status_check() {
|
||||
}
|
||||
|
||||
void gossiper::run() {
|
||||
seastar::async([this] {
|
||||
seastar::async([this, g = this->shared_from_this()] {
|
||||
logger.trace("=== Gossip round START");
|
||||
|
||||
//wait on messaging service to start listening
|
||||
@@ -694,7 +694,7 @@ void gossiper::make_random_gossip_digest(std::vector<gossip_digest>& g_digests)
|
||||
}
|
||||
|
||||
future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) {
|
||||
return seastar::async([this, endpoint, host_id, local_host_id] {
|
||||
return seastar::async([this, g = this->shared_from_this(), endpoint, host_id, local_host_id] {
|
||||
auto& state = endpoint_state_map.at(endpoint);
|
||||
// remember this node's generation
|
||||
int generation = state.get_heart_beat_state().get_generation();
|
||||
@@ -718,7 +718,7 @@ future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id
|
||||
}
|
||||
|
||||
future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) {
|
||||
return seastar::async([this, endpoint, host_id] {
|
||||
return seastar::async([this, g = this->shared_from_this(), endpoint, host_id] {
|
||||
auto& eps = endpoint_state_map.at(endpoint);
|
||||
eps.update_timestamp(); // make sure we don't evict it too soon
|
||||
eps.get_heart_beat_state().force_newer_generation_unsafe();
|
||||
@@ -738,7 +738,7 @@ future<> gossiper::unsafe_assassinate_endpoint(sstring address) {
|
||||
}
|
||||
|
||||
future<> gossiper::assassinate_endpoint(sstring address) {
|
||||
return seastar::async([this, address] {
|
||||
return seastar::async([this, g = this->shared_from_this(), address] {
|
||||
inet_address endpoint(address);
|
||||
auto is_exist = endpoint_state_map.count(endpoint);
|
||||
int gen = std::chrono::duration_cast<std::chrono::seconds>((now() + std::chrono::seconds(60)).time_since_epoch()).count();
|
||||
@@ -1005,7 +1005,7 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state local_state) {
|
||||
void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
logger.trace("marking as down {}", addr);
|
||||
local_state.mark_dead();
|
||||
seastar::async([this, addr, local_state] {
|
||||
seastar::async([this, g = this->shared_from_this(), addr, local_state] {
|
||||
_live_endpoints.erase(addr);
|
||||
_unreachable_endpoints[addr] = now();
|
||||
logger.info("inet_address {} is now DOWN", addr);
|
||||
@@ -1218,7 +1218,7 @@ future<> gossiper::start(int generation_nbr, std::map<application_state, version
|
||||
}
|
||||
|
||||
future<> gossiper::do_shadow_round() {
|
||||
return seastar::async([this] {
|
||||
return seastar::async([this, g = this->shared_from_this()] {
|
||||
build_seeds_list();
|
||||
_in_shadow_round = true;
|
||||
auto t = clk::now();
|
||||
@@ -1298,7 +1298,7 @@ void gossiper::add_saved_endpoint(inet_address ep) {
|
||||
}
|
||||
|
||||
void gossiper::add_local_application_state(application_state state, versioned_value value) {
|
||||
seastar::async([this, state, value = std::move(value)] () mutable {
|
||||
seastar::async([this, g = this->shared_from_this(), state, value = std::move(value)] () mutable {
|
||||
inet_address ep_addr = get_broadcast_address();
|
||||
assert(endpoint_state_map.count(ep_addr));
|
||||
endpoint_state& ep_state = endpoint_state_map.at(ep_addr);
|
||||
@@ -1329,7 +1329,7 @@ void gossiper::add_lccal_application_states(std::list<std::pair<application_stat
|
||||
}
|
||||
|
||||
future<> gossiper::shutdown() {
|
||||
return seastar::async([this] {
|
||||
return seastar::async([this, g = this->shared_from_this()] {
|
||||
_enabled = false;
|
||||
_scheduled_gossip_task.cancel();
|
||||
logger.info("Announcing shutdown");
|
||||
|
||||
@@ -59,7 +59,7 @@ class i_failure_detector;
|
||||
* Upon hearing a GossipShutdownMessage, this module will instantly mark the remote node as down in
|
||||
* the Failure Detector.
|
||||
*/
|
||||
class gossiper : public i_failure_detection_event_listener, public enable_shared_from_this<gossiper> {
|
||||
class gossiper : public i_failure_detection_event_listener, public seastar::async_sharded_service<gossiper> {
|
||||
public:
|
||||
using clk = std::chrono::high_resolution_clock;
|
||||
private:
|
||||
|
||||
Reference in New Issue
Block a user