触发一系列并行任务
firing a sequence of parallel tasks
对于这个 dask 代码:
def inc(x):
return x + 1
for x in range(5):
array[x] = delay(inc)(x)
我想通过执行延迟任务来访问 array
中的所有元素。但是我不能调用 array.compute()
因为 array
不是一个函数。如果我这样做
for x in range(5):
array[x].compute()
那么每个任务是并行执行还是 a[1]
仅在 a[0]
终止后才被触发?有没有更好的方法来编写这段代码?
如果你强迫它们花费很长时间,就很容易判断它们是否并行执行。如果您 运行 此代码:
from time import sleep, time
from dask import delayed
start = time()
def inc(x):
sleep(1)
print('[inc(%s): %s]' % (x, time() - start))
return x + 1
array = [0] * 5
for x in range(5):
array[x] = delayed(inc)(x)
for x in range(5):
array[x].compute()
很明显调用是按顺序发生的。但是,如果你用这个替换最后一个循环:
delayed(array).compute()
然后你可以看到它们是平行的。在我的机器上,输出如下所示:
[inc(1): 1.00373506546]
[inc(4): 1.00429320335]
[inc(2): 1.00471806526]
[inc(3): 1.00475406647]
[inc(0): 2.00795912743]
很明显,它执行的前四个任务是并行的。大概默认并行度设置为机器上的内核数,因为对于 CPU 密集型任务,拥有更多内核通常没有用。
您可以使用 dask.compute
函数一次计算多个延迟值
from dask import delayed, compute
array = [delayed(inc)(i) for i in range(5)]
result = compute(*array)
对于这个 dask 代码:
def inc(x):
return x + 1
for x in range(5):
array[x] = delay(inc)(x)
我想通过执行延迟任务来访问 array
中的所有元素。但是我不能调用 array.compute()
因为 array
不是一个函数。如果我这样做
for x in range(5):
array[x].compute()
那么每个任务是并行执行还是 a[1]
仅在 a[0]
终止后才被触发?有没有更好的方法来编写这段代码?
如果你强迫它们花费很长时间,就很容易判断它们是否并行执行。如果您 运行 此代码:
from time import sleep, time
from dask import delayed
start = time()
def inc(x):
sleep(1)
print('[inc(%s): %s]' % (x, time() - start))
return x + 1
array = [0] * 5
for x in range(5):
array[x] = delayed(inc)(x)
for x in range(5):
array[x].compute()
很明显调用是按顺序发生的。但是,如果你用这个替换最后一个循环:
delayed(array).compute()
然后你可以看到它们是平行的。在我的机器上,输出如下所示:
[inc(1): 1.00373506546]
[inc(4): 1.00429320335]
[inc(2): 1.00471806526]
[inc(3): 1.00475406647]
[inc(0): 2.00795912743]
很明显,它执行的前四个任务是并行的。大概默认并行度设置为机器上的内核数,因为对于 CPU 密集型任务,拥有更多内核通常没有用。
您可以使用 dask.compute
函数一次计算多个延迟值
from dask import delayed, compute
array = [delayed(inc)(i) for i in range(5)]
result = compute(*array)