mirror of
https://github.com/google/nomulus
synced 2026-02-10 06:50:30 +00:00
Add locking and a response in ReplicateToDatastoreAction (#1328)
* Add locking and a response in ReplicateToDatastoreAction The response is necessary to get nicer logs in GAE and nicer cron job behavior. In addition: - fix issues where locks would be backed up and replayed to Datastore (they shouldn't be replayed) - do ignore-read-only writes when replaying the transactions
This commit is contained in:
@@ -20,21 +20,30 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
|
||||
import static google.registry.testing.DatabaseHelper.insertInDb;
|
||||
import static google.registry.testing.LogsSubject.assertAboutLogs;
|
||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.common.collect.ImmutableSortedMap;
|
||||
import com.google.common.testing.TestLogHandler;
|
||||
import com.google.common.truth.Truth8;
|
||||
import google.registry.model.common.DatabaseMigrationStateSchedule;
|
||||
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
|
||||
import google.registry.model.ofy.CommitLogBucket;
|
||||
import google.registry.model.ofy.Ofy;
|
||||
import google.registry.model.server.Lock;
|
||||
import google.registry.persistence.transaction.TransactionEntity;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.DatabaseHelper;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import google.registry.testing.InjectExtension;
|
||||
import google.registry.testing.TestObject;
|
||||
import google.registry.util.RequestStatusChecker;
|
||||
import java.util.List;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
@@ -54,18 +63,20 @@ public class ReplicateToDatastoreActionTest {
|
||||
public final AppEngineExtension appEngine =
|
||||
AppEngineExtension.builder()
|
||||
.withDatastoreAndCloudSql()
|
||||
.withOfyTestEntities(TestObject.class)
|
||||
.withJpaUnitTestEntities(TestObject.class)
|
||||
.withOfyTestEntities(Lock.class, TestObject.class)
|
||||
.withJpaUnitTestEntities(Lock.class, TestObject.class)
|
||||
.withClock(fakeClock)
|
||||
.build();
|
||||
|
||||
@RegisterExtension final InjectExtension injectExtension = new InjectExtension();
|
||||
|
||||
private final ReplicateToDatastoreAction task = new ReplicateToDatastoreAction(fakeClock);
|
||||
private final TestLogHandler logHandler = new TestLogHandler();
|
||||
private ReplicateToDatastoreAction action;
|
||||
private FakeResponse response;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
resetAction();
|
||||
injectExtension.setStaticField(Ofy.class, "clock", fakeClock);
|
||||
// Use a single bucket to expose timestamp inversion problems.
|
||||
injectExtension.setStaticField(
|
||||
@@ -95,7 +106,7 @@ public class ReplicateToDatastoreActionTest {
|
||||
jpaTm().insert(foo);
|
||||
jpaTm().insert(bar);
|
||||
});
|
||||
task.run();
|
||||
runAndVerifySuccess();
|
||||
|
||||
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo);
|
||||
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar);
|
||||
@@ -107,7 +118,7 @@ public class ReplicateToDatastoreActionTest {
|
||||
jpaTm().delete(bar.key());
|
||||
jpaTm().insert(baz);
|
||||
});
|
||||
task.run();
|
||||
runAndVerifySuccess();
|
||||
|
||||
assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key()).isPresent())).isFalse();
|
||||
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(baz.key()))).isEqualTo(baz);
|
||||
@@ -120,7 +131,7 @@ public class ReplicateToDatastoreActionTest {
|
||||
|
||||
// Write a transaction containing "foo".
|
||||
insertInDb(foo);
|
||||
task.run();
|
||||
runAndVerifySuccess();
|
||||
|
||||
// Verify that it propagated to datastore, then remove "foo" directly from datastore.
|
||||
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo);
|
||||
@@ -128,7 +139,7 @@ public class ReplicateToDatastoreActionTest {
|
||||
|
||||
// Write "bar"
|
||||
insertInDb(bar);
|
||||
task.run();
|
||||
runAndVerifySuccess();
|
||||
|
||||
// If we replayed only the most recent transaction, we should have "bar" but not "foo".
|
||||
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar);
|
||||
@@ -142,23 +153,23 @@ public class ReplicateToDatastoreActionTest {
|
||||
|
||||
// Write a transaction and run just the batch fetch.
|
||||
insertInDb(foo);
|
||||
List<TransactionEntity> txns1 = task.getTransactionBatch();
|
||||
List<TransactionEntity> txns1 = action.getTransactionBatch();
|
||||
assertThat(txns1).hasSize(1);
|
||||
|
||||
// Write a second transaction and do another batch fetch.
|
||||
insertInDb(bar);
|
||||
List<TransactionEntity> txns2 = task.getTransactionBatch();
|
||||
List<TransactionEntity> txns2 = action.getTransactionBatch();
|
||||
assertThat(txns2).hasSize(2);
|
||||
|
||||
// Apply the first batch.
|
||||
task.applyTransaction(txns1.get(0));
|
||||
action.applyTransaction(txns1.get(0));
|
||||
|
||||
// Remove the foo record so we can ensure that this transaction doesn't get doublle-played.
|
||||
ofyTm().transact(() -> ofyTm().delete(foo.key()));
|
||||
|
||||
// Apply the second batch.
|
||||
for (TransactionEntity txn : txns2) {
|
||||
task.applyTransaction(txn);
|
||||
action.applyTransaction(txn);
|
||||
}
|
||||
|
||||
// Verify that the first transaction didn't get replayed but the second one did.
|
||||
@@ -179,9 +190,10 @@ public class ReplicateToDatastoreActionTest {
|
||||
// Force the last transaction id back to -1 so that we look for transaction 0.
|
||||
ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1)));
|
||||
|
||||
List<TransactionEntity> txns = task.getTransactionBatch();
|
||||
List<TransactionEntity> txns = action.getTransactionBatch();
|
||||
assertThat(txns).hasSize(1);
|
||||
assertThat(assertThrows(IllegalStateException.class, () -> task.applyTransaction(txns.get(0))))
|
||||
assertThat(
|
||||
assertThrows(IllegalStateException.class, () -> action.applyTransaction(txns.get(0))))
|
||||
.hasMessageThat()
|
||||
.isEqualTo("Missing transaction: last txn id = -1, next available txn = 1");
|
||||
}
|
||||
@@ -194,19 +206,21 @@ public class ReplicateToDatastoreActionTest {
|
||||
|
||||
// Force the last transaction id back to -1 so that we look for transaction 0.
|
||||
ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1)));
|
||||
task.run();
|
||||
action.run();
|
||||
assertAboutLogs()
|
||||
.that(logHandler)
|
||||
.hasSevereLogWithCause(
|
||||
new IllegalStateException(
|
||||
"Missing transaction: last txn id = -1, next available txn = 1"));
|
||||
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
|
||||
assertThat(response.getPayload()).isEqualTo("Errored out replaying files");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBeforeDatastoreSaveCallback() {
|
||||
TestObject testObject = TestObject.create("foo");
|
||||
jpaTm().transact(() -> jpaTm().put(testObject));
|
||||
task.run();
|
||||
action.run();
|
||||
assertThat(ofyTm().loadAllOf(TestObject.class)).containsExactly(testObject);
|
||||
assertThat(TestObject.beforeDatastoreSaveCallCount).isEqualTo(1);
|
||||
}
|
||||
@@ -231,7 +245,7 @@ public class ReplicateToDatastoreActionTest {
|
||||
fakeClock.advanceBy(Duration.standardDays(1));
|
||||
|
||||
insertInDb(TestObject.create("foo"));
|
||||
task.run();
|
||||
action.run();
|
||||
// Replication shouldn't have happened
|
||||
assertThat(ofyTm().loadAllOf(TestObject.class)).isEmpty();
|
||||
assertAboutLogs()
|
||||
@@ -240,5 +254,46 @@ public class ReplicateToDatastoreActionTest {
|
||||
Level.INFO,
|
||||
"Skipping ReplicateToDatastoreAction because we are in migration phase "
|
||||
+ "DATASTORE_PRIMARY.");
|
||||
assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT);
|
||||
assertThat(response.getPayload())
|
||||
.isEqualTo(
|
||||
"Skipping ReplicateToDatastoreAction because we are in migration phase"
|
||||
+ " DATASTORE_PRIMARY.");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFailure_cannotAcquireLock() {
|
||||
RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class);
|
||||
when(requestStatusChecker.getLogId()).thenReturn("logId");
|
||||
Truth8.assertThat(
|
||||
Lock.acquire(
|
||||
ReplicateToDatastoreAction.class.getSimpleName(),
|
||||
null,
|
||||
Duration.standardHours(1),
|
||||
requestStatusChecker,
|
||||
false))
|
||||
.isPresent();
|
||||
fakeClock.advanceOneMilli();
|
||||
|
||||
resetAction();
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT);
|
||||
assertThat(response.getPayload())
|
||||
.isEqualTo("Can't acquire ReplicateToDatastoreAction lock, aborting.");
|
||||
}
|
||||
|
||||
private void runAndVerifySuccess() {
|
||||
resetAction();
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||
assertThat(response.getPayload())
|
||||
.isEqualTo("Replayed 1 transaction(s) from Cloud SQL -> Datastore");
|
||||
}
|
||||
|
||||
private void resetAction() {
|
||||
response = new FakeResponse();
|
||||
RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class);
|
||||
when(requestStatusChecker.getLogId()).thenReturn("logId");
|
||||
action = new ReplicateToDatastoreAction(fakeClock, requestStatusChecker, response);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ import google.registry.persistence.transaction.Transaction.Delete;
|
||||
import google.registry.persistence.transaction.Transaction.Mutation;
|
||||
import google.registry.persistence.transaction.Transaction.Update;
|
||||
import google.registry.persistence.transaction.TransactionEntity;
|
||||
import google.registry.util.RequestStatusChecker;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@@ -45,6 +46,7 @@ import javax.annotation.Nullable;
|
||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* A JUnit extension that replays datastore transactions against postgresql.
|
||||
@@ -81,7 +83,11 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
|
||||
* Create a replay extension that replays from SQL to cloud datastore when running in SQL mode.
|
||||
*/
|
||||
public static ReplayExtension createWithDoubleReplay(FakeClock clock) {
|
||||
return new ReplayExtension(clock, true, new ReplicateToDatastoreAction(clock));
|
||||
return new ReplayExtension(
|
||||
clock,
|
||||
true,
|
||||
new ReplicateToDatastoreAction(
|
||||
clock, Mockito.mock(RequestStatusChecker.class), new FakeResponse()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user