每个微批次中的最大偏移量

Max offsets in each Micro Batch

我在默认触发器中执行流式处理。我的目标是限制每次执行中的读取量,以避免出现巨大的微批次。有时我的 Spark Jobs 整个周末都会停止,所以当我重新启动它们时,它们需要很长时间才能完成第一个。我还保留了 Dataframes,因为这是用 2 个数据库编写的。测试了两种方法。

官方文档说 maxOffsetsPerTrigger 限制每个触发间隔处理的偏移量,但这对我不起作用。我是不是误解了这个参数的意思?

  val read = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", "1")
  .load()

此外,我阅读了 答案,但我不知道在哪里以及如何正确设置 max.poll.records。我尝试了 readStream 的选项,但没有成功。代码如下:

  val read = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("max.poll.records", "1")
  .load()

主要功能:

override def execute(spark: SparkSession, args: Array[String]): Unit = {
    val basePath: String = args(0)
    val kafkaServers: String = args(1)
    val kafkaTopic: String = args(2)
    val checkpoint: String = args(3)

    val read = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "1")
      .load()

    val transformed = read
      .transform(applySchema)
      .transform(conversions)
      .transform(dropDuplicates)
      .transform(partitioning)

    val sink = new FileSystemSink(basePath)

    val query = transformed
      .writeStream
      .outputMode(OutputMode.Append)
      .foreachBatch(sink.writeOnS3 _)
      .option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
      .start()

    query.awaitTermination()
  }

除了上面的问题,还有什么写法来限制offsets?

Spark 版本:2.4.5.

我再次测试,maxOffsetsPerTrigger 工作正常。我误解了触发器的结果,现在它是有道理的。该参数表示读取的总偏移量,而不是每个分区的偏移量。