Kafka 根据请求消费单条消息
Kafka consume single message on request
我想用 gunicorn 制作一个烧瓶 application/API,每个请求都可以-
- 从 Kafka 主题读取单个值
- 做一些处理
- 和 returns 处理后的值给用户(或任何调用 API 的应用程序)。
所以,到目前为止我找不到它的任何例子。那么,下面的函数是正确的做法吗?
consumer = KafkaConsumer(
"first_topic",
bootstrap_servers='xxxxxx',
auto_offset_reset='xxxx',
group_id="my_group")
def get_value_from_topic:
for msg in consumer:
return msg
if __name__ == "__main__":
print(get_value_from_topic())
或者有没有更好的方法使用像 Faust 这样的库来做到这一点?
我使用 Kafka 的原因是为了避免烧瓶工作人员之间同步的所有麻烦(在传统数据库的情况下),因为我只想使用一次来自 Kafka 的值。
乍一看这似乎没问题。您的消费者迭代器迭代一次,您 return 该值。
然而,更惯用的方法是这样
def get_value_from_topic():
return next(consumer)
但是,对于您的其他设置,不能保证这只会轮询一条消息,因为 Kafka 消费者会分批轮询,并且会自动提交这些批次的偏移量。因此,您需要禁用自动提交并自行处理它,例如,在处理 http 请求之后提交将给您至少一次交付,而在此之前提交将给您最多一次。由于您正在与 HTTP 服务器进行交互,因此 Kafka 无法为您提供恰好一次处理
我想用 gunicorn 制作一个烧瓶 application/API,每个请求都可以-
- 从 Kafka 主题读取单个值
- 做一些处理
- 和 returns 处理后的值给用户(或任何调用 API 的应用程序)。
所以,到目前为止我找不到它的任何例子。那么,下面的函数是正确的做法吗?
consumer = KafkaConsumer(
"first_topic",
bootstrap_servers='xxxxxx',
auto_offset_reset='xxxx',
group_id="my_group")
def get_value_from_topic:
for msg in consumer:
return msg
if __name__ == "__main__":
print(get_value_from_topic())
或者有没有更好的方法使用像 Faust 这样的库来做到这一点? 我使用 Kafka 的原因是为了避免烧瓶工作人员之间同步的所有麻烦(在传统数据库的情况下),因为我只想使用一次来自 Kafka 的值。
乍一看这似乎没问题。您的消费者迭代器迭代一次,您 return 该值。
然而,更惯用的方法是这样
def get_value_from_topic():
return next(consumer)
但是,对于您的其他设置,不能保证这只会轮询一条消息,因为 Kafka 消费者会分批轮询,并且会自动提交这些批次的偏移量。因此,您需要禁用自动提交并自行处理它,例如,在处理 http 请求之后提交将给您至少一次交付,而在此之前提交将给您最多一次。由于您正在与 HTTP 服务器进行交互,因此 Kafka 无法为您提供恰好一次处理