Distributed Julia: parallel map (pmap) with a timeout / time limit for each map task to complete
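My project involves using the pmap function from Julia's Distributed module to compute a map in parallel.
Mapping a given element could take a few seconds, or it could take a very long time. I want a timeout or time limit for each individual map task/computation to complete.
If a map task finishes in time, great: return the result of the computation. If a task does not complete within the time limit, stop the computation when the time limit is reached and return some value or message indicating that a timeout occurred.
What follows is a minimal example. First, import the modules and then launch the worker processes: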
num_procs = 1
using Distributed
if num_procs > 1
    # The main process (without calling addprocs) can be used for `pmap`:
    addprocs(num_procs-1)
end
Next, define the mapping task for all worker processes. The mapping task should time out after 1 second:
@everywhere import Random
@everywhere begin
    """
    Compute stuff for `wait_time` seconds, and return `wait_time`.
    If `timeout` seconds elapse, stop computation and return something else.
    """
    function waitForTimeUnlessTimeout(wait_time, timeout=1)
        # < Insert some sort of timeout code? >
        # This block of code simulates a long computation.
        # (pretend the computation time is unknown)
        t0 = time()  # start time of the computation
        x = 0.0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
        end
        # computation completed before time limit. Return wait_time.
        round(wait_time, digits=2)
    end
end
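If the long computation is a loop you control, one way to fill in the "< Insert some sort of timeout code? >" placeholder is to check the elapsed time on every iteration and return early. The sketch below assumes exactly that. It is not how the answers below solve the problem. It reuses the "TimeOut" return value from the follow-up answer. With workers, wrap the definition in @everywhere as above.

```julia
import Random

# Sketch: the timeout check lives inside the computation's own loop.
# Assumes the long computation is a loop that can look at the clock each iteration.
function waitForTimeUnlessTimeout(wait_time, timeout=1)
    t0 = time()
    x = 0.0
    while time() - t0 < wait_time
        # Give up as soon as the time limit has been reached.
        if time() - t0 >= timeout
            return "TimeOut"
        end
        x += Random.rand() - 0.5
    end
    # Computation completed before the time limit.
    return round(wait_time, digits=2)
end

println(waitForTimeUnlessTimeout(0.3))       # finishes within the default 1 s limit
println(waitForTimeUnlessTimeout(2.0, 0.5))  # stopped by the 0.5 s limit
```

Because no second task is involved, no yield() is needed, and the computation really stops at the limit. The trade-off is that the check has to be written into the computation itself.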
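The function that performs the parallel map (pmap) is defined on the main process. Each map task takes a random amount of time, up to 2 seconds, to complete, but should time out after 1 second.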
function myParallelMapping(num_tasks = 20, max_runtime=2)
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime
    # return the parallel computation of the mapping tasks
    pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)
end
print(myParallelMapping())
How should this time-limited parallel map be implemented?
You can put something like this inside the body of your pmap call:
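# Assumes `runtimes` and `time_limit` (in seconds) are defined,
# e.g. as arguments of myParallelMapping.
pmap(runtimes) do runtime
    t0 = time()
    task = @async waitForTimeUnlessTimeout(runtime)
    # This check only gets a chance to run if the computation calls yield() now and then.
    while !istaskdone(task) && time()-t0 < time_limit
        sleep(1)
    end
    istaskdone(task) && (return fetch(task))
    error("time over")
end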
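The polling loop above can also be written with Base.timedwait. This function calls a test function every pollint seconds until it returns true or the timeout expires. The sketch below is an alternative based on that assumption, not part of the original answer. The name timedMap is hypothetical. waitForTime is the yielding version of the computation, redefined here so the block is self-contained.

```julia
using Distributed
@everywhere import Random

# Simulated computation; yield() lets the waiting side run while this loop is busy.
@everywhere function waitForTime(wait_time)
    t0 = time()
    x = 0.0
    while time() - t0 < wait_time
        x += Random.rand() - 0.5
        yield()
    end
    return round(wait_time, digits=2)
end

# Hypothetical helper: the same polling pattern, expressed with timedwait.
function timedMap(runtimes; time_limit=1.0)
    return pmap(runtimes) do runtime
        task = @async waitForTime(runtime)
        # timedwait returns :ok once the test function is true,
        # or :timed_out after `time_limit` seconds (polling every 0.1 s).
        status = timedwait(() -> istaskdone(task), float(time_limit); pollint=0.1)
        return status === :ok ? fetch(task) : "TimeOut"
    end
end

println(timedMap([0.2, 3.0]; time_limit=1.0))
```

As with the hand-written loop, this only works if the computation yields. The timed-out task is also left running in the background.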
Also note that (runtime)->waitForTimeUnlessTimeout(runtime) is equivalent to simply passing waitForTimeUnlessTimeout.
Based on @Fredrik Bagge's very helpful answer, here is a complete working example implementation, along with some additional explanation.
num_procs = 8
using Distributed
if num_procs > 1
    addprocs(num_procs-1)
end
@everywhere import Random
@everywhere begin
    function waitForTime(wait_time)
        # This code block simulates a long computation.
        # Pretend the computation time is unknown.
        t0 = time()
        x = 0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
            yield() # CRITICAL: hands control back so the checker can see whether the task is done.
            # If you comment out yield(), you will see that the timeout doesn't work!
        end
        return round(wait_time, digits=2)
    end
end
function myParallelMapping(num_tasks = 16, max_runtime=2, time_limit=1)
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime
    # parallel compute the mapping tasks. See "do block" in
    # the Julia documentation; it's just syntactic sugar.
    return pmap(runtimes) do runtime
        t0 = time()
        task = @async waitForTime(runtime)
        while !istaskdone(task) && time()-t0 < time_limit
            # releases computation to waitForTime
            sleep(0.1)
            # nothing past here will run until waitForTime calls yield()
            # *and* 0.1 seconds have passed.
        end
        # equivalent to: if istaskdone(task); return fetch(task); end
        istaskdone(task) && (return fetch(task))
        return "TimeOut"
        # `return error("TimeOut")` halts pmap unless pmap is
        # given an error handler argument. See pmap documentation.
    end
end
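The comment at the end of that function notes that error("TimeOut") would abort pmap unless pmap is given an error handler. Here is a minimal sketch of that variant using pmap's on_error keyword, whose return value replaces the failed element's result. The name myParallelMappingOnError is hypothetical.

```julia
using Distributed
@everywhere import Random

@everywhere function waitForTime(wait_time)
    t0 = time()
    x = 0.0
    while time() - t0 < wait_time
        x += Random.rand() - 0.5
        yield()   # lets the time checker below run
    end
    return round(wait_time, digits=2)
end

function myParallelMappingOnError(runtimes; time_limit=1.0)
    # on_error receives the exception thrown for an element and returns the value
    # to use as that element's result instead of aborting the whole pmap.
    # Caveat: this maps *every* error to "TimeOut", not only the timeout.
    return pmap(runtimes; on_error = e -> "TimeOut") do runtime
        t0 = time()
        task = @async waitForTime(runtime)
        while !istaskdone(task) && time() - t0 < time_limit
            sleep(0.1)
        end
        istaskdone(task) && (return fetch(task))
        error("TimeOut")
    end
end

println(myParallelMappingOnError([0.2, 3.0]; time_limit=1.0))
```

Returning the "TimeOut" string directly, as the answer does, avoids the caveat. The on_error route fits better when other failures should also produce a placeholder value.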
The output is:
julia> print(myParallelMapping())
Any["TimeOut", "TimeOut", 0.33, 0.35, 0.56, 0.41, 0.08, 0.14, 0.72,
"TimeOut", "TimeOut", "TimeOut", 0.52, "TimeOut", 0.33, "TimeOut"]
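Note that in this example there are two tasks per process. The original task (the "time checker") checks every 0.1 seconds whether the other task has finished its computation. The other task (created with @async) is computing something and periodically calls yield() to hand control to the time checker; if it does not call yield(), no time checking can happen.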
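Returning "TimeOut" does not stop the @async task. It keeps running, and keeps yielding, on the worker in the background until its own loop ends. The question's requirement to stop the computation at the limit is therefore only partly met. One way to handle this, if you can modify the computation, is cooperative cancellation: the time checker sets a flag, and the computation returns as soon as it sees it. In the sketch below, waitForTimeCancellable and myParallelMappingCancel are hypothetical names. Using a plain Ref{Bool} as the flag assumes both tasks run on the same thread, which holds for @async tasks. Use Threads.Atomic{Bool} if that assumption does not hold.

```julia
using Distributed
@everywhere import Random

# Hypothetical variant of waitForTime that also receives a cancellation flag.
@everywhere function waitForTimeCancellable(wait_time, cancelled::Ref{Bool})
    t0 = time()
    x = 0.0
    while time() - t0 < wait_time
        cancelled[] && return nothing   # stop early once the checker gives up
        x += Random.rand() - 0.5
        yield()                         # still needed so the checker can run
    end
    return round(wait_time, digits=2)
end

function myParallelMappingCancel(runtimes; time_limit=1.0)
    return pmap(runtimes) do runtime
        cancelled = Ref(false)          # created on the worker handling this element
        task = @async waitForTimeCancellable(runtime, cancelled)
        t0 = time()
        while !istaskdone(task) && time() - t0 < time_limit
            sleep(0.1)
        end
        cancelled[] = true              # has no effect if the task already finished
        result = fetch(task)            # returns promptly: the loop checks the flag each pass
        return result === nothing ? "TimeOut" : result
    end
end

println(myParallelMappingCancel([0.2, 3.0]; time_limit=1.0))
```

The flag is only checked at the top of each loop iteration. A timed-out computation therefore stops at its next check rather than instantly. In return, no work is left running after pmap returns.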