1
0
mirror of https://github.com/google/nomulus synced 2026-02-10 06:50:30 +00:00

Fix the JPA Read connector for large data (#1155)

* Fix the JPA Read connector for large data

Allow result set streaming by setting the fetchSize on JDBC statements.
Many JDBC drivers by default buffers the entire result set, causing
delays in first result and/or out of memory errors.

Also fixed a entity instantiation problem exposed in production runs.

Lastly, removed incorrect comments.
This commit is contained in:
Weimin Yu
2021-05-12 19:07:38 -04:00
committed by GitHub
parent 85bac9834f
commit 66ac000ef4
4 changed files with 58 additions and 54 deletions

View File

@@ -30,12 +30,7 @@ import google.registry.testing.DatabaseHelper;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.SystemPropertyExtension;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
@@ -88,7 +83,7 @@ public class RegistryJpaReadTest {
}
@Test
void nonTransactionalQuery_noDedupe() {
void nonTransactionalQuery() {
Read<ContactResource, String> read =
RegistryJpaIO.read(
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
@@ -98,22 +93,4 @@ public class RegistryJpaReadTest {
PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
testPipeline.run();
}
@Test
void nonTransactionalQuery_dedupe() {
// This method only serves as an example of deduplication. Duplicates are not actually added.
Read<ContactResource, KV<String, String>> read =
RegistryJpaIO.read(
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
contact -> KV.of(contact.getRepoId(), contact.getContactId()))
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<String> repoIds =
testPipeline
.apply(read)
.apply("Deduplicate", Deduplicate.keyedValues())
.apply("Get values", Values.create());
PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
testPipeline.run();
}
}