调试 Faust 流处理 - 从主题开头重启应用程序
Debugging Faust Stream Processing - Restart App from Beginning of Topic
我正在调试一个简单的应用程序:
import faust
app = faust.App('app08')
# want to start from the beginning of the
# topic every time the application restarts
@app.agent(topic)
async def process(stream):
async for event in stream:
print(event)
并希望在重新启动此应用程序时让代理从最早的偏移量开始读取。现在,它很聪明,知道读取的最后一条消息的位置,并在重新启动时从该位置开始。尽管搜索了一段时间的文档,但我找不到如何执行此操作的示例。我知道如何做到这一点的唯一方法是更改应用程序名称,例如:app08
到 app09
.
请记住,偏移量是由 Kafka 服务器使用与您的 faust 应用程序同名的消费者组控制的,我一直在使用 kafaka-consumer-groups
CLI(您的 kafka 安装的一部分)来这样做。
kafka-consumer-groups --bootstrap-server kafka_bootstrap --reset-offsets --to-earliest --group faust_appname --execute --all-topics
如果您有相对较新版本的 Kafka 运行.[=15,您还可以将 --to-earliest
替换为 --to-datetime
并以 2020-09-20T00:00:00.00
格式提供时间戳=]
如果您想自动执行此操作,我还有 Python API 可自动控制消费者组。
我正在调试一个简单的应用程序:
import faust
app = faust.App('app08')
# want to start from the beginning of the
# topic every time the application restarts
@app.agent(topic)
async def process(stream):
async for event in stream:
print(event)
并希望在重新启动此应用程序时让代理从最早的偏移量开始读取。现在,它很聪明,知道读取的最后一条消息的位置,并在重新启动时从该位置开始。尽管搜索了一段时间的文档,但我找不到如何执行此操作的示例。我知道如何做到这一点的唯一方法是更改应用程序名称,例如:app08
到 app09
.
请记住,偏移量是由 Kafka 服务器使用与您的 faust 应用程序同名的消费者组控制的,我一直在使用 kafaka-consumer-groups
CLI(您的 kafka 安装的一部分)来这样做。
kafka-consumer-groups --bootstrap-server kafka_bootstrap --reset-offsets --to-earliest --group faust_appname --execute --all-topics
如果您有相对较新版本的 Kafka 运行.[=15,您还可以将 --to-earliest
替换为 --to-datetime
并以 2020-09-20T00:00:00.00
格式提供时间戳=]
如果您想自动执行此操作,我还有 Python API 可自动控制消费者组。