Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3401,7 +3401,7 @@ public Write<T> withNumFileShards(int numFileShards) {
* runner determine the sharding at runtime, set this to zero, or {@link #withAutoSharding()}
* instead.
*
* <p>For batch pipelines, it inserts a redistribute. To not reshufle and keep the pipeline
* <p>For batch pipelines, it inserts a redistribute. To not reshuffle and keep the pipeline
* parallelism as is, set this to zero.
*/
public Write<T> withNumStorageWriteApiStreams(int numStorageWriteApiStreams) {
Expand Down Expand Up @@ -3834,13 +3834,9 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.warn(
"Setting the triggering frequency is only applicable to an unbounded PCollection.");
}
if (getStorageApiNumStreams(bqOptions) != 0) {
LOG.warn(
"Setting the number of Storage API streams is only applicable to an unbounded PCollection.");
}
}

if (method == Method.STORAGE_API_AT_LEAST_ONCE && getStorageApiNumStreams(bqOptions) != 0) {
if (method != Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) != 0) {
LOG.warn(
"Setting a number of Storage API streams is only supported when using STORAGE_WRITE_API");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public interface BigQueryOptions
void setUseStorageWriteApiAtLeastOnce(Boolean value);

@Description(
"When writing with a streaming pipeline, the BigQueryIO.Write will default to using this number of Storage Write API streams. ")
"When writing with STORAGE_WRITE_API, BigQueryIO.Write will default to using this number of Storage Write API streams. ")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> inputRows = input.getSinglePCollection();

BigQueryIO.Write<Row> write = createStorageWriteApiTransform(inputRows.getSchema());
int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams();

if (numStreams > 0) {
write = write.withNumStorageWriteApiStreams(numStreams);
}
Comment thread
SPUERSAIYAN marked this conversation as resolved.
Outdated

if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
Boolean autoSharding = configuration.getAutoSharding();
int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams();

boolean useAtLeastOnceSemantics =
configuration.getUseAtLeastOnceSemantics() != null
Expand All @@ -196,10 +200,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
? DEFAULT_TRIGGERING_FREQUENCY
: Duration.standardSeconds(triggeringFrequency));
}
// set num streams if specified, otherwise default to autoSharding
if (numStreams > 0) {
write = write.withNumStorageWriteApiStreams(numStreams);
} else if (autoSharding == null || autoSharding) {
// Default to autoSharding for unbounded writes when a fixed stream count is not specified.
if (numStreams <= 0 && (autoSharding == null || autoSharding)) {
write = write.withAutoSharding();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ public static Builder builder() {
public abstract Boolean getAutoSharding();

@SchemaFieldDescription(
"Specifies the number of write streams that the Storage API sink will use. "
+ "This parameter is only applicable when writing unbounded data.")
"Specifies the number of write streams that the Storage API sink will use.")
@Nullable
public abstract Integer getNumStreams();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,24 @@ public void testErrorCount() throws Exception {
}
}

@Test
public void testNumStreamsAppliesToBoundedStorageApiWrites() {
BigQueryWriteConfiguration config =
BigQueryWriteConfiguration.builder()
.setTable("project:dataset.bounded_fixed_num_streams")
.setNumStreams(3)
.build();

runWithConfig(config);

RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
boolean hasFixedNumStreamsRedistribute =
pipelineProto.getComponents().getTransformsMap().values().stream()
.anyMatch(tr -> tr.getUniqueName().contains("ResdistibuteNumShards"));
assertTrue(hasFixedNumStreamsRedistribute);
p.enableAbandonedNodeEnforcement(false);
}

@Test
public void testManagedChoosesStorageApiForUnboundedWrites() {
PCollection<Row> batchInput =
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2155,8 +2155,7 @@ def __init__(
all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
applicable to unbounded input.
num_storage_api_streams: Specifies the number of write streams that the
Storage API sink will use. This parameter is only applicable when
writing unbounded data.
Storage API sink will use.
ignore_unknown_columns: Accept rows that contain values that do not match
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. This option is only valid for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,8 @@ TableSchema schema = new TableSchema().setFields(
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_write_schema >}}
{{< /highlight >}}

For streaming pipelines, you need to set two additional parameters: the number
of streams and the triggering frequency.
For streaming pipelines, you need to set the triggering frequency. You can also
set the number of streams for both batch and streaming pipelines.

{{< highlight java >}}
BigQueryIO.writeTableRows()
Expand All @@ -855,7 +855,8 @@ pipeline uses. You can set it explicitly on the transform via
[`withNumStorageWriteApiStreams`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withNumStorageWriteApiStreams-int-)
or provide the `numStorageWriteApiStreams` option to the pipeline as defined in
[`BigQueryOptions`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html).
Please note this is only supported for streaming pipelines.
For batch pipelines, a non-zero stream count inserts a redistribute; set it to
0 or leave it unspecified to keep the pipeline parallelism as is.

Triggering frequency determines how soon the data is visible for querying in
BigQuery. You can explicitly set it via
Expand All @@ -874,10 +875,10 @@ pipelines.

{{< paragraph class="language-java" wrap="span">}}
Similar to streaming inserts, `STORAGE_WRITE_API` supports dynamically determining
the number of parallel streams to write to BigQuery (starting 2.42.0). You can
explicitly enable this using [`withAutoSharding`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withAutoSharding--).
the number of parallel streams to write to BigQuery for streaming pipelines
(starting 2.42.0). You can explicitly enable this using [`withAutoSharding`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withAutoSharding--).

`STORAGE_WRITE_API` defaults to dynamic sharding when
For streaming writes, `STORAGE_WRITE_API` defaults to dynamic sharding when
`numStorageWriteApiStreams` is set to 0 or is unspecified.

{{< /paragraph >}}
Expand Down
Loading