Python Kafka - 文件中的最大消息数
Python Kafka - max messages in file
有没有办法只获取一条消息,将其保存到文件中,然后终止脚本?
我想循环播放,以便将每条消息保存到一个单独的文件中。
从 cmd 级别,我有一个参数:
max-messages 1
但是在 python 中我还没有找到这样的东西。代码如下:
from kafka import KafkaConsumer
import sys
import datetime
now_day = datetime.datetime.now().strftime("%Y-%m-%d")
print(now_day)
directory = "D:/Kafka/test/files_" + now_day + "/earnix_topic_"
print(directory)
now_datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
print("Current date: " + now_datetime)
full_path = directory + now_datetime + ".txt"
print(full_path)
bootstrap_servers = ['prod-kafka-wrk01:0000','prod-kafka-wrk02:0001','prod-kafka-wrk03:0003']
# Define topic name from where the message will recieve
topicName = 'notify.products.client.topic'
consumer = KafkaConsumer(topicName, group_id ='groupABasdsadC',bootstrap_servers = bootstrap_servers)
# Read and print message from consumer
for msg in consumer:
file = open(full_path, "w+")
file.write(str(msg.value))
file.close()
sys.exit()
如果您使用:
,则您隐式地对消费者使用了 __iter__()
或 __getitem__()
方法:
for msg in consumer:
#do something...
您可以使用 built-in next()
函数,该函数也将使用 __iter__()
方法:
msg = next(consumer)
有关 python 可迭代类型的更多信息,请参阅:https://www.pythonlikeyoumeanit.com/Module2_EssentialsOfPython/Iterables.html
或者您可以忽略迭代器适配器并使用 poll(...)
方法,这将 return 一个较小的主题当前消息列表。参见 https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
如果你只想消耗一条记录并结束脚本,你需要在循环中缩进sys.exit()
有没有办法只获取一条消息,将其保存到文件中,然后终止脚本? 我想循环播放,以便将每条消息保存到一个单独的文件中。 从 cmd 级别,我有一个参数:
max-messages 1
但是在 python 中我还没有找到这样的东西。代码如下:
from kafka import KafkaConsumer
import sys
import datetime
now_day = datetime.datetime.now().strftime("%Y-%m-%d")
print(now_day)
directory = "D:/Kafka/test/files_" + now_day + "/earnix_topic_"
print(directory)
now_datetime = datetime.datetime.now().strftime("%Y-%m-%d_%H%M")
print("Current date: " + now_datetime)
full_path = directory + now_datetime + ".txt"
print(full_path)
bootstrap_servers = ['prod-kafka-wrk01:0000','prod-kafka-wrk02:0001','prod-kafka-wrk03:0003']
# Define topic name from where the message will recieve
topicName = 'notify.products.client.topic'
consumer = KafkaConsumer(topicName, group_id ='groupABasdsadC',bootstrap_servers = bootstrap_servers)
# Read and print message from consumer
for msg in consumer:
file = open(full_path, "w+")
file.write(str(msg.value))
file.close()
sys.exit()
如果您使用:
,则您隐式地对消费者使用了__iter__()
或 __getitem__()
方法:
for msg in consumer:
#do something...
您可以使用 built-in next()
函数,该函数也将使用 __iter__()
方法:
msg = next(consumer)
有关 python 可迭代类型的更多信息,请参阅:https://www.pythonlikeyoumeanit.com/Module2_EssentialsOfPython/Iterables.html
或者您可以忽略迭代器适配器并使用 poll(...)
方法,这将 return 一个较小的主题当前消息列表。参见 https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
如果你只想消耗一条记录并结束脚本,你需要在循环中缩进sys.exit()