分布式 Julia:并行映射 (pmap),每个映射任务都有超时/时间限制以完成

Distributed Julia: parallel map (pmap) with a timeout / time limit for each map task to complete

我的项目涉及使用 Julia 的 Distributed's pmap 函数并行计算地图。

映射给定元素可能需要几秒钟,也可能需要很长时间。我想要完成单个地图 task/computation 的超时或时间限制。

如果地图任务及时完成,很好,return 计算结果。如果任务没有在时间限制内完成,则在达到时间限制时停止计算,并且 return 一些值或消息指示超时发生。

下面是一个最小的例子。首先是导入模块,然后启动工作进程:

num_procs = 1
using Distributed
if num_procs > 1
    # The main process (no calling addprocs) can be used for `pmap`:
    addprocs(num_procs-1)
end

接下来,为所有工作进程定义映射任务。映射任务应在 1 秒后超时:

@everywhere import Random
@everywhere begin
    """
    Compute stuff for `wait_time` seconds, and return `wait_time`.
    If `timeout` seconds elapses, stop computation and return something else.
    """
    function waitForTimeUnlessTimeout(wait_time, timeout=1)

        # < Insert some sort of timeout code? >

        # This block of code simulates a long computation.
        # (pretend the computation time is unknown)
        x = 0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
        end

        # computation completed before time limit. Return wait_time.
        round(wait_time, digits=2)
    end
end

执行并行映射(pmap)的函数定义在主进程上。每个地图任务随机最多需要 2 秒才能完成,但应该会在 1 秒后超时。

function myParallelMapping(num_tasks = 20, max_runtime=2)    
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # return the parallel computation of the mapping tasks
    pmap((runtime)->waitForTimeUnlessTimeout(runtime), runtimes)
end

print(myParallelMapping())

这个限时并行地图应该如何实现?

你可以把这样的东西放在你的 pmap 体内

pmap(runtimes) do runtime
  t0 = time()
  task = @async waitForTimeUnlessTimeout(runtime)
  while !istaskdone(task) && time()-t0 < time_limit
      sleep(1)
  end
  istaskdone(task) && (return fetch(task))
  error("time over")
end

另请注意 (runtime)->waitForTimeUnlessTimeout(runtime)waitForTimeUnlessTimeout 相同。

根据@Fredrik Bagge 的非常有用的回答,这里是完整的工作示例实现以及一些额外的解释。

num_procs = 8
using Distributed
if num_procs > 1
    addprocs(num_procs-1)
end

@everywhere import Random
@everywhere begin
    function waitForTime(wait_time)
         # This code block simulates a long computation.
         # Pretend the computation time is unknown.
        t0 = time()
        x = 0
        while time()-t0 < wait_time
            x += Random.rand() - 0.5
            yield() # CRITICAL to release computation to check if task is done.
            # If you comment out #yield(), you will see timeout doesn't work!
        end

        return round(wait_time, digits=2)
    end
end

function myParallelMapping(num_tasks = 16, max_runtime=2, time_limit=1)
    # random task runtimes between 0 and max_runtime
    runtimes = Random.rand(num_tasks) * max_runtime

    # parallel compute the mapping tasks. See "do block" in 
    # the Julia documentation, it's just syntactic sugar.
    return pmap(runtimes) do runtime
                  t0 = time()
                  task = @async waitForTime(runtime)
                  while !istaskdone(task) && time()-t0 < time_limit
                      # releases computation to waitForTime
                      sleep(0.1)
                      # nothing past here will run until waitForTime calls yield()
                      # *and* 0.1 seconds have passed.
                  end
                  # equal to if istaskdone(task); return fetch(task); end
                  istaskdone(task) && (return fetch(task))
                  return "TimeOut"
                  # `return error("TimeOut")` halts pmap unless pmap is
                  #  given an error handler argument. See pmap documentation.
              end
end

输出为

julia> print(myParallelMapping())

       Any["TimeOut", "TimeOut", 0.33, 0.35, 0.56, 0.41, 0.08, 0.14, 0.72, 
           "TimeOut", "TimeOut", "TimeOut", 0.52, "TimeOut", 0.33, "TimeOut"]

请注意,在此示例中每个进程有两个任务。原始任务("time checker")每 0.1 秒检查一次其他任务是否已完成计算。另一个任务(使用 @async 创建)正在计算一些东西,定期调用 yield() 将控制权交给时间检查器;如果它不调用 yield(),则无法进行时间检查。