使用 FastAPI 和 aiokafka 使用 Kafka 消息
Consume Kafka messages with FastAPI and aiokafka
我想创建一个 API 以使用 FastAPI 使用来自 Kafka 主题的消息。
我正在尝试使用以下代码创建入口点:
import asyncio
import json
import logging

from aiokafka import AIOKafkaConsumer
from fastapi import APIRouter, HTTPException

from app.config import (
    CONSUMER_TIMEOUT_MS,
    KAFKA_BOOTSTRAP_SERVER,
    MAX_RECORDS_PER_CONSUMER,
    TOPIC_INGESTED_REQUEST,
)
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
# Two names for the same router: ``requests_router`` for app wiring,
# ``r`` as a short alias for the route decorators below.
requests_router = r = APIRouter()
# NOTE(review): calling get_event_loop() at import time is deprecated in
# recent Python, and recent aiokafka no longer takes a ``loop`` argument —
# confirm the pinned versions before relying on this.
loop = asyncio.get_event_loop()
@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    """Consume pending messages from TOPIC_INGESTED_REQUEST for *group_id*.

    Returns a list of ``{"key": ..., "value": ...}`` dicts, one per record.
    Raises ``HTTPException(500)`` if consuming fails for any reason.
    """
    # define consumer
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # Consumer must be in a group to commit
        enable_auto_commit=True,  # Is True by default anyway
        auto_commit_interval_ms=1000,  # Autocommit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
    )
    # start consumer
    await consumer.start()
    retrieved_requests = []
    try:
        # BUG FIX 1: getmany() defaults to timeout_ms=0, so it returns
        # immediately with whatever happens to be prefetched — usually
        # nothing, which is why the endpoint always returned []. Give the
        # consumer time to join the group and fetch from the broker.
        data = await consumer.getmany(timeout_ms=5000)
        # ``data`` maps TopicPartition -> list of ConsumerRecord.
        for tp, messages in data.items():
            for message in messages:
                # BUG FIX 2: the original read ``messages.key`` (the list)
                # instead of ``message.key`` (the record) — AttributeError
                # on the first non-empty batch. Kafka keys are optional,
                # so also guard against None.
                retrieved_requests.append({
                    "key": message.key.decode("utf-8") if message.key else None,
                    "value": json.loads(message.value.decode("utf-8")),
                })
    except Exception as e:
        logger.error(f"Error when trying to consume request for {group_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always release group membership and sockets, even on failure.
        await consumer.stop()
    return retrieved_requests
我总是以空列表结尾。我不明白为什么我无法从配置的主题中获取消息。
我还尝试了 aiokafka 文档中的“经典”异步 for 循环,但同样没有达到预期效果。不过,在经典循环中我可以看到消费者被创建并消费了一些消息;而使用 getmany 时结果总是为空。
谢谢!
我终于找到了使用 aiokafka 消费者的 getmany 方法的解决方案。
想法是尝试获取消费者的消息并在没有(更多)消息时设置超时。我还设置了要检索的最大消息数,以防消息过多,但它应该是可选的。
代码如下:
# Router exposed as ``requests_router`` for app wiring; ``r`` is a short
# alias used by the route decorator below.
requests_router = r = APIRouter()
# NOTE(review): get_event_loop() at import time is deprecated in recent
# Python; recent aiokafka also dropped the ``loop`` parameter — verify
# against the pinned dependency versions.
loop = asyncio.get_event_loop()
def kafka_json_deserializer(serialized):
    """Decode a raw Kafka payload (bytes or str) from JSON into Python objects."""
    payload = json.loads(serialized)
    return payload
@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    """
    Consume a list of 'Requests' from 'TOPIC_INGESTED_REQUEST'.

    A bounded ``getmany`` call (timeout + max records) is used so the
    endpoint returns promptly when the topic is idle instead of blocking.
    Returns a list of ``{"key": ..., "value": ...}`` dicts; raises
    ``HTTPException(500)`` on any consumer failure.
    """
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # unique identifier for each sidecar
        enable_auto_commit=True,
        auto_commit_interval_ms=1000,  # commit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
        value_deserializer=kafka_json_deserializer,
    )
    logger.info(
        f"Start consumer with group ID: '{group_id}' on topic '{TOPIC_INGESTED_REQUEST}'."
    )
    await consumer.start()
    logger.info("Consumer started.")
    retrieved_requests = []
    try:
        result = await consumer.getmany(
            timeout_ms=CONSUMER_TIMEOUT_MS, max_records=MAX_RECORDS_PER_CONSUMER
        )
        # BUG FIX: ``result`` maps TopicPartition -> list of records, so
        # ``len(result)`` counted partitions, not messages. Count records.
        message_count = sum(len(msgs) for msgs in result.values())
        logger.info(f"Get {message_count} messages in {TOPIC_INGESTED_REQUEST}.")
        for tp, messages in result.items():
            for message in messages:
                retrieved_requests.append(
                    {
                        # Kafka keys are optional; guard against None keys.
                        "key": message.key.decode("utf-8") if message.key else None,
                        # Value already deserialized by kafka_json_deserializer.
                        "value": message.value,
                    }
                )
        # NOTE: with enable_auto_commit=False you would commit manually:
        # await consumer.commit({tp: messages[-1].offset + 1})
    except Exception as e:
        logger.error(
            f"Error when trying to consume request for {group_id} on topic {TOPIC_INGESTED_REQUEST}: {str(e)}"
        )
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always leave the group and release sockets, even on failure.
        await consumer.stop()
    return retrieved_requests
我想创建一个 API 以使用 FastAPI 使用来自 Kafka 主题的消息。
我正在尝试使用以下代码创建入口点:
import asyncio
import logging
import json
from aiokafka import AIOKafkaConsumer
from fastapi import APIRouter, HTTPException
from app.config import (
TOPIC_INGESTED_REQUEST,
KAFKA_BOOTSTRAP_SERVER,
)
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
# Two names for the same router: ``requests_router`` for app wiring,
# ``r`` as a short alias for the route decorators below.
requests_router = r = APIRouter()
# NOTE(review): get_event_loop() at import time is deprecated in recent
# Python; recent aiokafka no longer takes a ``loop`` argument — confirm
# the pinned versions before relying on this.
loop = asyncio.get_event_loop()
@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    """Consume pending messages from TOPIC_INGESTED_REQUEST for *group_id*.

    Returns a list of ``{"key": ..., "value": ...}`` dicts, one per record.
    Raises ``HTTPException(500)`` if consuming fails for any reason.
    """
    # define consumer
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # Consumer must be in a group to commit
        enable_auto_commit=True,  # Is True by default anyway
        auto_commit_interval_ms=1000,  # Autocommit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
    )
    # start consumer
    await consumer.start()
    retrieved_requests = []
    try:
        # BUG FIX 1: getmany() defaults to timeout_ms=0, so it returns
        # immediately with whatever happens to be prefetched — usually
        # nothing, which is why the endpoint always returned []. Give the
        # consumer time to join the group and fetch from the broker.
        data = await consumer.getmany(timeout_ms=5000)
        # ``data`` maps TopicPartition -> list of ConsumerRecord.
        for tp, messages in data.items():
            for message in messages:
                # BUG FIX 2: the original read ``messages.key`` (the list)
                # instead of ``message.key`` (the record) — AttributeError
                # on the first non-empty batch. Kafka keys are optional,
                # so also guard against None.
                retrieved_requests.append({
                    "key": message.key.decode("utf-8") if message.key else None,
                    "value": json.loads(message.value.decode("utf-8")),
                })
    except Exception as e:
        logger.error(f"Error when trying to consume request for {group_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always release group membership and sockets, even on failure.
        await consumer.stop()
    return retrieved_requests
我总是以空列表结尾。我不明白为什么我无法从配置的主题中获取消息。
我还尝试了 aiokafka 文档中的“经典”异步 for 循环,但同样没有达到预期效果。不过,在经典循环中我可以看到消费者被创建并消费了一些消息;而使用 getmany 时结果总是为空。
谢谢!
我终于找到了使用 aiokafka 消费者的 getmany 方法的解决方案。
想法是尝试获取消费者的消息并在没有(更多)消息时设置超时。我还设置了要检索的最大消息数,以防消息过多,但它应该是可选的。
代码如下:
# Router exposed as ``requests_router`` for app wiring; ``r`` is a short
# alias used by the route decorator below.
requests_router = r = APIRouter()
# NOTE(review): get_event_loop() at import time is deprecated in recent
# Python; recent aiokafka also dropped the ``loop`` parameter — verify
# against the pinned dependency versions.
loop = asyncio.get_event_loop()
def kafka_json_deserializer(serialized):
    """Decode a raw Kafka payload (bytes or str) from JSON into Python objects."""
    payload = json.loads(serialized)
    return payload
@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    """
    Consume a list of 'Requests' from 'TOPIC_INGESTED_REQUEST'.

    A bounded ``getmany`` call (timeout + max records) is used so the
    endpoint returns promptly when the topic is idle instead of blocking.
    Returns a list of ``{"key": ..., "value": ...}`` dicts; raises
    ``HTTPException(500)`` on any consumer failure.
    """
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # unique identifier for each sidecar
        enable_auto_commit=True,
        auto_commit_interval_ms=1000,  # commit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
        value_deserializer=kafka_json_deserializer,
    )
    logger.info(
        f"Start consumer with group ID: '{group_id}' on topic '{TOPIC_INGESTED_REQUEST}'."
    )
    await consumer.start()
    logger.info("Consumer started.")
    retrieved_requests = []
    try:
        result = await consumer.getmany(
            timeout_ms=CONSUMER_TIMEOUT_MS, max_records=MAX_RECORDS_PER_CONSUMER
        )
        # BUG FIX: ``result`` maps TopicPartition -> list of records, so
        # ``len(result)`` counted partitions, not messages. Count records.
        message_count = sum(len(msgs) for msgs in result.values())
        logger.info(f"Get {message_count} messages in {TOPIC_INGESTED_REQUEST}.")
        for tp, messages in result.items():
            for message in messages:
                retrieved_requests.append(
                    {
                        # Kafka keys are optional; guard against None keys.
                        "key": message.key.decode("utf-8") if message.key else None,
                        # Value already deserialized by kafka_json_deserializer.
                        "value": message.value,
                    }
                )
        # NOTE: with enable_auto_commit=False you would commit manually:
        # await consumer.commit({tp: messages[-1].offset + 1})
    except Exception as e:
        logger.error(
            f"Error when trying to consume request for {group_id} on topic {TOPIC_INGESTED_REQUEST}: {str(e)}"
        )
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always leave the group and release sockets, even on failure.
        await consumer.stop()
    return retrieved_requests