Python 具有偏移量管理的 Kafka 消费者
Python Kafka consumer with offset management
我是 Kafka 的新手,我正在尝试在 Kafka 中设置一个消费者,以便它读取 Kafka Producer 发布的消息。
如果我错了,请纠正我,我理解 Kafka 消费者存储是否在 ZooKeeper 中偏移的方式?但是,我没有 zookeeper 实例 运行,我想每 5 分钟轮询一次,看看是否有任何新消息发布。
到目前为止,我的代码是:
import logging
from django.conf import settings
import kafka
import sys
import json
bootstrap_servers = ['localhost:8080']
topicName = 'test-info'
consumer = kafka.KafkaConsumer (topicName, group_id = 'test',bootstrap_servers =
bootstrap_servers,
auto_offset_reset = 'earliest')
count = 0
#print(consumer.topic)
try:
for message in consumer:
#print(type(message.value))
print("\n")
print("<>"*20)
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value))
print("--"*20)
info = json.loads(message.value)
if info['event'] == "new_record" and info['data']['userId'] == "user1" and info['data']['details']['userTeam'] == "foo":
count = count + 1
print(count, info['data']['details']['team'], info['data']['details']['leadername'],info['data']['details']['category'])
else:
print("Skipping")
print(count)
except KeyboardInterrupt:
sys.exit()
如何保存偏移量,以便下次轮询时读取增量数据?任何指针都会有所帮助。
Kafka消费者在ZooKeeper中存储offset是真的。因为你没有安装 zookeeper。 Kafka 可能使用其内置的 zookeeper。
在您的情况下,您无需再做任何事情,因为您已经设置了 group_id、group_id = 'test'
。因此,消费者将继续自动为特定组消费最后一个偏移量的数据。因为它自动提交了 zookeeper 中的最新偏移量(auto_commit 默认为 True)。
有关更多信息,您可以查看 here
如果您想每 5 分钟检查一次是否有任何新消息发布,您可以在您的消费者 for 循环中添加 time.sleep(300)
。
我是 Kafka 的新手,我正在尝试在 Kafka 中设置一个消费者,以便它读取 Kafka Producer 发布的消息。 如果我错了,请纠正我,我理解 Kafka 消费者存储是否在 ZooKeeper 中偏移的方式?但是,我没有 zookeeper 实例 运行,我想每 5 分钟轮询一次,看看是否有任何新消息发布。
到目前为止,我的代码是:
import logging
from django.conf import settings
import kafka
import sys
import json
bootstrap_servers = ['localhost:8080']
topicName = 'test-info'
consumer = kafka.KafkaConsumer (topicName, group_id = 'test',bootstrap_servers =
bootstrap_servers,
auto_offset_reset = 'earliest')
count = 0
#print(consumer.topic)
try:
for message in consumer:
#print(type(message.value))
print("\n")
print("<>"*20)
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value))
print("--"*20)
info = json.loads(message.value)
if info['event'] == "new_record" and info['data']['userId'] == "user1" and info['data']['details']['userTeam'] == "foo":
count = count + 1
print(count, info['data']['details']['team'], info['data']['details']['leadername'],info['data']['details']['category'])
else:
print("Skipping")
print(count)
except KeyboardInterrupt:
sys.exit()
如何保存偏移量,以便下次轮询时读取增量数据?任何指针都会有所帮助。
Kafka消费者在ZooKeeper中存储offset是真的。因为你没有安装 zookeeper。 Kafka 可能使用其内置的 zookeeper。
在您的情况下,您无需再做任何事情,因为您已经设置了 group_id、
group_id = 'test'
。因此,消费者将继续自动为特定组消费最后一个偏移量的数据。因为它自动提交了 zookeeper 中的最新偏移量(auto_commit 默认为 True)。 有关更多信息,您可以查看 here如果您想每 5 分钟检查一次是否有任何新消息发布,您可以在您的消费者 for 循环中添加
time.sleep(300)
。