Apache Beam ReadFromKafka 与 KafkaConsume

Apache Beam ReadFromKafka vs KafkaConsume

我正在使用一个简单的 Apache Beam 管道,该管道包括从无限制的 Kafka 主题中读取并打印出值。我有两种口味。这是通过 Flink Runner 完成的。

版本 1

  with beam.Pipeline(options=beam_options) as p:
        (p
         | "Read from Kafka topic" >> ReadFromKafka(
                    consumer_config=consumer_config,
                    topics=[producer_topic])
         | 'log' >> beam.ParDo(LogData())

这个使用 from apache_beam.io.kafka import ReadFromKafka(即 Apache Beam 附带的默认实现)。

版本 2

   with beam.Pipeline(options=beam_options) as p:
        (p
         | "Read from Kafka topic (KafkaConsumer)" >> KafkaConsume(
                    consumer_config={
                        "topic": producer_topic,
                        'auto_offset_reset': 'earliest',
                        "group_id": 'transaction_classification',
                        "bootstrap_servers": servers,
                    })

这个正在使用光束块:

from beam_nuggets.io.kafkaio import KafkaConsume

我已将 Kafka 生产者配置为每 1 秒生产一个元素。

我观察到的是,当我从 ReadFromKafka(版本 1)消费时,元素的生成间隔大约为 4-6 秒,并一起批处理。

另一方面,如果我用 KafkaConsume(版本 2)尝试同样的事情,那么我会在生成元素时(即每秒)获取元素,这正是我预期的行为。

我试图让两者的 consumer_config 相同,但它似乎对版本 1 没有任何影响。

现在,我想坚持使用版本 1,因为它在 Flink UI 中为我提供了正确的指标,而版本 2 工作得更好,我在 Flink 中没有得到任何指标(一切都被报告为收到 0 个字节/收到 0 个记录)。

有没有人有什么想法?

后者使用原生的Pythonkafka库;看到 Beam 可能需要 Kafka Client JMX Mbeans 用于度量公开,这可以解释为什么在使用非 JVM 客户端时它们为零

https://github.com/mohaseeb/beam-nuggets/blob/master/beam_nuggets/io/kafkaio.py#L4

虽然前者是 Java 代码的包装器,因此可以解释差异;消费者轮询 returns 一个 Java 迭代器对象,而不是像 Python 客户端那样直接通过本机生成器直接通过单个记录 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py#L103