Dask: How to return a tuple of futures in client.submit
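I need to return a tuple from a task, and the tuple has to be unpacked in the main process because each element goes to a different dask task. I want to avoid unnecessary communication, so I think the tuple elements should be Futures.
The best approach I have come up with is to scatter the data to the same worker, just to obtain a future. Is there a better way?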
import numpy as np
import time
from dask.distributed import Client
from dask.distributed import get_client, get_worker

def other_task(data):
    return len(data)

def costly_task():
    start = time.time()
    client = get_client()
    data = np.arange(100000000)
    metadata = len(data)
    print('worker time', time.time() - start)
    start = time.time()
    # Scatter to the current worker so each element comes back as a Future
    worker = get_worker().id
    future_data = client.scatter(data, workers=[worker])
    future_metadata = client.scatter(metadata, workers=[worker])
    print('scatter time', time.time() - start)
    return future_data, future_metadata

if __name__ == '__main__':
    client = Client(processes=True)
    start = time.time()
    # The task result is a tuple of Futures, unpacked in the main process
    future_data, future_metadata = client.submit(costly_task).result()
    metadata = future_metadata.result()
    print('costly task time', time.time() - start)
    # Passing the Future lets the downstream task use the data where it lives
    other_metadata = client.submit(other_task, future_data).result()
    print('total time', time.time() - start)
The timings I get with the script above are:
worker time 0.12443423271179199
scatter time 0.7880995273590088
costly task time 0.923513650894165
total time 0.9366424083709717
This can be done with delayed:
from dask.distributed import Client
from dask import delayed

@delayed(nout=3)
def costly_task():
    return 1, 2, 3

client = Client(processes=True)
a, b, c = costly_task()  # these are delayed
future_a, future_b, future_c = client.compute([a, b, c])  # split results
For details, see the dask.delayed documentation.
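As a rough sketch of how this could apply to the script in the question, the example below wraps a smaller version of costly_task in delayed(nout=2) and passes one of the resulting futures to other_task. The run helper, the n parameter, and the use of Client(processes=False) are assumptions made to keep the example small and quick to run; they are not part of the original question or answer.

```python
import numpy as np
from dask import delayed
from dask.distributed import Client


def other_task(data):
    return len(data)


@delayed(nout=2)
def costly_task(n):
    # Smaller stand-in for the question's task: returns (data, metadata)
    data = np.arange(n)
    return data, len(data)


def run(n=1000):
    # Hypothetical helper; an in-process cluster is assumed here for brevity,
    # whereas the question used Client(processes=True).
    with Client(processes=False) as client:
        data, metadata = costly_task(n)  # two Delayed objects, one per element
        # client.compute on a list returns one Future per element
        future_data, future_metadata = client.compute([data, metadata])
        # Pass the Future itself so the array is not pulled into this process
        other = client.submit(other_task, future_data)
        return future_metadata.result(), other.result()


if __name__ == "__main__":
    print(run())
```

Only the small metadata value and the result of other_task are brought back to the main process; the array stays behind future_data on the cluster, with no explicit scatter call inside the task.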