运行 同步Faust代理

Running Faust Agent Synchronously

检查下面的代码

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        process(value)

代理在 5 秒内异步获取 5000 条记录并进行处理。我不希望代理在前一个记录处理完成之前再选择 500 万条记录。基本上我想运行代理同步。有什么办法可以做到吗?

我认为您可以在代理上将 concurrency 设置为 1,这样可以有效地使其同步。

如果您这样做,您可能还会发现修改 topic partitions 很有用,但我对这两个设置之间的关系没有完全理解(只是想指出一个可能有用的途径) .

我尝试使用以下代码查看工作人员是否正在执行第二批记录,而第一批处理尚未完成

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        print(1)
        await async.sleep(30)

工人打印了1,等了30秒才打印2。 await 语句将控制权交还给事件循环,但在本例中它等待,这意味着批处理一个接一个地执行。因此这是同步的。

PS。提交偏移量、重新平衡、监控等是异步操作,由事件循环处理。