Dask:为什么我的脚本在提交给 dask 客户端时表现不同?
Dask : why is my script behaving differently when submitted to a dask client?
我使用 dask 编写了一个简短的 python 脚本来编排工作流程。
它所做的是:启动第一批工作,不超过可用工人的数量。然后,使用 as_completed() 我监视期货的状态,一旦一项工作完成,就会向序列提交一个新的。
import random
import time
from dask.distributed import Client, as_completed, get_client
def inc(x):
time.sleep(4 + random.random())
return x + 1
def orchestrate(n):
job_count = 0
nb_workers = 2
client = get_client()
futures = []
print(len(n))
# Do not launch or put into graph more tasks than available workers
for _ in range(nb_workers):
futures.append(client.submit(inc, n[_]))
job_count += 1
print("Total number of jobs: ",job_count)
# Use a sequence to check futures as they are completed
sequence = as_completed(futures)
print("Remaining futures", sequence.count())
for seq in sequence:
if job_count < len(n):
new_futures = client.submit(inc, n[job_count])
print(sequence)
sequence.add(new_futures)
job_count += 1
print("Number of jobs submitted: ", job_count)
print("Remaining futures: ", sequence.count())
elif sequence.count() == 0:
break
else:
print("Remaining futures:", sequence.count())
return None
但是我注意到了一个奇怪的行为。
如果代码是 运行 如下所示,那么它似乎按预期工作。好几个作业提交给客户端,cell在jupyter notebook中执行。
x = [i for i in range(4)]
orchestrate(x) #This works fine, 4 jobs are submitted.
但是,我担心这样做会导致 Jupyter notebook 挂起,直到单元格执行完毕。所以我想改为使用 submit 来执行该方法,这样就不会阻塞笔记本(这样我就可以退出 jupyterlab 并且执行不会停止),如下所示:
futs = client.submit(orchestrate, x)
然而,这并没有像预期的那样工作。期货的状态始终保持为“待定”:提交并执行除 1 以外的所有作业,最后一个永远不会 运行.
请注意,我不关心检索执行方法的结果(在我的实际情况中,结果是将文件写入磁盘)。
如何解释这种行为?
“编排”的方法还是一样,只是提交给客户端...
我做错了什么吗?为了不锁定笔记本的执行,我应该采取不同的方式吗?
将您的代码复制粘贴并 运行 到我的机器上后,我看到了脚本的完整执行。我是运行最新版dask(2021.03.0)和分布式(2021.03.0)。
在对实际数据执行时,使用 get_client()
存在潜在问题,根据 docs:
However, this can deadlock the scheduler if too many tasks request jobs at once. Each task does not communicate to the scheduler that they are waiting on results and are free to compute other tasks. This can deadlock the cluster if every scheduling slot is running a task and they all request more tasks.
为了避免它,文档建议使用 secede
/rejoin
或 worker_client
上下文管理器。
我使用 dask 编写了一个简短的 python 脚本来编排工作流程。 它所做的是:启动第一批工作,不超过可用工人的数量。然后,使用 as_completed() 我监视期货的状态,一旦一项工作完成,就会向序列提交一个新的。
import random
import time
from dask.distributed import Client, as_completed, get_client
def inc(x):
time.sleep(4 + random.random())
return x + 1
def orchestrate(n):
job_count = 0
nb_workers = 2
client = get_client()
futures = []
print(len(n))
# Do not launch or put into graph more tasks than available workers
for _ in range(nb_workers):
futures.append(client.submit(inc, n[_]))
job_count += 1
print("Total number of jobs: ",job_count)
# Use a sequence to check futures as they are completed
sequence = as_completed(futures)
print("Remaining futures", sequence.count())
for seq in sequence:
if job_count < len(n):
new_futures = client.submit(inc, n[job_count])
print(sequence)
sequence.add(new_futures)
job_count += 1
print("Number of jobs submitted: ", job_count)
print("Remaining futures: ", sequence.count())
elif sequence.count() == 0:
break
else:
print("Remaining futures:", sequence.count())
return None
但是我注意到了一个奇怪的行为。 如果代码是 运行 如下所示,那么它似乎按预期工作。好几个作业提交给客户端,cell在jupyter notebook中执行。
x = [i for i in range(4)]
orchestrate(x) #This works fine, 4 jobs are submitted.
但是,我担心这样做会导致 Jupyter notebook 挂起,直到单元格执行完毕。所以我想改为使用 submit 来执行该方法,这样就不会阻塞笔记本(这样我就可以退出 jupyterlab 并且执行不会停止),如下所示:
futs = client.submit(orchestrate, x)
然而,这并没有像预期的那样工作。期货的状态始终保持为“待定”:提交并执行除 1 以外的所有作业,最后一个永远不会 运行.
请注意,我不关心检索执行方法的结果(在我的实际情况中,结果是将文件写入磁盘)。
如何解释这种行为? “编排”的方法还是一样,只是提交给客户端...
我做错了什么吗?为了不锁定笔记本的执行,我应该采取不同的方式吗?
将您的代码复制粘贴并 运行 到我的机器上后,我看到了脚本的完整执行。我是运行最新版dask(2021.03.0)和分布式(2021.03.0)。
在对实际数据执行时,使用 get_client()
存在潜在问题,根据 docs:
However, this can deadlock the scheduler if too many tasks request jobs at once. Each task does not communicate to the scheduler that they are waiting on results and are free to compute other tasks. This can deadlock the cluster if every scheduling slot is running a task and they all request more tasks.
为了避免它,文档建议使用 secede
/rejoin
或 worker_client
上下文管理器。