为什么 dask.delayed 在使用 networkx 时比串行代码花费的时间更长?
Why dask.delayed takes longer than serial code when working with networkx?
我想通过 dask.delayed
.
使用并行计算来加速函数 my_func()
的执行
在 3 个维度的循环中,my_func()
从 iris.cube.Cube
中提取一个值(本质上是从循环外的文件加载的 dask.array
),并且取决于值,使用 networkx
创建一个随机网络,并找到从节点 0 到节点 16 的最短路径。每个数组点的计算是独立的。
- 为什么执行并行代码需要比串行代码(2.94 秒)更长的时间(5.43 秒)?
- 是否有更好的方法可以使用
dask
或 multiprocessing
或其他方法加快速度?
这是一个可重现的例子:
import random
import dask
import iris
import networkx as nx
from dask import delayed
from dask.distributed import Client
from networkx.generators.random_graphs import gnp_random_graph
# Input
fname = iris.sample_data_path("uk_hires.pp") # https://anaconda.org/conda-forge/iris-sample-data
temp_ptntl = iris.load_cube(fname, "air_potential_temperature")[-1, ...] # last time step only
# Dimensions
zs = temp_ptntl.coord("model_level_number").points
lats = temp_ptntl.coord("grid_latitude").points
lons = temp_ptntl.coord("grid_longitude").points
def my_func(iz, iy, ix):
constraint = iris.Constraint(model_level_number=iz, grid_latitude=iy, grid_longitude=ix)
temp_virt = temp_ptntl.extract(constraint) * (1 + 0.61 * 0.04)
if float(temp_virt.data) > 295:
G = nx.gnp_random_graph(30, 0.2, seed=random.randint(1, 10), directed=True)
distance, path = nx.single_source_dijkstra(G, source=0, target=16)
else:
pass
return temp_virt, distance, path
序列号:
%%time
results_serial = [] # serial code
for iz in zs:
for iy in lats[0:5]:
for ix in lons[0:2]:
results_serial.append(my_func(iz, iy, ix))
>>> CPU times: user 2.94 s, sys: 44 ms, total: 2.99 s
>>> Wall time: 2.94 s
使用dask
:
client = Client(processes=True, n_workers=4, threads_per_worker=36)
results_parallel = [] # parallel code
for iz in zs:
for iy in lats[0:5]:
for ix in lons[0:2]:
results_parallel.append(delayed(my_func)(iz, iy, ix))
%%time
computed = dask.compute(results_parallel)
>>> CPU times: user 3.56 s, sys: 344 ms, total: 3.91 s
>>> Wall time: 5.43 s
# client.close()
dask
会有一些开销,因此在小样本上表现不佳并不罕见。当我尝试通过更改为 for iy in lats[0:15]:
来增加计算次数时,我发现串行计算需要 10 秒,而 dask 在 4 秒内完成。
(还有函数的序列化,可能需要一些时间,但只适用于函数第一次发送给worker)
我想通过 dask.delayed
.
my_func()
的执行
在 3 个维度的循环中,my_func()
从 iris.cube.Cube
中提取一个值(本质上是从循环外的文件加载的 dask.array
),并且取决于值,使用 networkx
创建一个随机网络,并找到从节点 0 到节点 16 的最短路径。每个数组点的计算是独立的。
- 为什么执行并行代码需要比串行代码(2.94 秒)更长的时间(5.43 秒)?
- 是否有更好的方法可以使用
dask
或multiprocessing
或其他方法加快速度?
这是一个可重现的例子:
import random
import dask
import iris
import networkx as nx
from dask import delayed
from dask.distributed import Client
from networkx.generators.random_graphs import gnp_random_graph
# Input
fname = iris.sample_data_path("uk_hires.pp") # https://anaconda.org/conda-forge/iris-sample-data
temp_ptntl = iris.load_cube(fname, "air_potential_temperature")[-1, ...] # last time step only
# Dimensions
zs = temp_ptntl.coord("model_level_number").points
lats = temp_ptntl.coord("grid_latitude").points
lons = temp_ptntl.coord("grid_longitude").points
def my_func(iz, iy, ix):
constraint = iris.Constraint(model_level_number=iz, grid_latitude=iy, grid_longitude=ix)
temp_virt = temp_ptntl.extract(constraint) * (1 + 0.61 * 0.04)
if float(temp_virt.data) > 295:
G = nx.gnp_random_graph(30, 0.2, seed=random.randint(1, 10), directed=True)
distance, path = nx.single_source_dijkstra(G, source=0, target=16)
else:
pass
return temp_virt, distance, path
序列号:
%%time
results_serial = [] # serial code
for iz in zs:
for iy in lats[0:5]:
for ix in lons[0:2]:
results_serial.append(my_func(iz, iy, ix))
>>> CPU times: user 2.94 s, sys: 44 ms, total: 2.99 s
>>> Wall time: 2.94 s
使用dask
:
client = Client(processes=True, n_workers=4, threads_per_worker=36)
results_parallel = [] # parallel code
for iz in zs:
for iy in lats[0:5]:
for ix in lons[0:2]:
results_parallel.append(delayed(my_func)(iz, iy, ix))
%%time
computed = dask.compute(results_parallel)
>>> CPU times: user 3.56 s, sys: 344 ms, total: 3.91 s
>>> Wall time: 5.43 s
# client.close()
dask
会有一些开销,因此在小样本上表现不佳并不罕见。当我尝试通过更改为 for iy in lats[0:15]:
来增加计算次数时,我发现串行计算需要 10 秒,而 dask 在 4 秒内完成。
(还有函数的序列化,可能需要一些时间,但只适用于函数第一次发送给worker)