Spark Streaming 连续作业
Spark Streaming Continuous Job
我有一个实现自定义接收器的 Spark 流作业。此接收器从队列中获取记录,直到队列耗尽(或满足间隔),然后再将这些记录返回到主机上下文以写入数据库。
写入这些记录后,我希望主机进程启动一个新的接收器并继续处理,我该如何使用 API?
主机进程如下所示:
def main(args: Array[String]) {
val config = new SparkConf()
config.setAppName("Vehicle Data Queue Consumer")
config.set("spark.driver.allowMultipleContexts", "true")
val streamContext = new StreamingContext(config, Seconds(1) )
val rStream = generateReceiverStream(streamContext)
val sparkContext = new SparkContext(config)
streamContext.start()
streamContext.awaitTermination()
}
def generateReceiverStream(aContext: StreamingContext): ReceiverInputDStream[List[String]] = {
val rmqReceiver = new RMQReceiver("amqp://myQueue")
val customReceiverStream = aContext.receiverStream(rmqReceiver)
val handler = (rdd: RDD[List[String]]) => this.handleStreamResult(rdd)
customReceiverStream.foreachRDD(handler)
return customReceiverStream
}
def handleStreamResult(rdd: RDD[List[String]]): Unit ={
rdd.foreach { record =>
record.foreach { aString =>
println("****************************")
println(s"$aString")
println("****************************")
}
}
}
接收器在流上下文开始时实例化一次,预计在 Spark Streaming 作业处于活动状态时 'alive'。 Spark Streaming 将在接收器上使用 onStart()
和 onStop
方法来管理其生命周期。
在 Spark Streaming 中,receivers
应该在活动时并发生成数据。也就是说,在调用 receiver.start()
之后,自定义接收器应该创建和管理自己的线程,其中对 store(...)
的调用将为从该接收器创建的逻辑 DStream 生成数据。
Spark Streaming 将管理接收器生命周期以处理故障场景和 streamingContext.stop
调用。因此,在流上下文启动后,没有必要甚至不可能以编程方式 'fire up a new receiver'。
设计并实现您的自定义接收器以遵循此预期行为,并且作业将 运行 持续进行,无需任何进一步的努力。
我有一个实现自定义接收器的 Spark 流作业。此接收器从队列中获取记录,直到队列耗尽(或满足间隔),然后再将这些记录返回到主机上下文以写入数据库。
写入这些记录后,我希望主机进程启动一个新的接收器并继续处理,我该如何使用 API?
主机进程如下所示:
def main(args: Array[String]) {
val config = new SparkConf()
config.setAppName("Vehicle Data Queue Consumer")
config.set("spark.driver.allowMultipleContexts", "true")
val streamContext = new StreamingContext(config, Seconds(1) )
val rStream = generateReceiverStream(streamContext)
val sparkContext = new SparkContext(config)
streamContext.start()
streamContext.awaitTermination()
}
def generateReceiverStream(aContext: StreamingContext): ReceiverInputDStream[List[String]] = {
val rmqReceiver = new RMQReceiver("amqp://myQueue")
val customReceiverStream = aContext.receiverStream(rmqReceiver)
val handler = (rdd: RDD[List[String]]) => this.handleStreamResult(rdd)
customReceiverStream.foreachRDD(handler)
return customReceiverStream
}
def handleStreamResult(rdd: RDD[List[String]]): Unit ={
rdd.foreach { record =>
record.foreach { aString =>
println("****************************")
println(s"$aString")
println("****************************")
}
}
}
接收器在流上下文开始时实例化一次,预计在 Spark Streaming 作业处于活动状态时 'alive'。 Spark Streaming 将在接收器上使用 onStart()
和 onStop
方法来管理其生命周期。
在 Spark Streaming 中,receivers
应该在活动时并发生成数据。也就是说,在调用 receiver.start()
之后,自定义接收器应该创建和管理自己的线程,其中对 store(...)
的调用将为从该接收器创建的逻辑 DStream 生成数据。
Spark Streaming 将管理接收器生命周期以处理故障场景和 streamingContext.stop
调用。因此,在流上下文启动后,没有必要甚至不可能以编程方式 'fire up a new receiver'。
设计并实现您的自定义接收器以遵循此预期行为,并且作业将 运行 持续进行,无需任何进一步的努力。