Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,46 @@ object FlinkConfigurationUtils extends Logger {
@Nonnull def extractArguments(args: String): List[String] = {
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotEmpty(args)) {
return extractArguments(args.split("\\s+"))
return extractArguments(splitQuotedArguments(args).toArray)
}
programArgs.toList
}

private[this] def splitQuotedArguments(args: String): List[String] = {
val arguments = new ArrayBuffer[String]()
val current = new StringBuilder
var quote = 0.toChar
var escaped = false

args.foreach {
case char if escaped =>
current.append(char)
escaped = false
case '\\' =>
current.append('\\')
escaped = true
case char if quote != 0 =>
current.append(char)
if (char == quote) {
quote = 0.toChar
}
case char @ ('\'' | '"') =>
current.append(char)
quote = char
case char if char.isWhitespace =>
if (current.nonEmpty) {
arguments += current.toString()
current.clear()
}
case char =>
current.append(char)
}
if (current.nonEmpty) {
arguments += current.toString()
}
arguments.toList
}

def extractArguments(array: Array[String]): List[String] = {
val programArgs = new ArrayBuffer[String]()
val iter = array.iterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ class PropertiesUtilsTestCase {
Assertions.assertTrue(programArgs.contains("username=root"))
}

@Test def testExtractProgramArgsKeepsQuotedKeyValueWithSpaces(): Unit = {
val args =
"kafka_sync_database " +
"--kafka_conf properties.sasl.jaas.config='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\";' " +
"--kafka_conf topic=zmn_bigdata_market_format"
val jaasConfig =
"properties.sasl.jaas.config=" +
"org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username=\"user\" password=\"pass\";"

val programArgs = FlinkConfigurationUtils.extractArguments(args)

Assertions.assertTrue(programArgs.contains(jaasConfig))
}

@Test def testDynamicProperties(): Unit = {
val dynamicProperties =
"""
Expand Down
Loading