Python 脚本在 IDE 中是 运行,但不在终端(Kafka)中

Python script is running in the IDE, but not in the terminal (Kafka)

这可能与Kafka相关,也可能不相关,但我在学习Kafka时遇到过这个问题。我有一个 python 制作人脚本,如下所示:

from kafka import KafkaProducer
from json import dumps


class Producer:

    def __init__(self):
        self.connection = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )

    def push_client(self, data):
        self.connection.send('client-pusher', value=data)


data = {
    "first_name": "Davey",
    "email": "davey@dave.com",
    "group_id": 3,
    "date": "2021-12-12"
}

producer = Producer()
producer.push_client(data)

我 运行 在 Docker 中连接 Kafka Broker,消息在另一端被这个脚本消耗:

import json
from datetime import date
from typing import Optional

from kafka import KafkaConsumer
from pydantic import BaseModel


class Client(BaseModel):
    first_name: str
    email: str
    group_id: Optional[int] = None
    date: date


consumer = KafkaConsumer(
    'client-pusher',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group-id',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

while True:
    msg_pack = consumer.poll(timeout_ms=500)

    for tp, messages in msg_pack.items():
        for message in messages:
            client = Client(**message.value)
            print(client)

消费者脚本在无限循环中侦听新消息。我可以在终端或 vscode 中 运行 消费者,它总是会打印出生产者的数据指令,但只有当我 运行 生产者脚本在 Visual Studio 代码中时.

如果我运行终端中的生产者脚本

python producer.py

消息没有传递给消费者。没有 运行 时间错误(生产者中的打印语句顺利通过)。我这辈子都看不出 IDE.

的环境有什么不同

我有不同的虚拟环境管理这两个脚本。我已经尝试 运行 使用 venv 的完整路径连接生产者,直接从 vscode 的终端复制,例如

/home/me/whatever/dummy-producer/.venv/bin/python producer.py

我还打印了 sys.path 中的所有内容 – 它们在 IDE 和终端之间是相同的。

我还可以尝试找出 vscode 的执行与终端的执行之间的区别吗?如果重要的话,我正在使用 zsh

Kafka 客户端不会立即发送消息;如果您的批次大小小于默认值并且应用程序退出,则实际上是在删除事件。

如果想立即发送,producer中还需要一个方法

def push_client(self, data):
    self.connection.send('client-pusher', value=data)
    self.connection.flush()