diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 930a4d4dec79..4a9daa7b0d25 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -849,6 +849,13 @@ jobs: ./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt` run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 1` + # Keep the longest Iceberg and HBase suites in a dedicated shard so this hosted runner + # does not lose heartbeat before the rest of the part-2 modules finish. + # Handle both first-position (no leading comma) and mid/last-position (leading comma). + run_it_modules=${run_it_modules//:connector-iceberg-e2e,/} + run_it_modules=${run_it_modules//,:connector-iceberg-e2e/} + run_it_modules=${run_it_modules//:connector-hbase-e2e,/} + run_it_modules=${run_it_modules//,:connector-hbase-e2e/} ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci env: MAVEN_OPTS: -Xmx4096m @@ -1008,6 +1015,35 @@ jobs: env: MAVEN_OPTS: -Xmx4096m + all-connectors-it-8: + needs: [ changes, sanity-check ] + if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true' + runs-on: ${{ matrix.os }} + env: + RUN_ALL_CONTAINER: ${{ needs.changes.outputs.api }} + RUN_ZETA_CONTAINER: ${{ needs.changes.outputs.engine }} + strategy: + matrix: + java: [ '8', '11' ] + os: [ 'ubuntu-latest' ] + timeout-minutes: 210 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh + - name: run connector-v2 integration test (part-8) + run: | + # These two suites dominated part-2 and triggered hosted-runner heartbeat loss. + ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-iceberg-e2e,:connector-hbase-e2e -am -Pci + env: + MAVEN_OPTS: -Xmx4096m + jdbc-connectors-it-part-1: needs: [ changes, sanity-check ] if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true' diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java index 32a3d0076f1e..f95554f79d90 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java @@ -385,7 +385,7 @@ private void assertSchemaEvolutionForAddColumns(String sourceTable, String sinkT sourceDatabase.setTemplateName("add_columns").createAndInitialize(); given().pollDelay(Duration.ofSeconds(5)) .await() - .atMost(120, TimeUnit.SECONDS) + .atMost(SCHEMA_ASSERT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertIterableEquals( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 6bcd10395673..f91c1827a188 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -109,6 +109,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -121,6 +122,10 @@ @Slf4j public class KafkaIT extends TestSuiteBase implements TestResource { + private static final String EXACTLY_ONCE_SOURCE_TOPIC_VARIABLE = "sourceTopic"; + private static final String EXACTLY_ONCE_SINK_TOPIC_VARIABLE = "sinkTopic"; + private static final String EXACTLY_ONCE_CONSUMER_GROUP_VARIABLE = "consumerGroup"; + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; private static final String KAFKA_HOST = "kafkaCluster"; @@ -135,6 +140,9 @@ public class KafkaIT extends TestSuiteBase implements TestResource { private List> nativeData; + /** Topics created dynamically during tests; cleaned up in {@link #tearDown()}. */ + private final List dynamicTopics = new CopyOnWriteArrayList<>(); + @BeforeAll @Override public void startUp() throws Exception { @@ -266,6 +274,14 @@ public void startUp() throws Exception { @AfterAll @Override public void tearDown() throws Exception { + if (!dynamicTopics.isEmpty()) { + try (AdminClient adminClient = createKafkaAdmin()) { + adminClient.deleteTopics(dynamicTopics).all().get(); + log.info("Deleted {} dynamic test topics", dynamicTopics.size()); + } catch (Exception e) { + log.warn("Failed to delete dynamic test topics: {}", e.getMessage()); + } + } if (producer != null) { producer.close(); } @@ -1579,11 +1595,16 @@ public void testKafkaProtobufToAssert(TestContainer container) value = {}) public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer container) throws InterruptedException, IOException { - - String producerTopic = "kafka_topic_exactly_once_1"; - String consumerTopic = "kafka_topic_exactly_once_2"; + String resourceSuffix = Long.toUnsignedString(System.nanoTime()); + String producerTopic = "kafka_topic_exactly_once_source_" + resourceSuffix; + String consumerTopic = "kafka_topic_exactly_once_sink_" + resourceSuffix; + String consumerGroup = "test_exactly_once_" + resourceSuffix; + List exactlyOnceVariables = + buildExactlyOnceStreamingVariables(producerTopic, consumerTopic, consumerGroup); + createKafkaTopic(producerTopic); + createKafkaTopic(consumerTopic); String sourceData = "Seatunnel Exactly Once Example"; - final String jobId = "18696753645413"; + final String jobId = Long.toUnsignedString(System.nanoTime()); long sinkStartOffset = endOffsetOnP0(consumerTopic); for (int i = 0; i < 10; i++) { ProducerRecord record = @@ -1596,7 +1617,9 @@ public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer containe () -> { try { container.executeJob( - "/kafka/kafka_to_kafka_exactly_once_streaming.conf", jobId); + "/kafka/kafka_to_kafka_exactly_once_streaming.conf", + jobId, + exactlyOnceVariables.toArray(new String[0])); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -1630,7 +1653,9 @@ public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer containe () -> { try { container.restoreJob( - "/kafka/kafka_to_kafka_exactly_once_streaming.conf", jobId); + "/kafka/kafka_to_kafka_exactly_once_streaming.conf", + jobId, + exactlyOnceVariables.toArray(new String[0])); } catch (Exception e) { throw new RuntimeException(e); } @@ -1655,9 +1680,14 @@ public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer containe type = EngineType.SPARK, value = {}) public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container) { - - String producerTopic = "kafka_topic_exactly_once_1"; - String consumerTopic = "kafka_topic_exactly_once_2"; + String resourceSuffix = Long.toUnsignedString(System.nanoTime()); + String producerTopic = "kafka_topic_exactly_once_source_" + resourceSuffix; + String consumerTopic = "kafka_topic_exactly_once_sink_" + resourceSuffix; + String consumerGroup = "test_exactly_once_" + resourceSuffix; + List exactlyOnceVariables = + buildExactlyOnceStreamingVariables(producerTopic, consumerTopic, consumerGroup); + createKafkaTopic(producerTopic); + createKafkaTopic(consumerTopic); String sourceData = "Seatunnel Exactly Once Example"; long sinkStartOffset = endOffsetOnP0(consumerTopic); for (int i = 0; i < 10; i++) { @@ -1671,7 +1701,9 @@ public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container) { CompletableFuture.supplyAsync( () -> { try { - container.executeJob("/kafka/kafka_to_kafka_exactly_once_streaming.conf"); + container.executeJob( + "/kafka/kafka_to_kafka_exactly_once_streaming.conf", + exactlyOnceVariables); } catch (Exception e) { log.error("Commit task exception :" + e.getMessage()); throw new RuntimeException(e); @@ -2013,6 +2045,34 @@ private AdminClient createKafkaAdmin() { return AdminClient.create(props); } + /** + * Create a dedicated Kafka topic for the exactly-once tests so each method reads its own data + * and never reuses offsets from earlier runs in the same class. + */ + private void createKafkaTopic(String topicName) { + NewTopic topic = new NewTopic(topicName, 1, (short) 1); + topic.configs(Collections.singletonMap("retention.ms", "-1")); + try (AdminClient adminClient = createKafkaAdmin()) { + adminClient.createTopics(Collections.singletonList(topic)).all().get(); + dynamicTopics.add(topicName); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "Interrupted while creating Kafka topic " + topicName, e); + } catch (ExecutionException e) { + throw new IllegalStateException("Failed to create Kafka topic " + topicName, e); + } + } + + /** Build the dynamic `-i key=value` variables for the exactly-once streaming template. */ + private List buildExactlyOnceStreamingVariables( + String sourceTopic, String sinkTopic, String consumerGroup) { + return Arrays.asList( + EXACTLY_ONCE_SOURCE_TOPIC_VARIABLE + "=" + sourceTopic, + EXACTLY_ONCE_SINK_TOPIC_VARIABLE + "=" + sinkTopic, + EXACTLY_ONCE_CONSUMER_GROUP_VARIABLE + "=" + consumerGroup); + } + private void initKafkaProducer() { Properties props = new Properties(); String bootstrapServers = kafkaContainer.getBootstrapServers(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf index ddbc6034b037..ac82598d49b7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once_streaming.conf @@ -24,8 +24,8 @@ env { source { Kafka { bootstrap.servers = "kafkaCluster:9092" - topic = "kafka_topic_exactly_once_1" - consumer.group = "test_exactly_once" + topic = "${sourceTopic}" + consumer.group = "${consumerGroup}" # The default format is json, which is optional format = text start_mode = group_offsets @@ -42,7 +42,7 @@ transform {} sink{ kafka { format = text - topic = "kafka_topic_exactly_once_2" + topic = "${sinkTopic}" bootstrap.servers = "kafkaCluster:9092" semantics = EXACTLY_ONCE kafka.config = { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFailureNoRestoreIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFailureNoRestoreIT.java index 3e46dbf6759c..b0ddf6a0e479 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFailureNoRestoreIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFailureNoRestoreIT.java @@ -56,6 +56,22 @@ public class ClusterFailureNoRestoreIT { private static final String TEST_TEMPLATE_FILE_NAME = "cluster_batch_fake_to_localfile_no_restore_template.conf"; + /** + * Keep the bounded fake source busy long enough to observe worker shutdown before the batch job + * converges on its own. + */ + private static final long NO_RESTORE_BATCH_ROW_NUM_PER_PARALLELISM = 20_000L; + + /** Wait for the batch job to enter the steady RUNNING state before shutting a worker down. */ + private static final long PRE_SHUTDOWN_RUNNING_TIMEOUT_SECONDS = 30L; + + /** + * Give the running batch topology a short warm-up window before shutting down a worker. The + * LocalFile sink used by this test commits files transactionally, so intermediate file lines + * are not a reliable progress signal while the job is still running. + */ + private static final long PRE_SHUTDOWN_RUNNING_GRACE_SECONDS = 5L; + private static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name"; private static final String DYNAMIC_TEST_ROW_NUM_PER_PARALLELISM = @@ -68,7 +84,7 @@ public void testBatchJobWithoutCheckpointAndRetryConvergesAfterWorkerShutdown() throws Exception { String testCaseName = "testBatchJobWithoutCheckpointAndRetryConvergesAfterWorkerShutdown"; String testClusterName = "ClusterFailureNoRestoreIT_batch_no_restore"; - long testRowNumber = 10000; + long testRowNumber = NO_RESTORE_BATCH_ROW_NUM_PER_PARALLELISM; int testParallelism = 6; HazelcastInstanceImpl node1 = null; @@ -108,21 +124,13 @@ public void testBatchJobWithoutCheckpointAndRetryConvergesAfterWorkerShutdown() ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); Awaitility.await() - .atMost(60, TimeUnit.SECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(PRE_SHUTDOWN_RUNNING_TIMEOUT_SECONDS, TimeUnit.SECONDS) .untilAsserted( - () -> { - Long lineNumberFromDir = - FileUtils.getFileLineNumberFromDir(testResources.getLeft()); - JobStatus status = clientJobProxy.getJobStatus(); - log.warn( - "\n====================={}=====================\n", - lineNumberFromDir); - Assertions.assertTrue(lineNumberFromDir > 1); - Assertions.assertFalse( - status.isEndState(), - "job finished before worker shutdown: " + status); - }); + () -> + Assertions.assertEquals( + JobStatus.RUNNING, clientJobProxy.getJobStatus())); + + TimeUnit.SECONDS.sleep(PRE_SHUTDOWN_RUNNING_GRACE_SECONDS); CompletableFuture waitForCompleteFuture = CompletableFuture.supplyAsync(clientJobProxy::waitForJobCompleteV2);