我如何使用 dask 分布式?
How do I use dask distributed?
我正在尝试通过查看代码示例和文档来使用 Dask,但无法理解它的工作原理。正如文档中所建议的,我正在尝试使用分布式调度程序(我还计划在 HPC 上部署我的代码)。
我尝试的第一个简单的事情是:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client(n_workers=2)
print("hello world")
hello world
打印了三次,我想是因为工人吧。我假设除非调用计算,否则工作人员不会启动。
我可以将我的打印语句移动到一个函数中:
if __name__ == '__main__':
client = Client(n_workers=2)
def print_func():
print("hello world")
但是,如何确保只有一个 worker 执行此功能? (在 MPI 中,我可以通过使用 rank == 0
来做到这一点;我没有找到任何类似于 MPI_Comm_rank()
的东西可以告诉我 Dask 中的工人编号或 ID)。
我走得更远,开始使用 Dask 中提供的示例:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client()
def is_even(n):
return n % 2 == 0
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
print(c.compute())
但这显示了一个错误:An attempt has been made to start a new process before the current process has finished its bootstrapping phase
。我假设 dask.bag
会自动拆分计算工作。
对于冗长的 post,我深表歉意,但我无法理解 Dask(我已经习惯了 MPI 和 OpenMP 编程)。
But, how do I make sure that only one worker executes this function? (In MPI, I can do this by using rank == 0
; I did not find anything similar to MPI_Comm_rank()
which can tell me the worker number or id in Dask).
这实际上是 if __name__ == '__main__'
块正在检查的内容。当您的脚本直接 运行 时,该条件为真;当它被工人作为模块导入时,它是 not true。你放在这个块之外的任何代码在启动时都会被每个工作人员运行;这应该仅限于功能定义和必要的全局设置。所有实际 有效的代码 都需要在 if __name__ == '__main__'
块中,或者只在该块内部调用的函数内部。
我正在尝试通过查看代码示例和文档来使用 Dask,但无法理解它的工作原理。正如文档中所建议的,我正在尝试使用分布式调度程序(我还计划在 HPC 上部署我的代码)。
我尝试的第一个简单的事情是:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client(n_workers=2)
print("hello world")
hello world
打印了三次,我想是因为工人吧。我假设除非调用计算,否则工作人员不会启动。
我可以将我的打印语句移动到一个函数中:
if __name__ == '__main__':
client = Client(n_workers=2)
def print_func():
print("hello world")
但是,如何确保只有一个 worker 执行此功能? (在 MPI 中,我可以通过使用 rank == 0
来做到这一点;我没有找到任何类似于 MPI_Comm_rank()
的东西可以告诉我 Dask 中的工人编号或 ID)。
我走得更远,开始使用 Dask 中提供的示例:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client()
def is_even(n):
return n % 2 == 0
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
print(c.compute())
但这显示了一个错误:An attempt has been made to start a new process before the current process has finished its bootstrapping phase
。我假设 dask.bag
会自动拆分计算工作。
对于冗长的 post,我深表歉意,但我无法理解 Dask(我已经习惯了 MPI 和 OpenMP 编程)。
But, how do I make sure that only one worker executes this function? (In MPI, I can do this by using
rank == 0
; I did not find anything similar toMPI_Comm_rank()
which can tell me the worker number or id in Dask).
这实际上是 if __name__ == '__main__'
块正在检查的内容。当您的脚本直接 运行 时,该条件为真;当它被工人作为模块导入时,它是 not true。你放在这个块之外的任何代码在启动时都会被每个工作人员运行;这应该仅限于功能定义和必要的全局设置。所有实际 有效的代码 都需要在 if __name__ == '__main__'
块中,或者只在该块内部调用的函数内部。