diff --git a/docs/en/connectors/sink/AzureDataExplorer.md b/docs/en/connectors/sink/AzureDataExplorer.md
new file mode 100644
index 000000000000..7371efea896d
--- /dev/null
+++ b/docs/en/connectors/sink/AzureDataExplorer.md
@@ -0,0 +1,73 @@
+# Azure Data Explorer
+
+> Azure Data Explorer (ADX) sink connector
+
+## Support Those Engines
+
+> SeaTunnel Zeta
+
+## Description
+
+The Azure Data Explorer sink connector ingests `SeaTunnelRow` data into ADX using the Kusto ingestion service. It supports queued ingestion for throughput and streaming ingestion for low latency.
+
+## Key features
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+
+## Sink Options
+
+| Name | Type | Required | Default | Description |
+|---|---|---|---|---|
+| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. |
+| database | String | Yes | - | Target database name. |
+| table | String | Yes | - | Target table name. |
+| client_id | String | Yes | - | Azure AD application (client) ID. |
+| client_secret | String | Yes | - | Azure AD application secret. |
+| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. |
+| ingestion_mapping_reference | String | No | "" | Pre-created ingestion mapping name on the ADX table. |
+| ingestion_type | Enum | No | QUEUED | `QUEUED` (default) or `STREAMING`. |
+| batch_size | Integer | No | 1000 | Rows to buffer before flushing. |
+| flush_interval_ms | Long | No | 30000 | Max milliseconds between flushes regardless of batch size. |
+
+## Task Example
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields = [
+ { name = "id", type = "INT" },
+ { name = "name", type = "STRING" },
+ { name = "ts", type = "TIMESTAMP" }
+ ]
+ }
+ rows = [
+ { fields = [1, "a", "2024-01-01T00:00:00"] }
+ ]
+ }
+}
+
+sink {
+ AzureDataExplorer {
+ cluster_uri = "https://mycluster.eastus.kusto.windows.net"
+ database = "mydb"
+ table = "mytable"
+ client_id = "${ADX_CLIENT_ID}"
+ client_secret = "${ADX_CLIENT_SECRET}"
+ tenant_id = "${ADX_TENANT_ID}"
+ ingestion_type = "QUEUED"
+ batch_size = 1000
+ flush_interval_ms = 30000
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/en/connectors/source/AzureDataExplorer.md b/docs/en/connectors/source/AzureDataExplorer.md
new file mode 100644
index 000000000000..45b305d8c585
--- /dev/null
+++ b/docs/en/connectors/source/AzureDataExplorer.md
@@ -0,0 +1,64 @@
+# Azure Data Explorer
+
+> Azure Data Explorer (ADX) source connector
+
+## Support Those Engines
+
+> SeaTunnel Zeta
+
+## Description
+
+The Azure Data Explorer source connector executes a Kusto Query Language (KQL) statement against an ADX cluster and emits the query results as `SeaTunnelRow` records.
+
+## Key features
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+
+## Source Options
+
+| Name | Type | Required | Default | Description |
+|---|---|---|---|---|
+| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. |
+| database | String | Yes | - | Target database name. |
+| query | String | Yes | - | Kusto query (KQL) to execute. |
+| client_id | String | Yes | - | Azure AD application (client) ID. |
+| client_secret | String | Yes | - | Azure AD application secret. |
+| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. |
+| schema | Config | No | - | Optional SeaTunnel schema. See [Source Common Options](../common-options/source-common-options.md). |
+
+## Task Example
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ AzureDataExplorer {
+ cluster_uri = "https://mycluster.eastus.kusto.windows.net"
+ database = "mydb"
+ query = "MyTable | take 1000"
+ client_id = "${ADX_CLIENT_ID}"
+ client_secret = "${ADX_CLIENT_SECRET}"
+ tenant_id = "${ADX_TENANT_ID}"
+
+ schema = {
+ fields = [
+ { name = "id", type = "INT" },
+ { name = "name", type = "STRING" },
+ { name = "ts", type = "TIMESTAMP" }
+ ]
+ }
+ }
+}
+
+sink {
+ Console {
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/zh/connectors/sink/AzureDataExplorer.md b/docs/zh/connectors/sink/AzureDataExplorer.md
new file mode 100644
index 000000000000..c7c79f0c6732
--- /dev/null
+++ b/docs/zh/connectors/sink/AzureDataExplorer.md
@@ -0,0 +1,73 @@
+# Azure Data Explorer
+
+> Azure Data Explorer (ADX) 写入连接器
+
+## 支持的引擎
+
+> SeaTunnel Zeta
+
+## 描述
+
+Azure Data Explorer 写入连接器通过 Kusto Ingestion 服务将 `SeaTunnelRow` 数据写入 ADX,支持队列写入与流式写入两种方式。
+
+## 关键特性
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+
+## 写入选项
+
+| 名称 | 类型 | 是否必填 | 默认值 | 描述 |
+|---|---|---|---|---|
+| cluster_uri | String | 是 | - | ADX 集群地址,例如 `https://mycluster.eastus.kusto.windows.net`。 |
+| database | String | 是 | - | 目标数据库名称。 |
+| table | String | 是 | - | 目标表名。 |
+| client_id | String | 是 | - | Azure AD 应用 (client) ID。 |
+| client_secret | String | 是 | - | Azure AD 应用密钥。 |
+| tenant_id | String | 是 | - | Azure AD 租户 (directory) ID。 |
+| ingestion_mapping_reference | String | 否 | "" | 已存在的 ingestion mapping 名称。 |
+| ingestion_type | Enum | 否 | QUEUED | `QUEUED`(默认)或 `STREAMING`。 |
+| batch_size | Integer | 否 | 1000 | 缓冲后批量写入的行数。 |
+| flush_interval_ms | Long | 否 | 30000 | 无论批量大小,最多等待的毫秒数。 |
+
+## 示例
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields = [
+ { name = "id", type = "INT" },
+ { name = "name", type = "STRING" },
+ { name = "ts", type = "TIMESTAMP" }
+ ]
+ }
+ rows = [
+ { fields = [1, "a", "2024-01-01T00:00:00"] }
+ ]
+ }
+}
+
+sink {
+ AzureDataExplorer {
+ cluster_uri = "https://mycluster.eastus.kusto.windows.net"
+ database = "mydb"
+ table = "mytable"
+ client_id = "${ADX_CLIENT_ID}"
+ client_secret = "${ADX_CLIENT_SECRET}"
+ tenant_id = "${ADX_TENANT_ID}"
+ ingestion_type = "QUEUED"
+ batch_size = 1000
+ flush_interval_ms = 30000
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/zh/connectors/source/AzureDataExplorer.md b/docs/zh/connectors/source/AzureDataExplorer.md
new file mode 100644
index 000000000000..40261bdb8fdf
--- /dev/null
+++ b/docs/zh/connectors/source/AzureDataExplorer.md
@@ -0,0 +1,64 @@
+# Azure Data Explorer
+
+> Azure Data Explorer (ADX) 源连接器
+
+## 支持的引擎
+
+> SeaTunnel Zeta
+
+## 描述
+
+Azure Data Explorer 源连接器用于执行 Kusto Query Language (KQL) 查询,并将结果以 `SeaTunnelRow` 输出。
+
+## 关键特性
+
+- [x] [batch](../../introduction/concepts/connector-v2-features.md)
+- [ ] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+
+## 源选项
+
+| 名称 | 类型 | 是否必填 | 默认值 | 描述 |
+|---|---|---|---|---|
+| cluster_uri | String | 是 | - | ADX 集群地址,例如 `https://mycluster.eastus.kusto.windows.net`。 |
+| database | String | 是 | - | 目标数据库名称。 |
+| query | String | 是 | - | 要执行的 KQL 查询。 |
+| client_id | String | 是 | - | Azure AD 应用 (client) ID。 |
+| client_secret | String | 是 | - | Azure AD 应用密钥。 |
+| tenant_id | String | 是 | - | Azure AD 租户 (directory) ID。 |
+| schema | Config | 否 | - | 可选的 SeaTunnel schema。参考 [Source Common Options](../common-options/source-common-options.md)。 |
+
+## 示例
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ AzureDataExplorer {
+ cluster_uri = "https://mycluster.eastus.kusto.windows.net"
+ database = "mydb"
+ query = "MyTable | take 1000"
+ client_id = "${ADX_CLIENT_ID}"
+ client_secret = "${ADX_CLIENT_SECRET}"
+ tenant_id = "${ADX_TENANT_ID}"
+
+ schema = {
+ fields = [
+ { name = "id", type = "INT" },
+ { name = "name", type = "STRING" },
+ { name = "ts", type = "TIMESTAMP" }
+ ]
+ }
+ }
+}
+
+sink {
+ Console {
+ }
+}
+```
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml
new file mode 100644
index 000000000000..c4eeb19f2fbc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml
@@ -0,0 +1,50 @@
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2
+ 3.0.0-SNAPSHOT
+
+
+ connector-azuredataexplorer
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-api
+ ${project.version}
+ provided
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+ com.microsoft.azure.kusto
+ kusto-data
+ 6.0.3
+
+
+ com.microsoft.azure.kusto
+ kusto-ingest
+ 6.0.3
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java
new file mode 100644
index 000000000000..8edaa2387fa7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java
@@ -0,0 +1,55 @@
+package org.apache.seatunnel.azuredataexplorer.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.BATCH_SIZE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_ID;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_SECRET;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.FLUSH_INTERVAL_MS;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_MAPPING_REFERENCE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_TYPE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TABLE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TENANT_ID;
+
+/** Immutable config value object used by the sink. */
+@Getter
+@Builder
+public class AzureDataExplorerConfig {
+
+ private final String clusterUri;
+ private final String database;
+ private final String clientId;
+ private final String clientSecret;
+ private final String tenantId;
+
+ private final String table;
+ private final String ingestionMappingReference;
+ private final AzureDataExplorerSinkOptions.IngestionType ingestionType;
+ private final int batchSize;
+ private final long flushIntervalMs;
+
+ public String getQueuedIngestUri() {
+ if (clusterUri.startsWith("https://")) {
+ return "https://ingest-" + clusterUri.substring("https://".length());
+ }
+ return clusterUri;
+ }
+
+ public static AzureDataExplorerConfig fromSinkConfig(ReadonlyConfig cfg) {
+ return AzureDataExplorerConfig.builder()
+ .clusterUri(cfg.get(AzureDataExplorerSinkOptions.CLUSTER_URI))
+ .database(cfg.get(AzureDataExplorerSinkOptions.DATABASE))
+ .table(cfg.get(TABLE))
+ .clientId(cfg.get(CLIENT_ID))
+ .clientSecret(cfg.get(CLIENT_SECRET))
+ .tenantId(cfg.get(TENANT_ID))
+ .ingestionMappingReference(cfg.get(INGESTION_MAPPING_REFERENCE))
+ .ingestionType(cfg.get(INGESTION_TYPE))
+ .batchSize(cfg.get(BATCH_SIZE))
+ .flushIntervalMs(cfg.get(FLUSH_INTERVAL_MS))
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java
new file mode 100644
index 000000000000..4826ad0270c4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java
@@ -0,0 +1,76 @@
+package org.apache.seatunnel.azuredataexplorer.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class AzureDataExplorerSinkOptions {
+
+ public static final Option CLUSTER_URI =
+ Options.key("cluster_uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "ADX cluster URI, e.g. https://mycluster.eastus.kusto.windows.net");
+
+ public static final Option DATABASE =
+ Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Target database name.");
+
+ public static final Option TABLE =
+ Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Target table name.");
+
+ public static final Option CLIENT_ID =
+ Options.key("client_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Azure AD application (client) ID.");
+
+ public static final Option CLIENT_SECRET =
+ Options.key("client_secret")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Azure AD application secret.");
+
+ public static final Option TENANT_ID =
+ Options.key("tenant_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Azure AD tenant (directory) ID.");
+
+ public static final Option INGESTION_MAPPING_REFERENCE =
+ Options.key("ingestion_mapping_reference")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "Optional pre-created ingestion mapping name on the ADX table.");
+
+ public static final Option INGESTION_TYPE =
+ Options.key("ingestion_type")
+ .enumType(IngestionType.class)
+ .defaultValue(IngestionType.QUEUED)
+ .withDescription(
+ "QUEUED (default, high throughput, ~5 min latency) or "
+ + "STREAMING (low latency, <=4 MB/s per table).");
+
+ public static final Option BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Rows to buffer before flushing.");
+
+ public static final Option FLUSH_INTERVAL_MS =
+ Options.key("flush_interval_ms")
+ .longType()
+ .defaultValue(30_000L)
+ .withDescription("Max milliseconds between flushes regardless of batch size.");
+
+ public enum IngestionType {
+ QUEUED,
+ STREAMING
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfig.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfig.java
new file mode 100644
index 000000000000..4b43ad116cb3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.azuredataexplorer.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_ID;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_SECRET;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLUSTER_URI;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.DATABASE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.QUERY;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.TENANT_ID;
+
+/** Immutable config value object used by the source. */
+@Getter
+@Builder
+public class AzureDataExplorerSourceConfig {
+
+ private final String clusterUri;
+ private final String database;
+ private final String clientId;
+ private final String clientSecret;
+ private final String tenantId;
+ private final String query;
+
+ public static AzureDataExplorerSourceConfig fromSourceConfig(ReadonlyConfig cfg) {
+ return AzureDataExplorerSourceConfig.builder()
+ .clusterUri(cfg.get(CLUSTER_URI))
+ .database(cfg.get(DATABASE))
+ .clientId(cfg.get(CLIENT_ID))
+ .clientSecret(cfg.get(CLIENT_SECRET))
+ .tenantId(cfg.get(TENANT_ID))
+ .query(cfg.get(QUERY))
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceOptions.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceOptions.java
new file mode 100644
index 000000000000..e17a96490d24
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceOptions.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.azuredataexplorer.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class AzureDataExplorerSourceOptions {
+
+ public static final Option CLUSTER_URI =
+ Options.key("cluster_uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "ADX cluster URI, e.g. https://mycluster.eastus.kusto.windows.net");
+
+ public static final Option DATABASE =
+ Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Target database name.");
+
+ public static final Option QUERY =
+ Options.key("query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Kusto query (KQL) to execute for source reads.");
+
+ public static final Option CLIENT_ID =
+ Options.key("client_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Azure AD application (client) ID.");
+
+ public static final Option CLIENT_SECRET =
+ Options.key("client_secret")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Azure AD application secret.");
+
+ public static final Option TENANT_ID =
+ Options.key("tenant_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Azure AD tenant (directory) ID.");
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java
new file mode 100644
index 000000000000..9dabcdad2a9e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java
@@ -0,0 +1,15 @@
+package org.apache.seatunnel.azuredataexplorer.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class AzureDataExplorerConnectorException extends SeaTunnelRuntimeException {
+
+ public AzureDataExplorerConnectorException(AzureDataExplorerErrorCode code, String message) {
+ super(code, message);
+ }
+
+ public AzureDataExplorerConnectorException(
+ AzureDataExplorerErrorCode code, String message, Throwable cause) {
+ super(code, message, cause);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java
new file mode 100644
index 000000000000..1591dd55f46c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java
@@ -0,0 +1,29 @@
+package org.apache.seatunnel.azuredataexplorer.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum AzureDataExplorerErrorCode implements SeaTunnelErrorCode {
+ INGESTION_FAILED("ADX-01", "Failed to ingest data into Azure Data Explorer"),
+ QUERY_FAILED("ADX-02", "Failed to execute KQL query against Azure Data Explorer"),
+ CONNECTION_FAILED("ADX-03", "Failed to connect to Azure Data Explorer cluster"),
+ SERIALIZATION_FAILED("ADX-04", "Failed to serialize SeaTunnelRow to CSV"),
+ UNSUPPORTED_DATA_TYPE("ADX-05", "Unsupported SeaTunnel data type for ADX connector");
+
+ private final String code;
+ private final String description;
+
+ AzureDataExplorerErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java
new file mode 100644
index 000000000000..eb9988165f38
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java
@@ -0,0 +1,80 @@
+package org.apache.seatunnel.azuredataexplorer.serialization;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerConnectorException;
+import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerErrorCode;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * Serializes a SeaTunnelRow to an RFC-4180 CSV line for ADX ingestion. Column order matches the
+ * SeaTunnelRowType field order. Null fields are emitted as empty (ADX treats empty as null for most
+ * types).
+ */
+public class AzureDataExplorerRowSerializer {
+
+ private static final DateTimeFormatter DT_FMT =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ private static final DateTimeFormatter DATE_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+ private final SeaTunnelRowType rowType;
+
+ public AzureDataExplorerRowSerializer(SeaTunnelRowType rowType) {
+ this.rowType = rowType;
+ }
+
+ /** Returns a CSV line (including trailing newline) for the given row. */
+ public String toCsvLine(SeaTunnelRow row) {
+ SeaTunnelDataType>[] types = rowType.getFieldTypes();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < types.length; i++) {
+ if (i > 0) sb.append(',');
+ Object val = row.getField(i);
+ if (val != null) {
+ sb.append(serializeField(val, types[i]));
+ }
+ // null => empty field
+ }
+ sb.append('\n');
+ return sb.toString();
+ }
+
+ private String serializeField(Object val, SeaTunnelDataType> type) {
+ if (type == BasicType.BOOLEAN_TYPE) return val.toString();
+ if (type == BasicType.BYTE_TYPE) return Byte.toString((Byte) val);
+ if (type == BasicType.SHORT_TYPE) return Short.toString((Short) val);
+ if (type == BasicType.INT_TYPE) return Integer.toString((Integer) val);
+ if (type == BasicType.LONG_TYPE) return Long.toString((Long) val);
+ if (type == BasicType.FLOAT_TYPE) return Float.toString((Float) val);
+ if (type == BasicType.DOUBLE_TYPE) return Double.toString((Double) val);
+ if (type == BasicType.STRING_TYPE) return csvQuote(val.toString());
+ if (type instanceof DecimalType) return val.toString();
+ if (type instanceof LocalTimeType) {
+ if (type == LocalTimeType.LOCAL_DATE_TIME_TYPE)
+ return ((LocalDateTime) val).format(DT_FMT);
+ if (type == LocalTimeType.LOCAL_DATE_TYPE) return ((LocalDate) val).format(DATE_FMT);
+ return csvQuote(val.toString());
+ }
+ throw new AzureDataExplorerConnectorException(
+ AzureDataExplorerErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Cannot serialize type: " + type.getSqlType());
+ }
+
+ /** RFC-4180: quote if value contains comma, double-quote, CR, or LF. */
+ static String csvQuote(String s) {
+ if (s.indexOf(',') >= 0
+ || s.indexOf('"') >= 0
+ || s.indexOf('\r') >= 0
+ || s.indexOf('\n') >= 0) {
+ return "\"" + s.replace("\"", "\"\"\"") + "\"";
+ }
+ return s;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java
new file mode 100644
index 000000000000..6107bcffd257
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java
@@ -0,0 +1,47 @@
+package org.apache.seatunnel.azuredataexplorer.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerConfig;
+
+public class AzureDataExplorerSink
+ implements SeaTunnelSink, TableSink {
+
+ private final ReadonlyConfig config;
+ private SeaTunnelRowType rowType;
+
+ public AzureDataExplorerSink(ReadonlyConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public String getPluginName() {
+ return AzureDataExplorerSinkFactory.IDENTIFIER;
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public SeaTunnelDataType getConsumedType() {
+ return rowType;
+ }
+
+ @Override
+ public SinkWriter createWriter(SinkWriter.Context context) {
+ return new AzureDataExplorerSinkWriter(
+ AzureDataExplorerConfig.fromSinkConfig(config), rowType);
+ }
+
+ @Override
+ public SeaTunnelSink createSink() {
+ return this;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java
new file mode 100644
index 000000000000..2f53f4527a72
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java
@@ -0,0 +1,48 @@
+package org.apache.seatunnel.azuredataexplorer.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.BATCH_SIZE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_ID;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_SECRET;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLUSTER_URI;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.DATABASE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.FLUSH_INTERVAL_MS;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_MAPPING_REFERENCE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_TYPE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TABLE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TENANT_ID;
+
+@AutoService(Factory.class)
+public class AzureDataExplorerSinkFactory implements TableSinkFactory {
+
+ public static final String IDENTIFIER = "AzureDataExplorer";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(CLUSTER_URI, DATABASE, TABLE)
+ .bundled(CLIENT_ID, CLIENT_SECRET, TENANT_ID)
+ .optional(
+ INGESTION_MAPPING_REFERENCE, INGESTION_TYPE, BATCH_SIZE, FLUSH_INTERVAL_MS)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig config = context.getOptions();
+ return () -> new AzureDataExplorerSink(config);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java
new file mode 100644
index 000000000000..eccf41e05982
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java
@@ -0,0 +1,148 @@
+package org.apache.seatunnel.azuredataexplorer.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerConfig;
+import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.IngestionType;
+import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerConnectorException;
+import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerErrorCode;
+import org.apache.seatunnel.azuredataexplorer.serialization.AzureDataExplorerRowSerializer;
+
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.ingest.IngestClient;
+import com.microsoft.azure.kusto.ingest.IngestClientFactory;
+import com.microsoft.azure.kusto.ingest.IngestionMapping;
+import com.microsoft.azure.kusto.ingest.IngestionProperties;
+import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+@Slf4j
+public class AzureDataExplorerSinkWriter implements SinkWriter {
+
+ private final AzureDataExplorerConfig config;
+ private final AzureDataExplorerRowSerializer serializer;
+ private final IngestClient ingestClient;
+ private final IngestionProperties ingestionProperties;
+ private final List buffer;
+ private long lastFlushTime;
+
+ public AzureDataExplorerSinkWriter(AzureDataExplorerConfig config, SeaTunnelRowType rowType) {
+ this(config, rowType, buildIngestClient(config));
+ }
+
+ AzureDataExplorerSinkWriter(
+ AzureDataExplorerConfig config, SeaTunnelRowType rowType, IngestClient ingestClient) {
+ this.config = config;
+ this.serializer = new AzureDataExplorerRowSerializer(rowType);
+ this.buffer = new ArrayList<>(Math.max(config.getBatchSize(), 16));
+ this.lastFlushTime = System.currentTimeMillis();
+ this.ingestClient = ingestClient;
+ this.ingestionProperties = buildIngestionProperties(config);
+ }
+
+ private static IngestClient buildIngestClient(AzureDataExplorerConfig config) {
+ try {
+ String uri =
+ config.getIngestionType() == IngestionType.STREAMING
+ ? config.getClusterUri()
+ : config.getQueuedIngestUri();
+ ConnectionStringBuilder csb =
+ ConnectionStringBuilder.createWithAadApplicationCredentials(
+ uri,
+ config.getClientId(),
+ config.getClientSecret(),
+ config.getTenantId());
+ return config.getIngestionType() == IngestionType.STREAMING
+ ? IngestClientFactory.createStreamingIngestClient(csb)
+ : IngestClientFactory.createClient(csb);
+ } catch (Exception e) {
+ throw new AzureDataExplorerConnectorException(
+ AzureDataExplorerErrorCode.CONNECTION_FAILED,
+ "Cannot create ADX ingest client for cluster: " + config.getClusterUri(),
+ e);
+ }
+ }
+
+ private static IngestionProperties buildIngestionProperties(AzureDataExplorerConfig config) {
+ IngestionProperties props =
+ new IngestionProperties(config.getDatabase(), config.getTable());
+ props.setDataFormat(IngestionProperties.DataFormat.CSV);
+ props.setIgnoreFirstRecord(false);
+ String mappingRef = config.getIngestionMappingReference();
+ if (mappingRef != null && !mappingRef.isEmpty()) {
+ props.setIngestionMapping(
+ new IngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.CSV));
+ }
+ return props;
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) {
+ buffer.add(serializer.toCsvLine(element));
+ boolean batchFull = buffer.size() >= config.getBatchSize();
+ boolean timedOut =
+ System.currentTimeMillis() - lastFlushTime >= config.getFlushIntervalMs();
+ if (batchFull || timedOut) flush();
+ }
+
+ @Override
+ public Optional prepareCommit() {
+ flush();
+ return Optional.empty();
+ }
+
+ @Override
+ public List snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void abortPrepare() {
+ buffer.clear();
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (!buffer.isEmpty()) flush();
+ } finally {
+ try {
+ ingestClient.close();
+ } catch (Exception e) {
+ log.warn("Error closing ADX ingest client", e);
+ }
+ }
+ }
+
+ private void flush() {
+ if (buffer.isEmpty()) return;
+ StringBuilder sb = new StringBuilder();
+ for (String line : buffer) sb.append(line);
+ byte[] csv = sb.toString().getBytes(StandardCharsets.UTF_8);
+ StreamSourceInfo si = new StreamSourceInfo(new ByteArrayInputStream(csv));
+ try {
+ ingestClient.ingestFromStream(si, ingestionProperties);
+ log.debug(
+ "Flushed {} rows to {}.{}",
+ buffer.size(),
+ config.getDatabase(),
+ config.getTable());
+ } catch (Exception e) {
+ throw new AzureDataExplorerConnectorException(
+ AzureDataExplorerErrorCode.INGESTION_FAILED,
+ "Ingestion failed for " + buffer.size() + " rows",
+ e);
+ } finally {
+ buffer.clear();
+ lastFlushTime = System.currentTimeMillis();
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java
new file mode 100644
index 000000000000..e1f329c47761
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java
@@ -0,0 +1,72 @@
+package org.apache.seatunnel.azuredataexplorer.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.util.Collections;
+import java.util.List;
+
+public class AzureDataExplorerSource
+ implements SeaTunnelSource<
+ SeaTunnelRow, AzureDataExplorerSourceSplit, AzureDataExplorerSourceState> {
+
+ private final ReadonlyConfig config;
+ private final CatalogTable catalogTable;
+
+ public AzureDataExplorerSource(ReadonlyConfig config) {
+ this.config = config;
+ this.catalogTable =
+ config.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()
+ ? CatalogTableUtil.buildWithConfig(config)
+ : null;
+ }
+
+ @Override
+ public String getPluginName() {
+ return AzureDataExplorerSourceFactory.IDENTIFIER;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public List getProducedCatalogTables() {
+ if (catalogTable == null) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(catalogTable);
+ }
+
+ @Override
+ public SourceReader createReader(
+ SourceReader.Context readerContext) {
+ return new AzureDataExplorerSourceReader(
+ readerContext,
+ config,
+ catalogTable == null ? null : catalogTable.getSeaTunnelRowType());
+ }
+
+ @Override
+ public SourceSplitEnumerator
+ createEnumerator(
+ SourceSplitEnumerator.Context enumeratorContext) {
+ return new AzureDataExplorerSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator
+ restoreEnumerator(
+ SourceSplitEnumerator.Context enumeratorContext,
+ AzureDataExplorerSourceState checkpointState) {
+ return new AzureDataExplorerSplitEnumerator(enumeratorContext, checkpointState);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java
new file mode 100644
index 000000000000..42e21fe6cc44
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java
@@ -0,0 +1,53 @@
+package org.apache.seatunnel.azuredataexplorer.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SourceConnectorCommonOptions;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_ID;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_SECRET;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLUSTER_URI;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.DATABASE;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.QUERY;
+import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.TENANT_ID;
+
+@AutoService(Factory.class)
+public class AzureDataExplorerSourceFactory implements TableSourceFactory {
+ public static final String IDENTIFIER = "AzureDataExplorer";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(CLUSTER_URI, DATABASE, QUERY)
+ .bundled(CLIENT_ID, CLIENT_SECRET, TENANT_ID)
+ .optional(SourceConnectorCommonOptions.SCHEMA)
+ .build();
+ }
+
+ @Override
+ public Class extends SeaTunnelSource> getSourceClass() {
+ return AzureDataExplorerSource.class;
+ }
+
+ @Override
+ public
+ TableSource createSource(TableSourceFactoryContext context) {
+ return () ->
+ (SeaTunnelSource)
+ new AzureDataExplorerSource(context.getOptions());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java
new file mode 100644
index 000000000000..70454f072796
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java
@@ -0,0 +1,207 @@
+package org.apache.seatunnel.azuredataexplorer.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceConfig;
+import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerConnectorException;
+import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerErrorCode;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.KustoResultColumn;
+import com.microsoft.azure.kusto.data.KustoResultSetTable;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+
+@Slf4j
+public class AzureDataExplorerSourceReader
+ implements SourceReader {
+
+ private final SourceReader.Context context;
+ private final AzureDataExplorerSourceConfig config;
+ private final Queue splitQueue;
+ private SeaTunnelRowType rowType;
+ private Client client;
+ private volatile boolean noMoreSplits;
+
+ AzureDataExplorerSourceReader(
+ SourceReader.Context context, ReadonlyConfig config, SeaTunnelRowType rowType) {
+ this.context = context;
+ this.config = AzureDataExplorerSourceConfig.fromSourceConfig(config);
+ this.rowType = rowType;
+ this.splitQueue = new ArrayDeque<>();
+ }
+
+ @Override
+ public void open() {
+ try {
+ ConnectionStringBuilder csb =
+ ConnectionStringBuilder.createWithAadApplicationCredentials(
+ config.getClusterUri(),
+ config.getClientId(),
+ config.getClientSecret(),
+ config.getTenantId());
+ this.client = ClientFactory.createClient(csb);
+ } catch (Exception e) {
+ throw new AzureDataExplorerConnectorException(
+ AzureDataExplorerErrorCode.CONNECTION_FAILED,
+ "Cannot create ADX data client for cluster: " + config.getClusterUri(),
+ e);
+ }
+ }
+
+ @Override
+ public void close() {
+ // No close required for Kusto data client.
+ }
+
+ @Override
+ public void pollNext(Collector output) {
+ synchronized (output.getCheckpointLock()) {
+ AzureDataExplorerSourceSplit split = splitQueue.poll();
+ if (split != null) {
+ executeQuery(output);
+ } else if (noMoreSplits) {
+ log.info("Closed the bounded Azure Data Explorer source");
+ context.signalNoMoreElement();
+ }
+ }
+ }
+
+ private void executeQuery(Collector output) {
+ try {
+ KustoOperationResult result =
+ client.executeQuery(config.getDatabase(), config.getQuery());
+ KustoResultSetTable table = result.getPrimaryResults();
+ if (rowType == null) {
+ rowType = buildRowType(table.getColumns());
+ }
+ while (table.next()) {
+ List