如何将数据集 <Row> 写入 Spark Structured Streaming 上的 kafka 输出主题 - Java8
How can I write a Dataset<Row> into kafka output topic on Spark Structured Streaming - Java8
我正在尝试使用 Spark 2.1 中的 ForeachWriter
接口,但我无法使用它。
Spark 2.2.0 将支持它。要了解如何使用它,建议您阅读此博客 post:https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
您可以尝试 Spark 2.2.0 RC2 [1] 或等待最终版本。
如果您不能使用 Spark 2.2.0+,另一种选择是查看此博客:
它有一个非常简单的 Kafka 接收器,也许这对你来说就足够了。
[1] http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html
首先要知道的是,如果您使用 spark 结构化流并处理流数据,您将拥有一个流数据集。
话虽这么说,编写这个流数据集的方法是调用 ForeachWriter,你没看错。
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
override def open(partitionId: Long, version: Long) = true
override def process(value: Commons.UserEvent) = {
processRow(value)
}
override def close(errorOrNull: Throwable) = {}
}
val query =
ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start
写入主题的函数如下:
private def processRow(value: Commons.UserEvent) = {
/*
* Producer.send(topic, data)
*/
}
我正在尝试使用 Spark 2.1 中的 ForeachWriter
接口,但我无法使用它。
Spark 2.2.0 将支持它。要了解如何使用它,建议您阅读此博客 post:https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
您可以尝试 Spark 2.2.0 RC2 [1] 或等待最终版本。
如果您不能使用 Spark 2.2.0+,另一种选择是查看此博客:
它有一个非常简单的 Kafka 接收器,也许这对你来说就足够了。
[1] http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html
首先要知道的是,如果您使用 spark 结构化流并处理流数据,您将拥有一个流数据集。
话虽这么说,编写这个流数据集的方法是调用 ForeachWriter,你没看错。
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
override def open(partitionId: Long, version: Long) = true
override def process(value: Commons.UserEvent) = {
processRow(value)
}
override def close(errorOrNull: Throwable) = {}
}
val query =
ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start
写入主题的函数如下:
private def processRow(value: Commons.UserEvent) = {
/*
* Producer.send(topic, data)
*/
}