diff --git a/docs/en/developer/sink-connector-development.md b/docs/en/developer/sink-connector-development.md index a96625f97c17..b97bcbdf2576 100644 --- a/docs/en/developer/sink-connector-development.md +++ b/docs/en/developer/sink-connector-development.md @@ -141,6 +141,31 @@ Use an aggregated committer when: This model is especially important for table-oriented sinks and strong consistency use cases. +## Sink Partition Strategy + +A sink can declare an engine-side routing requirement by overriding +`SeaTunnelSink#getPartitionStrategy()`. Keep the default empty strategy unless writer correctness +depends on all rows for the same logical key reaching the same sink writer, such as document +lifecycle, upsert, or stale chunk cleanup for RAG/vector sinks. + +```java +@Override +public Optional<SinkPartitionStrategy> getPartitionStrategy() { + return Optional.of( + SinkPartitionStrategy.hashByFields(Collections.singletonList("document_id"))); +} +``` + +Guidelines: + +- Use physical input field names from the sink's consumed `SeaTunnelRowType`. +- Keep the routing key small and stable, for example `document_id`. +- Route only ordinary data records; checkpoint barriers, schema-change events, and other control + records must keep their original control-flow semantics. +- `Optional.empty()` and `SinkPartitionStrategy.none()` keep the existing routing behavior. +- Current engine scope is Zeta-oriented. Flink and Spark translations fail fast when a non-empty + sink partition strategy is declared so users do not assume the strategy is silently honored. 
+ ## CDC-Aware Sink Design If the sink accepts CDC input, define the mapping very clearly: diff --git a/docs/zh/developer/sink-connector-development.md b/docs/zh/developer/sink-connector-development.md index e842f7b4e8ce..1eecb0f5eb87 100644 --- a/docs/zh/developer/sink-connector-development.md +++ b/docs/zh/developer/sink-connector-development.md @@ -139,6 +139,30 @@ connector-/ 对表类 sink 和强一致性场景,这种模型尤其重要。 +## Sink Partition Strategy + +如果 sink writer 的正确性依赖同一个逻辑 key 的所有行进入同一个 writer,可以覆写 +`SeaTunnelSink#getPartitionStrategy()` 声明引擎侧路由需求。常见场景是 RAG/vector sink +按 `document_id` 做文档级生命周期处理、upsert、delete 或 stale chunk 清理。 + +```java +@Override +public Optional<SinkPartitionStrategy> getPartitionStrategy() { + return Optional.of( + SinkPartitionStrategy.hashByFields(Collections.singletonList("document_id"))); +} +``` + +建议: + +- 字段名使用 sink 输入 `SeaTunnelRowType` 里的物理字段名。 +- 路由 key 应尽量小且稳定,例如 `document_id`。 +- 路由只应该影响普通 data record;checkpoint barrier、schema change event 和其他控制消息 + 必须保持原有控制流语义。 +- `Optional.empty()` 和 `SinkPartitionStrategy.none()` 都表示保持现有路由行为。 +- 当前支持范围以 Zeta 为目标;Flink 和 Spark translation 遇到非空 sink partition strategy + 会 fail-fast,避免用户误以为该策略已经静默生效。 + ## CDC-Aware Sink 设计 如果 sink 接受 CDC 输入,必须把映射规则写清楚: diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java index 954bec748ce4..66bbb8ca2b6b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java @@ -145,4 +145,15 @@ default Optional> getAggregatedCommitInfoSeria default Optional getWriteCatalogTable() { return Optional.empty(); } + + /** + * Get the partition strategy required by this sink. + * + *

The default empty strategy preserves existing sink behavior. + * + * @return Optional sink partition strategy. + */ + default Optional getPartitionStrategy() { + return Optional.empty(); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkPartitionStrategy.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkPartitionStrategy.java new file mode 100644 index 000000000000..73ef5e545b69 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkPartitionStrategy.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.sink; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** Describes how sink input rows should be partitioned before reaching sink writers. 
*/ +public final class SinkPartitionStrategy implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Mode mode; + private final List partitionFields; + + private SinkPartitionStrategy(Mode mode, List partitionFields) { + this.mode = Objects.requireNonNull(mode, "mode must not be null"); + this.partitionFields = + Collections.unmodifiableList( + new ArrayList<>( + Objects.requireNonNull( + partitionFields, "partitionFields must not be null"))); + } + + /** + * Creates a strategy that keeps the existing sink routing behavior. + * + * @return No-op sink partition strategy. + */ + public static SinkPartitionStrategy none() { + return new SinkPartitionStrategy(Mode.NONE, Collections.emptyList()); + } + + /** + * Creates a strategy that routes rows by hashing the declared physical row fields. + * + * @param partitionFields Physical row field names used to calculate the routing hash. + * @return Hash-by-fields sink partition strategy. + */ + public static SinkPartitionStrategy hashByFields(List partitionFields) { + return new SinkPartitionStrategy(Mode.HASH_BY_FIELDS, partitionFields); + } + + public Mode getMode() { + return mode; + } + + public List getPartitionFields() { + return partitionFields; + } + + public enum Mode { + NONE, + HASH_BY_FIELDS + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyValidator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyValidator.java new file mode 100644 index 000000000000..f44a84f55400 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyValidator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** Validates sink partition strategies against the sink input row type. */ +public final class SinkPartitionStrategyValidator { + + private SinkPartitionStrategyValidator() {} + + /** + * Validates that a sink partition strategy can be applied to the sink input row type. + * + * @param partitionStrategy Optional sink partition strategy declared by a sink. + * @param rowType Sink input row type. + * @param sinkIdentity Human-readable sink identity used in validation errors. + */ + public static void validate( + Optional partitionStrategy, + SeaTunnelRowType rowType, + String sinkIdentity) { + Objects.requireNonNull(partitionStrategy, "partitionStrategy must not be null"); + if (!partitionStrategy.isPresent()) { + return; + } + validate(partitionStrategy.get(), rowType, sinkIdentity); + } + + /** + * Validates that a sink partition strategy can be applied to the sink input row type. + * + * @param partitionStrategy Sink partition strategy declared by a sink. + * @param rowType Sink input row type. + * @param sinkIdentity Human-readable sink identity used in validation errors. 
+ */ + public static void validate( + SinkPartitionStrategy partitionStrategy, + SeaTunnelRowType rowType, + String sinkIdentity) { + Objects.requireNonNull(partitionStrategy, "partitionStrategy must not be null"); + Objects.requireNonNull(rowType, "rowType must not be null"); + + switch (partitionStrategy.getMode()) { + case NONE: + return; + case HASH_BY_FIELDS: + validateHashByFields( + partitionStrategy, rowType, normalizeSinkIdentity(sinkIdentity)); + return; + default: + throw new IllegalArgumentException( + String.format( + "Sink %s declares unsupported partition strategy mode: %s", + normalizeSinkIdentity(sinkIdentity), partitionStrategy.getMode())); + } + } + + private static void validateHashByFields( + SinkPartitionStrategy partitionStrategy, + SeaTunnelRowType rowType, + String sinkIdentity) { + List partitionFields = partitionStrategy.getPartitionFields(); + if (partitionFields.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "Sink %s declares HASH_BY_FIELDS partition strategy, but partition fields are empty.", + sinkIdentity)); + } + + List missingFields = new ArrayList<>(); + for (String partitionField : partitionFields) { + if (rowType.indexOf(partitionField, false) < 0) { + missingFields.add(partitionField); + } + } + + if (!missingFields.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "Sink %s declares HASH_BY_FIELDS partition fields %s, but input row type is missing fields %s. 
Available fields: %s", + sinkIdentity, + partitionFields, + missingFields, + Arrays.toString(rowType.getFieldNames()))); + } + } + + private static String normalizeSinkIdentity(String sinkIdentity) { + if (sinkIdentity == null || sinkIdentity.trim().isEmpty()) { + return "unknown sink"; + } + return sinkIdentity; + } +} diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyTest.java new file mode 100644 index 000000000000..fefcca9b762b --- /dev/null +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.api.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class SinkPartitionStrategyTest { + + @Test + public void defaultSinkPartitionStrategyIsEmpty() { + Assertions.assertFalse(new DefaultRoutingSink().getPartitionStrategy().isPresent()); + } + + @Test + public void noneStrategyKeepsEmptyRoutingFields() { + SinkPartitionStrategy strategy = SinkPartitionStrategy.none(); + + Assertions.assertEquals(SinkPartitionStrategy.Mode.NONE, strategy.getMode()); + Assertions.assertTrue(strategy.getPartitionFields().isEmpty()); + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> strategy.getPartitionFields().add("document_id")); + } + + @Test + public void hashByFieldsStrategyKeepsImmutableRoutingFields() { + List fields = new ArrayList<>(); + fields.add("document_id"); + + SinkPartitionStrategy strategy = SinkPartitionStrategy.hashByFields(fields); + fields.add("chunk_id"); + + Assertions.assertEquals(SinkPartitionStrategy.Mode.HASH_BY_FIELDS, strategy.getMode()); + Assertions.assertEquals( + Collections.singletonList("document_id"), strategy.getPartitionFields()); + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> strategy.getPartitionFields().add("chunk_id")); + } + + @Test + public void sinkCanDeclareHashByFieldStrategy() { + Optional strategy = new DocumentRoutingSink().getPartitionStrategy(); + + Assertions.assertTrue(strategy.isPresent()); + Assertions.assertEquals( + SinkPartitionStrategy.Mode.HASH_BY_FIELDS, strategy.get().getMode()); + Assertions.assertEquals( + Collections.singletonList("document_id"), strategy.get().getPartitionFields()); + } + + private static class DefaultRoutingSink + implements SeaTunnelSink { + + @Override + public 
SinkWriter createWriter(SinkWriter.Context context) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public String getPluginName() { + return "DefaultRoutingSink"; + } + } + + private static class DocumentRoutingSink extends DefaultRoutingSink { + + @Override + public Optional getPartitionStrategy() { + return Optional.of( + SinkPartitionStrategy.hashByFields(Collections.singletonList("document_id"))); + } + + @Override + public String getPluginName() { + return "DocumentRoutingSink"; + } + } +} diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyValidatorTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyValidatorTest.java new file mode 100644 index 000000000000..aaa84fdb6d9d --- /dev/null +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/SinkPartitionStrategyValidatorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.api.sink; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +public class SinkPartitionStrategyValidatorTest { + + @Test + void validateAcceptsEmptyStrategy() { + SinkPartitionStrategyValidator.validate(Optional.empty(), rowType(), "QdrantSink"); + } + + @Test + void validateAcceptsHashFieldsDeclaredInRowType() { + SinkPartitionStrategyValidator.validate( + Optional.of(SinkPartitionStrategy.hashByFields(Arrays.asList("document_id"))), + rowType(), + "QdrantSink"); + } + + @Test + void validateRejectsEmptyHashFields() { + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + SinkPartitionStrategyValidator.validate( + Optional.of( + SinkPartitionStrategy.hashByFields( + Collections.emptyList())), + rowType(), + "QdrantSink")); + + Assertions.assertTrue(exception.getMessage().contains("QdrantSink")); + Assertions.assertTrue(exception.getMessage().contains("HASH_BY_FIELDS")); + Assertions.assertTrue(exception.getMessage().contains("partition fields")); + } + + @Test + void validateRejectsMissingHashFieldsWithSinkIdentity() { + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + SinkPartitionStrategyValidator.validate( + Optional.of( + SinkPartitionStrategy.hashByFields( + Arrays.asList("document_id", "tenant_id"))), + rowType(), + "QdrantSink")); + + Assertions.assertTrue(exception.getMessage().contains("QdrantSink")); + Assertions.assertTrue(exception.getMessage().contains("tenant_id")); + } + + private SeaTunnelRowType rowType() { + return new SeaTunnelRowType( + new String[] {"document_id", "chunk_id", "text"}, + new 
SeaTunnelDataType[] { + BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE + }); + } +} diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowOptionsTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowOptionsTest.java new file mode 100644 index 000000000000..00dc0285a8d0 --- /dev/null +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowOptionsTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.api.table.type; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SeaTunnelRowOptionsTest { + + @Test + void copyPreservesIndependentOptions() { + SeaTunnelRow row = new SeaTunnelRow(new Object[] {"doc-1", "chunk-1"}); + row.getOptions().put("partition", "doc-1"); + row.getOptions().put("source", "markdown"); + + SeaTunnelRow copied = row.copy(); + + Assertions.assertEquals(row.getOptions(), copied.getOptions()); + Assertions.assertNotSame(row.getOptions(), copied.getOptions()); + + row.getOptions().put("partition", "doc-2"); + Assertions.assertEquals("doc-1", copied.getOptions().get("partition")); + } + + @Test + void projectedCopyPreservesIndependentOptions() { + SeaTunnelRow row = new SeaTunnelRow(new Object[] {"doc-1", "chunk-1", "text"}); + row.getOptions().put("partition", "doc-1"); + row.getOptions().put("source", "markdown"); + + SeaTunnelRow copied = row.copy(new int[] {0, 2}); + + Assertions.assertEquals(row.getOptions(), copied.getOptions()); + Assertions.assertNotSame(row.getOptions(), copied.getOptions()); + Assertions.assertEquals(2, copied.getArity()); + Assertions.assertEquals("doc-1", copied.getField(0)); + Assertions.assertEquals("text", copied.getField(1)); + + row.getOptions().put("partition", "doc-2"); + Assertions.assertEquals("doc-1", copied.getOptions().get("partition")); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index 42a6c5afeb44..deed3c7c75a5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ 
-21,6 +21,10 @@ import org.apache.seatunnel.api.options.EnvCommonOptions; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; +import org.apache.seatunnel.api.sink.SinkPartitionStrategyValidator; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.engine.common.config.server.QueueType; import org.apache.seatunnel.engine.common.utils.IdGenerator; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; @@ -390,20 +394,27 @@ private List getSourceTask( if (sourceWithSink(flow)) { flows.addAll(splitSinkFromFlow(flow)); } + List taskGroupLocations = new ArrayList<>(); for (int i = 0; i < flow.getAction().getParallelism(); i++) { - long taskGroupId = taskGroupIdGenerator.getNextId(); - int finalParallelismIndex = i; - TaskGroupLocation taskGroupLocation = + taskGroupLocations.add( new TaskGroupLocation( jobImmutableInformation.getJobId(), pipelineIndex, - taskGroupId); + taskGroupIdGenerator.getNextId())); + } + Map, List> + sinkPartitionTargetTasks = + getSinkPartitionTargetTasks(flows, taskGroupLocations); + for (int i = 0; i < flow.getAction().getParallelism(); i++) { + int finalParallelismIndex = i; + TaskGroupLocation taskGroupLocation = taskGroupLocations.get(i); AtomicInteger taskInTaskGroupIndex = new AtomicInteger(0); List taskList = flows.stream() .map( f -> { - setFlowConfig(f); + setFlowConfig( + f, sinkPartitionTargetTasks); final TaskLocation taskLocation = new TaskLocation( taskGroupLocation, @@ -529,7 +540,8 @@ private void fillCheckpointPlan(SeaTunnelTask task) { * @param f flow */ @SuppressWarnings("unchecked") - private void setFlowConfig(Flow f) { + private void setFlowConfig( + Flow f, Map, List> sinkPartitionTargetTasks) { if (f instanceof PhysicalExecutionFlow) { PhysicalExecutionFlow flow = (PhysicalExecutionFlow) f; @@ -540,10 +552,12 @@ private void 
setFlowConfig(Flow f) { flow.setConfig(config); } else if (flow.getAction() instanceof SinkAction) { SinkConfig config = new SinkConfig(); - if (committerTaskIDMap.containsKey((SinkAction) flow.getAction())) { + SinkAction sinkAction = (SinkAction) flow.getAction(); + configureSinkPartitionStrategy( + config, sinkAction, sinkPartitionTargetTasks.get(sinkAction)); + if (committerTaskIDMap.containsKey(sinkAction)) { config.setContainCommitter(true); - config.setCommitterTask( - committerTaskIDMap.get((SinkAction) flow.getAction())); + config.setCommitterTask(committerTaskIDMap.get(sinkAction)); } flow.setConfig(config); } @@ -557,8 +571,102 @@ private void setFlowConfig(Flow f) { } if (!f.getNext().isEmpty()) { - f.getNext().forEach(this::setFlowConfig); + f.getNext().forEach(next -> setFlowConfig(next, sinkPartitionTargetTasks)); + } + } + + private static void configureSinkPartitionStrategy( + SinkConfig config, + SinkAction sinkAction, + List partitionTargetTasks) { + Optional partitionStrategy = + sinkAction.getSink().getPartitionStrategy(); + if (!partitionStrategy.isPresent()) { + return; + } + + config.setPartitionStrategy(partitionStrategy.get()); + if (partitionStrategy.get().getMode() == SinkPartitionStrategy.Mode.NONE) { + return; + } + + SeaTunnelRowType rowType = getSinkInputRowType(sinkAction); + SinkPartitionStrategyValidator.validate(partitionStrategy, rowType, sinkAction.getName()); + config.setPartitionRowType(rowType); + if (partitionTargetTasks == null || partitionTargetTasks.isEmpty()) { + throw new IllegalStateException( + String.format( + "Sink %s declares a partition strategy but no Zeta sink writer " + + "target tasks were planned.", + sinkAction.getName())); + } + config.setPartitionTargetTasks(partitionTargetTasks); + } + + private static Map, List> getSinkPartitionTargetTasks( + List flows, List taskGroupLocations) { + Map, List> result = new HashMap<>(); + for (int taskInGroupIndex = 0; taskInGroupIndex < flows.size(); taskInGroupIndex++) 
{ + List> sinkActions = + collectSinkActions(flows.get(taskInGroupIndex)); + for (SinkAction sinkAction : sinkActions) { + if (!hasNonEmptyPartitionStrategy(sinkAction)) { + continue; + } + List targetTasks = new ArrayList<>(); + for (int parallelismIndex = 0; + parallelismIndex < taskGroupLocations.size(); + parallelismIndex++) { + targetTasks.add( + new TaskLocation( + taskGroupLocations.get(parallelismIndex), + taskInGroupIndex, + parallelismIndex)); + } + result.put(sinkAction, targetTasks); + } + } + return result; + } + + private static List> collectSinkActions(Flow flow) { + List> sinkActions = new ArrayList<>(); + if (flow instanceof PhysicalExecutionFlow + && ((PhysicalExecutionFlow) flow).getAction() instanceof SinkAction) { + sinkActions.add( + (SinkAction) ((PhysicalExecutionFlow) flow).getAction()); + } + flow.getNext().forEach(next -> sinkActions.addAll(collectSinkActions(next))); + return sinkActions; + } + + private static boolean hasNonEmptyPartitionStrategy(SinkAction sinkAction) { + Optional partitionStrategy = + sinkAction.getSink().getPartitionStrategy(); + return partitionStrategy.isPresent() + && partitionStrategy.get().getMode() != SinkPartitionStrategy.Mode.NONE; + } + + private static SeaTunnelRowType getSinkInputRowType(SinkAction sinkAction) { + SeaTunnelDataType consumedType; + try { + consumedType = sinkAction.getSink().getConsumedType(); + } catch (UnsupportedOperationException e) { + throw new IllegalStateException( + String.format( + "Sink %s declares a partition strategy but does not expose a " + + "SeaTunnelRowType from getConsumedType().", + sinkAction.getName()), + e); + } + if (consumedType instanceof SeaTunnelRowType) { + return (SeaTunnelRowType) consumedType; } + throw new IllegalStateException( + String.format( + "Sink %s declares a partition strategy but consumed type is %s, " + + "expected SeaTunnelRowType.", + sinkAction.getName(), consumedType)); } /** diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/SinkConfig.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/SinkConfig.java index 7dd060deea07..9bb1db295fe1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/SinkConfig.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/SinkConfig.java @@ -17,12 +17,21 @@ package org.apache.seatunnel.engine.server.dag.physical.config; +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.engine.server.execution.TaskLocation; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + public class SinkConfig implements FlowConfig { private TaskLocation committerTask; private boolean containCommitter; + private SinkPartitionStrategy partitionStrategy; + private SeaTunnelRowType partitionRowType; + private List partitionTargetTasks = Collections.emptyList(); public TaskLocation getCommitterTask() { return committerTask; @@ -39,4 +48,31 @@ public boolean isContainCommitter() { public void setContainCommitter(boolean containCommitter) { this.containCommitter = containCommitter; } + + public SinkPartitionStrategy getPartitionStrategy() { + return partitionStrategy; + } + + public void setPartitionStrategy(SinkPartitionStrategy partitionStrategy) { + this.partitionStrategy = partitionStrategy; + } + + public SeaTunnelRowType getPartitionRowType() { + return partitionRowType; + } + + public void setPartitionRowType(SeaTunnelRowType partitionRowType) { + this.partitionRowType = partitionRowType; + } + + public List getPartitionTargetTasks() { + return partitionTargetTasks; + } + + public void setPartitionTargetTasks(List partitionTargetTasks) { + 
this.partitionTargetTasks = + partitionTargetTasks == null + ? Collections.emptyList() + : Collections.unmodifiableList(new ArrayList<>(partitionTargetTasks)); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java index da2e18510891..3c9ad1b78860 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java @@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.task.operation.SendConnectorJarToMemberNodeOperation; import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation; import org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation; +import org.apache.seatunnel.engine.server.task.operation.sink.SinkPartitionExchangeOperation; import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation; import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation; @@ -110,6 +111,8 @@ public class TaskDataSerializerHook implements DataSerializerHook { public static final int REPORT_METRICS_OPERATION = 28; + public static final int SINK_PARTITION_EXCHANGE_OPERATION = 29; + public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY, @@ -186,6 +189,8 @@ public IdentifiedDataSerializable create(int typeId) { return new CleanLogOperation(); case REPORT_METRICS_OPERATION: return new ReportMetricsOperation(); + case SINK_PARTITION_EXCHANGE_OPERATION: + return new 
SinkPartitionExchangeOperation(); default: throw new IllegalArgumentException("Unknown type id " + typeId); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java index 3b5c24fe1524..6806b7ba802d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java @@ -52,6 +52,7 @@ import org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle; +import org.apache.seatunnel.engine.server.task.flow.SinkPartitionExchangeEnvelope; import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle; import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle; import org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithIntermediateQueue; @@ -296,14 +297,16 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex this.getMetricsContext()); outputs = flowLifeCycles; } else if (f.getAction() instanceof SinkAction) { + SinkConfig sinkConfig = (SinkConfig) f.getConfig(); lifeCycle = new SinkFlowLifeCycle<>( (SinkAction) f.getAction(), taskLocation, indexID, this, - ((SinkConfig) f.getConfig()).getCommitterTask(), - ((SinkConfig) f.getConfig()).isContainCommitter(), + sinkConfig, + sinkConfig.getCommitterTask(), + sinkConfig.isContainCommitter(), completableFuture, this.getMetricsContext()); } else if (f.getAction() instanceof TransformChainAction) { @@ -339,6 +342,21 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex CompletableFuture completableFuture, 
MetricsContext metricsContext); + public void receivedSinkPartitionExchange(SinkPartitionExchangeEnvelope envelope) + throws Exception { + for (FlowLifeCycle cycle : allCycles) { + if (cycle instanceof SinkFlowLifeCycle) { + ((SinkFlowLifeCycle) cycle).receivedSinkPartitionExchange(envelope); + return; + } + } + throw new IllegalStateException( + String.format( + "Task %s received a sink partition exchange envelope but has no sink " + + "flow lifecycle.", + taskLocation)); + } + protected abstract void collect() throws Exception; @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/HazelcastSinkPartitionExchangeDispatcher.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/HazelcastSinkPartitionExchangeDispatcher.java new file mode 100644 index 000000000000..24b3da963554 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/HazelcastSinkPartitionExchangeDispatcher.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.task.flow; + +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.task.SeaTunnelTask; +import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation; +import org.apache.seatunnel.engine.server.task.operation.sink.SinkPartitionExchangeOperation; + +import com.hazelcast.cluster.Address; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; + +/** Sends sink partition exchange batches through Hazelcast task operations. */ +final class HazelcastSinkPartitionExchangeDispatcher implements SinkPartitionExchangeDispatcher { + + private final SeaTunnelTask runningTask; + private final List targetTaskLocations; + private final Map targetTaskAddresses = new HashMap<>(); + + HazelcastSinkPartitionExchangeDispatcher( + SeaTunnelTask runningTask, List targetTaskLocations) { + this.runningTask = Objects.requireNonNull(runningTask, "runningTask must not be null"); + this.targetTaskLocations = + Objects.requireNonNull(targetTaskLocations, "targetTaskLocations must not be null"); + } + + @Override + public void open() throws Exception { + for (TaskLocation targetTaskLocation : targetTaskLocations) { + if (!targetTaskLocation.equals(runningTask.getTaskLocation())) { + targetTaskAddresses.put(targetTaskLocation, getTaskAddress(targetTaskLocation)); + } + } + } + + @Override + public void dispatch(SinkPartitionExchangeEnvelope envelope) throws Exception { + TaskLocation targetTaskLocation = envelope.getTargetTaskLocation(); + if (targetTaskLocation.equals(runningTask.getTaskLocation())) { + runningTask.receivedSinkPartitionExchange(envelope); + return; + } + Address targetAddress = targetTaskAddresses.get(targetTaskLocation); + if (targetAddress == null) { + throw new IllegalStateException( + String.format( + "No task address was resolved for sink partition 
target %s", + targetTaskLocation)); + } + runningTask + .getExecutionContext() + .sendToMember(new SinkPartitionExchangeOperation(envelope), targetAddress) + .get(); + } + + private Address getTaskAddress(TaskLocation taskLocation) + throws ExecutionException, InterruptedException { + return (Address) + runningTask + .getExecutionContext() + .sendToMaster(new GetTaskGroupAddressOperation(taskLocation)) + .get(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 86e1615b456e..6779011c4640 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.event.StainTraceEvent; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SinkCommitter; +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SinkWriter.Context; import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; @@ -39,6 +40,7 @@ import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState; +import org.apache.seatunnel.engine.server.dag.physical.config.SinkConfig; import org.apache.seatunnel.engine.server.event.JobEventListener; import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext; @@ -109,6 +111,10 @@ public class SinkFlowLifeCycle tablesMaps = new HashMap<>(); + 
private final SinkConfig sinkConfig; + + private SinkPartitionExchange partitionExchange; + private final Counter stainTraceEventsReportedTotal; private final Counter stainTraceInvalidPayloadTotal; private volatile Counter stainTraceEntriesTruncatedTotal; @@ -119,6 +125,7 @@ public SinkFlowLifeCycle( TaskLocation taskLocation, int indexID, SeaTunnelTask runningTask, + SinkConfig sinkConfig, TaskLocation committerTaskLocation, boolean containAggCommitter, CompletableFuture completableFuture, @@ -127,6 +134,7 @@ public SinkFlowLifeCycle( this.sinkAction = sinkAction; this.indexID = indexID; this.taskLocation = taskLocation; + this.sinkConfig = sinkConfig; this.committerTaskLocation = committerTaskLocation; this.containAggCommitter = containAggCommitter; this.metricsContext = metricsContext; @@ -159,6 +167,44 @@ public void init() throws Exception { this.writerStateSerializer = sinkAction.getSink().getWriterStateSerializer(); this.committer = sinkAction.getSink().createCommitter(); this.lastCommitInfo = Optional.empty(); + initPartitionExchange(); + } + + private void initPartitionExchange() { + if (sinkConfig == null || sinkConfig.getPartitionStrategy() == null) { + return; + } + SinkPartitionStrategy partitionStrategy = sinkConfig.getPartitionStrategy(); + if (partitionStrategy.getMode() == SinkPartitionStrategy.Mode.NONE) { + return; + } + if (sinkConfig.getPartitionRowType() == null) { + throw new IllegalStateException( + String.format( + "Sink %s declares a partition strategy but no row type was planned.", + sinkAction.getName())); + } + if (sinkConfig.getPartitionTargetTasks().isEmpty()) { + throw new IllegalStateException( + String.format( + "Sink %s declares a partition strategy but no target writer tasks " + + "were planned.", + sinkAction.getName())); + } + SinkPartitionRouter router = + SinkPartitionRouter.create( + Optional.of(partitionStrategy), + sinkConfig.getPartitionRowType(), + sinkConfig.getPartitionTargetTasks().size(), + sinkAction.getName()); + 
partitionExchange = + new SinkPartitionExchange( + taskLocation, + sinkConfig.getPartitionTargetTasks(), + router, + new HazelcastSinkPartitionExchangeDispatcher( + runningTask, sinkConfig.getPartitionTargetTasks()), + this::handleRecord); } @Override @@ -167,6 +213,9 @@ public void open() throws Exception { if (containAggCommitter) { committerTaskAddress = getCommitterTaskAddress(); } + if (partitionExchange != null) { + partitionExchange.open(); + } registerCommitter(); } @@ -180,6 +229,13 @@ private Address getCommitterTaskAddress() throws ExecutionException, Interrupted @Override public void close() throws IOException { + if (partitionExchange != null) { + try { + partitionExchange.close(); + } catch (Exception e) { + throw new IOException(e); + } + } super.close(); writer.close(); writerContext.getEventListener().onEvent(new WriterCloseEvent()); @@ -200,141 +256,161 @@ private void registerCommitter() { @Override public void received(Record record) { try { - if (record.getData() instanceof Barrier) { - long startTime = System.currentTimeMillis(); + if (partitionExchange != null) { + partitionExchange.received(record); + } else { + handleRecord(record); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void receivedSinkPartitionExchange(SinkPartitionExchangeEnvelope envelope) { + if (partitionExchange == null) { + throw new IllegalStateException( + "Received sink partition exchange envelope but routing is not enabled for " + + taskLocation); + } + try { + partitionExchange.receive(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void handleRecord(Record record) throws Exception { + if (record.getData() instanceof Barrier) { + long startTime = System.currentTimeMillis(); - Barrier barrier = (Barrier) record.getData(); - connectorMetricsCalcContext.sealCheckpointMetrics(barrier.getId()); - if (barrier.prepareClose(this.taskLocation)) { - prepareClose = true; + Barrier barrier = (Barrier) 
record.getData(); + connectorMetricsCalcContext.sealCheckpointMetrics(barrier.getId()); + if (barrier.prepareClose(this.taskLocation)) { + prepareClose = true; + } + if (barrier.snapshot()) { + try { + lastCommitInfo = writer.prepareCommit(barrier.getId()); + } catch (Exception e) { + writer.abortPrepare(); + throw e; } - if (barrier.snapshot()) { - try { - lastCommitInfo = writer.prepareCommit(barrier.getId()); - } catch (Exception e) { - writer.abortPrepare(); - throw e; - } - List states = writer.snapshotState(barrier.getId()); - if (!writerStateSerializer.isPresent()) { - runningTask.addState( - barrier, ActionStateKey.of(sinkAction), Collections.emptyList()); - } else { - runningTask.addState( - barrier, - ActionStateKey.of(sinkAction), - serializeStates(writerStateSerializer.get(), states)); - } - if (containAggCommitter) { - CommitInfoT commitInfoT = null; - if (lastCommitInfo.isPresent()) { - commitInfoT = lastCommitInfo.get(); - } - runningTask - .getExecutionContext() - .sendToMember( - new SinkPrepareCommitOperation( - barrier, - committerTaskLocation, - commitInfoSerializer.isPresent() - ? commitInfoSerializer - .get() - .serialize(commitInfoT) - : null), - committerTaskAddress) - .join(); - } + List states = writer.snapshotState(barrier.getId()); + if (!writerStateSerializer.isPresent()) { + runningTask.addState( + barrier, ActionStateKey.of(sinkAction), Collections.emptyList()); } else { - if (containAggCommitter) { - runningTask - .getExecutionContext() - .sendToMember( - new BarrierFlowOperation(barrier, committerTaskLocation), - committerTaskAddress) - .join(); - } - } - runningTask.ack(barrier); - - log.debug( - "trigger barrier [{}] finished, cost {}ms. 
taskLocation [{}]", - barrier.getId(), - System.currentTimeMillis() - startTime, - taskLocation); - } else if (record.getData() instanceof SchemaChangeEvent) { - if (prepareClose) { - return; + runningTask.addState( + barrier, + ActionStateKey.of(sinkAction), + serializeStates(writerStateSerializer.get(), states)); } - SchemaChangeEvent event = (SchemaChangeEvent) record.getData(); - if (writer instanceof SupportSchemaEvolutionSinkWriter) { - ((SupportSchemaEvolutionSinkWriter) writer).applySchemaChange(event); - } else { - // todo remove deprecated method - writer.applySchemaChange(event); + if (containAggCommitter) { + CommitInfoT commitInfoT = null; + if (lastCommitInfo.isPresent()) { + commitInfoT = lastCommitInfo.get(); + } + runningTask + .getExecutionContext() + .sendToMember( + new SinkPrepareCommitOperation( + barrier, + committerTaskLocation, + commitInfoSerializer.isPresent() + ? commitInfoSerializer + .get() + .serialize(commitInfoT) + : null), + committerTaskAddress) + .join(); } } else { - if (prepareClose) { - return; + if (containAggCommitter) { + runningTask + .getExecutionContext() + .sendToMember( + new BarrierFlowOperation(barrier, committerTaskLocation), + committerTaskAddress) + .join(); } - String tableId; - writer.write((T) record.getData()); - if (record.getData() instanceof SeaTunnelRow) { - SeaTunnelRow row = (SeaTunnelRow) record.getData(); - if (this.sinkAction.getSink() instanceof MultiTableSink) { - if (row.getTableId() == null || row.getTableId().isEmpty()) { - tableId = row.getTableId(); - } else { - - TablePath tablePath = tablesMaps.get(TablePath.of(row.getTableId())); - tableId = - tablePath != null - ? tablePath.getFullName() - : TablePath.DEFAULT.getFullName(); - } - + } + runningTask.ack(barrier); + + log.debug( + "trigger barrier [{}] finished, cost {}ms. 
taskLocation [{}]", + barrier.getId(), + System.currentTimeMillis() - startTime, + taskLocation); + } else if (record.getData() instanceof SchemaChangeEvent) { + if (prepareClose) { + return; + } + SchemaChangeEvent event = (SchemaChangeEvent) record.getData(); + if (writer instanceof SupportSchemaEvolutionSinkWriter) { + ((SupportSchemaEvolutionSinkWriter) writer).applySchemaChange(event); + } else { + // todo remove deprecated method + writer.applySchemaChange(event); + } + } else { + if (prepareClose) { + return; + } + String tableId; + writer.write((T) record.getData()); + if (record.getData() instanceof SeaTunnelRow) { + SeaTunnelRow row = (SeaTunnelRow) record.getData(); + if (this.sinkAction.getSink() instanceof MultiTableSink) { + if (row.getTableId() == null || row.getTableId().isEmpty()) { + tableId = row.getTableId(); } else { - Optional writeCatalogTable = - this.sinkAction.getSink().getWriteCatalogTable(); + + TablePath tablePath = tablesMaps.get(TablePath.of(row.getTableId())); tableId = - writeCatalogTable - .map( - catalogTable -> - catalogTable.getTablePath().getFullName()) - .orElseGet(TablePath.DEFAULT::getFullName); + tablePath != null + ? 
tablePath.getFullName() + : TablePath.DEFAULT.getFullName(); } - connectorMetricsCalcContext.updateMetrics(record.getData(), tableId); - - if (StainTraceUtils.hasPayload(row)) { - long nowMs = System.currentTimeMillis(); - StainTraceUtils.appendIfPresent( - row, - StainTraceStage.SINK_WRITE_DONE, - runningTask.getTaskID(), - nowMs, - getStainTraceMaxEntriesPerTrace(), - getStainTraceEntriesTruncatedTotal()); - byte[] payload = StainTraceUtils.getPayloadOrNull(row); - if (payload != null) { - try { - long traceId = StainTracePayload.readTraceId(payload); - eventListener.onEvent( - new StainTraceEvent( - traceId, - payload, - taskLocation.getTaskID(), - tableId)); - stainTraceEventsReportedTotal.inc(); - } catch (Exception e) { - stainTraceInvalidPayloadTotal.inc(); - log.debug("Failed to report stain trace event", e); - } - } - } + } else { + Optional writeCatalogTable = + this.sinkAction.getSink().getWriteCatalogTable(); + tableId = + writeCatalogTable + .map(catalogTable -> catalogTable.getTablePath().getFullName()) + .orElseGet(TablePath.DEFAULT::getFullName); } + + connectorMetricsCalcContext.updateMetrics(record.getData(), tableId); + reportStainTraceIfPresent(row, tableId); } + } + } + + private void reportStainTraceIfPresent(SeaTunnelRow row, String tableId) { + if (!StainTraceUtils.hasPayload(row)) { + return; + } + long nowMs = System.currentTimeMillis(); + StainTraceUtils.appendIfPresent( + row, + StainTraceStage.SINK_WRITE_DONE, + runningTask.getTaskID(), + nowMs, + getStainTraceMaxEntriesPerTrace(), + getStainTraceEntriesTruncatedTotal()); + byte[] payload = StainTraceUtils.getPayloadOrNull(row); + if (payload == null) { + return; + } + try { + long traceId = StainTracePayload.readTraceId(payload); + eventListener.onEvent( + new StainTraceEvent(traceId, payload, taskLocation.getTaskID(), tableId)); + stainTraceEventsReportedTotal.inc(); } catch (Exception e) { - throw new RuntimeException(e); + stainTraceInvalidPayloadTotal.inc(); + log.debug("Failed to 
report stain trace event", e); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchange.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchange.java new file mode 100644 index 000000000000..66f7577c161c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchange.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.task.flow; + +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.common.utils.function.ConsumerWithException; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.task.record.Barrier; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** Batches and aligns Zeta sink partition records before they reach the sink writer. */ +final class SinkPartitionExchange { + + static final int DEFAULT_BATCH_SIZE = 64; + + private final TaskLocation localTaskLocation; + private final List targetTaskLocations; + private final SinkPartitionRouter router; + private final SinkPartitionExchangeDispatcher dispatcher; + private final ConsumerWithException> localConsumer; + private final int batchSize; + private final Object lock = new Object(); + private final Map>> pendingRecords = new HashMap<>(); + private final Map nextSequences = new HashMap<>(); + private final Map> receivedBarrierSources = new HashMap<>(); + + SinkPartitionExchange( + TaskLocation localTaskLocation, + List targetTaskLocations, + SinkPartitionRouter router, + SinkPartitionExchangeDispatcher dispatcher, + ConsumerWithException> localConsumer) { + this( + localTaskLocation, + targetTaskLocations, + router, + dispatcher, + localConsumer, + DEFAULT_BATCH_SIZE); + } + + SinkPartitionExchange( + TaskLocation localTaskLocation, + List targetTaskLocations, + SinkPartitionRouter router, + SinkPartitionExchangeDispatcher dispatcher, + ConsumerWithException> localConsumer, + int batchSize) { + this.localTaskLocation = + Objects.requireNonNull(localTaskLocation, "localTaskLocation must not be null"); + 
this.targetTaskLocations = + Collections.unmodifiableList( + new ArrayList<>( + Objects.requireNonNull( + targetTaskLocations, + "targetTaskLocations must not be null"))); + this.router = Objects.requireNonNull(router, "router must not be null"); + this.dispatcher = Objects.requireNonNull(dispatcher, "dispatcher must not be null"); + this.localConsumer = + Objects.requireNonNull(localConsumer, "localConsumer must not be null"); + if (this.targetTaskLocations.isEmpty()) { + throw new IllegalArgumentException("targetTaskLocations must not be empty"); + } + if (!this.targetTaskLocations.contains(localTaskLocation)) { + throw new IllegalArgumentException( + "targetTaskLocations must contain the local sink writer task"); + } + if (batchSize <= 0) { + throw new IllegalArgumentException("batchSize must be greater than zero"); + } + this.batchSize = batchSize; + } + + void open() throws Exception { + dispatcher.open(); + } + + void received(Record record) throws Exception { + Objects.requireNonNull(record, "record must not be null"); + Object data = record.getData(); + if (data instanceof Barrier) { + dispatchAll(drainAllBatches()); + dispatchAll(createBroadcastEnvelopes(record)); + return; + } + if (data instanceof SchemaChangeEvent) { + dispatchAll(drainAllBatches()); + dispatchAll(createBroadcastEnvelopes(record)); + return; + } + + Optional target = router.route(record); + if (!target.isPresent()) { + dispatchAll(drainAllBatches()); + consumeLocally(record); + return; + } + + SinkPartitionExchangeEnvelope batch = addDataRecord(target.get(), record); + if (batch != null) { + dispatcher.dispatch(batch); + } + } + + void receive(SinkPartitionExchangeEnvelope envelope) throws Exception { + Objects.requireNonNull(envelope, "envelope must not be null"); + if (!localTaskLocation.equals(envelope.getTargetTaskLocation())) { + throw new IOException( + String.format( + "Sink partition exchange envelope target %s does not match local task %s", + envelope.getTargetTaskLocation(), 
localTaskLocation)); + } + synchronized (lock) { + for (Record record : envelope.getRecords()) { + Object data = record.getData(); + if (data instanceof Barrier) { + if (alignBarrier(envelope.getSourceTaskLocation(), (Barrier) data)) { + localConsumer.accept(record); + } + } else { + localConsumer.accept(record); + } + } + } + } + + void flush() throws Exception { + dispatchAll(drainAllBatches()); + } + + void close() throws Exception { + flush(); + } + + private SinkPartitionExchangeEnvelope addDataRecord(int targetIndex, Record record) { + synchronized (lock) { + TaskLocation targetTaskLocation = targetTaskLocations.get(targetIndex); + List> records = + pendingRecords.computeIfAbsent( + targetTaskLocation, ignored -> new ArrayList<>()); + records.add(record); + if (records.size() < batchSize) { + return null; + } + return drainBatch(targetTaskLocation); + } + } + + private List drainAllBatches() { + synchronized (lock) { + List envelopes = new ArrayList<>(); + for (TaskLocation targetTaskLocation : targetTaskLocations) { + SinkPartitionExchangeEnvelope envelope = drainBatch(targetTaskLocation); + if (envelope != null) { + envelopes.add(envelope); + } + } + return envelopes; + } + } + + private SinkPartitionExchangeEnvelope drainBatch(TaskLocation targetTaskLocation) { + List> records = pendingRecords.remove(targetTaskLocation); + if (records == null || records.isEmpty()) { + return null; + } + return createEnvelope(targetTaskLocation, records); + } + + private List createBroadcastEnvelopes(Record record) { + List envelopes = new ArrayList<>(); + for (TaskLocation targetTaskLocation : targetTaskLocations) { + envelopes.add(createEnvelope(targetTaskLocation, Collections.singletonList(record))); + } + return envelopes; + } + + private SinkPartitionExchangeEnvelope createEnvelope( + TaskLocation targetTaskLocation, List> records) { + return new SinkPartitionExchangeEnvelope( + localTaskLocation, targetTaskLocation, nextSequence(targetTaskLocation), records); + } + + 
private long nextSequence(TaskLocation targetTaskLocation) { + Long next = nextSequences.get(targetTaskLocation); + if (next == null) { + next = 0L; + } + nextSequences.put(targetTaskLocation, next + 1); + return next; + } + + private boolean alignBarrier(TaskLocation sourceTaskLocation, Barrier barrier) { + Set sources = + receivedBarrierSources.computeIfAbsent(barrier.getId(), ignored -> new HashSet<>()); + sources.add(sourceTaskLocation); + if (sources.containsAll(targetTaskLocations)) { + receivedBarrierSources.remove(barrier.getId()); + return true; + } + return false; + } + + private void consumeLocally(Record record) throws Exception { + synchronized (lock) { + localConsumer.accept(record); + } + } + + private void dispatchAll(List envelopes) throws Exception { + for (SinkPartitionExchangeEnvelope envelope : envelopes) { + dispatcher.dispatch(envelope); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeDispatcher.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeDispatcher.java new file mode 100644 index 000000000000..e358f7876efb --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeDispatcher.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.flow; + +/** Dispatches sink partition exchange envelopes to their target writer task. */ +interface SinkPartitionExchangeDispatcher { + + default void open() throws Exception {} + + void dispatch(SinkPartitionExchangeEnvelope envelope) throws Exception; +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeEnvelope.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeEnvelope.java new file mode 100644 index 000000000000..1659e4ffbc6b --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeEnvelope.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.flow; + +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.engine.server.execution.TaskLocation; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** Internal metadata envelope for Zeta sink partition exchange batches. */ +public final class SinkPartitionExchangeEnvelope implements Serializable { + + private final TaskLocation sourceTaskLocation; + private final TaskLocation targetTaskLocation; + private final long sequence; + private final List> records; + + public SinkPartitionExchangeEnvelope( + TaskLocation sourceTaskLocation, + TaskLocation targetTaskLocation, + long sequence, + List> records) { + this.sourceTaskLocation = + Objects.requireNonNull(sourceTaskLocation, "sourceTaskLocation must not be null"); + this.targetTaskLocation = + Objects.requireNonNull(targetTaskLocation, "targetTaskLocation must not be null"); + this.sequence = sequence; + this.records = + Collections.unmodifiableList( + new ArrayList<>( + Objects.requireNonNull(records, "records must not be null"))); + } + + public TaskLocation getSourceTaskLocation() { + return sourceTaskLocation; + } + + public TaskLocation getTargetTaskLocation() { + return targetTaskLocation; + } + + public long getSequence() { + return sequence; + } + + public List> getRecords() { + return records; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionRouter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionRouter.java new file mode 100644 index 000000000000..8b8957dedee5 --- /dev/null +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionRouter.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.flow; + +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; +import org.apache.seatunnel.api.sink.SinkPartitionStrategyValidator; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; + +/** Calculates deterministic sink-writer targets for sink-declared field routing. 
*/ +final class SinkPartitionRouter { + + private final int[] partitionFieldIndexes; + private final int parallelism; + + private SinkPartitionRouter(int[] partitionFieldIndexes, int parallelism) { + this.partitionFieldIndexes = partitionFieldIndexes; + this.parallelism = parallelism; + } + + static SinkPartitionRouter create( + Optional partitionStrategy, + SeaTunnelRowType rowType, + int parallelism, + String sinkIdentity) { + Objects.requireNonNull(partitionStrategy, "partitionStrategy must not be null"); + if (!partitionStrategy.isPresent() + || partitionStrategy.get().getMode() == SinkPartitionStrategy.Mode.NONE) { + return new SinkPartitionRouter(new int[0], parallelism); + } + if (parallelism <= 0) { + throw new IllegalArgumentException("sink parallelism must be greater than zero"); + } + + SinkPartitionStrategy strategy = partitionStrategy.get(); + SinkPartitionStrategyValidator.validate(strategy, rowType, sinkIdentity); + int[] fieldIndexes = + strategy.getPartitionFields().stream() + .mapToInt(field -> rowType.indexOf(field, false)) + .toArray(); + return new SinkPartitionRouter(fieldIndexes, parallelism); + } + + Optional route(Record record) { + Objects.requireNonNull(record, "record must not be null"); + if (!hasRouting() || !(record.getData() instanceof SeaTunnelRow)) { + return Optional.empty(); + } + return Optional.of(selectSubtask((SeaTunnelRow) record.getData())); + } + + int selectSubtask(SeaTunnelRow row) { + if (!hasRouting()) { + throw new IllegalStateException("sink partition routing is not enabled"); + } + int hash = 1; + for (int fieldIndex : partitionFieldIndexes) { + hash = 31 * hash + stableHash(row.getField(fieldIndex)); + } + return Math.floorMod(hash, parallelism); + } + + boolean hasRouting() { + return partitionFieldIndexes.length > 0; + } + + private static int stableHash(Object value) { + if (value == null) { + return 0; + } + if (!value.getClass().isArray()) { + return value.hashCode(); + } + if (value instanceof Object[]) { + 
return Arrays.deepHashCode((Object[]) value); + } + if (value instanceof byte[]) { + return Arrays.hashCode((byte[]) value); + } + if (value instanceof short[]) { + return Arrays.hashCode((short[]) value); + } + if (value instanceof int[]) { + return Arrays.hashCode((int[]) value); + } + if (value instanceof long[]) { + return Arrays.hashCode((long[]) value); + } + if (value instanceof char[]) { + return Arrays.hashCode((char[]) value); + } + if (value instanceof float[]) { + return Arrays.hashCode((float[]) value); + } + if (value instanceof double[]) { + return Arrays.hashCode((double[]) value); + } + if (value instanceof boolean[]) { + return Arrays.hashCode((boolean[]) value); + } + return value.hashCode(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPartitionExchangeOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPartitionExchangeOperation.java new file mode 100644 index 000000000000..be9d0feec091 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPartitionExchangeOperation.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.operation.sink; + +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.TaskExecutionService; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; +import org.apache.seatunnel.engine.server.task.SeaTunnelTask; +import org.apache.seatunnel.engine.server.task.flow.SinkPartitionExchangeEnvelope; +import org.apache.seatunnel.engine.server.task.operation.TaskOperation; +import org.apache.seatunnel.engine.server.task.record.Barrier; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import lombok.NoArgsConstructor; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** Delivers a batched sink partition exchange envelope to a target Zeta sink writer task. 
*/ +@NoArgsConstructor +public class SinkPartitionExchangeOperation extends TaskOperation { + + private static final byte ROW_RECORD = 0; + private static final byte BARRIER_RECORD = 1; + private static final byte SCHEMA_CHANGE_RECORD = 2; + + private SinkPartitionExchangeEnvelope envelope; + + public SinkPartitionExchangeOperation(SinkPartitionExchangeEnvelope envelope) { + super(envelope.getTargetTaskLocation()); + this.envelope = envelope; + } + + @Override + public String getServiceName() { + return SeaTunnelServer.SERVICE_NAME; + } + + @Override + public int getFactoryId() { + return TaskDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return TaskDataSerializerHook.SINK_PARTITION_EXCHANGE_OPERATION; + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeObject(envelope.getSourceTaskLocation()); + out.writeLong(envelope.getSequence()); + writeRecords(out, envelope.getRecords()); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + TaskLocation sourceTaskLocation = in.readObject(); + long sequence = in.readLong(); + List> records = readRecords(in); + envelope = + new SinkPartitionExchangeEnvelope( + sourceTaskLocation, taskLocation, sequence, records); + } + + @Override + public void runInternal() throws Exception { + TaskExecutionService taskExecutionService = + ((SeaTunnelServer) getService()).getTaskExecutionService(); + SeaTunnelTask task = taskExecutionService.getTask(taskLocation); + ClassLoader taskClassLoader = + taskExecutionService + .getExecutionContext(taskLocation.getTaskGroupLocation()) + .getClassLoader(task.getTaskID()); + ClassLoader mainClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(taskClassLoader); + task.receivedSinkPartitionExchange(envelope); + } finally { + 
Thread.currentThread().setContextClassLoader(mainClassLoader); + } + } + + private static void writeRecords(ObjectDataOutput out, List> records) + throws IOException { + out.writeInt(records.size()); + for (Record record : records) { + Object data = record.getData(); + if (data instanceof SeaTunnelRow) { + out.writeByte(ROW_RECORD); + writeRow(out, (SeaTunnelRow) data); + } else if (data instanceof Barrier) { + out.writeByte(BARRIER_RECORD); + out.writeObject(data); + } else if (data instanceof SchemaChangeEvent) { + out.writeByte(SCHEMA_CHANGE_RECORD); + out.writeObject(data); + } else { + throw new UnsupportedEncodingException( + "Unsupported sink partition exchange record class: " + data.getClass()); + } + } + } + + private static List> readRecords(ObjectDataInput in) throws IOException { + int size = in.readInt(); + List> records = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + byte recordType = in.readByte(); + if (recordType == ROW_RECORD) { + records.add(new Record<>(readRow(in))); + } else if (recordType == BARRIER_RECORD) { + records.add(new Record<>(in.readObject())); + } else if (recordType == SCHEMA_CHANGE_RECORD) { + records.add(new Record<>(in.readObject())); + } else { + throw new UnsupportedEncodingException( + "Unsupported sink partition exchange record type: " + recordType); + } + } + return records; + } + + private static void writeRow(ObjectDataOutput out, SeaTunnelRow row) throws IOException { + out.writeString(row.getTableId()); + out.writeByte(row.getRowKind().toByteValue()); + out.writeInt(row.getArity()); + for (Object field : row.getFields()) { + out.writeObject(field); + } + out.writeObject(row.getOptions()); + } + + private static SeaTunnelRow readRow(ObjectDataInput in) throws IOException { + String tableId = in.readString(); + byte rowKind = in.readByte(); + int arity = in.readInt(); + SeaTunnelRow row = new SeaTunnelRow(arity); + row.setTableId(tableId); + row.setRowKind(RowKind.fromByteValue(rowKind)); + for (int i = 0; 
i < arity; i++) { + row.setField(i, in.readObject()); + } + Map options = in.readObject(); + row.setOptions(options); + return row; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeTest.java new file mode 100644 index 000000000000..6801d3cf570c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.task.flow; + +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; +import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.task.record.Barrier; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class SinkPartitionExchangeTest { + + @Test + public void sameDocumentIdIsDeliveredToOneWriterAcrossSenders() throws Exception { + ExchangeHarness harness = new ExchangeHarness(4, 8); + + harness.exchange(0).received(new Record<>(row("document-a", "chunk-1"))); + harness.exchange(1).received(new Record<>(row("document-a", "chunk-2"))); + harness.exchange(2).received(new Record<>(row("document-a", "chunk-3"))); + harness.flushAll(); + + int writersWithDocument = 0; + int chunkCount = 0; + for (List> records : harness.writerRecords.values()) { + long recordsForDocument = + records.stream() + .filter(record -> record.getData() instanceof 
SeaTunnelRow) + .filter(record -> "document-a".equals(documentId(record))) + .count(); + if (recordsForDocument > 0) { + writersWithDocument++; + chunkCount += recordsForDocument; + } + } + + Assertions.assertEquals(1, writersWithDocument); + Assertions.assertEquals(3, chunkCount); + } + + @Test + public void differentDocumentIdsCanUseMultipleWriters() throws Exception { + ExchangeHarness harness = new ExchangeHarness(8, 4); + + for (int i = 0; i < 64; i++) { + harness.exchange(0).received(new Record<>(row("document-" + i, "chunk"))); + } + harness.flushAll(); + + long writersWithRows = + harness.writerRecords.values().stream() + .filter(records -> !records.isEmpty()) + .count(); + + Assertions.assertTrue( + writersWithRows > 1, "different document_id values should remain parallelizable"); + } + + @Test + public void barrierIsForwardedOnceAfterAllSendersArrive() throws Exception { + ExchangeHarness harness = new ExchangeHarness(3, 8); + Barrier barrier = new CheckpointBarrier(7L, 1000L, CheckpointType.CHECKPOINT_TYPE); + + harness.exchange(0).received(new Record<>(barrier)); + Assertions.assertEquals(0, harness.totalBarrierCount()); + + harness.exchange(1).received(new Record<>(barrier)); + Assertions.assertEquals(0, harness.totalBarrierCount()); + + harness.exchange(2).received(new Record<>(barrier)); + + for (List> records : harness.writerRecords.values()) { + Assertions.assertEquals(1, barrierCount(records)); + } + } + + @Test + public void nonRowControlsAreNotHashedToOtherWriters() throws Exception { + ExchangeHarness harness = new ExchangeHarness(4, 2); + + harness.exchange(2).received(new Record<>("local-control")); + + Assertions.assertEquals(1, harness.writerRecords.get(harness.location(2)).size()); + Assertions.assertEquals(1, harness.totalRecordCount()); + } + + @Test + public void schemaChangeIsBroadcastBeforeRemoteRoutedRows() throws Exception { + int parallelism = 4; + int sourceIndex = 0; + ExchangeHarness harness = new ExchangeHarness(parallelism, 
8); + String documentId = remoteDocumentId(sourceIndex, parallelism); + int targetIndex = targetIndex(documentId, parallelism); + SchemaChangeEvent schemaChange = schemaChange(); + + harness.exchange(sourceIndex).received(new Record<>(schemaChange)); + harness.exchange(sourceIndex).received(new Record<>(row(documentId, "chunk-after-schema"))); + harness.flushAll(); + + for (List> records : harness.writerRecords.values()) { + Assertions.assertEquals(1, schemaChangeCount(records)); + } + + List> targetRecords = harness.writerRecords.get(harness.location(targetIndex)); + Assertions.assertTrue(indexOfSchemaChange(targetRecords) >= 0); + Assertions.assertTrue( + indexOfDocument(targetRecords, documentId) > indexOfSchemaChange(targetRecords)); + } + + private static String documentId(Record record) { + return (String) ((SeaTunnelRow) record.getData()).getField(0); + } + + private static long barrierCount(List> records) { + return records.stream().filter(record -> record.getData() instanceof Barrier).count(); + } + + private static long schemaChangeCount(List> records) { + return records.stream() + .filter(record -> record.getData() instanceof SchemaChangeEvent) + .count(); + } + + private static int indexOfSchemaChange(List> records) { + for (int i = 0; i < records.size(); i++) { + if (records.get(i).getData() instanceof SchemaChangeEvent) { + return i; + } + } + return -1; + } + + private static int indexOfDocument(List> records, String documentId) { + for (int i = 0; i < records.size(); i++) { + if (records.get(i).getData() instanceof SeaTunnelRow + && documentId.equals(documentId(records.get(i)))) { + return i; + } + } + return -1; + } + + private static SeaTunnelRow row(String documentId, String chunkId) { + return new SeaTunnelRow(new Object[] {documentId, chunkId}); + } + + private static SchemaChangeEvent schemaChange() { + return AlterTableAddColumnEvent.add( + TableIdentifier.of("", TablePath.DEFAULT), + PhysicalColumn.of("new_col", BasicType.STRING_TYPE, (Long) 
null, true, null, null)); + } + + private static String remoteDocumentId(int sourceIndex, int parallelism) { + for (int i = 0; i < 100; i++) { + String documentId = "remote-document-" + i; + if (targetIndex(documentId, parallelism) != sourceIndex) { + return documentId; + } + } + throw new IllegalStateException("Unable to find a remote routing key"); + } + + private static int targetIndex(String documentId, int parallelism) { + return router(parallelism).route(new Record<>(row(documentId, "chunk"))).get(); + } + + private static SinkPartitionRouter router(int parallelism) { + return SinkPartitionRouter.create( + Optional.of( + SinkPartitionStrategy.hashByFields( + Collections.singletonList("document_id"))), + rowType(), + parallelism, + "DocumentSink"); + } + + private static SeaTunnelRowType rowType() { + return new SeaTunnelRowType( + new String[] {"document_id", "chunk_id"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE}); + } + + private static final class ExchangeHarness { + private final InMemoryDispatcher dispatcher = new InMemoryDispatcher(); + private final List locations = new ArrayList<>(); + private final Map exchanges = new HashMap<>(); + private final Map>> writerRecords = new HashMap<>(); + + private ExchangeHarness(int parallelism, int batchSize) { + for (int i = 0; i < parallelism; i++) { + locations.add(new TaskLocation(new TaskGroupLocation(1L, 1, i + 1), 1, i)); + } + for (TaskLocation location : locations) { + List> records = new ArrayList<>(); + writerRecords.put(location, records); + SinkPartitionExchange exchange = + new SinkPartitionExchange( + location, + locations, + router(parallelism), + dispatcher, + records::add, + batchSize); + exchanges.put(location, exchange); + dispatcher.register(location, exchange); + } + } + + private SinkPartitionExchange exchange(int index) { + return exchanges.get(location(index)); + } + + private TaskLocation location(int index) { + return locations.get(index); + } + + private void 
flushAll() throws Exception { + for (SinkPartitionExchange exchange : exchanges.values()) { + exchange.flush(); + } + } + + private long totalBarrierCount() { + return writerRecords.values().stream() + .mapToLong(SinkPartitionExchangeTest::barrierCount) + .sum(); + } + + private int totalRecordCount() { + return writerRecords.values().stream().mapToInt(List::size).sum(); + } + } + + private static final class InMemoryDispatcher implements SinkPartitionExchangeDispatcher { + private final Map exchanges = new HashMap<>(); + + private void register(TaskLocation location, SinkPartitionExchange exchange) { + exchanges.put(location, exchange); + } + + @Override + public void dispatch(SinkPartitionExchangeEnvelope envelope) throws Exception { + exchanges.get(envelope.getTargetTaskLocation()).receive(envelope); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionRouterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionRouterTest.java new file mode 100644 index 000000000000..6af30710e3f5 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionRouterTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.flow; + +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +public class SinkPartitionRouterTest { + + @Test + public void dataRowsWithSamePartitionFieldsRouteToSameSubtask() { + SinkPartitionRouter router = documentRouter(4); + + int first = router.selectSubtask(row("document-1", "chunk-1")); + int second = router.selectSubtask(row("document-1", "chunk-2")); + + Assertions.assertEquals(first, second); + } + + @Test + public void differentPartitionFieldValuesCanUseDifferentSubtasks() { + SinkPartitionRouter router = documentRouter(8); + Set selectedSubtasks = new HashSet<>(); + + for (int i = 0; i < 64; i++) { + selectedSubtasks.add(router.selectSubtask(row("document-" + i, "chunk"))); + } + + Assertions.assertTrue( + selectedSubtasks.size() > 1, + "different document_id values should remain parallelizable"); + } + + @Test + public void controlRecordsDoNotReceiveDataRoutingTarget() { + SinkPartitionRouter router = documentRouter(4); + + Optional target = router.route(new Record<>("schema-or-checkpoint-control")); 
+ + Assertions.assertFalse(target.isPresent()); + } + + @Test + public void defaultStrategyDoesNotRouteDataRows() { + SinkPartitionRouter router = + SinkPartitionRouter.create(Optional.empty(), rowType(), 4, "DefaultSink"); + + Optional target = router.route(new Record<>(row("document-1", "chunk-1"))); + + Assertions.assertFalse(target.isPresent()); + } + + private static SinkPartitionRouter documentRouter(int parallelism) { + return SinkPartitionRouter.create( + Optional.of( + SinkPartitionStrategy.hashByFields( + Collections.singletonList("document_id"))), + rowType(), + parallelism, + "DocumentSink"); + } + + private static SeaTunnelRow row(String documentId, String chunkId) { + return new SeaTunnelRow(new Object[] {documentId, chunkId}); + } + + private static SeaTunnelRowType rowType() { + return new SeaTunnelRowType( + new String[] {"document_id", "chunk_id"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE}); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/trace/StainTraceFlowTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/trace/StainTraceFlowTest.java index d57df3dea677..6d8ac784803e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/trace/StainTraceFlowTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/trace/StainTraceFlowTest.java @@ -374,6 +374,7 @@ private static SinkFlowLifeCycle createSin taskLocation, 0, runningTask, + null, new TaskLocation(new TaskGroupLocation(1L, 1, 2L), 2L, 0), false, new CompletableFuture<>(), diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/SinkPartitionStrategyGuard.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/SinkPartitionStrategyGuard.java new file mode 100644 index 
000000000000..c5c5b88d90a1 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/SinkPartitionStrategyGuard.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation; + +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; + +import java.util.Objects; +import java.util.Optional; + +/** Guards translations that do not yet implement sink-declared field routing. */ +public final class SinkPartitionStrategyGuard { + + private SinkPartitionStrategyGuard() {} + + public static void checkUnsupportedNonZetaSinkPartitionStrategy( + SeaTunnelSink sink, String engineName) { + Objects.requireNonNull(sink, "sink must not be null"); + Optional strategy = sink.getPartitionStrategy(); + if (!strategy.isPresent() || strategy.get().getMode() == SinkPartitionStrategy.Mode.NONE) { + return; + } + + throw new UnsupportedOperationException( + String.format( + "%s translation does not support sink field routing yet. " + + "Sink %s declares %s partition strategy with fields %s. 
" + + "Run this sink with Zeta until %s implements equivalent routing.", + engineName, + sink.getPluginName(), + strategy.get().getMode(), + strategy.get().getPartitionFields(), + engineName)); + } +} diff --git a/seatunnel-translation/seatunnel-translation-base/src/test/java/org/apache/seatunnel/translation/SinkPartitionStrategyGuardTest.java b/seatunnel-translation/seatunnel-translation-base/src/test/java/org/apache/seatunnel/translation/SinkPartitionStrategyGuardTest.java new file mode 100644 index 000000000000..3b0dbae5de50 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-base/src/test/java/org/apache/seatunnel/translation/SinkPartitionStrategyGuardTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation; + +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class SinkPartitionStrategyGuardTest { + + @Test + public void flinkRejectsUnsupportedHashByFieldsStrategy() { + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + SinkPartitionStrategyGuard + .checkUnsupportedNonZetaSinkPartitionStrategy( + hashByDocumentSink(), "Flink")); + + Assertions.assertTrue(exception.getMessage().contains("Flink")); + Assertions.assertTrue(exception.getMessage().contains("Zeta")); + Assertions.assertTrue(exception.getMessage().contains("HASH_BY_FIELDS")); + } + + @Test + public void sparkRejectsUnsupportedHashByFieldsStrategy() { + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + SinkPartitionStrategyGuard + .checkUnsupportedNonZetaSinkPartitionStrategy( + hashByDocumentSink(), "Spark")); + + Assertions.assertTrue(exception.getMessage().contains("Spark")); + Assertions.assertTrue(exception.getMessage().contains("Zeta")); + Assertions.assertTrue(exception.getMessage().contains("document_id")); + } + + @Test + public void defaultSinkStrategyRemainsAccepted() { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy( + defaultSink(), "Flink"); + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy( + defaultSink(), "Spark"); + } + + @Test + public void explicitNoneStrategyRemainsAccepted() { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy( + noneStrategySink(), "Flink"); + 
SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy( + noneStrategySink(), "Spark"); + } + + private static SeaTunnelSink hashByDocumentSink() { + return new TestSink( + Optional.of( + SinkPartitionStrategy.hashByFields( + Collections.singletonList("document_id")))); + } + + private static SeaTunnelSink defaultSink() { + return new TestSink(Optional.empty()); + } + + private static SeaTunnelSink noneStrategySink() { + return new TestSink(Optional.of(SinkPartitionStrategy.none())); + } + + private static class TestSink implements SeaTunnelSink { + private final Optional partitionStrategy; + + private TestSink(Optional partitionStrategy) { + this.partitionStrategy = partitionStrategy; + } + + @Override + public String getPluginName() { + return "TestSink"; + } + + @Override + public SinkWriter createWriter(SinkWriter.Context context) + throws IOException { + return null; + } + + @Override + public Optional getPartitionStrategy() { + return partitionStrategy; + } + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java index 4aa58059dbe3..98cc4b9c3b3b 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.translation.SinkPartitionStrategyGuard; import 
org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer; import org.apache.seatunnel.translation.flink.serialization.EmptyFlinkWriterStateSerializer; import org.apache.seatunnel.translation.flink.serialization.FlinkWriterStateSerializer; @@ -54,6 +55,8 @@ public FlinkSink( SeaTunnelSink seaTunnelSink, List catalogTables, int parallelism) { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy( + seaTunnelSink, "Flink"); this.seaTunnelSink = (SeaTunnelSink) seaTunnelSink; this.catalogTables = catalogTables; diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java index f961b67b9266..93b03b14042e 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.translation.SinkPartitionStrategyGuard; import org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer; import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer; import org.apache.seatunnel.translation.flink.serialization.FlinkWriterStateSerializer; @@ -69,6 +70,7 @@ public FlinkSink( SeaTunnelSink sink, List catalogTables, int parallelism) { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy(sink, "Flink"); this.sink = sink; this.catalogTables = 
catalogTables; this.parallelism = parallelism; diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkPartitionStrategyGuardTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkPartitionStrategyGuardTest.java new file mode 100644 index 000000000000..c4e900ea36d1 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkPartitionStrategyGuardTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation.flink.sink; + +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class FlinkSinkPartitionStrategyGuardTest { /* Constructs FlinkSink with a sink that declares a hashByFields partition strategy and with one that declares none. */ + + @Test + public void rejectsHashByFieldsSinkPartitionStrategy() { + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> new FlinkSink<>(hashByDocumentSink(), Collections.emptyList(), 2)); /* the exception is expected from the constructor itself */ + } + + @Test + public void acceptsDefaultSinkPartitionStrategy() { + new FlinkSink<>(defaultSink(), Collections.emptyList(), 2); /* no explicit assertion: the test passes when construction does not throw */ + } + + private static SeaTunnelSink hashByDocumentSink() { + return new TestSink( + Optional.of( + SinkPartitionStrategy.hashByFields( + Collections.singletonList("document_id")))); + } + + private static SeaTunnelSink defaultSink() { + return new TestSink(Optional.empty()); /* empty Optional stands for a sink without a declared strategy */ + } + + private static class TestSink implements SeaTunnelSink { /* minimal sink stub that returns the strategy handed to its constructor */ + private final Optional partitionStrategy; + + private TestSink(Optional partitionStrategy) { + this.partitionStrategy = partitionStrategy; + } + + @Override + public String getPluginName() { + return "TestSink"; + } + + @Override + public SinkWriter createWriter(SinkWriter.Context context) + throws IOException { + return null; /* placeholder: the tests in this class never reference a writer */ + } + + @Override + public Optional getPartitionStrategy() { + return partitionStrategy; + } + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java index ff889f21d443..0121471a74e8 
100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.utils.SerializationUtils; +import org.apache.seatunnel.translation.SinkPartitionStrategyGuard; import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Row; @@ -42,6 +43,7 @@ public static DataStreamWriter inject( CatalogTable[] catalogTables, String applicationId, int parallelism) { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy(sink, "Spark"); return dataset.format(SPARK_SINK_CLASS_NAME) .outputMode(OutputMode.Append()) .option(Constants.SINK_SERIALIZATION, SerializationUtils.objectToString(sink)) @@ -56,6 +58,7 @@ public static DataFrameWriter inject( CatalogTable[] catalogTables, String applicationId, int parallelism) { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy(sink, "Spark"); return dataset.format(SPARK_SINK_CLASS_NAME) .option(Constants.SINK_SERIALIZATION, SerializationUtils.objectToString(sink)) .option(SINK_CATALOG_TABLE, SerializationUtils.objectToString(catalogTables)) diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java index 531d0d9511c9..9cbc8b67d7f7 100644 --- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.utils.SerializationUtils; +import org.apache.seatunnel.translation.SinkPartitionStrategyGuard; import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Row; @@ -43,6 +44,7 @@ public static DataStreamWriter inject( CatalogTable[] catalogTables, String applicationId, int parallelism) { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy(sink, "Spark"); return dataset.format(SINK_NAME) .outputMode(OutputMode.Append()) .option(Constants.SINK_SERIALIZATION, SerializationUtils.objectToString(sink)) @@ -58,6 +60,7 @@ public static DataFrameWriter inject( CatalogTable[] catalogTables, String applicationId, int parallelism) { + SinkPartitionStrategyGuard.checkUnsupportedNonZetaSinkPartitionStrategy(sink, "Spark"); return dataset.format(SINK_NAME) .option(Constants.SINK_SERIALIZATION, SerializationUtils.objectToString(sink)) // TODO this should require fetching the catalog table in sink diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkPartitionStrategyGuardTest.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkPartitionStrategyGuardTest.java new file mode 100644 index 000000000000..e552533d6b4b --- /dev/null +++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkPartitionStrategyGuardTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation.spark.sink; + +import org.apache.seatunnel.api.sink.SinkPartitionStrategy; + +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Row; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Optional; + +public class SparkSinkPartitionStrategyGuardTest { /* Checks that SparkSinkInjector.inject rejects a sink that declares a hashByFields partition strategy. */ + + @Test + public void sparkInjectorRejectsUnsupportedHashByFieldsStrategy() { + SeaTunnelSinkWithBuffer sink = + new SeaTunnelSinkWithBuffer() { /* anonymous subclass that only overrides the declared partition strategy */ + @Override + public Optional getPartitionStrategy() { + return Optional.of( + SinkPartitionStrategy.hashByFields( + Collections.singletonList("document_id"))); + } + }; + + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + SparkSinkInjector.inject( + (DataFrameWriter) null, sink, null, "app", 1)); /* dataset and catalog tables are null: the exception is expected before either is used */ + + Assertions.assertTrue(exception.getMessage().contains("Spark")); /* message is expected to name the rejecting engine */ + Assertions.assertTrue(exception.getMessage().contains("Zeta")); /* and to mention Zeta */ + Assertions.assertTrue(exception.getMessage().contains("document_id")); /* and to include the declared routing field */ + } +} diff --git a/tools/seatunnel-python-sdk/seatunnel/endpoints/config.py b/tools/seatunnel-python-sdk/seatunnel/endpoints/config.py index 9b23812d98ac..bfecddfc3c54 100644 --- a/tools/seatunnel-python-sdk/seatunnel/endpoints/config.py +++ b/tools/seatunnel-python-sdk/seatunnel/endpoints/config.py @@ -26,7 +26,7 @@ def encrypt_config(self, conf: str): Encrypt Config """ return self.client.request( - HttpMethod.POST, + HttpMethod.POST, "/encrypt-config", content=conf - ) \ No newline at end of file + )