Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en/connectors/sink/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
| paimon.table.non-primary-key | Boolean | false | - | Switch to create `table with PK` or `table without PK`. true : `table without PK`, false : `table with PK` |
| branch | String | No | main | The branch name of Paimon table to write data to. If the branch does not exist, an exception will be thrown. |
| branch | String | No | main | The branch name of Paimon table to write data to. For non-main branches, the main table and target branch must already exist, and `schema_save_mode=RECREATE_SCHEMA` or `data_save_mode=DROP_DATA` is not supported. |


## Checkpoint in batch mode
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/connectors/sink/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 |
| paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 |
| paimon.table.non-primary-key | Boolean | false | - | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表 |
| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。 |
| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。非 main 分支要求 main 表和目标分支已存在,且不支持 `schema_save_mode=RECREATE_SCHEMA` 或 `data_save_mode=DROP_DATA`。 |

## 批模式下的checkpoint

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode {
NON_PRIMARY_KEY_CHECK_ERROR(
"PAIMON-10", "Primary keys should be empty when nonPrimaryKey is true"),
DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. "),
BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. ");
BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. "),
UNSUPPORTED_BRANCH_SAVE_MODE(
"PAIMON-13", "The save mode is not supported for non-main branches. ");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable;

import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.BranchManager;

public class PaimonSaveModeHandler extends DefaultSaveModeHandler {

Expand All @@ -55,15 +58,67 @@ public PaimonSaveModeHandler(

@Override
public void handleSchemaSaveMode() {
checkBranchSaveMode();
super.handleSchemaSaveMode();
TablePath tablePath = catalogTable.getTablePath();
Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath);
Table loadTable = this.supportLoadTable.getLoadTable();
if (loadTable == null || this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) {
if (StringUtils.isNotEmpty(branch)) {
if (isNonMainBranch()) {
paimonTable = ((FileStoreTable) paimonTable).switchToBranch(branch);
}
this.supportLoadTable.setLoadTable(paimonTable);
}
}

@Override
public void handleDataSaveMode() {
checkBranchSaveMode();
super.handleDataSaveMode();
}

@Override
public void handleSchemaSaveModeWithRestore() {
checkBranchSaveMode();
super.handleSchemaSaveModeWithRestore();
}

private boolean isNonMainBranch() {
return StringUtils.isNotEmpty(branch)
&& !BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch);
}

private void checkBranchSaveMode() {
if (!isNonMainBranch()) {
return;
}
if (!catalog.tableExists(tablePath)) {
throw unsupportedBranchSaveMode(
"The main table must exist before writing to a non-main branch.");
}
Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath);
if (!((FileStoreTable) paimonTable).branchManager().branchExists(branch)) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.BRANCH_NOT_EXISTS,
String.format(
"Specified branch '%s' of table '%s' does not exist.",
branch, tablePath));
}
if (this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) {
throw unsupportedBranchSaveMode(
"schema_save_mode=RECREATE_SCHEMA would drop and recreate the main table.");
}
if (this.dataSaveMode == DataSaveMode.DROP_DATA) {
throw unsupportedBranchSaveMode(
"data_save_mode=DROP_DATA would truncate the main table.");
}
}

private PaimonConnectorException unsupportedBranchSaveMode(String reason) {
return new PaimonConnectorException(
PaimonConnectorErrorCode.UNSUPPORTED_BRANCH_SAVE_MODE,
String.format(
"Paimon branch '%s' does not support this save mode for table '%s'. %s",
branch, tablePath, reason));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
sourceTableSchema,
paimonCatalog,
sinkPaimonTableSchema,
paimonTablePath)
paimonTablePath,
paimonSinkConfig.getBranch())
.apply(event);
reOpenTableWrite();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -61,15 +62,19 @@ public class AlterPaimonTableSchemaEventHandler {

private final TablePath paimonTablePath;

private final String branch;

public AlterPaimonTableSchemaEventHandler(
TableSchema sourceTableSchema,
PaimonCatalog paimonCatalog,
org.apache.paimon.schema.TableSchema sinkPaimonTableSchema,
TablePath paimonTablePath) {
TablePath paimonTablePath,
String branch) {
this.sourceTableSchema = sourceTableSchema;
this.paimonCatalog = paimonCatalog;
this.sinkPaimonTableSchema = sinkPaimonTableSchema;
this.paimonTablePath = paimonTablePath;
this.branch = branch;
}

public TableSchema apply(SchemaChangeEvent event) {
Expand All @@ -87,9 +92,7 @@ public TableSchema apply(SchemaChangeEvent event) {
}

private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
Identifier identifier =
Identifier.create(
paimonTablePath.getDatabaseName(), paimonTablePath.getTableName());
Identifier identifier = toIdentifier();
if (event instanceof AlterTableAddColumnEvent) {
AlterTableAddColumnEvent alterTableAddColumnEvent = (AlterTableAddColumnEvent) event;
Column column = alterTableAddColumnEvent.getColumn();
Expand Down Expand Up @@ -130,6 +133,15 @@ private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
}
}

private Identifier toIdentifier() {
if (StringUtils.isNotEmpty(branch)
&& !BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch)) {
return new Identifier(
paimonTablePath.getDatabaseName(), paimonTablePath.getTableName(), branch);
}
return Identifier.create(paimonTablePath.getDatabaseName(), paimonTablePath.getTableName());
}

private void updateColumn(
Column newColumn, String oldColumnName, Identifier identifier, String afterTheColumn) {
BasicTypeDefine<DataType> reconvertColumn = PaimonTypeMapper.INSTANCE.reconvert(newColumn);
Expand Down
Loading
Loading