Assign a delayed object to a dask array: TypeError: Delayed objects of unspecified length have no len()
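I have the following setup: a function that returns an array, and a Dask array.
I want to call the function in a for loop and fill the Dask array with its return values. This should be done in parallel.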
import dask
import numpy as np

def some_function(params):
    # do calculations and return an array X of shape (20, 2)
    ...  # calculations
    some_array = np.zeros((20, 2))  # placeholder for the real result
    return some_array
I want to fill the Dask array in parallel like this (the code below doesn't work, because the output is a Delayed object):
import dask.array as da
from dask.distributed import Client

if __name__ == '__main__':
    client = Client(n_workers=4)
    N = 20_000
    # (20, 2) is the shape of the array returned by some_function
    X = da.zeros(shape=(N, 20, 2), chunks=(1, 20, 2))
    # List of parameters taken by some_function
    l = [np.random.random(size=3) for i in range(N)]
    for i, param in enumerate(l):
        output = dask.delayed(some_function)(param)
        X[i] = output  # fails: TypeError: Delayed objects of unspecified length have no len()
What I want is to be able to run the computations and store the results in parallel.
Thanks for your help.
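For context, the TypeError in the title can be reproduced in a few lines without a cluster. This sketch assumes a hypothetical `some_function` that returns a (20, 2) array; calling the delayed-wrapped function only builds a task, so dask does not yet know how long the result is:

```python
import dask
import numpy as np


def some_function(params):
    # Hypothetical stand-in for the real computation: returns a (20, 2) array.
    return np.zeros((20, 2))


# Calling the delayed-wrapped function does not run it; it returns a Delayed
# object whose length dask cannot know until the task has executed.
output = dask.delayed(some_function)(np.random.random(size=3))

try:
    len(output)  # ask the Delayed object for its length
    message = None
except TypeError as exc:
    message = str(exc)

print(message)  # Delayed objects of unspecified length have no len()
```

Assigning such an object into a slice of X needs that length and shape information, which is consistent with the error in the title.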
It looks like you want dask.array.from_delayed; you can then call .compute on the result later, when you need it.
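A minimal sketch of what `from_delayed` does with a single result, run on dask's default local scheduler rather than a `Client`. The function body and the parameter values are hypothetical; the point is that `shape` and `dtype` must be supplied up front and must match what the function actually returns:

```python
import dask
import dask.array as da
import numpy as np


@dask.delayed
def some_function(param):
    # Hypothetical stand-in: the real function returns a (20, 2) float array.
    return np.full((20, 2), param.sum())


param = np.ones(3)
delayed_result = some_function(param)  # a Delayed object, nothing computed yet

# from_delayed needs the shape and dtype because dask cannot inspect the
# result before the task has run; they must match the real return value.
block = da.from_delayed(delayed_result, shape=(20, 2), dtype=np.float64)

print(block.shape)        # (20, 2), known without computing anything
result = block.compute()  # runs some_function now
print(result[0])          # [3. 3.]
```

The answer's code below applies the same wrapping to every row of X.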
import numpy as np
import dask
import dask.array as da
from dask.distributed import Client

@dask.delayed
def some_function(param):
    # stand-in for the real computation: returns a (20, 2) array
    return np.random.rand(20, 2)

if __name__ == "__main__":
    client = Client(n_workers=2)
    N = 10
    X = da.zeros(shape=(N, 20, 2), chunks=(1, 20, 2))
    l = [np.random.random(size=3) for i in range(N)]
    for i, param in enumerate(l):
        output = some_function(param)  # a Delayed object; nothing is computed yet
        # wrap it as a (20, 2) dask array so it can be assigned into X
        X[i] = da.from_delayed(output, shape=(20, 2), dtype=np.float64)
Output:
print(X[0].compute())
[[0.3521712 0.6159578 ]
[0.67023109 0.13890086]
[0.71952075 0.3986291 ]
[0.76702816 0.84669244]
[0.82703851 0.72321066]
[0.92060717 0.77926133]
[0.27857667 0.2510426 ]
[0.85014582 0.34709649]
[0.46328749 0.44324011]
[0.84134094 0.28890227]
[0.33616886 0.09771338]
[0.35734385 0.0832578 ]
[0.04038898 0.41059205]
[0.01776568 0.31226509]
[0.03036941 0.70490505]
[0.78646762 0.33381309]
[0.02535621 0.5715431 ]
[0.16349511 0.37746425]
[0.11798384 0.87281911]
[0.26136318 0.59016981]]
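A variation on the answer, sketched under the same assumptions (a hypothetical stand-in for `some_function`, dask's default local scheduler instead of a `Client`): build one `from_delayed` block per parameter set and combine them with `da.stack`, instead of pre-allocating zeros and assigning row by row. Calling `.compute()` on the stacked array then lets the scheduler run the delayed calls in parallel:

```python
import dask
import dask.array as da
import numpy as np


@dask.delayed
def some_function(param):
    # Hypothetical stand-in for the real (20, 2) computation.
    return np.tile(param[:2], (20, 1))


N = 10
params = [np.random.random(size=3) for _ in range(N)]

# One lazy (20, 2) block per parameter set; nothing runs yet.
blocks = [
    da.from_delayed(some_function(p), shape=(20, 2), dtype=np.float64)
    for p in params
]

# Stack the blocks along a new leading axis:
# shape (N, 20, 2), chunks of size (1, 20, 2), matching X in the question.
X = da.stack(blocks, axis=0)

# Computing the stacked array executes all N delayed calls.
result = X.compute()
print(result.shape)  # (10, 20, 2)
```

With N = 20_000 this still creates at least one task per parameter set, so the task graph is large either way.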