Queries with streaming sources must be executed with writeStream.start();;

I am trying to read data from Kafka with Spark Structured Streaming and run predictions on the incoming data, using a model I previously trained with Spark ML.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, VectorIndexer, Word2Vec}

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .master("local")
  .getOrCreate()
import spark.implicits._

// UDF to turn the raw Kafka payload into a String (unused below, since
// the value column is cast to STRING directly in selectExpr)
val toString = udf((payload: Array[Byte]) => new String(payload))

val sentenceDataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicname1")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
sentenceDataFrame.printSchema()
val regexTokenizer = new RegexTokenizer()
  .setInputCol("value")
  .setOutputCol("words")
  .setPattern("\W")
val tokencsv = regexTokenizer.transform(sentenceDataFrame)
val remover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered")

val removestopdf = remover.transform(tokencsv)
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("filtered")
  .setOutputCol("result")
  .setVectorSize(300)
  .setMinCount(0)

val model = word2Vec.fit(removestopdf)

val result = model.transform(removestopdf)


val featureIndexer = new VectorIndexer()
  .setInputCol("result")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(2)
  .fit(result)

val some = featureIndexer.transform(result)

val model1 = RandomForestClassificationModel.load("/home/akhil/Documents/traindata/stages/2_rfc_80e12c5d1259")

val predict = model1.transform(result)

val query = predict.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()

When I run the prediction on the streaming data, I get the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:175)
at predict1model$.main(predict1model.scala:53)
at predict1model.main(predict1model.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

The error points at the word2Vec.fit(removestopdf) line. Any help would be much appreciated.

In general, Structured Streaming cannot (as of Spark 2.2) be used to train Spark ML models. Some operations are not supported on streaming sources, and one of them is converting a Dataset to its rdd representation. This is precisely the case for word2Vec: it needs to go down to the rdd level to implement fit.
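To make the failure mode concrete, here is a minimal sketch (broker address and topic name are taken from the question): any operation that forces a streaming Dataset down to an RDD is rejected at analysis time, and the stack trace above shows that Word2Vec.fit calls Dataset.rdd internally.

val streamingDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicname1")
  .load()
  .selectExpr("CAST(value AS STRING)")

// Either of these forces the plan to an RDD and throws AnalysisException:
// "Queries with streaming sources must be executed with writeStream.start();;"
// streamingDf.rdd
// word2Vec.fit(...)   // fit calls Dataset.rdd internally (see stack trace)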

Nevertheless, it is possible to train the model on a static dataset and apply the predictions to the streaming data, as sketched below. The transform operation can be used on a streaming Dataset, exactly as above: val result = model.transform(removestopdf)
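A minimal sketch of the training phase (the file paths are hypothetical; the feature stages are the ones defined in the question, wrapped in a Pipeline so the fitted chain can be persisted as a whole):

import org.apache.spark.ml.Pipeline

// Batch job: fit on a static Dataset, where no streaming source is involved
val trainDf = spark.read.textFile("/path/to/train.txt") // hypothetical path; the column is named "value"

val pipeline = new Pipeline()
  .setStages(Array(regexTokenizer, remover, word2Vec)) // stages from the question

val fittedPipeline = pipeline.fit(trainDf) // fit is legal here: trainDf is static
fittedPipeline.write.overwrite().save("/path/to/fitted-pipeline") // hypothetical path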

In a nutshell, we need to fit the model on a static dataset. The resulting transformer can then be applied to a streaming Dataset.
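The streaming side then only loads the fitted transformer and calls transform, so the query can legally be started with writeStream.start() (paths again hypothetical):

import org.apache.spark.ml.PipelineModel

// Streaming job: load the transformer fitted in the batch job
val fittedPipeline = PipelineModel.load("/path/to/fitted-pipeline") // hypothetical path

// transform only adds projections to the plan, so it works on a stream
val predictions = fittedPipeline.transform(sentenceDataFrame)

predictions.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()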

You can find a proof of concept in this Github project, "Spark Structured Streaming ML".

There is also SPARK-16424 that you can follow.