Kafka consumer/producer 与 Python 在 WSL2 Ubuntu

Kafka consumer/producer with Python in WSL2 Ubuntu

这是 的后续问题。

根据线程的建议,我的 Python 代码不起作用的可能原因是因为我要连接到 WSL2 中的远程服务器。 WSL2 Ubuntu.

可能存在未知问题

所以我正在使用以下两种在 WLS2 Ubuntu 本地通信的方法(即通过 localhost:9092)来测试该假设:

请注意,对于以下两种方法,我已经在一个终端 (T1) 中安装了 zookeeper 运行ning,其中:

bin/zookeeper-server-start.sh config/zookeeper.properties

方法 1:按照以下命令(取自 this tutorial

在控制台中与 producer/consumer 通信

步骤 1:创建主题 TutorialTopic(在终端 T2

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

步骤 2:在终端中生成消息 T3

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic

第三步:在另一个终端消费消息T4

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

结果:我在终端 T4

看到一条消息 Hello, World

方法 2:通过两个 Python 模块进行通信,consumer.pyproducer.py,如 this tutorial:

步骤 1:在终端 T5

中创建主题 sample
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample

步骤 2: 运行 终端中的生产者模块 T6

producer.py
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', value='Hello, World!')

步骤 3:在终端中使用该消息 T7

consumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer('sample', bootstrap_servers=['localhost: 9092'])

for message in consumer:
    print (message.value)

结果T7 上没有任何显示。 Python 控制台卡在 运行,除非我执行 Ctrl+C。不会产生其他错误或消息。与我得到 No Broker 错误的 不同。

然而,在这种方法 2 中,如果我通过 T6 中的命令生成消息,如下所示,我惊讶地在消费者终端 T7:

中收到了它
echo "Hello" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample

我的最终目标是通过 Python 应用从远程服务器使用消息并使用 WSL2 Ubuntu。这个实验似乎暗示 WSL2 不是问题所在。如果有人可以启发这里发生的事情。

produce the message through a command ... I surprisingly receive it in the consumer terminal T7

这并不奇怪,因为您没有在 Python 生产者应用 启动消费者循环后调用 producer.flush()producer.close()

控制台生产者通过在未来调用 get() 来阻塞每条记录 - source,有效地刷新其缓冲区

或者,如果您想查看之前发送的记录

,您在 Python 消费者中缺少 --from-beginning 的匹配选项

最终,在同一网络 adapter/subnet 中测试本地 client/server 无助于解决外部网络连接问题