在 Spark Structured Streaming 中读取流数据帧的模式

Reading schema of streaming Dataframe in Spark Structured Streaming

我是 Apache Spark 结构化流媒体的新手。我正在尝试从事件中心读取一些事件(XML 格式)并尝试从嵌套的 XML.

创建新的 Spark DF

我正在使用 https://github.com/databricks/spark-xml 中描述的代码示例,在批处理模式下 运行 完美,但在 Structured Spark Streaming 中则不然。

spark-xmlGithub库的代码块

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload' 
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

我的批号

val df = Seq(
  (8, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>7</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
  (64, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>6</tag1> <tag2>4</tag2>  <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
  (27, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>4</tag1> <tag2>4</tag2> <mode>3</mode> <Quantity>1</Quantity></AccountSetup>")
).toDF("number", "body")
)


val payloadSchema = schema_of_xml(df.select("body").as[String])
val parsed = df.withColumn("parsed", from_xml($"body", payloadSchema))

val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))

我正在尝试为 Spark Structured Streaming 执行相同的逻辑,如下代码所示:

结构化流代码

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


val streamingInputDF = 
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()

val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
val parsed = streamingSelectDF.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))

display(final_df.select("parsed.*"))

val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String]) 指令的代码部分中抛出错误 Queries with streaming sources must be executed with writeStream.start();;

更新

尝试过


val streamingInputDF = 
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
    .select(($"body").cast("string"))

val body_value = streamingInputDF.select("body").as[String]
body_value.writeStream
    .format("console")
    .start()

spark.streams.awaitAnyTermination()


val payloadSchema = schema_of_xml(body_value)
val parsed = body_value.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))

现在没有 运行 进入错误,但 Databricks 仍处于“等待状态”

谢谢!!

如果您的代码在批处理模式下运行,则没有任何问题。

重要的是不仅要将源转换为流(通过使用readStreamload),而且还需要将接收部分转换为流。

您收到的错误消息只是提醒您还要查看接收器部分。您的 Dataframe final_df 实际上是一个 streaming Dataframe,必须通过 start.

启动

Structured Streaming Guide 为您提供了所有可用的概述Output Sinks,最简单的方法是将结果打印到控制台。

总而言之,您需要将以下内容添加到您的程序中:

final_df.writeStream
    .format("console")
    .start()

spark.streams.awaitAnyTermination()