如何以编程方式检查 Kafka Broker 是否启动以及 运行 in Python

How to programmatically check if Kafka Broker is up and running in Python

我正在尝试使用来自 Kafka 主题的消息。我在 confluent_kafka 消费者周围使用包装器。我需要在开始使用消息之前检查连接是否建立。

我读到消费者很懒,所以我需要执行一些操作才能建立连接。但是我想在不执行 consumepoll 操作的情况下检查连接建立。

此外,我尝试提供一些错误的配置以查看民意调查的响应。我得到的回复是:

b'Broker: No more messages'

那么,如何判断是连接参数错误、连接断开还是主题中实际上没有消息?

恐怕没有直接的方法来测试 Kafka 代理是否启动 运行。另请注意,如果您的消费者已经使用了这些消息,这并不意味着这是一种不良行为,显然这并不表示 Kafka 代理已关闭。


一种可能的解决方法是执行某种快速操作并查看代理是否响应。一个例子是列出主题:

使用confluent-kafka-python and AdminClient

# Example using confuent_kafka
from confluent_kafka.admin import AdminClient

kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics

if not topics: 
    raise RuntimeError()

使用kafka-python and KafkaConsumer

# example using kafka-python
import kafka


consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()

if not topics: 
    raise RuntimeError()