diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 2ad78f4da..2a36a7e25 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -26,7 +26,6 @@ import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.TransactionManagerFactory; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import javax.persistence.criteria.CriteriaQuery; import org.apache.beam.sdk.coders.Coder; @@ -263,38 +262,11 @@ public final class RegistryJpaIO { public static final int DEFAULT_BATCH_SIZE = 1; - /** The default number of write shard. Please refer to {@link #shards} for more information. */ - public static final int DEFAULT_SHARDS = 1; - public abstract String name(); /** Number of elements to be written in one call. */ public abstract int batchSize(); - /** - * The number of shards the output should be split into. - * - *

This value is a hint to the pipeline runner on the level of parallelism, and should be - * significantly greater than the number of threads working on this transformation (see next - * paragraph for more information). On the other hand, it should not be too large to the point - * that the number of elements per shard is lower than {@link #batchSize()}. As a rule of thumb, - * the following constraint should hold: {@code shards * batchSize * nThreads <= - * inputElementCount}. Although it is not always possible to determine the number of threads - * working on this transform, when the pipeline run is IO-bound, it most likely is close to the - * total number of threads in the pipeline, which is explained below. - * - *

With Cloud Dataflow runner, the total number of worker threads in a batch pipeline (which - * includes all existing Registry pipelines) is the number of vCPUs used by the pipeline, and - * can be set by the {@code --maxNumWorkers} and {@code --workerMachineType} parameters. The - * number of worker threads in a streaming pipeline can be set by the {@code --maxNumWorkers} - * and {@code --numberOfWorkerHarnessThreads} parameters. - * - *

Note that connections on the database server are a limited resource, therefore the number - * of threads that interact with the database should be set to an appropriate limit. Again, we - * cannot control this number, but can influence it by controlling the total number of threads. - */ - public abstract int shards(); - public abstract SerializableFunction jpaConverter(); public Write withName(String name) { @@ -305,10 +277,6 @@ public final class RegistryJpaIO { return toBuilder().batchSize(batchSize).build(); } - public Write withShards(int shards) { - return toBuilder().shards(shards).build(); - } - /** * An optional function that converts the input entities to a form that can be written into the * database. @@ -322,10 +290,7 @@ public final class RegistryJpaIO { @Override public PCollection expand(PCollection input) { return input - .apply( - "Shard data " + name(), - WithKeys.of(e -> ThreadLocalRandom.current().nextInt(shards())) - .withKeyType(integers())) + .apply("Add key to data " + name(), WithKeys.of(0).withKeyType(integers())) // The call to withShardedKey() is performance critical. The resulting transform ensures // that data is spread evenly across all worker threads. 
.apply( @@ -340,7 +305,6 @@ public final class RegistryJpaIO { return new AutoValue_RegistryJpaIO_Write.Builder() .name(DEFAULT_NAME) .batchSize(DEFAULT_BATCH_SIZE) - .shards(DEFAULT_SHARDS) .jpaConverter(x -> x); } @@ -351,8 +315,6 @@ public final class RegistryJpaIO { abstract Builder batchSize(int batchSize); - abstract Builder shards(int jdbcNumConnsHint); - abstract Builder jpaConverter(SerializableFunction jpaConverter); abstract Write build(); diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java index 74186e2e4..f4f350d6a 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java @@ -14,7 +14,6 @@ package google.registry.beam.common; -import google.registry.beam.common.RegistryJpaIO.Write; import google.registry.config.RegistryEnvironment; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; @@ -29,9 +28,9 @@ import org.apache.beam.sdk.options.Description; * Defines Nomulus-specific pipeline options, e.g. JPA configurations. * *

When using the Cloud Dataflow runner, users are recommended to set an upper bound on active - database connections by setting the pipeline worker options including {@code --maxNumWorkers}, - {@code workerMachineType}, and {@code numberOfWorkerHarnessThreads}. Please refer to {@link - Write#shards()} for more information. + database connections by setting the max number of pipeline worker threads using {@code + * --maxNumWorkers} and {@code --workerMachineType} for batch pipelines, or {@code --maxNumWorkers} + * and {@code --numberOfWorkerHarnessThreads} for streaming pipelines. */ public interface RegistryPipelineOptions extends GcpOptions { @@ -58,14 +57,6 @@ public interface RegistryPipelineOptions extends GcpOptions { void setSqlWriteBatchSize(int sqlWriteBatchSize); - @Description( "Number of shards to create out of the data before writing to the SQL database. Please refer " + "to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.") @Default.Integer(100) int getSqlWriteShards(); - - void setSqlWriteShards(int maxConcurrentSqlWriters); - @DeleteAfterMigration @Description( "Whether to use self allocated primary IDs when building entities. 
This should only be used" diff --git a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java index 2f44aec28..a16bff202 100644 --- a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java +++ b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java @@ -33,7 +33,6 @@ import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.VKey; import google.registry.util.DateTimeUtils; import java.io.Serializable; -import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -143,15 +142,13 @@ public class ResaveAllEppResourcesPipeline implements Serializable { /** Projects and re-saves all resources with repo IDs provided by the {@link Read}. */ private void projectAndResaveResources( Pipeline pipeline, Class clazz, Read repoIdRead) { - int numShards = options.getSqlWriteShards(); int batchSize = options.getSqlWriteBatchSize(); String className = clazz.getSimpleName(); pipeline .apply("Read " + className, repoIdRead) .apply( "Shard data for class" + className, - WithKeys.of(e -> ThreadLocalRandom.current().nextInt(numShards)) - .withKeyType(integers())) + WithKeys.of(0).withKeyType(integers())) .apply( "Group into batches for class" + className, GroupIntoBatches.ofSize(batchSize).withShardedKey()) diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index d012bd649..d1f60fe0e 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -162,7 +162,6 @@ public class Spec11Pipeline implements Serializable { RegistryJpaIO.>write() .withName(transformId) 
.withBatchSize(options.getSqlWriteBatchSize()) - .withShards(options.getSqlWriteShards()) .withJpaConverter( (kv) -> { DomainNameInfo domainNameInfo = kv.getKey(); diff --git a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json index dc7a8b040..8e95264b8 100644 --- a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json @@ -29,15 +29,6 @@ "^[1-9][0-9]*$" ] }, - { - "name": "sqlWriteShards", - "label": "Number of output shards to create when writing to SQL.", - "helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.", - "is_optional": true, - "regexes": [ - "^[1-9][0-9]*$" - ] - }, { "name": "yearMonth", "label": "The year and month we generate invoice and detailed reports for.", diff --git a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json index 1e86a5c59..13b0266f2 100644 --- a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json @@ -29,15 +29,6 @@ "^[1-9][0-9]*$" ] }, - { - "name": "sqlWriteShards", - "label": "Number of output shards to create when writing to SQL.", - "helpText": "Number of shards to create out of the data before writing to the SQL database. 
Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.", - "is_optional": true, - "regexes": [ - "^[1-9][0-9]*$" - ] - }, { "name": "date", "label": "The date when the pipeline runs", diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java index b52645294..37b61b483 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java @@ -59,7 +59,7 @@ class RegistryJpaWriteTest implements Serializable { ImmutableList contacts = contactsBuilder.build(); testPipeline .apply(Create.of(contacts)) - .apply(RegistryJpaIO.write().withName("Contact").withBatchSize(4).withShards(2)); + .apply(RegistryJpaIO.write().withName("Contact").withBatchSize(4)); testPipeline.run().waitUntilFinish(); assertThat(loadAllOf(Contact.class))