Dask: How to return a tuple of futures in client.submit

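I need to return a tuple from a task and unpack it in the main process, because each element of the tuple will go to a different dask task. I want to avoid unnecessary data transfer, so I think the tuple elements should be Futures.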

The best approach I have come up with is to scatter the data to the same worker to obtain futures. Is there a better way?

import numpy as np
import time
from dask.distributed import Client
from dask.distributed import get_client, get_worker

def other_task(data):
    return len(data)

def costly_task():
    start = time.time()
    client = get_client()
    data = np.arange(100000000)
    metadata = len(data)
    print('worker time', time.time() - start)
    start = time.time()
    worker_address = get_worker().address  # `workers=` expects worker addresses, not Worker.id
    future_data = client.scatter(data, workers=[worker_address])
    future_metadata = client.scatter(metadata, workers=[worker_address])
    print('scatter time', time.time() - start)
    return future_data, future_metadata

if __name__ == '__main__':
    client = Client(processes=True)
    start = time.time()
    future_data, future_metadata = client.submit(costly_task).result()
    metadata = future_metadata.result()
    print('costly task time', time.time() - start)
    other_metadata = client.submit(other_task, future_data).result()
    print('total time', time.time() - start)

The timings I get with the script above are:

worker time 0.12443423271179199
scatter time 0.7880995273590088
costly task time 0.923513650894165
total time 0.9366424083709717

You can do this with delayed, using nout to split the result:

from dask import delayed
from dask.distributed import Client

@delayed(nout=3)
def costly_task():
    return 1, 2, 3

if __name__ == '__main__':  # required with processes=True on spawn-based platforms
    client = Client(processes=True)

    a, b, c = costly_task()  # three Delayed objects, one per tuple element
    future_a, future_b, future_c = client.compute([a, b, c])  # one Future per element
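The same split can also be sketched with the futures API alone, a swapped-in technique rather than part of the answer: submit the costly task once, then submit operator.getitem on its future for each element, which roughly mirrors what indexing a Delayed object does. This is a minimal sketch under assumptions: costly_task and other_task are simplified stand-ins for the question's functions (with a smaller array), and split_on_cluster is a hypothetical helper name.

```python
import operator

import numpy as np
from dask.distributed import Client


def costly_task():
    # Simplified stand-in for the question's costly step: returns an
    # ordinary (data, metadata) tuple instead of scattering anything.
    data = np.arange(1_000_000)
    metadata = len(data)
    return data, metadata


def other_task(data):
    return len(data)


def split_on_cluster(client):
    # Hypothetical helper. Run the costly task once; the tuple result
    # stays in worker memory.
    tuple_future = client.submit(costly_task)

    # Index the tuple on the cluster. The scheduler normally places each
    # getitem task on the worker holding tuple_future, and neither value
    # is sent to the client here.
    future_data = client.submit(operator.getitem, tuple_future, 0)
    future_metadata = client.submit(operator.getitem, tuple_future, 1)
    return future_data, future_metadata


if __name__ == "__main__":
    client = Client(processes=True)
    future_data, future_metadata = split_on_cluster(client)

    # Only the small metadata value is transferred to the client.
    metadata = future_metadata.result()

    # future_data is passed as a future, so other_task gets the array
    # from worker memory rather than through the client.
    other_metadata = client.submit(other_task, future_data).result()
    print(metadata, other_metadata)
    client.close()
```

Compared with the scatter version, nothing extra should be serialized inside costly_task itself: the tuple is left in worker memory and indexed there, and only the elements you call .result() on travel to the client.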

For more details, see