火花流作业中的任务之间长时间且一致的等待

Long and consistent wait between tasks in spark streaming job

我在 Mesos 上有一个 Spark Streaming 作业 运行ning。 它的所有批次都需要完全相同的时间,而且这个时间比预期的要长得多。 作业从 kafka 中提取数据,处理数据并将其插入到 cassandra 中,然后再次返回到 kafka 到不同的主题中。

每批(下图)有 3 个作业,其中 2 个从 kafka 拉取,处理并插入到 cassandra,另一个从 kafka 拉取,处理并推回 kafka。

我检查了 spark UI 中的批次,发现它们都花费了相同的时间(4 秒),但进一步深入,它们实际上每个处理时间不到一秒,但它们都有一个差距同时(大约 4 秒)。 添加更多的执行器或更多的处理能力看起来不会有什么不同。

Details of batch: Processing time = 12s & total delay = 1.2 s ??

所以我深入研究了批处理的每个作业(它们都花费完全相同的时间 = 4 秒,即使它们进行不同的处理):

他们都需要 4 秒才能 运行 他们的一个阶段(从 kafka 读取的阶段)。 下面我深入到其中一个阶段(都非常相似):

为什么要等?整个过程其实只需要0.5s到运行,就是在等待。是不是在等卡夫卡?

有没有人经历过类似的事情? 我可能编码错误或配置不正确?

编辑:

这是触发此行为的最少代码。这让我觉得它一定是某种设置。

object Test {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf(true)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "####,####,####",
      "group.id" -> "test"
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      streamingContext, kafkaParams, Set("test_topic")
    )

    stream.map(t => "LEN=" + t._2.length).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

即使所有执行者都在同一个节点(spark.executor.cores=2 spark.cores.max=2),问题仍然存在,和之前一样正好4秒:One mesos executor

即使主题没有消息(0 条记录的批次),Spark Streaming 每批次也需要 4 秒。

我能够解决这个问题的唯一方法是设置 cores=1cores.max=1 以便它只创建一个任务来执行。

此任务有地点 NODE_LOCAL。因此,当 NODE_LOCAL 时执行是即时的,但当 Locality 为 ANY 时,连接到 kafka 需要 4 秒 。所有机器都在同一个 10Gb 网络中。知道为什么会这样吗?

问题出在 spark.locality.wait,this link 给了我想法

它的默认值为 3 秒,Spark Streaming 中处理的每个批次都占用了整个时间。

我在使用 Mesos (--conf spark.locality.wait=0) 提交作业时将其设置为 0 秒,现在一切都按预期运行。