(spark-xml) 使用 from_xml 函数解析 xml 列时仅接收空值

(spark-xml) Receiving only null when parsing xml column using from_xml function

我正在尝试使用 spark-xml 解析一个非常简单的 XML 字符串列,但我只能设法接收 null 值,即使 XML 已正确填充。

我用来解析 xml 的 XSD 是:

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="note">
        <xs:complexType>
            <xs:sequence>
                <xs:element type="xs:string" name="from"/>
                <xs:element type="xs:string" name="to"/>
                <xs:element type="xs:string" name="message"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>

虽然列中的 XML 显示为字符串,但每个标签都已正确填充:

<?xml version="1.0" encoding="UTF-8"?>
<note>
    <from>Jani</from>
    <to>Tove</to>
    <message>Remember me this weekend</message>
</note>

我用scala写的spark代码是这样的:

    // XML Schema
    val schema = XSDToSchema.read("<the XSD as string>")
    
    // Spark Structured Streaming (N.b. the column value contains the xml as string)
    import spark.implicits._
    var df = initSource(spark)
      .withColumn("parsed", from_xml($"value", schema, 
          Map(
              "mode" -> "FAILFAST",
              "nullValue"-> "",
              "rowTag" -> "note",
              "ignoreSurroundingSpaces" -> "true"
          )
      ))
      .select($"value",$"parsed.note.from", $"parsed.note.to", $"parsed.note.message")
      .writeStream
      .format("console")
      // .option("mode", "FAILFAST")
      // .option("nullValue", "")
      // .option("rowTag", "note")
      // .option("ignoreSurroundingSpaces","true")
      .outputMode("append")
      .start()
      .awaitTermination(30*1000)

打印此 dataFrame 的模式(在 select 语句之前)将给出预期的模式

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- note: struct (nullable = false)
 |    |    |-- from: string (nullable = false)
 |    |    |-- to: string (nullable = false)
 |    |    |-- message: string (nullable = false)

但是当在控制台打印结果时,我得到的只是 null 值,如下所示:

....
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+----+----+-------+
|               value|from|  to|message|
+--------------------+----+----+-------+
|<?xml version="1....|null|null|   null|
|<?xml version="1....|null|null|   null|
|<?xml version="1....|null|null|   null|
....

我认为它不相关,但此 xml 专栏的来源来自阅读定义如下的 Kafka 主题:

    def initSource(spark: SparkSession) : DataFrame = {
        spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("startingoffsets", "earliest")
          .option("subscribe", "my-test-topic")
          .load()
          .selectExpr("CAST(value AS STRING)")
    }

有没有其他人遇到并解决了这个问题?我 运行 别无选择,非常感谢对此的提示 :)

我用的spark-xml版本是最新的atm,0.12.0 with spark 3.1.1.

更新

我在调用 writeStream 后错误地传递了 spark-xml 选项,而是需要将它们作为 from_xml 函数的第三个参数传递。我仍然只得到空值...

最后睁开眼睛的是阅读 spark-xml documentation 提到的部分:

Path to an XSD file that is used to validate the XML for each row individually

这意味着架构匹配是通过每一行而不是整个 XML 完成的,在这种情况下,我的示例的架构需要类似于以下内容:

val schema = StructType(Array(
        StructField("from", StringType, nullable = true),
        StructField("to", StringType, nullable = true),
        StructField("message", StringType, nullable = true)))

也可以使用 XSD:

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element type="xs:string" name="from"/>
    <xs:element type="xs:string" name="to"/>
    <xs:element type="xs:string" name="message"/>
</xs:schema>

这两种声明模式的方法对我有用。希望对以后的人有所帮助。