使用 dask 的并行化不佳
Poor parallelization using dask
我有一个二维网格,上面有一条路径。我想计算网格的每个点到路径上的每个点的距离,然后对这些网格进行一些操作。我正在使用 dask.dataframe 和 dask.array 来完成这项任务。
密码是:
import dask.dataframe as dd
import dask.array as da
x = np.linspace(-60, 60, 10000)
xv, yv = da.meshgrid(x, x, sparse='True')
path = da.from_array(np.random.rand(100, 2))
h = 100.0
# function to calculate distance to point
def dist_to_point(x, y, p):
x_dist = x-p[0]
y_dist = y-p[1]
dist = da.sqrt(x_dist**2+y_dist**2)
d2 = da.sqrt(dist**2 + h**2)
return dd.from_dask_array(d2)
distances = [dist_to_point(xv, yv, path[i, :]) for i in range(npath)]
distances_grid = dd.multi.concat(distances, axis=1, ignore_index=True)
所以 distances_grid 应该串联 [网格到点 1 的距离,网格到点 2 的距离,...,网格到点 100 的距离]
现在假设我想在我应用的所有数据帧中获得最大值
l_max = distances_grid.map_partitions(lambda x: x.groupby(level=0, axis=1).max())
这个的 dask 图看起来像这样,在我看来这不像是任务的正确并行化。谁能帮我指出我做错了什么或我该如何改进?我的最终应用程序将在 100000x100000 网格上因此使用 dask
因此,为了防止有人遇到这个问题,我通过广播数组和避免 for 循环一起解决了这个问题。我最终使用的代码是
x = da.from_array(np.linspace(-60, 60, 10000), chunks=1000)
xv, yv = da.meshgrid(x, x, sparse='True')
path = da.from_array(np.random.rand(10, 2))
h = 100.0
ngrid = x.shape[0]
xd = x[:, np.newaxis] - path[:, 0]
yd = x[:, np.newaxis] - path[:, 1]
z = xd**2 + yd[:, np.newaxis]**2 + h**2
# euclidian distance at height = 100
z = xd**2 + yd[:, np.newaxis]**2 + h**2
distances_grid = z**0.5
l_max = distances_grid.max(axis=2)
这给了我一个更好的图表,我可以通过改变块的大小来平衡更多。
我有一个二维网格,上面有一条路径。我想计算网格的每个点到路径上的每个点的距离,然后对这些网格进行一些操作。我正在使用 dask.dataframe 和 dask.array 来完成这项任务。
密码是:
import dask.dataframe as dd
import dask.array as da
x = np.linspace(-60, 60, 10000)
xv, yv = da.meshgrid(x, x, sparse='True')
path = da.from_array(np.random.rand(100, 2))
h = 100.0
# function to calculate distance to point
def dist_to_point(x, y, p):
x_dist = x-p[0]
y_dist = y-p[1]
dist = da.sqrt(x_dist**2+y_dist**2)
d2 = da.sqrt(dist**2 + h**2)
return dd.from_dask_array(d2)
distances = [dist_to_point(xv, yv, path[i, :]) for i in range(npath)]
distances_grid = dd.multi.concat(distances, axis=1, ignore_index=True)
所以 distances_grid 应该串联 [网格到点 1 的距离,网格到点 2 的距离,...,网格到点 100 的距离]
现在假设我想在我应用的所有数据帧中获得最大值
l_max = distances_grid.map_partitions(lambda x: x.groupby(level=0, axis=1).max())
这个的 dask 图看起来像这样,在我看来这不像是任务的正确并行化。谁能帮我指出我做错了什么或我该如何改进?我的最终应用程序将在 100000x100000 网格上因此使用 dask
因此,为了防止有人遇到这个问题,我通过广播数组和避免 for 循环一起解决了这个问题。我最终使用的代码是
x = da.from_array(np.linspace(-60, 60, 10000), chunks=1000)
xv, yv = da.meshgrid(x, x, sparse='True')
path = da.from_array(np.random.rand(10, 2))
h = 100.0
ngrid = x.shape[0]
xd = x[:, np.newaxis] - path[:, 0]
yd = x[:, np.newaxis] - path[:, 1]
z = xd**2 + yd[:, np.newaxis]**2 + h**2
# euclidian distance at height = 100
z = xd**2 + yd[:, np.newaxis]**2 + h**2
distances_grid = z**0.5
l_max = distances_grid.max(axis=2)
这给了我一个更好的图表,我可以通过改变块的大小来平衡更多。