Python 脚本在 IDE 中是 运行,但不在终端(Kafka)中
Python script is running in the IDE, but not in the terminal (Kafka)
这可能与Kafka相关,也可能不相关,但我在学习Kafka时遇到过这个问题。我有一个 python 制作人脚本,如下所示:
from kafka import KafkaProducer
from json import dumps
class Producer:
def __init__(self):
self.connection = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
def push_client(self, data):
self.connection.send('client-pusher', value=data)
data = {
"first_name": "Davey",
"email": "davey@dave.com",
"group_id": 3,
"date": "2021-12-12"
}
producer = Producer()
producer.push_client(data)
我 运行 在 Docker 中连接 Kafka Broker,消息在另一端被这个脚本消耗:
import json
from datetime import date
from typing import Optional
from kafka import KafkaConsumer
from pydantic import BaseModel
class Client(BaseModel):
first_name: str
email: str
group_id: Optional[int] = None
date: date
consumer = KafkaConsumer(
'client-pusher',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
while True:
msg_pack = consumer.poll(timeout_ms=500)
for tp, messages in msg_pack.items():
for message in messages:
client = Client(**message.value)
print(client)
消费者脚本在无限循环中侦听新消息。我可以在终端或 vscode 中 运行 消费者,它总是会打印出生产者的数据指令,但只有当我 运行 生产者脚本在 Visual Studio 代码中时.
如果我运行终端中的生产者脚本
python producer.py
消息没有传递给消费者。没有 运行 时间错误(生产者中的打印语句顺利通过)。我这辈子都看不出 IDE.
的环境有什么不同
我有不同的虚拟环境管理这两个脚本。我已经尝试 运行 使用 venv 的完整路径连接生产者,直接从 vscode 的终端复制,例如
/home/me/whatever/dummy-producer/.venv/bin/python producer.py
我还打印了 sys.path
中的所有内容 – 它们在 IDE 和终端之间是相同的。
我还可以尝试找出 vscode 的执行与终端的执行之间的区别吗?如果重要的话,我正在使用 zsh
。
Kafka 客户端不会立即发送消息;如果您的批次大小小于默认值并且应用程序退出,则实际上是在删除事件。
如果想立即发送,producer中还需要一个方法
def push_client(self, data):
self.connection.send('client-pusher', value=data)
self.connection.flush()
这可能与Kafka相关,也可能不相关,但我在学习Kafka时遇到过这个问题。我有一个 python 制作人脚本,如下所示:
from kafka import KafkaProducer
from json import dumps
class Producer:
def __init__(self):
self.connection = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
def push_client(self, data):
self.connection.send('client-pusher', value=data)
data = {
"first_name": "Davey",
"email": "davey@dave.com",
"group_id": 3,
"date": "2021-12-12"
}
producer = Producer()
producer.push_client(data)
我 运行 在 Docker 中连接 Kafka Broker,消息在另一端被这个脚本消耗:
import json
from datetime import date
from typing import Optional
from kafka import KafkaConsumer
from pydantic import BaseModel
class Client(BaseModel):
first_name: str
email: str
group_id: Optional[int] = None
date: date
consumer = KafkaConsumer(
'client-pusher',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
while True:
msg_pack = consumer.poll(timeout_ms=500)
for tp, messages in msg_pack.items():
for message in messages:
client = Client(**message.value)
print(client)
消费者脚本在无限循环中侦听新消息。我可以在终端或 vscode 中 运行 消费者,它总是会打印出生产者的数据指令,但只有当我 运行 生产者脚本在 Visual Studio 代码中时.
如果我运行终端中的生产者脚本
python producer.py
消息没有传递给消费者。没有 运行 时间错误(生产者中的打印语句顺利通过)。我这辈子都看不出 IDE.
的环境有什么不同我有不同的虚拟环境管理这两个脚本。我已经尝试 运行 使用 venv 的完整路径连接生产者,直接从 vscode 的终端复制,例如
/home/me/whatever/dummy-producer/.venv/bin/python producer.py
我还打印了 sys.path
中的所有内容 – 它们在 IDE 和终端之间是相同的。
我还可以尝试找出 vscode 的执行与终端的执行之间的区别吗?如果重要的话,我正在使用 zsh
。
Kafka 客户端不会立即发送消息;如果您的批次大小小于默认值并且应用程序退出,则实际上是在删除事件。
如果想立即发送,producer中还需要一个方法
def push_client(self, data):
self.connection.send('client-pusher', value=data)
self.connection.flush()