Merge "storage service updates" from Asias

This commit is contained in:
Avi Kivity
2015-06-16 12:52:45 +03:00
3 changed files with 40 additions and 27 deletions

View File

@@ -26,6 +26,7 @@
#include "core/shared_ptr.hh"
#include "core/print.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
#include "gms/i_failure_detection_event_listener.hh"
#include "gms/versioned_value.hh"
#include "gms/application_state.hh"
@@ -91,8 +92,7 @@ private:
std::set<inet_address> _seeds_from_config;
public:
inet_address get_broadcast_address() {
// FIXME: Helper for FBUtilities.getBroadcastAddress
return ms().listen_address();
return utils::fb_utilities::get_broadcast_address();
}
std::set<inet_address> get_seeds() {
// FIXME: DatabaseDescriptor.getSeeds()

View File

@@ -4,6 +4,7 @@
#include "storage_service.hh"
#include "core/distributed.hh"
#include "locator/snitch_base.hh"
namespace service {
@@ -69,20 +70,21 @@ future<> storage_service::prepare_to_join() {
gms::get_local_gossiper().debug_show();
_token_metadata.debug_show();
#endif
});
}).then([this] {
// gossip snitch infos (local DC and rack)
gossip_snitch_info();
#if 0
// gossip snitch infos (local DC and rack)
gossipSnitchInfo();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
#endif
});
}
return make_ready_future<>();
}
@@ -933,4 +935,24 @@ void storage_service::replicate_to_all_cores() {
});
}
void storage_service::gossip_snitch_info() {
// FIXME: get a snitch_ptr
#if 0
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
locator::snitch_ptr snitch;
auto addr = get_broadcast_address();
auto dc = snitch->get_datacenter(addr);
auto rack = snitch->get_rack(addr);
#endif
auto dc = "rack1";
auto rack = "datacenter1";
auto& gossiper = gms::get_local_gossiper();
gossiper.add_local_application_state(gms::application_state::DC, value_factory.datacenter(dc));
gossiper.add_local_application_state(gms::application_state::RACK, value_factory.rack(rack));
}
future<> storage_service::stop() {
return make_ready_future<>();
}
} // namespace service

View File

@@ -33,6 +33,7 @@
#include "gms/application_state.hh"
#include "db/system_keyspace.hh"
#include "core/semaphore.hh"
#include "utils/fb_utilities.hh"
namespace service {
@@ -65,6 +66,9 @@ class storage_service : public gms::i_endpoint_state_change_subscriber
public:
static int RING_DELAY; // delay after which we assume ring has stablized
// Needed by distributed<>
future<> stop();
const locator::token_metadata& get_token_metadata() const {
return _token_metadata;
}
@@ -73,14 +77,11 @@ public:
return _token_metadata;
}
void gossip_snitch_info() {
// TODO
}
void gossip_snitch_info();
private:
inet_address get_broadcast_address() {
auto& gossiper = gms::get_local_gossiper();
return gossiper.get_broadcast_address();
return utils::fb_utilities::get_broadcast_address();
}
static int get_ring_delay() {
#if 0
@@ -421,16 +422,6 @@ private:
bool should_bootstrap();
future<> prepare_to_join();
future<> join_token_ring(int delay);
#if 0
public void gossipSnitchInfo()
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
String rack = snitch.getRack(FBUtilities.getBroadcastAddress());
Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc));
Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
}
#endif
public:
void join_ring();
bool is_joined() {