From 9c6c210e21cd94b6d5135b2d39c73871efb72fe2 Mon Sep 17 00:00:00 2001 From: gbrodman Date: Thu, 27 Oct 2022 14:46:26 -0400 Subject: [PATCH] Check for entity nonexistence in SqlBatchWriter (#1824) Passing in an already-existing instance is an antipattern because it can lead to race conditions where something else modified the object in between when the pipeline loaded it and when you're saving it. The Write action should only be writing new entities. We cannot check IDs for the objects (some IDs are not autogenerated so they might exist already). We also cannot call `insert` on the objects because the underlying JPA `persist` call adds the input object to the persistence context, meaning that any modifications (e.g. updateTimestamp) are reflected in the input object. Beam doesn't allow modification of input objects. --- .../registry/beam/common/RegistryJpaIO.java | 28 +++++++++++++++-- .../beam/common/RegistryJpaWriteTest.java | 30 ++++++++++++++++++- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 03b3501a0..bde73fb89 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -14,6 +14,7 @@ package google.registry.beam.common; +import static com.google.common.base.Preconditions.checkArgument; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static org.apache.beam.sdk.values.TypeDescriptors.integers; @@ -405,7 +406,13 @@ public final class RegistryJpaIO { .filter(Objects::nonNull) .collect(ImmutableList.toImmutableList()); try { - jpaTm().transact(() -> jpaTm().putAll(entities)); + jpaTm() + .transact( + () -> { + // Don't modify existing objects as it could lead to race conditions + entities.forEach(this::verifyObjectNonexistence); + jpaTm().putAll(entities); + }); counter.inc(entities.size()); } catch (RuntimeException e) { processSingly(entities); @@ -419,7 +426,13 @@ public final class RegistryJpaIO { private void processSingly(ImmutableList entities) { for (Object entity : entities) { try { - jpaTm().transact(() -> jpaTm().put(entity)); + jpaTm() + .transact( + () -> { + // Don't modify existing objects as it could lead to race conditions + verifyObjectNonexistence(entity); + jpaTm().put(entity); + }); counter.inc(); } catch (RuntimeException e) { throw new RuntimeException(toEntityKeyString(entity), e); @@ -445,5 +458,16 @@ public final class RegistryJpaIO { return "Non-SqlEntity: " + entity; } } + + /** SqlBatchWriter should not re-write existing entities due to potential race conditions. */ + private void verifyObjectNonexistence(Object obj) { + // We cannot rely on calling "insert" on the objects because the underlying JPA persist call + // adds the input object to the persistence context, meaning that any modifications (e.g. + // updateTimestamp) are reflected in the input object. Beam doesn't allow modification of + // input objects, so this throws an exception. + // TODO(go/non-datastore-allocateid): also check that all the objects have IDs + checkArgument( + !jpaTm().exists(obj), "Entities created in SqlBatchWriter must not already exist"); + } } } diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java index c483ab4ac..4e2ebd419 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java @@ -17,9 +17,12 @@ package google.registry.beam.common; import static com.google.common.truth.Truth.assertThat; import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.testing.DatabaseHelper.loadAllOf; import static google.registry.testing.DatabaseHelper.newContact; +import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import google.registry.beam.TestPipelineExtension; import google.registry.model.contact.Contact; import google.registry.persistence.transaction.JpaTestExtensions; @@ -28,6 +31,7 @@ import google.registry.testing.AppEngineExtension; import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; import java.io.Serializable; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.transforms.Create; import org.joda.time.DateTime; import org.junit.jupiter.api.Order; @@ -65,8 +69,32 @@ class RegistryJpaWriteTest implements Serializable { .apply(RegistryJpaIO.write().withName("Contact").withBatchSize(4).withShards(2)); testPipeline.run().waitUntilFinish(); - assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Contact.class))) + assertThat(loadAllOf(Contact.class)) .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) .containsExactlyElementsIn(contacts); } + + @Test + void testFailure_writeExistingEntity() { + // RegistryJpaIO.Write actions should not write existing objects to the database because the + // object could have been mutated in between creation and when the Write actually occurs, + // causing a race condition + jpaTm() + .transact( + () -> { + jpaTm().put(AppEngineExtension.makeRegistrar2()); + jpaTm().put(newContact("contact")); + }); + Contact contact = Iterables.getOnlyElement(loadAllOf(Contact.class)); + testPipeline + .apply(Create.of(contact)) + .apply(RegistryJpaIO.write().withName("Contact")); + // PipelineExecutionException caused by a RuntimeException caused by an IllegalArgumentException + assertThat( + assertThrows( + PipelineExecutionException.class, () -> testPipeline.run().waitUntilFinish())) + .hasCauseThat() + .hasCauseThat() + .isInstanceOf(IllegalArgumentException.class); + } }