diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
index bbd3e4806..74186e2e4 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
@@ -16,6 +16,7 @@ package google.registry.beam.common;
import google.registry.beam.common.RegistryJpaIO.Write;
import google.registry.config.RegistryEnvironment;
+import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import java.util.Objects;
@@ -65,6 +66,17 @@ public interface RegistryPipelineOptions extends GcpOptions {
void setSqlWriteShards(int maxConcurrentSqlWriters);
+ @DeleteAfterMigration
+ @Description(
+ "Whether to use self allocated primary IDs when building entities. This should only be used"
+ + " when the IDs are not significant and the resulting entities are not persisted back to"
+ + " the database. Use with caution as self allocated IDs are not unique across workers,"
+ + " and persisting entities with these IDs can be dangerous.")
+ @Default.Boolean(false)
+ boolean getUseSelfAllocatedId();
+
+ void setUseSelfAllocatedId(boolean useSelfAllocatedId);
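+
+ // Illustrative opt-in, mirroring what RdePipeline#main does for the RDE pipeline:
+ //   options.setUseSelfAllocatedId(true);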
+
static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
return DaggerRegistryPipelineComponent.builder()
.isolationOverride(options.getIsolationOverride())
diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
index f4d13e903..ea6899b68 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
@@ -22,6 +22,8 @@ import dagger.Lazy;
import google.registry.config.RegistryEnvironment;
import google.registry.config.SystemPropertySetter;
import google.registry.model.AppEngineEnvironment;
+import google.registry.model.IdService;
+import google.registry.model.IdService.SelfAllocatedIdSupplier;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import org.apache.beam.sdk.harness.JvmInitializer;
@@ -65,12 +67,20 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer {
transactionManagerLazy = registryPipelineComponent.getJpaTransactionManager();
}
TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
- // Masquerade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also
+ // Masquerade all threads as App Engine threads, so we can create Ofy keys in the pipeline. Also
// loads all ofy entities.
new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId())
.setEnvironmentForAllThreads();
- // Set the system property so that we can call IdService.allocateId() without access to
- // datastore.
SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
+ // Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from
+ // multiple workers, which can also collide with existing IDs in the database, so they cannot be
+ // depended upon for comparison or anything else significant. The resulting entities must never
+ // be persisted back to the database. This is a stop-gap measure that should only be used when
+ // you need to create Buildables in Beam but do not have control over how the IDs are
+ // allocated, and you do not care about the generated IDs as long as you can build the
+ // entities.
+ if (registryOptions.getUseSelfAllocatedId()) {
+ IdService.setIdSupplier(SelfAllocatedIdSupplier.getInstance());
+ }
}
}
diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
index 0bd4088c4..07a4b06a6 100644
--- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java
+++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
@@ -128,7 +128,7 @@ import org.joda.time.DateTime;
* <h2>{@link EppResource}</h2>
*
* All EPP resources are loaded from the corresponding {@link HistoryEntry}, which has the resource
- * embedded. In general we find most recent history entry before watermark and filter out the ones
+ * embedded. In general, we find most recent history entry before watermark and filter out the ones
* that are soft-deleted by watermark. The history is emitted as pairs of (resource repo ID: history
* revision ID) from the SQL query.
*
@@ -164,7 +164,7 @@ import org.joda.time.DateTime;
*
* The (pending deposit: deposit fragment) pairs from different resources are combined and grouped
* by pending deposit. For each pending deposit, all the relevant deposit fragments are written into
- * a encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there
+ * an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there
* is no need to lock the GCS write operation to prevent stomping. The cursor for staging the
* pending deposit is then rolled forward, and the next action is enqueued. The latter two
* operations are performed in a transaction so the cursor is rolled back if enqueueing failed.
@@ -698,8 +698,8 @@ public class RdePipeline implements Serializable {
}
/**
- * Encodes the pending deposit set in an URL safe string that is sent to the pipeline worker by
- * the pipeline launcher as a pipeline option.
+ * Encodes the pending deposit set in a URL safe string that is sent to the pipeline worker by the
+ * pipeline launcher as a pipeline option.
*/
public static String encodePendingDeposits(ImmutableSet<PendingDeposit> pendingDeposits)
throws IOException {
@@ -715,6 +715,12 @@ public class RdePipeline implements Serializable {
PipelineOptionsFactory.register(RdePipelineOptions.class);
RdePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
+ // We need to self-allocate the IDs because the pipeline creates EPP resources from history
+ // entries and projects them to watermark time. These buildable entities would otherwise request
+ // an ID from Datastore, which Beam does not have access to. The IDs are not included in the
+ // deposits, nor are these entities persisted back to the database, so it is OK to use
+ // self-allocated IDs to get around this limitation of Beam.
+ options.setUseSelfAllocatedId(true);
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
diff --git a/core/src/main/java/google/registry/model/IdService.java b/core/src/main/java/google/registry/model/IdService.java
index 8074c0583..0a6040c0b 100644
--- a/core/src/main/java/google/registry/model/IdService.java
+++ b/core/src/main/java/google/registry/model/IdService.java
@@ -17,58 +17,114 @@ package google.registry.model;
import static com.google.common.base.Preconditions.checkState;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.config.RegistryEnvironment;
import google.registry.model.annotations.DeleteAfterMigration;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
/**
- * Allocates a globally unique {@link Long} number to use as an Ofy {@code @Id}.
+ * Allocates a {@code long} to use as the {@code @Id}, (part of) the primary SQL key for an entity.
*
- * In non-test, non-beam environments the Id is generated by Datastore, otherwise it's from an
- * atomic long number that's incremented every time this method is called.
+ * <p>Normally, the ID is globally unique and allocated by Datastore. It is possible to override
+ * this behavior by providing an ID supplier, such as in unit tests, where a self-allocated ID
+ * based on a monotonically increasing atomic {@code long} is used. Such an ID supplier can also
+ * be used in other scenarios, such as in a Beam pipeline, to get around Beam's inability to use
+ * the GAE SDK to access Datastore. The override should be used with great care lest it result in
+ * irreversible data corruption.
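+ *
+ * <p>Callers are agnostic to which supplier is in effect; an illustrative call site is simply:
+ *
+ * <pre>{@code
+ * long id = IdService.allocateId();
+ * }</pre>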
+ *
+ * @see #setIdSupplier(Supplier)
*/
@DeleteAfterMigration
public final class IdService {
- /**
- * A placeholder String passed into DatastoreService.allocateIds that ensures that all ids are
- * initialized from the same id pool.
- */
- private static final String APP_WIDE_ALLOCATION_KIND = "common";
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private IdService() {}
+
+ private static Supplier<Long> idSupplier =
+ RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
+ ? SelfAllocatedIdSupplier.getInstance()
+ : DatastoreIdSupplier.getInstance();
/**
- * Counts of used ids for use in unit tests or Beam.
+ * Provides a {@link Supplier} of IDs that overrides the default.
*
- * Note that one should only use self-allocate Ids in Beam for entities whose Ids are not
- * important and are not persisted back to the database, i. e. nowhere the uniqueness of the ID is
- * required.
+ * <p>Currently, the only use case for an override is in the Beam pipeline, where access to
+ * Datastore is not possible through the App Engine API. As such, the setter explicitly checks if
+ * the runtime is Beam.
+ *
+ * <p>Because the provided supplier is not guaranteed to be globally unique and compatible with
+ * existing IDs in the database, one should proceed with great care. It is safe to use an
+ * arbitrary supplier when the resulting IDs are not significant and not persisted back to the
+ * database, i.e. the IDs are only required by the {@link Buildable} contract but are not used in
+ * any meaningful way. One example is the RDE pipeline, where we project EPP resource entities
+ * from history entries to watermark time; they are then marshalled into XML elements in the RDE
+ * deposits, which omit the IDs.
*/
- private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero
-
- private static final boolean isSelfAllocated() {
- return RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
- || "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false"));
+ public static void setIdSupplier(Supplier<Long> idSupplier) {
+ checkState(
+ "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")),
+ "Can only set ID supplier in a Beam pipeline");
+ logger.atWarning().log("Using ID supplier override!");
+ IdService.idSupplier = idSupplier;
}
/** Allocates an id. */
- // TODO(b/201547855): Find a way to allocate a unique ID without datastore.
public static long allocateId() {
- return isSelfAllocated()
- ? nextSelfAllocatedId.getAndIncrement()
- : DatastoreServiceFactory.getDatastoreService()
- .allocateIds(APP_WIDE_ALLOCATION_KIND, 1)
- .iterator()
- .next()
- .getId();
+ return idSupplier.get();
}
- /** Resets the global self-allocated id counter (i.e. sets the next id to 1). */
- @VisibleForTesting
- public static void resetSelfAllocatedId() {
- checkState(
- isSelfAllocated(), "Can only call resetSelfAllocatedId() in unit tests or Beam pipelines");
- nextSelfAllocatedId.set(1); // ids cannot be zero
+ // TODO(b/201547855): Find a way to allocate a unique ID without datastore.
+ private static class DatastoreIdSupplier implements Supplier<Long> {
+
+ private static final DatastoreIdSupplier INSTANCE = new DatastoreIdSupplier();
+
+ /**
+ * A placeholder String passed into {@code DatastoreService.allocateIds} that ensures that all
+ * IDs are initialized from the same ID pool.
+ */
+ private static final String APP_WIDE_ALLOCATION_KIND = "common";
+
+ public static DatastoreIdSupplier getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Long get() {
+ return DatastoreServiceFactory.getDatastoreService()
+ .allocateIds(APP_WIDE_ALLOCATION_KIND, 1)
+ .iterator()
+ .next()
+ .getId();
+ }
+ }
+
+ /**
+ * An ID supplier that allocates an ID from a monotonically increasing atomic {@code long}.
+ *
+ * <p>The generated IDs are only unique within the same JVM. It is not suitable for production
+ * use except in cases where the IDs are not significant.
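+ *
+ * <p>For example, a test harness may reset the counter between tests so that generated IDs
+ * are deterministic (as {@code AppEngineExtension} does):
+ *
+ * <pre>{@code
+ * SelfAllocatedIdSupplier.getInstance().reset();
+ * }</pre>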
+ */
+ public static class SelfAllocatedIdSupplier implements Supplier<Long> {
+
+ private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier();
+
+ /** The next ID to be returned by this self-allocating supplier. */
+ private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero
+
+ public static SelfAllocatedIdSupplier getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Long get() {
+ return nextSelfAllocatedId.getAndIncrement();
+ }
+
+ public void reset() {
+ nextSelfAllocatedId.set(1);
+ }
}
}
diff --git a/core/src/test/java/google/registry/testing/AppEngineExtension.java b/core/src/test/java/google/registry/testing/AppEngineExtension.java
index 0cf37bef6..090d98eb3 100644
--- a/core/src/test/java/google/registry/testing/AppEngineExtension.java
+++ b/core/src/test/java/google/registry/testing/AppEngineExtension.java
@@ -40,7 +40,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.ObjectifyFilter;
-import google.registry.model.IdService;
+import google.registry.model.IdService.SelfAllocatedIdSupplier;
import google.registry.model.ofy.ObjectifyService;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.Registrar.State;
@@ -441,7 +441,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa
ObjectifyService.initOfy();
// Reset id allocation in ObjectifyService so that ids are deterministic in tests.
- IdService.resetSelfAllocatedId();
+ SelfAllocatedIdSupplier.getInstance().reset();
this.ofyTestEntities.forEach(AppEngineExtension::register);
}