如何在 confluent kafka python 中读取批量消息?

How to read batch messages in confluent kafka python?

我正在尝试从 Kafka 读取消息,所以我编写了简单的消费者来读取来自 Kafka 的消息。

While True:
        message = consumer.poll(timeout=1.0)
        # i am doing something with messages

上面代码输出的消息类型是消息对象。如何获取消息数组?

有没有可能??

注:消费者配置不多,基本而已。

librdkafka(底层 C 库)仅 return 消息一条一条地发送到应用程序,但在内部,消息是从代理中批量获取的,因此没有性能下降。消息在内部缓冲区中排队,等待您的应用轮询。

有调整行为的配置:

fetch.wait.max.ms(默认100),给broker积累数据发送的时间 fetch.message.max.bytes(默认1048576,1GB),批次的最大大小 queued.max.messages.kbytes(默认1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。

您还可以在这里找到许多其他内容:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


如果你真的因为你处理数据的方式想要一个数据数组,你可以做的是像你一样在循环中调用低超时的轮询,并在你有 x 条消息或在 y 之后停止你的循环ms,将它们累积在一个集合中。处理生成的数组并重复循环。

生产也是如此:您一个接一个地生产数据,但消息在发送给代理之前会进行批处理。