diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala index 83a09a227c..b70ab9d525 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala @@ -37,6 +37,8 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger { private[this] lazy val FLINK_SCALA_VERSION_PATTERN = Pattern.compile("^flink-dist_(\\d\\.\\d+).*.jar$") + private[this] lazy val VERSION_SEGMENT_PATTERN = Pattern.compile("^(\\d+).*$") + private[this] lazy val APACHE_FLINK_VERSION_PATTERN = Pattern.compile("(^\\d+\\.\\d+\\.\\d+)") private[this] lazy val OTHER_FLINK_VERSION_PATTERN = Pattern.compile("(\\d+\\.\\d+)(-*)") @@ -125,7 +127,7 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger { } def checkVersion(throwException: Boolean = true): Boolean = { - version.split("\\.").map(_.trim.toInt) match { + versionSegments match { case Array(1, v, _) if v >= 12 && v <= 20 => true case _ => if (throwException) { @@ -137,12 +139,23 @@ class FlinkVersion(val flinkHome: String) extends Serializable with Logger { } def checkVersion(sinceVersion: Int): Boolean = { - version.split("\\.").map(_.trim.toInt) match { + versionSegments match { case Array(1, v, _) if v >= sinceVersion => true case _ => false } } + private[this] def versionSegments: Array[Int] = { + version.split("\\.").map { segment => + val matcher = VERSION_SEGMENT_PATTERN.matcher(segment.trim) + if (matcher.matches()) { + matcher.group(1).toInt + } else { + throw new NumberFormatException(s"""For input string: "$segment"""") + } + } + } + // StreamPark flink shims version, like "streampark-flink-shims_flink-1.13" private lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion" diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/conf/FlinkVersionTest.scala b/streampark-common/src/test/scala/org/apache/streampark/common/conf/FlinkVersionTest.scala new file mode 100644 index 0000000000..1267e0bc38 --- /dev/null +++ b/streampark-common/src/test/scala/org/apache/streampark/common/conf/FlinkVersionTest.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.conf + +import org.junit.jupiter.api.{Assertions, Test} + +class FlinkVersionTest { + + @Test def checkVersionShouldAcceptVendorSuffixedPatchVersions(): Unit = { + Assertions.assertTrue(flinkVersion("1.19.1-amzn-1").checkVersion()) + } + + @Test def checkVersionShouldRejectUnsupportedVendorSuffixedPatchVersions(): Unit = { + Assertions.assertFalse(flinkVersion("1.11.1-amzn-1").checkVersion(false)) + } + + @Test def checkVersionWithSinceVersionShouldAcceptVendorSuffixedPatchVersions(): Unit = { + Assertions.assertTrue(flinkVersion("1.19.1-amzn-1").checkVersion(19)) + } + + private def flinkVersion(versionString: String): FlinkVersion = + new FlinkVersion("unused") { + override lazy val version: String = versionString + } +}