Merge branch 'asias/gossip_v2' of github.com:cloudius-systems/seastar-dev into db

Gossip update, from Asias:

"With this series, gossip now can:

1) Detect if remote node is down, if so deletes that node.
2) Update heat beat version number properly.
3) application_state::Load info is updated periodically and
   gossiped around correctly."
This commit is contained in:
Avi Kivity
2015-04-23 19:19:19 +03:00
9 changed files with 97 additions and 50 deletions

View File

@@ -55,7 +55,7 @@ public:
, _is_alive(true) {
}
heart_beat_state get_heart_beat_state() {
heart_beat_state& get_heart_beat_state() {
return _heart_beat_state;
}
@@ -82,7 +82,7 @@ public:
}
void add_application_state(application_state key, versioned_value value) {
_application_state.emplace(key, value);
_application_state[key] = value;
}
/* getters and setters */

View File

@@ -183,10 +183,10 @@ void failure_detector::set_phi_convict_threshold(double phi) {
}
double failure_detector::get_phi_convict_threshold() {
// FIXME
// FIXME: phi_convict_threshold must be between 5 and 16"
// return DatabaseDescriptor.getPhiConvictThreshold();
warn(unimplemented::cause::GOSSIP);
return 0;
return 8;
}
bool failure_detector::is_alive(inet_address ep) {

View File

@@ -85,15 +85,19 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
}
void gossiper::init_messaging_service_handler() {
ms().register_handler(messaging_verb::ECHO, [] (empty_msg msg) {
ms().register_handler(messaging_verb::ECHO, [this] (empty_msg msg) {
// TODO: Use time_point instead of long for timing.
this->set_last_processed_message_at(now_millis());
return make_ready_future<empty_msg>();
});
ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [] (inet_address from) {
ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [this] (inet_address from) {
this->set_last_processed_message_at(now_millis());
// TODO: Implement processing of incoming SHUTDOWN message
get_local_failure_detector().force_conviction(from);
return messaging_service::no_wait();
});
ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [this] (gossip_digest_syn syn_msg) {
this->set_last_processed_message_at(now_millis());
inet_address from;
if (!this->is_enabled()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
@@ -117,6 +121,7 @@ void gossiper::init_messaging_service_handler() {
return make_ready_future<gossip_digest_ack>(std::move(ack_msg));
});
ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [this] (gossip_digest_ack2 msg) {
this->set_last_processed_message_at(now_millis());
auto& remote_ep_state_map = msg.get_endpoint_state_map();
/* Notify the Failure Detector */
this->notify_failure_detector(remote_ep_state_map);
@@ -140,6 +145,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> eps
using RetMsg = gossip_digest_ack;
auto id = get_shard_id(to);
ms().send_message<RetMsg>(messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(message)).then([this, id] (RetMsg ack_msg) {
this->set_last_processed_message_at(now_millis());
if (!this->is_enabled() && !this->is_in_shadow_round()) {
return make_ready_future<>();
}
@@ -289,7 +295,7 @@ void gossiper::remove_endpoint(inet_address endpoint) {
get_local_failure_detector().remove(endpoint);
// FIXME: MessagingService
//MessagingService.instance().resetVersion(endpoint);
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
quarantine_endpoint(endpoint);
// FIXME: MessagingService
//MessagingService.instance().destroyConnectionPool(endpoint);
@@ -303,21 +309,6 @@ void gossiper::do_status_check() {
int64_t now = now_millis();
// FIXME:
// int64_t pending = ((JMXEnabledThreadPoolExecutor) StageManager.getStage(Stage.GOSSIP)).getPendingTasks();
int64_t pending = 1;
if (pending > 0 && _last_processed_message_at < now - 1000) {
// FIXME: SLEEP
// if some new messages just arrived, give the executor some time to work on them
//Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// still behind? something's broke
if (_last_processed_message_at < now - 1000) {
// logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)", pending);
return;
}
}
for (auto& entry : endpoint_state_map) {
const inet_address& endpoint = entry.first;
if (endpoint == get_broadcast_address()) {
@@ -325,7 +316,6 @@ void gossiper::do_status_check() {
}
get_local_failure_detector().interpret(endpoint);
fail(unimplemented::cause::GOSSIP);
auto it = endpoint_state_map.find(endpoint);
if (it != endpoint_state_map.end()) {
@@ -368,9 +358,8 @@ void gossiper::run() {
//wait on messaging service to start listening
// MessagingService.instance().waitUntilListening();
/* Update the local heartbeat counter. */
//endpoint_state_map.get(FBUtilities.getBroadcastAddress()).get_heart_beat_state().updateHeartBeat();
endpoint_state_map[get_broadcast_address()].get_heart_beat_state().update_heart_beat();
// if (logger.isTraceEnabled())
// logger.trace("My heartbeat is now {}", endpoint_state_map.get(FBUtilities.getBroadcastAddress()).get_heart_beat_state().get_heart_beat_version());
std::vector<gossip_digest> g_digests;
@@ -447,7 +436,7 @@ std::set<inet_address> gossiper::get_live_token_owners() {
if (it != endpoint_state_map.end() && !is_dead_state(it->second) /* && StorageService.instance.getTokenMetadata().isMember(member) */) {
token_owners.insert(member);
}
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
}
return token_owners;
}
@@ -456,7 +445,7 @@ std::set<inet_address> gossiper::get_unreachable_token_owners() {
std::set<inet_address> token_owners;
for (auto&& x : _unreachable_endpoints) {
auto& endpoint = x.first;
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
if (true /* StorageService.instance.getTokenMetadata().isMember(endpoint) */) {
token_owners.insert(endpoint);
}
@@ -572,7 +561,7 @@ void gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, ut
// logger.info("Removing host: {}", host_id);
// logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint);
// FIXME: sleep
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
// Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
// make sure it did not change
auto& eps = endpoint_state_map.at(endpoint);
@@ -603,7 +592,7 @@ void gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_i
// ensure at least one gossip round occurs before returning
// FIXME: sleep
//Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS);
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
}
void gossiper::unsafe_assassinate_endpoint(sstring address) {
@@ -620,7 +609,7 @@ void gossiper::assassinate_endpoint(sstring address) {
// logger.warn("Assassinating {} via gossip", endpoint);
if (is_exist) {
// FIXME:
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
#if 0
try {
tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint);
@@ -757,7 +746,7 @@ std::map<inet_address, endpoint_state>&gms::gossiper::get_endpoint_states() {
bool gossiper::uses_host_id(inet_address endpoint) {
// FIXME
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
if (true /* MessagingService.instance().knowsVersion(endpoint) */) {
return true;
} else if (get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::NET_VERSION)) {
@@ -776,7 +765,7 @@ utils::UUID gossiper::get_host_id(inet_address endpoint) {
}
sstring uuid = get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::HOST_ID)->value;
// FIXME: Add UUID(const sstring& id) constructor
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
return utils::UUID(0, 0);
}
@@ -839,6 +828,7 @@ void gossiper::mark_alive(inet_address addr, endpoint_state local_state) {
//logger.trace("Sending a EchoMessage to {}", addr);
shard_id id = get_shard_id(addr);
ms().send_message<empty_msg>(messaging_verb::ECHO, id).then([this, addr, local_state = std::move(local_state)] (empty_msg msg) mutable {
this->set_last_processed_message_at(now_millis());
this->real_mark_alive(addr, local_state);
});
}
@@ -917,7 +907,7 @@ bool gossiper::is_dead_state(endpoint_state eps) {
return false;
}
void gossiper::apply_new_states(inet_address addr, endpoint_state local_state, endpoint_state remote_state) {
void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, endpoint_state& remote_state) {
// don't assert here, since if the node restarts the version will go back to zero
//int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version();
@@ -1067,6 +1057,7 @@ void gossiper::do_shadow_round() {
auto id = get_shard_id(seed);
ms().send_message<gossip_digest_ack>(messaging_verb::GOSSIP_DIGEST_SYN,
std::move(id), std::move(message)).then([this, id] (gossip_digest_ack ack_msg) {
this->set_last_processed_message_at(now_millis());
if (this->is_in_shadow_round()) {
this->finish_shadow_round();
}
@@ -1156,7 +1147,7 @@ void gossiper::add_lccal_application_states(std::list<std::pair<application_stat
}
void gossiper::stop() {
fail(unimplemented::cause::GOSSIP);
warn(unimplemented::cause::GOSSIP);
// if (scheduledGossipTask != null)
// scheduledGossipTask.cancel(false);
// logger.info("Announcing shutdown");

View File

@@ -372,7 +372,7 @@ public:
void apply_state_locally(std::map<inet_address, endpoint_state>& map);
private:
void apply_new_states(inet_address addr, endpoint_state local_state, endpoint_state remote_state);
void apply_new_states(inet_address addr, endpoint_state& local_state, endpoint_state& remote_state);
// notify that a local application state is going to change (doesn't get triggered for remote changes)
void do_before_change_notifications(inet_address addr, endpoint_state& ep_state, application_state& ap_state, versioned_value& new_value);

View File

@@ -65,8 +65,13 @@ public:
// values for ApplicationState.REMOVAL_COORDINATOR
static constexpr const char *REMOVAL_COORDINATOR = "REMOVER";
const int version;
const sstring value;
int version;
sstring value;
public:
versioned_value()
: version(version_generator::get_next_version())
, value("") {
}
private:
versioned_value(const sstring& value, int version = version_generator::get_next_version())

View File

@@ -111,9 +111,11 @@ struct serializer {
// For sstring
inline auto operator()(output_stream<char>& out, sstring& v) {
auto sz = serialize_int16_size + v.size();
auto serialize_string_size = serialize_int16_size + v.size();
auto sz = serialize_int16_size + serialize_string_size;
bytes b(bytes::initialized_later(), sz);
auto _out = b.begin();
serialize_int16(_out, serialize_string_size);
serialize_string(_out, v);
return out.write(reinterpret_cast<const char*>(b.c_str()), sz);
}
@@ -122,12 +124,13 @@ struct serializer {
if (buf.size() != serialize_int16_size) {
throw rpc::closed_error();
}
size_t sz = net::ntoh(*reinterpret_cast<const net::packed<int16_t>*>(buf.get()));
return in.read_exactly(sz).then([sz, &v] (temporary_buffer<char> buf) mutable {
if (buf.size() != sz) {
size_t serialize_string_size = net::ntoh(*reinterpret_cast<const net::packed<int16_t>*>(buf.get()));
return in.read_exactly(serialize_string_size).then([serialize_string_size, &v]
(temporary_buffer<char> buf) mutable {
if (buf.size() != serialize_string_size) {
throw rpc::closed_error();
}
bytes_view bv(reinterpret_cast<const int8_t*>(buf.get()), sz);
bytes_view bv(reinterpret_cast<const int8_t*>(buf.get()), serialize_string_size);
v = read_simple_short_string(bv);
return make_ready_future<>();
});
@@ -297,7 +300,9 @@ public:
auto ret = f.get();
return make_ready_future<MsgIn>(std::move(std::get<0>(ret)));
} catch (std::runtime_error&) {
remove_rpc_client(id);
// FIXME: we need to distinguish between a transport error and
// a server error.
// remove_rpc_client(id);
throw;
}
});
@@ -312,7 +317,8 @@ public:
f.get();
return make_ready_future<>();
} catch (std::runtime_error&) {
remove_rpc_client(id);
// FIXME: as above
// remove_rpc_client(id);
throw;
}
});

View File

@@ -47,6 +47,17 @@ int main(int ac, char ** av) {
print("%s", fd);
});
reporter->arm_periodic(std::chrono::milliseconds(1000));
auto app_state_adder = std::make_shared<timer<lowres_clock>>();
app_state_adder->set_callback ([app_state_adder] {
static double load = 0.5;
auto& gossiper = gms::get_local_gossiper();
auto state = gms::application_state::LOAD;
auto value = gms::versioned_value::versioned_value_factory::load(load);
gossiper.add_local_application_state(state, value);
load += 0.0001;
});
app_state_adder->arm_periodic(std::chrono::seconds(1));
});
});
});

View File

@@ -81,6 +81,12 @@ public:
std::tuple<int, long> ret(x*x, y*y);
return make_ready_future<decltype(ret)>(std::move(ret));
});
ms.register_handler(messaging_verb::UNUSED_1, [] (int x, long y) {
print("Server got echo msg = (%d, %ld) \n", x, y);
throw std::runtime_error("I'm throwing runtime_error exception");
long ret = x + y;
return make_ready_future<decltype(ret)>(ret);
});
}
public:
@@ -129,11 +135,34 @@ public:
int msg1 = 30;
int msg2 = 60;
using RetMsg = std::tuple<int, long>;
return ms.send_message<RetMsg>(messaging_verb::ECHO, id, msg1, msg2).then([] (RetMsg msg) {
print("Client sent echo got reply = (%d , %ld)\n", std::get<0>(msg), std::get<1>(msg));
return sleep(100ms).then([]{
return ms.send_message<RetMsg>(messaging_verb::ECHO, id, msg1, msg2).then_wrapped([] (future<RetMsg> f) {
try {
auto msg = std::get<0>(f.get());
print("Client sent echo got reply = (%d , %ld)\n", std::get<0>(msg), std::get<1>(msg));
return sleep(100ms).then([]{
return make_ready_future<>();
});
} catch (std::runtime_error& e) {
print("test_echo: %s\n", e.what());
}
return make_ready_future<>();
});
}
future<> test_exception() {
print("=== %s ===\n", __func__);
auto id = get_shard_id();
int msg1 = 3;
int msg2 = 6;
return ms.send_message<long>(messaging_verb::UNUSED_1, id, msg1, msg2).then_wrapped([] (future<long> f) {
try {
auto ret = std::get<0>(f.get());
print("Client sent UNUSED_1 got reply = %ld\n", ret);
return make_ready_future<>();
});
} catch (std::runtime_error& e) {
print("Client sent UNUSED_1 got exception: %s\n", e.what());
}
return make_ready_future<>();
});
}
};
@@ -144,9 +173,12 @@ int main(int ac, char ** av) {
app_template app;
app.add_options()
("server", bpo::value<std::string>(), "Server ip")
("listen-address", bpo::value<std::string>()->default_value("0.0.0.0"), "IP address to listen")
("cpuid", bpo::value<uint32_t>()->default_value(0), "Server cpuid");
return app.run(ac, av, [&] {
net::get_messaging_service().start().then([&] () {
auto&& config = app.configuration();
auto listen = gms::inet_address(config["listen-address"].as<std::string>());
net::get_messaging_service().start(std::ref(listen)).then([&] () {
auto&& config = app.configuration();
auto testers = new distributed<tester>;
testers->start().then([testers]{
@@ -169,6 +201,8 @@ int main(int ac, char ** av) {
return t->test_gossip_shutdown();
}).then([testers, t] {
return t->test_echo();
}).then([testers, t] {
return t->test_exception();
}).then([testers, t] {
print("=============TEST DONE===========\n");
testers->stop().then([testers] {

View File

@@ -47,8 +47,8 @@ public:
auto removed = _deque.front();
_deque.pop_front();
_sum -= removed;
_deque.push_back(i);
}
_deque.push_back(i);
_sum += i;
}