How to use feature extraction with DStream in Apache Spark
I have data arriving from Kafka through a DStream. I want to perform feature extraction on it in order to obtain some keywords.
I do not want to wait for all of the data to arrive (it is meant to be a continuous stream that potentially never ends), so I would like to perform the extraction in chunks - it does not matter much to me if the accuracy suffers a bit.
So far I have put together something like this:
    def extractKeywords(stream: DStream[Data]): Unit = {
      val spark: SparkSession = SparkSession.builder.getOrCreate

      val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData
      val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _
      val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData

      streamWithFeatures.print()
    }
    def extractFeatures(spark: SparkSession)
                       (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
      val df = spark.createDataFrame(rdd).toDF("data", "words")

      // term frequencies via feature hashing
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
      val rawFeatures = hashingTF.transform(df)

      // fit an IDF model on this micro-batch and rescale the term frequencies
      val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
      val idfModel = idf.fit(rawFeatures)
      val rescaledData = idfModel.transform(rawFeatures)

      import spark.implicits._
      rescaledData.select("data", "features").as[(Data, Array[String])].rdd
    }
However, I get java.lang.IllegalStateException: Haven't seen any document yet.
This does not really surprise me - I was just trying to piece things together, and I understand that because I am not waiting for any data to arrive, the model that gets generated may be empty by the time I try to use it on the data.
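Presumably this is exactly what happens as soon as a micro-batch arrives empty - at least the following minimal sketch (not my real pipeline) reproduces the same exception:

    import org.apache.spark.ml.feature.{HashingTF, IDF}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("local[*]").appName("idf-empty-batch").getOrCreate
    import spark.implicits._

    // An "empty micro-batch": no documents at all.
    val emptyBatch = Seq.empty[(String, Seq[String])].toDF("data", "words")

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

    // Throws java.lang.IllegalStateException: Haven't seen any document yet.
    idf.fit(hashingTF.transform(emptyBatch))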
What would be the correct way to solve this problem?
I followed the suggestion from the comments and split the program into two runs:
one that computes the IDF model and saves it to a file,
    def trainFeatures(idfModelFile: File, rdd: RDD[(String, Seq[String])]) = {
      val session: SparkSession = SparkSession.builder.getOrCreate

      val wordsDf = session.createDataFrame(rdd).toDF("data", "words")

      // term frequencies via feature hashing
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
      val featurizedDf = hashingTF.transform(wordsDf)

      // fit the IDF model once, on the whole training batch, and persist it
      val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
      val idfModel = idf.fit(featurizedDf)

      idfModel.write.save(idfModelFile.getAbsolutePath)
    }
and one that reads the IDF model from the file and simply runs it on all the incoming information:
    val idfModel = IDFModel.load(idfModelFile.getAbsolutePath)

    val documentDf = spark.createDataFrame(rdd).toDF("update", "document")

    // tokenize and hash the incoming documents
    val tokenizer = new Tokenizer().setInputCol("document").setOutputCol("words")
    val wordsDf = tokenizer.transform(documentDf)

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    val featurizedDf = hashingTF.transform(wordsDf)

    // only transform with the pre-trained model - no fitting on the stream
    val extractor = idfModel.setInputCol("rawFeatures").setOutputCol("features")
    val featuresDf = extractor.transform(featurizedDf)

    featuresDf.select("update", "features")
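To tie the two runs together, this is roughly how I use them now. It is only a sketch: the file paths, the Data placeholder and the whitespace tokenization are simplified stand-ins for my real types and for extractWordsFromData, and the keyword step is left out.

    import java.io.File
    import org.apache.spark.ml.feature.{HashingTF, IDFModel}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.DStream

    // Simplified stand-in for the real Data type from the original code.
    case class Data(id: String, text: String)

    val spark = SparkSession.builder.getOrCreate
    val idfModelFile = new File("/tmp/idf-model")             // placeholder path

    // Run 1 (offline, once): train the IDF model on a batch of historical documents.
    val historical: RDD[(String, Seq[String])] = spark.sparkContext
      .textFile("/tmp/historical-documents")                  // placeholder path
      .map(line => (line, line.toLowerCase.split("\\s+").toSeq))
    trainFeatures(idfModelFile, historical)

    // Run 2 (streaming): load the pre-trained model once and only call transform on
    // each micro-batch, so an empty batch can no longer trigger the fit-time exception.
    def extractKeywords(stream: DStream[Data]): Unit = {
      val idfModel  = IDFModel.load(idfModelFile.getAbsolutePath)
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")

      // naive whitespace tokenization standing in for extractWordsFromData
      val streamWithWords: DStream[(Data, Seq[String])] =
        stream.map(d => (d, d.text.toLowerCase.split("\\s+").toSeq))

      val streamWithFeatures = streamWithWords.transform { rdd: RDD[(Data, Seq[String])] =>
        val df = spark.createDataFrame(rdd).toDF("data", "words")
        idfModel.transform(hashingTF.transform(df)).select("data", "features").rdd
      }

      streamWithFeatures.print()
    }

If the vocabulary drifts over time, the first run can simply be repeated on fresher data and the saved model swapped out.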