将 dask delayed 与函数返回列表一起使用
Using dask delayed with functions returning lists
我正在尝试使用 dask.delayed 构建任务图。这大部分工作得很好,但我经常 运行 遇到这样的情况,在这种情况下,我有许多延迟对象,这些对象有一个方法返回一个对象列表,该列表的长度不容易根据我在这点:
items = get_collection() # known length
def do_work(item):
# get_list_of_things returns list of "unknown" length
return map(lambda x: x.DoStuff(), item.get_list_of_things())
results = [delayed(do_work(x)) for x in items]
这给出了
TypeError: Delayed objects of unspecified length are not iterable
dask 中有什么方法可以解决这个问题,最好是不必在中间结果上调用 .compute(),因为这会破坏拥有任务图的大部分优势?它基本上意味着该图在其某些步骤具有 运行 之后才能完全解析,但唯一可变的是平行部分的宽度,它不会改变结构或深度图.
不幸的是,如果您想对列表中的每个元素调用单独的函数,那么 是 图形结构的一部分,并且必须在图形构造时知道时间如果你想使用 dask.delayed.
一般来说我看到两个选项:
不要为列表中的每个元素分别制定任务,而是为前 10%、第二个 10% 等制定任务。这与dask.bag,它还处理具有未知数量元素的并行性(这可能值得考虑。
切换到实时concurrent.futures界面,等待列表结果再提交更多工作
from dask.distributed import Client
client = Client()
list_future = client.submit(do_work, *args)
len_future = client.submit(len, list_future)
n = len_future.result() # wait until the length is computed
futures = [client.submit(operator.getitem, list_future, i) for i in range(n)]
... do more stuff with futures
我正在尝试使用 dask.delayed 构建任务图。这大部分工作得很好,但我经常 运行 遇到这样的情况,在这种情况下,我有许多延迟对象,这些对象有一个方法返回一个对象列表,该列表的长度不容易根据我在这点:
items = get_collection() # known length
def do_work(item):
# get_list_of_things returns list of "unknown" length
return map(lambda x: x.DoStuff(), item.get_list_of_things())
results = [delayed(do_work(x)) for x in items]
这给出了
TypeError: Delayed objects of unspecified length are not iterable
dask 中有什么方法可以解决这个问题,最好是不必在中间结果上调用 .compute(),因为这会破坏拥有任务图的大部分优势?它基本上意味着该图在其某些步骤具有 运行 之后才能完全解析,但唯一可变的是平行部分的宽度,它不会改变结构或深度图.
不幸的是,如果您想对列表中的每个元素调用单独的函数,那么 是 图形结构的一部分,并且必须在图形构造时知道时间如果你想使用 dask.delayed.
一般来说我看到两个选项:
不要为列表中的每个元素分别制定任务,而是为前 10%、第二个 10% 等制定任务。这与dask.bag,它还处理具有未知数量元素的并行性(这可能值得考虑。
切换到实时concurrent.futures界面,等待列表结果再提交更多工作
from dask.distributed import Client client = Client() list_future = client.submit(do_work, *args) len_future = client.submit(len, list_future) n = len_future.result() # wait until the length is computed futures = [client.submit(operator.getitem, list_future, i) for i in range(n)] ... do more stuff with futures