Kafka消费者不打印任何东西

Kafka consumer does not print anything

我正在学习本教程:https://towardsdatascience.com/kafka-docker-python-408baf0e1088 为了 运行 使用 Kafka 的生产者-消费者示例,Docker 和 Python。我的问题是我的终端打印生产者的迭代,而不打印消费者的迭代。 我在本地 运行 宁这个例子,所以:

  1. 我在一个终端选项卡中完成了:docker-compose -f docker-compose-expose.yml up 我的 docker-compose-expose.yml 是这样的:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
     - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
     - "9092:9092"
    expose:
     - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic_test:1:1"
    volumes:
     - /var/run/docker.sock:/var/run/docker.sock

我看到了来自 kafka 和 zookeeper 的日志,所以我假设它正在工作。 2. 在另一个选项卡中,我 运行 python producer.py 我的 producer.py 是这样的:

from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)
for j in range(9999):
    print("Iteration", j)
    data = {'counter': j}
    producer.send('topic_test', value=data)
    sleep(0.5)

它打印了迭代,所以我认为它在工作。 3. 最后,在另一个选项卡中,我 运行 python consumer.py 我的 consumer.py 是:

from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = KafkaConsumer(
    'topic_test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group-id',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
    event_data = event.value
    # Do whatever you want
    print(event_data)
    sleep(2)

所以,问题来了。根据教程,它应该打印如下内容:

{'counter': 0}
{'counter': 1}
{'counter': 2}
{'counter': 3}
{'counter': 4}
{'counter': 5}
{'counter': 6}
[...]

但是,我的终端没有打印任何东西。 此外,如果我解决了这个问题,我想 运行 一台机器上的生产者和另一台机器上的消费者。我只需要在 docker-compose-expose.yml 中更改 KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 写生产者的 IP 地址代替本地主机,对吗?

更新:我在另一台机器上试过,这些步骤现在有效。也许在我的实验中,我改变了一些关于 kafka 或 docker 的东西,所以它不起作用。但是我不知道为什么

基本上我已经明白问题出在一些正在执行的 images/processes 中。 docker-compose stopdocker-compose rm -f 我解决了。