Spark Structure 流式读取每个微批次的数据两次。如何避免

Spark Structure streaming read data twice per every micro-batch. How to avoid

我对 spark 结构流有一个非常奇怪的问题。 Spark structure streaming 为每个微批次创建两个 spark 作业。 结果,两次从Kafka读取数据。 这是一个简单的代码片段。

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object CheckHowSparkReadFromKafka {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .config(new SparkConf()
        .setAppName(s"simple read from kafka with repartition")
        .setMaster("local[*]")
        .set("spark.driver.host", "localhost"))
      .getOrCreate()
    val testPath = "/tmp/spark-test"
    FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
    import session.implicits._
    val stream = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",        "kafka-20002-prod:9092")
      .option("subscribe", "topic")
      .option("maxOffsetsPerTrigger", 1000)
      .option("failOnDataLoss", false)
      .option("startingOffsets", "latest")
      .load()
      .repartitionByRange( $"offset")
      .writeStream
      .option("path", testPath + "/data")
      .option("checkpointLocation", testPath + "/checkpoint")
      .format("parquet")
      .trigger(Trigger.ProcessingTime(10.seconds))
      .start()
    stream.processAllAvailable()

发生这种情况是因为如果 .repartitionByRange( $"offset"),如果我删除此行,一切都会好起来的。 但是使用 spark 创建两个作业,一个具有 1 个阶段,仅从 Kafka 读取,第二个具有 3 个阶段读取 -> 随机播放 -> 写入。 所以第一份工作的结果从未使用过。

这对性能有重大影响。 我的一些 Kafka 主题有 1550 个分区,所以读两遍很重要。 如果我添加缓存,事情会变得更好,但这对我来说不是一种方式。 在本地模式下,批处理中的第一个作业耗时不到 0.1 毫秒,索引为 0 的批处理除外。但在 YARN 集群和 Messos 中,两个作业都完全符合预期,并且在我的主题上耗时将近 1.2 分钟。

为什么会这样?我怎样才能避免这种情况?看起来像虫子?

P.S。我使用 spark 2.4.3.

本例spark没有bug。 两次从 Kafka 读取此数据的根本原因非常简单。 repartitionByRange 函数生成两个 spark 作业。

一个用于实际重新分区。

一个用于采样以找到分区的边界。

请在spark jira

中查找更多详细信息