如何在 confluent kafka python 中读取批量消息?
How to read batch messages in confluent kafka python?
我正在尝试从 Kafka 读取消息,所以我编写了简单的消费者来读取来自 Kafka 的消息。
While True:
message = consumer.poll(timeout=1.0)
# i am doing something with messages
上面代码输出的消息类型是消息对象。如何获取消息数组?
有没有可能??
注:消费者配置不多,基本而已。
librdkafka(底层 C 库)仅 return 消息一条一条地发送到应用程序,但在内部,消息是从代理中批量获取的,因此没有性能下降。消息在内部缓冲区中排队,等待您的应用轮询。
有调整行为的配置:
fetch.wait.max.ms
(默认100),给broker积累数据发送的时间
fetch.message.max.bytes
(默认1048576,1GB),批次的最大大小
queued.max.messages.kbytes
(默认1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。
您还可以在这里找到许多其他内容:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md
如果你真的因为你处理数据的方式想要一个数据数组,你可以做的是像你一样在循环中调用低超时的轮询,并在你有 x 条消息或在 y 之后停止你的循环ms,将它们累积在一个集合中。处理生成的数组并重复循环。
生产也是如此:您一个接一个地生产数据,但消息在发送给代理之前会进行批处理。
我正在尝试从 Kafka 读取消息,所以我编写了简单的消费者来读取来自 Kafka 的消息。
While True:
message = consumer.poll(timeout=1.0)
# i am doing something with messages
上面代码输出的消息类型是消息对象。如何获取消息数组?
有没有可能??
注:消费者配置不多,基本而已。
librdkafka(底层 C 库)仅 return 消息一条一条地发送到应用程序,但在内部,消息是从代理中批量获取的,因此没有性能下降。消息在内部缓冲区中排队,等待您的应用轮询。
有调整行为的配置:
fetch.wait.max.ms
(默认100),给broker积累数据发送的时间
fetch.message.max.bytes
(默认1048576,1GB),批次的最大大小
queued.max.messages.kbytes
(默认1000000),内部队列中数据的最大大小。如果您不定期轮询,数据将不会从队列中清除,您将无法获取更多数据。
您还可以在这里找到许多其他内容:https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md
如果你真的因为你处理数据的方式想要一个数据数组,你可以做的是像你一样在循环中调用低超时的轮询,并在你有 x 条消息或在 y 之后停止你的循环ms,将它们累积在一个集合中。处理生成的数组并重复循环。
生产也是如此:您一个接一个地生产数据,但消息在发送给代理之前会进行批处理。