运行 同步Faust代理
Running Faust Agent Synchronously
检查下面的代码
@app.agent()
async def process(stream):
async for value in stream.take(5000, within=5):
process(value)
代理在 5 秒内异步获取 5000 条记录并进行处理。我不希望代理在前一个记录处理完成之前再选择 500 万条记录。基本上我想运行代理同步。有什么办法可以做到吗?
我认为您可以在代理上将 concurrency 设置为 1,这样可以有效地使其同步。
如果您这样做,您可能还会发现修改 topic partitions 很有用,但我对这两个设置之间的关系没有完全理解(只是想指出一个可能有用的途径) .
我尝试使用以下代码查看工作人员是否正在执行第二批记录,而第一批处理尚未完成
@app.agent()
async def process(stream):
async for value in stream.take(5000, within=5):
print(1)
await async.sleep(30)
工人打印了1
,等了30秒才打印2
。 await 语句将控制权交还给事件循环,但在本例中它等待,这意味着批处理一个接一个地执行。因此这是同步的。
PS。提交偏移量、重新平衡、监控等是异步操作,由事件循环处理。
检查下面的代码
@app.agent()
async def process(stream):
async for value in stream.take(5000, within=5):
process(value)
代理在 5 秒内异步获取 5000 条记录并进行处理。我不希望代理在前一个记录处理完成之前再选择 500 万条记录。基本上我想运行代理同步。有什么办法可以做到吗?
我认为您可以在代理上将 concurrency 设置为 1,这样可以有效地使其同步。
如果您这样做,您可能还会发现修改 topic partitions 很有用,但我对这两个设置之间的关系没有完全理解(只是想指出一个可能有用的途径) .
我尝试使用以下代码查看工作人员是否正在执行第二批记录,而第一批处理尚未完成
@app.agent()
async def process(stream):
async for value in stream.take(5000, within=5):
print(1)
await async.sleep(30)
工人打印了1
,等了30秒才打印2
。 await 语句将控制权交还给事件循环,但在本例中它等待,这意味着批处理一个接一个地执行。因此这是同步的。
PS。提交偏移量、重新平衡、监控等是异步操作,由事件循环处理。