diff --git a/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java index d2ff5f889..fa829b53c 100644 --- a/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java @@ -22,12 +22,17 @@ import static google.registry.request.Action.Method.GET; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.flogger.FluentLogger; +import google.registry.model.common.DatabaseMigrationStateSchedule; +import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; +import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; import google.registry.persistence.transaction.Transaction; import google.registry.persistence.transaction.TransactionEntity; import google.registry.request.Action; import google.registry.request.auth.Auth; +import google.registry.util.Clock; import java.io.IOException; import java.util.List; +import javax.inject.Inject; import javax.persistence.NoResultException; /** Cron task to replicate from Cloud SQL to datastore. */ @@ -48,6 +53,13 @@ class ReplicateToDatastoreAction implements Runnable { */ public static final int BATCH_SIZE = 200; + private final Clock clock; + + @Inject + public ReplicateToDatastoreAction(Clock clock) { + this.clock = clock; + } + @VisibleForTesting List getTransactionBatch() { // Get the next batch of transactions that we haven't replicated. @@ -59,7 +71,8 @@ class ReplicateToDatastoreAction implements Runnable { jpaTm() .query( "SELECT txn FROM TransactionEntity txn WHERE id >" - + " :lastId ORDER BY id") + + " :lastId ORDER BY id", + TransactionEntity.class) .setParameter("lastId", lastSqlTxnBeforeBatch.getTransactionId()) .setMaxResults(BATCH_SIZE) .getResultList()); @@ -69,7 +82,7 @@ class ReplicateToDatastoreAction implements Runnable { } /** - * Apply a transaction to datastore, returns true if there was a fatal error and the batch should + * Apply a transaction to Datastore, returns true if there was a fatal error and the batch should * be aborted. */ @VisibleForTesting @@ -122,6 +135,13 @@ class ReplicateToDatastoreAction implements Runnable { @Override public void run() { + MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); + if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) { + logger.atInfo().log( + String.format( + "Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state)); + return; + } // TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex, // EppResourceIndex. logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore"); diff --git a/core/src/test/java/google/registry/schema/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/schema/replay/ReplicateToDatastoreActionTest.java index cc01b7249..8b839d678 100644 --- a/core/src/test/java/google/registry/schema/replay/ReplicateToDatastoreActionTest.java +++ b/core/src/test/java/google/registry/schema/replay/ReplicateToDatastoreActionTest.java @@ -18,19 +18,28 @@ import static com.google.common.truth.Truth.assertThat; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static google.registry.testing.LogsSubject.assertAboutLogs; +import static google.registry.util.DateTimeUtils.START_OF_TIME; +import com.google.common.collect.ImmutableSortedMap; import com.google.common.testing.TestLogHandler; import com.googlecode.objectify.Key; import com.googlecode.objectify.annotation.Entity; import com.googlecode.objectify.annotation.Id; import google.registry.config.RegistryConfig; import google.registry.model.ImmutableObject; +import google.registry.model.common.DatabaseMigrationStateSchedule; +import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; +import google.registry.model.ofy.Ofy; import google.registry.persistence.VKey; import google.registry.persistence.transaction.TransactionEntity; import google.registry.testing.AppEngineExtension; +import google.registry.testing.FakeClock; +import google.registry.testing.InjectExtension; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; +import org.joda.time.DateTime; +import org.joda.time.Duration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,25 +47,42 @@ import org.junit.jupiter.api.extension.RegisterExtension; public class ReplicateToDatastoreActionTest { + private final FakeClock fakeClock = new FakeClock(DateTime.parse("2000-01-01TZ")); + @RegisterExtension public final AppEngineExtension appEngine = AppEngineExtension.builder() .withDatastoreAndCloudSql() .withOfyTestEntities(TestEntity.class) .withJpaUnitTestEntities(TestEntity.class) + .withClock(fakeClock) .build(); - ReplicateToDatastoreAction task = new ReplicateToDatastoreAction(); + @RegisterExtension final InjectExtension injectExtension = new InjectExtension(); - TestLogHandler logHandler; - - public ReplicateToDatastoreActionTest() {} + private final ReplicateToDatastoreAction task = new ReplicateToDatastoreAction(fakeClock); + private final TestLogHandler logHandler = new TestLogHandler(); @BeforeEach public void setUp() { + injectExtension.setStaticField(Ofy.class, "clock", fakeClock); RegistryConfig.overrideCloudSqlReplicateTransactions(true); - logHandler = new TestLogHandler(); Logger.getLogger(ReplicateToDatastoreAction.class.getCanonicalName()).addHandler(logHandler); + DateTime now = fakeClock.nowUtc(); + ofyTm() + .transact( + () -> + DatabaseMigrationStateSchedule.set( + ImmutableSortedMap.of( + START_OF_TIME, + MigrationState.DATASTORE_ONLY, + now, + MigrationState.DATASTORE_PRIMARY, + now.plusHours(1), + MigrationState.DATASTORE_PRIMARY_READ_ONLY, + now.plusHours(2), + MigrationState.SQL_PRIMARY))); + fakeClock.advanceBy(Duration.standardDays(1)); } @AfterEach @@ -170,6 +196,36 @@ public class ReplicateToDatastoreActionTest { "Missing transaction: last transaction id = -1, next available transaction = 1"); } + @Test + void testNotInMigrationState_doesNothing() { + // set a schedule that backtracks the current status to DATASTORE_PRIMARY_READ_ONLY + DateTime now = fakeClock.nowUtc(); + ofyTm() + .transact( + () -> + DatabaseMigrationStateSchedule.set( + ImmutableSortedMap.naturalOrder() + .put(START_OF_TIME, MigrationState.DATASTORE_ONLY) + .put(START_OF_TIME.plusHours(1), MigrationState.DATASTORE_PRIMARY) + .put(START_OF_TIME.plusHours(2), MigrationState.DATASTORE_PRIMARY_READ_ONLY) + .put(START_OF_TIME.plusHours(3), MigrationState.SQL_PRIMARY) + .put(now.plusHours(1), MigrationState.SQL_PRIMARY_READ_ONLY) + .put(now.plusHours(2), MigrationState.DATASTORE_PRIMARY_READ_ONLY) + .build())); + fakeClock.advanceBy(Duration.standardDays(1)); + + jpaTm().transact(() -> jpaTm().insert(new TestEntity("foo"))); + task.run(); + // Replication shouldn't have happened + assertThat(ofyTm().loadAllOf(TestEntity.class)).isEmpty(); + assertAboutLogs() + .that(logHandler) + .hasLogAtLevelWithMessage( + Level.INFO, + "Skipping ReplicateToDatastoreAction because we are in migration phase " + + "DATASTORE_PRIMARY_READ_ONLY."); + } + @Entity(name = "ReplicationTestEntity") @javax.persistence.Entity(name = "TestEntity") private static class TestEntity extends ImmutableObject {