diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index aeec8cd8bd..dc848bbabf 100755 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -565,8 +565,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { try { YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); if (yarnAppInfo != null) { - String state = yarnAppInfo.getApp().getFinalStatus(); - flinkAppState = FlinkAppStateEnum.getState(state); + flinkAppState = resolveYarnAppState(yarnAppInfo); } } finally { if (StopFromEnum.NONE.equals(stopFrom)) { @@ -594,8 +593,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { } } else { try { - String state = yarnAppInfo.getApp().getFinalStatus(); - FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.getState(state); + FlinkAppStateEnum flinkAppState = resolveYarnAppState(yarnAppInfo); if (FlinkAppStateEnum.OTHER.equals(flinkAppState)) { return; } @@ -636,6 +634,16 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { } } + static FlinkAppStateEnum resolveYarnAppState(YarnAppInfo yarnAppInfo) { + if (yarnAppInfo == null || yarnAppInfo.getApp() == null) { + return FlinkAppStateEnum.OTHER; + } + FlinkAppStateEnum finalStatus = FlinkAppStateEnum.getState(yarnAppInfo.getApp().getFinalStatus()); + return FlinkAppStateEnum.OTHER.equals(finalStatus) + ? FlinkAppStateEnum.getState(yarnAppInfo.getApp().getState()) + : finalStatus; + } + private void doAlert(FlinkApplication application, FlinkAppStateEnum flinkAppState) { AlertTemplate alertTemplate = AlertTemplateUtils.createAlertTemplate(application, flinkAppState); diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcherTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcherTest.java new file mode 100644 index 0000000000..3732deb5ae --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcherTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.watcher; + +import org.apache.streampark.console.core.enums.FlinkAppStateEnum; +import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class FlinkAppHttpWatcherTest { + + @Test + void resolveYarnAppStateFallsBackToStateWhenFinalStatusIsUndefined() { + YarnAppInfo yarnAppInfo = yarnAppInfo("RUNNING", "UNDEFINED"); + + FlinkAppStateEnum state = FlinkAppHttpWatcher.resolveYarnAppState(yarnAppInfo); + + assertThat(state).isEqualTo(FlinkAppStateEnum.RUNNING); + } + + @Test + void resolveYarnAppStatePrefersFinalStatusWhenItIsKnown() { + YarnAppInfo yarnAppInfo = yarnAppInfo("FINISHED", "SUCCEEDED"); + + FlinkAppStateEnum state = FlinkAppHttpWatcher.resolveYarnAppState(yarnAppInfo); + + assertThat(state).isEqualTo(FlinkAppStateEnum.SUCCEEDED); + } + + @Test + void resolveYarnAppStateReturnsOtherWhenAppInfoIsMissing() { + assertThat(FlinkAppHttpWatcher.resolveYarnAppState(null)).isEqualTo(FlinkAppStateEnum.OTHER); + assertThat(FlinkAppHttpWatcher.resolveYarnAppState(new YarnAppInfo())) + .isEqualTo(FlinkAppStateEnum.OTHER); + } + + private YarnAppInfo yarnAppInfo(String state, String finalStatus) { + YarnAppInfo yarnAppInfo = new YarnAppInfo(); + YarnAppInfo.App app = new YarnAppInfo.App(); + app.setState(state); + app.setFinalStatus(finalStatus); + yarnAppInfo.setApp(app); + return yarnAppInfo; + } +}