调试 Faust 流处理 - 从主题开头重启应用程序

Debugging Faust Stream Processing - Restart App from Beginning of Topic

我正在调试一个简单的应用程序:

import faust

app = faust.App('app08')

# want to start from the beginning of the 
# topic every time the application restarts
@app.agent(topic) 
async def process(stream):
    async for event in stream:
        print(event)

并希望在重新启动此应用程序时让代理从最早的偏移量开始读取。现在,它很聪明,知道读取的最后一条消息的位置,并在重新启动时从该位置开始。尽管搜索了一段时间的文档,但我找不到如何执行此操作的示例。我知道如何做到这一点的唯一方法是更改​​应用程序名称,例如:app08app09.

请记住,偏移量是由 Kafka 服务器使用与您的 faust 应用程序同名的消费者组控制的,我一直在使用 kafaka-consumer-groups CLI(您的 kafka 安装的一部分)来这样做。

kafka-consumer-groups --bootstrap-server kafka_bootstrap --reset-offsets --to-earliest --group faust_appname --execute --all-topics

如果您有相对较新版本的 Kafka 运行.[=15,您还可以将 --to-earliest 替换为 --to-datetime 并以 2020-09-20T00:00:00.00 格式提供时间戳=]

如果您想自动执行此操作,我还有 Python API 可自动控制消费者组。