Merge branch 'shlomi/support_net_service_stop_v4_rebase' of github.com:cloudius-systems/seastar-dev into db

Support stop of gossiper and failure_detector, from Shlomi.

Reviewed-by: Asias He <asias@cloudius-systems.com>
This commit is contained in:
Avi Kivity
2015-05-14 16:40:27 +03:00
7 changed files with 61 additions and 12 deletions

View File

@@ -174,6 +174,7 @@ urchin_tests = [
'tests/urchin/map_difference_test',
'tests/urchin/message',
'tests/urchin/gossip',
'tests/urchin/gossip_test',
'tests/urchin/compound_test',
'tests/urchin/config_test',
]
@@ -487,6 +488,7 @@ deps['tests/urchin/sstable_test'] += boost_test_lib + ['tests/urchin/sstable_dat
deps['tests/urchin/sstable_mutation_test'] += boost_test_lib
deps['tests/urchin/hash_test'] += boost_test_lib
deps['tests/urchin/serializer_test'] += boost_test_lib
deps['tests/urchin/gossip_test'] += boost_test_lib
deps['tests/urchin/bytes_ostream_test'] = ['tests/urchin/bytes_ostream_test.cc']
deps['tests/urchin/UUID_test'] = ['utils/UUID_gen.cc', 'tests/urchin/UUID_test.cc']

View File

@@ -252,11 +252,11 @@ void failure_detector::remove(inet_address ep) {
_arrival_samples.erase(ep);
}
void failure_detector::register_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener) {
void failure_detector::register_failure_detection_event_listener(i_failure_detection_event_listener* listener) {
_fd_evnt_listeners.push_back(std::move(listener));
}
void failure_detector::unregister_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener) {
void failure_detector::unregister_failure_detection_event_listener(i_failure_detection_event_listener* listener) {
_fd_evnt_listeners.remove(listener);
}

View File

@@ -95,12 +95,16 @@ private:
// change.
static constexpr const double PHI_FACTOR{1.0 / std::log(10.0)}; // 0.434...
std::map<inet_address, arrival_window> _arrival_samples;
std::list<shared_ptr<i_failure_detection_event_listener>> _fd_evnt_listeners;
std::list<i_failure_detection_event_listener*> _fd_evnt_listeners;
public:
failure_detector() {
}
future<> stop() {
return make_ready_future<>();
}
sstring get_all_endpoint_states();
std::map<sstring, sstring> get_simple_states();
@@ -154,9 +158,9 @@ public:
void remove(inet_address ep);
void register_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener);
void register_failure_detection_event_listener(i_failure_detection_event_listener* listener);
void unregister_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener);
void unregister_failure_detection_event_listener(i_failure_detection_event_listener* listener);
friend std::ostream& operator<<(std::ostream& os, const failure_detector& x);
};

View File

@@ -46,10 +46,14 @@ gossiper::gossiper()
// half of QUARATINE_DELAY, to ensure _just_removed_endpoints has enough leeway to prevent re-gossip
fat_client_timeout = (int64_t) (QUARANTINE_DELAY / 2);
/* register with the Failure Detector for receiving Failure detector events */
get_local_failure_detector().register_failure_detection_event_listener(this->shared_from_this());
get_local_failure_detector().register_failure_detection_event_listener(this);
// Register this instance with JMX
}
gossiper::~gossiper() {
get_local_failure_detector().unregister_failure_detection_event_listener(this);
}
/*
* First construct a map whose key is the endpoint in the GossipDigest and the value is the
* GossipDigest itself. Then build a list of version differences i.e difference between the
@@ -1174,7 +1178,7 @@ void gossiper::add_lccal_application_states(std::list<std::pair<application_stat
}
}
void gossiper::stop() {
void gossiper::shutdown() {
warn(unimplemented::cause::GOSSIP);
// if (scheduledGossipTask != null)
// scheduledGossipTask.cancel(false);
@@ -1186,6 +1190,13 @@ void gossiper::stop() {
}
}
future<> gossiper::stop() {
_scheduled_gossip_task.cancel();
return _handlers.stop().then( [] () {
return make_ready_future<>();
});
}
bool gossiper::is_enabled() {
//return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
warn(unimplemented::cause::GOSSIP);

View File

@@ -65,7 +65,12 @@ private:
net::messaging_service& ms() {
return net::get_local_messaging_service();
}
class handler {};
class handler {
public:
future<> stop() {
return make_ready_future<>();
}
};
distributed<handler> _handlers;
void init_messaging_service_handler();
future<gossip_digest_ack> handle_syn_msg(gossip_digest_syn syn_msg);
@@ -147,6 +152,7 @@ private:
void run();
public:
gossiper();
~gossiper();
void set_last_processed_message_at(int64_t time_in_millis) {
_last_processed_message_at = time_in_millis;
}
@@ -414,7 +420,9 @@ public:
void add_lccal_application_states(std::list<std::pair<application_state, versioned_value>> states);
void stop();
void shutdown();
future<> stop();
public:
bool is_enabled();

View File

@@ -23,7 +23,6 @@
#include "gms/inet_address.hh"
#include "gms/i_failure_detection_event_listener.hh"
#include "core/shared_ptr.hh"
namespace gms {
@@ -79,14 +78,14 @@ public:
*
* @param listener implementation of an application provided IFailureDetectionEventListener
*/
virtual void register_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener) = 0;
virtual void register_failure_detection_event_listener(i_failure_detection_event_listener* listener) = 0;
/**
* Un-register interest for Failure Detector events.
*
* @param listener implementation of an application provided IFailureDetectionEventListener
*/
virtual void unregister_failure_detection_event_listener(shared_ptr<i_failure_detection_event_listener> listener) = 0;
virtual void unregister_failure_detection_event_listener(i_failure_detection_event_listener* listener) = 0;
};
} // namespace gms

View File

@@ -0,0 +1,25 @@
#define BOOST_TEST_DYN_LINK
#include <boost/test/unit_test.hpp>
#include "tests/test-utils.hh"
#include "message/messaging_service.hh"
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "core/reactor.hh"
SEASTAR_TEST_CASE(test_boot_shutdown){
return net::get_messaging_service().start(gms::inet_address("127.0.0.1")).then( [] () {
return gms::get_failure_detector().start_single().then([] {
return gms::get_gossiper().start_single().then([] {
return gms::get_gossiper().stop().then( [] (){
return gms::get_failure_detector().stop().then( [] (){
return net::get_messaging_service().stop().then ( [] () {
return make_ready_future<>();
});
});
});
});
});
});
}