带期货的 Dask 计算子图
Dask compute subgraph with futures
我想提交一个将执行以下操作的 dask 任务:
- 使用 dask.bag (
def fakejob
) 构建惰性 dask 图
- 从 1. 计算图形并将其保存到 parquet(省略这部分,只是一个动机)
我需要为多个输入执行此操作,所以我一直在尝试像这样使用 dask.distributed 的期货功能。
from dask.distributed import Client
client = Client(processes=True)
def fakejob(path):
return (
dask.bag
.read_text(path)
.to_dataframe()
)
futures = client.map(fakejob, [input_path1, input_path2])
问题是我不断收到:AssertionError: daemonic processes are not allowed to have children
我试过 this link 并最终得到了第二个版本(与第一个版本有 1 行不同),但未来会永远保持 'pending'。
from dask.distributed import Client
client = Client(processes=True)
def fakejob(path):
with dask.set_options(get=client.get):
return (
dask.bag
.read_text(path)
.to_dataframe()
)
futures = client.map(fakejob, [input_path1, input_path2])
关于如何执行此操作的任何线索?
干杯。
奇怪且有点幽默的错误消息来自试图在工作进程中构建 dask 图(这是一个包),如果用 client.map 调用,这就是事情结束的地方。如果您可以将整个工作流程(包括写入镶木地板)放在函数中,并且不尝试将包返回给调用者,那么您的第二次尝试将与本地客户端一起工作。
解决方案更简单。
bags = [dask.bag.read_text(path)
.to_dataframe() for path in [input_path1, input_path2])
futures = client.compute(bags) # run in background on the cluster
client.gather(futures) # wait and get results
这里,bags
是一个 dask-bags 列表,即已定义但尚未 运行 的工作任务。您可以将最后两行替换为 dask.compute(*bags)
以获得结果,而不必担心未来。
我想提交一个将执行以下操作的 dask 任务:
- 使用 dask.bag (
def fakejob
) 构建惰性 dask 图
- 从 1. 计算图形并将其保存到 parquet(省略这部分,只是一个动机)
我需要为多个输入执行此操作,所以我一直在尝试像这样使用 dask.distributed 的期货功能。
from dask.distributed import Client
client = Client(processes=True)
def fakejob(path):
return (
dask.bag
.read_text(path)
.to_dataframe()
)
futures = client.map(fakejob, [input_path1, input_path2])
问题是我不断收到:AssertionError: daemonic processes are not allowed to have children
我试过 this link 并最终得到了第二个版本(与第一个版本有 1 行不同),但未来会永远保持 'pending'。
from dask.distributed import Client
client = Client(processes=True)
def fakejob(path):
with dask.set_options(get=client.get):
return (
dask.bag
.read_text(path)
.to_dataframe()
)
futures = client.map(fakejob, [input_path1, input_path2])
关于如何执行此操作的任何线索?
干杯。
奇怪且有点幽默的错误消息来自试图在工作进程中构建 dask 图(这是一个包),如果用 client.map 调用,这就是事情结束的地方。如果您可以将整个工作流程(包括写入镶木地板)放在函数中,并且不尝试将包返回给调用者,那么您的第二次尝试将与本地客户端一起工作。
解决方案更简单。
bags = [dask.bag.read_text(path)
.to_dataframe() for path in [input_path1, input_path2])
futures = client.compute(bags) # run in background on the cluster
client.gather(futures) # wait and get results
这里,bags
是一个 dask-bags 列表,即已定义但尚未 运行 的工作任务。您可以将最后两行替换为 dask.compute(*bags)
以获得结果,而不必担心未来。