1
0
mirror of https://github.com/google/nomulus synced 2026-04-26 03:00:48 +00:00

Fix BsaRefreshAction bugs (#2294)

* Fix BsaRefreshAction bugs

Added functional tests for BsaRefreshAction, which checks for changes in
domain registration and reservation, and apply them to the Unblockable
domain list.

Fixed a few bugs exposed by the tests.

Also refactored a few other tests.
This commit is contained in:
Weimin Yu
2024-01-22 12:23:29 -05:00
committed by GitHub
parent c414e38a98
commit f61579b350
14 changed files with 509 additions and 74 deletions

View File

@@ -54,7 +54,7 @@ public class BsaRefreshAction implements Runnable {
private final GcsClient gcsClient;
private final BsaReportSender bsaReportSender;
private final int transactionBatchSize;
private final Duration domainTxnMaxDuration;
private final Duration domainCreateTxnCommitTimeLag;
private final BsaLock bsaLock;
private final Clock clock;
private final Response response;
@@ -65,7 +65,7 @@ public class BsaRefreshAction implements Runnable {
GcsClient gcsClient,
BsaReportSender bsaReportSender,
@Config("bsaTxnBatchSize") int transactionBatchSize,
@Config("domainCreateTxnCommitTimeLag") Duration domainTxnMaxDuration,
@Config("domainCreateTxnCommitTimeLag") Duration domainCreateTxnCommitTimeLag,
BsaLock bsaLock,
Clock clock,
Response response) {
@@ -73,7 +73,7 @@ public class BsaRefreshAction implements Runnable {
this.gcsClient = gcsClient;
this.bsaReportSender = bsaReportSender;
this.transactionBatchSize = transactionBatchSize;
this.domainTxnMaxDuration = domainTxnMaxDuration;
this.domainCreateTxnCommitTimeLag = domainCreateTxnCommitTimeLag;
this.bsaLock = bsaLock;
this.clock = clock;
this.response = response;
@@ -109,17 +109,21 @@ public class BsaRefreshAction implements Runnable {
RefreshSchedule schedule = maybeSchedule.get();
DomainsRefresher refresher =
new DomainsRefresher(
schedule.prevRefreshTime(), clock.nowUtc(), domainTxnMaxDuration, transactionBatchSize);
schedule.prevRefreshTime(),
clock.nowUtc(),
domainCreateTxnCommitTimeLag,
transactionBatchSize);
switch (schedule.stage()) {
case CHECK_FOR_CHANGES:
ImmutableList<UnblockableDomainChange> blockabilityChanges =
refresher.checkForBlockabilityChanges();
// Always write even if no change. Easier for manual inspection of GCS bucket.
gcsClient.writeRefreshChanges(schedule.jobName(), blockabilityChanges.stream());
if (blockabilityChanges.isEmpty()) {
logger.atInfo().log("No change to Unblockable domains found.");
schedule.updateJobStage(RefreshStage.DONE);
return null;
}
gcsClient.writeRefreshChanges(schedule.jobName(), blockabilityChanges.stream());
schedule.updateJobStage(RefreshStage.APPLY_CHANGES);
// Fall through
case APPLY_CHANGES:
@@ -132,10 +136,12 @@ public class BsaRefreshAction implements Runnable {
case UPLOAD_REMOVALS:
try (Stream<UnblockableDomainChange> changes =
gcsClient.readRefreshChanges(schedule.jobName())) {
// Unblockables with changes in REASON are removed then added back. That is why they are
// included in this stage.
Optional<String> report =
JsonSerializations.toUnblockableDomainsRemovalReport(
changes
.filter(UnblockableDomainChange::isDelete)
.filter(UnblockableDomainChange::isChangeOrDelete)
.map(UnblockableDomainChange::domainName));
if (report.isPresent()) {
gcsClient.logRemovedUnblockableDomainsReport(
@@ -153,12 +159,12 @@ public class BsaRefreshAction implements Runnable {
Optional<String> report =
JsonSerializations.toUnblockableDomainsReport(
changes
.filter(UnblockableDomainChange::isAddOrChange)
.filter(UnblockableDomainChange::isNewOrChange)
.map(UnblockableDomainChange::newValue));
if (report.isPresent()) {
gcsClient.logRemovedUnblockableDomainsReport(
schedule.jobName(), LINE_SPLITTER.splitToStream(report.get()));
bsaReportSender.removeUnblockableDomainsUpdates(report.get());
bsaReportSender.addUnblockableDomainsUpdates(report.get());
} else {
logger.atInfo().log("No new Unblockable domains to add.");
}

View File

@@ -30,7 +30,7 @@ public class BsaStringUtils {
public static final Splitter LINE_SPLITTER = Splitter.on('\n');
public static String getLabelInDomain(String domainName) {
List<String> parts = DOMAIN_SPLITTER.limit(1).splitToList(domainName);
List<String> parts = DOMAIN_SPLITTER.splitToList(domainName);
checkArgument(!parts.isEmpty(), "Not a valid domain: [%s]", domainName);
return parts.get(0);
}

View File

@@ -21,6 +21,7 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import google.registry.persistence.transaction.TransactionManager.ThrowingRunnable;
import java.util.concurrent.Callable;
/**
@@ -39,6 +40,11 @@ public final class BsaTransactions {
return tm().transact(work, TRANSACTION_REPEATABLE_READ);
}
public static void bsaTransact(ThrowingRunnable work) {
verify(!isInTransaction(), "May only be used for top-level transactions.");
tm().transact(work, TRANSACTION_REPEATABLE_READ);
}
@CanIgnoreReturnValue
public static <T> T bsaQuery(Callable<T> work) {
verify(!isInTransaction(), "May only be used for top-level transactions.");

View File

@@ -161,7 +161,7 @@ public class GcsClient {
}
Stream<UnblockableDomainChange> readRefreshChanges(String jobName) {
BlobId blobId = getBlobId(jobName, UNBLOCKABLE_DOMAINS_FILE);
BlobId blobId = getBlobId(jobName, REFRESHED_UNBLOCKABLE_DOMAINS_FILE);
return readStream(blobId).map(UnblockableDomainChange::deserialize);
}

View File

@@ -28,9 +28,9 @@ import java.util.List;
// TODO(1/15/2024): rename to UnblockableDomain.
@AutoValue
public abstract class UnblockableDomain {
abstract String domainName();
public abstract String domainName();
abstract Reason reason();
public abstract Reason reason();
/** Reasons why a valid domain name cannot be blocked. */
public enum Reason {

View File

@@ -52,12 +52,16 @@ public abstract class UnblockableDomainChange {
return UnblockableDomain.of(unblockable().domainName(), newReason().get());
}
public boolean isAddOrChange() {
public boolean isNewOrChange() {
return newReason().isPresent();
}
public boolean isChangeOrDelete() {
return !isNew();
}
public boolean isDelete() {
return !this.isAddOrChange();
return !this.isNewOrChange();
}
public boolean isNew() {

View File

@@ -105,6 +105,10 @@ class BsaUnblockableDomain {
return new BsaUnblockableDomain(parts.get(0), parts.get(1), reason);
}
static BsaUnblockableDomain of(UnblockableDomain unblockable) {
return of(unblockable.domainName(), Reason.valueOf(unblockable.reason().name()));
}
static VKey<BsaUnblockableDomain> vKey(String domainName) {
ImmutableList<String> parts = ImmutableList.copyOf(DOMAIN_SPLITTER.splitToList(domainName));
verify(parts.size() == 2, "Invalid domain name: %s", domainName);

View File

@@ -18,6 +18,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static google.registry.bsa.BsaTransactions.bsaQuery;
import static google.registry.bsa.BsaTransactions.bsaTransact;
import static google.registry.bsa.ReservedDomainsUtils.getAllReservedNames;
import static google.registry.bsa.ReservedDomainsUtils.isReservedDomain;
import static google.registry.bsa.persistence.Queries.batchReadUnblockables;
@@ -44,6 +45,7 @@ import google.registry.model.domain.Domain;
import google.registry.model.tld.Tld;
import google.registry.model.tld.Tld.TldType;
import google.registry.util.BatchedStreams;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -107,6 +109,8 @@ public final class DomainsRefresher {
ImmutableSet<String> upgradedDomains =
upgrades.stream().map(UnblockableDomainChange::domainName).collect(toImmutableSet());
// Upgrades are collected in its own txn after downgrades so need reconciliation. Conflicts are
// unlikely but possible: domains may be recreated or reserved names added during the interval.
ImmutableList<UnblockableDomainChange> trueDowngrades =
downgrades.stream()
.filter(c -> !upgradedDomains.contains(c.domainName()))
@@ -198,22 +202,50 @@ public final class DomainsRefresher {
}
public ImmutableList<UnblockableDomainChange> getNewUnblockables() {
return bsaQuery(
() -> {
// TODO(weiminyu): both methods below use `queryBsaLabelByLabels`. Should combine.
ImmutableSet<String> newCreated = getNewlyCreatedUnblockables(prevRefreshStartTime, now);
ImmutableSet<String> newReserved =
getNewlyReservedUnblockables(now, transactionBatchSize);
SetView<String> reservedNotCreated = Sets.difference(newReserved, newCreated);
return Streams.concat(
newCreated.stream()
.map(name -> UnblockableDomain.of(name, Reason.REGISTERED))
.map(UnblockableDomainChange::ofNew),
reservedNotCreated.stream()
.map(name -> UnblockableDomain.of(name, Reason.RESERVED))
.map(UnblockableDomainChange::ofNew))
.collect(toImmutableList());
});
// TODO(weiminyu): both methods below use `queryBsaLabelByLabels`. Should combine in a single
// query.
HashSet<String> newCreated =
Sets.newHashSet(bsaQuery(() -> getNewlyCreatedUnblockables(prevRefreshStartTime, now)));
// We cannot identify new reserved unblockables so must look at all of them. There are not many
// of these.
HashSet<String> allReserved =
Sets.newHashSet(bsaQuery(() -> getAllReservedUnblockables(now, transactionBatchSize)));
HashSet<String> reservedNotCreated = Sets.newHashSet(Sets.difference(allReserved, newCreated));
ImmutableList.Builder<UnblockableDomainChange> changes = new ImmutableList.Builder<>();
ImmutableList<BsaUnblockableDomain> batch;
Optional<BsaUnblockableDomain> lastRead = Optional.empty();
do {
batch = batchReadUnblockables(lastRead, transactionBatchSize);
if (batch.isEmpty()) {
break;
}
lastRead = Optional.of(batch.get(batch.size() - 1));
for (BsaUnblockableDomain unblockable : batch) {
String domainName = unblockable.domainName();
if (unblockable.reason.equals(BsaUnblockableDomain.Reason.REGISTERED)) {
newCreated.remove(domainName);
reservedNotCreated.remove(domainName);
} else {
reservedNotCreated.remove(domainName);
if (newCreated.remove(domainName)) {
changes.add(
UnblockableDomainChange.ofChanged(
unblockable.toUnblockableDomain(), Reason.REGISTERED));
}
}
}
} while (batch.size() == transactionBatchSize);
Streams.concat(
newCreated.stream()
.map(name -> UnblockableDomain.of(name, Reason.REGISTERED))
.map(UnblockableDomainChange::ofNew),
reservedNotCreated.stream()
.map(name -> UnblockableDomain.of(name, Reason.RESERVED))
.map(UnblockableDomainChange::ofNew))
.forEach(changes::add);
return changes.build();
}
static ImmutableSet<String> getNewlyCreatedUnblockables(
@@ -225,18 +257,18 @@ public final class DomainsRefresher {
.collect(toImmutableSet());
ImmutableSet<String> liveDomains =
queryNewlyCreatedDomains(bsaEnabledTlds, prevRefreshStartTime, now);
return getUnblockedDomainNames(liveDomains);
return getBlockedDomainNames(liveDomains);
}
static ImmutableSet<String> getNewlyReservedUnblockables(DateTime now, int batchSize) {
static ImmutableSet<String> getAllReservedUnblockables(DateTime now, int batchSize) {
Stream<String> allReserved = getAllReservedNames(now);
return BatchedStreams.toBatches(allReserved, batchSize)
.map(DomainsRefresher::getUnblockedDomainNames)
.map(DomainsRefresher::getBlockedDomainNames)
.flatMap(ImmutableSet::stream)
.collect(toImmutableSet());
}
static ImmutableSet<String> getUnblockedDomainNames(ImmutableCollection<String> domainNames) {
static ImmutableSet<String> getBlockedDomainNames(ImmutableCollection<String> domainNames) {
Map<String, List<String>> labelToNames =
domainNames.stream().collect(groupingBy(BsaStringUtils::getLabelInDomain));
ImmutableSet<String> bsaLabels =
@@ -244,7 +276,7 @@ public final class DomainsRefresher {
.map(BsaLabel::getLabel)
.collect(toImmutableSet());
return labelToNames.entrySet().stream()
.filter(entry -> !bsaLabels.contains(entry.getKey()))
.filter(entry -> bsaLabels.contains(entry.getKey()))
.map(Entry::getValue)
.flatMap(List::stream)
.collect(toImmutableSet());
@@ -257,20 +289,21 @@ public final class DomainsRefresher {
.collect(
groupingBy(
change -> change.isDelete() ? "remove" : "change", toImmutableSet())));
tm().transact(
() -> {
if (changesByType.containsKey("remove")) {
tm().delete(
changesByType.get("remove").stream()
.map(c -> BsaUnblockableDomain.vKey(c.domainName()))
.collect(toImmutableSet()));
}
if (changesByType.containsKey("change")) {
tm().putAll(
changesByType.get("change").stream()
.map(UnblockableDomainChange::newValue)
.collect(toImmutableSet()));
}
});
bsaTransact(
() -> {
if (changesByType.containsKey("remove")) {
tm().delete(
changesByType.get("remove").stream()
.map(c -> BsaUnblockableDomain.vKey(c.domainName()))
.collect(toImmutableSet()));
}
if (changesByType.containsKey("change")) {
tm().putAll(
changesByType.get("change").stream()
.map(UnblockableDomainChange::newValue)
.map(BsaUnblockableDomain::of)
.collect(toImmutableSet()));
}
});
}
}