diff --git a/docs/en/connectors/sink/Paimon.md b/docs/en/connectors/sink/Paimon.md index 060bbcff9f0d..f915f6829029 100644 --- a/docs/en/connectors/sink/Paimon.md +++ b/docs/en/connectors/sink/Paimon.md @@ -78,7 +78,7 @@ libfb303-xxx.jar | paimon.hadoop.conf | Map | No | - | Properties in hadoop conf | | paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files | | paimon.table.non-primary-key | Boolean | false | - | Switch to create `table with PK` or `table without PK`. true : `table without PK`, false : `table with PK` | -| branch | String | No | main | The branch name of Paimon table to write data to. If the branch does not exist, an exception will be thrown. | +| branch | String | No | main | The branch name of Paimon table to write data to. For non-main branches, the main table and target branch must already exist, and `schema_save_mode=RECREATE_SCHEMA` or `data_save_mode=DROP_DATA` is not supported. | ## Checkpoint in batch mode diff --git a/docs/zh/connectors/sink/Paimon.md b/docs/zh/connectors/sink/Paimon.md index 8c3799a80c5f..3aaa9e18d27f 100644 --- a/docs/zh/connectors/sink/Paimon.md +++ b/docs/zh/connectors/sink/Paimon.md @@ -77,7 +77,7 @@ libfb303-xxx.jar | paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 | | paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 | | paimon.table.non-primary-key | Boolean | false | - | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表 | -| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。 | +| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。非 main 分支要求 main 表和目标分支已存在,且不支持 `schema_save_mode=RECREATE_SCHEMA` 或 `data_save_mode=DROP_DATA`。 | ## 批模式下的checkpoint diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java index f369b8cf6621..6acd6e70fd9d 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java @@ -33,7 +33,9 @@ public enum PaimonConnectorErrorCode implements SeaTunnelErrorCode { NON_PRIMARY_KEY_CHECK_ERROR( "PAIMON-10", "Primary keys should be empty when nonPrimaryKey is true"), DECIMAL_PRECISION_INCOMPATIBLE("PAIMON-11", "decimal type precision is incompatible. "), - BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. "); + BRANCH_NOT_EXISTS("PAIMON-12", "Specified branch: %s does not exist. "), + UNSUPPORTED_BRANCH_SAVE_MODE( + "PAIMON-13", "The save mode is not supported for non-main branches. "); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java index a89edc02c59d..1d9311904199 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java @@ -26,10 +26,13 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; +import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.BranchManager; public class PaimonSaveModeHandler extends DefaultSaveModeHandler { @@ -55,15 +58,67 @@ public PaimonSaveModeHandler( @Override public void handleSchemaSaveMode() { + checkBranchSaveMode(); super.handleSchemaSaveMode(); TablePath tablePath = catalogTable.getTablePath(); Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath); Table loadTable = this.supportLoadTable.getLoadTable(); if (loadTable == null || this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) { - if (StringUtils.isNotEmpty(branch)) { + if (isNonMainBranch()) { paimonTable = ((FileStoreTable) paimonTable).switchToBranch(branch); } this.supportLoadTable.setLoadTable(paimonTable); } } + + @Override + public void handleDataSaveMode() { + checkBranchSaveMode(); + super.handleDataSaveMode(); + } + + @Override + public void handleSchemaSaveModeWithRestore() { + checkBranchSaveMode(); + super.handleSchemaSaveModeWithRestore(); + } + + private boolean isNonMainBranch() { + return StringUtils.isNotEmpty(branch) + && !BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch); + } + + private void checkBranchSaveMode() { + if (!isNonMainBranch()) { + return; + } + if (!catalog.tableExists(tablePath)) { + throw unsupportedBranchSaveMode( + "The main table must exist before writing to a non-main branch."); + } + Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath); + if (!((FileStoreTable) paimonTable).branchManager().branchExists(branch)) { + throw new PaimonConnectorException( + PaimonConnectorErrorCode.BRANCH_NOT_EXISTS, + String.format( + "Specified branch '%s' of table '%s' does not exist.", + branch, tablePath)); + } + if (this.schemaSaveMode == SchemaSaveMode.RECREATE_SCHEMA) { + throw unsupportedBranchSaveMode( + "schema_save_mode=RECREATE_SCHEMA would drop and recreate the main table."); + } + if (this.dataSaveMode == DataSaveMode.DROP_DATA) { + throw unsupportedBranchSaveMode( + "data_save_mode=DROP_DATA would truncate the main table."); + } + } + + private PaimonConnectorException unsupportedBranchSaveMode(String reason) { + return new PaimonConnectorException( + PaimonConnectorErrorCode.UNSUPPORTED_BRANCH_SAVE_MODE, + String.format( + "Paimon branch '%s' does not support this save mode for table '%s'. %s", + branch, tablePath, reason)); + } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java index 2d8fad38c0fd..076f7796ad88 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java @@ -268,7 +268,8 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException { sourceTableSchema, paimonCatalog, sinkPaimonTableSchema, - paimonTablePath) + paimonTablePath, + paimonSinkConfig.getBranch()) .apply(event); reOpenTableWrite(); } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java index d37ab217585a..e0faa687b29d 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java @@ -38,6 +38,7 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Preconditions; import lombok.extern.slf4j.Slf4j; @@ -61,15 +62,19 @@ public class AlterPaimonTableSchemaEventHandler { private final TablePath paimonTablePath; + private final String branch; + public AlterPaimonTableSchemaEventHandler( TableSchema sourceTableSchema, PaimonCatalog paimonCatalog, org.apache.paimon.schema.TableSchema sinkPaimonTableSchema, - TablePath paimonTablePath) { + TablePath paimonTablePath, + String branch) { this.sourceTableSchema = sourceTableSchema; this.paimonCatalog = paimonCatalog; this.sinkPaimonTableSchema = sinkPaimonTableSchema; this.paimonTablePath = paimonTablePath; + this.branch = branch; } public TableSchema apply(SchemaChangeEvent event) { @@ -87,9 +92,7 @@ public TableSchema apply(SchemaChangeEvent event) { } private void applySingleSchemaChangeEvent(SchemaChangeEvent event) { - Identifier identifier = - Identifier.create( - paimonTablePath.getDatabaseName(), paimonTablePath.getTableName()); + Identifier identifier = toIdentifier(); if (event instanceof AlterTableAddColumnEvent) { AlterTableAddColumnEvent alterTableAddColumnEvent = (AlterTableAddColumnEvent) event; Column column = alterTableAddColumnEvent.getColumn(); @@ -130,6 +133,15 @@ private void applySingleSchemaChangeEvent(SchemaChangeEvent event) { } } + private Identifier toIdentifier() { + if (StringUtils.isNotEmpty(branch) + && !BranchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branch)) { + return new Identifier( + paimonTablePath.getDatabaseName(), paimonTablePath.getTableName(), branch); + } + return Identifier.create(paimonTablePath.getDatabaseName(), paimonTablePath.getTableName()); + } + private void updateColumn( Column newColumn, String oldColumnName, Identifier identifier, String afterTheColumn) { BasicTypeDefine reconvertColumn = PaimonTypeMapper.INSTANCE.reconvert(newColumn); diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java new file mode 100644 index 000000000000..87540d7b8c75 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonBranchSaveModeHandlerTest.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.handler; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SchemaSaveMode; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; +import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable; +import org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory; + +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class PaimonBranchSaveModeHandlerTest { + + private static final String CATALOG_NAME = "paimon_catalog"; + private static final String DATABASE_NAME = "default"; + private static final String TABLE_NAME = "branch_table"; + private static final String BRANCH_NAME = "test_branch"; + + @TempDir private Path temporaryFolder; + + private PaimonCatalog paimonCatalog; + private ReadonlyConfig readonlyConfig; + private TablePath tablePath; + private CatalogTable catalogTable; + + @BeforeEach + public void before() throws Exception { + Map properties = new HashMap<>(); + properties.put("warehouse", temporaryFolder.toString()); + properties.put("plugin_name", "Paimon"); + properties.put("database", DATABASE_NAME); + properties.put("table", TABLE_NAME); + properties.put("branch", BRANCH_NAME); + properties.put("paimon.table.write-props", new HashMap()); + readonlyConfig = ReadonlyConfig.fromMap(properties); + + tablePath = TablePath.of(DATABASE_NAME, TABLE_NAME); + catalogTable = createCatalogTable(TABLE_NAME); + + paimonCatalog = new PaimonCatalog(CATALOG_NAME, readonlyConfig); + paimonCatalog.open(); + paimonCatalog.createDatabase(tablePath, true); + paimonCatalog.createTable(tablePath, catalogTable, false); + + FileStoreTable table = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath); + if (!table.branchManager().branchExists(BRANCH_NAME)) { + table.createBranch(BRANCH_NAME); + } + } + + @Test + public void createSchemaWithBranchShouldFailWithoutCreatingMainTableWhenTableMissing() { + String missingTableName = "missing_branch_table"; + TablePath missingTablePath = TablePath.of(DATABASE_NAME, missingTableName); + CatalogTable missingCatalogTable = createCatalogTable(missingTableName); + PaimonSaveModeHandler handler = + new PaimonSaveModeHandler( + new TestSupportLoadTable(), + SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST, + DataSaveMode.APPEND_DATA, + paimonCatalog, + missingCatalogTable, + null, + BRANCH_NAME); + + PaimonConnectorException exception = + Assertions.assertThrows( + PaimonConnectorException.class, handler::handleSchemaSaveMode); + Assertions.assertTrue( + exception.getMessage().contains("main table must exist"), + "The error message should explain that branch writes require an existing table."); + Assertions.assertFalse( + paimonCatalog.tableExists(missingTablePath), + "Branch save mode must not auto-create a main table."); + } + + @Test + public void createSchemaWithBranchShouldFailWhenBranchMissing() { + String tableWithoutBranch = "table_without_branch"; + TablePath tableWithoutBranchPath = TablePath.of(DATABASE_NAME, tableWithoutBranch); + CatalogTable tableWithoutBranchCatalogTable = createCatalogTable(tableWithoutBranch); + paimonCatalog.createTable(tableWithoutBranchPath, tableWithoutBranchCatalogTable, false); + PaimonSaveModeHandler handler = + new PaimonSaveModeHandler( + new TestSupportLoadTable(), + SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST, + DataSaveMode.APPEND_DATA, + paimonCatalog, + tableWithoutBranchCatalogTable, + null, + BRANCH_NAME); + + PaimonConnectorException exception = + Assertions.assertThrows( + PaimonConnectorException.class, handler::handleSchemaSaveMode); + Assertions.assertTrue( + exception.getMessage().contains("does not exist"), + "The error message should explain that the branch does not exist."); + Assertions.assertTrue(paimonCatalog.tableExists(tableWithoutBranchPath)); + Assertions.assertFalse( + ((FileStoreTable) paimonCatalog.getPaimonTable(tableWithoutBranchPath)) + .branchManager() + .branchExists(BRANCH_NAME), + "Branch save mode must not auto-create the branch."); + } + + @Test + public void dropDataWithBranchShouldFailWithoutDroppingMainTableOrBranch() { + PaimonSaveModeHandler handler = + new PaimonSaveModeHandler( + new TestSupportLoadTable(), + SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST, + DataSaveMode.DROP_DATA, + paimonCatalog, + catalogTable, + null, + BRANCH_NAME); + + PaimonConnectorException exception = + Assertions.assertThrows( + PaimonConnectorException.class, handler::handleSchemaSaveMode); + Assertions.assertTrue( + exception.getMessage().contains("data_save_mode=DROP_DATA"), + "The error message should explain the unsupported save mode."); + + FileStoreTable mainTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath); + Assertions.assertTrue( + mainTable.branchManager().branchExists(BRANCH_NAME), + "DROP_DATA on a branch must not delete branch metadata."); + } + + @Test + public void recreateSchemaWithBranchShouldFailWithoutDroppingMainTableOrBranch() { + PaimonSaveModeHandler handler = + new PaimonSaveModeHandler( + new TestSupportLoadTable(), + SchemaSaveMode.RECREATE_SCHEMA, + DataSaveMode.APPEND_DATA, + paimonCatalog, + catalogTable, + null, + BRANCH_NAME); + + PaimonConnectorException exception = + Assertions.assertThrows( + PaimonConnectorException.class, handler::handleSchemaSaveMode); + Assertions.assertTrue( + exception.getMessage().contains("schema_save_mode=RECREATE_SCHEMA"), + "The error message should explain the unsupported save mode."); + + FileStoreTable mainTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath); + Assertions.assertTrue( + mainTable.branchManager().branchExists(BRANCH_NAME), + "RECREATE_SCHEMA on a branch must not delete branch metadata."); + } + + @Test + public void restoreSchemaSaveModeWithBranchShouldFailForDestructiveSaveMode() { + PaimonSaveModeHandler handler = + new PaimonSaveModeHandler( + new TestSupportLoadTable(), + SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST, + DataSaveMode.DROP_DATA, + paimonCatalog, + catalogTable, + null, + BRANCH_NAME); + + PaimonConnectorException exception = + Assertions.assertThrows( + PaimonConnectorException.class, handler::handleSchemaSaveModeWithRestore); + Assertions.assertTrue( + exception.getMessage().contains("data_save_mode=DROP_DATA"), + "The error message should explain the unsupported save mode."); + } + + @Test + public void schemaEvolutionWithBranchShouldAlterBranchSchemaOnly() throws Exception { + PaimonSinkConfig sinkConfig = new PaimonSinkConfig(readonlyConfig); + PaimonSinkWriter writer = + new PaimonSinkWriter( + new TestSinkWriterContext(), + readonlyConfig, + catalogTable, + ((FileStoreTable) paimonCatalog.getPaimonTable(tablePath)) + .switchToBranch(BRANCH_NAME), + UUID.randomUUID().toString(), + null, + sinkConfig, + new PaimonHadoopConfiguration(), + new PaimonBucketAssignerFactory()); + try { + writer.applySchemaChange( + AlterTableAddColumnEvent.add( + catalogTable.getTableId(), + PhysicalColumn.of( + "branch_only_column", + BasicType.STRING_TYPE, + (Long) null, + true, + null, + null))); + } finally { + writer.close(); + } + + FileStoreTable mainTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath); + FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME); + Assertions.assertFalse( + mainTable.schema().fieldNames().contains("branch_only_column"), + "Schema evolution configured with a branch must not alter the main schema."); + Assertions.assertTrue( + branchTable.schema().fieldNames().contains("branch_only_column"), + "Schema evolution configured with a branch must alter the branch schema."); + } + + @AfterEach + public void after() { + if (paimonCatalog != null) { + paimonCatalog.close(); + } + } + + private static TableSchema baseSchema() { + return TableSchema.builder() + .column(PhysicalColumn.of("id", BasicType.INT_TYPE, (Long) null, false, null, null)) + .primaryKey( + org.apache.seatunnel.api.table.catalog.PrimaryKey.of( + "pk", Collections.singletonList("id"))) + .build(); + } + + private static CatalogTable createCatalogTable(String tableName) { + return CatalogTable.of( + TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, tableName), + baseSchema(), + new HashMap<>(), + Collections.emptyList(), + "branch test table"); + } + + private static class TestSupportLoadTable implements SupportLoadTable { + private Table table; + + @Override + public void setLoadTable(Table table) { + this.table = table; + } + + @Override + public Table getLoadTable() { + return table; + } + } + + private static class TestSinkWriterContext + implements org.apache.seatunnel.api.sink.SinkWriter.Context { + @Override + public int getIndexOfSubtask() { + return 0; + } + + @Override + public org.apache.seatunnel.api.common.metrics.MetricsContext getMetricsContext() { + return null; + } + + @Override + public org.apache.seatunnel.api.event.EventListener getEventListener() { + return null; + } + } +}