diff --git a/docs/en/connectors/sink/Kingbase.md b/docs/en/connectors/sink/Kingbase.md
index 1f4142d1dd5c..b203733219b7 100644
--- a/docs/en/connectors/sink/Kingbase.md
+++ b/docs/en/connectors/sink/Kingbase.md
@@ -79,8 +79,9 @@ import ChangeLog from '../changelog/connector-jdbc.md';
### Tips
-> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed
+> 1. If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed
> in parallel according to the concurrency of tasks.
+> 2. When Kingbase starts in MySQL compatibility mode, existing Kingbase connector configurations can be reused directly. The system automatically connects to the database and dynamically obtains its compatibility mode (compatibleMode) when building KingbaseDialect and KingbaseCatalog.
## Task Example
@@ -169,4 +170,4 @@ sink {
## Changelog
-
\ No newline at end of file
+
diff --git a/docs/en/connectors/source/Kingbase.md b/docs/en/connectors/source/Kingbase.md
index 4c0d74388e55..727798567b6e 100644
--- a/docs/en/connectors/source/Kingbase.md
+++ b/docs/en/connectors/source/Kingbase.md
@@ -83,7 +83,8 @@ Read external data source data through JDBC.
### Tips
-> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
+> 1. If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
+> 2. When Kingbase starts in MySQL compatibility mode, existing Kingbase connector configurations can be reused directly. The system automatically connects to the database and dynamically obtains its compatibility mode (compatibleMode) when building KingbaseDialect and KingbaseCatalog.
## Task Example
@@ -159,4 +160,4 @@ source {
## Changelog
-
\ No newline at end of file
+
diff --git a/docs/zh/connectors/sink/Kingbase.md b/docs/zh/connectors/sink/Kingbase.md
index 1ae8eae8e50f..3d5551135dfa 100644
--- a/docs/zh/connectors/sink/Kingbase.md
+++ b/docs/zh/connectors/sink/Kingbase.md
@@ -78,7 +78,8 @@ import ChangeLog from '../changelog/connector-jdbc.md';
### 提示
-> 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发性并行执行。
+> 1. 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发性并行执行。
+> 2. 当 Kingbase 以 MySQL 兼容模式启动时,可直接复用现有的 Kingbase 连接器配置。系统在构建 KingbaseDialect 和 KingbaseCatalog 时,会自动连接数据库并动态获取其兼容模式(compatibleMode)。
## 任务示例
diff --git a/docs/zh/connectors/source/Kingbase.md b/docs/zh/connectors/source/Kingbase.md
index 4980e86e3815..d634e7566b41 100644
--- a/docs/zh/connectors/source/Kingbase.md
+++ b/docs/zh/connectors/source/Kingbase.md
@@ -83,7 +83,8 @@ import ChangeLog from '../changelog/connector-jdbc.md';
### 提示
-> 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发度并行执行。
+> 1. 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发度并行执行。
+> 2. 当 Kingbase 以 MySQL 兼容模式启动时,可直接复用现有的 Kingbase 连接器配置。系统在构建 KingbaseDialect 和 KingbaseCatalog 时,会自动连接数据库并动态获取其兼容模式(compatibleMode)。
## 任务示例
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogFactory.java
index 6003a9a3f81a..5cb3a38bb04a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogFactory.java
@@ -26,13 +26,22 @@
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.auto.service.AutoService;
+import java.sql.Connection;
+
@AutoService(Factory.class)
public class KingbaseCatalogFactory implements CatalogFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(KingbaseCatalogFactory.class);
@Override
public String factoryIdentifier() {
@@ -46,6 +55,10 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
StringUtils.isNoneBlank(urlWithDatabase),
"Miss config ! Please check your config.");
JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+ String compatibleMode = detectCompatibleMode(options);
+ if (isMySQL(compatibleMode)) {
+ return getMySqlCatalog(catalogName, options, urlInfo);
+ }
return new KingbaseCatalog(
catalogName,
options.get(JdbcCommonOptions.USERNAME),
@@ -59,4 +72,35 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
public OptionRule optionRule() {
return JdbcCommonOptions.BASE_CATALOG_RULE.build();
}
+
+ private String detectCompatibleMode(ReadonlyConfig config) {
+ JdbcConnectionConfig jdbcConnectionConfig = JdbcConnectionConfig.of(config);
+ SimpleJdbcConnectionProvider provider =
+ new SimpleJdbcConnectionProvider(jdbcConnectionConfig);
+ try {
+ Connection connection = provider.getOrEstablishConnection();
+ if (connection instanceof com.kingbase8.jdbc.KbConnection) {
+ return ((com.kingbase8.jdbc.KbConnection) connection).getCompatibleLevel();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to detect compatible mode", e);
+ } finally {
+ provider.closeConnection();
+ }
+ return null;
+ }
+
+ private MySqlCatalog getMySqlCatalog(
+ String catalogName, ReadonlyConfig options, JdbcUrlUtil.UrlInfo urlInfo) {
+ return new MySqlCatalog(
+ catalogName,
+ options.get(JdbcCommonOptions.USERNAME),
+ options.get(JdbcCommonOptions.PASSWORD),
+ urlInfo,
+ options.get(JdbcCommonOptions.DRIVER));
+ }
+
+ private boolean isMySQL(String compatibleMode) {
+ return "mysql".equalsIgnoreCase(compatibleMode);
+ }
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index 90dbe021fc62..0cedf9eb6b6b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -69,7 +69,8 @@ public JdbcInputFormat(JdbcSourceConfig config, Map tab
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(),
config.getJdbcConnectionConfig().getDialect(),
- config.getCompatibleMode());
+ config.getCompatibleMode(),
+ config.getJdbcConnectionConfig());
this.chunkSplitter = ChunkSplitter.create(config);
this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.tables = tables;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index d943e22d8ada..baab21f1fa67 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -124,6 +124,7 @@ default String hashModForField(String fieldName, int mod) {
default String quoteIdentifier(String identifier) {
return identifier;
}
+
/** Quotes the identifier for database name or field name */
default String quoteDatabaseIdentifier(String identifier) {
return identifier;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
index a89fd8acfdf0..9b54f3d5c3e3 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
@@ -23,18 +23,28 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlJdbcRowConverter;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
public class KingbaseDialect implements JdbcDialect {
+ private final String compatibleLevel;
+ private final String fieldIde;
- public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
-
- public KingbaseDialect() {}
+ public KingbaseDialect() {
+ this(null, FieldIdeEnum.ORIGINAL.getValue());
+ }
public KingbaseDialect(String fieldIde) {
+ this(null, fieldIde);
+ }
+
+ public KingbaseDialect(String compatibleLevel, String fieldIde) {
+ this.compatibleLevel = compatibleLevel;
this.fieldIde = fieldIde;
}
@@ -45,17 +55,26 @@ public String dialectName() {
@Override
public JdbcRowConverter getRowConverter() {
+ if (isMySQL()) {
+ return new MysqlJdbcRowConverter();
+ }
return new KingbaseJdbcRowConverter();
}
@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ if (isMySQL()) {
+ return new MySqlTypeMapper();
+ }
return new KingbaseTypeMapper();
}
@Override
public Optional getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] pkNames) {
+ if (isMySQL()) {
+ return new MysqlDialect().getUpsertStatement(database, tableName, fieldNames, pkNames);
+ }
String uniqueColumns =
Arrays.stream(pkNames).map(this::quoteIdentifier).collect(Collectors.joining(", "));
String updateClause =
@@ -107,4 +126,8 @@ public String quoteIdentifier(String identifier) {
public String quoteDatabaseIdentifier(String identifier) {
return "\"" + identifier + "\"";
}
+
+ private boolean isMySQL() {
+ return "mysql".equalsIgnoreCase(this.compatibleLevel);
+ }
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
index cc6d2b5d388d..1c57184c4752 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
@@ -17,18 +17,27 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.auto.service.AutoService;
import javax.annotation.Nonnull;
+import java.sql.Connection;
+
/** Factory for {@link KingbaseDialect}. */
@AutoService(JdbcDialectFactory.class)
public class KingbaseDialectFactory implements JdbcDialectFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(KingbaseDialectFactory.class);
+
@Override
public String dialectFactoryName() {
return DatabaseIdentifier.KINGBASE;
@@ -46,6 +55,34 @@ public JdbcDialect create() {
@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
- return new KingbaseDialect(fieldIde);
+ return new KingbaseDialect(compatibleMode, fieldIde);
+ }
+
+ @Override
+ public JdbcDialect create(
+ String compatibleMode, String fieldIde, JdbcConnectionConfig jdbcConnectionConfig) {
+ String detectedCompatibleMode = compatibleMode;
+ if (detectedCompatibleMode == null && jdbcConnectionConfig != null) {
+ detectedCompatibleMode = detectCompatibleMode(jdbcConnectionConfig);
+ }
+ return new KingbaseDialect(detectedCompatibleMode, fieldIde);
+ }
+
+ private String detectCompatibleMode(JdbcConnectionConfig config) {
+ SimpleJdbcConnectionProvider provider = new SimpleJdbcConnectionProvider(config);
+ try {
+ Connection connection = provider.getOrEstablishConnection();
+ if (connection instanceof com.kingbase8.jdbc.KbConnection) {
+ String level = ((com.kingbase8.jdbc.KbConnection) connection).getCompatibleLevel();
+ return level;
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to detect KingbaseES compatible mode from connection, fallback to default.",
+ e);
+ } finally {
+ provider.closeConnection();
+ }
+ return null;
}
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 5a7d1e756dc8..58411190f004 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -187,7 +187,8 @@ public TableSink createSink(TableSinkFactoryContext context) {
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
sinkConfig.getJdbcConnectionConfig().getDialect(),
- fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
+ fieldIdeEnum == null ? null : fieldIdeEnum.getValue(),
+ sinkConfig.getJdbcConnectionConfig());
dialect.connectionUrlParse(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getProperties(),
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index ba07f04108d5..830f7fe1bba4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -74,7 +74,8 @@ public static Map getTables(
JdbcDialectLoader.load(
jdbcConnectionConfig.getUrl(),
jdbcConnectionConfig.getDialect(),
- jdbcConnectionConfig.getCompatibleMode());
+ jdbcConnectionConfig.getCompatibleMode(),
+ jdbcConnectionConfig);
Optional catalog = findCatalog(jdbcConnectionConfig, jdbcDialect);
if (catalog.isPresent()) {
try (AbstractJdbcCatalog jdbcCatalog = (AbstractJdbcCatalog) catalog.get()) {
@@ -449,6 +450,7 @@ private static ReadonlyConfig extractCatalogConfig(JdbcConnectionConfig config)
catalogConfig.put(JdbcCommonOptions.INT_TYPE_NARROWING.key(), config.isIntTypeNarrowing());
catalogConfig.put(
JdbcCommonOptions.HANDLE_BLOB_AS_STRING.key(), config.isHandleBlobAsString());
+ catalogConfig.put(JdbcCommonOptions.DRIVER.key(), config.getDriverName());
return ReadonlyConfig.fromMap(catalogConfig);
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactoryTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactoryTest.java
new file mode 100644
index 000000000000..d65eac81910b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KingbaseDialectFactoryTest {
+
+ private final KingbaseDialectFactory factory = new KingbaseDialectFactory();
+
+ @Test
+ public void testDialectFactoryName() {
+ Assertions.assertEquals("KingBase", factory.dialectFactoryName());
+ }
+
+ @Test
+ public void testAcceptsURL() {
+ Assertions.assertTrue(factory.acceptsURL("jdbc:kingbase8://localhost:54321/test"));
+ Assertions.assertFalse(factory.acceptsURL("jdbc:mysql://localhost:3306/test"));
+ }
+
+ @Test
+ public void testCreateNoArgs() {
+ JdbcDialect dialect = factory.create();
+ Assertions.assertInstanceOf(KingbaseDialect.class, dialect);
+ Assertions.assertEquals(
+ "KingbaseJdbcRowConverter", dialect.getRowConverter().getClass().getSimpleName());
+ }
+
+ @Test
+ public void testCreateWithCompatibleModeAndFieldIde() {
+ JdbcDialect dialect = factory.create("mysql", "UPPERCASE");
+ Assertions.assertInstanceOf(KingbaseDialect.class, dialect);
+ Assertions.assertInstanceOf(
+ org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql
+ .MysqlJdbcRowConverter.class,
+ dialect.getRowConverter());
+ }
+
+ @Test
+ public void testCreateWithConfigAndExplicitCompatibleMode() {
+ JdbcConnectionConfig config =
+ JdbcConnectionConfig.builder()
+ .url("jdbc:kingbase8://localhost:54321/test")
+ .driverName("com.kingbase8.Driver")
+ .compatibleMode("mysql")
+ .build();
+
+ JdbcDialect dialect = factory.create("mysql", "ORIGINAL", config);
+ Assertions.assertInstanceOf(KingbaseDialect.class, dialect);
+ Assertions.assertInstanceOf(
+ org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql
+ .MysqlJdbcRowConverter.class,
+ dialect.getRowConverter());
+ }
+
+ @Test
+ public void testCreateWithNullCompatibleModeAndNullConfig() {
+ // When both compatibleMode and config are null, should fallback to default Kingbase
+ JdbcDialect dialect = factory.create(null, "ORIGINAL", null);
+ Assertions.assertInstanceOf(KingbaseDialect.class, dialect);
+ Assertions.assertEquals(
+ "KingbaseJdbcRowConverter", dialect.getRowConverter().getClass().getSimpleName());
+ }
+
+ @Test
+ public void testCreatePreservesExplicitModeOverConfig() {
+ // Even if config suggests something else, explicit compatibleMode should win
+ JdbcConnectionConfig config =
+ JdbcConnectionConfig.builder()
+ .url("jdbc:kingbase8://localhost:54321/test")
+ .driverName("com.kingbase8.Driver")
+ .compatibleMode("oracle")
+ .build();
+
+ JdbcDialect dialect = factory.create("mysql", "ORIGINAL", config);
+ Assertions.assertInstanceOf(
+ org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql
+ .MysqlJdbcRowConverter.class,
+ dialect.getRowConverter());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectTest.java
new file mode 100644
index 000000000000..46e69a526b23
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlJdbcRowConverter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+public class KingbaseDialectTest {
+
+ @Test
+ public void testDialectName() {
+ KingbaseDialect dialect = new KingbaseDialect();
+ Assertions.assertEquals(DatabaseIdentifier.KINGBASE, dialect.dialectName());
+ }
+
+ @Test
+ public void testDefaultConstructor() {
+ KingbaseDialect dialect = new KingbaseDialect();
+ Assertions.assertNotNull(dialect.getRowConverter());
+ Assertions.assertEquals(
+ "KingbaseJdbcRowConverter", dialect.getRowConverter().getClass().getSimpleName());
+ Assertions.assertEquals(
+ "KingbaseTypeMapper",
+ dialect.getJdbcDialectTypeMapper().getClass().getSimpleName());
+ }
+
+ @Test
+ public void testFieldIdeConstructor() {
+ KingbaseDialect dialect = new KingbaseDialect(FieldIdeEnum.UPPERCASE.getValue());
+ Assertions.assertEquals("\"TABLE_NAME\"", dialect.quoteIdentifier("table_name"));
+ }
+
+ @Test
+ public void testMySQLCompatibleMode() {
+ KingbaseDialect dialect = new KingbaseDialect("mysql", FieldIdeEnum.ORIGINAL.getValue());
+ Assertions.assertInstanceOf(MysqlJdbcRowConverter.class, dialect.getRowConverter());
+ Assertions.assertInstanceOf(MySqlTypeMapper.class, dialect.getJdbcDialectTypeMapper());
+ }
+
+ @Test
+ public void testMySQLCompatibleModeCaseInsensitive() {
+ KingbaseDialect dialect = new KingbaseDialect("MySQL", FieldIdeEnum.ORIGINAL.getValue());
+ Assertions.assertInstanceOf(MysqlJdbcRowConverter.class, dialect.getRowConverter());
+ Assertions.assertInstanceOf(MySqlTypeMapper.class, dialect.getJdbcDialectTypeMapper());
+ }
+
+ @Test
+ public void testNullCompatibleModeDefaultsToKingbase() {
+ KingbaseDialect dialect = new KingbaseDialect(null, FieldIdeEnum.ORIGINAL.getValue());
+ Assertions.assertInstanceOf(KingbaseJdbcRowConverter.class, dialect.getRowConverter());
+ Assertions.assertInstanceOf(KingbaseTypeMapper.class, dialect.getJdbcDialectTypeMapper());
+ }
+
+ @Test
+ public void testMySQLCompatibleUpsertStatement() {
+ KingbaseDialect dialect = new KingbaseDialect("mysql", FieldIdeEnum.ORIGINAL.getValue());
+ String[] fieldNames = {"id", "name"};
+ String[] pkNames = {"id"};
+
+ Optional upsert = dialect.getUpsertStatement("db", "table", fieldNames, pkNames);
+ Assertions.assertTrue(upsert.isPresent());
+ String sql = upsert.get();
+ // MySQL upsert uses ON DUPLICATE KEY UPDATE
+ Assertions.assertTrue(sql.contains("ON DUPLICATE KEY UPDATE"));
+ }
+
+ @Test
+ public void testKingbaseUpsertStatement() {
+ KingbaseDialect dialect = new KingbaseDialect();
+ String[] fieldNames = {"id", "name"};
+ String[] pkNames = {"id"};
+
+ Optional upsert = dialect.getUpsertStatement("db", "table", fieldNames, pkNames);
+ Assertions.assertTrue(upsert.isPresent());
+ String sql = upsert.get();
+ // Kingbase upsert uses ON CONFLICT ... DO UPDATE SET
+ Assertions.assertTrue(sql.contains("ON CONFLICT"));
+ Assertions.assertTrue(sql.contains("DO UPDATE SET"));
+ Assertions.assertTrue(sql.contains("EXCLUDED"));
+ }
+
+ @Test
+ public void testQuoteIdentifier() {
+ KingbaseDialect dialect = new KingbaseDialect();
+ Assertions.assertEquals("\"table_name\"", dialect.quoteIdentifier("table_name"));
+ Assertions.assertEquals("\"schema\".\"table\"", dialect.quoteIdentifier("schema.table"));
+ }
+
+ @Test
+ public void testQuoteIdentifierWithFieldIde() {
+ KingbaseDialect dialect = new KingbaseDialect("mysql", FieldIdeEnum.UPPERCASE.getValue());
+ Assertions.assertEquals("\"COLUMN_NAME\"", dialect.quoteIdentifier("column_name"));
+ }
+
+ @Test
+ public void testQuoteDatabaseIdentifier() {
+ KingbaseDialect dialect = new KingbaseDialect();
+ Assertions.assertEquals("\"mydb\"", dialect.quoteDatabaseIdentifier("mydb"));
+ }
+
+ @Test
+ public void testTableIdentifier() {
+ KingbaseDialect dialect = new KingbaseDialect();
+ Assertions.assertEquals("\"db\".\"table\"", dialect.tableIdentifier("db", "table"));
+ }
+
+ @Test
+ public void testParseTablePath() {
+ KingbaseDialect dialect = new KingbaseDialect();
+ TablePath path = dialect.parse("database.schema.table");
+ Assertions.assertEquals("database", path.getDatabaseName());
+ Assertions.assertEquals("schema", path.getSchemaName());
+ Assertions.assertEquals("table", path.getTableName());
+ }
+
+ @Test
+ public void testImmutability() {
+ KingbaseDialect dialect1 = new KingbaseDialect("mysql", FieldIdeEnum.UPPERCASE.getValue());
+ // Row converter should be fixed after construction
+ Assertions.assertInstanceOf(MysqlJdbcRowConverter.class, dialect1.getRowConverter());
+
+ // Even if we create another dialect with different config, first one is unchanged
+ KingbaseDialect dialect2 = new KingbaseDialect("oracle", FieldIdeEnum.LOWERCASE.getValue());
+ Assertions.assertInstanceOf(MysqlJdbcRowConverter.class, dialect1.getRowConverter());
+ Assertions.assertInstanceOf(KingbaseJdbcRowConverter.class, dialect2.getRowConverter());
+ }
+}