带有未传递参数的 Dask 延迟函数调用
Dask delayed function call with non-passed parameters
我试图更好地理解使用 dask.delayed
调用依赖于参数的函数时的以下行为。当在 configparser 读取的参数文件中指定参数时,似乎会出现此问题。这是一个完整的例子:
参数文件:
#zpar.ini: parameter file for configparser
[my pars]
my_zpar = 2.
解析器:
#zippy_parser
import configparser
def read(_rundir):
global rundir
rundir = _rundir
cp = configparser.ConfigParser()
cp.read(rundir + '/zpar.ini')
#[my pars]
global my_zpar
my_zpar = cp['my pars'].getfloat('my_zpar')
和主要 python 文件:
# dask test with configparser
import dask
from dask.distributed import Client
import zippy_parser as zpar
def my_func(x, y):
# print stuff
print("parameter from main is: {}".format(main_par))
print("parameter from configparser is: {}".format(zpar.my_zpar))
# do stuff
return x + y
if __name__ == '__main__':
client = Client(n_workers = 4)
#read parameters from input file
rundir = '/path/to/parameter/file'
zpar.read(rundir)
#test zpar
print("zpar is {}".format(zpar.my_zpar))
#define parameter and call my_func
main_par = 5.
z = dask.delayed(my_func)(1., 2.)
z.compute()
client.close()
my_func() 中的第一个打印语句执行得很好,但第二个打印语句引发异常。输出是:
zpar is 2.0
parameter from main is: 5.0
distributed.worker - WARNING - Compute Failed
Function: my_func
args: (1.0, 2.0)
kwargs: {}
Exception: AttributeError("module 'zippy_parser' has no attribute 'my_zpar'",)
我是dask的新手。我想这与我不明白的序列化有关。谁能赐教and/or 指点一下相关文档?谢谢!
我会尽量保持简短。
当函数被序列化以发送给 worker 时,python 也会发送函数(其 "closure")所需的局部变量和函数。但是,它按名称存储它引用的模块,它不会尝试序列化整个运行时。
这意味着 zippy_parser
在 worker 中是 imported,而不是反序列化。由于函数 read
从未被调用过
在 worker 中,global
变量从未被初始化。
因此,您可以在 worker 中调用 read
作为函数的一部分或以其他方式调用,但是使用函数的模式或设置模块全局变量可能不是很好。 Dask 的延迟机制更喜欢功能纯度,你得到的结果不应该依赖于运行时的当前状态。
(请注意,如果您在主脚本中调用 read
后创建了客户端,则工作人员 可能 已获得内存版本,具体取决于如何子流程配置为在您的系统上创建)
我鼓励您显式地将所有参数传递给 dask 延迟函数,而不是依赖全局命名空间。
我试图更好地理解使用 dask.delayed
调用依赖于参数的函数时的以下行为。当在 configparser 读取的参数文件中指定参数时,似乎会出现此问题。这是一个完整的例子:
参数文件:
#zpar.ini: parameter file for configparser
[my pars]
my_zpar = 2.
解析器:
#zippy_parser
import configparser
def read(_rundir):
global rundir
rundir = _rundir
cp = configparser.ConfigParser()
cp.read(rundir + '/zpar.ini')
#[my pars]
global my_zpar
my_zpar = cp['my pars'].getfloat('my_zpar')
和主要 python 文件:
# dask test with configparser
import dask
from dask.distributed import Client
import zippy_parser as zpar
def my_func(x, y):
# print stuff
print("parameter from main is: {}".format(main_par))
print("parameter from configparser is: {}".format(zpar.my_zpar))
# do stuff
return x + y
if __name__ == '__main__':
client = Client(n_workers = 4)
#read parameters from input file
rundir = '/path/to/parameter/file'
zpar.read(rundir)
#test zpar
print("zpar is {}".format(zpar.my_zpar))
#define parameter and call my_func
main_par = 5.
z = dask.delayed(my_func)(1., 2.)
z.compute()
client.close()
my_func() 中的第一个打印语句执行得很好,但第二个打印语句引发异常。输出是:
zpar is 2.0 parameter from main is: 5.0 distributed.worker - WARNING - Compute Failed Function: my_func args: (1.0, 2.0) kwargs: {} Exception: AttributeError("module 'zippy_parser' has no attribute 'my_zpar'",)
我是dask的新手。我想这与我不明白的序列化有关。谁能赐教and/or 指点一下相关文档?谢谢!
我会尽量保持简短。
当函数被序列化以发送给 worker 时,python 也会发送函数(其 "closure")所需的局部变量和函数。但是,它按名称存储它引用的模块,它不会尝试序列化整个运行时。
这意味着 zippy_parser
在 worker 中是 imported,而不是反序列化。由于函数 read
从未被调用过
在 worker 中,global
变量从未被初始化。
因此,您可以在 worker 中调用 read
作为函数的一部分或以其他方式调用,但是使用函数的模式或设置模块全局变量可能不是很好。 Dask 的延迟机制更喜欢功能纯度,你得到的结果不应该依赖于运行时的当前状态。
(请注意,如果您在主脚本中调用 read
后创建了客户端,则工作人员 可能 已获得内存版本,具体取决于如何子流程配置为在您的系统上创建)
我鼓励您显式地将所有参数传递给 dask 延迟函数,而不是依赖全局命名空间。