如何从Kafka读取XML格式的流数据?

How to read streaming data in XML format from Kafka?

我正在尝试使用 Spark 结构化流从 Kafka 主题读取 XML 数据。

我尝试使用 Databricks spark-xml 包,但我收到一条错误消息,提示该包不支持流式阅读。有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据吗?

我当前的代码:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)

您不能以这种方式混合格式。 Kafka 源加载为 Row,包括值的数量,如 keyvaluetopicvalue 列存储 payload as a binary type:

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

...

value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

解析此内容是用户的责任,不能委托给其他数据源。例如,参见我对 .

的回答

对于 XML,您可能需要一个 UDF (UserDefinedFunction),尽管您可以先尝试 Hive XPath functions。您还应该解码二进制数据。

.format("kafka") \
.format('com.databricks.spark.xml') \

最后一个 com.databricks.spark.xml 获胜并成为流媒体源(隐藏 Kafka 作为源)。

换句话说,以上相当于单独.format('com.databricks.spark.xml')

正如您所经历的,Databricks spark-xml 包不支持流式阅读(即不能充当流式源)。该套餐不适用于直播。

Is there any way I can extract XML data from Kafka topic using structured streaming?

您只能使用标准函数或 UDF 自己访问和处理 XML。在 Spark 2.2.0 之前的结构化流中没有对流 XML 处理的内置支持。

无论如何,这应该不是什么大问题。 Scala 代码可能如下所示。

val input = spark.
  readStream.
  format("kafka").
  ...
  load

val values = input.select('value cast "string")

val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))

// print XMLs and numbers to the stdout
val q = numbersFromXML.
  writeStream.
  format("console").
  start

另一种可能的解决方案是编写您自己的自定义流 Source 来处理 def getBatch(start: Option[Offset], end: Offset): DataFrame 中的 XML 格式。 应该有效。

import xml.etree.ElementTree as ET
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

然后我写了一个python UDF

def parse(s):
  xml = ET.fromstring(s)
  ns = {'real_person': 'http://people.example.com',
      'role': 'http://characters.example.com'}
  actor_el = xml.find("DNmS:actor",ns)

  if(actor_el ):
       actor = actor_el.text
  role_el.find('real_person:role', ns)
  if(role_el):
       role = role_el.text
  return actor+"|"+role

注册此 UDF

extractValuesFromXML = udf(parse)

   XML_DF= df .withColumn("mergedCol",extractroot("value"))

   AllCol_DF= xml_DF.withColumn("actorName", split(col("mergedCol"), "\|").getItem(0))\
        .withColumn("Role", split(col("mergedCol"), "\|").getItem(1))

看起来上面的方法可行,但它没有使用传递的模式来解析 XML 文档。

如果打印关系模式,它总是

INFO  XmlToAvroConverter - .convert() : XmlRelation Schema ={} root
 |-- fields: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- nullable: boolean (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)

例如:我正在关注 XML 来自 Kafka 主题的文档

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<Book>
<Author>John Doe</Author>
<Title>Test</Title>
<PubishedDate></PublishedDate>
</Book>

这是我必须将 XML 解析为 DataFrame

的代码
kafkaValueAsStringDF = kafakDF.selectExpr("CAST(key AS STRING) msgKey","CAST(value AS STRING) xmlString")

  var parameters = collection.mutable.Map.empty[String, String]

  parameters.put("rowTag", "Book")

kafkaValueAsStringDF.writeStream.foreachBatch {
          (batchDF: DataFrame, batchId: Long) =>

 val xmlStringDF:DataFrame = batchDF.selectExpr("xmlString")

            xmlStringDF.printSchema()

            val rdd: RDD[String] = xmlStringDF.as[String].rdd


            val relation = XmlRelation(
              () => rdd,
              None,
              parameters.toMap,
              xmlSchema)(spark.sqlContext)


            logger.info(".convert() : XmlRelation Schema ={} "+relation.schema.treeString)

}
        .start()
        .awaitTermination()

当我从文件系统或 S3 读取相同的 XML 文档并使用 spark-xml 时,它正在按预期解析模式。

谢谢 萨提什

您可以使用 SQL built-in 函数 xpath 等从作为 value 的嵌套 XML 结构中提取数据卡夫卡消息。

给定一个嵌套的 XML 赞

<root>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S</FilterClass>
  <InputData>
    <Finance>
      <HeaderSegment>
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
      </HeaderSegment>
    </Finance>
  </InputData>
</root>

然后您可以在 selectExpr 语句中使用这些 SQL 函数,如下所示:

df.readStream.format("kafka").options(...).load()
  .selectExpr("CAST(value AS STRING) as value")
  .selectExpr(
    "xpath(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
    "xpath_long(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
    "xpath_string(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
    "xpath_int(value, '/CofiResults/InputData/Finance/HeaderSegment/Version/text()') as VersionAsInt")

请记住,xpath 函数将 return 一个 Array 字符串,而您可能会发现将值提取为字符串甚至 Long 更方便.在带有控制台接收器流的 Spark 3.0.1 中应用上面的代码将导致:

+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839]         |20201103153839     |20201103153839       |6           |
+-------------------------+-------------------+---------------------+------------+