带期货的 Dask 计算子图

Dask compute subgraph with futures

我想提交一个将执行以下操作的 dask 任务:

  1. 使用 dask.bag (def fakejob)
  2. 构建惰性 dask 图
  3. 从 1. 计算图形并将其保存到 parquet(省略这部分,只是一个动机)

我需要为多个输入执行此操作,所以我一直在尝试像这样使用 dask.distributed 的期货功能。

from dask.distributed import Client

client = Client(processes=True)

def fakejob(path):
    return (
        dask.bag
        .read_text(path)
        .to_dataframe()
    )

futures = client.map(fakejob, [input_path1, input_path2])

问题是我不断收到:AssertionError: daemonic processes are not allowed to have children

我试过 this link 并最终得到了第二个版本(与第一个版本有 1 行不同),但未来会永远保持 'pending'。

from dask.distributed import Client

client = Client(processes=True)

def fakejob(path):
    with dask.set_options(get=client.get):
        return (
            dask.bag
            .read_text(path)
            .to_dataframe()
        )

futures = client.map(fakejob, [input_path1, input_path2])

关于如何执行此操作的任何线索?

干杯。

奇怪且有点幽默的错误消息来自试图在工作进程中构建 dask 图(这是一个包),如果用 client.map 调用,这就是事情结束的地方。如果您可以将整个工作流程(包括写入镶木地板)放在函数中,并且不尝试将包返回给调用者,那么您的第二次尝试将与本地客户端一起工作。

解决方案更简单。

bags = [dask.bag.read_text(path)
        .to_dataframe() for path in [input_path1, input_path2])
futures = client.compute(bags)   # run in background on the cluster
client.gather(futures)   # wait and get results

这里,bags 是一个 dask-bags 列表,即已定义但尚未 运行 的工作任务。您可以将最后两行替换为 dask.compute(*bags) 以获得结果,而不必担心未来。