1
0
mirror of https://github.com/google/nomulus synced 2026-04-26 03:00:48 +00:00

Make ExpandRecurringBillingEventAction SQL-aware (#1181)

There is some complication regarding how the
CancellationMatchingBillingEvent of the generated OneTime can be
reconstructed when loading from SQL. I decided to only address it in
testing as there is no real value to fully reconstruct this VKey in
production where we are either in SQL or Ofy mode, both never in both.
Therefore the VKey in a particular mode only needs to contain the
corresponding key in order to function.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1181)
<!-- Reviewable:end -->
This commit is contained in:
Lai Jiang
2021-06-03 10:21:16 -04:00
committed by GitHub
parent 586189d7ee
commit aa2898ebfc
10 changed files with 4755 additions and 4438 deletions

View File

@@ -21,9 +21,12 @@ import static google.registry.mapreduce.MapreduceRunner.PARAM_DRY_RUN;
import static google.registry.mapreduce.inputs.EppResourceInputs.createChildEntityInput;
import static google.registry.model.common.Cursor.CursorType.RECURRING_BILLING;
import static google.registry.model.domain.Period.Unit.YEARS;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.model.reporting.HistoryEntry.Type.DOMAIN_AUTORENEW;
import static google.registry.persistence.transaction.QueryComposer.Comparator.EQ;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm;
import static google.registry.pricing.PricingEngineProxy.getDomainRenewCost;
import static google.registry.util.CollectionUtils.union;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
@@ -38,10 +41,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import com.google.common.collect.Streams;
import com.google.common.flogger.FluentLogger;
import com.googlecode.objectify.Key;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.inputs.NullInput;
import google.registry.model.EppResource;
import google.registry.model.ImmutableObject;
import google.registry.model.billing.BillingEvent;
import google.registry.model.billing.BillingEvent.Flag;
@@ -54,7 +55,7 @@ import google.registry.model.domain.Period;
import google.registry.model.registry.Registry;
import google.registry.model.reporting.DomainTransactionRecord;
import google.registry.model.reporting.DomainTransactionRecord.TransactionReportField;
import google.registry.model.reporting.HistoryEntry;
import google.registry.persistence.VKey;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
@@ -92,31 +93,87 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
@Override
public void run() {
Cursor cursor =
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING)).orElse(null);
DateTime executeTime = clock.nowUtc();
DateTime persistedCursorTime = (cursor == null ? START_OF_TIME : cursor.getCursorTime());
DateTime persistedCursorTime =
transactIfJpaTm(
() ->
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING))
.orElse(Cursor.createGlobal(RECURRING_BILLING, START_OF_TIME))
.getCursorTime());
DateTime cursorTime = cursorTimeParam.orElse(persistedCursorTime);
checkArgument(
cursorTime.isBefore(executeTime),
"Cursor time must be earlier than execution time.");
cursorTime.isBefore(executeTime), "Cursor time must be earlier than execution time.");
logger.atInfo().log(
"Running Recurring billing event expansion for billing time range [%s, %s).",
cursorTime, executeTime);
mrRunner
.setJobName("Expand Recurring billing events into synthetic OneTime events.")
.setModuleName("backend")
.runMapreduce(
new ExpandRecurringBillingEventsMapper(isDryRun, cursorTime, clock.nowUtc()),
new ExpandRecurringBillingEventsReducer(isDryRun, persistedCursorTime),
// Add an extra shard that maps over a null recurring event (see the mapper for why).
ImmutableList.of(
new NullInput<>(),
createChildEntityInput(
ImmutableSet.of(DomainBase.class), ImmutableSet.of(Recurring.class))))
.sendLinkToMapreduceConsole(response);
}
if (tm().isOfy()) {
mrRunner
.setJobName("Expand Recurring billing events into synthetic OneTime events.")
.setModuleName("backend")
.runMapreduce(
new ExpandRecurringBillingEventsMapper(isDryRun, cursorTime, clock.nowUtc()),
new ExpandRecurringBillingEventsReducer(isDryRun, persistedCursorTime),
// Add an extra shard that maps over a null recurring event (see the mapper for why).
ImmutableList.of(
new NullInput<>(),
createChildEntityInput(
ImmutableSet.of(DomainBase.class), ImmutableSet.of(Recurring.class))))
.sendLinkToMapreduceConsole(response);
} else {
int numBillingEventsSaved =
jpaTm()
.transact(
() ->
jpaTm()
.query(
"FROM BillingRecurrence "
+ "WHERE event_time <= :executeTime "
+ "AND event_time < recurrence_end_time",
Recurring.class)
.setParameter("executeTime", executeTime.toDate())
// Need to get a list from the transaction and then convert it to a stream
// for further processing. If we get a stream directly, each elements gets
// processed downstream eagerly but Hibernate returns a
// ScrollableResultsIterator that cannot be advanced outside the
// transaction, resulting in an exception.
.getResultList())
.stream()
.map(
recurring ->
jpaTm()
.transact(
() ->
expandBillingEvent(recurring, executeTime, cursorTime, isDryRun)))
.reduce(0, Integer::sum);
if (!isDryRun) {
logger.atInfo().log("Saved OneTime billing events", numBillingEventsSaved);
} else {
logger.atInfo().log("Generated OneTime billing events (dry run)", numBillingEventsSaved);
}
logger.atInfo().log(
"Recurring event expansion %s complete for billing event range [%s, %s).",
isDryRun ? "(dry run) " : "", cursorTime, executeTime);
tm().transact(
() -> {
// Check for the unlikely scenario where the cursor has been altered during the
// expansion.
DateTime currentCursorTime =
tm().loadByKeyIfPresent(Cursor.createGlobalVKey(RECURRING_BILLING))
.orElse(Cursor.createGlobal(RECURRING_BILLING, START_OF_TIME))
.getCursorTime();
if (!currentCursorTime.equals(persistedCursorTime)) {
throw new IllegalStateException(
String.format(
"Current cursor position %s does not match persisted cursor position %s.",
currentCursorTime, persistedCursorTime));
}
if (!isDryRun) {
tm().put(Cursor.createGlobal(RECURRING_BILLING, executeTime));
}
});
}
}
/** Mapper to expand {@link Recurring} billing events into synthetic {@link OneTime} events. */
public static class ExpandRecurringBillingEventsMapper
extends Mapper<Recurring, DateTime, DateTime> {
@@ -155,100 +212,7 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
try {
numBillingEventsSaved =
tm().transactNew(
() -> {
ImmutableSet.Builder<OneTime> syntheticOneTimesBuilder =
new ImmutableSet.Builder<>();
final Registry tld =
Registry.get(getTldFromDomainName(recurring.getTargetId()));
// Determine the complete set of times at which this recurring event should
// occur (up to and including the runtime of the mapreduce).
Iterable<DateTime> eventTimes =
recurring
.getRecurrenceTimeOfYear()
.getInstancesInRange(
Range.closed(
recurring.getEventTime(),
earliestOf(recurring.getRecurrenceEndTime(), executeTime)));
// Convert these event times to billing times
final ImmutableSet<DateTime> billingTimes =
getBillingTimesInScope(eventTimes, cursorTime, executeTime, tld);
Key<? extends EppResource> domainKey = recurring.getParentKey().getParent();
Iterable<OneTime> oneTimesForDomain =
ofy().load().type(OneTime.class).ancestor(domainKey);
// Determine the billing times that already have OneTime events persisted.
ImmutableSet<DateTime> existingBillingTimes =
getExistingBillingTimes(oneTimesForDomain, recurring);
ImmutableSet.Builder<HistoryEntry> historyEntriesBuilder =
new ImmutableSet.Builder<>();
// Create synthetic OneTime events for all billing times that do not yet have
// an event persisted.
for (DateTime billingTime : difference(billingTimes, existingBillingTimes)) {
// Construct a new HistoryEntry that parents over the OneTime
DomainHistory historyEntry =
new DomainHistory.Builder()
.setBySuperuser(false)
.setClientId(recurring.getClientId())
.setModificationTime(tm().getTransactionTime())
// TODO (jianglai): modify this to use setDomain instead when
// converting this action to be SQL-aware.
.setDomainRepoId(domainKey.getName())
.setPeriod(Period.create(1, YEARS))
.setReason(
"Domain autorenewal by ExpandRecurringBillingEventsAction")
.setRequestedByRegistrar(false)
.setType(DOMAIN_AUTORENEW)
// Don't write a domain transaction record if the recurrence was
// ended prior to the billing time (i.e. a domain was deleted
// during the autorenew grace period).
.setDomainTransactionRecords(
recurring.getRecurrenceEndTime().isBefore(billingTime)
? ImmutableSet.of()
: ImmutableSet.of(
DomainTransactionRecord.create(
tld.getTldStr(),
// We report this when the autorenew grace period
// ends
billingTime,
TransactionReportField.netRenewsFieldFromYears(1),
1)))
.build();
historyEntriesBuilder.add(historyEntry);
DateTime eventTime = billingTime.minus(tld.getAutoRenewGracePeriodLength());
// Determine the cost for a one-year renewal.
Money renewCost = getDomainRenewCost(recurring.getTargetId(), eventTime, 1);
syntheticOneTimesBuilder.add(
new OneTime.Builder()
.setBillingTime(billingTime)
.setClientId(recurring.getClientId())
.setCost(renewCost)
.setEventTime(eventTime)
.setFlags(union(recurring.getFlags(), Flag.SYNTHETIC))
.setParent(historyEntry)
.setPeriodYears(1)
.setReason(recurring.getReason())
.setSyntheticCreationTime(executeTime)
.setCancellationMatchingBillingEvent(recurring.createVKey())
.setTargetId(recurring.getTargetId())
.build());
}
Set<HistoryEntry> historyEntries = historyEntriesBuilder.build();
Set<OneTime> syntheticOneTimes = syntheticOneTimesBuilder.build();
if (!isDryRun) {
ImmutableSet<ImmutableObject> entitiesToSave =
new ImmutableSet.Builder<ImmutableObject>()
.addAll(historyEntries)
.addAll(syntheticOneTimes)
.build();
ofy().save().entities(entitiesToSave).now();
}
return syntheticOneTimes.size();
});
() -> expandBillingEvent(recurring, executeTime, cursorTime, isDryRun));
} catch (Throwable t) {
getContext().incrementCounter("error: " + t.getClass().getSimpleName());
getContext().incrementCounter(ERROR_COUNTER);
@@ -260,45 +224,12 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
if (!isDryRun) {
getContext().incrementCounter("Saved OneTime billing events", numBillingEventsSaved);
} else {
getContext().incrementCounter(
"Generated OneTime billing events (dry run)", numBillingEventsSaved);
getContext()
.incrementCounter("Generated OneTime billing events (dry run)", numBillingEventsSaved);
}
}
/**
* Filters a set of {@link DateTime}s down to event times that are in scope for a particular
* mapreduce run, given the cursor time and the mapreduce execution time.
*/
private ImmutableSet<DateTime> getBillingTimesInScope(
Iterable<DateTime> eventTimes,
DateTime cursorTime,
DateTime executeTime,
final Registry tld) {
return Streams.stream(eventTimes)
.map(eventTime -> eventTime.plus(tld.getAutoRenewGracePeriodLength()))
.filter(Range.closedOpen(cursorTime, executeTime))
.collect(toImmutableSet());
}
/**
* Determines an {@link ImmutableSet} of {@link DateTime}s that have already been persisted
* for a given recurring billing event.
*/
private ImmutableSet<DateTime> getExistingBillingTimes(
Iterable<BillingEvent.OneTime> oneTimesForDomain,
final BillingEvent.Recurring recurringEvent) {
return Streams.stream(oneTimesForDomain)
.filter(
billingEvent ->
recurringEvent
.createVKey()
.equals(billingEvent.getCancellationMatchingBillingEvent()))
.map(OneTime::getBillingTime)
.collect(toImmutableSet());
}
}
/**
* "Reducer" to advance the cursor after all map jobs have been completed. The NullInput into the
* mapper will cause the mapper to emit one timestamp pair (current cursor and execution time),
@@ -331,7 +262,8 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
isDryRun ? "(dry run) " : "", cursorTime, executionTime);
tm().transact(
() -> {
Cursor cursor = ofy().load().key(Cursor.createGlobalKey(RECURRING_BILLING)).now();
Cursor cursor =
auditedOfy().load().key(Cursor.createGlobalKey(RECURRING_BILLING)).now();
DateTime currentCursorTime =
(cursor == null ? START_OF_TIME : cursor.getCursorTime());
if (!currentCursorTime.equals(expectedPersistedCursorTime)) {
@@ -346,4 +278,135 @@ public class ExpandRecurringBillingEventsAction implements Runnable {
});
}
}
private static int expandBillingEvent(
Recurring recurring, DateTime executeTime, DateTime cursorTime, boolean isDryRun) {
ImmutableSet.Builder<OneTime> syntheticOneTimesBuilder = new ImmutableSet.Builder<>();
final Registry tld = Registry.get(getTldFromDomainName(recurring.getTargetId()));
// Determine the complete set of times at which this recurring event should
// occur (up to and including the runtime of the mapreduce).
Iterable<DateTime> eventTimes =
recurring
.getRecurrenceTimeOfYear()
.getInstancesInRange(
Range.closed(
recurring.getEventTime(),
earliestOf(recurring.getRecurrenceEndTime(), executeTime)));
// Convert these event times to billing times
final ImmutableSet<DateTime> billingTimes =
getBillingTimesInScope(eventTimes, cursorTime, executeTime, tld);
VKey<DomainBase> domainKey =
VKey.create(
DomainBase.class, recurring.getDomainRepoId(), recurring.getParentKey().getParent());
Iterable<OneTime> oneTimesForDomain;
if (tm().isOfy()) {
oneTimesForDomain = auditedOfy().load().type(OneTime.class).ancestor(domainKey.getOfyKey());
} else {
oneTimesForDomain =
tm().createQueryComposer(OneTime.class)
.where("domainRepoId", EQ, recurring.getDomainRepoId())
.list();
}
// Determine the billing times that already have OneTime events persisted.
ImmutableSet<DateTime> existingBillingTimes =
getExistingBillingTimes(oneTimesForDomain, recurring);
ImmutableSet.Builder<DomainHistory> historyEntriesBuilder = new ImmutableSet.Builder<>();
// Create synthetic OneTime events for all billing times that do not yet have
// an event persisted.
for (DateTime billingTime : difference(billingTimes, existingBillingTimes)) {
// Construct a new HistoryEntry that parents over the OneTime
DomainHistory historyEntry =
new DomainHistory.Builder()
.setBySuperuser(false)
.setClientId(recurring.getClientId())
.setModificationTime(tm().getTransactionTime())
.setDomain(tm().loadByKey(domainKey))
.setPeriod(Period.create(1, YEARS))
.setReason("Domain autorenewal by ExpandRecurringBillingEventsAction")
.setRequestedByRegistrar(false)
.setType(DOMAIN_AUTORENEW)
// Don't write a domain transaction record if the recurrence was
// ended prior to the billing time (i.e. a domain was deleted
// during the autorenew grace period).
.setDomainTransactionRecords(
recurring.getRecurrenceEndTime().isBefore(billingTime)
? ImmutableSet.of()
: ImmutableSet.of(
DomainTransactionRecord.create(
tld.getTldStr(),
// We report this when the autorenew grace period
// ends
billingTime,
TransactionReportField.netRenewsFieldFromYears(1),
1)))
.build();
historyEntriesBuilder.add(historyEntry);
DateTime eventTime = billingTime.minus(tld.getAutoRenewGracePeriodLength());
// Determine the cost for a one-year renewal.
Money renewCost = getDomainRenewCost(recurring.getTargetId(), eventTime, 1);
syntheticOneTimesBuilder.add(
new OneTime.Builder()
.setBillingTime(billingTime)
.setClientId(recurring.getClientId())
.setCost(renewCost)
.setEventTime(eventTime)
.setFlags(union(recurring.getFlags(), Flag.SYNTHETIC))
.setParent(historyEntry)
.setPeriodYears(1)
.setReason(recurring.getReason())
.setSyntheticCreationTime(executeTime)
.setCancellationMatchingBillingEvent(recurring.createVKey())
.setTargetId(recurring.getTargetId())
.build());
}
Set<DomainHistory> historyEntries = historyEntriesBuilder.build();
Set<OneTime> syntheticOneTimes = syntheticOneTimesBuilder.build();
if (!isDryRun) {
ImmutableSet<ImmutableObject> entitiesToSave =
new ImmutableSet.Builder<ImmutableObject>()
.addAll(historyEntries)
.addAll(syntheticOneTimes)
.build();
tm().putAll(entitiesToSave);
}
return syntheticOneTimes.size();
}
/**
* Filters a set of {@link DateTime}s down to event times that are in scope for a particular
* mapreduce run, given the cursor time and the mapreduce execution time.
*/
protected static ImmutableSet<DateTime> getBillingTimesInScope(
Iterable<DateTime> eventTimes,
DateTime cursorTime,
DateTime executeTime,
final Registry tld) {
return Streams.stream(eventTimes)
.map(eventTime -> eventTime.plus(tld.getAutoRenewGracePeriodLength()))
.filter(Range.closedOpen(cursorTime, executeTime))
.collect(toImmutableSet());
}
/**
* Determines an {@link ImmutableSet} of {@link DateTime}s that have already been persisted for a
* given recurring billing event.
*/
private static ImmutableSet<DateTime> getExistingBillingTimes(
Iterable<BillingEvent.OneTime> oneTimesForDomain,
final BillingEvent.Recurring recurringEvent) {
return Streams.stream(oneTimesForDomain)
.filter(
billingEvent ->
recurringEvent
.createVKey()
.equals(billingEvent.getCancellationMatchingBillingEvent()))
.map(OneTime::getBillingTime)
.collect(toImmutableSet());
}
}

View File

@@ -324,13 +324,22 @@ public abstract class BillingEvent extends ImmutableObject
DateTime syntheticCreationTime;
/**
* For {@link Flag#SYNTHETIC} events, a {@link Key} to the {@link BillingEvent} from which this
* OneTime was created. This is needed in order to properly match billing events against {@link
* Cancellation}s.
* For {@link Flag#SYNTHETIC} events, a {@link Key} to the {@link Recurring} from which this
* {@link OneTime} was created. This is needed in order to properly match billing events against
* {@link Cancellation}s.
*/
@Column(name = "cancellation_matching_billing_recurrence_id")
VKey<Recurring> cancellationMatchingBillingEvent;
/**
* For {@link Flag#SYNTHETIC} events, the {@link DomainHistory} revision ID of the the {@link
* Recurring} from which this {@link OneTime} was created. This is needed in order to recreate
* the {@link VKey} when reading from SQL.
*/
@Ignore
@Column(name = "recurrence_history_revision_id")
Long recurringEventHistoryRevisionId;
/**
* The {@link AllocationToken} used in the creation of this event, or null if one was not used.
*/
@@ -354,10 +363,14 @@ public abstract class BillingEvent extends ImmutableObject
return syntheticCreationTime;
}
public VKey<? extends BillingEvent> getCancellationMatchingBillingEvent() {
public VKey<Recurring> getCancellationMatchingBillingEvent() {
return cancellationMatchingBillingEvent;
}
public Long getRecurringEventHistoryRevisionId() {
return recurringEventHistoryRevisionId;
}
public Optional<VKey<AllocationToken>> getAllocationToken() {
return Optional.ofNullable(allocationToken);
}
@@ -376,6 +389,28 @@ public abstract class BillingEvent extends ImmutableObject
return new Builder(clone(this));
}
@Override
void onLoad() {
super.onLoad();
if (cancellationMatchingBillingEvent != null) {
recurringEventHistoryRevisionId =
cancellationMatchingBillingEvent.getOfyKey().getParent().getId();
}
}
@Override
void postLoad() {
super.postLoad();
if (cancellationMatchingBillingEvent != null) {
cancellationMatchingBillingEvent =
cancellationMatchingBillingEvent.restoreOfy(
DomainBase.class,
domainRepoId,
DomainHistory.class,
recurringEventHistoryRevisionId);
}
}
/** A builder for {@link OneTime} since it is immutable. */
public static class Builder extends BillingEvent.Builder<OneTime, Builder> {
@@ -410,6 +445,8 @@ public abstract class BillingEvent extends ImmutableObject
public Builder setCancellationMatchingBillingEvent(
VKey<Recurring> cancellationMatchingBillingEvent) {
getInstance().cancellationMatchingBillingEvent = cancellationMatchingBillingEvent;
getInstance().recurringEventHistoryRevisionId =
cancellationMatchingBillingEvent.getOfyKey().getParent().getId();
return this;
}
@@ -444,6 +481,11 @@ public abstract class BillingEvent extends ImmutableObject
== (instance.cancellationMatchingBillingEvent != null),
"Cancellation matching billing event must be set if and only if the SYNTHETIC flag "
+ "is set.");
checkState(
!instance.getFlags().contains(Flag.SYNTHETIC)
|| (instance.cancellationMatchingBillingEvent.getOfyKey().getParent().getId()
== instance.recurringEventHistoryRevisionId),
"Cancellation matching billing event and its history revision ID does not match.");
return super.build();
}
}

View File

@@ -184,6 +184,7 @@ public class HistoryEntryDao {
.where("modificationTime", criteriaBuilder::greaterThanOrEqualTo, afterTime)
.where("modificationTime", criteriaBuilder::lessThanOrEqualTo, beforeTime)
.where(repoIdFieldName, criteriaBuilder::equal, parentKey.getSqlKey().toString())
.orderByAsc("id")
.build();
return ImmutableList.sortedCopyOf(