Apache Beam ReadFromKafka 与 KafkaConsume
Apache Beam ReadFromKafka vs KafkaConsume
我正在使用一个简单的 Apache Beam 管道,该管道包括从无限制的 Kafka 主题中读取并打印出值。我有两种口味。这是通过 Flink Runner 完成的。
版本 1
with beam.Pipeline(options=beam_options) as p:
(p
| "Read from Kafka topic" >> ReadFromKafka(
consumer_config=consumer_config,
topics=[producer_topic])
| 'log' >> beam.ParDo(LogData())
这个使用 from apache_beam.io.kafka import ReadFromKafka
(即 Apache Beam 附带的默认实现)。
版本 2
with beam.Pipeline(options=beam_options) as p:
(p
| "Read from Kafka topic (KafkaConsumer)" >> KafkaConsume(
consumer_config={
"topic": producer_topic,
'auto_offset_reset': 'earliest',
"group_id": 'transaction_classification',
"bootstrap_servers": servers,
})
这个正在使用光束块:
from beam_nuggets.io.kafkaio import KafkaConsume
我已将 Kafka 生产者配置为每 1 秒生产一个元素。
我观察到的是,当我从 ReadFromKafka
(版本 1)消费时,元素的生成间隔大约为 4-6 秒,并一起批处理。
另一方面,如果我用 KafkaConsume
(版本 2)尝试同样的事情,那么我会在生成元素时(即每秒)获取元素,这正是我预期的行为。
我试图让两者的 consumer_config
相同,但它似乎对版本 1 没有任何影响。
现在,我想坚持使用版本 1,因为它在 Flink UI 中为我提供了正确的指标,而版本 2 工作得更好,我在 Flink 中没有得到任何指标(一切都被报告为收到 0 个字节/收到 0 个记录)。
有没有人有什么想法?
后者使用原生的Pythonkafka库;看到 Beam 可能需要 Kafka Client JMX Mbeans 用于度量公开,这可以解释为什么在使用非 JVM 客户端时它们为零
https://github.com/mohaseeb/beam-nuggets/blob/master/beam_nuggets/io/kafkaio.py#L4
虽然前者是 Java 代码的包装器,因此可以解释差异;消费者轮询 returns 一个 Java 迭代器对象,而不是像 Python 客户端那样直接通过本机生成器直接通过单个记录
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py#L103
我正在使用一个简单的 Apache Beam 管道,该管道包括从无限制的 Kafka 主题中读取并打印出值。我有两种口味。这是通过 Flink Runner 完成的。
版本 1
with beam.Pipeline(options=beam_options) as p:
(p
| "Read from Kafka topic" >> ReadFromKafka(
consumer_config=consumer_config,
topics=[producer_topic])
| 'log' >> beam.ParDo(LogData())
这个使用 from apache_beam.io.kafka import ReadFromKafka
(即 Apache Beam 附带的默认实现)。
版本 2
with beam.Pipeline(options=beam_options) as p:
(p
| "Read from Kafka topic (KafkaConsumer)" >> KafkaConsume(
consumer_config={
"topic": producer_topic,
'auto_offset_reset': 'earliest',
"group_id": 'transaction_classification',
"bootstrap_servers": servers,
})
这个正在使用光束块:
from beam_nuggets.io.kafkaio import KafkaConsume
我已将 Kafka 生产者配置为每 1 秒生产一个元素。
我观察到的是,当我从 ReadFromKafka
(版本 1)消费时,元素的生成间隔大约为 4-6 秒,并一起批处理。
另一方面,如果我用 KafkaConsume
(版本 2)尝试同样的事情,那么我会在生成元素时(即每秒)获取元素,这正是我预期的行为。
我试图让两者的 consumer_config
相同,但它似乎对版本 1 没有任何影响。
现在,我想坚持使用版本 1,因为它在 Flink UI 中为我提供了正确的指标,而版本 2 工作得更好,我在 Flink 中没有得到任何指标(一切都被报告为收到 0 个字节/收到 0 个记录)。
有没有人有什么想法?
后者使用原生的Pythonkafka库;看到 Beam 可能需要 Kafka Client JMX Mbeans 用于度量公开,这可以解释为什么在使用非 JVM 客户端时它们为零
https://github.com/mohaseeb/beam-nuggets/blob/master/beam_nuggets/io/kafkaio.py#L4
虽然前者是 Java 代码的包装器,因此可以解释差异;消费者轮询 returns 一个 Java 迭代器对象,而不是像 Python 客户端那样直接通过本机生成器直接通过单个记录 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py#L103