Streamz/Dask:gather不等待buffer的所有结果

Streamz/Dask: gather does not wait for all results of buffer

进口:

from dask.distributed import Client
import streamz
import time

模拟工作量:

def increment(x):
    time.sleep(0.5)
    return x + 1

假设我想在本地 Dask 客户端上处理一些工作负载:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).gather().sink(print)

        for i in range(10):
            ps.emit(i)

这按预期工作,但是sink(print)当然会强制等待每个结果,因此流将不会并行执行。

但是,如果我使用 buffer() 允许缓存结果,那么 gather() 似乎不再正确收集所有结果并且解释器在获取结果之前退出。这种做法:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)
                                     # ^
        for i in range(10):          # - allow parallel execution 
            ps.emit(i)               # - before gather()

...不为我打印任何结果。 Python 解释器在启动脚本后不久就退出,before buffer() 发出结果,因此 nothing 被打印出来。

但是,如果主进程被迫等待一段时间,结果将以并行方式打印(因此它们不会相互等待,而是几乎同时打印):

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)

        for i in range(10):
            ps.emit(i)

        time.sleep(10)  # <- force main process to wait while ps is working

这是为什么?我认为 gather() 应该等待一批 10 个结果,因为 buffer() 应该在将它们刷新到 gather() 之前并行缓存恰好 10 个结果。为什么 gather() 在这种情况下不阻塞?

是否有其他好的方法检查 Stream 是否仍包含正在处理的元素以防止主进程过早退出?

  1. "Why is that?":因为 Dask 分布式调度程序(执行流映射器和接收器功能)和您的 python 脚本 运行 在不同的进程中。当 "with" 块上下文结束时,您的 Dask 客户端关闭并且执行在发送到流的项目能够到达接收器功能之前关闭。

  2. "Is there a nice way to otherwise check if a Stream still contains elements being processed":我不知道。 但是:如果您想要的行为是(我只是在这里猜测)并行处理一堆项目,那么 Streamz 不是您应该使用的,vanilla Dask 就足够了。