1
0
mirror of https://github.com/google/nomulus synced 2026-06-09 16:33:02 +00:00

Add dual read for cursors (#473)

* Add dual read for cursors

* Fix loadAndCompareAll to batch load cursors

* fix javadocs
This commit is contained in:
sarahcaseybot
2020-02-19 16:10:19 -05:00
committed by GitHub
parent be395611ca
commit f53aa8d55e
13 changed files with 323 additions and 50 deletions

View File

@@ -25,6 +25,8 @@ import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.reporting.HistoryEntry.Type.DOMAIN_AUTORENEW;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.pricing.PricingEngineProxy.getDomainRenewCost;
import static google.registry.schema.cursor.Cursor.GLOBAL;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import static google.registry.util.CollectionUtils.union;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static google.registry.util.DateTimeUtils.earliestOf;
@@ -93,6 +95,7 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
@Override
public void run() {
Cursor cursor = ofy().load().key(Cursor.createGlobalKey(RECURRING_BILLING)).now();
loadAndCompare(cursor, GLOBAL);
DateTime executeTime = clock.nowUtc();
DateTime persistedCursorTime = (cursor == null ? START_OF_TIME : cursor.getCursorTime());
DateTime cursorTime = cursorTimeParam.orElse(persistedCursorTime);
@@ -317,6 +320,7 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
tm().transact(
() -> {
Cursor cursor = ofy().load().key(Cursor.createGlobalKey(RECURRING_BILLING)).now();
loadAndCompare(cursor, GLOBAL);
DateTime currentCursorTime =
(cursor == null ? START_OF_TIME : cursor.getCursorTime());
if (!currentCursorTime.equals(expectedPersistedCursorTime)) {
@@ -327,8 +331,7 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
}
if (!isDryRun) {
CursorDao.saveCursor(
Cursor.createGlobal(RECURRING_BILLING, executionTime),
google.registry.schema.cursor.Cursor.GLOBAL);
Cursor.createGlobal(RECURRING_BILLING, executionTime), GLOBAL);
}
});
}

View File

@@ -25,6 +25,8 @@ import static google.registry.model.registrar.RegistrarContact.Type.LEGAL;
import static google.registry.model.registrar.RegistrarContact.Type.MARKETING;
import static google.registry.model.registrar.RegistrarContact.Type.TECH;
import static google.registry.model.registrar.RegistrarContact.Type.WHOIS;
import static google.registry.schema.cursor.Cursor.GLOBAL;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.base.Joiner;
@@ -62,6 +64,7 @@ class SyncRegistrarsSheet {
*/
boolean wereRegistrarsModified() {
Cursor cursor = ofy().load().key(Cursor.createGlobalKey(SYNC_REGISTRAR_SHEET)).now();
loadAndCompare(cursor, GLOBAL);
DateTime lastUpdateTime = (cursor == null) ? START_OF_TIME : cursor.getCursorTime();
for (Registrar registrar : Registrar.loadAllCached()) {
if (DateTimeUtils.isAtOrAfter(registrar.getLastUpdateTime(), lastUpdateTime)) {

View File

@@ -15,6 +15,7 @@
package google.registry.rde;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import com.google.common.flogger.FluentLogger;
import google.registry.model.common.Cursor;
@@ -91,6 +92,7 @@ class EscrowTaskRunner {
logger.atInfo().log("TLD: %s", registry.getTld());
DateTime startOfToday = clock.nowUtc().withTimeAtStartOfDay();
Cursor cursor = ofy().load().key(Cursor.createKey(cursorType, registry)).now();
loadAndCompare(cursor, registry.getTldStr());
final DateTime nextRequiredRun = (cursor == null ? startOfToday : cursor.getCursorTime());
if (nextRequiredRun.isAfter(startOfToday)) {
throw new NoContentException("Already completed");

View File

@@ -17,6 +17,7 @@ package google.registry.rde;
import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import com.google.common.collect.ImmutableSetMultimap;
@@ -91,6 +92,7 @@ public final class PendingDepositChecker {
}
// Avoid creating a transaction unless absolutely necessary.
Cursor cursor = ofy().load().key(Cursor.createKey(cursorType, registry)).now();
loadAndCompare(cursor, registry.getTldStr());
DateTime cursorValue = (cursor != null ? cursor.getCursorTime() : startingPoint);
if (isBeforeOrAt(cursorValue, now)) {
DateTime watermark = (cursor != null
@@ -111,6 +113,7 @@ public final class PendingDepositChecker {
return tm().transact(
() -> {
Cursor cursor = ofy().load().key(Cursor.createKey(cursorType, registry)).now();
loadAndCompare(cursor, registry.getTldStr());
if (cursor != null) {
return cursor.getCursorTime();
}

View File

@@ -20,6 +20,7 @@ import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.request.Action.Method.POST;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import com.google.appengine.tools.cloudstorage.GcsFilename;
@@ -76,9 +77,10 @@ public final class RdeReportAction implements Runnable, EscrowTask {
@Override
public void runWithLock(DateTime watermark) throws Exception {
DateTime cursorTime =
getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_UPLOAD, Registry.get(tld))).now());
Cursor cursor =
ofy().load().key(Cursor.createKey(CursorType.RDE_UPLOAD, Registry.get(tld))).now();
loadAndCompare(cursor, tld);
DateTime cursorTime = getCursorTimeOrStartOfTime(cursor);
if (isBeforeOrAt(cursorTime, watermark)) {
throw new NoContentException(
String.format(

View File

@@ -22,6 +22,7 @@ import static com.google.common.base.Verify.verify;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.appengine.tools.cloudstorage.GcsFilename;
@@ -207,9 +208,9 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
tm().transact(
() -> {
Registry registry = Registry.get(tld);
DateTime position =
getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(key.cursor(), registry)).now());
Cursor cursor = ofy().load().key(Cursor.createKey(key.cursor(), registry)).now();
loadAndCompare(cursor, tld);
DateTime position = getCursorTimeOrStartOfTime(cursor);
checkState(key.interval() != null, "Interval must be present");
DateTime newPosition = key.watermark().plus(key.interval());
if (!position.isBefore(newPosition)) {

View File

@@ -24,6 +24,7 @@ import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.request.Action.Method.POST;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import static java.util.Arrays.asList;
@@ -132,8 +133,10 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Override
public void runWithLock(final DateTime watermark) throws Exception {
logger.atInfo().log("Verifying readiness to upload the RDE deposit.");
DateTime stagingCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_STAGING, Registry.get(tld))).now());
Cursor cursor =
ofy().load().key(Cursor.createKey(CursorType.RDE_STAGING, Registry.get(tld))).now();
loadAndCompare(cursor, tld);
DateTime stagingCursorTime = getCursorTimeOrStartOfTime(cursor);
if (isBeforeOrAt(stagingCursorTime, watermark)) {
throw new NoContentException(
String.format(
@@ -141,9 +144,10 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
+ "last RDE staging completion was at %s",
tld, watermark, stagingCursorTime));
}
DateTime sftpCursorTime =
getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(RDE_UPLOAD_SFTP, Registry.get(tld))).now());
Cursor sftpCursor =
ofy().load().key(Cursor.createKey(RDE_UPLOAD_SFTP, Registry.get(tld))).now();
loadAndCompare(sftpCursor, tld);
DateTime sftpCursorTime = getCursorTimeOrStartOfTime(sftpCursor);
Duration timeSinceLastSftp = new Duration(sftpCursorTime, clock.nowUtc());
if (timeSinceLastSftp.isShorterThan(sftpCooldown)) {
throw new NoContentException(

View File

@@ -21,10 +21,10 @@ import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.request.Action.Method.POST;
import static google.registry.schema.cursor.CursorDao.loadAndCompareAll;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
@@ -54,7 +54,6 @@ import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.mail.internet.InternetAddress;
import org.joda.time.DateTime;
@@ -108,7 +107,7 @@ public final class IcannReportingUploadAction implements Runnable {
() -> {
ImmutableMap.Builder<String, Boolean> reportSummaryBuilder = new ImmutableMap.Builder<>();
ImmutableMap<Cursor, CursorInfo> cursors = loadCursors();
ImmutableMap<Cursor, String> cursors = loadCursors();
// If cursor time is before now, upload the corresponding report
cursors.entrySet().stream()
@@ -118,8 +117,8 @@ public final class IcannReportingUploadAction implements Runnable {
DateTime cursorTime = getCursorTimeOrStartOfTime(entry.getKey());
uploadReport(
cursorTime,
entry.getValue().getType(),
entry.getValue().getTld(),
entry.getKey().getType(),
entry.getValue(),
reportSummaryBuilder);
});
// Send email of which reports were uploaded
@@ -205,8 +204,8 @@ public final class IcannReportingUploadAction implements Runnable {
cursorTimeMinusMonth.monthOfYear().get());
}
/** Returns a map of each cursor to the CursorType and tld. */
private ImmutableMap<Cursor, CursorInfo> loadCursors() {
/** Returns a map of each cursor to the tld. */
private ImmutableMap<Cursor, String> loadCursors() {
ImmutableSet<Registry> registries = Registries.getTldEntitiesOfType(TldType.REAL);
@@ -220,11 +219,13 @@ public final class IcannReportingUploadAction implements Runnable {
keys.addAll(transactionKeyMap.keySet());
Map<Key<Cursor>, Cursor> cursorMap = ofy().load().keys(keys.build());
ImmutableMap.Builder<Cursor, CursorInfo> cursors = new ImmutableMap.Builder<>();
defaultNullCursorsToNextMonthAndAddToMap(
activityKeyMap, CursorType.ICANN_UPLOAD_ACTIVITY, cursorMap, cursors);
defaultNullCursorsToNextMonthAndAddToMap(
transactionKeyMap, CursorType.ICANN_UPLOAD_TX, cursorMap, cursors);
ImmutableMap.Builder<Cursor, String> cursors = new ImmutableMap.Builder<>();
cursors.putAll(
defaultNullCursorsToNextMonthAndAddToMap(
activityKeyMap, CursorType.ICANN_UPLOAD_ACTIVITY, cursorMap));
cursors.putAll(
defaultNullCursorsToNextMonthAndAddToMap(
transactionKeyMap, CursorType.ICANN_UPLOAD_TX, cursorMap));
return cursors.build();
}
@@ -234,15 +235,13 @@ public final class IcannReportingUploadAction implements Runnable {
}
/**
* Populate the cursors map with the Cursor and CursorInfo for each key in the keyMap. If the key
* from the keyMap does not have an existing cursor, create a new cursor with a default cursorTime
* of the first of next month.
* Return a map with the Cursor and scope for each key in the keyMap. If the key from the keyMap
* does not have an existing cursor, create a new cursor with a default cursorTime of the first of
* next month.
*/
private void defaultNullCursorsToNextMonthAndAddToMap(
Map<Key<Cursor>, Registry> keyMap,
CursorType type,
Map<Key<Cursor>, Cursor> cursorMap,
ImmutableMap.Builder<Cursor, CursorInfo> cursors) {
private ImmutableMap<Cursor, String> defaultNullCursorsToNextMonthAndAddToMap(
Map<Key<Cursor>, Registry> keyMap, CursorType type, Map<Key<Cursor>, Cursor> cursorMap) {
ImmutableMap.Builder<Cursor, String> cursors = new ImmutableMap.Builder<>();
keyMap.forEach(
(key, registry) -> {
// Cursor time is defaulted to the first of next month since a new tld will not yet have a
@@ -257,8 +256,10 @@ public final class IcannReportingUploadAction implements Runnable {
if (!cursorMap.containsValue(cursor)) {
tm().transact(() -> ofy().save().entity(cursor));
}
cursors.put(cursor, CursorInfo.create(type, registry.getTldStr()));
cursors.put(cursor, registry.getTldStr());
});
loadAndCompareAll(cursors.build(), type);
return cursors.build();
}
/** Don't retry when reports are already uploaded or can't be uploaded. */
@@ -305,15 +306,4 @@ public final class IcannReportingUploadAction implements Runnable {
gcsFilename.getBucketName());
}
@AutoValue
abstract static class CursorInfo {
static CursorInfo create(CursorType type, @Nullable String tld) {
return new AutoValue_IcannReportingUploadAction_CursorInfo(type, tld);
}
public abstract CursorType getType();
@Nullable
abstract String getTld();
}
}

View File

@@ -15,6 +15,7 @@
package google.registry.schema.cursor;
import static com.google.appengine.api.search.checkers.Preconditions.checkNotNull;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
@@ -25,6 +26,7 @@ import com.google.common.flogger.FluentLogger;
import google.registry.model.common.Cursor.CursorType;
import google.registry.schema.cursor.Cursor.CursorId;
import java.util.List;
import javax.annotation.Nullable;
/** Data access object class for {@link Cursor}. */
public class CursorDao {
@@ -130,4 +132,73 @@ public class CursorDao {
logger.atSevere().withCause(e).log("Error saving cursor to Cloud SQL.");
}
}
/**
* Loads in cursor from Cloud SQL and compares it to the Datastore cursor
*
* <p>This takes in a cursor from Datastore and checks to see if it exists in Cloud SQL and has
* the same value. If a difference is detected, or the Cloud SQL cursor does not exist, a warning
* is logged.
*/
public static void loadAndCompare(
@Nullable google.registry.model.common.Cursor datastoreCursor, String scope) {
if (datastoreCursor == null) {
return;
}
try {
// Load the corresponding cursor from Cloud SQL
Cursor cloudSqlCursor = load(datastoreCursor.getType(), scope);
compare(datastoreCursor, cloudSqlCursor, scope);
} catch (Throwable t) {
logger.atSevere().withCause(t).log("Error comparing cursors.");
}
}
/**
* Loads in all cursors of a given type from Cloud SQL and compares them to Datastore
*
* <p>This takes in cursors from Datastore and checks to see if they exists in Cloud SQL and have
* the same value. If a difference is detected, or a Cloud SQL cursor does not exist, a warning is
* logged.
*/
public static void loadAndCompareAll(
ImmutableMap<google.registry.model.common.Cursor, String> cursors, CursorType type) {
try {
// Load all the cursors of that type from Cloud SQL
List<Cursor> cloudSqlCursors = loadByType(type);
// Create a map of each tld to its cursor if one exists
ImmutableMap<String, Cursor> cloudSqlCursorMap =
cloudSqlCursors.stream().collect(toImmutableMap(c -> c.getScope(), c -> c));
// Compare each Datastore cursor with its corresponding Cloud SQL cursor
for (google.registry.model.common.Cursor cursor : cursors.keySet()) {
Cursor cloudSqlCursor = cloudSqlCursorMap.get(cursors.get(cursor));
compare(cursor, cloudSqlCursor, cursors.get(cursor));
}
} catch (Throwable t) {
logger.atSevere().withCause(t).log("Error comparing cursors.");
}
}
private static void compare(
google.registry.model.common.Cursor datastoreCursor,
@Nullable Cursor cloudSqlCursor,
String scope) {
if (cloudSqlCursor == null) {
logger.atWarning().log(
String.format(
"Cursor of type %s with the scope %s was not found in Cloud SQL.",
datastoreCursor.getType().name(), scope));
} else if (!datastoreCursor.getCursorTime().equals(cloudSqlCursor.getCursorTime())) {
logger.atWarning().log(
String.format(
"This cursor of type %s with the scope %s has a cursorTime of %s in Datastore and %s"
+ " in Cloud SQL.",
datastoreCursor.getType(),
scope,
datastoreCursor.getCursorTime(),
cloudSqlCursor.getCursorTime()));
}
}
}

View File

@@ -16,6 +16,7 @@ package google.registry.tools;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.schema.cursor.CursorDao.loadAndCompare;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
@@ -74,6 +75,9 @@ final class ListCursorsCommand implements CommandWithRemoteApi {
}
private static String renderLine(String tld, Optional<Cursor> cursor) {
if (cursor.isPresent()) {
loadAndCompare(cursor.get(), tld);
}
return String.format(
OUTPUT_FMT,
tld,