在达到最新偏移量之前,Kafka 消息不会写入镶木地板文件

Kafka messages not writen to parquet file until the latest offset is reached

我想读取 Kafka 主题并写入 parquet 或 delta 文件,并能够在读取 Kafka 主题中的所有消息之前从该 parquet 文件中读取。我有这个工作,但后来我做了一个改变,现在我必须等到所有消息都被消耗完之后,镶木地板文件中才有任何东西。我的代码如下。

import org.apache.spark.sql.SparkSession

object MinimalTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("MinimalTest")
      .getOrCreate()

    val kafkaBrokers = "localhost:9092"
    val topic = "FakeTopic"

    val startingOffsets = "earliest"

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("startingOffsets", startingOffsets)
      .option("subscribe", topic)
      .load()

    val path = "<dir>/MinimalTest"
    val checkpointLocation = "<dir>/CheckpointMinimalTest"

    df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", checkpointLocation)
      .option("path", path)
      .start()

    spark.streams.awaitAnyTermination()
  }
}

我还没有发现任何人有同样的问题,我也没有通过阅读相关文档找到解决方案。我想有人告诉我要承诺。我尝试将“enable.auto.commit”设置为 true,但随后我收到一条错误消息,指出“enable.auto.commit”不受支持。

我正在使用 Spark.2.4.4

您可以通过在 Kafka 源选项 (Structured Streaming + Kafka Integration Guide) 中设置 maxOffsetsPerTrigger 来限制每个触发器处理的偏移量数:

val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("startingOffsets", startingOffsets)
      .option("maxOffsetsPerTrigger", 10)
      .option("subscribe", topic)
      .load()

如果未定义 maxOffsetsPerTrigger,将使用最新的偏移量,如您在 Spark 2.4.4 code 中所见。