From 634202c0e95d0b9a9bcd2586fd0598e2aac1188f Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Mon, 14 Oct 2024 16:11:29 -0400 Subject: [PATCH] A batch query utility to replace TransactionManager's `loadAllOf` methods (#2589) * Replace with batch query * Addressing CR --- .../transaction/BatchedQueries.java | 154 ++++++++++++++++++ .../transaction/JpaTransactionManager.java | 4 + .../JpaTransactionManagerImpl.java | 6 + .../tools/server/ListHostsAction.java | 6 +- .../transaction/BatchedQueriesTest.java | 105 ++++++++++++ 5 files changed, 273 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/google/registry/persistence/transaction/BatchedQueries.java create mode 100644 core/src/test/java/google/registry/persistence/transaction/BatchedQueriesTest.java diff --git a/core/src/main/java/google/registry/persistence/transaction/BatchedQueries.java b/core/src/main/java/google/registry/persistence/transaction/BatchedQueries.java new file mode 100644 index 000000000..36da34077 --- /dev/null +++ b/core/src/main/java/google/registry/persistence/transaction/BatchedQueries.java @@ -0,0 +1,154 @@ +// Copyright 2024 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package google.registry.persistence.transaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static google.registry.persistence.PersistenceModule.TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ;
+import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
+import com.google.common.collect.UnmodifiableIterator;
+import jakarta.persistence.TypedQuery;
+import jakarta.persistence.metamodel.EntityType;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/** Helper for querying large data sets in batches, each batch in its own transaction. */
+public final class BatchedQueries {
+
+  private BatchedQueries() {} // Static utility class; not instantiable.
+
+  private static final int DEFAULT_BATCH_SIZE = 500; // Used when no batch size is given.
+
+  public static <T> Stream<ImmutableList<T>> loadAllOf(Class<T> entityType) {
+    return loadAllOf(entityType, DEFAULT_BATCH_SIZE);
+  }
+
+  public static <T> Stream<ImmutableList<T>> loadAllOf(Class<T> entityType, int batchSize) {
+    return loadAllOf(tm(), entityType, batchSize); // tm() is TransactionManagerFactory.tm().
+  }
+
+  /**
+   * Loads all entities of type {@code T} in batches.
+   *
+   * <p>This method must not be nested in any transaction; same for the traversal of the returned
+   * {@link Stream}. Each batch is loaded in a separate transaction at the {@code
+   * TRANSACTION_REPEATABLE_READ} isolation level, and loads the snapshot of the batch at the
+   * batch's start time. New insertions or updates since then are not reflected in the result.
+   */
+  public static <T> Stream<ImmutableList<T>> loadAllOf(
+      JpaTransactionManager jpaTm, Class<T> entityType, int batchSize) {
+    checkState(!jpaTm.inTransaction(), "loadAllOf cannot be nested in a transaction");
+    checkArgument(batchSize > 0, "batchSize must be positive");
+    EntityType<T> jpaEntityType = jpaTm.getMetaModel().entity(entityType);
+    if (!jpaEntityType.hasSingleIdAttribute()) {
+      // We should support multi-column primary keys on a case-by-case basis.
+      throw new UnsupportedOperationException(
+          "Types with multi-column primary key not supported yet");
+    }
+    return Streams.stream(
+        new BatchedIterator<>(new SingleColIdBatchQuery<>(jpaTm, jpaEntityType), batchSize));
+  }
+
+  public interface BatchQuery<T> { // Reads the batch that follows the last entity read.
+    ImmutableList<T> readBatch(Optional<T> lastRead, int batchSize); // No lastRead: first batch.
+  }
+
+  private static class SingleColIdBatchQuery<T> implements BatchQuery<T> { // Pages in ID order.
+
+    private final JpaTransactionManager jpaTm;
+    private final Class<T> entityType;
+    private final String initialJpqlQuery;
+    private final String subsequentJpqlTemplate;
+
+    private SingleColIdBatchQuery(JpaTransactionManager jpaTm, EntityType<T> jpaEntityType) {
+      checkArgument(
+          jpaEntityType.hasSingleIdAttribute(),
+          "%s must have a single ID attribute",
+          jpaEntityType.getJavaType().getSimpleName());
+      this.jpaTm = jpaTm;
+      this.entityType = jpaEntityType.getJavaType();
+      var idAttr = jpaEntityType.getId(jpaEntityType.getIdType().getJavaType());
+      this.initialJpqlQuery = // First batch: no lower bound on the ID.
+          String.format("FROM %s ORDER BY %s", jpaEntityType.getName(), idAttr.getName());
+      this.subsequentJpqlTemplate = // Later batches: IDs greater than the last one read.
+          String.format(
+              "FROM %1$s WHERE %2$s > :id ORDER BY %2$s",
+              jpaEntityType.getName(), idAttr.getName());
+    }
+
+    @Override
+    public ImmutableList<T> readBatch(Optional<T> lastRead, int batchSize) {
+      checkState(!jpaTm.inTransaction(), "Stream cannot be accessed in a transaction");
+      return jpaTm.transact(
+          TRANSACTION_REPEATABLE_READ,
+          () -> {
+            var entityManager = jpaTm.getEntityManager();
+            Optional<Object> lastReadId =
+                lastRead.map(
+                    entityManager.getEntityManagerFactory().getPersistenceUnitUtil()
+                        ::getIdentifier);
+            TypedQuery<T> query =
+                lastRead.isEmpty()
+                    ? entityManager.createQuery(initialJpqlQuery, entityType)
+                    : entityManager
+                        .createQuery(subsequentJpqlTemplate, entityType)
+                        .setParameter("id", lastReadId.get());
+
+            var results = ImmutableList.copyOf(query.setMaxResults(batchSize).getResultList());
+            results.forEach(entityManager::detach);
+            return results;
+          });
+    }
+  }
+
+  private static class BatchedIterator<T> extends UnmodifiableIterator<ImmutableList<T>> {
+
+    private final BatchQuery<T> batchQuery;
+
+    private final int batchSize;
+
+    private ImmutableList<T> cachedBatch = null; // Batch that next() hands out; empty at end.
+
+    private BatchedIterator(BatchQuery<T> batchQuery, int batchSize) {
+      this.batchQuery = batchQuery;
+      this.batchSize = batchSize;
+      this.cachedBatch = readNextBatch(); // Eagerly loads the first batch.
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !cachedBatch.isEmpty();
+    }
+
+    @Override
+    public ImmutableList<T> next() { // Unguarded: returns an empty list once exhausted.
+      var toReturn = cachedBatch; // Prefetches the following batch unless this one is short.
+      cachedBatch = cachedBatch.size() < batchSize ? ImmutableList.of() : readNextBatch();
+      return toReturn;
+    }
+
+    private ImmutableList<T> readNextBatch() {
+      Optional<T> lastRead =
+          cachedBatch == null // Still null during the constructor's first read.
+              ? Optional.empty()
+              : Optional.ofNullable(Iterables.getLast(cachedBatch, null));
+      return batchQuery.readBatch(lastRead, batchSize);
+    }
+  }
+}
diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java
index 860b2c214..7410dba5c 100644
--- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java
+++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java
@@ -20,6 +20,7 @@ import jakarta.persistence.EntityManager;
 import jakarta.persistence.Query;
 import jakarta.persistence.TypedQuery;
 import jakarta.persistence.criteria.CriteriaQuery;
+import jakarta.persistence.metamodel.Metamodel;
 
 /** Sub-interface of {@link TransactionManager} which defines JPA related methods. */
 public interface JpaTransactionManager extends TransactionManager {
@@ -31,6 +32,9 @@ public interface JpaTransactionManager extends TransactionManager {
    */
   EntityManager getStandaloneEntityManager();
 
+  /** Returns the JPA {@link Metamodel}. */
+  Metamodel getMetaModel();
+
   /**
    * Returns the {@link EntityManager} for the current request.
    *
diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
index f693630ea..041d06a8f 100644
--- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
+++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
@@ -55,6 +55,7 @@ import jakarta.persistence.TemporalType;
 import jakarta.persistence.TypedQuery;
 import jakarta.persistence.criteria.CriteriaQuery;
 import jakarta.persistence.metamodel.EntityType;
+import jakarta.persistence.metamodel.Metamodel;
 import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
@@ -116,6 +117,11 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
     return emf.createEntityManager();
   }
 
+  @Override
+  public Metamodel getMetaModel() {
+    return this.emf.getMetamodel();
+  }
+
   @Override
   public EntityManager getEntityManager() {
     assertInTransaction();
diff --git a/core/src/main/java/google/registry/tools/server/ListHostsAction.java b/core/src/main/java/google/registry/tools/server/ListHostsAction.java
index 461ceed38..cf3995c44 100644
--- a/core/src/main/java/google/registry/tools/server/ListHostsAction.java
+++ b/core/src/main/java/google/registry/tools/server/ListHostsAction.java
@@ -15,11 +15,12 @@
 package google.registry.tools.server;
 
 import static com.google.common.collect.ImmutableSortedSet.toImmutableSortedSet;
-import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+import static google.registry.persistence.transaction.BatchedQueries.loadAllOf;
 import static google.registry.request.Action.Method.GET;
 import static google.registry.request.Action.Method.POST;
 import static java.util.Comparator.comparing;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import google.registry.model.EppResourceUtils;
 import google.registry.model.host.Host;
@@ -51,7 +52,8 @@ public final class ListHostsAction extends ListObjectsAction<Host> {
   @Override
   public ImmutableSet<Host> loadObjects() {
     final DateTime now = clock.nowUtc();
-    return tm().transact(() -> tm().loadAllOf(Host.class)).stream()
+    return loadAllOf(Host.class)
+        .flatMap(ImmutableList::stream)
         .filter(host -> EppResourceUtils.isActive(host, now))
         .collect(toImmutableSortedSet(comparing(Host::getHostName)));
   }
diff --git a/core/src/test/java/google/registry/persistence/transaction/BatchedQueriesTest.java b/core/src/test/java/google/registry/persistence/transaction/BatchedQueriesTest.java
new file mode 100644
index 000000000..4c617389e
--- /dev/null
+++ b/core/src/test/java/google/registry/persistence/transaction/BatchedQueriesTest.java
@@ -0,0 +1,105 @@
+// Copyright 2024 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.persistence.transaction;
+
+import static com.google.common.truth.Truth.assertThat;
+import static google.registry.persistence.transaction.BatchedQueries.loadAllOf;
+import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+import static google.registry.testing.DatabaseHelper.persistResource;
+
+import com.google.common.collect.ImmutableList;
+import google.registry.model.ImmutableObject;
+import jakarta.persistence.Entity;
+import jakarta.persistence.Id;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class BatchedQueriesTest {
+
+  @RegisterExtension
+  final JpaTestExtensions.JpaUnitTestExtension jpa =
+      new JpaTestExtensions.Builder()
+          .withEntityClass(LongIdEntity.class, StringIdEntity.class)
+          .buildUnitTestExtension();
+
+  @Test
+  void loadAllOf_noData() {
+    assertThat(loadAllOf(StringIdEntity.class)).isEmpty();
+  }
+
+  @Test
+  void loadAllOf_oneEntry() {
+    StringIdEntity entity = persistResource(new StringIdEntity("C1"));
+    assertThat(loadAllOf(StringIdEntity.class)).containsExactly(ImmutableList.of(entity));
+  }
+
+  @Test
+  void loadAllOf_multipleEntries_fullBatches() {
+    // Insert in reverse order. In practice, a plain "FROM StringIdEntity" returns this order.
+    // This tests that the `order by` clause is present in the query.
+    StringIdEntity entity4 = persistResource(new StringIdEntity("C4"));
+    StringIdEntity entity3 = persistResource(new StringIdEntity("C3"));
+    StringIdEntity entity2 = persistResource(new StringIdEntity("C2"));
+    StringIdEntity entity1 = persistResource(new StringIdEntity("C1"));
+    assertThat(loadAllOf(StringIdEntity.class, 2)) // Batch size 2: two full batches.
+        .containsExactly(ImmutableList.of(entity1, entity2), ImmutableList.of(entity3, entity4))
+        .inOrder();
+  }
+
+  @Test
+  void loadAllOf_multipleEntries_withPartialBatch() {
+    StringIdEntity entity1 = persistResource(new StringIdEntity("C1"));
+    StringIdEntity entity2 = persistResource(new StringIdEntity("C2"));
+    StringIdEntity entity3 = persistResource(new StringIdEntity("C3"));
+    StringIdEntity entity4 = persistResource(new StringIdEntity("C4"));
+    assertThat(loadAllOf(StringIdEntity.class, 3)) // Batch size 3: one full, one partial batch.
+        .containsExactly(ImmutableList.of(entity1, entity2, entity3), ImmutableList.of(entity4))
+        .inOrder();
+  }
+
+  @Test
+  void loadAllOf_multipleEntries_withLongNumberAsId() {
+    LongIdEntity testEntity2 = new LongIdEntity(2L);
+    LongIdEntity testEntity10 = new LongIdEntity(10L);
+    tm().transact(() -> tm().put(testEntity2));
+    tm().transact(() -> tm().put(testEntity10));
+
+    assertThat(loadAllOf(LongIdEntity.class, 1)) // Expects numeric ID order: 2 before 10.
+        .containsExactly(ImmutableList.of(testEntity2), ImmutableList.of(testEntity10))
+        .inOrder();
+  }
+
+  @Entity(name = "StringIdEntity")
+  static class StringIdEntity extends ImmutableObject { // Test entity with a String primary key.
+    @Id String id;
+
+    StringIdEntity() {}
+
+    private StringIdEntity(String id) {
+      this.id = id;
+    }
+  }
+
+  @Entity(name = "LongIdEntity")
+  private static class LongIdEntity extends ImmutableObject { // Test entity with a long key.
+    @Id long entityId;
+
+    LongIdEntity() {}
+
+    private LongIdEntity(long id) {
+      this.entityId = id;
+    }
+  }
+}