检查 Python 中是否存在 Kafka 主题

Check whether a Kafka topic exists in Python

我想创建一个 Kafka 主题(如果尚不存在)。我知道如何通过 bash 创建主题,但我不知道如何检查它是否存在。

topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--create',  
        '--zookeeper', '{}:2181'.format(KAFKAHOST),
        '--topic', str(self.topic), 
        '--partitions', str(self.partitions),
        '--replication-factor', str(self.replication_factor)])

可以对kafka-topics.sh使用--list (List all available topics)选项,查看topics数组中是否存在self.topic,如下图

根据您拥有的主题数量,此方法可能有点繁重。如果是这种情况,您可以使用 --describe (List details for the given topics) 来逃脱,如果主题不存在, 可能 return 为空。我还没有对此进行彻底测试,所以我不能确定这个解决方案 (--describe) 有多可靠,但您可能值得进一步调查一下。

wanted_topics = ['host_updates_queue', 'foo_bar']

topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--list',
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

for wanted in wanted_topics:
    if wanted in topics:
        print '\'{}\' topic exists!'.format(wanted)
    else:
        print '\'{}\' topic does NOT exist!'.format(wanted)

    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--describe',
        '--topic', wanted,
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

    if not topic_desc:
        print 'No description found for the topic \'{}\''.format(wanted)

输出:

root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'

还有一个 Broker Configuration 可用,因此您不必执行以下任何步骤:

auto.create.topics.enable | true | Enable auto creation of topic on the server. If this is set to true then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.

如果可能的话,我会采用这种方法。

请注意,您应该在代理上为 num.partitionsdefault.replication.factor 设置主题配置 (server.properties),以匹配您在代码段中的设置。

另一个不错的方法是使用 python kafka 模块:

kafka_client = kafka.KafkaClient(kafka_server_name)
server_topics = kafka_client.topic_partitions

if topic_name in server_topics:
   your code....

kafka_client.topic_partitions returns list of topics.

为此使用 kafka-python consumer api。

import kafka 
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=your_server_list) 
new_topics = set(wanted_topics)-set(consumer.topics())
for topic in new_topics:
    create(topic)

使用来自 kafka api 的 KafkaAdminClient。未记录但存在方法 list_topics