73 changes: 73 additions & 0 deletions docs/en/connectors/sink/AzureDataExplorer.md
@@ -0,0 +1,73 @@
# Azure Data Explorer

> Azure Data Explorer (ADX) sink connector

## Support Those Engines

> SeaTunnel Zeta

## Description

The Azure Data Explorer sink connector ingests `SeaTunnelRow` data into ADX using the Kusto ingestion service. It supports queued ingestion for throughput and streaming ingestion for low latency.

## Key features

- [x] [batch](../../introduction/concepts/connector-v2-features.md)
- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)

## Sink Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. |
| database | String | Yes | - | Target database name. |
| table | String | Yes | - | Target table name. |
| client_id | String | Yes | - | Azure AD application (client) ID. |
| client_secret | String | Yes | - | Azure AD application secret. |
| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. |
| ingestion_mapping_reference | String | No | "" | Name of an ingestion mapping that already exists on the ADX table. |
| ingestion_type | Enum | No | QUEUED | Ingestion mode: `QUEUED` (default, favors throughput) or `STREAMING` (favors low latency). |
| batch_size | Integer | No | 1000 | Number of rows to buffer before flushing. |
| flush_interval_ms | Long | No | 30000 | Maximum time in milliseconds between flushes, regardless of how many rows are buffered. |
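
The interaction between `batch_size` and `flush_interval_ms` can be pictured with a small buffer that flushes when either threshold is reached. The class below is a hypothetical sketch, not the connector's writer: `FlushPolicySketch` and its methods are invented for illustration, rows are plain strings, and the interval is checked only when a row arrives (a real writer would likely also need a timer so that idle buffers are flushed).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer illustrating a batch_size / flush_interval_ms flush rule. */
public class FlushPolicySketch {
    private final int batchSize;
    private final long flushIntervalMs;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMs;
    private int flushCount;

    public FlushPolicySketch(int batchSize, long flushIntervalMs, long nowMs) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Buffers one row and returns true if this call triggered a flush. */
    public boolean add(String row, long nowMs) {
        buffer.add(row);
        boolean sizeReached = buffer.size() >= batchSize;
        boolean intervalElapsed = nowMs - lastFlushMs >= flushIntervalMs;
        if (sizeReached || intervalElapsed) {
            flush(nowMs);
            return true;
        }
        return false;
    }

    /** Empties the buffer; a real sink would hand the rows to its ingestion client here. */
    public void flush(long nowMs) {
        buffer.clear();
        lastFlushMs = nowMs;
        flushCount++;
    }

    public int buffered() {
        return buffer.size();
    }

    public int flushCount() {
        return flushCount;
    }
}
```

Passing the clock value in as `nowMs` keeps the sketch deterministic; production code would typically read it from `System.currentTimeMillis()`.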

## Task Example

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields = [
        { name = "id", type = "INT" },
        { name = "name", type = "STRING" },
        { name = "ts", type = "TIMESTAMP" }
      ]
    }
    rows = [
      { fields = [1, "a", "2024-01-01T00:00:00"] }
    ]
  }
}

sink {
  AzureDataExplorer {
    cluster_uri = "https://mycluster.eastus.kusto.windows.net"
    database = "mydb"
    table = "mytable"
    client_id = "${ADX_CLIENT_ID}"
    client_secret = "${ADX_CLIENT_SECRET}"
    tenant_id = "${ADX_TENANT_ID}"
    ingestion_type = "QUEUED"
    batch_size = 1000
    flush_interval_ms = 30000
  }
}
```
64 changes: 64 additions & 0 deletions docs/en/connectors/source/AzureDataExplorer.md
@@ -0,0 +1,64 @@
# Azure Data Explorer

> Azure Data Explorer (ADX) source connector

## Support Those Engines

> SeaTunnel Zeta

## Description

The Azure Data Explorer source connector executes a Kusto Query Language (KQL) statement against an ADX cluster and emits the query results as `SeaTunnelRow` records.

## Key features

- [x] [batch](../../introduction/concepts/connector-v2-features.md)
- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)

## Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. |
| database | String | Yes | - | Name of the database to query. |
| query | String | Yes | - | Kusto query (KQL) to execute. |
| client_id | String | Yes | - | Azure AD application (client) ID. |
| client_secret | String | Yes | - | Azure AD application secret. |
| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. |
| schema | Config | No | - | Optional SeaTunnel schema. See [Source Common Options](../common-options/source-common-options.md). |

## Task Example

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  AzureDataExplorer {
    cluster_uri = "https://mycluster.eastus.kusto.windows.net"
    database = "mydb"
    query = "MyTable | take 1000"
    client_id = "${ADX_CLIENT_ID}"
    client_secret = "${ADX_CLIENT_SECRET}"
    tenant_id = "${ADX_TENANT_ID}"

    schema = {
      fields = [
        { name = "id", type = "INT" },
        { name = "name", type = "STRING" },
        { name = "ts", type = "TIMESTAMP" }
      ]
    }
  }
}

sink {
  Console {
  }
}
```
73 changes: 73 additions & 0 deletions docs/zh/connectors/sink/AzureDataExplorer.md
@@ -0,0 +1,73 @@
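# Azure Data Explorer

> Azure Data Explorer (ADX) sink connector

## Supported Engines

> SeaTunnel Zeta

## Description

The Azure Data Explorer sink connector writes `SeaTunnelRow` data to ADX through the Kusto ingestion service and supports both queued and streaming ingestion.

## Key features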

- [x] [batch](../../introduction/concepts/connector-v2-features.md)
- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)

## Sink Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. |
| database | String | Yes | - | Target database name. |
| table | String | Yes | - | Target table name. |
| client_id | String | Yes | - | Azure AD application (client) ID. |
| client_secret | String | Yes | - | Azure AD application secret. |
| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. |
| ingestion_mapping_reference | String | No | "" | Name of an existing ingestion mapping. |
| ingestion_type | Enum | No | QUEUED | `QUEUED` (default) or `STREAMING`. |
| batch_size | Integer | No | 1000 | Number of rows to buffer before writing them as a batch. |
| flush_interval_ms | Long | No | 30000 | Maximum number of milliseconds to wait before flushing, regardless of batch size. |

## Example

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields = [
        { name = "id", type = "INT" },
        { name = "name", type = "STRING" },
        { name = "ts", type = "TIMESTAMP" }
      ]
    }
    rows = [
      { fields = [1, "a", "2024-01-01T00:00:00"] }
    ]
  }
}

sink {
  AzureDataExplorer {
    cluster_uri = "https://mycluster.eastus.kusto.windows.net"
    database = "mydb"
    table = "mytable"
    client_id = "${ADX_CLIENT_ID}"
    client_secret = "${ADX_CLIENT_SECRET}"
    tenant_id = "${ADX_TENANT_ID}"
    ingestion_type = "QUEUED"
    batch_size = 1000
    flush_interval_ms = 30000
  }
}
```
64 changes: 64 additions & 0 deletions docs/zh/connectors/source/AzureDataExplorer.md
@@ -0,0 +1,64 @@
# Azure Data Explorer

> Azure Data Explorer (ADX) source connector

## Supported Engines

> SeaTunnel Zeta

## Description

The Azure Data Explorer source connector executes a Kusto Query Language (KQL) query and emits the results as `SeaTunnelRow` records.

## Key features

- [x] [batch](../../introduction/concepts/connector-v2-features.md)
- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)

## Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. |
| database | String | Yes | - | Name of the database to query. |
| query | String | Yes | - | KQL query to execute. |
| client_id | String | Yes | - | Azure AD application (client) ID. |
| client_secret | String | Yes | - | Azure AD application secret. |
| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. |
| schema | Config | No | - | Optional SeaTunnel schema. See [Source Common Options](../common-options/source-common-options.md). |

## Example

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  AzureDataExplorer {
    cluster_uri = "https://mycluster.eastus.kusto.windows.net"
    database = "mydb"
    query = "MyTable | take 1000"
    client_id = "${ADX_CLIENT_ID}"
    client_secret = "${ADX_CLIENT_SECRET}"
    tenant_id = "${ADX_TENANT_ID}"

    schema = {
      fields = [
        { name = "id", type = "INT" },
        { name = "name", type = "STRING" },
        { name = "ts", type = "TIMESTAMP" }
      ]
    }
  }
}

sink {
  Console {
  }
}
```
50 changes: 50 additions & 0 deletions seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.seatunnel</groupId>
        <artifactId>seatunnel-connectors-v2</artifactId>
        <version>3.0.0-SNAPSHOT</version>
    </parent>

    <artifactId>connector-azuredataexplorer</artifactId>

    <dependencies>
        <!-- Direct dependency on seatunnel-api; scope is provided, so it is not bundled with the connector -->
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-api</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure.kusto</groupId>
            <artifactId>kusto-data</artifactId>
            <version>6.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.azure.kusto</groupId>
            <artifactId>kusto-ingest</artifactId>
            <version>6.0.3</version>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
@@ -0,0 +1,55 @@
package org.apache.seatunnel.azuredataexplorer.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.Builder;
import lombok.Getter;

import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.BATCH_SIZE;
import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_ID;
import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_SECRET;
import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.FLUSH_INTERVAL_MS;
import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_MAPPING_REFERENCE;
import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_TYPE;
import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TABLE;
import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TENANT_ID;

/** Immutable config value object used by the sink. */
@Getter
@Builder
public class AzureDataExplorerConfig {

    private static final String HTTPS_PREFIX = "https://";
    private static final String INGEST_PREFIX = "ingest-";

    // Connection and authentication settings.
    private final String clusterUri;
    private final String database;
    private final String clientId;
    private final String clientSecret;
    private final String tenantId;

    // Ingestion settings.
    private final String table;
    private final String ingestionMappingReference;
    private final AzureDataExplorerSinkOptions.IngestionType ingestionType;
    private final int batchSize;
    private final long flushIntervalMs;

    /**
     * Derives the queued-ingestion endpoint by putting {@code ingest-} in front of the cluster host
     * name. URIs that are not HTTPS, or that already carry the {@code ingest-} prefix, are returned
     * unchanged so the prefix is never applied twice.
     */
    public String getQueuedIngestUri() {
        if (!clusterUri.startsWith(HTTPS_PREFIX)) {
            return clusterUri;
        }
        String host = clusterUri.substring(HTTPS_PREFIX.length());
        if (host.startsWith(INGEST_PREFIX)) {
            return clusterUri;
        }
        return HTTPS_PREFIX + INGEST_PREFIX + host;
    }

    /** Builds the config from the sink options declared in {@link AzureDataExplorerSinkOptions}. */
    public static AzureDataExplorerConfig fromSinkConfig(ReadonlyConfig cfg) {
        return AzureDataExplorerConfig.builder()
                .clusterUri(cfg.get(AzureDataExplorerSinkOptions.CLUSTER_URI))
                .database(cfg.get(AzureDataExplorerSinkOptions.DATABASE))
                .table(cfg.get(TABLE))
                .clientId(cfg.get(CLIENT_ID))
                .clientSecret(cfg.get(CLIENT_SECRET))
                .tenantId(cfg.get(TENANT_ID))
                .ingestionMappingReference(cfg.get(INGESTION_MAPPING_REFERENCE))
                .ingestionType(cfg.get(INGESTION_TYPE))
                .batchSize(cfg.get(BATCH_SIZE))
                .flushIntervalMs(cfg.get(FLUSH_INTERVAL_MS))
                .build();
    }
}
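
The URI derivation in `getQueuedIngestUri` can be exercised without Lombok or SeaTunnel on the classpath. What follows is a minimal standalone sketch of the same `ingest-` prefixing, assuming that URIs which are not HTTPS or which already start with `ingest-` should pass through unchanged; the class `IngestUriSketch` and the method `toIngestUri` are hypothetical names used only for this illustration.

```java
/** Hypothetical standalone version of the ingest-endpoint derivation. */
public class IngestUriSketch {
    private static final String HTTPS_PREFIX = "https://";
    private static final String INGEST_PREFIX = "ingest-";

    /** Returns the ingestion endpoint for an HTTPS cluster URI, or the input unchanged. */
    public static String toIngestUri(String clusterUri) {
        if (clusterUri == null || !clusterUri.startsWith(HTTPS_PREFIX)) {
            return clusterUri;
        }
        String host = clusterUri.substring(HTTPS_PREFIX.length());
        if (host.startsWith(INGEST_PREFIX)) {
            // Assumption: an already-prefixed URI is an ingestion endpoint and is left as-is.
            return clusterUri;
        }
        return HTTPS_PREFIX + INGEST_PREFIX + host;
    }

    public static void main(String[] args) {
        // prints "https://ingest-mycluster.eastus.kusto.windows.net"
        System.out.println(toIngestUri("https://mycluster.eastus.kusto.windows.net"));
    }
}
```

Keeping the derivation in a pure static method makes it straightforward to cover with a unit test alongside the JUnit dependencies declared in the module's POM.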