Merge "Futurize all callers of add_local_application_state" from Asias

"In 5e8037b50a (gossip: Futurize
add_local_application_state()) , we futurized add_local_application_state.
However, not all of the callers are futurized. Fix it up."
This commit is contained in:
Avi Kivity
2015-11-02 10:22:01 +02:00
9 changed files with 40 additions and 38 deletions

View File

@@ -1270,12 +1270,12 @@ future<> gossiper::start(int generation_nbr, std::map<application_state, version
}
//notify snitches that Gossiper is about to start
locator::i_endpoint_snitch::get_local_snitch_ptr()->gossiper_starting();
logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation());
_enabled = true;
_scheduled_gossip_task.arm(INTERVAL);
return make_ready_future<>();
return locator::i_endpoint_snitch::get_local_snitch_ptr()->gossiper_starting().then([this, &local_state] {
logger.trace("gossip started with generation {}", local_state.get_heart_beat_state().get_generation());
_enabled = true;
_scheduled_gossip_task.arm(INTERVAL);
return make_ready_future<>();
});
});
}

View File

@@ -96,7 +96,7 @@ void ec2_multi_region_snitch::set_local_private_addr(const sstring& addr_str) {
_local_private_address = addr_str;
}
void ec2_multi_region_snitch::gossiper_starting() {
future<> ec2_multi_region_snitch::gossiper_starting() {
//
// Note: currently gossiper "main" instance always runs on CPU0 therefore
// this function will be executed on CPU0 only.
@@ -107,13 +107,14 @@ void ec2_multi_region_snitch::gossiper_starting() {
auto& g = get_local_gossiper();
auto& ss = service::get_local_storage_service();
g.add_local_application_state(application_state::INTERNAL_IP,
ss.value_factory.internal_ip(_local_private_address));
return g.add_local_application_state(application_state::INTERNAL_IP,
ss.value_factory.internal_ip(_local_private_address)).then([this] {
if (!_helper_added) {
gms::get_local_gossiper().register_(make_shared<reconnectable_snitch_helper>(_my_dc));
_helper_added = true;
}
});
if (!_helper_added) {
gms::get_local_gossiper().register_(make_shared<reconnectable_snitch_helper>(_my_dc));
_helper_added = true;
}
}
using registry_2_params = class_registrator<i_endpoint_snitch, ec2_multi_region_snitch, const sstring&, unsigned>;

View File

@@ -44,7 +44,7 @@ namespace locator {
class ec2_multi_region_snitch : public ec2_snitch {
public:
ec2_multi_region_snitch(const sstring& fname = "", unsigned io_cpu_id = 0);
virtual void gossiper_starting() override;
virtual future<> gossiper_starting() override;
virtual future<> start() override;
virtual void set_local_private_addr(const sstring& addr_str) override;
private:

View File

@@ -126,7 +126,7 @@ void gossiping_property_file_snitch::periodic_reader_callback() {
});
}
void gossiping_property_file_snitch::gossiper_starting() {
future<> gossiping_property_file_snitch::gossiper_starting() {
using namespace gms;
using namespace service;
//
@@ -141,11 +141,11 @@ void gossiping_property_file_snitch::gossiper_starting() {
ostrm<<local_internal_addr<<std::flush;
g.add_local_application_state(application_state::INTERNAL_IP,
ss.value_factory.internal_ip(ostrm.str()));
_gossip_started = true;
reload_gossiper_state();
return g.add_local_application_state(application_state::INTERNAL_IP,
ss.value_factory.internal_ip(ostrm.str())).then([this] {
_gossip_started = true;
reload_gossiper_state();
});
}
future<> gossiping_property_file_snitch::read_property_file() {

View File

@@ -66,7 +66,7 @@ public:
return std::chrono::seconds(60);
}
virtual void gossiper_starting() override;
virtual future<> gossiper_starting() override;
virtual future<> stop() override;
virtual future<> start() override;
virtual future<> pause_io() override;

View File

@@ -102,7 +102,7 @@ public:
* called after Gossiper instance exists immediately before it starts
* gossiping
*/
virtual void gossiper_starting() = 0;
virtual future<> gossiper_starting() = 0;
/**
* Returns whether for a range query doing a query against merged is likely
@@ -407,7 +407,7 @@ public:
inet_address& address, inet_address& a1, inet_address& a2) override;
// noop by default
virtual void gossiper_starting() override {}
virtual future<> gossiper_starting() override { return make_ready_future<>(); }
virtual bool is_worth_merging_for_range_query(
std::vector<inet_address>& merged,

View File

@@ -60,10 +60,13 @@ void load_broadcaster::start_broadcasting() {
res += i.second->get_stats().live_disk_space_used;
}
return res;
}, 0, std::plus<int64_t>()).then([this](int64_t size) {
}, 0, std::plus<int64_t>()).then([this] (int64_t size) {
gms::versioned_value::factory value_factory;
_gossiper.add_local_application_state(gms::application_state::LOAD, value_factory.load(size));
_timer.arm(BROADCAST_INTERVAL);
return _gossiper.add_local_application_state(gms::application_state::LOAD,
value_factory.load(size)).then([this] {
_timer.arm(BROADCAST_INTERVAL);
return make_ready_future<>();
});
});
});

View File

@@ -493,13 +493,11 @@ future<> migration_manager::announce(std::vector<mutation> schema) {
*
* @param version The schema version to announce
*/
future<> migration_manager::passive_announce(utils::UUID version)
{
future<> migration_manager::passive_announce(utils::UUID version) {
return gms::get_gossiper().invoke_on(0, [version] (auto&& gossiper) {
auto& ss = service::get_local_storage_service();
gossiper.add_local_application_state(gms::application_state::SCHEMA, ss.value_factory.schema(version));
logger.debug("Gossiping my schema version {}", version);
return make_ready_future<>();
return gossiper.add_local_application_state(gms::application_state::SCHEMA, ss.value_factory.schema(version));
});
}

View File

@@ -30,6 +30,7 @@
#include "utils/fb_utilities.hh"
#include "locator/snitch_base.hh"
#include "log.hh"
#include <seastar/core/thread.hh>
#include <chrono>
namespace bpo = boost::program_options;
@@ -78,16 +79,15 @@ int main(int ac, char ** av) {
int generation_number = duration_cast<seconds>(now).count();
return gossiper.start(generation_number, app_states);
}).then([vv] {
auto app_state_adder = std::make_shared<timer<lowres_clock>>();
app_state_adder->set_callback ([vv, app_state_adder] {
return seastar::async([vv] {
static double load = 0.5;
auto& gossiper = gms::get_local_gossiper();
auto state = gms::application_state::LOAD;
auto value = vv->load(load);
gossiper.add_local_application_state(state, value);
load += 0.0001;
for (;;) {
auto value = vv->load(load);
load += 0.0001;
gms::get_local_gossiper().add_local_application_state(gms::application_state::LOAD, value).get();
sleep(std::chrono::seconds(1)).get();
}
});
app_state_adder->arm_periodic(std::chrono::seconds(1));
});
});
}