如何将 pykafka 组消费者与 gevent 一起使用?
How to use pykafka group consumer with gevent?
我用gevent的pykafka group consumer,结果有重复数据。
显示我的代码:
import gevent
from pykafka import KafkaClient
topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'
def get_consumer():
client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
topic = client.topics[topic_name.encode()]
consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
consumer_group=group.encode(),
auto_commit_enable=True,
)
return consumer
def worker(worker_id):
consumer = get_consumer()
for msg in consumer:
print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))
if __name__ == '__main__':
tasks = [gevent.spawn(worker, *(i, )) for i in range(3)]
ret = gevent.joinall(tasks)
结果:
任何人都可以告诉我如何让它工作,pykafka 不支持 gevent 吗?
我打赌这个问题与您使用 gevent 没有任何关系。您注意到消费者数据重复的原因是您使用的是 SimpleConsumer
而不是 BalancedConsumer
。 SimpleConsumer
不执行自动平衡 - 它只是从其起始偏移量开始消耗整个主题。因此,如果您有许多 SimpleConsumer
个实例 运行 并排,就像您在这里所做的那样,每个实例都会从其起始偏移量开始使用整个主题。 BalancedConsumer
(topic.get_balanced_consumer(consumer_group='mygroup')
) 可能就是您想要的。它使用消费者再平衡算法来确保同一组中的消费者 运行 不会收到相同的消息。为此,您的主题需要至少与您使用它的进程一样多的分区。有关详细信息,请参阅 pykafka README and documentation。
我用gevent的pykafka group consumer,结果有重复数据。 显示我的代码:
import gevent
from pykafka import KafkaClient
topic_name = 'test2'
bootstrap_servers = '192.168.199.228:9094,192.168.199.228:9092,192.168.199.228:9093'
group = 'test_g'
def get_consumer():
client = KafkaClient(hosts=bootstrap_servers, use_greenlets=True)
topic = client.topics[topic_name.encode()]
consumer = topic.get_simple_consumer(auto_commit_interval_ms=10000,
consumer_group=group.encode(),
auto_commit_enable=True,
)
return consumer
def worker(worker_id):
consumer = get_consumer()
for msg in consumer:
print('worker {} partition: {}, offset: {}'.format(worker_id, msg.partition, msg.offset))
if __name__ == '__main__':
tasks = [gevent.spawn(worker, *(i, )) for i in range(3)]
ret = gevent.joinall(tasks)
结果: 任何人都可以告诉我如何让它工作,pykafka 不支持 gevent 吗?
我打赌这个问题与您使用 gevent 没有任何关系。您注意到消费者数据重复的原因是您使用的是 SimpleConsumer
而不是 BalancedConsumer
。 SimpleConsumer
不执行自动平衡 - 它只是从其起始偏移量开始消耗整个主题。因此,如果您有许多 SimpleConsumer
个实例 运行 并排,就像您在这里所做的那样,每个实例都会从其起始偏移量开始使用整个主题。 BalancedConsumer
(topic.get_balanced_consumer(consumer_group='mygroup')
) 可能就是您想要的。它使用消费者再平衡算法来确保同一组中的消费者 运行 不会收到相同的消息。为此,您的主题需要至少与您使用它的进程一样多的分区。有关详细信息,请参阅 pykafka README and documentation。