storage_service: Stub drain

Needed by API.
This commit is contained in:
Asias He
2015-08-28 13:13:47 +08:00
parent ea691397f6
commit d7736062ef
2 changed files with 79 additions and 73 deletions

View File

@@ -1392,4 +1392,80 @@ future<> storage_service::remove_node(sstring host_id_string) {
return make_ready_future<>();
}
future<> storage_service::drain() {
#if 0
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
{
logger.warn("Cannot drain node (did it already happen?)");
return;
}
setMode(Mode.DRAINING, "starting drain process", true);
shutdownClientServers();
ScheduledExecutors.optionalTasks.shutdown();
Gossiper.instance.stop();
setMode(Mode.DRAINING, "shutting down MessageService", false);
MessagingService.instance().shutdown();
setMode(Mode.DRAINING, "clearing mutation stage", false);
counterMutationStage.shutdown();
mutationStage.shutdown();
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
setMode(Mode.DRAINING, "flushing column families", false);
// count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
totalCFs = 0;
for (Keyspace keyspace : Keyspace.nonSystem())
totalCFs += keyspace.getColumnFamilyStores().size();
remainingCFs = totalCFs;
// flush
List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.nonSystem())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
flushes.add(cfs.forceFlush());
}
// wait for the flushes.
// TODO this is a godawful way to track progress, since they flush in parallel. a long one could
// thus make several short ones "instant" if we wait for them later.
for (Future f : flushes)
{
FBUtilities.waitOnFuture(f);
remainingCFs--;
}
// flush the system ones after all the rest are done, just in case flushing modifies any system state
// like CASSANDRA-5151. don't bother with progress tracking since system data is tiny.
flushes.clear();
for (Keyspace keyspace : Keyspace.system())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
flushes.add(cfs.forceFlush());
}
FBUtilities.waitOnFutures(flushes);
BatchlogManager.shutdown();
// whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
// there are no segments to replay, so we force the recycling of any remaining (should be at most one)
CommitLog.instance.forceRecycleAllSegments();
ColumnFamilyStore.shutdownPostFlushExecutor();
CommitLog.instance.shutdownBlocking();
// wait for miscellaneous tasks like sstable and commitlog segment deletion
ScheduledExecutors.nonPeriodicTasks.shutdown();
if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
setMode(Mode.DRAINED, true);
#endif
return make_ready_future<>();
}
} // namespace service

View File

@@ -2502,6 +2502,7 @@ public:
{
return String.format("Drained %s/%s ColumnFamilies", remainingCFs, totalCFs);
}
#endif
/**
* Shuts node off to writes, empties memtables and the commit log.
@@ -2509,80 +2510,9 @@ public:
* - Drain waits for in-progress streaming to complete
* - Drain flushes *all* columnfamilies (shutdown hook only flushes non-durable CFs)
*/
public synchronized void drain() throws IOException, InterruptedException, ExecutionException
{
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
{
logger.warn("Cannot drain node (did it already happen?)");
return;
}
setMode(Mode.DRAINING, "starting drain process", true);
shutdownClientServers();
ScheduledExecutors.optionalTasks.shutdown();
Gossiper.instance.stop();
setMode(Mode.DRAINING, "shutting down MessageService", false);
MessagingService.instance().shutdown();
setMode(Mode.DRAINING, "clearing mutation stage", false);
counterMutationStage.shutdown();
mutationStage.shutdown();
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
setMode(Mode.DRAINING, "flushing column families", false);
// count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
totalCFs = 0;
for (Keyspace keyspace : Keyspace.nonSystem())
totalCFs += keyspace.getColumnFamilyStores().size();
remainingCFs = totalCFs;
// flush
List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.nonSystem())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
flushes.add(cfs.forceFlush());
}
// wait for the flushes.
// TODO this is a godawful way to track progress, since they flush in parallel. a long one could
// thus make several short ones "instant" if we wait for them later.
for (Future f : flushes)
{
FBUtilities.waitOnFuture(f);
remainingCFs--;
}
// flush the system ones after all the rest are done, just in case flushing modifies any system state
// like CASSANDRA-5151. don't bother with progress tracking since system data is tiny.
flushes.clear();
for (Keyspace keyspace : Keyspace.system())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
flushes.add(cfs.forceFlush());
}
FBUtilities.waitOnFutures(flushes);
BatchlogManager.shutdown();
// whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
// there are no segments to replay, so we force the recycling of any remaining (should be at most one)
CommitLog.instance.forceRecycleAllSegments();
ColumnFamilyStore.shutdownPostFlushExecutor();
CommitLog.instance.shutdownBlocking();
// wait for miscellaneous tasks like sstable and commitlog segment deletion
ScheduledExecutors.nonPeriodicTasks.shutdown();
if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
setMode(Mode.DRAINED, true);
}
future<> drain();
#if 0
// Never ever do this at home. Used by tests.
IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
{