如何使用 Apache Kafka 中的数据
How to consume data from a Apache Kafka
参加挑战赛,内容如下:您的第一步 - 使用来自 Apache Kafka 的数据样本。
所以他们给了我主题名称、API_KEY和API_SECRET。哦,还有 bootstrap 服务器 。
然后他们声称好像你不熟悉 Kafka,Confluent 提供了全面的文档。好吧,登录到 confluent,创建一个集群,然后.. 消费数据的下一步是什么?
这是将来自 Kafka 的消息放入 Python 列表中的基本模式。
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'someTopicName',
bootstrap_servers=['192.168.1.160:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
print("We have a consumer instantiated")
print(consumer)
messageCache = []
for message in consumer:
messageCache.append(message.value)
在这种情况下,我的 Kafka 代理在我的私人局域网上,使用默认端口,所以我的 bootstrap 服务器列表只是 ["192.168.1.160:9092"].
您可以使用标准计数器和 if 语句将列表保存到文件或其他任何内容,因为假设 Kafka 流永远 运行。例如,我有一个进程使用 Kafka 消息并将它们作为镶木地板中的数据帧保存到每 1,000,000 条消息的 HDFS 中。在这种情况下,我想保存历史消息以开发 ML 模型。 Kafka 的伟大之处在于我可以编写另一个进程来实时评估并可能响应每条消息。
参加挑战赛,内容如下:您的第一步 - 使用来自 Apache Kafka 的数据样本。 所以他们给了我主题名称、API_KEY和API_SECRET。哦,还有 bootstrap 服务器 。 然后他们声称好像你不熟悉 Kafka,Confluent 提供了全面的文档。好吧,登录到 confluent,创建一个集群,然后.. 消费数据的下一步是什么?
这是将来自 Kafka 的消息放入 Python 列表中的基本模式。
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'someTopicName',
bootstrap_servers=['192.168.1.160:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
print("We have a consumer instantiated")
print(consumer)
messageCache = []
for message in consumer:
messageCache.append(message.value)
在这种情况下,我的 Kafka 代理在我的私人局域网上,使用默认端口,所以我的 bootstrap 服务器列表只是 ["192.168.1.160:9092"].
您可以使用标准计数器和 if 语句将列表保存到文件或其他任何内容,因为假设 Kafka 流永远 运行。例如,我有一个进程使用 Kafka 消息并将它们作为镶木地板中的数据帧保存到每 1,000,000 条消息的 HDFS 中。在这种情况下,我想保存历史消息以开发 ML 模型。 Kafka 的伟大之处在于我可以编写另一个进程来实时评估并可能响应每条消息。