朱莉娅 pmap 性能
Julia pmap performance
我正在尝试将我的一些 R 代码移植到 Julia;
基本上我在 Julia 中重写了以下 R 代码:
library(parallel)
eps_1<-rnorm(1000000)
eps_2<-rnorm(1000000)
large_matrix<-ifelse(cbind(eps_1,eps_2)>0,1,0)
matrix_to_compare = expand.grid(c(0,1),c(0,1))
indices<-seq(1,1000000,4)
large_matrix<-lapply(indices,function(i)(large_matrix[i:(i+3),]))
function_compare<-function(x){
which((rowSums(x==matrix_to_compare)==2) %in% TRUE)
}
> system.time(lapply(large_matrix,function_compare))
user system elapsed
38.812 0.024 38.828
> system.time(mclapply(large_matrix,function_compare,mc.cores=11))
user system elapsed
63.128 1.648 6.108
正如人们所注意到的那样,当我从一个内核升级到 11 个内核时,速度得到了显着提升。现在我正尝试在 Julia 中做同样的事情:
#Define cluster:
addprocs(11);
using Distributions;
@everywhere using Iterators;
d = Normal();
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);
#Create a large matrix:
large_matrix = hcat(eps_1,eps_2).>=0;
indices = collect(1:4:1000000)
#Split large matrix:
large_matrix = [large_matrix[i:(i+3),:] for i in indices];
#Define the function to apply:
@everywhere function function_split(x)
matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4)));
matrix_to_compare = matrix_to_compare.>0;
find(sum(x.==matrix_to_compare,2).==2)
end
@time map(function_split,large_matrix )
@time pmap(function_split,large_matrix )
5.167820 seconds (22.00 M allocations: 2.899 GB, 12.83% gc time)
18.569198 seconds (40.34 M allocations: 2.082 GB, 5.71% gc time)
正如人们所注意到的那样,我没有通过 pmap 获得任何加速。也许有人可以提出替代方案。
我认为这里的一些问题是 @parallel
和 @pmap
并不总是能很好地处理与 worker 之间的数据移动。因此,它们往往在您执行的操作根本不需要太多数据移动的情况下工作得最好。我还怀疑可能可以做一些事情来提高它们的性能,但我不确定细节。
对于确实需要移动更多数据的情况,最好坚持直接调用 worker 上的函数的选项,然后使用这些函数访问这些 worker space 内存中的对象。我在下面举了一个例子,它使用多个 worker 来加速你的函数。它可能使用最简单的选项,即 @everywhere
,但 @spawn
、remotecall()
等也值得考虑,具体取决于您的情况。
addprocs(11);
using Distributions;
@everywhere using Iterators;
d = Normal();
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);
#Create a large matrix:
large_matrix = hcat(eps_1,eps_2).>=0;
indices = collect(1:4:1000000);
#Split large matrix:
large_matrix = [large_matrix[i:(i+3),:] for i in indices];
large_matrix = convert(Array{BitArray}, large_matrix);
function sendto(p::Int; args...)
for (nm, val) in args
@spawnat(p, eval(Main, Expr(:(=), nm, val)))
end
end
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
@everywhere function function_split(x::BitArray)
matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4)));
matrix_to_compare = matrix_to_compare.>0;
find(sum(x.==matrix_to_compare,2).==2)
end
function distribute_data(X::Array, WorkerName::Symbol)
size_per_worker = floor(Int,size(X,1) / nworkers())
StartIdx = 1
EndIdx = size_per_worker
for (idx, pid) in enumerate(workers())
if idx == nworkers()
EndIdx = size(X,1)
end
@spawnat(pid, eval(Main, Expr(:(=), WorkerName, X[StartIdx:EndIdx])))
StartIdx = EndIdx + 1
EndIdx = EndIdx + size_per_worker - 1
end
end
distribute_data(large_matrix, :large_matrix)
function parallel_split()
@everywhere begin
if myid() != 1
result = map(function_split,large_matrix );
end
end
results = cell(nworkers())
for (idx, pid) in enumerate(workers())
results[idx] = getfrom(pid, :result)
end
vcat(results...)
end
## results given after running once to compile
@time a = map(function_split,large_matrix); ## 6.499737 seconds (22.00 M allocations: 2.899 GB, 13.99% gc time)
@time b = parallel_split(); ## 1.097586 seconds (1.50 M allocations: 64.508 MB, 3.28% gc time)
julia> a == b
true
注意:即使这样,多进程的加速也不是完美的。但是,这是可以预料的,因为作为您的函数的结果,仍有适量的数据要返回,并且必须移动这些数据,这需要时间。
P.S。有关我在此处使用的 sendto
和 getfrom
函数的更多信息,请参阅此 post (Julia: How to copy data to another processor in Julia) or this package (https://github.com/ChrisRackauckas/ParallelDataTransfer.jl)。
我正在尝试将我的一些 R 代码移植到 Julia; 基本上我在 Julia 中重写了以下 R 代码:
library(parallel)
eps_1<-rnorm(1000000)
eps_2<-rnorm(1000000)
large_matrix<-ifelse(cbind(eps_1,eps_2)>0,1,0)
matrix_to_compare = expand.grid(c(0,1),c(0,1))
indices<-seq(1,1000000,4)
large_matrix<-lapply(indices,function(i)(large_matrix[i:(i+3),]))
function_compare<-function(x){
which((rowSums(x==matrix_to_compare)==2) %in% TRUE)
}
> system.time(lapply(large_matrix,function_compare))
user system elapsed
38.812 0.024 38.828
> system.time(mclapply(large_matrix,function_compare,mc.cores=11))
user system elapsed
63.128 1.648 6.108
正如人们所注意到的那样,当我从一个内核升级到 11 个内核时,速度得到了显着提升。现在我正尝试在 Julia 中做同样的事情:
#Define cluster:
addprocs(11);
using Distributions;
@everywhere using Iterators;
d = Normal();
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);
#Create a large matrix:
large_matrix = hcat(eps_1,eps_2).>=0;
indices = collect(1:4:1000000)
#Split large matrix:
large_matrix = [large_matrix[i:(i+3),:] for i in indices];
#Define the function to apply:
@everywhere function function_split(x)
matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4)));
matrix_to_compare = matrix_to_compare.>0;
find(sum(x.==matrix_to_compare,2).==2)
end
@time map(function_split,large_matrix )
@time pmap(function_split,large_matrix )
5.167820 seconds (22.00 M allocations: 2.899 GB, 12.83% gc time)
18.569198 seconds (40.34 M allocations: 2.082 GB, 5.71% gc time)
正如人们所注意到的那样,我没有通过 pmap 获得任何加速。也许有人可以提出替代方案。
我认为这里的一些问题是 @parallel
和 @pmap
并不总是能很好地处理与 worker 之间的数据移动。因此,它们往往在您执行的操作根本不需要太多数据移动的情况下工作得最好。我还怀疑可能可以做一些事情来提高它们的性能,但我不确定细节。
对于确实需要移动更多数据的情况,最好坚持直接调用 worker 上的函数的选项,然后使用这些函数访问这些 worker space 内存中的对象。我在下面举了一个例子,它使用多个 worker 来加速你的函数。它可能使用最简单的选项,即 @everywhere
,但 @spawn
、remotecall()
等也值得考虑,具体取决于您的情况。
addprocs(11);
using Distributions;
@everywhere using Iterators;
d = Normal();
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);
#Create a large matrix:
large_matrix = hcat(eps_1,eps_2).>=0;
indices = collect(1:4:1000000);
#Split large matrix:
large_matrix = [large_matrix[i:(i+3),:] for i in indices];
large_matrix = convert(Array{BitArray}, large_matrix);
function sendto(p::Int; args...)
for (nm, val) in args
@spawnat(p, eval(Main, Expr(:(=), nm, val)))
end
end
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
@everywhere function function_split(x::BitArray)
matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4)));
matrix_to_compare = matrix_to_compare.>0;
find(sum(x.==matrix_to_compare,2).==2)
end
function distribute_data(X::Array, WorkerName::Symbol)
size_per_worker = floor(Int,size(X,1) / nworkers())
StartIdx = 1
EndIdx = size_per_worker
for (idx, pid) in enumerate(workers())
if idx == nworkers()
EndIdx = size(X,1)
end
@spawnat(pid, eval(Main, Expr(:(=), WorkerName, X[StartIdx:EndIdx])))
StartIdx = EndIdx + 1
EndIdx = EndIdx + size_per_worker - 1
end
end
distribute_data(large_matrix, :large_matrix)
function parallel_split()
@everywhere begin
if myid() != 1
result = map(function_split,large_matrix );
end
end
results = cell(nworkers())
for (idx, pid) in enumerate(workers())
results[idx] = getfrom(pid, :result)
end
vcat(results...)
end
## results given after running once to compile
@time a = map(function_split,large_matrix); ## 6.499737 seconds (22.00 M allocations: 2.899 GB, 13.99% gc time)
@time b = parallel_split(); ## 1.097586 seconds (1.50 M allocations: 64.508 MB, 3.28% gc time)
julia> a == b
true
注意:即使这样,多进程的加速也不是完美的。但是,这是可以预料的,因为作为您的函数的结果,仍有适量的数据要返回,并且必须移动这些数据,这需要时间。
P.S。有关我在此处使用的 sendto
和 getfrom
函数的更多信息,请参阅此 post (Julia: How to copy data to another processor in Julia) or this package (https://github.com/ChrisRackauckas/ParallelDataTransfer.jl)。