How to pass a list of parameters to workers in Julia Distributed
On Julia 1.5.3, I would like to pass a list of parameters to distributed workers.
I first tried it in a non-distributed way:
using Distributed
@everywhere begin
    using SharedArrays
    solve(a, b, c) = return (1, 2, 3)
    d_rates = LinRange(0.01, 0.33, 5)
    m_rates = LinRange(0.01, 0.25, 5)
    population_size = 10^3
    max_iterations_perloop = 10^3
    nb_repeats = 2
    nb_params = length(d_rates) * length(m_rates) * nb_repeats
    para = enumerate(Base.product(d_rates, m_rates, population_size, max_iterations_perloop, 1:nb_repeats))
    results = SharedArray{Tuple{Int, Int, Int}}(nb_params)
end
for (y, x) in para
    results[y] = solve(x[1], x[2], x[3])
end
This worked fine. I then changed the final loop to:
@sync @distributed for (y, x) in para
    results[y] = solve(x[1], x[2], x[3])
end
I then got an error (truncated):
ERROR: LoadError: TaskFailedException:
MethodError: no method matching firstindex(::Base.Iterators.Enumerate{Base.Iterators.ProductIterator{Tuple{LinRange{Float64},LinRange{Float64},Int64,Int64,UnitRange{Int64}}}})
Closest candidates are:
firstindex(::Cmd) at process.jl:638
firstindex(::Core.SimpleVector) at essentials.jl:599
firstindex(::Base64.Buffer) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Base64/src/buffer.jl:18
...
Stacktrace:
[1] (::Distributed.var"#159#161"{var"#271#272",Base.Iterators.Enumerate{Base.Iterators.ProductIterator{Tuple{LinRange{Float64},LinRange{Float64},Int64,Int64,UnitRange{Int64}}}}})() at ./task.jl:332
Stacktrace:
[1] sync_end(::Channel{Any}) at ./task.jl:314
[2] top-level scope at task.jl:333
[3] include_string(::Function, ::Module, ::String, ::String) at ./loading.jl:1088
[4] include_string(::Module, ::String, ::String) at ./loading.jl:1096
[5] invokelatest(::Any, ::Any, ::Vararg{Any,N} where N; kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at ./essentials.jl:710
[6] invokelatest(::Any, ::Any, ::Vararg{Any,N} where N) at ./essentials.jl:709
Is it possible to pass a list like this, and if so, how?
I assume that all your workers are on a single server and that you have actually added some worker processes with the addprocs command. The first problem with your code is that you create the SharedArray on all workers. The syntax of SharedArray is as follows:
help?> SharedArray
SharedArray{T}(dims::NTuple; init=false, pids=Int[])
SharedArray{T,N}(...)
Construct a SharedArray of a bits type T and size dims across the processes specified by pids - all of which have to be on the same host. (...)
This means that you create the SharedArray only once, from the master process, and that you can use the pids argument to specify which workers know about it (if you do not specify pids, all worker processes have access).
Hence your code could look like this:
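As a minimal sketch of that shared-memory behavior (the worker id is taken from workers() rather than hard-coded, and the value 42 is just an illustrative payload):

```julia
using Distributed, SharedArrays
addprocs(2)
@everywhere using SharedArrays

# Created once, on the master process; with no pids argument,
# all local worker processes map the same shared memory.
s = SharedArray{Int}(4)

# A worker writes into the array...
fetch(@spawnat workers()[1] s[1] = 42)

# ...and the master sees the write through the shared segment.
s[1]  # 42
```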
using Distributed, SharedArrays
addprocs(4)
@everywhere using SharedArrays
@everywhere solve(a, b, c) = return (1, 2, 3)
#(...) # your setup code without @everywhere
results = SharedArray{Tuple{Int, Int, Int}}(nb_params)
@sync @distributed for (y, x) in collect(para)
    results[y] = solve(x[1], x[2], x[3])
end
Note that you need collect here, because the @distributed macro needs to know the size of the Vector; it does not work with iterators.
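To see why, a small sketch (with a toy product instead of your parameter grid): the enumerate/product iterator has no firstindex method, which is exactly the MethodError in the question, while collect materializes it into an indexable array that @distributed can partition:

```julia
para = enumerate(Base.Iterators.product(1:2, 1:3))

# firstindex(para)  # would raise the MethodError from the question

# collect turns the lazy iterator into an ordinary indexable array:
v = collect(para)
v[1]  # (1, (1, 1)) -- index paired with the parameter tuple
```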