From bb1e01b3cd544b6deeaff1b1be1e53e2037244bc Mon Sep 17 00:00:00 2001 From: Kai Devrim Date: Thu, 14 May 2026 15:08:49 -0700 Subject: [PATCH 1/5] start adding azure data explorer connector --- .../connector-azuredataexplorer/pom.xml | 20 +++++++++++++++++++ seatunnel-connectors-v2/pom.xml | 3 ++- 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml new file mode 100644 index 000000000000..873aca373243 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + 3.0.0-SNAPSHOT + + + connector-azuredataexplorer + + + 24 + 24 + UTF-8 + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 5b8836bd45f0..1ff6df723665 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -92,6 +92,7 @@ connector-sensorsdata connector-fluss connector-lance + connector-azuredataexplorer @@ -132,4 +133,4 @@ - + \ No newline at end of file From 6f01ef6901e525e7a12792c7493ad78fbd77421e Mon Sep 17 00:00:00 2001 From: Kai Devrim Date: Thu, 14 May 2026 16:16:10 -0700 Subject: [PATCH 2/5] create files needed for Azure Data Explorer --- .../connector-azuredataexplorer/pom.xml | 12 ++++++++++++ .../config/AzureDataExplorerConfig.java | 4 ++++ .../AzureDataExplorerConnectorException.java | 4 ++++ .../exception/AzureDataExplorerErrorCode.java | 4 ++++ .../sink/AzureDataExplorerSink.java | 4 ++++ .../sink/AzureDataExplorerSinkFactory.java | 4 ++++ .../sink/AzureDataExplorerSinkWriter.java | 4 ++++ .../source/AzureDataExplorerSource.java | 4 ++++ .../source/AzureDataExplorerSourceFactory.java | 4 ++++ .../source/AzureDataExplorerSourceReader.java | 4 ++++ .../source/AzureDataExplorerSourceSplit.java | 4 ++++ .../source/AzureDataExplorerSplitEnumerator.java | 4 ++++ .../org.apache.seatunnel.api.table.factory.Factory | 0 .../test/java/AzureDataExplorerSinkWriterTest.java | 2 ++ seatunnel-dist/pom.xml | 7 ++++++- 15 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml index 873aca373243..499f3cf0ad0a 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml @@ -15,6 +15,18 @@ 24 24 UTF-8 + + com.microsoft.azure.kusto + kusto-data + 6.0.3 + + + com.microsoft.azure.kusto + kusto-ingest + 6.0.3 + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java new file mode 100644 index 000000000000..26edf90f35b9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.config; + +public class AzureDataExplorerConfig { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java new file mode 100644 index 000000000000..994a586a4804 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.exception; + +public class AzureDataExplorerConnectorException { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java new file mode 100644 index 000000000000..356fcfec8a67 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.exception; + +public class AzureDataExplorerErrorCode { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java new file mode 100644 index 000000000000..074dd83f21d8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.sink; + +public class AzureDataExplorerSink { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java new file mode 100644 index 000000000000..d289b00a2e8a --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.sink; + +public class AzureDataExplorerSinkFactory { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java new file mode 100644 index 000000000000..1f5bb6aeb5b8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.sink; + +public class AzureDataExplorerSinkWriter { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java new file mode 100644 index 000000000000..fa2482db6824 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.source; + +public class AzureDataExplorerSource { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java new file mode 100644 index 000000000000..4b0717a6671b --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.source; + +public class AzureDataExplorerSourceFactory { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java new file mode 100644 index 000000000000..017c3a9c1991 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.source; + +public class AzureDataExplorerSourceReader { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java new file mode 100644 index 000000000000..190a1ec613ee --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.source; + +public class AzureDataExplorerSourceSplit { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java new file mode 100644 index 000000000000..728f327467d3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java @@ -0,0 +1,4 @@ +package org.apache.seatunnel.azuredataexplorer.source; + +public class AzureDataExplorerSplitEnumerator { +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java new file mode 100644 index 000000000000..8c97dbfa88ae --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java @@ -0,0 +1,2 @@ +public class AzureDataExplorerSinkWriterTest { +} \ No newline at end of file diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 4314565dfd01..2b8870118f6a 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -376,6 +376,11 @@ ${project.version} provided + + org.apache.seatunnel + connector-azuredataexplorer + ${project.version} + org.apache.seatunnel connector-iotdb @@ -1263,4 +1268,4 @@ - + \ No newline at end of file From 0f0e4b85e3c286af6772793809c8da799e8c24b1 Mon Sep 17 00:00:00 2001 From: Kai Devrim Date: Thu, 14 May 2026 16:19:58 -0700 Subject: [PATCH 3/5] Implement the AzureDataExplorerConfig --- .../config/AzureDataExplorerConfig.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java index 26edf90f35b9..c50dae0f3b5a 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java @@ -1,4 +1,42 @@ package org.apache.seatunnel.azuredataexplorer.config; +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + public class AzureDataExplorerConfig { + public static final Option CLUSTER_URL = + Options.key("cluster_url").stringType().noDefaultValue() + .withDescription("ADX cluster URI e.g. https://mycluster.eastus.kusto.windows.net"); + + public static final Option DATABASE = + Options.key("database").stringType().noDefaultValue() + .withDescription("Database name"); + + public static final Option TABLE = + Options.key("table").stringType().noDefaultValue() + .withDescription("Table name"); + + public static final Option CLIENT_ID = + Options.key("client_id").stringType().noDefaultValue() + .withDescription("Azure AD service principal client ID"); + + public static final Option CLIENT_SECRET = + Options.key("client_secret").stringType().noDefaultValue() + .withDescription("Azure AD service principal client secret"); + + public static final Option TENANT_ID = + Options.key("tenant_id").stringType().noDefaultValue() + .withDescription("Azure AD tenant ID"); + + public static final Option QUERY = + Options.key("query").stringType().noDefaultValue() + .withDescription("KQL query for reading, e.g. MyTable | take 1000"); + + public static final Option USE_STREAMING_INGEST = + Options.key("use_streaming_ingest").booleanType().defaultValue(false) + .withDescription("Use streaming ingestion (must be enabled on cluster)"); + + public static final Option BATCH_SIZE = + Options.key("batch_size").intType().defaultValue(1000) + .withDescription("Row buffer size before flushing to ADX"); } \ No newline at end of file From b242a0eabbfc148b8789cc2f70857f7180d169ba Mon Sep 17 00:00:00 2001 From: Kai Devrim Date: Mon, 18 May 2026 22:50:17 -0700 Subject: [PATCH 4/5] Start work on creating the sink, added the sink, config, and serializer, and exceptions --- .../config/AzureDataExplorerConfig.java | 79 +++++----- .../config/AzureDataExplorerSinkOptions.java | 64 ++++++++ .../AzureDataExplorerConnectorException.java | 14 +- .../exception/AzureDataExplorerErrorCode.java | 21 ++- .../AzureDataExplorerRowSerializer.java | 77 +++++++++ .../sink/AzureDataExplorerSink.java | 44 +++++- .../sink/AzureDataExplorerSinkFactory.java | 38 ++++- .../sink/AzureDataExplorerSinkWriter.java | 148 +++++++++++++++++- 8 files changed, 443 insertions(+), 42 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java index c50dae0f3b5a..ec3755874f0e 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java @@ -1,42 +1,49 @@ package org.apache.seatunnel.azuredataexplorer.config; -import org.apache.seatunnel.api.configuration.Option; -import org.apache.seatunnel.api.configuration.Options; +import lombok.Builder; +import lombok.Getter; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; -public class AzureDataExplorerConfig { - public static final Option CLUSTER_URL = - Options.key("cluster_url").stringType().noDefaultValue() - .withDescription("ADX cluster URI e.g. https://mycluster.eastus.kusto.windows.net"); - - public static final Option DATABASE = - Options.key("database").stringType().noDefaultValue() - .withDescription("Database name"); - - public static final Option TABLE = - Options.key("table").stringType().noDefaultValue() - .withDescription("Table name"); - - public static final Option CLIENT_ID = - Options.key("client_id").stringType().noDefaultValue() - .withDescription("Azure AD service principal client ID"); +import static org.apache.seatunnel.api.options.table.TableIdentifierOptions.TABLE; +import static org.apache.seatunnel.azuredataexplorer.config + .AzureDataExplorerSinkOptions.*; - public static final Option CLIENT_SECRET = - Options.key("client_secret").stringType().noDefaultValue() - .withDescription("Azure AD service principal client secret"); - - public static final Option TENANT_ID = - Options.key("tenant_id").stringType().noDefaultValue() - .withDescription("Azure AD tenant ID"); - - public static final Option QUERY = - Options.key("query").stringType().noDefaultValue() - .withDescription("KQL query for reading, e.g. MyTable | take 1000"); - - public static final Option USE_STREAMING_INGEST = - Options.key("use_streaming_ingest").booleanType().defaultValue(false) - .withDescription("Use streaming ingestion (must be enabled on cluster)"); +/** Immutable config value object used by both sink and source. */ +@Getter +@Builder +public class AzureDataExplorerConfig { - public static final Option BATCH_SIZE = - Options.key("batch_size").intType().defaultValue(1000) - .withDescription("Row buffer size before flushing to ADX"); + private final String clusterUri; + private final String database; + private final String clientId; + private final String clientSecret; + private final String tenantId; + + private final String table; + private final String ingestionMappingReference; + private final AzureDataExplorerSinkOptions.IngestionType ingestionType; + private final int batchSize; + private final long flushIntervalMs; + + public String getQueuedIngestUri() { + if (clusterUri.startsWith("https://")) { + return "https://ingest-" + clusterUri.substring("https://".length()); + } + return clusterUri; + } + + public static AzureDataExplorerConfig fromSinkConfig(ReadonlyConfig cfg) { + return AzureDataExplorerConfig.builder() + .clusterUri(cfg.get(AzureDataExplorerSinkOptions.CLUSTER_URI)) + .database(cfg.get(AzureDataExplorerSinkOptions.DATABASE)) + .table(cfg.get(TABLE)) + .clientId(cfg.get(CLIENT_ID)) + .clientSecret(cfg.get(CLIENT_SECRET)) + .tenantId(cfg.get(TENANT_ID)) + .ingestionMappingReference(cfg.get(INGESTION_MAPPING_REFERENCE)) + .ingestionType(cfg.get(INGESTION_TYPE)) + .batchSize(cfg.get(BATCH_SIZE)) + .flushIntervalMs(cfg.get(FLUSH_INTERVAL_MS)) + .build(); + } } \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java new file mode 100644 index 000000000000..f20d60392a20 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java @@ -0,0 +1,64 @@ +package org.apache.seatunnel.azuredataexplorer.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class AzureDataExplorerSinkOptions { + + public static final Option CLUSTER_URI = + Options.key("cluster_uri") + .stringType().noDefaultValue() + .withDescription( + "ADX cluster URI, e.g. https://mycluster.eastus.kusto.windows.net"); + + public static final Option DATABASE = + Options.key("database") + .stringType().noDefaultValue() + .withDescription("Target database name."); + + public static final Option TABLE = + Options.key("table") + .stringType().noDefaultValue() + .withDescription("Target table name."); + + public static final Option CLIENT_ID = + Options.key("client_id") + .stringType().noDefaultValue() + .withDescription("Azure AD application (client) ID."); + + public static final Option CLIENT_SECRET = + Options.key("client_secret") + .stringType().noDefaultValue() + .withDescription("Azure AD application secret."); + + public static final Option TENANT_ID = + Options.key("tenant_id") + .stringType().noDefaultValue() + .withDescription("Azure AD tenant (directory) ID."); + + public static final Option INGESTION_MAPPING_REFERENCE = + Options.key("ingestion_mapping_reference") + .stringType().defaultValue("") + .withDescription( + "Optional pre-created ingestion mapping name on the ADX table."); + + public static final Option INGESTION_TYPE = + Options.key("ingestion_type") + .enumType(IngestionType.class) + .defaultValue(IngestionType.QUEUED) + .withDescription( + "QUEUED (default, high throughput, ~5 min latency) or " + + "STREAMING (low latency, <=4 MB/s per table)."); + + public static final Option BATCH_SIZE = + Options.key("batch_size") + .intType().defaultValue(1000) + .withDescription("Rows to buffer before flushing."); + + public static final Option FLUSH_INTERVAL_MS = + Options.key("flush_interval_ms") + .longType().defaultValue(30_000L) + .withDescription("Max milliseconds between flushes regardless of batch size."); + + public enum IngestionType { QUEUED, STREAMING } +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java index 994a586a4804..5446b208ce83 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java @@ -1,4 +1,16 @@ package org.apache.seatunnel.azuredataexplorer.exception; -public class AzureDataExplorerConnectorException { +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class AzureDataExplorerConnectorException extends SeaTunnelRuntimeException { + + public AzureDataExplorerConnectorException( + AzureDataExplorerErrorCode code, String message) { + super(code, message); + } + + public AzureDataExplorerConnectorException( + AzureDataExplorerErrorCode code, String message, Throwable cause) { + super(code, message, cause); + } } \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java index 356fcfec8a67..e08939e41388 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java @@ -1,4 +1,23 @@ package org.apache.seatunnel.azuredataexplorer.exception; -public class AzureDataExplorerErrorCode { +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum AzureDataExplorerErrorCode implements SeaTunnelErrorCode { + + INGESTION_FAILED("ADX-01", "Failed to ingest data into Azure Data Explorer"), + QUERY_FAILED("ADX-02", "Failed to execute KQL query against Azure Data Explorer"), + CONNECTION_FAILED("ADX-03", "Failed to connect to Azure Data Explorer cluster"), + SERIALIZATION_FAILED("ADX-04", "Failed to serialize SeaTunnelRow to CSV"), + UNSUPPORTED_DATA_TYPE("ADX-05", "Unsupported SeaTunnel data type for ADX connector"); + + private final String code; + private final String description; + + AzureDataExplorerErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override public String getCode() { return code; } + @Override public String getDescription() { return description; } } \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java new file mode 100644 index 000000000000..6f8ef436c0e4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java @@ -0,0 +1,77 @@ +package org.apache.seatunnel.azuredataexplorer.serialization; + +import org.apache.seatunnel.api.table.type.*; +import org.apache.seatunnel.azuredataexplorer.exception + .AzureDataExplorerErrorCode; +import org.apache.seatunnel.azuredataexplorer.exception + .AzureDataExplorerConnectorException; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** + * Serializes a SeaTunnelRow to an RFC-4180 CSV line for ADX ingestion. + * Column order matches the SeaTunnelRowType field order. + * Null fields are emitted as empty (ADX treats empty as null for most types). + */ +public class AzureDataExplorerRowSerializer { + + private static final DateTimeFormatter DT_FMT = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); + private static final DateTimeFormatter DATE_FMT = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + private final SeaTunnelRowType rowType; + + public AzureDataExplorerRowSerializer(SeaTunnelRowType rowType) { + this.rowType = rowType; + } + + /** Returns a CSV line (including trailing newline) for the given row. */ + public String toCsvLine(SeaTunnelRow row) { + SeaTunnelDataType[] types = rowType.getFieldTypes(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < types.length; i++) { + if (i > 0) sb.append(','); + Object val = row.getField(i); + if (val != null) { + sb.append(serializeField(val, types[i])); + } + // null => empty field + } + sb.append('\n'); + return sb.toString(); + } + + private String serializeField(Object val, SeaTunnelDataType type) { + if (type == BasicType.BOOLEAN_TYPE) return val.toString(); + if (type == BasicType.BYTE_TYPE) return Byte.toString((Byte) val); + if (type == BasicType.SHORT_TYPE) return Short.toString((Short) val); + if (type == BasicType.INT_TYPE) return Integer.toString((Integer) val); + if (type == BasicType.LONG_TYPE) return Long.toString((Long) val); + if (type == BasicType.FLOAT_TYPE) return Float.toString((Float) val); + if (type == BasicType.DOUBLE_TYPE) return Double.toString((Double) val); + if (type == BasicType.STRING_TYPE) return csvQuote(val.toString()); + if (type instanceof DecimalType) return val.toString(); + if (type instanceof LocalTimeType) { + if (type == LocalTimeType.LOCAL_DATE_TIME_TYPE) + return ((LocalDateTime) val).format(DT_FMT); + if (type == LocalTimeType.LOCAL_DATE_TYPE) + return ((LocalDate) val).format(DATE_FMT); + return csvQuote(val.toString()); + } + throw new AzureDataExplorerConnectorException( + AzureDataExplorerErrorCode.UNSUPPORTED_DATA_TYPE, + "Cannot serialize type: " + type.getSqlType()); + } + + /** RFC-4180: quote if value contains comma, double-quote, CR, or LF. */ + static String csvQuote(String s) { + if (s.indexOf(',') >= 0 || s.indexOf('"') >= 0 + || s.indexOf('\r') >= 0 || s.indexOf('\n') >= 0) { + return "\"" + s.replace("\"", "\"\"\"") + "\""; + } + return s; + } +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java index 074dd83f21d8..ea2850731e45 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java @@ -1,4 +1,46 @@ package org.apache.seatunnel.azuredataexplorer.sink; -public class AzureDataExplorerSink { +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerConfig; + +import java.io.IOException; + +public class AzureDataExplorerSink + implements SeaTunnelSink, TableSink { + + private final ReadonlyConfig config; + private SeaTunnelRowType rowType; + + public AzureDataExplorerSink(ReadonlyConfig config) { + this.config = config; + } + + @Override + public String getPluginName() { return AzureDataExplorerSinkFactory.IDENTIFIER; } + + @Override + public void setTypeInfo(SeaTunnelRowType rowType) { + this.rowType = rowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { return rowType; } + + @Override + public SinkWriter createWriter( + SinkWriter.Context context) throws IOException { + return new AzureDataExplorerSinkWriter( + AzureDataExplorerConfig.fromSinkConfig(config), rowType); + } + + @Override + public SeaTunnelSink createSink() { + return null; + } } \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java index d289b00a2e8a..447caf7b35a6 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java @@ -1,4 +1,38 @@ -package org.apache.seatunnel.azuredataexplorer.sink; +package org.apache.seatunnel.azuredataexplorer.sink; // fix 1 -public class AzureDataExplorerSinkFactory { +import com.google.auto.service.AutoService; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; + +import java.util.Optional; + +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.*; // fix 1 + +@AutoService(Factory.class) +public class AzureDataExplorerSinkFactory implements TableSinkFactory { + + public static final String IDENTIFIER = "AzureDataExplorer"; + + @Override + public String factoryIdentifier() { return IDENTIFIER; } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(CLUSTER_URI, DATABASE, TABLE) + .bundled(CLIENT_ID, CLIENT_SECRET, TENANT_ID) + .optional(INGESTION_MAPPING_REFERENCE, INGESTION_TYPE, + BATCH_SIZE, FLUSH_INTERVAL_MS) + .build(); + } + + @Override + public AzureDataExplorerSink createSink(TableSinkFactoryContext context) { + ReadonlyConfig config = context.getOptions(); + return new AzureDataExplorerSink(config); + } } \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java index 1f5bb6aeb5b8..085d7b440f3e 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java @@ -1,4 +1,150 @@ package org.apache.seatunnel.azuredataexplorer.sink; -public class AzureDataExplorerSinkWriter { +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.ingest.IngestClient; +import com.microsoft.azure.kusto.ingest.IngestClientFactory; +import com.microsoft.azure.kusto.ingest.IngestionMapping; +import com.microsoft.azure.kusto.ingest.IngestionProperties; +import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; + +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.azuredataexplorer.config + .AzureDataExplorerConfig; +import org.apache.seatunnel.azuredataexplorer.config + .AzureDataExplorerSinkOptions.IngestionType; +import org.apache.seatunnel.azuredataexplorer.exception + .AzureDataExplorerErrorCode; +import org.apache.seatunnel.azuredataexplorer.exception + .AzureDataExplorerConnectorException; +import org.apache.seatunnel.azuredataexplorer.serialization + .AzureDataExplorerRowSerializer; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +@Slf4j +public class AzureDataExplorerSinkWriter + implements SinkWriter { + + private final AzureDataExplorerConfig config; + private final AzureDataExplorerRowSerializer serializer; + private final IngestClient ingestClient; + private final IngestionProperties ingestionProperties; + private final List buffer; + private long lastFlushTime; + + public AzureDataExplorerSinkWriter( + AzureDataExplorerConfig config, SeaTunnelRowType rowType) { + this(config, rowType, buildIngestClient(config)); + } + + AzureDataExplorerSinkWriter( + AzureDataExplorerConfig config, + SeaTunnelRowType rowType, + IngestClient ingestClient) { + this.config = config; + this.serializer = new AzureDataExplorerRowSerializer(rowType); + this.buffer = new ArrayList<>(Math.max(config.getBatchSize(), 16)); + this.lastFlushTime = System.currentTimeMillis(); + this.ingestClient = ingestClient; + this.ingestionProperties = buildIngestionProperties(config); + } + + private static IngestClient buildIngestClient(AzureDataExplorerConfig config) { + try { + String uri = config.getIngestionType() == IngestionType.STREAMING + ? config.getClusterUri() + : config.getQueuedIngestUri(); + ConnectionStringBuilder csb = + ConnectionStringBuilder.createWithAadApplicationCredentials( + uri, + config.getClientId(), + config.getClientSecret(), + config.getTenantId()); + return config.getIngestionType() == IngestionType.STREAMING + ? IngestClientFactory.createStreamingIngestClient(csb) + : IngestClientFactory.createClient(csb); + } catch (Exception e) { + throw new AzureDataExplorerConnectorException( + AzureDataExplorerErrorCode.CONNECTION_FAILED, + "Cannot create ADX ingest client for cluster: " + config.getClusterUri(), e); + } + } + + private static IngestionProperties buildIngestionProperties( + AzureDataExplorerConfig config) { + IngestionProperties props = + new IngestionProperties(config.getDatabase(), config.getTable()); + props.setDataFormat(IngestionProperties.DataFormat.CSV); + props.setIgnoreFirstRecord(false); + String mappingRef = config.getIngestionMappingReference(); + if (mappingRef != null && !mappingRef.isEmpty()) { + props.setIngestionMapping(new IngestionMapping( + mappingRef, IngestionMapping.IngestionMappingKind.Csv)); + } + return props; + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + buffer.add(serializer.toCsvLine(element)); + boolean batchFull = buffer.size() >= config.getBatchSize(); + boolean timedOut = + System.currentTimeMillis() - lastFlushTime >= config.getFlushIntervalMs(); + if (batchFull || timedOut) flush(); + } + + @Override + public Optional prepareCommit() throws IOException { + flush(); + return Optional.empty(); + } + + @Override + public List snapshotState(long checkpointId) throws IOException { + return Collections.emptyList(); + } + + @Override + public void abortPrepare() { + buffer.clear(); + } + + @Override + public void close() throws IOException { + try { + if (!buffer.isEmpty()) flush(); + } finally { + try { ingestClient.close(); } + catch (Exception e) { log.warn("Error closing ADX ingest client", e); } + } + } + + private void flush() throws IOException { + if (buffer.isEmpty()) return; + StringBuilder sb = new StringBuilder(); + for (String line : buffer) sb.append(line); + byte[] csv = sb.toString().getBytes(StandardCharsets.UTF_8); + StreamSourceInfo si = new StreamSourceInfo(new ByteArrayInputStream(csv)); + try { + ingestClient.ingestFromStream(si, ingestionProperties); + log.debug("Flushed {} rows to {}.{}", + buffer.size(), config.getDatabase(), config.getTable()); + } catch (Exception e) { + throw new AzureDataExplorerConnectorException( + AzureDataExplorerConnectorErrorCode.INGESTION_FAILED, + "Ingestion failed for " + buffer.size() + " rows", e); + } finally { + buffer.clear(); + lastFlushTime = System.currentTimeMillis(); + } + } } \ No newline at end of file From 1d1397232afc2f41a40af6c5b347c633644f21af Mon Sep 17 00:00:00 2001 From: Kai Devrim Date: Sat, 30 May 2026 16:06:24 -0700 Subject: [PATCH 5/5] Implement Azure Data Explorer Source and Sink and add tests and add documentation for Source and Sink --- docs/en/connectors/sink/AzureDataExplorer.md | 73 ++++++ .../en/connectors/source/AzureDataExplorer.md | 64 ++++++ docs/zh/connectors/sink/AzureDataExplorer.md | 73 ++++++ .../zh/connectors/source/AzureDataExplorer.md | 64 ++++++ .../connector-azuredataexplorer/pom.xml | 38 +++- .../config/AzureDataExplorerConfig.java | 18 +- .../config/AzureDataExplorerSinkOptions.java | 34 ++- .../config/AzureDataExplorerSourceConfig.java | 54 +++++ .../AzureDataExplorerSourceOptions.java | 61 ++++++ .../AzureDataExplorerConnectorException.java | 5 +- .../exception/AzureDataExplorerErrorCode.java | 14 +- .../AzureDataExplorerRowSerializer.java | 49 +++-- .../sink/AzureDataExplorerSink.java | 21 +- .../sink/AzureDataExplorerSinkFactory.java | 30 ++- .../sink/AzureDataExplorerSinkWriter.java | 84 ++++--- .../source/AzureDataExplorerSource.java | 72 +++++- .../AzureDataExplorerSourceFactory.java | 53 ++++- .../source/AzureDataExplorerSourceReader.java | 207 +++++++++++++++++- .../source/AzureDataExplorerSourceSplit.java | 21 +- .../source/AzureDataExplorerSourceState.java | 35 +++ .../AzureDataExplorerSplitEnumerator.java | 137 +++++++++++- ...apache.seatunnel.api.table.factory.Factory | 2 + .../java/AzureDataExplorerSinkWriterTest.java | 2 - .../config/AzureDataExplorerConfigTest.java | 73 ++++++ .../AzureDataExplorerSourceConfigTest.java | 51 +++++ .../factory/AzureDataExplorerFactoryTest.java | 146 ++++++++++++ seatunnel-connectors-v2/pom.xml | 2 +- seatunnel-dist/pom.xml | 2 +- 28 files changed, 1351 insertions(+), 134 deletions(-) create mode 100644 docs/en/connectors/sink/AzureDataExplorer.md create mode 100644 docs/en/connectors/source/AzureDataExplorer.md create mode 100644 docs/zh/connectors/sink/AzureDataExplorer.md create mode 100644 docs/zh/connectors/source/AzureDataExplorer.md create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceOptions.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceState.java delete mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfigTest.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfigTest.java create mode 100644 seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/factory/AzureDataExplorerFactoryTest.java diff --git a/docs/en/connectors/sink/AzureDataExplorer.md b/docs/en/connectors/sink/AzureDataExplorer.md new file mode 100644 index 000000000000..7371efea896d --- /dev/null +++ b/docs/en/connectors/sink/AzureDataExplorer.md @@ -0,0 +1,73 @@ +# Azure Data Explorer + +> Azure Data Explorer (ADX) sink connector + +## Support Those Engines + +> SeaTunnel Zeta + +## Description + +The Azure Data Explorer sink connector ingests `SeaTunnelRow` data into ADX using the Kusto ingestion service. It supports queued ingestion for throughput and streaming ingestion for low latency. + +## Key features + +- [x] [batch](../../introduction/concepts/connector-v2-features.md) +- [ ] [stream](../../introduction/concepts/connector-v2-features.md) +- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md) +- [ ] [column projection](../../introduction/concepts/connector-v2-features.md) +- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md) + +## Sink Options + +| Name | Type | Required | Default | Description | +|---|---|---|---|---| +| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. | +| database | String | Yes | - | Target database name. | +| table | String | Yes | - | Target table name. | +| client_id | String | Yes | - | Azure AD application (client) ID. | +| client_secret | String | Yes | - | Azure AD application secret. | +| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. | +| ingestion_mapping_reference | String | No | "" | Pre-created ingestion mapping name on the ADX table. | +| ingestion_type | Enum | No | QUEUED | `QUEUED` (default) or `STREAMING`. | +| batch_size | Integer | No | 1000 | Rows to buffer before flushing. | +| flush_interval_ms | Long | No | 30000 | Max milliseconds between flushes regardless of batch size. | + +## Task Example + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields = [ + { name = "id", type = "INT" }, + { name = "name", type = "STRING" }, + { name = "ts", type = "TIMESTAMP" } + ] + } + rows = [ + { fields = [1, "a", "2024-01-01T00:00:00"] } + ] + } +} + +sink { + AzureDataExplorer { + cluster_uri = "https://mycluster.eastus.kusto.windows.net" + database = "mydb" + table = "mytable" + client_id = "${ADX_CLIENT_ID}" + client_secret = "${ADX_CLIENT_SECRET}" + tenant_id = "${ADX_TENANT_ID}" + ingestion_type = "QUEUED" + batch_size = 1000 + flush_interval_ms = 30000 + } +} +``` \ No newline at end of file diff --git a/docs/en/connectors/source/AzureDataExplorer.md b/docs/en/connectors/source/AzureDataExplorer.md new file mode 100644 index 000000000000..45b305d8c585 --- /dev/null +++ b/docs/en/connectors/source/AzureDataExplorer.md @@ -0,0 +1,64 @@ +# Azure Data Explorer + +> Azure Data Explorer (ADX) source connector + +## Support Those Engines + +> SeaTunnel Zeta + +## Description + +The Azure Data Explorer source connector executes a Kusto Query Language (KQL) statement against an ADX cluster and emits the query results as `SeaTunnelRow` records. + +## Key features + +- [x] [batch](../../introduction/concepts/connector-v2-features.md) +- [ ] [stream](../../introduction/concepts/connector-v2-features.md) +- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md) +- [ ] [column projection](../../introduction/concepts/connector-v2-features.md) +- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md) + +## Source Options + +| Name | Type | Required | Default | Description | +|---|---|---|---|---| +| cluster_uri | String | Yes | - | ADX cluster URI, e.g. `https://mycluster.eastus.kusto.windows.net`. | +| database | String | Yes | - | Target database name. | +| query | String | Yes | - | Kusto query (KQL) to execute. | +| client_id | String | Yes | - | Azure AD application (client) ID. | +| client_secret | String | Yes | - | Azure AD application secret. | +| tenant_id | String | Yes | - | Azure AD tenant (directory) ID. | +| schema | Config | No | - | Optional SeaTunnel schema. See [Source Common Options](../common-options/source-common-options.md). | + +## Task Example + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + AzureDataExplorer { + cluster_uri = "https://mycluster.eastus.kusto.windows.net" + database = "mydb" + query = "MyTable | take 1000" + client_id = "${ADX_CLIENT_ID}" + client_secret = "${ADX_CLIENT_SECRET}" + tenant_id = "${ADX_TENANT_ID}" + + schema = { + fields = [ + { name = "id", type = "INT" }, + { name = "name", type = "STRING" }, + { name = "ts", type = "TIMESTAMP" } + ] + } + } +} + +sink { + Console { + } +} +``` \ No newline at end of file diff --git a/docs/zh/connectors/sink/AzureDataExplorer.md b/docs/zh/connectors/sink/AzureDataExplorer.md new file mode 100644 index 000000000000..c7c79f0c6732 --- /dev/null +++ b/docs/zh/connectors/sink/AzureDataExplorer.md @@ -0,0 +1,73 @@ +# Azure Data Explorer + +> Azure Data Explorer (ADX) 写入连接器 + +## 支持的引擎 + +> SeaTunnel Zeta + +## 描述 + +Azure Data Explorer 写入连接器通过 Kusto Ingestion 服务将 `SeaTunnelRow` 数据写入 ADX,支持队列写入与流式写入两种方式。 + +## 关键特性 + +- [x] [batch](../../introduction/concepts/connector-v2-features.md) +- [ ] [stream](../../introduction/concepts/connector-v2-features.md) +- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md) +- [ ] [column projection](../../introduction/concepts/connector-v2-features.md) +- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md) + +## 写入选项 + +| 名称 | 类型 | 是否必填 | 默认值 | 描述 | +|---|---|---|---|---| +| cluster_uri | String | 是 | - | ADX 集群地址,例如 `https://mycluster.eastus.kusto.windows.net`。 | +| database | String | 是 | - | 目标数据库名称。 | +| table | String | 是 | - | 目标表名。 | +| client_id | String | 是 | - | Azure AD 应用 (client) ID。 | +| client_secret | String | 是 | - | Azure AD 应用密钥。 | +| tenant_id | String | 是 | - | Azure AD 租户 (directory) ID。 | +| ingestion_mapping_reference | String | 否 | "" | 已存在的 ingestion mapping 名称。 | +| ingestion_type | Enum | 否 | QUEUED | `QUEUED`(默认)或 `STREAMING`。 | +| batch_size | Integer | 否 | 1000 | 缓冲后批量写入的行数。 | +| flush_interval_ms | Long | 否 | 30000 | 无论批量大小,最多等待的毫秒数。 | + +## 示例 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields = [ + { name = "id", type = "INT" }, + { name = "name", type = "STRING" }, + { name = "ts", type = "TIMESTAMP" } + ] + } + rows = [ + { fields = [1, "a", "2024-01-01T00:00:00"] } + ] + } +} + +sink { + AzureDataExplorer { + cluster_uri = "https://mycluster.eastus.kusto.windows.net" + database = "mydb" + table = "mytable" + client_id = "${ADX_CLIENT_ID}" + client_secret = "${ADX_CLIENT_SECRET}" + tenant_id = "${ADX_TENANT_ID}" + ingestion_type = "QUEUED" + batch_size = 1000 + flush_interval_ms = 30000 + } +} +``` \ No newline at end of file diff --git a/docs/zh/connectors/source/AzureDataExplorer.md b/docs/zh/connectors/source/AzureDataExplorer.md new file mode 100644 index 000000000000..40261bdb8fdf --- /dev/null +++ b/docs/zh/connectors/source/AzureDataExplorer.md @@ -0,0 +1,64 @@ +# Azure Data Explorer + +> Azure Data Explorer (ADX) 源连接器 + +## 支持的引擎 + +> SeaTunnel Zeta + +## 描述 + +Azure Data Explorer 源连接器用于执行 Kusto Query Language (KQL) 查询,并将结果以 `SeaTunnelRow` 输出。 + +## 关键特性 + +- [x] [batch](../../introduction/concepts/connector-v2-features.md) +- [ ] [stream](../../introduction/concepts/connector-v2-features.md) +- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md) +- [ ] [column projection](../../introduction/concepts/connector-v2-features.md) +- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md) + +## 源选项 + +| 名称 | 类型 | 是否必填 | 默认值 | 描述 | +|---|---|---|---|---| +| cluster_uri | String | 是 | - | ADX 集群地址,例如 `https://mycluster.eastus.kusto.windows.net`。 | +| database | String | 是 | - | 目标数据库名称。 | +| query | String | 是 | - | 要执行的 KQL 查询。 | +| client_id | String | 是 | - | Azure AD 应用 (client) ID。 | +| client_secret | String | 是 | - | Azure AD 应用密钥。 | +| tenant_id | String | 是 | - | Azure AD 租户 (directory) ID。 | +| schema | Config | 否 | - | 可选的 SeaTunnel schema。参考 [Source Common Options](../common-options/source-common-options.md)。 | + +## 示例 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + AzureDataExplorer { + cluster_uri = "https://mycluster.eastus.kusto.windows.net" + database = "mydb" + query = "MyTable | take 1000" + client_id = "${ADX_CLIENT_ID}" + client_secret = "${ADX_CLIENT_SECRET}" + tenant_id = "${ADX_TENANT_ID}" + + schema = { + fields = [ + { name = "id", type = "INT" }, + { name = "name", type = "STRING" }, + { name = "ts", type = "TIMESTAMP" } + ] + } + } +} + +sink { + Console { + } +} +``` \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml index 499f3cf0ad0a..c4eeb19f2fbc 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/pom.xml @@ -1,6 +1,5 @@ - 4.0.0 @@ -11,10 +10,20 @@ connector-azuredataexplorer - - 24 - 24 - UTF-8 + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + provided + + + + org.apache.seatunnel + connector-common + ${project.version} + com.microsoft.azure.kusto kusto-data @@ -26,7 +35,16 @@ 6.0.3 - - - - \ No newline at end of file + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java index ec3755874f0e..8edaa2387fa7 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfig.java @@ -1,14 +1,20 @@ package org.apache.seatunnel.azuredataexplorer.config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + import lombok.Builder; import lombok.Getter; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import static org.apache.seatunnel.api.options.table.TableIdentifierOptions.TABLE; -import static org.apache.seatunnel.azuredataexplorer.config - .AzureDataExplorerSinkOptions.*; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.BATCH_SIZE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_ID; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_SECRET; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.FLUSH_INTERVAL_MS; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_MAPPING_REFERENCE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_TYPE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TABLE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TENANT_ID; -/** Immutable config value object used by both sink and source. */ +/** Immutable config value object used by the sink. */ @Getter @Builder public class AzureDataExplorerConfig { @@ -46,4 +52,4 @@ public static AzureDataExplorerConfig fromSinkConfig(ReadonlyConfig cfg) { .flushIntervalMs(cfg.get(FLUSH_INTERVAL_MS)) .build(); } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java index f20d60392a20..4826ad0270c4 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSinkOptions.java @@ -7,38 +7,45 @@ public class AzureDataExplorerSinkOptions { public static final Option CLUSTER_URI = Options.key("cluster_uri") - .stringType().noDefaultValue() + .stringType() + .noDefaultValue() .withDescription( "ADX cluster URI, e.g. https://mycluster.eastus.kusto.windows.net"); public static final Option DATABASE = Options.key("database") - .stringType().noDefaultValue() + .stringType() + .noDefaultValue() .withDescription("Target database name."); public static final Option TABLE = Options.key("table") - .stringType().noDefaultValue() + .stringType() + .noDefaultValue() .withDescription("Target table name."); public static final Option CLIENT_ID = Options.key("client_id") - .stringType().noDefaultValue() + .stringType() + .noDefaultValue() .withDescription("Azure AD application (client) ID."); public static final Option CLIENT_SECRET = Options.key("client_secret") - .stringType().noDefaultValue() + .stringType() + .noDefaultValue() .withDescription("Azure AD application secret."); public static final Option TENANT_ID = Options.key("tenant_id") - .stringType().noDefaultValue() + .stringType() + .noDefaultValue() .withDescription("Azure AD tenant (directory) ID."); public static final Option INGESTION_MAPPING_REFERENCE = Options.key("ingestion_mapping_reference") - .stringType().defaultValue("") + .stringType() + .defaultValue("") .withDescription( "Optional pre-created ingestion mapping name on the ADX table."); @@ -52,13 +59,18 @@ public class AzureDataExplorerSinkOptions { public static final Option BATCH_SIZE = Options.key("batch_size") - .intType().defaultValue(1000) + .intType() + .defaultValue(1000) .withDescription("Rows to buffer before flushing."); public static final Option FLUSH_INTERVAL_MS = Options.key("flush_interval_ms") - .longType().defaultValue(30_000L) + .longType() + .defaultValue(30_000L) .withDescription("Max milliseconds between flushes regardless of batch size."); - public enum IngestionType { QUEUED, STREAMING } -} \ No newline at end of file + public enum IngestionType { + QUEUED, + STREAMING + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfig.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfig.java new file mode 100644 index 000000000000..4b43ad116cb3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfig.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.azuredataexplorer.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.Builder; +import lombok.Getter; + +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_ID; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_SECRET; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLUSTER_URI; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.DATABASE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.QUERY; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.TENANT_ID; + +/** Immutable config value object used by the source. */ +@Getter +@Builder +public class AzureDataExplorerSourceConfig { + + private final String clusterUri; + private final String database; + private final String clientId; + private final String clientSecret; + private final String tenantId; + private final String query; + + public static AzureDataExplorerSourceConfig fromSourceConfig(ReadonlyConfig cfg) { + return AzureDataExplorerSourceConfig.builder() + .clusterUri(cfg.get(CLUSTER_URI)) + .database(cfg.get(DATABASE)) + .clientId(cfg.get(CLIENT_ID)) + .clientSecret(cfg.get(CLIENT_SECRET)) + .tenantId(cfg.get(TENANT_ID)) + .query(cfg.get(QUERY)) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceOptions.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceOptions.java new file mode 100644 index 000000000000..e17a96490d24 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceOptions.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.azuredataexplorer.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class AzureDataExplorerSourceOptions { + + public static final Option CLUSTER_URI = + Options.key("cluster_uri") + .stringType() + .noDefaultValue() + .withDescription( + "ADX cluster URI, e.g. https://mycluster.eastus.kusto.windows.net"); + + public static final Option DATABASE = + Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("Target database name."); + + public static final Option QUERY = + Options.key("query") + .stringType() + .noDefaultValue() + .withDescription("Kusto query (KQL) to execute for source reads."); + + public static final Option CLIENT_ID = + Options.key("client_id") + .stringType() + .noDefaultValue() + .withDescription("Azure AD application (client) ID."); + + public static final Option CLIENT_SECRET = + Options.key("client_secret") + .stringType() + .noDefaultValue() + .withDescription("Azure AD application secret."); + + public static final Option TENANT_ID = + Options.key("tenant_id") + .stringType() + .noDefaultValue() + .withDescription("Azure AD tenant (directory) ID."); +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java index 5446b208ce83..9dabcdad2a9e 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerConnectorException.java @@ -4,8 +4,7 @@ public class AzureDataExplorerConnectorException extends SeaTunnelRuntimeException { - public AzureDataExplorerConnectorException( - AzureDataExplorerErrorCode code, String message) { + public AzureDataExplorerConnectorException(AzureDataExplorerErrorCode code, String message) { super(code, message); } @@ -13,4 +12,4 @@ public AzureDataExplorerConnectorException( AzureDataExplorerErrorCode code, String message, Throwable cause) { super(code, message, cause); } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java index e08939e41388..1591dd55f46c 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/exception/AzureDataExplorerErrorCode.java @@ -3,7 +3,6 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; public enum AzureDataExplorerErrorCode implements SeaTunnelErrorCode { - INGESTION_FAILED("ADX-01", "Failed to ingest data into Azure Data Explorer"), QUERY_FAILED("ADX-02", "Failed to execute KQL query against Azure Data Explorer"), CONNECTION_FAILED("ADX-03", "Failed to connect to Azure Data Explorer cluster"), @@ -18,6 +17,13 @@ public enum AzureDataExplorerErrorCode implements SeaTunnelErrorCode { this.description = description; } - @Override public String getCode() { return code; } - @Override public String getDescription() { return description; } -} \ No newline at end of file + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java index 6f8ef436c0e4..eb9988165f38 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/serialization/AzureDataExplorerRowSerializer.java @@ -1,26 +1,28 @@ package org.apache.seatunnel.azuredataexplorer.serialization; -import org.apache.seatunnel.api.table.type.*; -import org.apache.seatunnel.azuredataexplorer.exception - .AzureDataExplorerErrorCode; -import org.apache.seatunnel.azuredataexplorer.exception - .AzureDataExplorerConnectorException; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerConnectorException; +import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerErrorCode; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; /** - * Serializes a SeaTunnelRow to an RFC-4180 CSV line for ADX ingestion. - * Column order matches the SeaTunnelRowType field order. - * Null fields are emitted as empty (ADX treats empty as null for most types). + * Serializes a SeaTunnelRow to an RFC-4180 CSV line for ADX ingestion. Column order matches the + * SeaTunnelRowType field order. Null fields are emitted as empty (ADX treats empty as null for most + * types). */ public class AzureDataExplorerRowSerializer { private static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); - private static final DateTimeFormatter DATE_FMT = - DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private static final DateTimeFormatter DATE_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd"); private final SeaTunnelRowType rowType; @@ -46,19 +48,18 @@ public String toCsvLine(SeaTunnelRow row) { private String serializeField(Object val, SeaTunnelDataType type) { if (type == BasicType.BOOLEAN_TYPE) return val.toString(); - if (type == BasicType.BYTE_TYPE) return Byte.toString((Byte) val); - if (type == BasicType.SHORT_TYPE) return Short.toString((Short) val); - if (type == BasicType.INT_TYPE) return Integer.toString((Integer) val); - if (type == BasicType.LONG_TYPE) return Long.toString((Long) val); - if (type == BasicType.FLOAT_TYPE) return Float.toString((Float) val); - if (type == BasicType.DOUBLE_TYPE) return Double.toString((Double) val); - if (type == BasicType.STRING_TYPE) return csvQuote(val.toString()); - if (type instanceof DecimalType) return val.toString(); + if (type == BasicType.BYTE_TYPE) return Byte.toString((Byte) val); + if (type == BasicType.SHORT_TYPE) return Short.toString((Short) val); + if (type == BasicType.INT_TYPE) return Integer.toString((Integer) val); + if (type == BasicType.LONG_TYPE) return Long.toString((Long) val); + if (type == BasicType.FLOAT_TYPE) return Float.toString((Float) val); + if (type == BasicType.DOUBLE_TYPE) return Double.toString((Double) val); + if (type == BasicType.STRING_TYPE) return csvQuote(val.toString()); + if (type instanceof DecimalType) return val.toString(); if (type instanceof LocalTimeType) { if (type == LocalTimeType.LOCAL_DATE_TIME_TYPE) return ((LocalDateTime) val).format(DT_FMT); - if (type == LocalTimeType.LOCAL_DATE_TYPE) - return ((LocalDate) val).format(DATE_FMT); + if (type == LocalTimeType.LOCAL_DATE_TYPE) return ((LocalDate) val).format(DATE_FMT); return csvQuote(val.toString()); } throw new AzureDataExplorerConnectorException( @@ -68,10 +69,12 @@ private String serializeField(Object val, SeaTunnelDataType type) { /** RFC-4180: quote if value contains comma, double-quote, CR, or LF. */ static String csvQuote(String s) { - if (s.indexOf(',') >= 0 || s.indexOf('"') >= 0 - || s.indexOf('\r') >= 0 || s.indexOf('\n') >= 0) { + if (s.indexOf(',') >= 0 + || s.indexOf('"') >= 0 + || s.indexOf('\r') >= 0 + || s.indexOf('\n') >= 0) { return "\"" + s.replace("\"", "\"\"\"") + "\""; } return s; } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java index ea2850731e45..6107bcffd257 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSink.java @@ -1,16 +1,14 @@ package org.apache.seatunnel.azuredataexplorer.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerConfig; -import java.io.IOException; - public class AzureDataExplorerSink implements SeaTunnelSink, TableSink { @@ -22,7 +20,9 @@ public AzureDataExplorerSink(ReadonlyConfig config) { } @Override - public String getPluginName() { return AzureDataExplorerSinkFactory.IDENTIFIER; } + public String getPluginName() { + return AzureDataExplorerSinkFactory.IDENTIFIER; + } @Override public void setTypeInfo(SeaTunnelRowType rowType) { @@ -30,17 +30,18 @@ public void setTypeInfo(SeaTunnelRowType rowType) { } @Override - public SeaTunnelDataType getConsumedType() { return rowType; } + public SeaTunnelDataType getConsumedType() { + return rowType; + } @Override - public SinkWriter createWriter( - SinkWriter.Context context) throws IOException { + public SinkWriter createWriter(SinkWriter.Context context) { return new AzureDataExplorerSinkWriter( AzureDataExplorerConfig.fromSinkConfig(config), rowType); } @Override public SeaTunnelSink createSink() { - return null; + return this; } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java index 447caf7b35a6..2f53f4527a72 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkFactory.java @@ -1,6 +1,5 @@ -package org.apache.seatunnel.azuredataexplorer.sink; // fix 1 +package org.apache.seatunnel.azuredataexplorer.sink; -import com.google.auto.service.AutoService; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.connector.TableSink; @@ -8,9 +7,18 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; -import java.util.Optional; +import com.google.auto.service.AutoService; -import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.*; // fix 1 +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.BATCH_SIZE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_ID; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLIENT_SECRET; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.CLUSTER_URI; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.DATABASE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.FLUSH_INTERVAL_MS; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_MAPPING_REFERENCE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.INGESTION_TYPE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TABLE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.TENANT_ID; @AutoService(Factory.class) public class AzureDataExplorerSinkFactory implements TableSinkFactory { @@ -18,21 +26,23 @@ public class AzureDataExplorerSinkFactory implements TableSinkFactory { public static final String IDENTIFIER = "AzureDataExplorer"; @Override - public String factoryIdentifier() { return IDENTIFIER; } + public String factoryIdentifier() { + return IDENTIFIER; + } @Override public OptionRule optionRule() { return OptionRule.builder() .required(CLUSTER_URI, DATABASE, TABLE) .bundled(CLIENT_ID, CLIENT_SECRET, TENANT_ID) - .optional(INGESTION_MAPPING_REFERENCE, INGESTION_TYPE, - BATCH_SIZE, FLUSH_INTERVAL_MS) + .optional( + INGESTION_MAPPING_REFERENCE, INGESTION_TYPE, BATCH_SIZE, FLUSH_INTERVAL_MS) .build(); } @Override - public AzureDataExplorerSink createSink(TableSinkFactoryContext context) { + public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig config = context.getOptions(); - return new AzureDataExplorerSink(config); + return () -> new AzureDataExplorerSink(config); } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java index 085d7b440f3e..eccf41e05982 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/sink/AzureDataExplorerSinkWriter.java @@ -1,29 +1,23 @@ package org.apache.seatunnel.azuredataexplorer.sink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerConfig; +import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions.IngestionType; +import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerConnectorException; +import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerErrorCode; +import org.apache.seatunnel.azuredataexplorer.serialization.AzureDataExplorerRowSerializer; + import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; import com.microsoft.azure.kusto.ingest.IngestClient; import com.microsoft.azure.kusto.ingest.IngestClientFactory; import com.microsoft.azure.kusto.ingest.IngestionMapping; import com.microsoft.azure.kusto.ingest.IngestionProperties; import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; - import lombok.extern.slf4j.Slf4j; -import org.apache.seatunnel.api.sink.SinkWriter; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.azuredataexplorer.config - .AzureDataExplorerConfig; -import org.apache.seatunnel.azuredataexplorer.config - .AzureDataExplorerSinkOptions.IngestionType; -import org.apache.seatunnel.azuredataexplorer.exception - .AzureDataExplorerErrorCode; -import org.apache.seatunnel.azuredataexplorer.exception - .AzureDataExplorerConnectorException; -import org.apache.seatunnel.azuredataexplorer.serialization - .AzureDataExplorerRowSerializer; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -31,8 +25,7 @@ import java.util.Optional; @Slf4j -public class AzureDataExplorerSinkWriter - implements SinkWriter { +public class AzureDataExplorerSinkWriter implements SinkWriter { private final AzureDataExplorerConfig config; private final AzureDataExplorerRowSerializer serializer; @@ -41,15 +34,12 @@ public class AzureDataExplorerSinkWriter private final List buffer; private long lastFlushTime; - public AzureDataExplorerSinkWriter( - AzureDataExplorerConfig config, SeaTunnelRowType rowType) { + public AzureDataExplorerSinkWriter(AzureDataExplorerConfig config, SeaTunnelRowType rowType) { this(config, rowType, buildIngestClient(config)); } AzureDataExplorerSinkWriter( - AzureDataExplorerConfig config, - SeaTunnelRowType rowType, - IngestClient ingestClient) { + AzureDataExplorerConfig config, SeaTunnelRowType rowType, IngestClient ingestClient) { this.config = config; this.serializer = new AzureDataExplorerRowSerializer(rowType); this.buffer = new ArrayList<>(Math.max(config.getBatchSize(), 16)); @@ -60,9 +50,10 @@ public AzureDataExplorerSinkWriter( private static IngestClient buildIngestClient(AzureDataExplorerConfig config) { try { - String uri = config.getIngestionType() == IngestionType.STREAMING - ? config.getClusterUri() - : config.getQueuedIngestUri(); + String uri = + config.getIngestionType() == IngestionType.STREAMING + ? config.getClusterUri() + : config.getQueuedIngestUri(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials( uri, @@ -75,41 +66,41 @@ private static IngestClient buildIngestClient(AzureDataExplorerConfig config) { } catch (Exception e) { throw new AzureDataExplorerConnectorException( AzureDataExplorerErrorCode.CONNECTION_FAILED, - "Cannot create ADX ingest client for cluster: " + config.getClusterUri(), e); + "Cannot create ADX ingest client for cluster: " + config.getClusterUri(), + e); } } - private static IngestionProperties buildIngestionProperties( - AzureDataExplorerConfig config) { + private static IngestionProperties buildIngestionProperties(AzureDataExplorerConfig config) { IngestionProperties props = new IngestionProperties(config.getDatabase(), config.getTable()); props.setDataFormat(IngestionProperties.DataFormat.CSV); props.setIgnoreFirstRecord(false); String mappingRef = config.getIngestionMappingReference(); if (mappingRef != null && !mappingRef.isEmpty()) { - props.setIngestionMapping(new IngestionMapping( - mappingRef, IngestionMapping.IngestionMappingKind.Csv)); + props.setIngestionMapping( + new IngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.CSV)); } return props; } @Override - public void write(SeaTunnelRow element) throws IOException { + public void write(SeaTunnelRow element) { buffer.add(serializer.toCsvLine(element)); boolean batchFull = buffer.size() >= config.getBatchSize(); - boolean timedOut = + boolean timedOut = System.currentTimeMillis() - lastFlushTime >= config.getFlushIntervalMs(); if (batchFull || timedOut) flush(); } @Override - public Optional prepareCommit() throws IOException { + public Optional prepareCommit() { flush(); return Optional.empty(); } @Override - public List snapshotState(long checkpointId) throws IOException { + public List snapshotState(long checkpointId) { return Collections.emptyList(); } @@ -119,16 +110,19 @@ public void abortPrepare() { } @Override - public void close() throws IOException { + public void close() { try { if (!buffer.isEmpty()) flush(); } finally { - try { ingestClient.close(); } - catch (Exception e) { log.warn("Error closing ADX ingest client", e); } + try { + ingestClient.close(); + } catch (Exception e) { + log.warn("Error closing ADX ingest client", e); + } } } - private void flush() throws IOException { + private void flush() { if (buffer.isEmpty()) return; StringBuilder sb = new StringBuilder(); for (String line : buffer) sb.append(line); @@ -136,15 +130,19 @@ private void flush() throws IOException { StreamSourceInfo si = new StreamSourceInfo(new ByteArrayInputStream(csv)); try { ingestClient.ingestFromStream(si, ingestionProperties); - log.debug("Flushed {} rows to {}.{}", - buffer.size(), config.getDatabase(), config.getTable()); + log.debug( + "Flushed {} rows to {}.{}", + buffer.size(), + config.getDatabase(), + config.getTable()); } catch (Exception e) { throw new AzureDataExplorerConnectorException( - AzureDataExplorerConnectorErrorCode.INGESTION_FAILED, - "Ingestion failed for " + buffer.size() + " rows", e); + AzureDataExplorerErrorCode.INGESTION_FAILED, + "Ingestion failed for " + buffer.size() + " rows", + e); } finally { buffer.clear(); lastFlushTime = System.currentTimeMillis(); } } -} \ No newline at end of file +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java index fa2482db6824..e1f329c47761 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSource.java @@ -1,4 +1,72 @@ package org.apache.seatunnel.azuredataexplorer.source; -public class AzureDataExplorerSource { -} \ No newline at end of file +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.options.ConnectorCommonOptions; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.Collections; +import java.util.List; + +public class AzureDataExplorerSource + implements SeaTunnelSource< + SeaTunnelRow, AzureDataExplorerSourceSplit, AzureDataExplorerSourceState> { + + private final ReadonlyConfig config; + private final CatalogTable catalogTable; + + public AzureDataExplorerSource(ReadonlyConfig config) { + this.config = config; + this.catalogTable = + config.getOptional(ConnectorCommonOptions.SCHEMA).isPresent() + ? CatalogTableUtil.buildWithConfig(config) + : null; + } + + @Override + public String getPluginName() { + return AzureDataExplorerSourceFactory.IDENTIFIER; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public List getProducedCatalogTables() { + if (catalogTable == null) { + return Collections.emptyList(); + } + return Collections.singletonList(catalogTable); + } + + @Override + public SourceReader createReader( + SourceReader.Context readerContext) { + return new AzureDataExplorerSourceReader( + readerContext, + config, + catalogTable == null ? null : catalogTable.getSeaTunnelRowType()); + } + + @Override + public SourceSplitEnumerator + createEnumerator( + SourceSplitEnumerator.Context enumeratorContext) { + return new AzureDataExplorerSplitEnumerator(enumeratorContext); + } + + @Override + public SourceSplitEnumerator + restoreEnumerator( + SourceSplitEnumerator.Context enumeratorContext, + AzureDataExplorerSourceState checkpointState) { + return new AzureDataExplorerSplitEnumerator(enumeratorContext, checkpointState); + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java index 4b0717a6671b..42e21fe6cc44 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceFactory.java @@ -1,4 +1,53 @@ package org.apache.seatunnel.azuredataexplorer.source; -public class AzureDataExplorerSourceFactory { -} \ No newline at end of file +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.options.SourceConnectorCommonOptions; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; + +import com.google.auto.service.AutoService; + +import java.io.Serializable; + +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_ID; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLIENT_SECRET; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.CLUSTER_URI; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.DATABASE; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.QUERY; +import static org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions.TENANT_ID; + +@AutoService(Factory.class) +public class AzureDataExplorerSourceFactory implements TableSourceFactory { + public static final String IDENTIFIER = "AzureDataExplorer"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(CLUSTER_URI, DATABASE, QUERY) + .bundled(CLIENT_ID, CLIENT_SECRET, TENANT_ID) + .optional(SourceConnectorCommonOptions.SCHEMA) + .build(); + } + + @Override + public Class getSourceClass() { + return AzureDataExplorerSource.class; + } + + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> + (SeaTunnelSource) + new AzureDataExplorerSource(context.getOptions()); + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java index 017c3a9c1991..70454f072796 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceReader.java @@ -1,4 +1,207 @@ package org.apache.seatunnel.azuredataexplorer.source; -public class AzureDataExplorerSourceReader { -} \ No newline at end of file +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceConfig; +import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerConnectorException; +import org.apache.seatunnel.azuredataexplorer.exception.AzureDataExplorerErrorCode; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.KustoResultColumn; +import com.microsoft.azure.kusto.data.KustoResultSetTable; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; + +@Slf4j +public class AzureDataExplorerSourceReader + implements SourceReader { + + private final SourceReader.Context context; + private final AzureDataExplorerSourceConfig config; + private final Queue splitQueue; + private SeaTunnelRowType rowType; + private Client client; + private volatile boolean noMoreSplits; + + AzureDataExplorerSourceReader( + SourceReader.Context context, ReadonlyConfig config, SeaTunnelRowType rowType) { + this.context = context; + this.config = AzureDataExplorerSourceConfig.fromSourceConfig(config); + this.rowType = rowType; + this.splitQueue = new ArrayDeque<>(); + } + + @Override + public void open() { + try { + ConnectionStringBuilder csb = + ConnectionStringBuilder.createWithAadApplicationCredentials( + config.getClusterUri(), + config.getClientId(), + config.getClientSecret(), + config.getTenantId()); + this.client = ClientFactory.createClient(csb); + } catch (Exception e) { + throw new AzureDataExplorerConnectorException( + AzureDataExplorerErrorCode.CONNECTION_FAILED, + "Cannot create ADX data client for cluster: " + config.getClusterUri(), + e); + } + } + + @Override + public void close() { + // No close required for Kusto data client. + } + + @Override + public void pollNext(Collector output) { + synchronized (output.getCheckpointLock()) { + AzureDataExplorerSourceSplit split = splitQueue.poll(); + if (split != null) { + executeQuery(output); + } else if (noMoreSplits) { + log.info("Closed the bounded Azure Data Explorer source"); + context.signalNoMoreElement(); + } + } + } + + private void executeQuery(Collector output) { + try { + KustoOperationResult result = + client.executeQuery(config.getDatabase(), config.getQuery()); + KustoResultSetTable table = result.getPrimaryResults(); + if (rowType == null) { + rowType = buildRowType(table.getColumns()); + } + while (table.next()) { + List currentRow = table.getCurrentRow(); + output.collect(new SeaTunnelRow(convertRow(currentRow, rowType))); + } + } catch (Exception e) { + throw new AzureDataExplorerConnectorException( + AzureDataExplorerErrorCode.QUERY_FAILED, + "Failed to execute query on database " + config.getDatabase(), + e); + } + } + + private Object[] convertRow(List row, SeaTunnelRowType rowType) { + Object[] converted = new Object[row.size()]; + SeaTunnelDataType[] fieldTypes = rowType.getFieldTypes(); + for (int i = 0; i < row.size(); i++) { + Object value = row.get(i); + SeaTunnelDataType type = fieldTypes[i]; + converted[i] = convertValue(value, type); + } + return converted; + } + + private Object convertValue(Object value, SeaTunnelDataType type) { + if (value == null) { + return null; + } + if (type == LocalTimeType.LOCAL_DATE_TYPE) { + if (value instanceof Date) { + return ((Date) value).toLocalDate(); + } + if (value instanceof OffsetDateTime) { + return ((OffsetDateTime) value).toLocalDate(); + } + } + if (type == LocalTimeType.LOCAL_DATE_TIME_TYPE) { + if (value instanceof Timestamp) { + return ((Timestamp) value).toLocalDateTime(); + } + if (value instanceof OffsetDateTime) { + return ((OffsetDateTime) value).toLocalDateTime(); + } + if (value instanceof Instant) { + return LocalDateTime.ofInstant((Instant) value, ZoneOffset.UTC); + } + } + return value; + } + + private SeaTunnelRowType buildRowType(KustoResultColumn[] columns) { + String[] names = new String[columns.length]; + SeaTunnelDataType[] types = new SeaTunnelDataType[columns.length]; + for (int i = 0; i < columns.length; i++) { + names[i] = columns[i].getColumnName(); + types[i] = mapKustoType(columns[i].getColumnType()); + } + return new SeaTunnelRowType(names, types); + } + + private SeaTunnelDataType mapKustoType(String kustoType) { + if (kustoType == null) { + return BasicType.STRING_TYPE; + } + String normalizedType = kustoType.toLowerCase(); + if ("string".equals(normalizedType)) { + return BasicType.STRING_TYPE; + } + if ("int".equals(normalizedType)) { + return BasicType.INT_TYPE; + } + if ("long".equals(normalizedType)) { + return BasicType.LONG_TYPE; + } + if ("real".equals(normalizedType)) { + return BasicType.DOUBLE_TYPE; + } + if ("bool".equals(normalizedType)) { + return BasicType.BOOLEAN_TYPE; + } + if ("datetime".equals(normalizedType)) { + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + } + if ("timespan".equals(normalizedType) + || "guid".equals(normalizedType) + || "dynamic".equals(normalizedType)) { + return BasicType.STRING_TYPE; + } + if ("decimal".equals(normalizedType)) { + return new DecimalType(38, 18); + } + return BasicType.STRING_TYPE; + } + + @Override + public List snapshotState(long checkpointId) { + return List.copyOf(splitQueue); + } + + @Override + public void addSplits(List splits) { + splitQueue.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + noMoreSplits = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) {} +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java index 190a1ec613ee..9939389a667b 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceSplit.java @@ -1,4 +1,21 @@ package org.apache.seatunnel.azuredataexplorer.source; -public class AzureDataExplorerSourceSplit { -} \ No newline at end of file +import org.apache.seatunnel.api.source.SourceSplit; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serial; + +@AllArgsConstructor +@Getter +public class AzureDataExplorerSourceSplit implements SourceSplit { + @Serial private static final long serialVersionUID = 1L; + + private final String splitId; + + @Override + public String splitId() { + return splitId; + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceState.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceState.java new file mode 100644 index 000000000000..d6e4e7c349ea --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSourceState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.azuredataexplorer.source; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serial; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@AllArgsConstructor +@Getter +public class AzureDataExplorerSourceState implements Serializable { + @Serial private static final long serialVersionUID = 1L; + + private final boolean shouldEnumerate; + private final Map> pendingSplit; +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java index 728f327467d3..ac27f36016a0 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/java/org/apache/seatunnel/azuredataexplorer/source/AzureDataExplorerSplitEnumerator.java @@ -1,4 +1,137 @@ package org.apache.seatunnel.azuredataexplorer.source; -public class AzureDataExplorerSplitEnumerator { -} \ No newline at end of file +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Slf4j +public class AzureDataExplorerSplitEnumerator + implements SourceSplitEnumerator< + AzureDataExplorerSourceSplit, AzureDataExplorerSourceState> { + + private static final String SINGLE_SPLIT_ID = "adx-query"; + + private final Context context; + private final Object stateLock = new Object(); + private final Map> pendingSplit; + private volatile boolean shouldEnumerate; + + public AzureDataExplorerSplitEnumerator(Context context) { + this(context, null); + } + + public AzureDataExplorerSplitEnumerator( + Context context, AzureDataExplorerSourceState state) { + this.context = context; + this.pendingSplit = new HashMap<>(); + this.shouldEnumerate = state == null; + if (state != null) { + this.shouldEnumerate = state.isShouldEnumerate(); + this.pendingSplit.putAll(state.getPendingSplit()); + } + } + + @Override + public void open() { + // No-op. + } + + @Override + public void run() { + Set readers = context.registeredReaders(); + if (shouldEnumerate) { + List splits = + Collections.singletonList(new AzureDataExplorerSourceSplit(SINGLE_SPLIT_ID)); + synchronized (stateLock) { + addPendingSplit(splits); + shouldEnumerate = false; + } + assignSplit(readers); + } + readers.forEach(context::signalNoMoreSplits); + } + + private void addPendingSplit(Collection splits) { + int readerCount = context.currentParallelism(); + for (AzureDataExplorerSourceSplit split : splits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + pendingSplit + .computeIfAbsent(ownerReader, ignoredReader -> new ArrayList<>()) + .add(split); + } + } + + private void assignSplit(Collection readers) { + for (int reader : readers) { + List assignmentForReader = pendingSplit.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + try { + context.assignSplit(reader, assignmentForReader); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignmentForReader, + reader, + e); + pendingSplit.put(reader, assignmentForReader); + } + } + } + } + + private static int getSplitOwner(String splitId, int numReaders) { + return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + @Override + public void close() { + // No-op. + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (!splits.isEmpty()) { + addPendingSplit(splits); + assignSplit(Collections.singletonList(subtaskId)); + } + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplit.size(); + } + + @Override + public void handleSplitRequest(int subtaskId) { + throw new SeaTunnelRuntimeException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + "Unsupported handleSplitRequest: " + subtaskId); + } + + @Override + public void registerReader(int subtaskId) { + if (!pendingSplit.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); + } + } + + @Override + public AzureDataExplorerSourceState snapshotState(long checkpointId) { + synchronized (stateLock) { + return new AzureDataExplorerSourceState(shouldEnumerate, pendingSplit); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) {} +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory index e69de29bb2d1..8c68b4c4a620 100644 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory @@ -0,0 +1,2 @@ +org.apache.seatunnel.azuredataexplorer.sink.AzureDataExplorerSinkFactory +org.apache.seatunnel.azuredataexplorer.source.AzureDataExplorerSourceFactory \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java deleted file mode 100644 index 8c97dbfa88ae..000000000000 --- a/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/AzureDataExplorerSinkWriterTest.java +++ /dev/null @@ -1,2 +0,0 @@ -public class AzureDataExplorerSinkWriterTest { -} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfigTest.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfigTest.java new file mode 100644 index 000000000000..18f4865b8499 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerConfigTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.azuredataexplorer.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class AzureDataExplorerConfigTest { + + @Test + public void testFromSinkConfigUsesDefaults() { + Map configMap = new HashMap<>(); + configMap.put("cluster_uri", "https://example.kusto.windows.net"); + configMap.put("database", "db"); + configMap.put("table", "table_a"); + configMap.put("client_id", "client"); + configMap.put("client_secret", "secret"); + configMap.put("tenant_id", "tenant"); + + ReadonlyConfig config = ReadonlyConfig.fromMap(configMap); + AzureDataExplorerConfig sinkConfig = AzureDataExplorerConfig.fromSinkConfig(config); + + Assertions.assertEquals("https://example.kusto.windows.net", sinkConfig.getClusterUri()); + Assertions.assertEquals("db", sinkConfig.getDatabase()); + Assertions.assertEquals("table_a", sinkConfig.getTable()); + Assertions.assertEquals("client", sinkConfig.getClientId()); + Assertions.assertEquals("secret", sinkConfig.getClientSecret()); + Assertions.assertEquals("tenant", sinkConfig.getTenantId()); + Assertions.assertEquals("", sinkConfig.getIngestionMappingReference()); + Assertions.assertEquals( + AzureDataExplorerSinkOptions.IngestionType.QUEUED, sinkConfig.getIngestionType()); + Assertions.assertEquals(1000, sinkConfig.getBatchSize()); + Assertions.assertEquals(30_000L, sinkConfig.getFlushIntervalMs()); + Assertions.assertEquals( + "https://ingest-example.kusto.windows.net", sinkConfig.getQueuedIngestUri()); + } + + @Test + public void testQueuedIngestUriPreservesNonHttps() { + Map configMap = new HashMap<>(); + configMap.put("cluster_uri", "http://example"); + configMap.put("database", "db"); + configMap.put("table", "table_a"); + configMap.put("client_id", "client"); + configMap.put("client_secret", "secret"); + configMap.put("tenant_id", "tenant"); + + ReadonlyConfig config = ReadonlyConfig.fromMap(configMap); + AzureDataExplorerConfig sinkConfig = AzureDataExplorerConfig.fromSinkConfig(config); + + Assertions.assertEquals("http://example", sinkConfig.getQueuedIngestUri()); + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfigTest.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfigTest.java new file mode 100644 index 000000000000..d47427809d77 --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/config/AzureDataExplorerSourceConfigTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.azuredataexplorer.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class AzureDataExplorerSourceConfigTest { + + @Test + public void testFromSourceConfig() { + Map configMap = new HashMap<>(); + configMap.put("cluster_uri", "https://example.kusto.windows.net"); + configMap.put("database", "db"); + configMap.put("query", "MyTable | take 10"); + configMap.put("client_id", "client"); + configMap.put("client_secret", "secret"); + configMap.put("tenant_id", "tenant"); + + ReadonlyConfig config = ReadonlyConfig.fromMap(configMap); + AzureDataExplorerSourceConfig sourceConfig = + AzureDataExplorerSourceConfig.fromSourceConfig(config); + + Assertions.assertEquals("https://example.kusto.windows.net", sourceConfig.getClusterUri()); + Assertions.assertEquals("db", sourceConfig.getDatabase()); + Assertions.assertEquals("MyTable | take 10", sourceConfig.getQuery()); + Assertions.assertEquals("client", sourceConfig.getClientId()); + Assertions.assertEquals("secret", sourceConfig.getClientSecret()); + Assertions.assertEquals("tenant", sourceConfig.getTenantId()); + } +} diff --git a/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/factory/AzureDataExplorerFactoryTest.java b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/factory/AzureDataExplorerFactoryTest.java new file mode 100644 index 000000000000..2b184d9452da --- /dev/null +++ b/seatunnel-connectors-v2/connector-azuredataexplorer/src/test/java/org/apache/seatunnel/azuredataexplorer/factory/AzureDataExplorerFactoryTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.azuredataexplorer.factory; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.configuration.util.RequiredOption; +import org.apache.seatunnel.api.options.SourceConnectorCommonOptions; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSinkOptions; +import org.apache.seatunnel.azuredataexplorer.config.AzureDataExplorerSourceOptions; +import org.apache.seatunnel.azuredataexplorer.sink.AzureDataExplorerSink; +import org.apache.seatunnel.azuredataexplorer.sink.AzureDataExplorerSinkFactory; +import org.apache.seatunnel.azuredataexplorer.source.AzureDataExplorerSource; +import org.apache.seatunnel.azuredataexplorer.source.AzureDataExplorerSourceFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AzureDataExplorerFactoryTest { + + @Test + public void testSinkFactoryRuleAndCreation() { + AzureDataExplorerSinkFactory factory = new AzureDataExplorerSinkFactory(); + OptionRule rule = factory.optionRule(); + + List> requiredOptions = flattenRequiredOptions(rule); + Assertions.assertTrue(requiredOptions.contains(AzureDataExplorerSinkOptions.CLUSTER_URI)); + Assertions.assertTrue(requiredOptions.contains(AzureDataExplorerSinkOptions.DATABASE)); + Assertions.assertTrue(requiredOptions.contains(AzureDataExplorerSinkOptions.TABLE)); + + Assertions.assertTrue( + rule.getOptionalOptions() + .contains(AzureDataExplorerSinkOptions.INGESTION_MAPPING_REFERENCE)); + Assertions.assertTrue( + rule.getOptionalOptions().contains(AzureDataExplorerSinkOptions.INGESTION_TYPE)); + Assertions.assertTrue( + rule.getOptionalOptions().contains(AzureDataExplorerSinkOptions.BATCH_SIZE)); + Assertions.assertTrue( + rule.getOptionalOptions().contains(AzureDataExplorerSinkOptions.FLUSH_INTERVAL_MS)); + + Assertions.assertTrue(hasBundledCredentialOptions(rule)); + + Map configMap = new HashMap<>(); + configMap.put("cluster_uri", "https://example.kusto.windows.net"); + configMap.put("database", "db"); + configMap.put("table", "table_a"); + configMap.put("client_id", "client"); + configMap.put("client_secret", "secret"); + configMap.put("tenant_id", "tenant"); + + ReadonlyConfig config = ReadonlyConfig.fromMap(configMap); + TableSinkFactoryContext context = + new TableSinkFactoryContext( + null, config, Thread.currentThread().getContextClassLoader()); + + TableSink tableSink = factory.createSink(context); + SeaTunnelSink sink = tableSink.createSink(); + Assertions.assertInstanceOf(AzureDataExplorerSink.class, sink); + } + + @Test + public void testSourceFactoryRuleAndCreation() { + AzureDataExplorerSourceFactory factory = new AzureDataExplorerSourceFactory(); + OptionRule rule = factory.optionRule(); + + List> requiredOptions = flattenRequiredOptions(rule); + Assertions.assertTrue(requiredOptions.contains(AzureDataExplorerSourceOptions.CLUSTER_URI)); + Assertions.assertTrue(requiredOptions.contains(AzureDataExplorerSourceOptions.DATABASE)); + Assertions.assertTrue(requiredOptions.contains(AzureDataExplorerSourceOptions.QUERY)); + Assertions.assertTrue( + rule.getOptionalOptions().contains(SourceConnectorCommonOptions.SCHEMA)); + + Assertions.assertTrue(hasBundledCredentialOptions(rule)); + + Map configMap = new HashMap<>(); + configMap.put("cluster_uri", "https://example.kusto.windows.net"); + configMap.put("database", "db"); + configMap.put("query", "MyTable | take 10"); + configMap.put("client_id", "client"); + configMap.put("client_secret", "secret"); + configMap.put("tenant_id", "tenant"); + + ReadonlyConfig config = ReadonlyConfig.fromMap(configMap); + TableSourceFactoryContext context = + new TableSourceFactoryContext( + config, Thread.currentThread().getContextClassLoader()); + + TableSource tableSource = factory.createSource(context); + SeaTunnelSource source = tableSource.createSource(); + Assertions.assertInstanceOf(AzureDataExplorerSource.class, source); + } + + private static List> flattenRequiredOptions(OptionRule rule) { + List> options = new ArrayList<>(); + for (RequiredOption requiredOption : rule.getRequiredOptions()) { + options.addAll(requiredOption.getOptions()); + } + return options; + } + + private static boolean hasBundledCredentialOptions(OptionRule rule) { + for (RequiredOption requiredOption : rule.getRequiredOptions()) { + if (requiredOption instanceof RequiredOption.BundledRequiredOptions) { + List> options = requiredOption.getOptions(); + if (options.contains(AzureDataExplorerSinkOptions.CLIENT_ID) + && options.contains(AzureDataExplorerSinkOptions.CLIENT_SECRET) + && options.contains(AzureDataExplorerSinkOptions.TENANT_ID)) { + return true; + } + if (options.contains(AzureDataExplorerSourceOptions.CLIENT_ID) + && options.contains(AzureDataExplorerSourceOptions.CLIENT_SECRET) + && options.contains(AzureDataExplorerSourceOptions.TENANT_ID)) { + return true; + } + } + } + return false; + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 1ff6df723665..65ddfeae2f84 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -133,4 +133,4 @@ - \ No newline at end of file + diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index 2b8870118f6a..b53b18af77d3 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -1268,4 +1268,4 @@ - \ No newline at end of file +