TextFileStreaming in Spark Scala

I have many text files in a local directory, and the Spark program should read all of them and store them in a database. At the moment, trying to read the files with textFileStream does not work.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

/**
  * Main Program
  */
object SparkMain extends App {

  // Spark configuration for a local run
  val sparkConf: SparkConf =
    new SparkConf()
      .setMaster("local")
      .setAppName("TestProgram")

  // Create a Spark streaming context with a batch interval of 2 seconds
  val ssc: StreamingContext =
    new StreamingContext(sparkConf, Seconds(2))

  // Create text file stream (the backslash must be escaped, otherwise \t is a tab)
  val sourceDir: String = "D:\\tmpDir"
  val stream: DStream[String] = ssc.textFileStream(sourceDir)

  case class TextLine(line: String)

  val lineRdd: DStream[TextLine] = stream.map(TextLine)

  // Print every line received in each batch
  lineRdd.foreachRDD(rdd => {
    rdd.foreach(println)
  })

  // Start the computation
  ssc.start()
  // Wait for the computation to terminate
  ssc.awaitTermination()
}

Input:

//1.txt
Hello World

Nothing is printed while streaming. What is wrong here?

textFileStream does not pick up files that already exist in the directory; it only processes files that appear after the stream has started. Start the program first, then create a new file in the directory or move one in from another directory (see the file-move sketch after the program below). The following program is a simple word count over a text file stream:

  // streamingContext is the StreamingContext created exactly as in the question
  val sourceDir: String = "path to streaming directory"
  val stream: DStream[String] = streamingContext.textFileStream(sourceDir)

  case class TextLine(line: String)

  val lineRdd: DStream[TextLine] = stream.map(TextLine)

  lineRdd.foreachRDD(rdd => {
    // Count the words contained in the current batch
    val words = rdd.flatMap(_.line.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    println("+++++++++++++++++++++++")
    wordCounts.foreach(println)
    println("=====================" + rdd.count())
  })
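
To make the stream actually emit something, drop a file into the watched directory after the job has started. A minimal sketch, assuming hypothetical paths (D:/stage/1.txt as a staging file and D:/tmpDir as the watched directory):

  import java.nio.file.{Files, Paths, StandardCopyOption}

  // Hypothetical paths: a staging file and the directory watched by textFileStream
  val source = Paths.get("D:/stage/1.txt")
  val target = Paths.get("D:/tmpDir/1.txt")

  // Move, don't copy: textFileStream may pick up a half-written file while a
  // slow copy is still in progress. An atomic move requires source and target
  // to be on the same filesystem.
  Files.move(source, target, StandardCopyOption.ATOMIC_MOVE)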

The output should look like this:

+++++++++++++++++++++++
=====================0
+++++++++++++++++++++++
(are,1)
(you,1)
(how,1)
(hello,1)
(doing,1)
=====================5
+++++++++++++++++++++++
=====================0

Hope this helps!