Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
- name: run validatesRunnerStreaming script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreaming
gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreamingEngine
max-workers: 12
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v7
Expand Down
1 change: 1 addition & 0 deletions buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ dependencies {
runtimeOnly("ca.cutterslade.gradle:gradle-dependency-analyze:1.8.3") // Enable dep analysis
runtimeOnly("gradle.plugin.net.ossindex:ossindex-gradle-plugin:0.4.11") // Enable dep vulnerability analysis
runtimeOnly("org.checkerframework:checkerframework-gradle-plugin:0.6.56") // Enable enhanced static checking plugin
runtimeOnly("org.gradle.test-retry:org.gradle.test-retry.gradle.plugin:1.6.0")
}

// Because buildSrc is built and tested automatically _before_ gradle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,7 @@ class BeamModulePlugin implements Plugin<Project> {
}

project.apply plugin: "java"
project.apply plugin: "org.gradle.test-retry"

// We create a testRuntimeMigration configuration here to extend
// testImplementation, testRuntimeOnly, and default (similar to what
Expand Down Expand Up @@ -1218,6 +1219,15 @@ class BeamModulePlugin implements Plugin<Project> {
useJUnit {}
// default maxHeapSize on gradle 5 is 512m, lets increase to handle more demanding tests
maxHeapSize = '2g'

def isCI = System.getenv("GITHUB_ACTIONS") != null || System.getenv("JENKINS_HOME") != null
if (project.plugins.hasPlugin('org.gradle.test-retry')) {
retry {
maxRetries = isCI ? 3 : 0
maxFailures = 15
failOnPassedAfterRetry = false
}
}
}

List<String> skipDefRegexes = []
Expand Down
12 changes: 6 additions & 6 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ def createLegacyWorkerValidatesRunnerTest = { Map args ->

systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)

// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
// By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion.
// Can be overridden via -PmaxParallelForks=N.
maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) +
files(project(project.path).sourceSets.test.output.classesDirs)
Expand Down Expand Up @@ -263,9 +263,9 @@ def createRunnerV2ValidatesRunnerTest = { Map args ->
dependsOn buildAndPushDockerJavaContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)

// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
// By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion.
// Can be overridden via -PmaxParallelForks=N.
maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) +
files(project(project.path).sourceSets.test.output.classesDirs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,28 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
ErrorMonitorMessagesHandler messageHandler =
new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());

java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef =
new java.util.concurrent.atomic.AtomicReference<>(Optional.absent());

if (options.isStreaming()) {
if (options.isBlockOnRun()) {
jobSuccess = waitForStreamingJobTermination(job, messageHandler);
jobSuccess = waitForStreamingJobTermination(job, messageHandler, assertionsPassedRef);
} else {
jobSuccess = true;
}
// No metrics in streaming
allAssertionsPassed = Optional.absent();
allAssertionsPassed = assertionsPassedRef.get();
if (!allAssertionsPassed.isPresent()) {
allAssertionsPassed = checkForPAssertSuccess(job);
}
} else {
jobSuccess = waitForBatchJobTermination(job, messageHandler);
allAssertionsPassed = checkForPAssertSuccess(job);
}

if (allAssertionsPassed.isPresent() && allAssertionsPassed.get()) {
jobSuccess = true;
Comment thread
durgaprasadml marked this conversation as resolved.
}

// If there is a certain assertion failure, throw the most precise exception we can.
// There are situations where the metric will not be available, but as long as we recover
// the actionable message from the logs it is acceptable.
Expand All @@ -160,11 +169,13 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
@SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish
@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
private boolean waitForStreamingJobTermination(
final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
final DataflowPipelineJob job,
ErrorMonitorMessagesHandler messageHandler,
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
// In streaming, there are infinite retries, so rather than timeout
// we try to terminate early by polling and canceling if we see
// an error message
options.getExecutorService().submit(new CancelOnError(job, messageHandler));
// an error message or when all assertions have succeeded
options.getExecutorService().submit(new CancelOnError(job, messageHandler, this, assertionsPassedRef));

// Whether we canceled or not, this gets the final state of the job or times out
State finalState;
Expand Down Expand Up @@ -373,29 +384,66 @@ private static class CancelOnError implements Callable<Void> {

private final DataflowPipelineJob job;
private final ErrorMonitorMessagesHandler messageHandler;

public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
private final TestDataflowRunner runner;
private final java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef;

public CancelOnError(
DataflowPipelineJob job,
ErrorMonitorMessagesHandler messageHandler,
TestDataflowRunner runner,
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
this.job = job;
this.messageHandler = messageHandler;
this.runner = runner;
this.assertionsPassedRef = assertionsPassedRef;
}

@Override
public Void call() throws Exception {
int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
int steps = 0;
while (true) {
State jobState = job.getState();

// If we see an error, cancel and note failure
if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
job.cancel();
LOG.info("Cancelling Dataflow job {}", job.getJobId());
if (jobState.isTerminal()) {
return null;
}

if (jobState.isTerminal()) {
// Check metrics for early success/failure cancellation
if (steps % checkMetricsIntervalSteps == 0) {
Optional<Boolean> assertionsPassed = runner.checkForPAssertSuccess(job);
if (assertionsPassed.isPresent()) {
assertionsPassedRef.set(assertionsPassed);
if (assertionsPassed.get()) {
LOG.info(
"All assertions passed for streaming job {}, cancelling job.",
job.getJobId());
job.cancel();
return null;
} else {
LOG.info(
"Found failed assertion for streaming job {}, cancelling job.",
job.getJobId());
job.cancel();
return null;
}
}
}
Comment thread
durgaprasadml marked this conversation as resolved.
Outdated

// If we see a terminal error message and no assertions have passed yet, cancel and fail
long runningTimeMillis = steps * 3000L;
if (messageHandler.hasSeenError()
&& !jobState.isTerminal()
&& (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) {
LOG.info(
"Cancelling Dataflow job due to error messages seen: {}",
messageHandler.getErrorMessage());
job.cancel();
return null;
}

Thread.sleep(3000L);
steps++;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,54 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
// If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
}

@Test
public void testRunStreamingJobEarlySuccess() throws Exception {
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);

DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
when(mockJob.getState()).thenReturn(State.CANCELLED);
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.CANCELLED);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");

DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
runner.run(p, mockRunner);
}
Comment thread
durgaprasadml marked this conversation as resolved.

@Test
public void testRunStreamingJobEarlyFailure() throws Exception {
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);

DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
when(mockJob.getState()).thenReturn(State.CANCELLED);
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.CANCELLED);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");

DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

expectedException.expect(AssertionError.class);
runner.run(p, mockRunner);
}
Comment thread
durgaprasadml marked this conversation as resolved.

static class TestSuccessMatcher extends BaseMatcher<PipelineResult>
implements SerializableMatcher<PipelineResult> {
private final transient DataflowPipelineJob mockJob;
Expand Down
Loading