使用 Kafka 处理生产者和消费者-python

Handling a producer and consumer using Kafka-python

首先我想说我是Kafka和Whosebug的新手,所以如果我没有以正确的方式问这个问题,我很抱歉。 我正在尝试使用 kafka-python 实现生产者-消费者。 但是它不能正常工作

我已经安装了 zookeeper 并启动了 运行。我也有 kafka-server。但是当我 运行 通过 pycharm 成为消费者和生产者时, receiver.The 消费者没有收到消息,消费者继续 运行 但生产者停止了。

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', group_id='test-consumer-group', 
bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
                     auto_offset_reset='earliest')
print("Consuming messages")
for msg in consumer:
    print(msg)

producer.py

from kafka import KafkaProducer

print('above producer')
producer = KafkaProducer(bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
                         compression_type=None
                         )
print('after producer')
for _ in range(100):
    producer.send('test', b'HELLO NITHIN chandran')

print('after sending messages')

代替 my_ip,我已经从 ipconfig 提供了我的系统 ip 地址。

consumer.py输出-

Consuming messages

consumer.py不停运行

producer.py输出-

above producer
after producer
after sending messages

Process finished with exit code 0

producer.py 停止 运行,过程完成,如输出所示。

请帮我解决这个问题。 感谢所有帮助

您的代码没有问题,问题出在您的代理配置上。请设置为初始配置,只需将 log.dirs 更改为您要存储 Kafka 数据的路径即可。 更改配置文件后,请按照以下步骤操作:

  • 停止 zookeeper 和 kafka
  • 清除kafka和zookeeper数据目录
  • 运行动物园管理员和卡法
  • 启动消费者和生产者