为什么在spark streaming中使用mapWithState/checkpoint会在处理时序图中看到周期性的脉冲?
Why will I see the periodic pulses in processing time chart when using mapWithState/checkpoint in spark streaming?
我写了一个 stateful-wordCount spark streaming 应用程序,它可以连续接收来自 Kafka 的数据。我的代码包含一个 mapWithState
函数并且可以 运行 正确。当我检查 spark UI 的流统计数据时,我在 处理时间 图表中发现了一些 周期性脉冲 。我认为这可能是由于使用 checkpoint 引起的。希望有人能解释一下,非常感谢!
和完成的批次 table:
我发现一些 1 秒时间成本的批次会定期出现。然后我进入一个 1 秒时间成本批次和一个亚秒时间成本批次,发现 1 秒时间成本批次比另一个多一个作业。
比较两种批次:
好像是由checkpoint
引起的,但我不确定。
有谁能详细解释一下吗?谢谢!
这是我的代码:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object StateApp {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println(
s"""
|Usage: KafkaSpark_008_test <brokers> <topics> <batchDuration>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
| <batchDuration> is the batch duration of spark streaming
| <checkpointPath> is the checkpoint directory
""".stripMargin)
System.exit(1)
}
val Array(brokers, topics, bd, cpp) = args
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("KafkaSpark_080_test")
val ssc = new StreamingContext(sparkConf, Seconds(bd.toInt))
ssc.checkpoint(cpp)
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// test the messages' receiving speed
messages.foreachRDD(rdd =>
println(System.currentTimeMillis() + "\t" + System.currentTimeMillis() / 1000 + "\t" + (rdd.count() / bd.toInt).toString))
// the messages' value type is "timestamp port word", eg. "1479700000000 10105 ABC"
// wordDstream: (word, 1), eg. (ABC, 1)
val wordDstream = messages.map(_._2).map(msg => (msg.split(" ")(2), 1))
// this is from Spark Source Code example in Streaming/StatefulNetworkWordCount.scala
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateDstream = wordDstream.mapWithState(
StateSpec.function(mappingFunc)).print()
// Start the computation
ssc.start()
ssc.awaitTermination() }
}
您看到的这些小峰值是由于将数据检查点设置到持久存储造成的。为了让 Spark 进行状态完整转换,它需要在每个定义的时间间隔内可靠地存储您的数据,以便能够在发生故障时恢复。
请注意,峰值在时间上是一致的,因为它们每 50 秒执行一次。此计算是:(batch time * default multiplier
),其中当前默认乘数为 10。在您的情况下,这是 5 * 10 = 50
,这解释了为什么尖峰每 50 秒可见一次。
我写了一个 stateful-wordCount spark streaming 应用程序,它可以连续接收来自 Kafka 的数据。我的代码包含一个 mapWithState
函数并且可以 运行 正确。当我检查 spark UI 的流统计数据时,我在 处理时间 图表中发现了一些 周期性脉冲 。我认为这可能是由于使用 checkpoint 引起的。希望有人能解释一下,非常感谢!
和完成的批次 table:
我发现一些 1 秒时间成本的批次会定期出现。然后我进入一个 1 秒时间成本批次和一个亚秒时间成本批次,发现 1 秒时间成本批次比另一个多一个作业。
比较两种批次:
好像是由checkpoint
引起的,但我不确定。
有谁能详细解释一下吗?谢谢!
这是我的代码:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object StateApp {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println(
s"""
|Usage: KafkaSpark_008_test <brokers> <topics> <batchDuration>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
| <batchDuration> is the batch duration of spark streaming
| <checkpointPath> is the checkpoint directory
""".stripMargin)
System.exit(1)
}
val Array(brokers, topics, bd, cpp) = args
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("KafkaSpark_080_test")
val ssc = new StreamingContext(sparkConf, Seconds(bd.toInt))
ssc.checkpoint(cpp)
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// test the messages' receiving speed
messages.foreachRDD(rdd =>
println(System.currentTimeMillis() + "\t" + System.currentTimeMillis() / 1000 + "\t" + (rdd.count() / bd.toInt).toString))
// the messages' value type is "timestamp port word", eg. "1479700000000 10105 ABC"
// wordDstream: (word, 1), eg. (ABC, 1)
val wordDstream = messages.map(_._2).map(msg => (msg.split(" ")(2), 1))
// this is from Spark Source Code example in Streaming/StatefulNetworkWordCount.scala
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateDstream = wordDstream.mapWithState(
StateSpec.function(mappingFunc)).print()
// Start the computation
ssc.start()
ssc.awaitTermination() }
}
您看到的这些小峰值是由于将数据检查点设置到持久存储造成的。为了让 Spark 进行状态完整转换,它需要在每个定义的时间间隔内可靠地存储您的数据,以便能够在发生故障时恢复。
请注意,峰值在时间上是一致的,因为它们每 50 秒执行一次。此计算是:(batch time * default multiplier
),其中当前默认乘数为 10。在您的情况下,这是 5 * 10 = 50
,这解释了为什么尖峰每 50 秒可见一次。