Why does using cache on streaming Datasets fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?

import org.apache.spark.sql.SparkSession

SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema)        // schema is defined elsewhere in the app
  .json("src/test/data")
  .cache                 // <-- the offending call
  .writeStream
  .start
  .awaitTermination

I get an error when executing this example on Spark 2.1.0. Without the .cache option it works as expected, but with the .cache option I get:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)

Any ideas?

Your (very interesting) case boils down to the following line (which you can execute in spark-shell):

scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery.apply(CacheManager.scala:104)
  at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
  at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
  at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
  ... 48 elided

The reason for this is fairly easy to explain (no pun intended to Spark SQL's explain).

spark.readStream.text("files") creates a so-called streaming Dataset.

scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]

scala> files.isStreaming
res2: Boolean = true

Streaming Datasets are the basis of Spark SQL's Structured Streaming.

You may have read in Structured Streaming's Quick Example:

And then start the streaming computation using start().

Quoting the scaladoc of DataStreamWriter's start:

start(): StreamingQuery
Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.

So, you have to use start (or foreach) to start the execution of the streaming query. You knew that already.
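For reference, here is a minimal sketch of the question's pipeline with the cache removed and a console sink added purely for demonstration (the schema and paths are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder
  .master("local[*]")
  .appName("my-test")
  .getOrCreate()

// Streaming file sources require an explicit schema; this one is a placeholder.
val schema = new StructType().add("value", StringType)

spark.readStream
  .schema(schema)
  .json("src/test/data")
  .writeStream
  .format("console") // assumption: console sink, just for demonstration
  .start()
  .awaitTermination()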

But...there are Unsupported Operations in Structured Streaming:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.

If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".

That looks familiar, doesn't it?
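Indeed, other eager actions on a streaming Dataset fail with the very same exception. A quick sketch you can try in spark-shell (the "files" directory is a placeholder):

// count() is an action, so analysis kicks in and rejects the streaming source
spark.readStream.text("files").count()
// org.apache.spark.sql.AnalysisException: Queries with streaming sources
// must be executed with writeStream.start();;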

cache is not in the list of unsupported operations, but that's because it has simply been overlooked (I reported SPARK-20927 to fix it).

cache should be in the list, since it does execute a query before the query gets registered in Spark SQL's CacheManager.

Let's go deep into the depths of Spark SQL...hold your breath...

cache is simply persist, and persist requests the current CacheManager to cache the query:

sparkSession.sharedState.cacheManager.cacheQuery(this)

While caching a query, CacheManager does execute it:

sparkSession.sessionState.executePlan(planToCache).executedPlan

And that, as we already know, is not allowed for a streaming Dataset; only start (or foreach) may execute it.
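If your code has to handle both batch and streaming Datasets, a minimal defensive sketch (the cacheIfBatch helper is hypothetical) is to guard the call with isStreaming:

import org.apache.spark.sql.Dataset

// Cache batch Datasets only; streaming Datasets are passed through untouched.
def cacheIfBatch[T](ds: Dataset[T]): Dataset[T] =
  if (ds.isStreaming) ds else ds.cache()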

Problem solved!