在 Spark Structured Streaming 中读取流数据帧的模式
Reading schema of streaming Dataframe in Spark Structured Streaming
我是 Apache Spark 结构化流媒体的新手。我正在尝试从事件中心读取一些事件(XML 格式)并尝试从嵌套的 XML.
创建新的 Spark DF
我正在使用 https://github.com/databricks/spark-xml 中描述的代码示例,在批处理模式下 运行 完美,但在 Structured Spark Streaming 中则不然。
spark-xmlGithub库的代码块
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
我的批号
val df = Seq(
(8, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>7</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
(64, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>6</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
(27, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>4</tag1> <tag2>4</tag2> <mode>3</mode> <Quantity>1</Quantity></AccountSetup>")
).toDF("number", "body")
)
val payloadSchema = schema_of_xml(df.select("body").as[String])
val parsed = df.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))
我正在尝试为 Spark Structured Streaming 执行相同的逻辑,如下代码所示:
结构化流代码
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val streamingInputDF =
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
val parsed = streamingSelectDF.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))
在 val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
指令的代码部分中抛出错误 Queries with streaming sources must be executed with writeStream.start();;
更新
尝试过
val streamingInputDF =
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
.select(($"body").cast("string"))
val body_value = streamingInputDF.select("body").as[String]
body_value.writeStream
.format("console")
.start()
spark.streams.awaitAnyTermination()
val payloadSchema = schema_of_xml(body_value)
val parsed = body_value.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
现在没有 运行 进入错误,但 Databricks 仍处于“等待状态”
谢谢!!
如果您的代码在批处理模式下运行,则没有任何问题。
重要的是不仅要将源转换为流(通过使用readStream
和load
),而且还需要将接收部分转换为流。
您收到的错误消息只是提醒您还要查看接收器部分。您的 Dataframe final_df
实际上是一个 streaming Dataframe,必须通过 start
.
启动
Structured Streaming Guide 为您提供了所有可用的概述Output Sinks,最简单的方法是将结果打印到控制台。
总而言之,您需要将以下内容添加到您的程序中:
final_df.writeStream
.format("console")
.start()
spark.streams.awaitAnyTermination()
我是 Apache Spark 结构化流媒体的新手。我正在尝试从事件中心读取一些事件(XML 格式)并尝试从嵌套的 XML.
创建新的 Spark DF我正在使用 https://github.com/databricks/spark-xml 中描述的代码示例,在批处理模式下 运行 完美,但在 Structured Spark Streaming 中则不然。
spark-xmlGithub库的代码块
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
我的批号
val df = Seq(
(8, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>7</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
(64, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>6</tag1> <tag2>4</tag2> <mode>0</mode> <Quantity>1</Quantity></AccountSetup>"),
(27, "<AccountSetup xmlns:xsi=\"test\"><Customers test=\"a\">d</Customers><tag1>4</tag1> <tag2>4</tag2> <mode>3</mode> <Quantity>1</Quantity></AccountSetup>")
).toDF("number", "body")
)
val payloadSchema = schema_of_xml(df.select("body").as[String])
val parsed = df.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))
我正在尝试为 Spark Structured Streaming 执行相同的逻辑,如下代码所示:
结构化流代码
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val streamingInputDF =
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
val parsed = streamingSelectDF.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
display(final_df.select("parsed.*"))
在 val payloadSchema = schema_of_xml(streamingInputDF.select("body").as[String])
指令的代码部分中抛出错误 Queries with streaming sources must be executed with writeStream.start();;
更新
尝试过
val streamingInputDF =
spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
.select(($"body").cast("string"))
val body_value = streamingInputDF.select("body").as[String]
body_value.writeStream
.format("console")
.start()
spark.streams.awaitAnyTermination()
val payloadSchema = schema_of_xml(body_value)
val parsed = body_value.withColumn("parsed", from_xml($"body", payloadSchema))
val final_df = parsed.select(parsed.col("parsed"))
现在没有 运行 进入错误,但 Databricks 仍处于“等待状态”
如果您的代码在批处理模式下运行,则没有任何问题。
重要的是不仅要将源转换为流(通过使用readStream
和load
),而且还需要将接收部分转换为流。
您收到的错误消息只是提醒您还要查看接收器部分。您的 Dataframe final_df
实际上是一个 streaming Dataframe,必须通过 start
.
Structured Streaming Guide 为您提供了所有可用的概述Output Sinks,最简单的方法是将结果打印到控制台。
总而言之,您需要将以下内容添加到您的程序中:
final_df.writeStream
.format("console")
.start()
spark.streams.awaitAnyTermination()