在达到最新偏移量之前,Kafka 消息不会写入镶木地板文件
Kafka messages not writen to parquet file until the latest offset is reached
我想读取 Kafka 主题并写入 parquet 或 delta 文件,并能够在读取 Kafka 主题中的所有消息之前从该 parquet 文件中读取。我有这个工作,但后来我做了一个改变,现在我必须等到所有消息都被消耗完之后,镶木地板文件中才有任何东西。我的代码如下。
import org.apache.spark.sql.SparkSession
object MinimalTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("MinimalTest")
.getOrCreate()
val kafkaBrokers = "localhost:9092"
val topic = "FakeTopic"
val startingOffsets = "earliest"
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("startingOffsets", startingOffsets)
.option("subscribe", topic)
.load()
val path = "<dir>/MinimalTest"
val checkpointLocation = "<dir>/CheckpointMinimalTest"
df.writeStream
.format("parquet")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.option("path", path)
.start()
spark.streams.awaitAnyTermination()
}
}
我还没有发现任何人有同样的问题,我也没有通过阅读相关文档找到解决方案。我想有人告诉我要承诺。我尝试将“enable.auto.commit”设置为 true,但随后我收到一条错误消息,指出“enable.auto.commit”不受支持。
我正在使用 Spark.2.4.4
您可以通过在 Kafka 源选项 (Structured Streaming + Kafka Integration Guide) 中设置 maxOffsetsPerTrigger
来限制每个触发器处理的偏移量数:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("startingOffsets", startingOffsets)
.option("maxOffsetsPerTrigger", 10)
.option("subscribe", topic)
.load()
如果未定义 maxOffsetsPerTrigger
,将使用最新的偏移量,如您在 Spark 2.4.4 code 中所见。
我想读取 Kafka 主题并写入 parquet 或 delta 文件,并能够在读取 Kafka 主题中的所有消息之前从该 parquet 文件中读取。我有这个工作,但后来我做了一个改变,现在我必须等到所有消息都被消耗完之后,镶木地板文件中才有任何东西。我的代码如下。
import org.apache.spark.sql.SparkSession
object MinimalTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("MinimalTest")
.getOrCreate()
val kafkaBrokers = "localhost:9092"
val topic = "FakeTopic"
val startingOffsets = "earliest"
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("startingOffsets", startingOffsets)
.option("subscribe", topic)
.load()
val path = "<dir>/MinimalTest"
val checkpointLocation = "<dir>/CheckpointMinimalTest"
df.writeStream
.format("parquet")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.option("path", path)
.start()
spark.streams.awaitAnyTermination()
}
}
我还没有发现任何人有同样的问题,我也没有通过阅读相关文档找到解决方案。我想有人告诉我要承诺。我尝试将“enable.auto.commit”设置为 true,但随后我收到一条错误消息,指出“enable.auto.commit”不受支持。
我正在使用 Spark.2.4.4
您可以通过在 Kafka 源选项 (Structured Streaming + Kafka Integration Guide) 中设置 maxOffsetsPerTrigger
来限制每个触发器处理的偏移量数:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("startingOffsets", startingOffsets)
.option("maxOffsetsPerTrigger", 10)
.option("subscribe", topic)
.load()
如果未定义 maxOffsetsPerTrigger
,将使用最新的偏移量,如您在 Spark 2.4.4 code 中所见。