分布式Dask如何高效提交大参数任务?
How to efficiently submit tasks with large arguments in Dask distributed?
我想提交带有大(千兆字节)参数的 Dask 函数。做这个的最好方式是什么?我想 运行 这个函数多次使用不同的(小)参数。
例子(不好)
这使用 concurrent.futures 界面。我们可以同样轻松地使用 dask.delayed 界面。
x = np.random.random(size=100000000) # 800MB array
params = list(range(100)) # 100 small parameters
def f(x, param):
pass
from dask.distributed import Client
c = Client()
futures = [c.submit(f, x, param) for param in params]
但这比我预期的要慢或导致内存错误。
好的,所以这里的问题是每个任务都包含很大的 numpy 数组 x
。对于我们提交的 100 个任务中的每一个,我们都需要序列化 x
,将其发送给调度程序,将其发送给工作人员,等等。
相反,我们会将数组发送到集群一次:
[future] = c.scatter([x])
现在 future
是指向集群上数组 x
的标记。现在我们可以提交引用这个远程未来的任务,而不是我们本地客户端上的 numpy 数组。
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
现在速度更快,让 Dask 更有效地控制数据移动。
向所有工作人员分散数据
如果您希望最终需要将数组 x 移动到所有工作人员,那么您可能希望广播数组以开始
[future] = c.scatter([x], broadcast=True)
延迟使用 Dask
期货也适用于 dask.delayed。这里没有性能优势,但有些人更喜欢这个接口:
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
我想提交带有大(千兆字节)参数的 Dask 函数。做这个的最好方式是什么?我想 运行 这个函数多次使用不同的(小)参数。
例子(不好)
这使用 concurrent.futures 界面。我们可以同样轻松地使用 dask.delayed 界面。
x = np.random.random(size=100000000) # 800MB array
params = list(range(100)) # 100 small parameters
def f(x, param):
pass
from dask.distributed import Client
c = Client()
futures = [c.submit(f, x, param) for param in params]
但这比我预期的要慢或导致内存错误。
好的,所以这里的问题是每个任务都包含很大的 numpy 数组 x
。对于我们提交的 100 个任务中的每一个,我们都需要序列化 x
,将其发送给调度程序,将其发送给工作人员,等等。
相反,我们会将数组发送到集群一次:
[future] = c.scatter([x])
现在 future
是指向集群上数组 x
的标记。现在我们可以提交引用这个远程未来的任务,而不是我们本地客户端上的 numpy 数组。
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
现在速度更快,让 Dask 更有效地控制数据移动。
向所有工作人员分散数据
如果您希望最终需要将数组 x 移动到所有工作人员,那么您可能希望广播数组以开始
[future] = c.scatter([x], broadcast=True)
延迟使用 Dask
期货也适用于 dask.delayed。这里没有性能优势,但有些人更喜欢这个接口:
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)