Python Kafka - 文件中的最大消息数

Python Kafka - max messages in file

有没有办法只获取一条消息,将其保存到文件中,然后终止脚本? 我想循环播放,以便将每条消息保存到一个单独的文件中。 从 cmd 级别,我有一个参数:

max-messages 1 

但是在 python 中我还没有找到这样的东西。代码如下:

from kafka import KafkaConsumer
import sys
import datetime

now_day = datetime.datetime.now().strftime("%Y-%m-%d")
print(now_day)

directory = "D:/Kafka/test/files_" + now_day + "/earnix_topic_"
print(directory)

now_datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
print("Current date: " + now_datetime)

full_path = directory + now_datetime + ".txt"
print(full_path)

bootstrap_servers = ['prod-kafka-wrk01:0000','prod-kafka-wrk02:0001','prod-kafka-wrk03:0003']

# Define topic name from where the message will recieve
topicName = 'notify.products.client.topic'

consumer = KafkaConsumer(topicName,  group_id ='groupABasdsadC',bootstrap_servers = bootstrap_servers)

# Read and print message from consumer
for msg in consumer:
   file = open(full_path, "w+")
   file.write(str(msg.value))
   file.close()

sys.exit()

如果您使用:

,则您隐式地对消费者使用了 __iter__()__getitem__() 方法:
for msg in consumer:
  #do something...

您可以使用 built-in next() 函数,该函数也将使用 __iter__() 方法:

msg = next(consumer)

有关 python 可迭代类型的更多信息,请参阅:https://www.pythonlikeyoumeanit.com/Module2_EssentialsOfPython/Iterables.html

或者您可以忽略迭代器适配器并使用 poll(...) 方法,这将 return 一个较小的主题当前消息列表。参见 https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

如果你只想消耗一条记录并结束脚本,你需要在循环中缩进sys.exit()