mirror of
https://github.com/google/nomulus
synced 2026-02-06 21:11:34 +00:00
Use DB migration state to determine running async replay SQL->DS (#1191)
* Use DB migration state to determine running async replay SQL->DS The SQL->DS replay likely could use more work (locking, returning the right codes, things like that) but that's outside the scope of this PR.
This commit is contained in:
@@ -22,12 +22,17 @@ import static google.registry.request.Action.Method.GET;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import google.registry.model.common.DatabaseMigrationStateSchedule;
|
||||
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
|
||||
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
|
||||
import google.registry.persistence.transaction.Transaction;
|
||||
import google.registry.persistence.transaction.TransactionEntity;
|
||||
import google.registry.request.Action;
|
||||
import google.registry.request.auth.Auth;
|
||||
import google.registry.util.Clock;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import javax.inject.Inject;
|
||||
import javax.persistence.NoResultException;
|
||||
|
||||
/** Cron task to replicate from Cloud SQL to datastore. */
|
||||
@@ -48,6 +53,13 @@ class ReplicateToDatastoreAction implements Runnable {
|
||||
*/
|
||||
public static final int BATCH_SIZE = 200;
|
||||
|
||||
private final Clock clock;
|
||||
|
||||
@Inject
|
||||
public ReplicateToDatastoreAction(Clock clock) {
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<TransactionEntity> getTransactionBatch() {
|
||||
// Get the next batch of transactions that we haven't replicated.
|
||||
@@ -59,7 +71,8 @@ class ReplicateToDatastoreAction implements Runnable {
|
||||
jpaTm()
|
||||
.query(
|
||||
"SELECT txn FROM TransactionEntity txn WHERE id >"
|
||||
+ " :lastId ORDER BY id")
|
||||
+ " :lastId ORDER BY id",
|
||||
TransactionEntity.class)
|
||||
.setParameter("lastId", lastSqlTxnBeforeBatch.getTransactionId())
|
||||
.setMaxResults(BATCH_SIZE)
|
||||
.getResultList());
|
||||
@@ -69,7 +82,7 @@ class ReplicateToDatastoreAction implements Runnable {
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a transaction to datastore, returns true if there was a fatal error and the batch should
|
||||
* Apply a transaction to Datastore, returns true if there was a fatal error and the batch should
|
||||
* be aborted.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@@ -122,6 +135,13 @@ class ReplicateToDatastoreAction implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
|
||||
if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) {
|
||||
logger.atInfo().log(
|
||||
String.format(
|
||||
"Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state));
|
||||
return;
|
||||
}
|
||||
// TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex,
|
||||
// EppResourceIndex.
|
||||
logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore");
|
||||
|
||||
@@ -18,19 +18,28 @@ import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
|
||||
import static google.registry.testing.LogsSubject.assertAboutLogs;
|
||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
|
||||
import com.google.common.collect.ImmutableSortedMap;
|
||||
import com.google.common.testing.TestLogHandler;
|
||||
import com.googlecode.objectify.Key;
|
||||
import com.googlecode.objectify.annotation.Entity;
|
||||
import com.googlecode.objectify.annotation.Id;
|
||||
import google.registry.config.RegistryConfig;
|
||||
import google.registry.model.ImmutableObject;
|
||||
import google.registry.model.common.DatabaseMigrationStateSchedule;
|
||||
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
|
||||
import google.registry.model.ofy.Ofy;
|
||||
import google.registry.persistence.VKey;
|
||||
import google.registry.persistence.transaction.TransactionEntity;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.InjectExtension;
|
||||
import java.util.List;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -38,25 +47,42 @@ import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
public class ReplicateToDatastoreActionTest {
|
||||
|
||||
private final FakeClock fakeClock = new FakeClock(DateTime.parse("2000-01-01TZ"));
|
||||
|
||||
@RegisterExtension
|
||||
public final AppEngineExtension appEngine =
|
||||
AppEngineExtension.builder()
|
||||
.withDatastoreAndCloudSql()
|
||||
.withOfyTestEntities(TestEntity.class)
|
||||
.withJpaUnitTestEntities(TestEntity.class)
|
||||
.withClock(fakeClock)
|
||||
.build();
|
||||
|
||||
ReplicateToDatastoreAction task = new ReplicateToDatastoreAction();
|
||||
@RegisterExtension final InjectExtension injectExtension = new InjectExtension();
|
||||
|
||||
TestLogHandler logHandler;
|
||||
|
||||
public ReplicateToDatastoreActionTest() {}
|
||||
private final ReplicateToDatastoreAction task = new ReplicateToDatastoreAction(fakeClock);
|
||||
private final TestLogHandler logHandler = new TestLogHandler();
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
injectExtension.setStaticField(Ofy.class, "clock", fakeClock);
|
||||
RegistryConfig.overrideCloudSqlReplicateTransactions(true);
|
||||
logHandler = new TestLogHandler();
|
||||
Logger.getLogger(ReplicateToDatastoreAction.class.getCanonicalName()).addHandler(logHandler);
|
||||
DateTime now = fakeClock.nowUtc();
|
||||
ofyTm()
|
||||
.transact(
|
||||
() ->
|
||||
DatabaseMigrationStateSchedule.set(
|
||||
ImmutableSortedMap.of(
|
||||
START_OF_TIME,
|
||||
MigrationState.DATASTORE_ONLY,
|
||||
now,
|
||||
MigrationState.DATASTORE_PRIMARY,
|
||||
now.plusHours(1),
|
||||
MigrationState.DATASTORE_PRIMARY_READ_ONLY,
|
||||
now.plusHours(2),
|
||||
MigrationState.SQL_PRIMARY)));
|
||||
fakeClock.advanceBy(Duration.standardDays(1));
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
@@ -170,6 +196,36 @@ public class ReplicateToDatastoreActionTest {
|
||||
"Missing transaction: last transaction id = -1, next available transaction = 1");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testNotInMigrationState_doesNothing() {
|
||||
// set a schedule that backtracks the current status to DATASTORE_PRIMARY_READ_ONLY
|
||||
DateTime now = fakeClock.nowUtc();
|
||||
ofyTm()
|
||||
.transact(
|
||||
() ->
|
||||
DatabaseMigrationStateSchedule.set(
|
||||
ImmutableSortedMap.<DateTime, MigrationState>naturalOrder()
|
||||
.put(START_OF_TIME, MigrationState.DATASTORE_ONLY)
|
||||
.put(START_OF_TIME.plusHours(1), MigrationState.DATASTORE_PRIMARY)
|
||||
.put(START_OF_TIME.plusHours(2), MigrationState.DATASTORE_PRIMARY_READ_ONLY)
|
||||
.put(START_OF_TIME.plusHours(3), MigrationState.SQL_PRIMARY)
|
||||
.put(now.plusHours(1), MigrationState.SQL_PRIMARY_READ_ONLY)
|
||||
.put(now.plusHours(2), MigrationState.DATASTORE_PRIMARY_READ_ONLY)
|
||||
.build()));
|
||||
fakeClock.advanceBy(Duration.standardDays(1));
|
||||
|
||||
jpaTm().transact(() -> jpaTm().insert(new TestEntity("foo")));
|
||||
task.run();
|
||||
// Replication shouldn't have happened
|
||||
assertThat(ofyTm().loadAllOf(TestEntity.class)).isEmpty();
|
||||
assertAboutLogs()
|
||||
.that(logHandler)
|
||||
.hasLogAtLevelWithMessage(
|
||||
Level.INFO,
|
||||
"Skipping ReplicateToDatastoreAction because we are in migration phase "
|
||||
+ "DATASTORE_PRIMARY_READ_ONLY.");
|
||||
}
|
||||
|
||||
@Entity(name = "ReplicationTestEntity")
|
||||
@javax.persistence.Entity(name = "TestEntity")
|
||||
private static class TestEntity extends ImmutableObject {
|
||||
|
||||
Reference in New Issue
Block a user