1
0
mirror of https://github.com/google/nomulus synced 2026-02-10 06:50:30 +00:00

Support testing SQL -> DS replication in ReplayExt (#1216)

* Support testing SQL -> DS replication in ReplayExt

Support testing of Postgres -> Datastore replication in the ReplayExtension
when running in SQL mode in a DualDatabaseTest.

This is currently only enabled for one test (HostInfoFlowTest) since this form
of replication is likely to be problematic in many cases.

As part of this change:

- Add a thread-local flag so that we don't attempt to do certain data
  transformations when serializing entities for storage in a Transaction
  record. (These typically need to be called in a datastore transaction).
- Replace tm() in datastore translators with ofyTm() (these should only be
  called from within an ofy transaction) and also in the replay system itself.
- Add a transactWithoutBackup() method for use within the replay itself.
- Prevent replication of entities that are not intended to be replicated.
- Make some of the ReplicateToDatastoreAction methods public so we can invoke
  them from ReplayExtension.
- Change the way that the test type is stored in the extension context in a
  DualDatabaseTest so that we can check for it from the ReplayExtension.

* Limit number of tests and show output

Trying to debug why these are failing in kokoro.

* Move HostInfoFlowTest to fragile for now

The test now manipulates a globel variable that causes problems for other
tests.  There's likely a better fix for this, but for purposes of this PR we
can just move it to "fragile."

* Fix a few more problems

-   "replay" flag should have been initialized to false -- as it stands,
    replay wasn't happening.
-   disable "always save with backup" in the datastore helper, we were
    apparently getting some unwanted commit log entries that were causing
    timestamp inversions in other tests.  Also clear out the replay queue
    just for good hygiene.
-   Check for a null replicator in replayToOfy before proceeding.
-   Use a local inOfyContext flag to track whether we're in ofy context, as
    the tm() function is less reliable in dual-database tests.
This commit is contained in:
Michael Muller
2021-06-29 10:00:39 -04:00
committed by GitHub
parent 2e8a1c422d
commit 78a750b7e1
12 changed files with 196 additions and 46 deletions

View File

@@ -51,7 +51,7 @@ class HostInfoFlowTest extends ResourceFlowTestCase<HostInfoFlow, HostResource>
@Order(value = Order.DEFAULT - 2)
@RegisterExtension
final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock);
final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock);
HostInfoFlowTest() {
setEppInput("host_info.xml", ImmutableMap.of("HOSTNAME", "ns1.example.tld"));

View File

@@ -477,7 +477,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa
public void afterEach(ExtensionContext context) throws Exception {
checkArgumentNotNull(context, "The ExtensionContext must not be null");
try {
// If there is a replay extension, we'll want to call its replayToSql() method.
// If there is a replay extension, we'll want to call its replay() method.
//
// We have to provide this hook here for ReplayExtension instead of relying on
// ReplayExtension's afterEach() method because of ordering and the conflation of environment
@@ -492,7 +492,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa
(ReplayExtension)
context.getStore(ExtensionContext.Namespace.GLOBAL).get(ReplayExtension.class);
if (replayer != null) {
replayer.replayToSql();
replayer.replay();
}
if (withCloudSql) {

View File

@@ -51,15 +51,23 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
return true;
}
/**
* Returns true if "context" is an objectify unit test.
*
* <p>Provided to allow ReplayExtension to make this determination.
*/
static boolean inOfyContext(ExtensionContext context) {
return (DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY)
== DatabaseType.OFY;
}
@Override
public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
ExtensionContext context) {
TestTemplateInvocationContext ofyContext =
createInvocationContext(
context.getDisplayName() + " with Datastore", TransactionManagerFactory::ofyTm);
createInvocationContext(context.getDisplayName() + " with Datastore", DatabaseType.OFY);
TestTemplateInvocationContext sqlContext =
createInvocationContext(
context.getDisplayName() + " with PostgreSQL", TransactionManagerFactory::jpaTm);
createInvocationContext(context.getDisplayName() + " with PostgreSQL", DatabaseType.JPA);
Method testMethod = context.getTestMethod().orElseThrow(IllegalStateException::new);
if (testMethod.isAnnotationPresent(TestOfyAndSql.class)) {
return Stream.of(ofyContext, sqlContext);
@@ -74,7 +82,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
}
private TestTemplateInvocationContext createInvocationContext(
String name, Supplier<? extends TransactionManager> tmSupplier) {
String name, DatabaseType databaseType) {
return new TestTemplateInvocationContext() {
@Override
public String getDisplayName(int invocationIndex) {
@@ -83,17 +91,17 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
@Override
public List<Extension> getAdditionalExtensions() {
return ImmutableList.of(new DatabaseSwitchInvocationContext(tmSupplier));
return ImmutableList.of(new DatabaseSwitchInvocationContext(databaseType));
}
};
}
private static class DatabaseSwitchInvocationContext implements TestInstancePostProcessor {
private Supplier<? extends TransactionManager> tmSupplier;
private DatabaseType databaseType;
private DatabaseSwitchInvocationContext(Supplier<? extends TransactionManager> tmSupplier) {
this.tmSupplier = tmSupplier;
private DatabaseSwitchInvocationContext(DatabaseType databaseType) {
this.databaseType = databaseType;
}
@Override
@@ -113,7 +121,7 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
throw new IllegalStateException(
"AppEngineExtension in @DualDatabaseTest test must set withDatastoreAndCloudSql()");
}
context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, tmSupplier);
context.getStore(NAMESPACE).put(INJECTED_TM_SUPPLIER_KEY, databaseType);
}
private static ImmutableList<Field> getAppEngineExtensionFields(Class<?> clazz) {
@@ -144,10 +152,9 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
}
});
context.getStore(NAMESPACE).put(ORIGINAL_TM_KEY, tm());
Supplier<? extends TransactionManager> tmSupplier =
(Supplier<? extends TransactionManager>)
context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY);
TransactionManagerFactory.setTm(tmSupplier.get());
DatabaseType databaseType =
(DatabaseType) context.getStore(NAMESPACE).get(INJECTED_TM_SUPPLIER_KEY);
TransactionManagerFactory.setTm(databaseType.getTm());
}
}
@@ -171,4 +178,20 @@ class DualDatabaseTestInvocationContextProvider implements TestTemplateInvocatio
return testInstance.getClass().isAnnotationPresent(DualDatabaseTest.class)
&& isDeclaredTestMethod;
}
private enum DatabaseType {
JPA(TransactionManagerFactory::jpaTm),
OFY(TransactionManagerFactory::ofyTm);
@SuppressWarnings("Immutable") // Supplier is immutable, but not annotated as such.
private final Supplier<? extends TransactionManager> supplier;
DatabaseType(Supplier<? extends TransactionManager> supplier) {
this.supplier = supplier;
}
TransactionManager getTm() {
return supplier.get();
}
}
}

View File

@@ -22,13 +22,17 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.config.RegistryConfig;
import google.registry.model.ImmutableObject;
import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.ReplayQueue;
import google.registry.model.ofy.TransactionInfo;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.TransactionEntity;
import google.registry.schema.replay.DatastoreEntity;
import google.registry.schema.replay.ReplicateToDatastoreAction;
import java.util.Optional;
import javax.annotation.Nullable;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -48,19 +52,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
FakeClock clock;
boolean compare;
boolean replayed = false;
boolean inOfyContext;
InjectExtension injectExtension = new InjectExtension();
@Nullable ReplicateToDatastoreAction sqlToDsReplicator;
private ReplayExtension(FakeClock clock, boolean compare) {
private ReplayExtension(
FakeClock clock, boolean compare, @Nullable ReplicateToDatastoreAction sqlToDsReplicator) {
this.clock = clock;
this.compare = compare;
this.sqlToDsReplicator = sqlToDsReplicator;
}
public static ReplayExtension createWithCompare(FakeClock clock) {
return new ReplayExtension(clock, true);
return new ReplayExtension(clock, true, null);
}
public static ReplayExtension createWithoutCompare(FakeClock clock) {
return new ReplayExtension(clock, false);
/**
* Create a replay extension that replays from SQL to cloud datastore when running in SQL mode.
*/
public static ReplayExtension createWithDoubleReplay(FakeClock clock) {
return new ReplayExtension(clock, true, new ReplicateToDatastoreAction(clock));
}
@Override
@@ -74,16 +86,27 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
DatabaseHelper.setClock(clock);
DatabaseHelper.setAlwaysSaveWithBackup(true);
ReplayQueue.clear();
// When running in JPA mode with double replay enabled, enable JPA transaction replication.
// Note that we can't just use isOfy() here because this extension gets run before the dual-test
// transaction manager gets injected.
inOfyContext = DualDatabaseTestInvocationContextProvider.inOfyContext(context);
if (sqlToDsReplicator != null && !inOfyContext) {
RegistryConfig.overrideCloudSqlReplicateTransactions(true);
}
context.getStore(ExtensionContext.Namespace.GLOBAL).put(ReplayExtension.class, this);
}
@Override
public void afterEach(ExtensionContext context) {
// This ensures that we do the replay even if we're not called from AppEngineExtension. It
// should be safe to call replayToSql() twice, as the replay queue should be empty the second
// time.
replayToSql();
// is safe to call replay() twice, as the method ensures idempotence.
replay();
injectExtension.afterEach(context);
if (sqlToDsReplicator != null) {
RegistryConfig.overrideCloudSqlReplicateTransactions(false);
}
}
private static ImmutableSet<String> NON_REPLICATED_TYPES =
@@ -104,7 +127,26 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
"ForeignKeyContactIndex",
"ForeignKeyDomainIndex");
public void replayToSql() {
public void replay() {
if (!replayed) {
if (inOfyContext) {
replayToSql();
} else {
// Disable database backups. For unknown reason, if we don't do this we get residual commit
// log entries that cause timestamp inversions in other tests.
DatabaseHelper.setAlwaysSaveWithBackup(false);
// Do the ofy replay.
replayToOfy();
// Clean out anything that ends up in the replay queue.
ReplayQueue.clear();
}
replayed = true;
}
}
private void replayToSql() {
DatabaseHelper.setAlwaysSaveWithBackup(false);
ImmutableMap<Key<?>, Object> changes = ReplayQueue.replay();
@@ -139,4 +181,18 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
}
}
}
private void replayToOfy() {
if (sqlToDsReplicator == null) {
return;
}
// TODO(mmuller): Verify that all entities are the same across both databases.
for (TransactionEntity txn : sqlToDsReplicator.getTransactionBatch()) {
if (sqlToDsReplicator.applyTransaction(txn)) {
break;
}
clock.advanceOneMilli();
}
}
}