AnalysisException: Queries with streaming sources must be executed with writeStream.start()
I am getting an exception telling me that I need to start the stream before using it. However, the stream is being started. What is wrong with this setup?
import org.apache.spark.sql.functions.{col, from_unixtime, to_timestamp}

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", "inputTopic")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr(deserializeKeyExpression, deserializeValueExpression)
  .select("value.*")
  .withColumn("date", to_timestamp(from_unixtime(col("date"))))
  .transform(model.transform) // the ML pipeline model; per the stack trace this is where the exception surfaces
  .select(col("id") as "key", func(col(config.probabilityCol)) as "value.prediction")
  .selectExpr(serializeKeyExpression, serializeValueExpression)
  .writeStream
  .outputMode("update")
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("checkpointLocation", "checkpoint")
  .option("topic", "outputTopic")
  .start()
The exception:
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
...
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2491)
at org.apache.spark.sql.Dataset.first(Dataset.scala:2498)
at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$$anonfun.apply$mcI$sp(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$$anonfun.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$$anonfun.apply(VectorAssembler.scala:88)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun.apply(VectorAssembler.scala:58)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
at org.apache.spark.ml.PipelineModel$$anonfun$transform.apply(Pipeline.scala:306)
at org.apache.spark.ml.PipelineModel$$anonfun$transform.apply(Pipeline.scala:306)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:306)
at com.company.project.Stream$$anonfun$transform.apply(NewsRateJob.scala:65)
at com.company.project.Stream$$anonfun$transform.apply(NewsRateJob.scala:65)
at org.apache.spark.sql.Dataset.transform(Dataset.scala:2513)
at com.company.project.Stream.transform(NewsRateJob.scala:65)
at com.company.project.Stream.setupStream(NewsRateJob.scala:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:311)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:134)
... 18 common frames omitted
I am familiar with the issue between Spark 2.2 and VectorAssembler, but I am using Spark 2.3.1.
The exception occurs because you are trying to use an ML Transformer with a streaming Dataset. As of today, Spark does not support ML on Structured Streaming. You would have to rewrite your code to transform the data manually, without relying on RDDs or the ML library.
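If you go that route, here is a minimal sketch of what a manual transformation could look like. Everything in it is hypothetical: it assumes a simple logistic-regression-style model whose coefficients (weights, bias) you have exported offline, and streamingDf stands in for the streaming Dataset built above:

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical coefficients, exported offline from the trained model.
val weights: Array[Double] = Array(0.42, -1.3, 0.07)
val bias: Double = 0.1

// Plain Scala UDF: dot product plus sigmoid. No MLlib classes are involved,
// so it can run inside a streaming query.
val predict = udf { features: Seq[Double] =>
  val z = bias + features.zip(weights).map { case (x, w) => x * w }.sum
  1.0 / (1.0 + math.exp(-z))
}

// Assumes streamingDf has an array<double> column named "features".
val scored = streamingDf.withColumn("prediction", predict(col("features")))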
This exception occurs because the model tries to access data from the stream before the stream has been started. In this case, VectorAssembler calls first() on the dataset to determine the width of the vector.
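You can reproduce the same failure with any eager action on a streaming Dataset; a minimal sketch using the built-in rate source:

val streamingDf = spark.readStream.format("rate").load()

// Any eager action forces batch execution of a streaming plan and throws:
// org.apache.spark.sql.AnalysisException: Queries with streaming sources
// must be executed with writeStream.start()
streamingDf.first()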
Spark 2.3 does not automatically fix the issue between VectorAssembler and Structured Streaming; it just provides a class (specifically, the VectorSizeHint class) that can be used together with VectorAssembler under Structured Streaming. Adding it to the pipeline's stages solved the problem:
import org.apache.spark.ml.feature.VectorSizeHint

stages += new VectorSizeHint()
  .setInputCol(column)
  .setSize(size)
Here is some documentation describing how to use it: https://docs.databricks.com/spark/latest/mllib/mllib-pipelines-and-stuctured-streaming.html
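For context, a minimal sketch of where the hint sits in a pipeline (the column names and size are made up for illustration): the VectorSizeHint stage declares the width of a vector column up front, so VectorAssembler no longer needs to call first() on the stream:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// Hypothetical column names and size, purely for illustration.
val sizeHint = new VectorSizeHint()
  .setInputCol("rawFeatures")   // a vector column of unknown width
  .setSize(3)                   // declare the width up front
  .setHandleInvalid("error")    // fail fast on vectors of the wrong size

val assembler = new VectorAssembler()
  .setInputCols(Array("rawFeatures", "someNumericCol"))
  .setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))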
Note: this is not needed for the OneHotEncoderEstimator functionality.
We ran into similar stack traces for a couple of other reasons as well: one because we were using OneHotEncoder in the model (it needed to be updated to OneHotEncoderEstimator), and another because we were caching the pipeline (we removed the caching step).