我可以使用 Lambda 函数中的 AWS MSK kafka 主题吗?
Can I consume an AWS MSK kafka topic from a Lambda function?
我认为使用 lambda 来消费到达 AWS MSK Kafka 集群中的主题的消息是微不足道的,但我无法从 AWS 文档中找到执行此操作的方法。有可能吗?
是的。您可以在 Lambda 代码中使用 Kafka 客户端库。在下面查找 Python 的示例:
#!/bin/env python
import json
import logging
import time
import os
from kafka import KafkaConsumer
"""
Read data from MSK and console it out.
Required environment variables:
MSK_BOOTSTRAP_SRV: MSK Bootstrap servers.
MSK_TOPIC_NAME: MSK topic.
"""
kafka_client = None
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def lambda_handler(event, context):
logger.debug('MSK consumer starts.')
try:
consumer = KafkaConsumer(os.environ['MSK_TOPIC_NAME'], auto_offset_reset='earliest',
bootstrap_servers=os.environ['MSK_BOOTSTRAP_SRV'], api_version=(0, 10), consumer_timeout_ms=1000)
for msg in consumer:
print(msg.value)
if consumer is not None:
consumer.close()
except Exception as ex:
logger.error('Exception: {}'.format(ex))
return
我认为使用 lambda 来消费到达 AWS MSK Kafka 集群中的主题的消息是微不足道的,但我无法从 AWS 文档中找到执行此操作的方法。有可能吗?
是的。您可以在 Lambda 代码中使用 Kafka 客户端库。在下面查找 Python 的示例:
#!/bin/env python
import json
import logging
import time
import os
from kafka import KafkaConsumer
"""
Read data from MSK and console it out.
Required environment variables:
MSK_BOOTSTRAP_SRV: MSK Bootstrap servers.
MSK_TOPIC_NAME: MSK topic.
"""
kafka_client = None
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def lambda_handler(event, context):
logger.debug('MSK consumer starts.')
try:
consumer = KafkaConsumer(os.environ['MSK_TOPIC_NAME'], auto_offset_reset='earliest',
bootstrap_servers=os.environ['MSK_BOOTSTRAP_SRV'], api_version=(0, 10), consumer_timeout_ms=1000)
for msg in consumer:
print(msg.value)
if consumer is not None:
consumer.close()
except Exception as ex:
logger.error('Exception: {}'.format(ex))
return