使用 Kafka 处理生产者和消费者-python
Handling a producer and consumer using Kafka-python
首先我想说我是Kafka和Whosebug的新手,所以如果我没有以正确的方式问这个问题,我很抱歉。
我正在尝试使用 kafka-python 实现生产者-消费者。
但是它不能正常工作
我已经安装了 zookeeper 并启动了 运行。我也有 kafka-server。但是当我 运行 通过 pycharm 成为消费者和生产者时, receiver.The 消费者没有收到消息,消费者继续 运行 但生产者停止了。
consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', group_id='test-consumer-group',
bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
auto_offset_reset='earliest')
print("Consuming messages")
for msg in consumer:
print(msg)
producer.py
from kafka import KafkaProducer
print('above producer')
producer = KafkaProducer(bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
compression_type=None
)
print('after producer')
for _ in range(100):
producer.send('test', b'HELLO NITHIN chandran')
print('after sending messages')
代替 my_ip,我已经从 ipconfig 提供了我的系统 ip 地址。
consumer.py输出-
Consuming messages
consumer.py不停运行
producer.py输出-
above producer
after producer
after sending messages
Process finished with exit code 0
producer.py 停止 运行,过程完成,如输出所示。
请帮我解决这个问题。
感谢所有帮助
您的代码没有问题,问题出在您的代理配置上。请设置为初始配置,只需将 log.dirs
更改为您要存储 Kafka 数据的路径即可。
更改配置文件后,请按照以下步骤操作:
- 停止 zookeeper 和 kafka
- 清除kafka和zookeeper数据目录
- 运行动物园管理员和卡法
- 启动消费者和生产者
首先我想说我是Kafka和Whosebug的新手,所以如果我没有以正确的方式问这个问题,我很抱歉。 我正在尝试使用 kafka-python 实现生产者-消费者。 但是它不能正常工作
我已经安装了 zookeeper 并启动了 运行。我也有 kafka-server。但是当我 运行 通过 pycharm 成为消费者和生产者时, receiver.The 消费者没有收到消息,消费者继续 运行 但生产者停止了。
consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', group_id='test-consumer-group',
bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
auto_offset_reset='earliest')
print("Consuming messages")
for msg in consumer:
print(msg)
producer.py
from kafka import KafkaProducer
print('above producer')
producer = KafkaProducer(bootstrap_servers=['my_ip:9092'], api_version=(0, 10, 1),
compression_type=None
)
print('after producer')
for _ in range(100):
producer.send('test', b'HELLO NITHIN chandran')
print('after sending messages')
代替 my_ip,我已经从 ipconfig 提供了我的系统 ip 地址。
consumer.py输出-
Consuming messages
consumer.py不停运行
producer.py输出-
above producer
after producer
after sending messages
Process finished with exit code 0
producer.py 停止 运行,过程完成,如输出所示。
请帮我解决这个问题。 感谢所有帮助
您的代码没有问题,问题出在您的代理配置上。请设置为初始配置,只需将 log.dirs
更改为您要存储 Kafka 数据的路径即可。
更改配置文件后,请按照以下步骤操作:
- 停止 zookeeper 和 kafka
- 清除kafka和zookeeper数据目录
- 运行动物园管理员和卡法
- 启动消费者和生产者