在使用结果的同时将 dask 数组提交给分布式客户端

Submit dask arrays to distributed client while using results at the same time

我有代表视频帧的 dask 数组,我想创建多个视频文件。我正在使用 imageio 库,它允许我将帧 "append" 发送到 ffmpeg 子进程。所以我可能有这样的东西:

my_frames = [[arr1f1, arr1f2, arr1f3], [arr2f1, arr2f2, arr2f3], ...]

所以每个内部列表代表一个视频(或产品)的帧。我正在寻找计算 send/submit 帧的最佳方法,同时也在完成时(按顺序)将帧写入 imageio 。更复杂的是,上面的内部列表实际上是生成器,可以是 100 或 1000 帧。还要记住,由于 imageio 的工作方式,我认为它需要存在于一个进程中。这是我目前所做工作的简化版本:

for frame_arrays in frames_to_write:
    # 'frame_arrays' is [arr1f1, arr2f1, arr3f1, ...]
    future_list = _client.compute(frame_arrays)
    # key -> future
    future_dict = dict(zip(frame_keys, future_list))

    # write the current frame
    # future -> key
    rev_future_dict = {v: k for k, v in future_dict.items()}
    result_iter = as_completed(future_dict.values(), with_results=True)
    for future, result in result_iter:
        frame_key = rev_future_dict[future]
        # get the writer for this specific video and add a new frame
        w = writers[frame_key]
        w.append_data(result)

这行得通,我的实际代码是根据上面的代码重新组织的,以便在编写当前帧的同时提交下一帧,所以我认为有一些好处。我正在考虑用户说 "I want to process X frames at a time" 的解决方案,所以我发送 50 帧,写入 50 帧,再发送 50 帧,写入 50 帧,等等

经过一段时间的研究后我的问题:

  1. result 的数据何时保存在本地内存中?什么时候被迭代器返回或者什么时候完成?
  2. 是否可以使用 dask-core 线程调度程序做这样的事情,这样用户就不必安装分布式?
  3. 是否可以根据 worker 的数量调整发送多少帧?
  4. 有没有办法发送 dask 数组的字典 and/or 使用 as_completed 并包含 "frame_key"?
  5. 如果我加载整个系列的帧并将它们提交给 client/cluster 我可能会终止调度程序,对吗?
  6. 使用 get_client() 然后在 ValueError 上使用 Client() 是否是获取客户端的首选方式(如果用户未提供)?
  7. 是否可以提供 dask/distributed 一个或多个迭代器,以便在工作人员可用时从中提取?
  8. 我是不是傻了?太复杂了?

注意:这是我刚才对 this issue 的扩展,但略有不同。

在看了很多例子之后here我得到了以下结果:

    try:
        # python 3
        from queue import Queue
    except ImportError:
        # python 2
        from Queue import Queue
    from threading import Thread

    def load_data(frame_gen, q):
        for frame_arrays in frame_gen:
            future_list = client.compute(frame_arrays)
            for frame_key, arr_future in zip(frame_keys, future_list):
                q.put({frame_key: arr_future})
        q.put(None)

    input_q = Queue(batch_size if batch_size is not None else 1)
    load_thread = Thread(target=load_data, args=(frames_to_write, input_q,))
    remote_q = client.gather(input_q)
    load_thread.start()

    while True:
        future_dict = remote_q.get()
        if future_dict is None:
            break

        # write the current frame
        # this should only be one element in the dictionary, but this is
        # also the easiest way to get access to the data
        for frame_key, result in future_dict.items():
            # frame_key = rev_future_dict[future]
            w = writers[frame_key]
            w.append_data(result)
        input_q.task_done()

    load_thread.join()

这回答了我遇到的大部分问题,并且总体上似乎按照我想要的方式工作。