如何区分排队和 运行 期货(以及杀死 运行 太久的期货)
How to differentiate between queued and running futures (and kill futures that have been running for too long)
在 dask.distributed 上使用期货时,是否有办法区分当前正在评估的 pending
期货和仍在队列中的期货?
原因是我正在将大量任务 (~8000) 提交给较小的一组工作人员 (100),因此并非所有任务都可以立即处理。这些任务涉及调用第三方可执行文件(通过 subprocess.check_output
),在极少数情况下会进入无限循环。
因此,我想取消期货 运行 太久(使用任意超时)。然而,似乎没有办法判断 future 是否长时间处于 pending
状态,因为计算时间比平时长,或者仅仅因为它必须等待 worker 可用。
我的设置分别涉及一个 SGE 集群 运行、dask-scheduler
和 dask-worker
job/job-array。
我尝试直接在提交的 Python 函数中设置超时,使用 timeout_decorator package 中的 @timeout_decorator.timeout(60, use_signals=False)
,但出现以下错误:
"daemonic processes are not allowed to have children"
如有任何帮助,我们将不胜感激。
不,您无法确定任务是否已开始执行。通常我们建议将此逻辑放在任务本身中,就像您尝试使用超时装饰器一样。
我建议尝试将 timeout=
关键字改为 subprocess.check_outputs
本身。我怀疑这会更简单并且有更高的机会顺利工作。
对于用户 运行 Python 2,timeout=
关键字在 subprocess.check_output
中不可用。
我可以通过使用 subprocess.Popen
来获得预期的效果,returns 立即:
import subprocess
import shlex # useful to split up arguments for subprocess
import time
p = subprocess.Popen(shlex.split('/path/to/binary arg1 arg2'),
stderr=subprocess.STDOUT)
for _ in range(60): # wait for up to 60 seconds
if p.poll() is not None:
break # process completed
else:
time.sleep(1.0) # give it more time
if p.poll() is None: # time is up, are we done?
try:
p.kill()
except:
raise
raise RuntimeError('Binary failed to complete in time.')
在 dask.distributed 上使用期货时,是否有办法区分当前正在评估的 pending
期货和仍在队列中的期货?
原因是我正在将大量任务 (~8000) 提交给较小的一组工作人员 (100),因此并非所有任务都可以立即处理。这些任务涉及调用第三方可执行文件(通过 subprocess.check_output
),在极少数情况下会进入无限循环。
因此,我想取消期货 运行 太久(使用任意超时)。然而,似乎没有办法判断 future 是否长时间处于 pending
状态,因为计算时间比平时长,或者仅仅因为它必须等待 worker 可用。
我的设置分别涉及一个 SGE 集群 运行、dask-scheduler
和 dask-worker
job/job-array。
我尝试直接在提交的 Python 函数中设置超时,使用 timeout_decorator package 中的 @timeout_decorator.timeout(60, use_signals=False)
,但出现以下错误:
"daemonic processes are not allowed to have children"
如有任何帮助,我们将不胜感激。
不,您无法确定任务是否已开始执行。通常我们建议将此逻辑放在任务本身中,就像您尝试使用超时装饰器一样。
我建议尝试将 timeout=
关键字改为 subprocess.check_outputs
本身。我怀疑这会更简单并且有更高的机会顺利工作。
对于用户 运行 Python 2,timeout=
关键字在 subprocess.check_output
中不可用。
我可以通过使用 subprocess.Popen
来获得预期的效果,returns 立即:
import subprocess
import shlex # useful to split up arguments for subprocess
import time
p = subprocess.Popen(shlex.split('/path/to/binary arg1 arg2'),
stderr=subprocess.STDOUT)
for _ in range(60): # wait for up to 60 seconds
if p.poll() is not None:
break # process completed
else:
time.sleep(1.0) # give it more time
if p.poll() is None: # time is up, are we done?
try:
p.kill()
except:
raise
raise RuntimeError('Binary failed to complete in time.')