如何解决 DataSet.toJSON 与结构化流媒体不兼容的问题
How to work around DataSet.toJSON being incompatible with Structured Streaming
我想把Twitter的数据写入Kafka。出于教育目的,我尝试使用结构化流来做到这一点。我已经创建了一个基于 socket-Source 的 Twitter-Source,效果很好。
我的源设置如下:
val tweets = spark
.readStream
.format("twitter")
.option("query", terms)
.load()
.as[SparkTweet]
这为我提供了一个用于分析查询的良好数据集。太棒了!
接下来我想将每条推文以略微闪亮的模式保存到 Kafka 中:
val kafkaOutStream = tweets
.toJSON.as("value")
.writeStream
.queryName("stream_to_kafka")
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 second"))
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic","tweets")
.start
这很简单!除了,它不起作用。在 QueryExecution.scala
中,调用进入 assertSupported
并最终被抛出,因为
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
我没想到 toJSON
是一个纯批处理操作,但没有它,使用 say select($"text" as "value")
代替,代码就可以工作。
现在,我有点吃惊,希望有人能解释为什么 toJSON 不应该与流媒体兼容(这是一个错误吗?缺少的功能?),并告诉我是否有结构化流媒体方式将我的对象的序列化表示形式放入 Kafka。
有点冗长,但 to_json
函数应该可以解决问题:
import org.apache.spark.sql.functions.{to_json, struct, col}
tweets.select(to_json(struct(df.columns map col: _*)).alias("value"))
.writeStream
...
toJSON
的问题似乎是 this conversion to RDD:
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
...
并且(正如maasg in 所指出的)似乎已经在开发版本中解决了。
我想把Twitter的数据写入Kafka。出于教育目的,我尝试使用结构化流来做到这一点。我已经创建了一个基于 socket-Source 的 Twitter-Source,效果很好。
我的源设置如下:
val tweets = spark
.readStream
.format("twitter")
.option("query", terms)
.load()
.as[SparkTweet]
这为我提供了一个用于分析查询的良好数据集。太棒了!
接下来我想将每条推文以略微闪亮的模式保存到 Kafka 中:
val kafkaOutStream = tweets
.toJSON.as("value")
.writeStream
.queryName("stream_to_kafka")
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 second"))
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic","tweets")
.start
这很简单!除了,它不起作用。在 QueryExecution.scala
中,调用进入 assertSupported
并最终被抛出,因为
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
我没想到 toJSON
是一个纯批处理操作,但没有它,使用 say select($"text" as "value")
代替,代码就可以工作。
现在,我有点吃惊,希望有人能解释为什么 toJSON 不应该与流媒体兼容(这是一个错误吗?缺少的功能?),并告诉我是否有结构化流媒体方式将我的对象的序列化表示形式放入 Kafka。
有点冗长,但 to_json
函数应该可以解决问题:
import org.apache.spark.sql.functions.{to_json, struct, col}
tweets.select(to_json(struct(df.columns map col: _*)).alias("value"))
.writeStream
...
toJSON
的问题似乎是 this conversion to RDD:
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
...
并且(正如maasg in