我如何 运行 在 Julia 中进行简单的并行数组赋值操作?

How can I run a simple parallel array assignment operation in Julia?

我必须多次求解微分方程组,迭代一个参数。为此,我 运行 对参数列表进行循环,并为每个参数存储解决方案(在时间值数组中评估)。所以我有一个二维数组,我在其中存储解决方案(每一行都是参数的值)。

现在,由于任何迭代都与另一个迭代无关,所以我想到并行执行此操作。

这是我的代码:

using DifferentialEquations
using SharedArrays
using DelimitedFiles
using Distributed

function tf(x,w)
    return x*sin(w*x)
end

function sys!(dv,v,w,t)
    dv[1] = w*v[1]
    dv[2] = tf(v[1],w)
end

times = LinRange(0.1,2,25)

params = LinRange(0.1,1.2,100)

sols = SharedArray{Float64,2}((length(times),length(params)))

@distributed for i=1:length(params)
    println(i)
    init_val = [1.0,1.0]
    tspan = (0.0,2.0)
    prob = ODEProblem(sys!,init_val,tspan,params[i])
    sol = solve(prob)
    sols[:,i] .= sol(times)[2,:]
end

writedlm("output.txt",sols)

现在,当我 运行 这个循环没有前缀 @distributed 时,这个 运行 是完美的。

当我运行这段代码的时候,println语句不起作用,虽然文件"output.txt"被存储了,但里面全是零。

我运行以这种方式在命令行中运行这段代码

julia -p 4 trycode.jl

虽然文件 "output.txt" 已存储,但它没有显示任何输出,只工作了一分钟,什么也没做。就好像从未进入循环一样。

如果能就如何设置这个简单的并行循环提供帮助,我将不胜感激。

你能从线程化的 for 而不是 @distributed for 中受益吗?这有效(Julia 1.4):

using DifferentialEquations
using SharedArrays
using DelimitedFiles
using Distributed

function tf(x,w)
    return x*sin(w*x)
end

function sys!(dv,v,w,t)
    dv[1] = w*v[1]
    dv[2] = tf(v[1],w)
end

times = LinRange(0.1,2,25)

params = LinRange(0.1,1.2,100)

sols = SharedArray{Float64,2}((length(times),length(params)))

@Threads.threads for i=1:length(params)
    println(i)
    init_val = [1.0,1.0]
    tspan = (0.0,2.0)
    prob = ODEProblem(sys!,init_val,tspan,params[i])
    sol = solve(prob)
    sols[:,i] .= sol(times)[2,:]
end

writedlm("output.txt",sols)

正如 Bill 所说,在 Julia 中有两种主要的并行处理方式:线程模型,它在 Julia 1.3 中引入并通过 Threads.@threads 宏实现共享内存并行,以及使用 Distributed.@distributed 宏,它在不同的 Julia 进程之间并行化。

Threads 绝对更接近 "automagic" 并行加速,代码重写最少或没有重写,通常是一个不错的选择,尽管必须注意确保无论操作是什么 运行ning 是线程安全的,因此请始终检查结果是否相同。

由于您的问题最初是关于 @distributed 并行性的,所以让我也回答一下。如果您使用 @distributed 并行性,那么考虑正在发生的事情的最简单的心理模型(我相信)是想象您 运行 在完全独立的 Julia REPL 中编写代码。

这是您的代码版本,适用于 @distributed 模型:

using Distributed
addprocs(2)

using SharedArrays
using DelimitedFiles

@everywhere begin 
    using DifferentialEquations

    tf(x,w) = x*sin(w*x)

    function sys!(dv,v,w,t)
        dv[1] = w*v[1]
        dv[2] = tf(v[1],w)
    end

    times = LinRange(0.1,2,25)
    params = LinRange(0.1,1.2,100)
end

sols = SharedArray{Float64,2}((length(times),length(params)))

@sync @distributed for i=1:length(params)
    println(i)
    init_val = [1.0,1.0]
    tspan = (0.0,2.0)
    prob = ODEProblem(sys!,init_val,tspan,params[i])
    sol = solve(prob)
    sols[:,i] .= sol(times)[2,:]
end

sols

发生了什么变化?

  • 我在脚本的开头添加了 addprocs(2)。如果您在启动 Julia 时使用 p -2(或您想要的任何数量的进程),那么这不是必需的,但我经常发现当它在直接上代码。请注意,这对于线程目前是不可能的,即您需要在启动 Julia 之前设置 JULIA_NUM_THREADS 环境变量,并且一旦启动并 运行ning 就无法更改线程数。

  • 然后我将代码的位移动到 @everywhere begin ... end 块中。这本质上是 运行s 同时在所有进程上包含在块中的操作。回到 运行 单独的 Julia 实例的心智模型,您必须查看 @distributed 循环中的内容,并确保所有函数和变量实际上都在所有进程上定义。所以例如为确保每个进程都知道 ODEProblem 是什么,您需要对所有进程执行 using DifferentialEquations

  • 最后,我在分布式循环中添加了@sync@distributed 的文档中引用了这一点。 运行 @distributed 带有 for 循环的宏为分布式执行生成一个异步绿色线程 (Task) 句柄并前进到下一行。由于您想等到执行实际完成,因此需要同步 @sync。您的原始代码的问题是,如果不等待绿色线程完成(同步),它将吞下错误并且 return 立即,这就是您的 sol 数组为空的原因。如果你 运行 你的原始代码,你可以看到这个,并且只添加 @sync - 然后你会得到一个 TaskFailedException: on worker 2 - UndefVarError: #sys! not defined 它告诉你你的工作进程不知道你的功能在主进程上定义。在实践中,你几乎总是想要 @sync 执行,除非你计划并行地 运行 许多这样的分布式循环。在分布式循环(循环的 @distributed (func) for i in 1:1000 形式)中使用聚合函数的地方也不需要 @sync 关键字

现在最好的解决方案是什么?答案是我不知道。 @threads 是一个很好的选择,可以在不重写代码的情况下快速并行化线程安全操作,并且仍在积极开发和改进中,因此将来可能会变得更好。分布式标准库中还有 pmap,它为您提供了额外的选项,但这个答案已经足够长了!以我个人的经验,没有什么可以替代 (1) 思考您的问题和 (2) 基准执行。您要考虑的事情是 运行 您的问题发生时间(包括总时间和您要分发的每个单独操作)和消息 passing/memory 访问要求。

好的一面是,虽然您可能需要花一些精力思考事情,但 Julia 有很多不错的选择,可以充分利用具有两个内核的破旧笔记本电脑(例如我从中输入的一个)到多节点超高性能集群(这使 Julia 成为 achieve petaflop performance 为数不多的编程语言之一 - 尽管公平地说这比我或 Bill 的回答更棘手:))