为什么转换在结构化流中只产生一次副作用(println)?
Why does transform do side effects (println) only once in Structured Streaming?
为什么select
每批打印一次,而hello world
只打印一次?
import org.apache.spark.sql.types._
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("score", DoubleType, nullable = false) :: Nil)
val in: DataFrame = sparkSession.readStream
.schema(schema)
.format("csv")
.option("header", false)
.option("maxFilesPerTrigger", 1)
.option("delimiter", ";")
.load("s3://xxxxxxxx")
val input: DataFrame = in.select("*")
.transform { ds =>
println("hello world") // <-- Why is this printed out once?
ds
}
import org.apache.spark.sql.streaming.StreamingQuery
val query: StreamingQuery = input.writeStream
.format("console")
.start
Spark 2.1.0-SNAPSHOT 此处(今天构建)但我相信它在 2.0 和现在之间没有变化。
$ ./bin/spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0-SNAPSHOT
/_/
Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.
在 Spark 的 Structured Streaming 中,您的流应用程序只是 将相同的物理查询计划应用于输入数据源的技巧。
请注意,物理查询计划使您的 Dataset
成为可能(而且我对 Spark SQL 的了解越多,我就越发现查询和数据集之间没有区别——它们是这些天可以简单地互换)。
当你描述一个结构化查询时(无论是一次性查询还是流式查询),它都会经历解析、分析、优化和最终生成物理计划的 4 个阶段。您可以使用 explain(extended = true)
方法查看它。
scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]
阶段是惰性的,只执行一次。
一旦你有了物理计划,这些阶段就不会再执行了。您的 Dataset
管道已经计算完毕,唯一缺少的部分是流经管道的数据。
这就是为什么您只看到 "hello world" 一次——当流式查询计划 "executed" 生成物理计划时。它被执行一次并针对处理源 Dataset
进行了优化(并且只有 Dataset
所以任何副作用都已经被触发)。
一个有趣的案例。在这里提出来就够了!
为什么select
每批打印一次,而hello world
只打印一次?
import org.apache.spark.sql.types._
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("score", DoubleType, nullable = false) :: Nil)
val in: DataFrame = sparkSession.readStream
.schema(schema)
.format("csv")
.option("header", false)
.option("maxFilesPerTrigger", 1)
.option("delimiter", ";")
.load("s3://xxxxxxxx")
val input: DataFrame = in.select("*")
.transform { ds =>
println("hello world") // <-- Why is this printed out once?
ds
}
import org.apache.spark.sql.streaming.StreamingQuery
val query: StreamingQuery = input.writeStream
.format("console")
.start
Spark 2.1.0-SNAPSHOT 此处(今天构建)但我相信它在 2.0 和现在之间没有变化。
$ ./bin/spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0-SNAPSHOT
/_/
Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.
在 Spark 的 Structured Streaming 中,您的流应用程序只是 将相同的物理查询计划应用于输入数据源的技巧。
请注意,物理查询计划使您的 Dataset
成为可能(而且我对 Spark SQL 的了解越多,我就越发现查询和数据集之间没有区别——它们是这些天可以简单地互换)。
当你描述一个结构化查询时(无论是一次性查询还是流式查询),它都会经历解析、分析、优化和最终生成物理计划的 4 个阶段。您可以使用 explain(extended = true)
方法查看它。
scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]
== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]
阶段是惰性的,只执行一次。
一旦你有了物理计划,这些阶段就不会再执行了。您的 Dataset
管道已经计算完毕,唯一缺少的部分是流经管道的数据。
这就是为什么您只看到 "hello world" 一次——当流式查询计划 "executed" 生成物理计划时。它被执行一次并针对处理源 Dataset
进行了优化(并且只有 Dataset
所以任何副作用都已经被触发)。
一个有趣的案例。在这里提出来就够了!