25 changes: 25 additions & 0 deletions docs/en/developer/sink-connector-development.md
@@ -141,6 +141,31 @@ Use an aggregated committer when:

This model is especially important for table-oriented sinks and strong consistency use cases.

## Sink Partition Strategy

A sink can declare an engine-side routing requirement by overriding
`SeaTunnelSink#getPartitionStrategy()`. Keep the default empty strategy unless writer correctness
depends on all rows for the same logical key reaching the same sink writer. Typical cases are
RAG/vector sinks that handle document lifecycle, upserts, or stale chunk cleanup per `document_id`.

```java
@Override
public Optional<SinkPartitionStrategy> getPartitionStrategy() {
    return Optional.of(
            SinkPartitionStrategy.hashByFields(Collections.singletonList("document_id")));
}
```

Guidelines:

- Use physical input field names from the sink's consumed `SeaTunnelRowType`.
- Keep the routing key small and stable, for example `document_id`.
- Route only ordinary data records; checkpoint barriers, schema-change events, and other control
records must keep their original control-flow semantics.
- `Optional.empty()` and `SinkPartitionStrategy.none()` keep the existing routing behavior.
- Engine support currently targets Zeta. Flink and Spark translations fail fast when a non-empty
  sink partition strategy is declared, so users do not assume the strategy is silently honored.
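
To make the routing requirement concrete, here is a minimal, self-contained sketch of hash-by-fields routing. It is not the Zeta engine's implementation, which this change does not show; the class `HashRoutingSketch`, the method `writerIndex`, and the use of `List.hashCode()` are assumptions chosen only to illustrate that equal key values always land on the same writer.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical illustration only; not the engine's actual routing code. */
final class HashRoutingSketch {

    private HashRoutingSketch() {}

    /**
     * Maps the values of the routing fields (for example the row's document_id) to a writer
     * index in the range [0, writerCount).
     */
    static int writerIndex(List<?> keyValues, int writerCount) {
        if (writerCount <= 0) {
            throw new IllegalArgumentException("writerCount must be positive");
        }
        // The List contract defines hashCode() from the element hash codes, so two lists
        // with equal elements hash equally. A real engine would also need element hashes
        // that are stable across processes, which Object's default hashCode() is not.
        int hash = Objects.hashCode(keyValues);
        // floorMod keeps the index non-negative even when the hash is negative.
        return Math.floorMod(hash, writerCount);
    }
}
```

Under these assumptions, every row whose `document_id` is `"doc-1"` resolves to the same index for a fixed writer count, which is the property a document-level upsert or cleanup relies on. It also shows why a small, stable key matters: if the key value changes between rows of the same document, those rows can land on different writers.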

## CDC-Aware Sink Design

If the sink accepts CDC input, define the mapping very clearly:
24 changes: 24 additions & 0 deletions docs/zh/developer/sink-connector-development.md
@@ -139,6 +139,30 @@ connector-<name>/

This model is especially important for table-oriented sinks and strong consistency use cases.

## Sink Partition Strategy

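If the correctness of the sink writer depends on all rows for the same logical key reaching the
same writer, override `SeaTunnelSink#getPartitionStrategy()` to declare an engine-side routing
requirement. A common case is a RAG/vector sink that performs document-level lifecycle handling,
upsert, delete, or stale chunk cleanup by `document_id`.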

```java
@Override
public Optional<SinkPartitionStrategy> getPartitionStrategy() {
    return Optional.of(
            SinkPartitionStrategy.hashByFields(Collections.singletonList("document_id")));
}
```

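Recommendations:

- Use the physical field names from the sink's input `SeaTunnelRowType`.
- Keep the routing key as small and stable as possible, for example `document_id`.
- Routing should affect only ordinary data records; checkpoint barriers, schema change events, and
  other control messages must keep their original control-flow semantics.
- `Optional.empty()` and `SinkPartitionStrategy.none()` both keep the existing routing behavior.
- Current support targets Zeta. Flink and Spark translation fails fast on a non-empty sink
  partition strategy, so users do not mistakenly assume the strategy has silently taken effect.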
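
The last two points can be sketched as a small decision helper. This is a hedged illustration, not code from this change: the `Mode` enum is a stand-in for `SinkPartitionStrategy.Mode`, while the `Engine` enum, both method names, and the exception type are hypothetical. The sketch also assumes that a declared `NONE` strategy is treated like `Optional.empty()` on every engine, which the text implies but does not state outright.

```java
import java.util.Optional;

/** Hypothetical illustration of the empty/none equivalence and the fail-fast rule. */
final class PartitionStrategyHandlingSketch {

    /** Stand-in for SinkPartitionStrategy.Mode. */
    enum Mode {
        NONE,
        HASH_BY_FIELDS
    }

    /** Hypothetical engine labels used only in this sketch. */
    enum Engine {
        ZETA,
        FLINK,
        SPARK
    }

    private PartitionStrategyHandlingSketch() {}

    /** Returns true only when the sink asks for routing that differs from the default. */
    static boolean requiresRouting(Optional<Mode> declared) {
        return declared.isPresent() && declared.get() != Mode.NONE;
    }

    /** Fails fast instead of silently ignoring a strategy the engine cannot honor. */
    static void checkSupported(Engine engine, Optional<Mode> declared) {
        if (requiresRouting(declared) && engine != Engine.ZETA) {
            throw new UnsupportedOperationException(
                    "Sink partition strategy "
                            + declared.get()
                            + " is not supported on engine "
                            + engine);
        }
    }
}
```

Failing at translation time, rather than dropping the strategy, surfaces the mismatch before any rows are written, so a job that depends on key co-location never runs with arbitrary routing.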

## CDC-Aware Sink Design

If the sink accepts CDC input, the mapping rules must be spelled out clearly:
@@ -145,4 +145,15 @@ default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSeria
    default Optional<CatalogTable> getWriteCatalogTable() {
        return Optional.empty();
    }

    /**
     * Get the partition strategy required by this sink.
     *
     * <p>The default empty strategy preserves existing sink behavior.
     *
     * @return Optional sink partition strategy.
     */
    default Optional<SinkPartitionStrategy> getPartitionStrategy() {
        return Optional.empty();
    }
}
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.api.sink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Describes how sink input rows should be partitioned before reaching sink writers. */
public final class SinkPartitionStrategy implements Serializable {

    private static final long serialVersionUID = 1L;

    private final Mode mode;
    private final List<String> partitionFields;

    private SinkPartitionStrategy(Mode mode, List<String> partitionFields) {
        this.mode = Objects.requireNonNull(mode, "mode must not be null");
        this.partitionFields =
                Collections.unmodifiableList(
                        new ArrayList<>(
                                Objects.requireNonNull(
                                        partitionFields, "partitionFields must not be null")));
    }

    /**
     * Creates a strategy that keeps the existing sink routing behavior.
     *
     * @return No-op sink partition strategy.
     */
    public static SinkPartitionStrategy none() {
        return new SinkPartitionStrategy(Mode.NONE, Collections.emptyList());
    }

    /**
     * Creates a strategy that routes rows by hashing the declared physical row fields.
     *
     * @param partitionFields Physical row field names used to calculate the routing hash.
     * @return Hash-by-fields sink partition strategy.
     */
    public static SinkPartitionStrategy hashByFields(List<String> partitionFields) {
        return new SinkPartitionStrategy(Mode.HASH_BY_FIELDS, partitionFields);
    }

    public Mode getMode() {
        return mode;
    }

    public List<String> getPartitionFields() {
        return partitionFields;
    }

    public enum Mode {
        NONE,
        HASH_BY_FIELDS
    }
}
@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Validates sink partition strategies against the sink input row type. */
public final class SinkPartitionStrategyValidator {

    private SinkPartitionStrategyValidator() {}

    /**
     * Validates that a sink partition strategy can be applied to the sink input row type.
     *
     * @param partitionStrategy Optional sink partition strategy declared by a sink.
     * @param rowType Sink input row type.
     * @param sinkIdentity Human-readable sink identity used in validation errors.
     */
    public static void validate(
            Optional<SinkPartitionStrategy> partitionStrategy,
            SeaTunnelRowType rowType,
            String sinkIdentity) {
        Objects.requireNonNull(partitionStrategy, "partitionStrategy must not be null");
        if (!partitionStrategy.isPresent()) {
            return;
        }
        validate(partitionStrategy.get(), rowType, sinkIdentity);
    }

    /**
     * Validates that a sink partition strategy can be applied to the sink input row type.
     *
     * @param partitionStrategy Sink partition strategy declared by a sink.
     * @param rowType Sink input row type.
     * @param sinkIdentity Human-readable sink identity used in validation errors.
     */
    public static void validate(
            SinkPartitionStrategy partitionStrategy,
            SeaTunnelRowType rowType,
            String sinkIdentity) {
        Objects.requireNonNull(partitionStrategy, "partitionStrategy must not be null");
        Objects.requireNonNull(rowType, "rowType must not be null");

        switch (partitionStrategy.getMode()) {
            case NONE:
                return;
            case HASH_BY_FIELDS:
                validateHashByFields(
                        partitionStrategy, rowType, normalizeSinkIdentity(sinkIdentity));
                return;
            default:
                throw new IllegalArgumentException(
                        String.format(
                                "Sink %s declares unsupported partition strategy mode: %s",
                                normalizeSinkIdentity(sinkIdentity), partitionStrategy.getMode()));
        }
    }

    private static void validateHashByFields(
            SinkPartitionStrategy partitionStrategy,
            SeaTunnelRowType rowType,
            String sinkIdentity) {
        List<String> partitionFields = partitionStrategy.getPartitionFields();
        if (partitionFields.isEmpty()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Sink %s declares HASH_BY_FIELDS partition strategy, but partition fields are empty.",
                            sinkIdentity));
        }

        List<String> missingFields = new ArrayList<>();
        for (String partitionField : partitionFields) {
            if (rowType.indexOf(partitionField, false) < 0) {
                missingFields.add(partitionField);
            }
        }

        if (!missingFields.isEmpty()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Sink %s declares HASH_BY_FIELDS partition fields %s, but input row type is missing fields %s. Available fields: %s",
                            sinkIdentity,
                            partitionFields,
                            missingFields,
                            Arrays.toString(rowType.getFieldNames())));
        }
    }

    private static String normalizeSinkIdentity(String sinkIdentity) {
        if (sinkIdentity == null || sinkIdentity.trim().isEmpty()) {
            return "unknown sink";
        }
        return sinkIdentity;
    }
}
@@ -0,0 +1,104 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class SinkPartitionStrategyTest {

    @Test
    public void defaultSinkPartitionStrategyIsEmpty() {
        Assertions.assertFalse(new DefaultRoutingSink().getPartitionStrategy().isPresent());
    }

    @Test
    public void noneStrategyKeepsEmptyRoutingFields() {
        SinkPartitionStrategy strategy = SinkPartitionStrategy.none();

        Assertions.assertEquals(SinkPartitionStrategy.Mode.NONE, strategy.getMode());
        Assertions.assertTrue(strategy.getPartitionFields().isEmpty());
        Assertions.assertThrows(
                UnsupportedOperationException.class,
                () -> strategy.getPartitionFields().add("document_id"));
    }

    @Test
    public void hashByFieldsStrategyKeepsImmutableRoutingFields() {
        List<String> fields = new ArrayList<>();
        fields.add("document_id");

        SinkPartitionStrategy strategy = SinkPartitionStrategy.hashByFields(fields);
        fields.add("chunk_id");

        Assertions.assertEquals(SinkPartitionStrategy.Mode.HASH_BY_FIELDS, strategy.getMode());
        Assertions.assertEquals(
                Collections.singletonList("document_id"), strategy.getPartitionFields());
        Assertions.assertThrows(
                UnsupportedOperationException.class,
                () -> strategy.getPartitionFields().add("chunk_id"));
    }

    @Test
    public void sinkCanDeclareHashByFieldStrategy() {
        Optional<SinkPartitionStrategy> strategy = new DocumentRoutingSink().getPartitionStrategy();

        Assertions.assertTrue(strategy.isPresent());
        Assertions.assertEquals(
                SinkPartitionStrategy.Mode.HASH_BY_FIELDS, strategy.get().getMode());
        Assertions.assertEquals(
                Collections.singletonList("document_id"), strategy.get().getPartitionFields());
    }

    private static class DefaultRoutingSink
            implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void> {

        @Override
        public SinkWriter<SeaTunnelRow, Void, Void> createWriter(SinkWriter.Context context)
                throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public String getPluginName() {
            return "DefaultRoutingSink";
        }
    }

    private static class DocumentRoutingSink extends DefaultRoutingSink {

        @Override
        public Optional<SinkPartitionStrategy> getPartitionStrategy() {
            return Optional.of(
                    SinkPartitionStrategy.hashByFields(Collections.singletonList("document_id")));
        }

        @Override
        public String getPluginName() {
            return "DocumentRoutingSink";
        }
    }
}