如何在流式数据集上执行 df.rdd 或 df.collect().foreach?

How to do df.rdd or df.collect().foreach on streaming dataset?

这是我在尝试转换时遇到的异常。

val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我想做的就是在结构化流中复制以下 sql.dataframe 操作。

df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

在 Dataframes 中运行很好,但在结构化流中却不行。

collect 即使在 Spark Core 的 RDD 世界中也是一个很大的禁忌,因为您可能会传输回驱动程序的单个 JVM 的数据的大小。它只是设置了 Spark 优势的边界,因为 collect 你在一个单一的 JVM 中。

话虽如此,请考虑无限数据,即永远不会终止的数据流。这就是 Spark 结构化流。

流式数据集是一个永远不完整的数据集,每次您请求内容时,其中的数据都会发生变化,即对数据流执行结构化查询的结果。

你根本不能说"Hey, give me the data that is the content of a streaming Dataset"。这甚至没有意义。

这就是为什么您不能 collect 在流式数据集上。 Spark 2.2.1(撰写本文时的最新版本)是不可能的。

如果您想在一段时间内接收流式数据集中的数据(在 Spark Streaming 中又称为 批处理间隔触发器 在 Spark Structured Streaming 中)将结果写入流式接收器,例如console.

您还可以在 addBatch 内编写 collect.map(_.toSeq) 的自定义流式接收器,即 the main and only method of a streaming sink. As a matter of fact, console sink does exactly it

All I am trying to do is replicate the following sql.dataframe operations in structured streaming.

df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))

which is running fine in Dataframes but not in structured streaming.

我想到的第一个解决方案是使用 foreach sink:

The foreach operation allows arbitrary operations to be computed on the output data.

那当然意味着这是最好的解决方案。就是我第一时间想到的。