From ded40851d3a17b029704ef468a920ab330b17e3c Mon Sep 17 00:00:00 2001 From: mcilwain Date: Tue, 24 Jul 2018 14:03:24 -0700 Subject: [PATCH] Use locking on async mapreduces This ensures that only one will run at a time, which should help fix the clogged up mapreduces we've seen on sandbox. In order to do this, the UnlockerOutput is introduced. This unlocks the given Lock after all reducer shards have finished. Also increases the lease duration of the DNS refresh action from 20 to 240 minutes. 20 minutes isn't long enough; when there's a lot of domains and decent system load the mapreduce could take longer than that in the ordinary case. TESTED=Deployed to alpha and verified that more than one copy of the mapreduce wouldn't run simultaneously, and also that the lock is released when the mapreduce is finished. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=205887554 --- .../batch/DeleteContactsAndHostsAction.java | 87 +++++++++++----- .../batch/RefreshDnsOnHostRenameAction.java | 98 ++++++++++++++----- java/google/registry/mapreduce/BUILD | 1 + .../registry/mapreduce/UnlockerOutput.java | 65 ++++++++++++ .../model/domain/DesignatedContact.java | 4 +- .../registry/model/domain/DomainBase.java | 12 +-- java/google/registry/model/server/Lock.java | 16 +-- .../request/lock/LockHandlerImpl.java | 2 +- .../DeleteContactsAndHostsActionTest.java | 83 ++++++++++++++-- .../RefreshDnsOnHostRenameActionTest.java | 68 ++++++++++++- .../registry/model/server/LockTest.java | 13 +-- 11 files changed, 360 insertions(+), 89 deletions(-) create mode 100644 java/google/registry/mapreduce/UnlockerOutput.java diff --git a/java/google/registry/batch/DeleteContactsAndHostsAction.java b/java/google/registry/batch/DeleteContactsAndHostsAction.java index f5f532698..2bbd1bb88 100644 --- a/java/google/registry/batch/DeleteContactsAndHostsAction.java +++ b/java/google/registry/batch/DeleteContactsAndHostsAction.java @@ -42,7 +42,10 @@ import static google.registry.model.reporting.HistoryEntry.Type.HOST_DELETE_FAIL import static google.registry.util.PipelineUtils.createJobPath; import static java.math.RoundingMode.CEILING; import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.SEVERE; +import static org.joda.time.Duration.standardHours; import com.google.appengine.api.taskqueue.LeaseOptions; import com.google.appengine.api.taskqueue.Queue; @@ -65,6 +68,7 @@ import google.registry.flows.async.AsyncFlowMetrics; import google.registry.flows.async.AsyncFlowMetrics.OperationResult; import google.registry.flows.async.AsyncFlowMetrics.OperationType; import google.registry.mapreduce.MapreduceRunner; +import google.registry.mapreduce.UnlockerOutput; import google.registry.mapreduce.inputs.EppResourceInputs; import google.registry.mapreduce.inputs.NullInput; import google.registry.model.EppResource; @@ -80,21 +84,26 @@ import google.registry.model.poll.PendingActionNotificationResponse.ContactPendi import google.registry.model.poll.PendingActionNotificationResponse.HostPendingActionNotificationResponse; import google.registry.model.poll.PollMessage; import google.registry.model.reporting.HistoryEntry; +import google.registry.model.server.Lock; import google.registry.model.transfer.TransferStatus; import google.registry.request.Action; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.NonFinalForTesting; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.SystemClock; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.logging.Level; import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; import org.joda.time.DateTime; +import org.joda.time.Duration; /** * A mapreduce that processes batch asynchronous deletions of contact and host resources by mapping @@ -110,7 +119,7 @@ public class DeleteContactsAndHostsAction implements Runnable { static final String KIND_CONTACT = getKind(ContactResource.class); static final String KIND_HOST = getKind(HostResource.class); - private static final long LEASE_MINUTES = 120; + private static final Duration LEASE_LENGTH = standardHours(4); private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final int MAX_REDUCE_SHARDS = 50; private static final int DELETES_PER_SHARD = 5; @@ -119,20 +128,40 @@ public class DeleteContactsAndHostsAction implements Runnable { @Inject Clock clock; @Inject MapreduceRunner mrRunner; @Inject @Named(QUEUE_ASYNC_DELETE) Queue queue; + @Inject RequestStatusChecker requestStatusChecker; @Inject Response response; @Inject Retrier retrier; @Inject DeleteContactsAndHostsAction() {} @Override public void run() { - LeaseOptions options = - LeaseOptions.Builder.withCountLimit(maxLeaseCount()).leasePeriod(LEASE_MINUTES, MINUTES); - List tasks = queue.leaseTasks(options); - asyncFlowMetrics.recordContactHostDeletionBatchSize(tasks.size()); - if (tasks.isEmpty()) { - response.setPayload("No contact/host deletion tasks in pull queue."); + // Check if the lock can be acquired, and if not, a previous run of this mapreduce is still + // executing, so return early. + Optional lock = + Lock.acquire( + DeleteContactsAndHostsAction.class.getSimpleName(), + null, + LEASE_LENGTH, + requestStatusChecker, + false); + if (!lock.isPresent()) { + logRespondAndUnlock(INFO, "Can't acquire lock; aborting.", lock); return; } + + // Lease the async tasks to process. + LeaseOptions options = + LeaseOptions.Builder.withCountLimit(maxLeaseCount()) + .leasePeriod(LEASE_LENGTH.getStandardSeconds(), SECONDS); + List tasks = queue.leaseTasks(options); + asyncFlowMetrics.recordContactHostDeletionBatchSize(tasks.size()); + + // Check if there are no tasks to process, and if so, return early. + if (tasks.isEmpty()) { + logRespondAndUnlock(INFO, "No contact/host deletion tasks in pull queue; finishing.", lock); + return; + } + Multiset kindCounts = HashMultiset.create(2); ImmutableList.Builder builder = new ImmutableList.Builder<>(); ImmutableList.Builder> resourceKeys = new ImmutableList.Builder<>(); @@ -158,13 +187,13 @@ public class DeleteContactsAndHostsAction implements Runnable { deleteStaleTasksWithRetry(requestsToDelete); ImmutableList deletionRequests = builder.build(); if (deletionRequests.isEmpty()) { - logger.atInfo().log("No asynchronous deletions to process because all were already handled."); - response.setPayload("All requested deletions of contacts/hosts have already occurred."); + logRespondAndUnlock( + INFO, "No async deletions to process because all were already handled.", lock); } else { logger.atInfo().log( "Processing asynchronous deletion of %d contacts and %d hosts: %s", kindCounts.count(KIND_CONTACT), kindCounts.count(KIND_HOST), resourceKeys.build()); - runMapreduce(deletionRequests); + runMapreduce(deletionRequests, lock); } } @@ -187,27 +216,35 @@ public class DeleteContactsAndHostsAction implements Runnable { deletionRequest.requestedTime())); } - private void runMapreduce(ImmutableList deletionRequests) { + private void runMapreduce(ImmutableList deletionRequests, Optional lock) { try { int numReducers = Math.min(MAX_REDUCE_SHARDS, divide(deletionRequests.size(), DELETES_PER_SHARD, CEILING)); - response.sendJavaScriptRedirect(createJobPath(mrRunner - .setJobName("Check for EPP resource references and then delete") - .setModuleName("backend") - .setDefaultReduceShards(numReducers) - .runMapreduce( - new DeleteContactsAndHostsMapper(deletionRequests), - new DeleteEppResourceReducer(), - ImmutableList.of( - // Add an extra shard that maps over a null domain. See the mapper code for why. - new NullInput<>(), - EppResourceInputs.createEntityInput(DomainBase.class))))); + response.sendJavaScriptRedirect( + createJobPath( + mrRunner + .setJobName("Check for EPP resource references and then delete") + .setModuleName("backend") + .setDefaultReduceShards(numReducers) + .runMapreduce( + new DeleteContactsAndHostsMapper(deletionRequests), + new DeleteEppResourceReducer(), + ImmutableList.of( + // Add an extra shard that maps over a null domain. See the mapper code + // for why. + new NullInput<>(), EppResourceInputs.createEntityInput(DomainBase.class)), + new UnlockerOutput(lock.get())))); } catch (Throwable t) { - logger.atSevere().withCause(t).log( - "Error while kicking off mapreduce to delete contacts/hosts"); + logRespondAndUnlock(SEVERE, "Error starting mapreduce to delete contacts/hosts.", lock); } } + private void logRespondAndUnlock(Level level, String message, Optional lock) { + logger.at(level).log(message); + response.setPayload(message); + lock.ifPresent(Lock::release); + } + /** * A mapper that iterates over all {@link DomainBase} entities. * diff --git a/java/google/registry/batch/RefreshDnsOnHostRenameAction.java b/java/google/registry/batch/RefreshDnsOnHostRenameAction.java index ffcbf0b6c..70a3b9998 100644 --- a/java/google/registry/batch/RefreshDnsOnHostRenameAction.java +++ b/java/google/registry/batch/RefreshDnsOnHostRenameAction.java @@ -29,7 +29,10 @@ import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.util.DateTimeUtils.latestOf; import static google.registry.util.PipelineUtils.createJobPath; import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.SEVERE; +import static org.joda.time.Duration.standardHours; import com.google.appengine.api.taskqueue.LeaseOptions; import com.google.appengine.api.taskqueue.Queue; @@ -50,20 +53,25 @@ import google.registry.mapreduce.MapreduceRunner; import google.registry.mapreduce.inputs.NullInput; import google.registry.model.domain.DomainResource; import google.registry.model.host.HostResource; +import google.registry.model.server.Lock; import google.registry.request.Action; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.NonFinalForTesting; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.SystemClock; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.logging.Level; import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; import org.joda.time.DateTime; +import org.joda.time.Duration; /** Performs batched DNS refreshes for applicable domains following a host rename. */ @Action( @@ -73,26 +81,48 @@ import org.joda.time.DateTime; public class RefreshDnsOnHostRenameAction implements Runnable { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - private static final long LEASE_MINUTES = 20; + private static final Duration LEASE_LENGTH = standardHours(4); @Inject AsyncFlowMetrics asyncFlowMetrics; @Inject Clock clock; @Inject MapreduceRunner mrRunner; @Inject @Named(QUEUE_ASYNC_HOST_RENAME) Queue pullQueue; + @Inject RequestStatusChecker requestStatusChecker; @Inject Response response; @Inject Retrier retrier; @Inject RefreshDnsOnHostRenameAction() {} @Override public void run() { - LeaseOptions options = - LeaseOptions.Builder.withCountLimit(maxLeaseCount()).leasePeriod(LEASE_MINUTES, MINUTES); - List tasks = pullQueue.leaseTasks(options); - asyncFlowMetrics.recordDnsRefreshBatchSize(tasks.size()); - if (tasks.isEmpty()) { - response.setPayload("No DNS refresh on host rename tasks to process in pull queue."); + // Check if the lock can be acquired, and if not, a previous run of this mapreduce is still + // executing, so return early. + Optional lock = + Lock.acquire( + RefreshDnsOnHostRenameAction.class.getSimpleName(), + null, + LEASE_LENGTH, + requestStatusChecker, + false); + + if (!lock.isPresent()) { + logRespondAndUnlock(INFO, "Can't acquire lock; aborting.", lock); return; } + + // Lease the async tasks to process. + LeaseOptions options = + LeaseOptions.Builder.withCountLimit(maxLeaseCount()) + .leasePeriod(LEASE_LENGTH.getStandardSeconds(), SECONDS); + List tasks = pullQueue.leaseTasks(options); + asyncFlowMetrics.recordDnsRefreshBatchSize(tasks.size()); + + // Check if there are no tasks to process, and if so, return early. + if (tasks.isEmpty()) { + logRespondAndUnlock( + INFO, "No DNS refresh on host rename tasks to process in pull queue; finishing.", lock); + return; + } + ImmutableList.Builder requestsBuilder = new ImmutableList.Builder<>(); ImmutableList.Builder> hostKeys = new ImmutableList.Builder<>(); final List requestsToDelete = new ArrayList<>(); @@ -119,34 +149,41 @@ public class RefreshDnsOnHostRenameAction implements Runnable { requestsToDelete, pullQueue, asyncFlowMetrics, retrier, OperationResult.STALE); ImmutableList refreshRequests = requestsBuilder.build(); if (refreshRequests.isEmpty()) { - logger.atInfo().log( - "No asynchronous DNS refreshes to process because all renamed hosts are deleted."); - response.setPayload("All requested DNS refreshes are on hosts that were since deleted."); + logRespondAndUnlock( + INFO, "No async DNS refreshes to process because all renamed hosts are deleted.", lock); } else { logger.atInfo().log( "Processing asynchronous DNS refresh for renamed hosts: %s", hostKeys.build()); - runMapreduce(refreshRequests); + runMapreduce(refreshRequests, lock); } } - private void runMapreduce(ImmutableList refreshRequests) { + private void runMapreduce(ImmutableList refreshRequests, Optional lock) { try { - response.sendJavaScriptRedirect(createJobPath(mrRunner - .setJobName("Enqueue DNS refreshes for domains referencing renamed hosts") - .setModuleName("backend") - .setDefaultReduceShards(1) - .runMapreduce( - new RefreshDnsOnHostRenameMapper(refreshRequests, retrier), - new RefreshDnsOnHostRenameReducer(refreshRequests, retrier), - // Add an extra NullInput so that the reducer always fires exactly once. - ImmutableList.of( - new NullInput<>(), createEntityInput(DomainResource.class))))); + response.sendJavaScriptRedirect( + createJobPath( + mrRunner + .setJobName("Enqueue DNS refreshes for domains referencing renamed hosts") + .setModuleName("backend") + .setDefaultReduceShards(1) + .runMapreduce( + new RefreshDnsOnHostRenameMapper(refreshRequests, retrier), + new RefreshDnsOnHostRenameReducer(refreshRequests, lock.get(), retrier), + // Add an extra NullInput so that the reducer always fires exactly once. + ImmutableList.of( + new NullInput<>(), createEntityInput(DomainResource.class))))); } catch (Throwable t) { - logger.atSevere().withCause(t).log( - "Error while kicking off mapreduce to refresh DNS for renamed hosts."); + logRespondAndUnlock( + SEVERE, "Error starting mapreduce to refresh DNS for renamed hosts.", lock); } } + private void logRespondAndUnlock(Level level, String message, Optional lock) { + logger.at(level).log(message); + response.setPayload(message); + lock.ifPresent(Lock::release); + } + /** Map over domains and refresh the DNS of those that reference the renamed hosts. */ public static class RefreshDnsOnHostRenameMapper extends Mapper { @@ -205,27 +242,34 @@ public class RefreshDnsOnHostRenameAction implements Runnable { */ public static class RefreshDnsOnHostRenameReducer extends Reducer { - private static final long serialVersionUID = -2850944843275790412L; + private static final long serialVersionUID = 9077366205249562118L; @NonFinalForTesting private static AsyncFlowMetrics asyncFlowMetrics = new AsyncFlowMetrics(new SystemClock()); + private final Lock lock; private final Retrier retrier; private final List refreshRequests; - RefreshDnsOnHostRenameReducer(List refreshRequests, Retrier retrier) { + RefreshDnsOnHostRenameReducer( + List refreshRequests, Lock lock, Retrier retrier) { this.refreshRequests = refreshRequests; + this.lock = lock; this.retrier = retrier; } @Override public void reduce(Boolean key, ReducerInput values) { + // The reduce() method is run precisely once, because the NullInput caused the mapper to emit + // a dummy value once. deleteTasksWithRetry( refreshRequests, getQueue(QUEUE_ASYNC_HOST_RENAME), asyncFlowMetrics, retrier, OperationResult.SUCCESS); + + lock.release(); } } diff --git a/java/google/registry/mapreduce/BUILD b/java/google/registry/mapreduce/BUILD index fba909685..591edeb4f 100644 --- a/java/google/registry/mapreduce/BUILD +++ b/java/google/registry/mapreduce/BUILD @@ -9,6 +9,7 @@ java_library( srcs = glob(["*.java"]), deps = [ "//java/google/registry/mapreduce/inputs", + "//java/google/registry/model", "//java/google/registry/request", "//java/google/registry/util", "//third_party/objectify:objectify-v4_1", diff --git a/java/google/registry/mapreduce/UnlockerOutput.java b/java/google/registry/mapreduce/UnlockerOutput.java new file mode 100644 index 000000000..c37d8e51d --- /dev/null +++ b/java/google/registry/mapreduce/UnlockerOutput.java @@ -0,0 +1,65 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.mapreduce; + +import static com.google.common.collect.ImmutableList.toImmutableList; + +import com.google.appengine.tools.mapreduce.Output; +import com.google.appengine.tools.mapreduce.OutputWriter; +import com.google.common.flogger.FluentLogger; +import google.registry.model.server.Lock; +import java.util.Collection; +import java.util.List; +import java.util.stream.Stream; + +/** An App Engine MapReduce "Output" that releases the given {@link Lock}. */ +public class UnlockerOutput extends Output { + + private static final long serialVersionUID = 2884979908715512998L; + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final Lock lock; + + public UnlockerOutput(Lock lock) { + this.lock = lock; + } + + private static class NoopWriter extends OutputWriter { + + private static final long serialVersionUID = -8327197554987150393L; + + @Override + public void write(O object) { + // Noop + } + + @Override + public boolean allowSliceRetry() { + return true; + } + } + + @Override + public List> createWriters(int numShards) { + return Stream.generate(NoopWriter::new).limit(numShards).collect(toImmutableList()); + } + + @Override + public Lock finish(Collection> writers) { + logger.atInfo().log("Mapreduce finished; releasing lock: %s", lock); + lock.release(); + return lock; + } +} diff --git a/java/google/registry/model/domain/DesignatedContact.java b/java/google/registry/model/domain/DesignatedContact.java index 62f13e1e0..b95e42401 100644 --- a/java/google/registry/model/domain/DesignatedContact.java +++ b/java/google/registry/model/domain/DesignatedContact.java @@ -14,6 +14,8 @@ package google.registry.model.domain; +import static google.registry.util.PreconditionsUtils.checkArgumentNotNull; + import com.google.common.annotations.VisibleForTesting; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.Embed; @@ -59,7 +61,7 @@ public class DesignatedContact extends ImmutableObject { public static DesignatedContact create(Type type, Key contact) { DesignatedContact instance = new DesignatedContact(); instance.type = type; - instance.contact = contact; + instance.contact = checkArgumentNotNull(contact, "Must specify contact key"); return instance; } diff --git a/java/google/registry/model/domain/DomainBase.java b/java/google/registry/model/domain/DomainBase.java index 637855ea1..022a42cd5 100644 --- a/java/google/registry/model/domain/DomainBase.java +++ b/java/google/registry/model/domain/DomainBase.java @@ -45,6 +45,7 @@ import google.registry.model.domain.DesignatedContact.Type; import google.registry.model.domain.launch.LaunchNotice; import google.registry.model.domain.secdns.DelegationSignerData; import google.registry.model.host.HostResource; +import java.util.Objects; import java.util.Set; import java.util.function.Predicate; @@ -165,12 +166,11 @@ public abstract class DomainBase extends EppResource { /** Returns all referenced contacts from this domain or application. */ public ImmutableSet> getReferencedContacts() { - ImmutableSet.Builder> contactsBuilder = - new ImmutableSet.Builder<>(); - for (DesignatedContact designated : nullToEmptyImmutableCopy(allContacts)) { - contactsBuilder.add(designated.getContactKey()); - } - return contactsBuilder.build(); + return nullToEmptyImmutableCopy(allContacts) + .stream() + .map(DesignatedContact::getContactKey) + .filter(Objects::nonNull) + .collect(toImmutableSet()); } public String getTld() { diff --git a/java/google/registry/model/server/Lock.java b/java/google/registry/model/server/Lock.java index b39e2d31c..b897c76e0 100644 --- a/java/google/registry/model/server/Lock.java +++ b/java/google/registry/model/server/Lock.java @@ -29,6 +29,7 @@ import google.registry.model.annotations.NotBackedUp; import google.registry.model.annotations.NotBackedUp.Reason; import google.registry.util.RequestStatusChecker; import google.registry.util.RequestStatusCheckerImpl; +import java.io.Serializable; import java.util.Optional; import javax.annotation.Nullable; import org.joda.time.DateTime; @@ -45,8 +46,9 @@ import org.joda.time.Duration; */ @Entity @NotBackedUp(reason = Reason.TRANSIENT) -public class Lock extends ImmutableObject { +public class Lock extends ImmutableObject implements Serializable { + private static final long serialVersionUID = 756397280691684645L; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); /** Disposition of locking, for monitoring. */ @@ -164,10 +166,11 @@ public class Lock extends ImmutableObject { /** Try to acquire a lock. Returns absent if it can't be acquired. */ public static Optional acquire( - final String resourceName, - @Nullable final String tld, - final Duration leaseLength, - final RequestStatusChecker requestStatusChecker) { + String resourceName, + @Nullable String tld, + Duration leaseLength, + RequestStatusChecker requestStatusChecker, + boolean checkThreadRunning) { String lockId = makeLockId(resourceName, tld); // It's important to use transactNew rather than transact, because a Lock can be used to control // access to resources like GCS that can't be transactionally rolled back. Therefore, the lock @@ -189,7 +192,8 @@ public class Lock extends ImmutableObject { lockState = LockState.FREE; } else if (isAtOrAfter(now, lock.expirationTime)) { lockState = LockState.TIMED_OUT; - } else if (!requestStatusChecker.isRunning(lock.requestLogId)) { + } else if (checkThreadRunning + && !requestStatusChecker.isRunning(lock.requestLogId)) { lockState = LockState.OWNER_DIED; } else { lockState = LockState.IN_USE; diff --git a/java/google/registry/request/lock/LockHandlerImpl.java b/java/google/registry/request/lock/LockHandlerImpl.java index 197c7e546..2d44dd152 100644 --- a/java/google/registry/request/lock/LockHandlerImpl.java +++ b/java/google/registry/request/lock/LockHandlerImpl.java @@ -105,7 +105,7 @@ public class LockHandlerImpl implements LockHandler { /** Allows injection of mock Lock in tests. */ @VisibleForTesting Optional acquire(String lockName, @Nullable String tld, Duration leaseLength) { - return Lock.acquire(lockName, tld, leaseLength, requestStatusChecker); + return Lock.acquire(lockName, tld, leaseLength, requestStatusChecker, true); } /** A {@link Callable} that acquires and releases a lock around a delegate {@link Callable}. */ diff --git a/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java b/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java index 5d1993227..9019d4288 100644 --- a/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java +++ b/javatests/google/registry/batch/DeleteContactsAndHostsActionTest.java @@ -52,11 +52,14 @@ import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static google.registry.util.DateTimeUtils.END_OF_TIME; import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.Duration.millis; +import static org.joda.time.Duration.standardDays; import static org.joda.time.Duration.standardHours; import static org.joda.time.Duration.standardSeconds; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.appengine.api.modules.ModulesService; import com.google.appengine.api.taskqueue.TaskOptions; @@ -88,6 +91,7 @@ import google.registry.model.poll.PollMessage; import google.registry.model.poll.PollMessage.OneTime; import google.registry.model.registry.Registry; import google.registry.model.reporting.HistoryEntry; +import google.registry.model.server.Lock; import google.registry.model.transfer.TransferData; import google.registry.model.transfer.TransferResponse; import google.registry.model.transfer.TransferStatus; @@ -95,8 +99,10 @@ import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; import google.registry.testing.FakeSleeper; import google.registry.testing.InjectRule; +import google.registry.testing.MockitoJUnitRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.mapreduce.MapreduceTestCase; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.Sleeper; import google.registry.util.SystemSleeper; @@ -108,17 +114,20 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; /** Unit tests for {@link DeleteContactsAndHostsAction}. */ @RunWith(JUnit4.class) public class DeleteContactsAndHostsActionTest extends MapreduceTestCase { - @Rule - public final InjectRule inject = new InjectRule(); + @Rule public final InjectRule inject = new InjectRule(); + @Rule public final MockitoJUnitRule mocks = MockitoJUnitRule.create(); - AsyncFlowEnqueuer enqueuer; - FakeClock clock = new FakeClock(DateTime.parse("2015-01-15T11:22:33Z")); + private AsyncFlowEnqueuer enqueuer; + private final FakeClock clock = new FakeClock(DateTime.parse("2015-01-15T11:22:33Z")); + private final FakeResponse fakeResponse = new FakeResponse(); + @Mock private RequestStatusChecker requestStatusChecker; private void runMapreduce() throws Exception { clock.advanceBy(standardSeconds(5)); @@ -134,8 +143,17 @@ public class DeleteContactsAndHostsActionTest ofy().clearSessionCache(); } + /** Kicks off, but does not run, the mapreduce tasks. Useful for testing validation/setup. */ + private void enqueueMapreduceOnly() { + clock.advanceBy(standardSeconds(5)); + action.run(); + clock.advanceBy(standardSeconds(5)); + ofy().clearSessionCache(); + } + @Before public void setup() { + inject.setStaticField(Ofy.class, "clock", clock); enqueuer = new AsyncFlowEnqueuer( getQueue(QUEUE_ASYNC_ACTIONS), @@ -150,10 +168,13 @@ public class DeleteContactsAndHostsActionTest inject.setStaticField(DeleteEppResourceReducer.class, "asyncFlowMetrics", asyncFlowMetricsMock); action.clock = clock; action.mrRunner = makeDefaultRunner(); - action.response = new FakeResponse(); + action.requestStatusChecker = requestStatusChecker; + action.response = fakeResponse; action.retrier = new Retrier(new FakeSleeper(clock), 1); action.queue = getQueue(QUEUE_ASYNC_DELETE); - inject.setStaticField(Ofy.class, "clock", clock); + when(requestStatusChecker.getLogId()).thenReturn("requestId"); + when(requestStatusChecker.isRunning(anyString())) + .thenThrow(new AssertionError("Should not be called")); createTld("tld"); clock.advanceOneMilli(); @@ -207,6 +228,35 @@ public class DeleteContactsAndHostsActionTest runSuccessfulContactDeletionTest(Optional.empty()); } + @Test + public void test_cannotAcquireLock() { + // Make lock acquisition fail. + acquireLock(); + enqueueMapreduceOnly(); + assertThat(fakeResponse.getPayload()).isEqualTo("Can't acquire lock; aborting."); + } + + @Test + public void test_mapreduceHasWorkToDo_lockIsAcquired() { + ContactResource contact = persistContactPendingDelete("blah8221"); + persistResource(newDomainResource("example.tld", contact)); + DateTime timeEnqueued = clock.nowUtc(); + enqueuer.enqueueAsyncDelete( + contact, + timeEnqueued, + "TheRegistrar", + Trid.create("fakeClientTrid", "fakeServerTrid"), + false); + enqueueMapreduceOnly(); + assertThat(acquireLock()).isEmpty(); + } + + @Test + public void test_noTasksToLease_releasesLockImmediately() { + enqueueMapreduceOnly(); + // If the Lock was correctly released, then we can acquire it now. + assertThat(acquireLock()).isPresent(); + } private void runSuccessfulContactDeletionTest(Optional clientTrid) throws Exception { ContactResource contact = persistContactWithPii("jim919"); @@ -454,7 +504,7 @@ public class DeleteContactsAndHostsActionTest "TheRegistrar", Trid.create("fakeClientTrid", "fakeServerTrid"), false); - runMapreduce(); + enqueueMapreduceOnly(); assertTasksEnqueued( QUEUE_ASYNC_DELETE, new TaskMatcher() @@ -473,6 +523,7 @@ public class DeleteContactsAndHostsActionTest .param("serverTransactionId", "fakeServerTrid") .param("isSuperuser", "false") .param("requestedTime", timeBeforeRun.toString())); + assertThat(acquireLock()).isPresent(); } @Test @@ -480,7 +531,7 @@ public class DeleteContactsAndHostsActionTest TaskOptions task = TaskOptions.Builder.withMethod(Method.PULL).param("gobbledygook", "kljhadfgsd9f7gsdfh"); getQueue(QUEUE_ASYNC_DELETE).add(task); - runMapreduce(); + enqueueMapreduceOnly(); assertTasksEnqueued( QUEUE_ASYNC_DELETE, new TaskMatcher() @@ -488,6 +539,7 @@ public class DeleteContactsAndHostsActionTest .etaDelta(standardHours(23), standardHours(25))); verify(action.asyncFlowMetrics).recordContactHostDeletionBatchSize(1L); verifyNoMoreInteractions(action.asyncFlowMetrics); + assertThat(acquireLock()).isPresent(); } @Test @@ -507,7 +559,7 @@ public class DeleteContactsAndHostsActionTest "TheRegistrar", Trid.create("fakeClientTrid", "fakeServerTrid"), false); - runMapreduce(); + enqueueMapreduceOnly(); assertThat(loadByForeignKey(ContactResource.class, "blah2222", clock.nowUtc())) .isEqualTo(contact); assertThat(loadByForeignKey(HostResource.class, "rustles.your.jimmies", clock.nowUtc())) @@ -519,6 +571,7 @@ public class DeleteContactsAndHostsActionTest verify(action.asyncFlowMetrics) .recordAsyncFlowResult(OperationType.HOST_DELETE, STALE, timeEnqueued); verifyNoMoreInteractions(action.asyncFlowMetrics); + assertThat(acquireLock()).isPresent(); } @Test @@ -537,10 +590,11 @@ public class DeleteContactsAndHostsActionTest "TheRegistrar", Trid.create("fakeClientTrid", "fakeServerTrid"), false); - runMapreduce(); + enqueueMapreduceOnly(); assertThat(ofy().load().entity(contactDeleted).now()).isEqualTo(contactDeleted); assertThat(ofy().load().entity(hostDeleted).now()).isEqualTo(hostDeleted); assertNoTasksEnqueued(QUEUE_ASYNC_DELETE); + assertThat(acquireLock()).isPresent(); } @Test @@ -894,4 +948,13 @@ public class DeleteContactsAndHostsActionTest .setNameservers(ImmutableSet.of(Key.create(host))) .build()); } + + private Optional acquireLock() { + return Lock.acquire( + DeleteContactsAndHostsAction.class.getSimpleName(), + null, + standardDays(30), + requestStatusChecker, + false); + } } diff --git a/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java b/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java index 11a8b60f6..f8170ec7b 100644 --- a/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java +++ b/javatests/google/registry/batch/RefreshDnsOnHostRenameActionTest.java @@ -15,6 +15,8 @@ package google.registry.batch; import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth8.assertThat; import static google.registry.flows.async.AsyncFlowEnqueuer.QUEUE_ASYNC_ACTIONS; import static google.registry.flows.async.AsyncFlowEnqueuer.QUEUE_ASYNC_DELETE; import static google.registry.flows.async.AsyncFlowEnqueuer.QUEUE_ASYNC_HOST_RENAME; @@ -33,12 +35,15 @@ import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static google.registry.util.DateTimeUtils.START_OF_TIME; import static org.joda.time.Duration.millis; +import static org.joda.time.Duration.standardDays; import static org.joda.time.Duration.standardHours; import static org.joda.time.Duration.standardSeconds; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.appengine.api.modules.ModulesService; import com.google.common.collect.ImmutableSet; @@ -48,15 +53,19 @@ import google.registry.flows.async.AsyncFlowEnqueuer; import google.registry.flows.async.AsyncFlowMetrics; import google.registry.flows.async.AsyncFlowMetrics.OperationResult; import google.registry.model.host.HostResource; +import google.registry.model.server.Lock; import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; import google.registry.testing.FakeSleeper; import google.registry.testing.InjectRule; +import google.registry.testing.MockitoJUnitRule; import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.mapreduce.MapreduceTestCase; +import google.registry.util.RequestStatusChecker; import google.registry.util.Retrier; import google.registry.util.Sleeper; import google.registry.util.SystemSleeper; +import java.util.Optional; import org.joda.time.DateTime; import org.joda.time.Duration; import org.junit.Before; @@ -64,17 +73,20 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; /** Unit tests for {@link RefreshDnsOnHostRenameAction}. */ @RunWith(JUnit4.class) public class RefreshDnsOnHostRenameActionTest extends MapreduceTestCase { - @Rule - public InjectRule inject = new InjectRule(); + @Rule public final InjectRule inject = new InjectRule(); + @Rule public final MockitoJUnitRule mocks = MockitoJUnitRule.create(); private AsyncFlowEnqueuer enqueuer; private final FakeClock clock = new FakeClock(DateTime.parse("2015-01-15T11:22:33Z")); + private final FakeResponse fakeResponse = new FakeResponse(); + @Mock private RequestStatusChecker requestStatusChecker; @Before public void setup() { @@ -95,8 +107,12 @@ public class RefreshDnsOnHostRenameActionTest action.clock = clock; action.mrRunner = makeDefaultRunner(); action.pullQueue = getQueue(QUEUE_ASYNC_HOST_RENAME); - action.response = new FakeResponse(); + action.requestStatusChecker = requestStatusChecker; + action.response = fakeResponse; action.retrier = new Retrier(new FakeSleeper(clock), 1); + when(requestStatusChecker.getLogId()).thenReturn("requestId"); + when(requestStatusChecker.isRunning(anyString())) + .thenThrow(new AssertionError("Should not be called")); } private void runMapreduce() throws Exception { @@ -112,6 +128,14 @@ public class RefreshDnsOnHostRenameActionTest ofy().clearSessionCache(); } + /** Kicks off, but does not run, the mapreduce tasks. Useful for testing validation/setup. */ + private void enqueueMapreduceOnly() { + clock.advanceOneMilli(); + action.run(); + clock.advanceBy(standardSeconds(5)); + ofy().clearSessionCache(); + } + @Test public void testSuccess_dnsUpdateEnqueued() throws Exception { HostResource host = persistActiveHost("ns1.example.tld"); @@ -191,12 +215,48 @@ public class RefreshDnsOnHostRenameActionTest public void testRun_hostDoesntExist_delaysTask() throws Exception { HostResource host = newHostResource("ns1.example.tld"); enqueuer.enqueueAsyncDnsRefresh(host, clock.nowUtc()); - runMapreduce(); + enqueueMapreduceOnly(); assertNoDnsTasksEnqueued(); assertTasksEnqueued( QUEUE_ASYNC_HOST_RENAME, new TaskMatcher() .etaDelta(standardHours(23), standardHours(25)) .param("hostKey", Key.create(host).getString())); + assertThat(acquireLock()).isPresent(); + } + + @Test + public void test_cannotAcquireLock() { + // Make lock acquisition fail. + acquireLock(); + enqueueMapreduceOnly(); + assertThat(fakeResponse.getPayload()).isEqualTo("Can't acquire lock; aborting."); + assertNoDnsTasksEnqueued(); + } + + @Test + public void test_mapreduceHasWorkToDo_lockIsAcquired() { + HostResource host = persistActiveHost("ns1.example.tld"); + enqueuer.enqueueAsyncDnsRefresh(host, clock.nowUtc()); + enqueueMapreduceOnly(); + assertThat(acquireLock()).isEmpty(); + } + + @Test + public void test_noTasksToLease_releasesLockImmediately() throws Exception { + enqueueMapreduceOnly(); + assertNoDnsTasksEnqueued(); + assertNoTasksEnqueued(QUEUE_ASYNC_HOST_RENAME); + // If the Lock was correctly released, then we can acquire it now. + assertThat(acquireLock()).isPresent(); + } + + private Optional acquireLock() { + return Lock.acquire( + RefreshDnsOnHostRenameAction.class.getSimpleName(), + null, + standardDays(30), + requestStatusChecker, + false); } } diff --git a/javatests/google/registry/model/server/LockTest.java b/javatests/google/registry/model/server/LockTest.java index ced8a37cf..33fa11e5c 100644 --- a/javatests/google/registry/model/server/LockTest.java +++ b/javatests/google/registry/model/server/LockTest.java @@ -50,16 +50,12 @@ public class LockTest { private static final RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class); private static final FakeClock clock = new FakeClock(); - @Rule - public final AppEngineRule appEngine = AppEngineRule.builder() - .withDatastore() - .build(); + @Rule public final AppEngineRule appEngine = AppEngineRule.builder().withDatastore().build(); + @Rule public final InjectRule inject = new InjectRule(); - @Rule - public final InjectRule inject = new InjectRule(); private Optional acquire(String tld, Duration leaseLength, LockState expectedLockState) { Lock.lockMetrics = mock(LockMetrics.class); - Optional lock = Lock.acquire(RESOURCE_NAME, tld, leaseLength, requestStatusChecker); + Optional lock = Lock.acquire(RESOURCE_NAME, tld, leaseLength, requestStatusChecker, true); verify(Lock.lockMetrics).recordAcquire(RESOURCE_NAME, tld, expectedLockState); verifyNoMoreInteractions(Lock.lockMetrics); Lock.lockMetrics = null; @@ -75,7 +71,6 @@ public class LockTest { Lock.lockMetrics = null; } - @Before public void setUp() { inject.setStaticField(Ofy.class, "clock", clock); Lock.lockMetrics = null; @@ -138,7 +133,7 @@ public class LockTest { IllegalArgumentException thrown = assertThrows( IllegalArgumentException.class, - () -> Lock.acquire("", "", TWO_MILLIS, requestStatusChecker)); + () -> Lock.acquire("", "", TWO_MILLIS, requestStatusChecker, true)); assertThat(thrown).hasMessageThat().contains("resourceName cannot be null or empty"); } }