Add a storage_service::drain() placeholder.

The new C++ definition does no work yet: its only live statement returns an
already-resolved future. The original Java implementation is carried along
under "#if 0" as a porting reference.

diff --git a/service/storage_service.cc b/service/storage_service.cc
index a1ea371d6e..0c88820ad6 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1392,4 +1392,80 @@ future<> storage_service::remove_node(sstring host_id_string) {
     return make_ready_future<>();
 }
 
+future<> storage_service::drain() {
+#if 0
+    ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
+    ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
+    if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
+    {
+        logger.warn("Cannot drain node (did it already happen?)");
+        return;
+    }
+    setMode(Mode.DRAINING, "starting drain process", true);
+    shutdownClientServers();
+    ScheduledExecutors.optionalTasks.shutdown();
+    Gossiper.instance.stop();
+
+    setMode(Mode.DRAINING, "shutting down MessageService", false);
+    MessagingService.instance().shutdown();
+
+    setMode(Mode.DRAINING, "clearing mutation stage", false);
+    counterMutationStage.shutdown();
+    mutationStage.shutdown();
+    counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+    mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+
+    StorageProxy.instance.verifyNoHintsInProgress();
+
+    setMode(Mode.DRAINING, "flushing column families", false);
+    // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
+    totalCFs = 0;
+    for (Keyspace keyspace : Keyspace.nonSystem())
+        totalCFs += keyspace.getColumnFamilyStores().size();
+    remainingCFs = totalCFs;
+    // flush
+    List<Future<?>> flushes = new ArrayList<>();
+    for (Keyspace keyspace : Keyspace.nonSystem())
+    {
+        for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+            flushes.add(cfs.forceFlush());
+    }
+    // wait for the flushes.
+    // TODO this is a godawful way to track progress, since they flush in parallel. a long one could
+    // thus make several short ones "instant" if we wait for them later.
+    for (Future f : flushes)
+    {
+        FBUtilities.waitOnFuture(f);
+        remainingCFs--;
+    }
+    // flush the system ones after all the rest are done, just in case flushing modifies any system state
+    // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny.
+    flushes.clear();
+    for (Keyspace keyspace : Keyspace.system())
+    {
+        for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+            flushes.add(cfs.forceFlush());
+    }
+    FBUtilities.waitOnFutures(flushes);
+
+    BatchlogManager.shutdown();
+
+    // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
+    // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
+    CommitLog.instance.forceRecycleAllSegments();
+
+    ColumnFamilyStore.shutdownPostFlushExecutor();
+
+    CommitLog.instance.shutdownBlocking();
+
+    // wait for miscellaneous tasks like sstable and commitlog segment deletion
+    ScheduledExecutors.nonPeriodicTasks.shutdown();
+    if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
+        logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
+
+    setMode(Mode.DRAINED, true);
+#endif
+    return make_ready_future<>();
+}
+
 } // namespace service
diff --git a/service/storage_service.hh b/service/storage_service.hh
index e3d0f78a3a..a3fee531fd 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -2502,6 +2502,7 @@ public:
     {
         return String.format("Drained %s/%s ColumnFamilies", remainingCFs, totalCFs);
     }
+#endif
 
     /**
      * Shuts node off to writes, empties memtables and the commit log.
@@ -2509,80 +2510,9 @@ public:
      * - Drain waits for in-progress streaming to complete
      * - Drain flushes *all* columnfamilies (shutdown hook only flushes non-durable CFs)
      */
-    public synchronized void drain() throws IOException, InterruptedException, ExecutionException
-    {
-        ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
-        ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-        if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
-        {
-            logger.warn("Cannot drain node (did it already happen?)");
-            return;
-        }
-        setMode(Mode.DRAINING, "starting drain process", true);
-        shutdownClientServers();
-        ScheduledExecutors.optionalTasks.shutdown();
-        Gossiper.instance.stop();
-
-        setMode(Mode.DRAINING, "shutting down MessageService", false);
-        MessagingService.instance().shutdown();
-
-        setMode(Mode.DRAINING, "clearing mutation stage", false);
-        counterMutationStage.shutdown();
-        mutationStage.shutdown();
-        counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-        mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-
-        StorageProxy.instance.verifyNoHintsInProgress();
-
-        setMode(Mode.DRAINING, "flushing column families", false);
-        // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
-        totalCFs = 0;
-        for (Keyspace keyspace : Keyspace.nonSystem())
-            totalCFs += keyspace.getColumnFamilyStores().size();
-        remainingCFs = totalCFs;
-        // flush
-        List<Future<?>> flushes = new ArrayList<>();
-        for (Keyspace keyspace : Keyspace.nonSystem())
-        {
-            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-                flushes.add(cfs.forceFlush());
-        }
-        // wait for the flushes.
-        // TODO this is a godawful way to track progress, since they flush in parallel. a long one could
-        // thus make several short ones "instant" if we wait for them later.
-        for (Future f : flushes)
-        {
-            FBUtilities.waitOnFuture(f);
-            remainingCFs--;
-        }
-        // flush the system ones after all the rest are done, just in case flushing modifies any system state
-        // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny.
-        flushes.clear();
-        for (Keyspace keyspace : Keyspace.system())
-        {
-            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-                flushes.add(cfs.forceFlush());
-        }
-        FBUtilities.waitOnFutures(flushes);
-
-        BatchlogManager.shutdown();
-
-        // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
-        // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
-        CommitLog.instance.forceRecycleAllSegments();
-
-        ColumnFamilyStore.shutdownPostFlushExecutor();
-
-        CommitLog.instance.shutdownBlocking();
-
-        // wait for miscellaneous tasks like sstable and commitlog segment deletion
-        ScheduledExecutors.nonPeriodicTasks.shutdown();
-        if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
-            logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
-
-        setMode(Mode.DRAINED, true);
-    }
+    future<> drain();
+#if 0
 
     // Never ever do this at home. Used by tests.
     IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
     {