diff --git a/java/google/registry/beam/BUILD b/java/google/registry/beam/BUILD
index e96d0117d..842dcf407 100644
--- a/java/google/registry/beam/BUILD
+++ b/java/google/registry/beam/BUILD
@@ -6,4 +6,15 @@ licenses(["notice"]) # Apache 2.0
java_library(
name = "beam",
+ srcs = glob(["*.java"]),
+ deps = [
+ "@com_google_flogger",
+ "@com_google_flogger_system_backend",
+ "@com_google_guava",
+ "@org_apache_avro",
+ "@org_apache_beam_runners_direct_java",
+ "@org_apache_beam_runners_google_cloud_dataflow_java",
+ "@org_apache_beam_sdks_java_core",
+ "@org_apache_beam_sdks_java_io_google_cloud_platform",
+ ],
)
diff --git a/java/google/registry/beam/BeamUtils.java b/java/google/registry/beam/BeamUtils.java
new file mode 100644
index 000000000..faa603a81
--- /dev/null
+++ b/java/google/registry/beam/BeamUtils.java
@@ -0,0 +1,57 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.FluentLogger;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+
+/** Static utilities for {@code Beam} pipelines. */
+public class BeamUtils {
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ /** Extracts a string representation of a field in a {@link GenericRecord}. */
+ public static String extractField(GenericRecord record, String fieldName) {
+ return String.valueOf(record.get(fieldName));
+ }
+
+ /**
+ * Checks that no expected fields in the record are missing.
+ *
+ *
Note that this simply makes sure the field is not null; it may still generate a parse error
+ * when interpreting the string representation of an object.
+ *
+ * @throws IllegalStateException if the record returns null for any field in {@code fieldNames}
+ */
+ public static void checkFieldsNotNull(
+ ImmutableList fieldNames, SchemaAndRecord schemaAndRecord) {
+ GenericRecord record = schemaAndRecord.getRecord();
+ ImmutableList nullFields =
+ fieldNames
+ .stream()
+ .filter(fieldName -> record.get(fieldName) == null)
+ .collect(ImmutableList.toImmutableList());
+ String missingFieldList = Joiner.on(", ").join(nullFields);
+ if (!nullFields.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Read unexpected null value for field(s) %s for record %s",
+ missingFieldList, record));
+ }
+ }
+}
diff --git a/java/google/registry/beam/invoicing/BUILD b/java/google/registry/beam/invoicing/BUILD
index 155b119ec..af2e4b32b 100644
--- a/java/google/registry/beam/invoicing/BUILD
+++ b/java/google/registry/beam/invoicing/BUILD
@@ -9,6 +9,7 @@ java_library(
srcs = glob(["*.java"]),
resources = glob(["sql/*"]),
deps = [
+ "//java/google/registry/beam",
"//java/google/registry/config",
"//java/google/registry/model",
"//java/google/registry/reporting/billing",
diff --git a/java/google/registry/beam/invoicing/BillingEvent.java b/java/google/registry/beam/invoicing/BillingEvent.java
index 719179154..69c108567 100644
--- a/java/google/registry/beam/invoicing/BillingEvent.java
+++ b/java/google/registry/beam/invoicing/BillingEvent.java
@@ -14,6 +14,9 @@
package google.registry.beam.invoicing;
+import static google.registry.beam.BeamUtils.checkFieldsNotNull;
+import static google.registry.beam.BeamUtils.extractField;
+
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -108,7 +111,7 @@ public abstract class BillingEvent implements Serializable {
* Apache AVRO GenericRecord
*/
static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) {
- checkFieldsNotNull(schemaAndRecord);
+ checkFieldsNotNull(FIELD_NAMES, schemaAndRecord);
GenericRecord record = schemaAndRecord.getRecord();
String flags = extractField(record, "flags");
double amount = getDiscountedAmount(Double.parseDouble(extractField(record, "amount")), flags);
@@ -337,30 +340,4 @@ public abstract class BillingEvent implements Serializable {
}
}
}
-
- /** Extracts a string representation of a field in a {@code GenericRecord}. */
- private static String extractField(GenericRecord record, String fieldName) {
- return String.valueOf(record.get(fieldName));
- }
-
- /**
- * Checks that no expected fields in the record are missing.
- *
- * Note that this simply makes sure the field is not null; it may still generate a parse error
- * in {@code parseFromRecord}.
- */
- private static void checkFieldsNotNull(SchemaAndRecord schemaAndRecord) {
- GenericRecord record = schemaAndRecord.getRecord();
- ImmutableList nullFields =
- FIELD_NAMES
- .stream()
- .filter(fieldName -> record.get(fieldName) == null)
- .collect(ImmutableList.toImmutableList());
- if (!nullFields.isEmpty()) {
- logger.atSevere().log(
- "Found unexpected null value(s) in field(s) %s for record %s",
- Joiner.on(", ").join(nullFields), record);
- throw new IllegalStateException("Read null value from Bigquery query");
- }
- }
}
diff --git a/java/google/registry/beam/invoicing/InvoicingPipeline.java b/java/google/registry/beam/invoicing/InvoicingPipeline.java
index 88c81a1cb..69c8c1f0f 100644
--- a/java/google/registry/beam/invoicing/InvoicingPipeline.java
+++ b/java/google/registry/beam/invoicing/InvoicingPipeline.java
@@ -67,8 +67,8 @@ public class InvoicingPipeline implements Serializable {
String invoiceTemplateUrl;
@Inject
- @Config("invoiceStagingUrl")
- String invoiceStagingUrl;
+ @Config("beamStagingUrl")
+ String beamStagingUrl;
@Inject
@Config("billingBucketUrl")
@@ -99,7 +99,7 @@ public class InvoicingPipeline implements Serializable {
options.setRunner(DataflowRunner.class);
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
options.setTemplateLocation(invoiceTemplateUrl);
- options.setStagingLocation(invoiceStagingUrl);
+ options.setStagingLocation(beamStagingUrl);
Pipeline p = Pipeline.create(options);
PCollection billingEvents =
diff --git a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java
index 428925a41..ea84636da 100644
--- a/java/google/registry/config/RegistryConfig.java
+++ b/java/google/registry/config/RegistryConfig.java
@@ -520,25 +520,38 @@ public final class RegistryConfig {
}
/**
- * Returns the URL of the GCS location we store jar dependencies for the invoicing pipeline.
+ * Returns the URL of the GCS location for storing the monthly spec11 Beam template.
*
- * @see google.registry.beam.invoicing.InvoicingPipeline
+ * @see google.registry.beam.spec11.Spec11Pipeline
*/
@Provides
- @Config("invoiceStagingUrl")
+ @Config("spec11TemplateUrl")
+ public static String provideSpec11TemplateUrl(
+ @Config("apacheBeamBucketUrl") String beamBucketUrl) {
+ return beamBucketUrl + "/templates/spec11";
+ }
+
+ /**
+ * Returns the URL of the GCS location we store jar dependencies for beam pipelines.
+ *
+ * @see google.registry.beam.invoicing.InvoicingPipeline
+ * @see google.registry.beam.spec11.Spec11Pipeline
+ */
+ @Provides
+ @Config("beamStagingUrl")
public static String provideInvoiceStagingUrl(
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
return beamBucketUrl + "/staging";
}
/**
- * Returns the Google Cloud Storage bucket for ICANN transaction and activity reports to
- * be uploaded.
+ * Returns the Google Cloud Storage bucket for Spec11 and ICANN transaction and activity reports
+ * to be uploaded.
*
* @see google.registry.reporting.icann.IcannReportingUploadAction
*/
@Provides
- @Config("icannReportingBucket")
+ @Config("reportingBucket")
public static String provideIcannReportingBucket(@Config("projectId") String projectId) {
return projectId + "-reporting";
}
@@ -588,6 +601,17 @@ public final class RegistryConfig {
return "gs://" + billingBucket;
}
+ /**
+ * Returns the URL of the GCS subdirectory we store Spec11 reports in.
+ *
+ * @see google.registry.beam.spec11.Spec11Pipeline
+ */
+ @Provides
+ @Config("spec11BucketUrl")
+ public static String provideSpec11BucketUrl(@Config("reportingBucket") String reportingBucket) {
+ return "gs://" + reportingBucket + "/icann/spec11";
+ }
+
/**
* Returns whether or not we should publish invoices to partners automatically by default.
*
diff --git a/java/google/registry/reporting/icann/IcannReportingStager.java b/java/google/registry/reporting/icann/IcannReportingStager.java
index ec94e8b5d..a988c29c6 100644
--- a/java/google/registry/reporting/icann/IcannReportingStager.java
+++ b/java/google/registry/reporting/icann/IcannReportingStager.java
@@ -60,7 +60,7 @@ public class IcannReportingStager {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- @Inject @Config("icannReportingBucket") String reportingBucket;
+ @Inject @Config("reportingBucket") String reportingBucket;
@Inject YearMonth yearMonth;
@Inject @ReportingSubdir
diff --git a/java/google/registry/reporting/icann/IcannReportingUploadAction.java b/java/google/registry/reporting/icann/IcannReportingUploadAction.java
index f5c6d0f7b..1cd6c5371 100644
--- a/java/google/registry/reporting/icann/IcannReportingUploadAction.java
+++ b/java/google/registry/reporting/icann/IcannReportingUploadAction.java
@@ -60,7 +60,7 @@ public final class IcannReportingUploadAction implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@Inject
- @Config("icannReportingBucket")
+ @Config("reportingBucket")
String reportingBucket;
@Inject @ReportingSubdir
diff --git a/java/google/registry/tools/BUILD b/java/google/registry/tools/BUILD
index 3def12767..90e9fb3dc 100644
--- a/java/google/registry/tools/BUILD
+++ b/java/google/registry/tools/BUILD
@@ -36,6 +36,7 @@ java_library(
deps = [
"//java/google/registry/backup",
"//java/google/registry/beam/invoicing",
+ "//java/google/registry/beam/spec11",
"//java/google/registry/bigquery",
"//java/google/registry/config",
"//java/google/registry/dns",
diff --git a/java/google/registry/tools/DeploySpec11PipelineCommand.java b/java/google/registry/tools/DeploySpec11PipelineCommand.java
new file mode 100644
index 000000000..bf0438e47
--- /dev/null
+++ b/java/google/registry/tools/DeploySpec11PipelineCommand.java
@@ -0,0 +1,32 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.tools;
+
+import com.beust.jcommander.Parameters;
+import google.registry.beam.spec11.Spec11Pipeline;
+import javax.inject.Inject;
+
+/** Nomulus command that deploys the {@link Spec11Pipeline} template. */
+@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.")
+public class DeploySpec11PipelineCommand implements Command {
+
+ @Inject Spec11Pipeline spec11Pipeline;
+
+ @Override
+ public void run() {
+ spec11Pipeline.deploy();
+ }
+}
+
diff --git a/java/google/registry/tools/RegistryTool.java b/java/google/registry/tools/RegistryTool.java
index f0ba33c5a..e46b5980d 100644
--- a/java/google/registry/tools/RegistryTool.java
+++ b/java/google/registry/tools/RegistryTool.java
@@ -54,6 +54,7 @@ public final class RegistryTool {
.put("delete_reserved_list", DeleteReservedListCommand.class)
.put("delete_tld", DeleteTldCommand.class)
.put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class)
+ .put("deploy_spec11_pipeline", DeploySpec11PipelineCommand.class)
.put("domain_application_info", DomainApplicationInfoCommand.class)
.put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class)
.put("execute_epp", ExecuteEppCommand.class)
diff --git a/java/google/registry/tools/RegistryToolComponent.java b/java/google/registry/tools/RegistryToolComponent.java
index 290e1af79..157b69cf9 100644
--- a/java/google/registry/tools/RegistryToolComponent.java
+++ b/java/google/registry/tools/RegistryToolComponent.java
@@ -83,6 +83,7 @@ interface RegistryToolComponent {
void inject(CreateLrpTokensCommand command);
void inject(CreateTldCommand command);
void inject(DeployInvoicingPipelineCommand command);
+ void inject(DeploySpec11PipelineCommand command);
void inject(EncryptEscrowDepositCommand command);
void inject(GenerateAllocationTokensCommand command);
void inject(GenerateApplicationsReportCommand command);
diff --git a/javatests/google/registry/beam/BUILD b/javatests/google/registry/beam/BUILD
index 86750c423..ee5efa64e 100644
--- a/javatests/google/registry/beam/BUILD
+++ b/javatests/google/registry/beam/BUILD
@@ -9,4 +9,27 @@ load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
java_library(
name = "beam",
+ srcs = glob(["*.java"]),
+ deps = [
+ "//java/google/registry/beam",
+ "//javatests/google/registry/testing",
+ "@com_google_dagger",
+ "@com_google_guava",
+ "@com_google_truth",
+ "@com_google_truth_extensions_truth_java8_extension",
+ "@junit",
+ "@org_apache_avro",
+ "@org_apache_beam_runners_direct_java",
+ "@org_apache_beam_runners_google_cloud_dataflow_java",
+ "@org_apache_beam_sdks_java_core",
+ "@org_apache_beam_sdks_java_io_google_cloud_platform",
+ "@org_mockito_all",
+ ],
+)
+
+GenTestRules(
+ name = "GeneratedTestRules",
+ default_test_size = "small",
+ test_files = glob(["*Test.java"]),
+ deps = [":beam"],
)
diff --git a/javatests/google/registry/beam/BeamUtilsTest.java b/javatests/google/registry/beam/BeamUtilsTest.java
new file mode 100644
index 000000000..1f6c9d2db
--- /dev/null
+++ b/javatests/google/registry/beam/BeamUtilsTest.java
@@ -0,0 +1,86 @@
+// Copyright 2018 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam;
+
+import static com.google.common.truth.Truth.assertThat;
+import static google.registry.testing.JUnitBackports.assertThrows;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link BeamUtils} */
+@RunWith(JUnit4.class)
+public class BeamUtilsTest {
+
+ private static final String GENERIC_SCHEMA =
+ "{\"name\": \"AnObject\", "
+ + "\"type\": \"record\", "
+ + "\"fields\": ["
+ + "{\"name\": \"aString\", \"type\": \"string\"},"
+ + "{\"name\": \"aFloat\", \"type\": \"float\"}"
+ + "]}";
+
+ private SchemaAndRecord schemaAndRecord;
+
+ @Before
+ public void initializeRecord() {
+ // Create a record with a given JSON schema.
+ GenericRecord record = new GenericData.Record(new Schema.Parser().parse(GENERIC_SCHEMA));
+ record.put("aString", "hello world");
+ record.put("aFloat", 2.54);
+ schemaAndRecord = new SchemaAndRecord(record, null);
+ }
+
+ @Test
+ public void testExtractField_fieldExists_returnsExpectedStringValues() {
+ assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aString"))
+ .isEqualTo("hello world");
+ assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aFloat")).isEqualTo("2.54");
+ }
+
+ @Test
+ public void testExtractField_fieldDoesntExist_returnsNull() {
+ schemaAndRecord.getRecord().put("aFloat", null);
+ assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aFloat")).isEqualTo("null");
+ assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "missing")).isEqualTo("null");
+ }
+
+ @Test
+ public void testCheckFieldsNotNull_noExceptionIfAllPresent() {
+ BeamUtils.checkFieldsNotNull(ImmutableList.of("aString", "aFloat"), schemaAndRecord);
+ }
+
+ @Test
+ public void testCheckFieldsNotNull_fieldMissing_throwsException() {
+ IllegalStateException expected =
+ assertThrows(
+ IllegalStateException.class,
+ () ->
+ BeamUtils.checkFieldsNotNull(
+ ImmutableList.of("aString", "aFloat", "notAField"), schemaAndRecord));
+ assertThat(expected)
+ .hasMessageThat()
+ .isEqualTo(
+ "Read unexpected null value for field(s) notAField for record "
+ + "{\"aString\": \"hello world\", \"aFloat\": 2.54}");
+ }
+}