From c989911526ff5474e43ba12e37263b5baa5be393 Mon Sep 17 00:00:00 2001 From: mcilwain Date: Thu, 17 May 2018 15:22:34 -0700 Subject: [PATCH] Batch NORDN pull queue task deletions They were failing because the maximum App Engine task batch size is 1,000, and we currently have more than 4,000 tasks in the pull queue. We keep re-uploading those to NORDN because we're unable to delete the tasks after successful upload, so the leases expire and they get processed again. Also renames TaskEnqueuer to TaskQueueUtils to reflect its newly expanded role. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=197060903 --- .../backup/CommitLogCheckpointAction.java | 6 +-- .../registry/config/RegistryConfig.java | 3 +- .../registry/cron/CommitLogFanoutAction.java | 6 +-- .../google/registry/cron/TldFanoutAction.java | 6 +-- .../registry/dns/ReadDnsQueueAction.java | 6 +-- .../export/BigqueryPollJobAction.java | 17 +++--- .../registry/loadtest/LoadTestAction.java | 7 ++- .../registry/rde/RdeStagingReducer.java | 12 ++--- java/google/registry/rde/RdeUploadAction.java | 6 +-- java/google/registry/tmch/LordnTask.java | 6 +-- .../registry/tmch/NordnUploadAction.java | 4 +- ...{TaskEnqueuer.java => TaskQueueUtils.java} | 31 +++++++++-- .../backup/CommitLogCheckpointActionTest.java | 4 +- .../cron/CommitLogFanoutActionTest.java | 4 +- .../registry/cron/TldFanoutActionTest.java | 4 +- .../registry/dns/ReadDnsQueueActionTest.java | 4 +- .../export/BigqueryPollJobActionTest.java | 12 ++--- .../registry/rde/RdeStagingActionTest.java | 4 +- .../registry/rde/RdeUploadActionTest.java | 4 +- .../registry/tmch/NordnUploadActionTest.java | 4 ++ ...ueuerTest.java => TaskQueueUtilsTest.java} | 52 ++++++++++++++----- 21 files changed, 129 insertions(+), 73 deletions(-) rename java/google/registry/util/{TaskEnqueuer.java => TaskQueueUtils.java} (74%) rename javatests/google/registry/util/{TaskEnqueuerTest.java => TaskQueueUtilsTest.java} (70%) diff --git a/java/google/registry/backup/CommitLogCheckpointAction.java b/java/google/registry/backup/CommitLogCheckpointAction.java index c353e5c37..73fbfb70f 100644 --- a/java/google/registry/backup/CommitLogCheckpointAction.java +++ b/java/google/registry/backup/CommitLogCheckpointAction.java @@ -28,7 +28,7 @@ import google.registry.request.Action; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import javax.inject.Inject; import org.joda.time.DateTime; @@ -56,7 +56,7 @@ public final class CommitLogCheckpointAction implements Runnable { @Inject Clock clock; @Inject CommitLogCheckpointStrategy strategy; - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject CommitLogCheckpointAction() {} @Override @@ -76,7 +76,7 @@ public final class CommitLogCheckpointAction implements Runnable { .entities( checkpoint, CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime())); // Enqueue a diff task between previous and current checkpoints. - taskEnqueuer.enqueue( + taskQueueUtils.enqueue( getQueue(QUEUE_NAME), withUrl(ExportCommitLogDiffAction.PATH) .param(LOWER_CHECKPOINT_TIME_PARAM, lastWrittenTime.toString()) diff --git a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java index a3dc8355d..13684ac48 100644 --- a/java/google/registry/config/RegistryConfig.java +++ b/java/google/registry/config/RegistryConfig.java @@ -27,6 +27,7 @@ import com.google.common.net.HostAndPort; import dagger.Module; import dagger.Provides; import google.registry.config.RegistryConfigSettings.AppEngine.ToolsServiceUrl; +import google.registry.util.TaskQueueUtils; import java.lang.annotation.Documented; import java.lang.annotation.Retention; import java.net.URI; @@ -853,7 +854,7 @@ public final class RegistryConfig { *

Note that this uses {@code @Named} instead of {@code @Config} so that it can be used from * the low-level util package, which cannot have a dependency on the config package. * - * @see google.registry.util.TaskEnqueuer + * @see TaskQueueUtils */ @Provides @Named("transientFailureRetries") diff --git a/java/google/registry/cron/CommitLogFanoutAction.java b/java/google/registry/cron/CommitLogFanoutAction.java index 20188935b..6b18aa5f3 100644 --- a/java/google/registry/cron/CommitLogFanoutAction.java +++ b/java/google/registry/cron/CommitLogFanoutAction.java @@ -23,7 +23,7 @@ import google.registry.model.ofy.CommitLogBucket; import google.registry.request.Action; import google.registry.request.Parameter; import google.registry.request.auth.Auth; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.Optional; import java.util.Random; import javax.inject.Inject; @@ -40,7 +40,7 @@ public final class CommitLogFanoutAction implements Runnable { private static final Random random = new Random(); - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject @Parameter("endpoint") String endpoint; @Inject @Parameter("queue") String queue; @Inject @Parameter("jitterSeconds") Optional jitterSeconds; @@ -55,7 +55,7 @@ public final class CommitLogFanoutAction implements Runnable { .countdownMillis(jitterSeconds.isPresent() ? random.nextInt((int) SECONDS.toMillis(jitterSeconds.get())) : 0); - taskEnqueuer.enqueue(taskQueue, taskOptions); + taskQueueUtils.enqueue(taskQueue, taskOptions); } } } diff --git a/java/google/registry/cron/TldFanoutAction.java b/java/google/registry/cron/TldFanoutAction.java index c8aba6f16..c6ff7c5b8 100644 --- a/java/google/registry/cron/TldFanoutAction.java +++ b/java/google/registry/cron/TldFanoutAction.java @@ -43,7 +43,7 @@ import google.registry.request.RequestParameters; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.Optional; import java.util.Random; import java.util.stream.Stream; @@ -103,7 +103,7 @@ public final class TldFanoutAction implements Runnable { private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject Response response; @Inject @Parameter(ENDPOINT_PARAM) String endpoint; @Inject @Parameter(QUEUE_PARAM) String queue; @@ -144,7 +144,7 @@ public final class TldFanoutAction implements Runnable { } for (String tld : tlds) { TaskOptions taskOptions = createTaskOptions(tld, flowThruParams); - TaskHandle taskHandle = taskEnqueuer.enqueue(taskQueue, taskOptions); + TaskHandle taskHandle = taskQueueUtils.enqueue(taskQueue, taskOptions); outputPayload.append( String.format( "- Task: '%s', tld: '%s', endpoint: '%s'\n", diff --git a/java/google/registry/dns/ReadDnsQueueAction.java b/java/google/registry/dns/ReadDnsQueueAction.java index f1a96c1f9..1ea056e96 100644 --- a/java/google/registry/dns/ReadDnsQueueAction.java +++ b/java/google/registry/dns/ReadDnsQueueAction.java @@ -47,7 +47,7 @@ import google.registry.request.RequestParameters; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.UnsupportedEncodingException; import java.util.Collection; import java.util.Comparator; @@ -101,7 +101,7 @@ public final class ReadDnsQueueAction implements Runnable { @Inject Clock clock; @Inject DnsQueue dnsQueue; @Inject HashFunction hashFunction; - @Inject TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject ReadDnsQueueAction() {} /** Container for items we pull out of the DNS pull queue and process for fanout. */ @@ -374,7 +374,7 @@ public final class ReadDnsQueueAction implements Runnable { : PublishDnsUpdatesAction.PARAM_DOMAINS, refreshItem.name()); } - taskEnqueuer.enqueue(dnsPublishPushQueue, options); + taskQueueUtils.enqueue(dnsPublishPushQueue, options); } } } diff --git a/java/google/registry/export/BigqueryPollJobAction.java b/java/google/registry/export/BigqueryPollJobAction.java index 7c78daf5e..1174c91d2 100644 --- a/java/google/registry/export/BigqueryPollJobAction.java +++ b/java/google/registry/export/BigqueryPollJobAction.java @@ -33,7 +33,7 @@ import google.registry.request.HttpException.NotModifiedException; import google.registry.request.Payload; import google.registry.request.auth.Auth; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -64,7 +64,7 @@ public class BigqueryPollJobAction implements Runnable { static final Duration POLL_COUNTDOWN = Duration.standardSeconds(20); @Inject Bigquery bigquery; - @Inject TaskEnqueuer enqueuer; + @Inject TaskQueueUtils taskQueueUtils; @Inject @Header(CHAINED_TASK_QUEUE_HEADER) Lazy chainedQueueName; @Inject @Header(PROJECT_ID_HEADER) String projectId; @Inject @Header(JOB_ID_HEADER) String jobId; @@ -84,7 +84,7 @@ public class BigqueryPollJobAction implements Runnable { } catch (ClassNotFoundException | IOException e) { throw new BadRequestException("Cannot deserialize task from payload", e); } - String taskName = enqueuer.enqueue(getQueue(chainedQueueName.get()), task).getName(); + String taskName = taskQueueUtils.enqueue(getQueue(chainedQueueName.get()), task).getName(); logger.infofmt( "Added chained task %s for %s to queue %s: %s", taskName, @@ -127,16 +127,17 @@ public class BigqueryPollJobAction implements Runnable { /** Helper class to enqueue a bigquery poll job. */ public static class BigqueryPollJobEnqueuer { - private final TaskEnqueuer enqueuer; + private final TaskQueueUtils taskQueueUtils; @Inject - BigqueryPollJobEnqueuer(TaskEnqueuer enqueuer) { - this.enqueuer = enqueuer; + BigqueryPollJobEnqueuer(TaskQueueUtils taskQueueUtils) { + this.taskQueueUtils = taskQueueUtils; } /** Enqueue a task to poll for the success or failure of the referenced BigQuery job. */ public TaskHandle enqueuePollTask(JobReference jobRef) { - return enqueuer.enqueue(getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET)); + return taskQueueUtils.enqueue( + getQueue(QUEUE), createCommonPollTask(jobRef).method(Method.GET)); } /** @@ -148,7 +149,7 @@ public class BigqueryPollJobAction implements Runnable { // Serialize the chainedTask into a byte array to put in the task payload. ByteArrayOutputStream taskBytes = new ByteArrayOutputStream(); new ObjectOutputStream(taskBytes).writeObject(chainedTask); - return enqueuer.enqueue( + return taskQueueUtils.enqueue( getQueue(QUEUE), createCommonPollTask(jobRef) .method(Method.POST) diff --git a/java/google/registry/loadtest/LoadTestAction.java b/java/google/registry/loadtest/LoadTestAction.java index 1d245e644..4161ee76c 100644 --- a/java/google/registry/loadtest/LoadTestAction.java +++ b/java/google/registry/loadtest/LoadTestAction.java @@ -34,7 +34,7 @@ import google.registry.request.Parameter; import google.registry.request.auth.Auth; import google.registry.security.XsrfTokenManager; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -147,8 +147,7 @@ public class LoadTestAction implements Runnable { @Parameter("hostInfos") int hostInfosPerSecond; - @Inject - TaskEnqueuer taskEnqueuer; + @Inject TaskQueueUtils taskQueueUtils; private final String xmlContactCreateTmpl; private final String xmlContactCreateFail; @@ -344,7 +343,7 @@ public class LoadTestAction implements Runnable { List> chunks = partition(tasks, maxTasksPerAdd()); // Farm out tasks to multiple queues to work around queue qps quotas. for (int i = 0; i < chunks.size(); i++) { - taskEnqueuer.enqueue(getQueue("load" + (i % NUM_QUEUES)), chunks.get(i)); + taskQueueUtils.enqueue(getQueue("load" + (i % NUM_QUEUES)), chunks.get(i)); } } } diff --git a/java/google/registry/rde/RdeStagingReducer.java b/java/google/registry/rde/RdeStagingReducer.java index 10da93650..1a99bb962 100644 --- a/java/google/registry/rde/RdeStagingReducer.java +++ b/java/google/registry/rde/RdeStagingReducer.java @@ -44,7 +44,7 @@ import google.registry.request.RequestParameters; import google.registry.request.lock.LockHandler; import google.registry.tldconfig.idn.IdnTableEnum; import google.registry.util.FormattingLogger; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import google.registry.xjc.rdeheader.XjcRdeHeader; import google.registry.xjc.rdeheader.XjcRdeHeaderElement; import google.registry.xml.XmlException; @@ -70,7 +70,7 @@ public final class RdeStagingReducer extends Reducer tasks = queue.leaseTasks(LeaseOptions.Builder .withTag(tld) .leasePeriod(LEASE_PERIOD.getMillis(), TimeUnit.MILLISECONDS) - .countLimit(BATCH_SIZE)); + .countLimit(TaskQueueUtils.getBatchSize())); allTasks.addAll(tasks); if (tasks.isEmpty()) { return allTasks.build(); diff --git a/java/google/registry/tmch/NordnUploadAction.java b/java/google/registry/tmch/NordnUploadAction.java index db05991df..d1442cc95 100644 --- a/java/google/registry/tmch/NordnUploadAction.java +++ b/java/google/registry/tmch/NordnUploadAction.java @@ -41,6 +41,7 @@ import google.registry.request.RequestParameters; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.FormattingLogger; +import google.registry.util.TaskQueueUtils; import google.registry.util.UrlFetchException; import java.io.IOException; import java.net.URL; @@ -84,6 +85,7 @@ public final class NordnUploadAction implements Runnable { @Inject @Config("tmchMarksdbUrl") String tmchMarksdbUrl; @Inject @Parameter(LORDN_PHASE_PARAM) String phase; @Inject @Parameter(RequestParameters.PARAM_TLD) String tld; + @Inject TaskQueueUtils taskQueueUtils; @Inject NordnUploadAction() {} /** @@ -117,7 +119,7 @@ public final class NordnUploadAction implements Runnable { if (!tasks.isEmpty()) { String csvData = convertTasksToCsv(tasks, now, columns); uploadCsvToLordn(String.format("/LORDN/%s/%s", tld, phase), csvData); - queue.deleteTask(tasks); + taskQueueUtils.deleteTasks(queue, tasks); } } diff --git a/java/google/registry/util/TaskEnqueuer.java b/java/google/registry/util/TaskQueueUtils.java similarity index 74% rename from java/google/registry/util/TaskEnqueuer.java rename to java/google/registry/util/TaskQueueUtils.java index 47f5b2466..c70becfc7 100644 --- a/java/google/registry/util/TaskEnqueuer.java +++ b/java/google/registry/util/TaskQueueUtils.java @@ -1,4 +1,4 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// Copyright 2018 The Nomulus Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,12 +20,14 @@ import com.google.appengine.api.taskqueue.Queue; import com.google.appengine.api.taskqueue.TaskHandle; import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TransientFailureException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import java.io.Serializable; import java.util.List; import javax.inject.Inject; /** Utilities for dealing with App Engine task queues. */ -public class TaskEnqueuer implements Serializable { +public class TaskQueueUtils implements Serializable { private static final long serialVersionUID = 7893211200220508362L; @@ -34,10 +36,23 @@ public class TaskEnqueuer implements Serializable { private final Retrier retrier; @Inject - public TaskEnqueuer(Retrier retrier) { + public TaskQueueUtils(Retrier retrier) { this.retrier = retrier; } + @NonFinalForTesting + @VisibleForTesting + static int BATCH_SIZE = 1000; + + /** + * The batch size to use for App Engine task queue operations. + * + *

Note that 1,000 is currently the maximum allowable batch size in App Engine. + */ + public static int getBatchSize() { + return BATCH_SIZE; + } + /** * Adds a task to a App Engine task queue in a reliable manner. * @@ -73,4 +88,14 @@ public class TaskEnqueuer implements Serializable { }, TransientFailureException.class); } + + /** Deletes the specified tasks from the queue in batches, with retrying. */ + public void deleteTasks(Queue queue, List tasks) { + Lists.partition(tasks, BATCH_SIZE) + .stream() + .forEach( + batch -> + retrier.callWithRetry( + () -> queue.deleteTask(batch), TransientFailureException.class)); + } } diff --git a/javatests/google/registry/backup/CommitLogCheckpointActionTest.java b/javatests/google/registry/backup/CommitLogCheckpointActionTest.java index 736192ffb..ad7331b6c 100644 --- a/javatests/google/registry/backup/CommitLogCheckpointActionTest.java +++ b/javatests/google/registry/backup/CommitLogCheckpointActionTest.java @@ -31,7 +31,7 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import org.joda.time.DateTime; import org.junit.Before; import org.junit.Rule; @@ -60,7 +60,7 @@ public class CommitLogCheckpointActionTest { public void before() throws Exception { task.clock = new FakeClock(now); task.strategy = strategy; - task.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + task.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); when(strategy.computeCheckpoint()) .thenReturn( CommitLogCheckpoint.create( diff --git a/javatests/google/registry/cron/CommitLogFanoutActionTest.java b/javatests/google/registry/cron/CommitLogFanoutActionTest.java index 1fda93201..3c4d85fe1 100644 --- a/javatests/google/registry/cron/CommitLogFanoutActionTest.java +++ b/javatests/google/registry/cron/CommitLogFanoutActionTest.java @@ -22,7 +22,7 @@ import google.registry.model.ofy.CommitLogBucket; import google.registry.testing.AppEngineRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -54,7 +54,7 @@ public class CommitLogFanoutActionTest { @Test public void testSuccess() throws Exception { CommitLogFanoutAction action = new CommitLogFanoutAction(); - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.endpoint = ENDPOINT; action.queue = QUEUE; action.jitterSeconds = Optional.empty(); diff --git a/javatests/google/registry/cron/TldFanoutActionTest.java b/javatests/google/registry/cron/TldFanoutActionTest.java index 265456cc4..e747a95ec 100644 --- a/javatests/google/registry/cron/TldFanoutActionTest.java +++ b/javatests/google/registry/cron/TldFanoutActionTest.java @@ -35,7 +35,7 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeResponse; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.List; import java.util.Optional; import java.util.stream.Stream; @@ -84,7 +84,7 @@ public class TldFanoutActionTest { action.excludes = params.containsKey("exclude") ? ImmutableSet.copyOf(Splitter.on(',').split(params.get("exclude").get(0))) : ImmutableSet.of(); - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.response = response; action.runInEmpty = params.containsKey("runInEmpty"); action.forEachRealTld = params.containsKey("forEachRealTld"); diff --git a/javatests/google/registry/dns/ReadDnsQueueActionTest.java b/javatests/google/registry/dns/ReadDnsQueueActionTest.java index 7cbcd12db..44a6f4b0d 100644 --- a/javatests/google/registry/dns/ReadDnsQueueActionTest.java +++ b/javatests/google/registry/dns/ReadDnsQueueActionTest.java @@ -47,7 +47,7 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; @@ -122,7 +122,7 @@ public class ReadDnsQueueActionTest { action.dnsQueue = dnsQueue; action.dnsPublishPushQueue = QueueFactory.getQueue(DNS_PUBLISH_PUSH_QUEUE_NAME); action.hashFunction = Hashing.murmur3_32(); - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.jitterSeconds = Optional.empty(); // Advance the time a little, to ensure that leaseTasks() returns all tasks. clock.advanceBy(Duration.standardHours(1)); diff --git a/javatests/google/registry/export/BigqueryPollJobActionTest.java b/javatests/google/registry/export/BigqueryPollJobActionTest.java index 6d7cf01da..4fd643d3a 100644 --- a/javatests/google/registry/export/BigqueryPollJobActionTest.java +++ b/javatests/google/registry/export/BigqueryPollJobActionTest.java @@ -44,7 +44,7 @@ import google.registry.testing.TaskQueueHelper; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.util.CapturingLogHandler; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -71,8 +71,8 @@ public class BigqueryPollJobActionTest { private static final String PROJECT_ID = "project_id"; private static final String JOB_ID = "job_id"; private static final String CHAINED_QUEUE_NAME = UpdateSnapshotViewAction.QUEUE; - private static final TaskEnqueuer ENQUEUER = - new TaskEnqueuer(new Retrier(new FakeSleeper(new FakeClock()), 1)); + private static final TaskQueueUtils TASK_QUEUE_UTILS = + new TaskQueueUtils(new Retrier(new FakeSleeper(new FakeClock()), 1)); private final Bigquery bigquery = mock(Bigquery.class); private final Bigquery.Jobs bigqueryJobs = mock(Bigquery.Jobs.class); @@ -86,7 +86,7 @@ public class BigqueryPollJobActionTest { action.bigquery = bigquery; when(bigquery.jobs()).thenReturn(bigqueryJobs); when(bigqueryJobs.get(PROJECT_ID, JOB_ID)).thenReturn(bigqueryJobsGet); - action.enqueuer = ENQUEUER; + action.taskQueueUtils = TASK_QUEUE_UTILS; action.projectId = PROJECT_ID; action.jobId = JOB_ID; action.chainedQueueName = () -> CHAINED_QUEUE_NAME; @@ -103,7 +103,7 @@ public class BigqueryPollJobActionTest { @Test public void testSuccess_enqueuePollTask() throws Exception { - new BigqueryPollJobEnqueuer(ENQUEUER).enqueuePollTask( + new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask( new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID)); assertTasksEnqueued(BigqueryPollJobAction.QUEUE, newPollJobTaskMatcher("GET")); } @@ -115,7 +115,7 @@ public class BigqueryPollJobActionTest { .method(Method.POST) .header("X-Testing", "foo") .param("testing", "bar"); - new BigqueryPollJobEnqueuer(ENQUEUER).enqueuePollTask( + new BigqueryPollJobEnqueuer(TASK_QUEUE_UTILS).enqueuePollTask( new JobReference().setProjectId(PROJECT_ID).setJobId(JOB_ID), chainedTask, getQueue(CHAINED_QUEUE_NAME)); diff --git a/javatests/google/registry/rde/RdeStagingActionTest.java b/javatests/google/registry/rde/RdeStagingActionTest.java index b947eaf18..fe581ba94 100644 --- a/javatests/google/registry/rde/RdeStagingActionTest.java +++ b/javatests/google/registry/rde/RdeStagingActionTest.java @@ -64,7 +64,7 @@ import google.registry.testing.mapreduce.MapreduceTestCase; import google.registry.tldconfig.idn.IdnTableEnum; import google.registry.util.Retrier; import google.registry.util.SystemSleeper; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import google.registry.xjc.XjcXmlTransformer; import google.registry.xjc.rde.XjcRdeContentType; import google.registry.xjc.rde.XjcRdeDeposit; @@ -132,7 +132,7 @@ public class RdeStagingActionTest extends MapreduceTestCase { action.mrRunner = makeDefaultRunner(); action.lenient = false; action.reducer = new RdeStagingReducer( - new TaskEnqueuer(new Retrier(new SystemSleeper(), 1)), // taskEnqueuer + new TaskQueueUtils(new Retrier(new SystemSleeper(), 1)), // taskQueueUtils new FakeLockHandler(true), 0, // gcsBufferSize "rde-bucket", // bucket diff --git a/javatests/google/registry/rde/RdeUploadActionTest.java b/javatests/google/registry/rde/RdeUploadActionTest.java index d85ac7400..ab4c84d7e 100644 --- a/javatests/google/registry/rde/RdeUploadActionTest.java +++ b/javatests/google/registry/rde/RdeUploadActionTest.java @@ -72,7 +72,7 @@ import google.registry.testing.Lazies; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.sftp.SftpServerRule; import google.registry.util.Retrier; -import google.registry.util.TaskEnqueuer; +import google.registry.util.TaskQueueUtils; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -208,7 +208,7 @@ public class RdeUploadActionTest { action.stagingDecryptionKey = keyring.getRdeStagingDecryptionKey(); action.reportQueue = QueueFactory.getQueue("rde-report"); action.runner = runner; - action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); + action.taskQueueUtils = new TaskQueueUtils(new Retrier(null, 1)); action.retrier = new Retrier(new FakeSleeper(clock), 3); return action; } diff --git a/javatests/google/registry/tmch/NordnUploadActionTest.java b/javatests/google/registry/tmch/NordnUploadActionTest.java index 6124afdad..0a704686a 100644 --- a/javatests/google/registry/tmch/NordnUploadActionTest.java +++ b/javatests/google/registry/tmch/NordnUploadActionTest.java @@ -47,9 +47,12 @@ import google.registry.model.ofy.Ofy; import google.registry.model.registry.Registry; import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; +import google.registry.testing.FakeSleeper; import google.registry.testing.InjectRule; import google.registry.testing.MockitoJUnitRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; +import google.registry.util.Retrier; +import google.registry.util.TaskQueueUtils; import google.registry.util.UrlFetchException; import java.net.URL; import java.util.Optional; @@ -109,6 +112,7 @@ public class NordnUploadActionTest { action.fetchService = fetchService; action.lordnRequestInitializer = lordnRequestInitializer; action.phase = "claims"; + action.taskQueueUtils = new TaskQueueUtils(new Retrier(new FakeSleeper(clock), 3)); action.tld = "tld"; action.tmchMarksdbUrl = "http://127.0.0.1"; } diff --git a/javatests/google/registry/util/TaskEnqueuerTest.java b/javatests/google/registry/util/TaskQueueUtilsTest.java similarity index 70% rename from javatests/google/registry/util/TaskEnqueuerTest.java rename to javatests/google/registry/util/TaskQueueUtilsTest.java index 91bcc9cb9..68bbd7a16 100644 --- a/javatests/google/registry/util/TaskEnqueuerTest.java +++ b/javatests/google/registry/util/TaskQueueUtilsTest.java @@ -1,4 +1,4 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// Copyright 2018 The Nomulus Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,12 +17,14 @@ package google.registry.util; import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl; import static com.google.common.truth.Truth.assertThat; import static google.registry.testing.JUnitBackports.assertThrows; +import static google.registry.testing.TaskQueueHelper.getQueueInfo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.appengine.api.taskqueue.Queue; +import com.google.appengine.api.taskqueue.QueueFactory; import com.google.appengine.api.taskqueue.TaskHandle; import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TransientFailureException; @@ -31,33 +33,39 @@ import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; import google.registry.testing.FakeSleeper; import org.joda.time.DateTime; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Unit tests for {@link TaskEnqueuer}. */ +/** Unit tests for {@link TaskQueueUtils}. */ @RunWith(JUnit4.class) -public final class TaskEnqueuerTest { +public final class TaskQueueUtilsTest { private static final int MAX_RETRIES = 3; + @Rule - public final AppEngineRule appEngine = AppEngineRule.builder() - .withDatastore() - .build(); + public final AppEngineRule appEngine = + AppEngineRule.builder().withDatastore().withTaskQueue().build(); private final FakeClock clock = new FakeClock(DateTime.parse("2000-01-01TZ")); private final FakeSleeper sleeper = new FakeSleeper(clock); - private final TaskEnqueuer taskEnqueuer = - new TaskEnqueuer(new Retrier(sleeper, MAX_RETRIES)); + private final TaskQueueUtils taskQueueUtils = + new TaskQueueUtils(new Retrier(sleeper, MAX_RETRIES)); private final Queue queue = mock(Queue.class); private final TaskOptions task = withUrl("url").taskName("name"); private final TaskHandle handle = new TaskHandle(task, "handle"); + @Before + public void before() { + TaskQueueUtils.BATCH_SIZE = 2; + } + @Test public void testEnqueue_worksOnFirstTry_doesntSleep() throws Exception { when(queue.add(ImmutableList.of(task))).thenReturn(ImmutableList.of(handle)); - assertThat(taskEnqueuer.enqueue(queue, task)).isSameAs(handle); + assertThat(taskQueueUtils.enqueue(queue, task)).isSameAs(handle); verify(queue).add(ImmutableList.of(task)); assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01TZ")); } @@ -68,7 +76,7 @@ public final class TaskEnqueuerTest { .thenThrow(new TransientFailureException("")) .thenThrow(new TransientFailureException("")) .thenReturn(ImmutableList.of(handle)); - assertThat(taskEnqueuer.enqueue(queue, task)).isSameAs(handle); + assertThat(taskQueueUtils.enqueue(queue, task)).isSameAs(handle); verify(queue, times(3)).add(ImmutableList.of(task)); assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01T00:00:00.6Z")); // 200 + 400ms } @@ -80,7 +88,7 @@ public final class TaskEnqueuerTest { ImmutableList handles = ImmutableList.of(new TaskHandle(taskA, "a"), new TaskHandle(taskB, "b")); when(queue.add(ImmutableList.of(taskA, taskB))).thenReturn(handles); - assertThat(taskEnqueuer.enqueue(queue, ImmutableList.of(taskA, taskB))).isSameAs(handles); + assertThat(taskQueueUtils.enqueue(queue, ImmutableList.of(taskA, taskB))).isSameAs(handles); assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01TZ")); } @@ -92,7 +100,7 @@ public final class TaskEnqueuerTest { .thenThrow(new TransientFailureException("three")) .thenThrow(new TransientFailureException("four")); TransientFailureException thrown = - assertThrows(TransientFailureException.class, () -> taskEnqueuer.enqueue(queue, task)); + assertThrows(TransientFailureException.class, () -> taskQueueUtils.enqueue(queue, task)); assertThat(thrown).hasMessageThat().contains("three"); } @@ -101,9 +109,27 @@ public final class TaskEnqueuerTest { when(queue.add(ImmutableList.of(task))).thenThrow(new TransientFailureException("")); try { Thread.currentThread().interrupt(); - assertThrows(TransientFailureException.class, () -> taskEnqueuer.enqueue(queue, task)); + assertThrows(TransientFailureException.class, () -> taskQueueUtils.enqueue(queue, task)); } finally { Thread.interrupted(); // Clear interrupt state so it doesn't pwn other tests. } } + + @Test + public void testDeleteTasks_usesMultipleBatches() { + Queue defaultQ = QueueFactory.getQueue("default"); + TaskOptions taskOptA = withUrl("/a").taskName("a"); + TaskOptions taskOptB = withUrl("/b").taskName("b"); + TaskOptions taskOptC = withUrl("/c").taskName("c"); + taskQueueUtils.enqueue(defaultQ, ImmutableList.of(taskOptA, taskOptB, taskOptC)); + assertThat(getQueueInfo("default").getTaskInfo()).hasSize(3); + + taskQueueUtils.deleteTasks( + defaultQ, + ImmutableList.of( + new TaskHandle(taskOptA, "default"), + new TaskHandle(taskOptB, "default"), + new TaskHandle(taskOptC, "default"))); + assertThat(getQueueInfo("default").getTaskInfo()).hasSize(0); + } }