Kafka消费者不打印任何东西
Kafka consumer does not print anything
我正在学习本教程:https://towardsdatascience.com/kafka-docker-python-408baf0e1088 为了 运行 使用 Kafka 的生产者-消费者示例,Docker 和 Python。我的问题是我的终端打印生产者的迭代,而不打印消费者的迭代。
我在本地 运行 宁这个例子,所以:
- 我在一个终端选项卡中完成了:
docker-compose -f docker-compose-expose.yml up
我的 docker-compose-expose.yml 是这样的:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic_test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
我看到了来自 kafka 和 zookeeper 的日志,所以我假设它正在工作。
2. 在另一个选项卡中,我 运行 python producer.py
我的 producer.py 是这样的:
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
for j in range(9999):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data)
sleep(0.5)
它打印了迭代,所以我认为它在工作。
3. 最后,在另一个选项卡中,我 运行 python consumer.py
我的 consumer.py 是:
from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
event_data = event.value
# Do whatever you want
print(event_data)
sleep(2)
所以,问题来了。根据教程,它应该打印如下内容:
{'counter': 0}
{'counter': 1}
{'counter': 2}
{'counter': 3}
{'counter': 4}
{'counter': 5}
{'counter': 6}
[...]
但是,我的终端没有打印任何东西。
此外,如果我解决了这个问题,我想 运行 一台机器上的生产者和另一台机器上的消费者。我只需要在 docker-compose-expose.yml 中更改 KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
写生产者的 IP 地址代替本地主机,对吗?
更新:我在另一台机器上试过,这些步骤现在有效。也许在我的实验中,我改变了一些关于 kafka 或 docker 的东西,所以它不起作用。但是我不知道为什么
基本上我已经明白问题出在一些正在执行的 images/processes 中。 docker-compose stop
和 docker-compose rm -f
我解决了。
我正在学习本教程:https://towardsdatascience.com/kafka-docker-python-408baf0e1088 为了 运行 使用 Kafka 的生产者-消费者示例,Docker 和 Python。我的问题是我的终端打印生产者的迭代,而不打印消费者的迭代。 我在本地 运行 宁这个例子,所以:
- 我在一个终端选项卡中完成了:
docker-compose -f docker-compose-expose.yml up
我的 docker-compose-expose.yml 是这样的:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic_test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
我看到了来自 kafka 和 zookeeper 的日志,所以我假设它正在工作。
2. 在另一个选项卡中,我 运行 python producer.py
我的 producer.py 是这样的:
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
for j in range(9999):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data)
sleep(0.5)
它打印了迭代,所以我认为它在工作。
3. 最后,在另一个选项卡中,我 运行 python consumer.py
我的 consumer.py 是:
from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
event_data = event.value
# Do whatever you want
print(event_data)
sleep(2)
所以,问题来了。根据教程,它应该打印如下内容:
{'counter': 0}
{'counter': 1}
{'counter': 2}
{'counter': 3}
{'counter': 4}
{'counter': 5}
{'counter': 6}
[...]
但是,我的终端没有打印任何东西。
此外,如果我解决了这个问题,我想 运行 一台机器上的生产者和另一台机器上的消费者。我只需要在 docker-compose-expose.yml 中更改 KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
写生产者的 IP 地址代替本地主机,对吗?
更新:我在另一台机器上试过,这些步骤现在有效。也许在我的实验中,我改变了一些关于 kafka 或 docker 的东西,所以它不起作用。但是我不知道为什么
基本上我已经明白问题出在一些正在执行的 images/processes 中。 docker-compose stop
和 docker-compose rm -f
我解决了。