1
0
mirror of https://github.com/google/nomulus synced 2026-06-09 16:33:02 +00:00

Compare commits

...

4 Commits

Author SHA1 Message Date
Lai Jiang c1ad06afd1 Allow the beam parameter in RDE standard mode (#1505)
Standard mode will determine the watermarks based on the cursors and
kick off subsequent uploading steps. In order to run both the Beam and
the Mapreduce pipeline in parallel, we need to allow setting the beam
parameter when in standard mode. This changes should have been part of
https://github.com/google/nomulus/pull/1500.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1505)
<!-- Reviewable:end -->
2022-01-31 14:20:23 -05:00
gbrodman b24670f33a Use the replica jpaTm in FKI and EppResource cache methods (#1503)
The cached methods are only used in situations where we don't really
care about being 100% synchronously up to date (e.g. whois), and they're
not used frequently anyway, so it's safe to use the replica in these
locations.
2022-01-28 18:05:18 -05:00
Weimin Yu 1253fa479a Release ValidateSqlPipeline as container image (#1504)
* Release ValidateSqlPipeline as container image
2022-01-28 14:57:31 -05:00
Weimin Yu 5f0dd24906 Release ValidateDatastorePipeline (#1501)
* Release ValidateDatastorePipeline
2022-01-26 13:38:19 -05:00
10 changed files with 153 additions and 25 deletions
+10
View File
@@ -796,6 +796,16 @@ if (environment == 'alpha') {
mainClass: 'google.registry.beam.rde.RdePipeline',
metaData : 'google/registry/beam/rde_pipeline_metadata.json'
],
validateDatastore :
[
mainClass: 'google.registry.beam.comparedb.ValidateDatastorePipeline',
metaData: 'google/registry/beam/validate_datastore_pipeline_metadata.json'
],
validateSql :
[
mainClass: 'google.registry.beam.comparedb.ValidateSqlPipeline',
metaData: 'google/registry/beam/validate_sql_pipeline_metadata.json'
],
]
project.tasks.create("stageBeamPipelines") {
doLast {
@@ -21,6 +21,7 @@ import static com.google.common.collect.Sets.union;
import static google.registry.config.RegistryConfig.getEppResourceCachingDuration;
import static google.registry.config.RegistryConfig.getEppResourceMaxCachedEntries;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.replicaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.util.CollectionUtils.nullToEmpty;
import static google.registry.util.CollectionUtils.nullToEmptyImmutableCopy;
@@ -380,13 +381,13 @@ public abstract class EppResource extends BackupGroupRoot implements Buildable {
@Override
public EppResource load(VKey<? extends EppResource> key) {
return tm().doTransactionless(() -> tm().loadByKey(key));
return replicaTm().doTransactionless(() -> replicaTm().loadByKey(key));
}
@Override
public Map<VKey<? extends EppResource>, EppResource> loadAll(
Iterable<? extends VKey<? extends EppResource>> keys) {
return tm().doTransactionless(() -> tm().loadByKeys(keys));
return replicaTm().doTransactionless(() -> replicaTm().loadByKeys(keys));
}
};
@@ -21,6 +21,7 @@ import static google.registry.config.RegistryConfig.getEppResourceCachingDuratio
import static google.registry.config.RegistryConfig.getEppResourceMaxCachedEntries;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.replicaJpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.util.CollectionUtils.entriesToImmutableMap;
import static google.registry.util.TypeUtils.instantiate;
@@ -51,6 +52,7 @@ import google.registry.model.host.HostResource;
import google.registry.model.replay.DatastoreOnlyEntity;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.CriteriaQueryBuilder;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.util.NonFinalForTesting;
import java.util.Collection;
import java.util.Comparator;
@@ -198,7 +200,7 @@ public abstract class ForeignKeyIndex<E extends EppResource> extends BackupGroup
*/
public static <E extends EppResource> ImmutableMap<String, ForeignKeyIndex<E>> load(
Class<E> clazz, Collection<String> foreignKeys, final DateTime now) {
return loadIndexesFromStore(clazz, foreignKeys, true).entrySet().stream()
return loadIndexesFromStore(clazz, foreignKeys, true, false).entrySet().stream()
.filter(e -> now.isBefore(e.getValue().getDeletionTime()))
.collect(entriesToImmutableMap());
}
@@ -217,7 +219,10 @@ public abstract class ForeignKeyIndex<E extends EppResource> extends BackupGroup
*/
private static <E extends EppResource>
ImmutableMap<String, ForeignKeyIndex<E>> loadIndexesFromStore(
Class<E> clazz, Collection<String> foreignKeys, boolean inTransaction) {
Class<E> clazz,
Collection<String> foreignKeys,
boolean inTransaction,
boolean useReplicaJpaTm) {
if (tm().isOfy()) {
Class<ForeignKeyIndex<E>> fkiClass = mapToFkiClass(clazz);
return ImmutableMap.copyOf(
@@ -226,17 +231,18 @@ public abstract class ForeignKeyIndex<E extends EppResource> extends BackupGroup
: tm().doTransactionless(() -> auditedOfy().load().type(fkiClass).ids(foreignKeys)));
} else {
String property = RESOURCE_CLASS_TO_FKI_PROPERTY.get(clazz);
JpaTransactionManager jpaTmToUse = useReplicaJpaTm ? replicaJpaTm() : jpaTm();
ImmutableList<ForeignKeyIndex<E>> indexes =
tm().transact(
() ->
jpaTm()
.criteriaQuery(
CriteriaQueryBuilder.create(clazz)
.whereFieldIsIn(property, foreignKeys)
.build())
.getResultStream()
.map(e -> ForeignKeyIndex.create(e, e.getDeletionTime()))
.collect(toImmutableList()));
jpaTmToUse.transact(
() ->
jpaTmToUse
.criteriaQuery(
CriteriaQueryBuilder.create(clazz)
.whereFieldIsIn(property, foreignKeys)
.build())
.getResultStream()
.map(e -> ForeignKeyIndex.create(e, e.getDeletionTime()))
.collect(toImmutableList()));
// We need to find and return the entities with the maximum deletionTime for each foreign key.
return Multimaps.index(indexes, ForeignKeyIndex::getForeignKey).asMap().entrySet().stream()
.map(
@@ -260,7 +266,8 @@ public abstract class ForeignKeyIndex<E extends EppResource> extends BackupGroup
loadIndexesFromStore(
RESOURCE_CLASS_TO_FKI_CLASS.inverse().get(key.getKind()),
ImmutableSet.of(foreignKey),
false)
false,
true)
.get(foreignKey));
}
@@ -276,7 +283,7 @@ public abstract class ForeignKeyIndex<E extends EppResource> extends BackupGroup
Streams.stream(keys).map(v -> v.getSqlKey().toString()).collect(toImmutableSet());
ImmutableSet<VKey<ForeignKeyIndex<?>>> typedKeys = ImmutableSet.copyOf(keys);
ImmutableMap<String, ? extends ForeignKeyIndex<? extends EppResource>> existingFkis =
loadIndexesFromStore(resourceClass, foreignKeys, false);
loadIndexesFromStore(resourceClass, foreignKeys, false, true);
// ofy omits keys that don't have values in Datastore, so re-add them in
// here with Optional.empty() values.
return Maps.asMap(
@@ -336,7 +343,7 @@ public abstract class ForeignKeyIndex<E extends EppResource> extends BackupGroup
// Safe to cast VKey<FKI<E>> to VKey<FKI<?>>
@SuppressWarnings("unchecked")
ImmutableList<VKey<ForeignKeyIndex<?>>> fkiVKeys =
Streams.stream(foreignKeys)
foreignKeys.stream()
.map(fk -> (VKey<ForeignKeyIndex<?>>) VKey.create(fkiClass, fk))
.collect(toImmutableList());
try {
@@ -14,8 +14,8 @@
package google.registry.persistence.transaction;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static google.registry.util.PreconditionsUtils.checkArgumentNotNull;
import static org.joda.time.DateTimeZone.UTC;
import com.google.appengine.api.utils.SystemProperty;
@@ -47,6 +47,10 @@ public final class TransactionManagerFactory {
private static Supplier<JpaTransactionManager> jpaTm =
Suppliers.memoize(TransactionManagerFactory::createJpaTransactionManager);
@NonFinalForTesting
private static Supplier<JpaTransactionManager> replicaJpaTm =
Suppliers.memoize(TransactionManagerFactory::createReplicaJpaTransactionManager);
private static boolean onBeam = false;
private TransactionManagerFactory() {}
@@ -61,6 +65,14 @@ public final class TransactionManagerFactory {
}
}
private static JpaTransactionManager createReplicaJpaTransactionManager() {
if (isInAppEngine()) {
return DaggerPersistenceComponent.create().readOnlyReplicaJpaTransactionManager();
} else {
return DummyJpaTransactionManager.create();
}
}
private static DatastoreTransactionManager createTransactionManager() {
return new DatastoreTransactionManager(null);
}
@@ -108,6 +120,21 @@ public final class TransactionManagerFactory {
return jpaTm.get();
}
/** Returns a read-only {@link JpaTransactionManager} instance if configured. */
public static JpaTransactionManager replicaJpaTm() {
return replicaJpaTm.get();
}
/**
* Returns a {@link TransactionManager} that uses a replica database if one exists.
*
* <p>In Datastore mode, this is unchanged from the regular transaction manager. In SQL mode,
* however, this will be a reference to the read-only replica database if one is configured.
*/
public static TransactionManager replicaTm() {
return tm().isOfy() ? tm() : replicaJpaTm();
}
/** Returns {@link DatastoreTransactionManager} instance. */
@VisibleForTesting
public static DatastoreTransactionManager ofyTm() {
@@ -116,7 +143,7 @@ public final class TransactionManagerFactory {
/** Sets the return of {@link #jpaTm()} to the given instance of {@link JpaTransactionManager}. */
public static void setJpaTm(Supplier<JpaTransactionManager> jpaTmSupplier) {
checkNotNull(jpaTmSupplier, "jpaTmSupplier");
checkArgumentNotNull(jpaTmSupplier, "jpaTmSupplier");
checkState(
RegistryEnvironment.get().equals(RegistryEnvironment.UNITTEST)
|| RegistryToolEnvironment.get() != null,
@@ -124,13 +151,23 @@ public final class TransactionManagerFactory {
jpaTm = Suppliers.memoize(jpaTmSupplier::get);
}
/** Sets the value of {@link #replicaJpaTm()} to the given {@link JpaTransactionManager}. */
public static void setReplicaJpaTm(Supplier<JpaTransactionManager> replicaJpaTmSupplier) {
checkArgumentNotNull(replicaJpaTmSupplier, "replicaJpaTmSupplier");
checkState(
RegistryEnvironment.get().equals(RegistryEnvironment.UNITTEST)
|| RegistryToolEnvironment.get() != null,
"setReplicaJpaTm() should only be called by tools and tests.");
replicaJpaTm = Suppliers.memoize(replicaJpaTmSupplier::get);
}
/**
* Makes {@link #jpaTm()} return the {@link JpaTransactionManager} instance provided by {@code
* jpaTmSupplier} from now on. This method should only be called by an implementor of {@link
* org.apache.beam.sdk.harness.JvmInitializer}.
*/
public static void setJpaTmOnBeamWorker(Supplier<JpaTransactionManager> jpaTmSupplier) {
checkNotNull(jpaTmSupplier, "jpaTmSupplier");
checkArgumentNotNull(jpaTmSupplier, "jpaTmSupplier");
jpaTm = Suppliers.memoize(jpaTmSupplier::get);
onBeam = true;
}
@@ -391,9 +391,6 @@ public final class RdeStagingAction implements Runnable {
if (revision.isPresent()) {
throw new BadRequestException("Revision parameter not allowed in standard operation");
}
if (beam) {
throw new BadRequestException("Beam parameter not allowed in standard operation");
}
return ImmutableSetMultimap.copyOf(
Multimaps.filterValues(
@@ -0,0 +1,42 @@
{
"name": "Validate Datastore with Cloud SQL",
"description": "An Apache Beam batch pipeline that compares Datastore with the primary Cloud SQL database.",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment.",
"is_optional": false,
"regexes": [
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{
"name": "isolationOverride",
"label": "The desired SQL transaction isolation level.",
"helpText": "The desired SQL transaction isolation level.",
"is_optional": true,
"regexes": [
"^[0-9A-Z_]+$"
]
},
{
"name": "sqlSnapshotId",
"label": "The ID of an exported Cloud SQL (Postgresql) snapshot.",
"helpText": "The ID of an exported Cloud SQL (Postgresql) snapshot.",
"is_optional": true
},
{
"name": "latestCommitLogTimestamp",
"label": "Nomulus CommitLog start time",
"helpText": "The latest entity update time allowed for inclusion in validation, in ISO8601 format.",
"is_optional": false
},
{
"name": "comparisonStartTimestamp",
"label": "Only entities updated at or after this time are included for validation.",
"helpText": "The earliest entity update time allowed for inclusion in validation, in ISO8601 format.",
"is_optional": true
}
]
}
@@ -0,0 +1,21 @@
{
"name": "Validate Cloud SQL with Datastore being primary",
"description": "An Apache Beam batch pipeline that compares Cloud SQL with the primary Datastore.",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment.",
"is_optional": false,
"regexes": [
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{
"name": "comparisonStartTimestamp",
"label": "Only entities updated at or after this time are included for validation.",
"helpText": "The earliest entity update time allowed for inclusion in validation, in ISO8601 format.",
"is_optional": true
}
]
}
@@ -217,11 +217,14 @@ abstract class JpaTransactionManagerExtension implements BeforeEachCallback, Aft
JpaTransactionManagerImpl txnManager = new JpaTransactionManagerImpl(emf, clock);
cachedTm = TransactionManagerFactory.jpaTm();
TransactionManagerFactory.setJpaTm(Suppliers.ofInstance(txnManager));
TransactionManagerFactory.setReplicaJpaTm(
Suppliers.ofInstance(new ReplicaSimulatingJpaTransactionManager(txnManager)));
}
@Override
public void afterEach(ExtensionContext context) {
TransactionManagerFactory.setJpaTm(Suppliers.ofInstance(cachedTm));
TransactionManagerFactory.setReplicaJpaTm(Suppliers.ofInstance(cachedTm));
// Even though we didn't set this, reset it to make sure no other tests are affected
JpaTransactionManagerImpl.removeReplaySqlToDsOverrideForTest();
cachedTm = null;
@@ -91,9 +91,15 @@ public class ReplicaSimulatingJpaTransactionManager implements JpaTransactionMan
@Override
public <T> T transact(Supplier<T> work) {
if (delegate.inTransaction()) {
return work.get();
}
return delegate.transact(
() -> {
delegate.getEntityManager().createQuery("SET TRANSACTION READ ONLY").executeUpdate();
delegate
.getEntityManager()
.createNativeQuery("SET TRANSACTION READ ONLY")
.executeUpdate();
return work.get();
});
}
+5 -1
View File
@@ -96,7 +96,11 @@ steps:
google.registry.beam.invoicing.InvoicingPipeline \
google/registry/beam/invoicing_pipeline_metadata.json \
google.registry.beam.rde.RdePipeline \
google/registry/beam/rde_pipeline_metadata.json
google/registry/beam/rde_pipeline_metadata.json \
google.registry.beam.comparedb.ValidateDatastorePipeline \
google/registry/beam/validate_datastore_pipeline_metadata.json \
google.registry.beam.comparedb.ValidateSqlPipeline \
google/registry/beam/validate_sql_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests.