How to pass a list of parameters to workers in Julia Distributed

With Julia 1.5.3, I want to pass a list of parameters to distributed workers.

I first tried a non-distributed version:

using Distributed

@everywhere begin 
    using SharedArrays

    solve(a,b,c) = return (1,2,3)

    d_rates = LinRange(0.01, 0.33, 5)
    m_rates = LinRange(0.01, 0.25, 5)
    population_size = 10^3
    max_iterations_perloop = 10^3
    nb_repeats = 2

    nb_params = length(d_rates)*length(m_rates)*nb_repeats
    para = enumerate(Base.product(d_rates, m_rates, population_size, max_iterations_perloop, 1:nb_repeats))

    results = SharedArray{Tuple{Int, Int, Int}}(nb_params)
end

for (y , x) in para
    results[y] = solve(x[1], x[2], x[3])
end

This works fine. I then changed the final loop to:

@sync @distributed for (y , x) in para
    results[y] = solve(x[1], x[2], x[3])
end

Now I get an error (truncated):

ERROR: LoadError: TaskFailedException:
MethodError: no method matching firstindex(::Base.Iterators.Enumerate{Base.Iterators.ProductIterator{Tuple{LinRange{Float64},LinRange{Float64},Int64,Int64,UnitRange{Int64}}}})
Closest candidates are:
  firstindex(::Cmd) at process.jl:638
  firstindex(::Core.SimpleVector) at essentials.jl:599
  firstindex(::Base64.Buffer) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Base64/src/buffer.jl:18
  ...
Stacktrace:
 [1] (::Distributed.var"#159#161"{var"#271#272",Base.Iterators.Enumerate{Base.Iterators.ProductIterator{Tuple{LinRange{Float64},LinRange{Float64},Int64,Int64,UnitRange{Int64}}}}})() at ./task.jl:332
Stacktrace:
 [1] sync_end(::Channel{Any}) at ./task.jl:314
 [2] top-level scope at task.jl:333
 [3] include_string(::Function, ::Module, ::String, ::String) at ./loading.jl:1088
 [4] include_string(::Module, ::String, ::String) at ./loading.jl:1096
 [5] invokelatest(::Any, ::Any, ::Vararg{Any,N} where N; kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at ./essentials.jl:710
 [6] invokelatest(::Any, ::Any, ::Vararg{Any,N} where N) at ./essentials.jl:709

Is it possible to pass a list like this, and if so, how?

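I assume that all your workers are on a single server and that you have actually added some workers with the addprocs command. The first problem with your code is that you create the SharedArray on every worker. Instead, note the syntax of SharedArray: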

help?> SharedArray

  SharedArray{T}(dims::NTuple; init=false, pids=Int[])
  SharedArray{T,N}(...)

  Construct a SharedArray of a bits type T and size dims across the processes specified by pids - all of which have to be on the same host. (...)

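This means that you create the SharedArray only once, from the master process, and you can use the pids argument to specify which workers are aware of it (if you do not specify pids, all worker processes have access).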

Hence your code will look like this:

using Distributed, SharedArrays
addprocs(4)
@everywhere using SharedArrays

@everywhere solve(a,b,c) = return (1,2,3)

#(...) # your setup code without @everywhere

results = SharedArray{Tuple{Int, Int, Int}}(nb_params)

@sync @distributed for (y , x) in collect(para)
    results[y] = solve(x[1], x[2], x[3])
end

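Note that you need collect because the @distributed macro splits the collection into chunks by index, so it needs an indexable collection of known size (the firstindex MethodError above comes from this). It does not work with lazy iterators such as the result of enumerate(Base.product(...)).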
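To make the effect of collect concrete, here is a minimal sketch that runs on a single process and reuses the parameter ranges from the question. The name jobs is only illustrative and is not part of the original code. It shows that collect turns the lazy iterator into an array of (index, parameters) tuples that can be indexed linearly, which is what @distributed relies on when it splits the work:

```julia
# Parameter ranges taken from the question.
d_rates = LinRange(0.01, 0.33, 5)
m_rates = LinRange(0.01, 0.25, 5)
population_size = 10^3
max_iterations_perloop = 10^3
nb_repeats = 2

# Lazy iterator: it can be looped over, but not indexed.
para = enumerate(Base.product(d_rates, m_rates, population_size,
                              max_iterations_perloop, 1:nb_repeats))

# Materialize it into an array of (index, params) tuples.
# `jobs` is a hypothetical name used only in this sketch.
jobs = collect(para)

println(length(jobs))   # number of parameter combinations: 5 * 5 * 2
println(jobs[1])        # first (index, params) pair, reachable by linear index
println(jobs[end][1])   # the enumerate counter of the last combination
```

Since jobs supports firstindex, lastindex and range indexing, it can be used as the collection in the @sync @distributed loop exactly as collect(para) is used above.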