Julia - 映射功能状态

Julia - map function status

在 Julia 中是否有一种舒适的方式来获得 map/pmap 的 'status'?

如果我有一个数组 a = [1:10] 我想:

1:枚举数组,使用if条件添加打印命令

((index,value) -> 5*value ......, enumerate(a)

在“......”所在的位置,有一种方法可以将匿名函数 'chain' 变成类似

的东西
"5*value and then print index/length(a) if index%200 == 0"

2:知道是否已经存在这个选项,因为 pmap 适用于并行任务,通常用于大型进程,所以它已经存在有意义吗?

此外,有没有办法让匿名函数一个接一个地做两件 'separate' 事情?

例子

如果我有

a = [1:1000]
function f(n) #something that takes a huge ammount of time
end 

然后我执行

map(x -> f(x), a)

REPL 会打印出状态

"0.1 completed"
.
.
.
"0.9 completed"

解决方案

ProgressMeter 包默认不包含这个有点奇怪

Pkg.add("ProgressMeter")
Pkg.clone("https://github.com/slundberg/PmapProgressMeter.jl")
@everywhere using ProgressMeter
@everywhere using PmapProgressmeter
pmap(x->begin sleep(1); x end, Progress(10), 1:10)

PmapProgressMeter 在 github

为什么不将它包含在您的函数定义中以打印此信息?例如。

function f(n) #something that takes a huge amount of time
    ...
    do stuff.
    ...
    println("completed $n")
end 

并且,如果需要,您可以向您的函数添加一个额外的参数,该参数将在您的示例中包含 0.1,...,0.9(我不太确定这些是什么,但不管它们是什么,它们都可以只是你函数中的一个参数。

如果您查看下面关于 pmap@parallel 的示例,您会发现一个函数示例,该示例提供给 pmap 并打印输出。

另请参阅 and this SO post,了解将多个参数提供给与 mappmap 一起使用的函数的信息。



Julia documentation 建议

pmap() is designed for the case where each function call does a large amount of work. In contrast, @parallel for can handle situations where each iteration is tiny, perhaps merely summing two numbers.

这有几个原因。首先,pmap 会产生更大的启动成本来启动工人的工作。因此,如果工作量很小,这些启动成本可能会变得低效。然而,相反地,pmap 在工人之间分配工作 "smarter"。特别是,它建立了一个作业队列,并在每个工人可用时向每个工人发送一个新作业。 @parallel 相比之下,在调用时将所有工作分配给工人。因此,如果某些工人的工作时间比其他工人更长,那么您最终可能会遇到这样一种情况,即您的大多数工人已经完成并处于闲置状态,而少数人则在过多的时间内保持活跃,以完成他们的工作。然而,这种情况不太可能发生在非常小和简单的工作中。

下面说明了这一点:假设我们有两个工人,一个慢,另一个快两倍。理想情况下,我们希望给快手的工作量是慢手的两倍。 (或者,我们可以有快活和慢活,但原理是完全一样的)。 pmap 将完成此操作,但 @parallel 不会。

对于每个测试,我们初始化以下内容:

addprocs(2)

@everywhere begin
    function parallel_func(idx)
        workernum = myid() - 1 
        sleep(workernum)
        println("job $idx")
    end
end

现在,对于 @parallel 测试,我们 运行 以下内容:

@parallel for idx = 1:12
    parallel_func(idx)
end

并取回打印输出:

julia>     From worker 2:    job 1
    From worker 3:    job 7
    From worker 2:    job 2
    From worker 2:    job 3
    From worker 3:    job 8
    From worker 2:    job 4
    From worker 2:    job 5
    From worker 3:    job 9
    From worker 2:    job 6
    From worker 3:    job 10
    From worker 3:    job 11
    From worker 3:    job 12

快甜了。工人 "shared" 工作均匀。注意每个worker都完成了6个job,虽然worker 2的速度是worker 3的两倍,可能感人,但效率低

对于 pmap 测试,我 运行 以下内容:

pmap(parallel_func, 1:12)

并得到输出:

From worker 2:    job 1
From worker 3:    job 2
From worker 2:    job 3
From worker 2:    job 5
From worker 3:    job 4
From worker 2:    job 6
From worker 2:    job 8
From worker 3:    job 7
From worker 2:    job 9
From worker 2:    job 11
From worker 3:    job 10
From worker 2:    job 12

现在,请注意,工人 2 执行了 8 个工作,工人 3 执行了 4 个。这与他们的速度以及我们想要的最佳效率成正比。 pmap 是个硬任务高手——量力而行。

您可以通过实现 'closure',按照您的要求使用 'state' 创建一个函数。例如

julia> F = function ()
  ClosedVar = 5
  return (x) -> x + ClosedVar
end;
julia> f = F();
julia> f(5)
10
julia> ClosedVar = 1000;
julia> f(5)
10

如您所见,函数f维护'state'(即内部变量ClosedVarF的本地变量,f维护访问尽管 F 本身在技术上早已超出范围。

注意与正常的非封闭函数定义的区别:

julia> MyVar = 5;
julia> g(x) = 5 + MyVar;
julia> g(5)
10
julia> MyVar = 1000;
julia> g(5)
1005

您可以创建自己的闭包,在 运行 时查询/更新其闭包变量,并根据其状态每次执行不同的操作。

话虽如此,从您的示例来看,您似乎期望 pmap 将按顺序 运行。这是无法保证的。因此,不要依赖 'which index is this thread processing' 方法每 200 次操作打印一次。您可能必须在闭包中维护一个封闭的 'counter' 变量,并依赖它。这大概也意味着您的闭包需要可访问 @everywhere

ProgressMeter.jl 有一个 pmap 分支。

您还可以让 Juno 进度条在 pmap 中工作。这是一种使用未记录的东西,所以如果你想要更多信息,你应该在 Gitter 中询问,因为发布这个 public 只会让人们感到困惑 if/when 它改变了。

另一种可能性是使用 SharedArray 作为工作人员共享的计数器。例如

addprocs(2)

Counter = convert(SharedArray, zeros(Int64, nworkers()))

## Make sure each worker has the SharedArray declared on it, so that it need not be fed as an explicit argument
function sendto(p::Int; args...)
  for (nm, val) in args
    @spawnat(p, eval(Main, Expr(:(=), nm, val)))
  end
end

for (idx, pid) in enumerate(workers())
  sendto(pid, Counter = Counter)
end
@everywhere global Counter


@everywhere begin
    function do_stuff(n)
        sleep(rand())
        Counter[(myid()-1)] += 1
        TotalJobs = sum(Counter)
        println("Jobs Completed = $TotalJobs")
    end
end

pmap(do_stuff, 1:10)