为什么转换在结构化流中只产生一次副作用(println)?

Why does transform do side effects (println) only once in Structured Streaming?

为什么select每批打印一次,而hello world只打印一次?

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

val in: DataFrame = sparkSession.readStream
 .schema(schema)
 .format("csv")
 .option("header", false)
 .option("maxFilesPerTrigger", 1)
 .option("delimiter", ";")
 .load("s3://xxxxxxxx")

val input: DataFrame = in.select("*")
 .transform { ds =>
   println("hello world")  // <-- Why is this printed out once?
   ds
}

import org.apache.spark.sql.streaming.StreamingQuery
val query: StreamingQuery = input.writeStream
  .format("console")
  .start

Spark 2.1.0-SNAPSHOT 此处(今天构建)但我相信它在 2.0 和现在之间没有变化。

$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.

在 Spark 的 Structured Streaming 中,您的流应用程序只是 将相同的物理查询计划应用于输入数据源的技巧。

请注意,物理查询计划使您的 Dataset 成为可能(而且我对 Spark SQL 的了解越多,我就越发现查询和数据集之间没有区别——它们是这些天可以简单地互换)。

当你描述一个结构化查询时(无论是一次性查询还是流式查询),它都会经历解析、分析、优化和最终生成物理计划的 4 个阶段。您可以使用 explain(extended = true) 方法查看它。

scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]

阶段是惰性的,只执行一次

一旦你有了物理计划,这些阶段就不会再执行了。您的 Dataset 管道已经计算完毕,唯一缺少的部分是流经管道的数据。

这就是为什么您只看到 "hello world" 一次——当流式查询计划 "executed" 生成物理计划时。它被执行一次并针对处理源 Dataset 进行了优化(并且只有 Dataset 所以任何副作用都已经被触发)。

一个有趣的案例。在这里提出来就够了!