Kafka consumer/producer 与 Python 在 WSL2 Ubuntu
Kafka consumer/producer with Python in WSL2 Ubuntu
这是 的后续问题。
根据线程的建议,我的 Python 代码不起作用的可能原因是因为我要连接到 WSL2 中的远程服务器。 WSL2 Ubuntu.
可能存在未知问题
所以我正在使用以下两种在 WLS2 Ubuntu 本地通信的方法(即通过 localhost:9092
)来测试该假设:
请注意,对于以下两种方法,我已经在一个终端 (T1
) 中安装了 zookeeper 运行ning,其中:
bin/zookeeper-server-start.sh config/zookeeper.properties
方法 1:按照以下命令(取自 this tutorial)
在控制台中与 producer/consumer 通信
步骤 1:创建主题 TutorialTopic
(在终端 T2
)
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
步骤 2:在终端中生成消息 T3
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic
第三步:在另一个终端消费消息T4
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
结果:我在终端 T4
看到一条消息 Hello, World
方法 2:通过两个 Python 模块进行通信,consumer.py
和 producer.py
,如 this tutorial:
步骤 1:在终端 T5
中创建主题 sample
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample
步骤 2: 运行 终端中的生产者模块 T6
producer.py
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', value='Hello, World!')
步骤 3:在终端中使用该消息 T7
consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('sample', bootstrap_servers=['localhost: 9092'])
for message in consumer:
print (message.value)
结果:T7
上没有任何显示。 Python 控制台卡在 运行,除非我执行 Ctrl+C
。不会产生其他错误或消息。与我得到 No Broker
错误的 不同。
然而,在这种方法 2 中,如果我通过 T6
中的命令生成消息,如下所示,我惊讶地在消费者终端 T7
:
中收到了它
echo "Hello" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample
我的最终目标是通过 Python 应用从远程服务器使用消息并使用 WSL2 Ubuntu。这个实验似乎暗示 WSL2 不是问题所在。如果有人可以启发这里发生的事情。
produce the message through a command ... I surprisingly receive it in the consumer terminal T7
这并不奇怪,因为您没有在 Python 生产者应用 启动消费者循环后调用 producer.flush()
或 producer.close()
。
控制台生产者通过在未来调用 get() 来阻塞每条记录 - source,有效地刷新其缓冲区
或者,如果您想查看之前发送的记录
,您在 Python 消费者中缺少 --from-beginning
的匹配选项
最终,在同一网络 adapter/subnet 中测试本地 client/server 无助于解决外部网络连接问题
这是
根据线程的建议,我的 Python 代码不起作用的可能原因是因为我要连接到 WSL2 中的远程服务器。 WSL2 Ubuntu.
可能存在未知问题所以我正在使用以下两种在 WLS2 Ubuntu 本地通信的方法(即通过 localhost:9092
)来测试该假设:
请注意,对于以下两种方法,我已经在一个终端 (T1
) 中安装了 zookeeper 运行ning,其中:
bin/zookeeper-server-start.sh config/zookeeper.properties
方法 1:按照以下命令(取自 this tutorial)
在控制台中与 producer/consumer 通信步骤 1:创建主题 TutorialTopic
(在终端 T2
)
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
步骤 2:在终端中生成消息 T3
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic
第三步:在另一个终端消费消息T4
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
结果:我在终端 T4
看到一条消息Hello, World
方法 2:通过两个 Python 模块进行通信,consumer.py
和 producer.py
,如 this tutorial:
步骤 1:在终端 T5
sample
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample
步骤 2: 运行 终端中的生产者模块 T6
producer.py
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', value='Hello, World!')
步骤 3:在终端中使用该消息 T7
consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('sample', bootstrap_servers=['localhost: 9092'])
for message in consumer:
print (message.value)
结果:T7
上没有任何显示。 Python 控制台卡在 运行,除非我执行 Ctrl+C
。不会产生其他错误或消息。与我得到 No Broker
错误的
然而,在这种方法 2 中,如果我通过 T6
中的命令生成消息,如下所示,我惊讶地在消费者终端 T7
:
echo "Hello" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample
我的最终目标是通过 Python 应用从远程服务器使用消息并使用 WSL2 Ubuntu。这个实验似乎暗示 WSL2 不是问题所在。如果有人可以启发这里发生的事情。
produce the message through a command ... I surprisingly receive it in the consumer terminal T7
这并不奇怪,因为您没有在 Python 生产者应用 启动消费者循环后调用 producer.flush()
或 producer.close()
。
控制台生产者通过在未来调用 get() 来阻塞每条记录 - source,有效地刷新其缓冲区
或者,如果您想查看之前发送的记录
,您在 Python 消费者中缺少--from-beginning
的匹配选项
最终,在同一网络 adapter/subnet 中测试本地 client/server 无助于解决外部网络连接问题