Kafka 根据请求消费单条消息

Kafka consume single message on request

我想用 gunicorn 制作一个烧瓶 application/API,每个请求都可以-

  1. 从 Kafka 主题读取单个值
  2. 做一些处理
  3. 和 returns 处理后的值给用户(或任何调用 API 的应用程序)。

所以,到目前为止我找不到它的任何例子。那么,下面的函数是正确的做法吗?

consumer = KafkaConsumer(
        "first_topic",
        bootstrap_servers='xxxxxx',
        auto_offset_reset='xxxx',
        group_id="my_group")

def get_value_from_topic:
    for msg in consumer:
        return msg

if __name__ == "__main__":
    print(get_value_from_topic())

或者有没有更好的方法使用像 Faust 这样的库来做到这一点? 我使用 Kafka 的原因是为了避免烧瓶工作人员之间同步的所有麻烦(在传统数据库的情况下),因为我只想使用一次来自 Kafka 的值。

乍一看这似乎没问题。您的消费者迭代器迭代一次,您 return 该值。

然而,更惯用的方法是这样

def get_value_from_topic():
    return next(consumer)

但是,对于您的其他设置,不能保证这只会轮询一条消息,因为 Kafka 消费者会分批轮询,并且会自动提交这些批次的偏移量。因此,您需要禁用自动提交并自行处理它,例如,在处理 http 请求之后提交将给您至少一次交付,而在此之前提交将给您最多一次。由于您正在与 HTTP 服务器进行交互,因此 Kafka 无法为您提供恰好一次处理