Spark 3 结构化流在 Kafka 源代码中使用 maxOffsetsPerTrigger Trigger.Once
Spark 3 structured streaming use maxOffsetsPerTrigger in Kafka source with Trigger.Once
我们需要在 Kafka 源代码中使用 maxOffsetsPerTrigger
,在结构化流中使用 Trigger.Once()
,但基于此 issue,它似乎在 spark 3 中读取 allAvailable
。是否有在这种情况下实现速率限制的方法?
这是 spark 3 中的示例代码:
def options: Map[String, String] = Map(
"kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
"subscribe" -> conf.getString("topic")
) ++
Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)
val streamingQuery = sparkSession.readStream.format("kafka").options(options)
.load
.writeStream
.trigger(Trigger.Once)
.start()
没有其他方法可以正确设置速率限制。如果 maxOffsetsPerTrigger
不适用于使用 Once
触发器的流作业,您可以执行以下操作以获得相同的结果:
选择另一个触发器并使用maxOffsetsPerTrigger
限制速率并在处理完所有数据后手动终止此作业。
使用选项 startingOffsets
和 endingOffsets
使作业成为 batch 作业。重复直到处理完主题中的所有数据。但是,“RunOnce 模式下的 Streaming 优于 Batch”是有原因的,详见 here.
最后一个选择是查看链接的拉取请求并自行编译 Spark。
以下是我们“解决”这个问题的方法。这基本上就是 mike
在接受的答案中写到的方法。
在我们的例子中,消息的大小变化很小,因此我们知道处理一个批处理需要多少时间。所以简而言之,我们:
- 将
Trigger.Once()
更改为 Trigger.ProcessingTime(<ms>)
因为 maxOffsetsPerTrigger
使用此模式
- 通过调用
awaitTermination(<ms>)
模仿 Trigger.Once()
终止了这个 运行ning 查询
- 将处理间隔设置为大于终止间隔,以便恰好一个“批次”适合处理
val kafkaOptions = Map[String, String](
"kafka.bootstrap.servers" -> "localhost:9092",
"failOnDataLoss" -> "false",
"subscribePattern" -> "testTopic",
"startingOffsets" -> "earliest",
"maxOffsetsPerTrigger" -> "10", // "batch" size
)
val streamWriterOptions = Map[String, String](
"checkpointLocation" -> "path/to/checkpoints",
)
val processingInterval = 30000L
val terminationInterval = 15000L
sparkSession
.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.writeStream
.options(streamWriterOptions)
.format("Console")
.trigger(Trigger.ProcessingTime(processingInterval))
.start()
.awaitTermination(terminationInterval)
这是可行的,因为将根据 maxOffsetsPerTrigger
限制读取和处理第一批。说,在 10 秒内。然后开始处理第二批,但它在大约 5 秒后在操作中间终止,并且从未达到设定的 30 秒标记。但它正确地存储了偏移量。在下一个 运行.
中拾取并处理这个“杀死”的批次
这种方法的缺点是您必须大致知道处理一个“批次”需要多少时间 - 如果您将 terminationInterval
设置得太低,作业的输出将始终为零。
当然,如果您不关心在一个 运行 中处理的批次的确切数量,您可以轻松地将 processingInterval
调整为比 terminationInterval
.在这种情况下,您可以一次处理不同数量的批次,但仍然尊重 maxOffsetsPerTrigger
.
的值
我们需要在 Kafka 源代码中使用 maxOffsetsPerTrigger
,在结构化流中使用 Trigger.Once()
,但基于此 issue,它似乎在 spark 3 中读取 allAvailable
。是否有在这种情况下实现速率限制的方法?
这是 spark 3 中的示例代码:
def options: Map[String, String] = Map(
"kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
"subscribe" -> conf.getString("topic")
) ++
Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)
val streamingQuery = sparkSession.readStream.format("kafka").options(options)
.load
.writeStream
.trigger(Trigger.Once)
.start()
没有其他方法可以正确设置速率限制。如果 maxOffsetsPerTrigger
不适用于使用 Once
触发器的流作业,您可以执行以下操作以获得相同的结果:
选择另一个触发器并使用
maxOffsetsPerTrigger
限制速率并在处理完所有数据后手动终止此作业。使用选项
startingOffsets
和endingOffsets
使作业成为 batch 作业。重复直到处理完主题中的所有数据。但是,“RunOnce 模式下的 Streaming 优于 Batch”是有原因的,详见 here.
最后一个选择是查看链接的拉取请求并自行编译 Spark。
以下是我们“解决”这个问题的方法。这基本上就是 mike
在接受的答案中写到的方法。
在我们的例子中,消息的大小变化很小,因此我们知道处理一个批处理需要多少时间。所以简而言之,我们:
- 将
Trigger.Once()
更改为Trigger.ProcessingTime(<ms>)
因为maxOffsetsPerTrigger
使用此模式 - 通过调用
awaitTermination(<ms>)
模仿Trigger.Once()
终止了这个 运行ning 查询
- 将处理间隔设置为大于终止间隔,以便恰好一个“批次”适合处理
val kafkaOptions = Map[String, String](
"kafka.bootstrap.servers" -> "localhost:9092",
"failOnDataLoss" -> "false",
"subscribePattern" -> "testTopic",
"startingOffsets" -> "earliest",
"maxOffsetsPerTrigger" -> "10", // "batch" size
)
val streamWriterOptions = Map[String, String](
"checkpointLocation" -> "path/to/checkpoints",
)
val processingInterval = 30000L
val terminationInterval = 15000L
sparkSession
.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.writeStream
.options(streamWriterOptions)
.format("Console")
.trigger(Trigger.ProcessingTime(processingInterval))
.start()
.awaitTermination(terminationInterval)
这是可行的,因为将根据 maxOffsetsPerTrigger
限制读取和处理第一批。说,在 10 秒内。然后开始处理第二批,但它在大约 5 秒后在操作中间终止,并且从未达到设定的 30 秒标记。但它正确地存储了偏移量。在下一个 运行.
这种方法的缺点是您必须大致知道处理一个“批次”需要多少时间 - 如果您将 terminationInterval
设置得太低,作业的输出将始终为零。
当然,如果您不关心在一个 运行 中处理的批次的确切数量,您可以轻松地将 processingInterval
调整为比 terminationInterval
.在这种情况下,您可以一次处理不同数量的批次,但仍然尊重 maxOffsetsPerTrigger
.