Merge branch 'asias/gossip_v1' of github.com:cloudius-systems/seastar-dev into db

Gossip now actually talks among nodes, from Asias.
This commit is contained in:
Avi Kivity
2015-04-16 17:00:24 +03:00
5 changed files with 28 additions and 10 deletions

View File

@@ -168,7 +168,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> eps
for (auto g_digest : g_digest_list) {
inet_address addr = g_digest.get_endpoint();
auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, g_digest.get_max_version());
if (!local_ep_state_ptr) {
if (local_ep_state_ptr) {
delta_ep_state_map.emplace(addr, *local_ep_state_ptr);
}
}
@@ -273,7 +273,6 @@ void gossiper::apply_state_locally(std::map<inet_address, endpoint_state>& map)
// this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive
get_local_failure_detector().report(ep);
handle_major_state_change(ep, remote_state);
fail(unimplemented::cause::GOSSIP);
}
}
}
@@ -838,8 +837,6 @@ void gossiper::notify_failure_detector(std::map<inet_address, endpoint_state> re
}
void gossiper::mark_alive(inet_address addr, endpoint_state local_state) {
fail(unimplemented::cause::GOSSIP);
// if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20) {
// real_mark_alive(addr, local_state);
// return;
@@ -1113,8 +1110,6 @@ void gossiper::maybe_initialize_local_state(int generation_nbr) {
heart_beat_state hb_state(generation_nbr);
endpoint_state local_state(hb_state);
local_state.mark_alive();
// FIXME
// endpoint_state_map.putIfAbsent(FBUtilities.getBroadcastAddress(), local_state);
inet_address ep = get_broadcast_address();
auto it = endpoint_state_map.find(ep);
if (it == endpoint_state_map.end()) {
@@ -1206,4 +1201,12 @@ int64_t gossiper::compute_expire_time() {
return now_millis() + A_VERY_LONG_TIME;
}
void gossiper::dump_endpoint_state_map() {
print("----------- endpoint_state_map dump beg -----------\n");
for (auto& x : endpoint_state_map) {
print("ep=%s, eps=%s\n", x.first, x.second);
}
print("----------- endpoint_state_map dump end -----------\n");
}
} // namespace gms

View File

@@ -98,7 +98,7 @@ private:
private:
inet_address get_broadcast_address() {
// FIXME: Helper for FBUtilities.getBroadcastAddress
return inet_address(0xffffff);
return ms().listen_address();
}
std::set<inet_address> _seeds_from_config;
public:
@@ -472,6 +472,8 @@ public:
void add_expire_time_for_endpoint(inet_address endpoint, int64_t expire_time);
static int64_t compute_expire_time();
public:
void dump_endpoint_state_map();
};
extern distributed<gossiper> _the_gossiper;

View File

@@ -62,7 +62,7 @@ public:
}
friend inline std::ostream& operator<<(std::ostream& os, const heart_beat_state& h) {
return os << "HeartBeat: generation = " << h._generation << ", version = " << h._version;
return os << "generation = " << h._generation << ", version = " << h._version;
}
// The following replaces HeartBeatStateSerializer from the Java code

View File

@@ -139,7 +139,7 @@ public:
}
#endif
versioned_value load(double load)
static inline versioned_value load(double load)
{
return versioned_value(to_sstring_sprintf(load, "%g"));
}

View File

@@ -3,6 +3,7 @@
#include "message/messaging_service.hh"
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "gms/application_state.hh"
namespace bpo = boost::program_options;
@@ -31,8 +32,20 @@ int main(int ac, char ** av) {
std::cout << "Start gossiper service ...\n";
auto& gossiper = gms::get_local_gossiper();
gossiper.set_seeds(std::move(seeds));
std::map<gms::application_state, gms::versioned_value> app_states = {
{ gms::application_state::LOAD, gms::versioned_value::versioned_value_factory::load(0.5) },
};
int generation_number = 1;
gossiper.start(generation_number);
gossiper.start(generation_number, app_states);
auto reporter = std::make_shared<timer<lowres_clock>>();
reporter->set_callback ([reporter] {
auto& gossiper = gms::get_local_gossiper();
gossiper.dump_endpoint_state_map();
});
reporter->arm_periodic(std::chrono::milliseconds(1000));
});
});
});