gossip: Forward gossip message to cpu0

There is one gossiper instance per node and it runs on cpu0 only. We can
not guarantee there will always be a core to core tcp connection within
messaging service, so messaging service needs to listen on all cpus.
When a remote node connects to local node with a connection bound to cpu
other than cpu0, we need to forward this message to cpu0.
This commit is contained in:
Asias He
2015-05-04 21:45:38 +08:00
committed by Avi Kivity
parent 2835da3d8c
commit f689ef705a
3 changed files with 101 additions and 74 deletions

View File

@@ -42,7 +42,6 @@ gossiper::gossiper()
/* register with the Failure Detector for receiving Failure detector events */
get_local_failure_detector().register_failure_detection_event_listener(this->shared_from_this());
// Register this instance with JMX
init_messaging_service_handler();
}
/*
@@ -84,48 +83,64 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
}
}
future<gossip_digest_ack> gossiper::handle_syn_msg(gossip_digest_syn syn_msg) {
this->set_last_processed_message_at(now_millis());
inet_address from;
if (!this->is_enabled()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
}
/* If the message is from a different cluster throw it away. */
if (syn_msg.cluster_id() != get_cluster_name()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
}
if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
}
auto g_digest_list = syn_msg.get_gossip_digests();
do_sort(g_digest_list);
std::vector<gossip_digest> delta_gossip_digest_list;
std::map<inet_address, endpoint_state> delta_ep_state_map;
this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map);
gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map));
return make_ready_future<gossip_digest_ack>(std::move(ack_msg));
}
void gossiper::init_messaging_service_handler() {
ms().register_handler(messaging_verb::ECHO, [this] (empty_msg msg) {
ms().register_handler(messaging_verb::ECHO, [] (empty_msg msg) {
// TODO: Use time_point instead of long for timing.
this->set_last_processed_message_at(now_millis());
return make_ready_future<empty_msg>();
return smp::submit_to(0, [msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now_millis());
return make_ready_future<empty_msg>();
});
});
ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [this] (inet_address from) {
this->set_last_processed_message_at(now_millis());
// TODO: Implement processing of incoming SHUTDOWN message
get_local_failure_detector().force_conviction(from);
ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [] (inet_address from) {
smp::submit_to(0, [from] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now_millis());
// TODO: Implement processing of incoming SHUTDOWN message
get_local_failure_detector().force_conviction(from);
}).discard_result();
return messaging_service::no_wait();
});
ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [this] (gossip_digest_syn syn_msg) {
this->set_last_processed_message_at(now_millis());
inet_address from;
if (!this->is_enabled()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
}
/* If the message is from a different cluster throw it away. */
if (syn_msg.cluster_id() != get_cluster_name()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
}
if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
}
auto g_digest_list = syn_msg.get_gossip_digests();
do_sort(g_digest_list);
std::vector<gossip_digest> delta_gossip_digest_list;
std::map<inet_address, endpoint_state> delta_ep_state_map;
this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map);
gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map));
return make_ready_future<gossip_digest_ack>(std::move(ack_msg));
ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gossip_digest_syn syn_msg) {
return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
return gossiper.handle_syn_msg(std::move(syn_msg));
});
});
ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [this] (gossip_digest_ack2 msg) {
this->set_last_processed_message_at(now_millis());
auto& remote_ep_state_map = msg.get_endpoint_state_map();
/* Notify the Failure Detector */
this->notify_failure_detector(remote_ep_state_map);
this->apply_state_locally(remote_ep_state_map);
ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [] (gossip_digest_ack2 msg) {
smp::submit_to(0, [msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at(now_millis());
auto& remote_ep_state_map = msg.get_endpoint_state_map();
/* Notify the Failure Detector */
gossiper.notify_failure_detector(remote_ep_state_map);
gossiper.apply_state_locally(remote_ep_state_map);
}).discard_result();
return messaging_service::no_wait();
});
}
@@ -1024,27 +1039,36 @@ void gossiper::examine_gossiper(std::vector<gossip_digest>& g_digest_list,
}
}
void gossiper::start(int generation_number) {
start(generation_number, std::map<application_state, versioned_value>());
future<> gossiper::start(int generation_number) {
return start(generation_number, std::map<application_state, versioned_value>());
}
void gossiper::start(int generation_nbr, std::map<application_state, versioned_value> preload_local_states) {
build_seeds_list();
/* initialize the heartbeat state for this localEndpoint */
maybe_initialize_local_state(generation_nbr);
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
}
future<> gossiper::start(int generation_nbr, std::map<application_state, versioned_value> preload_local_states) {
// Although gossiper runs on cpu0 only, we need to listen incoming gossip
// message on all cpus and forard them to cpu0 to process.
return _handlers.start().then([this] {
return _handlers.invoke_on_all([this] (handler& h) {
this->init_messaging_service_handler();
});
}).then([this, generation_nbr, preload_local_states] {
build_seeds_list();
/* initialize the heartbeat state for this localEndpoint */
maybe_initialize_local_state(generation_nbr);
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
}
//notify snitches that Gossiper is about to start
//notify snitches that Gossiper is about to start
#if 0
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
if (logger.isTraceEnabled())
logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation());
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
if (logger.isTraceEnabled())
logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation());
#endif
std::chrono::milliseconds period(INTERVAL_IN_MILLIS);
_scheduled_gossip_task.arm_periodic(period);
std::chrono::milliseconds period(INTERVAL_IN_MILLIS);
_scheduled_gossip_task.arm_periodic(period);
return make_ready_future<>();
});
}
void gossiper::do_shadow_round() {

View File

@@ -80,7 +80,10 @@ private:
net::messaging_service& ms() {
return net::get_local_messaging_service();
}
class handler {};
distributed<handler> _handlers;
void init_messaging_service_handler();
future<gossip_digest_ack> handle_syn_msg(gossip_digest_syn syn_msg);
static constexpr const uint32_t _default_cpuid = 0;
shard_id get_shard_id(inet_address to) {
return shard_id{to, _default_cpuid};
@@ -396,12 +399,12 @@ public:
std::map<inet_address, endpoint_state>& delta_ep_state_map);
public:
void start(int generation_number);
future<> start(int generation_number);
/**
* Start the gossiper with the generation number, preloading the map of application states before starting
*/
void start(int generation_nbr, std::map<application_state, versioned_value> preload_local_states);
future<> start(int generation_nbr, std::map<application_state, versioned_value> preload_local_states);
public:
/**

View File

@@ -37,27 +37,27 @@ int main(int ac, char ** av) {
};
int generation_number = 1;
gossiper.start(generation_number, app_states);
gossiper.start(generation_number, app_states).then([] () {
auto reporter = std::make_shared<timer<lowres_clock>>();
reporter->set_callback ([reporter] {
auto& gossiper = gms::get_local_gossiper();
gossiper.dump_endpoint_state_map();
auto& fd = gms::get_local_failure_detector();
print("%s", fd);
});
reporter->arm_periodic(std::chrono::milliseconds(1000));
auto reporter = std::make_shared<timer<lowres_clock>>();
reporter->set_callback ([reporter] {
auto& gossiper = gms::get_local_gossiper();
gossiper.dump_endpoint_state_map();
auto& fd = gms::get_local_failure_detector();
print("%s", fd);
auto app_state_adder = std::make_shared<timer<lowres_clock>>();
app_state_adder->set_callback ([app_state_adder] {
static double load = 0.5;
auto& gossiper = gms::get_local_gossiper();
auto state = gms::application_state::LOAD;
auto value = gms::versioned_value::versioned_value_factory::load(load);
gossiper.add_local_application_state(state, value);
load += 0.0001;
});
app_state_adder->arm_periodic(std::chrono::seconds(1));
});
reporter->arm_periodic(std::chrono::milliseconds(1000));
auto app_state_adder = std::make_shared<timer<lowres_clock>>();
app_state_adder->set_callback ([app_state_adder] {
static double load = 0.5;
auto& gossiper = gms::get_local_gossiper();
auto state = gms::application_state::LOAD;
auto value = gms::versioned_value::versioned_value_factory::load(load);
gossiper.add_local_application_state(state, value);
load += 0.0001;
});
app_state_adder->arm_periodic(std::chrono::seconds(1));
});
});
});