Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,13 @@ jobs:
./mvnw help:evaluate -Dexpression=project.modules -q -DforceStdout -pl :seatunnel-connector-v2-e2e >> /tmp/sub_module.txt
sub_modules=`python tools/update_modules_check/update_modules_check.py sub /tmp/sub_module.txt`
run_it_modules=`python tools/update_modules_check/update_modules_check.py sub_it_module "$sub_modules" 7 1`
# Keep the longest Iceberg and HBase suites in a dedicated shard so this hosted runner
# does not lose heartbeat before the rest of the part-2 modules finish.
# Handle both first-position (no leading comma) and mid/last-position (leading comma).
run_it_modules=${run_it_modules//:connector-iceberg-e2e,/}
run_it_modules=${run_it_modules//,:connector-iceberg-e2e/}
run_it_modules=${run_it_modules//:connector-hbase-e2e,/}
run_it_modules=${run_it_modules//,:connector-hbase-e2e/}
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl $run_it_modules -am -Pci
env:
MAVEN_OPTS: -Xmx4096m
Expand Down Expand Up @@ -1008,6 +1015,35 @@ jobs:
env:
MAVEN_OPTS: -Xmx4096m

# Dedicated connector-v2 integration-test shard (part-8) that runs only the
# Iceberg and HBase e2e modules via the `-pl` list in the final step.
all-connectors-it-8:
needs: [ changes, sanity-check ]
# Run only when the `changes` job reports api or engine modifications.
if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true'
runs-on: ${{ matrix.os }}
env:
RUN_ALL_CONTAINER: ${{ needs.changes.outputs.api }}
RUN_ZETA_CONTAINER: ${{ needs.changes.outputs.engine }}
strategy:
matrix:
# One run per JDK version listed here, all on ubuntu-latest.
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
# Job-level limit of 210 minutes.
timeout-minutes: 210
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run connector-v2 integration test (part-8)
run: |
# These two suites dominated part-2 and triggered hosted-runner heartbeat loss.
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl :connector-iceberg-e2e,:connector-hbase-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

jdbc-connectors-it-part-1:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ private void assertSchemaEvolutionForAddColumns(String sourceTable, String sinkT
sourceDatabase.setTemplateName("add_columns").createAndInitialize();
given().pollDelay(Duration.ofSeconds(5))
.await()
.atMost(120, TimeUnit.SECONDS)
.atMost(SCHEMA_ASSERT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertIterableEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -121,6 +122,10 @@

@Slf4j
public class KafkaIT extends TestSuiteBase implements TestResource {
private static final String EXACTLY_ONCE_SOURCE_TOPIC_VARIABLE = "sourceTopic";
private static final String EXACTLY_ONCE_SINK_TOPIC_VARIABLE = "sinkTopic";
private static final String EXACTLY_ONCE_CONSUMER_GROUP_VARIABLE = "consumerGroup";

private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";

private static final String KAFKA_HOST = "kafkaCluster";
Expand All @@ -135,6 +140,9 @@ public class KafkaIT extends TestSuiteBase implements TestResource {

private List<ConsumerRecord<String, String>> nativeData;

/** Topics created dynamically during tests; cleaned up in {@link #tearDown()}. */
private final List<String> dynamicTopics = new CopyOnWriteArrayList<>();

@BeforeAll
@Override
public void startUp() throws Exception {
Expand Down Expand Up @@ -266,6 +274,14 @@ public void startUp() throws Exception {
@AfterAll
@Override
public void tearDown() throws Exception {
if (!dynamicTopics.isEmpty()) {
try (AdminClient adminClient = createKafkaAdmin()) {
adminClient.deleteTopics(dynamicTopics).all().get();
log.info("Deleted {} dynamic test topics", dynamicTopics.size());
} catch (Exception e) {
log.warn("Failed to delete dynamic test topics: {}", e.getMessage());
}
}
if (producer != null) {
producer.close();
}
Expand Down Expand Up @@ -1579,11 +1595,16 @@ public void testKafkaProtobufToAssert(TestContainer container)
value = {})
public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer container)
throws InterruptedException, IOException {

String producerTopic = "kafka_topic_exactly_once_1";
String consumerTopic = "kafka_topic_exactly_once_2";
String resourceSuffix = Long.toUnsignedString(System.nanoTime());
String producerTopic = "kafka_topic_exactly_once_source_" + resourceSuffix;
String consumerTopic = "kafka_topic_exactly_once_sink_" + resourceSuffix;
String consumerGroup = "test_exactly_once_" + resourceSuffix;
List<String> exactlyOnceVariables =
buildExactlyOnceStreamingVariables(producerTopic, consumerTopic, consumerGroup);
createKafkaTopic(producerTopic);
createKafkaTopic(consumerTopic);
String sourceData = "Seatunnel Exactly Once Example";
final String jobId = "18696753645413";
final String jobId = Long.toUnsignedString(System.nanoTime());
long sinkStartOffset = endOffsetOnP0(consumerTopic);
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
Expand All @@ -1596,7 +1617,9 @@ public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer containe
() -> {
try {
container.executeJob(
"/kafka/kafka_to_kafka_exactly_once_streaming.conf", jobId);
"/kafka/kafka_to_kafka_exactly_once_streaming.conf",
jobId,
exactlyOnceVariables.toArray(new String[0]));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -1630,7 +1653,9 @@ public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer containe
() -> {
try {
container.restoreJob(
"/kafka/kafka_to_kafka_exactly_once_streaming.conf", jobId);
"/kafka/kafka_to_kafka_exactly_once_streaming.conf",
jobId,
exactlyOnceVariables.toArray(new String[0]));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -1655,9 +1680,14 @@ public void testRestoreKafkaToKafkaExactlyOnceOnStreaming(TestContainer containe
type = EngineType.SPARK,
value = {})
public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container) {

String producerTopic = "kafka_topic_exactly_once_1";
String consumerTopic = "kafka_topic_exactly_once_2";
String resourceSuffix = Long.toUnsignedString(System.nanoTime());
String producerTopic = "kafka_topic_exactly_once_source_" + resourceSuffix;
String consumerTopic = "kafka_topic_exactly_once_sink_" + resourceSuffix;
String consumerGroup = "test_exactly_once_" + resourceSuffix;
List<String> exactlyOnceVariables =
buildExactlyOnceStreamingVariables(producerTopic, consumerTopic, consumerGroup);
createKafkaTopic(producerTopic);
createKafkaTopic(consumerTopic);
String sourceData = "Seatunnel Exactly Once Example";
long sinkStartOffset = endOffsetOnP0(consumerTopic);
for (int i = 0; i < 10; i++) {
Expand All @@ -1671,7 +1701,9 @@ public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container) {
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/kafka/kafka_to_kafka_exactly_once_streaming.conf");
container.executeJob(
"/kafka/kafka_to_kafka_exactly_once_streaming.conf",
exactlyOnceVariables);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
Expand Down Expand Up @@ -2013,6 +2045,34 @@ private AdminClient createKafkaAdmin() {
return AdminClient.create(props);
}

/**
 * Creates a dedicated Kafka topic for an exactly-once test and records its name in {@code
 * dynamicTopics} once the broker has confirmed the creation.
 *
 * <p>The topic is requested with one partition, a replication factor of one and {@code
 * retention.ms=-1}, so each test method reads only its own data and never reuses offsets left
 * behind by earlier runs in the same class.
 *
 * @param topicName name of the topic to create
 * @throws IllegalStateException if the creation request fails or the calling thread is
 *     interrupted while waiting for it (the interrupt flag is restored in that case)
 */
private void createKafkaTopic(String topicName) {
    Map<String, String> topicConfigs = Collections.singletonMap("retention.ms", "-1");
    NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
    newTopic.configs(topicConfigs);
    try (AdminClient admin = createKafkaAdmin()) {
        // Block until the broker acknowledges the topic before tracking it for cleanup.
        admin.createTopics(Collections.singletonList(newTopic)).all().get();
        dynamicTopics.add(topicName);
    } catch (ExecutionException creationFailure) {
        throw new IllegalStateException(
                "Failed to create Kafka topic " + topicName, creationFailure);
    } catch (InterruptedException interrupted) {
        // Restore the interrupt flag so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(
                "Interrupted while creating Kafka topic " + topicName, interrupted);
    }
}

/**
 * Builds the {@code key=value} variable entries handed to the exactly-once streaming job
 * template, one entry each for the source topic, the sink topic and the consumer group.
 *
 * @param sourceTopic topic the job reads from
 * @param sinkTopic topic the job writes to
 * @param consumerGroup consumer group used by the Kafka source
 * @return a fixed-size list holding the three entries in source, sink, consumer-group order
 */
private List<String> buildExactlyOnceStreamingVariables(
        String sourceTopic, String sinkTopic, String consumerGroup) {
    String sourceTopicEntry = EXACTLY_ONCE_SOURCE_TOPIC_VARIABLE + "=" + sourceTopic;
    String sinkTopicEntry = EXACTLY_ONCE_SINK_TOPIC_VARIABLE + "=" + sinkTopic;
    String consumerGroupEntry = EXACTLY_ONCE_CONSUMER_GROUP_VARIABLE + "=" + consumerGroup;
    return Arrays.asList(sourceTopicEntry, sinkTopicEntry, consumerGroupEntry);
}

private void initKafkaProducer() {
Properties props = new Properties();
String bootstrapServers = kafkaContainer.getBootstrapServers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ env {
source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "kafka_topic_exactly_once_1"
consumer.group = "test_exactly_once"
topic = "${sourceTopic}"
consumer.group = "${consumerGroup}"
# The default format is json, which is optional
format = text
start_mode = group_offsets
Expand All @@ -42,7 +42,7 @@ transform {}
sink{
kafka {
format = text
topic = "kafka_topic_exactly_once_2"
topic = "${sinkTopic}"
bootstrap.servers = "kafkaCluster:9092"
semantics = EXACTLY_ONCE
kafka.config = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ public class ClusterFailureNoRestoreIT {
private static final String TEST_TEMPLATE_FILE_NAME =
"cluster_batch_fake_to_localfile_no_restore_template.conf";

/**
* Keep the bounded fake source busy long enough to observe worker shutdown before the batch job
* converges on its own.
*/
private static final long NO_RESTORE_BATCH_ROW_NUM_PER_PARALLELISM = 20_000L;

/** Wait for the batch job to enter the steady RUNNING state before shutting a worker down. */
private static final long PRE_SHUTDOWN_RUNNING_TIMEOUT_SECONDS = 30L;

/**
* Give the running batch topology a short warm-up window before shutting down a worker. The
* LocalFile sink used by this test commits files transactionally, so intermediate file lines
* are not a reliable progress signal while the job is still running.
*/
private static final long PRE_SHUTDOWN_RUNNING_GRACE_SECONDS = 5L;

private static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";

private static final String DYNAMIC_TEST_ROW_NUM_PER_PARALLELISM =
Expand All @@ -68,7 +84,7 @@ public void testBatchJobWithoutCheckpointAndRetryConvergesAfterWorkerShutdown()
throws Exception {
String testCaseName = "testBatchJobWithoutCheckpointAndRetryConvergesAfterWorkerShutdown";
String testClusterName = "ClusterFailureNoRestoreIT_batch_no_restore";
long testRowNumber = 10000;
long testRowNumber = NO_RESTORE_BATCH_ROW_NUM_PER_PARALLELISM;
int testParallelism = 6;

HazelcastInstanceImpl node1 = null;
Expand Down Expand Up @@ -108,21 +124,13 @@ public void testBatchJobWithoutCheckpointAndRetryConvergesAfterWorkerShutdown()
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(PRE_SHUTDOWN_RUNNING_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.untilAsserted(
() -> {
Long lineNumberFromDir =
FileUtils.getFileLineNumberFromDir(testResources.getLeft());
JobStatus status = clientJobProxy.getJobStatus();
log.warn(
"\n====================={}=====================\n",
lineNumberFromDir);
Assertions.assertTrue(lineNumberFromDir > 1);
Assertions.assertFalse(
status.isEndState(),
"job finished before worker shutdown: " + status);
});
() ->
Assertions.assertEquals(
JobStatus.RUNNING, clientJobProxy.getJobStatus()));

TimeUnit.SECONDS.sleep(PRE_SHUTDOWN_RUNNING_GRACE_SECONDS);

CompletableFuture<JobResult> waitForCompleteFuture =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobCompleteV2);
Expand Down
Loading