From 494d3b9d8b237855f9f9a569b958f91c7339d117 Mon Sep 17 00:00:00 2001 From: hutiefang Date: Mon, 22 Jun 2026 10:00:31 +0800 Subject: [PATCH] [ISSUE-4203][Bug] Preserve quoted program args --- .../common/util/FlinkConfigurationUtils.scala | 37 ++++++++++++++++++- .../common/util/PropertiesUtilsTestCase.scala | 15 ++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala index f43c0a0b85..31069a5db1 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FlinkConfigurationUtils.scala @@ -129,11 +129,46 @@ object FlinkConfigurationUtils extends Logger { @Nonnull def extractArguments(args: String): List[String] = { val programArgs = new ArrayBuffer[String]() if (StringUtils.isNotEmpty(args)) { - return extractArguments(args.split("\\s+")) + return extractArguments(splitQuotedArguments(args).toArray) } programArgs.toList } + private[this] def splitQuotedArguments(args: String): List[String] = { + val arguments = new ArrayBuffer[String]() + val current = new StringBuilder + var quote = 0.toChar + var escaped = false + + args.foreach { + case char if escaped => + current.append(char) + escaped = false + case '\\' => + current.append('\\') + escaped = true + case char if quote != 0 => + current.append(char) + if (char == quote) { + quote = 0.toChar + } + case char @ ('\'' | '"') => + current.append(char) + quote = char + case char if char.isWhitespace => + if (current.nonEmpty) { + arguments += current.toString() + current.clear() + } + case char => + current.append(char) + } + if (current.nonEmpty) { + arguments += current.toString() + } + arguments.toList + } + def extractArguments(array: Array[String]): List[String] = { val programArgs = new ArrayBuffer[String]() val iter = array.iterator diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala index 40f9a5afc7..67c5e7385c 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala @@ -43,6 +43,21 @@ class PropertiesUtilsTestCase { Assertions.assertTrue(programArgs.contains("username=root")) } + @Test def testExtractProgramArgsKeepsQuotedKeyValueWithSpaces(): Unit = { + val args = + "kafka_sync_database " + + "--kafka_conf properties.sasl.jaas.config='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\";' " + + "--kafka_conf topic=zmn_bigdata_market_format" + val jaasConfig = + "properties.sasl.jaas.config=" + + "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule " + + "required username=\"user\" password=\"pass\";" + + val programArgs = FlinkConfigurationUtils.extractArguments(args) + + Assertions.assertTrue(programArgs.contains(jaasConfig)) + } + @Test def testDynamicProperties(): Unit = { val dynamicProperties = """