如何在流式数据集上执行 df.rdd 或 df.collect().foreach?
How to do df.rdd or df.collect().foreach on streaming dataset?
这是我在尝试转换时遇到的异常。
val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我想做的就是在结构化流中复制以下 sql.dataframe 操作。
df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))
在 Dataframes 中运行很好,但在结构化流中却不行。
collect
即使在 Spark Core 的 RDD 世界中也是一个很大的禁忌,因为您可能会传输回驱动程序的单个 JVM 的数据的大小。它只是设置了 Spark 优势的边界,因为 collect
你在一个单一的 JVM 中。
话虽如此,请考虑无限数据,即永远不会终止的数据流。这就是 Spark 结构化流。
流式数据集是一个永远不完整的数据集,每次您请求内容时,其中的数据都会发生变化,即对数据流执行结构化查询的结果。
你根本不能说"Hey, give me the data that is the content of a streaming Dataset"。这甚至没有意义。
这就是为什么您不能 collect
在流式数据集上。 Spark 2.2.1(撰写本文时的最新版本)是不可能的。
如果您想在一段时间内接收流式数据集中的数据(在 Spark Streaming 中又称为 批处理间隔 或 触发器 在 Spark Structured Streaming 中)将结果写入流式接收器,例如console
.
您还可以在 addBatch
内编写 collect.map(_.toSeq)
的自定义流式接收器,即 the main and only method of a streaming sink. As a matter of fact, console
sink does exactly it。
All I am trying to do is replicate the following sql.dataframe
operations in structured streaming.
df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))
which is running fine in Dataframes but not in structured streaming.
我想到的第一个解决方案是使用 foreach
sink:
The foreach
operation allows arbitrary operations to be computed on the output data.
那当然不意味着这是最好的解决方案。就是我第一时间想到的。
这是我在尝试转换时遇到的异常。
val df_col = df.select("ts.user.friends_count").collect.map(_.toSeq)
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我想做的就是在结构化流中复制以下 sql.dataframe 操作。
df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))
在 Dataframes 中运行很好,但在结构化流中却不行。
collect
即使在 Spark Core 的 RDD 世界中也是一个很大的禁忌,因为您可能会传输回驱动程序的单个 JVM 的数据的大小。它只是设置了 Spark 优势的边界,因为 collect
你在一个单一的 JVM 中。
话虽如此,请考虑无限数据,即永远不会终止的数据流。这就是 Spark 结构化流。
流式数据集是一个永远不完整的数据集,每次您请求内容时,其中的数据都会发生变化,即对数据流执行结构化查询的结果。
你根本不能说"Hey, give me the data that is the content of a streaming Dataset"。这甚至没有意义。
这就是为什么您不能 collect
在流式数据集上。 Spark 2.2.1(撰写本文时的最新版本)是不可能的。
如果您想在一段时间内接收流式数据集中的数据(在 Spark Streaming 中又称为 批处理间隔 或 触发器 在 Spark Structured Streaming 中)将结果写入流式接收器,例如console
.
您还可以在 addBatch
内编写 collect.map(_.toSeq)
的自定义流式接收器,即 the main and only method of a streaming sink. As a matter of fact, console
sink does exactly it。
All I am trying to do is replicate the following sql.dataframe operations in structured streaming.
df.collect().foreach(row => droolsCaseClass(row.getLong(0), row.getString(1)))
which is running fine in Dataframes but not in structured streaming.
我想到的第一个解决方案是使用 foreach
sink:
The
foreach
operation allows arbitrary operations to be computed on the output data.
那当然不意味着这是最好的解决方案。就是我第一时间想到的。