locator: introduce i_endpoint_snitch::pause_io() and resume_io() methods

resume_io() is different from start() in that it won't try to read to configuration
and will only restart the periodic I/O task (if any).

This also means that resume_io() may not fail while start() will return an
exceptional future if it fails to read the configuration.

pause_io() is a counterpart of resume_io() - it stops the periodic I/O task (if any).
After it returns a ready future - snitch will not try to read any configuration until
either start() or resume_io() are called.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
This commit is contained in:
Vlad Zolotarov
2015-08-11 16:57:06 +03:00
parent b9389d4907
commit ef1c7deff4
4 changed files with 75 additions and 16 deletions

View File

@@ -49,14 +49,15 @@ future<bool> gossiping_property_file_snitch::property_file_was_modified() {
gossiping_property_file_snitch::gossiping_property_file_snitch(
const sstring& fname, unsigned io_cpu_id)
: _fname(fname), _file_reader_cpu_id(io_cpu_id) {
_state = snitch_state::initializing;
}
: _fname(fname), _file_reader_cpu_id(io_cpu_id) {}
future<> gossiping_property_file_snitch::start() {
using namespace std::chrono_literals;
_state = snitch_state::initializing;
reset_io_state();
// Run a timer only on specific CPU
if (engine().cpu_id() == _file_reader_cpu_id) {
//
@@ -70,7 +71,7 @@ future<> gossiping_property_file_snitch::start() {
io_cpu_id() = _file_reader_cpu_id;
return read_property_file().then([this] {
_file_reader.arm(reload_property_file_period());
start_io();
set_snitch_ready();
return make_ready_future<>();
});
@@ -97,7 +98,7 @@ void gossiping_property_file_snitch::periodic_reader_callback() {
"file.");
}
if (_state == snitch_state::stopping) {
if (_state == snitch_state::stopping || _state == snitch_state::io_pausing) {
this->set_stopped();
} else if (_state != snitch_state::stopped) {
_file_reader.arm(reload_property_file_period());
@@ -276,17 +277,16 @@ future<> gossiping_property_file_snitch::reload_configuration() {
}
void gossiping_property_file_snitch::set_stopped() {
_state = snitch_state::stopped;
_snitch_is_stopped.set_value();
}
future<> gossiping_property_file_snitch::stop() {
if (_state == snitch_state::stopped) {
return make_ready_future<>();
if (_state == snitch_state::stopping) {
_state = snitch_state::stopped;
} else {
_state = snitch_state::io_paused;
}
_state = snitch_state::stopping;
_io_is_stopped.set_value();
}
future<> gossiping_property_file_snitch::stop_io() {
if (engine().cpu_id() == _file_reader_cpu_id) {
_file_reader.cancel();
@@ -298,7 +298,40 @@ future<> gossiping_property_file_snitch::stop() {
set_stopped();
}
return _snitch_is_stopped.get_future();
return _io_is_stopped.get_future();
}
void gossiping_property_file_snitch::resume_io() {
reset_io_state();
start_io();
set_snitch_ready();
}
void gossiping_property_file_snitch::start_io() {
// Run a timer only on specific CPU
if (engine().cpu_id() == _file_reader_cpu_id) {
_file_reader.arm(reload_property_file_period());
}
}
future<> gossiping_property_file_snitch::stop() {
if (_state == snitch_state::stopped || _state == snitch_state::io_paused) {
return make_ready_future<>();
}
_state = snitch_state::stopping;
return stop_io();
}
future<> gossiping_property_file_snitch::pause_io() {
if (_state == snitch_state::stopped || _state == snitch_state::io_paused) {
return make_ready_future<>();
}
_state = snitch_state::io_pausing;
return stop_io();
}
void gossiping_property_file_snitch::reload_gossiper_state()

View File

@@ -54,6 +54,8 @@ public:
virtual void gossiper_starting() override;
virtual future<> stop() override;
virtual future<> start() override;
virtual future<> pause_io() override;
virtual void resume_io() override;
gossiping_property_file_snitch(
const sstring& fname = snitch_properties_filename,
@@ -131,6 +133,9 @@ private:
*/
void set_stopped();
future<> stop_io();
void start_io();
private:
sstring _fname;
timer<> _file_reader;

View File

@@ -67,6 +67,14 @@ public:
_my_distributed = d;
}
void reset_io_state() {
//
// Reset the promise to allow repeating
// start()+stop()/pause_io()+resume_io() call sequences.
//
_io_is_stopped = promise<>();
}
private:
sstring get_endpoint_info(inet_address endpoint, gms::application_state key,
const sstring& default_val) {
@@ -109,7 +117,7 @@ private:
}
protected:
promise<> _snitch_is_stopped;
promise<> _io_is_stopped;
std::experimental::optional<addr2dc_rack_map> _saved_endpoints;
distributed<snitch_ptr>* _my_distributed = nullptr;
};

View File

@@ -97,6 +97,17 @@ public:
virtual future<> stop() = 0;
// noop by default
virtual future<> pause_io() {
_state = snitch_state::io_paused;
return make_ready_future<>();
};
// noop by default
virtual void resume_io() {
_state = snitch_state::running;
};
// noop by default
virtual future<> start() {
_state = snitch_state::running;
@@ -140,6 +151,8 @@ protected:
enum class snitch_state {
initializing,
running,
io_pausing,
io_paused,
stopping,
stopped
} _state = snitch_state::initializing;