diff --git a/java/google/registry/backup/DeleteOldCommitLogsAction.java b/java/google/registry/backup/DeleteOldCommitLogsAction.java
index d91244bcc..31e78c6aa 100644
--- a/java/google/registry/backup/DeleteOldCommitLogsAction.java
+++ b/java/google/registry/backup/DeleteOldCommitLogsAction.java
@@ -14,6 +14,8 @@
package google.registry.backup;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static google.registry.mapreduce.MapreduceRunner.PARAM_DRY_RUN;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.util.FormattingLogger.getLoggerForCallerClass;
@@ -33,7 +35,6 @@ import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.inputs.CommitLogManifestInput;
import google.registry.mapreduce.inputs.EppResourceInputs;
import google.registry.model.EppResource;
-import google.registry.model.ImmutableObject;
import google.registry.model.ofy.CommitLogManifest;
import google.registry.model.ofy.CommitLogMutation;
import google.registry.model.translators.CommitLogRevisionsTranslatorFactory;
@@ -98,7 +99,7 @@ public final class DeleteOldCommitLogsAction implements Runnable {
new DeleteOldCommitLogsReducer(deletionThreshold, isDryRun),
ImmutableList.of(
new CommitLogManifestInput(Optional.of(deletionThreshold)),
- EppResourceInputs.createEntityInput(EppResource.class)))));
+ EppResourceInputs.createKeyInput(EppResource.class)))));
}
/**
@@ -110,29 +111,52 @@ public final class DeleteOldCommitLogsAction implements Runnable {
*
The reducer will then delete all CommitLogRevisions that only have {@code true}.
*/
private static class DeleteOldCommitLogsMapper
- extends Mapper, Boolean> {
+ extends Mapper, Key, Boolean> {
private static final long serialVersionUID = -1960845380164573834L;
+ private static final String KIND_MANIFEST = Key.getKind(CommitLogManifest.class);
+
@Override
- public void map(ImmutableObject object) {
- if (object instanceof EppResource) {
- getContext().incrementCounter("Epp resources found");
- EppResource eppResource = (EppResource) object;
- for (Key manifestKey : eppResource.getRevisions().values()) {
- emit(manifestKey, false);
- }
- getContext()
- .incrementCounter("Epp resource revisions found", eppResource.getRevisions().size());
- } else if (object instanceof CommitLogManifest) {
+ public void map(final Key> key) {
+ // key is either a Key or a Key extends EppResource>.
+ //
+ // If it's a CommitLogManifest we just emit it as is (no need to load it).
+ if (key.getKind().equals(KIND_MANIFEST)) {
getContext().incrementCounter("old commit log manifests found");
- emit(Key.create((CommitLogManifest) object), true);
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Received object of type %s, expected either EppResource or CommitLogManifest",
- object.getClass().getName()));
+ // safe because we checked getKind
+ @SuppressWarnings("unchecked")
+ Key manifestKey = (Key) key;
+ emit(manifestKey, true);
+ return;
}
+
+ // If it isn't a Key then it should be an EppResource, which we need to
+ // load to emit the revisions.
+ //
+ // We want to make sure we retry any load individually to reduce the chance of the entire
+ // shard failing, hence we wrap it in a transactNew.
+ Object object = ofy().transactNew(new Work() {
+ @Override
+ public Object run() {
+ return ofy().load().key(key).now();
+ }
+ });
+ checkNotNull(object, "Received a key to a missing object. key: %s", key);
+ checkState(
+ object instanceof EppResource,
+ "Received a key to an object that isn't EppResource nor CommitLogManifest."
+ + " Key: %s object type: %s",
+ key,
+ object.getClass().getName());
+
+ getContext().incrementCounter("EPP resources found");
+ EppResource eppResource = (EppResource) object;
+ for (Key manifestKey : eppResource.getRevisions().values()) {
+ emit(manifestKey, false);
+ }
+ getContext()
+ .incrementCounter("EPP resource revisions found", eppResource.getRevisions().size());
}
}
@@ -165,14 +189,17 @@ public final class DeleteOldCommitLogsAction implements Runnable {
getContext().incrementCounter("old commit log manifests still referenced");
return;
}
- if (ofy().load().key(manifestKey).now().getCommitTime().isAfter(deletionThreshold)) {
- logger.severefmt("Won't delete CommitLogManifest %s that is too recent.", manifestKey);
- getContext().incrementCounter("manifests incorrectly assigned for deletion (SEE LOGS)");
- return;
- }
- Integer deletedCount = ofy().transact(new Work() {
+
+ Integer deletedCount = ofy().transactNew(new Work() {
@Override
public Integer run() {
+ // Doing a sanity check on the date. This is the only place we load the CommitLogManifest,
+ // so maybe removing this test will improve performance. However, unless it's proven that
+ // the performance boost is significant (and we've tested this enough to be sure it never
+ // happens)- the safty of "let's not delete stuff we need from prod" is more important.
+ if (ofy().load().key(manifestKey).now().getCommitTime().isAfter(deletionThreshold)) {
+ return 0;
+ }
Iterable> commitLogMutationKeys = ofy().load()
.type(CommitLogMutation.class)
.ancestor(manifestKey)
@@ -190,8 +217,14 @@ public final class DeleteOldCommitLogsAction implements Runnable {
return keysToDelete.size();
}
});
- getContext().incrementCounter("old commit log manifests deleted");
- getContext().incrementCounter("total entities deleted", deletedCount);
+
+ if (deletedCount > 0) {
+ getContext().incrementCounter("old commit log manifests deleted");
+ getContext().incrementCounter("total entities deleted", deletedCount);
+ } else {
+ logger.severefmt("Won't delete CommitLogManifest %s that is too recent.", manifestKey);
+ getContext().incrementCounter("manifests incorrectly assigned for deletion (SEE LOGS)");
+ }
}
}
}
diff --git a/java/google/registry/mapreduce/inputs/CommitLogManifestInput.java b/java/google/registry/mapreduce/inputs/CommitLogManifestInput.java
index 449ccdfc0..c7238b267 100644
--- a/java/google/registry/mapreduce/inputs/CommitLogManifestInput.java
+++ b/java/google/registry/mapreduce/inputs/CommitLogManifestInput.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.joda.time.DateTime;
/** Base class for {@link Input} classes that map over {@link CommitLogManifest}. */
-public class CommitLogManifestInput extends Input {
+public class CommitLogManifestInput extends Input> {
private static final long serialVersionUID = 2043552272352286428L;
@@ -41,15 +41,16 @@ public class CommitLogManifestInput extends Input {
}
@Override
- public List> createReaders() {
- ImmutableList.Builder> readers = new ImmutableList.Builder<>();
+ public List>> createReaders() {
+ ImmutableList.Builder>> readers =
+ new ImmutableList.Builder<>();
for (Key bucketKey : CommitLogBucket.getAllBucketKeys()) {
readers.add(bucketToReader(bucketKey));
}
return readers.build();
}
- private InputReader bucketToReader(Key bucketKey) {
+ private InputReader> bucketToReader(Key bucketKey) {
return new CommitLogManifestReader(bucketKey, olderThan);
}
}
diff --git a/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java
index b73cba06c..29ec015cb 100644
--- a/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java
+++ b/java/google/registry/mapreduce/inputs/CommitLogManifestReader.java
@@ -28,7 +28,7 @@ import java.util.NoSuchElementException;
import org.joda.time.DateTime;
/** {@link InputReader} that maps over {@link CommitLogManifest}. */
-class CommitLogManifestReader extends InputReader {
+class CommitLogManifestReader extends InputReader> {
private static final long serialVersionUID = 5117046535590539778L;
@@ -53,7 +53,7 @@ class CommitLogManifestReader extends InputReader {
private int total;
private int loaded;
- private transient QueryResultIterator queryIterator;
+ private transient QueryResultIterator> queryIterator;
CommitLogManifestReader(Key bucketKey, Optional olderThan) {
this.bucketKey = bucketKey;
@@ -83,7 +83,7 @@ class CommitLogManifestReader extends InputReader {
// paused and restarted with a cursor before it would have reached the new entity.
query = query.startAt(cursor);
}
- queryIterator = query.iterator();
+ queryIterator = query.keys().iterator();
}
/** Called occasionally alongside {@link #next}. */
@@ -123,7 +123,7 @@ class CommitLogManifestReader extends InputReader {
* @throws NoSuchElementException if there are no more elements.
*/
@Override
- public CommitLogManifest next() {
+ public Key next() {
loaded++;
try {
return queryIterator.next();
diff --git a/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java b/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java
index 99f26888f..67d8a09b4 100644
--- a/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java
+++ b/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java
@@ -103,6 +103,10 @@ public class DeleteOldCommitLogsActionTest
assertThat(ofy().load().type(CommitLogMutation.class).count()).isEqualTo(33);
}
+ private ImmutableList ofyLoadType(Class clazz) {
+ return ImmutableList.copyOf(ofy().load().type(clazz).iterable());
+ }
+
/**
* Check that with very short maxAge, only the referenced elements remain.
*/
@@ -110,12 +114,11 @@ public class DeleteOldCommitLogsActionTest
public void test_shortMaxAge() throws Exception {
runMapreduce(Duration.millis(1));
- assertThat(ofy().load().type(CommitLogManifest.class).keys().iterable())
+ assertThat(ImmutableList.copyOf(ofy().load().type(CommitLogManifest.class).keys().iterable()))
.containsExactlyElementsIn(contact.getRevisions().values());
// And each DatastoreHelper.persistResourceWithCommitLog creates 3 mutations
- assertThat(ofy().load().type(CommitLogMutation.class).keys().iterable())
- .hasSize(contact.getRevisions().size() * 3);
+ assertThat(ofyLoadType(CommitLogMutation.class)).hasSize(contact.getRevisions().size() * 3);
}
/**
@@ -124,16 +127,12 @@ public class DeleteOldCommitLogsActionTest
@Test
public void test_longMaxAge() throws Exception {
- ImmutableList initialManifests =
- ImmutableList.copyOf(ofy().load().type(CommitLogManifest.class).iterable());
- ImmutableList initialMutations =
- ImmutableList.copyOf(ofy().load().type(CommitLogMutation.class).iterable());
+ ImmutableList initialManifests = ofyLoadType(CommitLogManifest.class);
+ ImmutableList initialMutations = ofyLoadType(CommitLogMutation.class);
runMapreduce(Duration.standardDays(1000));
- assertThat(ofy().load().type(CommitLogManifest.class).iterable())
- .containsExactlyElementsIn(initialManifests);
- assertThat(ofy().load().type(CommitLogMutation.class).iterable())
- .containsExactlyElementsIn(initialMutations);
+ assertThat(ofyLoadType(CommitLogManifest.class)).containsExactlyElementsIn(initialManifests);
+ assertThat(ofyLoadType(CommitLogMutation.class)).containsExactlyElementsIn(initialMutations);
}
}
diff --git a/javatests/google/registry/mapreduce/inputs/CommitLogManifestInputTest.java b/javatests/google/registry/mapreduce/inputs/CommitLogManifestInputTest.java
index cdc24c588..158021fe2 100644
--- a/javatests/google/registry/mapreduce/inputs/CommitLogManifestInputTest.java
+++ b/javatests/google/registry/mapreduce/inputs/CommitLogManifestInputTest.java
@@ -53,13 +53,14 @@ public final class CommitLogManifestInputTest {
@Test
public void testInputOlderThan_allFound() throws Exception {
- Set created = new HashSet<>();
+ Set> created = new HashSet<>();
for (int i = 1; i <= 3; i++) {
created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD));
}
- List seen = new ArrayList<>();
- Input input = new CommitLogManifestInput(Optional.of(DATE_TIME_THRESHOLD));
- for (InputReader reader
+ List> seen = new ArrayList<>();
+ Input> input =
+ new CommitLogManifestInput(Optional.of(DATE_TIME_THRESHOLD));
+ for (InputReader> reader
: input.createReaders()) {
reader.beginShard();
reader.beginSlice();
@@ -75,16 +76,17 @@ public final class CommitLogManifestInputTest {
@Test
public void testInputOlderThan_skipsNew() throws Exception {
- Set old = new HashSet<>();
+ Set> old = new HashSet<>();
for (int i = 1; i <= 3; i++) {
createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW);
createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW2);
old.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD));
old.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD2));
}
- List seen = new ArrayList<>();
- Input input = new CommitLogManifestInput(Optional.of(DATE_TIME_THRESHOLD));
- for (InputReader reader
+ List> seen = new ArrayList<>();
+ Input> input =
+ new CommitLogManifestInput(Optional.of(DATE_TIME_THRESHOLD));
+ for (InputReader> reader
: input.createReaders()) {
reader.beginShard();
reader.beginSlice();
@@ -101,16 +103,16 @@ public final class CommitLogManifestInputTest {
@Test
public void testInputAll() throws Exception {
- Set created = new HashSet<>();
+ Set> created = new HashSet<>();
for (int i = 1; i <= 3; i++) {
created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW));
created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_NEW2));
created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD));
created.add(createManifest(CommitLogBucket.getBucketKey(i), DATE_TIME_OLD2));
}
- List seen = new ArrayList<>();
- Input input = new CommitLogManifestInput(Optional.absent());
- for (InputReader reader
+ List> seen = new ArrayList<>();
+ Input> input = new CommitLogManifestInput(Optional.absent());
+ for (InputReader> reader
: input.createReaders()) {
reader.beginShard();
reader.beginSlice();
@@ -125,9 +127,11 @@ public final class CommitLogManifestInputTest {
assertThat(seen).containsExactlyElementsIn(created);
}
- private static CommitLogManifest createManifest(Key parent, DateTime dateTime) {
+ private static Key createManifest(
+ Key parent,
+ DateTime dateTime) {
CommitLogManifest commitLogManifest = CommitLogManifest.create(parent, dateTime, null);
DatastoreHelper.persistResource(commitLogManifest);
- return commitLogManifest;
+ return Key.create(commitLogManifest);
}
}