如何通过 Spark 从 Kafka 获取至少 N 条日志?

How to get at least N number of logs from Kafka through Spark?

在 Spark 流式处理中,我在收到日志时收到它们。但我想在一次通过中至少获得 N 条日志。如何实现?

来自 answer, it appears there is such a utility in Kafka but doesn't seem to be present in Spark 使之成为可能。

没有允许您为从 Kafka 接收的消息数设置 最小值 值的选项。选项 maxOffsetsPerTrigger 允许您设置 最大 条消息。

如果您希望您的微批处理一次处理更多消息,您可以考虑增加触发间隔。

另外(参考你提供的link),这也是Kafka本身无法设置的。您可以设置最小提取字节数,但不能设置最小消息数。

请注意,您可以通过前缀 kafka. 结构化流中的 readStream 设置所有 Kafka 选项,如 Kafka Specific Configurations:

部分所述

"Kafka’s own configurations can be set via DataStreamReader.option with kafka. prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port")."

这样,您也可以使用消费者配置 kafka.fetch.min.bytes。然而,在一个 loval Kafka 2.5.0 安装上使用 Spark 3.0.1 测试它没有任何影响。添加配置时 kafka.fetch.max.wait.ms 我测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我而言)。

查看Spark的源代码KafkaDataConsumer it looks like the fetch does not directly account for any min/max bytes compared to the pure KafkaConsumer