火花流作业中的任务之间长时间且一致的等待
Long and consistent wait between tasks in spark streaming job
我在 Mesos 上有一个 Spark Streaming 作业 运行ning。
它的所有批次都需要完全相同的时间,而且这个时间比预期的要长得多。
作业从 kafka 中提取数据,处理数据并将其插入到 cassandra 中,然后再次返回到 kafka 到不同的主题中。
每批(下图)有 3 个作业,其中 2 个从 kafka 拉取,处理并插入到 cassandra,另一个从 kafka 拉取,处理并推回 kafka。
我检查了 spark UI 中的批次,发现它们都花费了相同的时间(4 秒),但进一步深入,它们实际上每个处理时间不到一秒,但它们都有一个差距同时(大约 4 秒)。
添加更多的执行器或更多的处理能力看起来不会有什么不同。
Details of batch: Processing time = 12s & total delay = 1.2 s
??
所以我深入研究了批处理的每个作业(它们都花费完全相同的时间 = 4 秒,即使它们进行不同的处理):
他们都需要 4 秒才能 运行 他们的一个阶段(从 kafka 读取的阶段)。
下面我深入到其中一个阶段(都非常相似):
为什么要等?整个过程其实只需要0.5s到运行,就是在等待。是不是在等卡夫卡?
有没有人经历过类似的事情?
我可能编码错误或配置不正确?
编辑:
这是触发此行为的最少代码。这让我觉得它一定是某种设置。
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf(true)
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "####,####,####",
"group.id" -> "test"
)
val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
streamingContext, kafkaParams, Set("test_topic")
)
stream.map(t => "LEN=" + t._2.length).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
即使所有执行者都在同一个节点(spark.executor.cores=2 spark.cores.max=2
),问题仍然存在,和之前一样正好4秒:One mesos executor
即使主题没有消息(0 条记录的批次),Spark Streaming 每批次也需要 4 秒。
我能够解决这个问题的唯一方法是设置 cores=1
和 cores.max=1
以便它只创建一个任务来执行。
此任务有地点 NODE_LOCAL
。因此,当 NODE_LOCAL
时执行是即时的,但当 Locality 为 ANY
时,连接到 kafka 需要 4 秒 。所有机器都在同一个 10Gb 网络中。知道为什么会这样吗?
问题出在 spark.locality.wait,this link 给了我想法
它的默认值为 3 秒,Spark Streaming 中处理的每个批次都占用了整个时间。
我在使用 Mesos (--conf spark.locality.wait=0
) 提交作业时将其设置为 0 秒,现在一切都按预期运行。
我在 Mesos 上有一个 Spark Streaming 作业 运行ning。 它的所有批次都需要完全相同的时间,而且这个时间比预期的要长得多。 作业从 kafka 中提取数据,处理数据并将其插入到 cassandra 中,然后再次返回到 kafka 到不同的主题中。
每批(下图)有 3 个作业,其中 2 个从 kafka 拉取,处理并插入到 cassandra,另一个从 kafka 拉取,处理并推回 kafka。
我检查了 spark UI 中的批次,发现它们都花费了相同的时间(4 秒),但进一步深入,它们实际上每个处理时间不到一秒,但它们都有一个差距同时(大约 4 秒)。 添加更多的执行器或更多的处理能力看起来不会有什么不同。
Details of batch: Processing time = 12s & total delay = 1.2 s
??
所以我深入研究了批处理的每个作业(它们都花费完全相同的时间 = 4 秒,即使它们进行不同的处理):
他们都需要 4 秒才能 运行 他们的一个阶段(从 kafka 读取的阶段)。 下面我深入到其中一个阶段(都非常相似):
为什么要等?整个过程其实只需要0.5s到运行,就是在等待。是不是在等卡夫卡?
有没有人经历过类似的事情? 我可能编码错误或配置不正确?
编辑:
这是触发此行为的最少代码。这让我觉得它一定是某种设置。
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf(true)
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "####,####,####",
"group.id" -> "test"
)
val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
streamingContext, kafkaParams, Set("test_topic")
)
stream.map(t => "LEN=" + t._2.length).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
即使所有执行者都在同一个节点(spark.executor.cores=2 spark.cores.max=2
),问题仍然存在,和之前一样正好4秒:One mesos executor
即使主题没有消息(0 条记录的批次),Spark Streaming 每批次也需要 4 秒。
我能够解决这个问题的唯一方法是设置 cores=1
和 cores.max=1
以便它只创建一个任务来执行。
此任务有地点 NODE_LOCAL
。因此,当 NODE_LOCAL
时执行是即时的,但当 Locality 为 ANY
时,连接到 kafka 需要 4 秒 。所有机器都在同一个 10Gb 网络中。知道为什么会这样吗?
问题出在 spark.locality.wait,this link 给了我想法
它的默认值为 3 秒,Spark Streaming 中处理的每个批次都占用了整个时间。
我在使用 Mesos (--conf spark.locality.wait=0
) 提交作业时将其设置为 0 秒,现在一切都按预期运行。