如何以编程方式检查 Kafka Broker 是否启动以及 运行 in Python
How to programmatically check if Kafka Broker is up and running in Python
我正在尝试使用来自 Kafka 主题的消息。我在 confluent_kafka
消费者周围使用包装器。我需要在开始使用消息之前检查连接是否建立。
我读到消费者很懒,所以我需要执行一些操作才能建立连接。但是我想在不执行 consume
或 poll
操作的情况下检查连接建立。
此外,我尝试提供一些错误的配置以查看民意调查的响应。我得到的回复是:
b'Broker: No more messages'
那么,如何判断是连接参数错误、连接断开还是主题中实际上没有消息?
恐怕没有直接的方法来测试 Kafka 代理是否启动 运行。另请注意,如果您的消费者已经使用了这些消息,这并不意味着这是一种不良行为,显然这并不表示 Kafka 代理已关闭。
一种可能的解决方法是执行某种快速操作并查看代理是否响应。一个例子是列出主题:
使用confluent-kafka-python
and AdminClient
# Example using confuent_kafka
from confluent_kafka.admin import AdminClient
kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics
if not topics:
raise RuntimeError()
使用kafka-python
and KafkaConsumer
# example using kafka-python
import kafka
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()
if not topics:
raise RuntimeError()
我正在尝试使用来自 Kafka 主题的消息。我在 confluent_kafka
消费者周围使用包装器。我需要在开始使用消息之前检查连接是否建立。
我读到消费者很懒,所以我需要执行一些操作才能建立连接。但是我想在不执行 consume
或 poll
操作的情况下检查连接建立。
此外,我尝试提供一些错误的配置以查看民意调查的响应。我得到的回复是:
b'Broker: No more messages'
那么,如何判断是连接参数错误、连接断开还是主题中实际上没有消息?
恐怕没有直接的方法来测试 Kafka 代理是否启动 运行。另请注意,如果您的消费者已经使用了这些消息,这并不意味着这是一种不良行为,显然这并不表示 Kafka 代理已关闭。
一种可能的解决方法是执行某种快速操作并查看代理是否响应。一个例子是列出主题:
使用confluent-kafka-python
and AdminClient
# Example using confuent_kafka
from confluent_kafka.admin import AdminClient
kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics
if not topics:
raise RuntimeError()
使用kafka-python
and KafkaConsumer
# example using kafka-python
import kafka
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()
if not topics:
raise RuntimeError()