service/migration_manager: Convert scheduleSchemaPull() to C++

Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
Pekka Enberg
2015-07-09 11:19:23 +03:00
parent e361f2a436
commit f1d5b9c4ae
2 changed files with 58 additions and 47 deletions

View File

@@ -26,12 +26,16 @@
#include "message/messaging_service.hh"
#include "service/storage_service.hh"
#include "service/migration_task.hh"
#include "utils/runtime.hh"
#include "gms/gossiper.hh"
namespace service {
static thread_local logging::logger logger("Migration Manager");
const int32_t migration_manager::MIGRATION_DELAY_IN_MS = 60000;
#if 0
public void register(IMigrationListener listener)
{
@@ -42,83 +46,78 @@ public void unregister(IMigrationListener listener)
{
listeners.remove(listener);
}
#endif
public void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
future<> migration_manager::schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state)
{
VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
auto& proxy = service::get_local_storage_proxy();
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
const auto& value = state.get_application_state(gms::application_state::SCHEMA);
if (endpoint != utils::fb_utilities::get_broadcast_address() && value) {
return maybe_schedule_schema_pull(proxy, utils::UUID{value->value}, endpoint);
}
return make_ready_future<>();
}
/**
* If versions differ this node sends request with local migration list to the endpoint
* and expecting to receive a list of migrations to apply locally.
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
future<> migration_manager::maybe_schedule_schema_pull(storage_proxy& proxy, const utils::UUID& their_version, const gms::inet_address& endpoint)
{
if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
{
auto& db = proxy.get_db().local();
if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) {
logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return;
return make_ready_future<>();
}
if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
{
if (db.get_version() == database::empty_version || runtime::get_uptime() < MIGRATION_DELAY_IN_MS) {
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
logger.debug("Submitting migration task for {}", endpoint);
submitMigrationTask(endpoint);
}
else
{
return submit_migration_task(proxy, endpoint);
} else {
// Include a delay to make sure we have a chance to apply any changes being
// pushed out simultaneously. See CASSANDRA-5025
Runnable runnable = new Runnable()
{
public void run()
{
// grab the latest version of the schema since it may have changed again since the initial scheduling
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (epState == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
return;
}
VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
UUID currentVersion = UUID.fromString(value.value);
if (Schema.instance.getVersion().equals(currentVersion))
{
logger.debug("not submitting migration task for {} because our versions match", endpoint);
return;
}
logger.debug("submitting migration task for {}", endpoint);
submitMigrationTask(endpoint);
return sleep(std::chrono::milliseconds(MIGRATION_DELAY_IN_MS)).then([&proxy, endpoint] {
// grab the latest version of the schema since it may have changed again since the initial scheduling
auto& gossiper = gms::get_local_gossiper();
auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
if (!ep_state) {
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
return make_ready_future<>();
}
};
ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
const auto& value = ep_state->get_application_state(gms::application_state::SCHEMA);
utils::UUID current_version{value->value};
auto& db = proxy.get_db().local();
if (db.get_version() == current_version) {
logger.debug("not submitting migration task for {} because our versions match", endpoint);
return make_ready_future<>();
}
logger.debug("submitting migration task for {}", endpoint);
return submit_migration_task(proxy, endpoint);
});
}
}
private static Future<?> submitMigrationTask(InetAddress endpoint)
future<> migration_manager::submit_migration_task(service::storage_proxy& proxy, const gms::inet_address& endpoint)
{
/*
* Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
* running in the gossip stage.
*/
return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
return service::migration_task::run_may_throw(proxy, endpoint);
}
private static boolean shouldPullSchemaFrom(InetAddress endpoint)
bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoint)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
* Don't request schema from fat clients
*/
return MessagingService.instance().knowsVersion(endpoint)
&& MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
&& !Gossiper.instance.isGossipOnlyMember(endpoint);
auto& ms = net::get_local_messaging_service();
return ms.knows_version(endpoint)
&& ms.get_raw_version(endpoint) == net::messaging_service::current_version
&& !gms::get_gossiper().local().is_gossip_only_member(endpoint);
}
#if 0
public static boolean isReadyForBootstrap()
{
return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;

View File

@@ -26,7 +26,9 @@
#include "db/legacy_schema_tables.hh"
#include "gms/endpoint_state.hh"
#include "gms/inet_address.hh"
#include "utils/UUID.hh"
#include <vector>
@@ -39,14 +41,24 @@ class migration_manager {
public static final MigrationManager instance = new MigrationManager();
private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
#endif
public static final int MIGRATION_DELAY_IN_MS = 60000;
static const int32_t MIGRATION_DELAY_IN_MS;
#if 0
private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<>();
private MigrationManager() {}
#endif
public:
static future<> schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);
static future<> maybe_schedule_schema_pull(service::storage_proxy& proxy, const utils::UUID& their_version, const gms::inet_address& endpoint);
static future<> submit_migration_task(service::storage_proxy& proxy, const gms::inet_address& endpoint);
static bool should_pull_schema_from(const gms::inet_address& endpoint);
static future<> announce_new_keyspace(distributed<service::storage_proxy>& proxy, lw_shared_ptr<keyspace_metadata> ksm, bool announce_locally = false);
static future<> announce_new_keyspace(distributed<service::storage_proxy>& proxy, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp, bool announce_locally);