From 1a8c911f337f66dce56607d23660b2a094f433e9 Mon Sep 17 00:00:00 2001 From: hutiefang76 <137664623+hutiefang76@users.noreply.github.com> Date: Sun, 21 Jun 2026 08:14:53 +0800 Subject: [PATCH] [ISSUE-4315][Bug] Fix multiline dynamic properties parsing --- .../common/util/FlinkConfigurationUtils.scala | 23 +++++++++++++++++-- .../common/util/PropertiesUtilsTestCase.scala | 22 ++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala index f43c0a0b85..b761927253 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala @@ -99,8 +99,9 @@ object FlinkConfigurationUtils extends Logger { @Nonnull def extractDynamicProperties(properties: String): Map[String, String] = { if (StringUtils.isEmpty(properties)) Map.empty[String, String] else { + val normalizedProperties = normalizeDynamicProperties(properties) val map = mutable.Map[String, String]() - val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "") + val simple = normalizedProperties.replaceAll(MULTI_PROPERTY_REGEXP, "") simple.split("\\s?-D") match { case d if Utils.isNotEmpty(d) => d.foreach(x => { @@ -113,7 +114,7 @@ object FlinkConfigurationUtils extends Logger { }) case _ => } - val matcher = MULTI_PROPERTY_PATTERN.matcher(properties) + val matcher = MULTI_PROPERTY_PATTERN.matcher(normalizedProperties) while (matcher.find()) { val opts = matcher.group() val index = opts.indexOf("=") @@ -126,6 +127,24 @@ object FlinkConfigurationUtils extends Logger { } } + private[this] def normalizeDynamicProperties(properties: String): String = { + val normalized = new StringBuilder + properties.split("\\r?\\n").foreach { line => + val trimmed = line.trim + if (trimmed.nonEmpty) { + if (trimmed.startsWith("-D")) { + if (normalized.nonEmpty) { + normalized.append(System.lineSeparator()) + } + normalized.append(trimmed) + } else { + normalized.append(trimmed) + } + } + } + normalized.toString() + } + @Nonnull def extractArguments(args: String): List[String] = { val programArgs = new ArrayBuffer[String]() if (StringUtils.isNotEmpty(args)) { diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala index 40f9a5afc7..c008c192b2 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala @@ -66,4 +66,26 @@ class PropertiesUtilsTestCase { Assertions.assertEquals(map("diy.param.name"), "apache streampark") } + @Test def testDynamicPropertiesWithMultilineValue(): Unit = { + val dynamicProperties = + """ + |-Dexecution.target=yarn-application + |-Dyarn.ship-files=/data/module/flink-1.20.2; + |/opt/module/flink-cdc-3.5.0/test1.pgsql-to-starrocks.yaml; + |/data/module/hadoop/hadoop-3.3.6/etc/hadoop/core-site.xml; + |/data/module/hadoop/hadoop-3.3.6/etc/hadoop/hdfs-site.xml + |-Dyarn.application.queue=flink + |""".stripMargin + + val map = FlinkConfigurationUtils.extractDynamicProperties(dynamicProperties) + Assertions.assertEquals(map("execution.target"), "yarn-application") + Assertions.assertEquals( + map("yarn.ship-files"), + "/data/module/flink-1.20.2;" + + "/opt/module/flink-cdc-3.5.0/test1.pgsql-to-starrocks.yaml;" + + "/data/module/hadoop/hadoop-3.3.6/etc/hadoop/core-site.xml;" + + "/data/module/hadoop/hadoop-3.3.6/etc/hadoop/hdfs-site.xml") + Assertions.assertEquals(map("yarn.application.queue"), "flink") + } + }