From 14c142b49afc613ac8ddb556d5a840a4a528e363 Mon Sep 17 00:00:00 2001 From: shangeyao Date: Thu, 25 Jun 2026 00:03:26 +0800 Subject: [PATCH] [Common][Spark] Support Spark 4.x env registration without upgrading service JDK Parse Spark version from installation files before invoking spark-submit, and auto-resolve JAVA_HOME from spark-env.sh for Spark job submission on Yarn. Generated-by: Cursor Co-authored-by: Cursor --- .../streampark/common/conf/SparkVersion.scala | 139 ++++++++++++++---- .../common/util/SparkEnvUtils.scala | 100 +++++++++++++ .../common/conf/SparkVersionTest.scala | 62 ++++++++ .../common/util/SparkEnvUtilsTest.scala | 92 ++++++++++++ .../src/main/assembly/conf/streampark-env.sh | 4 + .../src/main/assembly/script/README.md | 7 + .../main/assembly/script/SPARK_JDK_GUIDE.md | 80 ++++++++++ .../assembly/script/SPARK_JDK_GUIDE.zh.md | 75 ++++++++++ .../src/locales/lang/en/spark/home.ts | 2 +- .../src/locales/lang/zh-CN/spark/home.ts | 2 +- .../spark/client/impl/YarnClient.scala | 12 +- 11 files changed, 540 insertions(+), 35 deletions(-) create mode 100644 streampark-common/src/main/scala/org/apache/streampark/common/util/SparkEnvUtils.scala create mode 100644 streampark-common/src/test/scala/org/apache/streampark/common/conf/SparkVersionTest.scala create mode 100644 streampark-common/src/test/scala/org/apache/streampark/common/util/SparkEnvUtilsTest.scala create mode 100644 streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.md create mode 100644 streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.zh.md diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala index cbbe113218..7421e80a29 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala @@ -17,10 +17,13 @@ package org.apache.streampark.common.conf -import org.apache.streampark.common.util.{CommandUtils, Logger} +import org.apache.streampark.common.util.{CommandUtils, Logger, SparkEnvUtils} import org.apache.streampark.common.util.Implicits._ +import org.apache.commons.io.FileUtils + import java.io.File +import java.nio.charset.StandardCharsets import java.util.function.Consumer import java.util.regex.Pattern @@ -35,36 +38,19 @@ class SparkVersion(val sparkHome: String) extends Serializable with Logger { private[this] lazy val SPARK_SCALA_VERSION_PATTERN = Pattern.compile("Using\\sScala\\sversion\\s(\\d+\\.\\d+)") - val (version, scalaVersion) = { - var sparkVersion: String = null - var scalaVersion: String = null - val cmd = List(s"export SPARK_HOME=$sparkHome&&$sparkHome/bin/spark-submit --version") - val buffer = new mutable.StringBuilder + private[this] lazy val SPARK_RELEASE_VERSION_PATTERN = Pattern.compile("^Spark\\s+(\\d+\\.\\d+\\.\\d+)") - CommandUtils.execute( - sparkHome, - cmd, - new Consumer[String]() { - override def accept(out: String): Unit = { - buffer.append(out).append("\n") - val matcher = SPARK_VERSION_PATTERN.matcher(out) - if (matcher.find) { - sparkVersion = matcher.group(1) - } else { - val matcher1 = SPARK_SCALA_VERSION_PATTERN.matcher(out) - if (matcher1.find) { - scalaVersion = matcher1.group(1) - } - } - } - }) + private[this] lazy val SPARK_CORE_JAR_PATTERN = + Pattern.compile("^spark-core_(\\d+\\.\\d+)-(\\d+\\.\\d+\\.\\d+)\\.jar$") - logInfo(buffer.toString()) - if (sparkVersion == null || scalaVersion == null) { - throw new IllegalStateException(s"[StreamPark] parse spark version failed. $buffer") - } - buffer.clear() - (sparkVersion, scalaVersion) + val (version, scalaVersion) = { + parseFromSparkCoreJar() + .orElse(parseFromReleaseFile()) + .orElse(parseFromSparkSubmit()) + .getOrElse( + throw new IllegalStateException( + s"[StreamPark] parse spark version failed for sparkHome: $sparkHome. " + + "Please check whether $SPARK_HOME/jars/spark-core_*.jar or RELEASE exists.")) } lazy val majorVersion: String = { @@ -79,19 +65,22 @@ class SparkVersion(val sparkHome: String) extends Serializable with Logger { lazy val fullVersion: String = s"${version}_$scalaVersion" + /** Resolved JAVA_HOME for Spark CLI and SparkLauncher, based on spark-env.sh or auto-detection. */ + lazy val javaHome: Option[String] = SparkEnvUtils.resolveJavaHome(sparkHome, version) + lazy val sparkLib: File = { require(sparkHome != null, "[StreamPark] sparkHome must not be null.") require(new File(sparkHome).exists(), "[StreamPark] sparkHome must be exists.") val lib = new File(s"$sparkHome/jars") require( lib.exists() && lib.isDirectory, - s"[StreamPark] $sparkHome/lib must be exists and must be directory.") + s"[StreamPark] $sparkHome/jars must be exists and must be directory.") lib } def checkVersion(throwException: Boolean = true): Boolean = { version.split("\\.").map(_.trim.toInt) match { - case Array(v, _, _) if v == 2 || v == 3 => true + case Array(v, _, _) if v == 2 || v == 3 || v == 4 => true case _ => if (throwException) { throw new UnsupportedOperationException(s"Unsupported spark version: $version") @@ -101,12 +90,100 @@ class SparkVersion(val sparkHome: String) extends Serializable with Logger { } } + private def parseFromSparkCoreJar(): Option[(String, String)] = { + val jarsDir = new File(s"$sparkHome/jars") + if (!jarsDir.exists() || !jarsDir.isDirectory) { + None + } else { + jarsDir.listFiles().collectFirst { + case file if SPARK_CORE_JAR_PATTERN.matcher(file.getName).matches() => + val matcher = SPARK_CORE_JAR_PATTERN.matcher(file.getName) + matcher.matches() + val parsed = matcher.group(2) -> matcher.group(1) + logInfo(s"Spark version parsed from spark-core jar name: ${parsed._1}, scala: ${parsed._2}") + parsed + } + } + } + + private def parseFromReleaseFile(): Option[(String, String)] = { + val releaseFile = new File(s"$sparkHome/RELEASE") + if (!releaseFile.exists()) { + None + } else { + val firstLine = FileUtils.readFileToString(releaseFile, StandardCharsets.UTF_8).trim.split("\n").headOption.getOrElse("") + val matcher = SPARK_RELEASE_VERSION_PATTERN.matcher(firstLine) + if (matcher.find()) { + parseFromSparkCoreJar().map { case (_, scalaVer) => + val parsed = matcher.group(1) -> scalaVer + logInfo(s"Spark version parsed from RELEASE file: ${parsed._1}, scala: ${parsed._2}") + parsed + } + } else { + None + } + } + } + + private def hintSparkVersion(): String = { + parseFromSparkCoreJar().map(_._1).orElse { + val releaseFile = new File(s"$sparkHome/RELEASE") + if (!releaseFile.exists()) { + None + } else { + val firstLine = + FileUtils.readFileToString(releaseFile, StandardCharsets.UTF_8).trim.split("\n").headOption.getOrElse("") + val matcher = SPARK_RELEASE_VERSION_PATTERN.matcher(firstLine) + if (matcher.find()) Some(matcher.group(1)) else None + } + }.getOrElse("3.0.0") + } + + private def parseFromSparkSubmit(): Option[(String, String)] = { + var sparkVersion: String = null + var scalaVersion: String = null + val javaHomeExport = SparkEnvUtils + .resolveJavaHome(sparkHome, hintSparkVersion()) + .map(javaHome => s"export JAVA_HOME=$javaHome&&") + .getOrElse("") + val cmd = List(s"export SPARK_HOME=$sparkHome&&${javaHomeExport}$sparkHome/bin/spark-submit --version") + val buffer = new mutable.StringBuilder + + CommandUtils.execute( + sparkHome, + cmd, + new Consumer[String]() { + override def accept(out: String): Unit = { + buffer.append(out).append("\n") + val matcher = SPARK_VERSION_PATTERN.matcher(out) + if (matcher.find) { + sparkVersion = matcher.group(1) + } else { + val matcher1 = SPARK_SCALA_VERSION_PATTERN.matcher(out) + if (matcher1.find) { + scalaVersion = matcher1.group(1) + } + } + } + }) + + logInfo(buffer.toString()) + if (sparkVersion != null && scalaVersion != null) { + logInfo(s"Spark version parsed from spark-submit: $sparkVersion, scala: $scalaVersion") + buffer.clear() + Some(sparkVersion -> scalaVersion) + } else { + None + } + } + override def toString: String = s""" |----------------------------------------- spark version ----------------------------------- | sparkHome : $sparkHome | sparkVersion : $version | scalaVersion : $scalaVersion + | javaHome : ${javaHome.getOrElse("not resolved")} |------------------------------------------------------------------------------------------- |""".stripMargin diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkEnvUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkEnvUtils.scala new file mode 100644 index 0000000000..85d11d8673 --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SparkEnvUtils.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util + +import java.io.File +import java.nio.charset.StandardCharsets +import java.util.regex.Pattern + +import scala.util.Try + +object SparkEnvUtils extends Logger { + + private[this] lazy val JAVA_HOME_PATTERN = + Pattern.compile("""(?:^|\n)\s*(?:export\s+)?JAVA_HOME\s*=\s*(?:["']([^"']+)["']|(\S+))""") + + /** Minimum Java major version required by the given Spark version string. */ + def requiredJavaMajorVersion(sparkVersion: String): Int = { + sparkVersion.split("\\.").headOption.flatMap(v => Try(v.trim.toInt).toOption) match { + case Some(major) if major >= 4 => 17 + case _ => 8 + } + } + + /** + * Resolve JAVA_HOME for Spark CLI and SparkLauncher. + * + * Resolution order: + * 1. `$SPARK_HOME/conf/spark-env.sh` + * 2. process environment `JAVA_HOME` + * 3. system auto-detection (macOS `/usr/libexec/java_home`, common Linux paths) + */ + def resolveJavaHome(sparkHome: String, sparkVersion: String): Option[String] = { + val minVersion = requiredJavaMajorVersion(sparkVersion) + parseJavaHomeFromSparkEnv(sparkHome) + .filter(isValidJavaHome) + .orElse(Option(System.getenv("JAVA_HOME")).filter(isValidJavaHome)) + .orElse(detectSystemJavaHome(minVersion).filter(isValidJavaHome)) + } + + def parseJavaHomeFromSparkEnv(sparkHome: String): Option[String] = { + val sparkEnvFile = new File(sparkHome, "conf/spark-env.sh") + if (!sparkEnvFile.exists()) { + None + } else { + val content = org.apache.commons.io.FileUtils.readFileToString(sparkEnvFile, StandardCharsets.UTF_8) + extractJavaHome(content) + } + } + + private[util] def extractJavaHome(content: String): Option[String] = { + val matcher = JAVA_HOME_PATTERN.matcher(content) + var result: Option[String] = None + while (matcher.find() && result.isEmpty) { + val value = Option(matcher.group(1)).getOrElse(matcher.group(2)) + if (value != null && value.nonEmpty && !value.startsWith("#")) { + result = Some(value.trim) + } + } + result + } + + private def detectSystemJavaHome(minMajor: Int): Option[String] = { + val os = System.getProperty("os.name", "").toLowerCase + if (os.contains("mac")) { + Try { + val (code, output) = CommandUtils.execute(s"/usr/libexec/java_home -v $minMajor 2>/dev/null") + if (code == 0 && output.trim.nonEmpty) Some(output.trim) else None + }.getOrElse(None) + } else { + val candidates = List( + Option(System.getenv(s"JAVA${minMajor}_HOME")), + Option(s"/usr/lib/jvm/java-$minMajor-openjdk"), + Option(s"/usr/lib/jvm/java-$minMajor-openjdk-amd64"), + Option(s"/usr/lib/jvm/java-$minMajor")) + .flatten + .filter(isValidJavaHome) + candidates.headOption + } + } + + private def isValidJavaHome(javaHome: String): Boolean = { + javaHome != null && javaHome.nonEmpty && new File(javaHome, "bin/java").exists() + } + +} diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/conf/SparkVersionTest.scala b/streampark-common/src/test/scala/org/apache/streampark/common/conf/SparkVersionTest.scala new file mode 100644 index 0000000000..6b84a1c756 --- /dev/null +++ b/streampark-common/src/test/scala/org/apache/streampark/common/conf/SparkVersionTest.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.conf + +import org.apache.commons.io.FileUtils +import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.io.TempDir + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.Path + +class SparkVersionTest { + + @TempDir + private var tempDir: Path = _ + + private var sparkHome: File = _ + + @AfterEach + def cleanup(): Unit = { + if (sparkHome != null && sparkHome.exists()) { + FileUtils.deleteDirectory(sparkHome) + } + } + + @Test + def parseSparkVersionFromJarWithoutRunningSparkSubmit(): Unit = { + sparkHome = tempDir.resolve("spark-4.1.2").toFile + val jarsDir = new File(sparkHome, "jars") + jarsDir.mkdirs() + new File(jarsDir, "spark-core_2.13-4.1.2.jar").createNewFile() + FileUtils.writeStringToFile( + new File(sparkHome, "RELEASE"), + "Spark 4.1.2 (git revision f0bb2e6a47d) built for Hadoop 3.4.2\n", + StandardCharsets.UTF_8) + + val sparkVersion = new SparkVersion(sparkHome.getAbsolutePath) + + assertEquals("4.1.2", sparkVersion.version) + assertEquals("2.13", sparkVersion.scalaVersion) + assertEquals("4.1", sparkVersion.majorVersion) + assertTrue(sparkVersion.checkVersion(false)) + } + +} diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/SparkEnvUtilsTest.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/SparkEnvUtilsTest.scala new file mode 100644 index 0000000000..3e8ea96a99 --- /dev/null +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/SparkEnvUtilsTest.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.util + +import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.io.TempDir + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.Path + +class SparkEnvUtilsTest { + + @TempDir + private var tempDir: Path = _ + + private var sparkHome: File = _ + + @AfterEach + def cleanup(): Unit = { + if (sparkHome != null && sparkHome.exists()) { + org.apache.commons.io.FileUtils.deleteDirectory(sparkHome) + } + } + + @Test + def requiredJavaMajorVersionForSpark4(): Unit = { + assertEquals(17, SparkEnvUtils.requiredJavaMajorVersion("4.1.2")) + assertEquals(8, SparkEnvUtils.requiredJavaMajorVersion("3.5.1")) + assertEquals(8, SparkEnvUtils.requiredJavaMajorVersion("2.4.8")) + } + + @Test + def extractJavaHomeFromSparkEnvContent(): Unit = { + val content = + """ + |# export other settings + |export JAVA_HOME=/usr/lib/jvm/java-17-openjdk + |export HADOOP_CONF_DIR=/etc/hadoop/conf + |""".stripMargin + assertEquals(Some("/usr/lib/jvm/java-17-openjdk"), SparkEnvUtils.extractJavaHome(content)) + } + + @Test + def parseJavaHomeFromSparkEnvFile(): Unit = { + sparkHome = tempDir.resolve("spark").toFile + val confDir = new File(sparkHome, "conf") + confDir.mkdirs() + org.apache.commons.io.FileUtils.writeStringToFile( + new File(confDir, "spark-env.sh"), + "export JAVA_HOME=\"/opt/java/jdk-17\"\n", + StandardCharsets.UTF_8) + + assertEquals(Some("/opt/java/jdk-17"), SparkEnvUtils.parseJavaHomeFromSparkEnv(sparkHome.getAbsolutePath)) + } + + @Test + def resolveJavaHomePrefersSparkEnv(): Unit = { + sparkHome = tempDir.resolve("spark").toFile + val confDir = new File(sparkHome, "conf") + confDir.mkdirs() + val javaHome = tempDir.resolve("jdk-17").toFile + javaHome.mkdirs() + new File(javaHome, "bin").mkdirs() + new File(javaHome, "bin/java").createNewFile() + org.apache.commons.io.FileUtils.writeStringToFile( + new File(confDir, "spark-env.sh"), + s"export JAVA_HOME=${javaHome.getAbsolutePath}\n", + StandardCharsets.UTF_8) + + assertEquals( + Some(javaHome.getAbsolutePath), + SparkEnvUtils.resolveJavaHome(sparkHome.getAbsolutePath, "4.1.2")) + } + +} diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/streampark-env.sh b/streampark-console/streampark-console-service/src/main/assembly/conf/streampark-env.sh index 3fa607db0d..f5ffd7e1f0 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/conf/streampark-env.sh +++ b/streampark-console/streampark-console-service/src/main/assembly/conf/streampark-env.sh @@ -38,3 +38,7 @@ # The java implementation to use. By default, this environment # variable is REQUIRED on ALL platforms except OS X! # export JAVA_HOME= + +# Spark 4.x requires JDK 17+ for job submission. StreamPark resolves JAVA_HOME from each Spark +# installation's conf/spark-env.sh automatically. Configure JAVA_HOME there when auto-detection +# fails. See script/SPARK_JDK_GUIDE.md for details. diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/README.md b/streampark-console/streampark-console-service/src/main/assembly/script/README.md index 0191fe1411..6a1fe17ca3 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/README.md +++ b/streampark-console/streampark-console-service/src/main/assembly/script/README.md @@ -21,3 +21,10 @@ For example: - `1.2.3.sql` needs to be executed when StreamPark is upgraded from `1.2.2` to `1.2.3`. - `1.2.3.sql` and `2.0.0.sql` needs to be executed when StreamPark is upgraded from `1.2.2` to `2.0.0`. + +## Spark JDK Guide + +If you use Spark 4.x or need to configure a dedicated JDK for Spark job submission, see: + +- English: [SPARK_JDK_GUIDE.md](./SPARK_JDK_GUIDE.md) +- 中文: [SPARK_JDK_GUIDE.zh.md](./SPARK_JDK_GUIDE.zh.md) diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.md b/streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.md new file mode 100644 index 0000000000..742f10a7f8 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.md @@ -0,0 +1,80 @@ +# Spark JDK Configuration Guide + +StreamPark registers Spark environments and submits Spark jobs on behalf of users. Spark major +versions have different minimum JDK requirements. StreamPark resolves the JDK automatically in +most cases, but some deployments still require explicit configuration. + +## JDK Requirements by Spark Version + +| Spark Version | Minimum JDK | Recommended JDK | +|---------------|-------------|-----------------| +| Spark 2.x | JDK 8 | JDK 8 | +| Spark 3.x | JDK 8 | JDK 8 / JDK 11 | +| Spark 4.x | JDK 17 | JDK 17 / JDK 21 | + +> **Note:** StreamPark Console itself can run on JDK 8. Registering Spark 4.x does **not** require +> upgrading the StreamPark service JDK. + +## What StreamPark Does Automatically + +1. **Spark environment registration** + - Parses Spark version from `$SPARK_HOME/jars/spark-core_*.jar` or the `RELEASE` file. + - Does **not** depend on the StreamPark service JDK. +2. **Spark job submission** + - Resolves `JAVA_HOME` in the following order: + 1. `$SPARK_HOME/conf/spark-env.sh` + 2. process environment variable `JAVA_HOME` + 3. system auto-detection (macOS `/usr/libexec/java_home`, common Linux JDK paths) + +## When Manual JDK Configuration Is Required + +Configure JDK manually when **all** of the following are true: + +- Spark 4.x (or another version with a higher JDK requirement) is used, and +- `JAVA_HOME` is not set in `$SPARK_HOME/conf/spark-env.sh`, and +- the target JDK cannot be auto-detected on the StreamPark host (common on Linux servers with + multiple JDK installations). + +### Recommended: configure Spark `spark-env.sh` + +Edit `$SPARK_HOME/conf/spark-env.sh` on the StreamPark host (create it from +`spark-env.sh.template` if needed): + +```bash +# Spark 4.x example +export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 +``` + +After updating `spark-env.sh`, re-register or update the Spark environment in StreamPark if the +Spark installation path changed. + +### Alternative: configure StreamPark host environment + +Set `JAVA_HOME` before starting StreamPark only when the same JDK should be used for all Spark +versions on that host: + +```bash +export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 +./streampark.sh start +``` + +For mixed Spark 2/3 and Spark 4 deployments on one StreamPark instance, prefer configuring +`JAVA_HOME` in each Spark installation's `spark-env.sh` instead of changing the StreamPark +service JDK. + +## Verification + +Check Spark CLI locally with the same `SPARK_HOME`: + +```bash +export SPARK_HOME=/path/to/spark +source $SPARK_HOME/conf/spark-env.sh +$SPARK_HOME/bin/spark-submit --version +``` + +If this command succeeds, StreamPark can submit jobs with the same Spark installation. + +## Related Files + +- StreamPark service JDK: `$STREAMPARK_HOME/conf/streampark-env.sh` +- Spark installation JDK: `$SPARK_HOME/conf/spark-env.sh` diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.zh.md b/streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.zh.md new file mode 100644 index 0000000000..6b54651977 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/assembly/script/SPARK_JDK_GUIDE.zh.md @@ -0,0 +1,75 @@ +# Spark JDK 配置说明 + +StreamPark 负责注册 Spark 环境并代为提交 Spark 作业。不同 Spark 大版本对 JDK 的最低要求不同。 +StreamPark 在多数场景下会自动解析 JDK,但部分部署环境仍需要手动配置。 + +## Spark 版本与 JDK 要求 + +| Spark 版本 | 最低 JDK | 推荐 JDK | +|------------|----------|-----------------| +| Spark 2.x | JDK 8 | JDK 8 | +| Spark 3.x | JDK 8 | JDK 8 / JDK 11 | +| Spark 4.x | JDK 17 | JDK 17 / JDK 21 | + +> **说明:** StreamPark Console 本身可以继续运行在 JDK 8 上。注册 Spark 4.x **不需要**升级 +> StreamPark 服务的 JDK。 + +## StreamPark 自动处理的内容 + +1. **注册 Spark 环境** + - 从 `$SPARK_HOME/jars/spark-core_*.jar` 或 `RELEASE` 文件解析版本。 + - **不依赖** StreamPark 服务当前使用的 JDK。 +2. **提交 Spark 作业** + - 按以下顺序解析 `JAVA_HOME`: + 1. `$SPARK_HOME/conf/spark-env.sh` + 2. 进程环境变量 `JAVA_HOME` + 3. 系统自动探测(macOS `/usr/libexec/java_home`、Linux 常见 JDK 路径) + +## 何时需要手动配置 JDK + +当 **同时满足** 以下条件时,需要手动配置: + +- 使用 Spark 4.x(或其他对 JDK 要求更高的版本),且 +- `$SPARK_HOME/conf/spark-env.sh` 中未设置 `JAVA_HOME`,且 +- StreamPark 所在主机无法自动探测到目标 JDK(Linux 多 JDK 环境较常见)。 + +### 推荐方式:配置 Spark 的 `spark-env.sh` + +在 StreamPark 所在主机编辑 `$SPARK_HOME/conf/spark-env.sh`(若不存在,可从 +`spark-env.sh.template` 复制): + +```bash +# Spark 4.x 示例 +export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 +``` + +修改后,如 Spark 安装路径有变化,请在 StreamPark 中重新注册或更新对应 Spark 环境。 + +### 备选方式:配置 StreamPark 主机环境变量 + +仅当同一台 StreamPark 主机上的所有 Spark 版本共用同一 JDK 时,可在启动 StreamPark 前设置: + +```bash +export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 +./streampark.sh start +``` + +若同一 StreamPark 实例需要同时管理 Spark 2/3 与 Spark 4,请优先在各 Spark 安装的 +`spark-env.sh` 中分别配置 `JAVA_HOME`,而不是修改 StreamPark 服务 JDK。 + +## 验证方式 + +在 StreamPark 主机上使用相同的 `SPARK_HOME` 验证: + +```bash +export SPARK_HOME=/path/to/spark +source $SPARK_HOME/conf/spark-env.sh +$SPARK_HOME/bin/spark-submit --version +``` + +若上述命令可正常执行,StreamPark 通常也能使用该 Spark 安装提交作业。 + +## 相关配置文件 + +- StreamPark 服务 JDK:`$STREAMPARK_HOME/conf/streampark-env.sh` +- Spark 安装 JDK:`$SPARK_HOME/conf/spark-env.sh` diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts index 19742742d6..2f656c4fae 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/spark/home.ts @@ -27,7 +27,7 @@ export default { setDefault: 'Successfully set the default spark home.', sparkName: 'Spark alias, for example: Spark-1.12', sparkHome: - 'The absolute path of the server where Spark is located, for example: /usr/local/spark', + 'The absolute path of the server where Spark is located, for example: /usr/local/spark. Spark 4.x requires JDK 17+. StreamPark service JDK upgrade is usually not required. See script/SPARK_JDK_GUIDE.md in the installation package.', sparkNameIsRequired: 'Spark name is required', sparkHomeIsRequired: 'Spark Home is required', sparkNameIsRepeated: 'Spark name already exists', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/home.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/home.ts index a81d74bcc7..c69f69238d 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/home.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/spark/home.ts @@ -26,7 +26,7 @@ export default { remove: '当前的 spark home 已被成功删除。', setDefault: '成功设置默认spark home', sparkName: 'Spark别名,举例: Spark-1.12', - sparkHome: 'Spark所在服务器的绝对路径,举例: /usr/local/spark', + sparkHome: 'Spark所在服务器的绝对路径,举例: /usr/local/spark。Spark 4.x 需 JDK 17+,通常无需修改 StreamPark 服务 JDK,详见安装包 script/SPARK_JDK_GUIDE.zh.md', sparkNameIsRequired: 'Spark名称必填', sparkHomeIsRequired: 'Spark Home 不能为空', sparkNameIsRepeated: 'Spark名称已存在', diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala index 007b80d06c..974b61bca9 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala @@ -19,7 +19,7 @@ package org.apache.streampark.spark.client.impl import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.SparkDeployMode -import org.apache.streampark.common.util.{HadoopUtils, YarnUtils} +import org.apache.streampark.common.util.{HadoopUtils, SparkEnvUtils, YarnUtils} import org.apache.streampark.common.util.Implicits._ import org.apache.streampark.spark.client.`trait`.SparkClientTrait import org.apache.streampark.spark.client.bean._ @@ -126,7 +126,7 @@ object YarnClient extends SparkClientTrait { if (StringUtils.isNotBlank(submitRequest.hadoopUser)) { env.put("HADOOP_USER_NAME", submitRequest.hadoopUser) } - new SparkLauncher(env) + val sparkLauncher = new SparkLauncher(env) .setSparkHome(submitRequest.sparkVersion.sparkHome) .setAppResource(submitRequest.userJarPath) .setMainClass(submitRequest.appMain) @@ -141,6 +141,14 @@ object YarnClient extends SparkClientTrait { case _ => throw new IllegalArgumentException("[StreamPark][Spark][YarnClient] Invalid spark on yarn deployMode, only support \"client\" and \"cluster\".") }) + SparkEnvUtils + .resolveJavaHome(submitRequest.sparkVersion.sparkHome, submitRequest.sparkVersion.version) + .foreach { javaHome => + env.put("JAVA_HOME", javaHome) + sparkLauncher.setJavaHome(javaHome) + logger.info(s"[StreamPark][Spark][YarnClient] Using JAVA_HOME: $javaHome") + } + sparkLauncher } private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher: SparkLauncher): Unit = {