Why does foreachRDD not populate a DataFrame with new content when using StreamingContext.textFileStream?

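My problem is that when I change my code to streaming mode and build the DataFrame inside the foreachRDD loop, the DataFrame shows up as an empty table. Nothing is filled in! I also cannot pass it to assembler.transform(). The error is: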

Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U].
Unspecified value parameter mapFunc.
      val dataFrame = Train_DStream.map()

My train.csv file is as follows: Please help me. Here is my code:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

/**
  * Created by saeedtkh on 5/22/17.
  */
object ML_Test {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)

    val customSchema = StructType(Array(
      StructField("column0", StringType, true),
      StructField("column1", StringType, true),
      StructField("column2", StringType, true)))

    //val Test_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv").map(LabeledPoint.parse)
    val Train_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv")
    val DStream = Train_DStream.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(6)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      Row.fromSeq(Seq(first, second, third))
    })

    DStream.foreachRDD { Test_DStream =>
      val dataFrame = sqlContext.createDataFrame(Test_DStream, customSchema)
      dataFrame.groupBy("column1", "column2").count().show()

      val numFeatures = 3
      val model = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.zeros(numFeatures))

      val featureCol = Array("column1", "column2")
      val assembler = new VectorAssembler().setInputCols(featureCol).setOutputCol("features")
      dataFrame.show()
      val df_new = assembler.transform(dataFrame)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

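My guess is that all the files in the /Users/saeedtkh/Desktop/sharedsaeed/train.csv directory have already been processed, so there are no files left, and that is why the DataFrame is empty.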

Note that the only input parameter of StreamingContext.textFileStream is a directory, not a file.

textFileStream(directory: String): DStream[String]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files.
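As a rough illustration of the point above, here is a minimal sketch that passes a directory to textFileStream and skips empty batches. It assumes the parent directory of the path in the question (/Users/saeedtkh/Desktop/sharedsaeed) is the one to watch; the object name DirectoryStreamSketch is made up for this example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical object name, used only for this sketch.
object DirectoryStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("DirectoryStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Assumption: this is the directory that new files will be moved into.
    // Note that it is a directory, not the train.csv file itself.
    val lines = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed")

    lines.foreachRDD { rdd =>
      // A batch interval in which no new file arrived yields an empty RDD,
      // so skip it instead of building an empty DataFrame.
      if (!rdd.isEmpty()) {
        println(s"Lines in this batch: ${rdd.count()}")
        rdd.take(5).foreach(println)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this in place, nothing is printed until a new file appears in the watched directory after the application has started.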

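Also note that once a file has been processed by a Spark Streaming application, it should not be changed (or appended to), because the file has already been marked as processed and Spark Streaming will ignore any modifications.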

Quoting the Basic Sources section of the official Spark Streaming documentation:

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported). Note that

  • The files must have the same data format.

  • The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

  • Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

For simple text files, there is an easier method streamingContext.textFileStream(dataDirectory). And file streams do not require running a receiver, hence does not require allocating cores.
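One way to satisfy the "atomically moving or renaming" requirement quoted above is to write each file in a staging directory first and then move it into the watched directory. The sketch below uses only java.nio.file; the object AtomicDrop, the method dropFile, and the ".tmp" suffix are hypothetical names chosen for this example, and it assumes both directories live on the same filesystem.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper, not part of Spark.
object AtomicDrop {
  /** Writes `content` to a staging file, then atomically moves it into `dataDir`.
    * Returns the final path inside `dataDir`.
    */
  def dropFile(stagingDir: Path, dataDir: Path, name: String, content: String): Path = {
    Files.createDirectories(stagingDir)
    Files.createDirectories(dataDir)

    // Write the complete file outside the watched directory first.
    val staged = stagingDir.resolve(name + ".tmp")
    Files.write(staged, content.getBytes(StandardCharsets.UTF_8))

    // ATOMIC_MOVE throws AtomicMoveNotSupportedException if the move cannot be
    // done atomically, for example across different filesystems.
    Files.move(staged, dataDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

Each new batch of data would then go into a fresh file name, since a file that has already been picked up is not read again.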


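Also, replace setMaster("local") with setMaster("local[*]") so that your Spark Streaming application has enough threads to process incoming data. Receiver-based sources need at least 2 threads; file streams do not run a receiver, as the documentation quoted above notes, but local[*] is still a safe default.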