Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/en/connectors/sink/Kingbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ import ChangeLog from '../changelog/connector-jdbc.md';

### Tips

> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed
> 1. If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed
> in parallel according to the concurrency of tasks.
> 2. When Kingbase starts in MySQL compatibility mode, existing Kingbase connector configurations can be reused directly. The system automatically connects to the database and dynamically obtains its compatibility mode (compatibleMode) when building KingbaseDialect and KingbaseCatalog.

## Task Example

Expand Down Expand Up @@ -169,4 +170,4 @@ sink {

## Changelog

<ChangeLog />
<ChangeLog />
5 changes: 3 additions & 2 deletions docs/en/connectors/source/Kingbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ Read external data source data through JDBC.

### Tips

> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
> 1. If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
> 2. When Kingbase starts in MySQL compatibility mode, existing Kingbase connector configurations can be reused directly. The system automatically connects to the database and dynamically obtains its compatibility mode (compatibleMode) when building KingbaseDialect and KingbaseCatalog.

## Task Example

Expand Down Expand Up @@ -159,4 +160,4 @@ source {

## Changelog

<ChangeLog />
<ChangeLog />
3 changes: 2 additions & 1 deletion docs/zh/connectors/sink/Kingbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ import ChangeLog from '../changelog/connector-jdbc.md';

### 提示

> 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发性并行执行。
> 1. 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发性并行执行。
> 2. 当 Kingbase 以 MySQL 兼容模式启动时,可直接复用现有的 Kingbase 连接器配置。系统在构建 KingbaseDialect 和 KingbaseCatalog 时,会自动连接数据库并动态获取其兼容模式(compatibleMode)。

## 任务示例

Expand Down
3 changes: 2 additions & 1 deletion docs/zh/connectors/source/Kingbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ import ChangeLog from '../changelog/connector-jdbc.md';

### 提示

> 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发度并行执行。
> 1. 如果未设置 partition_column,它将以单并发运行,如果设置了 partition_column,它将根据任务的并发度并行执行。
> 2. 当 Kingbase 以 MySQL 兼容模式启动时,可直接复用现有的 Kingbase 连接器配置。系统在构建 KingbaseDialect 和 KingbaseCatalog 时,会自动连接数据库并动态获取其兼容模式(compatibleMode)。

## 任务示例

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,22 @@
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.auto.service.AutoService;

import java.sql.Connection;

@AutoService(Factory.class)
public class KingbaseCatalogFactory implements CatalogFactory {
private static final Logger LOG = LoggerFactory.getLogger(KingbaseCatalogFactory.class);

@Override
public String factoryIdentifier() {
Expand All @@ -46,6 +55,10 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
StringUtils.isNoneBlank(urlWithDatabase),
"Miss config <base-url>! Please check your config.");
JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
String compatibleMode = detectCompatibleMode(options);
if (isMySQL(compatibleMode)) {
return getMySqlCatalog(catalogName, options, urlInfo);
}
return new KingbaseCatalog(
catalogName,
options.get(JdbcCommonOptions.USERNAME),
Expand All @@ -59,4 +72,35 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
public OptionRule optionRule() {
return JdbcCommonOptions.BASE_CATALOG_RULE.build();
}

private String detectCompatibleMode(ReadonlyConfig config) {
JdbcConnectionConfig jdbcConnectionConfig = JdbcConnectionConfig.of(config);
SimpleJdbcConnectionProvider provider =
new SimpleJdbcConnectionProvider(jdbcConnectionConfig);
try {
Connection connection = provider.getOrEstablishConnection();
if (connection instanceof com.kingbase8.jdbc.KbConnection) {
return ((com.kingbase8.jdbc.KbConnection) connection).getCompatibleLevel();
}
} catch (Exception e) {
LOG.error("Failed to detect compatible mode", e);
} finally {
provider.closeConnection();
}
return null;
}

private MySqlCatalog getMySqlCatalog(
String catalogName, ReadonlyConfig options, JdbcUrlUtil.UrlInfo urlInfo) {
return new MySqlCatalog(
catalogName,
options.get(JdbcCommonOptions.USERNAME),
options.get(JdbcCommonOptions.PASSWORD),
urlInfo,
options.get(JdbcCommonOptions.DRIVER));
}

private boolean isMySQL(String compatibleMode) {
return "mysql".equalsIgnoreCase(compatibleMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, CatalogTable> tab
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(),
config.getJdbcConnectionConfig().getDialect(),
config.getCompatibleMode());
config.getCompatibleMode(),
config.getJdbcConnectionConfig());
this.chunkSplitter = ChunkSplitter.create(config);
this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.tables = tables;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ default String hashModForField(String fieldName, int mod) {
default String quoteIdentifier(String identifier) {
return identifier;
}

/** Quotes the identifier for database name or field name */
default String quoteDatabaseIdentifier(String identifier) {
return identifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,28 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlJdbcRowConverter;

import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

public class KingbaseDialect implements JdbcDialect {
private final String compatibleLevel;
private final String fieldIde;

public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

public KingbaseDialect() {}
public KingbaseDialect() {
this(null, FieldIdeEnum.ORIGINAL.getValue());
}

public KingbaseDialect(String fieldIde) {
this(null, fieldIde);
}

public KingbaseDialect(String compatibleLevel, String fieldIde) {
this.compatibleLevel = compatibleLevel;
this.fieldIde = fieldIde;
}

Expand All @@ -45,17 +55,26 @@ public String dialectName() {

@Override
public JdbcRowConverter getRowConverter() {
if (isMySQL()) {
return new MysqlJdbcRowConverter();
}
return new KingbaseJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
if (isMySQL()) {
return new MySqlTypeMapper();
}
return new KingbaseTypeMapper();
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] pkNames) {
if (isMySQL()) {
return new MysqlDialect().getUpsertStatement(database, tableName, fieldNames, pkNames);
}
String uniqueColumns =
Arrays.stream(pkNames).map(this::quoteIdentifier).collect(Collectors.joining(", "));
String updateClause =
Expand Down Expand Up @@ -107,4 +126,8 @@ public String quoteIdentifier(String identifier) {
public String quoteDatabaseIdentifier(String identifier) {
return "\"" + identifier + "\"";
}

private boolean isMySQL() {
return "mysql".equalsIgnoreCase(this.compatibleLevel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;

import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.auto.service.AutoService;

import javax.annotation.Nonnull;

import java.sql.Connection;

/** Factory for {@link KingbaseDialect}. */
@AutoService(JdbcDialectFactory.class)
public class KingbaseDialectFactory implements JdbcDialectFactory {

private static final Logger LOG = LoggerFactory.getLogger(KingbaseDialectFactory.class);

@Override
public String dialectFactoryName() {
return DatabaseIdentifier.KINGBASE;
Expand All @@ -46,6 +55,34 @@ public JdbcDialect create() {

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new KingbaseDialect(fieldIde);
return new KingbaseDialect(compatibleMode, fieldIde);
}

@Override
public JdbcDialect create(
String compatibleMode, String fieldIde, JdbcConnectionConfig jdbcConnectionConfig) {
String detectedCompatibleMode = compatibleMode;
if (detectedCompatibleMode == null && jdbcConnectionConfig != null) {
detectedCompatibleMode = detectCompatibleMode(jdbcConnectionConfig);
}
return new KingbaseDialect(detectedCompatibleMode, fieldIde);
}

private String detectCompatibleMode(JdbcConnectionConfig config) {
SimpleJdbcConnectionProvider provider = new SimpleJdbcConnectionProvider(config);
try {
Connection connection = provider.getOrEstablishConnection();
if (connection instanceof com.kingbase8.jdbc.KbConnection) {
String level = ((com.kingbase8.jdbc.KbConnection) connection).getCompatibleLevel();
return level;
}
} catch (Exception e) {
LOG.warn(
"Failed to detect KingbaseES compatible mode from connection, fallback to default.",
e);
} finally {
provider.closeConnection();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public TableSink createSink(TableSinkFactoryContext context) {
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
sinkConfig.getJdbcConnectionConfig().getDialect(),
fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
fieldIdeEnum == null ? null : fieldIdeEnum.getValue(),
sinkConfig.getJdbcConnectionConfig());
dialect.connectionUrlParse(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getProperties(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public static Map<TablePath, JdbcSourceTable> getTables(
JdbcDialectLoader.load(
jdbcConnectionConfig.getUrl(),
jdbcConnectionConfig.getDialect(),
jdbcConnectionConfig.getCompatibleMode());
jdbcConnectionConfig.getCompatibleMode(),
jdbcConnectionConfig);
Optional<Catalog> catalog = findCatalog(jdbcConnectionConfig, jdbcDialect);
if (catalog.isPresent()) {
try (AbstractJdbcCatalog jdbcCatalog = (AbstractJdbcCatalog) catalog.get()) {
Expand Down Expand Up @@ -449,6 +450,7 @@ private static ReadonlyConfig extractCatalogConfig(JdbcConnectionConfig config)
catalogConfig.put(JdbcCommonOptions.INT_TYPE_NARROWING.key(), config.isIntTypeNarrowing());
catalogConfig.put(
JdbcCommonOptions.HANDLE_BLOB_AS_STRING.key(), config.isHandleBlobAsString());
catalogConfig.put(JdbcCommonOptions.DRIVER.key(), config.getDriverName());
return ReadonlyConfig.fromMap(catalogConfig);
}

Expand Down
Loading
Loading