Why does using cache on streaming Datasets fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?
SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "C:/tmp/spark")
.config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
.appName("my-test")
.getOrCreate
.readStream
.schema(schema)
.json("src/test/data")
.cache
.writeStream
.start
.awaitTermination
I get an error when executing this example in Spark 2.1.0.
Without the .cache call it works as expected, but with .cache I get:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)
Any ideas?
Your (very interesting) case boils down to the following line (which you can execute in spark-shell):
scala> :type spark
org.apache.spark.sql.SparkSession
scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery.apply(CacheManager.scala:104)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
... 48 elided
The reason is fairly simple to explain (no pun intended on Spark SQL's explain).
spark.readStream.text("files") creates a so-called streaming Dataset.
scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]
scala> files.isStreaming
res2: Boolean = true
Streaming Datasets are the foundation of Spark SQL's Structured Streaming.
As you may have read in Structured Streaming's Quick Example:
And then start the streaming computation using start().
Quoting the scaladoc of DataStreamWriter's start:
start(): StreamingQuery Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.
So, you have to use start (or foreach) to start the execution of the streaming query. You knew that already.
But...there are Unsupported Operations in Structured Streaming:
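As a minimal sketch of what "executed with writeStream.start()" means in practice, the snippet below declares the same kind of streaming Dataset and launches it with start() and no cache. The console sink, the object and method names, and the "files" directory are illustrative assumptions, not part of the original question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object StartWithoutCache {
  // Hypothetical helper: starts a streaming query over the text files in `dir`
  // and prints each micro-batch to the console.
  def startConsoleQuery(spark: SparkSession, dir: String): StreamingQuery = {
    // Declaring the source only builds a logical plan; nothing runs yet.
    val lines = spark.readStream.text(dir)
    require(lines.isStreaming, "expected a streaming Dataset")

    // start() is what actually begins execution of the streaming query.
    lines.writeStream
      .format("console")
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("start-without-cache")
      .getOrCreate()

    // Assumption: a directory named "files" exists under the working directory.
    startConsoleQuery(spark, "files").awaitTermination()
  }
}
```

Note that no action (and no cache) is called on the Dataset before start(); the only entry point into execution is the DataStreamWriter.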
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.
If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
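That looks familiar, doesn't it?
cache is not in the list of unsupported operations, but only because it has simply been overlooked (I reported SPARK-20927 to have it fixed).
cache should have been in the list, because it does execute a query before the query gets registered in Spark SQL's CacheManager.
Let's go deeper into the depths of Spark SQL...hold your breath...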
cache is persist, while persist requests the current CacheManager to cache the query:
sparkSession.sharedState.cacheManager.cacheQuery(this)
While caching a query, CacheManager does execute it:
sparkSession.sessionState.executePlan(planToCache).executedPlan
That is exactly what we know is not allowed, because only start (or foreach) is supposed to execute a streaming query.
Problem solved!
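If the same code path has to handle both batch and streaming Datasets, one way to avoid the exception is to skip cache whenever isStreaming is true. The following is only a sketch under that assumption: CacheGuard and cacheIfBatch are hypothetical names introduced here, not part of Spark, and how cache itself behaves on streaming Datasets may differ between Spark versions.

```scala
import org.apache.spark.sql.Dataset

object CacheGuard {
  // Hypothetical helper: caches batch Datasets and returns streaming
  // Datasets untouched, so CacheManager is never asked to plan a
  // streaming source.
  def cacheIfBatch[T](ds: Dataset[T]): Dataset[T] =
    if (ds.isStreaming) ds else ds.cache()
}
```

With this guard, CacheGuard.cacheIfBatch(spark.readStream.text("files")) simply returns the streaming Dataset, which can then be handed to writeStream and start as usual.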