我如何使用 dask 分布式?

How do I use dask distributed?

我正在尝试通过查看代码示例和文档来使用 Dask,但无法理解它的工作原理。正如文档中所建议的,我正在尝试使用分布式调度程序(我还计划在 HPC 上部署我的代码)。

我尝试的第一个简单的事情是:

from dask.distributed import Client
import dask.bag as db

if __name__ == '__main__':
    client = Client(n_workers=2)

print("hello world")

hello world打印了三次,我想是因为工人吧。我假设除非调用计算,否则工作人员不会启动。 我可以将我的打印语句移动到一个函数中:

if __name__ == '__main__':
    client = Client(n_workers=2)

def print_func():
    print("hello world")

但是,如何确保只有一个 worker 执行此功能? (在 MPI 中,我可以通过使用 rank == 0 来做到这一点;我没有找到任何类似于 MPI_Comm_rank() 的东西可以告诉我 Dask 中的工人编号或 ID)。

我走得更远,开始使用 Dask 中提供的示例:

from dask.distributed import Client
import dask.bag as db

if __name__ == '__main__':
    client = Client()

def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
print(c.compute())

但这显示了一个错误:An attempt has been made to start a new process before the current process has finished its bootstrapping phase。我假设 dask.bag 会自动拆分计算工作。 对于冗长的 post,我深表歉意,但我无法理解 Dask(我已经习惯了 MPI 和 OpenMP 编程)。

But, how do I make sure that only one worker executes this function? (In MPI, I can do this by using rank == 0; I did not find anything similar to MPI_Comm_rank() which can tell me the worker number or id in Dask).

这实际上是 if __name__ == '__main__' 块正在检查的内容。当您的脚本直接 运行 时,该条件为真;当它被工人作为模块导入时,它是 not true。你放在这个块之外的任何代码在启动时都会被每个工作人员运行;这应该仅限于功能定义和必要的全局设置。所有实际 有效的代码 都需要在 if __name__ == '__main__' 块中,或者只在该块内部调用的函数内部。