PyKafka 元数据以字节而不是字符串为单位

PyKafka metadata in bytes instead of strings

我发现 PyKafka 有异常行为,这是我最近才开始使用的客户端。

错误如下:

Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}

错误的来源在这一行:

    self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
consumer = self.client.topics[bytes(self.input_topic,"UTF-8")].get_balanced_consumer(
        consumer_group=bytes(self.consumer_group,"UTF-8"),
        auto_commit_enable=True
    )

调试我看到客户端使用正确的字符串 IP 连接到种子代理但是当检索代理列表时,它们的 IP 是二进制的,当 PyKafka 尝试再次连接以创建消费者时,这些 IP显然不行。

另一个可能相关的问题是我需要自己将主题名称和消费者组名称转换为字节(与其他客户端一样),但文档中的所有示例都显示了字符串的用法。

卡夫卡经纪人版本:0.10.1.0 PyKafka 版本:2.7.0

好吧,我完全被误导了:那不是 IP,而是 base64 中的主机名(由 Docker 生成)。

检查您的经纪人的 advertised.listeners 配置 - 它定义了在 pykafka 的 Cluster 初始化期间将发送到 ZooKeeper 并继续发送到 pykafka 客户端的主机名。 Docker 可能会破坏此信息,因此您需要使用 advertised.listeners 覆盖它。来自 documentation:

Listeners to publish to ZooKeeper for clients to use, if different than the listeners config property. In IaaS environments, this may need to be different from the interface to which the broker binds.

至于bytes/string问题,pykafka的latest development release接受字符串字节作为主题和消费者组名称,以方便程序员。对于旧版本,您需要使用如下技术将字符串参数转换为字节:

topic_name = str_topic_name.encode('ascii')