From cfee4713edf490ba0424ba23f20771786e73ba71 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Mon, 12 Dec 2022 11:55:24 -0500 Subject: [PATCH] Remove sharding parameter from RegistryJpaIO (#1856) This parameter is misleading and does not do what it purports to do. Namely, it does not impact the level of parallelism. Given the input n for this parameter, and m for the batch size, the elements are divided (keyed) into n groups, each of which is then spread evenly across all threads and, in turn, eventually batched into batches of size m: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L227 This is also evident in the implementation itself, where the ShardedKey is determined by the unique number for a worker/thread combo and the original key: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L268 Using a more concrete example, suppose we have 100 elements and 10 worker threads, with a target batch size of 5. If the "shard" number is set to 1, we first spread the 100 elements across 10 threads, resulting in 10 elements per thread, each thread then batches the elements into 2 batches of size 5. If the "shard" number is set to 2, the 100 elements are first divided into 2 "shards" of 50 each. Each "shard" is then distributed within the 10 threads, resulting in 5 elements per "shard" per thread. They then get turned into 1 batch per "shard" per thread. In the end, each thread still processes 2 batches, even though they are from 2 different "shards". Therefore this "shard" number does not perform horizontal partitioning that one normally associates with sharding, and provides no performance benefits but rather confuses the user. It is also suggested that using withShardedKey() alone is already sufficient to achieve auto-sharding within the keyed group. 
There is no need to manually divide the input by keying them differently based on the "shard" number specified: https://youtu.be/jses0W4Zalc?t=967 --- .../registry/beam/common/RegistryJpaIO.java | 40 +------------------ .../beam/common/RegistryPipelineOptions.java | 15 ++----- .../resave/ResaveAllEppResourcesPipeline.java | 5 +-- .../registry/beam/spec11/Spec11Pipeline.java | 1 - .../beam/invoicing_pipeline_metadata.json | 9 ----- .../beam/spec11_pipeline_metadata.json | 9 ----- .../beam/common/RegistryJpaWriteTest.java | 2 +- 7 files changed, 6 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 2ad78f4da..2a36a7e25 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -26,7 +26,6 @@ import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.TransactionManagerFactory; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import javax.persistence.criteria.CriteriaQuery; import org.apache.beam.sdk.coders.Coder; @@ -263,38 +262,11 @@ public final class RegistryJpaIO { public static final int DEFAULT_BATCH_SIZE = 1; - /** The default number of write shard. Please refer to {@link #shards} for more information. */ - public static final int DEFAULT_SHARDS = 1; - public abstract String name(); /** Number of elements to be written in one call. */ public abstract int batchSize(); - /** - * The number of shards the output should be split into. - * - *

<p>This value is a hint to the pipeline runner on the level of parallelism, and should be - * significantly greater than the number of threads working on this transformation (see next - * paragraph for more information). On the other hand, it should not be too large to the point - * that the number of elements per shard is lower than {@link #batchSize()}. As a rule of thumb, - * the following constraint should hold: {@code shards * batchSize * nThreads <= - * inputElementCount}. Although it is not always possible to determine the number of threads - * working on this transform, when the pipeline run is IO-bound, it most likely is close to the - * total number of threads in the pipeline, which is explained below. - * - *

<p>With Cloud Dataflow runner, the total number of worker threads in a batch pipeline (which - * includes all existing Registry pipelines) is the number of vCPUs used by the pipeline, and - * can be set by the {@code --maxNumWorkers} and {@code --workerMachineType} parameters. The - * number of worker threads in a streaming pipeline can be set by the {@code --maxNumWorkers} - * and {@code --numberOfWorkerHarnessThreads} parameters. - * - *

<p>Note that connections on the database server are a limited resource, therefore the number - * of threads that interact with the database should be set to an appropriate limit. Again, we - * cannot control this number, but can influence it by controlling the total number of threads. - */ - public abstract int shards(); - public abstract SerializableFunction jpaConverter(); public Write withName(String name) { @@ -305,10 +277,6 @@ public final class RegistryJpaIO { return toBuilder().batchSize(batchSize).build(); } - public Write withShards(int shards) { - return toBuilder().shards(shards).build(); - } - /** * An optional function that converts the input entities to a form that can be written into the * database. @@ -322,10 +290,7 @@ public final class RegistryJpaIO { @Override public PCollection expand(PCollection input) { return input - .apply( - "Shard data " + name(), - WithKeys.of(e -> ThreadLocalRandom.current().nextInt(shards())) - .withKeyType(integers())) + .apply("Add key to data " + name(), WithKeys.of(0).withKeyType(integers())) // The call to withShardedKey() is performance critical. The resulting transform ensures // that data is spread evenly across all worker threads. 
.apply( @@ -340,7 +305,6 @@ public final class RegistryJpaIO { return new AutoValue_RegistryJpaIO_Write.Builder() .name(DEFAULT_NAME) .batchSize(DEFAULT_BATCH_SIZE) - .shards(DEFAULT_SHARDS) .jpaConverter(x -> x); } @@ -351,8 +315,6 @@ public final class RegistryJpaIO { abstract Builder batchSize(int batchSize); - abstract Builder shards(int jdbcNumConnsHint); - abstract Builder jpaConverter(SerializableFunction jpaConverter); abstract Write build(); diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java index 74186e2e4..f4f350d6a 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java @@ -14,7 +14,6 @@ package google.registry.beam.common; -import google.registry.beam.common.RegistryJpaIO.Write; import google.registry.config.RegistryEnvironment; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; @@ -29,9 +28,9 @@ import org.apache.beam.sdk.options.Description; * Defines Nomulus-specific pipeline options, e.g. JPA configurations. * *

<p>When using the Cloud Dataflow runner, users are recommended to set an upper bound on active - * database connections by setting the pipeline worker options including {@code --maxNumWorkers}, - * {@code workerMachineType}, and {@code numberOfWorkerHarnessThreads}. Please refer to {@link - * Write#shards()} for more information. + * database connections by setting the max number of pipeline worker threads using {@code + * --maxNumWorkers} and {@code workerMachineType} for batch pipelines, or {@code --maxNumWorkers} + * and {@code --numberOfWorkerHarnessThreads} for streaming pipelines. */ public interface RegistryPipelineOptions extends GcpOptions { @@ -58,14 +57,6 @@ public interface RegistryPipelineOptions extends GcpOptions { void setSqlWriteBatchSize(int sqlWriteBatchSize); - @Description( - "Number of shards to create out of the data before writing to the SQL database. Please refer " - + "to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.") - @Default.Integer(100) - int getSqlWriteShards(); - - void setSqlWriteShards(int maxConcurrentSqlWriters); - @DeleteAfterMigration @Description( "Whether to use self allocated primary IDs when building entities. 
This should only be used" diff --git a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java index 2f44aec28..a16bff202 100644 --- a/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java +++ b/core/src/main/java/google/registry/beam/resave/ResaveAllEppResourcesPipeline.java @@ -33,7 +33,6 @@ import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.VKey; import google.registry.util.DateTimeUtils; import java.io.Serializable; -import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -143,15 +142,13 @@ public class ResaveAllEppResourcesPipeline implements Serializable { /** Projects and re-saves all resources with repo IDs provided by the {@link Read}. */ private void projectAndResaveResources( Pipeline pipeline, Class clazz, Read repoIdRead) { - int numShards = options.getSqlWriteShards(); int batchSize = options.getSqlWriteBatchSize(); String className = clazz.getSimpleName(); pipeline .apply("Read " + className, repoIdRead) .apply( "Shard data for class" + className, - WithKeys.of(e -> ThreadLocalRandom.current().nextInt(numShards)) - .withKeyType(integers())) + WithKeys.of(0).withKeyType(integers())) .apply( "Group into batches for class" + className, GroupIntoBatches.ofSize(batchSize).withShardedKey()) diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index d012bd649..d1f60fe0e 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -162,7 +162,6 @@ public class Spec11Pipeline implements Serializable { RegistryJpaIO.>write() .withName(transformId) 
.withBatchSize(options.getSqlWriteBatchSize()) - .withShards(options.getSqlWriteShards()) .withJpaConverter( (kv) -> { DomainNameInfo domainNameInfo = kv.getKey(); diff --git a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json index dc7a8b040..8e95264b8 100644 --- a/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/invoicing_pipeline_metadata.json @@ -29,15 +29,6 @@ "^[1-9][0-9]*$" ] }, - { - "name": "sqlWriteShards", - "label": "Number of output shards to create when writing to SQL.", - "helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.", - "is_optional": true, - "regexes": [ - "^[1-9][0-9]*$" - ] - }, { "name": "yearMonth", "label": "The year and month we generate invoice and detailed reports for.", diff --git a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json index 1e86a5c59..13b0266f2 100644 --- a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json @@ -29,15 +29,6 @@ "^[1-9][0-9]*$" ] }, - { - "name": "sqlWriteShards", - "label": "Number of output shards to create when writing to SQL.", - "helpText": "Number of shards to create out of the data before writing to the SQL database. 
Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.", - "is_optional": true, - "regexes": [ - "^[1-9][0-9]*$" - ] - }, { "name": "date", "label": "The date when the pipeline runs", diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java index b52645294..37b61b483 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java @@ -59,7 +59,7 @@ class RegistryJpaWriteTest implements Serializable { ImmutableList contacts = contactsBuilder.build(); testPipeline .apply(Create.of(contacts)) - .apply(RegistryJpaIO.write().withName("Contact").withBatchSize(4).withShards(2)); + .apply(RegistryJpaIO.write().withName("Contact").withBatchSize(4)); testPipeline.run().waitUntilFinish(); assertThat(loadAllOf(Contact.class))