diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 930a4d4dec79..b0b557cf87e5 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -1240,7 +1240,8 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 60 + # Kudu E2E expands each @TestTemplate case across several PR test containers. + timeout-minutes: 90 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java index 374571d0c4c0..8c5794886e91 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java @@ -47,11 +47,7 @@ import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; import org.testcontainers.containers.PulsarContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.DockerLoggerFactory; import lombok.extern.slf4j.Slf4j; @@ -62,7 +58,6 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; @Slf4j public class PulsarBatchIT extends TestSuiteBase implements TestResource { @@ -113,15 +108,12 @@ public class PulsarBatchIT extends TestSuiteBase implements TestResource { @BeforeAll public void startUp() throws Exception { pulsarContainer = - new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME)) - .withNetwork(NETWORK) - .withNetworkAliases(PULSAR_HOST) - .withStartupTimeout(Duration.ofMinutes(3)) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME))); - - Startables.deepStart(Stream.of(pulsarContainer)).join(); + PulsarContainerSupport.startPulsarContainer( + dockerClient, + PULSAR_IMAGE_NAME, + NETWORK, + PULSAR_HOST, + Duration.ofMinutes(3)); Awaitility.given() .ignoreExceptions() .atLeast(100, TimeUnit.MILLISECONDS) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarContainerSupport.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarContainerSupport.java new file mode 100644 index 000000000000..ad9c0bc49279 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarContainerSupport.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.pulsar; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.exception.NotFoundException; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +final class PulsarContainerSupport { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerSupport.class); + private static final Duration IMAGE_PULL_TIMEOUT = Duration.ofMinutes(10); + + private PulsarContainerSupport() {} + + /** + * Pre-pulls the Pulsar image before Testcontainers starts the container. This keeps the first + * CI pull from failing when GitHub runners briefly make no download progress and Testcontainers + * aborts the pull attempt early. + */ + static PulsarContainer startPulsarContainer( + DockerClient dockerClient, + String imageName, + Network network, + String networkAlias, + Duration startupTimeout) + throws InterruptedException { + ensureImageAvailable(dockerClient, imageName); + + PulsarContainer pulsarContainer = + new PulsarContainer(DockerImageName.parse(imageName)) + .withNetwork(network) + .withNetworkAliases(networkAlias) + .withStartupTimeout(startupTimeout) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName))); + + Startables.deepStart(Stream.of(pulsarContainer)).join(); + return pulsarContainer; + } + + private static void ensureImageAvailable(DockerClient dockerClient, String imageName) + throws InterruptedException { + if (isImageAvailable(dockerClient, imageName)) { + LOG.info("Reuse local Pulsar image {}", imageName); + return; + } + + DockerImageName dockerImageName = DockerImageName.parse(imageName); + LOG.info( + "Pre-pulling Pulsar image {} with timeout {} to avoid flaky first-pull failures", + imageName, + IMAGE_PULL_TIMEOUT); + boolean completed = + dockerClient + .pullImageCmd(dockerImageName.getUnversionedPart()) + .withTag(dockerImageName.getVersionPart()) + .start() + .awaitCompletion(IMAGE_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS); + if (!completed || !isImageAvailable(dockerClient, imageName)) { + throw new IllegalStateException( + String.format( + "Failed to pre-pull Pulsar image %s within %s", + imageName, IMAGE_PULL_TIMEOUT)); + } + } + + private static boolean isImageAvailable(DockerClient dockerClient, String imageName) { + try { + dockerClient.inspectImageCmd(imageName).exec(); + return true; + } catch (NotFoundException ignored) { + return false; + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java index 5eef343acd65..ea6c59efd7f6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java @@ -31,11 +31,7 @@ import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; import org.testcontainers.containers.PulsarContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.DockerLoggerFactory; import lombok.extern.slf4j.Slf4j; @@ -43,7 +39,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; @Slf4j public class PulsarMultiTableIT extends TestSuiteBase implements TestResource { @@ -60,14 +55,12 @@ public class PulsarMultiTableIT extends TestSuiteBase implements TestResource { @BeforeAll public void startUp() throws Exception { pulsarContainer = - new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME)) - .withNetwork(NETWORK) - .withNetworkAliases(PULSAR_HOST) - .withStartupTimeout(Duration.ofMinutes(3)) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME))); - Startables.deepStart(Stream.of(pulsarContainer)).join(); + PulsarContainerSupport.startPulsarContainer( + dockerClient, + PULSAR_IMAGE_NAME, + NETWORK, + PULSAR_HOST, + Duration.ofMinutes(3)); Awaitility.given() .ignoreExceptions() .atLeast(100, TimeUnit.MILLISECONDS) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java index 829c9b7cc5b8..252fdcbd1244 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java @@ -35,11 +35,7 @@ import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; import org.testcontainers.containers.PulsarContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.DockerLoggerFactory; import lombok.extern.slf4j.Slf4j; @@ -49,7 +45,6 @@ import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; import static java.time.temporal.ChronoUnit.SECONDS; @@ -65,15 +60,12 @@ public class PulsarSinkIT extends TestSuiteBase implements TestResource { @BeforeAll public void startUp() throws Exception { pulsarContainer = - new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME)) - .withNetwork(NETWORK) - .withNetworkAliases(PULSAR_HOST) - .withStartupTimeout(Duration.of(400, SECONDS)) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME))); - - Startables.deepStart(Stream.of(pulsarContainer)).join(); + PulsarContainerSupport.startPulsarContainer( + dockerClient, + PULSAR_IMAGE_NAME, + NETWORK, + PULSAR_HOST, + Duration.of(400, SECONDS)); Awaitility.given() .ignoreExceptions() .atLeast(100, TimeUnit.MILLISECONDS) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java index 632c44cbed0e..e12c5b6c01c7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.server.trace; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; @@ -88,7 +89,11 @@ public static AppendResult append( } byte[] newPayload = Arrays.copyOf(payload, payload.length + ENTRY_LENGTH); ByteBuffer buffer = ByteBuffer.wrap(newPayload).order(ByteOrder.BIG_ENDIAN); - buffer.position(payload.length); + // Cast to Buffer (superclass) so that position(int) resolves to Buffer#position(int) + // returning Buffer, which exists on JDK 8. ByteBuffer overrode this method with a + // covariant ByteBuffer return type only in JDK 9+; calling the ByteBuffer override on + // JDK 8 would throw NoSuchMethodError at runtime. + ((Buffer) buffer).position(payload.length); buffer.put(stage.getCode()); buffer.putLong(taskId); buffer.putLong(tsMs);