Merge branch 'asias/gossip' of github.com:cloudius-systems/seastar-dev into db

More gossip conversion, from Asias.
This commit is contained in:
Avi Kivity
2015-03-19 09:18:58 +02:00
6 changed files with 840 additions and 837 deletions

View File

@@ -27,6 +27,7 @@
#include "gms/application_state.hh"
#include "gms/versioned_value.hh"
#include "db_clock.hh"
#include <experimental/optional>
namespace gms {
@@ -42,6 +43,12 @@ private:
db_clock::time_point _update_timestamp;
bool _is_alive;
public:
endpoint_state()
: _heart_beat_state(0)
, _update_timestamp(db_clock::now())
, _is_alive(true) {
}
endpoint_state(heart_beat_state initial_hb_state)
: _heart_beat_state(initial_hb_state)
, _update_timestamp(db_clock::now())
@@ -57,15 +64,20 @@ public:
_heart_beat_state = hbs;
}
versioned_value getapplication_state(application_state key) {
return _application_state.at(key);
std::experimental::optional<versioned_value> get_application_state(application_state key) {
auto it = _application_state.find(key);
if (it == _application_state.end()) {
return {};
} else {
return _application_state.at(key);
}
}
/**
* TODO replace this with operations that don't expose private state
*/
// @Deprecated
std::map<application_state, versioned_value> get_application_state_map() {
std::map<application_state, versioned_value>& get_application_state_map() {
return _application_state;
}

View File

@@ -27,6 +27,7 @@
#include "gms/i_failure_detector.hh"
#include "core/sstring.hh"
#include "core/shared_ptr.hh"
#include "core/distributed.hh"
#include "gms/gossiper.hh"
#include "utils/bounded_stats_deque.hh"
#include <cmath>
@@ -160,7 +161,7 @@ public:
sstring get_all_endpoint_states() {
std::stringstream ss;
for (auto& entry : the_gossiper().endpoint_state_map) {
for (auto& entry : get_local_gossiper().endpoint_state_map) {
auto& ep = entry.first;
auto& state = entry.second;
ss << ep << "\n";
@@ -171,7 +172,7 @@ public:
std::map<sstring, sstring> get_simple_states() {
std::map<sstring, sstring> nodes_status;
for (auto& entry : the_gossiper().endpoint_state_map) {
for (auto& entry : get_local_gossiper().endpoint_state_map) {
auto& ep = entry.first;
auto& state = entry.second;
std::stringstream ss;
@@ -186,7 +187,7 @@ public:
int get_down_endpoint_count() {
int count = 0;
for (auto& entry : the_gossiper().endpoint_state_map) {
for (auto& entry : get_local_gossiper().endpoint_state_map) {
auto& state = entry.second;
if (!state.is_alive()) {
count++;
@@ -197,7 +198,7 @@ public:
int get_up_endpoint_count() {
int count = 0;
for (auto& entry : the_gossiper().endpoint_state_map) {
for (auto& entry : get_local_gossiper().endpoint_state_map) {
auto& state = entry.second;
if (state.is_alive()) {
count++;
@@ -208,7 +209,7 @@ public:
sstring get_endpoint_state(sstring address) {
std::stringstream ss;
auto eps = the_gossiper().get_endpoint_state_for_endpoint(inet_address(address));
auto eps = get_local_gossiper().get_endpoint_state_for_endpoint(inet_address(address));
if (eps) {
append_endpoint_state(ss, *eps);
return sstring(ss.str());
@@ -275,7 +276,7 @@ public:
return true;
}
auto eps = the_gossiper().get_endpoint_state_for_endpoint(ep);
auto eps = get_local_gossiper().get_endpoint_state_for_endpoint(ep);
// we could assert not-null, but having isAlive fail screws a node over so badly that
// it's worth being defensive here so minor bugs don't cause disproportionate
// badness. (See CASSANDRA-1463 for an example).
@@ -353,4 +354,10 @@ public:
}
};
extern distributed<failure_detector> _the_failure_detector;
inline failure_detector& get_local_failure_detector() {
assert(engine().cpu_id() == 0);
return _the_failure_detector.local();
}
} // namespace gms

View File

@@ -19,6 +19,8 @@
#include "gms/i_failure_detection_event_listener.hh"
#include "gms/failure_detector.hh"
#include "core/distributed.hh"
namespace gms {
gossiper _the_gossiper;
distributed<gossiper> _the_gossiper;
distributed<failure_detector> _the_failure_detector;
}

File diff suppressed because it is too large Load Diff

View File

@@ -48,7 +48,7 @@ public:
*/
virtual void on_join(inet_address endpoint, endpoint_state ep_state) = 0;
virtual void beforechange(inet_address endpoint, endpoint_state current_state, application_state new_statekey, versioned_value newvalue) = 0;
virtual void before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, versioned_value newvalue) = 0;
virtual void on_change(inet_address endpoint, application_state state, versioned_value value) = 0;

View File

@@ -39,6 +39,9 @@ public:
size_t serialized_size() const {
return serialize_int8_size + serialize_int32_size;
}
friend inline bool operator==(const inet_address& x, const inet_address& y) {
return x._addr == y._addr;
}
friend inline bool operator<(const inet_address& x, const inet_address& y) {
return x._addr.ip < y._addr.ip;
}