Spark Streaming 连续作业

Spark Streaming Continuous Job

我有一个实现自定义接收器的 Spark 流作业。此接收器从队列中获取记录,直到队列耗尽(或满足间隔),然后再将这些记录返回到主机上下文以写入数据库。

写入这些记录后,我希望主机进程启动一个新的接收器并继续处理,我该如何使用 API?

主机进程如下所示:

def main(args: Array[String]) {
  val config = new SparkConf()
  config.setAppName("Vehicle Data Queue Consumer")
  config.set("spark.driver.allowMultipleContexts", "true")

  val streamContext = new StreamingContext(config, Seconds(1) )

  val rStream = generateReceiverStream(streamContext)

  val sparkContext = new SparkContext(config)

  streamContext.start()

  streamContext.awaitTermination()
}

def generateReceiverStream(aContext: StreamingContext): ReceiverInputDStream[List[String]] = {

  val rmqReceiver = new RMQReceiver("amqp://myQueue")
  val customReceiverStream = aContext.receiverStream(rmqReceiver)

  val handler = (rdd: RDD[List[String]]) => this.handleStreamResult(rdd)

  customReceiverStream.foreachRDD(handler)


  return customReceiverStream
}

def handleStreamResult(rdd: RDD[List[String]]): Unit ={
  rdd.foreach { record =>
    record.foreach { aString =>
      println("****************************")
      println(s"$aString")
      println("****************************")
    }
  }
}

接收器在流上下文开始时实例化一次,预计在 Spark Streaming 作业处于活动状态时 'alive'。 Spark Streaming 将在接收器上使用 onStart()onStop 方法来管理其生命周期。

在 Spark Streaming 中,receivers 应该在活动时并发生成数据。也就是说,在调用 receiver.start() 之后,自定义接收器应该创建和管理自己的线程,其中对 store(...) 的调用将为从该接收器创建的逻辑 DStream 生成数据。

Spark Streaming 将管理接收器生命周期以处理故障场景和 streamingContext.stop 调用。因此,在流上下文启动后,没有必要甚至不可能以编程方式 'fire up a new receiver'。

设计并实现您的自定义接收器以遵循此预期行为,并且作业将 运行 持续进行,无需任何进一步的努力。