如何通过 Spark 从 Kafka 获取至少 N 条日志?
How to get at least N number of logs from Kafka through Spark?
在 Spark 流式处理中,我在收到日志时收到它们。但我想在一次通过中至少获得 N 条日志。如何实现?
来自 answer, it appears there is such a utility in Kafka but doesn't seem to be present in Spark 使之成为可能。
没有允许您为从 Kafka 接收的消息数设置 最小值 值的选项。选项 maxOffsetsPerTrigger
允许您设置 最大 条消息。
如果您希望您的微批处理一次处理更多消息,您可以考虑增加触发间隔。
另外(参考你提供的link),这也是Kafka本身无法设置的。您可以设置最小提取字节数,但不能设置最小消息数。
请注意,您可以通过前缀 kafka.
结构化流中的 readStream 设置所有 Kafka 选项,如 Kafka Specific Configurations:
部分所述
"Kafka’s own configurations can be set via DataStreamReader.option with kafka. prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port")."
这样,您也可以使用消费者配置 kafka.fetch.min.bytes
。然而,在一个 loval Kafka 2.5.0 安装上使用 Spark 3.0.1 测试它没有任何影响。添加配置时 kafka.fetch.max.wait.ms
我测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我而言)。
查看Spark的源代码KafkaDataConsumer it looks like the fetch
does not directly account for any min/max bytes compared to the pure KafkaConsumer。
在 Spark 流式处理中,我在收到日志时收到它们。但我想在一次通过中至少获得 N 条日志。如何实现?
来自
没有允许您为从 Kafka 接收的消息数设置 最小值 值的选项。选项 maxOffsetsPerTrigger
允许您设置 最大 条消息。
如果您希望您的微批处理一次处理更多消息,您可以考虑增加触发间隔。
另外(参考你提供的link),这也是Kafka本身无法设置的。您可以设置最小提取字节数,但不能设置最小消息数。
请注意,您可以通过前缀 kafka.
结构化流中的 readStream 设置所有 Kafka 选项,如 Kafka Specific Configurations:
"Kafka’s own configurations can be set via DataStreamReader.option with kafka. prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port")."
这样,您也可以使用消费者配置 kafka.fetch.min.bytes
。然而,在一个 loval Kafka 2.5.0 安装上使用 Spark 3.0.1 测试它没有任何影响。添加配置时 kafka.fetch.max.wait.ms
我测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我而言)。
查看Spark的源代码KafkaDataConsumer it looks like the fetch
does not directly account for any min/max bytes compared to the pure KafkaConsumer。