diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b222b358f547..3dd3dbdc2728 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2483,6 +2483,7 @@ public static Write write() { .setSchemaUpdateOptions(Collections.emptySet()) .setNumFileShards(0) .setNumStorageWriteApiStreams(0) + .setNumStorageWriteApiStreamsConfigured(false) .setMethod(Write.Method.DEFAULT) .setExtendedErrorInfo(false) .setSkipInvalidRows(false) @@ -2733,6 +2734,8 @@ public enum Method { abstract int getNumStorageWriteApiStreams(); + abstract boolean isNumStorageWriteApiStreamsConfigured(); + abstract boolean getPropagateSuccessfulStorageApiWrites(); abstract Predicate getPropagateSuccessfulStorageApiWritesPredicate(); @@ -2850,6 +2853,8 @@ abstract Builder setAvroSchemaFactory( abstract Builder setNumStorageWriteApiStreams(int numStorageApiStreams); + abstract Builder setNumStorageWriteApiStreamsConfigured(boolean configured); + abstract Builder setPropagateSuccessfulStorageApiWrites( boolean propagateSuccessfulStorageApiWrites); @@ -3401,11 +3406,14 @@ public Write withNumFileShards(int numFileShards) { * runner determine the sharding at runtime, set this to zero, or {@link #withAutoSharding()} * instead. * - *

<p>For batch pipelines, it inserts a redistribute. To not reshufle and keep the pipeline + *

<p>For batch pipelines, it inserts a redistribute. To not reshuffle and keep the pipeline * parallelism as is, set this to zero. */ public Write withNumStorageWriteApiStreams(int numStorageWriteApiStreams) { - return toBuilder().setNumStorageWriteApiStreams(numStorageWriteApiStreams).build(); + return toBuilder() + .setNumStorageWriteApiStreams(numStorageWriteApiStreams) + .setNumStorageWriteApiStreamsConfigured(true) + .build(); } /** @@ -3736,7 +3744,7 @@ private Duration getStorageApiTriggeringFrequency(BigQueryOptions options) { } private int getStorageApiNumStreams(BigQueryOptions options) { - if (getNumStorageWriteApiStreams() != 0) { + if (isNumStorageWriteApiStreamsConfigured()) { return getNumStorageWriteApiStreams(); } return options.getNumStorageWriteApiStreams(); @@ -3834,13 +3842,9 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) { LOG.warn( "Setting the triggering frequency is only applicable to an unbounded PCollection."); } - if (getStorageApiNumStreams(bqOptions) != 0) { - LOG.warn( - "Setting the number of Storage API streams is only applicable to an unbounded PCollection."); - } } - if (method == Method.STORAGE_API_AT_LEAST_ONCE && getStorageApiNumStreams(bqOptions) != 0) { + if (method != Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) != 0) { LOG.warn( "Setting a number of Storage API streams is only supported when using STORAGE_WRITE_API"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index dd59939726bf..ca5a8314e074 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -534,7 +534,9 @@ public Row toConfigRow(Write transform) {
fieldValues.put("max_file_size", transform.getMaxFileSize()); } fieldValues.put("num_file_shards", transform.getNumFileShards()); - fieldValues.put("num_storage_write_api_streams", transform.getNumStorageWriteApiStreams()); + if (transform.isNumStorageWriteApiStreamsConfigured()) { + fieldValues.put("num_storage_write_api_streams", transform.getNumStorageWriteApiStreams()); + } fieldValues.put( "propagate_successful_storage_api_writes", transform.getPropagateSuccessfulStorageApiWrites()); @@ -615,7 +617,10 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0"; try { - BigQueryIO.Write.Builder builder = new AutoValue_BigQueryIO_Write.Builder<>(); + BigQueryIO.Write.Builder builder = + new AutoValue_BigQueryIO_Write.Builder<>() + .setNumStorageWriteApiStreams(0) + .setNumStorageWriteApiStreamsConfigured(false); String jsonTableRef = configRow.getString("json_table_ref"); if (jsonTableRef != null) { @@ -749,7 +754,10 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { } Integer numStorageWriteApiStreams = configRow.getInt32("num_storage_write_api_streams"); if (numStorageWriteApiStreams != null) { - builder = builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams); + builder = + builder + .setNumStorageWriteApiStreams(numStorageWriteApiStreams) + .setNumStorageWriteApiStreamsConfigured(true); } if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.60.0") >= 0) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index face2ef5841a..44d1d2c4cb0b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -102,7 +102,7 @@ public interface BigQueryOptions void setUseStorageWriteApiAtLeastOnce(Boolean value); @Description( - "When writing with a streaming pipeline, the BigQueryIO.Write will default to using this number of Storage Write API streams. ") + "When writing with STORAGE_WRITE_API, BigQueryIO.Write will default to using this number of Storage Write API streams. ") @Default.Integer(0) Integer getNumStorageWriteApiStreams(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index e3d947235015..2f07aa4fe85b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -179,11 +179,16 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection inputRows = input.getSinglePCollection(); BigQueryIO.Write write = createStorageWriteApiTransform(inputRows.getSchema()); + Integer configuredStreams = configuration.getNumStreams(); + int numStreams = configuredStreams == null ? 0 : configuredStreams; + + if (configuredStreams != null) { + write = write.withNumStorageWriteApiStreams(configuredStreams); + } if (inputRows.isBounded() == IsBounded.UNBOUNDED) { Long triggeringFrequency = configuration.getTriggeringFrequencySeconds(); Boolean autoSharding = configuration.getAutoSharding(); - int numStreams = configuration.getNumStreams() == null ? 
0 : configuration.getNumStreams(); boolean useAtLeastOnceSemantics = configuration.getUseAtLeastOnceSemantics() != null @@ -196,10 +201,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { ? DEFAULT_TRIGGERING_FREQUENCY : Duration.standardSeconds(triggeringFrequency)); } - // set num streams if specified, otherwise default to autoSharding - if (numStreams > 0) { - write = write.withNumStorageWriteApiStreams(numStreams); - } else if (autoSharding == null || autoSharding) { + // Default to autoSharding for unbounded writes when a fixed stream count is not specified. + if (numStreams <= 0 && (autoSharding == null || autoSharding)) { write = write.withAutoSharding(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 55d7f7c8d72a..1f1f18f02190 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -101,6 +101,12 @@ public void validate() { Boolean autoSharding = getAutoSharding(); Integer numStreams = getNumStreams(); + if (numStreams != null) { + checkArgument( + numStreams >= 0, + invalidConfigMessage + "Number of streams must be non-negative, but was: %s", + numStreams); + } if (autoSharding != null && autoSharding && numStreams != null) { checkArgument( numStreams == 0, @@ -152,8 +158,7 @@ public static Builder builder() { public abstract Boolean getAutoSharding(); @SchemaFieldDescription( - "Specifies the number of write streams that the Storage API sink will use. 
" - + "This parameter is only applicable when writing unbounded data.") + "Specifies the number of write streams that the Storage API sink will use.") @Nullable public abstract Integer getNumStreams(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 584309778286..adff21ecdb6c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -39,6 +39,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; @@ -122,7 +123,10 @@ public void testInvalidConfig() { Arrays.asList( BigQueryWriteConfiguration.builder() .setTable("project:dataset.table") - .setCreateDisposition("INVALID_DISPOSITION")); + .setCreateDisposition("INVALID_DISPOSITION"), + BigQueryWriteConfiguration.builder() + .setTable("project:dataset.table") + .setNumStreams(-1)); for (BigQueryWriteConfiguration.Builder config : invalidConfigs) { assertThrows( @@ -483,6 +487,43 @@ public void testErrorCount() throws Exception { } } + @Test + public void testNumStreamsAppliesToBoundedStorageApiWrites() { + BigQueryWriteConfiguration config = + 
BigQueryWriteConfiguration.builder() + .setTable("project:dataset.bounded_fixed_num_streams") + .setNumStreams(3) + .build(); + + runWithConfig(config); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + boolean hasFixedNumStreamsRedistribute = + pipelineProto.getComponents().getTransformsMap().values().stream() + .anyMatch(tr -> tr.getUniqueName().contains("ResdistibuteNumShards")); + assertTrue(hasFixedNumStreamsRedistribute); + p.enableAbandonedNodeEnforcement(false); + } + + @Test + public void testZeroNumStreamsOverridesPipelineOptionForBoundedStorageApiWrites() { + p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3); + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() + .setTable("project:dataset.bounded_zero_num_streams") + .setNumStreams(0) + .build(); + + runWithConfig(config); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + boolean hasFixedNumStreamsRedistribute = + pipelineProto.getComponents().getTransformsMap().values().stream() + .anyMatch(tr -> tr.getUniqueName().contains("ResdistibuteNumShards")); + assertTrue(!hasFixedNumStreamsRedistribute); + p.enableAbandonedNodeEnforcement(false); + } + @Test public void testManagedChoosesStorageApiForUnboundedWrites() { PCollection batchInput = diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index d751d60c905f..a2d17f12569e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2155,8 +2155,7 @@ def __init__( all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only applicable to unbounded input. num_storage_api_streams: Specifies the number of write streams that the - Storage API sink will use. This parameter is only applicable when - writing unbounded data. + Storage API sink will use. ignore_unknown_columns: Accept rows that contain values that do not match the schema. The unknown values are ignored. 
Default is False, which treats unknown values as errors. This option is only valid for diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md index 9c205f092663..7db827ce190a 100644 --- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md +++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md @@ -835,8 +835,8 @@ TableSchema schema = new TableSchema().setFields( {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_write_schema >}} {{< /highlight >}} -For streaming pipelines, you need to set two additional parameters: the number -of streams and the triggering frequency. +For streaming pipelines, you need to set the triggering frequency. You can also +set the number of streams for both batch and streaming pipelines. {{< highlight java >}} BigQueryIO.writeTableRows() @@ -855,7 +855,8 @@ pipeline uses. You can set it explicitly on the transform via [`withNumStorageWriteApiStreams`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withNumStorageWriteApiStreams-int-) or provide the `numStorageWriteApiStreams` option to the pipeline as defined in [`BigQueryOptions`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html). -Please note this is only supported for streaming pipelines. +For batch pipelines, a non-zero stream count inserts a redistribute; set it to +0 or leave it unspecified to keep the pipeline parallelism as is. Triggering frequency determines how soon the data is visible for querying in BigQuery. You can explicitly set it via @@ -874,10 +875,10 @@ pipelines. {{< paragraph class="language-java" wrap="span">}} Similar to streaming inserts, `STORAGE_WRITE_API` supports dynamically determining -the number of parallel streams to write to BigQuery (starting 2.42.0). 
You can -explicitly enable this using [`withAutoSharding`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withAutoSharding--). +the number of parallel streams to write to BigQuery for streaming pipelines +(starting 2.42.0). You can explicitly enable this using [`withAutoSharding`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withAutoSharding--). -`STORAGE_WRITE_API` defaults to dynamic sharding when +For streaming writes, `STORAGE_WRITE_API` defaults to dynamic sharding when `numStorageWriteApiStreams` is set to 0 or is unspecified. {{< /paragraph >}}