每个微批次中的最大偏移量
Max offsets in each Micro Batch
我在默认触发器中执行流式处理。我的目标是限制每次执行中的读取量,以避免出现巨大的微批次。有时我的 Spark Jobs 整个周末都会停止,所以当我重新启动它们时,它们需要很长时间才能完成第一个。我还保留了 Dataframes,因为这是用 2 个数据库编写的。测试了两种方法。
官方文档说 maxOffsetsPerTrigger 限制每个触发间隔处理的偏移量,但这对我不起作用。我是不是误解了这个参数的意思?
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
此外,我阅读了 答案,但我不知道在哪里以及如何正确设置 max.poll.records。我尝试了 readStream 的选项,但没有成功。代码如下:
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("max.poll.records", "1")
.load()
主要功能:
override def execute(spark: SparkSession, args: Array[String]): Unit = {
val basePath: String = args(0)
val kafkaServers: String = args(1)
val kafkaTopic: String = args(2)
val checkpoint: String = args(3)
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
val transformed = read
.transform(applySchema)
.transform(conversions)
.transform(dropDuplicates)
.transform(partitioning)
val sink = new FileSystemSink(basePath)
val query = transformed
.writeStream
.outputMode(OutputMode.Append)
.foreachBatch(sink.writeOnS3 _)
.option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
.start()
query.awaitTermination()
}
除了上面的问题,还有什么写法来限制offsets?
Spark 版本:2.4.5.
我再次测试,maxOffsetsPerTrigger 工作正常。我误解了触发器的结果,现在它是有道理的。该参数表示读取的总偏移量,而不是每个分区的偏移量。
我在默认触发器中执行流式处理。我的目标是限制每次执行中的读取量,以避免出现巨大的微批次。有时我的 Spark Jobs 整个周末都会停止,所以当我重新启动它们时,它们需要很长时间才能完成第一个。我还保留了 Dataframes,因为这是用 2 个数据库编写的。测试了两种方法。
官方文档说 maxOffsetsPerTrigger 限制每个触发间隔处理的偏移量,但这对我不起作用。我是不是误解了这个参数的意思?
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
此外,我阅读了
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("max.poll.records", "1")
.load()
主要功能:
override def execute(spark: SparkSession, args: Array[String]): Unit = {
val basePath: String = args(0)
val kafkaServers: String = args(1)
val kafkaTopic: String = args(2)
val checkpoint: String = args(3)
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
val transformed = read
.transform(applySchema)
.transform(conversions)
.transform(dropDuplicates)
.transform(partitioning)
val sink = new FileSystemSink(basePath)
val query = transformed
.writeStream
.outputMode(OutputMode.Append)
.foreachBatch(sink.writeOnS3 _)
.option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
.start()
query.awaitTermination()
}
除了上面的问题,还有什么写法来限制offsets?
Spark 版本:2.4.5.
我再次测试,maxOffsetsPerTrigger 工作正常。我误解了触发器的结果,现在它是有道理的。该参数表示读取的总偏移量,而不是每个分区的偏移量。